// Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
import { existsSync, rmSync } from 'fs'
import {
    getPdsForDid,
    downloadAndCacheSite,
    extractBlobCid,
    fetchSiteRecord
} from './utils'
import { upsertSite, tryAcquireLock, releaseLock } from './db'
import { safeFetch } from './safe-fetch'
import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
import { Firehose } from '@atproto/sync'
import { IdResolver } from '@atproto/identity'
import { invalidateSiteCache } from './cache'

const CACHE_DIR = './cache/sites'

/** Collection (NSID) whose records this worker subscribes to and processes. */
const SITE_COLLECTION = 'place.wisp.fs'

/** getHealth() reports `healthy: false` once no event has arrived for this long. */
const MAX_EVENT_SILENCE_MS = 300000 // 5 minutes

/** Extracts a loggable message from an arbitrary thrown value. */
function errorMessage(err: unknown): string {
    return err instanceof Error ? err.message : String(err)
}

/**
 * Subscribes to the AT Protocol firehose, filtered to `place.wisp.fs`, and
 * keeps the local site cache (and the sites database) in step with it:
 *
 * - create/update: verify the record against the owner's PDS, refresh the
 *   local cache, then upsert the site row under a distributed lock.
 * - delete: check the PDS no longer serves the record, then drop the
 *   in-memory and on-disk cache.
 *
 * Errors raised while handling a single event are logged and swallowed so one
 * bad event does not interrupt the subscription.
 */
export class FirehoseWorker {
    private firehose: Firehose | null = null
    private readonly idResolver: IdResolver
    private isShuttingDown = false
    private lastEventTime = Date.now()

    /**
     * @param logger Optional log sink; when omitted, `console.log` is used.
     */
    constructor(
        private logger?: (msg: string, data?: Record<string, unknown>) => void
    ) {
        this.idResolver = new IdResolver()
    }

    /** Writes a `[FirehoseWorker]`-prefixed message to the configured logger. */
    private log(msg: string, data?: Record<string, unknown>) {
        const log = this.logger ?? console.log
        log(`[FirehoseWorker] ${msg}`, data ?? {})
    }

    /**
     * Opens the firehose subscription.
     *
     * Calling this while a subscription already exists is a no-op; previously
     * a second call replaced `this.firehose` without destroying the first
     * instance, leaking it. After `stop()` the worker stays shut down, because
     * `isShuttingDown` is never reset.
     */
    start() {
        if (this.firehose) {
            this.log('Firehose worker already started, ignoring start()')
            return
        }

        this.log('Starting firehose worker')
        this.connect()
    }

    /** Marks the worker as shutting down and tears down the subscription. */
    stop() {
        this.log('Stopping firehose worker')
        this.isShuttingDown = true

        if (this.firehose) {
            // Fire-and-forget so stop() stays synchronous for callers; `void`
            // makes the intentionally un-awaited call explicit.
            void this.firehose.destroy()
            this.firehose = null
        }
    }

    /** Creates the Firehose client and starts consuming events. */
    private connect() {
        if (this.isShuttingDown) return

        this.log('Connecting to AT Protocol firehose')

        this.firehose = new Firehose({
            idResolver: this.idResolver,
            service: 'wss://bsky.network',
            filterCollections: [SITE_COLLECTION],
            // `any`: the event is a union declared by @atproto/sync that this
            // file does not import; its fields are checked at runtime below.
            handleEvent: (evt: any) => this.onEvent(evt),
            onError: (err: unknown) => {
                this.log('Firehose error', {
                    error: errorMessage(err),
                    stack: err instanceof Error ? err.stack : undefined,
                    fullError: err
                })
                console.error('Full firehose error:', err)
            }
        })

        // Not awaited: connect() returns while the subscription keeps running.
        void this.firehose.start()
        this.log('Firehose started')
    }

    /**
     * Dispatches one firehose event to the matching handler.
     *
     * Every event, relevant or not, refreshes `lastEventTime`, which
     * getHealth() uses as a liveness signal.
     */
    private async onEvent(evt: any): Promise<void> {
        this.lastEventTime = Date.now()

        // Watch for write events
        if (evt.event === 'create' || evt.event === 'update') {
            const record = evt.record

            // Only act on writes that carry a valid place.wisp.fs record
            if (
                evt.collection !== SITE_COLLECTION ||
                !isRecord(record) ||
                !validateRecord(record).success
            ) {
                return
            }

            this.log('Received place.wisp.fs event', {
                did: evt.did,
                event: evt.event,
                rkey: evt.rkey
            })

            try {
                await this.handleCreateOrUpdate(
                    evt.did,
                    evt.rkey,
                    record,
                    evt.cid?.toString()
                )
            } catch (err) {
                this.log('Error handling event', {
                    did: evt.did,
                    event: evt.event,
                    rkey: evt.rkey,
                    error: errorMessage(err)
                })
            }
            return
        }

        if (evt.event === 'delete' && evt.collection === SITE_COLLECTION) {
            this.log('Received delete event', {
                did: evt.did,
                rkey: evt.rkey
            })

            try {
                await this.handleDelete(evt.did, evt.rkey)
            } catch (err) {
                this.log('Error handling delete', {
                    did: evt.did,
                    rkey: evt.rkey,
                    error: errorMessage(err)
                })
            }
        }
    }

