//! Tracks lexicons and how many times they appeared on the Jetstream.
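//!
//! With no arguments it consumes the Jetstream firehose and serves the query
//! API; the `compact`, `migrate`, `debug`, and `print` subcommands run the
//! corresponding maintenance helpers instead.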
use std::{ops::Deref, time::Duration};

use itertools::Itertools;
use rclite::Arc;
use smol_str::ToSmolStr;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;

use crate::{
    api::serve,
    db::{Db, DbConfig, EventRecord},
    error::AppError,
    jetstream::JetstreamClient,
    utils::{CLOCK, RelativeDateTime, get_time},
};

mod api;
mod db;
mod error;
mod jetstream;
mod utils;

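// Use jemalloc as the global allocator on non-MSVC targets (tikv-jemallocator
// does not support the MSVC toolchain).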
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[tokio::main]
async fn main() {
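    // Structured logging: default to INFO, overridable via RUST_LOG.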
    tracing_subscriber::fmt::fmt()
        .with_env_filter(
            EnvFilter::builder()
                .with_default_directive(Level::INFO.into())
                .from_env_lossy(),
        )
        .compact()
        .init();

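    // Subcommand dispatch; with no argument, fall through to run the ingest server.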
    match std::env::args().nth(1).as_deref() {
        Some("compact") => {
            compact();
            return;
        }
        Some("migrate") => {
            migrate();
            return;
        }
        Some("debug") => {
            debug();
            return;
        }
        Some("print") => {
            print_all();
            return;
        }
        Some(x) => {
            tracing::error!("unknown command: {}", x);
            return;
        }
        None => {}
    }

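    // Root cancellation token; each task gets a child token so a single
    // cancel shuts everything down.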
    let cancel_token = CancellationToken::new();

    let db = Arc::new(
        Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldn't create db"),
    );

    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("can't install rustls crypto provider");

    let mut jetstream =
        match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
            Ok(client) => client,
            Err(err) => {
                tracing::error!("can't create jetstream client: {err}");
                return;
            }
        };

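    // Bounded channel between the async jetstream consumer and the blocking
    // ingest thread; a full channel applies backpressure to the consumer.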
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
    let consume_events = tokio::spawn({
        let consume_cancel = cancel_token.child_token();
        async move {
            jetstream.connect().await?;
            loop {
                tokio::select! {
                    maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
                        Ok(event) => {
                            let Some(record) = EventRecord::from_jetstream(event) else {
                                continue;
                            };
                            event_tx.send(record).await?;
                        }
                        Err(err) => return Err(err),
                    },
                    _ = consume_cancel.cancelled() => break Ok(()),
                }
            }
        }
    });

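    // Ingestion runs on a dedicated OS thread since db writes block:
    // `blocking_recv_many` batches up to 500 events per write, and returns 0
    // only once the channel is closed and drained.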
    let ingest_events = std::thread::spawn({
        let db = db.clone();
        move || {
            let mut buffer = Vec::new();
            loop {
                let read = event_rx.blocking_recv_many(&mut buffer, 500);
                if let Err(err) = db.ingest_events(buffer.drain(..)) {
                    tracing::error!("failed to ingest events: {}", err);
                }
                if read == 0 || db.is_shutting_down() {
                    break;
                }
            }
        }
    });

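    // Background maintenance: sync the db every 10 seconds and compact the
    // most recent window every 30 minutes, both via the blocking thread pool.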
    let db_task = tokio::task::spawn({
        let db = db.clone();
        async move {
            let sync_period = Duration::from_secs(10);
            let mut sync_interval = tokio::time::interval(sync_period);
            sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            let compact_period = Duration::from_secs(60 * 30); // 30 mins
            let mut compact_interval = tokio::time::interval(compact_period);
            compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                let sync_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            if let Err(e) = db.sync(false) {
                                tracing::error!("failed to sync db: {}", e);
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                let compact_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            let end = get_time();
                            let start = end - compact_period;
                            let range = start.as_secs()..end.as_secs();
                            tracing::info!(
                                {
                                    start = %RelativeDateTime::from_now(start),
                                    end = %RelativeDateTime::from_now(end),
                                },
                                "running compaction...",
                            );
                            if let Err(e) = db.compact_all(db.cfg.max_block_size, range, false) {
                                tracing::error!("failed to compact db: {}", e);
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                tokio::select! {
                    _ = sync_interval.tick() => sync_db().await,
                    _ = compact_interval.tick() => compact_db().await,
                    _ = db.shutting_down() => break,
                }
            }
        }
    });

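    // Run until the API server exits, the jetstream consumer fails, or ctrl+c arrives.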
    tokio::select! {
        res = serve(db.clone(), cancel_token.child_token()) => {
            if let Err(e) = res {
                tracing::error!("serve failed: {}", e);
            }
        }
        res = consume_events => {
            // Flatten Result<Result<(), AppError>, JoinError>; the consumer
            // only lands here if it failed, so Ok is unreachable.
            let err = res
                .map_err(AppError::from)
                .and_then(std::convert::identity)
                .expect_err("consume events can't return Ok");
            tracing::error!("consume events failed: {}", err);
        },
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("received ctrl+c!");
            cancel_token.cancel();
        }
    }

    tracing::info!("shutting down...");
    cancel_token.cancel();
    ingest_events.join().expect("failed to join ingest events");
    db_task.await.expect("can't join db task");
    db.sync(true).expect("can't sync db");
}

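/// Dumps every stored hit for every NSID to stdout, followed by a total count.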
fn print_all() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldn't create db");
    let nsids = db.get_nsids().collect::<Vec<_>>();
    let mut count = 0_usize;
    for nsid in nsids {
        println!("{}:", nsid.deref());
        for hit in db.get_hits(&nsid, .., usize::MAX) {
            let hit = hit.expect("can't decode hit");
            println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted);
            count += 1;
        }
    }
    println!("total hits: {}", count);
}

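/// Prints total disk size and, per NSID, the item count of each block,
/// collapsing runs of equally-sized blocks into `<count>x<repeats>`.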
fn debug() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldn't create db");
    let info = db.info().expect("can't get db info");
    println!("disk size: {}", info.disk_size);
    for (nsid, blocks) in info.nsids {
        print!("{nsid}:");
        let mut last_size = 0;
        let mut same_size_count = 0;
        for item_count in blocks {
            if item_count == last_size {
                same_size_count += 1;
            } else {
                // Flush the previous run before starting a new one.
                if same_size_count > 0 {
                    print!("x{}", same_size_count);
                }
                print!(" {item_count}");
                same_size_count = 0;
            }
            last_size = item_count;
        }
        // Flush a run that extends to the end of the block list.
        if same_size_count > 0 {
            print!("x{}", same_size_count);
        }
        println!();
    }
}

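/// Runs a major compaction (with journaling and write-buffer limits raised to
/// their maximums) and reports disk size and per-NSID block counts before and
/// after.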
fn compact() {
    let db = Db::new(
        DbConfig::default().ks(|c| {
            c.max_journaling_size(u64::MAX)
                .max_write_buffer_size(u64::MAX)
        }),
        CancellationToken::new(),
    )
    .expect("couldn't create db");
    let info = db.info().expect("can't get db info");
    db.major_compact().expect("can't compact");
    // Give background compaction a moment to settle before re-reading stats.
    std::thread::sleep(Duration::from_secs(5));
    let compacted_info = db.info().expect("can't get db info");
    println!(
        "disk size: {} -> {}",
        info.disk_size, compacted_info.disk_size
    );
    for (nsid, blocks) in info.nsids {
        println!(
            "{nsid}: {} -> {}",
            blocks.len(),
            compacted_info.nsids[&nsid].len()
        )
    }
}

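/// Copies every event from the db at `.fjall_data_from` into a fresh db at
/// `.fjall_data_to`, one thread per NSID, then reports migration throughput.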
fn migrate() {
    let cancel_token = CancellationToken::new();
    let from = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_from"),
            cancel_token.child_token(),
        )
        .expect("couldn't create db"),
    );
    let to = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_to").ks(|c| {
                c.max_journaling_size(u64::MAX)
                    .max_write_buffer_size(u64::MAX)
                    .compaction_workers(rayon::current_num_threads() * 4)
                    .flush_workers(rayon::current_num_threads() * 4)
            }),
            cancel_token.child_token(),
        )
        .expect("couldn't create db"),
    );

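    // Progress reporter: logs the destination db's ingest rate every 3 seconds.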
    let nsids = from.get_nsids().collect::<Vec<_>>();
    let _eps_thread = std::thread::spawn({
        let to = to.clone();
        move || {
            loop {
                std::thread::sleep(Duration::from_secs(3));
                let eps = to.eps();
                if eps > 0 {
                    tracing::info!("{} eps", eps);
                }
            }
        }
    });
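    // One migration thread per NSID: stream hits out of `from` in chunks of
    // 100k and ingest each chunk into `to`.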
    let mut threads = Vec::with_capacity(nsids.len());
    let start = CLOCK.now();
    for nsid in nsids {
        let from = from.clone();
        let to = to.clone();
        threads.push(std::thread::spawn(move || {
            tracing::info!("{}: migrating...", nsid.deref());
            let mut count = 0_u64;
            for hits in from
                .get_hits(&nsid, .., usize::MAX)
                .chunks(100_000)
                .into_iter()
            {
                to.ingest_events(hits.map(|hit| {
                    count += 1;
                    let hit = hit.expect("can't decode hit");
                    EventRecord {
                        nsid: nsid.to_smolstr(),
                        timestamp: hit.timestamp,
                        deleted: hit.deser().unwrap().deleted,
                    }
                }))
                .expect("can't record events");
            }
            tracing::info!("{}: ingested {} events", nsid.deref(), count);
            count
        }));
    }
    let mut total_count = 0_u64;
    for thread in threads {
        let count = thread.join().expect("thread panicked");
        total_count += count;
    }
    let read_time = start.elapsed();
    let read_per_second = total_count as f64 / read_time.as_secs_f64();
    drop(from);
    tracing::info!("starting sync...");
    to.sync(true).expect("can't sync");
    tracing::info!("persisting...");
    let total_time = start.elapsed();
    let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
    tracing::info!(
        "migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
    );
}