//! Tracks lexicons and how many times they appeared on the Jetstream.
use std::{ops::Deref, time::Duration};

use itertools::Itertools;
use rclite::Arc;
use smol_str::ToSmolStr;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;

use crate::{
    api::serve,
    db::{Db, DbConfig, EventRecord},
    error::AppError,
    jetstream::JetstreamClient,
    utils::{CLOCK, RelativeDateTime, get_time},
};

mod api;
mod db;
mod db_old;
mod error;
mod jetstream;
mod utils;

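// Use jemalloc on non-MSVC targets and snmalloc on MSVC, where jemalloc is not
// supported; both are drop-in replacements for the system allocator.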
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[cfg(target_env = "msvc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

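// Pipeline overview: a Jetstream consumer task forwards decoded events into a
// bounded channel; a dedicated ingest thread drains that channel into the db;
// a maintenance task periodically syncs and compacts; an HTTP API serves reads.
// Everything shuts down cooperatively through one CancellationToken tree.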
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::fmt()
        .with_env_filter(
            EnvFilter::builder()
                .with_default_directive(Level::INFO.into())
                .from_env_lossy(),
        )
        .compact()
        .init();

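    // Optional one-shot subcommands; with no argument we run the full service.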
    match std::env::args().nth(1).as_deref() {
        Some("compact") => {
            compact();
            return;
        }
        Some("migrate") => {
            migrate();
            return;
        }
        Some("debug") => {
            debug();
            return;
        }
        Some(x) => {
            tracing::error!("unknown command: {}", x);
            return;
        }
        None => {}
    }

    let cancel_token = CancellationToken::new();

    let db = Arc::new(
        Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldn't create db"),
    );

    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("can't install rustls crypto provider");

    let mut jetstream =
        match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
            Ok(client) => client,
            Err(err) => {
                tracing::error!("can't create jetstream client: {err}");
                return;
            }
        };

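    // Bounded channel (capacity 1000) so a slow ingest thread applies
    // backpressure to the Jetstream consumer instead of buffering without limit.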
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
    let consume_events = tokio::spawn({
        let consume_cancel = cancel_token.child_token();
        async move {
            jetstream.connect().await?;
            loop {
                tokio::select! {
                    maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
                        Ok(event) => {
                            let Some(record) = EventRecord::from_jetstream(event) else {
                                continue;
                            };
                            event_tx.send(record).await?;
                        }
                        Err(err) => return Err(err),
                    },
                    _ = consume_cancel.cancelled() => break Ok(()),
                }
            }
        }
    });

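    // Ingestion runs on a plain OS thread: the db write path is blocking, and
    // `blocking_recv_many` batches up to 100 events per write. A return of 0
    // means the channel is closed and drained, so the thread exits.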
    let ingest_events = std::thread::spawn({
        let db = db.clone();
        move || {
            let mut buffer = Vec::new();
            loop {
                let read = event_rx.blocking_recv_many(&mut buffer, 100);
                if let Err(err) = db.ingest_events(buffer.drain(..)) {
                    tracing::error!("failed to ingest events: {}", err);
                }
                if read == 0 || db.is_shutting_down() {
                    break;
                }
            }
        }
    });

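    // Maintenance task: sync the db every 10 seconds, and every 30 minutes
    // compact a 30-minute window that ended 15 minutes in the past, presumably
    // to keep compaction clear of the still-hot tail of incoming events.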
    let db_task = tokio::task::spawn({
        let db = db.clone();
        async move {
            let sync_period = Duration::from_secs(10);
            let mut sync_interval = tokio::time::interval(sync_period);
            sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            let compact_period = Duration::from_secs(60 * 30); // 30 mins
            let mut compact_interval = tokio::time::interval(compact_period);
            compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                let sync_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            match db.sync(false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to sync db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                let compact_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            // Compact a full period-sized window that ended half a period ago.
                            let end = get_time() - compact_period / 2;
                            let start = end - compact_period;
                            let range = start.as_secs()..end.as_secs();
                            tracing::info!(
                                {
                                    start = %RelativeDateTime::from_now(start),
                                    end = %RelativeDateTime::from_now(end),
                                },
                                "running compaction...",
                            );
                            match db.compact_all(db.cfg.max_block_size, range, false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to compact db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                tokio::select! {
                    _ = sync_interval.tick() => sync_db().await,
                    _ = compact_interval.tick() => compact_db().await,
                    _ = db.shutting_down() => break,
                }
            }
        }
    });

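    // Run until the API server fails, the Jetstream consumer fails, or we get
    // ctrl+c; whichever happens first, fall through to the shutdown sequence.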
    tokio::select! {
        res = serve(db.clone(), cancel_token.child_token()) => {
            if let Err(e) = res {
                tracing::error!("serve failed: {}", e);
            }
        }
        res = consume_events => {
            let err = res
                .map_err(AppError::from)
                .and_then(std::convert::identity)
                .expect_err("consume events can't return ok");
            tracing::error!("consume events failed: {}", err);
        },
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("received ctrl+c!");
            cancel_token.cancel();
        }
    }

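    // Ordered shutdown: cancel everything, let the ingest thread drain, wait
    // for the maintenance task, then do one final blocking sync.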
    tracing::info!("shutting down...");
    cancel_token.cancel();
    ingest_events.join().expect("failed to join ingest events");
    db_task.await.expect("can't join db task");
    db.sync(true).expect("can't sync db");
}

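/// Prints per-NSID block sizes with simple run-length encoding: each run of
/// equal sizes is printed once, followed by `xN` for the N additional repeats.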
fn debug() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldn't create db");
    let info = db.info().expect("can't get db info");
    println!("disk size: {}", info.disk_size);
    for (nsid, blocks) in info.nsids {
        print!("{nsid}:");
        let mut last_size = 0;
        let mut same_size_count = 0;
        for item_count in blocks {
            if item_count == last_size {
                same_size_count += 1;
            } else {
                if same_size_count > 0 {
                    print!("x{}", same_size_count);
                }
                print!(" {item_count}");
                same_size_count = 0;
            }
            last_size = item_count;
        }
        // Flush the final run, which the loop above never reaches.
        if same_size_count > 0 {
            print!("x{}", same_size_count);
        }
        println!();
    }
}

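/// One-shot major compaction. Journaling and write-buffer limits are raised to
/// u64::MAX, presumably to keep background flushes out of the way while the
/// compaction runs; the 5s sleep gives it time to settle before re-reading
/// the stats.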
fn compact() {
    let db = Db::new(
        DbConfig::default().ks(|c| {
            c.max_journaling_size(u64::MAX)
                .max_write_buffer_size(u64::MAX)
        }),
        CancellationToken::new(),
    )
    .expect("couldn't create db");
    let info = db.info().expect("can't get db info");
    db.major_compact().expect("can't compact");
    std::thread::sleep(Duration::from_secs(5));
    let compacted_info = db.info().expect("can't get db info");
    println!(
        "disk size: {} -> {}",
        info.disk_size, compacted_info.disk_size
    );
    for (nsid, blocks) in info.nsids {
        println!(
            "{nsid}: {} -> {}",
            blocks.len(),
            compacted_info.nsids[&nsid].len()
        )
    }
}

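/// Migrates every NSID from the old on-disk format (`.fjall_data_from`) into
/// the new one (`.fjall_data_to`), one thread per NSID, then reports throughput.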
fn migrate() {
    let cancel_token = CancellationToken::new();

    let from = Arc::new(db_old::Db::new(".fjall_data_from").expect("couldn't create db"));

    let to = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_to").ks(|c| {
                c.max_journaling_size(u64::MAX)
                    .max_write_buffer_size(u64::MAX)
                    .compaction_workers(rayon::current_num_threads() * 4)
                    .flush_workers(rayon::current_num_threads() * 4)
            }),
            cancel_token.child_token(),
        )
        .expect("couldn't create db"),
    );

    let nsids = from.get_nsids().collect::<Vec<_>>();
    // Throughput reporter; detached (never joined), dies with the process.
    let _eps_thread = std::thread::spawn({
        let to = to.clone();
        move || {
            loop {
                std::thread::sleep(Duration::from_secs(3));
                let eps = to.eps();
                if eps > 0 {
                    tracing::info!("{} eps", eps);
                }
            }
        }
    });
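    // One migration thread per NSID: stream hits out of the old db in chunks
    // of 100k and ingest each chunk into the new db as EventRecords.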
    let mut threads = Vec::with_capacity(nsids.len());
    let start = CLOCK.now();
    for nsid in nsids {
        let from = from.clone();
        let to = to.clone();
        threads.push(std::thread::spawn(move || {
            tracing::info!("{}: migrating...", nsid.deref());
            let mut count = 0_u64;
            for hits in from.get_hits(&nsid, ..).chunks(100000).into_iter() {
                to.ingest_events(hits.map(|hit| {
                    count += 1;
                    let hit = hit.expect("can't decode hit");
                    EventRecord {
                        nsid: nsid.to_smolstr(),
                        timestamp: hit.timestamp,
                        deleted: hit.deser().unwrap().deleted,
                    }
                }))
                .expect("can't record events");
            }
            tracing::info!("{}: ingested {} events", nsid.deref(), count);
            count
        }));
    }
    let mut total_count = 0_u64;
    for thread in threads {
        let count = thread.join().expect("thread panicked");
        total_count += count;
    }
    let read_time = start.elapsed();
    let read_per_second = total_count as f64 / read_time.as_secs_f64();
    drop(from);
    tracing::info!("syncing and persisting...");
    to.sync(true).expect("can't sync");
    let total_time = start.elapsed();
    let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
    tracing::info!(
        "migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
    );
}