Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
import { Elysia } from 'elysia'
import { requireAuth, type AuthenticatedContext } from '../lib/wisp-auth'
import { NodeOAuthClient } from '@atproto/oauth-client-node'
import { Agent } from '@atproto/api'
import { TID } from '@atproto/common-web'
import {
  type UploadedFile,
  type FileUploadResult,
  processUploadedFiles,
  updateFileBlobs,
  findLargeDirectories,
  replaceDirectoryWithSubfs,
  estimateDirectorySize
} from '@wisp/fs-utils'
import {
  shouldCompressFile,
  compressFile,
  computeCID,
  extractBlobMap,
  extractSubfsUris
} from '@wisp/atproto-utils'
import { createManifest } from '@wisp/fs-utils'
import { upsertSite } from '../lib/db'
import { createLogger } from '@wisp/observability'
// import { validateRecord, type Directory } from '@wisp/lexicons/types/place/wisp/fs'
import { type Directory } from '@wisp/lexicons/types/place/wisp/fs'
// import { validateRecord as validateSubfsRecord } from '@wisp/lexicons/types/place/wisp/subfs'
import { MAX_SITE_SIZE, MAX_FILE_SIZE, MAX_FILE_COUNT } from '@wisp/constants'
import {
  createUploadJob,
  getUploadJob,
  updateJobProgress,
  completeUploadJob,
  failUploadJob,
  addJobListener
} from '../lib/upload-jobs'
import { createIgnoreMatcher, shouldIgnore, parseWispignore } from '../lib/ignore-patterns'
import type { Ignore } from 'ignore'

const logger = createLogger('main-app')

function isValidSiteName(siteName: string): boolean {
  if (!siteName || typeof siteName !== 'string') return false;

  // Length check (AT Protocol rkey limit)
  if (siteName.length < 1 || siteName.length > 512) return false;

  // Check for path traversal
  if (siteName === '.' || siteName === '..') return false;
  if (siteName.includes('/') || siteName.includes('\\')) return false;
  if (siteName.includes('\0')) return false;

  // AT Protocol rkey format: alphanumeric, dots, dashes, underscores, tildes, colons
  // Based on NSID format rules
  const validRkeyPattern = /^[a-zA-Z0-9._~:-]+$/;
  if (!validRkeyPattern.test(siteName)) return false;

  return true;
}

async function processUploadInBackground(
  jobId: string,
  agent: Agent,
  did: string,
  siteName: string,
  fileArray: File[]
): Promise<void> {
  try {
    // Try to fetch existing record to enable incremental updates
    let existingBlobMap = new Map<string, { blobRef: any; cid: string }>();
    let oldSubfsUris: Array<{ uri: string; path: string }> = [];
    console.log('Attempting to fetch existing record...');
    updateJobProgress(jobId, { phase: 'validating' });

    try {
      const rkey = siteName;
      const existingRecord = await agent.com.atproto.repo.getRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey
      });
      console.log('Existing record found!');

      if (existingRecord.data.value && typeof existingRecord.data.value === 'object' && 'root' in existingRecord.data.value) {
        const manifest = existingRecord.data.value as any;

        // Extract blob map from main record
        existingBlobMap = extractBlobMap(manifest.root);
        console.log(`Found existing manifest with ${existingBlobMap.size} files in main record`);

        // Extract subfs URIs with their mount paths from main record
        const subfsUris = extractSubfsUris(manifest.root);
        oldSubfsUris = subfsUris; // Save for cleanup later

        if (subfsUris.length > 0) {
          console.log(`Found ${subfsUris.length} subfs records, fetching in parallel...`);
          logger.info(`Fetching ${subfsUris.length} subfs records for blob reuse`);

          // Fetch all subfs records in parallel
          const subfsRecords = await Promise.all(
            subfsUris.map(async ({ uri, path }) => {
              try {
                // Parse URI: at://did/collection/rkey
                const parts = uri.replace('at://', '').split('/');
                const subDid = parts[0];
                const collection = parts[1];
                const subRkey = parts[2];

                const record = await agent.com.atproto.repo.getRecord({
                  repo: subDid,
                  collection: collection,
                  rkey: subRkey
                });

                return { record: record.data.value as any, mountPath: path };
              } catch (err: any) {
                logger.warn(`Failed to fetch subfs record ${uri}: ${err?.message}`, err);
                return null;
              }
            })
          );

          // Merge blob maps from all subfs records
          let totalSubfsBlobs = 0;
          for (const subfsData of subfsRecords) {
            if (subfsData && subfsData.record && 'root' in subfsData.record) {
              // Extract blobs with the correct mount path prefix
              const subfsMap = extractBlobMap(subfsData.record.root, subfsData.mountPath);
              subfsMap.forEach((value, key) => {
                existingBlobMap.set(key, value);
                totalSubfsBlobs++;
              });
            }
          }

          console.log(`Merged ${totalSubfsBlobs} files from ${subfsUris.length} subfs records`);
          logger.info(`Total blob map: ${existingBlobMap.size} files (main + subfs)`);
        }

        console.log(`Total existing blobs for reuse: ${existingBlobMap.size} files`);
        logger.info(`Found existing manifest with ${existingBlobMap.size} files for incremental update`);
      }
    } catch (error: any) {
      console.log('No existing record found or error:', error?.message || error);
      if (error?.status !== 400 && error?.error !== 'RecordNotFound') {
        logger.warn('Failed to fetch existing record, proceeding with full upload', error);
      }
    }

    // Check for .wispignore file in uploaded files
    let customIgnorePatterns: string[] = [];
    const wispignoreFile = fileArray.find(f => f && f.name && f.name.endsWith('.wispignore'));
    if (wispignoreFile) {
      try {
        const content = await wispignoreFile.text();
        customIgnorePatterns = parseWispignore(content);
        console.log(`Found .wispignore file with ${customIgnorePatterns.length} custom patterns`);
      } catch (err) {
        console.warn('Failed to parse .wispignore file:', err);
      }
    }

    // Create ignore matcher with default and custom patterns
    const ignoreMatcher = createIgnoreMatcher(customIgnorePatterns);

    // Convert File objects to UploadedFile format
    const uploadedFiles: UploadedFile[] = [];
    const skippedFiles: Array<{ name: string; reason: string }> = [];

    console.log('Processing files, count:', fileArray.length);
    updateJobProgress(jobId, { phase: 'compressing' });

    for (let i = 0; i < fileArray.length; i++) {
      const file = fileArray[i];

      // Skip undefined/null files
      if (!file || !file.name) {
        console.log(`Skipping undefined file at index ${i}`);
        skippedFiles.push({
          name: `[undefined file at index ${i}]`,
          reason: 'Invalid file object'
        });
        continue;
      }

      console.log(`Processing file ${i + 1}/${fileArray.length}:`, file.name, file.size, 'bytes');
      updateJobProgress(jobId, {
        filesProcessed: i + 1,
        currentFile: file.name
      });

      // Skip files that match ignore patterns
      const normalizedPath = file.name.replace(/^[^\/]*\//, '');

      if (shouldIgnore(ignoreMatcher, normalizedPath)) {
        console.log(`Skipping ignored file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: 'matched ignore pattern'
        });
        continue;
      }

      // Skip files that are too large
      const maxSize = MAX_FILE_SIZE;
      if (file.size > maxSize) {
        skippedFiles.push({
          name: file.name,
          reason: `file too large (${(file.size / 1024 / 1024).toFixed(2)}MB, max ${(maxSize / 1024 / 1024).toFixed(0)}MB)`
        });
        continue;
      }

      const arrayBuffer = await file.arrayBuffer();
      const originalContent = Buffer.from(arrayBuffer);
      const originalMimeType = file.type || 'application/octet-stream';

      // Determine if file should be compressed (pass filename to exclude _redirects)
      const shouldCompress = shouldCompressFile(originalMimeType, normalizedPath);

      // Text files (HTML/CSS/JS) need base64 encoding to prevent PDS content sniffing
      // Audio files just need compression without base64
      const needsBase64 =
        originalMimeType.startsWith('text/') ||
        originalMimeType.startsWith('application/json') ||
        originalMimeType.startsWith('application/xml') ||
        originalMimeType === 'image/svg+xml';

      let finalContent: Buffer;
      let compressed = false;
      let base64Encoded = false;

      if (shouldCompress) {
        const compressedContent = compressFile(originalContent);
        compressed = true;

        if (needsBase64) {
          // Text files: compress AND base64 encode
          finalContent = Buffer.from(compressedContent.toString('base64'), 'binary');
          base64Encoded = true;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
          logger.info(`Compressing+base64 ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${finalContent.length} bytes`);
        } else {
          // Audio files: just compress, no base64
          finalContent = compressedContent;
          const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
          console.log(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
          logger.info(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%)`);
        }
      } else {
        // Binary files: upload directly
        finalContent = originalContent;
        console.log(`Uploading ${file.name} directly: ${originalContent.length} bytes (no compression)`);
        logger.info(`Uploading ${file.name} directly: ${originalContent.length} bytes (binary)`);
      }

      uploadedFiles.push({
        name: file.name,
        content: finalContent,
        mimeType: originalMimeType,
        size: finalContent.length,
        compressed,
        base64Encoded,
        originalMimeType
      });
    }

    // Update total file count after filtering (important for progress tracking)
    updateJobProgress(jobId, {
      totalFiles: uploadedFiles.length
    });

    // Check total size limit
    const totalSize = uploadedFiles.reduce((sum, file) => sum + file.size, 0);
    const maxTotalSize = MAX_SITE_SIZE;

    if (totalSize > maxTotalSize) {
      throw new Error(`Total upload size ${(totalSize / 1024 / 1024).toFixed(2)}MB exceeds ${(maxTotalSize / 1024 / 1024).toFixed(0)}MB limit`);
    }

    // Check file count limit
    if (uploadedFiles.length > MAX_FILE_COUNT) {
      throw new Error(`File count ${uploadedFiles.length} exceeds ${MAX_FILE_COUNT} files limit`);
    }
    console.log(`After filtering: ${uploadedFiles.length} files to process (${skippedFiles.length} skipped)`);

    if (uploadedFiles.length === 0) {
      // Create empty manifest
      const emptyManifest = {
        $type: 'place.wisp.fs',
        site: siteName,
        root: {
          type: 'directory',
          entries: []
        },
        fileCount: 0,
        createdAt: new Date().toISOString()
      };

      // const validationResult = validateRecord(emptyManifest);
      // if (!validationResult.success) {
      //   throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
      // }

      const rkey = siteName;
      updateJobProgress(jobId, { phase: 'finalizing' });

      const record = await agent.com.atproto.repo.putRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey,
        record: emptyManifest
      });

      await upsertSite(did, rkey, siteName);

      completeUploadJob(jobId, {
        success: true,
        uri: record.data.uri,
        cid: record.data.cid,
        fileCount: 0,
        siteName,
        skippedFiles
      });
      return;
    }

    // Process files into directory structure
    console.log('Processing uploaded files into directory structure...');
    const validUploadedFiles = uploadedFiles.filter((f, i) => {
      if (!f || !f.name || !f.content) {
        console.error(`Filtering out invalid file at index ${i}`);
        return false;
      }
      return true;
    });

    const { directory, fileCount } = processUploadedFiles(validUploadedFiles);
    console.log('Directory structure created, file count:', fileCount);

    // Upload files as blobs with retry logic for DPoP nonce conflicts
    console.log('Starting blob upload/reuse phase...');
    updateJobProgress(jobId, { phase: 'uploading' });

    // Helper function to upload blob with exponential backoff retry and timeout
    const uploadBlobWithRetry = async (
      agent: Agent,
      content: Buffer,
      mimeType: string,
      fileName: string,
      maxRetries = 5
    ) => {
      for (let attempt = 0; attempt < maxRetries; attempt++) {
        const controller = new AbortController();
        const timeoutMs = 300000; // 5 minute timeout per upload
        const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

        try {
          console.log(`[File Upload] Starting upload attempt ${attempt + 1}/${maxRetries} for ${fileName} (${content.length} bytes, ${mimeType})`);

          const result = await agent.com.atproto.repo.uploadBlob(content, { encoding: mimeType });
          clearTimeout(timeoutId);
          console.log(`[File Upload] ✅ Successfully uploaded ${fileName} on attempt ${attempt + 1}`);
          return result;
        } catch (error: any) {
          clearTimeout(timeoutId);

          const isDPoPNonceError =
            error?.message?.toLowerCase().includes('nonce') ||
            error?.message?.toLowerCase().includes('dpop') ||
            error?.status === 409;

          const isTimeout = error?.name === 'AbortError' || error?.message === 'Upload timeout';
          const isRateLimited = error?.status === 429 || error?.message?.toLowerCase().includes('rate');
          const isRequestEntityTooLarge = error?.status === 419 || error?.status === 413;

          // Special handling for 419/413 Request Entity Too Large errors
          if (isRequestEntityTooLarge) {
            const customError = new Error('Your PDS is not allowing uploads large enough to store your site. Please contact your PDS host. This could also possibly be a result of it being behind Cloudflare free tier.');
            (customError as any).status = 419;
            throw customError;
          }

          // Retry on DPoP nonce conflicts, timeouts, or rate limits
          if ((isDPoPNonceError || isTimeout || isRateLimited) && attempt < maxRetries - 1) {
            let backoffMs: number;
            if (isRateLimited) {
              backoffMs = 2000 * Math.pow(2, attempt); // 2s, 4s, 8s, 16s for rate limits
            } else if (isTimeout) {
              backoffMs = 1000 * Math.pow(2, attempt); // 1s, 2s, 4s, 8s for timeouts
            } else {
              backoffMs = 100 * Math.pow(2, attempt); // 100ms, 200ms, 400ms for DPoP
            }

            const reason = isDPoPNonceError ? 'DPoP nonce conflict' : isTimeout ? 'timeout' : 'rate limit';
            logger.info(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms (attempt ${attempt + 1}/${maxRetries})`);
            console.log(`[File Upload] 🔄 ${reason} for ${fileName}, retrying in ${backoffMs}ms`);
            await new Promise(resolve => setTimeout(resolve, backoffMs));
            continue;
          }

          // Log detailed error information before throwing
          logger.error(`[File Upload] ❌ Upload failed for ${fileName} (size: ${content.length} bytes, mimeType: ${mimeType}, attempt: ${attempt + 1}/${maxRetries})`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            headers: error?.headers,
            success: error?.success
          });
          console.error(`[File Upload] ❌ Upload failed for ${fileName}:`, {
            error: error?.error || error?.message || 'Unknown error',
            status: error?.status,
            size: content.length,
            mimeType,
            attempt: attempt + 1
          });
          throw error;
        }
      }
      throw new Error(`Failed to upload ${fileName} after ${maxRetries} attempts`);
    };

    // Use sliding window concurrency for maximum throughput
    const CONCURRENCY_LIMIT = 20; // Maximum concurrent uploads
    const uploadedBlobs: Array<{
      result: FileUploadResult;
      filePath: string;
      sentMimeType: string;
      returnedMimeType: string;
      reused: boolean;
    }> = [];
    const failedFiles: Array<{
      name: string;
      index: number;
      error: string;
      size: number;
    }> = [];

    // Track completed files count for accurate progress
    let completedFilesCount = 0;

    // Process file with sliding window concurrency
    const processFile = async (file: UploadedFile, index: number) => {
      try {
        if (!file || !file.name) {
          throw new Error(`Undefined file at index ${index}`);
        }

        const fileCID = computeCID(file.content);
        const normalizedPath = file.name.replace(/^[^\/]*\//, '');
        const existingBlob = existingBlobMap.get(normalizedPath) || existingBlobMap.get(file.name);

        if (existingBlob && existingBlob.cid === fileCID) {
          logger.info(`[File Upload] ♻️ Reused: ${file.name} (unchanged, CID: ${fileCID})`);
          const reusedCount = (getUploadJob(jobId)?.progress.filesReused || 0) + 1;
          completedFilesCount++;
          updateJobProgress(jobId, {
            filesReused: reusedCount,
            currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (reused)`
          });

          return {
            result: {
              hash: existingBlob.cid,
              blobRef: existingBlob.blobRef,
              ...(file.compressed && {
                encoding: 'gzip' as const,
                mimeType: file.originalMimeType || file.mimeType,
                base64: file.base64Encoded || false
              })
            },
            filePath: file.name,
            sentMimeType: file.mimeType,
            returnedMimeType: existingBlob.blobRef.mimeType,
            reused: true
          };
        }

        const uploadMimeType = file.compressed || file.mimeType.startsWith('text/html')
          ? 'application/octet-stream'
          : file.mimeType;

        const compressionInfo = file.compressed ? ' (gzipped)' : '';
        const fileSizeMB = (file.size / 1024 / 1024).toFixed(2);
        logger.info(`[File Upload] ⬆️ Uploading: ${file.name} (${fileSizeMB}MB${compressionInfo})`);

        const uploadResult = await uploadBlobWithRetry(
          agent,
          file.content,
          uploadMimeType,
          file.name
        );

        const returnedBlobRef = uploadResult.data.blob;
        const uploadedCount = (getUploadJob(jobId)?.progress.filesUploaded || 0) + 1;
        completedFilesCount++;
        updateJobProgress(jobId, {
          filesUploaded: uploadedCount,
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${file.name} (uploaded)`
        });
        logger.info(`[File Upload] ✅ Uploaded: ${file.name} (CID: ${fileCID})`);

        return {
          result: {
            hash: returnedBlobRef.ref.toString(),
            blobRef: returnedBlobRef,
            ...(file.compressed && {
              encoding: 'gzip' as const,
              mimeType: file.originalMimeType || file.mimeType,
              base64: file.base64Encoded || false
            })
          },
          filePath: file.name,
          sentMimeType: file.mimeType,
          returnedMimeType: returnedBlobRef.mimeType,
          reused: false
        };
      } catch (uploadError) {
        const fileName = file?.name || 'unknown';
        const fileSize = file?.size || 0;
        const errorMessage = uploadError instanceof Error ? uploadError.message : 'Unknown error';
        const errorDetails = {
          fileName,
          fileSize,
          index,
          error: errorMessage,
          stack: uploadError instanceof Error ? uploadError.stack : undefined
        };
        logger.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);
        console.error(`Upload failed for file: ${fileName} (${fileSize} bytes) at index ${index}`, errorDetails);

        completedFilesCount++;
        updateJobProgress(jobId, {
          currentFile: `${completedFilesCount}/${validUploadedFiles.length}: ${fileName} (failed)`
        });

        // Track failed file but don't throw - continue with other files
        failedFiles.push({
          name: fileName,
          index,
          error: errorMessage,
          size: fileSize
        });

        return null; // Return null to indicate failure
      }
    };

    // Sliding window concurrency control
    const processWithConcurrency = async () => {
      const results: any[] = [];
      let fileIndex = 0;
      const executing = new Map<Promise<void>, { index: number; name: string }>();

      for (const file of validUploadedFiles) {
        const currentIndex = fileIndex++;

        const promise = processFile(file, currentIndex)
          .then(result => {
            results[currentIndex] = result;
          })
          .catch(error => {
            // This shouldn't happen since processFile catches errors, but just in case
            logger.error(`Unexpected error processing file at index ${currentIndex}`, error);
            results[currentIndex] = null;
          })
          .finally(() => {
            executing.delete(promise);
          });

        executing.set(promise, { index: currentIndex, name: file.name });

        if (executing.size >= CONCURRENCY_LIMIT) {
          await Promise.race(executing.keys());
        }
      }

      // Wait for remaining uploads
      await Promise.all(executing.keys());
      console.log(`\n✅ Upload complete: ${completedFilesCount}/${validUploadedFiles.length} files processed\n`);
      return results.filter(r => r !== undefined && r !== null); // Filter out null (failed) and undefined entries
    };

    const allResults = await processWithConcurrency();
    uploadedBlobs.push(...allResults);

    const currentReused = uploadedBlobs.filter(b => b.reused).length;
    const currentUploaded = uploadedBlobs.filter(b => !b.reused).length;
    const successfulCount = uploadedBlobs.length;
    const failedCount = failedFiles.length;

    logger.info(`[File Upload] 🎉 Upload complete → ${successfulCount}/${validUploadedFiles.length} files succeeded (${currentUploaded} uploaded, ${currentReused} reused), ${failedCount} failed`);

    if (failedCount > 0) {
      logger.warn(`[File Upload] ⚠️ Failed files:`, failedFiles);
      console.warn(`[File Upload] ⚠️ ${failedCount} files failed to upload:`, failedFiles.map(f => f.name).join(', '));
    }

    const reusedCount = uploadedBlobs.filter(b => b.reused).length;
    const uploadedCount = uploadedBlobs.filter(b => !b.reused).length;
    logger.info(`[File Upload] 🎉 Upload phase complete! Total: ${successfulCount} files (${uploadedCount} uploaded, ${reusedCount} reused)`);

    const uploadResults: FileUploadResult[] = uploadedBlobs.map(blob => blob.result);
    const filePaths: string[] = uploadedBlobs.map(blob => blob.filePath);

    // Update directory with file blobs (only for successfully uploaded files)
    console.log('Updating directory with blob references...');
    updateJobProgress(jobId, { phase: 'creating_manifest' });

    // Create a set of successfully uploaded paths for quick lookup
    const successfulPaths = new Set(filePaths.map(path => path.replace(/^[^\/]*\//, '')));

    const updatedDirectory = updateFileBlobs(directory, uploadResults, filePaths, '', successfulPaths);

    // Calculate actual file count (only successfully uploaded files)
    const actualFileCount = uploadedBlobs.length;

    // Check if we need to split into subfs records
    // Split proactively if we have lots of files to avoid hitting manifest size limits
    const MAX_MANIFEST_SIZE = 140 * 1024; // 140KB to be safe (PDS limit is 150KB)
    const FILE_COUNT_THRESHOLD = 250; // Start splitting at this many files
    const TARGET_FILE_COUNT = 200; // Try to keep main manifest under this many files
    const subfsRecords: Array<{ uri: string; path: string }> = [];
    let workingDirectory = updatedDirectory;
    let currentFileCount = actualFileCount;

    // Create initial manifest to check size
    let manifest = createManifest(siteName, workingDirectory, actualFileCount);
    let manifestSize = JSON.stringify(manifest).length;

    // Split if we have lots of files OR if manifest is already too large
    if (actualFileCount >= FILE_COUNT_THRESHOLD || manifestSize > MAX_MANIFEST_SIZE) {
      console.log(`⚠️ Large site detected (${actualFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB), splitting into subfs records...`);
      logger.info(`Large site with ${actualFileCount} files, splitting into subfs records`);

      // Keep splitting until manifest fits under limits (both size and file count)
      let attempts = 0;
      const MAX_ATTEMPTS = 100; // Allow many splits for very large sites

      while ((manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) && attempts < MAX_ATTEMPTS) {
        attempts++;

        // Find all directories sorted by size (largest first)
        const directories = findLargeDirectories(workingDirectory);
        directories.sort((a, b) => b.size - a.size);

        // Check if we can split subdirectories or need to split flat files
        if (directories.length > 0) {
          // Split the largest subdirectory
          const largestDir = directories[0];
          console.log(` Split #${attempts}: ${largestDir.path} (${largestDir.fileCount} files, ${(largestDir.size / 1024).toFixed(1)}KB)`);

          // Create a subfs record for this directory
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: largestDir.directory,
            fileCount: largestDir.fileCount,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          // const subfsValidation = validateSubfsRecord(subfsManifest);
          // if (!subfsValidation.success) {
          //   throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          // }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          subfsRecords.push({ uri: subfsUri, path: largestDir.path });
          console.log(` ✅ Created subfs: ${subfsUri}`);
          logger.info(`Created subfs record for ${largestDir.path}: ${subfsUri}`);

          // Replace directory with subfs node in the main tree
          workingDirectory = replaceDirectoryWithSubfs(workingDirectory, largestDir.path, subfsUri);
          currentFileCount -= largestDir.fileCount;
        } else {
          // No subdirectories - split flat files at root level
          const rootFiles = workingDirectory.entries.filter(e => 'type' in e.node && e.node.type === 'file');

          if (rootFiles.length === 0) {
            throw new Error(
              `Cannot split manifest further - no files or directories available. ` +
              `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB.`
            );
          }

          // Take a chunk of files (aim for ~100 files per chunk)
          const CHUNK_SIZE = 100;
          const chunkFiles = rootFiles.slice(0, Math.min(CHUNK_SIZE, rootFiles.length));
          console.log(` Split #${attempts}: flat root (${chunkFiles.length} files)`);

          // Create a directory with just these files
          const chunkDirectory: Directory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: chunkFiles
          };

          // Create subfs record for this chunk
          const subfsRkey = TID.nextStr();
          const subfsManifest = {
            $type: 'place.wisp.subfs' as const,
            root: chunkDirectory,
            fileCount: chunkFiles.length,
            createdAt: new Date().toISOString()
          };

          // Validate subfs record
          // const subfsValidation = validateSubfsRecord(subfsManifest);
          // if (!subfsValidation.success) {
          //   throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
          // }

          // Upload subfs record to PDS
          const subfsRecord = await agent.com.atproto.repo.putRecord({
            repo: did,
            collection: 'place.wisp.subfs',
            rkey: subfsRkey,
            record: subfsManifest
          });

          const subfsUri = subfsRecord.data.uri;
          console.log(` ✅ Created flat subfs: ${subfsUri}`);
          logger.info(`Created flat subfs record with ${chunkFiles.length} files: ${subfsUri}`);

          // Remove these files from the working directory and add a subfs entry
          const remainingEntries = workingDirectory.entries.filter(
            e => !chunkFiles.some(cf => cf.name === e.name)
          );

          // Add subfs entry (will be merged flat when expanded)
          remainingEntries.push({
            name: `__subfs_${attempts}`, // Placeholder name, will be merged away
            node: {
              $type: 'place.wisp.fs#subfs' as const,
              type: 'subfs' as const,
              subject: subfsUri,
              flat: true // Merge entries directly into parent (default, but explicit for clarity)
            }
          });

          workingDirectory = {
            $type: 'place.wisp.fs#directory' as const,
            type: 'directory' as const,
            entries: remainingEntries
          };

          subfsRecords.push({ uri: subfsUri, path: `__subfs_${attempts}` });
          currentFileCount -= chunkFiles.length;
        }

        // Recreate manifest and check new size
        manifest = createManifest(siteName, workingDirectory, currentFileCount);
        manifestSize = JSON.stringify(manifest).length;
        const newSizeKB = (manifestSize / 1024).toFixed(1);
        console.log(` → Manifest now ${newSizeKB}KB with ${currentFileCount} files (${subfsRecords.length} subfs total)`);

        // Check if we're under both limits now
        if (manifestSize <= MAX_MANIFEST_SIZE && currentFileCount <= TARGET_FILE_COUNT) {
          console.log(` ✅ Manifest fits! (${currentFileCount} files, ${newSizeKB}KB)`);
          break;
        }
      }

      if (manifestSize > MAX_MANIFEST_SIZE || currentFileCount > TARGET_FILE_COUNT) {
        throw new Error(
          `Failed to fit manifest after splitting ${attempts} directories. ` +
          `Current: ${currentFileCount} files, ${(manifestSize / 1024).toFixed(1)}KB. ` +
          `This should never happen - please report this issue.`
        );
      }

      console.log(`✅ Split complete: ${subfsRecords.length} subfs records, ${currentFileCount} files in main, ${(manifestSize / 1024).toFixed(1)}KB manifest`);
      logger.info(`Split into ${subfsRecords.length} subfs records, ${currentFileCount} files remaining in main tree`);
    } else {
      const manifestSizeKB = (manifestSize / 1024).toFixed(1);
      console.log(`Manifest created (${fileCount} files, ${manifestSizeKB}KB JSON) - no splitting needed`);
    }

    const rkey = siteName;
    updateJobProgress(jobId, { phase: 'finalizing' });

    console.log('Putting record to PDS with rkey:', rkey);
    const record = await agent.com.atproto.repo.putRecord({
      repo: did,
      collection: 'place.wisp.fs',
      rkey: rkey,
      record: manifest
    });
    console.log('Record successfully created on PDS:', record.data.uri);

    // Store site in database cache
    await upsertSite(did, rkey, siteName);

    // Clean up old subfs records if we had any
    if (oldSubfsUris.length > 0) {
      console.log(`Cleaning up ${oldSubfsUris.length} old subfs records...`);
      logger.info(`Cleaning up ${oldSubfsUris.length} old subfs records`);

      // Delete old subfs records in parallel (don't wait for completion)
      Promise.all(
        oldSubfsUris.map(async ({ uri }) => {
          try {
            // Parse URI: at://did/collection/rkey
            const parts = uri.replace('at://', '').split('/');
            const subRkey = parts[2];

            await agent.com.atproto.repo.deleteRecord({
              repo: did,
              collection: 'place.wisp.subfs',
              rkey: subRkey
            });

            console.log(` 🗑️ Deleted old subfs: ${uri}`);
            logger.info(`Deleted old subfs record: ${uri}`);
          } catch (err: any) {
            // Don't fail the whole upload if cleanup fails
            console.warn(`Failed to delete old subfs ${uri}:`, err?.message);
            logger.warn(`Failed to delete old subfs ${uri}`, err);
          }
        })
      ).catch(err => {
        // Log but don't fail if cleanup fails
        logger.warn('Some subfs cleanup operations failed', err);
      });
    }

    completeUploadJob(jobId, {
      success: true,
      uri: record.data.uri,
      cid: record.data.cid,
      fileCount,
      siteName,
      skippedFiles,
      failedFiles,
      uploadedCount: validUploadedFiles.length - failedFiles.length,
      hasFailures: failedFiles.length > 0
    });

    console.log('=== UPLOAD FILES COMPLETE ===');
  } catch (error) {
    console.error('=== UPLOAD ERROR ===');
    console.error('Error details:', error);
    logger.error('Upload error', error);
    failUploadJob(jobId, error instanceof Error ? error.message : 'Unknown error');
  }
}

export const wispRoutes = (client: NodeOAuthClient, cookieSecret: string) =>
  new Elysia({
    prefix: '/wisp',
    cookie: {
      secrets: cookieSecret,
      sign: ['did']
    }
  })
    .derive(async ({ cookie }) => {
      const auth = await requireAuth(client, cookie)
      return { auth }
    })
    .get(
      '/upload-progress/:jobId',
      async ({ params: { jobId }, auth, set }) => {
        const job = getUploadJob(jobId);

        if (!job) {
          set.status = 404;
          return { error: 'Job not found' };
        }

        // Verify job belongs to authenticated user
        if (job.did !== auth.did) {
          set.status = 403;
          return { error: 'Unauthorized' };
        }

        // Set up SSE headers
        set.headers = {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Connection': 'keep-alive'
        };

        // Track stream resources so they can be released if the client disconnects
        let keepaliveInterval: ReturnType<typeof setInterval> | undefined;
        let removeJobListener: (() => void) | undefined;

        const stream = new ReadableStream({
          start(controller) {
            const encoder = new TextEncoder();

            // Send initial state
            const sendEvent = (event: string, data: any) => {
              try {
                const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
                controller.enqueue(encoder.encode(message));
              } catch (err) {
                // Controller closed, ignore
              }
            };

            // Send keepalive comment every 15 seconds to prevent timeout
            keepaliveInterval = setInterval(() => {
              try {
                controller.enqueue(encoder.encode(': keepalive\n\n'));
              } catch (err) {
                // Controller closed, stop sending keepalives
                clearInterval(keepaliveInterval);
              }
            }, 15000);

            // Send current job state immediately
            sendEvent('progress', {
              status: job.status,
              progress: job.progress,
              result: job.result,
              error: job.error
            });

            // If job is already completed or failed, close the stream
            if (job.status === 'completed' || job.status === 'failed') {
              clearInterval(keepaliveInterval);
              controller.close();
              return;
            }

            // Listen for updates
            removeJobListener = addJobListener(jobId, (event, data) => {
              sendEvent(event, data);

              // Close stream after done or error event
              if (event === 'done' || event === 'error') {
                clearInterval(keepaliveInterval);
                setTimeout(() => {
                  try {
                    controller.close();
                  } catch (err) {
                    // Already closed
                  }
                }, 100);
              }
            });
          },
          // Cleanup on disconnect (a function returned from start() is never invoked
          // as a cleanup callback, so this has to live in cancel())
          cancel() {
            if (keepaliveInterval) clearInterval(keepaliveInterval);
            removeJobListener?.();
          }
        });

        return new Response(stream);
      }
    )
    .post(
      '/upload-files',
      async ({ body, auth }) => {
        const { siteName, files } = body as {
          siteName: string;
          files: File | File[]
        };

        console.log('=== UPLOAD FILES START ===');
        console.log('Site name:', siteName);
        console.log('Files received:', Array.isArray(files) ? files.length : 'single file');

        try {
          if (!siteName) {
            throw new Error('Site name is required')
          }

          if (!isValidSiteName(siteName)) {
            throw new Error('Invalid site name: must be 1-512 characters and contain only alphanumeric, dots, dashes, underscores, tildes, and colons')
          }

          // Check if files were provided
          const hasFiles = files && (Array.isArray(files) ? files.length > 0 : !!files);

          if (!hasFiles) {
            // Handle empty upload synchronously (fast operation)
            const agent = new Agent((url, init) => auth.session.fetchHandler(url, init))

            const emptyManifest = {
              $type: 'place.wisp.fs',
              site: siteName,
              root: {
                type: 'directory',
                entries: []
              },
              fileCount: 0,
              createdAt: new Date().toISOString()
            };

            // const validationResult = validateRecord(emptyManifest);
            // if (!validationResult.success) {
            //   throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
            // }

            const rkey = siteName;

            const record = await agent.com.atproto.repo.putRecord({
              repo: auth.did,
              collection: 'place.wisp.fs',
              rkey: rkey,
              record: emptyManifest
            });

            await upsertSite(auth.did, rkey, siteName);

            return {
              success: true,
              uri: record.data.uri,
              cid: record.data.cid,
              fileCount: 0,
              siteName
            };
          }

          // For file uploads, create a job and process in background
          const fileArray = Array.isArray(files) ? files : [files];
          const jobId = createUploadJob(auth.did, siteName, fileArray.length);

          // Track upload speeds to estimate progress
          const uploadStats = {
            speeds: [] as number[], // MB/s from completed uploads
            getAverageSpeed(): number {
              if (this.speeds.length === 0) return 3; // Default 3 MB/s
              const sum = this.speeds.reduce((a, b) => a + b, 0);
              return sum / this.speeds.length;
            }
          };

          // Create agent with OAuth session and upload progress monitoring
          const wrappedFetchHandler = async (url: string, init?: RequestInit) => {
            // Check if this is an uploadBlob request with a body
            if (url.includes('uploadBlob') && init?.body) {
              const originalBody = init.body;
              const bodySize = originalBody instanceof Uint8Array ? originalBody.length :
                originalBody instanceof ArrayBuffer ? originalBody.byteLength :
                typeof originalBody === 'string' ?
                  new TextEncoder().encode(originalBody).length : 0;

              const startTime = Date.now();

              if (bodySize > 10 * 1024 * 1024) { // Files over 10MB
                const sizeMB = (bodySize / 1024 / 1024).toFixed(1);
                const avgSpeed = uploadStats.getAverageSpeed();
                const estimatedDuration = (bodySize / 1024 / 1024) / avgSpeed;

                console.log(`[Upload Progress] Starting upload of ${sizeMB}MB file`);
                console.log(`[Upload Stats] Measured speeds from last ${uploadStats.speeds.length} files:`, uploadStats.speeds.map(s => s.toFixed(2) + ' MB/s').join(', '));
                console.log(`[Upload Stats] Average speed: ${avgSpeed.toFixed(2)} MB/s, estimated duration: ${estimatedDuration.toFixed(0)}s`);

                // Log estimated progress every 5 seconds
                const progressInterval = setInterval(() => {
                  const elapsed = (Date.now() - startTime) / 1000;
                  const estimatedPercent = Math.min(95, Math.round((elapsed / estimatedDuration) * 100));
                  const estimatedMB = Math.min(bodySize / 1024 / 1024, elapsed * avgSpeed).toFixed(1);
                  console.log(`[Upload Progress] ~${estimatedPercent}% (~${estimatedMB}/${sizeMB}MB) - ${elapsed.toFixed(0)}s elapsed`);
                }, 5000);

                try {
                  const result = await auth.session.fetchHandler(url, init);
                  clearInterval(progressInterval);
                  const totalTime = (Date.now() - startTime) / 1000;
                  const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                  uploadStats.speeds.push(actualSpeed);
                  // Keep only last 10 uploads for rolling average
                  if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                  console.log(`[Upload Progress] ✅ Completed ${sizeMB}MB in ${totalTime.toFixed(1)}s (${actualSpeed.toFixed(1)} MB/s)`);
                  return result;
                } catch (err) {
                  clearInterval(progressInterval);
                  const elapsed = (Date.now() - startTime) / 1000;
                  console.error(`[Upload Progress] ❌ Upload failed after ${elapsed.toFixed(1)}s`);
                  throw err;
                }
              } else {
                // Track small files too for speed calculation
                try {
                  const result = await auth.session.fetchHandler(url, init);
                  const totalTime = (Date.now() - startTime) / 1000;
                  if (totalTime > 0.5) { // Only track if > 0.5s
                    const actualSpeed = (bodySize / 1024 / 1024) / totalTime;
                    uploadStats.speeds.push(actualSpeed);
                    if (uploadStats.speeds.length > 10) uploadStats.speeds.shift();
                    console.log(`[Upload Stats] Small file: ${(bodySize / 1024).toFixed(1)}KB in ${totalTime.toFixed(2)}s = ${actualSpeed.toFixed(2)} MB/s`);
                  }
                  return result;
                } catch (err) {
                  throw err;
                }
              }
            }

            // Normal request
            return auth.session.fetchHandler(url, init);
          };

          const agent = new Agent(wrappedFetchHandler)
          console.log('Agent created for DID:', auth.did);
          console.log('Created upload job:', jobId);

          // Start background processing (don't await)
          processUploadInBackground(jobId, agent, auth.did, siteName, fileArray).catch(err => {
            console.error('Background upload process failed:', err);
            logger.error('Background upload process failed', err);
          });

          // Return immediately with job ID
          return {
            success: true,
            jobId,
            message: 'Upload started. Connect to /wisp/upload-progress/' + jobId + ' for progress updates.'
          };
        } catch (error) {
          console.error('=== UPLOAD ERROR ===');
          console.error('Error details:', error);
          logger.error('Upload error', error);
          throw new Error(`Failed to upload files: ${error instanceof Error ? error.message : 'Unknown error'}`);
        }
      }
    )
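
Usage sketch (not part of the file above): one way a browser client might drive these routes, assuming a same-origin page that already holds the signed `did` session cookie. The body fields (`siteName`, `files`) and the SSE event names (`progress`, `done`, `error`) come from the handlers above; the function name and everything else here is illustrative.

// Hypothetical client helper, shown only to illustrate the upload + progress flow.
async function uploadSiteExample(siteName: string, files: File[]): Promise<void> {
  // Build the multipart body that the /wisp/upload-files handler destructures
  const form = new FormData();
  form.append('siteName', siteName);
  for (const file of files) form.append('files', file, file.name);

  // The server returns immediately with a job id and processes in the background
  const res = await fetch('/wisp/upload-files', { method: 'POST', body: form });
  const { jobId } = (await res.json()) as { success: boolean; jobId: string };

  // Follow progress over SSE until the job emits 'done' or 'error'
  await new Promise<void>((resolve, reject) => {
    const source = new EventSource(`/wisp/upload-progress/${jobId}`);
    source.addEventListener('progress', (e) => {
      console.log('progress', JSON.parse((e as MessageEvent).data));
    });
    source.addEventListener('done', (e) => {
      console.log('done', JSON.parse((e as MessageEvent).data));
      source.close();
      resolve();
    });
    source.addEventListener('error', () => {
      // Covers both the server's 'error' event and transport failures
      source.close();
      reject(new Error('upload failed'));
    });
  });
}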