Scratch space for learning atproto app development
1import pino from 'pino'
2import { IdResolver } from '@atproto/identity'
3import { Firehose } from '@atproto/sync'
4import type { Database } from '#/db'
5import * as Status from '#/lexicon/types/xyz/statusphere/status'
6
7export function createIngester(db: Database, idResolver: IdResolver) {
8 const logger = pino({ name: 'firehose ingestion' })
9 return new Firehose({
10 idResolver,
11 handleEvent: async (evt) => {
12 // Watch for write events
13 if (evt.event === 'create' || evt.event === 'update') {
14 const record = evt.record
15
16 // If the write is a valid status update
17 if (
18 evt.collection === 'xyz.statusphere.status' &&
19 Status.isRecord(record) &&
20 Status.validateRecord(record).success
21 ) {
22 // Store the status in our SQLite
23 await db
24 .insertInto('status')
25 .values({
26 uri: evt.uri.toString(),
27 authorDid: evt.did,
28 status: record.status,
29 createdAt: record.createdAt,
30 indexedAt: new Date().toISOString(),
31 })
32 .onConflict((oc) =>
33 oc.column('uri').doUpdateSet({
34 status: record.status,
35 indexedAt: new Date().toISOString(),
36 })
37 )
38 .execute()
39 }
40 } else if (
41 evt.event === 'delete' &&
42 evt.collection === 'xyz.statusphere.status'
43 ) {
44 // Remove the status from our SQLite
45 await db.deleteFrom('status').where({ uri: evt.uri.toString() })
46 }
47 },
48 onError: (err) => {
49 logger.error({ err }, 'error on firehose ingestion')
50 },
51 filterCollections: ['xyz.statusphere.status'],
52 excludeIdentity: true,
53 excludeAccount: true,
54 })
55}