Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
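Sites are published as `place.wisp.fs` records in the owner's repo, with each file stored as a blob on their PDS; the upload route below builds that record. As a rough sketch (field names taken from the empty-manifest path in the route; the file-entry shape produced by `createManifest` in `../lib/wisp-utils` is abbreviated and partly assumed):

// Minimal shape of a place.wisp.fs record as written by the upload route below.
// For a non-empty site, `entries` holds file nodes (pointing at uploaded blobs),
// nested directories, or subfs references to place.wisp.subfs records when the
// manifest has been split (nested node shapes are only sketched here).
const exampleManifest = {
  $type: 'place.wisp.fs',
  site: 'my-site',            // also used as the record key (rkey)
  root: {
    type: 'directory',
    entries: []               // file / directory / subfs nodes
  },
  fileCount: 0,
  createdAt: new Date().toISOString()
};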
import { Elysia } from 'elysia'
import { requireAuth, type AuthenticatedContext } from '../lib/wisp-auth'
import { NodeOAuthClient } from '@atproto/oauth-client-node'
import { Agent } from '@atproto/api'
import { TID } from '@atproto/common-web'
import {
  type UploadedFile,
  type FileUploadResult,
  processUploadedFiles,
  createManifest,
  updateFileBlobs,
  shouldCompressFile,
  compressFile,
  computeCID,
  extractBlobMap,
  extractSubfsUris,
  findLargeDirectories,
  replaceDirectoryWithSubfs,
  estimateDirectorySize
} from '../lib/wisp-utils'
import { upsertSite } from '../lib/db'
import { logger } from '../lib/observability'
// `Directory` is used below when building flat subfs chunks (assumed to be exported by the generated lexicon types)
import { validateRecord, type Directory } from '../lexicons/types/place/wisp/fs'
import { validateRecord as validateSubfsRecord } from '../lexicons/types/place/wisp/subfs'
import { MAX_SITE_SIZE, MAX_FILE_SIZE, MAX_FILE_COUNT } from '../lib/constants'
import {
  createUploadJob,
  getUploadJob,
  updateJobProgress,
  completeUploadJob,
  failUploadJob,
  addJobListener
} from '../lib/upload-jobs'

function isValidSiteName(siteName: string): boolean {
  if (!siteName || typeof siteName !== 'string') return false;

  // Length check (AT Protocol rkey limit)
  if (siteName.length < 1 || siteName.length > 512) return false;

  // Check for path traversal
  if (siteName === '.' || siteName === '..') return false;
  if (siteName.includes('/') || siteName.includes('\\')) return false;
  if (siteName.includes('\0')) return false;

  // AT Protocol rkey format: alphanumeric, dots, dashes, underscores, tildes, colons
  // Based on NSID format rules
  const validRkeyPattern = /^[a-zA-Z0-9._~:-]+$/;
  if (!validRkeyPattern.test(siteName)) return false;

  return true;
}

async function processUploadInBackground(
  jobId: string,
  agent: Agent,
  did: string,
  siteName: string,
  fileArray: File[]
): Promise<void> {
  try {
    // Try to fetch existing record to enable incremental updates
    let existingBlobMap = new Map<string, { blobRef: any; cid: string }>();
    let oldSubfsUris: Array<{ uri: string; path: string }> = [];
    console.log('Attempting to fetch existing record...');
    updateJobProgress(jobId, { phase: 'validating' });

    try {
      const rkey = siteName;
      const existingRecord = await agent.com.atproto.repo.getRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey
      });
      console.log('Existing record found!');

      if (existingRecord.data.value && typeof existingRecord.data.value === 'object' && 'root' in existingRecord.data.value) {
        const manifest = existingRecord.data.value as any;

        // Extract blob map from main record
        existingBlobMap = extractBlobMap(manifest.root);
        console.log(`Found existing manifest with ${existingBlobMap.size} files in main record`);

        // Extract subfs URIs with their mount paths from main record
        const subfsUris = extractSubfsUris(manifest.root);
        oldSubfsUris = subfsUris; // Save for cleanup later

        if (subfsUris.length > 0) {
          console.log(`Found ${subfsUris.length} subfs records, fetching in parallel...`);
          logger.info(`Fetching ${subfsUris.length} subfs records for blob reuse`);

          // Fetch all subfs records in parallel
          const subfsRecords = await Promise.all(
            subfsUris.map(async ({ uri, path }) => {
              try {
                // Parse URI: at://did/collection/rkey
                const parts = uri.replace('at://', '').split('/');
                const subDid = parts[0];
                const collection = parts[1];
                const subRkey = parts[2];

                const record = await agent.com.atproto.repo.getRecord({
                  repo: subDid,
                  collection: collection,
                  rkey: subRkey
                });

                return { record: record.data.value as any, mountPath: path };
              } catch (err: any) {
                logger.warn(`Failed to fetch subfs record ${uri}: ${err?.message}`, err);
                return null;
              }
            })
          );

          // Merge blob maps from all subfs records
          let totalSubfsBlobs = 0;
          for (const subfsData of subfsRecords) {
            if (subfsData && subfsData.record && 'root' in subfsData.record) {
              // Extract blobs with the correct mount path prefix
              const subfsMap = extractBlobMap(subfsData.record.root, subfsData.mountPath);
              subfsMap.forEach((value, key) => {
                existingBlobMap.set(key, value);
                totalSubfsBlobs++;
              });
            }
          }

          console.log(`Merged ${totalSubfsBlobs} files from ${subfsUris.length} subfs records`);
          logger.info(`Total blob map: ${existingBlobMap.size} files (main + subfs)`);
        }

        console.log(`Total existing blobs for reuse: ${existingBlobMap.size} files`);
        logger.info(`Found existing manifest with ${existingBlobMap.size} files for incremental update`);
      }
    } catch (error: any) {
      console.log('No existing record found or error:', error?.message || error);
      if (error?.status !== 400 && error?.error !== 'RecordNotFound') {
        logger.warn('Failed to fetch existing record, proceeding with full upload', error);
      }
    }

    // Convert File objects to UploadedFile format
    const uploadedFiles: UploadedFile[] = [];
    const skippedFiles: Array<{ name: string; reason: string }> = [];

    console.log('Processing files, count:', fileArray.length);
    updateJobProgress(jobId, { phase: 'compressing' });

    for (let i = 0; i < fileArray.length; i++) {
      const file = fileArray[i];

      // Skip undefined/null files
      if (!file || !file.name) {
        console.log(`Skipping undefined file at index ${i}`);
        skippedFiles.push({
          name: `[undefined file at index ${i}]`,
          reason: 'Invalid file object'
        });
        continue;
      }

      console.log(`Processing file ${i + 1}/${fileArray.length}:`, file.name, file.size, 'bytes');
      updateJobProgress(jobId, {
        filesProcessed: i + 1,
        currentFile: file.name
      });

      // Skip .git directory files
      const normalizedPath = file.name.replace(/^[^\/]*\//, '');
      if (normalizedPath.startsWith('.git/') || normalizedPath === '.git') {
        console.log(`Skipping .git file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: '.git directory excluded'
        });
        continue;
      }

      // Skip files that are too large
      const maxSize = MAX_FILE_SIZE;
      if (file.size > maxSize) {
        skippedFiles.push({
          name: file.name,
          reason: `file too large (${(file.size / 1024 / 1024).toFixed(2)}MB, max ${(MAX_FILE_SIZE / 1024 / 1024).toFixed(0)}MB)`
        });
        continue;
      }

      const arrayBuffer = await file.arrayBuffer();
      const originalContent = Buffer.from(arrayBuffer);
      const originalMimeType = file.type || 'application/octet-stream';

      // Determine if file should be compressed
      const shouldCompress = shouldCompressFile(originalMimeType);

      // Text files (HTML/CSS/JS) need base64 encoding to prevent PDS content sniffing
      // Audio files just need compression without base64
      const needsBase64 =
        originalMimeType.startsWith('text/') ||
        originalMimeType.startsWith('application/json') ||
        originalMimeType.startsWith('application/xml') ||
        originalMimeType === 'image/svg+xml';

      let finalContent: Buffer;
      let compressed = false;
      let base64Encoded = false;

      if (shouldCompress) {
        const compressedContent = compressFile(originalContent);
        compressed = true;

        if (needsBase64) {
          // Text files: compress AND base64 encode
          finalContent = Buffer.from(compressedContent.toString('base64'), 'binary');
          base64Encoded = true;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
          logger.info(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
        } else {
          // Audio files: just compress, no base64
          finalContent = compressedContent;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
          logger.info(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
        }
      } else {
        // Binary files: upload directly
        finalContent = originalContent;
        console.log(`Uploading ${file.name} directly: ${originalContent.length} bytes (no compression)`);
        logger.info(`Uploading ${file.name} directly: ${originalContent.length} bytes (binary)`);
      }

      uploadedFiles.push({
        name: file.name,
        content: finalContent,
        mimeType: originalMimeType,
        size: finalContent.length,
        compressed,
        base64Encoded,
        originalMimeType
      });
    }

    // Update total file count after filtering (important for progress tracking)
    updateJobProgress(jobId, {
      totalFiles: uploadedFiles.length
    });

    // Check total size limit
    const totalSize = uploadedFiles.reduce((sum, file) => sum + file.size, 0);
    const maxTotalSize = MAX_SITE_SIZE;

    if (totalSize > maxTotalSize) {
      throw new Error(`Total upload size ${(totalSize / 1024 / 1024).toFixed(2)}MB exceeds ${(MAX_SITE_SIZE / 1024 / 1024).toFixed(0)}MB limit`);
    }

    // Check file count limit
    if (uploadedFiles.length > MAX_FILE_COUNT) {
      throw new Error(`File count ${uploadedFiles.length} exceeds ${MAX_FILE_COUNT} files limit`);
    }

    console.log(`After filtering: ${uploadedFiles.length} files to process (${skippedFiles.length} skipped)`);

    if (uploadedFiles.length === 0) {
      // Create empty manifest
      const emptyManifest = {
        $type: 'place.wisp.fs',
        site: siteName,
        root: {
          type: 'directory',
          entries: []
        },
        fileCount: 0,
        createdAt: new Date().toISOString()
      };

      const validationResult = validateRecord(emptyManifest);
      if (!validationResult.success) {
        throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
      }

      const rkey = siteName;
      updateJobProgress(jobId, { phase: 'finalizing' });

      const record = await agent.com.atproto.repo.putRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey,
        record: emptyManifest
      });

      await upsertSite(did, rkey, siteName);

      completeUploadJob(jobId, {
        success: true,
        uri: record.data.uri,
        cid: record.data.cid,
        fileCount: 0,
        siteName,
        skippedFiles
      });
      return;
    }

    // Process files into directory structure
    console.log('Processing uploaded files into directory structure...');
    const validUploadedFiles = uploadedFiles.filter((f, i) => {
      if (!f || !f.name || !f.content) {
        console.error(`Filtering out invalid file at index ${i}`);
        return false;
      }
      return true;
    });

    const { directory, fileCount } = processUploadedFiles(validUploadedFiles);
    console.log('Directory structure created, file count:', fileCount);

    // Upload files as blobs with retry logic for DPoP nonce conflicts
    console.log('Starting blob upload/reuse phase...');
    updateJobProgress(jobId, { phase: 'uploading' });

    // Helper function to upload blob with exponential backoff retry and timeout
    const uploadBlobWithRetry = async (
      agent: Agent,
      content: Buffer,
      mimeType: string,
      fileName: string,
      maxRetries = 5
    ) => {
      for (let attempt = 0; attempt < maxRetries; attempt++) {
        const controller = new AbortController();
        const timeoutMs = 300000; // 5 minute timeout per upload
        // Note: the abort signal is never passed to uploadBlob, so as written this
        // timer does not actually cancel or time out the in-flight request.
        const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

        try {
          console.log(`[File Upload] Starting upload attempt ${attempt + 1}/${maxRetries} for ${fileName} (${content.length} bytes, ${mimeType})`);

          const result = await agent.com.atproto.repo.uploadBlob(content, { encoding: mimeType });
          clearTimeout(timeoutId);
          console.log(`[File Upload] ✅ Successfully uploaded ${fileName} on attempt ${attempt + 1}`);
          return result;
        } catch (error: any) {
          clearTimeout(timeoutId);

          const isDPoPNonceError =
            error?.message?.toLowerCase().includes('nonce') ||
            error?.message?.toLowerCase().includes('dpop') ||
            error?.status === 409;

          const isTimeout = error?.name === 'AbortError' || error?.message === 'Upload timeout';
          const isRateLimited = error?.status === 429 || error?.message?.toLowerCase().includes('rate');
          const isRequestEntityTooLarge = error?.status === 419 || error?.status === 413;

          // Special handling for 419/413 Request Entity Too Large errors
          if (isRequestEntityTooLarge) {
            const customError = new Error('Your PDS is not allowing uploads large enough to store your site. Please contact your PDS host. This could also possibly be a result of it being behind Cloudflare free tier.');
            (customError as any).status = 419;
            throw customError;
          }

          // Retry on DPoP nonce conflicts, timeouts, or rate limits
          if ((isDPoPNonceError || isTimeout || isRateLimited) && attempt < maxRetries - 1) {
            let backoffMs: number;
            if (isRateLimited) {
              backoffMs = 2000 * Math.pow(2, attempt); // 2s, 4s, 8s, 16s for rate limits
            } else if (isTimeout) {
              backoffMs = 1000 * Math.pow(2, attempt); // 1s, 2s, 4s, 8s for timeouts
            } else {
              backoffMs = 100 * Math.pow(2, attempt); // 100ms, 200ms, 400ms for DPoP
            }

            const reason = isDPoPNonceError ? 'DPoP nonce conflict' : isTimeout ? 'timeout' : 'rate limit';
            logger.info(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms (attempt ${attempt + 1}/${maxRetries})`);
            console.log(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms`);
            await new Promise(resolve => setTimeout(resolve, backoffMs));
            continue;
          }

          // Log detailed error information before throwing
          logger.error(`[File Upload] ❌ Upload failed for ${fileName} (size: ${content.length} bytes, mimeType: ${mimeType}, attempt: ${attempt + 1}/${maxRetries})`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            headers: error?.headers,
            success: error?.success
          });
          console.error(`[File Upload] ❌ Upload failed for ${fileName}:`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            size: content.length,
            mimeType,
            attempt: attempt + 1
          });
          throw error;
        }
      }
      throw new Error(`Failed to upload ${fileName} after ${maxRetries} attempts`);
    };

    // Use sliding window concurrency for maximum throughput
    const CONCURRENCY_LIMIT = 20; // Maximum concurrent uploads
    const uploadedBlobs: Array<{
      result: FileUploadResult;
      filePath: string;
      sentMimeType: string;
      returnedMimeType: string;
      reused: boolean;
    }> = [];
    const failedFiles: Array<{
      name: string;
      index: number;
      error: string;
      size: number;
    }> = [];

    // Track completed files count for accurate progress
    let completedFilesCount = 0;

    // Process file with sliding window concurrency
    const processFile = async (file: UploadedFile, index: number) => {
      try {
        if (!file || !file.name) {
          throw new Error(`Undefined file at index ${index}`);
        }

        const fileCID = computeCID(file.content);
        const normalizedPath = file.name.replace(/^[^\/]*\//, '');
        const existingBlob = existingBlobMap.get(normalizedPath) || existingBlobMap.get(file.name);

        if (existingBlob && existingBlob.cid === fileCID) {
          logger.info(`[File Upload] ♻️ Reused: ${file.name} (unchanged, CID: ${fileCID})`);
          const reusedCount = (getUploadJob(jobId)?.progress.filesReused || 0) + 1;
          completedFilesCount++;
          updateJobProgress(jobId, {
            filesReused: reusedCount,
            currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (reused)`
          });

          return {
            result: {
              hash: existingBlob.cid,
              blobRef: existingBlob.blobRef,
              ...(file.compressed && {
                encoding: 'gzip' as const,
                mimeType: file.originalMimeType || file.mimeType,
                base64: file.base64Encoded || false
              })
            },
            filePath: file.name,
            sentMimeType: file.mimeType,
            returnedMimeType: existingBlob.blobRef.mimeType,
            reused: true
          };
        }

        const uploadMimeType = file.compressed || file.mimeType.startsWith('text/html')
          ? 'application/octet-stream'
          : file.mimeType;

        const compressionInfo = file.compressed ? ' (gzipped)' : '';
        const fileSizeMB = (file.size / 1024 / 1024).toFixed(2);
        logger.info(`[File Upload] ⬆️ Uploading: ${file.name} (${fileSizeMB}MB${compressionInfo})`);

        const uploadResult = await uploadBlobWithRetry(
          agent,
          file.content,
          uploadMimeType,
          file.name
        );

        const returnedBlobRef = uploadResult.data.blob;
        const uploadedCount = (getUploadJob(jobId)?.progress.filesUploaded || 0) + 1;
        completedFilesCount++;
        updateJobProgress(jobId, {
          filesUploaded: uploadedCount,
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (uploaded)`
        });
        logger.info(`[File Upload] ✅ Uploaded: ${file.name} (CID: ${fileCID})`);

        return {
          result: {
            hash: returnedBlobRef.ref.toString(),
            blobRef: returnedBlobRef,
            ...(file.compressed && {
              encoding: 'gzip' as const,
              mimeType: file.originalMimeType || file.mimeType,
              base64: file.base64Encoded || false
            })
          },
          filePath: file.name,
          sentMimeType: file.mimeType,
          returnedMimeType: returnedBlobRef.mimeType,
          reused: false
        };
      } catch (uploadError) {
        const fileName = file?.name || 'unknown';
        const fileSize = file?.size || 0;
        const errorMessage = uploadError instanceof Error ? uploadError.message : 'Unknown error';
        const errorDetails = {
          fileName,
          fileSize,
          index,
          error: errorMessage,
          stack: uploadError instanceof Error ? uploadError.stack : undefined
        };
        logger.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);
        console.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);

        completedFilesCount++;
        updateJobProgress(jobId, {
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${fileName} (failed)`
        });

        // Track failed file but don't throw - continue with other files
        failedFiles.push({
          name: fileName,
          index,
          error: errorMessage,
          size: fileSize
        });

        return null; // Return null to indicate failure
      }
    };

    // Sliding window concurrency control
    const processWithConcurrency = async () => {
      const results: any[] = [];
      let fileIndex = 0;
      const executing = new Map<Promise<void>, { index: number; name: string }>();

      for (const file of validUploadedFiles) {
        const currentIndex = fileIndex++;

        const promise = processFile(file, currentIndex)
          .then(result => {
            results[currentIndex] = result;
          })
          .catch(error => {
            // This shouldn't happen since processFile catches errors, but just in case
            logger.error(`Unexpected error processing file at index ${currentIndex}`, error);
            results[currentIndex] = null;
          })
          .finally(() => {
            executing.delete(promise);
          });

        executing.set(promise, { index: currentIndex, name: file.name });

        if (executing.size >= CONCURRENCY_LIMIT) {
          await Promise.race(executing.keys());
        }
      }

      // Wait for remaining uploads
      await Promise.all(executing.keys());
      console.log(`\n✅ Upload complete: ${completedFilesCount}/${validUploadedFiles.length} files processed\n`);
      return results.filter(r => r !== undefined && r !== null); // Filter out null (failed) and undefined entries
    };

    const allResults = await processWithConcurrency();
    uploadedBlobs.push(...allResults);

    const currentReused = uploadedBlobs.filter(b => b.reused).length;
    const currentUploaded = uploadedBlobs.filter(b => !b.reused).length;
    const successfulCount = uploadedBlobs.length;
    const failedCount = failedFiles.length;

    logger.info(`[File Upload] 🎉 Upload complete → ${successfulCount}/${validUploadedFiles.length} files succeeded (${currentUploaded} uploaded, ${currentReused} reused), ${failedCount} failed`);

    if (failedCount > 0) {
      logger.warn(`[File Upload] ⚠️ Failed files:`, failedFiles);
      console.warn(`[File Upload] ⚠️ ${failedCount} files failed to upload:`, failedFiles.map(f => f.name).join(', '));
    }

    const reusedCount = uploadedBlobs.filter(b => b.reused).length;
    const uploadedCount = uploadedBlobs.filter(b => !b.reused).length;
    logger.info(`[File Upload] 🎉 Upload phase complete! Total: ${successfulCount} files (${uploadedCount} uploaded, ${reusedCount} reused)`);

    const uploadResults: FileUploadResult[] = uploadedBlobs.map(blob => blob.result);
    const filePaths: string[] = uploadedBlobs.map(blob => blob.filePath);

    // Update directory with file blobs (only for successfully uploaded files)
    console.log('Updating directory with blob references...');
    updateJobProgress(jobId, { phase: 'creating_manifest' });

    // Create a set of successfully uploaded paths for quick lookup
    const successfulPaths = new Set(filePaths.map(path => path.replace(/^[^\/]*\//, '')));

    const updatedDirectory = updateFileBlobs(directory, uploadResults, filePaths, '', successfulPaths);

    // Calculate actual file count (only successfully uploaded files)
    const actualFileCount = uploadedBlobs.length;

    // Check if we need to split into subfs records
    // Split proactively if we have lots of files to avoid hitting manifest size limits
    const MAX_MANIFEST_SIZE = 140 * 1024; // 140KB to be safe (PDS limit is 150KB)
    const FILE_COUNT_THRESHOLD = 250; // Start splitting at this many files
    const TARGET_FILE_COUNT = 200; // Try to keep main manifest under this many files
    const subfsRecords: Array<{ uri: string; path: string }> = [];
    let workingDirectory = updatedDirectory;
    let currentFileCount = actualFileCount;

    // Create initial manifest to check size
    let manifest = createManifest(siteName, workingDirectory, actualFileCount);
    let manifestSize = JSON.stringify(manifest).length;

    // Split if we have lots of files OR if manifest is already too large
    if (actualFileCount >= FILE_COUNT_THRESHOLD || manifestSize > MAX_MANIFEST_SIZE) {
      console.log(`⚠️ Large site detected (${actualFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB), splitting into subfs records...`);
      logger.info(`Large site with ${actualFileCount} files, splitting into subfs records`);

      // Keep splitting until manifest fits under limits (both size and file count)
      let attempts = 0;
      const MAX_ATTEMPTS = 100; // Allow many splits for very large sites

      while ((manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) && attempts < MAX_ATTEMPTS) {
        attempts++;

        // Find all directories sorted by size (largest first)
        const directories = findLargeDirectories(workingDirectory);
        directories.sort((a, b) => b.size - a.size);

        // Check if we can split subdirectories or need to split flat files
        if (directories.length > 0) {
          // Split the largest subdirectory
          const largestDir = directories[0];
          console.log(`  Split #${attempts}: ${largestDir.path} (${largestDir.fileCount} files, ${(largestDir.size / 1024).toFixed(1)}KB)`);

          // Create a subfs record for this directory
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: largestDir.directory,
            fileCount: largestDir.fileCount,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          const subfsValidation = validateSubfsRecord(subfsManifest);
          if (!subfsValidation.success) {
            throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          subfsRecords.push({ uri: subfsUri, path: largestDir.path });
          console.log(`  ✅ Created subfs: ${subfsUri}`);
          logger.info(`Created subfs record for ${largestDir.path}: ${subfsUri}`);

          // Replace directory with subfs node in the main tree
          workingDirectory = replaceDirectoryWithSubfs(workingDirectory, largestDir.path, subfsUri);
          currentFileCount -= largestDir.fileCount;
        } else {
          // No subdirectories - split flat files at root level
          const rootFiles = workingDirectory.entries.filter(e => 'type' in e.node && e.node.type === 'file');

          if (rootFiles.length === 0) {
            throw new Error(
              `Cannot split manifest further - no files or directories available. ` +
              `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB.`
            );
          }

          // Take a chunk of files (aim for ~100 files per chunk)
          const CHUNK_SIZE = 100;
          const chunkFiles = rootFiles.slice(0, Math.min(CHUNK_SIZE, rootFiles.length));
          console.log(`  Split #${attempts}: flat root (${chunkFiles.length} files)`);

          // Create a directory with just these files
          const chunkDirectory: Directory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: chunkFiles
          };

          // Create subfs record for this chunk
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: chunkDirectory,
            fileCount: chunkFiles.length,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          const subfsValidation = validateSubfsRecord(subfsManifest);
          if (!subfsValidation.success) {
            throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          console.log(`  ✅ Created flat subfs: ${subfsUri}`);
          logger.info(`Created flat subfs record with ${chunkFiles.length} files: ${subfsUri}`);

          // Remove these files from the working directory and add a subfs entry
          const remainingEntries = workingDirectory.entries.filter(
            e => !chunkFiles.some(cf => cf.name === e.name)
          );

          // Add subfs entry (will be merged flat when expanded)
          remainingEntries.push({
            name: `__subfs_${attempts}`, // Placeholder name, will be merged away
            node: {
              $type: 'place.wisp.fs#subfs' as const,
              type: 'subfs' as const,
              subject: subfsUri
            }
          });

          workingDirectory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: remainingEntries
          };

          subfsRecords.push({ uri: subfsUri, path: `__subfs_${attempts}` });
          currentFileCount -= chunkFiles.length;
        }

        // Recreate manifest and check new size
        manifest = createManifest(siteName, workingDirectory, currentFileCount);
        manifestSize = JSON.stringify(manifest).length;
        const newSizeKB = (manifestSize / 1024).toFixed(1);
        console.log(`  → Manifest now ${newSizeKB}KB with ${currentFileCount} files (${subfsRecords.length} subfs total)`);

        // Check if we're under both limits now
        if (manifestSize <= MAX_MANIFEST_SIZE && currentFileCount <= TARGET_FILE_COUNT) {
          console.log(`  ✅ Manifest fits! (${currentFileCount} files, ${newSizeKB}KB)`);
          break;
        }
      }

      if (manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) {
        throw new Error(
          `Failed to fit manifest after splitting ${attempts} directories. ` +
          `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB. ` +
          `This should never happen - please report this issue.`
        );
      }

      console.log(`✅ Split complete: ${subfsRecords.length} subfs records, ${currentFileCount} files in main, ${(manifestSize / 1024).toFixed(1)}KB manifest`);
      logger.info(`Split into ${subfsRecords.length} subfs records, ${currentFileCount} files remaining in main tree`);
    } else {
      const manifestSizeKB = (manifestSize / 1024).toFixed(1);
      console.log(`Manifest created (${fileCount} files, ${manifestSizeKB}KB JSON) - no splitting needed`);
    }

    const rkey = siteName;
    updateJobProgress(jobId, { phase: 'finalizing' });

    console.log('Putting record to PDS with rkey:', rkey);
    const record = await agent.com.atproto.repo.putRecord({
      repo: did,
      collection: 'place.wisp.fs',
      rkey: rkey,
      record: manifest
    });
    console.log('Record successfully created on PDS:', record.data.uri);

    // Store site in database cache
    await upsertSite(did, rkey, siteName);

    // Clean up old subfs records if we had any
    if (oldSubfsUris.length > 0) {
      console.log(`Cleaning up ${oldSubfsUris.length} old subfs records...`);
      logger.info(`Cleaning up ${oldSubfsUris.length} old subfs records`);

      // Delete old subfs records in parallel (don't wait for completion)
      Promise.all(
        oldSubfsUris.map(async ({ uri }) => {
          try {
            // Parse URI: at://did/collection/rkey
            const parts = uri.replace('at://', '').split('/');
            const subRkey = parts[2];

            await agent.com.atproto.repo.deleteRecord({
              repo: did,
              collection: 'place.wisp.subfs',
              rkey: subRkey
            });

            console.log(`  🗑️ Deleted old subfs: ${uri}`);
            logger.info(`Deleted old subfs record: ${uri}`);
          } catch (err: any) {
            // Don't fail the whole upload if cleanup fails
            console.warn(`Failed to delete old subfs ${uri}:`, err?.message);
            logger.warn(`Failed to delete old subfs ${uri}`, err);
          }
        })
      ).catch(err => {
        // Log but don't fail if cleanup fails
        logger.warn('Some subfs cleanup operations failed', err);
      });
    }

    completeUploadJob(jobId, {
      success: true,
      uri: record.data.uri,
      cid: record.data.cid,
      fileCount,
      siteName,
      skippedFiles,
      failedFiles,
      uploadedCount: validUploadedFiles.length - failedFiles.length,
      hasFailures: failedFiles.length > 0
    });

    console.log('=== UPLOAD FILES COMPLETE ===');
  } catch (error) {
    console.error('=== UPLOAD ERROR ===');
    console.error('Error details:', error);
    logger.error('Upload error', error);
    failUploadJob(jobId, error instanceof Error ? error.message : 'Unknown error');
  }
}

export const wispRoutes = (client: NodeOAuthClient, cookieSecret: string) =>
  new Elysia({
    prefix: '/wisp',
    cookie: {
      secrets: cookieSecret,
      sign: ['did']
    }
  })
    .derive(async ({ cookie }) => {
      const auth = await requireAuth(client, cookie)
      return { auth }
    })
    .get(
      '/upload-progress/:jobId',
      async ({ params: { jobId }, auth, set }) => {
        const job = getUploadJob(jobId);

        if (!job) {
          set.status = 404;
          return { error: 'Job not found' };
        }

        // Verify job belongs to authenticated user
        if (job.did !== auth.did) {
          set.status = 403;
          return { error: 'Unauthorized' };
        }

        // Set up SSE headers
        set.headers = {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Connection': 'keep-alive'
        };

        // Shared between start() and cancel() so the keepalive timer and job
        // listener are torn down when the client disconnects
        let keepaliveInterval: ReturnType<typeof setInterval> | undefined;
        let removeJobListener: (() => void) | undefined;

        const stream = new ReadableStream({
          start(controller) {
            const encoder = new TextEncoder();

            // Send initial state
            const sendEvent = (event: string, data: any) => {
              try {
                const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
                controller.enqueue(encoder.encode(message));
              } catch (err) {
                // Controller closed, ignore
              }
            };

            // Send keepalive comment every 15 seconds to prevent timeout
            keepaliveInterval = setInterval(() => {
              try {
                controller.enqueue(encoder.encode(': keepalive\n\n'));
              } catch (err) {
                // Controller closed, stop sending keepalives
                clearInterval(keepaliveInterval);
              }
            }, 15000);

            // Send current job state immediately
            sendEvent('progress', {
              status: job.status,
              progress: job.progress,
              result: job.result,
              error: job.error
            });

            // If job is already completed or failed, close the stream
            if (job.status === 'completed' || job.status === 'failed') {
              clearInterval(keepaliveInterval);
              controller.close();
              return;
            }

            // Listen for updates
            removeJobListener = addJobListener(jobId, (event, data) => {
              sendEvent(event, data);

              // Close stream after done or error event
              if (event === 'done' || event === 'error') {
                clearInterval(keepaliveInterval);
                setTimeout(() => {
                  try {
                    controller.close();
                  } catch (err) {
                    // Already closed
                  }
                }, 100);
              }
            });
          },
          cancel() {
            // Cleanup on client disconnect (a function returned from start() is
            // ignored by ReadableStream, so teardown has to happen here)
            clearInterval(keepaliveInterval);
            removeJobListener?.();
          }
        });

        return new Response(stream);
      }
    )
    .post(
      '/upload-files',
      async ({ body, auth }) => {
        const { siteName, files } = body as {
          siteName: string;
          files: File | File[]
        };

        console.log('=== UPLOAD FILES START ===');
        console.log('Site name:', siteName);
        console.log('Files received:', Array.isArray(files) ? files.length : 'single file');

        try {
          if (!siteName) {
            throw new Error('Site name is required')
          }

          if (!isValidSiteName(siteName)) {
            throw new Error('Invalid site name: must be 1-512 characters and contain only alphanumeric, dots, dashes, underscores, tildes, and colons')
          }

          // Check if files were provided
          const hasFiles = files && (Array.isArray(files) ? files.length > 0 : !!files);

          if (!hasFiles) {
            // Handle empty upload synchronously (fast operation)
            const agent = new Agent((url, init) => auth.session.fetchHandler(url, init))

            const emptyManifest = {
              $type: 'place.wisp.fs',
              site: siteName,
              root: {
                type: 'directory',
                entries: []
              },
              fileCount: 0,
              createdAt: new Date().toISOString()
            };

            const validationResult = validateRecord(emptyManifest);
            if (!validationResult.success) {
              throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
            }

            const rkey = siteName;

            const record = await agent.com.atproto.repo.putRecord({
              repo: auth.did,
              collection: 'place.wisp.fs',
              rkey: rkey,
              record: emptyManifest
            });

            await upsertSite(auth.did, rkey, siteName);

            return {
              success: true,
              uri: record.data.uri,
              cid: record.data.cid,
              fileCount: 0,
              siteName
            };
          }

          // For file uploads, create a job and process in background
          const fileArray = Array.isArray(files) ? files : [files];
          const jobId = createUploadJob(auth.did, siteName, fileArray.length);

          // Track upload speeds to estimate progress
          const uploadStats = {
            speeds: [] as number[], // MB/s from completed uploads
            getAverageSpeed(): number {
              if (this.speeds.length === 0) return 3; // Default 3 MB/s
              const sum = this.speeds.reduce((a, b) => a + b, 0);
              return sum / this.speeds.length;
            }
          };

          // Create agent with OAuth session and upload progress monitoring
          const wrappedFetchHandler = async (url: string, init?: RequestInit) => {
            // Check if this is an uploadBlob request with a body
            if (url.includes('uploadBlob') && init?.body) {
              const originalBody = init.body;
              const bodySize = originalBody instanceof Uint8Array ? originalBody.length :
                originalBody instanceof ArrayBuffer ? originalBody.byteLength :
                typeof originalBody === 'string' ? new TextEncoder().encode(originalBody).length : 0;

              const startTime = Date.now();

              if (bodySize > 10 * 1024 * 1024) { // Files over 10MB
                const sizeMB = (bodySize / 1024 / 1024).toFixed(1);
                const avgSpeed = uploadStats.getAverageSpeed();
                const estimatedDuration = (bodySize / 1024 / 1024) / avgSpeed;

                console.log(`[Upload Progress] Starting upload of ${sizeMB}MB file`);
                console.log(`[Upload Stats] Measured speeds from last ${uploadStats.speeds.length} files:`, uploadStats.speeds.map(s => s.toFixed(2) + ' MB/s').join(', '));
                console.log(`[Upload Stats] Average speed: ${avgSpeed.toFixed(2)} MB/s, estimated duration: ${estimatedDuration.toFixed(0)}s`);

                // Log estimated progress every 5 seconds
                const progressInterval = setInterval(() => {
                  const elapsed = (Date.now() - startTime) / 1000;
                  const estimatedPercent = Math.min(95, Math.round((elapsed / estimatedDuration) * 100));
                  const estimatedMB = Math.min(bodySize / 1024 / 1024, elapsed * avgSpeed).toFixed(1);
                  console.log(`[Upload Progress] ~${estimatedPercent}% (~${estimatedMB}/${sizeMB}MB) - ${elapsed.toFixed(0)}s elapsed`);
                }, 5000);

                try {
                  const result = await auth.session.fetchHandler(url, init);
                  clearInterval(progressInterval);
                  const totalTime = (Date.now() - startTime) / 1000;
                  const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                  uploadStats.speeds.push(actualSpeed);
                  // Keep only last 10 uploads for rolling average
                  if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                  console.log(`[Upload Progress] ✅ Completed ${sizeMB}MB in ${totalTime.toFixed(1)}s (${actualSpeed.toFixed(1)} MB/s)`);
                  return result;
                } catch (err) {
                  clearInterval(progressInterval);
                  const elapsed = (Date.now() - startTime) / 1000;
                  console.error(`[Upload Progress] ❌ Upload failed after ${elapsed.toFixed(1)}s`);
                  throw err;
                }
              } else {
                // Track small files too for speed calculation
                try {
                  const result = await auth.session.fetchHandler(url, init);
                  const totalTime = (Date.now() - startTime) / 1000;
                  if (totalTime > 0.5) { // Only track if > 0.5s
                    const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                    uploadStats.speeds.push(actualSpeed);
                    if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                    console.log(`[Upload Stats] Small file: ${(bodySize / 1024).toFixed(1)}KB in ${totalTime.toFixed(2)}s = ${actualSpeed.toFixed(2)} MB/s`);
                  }
                  return result;
                } catch (err) {
                  throw err;
                }
              }
            }

            // Normal request
            return auth.session.fetchHandler(url, init);
          };

          const agent = new Agent(wrappedFetchHandler)
          console.log('Agent created for DID:', auth.did);
          console.log('Created upload job:', jobId);

          // Start background processing (don't await)
          processUploadInBackground(jobId, agent, auth.did, siteName, fileArray).catch(err => {
            console.error('Background upload process failed:', err);
            logger.error('Background upload process failed', err);
          });

          // Return immediately with job ID
          return {
            success: true,
            jobId,
            message: 'Upload started. Connect to /wisp/upload-progress/' + jobId + ' for progress updates.'
          };
        } catch (error) {
          console.error('=== UPLOAD ERROR ===');
          console.error('Error details:', error);
          logger.error('Upload error', error);
          throw new Error(`Failed to upload files: ${error instanceof Error ? error.message : 'Unknown error'}`);
        }
      }
    )
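For reference, a hypothetical browser-side caller for these two routes might look like the sketch below. The multipart field names (`siteName`, `files`), the same-origin paths, and the reliance on the signed session cookie are assumptions read off the handlers above, not a published client API.

// Hypothetical client sketch for POST /wisp/upload-files + SSE progress (assumptions noted above).
async function publishSite(siteName: string, files: File[]): Promise<void> {
  const form = new FormData();
  form.append('siteName', siteName);
  // File names are expected to carry site-relative paths (e.g. from webkitRelativePath)
  for (const file of files) form.append('files', file, file.name);

  const res = await fetch('/wisp/upload-files', { method: 'POST', body: form, credentials: 'include' });
  const started = await res.json();
  if (!started.success) throw new Error('Upload did not start');
  if (!started.jobId) return; // Empty site: the record was written synchronously

  // Follow progress over SSE until the background job reports done or error
  await new Promise<void>((resolve, reject) => {
    const source = new EventSource(`/wisp/upload-progress/${started.jobId}`);
    source.addEventListener('progress', (e) => {
      console.log('progress', JSON.parse((e as MessageEvent).data));
    });
    source.addEventListener('done', (e) => {
      console.log('done', JSON.parse((e as MessageEvent).data));
      source.close();
      resolve();
    });
    // Fires for the server's 'error' event as well as transport errors
    source.addEventListener('error', () => {
      source.close();
      reject(new Error('Upload failed'));
    });
  });
}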