馃 distributed transcription service thistle.dunkirk.sh
import type { Database } from "bun:sqlite";
import { unlink } from "node:fs/promises";
import { createEventSource } from "eventsource-client";
import { ErrorCode } from "./errors";
import { saveTranscriptVTT } from "./transcript-storage";
import { cleanVTT } from "./vtt-cleaner";

// Constants
export const MAX_FILE_SIZE = 100 * 1024 * 1024; // 100MB
/** Longest error message persisted to the database or sent to listeners. */
export const MAX_ERROR_LENGTH = 255;

/** Delay before an idle or finished transcription's listener set is dropped. */
const LISTENER_CLEANUP_DELAY_MS = 5 * 60 * 1000;
/** Completed/failed transcriptions not updated for this long have their upload deleted. */
const STALE_FILE_AGE_SECONDS = 24 * 60 * 60;
/** Directory holding uploaded audio files. */
const UPLOADS_DIR = "./uploads";
/** Special tokens of the form `<|...|>` that WhisperKit leaves in partial transcripts. */
const WHISPER_SPECIAL_TOKEN_RE = /<\|[^|]+\|>/g;

// Types
export type TranscriptionStatus =
  | "uploading"
  | "processing"
  | "transcribing"
  | "completed"
  | "failed";

/** Payload delivered to subscribers whenever a transcription changes state. */
export interface TranscriptionUpdate {
  status: TranscriptionStatus;
  progress: number;
  transcript?: string;
  error_message?: string;
  error_code?: string;
}

/** A job as reported by the Murmur (Whisper) service. */
export interface WhisperJob {
  id: string;
  status: string;
  progress?: number;
  transcript?: string;
  error_message?: string;
}

/** Row shape selected by `getActiveDbJobs` for in-flight transcriptions. */
interface ActiveDbJob {
  id: string;
  whisper_job_id: string | null;
  filename: string;
  status: string;
}

/** Clamps a progress value reported by Murmur into 0..100 (missing counts as 0). */
function clampProgress(progress: number | undefined): number {
  return Math.min(100, Math.max(0, progress ?? 0));
}

/**
 * Extracts a readable message from the payload of an SSE "error" event.
 * The expected shape is `{"error": "..."}`; any other payload falls back to
 * the raw text so the failure is still recorded instead of being dropped.
 */
function parseStreamErrorMessage(data: string): string {
  try {
    const parsed: unknown = JSON.parse(data);
    if (typeof parsed === "object" && parsed !== null) {
      const { error } = parsed as { error?: unknown };
      if (typeof error === "string" && error !== "") {
        return error;
      }
    }
  } catch {
    // Not JSON - fall back to the raw payload below.
  }
  return data || "Unknown stream error";
}

/**
 * In-memory pub/sub for real-time transcription updates, keyed by
 * transcription ID. A transcription's listener set is dropped
 * LISTENER_CLEANUP_DELAY_MS after it becomes empty or after a terminal
 * ("completed"/"failed") update is emitted, unless a new listener subscribes
 * in the meantime.
 */
export class TranscriptionEventEmitter {
  private readonly listeners = new Map<
    string,
    Set<(data: TranscriptionUpdate) => void>
  >();
  private readonly cleanupTimers = new Map<
    string,
    ReturnType<typeof setTimeout>
  >();

  /** Subscribes `callback` to updates for `transcriptionId`. */
  on(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    let callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) {
      callbacks = new Set();
      this.listeners.set(transcriptionId, callbacks);
    }
    callbacks.add(callback);

