forked from
nekomimi.pet/wisp.place-monorepo
Monorepo for Wisp.place. A static site hosting service built on top of the AT Protocol.
1import { existsSync, rmSync } from 'fs';
2import { getPdsForDid, downloadAndCacheSite, extractBlobCid, fetchSiteRecord } from './utils';
3import { upsertSite, tryAcquireLock, releaseLock } from './db';
4import { safeFetch } from './safe-fetch';
5import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs';
6import { Firehose } from '@atproto/sync';
7import { IdResolver } from '@atproto/identity';
8
// Base directory of the on-disk site cache. FirehoseWorker.deleteCache removes
// `${CACHE_DIR}/<did>/<site>` when a site record is deleted.
9const CACHE_DIR = './cache/sites';
10
/**
 * Subscribes to the AT Protocol firehose and reacts to `place.wisp.fs` record
 * events:
 *
 * - create/update: the record is re-fetched from the owner's PDS, its CID is
 *   compared with the CID carried by the event, the site is downloaded into
 *   the local cache, and the site row is upserted under a distributed lock.
 * - delete: the local cache directory for the site is removed unless the PDS
 *   still serves the record.
 *
 * Errors raised while handling a single event are logged and never propagate
 * out of the event handler.
 */
export class FirehoseWorker {
  /** Collection (NSID) this worker reacts to. */
  private static readonly COLLECTION = 'place.wisp.fs';

  /** getHealth() reports unhealthy once no event was seen for this long (5 minutes). */
  private static readonly HEALTHY_EVENT_WINDOW_MS = 300000;

  private firehose: Firehose | null = null;
  private readonly idResolver: IdResolver;
  private isShuttingDown = false;
  private lastEventTime = Date.now();

  /**
   * @param logger Optional sink for log lines; `console.log` is used when omitted.
   */
  constructor(
    private logger?: (msg: string, data?: Record<string, unknown>) => void,
  ) {
    this.idResolver = new IdResolver();
  }

  /** Writes one prefixed log line to the injected logger (or `console.log`). */
  private log(msg: string, data?: Record<string, unknown>) {
    const sink = this.logger ?? console.log;
    sink(`[FirehoseWorker] ${msg}`, data || {});
  }

  /** Renders a caught value of unknown type as a loggable message. */
  private static describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  /**
   * True when `segment` can be used as a single directory name: non-empty,
   * not `.`/`..`, and free of path separators and NUL bytes.
   */
  private static isSafePathSegment(segment: string): boolean {
    return (
      segment.length > 0 &&
      segment !== '.' &&
      segment !== '..' &&
      !/[\\\/\0]/.test(segment)
    );
  }

  /**
   * Starts consuming the firehose. Safe to call again after stop(); calling it
   * while already running is a no-op.
   */
  start() {
    this.log('Starting firehose worker');
    // stop() latches this flag; clear it so the worker can be restarted
    // instead of logging "Starting" and then silently doing nothing.
    this.isShuttingDown = false;
    this.connect();
  }

  /** Stops consuming the firehose and releases the subscription. */
  stop() {
    this.log('Stopping firehose worker');
    this.isShuttingDown = true;

    const active = this.firehose;
    if (!active) return;

    this.firehose = null;
    // destroy() is expected to return a promise (as in @atproto/sync);
    // Promise.resolve() keeps this correct either way, and the catch prevents
    // an unhandled rejection.
    void Promise.resolve(active.destroy()).catch((err: unknown) => {
      this.log('Error while destroying firehose', {
        error: FirehoseWorker.describeError(err),
      });
    });
  }

