Thistle — distributed transcription service
thistle.dunkirk.sh
1import db from "./db/schema";
2import {
3 authenticateUser,
4 cleanupExpiredSessions,
5 createSession,
6 createUser,
7 deleteSession,
8 deleteUser,
9 getSession,
10 getSessionFromRequest,
11 getUserBySession,
12 getUserSessionsForUser,
13 updateUserAvatar,
14 updateUserEmail,
15 updateUserName,
16 updateUserPassword,
17} from "./lib/auth";
18import { handleError, ValidationErrors } from "./lib/errors";
19import { requireAuth } from "./lib/middleware";
20import {
21 MAX_FILE_SIZE,
22 TranscriptionEventEmitter,
23 type TranscriptionUpdate,
24 WhisperServiceManager,
25} from "./lib/transcription";
26import { getTranscript, getTranscriptVTT } from "./lib/transcript-storage";
27import indexHTML from "./pages/index.html";
28import settingsHTML from "./pages/settings.html";
29import transcribeHTML from "./pages/transcribe.html";
30
// Base URL of Murmur, the Whisper transcription backend. `||` (not `??`) is
// deliberate: an unset *or empty* WHISPER_SERVICE_URL falls back to localhost.
const WHISPER_SERVICE_URL =
	process.env.WHISPER_SERVICE_URL || "http://localhost:8000";

// Make sure the working directories exist by writing an empty placeholder
// into each, one after the other, before the server starts.
for (const workDir of ["./uploads", "./transcripts"]) {
	await Bun.write(`${workDir}/.gitkeep`, "");
}

// Transcription pipeline: the manager receives the shared event emitter,
// which the SSE stream route subscribes to for per-transcription updates.
console.log(
	"[Transcription] Connecting to Murmur at " + WHISPER_SERVICE_URL + "...",
);
const transcriptionEvents = new TranscriptionEventEmitter();
const whisperService = new WhisperServiceManager(
	WHISPER_SERVICE_URL,
	db,
	transcriptionEvents,
);
49
/**
 * Wraps a recurring maintenance task for use as a timer callback.
 *
 * Errors, whether thrown synchronously or delivered as a rejected promise,
 * are logged with `label` instead of escaping the timer. Left unhandled,
 * they would surface as uncaught exceptions or unhandled rejections.
 *
 * @param label - Log prefix used when the task fails.
 * @param task - The work to run; its return value (if any) is awaited.
 */
const runMaintenance =
	(label: string, task: () => unknown) => async (): Promise<void> => {
		try {
			await task();
		} catch (error) {
			console.warn(
				label,
				error instanceof Error ? error.message : "Unknown error",
			);
		}
	};

// Clean up expired sessions every hour
setInterval(
	runMaintenance(
		"[Sessions] Failed to clean up expired sessions:",
		cleanupExpiredSessions,
	),
	60 * 60 * 1000,
);

// Sync with Whisper DB on startup. Failure is non-fatal: the server still
// starts and the periodic sync below retries.
try {
	await whisperService.syncWithWhisper();
	console.log("[Transcription] Successfully connected to Murmur");
} catch (error) {
	console.warn(
		"[Transcription] Murmur unavailable at startup:",
		error instanceof Error ? error.message : "Unknown error",
	);
}

// Periodic sync every 5 minutes as backup (SSE handles real-time updates)
setInterval(
	runMaintenance("[Sync] Failed to sync with Murmur:", () =>
		whisperService.syncWithWhisper(),
	),
	5 * 60 * 1000,
);

