service status on atproto
at main 4.0 kB view raw
import { JetstreamSubscription } from "@atcute/jetstream";
import {
  is,
  parse,
  parseCanonicalResourceUri,
  type RecordKey,
} from "@atcute/lexicons";
import {
  SystemsGazeBarometerService,
  SystemsGazeBarometerCheck,
} from "barometer-lexicon";
import { config } from "./config";
import store, { type Service } from "./store";
import { expect, getRecord, getUri, log, type ServiceUri } from "./utils";

// Jetstream feed restricted to the barometer collections of the configured repo.
const subscription = new JetstreamSubscription({
  url: "wss://jetstream2.us-east.bsky.network",
  wantedCollections: [
    "systems.gaze.barometer.service",
    "systems.gaze.barometer.check",
    "systems.gaze.barometer.state",
  ],
  wantedDids: [config.repoDid],
});

/**
 * Applies a created/updated service record to the local store.
 *
 * Services without `hostedBy`, or whose host record key differs from
 * `store.hostname`, are ignored. For a service that is ours, the stored entry
 * keeps its existing `checks` set (if any) and gets the new record.
 *
 * @param record raw record payload from the commit event
 * @param rkey record key of the service record
 * @returns `true` when the record was ignored, `false` when it was stored
 * @throws when the record fails schema validation or `hostedBy` cannot be
 *   parsed (presumably — depends on `parse` / `expect` semantics; confirm)
 */
const handleService = async (
  record: Record<string, unknown>,
  rkey: RecordKey,
): Promise<boolean> => {
  const collection = "systems.gaze.barometer.service";
  const serviceRecord = parse(SystemsGazeBarometerService.mainSchema, record);
  // we dont care if its a dangling service
  // NOTE(review): if this service was stored earlier and an update removes or
  // changes `hostedBy`, the stored entry is left untouched — confirm that is
  // intended.
  if (!serviceRecord.hostedBy) return true;
  const hostAtUri = expect(parseCanonicalResourceUri(serviceRecord.hostedBy));
  // not our host
  if (hostAtUri.rkey !== store.hostname) return true;
  const serviceUri = getUri(collection, rkey);
  const service: Service = store.services.get(serviceUri) ?? {
    record: serviceRecord,
    checks: new Set(),
    rkey,
  };
  // Always overwrite the record so updates replace the previous version while
  // the already-registered checks are preserved.
  store.services.set(serviceUri, {
    ...service,
    record: serviceRecord,
  });
  return false;
};

/**
 * Applies a created/updated check record to the local store and links it to
 * the service it belongs to.
 *
 * @param record raw record payload from the commit event
 * @param rkey record key of the check record
 * @returns `true` when the owning service could not be resolved (the check is
 *   skipped and an error is logged), `false` when the check was stored
 * @throws when the record fails schema validation (presumably — depends on
 *   `parse` semantics; confirm)
 */
const handleCheck = async (
  record: Record<string, unknown>,
  rkey: RecordKey,
): Promise<boolean> => {
  const collection = "systems.gaze.barometer.check";
  const checkRecord = parse(SystemsGazeBarometerCheck.mainSchema, record);
  const checkUri = getUri(collection, rkey);
  const serviceUri = checkRecord.forService as ServiceUri;
  const maybeService = await store.getOrFetch(serviceUri);
  if (!maybeService.ok) {
    log.error(
      `can't fetch service record (${serviceUri}) for check record (${checkUri})`,
    );
    return true;
  }
  const service = maybeService.value;
  // NOTE(review): if an update changes `forService`, the rkey is not removed
  // from the previous service's `checks` set — confirm whether that matters.
  service.checks.add(rkey);
  store.checks.set(checkUri, {
    record: checkRecord,
    rkey,
  });
  store.services.set(serviceUri, service);
  return false;
};

/**
 * Consumes the Jetstream subscription and mirrors commit events into the
 * local store.
 *
 * - create/update: service and check records are parsed and stored.
 * - delete: the matching service (with all of its checks), check, or state
 *   entry is removed from the store.
 *
 * A failure while handling a single event is logged and the loop moves on to
 * the next event, so one malformed record cannot stop event processing.
 *
 * @returns a promise that settles when the subscription's iterator ends
 */
export const handleEvents = async (): Promise<void> => {
  for await (const event of subscription) {
    if (event.kind !== "commit") continue;
    const { operation, collection, rkey } = event.commit;
    // log.info(`${operation} at://${event.did}/${collection}/${rkey}`);
    try {
      if (operation === "create" || operation === "update") {
        const record = event.commit.record;
        // NOTE(review): create/update of "systems.gaze.barometer.state" is
        // not handled here — confirm that is intentional.
        switch (collection) {
          case "systems.gaze.barometer.service": {
            await handleService(record, rkey);
            break;
          }
          case "systems.gaze.barometer.check": {
            await handleCheck(record, rkey);
            break;
          }
        }
      } else {
        switch (collection) {
          case "systems.gaze.barometer.service": {
            const serviceUri = getUri(collection, rkey);
            const service = store.services.get(serviceUri);
            if (!service) continue;
            // Drop every check registered under the service before dropping
            // the service itself.
            for (const checkRkey of service.checks) {
              store.checks.delete(
                getUri("systems.gaze.barometer.check", checkRkey),
              );
            }
            store.services.delete(serviceUri);
            break;
          }
          case "systems.gaze.barometer.check": {
            const checkUri = getUri(collection, rkey);
            const check = store.checks.get(checkUri);
            if (!check) continue;
            const serviceUri = check.record.forService as ServiceUri;
            const service = store.services.get(serviceUri);
            if (service) {
              // Unlink the check from its service, then write the service back.
              service.checks.delete(rkey);
              store.services.set(serviceUri, service);
            }
            store.checks.delete(checkUri);
            break;
          }
          case "systems.gaze.barometer.state": {
            store.states.delete(getUri(collection, rkey));
            break;
          }
        }
      }
    } catch (err: unknown) {
      // Keep the event loop alive: report the failing event and carry on.
      const reason = err instanceof Error ? err.message : String(err);
      log.error(
        `failed to handle ${operation} at://${event.did}/${collection}/${rkey}: ${reason}`,
      );
    }
  }
};