Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol. wisp.place
at main 12 kB view raw
import { existsSync, rmSync } from 'fs'
import {
    getPdsForDid,
    downloadAndCacheSite,
    fetchSiteRecord
} from './utils'
import { upsertSite, tryAcquireLock, releaseLock } from './db'
import { safeFetch } from '@wisp/safe-fetch'
// import { isRecord, validateRecord } from '@wisp/lexicons/types/place/wisp/fs'
import { isRecord } from '@wisp/lexicons/types/place/wisp/fs'
import { Firehose } from '@atproto/sync'
import { IdResolver } from '@atproto/identity'
import { invalidateSiteCache, markSiteAsBeingCached, unmarkSiteAsBeingCached } from './cache'
import { clearRedirectRulesCache } from './site-cache'

const CACHE_DIR = './cache/sites'

/**
 * Listens to the AT Protocol firehose for `place.wisp.fs` and
 * `place.wisp.settings` records and keeps the local site cache under
 * `CACHE_DIR` (plus the in-memory caches and the site database row) in sync
 * with what the owning PDS reports.
 *
 * Lifecycle: the constructor starts an hourly IdResolver reset timer,
 * `start()` opens the firehose subscription, and `stop()` tears both down.
 * Once `stop()` has run, `connect()` becomes a no-op for this instance.
 */
export class FirehoseWorker {
    /** Period of the IdResolver reset timer (one hour). */
    private static readonly ID_RESOLVER_RESET_INTERVAL_MS = 60 * 60 * 1000

    /** `getHealth()` reports unhealthy once no event arrived for this long (5 minutes). */
    private static readonly MAX_EVENT_GAP_MS = 300000

    private firehose: Firehose | null = null
    private idResolver: IdResolver
    private isShuttingDown = false
    private lastEventTime = Date.now()
    private cacheCleanupInterval: NodeJS.Timeout | null = null

    /**
     * @param logger Optional sink for log lines; `console.log` is used when omitted.
     */
    constructor(
        private logger?: (msg: string, data?: Record<string, unknown>) => void
    ) {
        this.idResolver = new IdResolver()
        this.startCacheCleanup()
    }

    /** Reduces any thrown value to a loggable string. */
    private static errorMessage(err: unknown): string {
        return err instanceof Error ? err.message : String(err)
    }

    /** Emits one log line prefixed with `[FirehoseWorker]`; `data` defaults to `{}`. */
    private log(msg: string, data?: Record<string, unknown>) {
        const sink = this.logger ?? console.log
        sink(`[FirehoseWorker] ${msg}`, data ?? {})
    }

    /**
     * Starts the hourly timer that swaps in a fresh IdResolver. According to
     * the original author this is meant to stop the resolver's internal cache
     * from growing without bound.
     *
     * NOTE(review): the Firehose built in `connect()` was handed the resolver
     * instance that existed at that time, and `connect()` is only invoked from
     * `start()`. Replacing `this.idResolver` here therefore does not reach an
     * already-running Firehose - confirm this really frees the memory it is
     * intended to free.
     */
    private startCacheCleanup() {
        this.cacheCleanupInterval = setInterval(() => {
            if (this.isShuttingDown) return

            this.log('Clearing IdResolver cache to prevent memory leak')

            // A brand-new resolver starts with an empty cache.
            this.idResolver = new IdResolver()

            this.log('IdResolver cache cleared')
        }, FirehoseWorker.ID_RESOLVER_RESET_INTERVAL_MS)
    }

    /** Opens the firehose subscription. */
    start() {
        this.log('Starting firehose worker')
        this.connect()
    }

    /**
     * Flags the worker as shutting down, cancels the IdResolver reset timer
     * and destroys the firehose subscription if one exists.
     */
    stop() {
        this.log('Stopping firehose worker')
        this.isShuttingDown = true

        if (this.cacheCleanupInterval) {
            clearInterval(this.cacheCleanupInterval)
            this.cacheCleanupInterval = null
        }

        if (this.firehose) {
            this.firehose.destroy()
            this.firehose = null
        }
    }

