service status on atproto

feat: handle relevant events from the jetstream

ptr.pet 7ad815b1 7ed262bd

verified
Changed files
+171 -10
proxy
+15
proxy/bun.lock
···
"@atcute/client": "^4.0.3",
"@atcute/identity": "^1.0.3",
"@atcute/identity-resolver": "^1.1.3",
+
"@atcute/jetstream": "^1.0.2",
"@atcute/lexicons": "^1.1.0",
"@atcute/tid": "^1.0.2",
"barometer-lexicon": "file:../lib",
···
"@atcute/identity-resolver": ["@atcute/identity-resolver@1.1.3", "", { "dependencies": { "@atcute/lexicons": "^1.0.4", "@atcute/util-fetch": "^1.0.1", "@badrap/valita": "^0.4.4" }, "peerDependencies": { "@atcute/identity": "^1.0.0" } }, "sha512-KZgGgg99CWaV7Df3+h3X/WMrDzTPQVfsaoIVbTNLx2B56BvCL2EmaxPSVw/7BFUJMZHlVU4rtoEB4lyvNyMswA=="],
+
"@atcute/jetstream": ["@atcute/jetstream@1.0.2", "", { "dependencies": { "@atcute/lexicons": "^1.0.2", "@badrap/valita": "^0.4.2", "@mary-ext/event-iterator": "^1.0.0", "@mary-ext/simple-event-emitter": "^1.0.0", "partysocket": "^1.1.4", "type-fest": "^4.41.0", "yocto-queue": "^1.2.1" } }, "sha512-ZtdNNxl4zq9cgUpXSL9F+AsXUZt0Zuyj0V7974D7LxdMxfTItPnMZ9dRG8GoFkkGz3+pszdsG888Ix8C0F2+mA=="],
+
"@atcute/lex-cli": ["@atcute/lex-cli@2.1.1", "", { "dependencies": { "@atcute/lexicon-doc": "^1.0.2", "@badrap/valita": "^0.4.5", "@externdefs/collider": "^0.3.0", "picocolors": "^1.1.1", "prettier": "^3.5.3" }, "bin": { "lex-cli": "cli.mjs" } }, "sha512-QaR0sOP8Z24opGHKsSfleDbP/ahUb6HECkVaOqSwG7ORZzbLK1w0265o1BRjCVr2dT6FxlsMUa2Ge85JMA9bxg=="],
"@atcute/lexicon-doc": ["@atcute/lexicon-doc@1.0.3", "", { "dependencies": { "@badrap/valita": "^0.4.5" } }, "sha512-U7rinsTOwXGGcrF6/s7GzTXargcQpDr4BTrj5ci/XTK+POEK5jpcI+Ag1fF932pBX3k97em6y4TWwTSO8M/McQ=="],
···
"@externdefs/collider": ["@externdefs/collider@0.3.0", "", { "peerDependencies": { "@badrap/valita": "^0.4.4" } }, "sha512-x5CpeZ4c8n+1wMFthUMWSQKqCGcQo52/Qbda5ES+JFRRg/D8Ep6/JOvUUq5HExFuv/wW+6UYG2U/mXzw0IAd8Q=="],
+
"@mary-ext/event-iterator": ["@mary-ext/event-iterator@1.0.0", "", { "dependencies": { "yocto-queue": "^1.2.1" } }, "sha512-l6gCPsWJ8aRCe/s7/oCmero70kDHgIK5m4uJvYgwEYTqVxoBOIXbKr5tnkLqUHEg6mNduB4IWvms3h70Hp9ADQ=="],
+
+
"@mary-ext/simple-event-emitter": ["@mary-ext/simple-event-emitter@1.0.0", "", {}, "sha512-meA/zJZKIN1RVBNEYIbjufkUrW7/tRjHH60FjolpG1ixJKo76TB208qefQLNdOVDA7uIG0CGEDuhmMirtHKLAg=="],
+
"@types/bun": ["@types/bun@1.2.18", "", { "dependencies": { "bun-types": "1.2.18" } }, "sha512-Xf6RaWVheyemaThV0kUfaAUvCNokFr+bH8Jxp+tTZfx7dAPA8z9ePnP9S9+Vspzuxxx9JRAXhnyccRj3GyCMdQ=="],
"@types/node": ["@types/node@24.0.13", "", { "dependencies": { "undici-types": "~7.8.0" } }, "sha512-Qm9OYVOFHFYg3wJoTSrz80hoec5Lia/dPp84do3X7dZvLikQvM1YpmvTBEdIr/e+U8HTkFjLHLnl78K/qjf+jQ=="],
···
"esm-env": ["esm-env@1.2.2", "", {}, "sha512-Epxrv+Nr/CaL4ZcFGPJIYLWFom+YeV1DqMLHJoEd9SYRxNbaFruBwfEX/kkHUJf55j2+TUbmDcmuilbP1TmXHA=="],
+
"event-target-polyfill": ["event-target-polyfill@0.0.4", "", {}, "sha512-Gs6RLjzlLRdT8X9ZipJdIZI/Y6/HhRLyq9RdDlCsnpxr/+Nn6bU2EFGuC94GjxqhM+Nmij2Vcq98yoHrU8uNFQ=="],
+
"get-caller-file": ["get-caller-file@2.0.5", "", {}, "sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg=="],
"has-flag": ["has-flag@4.0.0", "", {}, "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ=="],
···
"lodash": ["lodash@4.17.21", "", {}, "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg=="],
"parsimmon": ["parsimmon@1.18.1", "", {}, "sha512-u7p959wLfGAhJpSDJVYXoyMCXWYwHia78HhRBWqk7AIbxdmlrfdp5wX0l3xv/iTSH5HvhN9K7o26hwwpgS5Nmw=="],
+
+
"partysocket": ["partysocket@1.1.4", "", { "dependencies": { "event-target-polyfill": "^0.0.4" } }, "sha512-jXP7PFj2h5/v4UjDS8P7MZy6NJUQ7sspiFyxL4uc/+oKOL+KdtXzHnTV8INPGxBrLTXgalyG3kd12Qm7WrYc3A=="],
"picocolors": ["picocolors@1.1.1", "", {}, "sha512-xceH2snhtb5M9liqDsmEw56le376mTZkEX/jEb/RxNFyegNul7eNslCXP9FDj/Lcu0X8KEyMceP2ntpaHrDEVA=="],
···
"tslib": ["tslib@2.8.1", "", {}, "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w=="],
+
"type-fest": ["type-fest@4.41.0", "", {}, "sha512-TeTSQ6H5YHvpqVwBRcnLDCBnDOHWYu7IvGbHT6N8AOymcr9PJGjc1GTtiWZTYg0NCgYwvnYWEkVChQAr9bjfwA=="],
+
"typescript": ["typescript@5.8.3", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-p1diW6TqL9L07nNxvRMM7hMMw4c5XOo/1ibL4aAIGmSAt9slTE1Xgw5KWuof2uTOvCg9BY7ZRi+GaF+7sfgPeQ=="],
"undici-types": ["undici-types@7.8.0", "", {}, "sha512-9UJ2xGDvQ43tYyVMpuHlsgApydB8ZKfVYTsLDhXkFL/6gfkp+U8xTGdh8pMJv1SpZna0zxG1DwsKZsreLbXBxw=="],
···
"yargs": ["yargs@17.7.2", "", { "dependencies": { "cliui": "^8.0.1", "escalade": "^3.1.1", "get-caller-file": "^2.0.5", "require-directory": "^2.1.1", "string-width": "^4.2.3", "y18n": "^5.0.5", "yargs-parser": "^21.1.1" } }, "sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w=="],
"yargs-parser": ["yargs-parser@21.1.1", "", {}, "sha512-tVpsJW7DdjecAiFpbIB1e3qxIQsE6NoPc5/eTdrbbIC4h0LVsWhnoa3g+m2HclBIujHzsxZ4VJVA+GUuc2/LBw=="],
+
+
"yocto-queue": ["yocto-queue@1.2.1", "", {}, "sha512-AyeEbWOu/TAXdxlV9wmGcR0+yh2j3vYPGOECcIj2S7MkrLyC7ne+oye2BKTItt0ii2PHk4cDy+95+LshzbXnGg=="],
"chalk/supports-color": ["supports-color@7.2.0", "", { "dependencies": { "has-flag": "^4.0.0" } }, "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw=="],
}
+1
proxy/package.json
···
"@atcute/client": "^4.0.3",
"@atcute/identity": "^1.0.3",
"@atcute/identity-resolver": "^1.1.3",
+
"@atcute/jetstream": "^1.0.2",
"@atcute/lexicons": "^1.1.0",
"@atcute/tid": "^1.0.2",
"barometer-lexicon": "file:../lib",
+3
proxy/src/index.ts
···
} from "./utils";
import store from "./store";
import routes from "./routes";
+
import { handleEvents } from "./jetstream";
const docResolver = new CompositeDidDocumentResolver({
methods: {
···
});
console.log(`server running on http://localhost:${server.port}`);
+
+
await handleEvents();
+121
proxy/src/jetstream.ts
···
+
import { JetstreamSubscription } from "@atcute/jetstream";
+
import { is, parse, parseCanonicalResourceUri } from "@atcute/lexicons";
+
import {
+
SystemsGazeBarometerService,
+
SystemsGazeBarometerCheck,
+
SystemsGazeBarometerState,
+
} from "barometer-lexicon";
+
import { config } from "./config";
+
import store, { type Service } from "./store";
+
import { expect, getRecord, log } from "./utils";
+
+
const subscription = new JetstreamSubscription({
+
url: "wss://jetstream2.us-east.bsky.network",
+
wantedCollections: [
+
"systems.gaze.barometer.service",
+
"systems.gaze.barometer.check",
+
],
+
wantedDids: [config.repoDid],
+
});
+
+
export const handleEvents = async () => {
+
for await (const event of subscription) {
+
if (event.kind !== "commit") {
+
continue;
+
}
+
const { operation, collection, rkey } = event.commit;
+
// log.info(`${operation} at://${event.did}/${collection}/${rkey}`);
+
if (operation === "create" || operation === "update") {
+
const record = event.commit.record;
+
switch (collection) {
+
case "systems.gaze.barometer.service": {
+
const serviceRecord = parse(
+
SystemsGazeBarometerService.mainSchema,
+
record,
+
);
+
// we dont care if its a dangling service
+
if (!serviceRecord.hostedBy) {
+
continue;
+
}
+
const hostAtUri = expect(
+
parseCanonicalResourceUri(serviceRecord.hostedBy),
+
);
+
// not our host
+
if (hostAtUri.rkey !== store.hostname) {
+
continue;
+
}
+
const service: Service = store.services.get(rkey) ?? {
+
record: serviceRecord,
+
checks: new Set(),
+
};
+
store.services.set(rkey, {
+
...service,
+
record: serviceRecord,
+
});
+
break;
+
}
+
case "systems.gaze.barometer.check": {
+
const checkRecord = parse(
+
SystemsGazeBarometerCheck.mainSchema,
+
record,
+
);
+
const parsedServiceAtUri = expect(
+
parseCanonicalResourceUri(checkRecord.forService),
+
);
+
let service = store.services.get(parsedServiceAtUri.rkey);
+
if (!service) {
+
const serviceRecord = await getRecord(
+
"systems.gaze.barometer.service",
+
parsedServiceAtUri.rkey,
+
);
+
if (!serviceRecord.ok) {
+
// cant get service record
+
log.error(
+
`can't fetch service record (${checkRecord.forService}) for check record (at://${event.did}/${collection}/${rkey})`,
+
);
+
continue;
+
}
+
service = {
+
record: serviceRecord.value,
+
checks: new Set(),
+
};
+
}
+
service.checks.add(rkey);
+
store.checks.set(rkey, { record: checkRecord });
+
store.services.set(parsedServiceAtUri.rkey, service);
+
break;
+
}
+
}
+
} else {
+
switch (collection) {
+
case "systems.gaze.barometer.service": {
+
const service = store.services.get(rkey);
+
if (!service) {
+
continue;
+
}
+
for (const checkKey of service.checks) {
+
store.checks.delete(checkKey);
+
}
+
store.services.delete(rkey);
+
break;
+
}
+
case "systems.gaze.barometer.check": {
+
const check = store.checks.get(rkey);
+
if (!check) {
+
continue;
+
}
+
const parsedServiceAtUri = expect(
+
parseCanonicalResourceUri(check.record.forService),
+
);
+
const service = store.services.get(parsedServiceAtUri.rkey);
+
if (service) {
+
service.checks.delete(rkey);
+
store.services.set(parsedServiceAtUri.rkey, service);
+
}
+
store.checks.delete(rkey);
+
break;
+
}
+
}
+
}
+
}
+
};
+28 -9
proxy/src/routes/push.ts
···
import { err, expect, getRecord, ok, putRecord, type Result } from "../utils";
-
import { parseCanonicalResourceUri, safeParse } from "@atcute/lexicons";
+
import {
+
parseCanonicalResourceUri,
+
safeParse,
+
type CanonicalResourceUri,
+
type ParsedCanonicalResourceUri,
+
type ResourceUri,
+
} from "@atcute/lexicons";
import store, { type Service } from "../store";
import { systemctlShow } from "../systemd";
import { config } from "../config";
···
const data = maybeData.value;
let service: Service | undefined = undefined;
+
let serviceAtUri: ResourceUri;
+
let parsedServiceAtUri: ParsedCanonicalResourceUri;
if (data.state.forService) {
-
const serviceAtUri = expect(
+
parsedServiceAtUri = expect(
parseCanonicalResourceUri(data.state.forService),
);
-
service = store.services.get(serviceAtUri.rkey);
+
service = store.services.get(parsedServiceAtUri.rkey);
if (!service) {
let serviceRecord = await getRecord(
"systems.gaze.barometer.service",
-
serviceAtUri.rkey,
+
parsedServiceAtUri.rkey,
);
if (!serviceRecord.ok) {
return badRequest({
···
}
service = {
record: serviceRecord.value,
-
checks: new Map(),
+
checks: new Set(),
};
-
store.services.set(serviceAtUri.rkey, service);
+
store.services.set(parsedServiceAtUri.rkey, service);
}
+
serviceAtUri = data.state.forService;
} else if (data.serviceName) {
const serviceInfo = await systemctlShow(data.serviceName);
if (serviceInfo.ok) {
···
data.state.forService = putAt.uri;
service = {
record,
-
checks: new Map(),
+
checks: new Set(),
};
store.services.set(rkey, service);
+
serviceAtUri = putAt.uri;
+
parsedServiceAtUri = expect(parseCanonicalResourceUri(putAt.uri));
} else {
return badRequest({
msg: `could not fetch service from systemd: ${serviceInfo.error}`,
···
const checkAtUri = expect(
parseCanonicalResourceUri(data.state.generatedBy),
);
-
let check = service.checks.get(checkAtUri.rkey);
+
let check = store.checks.get(checkAtUri.rkey);
if (!check) {
let checkRecord = await getRecord(
"systems.gaze.barometer.check",
···
check = {
record: checkRecord.value,
};
-
service.checks.set(checkAtUri.rkey, check);
+
store.checks.set(checkAtUri.rkey, check);
}
+
if (check.record.forService !== serviceAtUri) {
+
return badRequest({
+
msg: `check record does not point to the same service as the state record service`,
+
});
+
}
+
// update services with check
+
service.checks.add(checkAtUri.rkey);
+
store.services.set(parsedServiceAtUri.rkey, service);
}
const result = await putRecord(
+3 -1
proxy/src/store.ts
···
record: SystemsGazeBarometerCheck.Main;
}
export interface Service {
-
checks: Map<RecordKey, Check>;
+
checks: Set<RecordKey>;
record: SystemsGazeBarometerService.Main;
}
class Store {
services;
+
checks;
host: SystemsGazeBarometerHost.Main | null;
hostname: string;
constructor() {
this.services = new Map<RecordKey, Service>();
+
this.checks = new Map<RecordKey, Check>();
this.host = null;
this.hostname = os.hostname();
}