// Tracks lexicons and how many times they appeared on the jetstream.
1use std::{
2 io::{self, Read, Write},
3 marker::PhantomData,
4 usize,
5};
6
7use rkyv::{
8 Archive, Deserialize, Serialize,
9 api::high::{HighSerializer, HighValidator},
10 bytecheck::CheckBytes,
11 de::Pool,
12 rancor::{self, Strategy},
13 ser::allocator::ArenaHandle,
14 util::AlignedVec,
15};
16
17use crate::{
18 error::{AppError, AppResult},
19 utils::{ReadVariableExt, WriteVariableExt},
20};
21
22pub struct Item<T> {
23 pub timestamp: u64,
24 pub data: AlignedVec,
25 phantom: PhantomData<T>,
26}
27
28impl<T> Item<T>
29where
30 T: Archive,
31 T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
32 + Deserialize<T, Strategy<Pool, rancor::Error>>,
33{
34 pub fn deser(&self) -> AppResult<T> {
35 rkyv::from_bytes(&self.data).map_err(AppError::from)
36 }
37}
38
39impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
40 pub fn new(timestamp: u64, data: &T) -> Self {
41 Item {
42 timestamp,
43 data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
44 phantom: PhantomData,
45 }
46 }
47}
48
49pub struct ItemEncoder<W: Write, T> {
50 writer: W,
51 prev_timestamp: u64,
52 prev_delta: i64,
53 item_count: usize,
54 _item: PhantomData<T>,
55}
56
57impl<W: Write, T> ItemEncoder<W, T> {
58 pub fn new(writer: W, item_count: usize) -> Self {
59 assert!(item_count > 0);
60 ItemEncoder {
61 writer,
62 prev_timestamp: 0,
63 prev_delta: 0,
64 item_count,
65 _item: PhantomData,
66 }
67 }
68
69 /// NOTE: this is a best effort estimate of the encoded length of the block.
70 /// if T contains variable-length data, the encoded length may be larger than this estimate.
71 pub fn encoded_len(item_count: usize) -> usize {
72 // items length + item count * delta length + data length
73 size_of::<usize>() + item_count * size_of::<(i64, T)>()
74 }
75
76 pub fn encode(&mut self, item: &Item<T>) -> io::Result<()> {
77 if self.prev_timestamp == 0 {
78 self.writer.write_varint(self.item_count)?;
79 // self.writer.write_varint(item.timestamp)?;
80 self.prev_timestamp = item.timestamp;
81 self.write_data(&item.data)?;
82 return Ok(());
83 }
84
85 let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
86
87 self.writer.write_varint(delta - self.prev_delta)?;
88 self.prev_timestamp = item.timestamp;
89 self.prev_delta = delta;
90
91 self.write_data(&item.data)?;
92
93 Ok(())
94 }
95
96 fn write_data(&mut self, data: &[u8]) -> io::Result<()> {
97 self.writer.write_varint(data.len())?;
98 self.writer.write_all(data)?;
99 Ok(())
100 }
101
102 pub fn finish(mut self) -> io::Result<W> {
103 self.writer.flush()?;
104 Ok(self.writer)
105 }
106}
107
108pub struct ItemDecoder<R, T> {
109 reader: R,
110 current_timestamp: u64,
111 current_delta: i64,
112 items_read: usize,
113 expected: usize,
114 _item: PhantomData<T>,
115}
116
117impl<R: Read, T: Archive> ItemDecoder<R, T> {
118 pub fn new(mut reader: R, start_timestamp: u64) -> io::Result<Self> {
119 let expected = match reader.read_varint() {
120 Ok(expected) => expected,
121 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => 0,
122 Err(e) => return Err(e.into()),
123 };
124
125 Ok(ItemDecoder {
126 reader,
127 current_timestamp: start_timestamp,
128 current_delta: 0,
129 items_read: 0,
130 expected,
131 _item: PhantomData,
132 })
133 }
134
135 pub fn item_count(&self) -> usize {
136 self.expected
137 }
138
139 pub fn decode(&mut self) -> io::Result<Option<Item<T>>> {
140 if self.items_read == 0 {
141 // read the first timestamp
142 // let timestamp = match self.reader.read_varint::<u64>() {
143 // Ok(timestamp) => timestamp,
144 // Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
145 // Err(e) => return Err(e.into()),
146 // };
147 // self.current_timestamp = timestamp;
148
149 let Some(data_raw) = self.read_item()? else {
150 return Ok(None);
151 };
152
153 self.items_read += 1;
154 return Ok(Some(Item {
155 timestamp: self.current_timestamp,
156 data: data_raw,
157 phantom: PhantomData,
158 }));
159 }
160
161 if self.items_read >= self.expected {
162 return Ok(None);
163 }
164
165 let Some(_delta) = self.read_timestamp()? else {
166 return Ok(None);
167 };
168
169 // read data
170 let data_raw = match self.read_item()? {
171 Some(data_raw) => data_raw,
172 None => {
173 return Err(io::Error::new(
174 io::ErrorKind::UnexpectedEof,
175 "expected data after delta",
176 )
177 .into());
178 }
179 };
180
181 self.items_read += 1;
182 Ok(Some(Item {
183 timestamp: self.current_timestamp,
184 data: data_raw,
185 phantom: PhantomData,
186 }))
187 }
188
189 // [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1]
190 fn read_timestamp(&mut self) -> io::Result<Option<u64>> {
191 let delta = match self.reader.read_varint::<i64>() {
192 Ok(delta) => delta,
193 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
194 Err(e) => return Err(e.into()),
195 };
196 self.current_delta += delta;
197 self.current_timestamp =
198 (self.current_timestamp as i128 + self.current_delta as i128) as u64;
199 Ok(Some(self.current_timestamp))
200 }
201
202 fn read_item(&mut self) -> io::Result<Option<AlignedVec>> {
203 let data_len = match self.reader.read_varint::<usize>() {
204 Ok(data_len) => data_len,
205 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
206 Err(e) => return Err(e.into()),
207 };
208 let mut data_raw = AlignedVec::with_capacity(data_len);
209 for _ in 0..data_len {
210 data_raw.push(0);
211 }
212 self.reader.read_exact(data_raw.as_mut_slice())?;
213 Ok(Some(data_raw))
214 }
215}
216
217impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
218 type Item = io::Result<Item<T>>;
219
220 fn next(&mut self) -> Option<Self::Item> {
221 self.decode().transpose()
222 }
223
224 fn size_hint(&self) -> (usize, Option<usize>) {
225 (self.expected, Some(self.expected))
226 }
227}
228
#[cfg(test)]
mod test {
    use super::*;
    use rkyv::{Archive, Deserialize, Serialize};
    use std::io::Cursor;

