Tracks lexicons and how many times they appeared on the Jetstream.

refactor(server): delete old db impl, rename migrate to compact

ptr.pet 84d25308 48892f4c

verified
Changed files
+10 -218
server
src
+1 -210
server/src/db/mod.rs
···
}
}
-
// counts is nsid -> NsidCounts
-
// hits is tree per nsid: timestamp -> NsidHit
-
pub struct DbOld {
-
inner: Keyspace,
-
hits: scc::HashIndex<SmolStr, Partition>,
-
counts: Partition,
-
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
-
eps: Rate,
-
}
-
-
impl DbOld {
-
pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
-
tracing::info!("opening db...");
-
let ks = Config::new(path)
-
.cache_size(8 * 1024 * 1024) // from talna
-
.open()?;
-
Ok(Self {
-
hits: Default::default(),
-
counts: ks.open_partition(
-
"_counts",
-
PartitionCreateOptions::default().compression(fjall::CompressionType::None),
-
)?,
-
inner: ks,
-
event_broadcaster: broadcast::channel(1000).0,
-
eps: Rate::new(Duration::from_secs(1)),
-
})
-
}
-
-
pub fn eps(&self) -> usize {
-
self.eps.rate(&()) as usize
-
}
-
-
pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
-
self.event_broadcaster.subscribe()
-
}
-
-
#[inline(always)]
-
fn get_part_opts() -> PartitionCreateOptions {
-
PartitionCreateOptions::default()
-
.compression(fjall::CompressionType::Miniz(9))
-
.compaction_strategy(fjall::compaction::Strategy::Fifo(fjall::compaction::Fifo {
-
limit: 5 * 1024 * 1024 * 1024, // 5 gb
-
ttl_seconds: Some(60 * 60 * 24 * 30), // 30 days
-
}))
-
}
-
-
#[inline(always)]
-
fn maybe_run_in_nsid_tree<T>(&self, nsid: &str, f: impl FnOnce(&Partition) -> T) -> Option<T> {
-
let _guard = scc::ebr::Guard::new();
-
let handle = match self.hits.peek(nsid, &_guard) {
-
Some(handle) => handle.clone(),
-
None => {
-
if self.inner.partition_exists(nsid) {
-
let handle = self
-
.inner
-
.open_partition(nsid, Self::get_part_opts())
-
.expect("cant open partition");
-
let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
-
handle
-
} else {
-
return None;
-
}
-
}
-
};
-
Some(f(&handle))
-
}
-
-
#[inline(always)]
-
fn run_in_nsid_tree<T>(
-
&self,
-
nsid: &str,
-
f: impl FnOnce(&Partition) -> AppResult<T>,
-
) -> AppResult<T> {
-
f(self
-
.hits
-
.entry(SmolStr::new(nsid))
-
.or_insert_with(|| {
-
let opts = Self::get_part_opts();
-
self.inner.open_partition(nsid, opts).unwrap()
-
})
-
.get())
-
}
-
-
pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
-
let EventRecord {
-
nsid,
-
timestamp,
-
deleted,
-
} = e;
-
-
self.insert_event(&nsid, timestamp, deleted)?;
-
// increment count
-
let mut counts = self.get_count(&nsid)?;
-
counts.last_seen = timestamp;
-
if deleted {
-
counts.deleted_count += 1;
-
} else {
-
counts.count += 1;
-
}
-
self.insert_count(&nsid, counts.clone())?;
-
if self.event_broadcaster.receiver_count() > 0 {
-
let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
-
}
-
self.eps.observe(&(), 1);
-
Ok(())
-
}
-
-
#[inline(always)]
-
fn insert_event(&self, nsid: &str, timestamp: u64, deleted: bool) -> AppResult<()> {
-
self.run_in_nsid_tree(nsid, |tree| {
-
tree.insert(
-
timestamp.to_be_bytes(),
-
unsafe { rkyv::to_bytes::<Error>(&NsidHit { deleted }).unwrap_unchecked() }
-
.as_slice(),
-
)
-
.map_err(AppError::from)
-
})
-
}
-
-
#[inline(always)]
-
fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
-
self.counts
-
.insert(
-
nsid,
-
unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
-
)
-
.map_err(AppError::from)
-
}
-
-
pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
-
let Some(raw) = self.counts.get(nsid)? else {
-
return Ok(NsidCounts::default());
-
};
-
Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
-
}
-
-
pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
-
self.counts.iter().map(|res| {
-
res.map_err(AppError::from).map(|(key, val)| {
-
(
-
SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
-
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
-
)
-
})
-
})
-
}
-
-
pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> {
-
self.inner
-
.list_partitions()
-
.into_iter()
-
.filter(|k| k.deref() != "_counts")
-
}
-
-
pub fn get_hits(
-
&self,
-
nsid: &str,
-
range: impl RangeBounds<u64>,
-
) -> BoxedIter<AppResult<(u64, NsidHit)>> {
-
let start = range.start_bound().cloned().map(u64::to_be_bytes);
-
let end = range.end_bound().cloned().map(u64::to_be_bytes);
-
-
self.maybe_run_in_nsid_tree(nsid, |tree| -> BoxedIter<AppResult<(u64, NsidHit)>> {
-
Box::new(tree.range(TimestampRangeOld { start, end }).map(|res| {
-
res.map_err(AppError::from).map(|(key, val)| {
-
(
-
u64::from_be_bytes(key.as_ref().try_into().unwrap()),
-
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
-
)
-
})
-
}))
-
})
-
.unwrap_or_else(|| Box::new(std::iter::empty()))
-
}
-
-
pub fn tracking_since(&self) -> AppResult<u64> {
-
// HACK: we should actually store when we started tracking but im lazy
-
// should be accurate enough
-
self.maybe_run_in_nsid_tree("app.bsky.feed.like", |tree| {
-
let Some((timestamp_raw, _)) = tree.first_key_value()? else {
-
return Ok(0);
-
};
-
Ok(u64::from_be_bytes(
-
timestamp_raw.as_ref().try_into().unwrap(),
-
))
-
})
-
.unwrap_or(Ok(0))
-
}
-
}
-
type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
type Item = block::Item<NsidHit>;
···
event_broadcaster: broadcast::channel(1000).0,
eps: Rate::new(Duration::from_secs(1)),
min_block_size: 512,
-
max_block_size: 100_000,
+
max_block_size: 500_000,
max_last_activity: Duration::from_secs(10),
})
}
···
self.end.as_ref()
}
}
-
-
type TimestampReprOld = [u8; 8];
-
-
struct TimestampRangeOld {
-
start: Bound<TimestampReprOld>,
-
end: Bound<TimestampReprOld>,
-
}
-
-
impl RangeBounds<TimestampReprOld> for TimestampRangeOld {
-
#[inline(always)]
-
fn start_bound(&self) -> Bound<&TimestampReprOld> {
-
self.start.as_ref()
-
}
-
-
#[inline(always)]
-
fn end_bound(&self) -> Bound<&TimestampReprOld> {
-
self.end.as_ref()
-
}
-
}
+9 -8
server/src/main.rs
···
use crate::{
api::serve,
-
db::{Db, DbOld, EventRecord},
+
db::{Db, EventRecord},
error::AppError,
jetstream::JetstreamClient,
};
···
.init();
match std::env::args().nth(1).as_deref() {
-
Some("migrate") => {
-
migrate();
+
Some("compact") => {
+
compact();
return;
}
Some("debug") => {
···
}
}
-
fn migrate() {
-
let from = Arc::new(DbOld::new(".fjall_data").expect("couldnt create db"));
-
let to = Arc::new(Db::new(".fjall_data_migrated").expect("couldnt create db"));
+
fn compact() {
+
let from = Arc::new(Db::new(".fjall_data_from").expect("couldnt create db"));
+
let to = Arc::new(Db::new(".fjall_data_to").expect("couldnt create db"));
let mut threads = Vec::new();
for nsid in from.get_nsids() {
···
tracing::info!("migrating {} ...", nsid.deref());
let mut count = 0_u64;
for hit in from.get_hits(&nsid, ..) {
-
let (timestamp, data) = hit.expect("cant read event");
+
let hit = hit.expect("cant read event");
+
let data = hit.access();
to.record_event(EventRecord {
nsid: nsid.to_smolstr(),
-
timestamp,
+
timestamp: hit.timestamp,
deleted: data.deleted,
})
.expect("cant record event");