Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1//! A very basic example of how to listen for create/delete events on a specific DID and NSID. 2 3use atrium_api::{ 4 record::KnownRecord::AppBskyFeedPost, 5 types::string, 6}; 7use clap::Parser; 8use jetstream::{ 9 events::{ 10 CommitEvent, 11 CommitOp, 12 EventKind, 13 JetstreamEvent, 14 }, 15 DefaultJetstreamEndpoints, 16 JetstreamCompression, 17 JetstreamConfig, 18 JetstreamConnector, 19}; 20 21#[derive(Parser, Debug)] 22#[command(version, about, long_about = None)] 23struct Args { 24 /// The DIDs to listen for events on, if not provided we will listen for all DIDs. 25 #[arg(short, long)] 26 did: Option<Vec<string::Did>>, 27} 28 29#[tokio::main] 30async fn main() -> anyhow::Result<()> { 31 let args = Args::parse(); 32 33 let dids = args.did.unwrap_or_default(); 34 let config = JetstreamConfig { 35 endpoint: DefaultJetstreamEndpoints::USEastOne.into(), 36 wanted_collections: vec![string::Nsid::new("app.bsky.feed.post".to_string()).unwrap()], 37 wanted_dids: dids.clone(), 38 compression: JetstreamCompression::Zstd, 39 ..Default::default() 40 }; 41 42 let jetstream = JetstreamConnector::new(config)?; 43 let mut receiver = jetstream.connect().await?; 44 45 println!("Listening for 'app.bsky.feed.post' events on DIDs: {dids:?}"); 46 47 while let Some(event) = receiver.recv().await { 48 if let JetstreamEvent { 49 kind: EventKind::Commit, 50 commit: 51 Some(CommitEvent { 52 operation: CommitOp::Create, 53 rkey, 54 record: Some(record), 55 .. 56 }), 57 .. 58 } = event 59 { 60 if let Ok(AppBskyFeedPost(rec)) = serde_json::from_str(record.get()) { 61 println!("New post created! ({})\n{:?}\n", rkey.as_str(), rec.text); 62 } 63 } 64 } 65 66 Ok(()) 67}