    /// Small archivable payload used by the round-trip tests.
    #[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
    #[rkyv(compare(PartialEq))]
    struct TestData {
        id: u32,
        value: String,
    }

    /// Builds an item holding a `TestData { id, value }` at `timestamp`.
    fn make_item(timestamp: u64, id: u32, value: &str) -> Item<TestData> {
        let payload = TestData {
            id,
            value: value.to_string(),
        };
        Item::new(timestamp, &payload)
    }

    /// Encodes `items` as one block and returns the produced bytes.
    fn encode_block(items: &[Item<TestData>]) -> Vec<u8> {
        let mut buffer = Vec::new();
        let mut encoder = ItemEncoder::new(&mut buffer, items.len());
        for item in items {
            encoder.encode(item).unwrap();
        }
        encoder.finish().unwrap();
        buffer
    }

    /// Opens a decoder over `bytes`, starting at `start_timestamp`.
    fn open_decoder(
        bytes: Vec<u8>,
        start_timestamp: u64,
    ) -> ItemDecoder<Cursor<Vec<u8>>, TestData> {
        ItemDecoder::new(Cursor::new(bytes), start_timestamp).unwrap()
    }

    /// Decodes a whole block through the `Iterator` impl.
    fn collect_block(bytes: Vec<u8>, start_timestamp: u64) -> Vec<Item<TestData>> {
        let decoded: Result<Vec<_>, _> = open_decoder(bytes, start_timestamp).collect();
        decoded.unwrap()
    }

