Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
import { existsSync, rmSync } from 'fs'
import {
    getPdsForDid,
    downloadAndCacheSite,
    extractBlobCid,
    fetchSiteRecord
} from './utils'
import { upsertSite, tryAcquireLock, releaseLock } from './db'
import { safeFetch } from './safe-fetch'
import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
import { Firehose } from '@atproto/sync'
import { IdResolver } from '@atproto/identity'
import { invalidateSiteCache, markSiteAsBeingCached, unmarkSiteAsBeingCached } from './cache'

/** Root directory of the on-disk site cache; entries live at `<root>/<did>/<site>`. */
const CACHE_DIR = './cache/sites'

/** Collection (record type) this worker subscribes to and reacts on. */
const SITE_COLLECTION = 'place.wisp.fs'

/** Maximum gap since the last firehose event for the worker to report healthy (5 minutes). */
const MAX_HEALTHY_EVENT_GAP_MS = 300000

/** Turns a caught value into a loggable message without assuming it is an `Error`. */
function describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err)
}

/**
 * Returns true when `segment` can be used as a single directory name.
 *
 * Rejects empty strings, the special names `.` and `..`, and anything that
 * contains a path separator or NUL byte, so that an interpolated path cannot
 * climb out of (or collapse onto) its parent directory.
 */
function isSafePathSegment(segment: string): boolean {
    return (
        segment.length > 0 &&
        segment !== '.' &&
        segment !== '..' &&
        !segment.includes('/') &&
        !segment.includes('\\') &&
        !segment.includes('\0')
    )
}

/**
 * Subscribes to the AT Protocol firehose for `place.wisp.fs` records and keeps
 * the local site cache (and, via `upsertSite`, the database) in sync with
 * create, update and delete events.
 */
export class FirehoseWorker {
    private firehose: Firehose | null = null
    private idResolver: IdResolver
    private isShuttingDown = false
    private lastEventTime = Date.now()

    /**
     * @param logger Optional sink for log lines; `console.log` is used when omitted.
     */
    constructor(
        private logger?: (msg: string, data?: Record<string, unknown>) => void
    ) {
        this.idResolver = new IdResolver()
    }

    /** Emits a log line prefixed with `[FirehoseWorker]`, always passing a data object. */
    private log(msg: string, data?: Record<string, unknown>) {
        const log = this.logger ?? console.log
        log(`[FirehoseWorker] ${msg}`, data ?? {})
    }

    /** Starts consuming the firehose. */
    start() {
        this.log('Starting firehose worker')
        this.connect()
    }

    /** Flags the worker as shutting down and destroys the firehose connection, if any. */
    stop() {
        this.log('Stopping firehose worker')
        this.isShuttingDown = true

        if (this.firehose) {
            this.firehose.destroy()
            this.firehose = null
        }
    }

    /** Creates and starts the firehose subscription; does nothing once `stop()` has been called. */
    private connect() {
        if (this.isShuttingDown) return

        this.log('Connecting to AT Protocol firehose')

        this.firehose = new Firehose({
            idResolver: this.idResolver,
            service: 'wss://bsky.network',
            filterCollections: [SITE_COLLECTION],
            // The event shape comes from @atproto/sync and is not imported by
            // this file, so it is left untyped here exactly as before.
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            handleEvent: async (evt: any) => {
                await this.onEvent(evt)
            },
            onError: (err: unknown) => {
                this.log('Firehose error', {
                    error: describeError(err),
                    stack: err instanceof Error ? err.stack : undefined,
                    fullError: err
                })
                console.error('Full firehose error:', err)
            }
        })

        this.firehose.start()
        this.log('Firehose started')
    }

