A simple AtProto app to read pet.mewsse.link records on my PDS.

Cleanup and fix ingestion

Mewsse 1b2d4175 2ab2c9b9

Changed files
+46 -44
src
+9 -6
src/db.ts
···
import type { Migration, MigrationProvider } from 'kysely'
import type { Blob, LegacyBlob } from '@atcute/lexicons'
+
export type DatabaseSchema = {
links: Link,
revs: Rev
}
+
+
export type Embed = {
+
'$type': string,
+
image?: Blob | LegacyBlob,
+
alt?: string,
+
uri?: string
+
} | null
export type Link = {
rkey: string,
link: string,
title: string,
description: string | null,
-
embed: {
-
'$type': string,
-
image?: Blob | LegacyBlob,
-
alt?: string,
-
uri?: string
-
} | null,
+
embed: Embed,
nsfw: number,
big: number,
createdAt: string,
+17 -8
src/index.ts
···
+
import { findUserPDS, getUserDID } from './id-resolver.ts'
import { createDb, migrateToLatest } from './db.ts'
import { createIngester } from './ingester.ts'
import { Jetstream } from '@skyware/jetstream'
···
import process from 'node:process'
import dotenv from 'dotenv'
+
import type { Did } from '@atcute/lexicons/syntax'
import type { Database } from './db.ts'
dotenv.config()
···
export type Context = {
db: Database
jetstream: Jetstream,
-
httpServer: Router
+
httpServer: Router,
+
did: Did<"web"> | Did<"plc">,
+
pds: string
}
export class Server {
···
const port = process.env.PORT || "8080"
const dbPath = process.env.DB_PATH || ":memory"
+
const did = getUserDID()
+
const pds = await findUserPDS()
+
const db = createDb(dbPath)
await migrateToLatest(db)
logger.info('Migrating db to latest version')
-
const ingester = createIngester(db)
+
const ingester = createIngester(db, did, pds)
await ingester.backfill()
-
const jetstream = await ingester.jetstream()
-
-
jetstream.start()
-
logger.info("Starting jetstream client")
+
const jetstream = await ingester.jetstream(did, pds)
const httpServer = new Router()
-
createRoutes(httpServer, db)
const ctx: Context = {
db,
jetstream,
-
httpServer
+
httpServer,
+
did,
+
pds
}
+
jetstream.start()
+
logger.info("Starting jetstream client")
+
+
createRoutes(ctx)
httpServer.start(port)
logger.info(`Starting http server on port ${port}`)
+2 -18
src/ingester.ts
···
import type { Database, Link } from './db.ts'
import { Client, simpleFetchHandler } from '@atcute/client'
-
import { findUserPDS, getUserDID } from './id-resolver.ts'
import { CommitType, Jetstream } from '@skyware/jetstream'
import { RepoReader } from '@atcute/car/v4'
import { decode } from '@atcute/cbor'
···
}
}
-
export function findImage(did: Did<"web"> | Did<"plc">, pds: string, record: any): string | null {
-
const imageCid = record.image ? record.image.ref.$link : null
-
if (!imageCid) return null
-
-
// let the user pull the blob with their browser directly fomr the pds
-
// decreasing space needed to run the service and prevent duplication
-
// if hosted at the same place as the pds (self host anyone ?)
-
return `${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${imageCid}`
-
}
-
-
export function createIngester(db: Database) {
+
export function createIngester(db: Database, did: Did<"web"> | Did<"plc">, pds: string) {
return {
async backfill(): Promise<any> {
-
const did = getUserDID()
-
const pds = await findUserPDS()
const handler = simpleFetchHandler({ service: pds })
const rpc = new Client({ handler })
const now = new Date()
···
}
},
-
async jetstream(): Promise<Jetstream> {
-
const did = getUserDID()
-
const pds = await findUserPDS()
-
+
async jetstream(did: Did<"web"> | Did<"plc">, pds: string): Promise<Jetstream> {
const jetstream = new Jetstream({
wantedDids: [did],
})
+18 -12
src/routes.ts
···
import { IncomingMessage, ServerResponse } from "node:http"
-
import { Router } from "./lib/router.ts"
+
import { findEmbed } from "./lib/embed.ts"
import path from "node:path"
import { Eta } from "eta"
-
import type { Database } from './db.ts'
+
import type { Context } from "./index.ts"
-
export const createRoutes = (router: Router, db: Database) => {
+
export const createRoutes = (ctx: Context) => {
const eta = new Eta({ views: path.join(import.meta.dirname, "views") })
const itemPerPages = parseInt(process.env.ITEM_PER_PAGES || "10", 10)
+
const embed = findEmbed(ctx.did, ctx.pds)
async function renderPage (
req:IncomingMessage,
···
) {
const offset = params ? parseInt(params?.page, 10) - 1 : 0
-
const links = await db
+
const links = await ctx.db
.selectFrom("links")
.selectAll()
.orderBy("createdAt", "desc")
···
.limit(itemPerPages)
.execute()
-
const totalLink = await db
+
const totalLink = await ctx.db
.selectFrom("links")
-
.select(db.fn.countAll().as("count"))
+
.select(ctx.db.fn.countAll().as("count"))
.executeTakeFirstOrThrow()
const pages = Math.ceil(parseInt(totalLink.count.toString(), 10) / itemPerPages)
···
const body = eta.render("./main", {
items: links,
pages: Array.from(Array(pages).keys()),
-
selected: parseInt(params?.page || "0", 10)
+
selected: parseInt(params?.page || "0", 10),
+
embed: embed
})
res.writeHead(200, { 'Content-Type': 'text/html' })
res.end(body)
}
-
router.get('/', async (req, res, params) => {
+
ctx.httpServer.get('/', async (req, res, params) => {
await renderPage(req, res, params)
})
-
router.get('/page/{page:number}', async (req, res, params) => {
+
ctx.httpServer.get('/page/{page:number}', async (req, res, params) => {
await renderPage(req, res, params)
})
-
router.get('/feed.atom', async (req, res, params) => {
-
const links = await db
+
ctx.httpServer.get('/feed.atom', async (req, res, params) => {
+
const links = await ctx.db
.selectFrom("links")
.selectAll()
.orderBy("createdAt", "desc")
···
const date = new Date(links[0].createdAt);
-
const body = eta.render("./feed", { items: links, date })
+
const body = eta.render("./feed", {
+
items: links,
+
date,
+
embed: embed
+
})
res.writeHead(200, { 'Content-Type': 'application/atom+xml' })
res.end(body)