Scratch space for learning atproto app development
1import type { IncomingMessage } from 'node:http'
2
3import { type LexiconDoc, Lexicons } from '@atproto/lexicon'
4import type { ErrorFrame, HandlerAuth } from '@atproto/xrpc-server'
5import type { CID } from 'multiformats/cid'
6
7// @NOTE: this file is an ugly copy job of codegen output. I'd like to clean this whole thing up
8
/**
 * Type guard for "non-null object".
 *
 * Returns true for anything whose `typeof` is `'object'` except `null`
 * (so arrays and class instances pass; functions and primitives do not).
 */
export function isObj(v: unknown): v is Record<string, unknown> {
  // `typeof null` is also 'object', so rule null out first.
  if (v === null) return false
  return typeof v === 'object'
}
12
/**
 * Type guard reporting whether `prop` exists on `data`, either as an own
 * property or anywhere on its prototype chain (same semantics as `prop in data`).
 *
 * @param data - Object to inspect.
 * @param prop - Property key (string, number or symbol) to look for.
 * @returns true when the key is present; narrows `data` to `Record<K, unknown>`.
 */
export function hasProp<K extends PropertyKey>(data: object, prop: K): data is Record<K, unknown> {
  // Reflect.has is the function form of the `in` operator.
  return Reflect.has(data, prop)
}
16
/** Query parameters of the subscription; handed to handlers as `HandlerReqCtx.params`. */
export interface QueryParams {
  /** The last known event seq number to backfill from. Optional. */
  cursor?: number
}
21
/**
 * Union of every message shape a handler may yield as an event: the named
 * event interfaces declared in this file plus an open-ended fallback for any
 * other object that carries a string `$type`.
 */
export type RepoEvent =
  | Commit
  | Identity
  | Account
  | Handle
  | Migrate
  | Tombstone
  | Info
  // Catch-all member: keeps the union open to `$type` values not modelled above.
  | { $type: string; [k: string]: unknown }
/** Error frame limited to the two error names the subscription lexicon declares. */
export type HandlerError = ErrorFrame<'FutureCursor' | 'ConsumerTooSlow'>
/** Anything a handler may yield: an error frame or a repo event. */
export type HandlerOutput = HandlerError | RepoEvent
/** Context object passed to a subscription handler. */
export type HandlerReqCtx<HA extends HandlerAuth = never> = {
  /** Auth value; its type is chosen through `HA` and defaults to `never`. */
  auth: HA
  /** Parsed query parameters (see `QueryParams`). */
  params: QueryParams
  /** The Node.js HTTP request object. */
  req: IncomingMessage
  /** NOTE(review): presumably aborted when the subscription is torn down — confirm against @atproto/xrpc-server. */
  signal: AbortSignal
}
/** Subscription handler: receives the request context and returns an async iterable of outputs. */
export type Handler<HA extends HandlerAuth = never> = (ctx: HandlerReqCtx<HA>) => AsyncIterable<HandlerOutput>
40
/** Represents an update of repository state. Note that empty commits are allowed, which include no repo data changes, but an update to rev and signature. */
export interface Commit {
  /** The stream sequence number of this message. */
  seq: number
  /** DEPRECATED -- unused */
  rebase: boolean
  /** Indicates that this commit contained too many ops, or data size was too large. Consumers will need to make a separate request to get missing data. */
  tooBig: boolean
  /** The repo this event comes from. */
  repo: string
  /** Repo commit object CID. */
  commit: CID
  /** DEPRECATED -- unused. WARNING -- nullable and optional; stick with optional to ensure golang interoperability. */
  prev?: CID | null
  /** The rev of the emitted commit. Note that this information is also in the commit object included in blocks, unless this is a tooBig event. */
  rev: string
  /** The rev of the last emitted commit from this repo (if any). */
  since: string | null
  /** CAR file containing relevant blocks, as a diff since the previous repo state. */
  blocks: Uint8Array
  /** Operations carried by this commit; each entry is a RepoOp (a mutation of a single record). */
  ops: RepoOp[]
  /** NOTE(review): presumably the CIDs of blobs referenced by this commit's records — confirm against the atproto sync spec. */
  blobs: CID[]
  /** Timestamp of when this message was originally broadcast. */
  time: string
  /** Index signature: additional, unlisted properties are allowed and typed `unknown`. */
  [k: string]: unknown
}
67
/**
 * Type guard for `#commit` events.
 *
 * Only the `$type` tag is inspected; no other Commit field is checked.
 */
