tracks lexicons and how many times they appeared on the jetstream

refactor(server): use rclite and parking_lot, replace the scc::Queue with a Arc<Mutex<Vec<_>>>

ptr.pet 2a7d6005 e2dd9f2b

verified
Changed files
+67 -25
server
+35
server/Cargo.lock
···
]
[[package]]
+
name = "branches"
+
version = "0.2.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a918aa7a861caeba57e502465c30e3a0d74ae02ee0b9db2933602fdb6a3a90e5"
+
dependencies = [
+
"rustc_version",
+
]
+
+
[[package]]
name = "brotli"
version = "8.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+
name = "rclite"
+
version = "0.2.7"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2f528dfeba924f5fc67bb84a17fe043451d1b392758016ce2d9e9116649b0f35"
+
dependencies = [
+
"branches",
+
]
+
+
[[package]]
name = "redox_syscall"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
+
name = "rustc_version"
+
version = "0.4.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
+
dependencies = [
+
"semver",
+
]
+
+
[[package]]
name = "rustix"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749"
[[package]]
+
name = "semver"
+
version = "1.0.26"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0"
+
+
[[package]]
name = "serde"
version = "1.0.219"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"futures-util",
"itertools",
"ordered-varint",
+
"parking_lot",
"quanta",
"rayon",
+
"rclite",
"rkyv",
"rustls",
"scc",
+3 -1
server/Cargo.toml
···
async-trait = "0.1"
tracing-subscriber = {version = "0.3", features = ["env-filter"]}
tracing = "0.1"
-
tokio = { version = "1", features = ["full"] }
+
tokio = { version = "1", features = ["full", "parking_lot"] }
tokio-util = { version = "0.7", features = ["tracing"] }
rustls = { version = "0.23", default-features = false, features = ["log", "ring", "std"] }
tokio-websockets = { version = "0.12", features = ["client", "rustls-platform-verifier", "getrandom", "ring"] }
···
itertools = "0.14.0"
byteview = "0.6.1"
rayon = "1.10.0"
+
parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] }
+
rclite = "0.2.7"
+1 -1
server/src/api.rs
···
fmt::Display,
net::SocketAddr,
ops::{Bound, Deref, RangeBounds},
-
sync::Arc,
time::Duration,
};
···
routing::get,
};
use axum_tws::{Message, WebSocketUpgrade};
+
use rclite::Arc;
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use tokio_util::sync::CancellationToken;
+4
server/src/db/block.rs
···
})
}
+
pub fn item_count(&self) -> usize {
+
self.expected
+
}
+
pub fn decode(&mut self) -> io::Result<Option<Item<T>>> {
if self.items_read == 0 {
// read the first timestamp
+22 -22
server/src/db/mod.rs
···
io::Cursor,
ops::{Bound, Deref, RangeBounds},
path::Path,
-
sync::{
-
Arc,
-
atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
-
},
+
sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
time::Duration,
};
use byteview::ByteView;
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
use itertools::{Either, Itertools};
+
use parking_lot::Mutex;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
+
use rclite::Arc;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::SmolStr;
use tokio::sync::broadcast;
···
pub struct LexiconHandle {
tree: Partition,
nsid: SmolStr,
-
buf: Arc<scc::Queue<EventRecord>>,
-
// this is stored here since scc::Queue does not have O(1) length
-
buf_len: AtomicUsize, // seqcst
+
buf: Arc<Mutex<Vec<EventRecord>>>,
last_insert: AtomicU64, // relaxed
eps: DefaultRateTracker,
}
···
tree: keyspace.open_partition(nsid, opts).unwrap(),
nsid: nsid.into(),
buf: Default::default(),
-
buf_len: AtomicUsize::new(0),
last_insert: AtomicU64::new(0),
eps: RateTracker::new(Duration::from_secs(10)),
}
}
fn item_count(&self) -> usize {
-
self.buf_len.load(AtomicOrdering::SeqCst)
+
self.buf.lock().len()
}
fn since_last_activity(&self) -> u64 {
···
}
fn insert(&self, event: EventRecord) {
-
self.buf.push(event);
-
self.buf_len.fetch_add(1, AtomicOrdering::SeqCst);
+
self.buf.lock().push(event);
self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
self.eps.observe();
}
+
fn compact(&self) {}
+
fn encode_block(&self, item_count: usize) -> AppResult<Block> {
let mut writer = ItemEncoder::new(
WritableByteView::with_size(ItemEncoder::encoded_len(item_count)),
···
let mut start_timestamp = None;
let mut end_timestamp = None;
let mut written = 0_usize;
-
while let Some(event) = self.buf.pop() {
+
for event in self.buf.lock().drain(..) {
let item = Item::new(
event.timestamp,
&NsidHit {
···
.into());
}
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
-
self.buf_len.store(0, AtomicOrdering::SeqCst);
let value = writer.finish()?;
let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
return Ok(Block {
···
})
}
+
#[inline(always)]
pub fn shutting_down(&self) -> impl Future<Output = ()> {
self.cancel_token.cancelled()
}
+
#[inline(always)]
pub fn is_shutting_down(&self) -> bool {
self.cancel_token.is_cancelled()
+
}
+
+
#[inline(always)]
+
pub fn eps(&self) -> usize {
+
self.eps.rate() as usize
+
}
+
+
#[inline(always)]
+
pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
+
self.event_broadcaster.subscribe()
}
pub fn sync(&self, all: bool) -> AppResult<()> {
···
Ok(())
}
-
#[inline(always)]
-
pub fn eps(&self) -> usize {
-
self.eps.rate() as usize
-
}
-
-
#[inline(always)]
-
pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
-
self.event_broadcaster.subscribe()
-
}
+
pub fn compact(&self) {}
#[inline(always)]
fn maybe_run_in_nsid_tree<T>(
+2 -1
server/src/main.rs
···
-
use std::{ops::Deref, sync::Arc};
+
use std::ops::Deref;
+
use rclite::Arc;
use smol_str::ToSmolStr;
use tokio_util::sync::CancellationToken;
use tracing::Level;