Tracks lexicons and how many times they appeared on the Jetstream.
at migrate 14 kB view raw
1use ordered_varint::Variable; 2use rkyv::{ 3 Archive, Deserialize, Serialize, 4 api::high::{HighSerializer, HighValidator}, 5 bytecheck::CheckBytes, 6 de::Pool, 7 rancor::{self, Strategy}, 8 ser::allocator::ArenaHandle, 9 util::AlignedVec, 10}; 11use std::{ 12 io::{self, Read, Write}, 13 marker::PhantomData, 14}; 15 16use crate::error::{AppError, AppResult}; 17 18pub struct Item<T> { 19 pub timestamp: u64, 20 data: AlignedVec, 21 phantom: PhantomData<T>, 22} 23 24impl<T: Archive> Item<T> { 25 pub fn access(&self) -> &T::Archived { 26 unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) } 27 } 28} 29 30impl<T> Item<T> 31where 32 T: Archive, 33 T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>> 34 + Deserialize<T, Strategy<Pool, rancor::Error>>, 35{ 36 pub fn deser(&self) -> AppResult<T> { 37 rkyv::from_bytes(&self.data).map_err(AppError::from) 38 } 39} 40 41impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> { 42 pub fn new(timestamp: u64, data: &T) -> Self { 43 Item { 44 timestamp, 45 data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() }, 46 phantom: PhantomData, 47 } 48 } 49} 50 51pub struct ItemEncoder<W: Write, T> { 52 writer: W, 53 prev_timestamp: u64, 54 prev_delta: i64, 55 _item: PhantomData<T>, 56} 57 58impl<W: Write, T> ItemEncoder<W, T> { 59 pub fn new(writer: W) -> Self { 60 ItemEncoder { 61 writer, 62 prev_timestamp: 0, 63 prev_delta: 0, 64 _item: PhantomData, 65 } 66 } 67 68 pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> { 69 if self.prev_timestamp == 0 { 70 // self.writer.write_varint(item.timestamp)?; 71 self.prev_timestamp = item.timestamp; 72 self.write_data(&item.data)?; 73 return Ok(()); 74 } 75 76 let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64; 77 78 self.writer.write_varint(delta - self.prev_delta)?; 79 self.prev_timestamp = item.timestamp; 80 self.prev_delta = delta; 81 82 self.write_data(&item.data)?; 83 84 Ok(()) 85 } 86 87 fn 
write_data(&mut self, data: &[u8]) -> AppResult<()> { 88 self.writer.write_varint(data.len())?; 89 self.writer.write_all(data)?; 90 Ok(()) 91 } 92 93 pub fn finish(mut self) -> AppResult<W> { 94 self.writer.flush()?; 95 Ok(self.writer) 96 } 97} 98 99pub struct ItemDecoder<R, T> { 100 reader: R, 101 current_timestamp: u64, 102 current_delta: i64, 103 first_item: bool, 104 _item: PhantomData<T>, 105} 106 107impl<R: Read, T: Archive> ItemDecoder<R, T> { 108 pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> { 109 Ok(ItemDecoder { 110 reader, 111 current_timestamp: start_timestamp, 112 current_delta: 0, 113 first_item: true, 114 _item: PhantomData, 115 }) 116 } 117 118 pub fn decode(&mut self) -> AppResult<Option<Item<T>>> { 119 if self.first_item { 120 // read the first timestamp 121 // let timestamp = match self.reader.read_varint::<u64>() { 122 // Ok(timestamp) => timestamp, 123 // Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 124 // Err(e) => return Err(e.into()), 125 // }; 126 // self.current_timestamp = timestamp; 127 128 let Some(data_raw) = self.read_item()? else { 129 return Ok(None); 130 }; 131 self.first_item = false; 132 return Ok(Some(Item { 133 timestamp: self.current_timestamp, 134 data: data_raw, 135 phantom: PhantomData, 136 })); 137 } 138 139 let Some(_delta) = self.read_timestamp()? else { 140 return Ok(None); 141 }; 142 143 // read data 144 let data_raw = match self.read_item()? 
{ 145 Some(data_raw) => data_raw, 146 None => { 147 return Err(io::Error::new( 148 io::ErrorKind::UnexpectedEof, 149 "expected data after delta", 150 ) 151 .into()); 152 } 153 }; 154 155 Ok(Some(Item { 156 timestamp: self.current_timestamp, 157 data: data_raw, 158 phantom: PhantomData, 159 })) 160 } 161 162 // [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1] 163 fn read_timestamp(&mut self) -> AppResult<Option<u64>> { 164 let delta = match self.reader.read_varint::<i64>() { 165 Ok(delta) => delta, 166 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 167 Err(e) => return Err(e.into()), 168 }; 169 self.current_delta += delta; 170 self.current_timestamp = 171 (self.current_timestamp as i128 + self.current_delta as i128) as u64; 172 Ok(Some(self.current_timestamp)) 173 } 174 175 fn read_item(&mut self) -> AppResult<Option<AlignedVec>> { 176 let data_len = match self.reader.read_varint::<usize>() { 177 Ok(data_len) => data_len, 178 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 179 Err(e) => return Err(e.into()), 180 }; 181 let mut data_raw = AlignedVec::with_capacity(data_len); 182 for _ in 0..data_len { 183 data_raw.push(0); 184 } 185 self.reader.read_exact(data_raw.as_mut_slice())?; 186 Ok(Some(data_raw)) 187 } 188} 189 190impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> { 191 type Item = AppResult<Item<T>>; 192 193 fn next(&mut self) -> Option<Self::Item> { 194 self.decode().transpose() 195 } 196} 197 198pub trait WriteVariableExt: Write { 199 fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> { 200 value.encode_variable(self) 201 } 202} 203impl<W: Write> WriteVariableExt for W {} 204 205pub trait ReadVariableExt: Read { 206 fn read_varint<T: Variable>(&mut self) -> io::Result<T> { 207 T::decode_variable(self) 208 } 209} 210impl<R: Read> ReadVariableExt for R {} 211 212#[cfg(test)] 213mod test { 214 use super::*; 215 use rkyv::{Archive, Deserialize, Serialize}; 216 use std::io::Cursor; 217 218 
#[derive(Archive, Deserialize, Serialize, Debug, PartialEq)] 219 #[rkyv(compare(PartialEq))] 220 struct TestData { 221 id: u32, 222 value: String, 223 } 224 225 #[test] 226 fn test_encoder_decoder_single_item() { 227 let data = TestData { 228 id: 123, 229 value: "test".to_string(), 230 }; 231 232 let item = Item::new(1000, &data); 233 234 // encode 235 let mut buffer = Vec::new(); 236 let mut encoder = ItemEncoder::new(&mut buffer); 237 encoder.encode(&item).unwrap(); 238 encoder.finish().unwrap(); 239 240 // decode 241 let cursor = Cursor::new(buffer); 242 let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 243 244 let decoded_item = decoder.decode().unwrap().unwrap(); 245 assert_eq!(decoded_item.timestamp, 1000); 246 247 let decoded_data = decoded_item.access(); 248 assert_eq!(decoded_data.id, 123); 249 assert_eq!(decoded_data.value.as_str(), "test"); 250 } 251 252 #[test] 253 fn test_encoder_decoder_multiple_items() { 254 let items = vec![ 255 Item::new( 256 1000, 257 &TestData { 258 id: 1, 259 value: "first".to_string(), 260 }, 261 ), 262 Item::new( 263 1010, 264 &TestData { 265 id: 2, 266 value: "second".to_string(), 267 }, 268 ), 269 Item::new( 270 1015, 271 &TestData { 272 id: 3, 273 value: "third".to_string(), 274 }, 275 ), 276 Item::new( 277 1025, 278 &TestData { 279 id: 4, 280 value: "fourth".to_string(), 281 }, 282 ), 283 ]; 284 285 // encode 286 let mut buffer = Vec::new(); 287 let mut encoder = ItemEncoder::new(&mut buffer); 288 289 for item in &items { 290 encoder.encode(item).unwrap(); 291 } 292 encoder.finish().unwrap(); 293 294 // decode 295 let cursor = Cursor::new(buffer); 296 let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 297 298 let mut decoded_items = Vec::new(); 299 while let Some(item) = decoder.decode().unwrap() { 300 decoded_items.push(item); 301 } 302 303 assert_eq!(decoded_items.len(), 4); 304 305 for (original, decoded) in items.iter().zip(decoded_items.iter()) { 306 
assert_eq!(original.timestamp, decoded.timestamp); 307 assert_eq!(original.access().id, decoded.access().id); 308 assert_eq!( 309 original.access().value.as_str(), 310 decoded.access().value.as_str() 311 ); 312 } 313 } 314 315 #[test] 316 fn test_encoder_decoder_with_iterator() { 317 let items = vec![ 318 Item::new( 319 2000, 320 &TestData { 321 id: 10, 322 value: "a".to_string(), 323 }, 324 ), 325 Item::new( 326 2005, 327 &TestData { 328 id: 20, 329 value: "b".to_string(), 330 }, 331 ), 332 Item::new( 333 2012, 334 &TestData { 335 id: 30, 336 value: "c".to_string(), 337 }, 338 ), 339 ]; 340 341 // encode 342 let mut buffer = Vec::new(); 343 let mut encoder = ItemEncoder::new(&mut buffer); 344 345 for item in &items { 346 encoder.encode(item).unwrap(); 347 } 348 encoder.finish().unwrap(); 349 350 // decode 351 let cursor = Cursor::new(buffer); 352 let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap(); 353 354 let decoded_items: Result<Vec<_>, _> = decoder.collect(); 355 let decoded_items = decoded_items.unwrap(); 356 357 assert_eq!(decoded_items.len(), 3); 358 assert_eq!(decoded_items[0].timestamp, 2000); 359 assert_eq!(decoded_items[1].timestamp, 2005); 360 assert_eq!(decoded_items[2].timestamp, 2012); 361 362 assert_eq!(decoded_items[0].access().id, 10); 363 assert_eq!(decoded_items[1].access().id, 20); 364 assert_eq!(decoded_items[2].access().id, 30); 365 } 366 367 #[test] 368 fn test_delta_compression() { 369 let items = vec![ 370 Item::new( 371 1000, 372 &TestData { 373 id: 1, 374 value: "a".to_string(), 375 }, 376 ), 377 Item::new( 378 1010, 379 &TestData { 380 id: 2, 381 value: "b".to_string(), 382 }, 383 ), // delta = 10 384 Item::new( 385 1020, 386 &TestData { 387 id: 3, 388 value: "c".to_string(), 389 }, 390 ), // delta = 10, delta-of-delta = 0 391 Item::new( 392 1025, 393 &TestData { 394 id: 4, 395 value: "d".to_string(), 396 }, 397 ), // delta = 5, delta-of-delta = -5 398 ]; 399 400 let mut buffer = Vec::new(); 401 let mut encoder = 
ItemEncoder::new(&mut buffer); 402 403 for item in &items { 404 encoder.encode(item).unwrap(); 405 } 406 encoder.finish().unwrap(); 407 408 // decode and verify 409 let cursor = Cursor::new(buffer); 410 let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 411 412 let decoded_items: Result<Vec<_>, _> = decoder.collect(); 413 let decoded_items = decoded_items.unwrap(); 414 415 for (original, decoded) in items.iter().zip(decoded_items.iter()) { 416 assert_eq!(original.timestamp, decoded.timestamp); 417 assert_eq!(original.access().id, decoded.access().id); 418 } 419 } 420 421 #[test] 422 fn test_empty_decode() { 423 let buffer = Vec::new(); 424 let cursor = Cursor::new(buffer); 425 let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 426 427 let result = decoder.decode().unwrap(); 428 assert!(result.is_none()); 429 } 430 431 #[test] 432 fn test_backwards_timestamp() { 433 let items = vec![ 434 Item::new( 435 1000, 436 &TestData { 437 id: 1, 438 value: "first".to_string(), 439 }, 440 ), 441 Item::new( 442 900, 443 &TestData { 444 id: 2, 445 value: "second".to_string(), 446 }, 447 ), 448 ]; 449 450 let mut buffer = Vec::new(); 451 let mut encoder = ItemEncoder::new(&mut buffer); 452 453 for item in &items { 454 encoder.encode(item).unwrap(); 455 } 456 encoder.finish().unwrap(); 457 458 let cursor = Cursor::new(buffer); 459 let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 460 461 let decoded_items: Result<Vec<_>, _> = decoder.collect(); 462 let decoded_items = decoded_items.unwrap(); 463 464 assert_eq!(decoded_items.len(), 2); 465 assert_eq!(decoded_items[0].timestamp, 1000); 466 assert_eq!(decoded_items[1].timestamp, 900); 467 } 468 469 #[test] 470 fn test_different_data_sizes() { 471 let small_data = TestData { 472 id: 1, 473 value: "x".to_string(), 474 }; 475 let large_data = TestData { 476 id: 2, 477 value: "a".repeat(1000), 478 }; 479 480 let items = vec![Item::new(1000, &small_data), Item::new(1001, 
&large_data)]; 481 482 let mut buffer = Vec::new(); 483 let mut encoder = ItemEncoder::new(&mut buffer); 484 485 for item in &items { 486 encoder.encode(item).unwrap(); 487 } 488 encoder.finish().unwrap(); 489 490 let cursor = Cursor::new(buffer); 491 let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 492 493 let decoded_items: Result<Vec<_>, _> = decoder.collect(); 494 let decoded_items = decoded_items.unwrap(); 495 496 assert_eq!(decoded_items.len(), 2); 497 assert_eq!(decoded_items[0].access().value.as_str(), "x"); 498 assert_eq!(decoded_items[1].access().value.len(), 1000); 499 assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000)); 500 } 501}