export function isCommit(v: unknown): v is Commit {
  if (!isObj(v) || !hasProp(v, '$type')) return false
  return v.$type === 'com.atproto.sync.subscribeRepos#commit'
}
71
/** Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache. */
export interface Identity {
  /** NOTE(review): presumably the stream sequence number, as on Commit — confirm. */
  seq: number
  /** The account the identity change applies to. */
  did: string
  /** NOTE(review): presumably the broadcast timestamp, as on Commit — confirm. */
  time: string
  /** The current handle for the account, or 'handle.invalid' if validation fails. This field is optional, might have been validated or passed-through from an upstream source. Semantics and behaviors for PDS vs Relay may evolve in the future; see atproto specs for more details. */
  handle?: string
  /** Index signature: additional, unlisted properties are allowed and typed `unknown`. */
  [k: string]: unknown
}
81
/**
 * Type guard for `#identity` events.
 *
 * Only the `$type` tag is inspected; no other Identity field is checked.
 */
export function isIdentity(v: unknown): v is Identity {
  if (!isObj(v) || !hasProp(v, '$type')) return false
  return v.$type === 'com.atproto.sync.subscribeRepos#identity'
}
85
/** Represents a change to an account's status on a host (eg, PDS or Relay). The semantics of this event are that the status is at the host which emitted the event, not necessarily that at the currently active PDS. Eg, a Relay takedown would emit a takedown with active=false, even if the PDS is still active. */
export interface Account {
  /** NOTE(review): presumably the stream sequence number, as on Commit — confirm. */
  seq: number
  /** The account whose status changed. */
  did: string
  /** NOTE(review): presumably the broadcast timestamp, as on Commit — confirm. */
  time: string
  /** Indicates that the account has a repository which can be fetched from the host that emitted this event. */
  active: boolean
  /** If active=false, this optional field indicates a reason for why the account is not active. The `(string & {})` member keeps the type open: any other string is accepted too. */
  status?: 'takendown' | 'suspended' | 'deleted' | 'deactivated' | (string & {})
  /** Index signature: additional, unlisted properties are allowed and typed `unknown`. */
  [k: string]: unknown
}
97
/**
 * Type guard for `#account` events.
 *
 * Only the `$type` tag is inspected; no other Account field is checked.
 */
export function isAccount(v: unknown): v is Account {
  if (!isObj(v) || !hasProp(v, '$type')) return false
  return v.$type === 'com.atproto.sync.subscribeRepos#account'
}
101
/** DEPRECATED -- Use #identity event instead */
export interface Handle {
  /** Integer per the lexicon; NOTE(review): presumably the stream sequence number — confirm. */
  seq: number
  /** DID-formatted string (lexicon `format: 'did'`). */
  did: string
  /** Handle-formatted string (lexicon `format: 'handle'`). */
  handle: string
  /** Datetime-formatted string (lexicon `format: 'datetime'`). */
  time: string
  /** Index signature: additional, unlisted properties are allowed and typed `unknown`. */
  [k: string]: unknown
}
110
/**
 * Type guard for the deprecated `#handle` events.
 *
 * Only the `$type` tag is inspected; no other Handle field is checked.
 */
export function isHandle(v: unknown): v is Handle {
  if (!isObj(v) || !hasProp(v, '$type')) return false
  return v.$type === 'com.atproto.sync.subscribeRepos#handle'
}
114
/** DEPRECATED -- Use #account event instead */
export interface Migrate {
  /** Integer per the lexicon; NOTE(review): presumably the stream sequence number — confirm. */
  seq: number
  /** DID-formatted string (lexicon `format: 'did'`). */
  did: string
  /** Required but nullable: the lexicon lists it under both `required` and `nullable`. */
  migrateTo: string | null
  /** Datetime-formatted string (lexicon `format: 'datetime'`). */
  time: string
  /** Index signature: additional, unlisted properties are allowed and typed `unknown`. */
  [k: string]: unknown
}
123
/**
 * Type guard for the deprecated `#migrate` events.
 *
 * Only the `$type` tag is inspected; no other Migrate field is checked.
 */
export function isMigrate(v: unknown): v is Migrate {
  if (!isObj(v) || !hasProp(v, '$type')) return false
  return v.$type === 'com.atproto.sync.subscribeRepos#migrate'
}
127
/** DEPRECATED -- Use #account event instead */
export interface Tombstone {
  /** Integer per the lexicon; NOTE(review): presumably the stream sequence number — confirm. */
  seq: number
  /** DID-formatted string (lexicon `format: 'did'`). */
  did: string
  /** Datetime-formatted string (lexicon `format: 'datetime'`). */
  time: string
  /** Index signature: additional, unlisted properties are allowed and typed `unknown`. */
  [k: string]: unknown
}
135
/**
 * Type guard for the deprecated `#tombstone` events.
 *
 * Only the `$type` tag is inspected; no other Tombstone field is checked.
 */
