// wisp.place monorepo: a static site hosting service built on top of the AT Protocol.
import { Elysia } from 'elysia'
import { requireAuth, type AuthenticatedContext } from '../lib/wisp-auth'
import { NodeOAuthClient } from '@atproto/oauth-client-node'
import { Agent } from '@atproto/api'
import { TID } from '@atproto/common-web'
import {
  type UploadedFile,
  type FileUploadResult,
  processUploadedFiles,
  createManifest,
  updateFileBlobs,
  shouldCompressFile,
  compressFile,
  computeCID,
  extractBlobMap,
  extractSubfsUris,
  findLargeDirectories,
  replaceDirectoryWithSubfs,
  estimateDirectorySize
} from '../lib/wisp-utils'
import { upsertSite } from '../lib/db'
import { logger } from '../lib/observability'
import { validateRecord } from '../lexicons/types/place/wisp/fs'
import { validateRecord as validateSubfsRecord } from '../lexicons/types/place/wisp/subfs'
import { MAX_SITE_SIZE, MAX_FILE_SIZE, MAX_FILE_COUNT } from '../lib/constants'
import {
  createUploadJob,
  getUploadJob,
  updateJobProgress,
  completeUploadJob,
  failUploadJob,
  addJobListener
} from '../lib/upload-jobs'

/** Blobs already referenced by the site's current records, keyed by file path. */
type ExistingBlobMap = Map<string, { blobRef: any; cid: string }>;

/** A subfs record URI together with the directory path it is mounted at. */
type SubfsRef = { uri: string; path: string };

/** A file that was excluded from the upload, with a human-readable reason. */
type SkippedFile = { name: string; reason: string };

/** Outcome of uploading (or reusing) the blob for one file. */
type UploadedBlob = {
  result: FileUploadResult;
  filePath: string;
  sentMimeType: string;
  returnedMimeType: string;
  reused: boolean;
};

const BYTES_PER_MB = 1024 * 1024;

// Matches the first path segment including its trailing slash ("site/a/b" -> "site/").
const TOP_LEVEL_SEGMENT = /^[^\/]*\//;

// Manifest splitting thresholds.
const MAX_MANIFEST_SIZE = 140 * 1024; // 140KB to be safe (PDS limit is 150KB)
const FILE_COUNT_THRESHOLD = 250; // Start considering a split early
const MAX_SPLIT_ATTEMPTS = 100; // Allow many splits for very large sites

// Maximum concurrent blob uploads (each with its own retry logic).
const CONCURRENCY_LIMIT = 50;

/** Formats a byte limit as megabytes, without decimals when it is a whole number. */
function formatLimitMB(bytes: number): string {
  const mb = bytes / BYTES_PER_MB;
  return Number.isInteger(mb) ? String(mb) : mb.toFixed(2);
}

/**
 * Size of the JSON-serialized manifest in UTF-8 bytes.
 *
 * `String.length` counts UTF-16 code units and undercounts non-ASCII paths,
 * so the byte length is used when comparing against the size budget.
 */
function manifestByteSize(manifest: unknown): number {
  return Buffer.byteLength(JSON.stringify(manifest), 'utf8');
}

/**
 * Splits an `at://did/collection/rkey` URI into its three components.
 *
 * @returns the components, or `null` when the URI does not have that shape.
 */
function parseAtUri(uri: string): { did: string; collection: string; rkey: string } | null {
  const prefix = 'at://';
  if (!uri.startsWith(prefix)) return null;
  const [did, collection, rkey] = uri.slice(prefix.length).split('/');
  if (!did || !collection || !rkey) return null;
  return { did, collection, rkey };
}

/** True when `path` is the `.git` entry itself or lives inside a `.git` directory. */
function isGitPath(path: string): boolean {
  return path === '.git' || path.startsWith('.git/');
}

/**
 * Checks that a site name is usable as the record key of the site's
 * `place.wisp.fs` record.
 *
 * @param siteName - candidate site name from the request body
 * @returns `true` when the name is 1-512 characters long, is not `.` or `..`,
 *   and consists only of alphanumerics, dots, dashes, underscores, tildes and colons.
 */
