Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
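Each site lives in the owner's AT Protocol repo as a place.wisp.fs record whose root describes the file tree and whose file entries point at uploaded blobs. As a rough sketch (field names taken from the empty-manifest path in the upload route below; a hypothetical example, not the full schema):

const manifest = {
  $type: 'place.wisp.fs',
  site: 'my-site', // also used as the record key (rkey)
  root: {
    type: 'directory',
    entries: [] // file / directory / subfs nodes; file nodes carry blob references
  },
  fileCount: 0,
  createdAt: new Date().toISOString()
};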
import { Elysia } from 'elysia'
import { requireAuth, type AuthenticatedContext } from '../lib/wisp-auth'
import { NodeOAuthClient } from '@atproto/oauth-client-node'
import { Agent } from '@atproto/api'
import { TID } from '@atproto/common-web'
import {
  type UploadedFile,
  type FileUploadResult,
  processUploadedFiles,
  createManifest,
  updateFileBlobs,
  shouldCompressFile,
  compressFile,
  computeCID,
  extractBlobMap,
  extractSubfsUris,
  findLargeDirectories,
  replaceDirectoryWithSubfs,
  estimateDirectorySize
} from '../lib/wisp-utils'
import { upsertSite } from '../lib/db'
import { logger } from '../lib/observability'
// NOTE: the Directory type used when building split sub-trees below is assumed
// to be exported by the generated fs lexicon module.
import { validateRecord, type Directory } from '../lexicons/types/place/wisp/fs'
import { validateRecord as validateSubfsRecord } from '../lexicons/types/place/wisp/subfs'
import { MAX_SITE_SIZE, MAX_FILE_SIZE, MAX_FILE_COUNT } from '../lib/constants'
import {
  createUploadJob,
  getUploadJob,
  updateJobProgress,
  completeUploadJob,
  failUploadJob,
  addJobListener
} from '../lib/upload-jobs'

function isValidSiteName(siteName: string): boolean {
  if (!siteName || typeof siteName !== 'string') return false;

  // Length check (AT Protocol rkey limit)
  if (siteName.length < 1 || siteName.length > 512) return false;

  // Check for path traversal
  if (siteName === '.' || siteName === '..') return false;
  if (siteName.includes('/') || siteName.includes('\\')) return false;
  if (siteName.includes('\0')) return false;

  // AT Protocol rkey format: alphanumeric, dots, dashes, underscores, tildes, colons
  // Based on NSID format rules
  const validRkeyPattern = /^[a-zA-Z0-9._~:-]+$/;
  if (!validRkeyPattern.test(siteName)) return false;

  return true;
}

async function processUploadInBackground(
  jobId: string,
  agent: Agent,
  did: string,
  siteName: string,
  fileArray: File[]
): Promise<void> {
  try {
    // Try to fetch existing record to enable incremental updates
    let existingBlobMap = new Map<string, { blobRef: any; cid: string }>();
    let oldSubfsUris: Array<{ uri: string; path: string }> = [];
    console.log('Attempting to fetch existing record...');
    updateJobProgress(jobId, { phase: 'validating' });

    try {
      const rkey = siteName;
      const existingRecord = await agent.com.atproto.repo.getRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey
      });
      console.log('Existing record found!');

      if (existingRecord.data.value && typeof existingRecord.data.value === 'object' && 'root' in existingRecord.data.value) {
        const manifest = existingRecord.data.value as any;

        // Extract blob map from main record
        existingBlobMap = extractBlobMap(manifest.root);
        console.log(`Found existing manifest with ${existingBlobMap.size} files in main record`);

        // Extract subfs URIs with their mount paths from main record
        const subfsUris = extractSubfsUris(manifest.root);
        oldSubfsUris = subfsUris; // Save for cleanup later

        if (subfsUris.length > 0) {
          console.log(`Found ${subfsUris.length} subfs records, fetching in parallel...`);
          logger.info(`Fetching ${subfsUris.length} subfs records for blob reuse`);

          // Fetch all subfs records in parallel
          const subfsRecords = await Promise.all(
            subfsUris.map(async ({ uri, path }) => {
              try {
                // Parse URI: at://did/collection/rkey
                const parts = uri.replace('at://', '').split('/');
                const subDid = parts[0];
                const collection = parts[1];
                const subRkey = parts[2];

                const record = await agent.com.atproto.repo.getRecord({
                  repo: subDid,
                  collection: collection,
                  rkey: subRkey
                });

                return { record: record.data.value as any, mountPath: path };
              } catch (err: any) {
                logger.warn(`Failed to fetch subfs record ${uri}: ${err?.message}`, err);
                return null;
              }
            })
          );

          // Merge blob maps from all subfs records
          let totalSubfsBlobs = 0;
          for (const subfsData of subfsRecords) {
            if (subfsData && subfsData.record && 'root' in subfsData.record) {
              // Extract blobs with the correct mount path prefix
              const subfsMap = extractBlobMap(subfsData.record.root, subfsData.mountPath);
              subfsMap.forEach((value, key) => {
                existingBlobMap.set(key, value);
                totalSubfsBlobs++;
              });
            }
          }

          console.log(`Merged ${totalSubfsBlobs} files from ${subfsUris.length} subfs records`);
          logger.info(`Total blob map: ${existingBlobMap.size} files (main + subfs)`);
        }

        console.log(`Total existing blobs for reuse: ${existingBlobMap.size} files`);
        logger.info(`Found existing manifest with ${existingBlobMap.size} files for incremental update`);
      }
    } catch (error: any) {
      console.log('No existing record found or error:', error?.message || error);
      if (error?.status !== 400 && error?.error !== 'RecordNotFound') {
        logger.warn('Failed to fetch existing record, proceeding with full upload', error);
      }
    }

    // Convert File objects to UploadedFile format
    const uploadedFiles: UploadedFile[] = [];
    const skippedFiles: Array<{ name: string; reason: string }> = [];

    console.log('Processing files, count:', fileArray.length);
    updateJobProgress(jobId, { phase: 'compressing' });

    for (let i = 0; i < fileArray.length; i++) {
      const file = fileArray[i];

      // Skip undefined/null files
      if (!file || !file.name) {
        console.log(`Skipping undefined file at index ${i}`);
        skippedFiles.push({
          name: `[undefined file at index ${i}]`,
          reason: 'Invalid file object'
        });
        continue;
      }

      console.log(`Processing file ${i + 1}/${fileArray.length}:`, file.name, file.size, 'bytes');
      updateJobProgress(jobId, {
        filesProcessed: i + 1,
        currentFile: file.name
      });

      // Skip .git directory files
      const normalizedPath = file.name.replace(/^[^\/]*\//, '');
      if (normalizedPath.startsWith('.git/') || normalizedPath === '.git') {
        console.log(`Skipping .git file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: '.git directory excluded'
        });
        continue;
      }

      // Skip files that are too large
      const maxSize = MAX_FILE_SIZE;
      if (file.size > maxSize) {
        skippedFiles.push({
          name: file.name,
          reason: `file too large (${(file.size / 1024 / 1024).toFixed(2)}MB, max 100MB)`
        });
        continue;
      }

      const arrayBuffer = await file.arrayBuffer();
      const originalContent = Buffer.from(arrayBuffer);
      const originalMimeType = file.type || 'application/octet-stream';

      // Determine if file should be compressed
      const shouldCompress = shouldCompressFile(originalMimeType);

      // Text files (HTML/CSS/JS) need base64 encoding to prevent PDS content sniffing
      // Audio files just need compression without base64
      const needsBase64 =
        originalMimeType.startsWith('text/') ||
        originalMimeType.startsWith('application/json') ||
        originalMimeType.startsWith('application/xml') ||
        originalMimeType === 'image/svg+xml';

      let finalContent: Buffer;
      let compressed = false;
      let base64Encoded = false;

      if (shouldCompress) {
        const compressedContent = compressFile(originalContent);
        compressed = true;

        if (needsBase64) {
          // Text files: compress AND base64 encode
          finalContent = Buffer.from(compressedContent.toString('base64'), 'binary');
          base64Encoded = true;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
          logger.info(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
        } else {
          // Audio files: just compress, no base64
          finalContent = compressedContent;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
          logger.info(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
        }
      } else {
        // Binary files: upload directly
        finalContent = originalContent;
        console.log(`Uploading ${file.name} directly: ${originalContent.length} bytes (no compression)`);
        logger.info(`Uploading ${file.name} directly: ${originalContent.length} bytes (binary)`);
      }

      uploadedFiles.push({
        name: file.name,
        content: finalContent,
        mimeType: originalMimeType,
        size: finalContent.length,
        compressed,
        base64Encoded,
        originalMimeType
      });
    }

    // Update total file count after filtering (important for progress tracking)
    updateJobProgress(jobId, {
      totalFiles: uploadedFiles.length
    });

    // Check total size limit
    const totalSize = uploadedFiles.reduce((sum, file) => sum + file.size, 0);
    const maxTotalSize = MAX_SITE_SIZE;

    if (totalSize > maxTotalSize) {
      throw new Error(`Total upload size ${(totalSize / 1024 / 1024).toFixed(2)}MB exceeds 300MB limit`);
    }

    // Check file count limit
    if (uploadedFiles.length > MAX_FILE_COUNT) {
      throw new Error(`File count ${uploadedFiles.length} exceeds ${MAX_FILE_COUNT} files limit`);
    }

    console.log(`After filtering: ${uploadedFiles.length} files to process (${skippedFiles.length} skipped)`);

    if (uploadedFiles.length === 0) {
      // Create empty manifest
      const emptyManifest = {
        $type: 'place.wisp.fs',
        site: siteName,
        root: {
          type: 'directory',
          entries: []
        },
        fileCount: 0,
        createdAt: new Date().toISOString()
      };

      const validationResult = validateRecord(emptyManifest);
      if (!validationResult.success) {
        throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
      }

      const rkey = siteName;
      updateJobProgress(jobId, { phase: 'finalizing' });

      const record = await agent.com.atproto.repo.putRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey,
        record: emptyManifest
      });

      await upsertSite(did, rkey, siteName);

      completeUploadJob(jobId, {
        success: true,
        uri: record.data.uri,
        cid: record.data.cid,
        fileCount: 0,
        siteName,
        skippedFiles
      });
      return;
    }

    // Process files into directory structure
    console.log('Processing uploaded files into directory structure...');
    const validUploadedFiles = uploadedFiles.filter((f, i) => {
      if (!f || !f.name || !f.content) {
        console.error(`Filtering out invalid file at index ${i}`);
        return false;
      }
      return true;
    });

    const { directory, fileCount } = processUploadedFiles(validUploadedFiles);
    console.log('Directory structure created, file count:', fileCount);

    // Upload files as blobs with retry logic for DPoP nonce conflicts
    console.log('Starting blob upload/reuse phase...');
    updateJobProgress(jobId, { phase: 'uploading' });

    // Helper function to upload blob with exponential backoff retry and timeout
    const uploadBlobWithRetry = async (
      agent: Agent,
      content: Buffer,
      mimeType: string,
      fileName: string,
      maxRetries = 5
    ) => {
      for (let attempt = 0; attempt < maxRetries; attempt++) {
        const controller = new AbortController();
        const timeoutMs = 300000; // 5 minute timeout per upload
        const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

        try {
          console.log(`[File Upload] Starting upload attempt ${attempt + 1}/${maxRetries} for ${fileName} (${content.length} bytes, ${mimeType})`);

          const result = await agent.com.atproto.repo.uploadBlob(content, { encoding: mimeType });
          clearTimeout(timeoutId);
          console.log(`[File Upload] ✅ Successfully uploaded ${fileName} on attempt ${attempt + 1}`);
          return result;
        } catch (error: any) {
          clearTimeout(timeoutId);

          const isDPoPNonceError =
            error?.message?.toLowerCase().includes('nonce') ||
            error?.message?.toLowerCase().includes('dpop') ||
            error?.status === 409;

          const isTimeout = error?.name === 'AbortError' || error?.message === 'Upload timeout';
          const isRateLimited = error?.status === 429 || error?.message?.toLowerCase().includes('rate');

          // Retry on DPoP nonce conflicts, timeouts, or rate limits
          if ((isDPoPNonceError || isTimeout || isRateLimited) && attempt < maxRetries - 1) {
            let backoffMs: number;
            if (isRateLimited) {
              backoffMs = 2000 * Math.pow(2, attempt); // 2s, 4s, 8s, 16s for rate limits
            } else if (isTimeout) {
              backoffMs = 1000 * Math.pow(2, attempt); // 1s, 2s, 4s, 8s for timeouts
            } else {
              backoffMs = 100 * Math.pow(2, attempt); // 100ms, 200ms, 400ms for DPoP
            }

            const reason = isDPoPNonceError ? 'DPoP nonce conflict' : isTimeout ? 'timeout' : 'rate limit';
            logger.info(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms (attempt ${attempt + 1}/${maxRetries})`);
            console.log(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms`);
            await new Promise(resolve => setTimeout(resolve, backoffMs));
            continue;
          }

          // Log detailed error information before throwing
          logger.error(`[File Upload] ❌ Upload failed for ${fileName} (size: ${content.length} bytes, mimeType: ${mimeType}, attempt: ${attempt + 1}/${maxRetries})`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            headers: error?.headers,
            success: error?.success
          });
          console.error(`[File Upload] ❌ Upload failed for ${fileName}:`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            size: content.length,
            mimeType,
            attempt: attempt + 1
          });
          throw error;
        }
      }
      throw new Error(`Failed to upload ${fileName} after ${maxRetries} attempts`);
    };

    // Use sliding window concurrency for maximum throughput
    const CONCURRENCY_LIMIT = 20; // Maximum concurrent uploads
    const uploadedBlobs: Array<{
      result: FileUploadResult;
      filePath: string;
      sentMimeType: string;
      returnedMimeType: string;
      reused: boolean;
    }> = [];
    const failedFiles: Array<{
      name: string;
      index: number;
      error: string;
      size: number;
    }> = [];

    // Track completed files count for accurate progress
    let completedFilesCount = 0;

    // Process file with sliding window concurrency
    const processFile = async (file: UploadedFile, index: number) => {
      try {
        if (!file || !file.name) {
          throw new Error(`Undefined file at index ${index}`);
        }

        const fileCID = computeCID(file.content);
        const normalizedPath = file.name.replace(/^[^\/]*\//, '');
        const existingBlob = existingBlobMap.get(normalizedPath) || existingBlobMap.get(file.name);

        if (existingBlob && existingBlob.cid === fileCID) {
          logger.info(`[File Upload] ♻️ Reused: ${file.name} (unchanged, CID: ${fileCID})`);
          const reusedCount = (getUploadJob(jobId)?.progress.filesReused || 0) + 1;
          completedFilesCount++;
          updateJobProgress(jobId, {
            filesReused: reusedCount,
            currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (reused)`
          });

          return {
            result: {
              hash: existingBlob.cid,
              blobRef: existingBlob.blobRef,
              ...(file.compressed && {
                encoding: 'gzip' as const,
                mimeType: file.originalMimeType || file.mimeType,
                base64: file.base64Encoded || false
              })
            },
            filePath: file.name,
            sentMimeType: file.mimeType,
            returnedMimeType: existingBlob.blobRef.mimeType,
            reused: true
          };
        }

        const uploadMimeType = file.compressed || file.mimeType.startsWith('text/html')
          ? 'application/octet-stream'
          : file.mimeType;

        const compressionInfo = file.compressed ? ' (gzipped)' : '';
        const fileSizeMB = (file.size / 1024 / 1024).toFixed(2);
        logger.info(`[File Upload] ⬆️ Uploading: ${file.name} (${fileSizeMB}MB${compressionInfo})`);

        const uploadResult = await uploadBlobWithRetry(
          agent,
          file.content,
          uploadMimeType,
          file.name
        );

        const returnedBlobRef = uploadResult.data.blob;
        const uploadedCount = (getUploadJob(jobId)?.progress.filesUploaded || 0) + 1;
        completedFilesCount++;
        updateJobProgress(jobId, {
          filesUploaded: uploadedCount,
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (uploaded)`
        });
        logger.info(`[File Upload] ✅ Uploaded: ${file.name} (CID: ${fileCID})`);

        return {
          result: {
            hash: returnedBlobRef.ref.toString(),
            blobRef: returnedBlobRef,
            ...(file.compressed && {
              encoding: 'gzip' as const,
              mimeType: file.originalMimeType || file.mimeType,
              base64: file.base64Encoded || false
            })
          },
          filePath: file.name,
          sentMimeType: file.mimeType,
          returnedMimeType: returnedBlobRef.mimeType,
          reused: false
        };
      } catch (uploadError) {
        const fileName = file?.name || 'unknown';
        const fileSize = file?.size || 0;
        const errorMessage = uploadError instanceof Error ? uploadError.message : 'Unknown error';
        const errorDetails = {
          fileName,
          fileSize,
          index,
          error: errorMessage,
          stack: uploadError instanceof Error ? uploadError.stack : undefined
        };
        logger.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);
        console.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);

        completedFilesCount++;
        updateJobProgress(jobId, {
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${fileName} (failed)`
        });

        // Track failed file but don't throw - continue with other files
        failedFiles.push({
          name: fileName,
          index,
          error: errorMessage,
          size: fileSize
        });

        return null; // Return null to indicate failure
      }
    };

    // Sliding window concurrency control
    const processWithConcurrency = async () => {
      const results: any[] = [];
      let fileIndex = 0;
      const executing = new Map<Promise<void>, { index: number; name: string }>();

      for (const file of validUploadedFiles) {
        const currentIndex = fileIndex++;

        const promise = processFile(file, currentIndex)
          .then(result => {
            results[currentIndex] = result;
          })
          .catch(error => {
            // This shouldn't happen since processFile catches errors, but just in case
            logger.error(`Unexpected error processing file at index ${currentIndex}`, error);
            results[currentIndex] = null;
          })
          .finally(() => {
            executing.delete(promise);
          });

        executing.set(promise, { index: currentIndex, name: file.name });

        if (executing.size >= CONCURRENCY_LIMIT) {
          await Promise.race(executing.keys());
        }
      }

      // Wait for remaining uploads
      await Promise.all(executing.keys());
      console.log(`\n✅ Upload complete: ${completedFilesCount}/${validUploadedFiles.length} files processed\n`);
      return results.filter(r => r !== undefined && r !== null); // Filter out null (failed) and undefined entries
    };

    const allResults = await processWithConcurrency();
    uploadedBlobs.push(...allResults);

    const currentReused = uploadedBlobs.filter(b => b.reused).length;
    const currentUploaded = uploadedBlobs.filter(b => !b.reused).length;
    const successfulCount = uploadedBlobs.length;
    const failedCount = failedFiles.length;

    logger.info(`[File Upload] 🎉 Upload complete → ${successfulCount}/${validUploadedFiles.length} files succeeded (${currentUploaded} uploaded, ${currentReused} reused), ${failedCount} failed`);

    if (failedCount > 0) {
      logger.warn(`[File Upload] ⚠️ Failed files:`, failedFiles);
      console.warn(`[File Upload] ⚠️ ${failedCount} files failed to upload:`, failedFiles.map(f => f.name).join(', '));
    }

    const reusedCount = uploadedBlobs.filter(b => b.reused).length;
    const uploadedCount = uploadedBlobs.filter(b => !b.reused).length;
    logger.info(`[File Upload] 🎉 Upload phase complete! Total: ${successfulCount} files (${uploadedCount} uploaded, ${reusedCount} reused)`);

    const uploadResults: FileUploadResult[] = uploadedBlobs.map(blob => blob.result);
    const filePaths: string[] = uploadedBlobs.map(blob => blob.filePath);

    // Update directory with file blobs (only for successfully uploaded files)
    console.log('Updating directory with blob references...');
    updateJobProgress(jobId, { phase: 'creating_manifest' });

    // Create a set of successfully uploaded paths for quick lookup
    const successfulPaths = new Set(filePaths.map(path => path.replace(/^[^\/]*\//, '')));

    const updatedDirectory = updateFileBlobs(directory, uploadResults, filePaths, '', successfulPaths);

    // Calculate actual file count (only successfully uploaded files)
    const actualFileCount = uploadedBlobs.length;

    // Check if we need to split into subfs records
    // Split proactively if we have lots of files to avoid hitting manifest size limits
    const MAX_MANIFEST_SIZE = 140 * 1024; // 140KB to be safe (PDS limit is 150KB)
    const FILE_COUNT_THRESHOLD = 250; // Start splitting at this many files
    const TARGET_FILE_COUNT = 200; // Try to keep main manifest under this many files
    const subfsRecords: Array<{ uri: string; path: string }> = [];
    let workingDirectory = updatedDirectory;
    let currentFileCount = actualFileCount;

    // Create initial manifest to check size
    let manifest = createManifest(siteName, workingDirectory, actualFileCount);
    let manifestSize = JSON.stringify(manifest).length;

    // Split if we have lots of files OR if manifest is already too large
    if (actualFileCount >= FILE_COUNT_THRESHOLD || manifestSize > MAX_MANIFEST_SIZE) {
      console.log(`⚠️ Large site detected (${actualFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB), splitting into subfs records...`);
      logger.info(`Large site with ${actualFileCount} files, splitting into subfs records`);

      // Keep splitting until manifest fits under limits (both size and file count)
      let attempts = 0;
      const MAX_ATTEMPTS = 100; // Allow many splits for very large sites

      while ((manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) && attempts < MAX_ATTEMPTS) {
        attempts++;

        // Find all directories sorted by size (largest first)
        const directories = findLargeDirectories(workingDirectory);
        directories.sort((a, b) => b.size - a.size);

        // Check if we can split subdirectories or need to split flat files
        if (directories.length > 0) {
          // Split the largest subdirectory
          const largestDir = directories[0];
          console.log(`  Split #${attempts}: ${largestDir.path} (${largestDir.fileCount} files, ${(largestDir.size / 1024).toFixed(1)}KB)`);

          // Create a subfs record for this directory
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: largestDir.directory,
            fileCount: largestDir.fileCount,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          const subfsValidation = validateSubfsRecord(subfsManifest);
          if (!subfsValidation.success) {
            throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          subfsRecords.push({ uri: subfsUri, path: largestDir.path });
          console.log(`  ✅ Created subfs: ${subfsUri}`);
          logger.info(`Created subfs record for ${largestDir.path}: ${subfsUri}`);

          // Replace directory with subfs node in the main tree
          workingDirectory = replaceDirectoryWithSubfs(workingDirectory, largestDir.path, subfsUri);
          currentFileCount -= largestDir.fileCount;
        } else {
          // No subdirectories - split flat files at root level
          const rootFiles = workingDirectory.entries.filter(e => 'type' in e.node && e.node.type === 'file');

          if (rootFiles.length === 0) {
            throw new Error(
              `Cannot split manifest further - no files or directories available. ` +
              `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB.`
            );
          }

          // Take a chunk of files (aim for ~100 files per chunk)
          const CHUNK_SIZE = 100;
          const chunkFiles = rootFiles.slice(0, Math.min(CHUNK_SIZE, rootFiles.length));
          console.log(`  Split #${attempts}: flat root (${chunkFiles.length} files)`);

          // Create a directory with just these files
          const chunkDirectory: Directory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: chunkFiles
          };

          // Create subfs record for this chunk
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: chunkDirectory,
            fileCount: chunkFiles.length,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          const subfsValidation = validateSubfsRecord(subfsManifest);
          if (!subfsValidation.success) {
            throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          console.log(`  ✅ Created flat subfs: ${subfsUri}`);
          logger.info(`Created flat subfs record with ${chunkFiles.length} files: ${subfsUri}`);

          // Remove these files from the working directory and add a subfs entry
          const remainingEntries = workingDirectory.entries.filter(
            e => !chunkFiles.some(cf => cf.name === e.name)
          );

          // Add subfs entry (will be merged flat when expanded)
          remainingEntries.push({
            name: `__subfs_${attempts}`, // Placeholder name, will be merged away
            node: {
              $type: 'place.wisp.fs#subfs' as const,
              type: 'subfs' as const,
              subject: subfsUri
            }
          });

          workingDirectory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: remainingEntries
          };

          subfsRecords.push({ uri: subfsUri, path: `__subfs_${attempts}` });
          currentFileCount -= chunkFiles.length;
        }

        // Recreate manifest and check new size
        manifest = createManifest(siteName, workingDirectory, currentFileCount);
        manifestSize = JSON.stringify(manifest).length;
        const newSizeKB = (manifestSize / 1024).toFixed(1);
        console.log(`  → Manifest now ${newSizeKB}KB with ${currentFileCount} files (${subfsRecords.length} subfs total)`);

        // Check if we're under both limits now
        if (manifestSize <= MAX_MANIFEST_SIZE && currentFileCount <= TARGET_FILE_COUNT) {
          console.log(`  ✅ Manifest fits! (${currentFileCount} files, ${newSizeKB}KB)`);
          break;
        }
      }

      if (manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) {
        throw new Error(
          `Failed to fit manifest after splitting ${attempts} directories. ` +
          `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB. ` +
          `This should never happen - please report this issue.`
        );
      }

      console.log(`✅ Split complete: ${subfsRecords.length} subfs records, ${currentFileCount} files in main, ${(manifestSize / 1024).toFixed(1)}KB manifest`);
      logger.info(`Split into ${subfsRecords.length} subfs records, ${currentFileCount} files remaining in main tree`);
    } else {
      const manifestSizeKB = (manifestSize / 1024).toFixed(1);
      console.log(`Manifest created (${fileCount} files, ${manifestSizeKB}KB JSON) - no splitting needed`);
    }

    const rkey = siteName;
    updateJobProgress(jobId, { phase: 'finalizing' });

    console.log('Putting record to PDS with rkey:', rkey);
    const record = await agent.com.atproto.repo.putRecord({
      repo: did,
      collection: 'place.wisp.fs',
      rkey: rkey,
      record: manifest
    });
    console.log('Record successfully created on PDS:', record.data.uri);

    // Store site in database cache
    await upsertSite(did, rkey, siteName);

    // Clean up old subfs records if we had any
    if (oldSubfsUris.length > 0) {
      console.log(`Cleaning up ${oldSubfsUris.length} old subfs records...`);
      logger.info(`Cleaning up ${oldSubfsUris.length} old subfs records`);

      // Delete old subfs records in parallel (don't wait for completion)
      Promise.all(
        oldSubfsUris.map(async ({ uri }) => {
          try {
            // Parse URI: at://did/collection/rkey
            const parts = uri.replace('at://', '').split('/');
            const subRkey = parts[2];

            await agent.com.atproto.repo.deleteRecord({
              repo: did,
              collection: 'place.wisp.subfs',
              rkey: subRkey
            });

            console.log(`  🗑️ Deleted old subfs: ${uri}`);
            logger.info(`Deleted old subfs record: ${uri}`);
          } catch (err: any) {
            // Don't fail the whole upload if cleanup fails
            console.warn(`Failed to delete old subfs ${uri}:`, err?.message);
            logger.warn(`Failed to delete old subfs ${uri}`, err);
          }
        })
      ).catch(err => {
        // Log but don't fail if cleanup fails
        logger.warn('Some subfs cleanup operations failed', err);
      });
    }

    completeUploadJob(jobId, {
      success: true,
      uri: record.data.uri,
      cid: record.data.cid,
      fileCount,
      siteName,
      skippedFiles,
      failedFiles,
      uploadedCount: validUploadedFiles.length - failedFiles.length,
      hasFailures: failedFiles.length > 0
    });

    console.log('=== UPLOAD FILES COMPLETE ===');
  } catch (error) {
    console.error('=== UPLOAD ERROR ===');
    console.error('Error details:', error);
    logger.error('Upload error', error);
    failUploadJob(jobId, error instanceof Error ? error.message : 'Unknown error');
  }
}

export const wispRoutes = (client: NodeOAuthClient, cookieSecret: string) =>
  new Elysia({
    prefix: '/wisp',
    cookie: {
      secrets: cookieSecret,
      sign: ['did']
    }
  })
    .derive(async ({ cookie }) => {
      const auth = await requireAuth(client, cookie)
      return { auth }
    })
    .get(
      '/upload-progress/:jobId',
      async ({ params: { jobId }, auth, set }) => {
        const job = getUploadJob(jobId);

        if (!job) {
          set.status = 404;
          return { error: 'Job not found' };
        }

        // Verify job belongs to authenticated user
        if (job.did !== auth.did) {
          set.status = 403;
          return { error: 'Unauthorized' };
        }

        // Set up SSE headers
        set.headers = {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Connection': 'keep-alive'
        };

        const stream = new ReadableStream({
          start(controller) {
            const encoder = new TextEncoder();

            // Send initial state
            const sendEvent = (event: string, data: any) => {
              try {
                const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
                controller.enqueue(encoder.encode(message));
              } catch (err) {
                // Controller closed, ignore
              }
            };

            // Send keepalive comment every 15 seconds to prevent timeout
            const keepaliveInterval = setInterval(() => {
              try {
                controller.enqueue(encoder.encode(': keepalive\n\n'));
              } catch (err) {
                // Controller closed, stop sending keepalives
                clearInterval(keepaliveInterval);
              }
            }, 15000);

            // Send current job state immediately
            sendEvent('progress', {
              status: job.status,
              progress: job.progress,
              result: job.result,
              error: job.error
            });

            // If job is already completed or failed, close the stream
            if (job.status === 'completed' || job.status === 'failed') {
              clearInterval(keepaliveInterval);
              controller.close();
              return;
            }

            // Listen for updates
            const cleanup = addJobListener(jobId, (event, data) => {
              sendEvent(event, data);

              // Close stream after done or error event
              if (event === 'done' || event === 'error') {
                clearInterval(keepaliveInterval);
                setTimeout(() => {
                  try {
                    controller.close();
                  } catch (err) {
                    // Already closed
                  }
                }, 100);
              }
            });

            // Cleanup on disconnect
            return () => {
              clearInterval(keepaliveInterval);
              cleanup();
            };
          }
        });

        return new Response(stream);
      }
    )
    .post(
      '/upload-files',
      async ({ body, auth }) => {
        const { siteName, files } = body as {
          siteName: string;
          files: File | File[]
        };

        console.log('=== UPLOAD FILES START ===');
        console.log('Site name:', siteName);
        console.log('Files received:', Array.isArray(files) ? files.length : 'single file');

        try {
          if (!siteName) {
            throw new Error('Site name is required')
          }

          if (!isValidSiteName(siteName)) {
            throw new Error('Invalid site name: must be 1-512 characters and contain only alphanumeric, dots, dashes, underscores, tildes, and colons')
          }

          // Check if files were provided
          const hasFiles = files && (Array.isArray(files) ? files.length > 0 : !!files);

          if (!hasFiles) {
            // Handle empty upload synchronously (fast operation)
            const agent = new Agent((url, init) => auth.session.fetchHandler(url, init))

            const emptyManifest = {
              $type: 'place.wisp.fs',
              site: siteName,
              root: {
                type: 'directory',
                entries: []
              },
              fileCount: 0,
              createdAt: new Date().toISOString()
            };

            const validationResult = validateRecord(emptyManifest);
            if (!validationResult.success) {
              throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
            }

            const rkey = siteName;

            const record = await agent.com.atproto.repo.putRecord({
              repo: auth.did,
              collection: 'place.wisp.fs',
              rkey: rkey,
              record: emptyManifest
            });

            await upsertSite(auth.did, rkey, siteName);

            return {
              success: true,
              uri: record.data.uri,
              cid: record.data.cid,
              fileCount: 0,
              siteName
            };
          }

          // For file uploads, create a job and process in background
          const fileArray = Array.isArray(files) ? files : [files];
          const jobId = createUploadJob(auth.did, siteName, fileArray.length);

          // Track upload speeds to estimate progress
          const uploadStats = {
            speeds: [] as number[], // MB/s from completed uploads
            getAverageSpeed(): number {
              if (this.speeds.length === 0) return 3; // Default 3 MB/s
              const sum = this.speeds.reduce((a, b) => a + b, 0);
              return sum / this.speeds.length;
            }
          };

          // Create agent with OAuth session and upload progress monitoring
          const wrappedFetchHandler = async (url: string, init?: RequestInit) => {
            // Check if this is an uploadBlob request with a body
            if (url.includes('uploadBlob') && init?.body) {
              const originalBody = init.body;
              const bodySize = originalBody instanceof Uint8Array ? originalBody.length :
                originalBody instanceof ArrayBuffer ? originalBody.byteLength :
                typeof originalBody === 'string' ? new TextEncoder().encode(originalBody).length : 0;

              const startTime = Date.now();

              if (bodySize > 10 * 1024 * 1024) { // Files over 10MB
                const sizeMB = (bodySize / 1024 / 1024).toFixed(1);
                const avgSpeed = uploadStats.getAverageSpeed();
                const estimatedDuration = (bodySize / 1024 / 1024) / avgSpeed;

                console.log(`[Upload Progress] Starting upload of ${sizeMB}MB file`);
                console.log(`[Upload Stats] Measured speeds from last ${uploadStats.speeds.length} files:`, uploadStats.speeds.map(s => s.toFixed(2) + ' MB/s').join(', '));
                console.log(`[Upload Stats] Average speed: ${avgSpeed.toFixed(2)} MB/s, estimated duration: ${estimatedDuration.toFixed(0)}s`);

                // Log estimated progress every 5 seconds
                const progressInterval = setInterval(() => {
                  const elapsed = (Date.now() - startTime) / 1000;
                  const estimatedPercent = Math.min(95, Math.round((elapsed / estimatedDuration) * 100));
                  const estimatedMB = Math.min(bodySize / 1024 / 1024, elapsed * avgSpeed).toFixed(1);
                  console.log(`[Upload Progress] ~${estimatedPercent}% (~${estimatedMB}/${sizeMB}MB) - ${elapsed.toFixed(0)}s elapsed`);
                }, 5000);

                try {
                  const result = await auth.session.fetchHandler(url, init);
                  clearInterval(progressInterval);
                  const totalTime = (Date.now() - startTime) / 1000;
                  const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                  uploadStats.speeds.push(actualSpeed);
                  // Keep only last 10 uploads for rolling average
                  if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                  console.log(`[Upload Progress] ✅ Completed ${sizeMB}MB in ${totalTime.toFixed(1)}s (${actualSpeed.toFixed(1)} MB/s)`);
                  return result;
                } catch (err) {
                  clearInterval(progressInterval);
                  const elapsed = (Date.now() - startTime) / 1000;
                  console.error(`[Upload Progress] ❌ Upload failed after ${elapsed.toFixed(1)}s`);
                  throw err;
                }
              } else {
                // Track small files too for speed calculation
                try {
                  const result = await auth.session.fetchHandler(url, init);
                  const totalTime = (Date.now() - startTime) / 1000;
                  if (totalTime > 0.5) { // Only track if > 0.5s
                    const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                    uploadStats.speeds.push(actualSpeed);
                    if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                    console.log(`[Upload Stats] Small file: ${(bodySize / 1024).toFixed(1)}KB in ${totalTime.toFixed(2)}s = ${actualSpeed.toFixed(2)} MB/s`);
                  }
                  return result;
                } catch (err) {
                  throw err;
                }
              }
            }

            // Normal request
            return auth.session.fetchHandler(url, init);
          };

          const agent = new Agent(wrappedFetchHandler)
          console.log('Agent created for DID:', auth.did);
          console.log('Created upload job:', jobId);

          // Start background processing (don't await)
          processUploadInBackground(jobId, agent, auth.did, siteName, fileArray).catch(err => {
            console.error('Background upload process failed:', err);
            logger.error('Background upload process failed', err);
          });

          // Return immediately with job ID
          return {
            success: true,
            jobId,
            message: 'Upload started. Connect to /wisp/upload-progress/' + jobId + ' for progress updates.'
          };
        } catch (error) {
          console.error('=== UPLOAD ERROR ===');
          console.error('Error details:', error);
          logger.error('Upload error', error);
          throw new Error(`Failed to upload files: ${error instanceof Error ? error.message : 'Unknown error'}`);
        }
      }
    )
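// ---------------------------------------------------------------------------
// Usage sketch (illustrative only): a browser client starts an upload with a
// multipart POST and then follows progress over SSE. The endpoint paths, the
// siteName/files form fields, and the progress/done/error event names mirror
// the routes above; file picking and UI wiring are assumed.
//
//   const form = new FormData();
//   form.append('siteName', 'my-site');
//   for (const file of pickedFiles) form.append('files', file);
//
//   const { jobId } = await fetch('/wisp/upload-files', {
//     method: 'POST',
//     body: form,
//     credentials: 'include' // auth rides on the signed `did` cookie
//   }).then(r => r.json());
//
//   const events = new EventSource(`/wisp/upload-progress/${jobId}`, { withCredentials: true });
//   events.addEventListener('progress', e => console.log(JSON.parse(e.data)));
//   events.addEventListener('done', e => { console.log('upload finished', JSON.parse(e.data)); events.close(); });
//   events.addEventListener('error', () => events.close());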