tracks lexicons and how many times they appeared on the Jetstream

feat(server): zstd compression
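Adds optional zstd dictionary compression for stored blocks behind a new "compress" cargo feature (on by default). Block values now go through LexiconHandle::put_raw_block / get_raw_block, which compress and decompress with a shared dictionary loaded from ./zstd_dict; the dictionary is trained from existing blocks via Db::train_zstd_dict and written out during migrate. Per-thread zstd::bulk contexts are cached in thread-locals, and the fjall partition compression drops from Miniz(9) to Lz4 now that zstd handles the block payloads.

A rough sketch (not part of this commit) of the round-trip the new compress/decompress helpers perform with the zstd bulk API; the function name here is illustrative, while level 9, the shared dictionary, and the 20 MiB decompression cap mirror handle.rs:

use zstd::bulk::{Compressor, Decompressor};

fn roundtrip(dict: &[u8], block: &[u8]) -> std::io::Result<Vec<u8>> {
    // level 9 with the trained dictionary, as LexiconHandle::with_compressor sets it up
    let mut c = Compressor::with_dictionary(9, dict)?;
    let compressed = c.compress(block)?;

    // decompression is bounded to 20 MiB, matching LexiconHandle::decompress
    let mut d = Decompressor::with_dictionary(dict)?;
    d.decompress(&compressed, 1024 * 1024 * 20)
}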

ptr.pet 4174678b 6001b7f8

verified
Changed files
+178 -51
server
+1
server/.gitignore
···
target
.fjall_data*
···
target
.fjall_data*
+ zstd_dict
+5 -1
server/Cargo.toml
···
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
···
rayon = "1.10.0"
parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] }
rclite = "0.2.7"
- zstd = "0.13.3"
[target.'cfg(target_env = "msvc")'.dependencies]
snmalloc-rs = "0.3.8"
···
version = "0.1.0"
edition = "2024"
+ [features]
+ default = ["compress"]
+ compress = ["dep:zstd"]
+
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
···
rayon = "1.10.0"
parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] }
rclite = "0.2.7"
+ zstd = { version = "0.13.3", optional = true, features = ["experimental"] }
[target.'cfg(target_env = "msvc")'.dependencies]
snmalloc-rs = "0.3.8"
+113 -16
server/src/db/handle.rs
···
use rclite::Arc;
use smol_str::SmolStr;
use crate::{
db::{EventRecord, NsidHit, block},
error::AppResult,
utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded},
};
- pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
- pub type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
pub type Item = block::Item<NsidHit>;
pub struct Block {
pub written: usize,
pub key: ByteView,
···
buf: Arc<Mutex<Vec<EventRecord>>>,
last_insert: AtomicU64, // relaxed
eps: DefaultRateTracker,
}
impl Debug for LexiconHandle {
···
}
impl LexiconHandle {
- pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
let opts = PartitionCreateOptions::default()
.block_size(1024 * 128)
- .compression(fjall::CompressionType::Miniz(9));
Self {
tree: keyspace.open_partition(nsid, opts).unwrap(),
nsid: nsid.into(),
buf: Default::default(),
last_insert: AtomicU64::new(0),
eps: RateTracker::new(Duration::from_secs(10)),
}
}
pub fn nsid(&self) -> &SmolStr {
&self.nsid
}
···
}
let start_blocks_size = blocks_to_compact.len();
- let keys_to_delete = blocks_to_compact.iter().map(|(key, _)| key);
let mut all_items =
blocks_to_compact
- .iter()
.try_fold(Vec::new(), |mut acc, (key, value)| {
- let mut timestamps = Cursor::new(key);
- let start_timestamp = timestamps.read_varint()?;
- let decoder = block::ItemDecoder::new(Cursor::new(value), start_timestamp)?;
let mut items = decoder.collect::<Result<Vec<_>, _>>()?;
acc.append(&mut items);
AppResult::Ok(acc)
···
.into_par_iter()
.map(|chunk| {
let count = chunk.len();
- Self::encode_block_from_items(chunk, count)
})
.collect::<Result<Vec<_>, _>>()?;
let end_blocks_size = new_blocks.len();
···
}
pub fn encode_block_from_items(
items: impl IntoIterator<Item = Item>,
count: usize,
) -> AppResult<Block> {
···
.into());
}
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
- let value = writer.finish()?;
let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
- return Ok(Block {
- written,
- key,
- data: value,
- });
}
Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into())
}
···
)
})
.collect()
}
}
···
use rclite::Arc;
use smol_str::SmolStr;
+ #[cfg(feature = "compress")]
+ use zstd::bulk::{Compressor as ZstdCompressor, Decompressor as ZstdDecompressor};
+
use crate::{
db::{EventRecord, NsidHit, block},
error::AppResult,
utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded},
};
+ #[cfg(feature = "compress")]
+ thread_local! {
+ static COMPRESSOR: std::cell::RefCell<Option<ZstdCompressor<'static>>> = std::cell::RefCell::new(None);
+ static DECOMPRESSOR: std::cell::RefCell<Option<ZstdDecompressor<'static>>> = std::cell::RefCell::new(None);
+ }
+
+ type ItemDecoder = block::ItemDecoder<Cursor<Vec<u8>>, NsidHit>;
+ type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
pub type Item = block::Item<NsidHit>;
+ #[derive(Clone)]
+ pub enum Compression {
+ None,
+ #[cfg(feature = "compress")]
+ Zstd(ByteView),
+ }
+
+ impl Compression {
+ #[cfg(feature = "compress")]
+ fn get_dict(&self) -> Option<&ByteView> {
+ match self {
+ Compression::None => None,
+ Compression::Zstd(dict) => Some(dict),
+ }
+ }
+ }
+
pub struct Block {
pub written: usize,
pub key: ByteView,
···
buf: Arc<Mutex<Vec<EventRecord>>>,
last_insert: AtomicU64, // relaxed
eps: DefaultRateTracker,
+ compress: Compression,
}
impl Debug for LexiconHandle {
···
}
impl LexiconHandle {
+ pub fn new(keyspace: &Keyspace, nsid: &str, compress: Compression) -> Self {
let opts = PartitionCreateOptions::default()
.block_size(1024 * 128)
+ .compression(fjall::CompressionType::Lz4);
Self {
tree: keyspace.open_partition(nsid, opts).unwrap(),
nsid: nsid.into(),
buf: Default::default(),
last_insert: AtomicU64::new(0),
eps: RateTracker::new(Duration::from_secs(10)),
+ compress,
}
}
+ #[cfg(feature = "compress")]
+ fn with_compressor<T>(&self, mut f: impl FnMut(&mut ZstdCompressor<'static>) -> T) -> T {
+ COMPRESSOR.with_borrow_mut(|compressor| {
+ if compressor.is_none() {
+ *compressor = Some({
+ let mut c = ZstdCompressor::new(9).expect("cant construct zstd compressor");
+ c.include_checksum(false).unwrap();
+ if let Some(dict) = self.compress.get_dict() {
+ c.set_dictionary(9, dict).expect("cant set dict");
+ }
+ c
+ });
+ }
+ // SAFETY: this is safe because we just initialized the compressor
+ f(unsafe { compressor.as_mut().unwrap_unchecked() })
+ })
+ }
+
+ #[cfg(feature = "compress")]
+ pub fn compress(&self, data: impl AsRef<[u8]>) -> std::io::Result<Vec<u8>> {
+ self.with_compressor(|compressor| compressor.compress(data.as_ref()))
+ }
+
+ #[cfg(feature = "compress")]
+ fn with_decompressor<T>(&self, mut f: impl FnMut(&mut ZstdDecompressor<'static>) -> T) -> T {
+ DECOMPRESSOR.with_borrow_mut(|decompressor| {
+ if decompressor.is_none() {
+ *decompressor = Some({
+ let mut d = ZstdDecompressor::new().expect("cant construct zstd decompressor");
+ if let Some(dict) = self.compress.get_dict() {
+ d.set_dictionary(dict).expect("cant set dict");
+ }
+ d
+ });
+ }
+ // SAFETY: this is safe because we just initialized the decompressor
+ f(unsafe { decompressor.as_mut().unwrap_unchecked() })
+ })
+ }
+
+ #[cfg(feature = "compress")]
+ pub fn decompress(&self, data: impl AsRef<[u8]>) -> std::io::Result<Vec<u8>> {
+ self.with_decompressor(|decompressor| {
+ decompressor.decompress(data.as_ref(), 1024 * 1024 * 20)
+ })
+ }
+
pub fn nsid(&self) -> &SmolStr {
&self.nsid
}
···
}
let start_blocks_size = blocks_to_compact.len();
+ let keys_to_delete = blocks_to_compact
+ .iter()
+ .map(|(key, _)| key)
+ .cloned()
+ .collect_vec();
let mut all_items =
blocks_to_compact
+ .into_iter()
.try_fold(Vec::new(), |mut acc, (key, value)| {
+ let decoder = self.get_decoder_for(key, value)?;
let mut items = decoder.collect::<Result<Vec<_>, _>>()?;
acc.append(&mut items);
AppResult::Ok(acc)
···
.into_par_iter()
.map(|chunk| {
let count = chunk.len();
+ self.encode_block_from_items(chunk, count)
})
.collect::<Result<Vec<_>, _>>()?;
let end_blocks_size = new_blocks.len();
···
}
pub fn encode_block_from_items(
+ &self,
items: impl IntoIterator<Item = Item>,
count: usize,
) -> AppResult<Block> {
···
.into());
}
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
+ let data = self.put_raw_block(writer.finish()?)?;
let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
+ return Ok(Block { written, key, data });
}
Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into())
}
···
)
})
.collect()
+ }
+
+ pub fn get_raw_block(&self, value: Slice) -> std::io::Result<Vec<u8>> {
+ match &self.compress {
+ Compression::None => Ok(value.as_ref().into()),
+ #[cfg(feature = "compress")]
+ Compression::Zstd(_) => self.decompress(value),
+ }
+ }
+
+ pub fn put_raw_block(&self, value: Vec<u8>) -> std::io::Result<Vec<u8>> {
+ match &self.compress {
+ Compression::None => Ok(value),
+ #[cfg(feature = "compress")]
+ Compression::Zstd(_) => self.compress(value),
+ }
+ }
+
+ pub fn get_decoder_for(&self, key: Slice, value: Slice) -> AppResult<ItemDecoder> {
+ let mut timestamps = Cursor::new(key);
+ let start_timestamp = timestamps.read_varint()?;
+ let decoder = ItemDecoder::new(Cursor::new(self.get_raw_block(value)?), start_timestamp)?;
+ Ok(decoder)
}
}
+48 -23
server/src/db/mod.rs
···
fmt::Debug,
io::Cursor,
ops::{Bound, Deref, RangeBounds},
- path::{Path, PathBuf},
time::Duration,
};
use byteview::StrView;
- use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
use itertools::{Either, Itertools};
- use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::{SmolStr, ToSmolStr};
···
use tokio_util::sync::CancellationToken;
use crate::{
- db::handle::{ItemDecoder, LexiconHandle},
error::{AppError, AppResult},
jetstream::JetstreamEvent,
utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded},
···
pub struct DbConfig {
pub ks_config: fjall::Config,
pub min_block_size: usize,
pub max_block_size: usize,
pub max_last_activity: u64,
···
fn default() -> Self {
Self {
ks_config: fjall::Config::default(),
min_block_size: 512,
max_block_size: 500_000,
max_last_activity: Duration::from_secs(10).as_nanos() as u64,
···
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
eps: RateTracker<100>,
cancel_token: CancellationToken,
}
impl Db {
pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
tracing::info!("opening db...");
let ks = cfg.ks_config.clone().open()?;
Ok(Self {
cfg,
hits: Default::default(),
···
event_broadcaster: broadcast::channel(1000).0,
eps: RateTracker::new(Duration::from_secs(1)),
cancel_token,
})
}
···
.into_par_iter()
.map(|(i, items, handle)| {
let count = items.len();
- let block = LexiconHandle::encode_block_from_items(items, count)?;
tracing::info!(
"{}: encoded block with {} items",
handle.nsid(),
···
Some(handle) => handle.clone(),
None => {
if self.ks.partition_exists(nsid.as_ref()) {
- let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
handle
} else {
···
#[inline(always)]
fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
- self.hits
- .entry(nsid.clone())
- .or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
}
pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
···
};
let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| {
let (key, value) = item?;
- let mut timestamps = Cursor::new(key);
- let start_timestamp = timestamps.read_varint()?;
- let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
acc.push(decoder.item_count());
AppResult::Ok(acc)
})?;
···
})
}
- // train zstd dict with 100 blocks from every lexicon
pub fn train_zstd_dict(&self) -> AppResult<Vec<u8>> {
let samples = self
.get_nsids()
···
.map(|handle| {
handle
.iter()
- .rev()
- .map(|res| {
res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
- .map(|(_, value)| Cursor::new(value))
})
.take(1000)
})
···
return Either::Right(std::iter::empty());
};
- let map_block = move |(key, val)| {
- let mut key_reader = Cursor::new(key);
- let start_timestamp = key_reader.read_varint::<u64>()?;
- if start_timestamp < start_limit {
- return Ok(None);
- }
- let items = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?
.take_while(move |item| {
item.as_ref().map_or(true, |item| {
item.timestamp <= end_limit && item.timestamp >= start_limit
···
fmt::Debug,
io::Cursor,
ops::{Bound, Deref, RangeBounds},
+ path::Path,
time::Duration,
};
use byteview::StrView;
+ use fjall::{Keyspace, Partition, PartitionCreateOptions};
use itertools::{Either, Itertools};
+ use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::{SmolStr, ToSmolStr};
···
use tokio_util::sync::CancellationToken;
use crate::{
+ db::handle::{Compression, LexiconHandle},
error::{AppError, AppResult},
jetstream::JetstreamEvent,
utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded},
···
pub struct DbConfig {
pub ks_config: fjall::Config,
+ #[cfg(feature = "compress")]
+ pub dict_path: std::path::PathBuf,
pub min_block_size: usize,
pub max_block_size: usize,
pub max_last_activity: u64,
···
fn default() -> Self {
Self {
ks_config: fjall::Config::default(),
+ #[cfg(feature = "compress")]
+ dict_path: "zstd_dict".parse().unwrap(),
min_block_size: 512,
max_block_size: 500_000,
max_last_activity: Duration::from_secs(10).as_nanos() as u64,
···
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
eps: RateTracker<100>,
cancel_token: CancellationToken,
+ compression: Compression,
}
impl Db {
pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
tracing::info!("opening db...");
let ks = cfg.ks_config.clone().open()?;
+ let _compression = Compression::None;
+ #[cfg(feature = "compress")]
+ let dict = std::fs::File::open(&cfg.dict_path).ok().and_then(|mut f| {
+ let meta = f.metadata().ok()?;
+ byteview::ByteView::from_reader(&mut f, meta.len() as usize).ok()
+ });
+ #[cfg(feature = "compress")]
+ let _compression = match dict {
+ Some(dict) => {
+ tracing::info!(
+ "using zstd compression with dict from {}",
+ cfg.dict_path.to_string_lossy()
+ );
+ Compression::Zstd(dict)
+ }
+ None => Compression::None,
+ };
Ok(Self {
cfg,
hits: Default::default(),
···
event_broadcaster: broadcast::channel(1000).0,
eps: RateTracker::new(Duration::from_secs(1)),
cancel_token,
+ compression: _compression,
})
}
···
.into_par_iter()
.map(|(i, items, handle)| {
let count = items.len();
+ let block = handle.encode_block_from_items(items, count)?;
tracing::info!(
"{}: encoded block with {} items",
handle.nsid(),
···
Some(handle) => handle.clone(),
None => {
if self.ks.partition_exists(nsid.as_ref()) {
+ let handle = Arc::new(LexiconHandle::new(
+ &self.ks,
+ nsid.as_ref(),
+ self.compression.clone(),
+ ));
let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
handle
} else {
···
#[inline(always)]
fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
+ self.hits.entry(nsid.clone()).or_insert_with(|| {
+ Arc::new(LexiconHandle::new(
+ &self.ks,
+ &nsid,
+ self.compression.clone(),
+ ))
+ })
}
pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
···
};
let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| {
let (key, value) = item?;
+ let decoder = handle.get_decoder_for(key, value)?;
acc.push(decoder.item_count());
AppResult::Ok(acc)
})?;
···
})
}
+ // train zstd dict with 1000 blocks from every lexicon
+ #[cfg(feature = "compress")]
pub fn train_zstd_dict(&self) -> AppResult<Vec<u8>> {
let samples = self
.get_nsids()
···
.map(|handle| {
handle
.iter()
+ .map(move |res| {
res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
+ .and_then(|(_, value)| Ok(Cursor::new(handle.get_raw_block(value)?)))
})
.take(1000)
})
···
return Either::Right(std::iter::empty());
};
+ let map_block = |(key, val)| {
+ let decoder = handle.get_decoder_for(key, val)?;
+ let items = decoder
.take_while(move |item| {
item.as_ref().map_or(true, |item| {
item.timestamp <= end_limit && item.timestamp >= start_limit
+11 -11
server/src/main.rs
···
debug();
return;
}
- Some("traindict") => {
- train_zstd_dict();
- return;
- }
Some(x) => {
tracing::error!("unknown command: {}", x);
return;
···
db.sync(true).expect("cant sync db");
}
- fn train_zstd_dict() {
- let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
- let dict_data = db.train_zstd_dict().expect("cant train zstd dict");
- std::fs::write("zstd_dict", dict_data).expect("cant save zstd dict")
- }
-
fn debug() {
let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
let info = db.info().expect("cant get db info");
···
DbConfig::default().ks(|c| {
c.max_journaling_size(u64::MAX)
.max_write_buffer_size(u64::MAX)
}),
CancellationToken::new(),
)
···
fn migrate() {
let cancel_token = CancellationToken::new();
let from = Arc::new(
Db::new(
DbConfig::default().path(".fjall_data_from"),
···
)
.expect("couldnt create db"),
);
let to = Arc::new(
Db::new(
DbConfig::default().path(".fjall_data_to").ks(|c| {
···
);
let nsids = from.get_nsids().collect::<Vec<_>>();
- let eps_thread = std::thread::spawn({
let to = to.clone();
move || {
loop {
···
debug();
return;
}
Some(x) => {
tracing::error!("unknown command: {}", x);
return;
···
db.sync(true).expect("cant sync db");
}
fn debug() {
let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
let info = db.info().expect("cant get db info");
···
DbConfig::default().ks(|c| {
c.max_journaling_size(u64::MAX)
.max_write_buffer_size(u64::MAX)
+ .compaction_workers(rayon::current_num_threads() * 4)
+ .flush_workers(rayon::current_num_threads() * 4)
}),
CancellationToken::new(),
)
···
fn migrate() {
let cancel_token = CancellationToken::new();
+
let from = Arc::new(
Db::new(
DbConfig::default().path(".fjall_data_from"),
···
)
.expect("couldnt create db"),
);
+ #[cfg(feature = "compress")]
+ std::fs::write(
+ "zstd_dict",
+ from.train_zstd_dict().expect("cant get zstd dict"),
+ )
+ .expect("cant write zstd dict");
+
let to = Arc::new(
Db::new(
DbConfig::default().path(".fjall_data_to").ks(|c| {
···
);
let nsids = from.get_nsids().collect::<Vec<_>>();
+ let _eps_thread = std::thread::spawn({
let to = to.clone();
move || {
loop {