Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
import { existsSync, rmSync } from 'fs'
import {
	getPdsForDid,
	downloadAndCacheSite,
	extractBlobCid,
	fetchSiteRecord
} from './utils'
import { upsertSite, tryAcquireLock, releaseLock } from './db'
import { safeFetch } from './safe-fetch'
import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
import { Firehose } from '@atproto/sync'
import { IdResolver } from '@atproto/identity'
import { invalidateSiteCache, markSiteAsBeingCached, unmarkSiteAsBeingCached } from './cache'
import { clearRedirectRulesCache } from '../server'

/** Root directory of the on-disk site cache; sites live at `<root>/<did>/<rkey>`. */
const CACHE_DIR = './cache/sites'

/** Maximum gap between firehose events before `getHealth()` reports unhealthy (5 minutes). */
const HEALTH_MAX_EVENT_GAP_MS = 5 * 60 * 1000

/** Extracts a loggable message from an arbitrary thrown value. */
function errorMessage(err: unknown): string {
	return err instanceof Error ? err.message : String(err)
}

/**
 * Returns true when `segment` can be used as a single path component.
 *
 * Rejects empty strings, `.`/`..`, and anything containing a path separator
 * or NUL byte, so a value taken from a firehose event can never make a
 * cache path point outside its own `<did>/<rkey>` directory.
 */
function isSafePathSegment(segment: string): boolean {
	if (segment === '' || segment === '.' || segment === '..') return false
	return !(
		segment.includes('/') ||
		segment.includes('\\') ||
		segment.includes('\0')
	)
}

/**
 * Subscribes to the AT Protocol firehose for `place.wisp.fs` and
 * `place.wisp.settings` records and keeps the local site cache (and the
 * site database row) in sync with what is published on each user's PDS.
 */
export class FirehoseWorker {
	private firehose: Firehose | null = null
	private idResolver: IdResolver
	private isShuttingDown = false
	private lastEventTime = Date.now()

	/**
	 * @param logger Optional log sink; falls back to `console.log` when omitted.
	 */
	constructor(
		private logger?: (msg: string, data?: Record<string, unknown>) => void
	) {
		this.idResolver = new IdResolver()
	}

	/** Writes a `[FirehoseWorker]`-prefixed message to the configured logger. */
	private log(msg: string, data?: Record<string, unknown>) {
		const log = this.logger ?? console.log
		log(`[FirehoseWorker] ${msg}`, data || {})
	}

	/** Starts consuming the firehose. */
	start() {
		this.log('Starting firehose worker')
		this.connect()
	}

	/** Stops consuming the firehose and prevents any further `connect()` calls. */
	stop() {
		this.log('Stopping firehose worker')
		this.isShuttingDown = true

		if (this.firehose) {
			this.firehose.destroy()
			this.firehose = null
		}
	}

	/**
	 * Creates the firehose subscription and dispatches each event to the
	 * matching handler. Handler failures are logged and never rethrown, so
	 * one bad event does not stop the subscription.
	 */
	private connect() {
		if (this.isShuttingDown) return

		this.log('Connecting to AT Protocol firehose')

		this.firehose = new Firehose({
			idResolver: this.idResolver,
			service: 'wss://bsky.network',
			filterCollections: ['place.wisp.fs', 'place.wisp.settings'],
			handleEvent: async (evt: any) => {
				// Any event counts as liveness for getHealth()
				this.lastEventTime = Date.now()

				// Watch for write events
				if (evt.event === 'create' || evt.event === 'update') {
					const record = evt.record

					// If the write is a valid place.wisp.fs record
					if (
						evt.collection === 'place.wisp.fs' &&
						isRecord(record) &&
						validateRecord(record).success
					) {
						this.log('Received place.wisp.fs event', {
							did: evt.did,
							event: evt.event,
							rkey: evt.rkey
						})

						try {
							await this.handleCreateOrUpdate(
								evt.did,
								evt.rkey,
								record,
								evt.cid?.toString()
							)
						} catch (err) {
							console.error('Full error details:', err)
							this.log('Error handling event', {
								did: evt.did,
								event: evt.event,
								rkey: evt.rkey,
								error: errorMessage(err)
							})
						}
					}
					// Handle settings changes
					else if (evt.collection === 'place.wisp.settings') {
						this.log('Received place.wisp.settings event', {
							did: evt.did,
							event: evt.event,
							rkey: evt.rkey
						})

						try {
							await this.handleSettingsChange(evt.did, evt.rkey)
						} catch (err) {
							this.log('Error handling settings change', {
								did: evt.did,
								event: evt.event,
								rkey: evt.rkey,
								error: errorMessage(err)
							})
						}
					}
				} else if (
					evt.event === 'delete' &&
					evt.collection === 'place.wisp.fs'
				) {
					this.log('Received delete event', {
						did: evt.did,
						rkey: evt.rkey
					})

					try {
						await this.handleDelete(evt.did, evt.rkey)
					} catch (err) {
						this.log('Error handling delete', {
							did: evt.did,
							rkey: evt.rkey,
							error: errorMessage(err)
						})
					}
				} else if (
					evt.event === 'delete' &&
					evt.collection === 'place.wisp.settings'
				) {
					this.log('Received settings delete event', {
						did: evt.did,
						rkey: evt.rkey
					})

					// A deleted settings record is handled like a change:
					// the cached settings are re-fetched and rewritten.
					try {
						await this.handleSettingsChange(evt.did, evt.rkey)
					} catch (err) {
						this.log('Error handling settings delete', {
							did: evt.did,
							rkey: evt.rkey,
							error: errorMessage(err)
						})
					}
				}
			},
			onError: (err: unknown) => {
				this.log('Firehose error', {
					error: errorMessage(err),
					stack: err instanceof Error ? err.stack : undefined,
					fullError: err
				})
				console.error('Full firehose error:', err)
			}
		})

		this.firehose.start()
		this.log('Firehose started')
	}

