Scratch space for learning atproto app development
import pino from 'pino'
import { IdResolver } from '@atproto/identity'
import { Firehose } from '@atproto/sync'
import type { Database } from '#/db'
import * as Status from '#/lexicon/types/xyz/statusphere/status'

/** Collection NSID of the status records this ingester indexes. */
const STATUS_COLLECTION = 'xyz.statusphere.status'

/**
 * Builds a `Firehose` subscription that mirrors status records into the
 * local `status` table.
 *
 * - `create` / `update` events whose record passes `Status.isRecord` and
 *   `Status.validateRecord` are upserted, keyed on the record URI.
 * - `delete` events for the status collection remove the row with that URI.
 * - Errors reported by the firehose are logged via pino and not rethrown here.
 *
 * @param db - Query builder for the application database (must expose a
 *   `status` table with `uri`, `authorDid`, `status`, `createdAt`, `indexedAt`).
 * @param idResolver - Identity resolver handed straight to the `Firehose`.
 * @returns The configured `Firehose` instance. This function does not start
 *   it; starting is presumably left to the caller.
 */
export function createIngester(db: Database, idResolver: IdResolver): Firehose {
  const logger = pino({ name: 'firehose ingestion' })

  return new Firehose({
    idResolver,
    handleEvent: async (evt) => {
      // Watch for write events
      if (evt.event === 'create' || evt.event === 'update') {
        const record = evt.record

        // Only index writes that are valid status records
        if (
          evt.collection === STATUS_COLLECTION &&
          Status.isRecord(record) &&
          Status.validateRecord(record).success
        ) {
          const indexedAt = new Date().toISOString()

          // Upsert: a repeated URI (e.g. an update) overwrites the stored
          // status and refreshes indexedAt instead of failing on conflict.
          await db
            .insertInto('status')
            .values({
              uri: evt.uri.toString(),
              authorDid: evt.did,
              status: record.status,
              createdAt: record.createdAt,
              indexedAt,
            })
            .onConflict((oc) =>
              oc.column('uri').doUpdateSet({
                status: record.status,
                indexedAt,
              })
            )
            .execute()
        }
      } else if (
        evt.event === 'delete' &&
        evt.collection === STATUS_COLLECTION
      ) {
        // Remove the status from our SQLite. The builder is lazy, so the
        // query only runs once .execute() is called.
        await db
          .deleteFrom('status')
          .where('uri', '=', evt.uri.toString())
          .execute()
      }
    },
    onError: (err) => {
      logger.error({ err }, 'error on firehose ingestion')
    },
    // Subscribe only to the status collection; identity and account events
    // are excluded because handleEvent does not process them.
    filterCollections: [STATUS_COLLECTION],
    excludeIdentity: true,
    excludeAccount: true,
  })
}