tracks lexicons and how many times they appeared on the jetstream

fix(server): encode_block takes the correct number of items, improve rate tracker usage

ptr.pet d0e611bc 7503870a

verified
Changed files
+61 -57
server
+26 -18
server/src/db/handle.rs
···
         self.eps.rate() as usize * 60
     }

-    pub fn queue(&self, event: EventRecord) {
-        self.buf.lock().push(event);
+    pub fn queue(&self, events: impl IntoIterator<Item = EventRecord>) {
+        let mut count = 0;
+        self.buf.lock().extend(events.into_iter().inspect(|_| {
+            count += 1;
+        }));
         self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
-        self.eps.observe();
+        self.eps.observe(count);
     }

     pub fn compact(
···
         items: impl IntoIterator<Item = Item>,
         count: usize,
     ) -> AppResult<Block> {
+        if count == 0 {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "no items requested",
+            )
+            .into());
+        }
         let mut writer = ItemEncoder::new(
             WritableByteView::with_size(ItemEncoder::encoded_len(count)),
             count,
···
         let mut start_timestamp = None;
         let mut end_timestamp = None;
         let mut written = 0_usize;
-        for item in items {
+        for item in items.into_iter().take(count) {
             writer.encode(&item)?;
             if start_timestamp.is_none() {
                 start_timestamp = Some(item.timestamp);
             }
             end_timestamp = Some(item.timestamp);
-            if written >= count {
-                break;
-            }
             written += 1;
         }
+        if written != count {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "unexpected number of items, invalid data?",
+            )
+            .into());
+        }
         if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
             let value = writer.finish()?;
             let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
···
     }

     pub fn encode_block(&self, item_count: usize) -> AppResult<Block> {
-        let block = Self::encode_block_from_items(
-            self.buf.lock().drain(..).map(|event| {
+        let mut buf = self.buf.lock();
+        let end = item_count.min(buf.len());
+        Self::encode_block_from_items(
+            buf.drain(..end).map(|event| {
                 Item::new(
                     event.timestamp,
                     &NsidHit {
···
                 )
             }),
             item_count,
-        )?;
-        if block.written != item_count {
-            return Err(std::io::Error::new(
-                std::io::ErrorKind::InvalidData,
-                "unexpected number of items, invalid data?",
-            )
-            .into());
-        }
-        Ok(block)
+        )
     }
 }
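note: the reworked encode_block_from_items enforces an exact-count contract: reject count == 0 up front, cap the source with take(count), and error if the source ran short. a minimal sketch of that pattern over plain integers (encode_exact is a hypothetical stand-in, not this crate's ItemEncoder):

use std::io;

fn encode_exact(items: impl IntoIterator<Item = u64>, count: usize) -> io::Result<Vec<u64>> {
    if count == 0 {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "no items requested"));
    }
    let mut out = Vec::with_capacity(count);
    // take(count) caps a too-long source so it can't overrun the buffer
    for item in items.into_iter().take(count) {
        out.push(item);
    }
    // a too-short source is caught here instead of yielding a truncated block
    if out.len() != count {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected number of items"));
    }
    Ok(out)
}

fn main() -> io::Result<()> {
    assert_eq!(encode_exact(1..=3, 3)?, vec![1, 2, 3]);
    assert_eq!(encode_exact(1..=10, 3)?, vec![1, 2, 3]); // long source: capped
    assert!(encode_exact(1..=2, 3).is_err()); // short source: InvalidData
    Ok(())
}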
+8 -18
server/src/db/mod.rs
···
         // if we disconnect for a long time, we want to sync all of what we
         // have to avoid having many small blocks (even if we run compaction
         // later, it reduces work until we run compaction)
-        let block_size = is_too_old
+        let block_size = (is_too_old || all)
             .then_some(self.max_block_size)
             .unwrap_or_else(|| {
                 self.max_block_size
···
     pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
         for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
             let mut counts = self.get_count(&key)?;
-            let handle = self.ensure_handle(&key);
-            for event in chunk {
-                let EventRecord {
-                    timestamp, deleted, ..
-                } = event.clone();
-
-                handle.queue(event);
-
+            let mut count = 0;
+            self.ensure_handle(&key).queue(chunk.inspect(|e| {
                 // increment count
-                counts.last_seen = timestamp;
-                if deleted {
+                counts.last_seen = e.timestamp;
+                if e.deleted {
                     counts.deleted_count += 1;
                 } else {
                     counts.count += 1;
                 }
-
-                self.eps.observe();
-            }
+                count += 1;
+            }));
+            self.eps.observe(count);
             self.insert_count(&key, &counts)?;
             if self.event_broadcaster.receiver_count() > 0 {
                 let _ = self.event_broadcaster.send((key, counts));
             }
         }
         Ok(())
-    }
-
-    pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
-        self.ingest_events(std::iter::once(e))
     }

     #[inline(always)]
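note: itertools' chunk_by only groups consecutive equal keys, so ingest_events does one counts read/write per run of same-nsid events; interleaved nsids just yield more groups, never merged ones. a small standalone demonstration of that grouping (the nsids here are made up):

use itertools::Itertools;

fn main() {
    let nsids = vec!["app.bsky.feed.post", "app.bsky.feed.post", "app.bsky.feed.like"];
    let groups: Vec<(&str, usize)> = nsids
        .into_iter()
        .chunk_by(|nsid| *nsid)
        .into_iter()
        .map(|(key, group)| (key, group.count()))
        .collect();
    // consecutive equal keys collapse into one group each
    assert_eq!(groups, vec![("app.bsky.feed.post", 2), ("app.bsky.feed.like", 1)]);
}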
+22 -10
server/src/main.rs
···
 use std::{ops::Deref, time::Duration};

+use itertools::Itertools;
 use rclite::Arc;
 use smol_str::ToSmolStr;
 use tokio_util::sync::CancellationToken;
···
         Arc::new(Db::new(".fjall_data_to", cancel_token.child_token()).expect("couldnt create db"));
     let nsids = from.get_nsids().collect::<Vec<_>>();
+    let eps_thread = std::thread::spawn({
+        let to = to.clone();
+        move || {
+            loop {
+                std::thread::sleep(Duration::from_secs(3));
+                tracing::info!("{} rps", to.eps());
+            }
+        }
+    });
     let mut threads = Vec::with_capacity(nsids.len());
     let start = CLOCK.now();
     for nsid in nsids {
         let from = from.clone();
         let to = to.clone();
         threads.push(std::thread::spawn(move || {
-            tracing::info!("migrating {} ...", nsid.deref());
+            tracing::info!("{}: migrating...", nsid.deref());
             let mut count = 0_u64;
-            for hit in from.get_hits(&nsid, ..) {
-                let hit = hit.expect("cant read event");
-                let data = hit.access();
-                to.record_event(EventRecord {
-                    nsid: nsid.to_smolstr(),
-                    timestamp: hit.timestamp,
-                    deleted: data.deleted,
-                })
+            for hits in from.get_hits(&nsid, ..).chunks(100000).into_iter() {
+                to.ingest_events(hits.map(|hit| {
+                    count += 1;
+                    let hit = hit.expect("cant decode hit");
+                    EventRecord {
+                        nsid: nsid.to_smolstr(),
+                        timestamp: hit.timestamp,
+                        deleted: hit.access().deleted,
+                    }
+                }))
                 .expect("cant record event");
-                count += 1;
             }
+            tracing::info!("{}: ingested {} events...", nsid.deref(), count);
             count
         }));
     }
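note: the migration loop now batches with itertools' chunks, and the map closure is lazy, so count only advances as ingest_events drains each batch. a small sketch of the same pattern with integers and a batch size of 4:

use itertools::Itertools;

fn main() {
    let mut count = 0_u64;
    for batch in (0..10_u64).chunks(4).into_iter() {
        // the closure runs lazily, per item, while the batch is consumed
        let items: Vec<u64> = batch
            .map(|n| {
                count += 1;
                n
            })
            .collect();
        println!("batch of {}: {:?}", items.len(), items);
    }
    assert_eq!(count, 10); // 4 + 4 + 2
}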
+5 -11
server/src/utils.rs
···
     }

     /// record an event
-    pub fn observe(&self) {
+    pub fn observe(&self, count: u64) {
         self.maybe_advance_buckets();
         let bucket_index = self.get_current_bucket_index();
-        self.buckets[bucket_index].fetch_add(1, Ordering::Relaxed);
+        self.buckets[bucket_index].fetch_add(count, Ordering::Relaxed);
     }

     /// get the current rate in events per second
···
         let tracker = DefaultRateTracker::new(Duration::from_secs(2));
         // record some events
-        tracker.observe();
-        tracker.observe();
-        tracker.observe();
+        tracker.observe(3);
         let rate = tracker.rate();
         assert_eq!(rate, 1.5); // 3 events over 2 seconds = 1.5 events/sec
···
         let tracker = DefaultRateTracker::new(Duration::from_secs(1));
         // record a lot of events
-        for _ in 0..1000 {
-            tracker.observe();
-        }
+        tracker.observe(1000);
         let rate = tracker.rate();
         assert_eq!(rate, 1000.0); // 1000 events in 1 second
···
         for _ in 0..4 {
             let tracker_clone = Arc::clone(&tracker);
             let handle = thread::spawn(move || {
-                for _ in 0..10 {
-                    tracker_clone.observe();
-                }
+                tracker_clone.observe(10);
             });
             handles.push(handle);
         }
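note: observe(count) folds a whole batch into a single atomic fetch_add instead of one per event, which is what lets queue and ingest_events report batch totals cheaply. a minimal single-window sketch of the idea (one running total, assuming away the real tracker's rotating buckets):

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

struct Tracker {
    total: AtomicU64,
    window: Duration,
}

impl Tracker {
    fn new(window: Duration) -> Self {
        Self { total: AtomicU64::new(0), window }
    }

    /// record `count` events with a single atomic add
    fn observe(&self, count: u64) {
        self.total.fetch_add(count, Ordering::Relaxed);
    }

    /// events per second over the window
    fn rate(&self) -> f64 {
        self.total.load(Ordering::Relaxed) as f64 / self.window.as_secs_f64()
    }
}

fn main() {
    let tracker = Tracker::new(Duration::from_secs(2));
    tracker.observe(3); // one call replaces three observe() calls
    assert_eq!(tracker.rate(), 1.5); // same arithmetic as the test above
}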