Scratch space for learning atproto app development
import type { RepoRecord } from '@atproto/lexicon'
import { cborToLexRecord, readCar } from '@atproto/repo'
import { AtUri } from '@atproto/syntax'
import { Subscription } from '@atproto/xrpc-server'
import type { CID } from 'multiformats/cid'
import {
  type Account,
  type Commit,
  type Identity,
  type RepoEvent,
  isAccount,
  isCommit,
  isIdentity,
  isValidRepoEvent,
} from './lexicons'

/** Delay before re-subscribing when `subscriptionReconnectDelay` is not set. */
const DEFAULT_RECONNECT_DELAY_MS = 3000

/** Configuration for {@link Firehose}. Every field is optional. */
type Opts = {
  /** Relay to connect to; defaults to `https://bsky.network`. */
  service?: string
  /** Returns the cursor to send as the `cursor` subscription parameter. */
  getCursor?: () => Promise<number | undefined>
  /** Called with the `seq` of each event after that event has been handled. */
  setCursor?: (cursor: number) => Promise<void>
  /** Milliseconds to wait before re-subscribing after a stream error. */
  subscriptionReconnectDelay?: number
  /** When set, only commit writes whose collection is listed are yielded. */
  filterCollections?: string[]
  /** Drop identity events. */
  excludeIdentity?: boolean
  /** Drop account events. */
  excludeAccount?: boolean
  /** Drop commit events. */
  excludeCommit?: boolean
}

/**
 * Resolves after `ms` milliseconds, or as soon as `signal` aborts, whichever
 * comes first. Never rejects.
 */
const wait = (ms: number, signal: AbortSignal): Promise<void> =>
  new Promise((resolve) => {
    if (signal.aborted) {
      resolve()
      return
    }
    const onAbort = () => {
      clearTimeout(timer)
      resolve()
    }
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort)
      resolve()
    }, ms)
    signal.addEventListener('abort', onAbort, { once: true })
  })

/**
 * Consumes `com.atproto.sync.subscribeRepos` and exposes the stream as an
 * async generator of parsed commit, identity and account events.
 */
export class Firehose {
  public sub: Subscription<RepoEvent>
  private abortController: AbortController

  constructor(public opts: Opts) {
    this.abortController = new AbortController()
    this.sub = new Subscription({
      service: opts.service ?? 'https://bsky.network',
      method: 'com.atproto.sync.subscribeRepos',
      signal: this.abortController.signal,
      getParams: async () => {
        if (!opts.getCursor) return undefined
        const cursor = await opts.getCursor()
        return { cursor }
      },
      validate: (value: unknown) => {
        try {
          return isValidRepoEvent(value)
        } catch (err) {
          console.error('repo subscription skipped invalid message', err)
          // Presumably the subscription drops messages validated to undefined.
          return undefined
        }
      },
    })
  }

  /**
   * Yields parsed events until the subscription ends or {@link destroy} is
   * called.
   *
   * A failure while handling a single message is logged and that message is
   * skipped. A failure of the stream itself is logged and, after
   * `subscriptionReconnectDelay` (default 3000 ms), the subscription is
   * iterated again inside this same generator, so the caller's `for await`
   * loop keeps receiving events.
   */
  async *run(): AsyncGenerator<Event> {
    const { signal } = this.abortController
    const reconnectDelay =
      this.opts.subscriptionReconnectDelay ?? DEFAULT_RECONNECT_DELAY_MS

    while (!signal.aborted) {
      try {
        // NOTE(review): relies on each iteration of `this.sub` opening a fresh
        // connection (and re-reading params via getParams) — confirm against
        // the @atproto/xrpc-server Subscription implementation.
        for await (const evt of this.sub) {
          try {
            if (isCommit(evt) && !this.opts.excludeCommit) {
              const writes = await parseCommit(evt)
              for (const write of writes) {
                if (
                  !this.opts.filterCollections ||
                  this.opts.filterCollections.includes(write.uri.collection)
                ) {
                  yield write
                }
              }
            } else if (isAccount(evt) && !this.opts.excludeAccount) {
              const parsed = parseAccount(evt)
              if (parsed) {
                yield parsed
              }
            } else if (isIdentity(evt) && !this.opts.excludeIdentity) {
              yield parseIdentity(evt)
            }
          } catch (err) {
            console.error('repo subscription could not handle message', err)
          }
          // The cursor advances even for skipped or failed messages.
          if (this.opts.setCursor && typeof evt.seq === 'number') {
            await this.opts.setCursor(evt.seq)
          }
        }
        // The stream finished without an error: nothing left to consume.
        return
      } catch (err) {
        // An error caused by destroy() is an intentional shutdown, not a fault.
        if (signal.aborted) return
        console.error('repo subscription errored', err)
      }
      await wait(reconnectDelay, signal)
    }
  }

