Scratch space for learning atproto app development
1import type { RepoRecord } from "@atproto/lexicon";
2import { cborToLexRecord, readCar } from "@atproto/repo";
3import { AtUri } from "@atproto/syntax";
4import { Subscription } from "@atproto/xrpc-server";
5import type { CID } from "multiformats/cid";
6import {
7 type Account,
8 type Commit,
9 type Identity,
10 type RepoEvent,
11 isAccount,
12 isCommit,
13 isIdentity,
14 isValidRepoEvent,
15} from "./lexicons";
16
17type Opts = {
18 service?: string;
19 getCursor?: () => Promise<number | undefined>;
20 setCursor?: (cursor: number) => Promise<void>;
21 subscriptionReconnectDelay?: number;
22 filterCollections?: string[];
23 excludeIdentity?: boolean;
24 excludeAccount?: boolean;
25 excludeCommit?: boolean;
26};
27
28export class Firehose {
29 public sub: Subscription<RepoEvent>;
30 private abortController: AbortController;
31
32 constructor(public opts: Opts) {
33 this.abortController = new AbortController();
34 this.sub = new Subscription({
35 service: opts.service ?? "https://bsky.network",
36 method: "com.atproto.sync.subscribeRepos",
37 signal: this.abortController.signal,
38 getParams: async () => {
39 if (!opts.getCursor) return undefined;
40 const cursor = await opts.getCursor();
41 return { cursor };
42 },
43 validate: (value: unknown) => {
44 try {
45 return isValidRepoEvent(value);
46 } catch (err) {
47 console.error("repo subscription skipped invalid message", err);
48 }
49 },
50 });
51 }
52
53 async *run(): AsyncGenerator<Event> {
54 try {
55 for await (const evt of this.sub) {
56 try {
57 if (isCommit(evt) && !this.opts.excludeCommit) {
58 const parsed = await parseCommit(evt);
59 for (const write of parsed) {
60 if (
61 !this.opts.filterCollections ||
62 this.opts.filterCollections.includes(write.uri.collection)
63 ) {
64 yield write;
65 }
66 }
67 } else if (isAccount(evt) && !this.opts.excludeAccount) {
68 const parsed = parseAccount(evt);
69 if (parsed) {
70 yield parsed;
71 }
72 } else if (isIdentity(evt) && !this.opts.excludeIdentity) {
73 yield parseIdentity(evt);
74 }
75 } catch (err) {
76 console.error("repo subscription could not handle message", err);
77 }
78 if (this.opts.setCursor && typeof evt.seq === "number") {
79 await this.opts.setCursor(evt.seq);
80 }
81 }
82 } catch (err) {
83 console.error("repo subscription errored", err);
84 setTimeout(
85 () => this.run(),
86 this.opts.subscriptionReconnectDelay ?? 3000
87 );
88 }
89 }
90
91 destroy() {
92 this.abortController.abort();
93 }
94}
95
96export const parseCommit = async (evt: Commit): Promise<CommitEvt[]> => {
97 const car = await readCar(evt.blocks);
98
99 const evts: CommitEvt[] = [];
100
101 for (const op of evt.ops) {
102 const uri = new AtUri(`at://${evt.repo}/${op.path}`);
103
104 const meta: CommitMeta = {
105 uri,
106 author: uri.host,
107 collection: uri.collection,
108 rkey: uri.rkey,
109 };
110
111 if (op.action === "create" || op.action === "update") {
112 if (!op.cid) continue;
113 const recordBytes = car.blocks.get(op.cid);
114 if (!recordBytes) continue;
115 const record = cborToLexRecord(recordBytes);
116 evts.push({
117 ...meta,
118 event: op.action as "create" | "update",
119 cid: op.cid,
120 record,
121 });
122 }
123
124 if (op.action === "delete") {
125 evts.push({
126 ...meta,
127 event: "delete",
128 });
129 }
130 }
131
132 return evts;
133};
134
135export const parseIdentity = (evt: Identity): IdentityEvt => {
136 return {
137 event: "identity",
138 did: evt.did,
139 handle: evt.handle,
140 };
141};
142
143export const parseAccount = (evt: Account): AccountEvt | undefined => {
144 if (evt.status && !isValidStatus(evt.status)) return;
145 return {
146 event: "account",
147 did: evt.did,
148 active: evt.active,
149 status: evt.status as AccountStatus,
150 };
151};
152
153const isValidStatus = (str: string): str is AccountStatus => {
154 return ["takendown", "suspended", "deleted", "deactivated"].includes(str);
155};
156
157type Event = CommitEvt | IdentityEvt | AccountEvt;
158
159type CommitMeta = {
160 uri: AtUri;
161 author: string;
162 collection: string;
163 rkey: string;
164};
165
166type CommitEvt = Create | Update | Delete;
167
168type Create = CommitMeta & {
169 event: "create";
170 record: RepoRecord;
171 cid: CID;
172};
173
174type Update = CommitMeta & {
175 event: "update";
176};
177
178type Delete = CommitMeta & {
179 event: "delete";
180};
181
182type IdentityEvt = {
183 event: "identity";
184 did: string;
185 handle?: string;
186};
187
188type AccountEvt = {
189 event: "account";
190 did: string;
191 active: boolean;
192 status?: AccountStatus;
193};
194
195type AccountStatus = "takendown" | "suspended" | "deleted" | "deactivated";