function isValidSiteName(siteName: string): boolean {
  if (!siteName || typeof siteName !== 'string') return false;

  // Length check (AT Protocol rkey limit)
  if (siteName.length < 1 || siteName.length > 512) return false;

  // Check for path traversal
  if (siteName === '.' || siteName === '..') return false;
  if (siteName.includes('/') || siteName.includes('\\')) return false;
  if (siteName.includes('\0')) return false;

  // AT Protocol rkey format: alphanumeric, dots, dashes, underscores, tildes, colons
  const validRkeyPattern = /^[a-zA-Z0-9._~:-]+$/;
  return validRkeyPattern.test(siteName);
}

/**
 * Builds the manifest for a site with no files and validates it against the
 * `place.wisp.fs` lexicon.
 *
 * @throws Error when lexicon validation fails.
 */
function buildValidatedEmptyManifest(siteName: string) {
  const emptyManifest = {
    $type: 'place.wisp.fs',
    site: siteName,
    root: {
      type: 'directory',
      entries: []
    },
    fileCount: 0,
    createdAt: new Date().toISOString()
  };

  const validationResult = validateRecord(emptyManifest);
  if (!validationResult.success) {
    throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
  }

  return emptyManifest;
}

/**
 * Fetches the site's current `place.wisp.fs` record (and any subfs records it
 * mounts) so unchanged blobs can be reused.
 *
 * Never throws: on any failure the state collected so far is returned, which
 * makes the caller fall back to uploading everything.
 *
 * @returns the path -> blob map for reuse, and the subfs URIs referenced by the
 *   old manifest (to be deleted once the new manifest is published).
 */
async function loadExistingSiteState(
  agent: Agent,
  did: string,
  siteName: string
): Promise<{ existingBlobMap: ExistingBlobMap; oldSubfsUris: SubfsRef[] }> {
  let existingBlobMap: ExistingBlobMap = new Map();
  let oldSubfsUris: SubfsRef[] = [];

  try {
    const existingRecord = await agent.com.atproto.repo.getRecord({
      repo: did,
      collection: 'place.wisp.fs',
      rkey: siteName
    });
    console.log('Existing record found!');

    const value = existingRecord.data.value;
    if (value && typeof value === 'object' && 'root' in value) {
      const manifest = value as any;

      // Extract blob map from main record
      existingBlobMap = extractBlobMap(manifest.root);
      console.log(`Found existing manifest with ${existingBlobMap.size} files in main record`);

      // Extract subfs URIs with their mount paths from main record
      const subfsUris: SubfsRef[] = extractSubfsUris(manifest.root);
      oldSubfsUris = subfsUris; // Save for cleanup later

      if (subfsUris.length > 0) {
        console.log(`Found ${subfsUris.length} subfs records, fetching in parallel...`);
        logger.info(`Fetching ${subfsUris.length} subfs records for blob reuse`);

        // Fetch all subfs records in parallel; a failed fetch yields null
        // instead of rejecting so one bad record does not discard the rest.
        const subfsRecords = await Promise.all(
          subfsUris.map(async ({ uri, path }) => {
            try {
              const parsed = parseAtUri(uri);
              if (!parsed) {
                throw new Error('malformed AT URI');
              }

              const record = await agent.com.atproto.repo.getRecord({
                repo: parsed.did,
                collection: parsed.collection,
                rkey: parsed.rkey
              });

              return { record: record.data.value as any, mountPath: path };
            } catch (err: any) {
              logger.warn(`Failed to fetch subfs record ${uri}: ${err?.message}`, err);
              return null;
            }
          })
        );

        // Merge blob maps from all subfs records
        let totalSubfsBlobs = 0;
        for (const subfsData of subfsRecords) {
          if (subfsData && subfsData.record && 'root' in subfsData.record) {
            // Extract blobs with the correct mount path prefix
            const subfsMap = extractBlobMap(subfsData.record.root, subfsData.mountPath);
            subfsMap.forEach((blob, key) => {
              existingBlobMap.set(key, blob);
              totalSubfsBlobs++;
            });
          }
        }

        console.log(`Merged ${totalSubfsBlobs} files from ${subfsUris.length} subfs records`);
        logger.info(`Total blob map: ${existingBlobMap.size} files (main + subfs)`);
      }

      console.log(`Total existing blobs for reuse: ${existingBlobMap.size} files`);
      logger.info(`Found existing manifest with ${existingBlobMap.size} files for incremental update`);
    }
  } catch (error: any) {
    console.log('No existing record found or error:', error?.message || error);
    // A missing record is the normal first-upload case; only warn on anything else.
    if (error?.status !== 400 && error?.error !== 'RecordNotFound') {
      logger.warn('Failed to fetch existing record, proceeding with full upload', error);
    }
  }

  return { existingBlobMap, oldSubfsUris };
}

