tracks lexicons (NSIDs) and how many times they appeared on the Jetstream
use std::{
    fmt::Debug,
    io::Cursor,
    ops::{Bound, Deref, RangeBounds},
    path::Path,
    time::Duration,
    u64,
};

use ahash::{AHashMap, AHashSet};
use byteview::StrView;
use fjall::{Keyspace, Partition, PartitionCreateOptions};
use itertools::{Either, Itertools};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::{SmolStr, ToSmolStr};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

use crate::{
    db::handle::{ItemDecoder, LexiconHandle},
    error::{AppError, AppResult},
    jetstream::JetstreamEvent,
    utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded},
};

mod block;
mod handle;

#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    pub count: u128,
    pub deleted_count: u128,
    pub last_seen: u64,
}

#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    pub deleted: bool,
}

#[derive(Clone)]
pub struct EventRecord {
    pub nsid: SmolStr,
    pub timestamp: u64, // seconds
    pub deleted: bool,
}

impl EventRecord {
    pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
        match event {
            JetstreamEvent::Commit {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: false,
            }),
            JetstreamEvent::Delete {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: true,
            }),
            _ => None,
        }
    }
}

pub struct DbInfo {
    pub nsids: AHashMap<SmolStr, Vec<usize>>,
    pub disk_size: u64,
}

pub struct DbConfig {
    pub ks_config: fjall::Config,
    pub min_block_size: usize,
    pub max_block_size: usize,
    pub max_last_activity: Duration,
}

impl DbConfig {
    pub fn path(mut self, path: impl AsRef<Path>) -> Self {
        self.ks_config = fjall::Config::new(path);
        self
    }

    pub fn ks(mut self, f: impl FnOnce(fjall::Config) -> fjall::Config) -> Self {
        self.ks_config = f(self.ks_config);
        self
    }
}

impl Default for DbConfig {
    fn default() -> Self {
        Self {
            ks_config: fjall::Config::default()
                .cache_size(1024 * 1024 * 512)
                .max_write_buffer_size(u64::MAX),
            min_block_size: 1000,
            max_block_size: 250_000,
            max_last_activity: Duration::from_secs(10),
        }
    }
}

// counts is nsid -> NsidCounts
// hits is tree per nsid: varint start time + varint end time -> block of hits
pub struct Db {
    pub cfg: DbConfig,
    pub ks: Keyspace,
    counts: Partition,
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>, ahash::RandomState>,
    sync_pool: threadpool::ThreadPool,
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    eps: RateTracker<100>, // 100 millis buckets
    cancel_token: CancellationToken,
}

impl Db {
    pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = cfg.ks_config.clone().open()?;
        Ok(Self {
            cfg,
            hits: Default::default(),
            sync_pool: threadpool::Builder::new()
                .num_threads(rayon::current_num_threads() * 2)
                .build(),
            counts: ks.open_partition(
                "_counts",
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
            )?,
            ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: RateTracker::new(Duration::from_secs(1)),
            cancel_token,
        })
    }

    #[inline(always)]
    pub fn shutting_down(&self) -> impl Future<Output = ()> {
        self.cancel_token.cancelled()
    }

    #[inline(always)]
    pub fn is_shutting_down(&self) -> bool {
        self.cancel_token.is_cancelled()
    }

    #[inline(always)]
    pub fn eps(&self) -> usize {
        self.eps.rate() as usize
    }

    #[inline(always)]
    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
        self.event_broadcaster.subscribe()
    }

    pub fn sync(&self, all: bool) -> AppResult<()> {
        let start = CLOCK.now();
        // prepare all the data
        let nsids_len = self.hits.len();
        let mut data = Vec::with_capacity(nsids_len);
        let mut nsids = AHashSet::with_capacity(nsids_len);
        let _guard = scc::ebr::Guard::new();
        for (nsid, handle) in self.hits.iter(&_guard) {
            let mut nsid_data = Vec::with_capacity(2);
            // let mut total_count = 0;
            let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
            // if we disconnect for a long time, we want to sync all of what we
            // have to avoid having many small blocks (even if we run compaction
            // later, it reduces work until we run compaction)
            let block_size = (is_too_old || all)
                .then_some(self.cfg.max_block_size)
                .unwrap_or_else(|| {
                    self.cfg
                        .max_block_size
                        .min(self.cfg.min_block_size.max(handle.suggested_block_size()))
                });
            let count = handle.item_count();
            let data_count = count / block_size;
            if count > 0 && (all || data_count > 0 || is_too_old) {
                for _ in 0..data_count {
                    nsid_data.push((handle.clone(), block_size));
                    // total_count += block_size;
                }
                // only sync remainder if we haven't met block size
                let remainder = count % block_size;
                if (all || data_count == 0) && remainder > 0 {
                    nsid_data.push((handle.clone(), remainder));
                    // total_count += remainder;
                }
            }
            let _span = handle.span().entered();
            if nsid_data.len() > 0 {
                // tracing::info!(
                //     {blocks = %nsid_data.len(), count = %total_count},
                //     "will encode & sync",
                // );
                nsids.insert(nsid.clone());
                data.push(nsid_data);
            }
        }
        drop(_guard);

        // process the blocks
        data.into_par_iter()
            .map(|chunk| {
                chunk
                    .into_iter()
                    .map(|(handle, max_block_size)| {
                        (handle.take_block_items(max_block_size), handle)
                    })
                    .collect::<Vec<_>>()
                    .into_par_iter()
                    .map(|(items, handle)| {
                        let count = items.len();
                        let block = LexiconHandle::encode_block_from_items(items, count)?;
                        AppResult::Ok((block, handle))
                    })
                    .collect::<Result<Vec<_>, _>>()
            })
            .try_for_each(|chunk| {
                let chunk = chunk?;
                for (block, handle) in chunk {
                    self.sync_pool.execute(move || {
                        let _span = handle.span().entered();
                        let written = block.written;
                        match handle.insert_block(block) {
                            Ok(_) => {
                                tracing::info!({count = %written}, "synced")
                            }
                            Err(err) => tracing::error!({ err = %err }, "failed to sync block"),
                        }
                    });
                }
                AppResult::Ok(())
            })?;
        self.sync_pool.join();

        // update snapshots for all (changed) handles
        for nsid in nsids {
            self.hits.peek_with(&nsid, |_, handle| handle.update_tree());
        }

        tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");

        Ok(())
    }

    pub fn compact(
        &self,
        nsid: impl AsRef<str>,
        max_count: usize,
        range: impl RangeBounds<u64>,
        sort: bool,
    ) -> AppResult<()> {
        let Some(handle) = self.get_handle(nsid) else {
            return Ok(());
        };
        handle.compact(max_count, range, sort)?;
        handle.update_tree();
        Ok(())
    }

    pub fn compact_all(
        &self,
        max_count: usize,
        range: impl RangeBounds<u64> + Clone,
        sort: bool,
    ) -> AppResult<()> {
        for nsid in self.get_nsids() {
            self.compact(nsid, max_count, range.clone(), sort)?;
        }
        Ok(())
    }

    pub fn major_compact(&self) -> AppResult<()> {
        self.compact_all(self.cfg.max_block_size, .., true)?;
        Ok(())
    }

    #[inline(always)]
    fn get_handle(&self, nsid: impl AsRef<str>) -> Option<Arc<LexiconHandle>> {
        let _guard = scc::ebr::Guard::new();
        let handle = match self.hits.peek(nsid.as_ref(), &_guard) {
            Some(handle) => handle.clone(),
            None => {
                if self.ks.partition_exists(nsid.as_ref()) {
                    let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
                    handle
                } else {
                    return None;
                }
            }
        };
        Some(handle)
    }

    #[inline(always)]
    fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
        self.hits
            .entry(nsid.clone())
            .or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
    }

    pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
        let mut seen_events = 0;
        for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
            let mut counts = self.get_count(&key)?;
            self.ensure_handle(&key).queue(chunk.inspect(|e| {
                // increment count
                counts.last_seen = e.timestamp;
                if e.deleted {
                    counts.deleted_count += 1;
                } else {
                    counts.count += 1;
                }
                seen_events += 1;
            }));
            self.insert_count(&key, &counts)?;
            if self.event_broadcaster.receiver_count() > 0 {
                let _ = self.event_broadcaster.send((key, counts));
            }
        }
        self.eps.observe(seen_events);
        Ok(())
    }

    #[inline(always)]
    fn insert_count(&self, nsid: &str, counts: &NsidCounts) -> AppResult<()> {
        self.counts
            .insert(
                nsid,
                unsafe { rkyv::to_bytes::<Error>(counts).unwrap_unchecked() }.as_slice(),
            )
            .map_err(AppError::from)
    }

    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
        let Some(raw) = self.counts.get(nsid)? else {
            return Ok(NsidCounts::default());
        };
        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
    }

    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
        self.counts.iter().map(|res| {
            res.map_err(AppError::from).map(|(key, val)| {
                (
                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
                )
            })
        })
    }

    pub fn get_nsids(&self) -> impl Iterator<Item = StrView> {
        self.ks
            .list_partitions()
            .into_iter()
            .filter(|k| k.deref() != "_counts")
    }

    pub fn info(&self) -> AppResult<DbInfo> {
        let mut nsids = AHashMap::new();
        for nsid in self.get_nsids() {
            let Some(handle) = self.get_handle(&nsid) else {
                continue;
            };
            let block_lens = handle
                .read()
                .iter()
                .rev()
                .try_fold(Vec::new(), |mut acc, item| {
                    let (key, value) = item?;
                    let mut timestamps = Cursor::new(key);
                    let start_timestamp = timestamps.read_varint()?;
                    let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
                    acc.push(decoder.item_count());
                    AppResult::Ok(acc)
                })?;
            nsids.insert(nsid.to_smolstr(), block_lens);
        }
        Ok(DbInfo {
            nsids,
            disk_size: self.ks.disk_space(),
        })
    }

    pub fn get_hits(
        &self,
        nsid: &str,
        range: impl RangeBounds<u64> + std::fmt::Debug,
        max_items: usize,
    ) -> impl Iterator<Item = AppResult<handle::Item>> {
        let start_limit = match range.start_bound().cloned() {
            Bound::Included(start) => start,
            Bound::Excluded(start) => start.saturating_add(1),
            Bound::Unbounded => 0,
        };
        let end_limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };
        let end_key = varints_unsigned_encoded([end_limit]);

        let Some(handle) = self.get_handle(nsid) else {
            return Either::Right(std::iter::empty());
        };

        // let mut ts = CLOCK.now();
        let map_block = move |(res, current_item_count)| -> AppResult<(Option<_>, usize)> {
            if current_item_count >= max_items {
                return Ok((None, current_item_count));
            }
            let (key, val) = res?;
            let mut key_reader = Cursor::new(key);
            let start_timestamp = key_reader.read_varint::<u64>()?;
            // let end_timestamp = key_reader.read_varint::<u64>()?;
            if start_timestamp < start_limit {
                // tracing::info!(
                //     "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater"
                // );
                return Ok((None, current_item_count));
            }
            let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
            let current_item_count = current_item_count + decoder.item_count();
            // tracing::info!(
            //     "took {}ns to get block with size {}",
            //     ts.elapsed().as_nanos(),
            //     decoder.item_count()
            // );
            // ts = CLOCK.now();
            Ok((
                Some(
                    decoder
                        .take_while(move |item| {
                            item.as_ref().map_or(true, |item| {
                                item.timestamp <= end_limit && item.timestamp >= start_limit
                            })
                        })
                        .map(|res| res.map_err(AppError::from)),
                ),
                current_item_count,
            ))
        };

        let (blocks, _counted) = handle
            .read()
            .range(..end_key)
            .map(|res| res.map_err(AppError::from))
            .rev()
            .fold_while(
                (Vec::with_capacity(20), 0),
                |(mut blocks, current_item_count), res| {
                    use itertools::FoldWhile::*;

                    match map_block((res, current_item_count)) {
                        Ok((Some(block), current_item_count)) => {
                            blocks.push(Ok(block));
                            Continue((blocks, current_item_count))
                        }
                        Ok((None, current_item_count)) => Done((blocks, current_item_count)),
                        Err(err) => {
                            blocks.push(Err(err));
                            Done((blocks, current_item_count))
                        }
                    }
                },
            )
            .into_inner();

        // tracing::info!(
        //     "got blocks with size {}, item count {counted}",
        //     blocks.len()
        // );

        Either::Left(blocks.into_iter().rev().flatten().flatten())
    }

    pub fn tracking_since(&self) -> AppResult<u64> {
        // HACK: we should actually store when we started tracking but I'm lazy;
        // this should be accurate enough
        let Some(handle) = self.get_handle("app.bsky.feed.like") else {
            return Ok(0);
        };
        let Some((timestamps_raw, _)) = handle.read().first_key_value()? else {
            return Ok(0);
        };
        let mut timestamp_reader = Cursor::new(timestamps_raw);
        timestamp_reader
            .read_varint::<u64>()
            .map_err(AppError::from)
    }
}
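
For orientation, here is a minimal usage sketch of the API above. It assumes the same crate context as the source (the error, utils, and jetstream modules imported at the top); the `example` function name, keyspace path, and event values are placeholders, not part of the source.

fn example() -> AppResult<()> {
    // Open (or create) the keyspace; `path` points a fresh fjall config at
    // the given directory.
    let db = Db::new(
        DbConfig::default().path("/tmp/lexicon-db"),
        CancellationToken::new(),
    )?;

    // Ingest a batch. `ingest_events` groups consecutive events by NSID
    // (via `chunk_by`), so events for the same NSID should arrive
    // contiguously to avoid redundant count reads/writes.
    let events = vec![EventRecord {
        nsid: SmolStr::new("app.bsky.feed.post"),
        timestamp: 1_700_000_000, // seconds
        deleted: false,
    }];
    db.ingest_events(events.into_iter())?;

    // Flush queued hits into on-disk blocks; `all = true` bypasses the
    // block-size heuristics and writes everything pending.
    db.sync(true)?;

    // Read back the running totals for that NSID.
    let counts = db.get_count("app.bsky.feed.post")?;
    tracing::info!({count = %counts.count, deleted = %counts.deleted_count}, "totals");

    // Scan hits (blocks are walked newest-first internally), capped at
    // roughly 1,000 items.
    for hit in db.get_hits("app.bsky.feed.post", 0.., 1_000) {
        let _item = hit?; // each item carries at least a `timestamp` in seconds
    }
    Ok(())
}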