Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
wisp.place
1import { existsSync, rmSync } from 'fs'
2import {
3 getPdsForDid,
4 downloadAndCacheSite,
5 extractBlobCid,
6 fetchSiteRecord
7} from './utils'
8import { upsertSite, tryAcquireLock, releaseLock } from './db'
9import { safeFetch } from './safe-fetch'
10import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
11import { Firehose } from '@atproto/sync'
12import { IdResolver } from '@atproto/identity'
13import { invalidateSiteCache, markSiteAsBeingCached, unmarkSiteAsBeingCached } from './cache'
14
/** Root of the on-disk site cache; `deleteCache` removes `<CACHE_DIR>/<did>/<site>`. */
const CACHE_DIR = './cache/sites'

/**
 * Subscribes to the AT Protocol firehose (`wss://bsky.network`) and reacts to
 * `place.wisp.fs` record events:
 *
 * - create/update: verifies the record against the owner's PDS, refreshes the
 *   local cache via `downloadAndCacheSite`, then upserts the site in the
 *   database under a per-site lock.
 * - delete: checks the record on the PDS, then drops the in-memory and
 *   on-disk cache for the site.
 *
 * Errors raised while handling a single event are logged and swallowed so one
 * bad event does not stop the subscription.
 */
export class FirehoseWorker {
  /** Longest gap between events (ms) that `getHealth()` still reports as healthy: 5 minutes. */
  private static readonly MAX_EVENT_GAP_MS = 300000

  private firehose: Firehose | null = null
  private idResolver: IdResolver
  private isShuttingDown = false
  private lastEventTime = Date.now()

  /**
   * @param logger Optional log sink; when omitted, `console.log` is used.
   */
  constructor(
    private logger?: (msg: string, data?: Record<string, unknown>) => void
  ) {
    this.idResolver = new IdResolver()
  }

  /** Converts an arbitrary thrown value into a loggable message. */
  private static errorMessage(err: unknown): string {
    return err instanceof Error ? err.message : String(err)
  }

  /**
   * Returns true when `segment` can be used as a single directory name:
   * non-empty, not `.`/`..`, and free of path separators and NUL bytes.
   * DIDs and record keys come from firehose events (untrusted input), and
   * they are interpolated into a path that is removed recursively.
   */
  private static isSafePathSegment(segment: string): boolean {
    return (
      segment.length > 0 &&
      segment !== '.' &&
      segment !== '..' &&
      !segment.includes('/') &&
      !segment.includes('\\') &&
      !segment.includes('\0')
    )
  }

  /** Writes a prefixed message to the injected logger, or `console.log` if none was given. */
  private log(msg: string, data?: Record<string, unknown>): void {
    const sink = this.logger ?? console.log
    sink(`[FirehoseWorker] ${msg}`, data ?? {})
  }

  /**
   * Opens the firehose subscription.
   *
   * Calling `start()` while a subscription is already open is a no-op, so a
   * second call cannot orphan the first connection. Calling it after `stop()`
   * clears the shutdown flag and reconnects.
   */
  start(): void {
    if (this.firehose) {
      this.log('Firehose worker already running, ignoring start')
      return
    }

    this.log('Starting firehose worker')
    this.isShuttingDown = false
    this.connect()
  }

  /** Marks the worker as shutting down and destroys the active subscription, if any. */
  stop(): void {
    this.log('Stopping firehose worker')
    this.isShuttingDown = true

    const active = this.firehose
    if (!active) return

    this.firehose = null
    // destroy() may return a promise; make sure a rejection is never left unhandled.
    void Promise.resolve(active.destroy()).catch((err: unknown) => {
      this.log('Error destroying firehose', {
        error: FirehoseWorker.errorMessage(err)
      })
    })
  }

