// Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
import { existsSync, rmSync } from 'fs'
import {
    getPdsForDid,
    downloadAndCacheSite,
    extractBlobCid,
    fetchSiteRecord
} from './utils'
import { upsertSite, tryAcquireLock, releaseLock } from './db'
import { safeFetch } from './safe-fetch'
import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
import { Firehose } from '@atproto/sync'
import { IdResolver } from '@atproto/identity'
import { invalidateSiteCache, markSiteAsBeingCached, unmarkSiteAsBeingCached } from './cache'

/** Root directory of the on-disk site cache; sites live at `<CACHE_DIR>/<did>/<rkey>`. */
const CACHE_DIR = './cache/sites'

/** Collection NSID this worker subscribes to and reacts on. */
const SITE_COLLECTION = 'place.wisp.fs'

/** `getHealth()` reports unhealthy once no event has been seen for this long (5 minutes). */
const HEALTH_EVENT_WINDOW_MS = 5 * 60 * 1000

/**
 * Turn an arbitrary thrown value into a loggable string.
 *
 * @param err - Whatever was caught; not assumed to be an `Error`.
 * @returns `err.message` for `Error` instances, otherwise `String(err)`.
 */
function describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err)
}

/**
 * Subscribes to the AT Protocol firehose, filtered to `place.wisp.fs`, and
 * keeps the local site cache (and the sites database) in sync with
 * create/update/delete events.
 */
export class FirehoseWorker {
    private firehose: Firehose | null = null
    private readonly idResolver: IdResolver
    private isShuttingDown = false
    private lastEventTime = Date.now()

    /**
     * @param logger - Optional sink for log lines. When omitted, `console.log`
     *   is used. Receives the prefixed message and a data object.
     */
    constructor(
        private logger?: (msg: string, data?: Record<string, unknown>) => void
    ) {
        this.idResolver = new IdResolver()
    }

    /** Emit one log line, prefixed with `[FirehoseWorker]`, through the configured logger. */
    private log(msg: string, data?: Record<string, unknown>): void {
        const sink = this.logger ?? console.log
        sink(`[FirehoseWorker] ${msg}`, data ?? {})
    }

    /** Start consuming the firehose. Has no effect after `stop()` has been called. */
    start() {
        this.log('Starting firehose worker')
        this.connect()
    }

    /**
     * Stop consuming the firehose and mark the worker as shutting down so
     * that no new connection is opened afterwards.
     */
    stop() {
        this.log('Stopping firehose worker')
        this.isShuttingDown = true

        if (this.firehose) {
            // destroy() may be asynchronous; make sure a rejection is logged
            // instead of surfacing as an unhandled promise rejection.
            Promise.resolve(this.firehose.destroy()).catch((err: unknown) => {
                this.log('Error while destroying firehose', {
                    error: describeError(err)
                })
            })
            this.firehose = null
        }
    }

    /**
     * Create the Firehose subscription and start it.
     *
     * Does nothing when the worker is shutting down or when a subscription
     * already exists (so calling `start()` twice cannot leak a second
     * subscription that would process every event twice).
     */
    private connect(): void {
        if (this.isShuttingDown) return

        if (this.firehose) {
            this.log('Firehose already connected, ignoring duplicate connect')
            return
        }

        this.log('Connecting to AT Protocol firehose')

        const firehose = new Firehose({
            idResolver: this.idResolver,
            service: 'wss://bsky.network',
            filterCollections: [SITE_COLLECTION],
            // eslint-disable-next-line @typescript-eslint/no-explicit-any -- event shape comes from @atproto/sync; fields are checked at runtime in processEvent
            handleEvent: (evt: any) => this.processEvent(evt),
            onError: (err: unknown) => {
                this.log('Firehose error', {
                    error: describeError(err),
                    stack: err instanceof Error ? err.stack : undefined,
                    fullError: err
                })
                console.error('Full firehose error:', err)
            }
        })
        this.firehose = firehose

        // start() may return a long-lived promise; attach a handler so a
        // rejection is logged rather than becoming an unhandled rejection.
        Promise.resolve(firehose.start()).catch((err: unknown) => {
            this.log('Firehose subscription terminated unexpectedly', {
                error: describeError(err)
            })
        })
        this.log('Firehose started')
    }

