// Thistle — distributed transcription service
// thistle.dunkirk.sh
import db from "./db/schema";
import {
  authenticateUser,
  cleanupExpiredSessions,
  createSession,
  createUser,
  deleteSession,
  deleteUser,
  getSession,
  getSessionFromRequest,
  getUserBySession,
  getUserSessionsForUser,
  updateUserAvatar,
  updateUserEmail,
  updateUserName,
  updateUserPassword,
} from "./lib/auth";
import { handleError, ValidationErrors } from "./lib/errors";
import { requireAuth } from "./lib/middleware";
import {
  MAX_FILE_SIZE,
  TranscriptionEventEmitter,
  type TranscriptionUpdate,
  WhisperServiceManager,
} from "./lib/transcription";
import indexHTML from "./pages/index.html";
import settingsHTML from "./pages/settings.html";
import transcribeHTML from "./pages/transcribe.html";

// Environment variables
const WHISPER_SERVICE_URL =
  process.env.WHISPER_SERVICE_URL || "http://localhost:8000";

const ONE_MINUTE_MS = 60 * 1000;
const ONE_HOUR_MS = 60 * ONE_MINUTE_MS;
const ONE_DAY_MS = 24 * ONE_HOUR_MS;

/**
 * Runs a fire-and-forget maintenance task (from a timer), logging failures.
 *
 * Timer callbacks have no caller to report errors to. A synchronous throw
 * would surface as an uncaught exception, and a rejected promise as an
 * unhandled rejection. Running the task inside a promise chain with a
 * `.catch` turns both into a log line, so one failed run (e.g. Murmur being
 * unreachable) does not escape as an unhandled error.
 *
 * @param label Log prefix identifying the subsystem.
 * @param task The work to run; may be synchronous or return a promise.
 */
function runInBackground(label: string, task: () => unknown): void {
  void Promise.resolve()
    .then(() => task())
    .catch((error: unknown) => {
      console.error(
        `[${label}] Background task failed:`,
        error instanceof Error ? error.message : error,
      );
    });
}

// Create uploads directory if it doesn't exist
await Bun.write("./uploads/.gitkeep", "");

// Initialize transcription system
console.log(
  `[Transcription] Connecting to Murmur at ${WHISPER_SERVICE_URL}...`,
);
const transcriptionEvents = new TranscriptionEventEmitter();
const whisperService = new WhisperServiceManager(
  WHISPER_SERVICE_URL,
  db,
  transcriptionEvents,
);

// Clean up expired sessions every hour
setInterval(
  () => runInBackground("Sessions", cleanupExpiredSessions),
  ONE_HOUR_MS,
);

// Sync with Murmur on startup. Failure is non-fatal: the periodic sync
// registered below keeps retrying.
try {
  await whisperService.syncWithWhisper();
  console.log("[Transcription] Successfully connected to Murmur");
} catch (error) {
  console.warn(
    "[Transcription] Murmur unavailable at startup:",
    error instanceof Error ? error.message : "Unknown error",
  );
}

// Periodic sync every 5 minutes as backup (SSE handles real-time updates)
setInterval(
  () =>
    runInBackground("Transcription", () => whisperService.syncWithWhisper()),
  5 * ONE_MINUTE_MS,
);

// Clean up stale files daily
setInterval(
  () =>
    runInBackground("Transcription", () => whisperService.cleanupStaleFiles()),
  ONE_DAY_MS,
);
/** Lifetime of a session cookie, in seconds (7 days). */
const SESSION_MAX_AGE_SECONDS = 7 * 24 * 60 * 60;

/** `Set-Cookie` value that tells the browser to drop the session cookie. */
const CLEAR_SESSION_COOKIE =
  "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax";

/** Minimum accepted password length (registration and password change). */
const MIN_PASSWORD_LENGTH = 8;

/** Upload extensions accepted even when the browser reports no audio MIME type. */
const ALLOWED_AUDIO_EXTENSIONS: readonly string[] = [
  "mp3",
  "wav",
  "m4a",
  "aac",
  "ogg",
  "webm",
  "flac",
  "mp4",
];

/** The user record `getUserBySession` returns for a valid session. */
type SessionUser = NonNullable<ReturnType<typeof getUserBySession>>;

