compact binary serialization format with built-in compression
at trunk 9.4 kB view raw
1use crate::{CompressionMethod, Error, FileHeader, Result, Tag, TagKind, len}; 2use std::io::{Seek, Write}; 3 4use chrono::{DateTime, Utc}; 5use uuid::Uuid; 6 7struct RawWriter<W: Write + Seek> { 8 writer: W, 9 position: usize, 10 header: FileHeader, 11} 12 13impl<W: Write + Seek> RawWriter<W> { 14 fn new(writer: W, compression: CompressionMethod) -> Result<Self> { 15 let header = FileHeader { 16 version: FileHeader::CURRENT_VERSION, 17 little_endian: true, 18 compression, 19 payload_length: 0, 20 }; 21 22 Ok(Self { 23 writer, 24 position: 0, 25 header, 26 }) 27 } 28 29 fn write_tag(&mut self, tag: &Tag) -> Result<()> { 30 self.write_tag_kind(tag.kind())?; 31 self.write_tag_of_kind(tag)?; 32 33 self.header.payload_length = self.position as u32; 34 35 Ok(()) 36 } 37 38 fn write_tag_kind(&mut self, kind: TagKind) -> Result<()> { 39 self.write_u8(kind as u8) 40 } 41 42 fn write_tag_of_kind(&mut self, tag: &Tag) -> Result<()> { 43 match tag { 44 Tag::U8(v) => self.write_u8(*v), 45 Tag::I8(v) => self.write_i8(*v), 46 Tag::U16(v) => self.write_u16(*v), 47 Tag::I16(v) => self.write_i16(*v), 48 Tag::U32(v) => self.write_u32(*v), 49 Tag::I32(v) => self.write_i32(*v), 50 Tag::U64(v) => self.write_u64(*v), 51 Tag::I64(v) => self.write_i64(*v), 52 Tag::F32(v) => self.write_f32(*v), 53 Tag::F64(v) => self.write_f64(*v), 54 Tag::Bool(v) => self.write_bool(*v), 55 Tag::String(s) => self.write_string(s), 56 Tag::Option(kind, opt) => self.write_option(kind, opt.as_deref()), 57 Tag::List(list) => self.write_list(list), 58 Tag::Array(kind, array) => self.write_array(*kind, array), 59 Tag::Map(map) => self.write_map(map), 60 Tag::Timestamp(dt) => self.write_timestamp(dt), 61 Tag::Uuid(uuid) => self.write_uuid(uuid), 62 } 63 } 64 65 fn write_u8(&mut self, v: u8) -> Result<()> { 66 self.writer.write_all(&[v]).map_err(Error::IoError)?; 67 self.position += len::INT_8; 68 Ok(()) 69 } 70 71 fn write_i8(&mut self, v: i8) -> Result<()> { 72 self.write_u8(v as u8) 73 } 74 75 fn write_u16(&mut self, v: u16) -> Result<()> { 76 let bytes = if self.header.little_endian { 77 v.to_le_bytes() 78 } else { 79 v.to_be_bytes() 80 }; 81 self.writer.write_all(&bytes).map_err(Error::IoError)?; 82 self.position += len::INT_16; 83 Ok(()) 84 } 85 86 fn write_i16(&mut self, v: i16) -> Result<()> { 87 self.write_u16(v as u16) 88 } 89 90 fn write_u32(&mut self, v: u32) -> Result<()> { 91 let bytes = if self.header.little_endian { 92 v.to_le_bytes() 93 } else { 94 v.to_be_bytes() 95 }; 96 self.writer.write_all(&bytes).map_err(Error::IoError)?; 97 self.position += len::INT_32; 98 Ok(()) 99 } 100 101 fn write_i32(&mut self, v: i32) -> Result<()> { 102 self.write_u32(v as u32) 103 } 104 105 fn write_u64(&mut self, v: u64) -> Result<()> { 106 let bytes = if self.header.little_endian { 107 v.to_le_bytes() 108 } else { 109 v.to_be_bytes() 110 }; 111 self.writer.write_all(&bytes).map_err(Error::IoError)?; 112 self.position += len::INT_64; 113 Ok(()) 114 } 115 116 fn write_i64(&mut self, v: i64) -> Result<()> { 117 self.write_u64(v as u64) 118 } 119 120 fn write_f32(&mut self, v: f32) -> Result<()> { 121 self.write_u32(v.to_bits()) 122 } 123 124 fn write_f64(&mut self, v: f64) -> Result<()> { 125 self.write_u64(v.to_bits()) 126 } 127 128 fn write_bool(&mut self, v: bool) -> Result<()> { 129 self.write_u8(if v { 1 } else { 0 }) 130 } 131 132 fn write_string(&mut self, s: &str) -> Result<()> { 133 self.write_u32(s.len() as u32)?; 134 self.writer 135 .write_all(s.as_bytes()) 136 .map_err(Error::IoError)?; 137 self.position += s.len(); 138 Ok(()) 139 } 140 141 fn write_option( 142 &mut self, 143 kind: &TagKind, 144 value: Option<&Tag>, 145 ) -> Result<()> { 146 self.write_tag_kind(*kind)?; 147 match value { 148 Some(tag) => { 149 self.write_bool(true)?; 150 self.write_tag_of_kind(tag) 151 } 152 None => self.write_bool(false), 153 } 154 } 155 156 fn write_list(&mut self, list: &[Tag]) -> Result<()> { 157 self.write_u32(list.len() as u32)?; 158 for tag in list { 159 self.write_tag(tag)?; 160 } 161 Ok(()) 162 } 163 164 fn write_array(&mut self, kind: TagKind, array: &[Tag]) -> Result<()> { 165 self.write_u32(array.len() as u32)?; 166 self.write_tag_kind(kind)?; 167 for tag in array { 168 self.write_tag_of_kind(tag)?; 169 } 170 Ok(()) 171 } 172 173 fn write_map(&mut self, map: &[(Tag, Tag)]) -> Result<()> { 174 self.write_u32(map.len() as u32)?; 175 for (key, value) in map { 176 self.write_tag(key)?; 177 self.write_tag(value)?; 178 } 179 Ok(()) 180 } 181 182 fn write_timestamp(&mut self, dt: &DateTime<Utc>) -> Result<()> { 183 let timestamp = dt.timestamp_millis(); 184 self.write_i64(timestamp) 185 } 186 187 fn write_uuid(&mut self, uuid: &Uuid) -> Result<()> { 188 self.writer 189 .write_all(uuid.as_bytes()) 190 .map_err(Error::IoError)?; 191 self.position += len::UUID; 192 Ok(()) 193 } 194} 195 196pub struct Writer<W: Write> { 197 inner: W, 198 raw_writer: RawWriter<std::io::Cursor<Vec<u8>>>, 199 compression: CompressionMethod, 200} 201 202impl<W: Write> Writer<W> { 203 pub fn new(writer: W, compression: CompressionMethod) -> Result<Self> { 204 let buffer = std::io::Cursor::new(Vec::new()); 205 let raw_writer = RawWriter::new(buffer, CompressionMethod::None)?; 206 Ok(Self { 207 inner: writer, 208 raw_writer, 209 compression, 210 }) 211 } 212 213 pub fn write_tag(&mut self, tag: &Tag) -> Result<()> { 214 self.raw_writer.write_tag(tag) 215 } 216 217 pub fn finish(mut self) -> Result<W> { 218 let uncompressed = self.raw_writer.writer.into_inner(); 219 let compressed = match self.compression { 220 CompressionMethod::None => uncompressed.clone(), 221 CompressionMethod::Gzip => { 222 #[cfg(feature = "gzip")] 223 { 224 let mut encoder = flate2::GzBuilder::new() 225 .mtime(0) 226 .operating_system(0) 227 .write(Vec::new(), flate2::Compression::fast()); 228 encoder.write_all(&uncompressed).map_err(Error::IoError)?; 229 encoder.finish().map_err(Error::IoError)? 230 } 231 #[cfg(not(feature = "gzip"))] 232 { 233 return Err(Error::UnsupportedCompression( 234 self.compression, 235 )); 236 } 237 } 238 CompressionMethod::Zlib => { 239 #[cfg(feature = "gzip")] 240 { 241 let mut encoder = flate2::write::ZlibEncoder::new( 242 Vec::new(), 243 flate2::Compression::default(), 244 ); 245 encoder.write_all(&uncompressed).map_err(Error::IoError)?; 246 encoder.finish().map_err(Error::IoError)? 247 } 248 #[cfg(not(feature = "gzip"))] 249 { 250 return Err(Error::UnsupportedCompression( 251 self.compression, 252 )); 253 } 254 } 255 CompressionMethod::Lz4 => { 256 #[cfg(feature = "lz4")] 257 { 258 let mut encoder = lz4::EncoderBuilder::new() 259 .level(4) 260 .build(Vec::new()) 261 .map_err(Error::IoError)?; 262 encoder.write_all(&uncompressed).map_err(Error::IoError)?; 263 let (compressed, _) = encoder.finish(); 264 compressed 265 } 266 #[cfg(not(feature = "lz4"))] 267 { 268 return Err(Error::UnsupportedCompression( 269 self.compression, 270 )); 271 } 272 } 273 }; 274 275 let header = FileHeader { 276 version: FileHeader::CURRENT_VERSION, 277 little_endian: true, 278 compression: self.compression, 279 payload_length: compressed.len() as u32, 280 }; 281 282 // write header 283 self.inner 284 .write_all(&FileHeader::MAGIC_BYTES) 285 .map_err(Error::IoError)?; 286 self.inner 287 .write_all(&[header.version]) 288 .map_err(Error::IoError)?; 289 self.inner 290 .write_all(&[!header.little_endian as u8]) 291 .map_err(Error::IoError)?; 292 self.inner 293 .write_all(&[header.compression as u8]) 294 .map_err(Error::IoError)?; 295 let payload_bytes = if header.little_endian { 296 header.payload_length.to_le_bytes() 297 } else { 298 header.payload_length.to_be_bytes() 299 }; 300 self.inner 301 .write_all(&payload_bytes) 302 .map_err(Error::IoError)?; 303 304 // write payload 305 self.inner.write_all(&compressed).map_err(Error::IoError)?; 306 307 Ok(self.inner) 308 } 309}