A simple AtProto app to read pet.mewsse.link records on my PDS.
1import type { Records as _Records } from "@atcute/lexicons/ambient"
2import type { Did } from '@atcute/lexicons/syntax'
3import type { Database, Link } from './db.ts'
4
5import { Client, simpleFetchHandler } from '@atcute/client'
6import { findUserPDS, getUserDID } from './id-resolver.ts'
7import { Jetstream } from '@skyware/jetstream'
8import { RepoReader } from '@atcute/car/v4'
9import { decode } from '@atcute/cbor'
10import { logger } from "./lib/logger.ts"
11
12import type { CommitCreate } from "@skyware/jetstream"
13
/**
 * Query parameters handed to the `com.atproto.sync.getRepo` call made
 * during backfill.
 */
interface RepoParams {
  /** DID of the repository to download. */
  did: Did<"web"> | Did<"plc">
  /** Optional value forwarded as the `since` query parameter. */
  since?: string
}
18
/**
 * Error raised when repository ingestion cannot proceed (for example when
 * the repo download request does not succeed).
 */
export class IngestionError extends Error {
  /**
   * @param msg Human-readable description of the failure.
   */
  constructor(msg: string) {
    super(msg)

    // Restore the prototype chain for down-levelled targets. `new.target`
    // is used instead of the class itself so that subclasses keep their own
    // prototype and `instanceof` works for them as well.
    Object.setPrototypeOf(this, new.target.prototype)

    // Without this, logs and stack traces label the error as a plain "Error".
    this.name = 'IngestionError'
  }
}
26
/**
 * Builds the URL from which the image blob attached to a record can be
 * fetched directly from the PDS.
 *
 * @param did DID of the repository that owns the blob.
 * @param pds Base URL of the PDS, used as the URL prefix as-is.
 * @param record Decoded record; the blob CID is read from `record.image.ref.$link`.
 * @returns The `com.atproto.sync.getBlob` URL, or `null` when the record has
 *   no image or the image reference is incomplete.
 */
export function findImage(did: Did<"web"> | Did<"plc">, pds: string, record: any): string | null {
  // Optional chaining keeps a missing record or an image without a `ref`
  // from throwing; such records are treated as having no image.
  const imageCid = record?.image?.ref?.$link ?? null
  if (!imageCid) return null

  // Let the user pull the blob with their browser directly from the PDS,
  // decreasing the space needed to run the service and preventing duplication
  // if hosted at the same place as the PDS (self host anyone?)
  return `${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${imageCid}`
}
36
/**
 * Maps a decoded `pet.mewsse.link` record onto the values stored for it in
 * the `links` table (every column except `rkey`).
 *
 * Shared by backfill and the live handlers so that all code paths write the
 * same set of fields with the same normalisation.
 */
function toLinkColumns(did: Did<"web"> | Did<"plc">, pds: string, record: any): Omit<Link, 'rkey'> {
  return {
    link: record.link,
    title: record.title,
    description: record.description ?? null,
    image: findImage(did, pds, record),
    alt: record.alt ?? null,
    // Flags are stored as 0/1 numbers.
    nsfw: +(record.nsfw || 0),
    big: +(record.big || 0),
    createdAt: record.createdAt
  }
}

/**
 * Creates the ingester that keeps the `links` table in sync with the
 * `pet.mewsse.link` records of the configured user.
 *
 * @param db Database handle used for every read and write.
 * @returns An object exposing `backfill()` (one-off repo download) and
 *   `jetstream()` (live subscription with create/update/delete handlers).
 */