/** Builds the `Set-Cookie` value for a freshly created session. */
function sessionCookie(sessionId: ReturnType<typeof createSession>): string {
  return `session=${sessionId}; HttpOnly; Path=/; Max-Age=${SESSION_MAX_AGE_SECONDS}; SameSite=Lax`;
}

/**
 * Extracts the client IP address and user agent stored with a new session.
 *
 * `x-forwarded-for` may carry a comma-separated proxy chain; only the first
 * (client) entry is kept. These headers are client-controlled unless a
 * trusted proxy rewrites them, so treat the values as informational.
 */
function getClientInfo(req: Request): { ipAddress: string; userAgent: string } {
  const forwardedFor = req.headers
    .get("x-forwarded-for")
    ?.split(",")[0]
    ?.trim();
  return {
    ipAddress: forwardedFor || req.headers.get("x-real-ip") || "unknown",
    userAgent: req.headers.get("user-agent") ?? "unknown",
  };
}

/**
 * Resolves the user behind the request's session cookie.
 *
 * @returns The user, or a 401 response to return as-is when the cookie is
 *   missing ("Not authenticated") or does not map to a user ("Invalid session").
 */
function getAuthenticatedUser(req: Request): SessionUser | Response {
  const sessionId = getSessionFromRequest(req);
  if (!sessionId) {
    return Response.json({ error: "Not authenticated" }, { status: 401 });
  }
  const user = getUserBySession(sessionId);
  if (!user) {
    return Response.json({ error: "Invalid session" }, { status: 401 });
  }
  return user;
}

/**
 * Parses the request body as a JSON object.
 *
 * @returns The parsed object, or `null` when the body is not valid JSON or is
 *   not an object (e.g. an array, string, number or `null`).
 */
async function readJsonObject(
  req: Request,
): Promise<Record<string, unknown> | null> {
  try {
    const body: unknown = await req.json();
    if (typeof body !== "object" || body === null || Array.isArray(body)) {
      return null;
    }
    return body as Record<string, unknown>;
  } catch {
    return null;
  }
}

/** 400 response for a body that is not a JSON object. */
function invalidJsonResponse(): Response {
  return Response.json({ error: "Invalid JSON body" }, { status: 400 });
}

/** 400 response for a password shorter than MIN_PASSWORD_LENGTH. */
function passwordTooShortResponse(): Response {
  return Response.json(
    {
      error: `Password must be at least ${MIN_PASSWORD_LENGTH} characters`,
    },
    { status: 400 },
  );
}

/** True when `error` carries a SQLite UNIQUE-constraint failure message. */
function isUniqueViolation(error: unknown): boolean {
  const message = (error as { message?: unknown } | null | undefined)?.message;
  return (
    typeof message === "string" && message.includes("UNIQUE constraint failed")
  );
}

