// Distributed transcription service
// thistle.dunkirk.sh
1import type { Database } from "bun:sqlite";
2import { createEventSource } from "eventsource-client";
3import { ErrorCode } from "./errors";
4import { saveTranscriptVTT } from "./transcript-storage";
5import { cleanVTT } from "./vtt-cleaner";

// Constants

/** Largest upload size, in bytes, that this service is configured for (100 MiB). */
export const MAX_FILE_SIZE = 100 * 1024 ** 2;

/** Maximum length of an error message before it is truncated for storage/emission. */
export const MAX_ERROR_LENGTH = 255;

// Types

/** Statuses of a transcription that is still moving through the pipeline. */
type InFlightTranscriptionStatus =
  | "uploading"
  | "processing"
  | "transcribing"
  | "finalizing";

/** Statuses after which processing of the job has ended. */
type TerminalTranscriptionStatus = "completed" | "failed";

/** Lifecycle state of a transcription, as stored in the DB and pushed to listeners. */
export type TranscriptionStatus =
  | InFlightTranscriptionStatus
  | TerminalTranscriptionStatus;

/** Payload pushed to listeners whenever a transcription's state changes. */
export interface TranscriptionUpdate {
  /** Current lifecycle state. */
  status: TranscriptionStatus;
  /** Percent complete; the values emitted by this module are capped at 100. */
  progress: number;
  /** Partial transcript text, when one is available mid-transcription. */
  transcript?: string;
  /** Human-readable failure reason, set on "failed" updates. */
  error_message?: string;
  /** Machine-readable failure code (an `ErrorCode` value). */
  error_code?: string;
}

/** A job as reported by the Murmur/Whisper transcription service. */
export interface WhisperJob {
  /** The service's job identifier. */
  id: string;
  /** Service-side state, e.g. "pending", "processing", "transcribing", "completed", "failed". */
  status: string;
  /** Progress percentage reported by the service, if any. */
  progress?: number;
  /** Intermediate or final transcript text, if any. */
  transcript?: string;
  /** Failure reason reported by the service. */
  error_message?: string;
}

/**
 * Event emitter for real-time transcription updates, keyed by transcription id.
 *
 * A transcription's listener set is discarded automatically `cleanupDelayMs`
 * after it emits "completed"/"failed" or loses its last listener; subscribing
 * again cancels the pending cleanup.
 */
export class TranscriptionEventEmitter {
  private readonly listeners = new Map<
    string,
    Set<(data: TranscriptionUpdate) => void>
  >();
  // At most one pending cleanup timer per transcription id.
  // ReturnType<typeof setTimeout> type-checks under Bun, Node and DOM typings.
  private readonly cleanupTimers = new Map<
    string,
    ReturnType<typeof setTimeout>
  >();

  /**
   * @param cleanupDelayMs How long to keep an idle or finished transcription's
   *   listeners before dropping them. Defaults to 5 minutes.
   */
  constructor(private readonly cleanupDelayMs: number = 5 * 60 * 1000) {}

  /** Subscribe `callback` to updates for `transcriptionId`; cancels any pending cleanup. */
  on(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    let callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) {
      callbacks = new Set();
      this.listeners.set(transcriptionId, callbacks);
    }
    callbacks.add(callback);

