Scratch space for learning atproto app development
1import type { Database } from '#/db'
2import * as Status from '#/lexicon/types/xyz/statusphere/status'
3import { IdResolver, MemoryCache } from '@atproto/identity'
4import { Event, Firehose } from '@atproto/sync'
5import pino from 'pino'
6
7const HOUR = 60e3 * 60
8const DAY = HOUR * 24
9
10export function createIngester(db: Database) {
11 const logger = pino({ name: 'firehose ingestion' })
12 return new Firehose({
13 idResolver: new IdResolver({
14 didCache: new MemoryCache(HOUR, DAY),
15 }),
16 handleEvent: async (evt: Event) => {
17 // Watch for write events
18 if (evt.event === 'create' || evt.event === 'update') {
19 const now = new Date()
20 const record = evt.record
21
22 // If the write is a valid status update
23 if (
24 evt.collection === 'xyz.statusphere.status' &&
25 Status.isRecord(record) &&
26 Status.validateRecord(record).success
27 ) {
28 // Store the status in our SQLite
29 await db
30 .insertInto('status')
31 .values({
32 uri: evt.uri.toString(),
33 authorDid: evt.did,
34 status: record.status,
35 createdAt: record.createdAt,
36 indexedAt: now.toISOString(),
37 })
38 .onConflict((oc) =>
39 oc.column('uri').doUpdateSet({
40 status: record.status,
41 indexedAt: now.toISOString(),
42 }),
43 )
44 .execute()
45 }
46 } else if (
47 evt.event === 'delete' &&
48 evt.collection === 'xyz.statusphere.status'
49 ) {
50 // Remove the status from our SQLite
51 await db
52 .deleteFrom('status')
53 .where('uri', '=', evt.uri.toString())
54 .execute()
55 }
56 },
57 onError: (err: unknown) => {
58 logger.error({ err }, 'error on firehose ingestion')
59 },
60 filterCollections: ['xyz.statusphere.status'],
61 excludeIdentity: true,
62 excludeAccount: true,
63 })
64}