// Clean up stale files daily
setInterval(
	runMaintenance("[Cleanup] Failed to clean up stale files:", () =>
		whisperService.cleanupStaleFiles(),
	),
	24 * 60 * 60 * 1000,
);
78
79const server = Bun.serve({
80 port: 3000,
81 idleTimeout: 120, // 120 seconds for SSE connections
82 routes: {
83 "/": indexHTML,
84 "/settings": settingsHTML,
85 "/transcribe": transcribeHTML,
86 "/api/auth/register": {
87 POST: async (req) => {
88 try {
89 const body = await req.json();
90 const { email, password, name } = body;
91 if (!email || !password) {
92 return Response.json(
93 { error: "Email and password required" },
94 { status: 400 },
95 );
96 }
97 // Password is client-side hashed (PBKDF2), should be 64 char hex
98 if (password.length !== 64 || !/^[0-9a-f]+$/.test(password)) {
99 return Response.json(
100 { error: "Invalid password format" },
101 { status: 400 },
102 );
103 }
104 const user = await createUser(email, password, name);
105 const ipAddress =
106 req.headers.get("x-forwarded-for") ??
107 req.headers.get("x-real-ip") ??
108 "unknown";
109 const userAgent = req.headers.get("user-agent") ?? "unknown";
110 const sessionId = createSession(user.id, ipAddress, userAgent);
111 return Response.json(
112 { user: { id: user.id, email: user.email } },
113 {
114 headers: {
115 "Set-Cookie": `session=${sessionId}; HttpOnly; Path=/; Max-Age=${7 * 24 * 60 * 60}; SameSite=Lax`,
116 },
117 },
118 );
119 } catch (err: unknown) {
120 const error = err as { message?: string };
121 if (error.message?.includes("UNIQUE constraint failed")) {
122 return Response.json(
123 { error: "Email already registered" },
124 { status: 400 },
125 );
126 }
127 return Response.json(
128 { error: "Registration failed" },
129 { status: 500 },
130 );
131 }
132 },
133 },
134 "/api/auth/login": {
135 POST: async (req) => {
136 try {
137 const body = await req.json();
138 const { email, password } = body;
139 if (!email || !password) {
140 return Response.json(
141 { error: "Email and password required" },
142 { status: 400 },
143 );
144 }
145 // Password is client-side hashed (PBKDF2), should be 64 char hex
146 if (password.length !== 64 || !/^[0-9a-f]+$/.test(password)) {
147 return Response.json(
148 { error: "Invalid password format" },
149 { status: 400 },
150 );
151 }
152 const user = await authenticateUser(email, password);
153 if (!user) {
154 return Response.json(
155 { error: "Invalid email or password" },
156 { status: 401 },
157 );
158 }
159 const ipAddress =
160 req.headers.get("x-forwarded-for") ??
161 req.headers.get("x-real-ip") ??
162 "unknown";
163 const userAgent = req.headers.get("user-agent") ?? "unknown";
164 const sessionId = createSession(user.id, ipAddress, userAgent);
165 return Response.json(
166 { user: { id: user.id, email: user.email } },
167 {
168 headers: {
169 "Set-Cookie": `session=${sessionId}; HttpOnly; Path=/; Max-Age=${7 * 24 * 60 * 60}; SameSite=Lax`,
170 },
171 },
172 );
173 } catch {
174 return Response.json({ error: "Login failed" }, { status: 500 });
175 }
176 },
177 },
178 "/api/auth/logout": {
179 POST: async (req) => {
180 const sessionId = getSessionFromRequest(req);
181 if (sessionId) {
182 deleteSession(sessionId);
183 }
184 return Response.json(
185 { success: true },
186 {
187 headers: {
188 "Set-Cookie":
189 "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax",
190 },
191 },
192 );
193 },
194 },
195 "/api/auth/me": {
196 GET: (req) => {
197 const sessionId = getSessionFromRequest(req);
198 if (!sessionId) {
199 return Response.json({ error: "Not authenticated" }, { status: 401 });
200 }
201 const user = getUserBySession(sessionId);
202 if (!user) {
203 return Response.json({ error: "Invalid session" }, { status: 401 });
204 }
205 return Response.json({
206 email: user.email,
207 name: user.name,
208 avatar: user.avatar,
209 created_at: user.created_at,
210 });
211 },
212 },
213 "/api/sessions": {
214 GET: (req) => {
215 const sessionId = getSessionFromRequest(req);
216 if (!sessionId) {
217 return Response.json({ error: "Not authenticated" }, { status: 401 });
218 }
219 const user = getUserBySession(sessionId);
220 if (!user) {
221 return Response.json({ error: "Invalid session" }, { status: 401 });
222 }
223 const sessions = getUserSessionsForUser(user.id);
224 return Response.json({
225 sessions: sessions.map((s) => ({
226 id: s.id,
227 ip_address: s.ip_address,
228 user_agent: s.user_agent,
229 created_at: s.created_at,
230 expires_at: s.expires_at,
231 })),
232 });
233 },
234 DELETE: async (req) => {
235 const currentSessionId = getSessionFromRequest(req);
236 if (!currentSessionId) {
237 return Response.json({ error: "Not authenticated" }, { status: 401 });
238 }
239 const user = getUserBySession(currentSessionId);
240 if (!user) {
241 return Response.json({ error: "Invalid session" }, { status: 401 });
242 }
243 const body = await req.json();
244 const targetSessionId = body.sessionId;
245 if (!targetSessionId) {
246 return Response.json(
247 { error: "Session ID required" },
248 { status: 400 },
249 );
250 }
251 // Verify the session belongs to the user
252 const targetSession = getSession(targetSessionId);
253 if (!targetSession || targetSession.user_id !== user.id) {
254 return Response.json({ error: "Session not found" }, { status: 404 });
255 }
256 deleteSession(targetSessionId);
257 return Response.json({ success: true });
258 },
259 },
260 "/api/user": {
261 DELETE: (req) => {
262 const sessionId = getSessionFromRequest(req);
263 if (!sessionId) {
264 return Response.json({ error: "Not authenticated" }, { status: 401 });
265 }
266 const user = getUserBySession(sessionId);
267 if (!user) {
268 return Response.json({ error: "Invalid session" }, { status: 401 });
269 }
270 deleteUser(user.id);
271 return Response.json(
272 { success: true },
273 {
274 headers: {
275 "Set-Cookie":
276 "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax",
277 },
278 },
279 );
280 },
281 },
282 "/api/user/email": {
283 PUT: async (req) => {
284 const sessionId = getSessionFromRequest(req);
285 if (!sessionId) {
286 return Response.json({ error: "Not authenticated" }, { status: 401 });
287 }
288 const user = getUserBySession(sessionId);
289 if (!user) {
290 return Response.json({ error: "Invalid session" }, { status: 401 });
291 }
292 const body = await req.json();
293 const { email } = body;
294 if (!email) {
295 return Response.json({ error: "Email required" }, { status: 400 });
296 }
297 try {
298 updateUserEmail(user.id, email);
299 return Response.json({ success: true });
300 } catch (err: unknown) {
301 const error = err as { message?: string };
302 if (error.message?.includes("UNIQUE constraint failed")) {
303 return Response.json(
304 { error: "Email already in use" },
305 { status: 400 },
306 );
307 }
308 return Response.json(
309 { error: "Failed to update email" },
310 { status: 500 },
311 );
312 }
313 },
314 },
315 "/api/user/password": {
316 PUT: async (req) => {
317 const sessionId = getSessionFromRequest(req);
318 if (!sessionId) {
319 return Response.json({ error: "Not authenticated" }, { status: 401 });
320 }
321 const user = getUserBySession(sessionId);
322 if (!user) {
323 return Response.json({ error: "Invalid session" }, { status: 401 });
324 }
325 const body = await req.json();
326 const { password } = body;
327 if (!password) {
328 return Response.json({ error: "Password required" }, { status: 400 });
329 }
330 // Password is client-side hashed (PBKDF2), should be 64 char hex
331 if (password.length !== 64 || !/^[0-9a-f]+$/.test(password)) {
332 return Response.json(
333 { error: "Invalid password format" },
334 { status: 400 },
335 );
336 }
337 try {
338 await updateUserPassword(user.id, password);
339 return Response.json({ success: true });
340 } catch {
341 return Response.json(
342 { error: "Failed to update password" },
343 { status: 500 },
344 );
345 }
346 },
347 },
348 "/api/user/name": {
349 PUT: async (req) => {
350 const sessionId = getSessionFromRequest(req);
351 if (!sessionId) {
352 return Response.json({ error: "Not authenticated" }, { status: 401 });
353 }
354 const user = getUserBySession(sessionId);
355 if (!user) {
356 return Response.json({ error: "Invalid session" }, { status: 401 });
357 }
358 const body = await req.json();
359 const { name } = body;
360 if (!name) {
361 return Response.json({ error: "Name required" }, { status: 400 });
362 }
363 try {
364 updateUserName(user.id, name);
365 return Response.json({ success: true });
366 } catch {
367 return Response.json(
368 { error: "Failed to update name" },
369 { status: 500 },
370 );
371 }
372 },
373 },
374 "/api/user/avatar": {
375 PUT: async (req) => {
376 const sessionId = getSessionFromRequest(req);
377 if (!sessionId) {
378 return Response.json({ error: "Not authenticated" }, { status: 401 });
379 }
380 const user = getUserBySession(sessionId);
381 if (!user) {
382 return Response.json({ error: "Invalid session" }, { status: 401 });
383 }
384 const body = await req.json();
385 const { avatar } = body;
386 if (!avatar) {
387 return Response.json({ error: "Avatar required" }, { status: 400 });
388 }
389 try {
390 updateUserAvatar(user.id, avatar);
391 return Response.json({ success: true });
392 } catch {
393 return Response.json(
394 { error: "Failed to update avatar" },
395 { status: 500 },
396 );
397 }
398 },
399 },
400 "/api/transcriptions/:id/stream": {
401 GET: async (req) => {
402 const sessionId = getSessionFromRequest(req);
403 if (!sessionId) {
404 return Response.json({ error: "Not authenticated" }, { status: 401 });
405 }
406 const user = getUserBySession(sessionId);
407 if (!user) {
408 return Response.json({ error: "Invalid session" }, { status: 401 });
409 }
410 const transcriptionId = req.params.id;
411 // Verify ownership
412 const transcription = db
413 .query<{ id: string; user_id: number; status: string }, [string]>(
414 "SELECT id, user_id, status FROM transcriptions WHERE id = ?",
415 )
416 .get(transcriptionId);
417 if (!transcription || transcription.user_id !== user.id) {
418 return Response.json(
419 { error: "Transcription not found" },
420 { status: 404 },
421 );
422 }
423 // Event-driven SSE stream with reconnection support
424 const stream = new ReadableStream({
425 async start(controller) {
426 const encoder = new TextEncoder();
427 let isClosed = false;
428 let lastEventId = Math.floor(Date.now() / 1000);
429
430 const sendEvent = (data: Partial<TranscriptionUpdate>) => {
431 if (isClosed) return;
432 try {
433 // Send event ID for reconnection support
434 lastEventId = Math.floor(Date.now() / 1000);
435 controller.enqueue(
436 encoder.encode(
437 `id: ${lastEventId}\nevent: update\ndata: ${JSON.stringify(data)}\n\n`,
438 ),
439 );
440 } catch {
441 // Controller already closed (client disconnected)
442 isClosed = true;
443 }
444 };
445
446 const sendHeartbeat = () => {
447 if (isClosed) return;
448 try {
449 controller.enqueue(encoder.encode(": heartbeat\n\n"));
450 } catch {
451 isClosed = true;
452 }
453 };
454 // Send initial state from DB and file
455 const current = db
456 .query<
457 {
458 status: string;
459 progress: number;
460 },
461 [string]
462 >(
463 "SELECT status, progress FROM transcriptions WHERE id = ?",
464 )
465 .get(transcriptionId);
466 if (current) {
467 // Load transcript from file if completed
468 let transcript: string | undefined;
469 if (current.status === "completed") {
470 transcript = (await getTranscript(transcriptionId)) || undefined;
471 }
472 sendEvent({
473 status: current.status as TranscriptionUpdate["status"],
474 progress: current.progress,
475 transcript,
476 });
477 }
478 // If already complete, close immediately
479 if (
480 current?.status === "completed" ||
481 current?.status === "failed"
482 ) {
483 isClosed = true;
484 controller.close();
485 return;
486 }
487 // Send heartbeats every 2.5 seconds to keep connection alive
488 const heartbeatInterval = setInterval(sendHeartbeat, 2500);
489
490 // Subscribe to EventEmitter for live updates
491 const updateHandler = (data: TranscriptionUpdate) => {
492 if (isClosed) return;
493
494 // Only send changed fields to save bandwidth
495 const payload: Partial<TranscriptionUpdate> = {
496 status: data.status,
497 progress: data.progress,
498 };
499
500 if (data.transcript !== undefined) {
501 payload.transcript = data.transcript;
502 }
503 if (data.error_message !== undefined) {
504 payload.error_message = data.error_message;
505 }
506
507 sendEvent(payload);
508
509 // Close stream when done
510 if (data.status === "completed" || data.status === "failed") {
511 isClosed = true;
512 clearInterval(heartbeatInterval);
513 transcriptionEvents.off(transcriptionId, updateHandler);
514 controller.close();
515 }
516 };
517 transcriptionEvents.on(transcriptionId, updateHandler);
518 // Cleanup on client disconnect
519 return () => {
520 isClosed = true;
521 clearInterval(heartbeatInterval);
522 transcriptionEvents.off(transcriptionId, updateHandler);
523 };
524 },
525 });
526 return new Response(stream, {
527 headers: {
528 "Content-Type": "text/event-stream",
529 "Cache-Control": "no-cache",
530 Connection: "keep-alive",
531 },
532 });
533 },
534 },
535 "/api/transcriptions/health": {
536 GET: async () => {
537 const isHealthy = await whisperService.checkHealth();
538 return Response.json({ available: isHealthy });
539 },
540 },
541 "/api/transcriptions/:id": {
542 GET: async (req) => {
543 try {
544 const user = requireAuth(req);
545 const transcriptionId = req.params.id;
546
547 // Verify ownership
548 const transcription = db
549 .query<
550 {
551 id: string;
552 user_id: number;
553 status: string;
554 original_filename: string;
555 },
556 [string]
557 >(
558 "SELECT id, user_id, status, original_filename FROM transcriptions WHERE id = ?",
559 )
560 .get(transcriptionId);
561
562 if (!transcription || transcription.user_id !== user.id) {
563 return Response.json(
564 { error: "Transcription not found" },
565 { status: 404 },
566 );
567 }
568
569 if (transcription.status !== "completed") {
570 return Response.json(
571 { error: "Transcription not completed yet" },
572 { status: 400 },
573 );
574 }
575
576 // Get format from query parameter
577 const url = new URL(req.url);
578 const format = url.searchParams.get("format");
579
580 // Return WebVTT format if requested
581 if (format === "vtt") {
582 const vttContent = await getTranscriptVTT(transcriptionId);
583
584 if (!vttContent) {
585 return Response.json(
586 { error: "VTT transcript not available" },
587 { status: 404 },
588 );
589 }
590
591 return new Response(vttContent, {
592 headers: {
593 "Content-Type": "text/vtt",
594 "Content-Disposition": `attachment; filename="${transcription.original_filename}.vtt"`,
595 },
596 });
597 }
598
599 // Default: return plain text transcript from file
600 const transcript = await getTranscript(transcriptionId);
601 if (!transcript) {
602 return Response.json(
603 { error: "Transcript not available" },
604 { status: 404 },
605 );
606 }
607
608 return new Response(transcript, {
609 headers: {
610 "Content-Type": "text/plain",
611 },
612 });
613 } catch (error) {
614 return handleError(error);
615 }
616 },
617 },
618 "/api/transcriptions/:id/audio": {
619 GET: async (req) => {
620 try {
621 const user = requireAuth(req);
622 const transcriptionId = req.params.id;
623
624 // Verify ownership and get filename
625 const transcription = db
626 .query<
627 {
628 id: string;
629 user_id: number;
630 filename: string;
631 status: string;
632 },
633 [string]
634 >("SELECT id, user_id, filename, status FROM transcriptions WHERE id = ?")
635 .get(transcriptionId);
636
637 if (!transcription || transcription.user_id !== user.id) {
638 return Response.json(
639 { error: "Transcription not found" },
640 { status: 404 },
641 );
642 }
643
644 if (transcription.status !== "completed") {
645 return Response.json(
646 { error: "Transcription not completed yet" },
647 { status: 400 },
648 );
649 }
650
651 // Serve the audio file with range request support
652 const filePath = `./uploads/${transcription.filename}`;
653 const file = Bun.file(filePath);
654
655 if (!(await file.exists())) {
656 return Response.json({ error: "Audio file not found" }, { status: 404 });
657 }
658
659 const fileSize = file.size;
660 const range = req.headers.get("range");
661
662 // Handle range requests for seeking
663 if (range) {
664 const parts = range.replace(/bytes=/, "").split("-");
665 const start = Number.parseInt(parts[0] || "0", 10);
666 const end = parts[1] ? Number.parseInt(parts[1], 10) : fileSize - 1;
667 const chunkSize = end - start + 1;
668
669 const fileSlice = file.slice(start, end + 1);
670
671 return new Response(fileSlice, {
672 status: 206,
673 headers: {
674 "Content-Range": `bytes ${start}-${end}/${fileSize}`,
675 "Accept-Ranges": "bytes",
676 "Content-Length": chunkSize.toString(),
677 "Content-Type": file.type || "audio/mpeg",
678 },
679 });
680 }
681
682 // No range request, send entire file
683 return new Response(file, {
684 headers: {
685 "Content-Type": file.type || "audio/mpeg",
686 "Accept-Ranges": "bytes",
687 "Content-Length": fileSize.toString(),
688 },
689 });
690 } catch (error) {
691 return handleError(error);
692 }
693 },
694 },
695 "/api/transcriptions": {
696 GET: async (req) => {
697 try {
698 const user = requireAuth(req);
699
700 const transcriptions = db
701 .query<
702 {
703 id: string;
704 filename: string;
705 original_filename: string;
706 status: string;
707 progress: number;
708 created_at: number;
709 },
710 [number]
711 >(
712 "SELECT id, filename, original_filename, status, progress, created_at FROM transcriptions WHERE user_id = ? ORDER BY created_at DESC",
713 )
714 .all(user.id);
715
716 // Load transcripts from files for completed jobs
717 const jobs = await Promise.all(
718 transcriptions.map(async (t) => {
719 let transcript: string | null = null;
720 if (t.status === "completed") {
721 transcript = await getTranscript(t.id);
722 }
723 return {
724 id: t.id,
725 filename: t.original_filename,
726 status: t.status,
727 progress: t.progress,
728 transcript,
729 created_at: t.created_at,
730 };
731 }),
732 );
733
734 return Response.json({ jobs });
735 } catch (error) {
736 return handleError(error);
737 }
738 },
739 POST: async (req) => {
740 try {
741 const user = requireAuth(req);
742
743 const formData = await req.formData();
744 const file = formData.get("audio") as File;
745
746 if (!file) throw ValidationErrors.missingField("audio");
747
748 // Validate file type
749 const fileExtension = file.name.split(".").pop()?.toLowerCase();
750 const allowedExtensions = [
751 "mp3",
752 "wav",
753 "m4a",
754 "aac",
755 "ogg",
756 "webm",
757 "flac",
758 "mp4",
759 ];
760 const isAudioType =
761 file.type.startsWith("audio/") || file.type === "video/mp4";
762 const isAudioExtension =
763 fileExtension && allowedExtensions.includes(fileExtension);
764
765 if (!isAudioType && !isAudioExtension) {
766 throw ValidationErrors.unsupportedFileType(
767 "MP3, WAV, M4A, AAC, OGG, WebM, FLAC",
768 );
769 }
770
771 if (file.size > MAX_FILE_SIZE) {
772 throw ValidationErrors.fileTooLarge("25MB");
773 }
774
775 // Generate unique filename
776 const transcriptionId = crypto.randomUUID();
777 const filename = `${transcriptionId}.${fileExtension}`;
778
779 // Save file to disk
780 const uploadDir = "./uploads";
781 await Bun.write(`${uploadDir}/${filename}`, file);
782
783 // Create database record
784 db.run(
785 "INSERT INTO transcriptions (id, user_id, filename, original_filename, status) VALUES (?, ?, ?, ?, ?)",
786 [transcriptionId, user.id, filename, file.name, "uploading"],
787 );
788
789 // Start transcription in background
790 whisperService.startTranscription(transcriptionId, filename);
791
792 return Response.json({
793 id: transcriptionId,
794 message: "Upload successful, transcription started",
795 });
796 } catch (error) {
797 return handleError(error);
798 }
799 },
800 },
801 },
802 development: {
803 hmr: true,
804 console: true,
805 },
806});
// Startup banner.
// NOTE(review): the leading glyph looks like a mis-decoded emoji — confirm
// against the original source file.
const listenUrl = `http://localhost:${server.port}`;
console.log(`馃 Thistle running at ${listenUrl}`);