wisp.place
Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
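The routes below expose two endpoints: POST /wisp/upload-files accepts a multipart form with a siteName field and one or more files entries, creates a background upload job, and returns its jobId; GET /wisp/upload-progress/:jobId streams that job's progress as server-sent events ('progress', then 'done' or 'error'). The following sketch is an illustrative browser-side client, not code from this repo: the endpoint shapes come from the handlers below, while the function name and logging are made up, and it assumes the signed did session cookie from the OAuth flow is already set.

async function uploadSiteExample(siteName: string, files: File[]) {
  const form = new FormData();
  form.append('siteName', siteName);
  for (const file of files) form.append('files', file);

  // Kick off the upload; when files are present the server responds immediately with a job ID
  const res = await fetch('/wisp/upload-files', { method: 'POST', body: form });
  const { jobId } = await res.json();
  if (!jobId) return; // empty uploads are handled synchronously and return the record directly

  // Follow progress over SSE; the stream emits 'progress' events, then 'done' or 'error'
  const events = new EventSource(`/wisp/upload-progress/${jobId}`);
  events.addEventListener('progress', (e) => {
    console.log('progress', JSON.parse((e as MessageEvent).data));
  });
  events.addEventListener('done', (e) => {
    console.log('done', JSON.parse((e as MessageEvent).data));
    events.close();
  });
  events.addEventListener('error', () => events.close());
}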
import { Elysia } from 'elysia'
import { requireAuth, type AuthenticatedContext } from '../lib/wisp-auth'
import { NodeOAuthClient } from '@atproto/oauth-client-node'
import { Agent } from '@atproto/api'
import { TID } from '@atproto/common-web'
import {
  type UploadedFile,
  type FileUploadResult,
  processUploadedFiles,
  createManifest,
  updateFileBlobs,
  shouldCompressFile,
  compressFile,
  computeCID,
  extractBlobMap,
  extractSubfsUris,
  findLargeDirectories,
  replaceDirectoryWithSubfs,
  estimateDirectorySize
} from '../lib/wisp-utils'
import { upsertSite } from '../lib/db'
import { logger } from '../lib/observability'
import { validateRecord } from '../lexicons/types/place/wisp/fs'
import { validateRecord as validateSubfsRecord } from '../lexicons/types/place/wisp/subfs'
import { MAX_SITE_SIZE, MAX_FILE_SIZE, MAX_FILE_COUNT } from '../lib/constants'
import {
  createUploadJob,
  getUploadJob,
  updateJobProgress,
  completeUploadJob,
  failUploadJob,
  addJobListener
} from '../lib/upload-jobs'

function isValidSiteName(siteName: string): boolean {
  if (!siteName || typeof siteName !== 'string') return false;

  // Length check (AT Protocol rkey limit)
  if (siteName.length < 1 || siteName.length > 512) return false;

  // Check for path traversal
  if (siteName === '.' || siteName === '..') return false;
  if (siteName.includes('/') || siteName.includes('\\')) return false;
  if (siteName.includes('\0')) return false;

  // AT Protocol rkey format: alphanumeric, dots, dashes, underscores, tildes, colons
  // Based on the record key (rkey) syntax rules
  const validRkeyPattern = /^[a-zA-Z0-9._~:-]+$/;
  if (!validRkeyPattern.test(siteName)) return false;

  return true;
}

async function processUploadInBackground(
  jobId: string,
  agent: Agent,
  did: string,
  siteName: string,
  fileArray: File[]
): Promise<void> {
  try {
    // Try to fetch existing record to enable incremental updates
    let existingBlobMap = new Map<string, { blobRef: any; cid: string }>();
    let oldSubfsUris: Array<{ uri: string; path: string }> = [];
    console.log('Attempting to fetch existing record...');
    updateJobProgress(jobId, { phase: 'validating' });

    try {
      const rkey = siteName;
      const existingRecord = await agent.com.atproto.repo.getRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey
      });
      console.log('Existing record found!');

      if (existingRecord.data.value && typeof existingRecord.data.value === 'object' && 'root' in existingRecord.data.value) {
        const manifest = existingRecord.data.value as any;

        // Extract blob map from main record
        existingBlobMap = extractBlobMap(manifest.root);
        console.log(`Found existing manifest with ${existingBlobMap.size} files in main record`);

        // Extract subfs URIs with their mount paths from main record
        const subfsUris = extractSubfsUris(manifest.root);
        oldSubfsUris = subfsUris; // Save for cleanup later

        if (subfsUris.length > 0) {
          console.log(`Found ${subfsUris.length} subfs records, fetching in parallel...`);
          logger.info(`Fetching ${subfsUris.length} subfs records for blob reuse`);

          // Fetch all subfs records in parallel
          const subfsRecords = await Promise.all(
            subfsUris.map(async ({ uri, path }) => {
              try {
                // Parse URI: at://did/collection/rkey
                const parts = uri.replace('at://', '').split('/');
                const subDid = parts[0];
                const collection = parts[1];
                const subRkey = parts[2];

                const record = await agent.com.atproto.repo.getRecord({
                  repo: subDid,
                  collection: collection,
                  rkey: subRkey
                });

                return { record: record.data.value as any, mountPath: path };
              } catch (err: any) {
                logger.warn(`Failed to fetch subfs record ${uri}: ${err?.message}`, err);
                return null;
              }
            })
          );

          // Merge blob maps from all subfs records
          let totalSubfsBlobs = 0;
          for (const subfsData of subfsRecords) {
            if (subfsData && subfsData.record && 'root' in subfsData.record) {
              // Extract blobs with the correct mount path prefix
              const subfsMap = extractBlobMap(subfsData.record.root, subfsData.mountPath);
              subfsMap.forEach((value, key) => {
                existingBlobMap.set(key, value);
                totalSubfsBlobs++;
              });
            }
          }

          console.log(`Merged ${totalSubfsBlobs} files from ${subfsUris.length} subfs records`);
          logger.info(`Total blob map: ${existingBlobMap.size} files (main + subfs)`);
        }

        console.log(`Total existing blobs for reuse: ${existingBlobMap.size} files`);
        logger.info(`Found existing manifest with ${existingBlobMap.size} files for incremental update`);
      }
    } catch (error: any) {
      console.log('No existing record found or error:', error?.message || error);
      if (error?.status !== 400 && error?.error !== 'RecordNotFound') {
        logger.warn('Failed to fetch existing record, proceeding with full upload', error);
      }
    }

    // Convert File objects to UploadedFile format
    const uploadedFiles: UploadedFile[] = [];
    const skippedFiles: Array<{ name: string; reason: string }> = [];

    console.log('Processing files, count:', fileArray.length);
    updateJobProgress(jobId, { phase: 'compressing' });

    for (let i = 0; i < fileArray.length; i++) {
      const file = fileArray[i];
      console.log(`Processing file ${i + 1}/${fileArray.length}:`, file.name, file.size, 'bytes');
      updateJobProgress(jobId, {
        filesProcessed: i + 1,
        currentFile: file.name
      });

      // Skip .git directory files
      const normalizedPath = file.name.replace(/^[^\/]*\//, '');
      if (normalizedPath.startsWith('.git/') || normalizedPath === '.git') {
        console.log(`Skipping .git file: ${file.name}`);
        skippedFiles.push({
          name: file.name,
          reason: '.git directory excluded'
        });
        continue;
      }

      // Skip files that are too large
      const maxSize = MAX_FILE_SIZE;
      if (file.size > maxSize) {
        skippedFiles.push({
          name: file.name,
          reason: `file too large (${(file.size / 1024 / 1024).toFixed(2)}MB, max ${(maxSize / 1024 / 1024).toFixed(0)}MB)`
        });
        continue;
      }

      const arrayBuffer = await file.arrayBuffer();
      const originalContent = Buffer.from(arrayBuffer);
      const originalMimeType = file.type || 'application/octet-stream';

      // Compress and base64 encode ALL files
      const compressedContent = compressFile(originalContent);
      const base64Content = Buffer.from(compressedContent.toString('base64'), 'binary');
      const compressionRatio = (compressedContent.length / originalContent.length * 100).toFixed(1);
      console.log(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${base64Content.length} bytes`);
      logger.info(`Compressing ${file.name}: ${originalContent.length} -> ${compressedContent.length} bytes (${compressionRatio}%), base64: ${base64Content.length} bytes`);

      uploadedFiles.push({
        name: file.name,
        content: base64Content,
        mimeType: originalMimeType,
        size: base64Content.length,
        compressed: true,
        originalMimeType
      });
    }

    // Update total file count after filtering (important for progress tracking)
    updateJobProgress(jobId, {
      totalFiles: uploadedFiles.length
    });

    // Check total size limit
    const totalSize = uploadedFiles.reduce((sum, file) => sum + file.size, 0);
    const maxTotalSize = MAX_SITE_SIZE;

    if (totalSize > maxTotalSize) {
      throw new Error(`Total upload size ${(totalSize / 1024 / 1024).toFixed(2)}MB exceeds the ${(maxTotalSize / 1024 / 1024).toFixed(0)}MB limit`);
    }

    // Check file count limit
    if (uploadedFiles.length > MAX_FILE_COUNT) {
      throw new Error(`File count ${uploadedFiles.length} exceeds the ${MAX_FILE_COUNT}-file limit`);
    }

    console.log(`After filtering: ${uploadedFiles.length} files to process (${skippedFiles.length} skipped)`);

    if (uploadedFiles.length === 0) {
      // Create empty manifest
      const emptyManifest = {
        $type: 'place.wisp.fs',
        site: siteName,
        root: {
          type: 'directory',
          entries: []
        },
        fileCount: 0,
        createdAt: new Date().toISOString()
      };

      const validationResult = validateRecord(emptyManifest);
      if (!validationResult.success) {
        throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
      }

      const rkey = siteName;
      updateJobProgress(jobId, { phase: 'finalizing' });

      const record = await agent.com.atproto.repo.putRecord({
        repo: did,
        collection: 'place.wisp.fs',
        rkey: rkey,
        record: emptyManifest
      });

      await upsertSite(did, rkey, siteName);

      completeUploadJob(jobId, {
        success: true,
        uri: record.data.uri,
        cid: record.data.cid,
        fileCount: 0,
        siteName,
        skippedFiles
      });
      return;
    }

    // Process files into directory structure
    console.log('Processing uploaded files into directory structure...');
    const validUploadedFiles = uploadedFiles.filter((f, i) => {
      if (!f || !f.name || !f.content) {
        console.error(`Filtering out invalid file at index ${i}`);
        return false;
      }
      return true;
    });

    const { directory, fileCount } = processUploadedFiles(validUploadedFiles);
    console.log('Directory structure created, file count:', fileCount);

    // Upload files as blobs with retry logic for DPoP nonce conflicts
    console.log('Starting blob upload/reuse phase...');
    updateJobProgress(jobId, { phase: 'uploading' });

    // Helper function to upload blob with exponential backoff retry
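    // Note: DPoP-bound PDS requests carry a server-issued nonce; with many uploads in
    // flight that nonce can rotate between requests, so some calls fail with a nonce
    // error that succeeds on a simple retry. That is the main case this retry handles.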
    const uploadBlobWithRetry = async (
      agent: Agent,
      content: Buffer,
      mimeType: string,
      fileName: string,
      maxRetries = 3
    ) => {
      for (let attempt = 0; attempt < maxRetries; attempt++) {
        try {
          return await agent.com.atproto.repo.uploadBlob(content, { encoding: mimeType });
        } catch (error: any) {
          const isDPoPNonceError =
            error?.message?.toLowerCase().includes('nonce') ||
            error?.message?.toLowerCase().includes('dpop') ||
            error?.status === 409;

          if (isDPoPNonceError && attempt < maxRetries - 1) {
            const backoffMs = 100 * Math.pow(2, attempt); // 100ms, 200ms, 400ms
            logger.info(`[File Upload] 🔄 DPoP nonce conflict for ${fileName}, retrying in ${backoffMs}ms (attempt ${attempt + 1}/${maxRetries})`);
            await new Promise(resolve => setTimeout(resolve, backoffMs));
            continue;
          }
          throw error;
        }
      }
      throw new Error(`Failed to upload ${fileName} after ${maxRetries} attempts`);
    };

    // Use sliding window concurrency for maximum throughput
    const CONCURRENCY_LIMIT = 50; // Maximum concurrent uploads with retry logic
    const uploadedBlobs: Array<{
      result: FileUploadResult;
      filePath: string;
      sentMimeType: string;
      returnedMimeType: string;
      reused: boolean;
    }> = [];

    // Process file with sliding window concurrency
    const processFile = async (file: UploadedFile, index: number) => {
      try {
        if (!file || !file.name) {
          throw new Error(`Undefined file at index ${index}`);
        }

        const fileCID = computeCID(file.content);
        const normalizedPath = file.name.replace(/^[^\/]*\//, '');
        const existingBlob = existingBlobMap.get(normalizedPath) || existingBlobMap.get(file.name);

        if (existingBlob && existingBlob.cid === fileCID) {
          logger.info(`[File Upload] ♻️ Reused: ${file.name} (unchanged, CID: ${fileCID})`);
          updateJobProgress(jobId, { filesReused: (getUploadJob(jobId)?.progress.filesReused || 0) + 1 });

          return {
            result: {
              hash: existingBlob.cid,
              blobRef: existingBlob.blobRef,
              ...(file.compressed && {
                encoding: 'gzip' as const,
                mimeType: file.originalMimeType || file.mimeType,
                base64: true
              })
            },
            filePath: file.name,
            sentMimeType: file.mimeType,
            returnedMimeType: existingBlob.blobRef.mimeType,
            reused: true
          };
        }

        const uploadMimeType = file.compressed || file.mimeType.startsWith('text/html')
          ? 'application/octet-stream'
          : file.mimeType;

        const compressionInfo = file.compressed ? ' (gzipped)' : '';
        const fileSizeMB = (file.size / 1024 / 1024).toFixed(2);
        logger.info(`[File Upload] ⬆️ Uploading: ${file.name} (${fileSizeMB}MB${compressionInfo})`);

        const uploadResult = await uploadBlobWithRetry(
          agent,
          file.content,
          uploadMimeType,
          file.name
        );

        const returnedBlobRef = uploadResult.data.blob;
        updateJobProgress(jobId, { filesUploaded: (getUploadJob(jobId)?.progress.filesUploaded || 0) + 1 });
        logger.info(`[File Upload] ✅ Uploaded: ${file.name} (CID: ${fileCID})`);

        return {
          result: {
            hash: returnedBlobRef.ref.toString(),
            blobRef: returnedBlobRef,
            ...(file.compressed && {
              encoding: 'gzip' as const,
              mimeType: file.originalMimeType || file.mimeType,
              base64: true
            })
          },
          filePath: file.name,
          sentMimeType: file.mimeType,
          returnedMimeType: returnedBlobRef.mimeType,
          reused: false
        };
      } catch (uploadError) {
        logger.error('Upload failed for file', uploadError);
        throw uploadError;
      }
    };

    // Sliding window concurrency control
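    // Keeps at most CONCURRENCY_LIMIT uploads in flight: each new file waits (via
    // Promise.race) for any slot to free up instead of processing in fixed-size batches.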
    const processWithConcurrency = async () => {
      const results: any[] = [];
      let fileIndex = 0;
      const executing = new Set<Promise<void>>();

      for (const file of validUploadedFiles) {
        const currentIndex = fileIndex++;

        const promise = processFile(file, currentIndex)
          .then(result => {
            results[currentIndex] = result;
          })
          .catch(error => {
            logger.error(`Failed to process file at index ${currentIndex}`, error);
            throw error; // Re-throw to fail the entire upload
          })
          .finally(() => {
            executing.delete(promise);
          });

        executing.add(promise);

        if (executing.size >= CONCURRENCY_LIMIT) {
          await Promise.race(executing);
        }
      }

      // Wait for remaining uploads
      await Promise.all(executing);
      return results.filter(r => r !== undefined); // Filter out any undefined entries
    };

    const allResults = await processWithConcurrency();
    uploadedBlobs.push(...allResults);

    const reusedCount = uploadedBlobs.filter(b => b.reused).length;
    const uploadedCount = uploadedBlobs.filter(b => !b.reused).length;
    logger.info(`[File Upload] 🎉 Upload phase complete: ${uploadedBlobs.length}/${validUploadedFiles.length} files (${uploadedCount} uploaded, ${reusedCount} reused)`);

    const uploadResults: FileUploadResult[] = uploadedBlobs.map(blob => blob.result);
    const filePaths: string[] = uploadedBlobs.map(blob => blob.filePath);

    // Update directory with file blobs
    console.log('Updating directory with blob references...');
    updateJobProgress(jobId, { phase: 'creating_manifest' });
    const updatedDirectory = updateFileBlobs(directory, uploadResults, filePaths);

    // Check if we need to split into subfs records
    // Split proactively if we have lots of files to avoid hitting manifest size limits
    const MAX_MANIFEST_SIZE = 140 * 1024; // 140KB to be safe (PDS limit is 150KB)
    const FILE_COUNT_THRESHOLD = 250; // Start splitting early
    const subfsRecords: Array<{ uri: string; path: string }> = [];
    let workingDirectory = updatedDirectory;
    let currentFileCount = fileCount;

    // Create initial manifest to check size
    let manifest = createManifest(siteName, workingDirectory, fileCount);
    let manifestSize = JSON.stringify(manifest).length;

    // Split if we have lots of files OR if manifest is already too large
    if (fileCount >= FILE_COUNT_THRESHOLD || manifestSize > MAX_MANIFEST_SIZE) {
      console.log(`⚠️ Large site detected (${fileCount} files, ${(manifestSize / 1024).toFixed(1)}KB), splitting into subfs records...`);
      logger.info(`Large site with ${fileCount} files, splitting into subfs records`);

      // Keep splitting until manifest fits under limit
      let attempts = 0;
      const MAX_ATTEMPTS = 100; // Allow many splits for very large sites

      while (manifestSize > MAX_MANIFEST_SIZE && attempts < MAX_ATTEMPTS) {
        attempts++;

        // Find all directories sorted by size (largest first)
        const directories = findLargeDirectories(workingDirectory);
        directories.sort((a, b) => b.size - a.size);

        if (directories.length === 0) {
          // No more directories to split - this should be very rare
          throw new Error(
            `Cannot split manifest further - no subdirectories available. ` +
            `Current size: ${(manifestSize / 1024).toFixed(1)}KB. ` +
            `Try organizing files into subdirectories.`
          );
        }

        // Pick the largest directory
        const largestDir = directories[0];
        console.log(` Split #${attempts}: ${largestDir.path} (${largestDir.fileCount} files, ${(largestDir.size / 1024).toFixed(1)}KB)`);

        // Create a subfs record for this directory
        const subfsRkey = TID.nextStr();
        const subfsManifest = {
          $type: 'place.wisp.subfs' as const,
          root: largestDir.directory,
          fileCount: largestDir.fileCount,
          createdAt: new Date().toISOString()
        };

        // Validate subfs record
        const subfsValidation = validateSubfsRecord(subfsManifest);
        if (!subfsValidation.success) {
          throw new Error(`Invalid subfs manifest: ${subfsValidation.error?.message || 'Validation failed'}`);
        }

        // Upload subfs record to PDS
        const subfsRecord = await agent.com.atproto.repo.putRecord({
          repo: did,
          collection: 'place.wisp.subfs',
          rkey: subfsRkey,
          record: subfsManifest
        });

        const subfsUri = subfsRecord.data.uri;
        subfsRecords.push({ uri: subfsUri, path: largestDir.path });
        console.log(` ✅ Created subfs: ${subfsUri}`);
        logger.info(`Created subfs record for ${largestDir.path}: ${subfsUri}`);

        // Replace directory with subfs node in the main tree
        workingDirectory = replaceDirectoryWithSubfs(workingDirectory, largestDir.path, subfsUri);

        // Recreate manifest and check new size
        currentFileCount -= largestDir.fileCount;
        manifest = createManifest(siteName, workingDirectory, fileCount);
        manifestSize = JSON.stringify(manifest).length;
        const newSizeKB = (manifestSize / 1024).toFixed(1);
        console.log(` → Manifest now ${newSizeKB}KB with ${currentFileCount} files (${subfsRecords.length} subfs total)`);

        // Check if we're under the limit now
        if (manifestSize <= MAX_MANIFEST_SIZE) {
          console.log(` ✅ Manifest fits! (${newSizeKB}KB < 140KB)`);
          break;
        }
      }

      if (manifestSize > MAX_MANIFEST_SIZE) {
        throw new Error(
          `Failed to fit manifest after splitting ${attempts} directories. ` +
          `Current size: ${(manifestSize / 1024).toFixed(1)}KB. ` +
          `This should never happen - please report this issue.`
        );
      }

      console.log(`✅ Split complete: ${subfsRecords.length} subfs records, ${currentFileCount} files in main, ${(manifestSize / 1024).toFixed(1)}KB manifest`);
      logger.info(`Split into ${subfsRecords.length} subfs records, ${currentFileCount} files remaining in main tree`);
    } else {
      const manifestSizeKB = (manifestSize / 1024).toFixed(1);
      console.log(`Manifest created (${fileCount} files, ${manifestSizeKB}KB JSON) - no splitting needed`);
    }

    const rkey = siteName;
    updateJobProgress(jobId, { phase: 'finalizing' });

    console.log('Putting record to PDS with rkey:', rkey);
    const record = await agent.com.atproto.repo.putRecord({
      repo: did,
      collection: 'place.wisp.fs',
      rkey: rkey,
      record: manifest
    });
    console.log('Record successfully created on PDS:', record.data.uri);

    // Store site in database cache
    await upsertSite(did, rkey, siteName);

    // Clean up old subfs records if we had any
    if (oldSubfsUris.length > 0) {
      console.log(`Cleaning up ${oldSubfsUris.length} old subfs records...`);
      logger.info(`Cleaning up ${oldSubfsUris.length} old subfs records`);

      // Delete old subfs records in parallel (don't wait for completion)
      Promise.all(
        oldSubfsUris.map(async ({ uri }) => {
          try {
            // Parse URI: at://did/collection/rkey
            const parts = uri.replace('at://', '').split('/');
            const subRkey = parts[2];

            await agent.com.atproto.repo.deleteRecord({
              repo: did,
              collection: 'place.wisp.subfs',
              rkey: subRkey
            });

            console.log(` 🗑️ Deleted old subfs: ${uri}`);
            logger.info(`Deleted old subfs record: ${uri}`);
          } catch (err: any) {
            // Don't fail the whole upload if cleanup fails
            console.warn(`Failed to delete old subfs ${uri}:`, err?.message);
            logger.warn(`Failed to delete old subfs ${uri}`, err);
          }
        })
      ).catch(err => {
        // Log but don't fail if cleanup fails
        logger.warn('Some subfs cleanup operations failed', err);
      });
    }

    completeUploadJob(jobId, {
      success: true,
      uri: record.data.uri,
      cid: record.data.cid,
      fileCount,
      siteName,
      skippedFiles,
      uploadedCount: validUploadedFiles.length
    });

    console.log('=== UPLOAD FILES COMPLETE ===');
  } catch (error) {
    console.error('=== UPLOAD ERROR ===');
    console.error('Error details:', error);
    logger.error('Upload error', error);
    failUploadJob(jobId, error instanceof Error ? error.message : 'Unknown error');
  }
}

export const wispRoutes = (client: NodeOAuthClient, cookieSecret: string) =>
  new Elysia({
    prefix: '/wisp',
    cookie: {
      secrets: cookieSecret,
      sign: ['did']
    }
  })
    .derive(async ({ cookie }) => {
      const auth = await requireAuth(client, cookie)
      return { auth }
    })
    .get(
      '/upload-progress/:jobId',
      async ({ params: { jobId }, auth, set }) => {
        const job = getUploadJob(jobId);

        if (!job) {
          set.status = 404;
          return { error: 'Job not found' };
        }

        // Verify job belongs to authenticated user
        if (job.did !== auth.did) {
          set.status = 403;
          return { error: 'Unauthorized' };
        }

        // Set up SSE headers
        set.headers = {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Connection': 'keep-alive'
        };

        // Hoisted so the stream's cancel() handler can tear these down when the client
        // disconnects (a function returned from start() is not used as a cleanup callback)
        let keepaliveInterval: ReturnType<typeof setInterval> | undefined;
        let cleanup: (() => void) | undefined;

        const stream = new ReadableStream({
          start(controller) {
            const encoder = new TextEncoder();

            // Send initial state
            const sendEvent = (event: string, data: any) => {
              try {
                const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
                controller.enqueue(encoder.encode(message));
              } catch (err) {
                // Controller closed, ignore
              }
            };

            // Send keepalive comment every 15 seconds to prevent timeout
            keepaliveInterval = setInterval(() => {
              try {
                controller.enqueue(encoder.encode(': keepalive\n\n'));
              } catch (err) {
                // Controller closed, stop sending keepalives
                clearInterval(keepaliveInterval);
              }
            }, 15000);

            // Send current job state immediately
            sendEvent('progress', {
              status: job.status,
              progress: job.progress,
              result: job.result,
              error: job.error
            });

            // If job is already completed or failed, close the stream
            if (job.status === 'completed' || job.status === 'failed') {
              clearInterval(keepaliveInterval);
              controller.close();
              return;
            }

            // Listen for updates
            cleanup = addJobListener(jobId, (event, data) => {
              sendEvent(event, data);

              // Close stream after done or error event
              if (event === 'done' || event === 'error') {
                clearInterval(keepaliveInterval);
                setTimeout(() => {
                  try {
                    controller.close();
                  } catch (err) {
                    // Already closed
                  }
                }, 100);
              }
            });
          },
          // Cleanup on client disconnect
          cancel() {
            clearInterval(keepaliveInterval);
            cleanup?.();
          }
        });

        return new Response(stream);
      }
    )
    .post(
      '/upload-files',
      async ({ body, auth }) => {
        const { siteName, files } = body as {
          siteName: string;
          files: File | File[]
        };

        console.log('=== UPLOAD FILES START ===');
        console.log('Site name:', siteName);
        console.log('Files received:', Array.isArray(files) ? files.length : 'single file');

        try {
          if (!siteName) {
            throw new Error('Site name is required')
          }

          if (!isValidSiteName(siteName)) {
            throw new Error('Invalid site name: must be 1-512 characters and contain only alphanumeric characters, dots, dashes, underscores, tildes, and colons')
          }

          // Check if files were provided
          const hasFiles = files && (Array.isArray(files) ? files.length > 0 : !!files);

          if (!hasFiles) {
            // Handle empty upload synchronously (fast operation)
            const agent = new Agent((url, init) => auth.session.fetchHandler(url, init))

            const emptyManifest = {
              $type: 'place.wisp.fs',
              site: siteName,
              root: {
                type: 'directory',
                entries: []
              },
              fileCount: 0,
              createdAt: new Date().toISOString()
            };

            const validationResult = validateRecord(emptyManifest);
            if (!validationResult.success) {
              throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
            }

            const rkey = siteName;

            const record = await agent.com.atproto.repo.putRecord({
              repo: auth.did,
              collection: 'place.wisp.fs',
              rkey: rkey,
              record: emptyManifest
            });

            await upsertSite(auth.did, rkey, siteName);

            return {
              success: true,
              uri: record.data.uri,
              cid: record.data.cid,
              fileCount: 0,
              siteName
            };
          }

          // For file uploads, create a job and process in background
          const fileArray = Array.isArray(files) ? files : [files];
          const jobId = createUploadJob(auth.did, siteName, fileArray.length);

          // Create agent with OAuth session
          const agent = new Agent((url, init) => auth.session.fetchHandler(url, init))
          console.log('Agent created for DID:', auth.did);
          console.log('Created upload job:', jobId);

          // Start background processing (don't await)
          processUploadInBackground(jobId, agent, auth.did, siteName, fileArray).catch(err => {
            console.error('Background upload process failed:', err);
            logger.error('Background upload process failed', err);
          });

          // Return immediately with job ID
          return {
            success: true,
            jobId,
            message: 'Upload started. Connect to /wisp/upload-progress/' + jobId + ' for progress updates.'
          };
        } catch (error) {
          console.error('=== UPLOAD ERROR ===');
          console.error('Error details:', error);
          logger.error('Upload error', error);
          throw new Error(`Failed to upload files: ${error instanceof Error ? error.message : 'Unknown error'}`);
        }
      }
    )