    this.cancelCleanup(transcriptionId);
  }

  /** Unsubscribe `callback`; schedules cleanup once no listeners remain. */
  off(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    const callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) return;

    callbacks.delete(callback);
    if (callbacks.size === 0) {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /**
   * Deliver `data` to every current listener of `transcriptionId`.
   *
   * A listener that throws is logged and does not stop delivery to the
   * others. Terminal statuses ("completed"/"failed") schedule cleanup.
   */
  emit(transcriptionId: string, data: TranscriptionUpdate) {
    const callbacks = this.listeners.get(transcriptionId);
    if (callbacks) {
      // Iterate a snapshot so listeners that (un)subscribe during delivery
      // don't change who receives this particular update.
      for (const callback of [...callbacks]) {
        try {
          callback(data);
        } catch (error) {
          console.error(
            `[Events] Listener for ${transcriptionId} threw:`,
            error,
          );
        }
      }
    }

    if (data.status === "completed" || data.status === "failed") {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /** Whether at least one listener is subscribed to `transcriptionId`. */
  hasListeners(transcriptionId: string): boolean {
    return (this.listeners.get(transcriptionId)?.size ?? 0) > 0;
  }

  /** (Re)start the cleanup countdown for `transcriptionId`. */
  private scheduleCleanup(transcriptionId: string) {
    // Replace rather than stack timers: a superseded timer would be orphaned
    // (on() can only cancel the one stored in the map) and would later wipe
    // listeners that had subscribed again in the meantime.
    this.cancelCleanup(transcriptionId);

    const timer = setTimeout(() => {
      this.listeners.delete(transcriptionId);
      this.cleanupTimers.delete(transcriptionId);
    }, this.cleanupDelayMs);

    this.cleanupTimers.set(transcriptionId, timer);
  }

  /** Cancel a pending cleanup for `transcriptionId`, if any. */
  private cancelCleanup(transcriptionId: string) {
    const timer = this.cleanupTimers.get(transcriptionId);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.cleanupTimers.delete(transcriptionId);
    }
  }
}

/** Row shape of an in-flight transcription, as read back when syncing with Murmur. */
interface ActiveDbJob {
  id: string;
  /**
   * Murmur's job id. Null until Murmur has accepted the upload; legacy rows
   * may instead rely on `id` itself matching the Murmur job id.
   */
  whisper_job_id: string | null;
  filename: string;
  status: string;
}

/**
 * Bridges local transcription rows (SQLite) with the Murmur/Whisper service:
 * uploads audio, follows each job's SSE progress stream, persists status
 * changes, and re-broadcasts them through a TranscriptionEventEmitter.
 */
export class WhisperServiceManager {
  /** Open SSE connections, keyed by our transcription id. */
  private activeStreams = new Map<
    string,
    ReturnType<typeof createEventSource>
  >();
  /** Transcription ids that already have a stream; prevents duplicate connections. */
  private streamLocks = new Set<string>();

  constructor(
    private serviceUrl: string,
    private db: Database,
    private events: TranscriptionEventEmitter,
  ) {}

  /** Returns true if `GET {serviceUrl}/jobs` answers 2xx; false otherwise, including network errors. */
  async checkHealth(): Promise<boolean> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`, {
        method: "GET",
      });
      return response.ok;
    } catch {
      return false;
    }
  }

  /**
   * Upload `./uploads/{filename}` to Murmur and start following its progress.
   *
   * Failures in the upload/start sequence are recorded on the transcription
   * (status "failed") and emitted to listeners rather than rethrown.
   */
  async startTranscription(
    transcriptionId: string,
    filename: string,
  ): Promise<void> {
    try {
      this.publish(transcriptionId, { status: "processing", progress: 10 });

      const filePath = `./uploads/${filename}`;
      const fileBuffer = await Bun.file(filePath).arrayBuffer();

      // Multipart body for Murmur's POST /transcribe
      const formData = new FormData();
      const file = new File([fileBuffer], filename, { type: "audio/mpeg" });
      formData.append("file", file);

      const response = await fetch(`${this.serviceUrl}/transcribe`, {
        method: "POST",
        body: formData,
      });

      if (!response.ok) {
        const errorText = await response.text().catch(() => "Unknown error");
        throw new Error(
          `Whisper service returned ${response.status}: ${errorText}`,
        );
      }

      // Without a job id there is nothing to track; fail instead of storing
      // `undefined` and streaming from `/transcribe/undefined/stream`.
      const { job_id: jobId } = (await response.json()) as {
        job_id?: string;
      };
      if (!jobId) {
        throw new Error("Whisper service response did not include a job_id");
      }

      // Store Murmur's job id so sync can match this row to the job later
      this.db.run("UPDATE transcriptions SET whisper_job_id = ? WHERE id = ?", [
        jobId,
        transcriptionId,
      ]);

      this.streamWhisperJob(transcriptionId, jobId);
    } catch (error) {
      console.error(
        `[Transcription] Failed to start ${transcriptionId}:`,
        error,
      );
      const errorMessage =
        error instanceof Error ? error.message : "Unknown error";
      const errorCode =
        error instanceof Error && error.message.includes("Whisper service")
          ? ErrorCode.WHISPER_SERVICE_ERROR
          : ErrorCode.TRANSCRIPTION_FAILED;

      this.markFailed(transcriptionId, errorMessage, errorCode);
    }
  }

  /** Open the SSE progress stream for Murmur job `jobId`, unless one is already open. */
  private streamWhisperJob(transcriptionId: string, jobId: string) {
    if (this.streamLocks.has(transcriptionId)) {
      return;
    }
    this.streamLocks.add(transcriptionId);

    const es = createEventSource({
      url: `${this.serviceUrl}/transcribe/${jobId}/stream`,
      onMessage: async ({ event, data }) => {
        try {
          if (event === "error") {
            this.handleStreamError(transcriptionId, data);
            return;
          }

          const update = JSON.parse(data) as WhisperJob;
          await this.handleWhisperUpdate(transcriptionId, update, jobId);
        } catch (err) {
          console.error(
            `[Stream] Error processing update for ${transcriptionId}:`,
            err,
          );
        }
      },
    });

    this.activeStreams.set(transcriptionId, es);
  }

  /**
   * Handle an SSE "error" event from Murmur (e.g. "Job not found"): mark the
   * transcription failed and close its stream. Tolerates non-JSON payloads,
   * which would otherwise leave the job stuck in its current status.
   */
  private handleStreamError(transcriptionId: string, rawData: string) {
    let message = "Whisper service stream error";
    try {
      const parsed = JSON.parse(rawData) as { error?: unknown } | null;
      if (typeof parsed?.error === "string" && parsed.error) {
        message = parsed.error;
      }
    } catch {
      if (rawData.trim()) message = rawData.trim();
    }

    console.error(
      `[Stream] Whisper service error for ${transcriptionId}:`,
      message,
    );
    this.markFailed(transcriptionId, message, ErrorCode.TRANSCRIPTION_FAILED);
    this.closeStream(transcriptionId);
  }

  /**
   * Apply one job update received over the SSE stream.
   *
   * @param whisperJobId Murmur job id the stream belongs to. It is passed
   *   through (rather than re-read from the DB) so legacy rows whose
   *   `whisper_job_id` is null still get their VTT fetched.
   */
  private async handleWhisperUpdate(
    transcriptionId: string,
    update: WhisperJob,
    whisperJobId: string,
  ) {
    switch (update.status) {
      case "pending":
        // Queued in Murmur; nothing to record yet
        return;

      case "processing": {
        // Murmur is initializing (file I/O, model setup) - no transcript yet
        const progress = Math.min(100, update.progress ?? 0);
        this.publish(transcriptionId, { status: "processing", progress });
        return;
      }

      case "transcribing": {
        const progress = Math.min(100, update.progress ?? 0);
        // Report "processing" until real progress starts
        const status: TranscriptionStatus =
          progress === 0 ? "processing" : "transcribing";
        // Strip WhisperKit special tokens such as <|en|> from the partial transcript
        const transcript = (update.transcript ?? "")
          .replace(/<\|[^|]+\|>/g, "")
          .trim();

        this.publish(transcriptionId, {
          status,
          progress,
          transcript: transcript || undefined,
        });
        return;
      }

      case "completed":
        await this.finalizeStreamedJob(transcriptionId, whisperJobId);
        return;

      case "failed":
        this.markFailed(
          transcriptionId,
          update.error_message ?? "Transcription failed",
          ErrorCode.TRANSCRIPTION_FAILED,
        );
        // Only close the stream - failed jobs stay in Murmur for debugging
        this.closeStream(transcriptionId);
        return;
    }
  }

  /**
   * Finish a job whose stream reported "completed": mark it finalizing,
   * fetch and store its VTT (best-effort), then mark it completed.
   */
  private async finalizeStreamedJob(
    transcriptionId: string,
    whisperJobId: string,
  ) {
    // No further stream messages are needed for this job; close it before
    // the VTT download instead of keeping the connection open meanwhile.
    this.closeStream(transcriptionId);

    this.publish(transcriptionId, { status: "finalizing", progress: 100 });
    await this.fetchAndSaveVTT(transcriptionId, whisperJobId, "[Transcription]");
    // The audio file is kept for playback
    this.publish(transcriptionId, { status: "completed", progress: 100 });
  }

  /**
   * Download the job's VTT from Murmur, clean it, and save it for
   * `transcriptionId`. Best-effort: failures are logged, never thrown.
   */
  private async fetchAndSaveVTT(
    transcriptionId: string,
    whisperJobId: string,
    logPrefix: string,
  ): Promise<void> {
    try {
      const vttResponse = await fetch(
        `${this.serviceUrl}/transcribe/${whisperJobId}?format=vtt`,
      );
      if (!vttResponse.ok) {
        console.warn(
          `${logPrefix} VTT request for ${transcriptionId} returned ${vttResponse.status}`,
        );
        return;
      }
      const vttContent = await vttResponse.text();
      const cleanedVTT = await cleanVTT(transcriptionId, vttContent);
      await saveTranscriptVTT(transcriptionId, cleanedVTT);
    } catch (error) {
      console.warn(
        `${logPrefix} Failed to fetch VTT for ${transcriptionId}:`,
        error,
      );
    }
  }

  /** Persist `update.status`/`update.progress` and emit `update` to listeners. */
  private publish(transcriptionId: string, update: TranscriptionUpdate) {
    this.updateTranscription(transcriptionId, {
      status: update.status,
      progress: update.progress,
    });
    this.events.emit(transcriptionId, update);
  }

  /**
   * Record a failure: truncates `message` to MAX_ERROR_LENGTH, stores it with
   * status "failed", and emits it with `errorCode` and progress 0.
   */
  private markFailed(
    transcriptionId: string,
    message: string,
    errorCode: string,
  ) {
    const errorMessage = message.substring(0, MAX_ERROR_LENGTH);

    this.updateTranscription(transcriptionId, {
      status: "failed",
      error_message: errorMessage,
    });

    this.events.emit(transcriptionId, {
      status: "failed",
      progress: 0,
      error_message: errorMessage,
      error_code: errorCode,
    });
  }

  /** Close and forget the stream for `transcriptionId`, releasing its lock. */
  private closeStream(transcriptionId: string) {
    const es = this.activeStreams.get(transcriptionId);
    if (es) {
      es.close();
      this.activeStreams.delete(transcriptionId);
    }
    this.streamLocks.delete(transcriptionId);
  }

  /**
   * Write the given fields, plus `updated_at` (Unix seconds), to the
   * transcription row. Omitted fields are left unchanged.
   */
  private updateTranscription(
    transcriptionId: string,
    data: {
      status?: TranscriptionStatus;
      progress?: number;
      error_message?: string;
    },
  ) {
    const updates: string[] = [];
    const values: (string | number)[] = [];

    // Column names are fixed literals; only values are bound as parameters.
    if (data.status !== undefined) {
      updates.push("status = ?");
      values.push(data.status);
    }
    if (data.progress !== undefined) {
      updates.push("progress = ?");
      values.push(data.progress);
    }
    if (data.error_message !== undefined) {
      updates.push("error_message = ?");
      values.push(data.error_message);
    }

    updates.push("updated_at = ?");
    values.push(Math.floor(Date.now() / 1000));

    values.push(transcriptionId);

    this.db.run(
      `UPDATE transcriptions SET ${updates.join(", ")} WHERE id = ?`,
      values,
    );
  }

  /**
   * Reconcile in-flight DB rows with Murmur's job list. This reconnects
   * streams for running jobs, applies terminal states, and fails rows Murmur
   * no longer knows about.
   */
  async syncWithWhisper(): Promise<void> {
    const whisperJobs = await this.fetchWhisperJobs();
    if (!whisperJobs) {
      console.warn("[Sync] Murmur service unavailable");
      return;
    }

    const activeDbJobs = this.getActiveDbJobs();
    const activeJobsMap = new Map(activeDbJobs.map((j) => [j.id, j]));

    await this.syncWhisperJobsToDb(whisperJobs, activeJobsMap);
    await this.syncDbJobsToWhisper(activeDbJobs, whisperJobs);
  }

  /** Fetch Murmur's job list; null if the service is unreachable or the payload is malformed. */
  private async fetchWhisperJobs(): Promise<WhisperJob[] | null> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`);
      if (!response.ok) {
        console.warn("[Sync] Whisper service unavailable");
        return null;
      }
      const { jobs } = (await response.json()) as { jobs?: unknown };
      // Guard against a malformed body, which would otherwise crash the sync loop
      return Array.isArray(jobs) ? (jobs as WhisperJob[]) : null;
    } catch {
      return null;
    }
  }

  /** Rows still in an in-flight status that sync is responsible for. */
  private getActiveDbJobs(): ActiveDbJob[] {
    return this.db
      .query<ActiveDbJob, []>(
        "SELECT id, whisper_job_id, filename, status FROM transcriptions WHERE status IN ('uploading', 'processing', 'transcribing')",
      )
      .all();
  }

  /** Apply each Murmur job's state to its matching active DB row. */
  private async syncWhisperJobsToDb(
    whisperJobs: WhisperJob[],
    activeJobsMap: Map<string, ActiveDbJob>,
  ) {
    // Index by Murmur job id once, instead of scanning every row per job.
    // The first row wins on duplicate ids, matching a linear find().
    const byWhisperJobId = new Map<string, ActiveDbJob>();
    for (const job of activeJobsMap.values()) {
      if (job.whisper_job_id !== null && !byWhisperJobId.has(job.whisper_job_id)) {
        byWhisperJobId.set(job.whisper_job_id, job);
      }
    }

    for (const whisperJob of whisperJobs) {
      // Prefer the stored Murmur job id; fall back to legacy rows where our
      // transcription id doubled as the Murmur job id.
      const localJob =
        byWhisperJobId.get(whisperJob.id) ?? activeJobsMap.get(whisperJob.id);

      if (!localJob) {
        await this.handleOrphanedWhisperJob(whisperJob.id);
        continue;
      }

      if (
        whisperJob.status === "processing" ||
        whisperJob.status === "transcribing"
      ) {
        // Reconnect to running jobs (e.g. after a restart) unless already streaming
        if (!this.activeStreams.has(localJob.id)) {
          console.log(
            `[Sync] Reconnecting to active job ${localJob.id} (Murmur job ${whisperJob.id})`,
          );
          this.streamWhisperJob(localJob.id, whisperJob.id);
        }
      } else if (
        whisperJob.status === "completed" ||
        whisperJob.status === "failed"
      ) {
        // Use our transcription id, not Murmur's job id
        await this.syncCompletedJob(whisperJob, localJob.id);
      }
    }
  }

  /** Warn about a Murmur job that matches no transcription row, by id or whisper_job_id. */
  private async handleOrphanedWhisperJob(jobId: string) {
    const jobExists = this.db
      .query<{ id: string }, [string, string]>(
        "SELECT id FROM transcriptions WHERE id = ? OR whisper_job_id = ?",
      )
      .get(jobId, jobId);

    if (!jobExists) {
      // Not our job - Murmur keeps it until explicitly deleted
      console.warn(
        `[Sync] Found orphaned job ${jobId} in Murmur (not in our DB)`,
      );
    }
  }

  /**
   * Mirror a finished Murmur job onto `transcriptionId`, using the job's
   * detail endpoint as the source of truth. Errors are logged, not thrown.
   */
  private async syncCompletedJob(
    whisperJob: WhisperJob,
    transcriptionId: string,
  ) {
    try {
      const details = await this.fetchJobDetails(whisperJob.id);
      if (!details) return;

      if (details.status === "completed") {
        // Sync now owns the terminal transition; stop any live stream for it
        this.closeStream(transcriptionId);

        // Finalizing while the VTT is fetched, as the streaming path does
        this.publish(transcriptionId, { status: "finalizing", progress: 100 });
        await this.fetchAndSaveVTT(transcriptionId, whisperJob.id, "[Sync]");
        this.publish(transcriptionId, { status: "completed", progress: 100 });
      } else if (details.status === "failed") {
        this.closeStream(transcriptionId);
        this.markFailed(
          transcriptionId,
          details.error_message ?? "Transcription failed",
          ErrorCode.TRANSCRIPTION_FAILED,
        );
      }

      // The job persists in Murmur until explicitly deleted - we only sync state
    } catch (error) {
      console.warn(
        `[Sync] Failed to retrieve details for job ${whisperJob.id}`,
        error,
      );
    }
  }

  /** Fetch one job from Murmur; null on a non-2xx response. Network errors propagate. */
  private async fetchJobDetails(jobId: string): Promise<WhisperJob | null> {
    const response = await fetch(`${this.serviceUrl}/transcribe/${jobId}`);
    if (!response.ok) return null;
    return (await response.json()) as WhisperJob;
  }

  /** Fail submitted rows whose Murmur job no longer appears in Murmur's list. */
  private async syncDbJobsToWhisper(
    activeDbJobs: ActiveDbJob[],
    whisperJobs: WhisperJob[],
  ) {
    const knownIds = new Set(whisperJobs.map((wj) => wj.id));

    for (const localJob of activeDbJobs) {
      // Rows never accepted by Murmur have nothing to lose yet
      if (!localJob.whisper_job_id) continue;

      // Match by whisper_job_id or the legacy id
      const whisperHasJob =
        knownIds.has(localJob.whisper_job_id) || knownIds.has(localJob.id);

      if (!whisperHasJob) {
        this.markFailed(
          localJob.id,
          "Job lost - whisper service may have restarted",
          ErrorCode.WHISPER_SERVICE_ERROR,
        );
      }
    }
  }

  /**
   * Truncate uploaded audio for transcriptions that ended (completed or
   * failed) more than 24 hours ago. Best-effort: errors are logged.
   */
  async cleanupStaleFiles(): Promise<void> {
    try {
      const cutoff = Math.floor(Date.now() / 1000) - 24 * 60 * 60;
      const staleTranscriptions = this.db
        .query<{ filename: string }, [number]>(
          `SELECT filename FROM transcriptions
           WHERE status IN ('completed', 'failed')
           AND updated_at < ?`,
        )
        .all(cutoff);

      for (const { filename } of staleTranscriptions) {
        const filePath = `./uploads/${filename}`;
        try {
          const file = Bun.file(filePath);
          // These rows match on every run and Bun.write creates missing files,
          // so skip uploads that are already gone or already emptied.
          if (!(await file.exists()) || file.size === 0) continue;
          // NOTE(review): this empties the file rather than deleting it -
          // confirm that is the intended cleanup.
          await Bun.write(filePath, "");
        } catch (error) {
          console.warn(`[Cleanup] Failed to truncate ${filePath}:`, error);
        }
      }
    } catch (error) {
      console.error("[Cleanup] Failed:", error);
    }
  }
}