Scratch space for learning atproto app development

refactor firehose

dholms a4ee1607 19149b18

+4 -8
package.json
···
},
"dependencies": {
"@atproto/repo": "^0.4.1",
+
"@atproto/syntax": "^0.3.0",
"@atproto/xrpc-server": "^0.5.3",
"better-sqlite3": "^11.1.2",
"cors": "^2.8.5",
···
"helmet": "^7.1.0",
"http-status-codes": "^2.3.0",
"kysely": "^0.27.4",
+
"multiformats": "^9.9.0",
"pino-http": "^10.0.0"
},
"devDependencies": {
···
"vitest": "^2.0.0"
},
"lint-staged": {
-
"*.{js,ts,cjs,mjs,d.cts,d.mts,json,jsonc}": [
-
"biome check --apply --no-errors-on-unmatched"
-
]
+
"*.{js,ts,cjs,mjs,d.cts,d.mts,json,jsonc}": ["biome check --apply --no-errors-on-unmatched"]
},
"tsup": {
-
"entry": [
-
"src",
-
"!src/**/__tests__/**",
-
"!src/**/*.test.*"
-
],
+
"entry": ["src", "!src/**/__tests__/**", "!src/**/*.test.*"],
"splitting": false,
"sourcemap": true,
"clean": true
+2 -2
src/config.ts
···
import type pino from "pino";
import type { Database } from "#/db";
-
import type { Firehose } from "#/firehose";
+
import type { Ingester } from "#/firehose/ingester";
export type AppContext = {
db: Database;
-
firehose: Firehose;
+
ingester: Ingester;
logger: pino.Logger;
};
-62
src/firehose.ts
···
-
import { cborToLexRecord, readCar } from "@atproto/repo";
-
import { Subscription } from "@atproto/xrpc-server";
-
import type { Database } from "#/db";
-
-
export class Firehose {
-
public sub: Subscription<unknown>;
-
-
constructor(public service: string, public db: Database) {
-
this.sub = new Subscription({
-
service: service,
-
method: "com.atproto.sync.subscribeRepos",
-
getParams: () => ({}),
-
validate: (value: unknown) => value,
-
});
-
}
-
-
async handleEvent(evt: any): Promise<void> {
-
if (evt.$type !== "com.atproto.sync.subscribeRepos#commit") {
-
return;
-
}
-
-
const car = await readCar(evt.blocks);
-
-
for (const op of evt.ops) {
-
if (op.action !== "create") continue;
-
const uri = `at://${evt.repo}/${op.path}`;
-
const [collection] = op.path.split("/");
-
if (collection !== "app.bsky.feed.post") continue;
-
-
if (!op.cid) continue;
-
const recordBytes = car.blocks.get(op.cid);
-
if (!recordBytes) continue;
-
const record = cborToLexRecord(recordBytes);
-
await this.db
-
.insertInto("post")
-
.values({
-
uri,
-
text: record.text as string,
-
indexedAt: new Date().toISOString(),
-
})
-
.execute();
-
}
-
}
-
-
async run(subscriptionReconnectDelay: number) {
-
try {
-
for await (const evt of this.sub) {
-
try {
-
await this.handleEvent(evt);
-
} catch (err) {
-
console.error("repo subscription could not handle message", err);
-
}
-
}
-
} catch (err) {
-
console.error("repo subscription errored", err);
-
setTimeout(
-
() => this.run(subscriptionReconnectDelay),
-
subscriptionReconnectDelay
-
);
-
}
-
}
-
}
+139
src/firehose/firehose.ts
···
+
import type { RepoRecord } from "@atproto/lexicon";
+
import { cborToLexRecord, readCar } from "@atproto/repo";
+
import { AtUri } from "@atproto/syntax";
+
import { Subscription } from "@atproto/xrpc-server";
+
import type { CID } from "multiformats/cid";
+
import {
+
type Commit,
+
type RepoEvent,
+
isCommit,
+
isValidRepoEvent,
+
} from "./lexicons";
+
+
/**
 * Options accepted by the `Firehose` constructor. Every field is optional.
 */
type Opts = {
  /** Service URL to subscribe to; the constructor substitutes a default when omitted. */
  service?: string;
  /** Resolves the value sent as the `cursor` subscription parameter (may be undefined). */
  getCursor?: () => Promise<number | undefined>;
  /** Invoked with an event's numeric `seq` once that event has been handled. */
  setCursor?: (cursor: number) => Promise<void>;
  /** Milliseconds to wait after the subscription errors; `run` falls back to 3000. */
  subscriptionReconnectDelay?: number;
};
+
+
/**
 * Wraps an `@atproto/xrpc-server` `Subscription` to
 * `com.atproto.sync.subscribeRepos` and exposes it as an async stream of
 * parsed, record-level events.
 */
export class Firehose {
  public sub: Subscription<RepoEvent>;
  private abortController: AbortController;

  /**
   * @param opts - Connection options; `service` falls back to
   *   "https://bsky.network" when not provided.
   */
  constructor(public opts: Opts) {
    this.abortController = new AbortController();
    this.sub = new Subscription({
      service: opts.service ?? "https://bsky.network",
      method: "com.atproto.sync.subscribeRepos",
      signal: this.abortController.signal,
      getParams: async () => {
        if (!opts.getCursor) return undefined;
        const cursor = await opts.getCursor();
        return { cursor };
      },
      validate: (value: unknown) => {
        try {
          return isValidRepoEvent(value);
        } catch (err) {
          console.error("repo subscription skipped invalid message", err);
          // Explicitly signal "no usable message" for frames that fail validation.
          return undefined;
        }
      },
    });
  }

  /**
   * Streams parsed events from the subscription.
   *
   * When the subscription throws, the error is logged and, unless
   * `destroy()` has been called, iteration is retried inside this same
   * generator after `subscriptionReconnectDelay` ms (default 3000).
   * Retrying in-place matters: scheduling a fresh `this.run()` call would
   * only create a generator object that nobody iterates, so the consumer's
   * `for await` loop would simply end.
   *
   * @returns An async generator of create/update/delete events.
   */
  async *run(): AsyncGenerator<Event> {
    const { signal } = this.abortController;

    while (!signal.aborted) {
      try {
        for await (const evt of this.sub) {
          // Only parsing is guarded here; the yield stays outside so that a
          // parse failure is the only thing reported as "could not handle".
          let parsed: Event[] = [];
          try {
            parsed = await parseEvent(evt);
          } catch (err) {
            console.error("repo subscription could not handle message", err);
          }
          for (const op of parsed) {
            yield op;
          }
          if (this.opts.setCursor && typeof evt.seq === "number") {
            await this.opts.setCursor(evt.seq);
          }
        }
        // The subscription iterator ended without throwing: nothing to retry.
        return;
      } catch (err) {
        console.error("repo subscription errored", err);
        if (signal.aborted) return;
        const delay = this.opts.subscriptionReconnectDelay ?? 3000;
        await new Promise<void>((resolve) => setTimeout(resolve, delay));
      }
    }
  }

  /** Aborts the signal handed to the subscription, which also stops `run` from retrying. */
  destroy() {
    this.abortController.abort();
  }
}
+
+
/**
 * Turns one firehose message into record-level events.
 * Messages that are not commits produce an empty list.
 */
export const parseEvent = async (evt: RepoEvent): Promise<Event[]> =>
  isCommit(evt) ? parseCommit(evt) : [];
+
+
/**
 * Expands a commit message into one event per repo operation.
 *
 * The commit's `blocks` are read as a CAR; for create/update ops the record
 * bytes are looked up by the op's CID and decoded. Ops without a CID, or
 * whose bytes are not present in the CAR, are skipped. Delete ops carry
 * only the URI metadata. Ops with any other action are ignored.
 *
 * @param evt - The commit message to expand.
 * @returns Events in the same order as `evt.ops`.
 */
export const parseCommit = async (evt: Commit): Promise<Event[]> => {
  const { blocks } = await readCar(evt.blocks);
  const out: Event[] = [];

  for (const { action, path, cid } of evt.ops) {
    const uri = new AtUri(`at://${evt.repo}/${path}`);
    const meta: CommitMeta = {
      uri,
      author: uri.host,
      collection: uri.collection,
      rkey: uri.rkey,
    };

    switch (action) {
      case "create":
      case "update": {
        if (!cid) break;
        const bytes = blocks.get(cid);
        if (!bytes) break;
        const record = cborToLexRecord(bytes);
        out.push({
          ...meta,
          event: action as "create" | "update",
          cid,
          record,
        });
        break;
      }
      case "delete":
        out.push({ ...meta, event: "delete" });
        break;
      default:
        // Unknown actions produce no event.
        break;
    }
  }

  return out;
};
+
+
/** A record-level change extracted from a commit; discriminate on `event`. */
type Event = Create | Update | Delete;

/** Fields shared by every event, all derived from the record's AT URI. */
type CommitMeta = {
  uri: AtUri;
  /** Host part of `uri`. */
  author: string;
  collection: string;
  rkey: string;
};

/** A record was created; carries the decoded record and its CID. */
type Create = CommitMeta & {
  event: "create";
  record: RepoRecord;
  cid: CID;
};

/**
 * A record was updated. `parseCommit` attaches the decoded record and its
 * CID to updates exactly as it does for creates, so both are declared here
 * to let consumers read them without casting.
 */
type Update = CommitMeta & {
  event: "update";
  record: RepoRecord;
  cid: CID;
};

/** A record was deleted; only the URI metadata is available. */
type Delete = CommitMeta & {
  event: "delete";
};
+30
src/firehose/ingester.ts
···
+
import type { Database } from "#/db";
+
import { Firehose } from "#/firehose/firehose";
+
+
/**
 * Consumes the firehose and stores newly created `app.bsky.feed.post`
 * records in the `post` table.
 */
export class Ingester {
  /** The running firehose, set by `start()` so that `destroy()` can abort it. */
  firehose: Firehose | undefined;
  constructor(public db: Database) {}

  /**
   * Opens a firehose with default options and ingests events until the
   * stream ends. The returned promise rejects if a database insert fails.
   */
  async start(): Promise<void> {
    const firehose = new Firehose({});
    // Without this assignment `destroy()` has nothing to abort.
    this.firehose = firehose;

    for await (const evt of firehose.run()) {
      if (evt.event !== "create") continue;
      if (evt.collection !== "app.bsky.feed.post") continue;

      // Records come off the network untyped; skip any whose `text` is not a string.
      const text = evt.record.text;
      if (typeof text !== "string") continue;

      await this.db
        .insertInto("post")
        .values({
          uri: evt.uri.toString(),
          text,
          indexedAt: new Date().toISOString(),
        })
        .execute();
    }
  }

  /** Aborts the firehose started by `start()`, if there is one. */
  destroy() {
    this.firehose?.destroy();
  }
}
+355
src/firehose/lexicons.ts
···
+
import type { IncomingMessage } from "node:http";
+
+
import { type LexiconDoc, Lexicons } from "@atproto/lexicon";
+
import type { ErrorFrame, HandlerAuth } from "@atproto/xrpc-server";
+
import type { CID } from "multiformats/cid";
+
+
// @NOTE: this file is an ugly copy job of codegen output. I'd like to clean this whole thing up
+
+
/** Type guard: true for any non-null value whose `typeof` is "object" (arrays included). */
export function isObj(v: unknown): v is Record<string, unknown> {
  if (v === null) return false;
  return typeof v === "object";
}
+
+
/**
 * Type guard: true when `prop` is present on `data` or anywhere on its
 * prototype chain (same semantics as the `in` operator).
 */
export function hasProp<K extends PropertyKey>(
  data: object,
  prop: K
): data is Record<K, unknown> {
  return Reflect.has(data, prop);
}
+
+
/** Parameters accepted by the `com.atproto.sync.subscribeRepos` subscription. */
export interface QueryParams {
  /** Last known event to backfill from (optional). */
  cursor?: number;
}
+
+
/**
 * Any message that can arrive on the subscription: one of the known shapes,
 * or an open-ended object tagged with a `$type` string.
 */
export type RepoEvent =
  | Commit
  | Handle
  | Migrate
  | Tombstone
  | Info
  | { $type: string; [k: string]: unknown };

/** Error frame carrying one of the two error names declared by the lexicon. */
export type HandlerError = ErrorFrame<"FutureCursor" | "ConsumerTooSlow">;

/** Everything a handler may emit: an error frame or a repo event. */
export type HandlerOutput = HandlerError | RepoEvent;

/** Context object passed to a `Handler`. */
export type HandlerReqCtx<HA extends HandlerAuth = never> = {
  auth: HA;
  params: QueryParams;
  req: IncomingMessage;
  signal: AbortSignal;
};

/** Handler signature: takes a request context and returns an async stream of outputs. */
export type Handler<HA extends HandlerAuth = never> = (
  ctx: HandlerReqCtx<HA>
) => AsyncIterable<HandlerOutput>;
+
+
/** Shape of a `com.atproto.sync.subscribeRepos#commit` message. */
export interface Commit {
  seq: number;
  rebase: boolean;
  tooBig: boolean;
  /** Repo identifier (the lexicon declares it with format `did`). */
  repo: string;
  commit: CID;
  prev?: CID | null;
  /** The rev of the emitted commit */
  rev: string;
  /** The rev of the last emitted commit from this repo */
  since: string | null;
  /** CAR file containing relevant blocks */
  blocks: Uint8Array;
  /** Record-level operations contained in this commit. */
  ops: RepoOp[];
  blobs: CID[];
  /** Timestamp string (the lexicon declares it with format `datetime`). */
  time: string;
  [k: string]: unknown;
}
+
+
/** Type guard: an object whose `$type` is exactly the `#commit` identifier. Only the tag is checked. */
export function isCommit(v: unknown): v is Commit {
  if (!isObj(v) || !hasProp(v, "$type")) return false;
  return v.$type === "com.atproto.sync.subscribeRepos#commit";
}
+
+
/** Shape of a `com.atproto.sync.subscribeRepos#handle` message. */
export interface Handle {
  seq: number;
  /** Lexicon format: `did`. */
  did: string;
  /** Lexicon format: `handle`. */
  handle: string;
  /** Lexicon format: `datetime`. */
  time: string;
  [k: string]: unknown;
}
+
+
/** Type guard: an object whose `$type` is exactly the `#handle` identifier. Only the tag is checked. */
export function isHandle(v: unknown): v is Handle {
  if (!isObj(v) || !hasProp(v, "$type")) return false;
  return v.$type === "com.atproto.sync.subscribeRepos#handle";
}
+
+
/** Shape of a `com.atproto.sync.subscribeRepos#migrate` message. */
export interface Migrate {
  seq: number;
  /** Lexicon format: `did`. */
  did: string;
  /** Required by the lexicon but may be null. */
  migrateTo: string | null;
  /** Lexicon format: `datetime`. */
  time: string;
  [k: string]: unknown;
}
+
+
/** Type guard: an object whose `$type` is exactly the `#migrate` identifier. Only the tag is checked. */
export function isMigrate(v: unknown): v is Migrate {
  if (!isObj(v) || !hasProp(v, "$type")) return false;
  return v.$type === "com.atproto.sync.subscribeRepos#migrate";
}
+
+
/** Shape of a `com.atproto.sync.subscribeRepos#tombstone` message. */
export interface Tombstone {
  seq: number;
  /** Lexicon format: `did`. */
  did: string;
  /** Lexicon format: `datetime`. */
  time: string;
  [k: string]: unknown;
}
+
+
/** Type guard: an object whose `$type` is exactly the `#tombstone` identifier. Only the tag is checked. */
export function isTombstone(v: unknown): v is Tombstone {
  if (!isObj(v) || !hasProp(v, "$type")) return false;
  return v.$type === "com.atproto.sync.subscribeRepos#tombstone";
}
+
+
/** Shape of a `com.atproto.sync.subscribeRepos#info` message. */
export interface Info {
  /** "OutdatedCursor" is the one known value; any other string is also allowed. */
  name: "OutdatedCursor" | (string & {});
  message?: string;
  [k: string]: unknown;
}
+
+
/** Type guard: an object whose `$type` is exactly the `#info` identifier. Only the tag is checked. */
export function isInfo(v: unknown): v is Info {
  if (!isObj(v) || !hasProp(v, "$type")) return false;
  return v.$type === "com.atproto.sync.subscribeRepos#info";
}
+
+
/**
 * A repo operation, i.e. a write of a single record.
 * For creates and updates, `cid` is the record's CID as of this operation.
 * For deletes, it is null.
 */
export interface RepoOp {
  /** "create", "update" and "delete" are the known values; other strings are allowed. */
  action: "create" | "update" | "delete" | (string & {});
  path: string;
  cid: CID | null;
  [k: string]: unknown;
}
+
+
/** Type guard: an object whose `$type` is exactly the `#repoOp` identifier. Only the tag is checked. */
export function isRepoOp(v: unknown): v is RepoOp {
  if (!isObj(v) || !hasProp(v, "$type")) return false;
  return v.$type === "com.atproto.sync.subscribeRepos#repoOp";
}
+
+
// NSID shared by the document id and every `lex:` reference below.
const SUBSCRIBE_REPOS_NSID = "com.atproto.sync.subscribeRepos";

/**
 * Lexicon document for the `com.atproto.sync.subscribeRepos` subscription:
 * the `main` subscription definition plus the object definitions for each
 * message variant and for `repoOp`.
 */
export const ComAtprotoSyncSubscribeRepos: LexiconDoc = {
  lexicon: 1,
  id: SUBSCRIBE_REPOS_NSID,
  defs: {
    main: {
      type: "subscription",
      description: "Subscribe to repo updates",
      parameters: {
        type: "params",
        properties: {
          cursor: {
            type: "integer",
            description: "The last known event to backfill from.",
          },
        },
      },
      message: {
        schema: {
          type: "union",
          refs: [
            `lex:${SUBSCRIBE_REPOS_NSID}#commit`,
            `lex:${SUBSCRIBE_REPOS_NSID}#handle`,
            `lex:${SUBSCRIBE_REPOS_NSID}#migrate`,
            `lex:${SUBSCRIBE_REPOS_NSID}#tombstone`,
            `lex:${SUBSCRIBE_REPOS_NSID}#info`,
          ],
        },
      },
      errors: [{ name: "FutureCursor" }, { name: "ConsumerTooSlow" }],
    },
    commit: {
      type: "object",
      required: [
        "seq",
        "rebase",
        "tooBig",
        "repo",
        "commit",
        "rev",
        "since",
        "blocks",
        "ops",
        "blobs",
        "time",
      ],
      nullable: ["prev", "since"],
      properties: {
        seq: { type: "integer" },
        rebase: { type: "boolean" },
        tooBig: { type: "boolean" },
        repo: { type: "string", format: "did" },
        commit: { type: "cid-link" },
        prev: { type: "cid-link" },
        rev: { type: "string", description: "The rev of the emitted commit" },
        since: {
          type: "string",
          description: "The rev of the last emitted commit from this repo",
        },
        blocks: {
          type: "bytes",
          description: "CAR file containing relevant blocks",
          maxLength: 1000000,
        },
        ops: {
          type: "array",
          items: { type: "ref", ref: `lex:${SUBSCRIBE_REPOS_NSID}#repoOp` },
          maxLength: 200,
        },
        blobs: { type: "array", items: { type: "cid-link" } },
        time: { type: "string", format: "datetime" },
      },
    },
    handle: {
      type: "object",
      required: ["seq", "did", "handle", "time"],
      properties: {
        seq: { type: "integer" },
        did: { type: "string", format: "did" },
        handle: { type: "string", format: "handle" },
        time: { type: "string", format: "datetime" },
      },
    },
    migrate: {
      type: "object",
      required: ["seq", "did", "migrateTo", "time"],
      nullable: ["migrateTo"],
      properties: {
        seq: { type: "integer" },
        did: { type: "string", format: "did" },
        migrateTo: { type: "string" },
        time: { type: "string", format: "datetime" },
      },
    },
    tombstone: {
      type: "object",
      required: ["seq", "did", "time"],
      properties: {
        seq: { type: "integer" },
        did: { type: "string", format: "did" },
        time: { type: "string", format: "datetime" },
      },
    },
    info: {
      type: "object",
      required: ["name"],
      properties: {
        name: { type: "string", knownValues: ["OutdatedCursor"] },
        message: { type: "string" },
      },
    },
    repoOp: {
      type: "object",
      description:
        "A repo operation, ie a write of a single record. For creates and updates, cid is the record's CID as of this operation. For deletes, it's null.",
      required: ["action", "path", "cid"],
      nullable: ["cid"],
      properties: {
        action: {
          type: "string",
          knownValues: ["create", "update", "delete"],
        },
        path: { type: "string" },
        cid: { type: "cid-link" },
      },
    },
  },
};
+
+
// Lexicon registry holding only the subscribeRepos document.
const lexicons = new Lexicons([ComAtprotoSyncSubscribeRepos]);

/**
 * Runs `evt` through `assertValidXrpcMessage` for the subscribeRepos
 * lexicon and returns that call's result, typed as `RepoEvent`.
 *
 * NOTE(review): despite the `is…` name this is not a boolean predicate;
 * presumably the assertion throws on invalid input (confirm against
 * `@atproto/lexicon`).
 */
export const isValidRepoEvent = (evt: unknown) =>
  lexicons.assertValidXrpcMessage<RepoEvent>(
    "com.atproto.sync.subscribeRepos",
    evt
  );
+5 -4
src/server.ts
···
import requestLogger from "#/common/middleware/requestLogger";
import { env } from "#/common/utils/envConfig";
import { createDb, migrateToLatest } from "#/db";
-
import { Firehose } from "#/firehose";
+
import { Ingester } from "#/firehose/ingester";
import { createRouter } from "#/routes";
import type { AppContext } from "./config";
···
const logger = pino({ name: "server start" });
const db = createDb(":memory:");
await migrateToLatest(db);
-
const firehose = new Firehose("https://bsky.network", db);
-
firehose.run(10);
+
const ingester = new Ingester(db);
+
ingester.start();
const ctx = {
db,
-
firehose,
+
ingester,
logger,
};
···
async close() {
this.ctx.logger.info("sigint received, shutting down");
+
this.ctx.ingester.destroy();
return new Promise<void>((resolve) => {
this.server.close(() => {
this.ctx.logger.info("server closed");