Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol. wisp.place
import { existsSync, rmSync } from 'fs'
import {
    getPdsForDid,
    downloadAndCacheSite,
    extractBlobCid,
    fetchSiteRecord
} from './utils'
import { upsertSite, tryAcquireLock, releaseLock } from './db'
import { safeFetch } from './safe-fetch'
import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
import { Firehose } from '@atproto/sync'
import { IdResolver } from '@atproto/identity'
import { invalidateSiteCache, markSiteAsBeingCached, unmarkSiteAsBeingCached } from './cache'
import { clearRedirectRulesCache } from '../server'

/** Root directory of the on-disk site cache; sites live at `<CACHE_DIR>/<did>/<rkey>`. */
const CACHE_DIR = './cache/sites'

/** `getHealth()` reports unhealthy once no event has been seen for this long (5 minutes). */
const HEALTHY_EVENT_WINDOW_MS = 5 * 60 * 1000

/** Collection holding site file-tree records. */
const FS_COLLECTION = 'place.wisp.fs'

/** Collection holding per-site settings records. */
const SETTINGS_COLLECTION = 'place.wisp.settings'

/** Renders any thrown value as a loggable string. */
function errorMessage(err: unknown): string {
    return err instanceof Error ? err.message : String(err)
}

/**
 * Subscribes to the AT Protocol firehose and keeps the local site cache in
 * sync with `place.wisp.fs` and `place.wisp.settings` records.
 *
 * Event handling never throws back into the firehose: every handler failure
 * is caught and logged.
 */
export class FirehoseWorker {
    private firehose: Firehose | null = null
    private readonly idResolver: IdResolver
    private isShuttingDown = false
    private lastEventTime = Date.now()

    /**
     * @param logger Optional log sink. Defaults to `console.log`.
     */
    constructor(
        private logger?: (msg: string, data?: Record<string, unknown>) => void
    ) {
        this.idResolver = new IdResolver()
    }

    /** Emits a log line prefixed with `[FirehoseWorker]`. */
    private log(msg: string, data?: Record<string, unknown>): void {
        const sink = this.logger ?? console.log
        sink(`[FirehoseWorker] ${msg}`, data || {})
    }

    /** Starts the firehose subscription. A no-op after `stop()` has been called. */
    start(): void {
        this.log('Starting firehose worker')
        this.connect()
    }

    /** Stops the worker and tears down the firehose subscription, if any. */
    stop(): void {
        this.log('Stopping firehose worker')
        this.isShuttingDown = true

        if (this.firehose) {
            // The result of destroy() is intentionally not awaited.
            void this.firehose.destroy()
            this.firehose = null
        }
    }

    /** Creates the firehose subscription and starts consuming events. */
    private connect(): void {
        if (this.isShuttingDown) return

        // Creating a second subscription would orphan the first one (it would
        // never be destroyed), so refuse to connect twice.
        if (this.firehose) {
            this.log('Firehose already running, ignoring connect request')
            return
        }

        this.log('Connecting to AT Protocol firehose')

        this.firehose = new Firehose({
            idResolver: this.idResolver,
            service: 'wss://bsky.network',
            filterCollections: [FS_COLLECTION, SETTINGS_COLLECTION],
            // eslint-disable-next-line @typescript-eslint/no-explicit-any -- event shape comes from @atproto/sync
            handleEvent: (evt: any) => this.handleEvent(evt),
            onError: (err: unknown) => {
                this.log('Firehose error', {
                    error: errorMessage(err),
                    stack: err instanceof Error ? err.stack : undefined,
                    fullError: err
                })
                console.error('Full firehose error:', err)
            }
        })

        // start() is not awaited; wrap it so that a rejection (if it returns a
        // promise) is logged instead of becoming an unhandled rejection.
        void Promise.resolve(this.firehose.start()).catch((err: unknown) => {
            this.log('Firehose terminated unexpectedly', {
                error: errorMessage(err)
            })
        })
        this.log('Firehose started')
    }

