forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1//! A very basic example of how to listen for create/delete events on a specific DID and NSID.
2
3use atrium_api::{
4 record::KnownRecord::AppBskyFeedPost,
5 types::string,
6};
7use clap::Parser;
8use jetstream::{
9 events::{
10 CommitEvent,
11 CommitOp,
12 EventKind,
13 JetstreamEvent,
14 },
15 DefaultJetstreamEndpoints,
16 JetstreamCompression,
17 JetstreamConfig,
18 JetstreamConnector,
19};
20
21#[derive(Parser, Debug)]
22#[command(version, about, long_about = None)]
23struct Args {
24 /// The DIDs to listen for events on, if not provided we will listen for all DIDs.
25 #[arg(short, long)]
26 did: Option<Vec<string::Did>>,
27}
28
29#[tokio::main]
30async fn main() -> anyhow::Result<()> {
31 let args = Args::parse();
32
33 let dids = args.did.unwrap_or_default();
34 let config = JetstreamConfig {
35 endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
36 wanted_collections: vec![string::Nsid::new("app.bsky.feed.post".to_string()).unwrap()],
37 wanted_dids: dids.clone(),
38 compression: JetstreamCompression::Zstd,
39 ..Default::default()
40 };
41
42 let jetstream = JetstreamConnector::new(config)?;
43 let mut receiver = jetstream.connect().await?;
44
45 println!("Listening for 'app.bsky.feed.post' events on DIDs: {dids:?}");
46
47 while let Some(event) = receiver.recv().await {
48 if let JetstreamEvent {
49 kind: EventKind::Commit,
50 commit:
51 Some(CommitEvent {
52 operation: CommitOp::Create,
53 rkey,
54 record: Some(record),
55 ..
56 }),
57 ..
58 } = event
59 {
60 if let Ok(AppBskyFeedPost(rec)) = serde_json::from_str(record.get()) {
61 println!("New post created! ({})\n{:?}\n", rkey.as_str(), rec.text);
62 }
63 }
64 }
65
66 Ok(())
67}