// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
// at pocket 19 kB view raw
1use crate::{Cursor, Did, Nsid, RecordKey}; 2use bincode::{ 3 config::{standard, Config}, 4 de::Decode as BincodeDecode, 5 decode_from_slice, 6 enc::Encode as BincodeEncode, 7 encode_to_vec, 8 error::{DecodeError, EncodeError}, 9}; 10use lsm_tree::range::prefix_to_range; 11use std::fmt; 12use std::marker::PhantomData; 13use std::ops::{Bound, Range}; 14use thiserror::Error; 15 16#[non_exhaustive] 17#[derive(Error, Debug)] 18pub enum EncodingError { 19 #[error("failed to parse Atrium string type: {0}")] 20 BadAtriumStringType(&'static str), 21 #[error("Not enough NSID segments for a usable prefix")] 22 NotEnoughNsidSegments, 23 #[error("failed to bincode-encode: {0}")] 24 BincodeEncodeFailed(#[from] EncodeError), 25 #[error("failed to bincode-decode: {0}")] 26 BincodeDecodeFailed(#[from] DecodeError), 27 #[error("decode missing suffix bytes")] 28 DecodeMissingSuffix, 29 #[error("decode ran out of bytes")] 30 DecodeNotEnoughBytes, 31 #[error("string contained a null byte, which is not allowed, which is annoying, sorry")] 32 StringContainedNull, 33 #[error("string was not terminated with null byte")] 34 UnterminatedString, 35 #[error("could not convert from utf8: {0}")] 36 NotUtf8(#[from] std::str::Utf8Error), 37 #[error("could not convert from utf8: {0}")] 38 NotUtf8String(#[from] std::string::FromUtf8Error), 39 #[error("could not get array from slice: {0}")] 40 BadSlice(#[from] std::array::TryFromSliceError), 41 #[error("wrong static prefix. 
expected {1:?}, found {0:?}")] 42 WrongStaticPrefix(String, String), // found, expected 43 #[error("failed to deserialize json")] 44 JsonError(#[from] serde_json::Error), 45 #[error("unexpected extra bytes ({0} bytes) left after decoding")] 46 DecodeTooManyBytes(usize), 47 #[error("expected exclusive bound from lsm_tree (likely bug)")] 48 BadRangeBound, 49 #[error("expected a truncated u64 for mod {0}, found remainder: {1}")] 50 InvalidTruncated(u64, u64), 51} 52 53pub type EncodingResult<T> = Result<T, EncodingError>; 54 55fn bincode_conf() -> impl Config { 56 standard() 57 .with_big_endian() 58 .with_fixed_int_encoding() 59 .with_limit::<{ 2_usize.pow(20) }>() // 1MB 60} 61 62pub trait DbBytes { 63 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>>; 64 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> 65 where 66 Self: Sized; 67 fn as_prefix_range_end(&self) -> EncodingResult<Vec<u8>> { 68 let bytes = self.to_db_bytes()?; 69 let (_, Bound::Excluded(range_end)) = prefix_to_range(&bytes) else { 70 return Err(EncodingError::BadRangeBound); 71 }; 72 Ok(range_end.to_vec()) 73 } 74} 75 76pub trait SubPrefixBytes<T> { 77 fn sub_prefix(input: T) -> EncodingResult<Vec<u8>>; 78} 79 80#[derive(PartialEq)] 81pub struct DbConcat<P: DbBytes, S: DbBytes> { 82 pub prefix: P, 83 pub suffix: S, 84} 85 86impl<P: DbBytes + PartialEq + std::fmt::Debug, S: DbBytes + PartialEq + std::fmt::Debug> 87 DbConcat<P, S> 88{ 89 pub fn from_pair(prefix: P, suffix: S) -> Self { 90 Self { prefix, suffix } 91 } 92 pub fn from_prefix_to_db_bytes(prefix: &P) -> EncodingResult<Vec<u8>> { 93 prefix.to_db_bytes() 94 } 95 pub fn to_prefix_db_bytes(&self) -> EncodingResult<Vec<u8>> { 96 self.prefix.to_db_bytes() 97 } 98 pub fn prefix_range_end(prefix: &P) -> EncodingResult<Vec<u8>> { 99 prefix.as_prefix_range_end() 100 } 101 pub fn range_end(&self) -> EncodingResult<Vec<u8>> { 102 Self::prefix_range_end(&self.prefix) 103 } 104 pub fn range(&self) -> Result<Range<Vec<u8>>, 
EncodingError> { 105 let prefix_bytes = self.prefix.to_db_bytes()?; 106 let (Bound::Included(start), Bound::Excluded(end)) = prefix_to_range(&prefix_bytes) else { 107 return Err(EncodingError::BadRangeBound); 108 }; 109 Ok(start.to_vec()..end.to_vec()) 110 } 111 pub fn range_to_prefix_end(&self) -> Result<Range<Vec<u8>>, EncodingError> { 112 Ok(self.to_db_bytes()?..self.range_end()?) 113 } 114} 115 116impl<P: DbBytes + Default, S: DbBytes + Default> Default for DbConcat<P, S> { 117 fn default() -> Self { 118 Self { 119 prefix: Default::default(), 120 suffix: Default::default(), 121 } 122 } 123} 124 125impl<P: DbBytes + std::fmt::Debug, S: DbBytes + std::fmt::Debug> fmt::Debug for DbConcat<P, S> { 126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 127 write!(f, "DbConcat<{:?} || {:?}>", self.prefix, self.suffix) 128 } 129} 130 131impl<P: DbBytes, S: DbBytes> DbBytes for DbConcat<P, S> { 132 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> { 133 let mut combined = self.prefix.to_db_bytes()?; 134 combined.append(&mut self.suffix.to_db_bytes()?); 135 Ok(combined) 136 } 137 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> 138 where 139 Self: Sized, 140 { 141 let (prefix, eaten) = P::from_db_bytes(bytes)?; 142 assert!( 143 eaten <= bytes.len(), 144 "eaten({}) < len({})", 145 eaten, 146 bytes.len() 147 ); 148 let Some(suffix_bytes) = bytes.get(eaten..) 
else { 149 return Err(EncodingError::DecodeMissingSuffix); 150 }; 151 if suffix_bytes.is_empty() { 152 return Err(EncodingError::DecodeMissingSuffix); 153 }; 154 let (suffix, also_eaten) = S::from_db_bytes(suffix_bytes)?; 155 assert!( 156 also_eaten <= suffix_bytes.len(), 157 "also eaten({}) < suffix len({})", 158 also_eaten, 159 suffix_bytes.len() 160 ); 161 Ok((Self { prefix, suffix }, eaten + also_eaten)) 162 } 163} 164 165#[derive(Debug, Default, PartialEq)] 166pub struct DbEmpty(()); 167impl DbBytes for DbEmpty { 168 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> { 169 Ok(vec![]) 170 } 171 fn from_db_bytes(_: &[u8]) -> Result<(Self, usize), EncodingError> { 172 Ok((Self(()), 0)) 173 } 174} 175 176pub trait StaticStr { 177 fn static_str() -> &'static str; 178} 179 180#[derive(PartialEq)] 181pub struct DbStaticStr<S: StaticStr> { 182 marker: PhantomData<S>, 183} 184impl<S: StaticStr> Default for DbStaticStr<S> { 185 fn default() -> Self { 186 Self { 187 marker: PhantomData, 188 } 189 } 190} 191impl<S: StaticStr> fmt::Debug for DbStaticStr<S> { 192 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 193 write!(f, "DbStaticStr({:?})", S::static_str()) 194 } 195} 196impl<S: StaticStr> DbBytes for DbStaticStr<S> { 197 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> { 198 S::static_str().to_string().to_db_bytes() 199 } 200 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 201 let (prefix, eaten) = String::from_db_bytes(bytes)?; 202 if prefix != S::static_str() { 203 return Err(EncodingError::WrongStaticPrefix( 204 prefix, 205 S::static_str().to_string(), 206 )); 207 } 208 Ok(( 209 Self { 210 marker: PhantomData, 211 }, 212 eaten, 213 )) 214 } 215} 216 217/// marker trait: impl on a type to indicate that that DbBytes should use bincode on it 218pub trait UseBincodePlz {} 219 220impl<T> DbBytes for T 221where 222 T: BincodeEncode + BincodeDecode<()> + UseBincodePlz + Sized + std::fmt::Debug, 223{ 224 fn to_db_bytes(&self) -> 
EncodingResult<Vec<u8>> { 225 Ok(encode_to_vec(self, bincode_conf())?) 226 } 227 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 228 Ok(decode_from_slice(bytes, bincode_conf())?) 229 } 230} 231 232/// helper trait: impl on a type to get helpers to implement DbBytes 233pub trait SerdeBytes: serde::Serialize + for<'a> serde::Deserialize<'a> { 234 fn to_bytes(&self) -> EncodingResult<Vec<u8>> 235 where 236 Self: std::fmt::Debug, 237 { 238 Ok(bincode::serde::encode_to_vec(self, bincode_conf())?) 239 } 240 fn from_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 241 Ok(bincode::serde::decode_from_slice(bytes, bincode_conf())?) 242 } 243} 244 245////// 246 247impl<const N: usize> UseBincodePlz for [u8; N] {} 248 249// bare bytes (NOT prefix-encoded!) 250impl DbBytes for Vec<u8> { 251 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> { 252 Ok(self.to_vec()) 253 } 254 // greedy, consumes ALL remaining bytes 255 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 256 Ok((bytes.to_owned(), bytes.len())) 257 } 258} 259 260/// Lexicographic-sort-friendly null-terminating serialization for String 261/// 262/// Null bytes technically can appear within utf-8 strings. Currently we will just bail in that case. 263/// 264/// In the future, null bytes could be escaped, or maybe this becomes SLIP-encoded. Either should be 265/// backwards-compatible I think. 266/// 267/// TODO: wrap in another type. it's actually probably not desirable to serialize strings this way 268/// *except* where needed as a prefix. 
269impl DbBytes for String { 270 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> { 271 let mut v = self.as_bytes().to_vec(); 272 if v.contains(&0x00) { 273 return Err(EncodingError::StringContainedNull); 274 } 275 v.push(0x00); 276 Ok(v) 277 } 278 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 279 for (i, byte) in bytes.iter().enumerate() { 280 if *byte == 0x00 { 281 let (string_bytes, _) = bytes.split_at(i); 282 let s = std::str::from_utf8(string_bytes)?; 283 return Ok((s.to_string(), i + 1)); // +1 for the null byte 284 } 285 } 286 Err(EncodingError::UnterminatedString) 287 } 288} 289 290impl SubPrefixBytes<&str> for String { 291 fn sub_prefix(input: &str) -> EncodingResult<Vec<u8>> { 292 let v = input.as_bytes(); 293 if v.contains(&0x00) { 294 return Err(EncodingError::StringContainedNull); 295 } 296 // NO null terminator!! 297 Ok(v.to_vec()) 298 } 299} 300 301impl DbBytes for Did { 302 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 303 let (s, n) = decode_from_slice(bytes, bincode_conf())?; 304 let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?; 305 Ok((me, n)) 306 } 307 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> { 308 Ok(encode_to_vec(self.as_ref(), bincode_conf())?) 309 } 310} 311 312impl DbBytes for Nsid { 313 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 314 let (s, n) = String::from_db_bytes(bytes)?; // null-terminated DbBytes impl!! 315 let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?; 316 Ok((me, n)) 317 } 318 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> { 319 String::to_db_bytes(&self.to_string()) // null-terminated DbBytes impl!!!! 
320 } 321} 322impl SubPrefixBytes<&str> for Nsid { 323 fn sub_prefix(input: &str) -> EncodingResult<Vec<u8>> { 324 String::sub_prefix(input) 325 } 326} 327 328impl DbBytes for RecordKey { 329 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 330 let (s, n) = decode_from_slice(bytes, bincode_conf())?; 331 let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?; 332 Ok((me, n)) 333 } 334 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> { 335 Ok(encode_to_vec(self.as_ref(), bincode_conf())?) 336 } 337} 338 339impl DbBytes for Cursor { 340 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> { 341 Ok(self.to_raw_u64().to_be_bytes().to_vec()) 342 } 343 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 344 if bytes.len() < 8 { 345 return Err(EncodingError::DecodeNotEnoughBytes); 346 } 347 let bytes8 = TryInto::<[u8; 8]>::try_into(&bytes[..8])?; 348 let cursor = Cursor::from_raw_u64(u64::from_be_bytes(bytes8)); 349 Ok((cursor, 8)) 350 } 351} 352 353impl DbBytes for serde_json::Value { 354 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> { 355 self.to_string().to_db_bytes() 356 } 357 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 358 let (s, n) = String::from_db_bytes(bytes)?; 359 let v = s.parse()?; 360 Ok((v, n)) 361 } 362} 363 364pub fn db_complete<T: DbBytes>(bytes: &[u8]) -> Result<T, EncodingError> { 365 let (t, n) = T::from_db_bytes(bytes)?; 366 if n < bytes.len() { 367 return Err(EncodingError::DecodeTooManyBytes(bytes.len() - n)); 368 } 369 Ok(t) 370} 371 372#[cfg(test)] 373mod test { 374 use super::{ 375 Cursor, DbBytes, DbConcat, DbEmpty, DbStaticStr, EncodingResult, Nsid, StaticStr, 376 SubPrefixBytes, 377 }; 378 379 #[test] 380 fn test_db_empty() -> EncodingResult<()> { 381 let original = DbEmpty::default(); 382 let serialized = original.to_db_bytes()?; 383 assert_eq!(serialized.len(), 0); 384 let (restored, bytes_consumed) = DbEmpty::from_db_bytes(&serialized)?; 385 
assert_eq!(restored, original); 386 assert_eq!(bytes_consumed, 0); 387 Ok(()) 388 } 389 390 #[test] 391 fn test_string_roundtrip() -> EncodingResult<()> { 392 for (case, desc) in [ 393 ("", "empty string"), 394 ("a", "basic string"), 395 ("asdf asdf asdf even µnicode", "unicode string"), 396 ] { 397 let serialized = case.to_string().to_db_bytes()?; 398 let (restored, bytes_consumed) = String::from_db_bytes(&serialized)?; 399 assert_eq!(&restored, case, "string round-trip: {desc}"); 400 assert_eq!( 401 bytes_consumed, 402 serialized.len(), 403 "exact bytes consumed for round-trip: {desc}" 404 ); 405 } 406 Ok(()) 407 } 408 409 #[test] 410 fn test_string_serialized_lexicographic_sort() -> EncodingResult<()> { 411 let aa = "aa".to_string().to_db_bytes()?; 412 let b = "b".to_string().to_db_bytes()?; 413 assert!(b > aa); 414 Ok(()) 415 } 416 417 #[test] 418 fn test_nullstring_can_prefix() -> EncodingResult<()> { 419 for (s, pre, is_pre, desc) in [ 420 ("", "", true, "empty strings"), 421 ("", "a", false, "longer prefix"), 422 ("a", "", true, "empty prefix matches"), 423 ("a", "a", true, "whole string matches"), 424 ("a", "b", false, "entirely different"), 425 ("ab", "a", true, "prefix matches"), 426 ("ab", "b", false, "shorter and entirely different"), 427 ] { 428 let serialized = s.to_string().to_db_bytes()?; 429 let prefixed = String::sub_prefix(pre)?; 430 assert_eq!(serialized.starts_with(&prefixed), is_pre, "{desc}"); 431 } 432 Ok(()) 433 } 434 435 #[test] 436 fn test_nsid_can_prefix() -> EncodingResult<()> { 437 for (s, pre, is_pre, desc) in [ 438 ("ab.cd.ef", "", true, "empty prefix"), 439 ("ab.cd.ef", "a", true, "tiny prefix"), 440 ("ab.cd.ef", "abc", false, "bad prefix"), 441 ("ab.cd.ef", "ab", true, "segment prefix"), 442 ("ab.cd.ef", "ab.cd", true, "multi-segment prefix"), 443 ("ab.cd.ef", "ab.cd.ef", true, "full match"), 444 ("ab.cd.ef", "ab.cd.ef.g", false, "prefix longer"), 445 ] { 446 let serialized = Nsid::new(s.to_string()).unwrap().to_db_bytes()?; 447 
let prefixed = Nsid::sub_prefix(pre)?; 448 assert_eq!(serialized.starts_with(&prefixed), is_pre, "{desc}"); 449 } 450 Ok(()) 451 } 452 453 #[test] 454 fn test_string_cursor_prefix_roundtrip() -> EncodingResult<()> { 455 type TwoThings = DbConcat<String, Cursor>; 456 for (lazy_prefix, tired_suffix, desc) in [ 457 ("", 0, "empty string and cursor"), 458 ("aaa", 0, "zero-cursor"), 459 ("", 1234, "empty string"), 460 ("aaaaa", 789, "string and cursor"), 461 ] { 462 let original = TwoThings { 463 prefix: lazy_prefix.to_string(), 464 suffix: Cursor::from_raw_u64(tired_suffix), 465 }; 466 let serialized = original.to_db_bytes()?; 467 let (restored, bytes_consumed) = TwoThings::from_db_bytes(&serialized)?; 468 assert_eq!(restored, original, "round-trip: {desc}"); 469 assert_eq!( 470 bytes_consumed, 471 serialized.len(), 472 "exact bytes consumed for round-trip: {desc}" 473 ); 474 } 475 Ok(()) 476 } 477 478 #[test] 479 fn test_cursor_string_prefix_roundtrip() -> EncodingResult<()> { 480 type TwoThings = DbConcat<Cursor, String>; 481 for (tired_prefix, sad_suffix, desc) in [ 482 (0, "", "empty string and cursor"), 483 (0, "aaa", "zero-cursor"), 484 (1234, "", "empty string"), 485 (789, "aaaaa", "string and cursor"), 486 ] { 487 let original = TwoThings { 488 prefix: Cursor::from_raw_u64(tired_prefix), 489 suffix: sad_suffix.to_string(), 490 }; 491 let serialized = original.to_db_bytes()?; 492 let (restored, bytes_consumed) = TwoThings::from_db_bytes(&serialized)?; 493 assert_eq!(restored, original, "round-trip: {desc}"); 494 assert_eq!( 495 bytes_consumed, 496 serialized.len(), 497 "exact bytes consumed for round-trip: {desc}" 498 ); 499 } 500 Ok(()) 501 } 502 503 #[test] 504 fn test_static_str() -> EncodingResult<()> { 505 #[derive(Debug, PartialEq)] 506 struct AStaticStr {} 507 impl StaticStr for AStaticStr { 508 fn static_str() -> &'static str { 509 "a static str" 510 } 511 } 512 type ADbStaticStr = DbStaticStr<AStaticStr>; 513 514 let original = ADbStaticStr::default(); 
515 let serialized = original.to_db_bytes()?; 516 let (restored, bytes_consumed) = ADbStaticStr::from_db_bytes(&serialized)?; 517 assert_eq!(restored, original); 518 assert_eq!(bytes_consumed, serialized.len()); 519 assert!(serialized.starts_with("a static str".as_bytes())); 520 521 Ok(()) 522 } 523 524 #[test] 525 fn test_static_str_empty() -> EncodingResult<()> { 526 #[derive(Debug, PartialEq)] 527 struct AnEmptyStr {} 528 impl StaticStr for AnEmptyStr { 529 fn static_str() -> &'static str { 530 "" 531 } 532 } 533 type ADbEmptyStr = DbStaticStr<AnEmptyStr>; 534 let original = ADbEmptyStr::default(); 535 let serialized = original.to_db_bytes()?; 536 let (restored, bytes_consumed) = ADbEmptyStr::from_db_bytes(&serialized)?; 537 assert_eq!(restored, original); 538 assert_eq!(bytes_consumed, serialized.len()); 539 assert_eq!(serialized, &[0x00]); 540 541 Ok(()) 542 } 543 544 #[test] 545 fn test_static_prefix() -> EncodingResult<()> { 546 #[derive(Debug, PartialEq)] 547 struct AStaticPrefix {} 548 impl StaticStr for AStaticPrefix { 549 fn static_str() -> &'static str { 550 "a static prefix" 551 } 552 } 553 type ADbStaticPrefix = DbStaticStr<AStaticPrefix>; 554 555 type PrefixedCursor = DbConcat<ADbStaticPrefix, Cursor>; 556 557 let original = PrefixedCursor { 558 prefix: Default::default(), 559 suffix: Cursor::from_raw_u64(123), 560 }; 561 let serialized = original.to_db_bytes()?; 562 let (restored, bytes_consumed) = PrefixedCursor::from_db_bytes(&serialized)?; 563 assert_eq!(restored, original); 564 assert_eq!(bytes_consumed, serialized.len()); 565 assert_eq!(restored.suffix.to_raw_u64(), 123); 566 assert!(serialized.starts_with("a static prefix".as_bytes())); 567 568 Ok(()) 569 } 570}