// Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
1import { existsSync, rmSync } from 'fs'
2import {
3 getPdsForDid,
4 downloadAndCacheSite,
5 extractBlobCid,
6 fetchSiteRecord
7} from './utils'
8import { upsertSite, tryAcquireLock, releaseLock } from './db'
9import { safeFetch } from './safe-fetch'
10import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
11import { Firehose } from '@atproto/sync'
12import { IdResolver } from '@atproto/identity'
13import { invalidateSiteCache, markSiteAsBeingCached, unmarkSiteAsBeingCached } from './cache'
14
// Root directory of the on-disk site cache. FirehoseWorker.deleteCache removes
// `${CACHE_DIR}/${did}/${site}` beneath it. The path is relative, so it is
// resolved against the process's current working directory.
const CACHE_DIR = './cache/sites'
16
/**
 * Consumes `place.wisp.fs` events from the AT Protocol firehose and keeps the
 * local site cache (and, via `upsertSite`, the database) in step with them.
 *
 * Lifecycle: `start()` opens a subscription, `stop()` tears it down. Once
 * `stop()` has been called the worker is considered shut down and later
 * `start()` calls do not reconnect.
 */
export class FirehoseWorker {
    /** Maximum gap between events before `getHealth()` reports unhealthy (5 minutes). */
    private static readonly HEALTHY_EVENT_WINDOW_MS = 5 * 60 * 1000

    /** Characters that must never appear in a single cache path segment. */
    private static readonly UNSAFE_SEGMENT_CHARS = /[\\\/\0]/

    private firehose: Firehose | null = null
    private idResolver: IdResolver
    private isShuttingDown = false
    private lastEventTime = Date.now()

    /**
     * @param logger Optional sink for log lines; falls back to `console.log`.
     */
    constructor(
        private logger?: (msg: string, data?: Record<string, unknown>) => void
    ) {
        this.idResolver = new IdResolver()
    }

    /** Normalises any thrown value into a loggable message. */
    private static errorMessage(err: unknown): string {
        return err instanceof Error ? err.message : String(err)
    }

    /**
     * Returns true when `segment` is safe to use as one directory name:
     * non-empty, not `.`/`..`, and free of path separators and NUL bytes.
     */
    private static isSafePathSegment(segment: string): boolean {
        return (
            segment.length > 0 &&
            segment !== '.' &&
            segment !== '..' &&
            !FirehoseWorker.UNSAFE_SEGMENT_CHARS.test(segment)
        )
    }

    /** Emits a log line prefixed with the worker name. */
    private log(msg: string, data?: Record<string, unknown>) {
        const sink = this.logger ?? console.log
        sink(`[FirehoseWorker] ${msg}`, data ?? {})
    }

    /**
     * Observes a possibly-async result so that a rejection is logged instead
     * of surfacing as an unhandled promise rejection.
     */
    private settle(result: unknown, failureMsg: string) {
        void Promise.resolve(result).catch((err: unknown) => {
            this.log(failureMsg, { error: FirehoseWorker.errorMessage(err) })
        })
    }

    /**
     * Opens the firehose subscription. Calling it again while connected
     * replaces the existing subscription; calling it after `stop()` is a no-op.
     */
    start() {
        this.log('Starting firehose worker')
        this.connect()
    }

    /** Marks the worker as shut down and destroys the active subscription, if any. */
    stop() {
        this.log('Stopping firehose worker')
        this.isShuttingDown = true
        this.disconnect()
    }

    /** Destroys and forgets the current firehose instance, if there is one. */
    private disconnect() {
        const current = this.firehose
        if (!current) return

        this.firehose = null
        this.settle(current.destroy(), 'Error destroying firehose')
    }

