Monorepo for Wisp.place, a static site hosting service built on top of the AT Protocol.
at main 7.1 kB view raw
import { existsSync, rmSync } from 'fs'
import {
    getPdsForDid,
    downloadAndCacheSite,
    extractBlobCid,
    fetchSiteRecord
} from './utils'
import { upsertSite, tryAcquireLock, releaseLock } from './db'
import { safeFetch } from './safe-fetch'
import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
import { Firehose } from '@atproto/sync'
import { IdResolver } from '@atproto/identity'

/** Root directory under which this module deletes caches, laid out as `<did>/<rkey>`. */
const CACHE_DIR = './cache/sites'

/** Collection whose records this worker reacts to. */
const FS_COLLECTION = 'place.wisp.fs'

/** `getHealth()` reports unhealthy when no event arrived within this window (5 minutes). */
const MAX_HEALTHY_EVENT_GAP_MS = 5 * 60 * 1000

/** Characters that must never appear in a single path segment. */
const UNSAFE_SEGMENT_CHARS = /[\/\\\0]/

/** Renders an unknown thrown value as a loggable string. */
function errorMessage(err: unknown): string {
    return err instanceof Error ? err.message : String(err)
}

/**
 * Returns true when `segment` can be used as exactly one directory name,
 * i.e. it cannot move the resulting path outside of (or above) its parent.
 */
function isSafePathSegment(segment: string): boolean {
    return (
        segment !== '' &&
        segment !== '.' &&
        segment !== '..' &&
        !UNSAFE_SEGMENT_CHARS.test(segment)
    )
}

/**
 * Subscribes to the AT Protocol firehose and keeps the local site cache (and
 * the site database row) in sync with `place.wisp.fs` records.
 *
 * - create/update: the record is re-fetched and its CID compared with the
 *   event's CID before the site is downloaded and cached.
 * - delete: the local cache directory for the site is removed.
 */
export class FirehoseWorker {
    private firehose: Firehose | null = null
    private readonly idResolver: IdResolver
    private isShuttingDown = false
    private lastEventTime = Date.now()

    /**
     * @param logger Optional log sink; `console.log` is used when omitted.
     */
    constructor(
        private logger?: (msg: string, data?: Record<string, unknown>) => void
    ) {
        this.idResolver = new IdResolver()
    }

    /** Writes a prefixed log line through the injected logger or `console.log`. */
    private log(msg: string, data?: Record<string, unknown>): void {
        const log = this.logger ?? console.log
        log(`[FirehoseWorker] ${msg}`, data ?? {})
    }

    /**
     * Starts the firehose subscription.
     *
     * Calling this while a subscription already exists is a no-op, so a second
     * call cannot orphan the first subscription. Calling it after `stop()`
     * starts a fresh subscription.
     */
    start(): void {
        if (this.firehose) {
            this.log('Firehose worker already running, ignoring start')
            return
        }

        this.log('Starting firehose worker')
        // A previous stop() leaves the flag set; clear it so connect() proceeds.
        this.isShuttingDown = false
        this.connect()
    }

    /**
     * Stops the firehose subscription. Returns immediately; teardown of the
     * underlying subscription completes in the background and any failure is
     * logged rather than surfaced as an unhandled rejection.
     */
    stop(): void {
        this.log('Stopping firehose worker')
        this.isShuttingDown = true

        const firehose = this.firehose
        if (!firehose) return

        this.firehose = null
        void firehose.destroy().catch((err: unknown) => {
            this.log('Error while stopping firehose', {
                error: errorMessage(err)
            })
        })
    }

    /** Creates the firehose subscription and starts consuming events. */
    private connect(): void {
        // Guard kept for any caller that might run during shutdown.
        if (this.isShuttingDown) return

        this.log('Connecting to AT Protocol firehose')

        const firehose = new Firehose({
            idResolver: this.idResolver,
            service: 'wss://bsky.network',
            filterCollections: [FS_COLLECTION],
            // The event type is not imported in this file, so it stays untyped here.
            handleEvent: (evt: any) => this.handleEvent(evt),
            onError: (err: unknown) => {
                this.log('Firehose error', {
                    error: errorMessage(err),
                    stack: err instanceof Error ? err.stack : undefined,
                    fullError: err
                })
                console.error('Full firehose error:', err)
            }
        })
        this.firehose = firehose

        // start() is long-running, so it is deliberately not awaited; the catch
        // keeps an unexpected rejection from becoming an unhandled rejection.
        void firehose.start().catch((err: unknown) => {
            this.log('Firehose stopped unexpectedly', {
                error: errorMessage(err)
            })
        })
        this.log('Firehose started')
    }

    /**
     * Dispatches one firehose event. Errors raised by the handlers are logged
     * and swallowed so that one bad event does not propagate to the
     * subscription.
     */
    private async handleEvent(evt: any): Promise<void> {
        this.lastEventTime = Date.now()

        // Watch for write events
        if (evt.event === 'create' || evt.event === 'update') {
            const record = evt.record

            // Only act on writes that carry a valid place.wisp.fs record
            if (
                evt.collection !== FS_COLLECTION ||
                !isRecord(record) ||
                !validateRecord(record).success
            ) {
                return
            }

            this.log('Received place.wisp.fs event', {
                did: evt.did,
                event: evt.event,
                rkey: evt.rkey
            })

            try {
                await this.handleCreateOrUpdate(
                    evt.did,
                    evt.rkey,
                    record,
                    evt.cid?.toString()
                )
            } catch (err) {
                this.log('Error handling event', {
                    did: evt.did,
                    event: evt.event,
                    rkey: evt.rkey,
                    error: errorMessage(err)
                })
            }
            return
        }

        if (evt.event === 'delete' && evt.collection === FS_COLLECTION) {
            this.log('Received delete event', {
                did: evt.did,
                rkey: evt.rkey
            })

            try {
                await this.handleDelete(evt.did, evt.rkey)
            } catch (err) {
                this.log('Error handling delete', {
                    did: evt.did,
                    rkey: evt.rkey,
                    error: errorMessage(err)
                })
            }
        }
    }

