// wisp.place — monorepo for a static site hosting service built on top of the AT Protocol.
1import { existsSync, rmSync } from 'fs'
2import {
3 getPdsForDid,
4 downloadAndCacheSite,
5 extractBlobCid,
6 fetchSiteRecord
7} from './utils'
8import { upsertSite, tryAcquireLock, releaseLock } from './db'
9import { safeFetch } from './safe-fetch'
10import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
11import { Firehose } from '@atproto/sync'
12import { IdResolver } from '@atproto/identity'
13import { invalidateSiteCache, markSiteAsBeingCached, unmarkSiteAsBeingCached } from './cache'
14import { clearRedirectRulesCache } from '../server'
15
const CACHE_DIR = './cache/sites'

/** Maximum gap between firehose events (5 minutes) before the worker reports itself unhealthy. */
const HEALTHY_EVENT_GAP_MS = 5 * 60 * 1000

/** Extracts a printable message from a thrown value of unknown type. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err)
}

/**
 * Returns true when `value` can be used as exactly one directory name.
 *
 * Rejects non-strings, the empty string, `.` and `..`, and anything containing a
 * path separator or NUL byte. DIDs and record keys arrive from the network, so
 * they must pass this check before being interpolated into a filesystem path.
 */
function isSafePathSegment(value: unknown): value is string {
  return (
    typeof value === 'string' &&
    value.length > 0 &&
    value !== '.' &&
    value !== '..' &&
    !value.includes('/') &&
    !value.includes('\\') &&
    !value.includes('\0')
  )
}

/**
 * Builds the on-disk cache directory for a site, or returns null when either
 * identifier would let the resulting path leave `CACHE_DIR/<did>/<site>`.
 */
function siteCacheDir(did: string, site: string): string | null {
  if (!isSafePathSegment(did) || !isSafePathSegment(site)) return null
  return `${CACHE_DIR}/${did}/${site}`
}

/**
 * Subscribes to the AT Protocol firehose and keeps the local site cache in sync
 * with `place.wisp.fs` and `place.wisp.settings` records.
 *
 * - fs create/update: verify the record on the PDS, re-download the site, upsert the DB row.
 * - fs delete: verify on the PDS, then drop the in-memory and on-disk cache.
 * - settings create/update/delete: refresh cached settings, or cache the site if it is not cached yet.
 */
export class FirehoseWorker {
  private firehose: Firehose | null = null
  private idResolver: IdResolver
  private isShuttingDown = false
  private lastEventTime = Date.now()

  /**
   * @param logger Optional sink for log lines; `console.log` is used when omitted.
   */
  constructor(
    private logger?: (msg: string, data?: Record<string, unknown>) => void
  ) {
    this.idResolver = new IdResolver()
  }

  /** Writes one log line prefixed with `[FirehoseWorker]`; `data` defaults to `{}`. */
  private log(msg: string, data?: Record<string, unknown>) {
    const sink = this.logger ?? console.log
    sink(`[FirehoseWorker] ${msg}`, data ?? {})
  }

  /**
   * Starts the firehose subscription.
   *
   * A no-op (apart from logging) once `stop()` has been called, because
   * `isShuttingDown` is never reset, and when a subscription is already active.
   */
  start() {
    this.log('Starting firehose worker')
    this.connect()
  }

  /** Marks the worker as shutting down and tears down the active subscription, if any. */
  stop() {
    this.log('Stopping firehose worker')
    this.isShuttingDown = true

    if (this.firehose) {
      const teardown: unknown = this.firehose.destroy()
      this.firehose = null

      // NOTE(review): destroy() appears to be async in @atproto/sync — confirm
      // against the installed version. Wrapping in Promise.resolve observes the
      // result whether it is a promise or a plain value, so a rejection cannot
      // surface as an unhandled rejection.
      void Promise.resolve(teardown).catch((err: unknown) => {
        this.log('Error while destroying firehose', {
          error: errorMessage(err)
        })
      })
    }
  }

  /** Creates the Firehose client and starts consuming events. */
  private connect() {
    if (this.isShuttingDown) return

    // A second start() must not replace the handle of a running subscription:
    // the old one would keep processing events with nothing left to destroy it.
    if (this.firehose) {
      this.log('Firehose already running, skipping connect')
      return
    }

    this.log('Connecting to AT Protocol firehose')

    this.firehose = new Firehose({
      idResolver: this.idResolver,
      service: 'wss://bsky.network',
      filterCollections: ['place.wisp.fs', 'place.wisp.settings'],
      // The event type comes from @atproto/sync and is not imported here, so it stays `any`.
      handleEvent: (evt: any) => this.handleEvent(evt),
      onError: (err: unknown) => {
        this.log('Firehose error', {
          error: errorMessage(err),
          stack: err instanceof Error ? err.stack : undefined,
          fullError: err
        })
        console.error('Full firehose error:', err)
      }
    })

    this.firehose.start()
    this.log('Firehose started')
  }