/**
 * Reads, gzips and base64-encodes every incoming file, skipping `.git`
 * contents and files larger than `MAX_FILE_SIZE`.
 *
 * Reports per-file progress on the job as it goes.
 */
async function prepareFiles(
  jobId: string,
  fileArray: File[]
): Promise<{ uploadedFiles: UploadedFile[]; skippedFiles: SkippedFile[] }> {
  const uploadedFiles: UploadedFile[] = [];
  const skippedFiles: SkippedFile[] = [];

  for (const [i, file] of fileArray.entries()) {
    console.log(`Processing file ${i + 1}/${fileArray.length}:`, file.name, file.size, 'bytes');
    updateJobProgress(jobId, {
      filesProcessed: i + 1,
      currentFile: file.name
    });

    // Skip .git directory files. Both the raw name and the name with its
    // top-level folder stripped are tested, so uploads with and without a
    // wrapping folder are handled.
    const normalizedPath = file.name.replace(TOP_LEVEL_SEGMENT, '');
    if (isGitPath(normalizedPath) || isGitPath(file.name)) {
      console.log(`Skipping .git file: ${file.name}`);
      skippedFiles.push({
        name: file.name,
        reason: '.git directory excluded'
      });
      continue;
    }

    // Skip files that are too large
    if (file.size > MAX_FILE_SIZE) {
      skippedFiles.push({
        name: file.name,
        reason: `file too large (${(file.size / BYTES_PER_MB).toFixed(2)}MB, max ${formatLimitMB(MAX_FILE_SIZE)}MB)`
      });
      continue;
    }

    const originalContent = Buffer.from(await file.arrayBuffer());
    const originalMimeType = file.type || 'application/octet-stream';

    // Compress and base64 encode ALL files
    const compressedContent = compressFile(originalContent);
    const base64Content = Buffer.from(compressedContent.toString('base64'), 'binary');
    const compressionRatio = originalContent.length > 0
      ? (compressedContent.length / originalContent.length * 100).toFixed(1)
      : 'n/a'; // empty file: a ratio is meaningless
    const summary = `Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${base64Content.length} bytes`;
    console.log(summary);
    logger.info(summary);

    uploadedFiles.push({
      name: file.name,
      content: base64Content,
      mimeType: originalMimeType,
      size: base64Content.length,
      compressed: true,
      originalMimeType
    });
  }

  return { uploadedFiles, skippedFiles };
}

/**
 * Uploads one blob, retrying with exponential backoff (100ms, 200ms, ...)
 * when the failure looks like a DPoP nonce conflict. Other errors, and a
 * conflict on the final attempt, are rethrown.
 */
async function uploadBlobWithRetry(
  agent: Agent,
  content: Buffer,
  mimeType: string,
  fileName: string,
  maxRetries = 3
) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await agent.com.atproto.repo.uploadBlob(content, { encoding: mimeType });
    } catch (error: any) {
      const message = error?.message?.toLowerCase();
      const isDPoPNonceError =
        message?.includes('nonce') ||
        message?.includes('dpop') ||
        error?.status === 409;

      if (isDPoPNonceError && attempt < maxRetries - 1) {
        const backoffMs = 100 * Math.pow(2, attempt);
        logger.info(`[File Upload] 🔄 DPoP nonce conflict for ${fileName}, retrying in ${backoffMs}ms (attempt ${attempt + 1}/${maxRetries})`);
        await new Promise(resolve => setTimeout(resolve, backoffMs));
        continue;
      }
      throw error;
    }
  }
  throw new Error(`Failed to upload ${fileName} after ${maxRetries} attempts`);
}

/**
 * Uploads (or reuses) the blob for every file, keeping at most
 * `CONCURRENCY_LIMIT` uploads in flight.
 *
 * A file is reused when the existing manifest has a blob at the same path
 * with the same CID. Any single failure rejects the whole call.
 *
 * @returns one entry per file, in input order.
 */
