A simple AtProto app to read pet.mewsse.link records on my PDS.

Rework revision management for better backfilling + fix order insertion

Mewsse 13d2e446 e42bd43b

Changed files
+52 -15
src
+13
src/db.ts
···
export type DatabaseSchema = {
links: Link,
}
export type Link = {
···
tag: string | null,
image: string | null,
alt: string | null,
createdAt: string,
}
···
.addColumn('alt', 'varchar')
.addColumn('createdAt', 'varchar', (col) => col.notNull())
.execute()
},
async down(db: Kysely<any>) {
await db.schema.dropTable('links').execute()
},
}
···
export type DatabaseSchema = {
links: Link,
+
revs: Rev
}
export type Link = {
···
tag: string | null,
image: string | null,
alt: string | null,
+
createdAt: string,
+
}
+
+
export type Rev = {
+
rkey: string,
createdAt: string,
}
···
.addColumn('alt', 'varchar')
.addColumn('createdAt', 'varchar', (col) => col.notNull())
.execute()
+
+
await db.schema
+
.createTable('revs')
+
.addColumn('rkey', 'varchar', (col) => col.primaryKey())
+
.addColumn('createdAt', 'varchar', (col) => col.notNull())
+
.execute()
},
async down(db: Kysely<any>) {
await db.schema.dropTable('links').execute()
+
await db.schema.dropTable('rev').execute()
},
}
+39 -15
src/ingester.ts
···
}
const lastRev = await db
-
.selectFrom('links')
.select('rkey')
.orderBy('createdAt', 'desc')
.executeTakeFirst()
···
}
await using repo = RepoReader.fromStream(data)
for await (const entry of repo) {
if (entry.collection != "pet.mewsse.link") continue
const link : Link = decode(entry.bytes)
await db
.insertInto('links')
-
.values({
-
rkey: entry.rkey,
-
link: link.link,
-
title: link.title,
-
description: link.description,
-
tag: link.tag ?? null,
-
image: findImage(did, pds, link),
-
alt: link.alt ?? null,
-
createdAt: link.createdAt
-
})
.onConflict((conflict) =>
conflict.column('rkey').doUpdateSet({
link: link.link,
title: link.title,
description: link.description,
-
tag: link.tag ?? null,
-
image: findImage(did, pds, link),
-
alt: link.alt ?? null,
})
)
.execute()
-
logger.info(`Inserting record ${entry.rkey}`)
}
logger.info(`Backfilling ended`)
···
}
const lastRev = await db
+
.selectFrom('revs')
.select('rkey')
.orderBy('createdAt', 'desc')
.executeTakeFirst()
···
}
await using repo = RepoReader.fromStream(data)
+
let links:Array<Link> = []
+
let repoRev = null;
for await (const entry of repo) {
+
if (!repoRev) repoRev = entry.rkey
if (entry.collection != "pet.mewsse.link") continue
const link : Link = decode(entry.bytes)
+
links.push({
+
rkey: entry.rkey,
+
link: link.link,
+
title: link.title,
+
description: link.description,
+
tag: link.tag ?? null,
+
image: findImage(did, pds, link),
+
alt: link.alt ?? null,
+
createdAt: link.createdAt
+
})
+
}
+
+
links = links.sort((a, b) => a.createdAt > b.createdAt ? 1 : -1)
+
+
for await (const link of links) {
await db
.insertInto('links')
+
.values(link)
.onConflict((conflict) =>
conflict.column('rkey').doUpdateSet({
link: link.link,
title: link.title,
description: link.description,
+
tag: link.tag,
+
image: link.image,
+
alt: link.alt,
})
)
.execute()
+
logger.info(`Inserting record ${link.rkey}`)
+
}
+
+
if (!repoRev || repoRev === null) {
+
logger.error('Backfilling error: no last pds revision found')
+
return;
+
}
+
+
if (repoRev != lastRev?.rkey) {
+
await db
+
.insertInto('revs')
+
.values({
+
rkey: repoRev,
+
createdAt: now.toISOString()
+
})
+
.execute()
}
logger.info(`Backfilling ended`)