//! tracks lexicons and how many times they appeared on the jetstream

use std::{ops::Deref, time::Duration};

use itertools::Itertools;
use rclite::Arc;
use smol_str::ToSmolStr;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;

use crate::{
    api::serve,
    db::{Db, DbConfig, EventRecord},
    error::AppError,
    jetstream::JetstreamClient,
    utils::{CLOCK, RelativeDateTime, get_time},
};

mod api;
mod db;
mod error;
mod jetstream;
mod utils;

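// snmalloc replaces the system allocator for the whole process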
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::fmt()
        .with_env_filter(
            EnvFilter::builder()
                .with_default_directive(Level::INFO.into())
                .from_env_lossy(),
        )
        .compact()
        .init();

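    // with a subcommand, run a one-off maintenance task instead of the server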
    match std::env::args().nth(1).as_deref() {
        Some("compact") => {
            compact();
            return;
        }
        Some("migrate") => {
            migrate();
            return;
        }
        Some("debug") => {
            debug();
            return;
        }
        Some("print") => {
            print_all();
            return;
        }
        Some(x) => {
            tracing::error!("unknown command: {}", x);
            return;
        }
        None => {}
    }

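    // root cancellation token; every subsystem gets a child token so a single
    // cancel() tears the whole pipeline down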
    let cancel_token = CancellationToken::new();

    let db = Arc::new(
        Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldnt create db"),
    );

    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("cant install rustls crypto provider");

    let mut jetstream =
        match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
            Ok(client) => client,
            Err(err) => {
                tracing::error!("can't create jetstream client: {err}");
                return;
            }
        };

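    // pipeline: the jetstream consumer pushes records into a bounded channel,
    // and a dedicated thread drains it into the db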
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
    let consume_events = tokio::spawn({
        let consume_cancel = cancel_token.child_token();
        async move {
            jetstream.connect().await?;
            loop {
                tokio::select! {
                    maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
                        Ok(event) => {
                            let Some(record) = EventRecord::from_jetstream(event) else {
                                continue;
                            };
                            event_tx.send(record).await?;
                        }
                        Err(err) => return Err(err),
                    },
                    _ = consume_cancel.cancelled() => break Ok(()),
                }
            }
        }
    });

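    // db writes are blocking, so ingestion runs on its own OS thread, draining
    // the channel in batches of up to 100 events; recv_many returns 0 once the
    // channel is closed and empty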
    let ingest_events = std::thread::spawn({
        let db = db.clone();
        move || {
            let mut buffer = Vec::new();
            loop {
                let read = event_rx.blocking_recv_many(&mut buffer, 100);
                if let Err(err) = db.ingest_events(buffer.drain(..)) {
                    tracing::error!("failed to ingest events: {}", err);
                }
                if read == 0 || db.is_shutting_down() {
                    break;
                }
            }
        }
    });

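    // background maintenance: flush to disk every 10s, and every 30 minutes
    // compact the blocks written in the last 30-minute window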
    let db_task = tokio::task::spawn({
        let db = db.clone();
        async move {
            let sync_period = Duration::from_secs(10);
            let mut sync_interval = tokio::time::interval(sync_period);
            sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            let compact_period = Duration::from_secs(60 * 30); // 30 mins
            let mut compact_interval = tokio::time::interval(compact_period);
            compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                let sync_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            if let Err(e) = db.sync(false) {
                                tracing::error!("failed to sync db: {}", e);
                            }
                        }
                    })
                    .await
                    .expect("sync task panicked");
                };
                let compact_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            // compact only the window written since the last run
                            let end = get_time();
                            let start = end - compact_period;
                            let range = start.as_secs()..end.as_secs();
                            tracing::info!(
                                {
                                    start = %RelativeDateTime::from_now(start),
                                    end = %RelativeDateTime::from_now(end),
                                },
                                "running compaction...",
                            );
                            if let Err(e) = db.compact_all(db.cfg.max_block_size, range, false) {
                                tracing::error!("failed to compact db: {}", e);
                            }
                        }
                    })
                    .await
                    .expect("compact task panicked");
                };
                tokio::select! {
                    _ = sync_interval.tick() => sync_db().await,
                    _ = compact_interval.tick() => compact_db().await,
                    _ = db.shutting_down() => break,
                }
            }
        }
    });

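    // run until the api server exits, the consumer dies, or ctrl+c arrives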
    tokio::select! {
        res = serve(db.clone(), cancel_token.child_token()) => {
            if let Err(e) = res {
                tracing::error!("serve failed: {}", e);
            }
        }
        res = consume_events => {
            match res.map_err(AppError::from).and_then(std::convert::identity) {
                Ok(()) => tracing::info!("consume events stopped"),
                Err(err) => tracing::error!("consume events failed: {}", err),
            }
        },
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("received ctrl+c!");
            cancel_token.cancel();
        }
    }

    tracing::info!("shutting down...");
    cancel_token.cancel();
    ingest_events.join().expect("failed to join ingest events");
    db_task.await.expect("cant join db task");
    db.sync(true).expect("cant sync db");
}

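/// prints every stored hit for every nsid, plus a total count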
fn print_all() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
    let nsids = db.get_nsids().collect::<Vec<_>>();
    let mut count = 0_usize;
    for nsid in nsids {
        println!("{}:", nsid.deref());
        for hit in db.get_hits(&nsid, .., usize::MAX) {
            let hit = hit.expect("cant decode hit");
            println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted);
            count += 1;
        }
    }
    println!("total hits: {}", count);
}

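/// prints disk usage and, per nsid, the item count of each block,
/// collapsing runs of equal counts into `<count>x<n>`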
fn debug() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    println!("disk size: {}", info.disk_size);
    for (nsid, blocks) in info.nsids {
        print!("{nsid}:");
        // run-length encode consecutive blocks with the same item count,
        // printing "x<n>" for the total number of occurrences in a run
        let mut last_size = None;
        let mut repeats = 0;
        for item_count in blocks {
            if Some(item_count) == last_size {
                repeats += 1;
            } else {
                if repeats > 0 {
                    print!("x{}", repeats + 1);
                }
                print!(" {item_count}");
                repeats = 0;
            }
            last_size = Some(item_count);
        }
        // flush a trailing run, which the loop above never prints
        if repeats > 0 {
            print!("x{}", repeats + 1);
        }
        println!();
    }
}

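/// runs a major compaction and prints disk size and per-nsid block counts
/// before and after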
fn compact() {
    let db = Db::new(
        DbConfig::default().ks(|c| {
            c.max_journaling_size(u64::MAX)
                .max_write_buffer_size(u64::MAX)
        }),
        CancellationToken::new(),
    )
    .expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    db.major_compact().expect("cant compact");
    // give background compaction a moment to finish before re-reading sizes
    std::thread::sleep(Duration::from_secs(5));
    let compacted_info = db.info().expect("cant get db info");
    println!(
        "disk size: {} -> {}",
        info.disk_size, compacted_info.disk_size
    );
    for (nsid, blocks) in info.nsids {
        println!(
            "{nsid}: {} -> {}",
            blocks.len(),
            compacted_info.nsids[&nsid].len()
        )
    }
}

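/// copies every event from `.fjall_data_from` into `.fjall_data_to`, one
/// thread per nsid, then reports read/write throughput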
fn migrate() {
    let cancel_token = CancellationToken::new();
    let from = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_from"),
            cancel_token.child_token(),
        )
        .expect("couldnt create db"),
    );
    let to = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_to").ks(|c| {
                c.max_journaling_size(u64::MAX)
                    .max_write_buffer_size(u64::MAX)
                    .compaction_workers(rayon::current_num_threads() * 4)
                    .flush_workers(rayon::current_num_threads() * 4)
            }),
            cancel_token.child_token(),
        )
        .expect("couldnt create db"),
    );

    let nsids = from.get_nsids().collect::<Vec<_>>();
    let _eps_thread = std::thread::spawn({
        let to = to.clone();
        move || {
            loop {
                std::thread::sleep(Duration::from_secs(3));
                let eps = to.eps();
                if eps > 0 {
                    tracing::info!("{} rps", eps);
                }
            }
        }
    });
    let mut threads = Vec::with_capacity(nsids.len());
    let start = CLOCK.now();
    for nsid in nsids {
        let from = from.clone();
        let to = to.clone();
        threads.push(std::thread::spawn(move || {
            tracing::info!("{}: migrating...", nsid.deref());
            let mut count = 0_u64;
            for hits in from
                .get_hits(&nsid, .., usize::MAX)
                .chunks(100_000)
                .into_iter()
            {
                to.ingest_events(hits.map(|hit| {
                    count += 1;
                    let hit = hit.expect("cant decode hit");
                    EventRecord {
                        nsid: nsid.to_smolstr(),
                        timestamp: hit.timestamp,
                        deleted: hit.deser().unwrap().deleted,
                    }
                }))
                .expect("cant record event");
            }
            tracing::info!("{}: ingested {} events...", nsid.deref(), count);
            count
        }));
    }
    let mut total_count = 0_u64;
    for thread in threads {
        let count = thread.join().expect("thread panicked");
        total_count += count;
    }
    let read_time = start.elapsed();
    let read_per_second = total_count as f64 / read_time.as_secs_f64();
    drop(from);
    tracing::info!("starting sync!!!");
    to.sync(true).expect("cant sync");
    tracing::info!("persisting...");
    let total_time = start.elapsed();
    let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
    tracing::info!(
        "migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
    );
}