Scratch space for learning atproto app development
1import type { Database } from '#/db'
2import { Firehose } from '#/firehose/firehose'
3import * as Status from '#/lexicon/types/com/example/status'
4
5export class Ingester {
6 firehose: Firehose | undefined
7 constructor(public db: Database) {}
8
9 async start() {
10 const firehose = new Firehose({})
11
12 for await (const evt of firehose.run()) {
13 // Watch for write events
14 if (evt.event === 'create' || evt.event === 'update') {
15 const record = evt.record
16
17 // If the write is a valid status update
18 if (
19 evt.collection === 'com.example.status' &&
20 Status.isRecord(record) &&
21 Status.validateRecord(record).success
22 ) {
23 // Store the status in our SQLite
24 await this.db
25 .insertInto('status')
26 .values({
27 uri: evt.uri.toString(),
28 authorDid: evt.author,
29 status: record.status,
30 createdAt: record.createdAt,
31 indexedAt: new Date().toISOString(),
32 })
33 .onConflict((oc) =>
34 oc.column('uri').doUpdateSet({
35 status: record.status,
36 indexedAt: new Date().toISOString(),
37 })
38 )
39 .execute()
40 }
41 } else if (
42 evt.event === 'delete' &&
43 evt.collection === 'com.example.status'
44 ) {
45 // Remove the status from our SQLite
46 await this.db.deleteFrom('status').where({ uri: evt.uri.toString() })
47 }
48 }
49 }
50
51 destroy() {
52 this.firehose?.destroy()
53 }
54}