    /**
     * Dispatches a single firehose event to the matching handler.
     *
     * - create/update of a valid `place.wisp.fs` record: (re)cache the site
     * - create/update/delete of `place.wisp.settings`: refresh settings
     * - delete of `place.wisp.fs`: drop the cached site
     *
     * Anything else (including fs writes that fail validation) is ignored.
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- event shape comes from @atproto/sync
    private async handleEvent(evt: any): Promise<void> {
        this.lastEventTime = Date.now()

        const isWrite = evt.event === 'create' || evt.event === 'update'
        const isDelete = evt.event === 'delete'

        if (isWrite && evt.collection === FS_COLLECTION) {
            const record = evt.record

            // Only act on records that pass lexicon validation.
            if (!isRecord(record) || !validateRecord(record).success) return

            this.log('Received place.wisp.fs event', {
                did: evt.did,
                event: evt.event,
                rkey: evt.rkey
            })

            try {
                await this.handleCreateOrUpdate(
                    evt.did,
                    evt.rkey,
                    record,
                    evt.cid?.toString()
                )
            } catch (err) {
                console.error('Full error details:', err)
                this.log('Error handling event', {
                    did: evt.did,
                    event: evt.event,
                    rkey: evt.rkey,
                    error: errorMessage(err)
                })
            }
            return
        }

        if (isWrite && evt.collection === SETTINGS_COLLECTION) {
            this.log('Received place.wisp.settings event', {
                did: evt.did,
                event: evt.event,
                rkey: evt.rkey
            })

            try {
                await this.handleSettingsChange(evt.did, evt.rkey)
            } catch (err) {
                this.log('Error handling settings change', {
                    did: evt.did,
                    event: evt.event,
                    rkey: evt.rkey,
                    error: errorMessage(err)
                })
            }
            return
        }

        if (isDelete && evt.collection === FS_COLLECTION) {
            this.log('Received delete event', {
                did: evt.did,
                rkey: evt.rkey
            })

            try {
                await this.handleDelete(evt.did, evt.rkey)
            } catch (err) {
                this.log('Error handling delete', {
                    did: evt.did,
                    rkey: evt.rkey,
                    error: errorMessage(err)
                })
            }
            return
        }

        if (isDelete && evt.collection === SETTINGS_COLLECTION) {
            this.log('Received settings delete event', {
                did: evt.did,
                rkey: evt.rkey
            })

            try {
                await this.handleSettingsChange(evt.did, evt.rkey)
            } catch (err) {
                this.log('Error handling settings delete', {
                    did: evt.did,
                    rkey: evt.rkey,
                    error: errorMessage(err)
                })
            }
        }
    }

    /**
     * Caches a created/updated site locally and records it in the database.
     *
     * The record is first re-fetched so its CID can be compared with the
     * event's CID; on any mismatch or lookup failure nothing is cached.
     *
     * @param did Repository DID that owns the site.
     * @param site Record key of the site.
     * @param record The fs record from the event (already validated by the caller).
     * @param eventCid CID carried by the event, if any.
     * @throws Propagates failures from caching, lock handling and the DB upsert.
     */
    private async handleCreateOrUpdate(
        did: string,
        site: string,
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- validated lexicon record
        record: any,
        eventCid?: string
    ): Promise<void> {
        this.log('Processing create/update', { did, site })

        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        this.log('Resolved PDS', { did, pdsEndpoint })

        // Verify record exists on PDS and fetch its CID
        this.log('Verifying record on PDS', { did, site })
        let verifiedCid: string
        try {
            const result = await fetchSiteRecord(did, site)

            if (!result) {
                this.log('Record not found on PDS, skipping cache', {
                    did,
                    site
                })
                return
            }

            verifiedCid = result.cid

            // Verify event CID matches PDS CID (prevent cache poisoning)
            if (eventCid && eventCid !== verifiedCid) {
                this.log('CID mismatch detected - potential spoofed event', {
                    did,
                    site,
                    eventCid,
                    verifiedCid
                })
                return
            }

            this.log('Record verified on PDS', { did, site, cid: verifiedCid })
        } catch (err) {
            this.log('Failed to verify record on PDS', {
                did,
                site,
                error: errorMessage(err)
            })
            return
        }

        // Invalidate in-memory caches before updating
        invalidateSiteCache(did, site)

        // Mark site as being cached to prevent serving stale content during update
        markSiteAsBeingCached(did, site)

        try {
            // Every instance caches locally for edge serving.
            await downloadAndCacheSite(
                did,
                site,
                record,
                pdsEndpoint,
                verifiedCid
            )

            // Clear redirect rules cache since the site was updated
            clearRedirectRulesCache(did, site)

            // The distributed lock guards only the database write, so that a
            // single instance performs the upsert.
            const lockKey = `db:upsert:${did}:${site}`
            const lockAcquired = await tryAcquireLock(lockKey)

            if (!lockAcquired) {
                this.log('Another instance is writing to DB, skipping upsert', {
                    did,
                    site
                })
                this.log('Successfully processed create/update (cached locally)', {
                    did,
                    site
                })
                return
            }

            try {
                await upsertSite(did, site, record.site)
                this.log(
                    'Successfully processed create/update (cached + DB updated)',
                    { did, site }
                )
            } finally {
                // Always release lock, even if DB write fails
                await releaseLock(lockKey)
            }
        } finally {
            // Always unmark, even if caching fails
            unmarkSiteAsBeingCached(did, site)
        }
    }

