distributed transcription service
thistle.dunkirk.sh
import { unlink } from "node:fs/promises";
import type { Database } from "bun:sqlite";
import { createEventSource } from "eventsource-client";
import { ErrorCode } from "./errors";
import { saveTranscriptVTT } from "./transcript-storage";
import { cleanVTT } from "./vtt-cleaner";
6
// Constants

/**
 * Maximum upload size in bytes (100 MiB).
 * NOTE(review): not referenced in this module; presumably enforced by the upload handler.
 */
export const MAX_FILE_SIZE = 100 * 2 ** 20;

/** Error messages persisted/emitted by the Whisper manager are cut to this many characters. */
export const MAX_ERROR_LENGTH = 255;
10
// Types

/** Lifecycle states of a local transcription row; also the `status` pushed to listeners. */
export type TranscriptionStatus =
  | "uploading"
  | "processing"
  | "transcribing"
  | "finalizing"
  | "completed"
  | "failed";

/** Payload delivered to `TranscriptionEventEmitter` listeners. */
export interface TranscriptionUpdate {
  /** Current lifecycle state. */
  status: TranscriptionStatus;
  /** Progress percentage; the manager clamps reported values to at most 100. */
  progress: number;
  /** Partial transcript text, when the service supplied one. */
  transcript?: string;
  /** Human-readable failure reason (set for "failed" updates). */
  error_message?: string;
  /** Machine-readable failure code (an `ErrorCode` value). */
  error_code?: string;
}

/** Job record as returned by the Murmur/Whisper service (job list, job details, SSE stream). */
export interface WhisperJob {
  /** Murmur's own job id (distinct from our transcription id for non-legacy jobs). */
  id: string;
  /** Service-side state; the manager handles "pending", "processing", "transcribing", "completed" and "failed". */
  status: string;
  /** Service-reported progress, when present. */
  progress?: number;
  /** Intermediate transcript text, when present. */
  transcript?: string;
  /** Service-side failure reason, when present. */
  error_message?: string;
}
35
/**
 * In-process pub/sub for real-time transcription updates, keyed by transcription id.
 *
 * A transcription's listener set is dropped CLEANUP_DELAY_MS after it emits a
 * terminal status ("completed" / "failed") or loses its last listener, unless a
 * new listener subscribes in the meantime (which cancels the pending cleanup).
 */
export class TranscriptionEventEmitter {
  /** Retention window for listener sets of finished or idle transcriptions (5 minutes). */
  private static readonly CLEANUP_DELAY_MS = 5 * 60 * 1000;

  private listeners = new Map<
    string,
    Set<(data: TranscriptionUpdate) => void>
  >();
  // At most one pending cleanup timer per transcription id.
  private cleanupTimers = new Map<string, ReturnType<typeof setTimeout>>();

  /** Subscribe `callback` to updates for `transcriptionId`; cancels any pending cleanup. */
  on(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    let callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) {
      callbacks = new Set();
      this.listeners.set(transcriptionId, callbacks);
    }
    callbacks.add(callback);

