Scratch space for learning atproto app development
import pino from 'pino'
import { IdResolver } from '@atproto/identity'
import { Firehose } from '@atproto/sync'
import type { Database } from '#/db'
import * as Status from '#/lexicon/types/com/example/status'

/**
 * Builds a `Firehose` consumer that mirrors `com.example.status` records
 * into the `status` table.
 *
 * Only `create` and `update` events are acted on. A record is stored when it
 * belongs to the `com.example.status` collection and passes both
 * `Status.isRecord` and `Status.validateRecord`. Rows are upserted on the
 * `authorDid` column, so at most one row is kept per author.
 *
 * NOTE(review): `delete` events are not handled here, so a row stays in the
 * table after its record is deleted; confirm whether that is intended.
 *
 * @param db - Query builder used to upsert into the `status` table.
 * @param idResolver - Identity resolver handed through to the `Firehose`.
 * @returns The configured `Firehose` instance; this function does not start it.
 */
export function createIngester(db: Database, idResolver: IdResolver) {
  const logger = pino({ name: 'firehose ingestion' })

  return new Firehose({
    idResolver,
    handleEvent: async (evt) => {
      // Anything that is not a record write is ignored.
      if (evt.event !== 'create' && evt.event !== 'update') return

      const record = evt.record

      // Bail out unless this is a well-formed status record. The checks run
      // in the same order as a short-circuiting `&&` chain would.
      if (evt.collection !== 'com.example.status') return
      if (!Status.isRecord(record)) return
      if (!Status.validateRecord(record).success) return

      const { status, updatedAt } = record

      // Upsert: insert a fresh row, or overwrite the author's existing one.
      await db
        .insertInto('status')
        .values({
          authorDid: evt.did,
          status,
          updatedAt,
          indexedAt: new Date().toISOString(),
        })
        .onConflict((conflict) =>
          conflict.column('authorDid').doUpdateSet({
            status,
            updatedAt,
            indexedAt: new Date().toISOString(),
          }),
        )
        .execute()
    },
    onError: (error) => {
      // Logged under the `err` key, matching the original log payload shape.
      logger.error({ err: error }, 'error on firehose ingestion')
    },
    filterCollections: ['com.example.status'],
    // Presumably these suppress identity and account events, which this
    // ingester has no use for; confirm against the Firehose options docs.
    excludeIdentity: true,
    excludeAccount: true,
  })
}