🪻 distributed transcription service thistle.dunkirk.sh

feat: use the new reconnection logic from murmur

dunkirk.sh a342a634 c58eeb51

verified
Changed files
+325 -87
src
whisper-server
+69 -14
src/components/transcription.ts
···
interface TranscriptionJob {
id: string;
filename: string;
-
status: "uploading" | "processing" | "completed" | "failed";
+
status: "uploading" | "processing" | "transcribing" | "completed" | "failed";
progress: number;
transcript?: string;
created_at: number;
···
}
.status-processing {
+
background: color-mix(in srgb, var(--primary) 10%, transparent);
+
color: var(--primary);
+
}
+
+
.status-transcribing {
background: color-mix(in srgb, var(--accent) 10%, transparent);
color: var(--accent);
}
···
background: var(--secondary);
border-radius: 2px;
margin-bottom: 1rem;
+
overflow: hidden;
+
position: relative;
}
.progress-fill {
···
transition: width 0.3s;
}
+
.progress-fill.indeterminate {
+
width: 30%;
+
background: var(--primary);
+
animation: progress-slide 1.5s ease-in-out infinite;
+
}
+
+
@keyframes progress-slide {
+
0% {
+
transform: translateX(-100%);
+
}
+
100% {
+
transform: translateX(333%);
+
}
+
}
+
.job-transcript {
background: color-mix(in srgb, var(--primary) 5%, transparent);
border-radius: 6px;
···
private eventSources: Map<string, EventSource> = new Map();
// Refreshes the component's view of the transcription backend: re-probes
// service health, reloads the job list, then (re)opens SSE streams for jobs
// that are still active. Declared as an arrow-function field so `this` stays
// bound when the function is handed off as a callback.
// NOTE(review): presumably registered as an auth-change listener — the
// registration is not visible in this view; confirm.
private handleAuthChange = async () => {
+
// Health first: loadJobs() no longer sets serviceAvailable itself.
await this.checkHealth();
await this.loadJobs();
// Must run after loadJobs(): it iterates this.jobs to pick streams to open.
this.connectToJobStreams();
};
override async connectedCallback() {
super.connectedCallback();
+
await this.checkHealth();
await this.loadJobs();
this.connectToJobStreams();
···
private connectToJobStreams() {
// Connect to SSE streams for active jobs
for (const job of this.jobs) {
-
if (job.status === "processing" || job.status === "uploading") {
+
if (
+
job.status === "processing" ||
+
job.status === "transcribing" ||
+
job.status === "uploading"
+
) {
this.connectToJobStream(job.id);
}
}
···
const eventSource = new EventSource(`/api/transcriptions/${jobId}/stream`);
-
eventSource.onmessage = (event) => {
+
// Handle named "update" events from SSE stream
+
eventSource.addEventListener("update", (event) => {
const update = JSON.parse(event.data);
// Update the job in our list efficiently (mutate in place for Lit)
···
this.eventSources.delete(jobId);
}
}
-
};
+
});
eventSource.onerror = (error) => {
console.warn(`SSE connection error for job ${jobId}:`, error);
eventSource.close();
this.eventSources.delete(jobId);
+
// Check if the job still exists before retrying
+
const job = this.jobs.find((j) => j.id === jobId);
+
if (!job) {
+
console.log(`Job ${jobId} no longer exists, skipping retry`);
+
return;
+
}
+
+
// Don't retry if job is already in a terminal state
+
if (job.status === "completed" || job.status === "failed") {
+
console.log(`Job ${jobId} is ${job.status}, skipping retry`);
+
return;
+
}
+
// Retry connection up to 3 times with exponential backoff
if (retryCount < 3) {
const backoff = 2 ** retryCount * 1000; // 1s, 2s, 4s
···
this.eventSources.set(jobId, eventSource);
}
+
/**
 * Asks the backend whether transcription is currently available and stores
 * the answer in `serviceAvailable`.
 *
 * Any failure — a network error, a non-2xx response, or a body that does not
 * carry a literal `available: true` — is recorded as "unavailable". This
 * method never throws.
 */
async checkHealth() {
  try {
    const response = await fetch("/api/transcriptions/health");
    if (!response.ok) {
      this.serviceAvailable = false;
      return;
    }

    // Treat the payload as untrusted: narrow it instead of copying whatever
    // `available` happens to be (undefined, a string, ...) into boolean state.
    const data: unknown = await response.json();
    this.serviceAvailable =
      typeof data === "object" &&
      data !== null &&
      "available" in data &&
      data.available === true;
  } catch {
    // Network failure or invalid JSON.
    this.serviceAvailable = false;
  }
}
+
async loadJobs() {
try {
const response = await fetch("/api/transcriptions");
if (response.ok) {
const data = await response.json();
this.jobs = data.jobs;
-
this.serviceAvailable = true;
+
// Don't override serviceAvailable - it's set by checkHealth()
} else if (response.status === 404) {
// Transcription service not available - show empty state
this.jobs = [];
-
this.serviceAvailable = false;
} else {
console.error("Failed to load jobs:", response.status);
-
this.serviceAvailable = false;
}
} catch (error) {
// Network error or service unavailable - don't break the page
console.warn("Transcription service unavailable:", error);
this.jobs = [];
-
this.serviceAvailable = false;
}
}
···
return;
}
-
if (file.size > 25 * 1024 * 1024) {
-
// 25MB limit
-
alert("File size must be less than 25MB");
+
if (file.size > 100 * 1024 * 1024) {
+
// 100MB limit
+
alert("File size must be less than 100MB");
return;
}
···
}
</div>
<div class="upload-hint">
-
${this.serviceAvailable ? "Supports MP3, WAV, M4A, AAC, OGG, WebM, FLAC up to 25MB - Requires faster-whisper server" : "Transcription service unavailable"}
+
${this.serviceAvailable ? "Supports MP3, WAV, M4A, AAC, OGG, WebM, FLAC up to 100MB" : "Transcription is currently unavailable"}
</div>
<input type="file" class="file-input" accept="audio/mpeg,audio/wav,audio/m4a,audio/mp4,audio/aac,audio/ogg,audio/webm,audio/flac,.m4a" @change=${this.handleFileSelect} ${!this.serviceAvailable ? "disabled" : ""} />
</div>
···
</div>
${
-
job.status === "uploading" || job.status === "processing"
+
job.status === "uploading" ||
+
job.status === "processing" ||
+
job.status === "transcribing"
? html`
<div class="progress-bar">
-
<div class="progress-fill" style="width: ${job.progress}%"></div>
+
<div class="progress-fill ${job.status === "processing" ? "indeterminate" : ""}" style="${job.status === "processing" ? "" : `width: ${job.progress}%`}"></div>
</div>
`
: ""
+8
src/db/schema.ts
···
CREATE INDEX IF NOT EXISTS idx_transcriptions_status ON transcriptions(status);
`,
},
+
{
+
version: 3,
+
name: "Add whisper_job_id to transcriptions",
+
sql: `
+
ALTER TABLE transcriptions ADD COLUMN whisper_job_id TEXT;
+
CREATE INDEX IF NOT EXISTS idx_transcriptions_whisper_job_id ON transcriptions(whisper_job_id);
+
`,
+
},
];
function getCurrentVersion(): number {
+71 -7
src/index.ts
···
await Bun.write("./uploads/.gitkeep", "");
// Initialize transcription system
+
console.log(
+
`[Transcription] Connecting to Murmur at ${WHISPER_SERVICE_URL}...`,
+
);
const transcriptionEvents = new TranscriptionEventEmitter();
const whisperService = new WhisperServiceManager(
WHISPER_SERVICE_URL,
···
setInterval(cleanupExpiredSessions, 60 * 60 * 1000);
// Sync with Whisper DB on startup
-
await whisperService.syncWithWhisper();
+
try {
+
await whisperService.syncWithWhisper();
+
console.log("[Transcription] Successfully connected to Murmur");
+
} catch (error) {
+
console.warn(
+
"[Transcription] Murmur unavailable at startup:",
+
error instanceof Error ? error.message : "Unknown error",
+
);
+
}
// Periodic sync every 5 minutes as backup (SSE handles real-time updates)
setInterval(() => whisperService.syncWithWhisper(), 5 * 60 * 1000);
···
{ status: 404 },
);
}
-
// Event-driven SSE stream (NO POLLING!)
+
// Event-driven SSE stream with reconnection support
const stream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
+
let isClosed = false;
+
let lastEventId = Math.floor(Date.now() / 1000);
const sendEvent = (data: Partial<TranscriptionUpdate>) => {
-
controller.enqueue(
-
encoder.encode(`data: ${JSON.stringify(data)}\n\n`),
-
);
+
if (isClosed) return;
+
try {
+
// Send event ID for reconnection support
+
lastEventId = Math.floor(Date.now() / 1000);
+
controller.enqueue(
+
encoder.encode(
+
`id: ${lastEventId}\nevent: update\ndata: ${JSON.stringify(data)}\n\n`,
+
),
+
);
+
} catch {
+
// Controller already closed (client disconnected)
+
isClosed = true;
+
}
+
};
+
+
const sendHeartbeat = () => {
+
if (isClosed) return;
+
try {
+
controller.enqueue(encoder.encode(": heartbeat\n\n"));
+
} catch {
+
isClosed = true;
+
}
};
// Send initial state from DB
const current = db
···
current?.status === "completed" ||
current?.status === "failed"
) {
+
isClosed = true;
controller.close();
return;
}
+
// Send heartbeats every 2.5 seconds to keep connection alive
+
const heartbeatInterval = setInterval(sendHeartbeat, 2500);
+
// Subscribe to EventEmitter for live updates
const updateHandler = (data: TranscriptionUpdate) => {
+
if (isClosed) return;
+
// Only send changed fields to save bandwidth
const payload: Partial<TranscriptionUpdate> = {
status: data.status,
···
// Close stream when done
if (data.status === "completed" || data.status === "failed") {
+
isClosed = true;
+
clearInterval(heartbeatInterval);
transcriptionEvents.off(transcriptionId, updateHandler);
controller.close();
}
···
transcriptionEvents.on(transcriptionId, updateHandler);
// Cleanup on client disconnect
return () => {
+
isClosed = true;
+
clearInterval(heartbeatInterval);
transcriptionEvents.off(transcriptionId, updateHandler);
};
},
···
});
},
},
+
"/api/transcriptions/health": {
+
GET: async () => {
+
const isHealthy = await whisperService.checkHealth();
+
return Response.json({ available: isHealthy });
+
},
+
},
"/api/transcriptions": {
GET: (req) => {
try {
···
if (!file) throw ValidationErrors.missingField("audio");
-
if (!file.type.startsWith("audio/")) {
+
// Validate file type
+
const fileExtension = file.name.split(".").pop()?.toLowerCase();
+
const allowedExtensions = [
+
"mp3",
+
"wav",
+
"m4a",
+
"aac",
+
"ogg",
+
"webm",
+
"flac",
+
"mp4",
+
];
+
const isAudioType =
+
file.type.startsWith("audio/") || file.type === "video/mp4";
+
const isAudioExtension =
+
fileExtension && allowedExtensions.includes(fileExtension);
+
+
if (!isAudioType && !isAudioExtension) {
throw ValidationErrors.unsupportedFileType(
"MP3, WAV, M4A, AAC, OGG, WebM, FLAC",
);
···
// Generate unique filename
const transcriptionId = crypto.randomUUID();
-
const fileExtension = file.name.split(".").pop();
const filename = `${transcriptionId}.${fileExtension}`;
// Save file to disk
+176 -65
src/lib/transcription.ts
···
import { ErrorCode } from "./errors";
// Constants
-
export const MAX_FILE_SIZE = 25 * 1024 * 1024; // 25MB
+
export const MAX_FILE_SIZE = 100 * 1024 * 1024; // 100MB
export const MAX_TRANSCRIPT_LENGTH = 50000;
export const MAX_ERROR_LENGTH = 255;
···
export type TranscriptionStatus =
| "uploading"
| "processing"
+
| "transcribing"
| "completed"
| "failed";
···
private events: TranscriptionEventEmitter,
) {}
+
/**
 * Probes the transcription service by requesting `${serviceUrl}/jobs`.
 *
 * @param timeoutMs - Upper bound for the probe in milliseconds. Without a
 *   bound, a host that accepts the connection but never answers would stall
 *   every caller awaiting this method.
 * @returns `true` when the service answered with a 2xx status in time;
 *   `false` on any non-2xx status, network error, or timeout. Never throws.
 */
async checkHealth(timeoutMs = 5000): Promise<boolean> {
  try {
    const response = await fetch(`${this.serviceUrl}/jobs`, {
      method: "GET",
      signal: AbortSignal.timeout(timeoutMs),
    });

    // Only the status matters here; drop the job list instead of leaving an
    // unread body attached to the connection.
    void response.body?.cancel().catch(() => {});

    return response.ok;
  } catch {
    return false;
  }
}
+
async startTranscription(
transcriptionId: string,
filename: string,
···
const filePath = `./uploads/${filename}`;
const fileBuffer = await Bun.file(filePath).arrayBuffer();
-
// Create form data for the faster-whisper server
+
// Create form data for the Murmur server
const formData = new FormData();
const file = new File([fileBuffer], filename, { type: "audio/mpeg" });
formData.append("file", file);
-
// Call the faster-whisper server to start transcription
+
// Call the Murmur server to start transcription
const response = await fetch(`${this.serviceUrl}/transcribe`, {
method: "POST",
body: formData,
···
const { job_id } = await response.json();
-
// Connect to SSE stream from Whisper
+
// Store Murmur's job_id in our database for tracking
+
this.db.run("UPDATE transcriptions SET whisper_job_id = ? WHERE id = ?", [
+
job_id,
+
transcriptionId,
+
]);
+
+
// Connect to SSE stream from Murmur (use the job_id returned by Murmur)
this.streamWhisperJob(transcriptionId, job_id, filePath);
} catch (error) {
console.error(
···
const es = createEventSource({
url: `${this.serviceUrl}/transcribe/${jobId}/stream`,
-
onMessage: ({ data }) => {
+
onMessage: ({ event, data }) => {
try {
+
// Handle "error" events from SSE (e.g., "Job not found")
+
if (event === "error") {
+
const errorData = JSON.parse(data) as { error: string };
+
console.error(
+
`[Stream] Whisper service error for ${transcriptionId}:`,
+
errorData.error,
+
);
+
+
// Mark the job as failed in our database
+
this.updateTranscription(transcriptionId, {
+
status: "failed",
+
error_message: errorData.error,
+
});
+
+
this.events.emit(transcriptionId, {
+
status: "failed",
+
progress: 0,
+
error_message: errorData.error,
+
error_code: ErrorCode.TRANSCRIPTION_FAILED,
+
});
+
+
this.closeStream(transcriptionId);
+
return;
+
}
+
const update = JSON.parse(data) as WhisperJob;
-
this.handleWhisperUpdate(transcriptionId, jobId, filePath, update);
+
this.handleWhisperUpdate(transcriptionId, filePath, update);
} catch (err) {
console.error(
`[Stream] Error processing update for ${transcriptionId}:`,
···
private handleWhisperUpdate(
transcriptionId: string,
-
jobId: string,
filePath: string,
update: WhisperJob,
) {
+
if (update.status === "pending") {
+
// Initial status, no action needed
+
return;
+
}
+
if (update.status === "processing") {
-
const progress = Math.max(10, Math.min(95, update.progress ?? 0));
-
this.updateTranscription(transcriptionId, { progress });
+
// Murmur is initializing (file I/O, WhisperKit setup) - no transcript yet
+
const progress = Math.min(100, update.progress ?? 0);
+
+
this.updateTranscription(transcriptionId, {
+
status: "processing",
+
progress,
+
});
this.events.emit(transcriptionId, {
status: "processing",
progress,
});
+
} else if (update.status === "transcribing") {
+
// Active transcription with progress callbacks
+
const progress = Math.min(100, update.progress ?? 0);
+
+
// If progress is still 0, keep status as "processing" until real progress starts
+
const status = progress === 0 ? "processing" : "transcribing";
+
+
// Strip WhisperKit special tokens from intermediate transcript
+
let transcript = update.transcript ?? "";
+
transcript = transcript.replace(/<\|[^|]+\|>/g, "").trim();
+
+
this.updateTranscription(transcriptionId, {
+
status,
+
progress,
+
transcript,
+
});
+
+
this.events.emit(transcriptionId, {
+
status,
+
progress,
+
transcript: transcript || undefined,
+
});
} else if (update.status === "completed") {
+
// Final transcript should already have tokens stripped by Murmur
const transcript = (update.transcript ?? "").substring(
0,
MAX_TRANSCRIPT_LENGTH,
···
transcript,
});
-
// Clean up
-
this.cleanupJob(transcriptionId, jobId, filePath);
+
// Only close stream and delete local file - keep Whisper job for potential replay/debugging
+
this.closeStream(transcriptionId);
+
this.deleteLocalFile(filePath);
} else if (update.status === "failed") {
const errorMessage = (
update.error_message ?? "Transcription failed"
···
error_code: ErrorCode.TRANSCRIPTION_FAILED,
});
+
// Only close stream - keep failed jobs in Whisper for debugging
this.closeStream(transcriptionId);
-
this.deleteWhisperJob(jobId);
}
}
-
private cleanupJob(transcriptionId: string, jobId: string, filePath: string) {
-
// Delete uploaded file
-
Bun.file(filePath)
-
.text()
-
.then(() => Bun.write(filePath, ""))
-
.catch(() => {});
-
-
this.closeStream(transcriptionId);
-
this.deleteWhisperJob(jobId);
-
}
-
private closeStream(transcriptionId: string) {
const es = this.activeStreams.get(transcriptionId);
if (es) {
···
this.streamLocks.delete(transcriptionId);
}
-
private async deleteWhisperJob(jobId: string) {
-
try {
-
await fetch(`${this.serviceUrl}/transcribe/${jobId}`, {
-
method: "DELETE",
-
});
-
} catch {
-
// Silent fail - job may already be deleted
-
}
+
/**
 * Removes an uploaded file from disk once it is no longer needed.
 *
 * Fire-and-forget: the deletion runs in the background and never throws into
 * the caller. A file that is already gone counts as success; any other
 * failure is logged instead of being silently dropped.
 *
 * @param filePath - Path of the file to delete.
 */
private deleteLocalFile(filePath: string) {
  // Unlink the file rather than overwriting it with empty content: that
  // leaves no zero-byte leftovers in the uploads directory and avoids reading
  // a potentially large audio file into memory just to discard it.
  // Imported locally so this method carries its own dependency.
  void import("node:fs/promises")
    .then(({ unlink }) => unlink(filePath))
    .catch((error: unknown) => {
      const alreadyGone =
        error instanceof Error && "code" in error && error.code === "ENOENT";
      if (alreadyGone) return;

      console.warn(
        `[Cleanup] Failed to delete ${filePath}:`,
        error instanceof Error ? error.message : "Unknown error",
      );
    });
}
private updateTranscription(
···
}
async syncWithWhisper(): Promise<void> {
-
try {
-
const whisperJobs = await this.fetchWhisperJobs();
-
if (!whisperJobs) return;
+
const whisperJobs = await this.fetchWhisperJobs();
+
if (!whisperJobs) {
+
throw new Error("Murmur service unavailable");
+
}
-
const activeDbJobs = this.getActiveDbJobs();
-
const activeJobsMap = new Map(activeDbJobs.map((j) => [j.id, j]));
+
const activeDbJobs = this.getActiveDbJobs();
+
const activeJobsMap = new Map(activeDbJobs.map((j) => [j.id, j]));
-
await this.syncWhisperJobsToDb(whisperJobs, activeJobsMap);
-
await this.syncDbJobsToWhisper(activeDbJobs, whisperJobs);
-
} catch (error) {
-
console.warn(
-
"[Sync] Failed:",
-
error instanceof Error ? error.message : "Unknown error",
-
);
-
}
+
await this.syncWhisperJobsToDb(whisperJobs, activeJobsMap);
+
await this.syncDbJobsToWhisper(activeDbJobs, whisperJobs);
}
private async fetchWhisperJobs(): Promise<WhisperJob[] | null> {
···
private getActiveDbJobs(): Array<{
id: string;
+
whisper_job_id: string | null;
filename: string;
status: string;
}> {
return this.db
-
.query<{ id: string; filename: string; status: string }, []>(
-
"SELECT id, filename, status FROM transcriptions WHERE status IN ('uploading', 'processing')",
+
.query<
+
{
+
id: string;
+
whisper_job_id: string | null;
+
filename: string;
+
status: string;
+
},
+
[]
+
>(
+
"SELECT id, whisper_job_id, filename, status FROM transcriptions WHERE status IN ('uploading', 'processing', 'transcribing')",
)
.all();
}
···
whisperJobs: WhisperJob[],
activeJobsMap: Map<
string,
-
{ id: string; filename: string; status: string }
+
{
+
id: string;
+
whisper_job_id: string | null;
+
filename: string;
+
status: string;
+
}
>,
) {
for (const whisperJob of whisperJobs) {
-
const localJob = activeJobsMap.get(whisperJob.id);
+
// Try to find by whisper_job_id first, then fall back to id
+
let localJob = Array.from(activeJobsMap.values()).find(
+
(j) => j.whisper_job_id === whisperJob.id,
+
);
+
+
if (!localJob) {
+
// Legacy: try matching by our transcriptionId === whisperJob.id
+
localJob = activeJobsMap.get(whisperJob.id);
+
}
if (!localJob) {
await this.handleOrphanedWhisperJob(whisperJob.id);
continue;
}
-
if (whisperJob.status === "completed" || whisperJob.status === "failed") {
-
await this.syncCompletedJob(whisperJob);
+
// Reconnect to active jobs on startup
+
if (
+
whisperJob.status === "processing" ||
+
whisperJob.status === "transcribing"
+
) {
+
// Check if we're already streaming this job
+
if (!this.activeStreams.has(localJob.id)) {
+
console.log(
+
`[Sync] Reconnecting to active job ${localJob.id} (Murmur job ${whisperJob.id})`,
+
);
+
const filePath = `./uploads/${localJob.filename}`;
+
this.streamWhisperJob(localJob.id, whisperJob.id, filePath);
+
}
+
} else if (
+
whisperJob.status === "completed" ||
+
whisperJob.status === "failed"
+
) {
+
// Use our transcription ID, not Murmur's job ID
+
await this.syncCompletedJob(whisperJob, localJob.id);
}
}
}
private async handleOrphanedWhisperJob(jobId: string) {
+
// Check if this Murmur job_id exists in our DB (either as id or whisper_job_id)
const jobExists = this.db
-
.query<{ id: string }, [string]>(
-
"SELECT id FROM transcriptions WHERE id = ?",
+
.query<{ id: string }, [string, string]>(
+
"SELECT id FROM transcriptions WHERE id = ? OR whisper_job_id = ?",
)
-
.get(jobId);
+
.get(jobId, jobId);
if (!jobExists) {
-
// Not our job, delete it from Whisper
-
await this.deleteWhisperJob(jobId);
+
// Not our job - Murmur will keep it until explicitly deleted
+
console.warn(
+
`[Sync] Found orphaned job ${jobId} in Murmur (not in our DB)`,
+
);
}
}
-
private async syncCompletedJob(whisperJob: WhisperJob) {
+
private async syncCompletedJob(
+
whisperJob: WhisperJob,
+
transcriptionId: string,
+
) {
try {
const details = await this.fetchJobDetails(whisperJob.id);
if (!details) return;
···
const transcript =
details.transcript?.substring(0, MAX_TRANSCRIPT_LENGTH) ?? "";
-
this.updateTranscription(whisperJob.id, {
+
this.updateTranscription(transcriptionId, {
status: "completed",
progress: 100,
transcript,
});
-
this.events.emit(whisperJob.id, {
+
this.events.emit(transcriptionId, {
status: "completed",
progress: 100,
transcript,
···
details.error_message ?? "Transcription failed"
).substring(0, MAX_ERROR_LENGTH);
-
this.updateTranscription(whisperJob.id, {
+
this.updateTranscription(transcriptionId, {
status: "failed",
error_message: errorMessage,
});
-
this.events.emit(whisperJob.id, {
+
this.events.emit(transcriptionId, {
status: "failed",
progress: 0,
error_message: errorMessage,
});
}
-
await this.deleteWhisperJob(whisperJob.id);
+
// Job persists in Murmur until explicitly deleted - we just sync state
} catch {
console.warn(
`[Sync] Failed to retrieve details for job ${whisperJob.id}`,
···
}
private async syncDbJobsToWhisper(
-
activeDbJobs: Array<{ id: string; filename: string; status: string }>,
+
activeDbJobs: Array<{
+
id: string;
+
whisper_job_id: string | null;
+
filename: string;
+
status: string;
+
}>,
whisperJobs: WhisperJob[],
) {
for (const localJob of activeDbJobs) {
-
const whisperHasJob = whisperJobs.some((wj) => wj.id === localJob.id);
+
// Check if Murmur has this job (by whisper_job_id or legacy id match)
+
const whisperHasJob = whisperJobs.some(
+
(wj) => wj.id === localJob.whisper_job_id || wj.id === localJob.id,
+
);
-
if (!whisperHasJob) {
-
// Job was lost, mark as failed
+
if (!whisperHasJob && localJob.whisper_job_id) {
+
// Job was lost from Murmur, mark as failed
const errorMessage = "Job lost - whisper service may have restarted";
this.updateTranscription(localJob.id, {
+1 -1
whisper-server/main.py
···
# --- 1. Load Model on Startup ---
# This loads the model only once, not on every request
print("--- Loading faster-whisper model... ---")
-
model_size = "small"
+
model_size = "medium.en"
# You can change this to "cuda" and "float16" if you have a GPU
model = WhisperModel(model_size, device="cpu", compute_type="int8")
print(f"--- Model '{model_size}' loaded. Server is ready. ---")