export function isTombstone(v: unknown): v is Tombstone {
  if (!isObj(v) || !hasProp(v, '$type')) return false
  return v.$type === 'com.atproto.sync.subscribeRepos#tombstone'
}
139
/** Shape of `#info` messages: a named notice with an optional free-text message. */
export interface Info {
  /** Notice name. `'OutdatedCursor'` is the one known value; any other string is accepted as well. */
  name: 'OutdatedCursor' | (string & {})
  /** Optional text accompanying the notice. */
  message?: string
  /** Index signature: additional, unlisted properties are allowed and typed `unknown`. */
  [k: string]: unknown
}
145
/**
 * Type guard for `#info` messages.
 *
 * Only the `$type` tag is inspected; no other Info field is checked.
 */
export function isInfo(v: unknown): v is Info {
  if (!isObj(v) || !hasProp(v, '$type')) return false
  return v.$type === 'com.atproto.sync.subscribeRepos#info'
}
149
/** A repo operation, ie a mutation of a single record. */
export interface RepoOp {
  /** Kind of mutation. `'create'`, `'update'` and `'delete'` are the known values; any other string is accepted as well. */
  action: 'create' | 'update' | 'delete' | (string & {})
  /** NOTE(review): presumably the record's path within the repo (`<collection>/<rkey>`) — confirm against the atproto repo spec. */
  path: string
  /** For creates and updates, the new record CID. For deletions, null. */
  cid: CID | null
  /** Index signature: additional, unlisted properties are allowed and typed `unknown`. */
  [k: string]: unknown
}
158
/**
 * Type guard for `#repoOp` objects.
 *
 * Only the `$type` tag is inspected, so an op object that carries no `$type`
 * is reported as not-a-RepoOp even if its other fields match.
 */
export function isRepoOp(v: unknown): v is RepoOp {
  if (!isObj(v) || !hasProp(v, '$type')) return false
  return v.$type === 'com.atproto.sync.subscribeRepos#repoOp'
}
162
/**
 * Lexicon document for the `com.atproto.sync.subscribeRepos` subscription.
 *
 * `defs.main` describes the subscription itself (query parameters, the union
 * of message types, and the declared error names). Every other entry in
 * `defs` is the object schema for one message type, or for `repoOp`, which
 * `commit.ops` refers to.
 *
 * The message union lists every event interface modelled in this file,
 * including `identity` and `account`. The union is not `closed`, so a `$type`
 * missing from `refs` is presumably accepted without field validation
 * (NOTE(review): confirm against @atproto/lexicon's union handling); listing
 * both refs with matching defs makes those events schema-checked like the rest.
 */
