Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
wisp.place
1import { existsSync, rmSync } from 'fs'
2import {
3 getPdsForDid,
4 downloadAndCacheSite,
5 fetchSiteRecord
6} from './utils'
7import { upsertSite, tryAcquireLock, releaseLock } from './db'
8import { safeFetch } from '@wisp/safe-fetch'
9// import { isRecord, validateRecord } from '@wisp/lexicons/types/place/wisp/fs'
10import { isRecord } from '@wisp/lexicons/types/place/wisp/fs'
11import { Firehose } from '@atproto/sync'
12import { IdResolver } from '@atproto/identity'
13import { invalidateSiteCache, markSiteAsBeingCached, unmarkSiteAsBeingCached } from './cache'
14import { clearRedirectRulesCache } from './site-cache'
15
/** Root directory of the on-disk site cache (a relative path; one `<did>/<site>` subdirectory per site). */
const CACHE_DIR = `./cache/sites`
17
/** How often the IdResolver is replaced to drop whatever state it has accumulated. */
const ID_RESOLVER_RESET_INTERVAL_MS = 60 * 60 * 1000 // 1 hour

/** getHealth() reports unhealthy once no event has been seen for this long. */
const STALE_EVENT_THRESHOLD_MS = 5 * 60 * 1000 // 5 minutes

/** Normalizes an unknown thrown value into a loggable message. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err)
}

/**
 * Returns true when `segment` is safe to use as a single directory name under
 * CACHE_DIR: a non-empty string that is not `.`/`..` and contains no path
 * separator or NUL byte.
 */
function isSafePathSegment(segment: string): boolean {
  return (
    typeof segment === 'string' &&
    segment.length > 0 &&
    segment !== '.' &&
    segment !== '..' &&
    !/[\/\\\0]/.test(segment)
  )
}

/**
 * Subscribes to the AT Protocol firehose and keeps the local site cache in sync
 * with `place.wisp.fs` and `place.wisp.settings` records.
 *
 * Every instance rebuilds its own on-disk cache under CACHE_DIR; only the
 * instance that wins `tryAcquireLock` performs the database upsert.
 */
export class FirehoseWorker {
  private firehose: Firehose | null = null
  private idResolver: IdResolver
  private isShuttingDown = false
  private lastEventTime = Date.now()
  private cacheCleanupInterval: ReturnType<typeof setInterval> | null = null

  /**
   * @param logger Optional log sink; defaults to `console.log`. Every message
   *   is prefixed with `[FirehoseWorker]`.
   */
  constructor(
    private logger?: (msg: string, data?: Record<string, unknown>) => void
  ) {
    this.idResolver = new IdResolver()
    this.startCacheCleanup()
  }

  /** Writes a prefixed log line through the configured logger. */
  private log(msg: string, data?: Record<string, unknown>) {
    const log = this.logger ?? console.log
    log(`[FirehoseWorker] ${msg}`, data ?? {})
  }

  /**
   * Replaces the IdResolver on a fixed interval.
   *
   * Original rationale: the IdResolver has an internal cache that never expires
   * and can cause heap exhaustion, so a fresh instance is created periodically.
   * connect() hands the firehose a getter rather than the instance itself, so
   * the replacement can reach the running subscription.
   * NOTE(review): confirm the resolver is actually caching; `new IdResolver()`
   * is constructed here without any cache options.
   */
  private startCacheCleanup() {
    this.cacheCleanupInterval = setInterval(() => {
      if (this.isShuttingDown) return

      this.log('Clearing IdResolver cache to prevent memory leak')
      this.idResolver = new IdResolver()
      this.log('IdResolver cache cleared')
    }, ID_RESOLVER_RESET_INTERVAL_MS)
  }

  /** Starts the firehose subscription. Calling it while already running is a no-op. */
  start() {
    if (this.firehose) {
      this.log('Firehose worker already started')
      return
    }
    this.log('Starting firehose worker')
    this.connect()
  }

