Scratch space for learning atproto app development
import type { IncomingMessage } from "node:http";

import { type LexiconDoc, Lexicons } from "@atproto/lexicon";
import type { ErrorFrame, HandlerAuth } from "@atproto/xrpc-server";
import type { CID } from "multiformats/cid";

// @NOTE: this file is an ugly copy job of codegen output. I'd like to clean this whole thing up

/**
 * Narrows an unknown value to a string-keyed record.
 *
 * Accepts anything whose `typeof` is `"object"` except `null`, so arrays and
 * class instances pass as well.
 */
export function isObj(v: unknown): v is Record<string, unknown> {
  return typeof v === "object" && v !== null;
}

/**
 * Narrows `data` to an object known to carry the key `prop`.
 *
 * Uses the `in` operator, so inherited (prototype-chain) properties count too.
 * The value stored under `prop` stays `unknown`.
 */
export function hasProp<K extends PropertyKey>(
  data: object,
  prop: K
): data is Record<K, unknown> {
  return prop in data;
}

/** Query parameters accepted by the subscription. */
export interface QueryParams {
  /** The last known event to backfill from. */
  cursor?: number;
}

/**
 * Any message the subscription can emit: one of the known event shapes, or a
 * catch-all object that only guarantees a string `$type`.
 */
export type RepoEvent =
  | Commit
  | Handle
  | Migrate
  | Tombstone
  | Info
  | { $type: string; [k: string]: unknown };

/** Error frame restricted to the error names declared by this subscription. */
export type HandlerError = ErrorFrame<"FutureCursor" | "ConsumerTooSlow">;

/** A single item yielded by a handler: either an error frame or a repo event. */
export type HandlerOutput = HandlerError | RepoEvent;

/** Context object passed to a subscription handler. */
export type HandlerReqCtx<HA extends HandlerAuth = never> = {
  auth: HA;
  params: QueryParams;
  req: IncomingMessage;
  signal: AbortSignal;
};

/** Subscription handler: takes the request context and yields outputs asynchronously. */
export type Handler<HA extends HandlerAuth = never> = (
  ctx: HandlerReqCtx<HA>
) => AsyncIterable<HandlerOutput>;

/** Shape of a `#commit` event. */
export interface Commit {
  seq: number;
  rebase: boolean;
  tooBig: boolean;
  repo: string;
  commit: CID;
  prev?: CID | null;
  /** The rev of the emitted commit */
  rev: string;
  /** The rev of the last emitted commit from this repo */
  since: string | null;
  /** CAR file containing relevant blocks */
  blocks: Uint8Array;
  ops: RepoOp[];
  blobs: CID[];
  time: string;
  [k: string]: unknown;
}

/**
 * Type guard for {@link Commit}.
 *
 * Only inspects the `$type` discriminator; no other field is checked.
 */
export function isCommit(v: unknown): v is Commit {
  return (
    isObj(v) &&
    hasProp(v, "$type") &&
    v.$type === "com.atproto.sync.subscribeRepos#commit"
  );
}

/** Shape of a `#handle` event. */
export interface Handle {
  seq: number;
  did: string;
  handle: string;
  time: string;
  [k: string]: unknown;
}

/**
 * Type guard for {@link Handle}.
 *
 * Only inspects the `$type` discriminator; no other field is checked.
 */
export function isHandle(v: unknown): v is Handle {
  return (
    isObj(v) &&
    hasProp(v, "$type") &&
    v.$type === "com.atproto.sync.subscribeRepos#handle"
  );
}

/** Shape of a `#migrate` event. */
export interface Migrate {
  seq: number;
  did: string;
  migrateTo: string | null;
  time: string;
  [k: string]: unknown;
}

/**
 * Type guard for {@link Migrate}.
 *
 * Only inspects the `$type` discriminator; no other field is checked.
 */
export function isMigrate(v: unknown): v is Migrate {
  return (
    isObj(v) &&
    hasProp(v, "$type") &&
    v.$type === "com.atproto.sync.subscribeRepos#migrate"
  );
}

/** Shape of a `#tombstone` event. */
export interface Tombstone {
  seq: number;
  did: string;
  time: string;
  [k: string]: unknown;
}

/**
 * Type guard for {@link Tombstone}.
 *
 * Only inspects the `$type` discriminator; no other field is checked.
 */
export function isTombstone(v: unknown): v is Tombstone {
  return (
    isObj(v) &&
    hasProp(v, "$type") &&
    v.$type === "com.atproto.sync.subscribeRepos#tombstone"
  );
}

/** Shape of an `#info` message. */
export interface Info {
  // `(string & {})` keeps the known literal visible to tooling while still
  // accepting any other string.
  name: "OutdatedCursor" | (string & {});
  message?: string;
  [k: string]: unknown;
}

/**
 * Type guard for {@link Info}.
 *
 * Only inspects the `$type` discriminator; no other field is checked.
 */
export function isInfo(v: unknown): v is Info {
  return (
    isObj(v) &&
    hasProp(v, "$type") &&
    v.$type === "com.atproto.sync.subscribeRepos#info"
  );
}

/** A repo operation, ie a write of a single record. For creates and updates, cid is the record's CID as of this operation. For deletes, it's null. */
export interface RepoOp {
  action: "create" | "update" | "delete" | (string & {});
  path: string;
  cid: CID | null;
  [k: string]: unknown;
}

