Scratch space for learning atproto app development
1import type { Database } from '#/db'
2import * as Status from '#/lexicon/types/xyz/statusphere/status'
3import { IdResolver, MemoryCache } from '@atproto/identity'
4import { Event, Firehose } from '@atproto/sync'
5import pino from 'pino'
6import { env } from './env'
7
8const HOUR = 60e3 * 60
9const DAY = HOUR * 24
10
11export function createIngester(db: Database) {
12 const logger = pino({ name: 'firehose', level: env.LOG_LEVEL })
13 return new Firehose({
14 service: env.FIREHOSE_URL,
15 idResolver: new IdResolver({
16 plcUrl: env.PLC_URL,
17 didCache: new MemoryCache(HOUR, DAY),
18 }),
19 handleEvent: async (evt: Event) => {
20 // Watch for write events
21 if (evt.event === 'create' || evt.event === 'update') {
22 const now = new Date()
23 const record = evt.record
24
25 // If the write is a valid status update
26 if (
27 evt.collection === 'xyz.statusphere.status' &&
28 Status.isRecord(record) &&
29 Status.validateRecord(record).success
30 ) {
31 logger.debug(
32 { uri: evt.uri.toString(), status: record.status },
33 'ingesting status',
34 )
35
36 // Store the status in our SQLite
37 await db
38 .insertInto('status')
39 .values({
40 uri: evt.uri.toString(),
41 authorDid: evt.did,
42 status: record.status,
43 createdAt: record.createdAt,
44 indexedAt: now.toISOString(),
45 })
46 .onConflict((oc) =>
47 oc.column('uri').doUpdateSet({
48 status: record.status,
49 indexedAt: now.toISOString(),
50 }),
51 )
52 .execute()
53 }
54 } else if (
55 evt.event === 'delete' &&
56 evt.collection === 'xyz.statusphere.status'
57 ) {
58 logger.debug(
59 { uri: evt.uri.toString(), did: evt.did },
60 'deleting status',
61 )
62
63 // Remove the status from our SQLite
64 await db
65 .deleteFrom('status')
66 .where('uri', '=', evt.uri.toString())
67 .execute()
68 }
69 },
70 onError: (err: unknown) => {
71 logger.error({ err }, 'error on firehose ingestion')
72 },
73 filterCollections: ['xyz.statusphere.status'],
74 excludeIdentity: true,
75 excludeAccount: true,
76 })
77}