Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.

import { Elysia } from 'elysia'
import { requireAuth, type AuthenticatedContext } from '../lib/wisp-auth'
import { NodeOAuthClient } from '@atproto/oauth-client-node'
import { Agent } from '@atproto/api'
import { TID } from '@atproto/common-web'
import {
  type UploadedFile,
  type FileUploadResult,
  processUploadedFiles,
  createManifest,
  updateFileBlobs,
  shouldCompressFile,
  compressFile,
  computeCID,
  extractBlobMap,
  extractSubfsUris,
  findLargeDirectories,
  replaceDirectoryWithSubfs,
  estimateDirectorySize
} from '../lib/wisp-utils'
import { upsertSite } from '../lib/db'
import { logger } from '../lib/observability'
// type Directory (place.wisp.fs#directory) is assumed to be exported by the generated lexicon module
import { validateRecord, type Directory } from '../lexicons/types/place/wisp/fs'
import { validateRecord as validateSubfsRecord } from '../lexicons/types/place/wisp/subfs'
import { MAX_SITE_SIZE, MAX_FILE_SIZE, MAX_FILE_COUNT } from '../lib/constants'
import {
  createUploadJob,
  getUploadJob,
  updateJobProgress,
  completeUploadJob,
  failUploadJob,
  addJobListener
} from '../lib/upload-jobs'

function isValidSiteName(siteName: string): boolean {
  if (!siteName || typeof siteName !== 'string') return false;

  // Length check (AT Protocol rkey limit)
  if (siteName.length < 1 || siteName.length > 512) return false;

  // Check for path traversal
  if (siteName === '.' || siteName === '..') return false;
  if (siteName.includes('/') || siteName.includes('\\')) return false;
  if (siteName.includes('\0')) return false;

  // AT Protocol rkey format: alphanumeric, dots, dashes, underscores, tildes, colons
  // Based on the record key syntax rules
  const validRkeyPattern = /^[a-zA-Z0-9._~:-]+$/;
  if (!validRkeyPattern.test(siteName)) return false;

  return true;
}

async function processUploadInBackground(
  jobId: string,
  agent: Agent,
  did: string,
  siteName: string,
  fileArray: File[]
): Promise<void> {
  try {
    // Try to fetch existing record to enable incremental updates
    let existingBlobMap = new Map<string, { blobRef: any; cid: string }>();
    let oldSubfsUris: Array<{ uri: string; path: string }> = [];
    console.log('Attempting to fetch existing record...');
    updateJobProgress(jobId, { phase: 'validating' });

    try {
      const rkey = siteName;
      const existingRecord = await agent.com.atproto.repo.getRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey
      });
      console.log('Existing record found!');

      if (existingRecord.data.value && typeof existingRecord.data.value === 'object' && 'root' in existingRecord.data.value) {
        const manifest = existingRecord.data.value as any;

        // Extract blob map from main record
        existingBlobMap = extractBlobMap(manifest.root);
        console.log(`Found existing manifest with ${existingBlobMap.size} files in main record`);

        // Extract subfs URIs with their mount paths from main record
        const subfsUris = extractSubfsUris(manifest.root);
        oldSubfsUris = subfsUris; // Save for cleanup later

        if (subfsUris.length > 0) {
          console.log(`Found ${subfsUris.length} subfs records, fetching in parallel...`);
          logger.info(`Fetching ${subfsUris.length} subfs records for blob reuse`);

          // Fetch all subfs records in parallel
          const subfsRecords = await Promise.all(
            subfsUris.map(async ({ uri, path }) => {
              try {
                // Parse URI: at://did/collection/rkey
                const parts = uri.replace('at://', '').split('/');
                const subDid = parts[0];
                const collection = parts[1];
                const subRkey = parts[2];

                const record = await agent.com.atproto.repo.getRecord({
                  repo: subDid,
                  collection: collection,
                  rkey: subRkey
                });

                return { record: record.data.value as any, mountPath: path };
              } catch (err: any) {
                logger.warn(`Failed to fetch subfs record ${uri}: ${err?.message}`, err);
                return null;
              }
            })
          );

          // Merge blob maps from all subfs records
          let totalSubfsBlobs = 0;
          for (const subfsData of subfsRecords) {
            if (subfsData && subfsData.record && 'root' in subfsData.record) {
              // Extract blobs with the correct mount path prefix
              const subfsMap = extractBlobMap(subfsData.record.root, subfsData.mountPath);
              subfsMap.forEach((value, key) => {
                existingBlobMap.set(key, value);
                totalSubfsBlobs++;
              });
            }
          }

          console.log(`Merged ${totalSubfsBlobs} files from ${subfsUris.length} subfs records`);
          logger.info(`Total blob map: ${existingBlobMap.size} files (main + subfs)`);
        }

        console.log(`Total existing blobs for reuse: ${existingBlobMap.size} files`);
        logger.info(`Found existing manifest with ${existingBlobMap.size} files for incremental update`);
      }
    } catch (error: any) {
      console.log('No existing record found or error:', error?.message || error);
      if (error?.status !== 400 && error?.error !== 'RecordNotFound') {
        logger.warn('Failed to fetch existing record, proceeding with full upload', error);
      }
    }

    // Convert File objects to UploadedFile format
    const uploadedFiles: UploadedFile[] = [];
    const skippedFiles: Array<{ name: string; reason: string }> = [];

    console.log('Processing files, count:', fileArray.length);
    updateJobProgress(jobId, { phase: 'compressing' });

    for (let i = 0; i < fileArray.length; i++) {
      const file = fileArray[i];

      // Skip undefined/null files
      if (!file || !file.name) {
        console.log(`Skipping undefined file at index ${i}`);
        skippedFiles.push({
          name: `[undefined file at index ${i}]`,
          reason: 'Invalid file object'
        });
        continue;
      }

      console.log(`Processing file ${i + 1}/${fileArray.length}:`, file.name, file.size, 'bytes');
      updateJobProgress(jobId, {
        filesProcessed: i + 1,
        currentFile: file.name
      });

      // Skip .git directory files
      const normalizedPath = file.name.replace(/^[^\/]*\//, '');
      if (normalizedPath.startsWith('.git/') || normalizedPath === '.git') {
        console.log(`Skipping .git file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: '.git directory excluded'
        });
        continue;
      }

      // Skip files that are too large
      const maxSize = MAX_FILE_SIZE;
      if (file.size > maxSize) {
        skippedFiles.push({
          name: file.name,
          reason: `file too large (${(file.size / 1024 / 1024).toFixed(2)}MB, max ${(MAX_FILE_SIZE / 1024 / 1024).toFixed(0)}MB)`
        });
        continue;
      }

      const arrayBuffer = await file.arrayBuffer();
      const originalContent = Buffer.from(arrayBuffer);
      const originalMimeType = file.type || 'application/octet-stream';

      // Determine if file should be compressed (pass filename to exclude _redirects)
      const shouldCompress = shouldCompressFile(originalMimeType, normalizedPath);

      // Text files (HTML/CSS/JS) need base64 encoding to prevent PDS content sniffing
      // Audio files just need compression without base64
      const needsBase64 =
        originalMimeType.startsWith('text/') ||
        originalMimeType.startsWith('application/json') ||
        originalMimeType.startsWith('application/xml') ||
        originalMimeType === 'image/svg+xml';
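      // The compressed/base64Encoded flags set below are recorded per file in the manifest,
      // so whatever serves these blobs is expected to reverse the transform (base64-decode
      // when flagged, then gunzip) before responding with the original bytes. That serving
      // side is not part of this file.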

      let finalContent: Buffer;
      let compressed = false;
      let base64Encoded = false;

      if (shouldCompress) {
        const compressedContent = compressFile(originalContent);
        compressed = true;

        if (needsBase64) {
          // Text files: compress AND base64 encode
          finalContent = Buffer.from(compressedContent.toString('base64'), 'binary');
          base64Encoded = true;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
          logger.info(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
        } else {
          // Audio files: just compress, no base64
          finalContent = compressedContent;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
          logger.info(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
        }
      } else {
        // Binary files: upload directly
        finalContent = originalContent;
        console.log(`Uploading ${file.name} directly: ${originalContent.length} bytes (no compression)`);
        logger.info(`Uploading ${file.name} directly: ${originalContent.length} bytes (binary)`);
      }

      uploadedFiles.push({
        name: file.name,
        content: finalContent,
        mimeType: originalMimeType,
        size: finalContent.length,
        compressed,
        base64Encoded,
        originalMimeType
      });
    }

    // Update total file count after filtering (important for progress tracking)
    updateJobProgress(jobId, {
      totalFiles: uploadedFiles.length
    });

    // Check total size limit
    const totalSize = uploadedFiles.reduce((sum, file) => sum + file.size, 0);
    const maxTotalSize = MAX_SITE_SIZE;

    if (totalSize > maxTotalSize) {
      throw new Error(`Total upload size ${(totalSize / 1024 / 1024).toFixed(2)}MB exceeds ${(maxTotalSize / 1024 / 1024).toFixed(0)}MB limit`);
    }

    // Check file count limit
    if (uploadedFiles.length > MAX_FILE_COUNT) {
      throw new Error(`File count ${uploadedFiles.length} exceeds ${MAX_FILE_COUNT} files limit`);
    }

    console.log(`After filtering: ${uploadedFiles.length} files to process (${skippedFiles.length} skipped)`);

    if (uploadedFiles.length === 0) {
      // Create empty manifest
      const emptyManifest = {
        $type: 'place.wisp.fs',
        site: siteName,
        root: {
          type: 'directory',
          entries: []
        },
        fileCount: 0,
        createdAt: new Date().toISOString()
      };

      const validationResult = validateRecord(emptyManifest);
      if (!validationResult.success) {
        throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
      }

      const rkey = siteName;
      updateJobProgress(jobId, { phase: 'finalizing' });

      const record = await agent.com.atproto.repo.putRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey,
        record: emptyManifest
      });

      await upsertSite(did, rkey, siteName);

      completeUploadJob(jobId, {
        success: true,
        uri: record.data.uri,
        cid: record.data.cid,
        fileCount: 0,
        siteName,
        skippedFiles
      });
      return;
    }

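    // For reference, the manifest assembled and written below is roughly shaped like this
    // (illustrative sketch only — exact field names for file nodes come from the
    // place.wisp.fs lexicon):
    //   {
    //     $type: 'place.wisp.fs',
    //     site: siteName,
    //     root: {
    //       type: 'directory',
    //       entries: [
    //         { name: 'index.html', node: { type: 'file', /* blob ref + encoding/base64 flags */ } },
    //         { name: 'assets', node: { type: 'directory', entries: [/* ... */] } },
    //         { name: 'docs', node: { type: 'subfs', subject: 'at://<did>/place.wisp.subfs/<rkey>' } }
    //       ]
    //     },
    //     fileCount: <number of files>,
    //     createdAt: <ISO timestamp>
    //   }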
    // Process files into directory structure
    console.log('Processing uploaded files into directory structure...');
    const validUploadedFiles = uploadedFiles.filter((f, i) => {
      if (!f || !f.name || !f.content) {
        console.error(`Filtering out invalid file at index ${i}`);
        return false;
      }
      return true;
    });

    const { directory, fileCount } = processUploadedFiles(validUploadedFiles);
    console.log('Directory structure created, file count:', fileCount);

    // Upload files as blobs with retry logic for DPoP nonce conflicts
    console.log('Starting blob upload/reuse phase...');
    updateJobProgress(jobId, { phase: 'uploading' });

    // Helper function to upload blob with exponential backoff retry and timeout
    const uploadBlobWithRetry = async (
      agent: Agent,
      content: Buffer,
      mimeType: string,
      fileName: string,
      maxRetries = 5
    ) => {
      for (let attempt = 0; attempt < maxRetries; attempt++) {
        const controller = new AbortController();
        const timeoutMs = 300000; // 5 minute timeout per upload
        const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

        try {
          console.log(`[File Upload] Starting upload attempt ${attempt + 1}/${maxRetries} for ${fileName} (${content.length} bytes, ${mimeType})`);

          // Race the upload against the timeout signal so a stalled request surfaces as an
          // 'Upload timeout' error and goes through the retry handling below. The underlying
          // request is not aborted; it is simply no longer awaited.
          const uploadPromise = agent.com.atproto.repo.uploadBlob(content, { encoding: mimeType });
          uploadPromise.catch(() => {}); // avoid an unhandled rejection if the timeout wins the race
          const result = await Promise.race([
            uploadPromise,
            new Promise<never>((_, reject) =>
              controller.signal.addEventListener('abort', () => reject(new Error('Upload timeout')))
            )
          ]);
          clearTimeout(timeoutId);
          console.log(`[File Upload] ✅ Successfully uploaded ${fileName} on attempt ${attempt + 1}`);
          return result;
        } catch (error: any) {
          clearTimeout(timeoutId);

          const isDPoPNonceError =
            error?.message?.toLowerCase().includes('nonce') ||
            error?.message?.toLowerCase().includes('dpop') ||
            error?.status === 409;

          const isTimeout = error?.name === 'AbortError' || error?.message === 'Upload timeout';
          const isRateLimited = error?.status === 429 || error?.message?.toLowerCase().includes('rate');
          const isRequestEntityTooLarge = error?.status === 419 || error?.status === 413;

          // Special handling for 413/419 "request entity too large" responses
          if (isRequestEntityTooLarge) {
            const customError = new Error('Your PDS is not allowing uploads large enough to store your site. Please contact your PDS host. This can also happen when the PDS sits behind the Cloudflare free tier.');
            (customError as any).status = 419;
            throw customError;
          }

          // Retry on DPoP nonce conflicts, timeouts, or rate limits
          if ((isDPoPNonceError || isTimeout || isRateLimited) && attempt < maxRetries - 1) {
            let backoffMs: number;
            if (isRateLimited) {
              backoffMs = 2000 * Math.pow(2, attempt); // 2s, 4s, 8s, 16s for rate limits
            } else if (isTimeout) {
              backoffMs = 1000 * Math.pow(2, attempt); // 1s, 2s, 4s, 8s for timeouts
            } else {
              backoffMs = 100 * Math.pow(2, attempt); // 100ms, 200ms, 400ms, 800ms for DPoP
            }

            const reason = isDPoPNonceError ? 'DPoP nonce conflict' : isTimeout ? 'timeout' : 'rate limit';
            logger.info(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms (attempt ${attempt + 1}/${maxRetries})`);
            console.log(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms`);
            await new Promise(resolve => setTimeout(resolve, backoffMs));
            continue;
          }

          // Log detailed error information before throwing
          logger.error(`[File Upload] ❌ Upload failed for ${fileName} (size: ${content.length} bytes, mimeType: ${mimeType}, attempt: ${attempt + 1}/${maxRetries})`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            headers: error?.headers,
            success: error?.success
          });
          console.error(`[File Upload] ❌ Upload failed for ${fileName}:`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            size: content.length,
            mimeType,
            attempt: attempt + 1
          });
          throw error;
        }
      }
      throw new Error(`Failed to upload ${fileName} after ${maxRetries} attempts`);
    };

    // Use sliding window concurrency for maximum throughput
    const CONCURRENCY_LIMIT = 20; // Maximum concurrent uploads
    const uploadedBlobs: Array<{
      result: FileUploadResult;
      filePath: string;
      sentMimeType: string;
      returnedMimeType: string;
      reused: boolean;
    }> = [];
    const failedFiles: Array<{
      name: string;
      index: number;
      error: string;
      size: number;
    }> = [];

    // Track completed files count for accurate progress
    let completedFilesCount = 0;

    // Process file with sliding window concurrency
    const processFile = async (file: UploadedFile, index: number) => {
      try {
        if (!file || !file.name) {
          throw new Error(`Undefined file at index ${index}`);
        }

        const fileCID = computeCID(file.content);
        const normalizedPath = file.name.replace(/^[^\/]*\//, '');
        const existingBlob = existingBlobMap.get(normalizedPath) || existingBlobMap.get(file.name);

        if (existingBlob && existingBlob.cid === fileCID) {
          logger.info(`[File Upload] ♻️ Reused: ${file.name} (unchanged, CID: ${fileCID})`);
          const reusedCount = (getUploadJob(jobId)?.progress.filesReused || 0) + 1;
          completedFilesCount++;
          updateJobProgress(jobId, {
            filesReused: reusedCount,
            currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (reused)`
          });

          return {
            result: {
              hash: existingBlob.cid,
              blobRef: existingBlob.blobRef,
              ...(file.compressed && {
                encoding: 'gzip' as const,
                mimeType: file.originalMimeType || file.mimeType,
                base64: file.base64Encoded || false
              })
            },
            filePath: file.name,
            sentMimeType: file.mimeType,
            returnedMimeType: existingBlob.blobRef.mimeType,
            reused: true
          };
        }

        const uploadMimeType = file.compressed || file.mimeType.startsWith('text/html')
          ? 'application/octet-stream'
          : file.mimeType;

        const compressionInfo = file.compressed ? ' (gzipped)' : '';
        const fileSizeMB = (file.size / 1024 / 1024).toFixed(2);
        logger.info(`[File Upload] ⬆️ Uploading: ${file.name} (${fileSizeMB}MB${compressionInfo})`);

        const uploadResult = await uploadBlobWithRetry(
          agent,
          file.content,
          uploadMimeType,
          file.name
        );

        const returnedBlobRef = uploadResult.data.blob;
        const uploadedCount = (getUploadJob(jobId)?.progress.filesUploaded || 0) + 1;
        completedFilesCount++;
        updateJobProgress(jobId, {
          filesUploaded: uploadedCount,
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (uploaded)`
        });
        logger.info(`[File Upload] ✅ Uploaded: ${file.name} (CID: ${fileCID})`);

        return {
          result: {
            hash: returnedBlobRef.ref.toString(),
            blobRef: returnedBlobRef,
            ...(file.compressed && {
              encoding: 'gzip' as const,
              mimeType: file.originalMimeType || file.mimeType,
              base64: file.base64Encoded || false
            })
          },
          filePath: file.name,
          sentMimeType: file.mimeType,
          returnedMimeType: returnedBlobRef.mimeType,
          reused: false
        };
      } catch (uploadError) {
        const fileName = file?.name || 'unknown';
        const fileSize = file?.size || 0;
        const errorMessage = uploadError instanceof Error ? uploadError.message : 'Unknown error';
        const errorDetails = {
          fileName,
          fileSize,
          index,
          error: errorMessage,
          stack: uploadError instanceof Error ? uploadError.stack : undefined
        };
        logger.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);
        console.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);

        completedFilesCount++;
        updateJobProgress(jobId, {
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${fileName} (failed)`
        });

        // Track failed file but don't throw - continue with other files
        failedFiles.push({
          name: fileName,
          index,
          error: errorMessage,
          size: fileSize
        });

        return null; // Return null to indicate failure
      }
    };

    // Sliding window concurrency control
    const processWithConcurrency = async () => {
      const results: any[] = [];
      let fileIndex = 0;
      const executing = new Map<Promise<void>, { index: number; name: string }>();

      for (const file of validUploadedFiles) {
        const currentIndex = fileIndex++;

        const promise = processFile(file, currentIndex)
          .then(result => {
            results[currentIndex] = result;
          })
          .catch(error => {
            // This shouldn't happen since processFile catches errors, but just in case
            logger.error(`Unexpected error processing file at index ${currentIndex}`, error);
            results[currentIndex] = null;
          })
          .finally(() => {
            executing.delete(promise);
          });

        executing.set(promise, { index: currentIndex, name: file.name });

        if (executing.size >= CONCURRENCY_LIMIT) {
          await Promise.race(executing.keys());
        }
      }

      // Wait for remaining uploads
      await Promise.all(executing.keys());
      console.log(`\n✅ Upload complete: ${completedFilesCount}/${validUploadedFiles.length} files processed\n`);
      return results.filter(r => r !== undefined && r !== null); // Filter out null (failed) and undefined entries
    };

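    // With CONCURRENCY_LIMIT = 20 this keeps at most 20 uploads in flight: Promise.race
    // resolves as soon as any one settles, which admits the next file, so a single slow
    // file never stalls the whole batch the way fixed-size Promise.all chunks would.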
    const allResults = await processWithConcurrency();
    uploadedBlobs.push(...allResults);

    const reusedCount = uploadedBlobs.filter(b => b.reused).length;
    const uploadedCount = uploadedBlobs.filter(b => !b.reused).length;
    const successfulCount = uploadedBlobs.length;
    const failedCount = failedFiles.length;

    logger.info(`[File Upload] 🎉 Upload phase complete → ${successfulCount}/${validUploadedFiles.length} files succeeded (${uploadedCount} uploaded, ${reusedCount} reused), ${failedCount} failed`);

    if (failedCount > 0) {
      logger.warn(`[File Upload] ⚠️ Failed files:`, failedFiles);
      console.warn(`[File Upload] ⚠️ ${failedCount} files failed to upload:`, failedFiles.map(f => f.name).join(', '));
    }

    const uploadResults: FileUploadResult[] = uploadedBlobs.map(blob => blob.result);
    const filePaths: string[] = uploadedBlobs.map(blob => blob.filePath);

    // Update directory with file blobs (only for successfully uploaded files)
    console.log('Updating directory with blob references...');
    updateJobProgress(jobId, { phase: 'creating_manifest' });

    // Create a set of successfully uploaded paths for quick lookup
    const successfulPaths = new Set(filePaths.map(path => path.replace(/^[^\/]*\//, '')));

    const updatedDirectory = updateFileBlobs(directory, uploadResults, filePaths, '', successfulPaths);

    // Calculate actual file count (only successfully uploaded files)
    const actualFileCount = uploadedBlobs.length;

    // Check if we need to split into subfs records
    // Split proactively if we have lots of files to avoid hitting manifest size limits
    const MAX_MANIFEST_SIZE = 140 * 1024; // 140KB to be safe (PDS limit is 150KB)
    const FILE_COUNT_THRESHOLD = 250; // Start splitting at this many files
    const TARGET_FILE_COUNT = 200; // Try to keep main manifest under this many files
    const subfsRecords: Array<{ uri: string; path: string }> = [];
    let workingDirectory = updatedDirectory;
    let currentFileCount = actualFileCount;

    // Create initial manifest to check size
    let manifest = createManifest(siteName, workingDirectory, actualFileCount);
    let manifestSize = JSON.stringify(manifest).length;

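    // PDS record size limits cap how big a place.wisp.fs manifest can get, so oversized
    // sites are split: whole subtrees are written out as separate place.wisp.subfs records
    // and replaced in the main tree by a subfs node pointing at them (extractSubfsUris
    // above walks these references back on the next upload so their blobs can be reused).
    // The serving side is expected to resolve each subfs node by fetching the referenced
    // record and mounting its root at that path, or merging it flat when flat is set.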
    // Split if we have lots of files OR if manifest is already too large
    if (actualFileCount >= FILE_COUNT_THRESHOLD || manifestSize > MAX_MANIFEST_SIZE) {
      console.log(`⚠️ Large site detected (${actualFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB), splitting into subfs records...`);
      logger.info(`Large site with ${actualFileCount} files, splitting into subfs records`);

      // Keep splitting until manifest fits under limits (both size and file count)
      let attempts = 0;
      const MAX_ATTEMPTS = 100; // Allow many splits for very large sites

      while ((manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) && attempts < MAX_ATTEMPTS) {
        attempts++;

        // Find all directories sorted by size (largest first)
        const directories = findLargeDirectories(workingDirectory);
        directories.sort((a, b) => b.size - a.size);

        // Check if we can split subdirectories or need to split flat files
        if (directories.length > 0) {
          // Split the largest subdirectory
          const largestDir = directories[0];
          console.log(`  Split #${attempts}: ${largestDir.path} (${largestDir.fileCount} files, ${(largestDir.size / 1024).toFixed(1)}KB)`);

          // Create a subfs record for this directory
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: largestDir.directory,
            fileCount: largestDir.fileCount,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          const subfsValidation = validateSubfsRecord(subfsManifest);
          if (!subfsValidation.success) {
            throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          subfsRecords.push({ uri: subfsUri, path: largestDir.path });
          console.log(`  ✅ Created subfs: ${subfsUri}`);
          logger.info(`Created subfs record for ${largestDir.path}: ${subfsUri}`);

          // Replace directory with subfs node in the main tree
          workingDirectory = replaceDirectoryWithSubfs(workingDirectory, largestDir.path, subfsUri);
          currentFileCount -= largestDir.fileCount;
        } else {
          // No subdirectories - split flat files at root level
          const rootFiles = workingDirectory.entries.filter(e => 'type' in e.node && e.node.type === 'file');

          if (rootFiles.length === 0) {
            throw new Error(
              `Cannot split manifest further - no files or directories available. ` +
              `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB.`
            );
          }

          // Take a chunk of files (aim for ~100 files per chunk)
          const CHUNK_SIZE = 100;
          const chunkFiles = rootFiles.slice(0, Math.min(CHUNK_SIZE, rootFiles.length));
          console.log(`  Split #${attempts}: flat root (${chunkFiles.length} files)`);

          // Create a directory with just these files
          const chunkDirectory: Directory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: chunkFiles
          };

          // Create subfs record for this chunk
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: chunkDirectory,
            fileCount: chunkFiles.length,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          const subfsValidation = validateSubfsRecord(subfsManifest);
          if (!subfsValidation.success) {
            throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          console.log(`  ✅ Created flat subfs: ${subfsUri}`);
          logger.info(`Created flat subfs record with ${chunkFiles.length} files: ${subfsUri}`);

          // Remove these files from the working directory and add a subfs entry
          const remainingEntries = workingDirectory.entries.filter(
            e => !chunkFiles.some(cf => cf.name === e.name)
          );

          // Add subfs entry (will be merged flat when expanded)
          remainingEntries.push({
            name: `__subfs_${attempts}`, // Placeholder name, will be merged away
            node: {
              $type: 'place.wisp.fs#subfs' as const,
              type: 'subfs' as const,
              subject: subfsUri,
              flat: true // Merge entries directly into parent (default, but explicit for clarity)
            }
          });

          workingDirectory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: remainingEntries
          };

          subfsRecords.push({ uri: subfsUri, path: `__subfs_${attempts}` });
          currentFileCount -= chunkFiles.length;
        }

        // Recreate manifest and check new size
        manifest = createManifest(siteName, workingDirectory, currentFileCount);
        manifestSize = JSON.stringify(manifest).length;
        const newSizeKB = (manifestSize / 1024).toFixed(1);
        console.log(`  → Manifest now ${newSizeKB}KB with ${currentFileCount} files (${subfsRecords.length} subfs total)`);

        // Check if we're under both limits now
        if (manifestSize <= MAX_MANIFEST_SIZE && currentFileCount <= TARGET_FILE_COUNT) {
          console.log(`  ✅ Manifest fits! (${currentFileCount} files, ${newSizeKB}KB)`);
          break;
        }
      }

      if (manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) {
        throw new Error(
          `Failed to fit manifest after splitting ${attempts} directories. ` +
          `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB. ` +
          `This should never happen - please report this issue.`
        );
      }

      console.log(`✅ Split complete: ${subfsRecords.length} subfs records, ${currentFileCount} files in main, ${(manifestSize / 1024).toFixed(1)}KB manifest`);
      logger.info(`Split into ${subfsRecords.length} subfs records, ${currentFileCount} files remaining in main tree`);
    } else {
      const manifestSizeKB = (manifestSize / 1024).toFixed(1);
      console.log(`Manifest created (${fileCount} files, ${manifestSizeKB}KB JSON) - no splitting needed`);
    }

    const rkey = siteName;
    updateJobProgress(jobId, { phase: 'finalizing' });

    console.log('Putting record to PDS with rkey:', rkey);
    const record = await agent.com.atproto.repo.putRecord({
      repo: did,
      collection: 'place.wisp.fs',
      rkey: rkey,
      record: manifest
    });
    console.log('Record successfully created on PDS:', record.data.uri);

    // Store site in database cache
    await upsertSite(did, rkey, siteName);

    // Clean up old subfs records if we had any
    if (oldSubfsUris.length > 0) {
      console.log(`Cleaning up ${oldSubfsUris.length} old subfs records...`);
      logger.info(`Cleaning up ${oldSubfsUris.length} old subfs records`);

      // Delete old subfs records in parallel (don't wait for completion)
      Promise.all(
        oldSubfsUris.map(async ({ uri }) => {
          try {
            // Parse URI: at://did/collection/rkey
            const parts = uri.replace('at://', '').split('/');
            const subRkey = parts[2];

            await agent.com.atproto.repo.deleteRecord({
              repo: did,
              collection: 'place.wisp.subfs',
              rkey: subRkey
            });

            console.log(`  🗑️ Deleted old subfs: ${uri}`);
            logger.info(`Deleted old subfs record: ${uri}`);
          } catch (err: any) {
            // Don't fail the whole upload if cleanup fails
            console.warn(`Failed to delete old subfs ${uri}:`, err?.message);
            logger.warn(`Failed to delete old subfs ${uri}`, err);
          }
        })
      ).catch(err => {
        // Log but don't fail if cleanup fails
        logger.warn('Some subfs cleanup operations failed', err);
      });
    }

    completeUploadJob(jobId, {
      success: true,
      uri: record.data.uri,
      cid: record.data.cid,
      fileCount,
      siteName,
      skippedFiles,
      failedFiles,
      uploadedCount: validUploadedFiles.length - failedFiles.length,
      hasFailures: failedFiles.length > 0
    });

    console.log('=== UPLOAD FILES COMPLETE ===');
  } catch (error) {
    console.error('=== UPLOAD ERROR ===');
    console.error('Error details:', error);
    logger.error('Upload error', error);
    failUploadJob(jobId, error instanceof Error ? error.message : 'Unknown error');
  }
}

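/**
 * Routes mounted under /wisp:
 *   GET  /wisp/upload-progress/:jobId — Server-Sent Events stream with progress for an upload job
 *   POST /wisp/upload-files           — starts a background upload job and returns its jobId
 * Both routes require an authenticated session (the signed `did` cookie checked by requireAuth).
 */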
export const wispRoutes = (client: NodeOAuthClient, cookieSecret: string) =>
  new Elysia({
    prefix: '/wisp',
    cookie: {
      secrets: cookieSecret,
      sign: ['did']
    }
  })
    .derive(async ({ cookie }) => {
      const auth = await requireAuth(client, cookie)
      return { auth }
    })
    .get(
      '/upload-progress/:jobId',
      async ({ params: { jobId }, auth, set }) => {
        const job = getUploadJob(jobId);

        if (!job) {
          set.status = 404;
          return { error: 'Job not found' };
        }

        // Verify job belongs to authenticated user
        if (job.did !== auth.did) {
          set.status = 403;
          return { error: 'Unauthorized' };
        }

        // Set up SSE headers
        set.headers = {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Connection': 'keep-alive'
        };

        let keepaliveInterval: ReturnType<typeof setInterval> | undefined;
        let removeJobListener: (() => void) | undefined;

        const stream = new ReadableStream({
          start(controller) {
            const encoder = new TextEncoder();

            // Send initial state
            const sendEvent = (event: string, data: any) => {
              try {
                const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
                controller.enqueue(encoder.encode(message));
              } catch (err) {
                // Controller closed, ignore
              }
            };

            // Send keepalive comment every 15 seconds to prevent timeout
            keepaliveInterval = setInterval(() => {
              try {
                controller.enqueue(encoder.encode(': keepalive\n\n'));
              } catch (err) {
                // Controller closed, stop sending keepalives
                clearInterval(keepaliveInterval);
              }
            }, 15000);

            // Send current job state immediately
            sendEvent('progress', {
              status: job.status,
              progress: job.progress,
              result: job.result,
              error: job.error
            });

            // If job is already completed or failed, close the stream
            if (job.status === 'completed' || job.status === 'failed') {
              clearInterval(keepaliveInterval);
              controller.close();
              return;
            }

            // Listen for updates
            removeJobListener = addJobListener(jobId, (event, data) => {
              sendEvent(event, data);

              // Close stream after done or error event
              if (event === 'done' || event === 'error') {
                clearInterval(keepaliveInterval);
                setTimeout(() => {
                  try {
                    controller.close();
                  } catch (err) {
                    // Already closed
                  }
                }, 100);
              }
            });
          },
          cancel() {
            // Client disconnected: stop keepalives and detach the job listener
            clearInterval(keepaliveInterval);
            removeJobListener?.();
          }
        });

        return new Response(stream);
      }
    )
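    // Browser-side consumption sketch (illustrative only, not part of this module; assumes a
    // same-origin page so the signed `did` cookie is sent automatically, and `render` is a
    // placeholder for whatever updates the UI):
    //
    //   const es = new EventSource(`/wisp/upload-progress/${jobId}`);
    //   es.addEventListener('progress', (e) => render(JSON.parse(e.data)));
    //   es.addEventListener('done', (e) => { render(JSON.parse(e.data)); es.close(); });
    //   es.addEventListener('error', () => es.close());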
    .post(
      '/upload-files',
      async ({ body, auth }) => {
        const { siteName, files } = body as {
          siteName: string;
          files: File | File[]
        };

        console.log('=== UPLOAD FILES START ===');
        console.log('Site name:', siteName);
        console.log('Files received:', Array.isArray(files) ? files.length : 'single file');

        try {
          if (!siteName) {
            throw new Error('Site name is required')
          }

          if (!isValidSiteName(siteName)) {
            throw new Error('Invalid site name: must be 1-512 characters and contain only alphanumeric characters, dots, dashes, underscores, tildes, and colons')
          }

          // Check if files were provided
          const hasFiles = files && (Array.isArray(files) ? files.length > 0 : !!files);

          if (!hasFiles) {
            // Handle empty upload synchronously (fast operation)
            const agent = new Agent((url, init) => auth.session.fetchHandler(url, init))

            const emptyManifest = {
              $type: 'place.wisp.fs',
              site: siteName,
              root: {
                type: 'directory',
                entries: []
              },
              fileCount: 0,
              createdAt: new Date().toISOString()
            };

            const validationResult = validateRecord(emptyManifest);
            if (!validationResult.success) {
              throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
            }

            const rkey = siteName;

            const record = await agent.com.atproto.repo.putRecord({
              repo: auth.did,
              collection: 'place.wisp.fs',
              rkey: rkey,
              record: emptyManifest
            });

            await upsertSite(auth.did, rkey, siteName);

            return {
              success: true,
              uri: record.data.uri,
              cid: record.data.cid,
              fileCount: 0,
              siteName
            };
          }

          // For file uploads, create a job and process in background
          const fileArray = Array.isArray(files) ? files : [files];
          const jobId = createUploadJob(auth.did, siteName, fileArray.length);

          // Track upload speeds to estimate progress
          const uploadStats = {
            speeds: [] as number[], // MB/s from completed uploads
            getAverageSpeed(): number {
              if (this.speeds.length === 0) return 3; // Default 3 MB/s
              const sum = this.speeds.reduce((a, b) => a + b, 0);
              return sum / this.speeds.length;
            }
          };

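          // Worked example of the estimate below: with no history the default is 3 MB/s, so a
          // 45MB blob gets an estimated duration of ~15s; the 5-second interval then logs
          // ~33%, ~66%, then caps at 95% until the real upload finishes, and the measured
          // speed joins the rolling window of the last 10 uploads.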
          // Create agent with OAuth session and upload progress monitoring
          const wrappedFetchHandler = async (url: string, init?: RequestInit) => {
            // Check if this is an uploadBlob request with a body
            if (url.includes('uploadBlob') && init?.body) {
              const originalBody = init.body;
              const bodySize = originalBody instanceof Uint8Array ? originalBody.length :
                originalBody instanceof ArrayBuffer ? originalBody.byteLength :
                typeof originalBody === 'string' ? new TextEncoder().encode(originalBody).length : 0;

              const startTime = Date.now();

              if (bodySize > 10 * 1024 * 1024) { // Files over 10MB
                const sizeMB = (bodySize / 1024 / 1024).toFixed(1);
                const avgSpeed = uploadStats.getAverageSpeed();
                const estimatedDuration = (bodySize / 1024 / 1024) / avgSpeed;

                console.log(`[Upload Progress] Starting upload of ${sizeMB}MB file`);
                console.log(`[Upload Stats] Measured speeds from last ${uploadStats.speeds.length} files:`, uploadStats.speeds.map(s => s.toFixed(2) + ' MB/s').join(', '));
                console.log(`[Upload Stats] Average speed: ${avgSpeed.toFixed(2)} MB/s, estimated duration: ${estimatedDuration.toFixed(0)}s`);

                // Log estimated progress every 5 seconds
                const progressInterval = setInterval(() => {
                  const elapsed = (Date.now() - startTime) / 1000;
                  const estimatedPercent = Math.min(95, Math.round((elapsed / estimatedDuration) * 100));
                  const estimatedMB = Math.min(bodySize / 1024 / 1024, elapsed * avgSpeed).toFixed(1);
                  console.log(`[Upload Progress] ~${estimatedPercent}% (~${estimatedMB}/${sizeMB}MB) - ${elapsed.toFixed(0)}s elapsed`);
                }, 5000);

                try {
                  const result = await auth.session.fetchHandler(url, init);
                  clearInterval(progressInterval);
                  const totalTime = (Date.now() - startTime) / 1000;
                  const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                  uploadStats.speeds.push(actualSpeed);
                  // Keep only last 10 uploads for rolling average
                  if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                  console.log(`[Upload Progress] ✅ Completed ${sizeMB}MB in ${totalTime.toFixed(1)}s (${actualSpeed.toFixed(1)} MB/s)`);
                  return result;
                } catch (err) {
                  clearInterval(progressInterval);
                  const elapsed = (Date.now() - startTime) / 1000;
                  console.error(`[Upload Progress] ❌ Upload failed after ${elapsed.toFixed(1)}s`);
                  throw err;
                }
              } else {
                // Track small files too for speed calculation
                try {
                  const result = await auth.session.fetchHandler(url, init);
                  const totalTime = (Date.now() - startTime) / 1000;
                  if (totalTime > 0.5) { // Only track if > 0.5s
                    const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                    uploadStats.speeds.push(actualSpeed);
                    if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                    console.log(`[Upload Stats] Small file: ${(bodySize / 1024).toFixed(1)}KB in ${totalTime.toFixed(2)}s = ${actualSpeed.toFixed(2)} MB/s`);
                  }
                  return result;
                } catch (err) {
                  throw err;
                }
              }
            }

            // Normal request
            return auth.session.fetchHandler(url, init);
          };

          const agent = new Agent(wrappedFetchHandler)
          console.log('Agent created for DID:', auth.did);
          console.log('Created upload job:', jobId);

          // Start background processing (don't await)
          processUploadInBackground(jobId, agent, auth.did, siteName, fileArray).catch(err => {
            console.error('Background upload process failed:', err);
            logger.error('Background upload process failed', err);
          });

          // Return immediately with job ID
          return {
            success: true,
            jobId,
            message: 'Upload started. Connect to /wisp/upload-progress/' + jobId + ' for progress updates.'
          };
        } catch (error) {
          console.error('=== UPLOAD ERROR ===');
          console.error('Error details:', error);
          logger.error('Upload error', error);
          throw new Error(`Failed to upload files: ${error instanceof Error ? error.message : 'Unknown error'}`);
        }
      }
    )
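// Typical wiring (illustrative sketch — the real server entrypoint lives elsewhere in the
// monorepo): an Elysia app would mount these routes with something like
//   new Elysia().use(wispRoutes(oauthClient, cookieSecret)).listen(3000)
// where oauthClient is the NodeOAuthClient instance and cookieSecret signs the `did` cookie.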