//! Tracks lexicons (NSIDs) and how many times they appeared on the Jetstream.
use std::{
    collections::HashMap,
    fmt::Debug,
    io::Cursor,
    ops::{Bound, Deref, RangeBounds},
    path::Path,
    time::Duration,
    // NOTE(review): `std::u64` is redundant on modern Rust (`u64::MAX` is an
    // associated const); kept because this pass only touches comments.
    u64,
};

use byteview::StrView;
use fjall::{Keyspace, Partition, PartitionCreateOptions};
use itertools::{Either, Itertools};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::{SmolStr, ToSmolStr};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

use crate::{
    db::handle::{ItemDecoder, LexiconHandle},
    error::{AppError, AppResult},
    jetstream::JetstreamEvent,
    utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded},
};

mod block;
mod handle;

/// Aggregate counters for a single NSID, persisted in the `_counts` partition
/// (rkyv-serialized, keyed by the NSID string).
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    /// Number of non-delete (commit) events observed.
    pub count: u128,
    /// Number of delete events observed.
    pub deleted_count: u128,
    /// Timestamp (seconds) of the most recent event for this NSID.
    pub last_seen: u64,
}

/// A single observed hit for an NSID; only records whether it was a deletion.
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    pub deleted: bool,
}

/// One event extracted from the Jetstream firehose, normalized for ingestion.
#[derive(Clone)]
pub struct EventRecord {
    /// Collection NSID the event belongs to.
    pub nsid: SmolStr,
    pub timestamp: u64, // seconds
    pub deleted: bool,
}

impl EventRecord {
    /// Converts a Jetstream event into an [`EventRecord`].
    ///
    /// Only `Commit` and `Delete` events are tracked; anything else (e.g.
    /// identity/account events) yields `None`. `time_us` is microseconds and
    /// is truncated to whole seconds.
    pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
        match event {
            JetstreamEvent::Commit {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: false,
            }),
            JetstreamEvent::Delete {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: true,
            }),
            _ => None,
        }
    }
}

/// Snapshot of the database contents, as produced by [`Db::info`].
pub struct DbInfo {
    /// NSID -> per-block item counts (one entry per stored block, newest first).
    pub nsids: HashMap<SmolStr, Vec<usize>>,
    /// Total on-disk size of the keyspace in bytes.
    pub disk_size: u64,
}

/// Tunables for [`Db`]. See [`DbConfig::default`] for the defaults.
pub struct DbConfig {
    /// Underlying fjall keyspace configuration.
    pub ks_config: fjall::Config,
    /// Lower bound on items per encoded block during a normal sync.
    pub min_block_size: usize,
    /// Upper bound on items per encoded block.
    pub max_block_size: usize,
    /// If a lexicon has been idle longer than this, sync flushes everything
    /// it has buffered regardless of block-size targets.
    pub max_last_activity: Duration,
}

impl DbConfig {
    /// Replaces the keyspace config with a fresh one rooted at `path`.
    /// Note: this discards any prior `ks_config` customization.
    pub fn path(mut self, path: impl AsRef<Path>) -> Self {
        self.ks_config = fjall::Config::new(path);
        self
    }

    /// Applies a closure to the current keyspace config (builder-style).
    pub fn ks(mut self, f: impl FnOnce(fjall::Config) -> fjall::Config) -> Self {
        self.ks_config = f(self.ks_config);
        self
    }
}

impl Default for DbConfig {
    fn default() -> Self {
        Self {
            // 512 MiB block cache; unbounded write buffer (flushing is driven
            // by our own sync logic rather than fjall's write-buffer limit).
            ks_config: fjall::Config::default()
                .cache_size(1024 * 1024 * 512)
                .max_write_buffer_size(u64::MAX),
            min_block_size: 1000,
            max_block_size: 250_000,
            max_last_activity: Duration::from_secs(10),
        }
    }
}

// counts is nsid -> NsidCounts
// hits is tree per nsid: varint start time + varint end time -> block of hits
pub struct Db {
    pub cfg: DbConfig,
    pub ks: Keyspace,
    /// `_counts` partition: NSID -> rkyv-encoded [`NsidCounts`].
    counts: Partition,
    /// In-memory handle per NSID; each handle owns that lexicon's partition
    /// and its buffered (not-yet-synced) items.
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
    /// Pool used to perform the actual partition inserts during sync.
    sync_pool: threadpool::ThreadPool,
    /// Broadcasts (nsid, updated counts) to live subscribers; send errors are
    /// ignored when nobody is listening.
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    eps: RateTracker<100>, // 100 millis buckets
    cancel_token: CancellationToken,
}

