Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
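The server routes below expose two endpoints: `POST /wisp/upload-files`, which starts a background upload job and returns a `jobId`, and `GET /wisp/upload-progress/:jobId`, which streams job progress as server-sent events (`progress`, `done`, `error`). The following is a minimal client-side sketch of that flow; the `https://wisp.place` host, the multipart field layout, and an already-established signed `did` session cookie are assumptions, not something this file guarantees.

```ts
// Hypothetical client sketch: start an upload, then follow its progress via SSE.
// Assumes an authenticated browser session (signed `did` cookie) and that the
// service is reachable at https://wisp.place — adjust for your deployment.
async function uploadSite(siteName: string, files: File[]) {
  const form = new FormData()
  form.append('siteName', siteName)
  for (const file of files) {
    // Preserving the relative path lets the server rebuild the directory tree
    form.append('files', file, (file as any).webkitRelativePath || file.name)
  }

  const res = await fetch('https://wisp.place/wisp/upload-files', {
    method: 'POST',
    body: form,
    credentials: 'include' // send the session cookie
  })
  const { jobId } = await res.json()

  // Stream progress events emitted by /wisp/upload-progress/:jobId
  const events = new EventSource(`https://wisp.place/wisp/upload-progress/${jobId}`, { withCredentials: true })
  events.addEventListener('progress', (e) => console.log('progress', JSON.parse((e as MessageEvent).data)))
  events.addEventListener('done', (e) => { console.log('done', JSON.parse((e as MessageEvent).data)); events.close() })
  events.addEventListener('error', () => events.close())
}
```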
import { Elysia } from 'elysia'
import { requireAuth, type AuthenticatedContext } from '../lib/wisp-auth'
import { NodeOAuthClient } from '@atproto/oauth-client-node'
import { Agent } from '@atproto/api'
import { TID } from '@atproto/common-web'
import {
  type UploadedFile,
  type FileUploadResult,
  processUploadedFiles,
  createManifest,
  updateFileBlobs,
  shouldCompressFile,
  compressFile,
  computeCID,
  extractBlobMap,
  extractSubfsUris,
  findLargeDirectories,
  replaceDirectoryWithSubfs,
  estimateDirectorySize
} from '../lib/wisp-utils'
import { upsertSite } from '../lib/db'
import { logger } from '../lib/observability'
import { validateRecord } from '../lexicons/types/place/wisp/fs'
import { validateRecord as validateSubfsRecord } from '../lexicons/types/place/wisp/subfs'
import { MAX_SITE_SIZE, MAX_FILE_SIZE, MAX_FILE_COUNT } from '../lib/constants'
import {
  createUploadJob,
  getUploadJob,
  updateJobProgress,
  completeUploadJob,
  failUploadJob,
  addJobListener
} from '../lib/upload-jobs'

function isValidSiteName(siteName: string): boolean {
  if (!siteName || typeof siteName !== 'string') return false;

  // Length check (AT Protocol rkey limit)
  if (siteName.length < 1 || siteName.length > 512) return false;

  // Check for path traversal
  if (siteName === '.' || siteName === '..') return false;
  if (siteName.includes('/') || siteName.includes('\\')) return false;
  if (siteName.includes('\0')) return false;

  // AT Protocol rkey format: alphanumeric, dots, dashes, underscores, tildes, colons
  // Based on NSID format rules
  const validRkeyPattern = /^[a-zA-Z0-9._~:-]+$/;
  if (!validRkeyPattern.test(siteName)) return false;

  return true;
}
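// Illustrative examples (not in the original source): how isValidSiteName treats
// typical values. These are hypothetical site names, not an exhaustive spec of
// AT Protocol rkey rules.
//   isValidSiteName('my-site')    // true  - matches /^[a-zA-Z0-9._~:-]+$/
//   isValidSiteName('blog.v2')    // true
//   isValidSiteName('..')         // false - path traversal guard
//   isValidSiteName('a/b')        // false - slashes rejected
//   isValidSiteName('has space')  // false - fails the rkey pattern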
async function processUploadInBackground(
  jobId: string,
  agent: Agent,
  did: string,
  siteName: string,
  fileArray: File[]
): Promise<void> {
  try {
    // Try to fetch existing record to enable incremental updates
    let existingBlobMap = new Map<string, { blobRef: any; cid: string }>();
    let oldSubfsUris: Array<{ uri: string; path: string }> = [];
    console.log('Attempting to fetch existing record...');
    updateJobProgress(jobId, { phase: 'validating' });

    try {
      const rkey = siteName;
      const existingRecord = await agent.com.atproto.repo.getRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey
      });
      console.log('Existing record found!');

      if (existingRecord.data.value && typeof existingRecord.data.value === 'object' && 'root' in existingRecord.data.value) {
        const manifest = existingRecord.data.value as any;

        // Extract blob map from main record
        existingBlobMap = extractBlobMap(manifest.root);
        console.log(`Found existing manifest with ${existingBlobMap.size} files in main record`);

        // Extract subfs URIs with their mount paths from main record
        const subfsUris = extractSubfsUris(manifest.root);
        oldSubfsUris = subfsUris; // Save for cleanup later

        if (subfsUris.length > 0) {
          console.log(`Found ${subfsUris.length} subfs records, fetching in parallel...`);
          logger.info(`Fetching ${subfsUris.length} subfs records for blob reuse`);

          // Fetch all subfs records in parallel
          const subfsRecords = await Promise.all(
            subfsUris.map(async ({ uri, path }) => {
              try {
                // Parse URI: at://did/collection/rkey
                const parts = uri.replace('at://', '').split('/');
                const subDid = parts[0];
                const collection = parts[1];
                const subRkey = parts[2];

                const record = await agent.com.atproto.repo.getRecord({
                  repo: subDid,
                  collection: collection,
                  rkey: subRkey
                });

                return { record: record.data.value as any, mountPath: path };
              } catch (err: any) {
                logger.warn(`Failed to fetch subfs record ${uri}: ${err?.message}`, err);
                return null;
              }
            })
          );

          // Merge blob maps from all subfs records
          let totalSubfsBlobs = 0;
          for (const subfsData of subfsRecords) {
            if (subfsData && subfsData.record && 'root' in subfsData.record) {
              // Extract blobs with the correct mount path prefix
              const subfsMap = extractBlobMap(subfsData.record.root, subfsData.mountPath);
              subfsMap.forEach((value, key) => {
                existingBlobMap.set(key, value);
                totalSubfsBlobs++;
              });
            }
          }

          console.log(`Merged ${totalSubfsBlobs} files from ${subfsUris.length} subfs records`);
          logger.info(`Total blob map: ${existingBlobMap.size} files (main + subfs)`);
        }

        console.log(`Total existing blobs for reuse: ${existingBlobMap.size} files`);
        logger.info(`Found existing manifest with ${existingBlobMap.size} files for incremental update`);
      }
    } catch (error: any) {
      console.log('No existing record found or error:', error?.message || error);
      if (error?.status !== 400 && error?.error !== 'RecordNotFound') {
        logger.warn('Failed to fetch existing record, proceeding with full upload', error);
      }
    }

    // Convert File objects to UploadedFile format
    const uploadedFiles: UploadedFile[] = [];
    const skippedFiles: Array<{ name: string; reason: string }> = [];

    console.log('Processing files, count:', fileArray.length);
    updateJobProgress(jobId, { phase: 'compressing' });

    for (let i = 0; i < fileArray.length; i++) {
      const file = fileArray[i];

      // Skip undefined/null files
      if (!file || !file.name) {
        console.log(`Skipping undefined file at index ${i}`);
        skippedFiles.push({
          name: `[undefined file at index ${i}]`,
          reason: 'Invalid file object'
        });
        continue;
      }

      console.log(`Processing file ${i + 1}/${fileArray.length}:`, file.name, file.size, 'bytes');
      updateJobProgress(jobId, {
        filesProcessed: i + 1,
        currentFile: file.name
      });

      // Skip unwanted files and directories
      const normalizedPath = file.name.replace(/^[^\/]*\//, '');
      const fileName = normalizedPath.split('/').pop() || '';
      const pathParts = normalizedPath.split('/');

      // .git directory (version control - thousands of files)
      if (normalizedPath.startsWith('.git/') || normalizedPath === '.git') {
        console.log(`Skipping .git file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: '.git directory excluded'
        });
        continue;
      }

      // .DS_Store (macOS metadata - can leak info)
      if (fileName === '.DS_Store') {
        console.log(`Skipping .DS_Store file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: '.DS_Store file excluded'
        });
        continue;
      }

      // .env files (environment variables with secrets)
      if (fileName.startsWith('.env')) {
        console.log(`Skipping .env file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'environment files excluded for security'
        });
        continue;
      }

      // node_modules (dependency folder - can be 100,000+ files)
      if (pathParts.includes('node_modules')) {
        console.log(`Skipping node_modules file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'node_modules excluded'
        });
        continue;
      }
      // OS metadata files
      if (fileName === 'Thumbs.db' || fileName === 'desktop.ini' || fileName.startsWith('._')) {
        console.log(`Skipping OS metadata file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'OS metadata file excluded'
        });
        continue;
      }

      // macOS system directories
      if (pathParts.includes('.Spotlight-V100') || pathParts.includes('.Trashes') || pathParts.includes('.fseventsd')) {
        console.log(`Skipping macOS system file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'macOS system directory excluded'
        });
        continue;
      }

      // Cache and temp directories
      if (pathParts.some(part => part === '.cache' || part === '.temp' || part === '.tmp')) {
        console.log(`Skipping cache/temp file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'cache/temp directory excluded'
        });
        continue;
      }

      // Python cache
      if (pathParts.includes('__pycache__') || fileName.endsWith('.pyc')) {
        console.log(`Skipping Python cache file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'Python cache excluded'
        });
        continue;
      }

      // Python virtual environments
      if (pathParts.some(part => part === '.venv' || part === 'venv' || part === 'env')) {
        console.log(`Skipping Python venv file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'Python virtual environment excluded'
        });
        continue;
      }

      // Editor swap files
      if (fileName.endsWith('.swp') || fileName.endsWith('.swo') || fileName.endsWith('~')) {
        console.log(`Skipping editor swap file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'editor swap file excluded'
        });
        continue;
      }

      // Skip files that are too large
      const maxSize = MAX_FILE_SIZE;
      if (file.size > maxSize) {
        skippedFiles.push({
          name: file.name,
          reason: `file too large (${(file.size / 1024 / 1024).toFixed(2)}MB, max ${(maxSize / 1024 / 1024).toFixed(0)}MB)`
        });
        continue;
      }

      const arrayBuffer = await file.arrayBuffer();
      const originalContent = Buffer.from(arrayBuffer);
      const originalMimeType = file.type || 'application/octet-stream';

      // Determine if file should be compressed (pass filename to exclude _redirects)
      const shouldCompress = shouldCompressFile(originalMimeType, normalizedPath);

      // Text files (HTML/CSS/JS) need base64 encoding to prevent PDS content sniffing
      // Audio files just need compression without base64
      const needsBase64 =
        originalMimeType.startsWith('text/') ||
        originalMimeType.startsWith('application/json') ||
        originalMimeType.startsWith('application/xml') ||
        originalMimeType === 'image/svg+xml';

      let finalContent: Buffer;
      let compressed = false;
      let base64Encoded = false;

      if (shouldCompress) {
        const compressedContent = compressFile(originalContent);
        compressed = true;

        if (needsBase64) {
          // Text files: compress AND base64 encode
          finalContent = Buffer.from(compressedContent.toString('base64'), 'binary');
          base64Encoded = true;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
          logger.info(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
        } else {
          // Audio files: just compress, no base64
          finalContent = compressedContent;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
          logger.info(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
        }
      } else {
        // Binary files: upload directly
        finalContent = originalContent;
        console.log(`Uploading ${file.name} directly: ${originalContent.length} bytes (no compression)`);
        logger.info(`Uploading ${file.name} directly: ${originalContent.length} bytes (binary)`);
      }

      uploadedFiles.push({
        name: file.name,
        content: finalContent,
        mimeType: originalMimeType,
        size: finalContent.length,
        compressed,
        base64Encoded,
        originalMimeType
      });
    }

    // Update total file count after filtering (important for progress tracking)
    updateJobProgress(jobId, {
      totalFiles: uploadedFiles.length
    });

    // Check total size limit
    const totalSize = uploadedFiles.reduce((sum, file) => sum + file.size, 0);
    const maxTotalSize = MAX_SITE_SIZE;

    if (totalSize > maxTotalSize) {
      throw new Error(`Total upload size ${(totalSize / 1024 / 1024).toFixed(2)}MB exceeds ${(maxTotalSize / 1024 / 1024).toFixed(0)}MB limit`);
    }

    // Check file count limit
    if (uploadedFiles.length > MAX_FILE_COUNT) {
      throw new Error(`File count ${uploadedFiles.length} exceeds ${MAX_FILE_COUNT} files limit`);
    }

    console.log(`After filtering: ${uploadedFiles.length} files to process (${skippedFiles.length} skipped)`);

    if (uploadedFiles.length === 0) {
      // Create empty manifest
      const emptyManifest = {
        $type: 'place.wisp.fs',
        site: siteName,
        root: {
          type: 'directory',
          entries: []
        },
        fileCount: 0,
        createdAt: new Date().toISOString()
      };

      const validationResult = validateRecord(emptyManifest);
      if (!validationResult.success) {
        throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
      }

      const rkey = siteName;
      updateJobProgress(jobId, { phase: 'finalizing' });

      const record = await agent.com.atproto.repo.putRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey,
        record: emptyManifest
      });

      await upsertSite(did, rkey, siteName);

      completeUploadJob(jobId, {
        success: true,
        uri: record.data.uri,
        cid: record.data.cid,
        fileCount: 0,
        siteName,
        skippedFiles
      });
      return;
    }

    // Process files into directory structure
    console.log('Processing uploaded files into directory structure...');
    const validUploadedFiles = uploadedFiles.filter((f, i) => {
      if (!f || !f.name || !f.content) {
        console.error(`Filtering out invalid file at index ${i}`);
        return false;
      }
      return true;
    });

    const { directory, fileCount } = processUploadedFiles(validUploadedFiles);
    console.log('Directory structure created, file count:', fileCount);

    // Upload files as blobs with retry logic for DPoP nonce conflicts
    console.log('Starting blob upload/reuse phase...');
    updateJobProgress(jobId, { phase: 'uploading' });

    // Helper function to upload blob with exponential backoff retry and timeout
    const uploadBlobWithRetry = async (
      agent: Agent,
      content: Buffer,
      mimeType: string,
      fileName: string,
      maxRetries = 5
    ) => {
      for (let attempt = 0; attempt < maxRetries; attempt++) {
        const controller = new AbortController();
        const timeoutMs = 300000; // 5 minute timeout per upload
        const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

        try {
          console.log(`[File Upload] Starting upload attempt ${attempt + 1}/${maxRetries} for ${fileName} (${content.length} bytes, ${mimeType})`);

          const result = await agent.com.atproto.repo.uploadBlob(content, { encoding: mimeType });
          clearTimeout(timeoutId);
          console.log(`[File Upload] ✅ Successfully uploaded ${fileName} on attempt ${attempt + 1}`);
          return result;
        } catch (error: any) {
          clearTimeout(timeoutId);

          const isDPoPNonceError =
            error?.message?.toLowerCase().includes('nonce') ||
            error?.message?.toLowerCase().includes('dpop') ||
            error?.status === 409;

          const isTimeout = error?.name === 'AbortError' || error?.message === 'Upload timeout';
          const isRateLimited = error?.status === 429 || error?.message?.toLowerCase().includes('rate');
          const isRequestEntityTooLarge = error?.status === 419 || error?.status === 413;

          // Special handling for 419/413 Request Entity Too Large errors
          if (isRequestEntityTooLarge) {
            const customError = new Error('Your PDS is not allowing uploads large enough to store your site. Please contact your PDS host. This could also possibly be a result of it being behind Cloudflare free tier.');
            (customError as any).status = 419;
            throw customError;
          }

          // Retry on DPoP nonce conflicts, timeouts, or rate limits
          if ((isDPoPNonceError || isTimeout || isRateLimited) && attempt < maxRetries - 1) {
            let backoffMs: number;
            if (isRateLimited) {
              backoffMs = 2000 * Math.pow(2, attempt); // 2s, 4s, 8s, 16s for rate limits
            } else if (isTimeout) {
              backoffMs = 1000 * Math.pow(2, attempt); // 1s, 2s, 4s, 8s for timeouts
            } else {
              backoffMs = 100 * Math.pow(2, attempt); // 100ms, 200ms, 400ms for DPoP
            }

            const reason = isDPoPNonceError ? 'DPoP nonce conflict' : isTimeout ? 'timeout' : 'rate limit';
            logger.info(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms (attempt ${attempt + 1}/${maxRetries})`);
            console.log(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms`);
            await new Promise(resolve => setTimeout(resolve, backoffMs));
            continue;
          }

          // Log detailed error information before throwing
          logger.error(`[File Upload] ❌ Upload failed for ${fileName} (size: ${content.length} bytes, mimeType: ${mimeType}, attempt: ${attempt + 1}/${maxRetries})`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            headers: error?.headers,
            success: error?.success
          });
          console.error(`[File Upload] ❌ Upload failed for ${fileName}:`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            size: content.length,
            mimeType,
            attempt: attempt + 1
          });
          throw error;
        }
      }
      throw new Error(`Failed to upload ${fileName} after ${maxRetries} attempts`);
    };

    // Use sliding window concurrency for maximum throughput
    const CONCURRENCY_LIMIT = 20; // Maximum concurrent uploads
    const uploadedBlobs: Array<{
      result: FileUploadResult;
      filePath: string;
      sentMimeType: string;
      returnedMimeType: string;
      reused: boolean;
    }> = [];
    const failedFiles: Array<{
      name: string;
      index: number;
      error: string;
      size: number;
    }> = [];

    // Track completed files count for accurate progress
    let completedFilesCount = 0;

    // Process file with sliding window concurrency
    const processFile = async (file: UploadedFile, index: number) => {
      try {
        if (!file || !file.name) {
          throw new Error(`Undefined file at index ${index}`);
        }

        const fileCID = computeCID(file.content);
        const normalizedPath = file.name.replace(/^[^\/]*\//, '');
        const existingBlob = existingBlobMap.get(normalizedPath) || existingBlobMap.get(file.name);

        if (existingBlob && existingBlob.cid === fileCID) {
          logger.info(`[File Upload] ♻️ Reused: ${file.name} (unchanged, CID: ${fileCID})`);
          const reusedCount = (getUploadJob(jobId)?.progress.filesReused || 0) + 1;
          completedFilesCount++;
          updateJobProgress(jobId, {
            filesReused: reusedCount,
            currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (reused)`
          });

          return {
            result: {
              hash: existingBlob.cid,
              blobRef: existingBlob.blobRef,
              ...(file.compressed && {
                encoding: 'gzip' as const,
                mimeType: file.originalMimeType || file.mimeType,
                base64: file.base64Encoded || false
              })
            },
            filePath: file.name,
            sentMimeType: file.mimeType,
            returnedMimeType: existingBlob.blobRef.mimeType,
            reused: true
          };
        }

        const uploadMimeType = file.compressed || file.mimeType.startsWith('text/html')
          ? 'application/octet-stream'
          : file.mimeType;

        const compressionInfo = file.compressed ? ' (gzipped)' : '';
        const fileSizeMB = (file.size / 1024 / 1024).toFixed(2);
        logger.info(`[File Upload] ⬆️ Uploading: ${file.name} (${fileSizeMB}MB${compressionInfo})`);

        const uploadResult = await uploadBlobWithRetry(
          agent,
          file.content,
          uploadMimeType,
          file.name
        );

        const returnedBlobRef = uploadResult.data.blob;
        const uploadedCount = (getUploadJob(jobId)?.progress.filesUploaded || 0) + 1;
        completedFilesCount++;
        updateJobProgress(jobId, {
          filesUploaded: uploadedCount,
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (uploaded)`
        });
        logger.info(`[File Upload] ✅ Uploaded: ${file.name} (CID: ${fileCID})`);

        return {
          result: {
            hash: returnedBlobRef.ref.toString(),
            blobRef: returnedBlobRef,
            ...(file.compressed && {
              encoding: 'gzip' as const,
              mimeType: file.originalMimeType || file.mimeType,
              base64: file.base64Encoded || false
            })
          },
          filePath: file.name,
          sentMimeType: file.mimeType,
          returnedMimeType: returnedBlobRef.mimeType,
          reused: false
        };
      } catch (uploadError) {
        const fileName = file?.name || 'unknown';
        const fileSize = file?.size || 0;
        const errorMessage = uploadError instanceof Error ? uploadError.message : 'Unknown error';
        const errorDetails = {
          fileName,
          fileSize,
          index,
          error: errorMessage,
          stack: uploadError instanceof Error ? uploadError.stack : undefined
        };
        logger.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);
        console.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);

        completedFilesCount++;
        updateJobProgress(jobId, {
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${fileName} (failed)`
        });

        // Track failed file but don't throw - continue with other files
        failedFiles.push({
          name: fileName,
          index,
          error: errorMessage,
          size: fileSize
        });

        return null; // Return null to indicate failure
      }
    };

    // Sliding window concurrency control
    const processWithConcurrency = async () => {
      const results: any[] = [];
      let fileIndex = 0;
      const executing = new Map<Promise<void>, { index: number; name: string }>();

      for (const file of validUploadedFiles) {
        const currentIndex = fileIndex++;

        const promise = processFile(file, currentIndex)
          .then(result => {
            results[currentIndex] = result;
          })
          .catch(error => {
            // This shouldn't happen since processFile catches errors, but just in case
            logger.error(`Unexpected error processing file at index ${currentIndex}`, error);
            results[currentIndex] = null;
          })
          .finally(() => {
            executing.delete(promise);
          });

        executing.set(promise, { index: currentIndex, name: file.name });

        if (executing.size >= CONCURRENCY_LIMIT) {
          await Promise.race(executing.keys());
        }
      }

      // Wait for remaining uploads
      await Promise.all(executing.keys());
      console.log(`\n✅ Upload complete: ${completedFilesCount}/${validUploadedFiles.length} files processed\n`);
      return results.filter(r => r !== undefined && r !== null); // Filter out null (failed) and undefined entries
    };

    const allResults = await processWithConcurrency();
    uploadedBlobs.push(...allResults);

    const currentReused = uploadedBlobs.filter(b => b.reused).length;
    const currentUploaded = uploadedBlobs.filter(b => !b.reused).length;
    const successfulCount = uploadedBlobs.length;
    const failedCount = failedFiles.length;

    logger.info(`[File Upload] 🎉 Upload complete → ${successfulCount}/${validUploadedFiles.length} files succeeded (${currentUploaded} uploaded, ${currentReused} reused), ${failedCount} failed`);

    if (failedCount > 0) {
      logger.warn(`[File Upload] ⚠️ Failed files:`, failedFiles);
      console.warn(`[File Upload] ⚠️ ${failedCount} files failed to upload:`, failedFiles.map(f => f.name).join(', '));
    }

    const reusedCount = uploadedBlobs.filter(b => b.reused).length;
    const uploadedCount = uploadedBlobs.filter(b => !b.reused).length;
    logger.info(`[File Upload] 🎉 Upload phase complete! Total: ${successfulCount} files (${uploadedCount} uploaded, ${reusedCount} reused)`);

    const uploadResults: FileUploadResult[] = uploadedBlobs.map(blob => blob.result);
    const filePaths: string[] = uploadedBlobs.map(blob => blob.filePath);

    // Update directory with file blobs (only for successfully uploaded files)
    console.log('Updating directory with blob references...');
    updateJobProgress(jobId, { phase: 'creating_manifest' });

    // Create a set of successfully uploaded paths for quick lookup
    const successfulPaths = new Set(filePaths.map(path => path.replace(/^[^\/]*\//, '')));

    const updatedDirectory = updateFileBlobs(directory, uploadResults, filePaths, '', successfulPaths);

    // Calculate actual file count (only successfully uploaded files)
    const actualFileCount = uploadedBlobs.length;

    // Check if we need to split into subfs records
    // Split proactively if we have lots of files to avoid hitting manifest size limits
    const MAX_MANIFEST_SIZE = 140 * 1024; // 140KB to be safe (PDS limit is 150KB)
    const FILE_COUNT_THRESHOLD = 250; // Start splitting at this many files
    const TARGET_FILE_COUNT = 200; // Try to keep main manifest under this many files
    const subfsRecords: Array<{ uri: string; path: string }> = [];
    let workingDirectory = updatedDirectory;
    let currentFileCount = actualFileCount;

    // Create initial manifest to check size
    let manifest = createManifest(siteName, workingDirectory, actualFileCount);
    let manifestSize = JSON.stringify(manifest).length;

    // Split if we have lots of files OR if manifest is already too large
    if (actualFileCount >= FILE_COUNT_THRESHOLD || manifestSize > MAX_MANIFEST_SIZE) {
      console.log(`⚠️ Large site detected (${actualFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB), splitting into subfs records...`);
      logger.info(`Large site with ${actualFileCount} files, splitting into subfs records`);

      // Keep splitting until manifest fits under limits (both size and file count)
      let attempts = 0;
      const MAX_ATTEMPTS = 100; // Allow many splits for very large sites

      while ((manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) && attempts < MAX_ATTEMPTS) {
        attempts++;

        // Find all directories sorted by size (largest first)
        const directories = findLargeDirectories(workingDirectory);
        directories.sort((a, b) => b.size - a.size);

        // Check if we can split subdirectories or need to split flat files
        if (directories.length > 0) {
          // Split the largest subdirectory
          const largestDir = directories[0];
          console.log(`  Split #${attempts}: ${largestDir.path} (${largestDir.fileCount} files, ${(largestDir.size / 1024).toFixed(1)}KB)`);

          // Create a subfs record for this directory
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: largestDir.directory,
            fileCount: largestDir.fileCount,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          const subfsValidation = validateSubfsRecord(subfsManifest);
          if (!subfsValidation.success) {
            throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          subfsRecords.push({ uri: subfsUri, path: largestDir.path });
          console.log(`  ✅ Created subfs: ${subfsUri}`);
          logger.info(`Created subfs record for ${largestDir.path}: ${subfsUri}`);

          // Replace directory with subfs node in the main tree
          workingDirectory = replaceDirectoryWithSubfs(workingDirectory, largestDir.path, subfsUri);
          currentFileCount -= largestDir.fileCount;
        } else {
          // No subdirectories - split flat files at root level
          const rootFiles = workingDirectory.entries.filter(e => 'type' in e.node && e.node.type === 'file');

          if (rootFiles.length === 0) {
            throw new Error(
              `Cannot split manifest further - no files or directories available. ` +
              `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB.`
            );
          }

          // Take a chunk of files (aim for ~100 files per chunk)
          const CHUNK_SIZE = 100;
          const chunkFiles = rootFiles.slice(0, Math.min(CHUNK_SIZE, rootFiles.length));
          console.log(`  Split #${attempts}: flat root (${chunkFiles.length} files)`);

          // Create a directory with just these files
          const chunkDirectory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: chunkFiles
          };

          // Create subfs record for this chunk
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: chunkDirectory,
            fileCount: chunkFiles.length,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          const subfsValidation = validateSubfsRecord(subfsManifest);
          if (!subfsValidation.success) {
            throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          console.log(`  ✅ Created flat subfs: ${subfsUri}`);
          logger.info(`Created flat subfs record with ${chunkFiles.length} files: ${subfsUri}`);

          // Remove these files from the working directory and add a subfs entry
          const remainingEntries = workingDirectory.entries.filter(
            e => !chunkFiles.some(cf => cf.name === e.name)
          );

          // Add subfs entry (will be merged flat when expanded)
          remainingEntries.push({
            name: `__subfs_${attempts}`, // Placeholder name, will be merged away
            node: {
              $type: 'place.wisp.fs#subfs' as const,
              type: 'subfs' as const,
              subject: subfsUri,
              flat: true // Merge entries directly into parent (default, but explicit for clarity)
            }
          });

          workingDirectory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: remainingEntries
          };

          subfsRecords.push({ uri: subfsUri, path: `__subfs_${attempts}` });
          currentFileCount -= chunkFiles.length;
        }

        // Recreate manifest and check new size
        manifest = createManifest(siteName, workingDirectory, currentFileCount);
        manifestSize = JSON.stringify(manifest).length;
        const newSizeKB = (manifestSize / 1024).toFixed(1);
        console.log(`  → Manifest now ${newSizeKB}KB with ${currentFileCount} files (${subfsRecords.length} subfs total)`);

        // Check if we're under both limits now
        if (manifestSize <= MAX_MANIFEST_SIZE && currentFileCount <= TARGET_FILE_COUNT) {
          console.log(`  ✅ Manifest fits! (${currentFileCount} files, ${newSizeKB}KB)`);
          break;
        }
      }

      if (manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) {
        throw new Error(
          `Failed to fit manifest after splitting ${attempts} directories. ` +
          `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB. ` +
          `This should never happen - please report this issue.`
        );
      }

      console.log(`✅ Split complete: ${subfsRecords.length} subfs records, ${currentFileCount} files in main, ${(manifestSize / 1024).toFixed(1)}KB manifest`);
      logger.info(`Split into ${subfsRecords.length} subfs records, ${currentFileCount} files remaining in main tree`);
    } else {
      const manifestSizeKB = (manifestSize / 1024).toFixed(1);
      console.log(`Manifest created (${fileCount} files, ${manifestSizeKB}KB JSON) - no splitting needed`);
    }

    const rkey = siteName;
    updateJobProgress(jobId, { phase: 'finalizing' });

    console.log('Putting record to PDS with rkey:', rkey);
    const record = await agent.com.atproto.repo.putRecord({
      repo: did,
      collection: 'place.wisp.fs',
      rkey: rkey,
      record: manifest
    });
    console.log('Record successfully created on PDS:', record.data.uri);

    // Store site in database cache
    await upsertSite(did, rkey, siteName);

    // Clean up old subfs records if we had any
    if (oldSubfsUris.length > 0) {
      console.log(`Cleaning up ${oldSubfsUris.length} old subfs records...`);
      logger.info(`Cleaning up ${oldSubfsUris.length} old subfs records`);

      // Delete old subfs records in parallel (don't wait for completion)
      Promise.all(
        oldSubfsUris.map(async ({ uri }) => {
          try {
            // Parse URI: at://did/collection/rkey
            const parts = uri.replace('at://', '').split('/');
            const subRkey = parts[2];

            await agent.com.atproto.repo.deleteRecord({
              repo: did,
              collection: 'place.wisp.subfs',
              rkey: subRkey
            });

            console.log(`  🗑️ Deleted old subfs: ${uri}`);
            logger.info(`Deleted old subfs record: ${uri}`);
          } catch (err: any) {
            // Don't fail the whole upload if cleanup fails
            console.warn(`Failed to delete old subfs ${uri}:`, err?.message);
            logger.warn(`Failed to delete old subfs ${uri}`, err);
          }
        })
      ).catch(err => {
        // Log but don't fail if cleanup fails
        logger.warn('Some subfs cleanup operations failed', err);
      });
    }

    completeUploadJob(jobId, {
      success: true,
      uri: record.data.uri,
      cid: record.data.cid,
      fileCount,
      siteName,
      skippedFiles,
      failedFiles,
      uploadedCount: validUploadedFiles.length - failedFiles.length,
      hasFailures: failedFiles.length > 0
    });

    console.log('=== UPLOAD FILES COMPLETE ===');
  } catch (error) {
    console.error('=== UPLOAD ERROR ===');
    console.error('Error details:', error);
    logger.error('Upload error', error);
    failUploadJob(jobId, error instanceof Error ? error.message : 'Unknown error');
  }
}

export const wispRoutes = (client: NodeOAuthClient, cookieSecret: string) =>
  new Elysia({
    prefix: '/wisp',
    cookie: {
      secrets: cookieSecret,
      sign: ['did']
    }
  })
    .derive(async ({ cookie }) => {
      const auth = await requireAuth(client, cookie)
      return { auth }
    })
    .get(
      '/upload-progress/:jobId',
      async ({ params: { jobId }, auth, set }) => {
        const job = getUploadJob(jobId);

        if (!job) {
          set.status = 404;
          return { error: 'Job not found' };
        }

        // Verify job belongs to authenticated user
        if (job.did !== auth.did) {
          set.status = 403;
          return { error: 'Unauthorized' };
        }

        // Set up SSE headers
        set.headers = {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Connection': 'keep-alive'
        };

        // Track the keepalive timer and job listener so the stream can clean
        // them up when it closes or the client disconnects
        let keepaliveInterval: ReturnType<typeof setInterval> | undefined;
        let removeListener: (() => void) | undefined;

        const stream = new ReadableStream({
          start(controller) {
            const encoder = new TextEncoder();

            // Send initial state
            const sendEvent = (event: string, data: any) => {
              try {
                const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
                controller.enqueue(encoder.encode(message));
              } catch (err) {
                // Controller closed, ignore
              }
            };

            // Send keepalive comment every 15 seconds to prevent timeout
            keepaliveInterval = setInterval(() => {
              try {
                controller.enqueue(encoder.encode(': keepalive\n\n'));
              } catch (err) {
                // Controller closed, stop sending keepalives
                clearInterval(keepaliveInterval);
              }
            }, 15000);

            // Send current job state immediately
            sendEvent('progress', {
              status: job.status,
              progress: job.progress,
              result: job.result,
              error: job.error
            });

            // If job is already completed or failed, close the stream
            if (job.status === 'completed' || job.status === 'failed') {
              clearInterval(keepaliveInterval);
              controller.close();
              return;
            }

            // Listen for updates
            removeListener = addJobListener(jobId, (event, data) => {
              sendEvent(event, data);

              // Close stream after done or error event
              if (event === 'done' || event === 'error') {
                clearInterval(keepaliveInterval);
                setTimeout(() => {
                  try {
                    controller.close();
                  } catch (err) {
                    // Already closed
                  }
                }, 100);
              }
            });
          },
          cancel() {
            // Cleanup on client disconnect (a cleanup function returned from
            // start() is ignored by ReadableStream, so it lives here instead)
            clearInterval(keepaliveInterval);
            removeListener?.();
          }
        });

        return new Response(stream);
      }
    )
    .post(
      '/upload-files',
      async ({ body, auth }) => {
        const { siteName, files } = body as {
          siteName: string;
          files: File | File[]
        };

        console.log('=== UPLOAD FILES START ===');
        console.log('Site name:', siteName);
        console.log('Files received:', Array.isArray(files) ? files.length : 'single file');

        try {
          if (!siteName) {
            throw new Error('Site name is required')
          }

          if (!isValidSiteName(siteName)) {
            throw new Error('Invalid site name: must be 1-512 characters and contain only alphanumeric, dots, dashes, underscores, tildes, and colons')
          }

          // Check if files were provided
          const hasFiles = files && (Array.isArray(files) ? files.length > 0 : !!files);

          if (!hasFiles) {
            // Handle empty upload synchronously (fast operation)
            const agent = new Agent((url, init) => auth.session.fetchHandler(url, init))

            const emptyManifest = {
              $type: 'place.wisp.fs',
              site: siteName,
              root: {
                type: 'directory',
                entries: []
              },
              fileCount: 0,
              createdAt: new Date().toISOString()
            };

            const validationResult = validateRecord(emptyManifest);
            if (!validationResult.success) {
              throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
            }

            const rkey = siteName;

            const record = await agent.com.atproto.repo.putRecord({
              repo: auth.did,
              collection: 'place.wisp.fs',
              rkey: rkey,
              record: emptyManifest
            });

            await upsertSite(auth.did, rkey, siteName);

            return {
              success: true,
              uri: record.data.uri,
              cid: record.data.cid,
              fileCount: 0,
              siteName
            };
          }

          // For file uploads, create a job and process in background
          const fileArray = Array.isArray(files) ? files : [files];
          const jobId = createUploadJob(auth.did, siteName, fileArray.length);

          // Track upload speeds to estimate progress
          const uploadStats = {
            speeds: [] as number[], // MB/s from completed uploads
            getAverageSpeed(): number {
              if (this.speeds.length === 0) return 3; // Default 3 MB/s
              const sum = this.speeds.reduce((a, b) => a + b, 0);
              return sum / this.speeds.length;
            }
          };

          // Create agent with OAuth session and upload progress monitoring
          const wrappedFetchHandler = async (url: string, init?: RequestInit) => {
            // Check if this is an uploadBlob request with a body
            if (url.includes('uploadBlob') && init?.body) {
              const originalBody = init.body;
              const bodySize = originalBody instanceof Uint8Array ? originalBody.length :
                originalBody instanceof ArrayBuffer ? originalBody.byteLength :
                typeof originalBody === 'string' ? new TextEncoder().encode(originalBody).length : 0;

              const startTime = Date.now();

              if (bodySize > 10 * 1024 * 1024) { // Files over 10MB
                const sizeMB = (bodySize / 1024 / 1024).toFixed(1);
                const avgSpeed = uploadStats.getAverageSpeed();
                const estimatedDuration = (bodySize / 1024 / 1024) / avgSpeed;

                console.log(`[Upload Progress] Starting upload of ${sizeMB}MB file`);
                console.log(`[Upload Stats] Measured speeds from last ${uploadStats.speeds.length} files:`, uploadStats.speeds.map(s => s.toFixed(2) + ' MB/s').join(', '));
                console.log(`[Upload Stats] Average speed: ${avgSpeed.toFixed(2)} MB/s, estimated duration: ${estimatedDuration.toFixed(0)}s`);

                // Log estimated progress every 5 seconds
                const progressInterval = setInterval(() => {
                  const elapsed = (Date.now() - startTime) / 1000;
                  const estimatedPercent = Math.min(95, Math.round((elapsed / estimatedDuration) * 100));
                  const estimatedMB = Math.min(bodySize / 1024 / 1024, elapsed * avgSpeed).toFixed(1);
                  console.log(`[Upload Progress] ~${estimatedPercent}% (~${estimatedMB}/${sizeMB}MB) - ${elapsed.toFixed(0)}s elapsed`);
                }, 5000);

                try {
                  const result = await auth.session.fetchHandler(url, init);
                  clearInterval(progressInterval);
                  const totalTime = (Date.now() - startTime) / 1000;
                  const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                  uploadStats.speeds.push(actualSpeed);
                  // Keep only last 10 uploads for rolling average
                  if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                  console.log(`[Upload Progress] ✅ Completed ${sizeMB}MB in ${totalTime.toFixed(1)}s (${actualSpeed.toFixed(1)} MB/s)`);
                  return result;
                } catch (err) {
                  clearInterval(progressInterval);
                  const elapsed = (Date.now() - startTime) / 1000;
                  console.error(`[Upload Progress] ❌ Upload failed after ${elapsed.toFixed(1)}s`);
                  throw err;
                }
              } else {
                // Track small files too for speed calculation
                try {
                  const result = await auth.session.fetchHandler(url, init);
                  const totalTime = (Date.now() - startTime) / 1000;
                  if (totalTime > 0.5) { // Only track if > 0.5s
                    const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                    uploadStats.speeds.push(actualSpeed);
                    if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                    console.log(`[Upload Stats] Small file: ${(bodySize / 1024).toFixed(1)}KB in ${totalTime.toFixed(2)}s = ${actualSpeed.toFixed(2)} MB/s`);
                  }
                  return result;
                } catch (err) {
                  throw err;
                }
              }
            }

            // Normal request
            return auth.session.fetchHandler(url, init);
          };

          const agent = new Agent(wrappedFetchHandler)
          console.log('Agent created for DID:', auth.did);
          console.log('Created upload job:', jobId);

          // Start background processing (don't await)
          processUploadInBackground(jobId, agent, auth.did, siteName, fileArray).catch(err => {
            console.error('Background upload process failed:', err);
            logger.error('Background upload process failed', err);
          });

          // Return immediately with job ID
          return {
            success: true,
            jobId,
            message: 'Upload started. Connect to /wisp/upload-progress/' + jobId + ' for progress updates.'
          };
        } catch (error) {
          console.error('=== UPLOAD ERROR ===');
          console.error('Error details:', error);
          logger.error('Upload error', error);
          throw new Error(`Failed to upload files: ${error instanceof Error ? error.message : 'Unknown error'}`);
        }
      }
    )
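For reference, the records this route writes look roughly like the sketch below, reconstructed from the manifest-building code above. The empty-manifest and subfs shapes appear verbatim in the code; the shape of individual file nodes (blob refs, gzip/base64 flags) is produced by `createManifest`/`updateFileBlobs` in `../lib/wisp-utils` and is not shown in this file, so that part is an assumption.

```ts
// Hedged sketch of the written records; file-node details and URIs are hypothetical.
const fsRecord = {
  $type: 'place.wisp.fs',
  site: 'my-site', // the site name doubles as the record rkey
  root: {
    type: 'directory',
    entries: [
      // { name: 'index.html', node: { /* file node: blob ref + encoding flags, built in wisp-utils */ } },
      {
        name: '__subfs_1', // placeholder entry for a chunk split into its own record
        node: {
          $type: 'place.wisp.fs#subfs',
          type: 'subfs',
          subject: 'at://did:plc:example/place.wisp.subfs/3kexamplerkey', // hypothetical URI
          flat: true // merged back into the parent directory when expanded
        }
      }
    ]
  },
  fileCount: 120,
  createdAt: new Date().toISOString()
}

// Each split-out chunk lives in its own place.wisp.subfs record:
const subfsRecord = {
  $type: 'place.wisp.subfs',
  root: { type: 'directory', entries: [ /* the subtree moved out of the main manifest */ ] },
  fileCount: 120,
  createdAt: new Date().toISOString()
}
```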