Scratch space for learning atproto app development
import type { IncomingMessage } from "node:http";

import { type LexiconDoc, Lexicons } from "@atproto/lexicon";
import type { ErrorFrame, HandlerAuth } from "@atproto/xrpc-server";
import type { CID } from "multiformats/cid";

// @NOTE: this file is an ugly copy job of codegen output. I'd like to clean this whole thing up

/** Narrows an unknown value to a non-null object (arrays also satisfy `typeof === "object"`). */
export function isObj(v: unknown): v is Record<string, unknown> {
  return typeof v === "object" && v !== null;
}

/** Narrows `data` to an object known to carry the key `prop` (checked with the `in` operator). */
export function hasProp<K extends PropertyKey>(
  data: object,
  prop: K
): data is Record<K, unknown> {
  return prop in data;
}

/**
 * Shared implementation of the `is*` guards below: true when `v` is an object
 * whose `$type` property is strictly equal to `type`.
 */
function hasLexiconType(v: unknown, type: string): boolean {
  return isObj(v) && hasProp(v, "$type") && v.$type === type;
}

/** Query parameters accepted by the subscription. */
export interface QueryParams {
  /** The last known event seq number to backfill from. */
  cursor?: number;
}

/** Any message the subscription can emit; the last member keeps the union open to unlisted `$type`s. */
export type RepoEvent =
  | Commit
  | Identity
  | Account
  | Handle
  | Migrate
  | Tombstone
  | Info
  | { $type: string; [k: string]: unknown };
export type HandlerError = ErrorFrame<"FutureCursor" | "ConsumerTooSlow">;
export type HandlerOutput = HandlerError | RepoEvent;
export type HandlerReqCtx<HA extends HandlerAuth = never> = {
  auth: HA;
  params: QueryParams;
  req: IncomingMessage;
  signal: AbortSignal;
};
export type Handler<HA extends HandlerAuth = never> = (
  ctx: HandlerReqCtx<HA>
) => AsyncIterable<HandlerOutput>;

/** Represents an update of repository state. Note that empty commits are allowed, which include no repo data changes, but an update to rev and signature. */
export interface Commit {
  /** The stream sequence number of this message. */
  seq: number;
  /** DEPRECATED -- unused */
  rebase: boolean;
  /** Indicates that this commit contained too many ops, or data size was too large. Consumers will need to make a separate request to get missing data. */
  tooBig: boolean;
  /** The repo this event comes from. */
  repo: string;
  /** Repo commit object CID. */
  commit: CID;
  /** DEPRECATED -- unused. WARNING -- nullable and optional; stick with optional to ensure golang interoperability. */
  prev?: CID | null;
  /** The rev of the emitted commit. Note that this information is also in the commit object included in blocks, unless this is a tooBig event. */
  rev: string;
  /** The rev of the last emitted commit from this repo (if any). */
  since: string | null;
  /** CAR file containing relevant blocks, as a diff since the previous repo state. */
  blocks: Uint8Array;
  ops: RepoOp[];
  blobs: CID[];
  /** Timestamp of when this message was originally broadcast. */
  time: string;
  [k: string]: unknown;
}

/** True when `v.$type` is the `#commit` type. Only `$type` is inspected; other fields are not checked. */
export function isCommit(v: unknown): v is Commit {
  return hasLexiconType(v, "com.atproto.sync.subscribeRepos#commit");
}

/** Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache. */
export interface Identity {
  seq: number;
  did: string;
  time: string;
  /** The current handle for the account, or 'handle.invalid' if validation fails. This field is optional, might have been validated or passed-through from an upstream source. Semantics and behaviors for PDS vs Relay may evolve in the future; see atproto specs for more details. */
  handle?: string;
  [k: string]: unknown;
}

/** True when `v.$type` is the `#identity` type. Only `$type` is inspected; other fields are not checked. */
export function isIdentity(v: unknown): v is Identity {
  return hasLexiconType(v, "com.atproto.sync.subscribeRepos#identity");
}

/** Represents a change to an account's status on a host (eg, PDS or Relay). The semantics of this event are that the status is at the host which emitted the event, not necessarily that at the currently active PDS. Eg, a Relay takedown would emit a takedown with active=false, even if the PDS is still active. */
export interface Account {
  seq: number;
  did: string;
  time: string;
  /** Indicates that the account has a repository which can be fetched from the host that emitted this event. */
  active: boolean;
  /** If active=false, this optional field indicates a reason for why the account is not active. */
  status?:
    | "takendown"
    | "suspended"
    | "deleted"
    | "deactivated"
    | (string & {});
  [k: string]: unknown;
}