    this.cancelCleanup(transcriptionId);
  }

  /** Unsubscribe `callback`; schedules cleanup once no listeners remain. */
  off(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    const callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) return;

    callbacks.delete(callback);
    if (callbacks.size === 0) {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /**
   * Deliver `data` to every listener of `transcriptionId`. Terminal statuses
   * ("completed" / "failed") schedule cleanup of the listener set.
   */
  emit(transcriptionId: string, data: TranscriptionUpdate) {
    const callbacks = this.listeners.get(transcriptionId);
    if (callbacks) {
      // Iterate a snapshot so listeners may (un)subscribe from inside a callback,
      // and isolate failures: one broken listener must neither starve the others
      // nor throw into the caller (the Whisper stream/sync code).
      for (const callback of [...callbacks]) {
        try {
          callback(data);
        } catch (error) {
          console.error(
            `[Events] Listener for ${transcriptionId} threw:`,
            error,
          );
        }
      }
    }

    if (data.status === "completed" || data.status === "failed") {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /** True if at least one listener is subscribed to `transcriptionId`. */
  hasListeners(transcriptionId: string): boolean {
    return (this.listeners.get(transcriptionId)?.size ?? 0) > 0;
  }

  /** (Re)start the cleanup countdown for `transcriptionId`. */
  private scheduleCleanup(transcriptionId: string) {
    // Replace, never stack, timers: an untracked older timer would survive a later
    // on() (which only cancels the tracked one) and wipe the new subscriber's set.
    this.cancelCleanup(transcriptionId);

    const timer = setTimeout(() => {
      this.listeners.delete(transcriptionId);
      this.cleanupTimers.delete(transcriptionId);
    }, TranscriptionEventEmitter.CLEANUP_DELAY_MS);

    this.cleanupTimers.set(transcriptionId, timer);
  }

  /** Cancel a pending cleanup for `transcriptionId`, if any. */
  private cancelCleanup(transcriptionId: string) {
    const timer = this.cleanupTimers.get(transcriptionId);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.cleanupTimers.delete(transcriptionId);
    }
  }
}
98
// Whisper service manager

/** Fields of an in-flight `transcriptions` row needed to reconcile it with Murmur. */
interface ActiveDbJob {
  id: string;
  whisper_job_id: string | null;
  filename: string;
  status: string;
}

/**
 * Drives transcriptions on the remote Murmur (Whisper) service and mirrors their
 * state into the local `transcriptions` table and a TranscriptionEventEmitter.
 *
 * - `startTranscription` uploads an audio file and subscribes to the job's SSE stream.
 * - `syncWithWhisper` reconciles local in-flight rows with the service's job list
 *   (reconnecting streams, finalizing finished jobs, failing jobs the service lost).
 * - `cleanupStaleFiles` deletes uploads of transcriptions finished more than 24 h ago.
 */
export class WhisperServiceManager {
  /** Matches WhisperKit special tokens of the form `<|...|>` in partial transcripts. */
  private static readonly SPECIAL_TOKEN_PATTERN = /<\|[^|]+\|>/g;

  /** Open SSE connections, keyed by our transcription id (not Murmur's job id). */
  private activeStreams = new Map<
    string,
    ReturnType<typeof createEventSource>
  >();
  /** Transcription ids that currently own a stream; prevents duplicate subscriptions. */
  private streamLocks = new Set<string>();

  constructor(
    private serviceUrl: string,
    private db: Database,
    private events: TranscriptionEventEmitter,
  ) {}

  /** @returns true when Murmur's `GET /jobs` answers with a 2xx status; false on any error. */
  async checkHealth(): Promise<boolean> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`, {
        method: "GET",
      });
      return response.ok;
    } catch {
      return false;
    }
  }

  /**
   * Upload `./uploads/<filename>` to Murmur and start following the resulting job.
   *
   * File and service errors are not rethrown: the row is marked "failed" and a
   * failure event is emitted with WHISPER_SERVICE_ERROR when the service itself
   * misbehaved (unreachable, non-2xx, missing job id), TRANSCRIPTION_FAILED otherwise.
   */
  async startTranscription(
    transcriptionId: string,
    filename: string,
  ): Promise<void> {
    try {
      this.updateTranscription(transcriptionId, {
        status: "processing",
        progress: 10,
      });

      // NOTE(review): `filename` is interpolated into a path unchecked; assumed to be a
      // server-generated name, never raw client input. Confirm at the upload handler.
      const fileBuffer = await Bun.file(`./uploads/${filename}`).arrayBuffer();

      const formData = new FormData();
      formData.append(
        "file",
        new File([fileBuffer], filename, { type: "audio/mpeg" }),
      );

      let response: Response;
      try {
        response = await fetch(`${this.serviceUrl}/transcribe`, {
          method: "POST",
          body: formData,
        });
      } catch (error) {
        // Network-level failure (service down/unreachable) is a service error too.
        const reason = error instanceof Error ? error.message : String(error);
        throw new Error(`Whisper service unreachable: ${reason}`);
      }

      if (!response.ok) {
        const errorText = await response.text().catch(() => "Unknown error");
        throw new Error(
          `Whisper service returned ${response.status}: ${errorText}`,
        );
      }

      const { job_id: jobId } = (await response.json()) as {
        job_id?: unknown;
      };
      if (typeof jobId !== "string" || jobId.length === 0) {
        // Without a job id we could neither stream progress nor sync later.
        throw new Error("Whisper service returned no job_id");
      }

      // Store Murmur's job id so sync can match this row (e.g. after a restart).
      this.db.run("UPDATE transcriptions SET whisper_job_id = ? WHERE id = ?", [
        jobId,
        transcriptionId,
      ]);

      this.streamWhisperJob(transcriptionId, jobId);
    } catch (error) {
      console.error(
        `[Transcription] Failed to start ${transcriptionId}:`,
        error,
      );
      const errorMessage =
        error instanceof Error ? error.message : "Unknown error";
      const errorCode =
        error instanceof Error && error.message.includes("Whisper service")
          ? ErrorCode.WHISPER_SERVICE_ERROR
          : ErrorCode.TRANSCRIPTION_FAILED;

      this.markFailed(transcriptionId, errorMessage, errorCode);
    }
  }

  /**
   * Record a terminal failure: persist status "failed" plus the message, then notify
   * listeners. The message is truncated to MAX_ERROR_LENGTH on every path so the DB
   * row and the emitted event always carry the same text.
   */
  private markFailed(
    transcriptionId: string,
    message: string,
    errorCode: string,
  ) {
    const errorMessage = message.substring(0, MAX_ERROR_LENGTH);

    this.updateTranscription(transcriptionId, {
      status: "failed",
      error_message: errorMessage,
    });

    this.events.emit(transcriptionId, {
      status: "failed",
      progress: 0,
      error_message: errorMessage,
      error_code: errorCode,
    });
  }

  /**
   * Subscribe to Murmur's SSE stream for `jobId`, applying each update to
   * `transcriptionId`. No-op if this transcription already owns a stream.
   */
  private streamWhisperJob(transcriptionId: string, jobId: string) {
    if (this.streamLocks.has(transcriptionId)) {
      return;
    }
    this.streamLocks.add(transcriptionId);

    let es: ReturnType<typeof createEventSource>;
    try {
      es = createEventSource({
        url: `${this.serviceUrl}/transcribe/${jobId}/stream`,
        onMessage: async ({ event, data }) => {
          try {
            // "error" events carry service-side failures (e.g. "Job not found").
            if (event === "error") {
              const message = WhisperServiceManager.parseStreamError(data);
              console.error(
                `[Stream] Whisper service error for ${transcriptionId}:`,
                message,
              );
              this.markFailed(
                transcriptionId,
                message,
                ErrorCode.TRANSCRIPTION_FAILED,
              );
              this.closeStream(transcriptionId);
              return;
            }

            const update = JSON.parse(data) as WhisperJob;
            await this.handleWhisperUpdate(transcriptionId, update, jobId);
          } catch (err) {
            console.error(
              `[Stream] Error processing update for ${transcriptionId}:`,
              err,
            );
          }
        },
      });
    } catch (error) {
      // Don't leave the lock held, or this transcription could never be streamed again.
      this.streamLocks.delete(transcriptionId);
      throw error;
    }

    this.activeStreams.set(transcriptionId, es);
  }

  /** Extract the message from an SSE "error" payload (`{"error": "..."}`), tolerating non-JSON. */
  private static parseStreamError(data: string): string {
    try {
      const parsed: unknown = JSON.parse(data);
      if (
        parsed !== null &&
        typeof parsed === "object" &&
        "error" in parsed &&
        typeof parsed.error === "string"
      ) {
        return parsed.error;
      }
    } catch {
      // Not JSON; fall back to the raw payload below.
    }
    return data.trim() || "Transcription failed";
  }

  /** Clamp reported progress into 0..100; missing or non-numeric values count as 0. */
  private static clampProgress(progress: number | undefined): number {
    const value = Number(progress ?? 0);
    return Number.isFinite(value) ? Math.min(100, Math.max(0, value)) : 0;
  }

  /**
   * Apply one Murmur job update to the DB and to listeners.
   *
   * @param whisperJobId Murmur job id the update belongs to. When omitted it is read
   *   from the row's `whisper_job_id` column; it is needed to download the VTT.
   */
  private async handleWhisperUpdate(
    transcriptionId: string,
    update: WhisperJob,
    whisperJobId?: string,
  ) {
    switch (update.status) {
      case "pending":
        // Initial status, no action needed.
        return;

      case "processing": {
        // Murmur is initializing (file I/O, WhisperKit setup) - no transcript yet.
        const progress = WhisperServiceManager.clampProgress(update.progress);
        this.updateTranscription(transcriptionId, {
          status: "processing",
          progress,
        });
        this.events.emit(transcriptionId, { status: "processing", progress });
        return;
      }

      case "transcribing": {
        const progress = WhisperServiceManager.clampProgress(update.progress);
        // Keep reporting "processing" until real progress starts.
        const status: TranscriptionStatus =
          progress === 0 ? "processing" : "transcribing";
        const transcript = (update.transcript ?? "")
          .replace(WhisperServiceManager.SPECIAL_TOKEN_PATTERN, "")
          .trim();

        this.updateTranscription(transcriptionId, { status, progress });
        this.events.emit(transcriptionId, {
          status,
          progress,
          transcript: transcript || undefined,
        });
        return;
      }

      case "completed":
        await this.finalizeCompletedJob(
          transcriptionId,
          whisperJobId ?? this.lookupWhisperJobId(transcriptionId),
          "[Transcription]",
        );
        // Close stream - keep audio file for playback.
        this.closeStream(transcriptionId);
        return;

      case "failed":
        this.markFailed(
          transcriptionId,
          update.error_message ?? "Transcription failed",
          ErrorCode.TRANSCRIPTION_FAILED,
        );
        // Only close stream - keep failed jobs in Whisper for debugging.
        this.closeStream(transcriptionId);
        return;
    }
  }

  /** Murmur job id recorded on the row, or undefined if none was stored. */
  private lookupWhisperJobId(transcriptionId: string): string | undefined {
    return (
      this.db
        .query<{ whisper_job_id: string | null }, [string]>(
          "SELECT whisper_job_id FROM transcriptions WHERE id = ?",
        )
        .get(transcriptionId)?.whisper_job_id ?? undefined
    );
  }

  /**
   * Move a finished job through "finalizing" to "completed", downloading and storing
   * its cleaned VTT in between. A VTT failure is logged and does not block completion.
   */
  private async finalizeCompletedJob(
    transcriptionId: string,
    whisperJobId: string | undefined,
    logPrefix: string,
  ) {
    this.updateTranscription(transcriptionId, {
      status: "finalizing",
      progress: 100,
    });
    this.events.emit(transcriptionId, { status: "finalizing", progress: 100 });

    if (whisperJobId) {
      await this.fetchAndSaveVTT(transcriptionId, whisperJobId, logPrefix);
    }

    this.updateTranscription(transcriptionId, {
      status: "completed",
      progress: 100,
    });
    this.events.emit(transcriptionId, { status: "completed", progress: 100 });
  }

  /** Best-effort: download the job's VTT from Murmur, clean it and store it. */
  private async fetchAndSaveVTT(
    transcriptionId: string,
    whisperJobId: string,
    logPrefix: string,
  ): Promise<void> {
    try {
      const vttResponse = await fetch(
        `${this.serviceUrl}/transcribe/${whisperJobId}?format=vtt`,
      );
      if (!vttResponse.ok) {
        console.warn(
          `${logPrefix} Failed to fetch VTT for ${transcriptionId}: HTTP ${vttResponse.status}`,
        );
        return;
      }
      const vttContent = await vttResponse.text();
      const cleanedVTT = await cleanVTT(transcriptionId, vttContent);
      await saveTranscriptVTT(transcriptionId, cleanedVTT);
    } catch (error) {
      console.warn(
        `${logPrefix} Failed to fetch VTT for ${transcriptionId}:`,
        error,
      );
    }
  }

  /** Close the transcription's SSE stream (if any) and release its stream lock. */
  private closeStream(transcriptionId: string) {
    const es = this.activeStreams.get(transcriptionId);
    if (es) {
      es.close();
      this.activeStreams.delete(transcriptionId);
    }
    this.streamLocks.delete(transcriptionId);
  }

  /**
   * Update the provided columns of a transcription row; `updated_at` (unix seconds)
   * is always refreshed. Column names are fixed strings; values are bound parameters.
   */
  private updateTranscription(
    transcriptionId: string,
    data: {
      status?: TranscriptionStatus;
      progress?: number;
      error_message?: string;
    },
  ) {
    const assignments: string[] = [];
    const values: (string | number)[] = [];

    if (data.status !== undefined) {
      assignments.push("status = ?");
      values.push(data.status);
    }
    if (data.progress !== undefined) {
      assignments.push("progress = ?");
      values.push(data.progress);
    }
    if (data.error_message !== undefined) {
      assignments.push("error_message = ?");
      values.push(data.error_message);
    }

    assignments.push("updated_at = ?");
    values.push(Math.floor(Date.now() / 1000));

    this.db.run(
      `UPDATE transcriptions SET ${assignments.join(", ")} WHERE id = ?`,
      [...values, transcriptionId],
    );
  }

  /** Reconcile local in-flight transcriptions with Murmur's current job list. */
  async syncWithWhisper(): Promise<void> {
    // Snapshot local jobs BEFORE listing Murmur's jobs. Any whisper_job_id visible in
    // this snapshot was created before the /jobs request, so its absence from the
    // list really means "lost", not "created while we were syncing".
    const activeDbJobs = this.getActiveDbJobs();

    const whisperJobs = await this.fetchWhisperJobs();
    if (!whisperJobs) {
      console.warn("[Sync] Murmur service unavailable");
      return;
    }

    const activeJobsMap = new Map(activeDbJobs.map((j) => [j.id, j]));

    await this.syncWhisperJobsToDb(whisperJobs, activeJobsMap);
    await this.syncDbJobsToWhisper(activeDbJobs, whisperJobs);
  }

  /** Murmur's job list, or null if the service is unreachable or answered malformed data. */
  private async fetchWhisperJobs(): Promise<WhisperJob[] | null> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`);
      if (!response.ok) {
        console.warn("[Sync] Whisper service unavailable");
        return null;
      }
      const { jobs } = (await response.json()) as { jobs?: unknown };
      // Guard against a malformed body; iterating a non-array later would throw.
      return Array.isArray(jobs) ? (jobs as WhisperJob[]) : null;
    } catch {
      return null;
    }
  }

  /** Rows still in flight locally ('uploading', 'processing', 'transcribing'). */
  private getActiveDbJobs(): ActiveDbJob[] {
    return this.db
      .query<ActiveDbJob, []>(
        "SELECT id, whisper_job_id, filename, status FROM transcriptions WHERE status IN ('uploading', 'processing', 'transcribing')",
      )
      .all();
  }

  /**
   * For each Murmur job: reconnect streams for running jobs and finalize finished
   * ones. Jobs that match no in-flight local row go to `handleOrphanedWhisperJob`.
   */
  private async syncWhisperJobsToDb(
    whisperJobs: WhisperJob[],
    activeJobsMap: Map<string, ActiveDbJob>,
  ) {
    // Index local rows by Murmur job id once (first row wins, as a linear scan would).
    const byWhisperJobId = new Map<string, ActiveDbJob>();
    for (const job of activeJobsMap.values()) {
      if (job.whisper_job_id !== null && !byWhisperJobId.has(job.whisper_job_id)) {
        byWhisperJobId.set(job.whisper_job_id, job);
      }
    }

    for (const whisperJob of whisperJobs) {
      // Match by whisper_job_id first; legacy rows used our id as Murmur's job id.
      const localJob =
        byWhisperJobId.get(whisperJob.id) ?? activeJobsMap.get(whisperJob.id);

      if (!localJob) {
        await this.handleOrphanedWhisperJob(whisperJob.id);
        continue;
      }

      if (
        whisperJob.status === "pending" ||
        whisperJob.status === "processing" ||
        whisperJob.status === "transcribing"
      ) {
        // Reconnect to jobs still running on Murmur (e.g. after our restart).
        if (!this.activeStreams.has(localJob.id)) {
          console.log(
            `[Sync] Reconnecting to active job ${localJob.id} (Murmur job ${whisperJob.id})`,
          );
          this.streamWhisperJob(localJob.id, whisperJob.id);
        }
      } else if (
        whisperJob.status === "completed" ||
        whisperJob.status === "failed"
      ) {
        // Use our transcription ID, not Murmur's job ID.
        await this.syncCompletedJob(whisperJob, localJob.id);
      }
    }
  }

  /** Warn about Murmur jobs that do not correspond to any local row (by id or whisper_job_id). */
  private async handleOrphanedWhisperJob(jobId: string) {
    const jobExists = this.db
      .query<{ id: string }, [string, string]>(
        "SELECT id FROM transcriptions WHERE id = ? OR whisper_job_id = ?",
      )
      .get(jobId, jobId);

    if (!jobExists) {
      // Not our job - Murmur will keep it until explicitly deleted.
      console.warn(
        `[Sync] Found orphaned job ${jobId} in Murmur (not in our DB)`,
      );
    }
  }

  /**
   * Apply a finished Murmur job's final state to `transcriptionId`, then close any
   * stream for it so a late stream event cannot finalize the same job twice.
   */
  private async syncCompletedJob(
    whisperJob: WhisperJob,
    transcriptionId: string,
  ) {
    try {
      const details = await this.fetchJobDetails(whisperJob.id);
      if (!details) return;

      if (details.status === "completed") {
        await this.finalizeCompletedJob(transcriptionId, whisperJob.id, "[Sync]");
        this.closeStream(transcriptionId);
      } else if (details.status === "failed") {
        this.markFailed(
          transcriptionId,
          details.error_message ?? "Transcription failed",
          ErrorCode.TRANSCRIPTION_FAILED,
        );
        this.closeStream(transcriptionId);
      }
      // Job persists in Murmur until explicitly deleted - we just sync state.
    } catch {
      console.warn(
        `[Sync] Failed to retrieve details for job ${whisperJob.id}`,
      );
    }
  }

  /** Job details from Murmur, or null on a non-2xx response. */
  private async fetchJobDetails(jobId: string): Promise<WhisperJob | null> {
    const response = await fetch(`${this.serviceUrl}/transcribe/${jobId}`);
    if (!response.ok) return null;
    return response.json();
  }

  /** Fail local in-flight rows whose Murmur job no longer exists on the service. */
  private async syncDbJobsToWhisper(
    activeDbJobs: ActiveDbJob[],
    whisperJobs: WhisperJob[],
  ) {
    const knownIds = new Set(whisperJobs.map((wj) => wj.id));

    for (const localJob of activeDbJobs) {
      // Rows without a Murmur job id never reached Murmur; nothing to reconcile.
      if (!localJob.whisper_job_id) continue;

      // Murmur still has it (by whisper_job_id or legacy id match).
      if (knownIds.has(localJob.whisper_job_id) || knownIds.has(localJob.id)) {
        continue;
      }

      this.markFailed(
        localJob.id,
        "Job lost - whisper service may have restarted",
        ErrorCode.WHISPER_SERVICE_ERROR,
      );
      // A stream to a vanished job is useless; release it.
      this.closeStream(localJob.id);
    }
  }

  /**
   * Delete the uploaded audio of transcriptions that finished (completed/failed)
   * more than 24 hours ago. Best-effort: failures are logged, never thrown.
   */
  async cleanupStaleFiles(): Promise<void> {
    try {
      const cutoff = Math.floor(Date.now() / 1000) - 24 * 60 * 60;
      const staleTranscriptions = this.db
        .query<{ filename: string }, [number]>(
          `SELECT filename FROM transcriptions
           WHERE status IN ('completed', 'failed')
           AND updated_at < ?`,
        )
        .all(cutoff);

      for (const { filename } of staleTranscriptions) {
        if (!filename) continue; // would resolve to the uploads directory itself

        try {
          // Delete, don't truncate: truncating left empty files behind and even
          // created files for uploads that no longer existed.
          await unlink(`./uploads/${filename}`);
        } catch (error) {
          const code =
            error instanceof Error && "code" in error ? error.code : undefined;
          // ENOENT: already removed by an earlier run (or never written).
          if (code !== "ENOENT") {
            console.warn(`[Cleanup] Could not delete ${filename}:`, error);
          }
        }
      }
    } catch (error) {
      console.error("[Cleanup] Failed:", error);
    }
  }
}