Scratch space for learning atproto app development

Review comments

Changed files
+31 -62
src
+5
src/context.ts
···
logger: pino.Logger
oauthClient: NodeOAuthClient
resolver: BidirectionalResolver
}
export async function createAppContext(): Promise<AppContext> {
···
logger,
oauthClient,
resolver,
}
}
···
logger: pino.Logger
oauthClient: NodeOAuthClient
resolver: BidirectionalResolver
+
destroy: () => Promise<void>
}
export async function createAppContext(): Promise<AppContext> {
···
logger,
oauthClient,
resolver,
+
async destroy() {
+
await ingester.destroy()
+
await db.destroy()
+
},
}
}
+2 -3
src/index.ts
···
-
import { createHttpTerminator } from 'http-terminator'
import { once } from 'node:events'
import { createAppContext } from '#/context'
import { env } from '#/env'
import { run } from '#/lib/process'
import { createRouter } from '#/routes'
-
import { startServer } from '#/lib/http'
run(async (killSignal) => {
// Create the application context
···
await terminate()
// Close the firehose connection
-
await ctx.ingester.destroy()
})
···
import { once } from 'node:events'
import { createAppContext } from '#/context'
import { env } from '#/env'
+
import { startServer } from '#/lib/http'
import { run } from '#/lib/process'
import { createRouter } from '#/routes'
run(async (killSignal) => {
// Create the application context
···
await terminate()
// Close the firehose connection
+
await ctx.destroy()
})
+7 -17
src/lib/http.ts
···
export type NextFunction = (err?: unknown) => void
-
export type Handler<
Req extends IncomingMessage = IncomingMessage,
Res extends ServerResponse<Req> = ServerResponse<Req>,
> = (req: Req, res: Res, next: NextFunction) => void
-
export type AsyncHandler<
Req extends IncomingMessage = IncomingMessage,
Res extends ServerResponse<Req> = ServerResponse<Req>,
-
> = (req: Req, res: Res, next: NextFunction) => Promise<void>
-
/**
* Wraps a request handler middleware to ensure that `next` is called if it
* throws or returns a promise that rejects.
···
export function handler<
Req extends IncomingMessage = IncomingMessage,
Res extends ServerResponse<Req> = ServerResponse<Req>,
-
>(fn: Handler<Req, Res> | AsyncHandler<Req, Res>): Handler<Req, Res> {
-
return (req, res, next) => {
-
// Optimization: NodeJS prefers objects over functions for garbage collection
-
const nextSafe = nextOnce.bind({ next })
try {
-
const result = fn(req, res, nextSafe)
-
if (result instanceof Promise) result.catch(nextSafe)
} catch (err) {
-
nextSafe(err)
}
-
}
-
-
function nextOnce(this: { next: NextFunction | null }, err?: unknown) {
-
const { next } = this
-
this.next = null
-
next?.(err)
}
}
···
export type NextFunction = (err?: unknown) => void
+
export type Middleware<
Req extends IncomingMessage = IncomingMessage,
Res extends ServerResponse<Req> = ServerResponse<Req>,
> = (req: Req, res: Res, next: NextFunction) => void
+
export type Handler<
Req extends IncomingMessage = IncomingMessage,
Res extends ServerResponse<Req> = ServerResponse<Req>,
+
> = (req: Req, res: Res) => unknown | Promise<unknown>
/**
* Wraps a request handler middleware to ensure that `next` is called if it
* throws or returns a promise that rejects.
···
export function handler<
Req extends IncomingMessage = IncomingMessage,
Res extends ServerResponse<Req> = ServerResponse<Req>,
+
>(fn: Handler<Req, Res>): Middleware<Req, Res> {
+
return async (req, res, next) => {
try {
+
await fn(req, res)
} catch (err) {
+
next(err)
}
}
}
+3 -3
src/lib/process.ts
···
* Runs a function with an abort signal that will be triggered when the process
* receives a termination signal.
*/
-
export async function run<F extends (signal: AbortSignal) => unknown>(
fn: F,
-
): Promise<Awaited<ReturnType<F>>> {
const killController = new AbortController()
const abort = (signal?: string) => {
···
for (const sig of SIGNALS) process.on(sig, abort)
try {
-
return (await fn(killController.signal)) as Awaited<ReturnType<F>>
} finally {
abort()
}
···
* Runs a function with an abort signal that will be triggered when the process
* receives a termination signal.
*/
+
export async function run<F extends (signal: AbortSignal) => Promise<void>>(
fn: F,
+
): Promise<void> {
const killController = new AbortController()
const abort = (signal?: string) => {
···
for (const sig of SIGNALS) process.on(sig, abort)
try {
+
await fn(killController.signal)
} finally {
abort()
}
+14 -39
src/routes.ts
···
import { OAuthResolverError } from '@atproto/oauth-client-node'
import express, { Request, Response } from 'express'
import { getIronSession } from 'iron-session'
-
import assert from 'node:assert'
import type {
IncomingMessage,
RequestListener,
···
import path from 'node:path'
import type { AppContext } from '#/context'
import * as Profile from '#/lexicon/types/app/bsky/actor/profile'
import * as Status from '#/lexicon/types/xyz/statusphere/status'
-
import { env } from '#/env'
import { handler } from '#/lib/http'
import { page } from '#/lib/view'
import { home, STATUS_OPTIONS } from '#/pages/home'
import { login } from '#/pages/login'
-
import { ifString } from '#/lib/util'
type Session = { did?: string }
···
})
if (!session.did) return null
try {
-
// force rotating the credentials if the request has a no-cache header
-
const refresh = req.headers['cache-control']?.includes('no-cache') || 'auto'
-
-
const oauthSession = await ctx.oauthClient.restore(session.did, refresh)
return oauthSession ? new Agent(oauthSession) : null
} catch (err) {
ctx.logger.warn({ err }, 'oauth restore failed')
···
router.get(
'/login',
handler(async (req: Request, res: Response) => {
-
res.type('html').send(page(login({})))
}),
)
// Login handler
router.post(
'/login',
-
express.urlencoded({ extended: true }),
handler(async (req: Request, res: Response) => {
const input = ifString(req.body.input)
// Validate
if (!input) {
-
res.type('html').send(page(login({ error: 'invalid input' })))
-
return
-
}
-
-
// @NOTE input can be a handle, a DID or a service URL (PDS).
-
-
// Initiate the OAuth flow
-
try {
-
const url = await ctx.oauthClient.authorize(input, {
-
scope: 'atproto transition:generic',
-
})
-
res.redirect(url.toString())
-
} catch (err) {
-
ctx.logger.error({ err }, 'oauth authorize failed')
-
-
const error =
-
err instanceof OAuthResolverError
-
? err.message
-
: "couldn't initiate login"
-
-
res.type('html').send(page(login({ error })))
}
}),
)
···
if (!agent) {
// Serve the logged-out view
-
res.type('html').send(page(home({ statuses, didHandleMap })))
-
return
}
// Fetch additional information about the logged-in user
···
// "Set status" handler
router.post(
'/status',
-
express.urlencoded({ extended: true }),
handler(async (req: Request, res: Response) => {
// If the user is signed in, get an agent which communicates with their server
const agent = await getSessionAgent(req, res, ctx)
if (!agent) {
-
res.redirect(`/login`)
-
return
}
const status = ifString(req.body?.status)
···
}
if (!Status.validateRecord(record).success) {
-
res.status(400).type('html').send('<h1>Error: Invalid status</h1>')
-
return
}
// Write the status record to the user's repository
···
uri = res.data.uri
} catch (err) {
ctx.logger.warn({ err }, 'failed to write record')
-
res
.status(500)
.type('html')
.send('<h1>Error: Failed to write record</h1>')
-
return
}
try {
···
import { OAuthResolverError } from '@atproto/oauth-client-node'
import express, { Request, Response } from 'express'
import { getIronSession } from 'iron-session'
import type {
IncomingMessage,
RequestListener,
···
import path from 'node:path'
import type { AppContext } from '#/context'
+
import { env } from '#/env'
import * as Profile from '#/lexicon/types/app/bsky/actor/profile'
import * as Status from '#/lexicon/types/xyz/statusphere/status'
import { handler } from '#/lib/http'
+
import { ifString } from '#/lib/util'
import { page } from '#/lib/view'
import { home, STATUS_OPTIONS } from '#/pages/home'
import { login } from '#/pages/login'
type Session = { did?: string }
···
})
if (!session.did) return null
try {
+
const oauthSession = await ctx.oauthClient.restore(session.did)
return oauthSession ? new Agent(oauthSession) : null
} catch (err) {
ctx.logger.warn({ err }, 'oauth restore failed')
···
router.get(
'/login',
handler(async (req: Request, res: Response) => {
+
return res.type('html').send(page(login({})))
}),
)
// Login handler
router.post(
'/login',
+
express.urlencoded(),
handler(async (req: Request, res: Response) => {
const input = ifString(req.body.input)
// Validate
if (!input) {
+
return res.type('html').send(page(login({ error: 'invalid input' })))
}
}),
)
···
if (!agent) {
// Serve the logged-out view
+
return res.type('html').send(page(home({ statuses, didHandleMap })))
}
// Fetch additional information about the logged-in user
···
// "Set status" handler
router.post(
'/status',
+
express.urlencoded(),
handler(async (req: Request, res: Response) => {
// If the user is signed in, get an agent which communicates with their server
const agent = await getSessionAgent(req, res, ctx)
if (!agent) {
+
return res.redirect(`/login`)
}
const status = ifString(req.body?.status)
···
}
if (!Status.validateRecord(record).success) {
+
return res
+
.status(400)
+
.type('html')
+
.send('<h1>Error: Invalid status</h1>')
}
// Write the status record to the user's repository
···
uri = res.data.uri
} catch (err) {
ctx.logger.warn({ err }, 'failed to write record')
+
return res
.status(500)
.type('html')
.send('<h1>Error: Failed to write record</h1>')
}
try {