A simple AtProto app to read pet.mewsse.link records on my PDS.
1import type { Records as _Records } from "@atcute/lexicons/ambient"
2import type { Did } from '@atcute/lexicons/syntax'
3import type { Database, Link } from './db.ts'
4
5import { Client, simpleFetchHandler } from '@atcute/client'
6import { CommitType, Jetstream } from '@skyware/jetstream'
7import { RepoReader } from '@atcute/car/v4'
8import { decode } from '@atcute/cbor'
9import { logger } from "./lib/logger.ts"
10import { findEmbed } from "./lib/embed.ts"
11
/**
 * Query parameters passed to the `com.atproto.sync.getRepo` call made during backfill.
 */
interface RepoParams {
  /** DID of the repository to download. */
  did: Did<"plc"> | Did<"web">;
  /** Optional marker forwarded as `since`; filled from the most recent row of `revs` when one exists. */
  since?: string;
}
16
/**
 * Error thrown when records cannot be ingested from the PDS.
 */
export class IngestionError extends Error {
  /**
   * @param msg - Human-readable description of what went wrong.
   */
  constructor(msg: string) {
    super(msg)

    // Use `new.target` rather than a hard-coded class so that subclasses keep
    // their own prototype (and therefore pass `instanceof` checks).
    Object.setPrototypeOf(this, new.target.prototype)
    // Without this the error would report itself as a plain "Error".
    this.name = new.target.name
  }
}
24
/**
 * Builds the ingester that copies `pet.mewsse.link` records of one repository
 * into the `links` table and tracks the sync position in the `revs` table.
 *
 * @param db - Database handle used for every query on `links` and `revs`.
 * @param did - DID of the repository read by `backfill`.
 * @param pds - Service URL of the PDS queried by `backfill`.
 * @returns An object exposing `backfill`, `setLastRevision`, `rollbackRevision` and `jetstream`.
 */
