tracks lexicons and how many times they appeared on the jetstream

refactor(server): improve record events by splitting it into ingest_events

ptr.pet 7cd979b3 13e1dd85

verified
Changed files
+52 -24
server
+16
server/Cargo.lock
···
checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57"
[[package]]
+
name = "either"
+
version = "1.15.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
+
+
[[package]]
name = "enum_dispatch"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"bitflags",
"cfg-if",
"libc",
+
]
+
+
[[package]]
+
name = "itertools"
+
version = "0.14.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
+
dependencies = [
+
"either",
]
[[package]]
···
"axum-tws",
"fjall",
"futures-util",
+
"itertools",
"ordered-varint",
"quanta",
"rkyv",
+1
server/Cargo.toml
···
threadpool = "1.8.1"
snmalloc-rs = "0.3.8"
quanta = "0.12.6"
+
itertools = "0.14.0"
+35 -24
server/src/db/mod.rs
···
};
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
+
use itertools::Itertools;
use ordered_varint::Variable;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::SmolStr;
···
#[inline(always)]
fn run_in_nsid_tree<T>(
&self,
-
nsid: SmolStr,
+
nsid: &SmolStr,
f: impl FnOnce(&LexiconHandle) -> AppResult<T>,
) -> AppResult<T> {
f(self
.hits
.entry(nsid.clone())
-
.or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid)))
+
.or_insert_with(|| Arc::new(LexiconHandle::new(&self.inner, &nsid)))
.get())
}
-
pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
-
let EventRecord {
-
nsid,
-
timestamp,
-
deleted,
-
} = e.clone();
+
pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
+
for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
+
let mut counts = self.get_count(&key)?;
+
self.run_in_nsid_tree(&key, move |tree| {
+
for event in chunk {
+
let EventRecord {
+
timestamp, deleted, ..
+
} = event.clone();
-
// insert event
-
self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?;
-
// increment count
-
let mut counts = self.get_count(&nsid)?;
-
counts.last_seen = timestamp;
-
if deleted {
-
counts.deleted_count += 1;
-
} else {
-
counts.count += 1;
-
}
-
self.insert_count(&nsid, counts.clone())?;
-
if self.event_broadcaster.receiver_count() > 0 {
-
let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
+
tree.insert(event);
+
+
// increment count
+
counts.last_seen = timestamp;
+
if deleted {
+
counts.deleted_count += 1;
+
} else {
+
counts.count += 1;
+
}
+
+
self.eps.observe();
+
}
+
Ok(())
+
})?;
+
self.insert_count(&key, &counts)?;
+
if self.event_broadcaster.receiver_count() > 0 {
+
let _ = self.event_broadcaster.send((key, counts));
+
}
}
-
self.eps.observe();
Ok(())
}
+
pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
+
self.ingest_events(std::iter::once(e))
+
}
+
#[inline(always)]
-
fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
+
fn insert_count(&self, nsid: &str, counts: &NsidCounts) -> AppResult<()> {
self.counts
.insert(
nsid,
-
unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
+
unsafe { rkyv::to_bytes::<Error>(counts).unwrap_unchecked() }.as_slice(),
)
.map_err(AppError::from)
}