馃 distributed transcription service thistle.dunkirk.sh
import type { Database } from "bun:sqlite";
import { createEventSource } from "eventsource-client";
import { ErrorCode } from "./errors";

// Constants
export const MAX_FILE_SIZE = 25 * 1024 * 1024; // 25MB
/** Transcripts are cut to this many UTF-16 code units before being stored or emitted. */
export const MAX_TRANSCRIPT_LENGTH = 50000;
/** Error messages are cut to this many UTF-16 code units before being stored or emitted. */
export const MAX_ERROR_LENGTH = 255;

/** Directory (relative to the process working directory) holding uploaded audio. */
const UPLOAD_DIR = "./uploads";
/** How long a listener set survives after a terminal update or its last unsubscribe. */
const LISTENER_CLEANUP_DELAY_MS = 5 * 60 * 1000;
/** Completed/failed uploads last updated longer ago than this are truncated by cleanupStaleFiles(). */
const STALE_FILE_AGE_SECONDS = 24 * 60 * 60;

// Types
export type TranscriptionStatus =
  | "uploading"
  | "processing"
  | "completed"
  | "failed";

export interface TranscriptionUpdate {
  status: TranscriptionStatus;
  progress: number;
  transcript?: string;
  error_message?: string;
  error_code?: string;
}

export interface WhisperJob {
  id: string;
  status: string;
  progress?: number;
  transcript?: string;
  error_message?: string;
}

type TranscriptionListener = (data: TranscriptionUpdate) => void;

/** Row shape returned by WhisperServiceManager#getActiveDbJobs. */
type ActiveDbJob = { id: string; filename: string; status: string };

/**
 * In-process pub/sub for transcription updates, keyed by transcription id.
 *
 * A listener set is dropped LISTENER_CLEANUP_DELAY_MS after a terminal
 * ("completed"/"failed") update is emitted, or after its last listener
 * unsubscribes. Dropping is skipped if a new listener subscribes in the
 * meantime.
 */
export class TranscriptionEventEmitter {
  private listeners = new Map<string, Set<TranscriptionListener>>();
  private cleanupTimers = new Map<string, ReturnType<typeof setTimeout>>();

  /** Subscribe `callback` to updates for `transcriptionId` and cancel any pending cleanup. */
  on(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    let callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) {
      callbacks = new Set();
      this.listeners.set(transcriptionId, callbacks);
    }
    callbacks.add(callback);

