distributed transcription service
thistle.dunkirk.sh
1import db from "./db/schema";
2import {
3 authenticateUser,
4 cleanupExpiredSessions,
5 createSession,
6 createUser,
7 deleteSession,
8 deleteUser,
9 getSession,
10 getSessionFromRequest,
11 getUserBySession,
12 getUserSessionsForUser,
13 updateUserAvatar,
14 updateUserEmail,
15 updateUserName,
16 updateUserPassword,
17} from "./lib/auth";
18import { handleError, ValidationErrors } from "./lib/errors";
19import { requireAuth } from "./lib/middleware";
20import {
21 MAX_FILE_SIZE,
22 TranscriptionEventEmitter,
23 type TranscriptionUpdate,
24 WhisperServiceManager,
25} from "./lib/transcription";
26import { getTranscript, getTranscriptVTT } from "./lib/transcript-storage";
27import indexHTML from "./pages/index.html";
28import settingsHTML from "./pages/settings.html";
29import transcribeHTML from "./pages/transcribe.html";
30
// Murmur (Whisper) service endpoint. Uses `||` on purpose so that an unset OR
// empty environment variable falls back to the local default.
const WHISPER_SERVICE_URL =
  process.env.WHISPER_SERVICE_URL || "http://localhost:8000";

// Make sure uploads/ and transcripts/ exist before any request arrives by
// writing an empty placeholder into each (relies on Bun.write creating
// missing parent directories). Writes happen one after the other.
for (const placeholder of ["./uploads/.gitkeep", "./transcripts/.gitkeep"]) {
  await Bun.write(placeholder, "");
}

// Transcription subsystem: a shared event bus that carries per-transcription
// updates, and the manager that talks to Murmur and persists via `db`.
console.log(`[Transcription] Connecting to Murmur at ${WHISPER_SERVICE_URL}...`);
const transcriptionEvents = new TranscriptionEventEmitter();
const whisperService = new WhisperServiceManager(
  WHISPER_SERVICE_URL,
  db,
  transcriptionEvents,
);
49
/**
 * Adapts a recurring maintenance job for `setInterval`.
 *
 * The returned callback awaits `task()` and logs any synchronous throw or
 * rejected promise under `label`. Without it, the error would escape the timer
 * callback as an uncaught exception / unhandled rejection.
 *
 * @param label Log prefix identifying the job.
 * @param task  The job; may be sync or async.
 * @returns A timer callback that never throws or rejects.
 */
function runBackgroundTask(
  label: string,
  task: () => unknown,
): () => Promise<void> {
  return async () => {
    try {
      await task();
    } catch (error) {
      console.warn(
        `${label} failed:`,
        error instanceof Error ? error.message : "Unknown error",
      );
    }
  };
}

// Clean up expired sessions every hour
setInterval(
  runBackgroundTask("[Auth] Session cleanup", () => cleanupExpiredSessions()),
  60 * 60 * 1000,
);

// Sync with Whisper DB on startup. Murmur being down here is not fatal; the
// periodic sync below will catch up once it becomes reachable.
try {
  await whisperService.syncWithWhisper();
  console.log("[Transcription] Successfully connected to Murmur");
} catch (error) {
  console.warn(
    "[Transcription] Murmur unavailable at startup:",
    error instanceof Error ? error.message : "Unknown error",
  );
}

// Periodic sync every 5 minutes as backup (SSE handles real-time updates).
// Wrapped because the same failure tolerated at startup must not crash the
// process when it happens later inside a timer.
setInterval(
  runBackgroundTask("[Transcription] Periodic Murmur sync", () =>
    whisperService.syncWithWhisper(),
  ),
  5 * 60 * 1000,
);

// Clean up stale files daily
setInterval(
  runBackgroundTask("[Transcription] Stale file cleanup", () =>
    whisperService.cleanupStaleFiles(),
  ),
  24 * 60 * 60 * 1000,
);
69
/** Max-Age of the session cookie, in seconds (7 days). */
const SESSION_MAX_AGE_SECONDS = 7 * 24 * 60 * 60;

