//! Tracks lexicons (NSIDs) and how many times they appeared on the jetstream.
1use std::{
2 io::Cursor,
3 ops::{Bound, Deref, RangeBounds},
4 path::Path,
5 sync::{
6 Arc,
7 atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
8 },
9 time::{Duration, Instant},
10};
11
12use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
13use ordered_varint::Variable;
14use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
15use smol_str::SmolStr;
16use tokio::sync::broadcast;
17
18use crate::{
19 db_old::block::{ReadVariableExt, WriteVariableExt},
20 error::{AppError, AppResult},
21 jetstream::JetstreamEvent,
22 utils::{DefaultRateTracker, get_time},
23};
24
25mod block;
26
/// Aggregate per-NSID counters persisted in the `_counts` partition.
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    // number of non-delete (commit) events observed for this NSID
    pub count: u128,
    // number of delete events observed for this NSID
    pub deleted_count: u128,
    // timestamp (seconds) of the most recent event for this NSID
    pub last_seen: u64,
}
34
/// A single event as stored inside an encoded hits block; only the deletion
/// flag lives here — the timestamp is carried by the surrounding `Item`.
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    // true when the event was a delete, false for a commit
    pub deleted: bool,
}
40
/// A jetstream event reduced to the three fields this module stores.
#[derive(Clone)]
pub struct EventRecord {
    // collection NSID the event belongs to
    pub nsid: SmolStr,
    pub timestamp: u64, // seconds
    // true for delete events, false for commits
    pub deleted: bool,
}
47
48impl EventRecord {
49 pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
50 match event {
51 JetstreamEvent::Commit {
52 time_us, commit, ..
53 } => Some(Self {
54 nsid: commit.collection.into(),
55 timestamp: time_us / 1_000_000,
56 deleted: false,
57 }),
58 JetstreamEvent::Delete {
59 time_us, commit, ..
60 } => Some(Self {
61 nsid: commit.collection.into(),
62 timestamp: time_us / 1_000_000,
63 deleted: true,
64 }),
65 _ => None,
66 }
67 }
68}
69
// Concrete instantiations of the generic block codec for this module's
// value type (`NsidHit`).
/// Decodes items out of a stored block value.
type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
/// Encodes items into an in-memory block buffer.
type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
/// One decoded (timestamp, hit) entry.
type Item = block::Item<NsidHit>;
73
/// Per-NSID state: the on-disk partition plus an in-memory buffer of events
/// waiting to be flushed into a block.
pub struct LexiconHandle {
    // fjall partition storing encoded blocks keyed by
    // varint(start_ts) ++ varint(end_ts)
    tree: Partition,
    // lock-free queue of events not yet written to `tree`
    buf: Arc<scc::Queue<EventRecord>>,
    // number of events currently in `buf`, maintained alongside push/pop
    buf_len: AtomicUsize,
    // wall-clock milliseconds of the most recent `insert`; 0 before any
    last_insert: AtomicU64,
    // events-per-second tracker (5s window) that drives `block_size`
    eps: DefaultRateTracker,
    // adaptive flush threshold, updated in `insert` to rate * 60
    block_size: AtomicUsize,
}
82
83impl LexiconHandle {
84 fn new(keyspace: &Keyspace, nsid: &str) -> Self {
85 let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9));
86 Self {
87 tree: keyspace.open_partition(nsid, opts).unwrap(),
88 buf: Default::default(),
89 buf_len: AtomicUsize::new(0),
90 last_insert: AtomicU64::new(0),
91 eps: DefaultRateTracker::new(Duration::from_secs(5)),
92 block_size: AtomicUsize::new(1000),
93 }
94 }
95
96 fn item_count(&self) -> usize {
97 self.buf_len.load(AtomicOrdering::Acquire)
98 }
99
100 fn last_insert(&self) -> u64 {
101 self.last_insert.load(AtomicOrdering::Acquire)
102 }
103
104 fn suggested_block_size(&self) -> usize {
105 self.block_size.load(AtomicOrdering::Relaxed)
106 }
107
108 fn insert(&self, event: EventRecord) {
109 self.buf.push(event);
110 self.buf_len.fetch_add(1, AtomicOrdering::Release);
111 self.last_insert
112 .store(get_time().as_millis() as u64, AtomicOrdering::Release);
113 self.eps.observe(1);
114 let rate = self.eps.rate() as usize;
115 if rate != 0 {
116 self.block_size.store(rate * 60, AtomicOrdering::Relaxed);
117 }
118 }
119
120 fn sync(&self, max_block_size: usize) -> AppResult<usize> {
121 let mut writer = ItemEncoder::new(Vec::with_capacity(
122 size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(),
123 ));
124 let mut start_timestamp = None;
125 let mut end_timestamp = None;
126 let mut written = 0_usize;
127 while let Some(event) = self.buf.pop() {
128 let item = Item::new(
129 event.timestamp,
130 &NsidHit {
131 deleted: event.deleted,
132 },
133 );
134 writer.encode(&item)?;
135 if start_timestamp.is_none() {
136 start_timestamp = Some(event.timestamp);
137 }
138 end_timestamp = Some(event.timestamp);
139 if written >= max_block_size {
140 break;
141 }
142 written += 1;
143 }
144 if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
145 self.buf_len.store(0, AtomicOrdering::Release);
146 let value = writer.finish()?;
147 let mut key = Vec::with_capacity(size_of::<u64>() * 2);
148 key.write_varint(start_timestamp)?;
149 key.write_varint(end_timestamp)?;
150 self.tree.insert(key, value)?;
151 }
152 Ok(written)
153 }
154}
155
/// Boxed iterator used to erase the differing concrete iterator types
/// returned by the hit-query paths.
type BoxedIter<T> = Box<dyn Iterator<Item = T>>;

