馃 distributed transcription service thistle.dunkirk.sh
at v0.1.0 17 kB view raw
import type { Database } from "bun:sqlite";
import { createEventSource } from "eventsource-client";
import { ErrorCode } from "./errors";
import { saveTranscriptVTT } from "./transcript-storage";
import { cleanVTT } from "./vtt-cleaner";

// Constants
export const MAX_FILE_SIZE = 100 * 1024 * 1024; // 100MB
export const MAX_ERROR_LENGTH = 255;

/** How long listeners are retained after the last unsubscribe or a terminal event. */
const LISTENER_CLEANUP_DELAY_MS = 5 * 60 * 1000;

/** Message used when the whisper service reports a failure without a reason. */
const DEFAULT_FAILURE_MESSAGE = "Transcription failed";

// Types
export type TranscriptionStatus =
	| "uploading"
	| "processing"
	| "transcribing"
	| "finalizing"
	| "completed"
	| "failed";

/** Payload delivered to subscribers of a transcription's progress. */
export interface TranscriptionUpdate {
	status: TranscriptionStatus;
	progress: number;
	transcript?: string;
	error_message?: string;
	error_code?: string;
}

/** Job record as reported by the whisper (Murmur) service. */
export interface WhisperJob {
	id: string;
	status: string;
	progress?: number;
	transcript?: string;
	error_message?: string;
}

/** Row shape returned by the "active jobs" query in `getActiveDbJobs`. */
type ActiveDbJob = {
	id: string;
	whisper_job_id: string | null;
	filename: string;
	status: string;
};

/** Caps an error message at MAX_ERROR_LENGTH characters. */
function truncateErrorMessage(message: string): string {
	return message.substring(0, MAX_ERROR_LENGTH);
}

/** Normalises a reported progress value into the range 0..100 (missing => 0). */
function clampProgress(progress: number | undefined): number {
	return Math.min(100, Math.max(0, progress ?? 0));
}

/**
 * Event emitter for real-time transcription updates with automatic cleanup.
 *
 * Listeners are keyed by transcription id. They are dropped
 * LISTENER_CLEANUP_DELAY_MS after the last listener unsubscribes or after a
 * terminal ("completed" / "failed") event is emitted, unless someone
 * subscribes again in the meantime.
 */
export class TranscriptionEventEmitter {
	private listeners = new Map<
		string,
		Set<(data: TranscriptionUpdate) => void>
	>();
	private cleanupTimers = new Map<string, ReturnType<typeof setTimeout>>();

	/** Subscribes `callback` to updates for `transcriptionId`. */
	on(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
		let callbacks = this.listeners.get(transcriptionId);
		if (!callbacks) {
			callbacks = new Set();
			this.listeners.set(transcriptionId, callbacks);
		}
		callbacks.add(callback);

		// A new subscriber cancels any pending cleanup for this transcription
		this.cancelCleanup(transcriptionId);
	}

	/** Unsubscribes `callback`; schedules cleanup when no listeners remain. */
	off(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
		const callbacks = this.listeners.get(transcriptionId);
		if (!callbacks) return;

		callbacks.delete(callback);
		if (callbacks.size === 0) {
			this.scheduleCleanup(transcriptionId);
		}
	}

	/**
	 * Delivers `data` to every listener of `transcriptionId`.
	 *
	 * A listener that throws is logged and skipped so the remaining listeners
	 * still receive the update and terminal cleanup is still scheduled.
	 */
	emit(transcriptionId: string, data: TranscriptionUpdate) {
		const callbacks = this.listeners.get(transcriptionId);
		if (callbacks) {
			for (const callback of callbacks) {
				try {
					callback(data);
				} catch (error) {
					console.error(
						`[Events] Listener for ${transcriptionId} threw:`,
						error,
					);
				}
			}
		}

		// Auto-cleanup completed/failed jobs after emission
		if (data.status === "completed" || data.status === "failed") {
			this.scheduleCleanup(transcriptionId);
		}
	}

	/** Returns true when at least one listener is registered for the id. */
	hasListeners(transcriptionId: string): boolean {
		return (this.listeners.get(transcriptionId)?.size ?? 0) > 0;
	}

	/** Cancels and forgets the pending cleanup timer for the id, if any. */
	private cancelCleanup(transcriptionId: string) {
		const timer = this.cleanupTimers.get(transcriptionId);
		if (timer !== undefined) {
			clearTimeout(timer);
			this.cleanupTimers.delete(transcriptionId);
		}
	}