async function uploadBlobsWithConcurrency(
  jobId: string,
  agent: Agent,
  files: UploadedFile[],
  existingBlobMap: ExistingBlobMap
): Promise<UploadedBlob[]> {
  const processFile = async (file: UploadedFile, index: number) => {
    try {
      if (!file || !file.name) {
        throw new Error(`Undefined file at index ${index}`);
      }

      const fileCID = computeCID(file.content);
      const normalizedPath = file.name.replace(TOP_LEVEL_SEGMENT, '');
      const existingBlob = existingBlobMap.get(normalizedPath) || existingBlobMap.get(file.name);

      // Metadata recorded next to the blob so the content can be decoded on serve.
      const compressionMeta = file.compressed && {
        encoding: 'gzip' as const,
        mimeType: file.originalMimeType || file.mimeType,
        base64: true
      };

      if (existingBlob && existingBlob.cid === fileCID) {
        logger.info(`[File Upload] ♻️ Reused: ${file.name} (unchanged, CID: ${fileCID})`);
        updateJobProgress(jobId, { filesReused: (getUploadJob(jobId)?.progress.filesReused || 0) + 1 });

        return {
          result: {
            hash: existingBlob.cid,
            blobRef: existingBlob.blobRef,
            ...compressionMeta
          },
          filePath: file.name,
          sentMimeType: file.mimeType,
          returnedMimeType: existingBlob.blobRef.mimeType,
          reused: true
        };
      }

      const uploadMimeType = file.compressed || file.mimeType.startsWith('text/html')
        ? 'application/octet-stream'
        : file.mimeType;

      const compressionInfo = file.compressed ? ' (gzipped)' : '';
      const fileSizeMB = (file.size / BYTES_PER_MB).toFixed(2);
      logger.info(`[File Upload] ⬆️ Uploading: ${file.name} (${fileSizeMB}MB${compressionInfo})`);

      const uploadResult = await uploadBlobWithRetry(agent, file.content, uploadMimeType, file.name);

      const returnedBlobRef = uploadResult.data.blob;
      updateJobProgress(jobId, { filesUploaded: (getUploadJob(jobId)?.progress.filesUploaded || 0) + 1 });
      logger.info(`[File Upload] ✅ Uploaded: ${file.name} (CID: ${fileCID})`);

      return {
        result: {
          hash: returnedBlobRef.ref.toString(),
          blobRef: returnedBlobRef,
          ...compressionMeta
        },
        filePath: file.name,
        sentMimeType: file.mimeType,
        returnedMimeType: returnedBlobRef.mimeType,
        reused: false
      };
    } catch (uploadError) {
      logger.error('Upload failed for file', uploadError);
      throw uploadError;
    }
  };

  // Sliding window: start a task per file, and whenever the window is full
  // wait for any in-flight task to settle before starting the next.
  const results: any[] = [];
  const executing = new Set<Promise<void>>();
  let fileIndex = 0;

  for (const file of files) {
    const currentIndex = fileIndex++;

    const task: Promise<void> = processFile(file, currentIndex)
      .then(result => {
        results[currentIndex] = result;
      })
      .catch(error => {
        logger.error(`Failed to process file at index ${currentIndex}`, error);
        throw error; // Re-throw to fail the entire upload
      })
      .finally(() => {
        executing.delete(task);
      });

    executing.add(task);

    if (executing.size >= CONCURRENCY_LIMIT) {
      await Promise.race(executing);
    }
  }

  // Wait for remaining uploads
  await Promise.all(executing);
  return results.filter(r => r !== undefined);
}

/**
 * Creates the site manifest, moving the largest directories out into
 * `place.wisp.subfs` records until the manifest fits `MAX_MANIFEST_SIZE`.
 *
 * Each extracted directory is written to the PDS immediately and replaced in
 * the main tree by a reference to the new record.
 *
 * @throws Error when the manifest is too large and no directory is left to
 *   extract, or when it still does not fit after `MAX_SPLIT_ATTEMPTS` splits.
 */
