distributed transcription service
thistle.dunkirk.sh
1import db from "./db/schema";
2import {
3 authenticateUser,
4 cleanupExpiredSessions,
5 createSession,
6 createUser,
7 deleteSession,
8 deleteUser,
9 getSession,
10 getSessionFromRequest,
11 getUserBySession,
12 getUserSessionsForUser,
13 updateUserAvatar,
14 updateUserEmail,
15 updateUserName,
16 updateUserPassword,
17} from "./lib/auth";
18import { handleError, ValidationErrors } from "./lib/errors";
19import { requireAuth } from "./lib/middleware";
20import {
21 MAX_FILE_SIZE,
22 TranscriptionEventEmitter,
23 type TranscriptionUpdate,
24 WhisperServiceManager,
25} from "./lib/transcription";
26import { getTranscript, getTranscriptVTT } from "./lib/transcript-storage";
27import indexHTML from "./pages/index.html";
28import settingsHTML from "./pages/settings.html";
29import transcribeHTML from "./pages/transcribe.html";
30
// Environment variables
const WHISPER_SERVICE_URL =
  process.env.WHISPER_SERVICE_URL || "http://localhost:8000";

// Create uploads and transcripts directories if they don't exist (writing the
// empty .gitkeep placeholders relies on Bun.write creating parent directories).
await Bun.write("./uploads/.gitkeep", "");
await Bun.write("./transcripts/.gitkeep", "");

// Initialize transcription system
console.log(
  `[Transcription] Connecting to Murmur at ${WHISPER_SERVICE_URL}...`,
);
const transcriptionEvents = new TranscriptionEventEmitter();
const whisperService = new WhisperServiceManager(
  WHISPER_SERVICE_URL,
  db,
  transcriptionEvents,
);

// Clean up expired sessions every hour
setInterval(cleanupExpiredSessions, 60 * 60 * 1000);

// Sync with Whisper DB on startup; an unreachable service is not fatal.
try {
  await whisperService.syncWithWhisper();
  console.log("[Transcription] Successfully connected to Murmur");
} catch (error) {
  console.warn(
    "[Transcription] Murmur unavailable at startup:",
    error instanceof Error ? error.message : "Unknown error",
  );
}

// Periodic sync every 5 minutes as backup (SSE handles real-time updates)
setInterval(async () => {
  try {
    await whisperService.syncWithWhisper();
  } catch (error) {
    console.warn(
      "[Sync] Failed to sync with Murmur:",
      error instanceof Error ? error.message : "Unknown error",
    );
  }
}, 5 * 60 * 1000);

