Tracks lexicons (NSIDs) and how many times each one has appeared on the Bluesky Jetstream.
1use std::{ops::Deref, sync::Arc};
2
3use smol_str::ToSmolStr;
4#[cfg(not(target_env = "msvc"))]
5use tikv_jemallocator::Jemalloc;
6use tokio_util::sync::CancellationToken;
7use tracing::Level;
8use tracing_subscriber::EnvFilter;
9
10use crate::{
11 api::serve,
12 db::{Db, EventRecord},
13 error::AppError,
14 jetstream::JetstreamClient,
15};
16
17mod api;
18mod db;
19mod error;
20mod jetstream;
21mod utils;
22
/// Process-wide allocator: jemalloc on every target except MSVC ones.
///
/// On MSVC targets this item is compiled out, so the standard library's default
/// (system) allocator is used instead. The `tikv_jemallocator` import is gated by
/// the same `cfg`.
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
26
27#[tokio::main]
28async fn main() {
29 tracing_subscriber::fmt::fmt()
30 .with_env_filter(
31 EnvFilter::builder()
32 .with_default_directive(Level::INFO.into())
33 .from_env_lossy(),
34 )
35 .compact()
36 .init();
37
38 match std::env::args().nth(1).as_deref() {
39 Some("compact") => {
40 compact();
41 return;
42 }
43 Some("debug") => {
44 debug();
45 return;
46 }
47 Some(x) => {
48 tracing::error!("unknown command: {}", x);
49 return;
50 }
51 None => {}
52 }
53
54 let db = Arc::new(Db::new(".fjall_data").expect("couldnt create db"));
55
56 rustls::crypto::ring::default_provider()
57 .install_default()
58 .expect("cant install rustls crypto provider");
59
60 let mut jetstream =
61 match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
62 Ok(client) => client,
63 Err(err) => {
64 tracing::error!("can't create jetstream client: {err}");
65 return;
66 }
67 };
68
69 let cancel_token = CancellationToken::new();
70
71 let consume_events = tokio::spawn({
72 let consume_cancel = cancel_token.child_token();
73 let db = db.clone();
74 async move {
75 jetstream.connect().await?;
76 loop {
77 tokio::select! {
78 maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
79 Ok(event) => {
80 let Some(record) = EventRecord::from_jetstream(event) else {
81 continue;
82 };
83 let db = db.clone();
84 tokio::task::spawn_blocking(move || {
85 if let Err(err) = db.record_event(record) {
86 tracing::error!("failed to record event: {}", err);
87 }
88 });
89 }
90 Err(err) => return Err(err),
91 },
92 _ = consume_cancel.cancelled() => break Ok(()),
93 }
94 }
95 }
96 });
97
98 std::thread::spawn({
99 let db = db.clone();
100 move || {
101 loop {
102 match db.sync(false) {
103 Ok(_) => (),
104 Err(e) => tracing::error!("failed to sync db: {}", e),
105 }
106 std::thread::sleep(std::time::Duration::from_secs(1));
107 }
108 }
109 });
110
111 tokio::select! {
112 res = serve(db.clone(), cancel_token.child_token()) => {
113 if let Err(e) = res {
114 tracing::error!("serve failed: {}", e);
115 }
116 }
117 res = consume_events => {
118 let err =
119 res
120 .map_err(AppError::from)
121 .and_then(std::convert::identity)
122 .expect_err("consume events cant return ok");
123 tracing::error!("consume events failed: {}", err);
124 },
125 _ = tokio::signal::ctrl_c() => {
126 tracing::info!("received ctrl+c!");
127 cancel_token.cancel();
128 }
129 }
130
131 tracing::info!("shutting down...");
132 db.sync(true).expect("couldnt sync db");
133}
134
135fn debug() {
136 let db = Db::new(".fjall_data").expect("couldnt create db");
137 for nsid in db.get_nsids() {
138 let nsid = nsid.deref();
139 for hit in db.get_hits(nsid, ..) {
140 let hit = hit.expect("cant read event");
141 println!("{nsid} {}", hit.timestamp);
142 }
143 }
144}
145
146fn compact() {
147 let from = Arc::new(Db::new(".fjall_data_from").expect("couldnt create db"));
148 let to = Arc::new(Db::new(".fjall_data_to").expect("couldnt create db"));
149
150 let mut threads = Vec::new();
151 for nsid in from.get_nsids() {
152 let from = from.clone();
153 let to = to.clone();
154 threads.push(std::thread::spawn(move || {
155 tracing::info!("migrating {} ...", nsid.deref());
156 let mut count = 0_u64;
157 for hit in from.get_hits(&nsid, ..) {
158 let hit = hit.expect("cant read event");
159 let data = hit.access();
160 to.record_event(EventRecord {
161 nsid: nsid.to_smolstr(),
162 timestamp: hit.timestamp,
163 deleted: data.deleted,
164 })
165 .expect("cant record event");
166 count += 1;
167 }
168 count
169 }));
170 }
171
172 let mut total_count = 0_u64;
173 for thread in threads {
174 total_count += thread.join().expect("thread panicked");
175 }
176 to.sync(true).expect("cant sync");
177 tracing::info!("migrated {total_count} events!");
178}