import { existsSync, rmSync } from 'fs'
import { getPdsForDid, downloadAndCacheSite, extractBlobCid, fetchSiteRecord } from './utils'
import { upsertSite, tryAcquireLock, releaseLock } from './db'
import { safeFetch } from './safe-fetch'
import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
import { Firehose } from '@atproto/sync'
import { IdResolver } from '@atproto/identity'

const CACHE_DIR = './cache/sites'

/** The worker reports unhealthy if no matching event has arrived within this window (5 minutes). */
const HEALTH_STALE_MS = 5 * 60 * 1000

/** Structured context attached to a log line. */
type LogData = Record<string, unknown>

/** Normalize an unknown thrown value into a printable message. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err)
}

/**
 * Returns true if `segment` can be appended to a directory path without
 * escaping it: non-empty, not `.` or `..`, and free of path separators and
 * NUL bytes.
 */
function isSafePathSegment(segment: string): boolean {
  return (
    segment.length > 0 &&
    segment !== '.' &&
    segment !== '..' &&
    !/[\\/\0]/.test(segment)
  )
}

/**
 * Subscribes to the AT Protocol firehose (filtered to `place.wisp.fs`) and
 * mirrors record creates/updates/deletes into the local site cache under
 * `CACHE_DIR`. For creates/updates, it also upserts the site into the database
 * while holding a distributed lock.
 */
export class FirehoseWorker {
  private firehose: Firehose | null = null
  private readonly idResolver: IdResolver
  private isShuttingDown = false
  private lastEventTime = Date.now()

  /**
   * @param logger Optional log sink. Defaults to `console.log`.
   */
  constructor(private logger?: (msg: string, data?: LogData) => void) {
    this.idResolver = new IdResolver()
  }

  /** Log with a `[FirehoseWorker]` prefix, always passing a data object. */
  private log(msg: string, data?: LogData) {
    const log = this.logger ?? console.log
    log(`[FirehoseWorker] ${msg}`, data ?? {})
  }

  /** Start consuming the firehose. No-op if already running or after `stop()`. */
  start() {
    this.log('Starting firehose worker')
    this.connect()
  }

  /** Tear down the firehose subscription. The worker cannot be restarted afterwards. */
  stop() {
    this.log('Stopping firehose worker')
    this.isShuttingDown = true
    if (this.firehose) {
      this.firehose.destroy()
      this.firehose = null
    }
  }

  /** Open the firehose subscription and route its events to `handleEvent`. */
  private connect() {
    // Never connect after stop(), and never open a second subscription.
    // Overwriting `this.firehose` would leak the first one (stop() could no
    // longer destroy it), and every event would be handled twice.
    if (this.isShuttingDown || this.firehose) return

    this.log('Connecting to AT Protocol firehose')

    this.firehose = new Firehose({
      idResolver: this.idResolver,
      service: 'wss://bsky.network',
      filterCollections: ['place.wisp.fs'],
      // eslint-disable-next-line @typescript-eslint/no-explicit-any -- event shape is defined by @atproto/sync
      handleEvent: (evt: any) => this.handleEvent(evt),
      onError: (err: unknown) => {
        this.log('Firehose error', {
          error: errorMessage(err),
          stack: err instanceof Error ? err.stack : undefined,
          fullError: err
        })
        console.error('Full firehose error:', err)
      }
    })

    this.firehose.start()
    this.log('Firehose started')
  }

