Switch to relational model of gmstn records #3

merged
opened by lewis.moe targeting main

The backfill and firehose logic should save each record into its corresponding relational table as it comes in, instead of dumping everything into a raw table that has to be straightened out after the fact. If it turns out down the line that synchronously shunting records into relational tables isn't fast enough, we can always decouple it with a raw table again later.
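Roughly, the idea is to route each incoming record by its lexicon `$type` straight into the matching table. A simplified sketch of that routing, assuming the Kysely `db` instance and the table names from the migration in this PR (the `indexRecord` helper and the lexicon→table map are illustrative only; the actual code in `src/firehose.ts` / `src/pds-backfill.ts` batches per commit/repo and also handles invites and memberships):

```ts
import { sql } from 'kysely';
import { db } from './db';

// Illustrative map from gmstn lexicon to destination table (not part of this change).
const TABLE_FOR_LEXICON: Record<string, string> = {
  'systems.gmstn.development.lattice': 'lattice',
  'systems.gmstn.development.shard': 'shard',
  'systems.gmstn.development.channel': 'channel',
};

// Insert a single record into its relational table as it arrives.
// `uri`, `cid`, and `record` come from the firehose commit op or the repo CAR block.
async function indexRecord(
  did: string,
  uri: string,
  cid: string,
  record: { $type: string } & Record<string, any>,
) {
  const table = TABLE_FOR_LEXICON[record.$type];
  if (!table) return; // not a gmstn record we care about

  await db
    .insertInto(table as any)
    .values({
      uri,
      cid,
      creator_did: did,
      indexed_at: sql`now()`,
      data: record,
    })
    .execute();
}
```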

+33
biome.json
···
+{
+  "formatter": {
+    "indentStyle": "space",
+    "lineWidth": 100
+  },
+  "linter": {
+    "rules": {
+      "a11y": {
+        "useAriaPropsForRole": "off",
+        "useButtonType": "off",
+        "useSemanticElements": "off",
+        "noSvgWithoutTitle": "off"
+      },
+      "complexity": {
+        "noStaticOnlyClass": "off",
+        "noForEach": "off"
+      },
+      "suspicious": {
+        "noArrayIndexKey": "off",
+        "noPrototypeBuiltins": "off"
+      },
+      "style": {
+        "noNonNullAssertion": "off"
+      }
+    }
+  },
+  "javascript": {
+    "formatter": {
+      "quoteStyle": "single"
+    }
+  }
+}
+13
docker-compose.yaml
···
+services:
+  prism-db:
+    image: postgres:latest
+    environment:
+      - POSTGRES_USER=prism
+      - POSTGRES_PASSWORD=prism
+      - POSTGRES_DB=prism
+    ports:
+      - 5432:5432
+    healthcheck:
+      test: 'exit 0'
-16
migrations/20251029194100_create_firehose_event_table.ts
···
-import { Kysely, sql } from 'kysely'
-
-export async function up(db: Kysely<any>): Promise<void> {
-  await db.schema
-    .createTable('firehose_event')
-    .addColumn('timestamp', 'timestamptz', (col) =>
-      col.notNull().defaultTo(sql`now()`)
-    )
-    .addColumn('event_type', 'text', (col) => col.notNull())
-    .addColumn('event_data', 'jsonb', (col) => col.notNull())
-    .execute()
-}
-
-export async function down(db: Kysely<any>): Promise<void> {
-  await db.schema.dropTable('firehose_event').execute()
-}
+86
migrations/20251108164300_create_channels_invites_users_shards.ts
···
+import { Kysely, sql } from 'kysely'
+
+export async function up(db: Kysely<any>): Promise<void> {
+  await db.schema
+    .createTable('account')
+    .addColumn('did', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('handle', 'text', (col) => col.notNull().unique())
+    .addColumn('pds_hostname', 'text', (col) =>
+      col.references('pds.hostname')
+    )
+    .addColumn('created_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .execute()
+
+  await db.schema
+    .createTable('lattice')
+    .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('cid', 'text', (col) => col.notNull())
+    .addColumn('creator_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .addColumn('data', 'jsonb', (col) => col.notNull())
+    .execute()
+
+  await db.schema
+    .createTable('shard')
+    .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('cid', 'text', (col) => col.notNull())
+    .addColumn('creator_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .addColumn('data', 'jsonb', (col) => col.notNull())
+    .execute()
+
+  await db.schema
+    .createTable('channel')
+    .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('cid', 'text', (col) => col.notNull().unique())
+    .addColumn('creator_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .addColumn('data', 'jsonb', (col) => col.notNull())
+    .execute()
+
+  await db.schema
+    .createTable('channel_invite')
+    .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('cid', 'text', (col) => col.notNull())
+    .addColumn('channel', 'text', (col) =>
+      col.references('channel.cid').notNull()
+    )
+    .addColumn('creator_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('recipient_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .addColumn('data', 'jsonb', (col) => col.notNull())
+    .execute()
+
+  await db.schema
+    .createTable('channel_membership')
+    .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('cid', 'text', (col) => col.notNull())
+    .addColumn('channel', 'text', (col) =>
+      col.references('channel.cid').notNull()
+    )
+    .addColumn('recipient_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .addColumn('data', 'jsonb', (col) => col.notNull())
+    .execute()
+}
+
+export async function down(db: Kysely<any>): Promise<void> {
+  await db.schema.dropTable('channel_membership').execute()
+  await db.schema.dropTable('channel_invite').execute()
+  await db.schema.dropTable('channel').execute()
+  await db.schema.dropTable('shard').execute()
+  await db.schema.dropTable('lattice').execute()
+  await db.schema.dropTable('account').execute()
+}
+2 -1
package.json
···
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"dev": "tsx src/index.ts",
-
"lint": "eslint src/",
+
"lint": "npx biome lint ./src && npx biome format ./src",
+
"lint:fix": "npx biome lint --fix ./src && npx biome format --fix ./src",
"db:migrate": "npx ts-node src/scripts/migrate.ts latest",
"db:revert": "npx ts-node src/scripts/migrate.ts down"
},
+180 -49
src/firehose.ts
···
import { Firehose, CommitEvent, AccountEvent, IdentityEvent } from "@skyware/firehose";
import WebSocket from "ws";
import { db } from "./db";
-import { Insertable } from "kysely";
-import { FirehoseEventTable } from "./db";
-
-const saveEvent = async (type: 'commit' | 'identity' | 'account', data: any) => {
-    try {
-        await db.insertInto('firehose_event').values({
-            event_type: type,
-            event_data: data
-        }).execute();
-    } catch (error) {
-        console.error("\nFailed to save event to database:", error);
-    }
-};
+import { Insertable, sql } from "kysely";
+
+interface AtpRecord {
+  $type: string;
+  createdAt: string;
+  [key: string]: any;
+}
+
+interface NewAccount {
+  did: string;
+  handle: string;
+  pds_hostname: string;
+  created_at: any;
+}
+
+interface NewLattice { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+interface NewShard { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+interface NewChannel { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+
+interface NewChannelInvite {
+  uri: string;
+  cid: string;
+  creator_did: string;
+  indexed_at: string;
+  data: AtpRecord;
+  channel?: string;
+  recipient_did?: string;
+}
+
+interface NewChannelMembership {
+  uri: string;
+  cid: string;
+  indexed_at: string;
+  data: AtpRecord;
+  channel?: string;
+  recipient_did?: string;
+}
+
+async function insertRecords(table: string, records: any[], did: string) {
+  if (records.length === 0) return;
+  try {
+    await db.insertInto(table as any)
+      .values(records)
+      .execute();
+    console.log(`[${did}] Inserted ${records.length} records into '${table}'.`);
+  } catch (e: any) {
+    console.error(`[${did}] Failed to insert records into '${table}': ${e.message}`);
+  }
+}
+
+//TODO: de-duplicate from pds-backfill file. - Lewis
const main = () => {
-    console.log("Starting Firehose listener...");
+  console.log("Starting Firehose listener...");
+
+  const firehose = new Firehose({
+    ws: WebSocket,
+  });
+
+  firehose.on("commit", async (commit: CommitEvent) => {
+    const { repo: did, time: indexedAt } = commit
+
+    const newLattices: NewLattice[] = []
+    const newShards: NewShard[] = []
+    const newChannels: NewChannel[] = []
+    const newInvites: NewChannelInvite[] = []
+    const newMemberships: NewChannelMembership[] = []
+
+    const LATTICE_LEXICON = 'systems.gmstn.development.lattice'
+    const SHARD_LEXICON = 'systems.gmstn.development.shard'
+    const CHANNEL_LEXICON = 'systems.gmstn.development.channel'
+    const CHANNEL_INVITE_LEXICON = 'systems.gmstn.development.channel.invite'
+    const CHANNEL_MEMBERSHIP_LEXICON = 'systems.gmstn.development.channel.membership'
+
+    const createOps = commit.ops.filter(op => op.action === 'create');
+
+    for (const op of createOps) {
+      const record = op.record as AtpRecord
+      const collection = record?.$type
+
+      if (!collection || !collection.startsWith('systems.gmstn.development.')) {
+        continue
+      }
+
+      const uri = op.uri
+      const cid = op.cid.toString()
+      const creatorDid = did
+
+      if (!record.createdAt) {
+        console.warn(`[${did}] Found matching record without 'createdAt', skipping. URI: ${uri}`);
+        continue;
+      }
+
+      const baseRecord = {
+        uri: uri,
+        cid: cid,
+        indexed_at: indexedAt,
+        data: record,
+      }
+
+      switch (collection) {
+        case LATTICE_LEXICON:
+          newLattices.push({
+            ...baseRecord,
+            creator_did: creatorDid
+          } as NewLattice)
+          break
+        case SHARD_LEXICON:
+          newShards.push({
+            ...baseRecord,
+            creator_did: creatorDid
+          } as NewShard)
+          break
+        case CHANNEL_LEXICON:
+          newChannels.push({
+            ...baseRecord,
+            creator_did: creatorDid
+          } as NewChannel)
+          break
+
+        case CHANNEL_INVITE_LEXICON: {
+          const recipientDid = record.recipient
-    const firehose = new Firehose({
-        ws: WebSocket,
-    });
+          const existingAccount = await db.selectFrom('account')
+            .select('did')
+            .where('did', '=', recipientDid)
+            .executeTakeFirst()
-    firehose.on("commit", async (commit: CommitEvent) => {
-        const createOps = commit.ops.filter(op => op.action === 'create');
-        const relevantOps = [];
+          if (!existingAccount) {
+            try {
+              const newAccount: NewAccount = {
+                did: recipientDid,
+                handle: recipientDid,
+                // We'll probably resolve this later, no problem :3
+                pds_hostname: null,
+                created_at: sql`now()`,
+              }
+              await db.insertInto('account')
+                .values(newAccount)
+                .onConflict(oc => oc.column('did').doNothing())
+                .execute()
+              console.log(`[${did}] Created new placeholder account entry for recipient ${recipientDid}.`)
+            } catch (e) {
+              console.error(`[${did}] Failed to upsert recipient account ${recipientDid}:`, e)
+              break
+            }
+          }
-        for (const op of createOps) {
-            const recordType = op.record['$type'];
+          newInvites.push({
+            ...baseRecord,
+            creator_did: did,
+            channel: record.channel.cid,
+            recipient_did: recipientDid,
+          } as NewChannelInvite)
+          break
+        }
-            if (recordType && (recordType.startsWith('com.atproto.') || recordType.startsWith('systems.gmstn.'))) {
-                relevantOps.push(op);
-            }
-        }
+        case CHANNEL_MEMBERSHIP_LEXICON:
+          newMemberships.push({
+            ...baseRecord,
+            channel: record.channel,
+            recipient_did: creatorDid,
+          } as NewChannelMembership)
+          break
-        if (relevantOps.length > 0) {
-            await saveEvent('commit', commit);
-        }
-    });
+        default:
+          console.warn(`[${did}] Unhandled 'systems.gmstn.development.*' lexicon: ${collection}`)
+      }
+    }
-    firehose.on("identity", async (identity: IdentityEvent) => {
-        await saveEvent('identity', identity);
-    });
+    await insertRecords('lattice', newLattices, did)
+    await insertRecords('shard', newShards, did)
+    await insertRecords('channel', newChannels, did)
+    await insertRecords('channel_invite', newInvites, did)
+    await insertRecords('channel_membership', newMemberships, did)
+  });
-    firehose.on("account", async (account: AccountEvent) => {
-        await saveEvent('account', account);
-    });
+  firehose.on("open", () => {
+    console.log("\nConnection opened");
+  });
-    firehose.on("open", () => {
-        console.log("\nConnection opened");
-    });
+  firehose.on("close", (cursor) => {
+    console.log(`\nConnection closed. Last cursor was: ${cursor}. Restarting.`);
+    firehose.start();
+  });
-    firehose.on("close", (cursor) => {
-        console.log(`\nConnection closed. Last cursor was: ${cursor}. Restarting.`);
-        firehose.start();
-    });
+  firehose.on("error", ({ error, cursor }) => {
+    console.error(`\nAn error occurred at cursor ${cursor}:`, error);
+  });
-    firehose.on("error", ({ error, cursor }) => {
-        console.error(`\nAn error occurred at cursor ${cursor}:`, error);
-    });
-
-    firehose.start();
+  firehose.start();
-    console.log("Listeners attached. Waiting for events...");
+  console.log("Listeners attached. Waiting for events...");
};
main();
+236 -79
src/pds-backfill.ts
···
+import { sql, Kysely } from 'kysely';
import { db } from './db';
-export interface FirehoseEventTable {
-  timestamp: ColumnType<Date, string | Date, never>;
-  event_type: string;
-  event_data: Record<string, any>;
-}
-export type FirehoseEvent = Selectable<FirehoseEventTable>;
-export type NewFirehoseEvent = Insertable<FirehoseEventTable>;
interface AtpRecord {
  $type: string;
  createdAt: string;
  [key: string]: any;
}
-async function processSingleRepo(pdsHostname: string, did: string) {
-  const pdsBaseUrl = `https://` + pdsHostname;
-  const getRepoUrl = new URL(`/xrpc/com.atproto.sync.getRepo`, pdsBaseUrl);
-  getRepoUrl.searchParams.set('did', did);
+interface NewAccount {
+  did: string;
+  handle: string;
+  pds_hostname: string;
+  created_at: any;
+}
+
+interface NewLattice { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+interface NewShard { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+interface NewChannel { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+
+interface NewChannelInvite {
+  uri: string;
+  cid: string;
+  creator_did: string;
+  indexed_at: string;
+  data: AtpRecord;
+  channel?: string;
+  recipient_did?: string;
+}
-  let car: any;
+interface NewChannelMembership {
+  uri: string;
+  cid: string;
+  indexed_at: string;
+  data: AtpRecord;
+  channel?: string;
+  recipient_did?: string;
+}
+
+const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
+
+async function processSingleRepo(pdsHostname: string, did: string): Promise<void> {
+  const pdsBaseUrl = `https://${pdsHostname}`
+  const getRepoUrl = new URL(`/xrpc/com.atproto.sync.getRepo`, pdsBaseUrl)
+  getRepoUrl.searchParams.set('did', did)
+
+  let car: any
+  let carRoot: string
  try {
    const { CarReader } = await import('@ipld/car');
-    const response = await fetch(getRepoUrl.href);
+    const response = await fetch(getRepoUrl.href)
    if (!response.ok) {
-      throw new Error(`Failed to getRepo for ${did}: ${response.status} ${response.statusText}`);
+      throw new Error(`Failed to getRepo for ${did}: ${response.status} ${response.statusText}`)
+    }
+    const carBytes = new Uint8Array(await response.arrayBuffer())
+    car = await CarReader.fromBytes(carBytes)
+    const roots = await car.getRoots()
+    if (roots.length === 0) {
+      throw new Error('CAR file has no root CID')
    }
-    const carBytes = new Uint8Array(await response.arrayBuffer());
-    car = await CarReader.fromBytes(carBytes);
+    carRoot = roots[0].toString()
  } catch (e: any) {
-    console.error(`[${did}] Failed to fetch or parse CAR: ${e.message}`);
-    return;
+    console.error(`[${did}] Failed to fetch or parse CAR: ${e.message}`)
+    return
  }
-  const recordsToInsert: NewFirehoseEvent[] = [];
+  const newLattices: NewLattice[] = []
+  const newShards: NewShard[] = []
+  const newChannels: NewChannel[] = []
+  const newInvites: NewChannelInvite[] = []
+  const newMemberships: NewChannelMembership[] = []
+
+  const LATTICE_LEXICON = 'systems.gmstn.development.lattice'
+  const SHARD_LEXICON = 'systems.gmstn.development.shard'
+  const CHANNEL_LEXICON = 'systems.gmstn.development.channel'
+  const CHANNEL_INVITE_LEXICON = 'systems.gmstn.development.channel.invite'
+  const CHANNEL_MEMBERSHIP_LEXICON = 'systems.gmstn.development.channel.membership'
  try {
    const cbor = await import('@ipld/dag-cbor');
    for await (const block of car.blocks()) {
-      const record = cbor.decode(block.bytes) as AtpRecord;
+      const record = cbor.decode(block.bytes) as AtpRecord
+      const cid = block.cid.toString()
+
+      const collection = record?.$type
      if (
-        record &&
-        record.$type &&
-        typeof record.$type === 'string' &&
-        record.$type.startsWith('systems.gmstn.')
+        collection &&
+        typeof collection === 'string' &&
+        collection.startsWith('systems.gmstn.development.')
      ) {
        if (!record.createdAt || typeof record.createdAt !== 'string') {
-          console.warn(`[${did}] Found matching record without valid 'createdAt', skipping.`);
-          continue;
+          console.warn(`[${did}] Found matching record without valid 'createdAt', skipping.`)
+          continue
+        }
+
+        const uri = `at://${did}/${collection}/${cid}`
+
+        const baseRecord = {
+          uri: uri,
+          cid: cid,
+          indexed_at: record.createdAt,
+          data: record,
        }
-        recordsToInsert.push({
-          timestamp: record.createdAt,
-          event_type: record.$type,
-          event_data: record,
-        });
+        switch (collection) {
+          case LATTICE_LEXICON:
+            newLattices.push({
+              ...baseRecord,
+              creator_did: did
+            } as NewLattice)
+            break
+          case SHARD_LEXICON:
+            newShards.push({
+              ...baseRecord,
+              creator_did: did
+            } as NewShard)
+            break
+          case CHANNEL_LEXICON:
+            newChannels.push({
+              ...baseRecord,
+              creator_did: did
+            } as NewChannel)
+            break
+          case CHANNEL_INVITE_LEXICON: {
+            const recipientDid = record.recipient
+
+            const existingAccount = await db.selectFrom('account')
+              .select('did')
+              .where('did', '=', recipientDid)
+              .executeTakeFirst()
+
+            if (!existingAccount) {
+              try {
+                const newAccount: NewAccount = {
+                  did: recipientDid,
+                  handle: recipientDid,
+                  // We'll probably resolve this later, no problem :3
+                  pds_hostname: null,
+                  created_at: sql`now()`,
+                }
+                await db.insertInto('account')
+                  .values(newAccount)
+                  .onConflict(oc => oc.column('did').doNothing())
+                  .execute()
+                console.log(`[${did}] Created new placeholder account entry for recipient ${recipientDid}.`)
+              } catch (e) {
+                console.error(`[${did}] Failed to upsert recipient account ${recipientDid}:`, e)
+                break
+              }
+            }
+
+            newInvites.push({
+              ...baseRecord,
+              creator_did: did,
+              channel: record.channel.cid,
+              recipient_did: recipientDid,
+            } as NewChannelInvite)
+            break
+          }
+          case CHANNEL_MEMBERSHIP_LEXICON:
+            newMemberships.push({
+              ...baseRecord,
+              channel: record.channel.cid,
+              recipient_did: did,
+            } as NewChannelMembership)
+            break
+          default:
+            console.warn(`[${did}] Unhandled 'systems.gmstn.development.*' lexicon: ${collection}`)
+        }
      }
    }
  } catch (e: any) {
-    console.error(`[${did}] Error parsing CAR blocks: ${e.message}. Skipping rest of repo.`);
-    return;
+    console.error(`[${did}] Error parsing CAR blocks: ${e.message}. Skipping rest of repo.`)
+    return
+  }
+
+  if (newLattices.length > 0) {
+    await insertRecords('lattice', newLattices, did)
+  }
+  if (newShards.length > 0) {
+    await insertRecords('shard', newShards, did)
  }
+  if (newChannels.length > 0) {
+    await insertRecords('channel', newChannels, did)
+  }
+  if (newInvites.length > 0) {
+    await insertRecords('channel_invite', newInvites, did)
+  }
+  if (newMemberships.length > 0) {
+    await insertRecords('channel_membership', newMemberships, did)
+  }
+}
-  if (recordsToInsert.length > 0) {
+async function insertRecords(table: string, records: any[], did: string) {
+  if (records.length === 0) return;
  try {
-    await db.insertInto('firehose_event').values(recordsToInsert).execute();
-    console.log(`[${did}] Inserted ${recordsToInsert.length} 'systems.gmstn.*' records.`);
+    await db.insertInto(table as any)
+      .values(records)
+      .execute();
+    console.log(`[${did}] Inserted ${records.length} records into '${table}'.`);
  } catch (e: any) {
-    console.error(`[${did}] Failed to insert records into DB: ${e.message}`);
+    console.error(`[${did}] Failed to insert records into '${table}': ${e.message}`);
  }
-  }
}
async function backfillPds(pdsHostname: string) {
···
  console.log(`Fetched ${dids.length} repos. Cursor: ${cursor}`);
+  const newAccounts: NewAccount[] = dids.map(repo => ({
+    did: repo,
+    handle: repo,
+    pds_hostname: pdsHostname,
+    created_at: sql`now()`,
+  }));
+
+  if (newAccounts.length > 0) {
+    try {
+      await db.insertInto('account')
+        .values(newAccounts)
+        .onConflict((oc) => oc
+          .column('did')
+          .doUpdateSet({
+            pds_hostname: sql`excluded.pds_hostname`,
+          })
+        )
+        .execute();
+      console.log(`Successfully bulk upserted ${newAccounts.length} accounts.`);
+    } catch (e: any) {
+      console.error(`Failed to bulk upsert accounts: ${e.message}`);
+    }
+  }
+
  const BATCH_SIZE = 10;
  for (let i = 0; i < dids.length; i += BATCH_SIZE) {
    const batch = dids.slice(i, i + BATCH_SIZE);
···
}
async function main() {
-  let pdsesToBackfill: { hostname: string }[] = [];
+  const CONSTANT_DELAY_MS = 10000;
-  try {
-    pdsesToBackfill = await db
-      .selectFrom('pds')
-      .select('hostname')
-      .where('backfilled_at', 'is', null)
-      .orderBy(
-        (eb) => eb
-          .case()
-          .when('hostname', 'like', '%.bsky.network')
-          .then(1)
-          .else(0)
-          .end(),
-        'asc'
-      )
-      .orderBy('added_at', 'asc')
-      .execute();
+  while (true) {
+    let pdsesToBackfill: { hostname: string }[] = [];
-    if (pdsesToBackfill.length === 0) {
-      console.log('No PDSs to backfill. All caught up! Exiting.');
-      await db.destroy();
-      return;
-    }
-
-    console.log(`Found ${pdsesToBackfill.length} PDS(s) to backfill. Starting job...`);
-
-  } catch (e: any) {
-    console.error('Failed to fetch PDS list from database:', e.message);
-    process.exit(1);
-  }
-
-  for (const pds of pdsesToBackfill) {
    try {
-      await backfillPds(pds.hostname);
-    } catch (e) {
-      console.error(`---`);
-      console.error(`Job for ${pds.hostname} failed. Moving to next PDS.`);
-      console.error(`---`);
+      pdsesToBackfill = await db
+        .selectFrom('pds')
+        .select('hostname')
+        .where((eb) => eb.or([
+          eb('backfilled_at', 'is', null),
+          eb('backfilled_at', '<', sql`now() - interval '24 hours'`),
+        ]))
+        .orderBy(
+          (eb) => eb
+            .case()
+            .when('hostname', 'like', '%.bsky.network')
+            .then(1)
+            .else(0)
+            .end(),
+          'asc'
+        )
+        .orderBy('added_at', 'asc')
+        .execute();
+
+      if (pdsesToBackfill.length === 0) {
+        console.log('No PDSs currently needing backfill. Waiting and checking again...');
+      } else {
+        console.log(`Found ${pdsesToBackfill.length} PDS(s) to backfill. Starting job...`);
+
+        for (const pds of pdsesToBackfill) {
+          try {
+            await backfillPds(pds.hostname);
+          } catch (e) {
+            console.error(`---`);
+            console.error(`Job for ${pds.hostname} failed. Moving to next PDS.`);
+            console.error(`---`);
+          }
+        }
+      }
+    } catch (e: any) {
+      console.error('Fatal error during continuous backfill loop:', e.message);
    }
-  }
-  console.log('All backfill jobs complete. Closing database connection.');
-  await db.destroy();
+    console.log(`Waiting for ${CONSTANT_DELAY_MS / 1000} seconds before next pass.`);
+    await delay(CONSTANT_DELAY_MS);
+  }
}
if (require.main === module) {