Scratch space for learning atproto app development
1import type { RepoRecord } from "@atproto/lexicon";
2import { cborToLexRecord, readCar } from "@atproto/repo";
3import { AtUri } from "@atproto/syntax";
4import { Subscription } from "@atproto/xrpc-server";
5import type { CID } from "multiformats/cid";
6import {
7 type Commit,
8 type RepoEvent,
9 isCommit,
10 isValidRepoEvent,
11} from "./lexicons";
12
13type Opts = {
14 service?: string;
15 getCursor?: () => Promise<number | undefined>;
16 setCursor?: (cursor: number) => Promise<void>;
17 subscriptionReconnectDelay?: number;
18};
19
20export class Firehose {
21 public sub: Subscription<RepoEvent>;
22 private abortController: AbortController;
23
24 constructor(public opts: Opts) {
25 this.abortController = new AbortController();
26 this.sub = new Subscription({
27 service: opts.service ?? "https://bsky.network",
28 method: "com.atproto.sync.subscribeRepos",
29 signal: this.abortController.signal,
30 getParams: async () => {
31 if (!opts.getCursor) return undefined;
32 const cursor = await opts.getCursor();
33 return { cursor };
34 },
35 validate: (value: unknown) => {
36 try {
37 return isValidRepoEvent(value);
38 } catch (err) {
39 console.error("repo subscription skipped invalid message", err);
40 }
41 },
42 });
43 }
44
45 async *run(): AsyncGenerator<Event> {
46 try {
47 for await (const evt of this.sub) {
48 try {
49 const parsed = await parseEvent(evt);
50 for (const op of parsed) {
51 yield op;
52 }
53 } catch (err) {
54 console.error("repo subscription could not handle message", err);
55 }
56 if (this.opts.setCursor && typeof evt.seq === "number") {
57 await this.opts.setCursor(evt.seq);
58 }
59 }
60 } catch (err) {
61 console.error("repo subscription errored", err);
62 setTimeout(
63 () => this.run(),
64 this.opts.subscriptionReconnectDelay ?? 3000
65 );
66 }
67 }
68
69 destroy() {
70 this.abortController.abort();
71 }
72}
73
74export const parseEvent = async (evt: RepoEvent): Promise<Event[]> => {
75 if (!isCommit(evt)) return [];
76 return parseCommit(evt);
77};
78
79export const parseCommit = async (evt: Commit): Promise<Event[]> => {
80 const car = await readCar(evt.blocks);
81
82 const evts: Event[] = [];
83
84 for (const op of evt.ops) {
85 const uri = new AtUri(`at://${evt.repo}/${op.path}`);
86
87 const meta: CommitMeta = {
88 uri,
89 author: uri.host,
90 collection: uri.collection,
91 rkey: uri.rkey,
92 };
93
94 if (op.action === "create" || op.action === "update") {
95 if (!op.cid) continue;
96 const recordBytes = car.blocks.get(op.cid);
97 if (!recordBytes) continue;
98 const record = cborToLexRecord(recordBytes);
99 evts.push({
100 ...meta,
101 event: op.action as "create" | "update",
102 cid: op.cid,
103 record,
104 });
105 }
106
107 if (op.action === "delete") {
108 evts.push({
109 ...meta,
110 event: "delete",
111 });
112 }
113 }
114
115 return evts;
116};
117
118type Event = Create | Update | Delete;
119
120type CommitMeta = {
121 uri: AtUri;
122 author: string;
123 collection: string;
124 rkey: string;
125};
126
127type Create = CommitMeta & {
128 event: "create";
129 record: RepoRecord;
130 cid: CID;
131};
132
133type Update = CommitMeta & {
134 event: "update";
135};
136
137type Delete = CommitMeta & {
138 event: "delete";
139};