// Distributed transcription service
// thistle.dunkirk.sh
import type { Database } from "bun:sqlite";
import { createEventSource } from "eventsource-client";
import { unlink } from "node:fs/promises";
import { ErrorCode } from "./errors";
4
// Constants

/** Maximum file size in bytes (25 MiB). */
export const MAX_FILE_SIZE = 25 * 1024 ** 2; // 25MB

/** Transcripts are cut to this many characters before being stored or emitted. */
export const MAX_TRANSCRIPT_LENGTH = 50_000;

/** Error messages are cut to this many characters before being stored or emitted. */
export const MAX_ERROR_LENGTH = 255;
9
// Types

/** Lifecycle states a transcription row can be in. */
export type TranscriptionStatus = "uploading" | "processing" | "completed" | "failed";
16
/** Payload delivered to listeners whenever a transcription changes state. */
export interface TranscriptionUpdate {
  /** State the transcription is in after this update. */
  status: TranscriptionStatus;
  /** Progress indicator; this file emits values between 0 and 100. */
  progress: number;
  /** Transcript text, sent with "completed" updates. */
  transcript?: string;
  /** Human-readable failure description, sent with "failed" updates. */
  error_message?: string;
  /** Machine-readable failure code accompanying `error_message`. */
  error_code?: string;
}
24
/**
 * Job record as reported by the Whisper service, parsed from its SSE
 * messages and from its job listing / job detail responses.
 */
export interface WhisperJob {
  /** Identifier of the job on the Whisper service. */
  id: string;
  /** Service-side state; this file reacts to "processing", "completed" and "failed". */
  status: string;
  /** Service-reported progress, when present. */
  progress?: number;
  /** Transcript text, when the job has produced one. */
  transcript?: string;
  /** Failure description, when the job failed. */
  error_message?: string;
}
32
// Event emitter for real-time transcription updates with automatic cleanup

/**
 * In-process publish/subscribe hub keyed by transcription id.
 *
 * Listener sets are dropped automatically five minutes after the last
 * listener unsubscribes or after a terminal ("completed" / "failed") update
 * has been emitted. Subscribing again cancels a pending drop.
 */
export class TranscriptionEventEmitter {
  /** How long an idle or finished transcription keeps its listener set. */
  private static readonly CLEANUP_DELAY_MS = 5 * 60 * 1000;

  private listeners = new Map<
    string,
    Set<(data: TranscriptionUpdate) => void>
  >();
  // ReturnType<typeof setTimeout> keeps this independent of the NodeJS typings.
  private cleanupTimers = new Map<string, ReturnType<typeof setTimeout>>();

  /**
   * Subscribes `callback` to updates for `transcriptionId` and cancels any
   * cleanup that was pending for that id.
   */
  on(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    let callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) {
      callbacks = new Set();
      this.listeners.set(transcriptionId, callbacks);
    }
    callbacks.add(callback);

    this.cancelCleanup(transcriptionId);
  }

  /**
   * Unsubscribes `callback`. When it was the last listener for the id, the
   * (now empty) listener set is scheduled for removal.
   */
  off(transcriptionId: string, callback: (data: TranscriptionUpdate) => void) {
    const callbacks = this.listeners.get(transcriptionId);
    if (!callbacks) {
      return;
    }

    callbacks.delete(callback);
    if (callbacks.size === 0) {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /**
   * Delivers `data` to every listener registered for `transcriptionId`.
   *
   * A listener that throws is logged and skipped so that it can neither
   * starve the remaining listeners nor prevent the terminal-state cleanup
   * from being scheduled.
   */
  emit(transcriptionId: string, data: TranscriptionUpdate) {
    const callbacks = this.listeners.get(transcriptionId);
    if (callbacks) {
      for (const callback of callbacks) {
        try {
          callback(data);
        } catch (error) {
          console.error(
            `[Events] Listener for ${transcriptionId} threw:`,
            error,
          );
        }
      }
    }

    // Auto-cleanup completed/failed jobs after emission
    if (data.status === "completed" || data.status === "failed") {
      this.scheduleCleanup(transcriptionId);
    }
  }

  /** Returns true when at least one listener is registered for the id. */
  hasListeners(transcriptionId: string): boolean {
    return (this.listeners.get(transcriptionId)?.size ?? 0) > 0;
  }

  /** Cancels and forgets the pending cleanup timer for the id, if any. */
  private cancelCleanup(transcriptionId: string) {
    const timer = this.cleanupTimers.get(transcriptionId);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.cleanupTimers.delete(transcriptionId);
    }
  }

  /**
   * (Re)starts the cleanup countdown for the id.
   *
   * Any timer already pending is cleared first: overwriting it in the map
   * would leave it running with no way for `on()` to cancel it, and it would
   * later delete listeners that subscribed in the meantime.
   */
  private scheduleCleanup(transcriptionId: string) {
    this.cancelCleanup(transcriptionId);

    const timer = setTimeout(() => {
      this.listeners.delete(transcriptionId);
      this.cleanupTimers.delete(transcriptionId);
    }, TranscriptionEventEmitter.CLEANUP_DELAY_MS);

    this.cleanupTimers.set(transcriptionId, timer);
  }
}
95
// Whisper service manager

