Scratch space for learning atproto app development
1import type { IncomingMessage } from "node:http";
2
3import { type LexiconDoc, Lexicons } from "@atproto/lexicon";
4import type { ErrorFrame, HandlerAuth } from "@atproto/xrpc-server";
5import type { CID } from "multiformats/cid";
6
7// @NOTE: this file is an ugly copy job of codegen output. I'd like to clean this whole thing up
8
/**
 * Narrows an arbitrary value to a non-null object with string keys.
 *
 * Arrays also pass, because `typeof []` evaluates to `"object"`; functions do
 * not, because `typeof` reports them as `"function"`.
 *
 * @param v - The value to inspect.
 * @returns `true` when `v` is an object other than `null`.
 */
export function isObj(v: unknown): v is Record<string, unknown> {
  // `typeof null` is "object", so null has to be excluded explicitly.
  if (v === null) {
    return false;
  }
  return typeof v === "object";
}
12
/**
 * Type guard reporting whether `prop` exists on `data`.
 *
 * The check has the same semantics as the `in` operator, so properties
 * inherited through the prototype chain count as present, and a property
 * whose value is `undefined` still counts as present.
 *
 * @param data - The object to probe.
 * @param prop - The property key (string, number, or symbol) to look for.
 * @returns `true` when `prop` is found on `data` or its prototype chain.
 */
export function hasProp<K extends PropertyKey>(
  data: object,
  prop: K
): data is Record<K, unknown> {
  const present = Reflect.has(data, prop);
  return present;
}
19
/**
 * Query parameters accepted by the `com.atproto.sync.subscribeRepos`
 * subscription.
 */
export interface QueryParams {
  /**
   * The last known event to backfill from.
   *
   * Optional; the lexicon definition in this file declares it as an integer.
   */
  cursor?: number;
}
24
/**
 * Any message the repo subscription can carry.
 *
 * The first five members are the message shapes declared in this file. The
 * final member keeps the union open: any object with a string `$type` and
 * arbitrary further properties is also accepted.
 */
export type RepoEvent =
  | Commit
  | Handle
  | Migrate
  | Tombstone
  | Info
  | { $type: string; [k: string]: unknown };
/**
 * Error frame a handler may yield, limited to the two error names this
 * subscription's lexicon declares: `FutureCursor` and `ConsumerTooSlow`.
 */
export type HandlerError = ErrorFrame<"FutureCursor" | "ConsumerTooSlow">;
/** A single item yielded by a handler: either an error frame or a repo event. */
export type HandlerOutput = HandlerError | RepoEvent;
/**
 * Context object handed to a subscription handler.
 *
 * @typeParam HA - Auth type for the request; defaults to `never`.
 */
export type HandlerReqCtx<HA extends HandlerAuth = never> = {
  /** Auth value of type `HA`. */
  auth: HA;
  /** Query parameters supplied for the subscription. */
  params: QueryParams;
  /** The underlying Node.js HTTP request. */
  req: IncomingMessage;
  /** Abort signal associated with the request. */
  signal: AbortSignal;
};
/**
 * Signature of a subscription handler: it receives the request context and
 * returns an async iterable of outputs (repo events or error frames).
 *
 * @typeParam HA - Auth type forwarded to the request context; defaults to `never`.
 */
export type Handler<HA extends HandlerAuth = never> = (
  ctx: HandlerReqCtx<HA>
) => AsyncIterable<HandlerOutput>;
43
/**
 * Shape of a `com.atproto.sync.subscribeRepos#commit` message.
 *
 * Field types follow the `commit` definition in the lexicon document declared
 * in this file.
 */
export interface Commit {
  /** Integer sequence value of this event. */
  seq: number;
  rebase: boolean;
  tooBig: boolean;
  /** Repo identifier; the lexicon declares it with the `did` string format. */
  repo: string;
  /** CID link of the commit. */
  commit: CID;
  /** Optional, nullable CID link; not among the lexicon's required fields. */
  prev?: CID | null;
  /** The rev of the emitted commit */
  rev: string;
  /** The rev of the last emitted commit from this repo */
  since: string | null;
  /** CAR file containing relevant blocks */
  blocks: Uint8Array;
  /** Repo operations in this commit; the lexicon caps the list at 200 items. */
  ops: RepoOp[];
  /** CID links listed under `blobs`. */
  blobs: CID[];
  /** Timestamp string; the lexicon declares the `datetime` format. */
  time: string;
  /** Properties beyond the ones modelled above are allowed. */
  [k: string]: unknown;
}
62
/**
 * Type guard for {@link Commit} messages.
 *
 * Only the `$type` discriminator is inspected; the remaining fields are not
 * validated here.
 *
 * @param v - The value to test.
 * @returns `true` when `v` is an object whose `$type` is the commit type id.
 */
export function isCommit(v: unknown): v is Commit {
  if (!isObj(v) || !hasProp(v, "$type")) {
    return false;
  }
  return v.$type === "com.atproto.sync.subscribeRepos#commit";
}
70
/**
 * Shape of a `com.atproto.sync.subscribeRepos#handle` message.
 *
 * All four named fields are required by the lexicon definition in this file.
 */