async function buildManifestWithSplitting(
  agent: Agent,
  did: string,
  siteName: string,
  directory: ReturnType<typeof updateFileBlobs>,
  fileCount: number
): Promise<ReturnType<typeof createManifest>> {
  const subfsRecords: SubfsRef[] = [];
  let workingDirectory = directory;
  let currentFileCount = fileCount; // files left in the main tree; used for logging only

  // Create initial manifest to check size
  let manifest = createManifest(siteName, workingDirectory, fileCount);
  let manifestSize = manifestByteSize(manifest);

  if (fileCount < FILE_COUNT_THRESHOLD && manifestSize <= MAX_MANIFEST_SIZE) {
    const manifestSizeKB = (manifestSize / 1024).toFixed(1);
    console.log(`Manifest created (${fileCount} files, ${manifestSizeKB}KB JSON) - no splitting needed`);
    return manifest;
  }

  // NOTE(review): this branch is also entered for sites with many files whose
  // manifest is already under the limit; in that case the loop below does not
  // run and nothing is split. Confirm whether proactive splitting is intended.
  console.log(`⚠️ Large site detected (${fileCount} files, ${(manifestSize / 1024).toFixed(1)}KB), splitting into subfs records...`);
  logger.info(`Large site with ${fileCount} files, splitting into subfs records`);

  let attempts = 0;
  while (manifestSize > MAX_MANIFEST_SIZE && attempts < MAX_SPLIT_ATTEMPTS) {
    attempts++;

    // Find all directories sorted by size (largest first)
    const directories = findLargeDirectories(workingDirectory);
    directories.sort((a, b) => b.size - a.size);

    const largestDir = directories[0];
    if (!largestDir) {
      // No more directories to split - this should be very rare
      throw new Error(
        `Cannot split manifest further - no subdirectories available. ` +
        `Current size: ${(manifestSize / 1024).toFixed(1)}KB. ` +
        `Try organizing files into subdirectories.`
      );
    }

    console.log(` Split #${attempts}: ${largestDir.path} (${largestDir.fileCount} files, ${(largestDir.size / 1024).toFixed(1)}KB)`);

    // Create a subfs record for this directory
    const subfsRkey = TID.nextStr();
    const subfsManifest = {
      $type: 'place.wisp.subfs' as const,
      root: largestDir.directory,
      fileCount: largestDir.fileCount,
      createdAt: new Date().toISOString()
    };

    // Validate subfs record
    const subfsValidation = validateSubfsRecord(subfsManifest);
    if (!subfsValidation.success) {
      throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
    }

    // Upload subfs record to PDS
    const subfsRecord = await agent.com.atproto.repo.putRecord({
      repo: did,
      collection: 'place.wisp.subfs',
      rkey: subfsRkey,
      record: subfsManifest
    });

    const subfsUri = subfsRecord.data.uri;
    subfsRecords.push({ uri: subfsUri, path: largestDir.path });
    console.log(` ✅ Created subfs: ${subfsUri}`);
    logger.info(`Created subfs record for ${largestDir.path}: ${subfsUri}`);

    // Replace directory with subfs node in the main tree
    workingDirectory = replaceDirectoryWithSubfs(workingDirectory, largestDir.path, subfsUri);

    // Recreate manifest and check new size. The manifest keeps the site-wide
    // fileCount; only the log line reports what remains in the main tree.
    currentFileCount -= largestDir.fileCount;
    manifest = createManifest(siteName, workingDirectory, fileCount);
    manifestSize = manifestByteSize(manifest);
    const newSizeKB = (manifestSize / 1024).toFixed(1);
    console.log(` → Manifest now ${newSizeKB}KB with ${currentFileCount} files (${subfsRecords.length} subfs total)`);

    if (manifestSize <= MAX_MANIFEST_SIZE) {
      console.log(` ✅ Manifest fits! (${newSizeKB}KB < ${MAX_MANIFEST_SIZE / 1024}KB)`);
    }
  }

  if (manifestSize > MAX_MANIFEST_SIZE) {
    throw new Error(
      `Failed to fit manifest after splitting ${attempts} directories. ` +
      `Current size: ${(manifestSize / 1024).toFixed(1)}KB. ` +
      `This should never happen - please report this issue.`
    );
  }

  console.log(`✅ Split complete: ${subfsRecords.length} subfs records, ${currentFileCount} files in main, ${(manifestSize / 1024).toFixed(1)}KB manifest`);
  logger.info(`Split into ${subfsRecords.length} subfs records, ${currentFileCount} files remaining in main tree`);

  return manifest;
}