/** Row shape selected from `transcriptions` for jobs that are still in flight. */
type ActiveDbJob = { id: string; filename: string; status: string };

/**
 * Drives transcriptions through the remote Whisper service.
 *
 * Responsibilities visible in this class:
 * - upload a file to the service and follow the job's SSE stream,
 * - mirror every state change into the `transcriptions` table and onto the
 *   {@link TranscriptionEventEmitter},
 * - reconcile local rows with the service's job list (`syncWithWhisper`),
 * - remove uploaded files once they are no longer needed.
 */
export class WhisperServiceManager {
  /** Directory (relative to the working directory) holding uploaded files. */
  private static readonly UPLOAD_DIR = "./uploads";

  /** Open SSE connections, keyed by transcription id. */
  private activeStreams = new Map<
    string,
    ReturnType<typeof createEventSource>
  >();
  /** Transcription ids that already own (or are opening) a stream. */
  private streamLocks = new Set<string>();

  /**
   * @param serviceUrl Base URL of the Whisper service (no trailing slash expected,
   *   since paths such as `/transcribe` are appended directly).
   * @param db Database containing the `transcriptions` table.
   * @param events Emitter used to publish real-time updates.
   */
  constructor(
    private serviceUrl: string,
    private db: Database,
    private events: TranscriptionEventEmitter,
  ) {}

  /**
   * Submits an uploaded file to the Whisper service and starts following the
   * resulting job.
   *
   * Never rejects: any failure is logged, persisted as a "failed" row and
   * emitted as a "failed" update.
   *
   * @param transcriptionId Id of the row in `transcriptions`.
   * @param filename Name of the file inside the uploads directory.
   */
  async startTranscription(
    transcriptionId: string,
    filename: string,
  ): Promise<void> {
    try {
      // Update status to processing
      this.updateTranscription(transcriptionId, {
        status: "processing",
        progress: 10,
      });

      // Read file from disk
      const filePath = this.uploadPath(filename);
      const fileBuffer = await Bun.file(filePath).arrayBuffer();

      // Create form data for the faster-whisper server
      const formData = new FormData();
      const file = new File([fileBuffer], filename, { type: "audio/mpeg" });
      formData.append("file", file);

      // Call the faster-whisper server to start transcription
      const response = await fetch(`${this.serviceUrl}/transcribe`, {
        method: "POST",
        body: formData,
      });

      if (!response.ok) {
        const errorText = await response.text().catch(() => "Unknown error");
        throw new Error(
          `Whisper service returned ${response.status}: ${errorText}`,
        );
      }

      // Validate the response instead of building a ".../undefined/stream" URL.
      const payload: unknown = await response.json();
      const rawJobId = (payload as { job_id?: unknown } | null)?.job_id;
      if (
        (typeof rawJobId !== "string" && typeof rawJobId !== "number") ||
        rawJobId === ""
      ) {
        throw new Error("Whisper service returned no job_id");
      }

      // Connect to SSE stream from Whisper
      this.streamWhisperJob(transcriptionId, String(rawJobId), filePath);
    } catch (error) {
      console.error(
        `[Transcription] Failed to start ${transcriptionId}:`,
        error,
      );
      const errorMessage =
        error instanceof Error ? error.message : "Unknown error";
      const errorCode =
        error instanceof Error && error.message.includes("Whisper service")
          ? ErrorCode.WHISPER_SERVICE_ERROR
          : ErrorCode.TRANSCRIPTION_FAILED;

      // markFailed truncates the message, matching every other failure path.
      this.markFailed(transcriptionId, errorMessage, errorCode);
    }
  }

