馃 distributed transcription service thistle.dunkirk.sh
/**
 * Transcription job orchestration: submits uploaded audio to the Murmur
 * (Whisper) service, follows its progress over SSE, mirrors job state into the
 * `transcriptions` table and fans updates out to in-process listeners.
 */
import type { Database } from "bun:sqlite";
import { createEventSource } from "eventsource-client";
import { ErrorCode } from "./errors";
import { saveTranscriptVTT } from "./transcript-storage";
import { cleanVTT } from "./vtt-cleaner";

// Constants

/** Maximum accepted upload size in bytes (100MB). */
export const MAX_FILE_SIZE = 100 * 1024 * 1024;

/** Maximum length of an error message persisted for a failed transcription. */
export const MAX_ERROR_LENGTH = 255;

/** How long listeners are retained after a job finishes or loses its last listener. */
const LISTENER_CLEANUP_DELAY_MS = 5 * 60 * 1000;

/** WhisperKit special tokens (`<|...|>`) that appear in intermediate transcripts. */
const WHISPER_SPECIAL_TOKEN_RE = /<\|[^|]+\|>/g;

/** Error text used when Murmur reports a failure without a message. */
const DEFAULT_FAILURE_MESSAGE = "Transcription failed";

// Types

/** Lifecycle states of a transcription, as stored in the DB and sent to listeners. */
export type TranscriptionStatus =
  | "uploading"
  | "processing"
  | "transcribing"
  | "finalizing"
  | "completed"
  | "failed";

/** Payload broadcast to listeners whenever a transcription changes. */
export interface TranscriptionUpdate {
  status: TranscriptionStatus;
  progress: number;
  transcript?: string;
  error_message?: string;
  error_code?: string;
}

/** A job as reported by Murmur (`/jobs`, `/transcribe/:id` and the SSE stream). */
export interface WhisperJob {
  id: string;
  status: string;
  progress?: number;
  transcript?: string;
  error_message?: string;
}

/** Row shape of an in-flight transcription read from the database. */
interface ActiveDbJob {
  id: string;
  whisper_job_id: string | null;
  filename: string;
  status: string;
}

/** Caps an error message at MAX_ERROR_LENGTH characters. */
function truncateError(message: string): string {
  return message.substring(0, MAX_ERROR_LENGTH);
}

/** Clamps a Murmur-reported progress value into 0..100; a missing value counts as 0. */
function clampProgress(progress: number | undefined): number {
  return Math.max(0, Math.min(100, progress ?? 0));
}

/**
 * Extracts a message from the payload of an SSE `error` event.
 * JSON payloads contribute their string `error` field; non-JSON payloads
 * (e.g. a plain "Job not found") are used verbatim.
 */
function parseStreamError(data: string): string {
  let parsed: unknown;
  try {
    parsed = JSON.parse(data);
  } catch {
    return data || DEFAULT_FAILURE_MESSAGE;
  }
  if (
    typeof parsed === "object" &&
    parsed !== null &&
    "error" in parsed &&
    typeof parsed.error === "string" &&
    parsed.error !== ""
  ) {
    return parsed.error;
  }
  return DEFAULT_FAILURE_MESSAGE;
}

/**
 * Per-transcription pub/sub for real-time updates.
 *
 * Listener sets are dropped LISTENER_CLEANUP_DELAY_MS after a job emits a
 * terminal status or after its last listener unsubscribes, unless a new
 * listener subscribes in the meantime.
 */
export class TranscriptionEventEmitter {
  private listeners = new Map<
    string,
    Set<(data: TranscriptionUpdate) => void>
  >();
  private cleanupTimers = new Map<string, ReturnType<typeof setTimeout>>();

  /** Subscribes `callback` to updates for `transcriptionId` and cancels any pending cleanup. */
  on(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    let callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) {
      callbacks = new Set();
      this.listeners.set(transcriptionId, callbacks);
    }
    callbacks.add(callback);

