Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol. wisp.place
at v1.0.0 7.7 kB view raw
import { existsSync, rmSync } from 'fs';
import { getPdsForDid, downloadAndCacheSite, extractBlobCid, fetchSiteRecord } from './utils';
import { upsertSite, tryAcquireLock, releaseLock } from './db';
import { safeFetch } from './safe-fetch';
import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs';
import { Firehose } from '@atproto/sync';
import { IdResolver } from '@atproto/identity';

const CACHE_DIR = './cache/sites';

/** Longest gap (ms) since the last handled firehose event for which getHealth() still reports healthy: 5 minutes. */
const HEALTHY_EVENT_WINDOW_MS = 300000;

/** Converts a caught value into a string suitable for a log payload. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Returns true when `segment` is usable as exactly one directory name.
 *
 * The DID and record key arrive from firehose events and are interpolated into
 * filesystem paths, so anything that could change the directory depth or climb
 * out of the cache root ('', '.', '..', path separators, NUL) is rejected.
 */
function isSafePathSegment(segment: string): boolean {
  return (
    typeof segment === 'string' &&
    segment.length > 0 &&
    segment !== '.' &&
    segment !== '..' &&
    !segment.includes('/') &&
    !segment.includes('\\') &&
    !segment.includes('\0')
  );
}

/**
 * Subscribes to the AT Protocol firehose for `place.wisp.fs` records.
 *
 * Create/update events are verified against the owner's PDS, cached locally and
 * upserted into the database; delete events remove the local cache directory.
 */
export class FirehoseWorker {
  private firehose: Firehose | null = null;
  private idResolver: IdResolver;
  private isShuttingDown = false;
  private lastEventTime = Date.now();

  /**
   * @param logger Optional sink for log lines; `console.log` is used when omitted.
   */
  constructor(
    private logger?: (msg: string, data?: Record<string, unknown>) => void,
  ) {
    this.idResolver = new IdResolver();
  }

  /** Emits a log line prefixed with `[FirehoseWorker]`; `data` defaults to an empty object. */
  private log(msg: string, data?: Record<string, unknown>) {
    const log = this.logger ?? console.log;
    log(`[FirehoseWorker] ${msg}`, data ?? {});
  }

  /** Opens the firehose subscription. Does nothing after stop() has been called. */
  start() {
    this.log('Starting firehose worker');
    this.connect();
  }

  /** Marks the worker as shutting down and tears down the active subscription, if any. */
  stop() {
    this.log('Stopping firehose worker');
    this.isShuttingDown = true;

    if (this.firehose) {
      const pendingDestroy = this.firehose.destroy();
      this.firehose = null;

      // The result of destroy() is not awaited (stop() stays synchronous);
      // log a rejection instead of leaving it unhandled.
      Promise.resolve(pendingDestroy).catch((err: unknown) => {
        this.log('Error while destroying firehose', { error: errorMessage(err) });
      });
    }
  }

  /** Creates the Firehose subscription and wires up the event and error handlers. */
  private connect() {
    if (this.isShuttingDown) return;

    this.log('Connecting to AT Protocol firehose');

    this.firehose = new Firehose({
      idResolver: this.idResolver,
      service: 'wss://bsky.network',
      filterCollections: ['place.wisp.fs'],
      handleEvent: async (evt: any) => {
        // Any delivered event counts as liveness for getHealth().
        this.lastEventTime = Date.now();

        // Watch for write events
        if (evt.event === 'create' || evt.event === 'update') {
          const record = evt.record;

          // If the write is a valid place.wisp.fs record
          if (
            evt.collection === 'place.wisp.fs' &&
            isRecord(record) &&
            validateRecord(record).success
          ) {
            this.log('Received place.wisp.fs event', {
              did: evt.did,
              event: evt.event,
              rkey: evt.rkey,
            });

            try {
              await this.handleCreateOrUpdate(evt.did, evt.rkey, record, evt.cid?.toString());
            } catch (err) {
              // Failures are logged per event so one bad record does not stop the stream.
              this.log('Error handling event', {
                did: evt.did,
                event: evt.event,
                rkey: evt.rkey,
                error: errorMessage(err),
              });
            }
          }
        } else if (evt.event === 'delete' && evt.collection === 'place.wisp.fs') {
          this.log('Received delete event', {
            did: evt.did,
            rkey: evt.rkey,
          });

          try {
            await this.handleDelete(evt.did, evt.rkey);
          } catch (err) {
            this.log('Error handling delete', {
              did: evt.did,
              rkey: evt.rkey,
              error: errorMessage(err),
            });
          }
        }
      },
      onError: (err: any) => {
        this.log('Firehose error', {
          error: errorMessage(err),
          stack: err instanceof Error ? err.stack : undefined,
          fullError: err,
        });
        console.error('Full firehose error:', err);
      },
    });

    // The result of start() is not awaited; log a rejection instead of
    // leaving it unhandled.
    Promise.resolve(this.firehose.start()).catch((err: unknown) => {
      this.log('Firehose start failed', { error: errorMessage(err) });
    });
    this.log('Firehose started');
  }