// Clean up stale files daily. Like the sync timer above, failures are logged
// instead of escaping the timer callback as an uncaught error / rejection
// (this works whether cleanupStaleFiles is sync or async).
setInterval(async () => {
  try {
    await whisperService.cleanupStaleFiles();
  } catch (error) {
    console.warn(
      "[Cleanup] Failed to clean up stale files:",
      error instanceof Error ? error.message : "Unknown error",
    );
  }
}, 24 * 60 * 60 * 1000);
78
79const server = Bun.serve({
// Listen on $PORT when it is set to a number, otherwise fall back to 3000.
port: Number.parseInt(process.env.PORT ?? "", 10) || 3000,
idleTimeout: 120, // 120 seconds for SSE connections
82 routes: {
// Static HTML pages (imported at the top of the file)
"/transcribe": transcribeHTML,
"/settings": settingsHTML,
"/": indexHTML,
"/api/auth/register": {
  /**
   * Create an account and log the new user in.
   * Body: `{ email: string, password: string, name? }` — `name` is passed
   * through to `createUser` unchanged.
   * 400: malformed JSON, missing/non-string credentials, password shorter
   * than 8 characters, or an already-registered email. 500: anything else.
   * On success sets a 7-day HttpOnly `session` cookie.
   */
  POST: async (req) => {
    // Malformed JSON is a client error, not a server failure.
    const body = await req.json().catch(() => null);
    if (!body || typeof body !== "object") {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { email, password, name } = body;
    // typeof guards: a non-string password (e.g. a number) has no `.length`
    // and would otherwise slip past the minimum-length check below.
    if (
      typeof email !== "string" ||
      typeof password !== "string" ||
      !email ||
      !password
    ) {
      return Response.json(
        { error: "Email and password required" },
        { status: 400 },
      );
    }
    if (password.length < 8) {
      return Response.json(
        { error: "Password must be at least 8 characters" },
        { status: 400 },
      );
    }
    try {
      const user = await createUser(email, password, name);
      const ipAddress =
        req.headers.get("x-forwarded-for") ??
        req.headers.get("x-real-ip") ??
        "unknown";
      const userAgent = req.headers.get("user-agent") ?? "unknown";
      const sessionId = createSession(user.id, ipAddress, userAgent);
      return Response.json(
        { user: { id: user.id, email: user.email } },
        {
          headers: {
            "Set-Cookie": `session=${sessionId}; HttpOnly; Path=/; Max-Age=${7 * 24 * 60 * 60}; SameSite=Lax`,
          },
        },
      );
    } catch (err: unknown) {
      // The DB layer reports duplicate emails via the UNIQUE constraint text.
      const error = err as { message?: string };
      if (error.message?.includes("UNIQUE constraint failed")) {
        return Response.json(
          { error: "Email already registered" },
          { status: 400 },
        );
      }
      return Response.json(
        { error: "Registration failed" },
        { status: 500 },
      );
    }
  },
},
"/api/auth/login": {
  /**
   * Log in with email/password. Body: `{ email: string, password: string }`.
   * 400 on malformed JSON or missing/non-string credentials, 401 on bad
   * credentials, 500 on unexpected failure. On success sets a 7-day
   * HttpOnly `session` cookie.
   */
  POST: async (req) => {
    // Malformed JSON is a client error, not a server failure.
    const body = await req.json().catch(() => null);
    if (!body || typeof body !== "object") {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { email, password } = body;
    // Reject non-string values before they reach authenticateUser.
    if (
      typeof email !== "string" ||
      typeof password !== "string" ||
      !email ||
      !password
    ) {
      return Response.json(
        { error: "Email and password required" },
        { status: 400 },
      );
    }
    try {
      const user = await authenticateUser(email, password);
      if (!user) {
        return Response.json(
          { error: "Invalid email or password" },
          { status: 401 },
        );
      }
      const ipAddress =
        req.headers.get("x-forwarded-for") ??
        req.headers.get("x-real-ip") ??
        "unknown";
      const userAgent = req.headers.get("user-agent") ?? "unknown";
      const sessionId = createSession(user.id, ipAddress, userAgent);
      return Response.json(
        { user: { id: user.id, email: user.email } },
        {
          headers: {
            "Set-Cookie": `session=${sessionId}; HttpOnly; Path=/; Max-Age=${7 * 24 * 60 * 60}; SameSite=Lax`,
          },
        },
      );
    } catch {
      return Response.json({ error: "Login failed" }, { status: 500 });
    }
  },
},
"/api/auth/logout": {
  /**
   * End the caller's session (when a session cookie is present) and expire
   * the cookie. Always answers `{ success: true }`.
   */
  POST: async (req) => {
    const currentSession = getSessionFromRequest(req);
    if (currentSession) deleteSession(currentSession);

    const expiredCookie = "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax";
    return Response.json(
      { success: true },
      { headers: { "Set-Cookie": expiredCookie } },
    );
  },
},
"/api/auth/me": {
  /**
   * Profile of the logged-in user: email, name, avatar and creation time.
   * 401 when there is no session cookie or it does not map to a user.
   */
  GET: (req) => {
    const token = getSessionFromRequest(req);
    if (!token) return Response.json({ error: "Not authenticated" }, { status: 401 });

    const account = getUserBySession(token);
    if (!account) return Response.json({ error: "Invalid session" }, { status: 401 });

    const { email, name, avatar, created_at } = account;
    return Response.json({ email, name, avatar, created_at });
  },
},
"/api/sessions": {
  /** List the caller's sessions (id, client IP / user agent, created and expiry times). */
  GET: (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    const sessions = getUserSessionsForUser(user.id);
    return Response.json({
      sessions: sessions.map((s) => ({
        id: s.id,
        ip_address: s.ip_address,
        user_agent: s.user_agent,
        created_at: s.created_at,
        expires_at: s.expires_at,
      })),
    });
  },
  /**
   * Revoke one of the caller's sessions. Body: `{ sessionId: string }`.
   * 400 on malformed JSON or a missing/non-string id; sessions belonging to
   * other users are answered with 404 so their existence is not revealed.
   */
  DELETE: async (req) => {
    const currentSessionId = getSessionFromRequest(req);
    if (!currentSessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(currentSessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // Unparseable JSON previously escaped the handler as a 500.
    const body = await req.json().catch(() => null);
    if (!body || typeof body !== "object") {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const targetSessionId = body.sessionId;
    if (typeof targetSessionId !== "string" || !targetSessionId) {
      return Response.json(
        { error: "Session ID required" },
        { status: 400 },
      );
    }
    // Verify the session belongs to the user
    const targetSession = getSession(targetSessionId);
    if (!targetSession || targetSession.user_id !== user.id) {
      return Response.json({ error: "Session not found" }, { status: 404 });
    }
    deleteSession(targetSessionId);
    return Response.json({ success: true });
  },
},
"/api/user": {
  /**
   * Delete the logged-in user's account and expire the session cookie.
   * 401 when the request carries no valid session.
   */
  DELETE: (req) => {
    const token = getSessionFromRequest(req);
    if (!token) return Response.json({ error: "Not authenticated" }, { status: 401 });

    const account = getUserBySession(token);
    if (!account) return Response.json({ error: "Invalid session" }, { status: 401 });

    deleteUser(account.id);

    const expiredCookie = "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax";
    return Response.json(
      { success: true },
      { headers: { "Set-Cookie": expiredCookie } },
    );
  },
},
"/api/user/email": {
  /**
   * Change the logged-in user's email. Body: `{ email: string }`.
   * 400 on malformed JSON, a missing/non-string email, or an email already
   * used by another account; 500 on other update failures.
   */
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // Unparseable JSON previously escaped the handler as a 500.
    const body = await req.json().catch(() => null);
    if (!body || typeof body !== "object") {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { email } = body;
    if (typeof email !== "string" || !email) {
      return Response.json({ error: "Email required" }, { status: 400 });
    }
    try {
      updateUserEmail(user.id, email);
      return Response.json({ success: true });
    } catch (err: unknown) {
      // Duplicate emails surface as a UNIQUE constraint violation.
      const error = err as { message?: string };
      if (error.message?.includes("UNIQUE constraint failed")) {
        return Response.json(
          { error: "Email already in use" },
          { status: 400 },
        );
      }
      return Response.json(
        { error: "Failed to update email" },
        { status: 500 },
      );
    }
  },
},
"/api/user/password": {
  /**
   * Set a new password for the logged-in user. Body: `{ password: string }`
   * (at least 8 characters). 400 on malformed JSON or invalid password,
   * 500 if the update fails.
   * NOTE(review): the current password is not requested, so anyone holding
   * a valid session cookie can change it — consider requiring it.
   */
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // Unparseable JSON previously escaped the handler as a 500.
    const body = await req.json().catch(() => null);
    if (!body || typeof body !== "object") {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { password } = body;
    // A non-string password has no `.length` and would bypass the length rule.
    if (typeof password !== "string" || !password) {
      return Response.json({ error: "Password required" }, { status: 400 });
    }
    if (password.length < 8) {
      return Response.json(
        { error: "Password must be at least 8 characters" },
        { status: 400 },
      );
    }
    try {
      await updateUserPassword(user.id, password);
      return Response.json({ success: true });
    } catch {
      return Response.json(
        { error: "Failed to update password" },
        { status: 500 },
      );
    }
  },
},
"/api/user/name": {
  /**
   * Change the logged-in user's display name. Body: `{ name: string }`.
   * 400 on malformed JSON or a missing/non-string name, 500 if the update fails.
   */
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // Unparseable JSON previously escaped the handler as a 500.
    const body = await req.json().catch(() => null);
    if (!body || typeof body !== "object") {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { name } = body;
    if (typeof name !== "string" || !name) {
      return Response.json({ error: "Name required" }, { status: 400 });
    }
    try {
      updateUserName(user.id, name);
      return Response.json({ success: true });
    } catch {
      return Response.json(
        { error: "Failed to update name" },
        { status: 500 },
      );
    }
  },
},
"/api/user/avatar": {
  /**
   * Change the logged-in user's avatar. Body: `{ avatar }` — the value is
   * passed to updateUserAvatar as-is (presumably a string; verify with the
   * settings page). 400 on malformed JSON or a missing avatar, 500 if the
   * update fails.
   */
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // Unparseable JSON previously escaped the handler as a 500.
    const body = await req.json().catch(() => null);
    if (!body || typeof body !== "object") {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { avatar } = body;
    if (!avatar) {
      return Response.json({ error: "Avatar required" }, { status: 400 });
    }
    try {
      updateUserAvatar(user.id, avatar);
      return Response.json({ success: true });
    } catch {
      return Response.json(
        { error: "Failed to update avatar" },
        { status: 500 },
      );
    }
  },
},
"/api/transcriptions/:id/stream": {
  /**
   * Server-Sent Events stream for one of the caller's transcriptions.
   * Sends the current DB state first (with the transcript text when already
   * completed), then relays `transcriptionEvents` updates until the job is
   * completed/failed or the client disconnects. Heartbeat comments are sent
   * every 2.5 s to keep the connection open.
   */
  GET: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    const transcriptionId = req.params.id;
    // Verify ownership
    const transcription = db
      .query<{ id: string; user_id: number; status: string }, [string]>(
        "SELECT id, user_id, status FROM transcriptions WHERE id = ?",
      )
      .get(transcriptionId);
    if (!transcription || transcription.user_id !== user.id) {
      return Response.json(
        { error: "Transcription not found" },
        { status: 404 },
      );
    }

    // Teardown shared by start() and cancel(). A function *returned* from
    // start() is ignored by ReadableStream, so disconnect cleanup has to be
    // wired through cancel() — otherwise the heartbeat timer and the event
    // listener leak for every client that leaves early.
    let cleanup = () => {};

    const stream = new ReadableStream({
      async start(controller) {
        const encoder = new TextEncoder();
        let isClosed = false;
        let heartbeatInterval: ReturnType<typeof setInterval> | undefined;
        let updateHandler: ((data: TranscriptionUpdate) => void) | undefined;

        // Idempotent: stop heartbeats and unsubscribe from live updates.
        cleanup = () => {
          isClosed = true;
          if (heartbeatInterval !== undefined) {
            clearInterval(heartbeatInterval);
            heartbeatInterval = undefined;
          }
          if (updateHandler) {
            transcriptionEvents.off(transcriptionId, updateHandler);
            updateHandler = undefined;
          }
        };

        // Finish the stream normally; tolerate an already closed/errored controller.
        const closeStream = () => {
          cleanup();
          try {
            controller.close();
          } catch {
            // Already closed or cancelled (client disconnected)
          }
        };

        const write = (chunk: string) => {
          if (isClosed) return;
          try {
            controller.enqueue(encoder.encode(chunk));
          } catch {
            // Controller already closed (client disconnected)
            cleanup();
          }
        };

        const sendEvent = (data: Partial<TranscriptionUpdate>) => {
          // Event ID (seconds since epoch) for reconnection support
          const eventId = Math.floor(Date.now() / 1000);
          write(
            `id: ${eventId}\nevent: update\ndata: ${JSON.stringify(data)}\n\n`,
          );
        };

        // Send initial state from DB and file
        const current = db
          .query<{ status: string; progress: number }, [string]>(
            "SELECT status, progress FROM transcriptions WHERE id = ?",
          )
          .get(transcriptionId);
        if (current) {
          // Load transcript from file if completed
          let transcript: string | undefined;
          if (current.status === "completed") {
            transcript = (await getTranscript(transcriptionId)) || undefined;
          }
          sendEvent({
            status: current.status as TranscriptionUpdate["status"],
            progress: current.progress,
            transcript,
          });
        }
        // If already complete, close immediately
        if (
          current?.status === "completed" ||
          current?.status === "failed"
        ) {
          closeStream();
          return;
        }
        // The client may already be gone (failed initial write or cancel()).
        if (isClosed) return;

        // Send heartbeats every 2.5 seconds to keep connection alive
        heartbeatInterval = setInterval(() => write(": heartbeat\n\n"), 2500);

        // Subscribe to EventEmitter for live updates
        const handler = (data: TranscriptionUpdate) => {
          if (isClosed) return;

          // Only send changed fields to save bandwidth
          const payload: Partial<TranscriptionUpdate> = {
            status: data.status,
            progress: data.progress,
          };
          if (data.transcript !== undefined) {
            payload.transcript = data.transcript;
          }
          if (data.error_message !== undefined) {
            payload.error_message = data.error_message;
          }
          sendEvent(payload);

          // Close stream when done
          if (data.status === "completed" || data.status === "failed") {
            closeStream();
          }
        };
        updateHandler = handler;
        transcriptionEvents.on(transcriptionId, handler);
      },
      // Client disconnected: release the timer and the listener.
      cancel() {
        cleanup();
      },
    });
    return new Response(stream, {
      headers: {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        Connection: "keep-alive",
      },
    });
  },
},
"/api/transcriptions/health": {
  /** Report whether the Murmur/Whisper backend answers its health check. */
  GET: async () =>
    Response.json({ available: await whisperService.checkHealth() }),
},
"/api/transcriptions/:id": {
  /**
   * Download a completed transcript owned by the caller: plain text by
   * default, or WebVTT as an attachment when `?format=vtt`.
   * 404 when not found/not owned or the transcript file is missing,
   * 400 when the job has not completed. Errors go through handleError.
   */
  GET: async (req) => {
    try {
      const user = requireAuth(req);
      const transcriptionId = req.params.id;

      // Verify ownership
      const transcription = db
        .query<
          {
            id: string;
            user_id: number;
            status: string;
            original_filename: string;
          },
          [string]
        >(
          "SELECT id, user_id, status, original_filename FROM transcriptions WHERE id = ?",
        )
        .get(transcriptionId);

      if (!transcription || transcription.user_id !== user.id) {
        return Response.json(
          { error: "Transcription not found" },
          { status: 404 },
        );
      }

      if (transcription.status !== "completed") {
        return Response.json(
          { error: "Transcription not completed yet" },
          { status: 400 },
        );
      }

      // Get format from query parameter
      const format = new URL(req.url).searchParams.get("format");

      // Return WebVTT format if requested
      if (format === "vtt") {
        const vttContent = await getTranscriptVTT(transcriptionId);
        if (!vttContent) {
          return Response.json(
            { error: "VTT transcript not available" },
            { status: 404 },
          );
        }

        // original_filename comes from the client's upload. Quoting it raw
        // breaks on `"` and makes the Headers constructor throw on
        // non-Latin-1 names, so emit an ASCII-only fallback plus an
        // RFC 5987 UTF-8 `filename*` parameter (RFC 6266).
        const downloadName = `${transcription.original_filename}.vtt`;
        const asciiName = downloadName.replace(/[^\x20-\x7E]|["\\]/g, "_");
        const utf8Name = encodeURIComponent(downloadName).replace(
          /['()*]/g,
          (c) => `%${c.charCodeAt(0).toString(16).toUpperCase()}`,
        );

        return new Response(vttContent, {
          headers: {
            "Content-Type": "text/vtt; charset=utf-8",
            "Content-Disposition": `attachment; filename="${asciiName}"; filename*=UTF-8''${utf8Name}`,
          },
        });
      }

      // Default: return plain text transcript from file
      const transcript = await getTranscript(transcriptionId);
      if (!transcript) {
        return Response.json(
          { error: "Transcript not available" },
          { status: 404 },
        );
      }

      // Explicit charset so non-ASCII transcripts are not mis-decoded.
      return new Response(transcript, {
        headers: {
          "Content-Type": "text/plain; charset=utf-8",
        },
      });
    } catch (error) {
      return handleError(error);
    }
  },
},
"/api/transcriptions/:id/audio": {
  /**
   * Stream the uploaded audio of a completed transcription owned by the
   * caller, honouring single `Range: bytes=…` requests for seeking.
   * 206 for a satisfiable range, 416 for an unsatisfiable one, 200 with the
   * whole file when there is no (usable) Range header.
   */
  GET: async (req) => {
    try {
      const user = requireAuth(req);
      const transcriptionId = req.params.id;

      // Verify ownership and get filename
      const transcription = db
        .query<
          {
            id: string;
            user_id: number;
            filename: string;
            status: string;
          },
          [string]
        >("SELECT id, user_id, filename, status FROM transcriptions WHERE id = ?")
        .get(transcriptionId);

      if (!transcription || transcription.user_id !== user.id) {
        return Response.json(
          { error: "Transcription not found" },
          { status: 404 },
        );
      }

      if (transcription.status !== "completed") {
        return Response.json(
          { error: "Transcription not completed yet" },
          { status: 400 },
        );
      }

      const file = Bun.file(`./uploads/${transcription.filename}`);
      if (!(await file.exists())) {
        return Response.json({ error: "Audio file not found" }, { status: 404 });
      }

      const fileSize = file.size;
      const contentType = file.type || "audio/mpeg";

      // Only a single "bytes=start-end" / "bytes=start-" / "bytes=-suffix"
      // range is supported; anything else (multi-range, junk) is ignored and
      // the full file is served, which RFC 7233 permits.
      const rangeMatch = /^bytes=(\d*)-(\d*)$/.exec(
        req.headers.get("range")?.trim() ?? "",
      );
      if (rangeMatch && (rangeMatch[1] || rangeMatch[2])) {
        const [, startText = "", endText = ""] = rangeMatch;
        let start: number;
        let end: number;
        if (startText === "") {
          // Suffix range: the last N bytes (previously served the first N+1).
          start = Math.max(fileSize - Number(endText), 0);
          end = fileSize - 1;
        } else {
          start = Number(startText);
          // Clamp an over-long end to the last byte of the file.
          end =
            endText === ""
              ? fileSize - 1
              : Math.min(Number(endText), fileSize - 1);
        }

        if (start >= fileSize || start > end) {
          return new Response(null, {
            status: 416,
            headers: { "Content-Range": `bytes */${fileSize}` },
          });
        }

        return new Response(file.slice(start, end + 1), {
          status: 206,
          headers: {
            "Content-Range": `bytes ${start}-${end}/${fileSize}`,
            "Accept-Ranges": "bytes",
            "Content-Length": (end - start + 1).toString(),
            "Content-Type": contentType,
          },
        });
      }

      // No range request, send entire file
      return new Response(file, {
        headers: {
          "Content-Type": contentType,
          "Accept-Ranges": "bytes",
          "Content-Length": fileSize.toString(),
        },
      });
    } catch (error) {
      return handleError(error);
    }
  },
},
"/api/transcriptions": {
  /**
   * List the caller's transcriptions, newest first. Completed jobs carry
   * their transcript text loaded from storage; other jobs have `null`.
   */
  GET: async (req) => {
    try {
      const user = requireAuth(req);

      const transcriptions = db
        .query<
          {
            id: string;
            filename: string;
            original_filename: string;
            status: string;
            progress: number;
            created_at: number;
          },
          [number]
        >(
          "SELECT id, filename, original_filename, status, progress, created_at FROM transcriptions WHERE user_id = ? ORDER BY created_at DESC",
        )
        .all(user.id);

      // Load transcripts from files for completed jobs (in parallel)
      const jobs = await Promise.all(
        transcriptions.map(async (t) => {
          let transcript: string | null = null;
          if (t.status === "completed") {
            transcript = await getTranscript(t.id);
          }
          return {
            id: t.id,
            filename: t.original_filename,
            status: t.status,
            progress: t.progress,
            transcript,
            created_at: t.created_at,
          };
        }),
      );

      return Response.json({ jobs });
    } catch (error) {
      return handleError(error);
    }
  },
  /**
   * Accept a multipart upload (field `audio`), save it under ./uploads with
   * a server-generated name, insert an "uploading" row and start
   * transcription in the background. Responds with the new transcription id.
   */
  POST: async (req) => {
    try {
      const user = requireAuth(req);

      const formData = await req.formData();
      const entry = formData.get("audio");
      // A plain string field (or no field at all) is not an uploaded file.
      if (!(entry instanceof File)) throw ValidationErrors.missingField("audio");
      const file = entry;

      // Validate file type
      const allowedExtensions = [
        "mp3",
        "wav",
        "m4a",
        "aac",
        "ogg",
        "webm",
        "flac",
        "mp4",
      ];
      const dotIndex = file.name.lastIndexOf(".");
      const rawExtension =
        dotIndex === -1 ? "" : file.name.slice(dotIndex + 1).toLowerCase();
      // The extension becomes part of the on-disk path, so only short
      // alphanumeric ones are kept (previously a dotless name became the
      // "extension", and characters such as "/" reached the path).
      const fileExtension = /^[a-z0-9]{1,10}$/.test(rawExtension)
        ? rawExtension
        : undefined;

      const isAudioType =
        file.type.startsWith("audio/") || file.type === "video/mp4";
      const isAudioExtension =
        fileExtension !== undefined && allowedExtensions.includes(fileExtension);

      if (!isAudioType && !isAudioExtension) {
        throw ValidationErrors.unsupportedFileType(
          "MP3, WAV, M4A, AAC, OGG, WebM, FLAC, MP4",
        );
      }

      if (file.size > MAX_FILE_SIZE) {
        // NOTE(review): label is hard-coded — keep in sync with MAX_FILE_SIZE.
        throw ValidationErrors.fileTooLarge("25MB");
      }

      // Generate unique filename; fall back to a neutral extension when the
      // upload's own extension was unusable.
      const transcriptionId = crypto.randomUUID();
      const filename = `${transcriptionId}.${fileExtension ?? "bin"}`;

      // Save file to disk
      await Bun.write(`./uploads/${filename}`, file);

      // Create database record
      db.run(
        "INSERT INTO transcriptions (id, user_id, filename, original_filename, status) VALUES (?, ?, ?, ?, ?)",
        [transcriptionId, user.id, filename, file.name, "uploading"],
      );

      // Start transcription in background. It is not awaited; an async
      // failure is logged instead of becoming an unhandled rejection.
      void Promise.resolve(
        whisperService.startTranscription(transcriptionId, filename),
      ).catch((error: unknown) => {
        console.error(
          "[Transcription] Failed to start transcription:",
          error instanceof Error ? error.message : "Unknown error",
        );
      });

      return Response.json({
        id: transcriptionId,
        message: "Upload successful, transcription started",
      });
    } catch (error) {
      return handleError(error);
    }
  },
},
792 },
// Dev-only tooling (HMR, browser console forwarding). Previously always on;
// now disabled when NODE_ENV=production.
development: process.env.NODE_ENV !== "production" && {
  hmr: true,
  console: true,
},
797});
// Announce the listening address once the server is up.
const startupUrl = `http://localhost:${server.port}`;
console.log(`馃 Thistle running at ${startupUrl}`);