馃 distributed transcription service thistle.dunkirk.sh
import db from "./db/schema";
import {
  authenticateUser,
  cleanupExpiredSessions,
  createSession,
  createUser,
  deleteSession,
  deleteUser,
  getSession,
  getSessionFromRequest,
  getUserBySession,
  getUserSessionsForUser,
  updateUserAvatar,
  updateUserEmail,
  updateUserName,
  updateUserPassword,
} from "./lib/auth";
import { handleError, ValidationErrors } from "./lib/errors";
import { requireAuth } from "./lib/middleware";
import {
  MAX_FILE_SIZE,
  TranscriptionEventEmitter,
  type TranscriptionUpdate,
  WhisperServiceManager,
} from "./lib/transcription";
import { getTranscript, getTranscriptVTT } from "./lib/transcript-storage";
import indexHTML from "./pages/index.html";
import settingsHTML from "./pages/settings.html";
import transcribeHTML from "./pages/transcribe.html";

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

/** Base URL of the Murmur (Whisper) transcription service. */
const WHISPER_SERVICE_URL =
  process.env.WHISPER_SERVICE_URL || "http://localhost:8000";

/** HTTP port; falls back to 3000 when PORT is unset or not a number. */
const PORT = Number.parseInt(process.env.PORT ?? "", 10) || 3000;

/** Directory that receives uploaded audio files. */
const UPLOAD_DIR = "./uploads";

/** Session cookie lifetime: 7 days, in seconds. */
const SESSION_MAX_AGE_SECONDS = 7 * 24 * 60 * 60;

/** Minimum accepted password length for registration and password changes. */
const MIN_PASSWORD_LENGTH = 8;

/** Interval between SSE heartbeat comments that keep idle streams open. */
const SSE_HEARTBEAT_INTERVAL_MS = 2500;

/** Extensions accepted for uploads when the MIME type is not audio/* or video/mp4. */
const ALLOWED_AUDIO_EXTENSIONS = new Set([
  "mp3",
  "wav",
  "m4a",
  "aac",
  "ogg",
  "webm",
  "flac",
  "mp4",
]);

/** Set-Cookie value that expires the session cookie immediately. */
const CLEAR_SESSION_COOKIE =
  "session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax";

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

type SessionUser = NonNullable<ReturnType<typeof getUserBySession>>;

/** Outcome of resolving the session cookie on a request. */
type AuthResult =
  | { ok: true; user: SessionUser }
  | { ok: false; response: Response };

/**
 * Run `task` every `intervalMs`, logging (instead of propagating) any sync
 * throw or async rejection so a failing background job cannot surface as an
 * unhandled rejection / uncaught exception.
 */
function schedulePeriodic(
  label: string,
  intervalMs: number,
  task: () => unknown,
): void {
  setInterval(async () => {
    try {
      await task();
    } catch (error) {
      console.error(
        `${label} Periodic task failed:`,
        error instanceof Error ? error.message : error,
      );
    }
  }, intervalMs);
}

/** Build the Set-Cookie value that stores a newly created session id. */
function sessionCookie(sessionId: string): string {
  // NOTE(review): consider adding `Secure` when the app is only served over HTTPS.
  return `session=${sessionId}; HttpOnly; Path=/; Max-Age=${SESSION_MAX_AGE_SECONDS}; SameSite=Lax`;
}

/**
 * Client IP and user agent to record alongside a new session.
 * These headers are client-controllable unless a trusted proxy overwrites
 * them, so the values are informational only.
 */
function getClientInfo(req: Request): { ipAddress: string; userAgent: string } {
  // x-forwarded-for may be a chain ("client, proxy1, ..."); keep the first hop.
  const forwardedFor = req.headers.get("x-forwarded-for")?.split(",")[0]?.trim();
  const ipAddress = forwardedFor || req.headers.get("x-real-ip") || "unknown";
  const userAgent = req.headers.get("user-agent") ?? "unknown";
  return { ipAddress, userAgent };
}

/**
 * Parse the request body as a JSON object.
 * @returns the parsed object, or `null` when the body is not valid JSON or is
 *   not a plain object (array, string, number, null).
 */
