Tracks lexicons and how many times they appeared on the Jetstream.

fix(server): implement max items on get_hits so we don't read every block if requested

ptr.pet f4563532 dc9df83a

verified
Changed files
+14 -6
server
+1 -1
server/src/api.rs
···
let from = params.to.map(Bound::Included).unwrap_or(Bound::Unbounded);
let to = params.from.map(Bound::Included).unwrap_or(Bound::Unbounded);
let maybe_hits = db
-
.get_hits(&params.nsid, HitsRange { from, to })
.take(MAX_HITS);
let mut hits = Vec::with_capacity(maybe_hits.size_hint().0);
···
let from = params.to.map(Bound::Included).unwrap_or(Bound::Unbounded);
let to = params.from.map(Bound::Included).unwrap_or(Bound::Unbounded);
let maybe_hits = db
+
.get_hits(&params.nsid, HitsRange { from, to }, MAX_HITS)
.take(MAX_HITS);
let mut hits = Vec::with_capacity(maybe_hits.size_hint().0);
+7 -3
server/src/db/mod.rs
···
&self,
nsid: &str,
range: impl RangeBounds<u64> + std::fmt::Debug,
) -> impl Iterator<Item = AppResult<handle::Item>> {
let start_limit = match range.start_bound().cloned() {
Bound::Included(start) => start,
···
};
// let mut ts = CLOCK.now();
let map_block = move |(key, val)| {
let mut key_reader = Cursor::new(key);
let start_timestamp = key_reader.read_varint::<u64>()?;
// let end_timestamp = key_reader.read_varint::<u64>()?;
if start_timestamp < start_limit {
// tracing::info!(
-
// "skipped block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater"
// );
return Ok(None);
-
} else {
-
// tracing::info!("using block with timestamp {start_timestamp}..{end_timestamp}");
}
let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
// tracing::info!(
// "took {}ns to get block with size {}",
// ts.elapsed().as_nanos(),
···
&self,
nsid: &str,
range: impl RangeBounds<u64> + std::fmt::Debug,
+
max_items: usize,
) -> impl Iterator<Item = AppResult<handle::Item>> {
let start_limit = match range.start_bound().cloned() {
Bound::Included(start) => start,
···
};
// let mut ts = CLOCK.now();
+
let mut current_item_count = 0;
let map_block = move |(key, val)| {
let mut key_reader = Cursor::new(key);
let start_timestamp = key_reader.read_varint::<u64>()?;
// let end_timestamp = key_reader.read_varint::<u64>()?;
if start_timestamp < start_limit {
// tracing::info!(
+
// "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater"
// );
return Ok(None);
}
let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
+
current_item_count += decoder.item_count();
+
if current_item_count > max_items {
+
return Ok(None);
+
}
// tracing::info!(
// "took {}ns to get block with size {}",
// ts.elapsed().as_nanos(),
+6 -2
server/src/main.rs
···
-
use std::{ops::Deref, time::Duration, u64};
use itertools::Itertools;
use rclite::Arc;
···
threads.push(std::thread::spawn(move || {
tracing::info!("{}: migrating...", nsid.deref());
let mut count = 0_u64;
-
for hits in from.get_hits(&nsid, ..).chunks(100000).into_iter() {
to.ingest_events(hits.map(|hit| {
count += 1;
let hit = hit.expect("cant decode hit");
···
+
use std::{ops::Deref, time::Duration, u64, usize};
use itertools::Itertools;
use rclite::Arc;
···
threads.push(std::thread::spawn(move || {
tracing::info!("{}: migrating...", nsid.deref());
let mut count = 0_u64;
+
for hits in from
+
.get_hits(&nsid, .., usize::MAX)
+
.chunks(100000)
+
.into_iter()
+
{
to.ingest_events(hits.map(|hit| {
count += 1;
let hit = hit.expect("cant decode hit");