diff --git a/biome.json b/biome.json
new file mode 100644
index 0000000..35792d4
--- /dev/null
+++ b/biome.json
@@ -0,0 +1,33 @@
+{
+  "formatter": {
+    "indentStyle": "space",
+    "lineWidth": 100
+  },
+  "linter": {
+    "rules": {
+      "a11y": {
+        "useAriaPropsForRole": "off",
+        "useButtonType": "off",
+        "useSemanticElements": "off",
+        "noSvgWithoutTitle": "off"
+      },
+      "complexity": {
+        "noStaticOnlyClass": "off",
+        "noForEach": "off"
+      },
+      "suspicious": {
+        "noArrayIndexKey": "off",
+        "noPrototypeBuiltins": "off"
+      },
+      "style": {
+        "noNonNullAssertion": "off"
+      }
+    }
+  },
+  "javascript": {
+    "formatter": {
+      "quoteStyle": "single"
+    }
+  }
+}
+
diff --git a/docker-compose.yaml b/docker-compose.yaml
new file mode 100644
index 0000000..06ff7e9
--- /dev/null
+++ b/docker-compose.yaml
@@ -0,0 +1,13 @@
+services:
+  prism-db:
+    image: postgres:latest
+    environment:
+      - POSTGRES_USER=prism
+      - POSTGRES_PASSWORD=prism
+      - POSTGRES_DB=prism
+    ports:
+      - 5432:5432
+    healthcheck:
+      test: 'exit 0'
+
+
diff --git a/migrations/20251029194100_create_firehose_event_table.ts b/migrations/20251029194100_create_firehose_event_table.ts
deleted file mode 100644
index 32e5206..0000000
--- a/migrations/20251029194100_create_firehose_event_table.ts
+++ /dev/null
@@ -1,16 +0,0 @@
-import { Kysely, sql } from 'kysely'
-
-export async function up(db: Kysely<any>): Promise<void> {
-  await db.schema
-    .createTable('firehose_event')
-    .addColumn('timestamp', 'timestamptz', (col) =>
-      col.notNull().defaultTo(sql`now()`)
-    )
-    .addColumn('event_type', 'text', (col) => col.notNull())
-    .addColumn('event_data', 'jsonb', (col) => col.notNull())
-    .execute()
-}
-
-export async function down(db: Kysely<any>): Promise<void> {
-  await db.schema.dropTable('firehose_event').execute()
-}
diff --git a/migrations/20251108164300_create_channels_invites_users_shards.ts b/migrations/20251108164300_create_channels_invites_users_shards.ts
new file mode 100644
index 0000000..1f9fe76
--- /dev/null
+++ b/migrations/20251108164300_create_channels_invites_users_shards.ts
@@ -0,0 +1,86 @@
+import { Kysely, sql } from 'kysely'
+
+export async function up(db: Kysely<any>): Promise<void> {
+  await db.schema
+    .createTable('account')
+    .addColumn('did', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('handle', 'text', (col) => col.notNull().unique())
+    .addColumn('pds_hostname', 'text', (col) =>
+      col.references('pds.hostname')
+    )
+    .addColumn('created_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .execute()
+
+  await db.schema
+    .createTable('lattice')
+    .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('cid', 'text', (col) => col.notNull())
+    .addColumn('creator_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .addColumn('data', 'jsonb', (col) => col.notNull())
+    .execute()
+
+  await db.schema
+    .createTable('shard')
+    .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('cid', 'text', (col) => col.notNull())
+    .addColumn('creator_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .addColumn('data', 'jsonb', (col) => col.notNull())
+    .execute()
+
+  await db.schema
+    .createTable('channel')
+    .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('cid', 'text', (col) => col.notNull().unique())
+    .addColumn('creator_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .addColumn('data', 'jsonb', (col) => col.notNull())
+    .execute()
+
+  await db.schema
+    .createTable('channel_invite')
+    .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('cid', 'text', (col) => col.notNull())
+    .addColumn('channel', 'text', (col) =>
+      col.references('channel.cid').notNull()
+    )
+    .addColumn('creator_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('recipient_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .addColumn('data', 'jsonb', (col) => col.notNull())
+    .execute()
+
+  await db.schema
+    .createTable('channel_membership')
+    .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+    .addColumn('cid', 'text', (col) => col.notNull())
+    .addColumn('channel', 'text', (col) =>
+      col.references('channel.cid').notNull()
+    )
+    .addColumn('recipient_did', 'text', (col) =>
+      col.references('account.did').notNull()
+    )
+    .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+    .addColumn('data', 'jsonb', (col) => col.notNull())
+    .execute()
+}
+
+export async function down(db: Kysely<any>): Promise<void> {
+  await db.schema.dropTable('channel_membership').execute()
+  await db.schema.dropTable('channel_invite').execute()
+  await db.schema.dropTable('channel').execute()
+  await db.schema.dropTable('shard').execute()
+  await db.schema.dropTable('lattice').execute()
+  await db.schema.dropTable('account').execute()
+}
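Aside: src/db.ts is not touched by this diff, so the Kysely table types that back these new tables are not shown. A minimal sketch of what they might look like, assuming the usual ColumnType/Selectable/Insertable helpers from Kysely; the names and nullability below are assumptions, not code from this change:

```ts
// Hypothetical sketch only; the real definitions live in src/db.ts.
import { ColumnType, Insertable, Selectable } from 'kysely'

// Timestamp columns default to now() in the migration, so inserts may omit them.
type Timestamp = ColumnType<Date, string | undefined, never>

interface AccountTable {
  did: string
  handle: string
  pds_hostname: string | null
  created_at: Timestamp
}

// lattice, shard and channel share the same column shape in this migration.
interface RecordTable {
  uri: string
  cid: string
  creator_did: string
  indexed_at: Timestamp
  data: Record<string, unknown>
}

interface ChannelInviteTable extends RecordTable {
  channel: string
  recipient_did: string
}

interface ChannelMembershipTable {
  uri: string
  cid: string
  channel: string
  recipient_did: string
  indexed_at: Timestamp
  data: Record<string, unknown>
}

export interface Database {
  account: AccountTable
  lattice: RecordTable
  shard: RecordTable
  channel: RecordTable
  channel_invite: ChannelInviteTable
  channel_membership: ChannelMembershipTable
}

export type Account = Selectable<AccountTable>
export type NewAccountRow = Insertable<AccountTable>
```

The nullable pds_hostname mirrors the migration, which creates the column without a NOT NULL constraint so that placeholder accounts (see the invite handling below) can be inserted before their PDS is known.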
diff --git a/package.json b/package.json
index dd2a790..533fec9 100644
--- a/package.json
+++ b/package.json
@@ -6,7 +6,8 @@
   "scripts": {
     "test": "echo \"Error: no test specified\" && exit 1",
     "dev": "tsx src/index.ts",
-    "lint": "eslint src/",
+    "lint": "npx biome lint ./src && npx biome format ./src",
+    "lint:fix": "npx biome lint --fix ./src && npx biome format --fix ./src",
     "db:migrate": "npx ts-node src/scripts/migrate.ts latest",
     "db:revert": "npx ts-node src/scripts/migrate.ts down"
   },
diff --git a/src/firehose.ts b/src/firehose.ts
index 3a1c409..39f8cf3 100644
--- a/src/firehose.ts
+++ b/src/firehose.ts
@@ -1,68 +1,199 @@
 import { Firehose, CommitEvent, AccountEvent, IdentityEvent } from "@skyware/firehose";
 import WebSocket from "ws";
 import { db } from "./db";
-import { Insertable } from "kysely";
-import { FirehoseEventTable } from "./db";
-
-const saveEvent = async (type: 'commit' | 'identity' | 'account', data: any) => {
-    try {
-        await db.insertInto('firehose_event').values({
-            event_type: type,
-            event_data: data
-        }).execute();
-    } catch (error) {
-        console.error("\nFailed to save event to database:", error);
-    }
-};
+import { Insertable, sql } from "kysely";
+
+interface AtpRecord {
+  $type: string;
+  createdAt: string;
+  [key: string]: any;
+}
+
+interface NewAccount {
+  did: string;
+  handle: string;
+  pds_hostname: string;
+  created_at: any;
+}
+
+interface NewLattice { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+interface NewShard { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+interface NewChannel { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+
+interface NewChannelInvite {
+  uri: string;
+  cid: string;
+  creator_did: string;
+  indexed_at: string;
+  data: AtpRecord;
+  channel?: string;
+  recipient_did?: string;
+}
+
+interface NewChannelMembership {
+  uri: string;
+  cid: string;
+  indexed_at: string;
+  data: AtpRecord;
+  channel?: string;
+  recipient_did?: string;
+}
+
+async function insertRecords(table: string, records: any[], did: string) {
+  if (records.length === 0) return;
+  try {
+    await db.insertInto(table as any)
+      .values(records)
+      .execute();
+    console.log(`[${did}] Inserted ${records.length} records into '${table}'.`);
+  } catch (e: any) {
+    console.error(`[${did}] Failed to insert records into '${table}': ${e.message}`);
+  }
+}
+
+//TODO: de-duplicate from pds-backfill file. - Lewis
 
 const main = () => {
-    console.log("Starting Firehose listener...");
+  console.log("Starting Firehose listener...");
+
+  const firehose = new Firehose({
+    ws: WebSocket,
+  });
+
+  firehose.on("commit", async (commit: CommitEvent) => {
+    const { repo: did, time: indexedAt } = commit
+
+    const newLattices: NewLattice[] = []
+    const newShards: NewShard[] = []
+    const newChannels: NewChannel[] = []
+    const newInvites: NewChannelInvite[] = []
+    const newMemberships: NewChannelMembership[] = []
+
+    const LATTICE_LEXICON = 'systems.gmstn.development.lattice'
+    const SHARD_LEXICON = 'systems.gmstn.development.shard'
+    const CHANNEL_LEXICON = 'systems.gmstn.development.channel'
+    const CHANNEL_INVITE_LEXICON = 'systems.gmstn.development.channel.invite'
+    const CHANNEL_MEMBERSHIP_LEXICON = 'systems.gmstn.development.channel.membership'
+
+    const createOps = commit.ops.filter(op => op.action === 'create');
+
+    for (const op of createOps) {
+      const record = op.record as AtpRecord
+      const collection = record?.$type
+
+      if (!collection || !collection.startsWith('systems.gmstn.development.')) {
+        continue
+      }
+
+      const uri = op.uri
+      const cid = op.cid.toString()
+      const creatorDid = did
+
+      if (!record.createdAt) {
+        console.warn(`[${did}] Found matching record without 'createdAt', skipping. URI: ${uri}`);
+        continue;
+      }
+
+      const baseRecord = {
+        uri: uri,
+        cid: cid,
+        indexed_at: indexedAt,
+        data: record,
+      }
+
+      switch (collection) {
+        case LATTICE_LEXICON:
+          newLattices.push({
+            ...baseRecord,
+            creator_did: creatorDid
+          } as NewLattice)
+          break
+        case SHARD_LEXICON:
+          newShards.push({
+            ...baseRecord,
+            creator_did: creatorDid
+          } as NewShard)
+          break
+        case CHANNEL_LEXICON:
+          newChannels.push({
+            ...baseRecord,
+            creator_did: creatorDid
+          } as NewChannel)
+          break
+
+        case CHANNEL_INVITE_LEXICON: {
+          const recipientDid = record.recipient
 
-    const firehose = new Firehose({
-        ws: WebSocket,
-    });
+          const existingAccount = await db.selectFrom('account')
+            .select('did')
+            .where('did', '=', recipientDid)
+            .executeTakeFirst()
 
-    firehose.on("commit", async (commit: CommitEvent) => {
-        const createOps = commit.ops.filter(op => op.action === 'create');
-        const relevantOps = [];
+          if (!existingAccount) {
+            try {
+              const newAccount: NewAccount = {
+                did: recipientDid,
+                handle: recipientDid,
+                // We'll probably resolve this later, no problem :3
+                pds_hostname: null,
+                created_at: sql`now()`,
+              }
+              await db.insertInto('account')
+                .values(newAccount)
+                .onConflict(oc => oc.column('did').doNothing())
+                .execute()
+              console.log(`[${did}] Created new placeholder account entry for recipient ${recipientDid}.`)
+            } catch (e) {
+              console.error(`[${did}] Failed to upsert recipient account ${recipientDid}:`, e)
+              break
+            }
+          }
 
-        for (const op of createOps) {
-            const recordType = op.record['$type'];
+          newInvites.push({
+            ...baseRecord,
+            creator_did: did,
+            channel: record.channel.cid,
+            recipient_did: recipientDid,
+          } as NewChannelInvite)
+          break
+        }
 
-            if (recordType && (recordType.startsWith('com.atproto.') || recordType.startsWith('systems.gmstn.'))) {
-                relevantOps.push(op);
-            }
-        }
+        case CHANNEL_MEMBERSHIP_LEXICON:
+          newMemberships.push({
+            ...baseRecord,
+            channel: record.channel,
+            recipient_did: creatorDid,
+          } as NewChannelMembership)
+          break
 
-        if (relevantOps.length > 0) {
-            await saveEvent('commit', commit);
-        }
-    });
+        default:
+          console.warn(`[${did}] Unhandled 'systems.gmstn.development.*' lexicon: ${collection}`)
+      }
+    }
 
-    firehose.on("identity", async (identity: IdentityEvent) => {
-        await saveEvent('identity', identity);
-    });
+    await insertRecords('lattice', newLattices, did)
+    await insertRecords('shard', newShards, did)
+    await insertRecords('channel', newChannels, did)
+    await insertRecords('channel_invite', newInvites, did)
+    await insertRecords('channel_membership', newMemberships, did)
+  });
 
-    firehose.on("account", async (account: AccountEvent) => {
-        await saveEvent('account', account);
-    });
+  firehose.on("open", () => {
+    console.log("\nConnection opened");
+  });
 
-    firehose.on("open", () => {
-        console.log("\nConnection opened");
-    });
+  firehose.on("close", (cursor) => {
+    console.log(`\nConnection closed. Last cursor was: ${cursor}. Restarting.`);
+    firehose.start();
+  });
 
-    firehose.on("close", (cursor) => {
-        console.log(`\nConnection closed. Last cursor was: ${cursor}. Restarting.`);
-        firehose.start();
-    });
+  firehose.on("error", ({ error, cursor }) => {
+    console.error(`\nAn error occurred at cursor ${cursor}:`, error);
+  });
 
-    firehose.on("error", ({ error, cursor }) => {
-        console.error(`\nAn error occurred at cursor ${cursor}:`, error);
-    });
-
-    firehose.start();
+  firehose.start();
 
-    console.log("Listeners attached. Waiting for events...");
+  console.log("Listeners attached. Waiting for events...");
 };
 
 main();
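Aside: this handler and src/pds-backfill.ts (below) now carry matching copies of the record interfaces, the lexicon constants, the placeholder-account upsert, and insertRecords; the TODO above flags this. A rough sketch of what a shared helper module could look like; the file name, the export names, and the string | null type for pds_hostname (both duplicated NewAccount interfaces declare string but assign null) are assumptions, not part of this diff:

```ts
// Hypothetical src/records.ts, a sketch of the helpers that src/firehose.ts
// and src/pds-backfill.ts currently duplicate. Names are assumptions.
import { sql } from 'kysely'
import { db } from './db'

export const LEXICONS = {
  lattice: 'systems.gmstn.development.lattice',
  shard: 'systems.gmstn.development.shard',
  channel: 'systems.gmstn.development.channel',
  channelInvite: 'systems.gmstn.development.channel.invite',
  channelMembership: 'systems.gmstn.development.channel.membership',
} as const

// The duplicated NewAccount interfaces declare pds_hostname as string, but both
// call sites assign null for placeholder accounts; string | null matches the
// nullable column the migration creates.
export interface NewAccount {
  did: string
  handle: string
  pds_hostname: string | null
  created_at: unknown
}

// Insert a placeholder account row for a DID we have not seen before.
export async function ensurePlaceholderAccount(did: string): Promise<void> {
  const account: NewAccount = {
    did,
    handle: did,
    pds_hostname: null,
    created_at: sql`now()`,
  }
  await db
    .insertInto('account' as any)
    .values(account as any)
    .onConflict((oc: any) => oc.column('did').doNothing())
    .execute()
}

// Bulk-insert collected rows into one of the record tables, logging failures
// instead of throwing so one bad batch does not stop ingest.
export async function insertRecords(table: string, records: object[], did: string): Promise<void> {
  if (records.length === 0) return
  try {
    await db.insertInto(table as any).values(records as any).execute()
    console.log(`[${did}] Inserted ${records.length} records into '${table}'.`)
  } catch (e: any) {
    console.error(`[${did}] Failed to insert records into '${table}': ${e.message}`)
  }
}
```

Centralising these helpers would also mean the invite/membership channel handling, which currently differs between the two files (record.channel here versus record.channel.cid in the backfill), only has to be decided once.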
diff --git a/src/pds-backfill.ts b/src/pds-backfill.ts
index b6e2f16..64319f2 100644
--- a/src/pds-backfill.ts
+++ b/src/pds-backfill.ts
@@ -1,78 +1,208 @@
+import { sql, Kysely } from 'kysely';
 import { db } from './db';
 
-export interface FirehoseEventTable {
-  timestamp: ColumnType<Date, string | undefined, never>;
-  event_type: string;
-  event_data: Record<string, any>;
-}
-export type FirehoseEvent = Selectable<FirehoseEventTable>;
-export type NewFirehoseEvent = Insertable<FirehoseEventTable>;
-
 interface AtpRecord {
   $type: string;
   createdAt: string;
   [key: string]: any;
 }
 
-async function processSingleRepo(pdsHostname: string, did: string) {
-  const pdsBaseUrl = `https://` + pdsHostname;
-  const getRepoUrl = new URL(`/xrpc/com.atproto.sync.getRepo`, pdsBaseUrl);
-  getRepoUrl.searchParams.set('did', did);
+interface NewAccount {
+  did: string;
+  handle: string;
+  pds_hostname: string;
+  created_at: any;
+}
+
+interface NewLattice { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+interface NewShard { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+interface NewChannel { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+
+interface NewChannelInvite {
+  uri: string;
+  cid: string;
+  creator_did: string;
+  indexed_at: string;
+  data: AtpRecord;
+  channel?: string;
+  recipient_did?: string;
+}
 
-  let car: any;
+interface NewChannelMembership {
+  uri: string;
+  cid: string;
+  indexed_at: string;
+  data: AtpRecord;
+  channel?: string;
+  recipient_did?: string;
+}
+
+const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
+
+async function processSingleRepo(pdsHostname: string, did: string): Promise<void> {
+  const pdsBaseUrl = `https://${pdsHostname}`
+  const getRepoUrl = new URL(`/xrpc/com.atproto.sync.getRepo`, pdsBaseUrl)
+  getRepoUrl.searchParams.set('did', did)
+
+  let car: any
+  let carRoot: string
 
   try {
     const { CarReader } = await import('@ipld/car');
-    const response = await fetch(getRepoUrl.href);
+    const response = await fetch(getRepoUrl.href)
     if (!response.ok) {
-      throw new Error(`Failed to getRepo for ${did}: ${response.status} ${response.statusText}`);
+      throw new Error(`Failed to getRepo for ${did}: ${response.status} ${response.statusText}`)
+    }
+    const carBytes = new Uint8Array(await response.arrayBuffer())
+    car = await CarReader.fromBytes(carBytes)
+    const roots = await car.getRoots()
+    if (roots.length === 0) {
+      throw new Error('CAR file has no root CID')
     }
-    const carBytes = new Uint8Array(await response.arrayBuffer());
-    car = await CarReader.fromBytes(carBytes);
+    carRoot = roots[0].toString()
   } catch (e: any) {
-    console.error(`[${did}] Failed to fetch or parse CAR: ${e.message}`);
-    return;
+    console.error(`[${did}] Failed to fetch or parse CAR: ${e.message}`)
+    return
   }
 
-  const recordsToInsert: NewFirehoseEvent[] = [];
+  const newLattices: NewLattice[] = []
+  const newShards: NewShard[] = []
+  const newChannels: NewChannel[] = []
+  const newInvites: NewChannelInvite[] = []
+  const newMemberships: NewChannelMembership[] = []
+
+  const LATTICE_LEXICON = 'systems.gmstn.development.lattice'
+  const SHARD_LEXICON = 'systems.gmstn.development.shard'
+  const CHANNEL_LEXICON = 'systems.gmstn.development.channel'
+  const CHANNEL_INVITE_LEXICON = 'systems.gmstn.development.channel.invite'
+  const CHANNEL_MEMBERSHIP_LEXICON = 'systems.gmstn.development.channel.membership'
 
   try {
     const cbor = await import('@ipld/dag-cbor');
-
     for await (const block of car.blocks()) {
-      const record = cbor.decode(block.bytes) as AtpRecord;
+      const record = cbor.decode(block.bytes) as AtpRecord
+      const cid = block.cid.toString()
+
+      const collection = record?.$type
 
       if (
-        record &&
-        record.$type &&
-        typeof record.$type === 'string' &&
-        record.$type.startsWith('systems.gmstn.')
+        collection &&
+        typeof collection === 'string' &&
+        collection.startsWith('systems.gmstn.development.')
       ) {
         if (!record.createdAt || typeof record.createdAt !== 'string') {
-          console.warn(`[${did}] Found matching record without valid 'createdAt', skipping.`);
-          continue;
+          console.warn(`[${did}] Found matching record without valid 'createdAt', skipping.`)
+          continue
+        }
+
+        const uri = `at://${did}/${collection}/${cid}`
+
+        const baseRecord = {
+          uri: uri,
+          cid: cid,
+          indexed_at: record.createdAt,
+          data: record,
         }
 
-        recordsToInsert.push({
-          timestamp: record.createdAt,
-          event_type: record.$type,
-          event_data: record,
-        });
+        switch (collection) {
+          case LATTICE_LEXICON:
+            newLattices.push({
+              ...baseRecord,
+              creator_did: did
+            } as NewLattice)
+            break
+          case SHARD_LEXICON:
+            newShards.push({
+              ...baseRecord,
+              creator_did: did
+            } as NewShard)
+            break
+          case CHANNEL_LEXICON:
+            newChannels.push({
+              ...baseRecord,
+              creator_did: did
+            } as NewChannel)
+            break
+          case CHANNEL_INVITE_LEXICON: {
+            const recipientDid = record.recipient
+
+            const existingAccount = await db.selectFrom('account')
+              .select('did')
+              .where('did', '=', recipientDid)
+              .executeTakeFirst()
+
+            if (!existingAccount) {
+              try {
+                const newAccount: NewAccount = {
+                  did: recipientDid,
+                  handle: recipientDid,
+                  // We'll probably resolve this later, no problem :3
+                  pds_hostname: null,
+                  created_at: sql`now()`,
+                }
+                await db.insertInto('account')
+                  .values(newAccount)
+                  .onConflict(oc => oc.column('did').doNothing())
+                  .execute()
+                console.log(`[${did}] Created new placeholder account entry for recipient ${recipientDid}.`)
+              } catch (e) {
+                console.error(`[${did}] Failed to upsert recipient account ${recipientDid}:`, e)
+                break
+              }
+            }
+
+            newInvites.push({
+              ...baseRecord,
+              creator_did: did,
+              channel: record.channel.cid,
+              recipient_did: recipientDid,
+            } as NewChannelInvite)
+            break
+          }
+          case CHANNEL_MEMBERSHIP_LEXICON:
+            newMemberships.push({
+              ...baseRecord,
+              channel: record.channel.cid,
+              recipient_did: did,
+            } as NewChannelMembership)
+            break
+          default:
+            console.warn(`[${did}] Unhandled 'systems.gmstn.development.*' lexicon: ${collection}`)
+        }
       }
     }
   } catch (e: any) {
-    console.error(`[${did}] Error parsing CAR blocks: ${e.message}. Skipping rest of repo.`);
-    return;
+    console.error(`[${did}] Error parsing CAR blocks: ${e.message}. Skipping rest of repo.`)
+    return
+  }
+
+  if (newLattices.length > 0) {
+    await insertRecords('lattice', newLattices, did)
+  }
+  if (newShards.length > 0) {
+    await insertRecords('shard', newShards, did)
   }
+  if (newChannels.length > 0) {
+    await insertRecords('channel', newChannels, did)
+  }
+  if (newInvites.length > 0) {
+    await insertRecords('channel_invite', newInvites, did)
+  }
+  if (newMemberships.length > 0) {
+    await insertRecords('channel_membership', newMemberships, did)
+  }
+}
 
-  if (recordsToInsert.length > 0) {
+async function insertRecords(table: string, records: any[], did: string) {
+  if (records.length === 0) return;
     try {
-      await db.insertInto('firehose_event').values(recordsToInsert).execute();
-      console.log(`[${did}] Inserted ${recordsToInsert.length} 'systems.gmstn.*' records.`);
+      await db.insertInto(table as any)
+        .values(records)
+        .execute();
+      console.log(`[${did}] Inserted ${records.length} records into '${table}'.`);
     } catch (e: any) {
-      console.error(`[${did}] Failed to insert records into DB: ${e.message}`);
+      console.error(`[${did}] Failed to insert records into '${table}': ${e.message}`);
     }
-  }
 }
 
 async function backfillPds(pdsHostname: string) {
@@ -103,6 +233,30 @@
     console.log(`Fetched ${dids.length} repos. Cursor: ${cursor}`);
 
+    const newAccounts: NewAccount[] = dids.map(repo => ({
+      did: repo,
+      handle: repo,
+      pds_hostname: pdsHostname,
+      created_at: sql`now()`,
+    }));
+
+    if (newAccounts.length > 0) {
+      try {
+        await db.insertInto('account')
+          .values(newAccounts)
+          .onConflict((oc) => oc
+            .column('did')
+            .doUpdateSet({
+              pds_hostname: sql`excluded.pds_hostname`,
+            })
+          )
+          .execute();
+        console.log(`Successfully bulk upserted ${newAccounts.length} accounts.`);
+      } catch (e: any) {
+        console.error(`Failed to bulk upsert accounts: ${e.message}`);
+      }
+    }
+
     const BATCH_SIZE = 10;
     for (let i = 0; i < dids.length; i += BATCH_SIZE) {
       const batch = dids.slice(i, i + BATCH_SIZE);
@@ -131,50 +285,53 @@
 }
 
 async function main() {
-  let pdsesToBackfill: { hostname: string }[] = [];
+  const CONSTANT_DELAY_MS = 10000;
 
-  try {
-    pdsesToBackfill = await db
-      .selectFrom('pds')
-      .select('hostname')
-      .where('backfilled_at', 'is', null)
-      .orderBy(
-        (eb) => eb
-          .case()
-          .when('hostname', 'like', '%.bsky.network')
-          .then(1)
-          .else(0)
-          .end(),
-        'asc'
-      )
-      .orderBy('added_at', 'asc')
-      .execute();
+  while (true) {
+    let pdsesToBackfill: { hostname: string }[] = [];
 
-    if (pdsesToBackfill.length === 0) {
-      console.log('No PDSs to backfill. All caught up! Exiting.');
-      await db.destroy();
-      return;
-    }
-
-    console.log(`Found ${pdsesToBackfill.length} PDS(s) to backfill. Starting job...`);
-
-  } catch (e: any) {
-    console.error('Failed to fetch PDS list from database:', e.message);
-    process.exit(1);
-  }
-
-  for (const pds of pdsesToBackfill) {
     try {
-      await backfillPds(pds.hostname);
-    } catch (e) {
-      console.error(`---`);
-      console.error(`Job for ${pds.hostname} failed. Moving to next PDS.`);
-      console.error(`---`);
+      pdsesToBackfill = await db
+        .selectFrom('pds')
+        .select('hostname')
+        .where((eb) => eb.or([
+          eb('backfilled_at', 'is', null),
+          eb('backfilled_at', '<', sql`now() - interval '24 hours'`),
+        ]))
+        .orderBy(
+          (eb) => eb
+            .case()
+            .when('hostname', 'like', '%.bsky.network')
+            .then(1)
+            .else(0)
+            .end(),
+          'asc'
+        )
+        .orderBy('added_at', 'asc')
+        .execute();
+
+      if (pdsesToBackfill.length === 0) {
+        console.log('No PDSs currently needing backfill. Waiting and checking again...');
+      } else {
+        console.log(`Found ${pdsesToBackfill.length} PDS(s) to backfill. Starting job...`);
+
+        for (const pds of pdsesToBackfill) {
+          try {
+            await backfillPds(pds.hostname);
+          } catch (e) {
+            console.error(`---`);
+            console.error(`Job for ${pds.hostname} failed. Moving to next PDS.`);
+            console.error(`---`);
+          }
+        }
+      }
+    } catch (e: any) {
+      console.error('Fatal error during continuous backfill loop:', e.message);
     }
-  }
 
-  console.log('All backfill jobs complete. Closing database connection.');
-  await db.destroy();
+    console.log(`Waiting for ${CONSTANT_DELAY_MS / 1000} seconds before next pass.`);
+    await delay(CONSTANT_DELAY_MS);
+  }
 }
 
 if (require.main === module) {