Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol. wisp.place

hosting service now is using node.js for runtime, better lexicon validation across main app and microservice

Changed files
+756 -253
hosting-service
src
lexicons
types
place
wisp
lib
routes
-60
hosting-service/bun.lock
···
-
{
-
"lockfileVersion": 1,
-
"workspaces": {
-
"": {
-
"name": "wisp-hosting-service",
-
"dependencies": {
-
"@atproto/api": "^0.13.20",
-
"@atproto/xrpc": "^0.6.4",
-
"hono": "^4.6.14",
-
"postgres": "^3.4.5",
-
},
-
"devDependencies": {
-
"@types/bun": "latest",
-
},
-
},
-
},
-
"packages": {
-
"@atproto/api": ["@atproto/api@0.13.35", "", { "dependencies": { "@atproto/common-web": "^0.4.0", "@atproto/lexicon": "^0.4.6", "@atproto/syntax": "^0.3.2", "@atproto/xrpc": "^0.6.8", "await-lock": "^2.2.2", "multiformats": "^9.9.0", "tlds": "^1.234.0", "zod": "^3.23.8" } }, "sha512-vsEfBj0C333TLjDppvTdTE0IdKlXuljKSveAeI4PPx/l6eUKNnDTsYxvILtXUVzwUlTDmSRqy5O4Ryh78n1b7g=="],
-
-
"@atproto/common-web": ["@atproto/common-web@0.4.3", "", { "dependencies": { "graphemer": "^1.4.0", "multiformats": "^9.9.0", "uint8arrays": "3.0.0", "zod": "^3.23.8" } }, "sha512-nRDINmSe4VycJzPo6fP/hEltBcULFxt9Kw7fQk6405FyAWZiTluYHlXOnU7GkQfeUK44OENG1qFTBcmCJ7e8pg=="],
-
-
"@atproto/lexicon": ["@atproto/lexicon@0.4.14", "", { "dependencies": { "@atproto/common-web": "^0.4.2", "@atproto/syntax": "^0.4.0", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", "zod": "^3.23.8" } }, "sha512-jiKpmH1QER3Gvc7JVY5brwrfo+etFoe57tKPQX/SmPwjvUsFnJAow5xLIryuBaJgFAhnTZViXKs41t//pahGHQ=="],
-
-
"@atproto/syntax": ["@atproto/syntax@0.3.4", "", {}, "sha512-8CNmi5DipOLaVeSMPggMe7FCksVag0aO6XZy9WflbduTKM4dFZVCs4686UeMLfGRXX+X966XgwECHoLYrovMMg=="],
-
-
"@atproto/xrpc": ["@atproto/xrpc@0.6.12", "", { "dependencies": { "@atproto/lexicon": "^0.4.10", "zod": "^3.23.8" } }, "sha512-Ut3iISNLujlmY9Gu8sNU+SPDJDvqlVzWddU8qUr0Yae5oD4SguaUFjjhireMGhQ3M5E0KljQgDbTmnBo1kIZ3w=="],
-
-
"@types/bun": ["@types/bun@1.3.1", "", { "dependencies": { "bun-types": "1.3.1" } }, "sha512-4jNMk2/K9YJtfqwoAa28c8wK+T7nvJFOjxI4h/7sORWcypRNxBpr+TPNaCfVWq70tLCJsqoFwcf0oI0JU/fvMQ=="],
-
-
"@types/node": ["@types/node@24.9.1", "", { "dependencies": { "undici-types": "~7.16.0" } }, "sha512-QoiaXANRkSXK6p0Duvt56W208du4P9Uye9hWLWgGMDTEoKPhuenzNcC4vGUmrNkiOKTlIrBoyNQYNpSwfEZXSg=="],
-
-
"@types/react": ["@types/react@19.2.2", "", { "dependencies": { "csstype": "^3.0.2" } }, "sha512-6mDvHUFSjyT2B2yeNx2nUgMxh9LtOWvkhIU3uePn2I2oyNymUAX1NIsdgviM4CH+JSrp2D2hsMvJOkxY+0wNRA=="],
-
-
"await-lock": ["await-lock@2.2.2", "", {}, "sha512-aDczADvlvTGajTDjcjpJMqRkOF6Qdz3YbPZm/PyW6tKPkx2hlYBzxMhEywM/tU72HrVZjgl5VCdRuMlA7pZ8Gw=="],
-
-
"bun-types": ["bun-types@1.3.1", "", { "dependencies": { "@types/node": "*" }, "peerDependencies": { "@types/react": "^19" } }, "sha512-NMrcy7smratanWJ2mMXdpatalovtxVggkj11bScuWuiOoXTiKIu2eVS1/7qbyI/4yHedtsn175n4Sm4JcdHLXw=="],
-
-
"csstype": ["csstype@3.1.3", "", {}, "sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw=="],
-
-
"graphemer": ["graphemer@1.4.0", "", {}, "sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag=="],
-
-
"hono": ["hono@4.10.2", "", {}, "sha512-p6fyzl+mQo6uhESLxbF5WlBOAJMDh36PljwlKtP5V1v09NxlqGru3ShK+4wKhSuhuYf8qxMmrivHOa/M7q0sMg=="],
-
-
"iso-datestring-validator": ["iso-datestring-validator@2.2.2", "", {}, "sha512-yLEMkBbLZTlVQqOnQ4FiMujR6T4DEcCb1xizmvXS+OxuhwcbtynoosRzdMA69zZCShCNAbi+gJ71FxZBBXx1SA=="],
-
-
"multiformats": ["multiformats@9.9.0", "", {}, "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg=="],
-
-
"postgres": ["postgres@3.4.7", "", {}, "sha512-Jtc2612XINuBjIl/QTWsV5UvE8UHuNblcO3vVADSrKsrc6RqGX6lOW1cEo3CM2v0XG4Nat8nI+YM7/f26VxXLw=="],
-
-
"tlds": ["tlds@1.261.0", "", { "bin": { "tlds": "bin.js" } }, "sha512-QXqwfEl9ddlGBaRFXIvNKK6OhipSiLXuRuLJX5DErz0o0Q0rYxulWLdFryTkV5PkdZct5iMInwYEGe/eR++1AA=="],
-
-
"uint8arrays": ["uint8arrays@3.0.0", "", { "dependencies": { "multiformats": "^9.4.2" } }, "sha512-HRCx0q6O9Bfbp+HHSfQQKD7wU70+lydKVt4EghkdOvlK/NlrF90z+eXV34mUd48rNvVJXwkrMSPpCATkct8fJA=="],
-
-
"undici-types": ["undici-types@7.16.0", "", {}, "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw=="],
-
-
"zod": ["zod@3.25.76", "", {}, "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ=="],
-
-
"@atproto/lexicon/@atproto/syntax": ["@atproto/syntax@0.4.1", "", {}, "sha512-CJdImtLAiFO+0z3BWTtxwk6aY5w4t8orHTMVJgkf++QRJWTxPbIFko/0hrkADB7n2EruDxDSeAgfUGehpH6ngw=="],
-
}
-
}
+13 -5
hosting-service/package.json
···
"version": "1.0.0",
"type": "module",
"scripts": {
-
"dev": "bun --watch src/index.ts",
-
"start": "bun src/index.ts"
+
"dev": "tsx watch src/index.ts",
+
"start": "node --loader tsx src/index.ts"
},
"dependencies": {
+
"@atproto/api": "^0.17.4",
+
"@atproto/identity": "^0.4.9",
+
"@atproto/lexicon": "^0.5.1",
+
"@atproto/sync": "^0.1.35",
+
"@atproto/xrpc": "^0.7.5",
+
"@hono/node-server": "^1.13.7",
"hono": "^4.6.14",
-
"@atproto/api": "^0.13.20",
-
"@atproto/xrpc": "^0.6.4",
+
"mime-types": "^2.1.35",
+
"multiformats": "^13.4.1",
"postgres": "^3.4.5"
},
"devDependencies": {
-
"@types/bun": "latest"
+
"@types/mime-types": "^2.1.4",
+
"@types/node": "^22.10.5",
+
"tsx": "^4.19.2"
}
}
+9 -9
hosting-service/src/index.ts
···
-
import { serve } from 'bun';
-
import app from './server';
-
import { FirehoseWorker } from './lib/firehose';
+
import { serve } from '@hono/node-server';
+
import app from './server.js';
+
import { FirehoseWorker } from './lib/firehose.js';
import { mkdirSync, existsSync } from 'fs';
-
const PORT = process.env.PORT || 3001;
+
const PORT = process.env.PORT ? parseInt(process.env.PORT) : 3001;
const CACHE_DIR = './cache/sites';
// Ensure cache directory exists
···
Server: http://localhost:${PORT}
Health: http://localhost:${PORT}/health
Cache: ${CACHE_DIR}
-
Firehose: Connected to Jetstream
+
Firehose: Connected to Firehose
`);
// Graceful shutdown
-
process.on('SIGINT', () => {
+
process.on('SIGINT', async () => {
console.log('\n🛑 Shutting down...');
firehose.stop();
-
server.stop();
+
server.close();
process.exit(0);
});
-
process.on('SIGTERM', () => {
+
process.on('SIGTERM', async () => {
console.log('\n🛑 Shutting down...');
firehose.stop();
-
server.stop();
+
server.close();
process.exit(0);
});
+127
hosting-service/src/lexicon/lexicons.ts
···
+
/**
+
* GENERATED CODE - DO NOT MODIFY
+
*/
+
import {
+
type LexiconDoc,
+
Lexicons,
+
ValidationError,
+
type ValidationResult,
+
} from '@atproto/lexicon'
+
import { type $Typed, is$typed, maybe$typed } from './util.js'
+
+
export const schemaDict = {
+
PlaceWispFs: {
+
lexicon: 1,
+
id: 'place.wisp.fs',
+
defs: {
+
main: {
+
type: 'record',
+
description: 'Virtual filesystem manifest for a Wisp site',
+
record: {
+
type: 'object',
+
required: ['site', 'root', 'createdAt'],
+
properties: {
+
site: {
+
type: 'string',
+
},
+
root: {
+
type: 'ref',
+
ref: 'lex:place.wisp.fs#directory',
+
},
+
fileCount: {
+
type: 'integer',
+
minimum: 0,
+
maximum: 1000,
+
},
+
createdAt: {
+
type: 'string',
+
format: 'datetime',
+
},
+
},
+
},
+
},
+
file: {
+
type: 'object',
+
required: ['type', 'blob'],
+
properties: {
+
type: {
+
type: 'string',
+
const: 'file',
+
},
+
blob: {
+
type: 'blob',
+
accept: ['*/*'],
+
maxSize: 1000000,
+
description: 'Content blob ref',
+
},
+
},
+
},
+
directory: {
+
type: 'object',
+
required: ['type', 'entries'],
+
properties: {
+
type: {
+
type: 'string',
+
const: 'directory',
+
},
+
entries: {
+
type: 'array',
+
maxLength: 500,
+
items: {
+
type: 'ref',
+
ref: 'lex:place.wisp.fs#entry',
+
},
+
},
+
},
+
},
+
entry: {
+
type: 'object',
+
required: ['name', 'node'],
+
properties: {
+
name: {
+
type: 'string',
+
maxLength: 255,
+
},
+
node: {
+
type: 'union',
+
refs: ['lex:place.wisp.fs#file', 'lex:place.wisp.fs#directory'],
+
},
+
},
+
},
+
},
+
},
+
} as const satisfies Record<string, LexiconDoc>
+
export const schemas = Object.values(schemaDict) satisfies LexiconDoc[]
+
export const lexicons: Lexicons = new Lexicons(schemas)
+
+
export function validate<T extends { $type: string }>(
+
v: unknown,
+
id: string,
+
hash: string,
+
requiredType: true,
+
): ValidationResult<T>
+
export function validate<T extends { $type?: string }>(
+
v: unknown,
+
id: string,
+
hash: string,
+
requiredType?: false,
+
): ValidationResult<T>
+
export function validate(
+
v: unknown,
+
id: string,
+
hash: string,
+
requiredType?: boolean,
+
): ValidationResult {
+
return (requiredType ? is$typed : maybe$typed)(v, id, hash)
+
? lexicons.validate(`${id}#${hash}`, v)
+
: {
+
success: false,
+
error: new ValidationError(
+
`Must be an object with "${hash === 'main' ? id : `${id}#${hash}`}" $type property`,
+
),
+
}
+
}
+
+
export const ids = {
+
PlaceWispFs: 'place.wisp.fs',
+
} as const
+79
hosting-service/src/lexicon/types/place/wisp/fs.ts
···
+
/**
+
* GENERATED CODE - DO NOT MODIFY
+
*/
+
import { type ValidationResult, BlobRef } from '@atproto/lexicon'
+
import { CID } from 'multiformats/cid'
+
import { validate as _validate } from '../../../lexicons'
+
import { type $Typed, is$typed as _is$typed, type OmitKey } from '../../../util'
+
+
const is$typed = _is$typed,
+
validate = _validate
+
const id = 'place.wisp.fs'
+
+
export interface Record {
+
$type: 'place.wisp.fs'
+
site: string
+
root: Directory
+
fileCount?: number
+
createdAt: string
+
[k: string]: unknown
+
}
+
+
const hashRecord = 'main'
+
+
export function isRecord<V>(v: V) {
+
return is$typed(v, id, hashRecord)
+
}
+
+
export function validateRecord<V>(v: V) {
+
return validate<Record & V>(v, id, hashRecord, true)
+
}
+
+
export interface File {
+
$type?: 'place.wisp.fs#file'
+
type: 'file'
+
/** Content blob ref */
+
blob: BlobRef
+
}
+
+
const hashFile = 'file'
+
+
export function isFile<V>(v: V) {
+
return is$typed(v, id, hashFile)
+
}
+
+
export function validateFile<V>(v: V) {
+
return validate<File & V>(v, id, hashFile)
+
}
+
+
export interface Directory {
+
$type?: 'place.wisp.fs#directory'
+
type: 'directory'
+
entries: Entry[]
+
}
+
+
const hashDirectory = 'directory'
+
+
export function isDirectory<V>(v: V) {
+
return is$typed(v, id, hashDirectory)
+
}
+
+
export function validateDirectory<V>(v: V) {
+
return validate<Directory & V>(v, id, hashDirectory)
+
}
+
+
export interface Entry {
+
$type?: 'place.wisp.fs#entry'
+
name: string
+
node: $Typed<File> | $Typed<Directory> | { $type: string }
+
}
+
+
const hashEntry = 'entry'
+
+
export function isEntry<V>(v: V) {
+
return is$typed(v, id, hashEntry)
+
}
+
+
export function validateEntry<V>(v: V) {
+
return validate<Entry & V>(v, id, hashEntry)
+
}
+82
hosting-service/src/lexicon/util.ts
···
+
/**
+
* GENERATED CODE - DO NOT MODIFY
+
*/
+
+
import { type ValidationResult } from '@atproto/lexicon'
+
+
export type OmitKey<T, K extends keyof T> = {
+
[K2 in keyof T as K2 extends K ? never : K2]: T[K2]
+
}
+
+
export type $Typed<V, T extends string = string> = V & { $type: T }
+
export type Un$Typed<V extends { $type?: string }> = OmitKey<V, '$type'>
+
+
export type $Type<Id extends string, Hash extends string> = Hash extends 'main'
+
? Id
+
: `${Id}#${Hash}`
+
+
function isObject<V>(v: V): v is V & object {
+
return v != null && typeof v === 'object'
+
}
+
+
function is$type<Id extends string, Hash extends string>(
+
$type: unknown,
+
id: Id,
+
hash: Hash,
+
): $type is $Type<Id, Hash> {
+
return hash === 'main'
+
? $type === id
+
: // $type === `${id}#${hash}`
+
typeof $type === 'string' &&
+
$type.length === id.length + 1 + hash.length &&
+
$type.charCodeAt(id.length) === 35 /* '#' */ &&
+
$type.startsWith(id) &&
+
$type.endsWith(hash)
+
}
+
+
export type $TypedObject<
+
V,
+
Id extends string,
+
Hash extends string,
+
> = V extends {
+
$type: $Type<Id, Hash>
+
}
+
? V
+
: V extends { $type?: string }
+
? V extends { $type?: infer T extends $Type<Id, Hash> }
+
? V & { $type: T }
+
: never
+
: V & { $type: $Type<Id, Hash> }
+
+
export function is$typed<V, Id extends string, Hash extends string>(
+
v: V,
+
id: Id,
+
hash: Hash,
+
): v is $TypedObject<V, Id, Hash> {
+
return isObject(v) && '$type' in v && is$type(v.$type, id, hash)
+
}
+
+
export function maybe$typed<V, Id extends string, Hash extends string>(
+
v: V,
+
id: Id,
+
hash: Hash,
+
): v is V & object & { $type?: $Type<Id, Hash> } {
+
return (
+
isObject(v) &&
+
('$type' in v ? v.$type === undefined || is$type(v.$type, id, hash) : true)
+
)
+
}
+
+
export type Validator<R = unknown> = (v: unknown) => ValidationResult<R>
+
export type ValidatorParam<V extends Validator> =
+
V extends Validator<infer R> ? R : never
+
+
/**
+
* Utility function that allows to convert a "validate*" utility function into a
+
* type predicate.
+
*/
+
export function asPredicate<V extends Validator>(validate: V) {
+
return function <T>(v: T): v is T & ValidatorParam<V> {
+
return validate(v).success
+
}
+
}
+70 -163
hosting-service/src/lib/firehose.ts
···
import { existsSync, rmSync } from 'fs';
-
import type { WispFsRecord } from './types';
import { getPdsForDid, downloadAndCacheSite, extractBlobCid, fetchSiteRecord } from './utils';
import { upsertSite } from './db';
import { safeFetch } from './safe-fetch';
+
import { isRecord, validateRecord } from '../lexicon/types/place/wisp/fs';
+
import { Firehose } from '@atproto/sync';
+
import { IdResolver } from '@atproto/identity';
const CACHE_DIR = './cache/sites';
-
const JETSTREAM_URL = 'wss://jetstream2.us-west.bsky.network/subscribe';
-
const RECONNECT_DELAY = 5000; // 5 seconds
-
const MAX_RECONNECT_DELAY = 60000; // 1 minute
-
-
interface JetstreamCommitEvent {
-
did: string;
-
time_us: number;
-
type: 'com' | 'identity' | 'account';
-
kind: 'commit';
-
commit: {
-
rev: string;
-
operation: 'create' | 'update' | 'delete';
-
collection: string;
-
rkey: string;
-
record?: any;
-
cid?: string;
-
};
-
}
-
-
interface JetstreamIdentityEvent {
-
did: string;
-
time_us: number;
-
type: 'identity';
-
kind: 'update';
-
identity: {
-
did: string;
-
handle: string;
-
seq: number;
-
time: string;
-
};
-
}
-
-
interface JetstreamAccountEvent {
-
did: string;
-
time_us: number;
-
type: 'account';
-
kind: 'update' | 'delete';
-
account: {
-
active: boolean;
-
did: string;
-
seq: number;
-
time: string;
-
};
-
}
-
-
type JetstreamEvent =
-
| JetstreamCommitEvent
-
| JetstreamIdentityEvent
-
| JetstreamAccountEvent;
export class FirehoseWorker {
-
private ws: WebSocket | null = null;
-
private reconnectAttempts = 0;
-
private reconnectTimeout: Timer | null = null;
+
private firehose: Firehose | null = null;
+
private idResolver: IdResolver;
private isShuttingDown = false;
private lastEventTime = Date.now();
constructor(
private logger?: (msg: string, data?: Record<string, unknown>) => void,
-
) {}
+
) {
+
this.idResolver = new IdResolver();
+
}
private log(msg: string, data?: Record<string, unknown>) {
const log = this.logger || console.log;
···
this.log('Stopping firehose worker');
this.isShuttingDown = true;
-
if (this.reconnectTimeout) {
-
clearTimeout(this.reconnectTimeout);
-
this.reconnectTimeout = null;
-
}
-
-
if (this.ws) {
-
this.ws.close();
-
this.ws = null;
+
if (this.firehose) {
+
this.firehose.destroy();
+
this.firehose = null;
}
}
private connect() {
if (this.isShuttingDown) return;
-
const url = new URL(JETSTREAM_URL);
-
url.searchParams.set('wantedCollections', 'place.wisp.fs');
+
this.log('Connecting to AT Protocol firehose');
-
this.log('Connecting to Jetstream', { url: url.toString() });
+
this.firehose = new Firehose({
+
idResolver: this.idResolver,
+
service: 'wss://bsky.network',
+
filterCollections: ['place.wisp.fs'],
+
handleEvent: async (evt) => {
+
this.lastEventTime = Date.now();
-
try {
-
this.ws = new WebSocket(url.toString());
+
// Watch for write events
+
if (evt.event === 'create' || evt.event === 'update') {
+
const record = evt.record;
-
this.ws.onopen = () => {
-
this.log('Connected to Jetstream');
-
this.reconnectAttempts = 0;
-
this.lastEventTime = Date.now();
-
};
+
// If the write is a valid place.wisp.fs record
+
if (
+
evt.collection === 'place.wisp.fs' &&
+
isRecord(record) &&
+
validateRecord(record).success
+
) {
+
this.log('Received place.wisp.fs event', {
+
did: evt.did,
+
event: evt.event,
+
rkey: evt.rkey,
+
});
-
this.ws.onmessage = async (event) => {
-
this.lastEventTime = Date.now();
-
-
try {
-
const data = JSON.parse(event.data as string) as JetstreamEvent;
-
await this.handleEvent(data);
-
} catch (err) {
-
this.log('Error processing event', {
-
error: err instanceof Error ? err.message : String(err),
+
try {
+
await this.handleCreateOrUpdate(evt.did, evt.rkey, record, evt.cid?.toString());
+
} catch (err) {
+
this.log('Error handling event', {
+
did: evt.did,
+
event: evt.event,
+
rkey: evt.rkey,
+
error: err instanceof Error ? err.message : String(err),
+
});
+
}
+
}
+
} else if (evt.event === 'delete' && evt.collection === 'place.wisp.fs') {
+
this.log('Received delete event', {
+
did: evt.did,
+
rkey: evt.rkey,
});
-
}
-
};
-
this.ws.onerror = (error) => {
-
this.log('WebSocket error', { error: String(error) });
-
};
-
-
this.ws.onclose = () => {
-
this.log('WebSocket closed');
-
this.ws = null;
-
-
if (!this.isShuttingDown) {
-
this.scheduleReconnect();
+
try {
+
await this.handleDelete(evt.did, evt.rkey);
+
} catch (err) {
+
this.log('Error handling delete', {
+
did: evt.did,
+
rkey: evt.rkey,
+
error: err instanceof Error ? err.message : String(err),
+
});
+
}
}
-
};
-
} catch (err) {
-
this.log('Failed to create WebSocket', {
-
error: err instanceof Error ? err.message : String(err),
-
});
-
this.scheduleReconnect();
-
}
-
}
-
-
private scheduleReconnect() {
-
if (this.isShuttingDown) return;
-
-
this.reconnectAttempts++;
-
const delay = Math.min(
-
RECONNECT_DELAY * Math.pow(2, this.reconnectAttempts - 1),
-
MAX_RECONNECT_DELAY,
-
);
-
-
this.log(`Scheduling reconnect attempt ${this.reconnectAttempts}`, {
-
delay: `${delay}ms`,
-
});
-
-
this.reconnectTimeout = setTimeout(() => {
-
this.connect();
-
}, delay);
-
}
-
-
private async handleEvent(event: JetstreamEvent) {
-
if (event.kind !== 'commit') return;
-
-
const commitEvent = event as JetstreamCommitEvent;
-
const { commit, did } = commitEvent;
-
-
if (commit.collection !== 'place.wisp.fs') return;
-
-
this.log('Received place.wisp.fs event', {
-
did,
-
operation: commit.operation,
-
rkey: commit.rkey,
+
},
+
onError: (err) => {
+
this.log('Firehose error', {
+
error: err instanceof Error ? err.message : String(err),
+
stack: err instanceof Error ? err.stack : undefined,
+
fullError: err,
+
});
+
console.error('Full firehose error:', err);
+
},
});
-
try {
-
if (commit.operation === 'create' || commit.operation === 'update') {
-
// Pass the CID from the event for verification
-
await this.handleCreateOrUpdate(did, commit.rkey, commit.record, commit.cid);
-
} else if (commit.operation === 'delete') {
-
await this.handleDelete(did, commit.rkey);
-
}
-
} catch (err) {
-
this.log('Error handling event', {
-
did,
-
operation: commit.operation,
-
rkey: commit.rkey,
-
error: err instanceof Error ? err.message : String(err),
-
});
-
}
+
this.firehose.start();
+
this.log('Firehose started');
}
private async handleCreateOrUpdate(did: string, site: string, record: any, eventCid?: string) {
this.log('Processing create/update', { did, site });
-
if (!this.validateRecord(record)) {
-
this.log('Invalid record structure, skipping', { did, site });
-
return;
-
}
-
-
const fsRecord = record as WispFsRecord;
+
// Record is already validated in handleEvent
+
const fsRecord = record;
const pdsEndpoint = await getPdsForDid(did);
if (!pdsEndpoint) {
···
this.log('Successfully processed delete', { did, site });
}
-
private validateRecord(record: any): boolean {
-
if (!record || typeof record !== 'object') return false;
-
if (record.$type !== 'place.wisp.fs') return false;
-
if (!record.root || typeof record.root !== 'object') return false;
-
if (!record.site || typeof record.site !== 'string') return false;
-
return true;
-
}
-
private deleteCache(did: string, site: string) {
const cacheDir = `${CACHE_DIR}/${did}/${site}`;
···
}
getHealth() {
-
const isConnected = this.ws !== null && this.ws.readyState === WebSocket.OPEN;
+
const isConnected = this.firehose !== null;
const timeSinceLastEvent = Date.now() - this.lastEventTime;
return {
connected: isConnected,
-
reconnectAttempts: this.reconnectAttempts,
lastEventTime: this.lastEventTime,
timeSinceLastEvent,
healthy: isConnected && timeSinceLastEvent < 300000, // 5 minutes
+15 -14
hosting-service/src/server.ts
···
import { Hono } from 'hono';
-
import { serveStatic } from 'hono/bun';
import { getWispDomain, getCustomDomain, getCustomDomainByHash } from './lib/db';
import { resolveDid, getPdsForDid, fetchSiteRecord, downloadAndCacheSite, getCachedFilePath, isCached, sanitizePath } from './lib/utils';
import { rewriteHtmlPaths, isHtmlContent } from './lib/html-rewriter';
-
import { existsSync } from 'fs';
+
import { existsSync, readFileSync } from 'fs';
+
import { lookup } from 'mime-types';
const app = new Hono();
···
const cachedFile = getCachedFilePath(did, rkey, requestPath);
if (existsSync(cachedFile)) {
-
const file = Bun.file(cachedFile);
-
return new Response(file, {
+
const content = readFileSync(cachedFile);
+
const mimeType = lookup(cachedFile) || 'application/octet-stream';
+
return new Response(content, {
headers: {
-
'Content-Type': file.type || 'application/octet-stream',
+
'Content-Type': mimeType,
},
});
}
···
if (!requestPath.includes('.')) {
const indexFile = getCachedFilePath(did, rkey, `${requestPath}/index.html`);
if (existsSync(indexFile)) {
-
const file = Bun.file(indexFile);
-
return new Response(file, {
+
const content = readFileSync(indexFile);
+
return new Response(content, {
headers: {
'Content-Type': 'text/html; charset=utf-8',
},
···
const cachedFile = getCachedFilePath(did, rkey, requestPath);
if (existsSync(cachedFile)) {
-
const file = Bun.file(cachedFile);
+
const mimeType = lookup(cachedFile) || 'application/octet-stream';
// Check if this is HTML content that needs rewriting
-
if (isHtmlContent(requestPath, file.type)) {
-
const content = await file.text();
+
if (isHtmlContent(requestPath, mimeType)) {
+
const content = readFileSync(cachedFile, 'utf-8');
const rewritten = rewriteHtmlPaths(content, basePath);
return new Response(rewritten, {
headers: {
···
}
// Non-HTML files served with proper MIME type
-
return new Response(file, {
+
const content = readFileSync(cachedFile);
+
return new Response(content, {
headers: {
-
'Content-Type': file.type || 'application/octet-stream',
+
'Content-Type': mimeType,
},
});
}
···
if (!requestPath.includes('.')) {
const indexFile = getCachedFilePath(did, rkey, `${requestPath}/index.html`);
if (existsSync(indexFile)) {
-
const file = Bun.file(indexFile);
-
const content = await file.text();
+
const content = readFileSync(indexFile, 'utf-8');
const rewritten = rewriteHtmlPaths(content, basePath);
return new Response(rewritten, {
headers: {
-1
src/index.ts
···
import { Elysia } from 'elysia'
import { cors } from '@elysiajs/cors'
import { staticPlugin } from '@elysiajs/static'
-
import { openapi, fromTypes } from '@elysiajs/openapi'
import type { Config } from './lib/types'
import { BASE_HOST } from './lib/constants'
+44
src/lexicons/index.ts
···
+
/**
+
* GENERATED CODE - DO NOT MODIFY
+
*/
+
import {
+
type Auth,
+
type Options as XrpcOptions,
+
Server as XrpcServer,
+
type StreamConfigOrHandler,
+
type MethodConfigOrHandler,
+
createServer as createXrpcServer,
+
} from '@atproto/xrpc-server'
+
import { schemas } from './lexicons.js'
+
+
export function createServer(options?: XrpcOptions): Server {
+
return new Server(options)
+
}
+
+
export class Server {
+
xrpc: XrpcServer
+
place: PlaceNS
+
+
constructor(options?: XrpcOptions) {
+
this.xrpc = createXrpcServer(schemas, options)
+
this.place = new PlaceNS(this)
+
}
+
}
+
+
export class PlaceNS {
+
_server: Server
+
wisp: PlaceWispNS
+
+
constructor(server: Server) {
+
this._server = server
+
this.wisp = new PlaceWispNS(server)
+
}
+
}
+
+
export class PlaceWispNS {
+
_server: Server
+
+
constructor(server: Server) {
+
this._server = server
+
}
+
}
+127
src/lexicons/lexicons.ts
···
+
/**
+
* GENERATED CODE - DO NOT MODIFY
+
*/
+
import {
+
type LexiconDoc,
+
Lexicons,
+
ValidationError,
+
type ValidationResult,
+
} from '@atproto/lexicon'
+
import { type $Typed, is$typed, maybe$typed } from './util.js'
+
+
export const schemaDict = {
+
PlaceWispFs: {
+
lexicon: 1,
+
id: 'place.wisp.fs',
+
defs: {
+
main: {
+
type: 'record',
+
description: 'Virtual filesystem manifest for a Wisp site',
+
record: {
+
type: 'object',
+
required: ['site', 'root', 'createdAt'],
+
properties: {
+
site: {
+
type: 'string',
+
},
+
root: {
+
type: 'ref',
+
ref: 'lex:place.wisp.fs#directory',
+
},
+
fileCount: {
+
type: 'integer',
+
minimum: 0,
+
maximum: 1000,
+
},
+
createdAt: {
+
type: 'string',
+
format: 'datetime',
+
},
+
},
+
},
+
},
+
file: {
+
type: 'object',
+
required: ['type', 'blob'],
+
properties: {
+
type: {
+
type: 'string',
+
const: 'file',
+
},
+
blob: {
+
type: 'blob',
+
accept: ['*/*'],
+
maxSize: 1000000,
+
description: 'Content blob ref',
+
},
+
},
+
},
+
directory: {
+
type: 'object',
+
required: ['type', 'entries'],
+
properties: {
+
type: {
+
type: 'string',
+
const: 'directory',
+
},
+
entries: {
+
type: 'array',
+
maxLength: 500,
+
items: {
+
type: 'ref',
+
ref: 'lex:place.wisp.fs#entry',
+
},
+
},
+
},
+
},
+
entry: {
+
type: 'object',
+
required: ['name', 'node'],
+
properties: {
+
name: {
+
type: 'string',
+
maxLength: 255,
+
},
+
node: {
+
type: 'union',
+
refs: ['lex:place.wisp.fs#file', 'lex:place.wisp.fs#directory'],
+
},
+
},
+
},
+
},
+
},
+
} as const satisfies Record<string, LexiconDoc>
+
export const schemas = Object.values(schemaDict) satisfies LexiconDoc[]
+
export const lexicons: Lexicons = new Lexicons(schemas)
+
+
export function validate<T extends { $type: string }>(
+
v: unknown,
+
id: string,
+
hash: string,
+
requiredType: true,
+
): ValidationResult<T>
+
export function validate<T extends { $type?: string }>(
+
v: unknown,
+
id: string,
+
hash: string,
+
requiredType?: false,
+
): ValidationResult<T>
+
export function validate(
+
v: unknown,
+
id: string,
+
hash: string,
+
requiredType?: boolean,
+
): ValidationResult {
+
return (requiredType ? is$typed : maybe$typed)(v, id, hash)
+
? lexicons.validate(`${id}#${hash}`, v)
+
: {
+
success: false,
+
error: new ValidationError(
+
`Must be an object with "${hash === 'main' ? id : `${id}#${hash}`}" $type property`,
+
),
+
}
+
}
+
+
export const ids = {
+
PlaceWispFs: 'place.wisp.fs',
+
} as const
+85
src/lexicons/types/place/wisp/fs.ts
···
+
/**
+
* GENERATED CODE - DO NOT MODIFY
+
*/
+
import { type ValidationResult, BlobRef } from '@atproto/lexicon'
+
import { CID } from 'multiformats/cid'
+
import { validate as _validate } from '../../../lexicons'
+
import { type $Typed, is$typed as _is$typed, type OmitKey } from '../../../util'
+
+
const is$typed = _is$typed,
+
validate = _validate
+
const id = 'place.wisp.fs'
+
+
export interface Main {
+
$type: 'place.wisp.fs'
+
site: string
+
root: Directory
+
fileCount?: number
+
createdAt: string
+
[k: string]: unknown
+
}
+
+
const hashMain = 'main'
+
+
export function isMain<V>(v: V) {
+
return is$typed(v, id, hashMain)
+
}
+
+
export function validateMain<V>(v: V) {
+
return validate<Main & V>(v, id, hashMain, true)
+
}
+
+
export {
+
type Main as Record,
+
isMain as isRecord,
+
validateMain as validateRecord,
+
}
+
+
export interface File {
+
$type?: 'place.wisp.fs#file'
+
type: 'file'
+
/** Content blob ref */
+
blob: BlobRef
+
}
+
+
const hashFile = 'file'
+
+
export function isFile<V>(v: V) {
+
return is$typed(v, id, hashFile)
+
}
+
+
export function validateFile<V>(v: V) {
+
return validate<File & V>(v, id, hashFile)
+
}
+
+
export interface Directory {
+
$type?: 'place.wisp.fs#directory'
+
type: 'directory'
+
entries: Entry[]
+
}
+
+
const hashDirectory = 'directory'
+
+
export function isDirectory<V>(v: V) {
+
return is$typed(v, id, hashDirectory)
+
}
+
+
export function validateDirectory<V>(v: V) {
+
return validate<Directory & V>(v, id, hashDirectory)
+
}
+
+
export interface Entry {
+
$type?: 'place.wisp.fs#entry'
+
name: string
+
node: $Typed<File> | $Typed<Directory> | { $type: string }
+
}
+
+
const hashEntry = 'entry'
+
+
export function isEntry<V>(v: V) {
+
return is$typed(v, id, hashEntry)
+
}
+
+
export function validateEntry<V>(v: V) {
+
return validate<Entry & V>(v, id, hashEntry)
+
}
+82
src/lexicons/util.ts
···
+
/**
+
* GENERATED CODE - DO NOT MODIFY
+
*/
+
+
import { type ValidationResult } from '@atproto/lexicon'
+
+
export type OmitKey<T, K extends keyof T> = {
+
[K2 in keyof T as K2 extends K ? never : K2]: T[K2]
+
}
+
+
export type $Typed<V, T extends string = string> = V & { $type: T }
+
export type Un$Typed<V extends { $type?: string }> = OmitKey<V, '$type'>
+
+
export type $Type<Id extends string, Hash extends string> = Hash extends 'main'
+
? Id
+
: `${Id}#${Hash}`
+
+
function isObject<V>(v: V): v is V & object {
+
return v != null && typeof v === 'object'
+
}
+
+
function is$type<Id extends string, Hash extends string>(
+
$type: unknown,
+
id: Id,
+
hash: Hash,
+
): $type is $Type<Id, Hash> {
+
return hash === 'main'
+
? $type === id
+
: // $type === `${id}#${hash}`
+
typeof $type === 'string' &&
+
$type.length === id.length + 1 + hash.length &&
+
$type.charCodeAt(id.length) === 35 /* '#' */ &&
+
$type.startsWith(id) &&
+
$type.endsWith(hash)
+
}
+
+
export type $TypedObject<
+
V,
+
Id extends string,
+
Hash extends string,
+
> = V extends {
+
$type: $Type<Id, Hash>
+
}
+
? V
+
: V extends { $type?: string }
+
? V extends { $type?: infer T extends $Type<Id, Hash> }
+
? V & { $type: T }
+
: never
+
: V & { $type: $Type<Id, Hash> }
+
+
export function is$typed<V, Id extends string, Hash extends string>(
+
v: V,
+
id: Id,
+
hash: Hash,
+
): v is $TypedObject<V, Id, Hash> {
+
return isObject(v) && '$type' in v && is$type(v.$type, id, hash)
+
}
+
+
export function maybe$typed<V, Id extends string, Hash extends string>(
+
v: V,
+
id: Id,
+
hash: Hash,
+
): v is V & object & { $type?: $Type<Id, Hash> } {
+
return (
+
isObject(v) &&
+
('$type' in v ? v.$type === undefined || is$type(v.$type, id, hash) : true)
+
)
+
}
+
+
export type Validator<R = unknown> = (v: unknown) => ValidationResult<R>
+
export type ValidatorParam<V extends Validator> =
+
V extends Validator<infer R> ? R : never
+
+
/**
+
* Utility function that allows to convert a "validate*" utility function into a
+
* type predicate.
+
*/
+
export function asPredicate<V extends Validator>(validate: V) {
+
return function <T>(v: T): v is T & ValidatorParam<V> {
+
return validate(v).success
+
}
+
}
+10 -1
src/lib/wisp-utils.ts
···
import type { BlobRef } from "@atproto/api";
import type { Record, Directory, File, Entry } from "../lexicon/types/place/wisp/fs";
+
import { validateRecord } from "../lexicon/types/place/wisp/fs";
export interface UploadedFile {
name: string;
···
root: Directory,
fileCount: number
): Record {
-
return {
+
const manifest = {
$type: 'place.wisp.fs' as const,
site: siteName,
root,
fileCount,
createdAt: new Date().toISOString()
};
+
+
// Validate the manifest before returning
+
const validationResult = validateRecord(manifest);
+
if (!validationResult.success) {
+
throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
+
}
+
+
return manifest;
}
/**
+13
src/routes/wisp.ts
···
} from '../lib/wisp-utils'
import { upsertSite } from '../lib/db'
import { logger } from '../lib/logger'
+
import { validateRecord } from '../lexicon/types/place/wisp/fs'
function isValidSiteName(siteName: string): boolean {
if (!siteName || typeof siteName !== 'string') return false;
···
fileCount: 0,
createdAt: new Date().toISOString()
};
+
+
// Validate the manifest
+
const validationResult = validateRecord(emptyManifest);
+
if (!validationResult.success) {
+
throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
+
}
// Use site name as rkey
const rkey = siteName;
···
fileCount: 0,
createdAt: new Date().toISOString()
};
+
+
// Validate the manifest
+
const validationResult = validateRecord(emptyManifest);
+
if (!validationResult.success) {
+
throw new Error(`Invalid manifest: ${validationResult.error?.message || 'Validation failed'}`);
+
}
// Use site name as rkey
const rkey = siteName;