Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1mod jetstream; 2mod jsonl_file; 3 4use crate::storage::LinkStorage; 5use crate::{ActionableEvent, RecordId}; 6use anyhow::Result; 7use jetstream::consume_jetstream; 8use jsonl_file::consume_jsonl_file; 9use links::collect_links; 10use metrics::{counter, describe_counter, describe_histogram, histogram, Unit}; 11use std::path::PathBuf; 12use std::sync::atomic::{AtomicU32, Ordering}; 13use std::sync::Arc; 14use std::thread; 15use tinyjson::JsonValue; 16use tokio_util::sync::CancellationToken; 17 18pub fn consume( 19 mut store: impl LinkStorage, 20 qsize: Arc<AtomicU32>, 21 fixture: Option<PathBuf>, 22 stream: String, 23 staying_alive: CancellationToken, 24) -> Result<()> { 25 describe_counter!( 26 "consumer_events_non_actionable", 27 Unit::Count, 28 "count of non-actionable events" 29 ); 30 describe_counter!( 31 "consumer_events_actionable", 32 Unit::Count, 33 "count of action by type. *all* atproto record delete events are included" 34 ); 35 describe_counter!( 36 "consumer_events_actionable_links", 37 Unit::Count, 38 "total links encountered" 39 ); 40 describe_histogram!( 41 "consumer_events_actionable_links", 42 Unit::Count, 43 "number of links per message" 44 ); 45 46 let (receiver, consumer_handle) = if let Some(f) = fixture { 47 let (sender, receiver) = flume::bounded(21); 48 ( 49 receiver, 50 thread::spawn(move || consume_jsonl_file(f, sender)), 51 ) 52 } else { 53 let (sender, receiver) = flume::bounded(32_768); // eek 54 let cursor = store.get_cursor().unwrap(); 55 ( 56 receiver, 57 thread::spawn(move || consume_jetstream(sender, cursor, stream, staying_alive)), 58 ) 59 }; 60 61 for update in receiver.iter() { 62 if let Some((action, ts)) = get_actionable(&update) { 63 { 64 store.push(&action, ts).unwrap(); 65 qsize.store(receiver.len().try_into().unwrap(), Ordering::Relaxed); 66 } 67 } else { 68 counter!("consumer_events_non_actionable").increment(1); 69 } 70 } 71 72 consumer_handle.join().unwrap() 73} 74 75pub fn get_actionable(event: &JsonValue) -> Option<(ActionableEvent, u64)> { 76 let JsonValue::Object(root) = event else { 77 return None; 78 }; 79 let JsonValue::Number(time_us) = root.get("time_us")? else { 80 return None; 81 }; 82 let cursor = *time_us as u64; 83 // todo: clean up 84 match event { 85 JsonValue::Object(root) 86 if root.get("kind") == Some(&JsonValue::String("commit".to_string())) => 87 { 88 let JsonValue::String(did) = root.get("did")? else { 89 return None; 90 }; 91 let JsonValue::Object(commit) = root.get("commit")? else { 92 return None; 93 }; 94 let JsonValue::String(collection) = commit.get("collection")? else { 95 return None; 96 }; 97 let JsonValue::String(rkey) = commit.get("rkey")? else { 98 return None; 99 }; 100 match commit.get("operation")? { 101 JsonValue::String(op) if op == "create" => { 102 let links = collect_links(commit.get("record")?); 103 counter!("consumer_events_actionable", "action_type" => "create_links", "collection" => collection.clone()).increment(1); 104 histogram!("consumer_events_actionable_links", "action_type" => "create_links", "collection" => collection.clone()).record(links.len() as f64); 105 for link in &links { 106 counter!("consumer_events_actionable_links", 107 "action_type" => "create_links", 108 "collection" => collection.clone(), 109 "path" => link.path.clone(), 110 "link_type" => link.target.name(), 111 ) 112 .increment(links.len() as u64); 113 } 114 if links.is_empty() { 115 None 116 } else { 117 Some(( 118 ActionableEvent::CreateLinks { 119 record_id: RecordId { 120 did: did.into(), 121 collection: collection.clone(), 122 rkey: rkey.clone(), 123 }, 124 links, 125 }, 126 cursor, 127 )) 128 } 129 } 130 JsonValue::String(op) if op == "update" => { 131 let links = collect_links(commit.get("record")?); 132 counter!("consumer_events_actionable", "action_type" => "update_links", "collection" => collection.clone()).increment(1); 133 histogram!("consumer_events_actionable_links", "action_type" => "update_links", "collection" => collection.clone()).record(links.len() as f64); 134 for link in &links { 135 counter!("consumer_events_actionable_links", 136 "action_type" => "update_links", 137 "collection" => collection.clone(), 138 "path" => link.path.clone(), 139 "link_type" => link.target.name(), 140 ) 141 .increment(links.len() as u64); 142 } 143 Some(( 144 ActionableEvent::UpdateLinks { 145 record_id: RecordId { 146 did: did.into(), 147 collection: collection.clone(), 148 rkey: rkey.clone(), 149 }, 150 new_links: links, 151 }, 152 cursor, 153 )) 154 } 155 JsonValue::String(op) if op == "delete" => { 156 counter!("consumer_events_actionable", "action_type" => "delete_record", "collection" => collection.clone()).increment(1); 157 Some(( 158 ActionableEvent::DeleteRecord(RecordId { 159 did: did.into(), 160 collection: collection.clone(), 161 rkey: rkey.clone(), 162 }), 163 cursor, 164 )) 165 } 166 _ => None, 167 } 168 } 169 JsonValue::Object(root) 170 if root.get("kind") == Some(&JsonValue::String("account".to_string())) => 171 { 172 let JsonValue::Object(account) = root.get("account")? else { 173 return None; 174 }; 175 let did = account.get("did")?.get::<String>()?.clone(); 176 match (account.get("active")?.get::<bool>()?, account.get("status")) { 177 (true, None) => { 178 counter!("consumer_events_actionable", "action_type" => "account", "action" => "activate").increment(1); 179 Some((ActionableEvent::ActivateAccount(did.into()), cursor)) 180 } 181 (false, Some(JsonValue::String(status))) => match status.as_ref() { 182 "deactivated" => { 183 counter!("consumer_events_actionable", "action_type" => "account", "action" => "deactivate").increment(1); 184 Some((ActionableEvent::DeactivateAccount(did.into()), cursor)) 185 } 186 "deleted" => { 187 counter!("consumer_events_actionable", "action_type" => "account", "action" => "delete").increment(1); 188 Some((ActionableEvent::DeleteAccount(did.into()), cursor)) 189 } 190 // TODO: are we missing handling for suspended and deactivated accounts? 191 _ => None, 192 }, 193 _ => None, 194 } 195 } 196 _ => None, 197 } 198} 199 200#[cfg(test)] 201mod tests { 202 use super::*; 203 use links::{CollectedLink, Link}; 204 205 #[test] 206 fn test_create_like() { 207 let rec = r#"{ 208 "did":"did:plc:icprmty6ticzracr5urz4uum", 209 "time_us":1736448492661668, 210 "kind":"commit", 211 "commit":{"rev":"3lfddpt5qa62c","operation":"create","collection":"app.bsky.feed.like","rkey":"3lfddpt5djw2c","record":{ 212 "$type":"app.bsky.feed.like", 213 "createdAt":"2025-01-09T18:48:10.412Z", 214 "subject":{"cid":"bafyreihazf62qvmusup55ojhkzwbmzee6rxtsug3e6eg33mnjrgthxvozu","uri":"at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23"} 215 }, 216 "cid":"bafyreidgcs2id7nsbp6co42ind2wcig3riwcvypwan6xdywyfqklovhdjq"} 217 }"#.parse().unwrap(); 218 let action = get_actionable(&rec); 219 assert_eq!( 220 action, 221 Some(( 222 ActionableEvent::CreateLinks { 223 record_id: RecordId { 224 did: "did:plc:icprmty6ticzracr5urz4uum".into(), 225 collection: "app.bsky.feed.like".into(), 226 rkey: "3lfddpt5djw2c".into(), 227 }, 228 links: vec![CollectedLink { 229 path: ".subject.uri".into(), 230 target: Link::AtUri( 231 "at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23" 232 .into() 233 ) 234 },], 235 }, 236 1736448492661668 237 )) 238 ) 239 } 240 241 #[test] 242 fn test_update_profile() { 243 let rec = r#"{ 244 "did":"did:plc:tcmiubbjtkwhmnwmrvr2eqnx", 245 "time_us":1736453696817289,"kind":"commit", 246 "commit":{ 247 "rev":"3lfdikw7q772c", 248 "operation":"update", 249 "collection":"app.bsky.actor.profile", 250 "rkey":"self", 251 "record":{ 252 "$type":"app.bsky.actor.profile", 253 "avatar":{"$type":"blob","ref":{"$link":"bafkreidcg5jzz3hpdtlc7um7w5masiugdqicc5fltuajqped7fx66hje54"},"mimeType":"image/jpeg","size":295764}, 254 "banner":{"$type":"blob","ref":{"$link":"bafkreiahaswf2yex2zfn3ynpekhw6mfj7254ra7ly27zjk73czghnz2wni"},"mimeType":"image/jpeg","size":856461}, 255 "createdAt":"2024-08-30T21:33:06.945Z", 256 "description":"Professor, QUB | Belfast via Derry \\n\\nViews personal | Reposts are not an endorsement\\n\\nhttps://go.qub.ac.uk/charvey", 257 "displayName":"Colin Harvey", 258 "pinnedPost":{"cid":"bafyreifyrepqer22xsqqnqulpcxzpu7wcgeuzk6p5c23zxzctaiwmlro7y","uri":"at://did:plc:tcmiubbjtkwhmnwmrvr2eqnx/app.bsky.feed.post/3lf66ri63u22t"} 259 }, 260 "cid":"bafyreiem4j5p7duz67negvqarq3s5h7o45fvytevhrzkkn2p6eqdkcf74m" 261 } 262 }"#.parse().unwrap(); 263 let action = get_actionable(&rec); 264 assert_eq!( 265 action, 266 Some(( 267 ActionableEvent::UpdateLinks { 268 record_id: RecordId { 269 did: "did:plc:tcmiubbjtkwhmnwmrvr2eqnx".into(), 270 collection: "app.bsky.actor.profile".into(), 271 rkey: "self".into(), 272 }, 273 new_links: vec![CollectedLink { 274 path: ".pinnedPost.uri".into(), 275 target: Link::AtUri( 276 "at://did:plc:tcmiubbjtkwhmnwmrvr2eqnx/app.bsky.feed.post/3lf66ri63u22t" 277 .into() 278 ), 279 },], 280 }, 281 1736453696817289 282 )) 283 ) 284 } 285 286 #[test] 287 fn test_delete_like() { 288 let rec = r#"{ 289 "did":"did:plc:3pa2ss4l2sqzhy6wud4btqsj", 290 "time_us":1736448492690783, 291 "kind":"commit", 292 "commit":{"rev":"3lfddpt7vnx24","operation":"delete","collection":"app.bsky.feed.like","rkey":"3lbiu72lczk2w"} 293 }"#.parse().unwrap(); 294 let action = get_actionable(&rec); 295 assert_eq!( 296 action, 297 Some(( 298 ActionableEvent::DeleteRecord(RecordId { 299 did: "did:plc:3pa2ss4l2sqzhy6wud4btqsj".into(), 300 collection: "app.bsky.feed.like".into(), 301 rkey: "3lbiu72lczk2w".into(), 302 }), 303 1736448492690783 304 )) 305 ) 306 } 307 308 #[test] 309 fn test_delete_account() { 310 let rec = r#"{ 311 "did":"did:plc:zsgqovouzm2gyksjkqrdodsw", 312 "time_us":1736451739215876, 313 "kind":"account", 314 "account":{"active":false,"did":"did:plc:zsgqovouzm2gyksjkqrdodsw","seq":3040934738,"status":"deleted","time":"2025-01-09T19:42:18.972Z"} 315 }"#.parse().unwrap(); 316 let action = get_actionable(&rec); 317 assert_eq!( 318 action, 319 Some(( 320 ActionableEvent::DeleteAccount("did:plc:zsgqovouzm2gyksjkqrdodsw".into()), 321 1736451739215876 322 )) 323 ) 324 } 325 326 #[test] 327 fn test_deactivate_account() { 328 let rec = r#"{ 329 "did":"did:plc:l4jb3hkq7lrblferbywxkiol","time_us":1736451745611273,"kind":"account","account":{"active":false,"did":"did:plc:l4jb3hkq7lrblferbywxkiol","seq":3040939563,"status":"deactivated","time":"2025-01-09T19:42:22.035Z"} 330 }"#.parse().unwrap(); 331 let action = get_actionable(&rec); 332 assert_eq!( 333 action, 334 Some(( 335 ActionableEvent::DeactivateAccount("did:plc:l4jb3hkq7lrblferbywxkiol".into()), 336 1736451745611273 337 )) 338 ) 339 } 340 341 #[test] 342 fn test_activate_account() { 343 let rec = r#"{ 344 "did":"did:plc:nct6zfb2j4emoj4yjomxwml2","time_us":1736451747292706,"kind":"account","account":{"active":true,"did":"did:plc:nct6zfb2j4emoj4yjomxwml2","seq":3040940775,"time":"2025-01-09T19:42:26.924Z"} 345 }"#.parse().unwrap(); 346 let action = get_actionable(&rec); 347 assert_eq!( 348 action, 349 Some(( 350 ActionableEvent::ActivateAccount("did:plc:nct6zfb2j4emoj4yjomxwml2".into()), 351 1736451747292706 352 )) 353 ) 354 } 355}