A simple AtProto app to read pet.mewsse.link records on my PDS.

Fix backfilling not following PDS latest rev

Mewsse 197a66ae 61fccc11

Changed files
+53 -9
src
+53 -9
src/ingester.ts
···
import { Client, simpleFetchHandler } from '@atcute/client'
import { findUserPDS, getUserDID } from './id-resolver.ts'
-
import { Jetstream } from '@skyware/jetstream'
import { RepoReader } from '@atcute/car/v4'
import { decode } from '@atcute/cbor'
import { logger } from "./lib/logger.ts"
···
logger.info(`Inserting record ${link.rkey}`)
}
-
if (!repoRev || repoRev === null) {
-
logger.error('Backfilling error: no last pds revision found')
-
return;
-
}
-
-
if (repoRev != lastRev?.rkey) {
await db
.insertInto('revs')
.values({
···
logger.info(`Backfilling ended`)
},
async jetstream(): Promise<Jetstream> {
const did = getUserDID()
const pds = await findUserPDS()
const jetstream = new Jetstream({
-
wantedCollections: ['pet.mewsse.link'],
-
wantedDids: [did]
})
jetstream.onCreate('pet.mewsse.link', async (event) => {
···
import { Client, simpleFetchHandler } from '@atcute/client'
import { findUserPDS, getUserDID } from './id-resolver.ts'
+
import { CommitType, Jetstream } from '@skyware/jetstream'
import { RepoReader } from '@atcute/car/v4'
import { decode } from '@atcute/cbor'
import { logger } from "./lib/logger.ts"
···
logger.info(`Inserting record ${link.rkey}`)
}
+
if (repoRev && repoRev != lastRev?.rkey) {
await db
.insertInto('revs')
.values({
···
logger.info(`Backfilling ended`)
},
+
async setLastRevision(rkey: string, createdAt: string): Promise<void> {
+
await db
+
.deleteFrom('revs')
+
.execute()
+
+
await db
+
.insertInto('revs')
+
.values({
+
rkey,
+
createdAt
+
})
+
.execute()
+
},
+
+
async rollbackRevision(): Promise<void> {
+
const lastInsert = await db
+
.selectFrom('links')
+
.select(['rkey', 'createdAt'])
+
.orderBy('createdAt', 'desc')
+
.limit(1)
+
.executeTakeFirst()
+
+
if (lastInsert !== undefined) {
+
await db
+
.deleteFrom('revs')
+
.execute()
+
+
await db
+
.insertInto('revs')
+
.values({
+
rkey: lastInsert.rkey,
+
createdAt: lastInsert.createdAt
+
})
+
.execute()
+
}
+
},
+
async jetstream(): Promise<Jetstream> {
const did = getUserDID()
const pds = await findUserPDS()
const jetstream = new Jetstream({
+
wantedDids: [did],
+
})
+
+
jetstream.on("commit", async (event) => {
+
if (event.commit.operation === CommitType.Create) {
+
const date = new Date()
+
await this.setLastRevision(event.commit.rev, date.toISOString())
+
return
+
}
+
+
if (event.commit.operation === CommitType.Delete) {
+
await this.rollbackRevision()
+
return
+
}
})
jetstream.onCreate('pet.mewsse.link', async (event) => {