Scratch space for learning atproto app development
1import type { RepoRecord } from '@atproto/lexicon'
2import { cborToLexRecord, readCar } from '@atproto/repo'
3import { AtUri } from '@atproto/syntax'
4import { Subscription } from '@atproto/xrpc-server'
5import type { CID } from 'multiformats/cid'
6import {
7 type Account,
8 type Commit,
9 type Identity,
10 type RepoEvent,
11 isAccount,
12 isCommit,
13 isIdentity,
14 isValidRepoEvent,
15} from './lexicons'
16
17type Opts = {
18 service?: string
19 getCursor?: () => Promise<number | undefined>
20 setCursor?: (cursor: number) => Promise<void>
21 subscriptionReconnectDelay?: number
22 filterCollections?: string[]
23 excludeIdentity?: boolean
24 excludeAccount?: boolean
25 excludeCommit?: boolean
26}
27
28export class Firehose {
29 public sub: Subscription<RepoEvent>
30 private abortController: AbortController
31
32 constructor(public opts: Opts) {
33 this.abortController = new AbortController()
34 this.sub = new Subscription({
35 service: opts.service ?? 'https://bsky.network',
36 method: 'com.atproto.sync.subscribeRepos',
37 signal: this.abortController.signal,
38 getParams: async () => {
39 if (!opts.getCursor) return undefined
40 const cursor = await opts.getCursor()
41 return { cursor }
42 },
43 validate: (value: unknown) => {
44 try {
45 return isValidRepoEvent(value)
46 } catch (err) {
47 console.error('repo subscription skipped invalid message', err)
48 }
49 },
50 })
51 }
52
53 async *run(): AsyncGenerator<Event> {
54 try {
55 for await (const evt of this.sub) {
56 try {
57 if (isCommit(evt) && !this.opts.excludeCommit) {
58 const parsed = await parseCommit(evt)
59 for (const write of parsed) {
60 if (!this.opts.filterCollections || this.opts.filterCollections.includes(write.uri.collection)) {
61 yield write
62 }
63 }
64 } else if (isAccount(evt) && !this.opts.excludeAccount) {
65 const parsed = parseAccount(evt)
66 if (parsed) {
67 yield parsed
68 }
69 } else if (isIdentity(evt) && !this.opts.excludeIdentity) {
70 yield parseIdentity(evt)
71 }
72 } catch (err) {
73 console.error('repo subscription could not handle message', err)
74 }
75 if (this.opts.setCursor && typeof evt.seq === 'number') {
76 await this.opts.setCursor(evt.seq)
77 }
78 }
79 } catch (err) {
80 console.error('repo subscription errored', err)
81 setTimeout(() => this.run(), this.opts.subscriptionReconnectDelay ?? 3000)
82 }
83 }
84
85 destroy() {
86 this.abortController.abort()
87 }
88}
89
90export const parseCommit = async (evt: Commit): Promise<CommitEvt[]> => {
91 const car = await readCar(evt.blocks)
92
93 const evts: CommitEvt[] = []
94
95 for (const op of evt.ops) {
96 const uri = new AtUri(`at://${evt.repo}/${op.path}`)
97
98 const meta: CommitMeta = {
99 uri,
100 author: uri.host,
101 collection: uri.collection,
102 rkey: uri.rkey,
103 }
104
105 if (op.action === 'create' || op.action === 'update') {
106 if (!op.cid) continue
107 const recordBytes = car.blocks.get(op.cid)
108 if (!recordBytes) continue
109 const record = cborToLexRecord(recordBytes)
110 evts.push({
111 ...meta,
112 event: op.action as 'create' | 'update',
113 cid: op.cid,
114 record,
115 })
116 }
117
118 if (op.action === 'delete') {
119 evts.push({
120 ...meta,
121 event: 'delete',
122 })
123 }
124 }
125
126 return evts
127}
128
129export const parseIdentity = (evt: Identity): IdentityEvt => {
130 return {
131 event: 'identity',
132 did: evt.did,
133 handle: evt.handle,
134 }
135}
136
137export const parseAccount = (evt: Account): AccountEvt | undefined => {
138 if (evt.status && !isValidStatus(evt.status)) return
139 return {
140 event: 'account',
141 did: evt.did,
142 active: evt.active,
143 status: evt.status as AccountStatus,
144 }
145}
146
147const isValidStatus = (str: string): str is AccountStatus => {
148 return ['takendown', 'suspended', 'deleted', 'deactivated'].includes(str)
149}
150
151type Event = CommitEvt | IdentityEvt | AccountEvt
152
153type CommitMeta = {
154 uri: AtUri
155 author: string
156 collection: string
157 rkey: string
158}
159
160type CommitEvt = Create | Update | Delete
161
162type Create = CommitMeta & {
163 event: 'create'
164 record: RepoRecord
165 cid: CID
166}
167
168type Update = CommitMeta & {
169 event: 'update'
170}
171
172type Delete = CommitMeta & {
173 event: 'delete'
174}
175
176type IdentityEvt = {
177 event: 'identity'
178 did: string
179 handle?: string
180}
181
182type AccountEvt = {
183 event: 'account'
184 did: string
185 active: boolean
186 status?: AccountStatus
187}
188
189type AccountStatus = 'takendown' | 'suspended' | 'deleted' | 'deactivated'