Scratch space for learning atproto app development
import type { RepoRecord } from '@atproto/lexicon'
import { cborToLexRecord, readCar } from '@atproto/repo'
import { AtUri } from '@atproto/syntax'
import { Subscription } from '@atproto/xrpc-server'
import type { CID } from 'multiformats/cid'
import {
  type Account,
  type Commit,
  type Identity,
  type RepoEvent,
  isAccount,
  isCommit,
  isIdentity,
  isValidRepoEvent,
} from './lexicons'

/** Options accepted by {@link Firehose}. */
type Opts = {
  /** Service to subscribe to. Defaults to `https://bsky.network`. */
  service?: string
  /** Supplies the cursor sent as a subscription parameter when connecting. */
  getCursor?: () => Promise<number | undefined>
  /** Called with the `seq` of every received event that carries a numeric `seq`. */
  setCursor?: (cursor: number) => Promise<void>
  /** Milliseconds to wait before re-subscribing after a stream error. Defaults to 3000. */
  subscriptionReconnectDelay?: number
  /** When set, only commit writes whose collection is listed are yielded. */
  filterCollections?: string[]
  /** Skip identity events. */
  excludeIdentity?: boolean
  /** Skip account events. */
  excludeAccount?: boolean
  /** Skip commit events. */
  excludeCommit?: boolean
}

/**
 * Resolves after `ms` milliseconds, or as soon as `signal` aborts, whichever
 * comes first. Never rejects.
 */
const sleep = (ms: number, signal: AbortSignal): Promise<void> =>
  new Promise<void>((resolve) => {
    if (signal.aborted) {
      resolve()
      return
    }
    function onAbort(): void {
      clearTimeout(timer)
      resolve()
    }
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort)
      resolve()
    }, ms)
    signal.addEventListener('abort', onAbort, { once: true })
  })

/**
 * Consumes `com.atproto.sync.subscribeRepos` and exposes the stream as an
 * async generator of parsed commit, identity and account events.
 */
export class Firehose {
  public sub: Subscription<RepoEvent>
  private abortController: AbortController

  constructor(public opts: Opts) {
    this.abortController = new AbortController()
    this.sub = new Subscription({
      service: opts.service ?? 'https://bsky.network',
      method: 'com.atproto.sync.subscribeRepos',
      signal: this.abortController.signal,
      getParams: async () => {
        if (!opts.getCursor) return undefined
        const cursor = await opts.getCursor()
        return { cursor }
      },
      validate: (value: unknown) => {
        try {
          return isValidRepoEvent(value)
        } catch (err) {
          console.error('repo subscription skipped invalid message', err)
          // Best effort: an invalid message is logged and yields no event.
          return undefined
        }
      },
    })
  }

  /**
   * Yields parsed events until the subscription ends or {@link destroy} is
   * called.
   *
   * A failure while handling a single message is logged and that message is
   * skipped. A failure of the stream itself is logged and, after
   * `subscriptionReconnectDelay` milliseconds, the subscription is iterated
   * again inside this same generator, so the caller's `for await` loop keeps
   * receiving events.
   */
  async *run(): AsyncGenerator<Event> {
    const { signal } = this.abortController
    while (!signal.aborted) {
      try {
        for await (const evt of this.sub) {
          try {
            if (isCommit(evt) && !this.opts.excludeCommit) {
              const { filterCollections } = this.opts
              for (const write of await parseCommit(evt)) {
                if (
                  !filterCollections ||
                  filterCollections.includes(write.uri.collection)
                ) {
                  yield write
                }
              }
            } else if (isAccount(evt) && !this.opts.excludeAccount) {
              const parsed = parseAccount(evt)
              if (parsed) {
                yield parsed
              }
            } else if (isIdentity(evt) && !this.opts.excludeIdentity) {
              yield parseIdentity(evt)
            }
          } catch (err) {
            console.error('repo subscription could not handle message', err)
          }
          // The cursor advances even when handling failed, so a message that
          // cannot be processed is not requested again after a reconnect.
          if (this.opts.setCursor && typeof evt.seq === 'number') {
            await this.opts.setCursor(evt.seq)
          }
        }
        // The subscription finished without an error: nothing to retry.
        return
      } catch (err) {
        if (signal.aborted) return
        console.error('repo subscription errored', err)
        // Calling `this.run()` from a timer would only create a generator
        // that nobody iterates, so the retry has to happen in this loop.
        await sleep(this.opts.subscriptionReconnectDelay ?? 3000, signal)
      }
    }
  }

  /** Aborts the underlying subscription and stops any pending reconnect. */
  destroy() {
    this.abortController.abort()
  }
}

/**
 * Expands a commit into one event per operation.
 *
 * Create and update operations are skipped when the operation has no CID or
 * when the record block is missing from the commit's CAR payload.
 */
export const parseCommit = async (evt: Commit): Promise<CommitEvt[]> => {
  const car = await readCar(evt.blocks)

  const evts: CommitEvt[] = []

  for (const op of evt.ops) {
    const uri = new AtUri(`at://${evt.repo}/${op.path}`)

    const meta: CommitMeta = {
      uri,
      author: uri.host,
      collection: uri.collection,
      rkey: uri.rkey,
    }

    if (op.action === 'create' || op.action === 'update') {
      if (!op.cid) continue
      const recordBytes = car.blocks.get(op.cid)
      if (!recordBytes) continue
      const record = cborToLexRecord(recordBytes)
      evts.push({
        ...meta,
        event: op.action as 'create' | 'update',
        cid: op.cid,
        record,
      })
    } else if (op.action === 'delete') {
      evts.push({
        ...meta,
        event: 'delete',
      })
    }
  }

  return evts
}

/** Maps an identity message onto an {@link IdentityEvt}. */
export const parseIdentity = (evt: Identity): IdentityEvt => {
  return {
    event: 'identity',
    did: evt.did,
    handle: evt.handle,
  }
}

/**
 * Maps an account message onto an {@link AccountEvt}.
 *
 * @returns `undefined` when the message carries a status that is not one of
 * the known {@link AccountStatus} values (an empty string included).
 */
export const parseAccount = (evt: Account): AccountEvt | undefined => {
  const { status } = evt
  if (status !== undefined && !isValidStatus(status)) return undefined
  return {
    event: 'account',
    did: evt.did,
    active: evt.active,
    status,
  }
}

/** Account statuses this module recognises. */
const ACCOUNT_STATUSES = [
  'takendown',
  'suspended',
  'deleted',
  'deactivated',
] as const

/** Type guard for {@link AccountStatus}. */
const isValidStatus = (str: string): str is AccountStatus => {
  return (ACCOUNT_STATUSES as readonly string[]).includes(str)
}

/** Any event yielded by {@link Firehose.run}. */
type Event = CommitEvt | IdentityEvt | AccountEvt

/** Fields shared by every commit event. */
type CommitMeta = {
  uri: AtUri
  author: string
  collection: string
  rkey: string
}

type CommitEvt = Create | Update | Delete

type Create = CommitMeta & {
  event: 'create'
  record: RepoRecord
  cid: CID
}

type Update = CommitMeta & {
  event: 'update'
  record: RepoRecord
  cid: CID
}

type Delete = CommitMeta & {
  event: 'delete'
}

type IdentityEvt = {
  event: 'identity'
  did: string
  handle?: string
}

type AccountEvt = {
  event: 'account'
  did: string
  active: boolean
  status?: AccountStatus
}

type AccountStatus = (typeof ACCOUNT_STATUSES)[number]