1import { JetstreamSubscription } from "@atcute/jetstream";
2import { is, parse, parseCanonicalResourceUri } from "@atcute/lexicons";
3import {
4 SystemsGazeBarometerService,
5 SystemsGazeBarometerCheck,
6 SystemsGazeBarometerState,
7} from "barometer-lexicon";
8import { config } from "./config";
9import store, { type Service } from "./store";
10import { expect, getRecord, log } from "./utils";
11
// Jetstream connection used by the event loop in this module. The options
// request only the two barometer collections (service and check records) and
// only the repo DID taken from the local config.
const subscription = new JetstreamSubscription({
  url: "wss://jetstream2.us-east.bsky.network",
  wantedDids: [config.repoDid],
  wantedCollections: [
    "systems.gaze.barometer.service",
    "systems.gaze.barometer.check",
  ],
});
20
/**
 * Consumes events from the Jetstream subscription and mirrors barometer
 * service/check records into the in-memory `store`.
 *
 * Only `commit` events are handled:
 * - create/update of a service record: the record is validated and stored
 *   when the rkey of its `hostedBy` URI equals `store.hostname`; services
 *   without `hostedBy` or hosted elsewhere are ignored.
 * - create/update of a check record: the record is validated, stored, and
 *   linked to the service named by `forService`. If that service is not in
 *   the store yet it is fetched with `getRecord`; when the fetch fails the
 *   check is skipped and an error is logged.
 * - any other operation (the `else` branch, presumably only `delete`):
 *   removing a service also removes every check linked to it; removing a
 *   check unlinks it from its service.
 *
 * A failure while handling a single event (schema validation, URI parsing,
 * record fetch) is logged and the loop moves on to the next event, so one
 * bad record does not stop consumption. Errors raised by the subscription
 * iterator itself are not caught here.
 *
 * @returns a promise that settles when the subscription's iterator ends.
 */
export const handleEvents = async (): Promise<void> => {
  for await (const event of subscription) {
    if (event.kind !== "commit") {
      continue;
    }
    const { operation, collection, rkey } = event.commit;
    // log.info(`${operation} at://${event.did}/${collection}/${rkey}`);
    try {
      if (operation === "create" || operation === "update") {
        const record = event.commit.record;
        switch (collection) {
          case "systems.gaze.barometer.service": {
            const serviceRecord = parse(
              SystemsGazeBarometerService.mainSchema,
              record,
            );
            // A service that is not attached to any host is of no interest.
            if (!serviceRecord.hostedBy) {
              continue;
            }
            const hostAtUri = expect(
              parseCanonicalResourceUri(serviceRecord.hostedBy),
            );
            // Hosted by some other host, not this one.
            if (hostAtUri.rkey !== store.hostname) {
              continue;
            }
            // Keep the checks already linked to a known service and only
            // swap in the fresh record.
            const service: Service = store.services.get(rkey) ?? {
              record: serviceRecord,
              checks: new Set(),
            };
            store.services.set(rkey, {
              ...service,
              record: serviceRecord,
            });
            break;
          }
          case "systems.gaze.barometer.check": {
            const checkRecord = parse(
              SystemsGazeBarometerCheck.mainSchema,
              record,
            );
            const parsedServiceAtUri = expect(
              parseCanonicalResourceUri(checkRecord.forService),
            );
            let service = store.services.get(parsedServiceAtUri.rkey);
            if (!service) {
              const serviceRecord = await getRecord(
                "systems.gaze.barometer.service",
                parsedServiceAtUri.rkey,
              );
              if (!serviceRecord.ok) {
                // The owning service cannot be resolved; skip this check.
                log.error(
                  `can't fetch service record (${checkRecord.forService}) for check record (at://${event.did}/${collection}/${rkey})`,
                );
                continue;
              }
              service = {
                record: serviceRecord.value,
                checks: new Set(),
              };
            }
            // An update may point the check at a different service. Drop the
            // link held by the previous service first, otherwise deleting
            // that service later would also remove this check from the store.
            const previousCheck = store.checks.get(rkey);
            if (
              previousCheck &&
              previousCheck.record.forService !== checkRecord.forService
            ) {
              const previousServiceAtUri = expect(
                parseCanonicalResourceUri(previousCheck.record.forService),
              );
              store.services
                .get(previousServiceAtUri.rkey)
                ?.checks.delete(rkey);
            }
            service.checks.add(rkey);
            store.checks.set(rkey, { record: checkRecord });
            store.services.set(parsedServiceAtUri.rkey, service);
            break;
          }
        }
      } else {
        switch (collection) {
          case "systems.gaze.barometer.service": {
            const service = store.services.get(rkey);
            if (!service) {
              continue;
            }
            // Checks cannot outlive the service they belong to.
            for (const checkKey of service.checks) {
              store.checks.delete(checkKey);
            }
            store.services.delete(rkey);
            break;
          }
          case "systems.gaze.barometer.check": {
            const check = store.checks.get(rkey);
            if (!check) {
              continue;
            }
            const parsedServiceAtUri = expect(
              parseCanonicalResourceUri(check.record.forService),
            );
            const service = store.services.get(parsedServiceAtUri.rkey);
            if (service) {
              service.checks.delete(rkey);
              store.services.set(parsedServiceAtUri.rkey, service);
            }
            store.checks.delete(rkey);
            break;
          }
        }
      }
    } catch (error: unknown) {
      // Isolate per-event failures so the stream keeps being consumed.
      const reason = error instanceof Error ? error.message : String(error);
      log.error(
        `failed to handle ${operation} at://${event.did}/${collection}/${rkey}: ${reason}`,
      );
    }
  }
};