export function createIngester(db: Database) {
  return {
    /**
     * Downloads the user's repository from their PDS and upserts every
     * `pet.mewsse.link` record it contains, oldest first.
     *
     * @throws IngestionError when the repository download request fails.
     */
    async backfill(): Promise<any> {
      const did = getUserDID()
      const pds = await findUserPDS()
      const handler = simpleFetchHandler({ service: pds })
      const rpc = new Client({ handler })
      const now = new Date()

      logger.info(`Starting backfilling`)

      const params: RepoParams = {
        did
      }

      // Resume from the most recently recorded sync marker, if any.
      const lastRev = await db
        .selectFrom('revs')
        .select('rkey')
        .orderBy('createdAt', 'desc')
        .executeTakeFirst()

      if (lastRev) {
        params.since = lastRev.rkey
      }

      const { ok, data } = await rpc.get(`com.atproto.sync.getRepo`, {
        params,
        as: 'stream'
      })

      if (!ok) {
        throw new IngestionError(`Error while syncing repo for ${did} on ${pds}`)
      }

      await using repo = RepoReader.fromStream(data)
      const links: Array<Link> = []
      let repoRev: string | null = null

      for await (const entry of repo) {
        // NOTE(review): this records the rkey of the first entry yielded by
        // the reader (whatever its collection), not the repository's commit
        // revision, and that value is later sent back as `since`. The
        // lexicon describes `since` as a repo revision - confirm this is
        // what is intended.
        if (!repoRev) repoRev = entry.rkey
        if (entry.collection !== "pet.mewsse.link") continue

        links.push({
          rkey: entry.rkey,
          ...toLinkColumns(did, pds, decode(entry.bytes))
        })
      }

      // Oldest first; equal timestamps compare as equal so the comparator
      // stays consistent.
      links.sort((a, b) => {
        if (a.createdAt < b.createdAt) return -1
        if (a.createdAt > b.createdAt) return 1
        return 0
      })

      for (const link of links) {
        await db
          .insertInto('links')
          .values(link)
          .onConflict((conflict) =>
            // `createdAt` is left untouched on conflict, as before.
            conflict.column('rkey').doUpdateSet({
              link: link.link,
              title: link.title,
              description: link.description,
              image: link.image,
              alt: link.alt,
              nsfw: link.nsfw,
              big: link.big,
            })
          )
          .execute()

        logger.info(`Inserting record ${link.rkey}`)
      }

      if (!repoRev) {
        logger.error('Backfilling error: no last pds revision found')
        return
      }

      // Only record a new marker when it differs from the previous one.
      if (repoRev !== lastRev?.rkey) {
        await db
          .insertInto('revs')
          .values({
            rkey: repoRev,
            createdAt: now.toISOString()
          })
          .execute()
      }

      logger.info(`Backfilling ended`)
    },

    /**
     * Builds a Jetstream subscription restricted to the user's
     * `pet.mewsse.link` records and registers handlers that mirror
     * creates, updates and deletes into the `links` table.
     *
     * Rows are keyed by the record key (`event.commit.rkey`) in every
     * handler, matching the key used by `backfill()`.
     *
     * @returns The configured Jetstream instance; this method does not
     *   start it.
     */
    async jetstream(): Promise<Jetstream> {
      const did = getUserDID()
      const pds = await findUserPDS()

      const jetstream = new Jetstream({
        wantedCollections: ['pet.mewsse.link'],
        wantedDids: [did]
      })

      jetstream.onCreate('pet.mewsse.link', async (event) => {
        if (event.commit.record.$type !== "pet.mewsse.link") return

        const rkey = event.commit.rkey
        const columns = toLinkColumns(did, pds, event.commit.record)

        // Upsert so that a record already stored by backfill does not make
        // the insert fail.
        await db
          .insertInto('links')
          .values({ rkey, ...columns })
          .onConflict((conflict) =>
            conflict.column('rkey').doUpdateSet(columns)
          )
          .execute()

        logger.info(`Record ${rkey} created`)
      })

      jetstream.onUpdate('pet.mewsse.link', async (event) => {
        if (event.commit.record.$type !== "pet.mewsse.link") return

        const rkey = event.commit.rkey

        await db
          .updateTable('links')
          .set(toLinkColumns(did, pds, event.commit.record))
          .where('rkey', '=', rkey)
          .executeTakeFirstOrThrow()

        logger.info(`Record ${rkey} updated`)
      })

      jetstream.onDelete('pet.mewsse.link', async (event) => {
        if (event.commit.collection !== "pet.mewsse.link") return

        await db
          .deleteFrom('links')
          .where('rkey', '=', event.commit.rkey)
          .executeTakeFirstOrThrow()

        logger.info(`Record ${event.commit.rkey} deleted`)
      })

      return jetstream
    }
  }
}