Monorepo for Wisp.place. A static site hosting service built on top of the AT Protocol.

add backfill

Changed files
+177 -2
hosting-service
+3 -2
hosting-service/package.json
···
"version": "1.0.0",
"type": "module",
"scripts": {
-
"dev": "tsx watch src/index.ts",
"build": "tsc",
-
"start": "tsx src/index.ts"
},
"dependencies": {
"@atproto/api": "^0.17.4",
···
"version": "1.0.0",
"type": "module",
"scripts": {
+
"dev": "tsx --env-file=.env watch src/index.ts",
"build": "tsc",
+
"start": "tsx --env-file=.env src/index.ts",
+
"backfill": "tsx --env-file=.env src/index.ts --backfill"
},
"dependencies": {
"@atproto/api": "^0.17.4",
+19
hosting-service/src/index.ts
···
import { FirehoseWorker } from './lib/firehose';
import { logger } from './lib/observability';
import { mkdirSync, existsSync } from 'fs';
const PORT = process.env.PORT ? parseInt(process.env.PORT) : 3001;
const CACHE_DIR = process.env.CACHE_DIR || './cache/sites';
// Ensure cache directory exists
if (!existsSync(CACHE_DIR)) {
···
});
firehose.start();
// Add health check endpoint
app.get('/health', (c) => {
···
import { FirehoseWorker } from './lib/firehose';
import { logger } from './lib/observability';
import { mkdirSync, existsSync } from 'fs';
+
import { backfillCache } from './lib/backfill';
const PORT = process.env.PORT ? parseInt(process.env.PORT) : 3001;
const CACHE_DIR = process.env.CACHE_DIR || './cache/sites';
+
+
// Parse CLI arguments
+
const args = process.argv.slice(2);
+
const hasBackfillFlag = args.includes('--backfill');
+
const backfillOnStartup = hasBackfillFlag || process.env.BACKFILL_ON_STARTUP === 'true';
// Ensure cache directory exists
if (!existsSync(CACHE_DIR)) {
···
});
firehose.start();
+
+
// Run backfill if requested
+
if (backfillOnStartup) {
+
console.log('🔄 Backfill requested, starting cache backfill...');
+
backfillCache({
+
skipExisting: true,
+
concurrency: 3,
+
}).then((stats) => {
+
console.log('✅ Cache backfill completed');
+
}).catch((err) => {
+
console.error('❌ Cache backfill error:', err);
+
});
+
}
// Add health check endpoint
app.get('/health', (c) => {
+136
hosting-service/src/lib/backfill.ts
···
···
+
import { getAllSites } from './db';
+
import { fetchSiteRecord, getPdsForDid, downloadAndCacheSite, isCached } from './utils';
+
import { logger } from './observability';
+
+
export interface BackfillOptions {
+
skipExisting?: boolean; // Skip sites already in cache
+
concurrency?: number; // Number of sites to cache concurrently
+
maxSites?: number; // Maximum number of sites to backfill (for testing)
+
}
+
+
export interface BackfillStats {
+
total: number;
+
cached: number;
+
skipped: number;
+
failed: number;
+
duration: number;
+
}
+
+
/**
+
* Backfill all sites from the database into the local cache
+
*/
+
export async function backfillCache(options: BackfillOptions = {}): Promise<BackfillStats> {
+
const {
+
skipExisting = true,
+
concurrency = 3,
+
maxSites,
+
} = options;
+
+
const startTime = Date.now();
+
const stats: BackfillStats = {
+
total: 0,
+
cached: 0,
+
skipped: 0,
+
failed: 0,
+
duration: 0,
+
};
+
+
logger.info('Starting cache backfill', { skipExisting, concurrency, maxSites });
+
console.log(`
+
╔══════════════════════════════════════════╗
+
║ CACHE BACKFILL STARTING ║
+
╚══════════════════════════════════════════╝
+
`);
+
+
try {
+
// Get all sites from database
+
let sites = await getAllSites();
+
stats.total = sites.length;
+
+
logger.info(`Found ${sites.length} sites in database`);
+
console.log(`📊 Found ${sites.length} sites in database`);
+
+
// Limit if specified
+
if (maxSites && maxSites > 0) {
+
sites = sites.slice(0, maxSites);
+
console.log(`⚙️ Limited to ${maxSites} sites for backfill`);
+
}
+
+
// Process sites in batches
+
const batches: typeof sites[] = [];
+
for (let i = 0; i < sites.length; i += concurrency) {
+
batches.push(sites.slice(i, i + concurrency));
+
}
+
+
let processed = 0;
+
for (const batch of batches) {
+
await Promise.all(
+
batch.map(async (site) => {
+
try {
+
// Check if already cached
+
if (skipExisting && isCached(site.did, site.rkey)) {
+
stats.skipped++;
+
processed++;
+
logger.debug(`Skipping already cached site`, { did: site.did, rkey: site.rkey });
+
console.log(`⏭️ [${processed}/${sites.length}] Skipped (cached): ${site.display_name || site.rkey}`);
+
return;
+
}
+
+
// Fetch site record
+
const siteData = await fetchSiteRecord(site.did, site.rkey);
+
if (!siteData) {
+
stats.failed++;
+
processed++;
+
logger.error('Site record not found during backfill', null, { did: site.did, rkey: site.rkey });
+
console.log(`❌ [${processed}/${sites.length}] Failed (not found): ${site.display_name || site.rkey}`);
+
return;
+
}
+
+
// Get PDS endpoint
+
const pdsEndpoint = await getPdsForDid(site.did);
+
if (!pdsEndpoint) {
+
stats.failed++;
+
processed++;
+
logger.error('PDS not found during backfill', null, { did: site.did });
+
console.log(`❌ [${processed}/${sites.length}] Failed (no PDS): ${site.display_name || site.rkey}`);
+
return;
+
}
+
+
// Download and cache site
+
await downloadAndCacheSite(site.did, site.rkey, siteData.record, pdsEndpoint, siteData.cid);
+
stats.cached++;
+
processed++;
+
logger.info('Successfully cached site during backfill', { did: site.did, rkey: site.rkey });
+
console.log(`✅ [${processed}/${sites.length}] Cached: ${site.display_name || site.rkey}`);
+
} catch (err) {
+
stats.failed++;
+
processed++;
+
logger.error('Failed to cache site during backfill', err, { did: site.did, rkey: site.rkey });
+
console.log(`❌ [${processed}/${sites.length}] Failed: ${site.display_name || site.rkey}`);
+
}
+
})
+
);
+
}
+
+
stats.duration = Date.now() - startTime;
+
+
console.log(`
+
╔══════════════════════════════════════════╗
+
║ CACHE BACKFILL COMPLETED ║
+
╚══════════════════════════════════════════╝
+
+
📊 Total Sites: ${stats.total}
+
✅ Cached: ${stats.cached}
+
⏭️ Skipped: ${stats.skipped}
+
❌ Failed: ${stats.failed}
+
⏱️ Duration: ${(stats.duration / 1000).toFixed(2)}s
+
`);
+
+
logger.info('Cache backfill completed', stats);
+
} catch (err) {
+
logger.error('Cache backfill failed', err);
+
console.error('❌ Cache backfill failed:', err);
+
}
+
+
return stats;
+
}
+19
hosting-service/src/lib/db.ts
···
}
}
/**
* Generate a numeric lock ID from a string key
* PostgreSQL advisory locks use bigint (64-bit signed integer)
···
}
}
+
export interface SiteRecord {
+
did: string;
+
rkey: string;
+
display_name?: string;
+
}
+
+
export async function getAllSites(): Promise<SiteRecord[]> {
+
try {
+
const result = await sql<SiteRecord[]>`
+
SELECT did, rkey, display_name FROM sites
+
ORDER BY created_at DESC
+
`;
+
return result;
+
} catch (err) {
+
console.error('Failed to get all sites', err);
+
return [];
+
}
+
}
+
/**
* Generate a numeric lock ID from a string key
* PostgreSQL advisory locks use bigint (64-bit signed integer)