    /**
     * Verifies a created/updated site record against the owner's PDS, caches
     * it locally and upserts the site row.
     *
     * @param did      Repository DID that owns the record.
     * @param site     Record key; doubles as the site name in cache paths.
     * @param record   Record payload, already validated by the caller.
     * @param eventCid CID reported by the firehose event, if any.
     */
    private async handleCreateOrUpdate(
        did: string,
        site: string,
        record: any,
        eventCid?: string
    ) {
        this.log('Processing create/update', { did, site })

        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        this.log('Resolved PDS', { did, pdsEndpoint })

        // Verify record exists on PDS and fetch its CID
        this.log('Verifying record on PDS', { did, site })
        let verifiedCid: string
        try {
            const result = await fetchSiteRecord(did, site)

            if (!result) {
                this.log('Record not found on PDS, skipping cache', {
                    did,
                    site
                })
                return
            }

            verifiedCid = result.cid

            // Verify event CID matches PDS CID (prevent cache poisoning).
            // An event without a CID skips this comparison.
            if (eventCid && eventCid !== verifiedCid) {
                this.log('CID mismatch detected - potential spoofed event', {
                    did,
                    site,
                    eventCid,
                    verifiedCid
                })
                return
            }

            this.log('Record verified on PDS', { did, site, cid: verifiedCid })
        } catch (err) {
            // Fail closed: never cache a record that could not be verified.
            this.log('Failed to verify record on PDS', {
                did,
                site,
                error: errorMessage(err)
            })
            return
        }

        // Invalidate in-memory caches before updating
        invalidateSiteCache(did, site)

        // Cache the record with the verified CID. Every instance does this so
        // it can serve the site from its own local cache.
        await downloadAndCacheSite(did, site, record, pdsEndpoint, verifiedCid)

        // The lock guards only the database write, so a single instance
        // performs the upsert while the others keep their local cache.
        const lockKey = `db:upsert:${did}:${site}`
        const lockAcquired = await tryAcquireLock(lockKey)

        if (!lockAcquired) {
            this.log('Another instance is writing to DB, skipping upsert', {
                did,
                site
            })
            this.log('Successfully processed create/update (cached locally)', {
                did,
                site
            })
            return
        }

        try {
            // Upsert site to database (only one instance does this)
            await upsertSite(did, site, record.site)
            this.log(
                'Successfully processed create/update (cached + DB updated)',
                { did, site }
            )
        } finally {
            // Always release lock, even if DB write fails
            await releaseLock(lockKey)
        }
    }

    /**
     * Drops the cached copy of a site after a delete event.
     *
     * The cache is kept when the PDS still returns the record (`ok` response).
     *
     * @param did  Repository DID that owned the record.
     * @param site Record key of the deleted site.
     */
    private async handleDelete(did: string, site: string) {
        this.log('Processing delete', { did, site })

        // All instances should delete their local cache (no lock needed)
        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        // Verify record is actually deleted from PDS
        try {
            const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=${SITE_COLLECTION}&rkey=${encodeURIComponent(site)}`
            const recordRes = await safeFetch(recordUrl)

            if (recordRes.ok) {
                this.log('Record still exists on PDS, not deleting cache', {
                    did,
                    site
                })
                return
            }

            this.log('Verified record is deleted from PDS', {
                did,
                site,
                status: recordRes.status
            })
        } catch (err) {
            // NOTE(review): this path fails open. A failed verification
            // request is only logged and the cache is still deleted below.
            // Any non-ok status (including 5xx) is likewise treated as
            // "deleted". Behaviour kept as-is; confirm it is intended.
            this.log('Error verifying deletion on PDS', {
                did,
                site,
                error: errorMessage(err)
            })
        }

        // Invalidate in-memory caches
        invalidateSiteCache(did, site)

        // Delete disk cache
        this.deleteCache(did, site)

        this.log('Successfully processed delete', { did, site })
    }

    /**
     * Removes `${CACHE_DIR}/${did}/${site}` from disk if it exists.
     * Failures are logged and not rethrown.
     */
    private deleteCache(did: string, site: string) {
        const cacheDir = `${CACHE_DIR}/${did}/${site}`

        if (!existsSync(cacheDir)) {
            this.log('Cache directory does not exist, nothing to delete', {
                did,
                site
            })
            return
        }

        try {
            rmSync(cacheDir, { recursive: true, force: true })
            this.log('Cache deleted', { did, site, path: cacheDir })
        } catch (err) {
            this.log('Failed to delete cache', {
                did,
                site,
                path: cacheDir,
                error: errorMessage(err)
            })
        }
    }

    /**
     * Reports worker liveness.
     *
     * `connected` only reflects whether a Firehose instance currently exists,
     * not the state of the underlying socket. `healthy` additionally requires
     * an event within the last `MAX_EVENT_SILENCE_MS` milliseconds.
     */
    getHealth() {
        const connected = this.firehose !== null
        const timeSinceLastEvent = Date.now() - this.lastEventTime

        return {
            connected,
            lastEventTime: this.lastEventTime,
            timeSinceLastEvent,
            healthy: connected && timeSinceLastEvent < MAX_EVENT_SILENCE_MS
        }
    }
}