Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol. wisp.place
import { existsSync, rmSync } from 'fs'
import {
    getPdsForDid,
    downloadAndCacheSite,
    extractBlobCid,
    fetchSiteRecord
} from './utils'
import { upsertSite, tryAcquireLock, releaseLock } from './db'
import { safeFetch } from './safe-fetch'
import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
import { Firehose } from '@atproto/sync'
import { IdResolver } from '@atproto/identity'
import { invalidateSiteCache } from './cache'

/** Root of the on-disk site cache; `deleteCache` removes `<CACHE_DIR>/<did>/<site>`. */
const CACHE_DIR = './cache/sites'

/** Collection (NSID) whose records this worker reacts to. */
const SITE_COLLECTION = 'place.wisp.fs'

/** Longest gap since the last firehose event for which the worker still reports healthy (5 minutes). */
const HEALTHY_MAX_EVENT_GAP_MS = 5 * 60 * 1000

/** Renders a caught value as a log-friendly string. */
function describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err)
}

/**
 * Returns true when `segment` can be used as a single directory name, i.e. it
 * cannot move a joined path up (`..`), keep it in place (`.` or empty), or
 * introduce additional path components.
 */
function isSafePathSegment(segment: string): boolean {
    return (
        segment !== '' &&
        segment !== '.' &&
        segment !== '..' &&
        !segment.includes('/') &&
        !segment.includes('\\') &&
        !segment.includes('\0')
    )
}

/**
 * Subscribes to the AT Protocol firehose, filtered to `place.wisp.fs`, and
 * keeps the local site cache (and, via `upsertSite`, the database) in step
 * with create/update/delete events.
 */
export class FirehoseWorker {
    private firehose: Firehose | null = null
    private idResolver: IdResolver
    private isShuttingDown = false
    // Initialised at construction so the worker is not reported unhealthy
    // before the first event arrives.
    private lastEventTime = Date.now()

    /**
     * @param logger Optional sink for log lines; `console.log` is used when omitted.
     */
    constructor(
        private logger?: (msg: string, data?: Record<string, unknown>) => void
    ) {
        this.idResolver = new IdResolver()
    }

    /** Writes a `[FirehoseWorker]`-prefixed message to the configured logger. */
    private log(msg: string, data?: Record<string, unknown>) {
        const log = this.logger ?? console.log
        log(`[FirehoseWorker] ${msg}`, data || {})
    }

    /** Creates the firehose subscription and begins consuming events. */
    start() {
        this.log('Starting firehose worker')
        this.connect()
    }

    /**
     * Destroys the current subscription, if any. `isShuttingDown` is never
     * reset, so a later `start()` will not reconnect.
     */
    stop() {
        this.log('Stopping firehose worker')
        this.isShuttingDown = true

        if (this.firehose) {
            this.firehose.destroy()
            this.firehose = null
        }
    }

    /** Builds the `Firehose` instance with its event/error handlers and starts it. */
    private connect() {
        if (this.isShuttingDown) return

        this.log('Connecting to AT Protocol firehose')

        this.firehose = new Firehose({
            idResolver: this.idResolver,
            service: 'wss://bsky.network',
            filterCollections: [SITE_COLLECTION],
            // The event is left loosely typed, as in the original code; the
            // fields read below are checked at runtime before use.
            handleEvent: async (evt: any) => {
                this.lastEventTime = Date.now()

                // Watch for write events
                if (evt.event === 'create' || evt.event === 'update') {
                    const record = evt.record

                    // Only act on writes that are valid place.wisp.fs records
                    if (
                        evt.collection === SITE_COLLECTION &&
                        isRecord(record) &&
                        validateRecord(record).success
                    ) {
                        this.log('Received place.wisp.fs event', {
                            did: evt.did,
                            event: evt.event,
                            rkey: evt.rkey
                        })

                        try {
                            await this.handleCreateOrUpdate(
                                evt.did,
                                evt.rkey,
                                record,
                                evt.cid?.toString()
                            )
                        } catch (err) {
                            // A failure for one record is logged and swallowed
                            // here so it does not propagate out of the handler.
                            this.log('Error handling event', {
                                did: evt.did,
                                event: evt.event,
                                rkey: evt.rkey,
                                error: describeError(err)
                            })
                        }
                    }
                } else if (
                    evt.event === 'delete' &&
                    evt.collection === SITE_COLLECTION
                ) {
                    this.log('Received delete event', {
                        did: evt.did,
                        rkey: evt.rkey
                    })

                    try {
                        await this.handleDelete(evt.did, evt.rkey)
                    } catch (err) {
                        this.log('Error handling delete', {
                            did: evt.did,
                            rkey: evt.rkey,
                            error: describeError(err)
                        })
                    }
                }
            },
            onError: (err: unknown) => {
                this.log('Firehose error', {
                    error: describeError(err),
                    stack: err instanceof Error ? err.stack : undefined,
                    fullError: err
                })
                console.error('Full firehose error:', err)
            }
        })

        this.firehose.start()
        this.log('Firehose started')
    }