    /**
     * Dispatch a single firehose event.
     *
     * Every event refreshes `lastEventTime`. Create/update events are handled
     * only when they belong to `place.wisp.fs` and the record passes
     * `isRecord` and `validateRecord`; delete events only when they belong to
     * `place.wisp.fs`. Handler failures are logged and never rethrown.
     *
     * @param evt - Event object delivered by the Firehose subscription.
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- see connect(): shape is validated field-by-field below
    private async processEvent(evt: any): Promise<void> {
        this.lastEventTime = Date.now()

        const isWrite = evt.event === 'create' || evt.event === 'update'

        if (isWrite) {
            const record = evt.record

            const isValidSiteRecord =
                evt.collection === SITE_COLLECTION &&
                isRecord(record) &&
                validateRecord(record).success
            if (!isValidSiteRecord) return

            this.log('Received place.wisp.fs event', {
                did: evt.did,
                event: evt.event,
                rkey: evt.rkey
            })

            try {
                await this.handleCreateOrUpdate(
                    evt.did,
                    evt.rkey,
                    record,
                    evt.cid?.toString()
                )
            } catch (err) {
                console.error('Full error details:', err)
                this.log('Error handling event', {
                    did: evt.did,
                    event: evt.event,
                    rkey: evt.rkey,
                    error: describeError(err)
                })
            }
            return
        }

        if (evt.event === 'delete' && evt.collection === SITE_COLLECTION) {
            this.log('Received delete event', {
                did: evt.did,
                rkey: evt.rkey
            })

            try {
                await this.handleDelete(evt.did, evt.rkey)
            } catch (err) {
                this.log('Error handling delete', {
                    did: evt.did,
                    rkey: evt.rkey,
                    error: describeError(err)
                })
            }
        }
    }

    /**
     * Handle a validated create/update of a site record.
     *
     * Steps: resolve the repo's PDS, re-fetch the record to confirm it exists
     * and that its CID matches the event's CID, refresh the local cache, then
     * upsert the site in the database if the write lock can be acquired.
     * Returns early (after logging) when the PDS cannot be resolved, the
     * record cannot be verified, or the CIDs differ.
     *
     * @param did - DID of the repo that owns the record.
     * @param site - Record key (rkey) identifying the site.
     * @param record - The record payload, already validated by the caller.
     * @param eventCid - CID carried by the firehose event, if any.
     */
    private async handleCreateOrUpdate(
        did: string,
        site: string,
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- validated place.wisp.fs record; passed through to project helpers
        record: any,
        eventCid?: string
    ): Promise<void> {
        this.log('Processing create/update', { did, site })

        // Record is already validated in processEvent
        const fsRecord = record

        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        this.log('Resolved PDS', { did, pdsEndpoint })

        // Verify record exists on PDS and fetch its CID
        this.log('Verifying record on PDS', { did, site })
        let verifiedCid: string
        try {
            const result = await fetchSiteRecord(did, site)

            if (!result) {
                this.log('Record not found on PDS, skipping cache', {
                    did,
                    site
                })
                return
            }

            verifiedCid = result.cid

            // Verify event CID matches PDS CID (prevent cache poisoning)
            if (eventCid && eventCid !== verifiedCid) {
                this.log('CID mismatch detected - potential spoofed event', {
                    did,
                    site,
                    eventCid,
                    verifiedCid
                })
                return
            }

            this.log('Record verified on PDS', { did, site, cid: verifiedCid })
        } catch (err) {
            this.log('Failed to verify record on PDS', {
                did,
                site,
                error: describeError(err)
            })
            return
        }

        // Invalidate in-memory caches before updating
        invalidateSiteCache(did, site)

        // Mark site as being cached to prevent serving stale content during update
        markSiteAsBeingCached(did, site)

        try {
            // Every instance caches locally; only the DB write below is
            // guarded by a lock.
            await downloadAndCacheSite(
                did,
                site,
                fsRecord,
                pdsEndpoint,
                verifiedCid
            )

            // Take the lock only for the database write so that a single
            // instance performs it.
            const lockKey = `db:upsert:${did}:${site}`
            const lockAcquired = await tryAcquireLock(lockKey)

            if (!lockAcquired) {
                this.log('Another instance is writing to DB, skipping upsert', {
                    did,
                    site
                })
                this.log('Successfully processed create/update (cached locally)', {
                    did,
                    site
                })
                return
            }

            try {
                await upsertSite(did, site, fsRecord.site)
                this.log(
                    'Successfully processed create/update (cached + DB updated)',
                    { did, site }
                )
            } finally {
                // Always release lock, even if DB write fails
                await releaseLock(lockKey)
            }
        } finally {
            // Always unmark, even if caching fails
            unmarkSiteAsBeingCached(did, site)
        }
    }