    /** Creates a new firehose subscription unless the worker is shutting down. */
    private connect() {
        if (this.isShuttingDown) return

        // A repeated start() must not orphan the previous subscription:
        // destroy it before the reference is overwritten.
        this.disconnect()

        this.log('Connecting to AT Protocol firehose')

        this.firehose = new Firehose({
            idResolver: this.idResolver,
            service: 'wss://bsky.network',
            filterCollections: ['place.wisp.fs'],
            handleEvent: (evt: any) => this.handleEvent(evt),
            onError: (err: unknown) => {
                this.log('Firehose error', {
                    error: FirehoseWorker.errorMessage(err),
                    stack: err instanceof Error ? err.stack : undefined,
                    fullError: err
                })
                console.error('Full firehose error:', err)
            }
        })

        this.settle(this.firehose.start(), 'Firehose start failed')
        this.log('Firehose started')
    }

    /**
     * Dispatches one firehose event. Only `place.wisp.fs` events are acted on:
     * valid create/update records are cached, deletes evict the cache.
     * Handler failures are logged and never propagate to the firehose.
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- event shape comes from @atproto/sync
    private async handleEvent(evt: any): Promise<void> {
        this.lastEventTime = Date.now()

        if (evt.collection !== 'place.wisp.fs') return

        if (evt.event === 'create' || evt.event === 'update') {
            const record = evt.record

            // Ignore writes that are not valid place.wisp.fs records
            if (!isRecord(record) || !validateRecord(record).success) return

            this.log('Received place.wisp.fs event', {
                did: evt.did,
                event: evt.event,
                rkey: evt.rkey
            })

            try {
                await this.handleCreateOrUpdate(
                    evt.did,
                    evt.rkey,
                    record,
                    evt.cid?.toString()
                )
            } catch (err) {
                console.error('Full error details:', err)
                this.log('Error handling event', {
                    did: evt.did,
                    event: evt.event,
                    rkey: evt.rkey,
                    error: FirehoseWorker.errorMessage(err)
                })
            }
        } else if (evt.event === 'delete') {
            this.log('Received delete event', {
                did: evt.did,
                rkey: evt.rkey
            })

            try {
                await this.handleDelete(evt.did, evt.rkey)
            } catch (err) {
                this.log('Error handling delete', {
                    did: evt.did,
                    rkey: evt.rkey,
                    error: FirehoseWorker.errorMessage(err)
                })
            }
        }
    }

    /**
     * Caches a created/updated site and records it in the database.
     *
     * The record is re-fetched via `fetchSiteRecord` and the event is dropped
     * when the record is missing, the fetch fails, or the event CID differs
     * from the fetched CID.
     *
     * @param did      Repository DID the record belongs to.
     * @param site     Record key, used as the site name.
     * @param record   Record payload, already validated by the event handler.
     * @param eventCid CID carried by the event, if any.
     */
    private async handleCreateOrUpdate(
        did: string,
        site: string,
        record: any,
        eventCid?: string
    ) {
        this.log('Processing create/update', { did, site })

        // Record is already validated in handleEvent
        const fsRecord = record

        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        this.log('Resolved PDS', { did, pdsEndpoint })

        // Verify record exists on PDS and fetch its CID
        this.log('Verifying record on PDS', { did, site })
        let verifiedCid: string
        try {
            const result = await fetchSiteRecord(did, site)

            if (!result) {
                this.log('Record not found on PDS, skipping cache', {
                    did,
                    site
                })
                return
            }

            verifiedCid = result.cid

            // Verify event CID matches PDS CID (prevent cache poisoning)
            if (eventCid && eventCid !== verifiedCid) {
                this.log('CID mismatch detected - potential spoofed event', {
                    did,
                    site,
                    eventCid,
                    verifiedCid
                })
                return
            }

            this.log('Record verified on PDS', { did, site, cid: verifiedCid })
        } catch (err) {
            this.log('Failed to verify record on PDS', {
                did,
                site,
                error: FirehoseWorker.errorMessage(err)
            })
            return
        }

        // Invalidate in-memory caches before updating
        invalidateSiteCache(did, site)

        // Mark site as being cached to prevent serving stale content during update
        markSiteAsBeingCached(did, site)

        try {
            // Every instance caches locally; only the DB write below is serialised.
            await downloadAndCacheSite(
                did,
                site,
                fsRecord,
                pdsEndpoint,
                verifiedCid
            )

            // Take the lock only for the database write so that a single
            // instance performs the upsert.
            const lockKey = `db:upsert:${did}:${site}`
            const lockAcquired = await tryAcquireLock(lockKey)

            if (!lockAcquired) {
                this.log('Another instance is writing to DB, skipping upsert', {
                    did,
                    site
                })
                this.log('Successfully processed create/update (cached locally)', {
                    did,
                    site
                })
                return
            }

            try {
                await upsertSite(did, site, fsRecord.site)
                this.log(
                    'Successfully processed create/update (cached + DB updated)',
                    { did, site }
                )
            } finally {
                // Always release lock, even if DB write fails
                await releaseLock(lockKey)
            }
        } finally {
            // Always unmark, even if caching fails
            unmarkSiteAsBeingCached(did, site)
        }
    }