  /**
   * Dispatch a single firehose event. Per-event failures are logged rather
   * than thrown, so one bad event does not affect the subscription.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- event shape is defined by @atproto/sync
  private async handleEvent(evt: any) {
    this.lastEventTime = Date.now()

    if (evt.event === 'create' || evt.event === 'update') {
      const record = evt.record

      // Only act on writes that are valid place.wisp.fs records.
      if (
        evt.collection !== 'place.wisp.fs' ||
        !isRecord(record) ||
        !validateRecord(record).success
      ) {
        return
      }

      this.log('Received place.wisp.fs event', {
        did: evt.did,
        event: evt.event,
        rkey: evt.rkey
      })

      try {
        await this.handleCreateOrUpdate(
          evt.did,
          evt.rkey,
          record,
          evt.cid?.toString()
        )
      } catch (err) {
        this.log('Error handling event', {
          did: evt.did,
          event: evt.event,
          rkey: evt.rkey,
          error: errorMessage(err)
        })
      }
    } else if (
      evt.event === 'delete' &&
      evt.collection === 'place.wisp.fs'
    ) {
      this.log('Received delete event', {
        did: evt.did,
        rkey: evt.rkey
      })

      try {
        await this.handleDelete(evt.did, evt.rkey)
      } catch (err) {
        this.log('Error handling delete', {
          did: evt.did,
          rkey: evt.rkey,
          error: errorMessage(err)
        })
      }
    }
  }

  /**
   * Verify a create/update against the author's PDS, cache the site locally,
   * then upsert it into the database if this instance wins the lock.
   *
   * @param did Author DID.
   * @param site Record key (rkey) of the place.wisp.fs record.
   * @param record The already-validated record from the event.
   * @param eventCid CID claimed by the event. If present, it must match the PDS copy.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- validated upstream by validateRecord
  private async handleCreateOrUpdate(
    did: string,
    site: string,
    record: any,
    eventCid?: string
  ) {
    this.log('Processing create/update', { did, site })

    const pdsEndpoint = await getPdsForDid(did)
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did })
      return
    }

    this.log('Resolved PDS', { did, pdsEndpoint })

    // Verify the record exists on the PDS and fetch its CID.
    let verifiedCid: string
    try {
      const result = await fetchSiteRecord(did, site)

      if (!result) {
        this.log('Record not found on PDS, skipping cache', { did, site })
        return
      }

      verifiedCid = result.cid

      // The event CID must match the PDS CID; otherwise a spoofed event could
      // poison the cache.
      if (eventCid && eventCid !== verifiedCid) {
        this.log('CID mismatch detected - potential spoofed event', {
          did,
          site,
          eventCid,
          verifiedCid
        })
        return
      }

      this.log('Record verified on PDS', { did, site, cid: verifiedCid })
    } catch (err) {
      this.log('Failed to verify record on PDS', {
        did,
        site,
        error: errorMessage(err)
      })
      return
    }

    // Every instance caches locally (for edge serving), without taking the lock.
    await downloadAndCacheSite(
      did,
      site,
      record,
      pdsEndpoint,
      verifiedCid
    )

    // Only the database write is guarded, so that a single instance performs it.
    const lockKey = `db:upsert:${did}:${site}`
    const lockAcquired = await tryAcquireLock(lockKey)

    if (!lockAcquired) {
      this.log('Another instance is writing to DB, skipping upsert', {
        did,
        site
      })
      this.log('Successfully processed create/update (cached locally)', {
        did,
        site
      })
      return
    }

    try {
      await upsertSite(did, site, record.site)
      this.log(
        'Successfully processed create/update (cached + DB updated)',
        { did, site }
      )
    } finally {
      // Release the lock even if the DB write throws.
      await releaseLock(lockKey)
    }
  }

  /**
   * Remove the local cache for a deleted site. Every instance does this,
   * without taking a lock.
   */
  private async handleDelete(did: string, site: string) {
    this.log('Processing delete', { did, site })

    const pdsEndpoint = await getPdsForDid(did)
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did })
      return
    }

    // Check that the record is really gone from the PDS before deleting.
    try {
      const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
      const recordRes = await safeFetch(recordUrl)

      if (recordRes.ok) {
        this.log('Record still exists on PDS, not deleting cache', {
          did,
          site
        })
        return
      }

      // NOTE(review): any non-2xx status (including 5xx/429) is treated as
      // "deleted" here. Confirm that failing toward deletion is intended.
      this.log('Verified record is deleted from PDS', {
        did,
        site,
        status: recordRes.status
      })
    } catch (err) {
      // On a verification error, execution falls through and the cache is
      // still deleted.
      this.log('Error verifying deletion on PDS', {
        did,
        site,
        error: errorMessage(err)
      })
    }

    this.deleteCache(did, site)
    this.log('Successfully processed delete', { did, site })
  }

  /** Recursively delete `CACHE_DIR/<did>/<site>`, logging (not throwing) on failure. */
  private deleteCache(did: string, site: string) {
    // `did` and `site` come straight from firehose events and feed a recursive
    // rmSync. Refuse any segment that could resolve outside this site's own
    // directory (e.g. `..` or an empty rkey would wipe a parent directory).
    if (!isSafePathSegment(did) || !isSafePathSegment(site)) {
      this.log('Refusing to delete cache for unsafe path segment', {
        did,
        site
      })
      return
    }

    const cacheDir = `${CACHE_DIR}/${did}/${site}`

    if (!existsSync(cacheDir)) {
      this.log('Cache directory does not exist, nothing to delete', {
        did,
        site
      })
      return
    }

    try {
      rmSync(cacheDir, { recursive: true, force: true })
      this.log('Cache deleted', { did, site, path: cacheDir })
    } catch (err) {
      this.log('Failed to delete cache', {
        did,
        site,
        path: cacheDir,
        error: errorMessage(err)
      })
    }
  }

  /**
   * Health snapshot. The worker is healthy when a subscription object exists
   * and a matching event arrived within `HEALTH_STALE_MS`.
   */
  getHealth() {
    const isConnected = this.firehose !== null
    const timeSinceLastEvent = Date.now() - this.lastEventTime

    return {
      connected: isConnected,
      lastEventTime: this.lastEventTime,
      timeSinceLastEvent,
      healthy: isConnected && timeSinceLastEvent < HEALTH_STALE_MS
    }
  }
}