tracks lexicons (NSIDs) and how many times they have appeared on the Jetstream
use std::{
    collections::HashMap,
    fmt::Debug,
    io::Cursor,
    ops::{Bound, Deref, RangeBounds},
    path::{Path, PathBuf},
    time::Duration,
};

use byteview::StrView;
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
use itertools::{Either, Itertools};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::{SmolStr, ToSmolStr};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

use crate::{
    db::handle::{ItemDecoder, LexiconHandle},
    error::{AppError, AppResult},
    jetstream::JetstreamEvent,
    utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded},
};

mod block;
mod handle;

#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    pub count: u128,
    pub deleted_count: u128,
    pub last_seen: u64,
}

#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    pub deleted: bool,
}

#[derive(Clone)]
pub struct EventRecord {
    pub nsid: SmolStr,
    pub timestamp: u64, // seconds
    pub deleted: bool,
}

impl EventRecord {
    pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
        match event {
            JetstreamEvent::Commit {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: false,
            }),
            JetstreamEvent::Delete {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: true,
            }),
            _ => None,
        }
    }
}

pub struct DbInfo {
    pub nsids: HashMap<SmolStr, Vec<usize>>,
    pub disk_size: u64,
}

pub struct DbConfig {
    pub ks_config: fjall::Config,
    pub min_block_size: usize,
    pub max_block_size: usize,
    pub max_last_activity: u64,
}

impl DbConfig {
    pub fn path(mut self, path: impl AsRef<Path>) -> Self {
        self.ks_config = fjall::Config::new(path);
        self
    }

    pub fn ks(mut self, f: impl FnOnce(fjall::Config) -> fjall::Config) -> Self {
        self.ks_config = f(self.ks_config);
        self
    }
}

impl Default for DbConfig {
    fn default() -> Self {
        Self {
            ks_config: fjall::Config::default(),
            min_block_size: 512,
            max_block_size: 500_000,
            max_last_activity: Duration::from_secs(10).as_nanos() as u64,
        }
    }
}

// counts is nsid -> NsidCounts
// hits is tree per nsid: varint start time + varint end time -> block of hits
pub struct Db {
    pub cfg: DbConfig,
    pub ks: Keyspace,
    counts: Partition,
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
    sync_pool: threadpool::ThreadPool,
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    eps: RateTracker<100>,
    cancel_token: CancellationToken,
}

impl Db {
    pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = cfg.ks_config.clone().open()?;
        Ok(Self {
            cfg,
            hits: Default::default(),
            sync_pool: threadpool::Builder::new()
                .num_threads(rayon::current_num_threads() * 2)
                .build(),
            counts: ks.open_partition(
                "_counts",
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
            )?,
            ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: RateTracker::new(Duration::from_secs(1)),
            cancel_token,
        })
    }

    #[inline(always)]
    pub fn shutting_down(&self) -> impl Future<Output = ()> {
        self.cancel_token.cancelled()
    }

    #[inline(always)]
    pub fn is_shutting_down(&self) -> bool {
        self.cancel_token.is_cancelled()
    }

    #[inline(always)]
    pub fn eps(&self) -> usize {
        self.eps.rate() as usize
    }

    #[inline(always)]
    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
        self.event_broadcaster.subscribe()
    }

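    /// Flushes queued events to disk as encoded blocks. When `all` is true,
    /// every queued item is written regardless of the block-size heuristics;
    /// otherwise only full blocks (and handles idle past `max_last_activity`)
    /// are flushed.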
    pub fn sync(&self, all: bool) -> AppResult<()> {
        // prepare all the data
        let mut data = Vec::with_capacity(self.hits.len());
        let _guard = scc::ebr::Guard::new();
        for (_, handle) in self.hits.iter(&_guard) {
            let mut nsid_data = Vec::with_capacity(2);
            let mut total_count = 0;
            let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
            // if we disconnect for a long time, we want to sync all of what we
            // have to avoid having many small blocks (even if we run compaction
            // later, it reduces work until we run compaction)
            let block_size = (is_too_old || all)
                .then_some(self.cfg.max_block_size)
                .unwrap_or_else(|| {
                    self.cfg
                        .max_block_size
                        .min(self.cfg.min_block_size.max(handle.suggested_block_size()))
                });
            let count = handle.item_count();
            let data_count = count / block_size;
            if count > 0 && (all || data_count > 0 || is_too_old) {
                for i in 0..data_count {
                    nsid_data.push((i, handle.clone(), block_size));
                    total_count += block_size;
                }
                // only sync the remainder if we haven't met the block size
                let remainder = count % block_size;
                if (all || data_count == 0) && remainder > 0 {
                    nsid_data.push((data_count, handle.clone(), remainder));
                    total_count += remainder;
                }
            }
            tracing::info!(
                "{}: will sync {} blocks ({} count)",
                handle.nsid(),
                nsid_data.len(),
                total_count,
            );
            data.push(nsid_data);
        }
        drop(_guard);

        // process the blocks
        data.into_par_iter()
            .map(|chunk| {
                chunk
                    .into_iter()
                    .map(|(i, handle, max_block_size)| {
                        (i, handle.take_block_items(max_block_size), handle)
                    })
                    .collect::<Vec<_>>()
                    .into_par_iter()
                    .map(|(i, items, handle)| {
                        let count = items.len();
                        let block = LexiconHandle::encode_block_from_items(items, count)?;
                        tracing::info!(
                            "{}: encoded block with {} items",
                            handle.nsid(),
                            block.written,
                        );
                        AppResult::Ok((i, block, handle))
                    })
                    .collect::<Result<Vec<_>, _>>()
            })
            .try_for_each(|chunk| {
                let chunk = chunk?;
                for (i, block, handle) in chunk {
                    self.sync_pool
                        .execute(move || match handle.insert(block.key, block.data) {
                            Ok(_) => {
                                tracing::info!("{}: [{i}] synced {}", handle.nsid(), block.written)
                            }
                            Err(err) => tracing::error!("failed to sync block: {}", err),
                        });
                }
                AppResult::Ok(())
            })?;
        self.sync_pool.join();

        Ok(())
    }

    pub fn compact(
        &self,
        nsid: impl AsRef<str>,
        max_count: usize,
        range: impl RangeBounds<u64>,
        sort: bool,
    ) -> AppResult<()> {
        let Some(handle) = self.get_handle(nsid) else {
            return Ok(());
        };
        handle.compact(max_count, range, sort)
    }

    pub fn compact_all(
        &self,
        max_count: usize,
        range: impl RangeBounds<u64> + Clone,
        sort: bool,
    ) -> AppResult<()> {
        for nsid in self.get_nsids() {
            self.compact(nsid, max_count, range.clone(), sort)?;
        }
        Ok(())
    }

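    /// Merges and sorts blocks up to `max_block_size`, then runs the
    /// underlying store's major compaction for every lexicon partition.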
    pub fn major_compact(&self) -> AppResult<()> {
        self.compact_all(self.cfg.max_block_size, .., true)?;
        let _guard = scc::ebr::Guard::new();
        for (_, handle) in self.hits.iter(&_guard) {
            handle.deref().major_compact()?;
        }
        Ok(())
    }

    #[inline(always)]
    fn get_handle(&self, nsid: impl AsRef<str>) -> Option<Arc<LexiconHandle>> {
        let _guard = scc::ebr::Guard::new();
        let handle = match self.hits.peek(nsid.as_ref(), &_guard) {
            Some(handle) => handle.clone(),
            None => {
                if self.ks.partition_exists(nsid.as_ref()) {
                    let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
                    handle
                } else {
                    return None;
                }
            }
        };
        Some(handle)
    }

    #[inline(always)]
    fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
        self.hits
            .entry(nsid.clone())
            .or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
    }

    pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
        for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
            let mut counts = self.get_count(&key)?;
            let mut count = 0;
            self.ensure_handle(&key).queue(chunk.inspect(|e| {
                // update the running counters for this NSID
                counts.last_seen = e.timestamp;
                if e.deleted {
                    counts.deleted_count += 1;
                } else {
                    counts.count += 1;
                }
                count += 1;
            }));
            self.eps.observe(count);
            self.insert_count(&key, &counts)?;
            if self.event_broadcaster.receiver_count() > 0 {
                let _ = self.event_broadcaster.send((key, counts));
            }
        }
        Ok(())
    }

    #[inline(always)]
    fn insert_count(&self, nsid: &str, counts: &NsidCounts) -> AppResult<()> {
        self.counts
            .insert(
                nsid,
                unsafe { rkyv::to_bytes::<Error>(counts).unwrap_unchecked() }.as_slice(),
            )
            .map_err(AppError::from)
    }

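    /// Returns the running counters for an NSID, or zeroed counts if the
    /// NSID has never been seen.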
    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
        let Some(raw) = self.counts.get(nsid)? else {
            return Ok(NsidCounts::default());
        };
        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
    }

    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
        self.counts.iter().map(|res| {
            res.map_err(AppError::from).map(|(key, val)| {
                (
                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
                )
            })
        })
    }

    pub fn get_nsids(&self) -> impl Iterator<Item = StrView> {
        self.ks
            .list_partitions()
            .into_iter()
            .filter(|k| k.deref() != "_counts")
    }

    pub fn info(&self) -> AppResult<DbInfo> {
        let mut nsids = HashMap::new();
        for nsid in self.get_nsids() {
            let Some(handle) = self.get_handle(&nsid) else {
                continue;
            };
            let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| {
                let (key, value) = item?;
                let mut timestamps = Cursor::new(key);
                let start_timestamp = timestamps.read_varint()?;
                let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
                acc.push(decoder.item_count());
                AppResult::Ok(acc)
            })?;
            nsids.insert(nsid.to_smolstr(), block_lens);
        }
        Ok(DbInfo {
            nsids,
            disk_size: self.ks.disk_space(),
        })
    }

    pub fn get_hits(
        &self,
        nsid: &str,
        range: impl RangeBounds<u64> + std::fmt::Debug,
    ) -> impl Iterator<Item = AppResult<handle::Item>> {
        let start_limit = match range.start_bound().cloned() {
            Bound::Included(start) => start,
            Bound::Excluded(start) => start.saturating_add(1),
            Bound::Unbounded => 0,
        };
        let end_limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };
        let end_key = varints_unsigned_encoded([end_limit]);

        let Some(handle) = self.get_handle(nsid) else {
            return Either::Right(std::iter::empty());
        };

        let map_block = move |(key, val)| {
            let mut key_reader = Cursor::new(key);
            let start_timestamp = key_reader.read_varint::<u64>()?;
            if start_timestamp < start_limit {
                return Ok(None);
            }
            let items = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?
                .take_while(move |item| {
                    item.as_ref().map_or(true, |item| {
                        item.timestamp <= end_limit && item.timestamp >= start_limit
                    })
                })
                .map(|res| res.map_err(AppError::from));
            Ok(Some(items))
        };

        Either::Left(
            handle
                .range(..end_key)
                .rev()
                .map_while(move |res| res.map_err(AppError::from).and_then(map_block).transpose())
                .collect::<Vec<_>>()
                .into_iter()
                .rev()
                .flatten()
                .flatten(),
        )
    }

    pub fn tracking_since(&self) -> AppResult<u64> {
        // HACK: we should actually store when we started tracking but i'm lazy;
        // this should be accurate enough
        let Some(handle) = self.get_handle("app.bsky.feed.like") else {
            return Ok(0);
        };
        let Some((timestamps_raw, _)) = handle.first_key_value()? else {
            return Ok(0);
        };
        let mut timestamp_reader = Cursor::new(timestamps_raw);
        timestamp_reader
            .read_varint::<u64>()
            .map_err(AppError::from)
    }
}
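
For reference, a minimal usage sketch under the definitions above. The `ingest_batch` function and its `raw_events` input are hypothetical glue; `DbConfig::default().path(..)`, `Db::new`, `EventRecord::from_jetstream`, `ingest_events`, and `sync` all come from this file:

// hypothetical glue: `raw_events` stands in for whatever batch of
// JetstreamEvents the caller has collected
fn ingest_batch(raw_events: Vec<JetstreamEvent>) -> AppResult<()> {
    // open (or create) the keyspace at ./data
    let db = Db::new(
        DbConfig::default().path("./data"),
        CancellationToken::new(),
    )?;

    // keep only commit/delete events and convert them to EventRecords
    let records = raw_events.into_iter().filter_map(EventRecord::from_jetstream);

    // queue the records per NSID and update the running counters
    db.ingest_events(records)?;

    // force-flush everything queued into encoded blocks on disk
    db.sync(true)?;
    Ok(())
}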