  /** Builds the on-disk path of an uploaded file. */
  private uploadPath(filename: string): string {
    return `${WhisperServiceManager.UPLOAD_DIR}/${filename}`;
  }

  /**
   * Opens the SSE stream for a Whisper job unless one is already open for
   * this transcription. Each message is parsed and routed to
   * `handleWhisperUpdate`; malformed messages are logged and ignored.
   */
  private streamWhisperJob(
    transcriptionId: string,
    jobId: string,
    filePath: string,
  ) {
    // Prevent duplicate streams using locks
    if (this.streamLocks.has(transcriptionId)) {
      return;
    }

    this.streamLocks.add(transcriptionId);

    try {
      const es = createEventSource({
        url: `${this.serviceUrl}/transcribe/${jobId}/stream`,
        onMessage: ({ data }) => {
          try {
            const update = JSON.parse(data) as WhisperJob;
            this.handleWhisperUpdate(transcriptionId, jobId, filePath, update);
          } catch (err) {
            console.error(
              `[Stream] Error processing update for ${transcriptionId}:`,
              err,
            );
          }
        },
      });

      this.activeStreams.set(transcriptionId, es);
    } catch (error) {
      // Do not leave the id locked forever when the stream could not be created.
      this.streamLocks.delete(transcriptionId);
      throw error;
    }
  }

  /**
   * Applies one streamed job update: progress is clamped to 10..95 while
   * processing; terminal states are persisted, emitted and followed by
   * resource cleanup. Other statuses are ignored.
   */
  private handleWhisperUpdate(
    transcriptionId: string,
    jobId: string,
    filePath: string,
    update: WhisperJob,
  ) {
    if (update.status === "processing") {
      const progress = Math.max(10, Math.min(95, update.progress ?? 0));
      this.updateTranscription(transcriptionId, { progress });

      this.events.emit(transcriptionId, {
        status: "processing",
        progress,
      });
    } else if (update.status === "completed") {
      this.markCompleted(transcriptionId, update.transcript);

      // Clean up
      this.cleanupJob(transcriptionId, jobId, filePath);
    } else if (update.status === "failed") {
      this.markFailed(
        transcriptionId,
        update.error_message ?? "Transcription failed",
        ErrorCode.TRANSCRIPTION_FAILED,
      );

      // The uploaded file is kept here; only the stream and remote job go away.
      this.closeStream(transcriptionId);
      void this.deleteWhisperJob(jobId);
    }
  }

  /** Persists and emits a "completed" state with the (truncated) transcript. */
  private markCompleted(transcriptionId: string, transcript: string | undefined) {
    const clipped = (transcript ?? "").substring(0, MAX_TRANSCRIPT_LENGTH);

    this.updateTranscription(transcriptionId, {
      status: "completed",
      progress: 100,
      transcript: clipped,
    });

    this.events.emit(transcriptionId, {
      status: "completed",
      progress: 100,
      transcript: clipped,
    });
  }

  /** Persists and emits a "failed" state with the (truncated) error message. */
  private markFailed(
    transcriptionId: string,
    errorMessage: string,
    errorCode: string,
  ) {
    const clipped = errorMessage.substring(0, MAX_ERROR_LENGTH);

    this.updateTranscription(transcriptionId, {
      status: "failed",
      error_message: clipped,
    });

    this.events.emit(transcriptionId, {
      status: "failed",
      progress: 0,
      error_message: clipped,
      error_code: errorCode,
    });
  }

  /**
   * Releases everything tied to a finished job: the uploaded file, the SSE
   * stream and the job on the Whisper service. File and remote deletion run
   * in the background and never throw.
   */
  private cleanupJob(transcriptionId: string, jobId: string, filePath: string) {
    // Delete uploaded file
    void this.removeUpload(filePath);

    this.closeStream(transcriptionId);
    void this.deleteWhisperJob(jobId);
  }

