tracks lexicons and how many times they have appeared on the jetstream

refactor(server): implement own rate counter that should be faster, use quanta on it
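The counter added in server/src/utils.rs below takes raw readings from a process-wide quanta::Clock instead of hitting SystemTime for every event. A minimal sketch of that pattern, separate from this commit (the workload here is a placeholder):

```rust
// sketch only: take cheap raw readings now, convert the delta to nanoseconds lazily
use quanta::Clock;

fn main() {
    let clock = Clock::new();
    let start = clock.raw(); // monotonic tick count, much cheaper than SystemTime::now()
    let work: u64 = (0..1_000u64).sum(); // placeholder workload
    let end = clock.raw();
    // delta_as_nanos converts the raw tick delta into nanoseconds
    println!("sum={work}, took {} ns", clock.delta_as_nanos(start, end));
}
```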

ptr.pet 6b78fe2d 32529fe8

verified
Changed files
+225 -73
server
+35 -43
server/Cargo.lock
···
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
-name = "ahash"
-version = "0.8.12"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
-dependencies = [
- "cfg-if",
- "getrandom 0.3.3",
- "once_cell",
- "version_check",
- "zerocopy",
-]
-
-[[package]]
name = "aho-corasick"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
-name = "pingora-limits"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a719a8cb5558ca06bd6076c97b8905d500ea556da89e132ba53d4272844f95b9"
-dependencies = [
- "ahash",
-]
-
-[[package]]
name = "pkg-config"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+name = "quanta"
+version = "0.12.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7"
+dependencies = [
+ "crossbeam-utils",
+ "libc",
+ "once_cell",
+ "raw-cpuid",
+ "wasi 0.11.1+wasi-snapshot-preview1",
+ "web-sys",
+ "winapi",
+]
+
+[[package]]
name = "quick_cache"
version = "0.6.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+name = "raw-cpuid"
+version = "11.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146"
+dependencies = [
+ "bitflags",
+]
+
+[[package]]
name = "redox_syscall"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"fjall",
"futures-util",
"ordered-varint",
- "pingora-limits",
+ "quanta",
"rkyv",
"rustls",
"scc",
···
[[package]]
+name = "web-sys"
+version = "0.3.77"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
+[[package]]
name = "webpki-root-certs"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
version = "0.8.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
-
-[[package]]
-name = "zerocopy"
-version = "0.8.26"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f"
-dependencies = [
- "zerocopy-derive",
-]
-
-[[package]]
-name = "zerocopy-derive"
-version = "0.8.26"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
-]
[[package]]
name = "zeroize"
+1 -1
server/Cargo.toml
···
futures-util = "0.3"
axum = { version = "0.8", default-features = false, features = ["http1", "tokio", "tracing", "json", "query"] }
axum-tws = { git = "https://github.com/90-008/axum-tws.git", features = ["http2"] }
-pingora-limits = "0.5"
tower-http = {version = "0.6", features = ["request-id", "trace", "compression-full"]}
fjall = { version = "2", default-features = false, features = ["miniz", "lz4"] }
rkyv = {version = "0.8", features = ["unaligned"]}
···
ordered-varint = "2.0.0"
threadpool = "1.8.1"
snmalloc-rs = "0.3.8"
+quanta = "0.12.6"
+21 -7
server/src/api.rs
···
    collections::HashMap,
    fmt::Display,
    net::SocketAddr,
-    ops::Deref,
+    ops::{Bound, Deref, RangeBounds},
    sync::Arc,
-    time::{Duration, UNIX_EPOCH},
+    time::Duration,
};
use anyhow::anyhow;
···
use crate::{
    db::Db,
    error::{AppError, AppResult},
-    utils::time_now,
};

struct LatencyMillis(u128);
···
const MAX_HITS: usize = 100_000;

+#[derive(Debug)]
+struct HitsRange {
+    from: Bound<u64>,
+    to: Bound<u64>,
+}
+
+impl RangeBounds<u64> for HitsRange {
+    fn start_bound(&self) -> Bound<&u64> {
+        self.from.as_ref()
+    }
+
+    fn end_bound(&self) -> Bound<&u64> {
+        self.to.as_ref()
+    }
+}
+
async fn hits(
    State(db): State<Arc<Db>>,
    Query(params): Query<HitsQuery>,
) -> AppResult<Json<Vec<Hit>>> {
+    let from = params.to.map(Bound::Included).unwrap_or(Bound::Unbounded);
+    let to = params.from.map(Bound::Included).unwrap_or(Bound::Unbounded);
    let maybe_hits = db
-        .get_hits(
-            &params.nsid,
-            params.to.unwrap_or(0)..params.from.unwrap_or(time_now()),
-        )
+        .get_hits(&params.nsid, HitsRange { from, to })
        .take(MAX_HITS);
    let mut hits = Vec::with_capacity(maybe_hits.size_hint().0);
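The HitsRange type above exists so an omitted from/to query parameter becomes an unbounded side of the key range instead of defaulting to 0 or the current time. A standalone sketch of the same RangeBounds pattern (the type and values here are illustrative, not from this commit):

```rust
use std::ops::{Bound, RangeBounds};

// illustrative stand-in for HitsRange: either side may be open-ended
struct KeyRange {
    from: Bound<u64>,
    to: Bound<u64>,
}

impl RangeBounds<u64> for KeyRange {
    fn start_bound(&self) -> Bound<&u64> {
        self.from.as_ref()
    }
    fn end_bound(&self) -> Bound<&u64> {
        self.to.as_ref()
    }
}

fn main() {
    // "everything from 100 onwards", no upper bound
    let range = KeyRange { from: Bound::Included(100), to: Bound::Unbounded };
    assert!(range.contains(&100));
    assert!(range.contains(&u64::MAX));
    assert!(!range.contains(&99));
}
```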
+10 -15
server/src/db/mod.rs
···
    db::block::{ReadVariableExt, WriteVariableExt},
    error::{AppError, AppResult},
    jetstream::JetstreamEvent,
-    utils::time_now,
+    utils::{DefaultRateTracker, RateTracker},
};

mod block;
···
pub struct LexiconHandle {
    tree: Partition,
    buf: Arc<scc::Queue<EventRecord>>,
+    // this is stored here since scc::Queue does not have O(1) length
    buf_len: AtomicUsize, // relaxed
    last_insert: AtomicInstant, // relaxed
-    block_size: AtomicUsize, // relaxed
-    eps: Rate,
+    eps: DefaultRateTracker,
}
impl LexiconHandle {
···
            buf: Default::default(),
            buf_len: AtomicUsize::new(0),
            last_insert: AtomicInstant::now(),
-            eps: Rate::new(Duration::from_secs(5)),
-            block_size: AtomicUsize::new(1000),
+            eps: RateTracker::new(Duration::from_secs(10)),
        }
    }
···
    }

    fn suggested_block_size(&self) -> usize {
-        self.block_size.load(AtomicOrdering::Relaxed)
+        self.eps.rate() as usize * 60
    }
fn insert(&self, event: EventRecord) {
···
        self.buf_len.fetch_add(1, AtomicOrdering::Relaxed);
        self.last_insert
            .store(Instant::now(), AtomicOrdering::Relaxed);
-        self.eps.observe(&(), 1);
-        let rate = self.eps.rate(&()) as usize;
-        if rate != 0 {
-            self.block_size.store(rate * 60, AtomicOrdering::Relaxed);
-        }
+        self.eps.observe();
    }
fn sync(&self, max_block_size: usize) -> AppResult<usize> {
···
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
    syncpool: threadpool::ThreadPool,
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
-    eps: Rate,
+    eps: RateTracker<100>,
    shutting_down: AtomicBool,
    min_block_size: usize,
    max_block_size: usize,
···
            )?,
            inner: ks,
            event_broadcaster: broadcast::channel(1000).0,
-            eps: Rate::new(Duration::from_secs(1)),
+            eps: RateTracker::new(Duration::from_secs(1)),
            shutting_down: AtomicBool::new(false),
            min_block_size: 512,
            max_block_size: 500_000,
···
    #[inline(always)]
    pub fn eps(&self) -> usize {
-        self.eps.rate(&()) as usize
+        self.eps.rate() as usize
    }
#[inline(always)]
···
        if self.event_broadcaster.receiver_count() > 0 {
            let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
        }
-        self.eps.observe(&(), 1);
+        self.eps.observe();
        Ok(())
    }
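With the per-lexicon tracker in place, suggested_block_size becomes roughly "one minute of events at the current rate" measured over the 10-second window. A small illustration of that arithmetic (standalone sketch, not the actual method):

```rust
// illustration of the heuristic: block size ~= events/sec * 60
fn suggested_block_size(events_per_sec: f64) -> usize {
    events_per_sec as usize * 60
}

fn main() {
    assert_eq!(suggested_block_size(25.0), 1500); // 25 eps -> 1500-record blocks
    assert_eq!(suggested_block_size(0.4), 0); // rates below 1 eps truncate to 0 before the multiply
}
```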
+1 -1
server/src/main.rs
···
        }
    });

-    std::thread::spawn({
+    let sync_thread = std::thread::spawn({
        let db = db.clone();
        move || {
            loop {
+157 -6
server/src/utils.rs
···
-use std::time::UNIX_EPOCH;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::Duration;
+
+pub static CLOCK: std::sync::LazyLock<quanta::Clock> =
+    std::sync::LazyLock::new(|| quanta::Clock::new());
+
+/// simple thread-safe rate tracker using time buckets
+/// divides time into fixed buckets and rotates through them
+#[derive(Debug)]
+pub struct RateTracker<const BUCKET_WINDOW: u64> {
+    buckets: Vec<AtomicU64>,
+    bucket_duration_nanos: u64,
+    window_duration: Duration,
+    last_bucket_time: AtomicU64,
+    start_time: u64, // raw time when tracker was created
+}
+
+pub type DefaultRateTracker = RateTracker<1000>;
+
+impl<const BUCKET_WINDOW: u64> RateTracker<BUCKET_WINDOW> {
+    /// create a new rate tracker with the specified time window
+    pub fn new(window_duration: Duration) -> Self {
+        let bucket_duration_nanos = Duration::from_millis(BUCKET_WINDOW).as_nanos() as u64;
+        let num_buckets =
+            (window_duration.as_nanos() as u64 / bucket_duration_nanos).max(1) as usize;
+
+        let mut buckets = Vec::with_capacity(num_buckets);
+        for _ in 0..num_buckets {
+            buckets.push(AtomicU64::new(0));
+        }
+
+        let start_time = CLOCK.raw();
+        Self {
+            buckets,
+            bucket_duration_nanos,
+            window_duration,
+            last_bucket_time: AtomicU64::new(0),
+            start_time,
+        }
+    }
+
+    /// record an event
+    pub fn observe(&self) {
+        let now = CLOCK.raw();
+        self.maybe_advance_buckets(now);
+
+        let bucket_index = self.get_current_bucket_index(now);
+        self.buckets[bucket_index].fetch_add(1, Ordering::Relaxed);
+    }
+
+    /// get the current rate in events per second
+    pub fn rate(&self) -> f64 {
+        let now = CLOCK.raw();
+        self.maybe_advance_buckets(now);
+
+        let total_events: u64 = self
+            .buckets
+            .iter()
+            .map(|bucket| bucket.load(Ordering::Relaxed))
+            .sum();
+
+        total_events as f64 / self.window_duration.as_secs_f64()
+    }
+
+    fn get_current_bucket_index(&self, now: u64) -> usize {
+        let elapsed_nanos = CLOCK.delta_as_nanos(self.start_time, now);
+        let bucket_number = elapsed_nanos / self.bucket_duration_nanos;
+        (bucket_number as usize) % self.buckets.len()
+    }
+
+    fn maybe_advance_buckets(&self, now: u64) {
+        let elapsed_nanos = CLOCK.delta_as_nanos(self.start_time, now);
+        let current_bucket_time =
+            (elapsed_nanos / self.bucket_duration_nanos) * self.bucket_duration_nanos;
+        let last_bucket_time = self.last_bucket_time.load(Ordering::Relaxed);
+
+        if current_bucket_time > last_bucket_time {
+            // try to update the last bucket time
+            if self
+                .last_bucket_time
+                .compare_exchange_weak(
+                    last_bucket_time,
+                    current_bucket_time,
+                    Ordering::Relaxed,
+                    Ordering::Relaxed,
+                )
+                .is_ok()
+            {
+                // clear buckets that are now too old
+                let buckets_to_advance = ((current_bucket_time - last_bucket_time)
+                    / self.bucket_duration_nanos)
+                    .min(self.buckets.len() as u64);
+
+                for i in 0..buckets_to_advance {
+                    let bucket_time = last_bucket_time + (i + 1) * self.bucket_duration_nanos;
+                    let bucket_index =
+                        (bucket_time / self.bucket_duration_nanos) as usize % self.buckets.len();
+                    self.buckets[bucket_index].store(0, Ordering::Relaxed);
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::sync::Arc;
+    use std::thread;
+
+    #[test]
+    fn test_rate_tracker_basic() {
+        let tracker = DefaultRateTracker::new(Duration::from_secs(2));
+
+        // record some events
+        tracker.observe();
+        tracker.observe();
+        tracker.observe();
+
+        let rate = tracker.rate();
+        assert_eq!(rate, 1.5); // 3 events over 2 seconds = 1.5 events/sec
+    }
+
+    #[test]
+    fn test_rate_tracker_burst() {
+        let tracker = DefaultRateTracker::new(Duration::from_secs(1));
-pub fn time_now() -> u64 {
-    std::time::SystemTime::now()
-        .duration_since(UNIX_EPOCH)
-        .expect("oops")
-        .as_micros() as u64
+        // record a lot of events
+        for _ in 0..1000 {
+            tracker.observe();
+        }
+
+        let rate = tracker.rate();
+        assert_eq!(rate, 1000.0); // 1000 events in 1 second
+    }
+
+    #[test]
+    fn test_rate_tracker_threading() {
+        let tracker = Arc::new(DefaultRateTracker::new(Duration::from_secs(1)));
+        let mut handles = vec![];
+
+        for _ in 0..4 {
+            let tracker_clone = Arc::clone(&tracker);
+            let handle = thread::spawn(move || {
+                for _ in 0..10 {
+                    tracker_clone.observe();
+                }
+            });
+            handles.push(handle);
+        }
+
+        for handle in handles {
+            handle.join().unwrap();
+        }
+
+        let rate = tracker.rate();
+        assert_eq!(rate, 40.0); // 40 events in 1 second
+    }
}
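To tie the counter back to db/mod.rs: the Db-level tracker is RateTracker::<100>, a 1-second window split into ten 100 ms buckets, while each LexiconHandle uses DefaultRateTracker, 1-second buckets over a 10-second window. A usage sketch, assuming the RateTracker from utils.rs above is in scope (not part of the commit):

```rust
use std::time::Duration;

fn demo() {
    // ten 100 ms buckets spanning a 1 s window, as the Db struct configures it
    let eps = RateTracker::<100>::new(Duration::from_secs(1));
    for _ in 0..500 {
        eps.observe();
    }
    // the burst still sits in live buckets, so this reports ~500 events/sec
    println!("events/sec: {}", eps.rate());
}
```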