  /** Creates the `Firehose` subscription and starts consuming events. */
  private connect(): void {
    if (this.isShuttingDown) return

    this.log('Connecting to AT Protocol firehose')

    const firehose = new Firehose({
      idResolver: this.idResolver,
      service: 'wss://bsky.network',
      filterCollections: ['place.wisp.fs'],
      handleEvent: (evt: any) => this.handleEvent(evt),
      onError: (err: unknown) => {
        this.log('Firehose error', {
          error: FirehoseWorker.errorMessage(err),
          stack: err instanceof Error ? err.stack : undefined,
          fullError: err
        })
        console.error('Full firehose error:', err)
      }
    })
    this.firehose = firehose

    // start() may return a promise that lives as long as the subscription;
    // log a rejection instead of leaving it unhandled.
    void Promise.resolve(firehose.start()).catch((err: unknown) => {
      this.log('Firehose stopped unexpectedly', {
        error: FirehoseWorker.errorMessage(err)
      })
    })
    this.log('Firehose started')
  }

  /**
   * Dispatches one firehose event. Every event refreshes `lastEventTime`;
   * only valid `place.wisp.fs` writes and `place.wisp.fs` deletes are acted on.
   * Handler failures are logged, never rethrown.
   */
  private async handleEvent(evt: any): Promise<void> {
    this.lastEventTime = Date.now()

    // Watch for write events
    if (evt.event === 'create' || evt.event === 'update') {
      const record = evt.record

      // Ignore writes that are not a valid place.wisp.fs record
      if (
        evt.collection !== 'place.wisp.fs' ||
        !isRecord(record) ||
        !validateRecord(record).success
      ) {
        return
      }

      this.log('Received place.wisp.fs event', {
        did: evt.did,
        event: evt.event,
        rkey: evt.rkey
      })

      try {
        await this.handleCreateOrUpdate(
          evt.did,
          evt.rkey,
          record,
          evt.cid?.toString()
        )
      } catch (err) {
        this.log('Error handling event', {
          did: evt.did,
          event: evt.event,
          rkey: evt.rkey,
          error: FirehoseWorker.errorMessage(err)
        })
      }
      return
    }

    if (evt.event === 'delete' && evt.collection === 'place.wisp.fs') {
      this.log('Received delete event', {
        did: evt.did,
        rkey: evt.rkey
      })

      try {
        await this.handleDelete(evt.did, evt.rkey)
      } catch (err) {
        this.log('Error handling delete', {
          did: evt.did,
          rkey: evt.rkey,
          error: FirehoseWorker.errorMessage(err)
        })
      }
    }
  }

