1import { JetstreamSubscription } from "@atcute/jetstream";
2import {
3 is,
4 parse,
5 parseCanonicalResourceUri,
6 type RecordKey,
7} from "@atcute/lexicons";
8import {
9 SystemsGazeBarometerService,
10 SystemsGazeBarometerCheck,
11} from "barometer-lexicon";
12import { config } from "./config";
13import store, { type Service } from "./store";
14import { expect, getRecord, getUri, log, type ServiceUri } from "./utils";
15
16const subscription = new JetstreamSubscription({
17 url: "wss://jetstream2.us-east.bsky.network",
18 wantedCollections: [
19 "systems.gaze.barometer.service",
20 "systems.gaze.barometer.check",
21 "systems.gaze.barometer.state",
22 ],
23 wantedDids: [config.repoDid],
24});
25
26const handleService = async (
27 record: Record<string, unknown>,
28 rkey: RecordKey,
29) => {
30 const collection = "systems.gaze.barometer.service";
31 const serviceRecord = parse(SystemsGazeBarometerService.mainSchema, record);
32 // we dont care if its a dangling service
33 if (!serviceRecord.hostedBy) return true;
34 const hostAtUri = expect(parseCanonicalResourceUri(serviceRecord.hostedBy));
35 // not our host
36 if (hostAtUri.rkey !== store.hostname) return true;
37 const serviceUri = getUri(collection, rkey);
38 const service: Service = store.services.get(serviceUri) ?? {
39 record: serviceRecord,
40 checks: new Set(),
41 rkey,
42 };
43 store.services.set(serviceUri, {
44 ...service,
45 record: serviceRecord,
46 });
47 return false;
48};
49
50const handleCheck = async (
51 record: Record<string, unknown>,
52 rkey: RecordKey,
53) => {
54 const collection = "systems.gaze.barometer.check";
55 const checkRecord = parse(SystemsGazeBarometerCheck.mainSchema, record);
56 const checkUri = getUri(collection, rkey);
57 const serviceUri = checkRecord.forService as ServiceUri;
58 const maybeService = await store.getOrFetch(serviceUri);
59 if (!maybeService.ok) {
60 log.error(
61 `can't fetch service record (${serviceUri}) for check record (${checkUri})`,
62 );
63 return true;
64 }
65 const service = maybeService.value;
66 service.checks.add(rkey);
67 store.checks.set(checkUri, {
68 record: checkRecord,
69 rkey,
70 });
71 store.services.set(serviceUri, service);
72 return false;
73};
74
75export const handleEvents = async () => {
76 for await (const event of subscription) {
77 if (event.kind !== "commit") continue;
78 const { operation, collection, rkey } = event.commit;
79 // log.info(`${operation} at://${event.did}/${collection}/${rkey}`);
80 if (operation === "create" || operation === "update") {
81 const record = event.commit.record;
82 switch (collection) {
83 case "systems.gaze.barometer.service": {
84 if (await handleService(record, rkey)) continue;
85 break;
86 }
87 case "systems.gaze.barometer.check": {
88 if (await handleCheck(record, rkey)) continue;
89 break;
90 }
91 }
92 } else {
93 switch (collection) {
94 case "systems.gaze.barometer.service": {
95 const serviceUri = getUri(collection, rkey);
96 const service = store.services.get(serviceUri);
97 if (!service) continue;
98 for (const checkRkey of service.checks) {
99 store.checks.delete(
100 getUri("systems.gaze.barometer.check", checkRkey),
101 );
102 }
103 store.services.delete(serviceUri);
104 break;
105 }
106 case "systems.gaze.barometer.check": {
107 const checkUri = getUri(collection, rkey);
108 const check = store.checks.get(checkUri);
109 if (!check) continue;
110 const serviceUri = check.record.forService as ServiceUri;
111 const service = store.services.get(serviceUri);
112 if (service) {
113 service.checks.delete(rkey);
114 store.services.set(serviceUri, service);
115 }
116 store.checks.delete(checkUri);
117 break;
118 }
119 case "systems.gaze.barometer.state": {
120 store.states.delete(getUri(collection, rkey));
121 break;
122 }
123 }
124 }
125 }
126};