Scratch space for learning atproto app development
import type { Database } from '#/db'
import { Firehose } from '#/firehose/firehose'
import * as Status from '#/lexicon/types/com/example/status'

/**
 * Reads events from a `Firehose` and mirrors valid `com.example.status`
 * records into the `status` table of the supplied database.
 */
export class Ingester {
  /** The firehose currently being consumed; set once `start()` has been called. */
  firehose: Firehose | undefined

  /**
   * @param db Database handle used to upsert status rows.
   */
  constructor(public db: Database) {}

  /**
   * Creates a firehose and processes its events until the stream ends.
   *
   * Only `create` and `update` events whose collection is
   * `com.example.status` and whose record passes lexicon validation are
   * stored; everything else is skipped.
   */
  async start(): Promise<void> {
    const firehose = new Firehose({})
    // Keep a reference on the instance so destroy() can actually stop it.
    this.firehose = firehose

    for await (const evt of firehose.run()) {
      // Only write events carry a record we might want to store.
      if (evt.event !== 'create' && evt.event !== 'update') continue

      const record = evt.record

      // If the write is a valid status update
      if (
        evt.collection === 'com.example.status' &&
        Status.isRecord(record) &&
        Status.validateRecord(record).success
      ) {
        const { status, updatedAt } = record
        // One timestamp per event so the insert and update paths agree.
        const indexedAt = new Date().toISOString()

        // Upsert keyed on authorDid: a conflicting row is overwritten
        // with the newest status.
        await this.db
          .insertInto('status')
          .values({
            authorDid: evt.author,
            status,
            updatedAt,
            indexedAt,
          })
          .onConflict((oc) =>
            oc.column('authorDid').doUpdateSet({
              status,
              updatedAt,
              indexedAt,
            })
          )
          .execute()
      }
    }
  }

  /** Destroys the firehose created by `start()`, if there is one. */
  destroy(): void {
    this.firehose?.destroy()
  }
}