Scratch space for learning atproto app development
1import pino from 'pino'
2import { IdResolver } from '@atproto/identity'
3import { Firehose } from '@atproto/sync'
4import type { Database } from '#/db'
5import * as Status from '#/lexicon/types/com/example/status'
6
/**
 * Builds the firehose consumer that mirrors `com.example.status` records
 * into the local `status` table.
 *
 * For every `create` or `update` event whose record belongs to the
 * `com.example.status` collection and passes `Status.isRecord` /
 * `Status.validateRecord`, the author's row is inserted, or overwritten
 * when a row for that `authorDid` already exists. Errors reported through
 * `onError` are logged with pino under the name `firehose ingestion`.
 *
 * @param db - Database handle used to upsert rows into `status`.
 * @param idResolver - Identity resolver passed through to the `Firehose`.
 * @returns The configured `Firehose` instance.
 */
export function createIngester(db: Database, idResolver: IdResolver) {
  const log = pino({ name: 'firehose ingestion' })

  return new Firehose({
    idResolver,
    filterCollections: ['com.example.status'],
    excludeIdentity: true,
    excludeAccount: true,
    onError: (err) => {
      log.error({ err }, 'error on firehose ingestion')
    },
    handleEvent: async (evt) => {
      // Only write events are of interest; anything else is ignored.
      if (evt.event !== 'create' && evt.event !== 'update') return

      const incoming = evt.record

      // Ignore writes to other collections and records that fail validation.
      if (evt.collection !== 'com.example.status') return
      if (!Status.isRecord(incoming)) return
      if (!Status.validateRecord(incoming).success) return

      // One row per author: insert, or overwrite the existing row on conflict.
      const upsert = db
        .insertInto('status')
        .values({
          authorDid: evt.did,
          status: incoming.status,
          updatedAt: incoming.updatedAt,
          indexedAt: new Date().toISOString(),
        })
        .onConflict((conflict) =>
          conflict.column('authorDid').doUpdateSet({
            status: incoming.status,
            updatedAt: incoming.updatedAt,
            indexedAt: new Date().toISOString(),
          })
        )

      await upsert.execute()
    },
  })
}