Tracks lexicons and how many times they appear on the Jetstream.


Changed files: +945 -23

server/src/db/handle.rs (+9 -12)
···
use crate::{
    db::{EventRecord, NsidHit, block},
    error::AppResult,
-    utils::{
-        CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, WritableByteView,
-        varints_unsigned_encoded,
-    },
+    utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded},
};
pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
-pub type ItemEncoder = block::ItemEncoder<WritableByteView, NsidHit>;
+pub type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
pub type Item = block::Item<NsidHit>;
pub struct Block {
    pub written: usize,
    pub key: ByteView,
-    pub data: ByteView,
+    pub data: Vec<u8>,
}
pub struct LexiconHandle {
···
impl LexiconHandle {
    pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
-        let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9));
+        let opts = PartitionCreateOptions::default()
+            .block_size(1024 * 128)
+            .compression(fjall::CompressionType::Miniz(9));
        Self {
            tree: keyspace.open_partition(nsid, opts).unwrap(),
            nsid: nsid.into(),
···
            )
            .into());
        }
-        let mut writer = ItemEncoder::new(
-            WritableByteView::with_size(ItemEncoder::encoded_len(count)),
-            count,
-        );
+        let mut writer =
+            ItemEncoder::new(Vec::with_capacity(ItemEncoder::encoded_len(count)), count);
        let mut start_timestamp = None;
        let mut end_timestamp = None;
        let mut written = 0_usize;
···
        return Ok(Block {
            written,
            key,
-            data: value.into_inner(),
+            data: value,
        });
    }
    Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into())
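
Note on the buffer change above: Vec<u8> can stand in for WritableByteView as the encoder's write target because the standard library implements std::io::Write for Vec<u8>, and Vec::with_capacity keeps the encoded_len pre-sizing as an allocation hint. A minimal standalone sketch (illustration only, not part of this diff):

use std::io::Write;

fn main() -> std::io::Result<()> {
    // Vec<u8> implements Write; with_capacity only pre-sizes the allocation,
    // so writes past the encoded_len estimate simply grow the buffer.
    let mut buf: Vec<u8> = Vec::with_capacity(64);
    buf.write_all(b"hello")?;
    assert_eq!(buf.len(), 5);
    Ok(())
}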
server/src/db_old/block.rs (+501)
···
+use ordered_varint::Variable;
+use rkyv::{
+    Archive, Deserialize, Serialize,
+    api::high::{HighSerializer, HighValidator},
+    bytecheck::CheckBytes,
+    de::Pool,
+    rancor::{self, Strategy},
+    ser::allocator::ArenaHandle,
+    util::AlignedVec,
+};
+use std::{
+    io::{self, Read, Write},
+    marker::PhantomData,
+};
+
+use crate::error::{AppError, AppResult};
+
+pub struct Item<T> {
+    pub timestamp: u64,
+    data: AlignedVec,
+    phantom: PhantomData<T>,
+}
+
+impl<T: Archive> Item<T> {
+    pub fn access(&self) -> &T::Archived {
+        unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) }
+    }
+}
+
+impl<T> Item<T>
+where
+    T: Archive,
+    T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
+        + Deserialize<T, Strategy<Pool, rancor::Error>>,
+{
+    pub fn deser(&self) -> AppResult<T> {
+        rkyv::from_bytes(&self.data).map_err(AppError::from)
+    }
+}
+
+impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
+    pub fn new(timestamp: u64, data: &T) -> Self {
+        Item {
+            timestamp,
+            data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
+            phantom: PhantomData,
+        }
+    }
+}
+
+pub struct ItemEncoder<W: Write, T> {
+    writer: W,
+    prev_timestamp: u64,
+    prev_delta: i64,
+    _item: PhantomData<T>,
+}
+
+impl<W: Write, T> ItemEncoder<W, T> {
+    pub fn new(writer: W) -> Self {
+        ItemEncoder {
+            writer,
+            prev_timestamp: 0,
+            prev_delta: 0,
+            _item: PhantomData,
+        }
+    }
+
+    pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> {
+        if self.prev_timestamp == 0 {
+            // self.writer.write_varint(item.timestamp)?;
+            self.prev_timestamp = item.timestamp;
+            self.write_data(&item.data)?;
+            return Ok(());
+        }
+
+        let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
+
+        self.writer.write_varint(delta - self.prev_delta)?;
+        self.prev_timestamp = item.timestamp;
+        self.prev_delta = delta;
+
+        self.write_data(&item.data)?;
+
+        Ok(())
+    }
+
+    fn write_data(&mut self, data: &[u8]) -> AppResult<()> {
+        self.writer.write_varint(data.len())?;
+        self.writer.write_all(data)?;
+        Ok(())
+    }
+
+    pub fn finish(mut self) -> AppResult<W> {
+        self.writer.flush()?;
+        Ok(self.writer)
+    }
+}
+
+pub struct ItemDecoder<R, T> {
+    reader: R,
+    current_timestamp: u64,
+    current_delta: i64,
+    first_item: bool,
+    _item: PhantomData<T>,
+}
+
+impl<R: Read, T: Archive> ItemDecoder<R, T> {
+    pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> {
+        Ok(ItemDecoder {
+            reader,
+            current_timestamp: start_timestamp,
+            current_delta: 0,
+            first_item: true,
+            _item: PhantomData,
+        })
+    }
+
+    pub fn decode(&mut self) -> AppResult<Option<Item<T>>> {
+        if self.first_item {
+            // read the first timestamp
+            // let timestamp = match self.reader.read_varint::<u64>() {
+            //     Ok(timestamp) => timestamp,
+            //     Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+            //     Err(e) => return Err(e.into()),
+            // };
+            // self.current_timestamp = timestamp;
+
+            let Some(data_raw) = self.read_item()? else {
+                return Ok(None);
+            };
+            self.first_item = false;
+            return Ok(Some(Item {
+                timestamp: self.current_timestamp,
+                data: data_raw,
+                phantom: PhantomData,
+            }));
+        }
+
+        let Some(_delta) = self.read_timestamp()? else {
+            return Ok(None);
+        };
+
+        // read data
+        let data_raw = match self.read_item()? {
+            Some(data_raw) => data_raw,
+            None => {
+                return Err(io::Error::new(
+                    io::ErrorKind::UnexpectedEof,
+                    "expected data after delta",
+                )
+                .into());
+            }
+        };
+
+        Ok(Some(Item {
+            timestamp: self.current_timestamp,
+            data: data_raw,
+            phantom: PhantomData,
+        }))
+    }
+
+    // [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1]
+    fn read_timestamp(&mut self) -> AppResult<Option<u64>> {
+        let delta = match self.reader.read_varint::<i64>() {
+            Ok(delta) => delta,
+            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+            Err(e) => return Err(e.into()),
+        };
+        self.current_delta += delta;
+        self.current_timestamp =
+            (self.current_timestamp as i128 + self.current_delta as i128) as u64;
+        Ok(Some(self.current_timestamp))
+    }
+
+    fn read_item(&mut self) -> AppResult<Option<AlignedVec>> {
+        let data_len = match self.reader.read_varint::<usize>() {
+            Ok(data_len) => data_len,
+            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+            Err(e) => return Err(e.into()),
+        };
+        let mut data_raw = AlignedVec::with_capacity(data_len);
+        for _ in 0..data_len {
+            data_raw.push(0);
+        }
+        self.reader.read_exact(data_raw.as_mut_slice())?;
+        Ok(Some(data_raw))
+    }
+}
+
+impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
+    type Item = AppResult<Item<T>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.decode().transpose()
+    }
+}
+
+pub trait WriteVariableExt: Write {
+    fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
+        value.encode_variable(self)
+    }
+}
+impl<W: Write> WriteVariableExt for W {}
+
+pub trait ReadVariableExt: Read {
+    fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
+        T::decode_variable(self)
+    }
+}
+impl<R: Read> ReadVariableExt for R {}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use rkyv::{Archive, Deserialize, Serialize};
+    use std::io::Cursor;
+
+    #[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
+    #[rkyv(compare(PartialEq))]
+    struct TestData {
+        id: u32,
+        value: String,
+    }
+
+    #[test]
+    fn test_encoder_decoder_single_item() {
+        let data = TestData {
+            id: 123,
+            value: "test".to_string(),
+        };
+
+        let item = Item::new(1000, &data);
+
+        // encode
+        let mut buffer = Vec::new();
+        let mut encoder = ItemEncoder::new(&mut buffer);
+        encoder.encode(&item).unwrap();
+        encoder.finish().unwrap();
+
+        // decode
+        let cursor = Cursor::new(buffer);
+        let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+        let decoded_item = decoder.decode().unwrap().unwrap();
+        assert_eq!(decoded_item.timestamp, 1000);
+
+        let decoded_data = decoded_item.access();
+        assert_eq!(decoded_data.id, 123);
+        assert_eq!(decoded_data.value.as_str(), "test");
+    }
+
+    #[test]
+    fn test_encoder_decoder_multiple_items() {
+        let items = vec![
+            Item::new(
+                1000,
+                &TestData {
+                    id: 1,
+                    value: "first".to_string(),
+                },
+            ),
+            Item::new(
+                1010,
+                &TestData {
+                    id: 2,
+                    value: "second".to_string(),
+                },
+            ),
+            Item::new(
+                1015,
+                &TestData {
+                    id: 3,
+                    value: "third".to_string(),
+                },
+            ),
+            Item::new(
+                1025,
+                &TestData {
+                    id: 4,
+                    value: "fourth".to_string(),
+                },
+            ),
+        ];
+
+        // encode
+        let mut buffer = Vec::new();
+        let mut encoder = ItemEncoder::new(&mut buffer);
+
+        for item in &items {
+            encoder.encode(item).unwrap();
+        }
+        encoder.finish().unwrap();
+
+        // decode
+        let cursor = Cursor::new(buffer);
+        let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+        let mut decoded_items = Vec::new();
+        while let Some(item) = decoder.decode().unwrap() {
+            decoded_items.push(item);
+        }
+
+        assert_eq!(decoded_items.len(), 4);
+
+        for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+            assert_eq!(original.timestamp, decoded.timestamp);
+            assert_eq!(original.access().id, decoded.access().id);
+            assert_eq!(
+                original.access().value.as_str(),
+                decoded.access().value.as_str()
+            );
+        }
+    }
+
+    #[test]
+    fn test_encoder_decoder_with_iterator() {
+        let items = vec![
+            Item::new(
+                2000,
+                &TestData {
+                    id: 10,
+                    value: "a".to_string(),
+                },
+            ),
+            Item::new(
+                2005,
+                &TestData {
+                    id: 20,
+                    value: "b".to_string(),
+                },
+            ),
+            Item::new(
+                2012,
+                &TestData {
+                    id: 30,
+                    value: "c".to_string(),
+                },
+            ),
+        ];
+
+        // encode
+        let mut buffer = Vec::new();
+        let mut encoder = ItemEncoder::new(&mut buffer);
+
+        for item in &items {
+            encoder.encode(item).unwrap();
+        }
+        encoder.finish().unwrap();
+
+        // decode
+        let cursor = Cursor::new(buffer);
+        let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap();
+
+        let decoded_items: Result<Vec<_>, _> = decoder.collect();
+        let decoded_items = decoded_items.unwrap();
+
+        assert_eq!(decoded_items.len(), 3);
+        assert_eq!(decoded_items[0].timestamp, 2000);
+        assert_eq!(decoded_items[1].timestamp, 2005);
+        assert_eq!(decoded_items[2].timestamp, 2012);
+
+        assert_eq!(decoded_items[0].access().id, 10);
+        assert_eq!(decoded_items[1].access().id, 20);
+        assert_eq!(decoded_items[2].access().id, 30);
+    }
+
+    #[test]
+    fn test_delta_compression() {
+        let items = vec![
+            Item::new(
+                1000,
+                &TestData {
+                    id: 1,
+                    value: "a".to_string(),
+                },
+            ),
+            Item::new(
+                1010,
+                &TestData {
+                    id: 2,
+                    value: "b".to_string(),
+                },
+            ), // delta = 10
+            Item::new(
+                1020,
+                &TestData {
+                    id: 3,
+                    value: "c".to_string(),
+                },
+            ), // delta = 10, delta-of-delta = 0
+            Item::new(
+                1025,
+                &TestData {
+                    id: 4,
+                    value: "d".to_string(),
+                },
+            ), // delta = 5, delta-of-delta = -5
+        ];
+
+        let mut buffer = Vec::new();
+        let mut encoder = ItemEncoder::new(&mut buffer);
+
+        for item in &items {
+            encoder.encode(item).unwrap();
+        }
+        encoder.finish().unwrap();
+
+        // decode and verify
+        let cursor = Cursor::new(buffer);
+        let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+        let decoded_items: Result<Vec<_>, _> = decoder.collect();
+        let decoded_items = decoded_items.unwrap();
+
+        for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+            assert_eq!(original.timestamp, decoded.timestamp);
+            assert_eq!(original.access().id, decoded.access().id);
+        }
+    }
+
+    #[test]
+    fn test_empty_decode() {
+        let buffer = Vec::new();
+        let cursor = Cursor::new(buffer);
+        let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+        let result = decoder.decode().unwrap();
+        assert!(result.is_none());
+    }
+
+    #[test]
+    fn test_backwards_timestamp() {
+        let items = vec![
+            Item::new(
+                1000,
+                &TestData {
+                    id: 1,
+                    value: "first".to_string(),
+                },
+            ),
+            Item::new(
+                900,
+                &TestData {
+                    id: 2,
+                    value: "second".to_string(),
+                },
+            ),
+        ];
+
+        let mut buffer = Vec::new();
+        let mut encoder = ItemEncoder::new(&mut buffer);
+
+        for item in &items {
+            encoder.encode(item).unwrap();
+        }
+        encoder.finish().unwrap();
+
+        let cursor = Cursor::new(buffer);
+        let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+        let decoded_items: Result<Vec<_>, _> = decoder.collect();
+        let decoded_items = decoded_items.unwrap();
+
+        assert_eq!(decoded_items.len(), 2);
+        assert_eq!(decoded_items[0].timestamp, 1000);
+        assert_eq!(decoded_items[1].timestamp, 900);
+    }
+
+    #[test]
+    fn test_different_data_sizes() {
+        let small_data = TestData {
+            id: 1,
+            value: "x".to_string(),
+        };
+        let large_data = TestData {
+            id: 2,
+            value: "a".repeat(1000),
+        };
+
+        let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)];
+
+        let mut buffer = Vec::new();
+        let mut encoder = ItemEncoder::new(&mut buffer);
+
+        for item in &items {
+            encoder.encode(item).unwrap();
+        }
+        encoder.finish().unwrap();
+
+        let cursor = Cursor::new(buffer);
+        let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+        let decoded_items: Result<Vec<_>, _> = decoder.collect();
+        let decoded_items = decoded_items.unwrap();
+
+        assert_eq!(decoded_items.len(), 2);
+        assert_eq!(decoded_items[0].access().value.as_str(), "x");
+        assert_eq!(decoded_items[1].access().value.len(), 1000);
+        assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000));
+    }
+}
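
Aside on the format implemented above: timestamps are delta-of-delta coded. The first timestamp travels out of band (callers pass it to ItemDecoder::new), and every later item stores delta - prev_delta as a signed varint, per the // [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1] comment. A standalone sketch of just the timestamp arithmetic (illustration only, mirroring ItemEncoder::encode and ItemDecoder::read_timestamp):

fn encode_timestamps(ts: &[u64]) -> Vec<i64> {
    // Emit delta-of-deltas, as ItemEncoder::encode does.
    let mut prev_delta = 0i64;
    ts.windows(2)
        .map(|w| {
            let delta = (w[1] as i128 - w[0] as i128) as i64;
            let dod = delta - prev_delta;
            prev_delta = delta;
            dod
        })
        .collect()
}

fn decode_timestamps(start: u64, dods: &[i64]) -> Vec<u64> {
    // Rebuild timestamps, as ItemDecoder::read_timestamp does.
    let mut ts = vec![start];
    let mut delta = 0i64;
    for dod in dods {
        delta += dod;
        let next = (*ts.last().unwrap() as i128 + delta as i128) as u64;
        ts.push(next);
    }
    ts
}

fn main() {
    // [1000, 1010, 1020, 1025] -> deltas [10, 10, 5] -> stored [10, 0, -5]
    let ts = [1000u64, 1010, 1020, 1025];
    let dods = encode_timestamps(&ts);
    assert_eq!(dods, vec![10, 0, -5]);
    assert_eq!(decode_timestamps(1000, &dods), ts.to_vec());
}

Steady traffic has near-constant deltas, so most stored values sit at or near zero, which the varint encoding then shrinks to single bytes.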
server/src/db_old/mod.rs (+424)
···
+use std::{
+    io::Cursor,
+    ops::{Bound, Deref, RangeBounds},
+    path::Path,
+    sync::{
+        Arc,
+        atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
+    },
+    time::{Duration, Instant},
+};
+
+use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
+use ordered_varint::Variable;
+use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
+use smol_str::SmolStr;
+use tokio::sync::broadcast;
+
+use crate::{
+    db_old::block::{ReadVariableExt, WriteVariableExt},
+    error::{AppError, AppResult},
+    jetstream::JetstreamEvent,
+    utils::{DefaultRateTracker, get_time},
+};
+
+mod block;
+
+#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+#[rkyv(compare(PartialEq), derive(Debug))]
+pub struct NsidCounts {
+    pub count: u128,
+    pub deleted_count: u128,
+    pub last_seen: u64,
+}
+
+#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+#[rkyv(compare(PartialEq), derive(Debug))]
+pub struct NsidHit {
+    pub deleted: bool,
+}
+
+#[derive(Clone)]
+pub struct EventRecord {
+    pub nsid: SmolStr,
+    pub timestamp: u64, // seconds
+    pub deleted: bool,
+}
+
+impl EventRecord {
+    pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
+        match event {
+            JetstreamEvent::Commit {
+                time_us, commit, ..
+            } => Some(Self {
+                nsid: commit.collection.into(),
+                timestamp: time_us / 1_000_000,
+                deleted: false,
+            }),
+            JetstreamEvent::Delete {
+                time_us, commit, ..
+            } => Some(Self {
+                nsid: commit.collection.into(),
+                timestamp: time_us / 1_000_000,
+                deleted: true,
+            }),
+            _ => None,
+        }
+    }
+}
+
+type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
+type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
+type Item = block::Item<NsidHit>;
+
+pub struct LexiconHandle {
+    tree: Partition,
+    buf: Arc<scc::Queue<EventRecord>>,
+    buf_len: AtomicUsize,
+    last_insert: AtomicU64,
+    eps: DefaultRateTracker,
+    block_size: AtomicUsize,
+}
+
+impl LexiconHandle {
+    fn new(keyspace: &Keyspace, nsid: &str) -> Self {
+        let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9));
+        Self {
+            tree: keyspace.open_partition(nsid, opts).unwrap(),
+            buf: Default::default(),
+            buf_len: AtomicUsize::new(0),
+            last_insert: AtomicU64::new(0),
+            eps: DefaultRateTracker::new(Duration::from_secs(5)),
+            block_size: AtomicUsize::new(1000),
+        }
+    }
+
+    fn item_count(&self) -> usize {
+        self.buf_len.load(AtomicOrdering::Acquire)
+    }
+
+    fn last_insert(&self) -> u64 {
+        self.last_insert.load(AtomicOrdering::Acquire)
+    }
+
+    fn suggested_block_size(&self) -> usize {
+        self.block_size.load(AtomicOrdering::Relaxed)
+    }
+
+    fn insert(&self, event: EventRecord) {
+        self.buf.push(event);
+        self.buf_len.fetch_add(1, AtomicOrdering::Release);
+        self.last_insert
+            .store(get_time().as_millis() as u64, AtomicOrdering::Release);
+        self.eps.observe(1);
+        let rate = self.eps.rate() as usize;
+        if rate != 0 {
+            self.block_size.store(rate * 60, AtomicOrdering::Relaxed);
+        }
+    }
+
+    fn sync(&self, max_block_size: usize) -> AppResult<usize> {
+        let mut writer = ItemEncoder::new(Vec::with_capacity(
+            size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(),
+        ));
+        let mut start_timestamp = None;
+        let mut end_timestamp = None;
+        let mut written = 0_usize;
+        while let Some(event) = self.buf.pop() {
+            let item = Item::new(
+                event.timestamp,
+                &NsidHit {
+                    deleted: event.deleted,
+                },
+            );
+            writer.encode(&item)?;
+            if start_timestamp.is_none() {
+                start_timestamp = Some(event.timestamp);
+            }
+            end_timestamp = Some(event.timestamp);
+            if written >= max_block_size {
+                break;
+            }
+            written += 1;
+        }
+        if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
+            self.buf_len.store(0, AtomicOrdering::Release);
+            let value = writer.finish()?;
+            let mut key = Vec::with_capacity(size_of::<u64>() * 2);
+            key.write_varint(start_timestamp)?;
+            key.write_varint(end_timestamp)?;
+            self.tree.insert(key, value)?;
+        }
+        Ok(written)
+    }
+}
+
+type BoxedIter<T> = Box<dyn Iterator<Item = T>>;
+
+// counts is nsid -> NsidCounts
+// hits is tree per nsid: varint start time + varint end time -> block of hits
+pub struct Db {
+    inner: Keyspace,
+    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
+    counts: Partition,
+    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
+    eps: DefaultRateTracker,
+    min_block_size: usize,
+    max_block_size: usize,
+    max_last_activity: Duration,
+}
+
+impl Db {
+    pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
+        tracing::info!("opening db...");
+        let ks = Config::new(path)
+            .cache_size(8 * 1024 * 1024) // from talna
+            .open()?;
+        Ok(Self {
+            hits: Default::default(),
+            counts: ks.open_partition(
+                "_counts",
+                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
+            )?,
+            inner: ks,
+            event_broadcaster: broadcast::channel(1000).0,
+            eps: DefaultRateTracker::new(Duration::from_secs(1)),
+            min_block_size: 512,
+            max_block_size: 100_000,
+            max_last_activity: Duration::from_secs(10),
+        })
+    }
+
+    pub fn sync(&self, all: bool) -> AppResult<()> {
+        let _guard = scc::ebr::Guard::new();
+        for (nsid, tree) in self.hits.iter(&_guard) {
+            let count = tree.item_count();
+            let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size());
+            let is_too_old = (get_time().as_millis() as u64 - tree.last_insert())
+                > self.max_last_activity.as_millis() as u64;
+            if count > 0 && (all || is_max_block_size || is_too_old) {
+                loop {
+                    let synced = tree.sync(self.max_block_size)?;
+                    if synced == 0 {
+                        break;
+                    }
+                    tracing::info!("synced {synced} of {nsid} to db");
+                }
+            }
+        }
+        Ok(())
+    }
+
+    #[inline(always)]
+    pub fn eps(&self) -> usize {
+        self.eps.rate() as usize
+    }
+
+    #[inline(always)]
+    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
+        self.event_broadcaster.subscribe()
+    }
+
+    #[inline(always)]
+    fn maybe_run_in_nsid_tree<T>(
+        &self,
+        nsid: &str,
+        f: impl FnOnce(&LexiconHandle) -> T,
+    ) -> Option<T> {
+        let _guard = scc::ebr::Guard::new();
+        let handle = match self.hits.peek(nsid, &_guard) {
+            Some(handle) => handle.clone(),
+            None => {
+                if self.inner.partition_exists(nsid) {
+                    let handle = Arc::new(LexiconHandle::new(&self.inner, nsid));
+                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
+                    handle
+                } else {
+                    return None;
+                }
+            }
+        };
+        Some(f(&handle))
+    }
+
+    #[inline(always)]
+    fn run_in_nsid_tree<T>(
+        &self,
+        nsid: SmolStr,
+        f: impl FnOnce(&LexiconHandle) -> AppResult<T>,
+    ) -> AppResult<T> {
+        f(self
+            .hits
+            .entry(nsid.clone())
+            .or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid)))
+            .get())
+    }
+
+    pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
+        let EventRecord {
+            nsid,
+            timestamp,
+            deleted,
+        } = e.clone();
+
+        // insert event
+        self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?;
+        // increment count
+        let mut counts = self.get_count(&nsid)?;
+        counts.last_seen = timestamp;
+        if deleted {
+            counts.deleted_count += 1;
+        } else {
+            counts.count += 1;
+        }
+        self.insert_count(&nsid, counts.clone())?;
+        if self.event_broadcaster.receiver_count() > 0 {
+            let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
+        }
+        self.eps.observe(1);
+        Ok(())
+    }
+
+    #[inline(always)]
+    fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
+        self.counts
+            .insert(
+                nsid,
+                unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
+            )
+            .map_err(AppError::from)
+    }
+
+    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
+        let Some(raw) = self.counts.get(nsid)? else {
+            return Ok(NsidCounts::default());
+        };
+        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
+    }
+
+    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
+        self.counts.iter().map(|res| {
+            res.map_err(AppError::from).map(|(key, val)| {
+                (
+                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
+                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
+                )
+            })
+        })
+    }
+
+    pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> {
+        self.inner
+            .list_partitions()
+            .into_iter()
+            .filter(|k| k.deref() != "_counts")
+    }
+
+    pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> {
+        self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> {
+            Box::new(
+                handle
+                    .tree
+                    .iter()
+                    .rev()
+                    .map(|res| res.map_err(AppError::from)),
+            )
+        })
+        .unwrap_or_else(|| Box::new(std::iter::empty()))
+    }
+
+    pub fn get_hits(
+        &self,
+        nsid: &str,
+        range: impl RangeBounds<u64> + std::fmt::Debug,
+    ) -> BoxedIter<AppResult<Item>> {
+        let start = range
+            .start_bound()
+            .cloned()
+            .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
+        let end = range
+            .end_bound()
+            .cloned()
+            .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
+        let limit = match range.end_bound().cloned() {
+            Bound::Included(end) => end,
+            Bound::Excluded(end) => end.saturating_sub(1),
+            Bound::Unbounded => u64::MAX,
+        };
+
+        self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> {
+            let map_block = move |(key, val)| {
+                let mut key_reader = Cursor::new(key);
+                let start_timestamp = key_reader.read_varint::<u64>()?;
+                let items =
+                    ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| {
+                        item.as_ref().map_or(true, |item| item.timestamp <= limit)
+                    });
+                Ok(items)
+            };
+
+            Box::new(
+                handle
+                    .tree
+                    .range(TimestampRange { start, end })
+                    .map(move |res| res.map_err(AppError::from).and_then(map_block))
+                    .flatten()
+                    .flatten(),
+            )
+        })
+        .unwrap_or_else(|| Box::new(std::iter::empty()))
+    }
+
+    pub fn tracking_since(&self) -> AppResult<u64> {
+        // HACK: we should actually store when we started tracking but im lazy
+        // should be accurate enough
+        self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| {
+            let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else {
+                return Ok(0);
+            };
+            let mut timestamp_reader = Cursor::new(timestamps_raw);
+            timestamp_reader
+                .read_varint::<u64>()
+                .map_err(AppError::from)
+        })
+        .unwrap_or(Ok(0))
+    }
+}
+
+type TimestampRepr = Vec<u8>;
+
+struct TimestampRange {
+    start: Bound<TimestampRepr>,
+    end: Bound<TimestampRepr>,
+}
+
+impl RangeBounds<TimestampRepr> for TimestampRange {
+    #[inline(always)]
+    fn start_bound(&self) -> Bound<&TimestampRepr> {
+        self.start.as_ref()
+    }
+
+    #[inline(always)]
+    fn end_bound(&self) -> Bound<&TimestampRepr> {
+        self.end.as_ref()
+    }
+}
+
+type TimestampReprOld = [u8; 8];
+
+struct TimestampRangeOld {
+    start: Bound<TimestampReprOld>,
+    end: Bound<TimestampReprOld>,
+}
+
+impl RangeBounds<TimestampReprOld> for TimestampRangeOld {
+    #[inline(always)]
+    fn start_bound(&self) -> Bound<&TimestampReprOld> {
+        self.start.as_ref()
+    }
+
+    #[inline(always)]
+    fn end_bound(&self) -> Bound<&TimestampReprOld> {
+        self.end.as_ref()
+    }
+}
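
One detail worth noting in the file above: block keys are ordered_varint encodings of the (start, end) timestamps, and get_hits range-scans those raw bytes through TimestampRange. That only works because ordered_varint's encoding is order-preserving, i.e. byte-wise lexicographic order matches numeric order (my understanding of the crate; quick standalone check below, not part of this diff):

use ordered_varint::Variable;

fn main() {
    // Encodings compare like the integers they encode, so the fjall
    // partition's lexicographic key order doubles as timestamp order.
    let a = 1_000u64.to_variable_vec().unwrap();
    let b = 70_000u64.to_variable_vec().unwrap();
    assert!(a < b);
}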
server/src/main.rs (+11 -11)
···
mod api;
mod db;
+mod db_old;
mod error;
mod jetstream;
mod utils;
···
fn migrate() {
    let cancel_token = CancellationToken::new();
-    let from = Arc::new(
-        Db::new(
-            DbConfig::default().path(".fjall_data_from"),
-            cancel_token.child_token(),
-        )
-        .expect("couldnt create db"),
-    );
+
+    let from = Arc::new(db_old::Db::new(".fjall_data_from").expect("couldnt create db"));
+
    let to = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_to").ks(|c| {
-                c.max_journaling_size(1024 * 1024 * 1024 * 8)
+                c.max_journaling_size(u64::MAX)
                    .max_write_buffer_size(u64::MAX)
-                    .compaction_workers(rayon::current_num_threads() * 2)
-                    .flush_workers(rayon::current_num_threads() * 2)
+                    .compaction_workers(rayon::current_num_threads() * 4)
+                    .flush_workers(rayon::current_num_threads() * 4)
            }),
            cancel_token.child_token(),
        )
···
        move || {
            loop {
                std::thread::sleep(Duration::from_secs(3));
-                tracing::info!("{} rps", to.eps());
+                let eps = to.eps();
+                if eps > 0 {
+                    tracing::info!("{} rps", eps);
+                }
            }
        }
    });