A simple AtProto app to read pet.mewsse.link records on my PDS.
at main 6.0 kB view raw
import type { Records as _Records } from "@atcute/lexicons/ambient"
import type { Did } from '@atcute/lexicons/syntax'
import type { Database, Link } from './db.ts'

import { Client, simpleFetchHandler } from '@atcute/client'
import { CommitType, Jetstream } from '@skyware/jetstream'
import { RepoReader } from '@atcute/car/v4'
import { decode } from '@atcute/cbor'
import { logger } from "./lib/logger.ts"
import { findEmbed } from "./lib/embed.ts"

/** Query parameters sent to `com.atproto.sync.getRepo`. */
interface RepoParams {
  did: Did<"web"> | Did<"plc">,
  since?: string
}

/** Thrown by `backfill()` when the `com.atproto.sync.getRepo` call does not succeed. */
export class IngestionError extends Error {
  constructor(msg: string) {
    super(msg)

    // Make the error identifiable in logs and stack traces.
    this.name = 'IngestionError'
    Object.setPrototypeOf(this, IngestionError.prototype)
  }
}

/**
 * Builds the ingestion helpers that copy `pet.mewsse.link` records of one
 * repository into the local database.
 *
 * Rows of the `links` table are always keyed by the record key (`rkey`), both
 * when backfilling and when reacting to Jetstream events, so that create,
 * update, delete and backfill all address the same row.
 *
 * @param db - Database handle used for the `links` and `revs` tables.
 * @param did - DID of the repository to ingest.
 * @param pds - Service URL of the PDS hosting that repository.
 * @returns An object exposing `backfill`, `setLastRevision`,
 *   `rollbackRevision` and `jetstream`.
 */