    // The set is in use again, so any pending cleanup must not delete it.
    this.cancelCleanup(transcriptionId);
  }

  /** Unsubscribes `callback`; schedules cleanup once no listeners remain. */
  off(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    const callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) return;

    callbacks.delete(callback);
    if (callbacks.size === 0) {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /**
   * Delivers `data` to every listener of `transcriptionId`. A throwing
   * listener is logged and does not prevent delivery to the others.
   */
  emit(transcriptionId: string, data: TranscriptionUpdate) {
    const callbacks = this.listeners.get(transcriptionId);
    if (callbacks) {
      for (const callback of callbacks) {
        try {
          callback(data);
        } catch (error) {
          console.error(
            `[Events] Listener for ${transcriptionId} failed:`,
            error,
          );
        }
      }
    }

    // Terminal states get no further updates; release listeners later.
    if (data.status === "completed" || data.status === "failed") {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /** True when at least one listener is subscribed to `transcriptionId`. */
  hasListeners(transcriptionId: string): boolean {
    return (this.listeners.get(transcriptionId)?.size ?? 0) > 0;
  }

  /** (Re)starts the delayed removal of `transcriptionId`'s listener set. */
  private scheduleCleanup(transcriptionId: string) {
    // Nothing to clean up if nobody ever subscribed.
    if (!this.listeners.has(transcriptionId)) return;

    // Replace rather than stack timers: an untracked older timer could
    // otherwise fire later and delete listeners added after it was created.
    this.cancelCleanup(transcriptionId);

    const timer = setTimeout(() => {
      this.listeners.delete(transcriptionId);
      this.cleanupTimers.delete(transcriptionId);
    }, LISTENER_CLEANUP_DELAY_MS);

    this.cleanupTimers.set(transcriptionId, timer);
  }

  /** Cancels a pending cleanup for `transcriptionId`, if any. */
  private cancelCleanup(transcriptionId: string) {
    const timer = this.cleanupTimers.get(transcriptionId);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.cleanupTimers.delete(transcriptionId);
    }
  }
}

/**
 * Drives transcriptions through the Murmur service: submits uploads, follows
 * per-job SSE streams, mirrors state into the `transcriptions` table,
 * publishes updates through the event emitter, and reconciles state with
 * Murmur via `syncWithWhisper`.
 */
export class WhisperServiceManager {
  private readonly activeStreams = new Map<
    string,
    ReturnType<typeof createEventSource>
  >();
  /** Transcription IDs with an open (or opening) stream; prevents duplicates. */
  private readonly streamLocks = new Set<string>();

  constructor(
    private readonly serviceUrl: string,
    private readonly db: Database,
    private readonly events: TranscriptionEventEmitter,
  ) {}

  /** Returns true when `GET /jobs` on the Murmur service answers with a 2xx status. */
  async checkHealth(): Promise<boolean> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`, {
        method: "GET",
      });
      return response.ok;
    } catch {
      return false;
    }
  }

  /**
   * Uploads `./uploads/<filename>` to Murmur, records the returned job id and
   * starts following its progress stream. Never throws: any failure marks
   * the transcription as failed and notifies listeners.
   */
  async startTranscription(
    transcriptionId: string,
    filename: string,
  ): Promise<void> {
    try {
      this.updateTranscription(transcriptionId, {
        status: "processing",
        progress: 10,
      });

      const filePath = `${UPLOADS_DIR}/${filename}`;
      const fileBuffer = await Bun.file(filePath).arrayBuffer();

      const formData = new FormData();
      const file = new File([fileBuffer], filename, { type: "audio/mpeg" });
      formData.append("file", file);

      const response = await fetch(`${this.serviceUrl}/transcribe`, {
        method: "POST",
        body: formData,
      });

      if (!response.ok) {
        const errorText = await response.text().catch(() => "Unknown error");
        throw new Error(
          `Whisper service returned ${response.status}: ${errorText}`,
        );
      }

      const { job_id } = (await response.json()) as { job_id?: unknown };
      if (typeof job_id !== "string" || job_id === "") {
        // Message mentions "Whisper service" so it maps to WHISPER_SERVICE_ERROR below.
        throw new Error("Whisper service returned no job_id");
      }

      // Murmur's job id is what every later request (stream, VTT, sync) uses.
      this.db.run("UPDATE transcriptions SET whisper_job_id = ? WHERE id = ?", [
        job_id,
        transcriptionId,
      ]);

      this.streamWhisperJob(transcriptionId, job_id);
    } catch (error) {
      console.error(
        `[Transcription] Failed to start ${transcriptionId}:`,
        error,
      );
      const errorMessage =
        error instanceof Error ? error.message : "Unknown error";
      const errorCode =
        error instanceof Error && error.message.includes("Whisper service")
          ? ErrorCode.WHISPER_SERVICE_ERROR
          : ErrorCode.TRANSCRIPTION_FAILED;

      this.failTranscription(transcriptionId, errorMessage, errorCode);
    }
  }

  /** Opens the SSE progress stream for Murmur job `jobId`, unless one is already open. */
  private streamWhisperJob(transcriptionId: string, jobId: string) {
    if (this.streamLocks.has(transcriptionId)) {
      return;
    }
    this.streamLocks.add(transcriptionId);

    try {
      const es = createEventSource({
        url: `${this.serviceUrl}/transcribe/${jobId}/stream`,
        onMessage: async ({ event, data }) => {
          try {
            // Murmur reports stream-level problems (e.g. "Job not found") as "error" events.
            if (event === "error") {
              const errorMessage = parseStreamErrorMessage(data);
              console.error(
                `[Stream] Whisper service error for ${transcriptionId}:`,
                errorMessage,
              );
              this.failTranscription(
                transcriptionId,
                errorMessage,
                ErrorCode.TRANSCRIPTION_FAILED,
              );
              return;
            }

            const update = JSON.parse(data) as WhisperJob;
            await this.handleWhisperUpdate(transcriptionId, update);
          } catch (err) {
            console.error(
              `[Stream] Error processing update for ${transcriptionId}:`,
              err,
            );
          }
        },
      });

      this.activeStreams.set(transcriptionId, es);
    } catch (error) {
      // Release the lock so a later sync can retry the connection.
      this.streamLocks.delete(transcriptionId);
      throw error;
    }
  }

  /** Applies one status update received from a Murmur job stream. Unknown statuses are ignored. */
  private async handleWhisperUpdate(
    transcriptionId: string,
    update: WhisperJob,
  ) {
    switch (update.status) {
      case "pending":
        // Queued in Murmur; nothing to report yet.
        return;

      case "processing": {
        // Murmur is initializing (file I/O, WhisperKit setup) - no transcript yet.
        const progress = clampProgress(update.progress);
        this.updateTranscription(transcriptionId, {
          status: "processing",
          progress,
        });
        this.events.emit(transcriptionId, { status: "processing", progress });
        return;
      }

      case "transcribing": {
        const progress = clampProgress(update.progress);
        // Keep reporting "processing" until real progress starts.
        const status: TranscriptionStatus =
          progress === 0 ? "processing" : "transcribing";
        const transcript = (update.transcript ?? "")
          .replace(WHISPER_SPECIAL_TOKEN_RE, "")
          .trim();

        this.updateTranscription(transcriptionId, { status, progress });
        this.events.emit(transcriptionId, {
          status,
          progress,
          transcript: transcript || undefined,
        });
        return;
      }

      case "completed": {
        const whisperJobId = this.db
          .query<{ whisper_job_id: string | null }, [string]>(
            "SELECT whisper_job_id FROM transcriptions WHERE id = ?",
          )
          .get(transcriptionId)?.whisper_job_id;
        await this.completeTranscription(
          transcriptionId,
          whisperJobId,
          "[Transcription]",
        );
        return;
      }

      case "failed":
        // The job is left in Murmur for debugging; only our side is updated.
        this.failTranscription(
          transcriptionId,
          update.error_message ?? "Transcription failed",
          ErrorCode.TRANSCRIPTION_FAILED,
        );
        return;
    }
  }

  /**
   * Marks a transcription failed: stores the message (truncated to
   * MAX_ERROR_LENGTH), notifies listeners with `errorCode`, and closes any
   * stream still open for it.
   */
  private failTranscription(
    transcriptionId: string,
    message: string,
    errorCode: string,
  ) {
    const errorMessage = message.substring(0, MAX_ERROR_LENGTH);

    this.updateTranscription(transcriptionId, {
      status: "failed",
      error_message: errorMessage,
    });

    this.events.emit(transcriptionId, {
      status: "failed",
      progress: 0,
      error_message: errorMessage,
      error_code: errorCode,
    });

    this.closeStream(transcriptionId);
  }

  /**
   * Marks a transcription completed after trying to store its VTT transcript.
   * A missing job id or VTT failure is tolerated. The audio file is kept for
   * playback; only the stream is released.
   */
  private async completeTranscription(
    transcriptionId: string,
    whisperJobId: string | null | undefined,
    logPrefix: string,
  ) {
    if (whisperJobId) {
      await this.saveVTT(transcriptionId, whisperJobId, logPrefix);
    }

    this.updateTranscription(transcriptionId, {
      status: "completed",
      progress: 100,
    });
    this.events.emit(transcriptionId, { status: "completed", progress: 100 });

    this.closeStream(transcriptionId);
  }

  /** Best-effort: fetches the VTT from Murmur, cleans it and stores it. Failures are logged. */
  private async saveVTT(
    transcriptionId: string,
    whisperJobId: string,
    logPrefix: string,
  ) {
    try {
      const vttResponse = await fetch(
        `${this.serviceUrl}/transcribe/${whisperJobId}?format=vtt`,
      );
      if (!vttResponse.ok) {
        console.warn(
          `${logPrefix} VTT request for ${transcriptionId} returned ${vttResponse.status}`,
        );
        return;
      }
      const vttContent = await vttResponse.text();
      const cleanedVTT = await cleanVTT(transcriptionId, vttContent);
      await saveTranscriptVTT(transcriptionId, cleanedVTT);
    } catch (error) {
      console.warn(
        `${logPrefix} Failed to fetch VTT for ${transcriptionId}:`,
        error,
      );
    }
  }

  /** Closes and forgets the stream for `transcriptionId` and releases its lock. */
  private closeStream(transcriptionId: string) {
    const es = this.activeStreams.get(transcriptionId);
    if (es) {
      es.close();
      this.activeStreams.delete(transcriptionId);
    }
    this.streamLocks.delete(transcriptionId);
  }

  /**
   * Updates the given columns of a transcription row and always bumps
   * `updated_at` (unix seconds). Column names are fixed strings; only values
   * are bound as parameters.
   */
  private updateTranscription(
    transcriptionId: string,
    data: {
      status?: TranscriptionStatus;
      progress?: number;
      error_message?: string;
    },
  ) {
    const assignments: string[] = [];
    const values: (string | number)[] = [];

    if (data.status !== undefined) {
      assignments.push("status = ?");
      values.push(data.status);
    }
    if (data.progress !== undefined) {
      assignments.push("progress = ?");
      values.push(data.progress);
    }
    if (data.error_message !== undefined) {
      assignments.push("error_message = ?");
      values.push(data.error_message);
    }

    assignments.push("updated_at = ?");
    values.push(Math.floor(Date.now() / 1000));

    values.push(transcriptionId);

    this.db.run(
      `UPDATE transcriptions SET ${assignments.join(", ")} WHERE id = ?`,
      values,
    );
  }

  /**
   * Reconciles local in-flight transcriptions with Murmur's job list.
   *
   * @throws Error("Murmur service unavailable") when the job list cannot be fetched.
   */
  async syncWithWhisper(): Promise<void> {
    // Snapshot the DB *before* asking Murmur. Otherwise a job started in
    // between would have a stored job id but be missing from the (older)
    // Murmur list, and would be wrongly marked as lost.
    const activeDbJobs = this.getActiveDbJobs();

    const whisperJobs = await this.fetchWhisperJobs();
    if (!whisperJobs) {
      throw new Error("Murmur service unavailable");
    }

    const activeJobsMap = new Map(activeDbJobs.map((j) => [j.id, j]));

    await this.syncWhisperJobsToDb(whisperJobs, activeJobsMap);
    await this.syncDbJobsToWhisper(activeDbJobs, whisperJobs);
  }

  /** Fetches Murmur's job list; null when the service is unreachable or the response is malformed. */
  private async fetchWhisperJobs(): Promise<WhisperJob[] | null> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`);
      if (!response.ok) {
        console.warn("[Sync] Whisper service unavailable");
        return null;
      }
      const { jobs } = (await response.json()) as { jobs?: unknown };
      if (!Array.isArray(jobs)) {
        console.warn("[Sync] Unexpected /jobs response from Whisper service");
        return null;
      }
      return jobs as WhisperJob[];
    } catch {
      return null;
    }
  }

  /** Transcriptions that have not reached a terminal state. */
  private getActiveDbJobs(): ActiveDbJob[] {
    return this.db
      .query<ActiveDbJob, []>(
        "SELECT id, whisper_job_id, filename, status FROM transcriptions WHERE status IN ('uploading', 'processing', 'transcribing')",
      )
      .all();
  }

  /** For each Murmur job: reattach streams to live jobs and sync terminal states locally. */
  private async syncWhisperJobsToDb(
    whisperJobs: WhisperJob[],
    activeJobsMap: Map<string, ActiveDbJob>,
  ) {
    // Index local jobs by Murmur job id once, instead of scanning per Murmur job.
    const byWhisperJobId = new Map<string, ActiveDbJob>();
    for (const job of activeJobsMap.values()) {
      if (job.whisper_job_id !== null && !byWhisperJobId.has(job.whisper_job_id)) {
        byWhisperJobId.set(job.whisper_job_id, job);
      }
    }

    for (const whisperJob of whisperJobs) {
      // Prefer the stored Murmur job id; fall back to the legacy scheme in
      // which our transcription id doubled as Murmur's job id.
      const localJob =
        byWhisperJobId.get(whisperJob.id) ?? activeJobsMap.get(whisperJob.id);

      if (!localJob) {
        await this.handleOrphanedWhisperJob(whisperJob.id);
        continue;
      }

      switch (whisperJob.status) {
        case "pending":
        case "processing":
        case "transcribing":
          // Reconnect to live jobs (e.g. after a restart) unless already streaming.
          if (!this.activeStreams.has(localJob.id)) {
            console.log(
              `[Sync] Reconnecting to active job ${localJob.id} (Murmur job ${whisperJob.id})`,
            );
            this.streamWhisperJob(localJob.id, whisperJob.id);
          }
          break;
        case "completed":
        case "failed":
          // Use our transcription ID, not Murmur's job ID.
          await this.syncCompletedJob(whisperJob, localJob.id);
          break;
      }
    }
  }

  /** Logs Murmur jobs that match no transcription (by id or whisper_job_id) in our DB. */
  private async handleOrphanedWhisperJob(jobId: string) {
    const jobExists = this.db
      .query<{ id: string }, [string, string]>(
        "SELECT id FROM transcriptions WHERE id = ? OR whisper_job_id = ?",
      )
      .get(jobId, jobId);

    if (!jobExists) {
      // Not our job - Murmur will keep it until explicitly deleted.
      console.warn(
        `[Sync] Found orphaned job ${jobId} in Murmur (not in our DB)`,
      );
    }
  }

  /** Copies the terminal state of a finished Murmur job onto our transcription. */
  private async syncCompletedJob(
    whisperJob: WhisperJob,
    transcriptionId: string,
  ) {
    try {
      const details = await this.fetchJobDetails(whisperJob.id);
      if (!details) return;

      if (details.status === "completed") {
        await this.completeTranscription(
          transcriptionId,
          whisperJob.id,
          "[Sync]",
        );
      } else if (details.status === "failed") {
        this.failTranscription(
          transcriptionId,
          details.error_message ?? "Transcription failed",
          ErrorCode.TRANSCRIPTION_FAILED,
        );
      }

      // Job persists in Murmur until explicitly deleted - we just sync state.
    } catch (error) {
      console.warn(
        `[Sync] Failed to retrieve details for job ${whisperJob.id}`,
        error,
      );
    }
  }

  /** Fetches one Murmur job; null on a non-2xx response. */
  private async fetchJobDetails(jobId: string): Promise<WhisperJob | null> {
    const response = await fetch(`${this.serviceUrl}/transcribe/${jobId}`);
    if (!response.ok) return null;
    return (await response.json()) as WhisperJob;
  }

  /** Fails local jobs that have a Murmur job id Murmur no longer knows about. */
  private async syncDbJobsToWhisper(
    activeDbJobs: ActiveDbJob[],
    whisperJobs: WhisperJob[],
  ) {
    const knownJobIds = new Set(whisperJobs.map((wj) => wj.id));

    for (const localJob of activeDbJobs) {
      // Jobs without a Murmur id may still be uploading; leave them alone.
      if (!localJob.whisper_job_id) continue;

      // Match by whisper_job_id or the legacy id scheme.
      const whisperHasJob =
        knownJobIds.has(localJob.whisper_job_id) ||
        knownJobIds.has(localJob.id);

      if (!whisperHasJob) {
        this.failTranscription(
          localJob.id,
          "Job lost - whisper service may have restarted",
          ErrorCode.WHISPER_SERVICE_ERROR,
        );
      }
    }
  }

  /**
   * Deletes uploaded audio for completed/failed transcriptions whose row has
   * not been updated for STALE_FILE_AGE_SECONDS. Best-effort: files that are
   * already gone are skipped silently and other errors are logged.
   */
  async cleanupStaleFiles(): Promise<void> {
    try {
      const cutoff = Math.floor(Date.now() / 1000) - STALE_FILE_AGE_SECONDS;
      const staleTranscriptions = this.db
        .query<{ filename: string }, [number]>(
          `SELECT filename FROM transcriptions
           WHERE status IN ('completed', 'failed')
           AND updated_at < ?`,
        )
        .all(cutoff);

      for (const { filename } of staleTranscriptions) {
        try {
          await unlink(`${UPLOADS_DIR}/${filename}`);
        } catch (error) {
          const code =
            error instanceof Error && "code" in error ? error.code : undefined;
          // ENOENT: removed on an earlier run - nothing to do.
          if (code !== "ENOENT") {
            console.warn(`[Cleanup] Failed to remove ${filename}:`, error);
          }
        }
      }
    } catch (error) {
      console.error("[Cleanup] Failed:", error);
    }
  }
}