Scratch space for learning atproto app development

Fix: handle firehose update events

Changed files
+18 -3
src
+6 -1
src/firehose/firehose.ts
···
if (isCommit(evt) && !this.opts.excludeCommit) {
const parsed = await parseCommit(evt)
for (const write of parsed) {
-
if (!this.opts.filterCollections || this.opts.filterCollections.includes(write.uri.collection)) {
yield write
}
}
···
type Update = CommitMeta & {
event: 'update'
}
type Delete = CommitMeta & {
···
if (isCommit(evt) && !this.opts.excludeCommit) {
const parsed = await parseCommit(evt)
for (const write of parsed) {
+
if (
+
!this.opts.filterCollections ||
+
this.opts.filterCollections.includes(write.uri.collection)
+
) {
yield write
}
}
···
type Update = CommitMeta & {
event: 'update'
+
record: RepoRecord
+
cid: CID
}
type Delete = CommitMeta & {
+12 -2
src/firehose/ingester.ts
···
const firehose = new Firehose({})
for await (const evt of firehose.run()) {
-
if (evt.event === 'create') {
const record = evt.record
if (
evt.collection === 'com.example.status' &&
Status.isRecord(record) &&
Status.validateRecord(record).success
) {
await this.db
.insertInto('status')
.values({
···
updatedAt: record.updatedAt,
indexedAt: new Date().toISOString(),
})
-
.onConflict((oc) => oc.doNothing())
.execute()
}
}
···
const firehose = new Firehose({})
for await (const evt of firehose.run()) {
+
// Watch for write events
+
if (evt.event === 'create' || evt.event === 'update') {
const record = evt.record
+
+
// If the write is a valid status update
if (
evt.collection === 'com.example.status' &&
Status.isRecord(record) &&
Status.validateRecord(record).success
) {
+
// Store the status in our SQLite
await this.db
.insertInto('status')
.values({
···
updatedAt: record.updatedAt,
indexedAt: new Date().toISOString(),
})
+
.onConflict((oc) =>
+
oc.column('authorDid').doUpdateSet({
+
status: record.status,
+
updatedAt: record.updatedAt,
+
indexedAt: new Date().toISOString(),
+
})
+
)
.execute()
}
}