  /**
   * Best-effort removal of an uploaded file. A file that is already gone is
   * not an error; anything else is logged but never thrown.
   */
  private async removeUpload(filePath: string): Promise<void> {
    try {
      await unlink(filePath);
    } catch (error) {
      const code =
        error instanceof Error && "code" in error
          ? (error as { code?: unknown }).code
          : undefined;
      if (code !== "ENOENT") {
        console.warn(
          `[Cleanup] Could not delete ${filePath}:`,
          error instanceof Error ? error.message : "Unknown error",
        );
      }
    }
  }

  /** Closes the SSE stream for the id (if any) and releases its lock. */
  private closeStream(transcriptionId: string) {
    const es = this.activeStreams.get(transcriptionId);
    if (es) {
      es.close();
      this.activeStreams.delete(transcriptionId);
    }
    this.streamLocks.delete(transcriptionId);
  }

  /** Asks the Whisper service to delete a job; failures are ignored. */
  private async deleteWhisperJob(jobId: string): Promise<void> {
    try {
      await fetch(`${this.serviceUrl}/transcribe/${jobId}`, {
        method: "DELETE",
      });
    } catch {
      // Silent fail - job may already be deleted
    }
  }

  /**
   * Writes the provided fields (and a fresh `updated_at`, in Unix seconds)
   * to the transcription row. Fields left undefined are not touched.
   */
  private updateTranscription(
    transcriptionId: string,
    data: {
      status?: TranscriptionStatus;
      progress?: number;
      transcript?: string;
      error_message?: string;
    },
  ) {
    // Column names come from this fixed list, never from input, so
    // interpolating them into the statement is safe; values are bound.
    const columns = ["status", "progress", "transcript", "error_message"] as const;
    const assignments: string[] = [];
    const values: (string | number)[] = [];

    for (const column of columns) {
      const value = data[column];
      if (value !== undefined) {
        assignments.push(`${column} = ?`);
        values.push(value);
      }
    }

    assignments.push("updated_at = ?");
    values.push(Math.floor(Date.now() / 1000));

    values.push(transcriptionId);

    this.db.run(
      `UPDATE transcriptions SET ${assignments.join(", ")} WHERE id = ?`,
      values,
    );
  }

  /**
   * Reconciles local in-flight rows with the Whisper service's job list in
   * both directions. Does nothing when the service is unreachable; never
   * rejects.
   */
  async syncWithWhisper(): Promise<void> {
    try {
      const whisperJobs = await this.fetchWhisperJobs();
      if (!whisperJobs) return;

      const activeDbJobs = this.getActiveDbJobs();
      const activeJobsMap = new Map(activeDbJobs.map((j) => [j.id, j]));

      await this.syncWhisperJobsToDb(whisperJobs, activeJobsMap);
      await this.syncDbJobsToWhisper(activeDbJobs, whisperJobs);
    } catch (error) {
      console.warn(
        "[Sync] Failed:",
        error instanceof Error ? error.message : "Unknown error",
      );
    }
  }

  /**
   * Fetches the service's job list.
   *
   * @returns The jobs, or null when the service is unreachable, answers with
   *   a non-2xx status, or returns a payload without a `jobs` array.
   */
  private async fetchWhisperJobs(): Promise<WhisperJob[] | null> {
    try {
      const response = await fetch(`${this.serviceUrl}/jobs`);
      if (!response.ok) {
        console.warn("[Sync] Whisper service unavailable");
        return null;
      }

      const payload: unknown = await response.json();
      const jobs = (payload as { jobs?: unknown } | null)?.jobs;
      if (!Array.isArray(jobs)) {
        console.warn("[Sync] Whisper service returned an unexpected jobs payload");
        return null;
      }
      return jobs as WhisperJob[];
    } catch {
      return null;
    }
  }

  /** Returns all rows whose status is still "uploading" or "processing". */
  private getActiveDbJobs(): ActiveDbJob[] {
    return this.db
      .query<ActiveDbJob, []>(
        "SELECT id, filename, status FROM transcriptions WHERE status IN ('uploading', 'processing')",
      )
      .all();
  }

