🪻 distributed transcription service
thistle.dunkirk.sh
1import db from "./db/schema";
2import {
3 authenticateUser,
4 cleanupExpiredSessions,
5 createSession,
6 createUser,
7 deleteSession,
8 deleteUser,
9 getSession,
10 getSessionFromRequest,
11 getUserBySession,
12 getUserSessionsForUser,
13 updateUserAvatar,
14 updateUserEmail,
15 updateUserName,
16 updateUserPassword,
17} from "./lib/auth";
18import { handleError, ValidationErrors } from "./lib/errors";
19import { requireAuth } from "./lib/middleware";
20import {
21 MAX_FILE_SIZE,
22 TranscriptionEventEmitter,
23 type TranscriptionUpdate,
24 WhisperServiceManager,
25} from "./lib/transcription";
26import { getTranscript, getTranscriptVTT } from "./lib/transcript-storage";
27import indexHTML from "./pages/index.html";
28import settingsHTML from "./pages/settings.html";
29import transcribeHTML from "./pages/transcribe.html";
30
// Environment variables
// `||` rather than `??` so that an empty WHISPER_SERVICE_URL also falls back
// to the local default.
const WHISPER_SERVICE_URL =
  process.env.WHISPER_SERVICE_URL || "http://localhost:8000";

// Create uploads and transcripts directories if they don't exist
// (done by writing an empty placeholder file into each of them).
await Bun.write("./uploads/.gitkeep", "");
await Bun.write("./transcripts/.gitkeep", "");

// Initialize transcription system
console.log(
  `[Transcription] Connecting to Murmur at ${WHISPER_SERVICE_URL}...`,
);
const transcriptionEvents = new TranscriptionEventEmitter();
const whisperService = new WhisperServiceManager(
  WHISPER_SERVICE_URL,
  db,
  transcriptionEvents,
);

// Clean up expired sessions every hour
setInterval(cleanupExpiredSessions, 60 * 60 * 1000);

// Sync with Whisper DB on startup. A failure is only logged so the server
// can still boot while the transcription backend is down.
try {
  await whisperService.syncWithWhisper();
  console.log("[Transcription] Successfully connected to Murmur");
} catch (error) {
  console.warn(
    "[Transcription] Murmur unavailable at startup:",
    error instanceof Error ? error.message : "Unknown error",
  );
}

// Periodic sync every 5 minutes as backup (SSE handles real-time updates).
// The startup sync above shows this call can fail when Murmur is unreachable,
// so failures are caught here instead of becoming unhandled rejections.
setInterval(async () => {
  try {
    await whisperService.syncWithWhisper();
  } catch (error) {
    console.warn(
      "[Transcription] Periodic sync failed:",
      error instanceof Error ? error.message : "Unknown error",
    );
  }
}, 5 * 60 * 1000);

