Scratch space for learning atproto app development
import type { RepoRecord } from "@atproto/lexicon";
import { cborToLexRecord, readCar } from "@atproto/repo";
import { AtUri } from "@atproto/syntax";
import { Subscription } from "@atproto/xrpc-server";
import type { CID } from "multiformats/cid";
import {
  type Commit,
  type RepoEvent,
  isCommit,
  isValidRepoEvent,
} from "./lexicons";

/** Options accepted by {@link Firehose}. */
type Opts = {
  /** Service URL to subscribe to; defaults to "https://bsky.network". */
  service?: string;
  /** Supplies the cursor sent as a subscription parameter, if provided. */
  getCursor?: () => Promise<number | undefined>;
  /** Called with `evt.seq` after each event that carries a numeric `seq`. */
  setCursor?: (cursor: number) => Promise<void>;
  /** Milliseconds to wait before resubscribing after an error; defaults to 3000. */
  subscriptionReconnectDelay?: number;
};

/**
 * Resolves after `ms` milliseconds, or as soon as `signal` aborts, whichever
 * comes first. Never rejects.
 */
const sleep = (ms: number, signal: AbortSignal): Promise<void> =>
  new Promise<void>((resolve) => {
    if (signal.aborted) {
      resolve();
      return;
    }
    function onAbort(): void {
      clearTimeout(timer);
      resolve();
    }
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });

/**
 * Wraps a `com.atproto.sync.subscribeRepos` subscription and exposes the
 * record operations of each commit as an async stream of events.
 */
export class Firehose {
  public sub: Subscription<RepoEvent>;
  private abortController: AbortController;

  /**
   * @param opts - Service URL, cursor persistence hooks and reconnect delay.
   */
  constructor(public opts: Opts) {
    this.abortController = new AbortController();
    this.sub = new Subscription({
      service: opts.service ?? "https://bsky.network",
      method: "com.atproto.sync.subscribeRepos",
      signal: this.abortController.signal,
      getParams: async () => {
        if (!opts.getCursor) return undefined;
        const cursor = await opts.getCursor();
        return { cursor };
      },
      validate: (value: unknown) => {
        try {
          return isValidRepoEvent(value);
        } catch (err) {
          console.error("repo subscription skipped invalid message", err);
          // Explicitly yield "no value" for messages that fail validation.
          return undefined;
        }
      },
    });
  }

  /**
   * Streams parsed commit operations from the subscription.
   *
   * A message that cannot be parsed is logged and skipped. When the
   * subscription itself throws, the error is logged and, after
   * `subscriptionReconnectDelay` ms (default 3000), the subscription is
   * iterated again from within this same generator, so the caller's
   * `for await` loop keeps receiving events. The generator finishes when the
   * subscription ends without an error or after {@link Firehose.destroy} is
   * called.
   *
   * NOTE(review): resubscribing relies on `Subscription` opening a fresh
   * connection each time it is iterated — confirm against
   * `@atproto/xrpc-server`.
   */
  async *run(): AsyncGenerator<Event> {
    const signal = this.abortController.signal;

    while (!signal.aborted) {
      try {
        for await (const evt of this.sub) {
          try {
            const parsed = await parseEvent(evt);
            for (const op of parsed) {
              yield op;
            }
          } catch (err) {
            console.error("repo subscription could not handle message", err);
          }
          if (this.opts.setCursor && typeof evt.seq === "number") {
            await this.opts.setCursor(evt.seq);
          }
        }
        // The subscription completed without throwing: nothing to retry.
        return;
      } catch (err) {
        // An error surfacing after destroy() is the shutdown, not a failure.
        if (signal.aborted) return;
        console.error("repo subscription errored", err);
        // Previously this scheduled `this.run()`, which only created a new,
        // never-iterated generator; waiting and looping here actually retries.
        await sleep(this.opts.subscriptionReconnectDelay ?? 3000, signal);
      }
    }
  }

  /** Aborts the subscription's signal and cancels any pending reconnect wait. */
  destroy() {
    this.abortController.abort();
  }
}

/**
 * Converts a repo event into record-level events.
 *
 * @param evt - Event received from the subscription.
 * @returns The operations of a commit event, or an empty array for any other
 *   kind of event.
 */
export const parseEvent = async (evt: RepoEvent): Promise<Event[]> => {
  if (!isCommit(evt)) return [];
  return parseCommit(evt);
};

/**
 * Expands a commit into one event per create, update or delete operation.
 *
 * Create/update operations without a `cid`, or whose block is not present in
 * the commit's CAR data, are skipped. Operations with any other action are
 * ignored.
 *
 * @param evt - Commit event whose `blocks` hold the CAR-encoded records.
 * @returns Events in the same order as `evt.ops`.
 */
export const parseCommit = async (evt: Commit): Promise<Event[]> => {
  const car = await readCar(evt.blocks);

  const evts: Event[] = [];

  for (const op of evt.ops) {
    const uri = new AtUri(`at://${evt.repo}/${op.path}`);

    const meta: CommitMeta = {
      uri,
      author: uri.host,
      collection: uri.collection,
      rkey: uri.rkey,
    };

    if (op.action === "create" || op.action === "update") {
      if (!op.cid) continue;
      const recordBytes = car.blocks.get(op.cid);
      if (!recordBytes) continue;
      const record = cborToLexRecord(recordBytes);
      evts.push({
        ...meta,
        event: op.action as "create" | "update",
        cid: op.cid,
        record,
      });
    } else if (op.action === "delete") {
      evts.push({
        ...meta,
        event: "delete",
      });
    }
  }

  return evts;
};

/** A single record-level operation extracted from a commit. */
type Event = Create | Update | Delete;

/** Fields shared by every event, all derived from the operation's AT URI. */
type CommitMeta = {
  uri: AtUri;
  author: string;
  collection: string;
  rkey: string;
};

/** A record was created. */
type Create = CommitMeta & {
  event: "create";
  record: RepoRecord;
  cid: CID;
};

/** A record was updated; carries the new record and CID, like a create. */
type Update = CommitMeta & {
  event: "update";
  record: RepoRecord;
  cid: CID;
};

/** A record was deleted. */
type Delete = CommitMeta & {
  event: "delete";
};