    // A live subscriber must not be wiped out by an earlier-scheduled cleanup.
    this.cancelCleanup(transcriptionId);
  }

  /** Unsubscribes `callback`; schedules cleanup once no listeners remain. */
  off(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    const callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) return;

    callbacks.delete(callback);
    if (callbacks.size === 0) {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /** Delivers `data` to every listener of `transcriptionId`; terminal statuses schedule cleanup. */
  emit(transcriptionId: string, data: TranscriptionUpdate) {
    const callbacks = this.listeners.get(transcriptionId);
    if (callbacks) {
      for (const callback of callbacks) {
        callback(data);
      }
    }

    if (data.status === "completed" || data.status === "failed") {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /** Whether at least one listener is subscribed to `transcriptionId`. */
  hasListeners(transcriptionId: string): boolean {
    return (this.listeners.get(transcriptionId)?.size ?? 0) > 0;
  }

  /**
   * (Re)starts the cleanup countdown for `transcriptionId`. Any timer already
   * pending is cleared first; otherwise it would be orphaned (on() could no
   * longer cancel it) and later delete a new subscriber's listeners.
   */
  private scheduleCleanup(transcriptionId: string) {
    this.cancelCleanup(transcriptionId);

    const timer = setTimeout(() => {
      this.listeners.delete(transcriptionId);
      this.cleanupTimers.delete(transcriptionId);
    }, LISTENER_CLEANUP_DELAY_MS);

    this.cleanupTimers.set(transcriptionId, timer);
  }

  /** Cancels the pending cleanup timer for `transcriptionId`, if any. */
  private cancelCleanup(transcriptionId: string) {
    const timer = this.cleanupTimers.get(transcriptionId);
    if (timer) {
      clearTimeout(timer);
      this.cleanupTimers.delete(transcriptionId);
    }
  }
}

/**
 * Drives transcriptions through the Murmur service and keeps the local
 * `transcriptions` table and listeners in sync with it.
 */
export class WhisperServiceManager {
  private activeStreams = new Map<
    string,
    ReturnType<typeof createEventSource>
  >();
  /** Transcription ids with an open (or opening) SSE stream; prevents duplicate streams. */
  private streamLocks = new Set<string>();

  constructor(
    private serviceUrl: string,
    private db: Database,
    private events: TranscriptionEventEmitter,
  ) {}

  /** Returns true when Murmur answers `GET /jobs` with a 2xx status. */
  async checkHealth(): Promise<boolean> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`, {
        method: "GET",
      });
      return response.ok;
    } catch {
      return false;
    }
  }

  /**
   * Uploads `./uploads/<filename>` to Murmur, records the returned job id and
   * starts streaming progress. Never throws: failures are persisted and
   * broadcast as a "failed" update.
   */
  async startTranscription(
    transcriptionId: string,
    filename: string,
  ): Promise<void> {
    try {
      this.publishStatus(transcriptionId, "processing", 10, false);

      const filePath = `./uploads/${filename}`;
      const fileBuffer = await Bun.file(filePath).arrayBuffer();

      const formData = new FormData();
      formData.append(
        "file",
        new File([fileBuffer], filename, { type: "audio/mpeg" }),
      );

      const response = await fetch(`${this.serviceUrl}/transcribe`, {
        method: "POST",
        body: formData,
      });

      if (!response.ok) {
        const errorText = await response.text().catch(() => "Unknown error");
        throw new Error(
          `Whisper service returned ${response.status}: ${errorText}`,
        );
      }

      const { job_id } = (await response.json()) as { job_id?: string };
      if (!job_id) {
        // Message contains "Whisper service" so it maps to WHISPER_SERVICE_ERROR below.
        throw new Error("Whisper service response did not include a job_id");
      }

      // Murmur assigns its own job id; keep it for reconnects and VTT retrieval.
      this.db.run("UPDATE transcriptions SET whisper_job_id = ? WHERE id = ?", [
        job_id,
        transcriptionId,
      ]);

      this.streamWhisperJob(transcriptionId, job_id);
    } catch (error) {
      console.error(
        `[Transcription] Failed to start ${transcriptionId}:`,
        error,
      );
      const errorMessage =
        error instanceof Error ? error.message : "Unknown error";
      const errorCode =
        error instanceof Error && error.message.includes("Whisper service")
          ? ErrorCode.WHISPER_SERVICE_ERROR
          : ErrorCode.TRANSCRIPTION_FAILED;

      this.markFailed(transcriptionId, errorMessage, errorCode);
    }
  }

  /**
   * Opens an SSE stream for Murmur job `jobId` and routes its messages to
   * handleWhisperUpdate. A no-op if a stream for `transcriptionId` is already open.
   */
  private streamWhisperJob(transcriptionId: string, jobId: string) {
    if (this.streamLocks.has(transcriptionId)) {
      return;
    }
    this.streamLocks.add(transcriptionId);

    const es = createEventSource({
      url: `${this.serviceUrl}/transcribe/${jobId}/stream`,
      onMessage: async ({ event, data }) => {
        try {
          // Murmur signals problems such as "Job not found" with an "error" event.
          if (event === "error") {
            const message = parseStreamError(data);
            console.error(
              `[Stream] Whisper service error for ${transcriptionId}:`,
              message,
            );
            this.markFailed(
              transcriptionId,
              message,
              ErrorCode.TRANSCRIPTION_FAILED,
            );
            this.closeStream(transcriptionId);
            return;
          }

          const update = JSON.parse(data) as WhisperJob;
          await this.handleWhisperUpdate(transcriptionId, update, jobId);
        } catch (err) {
          console.error(
            `[Stream] Error processing update for ${transcriptionId}:`,
            err,
          );
        }
      },
    });

    this.activeStreams.set(transcriptionId, es);
  }

  /**
   * Applies one Murmur status update to the DB and listeners.
   *
   * @param jobId Murmur job id the update belongs to; when omitted it is read
   *   from `transcriptions.whisper_job_id`.
   */
  private async handleWhisperUpdate(
    transcriptionId: string,
    update: WhisperJob,
    jobId?: string,
  ) {
    switch (update.status) {
      case "pending":
        // Queued in Murmur; nothing to report yet.
        return;

      case "processing": {
        // Murmur is initializing (file I/O, WhisperKit setup) - no transcript yet.
        this.publishStatus(
          transcriptionId,
          "processing",
          clampProgress(update.progress),
        );
        return;
      }

      case "transcribing": {
        const progress = clampProgress(update.progress);
        // Report "processing" until WhisperKit delivers real progress.
        const status: TranscriptionStatus =
          progress === 0 ? "processing" : "transcribing";
        const transcript = (update.transcript ?? "")
          .replace(WHISPER_SPECIAL_TOKEN_RE, "")
          .trim();

        this.updateTranscription(transcriptionId, { status, progress });
        this.events.emit(transcriptionId, {
          status,
          progress,
          transcript: transcript || undefined,
        });
        return;
      }

      case "completed": {
        this.publishStatus(transcriptionId, "finalizing", 100);

        const whisperJobId = jobId ?? this.getWhisperJobId(transcriptionId);
        if (whisperJobId) {
          await this.fetchAndSaveVTT(
            transcriptionId,
            whisperJobId,
            "[Transcription]",
          );
        }

        this.publishStatus(transcriptionId, "completed", 100);
        // Close stream - keep audio file for playback.
        this.closeStream(transcriptionId);
        return;
      }

      case "failed": {
        this.markFailed(
          transcriptionId,
          update.error_message ?? DEFAULT_FAILURE_MESSAGE,
          ErrorCode.TRANSCRIPTION_FAILED,
        );
        // Only close the stream - failed jobs stay in Murmur for debugging.
        this.closeStream(transcriptionId);
        return;
      }

      default:
        // Unknown statuses are ignored.
        return;
    }
  }

  /** Reads the Murmur job id stored for a transcription, or null if none. */
  private getWhisperJobId(transcriptionId: string): string | null {
    return (
      this.db
        .query<{ whisper_job_id: string | null }, [string]>(
          "SELECT whisper_job_id FROM transcriptions WHERE id = ?",
        )
        .get(transcriptionId)?.whisper_job_id ?? null
    );
  }

  /**
   * Downloads the VTT transcript of Murmur job `whisperJobId`, cleans it and
   * saves it for `transcriptionId`. Best-effort: failures are logged, not thrown.
   */
  private async fetchAndSaveVTT(
    transcriptionId: string,
    whisperJobId: string,
    logPrefix: string,
  ): Promise<void> {
    try {
      const vttResponse = await fetch(
        `${this.serviceUrl}/transcribe/${whisperJobId}?format=vtt`,
      );
      if (!vttResponse.ok) {
        console.warn(
          `${logPrefix} Murmur returned ${vttResponse.status} for VTT of ${transcriptionId}`,
        );
        return;
      }
      const vttContent = await vttResponse.text();
      const cleanedVTT = await cleanVTT(transcriptionId, vttContent);
      await saveTranscriptVTT(transcriptionId, cleanedVTT);
      // Only bumps updated_at; no other column changes.
      this.updateTranscription(transcriptionId, {});
    } catch (error) {
      console.warn(
        `${logPrefix} Failed to fetch VTT for ${transcriptionId}:`,
        error,
      );
    }
  }

  /** Persists `status`/`progress` and, unless `emit` is false, broadcasts the same pair. */
  private publishStatus(
    transcriptionId: string,
    status: TranscriptionStatus,
    progress: number,
    emit = true,
  ) {
    this.updateTranscription(transcriptionId, { status, progress });
    if (emit) {
      this.events.emit(transcriptionId, { status, progress });
    }
  }

  /** Persists a failure (message truncated to MAX_ERROR_LENGTH) and broadcasts it. */
  private markFailed(
    transcriptionId: string,
    message: string,
    errorCode: string,
  ) {
    const errorMessage = truncateError(message);
    this.updateTranscription(transcriptionId, {
      status: "failed",
      error_message: errorMessage,
    });
    this.events.emit(transcriptionId, {
      status: "failed",
      progress: 0,
      error_message: errorMessage,
      error_code: errorCode,
    });
  }

  /** Closes the SSE stream for `transcriptionId` (if any) and releases its lock. */
  private closeStream(transcriptionId: string) {
    const es = this.activeStreams.get(transcriptionId);
    if (es) {
      es.close();
      this.activeStreams.delete(transcriptionId);
    }
    this.streamLocks.delete(transcriptionId);
  }

  /**
   * Updates the given columns of a transcription row and always bumps
   * `updated_at` (unix seconds). `vttContent` is accepted but not persisted.
   */
  private updateTranscription(
    transcriptionId: string,
    data: {
      status?: TranscriptionStatus;
      progress?: number;
      error_message?: string;
      vttContent?: string;
    },
  ) {
    const updates: string[] = [];
    const values: (string | number)[] = [];

    if (data.status !== undefined) {
      updates.push("status = ?");
      values.push(data.status);
    }
    if (data.progress !== undefined) {
      updates.push("progress = ?");
      values.push(data.progress);
    }
    if (data.error_message !== undefined) {
      updates.push("error_message = ?");
      values.push(data.error_message);
    }

    updates.push("updated_at = ?");
    values.push(Math.floor(Date.now() / 1000));

    values.push(transcriptionId);

    this.db.run(
      `UPDATE transcriptions SET ${updates.join(", ")} WHERE id = ?`,
      values,
    );
  }

  /**
   * Reconciles local in-flight transcriptions with Murmur's job list:
   * reconnects streams, finalizes finished jobs and fails jobs Murmur lost.
   */
  async syncWithWhisper(): Promise<void> {
    // Snapshot the DB *before* asking Murmur. Any job whose whisper_job_id is in
    // this snapshot was submitted before the /jobs request, so it cannot be
    // wrongly treated as "lost" because it was created mid-sync.
    const activeDbJobs = this.getActiveDbJobs();

    const whisperJobs = await this.fetchWhisperJobs();
    if (!whisperJobs) {
      console.warn("[Sync] Murmur service unavailable");
      return;
    }

    const activeJobsMap = new Map(activeDbJobs.map((j) => [j.id, j]));

    await this.syncWhisperJobsToDb(whisperJobs, activeJobsMap);
    await this.syncDbJobsToWhisper(activeDbJobs, whisperJobs);
  }

  /** Fetches Murmur's job list; returns null when unreachable or malformed. */
  private async fetchWhisperJobs(): Promise<WhisperJob[] | null> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`);
      if (!response.ok) {
        console.warn("[Sync] Whisper service unavailable");
        return null;
      }
      const body = (await response.json()) as { jobs?: unknown };
      return Array.isArray(body.jobs) ? (body.jobs as WhisperJob[]) : null;
    } catch {
      return null;
    }
  }

  /**
   * Transcriptions that are not yet terminal. "finalizing" is included so a
   * job interrupted while its VTT was being saved is finished on the next sync.
   */
  private getActiveDbJobs(): ActiveDbJob[] {
    return this.db
      .query<ActiveDbJob, []>(
        "SELECT id, whisper_job_id, filename, status FROM transcriptions WHERE status IN ('uploading', 'processing', 'transcribing', 'finalizing')",
      )
      .all();
  }

  /** Matches each Murmur job to a local transcription and acts on its status. */
  private async syncWhisperJobsToDb(
    whisperJobs: WhisperJob[],
    activeJobsMap: Map<string, ActiveDbJob>,
  ) {
    // Index local jobs by Murmur job id once (first match wins) instead of
    // scanning every local job for every Murmur job.
    const byWhisperJobId = new Map<string, ActiveDbJob>();
    for (const job of activeJobsMap.values()) {
      if (job.whisper_job_id && !byWhisperJobId.has(job.whisper_job_id)) {
        byWhisperJobId.set(job.whisper_job_id, job);
      }
    }

    for (const whisperJob of whisperJobs) {
      // Prefer whisper_job_id; legacy rows used our transcription id as Murmur's id.
      const localJob =
        byWhisperJobId.get(whisperJob.id) ?? activeJobsMap.get(whisperJob.id);

      if (!localJob) {
        await this.handleOrphanedWhisperJob(whisperJob.id);
        continue;
      }

      switch (whisperJob.status) {
        case "pending":
        case "processing":
        case "transcribing":
          // Reattach to jobs still running in Murmur (e.g. after our restart).
          if (!this.activeStreams.has(localJob.id)) {
            console.log(
              `[Sync] Reconnecting to active job ${localJob.id} (Murmur job ${whisperJob.id})`,
            );
            this.streamWhisperJob(localJob.id, whisperJob.id);
          }
          break;
        case "completed":
        case "failed":
          // Use our transcription ID, not Murmur's job ID.
          await this.syncCompletedJob(whisperJob, localJob.id);
          break;
      }
    }
  }

  /** Logs Murmur jobs that have no matching row (by id or whisper_job_id) in our DB. */
  private async handleOrphanedWhisperJob(jobId: string) {
    const jobExists = this.db
      .query<{ id: string }, [string, string]>(
        "SELECT id FROM transcriptions WHERE id = ? OR whisper_job_id = ?",
      )
      .get(jobId, jobId);

    if (!jobExists) {
      // Not our job - Murmur will keep it until explicitly deleted.
      console.warn(
        `[Sync] Found orphaned job ${jobId} in Murmur (not in our DB)`,
      );
    }
  }

  /**
   * Brings a transcription whose Murmur job has finished into its terminal
   * state: "finalizing" → VTT download → "completed", or "failed".
   */
  private async syncCompletedJob(
    whisperJob: WhisperJob,
    transcriptionId: string,
  ) {
    try {
      const details = await this.fetchJobDetails(whisperJob.id);
      if (!details) return;

      if (details.status === "completed") {
        this.publishStatus(transcriptionId, "finalizing", 100);
        await this.fetchAndSaveVTT(transcriptionId, whisperJob.id, "[Sync]");
        this.publishStatus(transcriptionId, "completed", 100);
        this.closeStream(transcriptionId);
      } else if (details.status === "failed") {
        this.markFailed(
          transcriptionId,
          details.error_message ?? DEFAULT_FAILURE_MESSAGE,
          ErrorCode.TRANSCRIPTION_FAILED,
        );
        this.closeStream(transcriptionId);
      }

      // Job persists in Murmur until explicitly deleted - we just sync state.
    } catch (error) {
      console.warn(
        `[Sync] Failed to retrieve details for job ${whisperJob.id}`,
        error,
      );
    }
  }

  /** Fetches a single Murmur job; null on a non-2xx response. */
  private async fetchJobDetails(jobId: string): Promise<WhisperJob | null> {
    const response = await fetch(`${this.serviceUrl}/transcribe/${jobId}`);
    if (!response.ok) return null;
    return (await response.json()) as WhisperJob;
  }

  /** Fails local jobs that were submitted to Murmur but no longer appear in its list. */
  private async syncDbJobsToWhisper(
    activeDbJobs: ActiveDbJob[],
    whisperJobs: WhisperJob[],
  ) {
    const knownIds = new Set(whisperJobs.map((wj) => wj.id));

    for (const localJob of activeDbJobs) {
      // Jobs without whisper_job_id were never (or not yet) submitted; skip them.
      if (
        localJob.whisper_job_id &&
        !knownIds.has(localJob.whisper_job_id) &&
        !knownIds.has(localJob.id)
      ) {
        this.markFailed(
          localJob.id,
          "Job lost - whisper service may have restarted",
          ErrorCode.TRANSCRIPTION_FAILED,
        );
        this.closeStream(localJob.id);
      }
    }
  }

  /**
   * For completed/failed transcriptions last updated more than 24 hours ago,
   * overwrites `./uploads/<filename>` with an empty file (errors ignored).
   * NOTE(review): this truncates rather than deletes the upload; confirm intended.
   */
  async cleanupStaleFiles(): Promise<void> {
    try {
      const cutoff = Math.floor(Date.now() / 1000) - 24 * 60 * 60;
      const staleTranscriptions = this.db
        .query<{ filename: string }, [number]>(
          `SELECT filename FROM transcriptions
         WHERE status IN ('completed', 'failed')
         AND updated_at < ?`,
        )
        .all(cutoff);

      for (const { filename } of staleTranscriptions) {
        const filePath = `./uploads/${filename}`;
        await Bun.write(filePath, "").catch(() => {});
      }
    } catch (error) {
      console.error("[Cleanup] Failed:", error);
    }
  }
}