Scratch space for learning atproto app development
1import pino from 'pino' 2import { IdResolver } from '@atproto/identity' 3import { Firehose } from '@atproto/sync' 4import type { Database } from '#/db' 5import * as Status from '#/lexicon/types/xyz/statusphere/status' 6 7export function createIngester(db: Database, idResolver: IdResolver) { 8 const logger = pino({ name: 'firehose ingestion' }) 9 return new Firehose({ 10 idResolver, 11 handleEvent: async (evt) => { 12 // Watch for write events 13 if (evt.event === 'create' || evt.event === 'update') { 14 const now = new Date() 15 const record = evt.record 16 17 // If the write is a valid status update 18 if ( 19 evt.collection === 'xyz.statusphere.status' && 20 Status.isRecord(record) && 21 Status.validateRecord(record).success 22 ) { 23 // Store the status in our SQLite 24 await db 25 .insertInto('status') 26 .values({ 27 uri: evt.uri.toString(), 28 authorDid: evt.did, 29 status: record.status, 30 createdAt: record.createdAt, 31 indexedAt: now.toISOString(), 32 }) 33 .onConflict((oc) => 34 oc.column('uri').doUpdateSet({ 35 status: record.status, 36 indexedAt: now.toISOString(), 37 }) 38 ) 39 .execute() 40 } 41 } else if ( 42 evt.event === 'delete' && 43 evt.collection === 'xyz.statusphere.status' 44 ) { 45 // Remove the status from our SQLite 46 await db.deleteFrom('status').where('uri', '=', evt.uri.toString()).execute() 47 } 48 }, 49 onError: (err) => { 50 logger.error({ err }, 'error on firehose ingestion') 51 }, 52 filterCollections: ['xyz.statusphere.status'], 53 excludeIdentity: true, 54 excludeAccount: true, 55 }) 56}