	/** Drops the listeners for the id after LISTENER_CLEANUP_DELAY_MS. */
	private scheduleCleanup(transcriptionId: string) {
		// Replace any timer that is already pending. Without this, an older
		// untracked timer could fire after a client re-subscribed and silently
		// remove that client's listeners.
		this.cancelCleanup(transcriptionId);

		const timer = setTimeout(() => {
			this.listeners.delete(transcriptionId);
			this.cleanupTimers.delete(transcriptionId);
		}, LISTENER_CLEANUP_DELAY_MS);

		this.cleanupTimers.set(transcriptionId, timer);
	}
}

/**
 * Whisper service manager.
 *
 * Submits audio to the whisper (Murmur) HTTP service, follows each job's SSE
 * stream, mirrors job state into the `transcriptions` table and publishes
 * updates through the supplied TranscriptionEventEmitter.
 */
export class WhisperServiceManager {
	private activeStreams = new Map<
		string,
		ReturnType<typeof createEventSource>
	>();
	private streamLocks = new Set<string>();

	constructor(
		private serviceUrl: string,
		private db: Database,
		private events: TranscriptionEventEmitter,
	) {}

	/** Returns true when `GET {serviceUrl}/jobs` answers with a 2xx status. */
	async checkHealth(): Promise<boolean> {
		try {
			const response = await fetch(`${this.serviceUrl}/jobs`, {
				method: "GET",
			});
			return response.ok;
		} catch {
			return false;
		}
	}

	/**
	 * Uploads `./uploads/{filename}` to the whisper service and starts
	 * streaming the resulting job.
	 *
	 * Never rejects for service/file errors: failures are recorded on the
	 * transcription row and emitted as a "failed" update instead.
	 */
	async startTranscription(
		transcriptionId: string,
		filename: string,
	): Promise<void> {
		try {
			// Update status to processing
			this.updateTranscription(transcriptionId, {
				status: "processing",
				progress: 10,
			});

			// Read file from disk
			const filePath = `./uploads/${filename}`;
			const fileBuffer = await Bun.file(filePath).arrayBuffer();

			// Create form data for the Murmur server
			const formData = new FormData();
			const file = new File([fileBuffer], filename, { type: "audio/mpeg" });
			formData.append("file", file);

			// Call the Murmur server to start transcription
			const response = await fetch(`${this.serviceUrl}/transcribe`, {
				method: "POST",
				body: formData,
			});

			if (!response.ok) {
				const errorText = await response.text().catch(() => "Unknown error");
				throw new Error(
					`Whisper service returned ${response.status}: ${errorText}`,
				);
			}

			const { job_id: jobId } = (await response.json()) as {
				job_id?: string;
			};
			if (!jobId) {
				// Without an id there is nothing to track or stream
				throw new Error("Whisper service returned a response without a job_id");
			}

			// Store Murmur's job_id in our database for tracking
			this.db.run("UPDATE transcriptions SET whisper_job_id = ? WHERE id = ?", [
				jobId,
				transcriptionId,
			]);

			// Connect to SSE stream from Murmur (use the job_id returned by Murmur)
			this.streamWhisperJob(transcriptionId, jobId);
		} catch (error) {
			console.error(
				`[Transcription] Failed to start ${transcriptionId}:`,
				error,
			);
			const errorMessage =
				error instanceof Error ? error.message : "Unknown error";
			// Classify on the full message, before it is truncated for storage
			const errorCode =
				error instanceof Error && error.message.includes("Whisper service")
					? ErrorCode.WHISPER_SERVICE_ERROR
					: ErrorCode.TRANSCRIPTION_FAILED;

			this.failTranscription(transcriptionId, errorMessage, errorCode);
		}
	}

	/**
	 * Opens the SSE stream for a Murmur job and routes its messages.
	 * A second call for a transcription that is already streaming is a no-op.
	 */
	private streamWhisperJob(transcriptionId: string, jobId: string) {
		// Prevent duplicate streams using locks
		if (this.streamLocks.has(transcriptionId)) {
			return;
		}

		this.streamLocks.add(transcriptionId);

		const es = createEventSource({
			url: `${this.serviceUrl}/transcribe/${jobId}/stream`,
			onMessage: async ({ event, data }) => {
				try {
					// Handle "error" events from SSE (e.g., "Job not found")
					if (event === "error") {
						const errorData = JSON.parse(data) as { error?: unknown };
						const reason =
							typeof errorData.error === "string"
								? errorData.error
								: DEFAULT_FAILURE_MESSAGE;
						console.error(
							`[Stream] Whisper service error for ${transcriptionId}:`,
							reason,
						);

						// Mark the job as failed in our database and notify listeners
						this.failTranscription(transcriptionId, reason);
						this.closeStream(transcriptionId);
						return;
					}

					const update = JSON.parse(data) as WhisperJob;
					await this.handleWhisperUpdate(transcriptionId, update);
				} catch (err) {
					console.error(
						`[Stream] Error processing update for ${transcriptionId}:`,
						err,
					);
				}
			},
		});

		this.activeStreams.set(transcriptionId, es);
	}

