Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 2.1 kB view raw
1//! An example of how to listen for create/delete events on a specific DID and potentialy unknown 2//! NSID 3 4use atrium_api::types::string; 5use clap::Parser; 6use jetstream::{ 7 events::{ 8 CommitOp, 9 EventKind, 10 JetstreamEvent, 11 }, 12 DefaultJetstreamEndpoints, 13 JetstreamCompression, 14 JetstreamConfig, 15 JetstreamConnector, 16}; 17 18#[derive(Parser, Debug)] 19#[command(version, about, long_about = None)] 20struct Args { 21 /// The DIDs to listen for events on, if not provided we will listen for all DIDs. 22 #[arg(short, long)] 23 did: Option<Vec<string::Did>>, 24 /// The NSID for the collection to listen for (e.g. `blue.flashes.feed.post`). 25 #[arg(short, long)] 26 nsid: string::Nsid, 27} 28 29#[tokio::main] 30async fn main() -> anyhow::Result<()> { 31 let args = Args::parse(); 32 33 let dids = args.did.unwrap_or_default(); 34 let config: JetstreamConfig = JetstreamConfig { 35 endpoint: DefaultJetstreamEndpoints::USEastOne.into(), 36 wanted_collections: vec![args.nsid.clone()], 37 wanted_dids: dids.clone(), 38 compression: JetstreamCompression::Zstd, 39 ..Default::default() 40 }; 41 42 let jetstream = JetstreamConnector::new(config)?; 43 let mut receiver = jetstream.connect().await?; 44 45 println!( 46 "Listening for new and updated '{}' events on DIDs: {:?}", 47 args.nsid.as_str(), 48 dids 49 ); 50 51 while let Some(event) = receiver.recv().await { 52 if let JetstreamEvent { 53 kind: EventKind::Commit, 54 commit: Some(commit), 55 .. 56 } = event 57 { 58 if commit.collection != args.nsid { 59 continue; 60 } 61 if !(commit.operation == CommitOp::Create || commit.operation == CommitOp::Update) { 62 continue; 63 } 64 let Some(rec) = commit.record else { continue }; 65 println!( 66 "New or updated record! ({})\n{:?}\n", 67 commit.rkey.as_str(), 68 rec.get() 69 ); 70 } 71 } 72 73 Ok(()) 74}