Tracks lexicons and how many times they have appeared on the Jetstream.
use std::{
    fmt::Debug,
    io::Cursor,
    ops::{Bound, RangeBounds},
    sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
    time::Duration,
};

use byteview::ByteView;
use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice, Snapshot};
use itertools::Itertools;
use parking_lot::Mutex;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use smol_str::SmolStr;

use crate::{
    db::{EventRecord, NsidHit, block},
    error::{AppError, AppResult},
    utils::{
        ArcRefCnt, ArcliteSwap, CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt,
        varints_unsigned_encoded,
    },
};

pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
pub type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
pub type Item = block::Item<NsidHit>;

pub struct Block {
    pub written: usize,
    pub key: ByteView,
    pub data: Vec<u8>,
}

pub struct LexiconHandle {
    write_tree: Partition,
    read_tree: ArcliteSwap<Snapshot>,
    nsid: SmolStr,
    buf: Arc<Mutex<Vec<EventRecord>>>,
    last_insert: AtomicU64, // relaxed
    eps: DefaultRateTracker,
}

impl Debug for LexiconHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LexiconHandle")
            .field("nsid", self.nsid())
            .finish()
    }
}

impl LexiconHandle {
    pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
        let opts = PartitionCreateOptions::default()
            .block_size(1024 * 48)
            .compression(fjall::CompressionType::Miniz(9));
        let write_tree = keyspace.open_partition(nsid, opts).unwrap();
        let read_tree = ArcliteSwap::new(ArcRefCnt::new(write_tree.snapshot()));
        Self {
            write_tree,
            read_tree,
            nsid: nsid.into(),
            buf: Default::default(),
            last_insert: AtomicU64::new(0),
            eps: RateTracker::new(Duration::from_secs(10)),
        }
    }

    #[inline(always)]
    pub fn read(&self) -> arc_swap::Guard<ArcRefCnt<Snapshot>> {
        self.read_tree.load()
    }

    #[inline(always)]
    pub fn update_tree(&self) {
        self.read_tree
            .store(ArcRefCnt::new(self.write_tree.snapshot()));
    }

    #[inline(always)]
    pub fn span(&self) -> tracing::Span {
        tracing::info_span!("handle", nsid = %self.nsid)
    }

    #[inline(always)]
    pub fn nsid(&self) -> &SmolStr {
        &self.nsid
    }

    #[inline(always)]
    pub fn item_count(&self) -> usize {
        self.buf.lock().len()
    }

    pub fn since_last_activity(&self) -> Duration {
        Duration::from_nanos(
            CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw()),
        )
    }

    pub fn suggested_block_size(&self) -> usize {
        self.eps.rate() as usize * 60
    }

    pub fn queue(&self, events: impl IntoIterator<Item = EventRecord>) {
        let mut count = 0;
        self.buf.lock().extend(events.into_iter().inspect(|_| {
            count += 1;
        }));
        self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
        self.eps.observe(count);
    }

    pub fn compact(
        &self,
        compact_to: usize,
        range: impl RangeBounds<u64>,
        sort: bool,
    ) -> AppResult<()> {
        let _span = self.span().entered();

        let start_limit = match range.start_bound().cloned() {
            Bound::Included(start) => start,
            Bound::Excluded(start) => start.saturating_add(1),
            Bound::Unbounded => 0,
        };
        let end_limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };

        let start_key = varints_unsigned_encoded([start_limit]);
        let end_key = varints_unsigned_encoded([end_limit]);

        let blocks_to_compact = self
            .read()
            .range(start_key..end_key)
            .collect::<Result<Vec<_>, _>>()?;
        if blocks_to_compact.len() < 2 {
            return Ok(());
        }

        let start_blocks_size = blocks_to_compact.len();
        let keys_to_delete = blocks_to_compact.iter().map(|(key, _)| key);
        let mut all_items =
            blocks_to_compact
                .iter()
                .try_fold(Vec::new(), |mut acc, (key, value)| {
                    let mut timestamps = Cursor::new(key);
                    let start_timestamp = timestamps.read_varint()?;
                    let decoder = block::ItemDecoder::new(Cursor::new(value), start_timestamp)?;
                    let mut items = decoder.collect::<Result<Vec<_>, _>>()?;
                    acc.append(&mut items);
                    AppResult::Ok(acc)
                })?;

        if sort {
            all_items.sort_unstable_by_key(|e| e.timestamp);
        }

        let new_blocks = all_items
            .into_iter()
            .chunks(compact_to)
            .into_iter()
            .map(|chunk| chunk.collect_vec())
            .collect_vec()
            .into_par_iter()
            .map(|chunk| {
                let count = chunk.len();
                Self::encode_block_from_items(chunk, count)
            })
            .collect::<Result<Vec<_>, _>>()?;
        let end_blocks_size = new_blocks.len();

        for key in keys_to_delete {
            self.write_tree.remove(key.clone())?;
        }
        for block in new_blocks {
            self.write_tree.insert(block.key, block.data)?;
        }

        let reduction =
            ((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0;
        tracing::info!(
            {
                start = start_blocks_size,
                end = end_blocks_size,
            },
            "blocks compacted {reduction:.2}%",
        );

        Ok(())
    }

    pub fn insert_block(&self, block: Block) -> AppResult<()> {
        self.write_tree
            .insert(block.key, block.data)
            .map_err(AppError::from)
    }

    pub fn encode_block_from_items(
        items: impl IntoIterator<Item = Item>,
        count: usize,
    ) -> AppResult<Block> {
        if count == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "no items requested",
            )
            .into());
        }
        let mut writer =
            ItemEncoder::new(Vec::with_capacity(ItemEncoder::encoded_len(count)), count);
        let mut start_timestamp = None;
        let mut end_timestamp = None;
        let mut written = 0_usize;
        for item in items.into_iter().take(count) {
            writer.encode(&item)?;
            if start_timestamp.is_none() {
                start_timestamp = Some(item.timestamp);
            }
            end_timestamp = Some(item.timestamp);
            written += 1;
        }
        if written != count {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "unexpected number of items, invalid data?",
            )
            .into());
        }
        if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
            let value = writer.finish()?;
            let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
            return Ok(Block {
                written,
                key,
                data: value,
            });
        }
        Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into())
    }

    pub fn take_block_items(&self, item_count: usize) -> Vec<Item> {
        let mut buf = self.buf.lock();
        let end = item_count.min(buf.len());
        buf.drain(..end)
            .map(|event| {
                Item::new(
                    event.timestamp,
                    &NsidHit {
                        deleted: event.deleted,
                    },
                )
            })
            .collect()
    }
}
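A minimal sketch of how a caller might drive this handle from the ingest side, using only the API shown above; the `flush_handle` name and the drain policy are assumptions for illustration, not part of this file:

fn flush_handle(handle: &LexiconHandle) -> AppResult<()> {
    // Drain roughly a minute's worth of events (per suggested_block_size) from
    // the in-memory buffer; take at least one so low-traffic lexicons still flush.
    let items = handle.take_block_items(handle.suggested_block_size().max(1));
    if items.is_empty() {
        return Ok(());
    }

    // Encode the drained items into one block, keyed by its
    // [start_timestamp, end_timestamp] varint pair, and persist it.
    let count = items.len();
    let block = LexiconHandle::encode_block_from_items(items, count)?;
    handle.insert_block(block)?;

    // Refresh the read snapshot so readers observe the newly written block.
    handle.update_tree();
    Ok(())
}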