	/** Applies one streamed job update to the database and event emitter. */
	private async handleWhisperUpdate(
		transcriptionId: string,
		update: WhisperJob,
	) {
		switch (update.status) {
			case "pending":
				// Initial status, no action needed
				return;

			case "processing": {
				// Murmur is initializing (file I/O, WhisperKit setup) - no transcript yet
				const progress = clampProgress(update.progress);

				this.updateTranscription(transcriptionId, {
					status: "processing",
					progress,
				});
				this.events.emit(transcriptionId, { status: "processing", progress });
				return;
			}

			case "transcribing": {
				// Active transcription with progress callbacks
				const progress = clampProgress(update.progress);

				// If progress is still 0, keep status as "processing" until real progress starts
				const status = progress === 0 ? "processing" : "transcribing";

				// Strip WhisperKit special tokens from intermediate transcript
				const transcript = (update.transcript ?? "")
					.replace(/<\|[^|]+\|>/g, "")
					.trim();

				this.updateTranscription(transcriptionId, { status, progress });
				this.events.emit(transcriptionId, {
					status,
					progress,
					transcript: transcript || undefined,
				});
				return;
			}

			case "completed": {
				// Set to finalizing state while we fetch and process the VTT
				this.updateTranscription(transcriptionId, {
					status: "finalizing",
					progress: 100,
				});
				this.events.emit(transcriptionId, {
					status: "finalizing",
					progress: 100,
				});

				// Fetch and save VTT file from Murmur
				const whisperJobId = this.db
					.query<{ whisper_job_id: string }, [string]>(
						"SELECT whisper_job_id FROM transcriptions WHERE id = ?",
					)
					.get(transcriptionId)?.whisper_job_id;

				if (whisperJobId) {
					await this.fetchAndSaveVtt(
						transcriptionId,
						whisperJobId,
						"Transcription",
					);
				}

				this.updateTranscription(transcriptionId, {
					status: "completed",
					progress: 100,
				});
				this.events.emit(transcriptionId, {
					status: "completed",
					progress: 100,
				});

				// Close stream - keep audio file for playback
				this.closeStream(transcriptionId);
				return;
			}

			case "failed":
				this.failTranscription(
					transcriptionId,
					update.error_message ?? DEFAULT_FAILURE_MESSAGE,
				);

				// Only close stream - keep failed jobs in Whisper for debugging
				this.closeStream(transcriptionId);
				return;
		}
	}

	/**
	 * Downloads the VTT for a Murmur job, cleans it and stores it.
	 * Best-effort: a non-2xx response is skipped and any error is logged
	 * under `[logTag]` rather than thrown.
	 */
	private async fetchAndSaveVtt(
		transcriptionId: string,
		whisperJobId: string,
		logTag: string,
	): Promise<void> {
		try {
			const vttResponse = await fetch(
				`${this.serviceUrl}/transcribe/${whisperJobId}?format=vtt`,
			);
			if (!vttResponse.ok) return;

			const vttContent = await vttResponse.text();
			const cleanedVTT = await cleanVTT(transcriptionId, vttContent);
			await saveTranscriptVTT(transcriptionId, cleanedVTT);

			// Empty update: only bumps updated_at
			this.updateTranscription(transcriptionId, {});
		} catch (error) {
			console.warn(
				`[${logTag}] Failed to fetch VTT for ${transcriptionId}:`,
				error,
			);
		}
	}

	/**
	 * Records a failure on the transcription row and notifies listeners.
	 * The message is truncated to MAX_ERROR_LENGTH so every failure path
	 * stores and emits a bounded message together with an error code.
	 */
	private failTranscription(
		transcriptionId: string,
		message: string,
		errorCode: string = ErrorCode.TRANSCRIPTION_FAILED,
	) {
		const errorMessage = truncateErrorMessage(message);

		this.updateTranscription(transcriptionId, {
			status: "failed",
			error_message: errorMessage,
		});

		this.events.emit(transcriptionId, {
			status: "failed",
			progress: 0,
			error_message: errorMessage,
			error_code: errorCode,
		});
	}