/** Set-Cookie value that immediately expires the session cookie. */
const CLEARED_SESSION_COOKIE =
  "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax";

/** Upload extensions accepted even when the client sends a non-audio MIME type. */
const ALLOWED_AUDIO_EXTENSIONS = [
  "mp3",
  "wav",
  "m4a",
  "aac",
  "ogg",
  "webm",
  "flac",
  "mp4",
];

/** Interval between SSE comment heartbeats that keep idle streams open. */
const SSE_HEARTBEAT_MS = 2500;

type SessionUser = NonNullable<ReturnType<typeof getUserBySession>>;

/** Outcome of cookie-session authentication: the user, or a ready 401 response. */
type SessionAuth =
  | { ok: true; user: SessionUser }
  | { ok: false; response: Response };

/** Result of interpreting a `Range` header against a resource of known size. */
type ByteRangeResult =
  | { kind: "none" }
  | { kind: "partial"; start: number; end: number }
  | { kind: "unsatisfiable" };

/** Builds the `{ error }` JSON failure response used throughout this server. */
function errorResponse(message: string, status: number): Response {
  return Response.json({ error: message }, { status });
}

/** True when `value` is a string with at least one character. */
function isNonEmptyString(value: unknown): value is string {
  return typeof value === "string" && value.length > 0;
}

/** Reads `message` from any thrown value; empty string if there is none. */
function thrownMessage(err: unknown): string {
  if (typeof err === "object" && err !== null && "message" in err) {
    return String(err.message);
  }
  return "";
}

/** True when a thrown value reports an SQLite UNIQUE constraint violation. */
function isUniqueViolation(err: unknown): boolean {
  return thrownMessage(err).includes("UNIQUE constraint failed");
}

/**
 * Resolves the user behind the request's session cookie.
 *
 * @returns `{ ok: true, user }` on success. Otherwise `{ ok: false, response }`
 *   carrying a 401: "Not authenticated" when no session cookie is present,
 *   or "Invalid session" when it maps to no user.
 */
function authenticateSession(req: Request): SessionAuth {
  const sessionId = getSessionFromRequest(req);
  if (!sessionId) {
    return { ok: false, response: errorResponse("Not authenticated", 401) };
  }
  const user = getUserBySession(sessionId);
  if (!user) {
    return { ok: false, response: errorResponse("Invalid session", 401) };
  }
  return { ok: true, user };
}

/**
 * Parses the request body as a JSON object.
 *
 * @returns The object, or `null` for malformed JSON or a non-object payload,
 *   so handlers can answer 400 instead of failing with a 500.
 */
async function readJsonObject(
  req: Request,
): Promise<Record<string, unknown> | null> {
  let body: unknown;
  try {
    body = await req.json();
  } catch {
    return null;
  }
  if (typeof body !== "object" || body === null || Array.isArray(body)) {
    return null;
  }
  return body as Record<string, unknown>;
}

/**
 * Creates a session for `userId` and returns its Set-Cookie header value.
 *
 * The recorded IP is x-forwarded-for, else x-real-ip, else "unknown". The
 * user agent falls back to "unknown" as well.
 */
function openSessionCookie(
  req: Request,
  userId: Parameters<typeof createSession>[0],
): string {
  const ipAddress =
    req.headers.get("x-forwarded-for") ??
    req.headers.get("x-real-ip") ??
    "unknown";
  const userAgent = req.headers.get("user-agent") ?? "unknown";
  const sessionId = createSession(userId, ipAddress, userAgent);
  return `session=${sessionId}; HttpOnly; Path=/; Max-Age=${SESSION_MAX_AGE_SECONDS}; SameSite=Lax`;
}

