馃 distributed transcription service thistle.dunkirk.sh
/**
 * Thistle HTTP server entry point (Bun).
 *
 * Serves the HTML pages, the JSON auth/account API and the transcription API
 * (upload, listing, SSE progress stream, transcript and audio download), and
 * keeps local transcription state in sync with the Murmur (Whisper) service.
 */
import db from "./db/schema";
import {
	authenticateUser,
	cleanupExpiredSessions,
	createSession,
	createUser,
	deleteSession,
	deleteUser,
	getSession,
	getSessionFromRequest,
	getUserBySession,
	getUserSessionsForUser,
	updateUserAvatar,
	updateUserEmail,
	updateUserName,
	updateUserPassword,
} from "./lib/auth";
import { handleError, ValidationErrors } from "./lib/errors";
import { requireAuth } from "./lib/middleware";
import {
	MAX_FILE_SIZE,
	TranscriptionEventEmitter,
	type TranscriptionUpdate,
	WhisperServiceManager,
} from "./lib/transcription";
import { getTranscript, getTranscriptVTT } from "./lib/transcript-storage";
import indexHTML from "./pages/index.html";
import settingsHTML from "./pages/settings.html";
import transcribeHTML from "./pages/transcribe.html";

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

/** Base URL of the Murmur (Whisper) transcription service. */
const WHISPER_SERVICE_URL =
	process.env.WHISPER_SERVICE_URL || "http://localhost:8000";

const MINUTE_MS = 60 * 1000;
const HOUR_MS = 60 * MINUTE_MS;
const DAY_MS = 24 * HOUR_MS;

/** Lifetime of the session cookie in seconds (7 days). */
const SESSION_MAX_AGE_SECONDS = 7 * 24 * 60 * 60;

/** Cookie value that clears the session cookie on the client. */
const CLEAR_SESSION_COOKIE =
	"session=; HttpOnly; Path=/; Max-Age=0; SameSite=Lax";

/** Interval between SSE heartbeat comments, in milliseconds. */
const SSE_HEARTBEAT_MS = 2500;

/** File extensions accepted for uploads in addition to `audio/*` MIME types. */
const ALLOWED_AUDIO_EXTENSIONS: readonly string[] = [
	"mp3",
	"wav",
	"m4a",
	"aac",
	"ogg",
	"webm",
	"flac",
	"mp4",
];

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** User record as returned (non-null) by `getUserBySession`. */
type SessionUser = NonNullable<ReturnType<typeof getUserBySession>>;

/** Message of an `Error`, or `"Unknown error"` for any other thrown value. */
function errorMessage(error: unknown): string {
	return error instanceof Error ? error.message : "Unknown error";
}

/**
 * Resolves the user behind the request's session cookie.
 *
 * @returns The user, or a ready-to-return 401 `Response` when the request
 *   carries no session id or no user is found for it.
 */
function authenticate(req: Request): SessionUser | Response {
	const sessionId = getSessionFromRequest(req);
	if (!sessionId) {
		return Response.json({ error: "Not authenticated" }, { status: 401 });
	}
	const user = getUserBySession(sessionId);
	if (!user) {
		return Response.json({ error: "Invalid session" }, { status: 401 });
	}
	return user;
}

/**
 * `Set-Cookie` value storing `sessionId` for SESSION_MAX_AGE_SECONDS.
 * NOTE(review): no `Secure` attribute, so the cookie is also sent over plain
 * HTTP; consider adding it when the app is only served over HTTPS.
 */
function sessionCookie(sessionId: string): string {
	return `session=${sessionId}; HttpOnly; Path=/; Max-Age=${SESSION_MAX_AGE_SECONDS}; SameSite=Lax`;
}

/**
 * Client metadata recorded alongside a new session.
 * NOTE(review): `x-forwarded-for` / `x-real-ip` are client-controlled unless a
 * trusted proxy overwrites them; treat the stored IP as informational only.
 */
function getClientInfo(req: Request): { ipAddress: string; userAgent: string } {
	return {
		ipAddress:
			req.headers.get("x-forwarded-for") ??
			req.headers.get("x-real-ip") ??
			"unknown",
		userAgent: req.headers.get("user-agent") ?? "unknown",
	};
}

