A simple AtProto app to read pet.mewsse.link records on my PDS.
import type { Records as _Records } from "@atcute/lexicons/ambient"
import type { Did } from '@atcute/lexicons/syntax'
import type { Database, Link } from './db.ts'

import { Client, simpleFetchHandler } from '@atcute/client'
import { findUserPDS, getUserDID } from './id-resolver.ts'
import { Jetstream } from '@skyware/jetstream'
import { RepoReader } from '@atcute/car/v4'
import { decode } from '@atcute/cbor'
import { logger } from "./lib/logger.ts"

import type { CommitCreate } from "@skyware/jetstream"

/** Query parameters sent to `com.atproto.sync.getRepo`. */
interface RepoParams {
  did: Did<"web"> | Did<"plc">,
  since?: string
}

/** Raised when the repository export cannot be fetched from the PDS. */
export class IngestionError extends Error {
  constructor(msg: string) {
    super(msg)

    // Keep `instanceof` working when compiled to targets that break Error subclassing.
    Object.setPrototypeOf(this, IngestionError.prototype)
    this.name = 'IngestionError'
  }
}

/**
 * Builds the URL of the blob attached to a link record, if any.
 *
 * @param did - Repository owner, used as the `did` query parameter.
 * @param pds - Base URL of the PDS the blob is fetched from.
 * @param record - Link record; the blob CID is read from `record.image.ref.$link`.
 * @returns The `com.atproto.sync.getBlob` URL, or `null` when the record has no image CID.
 */
export function findImage(did: Did<"web"> | Did<"plc">, pds: string, record: any): string | null {
  // Optional chaining: a record with `image` but no `ref` must not crash ingestion.
  const imageCid = record?.image?.ref?.$link ?? null
  if (!imageCid) return null

  // Let the user pull the blob with their browser directly from the PDS,
  // decreasing the space needed to run the service and preventing duplication
  // if hosted at the same place as the PDS (self host anyone?)
  return `${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${imageCid}`
}

/**
 * Maps a `pet.mewsse.link` record to the row stored in the `links` table.
 * Shared by the backfill and the Jetstream handlers so both produce identical rows.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- records come from CBOR/JSON and are only read field by field
function toLinkRow(did: RepoParams['did'], pds: string, rkey: string, record: any): Link {
  return {
    rkey,
    link: record.link,
    title: record.title,
    description: record.description ?? null,
    image: findImage(did, pds, record),
    alt: record.alt ?? null,
    // Flags are stored as numbers (0/1).
    nsfw: +(record.nsfw || 0),
    big: +(record.big || 0),
    createdAt: record.createdAt
  }
}

/**
 * Creates the ingester bound to `db`.
 *
 * @param db - Database handle used to read/write the `links` and `revs` tables.
 * @returns An object exposing `backfill()` (one-shot repo import) and
 *          `jetstream()` (live create/update/delete subscription).
 */
