Scratch space for learning atproto app development
1import type { RepoRecord } from '@atproto/lexicon'
2import { cborToLexRecord, readCar } from '@atproto/repo'
3import { AtUri } from '@atproto/syntax'
4import { Subscription } from '@atproto/xrpc-server'
5import type { CID } from 'multiformats/cid'
6import {
7 type Account,
8 type Commit,
9 type Identity,
10 type RepoEvent,
11 isAccount,
12 isCommit,
13 isIdentity,
14 isValidRepoEvent,
15} from './lexicons'
16
/** Options accepted by the `Firehose` constructor. Every field is optional. */
type Opts = {
  /** Service URL to subscribe to; `https://bsky.network` is used when omitted. */
  service?: string
  /** Delay in milliseconds used after the subscription errors; 3000 when omitted. */
  subscriptionReconnectDelay?: number
  /** Supplies the `cursor` subscription parameter; when omitted no params are sent. */
  getCursor?: () => Promise<number | undefined>
  /** Called with an event's numeric `seq` once that event has been processed. */
  setCursor?: (cursor: number) => Promise<void>
  /** When set, only commit writes whose collection is in this list are emitted. */
  filterCollections?: string[]
  /** Skip commit events entirely. */
  excludeCommit?: boolean
  /** Skip account events entirely. */
  excludeAccount?: boolean
  /** Skip identity events entirely. */
  excludeIdentity?: boolean
}
27
/**
 * Consumes the `com.atproto.sync.subscribeRepos` subscription and exposes the
 * parsed events as an async generator.
 *
 * Commit events are expanded into one item per record write (optionally
 * filtered by collection); account and identity events are emitted as single
 * items. Each event kind can be switched off through `Opts`.
 */
export class Firehose {
  public sub: Subscription<RepoEvent>
  private abortController: AbortController

  /**
   * @param opts Subscription configuration; see `Opts`. The service defaults
   *   to `https://bsky.network` when `opts.service` is not given.
   */
  constructor(public opts: Opts) {
    this.abortController = new AbortController()
    this.sub = new Subscription({
      service: opts.service ?? 'https://bsky.network',
      method: 'com.atproto.sync.subscribeRepos',
      signal: this.abortController.signal,
      getParams: async () => {
        if (!opts.getCursor) return undefined
        const cursor = await opts.getCursor()
        return { cursor }
      },
      validate: (value: unknown) => {
        try {
          return isValidRepoEvent(value)
        } catch (err) {
          console.error('repo subscription skipped invalid message', err)
          // Invalid messages are skipped rather than failing the stream.
          return undefined
        }
      },
    })
  }

  /**
   * Yields parsed events until the subscription ends or `destroy()` is called.
   *
   * A failure while handling a single message is logged and that message is
   * skipped. A failure of the subscription itself is logged, then the
   * subscription is iterated again after `subscriptionReconnectDelay`
   * milliseconds (3000 by default). The retry happens inside this generator so
   * the consumer's `for await` loop keeps receiving events; scheduling a fresh
   * `this.run()` call would only create a generator nobody iterates.
   *
   * NOTE(review): resuming from the stored cursor relies on `Subscription`
   * calling `getParams` again for each new iteration — confirm.
   *
   * @returns An async generator of commit-write, account and identity events.
   */
  async *run(): AsyncGenerator<Event> {
    const { signal } = this.abortController
    while (!signal.aborted) {
      try {
        for await (const evt of this.sub) {
          try {
            yield* this.parseEvent(evt)
          } catch (err) {
            console.error('repo subscription could not handle message', err)
          }
          if (this.opts.setCursor && typeof evt.seq === 'number') {
            await this.opts.setCursor(evt.seq)
          }
        }
        // The subscription finished without an error: nothing left to stream.
        return
      } catch (err) {
        // Errors raised while tearing down after destroy() are expected.
        if (signal.aborted) return
        console.error('repo subscription errored', err)
        await this.waitBeforeReconnect()
      }
    }
  }

  /** Aborts the signal handed to the subscription, which also stops `run()` from reconnecting. */
  destroy() {
    this.abortController.abort()
  }

  /** Converts one raw repo event into zero or more output events, honouring the exclude/filter options. */
  private async *parseEvent(evt: RepoEvent): AsyncGenerator<Event> {
    if (isCommit(evt) && !this.opts.excludeCommit) {
      const writes = await parseCommit(evt)
      for (const write of writes) {
        if (
          !this.opts.filterCollections ||
          this.opts.filterCollections.includes(write.uri.collection)
        ) {
          yield write
        }
      }
    } else if (isAccount(evt) && !this.opts.excludeAccount) {
      const parsed = parseAccount(evt)
      if (parsed) {
        yield parsed
      }
    } else if (isIdentity(evt) && !this.opts.excludeIdentity) {
      yield parseIdentity(evt)
    }
  }

