tracks lexicons and how many times they appeared on the jetstream

fix(server): max_items wasnt working in get_hits

ptr.pet a050efab bd2613ce

verified
Changed files
+37 -16
server
src
db
+37 -16
server/src/db/mod.rs
···
};
// let mut ts = CLOCK.now();
-
let mut current_item_count = 0;
-
let map_block = move |(key, val)| {
if current_item_count >= max_items {
-
return Ok(None);
}
let mut key_reader = Cursor::new(key);
let start_timestamp = key_reader.read_varint::<u64>()?;
// let end_timestamp = key_reader.read_varint::<u64>()?;
···
// tracing::info!(
// "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater"
// );
-
return Ok(None);
}
let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
-
current_item_count += decoder.item_count();
// tracing::info!(
// "took {}ns to get block with size {}",
// ts.elapsed().as_nanos(),
// decoder.item_count()
// );
// ts = CLOCK.now();
-
Ok(Some(
-
decoder
-
.take_while(move |item| {
-
item.as_ref().map_or(true, |item| {
-
item.timestamp <= end_limit && item.timestamp >= start_limit
})
-
})
-
.map(|res| res.map_err(AppError::from)),
))
};
-
let blocks = handle
.range(..end_key)
.rev()
-
.map_while(move |res| res.map_err(AppError::from).and_then(map_block).transpose())
-
.collect_vec();
tracing::info!(
-
"got blocks with size {}, item count {current_item_count}",
blocks.len()
);
···
};
// let mut ts = CLOCK.now();
+
let map_block = move |(res, current_item_count)| -> AppResult<(Option<_>, usize)> {
if current_item_count >= max_items {
+
return Ok((None, current_item_count));
}
+
let (key, val) = res?;
let mut key_reader = Cursor::new(key);
let start_timestamp = key_reader.read_varint::<u64>()?;
// let end_timestamp = key_reader.read_varint::<u64>()?;
···
// tracing::info!(
// "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater"
// );
+
return Ok((None, current_item_count));
}
let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
+
let current_item_count = current_item_count + decoder.item_count();
// tracing::info!(
// "took {}ns to get block with size {}",
// ts.elapsed().as_nanos(),
// decoder.item_count()
// );
// ts = CLOCK.now();
+
Ok((
+
Some(
+
decoder
+
.take_while(move |item| {
+
item.as_ref().map_or(true, |item| {
+
item.timestamp <= end_limit && item.timestamp >= start_limit
+
})
})
+
.map(|res| res.map_err(AppError::from)),
+
),
+
current_item_count,
))
};
+
let (blocks, counted) = handle
.range(..end_key)
+
.map(|res| res.map_err(AppError::from))
.rev()
+
.fold_while(
+
(Vec::with_capacity(20), 0),
+
|(mut blocks, current_item_count), res| {
+
use itertools::FoldWhile::*;
+
+
match map_block((res, current_item_count)) {
+
Ok((Some(block), current_item_count)) => {
+
blocks.push(Ok(block));
+
Continue((blocks, current_item_count))
+
}
+
Ok((None, current_item_count)) => Done((blocks, current_item_count)),
+
Err(err) => {
+
blocks.push(Err(err));
+
Done((blocks, current_item_count))
+
}
+
}
+
},
+
)
+
.into_inner();
tracing::info!(
+
"got blocks with size {}, item count {counted}",
blocks.len()
);