/**
 * Deletes the subfs records referenced by the previous manifest.
 *
 * Best effort and fire-and-forget: failures are logged and never propagate.
 * URIs that do not point at a `place.wisp.subfs` record in `did`'s own repo
 * are skipped rather than turned into a delete against the user's repo.
 */
function cleanupOldSubfsRecords(agent: Agent, did: string, oldSubfsUris: SubfsRef[]): void {
  if (oldSubfsUris.length === 0) return;

  console.log(`Cleaning up ${oldSubfsUris.length} old subfs records...`);
  logger.info(`Cleaning up ${oldSubfsUris.length} old subfs records`);

  // Delete old subfs records in parallel (don't wait for completion)
  void Promise.all(
    oldSubfsUris.map(async ({ uri }) => {
      try {
        const parsed = parseAtUri(uri);
        if (!parsed || parsed.did !== did || parsed.collection !== 'place.wisp.subfs') {
          console.warn(`Skipping cleanup of unexpected subfs URI: ${uri}`);
          return;
        }

        await agent.com.atproto.repo.deleteRecord({
          repo: did,
          collection: 'place.wisp.subfs',
          rkey: parsed.rkey
        });

        console.log(` 🗑️ Deleted old subfs: ${uri}`);
        logger.info(`Deleted old subfs record: ${uri}`);
      } catch (err: any) {
        // Don't fail the whole upload if cleanup fails
        console.warn(`Failed to delete old subfs ${uri}:`, err?.message);
        logger.warn(`Failed to delete old subfs ${uri}`, err);
      }
    })
  ).catch(err => {
    // Log but don't fail if cleanup fails
    logger.warn('Some subfs cleanup operations failed', err);
  });
}

/**
 * Runs a full site upload for an already-created upload job.
 *
 * Phases reported on the job: validating -> compressing -> uploading ->
 * creating_manifest -> finalizing. The function never rejects on upload
 * errors; they are reported through `failUploadJob`.
 *
 * @param jobId - id returned by `createUploadJob`
 * @param agent - authenticated AT Protocol agent for the user
 * @param did - DID of the repo that owns the site
 * @param siteName - validated site name, also used as the record key
 * @param fileArray - files received from the multipart request
 */