	/** Closes the SSE stream for the transcription (if any) and frees its lock. */
	private closeStream(transcriptionId: string) {
		const es = this.activeStreams.get(transcriptionId);
		if (es) {
			es.close();
			this.activeStreams.delete(transcriptionId);
		}
		this.streamLocks.delete(transcriptionId);
	}

	/**
	 * Writes the provided fields to the transcription row.
	 * `updated_at` is always set (Unix seconds), so an empty `data` object
	 * just touches the row. `vttContent` is accepted but not persisted here.
	 */
	private updateTranscription(
		transcriptionId: string,
		data: {
			status?: TranscriptionStatus;
			progress?: number;
			error_message?: string;
			vttContent?: string;
		},
	) {
		const updates: string[] = [];
		const values: (string | number)[] = [];

		if (data.status !== undefined) {
			updates.push("status = ?");
			values.push(data.status);
		}
		if (data.progress !== undefined) {
			updates.push("progress = ?");
			values.push(data.progress);
		}
		if (data.error_message !== undefined) {
			updates.push("error_message = ?");
			values.push(data.error_message);
		}

		updates.push("updated_at = ?");
		values.push(Math.floor(Date.now() / 1000));

		values.push(transcriptionId);

		// Column names above are fixed literals; only values are bound
		this.db.run(
			`UPDATE transcriptions SET ${updates.join(", ")} WHERE id = ?`,
			values,
		);
	}

	/**
	 * Reconciles local active transcriptions with the jobs Murmur reports:
	 * reconnects to running jobs, finalizes finished ones and fails local
	 * jobs that Murmur no longer knows about.
	 */
	async syncWithWhisper(): Promise<void> {
		const whisperJobs = await this.fetchWhisperJobs();
		if (!whisperJobs) {
			console.warn("[Sync] Murmur service unavailable");
			return;
		}

		const activeDbJobs = this.getActiveDbJobs();
		const activeJobsMap = new Map(activeDbJobs.map((j) => [j.id, j]));

		await this.syncWhisperJobsToDb(whisperJobs, activeJobsMap);
		await this.syncDbJobsToWhisper(activeDbJobs, whisperJobs);
	}

	/** Fetches the job list from Murmur; null when unavailable or malformed. */
	private async fetchWhisperJobs(): Promise<WhisperJob[] | null> {
		try {
			const response = await fetch(`${this.serviceUrl}/jobs`);
			if (!response.ok) {
				console.warn("[Sync] Whisper service unavailable");
				return null;
			}
			const { jobs } = (await response.json()) as { jobs?: unknown };
			return Array.isArray(jobs) ? (jobs as WhisperJob[]) : null;
		} catch {
			return null;
		}
	}

	/**
	 * Returns transcriptions whose status is uploading, processing or
	 * transcribing.
	 * NOTE(review): rows left in 'finalizing' are not selected, so they are
	 * not reconciled by syncWithWhisper - confirm this is intended.
	 */
	private getActiveDbJobs(): ActiveDbJob[] {
		return this.db
			.query<ActiveDbJob, []>(
				"SELECT id, whisper_job_id, filename, status FROM transcriptions WHERE status IN ('uploading', 'processing', 'transcribing')",
			)
			.all();
	}

	/** For each Murmur job, reconnects to or finalizes the matching local job. */
	private async syncWhisperJobsToDb(
		whisperJobs: WhisperJob[],
		activeJobsMap: Map<string, ActiveDbJob>,
	) {
		// Index local jobs by Murmur job id once instead of scanning per job.
		// The first row wins on duplicates, matching a linear find().
		const jobsByWhisperId = new Map<string, ActiveDbJob>();
		for (const job of activeJobsMap.values()) {
			if (job.whisper_job_id !== null && !jobsByWhisperId.has(job.whisper_job_id)) {
				jobsByWhisperId.set(job.whisper_job_id, job);
			}
		}

		for (const whisperJob of whisperJobs) {
			// Try to find by whisper_job_id first, then fall back to id
			// (legacy: our transcriptionId === whisperJob.id)
			const localJob =
				jobsByWhisperId.get(whisperJob.id) ?? activeJobsMap.get(whisperJob.id);

			if (!localJob) {
				await this.handleOrphanedWhisperJob(whisperJob.id);
				continue;
			}

			// Reconnect to active jobs on startup
			if (
				whisperJob.status === "processing" ||
				whisperJob.status === "transcribing"
			) {
				// Check if we're already streaming this job
				if (!this.activeStreams.has(localJob.id)) {
					console.log(
						`[Sync] Reconnecting to active job ${localJob.id} (Murmur job ${whisperJob.id})`,
					);
					this.streamWhisperJob(localJob.id, whisperJob.id);
				}
			} else if (
				whisperJob.status === "completed" ||
				whisperJob.status === "failed"
			) {
				// Use our transcription ID, not Murmur's job ID
				await this.syncCompletedJob(whisperJob, localJob.id);
			}
		}
	}

