Scratch space for learning atproto app development
import type { RepoRecord } from "@atproto/lexicon";
import { cborToLexRecord, readCar } from "@atproto/repo";
import { AtUri } from "@atproto/syntax";
import { Subscription } from "@atproto/xrpc-server";
import type { CID } from "multiformats/cid";
import {
  type Account,
  type Commit,
  type Identity,
  type RepoEvent,
  isAccount,
  isCommit,
  isIdentity,
  isValidRepoEvent,
} from "./lexicons";

/** Options accepted by {@link Firehose}. */
type Opts = {
  /** Relay base URL; defaults to "https://bsky.network". */
  service?: string;
  /** Supplies the cursor sent as a subscription parameter. */
  getCursor?: () => Promise<number | undefined>;
  /** Called with the `seq` of every event after it has been handled. */
  setCursor?: (cursor: number) => Promise<void>;
  /** Milliseconds to wait before re-subscribing after an error (default 3000). */
  subscriptionReconnectDelay?: number;
  /** When set, only commit writes whose collection is listed are yielded. */
  filterCollections?: string[];
  /** Skip identity events. */
  excludeIdentity?: boolean;
  /** Skip account events. */
  excludeAccount?: boolean;
  /** Skip commit events. */
  excludeCommit?: boolean;
};

/**
 * Resolves after `ms` milliseconds, or immediately once `signal` is aborted,
 * so that a pending reconnect delay does not outlive `destroy()`.
 */
const sleep = (ms: number, signal: AbortSignal): Promise<void> =>
  new Promise<void>((resolve) => {
    if (signal.aborted) {
      resolve();
      return;
    }
    const timer = setTimeout(finish, ms);
    signal.addEventListener("abort", finish, { once: true });

    function finish(): void {
      clearTimeout(timer);
      signal.removeEventListener("abort", finish);
      resolve();
    }
  });

/**
 * Consumes `com.atproto.sync.subscribeRepos` and exposes the stream as an
 * async generator of parsed commit, identity and account events.
 */
export class Firehose {
  public sub: Subscription<RepoEvent>;
  private abortController: AbortController;

  constructor(public opts: Opts) {
    this.abortController = new AbortController();
    this.sub = new Subscription({
      service: opts.service ?? "https://bsky.network",
      method: "com.atproto.sync.subscribeRepos",
      signal: this.abortController.signal,
      getParams: async () => {
        if (!opts.getCursor) return undefined;
        const cursor = await opts.getCursor();
        return { cursor };
      },
      validate: (value: unknown) => {
        try {
          return isValidRepoEvent(value);
        } catch (err) {
          // An invalid frame is logged and dropped rather than ending the stream.
          console.error("repo subscription skipped invalid message", err);
          return undefined;
        }
      },
    });
  }

  /**
   * Yields parsed events until the subscription ends or `destroy()` is called.
   *
   * A failure while handling a single message is logged and that message is
   * skipped. A failure of the subscription itself is logged and, after
   * `subscriptionReconnectDelay` ms, the subscription is iterated again from
   * inside this same generator so the consumer's `for await` keeps receiving
   * events.
   */
  async *run(): AsyncGenerator<Event> {
    const { signal } = this.abortController;
    const { filterCollections, setCursor } = this.opts;

    while (!signal.aborted) {
      try {
        // NOTE(review): relies on `Subscription` being re-iterable (each
        // iteration opening a fresh connection) — confirm against
        // @atproto/xrpc-server.
        for await (const evt of this.sub) {
          try {
            if (isCommit(evt) && !this.opts.excludeCommit) {
              const writes = await parseCommit(evt);
              for (const write of writes) {
                if (
                  !filterCollections ||
                  filterCollections.includes(write.uri.collection)
                ) {
                  yield write;
                }
              }
            } else if (isAccount(evt) && !this.opts.excludeAccount) {
              const parsed = parseAccount(evt);
              if (parsed) {
                yield parsed;
              }
            } else if (isIdentity(evt) && !this.opts.excludeIdentity) {
              yield parseIdentity(evt);
            }
          } catch (err) {
            console.error("repo subscription could not handle message", err);
          }
          // The cursor advances even when handling failed, so a bad message
          // is not replayed forever.
          if (setCursor && typeof evt.seq === "number") {
            await setCursor(evt.seq);
          }
        }
        // The stream ended without an error: finish instead of reconnecting.
        return;
      } catch (err) {
        if (signal.aborted) return;
        console.error("repo subscription errored", err);
        // Calling `this.run()` from a timer would only create a new generator
        // object that nobody iterates, so the retry happens in this loop.
        await sleep(this.opts.subscriptionReconnectDelay ?? 3000, signal);
      }
    }
  }