    /**
     * Creates and starts the Firehose against `wss://bsky.network`, filtered
     * to the two wisp collections. Does nothing once shutdown has begun.
     */
    private connect() {
        if (this.isShuttingDown) return

        this.log('Connecting to AT Protocol firehose')

        this.firehose = new Firehose({
            idResolver: this.idResolver,
            service: 'wss://bsky.network',
            filterCollections: ['place.wisp.fs', 'place.wisp.settings'],
            handleEvent: (evt: any) => this.handleEvent(evt),
            onError: (err: unknown) => {
                this.log('Firehose error', {
                    error: FirehoseWorker.errorMessage(err),
                    stack: err instanceof Error ? err.stack : undefined,
                    fullError: err
                })
                console.error('Full firehose error:', err)
            }
        })

        this.firehose.start()
        this.log('Firehose started')
    }

    /**
     * Runs `task` and converts any failure into a single log line, so one bad
     * event never propagates out of the firehose callback.
     *
     * @param failureMsg   Log message used when `task` throws or rejects.
     * @param context      Fields copied into the failure log; `error` is appended last.
     * @param task         The work to run.
     * @param dumpRawError When true, also prints the raw error via `console.error`.
     */
    private async runGuarded(
        failureMsg: string,
        context: Record<string, unknown>,
        task: () => Promise<void>,
        dumpRawError = false
    ): Promise<void> {
        try {
            await task()
        } catch (err) {
            if (dumpRawError) {
                console.error('Full error details:', err);
            }
            this.log(failureMsg, {
                ...context,
                error: FirehoseWorker.errorMessage(err)
            })
        }
    }

    /**
     * Routes one firehose event to the matching handler and records the time
     * it arrived (used by `getHealth()`).
     *
     * - create/update of a `place.wisp.fs` record that passes `isRecord` -> `handleCreateOrUpdate`
     * - create/update of `place.wisp.settings` -> `handleSettingsChange`
     * - delete of `place.wisp.fs` -> `handleDelete`
     * - delete of `place.wisp.settings` -> `handleSettingsChange`
     *
     * Any other event is ignored. Handler failures are logged, never rethrown.
     */
    private async handleEvent(evt: any): Promise<void> {
        this.lastEventTime = Date.now()

        // Watch for write events
        if (evt.event === 'create' || evt.event === 'update') {
            const record = evt.record
            const context = { did: evt.did, event: evt.event, rkey: evt.rkey }

            // If the write is a valid place.wisp.fs record
            if (
                evt.collection === 'place.wisp.fs' &&
                isRecord(record)
                // && validateRecord(record).success
            ) {
                this.log('Received place.wisp.fs event', context)
                await this.runGuarded(
                    'Error handling event',
                    context,
                    () =>
                        this.handleCreateOrUpdate(
                            evt.did,
                            evt.rkey,
                            record,
                            evt.cid?.toString()
                        ),
                    true
                )
                return
            }

            // Handle settings changes
            if (evt.collection === 'place.wisp.settings') {
                this.log('Received place.wisp.settings event', context)
                await this.runGuarded(
                    'Error handling settings change',
                    context,
                    () => this.handleSettingsChange(evt.did, evt.rkey)
                )
            }
            return
        }

        if (evt.event !== 'delete') return

        const context = { did: evt.did, rkey: evt.rkey }

        if (evt.collection === 'place.wisp.fs') {
            this.log('Received delete event', context)
            await this.runGuarded('Error handling delete', context, () =>
                this.handleDelete(evt.did, evt.rkey)
            )
        } else if (evt.collection === 'place.wisp.settings') {
            this.log('Received settings delete event', context)
            await this.runGuarded(
                'Error handling settings delete',
                context,
                () => this.handleSettingsChange(evt.did, evt.rkey)
            )
        }
    }