// counts is nsid -> NsidCounts
// hits is tree per nsid: varint start time + varint end time -> block of hits
pub struct Db {
    // underlying fjall keyspace holding every partition
    inner: Keyspace,
    // lazily populated cache of per-NSID handles (buffer + partition)
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
    // `_counts` partition: nsid -> rkyv-serialized NsidCounts
    counts: Partition,
    // notifies subscribers whenever an NSID's counters change
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    // global events-per-second tracker (1s window)
    eps: DefaultRateTracker,
    // floor for the flush threshold used in `sync`
    min_block_size: usize,
    // hard cap on events written into a single block
    max_block_size: usize,
    // idle duration after which a handle's buffer gets flushed
    max_last_activity: Duration,
}
170
impl Db {
    /// Opens (or creates) the keyspace at `path` together with the `_counts`
    /// partition. Per-NSID hit partitions are opened lazily on first use.
    pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = Config::new(path)
            .cache_size(8 * 1024 * 1024) // from talna
            .open()?;
        Ok(Self {
            hits: Default::default(),
            // counts values are tiny rkyv archives, so no compression
            counts: ks.open_partition(
                "_counts",
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
            )?,
            inner: ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: DefaultRateTracker::new(Duration::from_secs(1)),
            min_block_size: 512,
            max_block_size: 100_000,
            max_last_activity: Duration::from_secs(10),
        })
    }

    /// Flushes buffered events to disk. A handle is flushed when `all` is
    /// set, when its buffer exceeds the larger of `min_block_size` and the
    /// handle's adaptive suggestion, or when it has been idle longer than
    /// `max_last_activity`.
    pub fn sync(&self, all: bool) -> AppResult<()> {
        let _guard = scc::ebr::Guard::new();
        for (nsid, tree) in self.hits.iter(&_guard) {
            let count = tree.item_count();
            let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size());
            // NOTE(review): this subtraction underflows if `get_time()` ever
            // reads earlier than the handle's last insert — assumed monotonic
            let is_too_old = (get_time().as_millis() as u64 - tree.last_insert())
                > self.max_last_activity.as_millis() as u64;
            if count > 0 && (all || is_max_block_size || is_too_old) {
                // each sync() call writes at most max_block_size events, so
                // loop until the buffer reports empty
                loop {
                    let synced = tree.sync(self.max_block_size)?;
                    if synced == 0 {
                        break;
                    }
                    tracing::info!("synced {synced} of {nsid} to db");
                }
            }
        }
        Ok(())
    }

    /// Observed events-per-second over the tracker's 1-second window.
    #[inline(always)]
    pub fn eps(&self) -> usize {
        self.eps.rate() as usize
    }

    /// Subscribes to `(nsid, updated counts)` notifications emitted by
    /// `record_event`.
    #[inline(always)]
    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
        self.event_broadcaster.subscribe()
    }

    /// Runs `f` against the handle for `nsid` if it is cached or its
    /// partition already exists on disk; returns `None` otherwise, so unknown
    /// NSIDs never create partitions.
    #[inline(always)]
    fn maybe_run_in_nsid_tree<T>(
        &self,
        nsid: &str,
        f: impl FnOnce(&LexiconHandle) -> T,
    ) -> Option<T> {
        let _guard = scc::ebr::Guard::new();
        let handle = match self.hits.peek(nsid, &_guard) {
            Some(handle) => handle.clone(),
            None => {
                if self.inner.partition_exists(nsid) {
                    let handle = Arc::new(LexiconHandle::new(&self.inner, nsid));
                    // a concurrent insert may win this race; both handles
                    // wrap the same partition, so the loser is just dropped
                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
                    handle
                } else {
                    return None;
                }
            }
        };
        Some(f(&handle))
    }

    /// Runs `f` against the handle for `nsid`, creating the handle (and its
    /// on-disk partition) if it does not exist yet.
    #[inline(always)]
    fn run_in_nsid_tree<T>(
        &self,
        nsid: SmolStr,
        f: impl FnOnce(&LexiconHandle) -> AppResult<T>,
    ) -> AppResult<T> {
        f(self
            .hits
            .entry(nsid.clone())
            .or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid)))
            .get())
    }

    /// Buffers an event, updates the persisted per-NSID counters, and
    /// broadcasts the new counters to any listeners.
    pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
        let EventRecord {
            nsid,
            timestamp,
            deleted,
        } = e.clone();

        // insert event
        self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?;
        // increment count
        let mut counts = self.get_count(&nsid)?;
        counts.last_seen = timestamp;
        if deleted {
            counts.deleted_count += 1;
        } else {
            counts.count += 1;
        }
        self.insert_count(&nsid, counts.clone())?;
        if self.event_broadcaster.receiver_count() > 0 {
            let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
        }
        self.eps.observe(1);
        Ok(())
    }

    /// Serializes `counts` with rkyv and writes it under `nsid` in `_counts`.
    #[inline(always)]
    fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
        self.counts
            .insert(
                nsid,
                // SAFETY: assumes rkyv serialization of this plain struct
                // cannot fail — TODO(review) confirm to_bytes is infallible
                // for NsidCounts before trusting unwrap_unchecked
                unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
            )
            .map_err(AppError::from)
    }

    /// Reads the counters for `nsid`, defaulting to all-zero when absent.
    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
        let Some(raw) = self.counts.get(nsid)? else {
            return Ok(NsidCounts::default());
        };
        // SAFETY: values in `_counts` are only written by `insert_count`, so
        // the bytes should be a valid NsidCounts archive — assumes no
        // foreign or older-format data is present in the partition
        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
    }

    /// Iterates every `(nsid, counters)` pair in the `_counts` partition.
    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
        self.counts.iter().map(|res| {
            res.map_err(AppError::from).map(|(key, val)| {
                (
                    // SAFETY: keys are the NSID strings passed to
                    // `insert_count`, hence valid UTF-8
                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
                    // SAFETY: see `get_count` — values come from `insert_count`
                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
                )
            })
        })
    }

    /// Lists every NSID with an on-disk hits partition (everything except
    /// the reserved `_counts` partition).
    pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> {
        self.inner
            .list_partitions()
            .into_iter()
            .filter(|k| k.deref() != "_counts")
    }

    /// Raw `(key, value)` pairs of the hits partition for `nsid`, newest
    /// block first; empty iterator for unknown NSIDs. Debugging aid.
    pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> {
        self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> {
            Box::new(
                handle
                    .tree
                    .iter()
                    .rev()
                    .map(|res| res.map_err(AppError::from)),
            )
        })
        .unwrap_or_else(|| Box::new(std::iter::empty()))
    }

    /// Streams decoded hits for `nsid` whose timestamps (seconds) fall in
    /// `range`; empty iterator for unknown NSIDs.
    pub fn get_hits(
        &self,
        nsid: &str,
        range: impl RangeBounds<u64> + std::fmt::Debug,
    ) -> BoxedIter<AppResult<Item>> {
        // block keys begin with the varint-encoded block start timestamp, so
        // varint-encoded bounds are compared against that prefix
        let start = range
            .start_bound()
            .cloned()
            .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
        let end = range
            .end_bound()
            .cloned()
            .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
        // inclusive per-item cutoff derived from the end bound
        let limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };

        self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> {
            let map_block = move |(key, val)| {
                let mut key_reader = Cursor::new(key);
                let start_timestamp = key_reader.read_varint::<u64>()?;
                // stop decoding within a block once items pass the cutoff
                let items =
                    ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| {
                        item.as_ref().map_or(true, |item| item.timestamp <= limit)
                    });
                Ok(items)
            };

            // NOTE(review): a block that starts before `start` but contains
            // items inside the range is excluded by this key range — confirm
            // that losing those leading items is acceptable
            Box::new(
                handle
                    .tree
                    .range(TimestampRange { start, end })
                    .map(move |res| res.map_err(AppError::from).and_then(map_block))
                    .flatten()
                    .flatten(),
            )
        })
        .unwrap_or_else(|| Box::new(std::iter::empty()))
    }

    /// Approximates "tracking since" as the start timestamp of the first
    /// stored block for `app.bsky.feed.like`; returns 0 when none exists.
    pub fn tracking_since(&self) -> AppResult<u64> {
        // HACK: we should actually store when we started tracking but im lazy
        // should be accurate enough
        self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| {
            let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else {
                return Ok(0);
            };
            let mut timestamp_reader = Cursor::new(timestamps_raw);
            timestamp_reader
                .read_varint::<u64>()
                .map_err(AppError::from)
        })
        .unwrap_or(Ok(0))
    }
}
387
/// Owned byte form of a varint-encoded timestamp bound.
type TimestampRepr = Vec<u8>;