    /**
     * Evicts a site from the in-memory and on-disk caches after a delete event.
     *
     * The cache is kept when the PDS cannot be resolved or when `getRecord`
     * still returns an OK response for the record.
     *
     * @param did  Repository DID the record belonged to.
     * @param site Record key, used as the site name.
     */
    private async handleDelete(did: string, site: string) {
        this.log('Processing delete', { did, site })

        // All instances should delete their local cache (no lock needed)
        const pdsEndpoint = await getPdsForDid(did)
        if (!pdsEndpoint) {
            this.log('Could not resolve PDS for DID', { did })
            return
        }

        // Verify record is actually deleted from PDS
        try {
            const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
            const recordRes = await safeFetch(recordUrl)

            if (recordRes.ok) {
                this.log('Record still exists on PDS, not deleting cache', {
                    did,
                    site
                })
                return
            }

            this.log('Verified record is deleted from PDS', {
                did,
                site,
                status: recordRes.status
            })
        } catch (err) {
            // NOTE(review): a failed verification request (and any non-OK
            // status, not only "record not found") still falls through to
            // deletion below - confirm this fail-toward-delete is intended.
            this.log('Error verifying deletion on PDS', {
                did,
                site,
                error: FirehoseWorker.errorMessage(err)
            })
        }

        // Invalidate in-memory caches
        invalidateSiteCache(did, site)

        // Delete disk cache
        this.deleteCache(did, site)

        this.log('Successfully processed delete', { did, site })
    }

    /**
     * Recursively removes `${CACHE_DIR}/${did}/${site}` if it exists.
     * Failures are logged, not thrown.
     *
     * `did` and `site` originate from network events, so each must be a single
     * safe path segment; anything else is refused so that the recursive delete
     * cannot resolve to a directory other than the one intended.
     */
    private deleteCache(did: string, site: string) {
        if (
            !FirehoseWorker.isSafePathSegment(did) ||
            !FirehoseWorker.isSafePathSegment(site)
        ) {
            this.log('Refusing to delete cache for unsafe path segment', {
                did,
                site
            })
            return
        }

        const cacheDir = `${CACHE_DIR}/${did}/${site}`

        if (!existsSync(cacheDir)) {
            this.log('Cache directory does not exist, nothing to delete', {
                did,
                site
            })
            return
        }

        try {
            rmSync(cacheDir, { recursive: true, force: true })
            this.log('Cache deleted', { did, site, path: cacheDir })
        } catch (err) {
            this.log('Failed to delete cache', {
                did,
                site,
                path: cacheDir,
                error: FirehoseWorker.errorMessage(err)
            })
        }
    }

    /**
     * Reports connection state and event recency.
     *
     * @returns `connected` is true while a firehose instance is held;
     * `healthy` additionally requires an event within the last 5 minutes
     * (measured from construction until the first event arrives).
     */
    getHealth() {
        const isConnected = this.firehose !== null
        const timeSinceLastEvent = Date.now() - this.lastEventTime

        return {
            connected: isConnected,
            lastEventTime: this.lastEventTime,
            timeSinceLastEvent,
            healthy:
                isConnected &&
                timeSinceLastEvent < FirehoseWorker.HEALTHY_EVENT_WINDOW_MS
        }
    }
}