service status on atproto
import { JetstreamSubscription } from "@atcute/jetstream";
import { is, parse, parseCanonicalResourceUri } from "@atcute/lexicons";
import {
  SystemsGazeBarometerService,
  SystemsGazeBarometerCheck,
  SystemsGazeBarometerState,
} from "barometer-lexicon";
import { config } from "./config";
import store, { type Service } from "./store";
import { expect, getRecord, log } from "./utils";

// Jetstream subscription limited to the barometer service/check collections
// of the configured repo DID.
const subscription = new JetstreamSubscription({
  url: "wss://jetstream2.us-east.bsky.network",
  wantedCollections: [
    "systems.gaze.barometer.service",
    "systems.gaze.barometer.check",
  ],
  wantedDids: [config.repoDid],
});

/**
 * Consumes commit events from the Jetstream subscription and mirrors them
 * into the in-memory `store`:
 *
 * - service create/update: stores the record if it is hosted by this host
 *   (`hostedBy` rkey equals `store.hostname`), keeping any known checks.
 * - check create/update: stores the check and links it to its service,
 *   fetching the service record via `getRecord` when it is not known yet.
 * - service delete: removes the service and every check linked to it.
 * - check delete: removes the check and unlinks it from its service.
 *
 * A failure while handling a single event (a record that fails `parse`, or a
 * bad at-uri that `expect` presumably rejects by throwing) is logged and the
 * event is skipped, so one malformed record cannot end the loop. Errors
 * raised by the subscription iterator itself still propagate to the caller.
 *
 * @returns a promise that settles only when the subscription iterator ends
 *   or fails.
 */
export const handleEvents = async () => {
  for await (const event of subscription) {
    if (event.kind !== "commit") {
      continue;
    }
    const { operation, collection, rkey } = event.commit;
    // log.info(`${operation} at://${event.did}/${collection}/${rkey}`);
    try {
      if (operation === "create" || operation === "update") {
        const record = event.commit.record;
        switch (collection) {
          case "systems.gaze.barometer.service": {
            const serviceRecord = parse(
              SystemsGazeBarometerService.mainSchema,
              record,
            );
            // we dont care if its a dangling service
            if (!serviceRecord.hostedBy) {
              continue;
            }
            const hostAtUri = expect(
              parseCanonicalResourceUri(serviceRecord.hostedBy),
            );
            // not our host
            if (hostAtUri.rkey !== store.hostname) {
              continue;
            }
            // keep the checks already linked to a known service, only the
            // record itself is replaced
            const service: Service = store.services.get(rkey) ?? {
              record: serviceRecord,
              checks: new Set(),
            };
            store.services.set(rkey, {
              ...service,
              record: serviceRecord,
            });
            break;
          }
          case "systems.gaze.barometer.check": {
            const checkRecord = parse(
              SystemsGazeBarometerCheck.mainSchema,
              record,
            );
            const parsedServiceAtUri = expect(
              parseCanonicalResourceUri(checkRecord.forService),
            );
            let service = store.services.get(parsedServiceAtUri.rkey);
            if (!service) {
              // NOTE(review): unlike the service branch above, the fetched
              // record is not filtered by `hostedBy` — confirm this is
              // intended.
              const serviceRecord = await getRecord(
                "systems.gaze.barometer.service",
                parsedServiceAtUri.rkey,
              );
              if (!serviceRecord.ok) {
                // cant get service record
                log.error(
                  `can't fetch service record (${checkRecord.forService}) for check record (at://${event.did}/${collection}/${rkey})`,
                );
                continue;
              }
              service = {
                record: serviceRecord.value,
                checks: new Set(),
              };
            }
            // if this check used to point at another service, unlink it
            // there first; otherwise deleting the old service later would
            // also drop this check from the store
            const previousCheck = store.checks.get(rkey);
            if (
              previousCheck &&
              previousCheck.record.forService !== checkRecord.forService
            ) {
              const previousServiceAtUri = expect(
                parseCanonicalResourceUri(previousCheck.record.forService),
              );
              const previousService = store.services.get(
                previousServiceAtUri.rkey,
              );
              if (previousService) {
                previousService.checks.delete(rkey);
                store.services.set(previousServiceAtUri.rkey, previousService);
              }
            }
            service.checks.add(rkey);
            store.checks.set(rkey, { record: checkRecord });
            store.services.set(parsedServiceAtUri.rkey, service);
            break;
          }
        }
      } else {
        switch (collection) {
          case "systems.gaze.barometer.service": {
            const service = store.services.get(rkey);
            if (!service) {
              continue;
            }
            // a service takes its checks with it
            for (const checkKey of service.checks) {
              store.checks.delete(checkKey);
            }
            store.services.delete(rkey);
            break;
          }
          case "systems.gaze.barometer.check": {
            const check = store.checks.get(rkey);
            if (!check) {
              continue;
            }
            const parsedServiceAtUri = expect(
              parseCanonicalResourceUri(check.record.forService),
            );
            const service = store.services.get(parsedServiceAtUri.rkey);
            if (service) {
              service.checks.delete(rkey);
              store.services.set(parsedServiceAtUri.rkey, service);
            }
            store.checks.delete(rkey);
            break;
          }
        }
      }
    } catch (err) {
      // skip the offending event instead of tearing down the whole loop
      const reason = err instanceof Error ? err.message : String(err);
      log.error(
        `failed to handle ${operation} at://${event.did}/${collection}/${rkey}: ${reason}`,
      );
    }
  }
};