tracks lexicons and how many times they appeared on the jetstream

refactor(server): implement WritableByteView so we don't have to copy from a Vec while constructing fjall::Slice

ptr.pet a820ae13 925dbc95

verified
Changed files
+75 -17
server
+1
server/Cargo.lock
···
"async-trait",
"axum",
"axum-tws",
+
"byteview",
"fjall",
"futures-util",
"itertools",
+1
server/Cargo.toml
···
snmalloc-rs = "0.3.8"
quanta = "0.12.6"
itertools = "0.14.0"
+
byteview = "0.6.1"
+8 -8
server/src/db/block.rs
···
}
}
-
pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> {
+
pub fn encode(&mut self, item: &Item<T>) -> io::Result<()> {
if self.prev_timestamp == 0 {
// self.writer.write_varint(item.timestamp)?;
self.prev_timestamp = item.timestamp;
···
Ok(())
}
-
fn write_data(&mut self, data: &[u8]) -> AppResult<()> {
+
fn write_data(&mut self, data: &[u8]) -> io::Result<()> {
self.writer.write_varint(data.len())?;
self.writer.write_all(data)?;
Ok(())
}
-
pub fn finish(mut self) -> AppResult<W> {
+
pub fn finish(mut self) -> io::Result<W> {
self.writer.flush()?;
Ok(self.writer)
}
···
}
impl<R: Read, T: Archive> ItemDecoder<R, T> {
-
pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> {
+
pub fn new(reader: R, start_timestamp: u64) -> io::Result<Self> {
Ok(ItemDecoder {
reader,
current_timestamp: start_timestamp,
···
})
}
-
pub fn decode(&mut self) -> AppResult<Option<Item<T>>> {
+
pub fn decode(&mut self) -> io::Result<Option<Item<T>>> {
if self.first_item {
// read the first timestamp
// let timestamp = match self.reader.read_varint::<u64>() {
···
}
// [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1]
-
fn read_timestamp(&mut self) -> AppResult<Option<u64>> {
+
fn read_timestamp(&mut self) -> io::Result<Option<u64>> {
let delta = match self.reader.read_varint::<i64>() {
Ok(delta) => delta,
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
···
Ok(Some(self.current_timestamp))
}
-
fn read_item(&mut self) -> AppResult<Option<AlignedVec>> {
+
fn read_item(&mut self) -> io::Result<Option<AlignedVec>> {
let data_len = match self.reader.read_varint::<usize>() {
Ok(data_len) => data_len,
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
···
}
impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
-
type Item = AppResult<Item<T>>;
+
type Item = io::Result<Item<T>>;
fn next(&mut self) -> Option<Self::Item> {
self.decode().transpose()
+65 -9
server/src/db/mod.rs
···
use std::{
-
io::Cursor,
+
io::{self, Cursor, Write},
+
marker::PhantomData,
ops::{Bound, Deref, RangeBounds},
path::Path,
sync::{
···
time::Duration,
};
+
use byteview::ByteView;
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
use itertools::{Either, Itertools};
use ordered_varint::Variable;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::SmolStr;
use tokio::sync::broadcast;
+
use tokio_util::bytes::{self, BufMut};
use crate::{
db::block::{ReadVariableExt, WriteVariableExt},
···
}
type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
-
type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
+
type ItemEncoder = block::ItemEncoder<WritableByteView, NsidHit>;
type Item = block::Item<NsidHit>;
+
struct WritableByteView {
+
view: ByteView,
+
written: usize,
+
}
+
+
impl WritableByteView {
+
// returns None if the view already has a reference to it
+
fn with_size(capacity: usize) -> Self {
+
Self {
+
view: ByteView::with_size(capacity),
+
written: 0,
+
}
+
}
+
+
#[inline(always)]
+
fn into_inner(self) -> ByteView {
+
self.view
+
}
+
}
+
+
impl Write for WritableByteView {
+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+
let len = buf.len();
+
if len > self.view.len() - self.written {
+
return Err(std::io::Error::new(
+
std::io::ErrorKind::StorageFull,
+
"buffer full",
+
));
+
}
+
// SAFETY: this is safe because we have checked that the buffer is not full
+
// SAFETY: we own the mutator so no other references to the view exist
+
unsafe {
+
std::ptr::copy_nonoverlapping(
+
buf.as_ptr(),
+
self.view
+
.get_mut()
+
.unwrap_unchecked()
+
.as_mut_ptr()
+
.add(self.written),
+
len,
+
);
+
self.written += len;
+
}
+
Ok(len)
+
}
+
+
#[inline(always)]
+
fn flush(&mut self) -> std::io::Result<()> {
+
Ok(())
+
}
+
}
+
pub struct LexiconHandle {
tree: Partition,
buf: Arc<scc::Queue<EventRecord>>,
···
}
fn sync(&self, max_block_size: usize) -> AppResult<usize> {
-
let mut writer = ItemEncoder::new(Vec::with_capacity(
-
size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(),
-
));
+
let buf_size =
+
size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>();
+
let mut writer = ItemEncoder::new(WritableByteView::with_size(buf_size));
let mut start_timestamp = None;
let mut end_timestamp = None;
let mut written = 0_usize;
···
let mut key = Vec::with_capacity(size_of::<u64>() * 2);
key.write_varint(start_timestamp)?;
key.write_varint(end_timestamp)?;
-
self.tree.insert(key, value)?;
+
self.tree.insert(key, value.into_inner())?;
}
Ok(written)
}
···
let map_block = move |(key, val)| {
let mut key_reader = Cursor::new(key);
let start_timestamp = key_reader.read_varint::<u64>()?;
-
let items =
-
ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| {
+
let items = ItemDecoder::new(Cursor::new(val), start_timestamp)?
+
.take_while(move |item| {
item.as_ref().map_or(true, |item| item.timestamp <= limit)
-
});
+
})
+
.map(|res| res.map_err(AppError::from));
Ok(items)
};