/**
 * Type guard for {@link RepoOp}.
 *
 * Only inspects the `$type` discriminator; no other field is checked.
 */
export function isRepoOp(v: unknown): v is RepoOp {
  return (
    isObj(v) &&
    hasProp(v, "$type") &&
    v.$type === "com.atproto.sync.subscribeRepos#repoOp"
  );
}

/**
 * Lexicon document for `com.atproto.sync.subscribeRepos`.
 *
 * Declares the subscription (`main`) together with the object definitions
 * referenced by its message union (`commit`, `handle`, `migrate`, `tombstone`,
 * `info`) and the `repoOp` definition referenced by `commit.ops`.
 */
export const ComAtprotoSyncSubscribeRepos: LexiconDoc = {
  lexicon: 1,
  id: "com.atproto.sync.subscribeRepos",
  defs: {
    main: {
      type: "subscription",
      description: "Subscribe to repo updates",
      parameters: {
        type: "params",
        properties: {
          cursor: {
            type: "integer",
            description: "The last known event to backfill from.",
          },
        },
      },
      message: {
        schema: {
          type: "union",
          refs: [
            "lex:com.atproto.sync.subscribeRepos#commit",
            "lex:com.atproto.sync.subscribeRepos#handle",
            "lex:com.atproto.sync.subscribeRepos#migrate",
            "lex:com.atproto.sync.subscribeRepos#tombstone",
            "lex:com.atproto.sync.subscribeRepos#info",
          ],
        },
      },
      errors: [
        {
          name: "FutureCursor",
        },
        {
          name: "ConsumerTooSlow",
        },
      ],
    },
    commit: {
      type: "object",
      required: [
        "seq",
        "rebase",
        "tooBig",
        "repo",
        "commit",
        "rev",
        "since",
        "blocks",
        "ops",
        "blobs",
        "time",
      ],
      nullable: ["prev", "since"],
      properties: {
        seq: {
          type: "integer",
        },
        rebase: {
          type: "boolean",
        },
        tooBig: {
          type: "boolean",
        },
        repo: {
          type: "string",
          format: "did",
        },
        commit: {
          type: "cid-link",
        },
        prev: {
          type: "cid-link",
        },
        rev: {
          type: "string",
          description: "The rev of the emitted commit",
        },
        since: {
          type: "string",
          description: "The rev of the last emitted commit from this repo",
        },
        blocks: {
          type: "bytes",
          description: "CAR file containing relevant blocks",
          maxLength: 1000000,
        },
        ops: {
          type: "array",
          items: {
            type: "ref",
            ref: "lex:com.atproto.sync.subscribeRepos#repoOp",
          },
          maxLength: 200,
        },
        blobs: {
          type: "array",
          items: {
            type: "cid-link",
          },
        },
        time: {
          type: "string",
          format: "datetime",
        },
      },
    },
    handle: {
      type: "object",
      required: ["seq", "did", "handle", "time"],
      properties: {
        seq: {
          type: "integer",
        },
        did: {
          type: "string",
          format: "did",
        },
        handle: {
          type: "string",
          format: "handle",
        },
        time: {
          type: "string",
          format: "datetime",
        },
      },
    },
    migrate: {
      type: "object",
      required: ["seq", "did", "migrateTo", "time"],
      nullable: ["migrateTo"],
      properties: {
        seq: {
          type: "integer",
        },
        did: {
          type: "string",
          format: "did",
        },
        migrateTo: {
          type: "string",
        },
        time: {
          type: "string",
          format: "datetime",
        },
      },
    },
    tombstone: {
      type: "object",
      required: ["seq", "did", "time"],
      properties: {
        seq: {
          type: "integer",
        },
        did: {
          type: "string",
          format: "did",
        },
        time: {
          type: "string",
          format: "datetime",
        },
      },
    },
    info: {
      type: "object",
      required: ["name"],
      properties: {
        name: {
          type: "string",
          knownValues: ["OutdatedCursor"],
        },
        message: {
          type: "string",
        },
      },
    },
    repoOp: {
      type: "object",
      description:
        "A repo operation, ie a write of a single record. For creates and updates, cid is the record's CID as of this operation. For deletes, it's null.",
      required: ["action", "path", "cid"],
      nullable: ["cid"],
      properties: {
        action: {
          type: "string",
          knownValues: ["create", "update", "delete"],
        },
        path: {
          type: "string",
        },
        cid: {
          type: "cid-link",
        },
      },
    },
  },
};

// Registry holding only the subscribeRepos document; used for validation below.
const lexicons = new Lexicons([ComAtprotoSyncSubscribeRepos]);

/**
 * Checks `evt` against the message schema of `com.atproto.sync.subscribeRepos`
 * by delegating to `lexicons.assertValidXrpcMessage`.
 *
 * Despite the `is…` name this is not a boolean predicate: it returns whatever
 * the assertion call returns, typed as {@link RepoEvent}.
 * NOTE(review): as an `assert*` API it presumably throws on a schema mismatch
 * rather than returning a falsy value — confirm against `@atproto/lexicon`.
 *
 * @param evt - Decoded message body to validate.
 * @returns The result of the assertion call, typed as a {@link RepoEvent}.
 */
export const isValidRepoEvent = (evt: unknown): RepoEvent => {
  return lexicons.assertValidXrpcMessage<RepoEvent>(
    "com.atproto.sync.subscribeRepos",
    evt
  );
};