/**
 * Interprets a single `Range: bytes=…` header for a resource of `size` bytes.
 *
 * - Absent, malformed or multi-range headers, and specs whose last byte
 *   precedes the first, yield `none`; the caller ignores the header and sends
 *   the full body.
 * - Suffix ranges (`bytes=-N`) select the final N bytes.
 * - An end past EOF is clamped to the last byte.
 * - A start at or beyond EOF (or `bytes=-0`) is `unsatisfiable` (answer 416).
 */
function parseByteRange(header: string | null, size: number): ByteRangeResult {
  const match = header ? /^bytes=(\d*)-(\d*)$/i.exec(header.trim()) : null;
  if (!match) return { kind: "none" };
  const [, startText = "", endText = ""] = match;

  if (startText === "") {
    if (endText === "") return { kind: "none" };
    const suffixLength = Number(endText);
    if (suffixLength === 0 || size === 0) return { kind: "unsatisfiable" };
    return {
      kind: "partial",
      start: Math.max(0, size - suffixLength),
      end: size - 1,
    };
  }

  const start = Number(startText);
  if (endText !== "" && Number(endText) < start) return { kind: "none" };
  if (start >= size) return { kind: "unsatisfiable" };
  const end = endText === "" ? size - 1 : Math.min(Number(endText), size - 1);
  return { kind: "partial", start, end };
}

/**
 * Builds an `attachment` Content-Disposition for a user-supplied file name.
 *
 * The quoted `filename` is an ASCII fallback in which quotes, backslashes and
 * non-printable or non-ASCII characters become "_". `filename*` carries the
 * exact name, percent-encoded as UTF-8.
 */
