distributed transcription service
thistle.dunkirk.sh
import type { Database } from "bun:sqlite";
import { unlink } from "node:fs/promises";
import { createEventSource } from "eventsource-client";
import { ErrorCode } from "./errors";
import { saveTranscriptVTT } from "./transcript-storage";
import { cleanVTT } from "./vtt-cleaner";
6
// Constants

/** Maximum file size in bytes: 100 * 1024 * 1024 (100MB). */
export const MAX_FILE_SIZE = 100 * 1024 ** 2;

/**
 * Maximum number of characters kept from an error message; longer messages
 * are cut with `substring(0, MAX_ERROR_LENGTH)` before being stored.
 */
export const MAX_ERROR_LENGTH = 255;
10
// Types

/**
 * Status values written to the `status` column of the `transcriptions` table
 * and sent to listeners in {@link TranscriptionUpdate}.
 *
 * "uploading", "processing" and "transcribing" are the states this module
 * treats as still active when syncing with the Whisper service; "completed"
 * and "failed" are terminal.
 */
export type TranscriptionStatus =
  | "uploading"
  | "processing"
  | "transcribing"
  | "completed"
  | "failed";
18
/**
 * Payload delivered to listeners registered on the transcription event
 * emitter.
 */
export interface TranscriptionUpdate {
  /** Current lifecycle state of the transcription. */
  status: TranscriptionStatus;
  /** Progress value; this module emits 0 on failure and 100 on completion. */
  progress: number;
  /** Intermediate transcript text, when the update carries one. */
  transcript?: string;
  /** Human-readable failure description; set on "failed" updates. */
  error_message?: string;
  /** Machine-readable failure code (an `ErrorCode` value) on "failed" updates. */
  error_code?: string;
}
26
/**
 * Job record as parsed from the Whisper (Murmur) service's JSON responses and
 * SSE messages.
 */
export interface WhisperJob {
  /** Job identifier assigned by the Whisper service. */
  id: string;
  /**
   * Service-side status. This module handles "pending", "processing",
   * "transcribing", "completed" and "failed"; any other value is ignored.
   */
  status: string;
  /** Progress reported by the service; treated as 0 when absent. */
  progress?: number;
  /** Transcript text reported so far, if any. */
  transcript?: string;
  /** Failure description reported by the service, if any. */
  error_message?: string;
}
34
/**
 * Event emitter for real-time transcription updates with automatic cleanup.
 *
 * Listeners are grouped by transcription ID. A listener set is dropped five
 * minutes after its last listener is removed, or five minutes after a
 * "completed"/"failed" update is emitted, unless a new listener is registered
 * for the same ID in the meantime.
 */
export class TranscriptionEventEmitter {
  /** Delay before an idle or finished listener set is discarded. */
  private static readonly CLEANUP_DELAY_MS = 5 * 60 * 1000;

  private listeners = new Map<
    string,
    Set<(data: TranscriptionUpdate) => void>
  >();
  // `ReturnType<typeof setTimeout>` keeps the handle type correct under both
  // DOM and Node/Bun typings without depending on the `NodeJS` namespace.
  private cleanupTimers = new Map<string, ReturnType<typeof setTimeout>>();

  /**
   * Registers `callback` for updates about `transcriptionId` and cancels any
   * cleanup that was pending for that ID.
   */
  on(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    let callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) {
      callbacks = new Set();
      this.listeners.set(transcriptionId, callbacks);
    }
    callbacks.add(callback);