  /** Creates the firehose subscription and starts it, unless shutting down or already connected. */
  private connect() {
    if (this.isShuttingDown) return;

    // A second subscription would orphan the first one: stop() only knows
    // about the most recent instance, so the older socket would leak.
    if (this.firehose) {
      this.log('Firehose already connected, ignoring connect request');
      return;
    }

    this.log('Connecting to AT Protocol firehose');

    const firehose = new Firehose({
      idResolver: this.idResolver,
      service: 'wss://bsky.network',
      filterCollections: [FirehoseWorker.COLLECTION],
      // eslint-disable-next-line @typescript-eslint/no-explicit-any -- event shape is owned by the firehose library
      handleEvent: async (evt: any) => {
        this.lastEventTime = Date.now();

        // Watch for write events
        if (evt.event === 'create' || evt.event === 'update') {
          const record = evt.record;

          // Only act on writes that carry a valid place.wisp.fs record
          const isValidSiteRecord =
            evt.collection === FirehoseWorker.COLLECTION &&
            isRecord(record) &&
            validateRecord(record).success;
          if (!isValidSiteRecord) return;

          this.log('Received place.wisp.fs event', {
            did: evt.did,
            event: evt.event,
            rkey: evt.rkey,
          });

          try {
            await this.handleCreateOrUpdate(evt.did, evt.rkey, record, evt.cid?.toString());
          } catch (err) {
            this.log('Error handling event', {
              did: evt.did,
              event: evt.event,
              rkey: evt.rkey,
              error: FirehoseWorker.describeError(err),
            });
          }
        } else if (evt.event === 'delete' && evt.collection === FirehoseWorker.COLLECTION) {
          this.log('Received delete event', {
            did: evt.did,
            rkey: evt.rkey,
          });

          try {
            await this.handleDelete(evt.did, evt.rkey);
          } catch (err) {
            this.log('Error handling delete', {
              did: evt.did,
              rkey: evt.rkey,
              error: FirehoseWorker.describeError(err),
            });
          }
        }
      },
      // eslint-disable-next-line @typescript-eslint/no-explicit-any -- error shape is owned by the firehose library
      onError: (err: any) => {
        this.log('Firehose error', {
          error: FirehoseWorker.describeError(err),
          stack: err instanceof Error ? err.stack : undefined,
          fullError: err,
        });
        console.error('Full firehose error:', err);
      },
    });

    this.firehose = firehose;

    // start() is expected to return a long-lived promise (as in
    // @atproto/sync); it is intentionally not awaited, but a rejection must
    // not go unobserved.
    void Promise.resolve(firehose.start()).catch((err: unknown) => {
      this.log('Firehose terminated unexpectedly', {
        error: FirehoseWorker.describeError(err),
      });
    });
    this.log('Firehose started');
  }