impl Db {
    /// Opens the keyspace and the `_counts` partition.
    ///
    /// The `_counts` partition is uncompressed; hit partitions are created
    /// lazily per NSID (see `ensure_handle`). The sync pool is sized at
    /// 2x the rayon thread count since its work is I/O-bound inserts.
    pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = cfg.ks_config.clone().open()?;
        Ok(Self {
            cfg,
            hits: Default::default(),
            sync_pool: threadpool::Builder::new()
                .num_threads(rayon::current_num_threads() * 2)
                .build(),
            counts: ks.open_partition(
                "_counts",
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
            )?,
            ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: RateTracker::new(Duration::from_secs(1)),
            cancel_token,
        })
    }

    /// Future that resolves when shutdown has been requested.
    #[inline(always)]
    pub fn shutting_down(&self) -> impl Future<Output = ()> {
        self.cancel_token.cancelled()
    }

    /// Returns `true` once shutdown has been requested.
    #[inline(always)]
    pub fn is_shutting_down(&self) -> bool {
        self.cancel_token.is_cancelled()
    }

    /// Current ingest rate (events per second, truncated to an integer).
    #[inline(always)]
    pub fn eps(&self) -> usize {
        self.eps.rate() as usize
    }

    /// Subscribes to live (nsid, counts) updates emitted by `ingest_events`.
    #[inline(always)]
    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
        self.event_broadcaster.subscribe()
    }

    /// Flushes buffered items to disk as encoded blocks.
    ///
    /// With `all = false`, only lexicons that have accumulated at least one
    /// full block (or have been idle past `max_last_activity`) are flushed.
    /// With `all = true`, everything buffered is flushed. Encoding runs on
    /// rayon; the final partition inserts run on `sync_pool`, and this
    /// method blocks until the pool has drained.
    pub fn sync(&self, all: bool) -> AppResult<()> {
        let start = CLOCK.now();
        // prepare all the data
        // Phase 1 (under the EBR guard): decide, per lexicon, how many blocks
        // of which size to cut. No encoding happens here.
        let mut data = Vec::with_capacity(self.hits.len());
        let _guard = scc::ebr::Guard::new();
        for (_, handle) in self.hits.iter(&_guard) {
            let mut nsid_data = Vec::with_capacity(2);
            let mut total_count = 0;
            let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
            // if we disconnect for a long time, we want to sync all of what we
            // have to avoid having many small blocks (even if we run compaction
            // later, it reduces work until we run compaction)
            let block_size = (is_too_old || all)
                .then_some(self.cfg.max_block_size)
                .unwrap_or_else(|| {
                    // Otherwise: handle's suggestion, clamped to [min, max].
                    self.cfg
                        .max_block_size
                        .min(self.cfg.min_block_size.max(handle.suggested_block_size()))
                });
            let count = handle.item_count();
            let data_count = count / block_size;
            if count > 0 && (all || data_count > 0 || is_too_old) {
                for _ in 0..data_count {
                    nsid_data.push((handle.clone(), block_size));
                    total_count += block_size;
                }
                // only sync remainder if we haven't met block size
                let remainder = count % block_size;
                if (all || data_count == 0) && remainder > 0 {
                    nsid_data.push((handle.clone(), remainder));
                    total_count += remainder;
                }
            }
            let _span = handle.span().entered();
            if nsid_data.len() > 0 {
                tracing::info!(
                    {blocks = %nsid_data.len(), count = %total_count},
                    "will encode & sync",
                );
                data.push(nsid_data);
            }
        }
        // Drop the guard before the parallel phase so we don't pin the epoch
        // while encoding.
        drop(_guard);

        // process the blocks
        // Phase 2: take items out of each handle's buffer (sequentially per
        // lexicon, preserving order) and encode blocks in parallel.
        data.into_par_iter()
            .map(|chunk| {
                chunk
                    .into_iter()
                    .map(|(handle, max_block_size)| {
                        (handle.take_block_items(max_block_size), handle)
                    })
                    .collect::<Vec<_>>()
                    .into_par_iter()
                    .map(|(items, handle)| {
                        let count = items.len();
                        let block = LexiconHandle::encode_block_from_items(items, count)?;
                        AppResult::Ok((block, handle))
                    })
                    .collect::<Result<Vec<_>, _>>()
            })
            .try_for_each(|chunk| {
                let chunk = chunk?;
                // Phase 3: hand encoded blocks to the sync pool for the
                // actual partition inserts; insert failures are logged, not
                // propagated (the items were already taken from the buffer).
                for (block, handle) in chunk {
                    self.sync_pool.execute(move || {
                        let _span = handle.span().entered();
                        match handle.insert(block.key, block.data) {
                            Ok(_) => {
                                tracing::info!({count = %block.written}, "synced")
                            }
                            Err(err) => tracing::error!({ err = %err }, "failed to sync block"),
                        }
                    });
                }
                AppResult::Ok(())
            })?;
        // Wait for all queued inserts to finish before reporting completion.
        self.sync_pool.join();
        tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");

        Ok(())
    }

    /// Compacts one lexicon's stored blocks (see `LexiconHandle::compact`).
    /// A no-op if the NSID has no partition.
    pub fn compact(
        &self,
        nsid: impl AsRef<str>,
        max_count: usize,
        range: impl RangeBounds<u64>,
        sort: bool,
    ) -> AppResult<()> {
        let Some(handle) = self.get_handle(nsid) else {
            return Ok(());
        };
        handle.compact(max_count, range, sort)
    }

    /// Runs [`Db::compact`] over every known NSID, stopping at the first error.
    pub fn compact_all(
        &self,
        max_count: usize,
        range: impl RangeBounds<u64> + Clone,
        sort: bool,
    ) -> AppResult<()> {
        for nsid in self.get_nsids() {
            self.compact(nsid, max_count, range.clone(), sort)?;
        }
        Ok(())
    }

    /// Full compaction: every NSID, unbounded time range, max block size, sorted.
    pub fn major_compact(&self) -> AppResult<()> {
        self.compact_all(self.cfg.max_block_size, .., true)?;
        Ok(())
    }

    /// Looks up the cached handle for `nsid`, lazily creating one if the
    /// partition already exists on disk. Returns `None` for unknown NSIDs
    /// (never creates a partition).
    #[inline(always)]
    fn get_handle(&self, nsid: impl AsRef<str>) -> Option<Arc<LexiconHandle>> {
        let _guard = scc::ebr::Guard::new();
        let handle = match self.hits.peek(nsid.as_ref(), &_guard) {
            Some(handle) => handle.clone(),
            None => {
                if self.ks.partition_exists(nsid.as_ref()) {
                    let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
                    // A concurrent caller may have inserted first; ignoring the
                    // result means we may briefly use a handle that lost the
                    // race while the map keeps the winner.
                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
                    handle
                } else {
                    return None;
                }
            }
        };
        Some(handle)
    }

    /// Returns the handle for `nsid`, creating the handle (and its partition,
    /// via `LexiconHandle::new`) if it doesn't exist yet.
    #[inline(always)]
    fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
        self.hits
            .entry(nsid.clone())
            .or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
    }

    /// Ingests a stream of events, grouped into runs of consecutive equal
    /// NSIDs (`chunk_by`), updating counters and queueing hits per lexicon.
    ///
    /// NOTE(review): counts are read-modified-written per run without a lock;
    /// assumes a single ingest caller at a time — confirm at the call site.
    pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
        for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
            let mut counts = self.get_count(&key)?;
            let mut count = 0;
            // `inspect` updates the counters as the handle consumes the run.
            self.ensure_handle(&key).queue(chunk.inspect(|e| {
                // increment count
                counts.last_seen = e.timestamp;
                if e.deleted {
                    counts.deleted_count += 1;
                } else {
                    counts.count += 1;
                }
                count += 1;
            }));
            self.eps.observe(count);
            self.insert_count(&key, &counts)?;
            // Only pay for the send (and the clone semantics of the channel)
            // when someone is actually subscribed.
            if self.event_broadcaster.receiver_count() > 0 {
                let _ = self.event_broadcaster.send((key, counts));
            }
        }
        Ok(())
    }

    /// Persists `counts` for `nsid` into the `_counts` partition.
    #[inline(always)]
    fn insert_count(&self, nsid: &str, counts: &NsidCounts) -> AppResult<()> {
        self.counts
            .insert(
                nsid,
                // SAFETY: rkyv serialization of a plain #[derive(Archive)]
                // struct with no custom impls is assumed infallible here;
                // unwrap_unchecked elides the error branch.
                unsafe { rkyv::to_bytes::<Error>(counts).unwrap_unchecked() }.as_slice(),
            )
            .map_err(AppError::from)
    }

    /// Loads the counters for `nsid`, defaulting to zeros if absent.
    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
        let Some(raw) = self.counts.get(nsid)? else {
            return Ok(NsidCounts::default());
        };
        // SAFETY: bytes were produced by `insert_count` with the same rkyv
        // schema; no validation is performed on read.
        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
    }

    /// Iterates all (nsid, counts) pairs from the `_counts` partition.
    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
        self.counts.iter().map(|res| {
            res.map_err(AppError::from).map(|(key, val)| {
                (
                    // SAFETY: keys are written by `insert_count` from `&str`,
                    // so they are valid UTF-8.
                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
                    // SAFETY: values were written by `insert_count` with the
                    // same rkyv schema.
                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
                )
            })
        })
    }

    /// Lists every NSID partition (everything except the `_counts` bookkeeping
    /// partition).
    pub fn get_nsids(&self) -> impl Iterator<Item = StrView> {
        self.ks
            .list_partitions()
            .into_iter()
            .filter(|k| k.deref() != "_counts")
    }

    /// Builds a [`DbInfo`] snapshot: per-NSID block sizes (newest block
    /// first, via `rev()`) plus total disk usage.
    pub fn info(&self) -> AppResult<DbInfo> {
        let mut nsids = HashMap::new();
        for nsid in self.get_nsids() {
            let Some(handle) = self.get_handle(&nsid) else {
                continue;
            };
            let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| {
                let (key, value) = item?;
                // Block keys start with a varint-encoded start timestamp.
                let mut timestamps = Cursor::new(key);
                let start_timestamp = timestamps.read_varint()?;
                let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
                acc.push(decoder.item_count());
                AppResult::Ok(acc)
            })?;
            nsids.insert(nsid.to_smolstr(), block_lens);
        }
        Ok(DbInfo {
            nsids,
            disk_size: self.ks.disk_space(),
        })
    }

    /// Streams hits for `nsid` within `range`, newest block first, stopping
    /// after roughly `max_items` items (the block that crosses the limit is
    /// still yielded in full).
    ///
    /// The bounds are normalized to an inclusive `[start_limit, end_limit]`
    /// window; blocks are scanned in reverse key order up to `end_key`, and
    /// scanning stops at the first block that starts before `start_limit`.
    pub fn get_hits(
        &self,
        nsid: &str,
        range: impl RangeBounds<u64> + std::fmt::Debug,
        max_items: usize,
    ) -> impl Iterator<Item = AppResult<handle::Item>> {
        let start_limit = match range.start_bound().cloned() {
            Bound::Included(start) => start,
            Bound::Excluded(start) => start.saturating_add(1),
            Bound::Unbounded => 0,
        };
        let end_limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };
        let end_key = varints_unsigned_encoded([end_limit]);

        let Some(handle) = self.get_handle(nsid) else {
            // Unknown NSID: empty iterator with the same concrete type.
            return Either::Right(std::iter::empty());
        };

        // let mut ts = CLOCK.now();
        let mut current_item_count = 0;
        // Returns Ok(None) to signal "stop scanning" to map_while below.
        let map_block = move |(key, val)| {
            if current_item_count > max_items {
                return Ok(None);
            }
            let mut key_reader = Cursor::new(key);
            let start_timestamp = key_reader.read_varint::<u64>()?;
            // let end_timestamp = key_reader.read_varint::<u64>()?;
            if start_timestamp < start_limit {
                // tracing::info!(
                //     "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater"
                // );
                return Ok(None);
            }
            let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
            current_item_count += decoder.item_count();
            // tracing::info!(
            //     "took {}ns to get block with size {}",
            //     ts.elapsed().as_nanos(),
            //     decoder.item_count()
            // );
            // ts = CLOCK.now();
            Ok(Some(
                decoder
                    // Per-item filter: errors pass through so callers see them.
                    .take_while(move |item| {
                        item.as_ref().map_or(true, |item| {
                            item.timestamp <= end_limit && item.timestamp >= start_limit
                        })
                    })
                    .map(|res| res.map_err(AppError::from)),
            ))
        };

        Either::Left(
            handle
                .range(..end_key)
                .rev()
                .map_while(move |res| res.map_err(AppError::from).and_then(map_block).transpose())
                .flatten()
                .flatten(),
        )
    }

    /// Approximates when tracking started by reading the first block key of
    /// the `app.bsky.feed.like` partition (its varint start timestamp).
    /// Returns 0 if that partition doesn't exist or is empty.
    pub fn tracking_since(&self) -> AppResult<u64> {
        // HACK: we should actually store when we started tracking but im lazy
        // this should be accurate enough
        let Some(handle) = self.get_handle("app.bsky.feed.like") else {
            return Ok(0);
        };
        let Some((timestamps_raw, _)) = handle.first_key_value()? else {
            return Ok(0);
        };
        let mut timestamp_reader = Cursor::new(timestamps_raw);
        timestamp_reader
            .read_varint::<u64>()
            .map_err(AppError::from)
    }
}