    /**
     * Handles a validated create/update for one site: confirms the record
     * against the PDS, refreshes the local cache, then upserts the site row
     * under a lock.
     *
     * @param did      Repository DID the record belongs to.
     * @param site     Record key of the site.
     * @param record   The record carried by the event (already validated by the caller).
     * @param eventCid CID reported by the event, if any; compared against the PDS copy.
     */
    private async handleCreateOrUpdate(
        did: string,
        site: string,
        record: any,
        eventCid?: string
    ) {
        this.log('Processing create/update', { did, site })

        // Record is already validated in handleEvent
        const fsRecord = record

        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        this.log('Resolved PDS', { did, pdsEndpoint })

        // Verify record exists on PDS and fetch its CID
        let verifiedCid: string
        try {
            const result = await fetchSiteRecord(did, site)

            if (!result) {
                this.log('Record not found on PDS, skipping cache', {
                    did,
                    site
                })
                return
            }

            verifiedCid = result.cid

            // Verify event CID matches PDS CID (prevent cache poisoning)
            if (eventCid && eventCid !== verifiedCid) {
                this.log('CID mismatch detected - potential spoofed event', {
                    did,
                    site,
                    eventCid,
                    verifiedCid
                })
                return
            }

            this.log('Record verified on PDS', { did, site, cid: verifiedCid })
        } catch (err) {
            // Unverifiable records are never cached.
            this.log('Failed to verify record on PDS', {
                did,
                site,
                error: describeError(err)
            })
            return
        }

        // Invalidate in-memory caches before updating
        invalidateSiteCache(did, site)

        // Every instance refreshes its own local cache, so this runs outside
        // the lock. (Original author's note: downloadAndCacheSite swaps the
        // new content in atomically.)
        await downloadAndCacheSite(
            did,
            site,
            fsRecord,
            pdsEndpoint,
            verifiedCid
        )

        // Only the database write is guarded by the lock, to avoid duplicate
        // writes. (Original author's note: upsertSite is a no-op in
        // cache-only mode.)
        const lockKey = `db:upsert:${did}:${site}`
        const lockAcquired = await tryAcquireLock(lockKey)

        if (!lockAcquired) {
            this.log('Another instance is writing to DB, skipping upsert', {
                did,
                site
            })
            this.log('Successfully processed create/update (cached locally)', {
                did,
                site
            })
            return
        }

        try {
            await upsertSite(did, site, fsRecord.site)
            this.log(
                'Successfully processed create/update (cached + DB updated)',
                { did, site }
            )
        } finally {
            // Always release lock, even if DB write fails
            await releaseLock(lockKey)
        }
    }

    /**
     * Handles a delete event for one site: confirms with the PDS that the
     * record is gone, then drops the in-memory and on-disk caches.
     *
     * The cache is kept when the deletion cannot be confirmed: if the PDS
     * cannot be resolved, if the record is still served, or if the
     * verification request itself fails.
     *
     * @param did  Repository DID the record belonged to.
     * @param site Record key of the site.
     */
    private async handleDelete(did: string, site: string) {
        this.log('Processing delete', { did, site })

        // All instances should delete their local cache (no lock needed)
        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        // Verify record is actually deleted from PDS
        try {
            const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
            const recordRes = await safeFetch(recordUrl)

            if (recordRes.ok) {
                this.log('Record still exists on PDS, not deleting cache', {
                    did,
                    site
                })
                return
            }

            // Any non-OK response is treated as confirmation of deletion.
            this.log('Verified record is deleted from PDS', {
                did,
                site,
                status: recordRes.status
            })
        } catch (err) {
            // Fail closed: a request that threw confirms nothing, so the
            // cache is kept, matching how create/update treats a failed
            // verification.
            this.log('Error verifying deletion on PDS, not deleting cache', {
                did,
                site,
                error: describeError(err)
            })
            return
        }

        // Invalidate in-memory caches
        invalidateSiteCache(did, site)

        // Delete disk cache
        this.deleteCache(did, site)

        this.log('Successfully processed delete', { did, site })
    }

    /**
     * Recursively removes `<CACHE_DIR>/<did>/<site>` if it exists. Failures
     * are logged, not thrown.
     */
    private deleteCache(did: string, site: string) {
        // Both values come from firehose events and end up in a recursive,
        // forced delete. A segment such as '..', '.' or '' would widen the
        // target to the whole cache root or every site of the DID, so refuse
        // anything that is not a plain directory name.
        if (!isSafePathSegment(did) || !isSafePathSegment(site)) {
            this.log('Refusing to delete cache: unsafe path segment', {
                did,
                site
            })
            return
        }

        const cacheDir = `${CACHE_DIR}/${did}/${site}`

        if (!existsSync(cacheDir)) {
            this.log('Cache directory does not exist, nothing to delete', {
                did,
                site
            })
            return
        }

        try {
            rmSync(cacheDir, { recursive: true, force: true })
            this.log('Cache deleted', { did, site, path: cacheDir })
        } catch (err) {
            this.log('Failed to delete cache', {
                did,
                site,
                path: cacheDir,
                error: describeError(err)
            })
        }
    }

    /**
     * Reports worker liveness.
     *
     * `connected` only reflects whether a `Firehose` instance is currently
     * held, not the state of the underlying socket. `healthy` additionally
     * requires an event within the last five minutes.
     */
    getHealth() {
        const isConnected = this.firehose !== null
        const timeSinceLastEvent = Date.now() - this.lastEventTime

        return {
            connected: isConnected,
            lastEventTime: this.lastEventTime,
            timeSinceLastEvent,
            healthy:
                isConnected && timeSinceLastEvent < HEALTHY_MAX_EVENT_GAP_MS
        }
    }
}