  /**
   * Handles a created or updated site record.
   *
   * @param did      Repository (account) that owns the record.
   * @param site     Record key of the site.
   * @param record   Record body; the event handler has already validated it.
   * @param eventCid CID announced by the event, if any. When present it must
   *                 equal the CID returned by fetchSiteRecord or the event is ignored.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- record type comes from the generated lexicon
  private async handleCreateOrUpdate(did: string, site: string, record: any, eventCid?: string) {
    this.log('Processing create/update', { did, site });

    // Record is already validated in handleEvent
    const fsRecord = record;

    const pdsEndpoint = await getPdsForDid(did);
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did });
      return;
    }

    this.log('Resolved PDS', { did, pdsEndpoint });

    // Verify record exists on PDS and fetch its CID
    let verifiedCid: string;
    try {
      const result = await fetchSiteRecord(did, site);

      if (!result) {
        this.log('Record not found on PDS, skipping cache', { did, site });
        return;
      }

      verifiedCid = result.cid;

      // Verify event CID matches PDS CID (prevent cache poisoning)
      if (eventCid && eventCid !== verifiedCid) {
        this.log('CID mismatch detected - potential spoofed event', {
          did,
          site,
          eventCid,
          verifiedCid
        });
        return;
      }

      this.log('Record verified on PDS', { did, site, cid: verifiedCid });
    } catch (err) {
      this.log('Failed to verify record on PDS', {
        did,
        site,
        error: FirehoseWorker.describeError(err),
      });
      return;
    }

    // Cache the record with the verified CID. This runs on every instance,
    // before and independently of the DB lock below.
    await downloadAndCacheSite(did, site, fsRecord, pdsEndpoint, verifiedCid);

    // Acquire distributed lock only for database write to prevent duplicate writes
    const lockKey = `db:upsert:${did}:${site}`;
    const lockAcquired = await tryAcquireLock(lockKey);

    if (!lockAcquired) {
      this.log('Another instance is writing to DB, skipping upsert', { did, site });
      this.log('Successfully processed create/update (cached locally)', { did, site });
      return;
    }

    try {
      // Upsert site to database (only the lock holder does this)
      await upsertSite(did, site, fsRecord.site);
      this.log('Successfully processed create/update (cached + DB updated)', { did, site });
    } finally {
      // Always release lock, even if DB write fails
      await releaseLock(lockKey);
    }
  }

  /**
   * Handles a deleted site record by removing its local cache directory.
   *
   * The cache is kept when the PDS still answers the getRecord request with a
   * successful status.
   *
   * NOTE(review): when the verification request itself throws, the error is
   * logged and the cache is still deleted - confirm this best-effort
   * behaviour is intended.
   *
   * @param did  Repository (account) that owned the record.
   * @param site Record key of the deleted site.
   */
  private async handleDelete(did: string, site: string) {
    this.log('Processing delete', { did, site });

    // All instances should delete their local cache (no lock needed)
    const pdsEndpoint = await getPdsForDid(did);
    if (!pdsEndpoint) {
      this.log('Could not resolve PDS for DID', { did });
      return;
    }

    // Verify record is actually deleted from PDS
    try {
      const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(site)}`;
      const recordRes = await safeFetch(recordUrl);

      if (recordRes.ok) {
        this.log('Record still exists on PDS, not deleting cache', {
          did,
          site,
        });
        return;
      }

      this.log('Verified record is deleted from PDS', {
        did,
        site,
        status: recordRes.status,
      });
    } catch (err) {
      this.log('Error verifying deletion on PDS', {
        did,
        site,
        error: FirehoseWorker.describeError(err),
      });
    }

    // Delete cache
    this.deleteCache(did, site);

    this.log('Successfully processed delete', { did, site });
  }

  /**
   * Recursively removes `${CACHE_DIR}/<did>/<site>` if it exists.
   *
   * `did` and `site` originate from firehose events, so both are checked to
   * be plain directory names before they are joined into a path that is
   * handed to a recursive, forced delete. Failures are logged, not thrown.
   */
  private deleteCache(did: string, site: string) {
    // Containment check: a segment such as ".." or "a/../../b" would move the
    // target outside CACHE_DIR.
    if (!FirehoseWorker.isSafePathSegment(did) || !FirehoseWorker.isSafePathSegment(site)) {
      this.log('Refusing to delete cache for unsafe path segments', { did, site });
      return;
    }

    const cacheDir = `${CACHE_DIR}/${did}/${site}`;

    if (!existsSync(cacheDir)) {
      this.log('Cache directory does not exist, nothing to delete', {
        did,
        site,
      });
      return;
    }

    try {
      rmSync(cacheDir, { recursive: true, force: true });
      this.log('Cache deleted', { did, site, path: cacheDir });
    } catch (err) {
      this.log('Failed to delete cache', {
        did,
        site,
        path: cacheDir,
        error: FirehoseWorker.describeError(err),
      });
    }
  }

  /**
   * Reports liveness of the worker.
   *
   * `connected` only reflects that a firehose instance is currently held;
   * `healthy` additionally requires an event within the last 5 minutes.
   * `lastEventTime` is initialised at construction, so a fresh worker counts
   * as having just seen an event.
   */
  getHealth() {
    const isConnected = this.firehose !== null;
    const timeSinceLastEvent = Date.now() - this.lastEventTime;

    return {
      connected: isConnected,
      lastEventTime: this.lastEventTime,
      timeSinceLastEvent,
      healthy: isConnected && timeSinceLastEvent < FirehoseWorker.HEALTHY_EVENT_WINDOW_MS,
    };
  }
}