/**
 * Parses the request body as a JSON object.
 *
 * @returns The parsed object, or `null` when the body is not valid JSON or is
 *   not a plain (non-array) object.
 */
async function readJsonObject(
	req: Request,
): Promise<Record<string, unknown> | null> {
	try {
		const body: unknown = await req.json();
		if (typeof body === "object" && body !== null && !Array.isArray(body)) {
			return body as Record<string, unknown>;
		}
		return null;
	} catch {
		return null;
	}
}

/** 400 response for a missing/malformed JSON request body. */
function invalidJsonBody(): Response {
	return Response.json({ error: "Invalid JSON body" }, { status: 400 });
}

/** Whether the SSE stream should close after this status (job finished). */
function isTerminalStatus(status: string): boolean {
	return status === "completed" || status === "failed";
}

/** Inclusive byte range resolved against a file of known size. */
type ByteRange = { start: number; end: number };

/**
 * Parses a single-range `Range: bytes=…` header (RFC 9110 §14.1.2).
 *
 * Supports `bytes=first-last`, `bytes=first-` and the suffix form `bytes=-N`
 * (last N bytes). `last` past EOF is clamped to the final byte.
 *
 * @returns The inclusive range; `"unsatisfiable"` when it lies entirely
 *   outside the file (caller answers 416); or `null` when the header is
 *   malformed or multi-range (caller ignores it and serves the whole file,
 *   which the RFC permits).
 */
function parseByteRange(
	header: string,
	size: number,
): ByteRange | "unsatisfiable" | null {
	const match = /^bytes=(\d*)-(\d*)$/.exec(header.trim());
	if (!match) return null;
	const [, startText = "", endText = ""] = match;
	if (startText === "" && endText === "") return null;

	if (startText === "") {
		// Suffix range: the last `suffixLength` bytes of the file.
		const suffixLength = Number(endText);
		if (suffixLength === 0 || size === 0) return "unsatisfiable";
		return { start: Math.max(0, size - suffixLength), end: size - 1 };
	}

	const start = Number(startText);
	if (start >= size) return "unsatisfiable";
	const requestedEnd = endText === "" ? size - 1 : Number(endText);
	if (requestedEnd < start) return null; // invalid range-spec: ignore header
	return { start, end: Math.min(requestedEnd, size - 1) };
}

/**
 * `Content-Disposition: attachment` value for a user-supplied filename.
 * Emits an ASCII-only, quote-safe `filename` fallback plus an RFC 8187
 * `filename*` carrying the exact UTF-8 name, so quotes, backslashes or
 * control characters in the name cannot break or inject into the header.
 */