    /**
     * Removes a deleted site from the in-memory and on-disk caches.
     *
     * The cache is kept if the PDS still serves the record. If the check
     * itself fails, the deletion proceeds anyway.
     *
     * @param did Repository DID that owned the site.
     * @param site Record key of the site.
     */
    private async handleDelete(did: string, site: string): Promise<void> {
        this.log('Processing delete', { did, site })

        // All instances should delete their local cache (no lock needed)
        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        // Verify record is actually deleted from PDS
        try {
            const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
            const recordRes = await safeFetch(recordUrl)

            if (recordRes.ok) {
                this.log('Record still exists on PDS, not deleting cache', {
                    did,
                    site
                })
                return
            }

            this.log('Verified record is deleted from PDS', {
                did,
                site,
                status: recordRes.status
            })
        } catch (err) {
            // NOTE(review): a failed check falls through to deletion;
            // presumably intentional (better to drop the cache than to keep
            // serving a deleted site) - confirm.
            this.log('Error verifying deletion on PDS', {
                did,
                site,
                error: errorMessage(err)
            })
        }

        // Invalidate in-memory caches
        invalidateSiteCache(did, site)

        // Drop cached redirect rules as well, mirroring the create/update path,
        // so rules of a deleted site do not linger.
        clearRedirectRulesCache(did, site)

        // Delete disk cache
        this.deleteCache(did, site)

        this.log('Successfully processed delete', { did, site })
    }

    /**
     * Applies a settings create/update/delete for a site.
     *
     * If the site has no cache directory yet, a full cache is attempted
     * (when its fs record exists); otherwise only the settings stored in the
     * cache metadata are refreshed. Failures are logged, never thrown.
     *
     * @param did Repository DID that owns the site.
     * @param rkey Record key of the settings record, also used as the site key.
     */
    private async handleSettingsChange(did: string, rkey: string): Promise<void> {
        this.log('Processing settings change', { did, rkey })

        // Invalidate in-memory caches (includes metadata which stores settings)
        invalidateSiteCache(did, rkey)

        const cacheDir = `${CACHE_DIR}/${did}/${rkey}`

        if (!existsSync(cacheDir)) {
            this.log('Site not cached yet, checking if fs record exists', { did, rkey })

            // Set only when the site was actually cached, so the final success
            // line is not printed after a failure or a skip.
            let cachedSite = false

            try {
                const siteRecord = await fetchSiteRecord(did, rkey)

                if (!siteRecord) {
                    this.log('No fs record found for site, skipping cache', { did, rkey })
                } else {
                    this.log('Site record found, triggering full cache with settings', { did, rkey })
                    const pdsEndpoint = await getPdsForDid(did)

                    if (!pdsEndpoint) {
                        this.log('Could not resolve PDS for DID', { did })
                    } else {
                        markSiteAsBeingCached(did, rkey)

                        try {
                            await downloadAndCacheSite(did, rkey, siteRecord.record, pdsEndpoint, siteRecord.cid)
                            cachedSite = true
                            this.log('Successfully cached site with new settings', { did, rkey })
                        } finally {
                            unmarkSiteAsBeingCached(did, rkey)
                        }
                    }
                }
            } catch (err) {
                this.log('Failed to cache site after settings change', {
                    did,
                    rkey,
                    error: errorMessage(err)
                })
            }

            if (cachedSite) {
                this.log('Successfully processed settings change (new cache)', { did, rkey })
            }
            return
        }

        // Site is already cached, just update the settings in metadata
        try {
            const { fetchSiteSettings, updateCacheMetadataSettings } = await import('./utils')
            const settings = await fetchSiteSettings(did, rkey)
            await updateCacheMetadataSettings(did, rkey, settings)
            this.log('Updated cached settings', { did, rkey, hasSettings: !!settings })
            this.log('Successfully processed settings change', { did, rkey })
        } catch (err) {
            this.log('Failed to update cached settings', {
                did,
                rkey,
                error: errorMessage(err)
            })
        }
    }

    /**
     * Recursively removes the on-disk cache directory of a site.
     * A missing directory or a removal failure is logged, not thrown.
     */
    private deleteCache(did: string, site: string): void {
        const cacheDir = `${CACHE_DIR}/${did}/${site}`

        if (!existsSync(cacheDir)) {
            this.log('Cache directory does not exist, nothing to delete', {
                did,
                site
            })
            return
        }

        try {
            rmSync(cacheDir, { recursive: true, force: true })
            this.log('Cache deleted', { did, site, path: cacheDir })
        } catch (err) {
            this.log('Failed to delete cache', {
                did,
                site,
                path: cacheDir,
                error: errorMessage(err)
            })
        }
    }

    /**
     * Reports worker health.
     *
     * `connected` only says that a firehose instance is currently held (it is
     * not a socket-level check). `healthy` additionally requires an event
     * within the last five minutes.
     */
    getHealth() {
        const isConnected = this.firehose !== null
        const timeSinceLastEvent = Date.now() - this.lastEventTime

        return {
            connected: isConnected,
            lastEventTime: this.lastEventTime,
            timeSinceLastEvent,
            healthy: isConnected && timeSinceLastEvent < HEALTHY_EVENT_WINDOW_MS
        }
    }
}