tracks lexicons and how many times they appeared on the jetstream

refactor(server): use relaxed ordering for keeping track of lexicon buf len / last activity / block size since we don't really need any strict ordering

ptr.pet 32529fe8 3456ca30

verified
Changed files
+9 -9
server
src
db
+9 -9
server/src/db/mod.rs
···
pub struct LexiconHandle {
tree: Partition,
buf: Arc<scc::Queue<EventRecord>>,
-
buf_len: AtomicUsize,
-
last_insert: AtomicInstant,
+
buf_len: AtomicUsize, // relaxed
+
last_insert: AtomicInstant, // relaxed
+
block_size: AtomicUsize, // relaxed
eps: Rate,
-
block_size: AtomicUsize,
}
impl LexiconHandle {
···
}
fn item_count(&self) -> usize {
-
self.buf_len.load(AtomicOrdering::Acquire)
+
self.buf_len.load(AtomicOrdering::Relaxed)
}
fn last_insert(&self) -> Instant {
-
self.last_insert.load(AtomicOrdering::Acquire)
+
self.last_insert.load(AtomicOrdering::Relaxed)
}
fn suggested_block_size(&self) -> usize {
···
fn insert(&self, event: EventRecord) {
self.buf.push(event);
-
self.buf_len.fetch_add(1, AtomicOrdering::Release);
+
self.buf_len.fetch_add(1, AtomicOrdering::Relaxed);
self.last_insert
-
.store(Instant::now(), AtomicOrdering::Release);
+
.store(Instant::now(), AtomicOrdering::Relaxed);
self.eps.observe(&(), 1);
let rate = self.eps.rate(&()) as usize;
if rate != 0 {
···
written += 1;
}
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
-
self.buf_len.store(0, AtomicOrdering::Release);
+
self.buf_len.store(0, AtomicOrdering::Relaxed);
let value = writer.finish()?;
let mut key = Vec::with_capacity(size_of::<u64>() * 2);
key.write_varint(start_timestamp)?;
···
// hits is tree per nsid: varint start time + varint end time -> block of hits
pub struct Db {
inner: Keyspace,
+
counts: Partition,
hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
syncpool: threadpool::ThreadPool,
-
counts: Partition,
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
eps: Rate,
shutting_down: AtomicBool,