forked from
nekomimi.pet/wisp.place-monorepo
Monorepo for Wisp.place. A static site hosting service built on top of the AT Protocol.
1import { existsSync, rmSync } from 'fs'
2import {
3 getPdsForDid,
4 downloadAndCacheSite,
5 extractBlobCid,
6 fetchSiteRecord
7} from './utils'
8import { upsertSite, tryAcquireLock, releaseLock } from './db'
9import { safeFetch } from './safe-fetch'
10import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
11import { Firehose } from '@atproto/sync'
12import { IdResolver } from '@atproto/identity'
13
14const CACHE_DIR = './cache/sites'
15
16export class FirehoseWorker {
17 private firehose: Firehose | null = null
18 private idResolver: IdResolver
19 private isShuttingDown = false
20 private lastEventTime = Date.now()
21
22 constructor(
23 private logger?: (msg: string, data?: Record<string, unknown>) => void
24 ) {
25 this.idResolver = new IdResolver()
26 }
27
28 private log(msg: string, data?: Record<string, unknown>) {
29 const log = this.logger || console.log
30 log(`[FirehoseWorker] ${msg}`, data || {})
31 }
32
33 start() {
34 this.log('Starting firehose worker')
35 this.connect()
36 }
37
38 stop() {
39 this.log('Stopping firehose worker')
40 this.isShuttingDown = true
41
42 if (this.firehose) {
43 this.firehose.destroy()
44 this.firehose = null
45 }
46 }
47
48 private connect() {
49 if (this.isShuttingDown) return
50
51 this.log('Connecting to AT Protocol firehose')
52
53 this.firehose = new Firehose({
54 idResolver: this.idResolver,
55 service: 'wss://bsky.network',
56 filterCollections: ['place.wisp.fs'],
57 handleEvent: async (evt: any) => {
58 this.lastEventTime = Date.now()
59
60 // Watch for write events
61 if (evt.event === 'create' || evt.event === 'update') {
62 const record = evt.record
63
64 // If the write is a valid place.wisp.fs record
65 if (
66 evt.collection === 'place.wisp.fs' &&
67 isRecord(record) &&
68 validateRecord(record).success
69 ) {
70 this.log('Received place.wisp.fs event', {
71 did: evt.did,
72 event: evt.event,
73 rkey: evt.rkey
74 })
75
76 try {
77 await this.handleCreateOrUpdate(
78 evt.did,
79 evt.rkey,
80 record,
81 evt.cid?.toString()
82 )
83 } catch (err) {
84 this.log('Error handling event', {
85 did: evt.did,
86 event: evt.event,
87 rkey: evt.rkey,
88 error:
89 err instanceof Error
90 ? err.message
91 : String(err)
92 })
93 }
94 }
95 } else if (
96 evt.event === 'delete' &&
97 evt.collection === 'place.wisp.fs'
98 ) {
99 this.log('Received delete event', {
100 did: evt.did,
101 rkey: evt.rkey
102 })
103
104 try {
105 await this.handleDelete(evt.did, evt.rkey)
106 } catch (err) {
107 this.log('Error handling delete', {
108 did: evt.did,
109 rkey: evt.rkey,
110 error:
111 err instanceof Error ? err.message : String(err)
112 })
113 }
114 }
115 },
116 onError: (err: any) => {
117 this.log('Firehose error', {
118 error: err instanceof Error ? err.message : String(err),
119 stack: err instanceof Error ? err.stack : undefined,
120 fullError: err
121 })
122 console.error('Full firehose error:', err)
123 }
124 })
125
126 this.firehose.start()
127 this.log('Firehose started')
128 }
129
130 private async handleCreateOrUpdate(
131 did: string,
132 site: string,
133 record: any,
134 eventCid?: string
135 ) {
136 this.log('Processing create/update', { did, site })
137
138 // Record is already validated in handleEvent
139 const fsRecord = record
140
141 const pdsEndpoint = await getPdsForDid(did)
142 if (!pdsEndpoint) {
143 this.log('Could not resolve PDS for DID', { did })
144 return
145 }
146
147 this.log('Resolved PDS', { did, pdsEndpoint })
148
149 // Verify record exists on PDS and fetch its CID
150 let verifiedCid: string
151 try {
152 const result = await fetchSiteRecord(did, site)
153
154 if (!result) {
155 this.log('Record not found on PDS, skipping cache', {
156 did,
157 site
158 })
159 return
160 }
161
162 verifiedCid = result.cid
163
164 // Verify event CID matches PDS CID (prevent cache poisoning)
165 if (eventCid && eventCid !== verifiedCid) {
166 this.log('CID mismatch detected - potential spoofed event', {
167 did,
168 site,
169 eventCid,
170 verifiedCid
171 })
172 return
173 }
174
175 this.log('Record verified on PDS', { did, site, cid: verifiedCid })
176 } catch (err) {
177 this.log('Failed to verify record on PDS', {
178 did,
179 site,
180 error: err instanceof Error ? err.message : String(err)
181 })
182 return
183 }
184
185 // Cache the record with verified CID (uses atomic swap internally)
186 // All instances cache locally for edge serving
187 await downloadAndCacheSite(
188 did,
189 site,
190 fsRecord,
191 pdsEndpoint,
192 verifiedCid
193 )
194
195 // Acquire distributed lock only for database write to prevent duplicate writes
196 const lockKey = `db:upsert:${did}:${site}`
197 const lockAcquired = await tryAcquireLock(lockKey)
198
199 if (!lockAcquired) {
200 this.log('Another instance is writing to DB, skipping upsert', {
201 did,
202 site
203 })
204 this.log('Successfully processed create/update (cached locally)', {
205 did,
206 site
207 })
208 return
209 }
210
211 try {
212 // Upsert site to database (only one instance does this)
213 await upsertSite(did, site, fsRecord.site)
214 this.log(
215 'Successfully processed create/update (cached + DB updated)',
216 { did, site }
217 )
218 } finally {
219 // Always release lock, even if DB write fails
220 await releaseLock(lockKey)
221 }
222 }
223
224 private async handleDelete(did: string, site: string) {
225 this.log('Processing delete', { did, site })
226
227 // All instances should delete their local cache (no lock needed)
228 const pdsEndpoint = await getPdsForDid(did)
229 if (!pdsEndpoint) {
230 this.log('Could not resolve PDS for DID', { did })
231 return
232 }
233
234 // Verify record is actually deleted from PDS
235 try {
236 const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
237 const recordRes = await safeFetch(recordUrl)
238
239 if (recordRes.ok) {
240 this.log('Record still exists on PDS, not deleting cache', {
241 did,
242 site
243 })
244 return
245 }
246
247 this.log('Verified record is deleted from PDS', {
248 did,
249 site,
250 status: recordRes.status
251 })
252 } catch (err) {
253 this.log('Error verifying deletion on PDS', {
254 did,
255 site,
256 error: err instanceof Error ? err.message : String(err)
257 })
258 }
259
260 // Delete cache
261 this.deleteCache(did, site)
262
263 this.log('Successfully processed delete', { did, site })
264 }
265
266 private deleteCache(did: string, site: string) {
267 const cacheDir = `${CACHE_DIR}/${did}/${site}`
268
269 if (!existsSync(cacheDir)) {
270 this.log('Cache directory does not exist, nothing to delete', {
271 did,
272 site
273 })
274 return
275 }
276
277 try {
278 rmSync(cacheDir, { recursive: true, force: true })
279 this.log('Cache deleted', { did, site, path: cacheDir })
280 } catch (err) {
281 this.log('Failed to delete cache', {
282 did,
283 site,
284 path: cacheDir,
285 error: err instanceof Error ? err.message : String(err)
286 })
287 }
288 }
289
290 getHealth() {
291 const isConnected = this.firehose !== null
292 const timeSinceLastEvent = Date.now() - this.lastEventTime
293
294 return {
295 connected: isConnected,
296 lastEventTime: this.lastEventTime,
297 timeSinceLastEvent,
298 healthy: isConnected && timeSinceLastEvent < 300000 // 5 minutes
299 }
300 }
301}