export function createIngester(db: Database, did: Did<"web"> | Did<"plc">, pds: string) {
  // Replaces the whole content of `revs` with a single row.
  const replaceRevision = async (rkey: string, createdAt: string): Promise<void> => {
    await db
      .deleteFrom('revs')
      .execute()

    await db
      .insertInto('revs')
      .values({
        rkey,
        createdAt
      })
      .execute()
  }

  return {
    /**
     * Downloads the repository from the PDS (only what changed since the last
     * stored revision, when there is one) and upserts every
     * `pet.mewsse.link` record into `links`.
     *
     * Resolves with no value.
     *
     * @throws IngestionError when the `getRepo` request is not successful.
     */
    async backfill(): Promise<any> {
      const handler = simpleFetchHandler({ service: pds })
      const rpc = new Client({ handler })
      const now = new Date()

      logger.info(`Starting backfilling`)

      const lastRev = await db
        .selectFrom('revs')
        .select('rkey')
        .orderBy('createdAt', 'desc')
        .executeTakeFirst()

      const params: RepoParams = { did }

      if (lastRev) {
        params.since = lastRev.rkey
      }

      const { ok, data } = await rpc.get(`com.atproto.sync.getRepo`, {
        params,
        as: 'stream'
      })

      if (!ok) {
        throw new IngestionError(`Error while syncing repo for ${did} on ${pds}`)
      }

      await using repo = RepoReader.fromStream(data)
      const links: Array<Link> = []
      let repoRev: string | null = null

      for await (const entry of repo) {
        // NOTE(review): the value stored as "revision" is the rkey of the
        // first entry yielded by the reader, not a commit rev; confirm this
        // is what `since` is expected to receive.
        if (!repoRev) repoRev = entry.rkey
        if (entry.collection !== "pet.mewsse.link") continue

        const link = decode(entry.bytes)
        const embed = await findEmbed(did, pds, link.embed)

        links.push({
          rkey: entry.rkey,
          url: link.url,
          title: link.title,
          description: link.description,
          embed: embed,
          nsfw: +(link.nsfw || 0),
          createdAt: link.createdAt
        })
      }

      // Write rows in ascending creation order; equal timestamps compare as 0
      // so the comparator stays consistent.
      links.sort((a, b) => {
        if (a.createdAt < b.createdAt) return -1
        if (a.createdAt > b.createdAt) return 1
        return 0
      })

      for (const link of links) {
        await db
          .insertInto('links')
          .values(link)
          .onConflict((conflict) =>
            conflict.column('rkey').doUpdateSet({
              url: link.url,
              title: link.title,
              description: link.description,
              embed: link.embed,
              nsfw: +(link.nsfw || 0),
            })
          )
          .execute()

        logger.info(`Inserting record ${link.rkey}`)
      }

      // Only record a new revision row when it differs from the stored one.
      if (repoRev && repoRev !== lastRev?.rkey) {
        await db
          .insertInto('revs')
          .values({
            rkey: repoRev,
            createdAt: now.toISOString()
          })
          .execute()
      }

      logger.info(`Backfilling ended`)
    },

    /**
     * Replaces every row of `revs` with the given revision.
     *
     * @param rkey - Revision identifier to store.
     * @param createdAt - Timestamp stored alongside it.
     */
    async setLastRevision(rkey: string, createdAt: string): Promise<void> {
      await replaceRevision(rkey, createdAt)
    },

    /**
     * Resets `revs` to the key and creation date of the most recently created
     * row of `links`. Does nothing when `links` is empty.
     */
    async rollbackRevision(): Promise<void> {
      const lastInsert = await db
        .selectFrom('links')
        .select(['rkey', 'createdAt'])
        .orderBy('createdAt', 'desc')
        .limit(1)
        .executeTakeFirst()

      if (lastInsert !== undefined) {
        await replaceRevision(lastInsert.rkey, lastInsert.createdAt)
      }
    },

    /**
     * Creates a Jetstream subscription restricted to `did` and registers the
     * handlers that mirror `pet.mewsse.link` creates, updates and deletes into
     * `links`. The instance is returned without being started here.
     *
     * @param did - DID whose commits are requested from Jetstream.
     * @param pds - PDS service URL forwarded to `findEmbed`.
     * @returns The configured Jetstream instance.
     */
    async jetstream(did: Did<"web"> | Did<"plc">, pds: string): Promise<Jetstream> {
      const jetstream = new Jetstream({
        wantedDids: [did],
      })

      // Revision bookkeeping runs for every commit of the DID, whatever its
      // collection.
      jetstream.on("commit", async (event) => {
        if (event.commit.operation === CommitType.Create) {
          const date = new Date()
          await this.setLastRevision(event.commit.rev, date.toISOString())
          return
        }

        if (event.commit.operation === CommitType.Delete) {
          await this.rollbackRevision()
          return
        }
      })

      jetstream.onCreate('pet.mewsse.link', async (event) => {
        if (event.commit.record.$type !== "pet.mewsse.link") return

        // Key by record key, not commit rev, so later updates, deletes and
        // backfills find this row.
        const rkey = event.commit.rkey
        const record = event.commit.record
        const description = record.description ?? null
        const embed = await findEmbed(did, pds, record.embed)
        const nsfw = +(record.nsfw || 0)

        // Upsert like backfill does, so a row that already exists does not
        // make the handler reject.
        await db
          .insertInto('links')
          .values({
            rkey,
            url: record.url,
            title: record.title,
            description,
            embed,
            nsfw,
            createdAt: record.createdAt
          })
          .onConflict((conflict) =>
            conflict.column('rkey').doUpdateSet({
              url: record.url,
              title: record.title,
              description,
              embed,
              nsfw,
            })
          )
          .execute()

        logger.info(`Record ${rkey} created`)
      })

      jetstream.onUpdate('pet.mewsse.link', async (event) => {
        if (event.commit.record.$type !== "pet.mewsse.link") return

        // An update commit carries a new rev; the row is found by record key.
        const rkey = event.commit.rkey
        const record = event.commit.record
        const embed = await findEmbed(did, pds, record.embed)

        await db
          .updateTable('links')
          .set({
            url: record.url,
            title: record.title,
            description: record.description ?? null,
            embed,
            nsfw: +(record.nsfw || 0),
            createdAt: record.createdAt
          })
          .where('rkey', '=', rkey)
          .executeTakeFirst()

        logger.info(`Record ${rkey} updated`)
      })

      jetstream.onDelete('pet.mewsse.link', async (event) => {
        if (event.commit.collection !== "pet.mewsse.link") return

        await db
          .deleteFrom('links')
          .where('rkey', '=', event.commit.rkey)
          .executeTakeFirstOrThrow()

        logger.info(`Record ${event.commit.rkey} deleted`)
      })

      return jetstream
    }
  }
}