  /**
   * Handles a created or updated site record.
   *
   * @param did      Repository owner.
   * @param site     Record key of the site.
   * @param record   The record from the event (already validated by the caller).
   * @param eventCid CID carried by the event, if any; compared with the CID
   *                 returned by `fetchSiteRecord` before anything is cached.
   */
  private async handleCreateOrUpdate(
    did: string,
    site: string,
    record: any,
    eventCid?: string
  ): Promise<void> {
    this.log('Processing create/update', { did, site })

    const pdsEndpoint = await getPdsForDid(did)
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did })
      return
    }

    this.log('Resolved PDS', { did, pdsEndpoint })

    // Verify record exists on PDS and fetch its CID
    this.log('Verifying record on PDS', { did, site })
    let verifiedCid: string
    try {
      const result = await fetchSiteRecord(did, site)

      if (!result) {
        this.log('Record not found on PDS, skipping cache', {
          did,
          site
        })
        return
      }

      verifiedCid = result.cid

      // Verify event CID matches PDS CID (prevent cache poisoning)
      if (eventCid && eventCid !== verifiedCid) {
        this.log('CID mismatch detected - potential spoofed event', {
          did,
          site,
          eventCid,
          verifiedCid
        })
        return
      }

      this.log('Record verified on PDS', { did, site, cid: verifiedCid })
    } catch (err) {
      this.log('Failed to verify record on PDS', {
        did,
        site,
        error: FirehoseWorker.errorMessage(err)
      })
      return
    }

    // Invalidate in-memory caches before updating
    invalidateSiteCache(did, site)

    // Mark site as being cached to prevent serving stale content during update
    markSiteAsBeingCached(did, site)

    try {
      // Cache the record with verified CID.
      // All instances cache locally for edge serving.
      await downloadAndCacheSite(
        did,
        site,
        record,
        pdsEndpoint,
        verifiedCid
      )

      // Acquire distributed lock only for the database write to prevent duplicate writes
      const lockKey = `db:upsert:${did}:${site}`
      const lockAcquired = await tryAcquireLock(lockKey)

      if (!lockAcquired) {
        this.log('Another instance is writing to DB, skipping upsert', {
          did,
          site
        })
        this.log('Successfully processed create/update (cached locally)', {
          did,
          site
        })
        return
      }

      try {
        // Upsert site to database (only the lock holder does this)
        await upsertSite(did, site, record.site)
        this.log(
          'Successfully processed create/update (cached + DB updated)',
          { did, site }
        )
      } finally {
        // Always release lock, even if DB write fails
        await releaseLock(lockKey)
      }
    } finally {
      // Always unmark, even if caching fails
      unmarkSiteAsBeingCached(did, site)
    }
  }

  /**
   * Handles a deleted site record by purging its in-memory and on-disk cache.
   *
   * The record is first requested from the PDS: a successful response means
   * the record still exists and nothing is deleted. If the request itself
   * throws, the error is logged and the cache is still purged.
   */
  private async handleDelete(did: string, site: string): Promise<void> {
    this.log('Processing delete', { did, site })

    // All instances should delete their local cache (no lock needed)
    const pdsEndpoint = await getPdsForDid(did)
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did })
      return
    }

    // Verify record is actually deleted from PDS
    try {
      const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
      const recordRes = await safeFetch(recordUrl)

      if (recordRes.ok) {
        this.log('Record still exists on PDS, not deleting cache', {
          did,
          site
        })
        return
      }

      this.log('Verified record is deleted from PDS', {
        did,
        site,
        status: recordRes.status
      })
    } catch (err) {
      // Verification failed; fall through and purge anyway.
      this.log('Error verifying deletion on PDS', {
        did,
        site,
        error: FirehoseWorker.errorMessage(err)
      })
    }

    // Invalidate in-memory caches
    invalidateSiteCache(did, site)

    // Delete disk cache
    this.deleteCache(did, site)

    this.log('Successfully processed delete', { did, site })
  }

  /**
   * Recursively removes `<CACHE_DIR>/<did>/<site>` from disk.
   *
   * Both segments are validated first: an empty segment or `..` would make
   * the path resolve to a parent directory and the recursive removal would
   * wipe other sites' caches. Filesystem errors are logged, not thrown.
   */
  private deleteCache(did: string, site: string): void {
    if (
      !FirehoseWorker.isSafePathSegment(did) ||
      !FirehoseWorker.isSafePathSegment(site)
    ) {
      this.log('Refusing to delete cache for unsafe path segment', {
        did,
        site
      })
      return
    }

    const cacheDir = `${CACHE_DIR}/${did}/${site}`

    if (!existsSync(cacheDir)) {
      this.log('Cache directory does not exist, nothing to delete', {
        did,
        site
      })
      return
    }

    try {
      rmSync(cacheDir, { recursive: true, force: true })
      this.log('Cache deleted', { did, site, path: cacheDir })
    } catch (err) {
      this.log('Failed to delete cache', {
        did,
        site,
        path: cacheDir,
        error: FirehoseWorker.errorMessage(err)
      })
    }
  }

  /**
   * Reports worker health.
   *
   * `connected` is true while a subscription object is held; `healthy`
   * additionally requires that an event arrived within the last 5 minutes
   * (measured from construction until the first event is seen).
   */
  getHealth() {
    const isConnected = this.firehose !== null
    const timeSinceLastEvent = Date.now() - this.lastEventTime

    return {
      connected: isConnected,
      lastEventTime: this.lastEventTime,
      timeSinceLastEvent,
      healthy:
        isConnected && timeSinceLastEvent < FirehoseWorker.MAX_EVENT_GAP_MS
    }
  }
}