···
+
use ordered_varint::Variable;
+
Archive, Deserialize, Serialize,
+
api::high::{HighSerializer, HighValidator},
+
rancor::{self, Strategy},
+
ser::allocator::ArenaHandle,
+
io::{self, Read, Write},
+
use crate::error::{AppError, AppResult};
+
phantom: PhantomData<T>,
+
impl<T: Archive> Item<T> {
+
pub fn access(&self) -> &T::Archived {
+
unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) }
+
T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
+
+ Deserialize<T, Strategy<Pool, rancor::Error>>,
+
pub fn deser(&self) -> AppResult<T> {
+
rkyv::from_bytes(&self.data).map_err(AppError::from)
+
impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
+
pub fn new(timestamp: u64, data: &T) -> Self {
+
data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
+
pub struct ItemEncoder<W: Write, T> {
+
impl<W: Write, T> ItemEncoder<W, T> {
+
pub fn new(writer: W) -> Self {
+
pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> {
+
if self.prev_timestamp == 0 {
+
// self.writer.write_varint(item.timestamp)?;
+
self.prev_timestamp = item.timestamp;
+
self.write_data(&item.data)?;
+
let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
+
self.writer.write_varint(delta - self.prev_delta)?;
+
self.prev_timestamp = item.timestamp;
+
self.prev_delta = delta;
+
self.write_data(&item.data)?;
+
fn write_data(&mut self, data: &[u8]) -> AppResult<()> {
+
self.writer.write_varint(data.len())?;
+
self.writer.write_all(data)?;
+
pub fn finish(mut self) -> AppResult<W> {
+
pub struct ItemDecoder<R, T> {
+
current_timestamp: u64,
+
impl<R: Read, T: Archive> ItemDecoder<R, T> {
+
pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> {
+
current_timestamp: start_timestamp,
+
pub fn decode(&mut self) -> AppResult<Option<Item<T>>> {
+
// read the first timestamp
+
// let timestamp = match self.reader.read_varint::<u64>() {
+
// Ok(timestamp) => timestamp,
+
// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
// Err(e) => return Err(e.into()),
+
// self.current_timestamp = timestamp;
+
let Some(data_raw) = self.read_item()? else {
+
self.first_item = false;
+
timestamp: self.current_timestamp,
+
let Some(_delta) = self.read_timestamp()? else {
+
let data_raw = match self.read_item()? {
+
Some(data_raw) => data_raw,
+
return Err(io::Error::new(
+
io::ErrorKind::UnexpectedEof,
+
"expected data after delta",
+
timestamp: self.current_timestamp,
+
// [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1]
+
fn read_timestamp(&mut self) -> AppResult<Option<u64>> {
+
let delta = match self.reader.read_varint::<i64>() {
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
self.current_delta += delta;
+
self.current_timestamp =
+
(self.current_timestamp as i128 + self.current_delta as i128) as u64;
+
Ok(Some(self.current_timestamp))
+
fn read_item(&mut self) -> AppResult<Option<AlignedVec>> {
+
let data_len = match self.reader.read_varint::<usize>() {
+
Ok(data_len) => data_len,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
let mut data_raw = AlignedVec::with_capacity(data_len);
+
self.reader.read_exact(data_raw.as_mut_slice())?;
+
impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
+
type Item = AppResult<Item<T>>;
+
fn next(&mut self) -> Option<Self::Item> {
+
self.decode().transpose()
+
pub trait WriteVariableExt: Write {
+
fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
+
value.encode_variable(self)
+
impl<W: Write> WriteVariableExt for W {}
+
pub trait ReadVariableExt: Read {
+
fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
+
T::decode_variable(self)
+
impl<R: Read> ReadVariableExt for R {}
+
use rkyv::{Archive, Deserialize, Serialize};
+
#[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
+
#[rkyv(compare(PartialEq))]
+
fn test_encoder_decoder_single_item() {
+
value: "test".to_string(),
+
let item = Item::new(1000, &data);
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
encoder.encode(&item).unwrap();
+
encoder.finish().unwrap();
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
let decoded_item = decoder.decode().unwrap().unwrap();
+
assert_eq!(decoded_item.timestamp, 1000);
+
let decoded_data = decoded_item.access();
+
assert_eq!(decoded_data.id, 123);
+
assert_eq!(decoded_data.value.as_str(), "test");
+
fn test_encoder_decoder_multiple_items() {
+
value: "first".to_string(),
+
value: "second".to_string(),
+
value: "third".to_string(),
+
value: "fourth".to_string(),
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
encoder.encode(item).unwrap();
+
encoder.finish().unwrap();
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
let mut decoded_items = Vec::new();
+
while let Some(item) = decoder.decode().unwrap() {
+
decoded_items.push(item);
+
assert_eq!(decoded_items.len(), 4);
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.access().id, decoded.access().id);
+
original.access().value.as_str(),
+
decoded.access().value.as_str()
+
fn test_encoder_decoder_with_iterator() {
+
value: "a".to_string(),
+
value: "b".to_string(),
+
value: "c".to_string(),
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
encoder.encode(item).unwrap();
+
encoder.finish().unwrap();
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap();
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
assert_eq!(decoded_items.len(), 3);
+
assert_eq!(decoded_items[0].timestamp, 2000);
+
assert_eq!(decoded_items[1].timestamp, 2005);
+
assert_eq!(decoded_items[2].timestamp, 2012);
+
assert_eq!(decoded_items[0].access().id, 10);
+
assert_eq!(decoded_items[1].access().id, 20);
+
assert_eq!(decoded_items[2].access().id, 30);
+
fn test_delta_compression() {
+
value: "a".to_string(),
+
value: "b".to_string(),
+
value: "c".to_string(),
+
), // delta = 10, delta-of-delta = 0
+
value: "d".to_string(),
+
), // delta = 5, delta-of-delta = -5
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
encoder.encode(item).unwrap();
+
encoder.finish().unwrap();
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.access().id, decoded.access().id);
+
fn test_empty_decode() {
+
let buffer = Vec::new();
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
let result = decoder.decode().unwrap();
+
assert!(result.is_none());
+
fn test_backwards_timestamp() {
+
value: "first".to_string(),
+
value: "second".to_string(),
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
encoder.encode(item).unwrap();
+
encoder.finish().unwrap();
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].timestamp, 1000);
+
assert_eq!(decoded_items[1].timestamp, 900);
+
fn test_different_data_sizes() {
+
let small_data = TestData {
+
value: "x".to_string(),
+
let large_data = TestData {
+
value: "a".repeat(1000),
+
let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)];
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
encoder.encode(item).unwrap();
+
encoder.finish().unwrap();
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].access().value.as_str(), "x");
+
assert_eq!(decoded_items[1].access().value.len(), 1000);
+
assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000));