    this.cancelCleanup(transcriptionId);
  }

  /** Unsubscribe `callback`; schedules cleanup once no listeners remain. */
  off(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    const callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) return;

    callbacks.delete(callback);
    if (callbacks.size === 0) {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /**
   * Deliver `data` to every listener of `transcriptionId`.
   *
   * Listener exceptions are logged and do not propagate. This keeps one broken
   * listener from starving the others or aborting the caller, such as the job
   * cleanup that follows a terminal update.
   */
  emit(transcriptionId: string, data: TranscriptionUpdate) {
    const callbacks = this.listeners.get(transcriptionId);
    if (callbacks) {
      // Snapshot so a listener may unsubscribe itself while being notified.
      for (const callback of [...callbacks]) {
        try {
          callback(data);
        } catch (error) {
          console.error(
            `[Events] Listener for ${transcriptionId} threw:`,
            error,
          );
        }
      }
    }

    // Auto-cleanup completed/failed jobs after emission
    if (data.status === "completed" || data.status === "failed") {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /** True when at least one listener is subscribed to `transcriptionId`. */
  hasListeners(transcriptionId: string): boolean {
    return (this.listeners.get(transcriptionId)?.size ?? 0) > 0;
  }

  /** (Re)start the cleanup countdown for `transcriptionId`. */
  private scheduleCleanup(transcriptionId: string) {
    // Replace rather than stack timers. An orphaned earlier timer would still
    // fire and drop listeners registered after on() cancelled the newer one.
    this.cancelCleanup(transcriptionId);

    const timer = setTimeout(() => {
      this.listeners.delete(transcriptionId);
      this.cleanupTimers.delete(transcriptionId);
    }, LISTENER_CLEANUP_DELAY_MS);

    this.cleanupTimers.set(transcriptionId, timer);
  }

  /** Cancel a pending cleanup for `transcriptionId`, if any. */
  private cancelCleanup(transcriptionId: string) {
    const timer = this.cleanupTimers.get(transcriptionId);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.cleanupTimers.delete(transcriptionId);
    }
  }
}

/**
 * Sends uploads to the external faster-whisper HTTP service, follows each
 * job over SSE, and mirrors progress into the `transcriptions` table and the
 * TranscriptionEventEmitter.
 *
 * The job id returned by `POST /transcribe` is Whisper's own. This module
 * never sends the transcription id to Whisper, so the two ids are not assumed
 * to be equal. Sync resolves Whisper ids through the streams this process has
 * open, and falls back to treating them as transcription ids.
 */
export class WhisperServiceManager {
  /** Open SSE connections to Whisper, keyed by transcription id. */
  private activeStreams = new Map<
    string,
    ReturnType<typeof createEventSource>
  >();
  /** Transcription ids with a stream opened; prevents duplicate streams. */
  private streamLocks = new Set<string>();
  /** Whisper job id of each streamed transcription, keyed by transcription id. */
  private streamJobIds = new Map<string, string>();
  /** Transcription ids whose submission to Whisper has not finished yet. */
  private pendingStarts = new Set<string>();

  constructor(
    private serviceUrl: string,
    private db: Database,
    private events: TranscriptionEventEmitter,
  ) {}

  /**
   * Submit the uploaded file `filename` to Whisper and start following it.
   *
   * Never rejects for Whisper/file errors. Those are recorded on the
   * transcription row (status "failed") and emitted with an error code. A
   * database error raised while recording the failure still propagates.
   */
  async startTranscription(
    transcriptionId: string,
    filename: string,
  ): Promise<void> {
    // Register first, so a concurrent syncWithWhisper() does not declare the
    // job "lost" while it is still being uploaded to Whisper.
    this.pendingStarts.add(transcriptionId);
    try {
      // Update status to processing
      this.updateTranscription(transcriptionId, {
        status: "processing",
        progress: 10,
      });

      // Read file from disk
      const filePath = `${UPLOAD_DIR}/${filename}`;
      const fileBuffer = await Bun.file(filePath).arrayBuffer();

      // Create form data for the faster-whisper server.
      // NOTE(review): the MIME type is always audio/mpeg, whatever was uploaded.
      const formData = new FormData();
      formData.append(
        "file",
        new File([fileBuffer], filename, { type: "audio/mpeg" }),
      );

      // Call the faster-whisper server to start transcription
      const response = await fetch(`${this.serviceUrl}/transcribe`, {
        method: "POST",
        body: formData,
      });

      if (!response.ok) {
        const errorText = await response.text().catch(() => "Unknown error");
        throw new Error(
          `Whisper service returned ${response.status}: ${errorText}`,
        );
      }

      const { job_id: jobId } = (await response.json()) as {
        job_id?: unknown;
      };
      // Without an id the stream URL would become ".../transcribe/undefined/stream".
      if (typeof jobId !== "string" && typeof jobId !== "number") {
        throw new Error("Whisper service returned no job_id");
      }

      // Connect to SSE stream from Whisper
      this.streamWhisperJob(transcriptionId, String(jobId), filePath);
    } catch (error) {
      console.error(
        `[Transcription] Failed to start ${transcriptionId}:`,
        error,
      );
      const errorCode =
        error instanceof Error && error.message.includes("Whisper service")
          ? ErrorCode.WHISPER_SERVICE_ERROR
          : ErrorCode.TRANSCRIPTION_FAILED;
      // The message can embed an arbitrary upstream response body; cap it
      // like every other failure path does.
      const errorMessage = (
        error instanceof Error ? error.message : "Unknown error"
      ).substring(0, MAX_ERROR_LENGTH);

      this.updateTranscription(transcriptionId, {
        status: "failed",
        error_message: errorMessage,
      });

      this.events.emit(transcriptionId, {
        status: "failed",
        progress: 0,
        error_message: errorMessage,
        error_code: errorCode,
      });
    } finally {
      this.pendingStarts.delete(transcriptionId);
    }
  }

  /** Open the SSE stream for `jobId` unless one is already open for this transcription. */
  private streamWhisperJob(
    transcriptionId: string,
    jobId: string,
    filePath: string,
  ) {
    // Prevent duplicate streams using locks
    if (this.streamLocks.has(transcriptionId)) {
      return;
    }

    this.streamLocks.add(transcriptionId);
    this.streamJobIds.set(transcriptionId, jobId);

    const es = createEventSource({
      url: `${this.serviceUrl}/transcribe/${jobId}/stream`,
      onMessage: ({ data }) => {
        try {
          const update = JSON.parse(data) as WhisperJob;
          this.handleWhisperUpdate(transcriptionId, jobId, filePath, update);
        } catch (err) {
          console.error(
            `[Stream] Error processing update for ${transcriptionId}:`,
            err,
          );
        }
      },
    });

    this.activeStreams.set(transcriptionId, es);
  }

  /** Apply one SSE update from Whisper to the DB row and listeners. */
  private handleWhisperUpdate(
    transcriptionId: string,
    jobId: string,
    filePath: string,
    update: WhisperJob,
  ) {
    if (update.status === "processing") {
      // Keep progress inside (10, 95): 10 is set on submit, 100 only on completion.
      const progress = Math.max(10, Math.min(95, update.progress ?? 0));
      this.updateTranscription(transcriptionId, { progress });

      this.events.emit(transcriptionId, {
        status: "processing",
        progress,
      });
    } else if (update.status === "completed") {
      const transcript = (update.transcript ?? "").substring(
        0,
        MAX_TRANSCRIPT_LENGTH,
      );

      this.updateTranscription(transcriptionId, {
        status: "completed",
        progress: 100,
        transcript,
      });

      this.events.emit(transcriptionId, {
        status: "completed",
        progress: 100,
        transcript,
      });

      this.cleanupJob(transcriptionId, jobId, filePath);
    } else if (update.status === "failed") {
      const errorMessage = (
        update.error_message ?? "Transcription failed"
      ).substring(0, MAX_ERROR_LENGTH);

      this.updateTranscription(transcriptionId, {
        status: "failed",
        error_message: errorMessage,
      });

      this.events.emit(transcriptionId, {
        status: "failed",
        progress: 0,
        error_message: errorMessage,
        error_code: ErrorCode.TRANSCRIPTION_FAILED,
      });

      this.closeStream(transcriptionId);
      void this.deleteWhisperJob(jobId);
    }
  }

  /** Release everything held for a finished job: upload bytes, stream, Whisper job. */
  private cleanupJob(transcriptionId: string, jobId: string, filePath: string) {
    void this.truncateUpload(filePath);
    this.closeStream(transcriptionId);
    void this.deleteWhisperJob(jobId);
  }

  /**
   * Best-effort: empty the uploaded audio at `filePath` to reclaim disk space.
   *
   * The file is truncated to zero bytes, not unlinked. Missing files are left
   * missing (Bun.write would otherwise create them), and already-empty files
   * are skipped. Failures are logged, never thrown.
   */
  private async truncateUpload(filePath: string): Promise<void> {
    try {
      const file = Bun.file(filePath);
      if (!(await file.exists()) || file.size === 0) return;
      await Bun.write(filePath, "");
    } catch (error) {
      console.warn(`[Cleanup] Failed to truncate ${filePath}:`, error);
    }
  }

  /** Close and forget the SSE stream (if any) for `transcriptionId`. */
  private closeStream(transcriptionId: string) {
    const es = this.activeStreams.get(transcriptionId);
    if (es) {
      es.close();
      this.activeStreams.delete(transcriptionId);
    }
    this.streamLocks.delete(transcriptionId);
    this.streamJobIds.delete(transcriptionId);
  }

  /** Ask Whisper to drop `jobId`; network errors are ignored. */
  private async deleteWhisperJob(jobId: string) {
    try {
      await fetch(`${this.serviceUrl}/transcribe/${jobId}`, {
        method: "DELETE",
      });
    } catch {
      // Silent fail - job may already be deleted
    }
  }

  /**
   * Patch the given columns of one `transcriptions` row and bump `updated_at`
   * (Unix seconds). Column names are fixed here; only values are bound.
   */
  private updateTranscription(
    transcriptionId: string,
    data: {
      status?: TranscriptionStatus;
      progress?: number;
      transcript?: string;
      error_message?: string;
    },
  ) {
    const updates: string[] = [];
    const values: (string | number)[] = [];

    if (data.status !== undefined) {
      updates.push("status = ?");
      values.push(data.status);
    }
    if (data.progress !== undefined) {
      updates.push("progress = ?");
      values.push(data.progress);
    }
    if (data.transcript !== undefined) {
      updates.push("transcript = ?");
      values.push(data.transcript);
    }
    if (data.error_message !== undefined) {
      updates.push("error_message = ?");
      values.push(data.error_message);
    }

    updates.push("updated_at = ?");
    values.push(Math.floor(Date.now() / 1000));

    values.push(transcriptionId);

    this.db.run(
      `UPDATE transcriptions SET ${updates.join(", ")} WHERE id = ?`,
      values,
    );
  }

  /**
   * Reconcile Whisper's job list with the active DB rows. Errors are logged
   * as warnings and never thrown.
   */
  async syncWithWhisper(): Promise<void> {
    try {
      const whisperJobs = await this.fetchWhisperJobs();
      if (!whisperJobs) return;

      const activeDbJobs = this.getActiveDbJobs();
      const activeJobsMap = new Map(activeDbJobs.map((j) => [j.id, j]));

      await this.syncWhisperJobsToDb(whisperJobs, activeJobsMap);
      await this.syncDbJobsToWhisper(activeDbJobs, whisperJobs);
    } catch (error) {
      console.warn(
        "[Sync] Failed:",
        error instanceof Error ? error.message : "Unknown error",
      );
    }
  }

  /** GET /jobs; returns null when the service is unreachable or the payload is malformed. */
  private async fetchWhisperJobs(): Promise<WhisperJob[] | null> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`);
      if (!response.ok) {
        console.warn("[Sync] Whisper service unavailable");
        return null;
      }
      const { jobs } = (await response.json()) as { jobs?: unknown };
      if (!Array.isArray(jobs)) {
        console.warn("[Sync] Unexpected /jobs payload");
        return null;
      }
      return jobs as WhisperJob[];
    } catch {
      return null;
    }
  }

  /** Rows whose status is 'uploading' or 'processing'. */
  private getActiveDbJobs(): ActiveDbJob[] {
    return this.db
      .query<ActiveDbJob, []>(
        "SELECT id, filename, status FROM transcriptions WHERE status IN ('uploading', 'processing')",
      )
      .all();
  }

  /**
   * Walk Whisper's jobs: finalize terminal ones that map to an active row and
   * delete ones that belong to no transcription.
   */
  private async syncWhisperJobsToDb(
    whisperJobs: WhisperJob[],
    activeJobsMap: Map<string, ActiveDbJob>,
  ) {
    // Whisper job id -> transcription id for the streams open in this
    // process. This lets jobs be matched even when Whisper's ids differ from
    // ours, so live jobs are never mistaken for orphans.
    const streamedJobs = new Map<string, string>();
    for (const [transcriptionId, jobId] of this.streamJobIds) {
      streamedJobs.set(jobId, transcriptionId);
    }

    for (const whisperJob of whisperJobs) {
      const transcriptionId = activeJobsMap.has(whisperJob.id)
        ? whisperJob.id
        : streamedJobs.get(whisperJob.id);

      if (transcriptionId === undefined) {
        await this.handleOrphanedWhisperJob(whisperJob.id);
        continue;
      }

      // Streamed by us but no longer active in the DB: the stream handler
      // has already finalized it and owns the cleanup.
      if (!activeJobsMap.has(transcriptionId)) continue;

      if (whisperJob.status === "completed" || whisperJob.status === "failed") {
        await this.syncCompletedJob(whisperJob, transcriptionId);
      }
    }
  }

  /** Delete `jobId` from Whisper when no transcription row has that id. */
  private async handleOrphanedWhisperJob(jobId: string) {
    const jobExists = this.db
      .query<{ id: string }, [string]>(
        "SELECT id FROM transcriptions WHERE id = ?",
      )
      .get(jobId);

    if (!jobExists) {
      // Not our job, delete it from Whisper
      await this.deleteWhisperJob(jobId);
    }
  }

  /**
   * Copy a terminal Whisper job's result onto its transcription, then close
   * any stream still open for it and delete the job from Whisper.
   *
   * @param transcriptionId row to update; defaults to the Whisper job id.
   */
  private async syncCompletedJob(
    whisperJob: WhisperJob,
    transcriptionId: string = whisperJob.id,
  ) {
    try {
      const details = await this.fetchJobDetails(whisperJob.id);
      if (!details) return;

      if (details.status === "completed") {
        const transcript =
          details.transcript?.substring(0, MAX_TRANSCRIPT_LENGTH) ?? "";

        this.updateTranscription(transcriptionId, {
          status: "completed",
          progress: 100,
          transcript,
        });

        this.events.emit(transcriptionId, {
          status: "completed",
          progress: 100,
          transcript,
        });
      } else if (details.status === "failed") {
        const errorMessage = (
          details.error_message ?? "Transcription failed"
        ).substring(0, MAX_ERROR_LENGTH);

        this.updateTranscription(transcriptionId, {
          status: "failed",
          error_message: errorMessage,
        });

        this.events.emit(transcriptionId, {
          status: "failed",
          progress: 0,
          error_message: errorMessage,
          error_code: ErrorCode.TRANSCRIPTION_FAILED,
        });
      } else {
        // The job list and the job details disagree; retry on the next sync
        // instead of deleting a job that may still be running.
        return;
      }

      // A stream that missed its terminal event would otherwise stay open and
      // could process the same result a second time.
      this.closeStream(transcriptionId);
      await this.deleteWhisperJob(whisperJob.id);
    } catch {
      console.warn(
        `[Sync] Failed to retrieve details for job ${whisperJob.id}`,
      );
    }
  }

  /** GET /transcribe/:id; null on a non-2xx response. */
  private async fetchJobDetails(jobId: string): Promise<WhisperJob | null> {
    const response = await fetch(`${this.serviceUrl}/transcribe/${jobId}`);
    if (!response.ok) return null;
    return response.json();
  }

  /**
   * Fail active rows that Whisper no longer knows about.
   *
   * Submissions still in flight in this process are skipped. A streamed
   * transcription counts as known when Whisper lists either its id or the
   * job id it was streamed under.
   */
  private async syncDbJobsToWhisper(
    activeDbJobs: ActiveDbJob[],
    whisperJobs: WhisperJob[],
  ) {
    const whisperJobIds = new Set(whisperJobs.map((wj) => wj.id));

    for (const localJob of activeDbJobs) {
      if (this.pendingStarts.has(localJob.id)) continue;

      const streamedJobId = this.streamJobIds.get(localJob.id);
      const whisperHasJob =
        whisperJobIds.has(localJob.id) ||
        (streamedJobId !== undefined && whisperJobIds.has(streamedJobId));
      if (whisperHasJob) continue;

      // Job was lost, mark as failed.
      // NOTE(review): rows still in 'uploading' (not yet handed to
      // startTranscription) also land here — confirm that is intended.
      const errorMessage = "Job lost - whisper service may have restarted";

      this.updateTranscription(localJob.id, {
        status: "failed",
        error_message: errorMessage,
      });

      this.events.emit(localJob.id, {
        status: "failed",
        progress: 0,
        error_message: errorMessage,
        error_code: ErrorCode.WHISPER_SERVICE_ERROR,
      });

      // Stop following a job that no longer exists upstream.
      this.closeStream(localJob.id);
    }
  }

  /**
   * Truncate the uploads of completed/failed transcriptions whose row was last
   * updated more than STALE_FILE_AGE_SECONDS ago. Errors are logged, not thrown.
   */
  async cleanupStaleFiles(): Promise<void> {
    try {
      const cutoff = Math.floor(Date.now() / 1000) - STALE_FILE_AGE_SECONDS;
      const staleTranscriptions = this.db
        .query<{ filename: string }, [number]>(
          `SELECT filename FROM transcriptions
           WHERE status IN ('completed', 'failed')
           AND updated_at < ?`,
        )
        .all(cutoff);

      for (const { filename } of staleTranscriptions) {
        await this.truncateUpload(`${UPLOAD_DIR}/${filename}`);
      }
    } catch (error) {
      console.error("[Cleanup] Failed:", error);
    }
  }
}