forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1mod jetstream;
2mod jsonl_file;
3
4use crate::storage::LinkStorage;
5use crate::{ActionableEvent, RecordId};
6use anyhow::Result;
7use jetstream::consume_jetstream;
8use jsonl_file::consume_jsonl_file;
9use links::collect_links;
10use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
11use std::path::PathBuf;
12use std::sync::atomic::{AtomicU32, Ordering};
13use std::sync::Arc;
14use std::thread;
15use tinyjson::JsonValue;
16use tokio_util::sync::CancellationToken;
17
18pub fn consume(
19 mut store: impl LinkStorage,
20 qsize: Arc<AtomicU32>,
21 fixture: Option<PathBuf>,
22 stream: String,
23 staying_alive: CancellationToken,
24) -> Result<()> {
25 describe_counter!(
26 "consumer_events_non_actionable",
27 Unit::Count,
28 "count of non-actionable events"
29 );
30 describe_counter!(
31 "consumer_events_actionable",
32 Unit::Count,
33 "count of action by type. *all* atproto record delete events are included"
34 );
35 describe_counter!(
36 "consumer_events_actionable_links",
37 Unit::Count,
38 "total links encountered"
39 );
40 describe_histogram!(
41 "consumer_events_actionable_links",
42 Unit::Count,
43 "number of links per message"
44 );
45
46 let (receiver, consumer_handle) = if let Some(f) = fixture {
47 let (sender, receiver) = flume::bounded(21);
48 (
49 receiver,
50 thread::spawn(move || consume_jsonl_file(f, sender)),
51 )
52 } else {
53 let (sender, receiver) = flume::bounded(32_768); // eek
54 let cursor = store.get_cursor().unwrap();
55 (
56 receiver,
57 thread::spawn(move || consume_jetstream(sender, cursor, stream, staying_alive)),
58 )
59 };
60
61 for update in receiver.iter() {
62 if let Some((action, ts)) = get_actionable(&update) {
63 {
64 store.push(&action, ts).unwrap();
65 qsize.store(receiver.len().try_into().unwrap(), Ordering::Relaxed);
66 }
67 } else {
68 counter!("consumer_events_non_actionable").increment(1);
69 }
70 }
71
72 consumer_handle.join().unwrap()
73}
74
75pub fn get_actionable(event: &JsonValue) -> Option<(ActionableEvent, u64)> {
76 let JsonValue::Object(root) = event else {
77 return None;
78 };
79 let JsonValue::Number(time_us) = root.get("time_us")? else {
80 return None;
81 };
82 let cursor = *time_us as u64;
83 // todo: clean up
84 match event {
85 JsonValue::Object(root)
86 if root.get("kind") == Some(&JsonValue::String("commit".to_string())) =>
87 {
88 let JsonValue::String(did) = root.get("did")? else {
89 return None;
90 };
91 let JsonValue::Object(commit) = root.get("commit")? else {
92 return None;
93 };
94 let JsonValue::String(collection) = commit.get("collection")? else {
95 return None;
96 };
97 let JsonValue::String(rkey) = commit.get("rkey")? else {
98 return None;
99 };
100 match commit.get("operation")? {
101 JsonValue::String(op) if op == "create" => {
102 let links = collect_links(commit.get("record")?);
103 counter!("consumer_events_actionable", "action_type" => "create_links", "collection" => collection.clone()).increment(1);
104 histogram!("consumer_events_actionable_links", "action_type" => "create_links", "collection" => collection.clone()).record(links.len() as f64);
105 for link in &links {
106 counter!("consumer_events_actionable_links",
107 "action_type" => "create_links",
108 "collection" => collection.clone(),
109 "path" => link.path.clone(),
110 "link_type" => link.target.name(),
111 )
112 .increment(links.len() as u64);
113 }
114 if links.is_empty() {
115 None
116 } else {
117 Some((
118 ActionableEvent::CreateLinks {
119 record_id: RecordId {
120 did: did.into(),
121 collection: collection.clone(),
122 rkey: rkey.clone(),
123 },
124 links,
125 },
126 cursor,
127 ))
128 }
129 }
130 JsonValue::String(op) if op == "update" => {
131 let links = collect_links(commit.get("record")?);
132 counter!("consumer_events_actionable", "action_type" => "update_links", "collection" => collection.clone()).increment(1);
133 histogram!("consumer_events_actionable_links", "action_type" => "update_links", "collection" => collection.clone()).record(links.len() as f64);
134 for link in &links {
135 counter!("consumer_events_actionable_links",
136 "action_type" => "update_links",
137 "collection" => collection.clone(),
138 "path" => link.path.clone(),
139 "link_type" => link.target.name(),
140 )
141 .increment(links.len() as u64);
142 }
143 Some((
144 ActionableEvent::UpdateLinks {
145 record_id: RecordId {
146 did: did.into(),
147 collection: collection.clone(),
148 rkey: rkey.clone(),
149 },
150 new_links: links,
151 },
152 cursor,
153 ))
154 }
155 JsonValue::String(op) if op == "delete" => {
156 counter!("consumer_events_actionable", "action_type" => "delete_record", "collection" => collection.clone()).increment(1);
157 Some((
158 ActionableEvent::DeleteRecord(RecordId {
159 did: did.into(),
160 collection: collection.clone(),
161 rkey: rkey.clone(),
162 }),
163 cursor,
164 ))
165 }
166 _ => None,
167 }
168 }
169 JsonValue::Object(root)
170 if root.get("kind") == Some(&JsonValue::String("account".to_string())) =>
171 {
172 let JsonValue::Object(account) = root.get("account")? else {
173 return None;
174 };
175 let did = account.get("did")?.get::<String>()?.clone();
176 match (account.get("active")?.get::<bool>()?, account.get("status")) {
177 (true, None) => {
178 counter!("consumer_events_actionable", "action_type" => "account", "action" => "activate").increment(1);
179 Some((ActionableEvent::ActivateAccount(did.into()), cursor))
180 }
181 (false, Some(JsonValue::String(status))) => match status.as_ref() {
182 "deactivated" => {
183 counter!("consumer_events_actionable", "action_type" => "account", "action" => "deactivate").increment(1);
184 Some((ActionableEvent::DeactivateAccount(did.into()), cursor))
185 }
186 "deleted" => {
187 counter!("consumer_events_actionable", "action_type" => "account", "action" => "delete").increment(1);
188 Some((ActionableEvent::DeleteAccount(did.into()), cursor))
189 }
190 // TODO: are we missing handling for suspended and deactivated accounts?
191 _ => None,
192 },
193 _ => None,
194 }
195 }
196 _ => None,
197 }
198}
199
#[cfg(test)]
mod tests {
    //! Unit tests for `get_actionable`, using captured jetstream events as fixtures.
    use super::*;
    use links::{CollectedLink, Link};

