···
  1 +  import { sql, Kysely } from 'kysely';
       import { db } from './db';
  3 -  export interface FirehoseEventTable {
  4 -    timestamp: ColumnType<Date, string | Date, never>;
  6 -    event_data: Record<string, any>;
  8 -  export type FirehoseEvent = Selectable<FirehoseEventTable>;
  9 -  export type NewFirehoseEvent = Insertable<FirehoseEventTable>;
 17 -  async function processSingleRepo(pdsHostname: string, did: string) {
 18 -    const pdsBaseUrl = `https://` + pdsHostname;
 19 -    const getRepoUrl = new URL(`/xrpc/com.atproto.sync.getRepo`, pdsBaseUrl);
 20 -    getRepoUrl.searchParams.set('did', did);
 10 +  interface NewAccount {
 13 +    pds_hostname: string | null;
 17 +  interface NewLattice { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
 18 +  interface NewShard { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
 19 +  interface NewChannel { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
 21 +  interface NewChannelInvite {
 24 +    creator_did: string;
 28 +    recipient_did?: string;
 31 +  interface NewChannelMembership {
 37 +    recipient_did?: string;
 40 +  const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
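Presumably these row types hang off a Kysely database map so that db.insertInto('lattice') and friends type-check; a minimal sketch of that wiring (not shown in this diff — the Database interface and its table names are inferred from the insertInto()/insertRecords() calls further down):

// Hypothetical wiring, assuming ./db exports something like
// `export const db = new Kysely<Database>({ dialect })`.
// Using the insert shapes directly is a simplification; Kysely table
// definitions often use ColumnType (as the removed FirehoseEventTable did).
interface Database {
  account: NewAccount;
  lattice: NewLattice;
  shard: NewShard;
  channel: NewChannel;
  channel_invite: NewChannelInvite;
  channel_membership: NewChannelMembership;
}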
 42 +  async function processSingleRepo(pdsHostname: string, did: string): Promise<void> {
 43 +    const pdsBaseUrl = `https://${pdsHostname}`
 44 +    const getRepoUrl = new URL(`/xrpc/com.atproto.sync.getRepo`, pdsBaseUrl)
 45 +    getRepoUrl.searchParams.set('did', did)
         const { CarReader } = await import('@ipld/car');
 26 -    const response = await fetch(getRepoUrl.href);
 52 +    const response = await fetch(getRepoUrl.href)
 28 -      throw new Error(`Failed to getRepo for ${did}: ${response.status} ${response.statusText}`);
 54 +      throw new Error(`Failed to getRepo for ${did}: ${response.status} ${response.statusText}`)
 30 -    const carBytes = new Uint8Array(await response.arrayBuffer());
 56 +    const carBytes = new Uint8Array(await response.arrayBuffer())
 31 -    car = await CarReader.fromBytes(carBytes);
 57 +    car = await CarReader.fromBytes(carBytes)
 58 +    const roots = await car.getRoots()
 59 +    if (roots.length === 0) {
 60 +      throw new Error('CAR file has no root CID')
 62 +    carRoot = roots[0].toString()
 33 -    console.error(`[${did}] Failed to fetch or parse CAR: ${e.message}`);
 64 +    console.error(`[${did}] Failed to fetch or parse CAR: ${e.message}`)
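Pulled out of the diff, the fetch-and-parse path amounts to the following standalone sketch (assuming Node 18+ for global fetch; error handling condensed relative to the PR):

import { CarReader } from '@ipld/car';

// Minimal standalone sketch of the getRepo fetch-and-parse path above.
async function fetchRepoCar(pdsHostname: string, did: string): Promise<CarReader> {
  const url = new URL('/xrpc/com.atproto.sync.getRepo', `https://${pdsHostname}`);
  url.searchParams.set('did', did);
  const response = await fetch(url.href);
  if (!response.ok) {
    throw new Error(`getRepo failed: ${response.status} ${response.statusText}`);
  }
  const bytes = new Uint8Array(await response.arrayBuffer());
  const car = await CarReader.fromBytes(bytes);
  if ((await car.getRoots()).length === 0) {
    throw new Error('CAR file has no root CID');
  }
  return car;
}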
 37 -    const recordsToInsert: NewFirehoseEvent[] = [];
 68 +    const newLattices: NewLattice[] = []
 69 +    const newShards: NewShard[] = []
 70 +    const newChannels: NewChannel[] = []
 71 +    const newInvites: NewChannelInvite[] = []
 72 +    const newMemberships: NewChannelMembership[] = []
 74 +    const LATTICE_LEXICON = 'systems.gmstn.development.lattice'
 75 +    const SHARD_LEXICON = 'systems.gmstn.development.shard'
 76 +    const CHANNEL_LEXICON = 'systems.gmstn.development.channel'
 77 +    const CHANNEL_INVITE_LEXICON = 'systems.gmstn.development.channel.invite'
 78 +    const CHANNEL_MEMBERSHIP_LEXICON = 'systems.gmstn.development.channel.membership'
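For orientation, a decoded block that passes the filters below would look something like this (hypothetical values; the recipient and channel fields are inferred from how the invite case reads them):

// Hypothetical decoded dag-cbor block (shape inferred from the switch below).
const exampleInvite = {
  $type: 'systems.gmstn.development.channel.invite', // lexicon NSID used for routing
  createdAt: '2024-05-01T12:00:00.000Z',             // records without this are skipped
  recipient: 'did:plc:exampleexample',               // becomes recipient_did
  channel: { uri: 'at://…', cid: 'bafy…' },          // channel.cid is what gets stored
};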
         const cbor = await import('@ipld/dag-cbor');
         for await (const block of car.blocks()) {
 43 -      const record = cbor.decode(block.bytes) as AtpRecord;
 83 +      const record = cbor.decode(block.bytes) as AtpRecord
 84 +      const cid = block.cid.toString()
 86 +      const collection = record?.$type
 48 -        typeof record.$type === 'string' &&
 49 -        record.$type.startsWith('systems.gmstn.')
 90 +        typeof collection === 'string' &&
 91 +        collection.startsWith('systems.gmstn.development.')
           if (!record.createdAt || typeof record.createdAt !== 'string') {
 52 -          console.warn(`[${did}] Found matching record without valid 'createdAt', skipping.`);
 94 +          console.warn(`[${did}] Found matching record without valid 'createdAt', skipping.`)
 98 +        const uri = `at://${did}/${collection}/${cid}`
100 +        const baseRecord = {
103 +          indexed_at: record.createdAt,
 56 -        recordsToInsert.push({
 57 -          timestamp: record.createdAt,
 58 -          event_type: record.$type,
107 +        switch (collection) {
108 +          case LATTICE_LEXICON:
114 +          case SHARD_LEXICON:
120 +          case CHANNEL_LEXICON:
126 +          case CHANNEL_INVITE_LEXICON: {
127 +            const recipientDid = record.recipient
129 +            const existingAccount = await db.selectFrom('account')
131 +              .where('did', '=', recipientDid)
132 +              .executeTakeFirst()
134 +            if (!existingAccount) {
136 +              const newAccount: NewAccount = {
138 +                handle: recipientDid,
139 +                // We'll probably resolve this later, no problem :3
140 +                pds_hostname: null,
141 +                created_at: sql`now()`,
143 +              await db.insertInto('account')
144 +                .values(newAccount)
145 +                .onConflict(oc => oc.column('did').doNothing())
147 +              console.log(`[${did}] Created new placeholder account entry for recipient ${recipientDid}.`)
149 +              console.error(`[${did}] Failed to upsert recipient account ${recipientDid}:`, e)
157 +              channel: record.channel.cid,
158 +              recipient_did: recipientDid,
159 +            } as NewChannelInvite)
162 +          case CHANNEL_MEMBERSHIP_LEXICON:
163 +            newMemberships.push({
165 +              channel: record.channel.cid,
166 +              recipient_did: did,
167 +            } as NewChannelMembership)
170 +            console.warn(`[${did}] Unhandled 'systems.gmstn.development.*' lexicon: ${collection}`)
 64 -    console.error(`[${did}] Error parsing CAR blocks: ${e.message}. Skipping rest of repo.`);
175 +    console.error(`[${did}] Error parsing CAR blocks: ${e.message}. Skipping rest of repo.`)
179 +    if (newLattices.length > 0) {
180 +      await insertRecords('lattice', newLattices, did)
182 +    if (newShards.length > 0) {
183 +      await insertRecords('shard', newShards, did)
185 +    if (newChannels.length > 0) {
186 +      await insertRecords('channel', newChannels, did)
188 +    if (newInvites.length > 0) {
189 +      await insertRecords('channel_invite', newInvites, did)
191 +    if (newMemberships.length > 0) {
192 +      await insertRecords('channel_membership', newMemberships, did)
 68 -    if (recordsToInsert.length > 0) {
196 +  async function insertRecords(table: string, records: any[], did: string) {
197 +    if (records.length === 0) return;
 70 -      await db.insertInto('firehose_event').values(recordsToInsert).execute();
 71 -      console.log(`[${did}] Inserted ${recordsToInsert.length} 'systems.gmstn.*' records.`);
199 +    await db.insertInto(table as any)
202 +    console.log(`[${did}] Inserted ${records.length} records into '${table}'.`);
 73 -      console.error(`[${did}] Failed to insert records into DB: ${e.message}`);
204 +    console.error(`[${did}] Failed to insert records into '${table}': ${e.message}`);
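The helper's body is partly elided in the diff; it presumably completes along the lines of this minimal sketch (the .values(...).execute() chain and the try/catch framing fall in elided lines and are assumptions):

// Sketch only: the PR elides parts of this helper, so the exact body is assumed.
async function insertRecords(table: string, records: any[], did: string): Promise<void> {
  if (records.length === 0) return;
  try {
    // `table as any` sidesteps Kysely's typed table-name union for a dynamic target.
    await db.insertInto(table as any).values(records).execute();
    console.log(`[${did}] Inserted ${records.length} records into '${table}'.`);
  } catch (e: any) {
    console.error(`[${did}] Failed to insert records into '${table}': ${e.message}`);
  }
}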
       async function backfillPds(pdsHostname: string) {
···
         console.log(`Fetched ${dids.length} repos. Cursor: ${cursor}`);
236 +    const newAccounts: NewAccount[] = dids.map(repo => ({
239 +      pds_hostname: pdsHostname,
240 +      created_at: sql`now()`,
243 +    if (newAccounts.length > 0) {
245 +        await db.insertInto('account')
246 +          .values(newAccounts)
247 +          .onConflict((oc) => oc
250 +            pds_hostname: sql`excluded.pds_hostname`,
254 +        console.log(`Successfully bulk upserted ${newAccounts.length} accounts.`);
256 +        console.error(`Failed to bulk upsert accounts: ${e.message}`);
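The elided lines around .onConflict presumably complete a did-keyed do-update upsert; a sketch of the likely shape (the conflict column, the doUpdateSet wrapper, execute(), and the try/catch are assumptions; the excluded.pds_hostname assignment is visible in the diff):

// Sketch: conflict target and framing are assumed; the `excluded.*`
// reference mirrors the line visible in the diff.
try {
  await db.insertInto('account')
    .values(newAccounts)
    .onConflict((oc) => oc
      .column('did') // assumed conflict target
      .doUpdateSet({
        pds_hostname: sql`excluded.pds_hostname`,
      }))
    .execute();
  console.log(`Successfully bulk upserted ${newAccounts.length} accounts.`);
} catch (e: any) {
  console.error(`Failed to bulk upsert accounts: ${e.message}`);
}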
       for (let i = 0; i < dids.length; i += BATCH_SIZE) {
         const batch = dids.slice(i, i + BATCH_SIZE);
···
134 -    let pdsesToBackfill: { hostname: string }[] = [];
288 +  const CONSTANT_DELAY_MS = 10000;
137 -      pdsesToBackfill = await db
139 -        .select('hostname')
140 -        .where('backfilled_at', 'is', null)
144 -        .when('hostname', 'like', '%.bsky.network')
150 -        .orderBy('added_at', 'asc')
291 +      let pdsesToBackfill: { hostname: string }[] = [];
153 -      if (pdsesToBackfill.length === 0) {
154 -        console.log('No PDSs to backfill. All caught up! Exiting.');
155 -        await db.destroy();
159 -      console.log(`Found ${pdsesToBackfill.length} PDS(s) to backfill. Starting job...`);
162 -      console.error('Failed to fetch PDS list from database:', e.message);
166 -    for (const pds of pdsesToBackfill) {
168 -      await backfillPds(pds.hostname);
170 -        console.error(`---`);
171 -        console.error(`Job for ${pds.hostname} failed. Moving to next PDS.`);
172 -        console.error(`---`);
294 +        pdsesToBackfill = await db
296 +          .select('hostname')
297 +          .where((eb) => eb.or([
298 +            eb('backfilled_at', 'is', null),
299 +            eb('backfilled_at', '<', sql`now() - interval '24 hours'`),
304 +          .when('hostname', 'like', '%.bsky.network')
310 +          .orderBy('added_at', 'asc')
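Pieced together, the new selection query presumably reads as below. The selectFrom target and the execute() call fall in elided lines and are assumptions; the `.when('hostname', 'like', '%.bsky.network')` fragment (likely part of an ordering case expression) is omitted from this sketch:

// Sketch only: table name 'pds' and execute() are assumptions; the
// where/orderBy fragments come straight from the diff.
pdsesToBackfill = await db
  .selectFrom('pds') // assumed table name
  .select('hostname')
  .where((eb) => eb.or([
    eb('backfilled_at', 'is', null),                            // never backfilled, or
    eb('backfilled_at', '<', sql`now() - interval '24 hours'`), // stale for a day
  ]))
  .orderBy('added_at', 'asc')
  .execute();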
313 +      if (pdsesToBackfill.length === 0) {
314 +        console.log('No PDSs currently needing backfill. Waiting and checking again...');
316 +      console.log(`Found ${pdsesToBackfill.length} PDS(s) to backfill. Starting job...`);
318 +      for (const pds of pdsesToBackfill) {
320 +          await backfillPds(pds.hostname);
322 +          console.error(`---`);
323 +          console.error(`Job for ${pds.hostname} failed. Moving to next PDS.`);
324 +          console.error(`---`);
329 +      console.error('Fatal error during continuous backfill loop:', e.message);
176 -    console.log('All backfill jobs complete. Closing database connection.');
177 -    await db.destroy();
332 +      console.log(`Waiting for ${CONSTANT_DELAY_MS / 1000} seconds before next pass.`);
333 +      await delay(CONSTANT_DELAY_MS);
       if (require.main === module) {
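Read as a whole, the new entrypoint replaces the old run-once flow (which destroyed the DB connection and exited) with a continuous polling loop. A sketch of that skeleton, with the function name and the while (true) framing inferred from the visible fragments:

// Sketch: only the query, the per-PDS try/catch, and the delay are visible
// in the diff; the loop/function framing here is inferred.
async function main(): Promise<void> {
  const CONSTANT_DELAY_MS = 10000;
  while (true) {
    try {
      let pdsesToBackfill: { hostname: string }[] = [];
      // ...selection query as sketched above...
      if (pdsesToBackfill.length === 0) {
        console.log('No PDSs currently needing backfill. Waiting and checking again...');
      } else {
        console.log(`Found ${pdsesToBackfill.length} PDS(s) to backfill. Starting job...`);
        for (const pds of pdsesToBackfill) {
          try {
            await backfillPds(pds.hostname);
          } catch {
            console.error(`Job for ${pds.hostname} failed. Moving to next PDS.`);
          }
        }
      }
    } catch (e: any) {
      console.error('Fatal error during continuous backfill loop:', e.message);
    }
    console.log(`Waiting for ${CONSTANT_DELAY_MS / 1000} seconds before next pass.`);
    await delay(CONSTANT_DELAY_MS);
  }
}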