async function processUploadInBackground(
  jobId: string,
  agent: Agent,
  did: string,
  siteName: string,
  fileArray: File[]
): Promise<void> {
  try {
    // Try to fetch existing record to enable incremental updates
    console.log('Attempting to fetch existing record...');
    updateJobProgress(jobId, { phase: 'validating' });
    const { existingBlobMap, oldSubfsUris } = await loadExistingSiteState(agent, did, siteName);

    // Convert File objects to UploadedFile format
    console.log('Processing files, count:', fileArray.length);
    updateJobProgress(jobId, { phase: 'compressing' });
    const { uploadedFiles, skippedFiles } = await prepareFiles(jobId, fileArray);

    // Update total file count after filtering (important for progress tracking)
    updateJobProgress(jobId, {
      totalFiles: uploadedFiles.length
    });

    // Check total size limit
    const totalSize = uploadedFiles.reduce((sum, file) => sum + file.size, 0);
    if (totalSize > MAX_SITE_SIZE) {
      throw new Error(`Total upload size ${(totalSize / BYTES_PER_MB).toFixed(2)}MB exceeds ${formatLimitMB(MAX_SITE_SIZE)}MB limit`);
    }

    // Check file count limit
    if (uploadedFiles.length > MAX_FILE_COUNT) {
      throw new Error(`File count ${uploadedFiles.length} exceeds ${MAX_FILE_COUNT} files limit`);
    }

    console.log(`After filtering: ${uploadedFiles.length} files to process (${skippedFiles.length} skipped)`);

    const rkey = siteName;

    if (uploadedFiles.length === 0) {
      // Every file was skipped: publish an empty site.
      const emptyManifest = buildValidatedEmptyManifest(siteName);
      updateJobProgress(jobId, { phase: 'finalizing' });

      const record = await agent.com.atproto.repo.putRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey,
        record: emptyManifest
      });

      await upsertSite(did, rkey, siteName);

      // The new manifest references no subfs records, so the old ones are stale.
      cleanupOldSubfsRecords(agent, did, oldSubfsUris);

      completeUploadJob(jobId, {
        success: true,
        uri: record.data.uri,
        cid: record.data.cid,
        fileCount: 0,
        siteName,
        skippedFiles
      });
      return;
    }

    // Process files into directory structure
    console.log('Processing uploaded files into directory structure...');
    const validUploadedFiles = uploadedFiles.filter((f, i) => {
      if (!f || !f.name || !f.content) {
        console.error(`Filtering out invalid file at index ${i}`);
        return false;
      }
      return true;
    });

    const { directory, fileCount } = processUploadedFiles(validUploadedFiles);
    console.log('Directory structure created, file count:', fileCount);

    // Upload files as blobs with retry logic for DPoP nonce conflicts
    console.log('Starting blob upload/reuse phase...');
    updateJobProgress(jobId, { phase: 'uploading' });

    const uploadedBlobs = await uploadBlobsWithConcurrency(jobId, agent, validUploadedFiles, existingBlobMap);

    const reusedCount = uploadedBlobs.filter(b => b.reused).length;
    const uploadedCount = uploadedBlobs.length - reusedCount;
    logger.info(`[File Upload] 🎉 Upload complete → ${uploadedBlobs.length}/${validUploadedFiles.length} files (${uploadedCount} uploaded, ${reusedCount} reused)`);
    logger.info(`[File Upload] 🎉 Upload phase complete! Total: ${uploadedBlobs.length} files (${uploadedCount} uploaded, ${reusedCount} reused)`);

    const uploadResults: FileUploadResult[] = uploadedBlobs.map(blob => blob.result);
    const filePaths: string[] = uploadedBlobs.map(blob => blob.filePath);

    // Update directory with file blobs
    console.log('Updating directory with blob references...');
    updateJobProgress(jobId, { phase: 'creating_manifest' });
    const updatedDirectory = updateFileBlobs(directory, uploadResults, filePaths);

    // Build the manifest, splitting into subfs records when it is too large
    const manifest = await buildManifestWithSplitting(agent, did, siteName, updatedDirectory, fileCount);

    updateJobProgress(jobId, { phase: 'finalizing' });

    console.log('Putting record to PDS with rkey:', rkey);
    const record = await agent.com.atproto.repo.putRecord({
      repo: did,
      collection: 'place.wisp.fs',
      rkey: rkey,
      record: manifest
    });
    console.log('Record successfully created on PDS:', record.data.uri);

    // Store site in database cache
    await upsertSite(did, rkey, siteName);

    // Clean up old subfs records if we had any
    cleanupOldSubfsRecords(agent, did, oldSubfsUris);

    completeUploadJob(jobId, {
      success: true,
      uri: record.data.uri,
      cid: record.data.cid,
      fileCount,
      siteName,
      skippedFiles,
      uploadedCount: validUploadedFiles.length
    });

    console.log('=== UPLOAD FILES COMPLETE ===');
  } catch (error) {
    console.error('=== UPLOAD ERROR ===');
    console.error('Error details:', error);
    logger.error('Upload error', error);
    failUploadJob(jobId, error instanceof Error ? error.message : 'Unknown error');
  }
}

/**
 * Builds the `/wisp` route group.
 *
 * - `GET /wisp/upload-progress/:jobId` streams job progress as server-sent events.
 * - `POST /wisp/upload-files` publishes an empty site synchronously, or starts
 *   a background upload job and returns its id.
 *
 * Every route resolves `auth` through `requireAuth` first.
 *
 * @param client - OAuth client passed to `requireAuth`
 * @param cookieSecret - secret used to sign the `did` cookie
 */
