Tracks lexicons and how many times they appeared on the Jetstream.

feat(server): improve the logging of sync and compaction

ptr.pet 9036e3a9 ce8f1ca0

verified
Changed files: +34 -29

server/src/db/handle.rs (+13 -6)
···
}
}
pub fn nsid(&self) -> &SmolStr {
&self.nsid
}
···
range: impl RangeBounds<u64>,
sort: bool,
) -> AppResult<()> {
let start_limit = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start.saturating_add(1),
···
.range(start_key..end_key)
.collect::<Result<Vec<_>, _>>()?;
if blocks_to_compact.len() < 2 {
- tracing::info!("{}: nothing to compact", self.nsid);
return Ok(());
}
···
self.tree.insert(block.key, block.data)?;
}
tracing::info!(
- "{}: compacted {} blocks to {} blocks ({}% reduction)",
- self.nsid,
- start_blocks_size,
- end_blocks_size,
- ((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0,
);
Ok(())
···
}
}
+ pub fn span(&self) -> tracing::Span {
+ tracing::info_span!("handle", nsid = %self.nsid)
+ }
+
pub fn nsid(&self) -> &SmolStr {
&self.nsid
}
···
range: impl RangeBounds<u64>,
sort: bool,
) -> AppResult<()> {
+ let _span = self.span().entered();
+
let start_limit = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start.saturating_add(1),
···
.range(start_key..end_key)
.collect::<Result<Vec<_>, _>>()?;
if blocks_to_compact.len() < 2 {
return Ok(());
}
···
self.tree.insert(block.key, block.data)?;
}
+ let reduction =
+ ((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0;
tracing::info!(
+ {
+ start = start_blocks_size,
+ end = end_blocks_size,
+ },
+ "blocks compacted {reduction:.2}%",
);
Ok(())
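
The pattern in this file generalizes: instead of interpolating the NSID into every message string, the handle exposes a `tracing::Span` carrying `nsid` as a field, and call sites enter it once. Here is a minimal sketch of that shape, assuming the `tracing` and `tracing-subscriber` crates; `Handle` is a hypothetical stand-in for `LexiconHandle`, and the block counts are made up:

```rust
use tracing::{info, info_span, Span};

// Hypothetical stand-in for LexiconHandle; only the logging shape matters.
struct Handle {
    nsid: String,
}

impl Handle {
    // One span per handle, carrying `nsid` as a structured field.
    fn span(&self) -> Span {
        info_span!("handle", nsid = %self.nsid)
    }

    fn compact(&self, start_blocks: usize, end_blocks: usize) {
        // `entered()` consumes the span and returns a guard; every event
        // emitted below is recorded inside it, so `nsid` no longer has to
        // be repeated in each message.
        let _span = self.span().entered();
        let reduction =
            ((start_blocks - end_blocks) as f64 / start_blocks as f64) * 100.0;
        info!(
            start = start_blocks,
            end = end_blocks,
            "blocks compacted {reduction:.2}%"
        );
    }
}

fn main() {
    tracing_subscriber::fmt::init();
    let h = Handle { nsid: "app.bsky.feed.like".into() };
    h.compact(10, 3);
}
```

With the default fmt subscriber this prints something like `handle{nsid=app.bsky.feed.like}: blocks compacted 70.00% start=10 end=3`: the span context and the numeric fields stay machine-readable instead of being baked into the message string.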
server/src/db/mod.rs (+21 -23)
···
db::handle::{ItemDecoder, LexiconHandle},
error::{AppError, AppResult},
jetstream::JetstreamEvent,
- utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded},
};
mod block;
···
}
pub fn sync(&self, all: bool) -> AppResult<()> {
// prepare all the data
let mut data = Vec::with_capacity(self.hits.len());
let _guard = scc::ebr::Guard::new();
···
let count = handle.item_count();
let data_count = count / block_size;
if count > 0 && (all || data_count > 0 || is_too_old) {
- for i in 0..data_count {
- nsid_data.push((i, handle.clone(), block_size));
total_count += block_size;
}
// only sync remainder if we haven't met block size
let remainder = count % block_size;
if (all || data_count == 0) && remainder > 0 {
- nsid_data.push((data_count, handle.clone(), remainder));
total_count += remainder;
}
}
tracing::info!(
- "{}: will sync {} blocks ({} count)",
- handle.nsid(),
- nsid_data.len(),
- total_count,
);
data.push(nsid_data);
}
···
.map(|chunk| {
chunk
.into_iter()
- .map(|(i, handle, max_block_size)| {
- (i, handle.take_block_items(max_block_size), handle)
})
.collect::<Vec<_>>()
.into_par_iter()
- .map(|(i, items, handle)| {
let count = items.len();
let block = LexiconHandle::encode_block_from_items(items, count)?;
- tracing::info!(
- "{}: encoded block with {} items",
- handle.nsid(),
- block.written,
- );
- AppResult::Ok((i, block, handle))
})
.collect::<Result<Vec<_>, _>>()
})
.try_for_each(|chunk| {
let chunk = chunk?;
- for (i, block, handle) in chunk {
- self.sync_pool
- .execute(move || match handle.insert(block.key, block.data) {
Ok(_) => {
- tracing::info!("{}: [{i}] synced {}", handle.nsid(), block.written)
}
- Err(err) => tracing::error!("failed to sync block: {}", err),
- });
}
AppResult::Ok(())
})?;
self.sync_pool.join();
Ok(())
}
···
db::handle::{ItemDecoder, LexiconHandle},
error::{AppError, AppResult},
jetstream::JetstreamEvent,
+ utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded},
};
mod block;
···
}
pub fn sync(&self, all: bool) -> AppResult<()> {
+ let start = CLOCK.now();
// prepare all the data
let mut data = Vec::with_capacity(self.hits.len());
let _guard = scc::ebr::Guard::new();
···
let count = handle.item_count();
let data_count = count / block_size;
if count > 0 && (all || data_count > 0 || is_too_old) {
+ for _ in 0..data_count {
+ nsid_data.push((handle.clone(), block_size));
total_count += block_size;
}
// only sync remainder if we haven't met block size
let remainder = count % block_size;
if (all || data_count == 0) && remainder > 0 {
+ nsid_data.push((handle.clone(), remainder));
total_count += remainder;
}
}
+ let _span = handle.span().entered();
tracing::info!(
+ {blocks = %nsid_data.len(), count = %total_count},
+ "will encode & sync",
);
data.push(nsid_data);
}
···
.map(|chunk| {
chunk
.into_iter()
+ .map(|(handle, max_block_size)| {
+ (handle.take_block_items(max_block_size), handle)
})
.collect::<Vec<_>>()
.into_par_iter()
+ .map(|(items, handle)| {
let count = items.len();
let block = LexiconHandle::encode_block_from_items(items, count)?;
+ AppResult::Ok((block, handle))
})
.collect::<Result<Vec<_>, _>>()
})
.try_for_each(|chunk| {
let chunk = chunk?;
+ for (block, handle) in chunk {
+ self.sync_pool.execute(move || {
+ let _span = handle.span().entered();
+ match handle.insert(block.key, block.data) {
Ok(_) => {
+ tracing::info!({count = %block.written}, "synced")
}
+ Err(err) => tracing::error!({ err = %err }, "failed to sync block"),
+ }
+ });
}
AppResult::Ok(())
})?;
self.sync_pool.join();
+ tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");
Ok(())
}
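
Two details in this file are worth calling out: the whole sync is now timed (`CLOCK.now()` / `start.elapsed()`), and the handle's span is entered inside the `sync_pool` closure rather than outside it, because tracing's current-span context is thread-local and does not follow work onto pool threads. A rough sketch of both, using `std::thread` in place of the pool and `std::time::Instant` as a stand-in for `utils::CLOCK` (whose concrete type isn't shown in this diff); `insert_block` is a placeholder for `handle.insert(block.key, block.data)`:

```rust
use std::{thread, time::Instant};
use tracing::{error, info, info_span};

// Placeholder for handle.insert(...); returns the number of items written.
fn insert_block() -> Result<usize, &'static str> {
    Ok(1)
}

fn main() {
    tracing_subscriber::fmt::init();
    let start = Instant::now(); // stand-in for CLOCK.now()

    let mut workers = Vec::new();
    for nsid in ["app.bsky.feed.post", "app.bsky.graph.follow"] {
        workers.push(thread::spawn(move || {
            // The span must be entered on the worker thread itself;
            // entering it before `spawn` would not carry over.
            let _span = info_span!("handle", nsid = %nsid).entered();
            match insert_block() {
                Ok(written) => info!(count = %written, "synced"),
                Err(err) => error!(err = %err, "failed to sync block"),
            }
        }));
    }
    for w in workers {
        let _ = w.join();
    }
    info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");
}
```

If `CLOCK` is something like a cached monotonic clock, the shape is identical; only the time source differs. The per-worker span is what makes the shortened messages ("synced", "failed to sync block") unambiguous, since the NSID arrives as a span field instead of being formatted into each line.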