Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol.
wisp.place
1import { existsSync, rmSync } from 'fs'
2import {
3 getPdsForDid,
4 downloadAndCacheSite,
5 extractBlobCid,
6 fetchSiteRecord
7} from './utils'
8import { upsertSite, tryAcquireLock, releaseLock } from './db'
9import { safeFetch } from './safe-fetch'
10import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
11import { Firehose } from '@atproto/sync'
12import { IdResolver } from '@atproto/identity'
13import { invalidateSiteCache } from './cache'
14
15const CACHE_DIR = './cache/sites'
16
17export class FirehoseWorker {
18 private firehose: Firehose | null = null
19 private idResolver: IdResolver
20 private isShuttingDown = false
21 private lastEventTime = Date.now()
22
23 constructor(
24 private logger?: (msg: string, data?: Record<string, unknown>) => void
25 ) {
26 this.idResolver = new IdResolver()
27 }
28
29 private log(msg: string, data?: Record<string, unknown>) {
30 const log = this.logger || console.log
31 log(`[FirehoseWorker] ${msg}`, data || {})
32 }
33
34 start() {
35 this.log('Starting firehose worker')
36 this.connect()
37 }
38
39 stop() {
40 this.log('Stopping firehose worker')
41 this.isShuttingDown = true
42
43 if (this.firehose) {
44 this.firehose.destroy()
45 this.firehose = null
46 }
47 }
48
49 private connect() {
50 if (this.isShuttingDown) return
51
52 this.log('Connecting to AT Protocol firehose')
53
54 this.firehose = new Firehose({
55 idResolver: this.idResolver,
56 service: 'wss://bsky.network',
57 filterCollections: ['place.wisp.fs'],
58 handleEvent: async (evt: any) => {
59 this.lastEventTime = Date.now()
60
61 // Watch for write events
62 if (evt.event === 'create' || evt.event === 'update') {
63 const record = evt.record
64
65 // If the write is a valid place.wisp.fs record
66 if (
67 evt.collection === 'place.wisp.fs' &&
68 isRecord(record) &&
69 validateRecord(record).success
70 ) {
71 this.log('Received place.wisp.fs event', {
72 did: evt.did,
73 event: evt.event,
74 rkey: evt.rkey
75 })
76
77 try {
78 await this.handleCreateOrUpdate(
79 evt.did,
80 evt.rkey,
81 record,
82 evt.cid?.toString()
83 )
84 } catch (err) {
85 this.log('Error handling event', {
86 did: evt.did,
87 event: evt.event,
88 rkey: evt.rkey,
89 error:
90 err instanceof Error
91 ? err.message
92 : String(err)
93 })
94 }
95 }
96 } else if (
97 evt.event === 'delete' &&
98 evt.collection === 'place.wisp.fs'
99 ) {
100 this.log('Received delete event', {
101 did: evt.did,
102 rkey: evt.rkey
103 })
104
105 try {
106 await this.handleDelete(evt.did, evt.rkey)
107 } catch (err) {
108 this.log('Error handling delete', {
109 did: evt.did,
110 rkey: evt.rkey,
111 error:
112 err instanceof Error ? err.message : String(err)
113 })
114 }
115 }
116 },
117 onError: (err: any) => {
118 this.log('Firehose error', {
119 error: err instanceof Error ? err.message : String(err),
120 stack: err instanceof Error ? err.stack : undefined,
121 fullError: err
122 })
123 console.error('Full firehose error:', err)
124 }
125 })
126
127 this.firehose.start()
128 this.log('Firehose started')
129 }
130
131 private async handleCreateOrUpdate(
132 did: string,
133 site: string,
134 record: any,
135 eventCid?: string
136 ) {
137 this.log('Processing create/update', { did, site })
138
139 // Record is already validated in handleEvent
140 const fsRecord = record
141
142 const pdsEndpoint = await getPdsForDid(did)
143 if (!pdsEndpoint) {
144 this.log('Could not resolve PDS for DID', { did })
145 return
146 }
147
148 this.log('Resolved PDS', { did, pdsEndpoint })
149
150 // Verify record exists on PDS and fetch its CID
151 this.log('Verifying record on PDS', { did, site })
152 let verifiedCid: string
153 try {
154 const result = await fetchSiteRecord(did, site)
155
156 if (!result) {
157 this.log('Record not found on PDS, skipping cache', {
158 did,
159 site
160 })
161 return
162 }
163
164 verifiedCid = result.cid
165
166 // Verify event CID matches PDS CID (prevent cache poisoning)
167 if (eventCid && eventCid !== verifiedCid) {
168 this.log('CID mismatch detected - potential spoofed event', {
169 did,
170 site,
171 eventCid,
172 verifiedCid
173 })
174 return
175 }
176
177 this.log('Record verified on PDS', { did, site, cid: verifiedCid })
178 } catch (err) {
179 this.log('Failed to verify record on PDS', {
180 did,
181 site,
182 error: err instanceof Error ? err.message : String(err)
183 })
184 return
185 }
186
187 // Invalidate in-memory caches before updating
188 invalidateSiteCache(did, site)
189
190 // Cache the record with verified CID (uses atomic swap internally)
191 // All instances cache locally for edge serving
192 await downloadAndCacheSite(
193 did,
194 site,
195 fsRecord,
196 pdsEndpoint,
197 verifiedCid
198 )
199
200 // Acquire distributed lock only for database write to prevent duplicate writes
201 // Note: upsertSite will check cache-only mode internally and skip if needed
202 const lockKey = `db:upsert:${did}:${site}`
203 const lockAcquired = await tryAcquireLock(lockKey)
204
205 if (!lockAcquired) {
206 this.log('Another instance is writing to DB, skipping upsert', {
207 did,
208 site
209 })
210 this.log('Successfully processed create/update (cached locally)', {
211 did,
212 site
213 })
214 return
215 }
216
217 try {
218 // Upsert site to database (only one instance does this)
219 // In cache-only mode, this will be a no-op
220 await upsertSite(did, site, fsRecord.site)
221 this.log(
222 'Successfully processed create/update (cached + DB updated)',
223 { did, site }
224 )
225 } finally {
226 // Always release lock, even if DB write fails
227 await releaseLock(lockKey)
228 }
229 }
230
231 private async handleDelete(did: string, site: string) {
232 this.log('Processing delete', { did, site })
233
234 // All instances should delete their local cache (no lock needed)
235 const pdsEndpoint = await getPdsForDid(did)
236 if (!pdsEndpoint) {
237 this.log('Could not resolve PDS for DID', { did })
238 return
239 }
240
241 // Verify record is actually deleted from PDS
242 try {
243 const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
244 const recordRes = await safeFetch(recordUrl)
245
246 if (recordRes.ok) {
247 this.log('Record still exists on PDS, not deleting cache', {
248 did,
249 site
250 })
251 return
252 }
253
254 this.log('Verified record is deleted from PDS', {
255 did,
256 site,
257 status: recordRes.status
258 })
259 } catch (err) {
260 this.log('Error verifying deletion on PDS', {
261 did,
262 site,
263 error: err instanceof Error ? err.message : String(err)
264 })
265 }
266
267 // Invalidate in-memory caches
268 invalidateSiteCache(did, site)
269
270 // Delete disk cache
271 this.deleteCache(did, site)
272
273 this.log('Successfully processed delete', { did, site })
274 }
275
276 private deleteCache(did: string, site: string) {
277 const cacheDir = `${CACHE_DIR}/${did}/${site}`
278
279 if (!existsSync(cacheDir)) {
280 this.log('Cache directory does not exist, nothing to delete', {
281 did,
282 site
283 })
284 return
285 }
286
287 try {
288 rmSync(cacheDir, { recursive: true, force: true })
289 this.log('Cache deleted', { did, site, path: cacheDir })
290 } catch (err) {
291 this.log('Failed to delete cache', {
292 did,
293 site,
294 path: cacheDir,
295 error: err instanceof Error ? err.message : String(err)
296 })
297 }
298 }
299
300 getHealth() {
301 const isConnected = this.firehose !== null
302 const timeSinceLastEvent = Date.now() - this.lastEventTime
303
304 return {
305 connected: isConnected,
306 lastEventTime: this.lastEventTime,
307 timeSinceLastEvent,
308 healthy: isConnected && timeSinceLastEvent < 300000 // 5 minutes
309 }
310 }
311}