//! Tracks lexicons and how many times they appeared on the jetstream.
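//!
//! Layout (per the code below): one fjall partition per NSID; each value is an
//! `ItemEncoder`-encoded run of `NsidHit` items, keyed by
//! `varint(start_timestamp) ++ varint(end_timestamp)`; partitions use 128 KiB
//! blocks with Miniz(9) compression.
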
use std::{
    fmt::Debug,
    io::Cursor,
    ops::{Bound, Deref, RangeBounds},
    sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
    time::Duration,
};

use byteview::ByteView;
use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice};
use itertools::Itertools;
use parking_lot::Mutex;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use smol_str::SmolStr;

use crate::{
    db::{EventRecord, NsidHit, block},
    error::AppResult,
    utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded},
};

pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
pub type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
pub type Item = block::Item<NsidHit>;

/// A fully encoded block ready to be persisted: `key` is the varint-encoded
/// (start, end) timestamp pair and `data` the encoded items.
pub struct Block {
    pub written: usize,
    pub key: ByteView,
    pub data: Vec<u8>,
}

/// Per-NSID handle: wraps the NSID's fjall partition plus an in-memory buffer
/// of events that have not yet been flushed into a block.
pub struct LexiconHandle {
    tree: Partition,
    nsid: SmolStr,
    buf: Arc<Mutex<Vec<EventRecord>>>,
    last_insert: AtomicU64, // raw CLOCK instant; relaxed ordering
    eps: DefaultRateTracker,
}

impl Debug for LexiconHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LexiconHandle")
            .field("nsid", self.nsid())
            .finish()
    }
}

impl Deref for LexiconHandle {
    type Target = Partition;

    fn deref(&self) -> &Self::Target {
        &self.tree
    }
}

impl LexiconHandle {
    pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
        let opts = PartitionCreateOptions::default()
            .block_size(1024 * 128)
            .compression(fjall::CompressionType::Miniz(9));
        Self {
            tree: keyspace.open_partition(nsid, opts).unwrap(),
            nsid: nsid.into(),
            buf: Default::default(),
            last_insert: AtomicU64::new(0),
            eps: RateTracker::new(Duration::from_secs(10)),
        }
    }

    pub fn nsid(&self) -> &SmolStr {
        &self.nsid
    }

    pub fn item_count(&self) -> usize {
        self.buf.lock().len()
    }

    pub fn since_last_activity(&self) -> u64 {
        CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw())
    }

    /// Roughly one minute's worth of events at the currently observed rate.
    pub fn suggested_block_size(&self) -> usize {
        self.eps.rate() as usize * 60
    }

    pub fn queue(&self, events: impl IntoIterator<Item = EventRecord>) {
        // Count while extending so the rate tracker sees how many events landed.
        let mut count = 0;
        self.buf.lock().extend(events.into_iter().inspect(|_| {
            count += 1;
        }));
        self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
        self.eps.observe(count);
    }

    /// Re-chunks every block whose key falls within `range` into new blocks of
    /// `compact_to` items each, optionally re-sorting items by timestamp.
    pub fn compact(
        &self,
        compact_to: usize,
        range: impl RangeBounds<u64>,
        sort: bool,
    ) -> AppResult<()> {
        // Normalize the bounds into an inclusive [start_limit, end_limit] window.
        let start_limit = match range.start_bound().cloned() {
            Bound::Included(start) => start,
            Bound::Excluded(start) => start.saturating_add(1),
            Bound::Unbounded => 0,
        };
        let end_limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };

        let start_key = varints_unsigned_encoded([start_limit]);
        let end_key = varints_unsigned_encoded([end_limit]);

        let blocks_to_compact = self
            .tree
            .range(start_key..end_key)
            .collect::<Result<Vec<_>, _>>()?;
        if blocks_to_compact.len() < 2 {
            tracing::info!("{}: nothing to compact", self.nsid);
            return Ok(());
        }

        let start_blocks_size = blocks_to_compact.len();
        let keys_to_delete = blocks_to_compact.iter().map(|(key, _)| key);
        // Decode every block in range into one flat list of items.
        let mut all_items =
            blocks_to_compact
                .iter()
                .try_fold(Vec::new(), |mut acc, (key, value)| {
                    let mut timestamps = Cursor::new(key);
                    let start_timestamp = timestamps.read_varint()?;
                    let decoder = block::ItemDecoder::new(Cursor::new(value), start_timestamp)?;
                    let mut items = decoder.collect::<Result<Vec<_>, _>>()?;
                    acc.append(&mut items);
                    AppResult::Ok(acc)
                })?;

        if sort {
            all_items.sort_unstable_by_key(|e| e.timestamp);
        }

        // Re-encode fixed-size chunks in parallel.
        let new_blocks = all_items
            .into_iter()
            .chunks(compact_to)
            .into_iter()
            .map(|chunk| chunk.collect_vec())
            .collect_vec()
            .into_par_iter()
            .map(|chunk| {
                let count = chunk.len();
                Self::encode_block_from_items(chunk, count)
            })
            .collect::<Result<Vec<_>, _>>()?;
        let end_blocks_size = new_blocks.len();

        // Swap the old blocks for the newly encoded ones.
        for key in keys_to_delete {
            self.tree.remove(key.clone())?;
        }
        for block in new_blocks {
            self.tree.insert(block.key, block.data)?;
        }

        tracing::info!(
            "{}: compacted {} blocks to {} blocks ({}% reduction)",
            self.nsid,
            start_blocks_size,
            end_blocks_size,
            // Subtract as f64: re-chunking can *increase* the block count, and
            // usize subtraction would underflow.
            ((start_blocks_size as f64 - end_blocks_size as f64) / start_blocks_size as f64)
                * 100.0,
        );

        Ok(())
    }

    /// Encodes exactly `count` items into a block keyed by the first and last
    /// item timestamps; errors if `items` yields fewer than `count`.
    pub fn encode_block_from_items(
        items: impl IntoIterator<Item = Item>,
        count: usize,
    ) -> AppResult<Block> {
        if count == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "no items requested",
            )
            .into());
        }
        let mut writer =
            ItemEncoder::new(Vec::with_capacity(ItemEncoder::encoded_len(count)), count);
        let mut start_timestamp = None;
        let mut end_timestamp = None;
        let mut written = 0_usize;
        for item in items.into_iter().take(count) {
            writer.encode(&item)?;
            if start_timestamp.is_none() {
                start_timestamp = Some(item.timestamp);
            }
            end_timestamp = Some(item.timestamp);
            written += 1;
        }
        if written != count {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "unexpected number of items, invalid data?",
            )
            .into());
        }
        if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
            let value = writer.finish()?;
            let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
            return Ok(Block {
                written,
                key,
                data: value,
            });
        }
        Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into())
    }

    /// Drains up to `item_count` buffered events and converts each into an `Item`.
    pub fn take_block_items(&self, item_count: usize) -> Vec<Item> {
        let mut buf = self.buf.lock();
        let end = item_count.min(buf.len());
        buf.drain(..end)
            .map(|event| {
                Item::new(
                    event.timestamp,
                    &NsidHit {
                        deleted: event.deleted,
                    },
                )
            })
            .collect()
    }
}
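
// A minimal sketch of the intended write path, assuming a `Keyspace` is
// already open and that a jetstream consumer supplies `EventRecord`s.
// `flush_one_block` is a hypothetical helper, not part of this module: it
// drains roughly one minute of buffered events, encodes them, and persists
// the block under its `varint(start_ts) ++ varint(end_ts)` key.
#[allow(dead_code)]
fn flush_one_block(handle: &LexiconHandle) -> AppResult<usize> {
    // Never ask for zero items: `encode_block_from_items` rejects count == 0.
    let want = handle.suggested_block_size().max(1);
    let items = handle.take_block_items(want);
    if items.is_empty() {
        // Nothing buffered; skip the flush rather than encode an empty block.
        return Ok(0);
    }
    let count = items.len();
    let block = LexiconHandle::encode_block_from_items(items, count)?;
    // `insert` resolves through `Deref<Target = Partition>`.
    handle.insert(block.key, block.data)?;
    Ok(block.written)
}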