//! Tracks lexicons and how many times they appeared on the Jetstream.
use std::{
    collections::HashMap,
    fmt::Debug,
    io::Cursor,
    ops::{Bound, Deref, RangeBounds},
    path::{Path, PathBuf},
    time::Duration,
};

use byteview::StrView;
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
use itertools::{Either, Itertools};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::{SmolStr, ToSmolStr};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

use crate::{
    db::handle::{ItemDecoder, LexiconHandle},
    error::{AppError, AppResult},
    jetstream::JetstreamEvent,
    utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded},
};

mod block;
mod handle;
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    pub count: u128,
    pub deleted_count: u128,
    pub last_seen: u64,
}

#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    pub deleted: bool,
}

#[derive(Clone)]
pub struct EventRecord {
    pub nsid: SmolStr,
    pub timestamp: u64, // seconds
    pub deleted: bool,
}

impl EventRecord {
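    /// Maps a Jetstream event to a record, keeping only commits and deletes.
    ///
    /// A minimal usage sketch (assumes `events` is an iterator of decoded
    /// `JetstreamEvent`s obtained elsewhere):
    ///
    /// ```ignore
    /// let records = events.filter_map(EventRecord::from_jetstream);
    /// db.ingest_events(records)?;
    /// ```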
    pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
        match event {
            JetstreamEvent::Commit {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: false,
            }),
            JetstreamEvent::Delete {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: true,
            }),
            _ => None,
        }
    }
}

pub struct DbInfo {
    pub nsids: HashMap<SmolStr, Vec<usize>>,
    pub disk_size: u64,
}

pub struct DbConfig {
    pub ks_config: fjall::Config,
    pub min_block_size: usize,
    pub max_block_size: usize,
    /// nanoseconds; see the `Default` impl below
    pub max_last_activity: u64,
}

impl DbConfig {
    pub fn path(mut self, path: impl AsRef<Path>) -> Self {
        self.ks_config = fjall::Config::new(path);
        self
    }

    pub fn ks(mut self, f: impl FnOnce(fjall::Config) -> fjall::Config) -> Self {
        self.ks_config = f(self.ks_config);
        self
    }
}
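
// A typical construction (a sketch; the path is arbitrary and `ks` simply
// threads any fjall tuning through the builder):
//
//     let cfg = DbConfig::default()
//         .path("data/db")
//         .ks(|c| /* adjust fjall options here */ c);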

impl Default for DbConfig {
    fn default() -> Self {
        Self {
            ks_config: fjall::Config::default(),
            min_block_size: 512,
            max_block_size: 500_000,
            max_last_activity: Duration::from_secs(10).as_nanos() as u64,
        }
    }
}

// `counts` maps nsid -> NsidCounts
// `hits` holds one tree per nsid: (varint start time + varint end time) -> block of hits
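//
// For example, a block covering seconds 100..=250 would live under the key
// `varints_unsigned_encoded([100, 250])` (a sketch; the exact encoding is
// defined in `utils`):
//
//     let key = varints_unsigned_encoded([100, 250]);
//     partition.insert(key, encoded_block)?;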
pub struct Db {
    pub cfg: DbConfig,
    pub ks: Keyspace,
    counts: Partition,
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
    sync_pool: threadpool::ThreadPool,
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    eps: RateTracker<100>,
    cancel_token: CancellationToken,
}

impl Db {
    pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = cfg.ks_config.clone().open()?;
        Ok(Self {
            cfg,
            hits: Default::default(),
            sync_pool: threadpool::Builder::new()
                .num_threads(rayon::current_num_threads() * 2)
                .build(),
            counts: ks.open_partition(
                "_counts",
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
            )?,
            ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: RateTracker::new(Duration::from_secs(1)),
            cancel_token,
        })
    }

    #[inline(always)]
    pub fn shutting_down(&self) -> impl Future<Output = ()> {
        self.cancel_token.cancelled()
    }

    #[inline(always)]
    pub fn is_shutting_down(&self) -> bool {
        self.cancel_token.is_cancelled()
    }

    #[inline(always)]
    pub fn eps(&self) -> usize {
        self.eps.rate() as usize
    }

    #[inline(always)]
    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
        self.event_broadcaster.subscribe()
    }
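
    // A listener sketch (assumes a tokio task; a lagged receiver just misses
    // some updates, which is fine for a live counter):
    //
    //     let mut rx = db.new_listener();
    //     while let Ok((nsid, counts)) = rx.recv().await {
    //         println!("{nsid}: {} created / {} deleted", counts.count, counts.deleted_count);
    //     }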

    pub fn sync(&self, all: bool) -> AppResult<()> {
        // prepare all the data
        let mut data = Vec::with_capacity(self.hits.len());
        let _guard = scc::ebr::Guard::new();
        for (_, handle) in self.hits.iter(&_guard) {
            let mut nsid_data = Vec::with_capacity(2);
            let mut total_count = 0;
            let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
            // if we were disconnected for a long time, sync everything we have
            // so we don't end up with many small blocks (even though compaction
            // can merge them later, this reduces the work compaction has to do)
            let block_size = (is_too_old || all)
                .then_some(self.cfg.max_block_size)
                .unwrap_or_else(|| {
                    self.cfg
                        .max_block_size
                        .min(self.cfg.min_block_size.max(handle.suggested_block_size()))
                });
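            // e.g. with the default config (min 512, max 500_000) and a
            // suggested size of 10_000, this picks
            // min(500_000, max(512, 10_000)) = 10_000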
            let count = handle.item_count();
            let data_count = count / block_size;
            if count > 0 && (all || data_count > 0 || is_too_old) {
                for i in 0..data_count {
                    nsid_data.push((i, handle.clone(), block_size));
                    total_count += block_size;
                }
                // only sync the remainder if we haven't reached a full block
                let remainder = count % block_size;
                if (all || data_count == 0) && remainder > 0 {
                    nsid_data.push((data_count, handle.clone(), remainder));
                    total_count += remainder;
                }
            }
            tracing::info!(
                "{}: will sync {} blocks ({} items)",
                handle.nsid(),
                nsid_data.len(),
                total_count,
            );
            data.push(nsid_data);
        }
        drop(_guard);

        // process the blocks
        data.into_par_iter()
            .map(|chunk| {
                chunk
                    .into_iter()
                    .map(|(i, handle, max_block_size)| {
                        (i, handle.take_block_items(max_block_size), handle)
                    })
                    .collect::<Vec<_>>()
                    .into_par_iter()
                    .map(|(i, items, handle)| {
                        let count = items.len();
                        let block = LexiconHandle::encode_block_from_items(items, count)?;
                        tracing::info!(
                            "{}: encoded block with {} items",
                            handle.nsid(),
                            block.written,
                        );
                        AppResult::Ok((i, block, handle))
                    })
                    .collect::<Result<Vec<_>, _>>()
            })
            .try_for_each(|chunk| {
                let chunk = chunk?;
                for (i, block, handle) in chunk {
                    self.sync_pool
                        .execute(move || match handle.insert(block.key, block.data) {
                            Ok(_) => {
                                tracing::info!("{}: [{i}] synced {} items", handle.nsid(), block.written)
                            }
                            Err(err) => tracing::error!("failed to sync block: {}", err),
                        });
                }
                AppResult::Ok(())
            })?;
        self.sync_pool.join();

        Ok(())
    }

    pub fn compact(
        &self,
        nsid: impl AsRef<str>,
        max_count: usize,
        range: impl RangeBounds<u64>,
        sort: bool,
    ) -> AppResult<()> {
        let Some(handle) = self.get_handle(nsid) else {
            return Ok(());
        };
        handle.compact(max_count, range, sort)
    }

    pub fn compact_all(
        &self,
        max_count: usize,
        range: impl RangeBounds<u64> + Clone,
        sort: bool,
    ) -> AppResult<()> {
        for nsid in self.get_nsids() {
            self.compact(nsid, max_count, range.clone(), sort)?;
        }
        Ok(())
    }

    pub fn major_compact(&self) -> AppResult<()> {
        self.compact_all(self.cfg.max_block_size, .., true)?;
        let _guard = scc::ebr::Guard::new();
        for (_, handle) in self.hits.iter(&_guard) {
            handle.deref().major_compact()?;
        }
        Ok(())
    }
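
    // Maintenance sketch: a periodic job might run
    //
    //     db.compact_all(db.cfg.max_block_size, .., false)?;
    //
    // to merge small blocks for every NSID; `major_compact` above is the
    // heavier variant that also sorts blocks and then invokes each handle's
    // own `major_compact`.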

    #[inline(always)]
    fn get_handle(&self, nsid: impl AsRef<str>) -> Option<Arc<LexiconHandle>> {
        let _guard = scc::ebr::Guard::new();
        let handle = match self.hits.peek(nsid.as_ref(), &_guard) {
            Some(handle) => handle.clone(),
            None => {
                if self.ks.partition_exists(nsid.as_ref()) {
                    let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
                    handle
                } else {
                    return None;
                }
            }
        };
        Some(handle)
    }

    #[inline(always)]
    fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
        self.hits
            .entry(nsid.clone())
            .or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
    }

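    /// Ingests a batch of events, updating per-NSID counts and notifying
    /// listeners as it goes.
    ///
    /// A sketch of the expected call site (assumes `batch` holds decoded
    /// Jetstream events):
    ///
    /// ```ignore
    /// db.ingest_events(batch.into_iter().filter_map(EventRecord::from_jetstream))?;
    /// ```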
    pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
        for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
            let mut counts = self.get_count(&key)?;
            let mut count = 0;
            self.ensure_handle(&key).queue(chunk.inspect(|e| {
                // update the running counts as items are queued
                counts.last_seen = e.timestamp;
                if e.deleted {
                    counts.deleted_count += 1;
                } else {
                    counts.count += 1;
                }
                count += 1;
            }));
            self.eps.observe(count);
            self.insert_count(&key, &counts)?;
            if self.event_broadcaster.receiver_count() > 0 {
                let _ = self.event_broadcaster.send((key, counts));
            }
        }
        Ok(())
    }

    #[inline(always)]
    fn insert_count(&self, nsid: &str, counts: &NsidCounts) -> AppResult<()> {
        self.counts
            .insert(
                nsid,
                // SAFETY: rkyv serialization of this plain, fixed-shape struct
                // cannot fail
                unsafe { rkyv::to_bytes::<Error>(counts).unwrap_unchecked() }.as_slice(),
            )
            .map_err(AppError::from)
    }

    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
        let Some(raw) = self.counts.get(nsid)? else {
            return Ok(NsidCounts::default());
        };
        // SAFETY: values in `counts` are only ever written by `insert_count`,
        // so they are valid archived `NsidCounts`
        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
    }

    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
        self.counts.iter().map(|res| {
            res.map_err(AppError::from).map(|(key, val)| {
                (
                    // SAFETY: keys are NSIDs, which are always valid UTF-8
                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
                    // SAFETY: see `get_count`
                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
                )
            })
        })
    }

    pub fn get_nsids(&self) -> impl Iterator<Item = StrView> {
        self.ks
            .list_partitions()
            .into_iter()
            .filter(|k| k.deref() != "_counts")
    }

    pub fn info(&self) -> AppResult<DbInfo> {
        let mut nsids = HashMap::new();
        for nsid in self.get_nsids() {
            let Some(handle) = self.get_handle(&nsid) else {
                continue;
            };
            let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| {
                let (key, value) = item?;
                let mut timestamps = Cursor::new(key);
                let start_timestamp = timestamps.read_varint()?;
                let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
                acc.push(decoder.item_count());
                AppResult::Ok(acc)
            })?;
            nsids.insert(nsid.to_smolstr(), block_lens);
        }
        Ok(DbInfo {
            nsids,
            disk_size: self.ks.disk_space(),
        })
    }

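    /// Returns the hits for `nsid` whose timestamps (seconds) fall inside
    /// `range`, scanning blocks from oldest to newest.
    ///
    /// A query sketch (assumes `start`/`end` are unix-second bounds):
    ///
    /// ```ignore
    /// for item in db.get_hits("app.bsky.feed.post", start..=end) {
    ///     let item = item?;
    ///     // inspect item.timestamp etc.
    /// }
    /// ```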
    pub fn get_hits(
        &self,
        nsid: &str,
        range: impl RangeBounds<u64> + std::fmt::Debug,
    ) -> impl Iterator<Item = AppResult<handle::Item>> {
        let start_limit = match range.start_bound().cloned() {
            Bound::Included(start) => start,
            Bound::Excluded(start) => start.saturating_add(1),
            Bound::Unbounded => 0,
        };
        let end_limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };
        let end_key = varints_unsigned_encoded([end_limit]);

        let Some(handle) = self.get_handle(nsid) else {
            return Either::Right(std::iter::empty());
        };

        let map_block = move |(key, val)| {
            let mut key_reader = Cursor::new(key);
            let start_timestamp = key_reader.read_varint::<u64>()?;
            if start_timestamp < start_limit {
                return Ok(None);
            }
            let items = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?
                .take_while(move |item| {
                    item.as_ref().map_or(true, |item| {
                        item.timestamp <= end_limit && item.timestamp >= start_limit
                    })
                })
                .map(|res| res.map_err(AppError::from));
            Ok(Some(items))
        };

        Either::Left(
            handle
                .range(..end_key)
                .rev()
                .map_while(move |res| res.map_err(AppError::from).and_then(map_block).transpose())
                .collect::<Vec<_>>()
                .into_iter()
                .rev()
                .flatten()
                .flatten(),
        )
    }

    pub fn tracking_since(&self) -> AppResult<u64> {
        // HACK: we should persist when tracking actually started, but the
        // first block of a high-traffic lexicon is accurate enough
        let Some(handle) = self.get_handle("app.bsky.feed.like") else {
            return Ok(0);
        };
        let Some((timestamps_raw, _)) = handle.first_key_value()? else {
            return Ok(0);
        };
        let mut timestamp_reader = Cursor::new(timestamps_raw);
        timestamp_reader
            .read_varint::<u64>()
            .map_err(AppError::from)
    }
}