  /** Aborts the underlying subscription and any pending reconnect delay. */
  destroy(): void {
    this.abortController.abort();
  }
}

/**
 * Expands a commit into one event per operation.
 *
 * Create/update operations are emitted with the record decoded from the
 * commit's CAR blocks; they are skipped when the operation has no CID or the
 * block is not present in the CAR. Delete operations carry metadata only.
 */
export const parseCommit = async (evt: Commit): Promise<CommitEvt[]> => {
  const car = await readCar(evt.blocks);

  const evts: CommitEvt[] = [];

  for (const op of evt.ops) {
    const uri = new AtUri(`at://${evt.repo}/${op.path}`);

    const meta: CommitMeta = {
      uri,
      author: uri.host,
      collection: uri.collection,
      rkey: uri.rkey,
    };

    if (op.action === "create" || op.action === "update") {
      if (!op.cid) continue;
      const recordBytes = car.blocks.get(op.cid);
      if (!recordBytes) continue;
      const record = cborToLexRecord(recordBytes);
      evts.push({
        ...meta,
        event: op.action as "create" | "update",
        cid: op.cid,
        record,
      });
    }

    if (op.action === "delete") {
      evts.push({
        ...meta,
        event: "delete",
      });
    }
  }

  return evts;
};

/** Maps a raw identity message to an {@link IdentityEvt}. */
export const parseIdentity = (evt: Identity): IdentityEvt => {
  return {
    event: "identity",
    did: evt.did,
    handle: evt.handle,
  };
};

/**
 * Maps a raw account message to an {@link AccountEvt}.
 *
 * @returns `undefined` when the message carries a status that is not one of
 * the known {@link AccountStatus} values.
 */
export const parseAccount = (evt: Account): AccountEvt | undefined => {
  if (evt.status && !isValidStatus(evt.status)) return undefined;
  return {
    event: "account",
    did: evt.did,
    active: evt.active,
    status: evt.status as AccountStatus,
  };
};

/** Type guard for the account statuses this module understands. */
const isValidStatus = (str: string): str is AccountStatus => {
  return ["takendown", "suspended", "deleted", "deactivated"].includes(str);
};

/** Any event yielded by {@link Firehose.run}. */
export type Event = CommitEvt | IdentityEvt | AccountEvt;

/** Fields shared by every commit-derived event. */
export type CommitMeta = {
  uri: AtUri;
  author: string;
  collection: string;
  rkey: string;
};

export type CommitEvt = Create | Update | Delete;

export type Create = CommitMeta & {
  event: "create";
  record: RepoRecord;
  cid: CID;
};

// `parseCommit` attaches the record and CID to updates exactly as it does for
// creates, so the type exposes them too.
export type Update = CommitMeta & {
  event: "update";
  record: RepoRecord;
  cid: CID;
};

export type Delete = CommitMeta & {
  event: "delete";
};

export type IdentityEvt = {
  event: "identity";
  did: string;
  handle?: string;
};

export type AccountEvt = {
  event: "account";
  did: string;
  active: boolean;
  status?: AccountStatus;
};

export type AccountStatus = "takendown" | "suspended" | "deleted" | "deactivated";