Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol.
wisp.place
1import { existsSync, rmSync } from 'fs'
2import {
3 getPdsForDid,
4 downloadAndCacheSite,
5 extractBlobCid,
6 fetchSiteRecord
7} from './utils'
8import { upsertSite, tryAcquireLock, releaseLock } from './db'
9import { safeFetch } from './safe-fetch'
10import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs'
11import { Firehose } from '@atproto/sync'
12import { IdResolver } from '@atproto/identity'
13import { invalidateSiteCache } from './cache'
14
15const CACHE_DIR = './cache/sites'
16
17export class FirehoseWorker {
18 private firehose: Firehose | null = null
19 private idResolver: IdResolver
20 private isShuttingDown = false
21 private lastEventTime = Date.now()
22
23 constructor(
24 private logger?: (msg: string, data?: Record<string, unknown>) => void
25 ) {
26 this.idResolver = new IdResolver()
27 }
28
29 private log(msg: string, data?: Record<string, unknown>) {
30 const log = this.logger || console.log
31 log(`[FirehoseWorker] ${msg}`, data || {})
32 }
33
34 start() {
35 this.log('Starting firehose worker')
36 this.connect()
37 }
38
39 stop() {
40 this.log('Stopping firehose worker')
41 this.isShuttingDown = true
42
43 if (this.firehose) {
44 this.firehose.destroy()
45 this.firehose = null
46 }
47 }
48
49 private connect() {
50 if (this.isShuttingDown) return
51
52 this.log('Connecting to AT Protocol firehose')
53
54 this.firehose = new Firehose({
55 idResolver: this.idResolver,
56 service: 'wss://bsky.network',
57 filterCollections: ['place.wisp.fs'],
58 handleEvent: async (evt: any) => {
59 this.lastEventTime = Date.now()
60
61 // Watch for write events
62 if (evt.event === 'create' || evt.event === 'update') {
63 const record = evt.record
64
65 // If the write is a valid place.wisp.fs record
66 if (
67 evt.collection === 'place.wisp.fs' &&
68 isRecord(record) &&
69 validateRecord(record).success
70 ) {
71 this.log('Received place.wisp.fs event', {
72 did: evt.did,
73 event: evt.event,
74 rkey: evt.rkey
75 })
76
77 try {
78 await this.handleCreateOrUpdate(
79 evt.did,
80 evt.rkey,
81 record,
82 evt.cid?.toString()
83 )
84 } catch (err) {
85 this.log('Error handling event', {
86 did: evt.did,
87 event: evt.event,
88 rkey: evt.rkey,
89 error:
90 err instanceof Error
91 ? err.message
92 : String(err)
93 })
94 }
95 }
96 } else if (
97 evt.event === 'delete' &&
98 evt.collection === 'place.wisp.fs'
99 ) {
100 this.log('Received delete event', {
101 did: evt.did,
102 rkey: evt.rkey
103 })
104
105 try {
106 await this.handleDelete(evt.did, evt.rkey)
107 } catch (err) {
108 this.log('Error handling delete', {
109 did: evt.did,
110 rkey: evt.rkey,
111 error:
112 err instanceof Error ? err.message : String(err)
113 })
114 }
115 }
116 },
117 onError: (err: any) => {
118 this.log('Firehose error', {
119 error: err instanceof Error ? err.message : String(err),
120 stack: err instanceof Error ? err.stack : undefined,
121 fullError: err
122 })
123 console.error('Full firehose error:', err)
124 }
125 })
126
127 this.firehose.start()
128 this.log('Firehose started')
129 }
130
131 private async handleCreateOrUpdate(
132 did: string,
133 site: string,
134 record: any,
135 eventCid?: string
136 ) {
137 this.log('Processing create/update', { did, site })
138
139 // Record is already validated in handleEvent
140 const fsRecord = record
141
142 const pdsEndpoint = await getPdsForDid(did)
143 if (!pdsEndpoint) {
144 this.log('Could not resolve PDS for DID', { did })
145 return
146 }
147
148 this.log('Resolved PDS', { did, pdsEndpoint })
149
150 // Verify record exists on PDS and fetch its CID
151 let verifiedCid: string
152 try {
153 const result = await fetchSiteRecord(did, site)
154
155 if (!result) {
156 this.log('Record not found on PDS, skipping cache', {
157 did,
158 site
159 })
160 return
161 }
162
163 verifiedCid = result.cid
164
165 // Verify event CID matches PDS CID (prevent cache poisoning)
166 if (eventCid && eventCid !== verifiedCid) {
167 this.log('CID mismatch detected - potential spoofed event', {
168 did,
169 site,
170 eventCid,
171 verifiedCid
172 })
173 return
174 }
175
176 this.log('Record verified on PDS', { did, site, cid: verifiedCid })
177 } catch (err) {
178 this.log('Failed to verify record on PDS', {
179 did,
180 site,
181 error: err instanceof Error ? err.message : String(err)
182 })
183 return
184 }
185
186 // Invalidate in-memory caches before updating
187 invalidateSiteCache(did, site)
188
189 // Cache the record with verified CID (uses atomic swap internally)
190 // All instances cache locally for edge serving
191 await downloadAndCacheSite(
192 did,
193 site,
194 fsRecord,
195 pdsEndpoint,
196 verifiedCid
197 )
198
199 // Acquire distributed lock only for database write to prevent duplicate writes
200 // Note: upsertSite will check cache-only mode internally and skip if needed
201 const lockKey = `db:upsert:${did}:${site}`
202 const lockAcquired = await tryAcquireLock(lockKey)
203
204 if (!lockAcquired) {
205 this.log('Another instance is writing to DB, skipping upsert', {
206 did,
207 site
208 })
209 this.log('Successfully processed create/update (cached locally)', {
210 did,
211 site
212 })
213 return
214 }
215
216 try {
217 // Upsert site to database (only one instance does this)
218 // In cache-only mode, this will be a no-op
219 await upsertSite(did, site, fsRecord.site)
220 this.log(
221 'Successfully processed create/update (cached + DB updated)',
222 { did, site }
223 )
224 } finally {
225 // Always release lock, even if DB write fails
226 await releaseLock(lockKey)
227 }
228 }
229
230 private async handleDelete(did: string, site: string) {
231 this.log('Processing delete', { did, site })
232
233 // All instances should delete their local cache (no lock needed)
234 const pdsEndpoint = await getPdsForDid(did)
235 if (!pdsEndpoint) {
236 this.log('Could not resolve PDS for DID', { did })
237 return
238 }
239
240 // Verify record is actually deleted from PDS
241 try {
242 const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`
243 const recordRes = await safeFetch(recordUrl)
244
245 if (recordRes.ok) {
246 this.log('Record still exists on PDS, not deleting cache', {
247 did,
248 site
249 })
250 return
251 }
252
253 this.log('Verified record is deleted from PDS', {
254 did,
255 site,
256 status: recordRes.status
257 })
258 } catch (err) {
259 this.log('Error verifying deletion on PDS', {
260 did,
261 site,
262 error: err instanceof Error ? err.message : String(err)
263 })
264 }
265
266 // Invalidate in-memory caches
267 invalidateSiteCache(did, site)
268
269 // Delete disk cache
270 this.deleteCache(did, site)
271
272 this.log('Successfully processed delete', { did, site })
273 }
274
275 private deleteCache(did: string, site: string) {
276 const cacheDir = `${CACHE_DIR}/${did}/${site}`
277
278 if (!existsSync(cacheDir)) {
279 this.log('Cache directory does not exist, nothing to delete', {
280 did,
281 site
282 })
283 return
284 }
285
286 try {
287 rmSync(cacheDir, { recursive: true, force: true })
288 this.log('Cache deleted', { did, site, path: cacheDir })
289 } catch (err) {
290 this.log('Failed to delete cache', {
291 did,
292 site,
293 path: cacheDir,
294 error: err instanceof Error ? err.message : String(err)
295 })
296 }
297 }
298
299 getHealth() {
300 const isConnected = this.firehose !== null
301 const timeSinceLastEvent = Date.now() - this.lastEventTime
302
303 return {
304 connected: isConnected,
305 lastEventTime: this.lastEventTime,
306 timeSinceLastEvent,
307 healthy: isConnected && timeSinceLastEvent < 300000 // 5 minutes
308 }
309 }
310}