Tracks lexicons and how many times they appeared on the Jetstream.
1use ordered_varint::Variable;
2use rkyv::{
3 Archive, Deserialize, Serialize,
4 api::high::{HighSerializer, HighValidator},
5 bytecheck::CheckBytes,
6 de::Pool,
7 rancor::{self, Strategy},
8 ser::allocator::ArenaHandle,
9 util::AlignedVec,
10};
11use std::{
12 io::{self, Read, Write},
13 marker::PhantomData,
14};
15
16use crate::error::{AppError, AppResult};
17
18pub struct Item<T> {
19 pub timestamp: u64,
20 data: AlignedVec,
21 phantom: PhantomData<T>,
22}
23
24impl<T: Archive> Item<T> {
25 pub fn access(&self) -> &T::Archived {
26 unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) }
27 }
28}
29
30impl<T> Item<T>
31where
32 T: Archive,
33 T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
34 + Deserialize<T, Strategy<Pool, rancor::Error>>,
35{
36 pub fn deser(&self) -> AppResult<T> {
37 rkyv::from_bytes(&self.data).map_err(AppError::from)
38 }
39}
40
41impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
42 pub fn new(timestamp: u64, data: &T) -> Self {
43 Item {
44 timestamp,
45 data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
46 phantom: PhantomData,
47 }
48 }
49}
50
51pub struct ItemEncoder<W: Write, T> {
52 writer: W,
53 prev_timestamp: u64,
54 prev_delta: i64,
55 _item: PhantomData<T>,
56}
57
58impl<W: Write, T> ItemEncoder<W, T> {
59 pub fn new(writer: W) -> Self {
60 ItemEncoder {
61 writer,
62 prev_timestamp: 0,
63 prev_delta: 0,
64 _item: PhantomData,
65 }
66 }
67
68 pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> {
69 if self.prev_timestamp == 0 {
70 // self.writer.write_varint(item.timestamp)?;
71 self.prev_timestamp = item.timestamp;
72 self.write_data(&item.data)?;
73 return Ok(());
74 }
75
76 let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
77
78 self.writer.write_varint(delta - self.prev_delta)?;
79 self.prev_timestamp = item.timestamp;
80 self.prev_delta = delta;
81
82 self.write_data(&item.data)?;
83
84 Ok(())
85 }
86
87 fn write_data(&mut self, data: &[u8]) -> AppResult<()> {
88 self.writer.write_varint(data.len())?;
89 self.writer.write_all(data)?;
90 Ok(())
91 }
92
93 pub fn finish(mut self) -> AppResult<W> {
94 self.writer.flush()?;
95 Ok(self.writer)
96 }
97}
98
/// Streaming decoder for the byte stream produced by [`ItemEncoder`].
///
/// Also usable as an [`Iterator`] yielding `AppResult<Item<T>>`.
pub struct ItemDecoder<R, T> {
    /// Source of encoded bytes.
    reader: R,
    /// `true` until the first item, which carries no timestamp delta, has been read.
    first_item: bool,
    /// Timestamp of the most recently decoded item. Initially the caller-supplied start.
    current_timestamp: u64,
    /// Running delta between consecutive timestamps.
    current_delta: i64,
    _item: PhantomData<T>,
}
106
107impl<R: Read, T: Archive> ItemDecoder<R, T> {
108 pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> {
109 Ok(ItemDecoder {
110 reader,
111 current_timestamp: start_timestamp,
112 current_delta: 0,
113 first_item: true,
114 _item: PhantomData,
115 })
116 }
117
118 pub fn decode(&mut self) -> AppResult<Option<Item<T>>> {
119 if self.first_item {
120 // read the first timestamp
121 // let timestamp = match self.reader.read_varint::<u64>() {
122 // Ok(timestamp) => timestamp,
123 // Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
124 // Err(e) => return Err(e.into()),
125 // };
126 // self.current_timestamp = timestamp;
127
128 let Some(data_raw) = self.read_item()? else {
129 return Ok(None);
130 };
131 self.first_item = false;
132 return Ok(Some(Item {
133 timestamp: self.current_timestamp,
134 data: data_raw,
135 phantom: PhantomData,
136 }));
137 }
138
139 let Some(_delta) = self.read_timestamp()? else {
140 return Ok(None);
141 };
142
143 // read data
144 let data_raw = match self.read_item()? {
145 Some(data_raw) => data_raw,
146 None => {
147 return Err(io::Error::new(
148 io::ErrorKind::UnexpectedEof,
149 "expected data after delta",
150 )
151 .into());
152 }
153 };
154
155 Ok(Some(Item {
156 timestamp: self.current_timestamp,
157 data: data_raw,
158 phantom: PhantomData,
159 }))
160 }
161
162 // [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1]
163 fn read_timestamp(&mut self) -> AppResult<Option<u64>> {
164 let delta = match self.reader.read_varint::<i64>() {
165 Ok(delta) => delta,
166 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
167 Err(e) => return Err(e.into()),
168 };
169 self.current_delta += delta;
170 self.current_timestamp =
171 (self.current_timestamp as i128 + self.current_delta as i128) as u64;
172 Ok(Some(self.current_timestamp))
173 }
174
175 fn read_item(&mut self) -> AppResult<Option<AlignedVec>> {
176 let data_len = match self.reader.read_varint::<usize>() {
177 Ok(data_len) => data_len,
178 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
179 Err(e) => return Err(e.into()),
180 };
181 let mut data_raw = AlignedVec::with_capacity(data_len);
182 for _ in 0..data_len {
183 data_raw.push(0);
184 }
185 self.reader.read_exact(data_raw.as_mut_slice())?;
186 Ok(Some(data_raw))
187 }
188}
189
190impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
191 type Item = AppResult<Item<T>>;
192
193 fn next(&mut self) -> Option<Self::Item> {
194 self.decode().transpose()
195 }
196}
197
198pub trait WriteVariableExt: Write {
199 fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
200 value.encode_variable(self)
201 }
202}
203impl<W: Write> WriteVariableExt for W {}
204
205pub trait ReadVariableExt: Read {
206 fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
207 T::decode_variable(self)
208 }
209}
210impl<R: Read> ReadVariableExt for R {}
211
#[cfg(test)]
mod test {
    use super::*;
    use rkyv::{Archive, Deserialize, Serialize};
    use std::io::Cursor;