// Clean up stale files daily; a failed run is logged and retried on the
// next tick rather than surfacing as an unhandled rejection.
setInterval(async () => {
  try {
    await whisperService.cleanupStaleFiles();
  } catch (error) {
    console.warn(
      "[Transcription] Stale file cleanup failed:",
      error instanceof Error ? error.message : "Unknown error",
    );
  }
}, 24 * 60 * 60 * 1000);
69
70const server = Bun.serve({
71 port: 3000,
72 idleTimeout: 120, // 120 seconds for SSE connections
73 routes: {
74 "/": indexHTML,
75 "/settings": settingsHTML,
76 "/transcribe": transcribeHTML,
// POST /api/auth/register — create an account and immediately log it in.
// Body: JSON `{ email, password, name? }`.
// Responses:
//   200 `{ user: { id, email } }` plus a 7-day `session` cookie
//   400 invalid JSON, missing/non-string credentials, short password,
//       or an email that is already registered
//   500 any other failure
"/api/auth/register": {
  POST: async (req) => {
    try {
      // Malformed JSON is a client error, not a server failure.
      let body: unknown;
      try {
        body = await req.json();
      } catch {
        return Response.json({ error: "Invalid JSON body" }, { status: 400 });
      }
      const { email, password, name } = (body ?? {}) as Record<
        string,
        unknown
      >;
      if (!email || !password) {
        return Response.json(
          { error: "Email and password required" },
          { status: 400 },
        );
      }
      // Without this check a numeric password has no `.length` and would
      // slip past the minimum-length rule below.
      if (typeof email !== "string" || typeof password !== "string") {
        return Response.json(
          { error: "Email and password must be strings" },
          { status: 400 },
        );
      }
      if (password.length < 8) {
        return Response.json(
          { error: "Password must be at least 8 characters" },
          { status: 400 },
        );
      }
      // The display name is optional; anything that is not a string is dropped.
      const user = await createUser(
        email,
        password,
        typeof name === "string" ? name : undefined,
      );
      // Client metadata recorded with the session, taken from proxy headers.
      const ipAddress =
        req.headers.get("x-forwarded-for") ??
        req.headers.get("x-real-ip") ??
        "unknown";
      const userAgent = req.headers.get("user-agent") ?? "unknown";
      const sessionId = createSession(user.id, ipAddress, userAgent);
      return Response.json(
        { user: { id: user.id, email: user.email } },
        {
          headers: {
            "Set-Cookie": `session=${sessionId}; HttpOnly; Path=/; Max-Age=${7 * 24 * 60 * 60}; SameSite=Lax`,
          },
        },
      );
    } catch (err: unknown) {
      // A unique-constraint violation is reported as a duplicate email.
      if (
        err instanceof Error &&
        err.message.includes("UNIQUE constraint failed")
      ) {
        return Response.json(
          { error: "Email already registered" },
          { status: 400 },
        );
      }
      return Response.json({ error: "Registration failed" }, { status: 500 });
    }
  },
},
// POST /api/auth/login — verify credentials and start a session.
// Body: JSON `{ email, password }`.
// Responses:
//   200 `{ user: { id, email } }` plus a 7-day `session` cookie
//   400 invalid JSON or missing credentials
//   401 credentials rejected (including non-string values)
//   500 any other failure
"/api/auth/login": {
  POST: async (req) => {
    try {
      // Malformed JSON is a client error, not a server failure.
      let body: unknown;
      try {
        body = await req.json();
      } catch {
        return Response.json({ error: "Invalid JSON body" }, { status: 400 });
      }
      const { email, password } = (body ?? {}) as Record<string, unknown>;
      if (!email || !password) {
        return Response.json(
          { error: "Email and password required" },
          { status: 400 },
        );
      }
      // Non-string credentials can never match an account; reject them
      // before they reach the auth layer.
      if (typeof email !== "string" || typeof password !== "string") {
        return Response.json(
          { error: "Invalid email or password" },
          { status: 401 },
        );
      }
      const user = await authenticateUser(email, password);
      if (!user) {
        return Response.json(
          { error: "Invalid email or password" },
          { status: 401 },
        );
      }
      // Client metadata recorded with the session, taken from proxy headers.
      const ipAddress =
        req.headers.get("x-forwarded-for") ??
        req.headers.get("x-real-ip") ??
        "unknown";
      const userAgent = req.headers.get("user-agent") ?? "unknown";
      const sessionId = createSession(user.id, ipAddress, userAgent);
      return Response.json(
        { user: { id: user.id, email: user.email } },
        {
          headers: {
            "Set-Cookie": `session=${sessionId}; HttpOnly; Path=/; Max-Age=${7 * 24 * 60 * 60}; SameSite=Lax`,
          },
        },
      );
    } catch {
      return Response.json({ error: "Login failed" }, { status: 500 });
    }
  },
},
// POST /api/auth/logout — delete the caller's session (when the request
// carries one) and expire the `session` cookie. Always answers
// `{ success: true }`, whether or not a session was present.
"/api/auth/logout": {
  POST: async (req) => {
    const activeSession = getSessionFromRequest(req);
    if (activeSession) deleteSession(activeSession);

    // Max-Age=0 tells the browser to discard the cookie immediately.
    const expiredCookie = "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax";
    return Response.json(
      { success: true },
      { headers: { "Set-Cookie": expiredCookie } },
    );
  },
},
// GET /api/auth/me — profile of the currently logged-in user.
// 401 "Not authenticated" when the request carries no session,
// 401 "Invalid session" when the session does not resolve to a user.
"/api/auth/me": {
  GET: (req) => {
    const token = getSessionFromRequest(req);
    if (!token) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }

    const account = getUserBySession(token);
    if (!account) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }

    // Only these four profile fields are exposed to the client.
    const { email, name, avatar, created_at } = account;
    return Response.json({ email, name, avatar, created_at });
  },
},
// /api/sessions — manage the caller's login sessions.
//   GET    list every session belonging to the authenticated user
//   DELETE revoke one session; body: JSON `{ sessionId }`
// Both methods answer 401 when the request has no valid session.
"/api/sessions": {
  GET: (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    const sessions = getUserSessionsForUser(user.id);
    return Response.json({
      sessions: sessions.map((s) => ({
        id: s.id,
        ip_address: s.ip_address,
        user_agent: s.user_agent,
        created_at: s.created_at,
        expires_at: s.expires_at,
      })),
    });
  },
  DELETE: async (req) => {
    const currentSessionId = getSessionFromRequest(req);
    if (!currentSessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(currentSessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // A missing or malformed body is a client error; previously it threw
    // out of the handler.
    let body: unknown;
    try {
      body = await req.json();
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { sessionId: targetSessionId } = (body ?? {}) as {
      sessionId?: unknown;
    };
    if (typeof targetSessionId !== "string" || !targetSessionId) {
      return Response.json({ error: "Session ID required" }, { status: 400 });
    }
    // Verify the session belongs to the user. Unknown sessions and sessions
    // owned by someone else get the same 404.
    const targetSession = getSession(targetSessionId);
    if (!targetSession || targetSession.user_id !== user.id) {
      return Response.json({ error: "Session not found" }, { status: 404 });
    }
    deleteSession(targetSessionId);
    return Response.json({ success: true });
  },
},
// DELETE /api/user — delete the authenticated user's account and expire
// the `session` cookie. 401 when the request has no valid session.
"/api/user": {
  DELETE: (req) => {
    const token = getSessionFromRequest(req);
    if (!token) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }

    const account = getUserBySession(token);
    if (!account) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }

    deleteUser(account.id);

    // Max-Age=0 tells the browser to discard the cookie immediately.
    const expiredCookie = "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax";
    return Response.json(
      { success: true },
      { headers: { "Set-Cookie": expiredCookie } },
    );
  },
},
// PUT /api/user/email — change the authenticated user's email address.
// Body: JSON `{ email }`.
// Responses: 200 `{ success: true }`; 400 invalid JSON, missing/non-string
// email, or email already in use; 401 no valid session; 500 other failures.
"/api/user/email": {
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // A missing or malformed body is a client error; previously it threw
    // out of the handler.
    let body: unknown;
    try {
      body = await req.json();
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { email } = (body ?? {}) as { email?: unknown };
    if (typeof email !== "string" || !email) {
      return Response.json({ error: "Email required" }, { status: 400 });
    }
    try {
      updateUserEmail(user.id, email);
      return Response.json({ success: true });
    } catch (err: unknown) {
      // A unique-constraint violation means another account owns the email.
      if (
        err instanceof Error &&
        err.message.includes("UNIQUE constraint failed")
      ) {
        return Response.json(
          { error: "Email already in use" },
          { status: 400 },
        );
      }
      return Response.json(
        { error: "Failed to update email" },
        { status: 500 },
      );
    }
  },
},
// PUT /api/user/password — change the authenticated user's password.
// Body: JSON `{ password }` (at least 8 characters).
// Responses: 200 `{ success: true }`; 400 invalid JSON, missing/non-string
// or too-short password; 401 no valid session; 500 update failure.
"/api/user/password": {
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // A missing or malformed body is a client error; previously it threw
    // out of the handler.
    let body: unknown;
    try {
      body = await req.json();
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { password } = (body ?? {}) as { password?: unknown };
    // The type check keeps non-string values (which have no `.length`)
    // from slipping past the minimum-length rule below.
    if (typeof password !== "string" || !password) {
      return Response.json({ error: "Password required" }, { status: 400 });
    }
    if (password.length < 8) {
      return Response.json(
        { error: "Password must be at least 8 characters" },
        { status: 400 },
      );
    }
    try {
      await updateUserPassword(user.id, password);
      return Response.json({ success: true });
    } catch {
      return Response.json(
        { error: "Failed to update password" },
        { status: 500 },
      );
    }
  },
},
// PUT /api/user/name — change the authenticated user's display name.
// Body: JSON `{ name }`.
// Responses: 200 `{ success: true }`; 400 invalid JSON or missing/non-string
// name; 401 no valid session; 500 update failure.
"/api/user/name": {
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // A missing or malformed body is a client error; previously it threw
    // out of the handler.
    let body: unknown;
    try {
      body = await req.json();
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { name } = (body ?? {}) as { name?: unknown };
    if (typeof name !== "string" || !name) {
      return Response.json({ error: "Name required" }, { status: 400 });
    }
    try {
      updateUserName(user.id, name);
      return Response.json({ success: true });
    } catch {
      return Response.json(
        { error: "Failed to update name" },
        { status: 500 },
      );
    }
  },
},
// PUT /api/user/avatar — change the authenticated user's avatar.
// Body: JSON `{ avatar }`.
// Responses: 200 `{ success: true }`; 400 invalid JSON or missing/non-string
// avatar; 401 no valid session; 500 update failure.
"/api/user/avatar": {
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // A missing or malformed body is a client error; previously it threw
    // out of the handler.
    let body: unknown;
    try {
      body = await req.json();
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { avatar } = (body ?? {}) as { avatar?: unknown };
    // NOTE(review): assumes avatars are stored as strings — confirm against
    // updateUserAvatar before relying on this check.
    if (typeof avatar !== "string" || !avatar) {
      return Response.json({ error: "Avatar required" }, { status: 400 });
    }
    try {
      updateUserAvatar(user.id, avatar);
      return Response.json({ success: true });
    } catch {
      return Response.json(
        { error: "Failed to update avatar" },
        { status: 500 },
      );
    }
  },
},
// GET /api/transcriptions/:id/stream — Server-Sent Events feed for one
// transcription job owned by the authenticated user.
//
// The stream sends the job's current state first, then forwards every update
// published on `transcriptionEvents` for that id, and closes itself once the
// job is "completed" or "failed". A comment-only heartbeat is sent every
// 2.5 seconds to keep the connection alive.
//
// Responses: 401 no valid session; 404 unknown job or job owned by another
// user; otherwise a `text/event-stream` response.
"/api/transcriptions/:id/stream": {
  GET: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    const transcriptionId = req.params.id;
    // Verify ownership
    const transcription = db
      .query<{ id: string; user_id: number; status: string }, [string]>(
        "SELECT id, user_id, status FROM transcriptions WHERE id = ?",
      )
      .get(transcriptionId);
    if (!transcription || transcription.user_id !== user.id) {
      return Response.json(
        { error: "Transcription not found" },
        { status: 404 },
      );
    }

    const encoder = new TextEncoder();
    let isClosed = false;
    let heartbeatInterval: ReturnType<typeof setInterval> | undefined;
    let updateHandler: ((data: TranscriptionUpdate) => void) | undefined;

    // Releases the heartbeat timer and the emitter subscription. Safe to call
    // more than once, because it is reachable from several paths (job
    // finished, enqueue failure, stream cancel, request abort).
    const cleanup = () => {
      isClosed = true;
      if (heartbeatInterval !== undefined) {
        clearInterval(heartbeatInterval);
        heartbeatInterval = undefined;
      }
      if (updateHandler) {
        transcriptionEvents.off(transcriptionId, updateHandler);
        updateHandler = undefined;
      }
    };

    // Belt and braces: also release resources when the request is aborted.
    req.signal.addEventListener("abort", cleanup, { once: true });

    // Event-driven SSE stream
    const stream = new ReadableStream<Uint8Array>({
      async start(controller) {
        // Writes one raw SSE chunk. A failed enqueue means the consumer is
        // gone, so everything is torn down instead of only setting a flag.
        const enqueue = (chunk: string) => {
          if (isClosed) return;
          try {
            controller.enqueue(encoder.encode(chunk));
          } catch {
            cleanup();
          }
        };

        // Each update carries an `id:` field (current Unix time in seconds).
        const sendEvent = (data: Partial<TranscriptionUpdate>) => {
          const eventId = Math.floor(Date.now() / 1000);
          enqueue(
            `id: ${eventId}\nevent: update\ndata: ${JSON.stringify(data)}\n\n`,
          );
        };

        // Ends the stream; `close()` throws if the stream was already
        // closed or cancelled, which is harmless here.
        const finish = () => {
          cleanup();
          try {
            controller.close();
          } catch {
            // Stream already closed or cancelled by the client.
          }
        };

        // Send initial state from DB and file
        const current = db
          .query<{ status: string; progress: number }, [string]>(
            "SELECT status, progress FROM transcriptions WHERE id = ?",
          )
          .get(transcriptionId);
        if (current) {
          // Load transcript from file if completed
          let transcript: string | undefined;
          if (current.status === "completed") {
            transcript = (await getTranscript(transcriptionId)) || undefined;
          }
          sendEvent({
            status: current.status as TranscriptionUpdate["status"],
            progress: current.progress,
            transcript,
          });
        }
        // If already complete, close immediately
        if (current?.status === "completed" || current?.status === "failed") {
          finish();
          return;
        }
        // The client may already have gone away; do not subscribe then.
        if (isClosed) return;

        // Send heartbeats every 2.5 seconds to keep connection alive
        heartbeatInterval = setInterval(
          () => enqueue(": heartbeat\n\n"),
          2500,
        );

        // Subscribe to EventEmitter for live updates
        updateHandler = (data: TranscriptionUpdate) => {
          if (isClosed) return;

          // Status and progress are always sent; the optional fields only
          // when the update carries them.
          const payload: Partial<TranscriptionUpdate> = {
            status: data.status,
            progress: data.progress,
          };
          if (data.transcript !== undefined) {
            payload.transcript = data.transcript;
          }
          if (data.error_message !== undefined) {
            payload.error_message = data.error_message;
          }

          sendEvent(payload);

          // Close stream when done
          if (data.status === "completed" || data.status === "failed") {
            finish();
          }
        };
        transcriptionEvents.on(transcriptionId, updateHandler);
      },
      // Cleanup on client disconnect. This must live in `cancel()`: a
      // function returned from `start()` is never invoked by the Streams
      // API, which previously leaked the timer and the listener.
      cancel() {
        cleanup();
      },
    });

    return new Response(stream, {
      headers: {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        Connection: "keep-alive",
      },
    });
  },
},
// GET /api/transcriptions/health — reports the result of the whisper
// service health check as `{ available }`. No authentication is performed.
"/api/transcriptions/health": {
  GET: async () => {
    const available = await whisperService.checkHealth();
    return Response.json({ available });
  },
},
// GET /api/transcriptions/:id — download a finished transcript.
// Query: `?format=vtt` returns WebVTT as an attachment; anything else returns
// the plain-text transcript.
// Responses: 404 unknown job, job owned by another user, or transcript file
// missing; 400 job not completed yet; errors thrown by `requireAuth` (and
// anything unexpected) are converted by `handleError`.
"/api/transcriptions/:id": {
  GET: async (req) => {
    try {
      const user = requireAuth(req);
      const transcriptionId = req.params.id;

      // Verify ownership
      const transcription = db
        .query<
          {
            id: string;
            user_id: number;
            status: string;
            original_filename: string;
          },
          [string]
        >(
          "SELECT id, user_id, status, original_filename FROM transcriptions WHERE id = ?",
        )
        .get(transcriptionId);

      if (!transcription || transcription.user_id !== user.id) {
        return Response.json(
          { error: "Transcription not found" },
          { status: 404 },
        );
      }

      if (transcription.status !== "completed") {
        return Response.json(
          { error: "Transcription not completed yet" },
          { status: 400 },
        );
      }

      // Get format from query parameter
      const format = new URL(req.url).searchParams.get("format");

      // Return WebVTT format if requested
      if (format === "vtt") {
        const vttContent = await getTranscriptVTT(transcriptionId);

        if (!vttContent) {
          return Response.json(
            { error: "VTT transcript not available" },
            { status: 404 },
          );
        }

        // The original filename is user-supplied. Quotes or backslashes
        // would break the quoted `filename` parameter, and characters
        // outside printable ASCII are not safe in a header value. So the
        // header carries an ASCII-only fallback plus an RFC 5987
        // percent-encoded `filename*` with the real name.
        const downloadName = `${transcription.original_filename}.vtt`;
        const asciiFallback = downloadName.replace(/[^\x20-\x7e]|["\\]/g, "_");
        const encodedName = encodeURIComponent(downloadName).replace(
          /['()*]/g,
          (ch) => `%${ch.charCodeAt(0).toString(16).toUpperCase()}`,
        );

        return new Response(vttContent, {
          headers: {
            "Content-Type": "text/vtt",
            "Content-Disposition": `attachment; filename="${asciiFallback}"; filename*=UTF-8''${encodedName}`,
          },
        });
      }

      // Default: return plain text transcript from file
      const transcript = await getTranscript(transcriptionId);
      if (!transcript) {
        return Response.json(
          { error: "Transcript not available" },
          { status: 404 },
        );
      }

      return new Response(transcript, {
        headers: {
          "Content-Type": "text/plain",
        },
      });
    } catch (error) {
      return handleError(error);
    }
  },
},
// /api/transcriptions — list and create transcription jobs.
//   GET  all jobs of the authenticated user, newest first; completed jobs
//        include their transcript text loaded from file
//   POST multipart upload with an `audio` file field; stores the file,
//        records the job and starts transcription in the background
// Errors thrown by `requireAuth` or `ValidationErrors` are converted to
// responses by `handleError`.
"/api/transcriptions": {
  GET: async (req) => {
    try {
      const user = requireAuth(req);

      const transcriptions = db
        .query<
          {
            id: string;
            filename: string;
            original_filename: string;
            status: string;
            progress: number;
            created_at: number;
          },
          [number]
        >(
          "SELECT id, filename, original_filename, status, progress, created_at FROM transcriptions WHERE user_id = ? ORDER BY created_at DESC",
        )
        .all(user.id);

      // Load transcripts from files for completed jobs
      const jobs = await Promise.all(
        transcriptions.map(async (t) => {
          let transcript: string | null = null;
          if (t.status === "completed") {
            transcript = await getTranscript(t.id);
          }
          return {
            id: t.id,
            // Clients see the name the file was uploaded with, not the
            // generated on-disk name.
            filename: t.original_filename,
            status: t.status,
            progress: t.progress,
            transcript,
            created_at: t.created_at,
          };
        }),
      );

      return Response.json({ jobs });
    } catch (error) {
      return handleError(error);
    }
  },
  POST: async (req) => {
    try {
      const user = requireAuth(req);

      const formData = await req.formData();
      const file = formData.get("audio");

      // A form field can also be a plain string; only a real file upload
      // is acceptable here.
      if (!(file instanceof File)) {
        throw ValidationErrors.missingField("audio");
      }

      // Validate file type: either the MIME type or the extension must
      // identify the upload as audio.
      const fileExtension = file.name.split(".").pop()?.toLowerCase();
      const allowedExtensions = [
        "mp3",
        "wav",
        "m4a",
        "aac",
        "ogg",
        "webm",
        "flac",
        "mp4",
      ];
      const isAudioType =
        file.type.startsWith("audio/") || file.type === "video/mp4";
      const isAudioExtension =
        fileExtension !== undefined &&
        allowedExtensions.includes(fileExtension);

      if (!isAudioType && !isAudioExtension) {
        throw ValidationErrors.unsupportedFileType(
          "MP3, WAV, M4A, AAC, OGG, WebM, FLAC, MP4",
        );
      }

      if (file.size > MAX_FILE_SIZE) {
        throw ValidationErrors.fileTooLarge("25MB");
      }

      // The extension comes from the client-supplied name and ends up in a
      // filesystem path. When the upload was accepted on MIME type alone it
      // can contain anything (including "/" or ".."), so only a short
      // alphanumeric extension is kept; otherwise a neutral one is used.
      const safeExtension =
        fileExtension !== undefined && /^[a-z0-9]{1,10}$/.test(fileExtension)
          ? fileExtension
          : "bin";

      // Generate unique filename
      const transcriptionId = crypto.randomUUID();
      const filename = `${transcriptionId}.${safeExtension}`;

      // Save file to disk
      const uploadDir = "./uploads";
      await Bun.write(`${uploadDir}/${filename}`, file);

      // Create database record
      db.run(
        "INSERT INTO transcriptions (id, user_id, filename, original_filename, status) VALUES (?, ?, ?, ?, ?)",
        [transcriptionId, user.id, filename, file.name, "uploading"],
      );

      // Start transcription in background (intentionally not awaited so the
      // upload request returns right away).
      whisperService.startTranscription(transcriptionId, filename);

      return Response.json({
        id: transcriptionId,
        message: "Upload successful, transcription started",
      });
    } catch (error) {
      return handleError(error);
    }
  },
},
706 },
707 development: {
708 hmr: true,
709 console: true,
710 },
711});
// Announce the address the server is listening on.
console.log(`🪻 Thistle running at http://localhost:${server.port}`);