tracks lexicons and how many times they appeared on the jetstream

feat(server): sort on major compact

ptr.pet 29914c94 bf7842d0

verified
Changed files
+17 -10
server
+11 -2
server/src/db/handle.rs
···
self.eps.observe();
}
-
pub fn compact(&self, compact_to: usize, range: impl RangeBounds<u64>) -> AppResult<()> {
+
pub fn compact(
+
&self,
+
compact_to: usize,
+
range: impl RangeBounds<u64>,
+
sort: bool,
+
) -> AppResult<()> {
let start_limit = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start.saturating_add(1),
···
let start_blocks_size = blocks_to_compact.len();
let keys_to_delete = blocks_to_compact.iter().map(|(key, _)| key);
-
let all_items =
+
let mut all_items =
blocks_to_compact
.iter()
.try_fold(Vec::new(), |mut acc, (key, value)| {
···
acc.append(&mut items);
AppResult::Ok(acc)
})?;
+
+
if sort {
+
all_items.sort_unstable_by_key(|e| e.timestamp);
+
}
let new_blocks = all_items
.into_iter()
+5 -3
server/src/db/mod.rs
···
nsid: impl AsRef<str>,
max_count: usize,
range: impl RangeBounds<u64>,
+
sort: bool,
) -> AppResult<()> {
let Some(handle) = self.get_handle(nsid) else {
return Ok(());
};
-
handle.compact(max_count, range)
+
handle.compact(max_count, range, sort)
}
pub fn compact_all(
&self,
max_count: usize,
range: impl RangeBounds<u64> + Clone,
+
sort: bool,
) -> AppResult<()> {
for nsid in self.get_nsids() {
-
self.compact(nsid, max_count, range.clone())?;
+
self.compact(nsid, max_count, range.clone(), sort)?;
}
Ok(())
}
pub fn major_compact(&self) -> AppResult<()> {
-
self.compact_all(self.max_block_size, ..)?;
+
self.compact_all(self.max_block_size, .., true)?;
let _guard = scc::ebr::Guard::new();
for (_, handle) in self.hits.iter(&_guard) {
handle.deref().major_compact()?;
+1 -1
server/src/main.rs
···
},
"running compaction...",
);
-
match db.compact_all(db.max_block_size, range) {
+
match db.compact_all(db.max_block_size, range, false) {
Ok(_) => (),
Err(e) => tracing::error!("failed to compact db: {}", e),
}
-4
server/src/utils.rs
···
}
}
-
pub fn ago(duration: Duration) -> Self {
-
Self::new(duration, TimeDirection::Backwards)
-
}
-
pub fn from_now(duration: Duration) -> Self {
let cur = get_time();
if duration > cur {