    #[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
    #[rkyv(compare(PartialEq))]
    struct TestData {
        id: u32,
        value: String,
    }

    /// Builds an `Item` wrapping `TestData { id, value }` at `timestamp`.
    fn make_item(timestamp: u64, id: u32, value: &str) -> Item<TestData> {
        Item::new(
            timestamp,
            &TestData {
                id,
                value: value.to_string(),
            },
        )
    }

    /// Encodes `items` in order and returns the produced byte stream.
    fn encode_all(items: &[Item<TestData>]) -> Vec<u8> {
        let mut buffer = Vec::new();
        let mut encoder = ItemEncoder::new(&mut buffer);
        for item in items {
            encoder.encode(item).unwrap();
        }
        encoder.finish().unwrap();
        buffer
    }

    /// Creates a decoder over `buffer` seeded with `start_timestamp`.
    fn decoder_for(
        buffer: Vec<u8>,
        start_timestamp: u64,
    ) -> ItemDecoder<Cursor<Vec<u8>>, TestData> {
        ItemDecoder::new(Cursor::new(buffer), start_timestamp).unwrap()
    }

    /// Decodes the whole stream through the `Iterator` impl, failing on any error.
    fn decode_all(buffer: Vec<u8>, start_timestamp: u64) -> Vec<Item<TestData>> {
        decoder_for(buffer, start_timestamp)
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    }