    /**
     * Handle deletion of a site record.
     *
     * Resolves the repo's PDS, asks it for the record, and keeps the cache
     * when the PDS still returns the record successfully. Otherwise the
     * in-memory and on-disk caches for the site are removed.
     *
     * @param did - DID of the repo that owned the record.
     * @param site - Record key (rkey) identifying the site.
     */
    private async handleDelete(did: string, site: string): Promise<void> {
        this.log('Processing delete', { did, site })

        // All instances should delete their local cache (no lock needed)
        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        // Verify record is actually deleted from PDS
        try {
            const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
            const recordRes = await safeFetch(recordUrl)

            if (recordRes.ok) {
                this.log('Record still exists on PDS, not deleting cache', {
                    did,
                    site
                })
                return
            }

            this.log('Verified record is deleted from PDS', {
                did,
                site,
                status: recordRes.status
            })
        } catch (err) {
            // NOTE(review): a failed verification request falls through and
            // the cache is still deleted below. This looks like deliberate
            // best-effort cleanup - confirm before changing it.
            this.log('Error verifying deletion on PDS', {
                did,
                site,
                error: describeError(err)
            })
        }

        // Invalidate in-memory caches
        invalidateSiteCache(did, site)

        // Delete disk cache
        this.deleteCache(did, site)

        this.log('Successfully processed delete', { did, site })
    }

    /**
     * Remove a site's on-disk cache directory, if present. Failures are
     * logged and not rethrown.
     *
     * @param did - DID of the repo that owned the site.
     * @param site - Record key (rkey) identifying the site.
     */
    private deleteCache(did: string, site: string): void {
        const cacheDir = `${CACHE_DIR}/${did}/${site}`

        if (!existsSync(cacheDir)) {
            this.log('Cache directory does not exist, nothing to delete', {
                did,
                site
            })
            return
        }

        try {
            rmSync(cacheDir, { recursive: true, force: true })
            this.log('Cache deleted', { did, site, path: cacheDir })
        } catch (err) {
            this.log('Failed to delete cache', {
                did,
                site,
                path: cacheDir,
                error: describeError(err)
            })
        }
    }

    /**
     * Report worker health.
     *
     * `connected` only reflects whether a Firehose instance is currently
     * held, not the state of the underlying socket. `healthy` additionally
     * requires that an event was seen within the last 5 minutes.
     *
     * @returns Connection flag, timestamp of the last event (ms since epoch),
     *   milliseconds elapsed since that event, and the combined health flag.
     */
    getHealth() {
        const isConnected = this.firehose !== null
        const timeSinceLastEvent = Date.now() - this.lastEventTime

        return {
            connected: isConnected,
            lastEventTime: this.lastEventTime,
            timeSinceLastEvent,
            healthy: isConnected && timeSinceLastEvent < HEALTH_EVENT_WINDOW_MS
        }
    }
}