/// Key range over varint-encoded timestamps, passed to the partition's
/// `range` query.
struct TimestampRange {
    start: Bound<TimestampRepr>,
    end: Bound<TimestampRepr>,
}

impl RangeBounds<TimestampRepr> for TimestampRange {
    #[inline(always)]
    fn start_bound(&self) -> Bound<&TimestampRepr> {
        match &self.start {
            Bound::Included(key) => Bound::Included(key),
            Bound::Excluded(key) => Bound::Excluded(key),
            Bound::Unbounded => Bound::Unbounded,
        }
    }

    #[inline(always)]
    fn end_bound(&self) -> Bound<&TimestampRepr> {
        match &self.end {
            Bound::Included(key) => Bound::Included(key),
            Bound::Excluded(key) => Bound::Excluded(key),
            Bound::Unbounded => Bound::Unbounded,
        }
    }
}
406
/// Legacy fixed-width (big-endian 8-byte) timestamp key form.
type TimestampReprOld = [u8; 8];

/// Key range over legacy fixed-width timestamp keys.
struct TimestampRangeOld {
    start: Bound<TimestampReprOld>,
    end: Bound<TimestampReprOld>,
}

impl RangeBounds<TimestampReprOld> for TimestampRangeOld {
    #[inline(always)]
    fn start_bound(&self) -> Bound<&TimestampReprOld> {
        match &self.start {
            Bound::Included(key) => Bound::Included(key),
            Bound::Excluded(key) => Bound::Excluded(key),
            Bound::Unbounded => Bound::Unbounded,
        }
    }

    #[inline(always)]
    fn end_bound(&self) -> Bound<&TimestampReprOld> {
        match &self.end {
            Bound::Included(key) => Bound::Included(key),
            Bound::Excluded(key) => Bound::Excluded(key),
            Bound::Unbounded => Bound::Unbounded,
        }
    }
}