···
1
+
use ordered_varint::Variable;
3
+
Archive, Deserialize, Serialize,
4
+
api::high::{HighSerializer, HighValidator},
5
+
bytecheck::CheckBytes,
7
+
rancor::{self, Strategy},
8
+
ser::allocator::ArenaHandle,
12
+
io::{self, Read, Write},
13
+
marker::PhantomData,
16
+
use crate::error::{AppError, AppResult};
18
+
pub struct Item<T> {
21
+
phantom: PhantomData<T>,
24
+
impl<T: Archive> Item<T> {
25
+
pub fn access(&self) -> &T::Archived {
26
+
unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) }
33
+
T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
34
+
+ Deserialize<T, Strategy<Pool, rancor::Error>>,
36
+
pub fn deser(&self) -> AppResult<T> {
37
+
rkyv::from_bytes(&self.data).map_err(AppError::from)
41
+
impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
42
+
pub fn new(timestamp: u64, data: &T) -> Self {
45
+
data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
46
+
phantom: PhantomData,
51
+
pub struct ItemEncoder<W: Write, T> {
53
+
prev_timestamp: u64,
55
+
_item: PhantomData<T>,
58
+
impl<W: Write, T> ItemEncoder<W, T> {
59
+
pub fn new(writer: W) -> Self {
68
+
pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> {
69
+
if self.prev_timestamp == 0 {
70
+
// self.writer.write_varint(item.timestamp)?;
71
+
self.prev_timestamp = item.timestamp;
72
+
self.write_data(&item.data)?;
76
+
let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
78
+
self.writer.write_varint(delta - self.prev_delta)?;
79
+
self.prev_timestamp = item.timestamp;
80
+
self.prev_delta = delta;
82
+
self.write_data(&item.data)?;
87
+
fn write_data(&mut self, data: &[u8]) -> AppResult<()> {
88
+
self.writer.write_varint(data.len())?;
89
+
self.writer.write_all(data)?;
93
+
pub fn finish(mut self) -> AppResult<W> {
94
+
self.writer.flush()?;
99
+
pub struct ItemDecoder<R, T> {
101
+
current_timestamp: u64,
102
+
current_delta: i64,
104
+
_item: PhantomData<T>,
107
+
impl<R: Read, T: Archive> ItemDecoder<R, T> {
108
+
pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> {
111
+
current_timestamp: start_timestamp,
114
+
_item: PhantomData,
118
+
pub fn decode(&mut self) -> AppResult<Option<Item<T>>> {
119
+
if self.first_item {
120
+
// read the first timestamp
121
+
// let timestamp = match self.reader.read_varint::<u64>() {
122
+
// Ok(timestamp) => timestamp,
123
+
// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
124
+
// Err(e) => return Err(e.into()),
126
+
// self.current_timestamp = timestamp;
128
+
let Some(data_raw) = self.read_item()? else {
131
+
self.first_item = false;
132
+
return Ok(Some(Item {
133
+
timestamp: self.current_timestamp,
135
+
phantom: PhantomData,
139
+
let Some(_delta) = self.read_timestamp()? else {
144
+
let data_raw = match self.read_item()? {
145
+
Some(data_raw) => data_raw,
147
+
return Err(io::Error::new(
148
+
io::ErrorKind::UnexpectedEof,
149
+
"expected data after delta",
156
+
timestamp: self.current_timestamp,
158
+
phantom: PhantomData,
162
+
// [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1]
163
+
fn read_timestamp(&mut self) -> AppResult<Option<u64>> {
164
+
let delta = match self.reader.read_varint::<i64>() {
165
+
Ok(delta) => delta,
166
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
167
+
Err(e) => return Err(e.into()),
169
+
self.current_delta += delta;
170
+
self.current_timestamp =
171
+
(self.current_timestamp as i128 + self.current_delta as i128) as u64;
172
+
Ok(Some(self.current_timestamp))
175
+
fn read_item(&mut self) -> AppResult<Option<AlignedVec>> {
176
+
let data_len = match self.reader.read_varint::<usize>() {
177
+
Ok(data_len) => data_len,
178
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
179
+
Err(e) => return Err(e.into()),
181
+
let mut data_raw = AlignedVec::with_capacity(data_len);
182
+
for _ in 0..data_len {
185
+
self.reader.read_exact(data_raw.as_mut_slice())?;
190
+
impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
191
+
type Item = AppResult<Item<T>>;
193
+
fn next(&mut self) -> Option<Self::Item> {
194
+
self.decode().transpose()
198
+
pub trait WriteVariableExt: Write {
199
+
fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
200
+
value.encode_variable(self)
203
+
impl<W: Write> WriteVariableExt for W {}
205
+
pub trait ReadVariableExt: Read {
206
+
fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
207
+
T::decode_variable(self)
210
+
impl<R: Read> ReadVariableExt for R {}
215
+
use rkyv::{Archive, Deserialize, Serialize};
216
+
use std::io::Cursor;
218
+
#[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
219
+
#[rkyv(compare(PartialEq))]
226
+
fn test_encoder_decoder_single_item() {
227
+
let data = TestData {
229
+
value: "test".to_string(),
232
+
let item = Item::new(1000, &data);
235
+
let mut buffer = Vec::new();
236
+
let mut encoder = ItemEncoder::new(&mut buffer);
237
+
encoder.encode(&item).unwrap();
238
+
encoder.finish().unwrap();
241
+
let cursor = Cursor::new(buffer);
242
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
244
+
let decoded_item = decoder.decode().unwrap().unwrap();
245
+
assert_eq!(decoded_item.timestamp, 1000);
247
+
let decoded_data = decoded_item.access();
248
+
assert_eq!(decoded_data.id, 123);
249
+
assert_eq!(decoded_data.value.as_str(), "test");
253
+
fn test_encoder_decoder_multiple_items() {
259
+
value: "first".to_string(),
266
+
value: "second".to_string(),
273
+
value: "third".to_string(),
280
+
value: "fourth".to_string(),
286
+
let mut buffer = Vec::new();
287
+
let mut encoder = ItemEncoder::new(&mut buffer);
289
+
for item in &items {
290
+
encoder.encode(item).unwrap();
292
+
encoder.finish().unwrap();
295
+
let cursor = Cursor::new(buffer);
296
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
298
+
let mut decoded_items = Vec::new();
299
+
while let Some(item) = decoder.decode().unwrap() {
300
+
decoded_items.push(item);
303
+
assert_eq!(decoded_items.len(), 4);
305
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
306
+
assert_eq!(original.timestamp, decoded.timestamp);
307
+
assert_eq!(original.access().id, decoded.access().id);
309
+
original.access().value.as_str(),
310
+
decoded.access().value.as_str()
316
+
fn test_encoder_decoder_with_iterator() {
322
+
value: "a".to_string(),
329
+
value: "b".to_string(),
336
+
value: "c".to_string(),
342
+
let mut buffer = Vec::new();
343
+
let mut encoder = ItemEncoder::new(&mut buffer);
345
+
for item in &items {
346
+
encoder.encode(item).unwrap();
348
+
encoder.finish().unwrap();
351
+
let cursor = Cursor::new(buffer);
352
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap();
354
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
355
+
let decoded_items = decoded_items.unwrap();
357
+
assert_eq!(decoded_items.len(), 3);
358
+
assert_eq!(decoded_items[0].timestamp, 2000);
359
+
assert_eq!(decoded_items[1].timestamp, 2005);
360
+
assert_eq!(decoded_items[2].timestamp, 2012);
362
+
assert_eq!(decoded_items[0].access().id, 10);
363
+
assert_eq!(decoded_items[1].access().id, 20);
364
+
assert_eq!(decoded_items[2].access().id, 30);
368
+
fn test_delta_compression() {
374
+
value: "a".to_string(),
381
+
value: "b".to_string(),
388
+
value: "c".to_string(),
390
+
), // delta = 10, delta-of-delta = 0
395
+
value: "d".to_string(),
397
+
), // delta = 5, delta-of-delta = -5
400
+
let mut buffer = Vec::new();
401
+
let mut encoder = ItemEncoder::new(&mut buffer);
403
+
for item in &items {
404
+
encoder.encode(item).unwrap();
406
+
encoder.finish().unwrap();
408
+
// decode and verify
409
+
let cursor = Cursor::new(buffer);
410
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
412
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
413
+
let decoded_items = decoded_items.unwrap();
415
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
416
+
assert_eq!(original.timestamp, decoded.timestamp);
417
+
assert_eq!(original.access().id, decoded.access().id);
422
+
fn test_empty_decode() {
423
+
let buffer = Vec::new();
424
+
let cursor = Cursor::new(buffer);
425
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
427
+
let result = decoder.decode().unwrap();
428
+
assert!(result.is_none());
432
+
fn test_backwards_timestamp() {
438
+
value: "first".to_string(),
445
+
value: "second".to_string(),
450
+
let mut buffer = Vec::new();
451
+
let mut encoder = ItemEncoder::new(&mut buffer);
453
+
for item in &items {
454
+
encoder.encode(item).unwrap();
456
+
encoder.finish().unwrap();
458
+
let cursor = Cursor::new(buffer);
459
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
461
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
462
+
let decoded_items = decoded_items.unwrap();
464
+
assert_eq!(decoded_items.len(), 2);
465
+
assert_eq!(decoded_items[0].timestamp, 1000);
466
+
assert_eq!(decoded_items[1].timestamp, 900);
470
+
fn test_different_data_sizes() {
471
+
let small_data = TestData {
473
+
value: "x".to_string(),
475
+
let large_data = TestData {
477
+
value: "a".repeat(1000),
480
+
let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)];
482
+
let mut buffer = Vec::new();
483
+
let mut encoder = ItemEncoder::new(&mut buffer);
485
+
for item in &items {
486
+
encoder.encode(item).unwrap();
488
+
encoder.finish().unwrap();
490
+
let cursor = Cursor::new(buffer);
491
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
493
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
494
+
let decoded_items = decoded_items.unwrap();
496
+
assert_eq!(decoded_items.len(), 2);
497
+
assert_eq!(decoded_items[0].access().value.as_str(), "x");
498
+
assert_eq!(decoded_items[1].access().value.len(), 1000);
499
+
assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000));