Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
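
The listing below is the upload route module (presumably something like `routes/wisp.ts` in the main app): it exposes a `POST /wisp/upload-files` endpoint that validates the site name, filters and compresses files, uploads blobs to the user's PDS with retry and concurrency control, splits oversized manifests into `place.wisp.subfs` records, and a `GET /wisp/upload-progress/:jobId` SSE endpoint for progress updates.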
```ts
import { Elysia } from 'elysia'
import { requireAuth, type AuthenticatedContext } from '../lib/wisp-auth'
import { NodeOAuthClient } from '@atproto/oauth-client-node'
import { Agent } from '@atproto/api'
import { TID } from '@atproto/common-web'
import {
  type UploadedFile,
  type FileUploadResult,
  processUploadedFiles,
  updateFileBlobs,
  findLargeDirectories,
  replaceDirectoryWithSubfs,
  estimateDirectorySize
} from '@wisp/fs-utils'
import {
  shouldCompressFile,
  compressFile,
  computeCID,
  extractBlobMap,
  extractSubfsUris
} from '@wisp/atproto-utils'
import { createManifest } from '@wisp/fs-utils'
import { upsertSite } from '../lib/db'
import { createLogger } from '@wisp/observability'
import { validateRecord, type Directory } from '@wisp/lexicons/types/place/wisp/fs'
import { validateRecord as validateSubfsRecord } from '@wisp/lexicons/types/place/wisp/subfs'
import { MAX_SITE_SIZE, MAX_FILE_SIZE, MAX_FILE_COUNT } from '@wisp/constants'
import {
  createUploadJob,
  getUploadJob,
  updateJobProgress,
  completeUploadJob,
  failUploadJob,
  addJobListener
} from '../lib/upload-jobs'

const logger = createLogger('main-app')

function isValidSiteName(siteName: string): boolean {
  if (!siteName || typeof siteName !== 'string') return false;

  // Length check (AT Protocol rkey limit)
  if (siteName.length < 1 || siteName.length > 512) return false;

  // Check for path traversal
  if (siteName === '.' || siteName === '..') return false;
  if (siteName.includes('/') || siteName.includes('\\')) return false;
  if (siteName.includes('\0')) return false;

  // AT Protocol rkey format: alphanumeric, dots, dashes, underscores, tildes, colons
  // Based on NSID format rules
  const validRkeyPattern = /^[a-zA-Z0-9._~:-]+$/;
  if (!validRkeyPattern.test(siteName)) return false;

  return true;
}

async function processUploadInBackground(
  jobId: string,
  agent: Agent,
  did: string,
  siteName: string,
  fileArray: File[]
): Promise<void> {
  try {
    // Try to fetch existing record to enable incremental updates
    let existingBlobMap = new Map<string, { blobRef: any; cid: string }>();
    let oldSubfsUris: Array<{ uri: string; path: string }> = [];
    console.log('Attempting to fetch existing record...');
    updateJobProgress(jobId, { phase: 'validating' });

    try {
      const rkey = siteName;
      const existingRecord = await agent.com.atproto.repo.getRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey
      });
      console.log('Existing record found!');

      if (existingRecord.data.value && typeof existingRecord.data.value === 'object' && 'root' in existingRecord.data.value) {
        const manifest = existingRecord.data.value as any;

        // Extract blob map from main record
        existingBlobMap = extractBlobMap(manifest.root);
        console.log(`Found existing manifest with ${existingBlobMap.size} files in main record`);

        // Extract subfs URIs with their mount paths from main record
        const subfsUris = extractSubfsUris(manifest.root);
        oldSubfsUris = subfsUris; // Save for cleanup later

        if (subfsUris.length > 0) {
          console.log(`Found ${subfsUris.length} subfs records, fetching in parallel...`);
          logger.info(`Fetching ${subfsUris.length} subfs records for blob reuse`);

          // Fetch all subfs records in parallel
          const subfsRecords = await Promise.all(
            subfsUris.map(async ({ uri, path }) => {
              try {
                // Parse URI: at://did/collection/rkey
                const parts = uri.replace('at://', '').split('/');
                const subDid = parts[0];
                const collection = parts[1];
                const subRkey = parts[2];

                const record = await agent.com.atproto.repo.getRecord({
                  repo: subDid,
                  collection: collection,
                  rkey: subRkey
                });

                return { record: record.data.value as any, mountPath: path };
              } catch (err: any) {
                logger.warn(`Failed to fetch subfs record ${uri}: ${err?.message}`, err);
                return null;
              }
            })
          );

          // Merge blob maps from all subfs records
          let totalSubfsBlobs = 0;
          for (const subfsData of subfsRecords) {
            if (subfsData && subfsData.record && 'root' in subfsData.record) {
              // Extract blobs with the correct mount path prefix
              const subfsMap = extractBlobMap(subfsData.record.root, subfsData.mountPath);
              subfsMap.forEach((value, key) => {
                existingBlobMap.set(key, value);
                totalSubfsBlobs++;
              });
            }
          }

          console.log(`Merged ${totalSubfsBlobs} files from ${subfsUris.length} subfs records`);
          logger.info(`Total blob map: ${existingBlobMap.size} files (main + subfs)`);
        }

        console.log(`Total existing blobs for reuse: ${existingBlobMap.size} files`);
        logger.info(`Found existing manifest with ${existingBlobMap.size} files for incremental update`);
      }
    } catch (error: any) {
      console.log('No existing record found or error:', error?.message || error);
      if (error?.status !== 400 && error?.error !== 'RecordNotFound') {
        logger.warn('Failed to fetch existing record, proceeding with full upload', error);
      }
    }

    // Convert File objects to UploadedFile format
    const uploadedFiles: UploadedFile[] = [];
    const skippedFiles: Array<{ name: string; reason: string }> = [];

    console.log('Processing files, count:', fileArray.length);
    updateJobProgress(jobId, { phase: 'compressing' });

    for (let i = 0; i < fileArray.length; i++) {
      const file = fileArray[i];

      // Skip undefined/null files
      if (!file || !file.name) {
        console.log(`Skipping undefined file at index ${i}`);
        skippedFiles.push({
          name: `[undefined file at index ${i}]`,
          reason: 'Invalid file object'
        });
        continue;
      }

      console.log(`Processing file ${i + 1}/${fileArray.length}:`, file.name, file.size, 'bytes');
      updateJobProgress(jobId, {
        filesProcessed: i + 1,
        currentFile: file.name
      });

      // Skip unwanted files and directories
      const normalizedPath = file.name.replace(/^[^\/]*\//, '');
      const fileName = normalizedPath.split('/').pop() || '';
      const pathParts = normalizedPath.split('/');

      // .git directory (version control - thousands of files)
      if (normalizedPath.startsWith('.git/') || normalizedPath === '.git') {
        console.log(`Skipping .git file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: '.git directory excluded'
        });
        continue;
      }

      // .DS_Store (macOS metadata - can leak info)
      if (fileName === '.DS_Store') {
        console.log(`Skipping .DS_Store file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: '.DS_Store file excluded'
        });
        continue;
      }

      // .env files (environment variables with secrets)
      if (fileName.startsWith('.env')) {
        console.log(`Skipping .env file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'environment files excluded for security'
        });
        continue;
      }

      // node_modules (dependency folder - can be 100,000+ files)
      if (pathParts.includes('node_modules')) {
        console.log(`Skipping node_modules file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'node_modules excluded'
        });
        continue;
      }

      // OS metadata files
      if (fileName === 'Thumbs.db' || fileName === 'desktop.ini' || fileName.startsWith('._')) {
        console.log(`Skipping OS metadata file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'OS metadata file excluded'
        });
        continue;
      }

      // macOS system directories
      if (pathParts.includes('.Spotlight-V100') || pathParts.includes('.Trashes') || pathParts.includes('.fseventsd')) {
        console.log(`Skipping macOS system file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'macOS system directory excluded'
        });
        continue;
      }

      // Cache and temp directories
      if (pathParts.some(part => part === '.cache' || part === '.temp' || part === '.tmp')) {
        console.log(`Skipping cache/temp file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'cache/temp directory excluded'
        });
        continue;
      }

      // Python cache
      if (pathParts.includes('__pycache__') || fileName.endsWith('.pyc')) {
        console.log(`Skipping Python cache file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'Python cache excluded'
        });
        continue;
      }

      // Python virtual environments
      if (pathParts.some(part => part === '.venv' || part === 'venv' || part === 'env')) {
        console.log(`Skipping Python venv file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'Python virtual environment excluded'
        });
        continue;
      }

      // Editor swap files
      if (fileName.endsWith('.swp') || fileName.endsWith('.swo') || fileName.endsWith('~')) {
        console.log(`Skipping editor swap file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'editor swap file excluded'
        });
        continue;
      }

      // Skip files that are too large
      const maxSize = MAX_FILE_SIZE;
      if (file.size > maxSize) {
        skippedFiles.push({
          name: file.name,
          reason: `file too large (${(file.size / 1024 / 1024).toFixed(2)}MB, max 100MB)`
        });
        continue;
      }

      const arrayBuffer = await file.arrayBuffer();
      const originalContent = Buffer.from(arrayBuffer);
      const originalMimeType = file.type || 'application/octet-stream';

      // Determine if file should be compressed (pass filename to exclude _redirects)
      const shouldCompress = shouldCompressFile(originalMimeType, normalizedPath);

      // Text files (HTML/CSS/JS) need base64 encoding to prevent PDS content sniffing
      // Audio files just need compression without base64
      const needsBase64 =
        originalMimeType.startsWith('text/') ||
        originalMimeType.startsWith('application/json') ||
        originalMimeType.startsWith('application/xml') ||
        originalMimeType === 'image/svg+xml';

      let finalContent: Buffer;
      let compressed = false;
      let base64Encoded = false;

      if (shouldCompress) {
        const compressedContent = compressFile(originalContent);
        compressed = true;

        if (needsBase64) {
          // Text files: compress AND base64 encode
          finalContent = Buffer.from(compressedContent.toString('base64'), 'binary');
          base64Encoded = true;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
          logger.info(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
        } else {
          // Audio files: just compress, no base64
          finalContent = compressedContent;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
          logger.info(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
        }
      } else {
        // Binary files: upload directly
        finalContent = originalContent;
        console.log(`Uploading ${file.name} directly: ${originalContent.length} bytes (no compression)`);
        logger.info(`Uploading ${file.name} directly: ${originalContent.length} bytes (binary)`);
      }

      uploadedFiles.push({
        name: file.name,
        content: finalContent,
        mimeType: originalMimeType,
        size: finalContent.length,
        compressed,
        base64Encoded,
        originalMimeType
      });
    }

    // Update total file count after filtering (important for progress tracking)
    updateJobProgress(jobId, {
      totalFiles: uploadedFiles.length
    });

    // Check total size limit
    const totalSize = uploadedFiles.reduce((sum, file) => sum + file.size, 0);
    const maxTotalSize = MAX_SITE_SIZE;

    if (totalSize > maxTotalSize) {
      throw new Error(`Total upload size ${(totalSize / 1024 / 1024).toFixed(2)}MB exceeds 300MB limit`);
    }

    // Check file count limit
    if (uploadedFiles.length > MAX_FILE_COUNT) {
      throw new Error(`File count ${uploadedFiles.length} exceeds ${MAX_FILE_COUNT} files limit`);
    }

    console.log(`After filtering: ${uploadedFiles.length} files to process (${skippedFiles.length} skipped)`);

    if (uploadedFiles.length === 0) {
      // Create empty manifest
      const emptyManifest = {
        $type: 'place.wisp.fs',
        site: siteName,
        root: {
          type: 'directory',
          entries: []
        },
        fileCount: 0,
        createdAt: new Date().toISOString()
      };

      const validationResult = validateRecord(emptyManifest);
      if (!validationResult.success) {
        throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
      }

      const rkey = siteName;
      updateJobProgress(jobId, { phase: 'finalizing' });

      const record = await agent.com.atproto.repo.putRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey,
        record: emptyManifest
      });

      await upsertSite(did, rkey, siteName);

      completeUploadJob(jobId, {
        success: true,
        uri: record.data.uri,
        cid: record.data.cid,
        fileCount: 0,
        siteName,
        skippedFiles
      });
      return;
    }

    // Process files into directory structure
    console.log('Processing uploaded files into directory structure...');
    const validUploadedFiles = uploadedFiles.filter((f, i) => {
      if (!f || !f.name || !f.content) {
        console.error(`Filtering out invalid file at index ${i}`);
        return false;
      }
      return true;
    });

    const { directory, fileCount } = processUploadedFiles(validUploadedFiles);
    console.log('Directory structure created, file count:', fileCount);

    // Upload files as blobs with retry logic for DPoP nonce conflicts
    console.log('Starting blob upload/reuse phase...');
    updateJobProgress(jobId, { phase: 'uploading' });

    // Helper function to upload blob with exponential backoff retry and timeout
    const uploadBlobWithRetry = async (
      agent: Agent,
      content: Buffer,
      mimeType: string,
      fileName: string,
      maxRetries = 5
    ) => {
      for (let attempt = 0; attempt < maxRetries; attempt++) {
        const controller = new AbortController();
        const timeoutMs = 300000; // 5 minute timeout per upload
        const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

        try {
          console.log(`[File Upload] Starting upload attempt ${attempt + 1}/${maxRetries} for ${fileName} (${content.length} bytes, ${mimeType})`);

          const result = await agent.com.atproto.repo.uploadBlob(content, { encoding: mimeType });
          clearTimeout(timeoutId);
          console.log(`[File Upload] ✅ Successfully uploaded ${fileName} on attempt ${attempt + 1}`);
          return result;
        } catch (error: any) {
          clearTimeout(timeoutId);

          const isDPoPNonceError =
            error?.message?.toLowerCase().includes('nonce') ||
            error?.message?.toLowerCase().includes('dpop') ||
            error?.status === 409;

          const isTimeout = error?.name === 'AbortError' || error?.message === 'Upload timeout';
          const isRateLimited = error?.status === 429 || error?.message?.toLowerCase().includes('rate');
          const isRequestEntityTooLarge = error?.status === 419 || error?.status === 413;

          // Special handling for 419/413 Request Entity Too Large errors
          if (isRequestEntityTooLarge) {
            const customError = new Error('Your PDS is not allowing uploads large enough to store your site. Please contact your PDS host. This could also possibly be a result of it being behind Cloudflare free tier.');
            (customError as any).status = 419;
            throw customError;
          }

          // Retry on DPoP nonce conflicts, timeouts, or rate limits
          if ((isDPoPNonceError || isTimeout || isRateLimited) && attempt < maxRetries - 1) {
            let backoffMs: number;
            if (isRateLimited) {
              backoffMs = 2000 * Math.pow(2, attempt); // 2s, 4s, 8s, 16s for rate limits
            } else if (isTimeout) {
              backoffMs = 1000 * Math.pow(2, attempt); // 1s, 2s, 4s, 8s for timeouts
            } else {
              backoffMs = 100 * Math.pow(2, attempt); // 100ms, 200ms, 400ms for DPoP
            }

            const reason = isDPoPNonceError ? 'DPoP nonce conflict' : isTimeout ? 'timeout' : 'rate limit';
            logger.info(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms (attempt ${attempt + 1}/${maxRetries})`);
            console.log(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms`);
            await new Promise(resolve => setTimeout(resolve, backoffMs));
            continue;
          }

          // Log detailed error information before throwing
          logger.error(`[File Upload] ❌ Upload failed for ${fileName} (size: ${content.length} bytes, mimeType: ${mimeType}, attempt: ${attempt + 1}/${maxRetries})`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            headers: error?.headers,
            success: error?.success
          });
          console.error(`[File Upload] ❌ Upload failed for ${fileName}:`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            size: content.length,
            mimeType,
            attempt: attempt + 1
          });
          throw error;
        }
      }
      throw new Error(`Failed to upload ${fileName} after ${maxRetries} attempts`);
    };

    // Use sliding window concurrency for maximum throughput
    const CONCURRENCY_LIMIT = 20; // Maximum concurrent uploads
    const uploadedBlobs: Array<{
      result: FileUploadResult;
      filePath: string;
      sentMimeType: string;
      returnedMimeType: string;
      reused: boolean;
    }> = [];
    const failedFiles: Array<{
      name: string;
      index: number;
      error: string;
      size: number;
    }> = [];

    // Track completed files count for accurate progress
    let completedFilesCount = 0;

    // Process file with sliding window concurrency
    const processFile = async (file: UploadedFile, index: number) => {
      try {
        if (!file || !file.name) {
          throw new Error(`Undefined file at index ${index}`);
        }

        const fileCID = computeCID(file.content);
        const normalizedPath = file.name.replace(/^[^\/]*\//, '');
        const existingBlob = existingBlobMap.get(normalizedPath) || existingBlobMap.get(file.name);

        if (existingBlob && existingBlob.cid === fileCID) {
          logger.info(`[File Upload] ♻️ Reused: ${file.name} (unchanged, CID: ${fileCID})`);
          const reusedCount = (getUploadJob(jobId)?.progress.filesReused || 0) + 1;
          completedFilesCount++;
          updateJobProgress(jobId, {
            filesReused: reusedCount,
            currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (reused)`
          });

          return {
            result: {
              hash: existingBlob.cid,
              blobRef: existingBlob.blobRef,
              ...(file.compressed && {
                encoding: 'gzip' as const,
                mimeType: file.originalMimeType || file.mimeType,
                base64: file.base64Encoded || false
              })
            },
            filePath: file.name,
            sentMimeType: file.mimeType,
            returnedMimeType: existingBlob.blobRef.mimeType,
            reused: true
          };
        }

        const uploadMimeType = file.compressed || file.mimeType.startsWith('text/html')
          ? 'application/octet-stream'
          : file.mimeType;

        const compressionInfo = file.compressed ? ' (gzipped)' : '';
        const fileSizeMB = (file.size / 1024 / 1024).toFixed(2);
        logger.info(`[File Upload] ⬆️ Uploading: ${file.name} (${fileSizeMB}MB${compressionInfo})`);

        const uploadResult = await uploadBlobWithRetry(
          agent,
          file.content,
          uploadMimeType,
          file.name
        );

        const returnedBlobRef = uploadResult.data.blob;
        const uploadedCount = (getUploadJob(jobId)?.progress.filesUploaded || 0) + 1;
        completedFilesCount++;
        updateJobProgress(jobId, {
          filesUploaded: uploadedCount,
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (uploaded)`
        });
        logger.info(`[File Upload] ✅ Uploaded: ${file.name} (CID: ${fileCID})`);

        return {
          result: {
            hash: returnedBlobRef.ref.toString(),
            blobRef: returnedBlobRef,
            ...(file.compressed && {
              encoding: 'gzip' as const,
              mimeType: file.originalMimeType || file.mimeType,
              base64: file.base64Encoded || false
            })
          },
          filePath: file.name,
          sentMimeType: file.mimeType,
          returnedMimeType: returnedBlobRef.mimeType,
          reused: false
        };
      } catch (uploadError) {
        const fileName = file?.name || 'unknown';
        const fileSize = file?.size || 0;
        const errorMessage = uploadError instanceof Error ? uploadError.message : 'Unknown error';
        const errorDetails = {
          fileName,
          fileSize,
          index,
          error: errorMessage,
          stack: uploadError instanceof Error ? uploadError.stack : undefined
        };
        logger.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);
        console.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);

        completedFilesCount++;
        updateJobProgress(jobId, {
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${fileName} (failed)`
        });

        // Track failed file but don't throw - continue with other files
        failedFiles.push({
          name: fileName,
          index,
          error: errorMessage,
          size: fileSize
        });

        return null; // Return null to indicate failure
      }
    };

    // Sliding window concurrency control
    const processWithConcurrency = async () => {
      const results: any[] = [];
      let fileIndex = 0;
      const executing = new Map<Promise<void>, { index: number; name: string }>();

      for (const file of validUploadedFiles) {
        const currentIndex = fileIndex++;

        const promise = processFile(file, currentIndex)
          .then(result => {
            results[currentIndex] = result;
          })
          .catch(error => {
            // This shouldn't happen since processFile catches errors, but just in case
            logger.error(`Unexpected error processing file at index ${currentIndex}`, error);
            results[currentIndex] = null;
          })
          .finally(() => {
            executing.delete(promise);
          });

        executing.set(promise, { index: currentIndex, name: file.name });

        if (executing.size >= CONCURRENCY_LIMIT) {
          await Promise.race(executing.keys());
        }
      }

      // Wait for remaining uploads
      await Promise.all(executing.keys());
      console.log(`\n✅ Upload complete: ${completedFilesCount}/${validUploadedFiles.length} files processed\n`);
      return results.filter(r => r !== undefined && r !== null); // Filter out null (failed) and undefined entries
    };

    const allResults = await processWithConcurrency();
    uploadedBlobs.push(...allResults);

    const currentReused = uploadedBlobs.filter(b => b.reused).length;
    const currentUploaded = uploadedBlobs.filter(b => !b.reused).length;
    const successfulCount = uploadedBlobs.length;
    const failedCount = failedFiles.length;

    logger.info(`[File Upload] 🎉 Upload complete → ${successfulCount}/${validUploadedFiles.length} files succeeded (${currentUploaded} uploaded, ${currentReused} reused), ${failedCount} failed`);

    if (failedCount > 0) {
      logger.warn(`[File Upload] ⚠️ Failed files:`, failedFiles);
      console.warn(`[File Upload] ⚠️ ${failedCount} files failed to upload:`, failedFiles.map(f => f.name).join(', '));
    }

    const reusedCount = uploadedBlobs.filter(b => b.reused).length;
    const uploadedCount = uploadedBlobs.filter(b => !b.reused).length;
    logger.info(`[File Upload] 🎉 Upload phase complete! Total: ${successfulCount} files (${uploadedCount} uploaded, ${reusedCount} reused)`);

    const uploadResults: FileUploadResult[] = uploadedBlobs.map(blob => blob.result);
    const filePaths: string[] = uploadedBlobs.map(blob => blob.filePath);

    // Update directory with file blobs (only for successfully uploaded files)
    console.log('Updating directory with blob references...');
    updateJobProgress(jobId, { phase: 'creating_manifest' });

    // Create a set of successfully uploaded paths for quick lookup
    const successfulPaths = new Set(filePaths.map(path => path.replace(/^[^\/]*\//, '')));

    const updatedDirectory = updateFileBlobs(directory, uploadResults, filePaths, '', successfulPaths);

    // Calculate actual file count (only successfully uploaded files)
    const actualFileCount = uploadedBlobs.length;

    // Check if we need to split into subfs records
    // Split proactively if we have lots of files to avoid hitting manifest size limits
    const MAX_MANIFEST_SIZE = 140 * 1024; // 140KB to be safe (PDS limit is 150KB)
    const FILE_COUNT_THRESHOLD = 250; // Start splitting at this many files
    const TARGET_FILE_COUNT = 200; // Try to keep main manifest under this many files
    const subfsRecords: Array<{ uri: string; path: string }> = [];
    let workingDirectory = updatedDirectory;
    let currentFileCount = actualFileCount;

    // Create initial manifest to check size
    let manifest = createManifest(siteName, workingDirectory, actualFileCount);
    let manifestSize = JSON.stringify(manifest).length;

    // Split if we have lots of files OR if manifest is already too large
    if (actualFileCount >= FILE_COUNT_THRESHOLD || manifestSize > MAX_MANIFEST_SIZE) {
      console.log(`⚠️ Large site detected (${actualFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB), splitting into subfs records...`);
      logger.info(`Large site with ${actualFileCount} files, splitting into subfs records`);

      // Keep splitting until manifest fits under limits (both size and file count)
      let attempts = 0;
      const MAX_ATTEMPTS = 100; // Allow many splits for very large sites

      while ((manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) && attempts < MAX_ATTEMPTS) {
        attempts++;

        // Find all directories sorted by size (largest first)
        const directories = findLargeDirectories(workingDirectory);
        directories.sort((a, b) => b.size - a.size);

        // Check if we can split subdirectories or need to split flat files
        if (directories.length > 0) {
          // Split the largest subdirectory
          const largestDir = directories[0];
          console.log(`  Split #${attempts}: ${largestDir.path} (${largestDir.fileCount} files, ${(largestDir.size / 1024).toFixed(1)}KB)`);

          // Create a subfs record for this directory
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: largestDir.directory,
            fileCount: largestDir.fileCount,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          const subfsValidation = validateSubfsRecord(subfsManifest);
          if (!subfsValidation.success) {
            throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          subfsRecords.push({ uri: subfsUri, path: largestDir.path });
          console.log(`  ✅ Created subfs: ${subfsUri}`);
          logger.info(`Created subfs record for ${largestDir.path}: ${subfsUri}`);

          // Replace directory with subfs node in the main tree
          workingDirectory = replaceDirectoryWithSubfs(workingDirectory, largestDir.path, subfsUri);
          currentFileCount -= largestDir.fileCount;
        } else {
          // No subdirectories - split flat files at root level
          const rootFiles = workingDirectory.entries.filter(e => 'type' in e.node && e.node.type === 'file');

          if (rootFiles.length === 0) {
            throw new Error(
              `Cannot split manifest further - no files or directories available. ` +
              `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB.`
            );
          }

          // Take a chunk of files (aim for ~100 files per chunk)
          const CHUNK_SIZE = 100;
          const chunkFiles = rootFiles.slice(0, Math.min(CHUNK_SIZE, rootFiles.length));
          console.log(`  Split #${attempts}: flat root (${chunkFiles.length} files)`);

          // Create a directory with just these files
          const chunkDirectory: Directory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: chunkFiles
          };

          // Create subfs record for this chunk
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: chunkDirectory,
            fileCount: chunkFiles.length,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          const subfsValidation = validateSubfsRecord(subfsManifest);
          if (!subfsValidation.success) {
            throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          console.log(`  ✅ Created flat subfs: ${subfsUri}`);
          logger.info(`Created flat subfs record with ${chunkFiles.length} files: ${subfsUri}`);

          // Remove these files from the working directory and add a subfs entry
          const remainingEntries = workingDirectory.entries.filter(
            e => !chunkFiles.some(cf => cf.name === e.name)
          );

          // Add subfs entry (will be merged flat when expanded)
          remainingEntries.push({
            name: `__subfs_${attempts}`, // Placeholder name, will be merged away
            node: {
              $type: 'place.wisp.fs#subfs' as const,
              type: 'subfs' as const,
              subject: subfsUri,
              flat: true // Merge entries directly into parent (default, but explicit for clarity)
            }
          });

          workingDirectory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: remainingEntries
          };

          subfsRecords.push({ uri: subfsUri, path: `__subfs_${attempts}` });
          currentFileCount -= chunkFiles.length;
        }

        // Recreate manifest and check new size
        manifest = createManifest(siteName, workingDirectory, currentFileCount);
        manifestSize = JSON.stringify(manifest).length;
        const newSizeKB = (manifestSize / 1024).toFixed(1);
        console.log(`  → Manifest now ${newSizeKB}KB with ${currentFileCount} files (${subfsRecords.length} subfs total)`);

        // Check if we're under both limits now
        if (manifestSize <= MAX_MANIFEST_SIZE && currentFileCount <= TARGET_FILE_COUNT) {
          console.log(`  ✅ Manifest fits! (${currentFileCount} files, ${newSizeKB}KB)`);
          break;
        }
      }

      if (manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) {
        throw new Error(
          `Failed to fit manifest after splitting ${attempts} directories. ` +
          `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB. ` +
          `This should never happen - please report this issue.`
        );
      }

      console.log(`✅ Split complete: ${subfsRecords.length} subfs records, ${currentFileCount} files in main, ${(manifestSize / 1024).toFixed(1)}KB manifest`);
      logger.info(`Split into ${subfsRecords.length} subfs records, ${currentFileCount} files remaining in main tree`);
    } else {
      const manifestSizeKB = (manifestSize / 1024).toFixed(1);
      console.log(`Manifest created (${fileCount} files, ${manifestSizeKB}KB JSON) - no splitting needed`);
    }

    const rkey = siteName;
    updateJobProgress(jobId, { phase: 'finalizing' });

    console.log('Putting record to PDS with rkey:', rkey);
    const record = await agent.com.atproto.repo.putRecord({
      repo: did,
      collection: 'place.wisp.fs',
      rkey: rkey,
      record: manifest
    });
    console.log('Record successfully created on PDS:', record.data.uri);

    // Store site in database cache
    await upsertSite(did, rkey, siteName);

    // Clean up old subfs records if we had any
    if (oldSubfsUris.length > 0) {
      console.log(`Cleaning up ${oldSubfsUris.length} old subfs records...`);
      logger.info(`Cleaning up ${oldSubfsUris.length} old subfs records`);

      // Delete old subfs records in parallel (don't wait for completion)
      Promise.all(
        oldSubfsUris.map(async ({ uri }) => {
          try {
            // Parse URI: at://did/collection/rkey
            const parts = uri.replace('at://', '').split('/');
            const subRkey = parts[2];

            await agent.com.atproto.repo.deleteRecord({
              repo: did,
              collection: 'place.wisp.subfs',
              rkey: subRkey
            });

            console.log(`  🗑️ Deleted old subfs: ${uri}`);
            logger.info(`Deleted old subfs record: ${uri}`);
          } catch (err: any) {
            // Don't fail the whole upload if cleanup fails
            console.warn(`Failed to delete old subfs ${uri}:`, err?.message);
            logger.warn(`Failed to delete old subfs ${uri}`, err);
          }
        })
      ).catch(err => {
        // Log but don't fail if cleanup fails
        logger.warn('Some subfs cleanup operations failed', err);
      });
    }

    completeUploadJob(jobId, {
      success: true,
      uri: record.data.uri,
      cid: record.data.cid,
      fileCount,
      siteName,
      skippedFiles,
      failedFiles,
      uploadedCount: validUploadedFiles.length - failedFiles.length,
      hasFailures: failedFiles.length > 0
    });

    console.log('=== UPLOAD FILES COMPLETE ===');
  } catch (error) {
    console.error('=== UPLOAD ERROR ===');
    console.error('Error details:', error);
    logger.error('Upload error', error);
    failUploadJob(jobId, error instanceof Error ? error.message : 'Unknown error');
  }
}

export const wispRoutes = (client: NodeOAuthClient, cookieSecret: string) =>
  new Elysia({
    prefix: '/wisp',
    cookie: {
      secrets: cookieSecret,
      sign: ['did']
    }
  })
    .derive(async ({ cookie }) => {
      const auth = await requireAuth(client, cookie)
      return { auth }
    })
    .get(
      '/upload-progress/:jobId',
      async ({ params: { jobId }, auth, set }) => {
        const job = getUploadJob(jobId);

        if (!job) {
          set.status = 404;
          return { error: 'Job not found' };
        }

        // Verify job belongs to authenticated user
        if (job.did !== auth.did) {
          set.status = 403;
          return { error: 'Unauthorized' };
        }

        // Set up SSE headers
        set.headers = {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Connection': 'keep-alive'
        };

        const stream = new ReadableStream({
          start(controller) {
            const encoder = new TextEncoder();

            // Send initial state
            const sendEvent = (event: string, data: any) => {
              try {
                const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
                controller.enqueue(encoder.encode(message));
              } catch (err) {
                // Controller closed, ignore
              }
            };

            // Send keepalive comment every 15 seconds to prevent timeout
            const keepaliveInterval = setInterval(() => {
              try {
                controller.enqueue(encoder.encode(': keepalive\n\n'));
              } catch (err) {
                // Controller closed, stop sending keepalives
                clearInterval(keepaliveInterval);
              }
            }, 15000);

            // Send current job state immediately
            sendEvent('progress', {
              status: job.status,
              progress: job.progress,
              result: job.result,
              error: job.error
            });

            // If job is already completed or failed, close the stream
            if (job.status === 'completed' || job.status === 'failed') {
              clearInterval(keepaliveInterval);
              controller.close();
              return;
            }

            // Listen for updates
            const cleanup = addJobListener(jobId, (event, data) => {
              sendEvent(event, data);

              // Close stream after done or error event
              if (event === 'done' || event === 'error') {
                clearInterval(keepaliveInterval);
                setTimeout(() => {
                  try {
                    controller.close();
                  } catch (err) {
                    // Already closed
                  }
                }, 100);
              }
            });

            // Cleanup on disconnect
            return () => {
              clearInterval(keepaliveInterval);
              cleanup();
            };
          }
        });

        return new Response(stream);
      }
    )
    .post(
      '/upload-files',
      async ({ body, auth }) => {
        const { siteName, files } = body as {
          siteName: string;
          files: File | File[]
        };

        console.log('=== UPLOAD FILES START ===');
        console.log('Site name:', siteName);
        console.log('Files received:', Array.isArray(files) ? files.length : 'single file');

        try {
          if (!siteName) {
            throw new Error('Site name is required')
          }

          if (!isValidSiteName(siteName)) {
            throw new Error('Invalid site name: must be 1-512 characters and contain only alphanumeric, dots, dashes, underscores, tildes, and colons')
          }

          // Check if files were provided
          const hasFiles = files && (Array.isArray(files) ? files.length > 0 : !!files);

          if (!hasFiles) {
            // Handle empty upload synchronously (fast operation)
            const agent = new Agent((url, init) => auth.session.fetchHandler(url, init))

            const emptyManifest = {
              $type: 'place.wisp.fs',
              site: siteName,
              root: {
                type: 'directory',
                entries: []
              },
              fileCount: 0,
              createdAt: new Date().toISOString()
            };

            const validationResult = validateRecord(emptyManifest);
            if (!validationResult.success) {
              throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
            }

            const rkey = siteName;

            const record = await agent.com.atproto.repo.putRecord({
              repo: auth.did,
              collection: 'place.wisp.fs',
              rkey: rkey,
              record: emptyManifest
            });

            await upsertSite(auth.did, rkey, siteName);

            return {
              success: true,
              uri: record.data.uri,
              cid: record.data.cid,
              fileCount: 0,
              siteName
            };
          }

          // For file uploads, create a job and process in background
          const fileArray = Array.isArray(files) ? files : [files];
          const jobId = createUploadJob(auth.did, siteName, fileArray.length);

          // Track upload speeds to estimate progress
          const uploadStats = {
            speeds: [] as number[], // MB/s from completed uploads
            getAverageSpeed(): number {
              if (this.speeds.length === 0) return 3; // Default 3 MB/s
              const sum = this.speeds.reduce((a, b) => a + b, 0);
              return sum / this.speeds.length;
            }
          };

          // Create agent with OAuth session and upload progress monitoring
          const wrappedFetchHandler = async (url: string, init?: RequestInit) => {
            // Check if this is an uploadBlob request with a body
            if (url.includes('uploadBlob') && init?.body) {
              const originalBody = init.body;
              const bodySize = originalBody instanceof Uint8Array ? originalBody.length :
                originalBody instanceof ArrayBuffer ? originalBody.byteLength :
                typeof originalBody === 'string' ? new TextEncoder().encode(originalBody).length : 0;

              const startTime = Date.now();

              if (bodySize > 10 * 1024 * 1024) { // Files over 10MB
                const sizeMB = (bodySize / 1024 / 1024).toFixed(1);
                const avgSpeed = uploadStats.getAverageSpeed();
                const estimatedDuration = (bodySize / 1024 / 1024) / avgSpeed;

                console.log(`[Upload Progress] Starting upload of ${sizeMB}MB file`);
                console.log(`[Upload Stats] Measured speeds from last ${uploadStats.speeds.length} files:`, uploadStats.speeds.map(s => s.toFixed(2) + ' MB/s').join(', '));
                console.log(`[Upload Stats] Average speed: ${avgSpeed.toFixed(2)} MB/s, estimated duration: ${estimatedDuration.toFixed(0)}s`);

                // Log estimated progress every 5 seconds
                const progressInterval = setInterval(() => {
                  const elapsed = (Date.now() - startTime) / 1000;
                  const estimatedPercent = Math.min(95, Math.round((elapsed / estimatedDuration) * 100));
                  const estimatedMB = Math.min(bodySize / 1024 / 1024, elapsed * avgSpeed).toFixed(1);
                  console.log(`[Upload Progress] ~${estimatedPercent}% (~${estimatedMB}/${sizeMB}MB) - ${elapsed.toFixed(0)}s elapsed`);
                }, 5000);

                try {
                  const result = await auth.session.fetchHandler(url, init);
                  clearInterval(progressInterval);
                  const totalTime = (Date.now() - startTime) / 1000;
                  const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                  uploadStats.speeds.push(actualSpeed);
                  // Keep only last 10 uploads for rolling average
                  if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                  console.log(`[Upload Progress] ✅ Completed ${sizeMB}MB in ${totalTime.toFixed(1)}s (${actualSpeed.toFixed(1)} MB/s)`);
                  return result;
                } catch (err) {
                  clearInterval(progressInterval);
                  const elapsed = (Date.now() - startTime) / 1000;
                  console.error(`[Upload Progress] ❌ Upload failed after ${elapsed.toFixed(1)}s`);
                  throw err;
                }
              } else {
                // Track small files too for speed calculation
                try {
                  const result = await auth.session.fetchHandler(url, init);
                  const totalTime = (Date.now() - startTime) / 1000;
                  if (totalTime > 0.5) { // Only track if > 0.5s
                    const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                    uploadStats.speeds.push(actualSpeed);
                    if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                    console.log(`[Upload Stats] Small file: ${(bodySize / 1024).toFixed(1)}KB in ${totalTime.toFixed(2)}s = ${actualSpeed.toFixed(2)} MB/s`);
                  }
                  return result;
                } catch (err) {
                  throw err;
                }
              }
            }

            // Normal request
            return auth.session.fetchHandler(url, init);
          };

          const agent = new Agent(wrappedFetchHandler)
          console.log('Agent created for DID:', auth.did);
          console.log('Created upload job:', jobId);

          // Start background processing (don't await)
          processUploadInBackground(jobId, agent, auth.did, siteName, fileArray).catch(err => {
            console.error('Background upload process failed:', err);
            logger.error('Background upload process failed', err);
          });

          // Return immediately with job ID
          return {
            success: true,
            jobId,
            message: 'Upload started. Connect to /wisp/upload-progress/' + jobId + ' for progress updates.'
          };
        } catch (error) {
          console.error('=== UPLOAD ERROR ===');
          console.error('Error details:', error);
          logger.error('Upload error', error);
          throw new Error(`Failed to upload files: ${error instanceof Error ? error.message : 'Unknown error'}`);
        }
      }
    )
```
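
A rough sketch of how a browser client might drive these routes is shown below. It is not part of the server module above: the endpoint paths, the `siteName`/`files` form fields, and the `progress`/`done`/`error` SSE event names are taken from the route code, while the `uploadSite` helper and the use of `webkitRelativePath` to preserve folder structure are illustrative assumptions.

```ts
// Hypothetical browser-side usage sketch. Assumes the routes are mounted at
// /wisp and the signed `did` session cookie is already set by the OAuth flow.
async function uploadSite(siteName: string, files: File[]): Promise<void> {
  const form = new FormData()
  form.append('siteName', siteName)
  for (const file of files) {
    // Preserve folder structure when files come from a directory picker
    form.append('files', file, (file as any).webkitRelativePath || file.name)
  }

  const res = await fetch('/wisp/upload-files', { method: 'POST', body: form })
  const { jobId } = await res.json()
  if (!jobId) return // empty uploads complete synchronously, no job is created

  // Follow progress over SSE until the job finishes
  const events = new EventSource(`/wisp/upload-progress/${jobId}`)
  events.addEventListener('progress', (e) =>
    console.log('progress', JSON.parse((e as MessageEvent).data)))
  events.addEventListener('done', (e) => {
    console.log('done', JSON.parse((e as MessageEvent).data))
    events.close()
  })
  events.addEventListener('error', () => events.close())
}
```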