    /**
     * Dispatches one firehose event to the create/update or delete handler.
     *
     * Handler failures are logged and swallowed so that one bad event does not
     * propagate out of the firehose callback. Create/update events that are
     * not valid `place.wisp.fs` records are ignored silently.
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    private async onEvent(evt: any) {
        this.lastEventTime = Date.now()

        if (evt.event === 'create' || evt.event === 'update') {
            const record = evt.record

            const isValidSiteRecord =
                evt.collection === SITE_COLLECTION &&
                isRecord(record) &&
                validateRecord(record).success
            if (!isValidSiteRecord) return

            this.log('Received place.wisp.fs event', {
                did: evt.did,
                event: evt.event,
                rkey: evt.rkey
            })

            try {
                await this.handleCreateOrUpdate(
                    evt.did,
                    evt.rkey,
                    record,
                    evt.cid?.toString()
                )
            } catch (err) {
                this.log('Error handling event', {
                    did: evt.did,
                    event: evt.event,
                    rkey: evt.rkey,
                    error: describeError(err)
                })
            }
            return
        }

        if (evt.event === 'delete' && evt.collection === SITE_COLLECTION) {
            this.log('Received delete event', {
                did: evt.did,
                rkey: evt.rkey
            })

            try {
                await this.handleDelete(evt.did, evt.rkey)
            } catch (err) {
                this.log('Error handling delete', {
                    did: evt.did,
                    rkey: evt.rkey,
                    error: describeError(err)
                })
            }
        }
    }

    /**
     * Re-verifies a created/updated record against its PDS, refreshes the local
     * cache, and upserts the site to the database under a lock.
     *
     * @param did Repository DID the event belongs to.
     * @param site Record key of the site record.
     * @param record Record payload, already validated by the event handler.
     * @param eventCid CID carried by the event, if any; must match the CID
     *   returned by `fetchSiteRecord`, otherwise the event is dropped.
     */
    private async handleCreateOrUpdate(
        did: string,
        site: string,
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        record: any,
        eventCid?: string
    ) {
        this.log('Processing create/update', { did, site })

        // Record is already validated in the event handler
        const fsRecord = record

        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        this.log('Resolved PDS', { did, pdsEndpoint })

        // Verify record exists on PDS and fetch its CID
        this.log('Verifying record on PDS', { did, site })
        let verifiedCid: string
        try {
            const result = await fetchSiteRecord(did, site)

            if (!result) {
                this.log('Record not found on PDS, skipping cache', {
                    did,
                    site
                })
                return
            }

            verifiedCid = result.cid

            // Verify event CID matches PDS CID (prevent cache poisoning)
            if (eventCid && eventCid !== verifiedCid) {
                this.log('CID mismatch detected - potential spoofed event', {
                    did,
                    site,
                    eventCid,
                    verifiedCid
                })
                return
            }

            this.log('Record verified on PDS', { did, site, cid: verifiedCid })
        } catch (err) {
            this.log('Failed to verify record on PDS', {
                did,
                site,
                error: describeError(err)
            })
            return
        }

        // Invalidate in-memory caches before updating
        invalidateSiteCache(did, site)

        // Mark site as being cached to prevent serving stale content during update
        markSiteAsBeingCached(did, site)

        try {
            // Every instance caches locally, so this runs before (and
            // regardless of) the database lock below.
            await downloadAndCacheSite(
                did,
                site,
                fsRecord,
                pdsEndpoint,
                verifiedCid
            )

            // The lock guards only the database write, so that a single
            // instance performs the upsert for a given did/site.
            const lockKey = `db:upsert:${did}:${site}`
            const lockAcquired = await tryAcquireLock(lockKey)

            if (!lockAcquired) {
                this.log('Another instance is writing to DB, skipping upsert', {
                    did,
                    site
                })
                this.log('Successfully processed create/update (cached locally)', {
                    did,
                    site
                })
                return
            }

            try {
                await upsertSite(did, site, fsRecord.site)
                this.log(
                    'Successfully processed create/update (cached + DB updated)',
                    { did, site }
                )
            } finally {
                // Always release lock, even if DB write fails
                await releaseLock(lockKey)
            }
        } finally {
            // Always unmark, even if caching fails
            unmarkSiteAsBeingCached(did, site)
        }
    }

    /**
     * Handles a delete event by dropping the in-memory and on-disk cache for
     * the site, unless the PDS still serves the record.
     *
     * If the verification request itself fails, the error is logged and the
     * cache is still removed (best-effort behaviour kept from the original).
     *
     * @param did Repository DID the event belongs to.
     * @param site Record key of the deleted site record.
     */
    private async handleDelete(did: string, site: string) {
        this.log('Processing delete', { did, site })

        // All instances should delete their local cache (no lock needed)
        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        // Verify record is actually deleted from PDS
        try {
            const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
            const recordRes = await safeFetch(recordUrl)

            if (recordRes.ok) {
                this.log('Record still exists on PDS, not deleting cache', {
                    did,
                    site
                })
                return
            }

            this.log('Verified record is deleted from PDS', {
                did,
                site,
                status: recordRes.status
            })
        } catch (err) {
            this.log('Error verifying deletion on PDS', {
                did,
                site,
                error: describeError(err)
            })
        }

        // Invalidate in-memory caches
        invalidateSiteCache(did, site)

        // Delete disk cache
        this.deleteCache(did, site)

        this.log('Successfully processed delete', { did, site })
    }

    /**
     * Recursively removes the on-disk cache directory for one site.
     *
     * `did` and `site` originate from firehose events, so both are checked to
     * be plain directory names before the path is built. Without the check, a
     * value such as `..` would aim the recursive delete at a parent directory.
     * Filesystem errors are logged, not thrown.
     */
    private deleteCache(did: string, site: string) {
        if (!isSafePathSegment(did) || !isSafePathSegment(site)) {
            this.log('Refusing to delete cache: unsafe path segment', {
                did,
                site
            })
            return
        }

        const cacheDir = `${CACHE_DIR}/${did}/${site}`

        if (!existsSync(cacheDir)) {
            this.log('Cache directory does not exist, nothing to delete', {
                did,
                site
            })
            return
        }

        try {
            rmSync(cacheDir, { recursive: true, force: true })
            this.log('Cache deleted', { did, site, path: cacheDir })
        } catch (err) {
            this.log('Failed to delete cache', {
                did,
                site,
                path: cacheDir,
                error: describeError(err)
            })
        }
    }

    /**
     * Reports worker health.
     *
     * `connected` only reflects whether a firehose object currently exists.
     * `healthy` additionally requires an event within the last 5 minutes.
     */
    getHealth() {
        const isConnected = this.firehose !== null
        const timeSinceLastEvent = Date.now() - this.lastEventTime

        return {
            connected: isConnected,
            lastEventTime: this.lastEventTime,
            timeSinceLastEvent,
            healthy: isConnected && timeSinceLastEvent < MAX_HEALTHY_EVENT_GAP_MS
        }
    }
}