Tracks lexicons and how many times each has appeared on the Jetstream.

refactor(server): compute blocks with rayon, then execute the inserts on a thread pool

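In outline, the refactor moves the CPU-bound block encoding onto rayon and leaves only the blocking fjall inserts on a thread pool. A minimal sketch of that pattern, with hypothetical `Job`/`Block` stand-ins rather than the server's real types:

```rust
use rayon::prelude::*;
use threadpool::ThreadPool;

// stand-ins for the server's real job and block types
struct Job(Vec<u8>);
struct Block(Vec<u8>);

impl Job {
    // placeholder for the CPU-bound encoding step
    fn encode(self) -> Block {
        Block(self.0)
    }
}

fn sync_all(jobs: Vec<Job>, pool: &ThreadPool) {
    // 1. encode every block in parallel on rayon's worker threads;
    //    collect_into_vec preserves the input order
    let mut blocks = Vec::with_capacity(jobs.len());
    jobs.into_par_iter()
        .map(Job::encode)
        .collect_into_vec(&mut blocks);

    // 2. hand the finished blocks to a thread pool so the blocking
    //    storage inserts never occupy rayon's compute workers
    for block in blocks {
        pool.execute(move || {
            // placeholder for the real partition insert
            drop(block);
        });
    }
    pool.join(); // wait for every insert before returning
}
```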
ptr.pet 52fd5331 a820ae13

verified
Changed files
+173 -83
server
+31
server/Cargo.lock
···
]
[[package]]
+name = "crossbeam-deque"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+name = "rayon"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
+[[package]]
name = "redox_syscall"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"itertools",
"ordered-varint",
"quanta",
+ "rayon",
"rkyv",
"rustls",
"scc",
+1
server/Cargo.toml
···
quanta = "0.12.6"
itertools = "0.14.0"
byteview = "0.6.1"
+rayon = "1.10.0"
+1 -1
server/src/api.rs
···
    to: Option<u64>,
}
-#[derive(Serialize)]
+#[derive(Debug, Serialize)]
struct Hit {
    timestamp: u64,
    deleted: bool,
-2
server/src/db/block.rs
···
    marker::PhantomData,
};
-use crate::error::AppResult;
-
pub struct Item<T> {
    pub timestamp: u64,
    data: AlignedVec,
+76 -48
server/src/db/mod.rs
···
use std::{
-    io::{self, Cursor, Write},
-    marker::PhantomData,
+    io::{Cursor, Write},
    ops::{Bound, Deref, RangeBounds},
    path::Path,
    sync::{
        Arc,
-        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
+        atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
    },
    time::Duration,
};
···
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
use itertools::{Either, Itertools};
use ordered_varint::Variable;
+use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::SmolStr;
use tokio::sync::broadcast;
-use tokio_util::bytes::{self, BufMut};
+use tokio_util::sync::CancellationToken;
use crate::{
    db::block::{ReadVariableExt, WriteVariableExt},
···
}
}
+struct Block {
+    written: usize,
+    key: ByteView,
+    data: ByteView,
+}
+
pub struct LexiconHandle {
    tree: Partition,
+    nsid: SmolStr,
    buf: Arc<scc::Queue<EventRecord>>,
    // this is stored here since scc::Queue does not have O(1) length
-    buf_len: AtomicUsize, // relaxed
+    buf_len: AtomicUsize, // seqcst
    last_insert: AtomicU64, // relaxed
    eps: DefaultRateTracker,
}
···
        let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9));
        Self {
            tree: keyspace.open_partition(nsid, opts).unwrap(),
+            nsid: nsid.into(),
            buf: Default::default(),
            buf_len: AtomicUsize::new(0),
            last_insert: AtomicU64::new(0),
···
    }
    fn item_count(&self) -> usize {
-        self.buf_len.load(AtomicOrdering::Relaxed)
+        self.buf_len.load(AtomicOrdering::SeqCst)
    }
    fn since_last_activity(&self) -> u64 {
···
    fn insert(&self, event: EventRecord) {
        self.buf.push(event);
-        self.buf_len.fetch_add(1, AtomicOrdering::Relaxed);
+        self.buf_len.fetch_add(1, AtomicOrdering::SeqCst);
        self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
        self.eps.observe();
    }
-    fn sync(&self, max_block_size: usize) -> AppResult<usize> {
+    fn encode_block(&self, max_block_size: usize) -> AppResult<Option<Block>> {
        let buf_size =
            size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>();
        let mut writer = ItemEncoder::new(WritableByteView::with_size(buf_size));
···
            written += 1;
        }
        if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
-            self.buf_len.store(0, AtomicOrdering::Relaxed);
+            self.buf_len.store(0, AtomicOrdering::SeqCst);
            let value = writer.finish()?;
-            let mut key = Vec::with_capacity(size_of::<u64>() * 2);
+            let mut key = WritableByteView::with_size(size_of::<u64>() * 2);
            key.write_varint(start_timestamp)?;
            key.write_varint(end_timestamp)?;
-            self.tree.insert(key, value.into_inner())?;
+            return Ok(Some(Block {
+                written,
+                key: key.into_inner(),
+                data: value.into_inner(),
+            }));
        }
-        Ok(written)
+        Ok(None)
    }
}
···
    inner: Keyspace,
    counts: Partition,
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
-    syncpool: threadpool::ThreadPool,
+    sync_pool: threadpool::ThreadPool,
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    eps: RateTracker<100>,
-    shutting_down: AtomicBool,
+    cancel_token: CancellationToken,
    min_block_size: usize,
    max_block_size: usize,
    max_last_activity: u64,
}
impl Db {
-    pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
+    pub fn new(path: impl AsRef<Path>, cancel_token: CancellationToken) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = Config::new(path)
            .cache_size(8 * 1024 * 1024) // from talna
            .open()?;
        Ok(Self {
            hits: Default::default(),
-            syncpool: threadpool::Builder::new().num_threads(256).build(),
+            sync_pool: threadpool::Builder::new()
+                .num_threads(rayon::current_num_threads() * 2)
+                .build(),
            counts: ks.open_partition(
                "_counts",
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
···
            inner: ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: RateTracker::new(Duration::from_secs(1)),
-            shutting_down: AtomicBool::new(false),
+            cancel_token,
            min_block_size: 512,
            max_block_size: 500_000,
            max_last_activity: Duration::from_secs(10).as_nanos() as u64,
        })
    }
-    pub fn shutdown(&self) -> AppResult<()> {
-        self.shutting_down.store(true, AtomicOrdering::Release);
-        self.sync(true)
+    pub fn shutting_down(&self) -> impl Future<Output = ()> {
+        self.cancel_token.cancelled()
    }
    pub fn is_shutting_down(&self) -> bool {
-        self.shutting_down.load(AtomicOrdering::Acquire)
+        self.cancel_token.is_cancelled()
    }
    pub fn sync(&self, all: bool) -> AppResult<()> {
-        let mut execs = Vec::with_capacity(self.hits.len());
+        // prepare all the data
+        let mut data = Vec::with_capacity(self.hits.len());
        let _guard = scc::ebr::Guard::new();
-        for (nsid, tree) in self.hits.iter(&_guard) {
-            let count = tree.item_count();
-            let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size());
-            let is_too_old = tree.since_last_activity() > self.max_last_activity;
-            if count > 0 && (all || is_max_block_size || is_too_old) {
-                let nsid = nsid.clone();
-                let tree = tree.clone();
-                let max_block_size = self.max_block_size;
-                execs.push(move || {
-                    loop {
-                        let synced = match tree.sync(max_block_size) {
-                            Ok(synced) => synced,
-                            Err(err) => {
-                                tracing::error!("failed to sync {nsid}: {err}");
-                                break;
-                            }
-                        };
-                        if synced == 0 {
-                            break;
-                        }
-                        tracing::info!("synced {synced} of {nsid} to db");
-                    }
-                });
+        for (_, handle) in self.hits.iter(&_guard) {
+            let block_size = self
+                .max_block_size
+                .min(self.min_block_size.max(handle.suggested_block_size()));
+            let count = handle.item_count();
+            let data_count = count / block_size;
+            let is_too_old = handle.since_last_activity() > self.max_last_activity;
+            if count > 0 && (all || data_count > 0 || is_too_old) {
+                for i in 0..data_count {
+                    data.push((i, handle.clone(), block_size));
+                }
+                // only sync remainder if we haven't met block size
+                let remainder = count % block_size;
+                if data_count == 0 && remainder > 0 {
+                    data.push((data_count, handle.clone(), remainder));
+                }
            }
        }
        drop(_guard);
-        for exec in execs {
-            self.syncpool.execute(exec);
+        // process the blocks
+        let mut blocks = Vec::with_capacity(data.len());
+        data.into_par_iter()
+            .map(|(i, handle, max_block_size)| {
+                handle
+                    .encode_block(max_block_size)
+                    .transpose()
+                    .map(|r| r.map(|block| (i, block, handle.clone())))
+            })
+            .collect_into_vec(&mut blocks);
+
+        // execute into db
+        for item in blocks.into_iter() {
+            let Some((i, block, handle)) = item.transpose()? else {
+                continue;
+            };
+            self.sync_pool
+                .execute(move || match handle.tree.insert(block.key, block.data) {
+                    Ok(_) => {
+                        tracing::info!("[{i}] synced {} of {} to db", block.written, handle.nsid)
+                    }
+                    Err(err) => tracing::error!("failed to sync block: {}", err),
+                });
        }
-        self.syncpool.join();
+        self.sync_pool.join();
        Ok(())
    }
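To restate the job-splitting rule above with a hypothetical helper (not part of the diff): a handle's buffered count yields `count / block_size` full blocks, and the tail remainder is flushed only when no full block was produced.

```rust
// sketch of Db::sync's splitting arithmetic: full blocks first; the
// remainder ships only if the buffer never reached one full block
fn split_jobs(count: usize, block_size: usize) -> Vec<usize> {
    let full = count / block_size;
    let mut jobs = vec![block_size; full]; // `full` jobs of `block_size` items
    if full == 0 && count % block_size > 0 {
        jobs.push(count % block_size);
    }
    jobs
}

// split_jobs(1_200, 500) == [500, 500]  // the 200-item tail waits for more data
// split_jobs(300, 500)   == [300]       // flushed when forced or too old
```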
+51 -20
server/src/main.rs
···
        None => {}
    }
-    let db = Arc::new(Db::new(".fjall_data").expect("couldnt create db"));
+    let cancel_token = CancellationToken::new();
+
+    let db =
+        Arc::new(Db::new(".fjall_data", cancel_token.child_token()).expect("couldnt create db"));
    rustls::crypto::ring::default_provider()
        .install_default()
···
        }
    };
-    let cancel_token = CancellationToken::new();
-
+    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
    let consume_events = tokio::spawn({
        let consume_cancel = cancel_token.child_token();
-        let db = db.clone();
        async move {
            jetstream.connect().await?;
            loop {
···
                        let Some(record) = EventRecord::from_jetstream(event) else {
                            continue;
                        };
-                        let db = db.clone();
-                        tokio::task::spawn_blocking(move || {
-                            if let Err(err) = db.record_event(record) {
-                                tracing::error!("failed to record event: {}", err);
-                            }
-                        });
+                        event_tx.send(record).await?;
                    }
                    Err(err) => return Err(err),
                },
···
        }
    });
-    std::thread::spawn({
+    let ingest_events = std::thread::spawn({
        let db = db.clone();
        move || {
+            let mut buffer = Vec::new();
            loop {
-                if db.is_shutting_down() {
+                let read = event_rx.blocking_recv_many(&mut buffer, 100);
+                if let Err(err) = db.ingest_events(buffer.drain(..)) {
+                    tracing::error!("failed to ingest events: {}", err);
+                }
+                if read == 0 || db.is_shutting_down() {
                    break;
                }
-                match db.sync(false) {
-                    Ok(_) => (),
-                    Err(e) => tracing::error!("failed to sync db: {}", e),
+            }
+        }
+    });
+
+    let sync_task = tokio::task::spawn({
+        let db = db.clone();
+        async move {
+            loop {
+                let sync_db = tokio::task::spawn_blocking({
+                    let db = db.clone();
+                    move || {
+                        if db.is_shutting_down() {
+                            return;
+                        }
+                        match db.sync(false) {
+                            Ok(_) => (),
+                            Err(e) => tracing::error!("failed to sync db: {}", e),
+                        }
+                    }
+                });
+                tokio::select! {
+                    _ = sync_db => {}
+                    _ = db.shutting_down() => break,
+                }
+                tokio::select! {
+                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {}
+                    _ = db.shutting_down() => break,
                }
-                std::thread::sleep(std::time::Duration::from_secs(10));
            }
        }
    });
···
    }
    tracing::info!("shutting down...");
-    db.shutdown().expect("couldnt shutdown db");
+    cancel_token.cancel();
+    ingest_events.join().expect("failed to join ingest events");
+    sync_task.await.expect("cant join sync task");
+    db.sync(true).expect("cant sync db");
}
fn debug() {
-    let db = Db::new(".fjall_data").expect("couldnt create db");
+    let db = Db::new(".fjall_data", CancellationToken::new()).expect("couldnt create db");
    for nsid in db.get_nsids() {
        let nsid = nsid.deref();
        for hit in db.get_hits(nsid, ..) {
···
}
fn compact() {
-    let from = Arc::new(Db::new(".fjall_data_from").expect("couldnt create db"));
-    let to = Arc::new(Db::new(".fjall_data_to").expect("couldnt create db"));
+    let cancel_token = CancellationToken::new();
+    let from = Arc::new(
+        Db::new(".fjall_data_from", cancel_token.child_token()).expect("couldnt create db"),
+    );
+    let to =
+        Arc::new(Db::new(".fjall_data_to", cancel_token.child_token()).expect("couldnt create db"));
    let nsids = from.get_nsids().collect::<Vec<_>>();
    let mut threads = Vec::with_capacity(nsids.len());
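A note on the ingest thread above: `Receiver::blocking_recv_many` returns how many messages it moved into the buffer, and returns 0 only once the channel is closed and drained, so `read == 0` doubles as a shutdown signal. A reduced sketch with a generic event type (`ingest_loop` and `apply` are illustrative names, not from the diff):

```rust
use tokio::sync::mpsc::Receiver;

// runs on a dedicated OS thread, never inside the async runtime
fn ingest_loop<T>(mut rx: Receiver<T>, mut apply: impl FnMut(Vec<T>)) {
    let mut buffer = Vec::new();
    loop {
        // parks this thread until up to 100 messages are available
        let read = rx.blocking_recv_many(&mut buffer, 100);
        apply(buffer.drain(..).collect());
        if read == 0 {
            break; // all senders dropped and the channel is empty
        }
    }
}
```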
+13 -12
server/src/utils.rs
···
#[derive(Debug)]
pub struct RateTracker<const BUCKET_WINDOW: u64> {
    buckets: Vec<AtomicU64>,
+    last_bucket_time: AtomicU64,
    bucket_duration_nanos: u64,
    window_duration: Duration,
-    last_bucket_time: AtomicU64,
    start_time: u64, // raw time when tracker was created
}
···
        }
    }
+    #[inline(always)]
+    fn elapsed(&self) -> u64 {
+        CLOCK.delta_as_nanos(self.start_time, CLOCK.raw())
+    }
+
    /// record an event
    pub fn observe(&self) {
-        let now = CLOCK.raw();
-        self.maybe_advance_buckets(now);
+        self.maybe_advance_buckets();
-        let bucket_index = self.get_current_bucket_index(now);
+        let bucket_index = self.get_current_bucket_index();
        self.buckets[bucket_index].fetch_add(1, Ordering::Relaxed);
    }
    /// get the current rate in events per second
    pub fn rate(&self) -> f64 {
-        let now = CLOCK.raw();
-        self.maybe_advance_buckets(now);
+        self.maybe_advance_buckets();
        let total_events: u64 = self
            .buckets
···
        total_events as f64 / self.window_duration.as_secs_f64()
    }
-    fn get_current_bucket_index(&self, now: u64) -> usize {
-        let elapsed_nanos = CLOCK.delta_as_nanos(self.start_time, now);
-        let bucket_number = elapsed_nanos / self.bucket_duration_nanos;
+    fn get_current_bucket_index(&self) -> usize {
+        let bucket_number = self.elapsed() / self.bucket_duration_nanos;
        (bucket_number as usize) % self.buckets.len()
    }
-    fn maybe_advance_buckets(&self, now: u64) {
-        let elapsed_nanos = CLOCK.delta_as_nanos(self.start_time, now);
+    fn maybe_advance_buckets(&self) {
        let current_bucket_time =
-            (elapsed_nanos / self.bucket_duration_nanos) * self.bucket_duration_nanos;
+            (self.elapsed() / self.bucket_duration_nanos) * self.bucket_duration_nanos;
        let last_bucket_time = self.last_bucket_time.load(Ordering::Relaxed);
        if current_bucket_time > last_bucket_time {
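For intuition on the bucket arithmetic that `elapsed()` feeds: each bucket covers a fixed slice of elapsed time, and the slice number wraps around the ring of buckets. The constants below are an assumption (100 buckets over a 1 s window, matching the `RateTracker<100>` / `Duration::from_secs(1)` usage in db/mod.rs), not values spelled out in this hunk:

```rust
// assumed: 1 s window split into 100 buckets of 10 ms each
fn bucket_index(elapsed_nanos: u64) -> usize {
    const BUCKET_DURATION_NANOS: u64 = 10_000_000; // 10 ms
    const NUM_BUCKETS: usize = 100;
    ((elapsed_nanos / BUCKET_DURATION_NANOS) as usize) % NUM_BUCKETS
}

// bucket_index(2_345_678_901) == 34: elapsed slot 234 wraps to ring slot 34
```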