	/** Logs a warning when a Murmur job has no row at all in our database. */
	private async handleOrphanedWhisperJob(jobId: string) {
		// Check if this Murmur job_id exists in our DB (either as id or whisper_job_id)
		const jobExists = this.db
			.query<{ id: string }, [string, string]>(
				"SELECT id FROM transcriptions WHERE id = ? OR whisper_job_id = ?",
			)
			.get(jobId, jobId);

		if (!jobExists) {
			// Not our job - Murmur will keep it until explicitly deleted
			console.warn(
				`[Sync] Found orphaned job ${jobId} in Murmur (not in our DB)`,
			);
		}
	}

	/**
	 * Mirrors a finished Murmur job (completed or failed) onto the local
	 * transcription. Errors are logged, never thrown.
	 */
	private async syncCompletedJob(
		whisperJob: WhisperJob,
		transcriptionId: string,
	) {
		try {
			const details = await this.fetchJobDetails(whisperJob.id);
			if (!details) return;

			if (details.status === "completed") {
				// Fetch and save VTT file
				await this.fetchAndSaveVtt(transcriptionId, whisperJob.id, "Sync");

				// Set to finalizing state while we process
				this.updateTranscription(transcriptionId, {
					status: "finalizing",
					progress: 100,
				});
				this.events.emit(transcriptionId, {
					status: "finalizing",
					progress: 100,
				});

				// Then immediately mark as completed
				this.updateTranscription(transcriptionId, {
					status: "completed",
					progress: 100,
				});
				this.events.emit(transcriptionId, {
					status: "completed",
					progress: 100,
				});
			} else if (details.status === "failed") {
				this.failTranscription(
					transcriptionId,
					details.error_message ?? DEFAULT_FAILURE_MESSAGE,
				);
			}

			// Job persists in Murmur until explicitly deleted - we just sync state
		} catch (error) {
			console.warn(
				`[Sync] Failed to retrieve details for job ${whisperJob.id}`,
				error,
			);
		}
	}

	/** Fetches a single job from Murmur; null on a non-2xx response. */
	private async fetchJobDetails(jobId: string): Promise<WhisperJob | null> {
		const response = await fetch(`${this.serviceUrl}/transcribe/${jobId}`);
		if (!response.ok) return null;
		return (await response.json()) as WhisperJob;
	}

	/** Fails local jobs that were submitted to Murmur but are no longer listed. */
	private async syncDbJobsToWhisper(
		activeDbJobs: ActiveDbJob[],
		whisperJobs: WhisperJob[],
	) {
		const knownWhisperIds = new Set(whisperJobs.map((wj) => wj.id));

		for (const localJob of activeDbJobs) {
			// Jobs that never received a Murmur id are left untouched
			if (!localJob.whisper_job_id) continue;

			// Check if Murmur has this job (by whisper_job_id or legacy id match)
			const whisperHasJob =
				knownWhisperIds.has(localJob.whisper_job_id) ||
				knownWhisperIds.has(localJob.id);
			if (whisperHasJob) continue;

			// Job was lost from Murmur, mark as failed
			this.failTranscription(
				localJob.id,
				"Job lost - whisper service may have restarted",
			);

			// Release any stream/lock still held for the vanished job
			this.closeStream(localJob.id);
		}
	}

	/**
	 * Empties the uploaded audio of transcriptions that reached a terminal
	 * state more than 24 hours ago.
	 * NOTE(review): files are overwritten with empty content rather than
	 * removed - confirm truncation (not deletion) is the intended behaviour.
	 */
	async cleanupStaleFiles(): Promise<void> {
		try {
			// Find transcriptions older than 24 hours that are completed or failed
			const cutoff = Math.floor(Date.now() / 1000) - 24 * 60 * 60;
			const staleTranscriptions = this.db
				.query<{ filename: string }, [number]>(
					`SELECT filename FROM transcriptions
					WHERE status IN ('completed', 'failed')
					AND updated_at < ?`,
				)
				.all(cutoff);

			for (const { filename } of staleTranscriptions) {
				const filePath = `./uploads/${filename}`;
				try {
					await Bun.write(filePath, "");
				} catch (error) {
					// Best-effort per file: report it and carry on with the rest
					console.warn(`[Cleanup] Failed to clear ${filePath}:`, error);
				}
			}
		} catch (error) {
			console.error("[Cleanup] Failed:", error);
		}
	}
}