async function readJsonObject(
  req: Request,
): Promise<Record<string, unknown> | null> {
  try {
    const body: unknown = await req.json();
    if (typeof body !== "object" || body === null || Array.isArray(body)) {
      return null;
    }
    return body as Record<string, unknown>;
  } catch {
    return null;
  }
}

/** 400 response for bodies that `readJsonObject` rejected. */
function invalidJsonResponse(): Response {
  return Response.json({ error: "Invalid JSON body" }, { status: 400 });
}

/** Return `body[key]` when it is a non-empty string, otherwise `undefined`. */
function stringField(
  body: Record<string, unknown>,
  key: string,
): string | undefined {
  const value = body[key];
  return typeof value === "string" && value.length > 0 ? value : undefined;
}

/** True when `err` carries a SQLite UNIQUE-constraint failure message. */
function isUniqueConstraintError(err: unknown): boolean {
  if (typeof err !== "object" || err === null || !("message" in err)) {
    return false;
  }
  const { message } = err;
  return (
    typeof message === "string" && message.includes("UNIQUE constraint failed")
  );
}

/**
 * Resolve the session cookie to a user, or produce the 401 response the
 * handlers return when there is no session or the session is invalid.
 */
function resolveSessionUser(req: Request): AuthResult {
  const sessionId = getSessionFromRequest(req);
  if (!sessionId) {
    return {
      ok: false,
      response: Response.json({ error: "Not authenticated" }, { status: 401 }),
    };
  }
  const user = getUserBySession(sessionId);
  if (!user) {
    return {
      ok: false,
      response: Response.json({ error: "Invalid session" }, { status: 401 }),
    };
  }
  return { ok: true, user };
}

/**
 * Content-Disposition value for downloading `filename` as an attachment.
 * Emits an ASCII-only `filename` fallback (quotes, backslashes and non-ASCII
 * replaced) plus an RFC 5987 `filename*` with the exact UTF-8 name. The raw
 * name cannot be interpolated directly: quotes break the header and
 * characters outside Latin-1 are rejected by the Headers API.
 */
