tracks lexicons and how many times they appeared on the jetstream
at migrate 9.4 kB view raw
1use std::io::{self, Read, Write}; 2use std::sync::atomic::{AtomicU64, Ordering}; 3use std::time::Duration; 4 5use byteview::ByteView; 6use ordered_varint::Variable; 7 8pub fn get_time() -> Duration { 9 std::time::SystemTime::now() 10 .duration_since(std::time::UNIX_EPOCH) 11 .unwrap() 12} 13 14pub trait WriteVariableExt: Write { 15 fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> { 16 value.encode_variable(self) 17 } 18} 19impl<W: Write> WriteVariableExt for W {} 20 21pub trait ReadVariableExt: Read { 22 fn read_varint<T: Variable>(&mut self) -> io::Result<T> { 23 T::decode_variable(self) 24 } 25} 26impl<R: Read> ReadVariableExt for R {} 27 28pub struct WritableByteView { 29 view: ByteView, 30 written: usize, 31} 32 33impl WritableByteView { 34 // returns None if the view already has a reference to it 35 pub fn with_size(capacity: usize) -> Self { 36 Self { 37 view: ByteView::with_size(capacity), 38 written: 0, 39 } 40 } 41 42 #[inline(always)] 43 pub fn into_inner(self) -> ByteView { 44 self.view 45 } 46} 47 48impl Write for WritableByteView { 49 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 50 let len = buf.len(); 51 if len > self.view.len() - self.written { 52 return Err(std::io::Error::new( 53 std::io::ErrorKind::StorageFull, 54 "buffer full", 55 )); 56 } 57 // SAFETY: this is safe because we have checked that the buffer is not full 58 // SAFETY: we own the mutator so no other references to the view exist 59 unsafe { 60 std::ptr::copy_nonoverlapping( 61 buf.as_ptr(), 62 self.view 63 .get_mut() 64 .unwrap_unchecked() 65 .as_mut_ptr() 66 .add(self.written), 67 len, 68 ); 69 self.written += len; 70 } 71 Ok(len) 72 } 73 74 #[inline(always)] 75 fn flush(&mut self) -> std::io::Result<()> { 76 Ok(()) 77 } 78} 79 80pub fn varints_unsigned_encoded<const N: usize>(values: [u64; N]) -> ByteView { 81 let mut buf = 82 WritableByteView::with_size(values.into_iter().map(varint_unsigned_encoded_len).sum()); 83 for value in values { 84 // cant fail 85 let _ = buf.write_varint(value); 86 } 87 buf.into_inner() 88} 89 90// gets the encoded length of a varint-encoded unsigned integer 91// see ordered_varint 92pub fn varint_unsigned_encoded_len(value: u64) -> usize { 93 let value = value.to_be_bytes(); 94 value 95 .iter() 96 .enumerate() 97 .find_map(|(index, &byte)| { 98 (byte > 0).then(|| { 99 let extra_bytes = 7 - index; 100 (byte < 16) 101 .then(|| extra_bytes + 1) 102 .unwrap_or_else(|| extra_bytes + 2) 103 }) 104 }) 105 .unwrap_or(0) 106 .max(1) 107} 108 109pub static CLOCK: std::sync::LazyLock<quanta::Clock> = 110 std::sync::LazyLock::new(|| quanta::Clock::new()); 111 112/// simple thread-safe rate tracker using time buckets 113/// divides time into fixed buckets and rotates through them 114#[derive(Debug)] 115pub struct RateTracker<const BUCKET_WINDOW: u64> { 116 buckets: Vec<AtomicU64>, 117 last_bucket_time: AtomicU64, 118 bucket_duration_nanos: u64, 119 window_duration: Duration, 120 start_time: u64, // raw time when tracker was created 121} 122 123pub type DefaultRateTracker = RateTracker<1000>; 124 125impl<const BUCKET_WINDOW: u64> RateTracker<BUCKET_WINDOW> { 126 /// create a new rate tracker with the specified time window 127 pub fn new(window_duration: Duration) -> Self { 128 let bucket_duration_nanos = Duration::from_millis(BUCKET_WINDOW).as_nanos() as u64; 129 let num_buckets = 130 (window_duration.as_nanos() as u64 / bucket_duration_nanos).max(1) as usize; 131 132 let mut buckets = Vec::with_capacity(num_buckets); 133 for _ in 0..num_buckets { 134 buckets.push(AtomicU64::new(0)); 135 } 136 137 let start_time = CLOCK.raw(); 138 Self { 139 buckets, 140 bucket_duration_nanos, 141 window_duration, 142 last_bucket_time: AtomicU64::new(0), 143 start_time, 144 } 145 } 146 147 #[inline(always)] 148 fn elapsed(&self) -> u64 { 149 CLOCK.delta_as_nanos(self.start_time, CLOCK.raw()) 150 } 151 152 /// record an event 153 pub fn observe(&self, count: u64) { 154 self.maybe_advance_buckets(); 155 156 let bucket_index = self.get_current_bucket_index(); 157 self.buckets[bucket_index].fetch_add(count, Ordering::Relaxed); 158 } 159 160 /// get the current rate in events per second 161 pub fn rate(&self) -> f64 { 162 self.maybe_advance_buckets(); 163 164 let total_events: u64 = self 165 .buckets 166 .iter() 167 .map(|bucket| bucket.load(Ordering::Relaxed)) 168 .sum(); 169 170 total_events as f64 / self.window_duration.as_secs_f64() 171 } 172 173 fn get_current_bucket_index(&self) -> usize { 174 let bucket_number = self.elapsed() / self.bucket_duration_nanos; 175 (bucket_number as usize) % self.buckets.len() 176 } 177 178 fn maybe_advance_buckets(&self) { 179 let current_bucket_time = 180 (self.elapsed() / self.bucket_duration_nanos) * self.bucket_duration_nanos; 181 let last_bucket_time = self.last_bucket_time.load(Ordering::Relaxed); 182 183 if current_bucket_time > last_bucket_time { 184 // try to update the last bucket time 185 if self 186 .last_bucket_time 187 .compare_exchange_weak( 188 last_bucket_time, 189 current_bucket_time, 190 Ordering::Relaxed, 191 Ordering::Relaxed, 192 ) 193 .is_ok() 194 { 195 // clear buckets that are now too old 196 let buckets_to_advance = ((current_bucket_time - last_bucket_time) 197 / self.bucket_duration_nanos) 198 .min(self.buckets.len() as u64); 199 200 for i in 0..buckets_to_advance { 201 let bucket_time = last_bucket_time + (i + 1) * self.bucket_duration_nanos; 202 let bucket_index = 203 (bucket_time / self.bucket_duration_nanos) as usize % self.buckets.len(); 204 self.buckets[bucket_index].store(0, Ordering::Relaxed); 205 } 206 } 207 } 208 } 209} 210 211#[cfg(test)] 212mod tests { 213 use super::*; 214 use std::sync::Arc; 215 use std::thread; 216 217 #[test] 218 fn test_rate_tracker_basic() { 219 let tracker = DefaultRateTracker::new(Duration::from_secs(2)); 220 221 // record some events 222 tracker.observe(3); 223 224 let rate = tracker.rate(); 225 assert_eq!(rate, 1.5); // 3 events over 2 seconds = 1.5 events/sec 226 } 227 228 #[test] 229 fn test_rate_tracker_burst() { 230 let tracker = DefaultRateTracker::new(Duration::from_secs(1)); 231 232 // record a lot of events 233 tracker.observe(1000); 234 235 let rate = tracker.rate(); 236 assert_eq!(rate, 1000.0); // 1000 events in 1 second 237 } 238 239 #[test] 240 fn test_rate_tracker_threading() { 241 let tracker = Arc::new(DefaultRateTracker::new(Duration::from_secs(1))); 242 let mut handles = vec![]; 243 244 for _ in 0..4 { 245 let tracker_clone = Arc::clone(&tracker); 246 let handle = thread::spawn(move || { 247 tracker_clone.observe(10); 248 }); 249 handles.push(handle); 250 } 251 252 for handle in handles { 253 handle.join().unwrap(); 254 } 255 256 let rate = tracker.rate(); 257 assert_eq!(rate, 40.0); // 40 events in 1 second 258 } 259} 260 261#[derive(Debug, Clone, Copy, PartialEq, Eq)] 262pub enum TimeDirection { 263 Backwards, // Past (default) 264 Forwards, // Future 265} 266 267impl Default for TimeDirection { 268 fn default() -> Self { 269 TimeDirection::Backwards 270 } 271} 272 273#[derive(Debug, Clone, PartialEq, Eq)] 274pub struct RelativeDateTime { 275 duration: Duration, 276 direction: TimeDirection, 277} 278 279impl RelativeDateTime { 280 pub fn new(duration: Duration, direction: TimeDirection) -> Self { 281 Self { 282 duration, 283 direction, 284 } 285 } 286 287 pub fn from_now(duration: Duration) -> Self { 288 let cur = get_time(); 289 if duration > cur { 290 Self::new(duration - cur, TimeDirection::Forwards) 291 } else { 292 Self::new(cur - duration, TimeDirection::Backwards) 293 } 294 } 295} 296 297impl std::fmt::Display for RelativeDateTime { 298 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 299 let secs = self.duration.as_secs(); 300 301 if secs == 0 { 302 return write!(f, "now"); 303 } 304 305 let (amount, unit) = match secs { 306 0 => unreachable!(), // handled above 307 1..=59 => (secs, "second"), 308 60..=3599 => (secs / 60, "minute"), 309 3600..=86399 => (secs / 3600, "hour"), 310 86400..=2591999 => (secs / 86400, "day"), // up to 29 days 311 2592000..=31535999 => (secs / 2592000, "month"), // 30 days to 364 days 312 _ => (secs / 31536000, "year"), // 365 days+ 313 }; 314 315 let plural = if amount != 1 { "s" } else { "" }; 316 317 match self.direction { 318 TimeDirection::Forwards => write!(f, "in {} {}{}", amount, unit, plural), 319 TimeDirection::Backwards => write!(f, "{} {}{} ago", amount, unit, plural), 320 } 321 } 322}