//! Tracks lexicons and how many times they appeared on the Jetstream.
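//!
//! A rough sketch of the intended write path (the call sites here are
//! hypothetical; the real driver lives elsewhere in the crate):
//!
//! ```ignore
//! // Assumes `keyspace` is an open fjall Keyspace and `events` is a batch
//! // of decoded EventRecords for this NSID.
//! let handle = LexiconHandle::new(&keyspace, "app.bsky.feed.post");
//! handle.queue(events);
//!
//! // Later: drain roughly a minute's worth of events into one block.
//! let items = handle.take_block_items(handle.suggested_block_size());
//! let count = items.len();
//! if count > 0 {
//!     let block = LexiconHandle::encode_block_from_items(items, count)?;
//!     // `LexiconHandle` derefs to the underlying fjall `Partition`.
//!     handle.insert(block.key, block.data)?;
//! }
//! ```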
use std::{
    fmt::Debug,
    io::Cursor,
    ops::{Bound, Deref, RangeBounds},
    sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
    time::Duration,
};

use byteview::ByteView;
use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice};
use itertools::Itertools;
use parking_lot::Mutex;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use smol_str::SmolStr;

use crate::{
    db::{EventRecord, NsidHit, block},
    error::AppResult,
    utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded},
};

pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
pub type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
pub type Item = block::Item<NsidHit>;

/// A fully encoded block of items, ready to be written to a partition.
pub struct Block {
    /// Number of items encoded into `data`.
    pub written: usize,
    /// Partition key: the block's start and end timestamps, varint-encoded.
    pub key: ByteView,
    /// Encoded block payload.
    pub data: Vec<u8>,
}

/// Handle for a single lexicon (NSID): wraps its fjall partition and buffers
/// incoming events in memory until they are flushed as encoded blocks.
pub struct LexiconHandle {
    tree: Partition,
    nsid: SmolStr,
    buf: Arc<Mutex<Vec<EventRecord>>>,
    last_insert: AtomicU64, // raw CLOCK reading; relaxed ordering suffices
    eps: DefaultRateTracker,
}

impl Debug for LexiconHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LexiconHandle")
            .field("nsid", self.nsid())
            .finish()
    }
}

impl Deref for LexiconHandle {
    type Target = Partition;

    fn deref(&self) -> &Self::Target {
        &self.tree
    }
}

impl LexiconHandle {
    pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
        // 128 KiB disk blocks, compressed with miniz at level 9.
        let opts = PartitionCreateOptions::default()
            .block_size(1024 * 128)
            .compression(fjall::CompressionType::Miniz(9));
        Self {
            tree: keyspace.open_partition(nsid, opts).unwrap(),
            nsid: nsid.into(),
            buf: Default::default(),
            last_insert: AtomicU64::new(0),
            eps: RateTracker::new(Duration::from_secs(10)),
        }
    }

    pub fn nsid(&self) -> &SmolStr {
        &self.nsid
    }

    /// Number of events currently buffered in memory.
    pub fn item_count(&self) -> usize {
        self.buf.lock().len()
    }

    /// Nanoseconds elapsed since the last call to `queue`.
    pub fn since_last_activity(&self) -> u64 {
        CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw())
    }

    /// Suggested number of items per block: roughly one minute's worth of
    /// events at the currently observed ingest rate.
    pub fn suggested_block_size(&self) -> usize {
        self.eps.rate() as usize * 60
    }

    /// Buffer events in memory and update the activity clock and rate tracker.
    pub fn queue(&self, events: impl IntoIterator<Item = EventRecord>) {
        let mut count = 0;
        self.buf.lock().extend(events.into_iter().inspect(|_| {
            count += 1;
        }));
        self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
        self.eps.observe(count);
    }

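    /// Merge all blocks whose start timestamps fall in `range` into new
    /// blocks of up to `compact_to` items each. With `sort`, items are
    /// re-sorted by timestamp before re-chunking.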
    pub fn compact(
        &self,
        compact_to: usize,
        range: impl RangeBounds<u64>,
        sort: bool,
    ) -> AppResult<()> {
        let start_limit = match range.start_bound().cloned() {
            Bound::Included(start) => start,
            Bound::Excluded(start) => start.saturating_add(1),
            Bound::Unbounded => 0,
        };
        // `end_key` is used as an exclusive upper bound below, so convert the
        // requested end bound into the first start timestamp to exclude.
        let end_limit = match range.end_bound().cloned() {
            Bound::Included(end) => end.saturating_add(1),
            Bound::Excluded(end) => end,
            Bound::Unbounded => u64::MAX,
        };

        let start_key = varints_unsigned_encoded([start_limit]);
        let end_key = varints_unsigned_encoded([end_limit]);

        let blocks_to_compact = self
            .tree
            .range(start_key..end_key)
            .collect::<Result<Vec<_>, _>>()?;
        if blocks_to_compact.len() < 2 {
            tracing::info!("{}: nothing to compact", self.nsid);
            return Ok(());
        }

        let start_blocks_size = blocks_to_compact.len();
        let keys_to_delete = blocks_to_compact.iter().map(|(key, _)| key);
        // Decode every block in range into one flat item list. Each key
        // begins with the block's start timestamp, which the decoder takes
        // as its timestamp base.
        let mut all_items =
            blocks_to_compact
                .iter()
                .try_fold(Vec::new(), |mut acc, (key, value)| {
                    let mut timestamps = Cursor::new(key);
                    let start_timestamp = timestamps.read_varint()?;
                    let decoder = block::ItemDecoder::new(Cursor::new(value), start_timestamp)?;
                    let mut items = decoder.collect::<Result<Vec<_>, _>>()?;
                    acc.append(&mut items);
                    AppResult::Ok(acc)
                })?;

        if sort {
            all_items.sort_unstable_by_key(|e| e.timestamp);
        }

        // Re-chunk the flat list and re-encode each chunk in parallel.
        let new_blocks = all_items
            .into_iter()
            .chunks(compact_to)
            .into_iter()
            .map(|chunk| chunk.collect_vec())
            .collect_vec()
            .into_par_iter()
            .map(|chunk| {
                let count = chunk.len();
                Self::encode_block_from_items(chunk, count)
            })
            .collect::<Result<Vec<_>, _>>()?;
        let end_blocks_size = new_blocks.len();

        // The removals and inserts below are individual operations, not an
        // atomic batch; a crash in between can drop data for this range.
        for key in keys_to_delete {
            self.tree.remove(key.clone())?;
        }
        for block in new_blocks {
            self.tree.insert(block.key, block.data)?;
        }

        tracing::info!(
            "{}: compacted {} blocks to {} blocks ({:.1}% reduction)",
            self.nsid,
            start_blocks_size,
            end_blocks_size,
            // Computed in f64 so a (pathological) growth in block count
            // cannot underflow.
            (start_blocks_size as f64 - end_blocks_size as f64) / start_blocks_size as f64
                * 100.0,
        );

        Ok(())
    }

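    /// Encode exactly `count` items into a block keyed by the varint-encoded
    /// start and end timestamps of its contents. Errors if `items` yields
    /// fewer than `count` items.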
    pub fn encode_block_from_items(
        items: impl IntoIterator<Item = Item>,
        count: usize,
    ) -> AppResult<Block> {
        if count == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "no items requested",
            )
            .into());
        }
        let mut writer =
            ItemEncoder::new(Vec::with_capacity(ItemEncoder::encoded_len(count)), count);
        let mut start_timestamp = None;
        let mut end_timestamp = None;
        let mut written = 0_usize;
        for item in items.into_iter().take(count) {
            writer.encode(&item)?;
            if start_timestamp.is_none() {
                start_timestamp = Some(item.timestamp);
            }
            end_timestamp = Some(item.timestamp);
            written += 1;
        }
        if written != count {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "unexpected number of items, invalid data?",
            )
            .into());
        }
        if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
            let value = writer.finish()?;
            // Key the block by its (start, end) timestamp pair so range
            // scans can find it by start timestamp.
            let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
            return Ok(Block {
                written,
                key,
                data: value,
            });
        }
        Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into())
    }

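    /// Drain up to `item_count` buffered events and convert them into
    /// encodable items.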
    pub fn take_block_items(&self, item_count: usize) -> Vec<Item> {
        let mut buf = self.buf.lock();
        let end = item_count.min(buf.len());
        buf.drain(..end)
            .map(|event| {
                Item::new(
                    event.timestamp,
                    &NsidHit {
                        deleted: event.deleted,
                    },
                )
            })
            .collect()
    }
}