  /**
   * Stops the worker: cancels the resolver-reset timer and tears down the
   * firehose. After stop(), start() will not reconnect (connect() checks
   * `isShuttingDown`).
   */
  stop() {
    this.log('Stopping firehose worker')
    this.isShuttingDown = true

    if (this.cacheCleanupInterval) {
      clearInterval(this.cacheCleanupInterval)
      this.cacheCleanupInterval = null
    }

    const firehose = this.firehose
    this.firehose = null
    if (firehose) {
      // destroy() may be async; log a rejection instead of leaving it unhandled.
      Promise.resolve(firehose.destroy()).catch((err: unknown) => {
        this.log('Error while destroying firehose', { error: describeError(err) })
      })
    }
  }

  /** Creates the Firehose subscription filtered to the wisp collections and starts it. */
  private connect() {
    if (this.isShuttingDown) return

    this.log('Connecting to AT Protocol firehose')

    // Passing `this.idResolver` by value would pin the first instance for the
    // firehose's whole lifetime, making the hourly swap in startCacheCleanup()
    // a no-op. A getter lets the firehose see the current instance.
    // NOTE(review): this helps only if Firehose reads `opts.idResolver` lazily;
    // if it copies the value at construction, behavior is unchanged from before.
    const currentResolver = () => this.idResolver
    const firehose = new Firehose({
      get idResolver() {
        return currentResolver()
      },
      service: 'wss://bsky.network',
      filterCollections: ['place.wisp.fs', 'place.wisp.settings'],
      handleEvent: (evt: any) => this.onFirehoseEvent(evt),
      onError: (err: unknown) => {
        this.log('Firehose error', {
          error: describeError(err),
          stack: err instanceof Error ? err.stack : undefined,
          fullError: err
        })
        console.error('Full firehose error:', err)
      }
    })

    this.firehose = firehose
    // start() drives the subscription and may return a promise; attach a handler
    // so a rejection is logged rather than surfacing as an unhandled rejection.
    Promise.resolve(firehose.start()).catch((err: unknown) => {
      this.log('Firehose stopped with an error', { error: describeError(err) })
    })
    this.log('Firehose started')
  }

  /**
   * Routes one firehose event to the matching handler.
   *
   * Every event refreshes the health timestamp. Only create/update/delete
   * events for `place.wisp.fs` and `place.wisp.settings` trigger work. Handler
   * failures are logged by runHandler and never rethrown.
   * NOTE(review): `evt` is untyped; the event types exported by @atproto/sync
   * could replace `any`.
   */
  private async onFirehoseEvent(evt: any) {
    this.lastEventTime = Date.now()

    const { did, rkey, event, collection } = evt
    const isWrite = event === 'create' || event === 'update'

    if (collection === 'place.wisp.fs') {
      if (isWrite) {
        const record = evt.record
        // Only the isRecord shape check is applied; full lexicon validation
        // (validateRecord) is currently disabled.
        if (!isRecord(record)) {
          this.log('Ignoring invalid place.wisp.fs record', { did, event, rkey })
          return
        }
        this.log('Received place.wisp.fs event', { did, event, rkey })
        await this.runHandler('Error handling event', { did, event, rkey }, () =>
          this.handleCreateOrUpdate(did, rkey, record, evt.cid?.toString())
        )
      } else if (event === 'delete') {
        this.log('Received delete event', { did, rkey })
        await this.runHandler('Error handling delete', { did, rkey }, () =>
          this.handleDelete(did, rkey)
        )
      }
    } else if (collection === 'place.wisp.settings') {
      if (isWrite) {
        this.log('Received place.wisp.settings event', { did, event, rkey })
        await this.runHandler(
          'Error handling settings change',
          { did, event, rkey },
          () => this.handleSettingsChange(did, rkey)
        )
      } else if (event === 'delete') {
        this.log('Received settings delete event', { did, rkey })
        await this.runHandler('Error handling settings delete', { did, rkey }, () =>
          this.handleSettingsChange(did, rkey)
        )
      }
    }
  }

  /**
   * Awaits `task`. On failure it dumps the raw error to stderr and logs
   * `failureMessage` with `context` plus the error message. It never rethrows.
   */
  private async runHandler(
    failureMessage: string,
    context: Record<string, unknown>,
    task: () => Promise<void>
  ): Promise<void> {
    try {
      await task()
    } catch (err) {
      console.error('Full error details:', err)
      this.log(failureMessage, { ...context, error: describeError(err) })
    }
  }