export const wispRoutes = (client: NodeOAuthClient, cookieSecret: string) =>
  new Elysia({
    prefix: '/wisp',
    cookie: {
      secrets: cookieSecret,
      sign: ['did']
    }
  })
    .derive(async ({ cookie }) => {
      const auth = await requireAuth(client, cookie)
      return { auth }
    })
    .get(
      '/upload-progress/:jobId',
      async ({ params: { jobId }, auth, set }) => {
        const job = getUploadJob(jobId);

        if (!job) {
          set.status = 404;
          return { error: 'Job not found' };
        }

        // Verify job belongs to authenticated user
        if (job.did !== auth.did) {
          set.status = 403;
          return { error: 'Unauthorized' };
        }

        // Set up SSE headers
        set.headers = {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Connection': 'keep-alive'
        };

        // Resources owned by this stream. They live outside the underlying
        // source so that both start() and cancel() can release them.
        let keepaliveInterval: ReturnType<typeof setInterval> | undefined;
        let detachListener: (() => void) | undefined;

        const stopKeepalive = () => {
          if (keepaliveInterval !== undefined) {
            clearInterval(keepaliveInterval);
            keepaliveInterval = undefined;
          }
        };

        const detach = () => {
          const fn = detachListener;
          detachListener = undefined;
          fn?.();
        };

        const stream = new ReadableStream({
          start(controller) {
            const encoder = new TextEncoder();

            const sendEvent = (event: string, data: any) => {
              try {
                const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
                controller.enqueue(encoder.encode(message));
              } catch {
                // Controller closed, ignore
              }
            };

            // Send keepalive comment every 15 seconds to prevent timeout
            keepaliveInterval = setInterval(() => {
              try {
                controller.enqueue(encoder.encode(': keepalive\n\n'));
              } catch {
                // Controller closed, stop sending keepalives
                stopKeepalive();
              }
            }, 15000);

            // Send current job state immediately
            sendEvent('progress', {
              status: job.status,
              progress: job.progress,
              result: job.result,
              error: job.error
            });

            // If job is already completed or failed, close the stream
            if (job.status === 'completed' || job.status === 'failed') {
              stopKeepalive();
              controller.close();
              return;
            }

            // Listen for updates
            detachListener = addJobListener(jobId, (event, data) => {
              sendEvent(event, data);

              // Close stream after done or error event
              if (event === 'done' || event === 'error') {
                stopKeepalive();
                // Deferred so the final event is flushed first, and so the
                // listener is not removed while it is still being invoked.
                setTimeout(() => {
                  detach();
                  try {
                    controller.close();
                  } catch {
                    // Already closed
                  }
                }, 100);
              }
            });
          },

          // Called when the consumer goes away (client disconnect). A value
          // returned from start() is NOT treated as a cleanup callback.
          cancel() {
            stopKeepalive();
            detach();
          }
        });

        return new Response(stream);
      }
    )
    .post(
      '/upload-files',
      async ({ body, auth }) => {
        const { siteName, files } = body as {
          siteName: string;
          files: File | File[]
        };

        console.log('=== UPLOAD FILES START ===');
        console.log('Site name:', siteName);
        console.log('Files received:', Array.isArray(files) ? files.length : 'single file');

        try {
          if (!siteName) {
            throw new Error('Site name is required')
          }

          if (!isValidSiteName(siteName)) {
            throw new Error('Invalid site name: must be 1-512 characters and contain only alphanumeric, dots, dashes, underscores, tildes, and colons')
          }

          // Check if files were provided
          const hasFiles = files && (Array.isArray(files) ? files.length > 0 : !!files);

          // Create agent with OAuth session
          const agent = new Agent((url, init) => auth.session.fetchHandler(url, init))

          if (!hasFiles) {
            // Handle empty upload synchronously (fast operation)
            const emptyManifest = buildValidatedEmptyManifest(siteName);
            const rkey = siteName;

            const record = await agent.com.atproto.repo.putRecord({
              repo: auth.did,
              collection: 'place.wisp.fs',
              rkey: rkey,
              record: emptyManifest
            });

            await upsertSite(auth.did, rkey, siteName);

            return {
              success: true,
              uri: record.data.uri,
              cid: record.data.cid,
              fileCount: 0,
              siteName
            };
          }

          // For file uploads, create a job and process in background
          const fileArray = Array.isArray(files) ? files : [files];
          const jobId = createUploadJob(auth.did, siteName, fileArray.length);

          console.log('Agent created for DID:', auth.did);
          console.log('Created upload job:', jobId);

          // Start background processing (don't await)
          processUploadInBackground(jobId, agent, auth.did, siteName, fileArray).catch(err => {
            console.error('Background upload process failed:', err);
            logger.error('Background upload process failed', err);
          });

          // Return immediately with job ID
          return {
            success: true,
            jobId,
            message: 'Upload started. Connect to /wisp/upload-progress/' + jobId + ' for progress updates.'
          };
        } catch (error) {
          console.error('=== UPLOAD ERROR ===');
          console.error('Error details:', error);
          logger.error('Upload error', error);
          throw new Error(`Failed to upload files: ${error instanceof Error ? error.message : 'Unknown error'}`);
        }
      }
    )