Scratch space for learning atproto app development
import { cborToLexRecord, readCar } from "@atproto/repo";
import { Subscription } from "@atproto/xrpc-server";
import type { Database } from "#/db";

/**
 * Consumes the `com.atproto.sync.subscribeRepos` stream of a service and
 * stores newly created `app.bsky.feed.post` records in the `post` table.
 */
export class Firehose {
  public sub: Subscription<unknown>;

  /**
   * @param service - Address of the service to subscribe to; passed straight
   *   through to the underlying `Subscription`.
   * @param db - Database handle used to insert indexed posts.
   */
  constructor(public service: string, public db: Database) {
    this.sub = new Subscription({
      service,
      method: "com.atproto.sync.subscribeRepos",
      getParams: () => ({}),
      // Messages are handed over as-is; handleEvent does its own filtering.
      validate: (value: unknown) => value,
    });
  }

  /**
   * Processes a single subscription message.
   *
   * Anything that is not a `#commit` message is ignored. For a commit, every
   * `create` operation on an `app.bsky.feed.post` record whose block is
   * present in the commit's CAR data is decoded and inserted into `post`.
   * Operations that cannot be resolved to a record with a string `text`
   * field are skipped.
   *
   * @param evt - Message yielded by the subscription. It is not validated
   *   upstream (see `validate` in the constructor), hence the loose type.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- unvalidated wire message
  async handleEvent(evt: any): Promise<void> {
    if (evt.$type !== "com.atproto.sync.subscribeRepos#commit") {
      return;
    }

    const car = await readCar(evt.blocks);

    for (const op of evt.ops) {
      if (op.action !== "create") continue;

      // The collection is the first segment of the operation path.
      const [collection] = op.path.split("/");
      if (collection !== "app.bsky.feed.post") continue;
      if (!op.cid) continue;

      const recordBytes = car.blocks.get(op.cid);
      if (!recordBytes) continue;

      // The record is decoded from untrusted bytes: check `text` at runtime
      // instead of asserting its type, and skip records that do not carry
      // a string, just like the other unresolvable operations above.
      const { text } = cborToLexRecord(recordBytes);
      if (typeof text !== "string") continue;

      await this.db
        .insertInto("post")
        .values({
          uri: `at://${evt.repo}/${op.path}`,
          text,
          indexedAt: new Date().toISOString(),
        })
        .execute();
    }
  }

  /**
   * Iterates the subscription and feeds every message to `handleEvent`.
   *
   * A failure while handling one message is logged and iteration continues.
   * If the iteration itself throws, the error is logged and a new `run` is
   * scheduled after the given delay. If the iteration ends without throwing,
   * the method returns without rescheduling.
   *
   * @param subscriptionReconnectDelay - Delay, in milliseconds, before
   *   `run` is invoked again after the subscription errors.
   */
  async run(subscriptionReconnectDelay: number): Promise<void> {
    try {
      for await (const evt of this.sub) {
        try {
          await this.handleEvent(evt);
        } catch (err) {
          console.error("repo subscription could not handle message", err);
        }
      }
    } catch (err) {
      console.error("repo subscription errored", err);
      setTimeout(() => {
        // `run` catches its own failures, so the promise is intentionally
        // not awaited here.
        void this.run(subscriptionReconnectDelay);
      }, subscriptionReconnectDelay);
    }
  }
}