//! Tracks lexicons (NSIDs) and how many times they appeared on the jetstream.
1use std::{
2 collections::HashMap,
3 fmt::Debug,
4 io::Cursor,
5 ops::{Bound, Deref, RangeBounds},
6 path::Path,
7 time::Duration,
8 u64,
9};
10
11use byteview::StrView;
12use fjall::{Keyspace, Partition, PartitionCreateOptions};
13use itertools::{Either, Itertools};
14use rayon::iter::{IntoParallelIterator, ParallelIterator};
15use rclite::Arc;
16use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
17use smol_str::{SmolStr, ToSmolStr};
18use tokio::sync::broadcast;
19use tokio_util::sync::CancellationToken;
20
21use crate::{
22 db::handle::{ItemDecoder, LexiconHandle},
23 error::{AppError, AppResult},
24 jetstream::JetstreamEvent,
25 utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded},
26};
27
28mod block;
29mod handle;
30
/// Aggregate per-NSID counters, persisted rkyv-encoded in the `_counts`
/// partition (see `Db::insert_count` / `Db::get_count`).
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    /// Total non-delete (commit) events observed for this NSID.
    pub count: u128,
    /// Total delete events observed for this NSID.
    pub deleted_count: u128,
    /// Timestamp (whole seconds) of the most recent event for this NSID.
    pub last_seen: u64,
}
38
/// A single event occurrence as stored inside an encoded hits block.
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    /// True when the hit came from a delete event rather than a commit.
    pub deleted: bool,
}
44
/// A normalized jetstream event: which collection (NSID) it touched,
/// when, and whether it was a delete.
#[derive(Clone)]
pub struct EventRecord {
    /// Collection NSID the event was recorded under.
    pub nsid: SmolStr,
    /// Event time converted from jetstream microseconds to whole seconds.
    pub timestamp: u64, // seconds
    /// True for delete events, false for commits.
    pub deleted: bool,
}
51
52impl EventRecord {
53 pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
54 match event {
55 JetstreamEvent::Commit {
56 time_us, commit, ..
57 } => Some(Self {
58 nsid: commit.collection.into(),
59 timestamp: time_us / 1_000_000,
60 deleted: false,
61 }),
62 JetstreamEvent::Delete {
63 time_us, commit, ..
64 } => Some(Self {
65 nsid: commit.collection.into(),
66 timestamp: time_us / 1_000_000,
67 deleted: true,
68 }),
69 _ => None,
70 }
71 }
72}
73
/// Snapshot of database statistics as returned by `Db::info`.
pub struct DbInfo {
    /// Per-NSID list of item counts, one entry per stored block
    /// (newest block first, since blocks are iterated in reverse).
    pub nsids: HashMap<SmolStr, Vec<usize>>,
    /// Total on-disk size of the keyspace in bytes.
    pub disk_size: u64,
}
78
/// Tunables for [`Db`]: the fjall keyspace config plus the block-sizing
/// and flush-policy knobs used by `Db::sync`.
pub struct DbConfig {
    /// Underlying fjall keyspace configuration (path, cache, write buffer).
    pub ks_config: fjall::Config,
    /// Lower bound on the number of items encoded into one block.
    pub min_block_size: usize,
    /// Upper bound on the number of items encoded into one block.
    pub max_block_size: usize,
    /// If a handle has been idle longer than this, `sync` flushes whatever
    /// it has buffered to avoid accumulating many small blocks.
    pub max_last_activity: Duration,
}
85
86impl DbConfig {
87 pub fn path(mut self, path: impl AsRef<Path>) -> Self {
88 self.ks_config = fjall::Config::new(path);
89 self
90 }
91
92 pub fn ks(mut self, f: impl FnOnce(fjall::Config) -> fjall::Config) -> Self {
93 self.ks_config = f(self.ks_config);
94 self
95 }
96}
97
98impl Default for DbConfig {
99 fn default() -> Self {
100 Self {
101 ks_config: fjall::Config::default()
102 .cache_size(1024 * 1024 * 512)
103 .max_write_buffer_size(u64::MAX),
104 min_block_size: 1000,
105 max_block_size: 250_000,
106 max_last_activity: Duration::from_secs(10),
107 }
108 }
109}
110
// counts is nsid -> NsidCounts
// hits is tree per nsid: varint start time + varint end time -> block of hits
pub struct Db {
    /// Configuration (block sizing, flush policy, keyspace settings).
    pub cfg: DbConfig,
    /// Keyspace holding the `_counts` partition plus one partition per NSID.
    pub ks: Keyspace,
    // nsid -> rkyv-encoded NsidCounts
    counts: Partition,
    // lazily-created per-NSID handles; keys mirror partition names
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
    // pool that performs the final partition inserts during sync
    sync_pool: threadpool::ThreadPool,
    // notifies subscribers of updated counts after each ingest chunk
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    eps: RateTracker<100>, // 100 millis buckets
    cancel_token: CancellationToken,
}
123
impl Db {
    /// Opens (or creates) the keyspace and the `_counts` partition and
    /// initializes the in-memory state (handle index, sync pool, broadcaster).
    pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = cfg.ks_config.clone().open()?;
        Ok(Self {
            cfg,
            hits: Default::default(),
            // oversubscribe the pool — presumably the sync inserts are
            // I/O-bound rather than CPU-bound; TODO confirm
            sync_pool: threadpool::Builder::new()
                .num_threads(rayon::current_num_threads() * 2)
                .build(),
            // counts blobs are tiny rkyv structs, so compression is disabled
            counts: ks.open_partition(
                "_counts",
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
            )?,
            ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: RateTracker::new(Duration::from_secs(1)),
            cancel_token,
        })
    }

    /// Future that resolves once shutdown has been requested.
    #[inline(always)]
    pub fn shutting_down(&self) -> impl Future<Output = ()> {
        self.cancel_token.cancelled()
    }

    /// Whether shutdown has been requested.
    #[inline(always)]
    pub fn is_shutting_down(&self) -> bool {
        self.cancel_token.is_cancelled()
    }

    /// Current ingest rate (events per second, from the rate tracker).
    #[inline(always)]
    pub fn eps(&self) -> usize {
        self.eps.rate() as usize
    }

    /// Subscribes to `(nsid, counts)` updates broadcast after each ingest.
    #[inline(always)]
    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
        self.event_broadcaster.subscribe()
    }

    /// Drains buffered items from every active handle, encodes them into
    /// blocks (in parallel via rayon), and hands the finished blocks to the
    /// sync pool for insertion. Blocks until all inserts have completed.
    ///
    /// `all` forces a full flush regardless of the block-size heuristics.
    pub fn sync(&self, all: bool) -> AppResult<()> {
        let start = CLOCK.now();
        // prepare all the data
        let mut data = Vec::with_capacity(self.hits.len());
        let _guard = scc::ebr::Guard::new();
        for (_, handle) in self.hits.iter(&_guard) {
            let mut nsid_data = Vec::with_capacity(2);
            let mut total_count = 0;
            let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
            // if we disconnect for a long time, we want to sync all of what we
            // have to avoid having many small blocks (even if we run compaction
            // later, it reduces work until we run compaction)
            let block_size = (is_too_old || all)
                .then_some(self.cfg.max_block_size)
                .unwrap_or_else(|| {
                    // clamp the handle's suggestion into [min, max]
                    self.cfg
                        .max_block_size
                        .min(self.cfg.min_block_size.max(handle.suggested_block_size()))
                });
            let count = handle.item_count();
            let data_count = count / block_size;
            if count > 0 && (all || data_count > 0 || is_too_old) {
                // one full block per complete block_size worth of items
                for _ in 0..data_count {
                    nsid_data.push((handle.clone(), block_size));
                    total_count += block_size;
                }
                // only sync remainder if we haven't met block size
                let remainder = count % block_size;
                if (all || data_count == 0) && remainder > 0 {
                    nsid_data.push((handle.clone(), remainder));
                    total_count += remainder;
                }
            }
            let _span = handle.span().entered();
            if nsid_data.len() > 0 {
                tracing::info!(
                    {blocks = %nsid_data.len(), count = %total_count},
                    "will encode & sync",
                );
                data.push(nsid_data);
            }
        }
        drop(_guard);

        // process the blocks: outer parallelism across NSIDs, inner
        // parallelism across that NSID's blocks. Items are taken serially
        // first so block boundaries stay in queue order, then encoded in
        // parallel.
        data.into_par_iter()
            .map(|chunk| {
                chunk
                    .into_iter()
                    .map(|(handle, max_block_size)| {
                        (handle.take_block_items(max_block_size), handle)
                    })
                    .collect::<Vec<_>>()
                    .into_par_iter()
                    .map(|(items, handle)| {
                        let count = items.len();
                        let block = LexiconHandle::encode_block_from_items(items, count)?;
                        AppResult::Ok((block, handle))
                    })
                    .collect::<Result<Vec<_>, _>>()
            })
            .try_for_each(|chunk| {
                let chunk = chunk?;
                for (block, handle) in chunk {
                    // the actual partition insert happens on the sync pool;
                    // insert failures are logged, not propagated
                    self.sync_pool.execute(move || {
                        let _span = handle.span().entered();
                        match handle.insert(block.key, block.data) {
                            Ok(_) => {
                                tracing::info!({count = %block.written}, "synced")
                            }
                            Err(err) => tracing::error!({ err = %err }, "failed to sync block"),
                        }
                    });
                }
                AppResult::Ok(())
            })?;
        self.sync_pool.join();
        tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");

        Ok(())
    }

    /// Compacts one NSID's partition. A missing/unknown NSID is a no-op.
    pub fn compact(
        &self,
        nsid: impl AsRef<str>,
        max_count: usize,
        range: impl RangeBounds<u64>,
        sort: bool,
    ) -> AppResult<()> {
        let Some(handle) = self.get_handle(nsid) else {
            return Ok(());
        };
        handle.compact(max_count, range, sort)
    }

    /// Runs [`Db::compact`] over every known NSID, stopping at the first error.
    pub fn compact_all(
        &self,
        max_count: usize,
        range: impl RangeBounds<u64> + Clone,
        sort: bool,
    ) -> AppResult<()> {
        for nsid in self.get_nsids() {
            self.compact(nsid, max_count, range.clone(), sort)?;
        }
        Ok(())
    }

    /// Full compaction: every NSID, unbounded time range, sorted, using the
    /// configured maximum block size.
    pub fn major_compact(&self) -> AppResult<()> {
        self.compact_all(self.cfg.max_block_size, .., true)?;
        Ok(())
    }

    /// Looks up the handle for `nsid`, lazily creating one if the partition
    /// already exists on disk; returns `None` for unknown NSIDs.
    #[inline(always)]
    fn get_handle(&self, nsid: impl AsRef<str>) -> Option<Arc<LexiconHandle>> {
        let _guard = scc::ebr::Guard::new();
        let handle = match self.hits.peek(nsid.as_ref(), &_guard) {
            Some(handle) => handle.clone(),
            None => {
                if self.ks.partition_exists(nsid.as_ref()) {
                    let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
                    // a racing insert is fine: both sides hold equivalent
                    // handles for the same partition
                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
                    handle
                } else {
                    return None;
                }
            }
        };
        Some(handle)
    }

    /// Returns the handle for `nsid`, creating handle *and* partition if needed
    /// (unlike [`Db::get_handle`], which only opens existing partitions).
    #[inline(always)]
    fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
        self.hits
            .entry(nsid.clone())
            .or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
    }

    /// Queues events onto their per-NSID handles, updating the persisted
    /// counters and broadcasting the new counts as it goes.
    ///
    /// NOTE(review): `chunk_by` only groups *consecutive* events with the same
    /// NSID — presumably the jetstream feed interleaves NSIDs, so the same key
    /// may be read-modify-written several times per call; verify callers don't
    /// rely on one update per NSID.
    pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
        for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
            let mut counts = self.get_count(&key)?;
            let mut count = 0;
            // counters are updated as a side effect while the handle
            // consumes the chunk
            self.ensure_handle(&key).queue(chunk.inspect(|e| {
                // increment count
                counts.last_seen = e.timestamp;
                if e.deleted {
                    counts.deleted_count += 1;
                } else {
                    counts.count += 1;
                }
                count += 1;
            }));
            self.eps.observe(count);
            self.insert_count(&key, &counts)?;
            // send() fails only when there are no receivers, so skip it early
            if self.event_broadcaster.receiver_count() > 0 {
                let _ = self.event_broadcaster.send((key, counts));
            }
        }
        Ok(())
    }

    /// Persists `counts` for `nsid` into the `_counts` partition.
    #[inline(always)]
    fn insert_count(&self, nsid: &str, counts: &NsidCounts) -> AppResult<()> {
        self.counts
            .insert(
                nsid,
                // SAFETY(review): assumes rkyv serialization of NsidCounts
                // cannot fail; unwrap_unchecked is UB if it ever does — TODO
                // confirm this invariant or handle the error
                unsafe { rkyv::to_bytes::<Error>(counts).unwrap_unchecked() }.as_slice(),
            )
            .map_err(AppError::from)
    }

    /// Loads the counters for `nsid`, defaulting to zeroes when absent.
    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
        let Some(raw) = self.counts.get(nsid)? else {
            return Ok(NsidCounts::default());
        };
        // SAFETY(review): bytes are trusted to have been written by
        // insert_count with the same rkyv layout; no validation is performed
        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
    }

    /// Iterates every `(nsid, counts)` pair stored in `_counts`.
    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
        self.counts.iter().map(|res| {
            res.map_err(AppError::from).map(|(key, val)| {
                (
                    // SAFETY(review): keys are written from &str in
                    // insert_count, so they are assumed valid UTF-8
                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
                    // SAFETY(review): same trusted-bytes argument as get_count
                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
                )
            })
        })
    }

    /// Lists every NSID partition name (everything except `_counts`).
    pub fn get_nsids(&self) -> impl Iterator<Item = StrView> {
        self.ks
            .list_partitions()
            .into_iter()
            .filter(|k| k.deref() != "_counts")
    }

    /// Builds a [`DbInfo`] snapshot: per-NSID block sizes plus total disk use.
    pub fn info(&self) -> AppResult<DbInfo> {
        let mut nsids = HashMap::new();
        for nsid in self.get_nsids() {
            let Some(handle) = self.get_handle(&nsid) else {
                continue;
            };
            // walk blocks newest-first, decoding only headers to get counts
            let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| {
                let (key, value) = item?;
                let mut timestamps = Cursor::new(key);
                let start_timestamp = timestamps.read_varint()?;
                let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
                acc.push(decoder.item_count());
                AppResult::Ok(acc)
            })?;
            nsids.insert(nsid.to_smolstr(), block_lens);
        }
        Ok(DbInfo {
            nsids,
            disk_size: self.ks.disk_space(),
        })
    }

    /// Streams hits for `nsid` within `range`, newest block first, stopping
    /// once roughly `max_items` items have been yielded.
    ///
    /// NOTE(review): the `max_items` check happens *before* decoding the next
    /// block, so the stream can overshoot `max_items` by up to one block —
    /// confirm callers treat it as a soft limit.
    pub fn get_hits(
        &self,
        nsid: &str,
        range: impl RangeBounds<u64> + std::fmt::Debug,
        max_items: usize,
    ) -> impl Iterator<Item = AppResult<handle::Item>> {
        // normalize the range bounds into an inclusive [start_limit, end_limit]
        let start_limit = match range.start_bound().cloned() {
            Bound::Included(start) => start,
            Bound::Excluded(start) => start.saturating_add(1),
            Bound::Unbounded => 0,
        };
        let end_limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };
        let end_key = varints_unsigned_encoded([end_limit]);

        let Some(handle) = self.get_handle(nsid) else {
            return Either::Right(std::iter::empty());
        };

        // let mut ts = CLOCK.now();
        let mut current_item_count = 0;
        // Returning Ok(None) from this closure ends the (reverse) scan via
        // map_while below — either the item budget is spent or we've walked
        // past the start of the requested range.
        let map_block = move |(key, val)| {
            if current_item_count > max_items {
                return Ok(None);
            }
            let mut key_reader = Cursor::new(key);
            let start_timestamp = key_reader.read_varint::<u64>()?;
            // let end_timestamp = key_reader.read_varint::<u64>()?;
            if start_timestamp < start_limit {
                // tracing::info!(
                //     "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater"
                // );
                return Ok(None);
            }
            let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
            current_item_count += decoder.item_count();
            // tracing::info!(
            //     "took {}ns to get block with size {}",
            //     ts.elapsed().as_nanos(),
            //     decoder.item_count()
            // );
            // ts = CLOCK.now();
            Ok(Some(
                // items inside a block can still fall outside the range, so
                // filter per-item as well
                decoder
                    .take_while(move |item| {
                        item.as_ref().map_or(true, |item| {
                            item.timestamp <= end_limit && item.timestamp >= start_limit
                        })
                    })
                    .map(|res| res.map_err(AppError::from)),
            ))
        };

        Either::Left(
            handle
                .range(..end_key)
                .rev()
                .map_while(move |res| res.map_err(AppError::from).and_then(map_block).transpose())
                .flatten()
                .flatten(),
        )
    }

    /// Approximates the time tracking started by reading the earliest block
    /// timestamp of `app.bsky.feed.like`; returns 0 when unavailable.
    pub fn tracking_since(&self) -> AppResult<u64> {
        // HACK: we should actually store when we started tracking but im lazy
        // this should be accurate enough
        let Some(handle) = self.get_handle("app.bsky.feed.like") else {
            return Ok(0);
        };
        let Some((timestamps_raw, _)) = handle.first_key_value()? else {
            return Ok(0);
        };
        let mut timestamp_reader = Cursor::new(timestamps_raw);
        timestamp_reader
            .read_varint::<u64>()
            .map_err(AppError::from)
    }
}
462}