export interface Handle {
  /** Integer sequence value of this event. */
  seq: number;
  /** String in the lexicon's `did` format. */
  did: string;
  /** String in the lexicon's `handle` format. */
  handle: string;
  /** Timestamp string; the lexicon declares the `datetime` format. */
  time: string;
  /** Properties beyond the ones modelled above are allowed. */
  [k: string]: unknown;
}
78
/**
 * Type guard for {@link Handle} messages.
 *
 * Only the `$type` discriminator is inspected; the remaining fields are not
 * validated here.
 *
 * @param v - The value to test.
 * @returns `true` when `v` is an object whose `$type` is the handle type id.
 */
export function isHandle(v: unknown): v is Handle {
  if (!isObj(v) || !hasProp(v, "$type")) {
    return false;
  }
  return v.$type === "com.atproto.sync.subscribeRepos#handle";
}
86
/**
 * Shape of a `com.atproto.sync.subscribeRepos#migrate` message.
 *
 * `migrateTo` is required but nullable in the lexicon definition in this file.
 */
export interface Migrate {
  /** Integer sequence value of this event. */
  seq: number;
  /** String in the lexicon's `did` format. */
  did: string;
  /** Migration target as a string, or `null`. */
  migrateTo: string | null;
  /** Timestamp string; the lexicon declares the `datetime` format. */
  time: string;
  /** Properties beyond the ones modelled above are allowed. */
  [k: string]: unknown;
}
94
/**
 * Type guard for {@link Migrate} messages.
 *
 * Only the `$type` discriminator is inspected; the remaining fields are not
 * validated here.
 *
 * @param v - The value to test.
 * @returns `true` when `v` is an object whose `$type` is the migrate type id.
 */
export function isMigrate(v: unknown): v is Migrate {
  if (!isObj(v) || !hasProp(v, "$type")) {
    return false;
  }
  return v.$type === "com.atproto.sync.subscribeRepos#migrate";
}
102
/**
 * Shape of a `com.atproto.sync.subscribeRepos#tombstone` message.
 *
 * All three named fields are required by the lexicon definition in this file.
 */
export interface Tombstone {
  /** Integer sequence value of this event. */
  seq: number;
  /** String in the lexicon's `did` format. */
  did: string;
  /** Timestamp string; the lexicon declares the `datetime` format. */
  time: string;
  /** Properties beyond the ones modelled above are allowed. */
  [k: string]: unknown;
}
109
/**
 * Type guard for {@link Tombstone} messages.
 *
 * Only the `$type` discriminator is inspected; the remaining fields are not
 * validated here.
 *
 * @param v - The value to test.
 * @returns `true` when `v` is an object whose `$type` is the tombstone type id.
 */
export function isTombstone(v: unknown): v is Tombstone {
  if (!isObj(v) || !hasProp(v, "$type")) {
    return false;
  }
  return v.$type === "com.atproto.sync.subscribeRepos#tombstone";
}
117
/**
 * Shape of a `com.atproto.sync.subscribeRepos#info` message.
 *
 * Only `name` is required by the lexicon definition in this file.
 */
export interface Info {
  /**
   * Info name. `"OutdatedCursor"` is the one known value; the `string & {}`
   * member keeps the type open to any other string.
   */
  name: "OutdatedCursor" | (string & {});
  /** Optional message string. */
  message?: string;
  /** Properties beyond the ones modelled above are allowed. */
  [k: string]: unknown;
}
123
/**
 * Type guard for {@link Info} messages.
 *
 * Only the `$type` discriminator is inspected; the remaining fields are not
 * validated here.
 *
 * @param v - The value to test.
 * @returns `true` when `v` is an object whose `$type` is the info type id.
 */
export function isInfo(v: unknown): v is Info {
  if (!isObj(v) || !hasProp(v, "$type")) {
    return false;
  }
  return v.$type === "com.atproto.sync.subscribeRepos#info";
}
131
/**
 * A repo operation, ie a write of a single record.
 *
 * For creates and updates, `cid` is the record's CID as of this operation.
 * For deletes, it's null.
 */
export interface RepoOp {
  /**
   * Kind of write. `create`, `update` and `delete` are the known values; the
   * `string & {}` member keeps the type open to any other string.
   */
  action: "create" | "update" | "delete" | (string & {});
  /** Path string of the operation. */
  path: string;
  /** CID link, or `null` (required but nullable in the lexicon). */
  cid: CID | null;
  /** Properties beyond the ones modelled above are allowed. */
  [k: string]: unknown;
}
139
/**
 * Type guard for {@link RepoOp} objects.
 *
 * Only the `$type` discriminator is inspected; the remaining fields are not
 * validated here.
 *
 * @param v - The value to test.
 * @returns `true` when `v` is an object whose `$type` is the repoOp type id.
 */
