//! Tracks lexicons and how many times they appeared on the jetstream.
1use std::{
2 fmt::Debug,
3 io::Cursor,
4 ops::{Bound, RangeBounds},
5 sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
6 time::Duration,
7};
8
9use byteview::ByteView;
10use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice, Snapshot};
11use itertools::Itertools;
12use parking_lot::Mutex;
13use rayon::iter::{IntoParallelIterator, ParallelIterator};
14use rclite::Arc;
15use smol_str::SmolStr;
16
17use crate::{
18 db::{EventRecord, NsidHit, block},
19 error::{AppError, AppResult},
20 utils::{
21 ArcRefCnt, ArcliteSwap, CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt,
22 varints_unsigned_encoded,
23 },
24};
25
/// Decoder for `NsidHit` items read back from a block's byte payload.
pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
/// Encoder that serializes `NsidHit` items into an in-memory block buffer.
pub type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
/// A single timestamped `NsidHit` entry inside a block.
pub type Item = block::Item<NsidHit>;
29
/// A fully encoded block, ready to be written to a partition.
pub struct Block {
    // Number of items encoded into `data`.
    pub written: usize,
    // Partition key: `varint(first_ts) || varint(last_ts)` as produced by
    // `LexiconHandle::encode_block_from_items`.
    pub key: ByteView,
    // Encoded item payload.
    pub data: Vec<u8>,
}
35
/// Per-lexicon storage handle: buffers incoming events in memory and persists
/// them as encoded blocks in a dedicated fjall partition (named by the NSID).
pub struct LexiconHandle {
    // Partition all writes go to.
    write_tree: Partition,
    // Swappable snapshot served to readers; refreshed via `update_tree`.
    read_tree: ArcliteSwap<Snapshot>,
    // The lexicon NSID this handle serves (also the partition name).
    nsid: SmolStr,
    // In-memory queue of events awaiting encoding into a block.
    buf: Arc<Mutex<Vec<EventRecord>>>,
    // `CLOCK.raw()` at the most recent `queue` call; a monotonic-ish hint,
    // so relaxed ordering suffices.
    last_insert: AtomicU64, // relaxed
    // Events-per-second tracker (10 s window) used to size blocks.
    eps: DefaultRateTracker,
}
44
45impl Debug for LexiconHandle {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("LexiconHandle")
48 .field("nsid", self.nsid())
49 .finish()
50 }
51}
52
53impl LexiconHandle {
54 pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
55 let opts = PartitionCreateOptions::default()
56 .block_size(1024 * 48)
57 .compression(fjall::CompressionType::Miniz(9));
58 let write_tree = keyspace.open_partition(nsid, opts).unwrap();
59 let read_tree = ArcliteSwap::new(ArcRefCnt::new(write_tree.snapshot()));
60 Self {
61 write_tree,
62 read_tree,
63 nsid: nsid.into(),
64 buf: Default::default(),
65 last_insert: AtomicU64::new(0),
66 eps: RateTracker::new(Duration::from_secs(10)),
67 }
68 }
69
70 #[inline(always)]
71 pub fn read(&self) -> arc_swap::Guard<ArcRefCnt<Snapshot>> {
72 self.read_tree.load()
73 }
74
75 #[inline(always)]
76 pub fn update_tree(&self) {
77 self.read_tree
78 .store(ArcRefCnt::new(self.write_tree.snapshot()));
79 }
80
81 #[inline(always)]
82 pub fn span(&self) -> tracing::Span {
83 tracing::info_span!("handle", nsid = %self.nsid)
84 }
85
86 #[inline(always)]
87 pub fn nsid(&self) -> &SmolStr {
88 &self.nsid
89 }
90
91 #[inline(always)]
92 pub fn item_count(&self) -> usize {
93 self.buf.lock().len()
94 }
95
96 pub fn since_last_activity(&self) -> Duration {
97 Duration::from_nanos(
98 CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw()),
99 )
100 }
101
102 pub fn suggested_block_size(&self) -> usize {
103 self.eps.rate() as usize * 60
104 }
105
106 pub fn queue(&self, events: impl IntoIterator<Item = EventRecord>) {
107 let mut count = 0;
108 self.buf.lock().extend(events.into_iter().inspect(|_| {
109 count += 1;
110 }));
111 self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
112 self.eps.observe(count);
113 }
114
115 pub fn compact(
116 &self,
117 compact_to: usize,
118 range: impl RangeBounds<u64>,
119 sort: bool,
120 ) -> AppResult<()> {
121 let _span = self.span().entered();
122
123 let start_limit = match range.start_bound().cloned() {
124 Bound::Included(start) => start,
125 Bound::Excluded(start) => start.saturating_add(1),
126 Bound::Unbounded => 0,
127 };
128 let end_limit = match range.end_bound().cloned() {
129 Bound::Included(end) => end,
130 Bound::Excluded(end) => end.saturating_sub(1),
131 Bound::Unbounded => u64::MAX,
132 };
133
134 let start_key = varints_unsigned_encoded([start_limit]);
135 let end_key = varints_unsigned_encoded([end_limit]);
136
137 let blocks_to_compact = self
138 .read()
139 .range(start_key..end_key)
140 .collect::<Result<Vec<_>, _>>()?;
141 if blocks_to_compact.len() < 2 {
142 return Ok(());
143 }
144
145 let start_blocks_size = blocks_to_compact.len();
146 let keys_to_delete = blocks_to_compact.iter().map(|(key, _)| key);
147 let mut all_items =
148 blocks_to_compact
149 .iter()
150 .try_fold(Vec::new(), |mut acc, (key, value)| {
151 let mut timestamps = Cursor::new(key);
152 let start_timestamp = timestamps.read_varint()?;
153 let decoder = block::ItemDecoder::new(Cursor::new(value), start_timestamp)?;
154 let mut items = decoder.collect::<Result<Vec<_>, _>>()?;
155 acc.append(&mut items);
156 AppResult::Ok(acc)
157 })?;
158
159 if sort {
160 all_items.sort_unstable_by_key(|e| e.timestamp);
161 }
162
163 let new_blocks = all_items
164 .into_iter()
165 .chunks(compact_to)
166 .into_iter()
167 .map(|chunk| chunk.collect_vec())
168 .collect_vec()
169 .into_par_iter()
170 .map(|chunk| {
171 let count = chunk.len();
172 Self::encode_block_from_items(chunk, count)
173 })
174 .collect::<Result<Vec<_>, _>>()?;
175 let end_blocks_size = new_blocks.len();
176
177 for key in keys_to_delete {
178 self.write_tree.remove(key.clone())?;
179 }
180 for block in new_blocks {
181 self.write_tree.insert(block.key, block.data)?;
182 }
183
184 let reduction =
185 ((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0;
186 tracing::info!(
187 {
188 start = start_blocks_size,
189 end = end_blocks_size,
190 },
191 "blocks compacted {reduction:.2}%",
192 );
193
194 Ok(())
195 }
196
197 pub fn insert_block(&self, block: Block) -> AppResult<()> {
198 self.write_tree
199 .insert(block.key, block.data)
200 .map_err(AppError::from)
201 }
202
203 pub fn encode_block_from_items(
204 items: impl IntoIterator<Item = Item>,
205 count: usize,
206 ) -> AppResult<Block> {
207 if count == 0 {
208 return Err(std::io::Error::new(
209 std::io::ErrorKind::InvalidInput,
210 "no items requested",
211 )
212 .into());
213 }
214 let mut writer =
215 ItemEncoder::new(Vec::with_capacity(ItemEncoder::encoded_len(count)), count);
216 let mut start_timestamp = None;
217 let mut end_timestamp = None;
218 let mut written = 0_usize;
219 for item in items.into_iter().take(count) {
220 writer.encode(&item)?;
221 if start_timestamp.is_none() {
222 start_timestamp = Some(item.timestamp);
223 }
224 end_timestamp = Some(item.timestamp);
225 written += 1;
226 }
227 if written != count {
228 return Err(std::io::Error::new(
229 std::io::ErrorKind::InvalidData,
230 "unexpected number of items, invalid data?",
231 )
232 .into());
233 }
234 if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
235 let value = writer.finish()?;
236 let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
237 return Ok(Block {
238 written,
239 key,
240 data: value,
241 });
242 }
243 Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into())
244 }
245
246 pub fn take_block_items(&self, item_count: usize) -> Vec<Item> {
247 let mut buf = self.buf.lock();
248 let end = item_count.min(buf.len());
249 buf.drain(..end)
250 .map(|event| {
251 Item::new(
252 event.timestamp,
253 &NsidHit {
254 deleted: event.deleted,
255 },
256 )
257 })
258 .collect()
259 }
260}