Scratch space for learning atproto app development
1import pino from 'pino'
2import { IdResolver } from '@atproto/identity'
3import { Firehose } from '@atproto/sync'
4import type { Database } from '#/db'
5import * as Status from '#/lexicon/types/xyz/statusphere/status'
6
7export function createIngester(db: Database, idResolver: IdResolver) {
8 const logger = pino({ name: 'firehose ingestion' })
9 return new Firehose({
10 idResolver,
11 handleEvent: async (evt) => {
12 // Watch for write events
13 if (evt.event === 'create' || evt.event === 'update') {
14 const now = new Date()
15 const record = evt.record
16
17 // If the write is a valid status update
18 if (
19 evt.collection === 'xyz.statusphere.status' &&
20 Status.isRecord(record) &&
21 Status.validateRecord(record).success
22 ) {
23 // Store the status in our SQLite
24 await db
25 .insertInto('status')
26 .values({
27 uri: evt.uri.toString(),
28 authorDid: evt.did,
29 status: record.status,
30 createdAt: record.createdAt,
31 indexedAt: now.toISOString(),
32 })
33 .onConflict((oc) =>
34 oc.column('uri').doUpdateSet({
35 status: record.status,
36 indexedAt: now.toISOString(),
37 })
38 )
39 .execute()
40 }
41 } else if (
42 evt.event === 'delete' &&
43 evt.collection === 'xyz.statusphere.status'
44 ) {
45 // Remove the status from our SQLite
46 await db.deleteFrom('status').where('uri', '=', evt.uri.toString()).execute()
47 }
48 },
49 onError: (err) => {
50 logger.error({ err }, 'error on firehose ingestion')
51 },
52 filterCollections: ['xyz.statusphere.status'],
53 excludeIdentity: true,
54 excludeAccount: true,
55 })
56}