forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1//! An example of how to listen for create/delete events on a specific DID and potentialy unknown
2//! NSID
3
4use atrium_api::types::string;
5use clap::Parser;
6use jetstream::{
7 events::{
8 CommitOp,
9 EventKind,
10 JetstreamEvent,
11 },
12 DefaultJetstreamEndpoints,
13 JetstreamCompression,
14 JetstreamConfig,
15 JetstreamConnector,
16};
17
18#[derive(Parser, Debug)]
19#[command(version, about, long_about = None)]
20struct Args {
21 /// The DIDs to listen for events on, if not provided we will listen for all DIDs.
22 #[arg(short, long)]
23 did: Option<Vec<string::Did>>,
24 /// The NSID for the collection to listen for (e.g. `blue.flashes.feed.post`).
25 #[arg(short, long)]
26 nsid: string::Nsid,
27}
28
29#[tokio::main]
30async fn main() -> anyhow::Result<()> {
31 let args = Args::parse();
32
33 let dids = args.did.unwrap_or_default();
34 let config: JetstreamConfig = JetstreamConfig {
35 endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
36 wanted_collections: vec![args.nsid.clone()],
37 wanted_dids: dids.clone(),
38 compression: JetstreamCompression::Zstd,
39 ..Default::default()
40 };
41
42 let jetstream = JetstreamConnector::new(config)?;
43 let mut receiver = jetstream.connect().await?;
44
45 println!(
46 "Listening for new and updated '{}' events on DIDs: {:?}",
47 args.nsid.as_str(),
48 dids
49 );
50
51 while let Some(event) = receiver.recv().await {
52 if let JetstreamEvent {
53 kind: EventKind::Commit,
54 commit: Some(commit),
55 ..
56 } = event
57 {
58 if commit.collection != args.nsid {
59 continue;
60 }
61 if !(commit.operation == CommitOp::Create || commit.operation == CommitOp::Update) {
62 continue;
63 }
64 let Some(rec) = commit.record else { continue };
65 println!(
66 "New or updated record! ({})\n{:?}\n",
67 commit.rkey.as_str(),
68 rec.get()
69 );
70 }
71 }
72
73 Ok(())
74}