Scratch space for learning atproto app development
import type { Database } from '#/db'
import { Firehose } from '#/firehose/firehose'
import * as Status from '#/lexicon/types/com/example/status'

/**
 * Consumes events from a {@link Firehose} and mirrors `com.example.status`
 * records into the `status` table of the supplied database.
 *
 * - `create` / `update` events carrying a valid status record are upserted,
 *   keyed on the record URI.
 * - `delete` events for the status collection remove the matching row.
 * - Every other event is ignored.
 */
export class Ingester {
  /** The firehose currently being consumed; `undefined` until `start()` runs. */
  firehose: Firehose | undefined

  /**
   * @param db Database handle used to write and delete `status` rows.
   */
  constructor(public db: Database) {}

  /**
   * Creates a firehose and processes its events one at a time, awaiting each
   * database write before handling the next event.
   *
   * The returned promise settles only when the firehose's event iterator
   * finishes or when an event handler throws.
   */
  async start(): Promise<void> {
    const firehose = new Firehose({})
    // Keep a reference on the instance so destroy() can actually reach it.
    this.firehose = firehose

    for await (const evt of firehose.run()) {
      // Watch for write events
      if (evt.event === 'create' || evt.event === 'update') {
        const record = evt.record

        // If the write is a valid status update
        if (
          evt.collection === 'com.example.status' &&
          Status.isRecord(record) &&
          Status.validateRecord(record).success
        ) {
          const uri = evt.uri.toString()
          // One timestamp shared by the insert and the conflict-update path.
          const indexedAt = new Date().toISOString()

          // Store the status in our SQLite; a row with the same uri is
          // updated in place rather than duplicated.
          await this.db
            .insertInto('status')
            .values({
              uri,
              authorDid: evt.author,
              status: record.status,
              createdAt: record.createdAt,
              indexedAt,
            })
            .onConflict((oc) =>
              oc.column('uri').doUpdateSet({
                status: record.status,
                indexedAt,
              })
            )
            .execute()
        }
      } else if (
        evt.event === 'delete' &&
        evt.collection === 'com.example.status'
      ) {
        // Remove the status from our SQLite. The builder only describes the
        // query; execute() is what runs it.
        await this.db
          .deleteFrom('status')
          .where('uri', '=', evt.uri.toString())
          .execute()
      }
    }
  }

  /**
   * Destroys the firehose created by `start()`. Does nothing if `start()`
   * has not been called yet.
   */
  destroy(): void {
    this.firehose?.destroy()
  }
}