// distributed transcription service
// thistle.dunkirk.sh
1import db from "./db/schema";
2import {
3 authenticateUser,
4 cleanupExpiredSessions,
5 createSession,
6 createUser,
7 deleteSession,
8 deleteUser,
9 getSession,
10 getSessionFromRequest,
11 getUserBySession,
12 getUserSessionsForUser,
13 updateUserAvatar,
14 updateUserEmail,
15 updateUserName,
16 updateUserPassword,
17} from "./lib/auth";
18import { handleError, ValidationErrors } from "./lib/errors";
19import { requireAuth } from "./lib/middleware";
20import {
21 MAX_FILE_SIZE,
22 TranscriptionEventEmitter,
23 type TranscriptionUpdate,
24 WhisperServiceManager,
25} from "./lib/transcription";
26import { getTranscript } from "./lib/transcript-storage";
27import indexHTML from "./pages/index.html";
28import settingsHTML from "./pages/settings.html";
29import transcribeHTML from "./pages/transcribe.html";
30
// Base URL of the Murmur/Whisper transcription backend. `||` (not `??`) is
// deliberate: an unset OR empty WHISPER_SERVICE_URL falls back to the default.
const DEFAULT_WHISPER_SERVICE_URL = "http://localhost:8000";
const WHISPER_SERVICE_URL =
  process.env.WHISPER_SERVICE_URL || DEFAULT_WHISPER_SERVICE_URL;
