Scratch space for learning atproto app development
1import type { Database } from '#/db'
2import { Firehose } from '#/firehose/firehose'
3import * as Status from '#/lexicon/types/com/example/status'
4
/**
 * Consumes events from the firehose and mirrors every valid
 * `com.example.status` record into the `status` table of the database.
 */
export class Ingester {
  /** Firehose created by `start()`; stays `undefined` until `start()` runs. */
  firehose: Firehose | undefined

  /**
   * @param db Database handle used to upsert status rows.
   */
  constructor(public db: Database) {}

  /**
   * Creates a firehose and processes its events until the stream ends.
   *
   * Each `create` / `update` event whose record is a valid
   * `com.example.status` is upserted into `status`, keyed on `authorDid`.
   * All other events are ignored.
   */
  async start(): Promise<void> {
    // Keep the instance on `this` so that `destroy()` can actually reach it;
    // holding it only in a local would leave `destroy()` a silent no-op.
    const firehose = new Firehose({})
    this.firehose = firehose

    for await (const evt of firehose.run()) {
      // Watch for write events
      if (evt.event !== 'create' && evt.event !== 'update') continue

      const record = evt.record

      // Only handle writes that are a valid status update
      if (
        evt.collection !== 'com.example.status' ||
        !Status.isRecord(record) ||
        !Status.validateRecord(record).success
      ) {
        continue
      }

      // One timestamp per event, so the insert and the conflict-update
      // branches always record the same `indexedAt`.
      const indexedAt = new Date().toISOString()

      // Store the status in our SQLite: insert a new row, or overwrite the
      // existing row for this author.
      await this.db
        .insertInto('status')
        .values({
          authorDid: evt.author,
          status: record.status,
          updatedAt: record.updatedAt,
          indexedAt,
        })
        .onConflict((oc) =>
          oc.column('authorDid').doUpdateSet({
            status: record.status,
            updatedAt: record.updatedAt,
            indexedAt,
          })
        )
        .execute()
    }
  }

  /** Destroys the firehose created by `start()`, if there is one. */
  destroy() {
    this.firehose?.destroy()
  }
}