//! Tracks lexicons and how many times they appeared on the Jetstream.
use std::{ops::Deref, time::Duration};

use itertools::Itertools;
use rclite::Arc;
use smol_str::ToSmolStr;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;

use crate::{
    api::serve,
    db::{Db, DbConfig, EventRecord},
    error::AppError,
    jetstream::JetstreamClient,
    utils::{CLOCK, RelativeDateTime, get_time},
};

mod api;
mod db;
mod error;
mod jetstream;
mod utils;

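// Use jemalloc as the global allocator (tikv-jemallocator doesn't support MSVC targets).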
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::fmt()
        .with_env_filter(
            EnvFilter::builder()
                .with_default_directive(Level::INFO.into())
                .from_env_lossy(),
        )
        .compact()
        .init();

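    // Maintenance subcommands; with no argument, run the ingest server.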
    match std::env::args().nth(1).as_deref() {
        Some("compact") => {
            compact();
            return;
        }
        Some("migrate") => {
            migrate();
            return;
        }
        Some("debug") => {
            debug();
            return;
        }
        Some("print") => {
            print_all();
            return;
        }
        Some(x) => {
            tracing::error!("unknown command: {}", x);
            return;
        }
        None => {}
    }

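    // Root cancellation token; child tokens propagate shutdown to the db and every task.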
    let cancel_token = CancellationToken::new();

    let db = Arc::new(
        Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldn't create db"),
    );

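    // Install the process-wide rustls crypto provider before opening any TLS (wss) connections.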
    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("can't install rustls crypto provider");

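    // Jetstream endpoints handed to the client.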
    let urls = [
        "wss://jetstream1.us-west.bsky.network/subscribe",
        "wss://jetstream2.us-west.bsky.network/subscribe",
        "wss://jetstream2.fr.hose.cam/subscribe",
        "wss://jetstream.fire.hose.cam/subscribe",
    ];
    let mut jetstream = match JetstreamClient::new(urls) {
        Ok(client) => client,
        Err(err) => {
            tracing::error!("can't create jetstream client: {err}");
            return;
        }
    };

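    // Consumer task: reads events off the websocket and forwards them over a
    // bounded channel to the blocking ingest thread below.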
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
    let consume_events = tokio::spawn({
        let consume_cancel = cancel_token.child_token();
        async move {
            jetstream.connect().await?;
            loop {
                tokio::select! {
                    maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
                        Ok(event) => {
                            let Some(record) = EventRecord::from_jetstream(event) else {
                                continue;
                            };
                            event_tx.send(record).await?;
                        }
                        Err(err) => return Err(err),
                    },
                    _ = consume_cancel.cancelled() => break Ok(()),
                }
            }
        }
    });

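    // Ingest thread: db writes are blocking, so they run on a dedicated OS
    // thread, draining the channel in batches of up to 500 events.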
    let ingest_events = std::thread::spawn({
        let db = db.clone();
        move || {
            let mut buffer = Vec::new();
            loop {
                let read = event_rx.blocking_recv_many(&mut buffer, 500);
                if let Err(err) = db.ingest_events(buffer.drain(..)) {
                    tracing::error!("failed to ingest events: {}", err);
                }
                // A zero-length read means the channel is closed.
                if read == 0 || db.is_shutting_down() {
                    break;
                }
            }
        }
    });

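    // Maintenance task: sync the db every 10 seconds and compact the most
    // recent window every 30 minutes, until shutdown.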
    let db_task = tokio::task::spawn({
        let db = db.clone();
        async move {
            let sync_period = Duration::from_secs(10);
            let mut sync_interval = tokio::time::interval(sync_period);
            sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            let compact_period = Duration::from_secs(60 * 30); // 30 mins
            let mut compact_interval = tokio::time::interval(compact_period);
            compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                let sync_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            match db.sync(false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to sync db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                let compact_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            let end = get_time();
                            let start = end - compact_period;
                            let range = start.as_secs()..end.as_secs();
                            tracing::info!(
                                start = %RelativeDateTime::from_now(start),
                                end = %RelativeDateTime::from_now(end),
                                "running compaction...",
                            );
                            match db.compact_all(db.cfg.max_block_size, range, false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to compact db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                tokio::select! {
                    _ = sync_interval.tick() => sync_db().await,
                    _ = compact_interval.tick() => compact_db().await,
                    _ = db.shutting_down() => break,
                }
            }
        }
    });

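    // Run until the HTTP server exits, the consumer task fails, or ctrl+c
    // arrives; any of these begins shutdown.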
    tokio::select! {
        res = serve(db.clone(), cancel_token.child_token()) => {
            if let Err(e) = res {
                tracing::error!("serve failed: {}", e);
            }
        }
        res = consume_events => {
            // The consumer only resolves before cancellation if it hit an error,
            // so flatten the join/app results and extract it.
            let err = res
                .map_err(AppError::from)
                .and_then(std::convert::identity)
                .expect_err("consume events can't return ok");
            tracing::error!("consume events failed: {}", err);
        },
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("received ctrl+c!");
            cancel_token.cancel();
        }
    }

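    // Graceful shutdown: cancel everything, drain the ingest thread, wait for
    // the maintenance task, then do one final blocking sync.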
    tracing::info!("shutting down...");
    cancel_token.cancel();
    ingest_events.join().expect("failed to join ingest events");
    db_task.await.expect("can't join db task");
    db.sync(true).expect("can't sync db");
}

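/// Prints every stored hit for every NSID, along with a total count.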
fn print_all() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldn't create db");
    let nsids = db.get_nsids().collect::<Vec<_>>();
    let mut count = 0_usize;
    for nsid in nsids {
        println!("{}:", nsid.deref());
        for hit in db.get_hits(&nsid, .., usize::MAX) {
            let hit = hit.expect("can't decode hit");
            println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted);
            count += 1;
        }
    }
    println!("total hits: {}", count);
}

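/// Prints disk usage and per-NSID block item counts, collapsing runs of equal counts.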
fn debug() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldn't create db");
    let info = db.info().expect("can't get db info");
    println!("disk size: {}", info.disk_size);
    for (nsid, blocks) in info.nsids {
        print!("{nsid}:");
        let mut last_size = 0;
        let mut same_size_count = 0;
        for item_count in blocks {
            if item_count == last_size {
                same_size_count += 1;
            } else {
                if same_size_count > 1 {
                    print!("x{}", same_size_count);
                }
                print!(" {item_count}");
                same_size_count = 0;
            }
            last_size = item_count;
        }
        // Flush the repeat count for a run that extends to the final block.
        if same_size_count > 1 {
            print!("x{}", same_size_count);
        }
        println!();
    }
}

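/// Opens the db with journaling/write-buffer limits lifted, runs a major
/// compaction, and reports disk size and block counts before and after.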
fn compact() {
    let db = Db::new(
        DbConfig::default().ks(|c| {
            c.max_journaling_size(u64::MAX)
                .max_write_buffer_size(u64::MAX)
        }),
        CancellationToken::new(),
    )
    .expect("couldn't create db");
    let info = db.info().expect("can't get db info");
    db.major_compact().expect("can't compact");
    // Give background work a moment to settle before measuring again.
    std::thread::sleep(Duration::from_secs(5));
    let compacted_info = db.info().expect("can't get db info");
    println!(
        "disk size: {} -> {}",
        info.disk_size, compacted_info.disk_size
    );
    for (nsid, blocks) in info.nsids {
        println!(
            "{nsid}: {} -> {}",
            blocks.len(),
            compacted_info.nsids[&nsid].len()
        )
    }
}

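/// Copies every event from the db at `.fjall_data_from` into a fresh db at
/// `.fjall_data_to`, migrating each NSID on its own thread.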
fn migrate() {
    let cancel_token = CancellationToken::new();
    let from = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_from"),
            cancel_token.child_token(),
        )
        .expect("couldn't create db"),
    );
    let to = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_to").ks(|c| {
                c.max_journaling_size(u64::MAX)
                    .max_write_buffer_size(u64::MAX)
                    .compaction_workers(rayon::current_num_threads() * 4)
                    .flush_workers(rayon::current_num_threads() * 4)
            }),
            cancel_token.child_token(),
        )
        .expect("couldn't create db"),
    );

    let nsids = from.get_nsids().collect::<Vec<_>>();
    // Report ingest throughput on the destination db every few seconds.
    let _eps_thread = std::thread::spawn({
        let to = to.clone();
        move || {
            loop {
                std::thread::sleep(Duration::from_secs(3));
                let eps = to.eps();
                if eps > 0 {
                    tracing::info!("{} eps", eps);
                }
            }
        }
    });
    let mut threads = Vec::with_capacity(nsids.len());
    let start = CLOCK.now();
    for nsid in nsids {
        let from = from.clone();
        let to = to.clone();
        threads.push(std::thread::spawn(move || {
            tracing::info!("{}: migrating...", nsid.deref());
            let mut count = 0_u64;
            for hits in from
                .get_hits(&nsid, .., usize::MAX)
                .chunks(100_000)
                .into_iter()
            {
                to.ingest_events(hits.map(|hit| {
                    count += 1;
                    let hit = hit.expect("can't decode hit");
                    EventRecord {
                        nsid: nsid.to_smolstr(),
                        timestamp: hit.timestamp,
                        deleted: hit.deser().unwrap().deleted,
                    }
                }))
                .expect("can't record event");
            }
            tracing::info!("{}: ingested {} events...", nsid.deref(), count);
            count
        }));
    }
    let mut total_count = 0_u64;
    for thread in threads {
        let count = thread.join().expect("thread panicked");
        total_count += count;
    }
    // The copy phase measures read throughput; the final sync measures writes.
    let read_time = start.elapsed();
    let read_per_second = total_count as f64 / read_time.as_secs_f64();
    drop(from);
    tracing::info!("starting sync!!!");
    to.sync(true).expect("can't sync");
    tracing::info!("persisting...");
    let total_time = start.elapsed();
    let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
    tracing::info!(
        "migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
    );
}