//! Tracks lexicons and how many times they appeared on the Jetstream.
use std::{
    fmt::Debug,
    io::Cursor,
    ops::{Bound, Deref, RangeBounds},
    path::Path,
    time::Duration,
};

use ahash::{AHashMap, AHashSet};
use byteview::StrView;
use fjall::{Keyspace, Partition, PartitionCreateOptions};
use itertools::{Either, Itertools};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::{SmolStr, ToSmolStr};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

use crate::{
    db::handle::{ItemDecoder, LexiconHandle},
    error::{AppError, AppResult},
    jetstream::JetstreamEvent,
    utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded},
};

mod block;
mod handle;

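/// Running totals for one NSID, persisted in the `_counts` partition as
/// `rkyv` bytes; `last_seen` is a unix timestamp in seconds.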
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    pub count: u128,
    pub deleted_count: u128,
    pub last_seen: u64,
}

#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    pub deleted: bool,
}

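/// A single commit (create/update) or delete of a record under one lexicon
/// (NSID), as extracted from the Jetstream.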
#[derive(Clone)]
pub struct EventRecord {
    pub nsid: SmolStr,
    pub timestamp: u64, // seconds
    pub deleted: bool,
}

impl EventRecord {
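    /// Maps a Jetstream event to an [`EventRecord`], keeping only commit and
    /// delete events. `time_us` is microseconds, so it is truncated to whole
    /// seconds here. A minimal usage sketch, assuming an already-deserialized
    /// `event` and a `db: Db` in scope:
    ///
    /// ```ignore
    /// if let Some(record) = EventRecord::from_jetstream(event) {
    ///     db.ingest_events(std::iter::once(record))?;
    /// }
    /// ```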
    pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
        match event {
            JetstreamEvent::Commit {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: false,
            }),
            JetstreamEvent::Delete {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: true,
            }),
            _ => None,
        }
    }
}

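/// A layout snapshot: for every NSID, the item count of each stored block
/// (newest block first), plus the keyspace's total disk usage.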
pub struct DbInfo {
    pub nsids: AHashMap<SmolStr, Vec<usize>>,
    pub disk_size: u64,
}

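/// Tuning knobs for [`Db`]. `min_block_size`/`max_block_size` bound how many
/// items are packed into a single encoded block, and `max_last_activity` is
/// how long a lexicon may sit idle before `sync` flushes whatever it has
/// queued. A minimal construction sketch (the path is hypothetical):
///
/// ```ignore
/// let cfg = DbConfig::default().path("./data/lexicons");
/// let db = Db::new(cfg, CancellationToken::new())?;
/// ```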
pub struct DbConfig {
    pub ks_config: fjall::Config,
    pub min_block_size: usize,
    pub max_block_size: usize,
    pub max_last_activity: Duration,
}

impl DbConfig {
    pub fn path(mut self, path: impl AsRef<Path>) -> Self {
        self.ks_config = fjall::Config::new(path);
        self
    }

    pub fn ks(mut self, f: impl FnOnce(fjall::Config) -> fjall::Config) -> Self {
        self.ks_config = f(self.ks_config);
        self
    }
}

impl Default for DbConfig {
    fn default() -> Self {
        Self {
            ks_config: fjall::Config::default()
                .cache_size(1024 * 1024 * 512)
                .max_write_buffer_size(u64::MAX),
            min_block_size: 1000,
            max_block_size: 250_000,
            max_last_activity: Duration::from_secs(10),
        }
    }
}

/// `counts` maps NSID -> [`NsidCounts`]; `hits` keeps one tree per NSID whose
/// keys are `varint(start time) + varint(end time)` and whose values are
/// encoded blocks of hits.
pub struct Db {
    pub cfg: DbConfig,
    pub ks: Keyspace,
    counts: Partition,
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>, ahash::RandomState>,
    sync_pool: threadpool::ThreadPool,
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    eps: RateTracker<100>, // 100 ms buckets
    cancel_token: CancellationToken,
}

impl Db {
    pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = cfg.ks_config.clone().open()?;
        Ok(Self {
            cfg,
            hits: Default::default(),
            sync_pool: threadpool::Builder::new()
                .num_threads(rayon::current_num_threads() * 2)
                .build(),
            counts: ks.open_partition(
                "_counts",
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
            )?,
            ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: RateTracker::new(Duration::from_secs(1)),
            cancel_token,
        })
    }

    #[inline(always)]
    pub fn shutting_down(&self) -> impl Future<Output = ()> {
        self.cancel_token.cancelled()
    }

    #[inline(always)]
    pub fn is_shutting_down(&self) -> bool {
        self.cancel_token.is_cancelled()
    }

    #[inline(always)]
    pub fn eps(&self) -> usize {
        self.eps.rate() as usize
    }

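    /// Subscribes to the `(nsid, counts)` updates broadcast by
    /// `ingest_events`. A minimal consumption sketch, assuming a tokio
    /// runtime:
    ///
    /// ```ignore
    /// let mut rx = db.new_listener();
    /// while let Ok((nsid, counts)) = rx.recv().await {
    ///     tracing::info!(%nsid, count = %counts.count, "counts updated");
    /// }
    /// ```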
    #[inline(always)]
    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
        self.event_broadcaster.subscribe()
    }

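    /// Flushes queued in-memory hits into encoded on-disk blocks. With
    /// `all = false`, only lexicons that have a full block queued (or have
    /// been idle longer than `max_last_activity`) are flushed; `all = true`
    /// flushes everything regardless, e.g. before shutdown:
    ///
    /// ```ignore
    /// db.sync(false)?; // periodic partial flush
    /// db.sync(true)?;  // drain everything
    /// ```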
    pub fn sync(&self, all: bool) -> AppResult<()> {
        let start = CLOCK.now();
        // prepare all the data
        let nsids_len = self.hits.len();
        let mut data = Vec::with_capacity(nsids_len);
        let mut nsids = AHashSet::with_capacity(nsids_len);
        let _guard = scc::ebr::Guard::new();
        for (nsid, handle) in self.hits.iter(&_guard) {
            let mut nsid_data = Vec::with_capacity(2);
            let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
            // If we were disconnected for a long time, sync everything we
            // have to avoid accumulating many small blocks (even if
            // compaction runs later, this reduces the work it has to do).
            let block_size = if is_too_old || all {
                self.cfg.max_block_size
            } else {
                self.cfg
                    .max_block_size
                    .min(self.cfg.min_block_size.max(handle.suggested_block_size()))
            };
            let count = handle.item_count();
            let data_count = count / block_size;
            if count > 0 && (all || data_count > 0 || is_too_old) {
                for _ in 0..data_count {
                    nsid_data.push((handle.clone(), block_size));
                }
                // only sync the remainder if we haven't filled a whole block
                let remainder = count % block_size;
                if (all || data_count == 0) && remainder > 0 {
                    nsid_data.push((handle.clone(), remainder));
                }
            }
            let _span = handle.span().entered();
            if !nsid_data.is_empty() {
                nsids.insert(nsid.clone());
                data.push(nsid_data);
            }
        }
        drop(_guard);

        // process the blocks
        data.into_par_iter()
            .map(|chunk| {
                chunk
                    .into_iter()
                    .map(|(handle, max_block_size)| {
                        (handle.take_block_items(max_block_size), handle)
                    })
                    .collect::<Vec<_>>()
                    .into_par_iter()
                    .map(|(items, handle)| {
                        let count = items.len();
                        let block = LexiconHandle::encode_block_from_items(items, count)?;
                        AppResult::Ok((block, handle))
                    })
                    .collect::<Result<Vec<_>, _>>()
            })
            .try_for_each(|chunk| {
                let chunk = chunk?;
                for (block, handle) in chunk {
                    self.sync_pool.execute(move || {
                        let _span = handle.span().entered();
                        let written = block.written;
                        match handle.insert_block(block) {
                            Ok(_) => {
                                tracing::info!({count = %written}, "synced")
                            }
                            Err(err) => tracing::error!({ err = %err }, "failed to sync block"),
                        }
                    });
                }
                AppResult::Ok(())
            })?;
        self.sync_pool.join();

        // update snapshots for all (changed) handles
        for nsid in nsids {
            self.hits.peek_with(&nsid, |_, handle| handle.update_tree());
        }

        tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");

        Ok(())
    }

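    /// Merges the stored blocks of one lexicon that fall inside `range` into
    /// blocks of at most `max_count` items, optionally re-sorting items by
    /// timestamp. A minimal sketch:
    ///
    /// ```ignore
    /// db.compact("app.bsky.feed.post", db.cfg.max_block_size, .., true)?;
    /// ```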
    pub fn compact(
        &self,
        nsid: impl AsRef<str>,
        max_count: usize,
        range: impl RangeBounds<u64>,
        sort: bool,
    ) -> AppResult<()> {
        let Some(handle) = self.get_handle(nsid) else {
            return Ok(());
        };
        handle.compact(max_count, range, sort)?;
        handle.update_tree();
        Ok(())
    }

    pub fn compact_all(
        &self,
        max_count: usize,
        range: impl RangeBounds<u64> + Clone,
        sort: bool,
    ) -> AppResult<()> {
        for nsid in self.get_nsids() {
            self.compact(nsid, max_count, range.clone(), sort)?;
        }
        Ok(())
    }

    pub fn major_compact(&self) -> AppResult<()> {
        self.compact_all(self.cfg.max_block_size, .., true)?;
        Ok(())
    }

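    /// Looks up the in-memory handle for `nsid`, reopening an existing
    /// partition on demand; returns `None` if the lexicon was never stored.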
    #[inline(always)]
    fn get_handle(&self, nsid: impl AsRef<str>) -> Option<Arc<LexiconHandle>> {
        let _guard = scc::ebr::Guard::new();
        let handle = match self.hits.peek(nsid.as_ref(), &_guard) {
            Some(handle) => handle.clone(),
            None => {
                if self.ks.partition_exists(nsid.as_ref()) {
                    let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
                    handle
                } else {
                    return None;
                }
            }
        };
        Some(handle)
    }

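    /// Gets the handle for `nsid`, creating its partition on first use.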
    #[inline(always)]
    fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
        self.hits
            .entry(nsid.clone())
            .or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
    }

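    /// Queues hits per NSID and updates the persisted counts. Events are
    /// grouped by *consecutive* NSID (`chunk_by`), so arrival order works,
    /// but pre-grouping by NSID means fewer count read-modify-write round
    /// trips. A minimal sketch, assuming `batch` holds deserialized
    /// Jetstream events:
    ///
    /// ```ignore
    /// db.ingest_events(batch.into_iter().filter_map(EventRecord::from_jetstream))?;
    /// ```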
    pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
        let mut seen_events = 0;
        for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
            let mut counts = self.get_count(&key)?;
            self.ensure_handle(&key).queue(chunk.inspect(|e| {
                // increment the counts as the handle drains the chunk
                counts.last_seen = e.timestamp;
                if e.deleted {
                    counts.deleted_count += 1;
                } else {
                    counts.count += 1;
                }
                seen_events += 1;
            }));
            self.insert_count(&key, &counts)?;
            if self.event_broadcaster.receiver_count() > 0 {
                let _ = self.event_broadcaster.send((key, counts));
            }
        }
        self.eps.observe(seen_events);
        Ok(())
    }

    #[inline(always)]
    fn insert_count(&self, nsid: &str, counts: &NsidCounts) -> AppResult<()> {
        self.counts
            .insert(
                nsid,
                unsafe { rkyv::to_bytes::<Error>(counts).unwrap_unchecked() }.as_slice(),
            )
            .map_err(AppError::from)
    }

    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
        let Some(raw) = self.counts.get(nsid)? else {
            return Ok(NsidCounts::default());
        };
        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
    }

    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
        self.counts.iter().map(|res| {
            res.map_err(AppError::from).map(|(key, val)| {
                (
                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
                )
            })
        })
    }

    pub fn get_nsids(&self) -> impl Iterator<Item = StrView> {
        self.ks
            .list_partitions()
            .into_iter()
            .filter(|k| k.deref() != "_counts")
    }

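    /// Summarizes the database: per-NSID block item counts (newest block
    /// first) and total disk usage.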
    pub fn info(&self) -> AppResult<DbInfo> {
        let mut nsids = AHashMap::new();
        for nsid in self.get_nsids() {
            let Some(handle) = self.get_handle(&nsid) else {
                continue;
            };
            let block_lens = handle
                .read()
                .iter()
                .rev()
                .try_fold(Vec::new(), |mut acc, item| {
                    let (key, value) = item?;
                    let mut timestamps = Cursor::new(key);
                    let start_timestamp = timestamps.read_varint()?;
                    let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
                    acc.push(decoder.item_count());
                    AppResult::Ok(acc)
                })?;
            nsids.insert(nsid.to_smolstr(), block_lens);
        }
        Ok(DbInfo {
            nsids,
            disk_size: self.ks.disk_space(),
        })
    }

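    /// Streams hits for `nsid` whose timestamps (seconds) fall inside
    /// `range`. Blocks are scanned newest-first but items are yielded in
    /// chronological order; `max_items` is a soft limit, since whole blocks
    /// are decoded. A minimal sketch (`now` is a hypothetical unix timestamp
    /// in seconds):
    ///
    /// ```ignore
    /// for item in db.get_hits("app.bsky.feed.like", now - 3600..=now, 10_000) {
    ///     let item = item?;
    ///     // item.timestamp lies within the last hour
    /// }
    /// ```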
    pub fn get_hits(
        &self,
        nsid: &str,
        range: impl RangeBounds<u64> + std::fmt::Debug,
        max_items: usize,
    ) -> impl Iterator<Item = AppResult<handle::Item>> {
        let start_limit = match range.start_bound().cloned() {
            Bound::Included(start) => start,
            Bound::Excluded(start) => start.saturating_add(1),
            Bound::Unbounded => 0,
        };
        let end_limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };
        let end_key = varints_unsigned_encoded([end_limit]);

        let Some(handle) = self.get_handle(nsid) else {
            return Either::Right(std::iter::empty());
        };

        let map_block = move |(res, current_item_count)| -> AppResult<(Option<_>, usize)> {
            if current_item_count >= max_items {
                return Ok((None, current_item_count));
            }
            let (key, val) = res?;
            let mut key_reader = Cursor::new(key);
            let start_timestamp = key_reader.read_varint::<u64>()?;
            if start_timestamp < start_limit {
                // blocks are scanned newest-first, so this block (and every
                // remaining one) starts before the requested range: stop
                return Ok((None, current_item_count));
            }
            let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
            let current_item_count = current_item_count + decoder.item_count();
            Ok((
                Some(
                    decoder
                        .take_while(move |item| {
                            item.as_ref().map_or(true, |item| {
                                item.timestamp <= end_limit && item.timestamp >= start_limit
                            })
                        })
                        .map(|res| res.map_err(AppError::from)),
                ),
                current_item_count,
            ))
        };

        let (blocks, _counted) = handle
            .read()
            .range(..end_key)
            .map(|res| res.map_err(AppError::from))
            .rev()
            .fold_while(
                (Vec::with_capacity(20), 0),
                |(mut blocks, current_item_count), res| {
                    use itertools::FoldWhile::*;

                    match map_block((res, current_item_count)) {
                        Ok((Some(block), current_item_count)) => {
                            blocks.push(Ok(block));
                            Continue((blocks, current_item_count))
                        }
                        Ok((None, current_item_count)) => Done((blocks, current_item_count)),
                        Err(err) => {
                            blocks.push(Err(err));
                            Done((blocks, current_item_count))
                        }
                    }
                },
            )
            .into_inner();

        // blocks were gathered newest-first; reverse so items come out in
        // chronological order, and surface block-level errors instead of
        // silently dropping them (flattening a `Result` yields nothing for
        // `Err`)
        Either::Left(blocks.into_iter().rev().flat_map(|res| match res {
            Ok(items) => Either::Left(items),
            Err(err) => Either::Right(std::iter::once(Err(err))),
        }))
    }

    pub fn tracking_since(&self) -> AppResult<u64> {
        // HACK: we should actually store when we started tracking but I'm
        // lazy; this should be accurate enough
        let Some(handle) = self.get_handle("app.bsky.feed.like") else {
            return Ok(0);
        };
        let Some((timestamps_raw, _)) = handle.read().first_key_value()? else {
            return Ok(0);
        };
        let mut timestamp_reader = Cursor::new(timestamps_raw);
        timestamp_reader
            .read_varint::<u64>()
            .map_err(AppError::from)
    }
}