Tracks lexicons (collection NSIDs) and how many times each has appeared on the Jetstream.
1use std::{ 2 ops::{Bound, Deref, RangeBounds}, 3 path::Path, 4 time::Duration, 5}; 6 7use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; 8use pingora_limits::rate::Rate; 9use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 10use smol_str::SmolStr; 11use tokio::sync::broadcast; 12 13use crate::{ 14 error::{AppError, AppResult}, 15 jetstream::JetstreamEvent, 16}; 17 18#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 19#[rkyv(compare(PartialEq), derive(Debug))] 20pub struct NsidCounts { 21 pub count: u128, 22 pub deleted_count: u128, 23 pub last_seen: u64, 24} 25 26#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 27#[rkyv(compare(PartialEq), derive(Debug))] 28pub struct NsidHit { 29 pub deleted: bool, 30} 31 32pub struct EventRecord { 33 pub nsid: SmolStr, 34 pub timestamp: u64, 35 pub deleted: bool, 36} 37 38impl EventRecord { 39 pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> { 40 match event { 41 JetstreamEvent::Commit { 42 time_us, commit, .. 43 } => Some(Self { 44 nsid: commit.collection.into(), 45 timestamp: time_us, 46 deleted: false, 47 }), 48 JetstreamEvent::Delete { 49 time_us, commit, .. 
50 } => Some(Self { 51 nsid: commit.collection.into(), 52 timestamp: time_us, 53 deleted: true, 54 }), 55 _ => None, 56 } 57 } 58} 59 60// counts is nsid -> NsidCounts 61// hits is tree per nsid: timestamp -> NsidHit 62pub struct Db { 63 inner: Keyspace, 64 hits: papaya::HashMap<SmolStr, Partition>, 65 counts: Partition, 66 event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 67 eps: Rate, 68} 69 70impl Db { 71 pub fn new(path: impl AsRef<Path>) -> AppResult<Self> { 72 tracing::info!("opening db..."); 73 let ks = Config::new(path) 74 .cache_size(8 * 1024 * 1024) // from talna 75 .open()?; 76 Ok(Self { 77 hits: Default::default(), 78 counts: ks.open_partition( 79 "_counts", 80 PartitionCreateOptions::default().compression(fjall::CompressionType::None), 81 )?, 82 inner: ks, 83 event_broadcaster: broadcast::channel(1000).0, 84 eps: Rate::new(Duration::from_secs(1)), 85 }) 86 } 87 88 pub fn eps(&self) -> usize { 89 self.eps.rate(&()) as usize 90 } 91 92 pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> { 93 self.event_broadcaster.subscribe() 94 } 95 96 #[inline(always)] 97 fn run_in_nsid_tree<T>( 98 &self, 99 nsid: &str, 100 f: impl FnOnce(&Partition) -> AppResult<T>, 101 ) -> AppResult<T> { 102 f(self.hits.pin().get_or_insert_with(SmolStr::new(nsid), || { 103 let opts = PartitionCreateOptions::default() 104 .compression(fjall::CompressionType::Miniz(9)) 105 .compaction_strategy(fjall::compaction::Strategy::Fifo(fjall::compaction::Fifo { 106 limit: 5 * 1024 * 1024 * 1024, // 5 gb 107 ttl_seconds: Some(60 * 60 * 24 * 30), // 30 days 108 })); 109 self.inner.open_partition(nsid, opts).unwrap() 110 })) 111 } 112 113 pub fn record_event(&self, e: EventRecord) -> AppResult<()> { 114 let EventRecord { 115 nsid, 116 timestamp, 117 deleted, 118 } = e; 119 120 self.insert_event(&nsid, timestamp, deleted)?; 121 // increment count 122 let mut counts = self.get_count(&nsid)?; 123 counts.last_seen = timestamp; 124 if deleted { 125 counts.deleted_count 
+= 1; 126 } else { 127 counts.count += 1; 128 } 129 self.insert_count(&nsid, counts.clone())?; 130 if self.event_broadcaster.receiver_count() > 0 { 131 let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts)); 132 } 133 self.eps.observe(&(), 1); 134 Ok(()) 135 } 136 137 #[inline(always)] 138 fn insert_event(&self, nsid: &str, timestamp: u64, deleted: bool) -> AppResult<()> { 139 self.run_in_nsid_tree(nsid, |tree| { 140 tree.insert( 141 timestamp.to_be_bytes(), 142 unsafe { rkyv::to_bytes::<Error>(&NsidHit { deleted }).unwrap_unchecked() } 143 .as_slice(), 144 ) 145 .map_err(AppError::from) 146 }) 147 } 148 149 #[inline(always)] 150 fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> { 151 self.counts 152 .insert( 153 nsid, 154 unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(), 155 ) 156 .map_err(AppError::from) 157 } 158 159 pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> { 160 let Some(raw) = self.counts.get(nsid)? else { 161 return Ok(NsidCounts::default()); 162 }; 163 Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() }) 164 } 165 166 pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> { 167 self.counts.iter().map(|res| { 168 res.map_err(AppError::from).map(|(key, val)| { 169 ( 170 SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }), 171 unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 172 ) 173 }) 174 }) 175 } 176 177 pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str>> { 178 self.inner 179 .list_partitions() 180 .into_iter() 181 .filter(|k| k.deref() != "_counts") 182 } 183 184 pub fn get_hits( 185 &self, 186 nsid: &str, 187 range: impl RangeBounds<u64>, 188 ) -> AppResult<Box<dyn Iterator<Item = AppResult<(u64, NsidHit)>>>> { 189 let start = range.start_bound().cloned().map(u64::to_be_bytes); 190 let end = range.end_bound().cloned().map(u64::to_be_bytes); 191 192 let _guard = 
self.hits.guard(); 193 let Some(tree) = self.hits.get(nsid, &_guard) else { 194 return Ok(Box::new(std::iter::empty())); 195 }; 196 197 Ok(Box::new(tree.range(TimestampRange { start, end }).map( 198 |res| { 199 res.map_err(AppError::from).map(|(key, val)| { 200 ( 201 u64::from_be_bytes(key.as_ref().try_into().unwrap()), 202 unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 203 ) 204 }) 205 }, 206 ))) 207 } 208} 209 210type TimestampRepr = [u8; 8]; 211 212struct TimestampRange { 213 start: Bound<TimestampRepr>, 214 end: Bound<TimestampRepr>, 215} 216 217impl RangeBounds<TimestampRepr> for TimestampRange { 218 #[inline(always)] 219 fn start_bound(&self) -> Bound<&TimestampRepr> { 220 self.start.as_ref() 221 } 222 223 #[inline(always)] 224 fn end_bound(&self) -> Bound<&TimestampRepr> { 225 self.end.as_ref() 226 } 227}