gork.rs
210 lines 6.1 kB view raw
1use std::{ 2 str::FromStr, 3 sync::{Arc, Mutex}, 4}; 5 6use cursor::load_cursor; 7use jacquard::{ 8 api::{ 9 app_bsky::{self, feed::post::Post}, 10 com_atproto::repo::strong_ref::StrongRef, 11 }, 12 client::{Agent, AgentSessionExt, MemoryCredentialSession}, 13 types::{aturi::AtUri, string::Datetime, value}, 14}; 15use metrics_exporter_prometheus::PrometheusBuilder; 16use serde_json::Value; 17use tracing::{error, info}; 18 19use rocketman::{ 20 connection::JetstreamConnection, 21 handler::{self, Ingestors}, 22 ingestion::LexiconIngestor, 23 options::JetstreamOptions, 24 types::event::{Commit, Event}, 25}; 26 27use async_trait::async_trait; 28 29mod cursor; 30mod ingestors; 31 32fn setup_tracing() { 33 tracing_subscriber::fmt() 34 .with_max_level(tracing::Level::INFO) 35 .init(); 36} 37 38fn setup_metrics() { 39 // Initialize metrics here 40 if let Err(e) = PrometheusBuilder::new().install() { 41 error!( 42 "Failed to install, program will run without Prometheus exporter: {}", 43 e 44 ); 45 } 46} 47 48async fn setup_bsky_sess() -> anyhow::Result<Agent<MemoryCredentialSession>> { 49 let (session, auth) = MemoryCredentialSession::authenticated( 50 std::env::var("ATP_USER")?.into(), 51 std::env::var("ATP_PASSWORD")?.into(), 52 None, 53 ) 54 .await?; 55 let agent: Agent<_> = Agent::from(session); 56 info!("logged in as {}", auth.handle); 57 58 Ok(agent) 59} 60 61#[tokio::main] 62async fn main() { 63 dotenvy::dotenv().ok(); 64 setup_tracing(); 65 setup_metrics(); 66 info!("gorkin it..."); 67 68 let agent = match setup_bsky_sess().await { 69 Ok(r) => r, 70 Err(e) => panic!("{}", e.to_string()), 71 }; 72 // init the builder 73 let opts = JetstreamOptions::builder() 74 // your EXACT nsids 75 .wanted_collections(vec!["app.bsky.feed.post".to_string()]) 76 .bound(8 * 8 * 8 * 8 * 8 * 8) // 262144 77 .build(); 78 // create the jetstream connector 79 let jetstream = JetstreamConnection::new(opts); 80 81 // create your ingestors 82 let mut ingestors = Ingestors::new(); 83 84 ingestors.commits.insert( 85 // your EXACT nsid 86 "app.bsky.feed.post".to_string(), 87 Box::new(MyCoolIngestor::new(agent)), 88 ); 89 90 // arc it 91 let ingestors = Arc::new(ingestors); 92 93 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(load_cursor().await)); 94 95 let msg_rx = jetstream.get_msg_rx(); 96 let reconnect_tx = jetstream.get_reconnect_tx(); 97 98 // spawn 10 tasks to process messages from the queue concurrently 99 for i in 0..10 { 100 let msg_rx_clone = msg_rx.clone(); 101 let ingestors_clone = Arc::clone(&ingestors); 102 let reconnect_tx_clone = reconnect_tx.clone(); 103 let c_cursor = cursor.clone(); 104 105 tokio::spawn(async move { 106 info!("Starting worker thread {}", i); 107 while let Ok(message) = msg_rx_clone.recv_async().await { 108 if let Err(e) = handler::handle_message( 109 message, 110 &ingestors_clone, 111 reconnect_tx_clone.clone(), 112 c_cursor.clone(), 113 ) 114 .await 115 { 116 eprintln!("Error processing message in worker {}: {}", i, e); 117 }; 118 } 119 }); 120 } 121 122 let c_cursor = cursor.clone(); 123 tokio::spawn(async move { 124 loop { 125 tokio::time::sleep(std::time::Duration::from_secs(60)).await; 126 let cursor_to_store: Option<u64> = { 127 let cursor_guard = c_cursor.lock().unwrap(); 128 *cursor_guard 129 }; 130 if let Some(cursor) = cursor_to_store { 131 if let Err(e) = cursor::store_cursor(cursor).await { 132 error!("Error storing cursor: {}", e); 133 } 134 } 135 } 136 }); 137 138 // connect to jetstream 139 // retries internally, but may fail if there is an extreme error. 140 if let Err(e) = jetstream.connect(cursor.clone()).await { 141 eprintln!("Failed to connect to Jetstream: {}", e); 142 std::process::exit(1); 143 } 144} 145 146pub struct MyCoolIngestor { 147 agent: Agent<MemoryCredentialSession>, 148} 149 150impl MyCoolIngestor { 151 pub fn new(agent: Agent<MemoryCredentialSession>) -> Self { 152 Self { agent } 153 } 154} 155 156/// A cool ingestor implementation. 157#[async_trait] 158impl LexiconIngestor for MyCoolIngestor { 159 async fn ingest(&self, message: Event<Value>) -> anyhow::Result<()> { 160 if let Some(Commit { 161 record: Some(record), 162 cid: Some(cid), 163 rkey, 164 collection, 165 operation, 166 rev: _, 167 }) = message.commit 168 { 169 let poast: app_bsky::feed::post::Post = 170 value::from_json_value::<app_bsky::feed::post::Post>(record)?; 171 172 if !(poast.text.starts_with("@gork.bluesky.bot") 173 && (poast.text.contains("is this") 174 || poast.text.contains("am i") 175 || poast.text.contains("do you"))) 176 { 177 return Ok(()); 178 }; 179 // set the proper reply stuff to reply to mentioned post 180 181 // get the strongref of the above post 182 let rcid = StrongRef::new() 183 .cid(cid) 184 .uri(AtUri::from_str(&format!( 185 "at://{}/{}/{}", 186 message.did, collection, rkey 187 ))?) 188 .build(); 189 190 // get parent CID of above post, else get above post's CID 191 let parent_cid = match poast.reply { 192 Some(reply) => reply.parent, 193 None => todo!(), 194 }; 195 196 let post = Post::new() 197 .reply(app_bsky::feed::post::ReplyRef { 198 parent: parent_cid, 199 root: rcid, 200 extra_data: None, 201 }) 202 .text("yeh") 203 .created_at(Datetime::now()) 204 .build(); 205 206 self.agent.create_record(post, None).await?; 207 } 208 Ok(()) 209 } 210}