    /**
     * Caches a created or updated site locally and records it in the database.
     *
     * Steps: resolve the PDS for `did`; re-fetch the record via
     * `fetchSiteRecord` and bail out if it is missing or its CID differs from
     * `eventCid`; invalidate in-memory caches; download and cache the site;
     * clear the redirect-rules cache; then, if the `db:upsert:<did>:<site>`
     * lock can be acquired, upsert the site row. The "being cached" mark and
     * the lock are released in `finally` blocks.
     *
     * @param did      Repository DID that owns the record.
     * @param site     Record key of the `place.wisp.fs` record.
     * @param record   The record from the event; already checked with `isRecord` by the caller.
     * @param eventCid CID carried by the event, if any. When present it must equal the CID
     *                 returned by `fetchSiteRecord`.
     */
    private async handleCreateOrUpdate(
        did: string,
        site: string,
        record: any,
        eventCid?: string
    ) {
        this.log('Processing create/update', { did, site })

        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        this.log('Resolved PDS', { did, pdsEndpoint })

        // Verify record exists on PDS and fetch its CID
        this.log('Verifying record on PDS', { did, site })
        let verifiedCid: string
        try {
            const result = await fetchSiteRecord(did, site)

            if (!result) {
                this.log('Record not found on PDS, skipping cache', {
                    did,
                    site
                })
                return
            }

            verifiedCid = result.cid

            // Verify event CID matches PDS CID (prevent cache poisoning)
            if (eventCid && eventCid !== verifiedCid) {
                this.log('CID mismatch detected - potential spoofed event', {
                    did,
                    site,
                    eventCid,
                    verifiedCid
                })
                return
            }

            this.log('Record verified on PDS', { did, site, cid: verifiedCid })
        } catch (err) {
            this.log('Failed to verify record on PDS', {
                did,
                site,
                error: FirehoseWorker.errorMessage(err)
            })
            return
        }

        // Invalidate in-memory caches before updating
        invalidateSiteCache(did, site)

        // Mark site as being cached to prevent serving stale content during update
        markSiteAsBeingCached(did, site)

        try {
            // Cache the record with verified CID (uses atomic swap internally)
            // All instances cache locally for edge serving
            await downloadAndCacheSite(
                did,
                site,
                record,
                pdsEndpoint,
                verifiedCid
            )

            // Clear redirect rules cache since the site was updated
            clearRedirectRulesCache(did, site)

            // Acquire distributed lock only for database write to prevent duplicate writes
            // Note: upsertSite will check cache-only mode internally and skip if needed
            const lockKey = `db:upsert:${did}:${site}`
            const lockAcquired = await tryAcquireLock(lockKey)

            if (!lockAcquired) {
                this.log('Another instance is writing to DB, skipping upsert', {
                    did,
                    site
                })
                this.log('Successfully processed create/update (cached locally)', {
                    did,
                    site
                })
                return
            }

            try {
                // Upsert site to database (only one instance does this)
                // In cache-only mode, this will be a no-op
                await upsertSite(did, site, record.site)
                this.log(
                    'Successfully processed create/update (cached + DB updated)',
                    { did, site }
                )
            } finally {
                // Always release lock, even if DB write fails
                await releaseLock(lockKey)
            }
        } finally {
            // Always unmark, even if caching fails
            unmarkSiteAsBeingCached(did, site)
        }
    }

    /**
     * Removes a deleted site from the in-memory and on-disk caches.
     *
     * The record is first requested from the PDS via
     * `com.atproto.repo.getRecord`; if the response is `ok` the record still
     * exists and nothing is deleted.
     *
     * NOTE(review): when that verification request itself throws, the error is
     * logged and the cache is still deleted - confirm this best-effort
     * behaviour is intended.
     *
     * @param did  Repository DID that owned the record.
     * @param site Record key of the deleted `place.wisp.fs` record.
     */
    private async handleDelete(did: string, site: string) {
        this.log('Processing delete', { did, site })

        // All instances should delete their local cache (no lock needed)
        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        // Verify record is actually deleted from PDS
        try {
            const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
            const recordRes = await safeFetch(recordUrl)

            if (recordRes.ok) {
                this.log('Record still exists on PDS, not deleting cache', {
                    did,
                    site
                })
                return
            }

            this.log('Verified record is deleted from PDS', {
                did,
                site,
                status: recordRes.status
            })
        } catch (err) {
            this.log('Error verifying deletion on PDS', {
                did,
                site,
                error: FirehoseWorker.errorMessage(err)
            })
        }

        // Invalidate in-memory caches
        invalidateSiteCache(did, site)

        // Delete disk cache
        this.deleteCache(did, site)

        this.log('Successfully processed delete', { did, site })
    }

