···
import { requireAuth, type AuthenticatedContext } from '../lib/wisp-auth'
import { NodeOAuthClient } from '@atproto/oauth-client-node'
import { Agent } from '@atproto/api'
+import { TID } from '@atproto/common-web'
···
+findLargeDirectories,
+replaceDirectoryWithSubfs,
+estimateDirectorySize
} from '../lib/wisp-utils'
import { upsertSite } from '../lib/db'
import { logger } from '../lib/observability'
import { validateRecord } from '../lexicons/types/place/wisp/fs'
+import { validateRecord as validateSubfsRecord } from '../lexicons/types/place/wisp/subfs'
import { MAX_SITE_SIZE, MAX_FILE_SIZE, MAX_FILE_COUNT } from '../lib/constants'
+} from '../lib/upload-jobs'
function isValidSiteName(siteName: string): boolean {
if (!siteName || typeof siteName !== 'string') return false;
···
+async function processUploadInBackground(
+// Try to fetch existing record to enable incremental updates
+let existingBlobMap = new Map<string, { blobRef: any; cid: string }>();
+let oldSubfsUris: Array<{ uri: string; path: string }> = [];
+console.log('Attempting to fetch existing record...');
+updateJobProgress(jobId, { phase: 'validating' });
+const rkey = siteName;
+const existingRecord = await agent.com.atproto.repo.getRecord({
+collection: 'place.wisp.fs',
+console.log('Existing record found!');
+if (existingRecord.data.value && typeof existingRecord.data.value === 'object' && 'root' in existingRecord.data.value) {
+const manifest = existingRecord.data.value as any;
+// Extract blob map from main record
+existingBlobMap = extractBlobMap(manifest.root);
+console.log(`Found existing manifest with ${existingBlobMap.size} files in main record`);
+// Extract subfs URIs with their mount paths from main record
+const subfsUris = extractSubfsUris(manifest.root);
+oldSubfsUris = subfsUris; // Save for cleanup later
+if (subfsUris.length > 0) {
+console.log(`Found ${subfsUris.length} subfs records, fetching in parallel...`);
+logger.info(`Fetching ${subfsUris.length} subfs records for blob reuse`);
+// Fetch all subfs records in parallel
+const subfsRecords = await Promise.all(
+subfsUris.map(async ({ uri, path }) => {
+// Parse URI: at://did/collection/rkey
+const parts = uri.replace('at://', '').split('/');
+const subDid = parts[0];
+const collection = parts[1];
+const subRkey = parts[2];
+const record = await agent.com.atproto.repo.getRecord({
+collection: collection,
+return { record: record.data.value as any, mountPath: path };
+} catch (err: any) {
+logger.warn(`Failed to fetch subfs record ${uri}: ${err?.message}`, err);
+// Merge blob maps from all subfs records
+let totalSubfsBlobs = 0;
+for (const subfsData of subfsRecords) {
+if (subfsData && subfsData.record && 'root' in subfsData.record) {
+// Extract blobs with the correct mount path prefix
+const subfsMap = extractBlobMap(subfsData.record.root, subfsData.mountPath);
+subfsMap.forEach((value, key) => {
+existingBlobMap.set(key, value);
+console.log(`Merged ${totalSubfsBlobs} files from ${subfsUris.length} subfs records`);
+logger.info(`Total blob map: ${existingBlobMap.size} files (main + subfs)`);
+console.log(`Total existing blobs for reuse: ${existingBlobMap.size} files`);
+logger.info(`Found existing manifest with ${existingBlobMap.size} files for incremental update`);
+} catch (error: any) {
+console.log('No existing record found or error:', error?.message || error);
+if (error?.status !== 400 && error?.error !== 'RecordNotFound') {
+logger.warn('Failed to fetch existing record, proceeding with full upload', error);
+// Convert File objects to UploadedFile format
+const uploadedFiles: UploadedFile[] = [];
+const skippedFiles: Array<{ name: string; reason: string }> = [];
+console.log('Processing files, count:', fileArray.length);
+updateJobProgress(jobId, { phase: 'compressing' });
+for (let i = 0; i < fileArray.length; i++) {
+const file = fileArray[i];
+console.log(`Processing file ${i + 1}/${fileArray.length}:`, file.name, file.size, 'bytes');
+updateJobProgress(jobId, {
+filesProcessed: i + 1,
+currentFile: file.name
+// Skip .git directory files
+const normalizedPath = file.name.replace(/^[^\/]*\//, '');
+if (normalizedPath.startsWith('.git/') || normalizedPath === '.git') {
+console.log(`Skipping .git file: ${file.name}`);
+skippedFiles.push({
+reason: '.git directory excluded'
+// Skip files that are too large
+const maxSize = MAX_FILE_SIZE;
+if (file.size > maxSize) {
+skippedFiles.push({
+reason: `file too large (${(file.size / 1024 / 1024).toFixed(2)}MB, max 100MB)`
+const arrayBuffer = await file.arrayBuffer();
+const originalContent = Buffer.from(arrayBuffer);
+const originalMimeType = file.type || 'application/octet-stream';
+// Compress and base64 encode ALL files
+const compressedContent = compressFile(originalContent);
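+// Base64-encode the gzipped content to prevent PDS content sniffing; the 'binary'
+// encoding turns each base64 character into exactly one byte, and this buffer is
+// what the PDS receives and computes the CID on.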
+const base64Content = Buffer.from(compressedContent.toString('base64'), 'binary');
+const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
+console.log(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${base64Content.length} bytes`);
+logger.info(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${base64Content.length} bytes`);
+uploadedFiles.push({
+content: base64Content,
+mimeType: originalMimeType,
+size: base64Content.length,
+// Update total file count after filtering (important for progress tracking)
+updateJobProgress(jobId, {
+totalFiles: uploadedFiles.length
+// Check total size limit
+const totalSize = uploadedFiles.reduce((sum, file) => sum + file.size, 0);
+const maxTotalSize = MAX_SITE_SIZE;
+if (totalSize > maxTotalSize) {
+throw new Error(`Total upload size ${(totalSize / 1024 / 1024).toFixed(2)}MB exceeds 300MB limit`);
+// Check file count limit
+if (uploadedFiles.length > MAX_FILE_COUNT) {
+throw new Error(`File count ${uploadedFiles.length} exceeds ${MAX_FILE_COUNT} files limit`);
+console.log(`After filtering: ${uploadedFiles.length} files to process (${skippedFiles.length} skipped)`);
+if (uploadedFiles.length === 0) {
+// Create empty manifest
+const emptyManifest = {
+$type: 'place.wisp.fs',
+createdAt: new Date().toISOString()
+const validationResult = validateRecord(emptyManifest);
+if (!validationResult.success) {
+throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
+const rkey = siteName;
+updateJobProgress(jobId, { phase: 'finalizing' });
+const record = await agent.com.atproto.repo.putRecord({
+collection: 'place.wisp.fs',
+record: emptyManifest
+await upsertSite(did, rkey, siteName);
+completeUploadJob(jobId, {
+uri: record.data.uri,
+cid: record.data.cid,
+// Process files into directory structure
+console.log('Processing uploaded files into directory structure...');
+const validUploadedFiles = uploadedFiles.filter((f, i) => {
+if (!f || !f.name || !f.content) {
+console.error(`Filtering out invalid file at index ${i}`);
+const { directory, fileCount } = processUploadedFiles(validUploadedFiles);
+console.log('Directory structure created, file count:', fileCount);
+// Upload files as blobs with retry logic for DPoP nonce conflicts
+console.log('Starting blob upload/reuse phase...');
+updateJobProgress(jobId, { phase: 'uploading' });
+// Helper function to upload blob with exponential backoff retry
+const uploadBlobWithRetry = async (
+for (let attempt = 0; attempt < maxRetries; attempt++) {
+return await agent.com.atproto.repo.uploadBlob(content, { encoding: mimeType });
+} catch (error: any) {
+const isDPoPNonceError =
+error?.message?.toLowerCase().includes('nonce') ||
+error?.message?.toLowerCase().includes('dpop') ||
+error?.status === 409;
+if (isDPoPNonceError && attempt < maxRetries - 1) {
+const backoffMs = 100 * Math.pow(2, attempt); // 100ms, 200ms, 400ms
+logger.info(`[File Upload] 🔄 DPoP nonce conflict for ${fileName}, retrying in ${backoffMs}ms (attempt ${attempt + 1}/${maxRetries})`);
+await new Promise(resolve => setTimeout(resolve, backoffMs));
+throw new Error(`Failed to upload ${fileName} after ${maxRetries} attempts`);
+// Use sliding window concurrency for maximum throughput
+const CONCURRENCY_LIMIT = 50; // Maximum concurrent uploads with retry logic
+const uploadedBlobs: Array<{
+result: FileUploadResult;
+sentMimeType: string;
+returnedMimeType: string;
+// Process file with sliding window concurrency
+const processFile = async (file: UploadedFile, index: number) => {
+if (!file || !file.name) {
+throw new Error(`Undefined file at index ${index}`);
+const fileCID = computeCID(file.content);
+const normalizedPath = file.name.replace(/^[^\/]*\//, '');
+const existingBlob = existingBlobMap.get(normalizedPath) || existingBlobMap.get(file.name);
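+// file.content is already gzipped+base64 encoded, so this CID matches what the PDS
+// computed on a previous upload. The blob is looked up under both the normalized and
+// full paths; if the CID is unchanged the existing blob is reused without re-uploading.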
+if (existingBlob && existingBlob.cid === fileCID) {
+logger.info(`[File Upload] ♻️ Reused: ${file.name} (unchanged, CID: ${fileCID})`);
+updateJobProgress(jobId, { filesReused: (getUploadJob(jobId)?.progress.filesReused || 0) + 1 });
+hash: existingBlob.cid,
+blobRef: existingBlob.blobRef,
+...(file.compressed && {
+encoding: 'gzip' as const,
+mimeType: file.originalMimeType || file.mimeType,
+filePath: file.name,
+sentMimeType: file.mimeType,
+returnedMimeType: existingBlob.blobRef.mimeType,
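+// Compressed files are always uploaded as octet-stream; text/html is also sent as
+// octet-stream to work around the PDS routing HTML through its image pipeline.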
+const uploadMimeType = file.compressed || file.mimeType.startsWith('text/html')
+? 'application/octet-stream'
+const compressionInfo = file.compressed ? ' (gzipped)' : '';
+const fileSizeMB = (file.size / 1024 / 1024).toFixed(2);
+logger.info(`[File Upload] ⬆️ Uploading: ${file.name} (${fileSizeMB}MB${compressionInfo})`);
+const uploadResult = await uploadBlobWithRetry(
+const returnedBlobRef = uploadResult.data.blob;
+updateJobProgress(jobId, { filesUploaded: (getUploadJob(jobId)?.progress.filesUploaded || 0) + 1 });
+logger.info(`[File Upload] ✅ Uploaded: ${file.name} (CID: ${fileCID})`);
+hash: returnedBlobRef.ref.toString(),
+blobRef: returnedBlobRef,
+...(file.compressed && {
+encoding: 'gzip' as const,
+mimeType: file.originalMimeType || file.mimeType,
+filePath: file.name,
+sentMimeType: file.mimeType,
+returnedMimeType: returnedBlobRef.mimeType,
+} catch (uploadError) {
+logger.error('Upload failed for file', uploadError);
+// Sliding window concurrency control
+const processWithConcurrency = async () => {
+const results: any[] = [];
+const executing = new Set<Promise<void>>();
+for (const file of validUploadedFiles) {
+const currentIndex = fileIndex++;
+const promise = processFile(file, currentIndex)
+results[currentIndex] = result;
+logger.error(`Failed to process file at index ${currentIndex}`, error);
+throw error; // Re-throw to fail the entire upload
+executing.delete(promise);
+executing.add(promise);
+if (executing.size >= CONCURRENCY_LIMIT) {
+await Promise.race(executing);
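+// Promise.race resolves as soon as the fastest in-flight upload settles, freeing a
+// slot in the window before the next file is scheduled.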
+// Wait for remaining uploads
+await Promise.all(executing);
+return results.filter(r => r !== undefined); // Filter out any undefined entries
+const allResults = await processWithConcurrency();
+uploadedBlobs.push(...allResults);
+const currentReused = uploadedBlobs.filter(b => b.reused).length;
+const currentUploaded = uploadedBlobs.filter(b => !b.reused).length;
+logger.info(`[File Upload] 🎉 Upload complete → ${uploadedBlobs.length}/${validUploadedFiles.length} files (${currentUploaded} uploaded, ${currentReused} reused)`);
+const reusedCount = uploadedBlobs.filter(b => b.reused).length;
+const uploadedCount = uploadedBlobs.filter(b => !b.reused).length;
+logger.info(`[File Upload] 🎉 Upload phase complete! Total: ${uploadedBlobs.length} files (${uploadedCount} uploaded, ${reusedCount} reused)`);
+const uploadResults: FileUploadResult[] = uploadedBlobs.map(blob => blob.result);
+const filePaths: string[] = uploadedBlobs.map(blob => blob.filePath);
+// Update directory with file blobs
+console.log('Updating directory with blob references...');
+updateJobProgress(jobId, { phase: 'creating_manifest' });
+const updatedDirectory = updateFileBlobs(directory, uploadResults, filePaths);
+// Check if we need to split into subfs records
+// Split proactively if we have lots of files to avoid hitting manifest size limits
+const MAX_MANIFEST_SIZE = 140 * 1024; // 140KB to be safe (PDS limit is 150KB)
+const FILE_COUNT_THRESHOLD = 250; // Start splitting early
+const subfsRecords: Array<{ uri: string; path: string }> = [];
+let workingDirectory = updatedDirectory;
+let currentFileCount = fileCount;
+// Create initial manifest to check size
+let manifest = createManifest(siteName, workingDirectory, fileCount);
+let manifestSize = JSON.stringify(manifest).length;
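+// Note: the JSON.stringify length is used here as a rough proxy for the stored record
+// size that the PDS limit above is meant to guard against.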
+// Split if we have lots of files OR if manifest is already too large
+if (fileCount >= FILE_COUNT_THRESHOLD || manifestSize > MAX_MANIFEST_SIZE) {
+console.log(`⚠️ Large site detected (${fileCount} files, ${(manifestSize / 1024).toFixed(1)}KB), splitting into subfs records...`);
+logger.info(`Large site with ${fileCount} files, splitting into subfs records`);
+// Keep splitting until manifest fits under limit
+const MAX_ATTEMPTS = 100; // Allow many splits for very large sites
+while (manifestSize > MAX_MANIFEST_SIZE && attempts < MAX_ATTEMPTS) {
+// Find all directories sorted by size (largest first)
+const directories = findLargeDirectories(workingDirectory);
+directories.sort((a, b) => b.size - a.size);
+if (directories.length === 0) {
+// No more directories to split - this should be very rare
+`Cannot split manifest further - no subdirectories available. ` +
+`Current size: ${(manifestSize / 1024).toFixed(1)}KB. ` +
+`Try organizing files into subdirectories.`
+// Pick the largest directory
+const largestDir = directories[0];
+console.log(` Split #${attempts}: ${largestDir.path} (${largestDir.fileCount} files, ${(largestDir.size / 1024).toFixed(1)}KB)`);
+// Create a subfs record for this directory
+const subfsRkey = TID.nextStr();
+const subfsManifest = {
+$type: 'place.wisp.subfs' as const,
+root: largestDir.directory,
+fileCount: largestDir.fileCount,
+createdAt: new Date().toISOString()
+// Validate subfs record
+const subfsValidation = validateSubfsRecord(subfsManifest);
+if (!subfsValidation.success) {
+throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
+// Upload subfs record to PDS
+const subfsRecord = await agent.com.atproto.repo.putRecord({
+collection: 'place.wisp.subfs',
+record: subfsManifest
+const subfsUri = subfsRecord.data.uri;
+subfsRecords.push({ uri: subfsUri, path: largestDir.path });
+console.log(` ✅ Created subfs: ${subfsUri}`);
+logger.info(`Created subfs record for ${largestDir.path}: ${subfsUri}`);
+// Replace directory with subfs node in the main tree
+workingDirectory = replaceDirectoryWithSubfs(workingDirectory, largestDir.path, subfsUri);
+// Recreate manifest and check new size
+currentFileCount -= largestDir.fileCount;
+manifest = createManifest(siteName, workingDirectory, fileCount);
+manifestSize = JSON.stringify(manifest).length;
+const newSizeKB = (manifestSize / 1024).toFixed(1);
+console.log(` → Manifest now ${newSizeKB}KB with ${currentFileCount} files (${subfsRecords.length} subfs total)`);
+// Check if we're under the limit now
+if (manifestSize <= MAX_MANIFEST_SIZE) {
+console.log(` ✅ Manifest fits! (${newSizeKB}KB < 140KB)`);
+if (manifestSize > MAX_MANIFEST_SIZE) {
+`Failed to fit manifest after splitting ${attempts} directories. ` +
+`Current size: ${(manifestSize / 1024).toFixed(1)}KB. ` +
+`This should never happen - please report this issue.`
+console.log(`✅ Split complete: ${subfsRecords.length} subfs records, ${currentFileCount} files in main, ${(manifestSize / 1024).toFixed(1)}KB manifest`);
+logger.info(`Split into ${subfsRecords.length} subfs records, ${currentFileCount} files remaining in main tree`);
+const manifestSizeKB = (manifestSize / 1024).toFixed(1);
+console.log(`Manifest created (${fileCount} files, ${manifestSizeKB}KB JSON) - no splitting needed`);
+const rkey = siteName;
+updateJobProgress(jobId, { phase: 'finalizing' });
+console.log('Putting record to PDS with rkey:', rkey);
+const record = await agent.com.atproto.repo.putRecord({
+collection: 'place.wisp.fs',
+console.log('Record successfully created on PDS:', record.data.uri);
+// Store site in database cache
+await upsertSite(did, rkey, siteName);
+// Clean up old subfs records if we had any
+if (oldSubfsUris.length > 0) {
+console.log(`Cleaning up ${oldSubfsUris.length} old subfs records...`);
+logger.info(`Cleaning up ${oldSubfsUris.length} old subfs records`);
+// Delete old subfs records in parallel (don't wait for completion)
+oldSubfsUris.map(async ({ uri }) => {
+// Parse URI: at://did/collection/rkey
+const parts = uri.replace('at://', '').split('/');
+const subRkey = parts[2];
+await agent.com.atproto.repo.deleteRecord({
+collection: 'place.wisp.subfs',
+console.log(` 🗑️ Deleted old subfs: ${uri}`);
+logger.info(`Deleted old subfs record: ${uri}`);
+} catch (err: any) {
+// Don't fail the whole upload if cleanup fails
+console.warn(`Failed to delete old subfs ${uri}:`, err?.message);
+logger.warn(`Failed to delete old subfs ${uri}`, err);
+// Log but don't fail if cleanup fails
+logger.warn('Some subfs cleanup operations failed', err);
+completeUploadJob(jobId, {
+uri: record.data.uri,
+cid: record.data.cid,
+uploadedCount: validUploadedFiles.length
+console.log('=== UPLOAD FILES COMPLETE ===');
+console.error('=== UPLOAD ERROR ===');
+console.error('Error details:', error);
+logger.error('Upload error', error);
+failUploadJob(jobId, error instanceof Error ? error.message : 'Unknown error');
export const wispRoutes = (client: NodeOAuthClient, cookieSecret: string) =>
···
const auth = await requireAuth(client, cookie)
+'/upload-progress/:jobId',
+async ({ params: { jobId }, auth, set }) => {
+const job = getUploadJob(jobId);
+return { error: 'Job not found' };
+// Verify job belongs to authenticated user
+if (job.did !== auth.did) {
+return { error: 'Unauthorized' };
+// Set up SSE headers
+'Content-Type': 'text/event-stream',
+'Cache-Control': 'no-cache',
+'Connection': 'keep-alive'
+const stream = new ReadableStream({
+start(controller) {
+const encoder = new TextEncoder();
+// Send initial state
+const sendEvent = (event: string, data: any) => {
+const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
+controller.enqueue(encoder.encode(message));
+// Controller closed, ignore
+// Send keepalive comment every 15 seconds to prevent timeout
+const keepaliveInterval = setInterval(() => {
+controller.enqueue(encoder.encode(': keepalive\n\n'));
+// Controller closed, stop sending keepalives
+clearInterval(keepaliveInterval);
+// Send current job state immediately
+sendEvent('progress', {
+status: job.status,
+progress: job.progress,
+result: job.result,
+// If job is already completed or failed, close the stream
+if (job.status === 'completed' || job.status === 'failed') {
+clearInterval(keepaliveInterval);
+controller.close();
+// Listen for updates
+const cleanup = addJobListener(jobId, (event, data) => {
+sendEvent(event, data);
+// Close stream after done or error event
+if (event === 'done' || event === 'error') {
+clearInterval(keepaliveInterval);
+controller.close();
+// Cleanup on disconnect
+clearInterval(keepaliveInterval);
+return new Response(stream);
async ({ body, auth }) => {
···
const hasFiles = files && (Array.isArray(files) ? files.length > 0 : !!files);
-// Create agent with OAuth session
+// Handle empty upload synchronously (fast operation)
const agent = new Agent((url, init) => auth.session.fetchHandler(url, init))
-// Create empty manifest
···
createdAt: new Date().toISOString()
-// Validate the manifest
const validationResult = validateRecord(emptyManifest);
if (!validationResult.success) {
throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
-// Use site name as rkey
const record = await agent.com.atproto.repo.putRecord({
···
+// For file uploads, create a job and process in background
+const fileArray = Array.isArray(files) ? files : [files];
+const jobId = createUploadJob(auth.did, siteName, fileArray.length);
// Create agent with OAuth session
const agent = new Agent((url, init) => auth.session.fetchHandler(url, init))
console.log('Agent created for DID:', auth.did);
-// Try to fetch existing record to enable incremental updates
-let existingBlobMap = new Map<string, { blobRef: any; cid: string }>();
-console.log('Attempting to fetch existing record...');
-const rkey = siteName;
-const existingRecord = await agent.com.atproto.repo.getRecord({
-collection: 'place.wisp.fs',
-console.log('Existing record found!');
-if (existingRecord.data.value && typeof existingRecord.data.value === 'object' && 'root' in existingRecord.data.value) {
-const manifest = existingRecord.data.value as any;
-existingBlobMap = extractBlobMap(manifest.root);
-console.log(`Found existing manifest with ${existingBlobMap.size} files for incremental update`);
-logger.info(`Found existing manifest with ${existingBlobMap.size} files for incremental update`);
-} catch (error: any) {
-console.log('No existing record found or error:', error?.message || error);
-// Record doesn't exist yet, this is a new site
-if (error?.status !== 400 && error?.error !== 'RecordNotFound') {
-logger.warn('Failed to fetch existing record, proceeding with full upload', error);
-// Convert File objects to UploadedFile format
-// Elysia gives us File objects directly, handle both single file and array
-const fileArray = Array.isArray(files) ? files : [files];
-const uploadedFiles: UploadedFile[] = [];
-const skippedFiles: Array<{ name: string; reason: string }> = [];
-console.log('Processing files, count:', fileArray.length);
-for (let i = 0; i < fileArray.length; i++) {
-const file = fileArray[i];
-console.log(`Processing file ${i + 1}/${fileArray.length}:`, file.name, file.size, 'bytes');
-// Skip files that are too large (limit to 100MB per file)
-const maxSize = MAX_FILE_SIZE; // 100MB
-if (file.size > maxSize) {
-skippedFiles.push({
-reason: `file too large (${(file.size / 1024 / 1024).toFixed(2)}MB, max 100MB)`
-const arrayBuffer = await file.arrayBuffer();
-const originalContent = Buffer.from(arrayBuffer);
-const originalMimeType = file.type || 'application/octet-stream';
-// Compress and base64 encode ALL files
-const compressedContent = compressFile(originalContent);
-// Base64 encode the gzipped content to prevent PDS content sniffing
-// Convert base64 string to bytes using binary encoding (each char becomes exactly one byte)
-// This is what PDS receives and computes CID on
-const base64Content = Buffer.from(compressedContent.toString('base64'), 'binary');
-const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
-console.log(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${base64Content.length} bytes`);
-logger.info(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${base64Content.length} bytes`);
-uploadedFiles.push({
-content: base64Content, // This is the gzipped+base64 content that will be uploaded and CID-computed
-mimeType: originalMimeType,
-size: base64Content.length,
-// Check total size limit (300MB)
-const totalSize = uploadedFiles.reduce((sum, file) => sum + file.size, 0);
-const maxTotalSize = MAX_SITE_SIZE; // 300MB
-if (totalSize > maxTotalSize) {
-throw new Error(`Total upload size ${(totalSize / 1024 / 1024).toFixed(2)}MB exceeds 300MB limit`);
-// Check file count limit (2000 files)
-if (uploadedFiles.length > MAX_FILE_COUNT) {
-throw new Error(`File count ${uploadedFiles.length} exceeds ${MAX_FILE_COUNT} files limit`);
-if (uploadedFiles.length === 0) {
-// Create empty manifest
-const emptyManifest = {
-$type: 'place.wisp.fs',
-createdAt: new Date().toISOString()
-// Validate the manifest
-const validationResult = validateRecord(emptyManifest);
-if (!validationResult.success) {
-throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
+console.log('Created upload job:', jobId);
-// Use site name as rkey
-const rkey = siteName;
-const record = await agent.com.atproto.repo.putRecord({
-collection: 'place.wisp.fs',
-record: emptyManifest
-await upsertSite(auth.did, rkey, siteName);
-uri: record.data.uri,
-cid: record.data.cid,
-message: 'Site created but no valid web files were found to upload'
-// Process files into directory structure
-console.log('Processing uploaded files into directory structure...');
-console.log('uploadedFiles array length:', uploadedFiles.length);
-console.log('uploadedFiles contents:', uploadedFiles.map((f, i) => `${i}: ${f?.name || 'UNDEFINED'}`));
-// Filter out any undefined/null/invalid entries (defensive)
-const validUploadedFiles = uploadedFiles.filter((f, i) => {
-console.error(`Filtering out undefined/null file at index ${i}`);
-console.error(`Filtering out file with no name at index ${i}:`, f);
-console.error(`Filtering out file with no content at index ${i}:`, f.name);
+// Start background processing (don't await)
+processUploadInBackground(jobId, agent, auth.did, siteName, fileArray).catch(err => {
+console.error('Background upload process failed:', err);
+logger.error('Background upload process failed', err);
-if (validUploadedFiles.length !== uploadedFiles.length) {
-console.warn(`Filtered out ${uploadedFiles.length - validUploadedFiles.length} invalid files`);
-console.log('validUploadedFiles length:', validUploadedFiles.length);
-const { directory, fileCount } = processUploadedFiles(validUploadedFiles);
-console.log('Directory structure created, file count:', fileCount);
-// Upload files as blobs in parallel (or reuse existing blobs with matching CIDs)
-console.log('Starting blob upload/reuse phase...');
-// For compressed files, we upload as octet-stream and store the original MIME type in metadata
-// For text/html files, we also use octet-stream as a workaround for PDS image pipeline issues
-const uploadPromises = validUploadedFiles.map(async (file, i) => {
-// Skip undefined files (shouldn't happen after filter, but defensive)
-if (!file || !file.name) {
-console.error(`ERROR: Undefined file at index ${i} in validUploadedFiles!`);
-throw new Error(`Undefined file at index ${i}`);
-// Compute CID for this file to check if it already exists
-// Note: file.content is already gzipped+base64 encoded
-const fileCID = computeCID(file.content);
-// Normalize the file path for comparison (remove base folder prefix like "cobblemon/")
-const normalizedPath = file.name.replace(/^[^\/]*\//, '');
-// Check if we have an existing blob with the same CID
-// Try both the normalized path and the full path
-const existingBlob = existingBlobMap.get(normalizedPath) || existingBlobMap.get(file.name);
-if (existingBlob && existingBlob.cid === fileCID) {
-// Reuse existing blob - no need to upload
-logger.info(`[File Upload] Reusing existing blob for: ${file.name} (CID: ${fileCID})`);
-hash: existingBlob.cid,
-blobRef: existingBlob.blobRef,
-...(file.compressed && {
-encoding: 'gzip' as const,
-mimeType: file.originalMimeType || file.mimeType,
-filePath: file.name,
-sentMimeType: file.mimeType,
-returnedMimeType: existingBlob.blobRef.mimeType,
-// File is new or changed - upload it
-// If compressed, always upload as octet-stream
-// Otherwise, workaround: PDS incorrectly processes text/html through image pipeline
-const uploadMimeType = file.compressed || file.mimeType.startsWith('text/html')
-? 'application/octet-stream'
-const compressionInfo = file.compressed ? ' (gzipped)' : '';
-logger.info(`[File Upload] Uploading new/changed file: ${file.name} (original: ${file.mimeType}, sending as: ${uploadMimeType}, ${file.size} bytes${compressionInfo}, CID: ${fileCID})`);
-const uploadResult = await agent.com.atproto.repo.uploadBlob(
-encoding: uploadMimeType
-const returnedBlobRef = uploadResult.data.blob;
-// Use the blob ref exactly as returned from PDS
-hash: returnedBlobRef.ref.toString(),
-blobRef: returnedBlobRef,
-...(file.compressed && {
-encoding: 'gzip' as const,
-mimeType: file.originalMimeType || file.mimeType,
-filePath: file.name,
-sentMimeType: file.mimeType,
-returnedMimeType: returnedBlobRef.mimeType,
-} catch (uploadError) {
-logger.error('Upload failed for file', uploadError);
-// Wait for all uploads to complete
-const uploadedBlobs = await Promise.all(uploadPromises);
-// Count reused vs uploaded blobs
-const reusedCount = uploadedBlobs.filter(b => (b as any).reused).length;
-const uploadedCount = uploadedBlobs.filter(b => !(b as any).reused).length;
-console.log(`Blob statistics: ${reusedCount} reused, ${uploadedCount} uploaded, ${uploadedBlobs.length} total`);
-logger.info(`Blob statistics: ${reusedCount} reused, ${uploadedCount} uploaded, ${uploadedBlobs.length} total`);
-// Extract results and file paths in correct order
-const uploadResults: FileUploadResult[] = uploadedBlobs.map(blob => blob.result);
-const filePaths: string[] = uploadedBlobs.map(blob => blob.filePath);
-// Update directory with file blobs
-console.log('Updating directory with blob references...');
-const updatedDirectory = updateFileBlobs(directory, uploadResults, filePaths);
-console.log('Creating manifest...');
-const manifest = createManifest(siteName, updatedDirectory, fileCount);
-console.log('Manifest created successfully');
-// Use site name as rkey
-const rkey = siteName;
-console.log('Putting record to PDS with rkey:', rkey);
-record = await agent.com.atproto.repo.putRecord({
-collection: 'place.wisp.fs',
-console.log('Record successfully created on PDS:', record.data.uri);
-} catch (putRecordError: any) {
-console.error('FAILED to create record on PDS:', putRecordError);
-logger.error('Failed to create record on PDS', putRecordError);
-throw putRecordError;
-// Store site in database cache
-await upsertSite(auth.did, rkey, siteName);
+// Return immediately with job ID
-uri: record.data.uri,
-cid: record.data.cid,
-uploadedCount: validUploadedFiles.length
+message: 'Upload started. Connect to /wisp/upload-progress/' + jobId + ' for progress updates.'
-console.log('=== UPLOAD FILES COMPLETE ===');
console.error('=== UPLOAD ERROR ===');
console.error('Error details:', error);
-console.error('Stack trace:', error instanceof Error ? error.stack : 'N/A');
-logger.error('Upload error', error, {
-message: error instanceof Error ? error.message : 'Unknown error',
-name: error instanceof Error ? error.name : undefined
+logger.error('Upload error', error);
throw new Error(`Failed to upload files: ${error instanceof Error ? error.message : 'Unknown error'}`);