Tracks lexicons and how many times they appeared on the jetstream.
at main 15 kB view raw
1use std::{ 2 io::{self, Read, Write}, 3 marker::PhantomData, 4 usize, 5}; 6 7use rkyv::{ 8 Archive, Deserialize, Serialize, 9 api::high::{HighSerializer, HighValidator}, 10 bytecheck::CheckBytes, 11 de::Pool, 12 rancor::{self, Strategy}, 13 ser::allocator::ArenaHandle, 14 util::AlignedVec, 15}; 16 17use crate::{ 18 error::{AppError, AppResult}, 19 utils::{ReadVariableExt, WriteVariableExt}, 20}; 21 22pub struct Item<T> { 23 pub timestamp: u64, 24 pub data: AlignedVec, 25 phantom: PhantomData<T>, 26} 27 28impl<T> Item<T> 29where 30 T: Archive, 31 T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>> 32 + Deserialize<T, Strategy<Pool, rancor::Error>>, 33{ 34 pub fn deser(&self) -> AppResult<T> { 35 rkyv::from_bytes(&self.data).map_err(AppError::from) 36 } 37} 38 39impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> { 40 pub fn new(timestamp: u64, data: &T) -> Self { 41 Item { 42 timestamp, 43 data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() }, 44 phantom: PhantomData, 45 } 46 } 47} 48 49pub struct ItemEncoder<W: Write, T> { 50 writer: W, 51 prev_timestamp: u64, 52 prev_delta: i64, 53 item_count: usize, 54 _item: PhantomData<T>, 55} 56 57impl<W: Write, T> ItemEncoder<W, T> { 58 pub fn new(writer: W, item_count: usize) -> Self { 59 assert!(item_count > 0); 60 ItemEncoder { 61 writer, 62 prev_timestamp: 0, 63 prev_delta: 0, 64 item_count, 65 _item: PhantomData, 66 } 67 } 68 69 /// NOTE: this is a best effort estimate of the encoded length of the block. 70 /// if T contains variable-length data, the encoded length may be larger than this estimate. 
71 pub fn encoded_len(item_count: usize) -> usize { 72 // items length + item count * delta length + data length 73 size_of::<usize>() + item_count * size_of::<(i64, T)>() 74 } 75 76 pub fn encode(&mut self, item: &Item<T>) -> io::Result<()> { 77 if self.prev_timestamp == 0 { 78 self.writer.write_varint(self.item_count)?; 79 // self.writer.write_varint(item.timestamp)?; 80 self.prev_timestamp = item.timestamp; 81 self.write_data(&item.data)?; 82 return Ok(()); 83 } 84 85 let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64; 86 87 self.writer.write_varint(delta - self.prev_delta)?; 88 self.prev_timestamp = item.timestamp; 89 self.prev_delta = delta; 90 91 self.write_data(&item.data)?; 92 93 Ok(()) 94 } 95 96 fn write_data(&mut self, data: &[u8]) -> io::Result<()> { 97 self.writer.write_varint(data.len())?; 98 self.writer.write_all(data)?; 99 Ok(()) 100 } 101 102 pub fn finish(mut self) -> io::Result<W> { 103 self.writer.flush()?; 104 Ok(self.writer) 105 } 106} 107 108pub struct ItemDecoder<R, T> { 109 reader: R, 110 current_timestamp: u64, 111 current_delta: i64, 112 items_read: usize, 113 expected: usize, 114 _item: PhantomData<T>, 115} 116 117impl<R: Read, T: Archive> ItemDecoder<R, T> { 118 pub fn new(mut reader: R, start_timestamp: u64) -> io::Result<Self> { 119 let expected = match reader.read_varint() { 120 Ok(expected) => expected, 121 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => 0, 122 Err(e) => return Err(e.into()), 123 }; 124 125 Ok(ItemDecoder { 126 reader, 127 current_timestamp: start_timestamp, 128 current_delta: 0, 129 items_read: 0, 130 expected, 131 _item: PhantomData, 132 }) 133 } 134 135 pub fn item_count(&self) -> usize { 136 self.expected 137 } 138 139 pub fn decode(&mut self) -> io::Result<Option<Item<T>>> { 140 if self.items_read == 0 { 141 // read the first timestamp 142 // let timestamp = match self.reader.read_varint::<u64>() { 143 // Ok(timestamp) => timestamp, 144 // Err(e) if e.kind() == 
io::ErrorKind::UnexpectedEof => return Ok(None), 145 // Err(e) => return Err(e.into()), 146 // }; 147 // self.current_timestamp = timestamp; 148 149 let Some(data_raw) = self.read_item()? else { 150 return Ok(None); 151 }; 152 153 self.items_read += 1; 154 return Ok(Some(Item { 155 timestamp: self.current_timestamp, 156 data: data_raw, 157 phantom: PhantomData, 158 })); 159 } 160 161 if self.items_read >= self.expected { 162 return Ok(None); 163 } 164 165 let Some(_delta) = self.read_timestamp()? else { 166 return Ok(None); 167 }; 168 169 // read data 170 let data_raw = match self.read_item()? { 171 Some(data_raw) => data_raw, 172 None => { 173 return Err(io::Error::new( 174 io::ErrorKind::UnexpectedEof, 175 "expected data after delta", 176 ) 177 .into()); 178 } 179 }; 180 181 self.items_read += 1; 182 Ok(Some(Item { 183 timestamp: self.current_timestamp, 184 data: data_raw, 185 phantom: PhantomData, 186 })) 187 } 188 189 // [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1] 190 fn read_timestamp(&mut self) -> io::Result<Option<u64>> { 191 let delta = match self.reader.read_varint::<i64>() { 192 Ok(delta) => delta, 193 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 194 Err(e) => return Err(e.into()), 195 }; 196 self.current_delta += delta; 197 self.current_timestamp = 198 (self.current_timestamp as i128 + self.current_delta as i128) as u64; 199 Ok(Some(self.current_timestamp)) 200 } 201 202 fn read_item(&mut self) -> io::Result<Option<AlignedVec>> { 203 let data_len = match self.reader.read_varint::<usize>() { 204 Ok(data_len) => data_len, 205 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 206 Err(e) => return Err(e.into()), 207 }; 208 let mut data_raw = AlignedVec::with_capacity(data_len); 209 for _ in 0..data_len { 210 data_raw.push(0); 211 } 212 self.reader.read_exact(data_raw.as_mut_slice())?; 213 Ok(Some(data_raw)) 214 } 215} 216 217impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> { 218 type Item = 
io::Result<Item<T>>; 219 220 fn next(&mut self) -> Option<Self::Item> { 221 self.decode().transpose() 222 } 223 224 fn size_hint(&self) -> (usize, Option<usize>) { 225 (self.expected, Some(self.expected)) 226 } 227} 228 229#[cfg(test)] 230mod test { 231 use super::*; 232 use rkyv::{Archive, Deserialize, Serialize}; 233 use std::io::Cursor; 234 235 #[derive(Archive, Deserialize, Serialize, Debug, PartialEq)] 236 #[rkyv(compare(PartialEq))] 237 struct TestData { 238 id: u32, 239 value: String, 240 } 241 242 #[test] 243 fn test_encoder_decoder_single_item() { 244 let data = TestData { 245 id: 123, 246 value: "test".to_string(), 247 }; 248 249 let item = Item::new(1000, &data); 250 251 // encode 252 let mut buffer = Vec::new(); 253 let mut encoder = ItemEncoder::new(&mut buffer, 1); 254 encoder.encode(&item).unwrap(); 255 encoder.finish().unwrap(); 256 257 // decode 258 let cursor = Cursor::new(buffer); 259 let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 260 261 let decoded_item = decoder.decode().unwrap().unwrap(); 262 assert_eq!(decoded_item.timestamp, 1000); 263 264 let decoded_data = decoded_item.deser().unwrap(); 265 assert_eq!(decoded_data.id, 123); 266 assert_eq!(decoded_data.value.as_str(), "test"); 267 } 268 269 #[test] 270 fn test_encoder_decoder_multiple_items() { 271 let items = vec![ 272 Item::new( 273 1000, 274 &TestData { 275 id: 1, 276 value: "first".to_string(), 277 }, 278 ), 279 Item::new( 280 1010, 281 &TestData { 282 id: 2, 283 value: "second".to_string(), 284 }, 285 ), 286 Item::new( 287 1015, 288 &TestData { 289 id: 3, 290 value: "third".to_string(), 291 }, 292 ), 293 Item::new( 294 1025, 295 &TestData { 296 id: 4, 297 value: "fourth".to_string(), 298 }, 299 ), 300 ]; 301 302 // encode 303 let mut buffer = Vec::new(); 304 let mut encoder = ItemEncoder::new(&mut buffer, items.len()); 305 306 for item in &items { 307 encoder.encode(item).unwrap(); 308 } 309 encoder.finish().unwrap(); 310 311 // decode 312 let cursor = 
Cursor::new(buffer); 313 let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 314 315 let mut decoded_items = Vec::new(); 316 while let Some(item) = decoder.decode().unwrap() { 317 decoded_items.push(item); 318 } 319 320 assert_eq!(decoded_items.len(), 4); 321 322 for (original, decoded) in items.iter().zip(decoded_items.iter()) { 323 assert_eq!(original.timestamp, decoded.timestamp); 324 assert_eq!(original.deser().unwrap().id, decoded.deser().unwrap().id); 325 assert_eq!( 326 original.deser().unwrap().value.as_str(), 327 decoded.deser().unwrap().value.as_str() 328 ); 329 } 330 } 331 332 #[test] 333 fn test_encoder_decoder_with_iterator() { 334 let items = vec![ 335 Item::new( 336 2000, 337 &TestData { 338 id: 10, 339 value: "a".to_string(), 340 }, 341 ), 342 Item::new( 343 2005, 344 &TestData { 345 id: 20, 346 value: "b".to_string(), 347 }, 348 ), 349 Item::new( 350 2012, 351 &TestData { 352 id: 30, 353 value: "c".to_string(), 354 }, 355 ), 356 ]; 357 358 // encode 359 let mut buffer = Vec::new(); 360 let mut encoder = ItemEncoder::new(&mut buffer, items.len()); 361 362 for item in &items { 363 encoder.encode(item).unwrap(); 364 } 365 encoder.finish().unwrap(); 366 367 // decode 368 let cursor = Cursor::new(buffer); 369 let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap(); 370 371 let decoded_items: Result<Vec<_>, _> = decoder.collect(); 372 let decoded_items = decoded_items.unwrap(); 373 374 assert_eq!(decoded_items.len(), 3); 375 assert_eq!(decoded_items[0].timestamp, 2000); 376 assert_eq!(decoded_items[1].timestamp, 2005); 377 assert_eq!(decoded_items[2].timestamp, 2012); 378 379 assert_eq!(decoded_items[0].deser().unwrap().id, 10); 380 assert_eq!(decoded_items[1].deser().unwrap().id, 20); 381 assert_eq!(decoded_items[2].deser().unwrap().id, 30); 382 } 383 384 #[test] 385 fn test_delta_compression() { 386 let items = vec![ 387 Item::new( 388 1000, 389 &TestData { 390 id: 1, 391 value: "a".to_string(), 392 }, 393 ), 394 
Item::new( 395 1010, 396 &TestData { 397 id: 2, 398 value: "b".to_string(), 399 }, 400 ), // delta = 10 401 Item::new( 402 1020, 403 &TestData { 404 id: 3, 405 value: "c".to_string(), 406 }, 407 ), // delta = 10, delta-of-delta = 0 408 Item::new( 409 1025, 410 &TestData { 411 id: 4, 412 value: "d".to_string(), 413 }, 414 ), // delta = 5, delta-of-delta = -5 415 ]; 416 417 let mut buffer = Vec::new(); 418 let mut encoder = ItemEncoder::new(&mut buffer, items.len()); 419 420 for item in &items { 421 encoder.encode(item).unwrap(); 422 } 423 encoder.finish().unwrap(); 424 425 // decode and verify 426 let cursor = Cursor::new(buffer); 427 let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 428 429 let decoded_items: Result<Vec<_>, _> = decoder.collect(); 430 let decoded_items = decoded_items.unwrap(); 431 432 for (original, decoded) in items.iter().zip(decoded_items.iter()) { 433 assert_eq!(original.timestamp, decoded.timestamp); 434 assert_eq!(original.deser().unwrap().id, decoded.deser().unwrap().id); 435 } 436 } 437 438 #[test] 439 fn test_empty_decode() { 440 let buffer = Vec::new(); 441 let cursor = Cursor::new(buffer); 442 let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 443 444 let result = decoder.decode().unwrap(); 445 assert!(result.is_none()); 446 } 447 448 #[test] 449 fn test_backwards_timestamp() { 450 let items = vec![ 451 Item::new( 452 1000, 453 &TestData { 454 id: 1, 455 value: "first".to_string(), 456 }, 457 ), 458 Item::new( 459 900, 460 &TestData { 461 id: 2, 462 value: "second".to_string(), 463 }, 464 ), 465 ]; 466 467 let mut buffer = Vec::new(); 468 let mut encoder = ItemEncoder::new(&mut buffer, items.len()); 469 470 for item in &items { 471 encoder.encode(item).unwrap(); 472 } 473 encoder.finish().unwrap(); 474 475 let cursor = Cursor::new(buffer); 476 let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 477 478 let decoded_items: Result<Vec<_>, _> = decoder.collect(); 479 let 
decoded_items = decoded_items.unwrap(); 480 481 assert_eq!(decoded_items.len(), 2); 482 assert_eq!(decoded_items[0].timestamp, 1000); 483 assert_eq!(decoded_items[1].timestamp, 900); 484 } 485 486 #[test] 487 fn test_different_data_sizes() { 488 let small_data = TestData { 489 id: 1, 490 value: "x".to_string(), 491 }; 492 let large_data = TestData { 493 id: 2, 494 value: "a".repeat(1000), 495 }; 496 497 let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)]; 498 499 let mut buffer = Vec::new(); 500 let mut encoder = ItemEncoder::new(&mut buffer, items.len()); 501 502 for item in &items { 503 encoder.encode(item).unwrap(); 504 } 505 encoder.finish().unwrap(); 506 507 let cursor = Cursor::new(buffer); 508 let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 509 510 let decoded_items: Result<Vec<_>, _> = decoder.collect(); 511 let decoded_items = decoded_items.unwrap(); 512 513 assert_eq!(decoded_items.len(), 2); 514 assert_eq!(decoded_items[0].deser().unwrap().value.as_str(), "x"); 515 assert_eq!(decoded_items[1].deser().unwrap().value.len(), 1000); 516 assert_eq!( 517 decoded_items[1].deser().unwrap().value.as_str(), 518 "a".repeat(1000) 519 ); 520 } 521}