  /** Aborts the subscription; a running {@link run} generator then finishes. */
  destroy() {
    this.abortController.abort()
  }
}

/**
 * Expands one commit event into one event per record operation.
 *
 * Record bytes are read from the CAR file carried in `evt.blocks`. Create and
 * update operations with no `cid`, or whose block is absent from the CAR, are
 * skipped. Operations with any other unrecognised action are ignored.
 */
export const parseCommit = async (evt: Commit): Promise<CommitEvt[]> => {
  const car = await readCar(evt.blocks)

  const evts: CommitEvt[] = []

  for (const op of evt.ops) {
    const uri = new AtUri(`at://${evt.repo}/${op.path}`)

    const meta: CommitMeta = {
      uri,
      author: uri.host,
      collection: uri.collection,
      rkey: uri.rkey,
    }

    if (op.action === 'create' || op.action === 'update') {
      if (!op.cid) continue
      const recordBytes = car.blocks.get(op.cid)
      if (!recordBytes) continue
      const record = cborToLexRecord(recordBytes)
      evts.push({
        ...meta,
        event: op.action as 'create' | 'update',
        cid: op.cid,
        record,
      })
    } else if (op.action === 'delete') {
      evts.push({
        ...meta,
        event: 'delete',
      })
    }
  }

  return evts
}

/** Converts an identity event into its simplified {@link IdentityEvt} form. */
export const parseIdentity = (evt: Identity): IdentityEvt => {
  return {
    event: 'identity',
    did: evt.did,
    handle: evt.handle,
  }
}

/**
 * Converts an account event into its simplified {@link AccountEvt} form.
 *
 * @returns `undefined` when the event carries a status that is not one of the
 *   known {@link AccountStatus} values. An absent or empty status is reported
 *   as `status: undefined`.
 */
export const parseAccount = (evt: Account): AccountEvt | undefined => {
  let status: AccountStatus | undefined
  if (evt.status) {
    if (!isValidStatus(evt.status)) return undefined
    // Narrowed by the type guard, so no assertion is needed.
    status = evt.status
  }
  return {
    event: 'account',
    did: evt.did,
    active: evt.active,
    status,
  }
}

/** Type guard for the account statuses this module understands. */
const isValidStatus = (str: string): str is AccountStatus => {
  return ['takendown', 'suspended', 'deleted', 'deactivated'].includes(str)
}

/** Everything {@link Firehose.run} can yield. */
type Event = CommitEvt | IdentityEvt | AccountEvt

/** Fields shared by every commit-derived event, all taken from the record URI. */
type CommitMeta = {
  uri: AtUri
  author: string
  collection: string
  rkey: string
}

type CommitEvt = Create | Update | Delete

type Create = CommitMeta & {
  event: 'create'
  record: RepoRecord
  cid: CID
}

// parseCommit attaches the record and cid to updates exactly as it does for
// creates, so the type exposes them too.
type Update = CommitMeta & {
  event: 'update'
  record: RepoRecord
  cid: CID
}

type Delete = CommitMeta & {
  event: 'delete'
}

type IdentityEvt = {
  event: 'identity'
  did: string
  handle?: string
}

type AccountEvt = {
  event: 'account'
  did: string
  active: boolean
  status?: AccountStatus
}

type AccountStatus = 'takendown' | 'suspended' | 'deleted' | 'deactivated'