Scratch space for learning atproto app development
at main 2.3 kB view raw
import type { Database } from '#/db'
import * as Status from '#/lexicon/types/xyz/statusphere/status'
import { IdResolver, MemoryCache } from '@atproto/identity'
import { Event, Firehose } from '@atproto/sync'
import pino from 'pino'
import { env } from './env'

// One hour and one day, expressed in milliseconds (60e3 ms is one minute).
const HOUR = 60 * 60e3
const DAY = 24 * HOUR

/**
 * Builds a firehose subscription that mirrors `xyz.statusphere.status`
 * records into the `status` table of the given database.
 *
 * - `create` / `update` events whose record passes the lexicon checks are
 *   upserted, keyed on the record URI.
 * - `delete` events for the collection remove the row with that URI.
 * - Every other event is ignored.
 *
 * Ingestion errors are reported through the `firehose` logger.
 *
 * @param db - Database handle used for the inserts and deletes.
 * @returns The configured `Firehose` instance.
 */
export function createIngester(db: Database) {
  const collection = 'xyz.statusphere.status'
  const logger = pino({ name: 'firehose', level: env.LOG_LEVEL })

  const handleEvent = async (evt: Event) => {
    // Deletions: drop the matching row from the status table.
    if (evt.event === 'delete') {
      if (evt.collection !== collection) return

      const uri = evt.uri.toString()
      logger.debug({ uri, did: evt.did }, 'deleting status')
      await db.deleteFrom('status').where('uri', '=', uri).execute()
      return
    }

    // Anything that is not a write is of no interest here.
    if (evt.event !== 'create' && evt.event !== 'update') return

    const indexedAt = new Date().toISOString()
    const record = evt.record

    // Only ingest writes that carry a valid status record.
    if (
      evt.collection !== collection ||
      !Status.isRecord(record) ||
      !Status.validateRecord(record).success
    ) {
      return
    }

    const uri = evt.uri.toString()
    const { status } = record
    logger.debug({ uri, status }, 'ingesting status')

    // Insert the row, or refresh status/indexedAt when the URI already exists.
    await db
      .insertInto('status')
      .values({
        uri,
        authorDid: evt.did,
        status,
        createdAt: record.createdAt,
        indexedAt,
      })
      .onConflict((oc) => oc.column('uri').doUpdateSet({ status, indexedAt }))
      .execute()
  }

  const onError = (err: unknown) => {
    logger.error({ err }, 'error on firehose ingestion')
  }

  // NOTE(review): the two MemoryCache arguments are presumably the stale and
  // max TTLs for cached DID documents — confirm against @atproto/identity.
  const idResolver = new IdResolver({
    plcUrl: env.PLC_URL,
    didCache: new MemoryCache(HOUR, DAY),
  })

  return new Firehose({
    service: env.FIREHOSE_URL,
    idResolver,
    filterCollections: [collection],
    excludeIdentity: true,
    excludeAccount: true,
    handleEvent,
    onError,
  })
}