  /**
   * Handles a validated create/update of a `place.wisp.fs` record.
   *
   * @param did      Repository DID taken from the event.
   * @param site     Record key (rkey) taken from the event.
   * @param record   Record payload, already validated by the event handler.
   * @param eventCid CID carried by the event, if any; must match the CID on the PDS.
   */
  private async handleCreateOrUpdate(did: string, site: string, record: any, eventCid?: string) {
    this.log('Processing create/update', { did, site });

    // did/site are handed to the caching layer; refuse values that could not be
    // a single directory name before doing any network or filesystem work.
    if (!isSafePathSegment(did) || !isSafePathSegment(site)) {
      this.log('Rejecting create/update with unsafe DID or record key', { did, site });
      return;
    }

    // Record is already validated in handleEvent
    const fsRecord = record;

    const pdsEndpoint = await getPdsForDid(did);
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did });
      return;
    }

    this.log('Resolved PDS', { did, pdsEndpoint });

    // Verify record exists on PDS and fetch its CID
    let verifiedCid: string;
    try {
      const result = await fetchSiteRecord(did, site);

      if (!result) {
        this.log('Record not found on PDS, skipping cache', { did, site });
        return;
      }

      verifiedCid = result.cid;

      // Verify event CID matches PDS CID (prevent cache poisoning)
      if (eventCid && eventCid !== verifiedCid) {
        this.log('CID mismatch detected - potential spoofed event', {
          did,
          site,
          eventCid,
          verifiedCid
        });
        return;
      }

      this.log('Record verified on PDS', { did, site, cid: verifiedCid });
    } catch (err) {
      this.log('Failed to verify record on PDS', {
        did,
        site,
        error: errorMessage(err),
      });
      return;
    }

    // Cache the record with verified CID (uses atomic swap internally)
    // All instances cache locally for edge serving
    await downloadAndCacheSite(did, site, fsRecord, pdsEndpoint, verifiedCid);

    // Acquire distributed lock only for database write to prevent duplicate writes
    const lockKey = `db:upsert:${did}:${site}`;
    const lockAcquired = await tryAcquireLock(lockKey);

    if (!lockAcquired) {
      this.log('Another instance is writing to DB, skipping upsert', { did, site });
      this.log('Successfully processed create/update (cached locally)', { did, site });
      return;
    }

    try {
      // Upsert site to database (only one instance does this)
      await upsertSite(did, site, fsRecord.site);
      this.log('Successfully processed create/update (cached + DB updated)', { did, site });
    } finally {
      // Always release lock, even if DB write fails
      await releaseLock(lockKey);
    }
  }

  /**
   * Handles a delete of a `place.wisp.fs` record by removing the local cache,
   * unless the PDS still serves the record.
   *
   * @param did  Repository DID taken from the event.
   * @param site Record key (rkey) taken from the event.
   */
  private async handleDelete(did: string, site: string) {
    this.log('Processing delete', { did, site });

    // A key such as '..' would make the recursive delete below target a parent
    // of the site directory, so such events are dropped up front.
    if (!isSafePathSegment(did) || !isSafePathSegment(site)) {
      this.log('Rejecting delete with unsafe DID or record key', { did, site });
      return;
    }

    // All instances should delete their local cache (no lock needed)
    const pdsEndpoint = await getPdsForDid(did);
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did });
      return;
    }

    // Verify record is actually deleted from PDS
    try {
      const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`;
      const recordRes = await safeFetch(recordUrl);

      if (recordRes.ok) {
        this.log('Record still exists on PDS, not deleting cache', {
          did,
          site,
        });
        return;
      }

      this.log('Verified record is deleted from PDS', {
        did,
        site,
        status: recordRes.status,
      });
    } catch (err) {
      // NOTE(review): a failed verification request (and any non-2xx status
      // above) still falls through to the cache delete. Confirm this
      // fail-open behaviour is intended.
      this.log('Error verifying deletion on PDS', {
        did,
        site,
        error: errorMessage(err),
      });
    }

    // Delete cache
    this.deleteCache(did, site);

    this.log('Successfully processed delete', { did, site });
  }

  /**
   * Recursively removes `${CACHE_DIR}/${did}/${site}` if it exists.
   * Filesystem errors are logged, not thrown.
   */
  private deleteCache(did: string, site: string) {
    // Last line of defence in front of the recursive rmSync: never build a path
    // from a segment that could escape the per-site directory.
    if (!isSafePathSegment(did) || !isSafePathSegment(site)) {
      this.log('Refusing to delete cache for unsafe DID or record key', { did, site });
      return;
    }

    const cacheDir = `${CACHE_DIR}/${did}/${site}`;

    if (!existsSync(cacheDir)) {
      this.log('Cache directory does not exist, nothing to delete', {
        did,
        site,
      });
      return;
    }

    try {
      rmSync(cacheDir, { recursive: true, force: true });
      this.log('Cache deleted', { did, site, path: cacheDir });
    } catch (err) {
      this.log('Failed to delete cache', {
        did,
        site,
        path: cacheDir,
        error: errorMessage(err),
      });
    }
  }

  /**
   * Reports liveness: `connected` is true while a Firehose instance is held, and
   * `healthy` additionally requires an event within the last 5 minutes.
   */
  getHealth() {
    const isConnected = this.firehose !== null;
    const timeSinceLastEvent = Date.now() - this.lastEventTime;

    return {
      connected: isConnected,
      lastEventTime: this.lastEventTime,
      timeSinceLastEvent,
      healthy: isConnected && timeSinceLastEvent < HEALTHY_EVENT_WINDOW_MS, // 5 minutes
    };
  }
}