	/**
	 * Handles a created or updated `place.wisp.fs` record: verifies it against
	 * the owner's PDS, refreshes the local cache, then upserts the site row.
	 *
	 * @param did      Repository DID the record belongs to.
	 * @param site     Record key, used as the site name.
	 * @param record   The record from the event (already validated by the caller).
	 * @param eventCid CID carried by the event, if any; must match the PDS CID.
	 */
	private async handleCreateOrUpdate(
		did: string,
		site: string,
		record: any,
		eventCid?: string
	) {
		this.log('Processing create/update', { did, site })

		const pdsEndpoint = await getPdsForDid(did)
		if (!pdsEndpoint) {
			this.log('Could not resolve PDS for DID', { did })
			return
		}

		this.log('Resolved PDS', { did, pdsEndpoint })

		// Verify record exists on PDS and fetch its CID
		this.log('Verifying record on PDS', { did, site })
		let verifiedCid: string
		try {
			const result = await fetchSiteRecord(did, site)

			if (!result) {
				this.log('Record not found on PDS, skipping cache', {
					did,
					site
				})
				return
			}

			verifiedCid = result.cid

			// Verify event CID matches PDS CID (prevent cache poisoning)
			if (eventCid && eventCid !== verifiedCid) {
				this.log('CID mismatch detected - potential spoofed event', {
					did,
					site,
					eventCid,
					verifiedCid
				})
				return
			}

			this.log('Record verified on PDS', { did, site, cid: verifiedCid })
		} catch (err) {
			// Fail closed: an unverifiable record is never cached
			this.log('Failed to verify record on PDS', {
				did,
				site,
				error: errorMessage(err)
			})
			return
		}

		// Invalidate in-memory caches before updating
		invalidateSiteCache(did, site)

		// Mark site as being cached to prevent serving stale content during update
		markSiteAsBeingCached(did, site)

		try {
			// Cache the record with verified CID (uses atomic swap internally)
			// All instances cache locally for edge serving
			await downloadAndCacheSite(
				did,
				site,
				record,
				pdsEndpoint,
				verifiedCid
			)

			// Clear redirect rules cache since the site was updated
			clearRedirectRulesCache(did, site)

			// Acquire distributed lock only for database write to prevent duplicate writes
			// Note: upsertSite will check cache-only mode internally and skip if needed
			const lockKey = `db:upsert:${did}:${site}`
			const lockAcquired = await tryAcquireLock(lockKey)

			if (!lockAcquired) {
				this.log('Another instance is writing to DB, skipping upsert', {
					did,
					site
				})
				this.log('Successfully processed create/update (cached locally)', {
					did,
					site
				})
				return
			}

			try {
				// Upsert site to database (only one instance does this)
				// In cache-only mode, this will be a no-op
				await upsertSite(did, site, record.site)
				this.log(
					'Successfully processed create/update (cached + DB updated)',
					{ did, site }
				)
			} finally {
				// Always release lock, even if DB write fails
				await releaseLock(lockKey)
			}
		} finally {
			// Always unmark, even if caching fails
			unmarkSiteAsBeingCached(did, site)
		}
	}