const server = Bun.serve({
  port: Number(process.env.PORT) || 3000,
  idleTimeout: 120, // 120 seconds for SSE connections
  routes: {
    "/": indexHTML,
    "/settings": settingsHTML,
    "/transcribe": transcribeHTML,
    "/api/auth/register": {
      POST: async (req) => {
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();
        const { email, password, name } = body;
        if (
          typeof email !== "string" ||
          !email ||
          typeof password !== "string" ||
          !password
        ) {
          return Response.json(
            { error: "Email and password required" },
            { status: 400 },
          );
        }
        if (password.length < MIN_PASSWORD_LENGTH) {
          return passwordTooShortResponse();
        }
        // `name` is optional, but must be a string when supplied.
        if (name !== undefined && name !== null && typeof name !== "string") {
          return Response.json(
            { error: "Name must be a string" },
            { status: 400 },
          );
        }
        try {
          const user = await createUser(
            email,
            password,
            typeof name === "string" ? name : undefined,
          );
          const { ipAddress, userAgent } = getClientInfo(req);
          const sessionId = createSession(user.id, ipAddress, userAgent);
          return Response.json(
            { user: { id: user.id, email: user.email } },
            { headers: { "Set-Cookie": sessionCookie(sessionId) } },
          );
        } catch (err: unknown) {
          if (isUniqueViolation(err)) {
            return Response.json(
              { error: "Email already registered" },
              { status: 400 },
            );
          }
          return Response.json(
            { error: "Registration failed" },
            { status: 500 },
          );
        }
      },
    },
    "/api/auth/login": {
      POST: async (req) => {
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();
        const { email, password } = body;
        if (
          typeof email !== "string" ||
          !email ||
          typeof password !== "string" ||
          !password
        ) {
          return Response.json(
            { error: "Email and password required" },
            { status: 400 },
          );
        }
        try {
          const user = await authenticateUser(email, password);
          if (!user) {
            return Response.json(
              { error: "Invalid email or password" },
              { status: 401 },
            );
          }
          const { ipAddress, userAgent } = getClientInfo(req);
          const sessionId = createSession(user.id, ipAddress, userAgent);
          return Response.json(
            { user: { id: user.id, email: user.email } },
            { headers: { "Set-Cookie": sessionCookie(sessionId) } },
          );
        } catch {
          return Response.json({ error: "Login failed" }, { status: 500 });
        }
      },
    },
    "/api/auth/logout": {
      POST: async (req) => {
        const sessionId = getSessionFromRequest(req);
        if (sessionId) {
          deleteSession(sessionId);
        }
        return Response.json(
          { success: true },
          { headers: { "Set-Cookie": CLEAR_SESSION_COOKIE } },
        );
      },
    },
    "/api/auth/me": {
      GET: (req) => {
        const user = getAuthenticatedUser(req);
        if (user instanceof Response) return user;
        return Response.json({
          email: user.email,
          name: user.name,
          avatar: user.avatar,
          created_at: user.created_at,
        });
      },
    },
    "/api/sessions": {
      GET: (req) => {
        const user = getAuthenticatedUser(req);
        if (user instanceof Response) return user;
        const sessions = getUserSessionsForUser(user.id);
        return Response.json({
          sessions: sessions.map((s) => ({
            id: s.id,
            ip_address: s.ip_address,
            user_agent: s.user_agent,
            created_at: s.created_at,
            expires_at: s.expires_at,
          })),
        });
      },
      DELETE: async (req) => {
        const user = getAuthenticatedUser(req);
        if (user instanceof Response) return user;
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();
        const targetSessionId = body.sessionId;
        if (typeof targetSessionId !== "string" || !targetSessionId) {
          return Response.json(
            { error: "Session ID required" },
            { status: 400 },
          );
        }
        // Only allow revoking the caller's own sessions; anything else is
        // reported as missing so session IDs cannot be probed.
        const targetSession = getSession(targetSessionId);
        if (!targetSession || targetSession.user_id !== user.id) {
          return Response.json({ error: "Session not found" }, { status: 404 });
        }
        deleteSession(targetSessionId);
        return Response.json({ success: true });
      },
    },
    "/api/user": {
      DELETE: (req) => {
        const user = getAuthenticatedUser(req);
        if (user instanceof Response) return user;
        deleteUser(user.id);
        return Response.json(
          { success: true },
          { headers: { "Set-Cookie": CLEAR_SESSION_COOKIE } },
        );
      },
    },
    "/api/user/email": {
      PUT: async (req) => {
        const user = getAuthenticatedUser(req);
        if (user instanceof Response) return user;
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();
        const { email } = body;
        if (typeof email !== "string" || !email) {
          return Response.json({ error: "Email required" }, { status: 400 });
        }
        try {
          updateUserEmail(user.id, email);
          return Response.json({ success: true });
        } catch (err: unknown) {
          if (isUniqueViolation(err)) {
            return Response.json(
              { error: "Email already in use" },
              { status: 400 },
            );
          }
          return Response.json(
            { error: "Failed to update email" },
            { status: 500 },
          );
        }
      },
    },
    "/api/user/password": {
      PUT: async (req) => {
        const user = getAuthenticatedUser(req);
        if (user instanceof Response) return user;
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();
        const { password } = body;
        if (typeof password !== "string" || !password) {
          return Response.json({ error: "Password required" }, { status: 400 });
        }
        if (password.length < MIN_PASSWORD_LENGTH) {
          return passwordTooShortResponse();
        }
        try {
          await updateUserPassword(user.id, password);
          return Response.json({ success: true });
        } catch {
          return Response.json(
            { error: "Failed to update password" },
            { status: 500 },
          );
        }
      },
    },
    "/api/user/name": {
      PUT: async (req) => {
        const user = getAuthenticatedUser(req);
        if (user instanceof Response) return user;
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();
        const { name } = body;
        if (typeof name !== "string" || !name) {
          return Response.json({ error: "Name required" }, { status: 400 });
        }
        try {
          updateUserName(user.id, name);
          return Response.json({ success: true });
        } catch {
          return Response.json(
            { error: "Failed to update name" },
            { status: 500 },
          );
        }
      },
    },
    "/api/user/avatar": {
      PUT: async (req) => {
        const user = getAuthenticatedUser(req);
        if (user instanceof Response) return user;
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();
        const { avatar } = body;
        if (typeof avatar !== "string" || !avatar) {
          return Response.json({ error: "Avatar required" }, { status: 400 });
        }
        try {
          updateUserAvatar(user.id, avatar);
          return Response.json({ success: true });
        } catch {
          return Response.json(
            { error: "Failed to update avatar" },
            { status: 500 },
          );
        }
      },
    },
    "/api/transcriptions/:id/stream": {
      GET: (req) => {
        const user = getAuthenticatedUser(req);
        if (user instanceof Response) return user;

        const transcriptionId = req.params.id;
        // Ownership check and initial state come from a single read.
        const transcription = db
          .query<
            {
              user_id: number;
              status: string;
              progress: number;
              transcript: string | null;
            },
            [string]
          >(
            "SELECT user_id, status, progress, transcript FROM transcriptions WHERE id = ?",
          )
          .get(transcriptionId);
        // Someone else's transcription is reported as missing.
        if (!transcription || transcription.user_id !== user.id) {
          return Response.json(
            { error: "Transcription not found" },
            { status: 404 },
          );
        }

        // Assigned in start(); called from cancel() when the client goes away.
        // (A ReadableStream ignores a function returned from start(), so
        // teardown has to be wired through cancel().)
        let stopStream: (() => void) | undefined;

        const stream = new ReadableStream<Uint8Array>({
          start(controller) {
            const encoder = new TextEncoder();
            let isClosed = false;
            let heartbeatInterval: ReturnType<typeof setInterval> | undefined;

            // Enqueue raw SSE text; a failed enqueue means the client is gone.
            const write = (chunk: string) => {
              if (isClosed) return;
              try {
                controller.enqueue(encoder.encode(chunk));
              } catch {
                // Controller already closed (client disconnected)
                stop();
              }
            };

            // Emit an `update` event; the event id is a Unix time in seconds.
            const sendEvent = (data: Partial<TranscriptionUpdate>) => {
              const eventId = Math.floor(Date.now() / 1000);
              write(
                `id: ${eventId}\nevent: update\ndata: ${JSON.stringify(data)}\n\n`,
              );
            };

            // Tear down and end the response once the job is terminal.
            const finish = () => {
              if (isClosed) return;
              stop();
              try {
                controller.close();
              } catch {
                // Stream already closed/errored; nothing left to do. Must not
                // throw here: this may run inside the event emitter.
              }
            };

            // Forward live updates, sending only the fields that are present.
            const updateHandler = (data: TranscriptionUpdate) => {
              if (isClosed) return;
              const payload: Partial<TranscriptionUpdate> = {
                status: data.status,
                progress: data.progress,
              };
              if (data.transcript !== undefined) {
                payload.transcript = data.transcript;
              }
              if (data.error_message !== undefined) {
                payload.error_message = data.error_message;
              }
              sendEvent(payload);
              if (data.status === "completed" || data.status === "failed") {
                finish();
              }
            };

            // Idempotent cleanup: stop the heartbeat and unsubscribe.
            const stop = () => {
              if (isClosed) return;
              isClosed = true;
              if (heartbeatInterval !== undefined) {
                clearInterval(heartbeatInterval);
              }
              transcriptionEvents.off(transcriptionId, updateHandler);
            };
            stopStream = stop;

            // Send initial state from DB
            sendEvent({
              status: transcription.status as TranscriptionUpdate["status"],
              progress: transcription.progress,
              transcript: transcription.transcript || undefined,
            });

            // If already complete, close immediately
            if (
              transcription.status === "completed" ||
              transcription.status === "failed"
            ) {
              finish();
              return;
            }

            // Send heartbeats every 2.5 seconds to keep connection alive
            heartbeatInterval = setInterval(
              () => write(": heartbeat\n\n"),
              2500,
            );
            transcriptionEvents.on(transcriptionId, updateHandler);
          },
          cancel() {
            // Client disconnected: release the timer and the listener.
            stopStream?.();
          },
        });

        return new Response(stream, {
          headers: {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            Connection: "keep-alive",
          },
        });
      },
    },
    "/api/transcriptions/health": {
      GET: async () => {
        const isHealthy = await whisperService.checkHealth();
        return Response.json({ available: isHealthy });
      },
    },
    "/api/transcriptions": {
      GET: (req) => {
        try {
          const user = requireAuth(req);

          const transcriptions = db
            .query<
              {
                id: string;
                filename: string;
                original_filename: string;
                status: string;
                progress: number;
                transcript: string | null;
                created_at: number;
              },
              [number]
            >(
              "SELECT id, filename, original_filename, status, progress, transcript, created_at FROM transcriptions WHERE user_id = ? ORDER BY created_at DESC",
            )
            .all(user.id);

          return Response.json({
            jobs: transcriptions.map((t) => ({
              id: t.id,
              filename: t.original_filename,
              status: t.status,
              progress: t.progress,
              transcript: t.transcript,
              created_at: t.created_at,
            })),
          });
        } catch (error) {
          return handleError(error);
        }
      },
      POST: async (req) => {
        try {
          const user = requireAuth(req);

          const formData = await req.formData();
          const file = formData.get("audio");
          // A missing field, or a plain-text field named "audio", is not an upload.
          if (!(file instanceof File)) {
            throw ValidationErrors.missingField("audio");
          }

          // Accept the upload if either the MIME type or the extension looks like audio.
          const fileExtension =
            file.name.split(".").pop()?.toLowerCase() ?? "";
          const isAudioType =
            file.type.startsWith("audio/") || file.type === "video/mp4";
          const isAudioExtension =
            ALLOWED_AUDIO_EXTENSIONS.includes(fileExtension);
          if (!isAudioType && !isAudioExtension) {
            throw ValidationErrors.unsupportedFileType(
              "MP3, WAV, M4A, AAC, OGG, WebM, FLAC, MP4",
            );
          }

          // NOTE(review): "25MB" is hard-coded; keep it in sync with MAX_FILE_SIZE.
          if (file.size > MAX_FILE_SIZE) {
            throw ValidationErrors.fileTooLarge("25MB");
          }

          // The extension comes from a client-supplied filename and ends up in
          // an on-disk path, so keep only [a-z0-9].
          const storedExtension =
            fileExtension.replace(/[^a-z0-9]/g, "") || "bin";
          const transcriptionId = crypto.randomUUID();
          const filename = `${transcriptionId}.${storedExtension}`;

          // Save file to disk
          const uploadDir = "./uploads";
          await Bun.write(`${uploadDir}/${filename}`, file);

          // Create database record
          db.run(
            "INSERT INTO transcriptions (id, user_id, filename, original_filename, status) VALUES (?, ?, ?, ?, ?)",
            [transcriptionId, user.id, filename, file.name, "uploading"],
          );

          // Start transcription in the background. A synchronous throw still
          // reaches handleError; a rejected promise is logged rather than
          // left unhandled.
          void Promise.resolve(
            whisperService.startTranscription(transcriptionId, filename),
          ).catch((error: unknown) => {
            console.error(
              `[Transcription] Failed to start ${transcriptionId}:`,
              error instanceof Error ? error.message : error,
            );
          });

          return Response.json({
            id: transcriptionId,
            message: "Upload successful, transcription started",
          });
        } catch (error) {
          return handleError(error);
        }
      },
    },
  },
  // HMR and browser-console forwarding are development conveniences; turn
  // Bun's development mode off when NODE_ENV=production.
  development:
    process.env.NODE_ENV === "production"
      ? false
      : {
          hmr: true,
          console: true,
        },
});
console.log(`Thistle running at http://localhost:${server.port}`);