Tracks lexicons (NSIDs) and how many times each has appeared on the Jetstream.
at migrate 13 kB view raw
1use std::{ 2 io::Cursor, 3 ops::{Bound, Deref, RangeBounds}, 4 path::Path, 5 sync::{ 6 Arc, 7 atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering}, 8 }, 9 time::{Duration, Instant}, 10}; 11 12use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice}; 13use ordered_varint::Variable; 14use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 15use smol_str::SmolStr; 16use tokio::sync::broadcast; 17 18use crate::{ 19 db_old::block::{ReadVariableExt, WriteVariableExt}, 20 error::{AppError, AppResult}, 21 jetstream::JetstreamEvent, 22 utils::{DefaultRateTracker, get_time}, 23}; 24 25mod block; 26 27#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 28#[rkyv(compare(PartialEq), derive(Debug))] 29pub struct NsidCounts { 30 pub count: u128, 31 pub deleted_count: u128, 32 pub last_seen: u64, 33} 34 35#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 36#[rkyv(compare(PartialEq), derive(Debug))] 37pub struct NsidHit { 38 pub deleted: bool, 39} 40 41#[derive(Clone)] 42pub struct EventRecord { 43 pub nsid: SmolStr, 44 pub timestamp: u64, // seconds 45 pub deleted: bool, 46} 47 48impl EventRecord { 49 pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> { 50 match event { 51 JetstreamEvent::Commit { 52 time_us, commit, .. 53 } => Some(Self { 54 nsid: commit.collection.into(), 55 timestamp: time_us / 1_000_000, 56 deleted: false, 57 }), 58 JetstreamEvent::Delete { 59 time_us, commit, .. 
60 } => Some(Self { 61 nsid: commit.collection.into(), 62 timestamp: time_us / 1_000_000, 63 deleted: true, 64 }), 65 _ => None, 66 } 67 } 68} 69 70type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>; 71type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>; 72type Item = block::Item<NsidHit>; 73 74pub struct LexiconHandle { 75 tree: Partition, 76 buf: Arc<scc::Queue<EventRecord>>, 77 buf_len: AtomicUsize, 78 last_insert: AtomicU64, 79 eps: DefaultRateTracker, 80 block_size: AtomicUsize, 81} 82 83impl LexiconHandle { 84 fn new(keyspace: &Keyspace, nsid: &str) -> Self { 85 let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9)); 86 Self { 87 tree: keyspace.open_partition(nsid, opts).unwrap(), 88 buf: Default::default(), 89 buf_len: AtomicUsize::new(0), 90 last_insert: AtomicU64::new(0), 91 eps: DefaultRateTracker::new(Duration::from_secs(5)), 92 block_size: AtomicUsize::new(1000), 93 } 94 } 95 96 fn item_count(&self) -> usize { 97 self.buf_len.load(AtomicOrdering::Acquire) 98 } 99 100 fn last_insert(&self) -> u64 { 101 self.last_insert.load(AtomicOrdering::Acquire) 102 } 103 104 fn suggested_block_size(&self) -> usize { 105 self.block_size.load(AtomicOrdering::Relaxed) 106 } 107 108 fn insert(&self, event: EventRecord) { 109 self.buf.push(event); 110 self.buf_len.fetch_add(1, AtomicOrdering::Release); 111 self.last_insert 112 .store(get_time().as_millis() as u64, AtomicOrdering::Release); 113 self.eps.observe(1); 114 let rate = self.eps.rate() as usize; 115 if rate != 0 { 116 self.block_size.store(rate * 60, AtomicOrdering::Relaxed); 117 } 118 } 119 120 fn sync(&self, max_block_size: usize) -> AppResult<usize> { 121 let mut writer = ItemEncoder::new(Vec::with_capacity( 122 size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(), 123 )); 124 let mut start_timestamp = None; 125 let mut end_timestamp = None; 126 let mut written = 0_usize; 127 while let Some(event) = self.buf.pop() { 128 let 
item = Item::new( 129 event.timestamp, 130 &NsidHit { 131 deleted: event.deleted, 132 }, 133 ); 134 writer.encode(&item)?; 135 if start_timestamp.is_none() { 136 start_timestamp = Some(event.timestamp); 137 } 138 end_timestamp = Some(event.timestamp); 139 if written >= max_block_size { 140 break; 141 } 142 written += 1; 143 } 144 if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) { 145 self.buf_len.store(0, AtomicOrdering::Release); 146 let value = writer.finish()?; 147 let mut key = Vec::with_capacity(size_of::<u64>() * 2); 148 key.write_varint(start_timestamp)?; 149 key.write_varint(end_timestamp)?; 150 self.tree.insert(key, value)?; 151 } 152 Ok(written) 153 } 154} 155 156type BoxedIter<T> = Box<dyn Iterator<Item = T>>; 157 158// counts is nsid -> NsidCounts 159// hits is tree per nsid: varint start time + varint end time -> block of hits 160pub struct Db { 161 inner: Keyspace, 162 hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>, 163 counts: Partition, 164 event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 165 eps: DefaultRateTracker, 166 min_block_size: usize, 167 max_block_size: usize, 168 max_last_activity: Duration, 169} 170 171impl Db { 172 pub fn new(path: impl AsRef<Path>) -> AppResult<Self> { 173 tracing::info!("opening db..."); 174 let ks = Config::new(path) 175 .cache_size(8 * 1024 * 1024) // from talna 176 .open()?; 177 Ok(Self { 178 hits: Default::default(), 179 counts: ks.open_partition( 180 "_counts", 181 PartitionCreateOptions::default().compression(fjall::CompressionType::None), 182 )?, 183 inner: ks, 184 event_broadcaster: broadcast::channel(1000).0, 185 eps: DefaultRateTracker::new(Duration::from_secs(1)), 186 min_block_size: 512, 187 max_block_size: 100_000, 188 max_last_activity: Duration::from_secs(10), 189 }) 190 } 191 192 pub fn sync(&self, all: bool) -> AppResult<()> { 193 let _guard = scc::ebr::Guard::new(); 194 for (nsid, tree) in self.hits.iter(&_guard) { 195 let count = 
tree.item_count(); 196 let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size()); 197 let is_too_old = (get_time().as_millis() as u64 - tree.last_insert()) 198 > self.max_last_activity.as_millis() as u64; 199 if count > 0 && (all || is_max_block_size || is_too_old) { 200 loop { 201 let synced = tree.sync(self.max_block_size)?; 202 if synced == 0 { 203 break; 204 } 205 tracing::info!("synced {synced} of {nsid} to db"); 206 } 207 } 208 } 209 Ok(()) 210 } 211 212 #[inline(always)] 213 pub fn eps(&self) -> usize { 214 self.eps.rate() as usize 215 } 216 217 #[inline(always)] 218 pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> { 219 self.event_broadcaster.subscribe() 220 } 221 222 #[inline(always)] 223 fn maybe_run_in_nsid_tree<T>( 224 &self, 225 nsid: &str, 226 f: impl FnOnce(&LexiconHandle) -> T, 227 ) -> Option<T> { 228 let _guard = scc::ebr::Guard::new(); 229 let handle = match self.hits.peek(nsid, &_guard) { 230 Some(handle) => handle.clone(), 231 None => { 232 if self.inner.partition_exists(nsid) { 233 let handle = Arc::new(LexiconHandle::new(&self.inner, nsid)); 234 let _ = self.hits.insert(SmolStr::new(nsid), handle.clone()); 235 handle 236 } else { 237 return None; 238 } 239 } 240 }; 241 Some(f(&handle)) 242 } 243 244 #[inline(always)] 245 fn run_in_nsid_tree<T>( 246 &self, 247 nsid: SmolStr, 248 f: impl FnOnce(&LexiconHandle) -> AppResult<T>, 249 ) -> AppResult<T> { 250 f(self 251 .hits 252 .entry(nsid.clone()) 253 .or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid))) 254 .get()) 255 } 256 257 pub fn record_event(&self, e: EventRecord) -> AppResult<()> { 258 let EventRecord { 259 nsid, 260 timestamp, 261 deleted, 262 } = e.clone(); 263 264 // insert event 265 self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?; 266 // increment count 267 let mut counts = self.get_count(&nsid)?; 268 counts.last_seen = timestamp; 269 if deleted { 270 counts.deleted_count += 1; 271 } else { 
272 counts.count += 1; 273 } 274 self.insert_count(&nsid, counts.clone())?; 275 if self.event_broadcaster.receiver_count() > 0 { 276 let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts)); 277 } 278 self.eps.observe(1); 279 Ok(()) 280 } 281 282 #[inline(always)] 283 fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> { 284 self.counts 285 .insert( 286 nsid, 287 unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(), 288 ) 289 .map_err(AppError::from) 290 } 291 292 pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> { 293 let Some(raw) = self.counts.get(nsid)? else { 294 return Ok(NsidCounts::default()); 295 }; 296 Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() }) 297 } 298 299 pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> { 300 self.counts.iter().map(|res| { 301 res.map_err(AppError::from).map(|(key, val)| { 302 ( 303 SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }), 304 unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 305 ) 306 }) 307 }) 308 } 309 310 pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> { 311 self.inner 312 .list_partitions() 313 .into_iter() 314 .filter(|k| k.deref() != "_counts") 315 } 316 317 pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> { 318 self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> { 319 Box::new( 320 handle 321 .tree 322 .iter() 323 .rev() 324 .map(|res| res.map_err(AppError::from)), 325 ) 326 }) 327 .unwrap_or_else(|| Box::new(std::iter::empty())) 328 } 329 330 pub fn get_hits( 331 &self, 332 nsid: &str, 333 range: impl RangeBounds<u64> + std::fmt::Debug, 334 ) -> BoxedIter<AppResult<Item>> { 335 let start = range 336 .start_bound() 337 .cloned() 338 .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() }); 339 let end = range 340 .end_bound() 341 .cloned() 342 .map(|t| 
unsafe { t.to_variable_vec().unwrap_unchecked() }); 343 let limit = match range.end_bound().cloned() { 344 Bound::Included(end) => end, 345 Bound::Excluded(end) => end.saturating_sub(1), 346 Bound::Unbounded => u64::MAX, 347 }; 348 349 self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> { 350 let map_block = move |(key, val)| { 351 let mut key_reader = Cursor::new(key); 352 let start_timestamp = key_reader.read_varint::<u64>()?; 353 let items = 354 ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| { 355 item.as_ref().map_or(true, |item| item.timestamp <= limit) 356 }); 357 Ok(items) 358 }; 359 360 Box::new( 361 handle 362 .tree 363 .range(TimestampRange { start, end }) 364 .map(move |res| res.map_err(AppError::from).and_then(map_block)) 365 .flatten() 366 .flatten(), 367 ) 368 }) 369 .unwrap_or_else(|| Box::new(std::iter::empty())) 370 } 371 372 pub fn tracking_since(&self) -> AppResult<u64> { 373 // HACK: we should actually store when we started tracking but im lazy 374 // should be accurate enough 375 self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| { 376 let Some((timestamps_raw, _)) = handle.tree.first_key_value()? 
else { 377 return Ok(0); 378 }; 379 let mut timestamp_reader = Cursor::new(timestamps_raw); 380 timestamp_reader 381 .read_varint::<u64>() 382 .map_err(AppError::from) 383 }) 384 .unwrap_or(Ok(0)) 385 } 386} 387 388type TimestampRepr = Vec<u8>; 389 390struct TimestampRange { 391 start: Bound<TimestampRepr>, 392 end: Bound<TimestampRepr>, 393} 394 395impl RangeBounds<TimestampRepr> for TimestampRange { 396 #[inline(always)] 397 fn start_bound(&self) -> Bound<&TimestampRepr> { 398 self.start.as_ref() 399 } 400 401 #[inline(always)] 402 fn end_bound(&self) -> Bound<&TimestampRepr> { 403 self.end.as_ref() 404 } 405} 406 407type TimestampReprOld = [u8; 8]; 408 409struct TimestampRangeOld { 410 start: Bound<TimestampReprOld>, 411 end: Bound<TimestampReprOld>, 412} 413 414impl RangeBounds<TimestampReprOld> for TimestampRangeOld { 415 #[inline(always)] 416 fn start_bound(&self) -> Bound<&TimestampReprOld> { 417 self.start.as_ref() 418 } 419 420 #[inline(always)] 421 fn end_bound(&self) -> Bound<&TimestampReprOld> { 422 self.end.as_ref() 423 } 424}