  /**
   * Routes one firehose event to the matching handler.
   *
   * Events that are not create/update/delete on one of the two watched
   * collections are ignored silently. Handler failures are logged, never rethrown.
   */
  private async handleEvent(evt: any): Promise<void> {
    this.lastEventTime = Date.now()

    const { event, collection, did, rkey } = evt
    const isWrite = event === 'create' || event === 'update'
    const isDelete = event === 'delete'
    if (!isWrite && !isDelete) return

    const isFs = collection === 'place.wisp.fs'
    const isSettings = collection === 'place.wisp.settings'
    if (!isFs && !isSettings) return

    // did and rkey end up in filesystem paths (including a recursive delete),
    // so refuse anything that is not a plain single path segment.
    if (!isSafePathSegment(did) || !isSafePathSegment(rkey)) {
      this.log('Ignoring event with unsafe did or rkey', {
        did,
        event,
        collection,
        rkey
      })
      return
    }

    if (isFs) {
      if (isDelete) {
        this.log('Received delete event', { did, rkey })
        await this.runHandler('Error handling delete', { did, rkey }, () =>
          this.handleDelete(did, rkey)
        )
        return
      }

      // Writes are processed only when the payload is a valid place.wisp.fs record.
      const record = evt.record
      if (!isRecord(record) || !validateRecord(record).success) return

      this.log('Received place.wisp.fs event', { did, event, rkey })
      await this.runHandler(
        'Error handling event',
        { did, event, rkey },
        () =>
          this.handleCreateOrUpdate(did, rkey, record, evt.cid?.toString()),
        (err) => console.error('Full error details:', err)
      )
      return
    }

    // place.wisp.settings: create, update and delete all refresh the cached settings.
    if (isDelete) {
      this.log('Received settings delete event', { did, rkey })
      await this.runHandler(
        'Error handling settings delete',
        { did, rkey },
        () => this.handleSettingsChange(did, rkey)
      )
      return
    }

    this.log('Received place.wisp.settings event', { did, event, rkey })
    await this.runHandler(
      'Error handling settings change',
      { did, event, rkey },
      () => this.handleSettingsChange(did, rkey)
    )
  }

  /**
   * Runs `task`; on failure logs `failureMsg` with `context` plus the error message.
   *
   * @param onFailure Optional hook invoked with the raw error before the log line is written.
   */
  private async runHandler(
    failureMsg: string,
    context: Record<string, unknown>,
    task: () => Promise<void>,
    onFailure?: (err: unknown) => void
  ): Promise<void> {
    try {
      await task()
    } catch (err) {
      onFailure?.(err)
      this.log(failureMsg, { ...context, error: errorMessage(err) })
    }
  }