    this.cancelCleanup(transcriptionId);
  }

  /**
   * Removes `callback`; when it was the last listener for `transcriptionId`,
   * schedules the now-empty listener set for removal.
   */
  off(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    const callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) {
      return;
    }

    callbacks.delete(callback);
    if (callbacks.size === 0) {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /**
   * Synchronously invokes every listener registered for `transcriptionId`.
   * A terminal update ("completed" or "failed") also schedules cleanup of the
   * listener set. An exception thrown by a listener propagates to the caller.
   */
  emit(transcriptionId: string, data: TranscriptionUpdate) {
    const callbacks = this.listeners.get(transcriptionId);
    if (callbacks) {
      for (const callback of callbacks) {
        callback(data);
      }
    }

    // Only arm a timer when there is something to clean up; otherwise every
    // finished job would leave a pointless five-minute timer behind.
    const isTerminal = data.status === "completed" || data.status === "failed";
    if (isTerminal && this.listeners.has(transcriptionId)) {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /** Returns true when at least one listener is registered for the ID. */
  hasListeners(transcriptionId: string): boolean {
    return (this.listeners.get(transcriptionId)?.size ?? 0) > 0;
  }

  /** Cancels the pending cleanup timer for `transcriptionId`, if any. */
  private cancelCleanup(transcriptionId: string) {
    const timer = this.cleanupTimers.get(transcriptionId);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.cleanupTimers.delete(transcriptionId);
    }
  }

  /**
   * Arms (or re-arms) the cleanup timer for `transcriptionId`.
   *
   * Any previously scheduled timer is cancelled first. Without that, an
   * earlier timer would be overwritten in the map but keep running, could no
   * longer be cancelled by `on()`, and would eventually delete listeners that
   * were registered after it was scheduled.
   */
  private scheduleCleanup(transcriptionId: string) {
    this.cancelCleanup(transcriptionId);

    const timer = setTimeout(() => {
      this.listeners.delete(transcriptionId);
      this.cleanupTimers.delete(transcriptionId);
    }, TranscriptionEventEmitter.CLEANUP_DELAY_MS);

    this.cleanupTimers.set(transcriptionId, timer);
  }
}
97
// Whisper service manager

/** Row shape of a transcription that is still active in the local database. */
type ActiveDbJob = {
  id: string;
  whisper_job_id: string | null;
  filename: string;
  status: string;
};

/**
 * Coordinates the local `transcriptions` table with the remote Whisper
 * ("Murmur") transcription service.
 *
 * It submits audio files, follows each job's SSE progress stream, writes
 * status changes to the database, forwards them to the event emitter, and
 * reconciles both sides through {@link WhisperServiceManager.syncWithWhisper}.
 */
export class WhisperServiceManager {
  /** Open SSE connections, keyed by local transcription ID. */
  private activeStreams = new Map<
    string,
    ReturnType<typeof createEventSource>
  >();
  /** Transcription IDs that already own a stream; guards against duplicates. */
  private streamLocks = new Set<string>();

  /**
   * @param serviceUrl Base URL of the Whisper service (no trailing slash is added or removed).
   * @param db Database holding the `transcriptions` table.
   * @param events Emitter that receives every status change.
   */
  constructor(
    private serviceUrl: string,
    private db: Database,
    private events: TranscriptionEventEmitter,
  ) {}

  /**
   * Probes the service with `GET /jobs`.
   *
   * @returns true when the response status is OK; false on a non-OK status or
   * when the request itself fails.
   */
  async checkHealth(): Promise<boolean> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`, {
        method: "GET",
      });
      return response.ok;
    } catch {
      return false;
    }
  }

  /**
   * Uploads `./uploads/<filename>` to the service, records the returned job
   * ID, and starts streaming the job's progress.
   *
   * Never rejects for upload/service failures: those are recorded as a
   * "failed" transcription and emitted to listeners instead.
   *
   * @param transcriptionId Local transcription ID.
   * @param filename Name of the uploaded file inside `./uploads/`.
   */
  async startTranscription(
    transcriptionId: string,
    filename: string,
  ): Promise<void> {
    try {
      // Update status to processing
      this.updateTranscription(transcriptionId, {
        status: "processing",
        progress: 10,
      });

      // Read file from disk
      const filePath = `./uploads/${filename}`;
      const fileBuffer = await Bun.file(filePath).arrayBuffer();

      // Create form data for the Murmur server
      const formData = new FormData();
      const file = new File([fileBuffer], filename, { type: "audio/mpeg" });
      formData.append("file", file);

      // Call the Murmur server to start transcription
      const response = await fetch(`${this.serviceUrl}/transcribe`, {
        method: "POST",
        body: formData,
      });

      if (!response.ok) {
        const errorText = await response.text().catch(() => "Unknown error");
        throw new Error(
          `Whisper service returned ${response.status}: ${errorText}`,
        );
      }

      const { job_id } = (await response.json()) as {
        job_id?: string | null;
      };
      // Without a job ID there is nothing to track or stream; fail loudly
      // rather than storing an empty ID and streaming ".../undefined/stream".
      if (job_id == null || job_id === "") {
        throw new Error("Whisper service returned no job_id");
      }

      // Store Murmur's job_id in our database for tracking
      this.db.run("UPDATE transcriptions SET whisper_job_id = ? WHERE id = ?", [
        job_id,
        transcriptionId,
      ]);

      // Connect to SSE stream from Murmur (use the job_id returned by Murmur)
      this.streamWhisperJob(transcriptionId, job_id);
    } catch (error) {
      console.error(
        `[Transcription] Failed to start ${transcriptionId}:`,
        error,
      );
      const rawMessage =
        error instanceof Error ? error.message : "Unknown error";
      // Errors raised above for service responses carry this marker text.
      const errorCode = rawMessage.includes("Whisper service")
        ? ErrorCode.WHISPER_SERVICE_ERROR
        : ErrorCode.TRANSCRIPTION_FAILED;

      this.markFailed(transcriptionId, rawMessage, errorCode);
    }
  }

  /**
   * Opens the SSE stream for a service job and routes its messages.
   *
   * A second call for a transcription that already owns a stream is a no-op.
   * Errors while handling a single message are logged and do not close the
   * stream.
   */
  private streamWhisperJob(transcriptionId: string, jobId: string) {
    // Prevent duplicate streams using locks
    if (this.streamLocks.has(transcriptionId)) {
      return;
    }
    this.streamLocks.add(transcriptionId);

    try {
      const es = createEventSource({
        url: `${this.serviceUrl}/transcribe/${jobId}/stream`,
        onMessage: async ({ event, data }) => {
          try {
            // Handle "error" events from SSE (e.g., "Job not found")
            if (event === "error") {
              const errorData = JSON.parse(data) as {
                error?: unknown;
              } | null;
              const reason =
                typeof errorData?.error === "string"
                  ? errorData.error
                  : "Transcription failed";
              console.error(
                `[Stream] Whisper service error for ${transcriptionId}:`,
                reason,
              );

              this.markFailed(
                transcriptionId,
                reason,
                ErrorCode.TRANSCRIPTION_FAILED,
              );
              this.closeStream(transcriptionId);
              return;
            }

            const update = JSON.parse(data) as WhisperJob;
            await this.handleWhisperUpdate(transcriptionId, update);
          } catch (err) {
            console.error(
              `[Stream] Error processing update for ${transcriptionId}:`,
              err,
            );
          }
        },
      });

      this.activeStreams.set(transcriptionId, es);
    } catch (error) {
      // Release the lock so a later sync can retry opening the stream.
      this.streamLocks.delete(transcriptionId);
      throw error;
    }
  }

  /**
   * Applies one job update received from the service stream.
   *
   * "pending" and unrecognised statuses are ignored. "completed" and "failed"
   * are terminal and close the stream.
   */
  private async handleWhisperUpdate(
    transcriptionId: string,
    update: WhisperJob,
  ) {
    switch (update.status) {
      case "pending":
        // Initial status, no action needed
        return;

      case "processing": {
        // Murmur is initializing (file I/O, WhisperKit setup) - no transcript yet
        const progress = this.clampProgress(update.progress);
        this.updateTranscription(transcriptionId, {
          status: "processing",
          progress,
        });
        this.events.emit(transcriptionId, { status: "processing", progress });
        return;
      }

      case "transcribing": {
        // Active transcription with progress callbacks
        const progress = this.clampProgress(update.progress);

        // If progress is still 0, keep status as "processing" until real progress starts
        const status: TranscriptionStatus =
          progress === 0 ? "processing" : "transcribing";

        // Strip WhisperKit special tokens from intermediate transcript
        const transcript = (update.transcript ?? "")
          .replace(/<\|[^|]+\|>/g, "")
          .trim();

        this.updateTranscription(transcriptionId, { status, progress });
        this.events.emit(transcriptionId, {
          status,
          progress,
          transcript: transcript || undefined,
        });
        return;
      }

      case "completed": {
        // The VTT is fetched under the job ID stored in our database.
        const whisperJobId = this.db
          .query<{ whisper_job_id: string }, [string]>(
            "SELECT whisper_job_id FROM transcriptions WHERE id = ?",
          )
          .get(transcriptionId)?.whisper_job_id;

        if (whisperJobId) {
          await this.fetchAndSaveVTT(
            transcriptionId,
            whisperJobId,
            "[Transcription]",
          );
        }

        this.markCompleted(transcriptionId);

        // Close stream - keep audio file for playback
        this.closeStream(transcriptionId);
        return;
      }

      case "failed":
        this.markFailed(
          transcriptionId,
          update.error_message ?? "Transcription failed",
          ErrorCode.TRANSCRIPTION_FAILED,
        );

        // Only close stream - keep failed jobs in Whisper for debugging
        this.closeStream(transcriptionId);
        return;

      default:
        return;
    }
  }

  /** Clamps a reported progress value into the range 0..100 (absent → 0). */
  private clampProgress(progress: number | undefined): number {
    return Math.max(0, Math.min(100, progress ?? 0));
  }

  /**
   * Downloads the job's VTT transcript, cleans it and saves it.
   *
   * Best-effort: a non-OK response or any thrown error is logged with
   * `logTag` and otherwise ignored, so the caller can still mark the job
   * completed.
   */
  private async fetchAndSaveVTT(
    transcriptionId: string,
    whisperJobId: string,
    logTag: string,
  ): Promise<void> {
    try {
      const vttResponse = await fetch(
        `${this.serviceUrl}/transcribe/${whisperJobId}?format=vtt`,
      );
      if (!vttResponse.ok) {
        console.warn(
          `${logTag} VTT request for ${transcriptionId} returned ${vttResponse.status}`,
        );
        return;
      }

      const vttContent = await vttResponse.text();
      const cleanedVTT = await cleanVTT(transcriptionId, vttContent);
      await saveTranscriptVTT(transcriptionId, cleanedVTT);
    } catch (error) {
      console.warn(
        `${logTag} Failed to fetch VTT for ${transcriptionId}:`,
        error,
      );
    }
  }

  /** Persists and emits the "completed" state (progress 100). */
  private markCompleted(transcriptionId: string) {
    this.updateTranscription(transcriptionId, {
      status: "completed",
      progress: 100,
    });
    this.events.emit(transcriptionId, { status: "completed", progress: 100 });
  }

  /**
   * Persists and emits the "failed" state.
   *
   * The message is cut to `MAX_ERROR_LENGTH` characters so every failure path
   * stores and emits the same bounded text.
   */
  private markFailed(
    transcriptionId: string,
    message: string,
    errorCode: string,
  ) {
    const errorMessage = message.substring(0, MAX_ERROR_LENGTH);

    this.updateTranscription(transcriptionId, {
      status: "failed",
      error_message: errorMessage,
    });
    this.events.emit(transcriptionId, {
      status: "failed",
      progress: 0,
      error_message: errorMessage,
      error_code: errorCode,
    });
  }

  /** Closes the transcription's stream (if open) and releases its lock. */
  private closeStream(transcriptionId: string) {
    const es = this.activeStreams.get(transcriptionId);
    if (es) {
      es.close();
      this.activeStreams.delete(transcriptionId);
    }
    this.streamLocks.delete(transcriptionId);
  }

  /**
   * Writes the provided fields to the transcription row. `updated_at` is
   * always set to the current Unix time in seconds, even when `data` is empty.
   */
  private updateTranscription(
    transcriptionId: string,
    data: {
      status?: TranscriptionStatus;
      progress?: number;
      error_message?: string;
    },
  ) {
    const updates: string[] = [];
    const values: (string | number)[] = [];

    if (data.status !== undefined) {
      updates.push("status = ?");
      values.push(data.status);
    }
    if (data.progress !== undefined) {
      updates.push("progress = ?");
      values.push(data.progress);
    }
    if (data.error_message !== undefined) {
      updates.push("error_message = ?");
      values.push(data.error_message);
    }

    updates.push("updated_at = ?");
    values.push(Math.floor(Date.now() / 1000));

    // Bound last: it fills the WHERE placeholder.
    values.push(transcriptionId);

    // Column names come from the fixed list above; all values are bound.
    this.db.run(
      `UPDATE transcriptions SET ${updates.join(", ")} WHERE id = ?`,
      values,
    );
  }

  /**
   * Reconciles local active transcriptions with the service's job list, in
   * both directions.
   *
   * @throws Error("Murmur service unavailable") when the job list cannot be
   * fetched.
   */
  async syncWithWhisper(): Promise<void> {
    const whisperJobs = await this.fetchWhisperJobs();
    if (!whisperJobs) {
      throw new Error("Murmur service unavailable");
    }

    const activeDbJobs = this.getActiveDbJobs();
    const activeJobsMap = new Map(activeDbJobs.map((j) => [j.id, j]));

    await this.syncWhisperJobsToDb(whisperJobs, activeJobsMap);
    await this.syncDbJobsToWhisper(activeDbJobs, whisperJobs);
  }

  /**
   * Fetches the service's job list from `GET /jobs`.
   *
   * @returns the jobs, or null when the request fails, the status is not OK,
   * or the response has no `jobs` array.
   */
  private async fetchWhisperJobs(): Promise<WhisperJob[] | null> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`);
      if (!response.ok) {
        console.warn("[Sync] Whisper service unavailable");
        return null;
      }

      const { jobs } = (await response.json()) as { jobs?: unknown };
      if (!Array.isArray(jobs)) {
        console.warn("[Sync] Whisper service returned a malformed job list");
        return null;
      }
      return jobs as WhisperJob[];
    } catch (error) {
      console.warn("[Sync] Failed to reach Whisper service:", error);
      return null;
    }
  }

  /** Returns rows whose status is 'uploading', 'processing' or 'transcribing'. */
  private getActiveDbJobs(): ActiveDbJob[] {
    return this.db
      .query<ActiveDbJob, []>(
        "SELECT id, whisper_job_id, filename, status FROM transcriptions WHERE status IN ('uploading', 'processing', 'transcribing')",
      )
      .all();
  }

  /**
   * For every job the service knows about: reconnects the stream when the job
   * is still running, or syncs the final state when it has finished. Jobs
   * with no matching active local row are handed to the orphan check.
   */
  private async syncWhisperJobsToDb(
    whisperJobs: WhisperJob[],
    activeJobsMap: Map<string, ActiveDbJob>,
  ) {
    // Index once by service job ID instead of scanning the map per job. The
    // first row wins, matching a linear search in map order.
    const byWhisperJobId = new Map<string, ActiveDbJob>();
    for (const job of activeJobsMap.values()) {
      if (job.whisper_job_id && !byWhisperJobId.has(job.whisper_job_id)) {
        byWhisperJobId.set(job.whisper_job_id, job);
      }
    }

    for (const whisperJob of whisperJobs) {
      // Match by whisper_job_id first; legacy rows used the same ID on both sides.
      const localJob =
        byWhisperJobId.get(whisperJob.id) ?? activeJobsMap.get(whisperJob.id);

      if (!localJob) {
        await this.handleOrphanedWhisperJob(whisperJob.id);
        continue;
      }

      const isRunning =
        whisperJob.status === "processing" ||
        whisperJob.status === "transcribing";
      const isFinished =
        whisperJob.status === "completed" || whisperJob.status === "failed";

      if (isRunning) {
        // Reconnect to active jobs on startup unless already streaming
        if (!this.activeStreams.has(localJob.id)) {
          console.log(
            `[Sync] Reconnecting to active job ${localJob.id} (Murmur job ${whisperJob.id})`,
          );
          this.streamWhisperJob(localJob.id, whisperJob.id);
        }
      } else if (isFinished) {
        // Use our transcription ID, not Murmur's job ID
        await this.syncCompletedJob(whisperJob, localJob.id);
      }
    }
  }

  /**
   * Logs a warning when a service job ID appears nowhere in our database
   * (neither as `id` nor as `whisper_job_id`). The job itself is left alone.
   */
  private async handleOrphanedWhisperJob(jobId: string) {
    const jobExists = this.db
      .query<{ id: string }, [string, string]>(
        "SELECT id FROM transcriptions WHERE id = ? OR whisper_job_id = ?",
      )
      .get(jobId, jobId);

    if (!jobExists) {
      // Not our job - Murmur will keep it until explicitly deleted
      console.warn(
        `[Sync] Found orphaned job ${jobId} in Murmur (not in our DB)`,
      );
    }
  }

  /**
   * Copies the final state of a finished service job into the local row.
   *
   * Fetches the job's details, then records "completed" (saving the VTT
   * best-effort) or "failed". Any stream still registered for the
   * transcription is closed. Failures to fetch the details are logged only.
   */
  private async syncCompletedJob(
    whisperJob: WhisperJob,
    transcriptionId: string,
  ) {
    try {
      const details = await this.fetchJobDetails(whisperJob.id);
      if (!details) return;

      if (details.status === "completed") {
        await this.fetchAndSaveVTT(transcriptionId, whisperJob.id, "[Sync]");
        this.markCompleted(transcriptionId);
        this.closeStream(transcriptionId);
      } else if (details.status === "failed") {
        this.markFailed(
          transcriptionId,
          details.error_message ?? "Transcription failed",
          ErrorCode.TRANSCRIPTION_FAILED,
        );
        this.closeStream(transcriptionId);
      }

      // Job persists in Murmur until explicitly deleted - we just sync state
    } catch (error) {
      console.warn(
        `[Sync] Failed to retrieve details for job ${whisperJob.id}`,
        error,
      );
    }
  }

  /**
   * Fetches one job from `GET /transcribe/<jobId>`.
   *
   * @returns the parsed job, or null on a non-OK status. Network and parse
   * errors propagate to the caller.
   */
  private async fetchJobDetails(jobId: string): Promise<WhisperJob | null> {
    const response = await fetch(`${this.serviceUrl}/transcribe/${jobId}`);
    if (!response.ok) return null;
    return response.json();
  }

  /**
   * Marks as failed every active local job that has a `whisper_job_id` but no
   * longer appears in the service's job list, and releases its stream.
   */
  private async syncDbJobsToWhisper(
    activeDbJobs: ActiveDbJob[],
    whisperJobs: WhisperJob[],
  ) {
    const whisperJobIds = new Set(whisperJobs.map((wj) => wj.id));

    for (const localJob of activeDbJobs) {
      // Rows that were never submitted have nothing to lose.
      if (!localJob.whisper_job_id) continue;

      // Check if Murmur has this job (by whisper_job_id or legacy id match)
      const whisperHasJob =
        whisperJobIds.has(localJob.whisper_job_id) ||
        whisperJobIds.has(localJob.id);
      if (whisperHasJob) continue;

      // Job was lost from Murmur, mark as failed
      this.markFailed(
        localJob.id,
        "Job lost - whisper service may have restarted",
        ErrorCode.WHISPER_SERVICE_ERROR,
      );
      this.closeStream(localJob.id);
    }
  }

  /**
   * Deletes the uploaded audio of transcriptions that finished ("completed"
   * or "failed") and were last updated more than 24 hours ago.
   *
   * Best-effort: files that are already gone are skipped silently, other
   * deletion errors are logged per file, and the method never rejects.
   */
  async cleanupStaleFiles(): Promise<void> {
    try {
      const cutoff = Math.floor(Date.now() / 1000) - 24 * 60 * 60;
      const staleTranscriptions = this.db
        .query<{ filename: string }, [number]>(
          `SELECT filename FROM transcriptions
           WHERE status IN ('completed', 'failed')
           AND updated_at < ?`,
        )
        .all(cutoff);

      for (const { filename } of staleTranscriptions) {
        if (!filename) continue;

        const filePath = `./uploads/${filename}`;
        try {
          // Remove the file outright; writing "" to it would only truncate it
          // and would re-create files that were already deleted.
          await unlink(filePath);
        } catch (error) {
          const code = (error as { code?: string } | null)?.code;
          if (code !== "ENOENT") {
            console.warn(`[Cleanup] Failed to delete ${filePath}:`, error);
          }
        }
      }
    } catch (error) {
      console.error("[Cleanup] Failed:", error);
    }
  }
}
628 }
629}