//! tracks lexicons and how many times they appeared on the jetstream
use std::{ops::Deref, time::Duration};

use itertools::Itertools;
use rclite::Arc;
use smol_str::ToSmolStr;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;

use crate::{
    api::serve,
    db::{Db, DbConfig, EventRecord},
    error::AppError,
    jetstream::JetstreamClient,
    utils::{CLOCK, RelativeDateTime, get_time},
};

mod api;
mod db;
mod error;
mod jetstream;
mod utils;

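// Use jemalloc everywhere except MSVC targets, where snmalloc is used instead.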
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[cfg(target_env = "msvc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::fmt()
        .with_env_filter(
            EnvFilter::builder()
                .with_default_directive(Level::INFO.into())
                .from_env_lossy(),
        )
        .compact()
        .init();

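    // One-shot maintenance subcommands; with no argument, fall through and run
    // the full service.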
    match std::env::args().nth(1).as_deref() {
        Some("compact") => {
            compact();
            return;
        }
        Some("migrate") => {
            migrate();
            return;
        }
        Some("debug") => {
            debug();
            return;
        }
        Some("print") => {
            print_all();
            return;
        }
        Some(x) => {
            tracing::error!("unknown command: {}", x);
            return;
        }
        None => {}
    }

    let cancel_token = CancellationToken::new();

    let db = Arc::new(
        Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldnt create db"),
    );

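    // Install the process-wide rustls crypto provider before the first TLS
    // connection (the jetstream websocket below).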
    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("cant install rustls crypto provider");

    let mut jetstream =
        match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
            Ok(client) => client,
            Err(err) => {
                tracing::error!("can't create jetstream client: {err}");
                return;
            }
        };

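    // Pipeline: the consume task reads jetstream events onto a bounded channel,
    // and a dedicated thread drains the channel into the db.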
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
    let consume_events = tokio::spawn({
        let consume_cancel = cancel_token.child_token();
        async move {
            jetstream.connect().await?;
            loop {
                tokio::select! {
                    maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
                        Ok(event) => {
                            let Some(record) = EventRecord::from_jetstream(event) else {
                                continue;
                            };
                            event_tx.send(record).await?;
                        }
                        Err(err) => return Err(err),
                    },
                    _ = consume_cancel.cancelled() => break Ok(()),
                }
            }
        }
    });

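    // Drain the channel on a blocking thread, ingesting events in batches of up
    // to 100; a read of zero events means the channel has closed.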
    let ingest_events = std::thread::spawn({
        let db = db.clone();
        move || {
            let mut buffer = Vec::new();
            loop {
                let read = event_rx.blocking_recv_many(&mut buffer, 100);
                if let Err(err) = db.ingest_events(buffer.drain(..)) {
                    tracing::error!("failed to ingest events: {}", err);
                }
                if read == 0 || db.is_shutting_down() {
                    break;
                }
            }
        }
    });

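    // Periodic db maintenance: sync every 10 seconds, and compact the most
    // recent 30-minute window every 30 minutes.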
    let db_task = tokio::task::spawn({
        let db = db.clone();
        async move {
            let sync_period = Duration::from_secs(10);
            let mut sync_interval = tokio::time::interval(sync_period);
            sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            let compact_period = Duration::from_secs(60 * 30); // 30 mins
            let mut compact_interval = tokio::time::interval(compact_period);
            compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                let sync_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            match db.sync(false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to sync db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                let compact_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            let end = get_time();
                            let start = end - compact_period;
                            let range = start.as_secs()..end.as_secs();
                            tracing::info!(
                                start = %RelativeDateTime::from_now(start),
                                end = %RelativeDateTime::from_now(end),
                                "running compaction..."
                            );
                            match db.compact_all(db.cfg.max_block_size, range, false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to compact db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                tokio::select! {
                    _ = sync_interval.tick() => sync_db().await,
                    _ = compact_interval.tick() => compact_db().await,
                    _ = db.shutting_down() => break,
                }
            }
        }
    });

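    // Run the API server until it exits, the jetstream consumer fails, or
    // ctrl+c arrives; any of these triggers the full shutdown below.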
    tokio::select! {
        res = serve(db.clone(), cancel_token.child_token()) => {
            if let Err(e) = res {
                tracing::error!("serve failed: {}", e);
            }
        }
        res = consume_events => {
            let err = res
                .map_err(AppError::from)
                .and_then(std::convert::identity)
                .expect_err("consume events cant return ok");
            tracing::error!("consume events failed: {}", err);
        },
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("received ctrl+c!");
            cancel_token.cancel();
        }
    }

    tracing::info!("shutting down...");
    cancel_token.cancel();
    ingest_events.join().expect("failed to join ingest events");
    db_task.await.expect("cant join db task");
    db.sync(true).expect("cant sync db");
}

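// Dumps every stored hit for every nsid to stdout, along with a total count.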
fn print_all() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
    let nsids = db.get_nsids().collect::<Vec<_>>();
    let mut count = 0_usize;
    for nsid in nsids {
        println!("{}:", nsid.deref());
        for hit in db.get_hits(&nsid, .., usize::MAX) {
            let hit = hit.expect("cant decode hit");
            println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted);
            count += 1;
        }
    }
    println!("total hits: {}", count);
}

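// Prints the db's disk size and, per nsid, the item count of each block,
// collapsing runs of equally-sized blocks into an `xN` suffix.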
fn debug() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    println!("disk size: {}", info.disk_size);
    for (nsid, blocks) in info.nsids {
        print!("{nsid}:");
        let mut last_size = 0;
        let mut same_size_count = 0;
        for item_count in blocks {
            if item_count == last_size {
                same_size_count += 1;
            } else {
                if same_size_count > 1 {
                    print!("x{}", same_size_count);
                }
                print!(" {item_count}");
                same_size_count = 0;
            }
            last_size = item_count;
        }
        // flush a trailing run, which the loop above never prints
        if same_size_count > 1 {
            print!("x{}", same_size_count);
        }
        println!();
    }
}

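// Runs a major compaction with the journal and write buffer size limits
// lifted, then reports the change in disk size and per-nsid block counts.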
fn compact() {
    let db = Db::new(
        DbConfig::default().ks(|c| {
            c.max_journaling_size(u64::MAX)
                .max_write_buffer_size(u64::MAX)
        }),
        CancellationToken::new(),
    )
    .expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    db.major_compact().expect("cant compact");
    // give background work a moment to settle before re-reading the db info
    std::thread::sleep(Duration::from_secs(5));
    let compacted_info = db.info().expect("cant get db info");
    println!(
        "disk size: {} -> {}",
        info.disk_size, compacted_info.disk_size
    );
    for (nsid, blocks) in info.nsids {
        println!(
            "{nsid}: {} -> {}",
            blocks.len(),
            compacted_info.nsids[&nsid].len()
        )
    }
}

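// Copies every event from the db at .fjall_data_from into a fresh db at
// .fjall_data_to, normalizing microsecond timestamps to seconds on the way.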
fn migrate() {
    let cancel_token = CancellationToken::new();
    let from = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_from"),
            cancel_token.child_token(),
        )
        .expect("couldnt create db"),
    );
    let to = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_to").ks(|c| {
                c.max_journaling_size(u64::MAX)
                    .max_write_buffer_size(u64::MAX)
                    .compaction_workers(rayon::current_num_threads() * 4)
                    .flush_workers(rayon::current_num_threads() * 4)
            }),
            cancel_token.child_token(),
        )
        .expect("couldnt create db"),
    );

    let nsids = from.get_nsids().collect::<Vec<_>>();
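    // Log ingestion throughput every few seconds while the migration runs.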
    let _eps_thread = std::thread::spawn({
        let to = to.clone();
        move || {
            loop {
                std::thread::sleep(Duration::from_secs(3));
                let eps = to.eps();
                if eps > 0 {
                    tracing::info!("{} eps", eps);
                }
            }
        }
    });
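    // Migrate each nsid on its own OS thread, reading hits in chunks and
    // re-ingesting them into the target db.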
    let mut threads = Vec::with_capacity(nsids.len());
    let start = CLOCK.now();
    for nsid in nsids {
        let from = from.clone();
        let to = to.clone();
        threads.push(std::thread::spawn(move || {
            tracing::info!("{}: migrating...", nsid.deref());
            let mut count = 0_u64;
            for hits in from
                .get_hits(&nsid, .., usize::MAX)
                .chunks(100000)
                .into_iter()
            {
                to.ingest_events(hits.map(|hit| {
                    count += 1;
                    let hit = hit.expect("cant decode hit");
                    let mut timestamp = hit.timestamp;
                    // check if timestamp is microseconds, convert to seconds if it is
                    if timestamp > 10_000_000_000 {
                        timestamp /= 1_000_000;
                    }
                    EventRecord {
                        nsid: nsid.to_smolstr(),
                        timestamp,
                        deleted: hit.deser().unwrap().deleted,
                    }
                }))
                .expect("cant record event");
            }
            tracing::info!("{}: ingested {} events...", nsid.deref(), count);
            count
        }));
    }
    let mut total_count = 0_u64;
    for thread in threads {
        let count = thread.join().expect("thread panicked");
        total_count += count;
    }
    let read_time = start.elapsed();
    let read_per_second = total_count as f64 / read_time.as_secs_f64();
    drop(from);
    tracing::info!("starting sync!!!");
    to.sync(true).expect("cant sync");
    tracing::info!("persisting...");
    let total_time = start.elapsed();
    let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
    tracing::info!(
        "migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
    );
}