tracks lexicons and how many times they appeared on the jetstream

fix(server): use deser to access item data because zero-copy is not guaranteed

ptr.pet 84b0f5ef 48460bf4

verified
Changed files
+37 -19
server
+2
.gitignore
···
result
server/bsky_zstd_dictionary
···
result
server/bsky_zstd_dictionary
+
fjall_data*
+
.fjall_data*
+1 -1
server/src/api.rs
···
for maybe_hit in maybe_hits {
let hit = maybe_hit?;
-
let hit_data = hit.access();
hits.push(Hit {
timestamp: hit.timestamp,
···
for maybe_hit in maybe_hits {
let hit = maybe_hit?;
+
let hit_data = hit.deser()?;
hits.push(Hit {
timestamp: hit.timestamp,
+33 -17
server/src/db/block.rs
···
};
use rkyv::{
-
Archive, Serialize, api::high::HighSerializer, rancor, ser::allocator::ArenaHandle,
util::AlignedVec,
};
-
use crate::utils::{ReadVariableExt, WriteVariableExt};
pub struct Item<T> {
pub timestamp: u64,
-
data: AlignedVec,
phantom: PhantomData<T>,
}
-
impl<T: Archive> Item<T> {
-
pub fn access(&self) -> &T::Archived {
-
unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) }
}
}
···
let decoded_item = decoder.decode().unwrap().unwrap();
assert_eq!(decoded_item.timestamp, 1000);
-
let decoded_data = decoded_item.access();
assert_eq!(decoded_data.id, 123);
assert_eq!(decoded_data.value.as_str(), "test");
}
···
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
assert_eq!(original.timestamp, decoded.timestamp);
-
assert_eq!(original.access().id, decoded.access().id);
assert_eq!(
-
original.access().value.as_str(),
-
decoded.access().value.as_str()
);
}
}
···
assert_eq!(decoded_items[1].timestamp, 2005);
assert_eq!(decoded_items[2].timestamp, 2012);
-
assert_eq!(decoded_items[0].access().id, 10);
-
assert_eq!(decoded_items[1].access().id, 20);
-
assert_eq!(decoded_items[2].access().id, 30);
}
#[test]
···
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
assert_eq!(original.timestamp, decoded.timestamp);
-
assert_eq!(original.access().id, decoded.access().id);
}
}
···
let decoded_items = decoded_items.unwrap();
assert_eq!(decoded_items.len(), 2);
-
assert_eq!(decoded_items[0].access().value.as_str(), "x");
-
assert_eq!(decoded_items[1].access().value.len(), 1000);
-
assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000));
}
}
···
};
use rkyv::{
+
Archive, Deserialize, Serialize,
+
api::high::{HighSerializer, HighValidator},
+
bytecheck::CheckBytes,
+
de::Pool,
+
rancor::{self, Strategy},
+
ser::allocator::ArenaHandle,
util::AlignedVec,
};
+
use crate::{
+
error::{AppError, AppResult},
+
utils::{ReadVariableExt, WriteVariableExt},
+
};
pub struct Item<T> {
pub timestamp: u64,
+
pub data: AlignedVec,
phantom: PhantomData<T>,
}
+
impl<T> Item<T>
+
where
+
T: Archive,
+
T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
+
+ Deserialize<T, Strategy<Pool, rancor::Error>>,
+
{
+
pub fn deser(&self) -> AppResult<T> {
+
rkyv::from_bytes(&self.data).map_err(AppError::from)
}
}
···
let decoded_item = decoder.decode().unwrap().unwrap();
assert_eq!(decoded_item.timestamp, 1000);
+
let decoded_data = decoded_item.deser().unwrap();
assert_eq!(decoded_data.id, 123);
assert_eq!(decoded_data.value.as_str(), "test");
}
···
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.deser().unwrap().id, decoded.deser().unwrap().id);
assert_eq!(
+
original.deser().unwrap().value.as_str(),
+
decoded.deser().unwrap().value.as_str()
);
}
}
···
assert_eq!(decoded_items[1].timestamp, 2005);
assert_eq!(decoded_items[2].timestamp, 2012);
+
assert_eq!(decoded_items[0].deser().unwrap().id, 10);
+
assert_eq!(decoded_items[1].deser().unwrap().id, 20);
+
assert_eq!(decoded_items[2].deser().unwrap().id, 30);
}
#[test]
···
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.deser().unwrap().id, decoded.deser().unwrap().id);
}
}
···
let decoded_items = decoded_items.unwrap();
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].deser().unwrap().value.as_str(), "x");
+
assert_eq!(decoded_items[1].deser().unwrap().value.len(), 1000);
+
assert_eq!(
+
decoded_items[1].deser().unwrap().value.as_str(),
+
"a".repeat(1000)
+
);
}
}
+1 -1
server/src/main.rs
···
EventRecord {
nsid: nsid.to_smolstr(),
timestamp: hit.timestamp,
-
deleted: hit.access().deleted,
}
}))
.expect("cant record event");
···
EventRecord {
nsid: nsid.to_smolstr(),
timestamp: hit.timestamp,
+
deleted: hit.deser().unwrap().deleted,
}
}))
.expect("cant record event");