gork.rs
1use std::{
2 str::FromStr,
3 sync::{Arc, Mutex},
4};
5
6use cursor::load_cursor;
7use jacquard::{
8 api::{
9 app_bsky::{self, feed::post::Post},
10 com_atproto::repo::strong_ref::StrongRef,
11 },
12 client::{Agent, AgentSessionExt, MemoryCredentialSession},
13 types::{aturi::AtUri, string::Datetime, value},
14};
15use metrics_exporter_prometheus::PrometheusBuilder;
16use serde_json::Value;
17use tracing::{error, info};
18
19use rocketman::{
20 connection::JetstreamConnection,
21 handler::{self, Ingestors},
22 ingestion::LexiconIngestor,
23 options::JetstreamOptions,
24 types::event::{Commit, Event},
25};
26
27use async_trait::async_trait;
28
29mod cursor;
30mod ingestors;
31
32fn setup_tracing() {
33 tracing_subscriber::fmt()
34 .with_max_level(tracing::Level::INFO)
35 .init();
36}
37
38fn setup_metrics() {
39 // Initialize metrics here
40 if let Err(e) = PrometheusBuilder::new().install() {
41 error!(
42 "Failed to install, program will run without Prometheus exporter: {}",
43 e
44 );
45 }
46}
47
48async fn setup_bsky_sess() -> anyhow::Result<Agent<MemoryCredentialSession>> {
49 let (session, auth) = MemoryCredentialSession::authenticated(
50 std::env::var("ATP_USER")?.into(),
51 std::env::var("ATP_PASSWORD")?.into(),
52 None,
53 )
54 .await?;
55 let agent: Agent<_> = Agent::from(session);
56 info!("logged in as {}", auth.handle);
57
58 Ok(agent)
59}
60
61#[tokio::main]
62async fn main() {
63 dotenvy::dotenv().ok();
64 setup_tracing();
65 setup_metrics();
66 info!("gorkin it...");
67
68 let agent = match setup_bsky_sess().await {
69 Ok(r) => r,
70 Err(e) => panic!("{}", e.to_string()),
71 };
72 // init the builder
73 let opts = JetstreamOptions::builder()
74 // your EXACT nsids
75 .wanted_collections(vec!["app.bsky.feed.post".to_string()])
76 .bound(8 * 8 * 8 * 8 * 8 * 8) // 262144
77 .build();
78 // create the jetstream connector
79 let jetstream = JetstreamConnection::new(opts);
80
81 // create your ingestors
82 let mut ingestors = Ingestors::new();
83
84 ingestors.commits.insert(
85 // your EXACT nsid
86 "app.bsky.feed.post".to_string(),
87 Box::new(MyCoolIngestor::new(agent)),
88 );
89
90 // arc it
91 let ingestors = Arc::new(ingestors);
92
93 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(load_cursor().await));
94
95 let msg_rx = jetstream.get_msg_rx();
96 let reconnect_tx = jetstream.get_reconnect_tx();
97
98 // spawn 10 tasks to process messages from the queue concurrently
99 for i in 0..10 {
100 let msg_rx_clone = msg_rx.clone();
101 let ingestors_clone = Arc::clone(&ingestors);
102 let reconnect_tx_clone = reconnect_tx.clone();
103 let c_cursor = cursor.clone();
104
105 tokio::spawn(async move {
106 info!("Starting worker thread {}", i);
107 while let Ok(message) = msg_rx_clone.recv_async().await {
108 if let Err(e) = handler::handle_message(
109 message,
110 &ingestors_clone,
111 reconnect_tx_clone.clone(),
112 c_cursor.clone(),
113 )
114 .await
115 {
116 eprintln!("Error processing message in worker {}: {}", i, e);
117 };
118 }
119 });
120 }
121
122 let c_cursor = cursor.clone();
123 tokio::spawn(async move {
124 loop {
125 tokio::time::sleep(std::time::Duration::from_secs(60)).await;
126 let cursor_to_store: Option<u64> = {
127 let cursor_guard = c_cursor.lock().unwrap();
128 *cursor_guard
129 };
130 if let Some(cursor) = cursor_to_store {
131 if let Err(e) = cursor::store_cursor(cursor).await {
132 error!("Error storing cursor: {}", e);
133 }
134 }
135 }
136 });
137
138 // connect to jetstream
139 // retries internally, but may fail if there is an extreme error.
140 if let Err(e) = jetstream.connect(cursor.clone()).await {
141 eprintln!("Failed to connect to Jetstream: {}", e);
142 std::process::exit(1);
143 }
144}
145
146pub struct MyCoolIngestor {
147 agent: Agent<MemoryCredentialSession>,
148}
149
150impl MyCoolIngestor {
151 pub fn new(agent: Agent<MemoryCredentialSession>) -> Self {
152 Self { agent }
153 }
154}
155
156/// A cool ingestor implementation.
157#[async_trait]
158impl LexiconIngestor for MyCoolIngestor {
159 async fn ingest(&self, message: Event<Value>) -> anyhow::Result<()> {
160 if let Some(Commit {
161 record: Some(record),
162 cid: Some(cid),
163 rkey,
164 collection,
165 operation,
166 rev: _,
167 }) = message.commit
168 {
169 let poast: app_bsky::feed::post::Post =
170 value::from_json_value::<app_bsky::feed::post::Post>(record)?;
171
172 if !(poast.text.starts_with("@gork.bluesky.bot")
173 && (poast.text.contains("is this")
174 || poast.text.contains("am i")
175 || poast.text.contains("do you")))
176 {
177 return Ok(());
178 };
179 // set the proper reply stuff to reply to mentioned post
180
181 // get the strongref of the above post
182 let rcid = StrongRef::new()
183 .cid(cid)
184 .uri(AtUri::from_str(&format!(
185 "at://{}/{}/{}",
186 message.did, collection, rkey
187 ))?)
188 .build();
189
190 // get parent CID of above post, else get above post's CID
191 let parent_cid = match poast.reply {
192 Some(reply) => reply.parent,
193 None => todo!(),
194 };
195
196 let post = Post::new()
197 .reply(app_bsky::feed::post::ReplyRef {
198 parent: parent_cid,
199 root: rcid,
200 extra_data: None,
201 })
202 .text("yeh")
203 .created_at(Datetime::now())
204 .build();
205
206 self.agent.create_record(post, None).await?;
207 }
208 Ok(())
209 }
210}