Tracks lexicons and how many times they have appeared on the Bluesky Jetstream.

Compare changes


Changed files: +978 -68
client/src/lib/components/StatsCard.svelte (+2 -1)
···
<script lang="ts">
import { formatNumber } from "$lib/format";
+ import NumberFlow from "@number-flow/svelte";
const colorClasses = {
green: {
···
{title}
</h3>
<p class="text-xl md:text-2xl font-bold {colors.valueText}">
- {formatNumber(value)}
+ <NumberFlow {value} />
</p>
</div>
client/src/lib/format.ts (+1 -5)
···
return num.toLocaleString();
};
- const isValidDate = (d: Date) => d instanceof Date && !isNaN(d.getTime());
export const formatTimestamp = (timestamp: number): string => {
- const date = new Date(timestamp * 1000);
- return isValidDate(date)
- ? date.toLocaleString()
- : new Date(timestamp / 1000).toLocaleString();
+ return new Date(timestamp / 1000).toLocaleString();
};
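
Note: the formatter now always divides the incoming value by 1000 before constructing a `Date`, which suggests the API returns raw Jetstream timestamps in microseconds (a JS `Date` expects milliseconds); the old seconds-based path and its `isValidDate` fallback are dropped. A trivial Rust sketch of that unit assumption, with hypothetical values:

```rust
// Sketch only: assumes timestamps reach the client as Jetstream microseconds.
fn micros_to_js_millis(time_us: u64) -> u64 {
    // JS `new Date(x)` interprets x as milliseconds, hence the divide-by-1000.
    time_us / 1_000
}

fn main() {
    let time_us: u64 = 1_700_000_000_000_000; // hypothetical event time in µs
    assert_eq!(micros_to_js_millis(time_us), 1_700_000_000_000);
}
```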
server/src/db/handle.rs (+10 -19)
···
impl LexiconHandle {
pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
let opts = PartitionCreateOptions::default()
- .block_size(1024 * 48)
+ .block_size(1024 * 128)
.compression(fjall::CompressionType::Miniz(9));
Self {
tree: keyspace.open_partition(nsid, opts).unwrap(),
···
}
}
- pub fn span(&self) -> tracing::Span {
- tracing::info_span!("handle", nsid = %self.nsid)
- }
-
pub fn nsid(&self) -> &SmolStr {
&self.nsid
}
···
self.buf.lock().len()
}
- pub fn since_last_activity(&self) -> Duration {
- Duration::from_nanos(
- CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw()),
- )
+ pub fn since_last_activity(&self) -> u64 {
+ CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw())
}
pub fn suggested_block_size(&self) -> usize {
···
range: impl RangeBounds<u64>,
sort: bool,
) -> AppResult<()> {
- let _span = self.span().entered();
-
let start_limit = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start.saturating_add(1),
···
.range(start_key..end_key)
.collect::<Result<Vec<_>, _>>()?;
if blocks_to_compact.len() < 2 {
+ tracing::info!("{}: nothing to compact", self.nsid);
return Ok(());
}
···
})?;
if sort {
- all_items.sort_unstable_by_key(|e| std::cmp::Reverse(e.timestamp));
+ all_items.sort_unstable_by_key(|e| e.timestamp);
}
let new_blocks = all_items
···
self.tree.insert(block.key, block.data)?;
}
- let reduction =
- ((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0;
tracing::info!(
- {
- start = start_blocks_size,
- end = end_blocks_size,
- },
- "blocks compacted {reduction:.2}%",
+ "{}: compacted {} blocks to {} blocks ({}% reduction)",
+ self.nsid,
+ start_blocks_size,
+ end_blocks_size,
+ ((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0,
);
Ok(())
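
Aside: `since_last_activity` now returns raw nanoseconds (`u64`) instead of a `Duration`, which lines up with `max_last_activity` becoming `u64` nanoseconds in `server/src/db/mod.rs` below. A minimal sketch of how the two values would be compared; the types here are stubs for illustration, and only the unit handling mirrors the diff:

```rust
use std::time::Duration;

// Stub of the config field changed below (nanoseconds, was Duration).
struct DbConfig {
    max_last_activity: u64,
}

// Hypothetical helper: the real code computes an `is_too_old` flag inline.
fn is_too_old(since_last_activity_ns: u64, cfg: &DbConfig) -> bool {
    since_last_activity_ns > cfg.max_last_activity
}

fn main() {
    let cfg = DbConfig {
        max_last_activity: Duration::from_secs(10).as_nanos() as u64,
    };
    // A handle idle for 12 s is stale, one idle for 3 s is not.
    assert!(is_too_old(Duration::from_secs(12).as_nanos() as u64, &cfg));
    assert!(!is_too_old(Duration::from_secs(3).as_nanos() as u64, &cfg));
}
```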
server/src/db/mod.rs (+35 -35)
···
fmt::Debug,
io::Cursor,
ops::{Bound, Deref, RangeBounds},
- path::Path,
+ path::{Path, PathBuf},
time::Duration,
};
use byteview::StrView;
- use fjall::{Keyspace, Partition, PartitionCreateOptions};
+ use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
use itertools::{Either, Itertools};
- use rayon::iter::{IntoParallelIterator, ParallelIterator};
+ use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::{SmolStr, ToSmolStr};
···
db::handle::{ItemDecoder, LexiconHandle},
error::{AppError, AppResult},
jetstream::JetstreamEvent,
- utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded},
+ utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded},
};
mod block;
···
pub ks_config: fjall::Config,
pub min_block_size: usize,
pub max_block_size: usize,
- pub max_last_activity: Duration,
+ pub max_last_activity: u64,
}
impl DbConfig {
···
impl Default for DbConfig {
fn default() -> Self {
Self {
- ks_config: fjall::Config::default().cache_size(1024 * 1024 * 512),
- min_block_size: 1000,
- max_block_size: 250_000,
- max_last_activity: Duration::from_secs(10),
+ ks_config: fjall::Config::default(),
+ min_block_size: 512,
+ max_block_size: 500_000,
+ max_last_activity: Duration::from_secs(10).as_nanos() as u64,
}
}
}
···
hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
sync_pool: threadpool::ThreadPool,
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
- eps: RateTracker<100>, // 100 millis buckets
+ eps: RateTracker<100>,
cancel_token: CancellationToken,
}
···
}
pub fn sync(&self, all: bool) -> AppResult<()> {
- let start = CLOCK.now();
// prepare all the data
let mut data = Vec::with_capacity(self.hits.len());
let _guard = scc::ebr::Guard::new();
···
let count = handle.item_count();
let data_count = count / block_size;
if count > 0 && (all || data_count > 0 || is_too_old) {
- for _ in 0..data_count {
- nsid_data.push((handle.clone(), block_size));
+ for i in 0..data_count {
+ nsid_data.push((i, handle.clone(), block_size));
total_count += block_size;
}
// only sync remainder if we haven't met block size
let remainder = count % block_size;
if (all || data_count == 0) && remainder > 0 {
- nsid_data.push((handle.clone(), remainder));
+ nsid_data.push((data_count, handle.clone(), remainder));
total_count += remainder;
}
}
- let _span = handle.span().entered();
- if nsid_data.len() > 0 {
- tracing::info!(
- {blocks = %nsid_data.len(), count = %total_count},
- "will encode & sync",
- );
- data.push(nsid_data);
- }
+ tracing::info!(
+ "{}: will sync {} blocks ({} count)",
+ handle.nsid(),
+ nsid_data.len(),
+ total_count,
+ );
+ data.push(nsid_data);
}
drop(_guard);
···
.map(|chunk| {
chunk
.into_iter()
- .map(|(handle, max_block_size)| {
- (handle.take_block_items(max_block_size), handle)
+ .map(|(i, handle, max_block_size)| {
+ (i, handle.take_block_items(max_block_size), handle)
})
.collect::<Vec<_>>()
.into_par_iter()
- .map(|(items, handle)| {
+ .map(|(i, items, handle)| {
let count = items.len();
let block = LexiconHandle::encode_block_from_items(items, count)?;
- AppResult::Ok((block, handle))
+ tracing::info!(
+ "{}: encoded block with {} items",
+ handle.nsid(),
+ block.written,
+ );
+ AppResult::Ok((i, block, handle))
})
.collect::<Result<Vec<_>, _>>()
})
.try_for_each(|chunk| {
let chunk = chunk?;
- for (block, handle) in chunk {
- self.sync_pool.execute(move || {
- let _span = handle.span().entered();
- match handle.insert(block.key, block.data) {
+ for (i, block, handle) in chunk {
+ self.sync_pool
+ .execute(move || match handle.insert(block.key, block.data) {
Ok(_) => {
- tracing::info!({count = %block.written}, "synced")
+ tracing::info!("{}: [{i}] synced {}", block.written, handle.nsid())
}
- Err(err) => tracing::error!({ err = %err }, "failed to sync block"),
- }
- });
+ Err(err) => tracing::error!("failed to sync block: {}", err),
+ });
}
AppResult::Ok(())
})?;
self.sync_pool.join();
- tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");
Ok(())
}
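
Aside: the reworked `sync` tags each planned block with an index `i` and splits a handle's buffered items into `count / block_size` full blocks plus an optional remainder. A standalone sketch of that planning step; `plan_blocks` is a hypothetical helper, not part of the actual API, and it omits the `is_too_old` gate:

```rust
// Sketch only: block planning as in the new sync(), returning (index, item count).
fn plan_blocks(count: usize, block_size: usize, all: bool) -> Vec<(usize, usize)> {
    let mut blocks = Vec::new();
    let full_blocks = count / block_size;
    for i in 0..full_blocks {
        blocks.push((i, block_size));
    }
    // only sync the remainder if we haven't met block size (or a full sync was asked for)
    let remainder = count % block_size;
    if (all || full_blocks == 0) && remainder > 0 {
        blocks.push((full_blocks, remainder));
    }
    blocks
}

fn main() {
    // 1_250 buffered items, block size 500: two full blocks,
    // and the 250-item remainder only when `all` is set.
    assert_eq!(plan_blocks(1_250, 500, false), vec![(0, 500), (1, 500)]);
    assert_eq!(plan_blocks(1_250, 500, true), vec![(0, 500), (1, 500), (2, 250)]);
    assert_eq!(plan_blocks(300, 500, false), vec![(0, 300)]);
}
```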
server/src/db_old/block.rs (+501)
···
+
use ordered_varint::Variable;
+
use rkyv::{
+
Archive, Deserialize, Serialize,
+
api::high::{HighSerializer, HighValidator},
+
bytecheck::CheckBytes,
+
de::Pool,
+
rancor::{self, Strategy},
+
ser::allocator::ArenaHandle,
+
util::AlignedVec,
+
};
+
use std::{
+
io::{self, Read, Write},
+
marker::PhantomData,
+
};
+
+
use crate::error::{AppError, AppResult};
+
+
pub struct Item<T> {
+
pub timestamp: u64,
+
data: AlignedVec,
+
phantom: PhantomData<T>,
+
}
+
+
impl<T: Archive> Item<T> {
+
pub fn access(&self) -> &T::Archived {
+
unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) }
+
}
+
}
+
+
impl<T> Item<T>
+
where
+
T: Archive,
+
T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
+
+ Deserialize<T, Strategy<Pool, rancor::Error>>,
+
{
+
pub fn deser(&self) -> AppResult<T> {
+
rkyv::from_bytes(&self.data).map_err(AppError::from)
+
}
+
}
+
+
impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
+
pub fn new(timestamp: u64, data: &T) -> Self {
+
Item {
+
timestamp,
+
data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
+
phantom: PhantomData,
+
}
+
}
+
}
+
+
pub struct ItemEncoder<W: Write, T> {
+
writer: W,
+
prev_timestamp: u64,
+
prev_delta: i64,
+
_item: PhantomData<T>,
+
}
+
+
impl<W: Write, T> ItemEncoder<W, T> {
+
pub fn new(writer: W) -> Self {
+
ItemEncoder {
+
writer,
+
prev_timestamp: 0,
+
prev_delta: 0,
+
_item: PhantomData,
+
}
+
}
+
+
pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> {
+
if self.prev_timestamp == 0 {
+
// self.writer.write_varint(item.timestamp)?;
+
self.prev_timestamp = item.timestamp;
+
self.write_data(&item.data)?;
+
return Ok(());
+
}
+
+
let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
+
+
self.writer.write_varint(delta - self.prev_delta)?;
+
self.prev_timestamp = item.timestamp;
+
self.prev_delta = delta;
+
+
self.write_data(&item.data)?;
+
+
Ok(())
+
}
+
+
fn write_data(&mut self, data: &[u8]) -> AppResult<()> {
+
self.writer.write_varint(data.len())?;
+
self.writer.write_all(data)?;
+
Ok(())
+
}
+
+
pub fn finish(mut self) -> AppResult<W> {
+
self.writer.flush()?;
+
Ok(self.writer)
+
}
+
}
+
+
pub struct ItemDecoder<R, T> {
+
reader: R,
+
current_timestamp: u64,
+
current_delta: i64,
+
first_item: bool,
+
_item: PhantomData<T>,
+
}
+
+
impl<R: Read, T: Archive> ItemDecoder<R, T> {
+
pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> {
+
Ok(ItemDecoder {
+
reader,
+
current_timestamp: start_timestamp,
+
current_delta: 0,
+
first_item: true,
+
_item: PhantomData,
+
})
+
}
+
+
pub fn decode(&mut self) -> AppResult<Option<Item<T>>> {
+
if self.first_item {
+
// read the first timestamp
+
// let timestamp = match self.reader.read_varint::<u64>() {
+
// Ok(timestamp) => timestamp,
+
// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
// Err(e) => return Err(e.into()),
+
// };
+
// self.current_timestamp = timestamp;
+
+
let Some(data_raw) = self.read_item()? else {
+
return Ok(None);
+
};
+
self.first_item = false;
+
return Ok(Some(Item {
+
timestamp: self.current_timestamp,
+
data: data_raw,
+
phantom: PhantomData,
+
}));
+
}
+
+
let Some(_delta) = self.read_timestamp()? else {
+
return Ok(None);
+
};
+
+
// read data
+
let data_raw = match self.read_item()? {
+
Some(data_raw) => data_raw,
+
None => {
+
return Err(io::Error::new(
+
io::ErrorKind::UnexpectedEof,
+
"expected data after delta",
+
)
+
.into());
+
}
+
};
+
+
Ok(Some(Item {
+
timestamp: self.current_timestamp,
+
data: data_raw,
+
phantom: PhantomData,
+
}))
+
}
+
+
// [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1]
+
fn read_timestamp(&mut self) -> AppResult<Option<u64>> {
+
let delta = match self.reader.read_varint::<i64>() {
+
Ok(delta) => delta,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
};
+
self.current_delta += delta;
+
self.current_timestamp =
+
(self.current_timestamp as i128 + self.current_delta as i128) as u64;
+
Ok(Some(self.current_timestamp))
+
}
+
+
fn read_item(&mut self) -> AppResult<Option<AlignedVec>> {
+
let data_len = match self.reader.read_varint::<usize>() {
+
Ok(data_len) => data_len,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
};
+
let mut data_raw = AlignedVec::with_capacity(data_len);
+
for _ in 0..data_len {
+
data_raw.push(0);
+
}
+
self.reader.read_exact(data_raw.as_mut_slice())?;
+
Ok(Some(data_raw))
+
}
+
}
+
+
impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
+
type Item = AppResult<Item<T>>;
+
+
fn next(&mut self) -> Option<Self::Item> {
+
self.decode().transpose()
+
}
+
}
+
+
pub trait WriteVariableExt: Write {
+
fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
+
value.encode_variable(self)
+
}
+
}
+
impl<W: Write> WriteVariableExt for W {}
+
+
pub trait ReadVariableExt: Read {
+
fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
+
T::decode_variable(self)
+
}
+
}
+
impl<R: Read> ReadVariableExt for R {}
+
+
#[cfg(test)]
+
mod test {
+
use super::*;
+
use rkyv::{Archive, Deserialize, Serialize};
+
use std::io::Cursor;
+
+
#[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
+
#[rkyv(compare(PartialEq))]
+
struct TestData {
+
id: u32,
+
value: String,
+
}
+
+
#[test]
+
fn test_encoder_decoder_single_item() {
+
let data = TestData {
+
id: 123,
+
value: "test".to_string(),
+
};
+
+
let item = Item::new(1000, &data);
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
encoder.encode(&item).unwrap();
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_item = decoder.decode().unwrap().unwrap();
+
assert_eq!(decoded_item.timestamp, 1000);
+
+
let decoded_data = decoded_item.access();
+
assert_eq!(decoded_data.id, 123);
+
assert_eq!(decoded_data.value.as_str(), "test");
+
}
+
+
#[test]
+
fn test_encoder_decoder_multiple_items() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "first".to_string(),
+
},
+
),
+
Item::new(
+
1010,
+
&TestData {
+
id: 2,
+
value: "second".to_string(),
+
},
+
),
+
Item::new(
+
1015,
+
&TestData {
+
id: 3,
+
value: "third".to_string(),
+
},
+
),
+
Item::new(
+
1025,
+
&TestData {
+
id: 4,
+
value: "fourth".to_string(),
+
},
+
),
+
];
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let mut decoded_items = Vec::new();
+
while let Some(item) = decoder.decode().unwrap() {
+
decoded_items.push(item);
+
}
+
+
assert_eq!(decoded_items.len(), 4);
+
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.access().id, decoded.access().id);
+
assert_eq!(
+
original.access().value.as_str(),
+
decoded.access().value.as_str()
+
);
+
}
+
}
+
+
#[test]
+
fn test_encoder_decoder_with_iterator() {
+
let items = vec![
+
Item::new(
+
2000,
+
&TestData {
+
id: 10,
+
value: "a".to_string(),
+
},
+
),
+
Item::new(
+
2005,
+
&TestData {
+
id: 20,
+
value: "b".to_string(),
+
},
+
),
+
Item::new(
+
2012,
+
&TestData {
+
id: 30,
+
value: "c".to_string(),
+
},
+
),
+
];
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 3);
+
assert_eq!(decoded_items[0].timestamp, 2000);
+
assert_eq!(decoded_items[1].timestamp, 2005);
+
assert_eq!(decoded_items[2].timestamp, 2012);
+
+
assert_eq!(decoded_items[0].access().id, 10);
+
assert_eq!(decoded_items[1].access().id, 20);
+
assert_eq!(decoded_items[2].access().id, 30);
+
}
+
+
#[test]
+
fn test_delta_compression() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "a".to_string(),
+
},
+
),
+
Item::new(
+
1010,
+
&TestData {
+
id: 2,
+
value: "b".to_string(),
+
},
+
), // delta = 10
+
Item::new(
+
1020,
+
&TestData {
+
id: 3,
+
value: "c".to_string(),
+
},
+
), // delta = 10, delta-of-delta = 0
+
Item::new(
+
1025,
+
&TestData {
+
id: 4,
+
value: "d".to_string(),
+
},
+
), // delta = 5, delta-of-delta = -5
+
];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode and verify
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.access().id, decoded.access().id);
+
}
+
}
+
+
#[test]
+
fn test_empty_decode() {
+
let buffer = Vec::new();
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let result = decoder.decode().unwrap();
+
assert!(result.is_none());
+
}
+
+
#[test]
+
fn test_backwards_timestamp() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "first".to_string(),
+
},
+
),
+
Item::new(
+
900,
+
&TestData {
+
id: 2,
+
value: "second".to_string(),
+
},
+
),
+
];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].timestamp, 1000);
+
assert_eq!(decoded_items[1].timestamp, 900);
+
}
+
+
#[test]
+
fn test_different_data_sizes() {
+
let small_data = TestData {
+
id: 1,
+
value: "x".to_string(),
+
};
+
let large_data = TestData {
+
id: 2,
+
value: "a".repeat(1000),
+
};
+
+
let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].access().value.as_str(), "x");
+
assert_eq!(decoded_items[1].access().value.len(), 1000);
+
assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000));
+
}
+
}
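
Aside: `ItemEncoder`/`ItemDecoder` store timestamps as delta-of-deltas, as the `[10, 11, 12, 14] -> [1, 1, 2] -> [0, 1]` comment hints. The first item writes no timestamp at all (the decoder takes it from the block key), and since `prev_delta` starts at 0 the first written value is the first delta itself, so that sequence is emitted as 1, 0, 1. A self-contained sketch of just the timestamp math; the real encoder also interleaves varint-encoded payloads and works on `u64` through `i128`, `i64` is used here for brevity:

```rust
// Sketch only: what write_varint() receives for each item after the first.
fn delta_of_deltas(timestamps: &[i64]) -> Vec<i64> {
    let mut out = Vec::new();
    let mut prev_ts = timestamps[0];
    let mut prev_delta = 0i64;
    for &ts in &timestamps[1..] {
        let delta = ts - prev_ts;
        out.push(delta - prev_delta);
        prev_ts = ts;
        prev_delta = delta;
    }
    out
}

fn main() {
    // Timestamps 10, 11, 12, 14 -> deltas 1, 1, 2 -> stored values 1, 0, 1.
    // Regular intervals collapse to runs of zeros, which varint-encode tightly.
    assert_eq!(delta_of_deltas(&[10, 11, 12, 14]), vec![1, 0, 1]);
}
```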
server/src/db_old/mod.rs (+424)
···
+
use std::{
+
io::Cursor,
+
ops::{Bound, Deref, RangeBounds},
+
path::Path,
+
sync::{
+
Arc,
+
atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
+
},
+
time::{Duration, Instant},
+
};
+
+
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
+
use ordered_varint::Variable;
+
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
+
use smol_str::SmolStr;
+
use tokio::sync::broadcast;
+
+
use crate::{
+
db_old::block::{ReadVariableExt, WriteVariableExt},
+
error::{AppError, AppResult},
+
jetstream::JetstreamEvent,
+
utils::{DefaultRateTracker, get_time},
+
};
+
+
mod block;
+
+
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+
#[rkyv(compare(PartialEq), derive(Debug))]
+
pub struct NsidCounts {
+
pub count: u128,
+
pub deleted_count: u128,
+
pub last_seen: u64,
+
}
+
+
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+
#[rkyv(compare(PartialEq), derive(Debug))]
+
pub struct NsidHit {
+
pub deleted: bool,
+
}
+
+
#[derive(Clone)]
+
pub struct EventRecord {
+
pub nsid: SmolStr,
+
pub timestamp: u64, // seconds
+
pub deleted: bool,
+
}
+
+
impl EventRecord {
+
pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
+
match event {
+
JetstreamEvent::Commit {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us / 1_000_000,
+
deleted: false,
+
}),
+
JetstreamEvent::Delete {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us / 1_000_000,
+
deleted: true,
+
}),
+
_ => None,
+
}
+
}
+
}
+
+
type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
+
type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
+
type Item = block::Item<NsidHit>;
+
+
pub struct LexiconHandle {
+
tree: Partition,
+
buf: Arc<scc::Queue<EventRecord>>,
+
buf_len: AtomicUsize,
+
last_insert: AtomicU64,
+
eps: DefaultRateTracker,
+
block_size: AtomicUsize,
+
}
+
+
impl LexiconHandle {
+
fn new(keyspace: &Keyspace, nsid: &str) -> Self {
+
let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9));
+
Self {
+
tree: keyspace.open_partition(nsid, opts).unwrap(),
+
buf: Default::default(),
+
buf_len: AtomicUsize::new(0),
+
last_insert: AtomicU64::new(0),
+
eps: DefaultRateTracker::new(Duration::from_secs(5)),
+
block_size: AtomicUsize::new(1000),
+
}
+
}
+
+
fn item_count(&self) -> usize {
+
self.buf_len.load(AtomicOrdering::Acquire)
+
}
+
+
fn last_insert(&self) -> u64 {
+
self.last_insert.load(AtomicOrdering::Acquire)
+
}
+
+
fn suggested_block_size(&self) -> usize {
+
self.block_size.load(AtomicOrdering::Relaxed)
+
}
+
+
fn insert(&self, event: EventRecord) {
+
self.buf.push(event);
+
self.buf_len.fetch_add(1, AtomicOrdering::Release);
+
self.last_insert
+
.store(get_time().as_millis() as u64, AtomicOrdering::Release);
+
self.eps.observe(1);
+
let rate = self.eps.rate() as usize;
+
if rate != 0 {
+
self.block_size.store(rate * 60, AtomicOrdering::Relaxed);
+
}
+
}
+
+
fn sync(&self, max_block_size: usize) -> AppResult<usize> {
+
let mut writer = ItemEncoder::new(Vec::with_capacity(
+
size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(),
+
));
+
let mut start_timestamp = None;
+
let mut end_timestamp = None;
+
let mut written = 0_usize;
+
while let Some(event) = self.buf.pop() {
+
let item = Item::new(
+
event.timestamp,
+
&NsidHit {
+
deleted: event.deleted,
+
},
+
);
+
writer.encode(&item)?;
+
if start_timestamp.is_none() {
+
start_timestamp = Some(event.timestamp);
+
}
+
end_timestamp = Some(event.timestamp);
+
if written >= max_block_size {
+
break;
+
}
+
written += 1;
+
}
+
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
+
self.buf_len.store(0, AtomicOrdering::Release);
+
let value = writer.finish()?;
+
let mut key = Vec::with_capacity(size_of::<u64>() * 2);
+
key.write_varint(start_timestamp)?;
+
key.write_varint(end_timestamp)?;
+
self.tree.insert(key, value)?;
+
}
+
Ok(written)
+
}
+
}
+
+
type BoxedIter<T> = Box<dyn Iterator<Item = T>>;
+
+
// counts is nsid -> NsidCounts
+
// hits is tree per nsid: varint start time + varint end time -> block of hits
+
pub struct Db {
+
inner: Keyspace,
+
hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
+
counts: Partition,
+
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
+
eps: DefaultRateTracker,
+
min_block_size: usize,
+
max_block_size: usize,
+
max_last_activity: Duration,
+
}
+
+
impl Db {
+
pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
+
tracing::info!("opening db...");
+
let ks = Config::new(path)
+
.cache_size(8 * 1024 * 1024) // from talna
+
.open()?;
+
Ok(Self {
+
hits: Default::default(),
+
counts: ks.open_partition(
+
"_counts",
+
PartitionCreateOptions::default().compression(fjall::CompressionType::None),
+
)?,
+
inner: ks,
+
event_broadcaster: broadcast::channel(1000).0,
+
eps: DefaultRateTracker::new(Duration::from_secs(1)),
+
min_block_size: 512,
+
max_block_size: 100_000,
+
max_last_activity: Duration::from_secs(10),
+
})
+
}
+
+
pub fn sync(&self, all: bool) -> AppResult<()> {
+
let _guard = scc::ebr::Guard::new();
+
for (nsid, tree) in self.hits.iter(&_guard) {
+
let count = tree.item_count();
+
let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size());
+
let is_too_old = (get_time().as_millis() as u64 - tree.last_insert())
+
> self.max_last_activity.as_millis() as u64;
+
if count > 0 && (all || is_max_block_size || is_too_old) {
+
loop {
+
let synced = tree.sync(self.max_block_size)?;
+
if synced == 0 {
+
break;
+
}
+
tracing::info!("synced {synced} of {nsid} to db");
+
}
+
}
+
}
+
Ok(())
+
}
+
+
#[inline(always)]
+
pub fn eps(&self) -> usize {
+
self.eps.rate() as usize
+
}
+
+
#[inline(always)]
+
pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
+
self.event_broadcaster.subscribe()
+
}
+
+
#[inline(always)]
+
fn maybe_run_in_nsid_tree<T>(
+
&self,
+
nsid: &str,
+
f: impl FnOnce(&LexiconHandle) -> T,
+
) -> Option<T> {
+
let _guard = scc::ebr::Guard::new();
+
let handle = match self.hits.peek(nsid, &_guard) {
+
Some(handle) => handle.clone(),
+
None => {
+
if self.inner.partition_exists(nsid) {
+
let handle = Arc::new(LexiconHandle::new(&self.inner, nsid));
+
let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
+
handle
+
} else {
+
return None;
+
}
+
}
+
};
+
Some(f(&handle))
+
}
+
+
#[inline(always)]
+
fn run_in_nsid_tree<T>(
+
&self,
+
nsid: SmolStr,
+
f: impl FnOnce(&LexiconHandle) -> AppResult<T>,
+
) -> AppResult<T> {
+
f(self
+
.hits
+
.entry(nsid.clone())
+
.or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid)))
+
.get())
+
}
+
+
pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
+
let EventRecord {
+
nsid,
+
timestamp,
+
deleted,
+
} = e.clone();
+
+
// insert event
+
self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?;
+
// increment count
+
let mut counts = self.get_count(&nsid)?;
+
counts.last_seen = timestamp;
+
if deleted {
+
counts.deleted_count += 1;
+
} else {
+
counts.count += 1;
+
}
+
self.insert_count(&nsid, counts.clone())?;
+
if self.event_broadcaster.receiver_count() > 0 {
+
let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
+
}
+
self.eps.observe(1);
+
Ok(())
+
}
+
+
#[inline(always)]
+
fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
+
self.counts
+
.insert(
+
nsid,
+
unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
+
)
+
.map_err(AppError::from)
+
}
+
+
pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
+
let Some(raw) = self.counts.get(nsid)? else {
+
return Ok(NsidCounts::default());
+
};
+
Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
+
}
+
+
pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
+
self.counts.iter().map(|res| {
+
res.map_err(AppError::from).map(|(key, val)| {
+
(
+
SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
+
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
+
)
+
})
+
})
+
}
+
+
pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> {
+
self.inner
+
.list_partitions()
+
.into_iter()
+
.filter(|k| k.deref() != "_counts")
+
}
+
+
pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> {
+
self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> {
+
Box::new(
+
handle
+
.tree
+
.iter()
+
.rev()
+
.map(|res| res.map_err(AppError::from)),
+
)
+
})
+
.unwrap_or_else(|| Box::new(std::iter::empty()))
+
}
+
+
pub fn get_hits(
+
&self,
+
nsid: &str,
+
range: impl RangeBounds<u64> + std::fmt::Debug,
+
) -> BoxedIter<AppResult<Item>> {
+
let start = range
+
.start_bound()
+
.cloned()
+
.map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
+
let end = range
+
.end_bound()
+
.cloned()
+
.map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
+
let limit = match range.end_bound().cloned() {
+
Bound::Included(end) => end,
+
Bound::Excluded(end) => end.saturating_sub(1),
+
Bound::Unbounded => u64::MAX,
+
};
+
+
self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> {
+
let map_block = move |(key, val)| {
+
let mut key_reader = Cursor::new(key);
+
let start_timestamp = key_reader.read_varint::<u64>()?;
+
let items =
+
ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| {
+
item.as_ref().map_or(true, |item| item.timestamp <= limit)
+
});
+
Ok(items)
+
};
+
+
Box::new(
+
handle
+
.tree
+
.range(TimestampRange { start, end })
+
.map(move |res| res.map_err(AppError::from).and_then(map_block))
+
.flatten()
+
.flatten(),
+
)
+
})
+
.unwrap_or_else(|| Box::new(std::iter::empty()))
+
}
+
+
pub fn tracking_since(&self) -> AppResult<u64> {
+
// HACK: we should actually store when we started tracking but im lazy
+
// should be accurate enough
+
self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| {
+
let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else {
+
return Ok(0);
+
};
+
let mut timestamp_reader = Cursor::new(timestamps_raw);
+
timestamp_reader
+
.read_varint::<u64>()
+
.map_err(AppError::from)
+
})
+
.unwrap_or(Ok(0))
+
}
+
}
+
+
type TimestampRepr = Vec<u8>;
+
+
struct TimestampRange {
+
start: Bound<TimestampRepr>,
+
end: Bound<TimestampRepr>,
+
}
+
+
impl RangeBounds<TimestampRepr> for TimestampRange {
+
#[inline(always)]
+
fn start_bound(&self) -> Bound<&TimestampRepr> {
+
self.start.as_ref()
+
}
+
+
#[inline(always)]
+
fn end_bound(&self) -> Bound<&TimestampRepr> {
+
self.end.as_ref()
+
}
+
}
+
+
type TimestampReprOld = [u8; 8];
+
+
struct TimestampRangeOld {
+
start: Bound<TimestampReprOld>,
+
end: Bound<TimestampReprOld>,
+
}
+
+
impl RangeBounds<TimestampReprOld> for TimestampRangeOld {
+
#[inline(always)]
+
fn start_bound(&self) -> Bound<&TimestampReprOld> {
+
self.start.as_ref()
+
}
+
+
#[inline(always)]
+
fn end_bound(&self) -> Bound<&TimestampReprOld> {
+
self.end.as_ref()
+
}
+
}
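
Aside: in this old layout each NSID gets its own fjall partition, and a block's key is the varint-encoded start and end timestamp: `sync()` writes both, while `get_hits()` and `tracking_since()` read back only the start to seed the decoder. A small sketch of that key round-trip using the same `ordered_varint::Variable` API (the timestamp values are hypothetical):

```rust
use ordered_varint::Variable;
use std::io::{self, Cursor};

// Sketch only: mirrors how LexiconHandle::sync() builds a block key.
fn block_key(start_ts: u64, end_ts: u64) -> io::Result<Vec<u8>> {
    let mut key = Vec::new();
    start_ts.encode_variable(&mut key)?; // varint start time
    end_ts.encode_variable(&mut key)?; // varint end time
    Ok(key)
}

fn main() -> io::Result<()> {
    let key = block_key(1_700_000_000, 1_700_000_060)?;
    // get_hits() only needs the start timestamp to construct the ItemDecoder.
    let mut reader = Cursor::new(key.as_slice());
    let start = u64::decode_variable(&mut reader)?;
    assert_eq!(start, 1_700_000_000);
    Ok(())
}
```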
server/src/main.rs (+5 -8)
···
mod api;
mod db;
+ mod db_old;
mod error;
mod jetstream;
mod utils;
···
fn migrate() {
let cancel_token = CancellationToken::new();
- let from = Arc::new(
- Db::new(
- DbConfig::default().path(".fjall_data_from"),
- cancel_token.child_token(),
- )
- .expect("couldnt create db"),
- );
+
+ let from = Arc::new(db_old::Db::new(".fjall_data_from").expect("couldnt create db"));
+
let to = Arc::new(
Db::new(
DbConfig::default().path(".fjall_data_to").ks(|c| {
···
);
let nsids = from.get_nsids().collect::<Vec<_>>();
- let _eps_thread = std::thread::spawn({
+ let eps_thread = std::thread::spawn({
let to = to.clone();
move || {
loop {