    #[test]
    fn test_create_like() {
        let rec = r#"{
            "did":"did:plc:icprmty6ticzracr5urz4uum",
            "time_us":1736448492661668,
            "kind":"commit",
            "commit":{"rev":"3lfddpt5qa62c","operation":"create","collection":"app.bsky.feed.like","rkey":"3lfddpt5djw2c","record":{
                "$type":"app.bsky.feed.like",
                "createdAt":"2025-01-09T18:48:10.412Z",
                "subject":{"cid":"bafyreihazf62qvmusup55ojhkzwbmzee6rxtsug3e6eg33mnjrgthxvozu","uri":"at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23"}
            },
            "cid":"bafyreidgcs2id7nsbp6co42ind2wcig3riwcvypwan6xdywyfqklovhdjq"}
        }"#.parse().unwrap();
        let action = get_actionable(&rec);
        assert_eq!(
            action,
            Some((
                ActionableEvent::CreateLinks {
                    record_id: RecordId {
                        did: "did:plc:icprmty6ticzracr5urz4uum".into(),
                        collection: "app.bsky.feed.like".into(),
                        rkey: "3lfddpt5djw2c".into(),
                    },
                    links: vec![CollectedLink {
                        path: ".subject.uri".into(),
                        target: Link::AtUri(
                            "at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23"
                                .into()
                        )
                    },],
                },
                1736448492661668
            ))
        )
    }

    /// A create whose record holds no links has nothing to index, so it is skipped.
    #[test]
    fn test_create_without_links_is_not_actionable() {
        let rec = r#"{
            "did":"did:plc:icprmty6ticzracr5urz4uum",
            "time_us":1736448492661668,
            "kind":"commit",
            "commit":{"rev":"3lfddpt5qa62c","operation":"create","collection":"app.bsky.feed.post","rkey":"3lfddpt5djw2c","record":{
                "$type":"app.bsky.feed.post",
                "createdAt":"2025-01-09T18:48:10.412Z",
                "text":"hello"
            },
            "cid":"bafyreidgcs2id7nsbp6co42ind2wcig3riwcvypwan6xdywyfqklovhdjq"}
        }"#.parse().unwrap();
        assert_eq!(get_actionable(&rec), None);
    }

    #[test]
    fn test_update_profile() {
        let rec = r#"{
            "did":"did:plc:tcmiubbjtkwhmnwmrvr2eqnx",
            "time_us":1736453696817289,"kind":"commit",
            "commit":{
                "rev":"3lfdikw7q772c",
                "operation":"update",
                "collection":"app.bsky.actor.profile",
                "rkey":"self",
                "record":{
                    "$type":"app.bsky.actor.profile",
                    "avatar":{"$type":"blob","ref":{"$link":"bafkreidcg5jzz3hpdtlc7um7w5masiugdqicc5fltuajqped7fx66hje54"},"mimeType":"image/jpeg","size":295764},
                    "banner":{"$type":"blob","ref":{"$link":"bafkreiahaswf2yex2zfn3ynpekhw6mfj7254ra7ly27zjk73czghnz2wni"},"mimeType":"image/jpeg","size":856461},
                    "createdAt":"2024-08-30T21:33:06.945Z",
                    "description":"Professor, QUB | Belfast via Derry \\n\\nViews personal | Reposts are not an endorsement\\n\\nhttps://go.qub.ac.uk/charvey",
                    "displayName":"Colin Harvey",
                    "pinnedPost":{"cid":"bafyreifyrepqer22xsqqnqulpcxzpu7wcgeuzk6p5c23zxzctaiwmlro7y","uri":"at://did:plc:tcmiubbjtkwhmnwmrvr2eqnx/app.bsky.feed.post/3lf66ri63u22t"}
                },
                "cid":"bafyreiem4j5p7duz67negvqarq3s5h7o45fvytevhrzkkn2p6eqdkcf74m"
            }
        }"#.parse().unwrap();
        let action = get_actionable(&rec);
        assert_eq!(
            action,
            Some((
                ActionableEvent::UpdateLinks {
                    record_id: RecordId {
                        did: "did:plc:tcmiubbjtkwhmnwmrvr2eqnx".into(),
                        collection: "app.bsky.actor.profile".into(),
                        rkey: "self".into(),
                    },
                    new_links: vec![CollectedLink {
                        path: ".pinnedPost.uri".into(),
                        target: Link::AtUri(
                            "at://did:plc:tcmiubbjtkwhmnwmrvr2eqnx/app.bsky.feed.post/3lf66ri63u22t"
                                .into()
                        ),
                    },],
                },
                1736453696817289
            ))
        )
    }

    /// Unlike creates, an update that leaves no links is still reported, with an
    /// empty link list.
    #[test]
    fn test_update_without_links_is_actionable() {
        let rec = r#"{
            "did":"did:plc:tcmiubbjtkwhmnwmrvr2eqnx",
            "time_us":1736453696817289,
            "kind":"commit",
            "commit":{"rev":"3lfdikw7q772c","operation":"update","collection":"app.bsky.actor.profile","rkey":"self","record":{
                "$type":"app.bsky.actor.profile",
                "displayName":"Colin Harvey"
            },
            "cid":"bafyreiem4j5p7duz67negvqarq3s5h7o45fvytevhrzkkn2p6eqdkcf74m"}
        }"#.parse().unwrap();
        assert_eq!(
            get_actionable(&rec),
            Some((
                ActionableEvent::UpdateLinks {
                    record_id: RecordId {
                        did: "did:plc:tcmiubbjtkwhmnwmrvr2eqnx".into(),
                        collection: "app.bsky.actor.profile".into(),
                        rkey: "self".into(),
                    },
                    new_links: vec![],
                },
                1736453696817289
            ))
        );
    }

    #[test]
    fn test_delete_like() {
        let rec = r#"{
            "did":"did:plc:3pa2ss4l2sqzhy6wud4btqsj",
            "time_us":1736448492690783,
            "kind":"commit",
            "commit":{"rev":"3lfddpt7vnx24","operation":"delete","collection":"app.bsky.feed.like","rkey":"3lbiu72lczk2w"}
        }"#.parse().unwrap();
        let action = get_actionable(&rec);
        assert_eq!(
            action,
            Some((
                ActionableEvent::DeleteRecord(RecordId {
                    did: "did:plc:3pa2ss4l2sqzhy6wud4btqsj".into(),
                    collection: "app.bsky.feed.like".into(),
                    rkey: "3lbiu72lczk2w".into(),
                }),
                1736448492690783
            ))
        )
    }

    #[test]
    fn test_delete_account() {
        let rec = r#"{
            "did":"did:plc:zsgqovouzm2gyksjkqrdodsw",
            "time_us":1736451739215876,
            "kind":"account",
            "account":{"active":false,"did":"did:plc:zsgqovouzm2gyksjkqrdodsw","seq":3040934738,"status":"deleted","time":"2025-01-09T19:42:18.972Z"}
        }"#.parse().unwrap();
        let action = get_actionable(&rec);
        assert_eq!(
            action,
            Some((
                ActionableEvent::DeleteAccount("did:plc:zsgqovouzm2gyksjkqrdodsw".into()),
                1736451739215876
            ))
        )
    }

    #[test]
    fn test_deactivate_account() {
        let rec = r#"{
            "did":"did:plc:l4jb3hkq7lrblferbywxkiol","time_us":1736451745611273,"kind":"account","account":{"active":false,"did":"did:plc:l4jb3hkq7lrblferbywxkiol","seq":3040939563,"status":"deactivated","time":"2025-01-09T19:42:22.035Z"}
        }"#.parse().unwrap();
        let action = get_actionable(&rec);
        assert_eq!(
            action,
            Some((
                ActionableEvent::DeactivateAccount("did:plc:l4jb3hkq7lrblferbywxkiol".into()),
                1736451745611273
            ))
        )
    }

    /// Account statuses other than "deactivated" and "deleted" are currently ignored.
    #[test]
    fn test_takendown_account_is_not_actionable() {
        let rec = r#"{
            "did":"did:plc:zsgqovouzm2gyksjkqrdodsw","time_us":1736451739215876,"kind":"account","account":{"active":false,"did":"did:plc:zsgqovouzm2gyksjkqrdodsw","seq":3040934738,"status":"takendown","time":"2025-01-09T19:42:18.972Z"}
        }"#.parse().unwrap();
        assert_eq!(get_actionable(&rec), None);
    }

    #[test]
    fn test_activate_account() {
        let rec = r#"{
            "did":"did:plc:nct6zfb2j4emoj4yjomxwml2","time_us":1736451747292706,"kind":"account","account":{"active":true,"did":"did:plc:nct6zfb2j4emoj4yjomxwml2","seq":3040940775,"time":"2025-01-09T19:42:26.924Z"}
        }"#.parse().unwrap();
        let action = get_actionable(&rec);
        assert_eq!(
            action,
            Some((
                ActionableEvent::ActivateAccount("did:plc:nct6zfb2j4emoj4yjomxwml2".into()),
                1736451747292706
            ))
        )
    }

    /// Unknown kinds, events missing `time_us`, and non-object JSON are all rejected.
    #[test]
    fn test_non_actionable_events() {
        for raw in [
            // identity events are not handled
            r#"{"did":"did:plc:nct6zfb2j4emoj4yjomxwml2","time_us":1736451747292706,"kind":"identity","identity":{"did":"did:plc:nct6zfb2j4emoj4yjomxwml2","seq":3040940776,"time":"2025-01-09T19:42:26.924Z"}}"#,
            // otherwise-valid delete commit with no `time_us`
            r#"{"did":"did:plc:3pa2ss4l2sqzhy6wud4btqsj","kind":"commit","commit":{"rev":"3lfddpt7vnx24","operation":"delete","collection":"app.bsky.feed.like","rkey":"3lbiu72lczk2w"}}"#,
            // not an object at all
            "[]",
        ] {
            let rec: JsonValue = raw.parse().unwrap();
            assert_eq!(get_actionable(&rec), None, "{raw}");
        }
    }
}
355}