tracks lexicons and how many times they appeared on the jetstream

fix(server): tracking_since funcs, also fix old db get_hits

ptr.pet ebeab2a7 e28fd9c4

verified
Changed files
+57 -43
server
src
db
+57 -43
server/src/db/mod.rs
···
}
#[inline(always)]
+
fn get_part_opts() -> PartitionCreateOptions {
+
PartitionCreateOptions::default()
+
.compression(fjall::CompressionType::Miniz(9))
+
.compaction_strategy(fjall::compaction::Strategy::Fifo(fjall::compaction::Fifo {
+
limit: 5 * 1024 * 1024 * 1024, // 5 gb
+
ttl_seconds: Some(60 * 60 * 24 * 30), // 30 days
+
}))
+
}
+
+
#[inline(always)]
+
fn maybe_run_in_nsid_tree<T>(&self, nsid: &str, f: impl FnOnce(&Partition) -> T) -> Option<T> {
+
let _guard = scc::ebr::Guard::new();
+
let handle = match self.hits.peek(nsid, &_guard) {
+
Some(handle) => handle.clone(),
+
None => {
+
if self.inner.partition_exists(nsid) {
+
let handle = self
+
.inner
+
.open_partition(nsid, Self::get_part_opts())
+
.expect("cant open partition");
+
let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
+
handle
+
} else {
+
return None;
+
}
+
}
+
};
+
Some(f(&handle))
+
}
+
+
#[inline(always)]
fn run_in_nsid_tree<T>(
&self,
nsid: &str,
···
.hits
.entry(SmolStr::new(nsid))
.or_insert_with(|| {
-
let opts = PartitionCreateOptions::default()
-
.compression(fjall::CompressionType::Miniz(9))
-
.compaction_strategy(fjall::compaction::Strategy::Fifo(
-
fjall::compaction::Fifo {
-
limit: 5 * 1024 * 1024 * 1024, // 5 gb
-
ttl_seconds: Some(60 * 60 * 24 * 30), // 30 days
-
},
-
));
+
let opts = Self::get_part_opts();
self.inner.open_partition(nsid, opts).unwrap()
})
.get())
···
&self,
nsid: &str,
range: impl RangeBounds<u64>,
-
) -> AppResult<Box<dyn Iterator<Item = AppResult<(u64, NsidHit)>>>> {
+
) -> BoxedIter<AppResult<(u64, NsidHit)>> {
let start = range.start_bound().cloned().map(u64::to_be_bytes);
let end = range.end_bound().cloned().map(u64::to_be_bytes);
-
let _guard = scc::ebr::Guard::new();
-
let Some(tree) = self.hits.peek(nsid, &_guard) else {
-
return Ok(Box::new(std::iter::empty()));
-
};
-
-
Ok(Box::new(tree.range(TimestampRangeOld { start, end }).map(
-
|res| {
+
self.maybe_run_in_nsid_tree(nsid, |tree| -> BoxedIter<AppResult<(u64, NsidHit)>> {
+
Box::new(tree.range(TimestampRangeOld { start, end }).map(|res| {
res.map_err(AppError::from).map(|(key, val)| {
(
u64::from_be_bytes(key.as_ref().try_into().unwrap()),
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
)
})
-
},
-
)))
+
}))
+
})
+
.unwrap_or_else(|| Box::new(std::iter::empty()))
}
pub fn tracking_since(&self) -> AppResult<u64> {
-
let _guard = scc::ebr::Guard::new();
// HACK: we should actually store when we started tracking but im lazy
// should be accurate enough
-
let Some(tree) = self.hits.peek("app.bsky.feed.like", &_guard) else {
-
return Ok(0);
-
};
-
let Some((timestamp_raw, _)) = tree.first_key_value()? else {
-
return Ok(0);
-
};
-
drop(_guard);
-
-
Ok(u64::from_be_bytes(
-
timestamp_raw.as_ref().try_into().unwrap(),
-
))
+
self.maybe_run_in_nsid_tree("app.bsky.feed.like", |tree| {
+
let Some((timestamp_raw, _)) = tree.first_key_value()? else {
+
return Ok(0);
+
};
+
Ok(u64::from_be_bytes(
+
timestamp_raw.as_ref().try_into().unwrap(),
+
))
+
})
+
.unwrap_or(Ok(0))
}
}
···
}
pub fn tracking_since(&self) -> AppResult<u64> {
-
let _guard = scc::ebr::Guard::new();
// HACK: we should actually store when we started tracking but im lazy
// should be accurate enough
-
let Some(handle) = self.hits.peek("app.bsky.feed.like", &_guard) else {
-
return Ok(0);
-
};
-
let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else {
-
return Ok(0);
-
};
-
drop(_guard);
-
-
let mut timestamp_reader = Cursor::new(timestamps_raw);
-
timestamp_reader
-
.read_varint::<u64>()
-
.map_err(AppError::from)
+
self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| {
+
let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else {
+
return Ok(0);
+
};
+
let mut timestamp_reader = Cursor::new(timestamps_raw);
+
timestamp_reader
+
.read_varint::<u64>()
+
.map_err(AppError::from)
+
})
+
.unwrap_or(Ok(0))
}
}