Scratch space for learning atproto app development

tweak linting

dholms 05fbd57d b186646a

+6
biome.json
···
{
"$schema": "https://biomejs.dev/schemas/1.8.3/schema.json",
+
"javascript": {
+
"formatter": {
+
"semicolons": "asNeeded",
+
"quoteStyle": "single"
+
}
+
},
"formatter": {
"indentStyle": "space",
"lineWidth": 120
+7 -7
src/config.ts
···
-
import type pino from "pino";
-
import type { Database } from "#/db";
-
import type { Ingester } from "#/firehose/ingester";
+
import type pino from 'pino'
+
import type { Database } from '#/db'
+
import type { Ingester } from '#/firehose/ingester'
export type AppContext = {
-
db: Database;
-
ingester: Ingester;
-
logger: pino.Logger;
-
};
+
db: Database
+
ingester: Ingester
+
logger: pino.Logger
+
}
+11 -11
src/db/index.ts
···
-
import SqliteDb from "better-sqlite3";
-
import { Kysely, Migrator, SqliteDialect } from "kysely";
-
import { migrationProvider } from "./migrations";
-
import type { DatabaseSchema } from "./schema";
+
import SqliteDb from 'better-sqlite3'
+
import { Kysely, Migrator, SqliteDialect } from 'kysely'
+
import { migrationProvider } from './migrations'
+
import type { DatabaseSchema } from './schema'
export const createDb = (location: string): Database => {
return new Kysely<DatabaseSchema>({
dialect: new SqliteDialect({
database: new SqliteDb(location),
}),
-
});
-
};
+
})
+
}
export const migrateToLatest = async (db: Database) => {
-
const migrator = new Migrator({ db, provider: migrationProvider });
-
const { error } = await migrator.migrateToLatest();
-
if (error) throw error;
-
};
+
const migrator = new Migrator({ db, provider: migrationProvider })
+
const { error } = await migrator.migrateToLatest()
+
if (error) throw error
+
}
-
export type Database = Kysely<DatabaseSchema>;
+
export type Database = Kysely<DatabaseSchema>
+12 -12
src/db/migrations.ts
···
-
import type { Kysely, Migration, MigrationProvider } from "kysely";
+
import type { Kysely, Migration, MigrationProvider } from 'kysely'
-
const migrations: Record<string, Migration> = {};
+
const migrations: Record<string, Migration> = {}
export const migrationProvider: MigrationProvider = {
async getMigrations() {
-
return migrations;
+
return migrations
},
-
};
+
}
-
migrations["001"] = {
+
migrations['001'] = {
async up(db: Kysely<unknown>) {
await db.schema
-
.createTable("post")
-
.addColumn("uri", "varchar", (col) => col.primaryKey())
-
.addColumn("text", "varchar", (col) => col.notNull())
-
.addColumn("indexedAt", "varchar", (col) => col.notNull())
-
.execute();
+
.createTable('post')
+
.addColumn('uri', 'varchar', (col) => col.primaryKey())
+
.addColumn('text', 'varchar', (col) => col.notNull())
+
.addColumn('indexedAt', 'varchar', (col) => col.notNull())
+
.execute()
},
async down(db: Kysely<unknown>) {
-
await db.schema.dropTable("post").execute();
+
await db.schema.dropTable('post').execute()
},
-
};
+
}
+6 -6
src/db/schema.ts
···
export type DatabaseSchema = {
-
post: Post;
-
};
+
post: Post
+
}
export type Post = {
-
uri: string;
-
text: string;
-
indexedAt: string;
-
};
+
uri: string
+
text: string
+
indexedAt: string
+
}
+7 -7
src/env.ts
···
-
import dotenv from "dotenv";
-
import { cleanEnv, host, num, port, str, testOnly } from "envalid";
+
import dotenv from 'dotenv'
+
import { cleanEnv, host, num, port, str, testOnly } from 'envalid'
-
dotenv.config();
+
dotenv.config()
export const env = cleanEnv(process.env, {
-
NODE_ENV: str({ devDefault: testOnly("test"), choices: ["development", "production", "test"] }),
-
HOST: host({ devDefault: testOnly("localhost") }),
+
NODE_ENV: str({ devDefault: testOnly('test'), choices: ['development', 'production', 'test'] }),
+
HOST: host({ devDefault: testOnly('localhost') }),
PORT: port({ devDefault: testOnly(3000) }),
-
CORS_ORIGIN: str({ devDefault: testOnly("http://localhost:3000") }),
+
CORS_ORIGIN: str({ devDefault: testOnly('http://localhost:3000') }),
COMMON_RATE_LIMIT_MAX_REQUESTS: num({ devDefault: testOnly(1000) }),
COMMON_RATE_LIMIT_WINDOW_MS: num({ devDefault: testOnly(1000) }),
-
});
+
})
+88 -94
src/firehose/firehose.ts
···
-
import type { RepoRecord } from "@atproto/lexicon";
-
import { cborToLexRecord, readCar } from "@atproto/repo";
-
import { AtUri } from "@atproto/syntax";
-
import { Subscription } from "@atproto/xrpc-server";
-
import type { CID } from "multiformats/cid";
+
import type { RepoRecord } from '@atproto/lexicon'
+
import { cborToLexRecord, readCar } from '@atproto/repo'
+
import { AtUri } from '@atproto/syntax'
+
import { Subscription } from '@atproto/xrpc-server'
+
import type { CID } from 'multiformats/cid'
import {
type Account,
type Commit,
···
isCommit,
isIdentity,
isValidRepoEvent,
-
} from "./lexicons";
+
} from './lexicons'
type Opts = {
-
service?: string;
-
getCursor?: () => Promise<number | undefined>;
-
setCursor?: (cursor: number) => Promise<void>;
-
subscriptionReconnectDelay?: number;
-
filterCollections?: string[];
-
excludeIdentity?: boolean;
-
excludeAccount?: boolean;
-
excludeCommit?: boolean;
-
};
+
service?: string
+
getCursor?: () => Promise<number | undefined>
+
setCursor?: (cursor: number) => Promise<void>
+
subscriptionReconnectDelay?: number
+
filterCollections?: string[]
+
excludeIdentity?: boolean
+
excludeAccount?: boolean
+
excludeCommit?: boolean
+
}
export class Firehose {
-
public sub: Subscription<RepoEvent>;
-
private abortController: AbortController;
+
public sub: Subscription<RepoEvent>
+
private abortController: AbortController
constructor(public opts: Opts) {
-
this.abortController = new AbortController();
+
this.abortController = new AbortController()
this.sub = new Subscription({
-
service: opts.service ?? "https://bsky.network",
-
method: "com.atproto.sync.subscribeRepos",
+
service: opts.service ?? 'https://bsky.network',
+
method: 'com.atproto.sync.subscribeRepos',
signal: this.abortController.signal,
getParams: async () => {
-
if (!opts.getCursor) return undefined;
-
const cursor = await opts.getCursor();
-
return { cursor };
+
if (!opts.getCursor) return undefined
+
const cursor = await opts.getCursor()
+
return { cursor }
},
validate: (value: unknown) => {
try {
-
return isValidRepoEvent(value);
+
return isValidRepoEvent(value)
} catch (err) {
-
console.error("repo subscription skipped invalid message", err);
+
console.error('repo subscription skipped invalid message', err)
}
},
-
});
+
})
}
async *run(): AsyncGenerator<Event> {
···
for await (const evt of this.sub) {
try {
if (isCommit(evt) && !this.opts.excludeCommit) {
-
const parsed = await parseCommit(evt);
+
const parsed = await parseCommit(evt)
for (const write of parsed) {
-
if (
-
!this.opts.filterCollections ||
-
this.opts.filterCollections.includes(write.uri.collection)
-
) {
-
yield write;
+
if (!this.opts.filterCollections || this.opts.filterCollections.includes(write.uri.collection)) {
+
yield write
}
}
} else if (isAccount(evt) && !this.opts.excludeAccount) {
-
const parsed = parseAccount(evt);
+
const parsed = parseAccount(evt)
if (parsed) {
-
yield parsed;
+
yield parsed
}
} else if (isIdentity(evt) && !this.opts.excludeIdentity) {
-
yield parseIdentity(evt);
+
yield parseIdentity(evt)
}
} catch (err) {
-
console.error("repo subscription could not handle message", err);
+
console.error('repo subscription could not handle message', err)
}
-
if (this.opts.setCursor && typeof evt.seq === "number") {
-
await this.opts.setCursor(evt.seq);
+
if (this.opts.setCursor && typeof evt.seq === 'number') {
+
await this.opts.setCursor(evt.seq)
}
}
} catch (err) {
-
console.error("repo subscription errored", err);
-
setTimeout(
-
() => this.run(),
-
this.opts.subscriptionReconnectDelay ?? 3000
-
);
+
console.error('repo subscription errored', err)
+
setTimeout(() => this.run(), this.opts.subscriptionReconnectDelay ?? 3000)
}
}
destroy() {
-
this.abortController.abort();
+
this.abortController.abort()
}
}
export const parseCommit = async (evt: Commit): Promise<CommitEvt[]> => {
-
const car = await readCar(evt.blocks);
+
const car = await readCar(evt.blocks)
-
const evts: CommitEvt[] = [];
+
const evts: CommitEvt[] = []
for (const op of evt.ops) {
-
const uri = new AtUri(`at://${evt.repo}/${op.path}`);
+
const uri = new AtUri(`at://${evt.repo}/${op.path}`)
const meta: CommitMeta = {
uri,
author: uri.host,
collection: uri.collection,
rkey: uri.rkey,
-
};
+
}
-
if (op.action === "create" || op.action === "update") {
-
if (!op.cid) continue;
-
const recordBytes = car.blocks.get(op.cid);
-
if (!recordBytes) continue;
-
const record = cborToLexRecord(recordBytes);
+
if (op.action === 'create' || op.action === 'update') {
+
if (!op.cid) continue
+
const recordBytes = car.blocks.get(op.cid)
+
if (!recordBytes) continue
+
const record = cborToLexRecord(recordBytes)
evts.push({
...meta,
-
event: op.action as "create" | "update",
+
event: op.action as 'create' | 'update',
cid: op.cid,
record,
-
});
+
})
}
-
if (op.action === "delete") {
+
if (op.action === 'delete') {
evts.push({
...meta,
-
event: "delete",
-
});
+
event: 'delete',
+
})
}
}
-
return evts;
-
};
+
return evts
+
}
export const parseIdentity = (evt: Identity): IdentityEvt => {
return {
-
event: "identity",
+
event: 'identity',
did: evt.did,
handle: evt.handle,
-
};
-
};
+
}
+
}
export const parseAccount = (evt: Account): AccountEvt | undefined => {
-
if (evt.status && !isValidStatus(evt.status)) return;
+
if (evt.status && !isValidStatus(evt.status)) return
return {
-
event: "account",
+
event: 'account',
did: evt.did,
active: evt.active,
status: evt.status as AccountStatus,
-
};
-
};
+
}
+
}
const isValidStatus = (str: string): str is AccountStatus => {
-
return ["takendown", "suspended", "deleted", "deactivated"].includes(str);
-
};
+
return ['takendown', 'suspended', 'deleted', 'deactivated'].includes(str)
+
}
-
type Event = CommitEvt | IdentityEvt | AccountEvt;
+
type Event = CommitEvt | IdentityEvt | AccountEvt
type CommitMeta = {
-
uri: AtUri;
-
author: string;
-
collection: string;
-
rkey: string;
-
};
+
uri: AtUri
+
author: string
+
collection: string
+
rkey: string
+
}
-
type CommitEvt = Create | Update | Delete;
+
type CommitEvt = Create | Update | Delete
type Create = CommitMeta & {
-
event: "create";
-
record: RepoRecord;
-
cid: CID;
-
};
+
event: 'create'
+
record: RepoRecord
+
cid: CID
+
}
type Update = CommitMeta & {
-
event: "update";
-
};
+
event: 'update'
+
}
type Delete = CommitMeta & {
-
event: "delete";
-
};
+
event: 'delete'
+
}
type IdentityEvt = {
-
event: "identity";
-
did: string;
-
handle?: string;
-
};
+
event: 'identity'
+
did: string
+
handle?: string
+
}
type AccountEvt = {
-
event: "account";
-
did: string;
-
active: boolean;
-
status?: AccountStatus;
-
};
+
event: 'account'
+
did: string
+
active: boolean
+
status?: AccountStatus
+
}
-
type AccountStatus = "takendown" | "suspended" | "deleted" | "deactivated";
+
type AccountStatus = 'takendown' | 'suspended' | 'deleted' | 'deactivated'
+10 -10
src/firehose/ingester.ts
···
-
import type { Database } from "#/db";
-
import { Firehose } from "#/firehose/firehose";
+
import type { Database } from '#/db'
+
import { Firehose } from '#/firehose/firehose'
export class Ingester {
-
firehose: Firehose | undefined;
+
firehose: Firehose | undefined
constructor(public db: Database) {}
async start() {
-
const firehose = new Firehose({});
+
const firehose = new Firehose({})
for await (const evt of firehose.run()) {
-
if (evt.event === "create") {
-
if (evt.collection !== "app.bsky.feed.post") continue;
-
const post: any = evt.record; // @TODO fix types
+
if (evt.event === 'create') {
+
if (evt.collection !== 'app.bsky.feed.post') continue
+
const post: any = evt.record // @TODO fix types
await this.db
-
.insertInto("post")
+
.insertInto('post')
.values({
uri: evt.uri.toString(),
text: post.text as string,
indexedAt: new Date().toISOString(),
})
-
.execute();
+
.execute()
}
}
}
destroy() {
-
this.firehose?.destroy();
+
this.firehose?.destroy()
}
}
+148 -205
src/firehose/lexicons.ts
···
-
import type { IncomingMessage } from "node:http";
+
import type { IncomingMessage } from 'node:http'
-
import { type LexiconDoc, Lexicons } from "@atproto/lexicon";
-
import type { ErrorFrame, HandlerAuth } from "@atproto/xrpc-server";
-
import type { CID } from "multiformats/cid";
+
import { type LexiconDoc, Lexicons } from '@atproto/lexicon'
+
import type { ErrorFrame, HandlerAuth } from '@atproto/xrpc-server'
+
import type { CID } from 'multiformats/cid'
// @NOTE: this file is an ugly copy job of codegen output. I'd like to clean this whole thing up
export function isObj(v: unknown): v is Record<string, unknown> {
-
return typeof v === "object" && v !== null;
+
return typeof v === 'object' && v !== null
}
-
export function hasProp<K extends PropertyKey>(
-
data: object,
-
prop: K
-
): data is Record<K, unknown> {
-
return prop in data;
+
export function hasProp<K extends PropertyKey>(data: object, prop: K): data is Record<K, unknown> {
+
return prop in data
}
export interface QueryParams {
/** The last known event seq number to backfill from. */
-
cursor?: number;
+
cursor?: number
}
export type RepoEvent =
···
| Migrate
| Tombstone
| Info
-
| { $type: string; [k: string]: unknown };
-
export type HandlerError = ErrorFrame<"FutureCursor" | "ConsumerTooSlow">;
-
export type HandlerOutput = HandlerError | RepoEvent;
+
| { $type: string; [k: string]: unknown }
+
export type HandlerError = ErrorFrame<'FutureCursor' | 'ConsumerTooSlow'>
+
export type HandlerOutput = HandlerError | RepoEvent
export type HandlerReqCtx<HA extends HandlerAuth = never> = {
-
auth: HA;
-
params: QueryParams;
-
req: IncomingMessage;
-
signal: AbortSignal;
-
};
-
export type Handler<HA extends HandlerAuth = never> = (
-
ctx: HandlerReqCtx<HA>
-
) => AsyncIterable<HandlerOutput>;
+
auth: HA
+
params: QueryParams
+
req: IncomingMessage
+
signal: AbortSignal
+
}
+
export type Handler<HA extends HandlerAuth = never> = (ctx: HandlerReqCtx<HA>) => AsyncIterable<HandlerOutput>
/** Represents an update of repository state. Note that empty commits are allowed, which include no repo data changes, but an update to rev and signature. */
export interface Commit {
/** The stream sequence number of this message. */
-
seq: number;
+
seq: number
/** DEPRECATED -- unused */
-
rebase: boolean;
+
rebase: boolean
/** Indicates that this commit contained too many ops, or data size was too large. Consumers will need to make a separate request to get missing data. */
-
tooBig: boolean;
+
tooBig: boolean
/** The repo this event comes from. */
-
repo: string;
+
repo: string
/** Repo commit object CID. */
-
commit: CID;
+
commit: CID
/** DEPRECATED -- unused. WARNING -- nullable and optional; stick with optional to ensure golang interoperability. */
-
prev?: CID | null;
+
prev?: CID | null
/** The rev of the emitted commit. Note that this information is also in the commit object included in blocks, unless this is a tooBig event. */
-
rev: string;
+
rev: string
/** The rev of the last emitted commit from this repo (if any). */
-
since: string | null;
+
since: string | null
/** CAR file containing relevant blocks, as a diff since the previous repo state. */
-
blocks: Uint8Array;
-
ops: RepoOp[];
-
blobs: CID[];
+
blocks: Uint8Array
+
ops: RepoOp[]
+
blobs: CID[]
/** Timestamp of when this message was originally broadcast. */
-
time: string;
-
[k: string]: unknown;
+
time: string
+
[k: string]: unknown
}
export function isCommit(v: unknown): v is Commit {
-
return (
-
isObj(v) &&
-
hasProp(v, "$type") &&
-
v.$type === "com.atproto.sync.subscribeRepos#commit"
-
);
+
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#commit'
}
/** Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache. */
export interface Identity {
-
seq: number;
-
did: string;
-
time: string;
+
seq: number
+
did: string
+
time: string
/** The current handle for the account, or 'handle.invalid' if validation fails. This field is optional, might have been validated or passed-through from an upstream source. Semantics and behaviors for PDS vs Relay may evolve in the future; see atproto specs for more details. */
-
handle?: string;
-
[k: string]: unknown;
+
handle?: string
+
[k: string]: unknown
}
export function isIdentity(v: unknown): v is Identity {
-
return (
-
isObj(v) &&
-
hasProp(v, "$type") &&
-
v.$type === "com.atproto.sync.subscribeRepos#identity"
-
);
+
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#identity'
}
/** Represents a change to an account's status on a host (eg, PDS or Relay). The semantics of this event are that the status is at the host which emitted the event, not necessarily that at the currently active PDS. Eg, a Relay takedown would emit a takedown with active=false, even if the PDS is still active. */
export interface Account {
-
seq: number;
-
did: string;
-
time: string;
+
seq: number
+
did: string
+
time: string
/** Indicates that the account has a repository which can be fetched from the host that emitted this event. */
-
active: boolean;
+
active: boolean
/** If active=false, this optional field indicates a reason for why the account is not active. */
-
status?:
-
| "takendown"
-
| "suspended"
-
| "deleted"
-
| "deactivated"
-
| (string & {});
-
[k: string]: unknown;
+
status?: 'takendown' | 'suspended' | 'deleted' | 'deactivated' | (string & {})
+
[k: string]: unknown
}
export function isAccount(v: unknown): v is Account {
-
return (
-
isObj(v) &&
-
hasProp(v, "$type") &&
-
v.$type === "com.atproto.sync.subscribeRepos#account"
-
);
+
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#account'
}
/** DEPRECATED -- Use #identity event instead */
export interface Handle {
-
seq: number;
-
did: string;
-
handle: string;
-
time: string;
-
[k: string]: unknown;
+
seq: number
+
did: string
+
handle: string
+
time: string
+
[k: string]: unknown
}
export function isHandle(v: unknown): v is Handle {
-
return (
-
isObj(v) &&
-
hasProp(v, "$type") &&
-
v.$type === "com.atproto.sync.subscribeRepos#handle"
-
);
+
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#handle'
}
/** DEPRECATED -- Use #account event instead */
export interface Migrate {
-
seq: number;
-
did: string;
-
migrateTo: string | null;
-
time: string;
-
[k: string]: unknown;
+
seq: number
+
did: string
+
migrateTo: string | null
+
time: string
+
[k: string]: unknown
}
export function isMigrate(v: unknown): v is Migrate {
-
return (
-
isObj(v) &&
-
hasProp(v, "$type") &&
-
v.$type === "com.atproto.sync.subscribeRepos#migrate"
-
);
+
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#migrate'
}
/** DEPRECATED -- Use #account event instead */
export interface Tombstone {
-
seq: number;
-
did: string;
-
time: string;
-
[k: string]: unknown;
+
seq: number
+
did: string
+
time: string
+
[k: string]: unknown
}
export function isTombstone(v: unknown): v is Tombstone {
-
return (
-
isObj(v) &&
-
hasProp(v, "$type") &&
-
v.$type === "com.atproto.sync.subscribeRepos#tombstone"
-
);
+
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#tombstone'
}
export interface Info {
-
name: "OutdatedCursor" | (string & {});
-
message?: string;
-
[k: string]: unknown;
+
name: 'OutdatedCursor' | (string & {})
+
message?: string
+
[k: string]: unknown
}
export function isInfo(v: unknown): v is Info {
-
return (
-
isObj(v) &&
-
hasProp(v, "$type") &&
-
v.$type === "com.atproto.sync.subscribeRepos#info"
-
);
+
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#info'
}
/** A repo operation, ie a mutation of a single record. */
export interface RepoOp {
-
action: "create" | "update" | "delete" | (string & {});
-
path: string;
+
action: 'create' | 'update' | 'delete' | (string & {})
+
path: string
/** For creates and updates, the new record CID. For deletions, null. */
-
cid: CID | null;
-
[k: string]: unknown;
+
cid: CID | null
+
[k: string]: unknown
}
export function isRepoOp(v: unknown): v is RepoOp {
-
return (
-
isObj(v) &&
-
hasProp(v, "$type") &&
-
v.$type === "com.atproto.sync.subscribeRepos#repoOp"
-
);
+
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#repoOp'
}
export const ComAtprotoSyncSubscribeRepos: LexiconDoc = {
lexicon: 1,
-
id: "com.atproto.sync.subscribeRepos",
+
id: 'com.atproto.sync.subscribeRepos',
defs: {
main: {
-
type: "subscription",
-
description: "Subscribe to repo updates",
+
type: 'subscription',
+
description: 'Subscribe to repo updates',
parameters: {
-
type: "params",
+
type: 'params',
properties: {
cursor: {
-
type: "integer",
-
description: "The last known event to backfill from.",
+
type: 'integer',
+
description: 'The last known event to backfill from.',
},
},
},
message: {
schema: {
-
type: "union",
+
type: 'union',
refs: [
-
"lex:com.atproto.sync.subscribeRepos#commit",
-
"lex:com.atproto.sync.subscribeRepos#handle",
-
"lex:com.atproto.sync.subscribeRepos#migrate",
-
"lex:com.atproto.sync.subscribeRepos#tombstone",
-
"lex:com.atproto.sync.subscribeRepos#info",
+
'lex:com.atproto.sync.subscribeRepos#commit',
+
'lex:com.atproto.sync.subscribeRepos#handle',
+
'lex:com.atproto.sync.subscribeRepos#migrate',
+
'lex:com.atproto.sync.subscribeRepos#tombstone',
+
'lex:com.atproto.sync.subscribeRepos#info',
],
},
},
errors: [
{
-
name: "FutureCursor",
+
name: 'FutureCursor',
},
{
-
name: "ConsumerTooSlow",
+
name: 'ConsumerTooSlow',
},
],
},
commit: {
-
type: "object",
-
required: [
-
"seq",
-
"rebase",
-
"tooBig",
-
"repo",
-
"commit",
-
"rev",
-
"since",
-
"blocks",
-
"ops",
-
"blobs",
-
"time",
-
],
-
nullable: ["prev", "since"],
+
type: 'object',
+
required: ['seq', 'rebase', 'tooBig', 'repo', 'commit', 'rev', 'since', 'blocks', 'ops', 'blobs', 'time'],
+
nullable: ['prev', 'since'],
properties: {
seq: {
-
type: "integer",
+
type: 'integer',
},
rebase: {
-
type: "boolean",
+
type: 'boolean',
},
tooBig: {
-
type: "boolean",
+
type: 'boolean',
},
repo: {
-
type: "string",
-
format: "did",
+
type: 'string',
+
format: 'did',
},
commit: {
-
type: "cid-link",
+
type: 'cid-link',
},
prev: {
-
type: "cid-link",
+
type: 'cid-link',
},
rev: {
-
type: "string",
-
description: "The rev of the emitted commit",
+
type: 'string',
+
description: 'The rev of the emitted commit',
},
since: {
-
type: "string",
-
description: "The rev of the last emitted commit from this repo",
+
type: 'string',
+
description: 'The rev of the last emitted commit from this repo',
},
blocks: {
-
type: "bytes",
-
description: "CAR file containing relevant blocks",
+
type: 'bytes',
+
description: 'CAR file containing relevant blocks',
maxLength: 1000000,
},
ops: {
-
type: "array",
+
type: 'array',
items: {
-
type: "ref",
-
ref: "lex:com.atproto.sync.subscribeRepos#repoOp",
+
type: 'ref',
+
ref: 'lex:com.atproto.sync.subscribeRepos#repoOp',
},
maxLength: 200,
},
blobs: {
-
type: "array",
+
type: 'array',
items: {
-
type: "cid-link",
+
type: 'cid-link',
},
},
time: {
-
type: "string",
-
format: "datetime",
+
type: 'string',
+
format: 'datetime',
},
},
},
handle: {
-
type: "object",
-
required: ["seq", "did", "handle", "time"],
+
type: 'object',
+
required: ['seq', 'did', 'handle', 'time'],
properties: {
seq: {
-
type: "integer",
+
type: 'integer',
},
did: {
-
type: "string",
-
format: "did",
+
type: 'string',
+
format: 'did',
},
handle: {
-
type: "string",
-
format: "handle",
+
type: 'string',
+
format: 'handle',
},
time: {
-
type: "string",
-
format: "datetime",
+
type: 'string',
+
format: 'datetime',
},
},
},
migrate: {
-
type: "object",
-
required: ["seq", "did", "migrateTo", "time"],
-
nullable: ["migrateTo"],
+
type: 'object',
+
required: ['seq', 'did', 'migrateTo', 'time'],
+
nullable: ['migrateTo'],
properties: {
seq: {
-
type: "integer",
+
type: 'integer',
},
did: {
-
type: "string",
-
format: "did",
+
type: 'string',
+
format: 'did',
},
migrateTo: {
-
type: "string",
+
type: 'string',
},
time: {
-
type: "string",
-
format: "datetime",
+
type: 'string',
+
format: 'datetime',
},
},
},
tombstone: {
-
type: "object",
-
required: ["seq", "did", "time"],
+
type: 'object',
+
required: ['seq', 'did', 'time'],
properties: {
seq: {
-
type: "integer",
+
type: 'integer',
},
did: {
-
type: "string",
-
format: "did",
+
type: 'string',
+
format: 'did',
},
time: {
-
type: "string",
-
format: "datetime",
+
type: 'string',
+
format: 'datetime',
},
},
},
info: {
-
type: "object",
-
required: ["name"],
+
type: 'object',
+
required: ['name'],
properties: {
name: {
-
type: "string",
-
knownValues: ["OutdatedCursor"],
+
type: 'string',
+
knownValues: ['OutdatedCursor'],
},
message: {
-
type: "string",
+
type: 'string',
},
},
},
repoOp: {
-
type: "object",
+
type: 'object',
description:
"A repo operation, ie a write of a single record. For creates and updates, cid is the record's CID as of this operation. For deletes, it's null.",
-
required: ["action", "path", "cid"],
-
nullable: ["cid"],
+
required: ['action', 'path', 'cid'],
+
nullable: ['cid'],
properties: {
action: {
-
type: "string",
-
knownValues: ["create", "update", "delete"],
+
type: 'string',
+
knownValues: ['create', 'update', 'delete'],
},
path: {
-
type: "string",
+
type: 'string',
},
cid: {
-
type: "cid-link",
+
type: 'cid-link',
},
},
},
},
-
};
+
}
-
const lexicons = new Lexicons([ComAtprotoSyncSubscribeRepos]);
+
const lexicons = new Lexicons([ComAtprotoSyncSubscribeRepos])
export const isValidRepoEvent = (evt: unknown) => {
-
return lexicons.assertValidXrpcMessage<RepoEvent>(
-
"com.atproto.sync.subscribeRepos",
-
evt
-
);
-
};
+
return lexicons.assertValidXrpcMessage<RepoEvent>('com.atproto.sync.subscribeRepos', evt)
+
}
+10 -10
src/index.ts
···
-
import { Server } from "#/server";
+
import { Server } from '#/server'
const run = async () => {
-
const server = await Server.create();
+
const server = await Server.create()
const onCloseSignal = async () => {
-
setTimeout(() => process.exit(1), 10000).unref(); // Force shutdown after 10s
-
await server.close();
-
process.exit();
-
};
+
setTimeout(() => process.exit(1), 10000).unref() // Force shutdown after 10s
+
await server.close()
+
process.exit()
+
}
-
process.on("SIGINT", onCloseSignal);
-
process.on("SIGTERM", onCloseSignal);
-
};
+
process.on('SIGINT', onCloseSignal)
+
process.on('SIGTERM', onCloseSignal)
+
}
-
run();
+
run()
+8 -8
src/middleware/errorHandler.ts
···
-
import type { ErrorRequestHandler, RequestHandler } from "express";
-
import { StatusCodes } from "http-status-codes";
+
import type { ErrorRequestHandler, RequestHandler } from 'express'
+
import { StatusCodes } from 'http-status-codes'
const unexpectedRequest: RequestHandler = (_req, res) => {
-
res.sendStatus(StatusCodes.NOT_FOUND);
-
};
+
res.sendStatus(StatusCodes.NOT_FOUND)
+
}
const addErrorToRequestLog: ErrorRequestHandler = (err, _req, res, next) => {
-
res.locals.err = err;
-
next(err);
-
};
+
res.locals.err = err
+
next(err)
+
}
-
export default () => [unexpectedRequest, addErrorToRequestLog];
+
export default () => [unexpectedRequest, addErrorToRequestLog]
+52 -52
src/middleware/requestLogger.ts
···
-
import { randomUUID } from "node:crypto";
-
import type { IncomingMessage, ServerResponse } from "node:http";
-
import type { Request, RequestHandler, Response } from "express";
-
import { StatusCodes, getReasonPhrase } from "http-status-codes";
-
import type { LevelWithSilent } from "pino";
-
import { type CustomAttributeKeys, type Options, pinoHttp } from "pino-http";
+
import { randomUUID } from 'node:crypto'
+
import type { IncomingMessage, ServerResponse } from 'node:http'
+
import type { Request, RequestHandler, Response } from 'express'
+
import { StatusCodes, getReasonPhrase } from 'http-status-codes'
+
import type { LevelWithSilent } from 'pino'
+
import { type CustomAttributeKeys, type Options, pinoHttp } from 'pino-http'
-
import { env } from "#/common/utils/envConfig";
+
import { env } from '#/env'
enum LogLevel {
-
Fatal = "fatal",
-
Error = "error",
-
Warn = "warn",
-
Info = "info",
-
Debug = "debug",
-
Trace = "trace",
-
Silent = "silent",
+
Fatal = 'fatal',
+
Error = 'error',
+
Warn = 'warn',
+
Info = 'info',
+
Debug = 'debug',
+
Trace = 'trace',
+
Silent = 'silent',
}
type PinoCustomProps = {
-
request: Request;
-
response: Response;
-
error: Error;
-
responseBody: unknown;
-
};
+
request: Request
+
response: Response
+
error: Error
+
responseBody: unknown
+
}
const requestLogger = (options?: Options): RequestHandler[] => {
const pinoOptions: Options = {
enabled: env.isProduction,
-
customProps: customProps as unknown as Options["customProps"],
+
customProps: customProps as unknown as Options['customProps'],
redact: [],
genReqId,
customLogLevel,
···
customErrorMessage: (_req, res) => `request errored with status code: ${res.statusCode}`,
customAttributeKeys,
...options,
-
};
-
return [responseBodyMiddleware, pinoHttp(pinoOptions)];
-
};
+
}
+
return [responseBodyMiddleware, pinoHttp(pinoOptions)]
+
}
const customAttributeKeys: CustomAttributeKeys = {
-
req: "request",
-
res: "response",
-
err: "error",
-
responseTime: "timeTaken",
-
};
+
req: 'request',
+
res: 'response',
+
err: 'error',
+
responseTime: 'timeTaken',
+
}
const customProps = (req: Request, res: Response): PinoCustomProps => ({
request: req,
response: res,
error: res.locals.err,
responseBody: res.locals.responseBody,
-
});
+
})
const responseBodyMiddleware: RequestHandler = (_req, res, next) => {
-
const isNotProduction = !env.isProduction;
+
const isNotProduction = !env.isProduction
if (isNotProduction) {
-
const originalSend = res.send;
+
const originalSend = res.send
res.send = (content) => {
-
res.locals.responseBody = content;
-
res.send = originalSend;
-
return originalSend.call(res, content);
-
};
+
res.locals.responseBody = content
+
res.send = originalSend
+
return originalSend.call(res, content)
+
}
}
-
next();
-
};
+
next()
+
}
const customLogLevel = (_req: IncomingMessage, res: ServerResponse<IncomingMessage>, err?: Error): LevelWithSilent => {
-
if (err || res.statusCode >= StatusCodes.INTERNAL_SERVER_ERROR) return LogLevel.Error;
-
if (res.statusCode >= StatusCodes.BAD_REQUEST) return LogLevel.Warn;
-
if (res.statusCode >= StatusCodes.MULTIPLE_CHOICES) return LogLevel.Silent;
-
return LogLevel.Info;
-
};
+
if (err || res.statusCode >= StatusCodes.INTERNAL_SERVER_ERROR) return LogLevel.Error
+
if (res.statusCode >= StatusCodes.BAD_REQUEST) return LogLevel.Warn
+
if (res.statusCode >= StatusCodes.MULTIPLE_CHOICES) return LogLevel.Silent
+
return LogLevel.Info
+
}
const customSuccessMessage = (req: IncomingMessage, res: ServerResponse<IncomingMessage>) => {
-
if (res.statusCode === StatusCodes.NOT_FOUND) return getReasonPhrase(StatusCodes.NOT_FOUND);
-
return `${req.method} completed`;
-
};
+
if (res.statusCode === StatusCodes.NOT_FOUND) return getReasonPhrase(StatusCodes.NOT_FOUND)
+
return `${req.method} completed`
+
}
const genReqId = (req: IncomingMessage, res: ServerResponse<IncomingMessage>) => {
-
const existingID = req.id ?? req.headers["x-request-id"];
-
if (existingID) return existingID;
-
const id = randomUUID();
-
res.setHeader("X-Request-Id", id);
-
return id;
-
};
+
const existingID = req.id ?? req.headers['x-request-id']
+
if (existingID) return existingID
+
const id = randomUUID()
+
res.setHeader('X-Request-Id', id)
+
return id
+
}
-
export default requestLogger();
+
export default requestLogger()
+12 -17
src/routes/index.ts
···
-
import express from "express";
-
import type { AppContext } from "#/config";
-
import { handler } from "./util";
+
import express from 'express'
+
import type { AppContext } from '#/config'
+
import { handler } from './util'
export const createRouter = (ctx: AppContext) => {
-
const router = express.Router();
+
const router = express.Router()
router.get(
-
"/",
+
'/',
handler(async (req, res) => {
-
const posts = await ctx.db
-
.selectFrom("post")
-
.selectAll()
-
.orderBy("indexedAt", "desc")
-
.limit(10)
-
.execute();
-
const postTexts = posts.map((row) => row.text);
-
res.json(postTexts);
-
})
-
);
+
const posts = await ctx.db.selectFrom('post').selectAll().orderBy('indexedAt', 'desc').limit(10).execute()
+
const postTexts = posts.map((row) => row.text)
+
res.json(postTexts)
+
}),
+
)
-
return router;
-
};
+
return router
+
}
+5 -10
src/routes/util.ts
···
-
import type express from "express";
+
import type express from 'express'
export const handler =
-
(fn: express.Handler) =>
-
async (
-
req: express.Request,
-
res: express.Response,
-
next: express.NextFunction
-
) => {
+
(fn: express.Handler) => async (req: express.Request, res: express.Response, next: express.NextFunction) => {
try {
-
await fn(req, res, next);
+
await fn(req, res, next)
} catch (err) {
-
next(err);
+
next(err)
}
-
};
+
}
+41 -41
src/server.ts
···
-
import events from "node:events";
-
import type http from "node:http";
-
import cors from "cors";
-
import express, { type Express } from "express";
-
import helmet from "helmet";
-
import { pino } from "pino";
+
import events from 'node:events'
+
import type http from 'node:http'
+
import cors from 'cors'
+
import express, { type Express } from 'express'
+
import helmet from 'helmet'
+
import { pino } from 'pino'
-
import { createDb, migrateToLatest } from "#/db";
-
import { env } from "#/env";
-
import { Ingester } from "#/firehose/ingester";
-
import errorHandler from "#/middleware/errorHandler";
-
import requestLogger from "#/middleware/requestLogger";
-
import { createRouter } from "#/routes";
-
import type { AppContext } from "./config";
+
import { createDb, migrateToLatest } from '#/db'
+
import { env } from '#/env'
+
import { Ingester } from '#/firehose/ingester'
+
import errorHandler from '#/middleware/errorHandler'
+
import requestLogger from '#/middleware/requestLogger'
+
import { createRouter } from '#/routes'
+
import type { AppContext } from './config'
export class Server {
constructor(
public app: express.Application,
public server: http.Server,
-
public ctx: AppContext
+
public ctx: AppContext,
) {}
static async create() {
-
const { NODE_ENV, HOST, PORT } = env;
+
const { NODE_ENV, HOST, PORT } = env
-
const logger = pino({ name: "server start" });
-
const db = createDb(":memory:");
-
await migrateToLatest(db);
-
const ingester = new Ingester(db);
-
ingester.start();
+
const logger = pino({ name: 'server start' })
+
const db = createDb(':memory:')
+
await migrateToLatest(db)
+
const ingester = new Ingester(db)
+
ingester.start()
const ctx = {
db,
ingester,
logger,
-
};
+
}
-
const app: Express = express();
+
const app: Express = express()
// Set the application to trust the reverse proxy
-
app.set("trust proxy", true);
+
app.set('trust proxy', true)
// TODO: middleware for sqlite server
// TODO: middleware for OAuth
// Middlewares
-
app.use(express.json());
-
app.use(express.urlencoded({ extended: true }));
-
app.use(cors({ origin: env.CORS_ORIGIN, credentials: true }));
-
app.use(helmet());
+
app.use(express.json())
+
app.use(express.urlencoded({ extended: true }))
+
app.use(cors({ origin: env.CORS_ORIGIN, credentials: true }))
+
app.use(helmet())
// Request logging
-
app.use(requestLogger);
+
app.use(requestLogger)
// Routes
-
const router = createRouter(ctx);
-
app.use(router);
+
const router = createRouter(ctx)
+
app.use(router)
// Error handlers
-
app.use(errorHandler());
+
app.use(errorHandler())
-
const server = app.listen(env.PORT);
-
await events.once(server, "listening");
-
logger.info(`Server (${NODE_ENV}) running on port http://${HOST}:${PORT}`);
+
const server = app.listen(env.PORT)
+
await events.once(server, 'listening')
+
logger.info(`Server (${NODE_ENV}) running on port http://${HOST}:${PORT}`)
-
return new Server(app, server, ctx);
+
return new Server(app, server, ctx)
}
async close() {
-
this.ctx.logger.info("sigint received, shutting down");
-
this.ctx.ingester.destroy();
+
this.ctx.logger.info('sigint received, shutting down')
+
this.ctx.ingester.destroy()
return new Promise<void>((resolve) => {
this.server.close(() => {
-
this.ctx.logger.info("server closed");
-
resolve();
-
});
-
});
+
this.ctx.logger.info('server closed')
+
resolve()
+
})
+
})
}
}