    /**
     * Caches a created/updated site locally and records it in the database.
     *
     * @param did      Repository DID the record belongs to.
     * @param site     Record key of the site record.
     * @param record   The record from the event (validated by the caller).
     * @param eventCid CID reported by the event, if any; when present it must
     *                 match the CID returned by `fetchSiteRecord`.
     */
    private async handleCreateOrUpdate(
        did: string,
        site: string,
        record: any,
        eventCid?: string
    ): Promise<void> {
        this.log('Processing create/update', { did, site })

        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        this.log('Resolved PDS', { did, pdsEndpoint })

        // Verify record exists on PDS and fetch its CID
        let verifiedCid: string
        try {
            const result = await fetchSiteRecord(did, site)

            if (!result) {
                this.log('Record not found on PDS, skipping cache', {
                    did,
                    site
                })
                return
            }

            verifiedCid = result.cid

            // Verify event CID matches PDS CID (prevent cache poisoning)
            if (eventCid && eventCid !== verifiedCid) {
                this.log('CID mismatch detected - potential spoofed event', {
                    did,
                    site,
                    eventCid,
                    verifiedCid
                })
                return
            }

            this.log('Record verified on PDS', { did, site, cid: verifiedCid })
        } catch (err) {
            this.log('Failed to verify record on PDS', {
                did,
                site,
                error: errorMessage(err)
            })
            return
        }

        // Every instance caches locally; this happens before (and regardless
        // of) the database lock below.
        await downloadAndCacheSite(did, site, record, pdsEndpoint, verifiedCid)

        // The lock guards only the database write so that a single instance
        // performs the upsert.
        const lockKey = `db:upsert:${did}:${site}`
        const lockAcquired = await tryAcquireLock(lockKey)

        if (!lockAcquired) {
            this.log('Another instance is writing to DB, skipping upsert', {
                did,
                site
            })
            this.log('Successfully processed create/update (cached locally)', {
                did,
                site
            })
            return
        }

        try {
            await upsertSite(did, site, record.site)
            this.log(
                'Successfully processed create/update (cached + DB updated)',
                { did, site }
            )
        } finally {
            // Always release lock, even if DB write fails
            await releaseLock(lockKey)
        }
    }

    /**
     * Removes the local cache of a deleted site after checking with the PDS.
     *
     * @param did  Repository DID the record belonged to.
     * @param site Record key of the deleted site record.
     */
    private async handleDelete(did: string, site: string): Promise<void> {
        this.log('Processing delete', { did, site })

        // All instances should delete their local cache (no lock needed)
        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        // Verify record is actually deleted from PDS.
        // NOTE(review): any non-OK response is treated as "record gone", and a
        // thrown error is only logged - the cache is still deleted below.
        // Confirm that this best-effort behaviour is intended.
        try {
            const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=${FS_COLLECTION}&rkey=${encodeURIComponent(site)}`
            const recordRes = await safeFetch(recordUrl)

            if (recordRes.ok) {
                this.log('Record still exists on PDS, not deleting cache', {
                    did,
                    site
                })
                return
            }

            this.log('Verified record is deleted from PDS', {
                did,
                site,
                status: recordRes.status
            })
        } catch (err) {
            this.log('Error verifying deletion on PDS', {
                did,
                site,
                error: errorMessage(err)
            })
        }

        this.deleteCache(did, site)

        this.log('Successfully processed delete', { did, site })
    }

    /**
     * Recursively removes `<CACHE_DIR>/<did>/<site>` if it exists. Failures are
     * logged, never thrown.
     */
    private deleteCache(did: string, site: string): void {
        // Both values originate from network events. Because the removal is
        // recursive, a segment such as '..' or '' would delete a parent
        // directory, so anything that is not a plain directory name is refused.
        if (!isSafePathSegment(did) || !isSafePathSegment(site)) {
            this.log('Refusing to delete cache for unsafe path segment', {
                did,
                site
            })
            return
        }

        const cacheDir = `${CACHE_DIR}/${did}/${site}`

        if (!existsSync(cacheDir)) {
            this.log('Cache directory does not exist, nothing to delete', {
                did,
                site
            })
            return
        }

        try {
            rmSync(cacheDir, { recursive: true, force: true })
            this.log('Cache deleted', { did, site, path: cacheDir })
        } catch (err) {
            this.log('Failed to delete cache', {
                did,
                site,
                path: cacheDir,
                error: errorMessage(err)
            })
        }
    }

    /**
     * Reports worker health. `connected` only reflects that a subscription
     * object currently exists; `healthy` additionally requires an event to
     * have been seen within the last 5 minutes.
     */
    getHealth(): {
        connected: boolean
        lastEventTime: number
        timeSinceLastEvent: number
        healthy: boolean
    } {
        const isConnected = this.firehose !== null
        const timeSinceLastEvent = Date.now() - this.lastEventTime

        return {
            connected: isConnected,
            lastEventTime: this.lastEventTime,
            timeSinceLastEvent,
            healthy:
                isConnected && timeSinceLastEvent < MAX_HEALTHY_EVENT_GAP_MS
        }
    }
}