    #[test]
    fn test_encoder_decoder_single_item() {
        let buffer = encode_all(&[make_item(1000, 123, "test")]);
        let mut decoder = decoder_for(buffer, 1000);

        let decoded_item = decoder.decode().unwrap().unwrap();
        assert_eq!(decoded_item.timestamp, 1000);

        let decoded_data = decoded_item.access();
        assert_eq!(decoded_data.id, 123);
        assert_eq!(decoded_data.value.as_str(), "test");

        // The stream holds exactly one item.
        assert!(decoder.decode().unwrap().is_none());
    }

    #[test]
    fn test_encoder_decoder_multiple_items() {
        let items = [
            make_item(1000, 1, "first"),
            make_item(1010, 2, "second"),
            make_item(1015, 3, "third"),
            make_item(1025, 4, "fourth"),
        ];
        let mut decoder = decoder_for(encode_all(&items), 1000);

        // Exercise `decode` directly rather than the iterator.
        let mut decoded_items = Vec::new();
        while let Some(item) = decoder.decode().unwrap() {
            decoded_items.push(item);
        }

        assert_eq!(decoded_items.len(), 4);
        for (original, decoded) in items.iter().zip(decoded_items.iter()) {
            assert_eq!(original.timestamp, decoded.timestamp);
            assert_eq!(original.access().id, decoded.access().id);
            assert_eq!(
                original.access().value.as_str(),
                decoded.access().value.as_str()
            );
        }
    }

    #[test]
    fn test_encoder_decoder_with_iterator() {
        let items = [
            make_item(2000, 10, "a"),
            make_item(2005, 20, "b"),
            make_item(2012, 30, "c"),
        ];
        let decoded_items = decode_all(encode_all(&items), 2000);

        assert_eq!(decoded_items.len(), 3);
        assert_eq!(decoded_items[0].timestamp, 2000);
        assert_eq!(decoded_items[1].timestamp, 2005);
        assert_eq!(decoded_items[2].timestamp, 2012);

        assert_eq!(decoded_items[0].access().id, 10);
        assert_eq!(decoded_items[1].access().id, 20);
        assert_eq!(decoded_items[2].access().id, 30);
    }

    #[test]
    fn test_delta_compression() {
        let items = [
            make_item(1000, 1, "a"),
            make_item(1010, 2, "b"), // delta = 10
            make_item(1020, 3, "c"), // delta = 10, delta-of-delta = 0
            make_item(1025, 4, "d"), // delta = 5, delta-of-delta = -5
        ];
        let decoded_items = decode_all(encode_all(&items), 1000);

        // Without this, `zip` would silently pass if items went missing.
        assert_eq!(decoded_items.len(), items.len());
        for (original, decoded) in items.iter().zip(decoded_items.iter()) {
            assert_eq!(original.timestamp, decoded.timestamp);
            assert_eq!(original.access().id, decoded.access().id);
        }
    }

    #[test]
    fn test_empty_decode() {
        let mut decoder = decoder_for(Vec::new(), 1000);
        assert!(decoder.decode().unwrap().is_none());
    }

    #[test]
    fn test_backwards_timestamp() {
        let items = [make_item(1000, 1, "first"), make_item(900, 2, "second")];
        let decoded_items = decode_all(encode_all(&items), 1000);

        assert_eq!(decoded_items.len(), 2);
        assert_eq!(decoded_items[0].timestamp, 1000);
        assert_eq!(decoded_items[1].timestamp, 900);
    }

    #[test]
    fn test_different_data_sizes() {
        let items = [
            make_item(1000, 1, "x"),
            make_item(1001, 2, &"a".repeat(1000)),
        ];
        let decoded_items = decode_all(encode_all(&items), 1000);

        assert_eq!(decoded_items.len(), 2);
        assert_eq!(decoded_items[0].access().value.as_str(), "x");
        assert_eq!(decoded_items[1].access().value.len(), 1000);
        assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000));
    }
}