Scratch space for learning atproto app development
1import type { Database } from '#/db' 2import * as Status from '#/lexicon/types/xyz/statusphere/status' 3import { IdResolver, MemoryCache } from '@atproto/identity' 4import { Event, Firehose } from '@atproto/sync' 5import pino from 'pino' 6import { env } from './env' 7 8const HOUR = 60e3 * 60 9const DAY = HOUR * 24 10 11export function createIngester(db: Database) { 12 const logger = pino({ name: 'firehose', level: env.LOG_LEVEL }) 13 return new Firehose({ 14 service: env.FIREHOSE_URL, 15 idResolver: new IdResolver({ 16 plcUrl: env.PLC_URL, 17 didCache: new MemoryCache(HOUR, DAY), 18 }), 19 handleEvent: async (evt: Event) => { 20 // Watch for write events 21 if (evt.event === 'create' || evt.event === 'update') { 22 const now = new Date() 23 const record = evt.record 24 25 // If the write is a valid status update 26 if ( 27 evt.collection === 'xyz.statusphere.status' && 28 Status.isRecord(record) && 29 Status.validateRecord(record).success 30 ) { 31 logger.debug( 32 { uri: evt.uri.toString(), status: record.status }, 33 'ingesting status', 34 ) 35 36 // Store the status in our SQLite 37 await db 38 .insertInto('status') 39 .values({ 40 uri: evt.uri.toString(), 41 authorDid: evt.did, 42 status: record.status, 43 createdAt: record.createdAt, 44 indexedAt: now.toISOString(), 45 }) 46 .onConflict((oc) => 47 oc.column('uri').doUpdateSet({ 48 status: record.status, 49 indexedAt: now.toISOString(), 50 }), 51 ) 52 .execute() 53 } 54 } else if ( 55 evt.event === 'delete' && 56 evt.collection === 'xyz.statusphere.status' 57 ) { 58 logger.debug( 59 { uri: evt.uri.toString(), did: evt.did }, 60 'deleting status', 61 ) 62 63 // Remove the status from our SQLite 64 await db 65 .deleteFrom('status') 66 .where('uri', '=', evt.uri.toString()) 67 .execute() 68 } 69 }, 70 onError: (err: unknown) => { 71 logger.error({ err }, 'error on firehose ingestion') 72 }, 73 filterCollections: ['xyz.statusphere.status'], 74 excludeIdentity: true, 75 excludeAccount: true, 76 }) 77}