export const ComAtprotoSyncSubscribeRepos: LexiconDoc = {
  lexicon: 1,
  id: 'com.atproto.sync.subscribeRepos',
  defs: {
    main: {
      type: 'subscription',
      description: 'Subscribe to repo updates',
      parameters: {
        type: 'params',
        properties: {
          cursor: {
            type: 'integer',
            description: 'The last known event to backfill from.',
          },
        },
      },
      message: {
        schema: {
          type: 'union',
          // Same members, in the same order, as the RepoEvent type union.
          refs: [
            'lex:com.atproto.sync.subscribeRepos#commit',
            'lex:com.atproto.sync.subscribeRepos#identity',
            'lex:com.atproto.sync.subscribeRepos#account',
            'lex:com.atproto.sync.subscribeRepos#handle',
            'lex:com.atproto.sync.subscribeRepos#migrate',
            'lex:com.atproto.sync.subscribeRepos#tombstone',
            'lex:com.atproto.sync.subscribeRepos#info',
          ],
        },
      },
      errors: [
        {
          name: 'FutureCursor',
        },
        {
          name: 'ConsumerTooSlow',
        },
      ],
    },
    commit: {
      type: 'object',
      required: ['seq', 'rebase', 'tooBig', 'repo', 'commit', 'rev', 'since', 'blocks', 'ops', 'blobs', 'time'],
      nullable: ['prev', 'since'],
      properties: {
        seq: {
          type: 'integer',
        },
        rebase: {
          type: 'boolean',
        },
        tooBig: {
          type: 'boolean',
        },
        repo: {
          type: 'string',
          format: 'did',
        },
        commit: {
          type: 'cid-link',
        },
        prev: {
          type: 'cid-link',
        },
        rev: {
          type: 'string',
          description: 'The rev of the emitted commit',
        },
        since: {
          type: 'string',
          description: 'The rev of the last emitted commit from this repo',
        },
        blocks: {
          type: 'bytes',
          description: 'CAR file containing relevant blocks',
          maxLength: 1000000,
        },
        ops: {
          type: 'array',
          items: {
            type: 'ref',
            ref: 'lex:com.atproto.sync.subscribeRepos#repoOp',
          },
          maxLength: 200,
        },
        blobs: {
          type: 'array',
          items: {
            type: 'cid-link',
          },
        },
        time: {
          type: 'string',
          format: 'datetime',
        },
      },
    },
    // Mirrors the Identity interface: seq/did/time required, handle optional.
    identity: {
      type: 'object',
      required: ['seq', 'did', 'time'],
      properties: {
        seq: {
          type: 'integer',
        },
        did: {
          type: 'string',
          format: 'did',
        },
        time: {
          type: 'string',
          format: 'datetime',
        },
        handle: {
          type: 'string',
          format: 'handle',
        },
      },
    },
    // Mirrors the Account interface: seq/did/time/active required, status optional.
    account: {
      type: 'object',
      required: ['seq', 'did', 'time', 'active'],
      properties: {
        seq: {
          type: 'integer',
        },
        did: {
          type: 'string',
          format: 'did',
        },
        time: {
          type: 'string',
          format: 'datetime',
        },
        active: {
          type: 'boolean',
        },
        status: {
          type: 'string',
          knownValues: ['takendown', 'suspended', 'deleted', 'deactivated'],
        },
      },
    },
    handle: {
      type: 'object',
      required: ['seq', 'did', 'handle', 'time'],
      properties: {
        seq: {
          type: 'integer',
        },
        did: {
          type: 'string',
          format: 'did',
        },
        handle: {
          type: 'string',
          format: 'handle',
        },
        time: {
          type: 'string',
          format: 'datetime',
        },
      },
    },
    migrate: {
      type: 'object',
      required: ['seq', 'did', 'migrateTo', 'time'],
      nullable: ['migrateTo'],
      properties: {
        seq: {
          type: 'integer',
        },
        did: {
          type: 'string',
          format: 'did',
        },
        migrateTo: {
          type: 'string',
        },
        time: {
          type: 'string',
          format: 'datetime',
        },
      },
    },
    tombstone: {
      type: 'object',
      required: ['seq', 'did', 'time'],
      properties: {
        seq: {
          type: 'integer',
        },
        did: {
          type: 'string',
          format: 'did',
        },
        time: {
          type: 'string',
          format: 'datetime',
        },
      },
    },
    info: {
      type: 'object',
      required: ['name'],
      properties: {
        name: {
          type: 'string',
          knownValues: ['OutdatedCursor'],
        },
        message: {
          type: 'string',
        },
      },
    },
    repoOp: {
      type: 'object',
      description:
        "A repo operation, ie a write of a single record. For creates and updates, cid is the record's CID as of this operation. For deletes, it's null.",
      required: ['action', 'path', 'cid'],
      nullable: ['cid'],
      properties: {
        action: {
          type: 'string',
          knownValues: ['create', 'update', 'delete'],
        },
        path: {
          type: 'string',
        },
        cid: {
          type: 'cid-link',
        },
      },
    },
  },
}
350
// Schema registry holding only the subscribeRepos lexicon; used to validate stream messages.
const lexicons = new Lexicons([ComAtprotoSyncSubscribeRepos])

/**
 * Checks `evt` against the `com.atproto.sync.subscribeRepos` message schema.
 *
 * NOTE(review): despite the `is…` prefix this does not return a boolean. It
 * returns whatever `assertValidXrpcMessage` returns, typed as RepoEvent, and
 * the assert-style call presumably throws when validation fails — confirm
 * against @atproto/lexicon before using it in a condition.
 *
 * @param evt - Decoded message of unknown shape.
 * @returns The result of `assertValidXrpcMessage`, typed as RepoEvent.
 */
export const isValidRepoEvent = (evt: unknown): RepoEvent =>
  lexicons.assertValidXrpcMessage<RepoEvent>('com.atproto.sync.subscribeRepos', evt)