Scratch space for learning atproto app development
1import type { Database } from '#/db'
2import * as Status from '#/lexicon/types/xyz/statusphere/status'
3import { IdResolver, MemoryCache } from '@atproto/identity'
4import { Event, Firehose } from '@atproto/sync'
5import pino from 'pino'
6import { env } from './env'
7
/** One hour expressed in milliseconds (3,600,000). */
const HOUR = 60 * 60 * 1000
/** One day expressed in milliseconds (86,400,000). */
const DAY = 24 * HOUR
10
/**
 * Creates the firehose consumer that keeps the local `status` table in sync
 * with `xyz.statusphere.status` records.
 *
 * - `create` / `update` events whose record passes the lexicon checks are
 *   upserted, keyed by `uri`.
 * - `delete` events remove the row with the matching `uri`.
 * - Every other event is ignored.
 *
 * @param db - Database handle used for the upsert and delete queries.
 * @returns A configured `Firehose` instance; this function only constructs it.
 */
export function createIngester(db: Database) {
  const statusCollection = 'xyz.statusphere.status'
  const log = pino({ name: 'firehose', level: env.LOG_LEVEL })

  // Applies a single firehose event to the `status` table.
  const applyEvent = async (evt: Event) => {
    switch (evt.event) {
      case 'create':
      case 'update': {
        // Timestamp is captured before validation, as the indexing time.
        const receivedAt = new Date()
        const payload = evt.record

        // Only well-formed status records from our collection are stored.
        if (evt.collection !== statusCollection) return
        if (!Status.isRecord(payload)) return
        if (!Status.validateRecord(payload).success) return

        const uri = evt.uri.toString()
        const indexedAt = receivedAt.toISOString()
        log.debug({ uri, status: payload.status }, 'ingesting status')

        // Upsert: on a `uri` collision only the status text and the
        // indexing time are refreshed; the other columns keep their values.
        const upsert = db
          .insertInto('status')
          .values({
            uri,
            authorDid: evt.did,
            status: payload.status,
            createdAt: payload.createdAt,
            indexedAt,
          })
          .onConflict((conflict) =>
            conflict
              .column('uri')
              .doUpdateSet({ status: payload.status, indexedAt }),
          )
        await upsert.execute()
        return
      }

      case 'delete': {
        if (evt.collection !== statusCollection) return

        const uri = evt.uri.toString()
        log.debug({ uri, did: evt.did }, 'deleting status')

        // Drop the row that mirrors the deleted record.
        await db.deleteFrom('status').where('uri', '=', uri).execute()
        return
      }

      default:
        // Not a record write or delete: nothing to do.
        return
    }
  }

  // Ingestion failures are logged under the `err` key.
  const reportError = (err: unknown) => {
    log.error({ err }, 'error on firehose ingestion')
  }

  return new Firehose({
    filterCollections: [statusCollection],
    handleEvent: applyEvent,
    onError: reportError,
    excludeIdentity: true,
    excludeAccount: true,
    service: env.FIREHOSE_URL,
    idResolver: new IdResolver({
      plcUrl: env.PLC_URL,
      // NOTE(review): presumably (staleTTL, maxTTL) in ms — confirm against
      // the @atproto/identity MemoryCache constructor.
      didCache: new MemoryCache(HOUR, DAY),
    }),
  })
}