Scratch space for learning atproto app development
1import type { Database } from '#/db' 2import * as Status from '#/lexicon/types/xyz/statusphere/status' 3import { IdResolver, MemoryCache } from '@atproto/identity' 4import { Event, Firehose } from '@atproto/sync' 5import pino from 'pino' 6 7const HOUR = 60e3 * 60 8const DAY = HOUR * 24 9 10export function createIngester(db: Database) { 11 const logger = pino({ name: 'firehose ingestion' }) 12 return new Firehose({ 13 idResolver: new IdResolver({ 14 didCache: new MemoryCache(HOUR, DAY), 15 }), 16 handleEvent: async (evt: Event) => { 17 // Watch for write events 18 if (evt.event === 'create' || evt.event === 'update') { 19 const now = new Date() 20 const record = evt.record 21 22 // If the write is a valid status update 23 if ( 24 evt.collection === 'xyz.statusphere.status' && 25 Status.isRecord(record) && 26 Status.validateRecord(record).success 27 ) { 28 // Store the status in our SQLite 29 await db 30 .insertInto('status') 31 .values({ 32 uri: evt.uri.toString(), 33 authorDid: evt.did, 34 status: record.status, 35 createdAt: record.createdAt, 36 indexedAt: now.toISOString(), 37 }) 38 .onConflict((oc) => 39 oc.column('uri').doUpdateSet({ 40 status: record.status, 41 indexedAt: now.toISOString(), 42 }), 43 ) 44 .execute() 45 } 46 } else if ( 47 evt.event === 'delete' && 48 evt.collection === 'xyz.statusphere.status' 49 ) { 50 // Remove the status from our SQLite 51 await db 52 .deleteFrom('status') 53 .where('uri', '=', evt.uri.toString()) 54 .execute() 55 } 56 }, 57 onError: (err: unknown) => { 58 logger.error({ err }, 'error on firehose ingestion') 59 }, 60 filterCollections: ['xyz.statusphere.status'], 61 excludeIdentity: true, 62 excludeAccount: true, 63 }) 64}