    #[test]
    fn test_encoder_decoder_single_item() {
        let item = make_item(1000, 123, "test");

        // encode
        let bytes = encode_block(std::slice::from_ref(&item));

        // decode
        let mut decoder = open_decoder(bytes, 1000);
        let decoded_item = decoder.decode().unwrap().unwrap();
        assert_eq!(decoded_item.timestamp, 1000);

        let decoded_data = decoded_item.deser().unwrap();
        assert_eq!(decoded_data.id, 123);
        assert_eq!(decoded_data.value.as_str(), "test");
    }

    #[test]
    fn test_encoder_decoder_multiple_items() {
        let items = vec![
            make_item(1000, 1, "first"),
            make_item(1010, 2, "second"),
            make_item(1015, 3, "third"),
            make_item(1025, 4, "fourth"),
        ];

        let bytes = encode_block(&items);

        // decode by calling `decode` directly until it reports the end
        let mut decoder = open_decoder(bytes, 1000);
        let decoded_items: Vec<_> =
            std::iter::from_fn(|| decoder.decode().unwrap()).collect();

        assert_eq!(decoded_items.len(), 4);

        for (original, decoded) in items.iter().zip(decoded_items.iter()) {
            assert_eq!(original.timestamp, decoded.timestamp);

            let expected = original.deser().unwrap();
            let actual = decoded.deser().unwrap();
            assert_eq!(expected.id, actual.id);
            assert_eq!(expected.value.as_str(), actual.value.as_str());
        }
    }

    #[test]
    fn test_encoder_decoder_with_iterator() {
        let items = vec![
            make_item(2000, 10, "a"),
            make_item(2005, 20, "b"),
            make_item(2012, 30, "c"),
        ];

        let decoded_items = collect_block(encode_block(&items), 2000);

        assert_eq!(decoded_items.len(), 3);
        assert_eq!(decoded_items[0].timestamp, 2000);
        assert_eq!(decoded_items[1].timestamp, 2005);
        assert_eq!(decoded_items[2].timestamp, 2012);

        assert_eq!(decoded_items[0].deser().unwrap().id, 10);
        assert_eq!(decoded_items[1].deser().unwrap().id, 20);
        assert_eq!(decoded_items[2].deser().unwrap().id, 30);
    }

    #[test]
    fn test_delta_compression() {
        let items = vec![
            make_item(1000, 1, "a"),
            make_item(1010, 2, "b"), // delta = 10
            make_item(1020, 3, "c"), // delta = 10, delta-of-delta = 0
            make_item(1025, 4, "d"), // delta = 5, delta-of-delta = -5
        ];

        // decode and verify
        let decoded_items = collect_block(encode_block(&items), 1000);

        for (original, decoded) in items.iter().zip(decoded_items.iter()) {
            assert_eq!(original.timestamp, decoded.timestamp);
            assert_eq!(original.deser().unwrap().id, decoded.deser().unwrap().id);
        }
    }

    #[test]
    fn test_empty_decode() {
        let mut decoder = open_decoder(Vec::new(), 1000);

        let result = decoder.decode().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_backwards_timestamp() {
        let items = vec![make_item(1000, 1, "first"), make_item(900, 2, "second")];

        let decoded_items = collect_block(encode_block(&items), 1000);

        assert_eq!(decoded_items.len(), 2);
        assert_eq!(decoded_items[0].timestamp, 1000);
        assert_eq!(decoded_items[1].timestamp, 900);
    }

    #[test]
    fn test_different_data_sizes() {
        let long_value = "a".repeat(1000);
        let items = vec![make_item(1000, 1, "x"), make_item(1001, 2, &long_value)];

        let decoded_items = collect_block(encode_block(&items), 1000);

        assert_eq!(decoded_items.len(), 2);
        assert_eq!(decoded_items[0].deser().unwrap().value.as_str(), "x");
        assert_eq!(decoded_items[1].deser().unwrap().value.len(), 1000);
        assert_eq!(
            decoded_items[1].deser().unwrap().value.as_str(),
            "a".repeat(1000)
        );
    }
}
521}