function attachmentDisposition(filename: string): string {
	const asciiFallback = filename.replace(/[^\x20-\x7e]|["\\]/g, "_");
	// encodeURIComponent leaves ' ( ) * unescaped; RFC 8187 attr-char excludes them.
	const encoded = encodeURIComponent(filename).replace(
		/['()*]/g,
		(c) => `%${c.charCodeAt(0).toString(16).toUpperCase()}`,
	);
	return `attachment; filename="${asciiFallback}"; filename*=UTF-8''${encoded}`;
}

/**
 * Runs `task` every `intervalMs` milliseconds. Thrown errors and rejected
 * promises are logged as `${label}: <message>` instead of escaping the timer
 * callback as an uncaught exception / unhandled rejection.
 */
function runPeriodically(
	label: string,
	intervalMs: number,
	task: () => unknown,
): void {
	setInterval(async () => {
		try {
			await task();
		} catch (error) {
			console.warn(`${label}:`, errorMessage(error));
		}
	}, intervalMs);
}

// ---------------------------------------------------------------------------
// Startup
// ---------------------------------------------------------------------------

// Create uploads and transcripts directories if they don't exist
await Bun.write("./uploads/.gitkeep", "");
await Bun.write("./transcripts/.gitkeep", "");

// Initialize transcription system
console.log(
	`[Transcription] Connecting to Murmur at ${WHISPER_SERVICE_URL}...`,
);
const transcriptionEvents = new TranscriptionEventEmitter();
const whisperService = new WhisperServiceManager(
	WHISPER_SERVICE_URL,
	db,
	transcriptionEvents,
);

// Clean up expired sessions every hour
runPeriodically(
	"[Sessions] Failed to clean up expired sessions",
	HOUR_MS,
	cleanupExpiredSessions,
);

// Sync with Whisper DB on startup
try {
	await whisperService.syncWithWhisper();
	console.log("[Transcription] Successfully connected to Murmur");
} catch (error) {
	console.warn(
		"[Transcription] Murmur unavailable at startup:",
		errorMessage(error),
	);
}

// Periodic sync every 5 minutes as backup (SSE handles real-time updates)
runPeriodically("[Sync] Failed to sync with Murmur", 5 * MINUTE_MS, () =>
	whisperService.syncWithWhisper(),
);

// Clean up stale files daily
runPeriodically("[Cleanup] Failed to clean up stale files", DAY_MS, () =>
	whisperService.cleanupStaleFiles(),
);

// ---------------------------------------------------------------------------
// HTTP server
// ---------------------------------------------------------------------------

const server = Bun.serve({
	port: 3000,
	idleTimeout: 120, // 120 seconds for SSE connections
	routes: {
		"/": indexHTML,
		"/settings": settingsHTML,
		"/transcribe": transcribeHTML,

		/** Creates an account and logs the new user in (sets the session cookie). */
		"/api/auth/register": {
			POST: async (req) => {
				const body = await readJsonObject(req);
				if (!body) return invalidJsonBody();
				const { email, password, name } = body;
				if (
					typeof email !== "string" ||
					typeof password !== "string" ||
					!email ||
					!password
				) {
					return Response.json(
						{ error: "Email and password required" },
						{ status: 400 },
					);
				}
				if (password.length < 8) {
					return Response.json(
						{ error: "Password must be at least 8 characters" },
						{ status: 400 },
					);
				}
				try {
					const user = await createUser(
						email,
						password,
						typeof name === "string" ? name : undefined,
					);
					const { ipAddress, userAgent } = getClientInfo(req);
					const sessionId = createSession(user.id, ipAddress, userAgent);
					return Response.json(
						{ user: { id: user.id, email: user.email } },
						{ headers: { "Set-Cookie": sessionCookie(sessionId) } },
					);
				} catch (err: unknown) {
					if (errorMessage(err).includes("UNIQUE constraint failed")) {
						return Response.json(
							{ error: "Email already registered" },
							{ status: 400 },
						);
					}
					return Response.json(
						{ error: "Registration failed" },
						{ status: 500 },
					);
				}
			},
		},

		/** Verifies credentials and starts a new session. */
		"/api/auth/login": {
			POST: async (req) => {
				const body = await readJsonObject(req);
				if (!body) return invalidJsonBody();
				const { email, password } = body;
				if (
					typeof email !== "string" ||
					typeof password !== "string" ||
					!email ||
					!password
				) {
					return Response.json(
						{ error: "Email and password required" },
						{ status: 400 },
					);
				}
				try {
					const user = await authenticateUser(email, password);
					if (!user) {
						return Response.json(
							{ error: "Invalid email or password" },
							{ status: 401 },
						);
					}
					const { ipAddress, userAgent } = getClientInfo(req);
					const sessionId = createSession(user.id, ipAddress, userAgent);
					return Response.json(
						{ user: { id: user.id, email: user.email } },
						{ headers: { "Set-Cookie": sessionCookie(sessionId) } },
					);
				} catch {
					return Response.json({ error: "Login failed" }, { status: 500 });
				}
			},
		},

		/** Deletes the current session (if any) and clears the cookie. */
		"/api/auth/logout": {
			POST: async (req) => {
				const sessionId = getSessionFromRequest(req);
				if (sessionId) {
					deleteSession(sessionId);
				}
				return Response.json(
					{ success: true },
					{ headers: { "Set-Cookie": CLEAR_SESSION_COOKIE } },
				);
			},
		},

		/** Profile of the logged-in user. */
		"/api/auth/me": {
			GET: (req) => {
				const auth = authenticate(req);
				if (auth instanceof Response) return auth;
				return Response.json({
					email: auth.email,
					name: auth.name,
					avatar: auth.avatar,
					created_at: auth.created_at,
				});
			},
		},

		/** Lists / revokes the logged-in user's sessions. */
		"/api/sessions": {
			GET: (req) => {
				const auth = authenticate(req);
				if (auth instanceof Response) return auth;
				const sessions = getUserSessionsForUser(auth.id);
				return Response.json({
					sessions: sessions.map((s) => ({
						id: s.id,
						ip_address: s.ip_address,
						user_agent: s.user_agent,
						created_at: s.created_at,
						expires_at: s.expires_at,
					})),
				});
			},
			DELETE: async (req) => {
				const auth = authenticate(req);
				if (auth instanceof Response) return auth;
				const body = await readJsonObject(req);
				if (!body) return invalidJsonBody();
				const targetSessionId = body.sessionId;
				if (typeof targetSessionId !== "string" || !targetSessionId) {
					return Response.json(
						{ error: "Session ID required" },
						{ status: 400 },
					);
				}
				// Verify the session belongs to the user (404 either way, so other
				// users' session ids are not disclosed)
				const targetSession = getSession(targetSessionId);
				if (!targetSession || targetSession.user_id !== auth.id) {
					return Response.json({ error: "Session not found" }, { status: 404 });
				}
				deleteSession(targetSessionId);
				return Response.json({ success: true });
			},
		},

		/** Deletes the logged-in user's account and clears the cookie. */
		"/api/user": {
			DELETE: (req) => {
				const auth = authenticate(req);
				if (auth instanceof Response) return auth;
				deleteUser(auth.id);
				return Response.json(
					{ success: true },
					{ headers: { "Set-Cookie": CLEAR_SESSION_COOKIE } },
				);
			},
		},

		"/api/user/email": {
			PUT: async (req) => {
				const auth = authenticate(req);
				if (auth instanceof Response) return auth;
				const body = await readJsonObject(req);
				if (!body) return invalidJsonBody();
				const { email } = body;
				if (typeof email !== "string" || !email) {
					return Response.json({ error: "Email required" }, { status: 400 });
				}
				try {
					updateUserEmail(auth.id, email);
					return Response.json({ success: true });
				} catch (err: unknown) {
					if (errorMessage(err).includes("UNIQUE constraint failed")) {
						return Response.json(
							{ error: "Email already in use" },
							{ status: 400 },
						);
					}
					return Response.json(
						{ error: "Failed to update email" },
						{ status: 500 },
					);
				}
			},
		},

		"/api/user/password": {
			PUT: async (req) => {
				const auth = authenticate(req);
				if (auth instanceof Response) return auth;
				const body = await readJsonObject(req);
				if (!body) return invalidJsonBody();
				const { password } = body;
				if (typeof password !== "string" || !password) {
					return Response.json({ error: "Password required" }, { status: 400 });
				}
				if (password.length < 8) {
					return Response.json(
						{ error: "Password must be at least 8 characters" },
						{ status: 400 },
					);
				}
				try {
					await updateUserPassword(auth.id, password);
					return Response.json({ success: true });
				} catch {
					return Response.json(
						{ error: "Failed to update password" },
						{ status: 500 },
					);
				}
			},
		},

		"/api/user/name": {
			PUT: async (req) => {
				const auth = authenticate(req);
				if (auth instanceof Response) return auth;
				const body = await readJsonObject(req);
				if (!body) return invalidJsonBody();
				const { name } = body;
				if (typeof name !== "string" || !name) {
					return Response.json({ error: "Name required" }, { status: 400 });
				}
				try {
					updateUserName(auth.id, name);
					return Response.json({ success: true });
				} catch {
					return Response.json(
						{ error: "Failed to update name" },
						{ status: 500 },
					);
				}
			},
		},

		"/api/user/avatar": {
			PUT: async (req) => {
				const auth = authenticate(req);
				if (auth instanceof Response) return auth;
				const body = await readJsonObject(req);
				if (!body) return invalidJsonBody();
				const { avatar } = body;
				if (typeof avatar !== "string" || !avatar) {
					return Response.json({ error: "Avatar required" }, { status: 400 });
				}
				try {
					updateUserAvatar(auth.id, avatar);
					return Response.json({ success: true });
				} catch {
					return Response.json(
						{ error: "Failed to update avatar" },
						{ status: 500 },
					);
				}
			},
		},

		/**
		 * Server-Sent Events stream of a transcription's progress. Sends the
		 * current DB state first, then live updates from `transcriptionEvents`,
		 * and closes once the job is completed or failed.
		 */
		"/api/transcriptions/:id/stream": {
			GET: async (req) => {
				const auth = authenticate(req);
				if (auth instanceof Response) return auth;
				const transcriptionId = req.params.id;
				// Verify ownership
				const transcription = db
					.query<{ id: string; user_id: number; status: string }, [string]>(
						"SELECT id, user_id, status FROM transcriptions WHERE id = ?",
					)
					.get(transcriptionId);
				if (!transcription || transcription.user_id !== auth.id) {
					return Response.json(
						{ error: "Transcription not found" },
						{ status: 404 },
					);
				}

				// Teardown hook shared by start() and cancel(). The value returned by
				// start() is NOT used as a cleanup callback by ReadableStream, so
				// disconnect handling must live in cancel().
				let stopStream = () => {};

				const stream = new ReadableStream<Uint8Array>({
					async start(controller) {
						const encoder = new TextEncoder();
						let isClosed = false;
						let heartbeatInterval: ReturnType<typeof setInterval> | undefined;
						let updateHandler:
							| ((data: TranscriptionUpdate) => void)
							| undefined;

						// Idempotent: stop heartbeats and unsubscribe from live updates.
						const teardown = () => {
							isClosed = true;
							if (heartbeatInterval !== undefined) {
								clearInterval(heartbeatInterval);
							}
							if (updateHandler !== undefined) {
								transcriptionEvents.off(transcriptionId, updateHandler);
							}
						};
						stopStream = teardown;

						const write = (chunk: string) => {
							if (isClosed) return;
							try {
								controller.enqueue(encoder.encode(chunk));
							} catch {
								// Controller already closed (client disconnected)
								teardown();
							}
						};

						const sendEvent = (data: Partial<TranscriptionUpdate>) => {
							// Event ID (Unix seconds) for reconnection support
							const eventId = Math.floor(Date.now() / 1000);
							write(
								`id: ${eventId}\nevent: update\ndata: ${JSON.stringify(data)}\n\n`,
							);
						};

						// Release resources and end the stream (no-op if already closed).
						const finish = () => {
							if (isClosed) return;
							teardown();
							try {
								controller.close();
							} catch {
								// Stream already cancelled/errored; nothing left to close.
							}
						};

						// Send initial state from DB (and transcript file if completed)
						const current = db
							.query<{ status: string; progress: number }, [string]>(
								"SELECT status, progress FROM transcriptions WHERE id = ?",
							)
							.get(transcriptionId);
						if (current) {
							let transcript: string | undefined;
							if (current.status === "completed") {
								transcript =
									(await getTranscript(transcriptionId)) || undefined;
							}
							sendEvent({
								status: current.status as TranscriptionUpdate["status"],
								progress: current.progress,
								transcript,
							});
						}

						// If already finished, close immediately
						if (current && isTerminalStatus(current.status)) {
							finish();
							return;
						}
						// Client may have disconnected while the initial state was sent.
						if (isClosed) return;

						// Heartbeats keep the connection alive within the server idleTimeout
						heartbeatInterval = setInterval(
							() => write(": heartbeat\n\n"),
							SSE_HEARTBEAT_MS,
						);

						// Live updates: only forward the fields that are present
						updateHandler = (data: TranscriptionUpdate) => {
							if (isClosed) return;
							const payload: Partial<TranscriptionUpdate> = {
								status: data.status,
								progress: data.progress,
							};
							if (data.transcript !== undefined) {
								payload.transcript = data.transcript;
							}
							if (data.error_message !== undefined) {
								payload.error_message = data.error_message;
							}
							sendEvent(payload);
							if (isTerminalStatus(data.status)) {
								finish();
							}
						};
						transcriptionEvents.on(transcriptionId, updateHandler);
					},
					cancel() {
						// Client disconnected: release the interval and the listener.
						stopStream();
					},
				});

				return new Response(stream, {
					headers: {
						"Content-Type": "text/event-stream",
						"Cache-Control": "no-cache",
						Connection: "keep-alive",
					},
				});
			},
		},

		/** Whether the Murmur service currently reports healthy. */
		"/api/transcriptions/health": {
			GET: async () => {
				const isHealthy = await whisperService.checkHealth();
				return Response.json({ available: isHealthy });
			},
		},

		/**
		 * Transcript of a completed transcription owned by the user: plain text
		 * by default, or a WebVTT attachment with `?format=vtt`.
		 */
		"/api/transcriptions/:id": {
			GET: async (req) => {
				try {
					const user = requireAuth(req);
					const transcriptionId = req.params.id;

					// Verify ownership
					const transcription = db
						.query<
							{
								id: string;
								user_id: number;
								status: string;
								original_filename: string;
							},
							[string]
						>(
							"SELECT id, user_id, status, original_filename FROM transcriptions WHERE id = ?",
						)
						.get(transcriptionId);

					if (!transcription || transcription.user_id !== user.id) {
						return Response.json(
							{ error: "Transcription not found" },
							{ status: 404 },
						);
					}

					if (transcription.status !== "completed") {
						return Response.json(
							{ error: "Transcription not completed yet" },
							{ status: 400 },
						);
					}

					const format = new URL(req.url).searchParams.get("format");

					// Return WebVTT format if requested
					if (format === "vtt") {
						const vttContent = await getTranscriptVTT(transcriptionId);
						if (!vttContent) {
							return Response.json(
								{ error: "VTT transcript not available" },
								{ status: 404 },
							);
						}
						return new Response(vttContent, {
							headers: {
								"Content-Type": "text/vtt",
								// original_filename is user-supplied: escape it for the header
								"Content-Disposition": attachmentDisposition(
									`${transcription.original_filename}.vtt`,
								),
							},
						});
					}

					// Default: return plain text transcript from file
					const transcript = await getTranscript(transcriptionId);
					if (!transcript) {
						return Response.json(
							{ error: "Transcript not available" },
							{ status: 404 },
						);
					}

					return new Response(transcript, {
						headers: {
							"Content-Type": "text/plain",
						},
					});
				} catch (error) {
					return handleError(error);
				}
			},
		},

		/**
		 * Uploaded audio of a completed transcription owned by the user, with
		 * single-range `Range` support for seeking.
		 */
		"/api/transcriptions/:id/audio": {
			GET: async (req) => {
				try {
					const user = requireAuth(req);
					const transcriptionId = req.params.id;

					// Verify ownership and get filename
					const transcription = db
						.query<
							{
								id: string;
								user_id: number;
								filename: string;
								status: string;
							},
							[string]
						>(
							"SELECT id, user_id, filename, status FROM transcriptions WHERE id = ?",
						)
						.get(transcriptionId);

					if (!transcription || transcription.user_id !== user.id) {
						return Response.json(
							{ error: "Transcription not found" },
							{ status: 404 },
						);
					}

					if (transcription.status !== "completed") {
						return Response.json(
							{ error: "Transcription not completed yet" },
							{ status: 400 },
						);
					}

					const file = Bun.file(`./uploads/${transcription.filename}`);
					if (!(await file.exists())) {
						return Response.json(
							{ error: "Audio file not found" },
							{ status: 404 },
						);
					}

					const fileSize = file.size;
					const contentType = file.type || "audio/mpeg";
					const rangeHeader = req.headers.get("range");

					// Handle range requests for seeking
					if (rangeHeader) {
						const range = parseByteRange(rangeHeader, fileSize);
						if (range === "unsatisfiable") {
							return new Response(null, {
								status: 416,
								headers: {
									"Content-Range": `bytes */${fileSize}`,
									"Accept-Ranges": "bytes",
								},
							});
						}
						if (range) {
							const { start, end } = range;
							return new Response(file.slice(start, end + 1), {
								status: 206,
								headers: {
									"Content-Range": `bytes ${start}-${end}/${fileSize}`,
									"Accept-Ranges": "bytes",
									"Content-Length": (end - start + 1).toString(),
									"Content-Type": contentType,
								},
							});
						}
						// Malformed or multi-range header: ignore it, send the whole file.
					}

					// No (usable) range request, send entire file
					return new Response(file, {
						headers: {
							"Content-Type": contentType,
							"Accept-Ranges": "bytes",
							"Content-Length": fileSize.toString(),
						},
					});
				} catch (error) {
					return handleError(error);
				}
			},
		},

		"/api/transcriptions": {
			/** Lists the user's transcriptions, newest first, with transcripts of completed jobs. */
			GET: async (req) => {
				try {
					const user = requireAuth(req);

					const transcriptions = db
						.query<
							{
								id: string;
								filename: string;
								original_filename: string;
								status: string;
								progress: number;
								created_at: number;
							},
							[number]
						>(
							"SELECT id, filename, original_filename, status, progress, created_at FROM transcriptions WHERE user_id = ? ORDER BY created_at DESC",
						)
						.all(user.id);

					// Load transcripts from files for completed jobs (in parallel)
					const jobs = await Promise.all(
						transcriptions.map(async (t) => ({
							id: t.id,
							filename: t.original_filename,
							status: t.status,
							progress: t.progress,
							transcript:
								t.status === "completed" ? await getTranscript(t.id) : null,
							created_at: t.created_at,
						})),
					);

					return Response.json({ jobs });
				} catch (error) {
					return handleError(error);
				}
			},

			/** Accepts an `audio` multipart upload and starts its transcription. */
			POST: async (req) => {
				try {
					const user = requireAuth(req);

					const formData = await req.formData();
					const file = formData.get("audio");
					// A plain string field is not an upload
					if (!file || typeof file === "string") {
						throw ValidationErrors.missingField("audio");
					}

					// Validate file type (MIME type or extension must look like audio)
					const fileExtension = file.name.split(".").pop()?.toLowerCase();
					const isAudioType =
						file.type.startsWith("audio/") || file.type === "video/mp4";
					const isAudioExtension =
						fileExtension !== undefined &&
						ALLOWED_AUDIO_EXTENSIONS.includes(fileExtension);

					if (!isAudioType && !isAudioExtension) {
						throw ValidationErrors.unsupportedFileType(
							"MP3, WAV, M4A, AAC, OGG, WebM, FLAC, MP4",
						);
					}

					// NOTE(review): "25MB" is hard-coded; keep in sync with MAX_FILE_SIZE.
					if (file.size > MAX_FILE_SIZE) {
						throw ValidationErrors.fileTooLarge("25MB");
					}

					// Generate unique filename. The extension is client-supplied, so only
					// reuse it when it is a plain alphanumeric token (it may otherwise
					// contain path separators).
					const transcriptionId = crypto.randomUUID();
					const storedExtension =
						fileExtension && /^[a-z0-9]+$/.test(fileExtension)
							? fileExtension
							: "audio";
					const filename = `${transcriptionId}.${storedExtension}`;

					// Save file to disk
					await Bun.write(`./uploads/${filename}`, file);

					// Create database record
					db.run(
						"INSERT INTO transcriptions (id, user_id, filename, original_filename, status) VALUES (?, ?, ?, ?, ?)",
						[transcriptionId, user.id, filename, file.name, "uploading"],
					);

					// Start transcription in background. Promise.resolve() covers a
					// synchronous return value too; .catch() prevents an unhandled
					// rejection if the call is asynchronous.
					void Promise.resolve(
						whisperService.startTranscription(transcriptionId, filename),
					).catch((error: unknown) => {
						console.error(
							`[Transcription] Failed to start ${transcriptionId}:`,
							errorMessage(error),
						);
					});

					return Response.json({
						id: transcriptionId,
						message: "Upload successful, transcription started",
					});
				} catch (error) {
					return handleError(error);
				}
			},
		},
	},
	// NOTE(review): development mode (HMR, dev error pages) is enabled
	// unconditionally; consider disabling it in production.
	development: {
		hmr: true,
		console: true,
	},
});

console.log(`馃 Thistle running at http://localhost:${server.port}`);