export function isRepoOp(v: unknown): v is RepoOp {
  if (!isObj(v) || !hasProp(v, "$type")) {
    return false;
  }
  return v.$type === "com.atproto.sync.subscribeRepos#repoOp";
}
147
/**
 * Lexicon document for the `com.atproto.sync.subscribeRepos` subscription.
 *
 * `defs.main` describes the subscription itself (query parameters, the union
 * of message types, and the named errors). The remaining defs describe each
 * message type plus the `repoOp` object referenced from `commit.ops`.
 */
export const ComAtprotoSyncSubscribeRepos: LexiconDoc = {
  lexicon: 1,
  id: "com.atproto.sync.subscribeRepos",
  defs: {
    // Subscription entry point.
    main: {
      type: "subscription",
      description: "Subscribe to repo updates",
      parameters: {
        type: "params",
        properties: {
          cursor: {
            type: "integer",
            description: "The last known event to backfill from.",
          },
        },
      },
      message: {
        schema: {
          type: "union",
          refs: [
            "lex:com.atproto.sync.subscribeRepos#commit",
            "lex:com.atproto.sync.subscribeRepos#handle",
            "lex:com.atproto.sync.subscribeRepos#migrate",
            "lex:com.atproto.sync.subscribeRepos#tombstone",
            "lex:com.atproto.sync.subscribeRepos#info",
          ],
        },
      },
      errors: [{ name: "FutureCursor" }, { name: "ConsumerTooSlow" }],
    },

    // Message type: commit. `prev` is nullable but not required.
    commit: {
      type: "object",
      required: [
        "seq",
        "rebase",
        "tooBig",
        "repo",
        "commit",
        "rev",
        "since",
        "blocks",
        "ops",
        "blobs",
        "time",
      ],
      nullable: ["prev", "since"],
      properties: {
        seq: { type: "integer" },
        rebase: { type: "boolean" },
        tooBig: { type: "boolean" },
        repo: { type: "string", format: "did" },
        commit: { type: "cid-link" },
        prev: { type: "cid-link" },
        rev: {
          type: "string",
          description: "The rev of the emitted commit",
        },
        since: {
          type: "string",
          description: "The rev of the last emitted commit from this repo",
        },
        blocks: {
          type: "bytes",
          description: "CAR file containing relevant blocks",
          maxLength: 1000000,
        },
        ops: {
          type: "array",
          items: {
            type: "ref",
            ref: "lex:com.atproto.sync.subscribeRepos#repoOp",
          },
          maxLength: 200,
        },
        blobs: {
          type: "array",
          items: { type: "cid-link" },
        },
        time: { type: "string", format: "datetime" },
      },
    },

    // Message type: handle.
    handle: {
      type: "object",
      required: ["seq", "did", "handle", "time"],
      properties: {
        seq: { type: "integer" },
        did: { type: "string", format: "did" },
        handle: { type: "string", format: "handle" },
        time: { type: "string", format: "datetime" },
      },
    },

    // Message type: migrate. `migrateTo` is required yet nullable.
    migrate: {
      type: "object",
      required: ["seq", "did", "migrateTo", "time"],
      nullable: ["migrateTo"],
      properties: {
        seq: { type: "integer" },
        did: { type: "string", format: "did" },
        migrateTo: { type: "string" },
        time: { type: "string", format: "datetime" },
      },
    },

    // Message type: tombstone.
    tombstone: {
      type: "object",
      required: ["seq", "did", "time"],
      properties: {
        seq: { type: "integer" },
        did: { type: "string", format: "did" },
        time: { type: "string", format: "datetime" },
      },
    },

    // Message type: info.
    info: {
      type: "object",
      required: ["name"],
      properties: {
        name: { type: "string", knownValues: ["OutdatedCursor"] },
        message: { type: "string" },
      },
    },

    // Object referenced by `commit.ops` items.
    repoOp: {
      type: "object",
      description:
        "A repo operation, ie a write of a single record. For creates and updates, cid is the record's CID as of this operation. For deletes, it's null.",
      required: ["action", "path", "cid"],
      nullable: ["cid"],
      properties: {
        action: {
          type: "string",
          knownValues: ["create", "update", "delete"],
        },
        path: { type: "string" },
        cid: { type: "cid-link" },
      },
    },
  },
};
347
// Lexicon registry used for validation; it holds only the subscribeRepos document.
const lexicons = new Lexicons([ComAtprotoSyncSubscribeRepos]);

/**
 * Validates a raw subscription message against the
 * `com.atproto.sync.subscribeRepos` lexicon.
 *
 * Despite the `is…` name, this does not return a boolean: it returns the
 * result of `assertValidXrpcMessage`, typed as {@link RepoEvent}.
 * NOTE(review): the `assert…` call is expected to throw on an invalid message
 * rather than return a falsy value; confirm against `@atproto/lexicon`.
 *
 * @param evt - The decoded message to validate.
 * @returns The value produced by the validator, typed as a `RepoEvent`.
 */
export const isValidRepoEvent = (evt: unknown) =>
  lexicons.assertValidXrpcMessage<RepoEvent>(
    "com.atproto.sync.subscribeRepos",
    evt
  );
355};