34
// Make sure the uploads/ and transcripts/ directories exist by writing an
// empty .gitkeep marker into each (Bun.write creates missing parent
// directories). Done sequentially, uploads first.
for (const marker of ["./uploads/.gitkeep", "./transcripts/.gitkeep"]) {
  await Bun.write(marker, "");
}
38
// Initialize transcription system. The event emitter is shared: it is handed
// to the service manager here and subscribed to per transcription id by the
// SSE stream route below.
console.log(`[Transcription] Connecting to Murmur at ${WHISPER_SERVICE_URL}...`);
const transcriptionEvents = new TranscriptionEventEmitter();
const whisperService = new WhisperServiceManager(
  WHISPER_SERVICE_URL,
  db,
  transcriptionEvents,
);
49
// Clean up expired sessions every hour. The call is wrapped so that either a
// synchronous throw or an async rejection is logged instead of escaping the
// timer callback unhandled (which can take the whole server down).
setInterval(() => {
  Promise.resolve()
    .then(() => cleanupExpiredSessions())
    .catch((error: unknown) => {
      console.error(
        "[Auth] Failed to clean up expired sessions:",
        error instanceof Error ? error.message : error,
      );
    });
}, 60 * 60 * 1000);
52
// Sync with the Whisper DB once at startup. Murmur being unreachable is not
// fatal: the failure is only reported and the server keeps starting.
try {
  await whisperService.syncWithWhisper();
  console.log("[Transcription] Successfully connected to Murmur");
} catch (error) {
  const reason = error instanceof Error ? error.message : "Unknown error";
  console.warn("[Transcription] Murmur unavailable at startup:", reason);
}
63
// Periodic sync every 5 minutes as a backup (SSE handles real-time updates).
// syncWithWhisper can fail (e.g. Murmur unreachable); the failure is logged
// here instead of becoming an unhandled promise rejection on every tick.
setInterval(() => {
  Promise.resolve()
    .then(() => whisperService.syncWithWhisper())
    .catch((error: unknown) => {
      console.warn(
        "[Transcription] Periodic Murmur sync failed:",
        error instanceof Error ? error.message : "Unknown error",
      );
    });
}, 5 * 60 * 1000);
66
// Clean up stale files daily. Works whether cleanupStaleFiles is sync or
// async; any failure is logged rather than left unhandled in the timer.
setInterval(() => {
  Promise.resolve()
    .then(() => whisperService.cleanupStaleFiles())
    .catch((error: unknown) => {
      console.error(
        "[Transcription] Stale file cleanup failed:",
        error instanceof Error ? error.message : error,
      );
    });
}, 24 * 60 * 60 * 1000);
69
70const server = Bun.serve({
// Idle connections (notably the SSE progress streams) may stay open for up
// to 120 seconds before the server drops them.
idleTimeout: 120,
// Fixed listening port.
port: 3000,
73 routes: {
// Static HTML pages for the front-end routes.
"/": indexHTML,
"/transcribe": transcribeHTML,
"/settings": settingsHTML,
"/api/auth/register": {
  /**
   * Create an account and log the new user in.
   *
   * Expects a JSON body `{ email, password, name? }`. Responds 400 when the
   * body is not valid JSON, when email/password are missing or not strings,
   * when the password is shorter than 8 characters, or when the email is
   * already registered; 500 on any other failure. On success returns
   * `{ user: { id, email } }` and sets a 7-day HttpOnly `session` cookie.
   */
  POST: async (req) => {
    // Parse separately so malformed JSON is a client error (400) instead of
    // falling into the generic 500 handler below.
    let body: Record<string, unknown>;
    try {
      const parsed: unknown = await req.json();
      body =
        typeof parsed === "object" && parsed !== null
          ? (parsed as Record<string, unknown>)
          : {};
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { email, password, name } = body;
    // Type-check up front: e.g. a numeric password has no `.length`, which
    // previously made the minimum-length check silently pass.
    if (
      typeof email !== "string" ||
      typeof password !== "string" ||
      !email ||
      !password
    ) {
      return Response.json(
        { error: "Email and password required" },
        { status: 400 },
      );
    }
    if (password.length < 8) {
      return Response.json(
        { error: "Password must be at least 8 characters" },
        { status: 400 },
      );
    }
    try {
      const user = await createUser(
        email,
        password,
        typeof name === "string" ? name : undefined,
      );
      const ipAddress =
        req.headers.get("x-forwarded-for") ??
        req.headers.get("x-real-ip") ??
        "unknown";
      const userAgent = req.headers.get("user-agent") ?? "unknown";
      const sessionId = createSession(user.id, ipAddress, userAgent);
      return Response.json(
        { user: { id: user.id, email: user.email } },
        {
          headers: {
            "Set-Cookie": `session=${sessionId}; HttpOnly; Path=/; Max-Age=${7 * 24 * 60 * 60}; SameSite=Lax`,
          },
        },
      );
    } catch (err: unknown) {
      // A UNIQUE constraint failure here is treated as a duplicate email.
      // Null-safe so a thrown null/undefined cannot crash the handler.
      const message = (err as { message?: unknown } | null)?.message;
      if (
        typeof message === "string" &&
        message.includes("UNIQUE constraint failed")
      ) {
        return Response.json(
          { error: "Email already registered" },
          { status: 400 },
        );
      }
      return Response.json(
        { error: "Registration failed" },
        { status: 500 },
      );
    }
  },
},
"/api/auth/login": {
  /**
   * Log in with email and password.
   *
   * Expects a JSON body `{ email, password }`. Responds 400 when the body is
   * not valid JSON or the credentials are missing / not strings, 401 when
   * authenticateUser returns no user, 500 on any other failure. On success
   * returns `{ user: { id, email } }` and sets a 7-day HttpOnly `session`
   * cookie.
   */
  POST: async (req) => {
    // Malformed JSON is a client error, not a server failure.
    let body: Record<string, unknown>;
    try {
      const parsed: unknown = await req.json();
      body =
        typeof parsed === "object" && parsed !== null
          ? (parsed as Record<string, unknown>)
          : {};
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { email, password } = body;
    if (
      typeof email !== "string" ||
      typeof password !== "string" ||
      !email ||
      !password
    ) {
      return Response.json(
        { error: "Email and password required" },
        { status: 400 },
      );
    }
    try {
      const user = await authenticateUser(email, password);
      if (!user) {
        return Response.json(
          { error: "Invalid email or password" },
          { status: 401 },
        );
      }
      const ipAddress =
        req.headers.get("x-forwarded-for") ??
        req.headers.get("x-real-ip") ??
        "unknown";
      const userAgent = req.headers.get("user-agent") ?? "unknown";
      const sessionId = createSession(user.id, ipAddress, userAgent);
      return Response.json(
        { user: { id: user.id, email: user.email } },
        {
          headers: {
            "Set-Cookie": `session=${sessionId}; HttpOnly; Path=/; Max-Age=${7 * 24 * 60 * 60}; SameSite=Lax`,
          },
        },
      );
    } catch {
      return Response.json({ error: "Login failed" }, { status: 500 });
    }
  },
},
"/api/auth/logout": {
  /**
   * End the caller's session, if the request carries one, and expire the
   * session cookie. Always responds `{ success: true }`.
   */
  POST: async (req) => {
    const currentSessionId = getSessionFromRequest(req);
    if (currentSessionId) deleteSession(currentSessionId);

    const expiredCookie = "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax";
    return Response.json(
      { success: true },
      { headers: { "Set-Cookie": expiredCookie } },
    );
  },
},
"/api/auth/me": {
  /**
   * Return the profile of the logged-in user:
   * `{ email, name, avatar, created_at }`. Responds 401 when there is no
   * session cookie ("Not authenticated") or it does not resolve to a user
   * ("Invalid session").
   */
  GET: (req) => {
    const currentSessionId = getSessionFromRequest(req);
    if (!currentSessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }

    const currentUser = getUserBySession(currentSessionId);
    if (!currentUser) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }

    const { email, name, avatar, created_at } = currentUser;
    return Response.json({ email, name, avatar, created_at });
  },
},
"/api/sessions": {
  /**
   * List every session belonging to the logged-in user as
   * `{ sessions: [{ id, ip_address, user_agent, created_at, expires_at }] }`.
   * Responds 401 without a valid session.
   *
   * NOTE(review): `id` appears to be the same value that is stored in the
   * `session` cookie (it is what DELETE below passes to getSession/
   * deleteSession). If so, listing it exposes live session tokens to page
   * scripts despite the cookie being HttpOnly. Consider an opaque handle.
   */
  GET: (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    const sessions = getUserSessionsForUser(user.id);
    return Response.json({
      sessions: sessions.map((s) => ({
        id: s.id,
        ip_address: s.ip_address,
        user_agent: s.user_agent,
        created_at: s.created_at,
        expires_at: s.expires_at,
      })),
    });
  },
  /**
   * Revoke one of the caller's sessions; body `{ sessionId: string }`.
   *
   * Responds 400 for a malformed body or a missing/non-string id, and 404
   * when the session does not exist or belongs to another user. When the
   * revoked session is the one making the request, the session cookie is
   * cleared as well.
   */
  DELETE: async (req) => {
    const currentSessionId = getSessionFromRequest(req);
    if (!currentSessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(currentSessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // Previously an unguarded req.json(): malformed JSON or a `null` body
    // threw out of the handler instead of producing a 400.
    let body: Record<string, unknown>;
    try {
      const parsed: unknown = await req.json();
      body =
        typeof parsed === "object" && parsed !== null
          ? (parsed as Record<string, unknown>)
          : {};
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const targetSessionId = body.sessionId;
    if (typeof targetSessionId !== "string" || !targetSessionId) {
      return Response.json(
        { error: "Session ID required" },
        { status: 400 },
      );
    }
    // Verify the session belongs to the user
    const targetSession = getSession(targetSessionId);
    if (!targetSession || targetSession.user_id !== user.id) {
      return Response.json({ error: "Session not found" }, { status: 404 });
    }
    deleteSession(targetSessionId);
    if (targetSessionId === currentSessionId) {
      // The caller revoked its own session; drop the now-dead cookie too.
      return Response.json(
        { success: true },
        {
          headers: {
            "Set-Cookie":
              "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax",
          },
        },
      );
    }
    return Response.json({ success: true });
  },
},
"/api/user": {
  /**
   * Delete the logged-in user's account via deleteUser and expire the
   * session cookie. Responds 401 when there is no session cookie or it does
   * not resolve to a user.
   */
  DELETE: (req) => {
    const currentSessionId = getSessionFromRequest(req);
    if (!currentSessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }

    const currentUser = getUserBySession(currentSessionId);
    if (!currentUser) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }

    deleteUser(currentUser.id);

    const expiredCookie = "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax";
    return Response.json(
      { success: true },
      { headers: { "Set-Cookie": expiredCookie } },
    );
  },
},
"/api/user/email": {
  /**
   * Change the logged-in user's email; body `{ email: string }`.
   *
   * Responds 401 without a valid session, 400 for a malformed body, a
   * missing/non-string email, or an email already used by another account,
   * and 500 on any other failure.
   */
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // Previously an unguarded req.json(); bad JSON escaped the handler.
    let body: Record<string, unknown>;
    try {
      const parsed: unknown = await req.json();
      body =
        typeof parsed === "object" && parsed !== null
          ? (parsed as Record<string, unknown>)
          : {};
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { email } = body;
    if (typeof email !== "string" || !email) {
      return Response.json({ error: "Email required" }, { status: 400 });
    }
    try {
      updateUserEmail(user.id, email);
      return Response.json({ success: true });
    } catch (err: unknown) {
      // A UNIQUE constraint failure is treated as "email taken".
      const message = (err as { message?: unknown } | null)?.message;
      if (
        typeof message === "string" &&
        message.includes("UNIQUE constraint failed")
      ) {
        return Response.json(
          { error: "Email already in use" },
          { status: 400 },
        );
      }
      return Response.json(
        { error: "Failed to update email" },
        { status: 500 },
      );
    }
  },
},
"/api/user/password": {
  /**
   * Set a new password for the logged-in user; body `{ password: string }`.
   *
   * Responds 401 without a valid session, 400 for a malformed body, a
   * missing/non-string password or one shorter than 8 characters, and 500
   * if updateUserPassword throws.
   *
   * NOTE(review): only the new password is read from the body; the current
   * password is not re-verified, so any holder of a session can change it.
   */
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // Previously an unguarded req.json(); bad JSON escaped the handler.
    let body: Record<string, unknown>;
    try {
      const parsed: unknown = await req.json();
      body =
        typeof parsed === "object" && parsed !== null
          ? (parsed as Record<string, unknown>)
          : {};
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { password } = body;
    // A non-string (e.g. numeric) password has no `.length` and would slip
    // past the minimum-length check, so reject it here.
    if (typeof password !== "string" || !password) {
      return Response.json({ error: "Password required" }, { status: 400 });
    }
    if (password.length < 8) {
      return Response.json(
        { error: "Password must be at least 8 characters" },
        { status: 400 },
      );
    }
    try {
      await updateUserPassword(user.id, password);
      return Response.json({ success: true });
    } catch {
      return Response.json(
        { error: "Failed to update password" },
        { status: 500 },
      );
    }
  },
},
"/api/user/name": {
  /**
   * Change the logged-in user's display name; body `{ name: string }`.
   *
   * Responds 401 without a valid session, 400 for a malformed body or a
   * missing/non-string name, and 500 if updateUserName throws.
   */
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // Previously an unguarded req.json(); bad JSON escaped the handler.
    let body: Record<string, unknown>;
    try {
      const parsed: unknown = await req.json();
      body =
        typeof parsed === "object" && parsed !== null
          ? (parsed as Record<string, unknown>)
          : {};
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { name } = body;
    if (typeof name !== "string" || !name) {
      return Response.json({ error: "Name required" }, { status: 400 });
    }
    try {
      updateUserName(user.id, name);
      return Response.json({ success: true });
    } catch {
      return Response.json(
        { error: "Failed to update name" },
        { status: 500 },
      );
    }
  },
},
"/api/user/avatar": {
  /**
   * Change the logged-in user's avatar; body `{ avatar: string }`.
   *
   * Responds 401 without a valid session, 400 for a malformed body or a
   * missing/non-string avatar, and 500 if updateUserAvatar throws.
   *
   * NOTE(review): the avatar string is stored without any size or format
   * validation here. Confirm whether updateUserAvatar enforces limits.
   */
  PUT: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    // Previously an unguarded req.json(); bad JSON escaped the handler.
    let body: Record<string, unknown>;
    try {
      const parsed: unknown = await req.json();
      body =
        typeof parsed === "object" && parsed !== null
          ? (parsed as Record<string, unknown>)
          : {};
    } catch {
      return Response.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const { avatar } = body;
    if (typeof avatar !== "string" || !avatar) {
      return Response.json({ error: "Avatar required" }, { status: 400 });
    }
    try {
      updateUserAvatar(user.id, avatar);
      return Response.json({ success: true });
    } catch {
      return Response.json(
        { error: "Failed to update avatar" },
        { status: 500 },
      );
    }
  },
},
"/api/transcriptions/:id/stream": {
  /**
   * Server-Sent Events feed of progress for one of the caller's
   * transcriptions.
   *
   * Responds 401 without a valid session and 404 when the transcription
   * does not exist or belongs to another user. Otherwise:
   * - The first `update` event carries the current status/progress, plus
   *   the stored transcript when the job is already completed.
   * - If the job is already completed or failed, the stream then ends.
   * - Otherwise live updates from `transcriptionEvents` are forwarded, with
   *   a `: heartbeat` comment every 2.5 s, until a completed or failed
   *   update arrives.
   */
  GET: async (req) => {
    const sessionId = getSessionFromRequest(req);
    if (!sessionId) {
      return Response.json({ error: "Not authenticated" }, { status: 401 });
    }
    const user = getUserBySession(sessionId);
    if (!user) {
      return Response.json({ error: "Invalid session" }, { status: 401 });
    }
    const transcriptionId = req.params.id;
    // Verify ownership
    const transcription = db
      .query<{ id: string; user_id: number; status: string }, [string]>(
        "SELECT id, user_id, status FROM transcriptions WHERE id = ?",
      )
      .get(transcriptionId);
    if (!transcription || transcription.user_id !== user.id) {
      return Response.json(
        { error: "Transcription not found" },
        { status: 404 },
      );
    }

    const encoder = new TextEncoder();
    let isClosed = false;
    let heartbeatInterval: ReturnType<typeof setInterval> | undefined;
    let updateHandler: ((data: TranscriptionUpdate) => void) | undefined;

    // Idempotently stop the heartbeat timer and unsubscribe from updates.
    // It runs on normal completion, on cancel() (client disconnect), and
    // whenever an enqueue fails. The previous version *returned* its cleanup
    // function from start(). ReadableStream ignores that return value, so
    // every disconnected client leaked a 2.5 s interval and an event
    // listener.
    const cleanup = () => {
      isClosed = true;
      if (heartbeatInterval !== undefined) {
        clearInterval(heartbeatInterval);
        heartbeatInterval = undefined;
      }
      if (updateHandler !== undefined) {
        transcriptionEvents.off(transcriptionId, updateHandler);
        updateHandler = undefined;
      }
    };

    // Event-driven SSE stream with reconnection support
    const stream = new ReadableStream({
      async start(controller) {
        // Write one raw SSE chunk. A throw means the stream is no longer
        // writable (client disconnected), so release everything.
        const enqueue = (chunk: string) => {
          if (isClosed) return;
          try {
            controller.enqueue(encoder.encode(chunk));
          } catch {
            cleanup();
          }
        };

        // End the stream from the server side. close() throws if the
        // consumer already cancelled, which is harmless here.
        const closeStream = () => {
          cleanup();
          try {
            controller.close();
          } catch {
            // Already closed or cancelled.
          }
        };

        // The event id is the send time in Unix seconds, for SSE
        // reconnection.
        const sendEvent = (data: Partial<TranscriptionUpdate>) => {
          const eventId = Math.floor(Date.now() / 1000);
          enqueue(
            `id: ${eventId}\nevent: update\ndata: ${JSON.stringify(data)}\n\n`,
          );
        };

        // Send initial state from DB and file
        const current = db
          .query<{ status: string; progress: number }, [string]>(
            "SELECT status, progress FROM transcriptions WHERE id = ?",
          )
          .get(transcriptionId);
        if (current) {
          // Load transcript from file if completed
          let transcript: string | undefined;
          if (current.status === "completed") {
            transcript = (await getTranscript(transcriptionId)) || undefined;
          }
          sendEvent({
            status: current.status as TranscriptionUpdate["status"],
            progress: current.progress,
            transcript,
          });
        }
        // If already complete, close immediately
        if (
          current?.status === "completed" ||
          current?.status === "failed"
        ) {
          closeStream();
          return;
        }
        // The client may already be gone (initial send failed or cancel()
        // ran); in that case never start the timer or subscribe.
        if (isClosed) return;

        // Send heartbeats every 2.5 seconds to keep connection alive
        heartbeatInterval = setInterval(
          () => enqueue(": heartbeat\n\n"),
          2500,
        );

        // Forward live updates, including optional fields only when present.
        const handleUpdate = (data: TranscriptionUpdate) => {
          if (isClosed) return;
          const payload: Partial<TranscriptionUpdate> = {
            status: data.status,
            progress: data.progress,
          };
          if (data.transcript !== undefined) {
            payload.transcript = data.transcript;
          }
          if (data.error_message !== undefined) {
            payload.error_message = data.error_message;
          }
          sendEvent(payload);
          // Close stream when done
          if (data.status === "completed" || data.status === "failed") {
            closeStream();
          }
        };
        updateHandler = handleUpdate;
        transcriptionEvents.on(transcriptionId, handleUpdate);
      },
      // Invoked when the consumer cancels the stream (client disconnect).
      cancel() {
        cleanup();
      },
    });
    return new Response(stream, {
      headers: {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        Connection: "keep-alive",
      },
    });
  },
},
"/api/transcriptions/health": {
  /** Report the result of `whisperService.checkHealth()` as `{ available }`. */
  GET: async () =>
    Response.json({ available: await whisperService.checkHealth() }),
},
"/api/transcriptions": {
  /**
   * List the logged-in user's transcription jobs, newest first, as
   * `{ jobs: [{ id, filename, status, progress, transcript, created_at }] }`.
   * `filename` is the user's original filename. `transcript` is loaded from
   * storage for completed jobs and is `null` otherwise. Errors (including
   * auth failures from requireAuth) are mapped by handleError.
   *
   * NOTE(review): every completed transcript is read from storage on each
   * list request, which grows with the user's history.
   */
  GET: async (req) => {
    try {
      const user = requireAuth(req);

      const transcriptions = db
        .query<
          {
            id: string;
            filename: string;
            original_filename: string;
            status: string;
            progress: number;
            created_at: number;
          },
          [number]
        >(
          "SELECT id, filename, original_filename, status, progress, created_at FROM transcriptions WHERE user_id = ? ORDER BY created_at DESC",
        )
        .all(user.id);

      // Load transcripts from files for completed jobs
      const jobs = await Promise.all(
        transcriptions.map(async (t) => {
          let transcript: string | null = null;
          if (t.status === "completed") {
            transcript = await getTranscript(t.id);
          }
          return {
            id: t.id,
            filename: t.original_filename,
            status: t.status,
            progress: t.progress,
            transcript,
            created_at: t.created_at,
          };
        }),
      );

      return Response.json({ jobs });
    } catch (error) {
      return handleError(error);
    }
  },
  /**
   * Accept an audio upload (multipart field `audio`), store it under
   * ./uploads as `<uuid>.<ext>`, record it with status "uploading", and
   * start transcription. Responds `{ id, message }`.
   *
   * Validation errors are thrown as ValidationErrors and mapped by
   * handleError:
   * - missing or non-file field;
   * - neither an audio MIME type nor an allow-listed extension, or no safe
   *   extension can be determined;
   * - larger than MAX_FILE_SIZE.
   */
  POST: async (req) => {
    try {
      const user = requireAuth(req);

      const formData = await req.formData();
      const file = formData.get("audio");

      // get() returns a string for plain (non-file) fields; only an actual
      // upload is acceptable. The old `as File` cast let strings through.
      if (!file || typeof file === "string") {
        throw ValidationErrors.missingField("audio");
      }

      // Validate file type
      const allowedExtensions = [
        "mp3",
        "wav",
        "m4a",
        "aac",
        "ogg",
        "webm",
        "flac",
        "mp4",
      ];
      const nameExtension = file.name.includes(".")
        ? file.name.split(".").pop()?.toLowerCase()
        : undefined;
      // MIME type without parameters such as ";codecs=opus".
      const mimeType = (file.type.split(";")[0] ?? "").trim().toLowerCase();
      const isAudioType =
        mimeType.startsWith("audio/") || mimeType === "video/mp4";
      const isAudioExtension =
        nameExtension !== undefined &&
        allowedExtensions.includes(nameExtension);

      if (!isAudioType && !isAudioExtension) {
        throw ValidationErrors.unsupportedFileType(
          "MP3, WAV, M4A, AAC, OGG, WebM, FLAC",
        );
      }

      // The extension written into the on-disk path must never be raw
      // client text. Both file.name and file.type are client-controlled.
      // Previously a name like "x.mp3/../../../tmp/evil" sent with an
      // audio/* type produced a write path outside ./uploads. Accept only a
      // short alphanumeric name extension, otherwise derive one from the
      // MIME type. A Map is used so prototype keys cannot match.
      const mimeExtensions = new Map<string, string>([
        ["audio/mpeg", "mp3"],
        ["audio/mp3", "mp3"],
        ["audio/wav", "wav"],
        ["audio/x-wav", "wav"],
        ["audio/wave", "wav"],
        ["audio/mp4", "m4a"],
        ["audio/x-m4a", "m4a"],
        ["audio/aac", "aac"],
        ["audio/ogg", "ogg"],
        ["audio/webm", "webm"],
        ["audio/flac", "flac"],
        ["audio/x-flac", "flac"],
        ["video/mp4", "mp4"],
      ]);
      const fileExtension =
        nameExtension !== undefined && /^[a-z0-9]{1,10}$/.test(nameExtension)
          ? nameExtension
          : mimeExtensions.get(mimeType);
      if (!fileExtension) {
        throw ValidationErrors.unsupportedFileType(
          "MP3, WAV, M4A, AAC, OGG, WebM, FLAC",
        );
      }

      // NOTE(review): the "25MB" label is hard-coded and must be kept in
      // sync with MAX_FILE_SIZE manually.
      if (file.size > MAX_FILE_SIZE) {
        throw ValidationErrors.fileTooLarge("25MB");
      }

      // Generate unique filename
      const transcriptionId = crypto.randomUUID();
      const filename = `${transcriptionId}.${fileExtension}`;

      // Save file to disk
      const uploadDir = "./uploads";
      await Bun.write(`${uploadDir}/${filename}`, file);

      // Create database record
      db.run(
        "INSERT INTO transcriptions (id, user_id, filename, original_filename, status) VALUES (?, ?, ?, ?, ?)",
        [transcriptionId, user.id, filename, file.name, "uploading"],
      );

      // Start transcription in background. A synchronous throw still
      // reaches handleError as before; an asynchronous rejection is logged
      // instead of becoming an unhandled promise rejection.
      void Promise.resolve(
        whisperService.startTranscription(transcriptionId, filename),
      ).catch((error: unknown) => {
        console.error(
          `[Transcription] Failed to start transcription ${transcriptionId}:`,
          error instanceof Error ? error.message : error,
        );
      });

      return Response.json({
        id: transcriptionId,
        message: "Upload successful, transcription started",
      });
    } catch (error) {
      return handleError(error);
    }
  },
},
629 },
// Development mode (HMR, browser console forwarding, in-browser error pages)
// used to be enabled unconditionally, which exposes error details and
// disables production behaviour when deployed. It is now switched off when
// NODE_ENV is "production"; unset NODE_ENV keeps the old behaviour.
development: process.env.NODE_ENV !== "production" && {
  hmr: true,
  console: true,
},
634});
// Announce the listening address once Bun.serve has started.
// NOTE(review): the leading glyph in this message looks like a mis-encoded
// emoji; confirm the intended character.
const serverUrl = `http://localhost:${server.port}`;
console.log(`馃 Thistle running at ${serverUrl}`);