	/**
	 * Handles a deleted `place.wisp.fs` record: confirms with the PDS that the
	 * record is gone, then drops the in-memory and on-disk caches for the site.
	 *
	 * @param did  Repository DID the record belonged to.
	 * @param site Record key of the deleted site.
	 */
	private async handleDelete(did: string, site: string) {
		this.log('Processing delete', { did, site })

		// All instances should delete their local cache (no lock needed)
		const pdsEndpoint = await getPdsForDid(did)
		if (!pdsEndpoint) {
			this.log('Could not resolve PDS for DID', { did })
			return
		}

		// Verify record is actually deleted from PDS
		try {
			const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
			const recordRes = await safeFetch(recordUrl)

			if (recordRes.ok) {
				this.log('Record still exists on PDS, not deleting cache', {
					did,
					site
				})
				return
			}

			// Any non-2xx status is treated as "record is gone"
			this.log('Verified record is deleted from PDS', {
				did,
				site,
				status: recordRes.status
			})
		} catch (err) {
			// NOTE(review): a failed verification request falls through and the
			// cache is still deleted, unlike handleCreateOrUpdate which bails
			// out when verification fails. Confirm this fail-open is intended.
			this.log('Error verifying deletion on PDS', {
				did,
				site,
				error: errorMessage(err)
			})
		}

		// Invalidate in-memory caches
		invalidateSiteCache(did, site)

		// Delete disk cache
		this.deleteCache(did, site)

		this.log('Successfully processed delete', { did, site })
	}

	/**
	 * Handles a created, updated or deleted `place.wisp.settings` record by
	 * invalidating in-memory caches and rewriting the settings stored in the
	 * on-disk cache metadata. Failures are logged, not thrown.
	 *
	 * @param did  Repository DID the settings record belongs to.
	 * @param rkey Record key of the settings record (same key as the site).
	 */
	private async handleSettingsChange(did: string, rkey: string) {
		this.log('Processing settings change', { did, rkey })

		// Invalidate in-memory caches (includes metadata which stores settings)
		invalidateSiteCache(did, rkey)

		// Update on-disk metadata with new settings
		try {
			const { fetchSiteSettings, updateCacheMetadataSettings } = await import('./utils')
			const settings = await fetchSiteSettings(did, rkey)
			await updateCacheMetadataSettings(did, rkey, settings)
			this.log('Updated cached settings', { did, rkey, hasSettings: !!settings })
		} catch (err) {
			this.log('Failed to update cached settings', {
				did,
				rkey,
				error: errorMessage(err)
			})
		}

		this.log('Successfully processed settings change', { did, rkey })
	}

	/**
	 * Recursively removes the on-disk cache directory for one site.
	 * Filesystem errors are logged, not thrown.
	 *
	 * @param did  Repository DID (first path component under the cache root).
	 * @param site Record key (second path component under the cache root).
	 */
	private deleteCache(did: string, site: string) {
		// Both values originate from firehose events. Because the directory is
		// removed recursively, refuse anything that is not a plain path
		// component (e.g. '' or '..' would target a parent directory).
		if (!isSafePathSegment(did) || !isSafePathSegment(site)) {
			this.log('Refusing to delete cache for unsafe path segment', {
				did,
				site
			})
			return
		}

		const cacheDir = `${CACHE_DIR}/${did}/${site}`

		if (!existsSync(cacheDir)) {
			this.log('Cache directory does not exist, nothing to delete', {
				did,
				site
			})
			return
		}

		try {
			rmSync(cacheDir, { recursive: true, force: true })
			this.log('Cache deleted', { did, site, path: cacheDir })
		} catch (err) {
			this.log('Failed to delete cache', {
				did,
				site,
				path: cacheDir,
				error: errorMessage(err)
			})
		}
	}

	/**
	 * Reports worker health.
	 *
	 * `connected` only reflects that a firehose instance currently exists
	 * (i.e. `connect()` ran and `stop()` has not); `healthy` additionally
	 * requires an event within the last 5 minutes.
	 */
	getHealth() {
		const isConnected = this.firehose !== null
		const timeSinceLastEvent = Date.now() - this.lastEventTime

		return {
			connected: isConnected,
			lastEventTime: this.lastEventTime,
			timeSinceLastEvent,
			healthy: isConnected && timeSinceLastEvent < HEALTH_MAX_EVENT_GAP_MS
		}
	}
}