  /** Resolves after the configured reconnect delay (3000 ms when not configured). */
  private waitBeforeReconnect(): Promise<void> {
    const delayMs = this.opts.subscriptionReconnectDelay ?? 3000
    return new Promise<void>((resolve) => {
      setTimeout(resolve, delayMs)
    })
  }
}
92
/**
 * Expands a commit event into one entry per repo operation.
 *
 * Create and update operations are resolved against the CAR blocks carried by
 * the commit; an operation is skipped when it has no CID or when its block is
 * not present. Delete operations produce an entry without a record.
 * Operations with any other action are ignored.
 *
 * @param evt Commit event whose `blocks`, `repo` and `ops` are read.
 * @returns The parsed writes, in the same order as `evt.ops`.
 */
export const parseCommit = async (evt: Commit): Promise<CommitEvt[]> => {
  const { blocks } = await readCar(evt.blocks)
  const writes: CommitEvt[] = []

  for (const op of evt.ops) {
    const uri = new AtUri(`at://${evt.repo}/${op.path}`)
    const meta: CommitMeta = {
      uri,
      author: uri.host,
      collection: uri.collection,
      rkey: uri.rkey,
    }

    switch (op.action) {
      case 'create':
      case 'update': {
        if (!op.cid) break
        const bytes = blocks.get(op.cid)
        if (!bytes) break
        writes.push({
          ...meta,
          event: op.action as 'create' | 'update',
          cid: op.cid,
          record: cborToLexRecord(bytes),
        })
        break
      }
      case 'delete':
        writes.push({ ...meta, event: 'delete' })
        break
      default:
        // Unknown actions produce no event.
        break
    }
  }

  return writes
}
131
/**
 * Reduces an identity event to the fields exposed to consumers.
 *
 * @param evt Identity event; only `did` and `handle` are read.
 * @returns An `'identity'` event carrying the same `did` and `handle`.
 */
export const parseIdentity = (evt: Identity): IdentityEvt => ({
  event: 'identity',
  did: evt.did,
  handle: evt.handle,
})
139
/**
 * Reduces an account event to the fields exposed to consumers.
 *
 * @param evt Account event; `did`, `active` and `status` are read.
 * @returns An `'account'` event, or `undefined` when `status` is a non-empty
 *   string that `isValidStatus` rejects.
 */
export const parseAccount = (evt: Account): AccountEvt | undefined => {
  const { did, active, status } = evt
  // A missing (or empty) status is passed through unchecked.
  const unrecognised = Boolean(status) && !isValidStatus(status as string)
  if (unrecognised) return undefined
  return { event: 'account', did, active, status: status as AccountStatus }
}
149
/**
 * Type guard for the account status strings this module recognises.
 *
 * @param str Candidate status string.
 * @returns `true` only for `'takendown'`, `'suspended'`, `'deleted'` or `'deactivated'`.
 */
const isValidStatus = (str: string): str is AccountStatus => {
  switch (str) {
    case 'takendown':
    case 'suspended':
    case 'deleted':
    case 'deactivated':
      return true
    default:
      return false
  }
}
153
/** Any item yielded by `Firehose.run()`: a commit write, an identity event or an account event. */
type Event =
  | CommitEvt
  | IdentityEvt
  | AccountEvt
155
/**
 * Addressing fields shared by every commit write. `parseCommit` derives all
 * of them from the operation's `at://<repo>/<path>` URI.
 */
type CommitMeta = {
  /** Full AT-URI of the record the operation touched. */
  uri: AtUri
  /** Host component of `uri`. */
  author: string
  /** Collection component of `uri`. */
  collection: string
  /** Record-key component of `uri`. */
  rkey: string
}
162
/** A single record write taken from a commit, discriminated by its `event` field. */
type CommitEvt =
  | Create
  | Update
  | Delete
164
/** Commit write produced for a `'create'` operation. */
type Create = CommitMeta & {
  event: 'create'
  /** CID of the operation, used to look the record up in the commit's blocks. */
  cid: CID
  /** The decoded record. */
  record: RepoRecord
}
170
/** Commit write produced for an `'update'` operation. */
type Update = CommitMeta & {
  event: 'update'
  /** CID of the operation, used to look the record up in the commit's blocks. */
  cid: CID
  /** The decoded record. */
  record: RepoRecord
}
176
/** Commit write produced for a `'delete'` operation; it carries no record or CID. */
type Delete = CommitMeta & { event: 'delete' }
180
/** Identity event as emitted by `parseIdentity`. */
type IdentityEvt = {
  event: 'identity'
  /** DID copied from the source event. */
  did: string
  /** Handle copied from the source event, when it has one. */
  handle?: string
}
186
/** Account event as emitted by `parseAccount`. */
type AccountEvt = {
  event: 'account'
  /** DID copied from the source event. */
  did: string
  /** `active` flag copied from the source event. */
  active: boolean
  /** Status copied from the source event, when it has one. */
  status?: AccountStatus
}
193
/** Account status strings accepted by `isValidStatus`. */
type AccountStatus =
  | 'takendown'
  | 'suspended'
  | 'deleted'
  | 'deactivated'