function attachmentDisposition(filename: string): string {
  const asciiFallback = filename.replace(/[^\x20-\x7e]|["\\]/g, "_");
  const encoded = encodeURIComponent(filename).replace(
    /['()*]/g,
    (ch) => `%${ch.charCodeAt(0).toString(16).toUpperCase()}`,
  );
  return `attachment; filename="${asciiFallback}"; filename*=UTF-8''${encoded}`;
}

/** True for statuses after which a transcription receives no more updates. */
function isTerminalStatus(status: string | undefined): boolean {
  return status === "completed" || status === "failed";
}

/**
 * Server-sent-event stream of updates for one transcription.
 *
 * It first sends the current DB state, including the transcript file when
 * the job is completed. If the job is already finished, the stream closes
 * immediately. Otherwise it relays `transcriptionEvents` updates plus
 * heartbeats until a terminal status arrives or the client disconnects.
 * Every exit path goes through `release()`, so the heartbeat timer and the
 * event listener are always torn down.
 */
function createTranscriptionEventStream(
  transcriptionId: string,
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  let isClosed = false;
  let heartbeat: ReturnType<typeof setInterval> | undefined;
  let listener: ((data: TranscriptionUpdate) => void) | undefined;

  // Idempotent teardown shared by terminal updates, failed writes and cancel().
  const release = () => {
    isClosed = true;
    if (heartbeat !== undefined) {
      clearInterval(heartbeat);
      heartbeat = undefined;
    }
    if (listener !== undefined) {
      transcriptionEvents.off(transcriptionId, listener);
      listener = undefined;
    }
  };

  return new ReadableStream<Uint8Array>({
    async start(controller) {
      const write = (chunk: string) => {
        if (isClosed) return;
        try {
          controller.enqueue(encoder.encode(chunk));
        } catch {
          // Controller already closed (client disconnected)
          release();
        }
      };

      // Event IDs are Unix timestamps in seconds, sent for reconnection support.
      const sendEvent = (data: Partial<TranscriptionUpdate>) => {
        const eventId = Math.floor(Date.now() / 1000);
        write(
          `id: ${eventId}\nevent: update\ndata: ${JSON.stringify(data)}\n\n`,
        );
      };

      const finish = () => {
        if (isClosed) return;
        release();
        try {
          controller.close();
        } catch {
          // Already closed or errored; nothing left to do.
        }
      };

      // Send initial state from DB and file
      const current = db
        .query<{ status: string; progress: number }, [string]>(
          "SELECT status, progress FROM transcriptions WHERE id = ?",
        )
        .get(transcriptionId);

      if (current) {
        let transcript: string | undefined;
        if (current.status === "completed") {
          transcript = (await getTranscript(transcriptionId)) || undefined;
        }
        sendEvent({
          status: current.status as TranscriptionUpdate["status"],
          progress: current.progress,
          transcript,
        });
      }

      // If already complete, close immediately
      if (isTerminalStatus(current?.status)) {
        finish();
        return;
      }

      // The initial write may have failed because the client already left.
      if (isClosed) return;

      heartbeat = setInterval(() => write(": heartbeat\n\n"), SSE_HEARTBEAT_MS);

      const onUpdate = (data: TranscriptionUpdate) => {
        if (isClosed) return;
        // Only forward optional fields that are present, to save bandwidth.
        const payload: Partial<TranscriptionUpdate> = {
          status: data.status,
          progress: data.progress,
        };
        if (data.transcript !== undefined) {
          payload.transcript = data.transcript;
        }
        if (data.error_message !== undefined) {
          payload.error_message = data.error_message;
        }
        sendEvent(payload);
        if (isTerminalStatus(data.status)) finish();
      };
      listener = onUpdate;
      transcriptionEvents.on(transcriptionId, onUpdate);
    },
    // Called by the runtime when the consumer (the HTTP client) goes away.
    // A function returned from start() is NOT a cleanup hook.
    cancel() {
      release();
    },
  });
}

const server = Bun.serve({
  port: 3000,
  idleTimeout: 120, // 120 seconds for SSE connections
  routes: {
    "/": indexHTML,
    "/settings": settingsHTML,
    "/transcribe": transcribeHTML,
    "/api/auth/register": {
      POST: async (req) => {
        const body = await readJsonObject(req);
        if (!body) return errorResponse("Invalid JSON body", 400);
        const { email, password, name } = body;
        if (!isNonEmptyString(email) || !isNonEmptyString(password)) {
          return errorResponse("Email and password required", 400);
        }
        if (password.length < 8) {
          return errorResponse("Password must be at least 8 characters", 400);
        }
        try {
          const user = await createUser(
            email,
            password,
            typeof name === "string" ? name : undefined,
          );
          return Response.json(
            { user: { id: user.id, email: user.email } },
            { headers: { "Set-Cookie": openSessionCookie(req, user.id) } },
          );
        } catch (err: unknown) {
          if (isUniqueViolation(err)) {
            return errorResponse("Email already registered", 400);
          }
          return errorResponse("Registration failed", 500);
        }
      },
    },
    "/api/auth/login": {
      POST: async (req) => {
        const body = await readJsonObject(req);
        if (!body) return errorResponse("Invalid JSON body", 400);
        const { email, password } = body;
        if (!isNonEmptyString(email) || !isNonEmptyString(password)) {
          return errorResponse("Email and password required", 400);
        }
        try {
          const user = await authenticateUser(email, password);
          if (!user) {
            return errorResponse("Invalid email or password", 401);
          }
          return Response.json(
            { user: { id: user.id, email: user.email } },
            { headers: { "Set-Cookie": openSessionCookie(req, user.id) } },
          );
        } catch {
          return errorResponse("Login failed", 500);
        }
      },
    },
    "/api/auth/logout": {
      POST: (req) => {
        const sessionId = getSessionFromRequest(req);
        if (sessionId) {
          deleteSession(sessionId);
        }
        return Response.json(
          { success: true },
          { headers: { "Set-Cookie": CLEARED_SESSION_COOKIE } },
        );
      },
    },
    "/api/auth/me": {
      GET: (req) => {
        const auth = authenticateSession(req);
        if (!auth.ok) return auth.response;
        const { user } = auth;
        return Response.json({
          email: user.email,
          name: user.name,
          avatar: user.avatar,
          created_at: user.created_at,
        });
      },
    },
    "/api/sessions": {
      GET: (req) => {
        const auth = authenticateSession(req);
        if (!auth.ok) return auth.response;
        const sessions = getUserSessionsForUser(auth.user.id);
        return Response.json({
          sessions: sessions.map((s) => ({
            id: s.id,
            ip_address: s.ip_address,
            user_agent: s.user_agent,
            created_at: s.created_at,
            expires_at: s.expires_at,
          })),
        });
      },
      DELETE: async (req) => {
        const auth = authenticateSession(req);
        if (!auth.ok) return auth.response;
        const body = await readJsonObject(req);
        if (!body) return errorResponse("Invalid JSON body", 400);
        const targetSessionId = body.sessionId;
        if (!isNonEmptyString(targetSessionId)) {
          return errorResponse("Session ID required", 400);
        }
        // Verify the session belongs to the user
        const targetSession = getSession(targetSessionId);
        if (!targetSession || targetSession.user_id !== auth.user.id) {
          return errorResponse("Session not found", 404);
        }
        deleteSession(targetSessionId);
        return Response.json({ success: true });
      },
    },
    "/api/user": {
      DELETE: (req) => {
        const auth = authenticateSession(req);
        if (!auth.ok) return auth.response;
        deleteUser(auth.user.id);
        return Response.json(
          { success: true },
          { headers: { "Set-Cookie": CLEARED_SESSION_COOKIE } },
        );
      },
    },
    "/api/user/email": {
      PUT: async (req) => {
        const auth = authenticateSession(req);
        if (!auth.ok) return auth.response;
        const body = await readJsonObject(req);
        if (!body) return errorResponse("Invalid JSON body", 400);
        const { email } = body;
        if (!isNonEmptyString(email)) {
          return errorResponse("Email required", 400);
        }
        try {
          // Awaited so an async implementation's rejection is caught here too.
          await updateUserEmail(auth.user.id, email);
          return Response.json({ success: true });
        } catch (err: unknown) {
          if (isUniqueViolation(err)) {
            return errorResponse("Email already in use", 400);
          }
          return errorResponse("Failed to update email", 500);
        }
      },
    },
    "/api/user/password": {
      PUT: async (req) => {
        const auth = authenticateSession(req);
        if (!auth.ok) return auth.response;
        const body = await readJsonObject(req);
        if (!body) return errorResponse("Invalid JSON body", 400);
        const { password } = body;
        if (!isNonEmptyString(password)) {
          return errorResponse("Password required", 400);
        }
        if (password.length < 8) {
          return errorResponse("Password must be at least 8 characters", 400);
        }
        try {
          await updateUserPassword(auth.user.id, password);
          return Response.json({ success: true });
        } catch {
          return errorResponse("Failed to update password", 500);
        }
      },
    },
    "/api/user/name": {
      PUT: async (req) => {
        const auth = authenticateSession(req);
        if (!auth.ok) return auth.response;
        const body = await readJsonObject(req);
        if (!body) return errorResponse("Invalid JSON body", 400);
        const { name } = body;
        if (!isNonEmptyString(name)) {
          return errorResponse("Name required", 400);
        }
        try {
          await updateUserName(auth.user.id, name);
          return Response.json({ success: true });
        } catch {
          return errorResponse("Failed to update name", 500);
        }
      },
    },
    "/api/user/avatar": {
      PUT: async (req) => {
        const auth = authenticateSession(req);
        if (!auth.ok) return auth.response;
        const body = await readJsonObject(req);
        if (!body) return errorResponse("Invalid JSON body", 400);
        const { avatar } = body;
        if (!isNonEmptyString(avatar)) {
          return errorResponse("Avatar required", 400);
        }
        try {
          await updateUserAvatar(auth.user.id, avatar);
          return Response.json({ success: true });
        } catch {
          return errorResponse("Failed to update avatar", 500);
        }
      },
    },
    "/api/transcriptions/:id/stream": {
      GET: (req) => {
        const auth = authenticateSession(req);
        if (!auth.ok) return auth.response;
        const transcriptionId = req.params.id;
        // Verify ownership
        const transcription = db
          .query<{ id: string; user_id: number; status: string }, [string]>(
            "SELECT id, user_id, status FROM transcriptions WHERE id = ?",
          )
          .get(transcriptionId);
        if (!transcription || transcription.user_id !== auth.user.id) {
          return errorResponse("Transcription not found", 404);
        }
        return new Response(createTranscriptionEventStream(transcriptionId), {
          headers: {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            Connection: "keep-alive",
          },
        });
      },
    },
    "/api/transcriptions/health": {
      GET: async () => {
        const isHealthy = await whisperService.checkHealth();
        return Response.json({ available: isHealthy });
      },
    },
    "/api/transcriptions/:id": {
      GET: async (req) => {
        try {
          const user = requireAuth(req);
          const transcriptionId = req.params.id;

          // Verify ownership
          const transcription = db
            .query<
              {
                id: string;
                user_id: number;
                status: string;
                original_filename: string;
              },
              [string]
            >(
              "SELECT id, user_id, status, original_filename FROM transcriptions WHERE id = ?",
            )
            .get(transcriptionId);

          if (!transcription || transcription.user_id !== user.id) {
            return errorResponse("Transcription not found", 404);
          }
          if (transcription.status !== "completed") {
            return errorResponse("Transcription not completed yet", 400);
          }

          // Return WebVTT format if requested via ?format=vtt
          const format = new URL(req.url).searchParams.get("format");
          if (format === "vtt") {
            const vttContent = await getTranscriptVTT(transcriptionId);
            if (!vttContent) {
              return errorResponse("VTT transcript not available", 404);
            }
            return new Response(vttContent, {
              headers: {
                "Content-Type": "text/vtt",
                // original_filename is user-supplied: never interpolate it raw.
                "Content-Disposition": attachmentDisposition(
                  `${transcription.original_filename}.vtt`,
                ),
              },
            });
          }

          // Default: return plain text transcript from file
          const transcript = await getTranscript(transcriptionId);
          if (!transcript) {
            return errorResponse("Transcript not available", 404);
          }
          return new Response(transcript, {
            headers: { "Content-Type": "text/plain" },
          });
        } catch (error) {
          return handleError(error);
        }
      },
    },
    "/api/transcriptions/:id/audio": {
      GET: async (req) => {
        try {
          const user = requireAuth(req);
          const transcriptionId = req.params.id;

          // Verify ownership and get the stored file name
          const transcription = db
            .query<
              {
                id: string;
                user_id: number;
                filename: string;
                status: string;
              },
              [string]
            >(
              "SELECT id, user_id, filename, status FROM transcriptions WHERE id = ?",
            )
            .get(transcriptionId);

          if (!transcription || transcription.user_id !== user.id) {
            return errorResponse("Transcription not found", 404);
          }
          if (transcription.status !== "completed") {
            return errorResponse("Transcription not completed yet", 400);
          }

          const file = Bun.file(`./uploads/${transcription.filename}`);
          if (!(await file.exists())) {
            return errorResponse("Audio file not found", 404);
          }

          const fileSize = file.size;
          const contentType = file.type || "audio/mpeg";
          // Range requests let the audio player seek.
          const range = parseByteRange(req.headers.get("range"), fileSize);

          if (range.kind === "unsatisfiable") {
            return new Response(null, {
              status: 416,
              headers: {
                "Content-Range": `bytes */${fileSize}`,
                "Accept-Ranges": "bytes",
              },
            });
          }

          if (range.kind === "partial") {
            return new Response(file.slice(range.start, range.end + 1), {
              status: 206,
              headers: {
                "Content-Range": `bytes ${range.start}-${range.end}/${fileSize}`,
                "Accept-Ranges": "bytes",
                "Content-Length": String(range.end - range.start + 1),
                "Content-Type": contentType,
              },
            });
          }

          // No usable range: send the entire file
          return new Response(file, {
            headers: {
              "Content-Type": contentType,
              "Accept-Ranges": "bytes",
              "Content-Length": String(fileSize),
            },
          });
        } catch (error) {
          return handleError(error);
        }
      },
    },
    "/api/transcriptions": {
      GET: async (req) => {
        try {
          const user = requireAuth(req);

          const transcriptions = db
            .query<
              {
                id: string;
                filename: string;
                original_filename: string;
                status: string;
                progress: number;
                created_at: number;
              },
              [number]
            >(
              "SELECT id, filename, original_filename, status, progress, created_at FROM transcriptions WHERE user_id = ? ORDER BY created_at DESC",
            )
            .all(user.id);

          // Load transcripts from files for completed jobs
          const jobs = await Promise.all(
            transcriptions.map(async (t) => ({
              id: t.id,
              filename: t.original_filename,
              status: t.status,
              progress: t.progress,
              transcript:
                t.status === "completed" ? await getTranscript(t.id) : null,
              created_at: t.created_at,
            })),
          );

          return Response.json({ jobs });
        } catch (error) {
          return handleError(error);
        }
      },
      POST: async (req) => {
        try {
          const user = requireAuth(req);

          const formData = await req.formData();
          const upload = formData.get("audio");
          // A plain text field named "audio" is treated the same as a missing file.
          if (upload === null || typeof upload === "string") {
            throw ValidationErrors.missingField("audio");
          }

          // Only text after the last "." counts as an extension.
          const dotIndex = upload.name.lastIndexOf(".");
          const fileExtension =
            dotIndex === -1
              ? undefined
              : upload.name.slice(dotIndex + 1).toLowerCase();
          const isAudioType =
            upload.type.startsWith("audio/") || upload.type === "video/mp4";
          const isAudioExtension =
            fileExtension !== undefined &&
            ALLOWED_AUDIO_EXTENSIONS.includes(fileExtension);

          if (!isAudioType && !isAudioExtension) {
            throw ValidationErrors.unsupportedFileType(
              "MP3, WAV, M4A, AAC, OGG, WebM, FLAC, MP4",
            );
          }

          if (upload.size > MAX_FILE_SIZE) {
            throw ValidationErrors.fileTooLarge("25MB");
          }

          // Generate unique filename. The extension becomes part of an
          // on-disk path, so it is kept only if it is a short alphanumeric token.
          const transcriptionId = crypto.randomUUID();
          const safeExtension =
            fileExtension && /^[a-z0-9]{1,10}$/.test(fileExtension)
              ? fileExtension
              : undefined;
          const filename = safeExtension
            ? `${transcriptionId}.${safeExtension}`
            : transcriptionId;

          // Save file to disk
          await Bun.write(`./uploads/${filename}`, upload);

          // Create database record
          db.run(
            "INSERT INTO transcriptions (id, user_id, filename, original_filename, status) VALUES (?, ?, ?, ?, ?)",
            [transcriptionId, user.id, filename, upload.name, "uploading"],
          );

          // Start transcription in background; log instead of leaving an
          // unhandled rejection if it fails asynchronously.
          void Promise.resolve(
            whisperService.startTranscription(transcriptionId, filename),
          ).catch((error: unknown) => {
            console.error(
              `[Transcription] Failed to start ${transcriptionId}:`,
              error instanceof Error ? error.message : error,
            );
          });

          return Response.json({
            id: transcriptionId,
            message: "Upload successful, transcription started",
          });
        } catch (error) {
          return handleError(error);
        }
      },
    },
  },
  development: {
    hmr: true,
    console: true,
  },
});
console.log(`馃 Thistle running at http://localhost:${server.port}`);