function attachmentDisposition(filename: string): string {
  const asciiFallback = filename.replace(/[^\x20-\x7e]|["\\]/g, "_");
  const encoded = encodeURIComponent(filename).replace(
    /['()*]/g,
    (c) => `%${c.charCodeAt(0).toString(16).toUpperCase()}`,
  );
  return `attachment; filename="${asciiFallback}"; filename*=UTF-8''${encoded}`;
}

/** Lower-cased text after the last "." of `filename`, or `undefined` if none. */
function getFileExtension(filename: string): string | undefined {
  const dot = filename.lastIndexOf(".");
  if (dot === -1 || dot === filename.length - 1) return undefined;
  return filename.slice(dot + 1).toLowerCase();
}

/**
 * Extension used for the file stored on disk. The client-supplied name is
 * untrusted, so only short alphanumeric extensions are kept; anything else
 * (including path separators) becomes a neutral default.
 */
function storedUploadExtension(extension: string | undefined): string {
  return extension && /^[a-z0-9]{1,10}$/.test(extension) ? extension : "audio";
}

/** Whether a transcription status is final (no further updates expected). */
function isTerminalStatus(status: string): boolean {
  return status === "completed" || status === "failed";
}

// ---------------------------------------------------------------------------
// Startup
// ---------------------------------------------------------------------------

// Create uploads and transcripts directories if they don't exist
await Bun.write(`${UPLOAD_DIR}/.gitkeep`, "");
await Bun.write("./transcripts/.gitkeep", "");

// Initialize transcription system
console.log(
  `[Transcription] Connecting to Murmur at ${WHISPER_SERVICE_URL}...`,
);
const transcriptionEvents = new TranscriptionEventEmitter();
const whisperService = new WhisperServiceManager(
  WHISPER_SERVICE_URL,
  db,
  transcriptionEvents,
);

// Clean up expired sessions every hour
schedulePeriodic("[Sessions]", 60 * 60 * 1000, cleanupExpiredSessions);

// Sync with Whisper DB on startup
try {
  await whisperService.syncWithWhisper();
  console.log("[Transcription] Successfully connected to Murmur");
} catch (error) {
  console.warn(
    "[Transcription] Murmur unavailable at startup:",
    error instanceof Error ? error.message : "Unknown error",
  );
}

// Periodic sync every 5 minutes as backup (SSE handles real-time updates).
// Failures are logged: Murmur may be down, as the startup sync above allows.
schedulePeriodic("[Transcription]", 5 * 60 * 1000, () =>
  whisperService.syncWithWhisper(),
);

// Clean up stale files daily
schedulePeriodic("[Transcription]", 24 * 60 * 60 * 1000, () =>
  whisperService.cleanupStaleFiles(),
);

/**
 * Build the Server-Sent Events stream for one transcription.
 *
 * Sends the current DB row as the first `update` event. If that status is
 * already terminal (or the row is gone), the stream is closed immediately.
 * Otherwise, events from `transcriptionEvents` are forwarded, together with a
 * heartbeat comment every SSE_HEARTBEAT_INTERVAL_MS, until a terminal status
 * arrives or the client disconnects.
 *
 * Cleanup runs from the underlying source's `cancel()` hook. ReadableStream
 * ignores any function returned from `start()`, so without `cancel()` a
 * disconnected client would leak its heartbeat timer and emitter listener.
 */
function createTranscriptionStream(
  transcriptionId: string,
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  let isClosed = false;
  let heartbeatTimer: ReturnType<typeof setInterval> | undefined;
  let updateHandler: ((data: TranscriptionUpdate) => void) | undefined;

  // Idempotent: mark closed, stop the heartbeat, detach from the emitter.
  const releaseResources = () => {
    isClosed = true;
    if (heartbeatTimer !== undefined) {
      clearInterval(heartbeatTimer);
      heartbeatTimer = undefined;
    }
    if (updateHandler !== undefined) {
      transcriptionEvents.off(transcriptionId, updateHandler);
      updateHandler = undefined;
    }
  };

  return new ReadableStream<Uint8Array>({
    async start(controller) {
      const write = (chunk: string) => {
        if (isClosed) return;
        try {
          controller.enqueue(encoder.encode(chunk));
        } catch {
          // Controller already closed (client disconnected)
          releaseResources();
        }
      };

      const sendEvent = (data: Partial<TranscriptionUpdate>) => {
        // Event id is a Unix timestamp in seconds.
        // NOTE(review): the Last-Event-ID request header is never read, so ids
        // do not currently enable resumption.
        const eventId = Math.floor(Date.now() / 1000);
        write(
          `id: ${eventId}\nevent: update\ndata: ${JSON.stringify(data)}\n\n`,
        );
      };

      const finish = () => {
        if (isClosed) return;
        releaseResources();
        controller.close();
      };

      // Send initial state from DB and file
      const current = db
        .query<{ status: string; progress: number }, [string]>(
          "SELECT status, progress FROM transcriptions WHERE id = ?",
        )
        .get(transcriptionId);

      if (!current) {
        // Row vanished after the ownership check; nothing left to stream.
        finish();
        return;
      }

      // Load transcript from file if completed
      const transcript =
        current.status === "completed"
          ? (await getTranscript(transcriptionId)) || undefined
          : undefined;
      sendEvent({
        status: current.status as TranscriptionUpdate["status"],
        progress: current.progress,
        transcript,
      });

      // If already complete, close immediately
      if (isTerminalStatus(current.status)) {
        finish();
        return;
      }
      // The client may have disconnected (cancel() ran) in the meantime.
      if (isClosed) return;

      heartbeatTimer = setInterval(
        () => write(": heartbeat\n\n"),
        SSE_HEARTBEAT_INTERVAL_MS,
      );

      // Subscribe to EventEmitter for live updates
      const handler = (data: TranscriptionUpdate) => {
        if (isClosed) return;

        // Only send changed fields to save bandwidth
        const payload: Partial<TranscriptionUpdate> = {
          status: data.status,
          progress: data.progress,
        };
        if (data.transcript !== undefined) {
          payload.transcript = data.transcript;
        }
        if (data.error_message !== undefined) {
          payload.error_message = data.error_message;
        }
        sendEvent(payload);

        // Close stream when done
        if (isTerminalStatus(data.status)) {
          finish();
        }
      };
      updateHandler = handler;
      transcriptionEvents.on(transcriptionId, handler);
    },
    // Invoked when the client disconnects / the response body is cancelled.
    cancel() {
      releaseResources();
    },
  });
}

// ---------------------------------------------------------------------------
// HTTP server
// ---------------------------------------------------------------------------

const server = Bun.serve({
  port: PORT,
  idleTimeout: 120, // 120 seconds for SSE connections
  routes: {
    "/": indexHTML,
    "/settings": settingsHTML,
    "/transcribe": transcribeHTML,
    "/api/auth/register": {
      POST: async (req) => {
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();

        const email = stringField(body, "email");
        const password = stringField(body, "password");
        const name = typeof body.name === "string" ? body.name : undefined;
        if (!email || !password) {
          return Response.json(
            { error: "Email and password required" },
            { status: 400 },
          );
        }
        if (password.length < MIN_PASSWORD_LENGTH) {
          return Response.json(
            {
              error: `Password must be at least ${MIN_PASSWORD_LENGTH} characters`,
            },
            { status: 400 },
          );
        }
        try {
          const user = await createUser(email, password, name);
          const { ipAddress, userAgent } = getClientInfo(req);
          const sessionId = createSession(user.id, ipAddress, userAgent);
          return Response.json(
            { user: { id: user.id, email: user.email } },
            { headers: { "Set-Cookie": sessionCookie(sessionId) } },
          );
        } catch (err: unknown) {
          if (isUniqueConstraintError(err)) {
            return Response.json(
              { error: "Email already registered" },
              { status: 400 },
            );
          }
          return Response.json(
            { error: "Registration failed" },
            { status: 500 },
          );
        }
      },
    },
    "/api/auth/login": {
      POST: async (req) => {
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();

        const email = stringField(body, "email");
        const password = stringField(body, "password");
        if (!email || !password) {
          return Response.json(
            { error: "Email and password required" },
            { status: 400 },
          );
        }
        try {
          const user = await authenticateUser(email, password);
          if (!user) {
            return Response.json(
              { error: "Invalid email or password" },
              { status: 401 },
            );
          }
          const { ipAddress, userAgent } = getClientInfo(req);
          const sessionId = createSession(user.id, ipAddress, userAgent);
          return Response.json(
            { user: { id: user.id, email: user.email } },
            { headers: { "Set-Cookie": sessionCookie(sessionId) } },
          );
        } catch {
          return Response.json({ error: "Login failed" }, { status: 500 });
        }
      },
    },
    "/api/auth/logout": {
      POST: async (req) => {
        const sessionId = getSessionFromRequest(req);
        if (sessionId) {
          deleteSession(sessionId);
        }
        return Response.json(
          { success: true },
          { headers: { "Set-Cookie": CLEAR_SESSION_COOKIE } },
        );
      },
    },
    "/api/auth/me": {
      GET: (req) => {
        const auth = resolveSessionUser(req);
        if (!auth.ok) return auth.response;
        const { user } = auth;
        return Response.json({
          email: user.email,
          name: user.name,
          avatar: user.avatar,
          created_at: user.created_at,
        });
      },
    },
    "/api/sessions": {
      GET: (req) => {
        const auth = resolveSessionUser(req);
        if (!auth.ok) return auth.response;
        const sessions = getUserSessionsForUser(auth.user.id);
        return Response.json({
          sessions: sessions.map((s) => ({
            id: s.id,
            ip_address: s.ip_address,
            user_agent: s.user_agent,
            created_at: s.created_at,
            expires_at: s.expires_at,
          })),
        });
      },
      DELETE: async (req) => {
        const auth = resolveSessionUser(req);
        if (!auth.ok) return auth.response;
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();

        const targetSessionId = stringField(body, "sessionId");
        if (!targetSessionId) {
          return Response.json(
            { error: "Session ID required" },
            { status: 400 },
          );
        }
        // Verify the session belongs to the user (404 either way, so other
        // users' session ids are not disclosed)
        const targetSession = getSession(targetSessionId);
        if (!targetSession || targetSession.user_id !== auth.user.id) {
          return Response.json({ error: "Session not found" }, { status: 404 });
        }
        deleteSession(targetSessionId);
        return Response.json({ success: true });
      },
    },
    "/api/user": {
      DELETE: (req) => {
        const auth = resolveSessionUser(req);
        if (!auth.ok) return auth.response;
        deleteUser(auth.user.id);
        return Response.json(
          { success: true },
          { headers: { "Set-Cookie": CLEAR_SESSION_COOKIE } },
        );
      },
    },
    "/api/user/email": {
      PUT: async (req) => {
        const auth = resolveSessionUser(req);
        if (!auth.ok) return auth.response;
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();

        const email = stringField(body, "email");
        if (!email) {
          return Response.json({ error: "Email required" }, { status: 400 });
        }
        try {
          updateUserEmail(auth.user.id, email);
          return Response.json({ success: true });
        } catch (err: unknown) {
          if (isUniqueConstraintError(err)) {
            return Response.json(
              { error: "Email already in use" },
              { status: 400 },
            );
          }
          return Response.json(
            { error: "Failed to update email" },
            { status: 500 },
          );
        }
      },
    },
    "/api/user/password": {
      PUT: async (req) => {
        const auth = resolveSessionUser(req);
        if (!auth.ok) return auth.response;
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();

        const password = stringField(body, "password");
        if (!password) {
          return Response.json({ error: "Password required" }, { status: 400 });
        }
        if (password.length < MIN_PASSWORD_LENGTH) {
          return Response.json(
            {
              error: `Password must be at least ${MIN_PASSWORD_LENGTH} characters`,
            },
            { status: 400 },
          );
        }
        try {
          await updateUserPassword(auth.user.id, password);
          return Response.json({ success: true });
        } catch {
          return Response.json(
            { error: "Failed to update password" },
            { status: 500 },
          );
        }
      },
    },
    "/api/user/name": {
      PUT: async (req) => {
        const auth = resolveSessionUser(req);
        if (!auth.ok) return auth.response;
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();

        const name = stringField(body, "name");
        if (!name) {
          return Response.json({ error: "Name required" }, { status: 400 });
        }
        try {
          updateUserName(auth.user.id, name);
          return Response.json({ success: true });
        } catch {
          return Response.json(
            { error: "Failed to update name" },
            { status: 500 },
          );
        }
      },
    },
    "/api/user/avatar": {
      PUT: async (req) => {
        const auth = resolveSessionUser(req);
        if (!auth.ok) return auth.response;
        const body = await readJsonObject(req);
        if (!body) return invalidJsonResponse();

        const avatar = stringField(body, "avatar");
        if (!avatar) {
          return Response.json({ error: "Avatar required" }, { status: 400 });
        }
        try {
          updateUserAvatar(auth.user.id, avatar);
          return Response.json({ success: true });
        } catch {
          return Response.json(
            { error: "Failed to update avatar" },
            { status: 500 },
          );
        }
      },
    },
    "/api/transcriptions/:id/stream": {
      GET: (req) => {
        const auth = resolveSessionUser(req);
        if (!auth.ok) return auth.response;
        const transcriptionId = req.params.id;

        // Verify ownership
        const transcription = db
          .query<{ id: string; user_id: number; status: string }, [string]>(
            "SELECT id, user_id, status FROM transcriptions WHERE id = ?",
          )
          .get(transcriptionId);
        if (!transcription || transcription.user_id !== auth.user.id) {
          return Response.json(
            { error: "Transcription not found" },
            { status: 404 },
          );
        }

        return new Response(createTranscriptionStream(transcriptionId), {
          headers: {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            Connection: "keep-alive",
          },
        });
      },
    },
    "/api/transcriptions/health": {
      GET: async () => {
        const isHealthy = await whisperService.checkHealth();
        return Response.json({ available: isHealthy });
      },
    },
    "/api/transcriptions/:id": {
      GET: async (req) => {
        try {
          const user = requireAuth(req);
          const transcriptionId = req.params.id;

          // Verify ownership
          const transcription = db
            .query<
              {
                id: string;
                user_id: number;
                status: string;
                original_filename: string;
              },
              [string]
            >(
              "SELECT id, user_id, status, original_filename FROM transcriptions WHERE id = ?",
            )
            .get(transcriptionId);

          if (!transcription || transcription.user_id !== user.id) {
            return Response.json(
              { error: "Transcription not found" },
              { status: 404 },
            );
          }

          if (transcription.status !== "completed") {
            return Response.json(
              { error: "Transcription not completed yet" },
              { status: 400 },
            );
          }

          // ?format=vtt returns WebVTT as a download; anything else is plain text
          const format = new URL(req.url).searchParams.get("format");

          if (format === "vtt") {
            const vttContent = await getTranscriptVTT(transcriptionId);
            if (!vttContent) {
              return Response.json(
                { error: "VTT transcript not available" },
                { status: 404 },
              );
            }
            return new Response(vttContent, {
              headers: {
                "Content-Type": "text/vtt; charset=utf-8",
                "Content-Disposition": attachmentDisposition(
                  `${transcription.original_filename}.vtt`,
                ),
              },
            });
          }

          // Default: return plain text transcript from file
          const transcript = await getTranscript(transcriptionId);
          if (!transcript) {
            return Response.json(
              { error: "Transcript not available" },
              { status: 404 },
            );
          }

          return new Response(transcript, {
            headers: { "Content-Type": "text/plain; charset=utf-8" },
          });
        } catch (error) {
          return handleError(error);
        }
      },
    },
    "/api/transcriptions": {
      GET: async (req) => {
        try {
          const user = requireAuth(req);

          const transcriptions = db
            .query<
              {
                id: string;
                filename: string;
                original_filename: string;
                status: string;
                progress: number;
                created_at: number;
              },
              [number]
            >(
              "SELECT id, filename, original_filename, status, progress, created_at FROM transcriptions WHERE user_id = ? ORDER BY created_at DESC",
            )
            .all(user.id);

          // Load transcripts from files for completed jobs
          const jobs = await Promise.all(
            transcriptions.map(async (t) => ({
              id: t.id,
              filename: t.original_filename,
              status: t.status,
              progress: t.progress,
              transcript:
                t.status === "completed" ? await getTranscript(t.id) : null,
              created_at: t.created_at,
            })),
          );

          return Response.json({ jobs });
        } catch (error) {
          return handleError(error);
        }
      },
      POST: async (req) => {
        try {
          const user = requireAuth(req);

          const formData = await req.formData();
          const file = formData.get("audio");
          // `get` yields a plain string for non-file fields; treat that as missing.
          if (!(file instanceof File)) throw ValidationErrors.missingField("audio");

          // Validate file type: accept either an audio MIME type or a known extension
          const fileExtension = getFileExtension(file.name);
          const isAudioType =
            file.type.startsWith("audio/") || file.type === "video/mp4";
          const isAudioExtension =
            fileExtension !== undefined &&
            ALLOWED_AUDIO_EXTENSIONS.has(fileExtension);

          if (!isAudioType && !isAudioExtension) {
            throw ValidationErrors.unsupportedFileType(
              "MP3, WAV, M4A, AAC, OGG, WebM, FLAC, MP4",
            );
          }

          if (file.size > MAX_FILE_SIZE) {
            // NOTE(review): label is hard-coded; keep in sync with MAX_FILE_SIZE.
            throw ValidationErrors.fileTooLarge("25MB");
          }

          // Server-generated name; only a sanitised extension comes from the client
          const transcriptionId = crypto.randomUUID();
          const filename = `${transcriptionId}.${storedUploadExtension(fileExtension)}`;

          // Save file to disk
          await Bun.write(`${UPLOAD_DIR}/${filename}`, file);

          // Create database record
          db.run(
            "INSERT INTO transcriptions (id, user_id, filename, original_filename, status) VALUES (?, ?, ?, ?, ?)",
            [transcriptionId, user.id, filename, file.name, "uploading"],
          );

          // Start transcription in background; log an async failure instead of
          // leaving it as an unhandled rejection.
          void Promise.resolve(
            whisperService.startTranscription(transcriptionId, filename),
          ).catch((error: unknown) => {
            console.error(
              `[Transcription] Failed to start ${transcriptionId}:`,
              error instanceof Error ? error.message : error,
            );
          });

          return Response.json({
            id: transcriptionId,
            message: "Upload successful, transcription started",
          });
        } catch (error) {
          return handleError(error);
        }
      },
    },
  },
  // HMR and browser-console forwarding are development conveniences; disable
  // them (and Bun's verbose error pages) when running in production.
  development:
    process.env.NODE_ENV === "production"
      ? false
      : { hmr: true, console: true },
});

console.log(`Thistle running at http://localhost:${server.port}`);