/** True when `v.$type` is the `#account` type. Only `$type` is inspected; other fields are not checked. */
export function isAccount(v: unknown): v is Account {
  return hasLexiconType(v, "com.atproto.sync.subscribeRepos#account");
}

/** DEPRECATED -- Use #identity event instead */
export interface Handle {
  seq: number;
  did: string;
  handle: string;
  time: string;
  [k: string]: unknown;
}

/** True when `v.$type` is the `#handle` type. Only `$type` is inspected; other fields are not checked. */
export function isHandle(v: unknown): v is Handle {
  return hasLexiconType(v, "com.atproto.sync.subscribeRepos#handle");
}

/** DEPRECATED -- Use #account event instead */
export interface Migrate {
  seq: number;
  did: string;
  migrateTo: string | null;
  time: string;
  [k: string]: unknown;
}

/** True when `v.$type` is the `#migrate` type. Only `$type` is inspected; other fields are not checked. */
export function isMigrate(v: unknown): v is Migrate {
  return hasLexiconType(v, "com.atproto.sync.subscribeRepos#migrate");
}

/** DEPRECATED -- Use #account event instead */
export interface Tombstone {
  seq: number;
  did: string;
  time: string;
  [k: string]: unknown;
}

/** True when `v.$type` is the `#tombstone` type. Only `$type` is inspected; other fields are not checked. */
export function isTombstone(v: unknown): v is Tombstone {
  return hasLexiconType(v, "com.atproto.sync.subscribeRepos#tombstone");
}

/** Payload of a `#info` message. */
export interface Info {
  name: "OutdatedCursor" | (string & {});
  message?: string;
  [k: string]: unknown;
}

/** True when `v.$type` is the `#info` type. Only `$type` is inspected; other fields are not checked. */
export function isInfo(v: unknown): v is Info {
  return hasLexiconType(v, "com.atproto.sync.subscribeRepos#info");
}

/** A repo operation, ie a mutation of a single record. */
export interface RepoOp {
  action: "create" | "update" | "delete" | (string & {});
  path: string;
  /** For creates and updates, the new record CID. For deletions, null. */
  cid: CID | null;
  [k: string]: unknown;
}

/** True when `v.$type` is the `#repoOp` type. Only `$type` is inspected; other fields are not checked. */
export function isRepoOp(v: unknown): v is RepoOp {
  return hasLexiconType(v, "com.atproto.sync.subscribeRepos#repoOp");
}

/**
 * Lexicon document for the `com.atproto.sync.subscribeRepos` subscription.
 *
 * Every named message type in the `RepoEvent` union (`commit`, `identity`,
 * `account`, `handle`, `migrate`, `tombstone`, `info`) is listed in the
 * message union and has a def here, so each one has a schema to be validated
 * against. The `identity` and `account` defs mirror the `Identity` and
 * `Account` interfaces declared above.
 */