  /**
   * Walks the service's jobs: jobs unknown to the local database are removed
   * from the service, and finished jobs matching an in-flight row are
   * finalised locally.
   *
   * NOTE(review): jobs are matched by looking up the Whisper job id among
   * transcription ids, while `startTranscription` never persists the
   * `job_id` it receives. This only works if the service reuses the
   * transcription id as its job id — confirm.
   */
  private async syncWhisperJobsToDb(
    whisperJobs: WhisperJob[],
    activeJobsMap: Map<string, ActiveDbJob>,
  ): Promise<void> {
    for (const whisperJob of whisperJobs) {
      if (!activeJobsMap.has(whisperJob.id)) {
        await this.handleOrphanedWhisperJob(whisperJob.id);
        continue;
      }

      if (whisperJob.status === "completed" || whisperJob.status === "failed") {
        await this.syncCompletedJob(whisperJob);
      }
    }
  }

  /**
   * Deletes a service-side job when no transcription row (in any status)
   * exists with that id.
   */
  private async handleOrphanedWhisperJob(jobId: string): Promise<void> {
    const jobExists = this.db
      .query<{ id: string }, [string]>(
        "SELECT id FROM transcriptions WHERE id = ?",
      )
      .get(jobId);

    if (!jobExists) {
      // Not our job, delete it from Whisper
      await this.deleteWhisperJob(jobId);
    }
  }

  /**
   * Finalises a job the service lists as finished: fetches its details,
   * persists/emits the terminal state, then releases the stream and the
   * remote job. If the details are unavailable or not in a terminal state,
   * nothing is changed and the remote job is kept.
   */
  private async syncCompletedJob(whisperJob: WhisperJob): Promise<void> {
    try {
      const details = await this.fetchJobDetails(whisperJob.id);
      if (!details) return;

      if (details.status === "completed") {
        this.markCompleted(whisperJob.id, details.transcript);
      } else if (details.status === "failed") {
        this.markFailed(
          whisperJob.id,
          details.error_message ?? "Transcription failed",
          ErrorCode.TRANSCRIPTION_FAILED,
        );
      } else {
        // Result not persisted: deleting the remote job now would lose it.
        return;
      }

      // A stream that is still open would otherwise keep following a job
      // that is about to be deleted.
      this.closeStream(whisperJob.id);
      await this.deleteWhisperJob(whisperJob.id);
    } catch (error) {
      console.warn(
        `[Sync] Failed to retrieve details for job ${whisperJob.id}`,
        error instanceof Error ? error.message : "Unknown error",
      );
    }
  }

  /**
   * Fetches one job from the service.
   *
   * @returns The job, or null on a non-2xx response.
   * @throws When the request itself fails or the body is not valid JSON.
   */
  private async fetchJobDetails(jobId: string): Promise<WhisperJob | null> {
    const response = await fetch(`${this.serviceUrl}/transcribe/${jobId}`);
    if (!response.ok) return null;
    return (await response.json()) as WhisperJob;
  }

  /**
   * Marks every in-flight local row that the service no longer knows about
   * as failed, and releases any stream still attached to it.
   */
  private async syncDbJobsToWhisper(
    activeDbJobs: ActiveDbJob[],
    whisperJobs: WhisperJob[],
  ): Promise<void> {
    const whisperIds = new Set(whisperJobs.map((job) => job.id));

    for (const localJob of activeDbJobs) {
      if (whisperIds.has(localJob.id)) {
        continue;
      }

      // Job was lost, mark as failed
      this.markFailed(
        localJob.id,
        "Job lost - whisper service may have restarted",
        ErrorCode.WHISPER_SERVICE_ERROR,
      );
      this.closeStream(localJob.id);
    }
  }

  /**
   * Deletes the uploaded files of transcriptions that reached a terminal
   * state more than 24 hours ago. Never rejects.
   */
  async cleanupStaleFiles(): Promise<void> {
    try {
      const cutoff = Math.floor(Date.now() / 1000) - 24 * 60 * 60;

      // Find transcriptions older than 24 hours that are completed or failed
      const staleTranscriptions = this.db
        .query<{ filename: string }, [number]>(
          `SELECT filename FROM transcriptions
           WHERE status IN ('completed', 'failed')
           AND updated_at < ?`,
        )
        .all(cutoff);

      for (const { filename } of staleTranscriptions) {
        // unlink instead of writing "": writing would (re)create an empty
        // file for every stale row on every run.
        await this.removeUpload(this.uploadPath(filename));
      }
    } catch (error) {
      console.error("[Cleanup] Failed:", error);
    }
  }
}