  /**
   * Re-caches a site after a `place.wisp.fs` create/update and upserts its DB row.
   *
   * @param did Repository DID.
   * @param site Record key, used as the site name.
   * @param record Event record (already shape-checked with isRecord).
   * @param eventCid CID reported by the event, if present; it must match the
   *   CID currently on the PDS or the event is dropped.
   */
  private async handleCreateOrUpdate(
    did: string,
    site: string,
    record: any,
    eventCid?: string
  ) {
    this.log('Processing create/update', { did, site })

    const pdsEndpoint = await getPdsForDid(did)
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did })
      return
    }
    this.log('Resolved PDS', { did, pdsEndpoint })

    const verifiedCid = await this.fetchVerifiedCid(did, site, eventCid)
    if (verifiedCid === null) return

    // Invalidate in-memory caches before updating
    invalidateSiteCache(did, site)

    // Mark site as being cached to prevent serving stale content during update
    markSiteAsBeingCached(did, site)

    try {
      // Cache the record with verified CID (uses atomic swap internally).
      // All instances cache locally for edge serving.
      await downloadAndCacheSite(did, site, record, pdsEndpoint, verifiedCid)

      // Redirect rules may have changed with the site contents.
      clearRedirectRulesCache(did, site)

      // Only one instance performs the DB write; upsertSite itself checks
      // cache-only mode and may no-op.
      const lockKey = `db:upsert:${did}:${site}`
      const lockAcquired = await tryAcquireLock(lockKey)

      if (!lockAcquired) {
        this.log('Another instance is writing to DB, skipping upsert', { did, site })
        this.log('Successfully processed create/update (cached locally)', { did, site })
        return
      }

      try {
        await upsertSite(did, site, record.site)
        this.log('Successfully processed create/update (cached + DB updated)', {
          did,
          site
        })
      } finally {
        // Always release the lock, even if the DB write fails.
        await releaseLock(lockKey)
      }
    } finally {
      // Always unmark, even if caching fails.
      unmarkSiteAsBeingCached(did, site)
    }
  }

  /**
   * Fetches the site record from the owner's PDS and returns its CID.
   *
   * Returns `null` (after logging the reason) when the record is missing, the
   * fetch throws, or the PDS CID differs from `eventCid`. The CID comparison
   * guards against cache poisoning from a spoofed or stale event.
   */
  private async fetchVerifiedCid(
    did: string,
    site: string,
    eventCid?: string
  ): Promise<string | null> {
    this.log('Verifying record on PDS', { did, site })
    try {
      const result = await fetchSiteRecord(did, site)

      if (!result) {
        this.log('Record not found on PDS, skipping cache', { did, site })
        return null
      }

      const verifiedCid = result.cid
      if (eventCid && eventCid !== verifiedCid) {
        this.log('CID mismatch detected - potential spoofed event', {
          did,
          site,
          eventCid,
          verifiedCid
        })
        return null
      }

      this.log('Record verified on PDS', { did, site, cid: verifiedCid })
      return verifiedCid
    } catch (err) {
      this.log('Failed to verify record on PDS', {
        did,
        site,
        error: describeError(err)
      })
      return null
    }
  }

  /**
   * Removes a site from the in-memory and on-disk caches after a delete event.
   *
   * Skipped entirely when the PDS cannot be resolved or the record still exists
   * there. Any non-OK getRecord response counts as "deleted". If the check
   * itself throws, the error is logged and the cache is still removed.
   */
  private async handleDelete(did: string, site: string) {
    this.log('Processing delete', { did, site })

    // All instances delete their own local cache (no lock needed).
    const pdsEndpoint = await getPdsForDid(did)
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did })
      return
    }

    try {
      const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
      const recordRes = await safeFetch(recordUrl)

      if (recordRes.ok) {
        this.log('Record still exists on PDS, not deleting cache', { did, site })
        return
      }

      this.log('Verified record is deleted from PDS', {
        did,
        site,
        status: recordRes.status
      })
    } catch (err) {
      this.log('Error verifying deletion on PDS', {
        did,
        site,
        error: describeError(err)
      })
    }

    invalidateSiteCache(did, site)
    this.deleteCache(did, site)

    this.log('Successfully processed delete', { did, site })
  }

  /**
   * Applies a `place.wisp.settings` create/update/delete for site `rkey`.
   *
   * If the site is already on disk, only its cached settings metadata is
   * refreshed. Otherwise a full download is attempted so the new settings are
   * included. Failures are logged, not thrown.
   */
  private async handleSettingsChange(did: string, rkey: string) {
    this.log('Processing settings change', { did, rkey })

    // Invalidate in-memory caches (includes metadata which stores settings)
    invalidateSiteCache(did, rkey)

    const cacheDir = `${CACHE_DIR}/${did}/${rkey}`
    if (!existsSync(cacheDir)) {
      this.log('Site not cached yet, checking if fs record exists', { did, rkey })
      await this.cacheUncachedSite(did, rkey)
      return
    }

    // Site is already cached; just update the settings stored in its metadata.
    try {
      const { fetchSiteSettings, updateCacheMetadataSettings } = await import('./utils')
      const settings = await fetchSiteSettings(did, rkey)
      await updateCacheMetadataSettings(did, rkey, settings)
      this.log('Updated cached settings', { did, rkey, hasSettings: !!settings })
      this.log('Successfully processed settings change', { did, rkey })
    } catch (err) {
      this.log('Failed to update cached settings', {
        did,
        rkey,
        error: describeError(err)
      })
    }
  }

  /**
   * Downloads a site that has no on-disk cache yet, provided its fs record
   * exists on the PDS. Failures are logged, not thrown.
   */
  private async cacheUncachedSite(did: string, rkey: string) {
    try {
      const siteRecord = await fetchSiteRecord(did, rkey)
      if (!siteRecord) {
        this.log('No fs record found for site, skipping cache', { did, rkey })
        return
      }

      this.log('Site record found, triggering full cache with settings', { did, rkey })
      const pdsEndpoint = await getPdsForDid(did)
      if (!pdsEndpoint) {
        this.log('Could not resolve PDS for DID', { did })
        return
      }

      markSiteAsBeingCached(did, rkey)
      try {
        await downloadAndCacheSite(did, rkey, siteRecord.record, pdsEndpoint, siteRecord.cid)
        this.log('Successfully cached site with new settings', { did, rkey })
      } finally {
        unmarkSiteAsBeingCached(did, rkey)
      }

      this.log('Successfully processed settings change (new cache)', { did, rkey })
    } catch (err) {
      this.log('Failed to cache site after settings change', {
        did,
        rkey,
        error: describeError(err)
      })
    }
  }

  /**
   * Recursively removes `CACHE_DIR/<did>/<site>`. Failures are logged, not thrown.
   *
   * `did` and `site` originate from firehose events, so both are checked to be
   * single safe path segments before they reach a recursive, forced rmSync.
   */
  private deleteCache(did: string, site: string) {
    if (!isSafePathSegment(did) || !isSafePathSegment(site)) {
      this.log('Refusing to delete cache for unsafe path', { did, site })
      return
    }

    const cacheDir = `${CACHE_DIR}/${did}/${site}`

    if (!existsSync(cacheDir)) {
      this.log('Cache directory does not exist, nothing to delete', { did, site })
      return
    }

    try {
      rmSync(cacheDir, { recursive: true, force: true })
      this.log('Cache deleted', { did, site, path: cacheDir })
    } catch (err) {
      this.log('Failed to delete cache', {
        did,
        site,
        path: cacheDir,
        error: describeError(err)
      })
    }
  }

  /**
   * Reports liveness for health checks.
   *
   * `connected` is true while a Firehose instance exists (between start() and
   * stop()); it does not reflect the underlying socket state. `healthy` also
   * requires an event within STALE_EVENT_THRESHOLD_MS.
   */
  getHealth() {
    const connected = this.firehose !== null
    const timeSinceLastEvent = Date.now() - this.lastEventTime

    return {
      connected,
      lastEventTime: this.lastEventTime,
      timeSinceLastEvent,
      healthy: connected && timeSinceLastEvent < STALE_EVENT_THRESHOLD_MS
    }
  }
}