Scratch space for learning atproto app development
1import { cborToLexRecord, readCar } from "@atproto/repo";
2import { Subscription } from "@atproto/xrpc-server";
3import type { Database } from "#/db";
4
/**
 * Consumes the `com.atproto.sync.subscribeRepos` stream of a service and
 * stores newly created `app.bsky.feed.post` records in the `post` table.
 */
export class Firehose {
  /** Underlying subscription; frames are passed through without validation. */
  public sub: Subscription<unknown>;

  /**
   * @param service - Passed to the subscription as its `service` option.
   * @param db - Database handle used to insert indexed posts.
   */
  constructor(public service: string, public db: Database) {
    this.sub = new Subscription({
      service,
      method: "com.atproto.sync.subscribeRepos",
      getParams: () => ({}),
      // No schema validation happens here, so handleEvent must treat every
      // frame as untrusted input.
      validate: (value: unknown) => value,
    });
  }

  /**
   * Indexes the post-creation ops of a single stream event.
   *
   * Anything that is not a commit event, or whose `repo`/`ops` fields do not
   * have the expected shape, is ignored. The CAR payload is decoded only when
   * the commit contains at least one `create` op in the `app.bsky.feed.post`
   * collection, so unrelated commits cost no decoding work.
   *
   * @param evt - Raw event yielded by the subscription.
   * @throws Whatever `readCar`, `cborToLexRecord` or the insert throws; the
   *   remaining ops of that event are then not processed.
   */
  // `any` is kept because the subscription yields unvalidated frames; the
  // fields used for filtering are checked at runtime before use.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  async handleEvent(evt: any): Promise<void> {
    if (evt?.$type !== "com.atproto.sync.subscribeRepos#commit") {
      return;
    }
    if (typeof evt.repo !== "string" || !Array.isArray(evt.ops)) {
      return;
    }

    // Pick the relevant ops first so the CAR is only parsed when needed.
    const postCreates = evt.ops.filter(
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      (op: any) =>
        op?.action === "create" &&
        typeof op.path === "string" &&
        op.path.split("/")[0] === "app.bsky.feed.post" &&
        Boolean(op.cid)
    );
    if (postCreates.length === 0) {
      return;
    }

    const car = await readCar(evt.blocks);

    for (const op of postCreates) {
      const recordBytes = car.blocks.get(op.cid);
      if (!recordBytes) continue;

      const record = cborToLexRecord(recordBytes);
      // The record comes off the network: only index it if `text` really is
      // a string instead of asserting the type.
      const text: unknown = record.text;
      if (typeof text !== "string") continue;

      await this.db
        .insertInto("post")
        .values({
          uri: `at://${evt.repo}/${op.path}`,
          text,
          indexedAt: new Date().toISOString(),
        })
        .execute();
    }
  }

  /**
   * Iterates the subscription and hands every event to {@link handleEvent}.
   *
   * A failure while handling one event is logged and the loop continues with
   * the next event. If the iteration itself throws, the error is logged and a
   * new `run` is scheduled with `setTimeout`. If the iteration ends without
   * throwing, this method resolves and nothing is rescheduled.
   *
   * @param subscriptionReconnectDelay - Delay in milliseconds before the
   *   rescheduled `run` starts.
   */
  async run(subscriptionReconnectDelay: number): Promise<void> {
    try {
      for await (const evt of this.sub) {
        try {
          await this.handleEvent(evt);
        } catch (err) {
          console.error("repo subscription could not handle message", err);
        }
      }
    } catch (err) {
      console.error("repo subscription errored", err);
      setTimeout(() => {
        // `run` handles its own errors; `void` marks the fire-and-forget.
        void this.run(subscriptionReconnectDelay);
      }, subscriptionReconnectDelay);
    }
  }
}