    /**
     * Reacts to a created, updated or deleted `place.wisp.settings` record.
     *
     * In-memory caches for the site are always invalidated. If the site has no
     * cache directory yet, a full cache is attempted (only when the fs record
     * and the PDS can both be resolved). Otherwise only the settings stored in
     * the cache metadata are refreshed. Failures are logged, not thrown.
     *
     * @param did  Repository DID that owns the settings record.
     * @param rkey Record key; also used as the site name for the cache path.
     */
    private async handleSettingsChange(did: string, rkey: string) {
        this.log('Processing settings change', { did, rkey })

        // Invalidate in-memory caches (includes metadata which stores settings)
        invalidateSiteCache(did, rkey)

        // Check if site is already cached
        const cacheDir = `${CACHE_DIR}/${did}/${rkey}`
        const isCached = existsSync(cacheDir)

        if (!isCached) {
            this.log('Site not cached yet, checking if fs record exists', { did, rkey })

            // If site exists on PDS, cache it (which will include the new settings)
            try {
                const siteRecord = await fetchSiteRecord(did, rkey)

                if (siteRecord) {
                    this.log('Site record found, triggering full cache with settings', { did, rkey })
                    const pdsEndpoint = await getPdsForDid(did)

                    if (pdsEndpoint) {
                        // Mark as being cached
                        markSiteAsBeingCached(did, rkey)

                        try {
                            await downloadAndCacheSite(did, rkey, siteRecord.record, pdsEndpoint, siteRecord.cid)
                            this.log('Successfully cached site with new settings', { did, rkey })
                        } finally {
                            unmarkSiteAsBeingCached(did, rkey)
                        }
                    } else {
                        this.log('Could not resolve PDS for DID', { did })
                    }
                } else {
                    this.log('No fs record found for site, skipping cache', { did, rkey })
                }
            } catch (err) {
                this.log('Failed to cache site after settings change', {
                    did,
                    rkey,
                    error: FirehoseWorker.errorMessage(err)
                })
            }

            // Logged on every path through this branch, including the failure paths above.
            this.log('Successfully processed settings change (new cache)', { did, rkey })
            return
        }

        // Site is already cached, just update the settings in metadata
        try {
            const { fetchSiteSettings, updateCacheMetadataSettings } = await import('./utils')
            const settings = await fetchSiteSettings(did, rkey)
            await updateCacheMetadataSettings(did, rkey, settings)
            this.log('Updated cached settings', { did, rkey, hasSettings: !!settings })
        } catch (err) {
            this.log('Failed to update cached settings', {
                did,
                rkey,
                error: FirehoseWorker.errorMessage(err)
            })
        }

        this.log('Successfully processed settings change', { did, rkey })
    }

    /**
     * Recursively removes the on-disk cache directory for a site, if it
     * exists. Removal errors are logged and swallowed.
     */
    private deleteCache(did: string, site: string) {
        const cacheDir = `${CACHE_DIR}/${did}/${site}`

        if (!existsSync(cacheDir)) {
            this.log('Cache directory does not exist, nothing to delete', {
                did,
                site
            })
            return
        }

        try {
            rmSync(cacheDir, { recursive: true, force: true })
            this.log('Cache deleted', { did, site, path: cacheDir })
        } catch (err) {
            this.log('Failed to delete cache', {
                did,
                site,
                path: cacheDir,
                error: FirehoseWorker.errorMessage(err)
            })
        }
    }

    /**
     * Reports worker health.
     *
     * `connected` only reflects that a Firehose instance currently exists on
     * this worker. `healthy` additionally requires that an event was handled
     * within the last 5 minutes.
     *
     * @returns Connection flag, timestamp of the last event, milliseconds
     *          elapsed since then, and the combined `healthy` verdict.
     */
    getHealth() {
        const isConnected = this.firehose !== null
        const timeSinceLastEvent = Date.now() - this.lastEventTime

        return {
            connected: isConnected,
            lastEventTime: this.lastEventTime,
            timeSinceLastEvent,
            healthy:
                isConnected &&
                timeSinceLastEvent < FirehoseWorker.MAX_EVENT_GAP_MS // 5 minutes
        }
    }
}