export const ComAtprotoSyncSubscribeRepos: LexiconDoc = {
  lexicon: 1,
  id: "com.atproto.sync.subscribeRepos",
  defs: {
    main: {
      type: "subscription",
      description: "Subscribe to repo updates",
      parameters: {
        type: "params",
        properties: {
          cursor: {
            type: "integer",
            description: "The last known event to backfill from.",
          },
        },
      },
      message: {
        schema: {
          type: "union",
          refs: [
            "lex:com.atproto.sync.subscribeRepos#commit",
            "lex:com.atproto.sync.subscribeRepos#identity",
            "lex:com.atproto.sync.subscribeRepos#account",
            "lex:com.atproto.sync.subscribeRepos#handle",
            "lex:com.atproto.sync.subscribeRepos#migrate",
            "lex:com.atproto.sync.subscribeRepos#tombstone",
            "lex:com.atproto.sync.subscribeRepos#info",
          ],
        },
      },
      errors: [
        {
          name: "FutureCursor",
        },
        {
          name: "ConsumerTooSlow",
        },
      ],
    },
    commit: {
      type: "object",
      required: [
        "seq",
        "rebase",
        "tooBig",
        "repo",
        "commit",
        "rev",
        "since",
        "blocks",
        "ops",
        "blobs",
        "time",
      ],
      nullable: ["prev", "since"],
      properties: {
        seq: {
          type: "integer",
        },
        rebase: {
          type: "boolean",
        },
        tooBig: {
          type: "boolean",
        },
        repo: {
          type: "string",
          format: "did",
        },
        commit: {
          type: "cid-link",
        },
        prev: {
          type: "cid-link",
        },
        rev: {
          type: "string",
          description: "The rev of the emitted commit",
        },
        since: {
          type: "string",
          description: "The rev of the last emitted commit from this repo",
        },
        blocks: {
          type: "bytes",
          description: "CAR file containing relevant blocks",
          maxLength: 1000000,
        },
        ops: {
          type: "array",
          items: {
            type: "ref",
            ref: "lex:com.atproto.sync.subscribeRepos#repoOp",
          },
          maxLength: 200,
        },
        blobs: {
          type: "array",
          items: {
            type: "cid-link",
          },
        },
        time: {
          type: "string",
          format: "datetime",
        },
      },
    },
    identity: {
      type: "object",
      description:
        "Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache.",
      // `handle` is optional on the Identity interface, so it is not required here.
      required: ["seq", "did", "time"],
      properties: {
        seq: {
          type: "integer",
        },
        did: {
          type: "string",
          format: "did",
        },
        time: {
          type: "string",
          format: "datetime",
        },
        handle: {
          type: "string",
          format: "handle",
          description:
            "The current handle for the account, or 'handle.invalid' if validation fails.",
        },
      },
    },
    account: {
      type: "object",
      description:
        "Represents a change to an account's status on a host (eg, PDS or Relay).",
      // `status` is optional on the Account interface, so it is not required here.
      required: ["seq", "did", "time", "active"],
      properties: {
        seq: {
          type: "integer",
        },
        did: {
          type: "string",
          format: "did",
        },
        time: {
          type: "string",
          format: "datetime",
        },
        active: {
          type: "boolean",
          description:
            "Indicates that the account has a repository which can be fetched from the host that emitted this event.",
        },
        status: {
          type: "string",
          description:
            "If active=false, this optional field indicates a reason for why the account is not active.",
          knownValues: ["takendown", "suspended", "deleted", "deactivated"],
        },
      },
    },
    handle: {
      type: "object",
      required: ["seq", "did", "handle", "time"],
      properties: {
        seq: {
          type: "integer",
        },
        did: {
          type: "string",
          format: "did",
        },
        handle: {
          type: "string",
          format: "handle",
        },
        time: {
          type: "string",
          format: "datetime",
        },
      },
    },
    migrate: {
      type: "object",
      required: ["seq", "did", "migrateTo", "time"],
      nullable: ["migrateTo"],
      properties: {
        seq: {
          type: "integer",
        },
        did: {
          type: "string",
          format: "did",
        },
        migrateTo: {
          type: "string",
        },
        time: {
          type: "string",
          format: "datetime",
        },
      },
    },
    tombstone: {
      type: "object",
      required: ["seq", "did", "time"],
      properties: {
        seq: {
          type: "integer",
        },
        did: {
          type: "string",
          format: "did",
        },
        time: {
          type: "string",
          format: "datetime",
        },
      },
    },
    info: {
      type: "object",
      required: ["name"],
      properties: {
        name: {
          type: "string",
          knownValues: ["OutdatedCursor"],
        },
        message: {
          type: "string",
        },
      },
    },
    repoOp: {
      type: "object",
      description:
        "A repo operation, ie a write of a single record. For creates and updates, cid is the record's CID as of this operation. For deletes, it's null.",
      required: ["action", "path", "cid"],
      nullable: ["cid"],
      properties: {
        action: {
          type: "string",
          knownValues: ["create", "update", "delete"],
        },
        path: {
          type: "string",
        },
        cid: {
          type: "cid-link",
        },
      },
    },
  },
};

// Lexicon registry holding only the subscribeRepos document defined above.
const lexicons = new Lexicons([ComAtprotoSyncSubscribeRepos]);

/**
 * Checks `evt` against the `com.atproto.sync.subscribeRepos` message schema
 * and returns whatever `Lexicons.assertValidXrpcMessage` returns, typed as
 * `RepoEvent`.
 *
 * NOTE: despite the `is` prefix this is not a boolean predicate. Per the
 * `assert*` contract of `@atproto/lexicon`, a failed validation is expected to
 * throw rather than return `false` — callers should wrap in try/catch.
 *
 * @param evt - Decoded message body to validate.
 * @returns The validated event.
 */
export const isValidRepoEvent = (evt: unknown): RepoEvent => {
  return lexicons.assertValidXrpcMessage<RepoEvent>(
    "com.atproto.sync.subscribeRepos",
    evt
  );
};