export function createIngester(db: Database, did: Did<"web"> | Did<"plc">, pds: string) {
  // Replaces every row of `revs` with the single given row.
  // NOTE(review): the delete and the insert are two separate statements, not a
  // transaction; a failure between them leaves `revs` empty.
  const replaceRevision = async (rkey: string, createdAt: string): Promise<void> => {
    await db
      .deleteFrom('revs')
      .execute()

    await db
      .insertInto('revs')
      .values({
        rkey,
        createdAt
      })
      .execute()
  }

  const ingester = {
    /**
     * Downloads the repository through `com.atproto.sync.getRepo` (passing the
     * latest stored `revs` value as `since` when there is one) and upserts every
     * `pet.mewsse.link` record into `links`, oldest `createdAt` first.
     *
     * @throws IngestionError when the `getRepo` call does not succeed.
     */
    async backfill(): Promise<void> {
      const rpc = new Client({ handler: simpleFetchHandler({ service: pds }) })
      const now = new Date()

      logger.info(`Starting backfilling`)

      const lastRev = await db
        .selectFrom('revs')
        .select('rkey')
        .orderBy('createdAt', 'desc')
        .executeTakeFirst()

      const params: RepoParams = { did }
      if (lastRev) {
        params.since = lastRev.rkey
      }

      const { ok, data } = await rpc.get(`com.atproto.sync.getRepo`, {
        params,
        as: 'stream'
      })

      if (!ok) {
        throw new IngestionError(`Error while syncing repo for ${did} on ${pds}`)
      }

      await using repo = RepoReader.fromStream(data)
      const links: Array<Link> = []
      let repoRev: string | null = null

      for await (const entry of repo) {
        // NOTE(review): this remembers the record key of the first entry read
        // (of any collection) and later sends it back as `since`. It looks like
        // a commit revision was intended here — confirm against the reader API.
        if (!repoRev) repoRev = entry.rkey
        if (entry.collection !== "pet.mewsse.link") continue

        const link = decode(entry.bytes)
        const embed = await findEmbed(did, pds, link.embed)

        links.push({
          rkey: entry.rkey,
          url: link.url,
          title: link.title,
          description: link.description,
          embed: embed,
          nsfw: +(link.nsfw || 0),
          createdAt: link.createdAt
        })
      }

      // Oldest first. The comparator returns 0 for equal timestamps so that it
      // stays consistent, as `Array.prototype.sort` requires.
      links.sort((a, b) => {
        if (a.createdAt < b.createdAt) return -1
        if (a.createdAt > b.createdAt) return 1
        return 0
      })

      for (const link of links) {
        await db
          .insertInto('links')
          .values(link)
          .onConflict((conflict) =>
            // An existing row keeps its `createdAt`; every other column is refreshed.
            conflict.column('rkey').doUpdateSet({
              url: link.url,
              title: link.title,
              description: link.description,
              embed: link.embed,
              nsfw: link.nsfw,
            })
          )
          .execute()

        logger.info(`Inserting record ${link.rkey}`)
      }

      // Only record a new sync position when it differs from the stored one.
      if (repoRev && repoRev !== lastRev?.rkey) {
        await db
          .insertInto('revs')
          .values({
            rkey: repoRev,
            createdAt: now.toISOString()
          })
          .execute()
      }

      logger.info(`Backfilling ended`)
    },

    /**
     * Replaces the content of `revs` with a single row.
     *
     * @param rkey - Value stored in the `rkey` column.
     * @param createdAt - Value stored in the `createdAt` column.
     */
    async setLastRevision(rkey: string, createdAt: string): Promise<void> {
      await replaceRevision(rkey, createdAt)
    },

    /**
     * Resets `revs` to the key and timestamp of the link with the greatest
     * `createdAt`. Does nothing when `links` is empty.
     */
    async rollbackRevision(): Promise<void> {
      const lastInsert = await db
        .selectFrom('links')
        .select(['rkey', 'createdAt'])
        .orderBy('createdAt', 'desc')
        .limit(1)
        .executeTakeFirst()

      if (lastInsert === undefined) return

      await replaceRevision(lastInsert.rkey, lastInsert.createdAt)
    },

    /**
     * Creates a Jetstream subscription for one DID and registers the handlers
     * that keep `links` and `revs` up to date. The returned instance is only
     * configured here; nothing in this method starts it.
     *
     * @param did - DID whose commits are requested from Jetstream.
     * @param pds - PDS URL forwarded to `findEmbed`.
     * @returns The configured Jetstream instance.
     */
    async jetstream(did: Did<"web"> | Did<"plc">, pds: string): Promise<Jetstream> {
      const jetstream = new Jetstream({
        wantedDids: [did],
      })

      // Revision bookkeeping for every commit, whatever its collection.
      // `ingester` is referenced instead of `this` so the handlers keep working
      // when `jetstream` is called detached from the returned object.
      jetstream.on("commit", async (event) => {
        if (event.commit.operation === CommitType.Create) {
          await ingester.setLastRevision(event.commit.rev, new Date().toISOString())
          return
        }

        if (event.commit.operation === CommitType.Delete) {
          await ingester.rollbackRevision()
        }
      })

      jetstream.onCreate('pet.mewsse.link', async (event) => {
        if (event.commit.record.$type !== "pet.mewsse.link") return

        // Rows are keyed by the record key, like in `backfill` and in the
        // delete handler; the commit revision changes on every commit and
        // would never match later updates or deletes.
        const rkey = event.commit.rkey
        const record = event.commit.record
        const embed = await findEmbed(did, pds, record.embed)

        await db
          .insertInto('links')
          .values({
            rkey,
            url: record.url,
            title: record.title,
            description: record.description ?? null,
            embed,
            nsfw: +(record.nsfw || 0),
            createdAt: record.createdAt
          })
          .execute()

        logger.info(`Record ${rkey} created`)
      })

      jetstream.onUpdate('pet.mewsse.link', async (event) => {
        if (event.commit.record.$type !== "pet.mewsse.link") return

        // Match on the record key: an update commit carries a fresh revision
        // that no stored row could have.
        const rkey = event.commit.rkey
        const record = event.commit.record
        const embed = await findEmbed(did, pds, record.embed)

        await db
          .updateTable('links')
          .set({
            url: record.url,
            title: record.title,
            description: record.description ?? null,
            embed,
            nsfw: +(record.nsfw || 0),
            createdAt: record.createdAt
          })
          .where('rkey', '=', rkey)
          .executeTakeFirst()

        logger.info(`Record ${rkey} updated`)
      })

      jetstream.onDelete('pet.mewsse.link', async (event) => {
        if (event.commit.collection !== "pet.mewsse.link") return

        await db
          .deleteFrom('links')
          .where('rkey', '=', event.commit.rkey)
          .executeTakeFirstOrThrow()

        logger.info(`Record ${event.commit.rkey} deleted`)
      })

      return jetstream
    }
  }

  return ingester
}