export function createIngester(db: Database) {
  /** Inserts a link row, or refreshes the stored row when `rkey` already exists. */
  const upsertLink = async (link: Link): Promise<void> => {
    await db
      .insertInto('links')
      .values(link)
      .onConflict((conflict) =>
        conflict.column('rkey').doUpdateSet({
          link: link.link,
          title: link.title,
          description: link.description,
          image: link.image,
          alt: link.alt,
          nsfw: +(link.nsfw || 0),
          big: +(link.big || 0),
        })
      )
      .execute()
  }

  return {
    /**
     * Downloads the user's repository from their PDS and upserts every
     * `pet.mewsse.link` record into `links`, oldest first.
     *
     * @throws IngestionError when the `getRepo` request does not succeed.
     */
    async backfill(): Promise<any> {
      const did = getUserDID()
      const pds = await findUserPDS()
      const handler = simpleFetchHandler({ service: pds })
      const rpc = new Client({ handler })
      const now = new Date()

      logger.info(`Starting backfilling`)

      const params: RepoParams = {
        did
      }

      const lastRev = await db
        .selectFrom('revs')
        .select('rkey')
        .orderBy('createdAt', 'desc')
        .executeTakeFirst()

      // Only ask for what changed since the last recorded sync point.
      if (lastRev) {
        params.since = lastRev.rkey
      }

      const { ok, data } = await rpc.get(`com.atproto.sync.getRepo`, {
        params,
        as: 'stream'
      })

      if (!ok) {
        throw new IngestionError(`Error while syncing repo for ${did} on ${pds}`)
      }

      await using repo = RepoReader.fromStream(data)
      const links: Array<Link> = []
      // NOTE(review): this stores the rkey of the first entry read (from any
      // collection) and later sends it as `since`, which getRepo documents as
      // a repo revision. Confirm whether the commit `rev` should be used instead.
      let repoRev: string | null = null

      for await (const entry of repo) {
        if (!repoRev) repoRev = entry.rkey
        if (entry.collection !== "pet.mewsse.link") continue

        links.push(toLinkRow(did, pds, entry.rkey, decode(entry.bytes)))
      }

      // Oldest first; return 0 for equal timestamps so the comparator is consistent.
      links.sort((a, b) => {
        if (a.createdAt < b.createdAt) return -1
        if (a.createdAt > b.createdAt) return 1
        return 0
      })

      for (const link of links) {
        await upsertLink(link)

        logger.info(`Inserting record ${link.rkey}`)
      }

      if (!repoRev) {
        logger.error('Backfilling error: no last pds revision found')
        return
      }

      if (repoRev !== lastRev?.rkey) {
        await db
          .insertInto('revs')
          .values({
            rkey: repoRev,
            createdAt: now.toISOString()
          })
          .execute()
      }

      logger.info(`Backfilling ended`)
    },

    /**
     * Builds a Jetstream subscription limited to the user's `pet.mewsse.link`
     * records and registers handlers that mirror creates, updates and deletes
     * into `links`. The instance is returned without being started here.
     */
    async jetstream(): Promise<Jetstream> {
      const did = getUserDID()
      const pds = await findUserPDS()

      const jetstream = new Jetstream({
        wantedCollections: ['pet.mewsse.link'],
        wantedDids: [did]
      })

      jetstream.onCreate('pet.mewsse.link', async (event) => {
        if (event.commit.record.$type !== "pet.mewsse.link") return

        // Key rows by the record key, not the commit revision, so they line up
        // with backfilled rows and with the delete handler below.
        const rkey = event.commit.rkey
        const record = event.commit.record

        // Upsert: the record may already have been stored by a backfill.
        await upsertLink(toLinkRow(did, pds, rkey, record))

        logger.info(`Record ${rkey} created`)
      })

      jetstream.onUpdate('pet.mewsse.link', async (event) => {
        if (event.commit.record.$type !== "pet.mewsse.link") return

        const rkey = event.commit.rkey
        const record = event.commit.record

        await db
          .updateTable('links')
          .set({
            link: record.link,
            title: record.title,
            description: record.description ?? null,
            image: findImage(did, pds, record),
            alt: record.alt ?? null,
            // Flags can change on update too; keep them in sync with the record.
            nsfw: +(record.nsfw || 0),
            big: +(record.big || 0),
            createdAt: record.createdAt
          })
          .where('rkey', '=', rkey)
          .executeTakeFirstOrThrow()

        logger.info(`Record ${rkey} updated`)
      })

      jetstream.onDelete('pet.mewsse.link', async (event) => {
        if (event.commit.collection !== "pet.mewsse.link") return

        await db
          .deleteFrom('links')
          .where('rkey', '=', event.commit.rkey)
          .executeTakeFirstOrThrow()

        logger.info(`Record ${event.commit.rkey} deleted`)
      })

      return jetstream
    }
  }
}