  /**
   * Handles a validated place.wisp.fs create/update.
   *
   * @param did      Repository DID from the event.
   * @param site     Record key of the site.
   * @param record   Record payload, already validated by the caller.
   * @param eventCid CID carried by the event; compared with the PDS CID when present.
   */
  private async handleCreateOrUpdate(
    did: string,
    site: string,
    record: any,
    eventCid?: string
  ) {
    this.log('Processing create/update', { did, site })

    const pdsEndpoint = await getPdsForDid(did)
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did })
      return
    }

    this.log('Resolved PDS', { did, pdsEndpoint })

    // Confirm the record exists on the PDS and take its CID from there.
    this.log('Verifying record on PDS', { did, site })
    let verifiedCid: string
    try {
      const result = await fetchSiteRecord(did, site)

      if (!result) {
        this.log('Record not found on PDS, skipping cache', { did, site })
        return
      }

      verifiedCid = result.cid

      // An event whose CID disagrees with the PDS is dropped so it cannot poison the cache.
      if (eventCid && eventCid !== verifiedCid) {
        this.log('CID mismatch detected - potential spoofed event', {
          did,
          site,
          eventCid,
          verifiedCid
        })
        return
      }

      this.log('Record verified on PDS', { did, site, cid: verifiedCid })
    } catch (err) {
      this.log('Failed to verify record on PDS', {
        did,
        site,
        error: errorMessage(err)
      })
      return
    }

    // Drop in-memory entries first, then flag the site as mid-update while the
    // new copy downloads.
    invalidateSiteCache(did, site)
    markSiteAsBeingCached(did, site)

    try {
      // Every instance caches locally; only the DB write below is serialized.
      await downloadAndCacheSite(did, site, record, pdsEndpoint, verifiedCid)

      // The site content changed, so cached redirect rules are stale.
      clearRedirectRulesCache(did, site)

      // The lock covers only the database write, so one instance performs the upsert.
      const lockKey = `db:upsert:${did}:${site}`
      const lockAcquired = await tryAcquireLock(lockKey)

      if (!lockAcquired) {
        this.log('Another instance is writing to DB, skipping upsert', {
          did,
          site
        })
        this.log('Successfully processed create/update (cached locally)', {
          did,
          site
        })
        return
      }

      try {
        await upsertSite(did, site, record.site)
        this.log('Successfully processed create/update (cached + DB updated)', {
          did,
          site
        })
      } finally {
        // Release even when the DB write throws.
        await releaseLock(lockKey)
      }
    } finally {
      // Clear the "being cached" flag even when the download or upsert fails.
      unmarkSiteAsBeingCached(did, site)
    }
  }

  /**
   * Handles a place.wisp.fs delete: drops the in-memory and on-disk cache unless
   * the PDS still serves the record.
   *
   * If the verification request itself throws, the error is logged and the cache
   * is still deleted.
   */
  private async handleDelete(did: string, site: string) {
    this.log('Processing delete', { did, site })

    // Each instance removes its own local cache; no lock is taken.
    const pdsEndpoint = await getPdsForDid(did)
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did })
      return
    }

    try {
      const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
      const recordRes = await safeFetch(recordUrl)

      if (recordRes.ok) {
        this.log('Record still exists on PDS, not deleting cache', {
          did,
          site
        })
        return
      }

      this.log('Verified record is deleted from PDS', {
        did,
        site,
        status: recordRes.status
      })
    } catch (err) {
      // Verification failed; fall through and delete anyway.
      this.log('Error verifying deletion on PDS', {
        did,
        site,
        error: errorMessage(err)
      })
    }

    invalidateSiteCache(did, site)
    this.deleteCache(did, site)

    this.log('Successfully processed delete', { did, site })
  }

  /**
   * Handles any change to a place.wisp.settings record.
   *
   * When the site has no on-disk cache yet and an fs record exists, the whole
   * site is cached. Otherwise only the settings stored in the cache metadata
   * are refreshed.
   */
  private async handleSettingsChange(did: string, rkey: string) {
    this.log('Processing settings change', { did, rkey })

    const cacheDir = siteCacheDir(did, rkey)
    if (cacheDir === null) {
      this.log('Refusing settings change for unsafe did or rkey', { did, rkey })
      return
    }

    invalidateSiteCache(did, rkey)

    if (!existsSync(cacheDir)) {
      this.log('Site not cached yet, checking if fs record exists', {
        did,
        rkey
      })

      try {
        const siteRecord = await fetchSiteRecord(did, rkey)

        if (!siteRecord) {
          this.log('No fs record found for site, skipping cache', { did, rkey })
        } else {
          this.log('Site record found, triggering full cache with settings', {
            did,
            rkey
          })
          const pdsEndpoint = await getPdsForDid(did)

          if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
          } else {
            markSiteAsBeingCached(did, rkey)
            try {
              await downloadAndCacheSite(
                did,
                rkey,
                siteRecord.record,
                pdsEndpoint,
                siteRecord.cid
              )
              this.log('Successfully cached site with new settings', {
                did,
                rkey
              })
            } finally {
              unmarkSiteAsBeingCached(did, rkey)
            }
          }
        }
      } catch (err) {
        this.log('Failed to cache site after settings change', {
          did,
          rkey,
          error: errorMessage(err)
        })
      }

      this.log('Successfully processed settings change (new cache)', {
        did,
        rkey
      })
      return
    }

    // Already cached: only the settings stored in the metadata need refreshing.
    try {
      const { fetchSiteSettings, updateCacheMetadataSettings } = await import(
        './utils'
      )
      const settings = await fetchSiteSettings(did, rkey)
      await updateCacheMetadataSettings(did, rkey, settings)
      this.log('Updated cached settings', {
        did,
        rkey,
        hasSettings: !!settings
      })
    } catch (err) {
      this.log('Failed to update cached settings', {
        did,
        rkey,
        error: errorMessage(err)
      })
    }

    this.log('Successfully processed settings change', { did, rkey })
  }

  /**
   * Recursively removes the on-disk cache directory of one site.
   *
   * Refuses identifiers that are not plain path segments; deletion failures are
   * logged rather than thrown.
   */
  private deleteCache(did: string, site: string) {
    const cacheDir = siteCacheDir(did, site)
    if (cacheDir === null) {
      this.log('Refusing to delete cache for unsafe did or site', { did, site })
      return
    }

    if (!existsSync(cacheDir)) {
      this.log('Cache directory does not exist, nothing to delete', {
        did,
        site
      })
      return
    }

    try {
      rmSync(cacheDir, { recursive: true, force: true })
      this.log('Cache deleted', { did, site, path: cacheDir })
    } catch (err) {
      this.log('Failed to delete cache', {
        did,
        site,
        path: cacheDir,
        error: errorMessage(err)
      })
    }
  }

  /**
   * Reports worker health.
   *
   * `connected` only reflects whether a Firehose handle is held; `healthy`
   * additionally requires an event within the last five minutes.
   */
  getHealth() {
    const isConnected = this.firehose !== null
    const timeSinceLastEvent = Date.now() - this.lastEventTime

    return {
      connected: isConnected,
      lastEventTime: this.lastEventTime,
      timeSinceLastEvent,
      healthy: isConnected && timeSinceLastEvent < HEALTHY_EVENT_GAP_MS
    }
  }
}