tracks lexicons and how many times they appeared on the jetstream
use std::io::{self, Read, Write};
use std::ops::Deref;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use arc_swap::RefCnt;
use byteview::ByteView;
use ordered_varint::Variable;
use rclite::Arc;

pub fn get_time() -> Duration {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
}

pub trait WriteVariableExt: Write {
    fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
        value.encode_variable(self)
    }
}
impl<W: Write> WriteVariableExt for W {}

pub trait ReadVariableExt: Read {
    fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
        T::decode_variable(self)
    }
}
impl<R: Read> ReadVariableExt for R {}

pub struct WritableByteView {
    view: ByteView,
    written: usize,
}

impl WritableByteView {
    // allocates a view of `capacity` bytes and tracks how much of it has been written
    pub fn with_size(capacity: usize) -> Self {
        Self {
            view: ByteView::with_size(capacity),
            written: 0,
        }
    }

    #[inline(always)]
    pub fn into_inner(self) -> ByteView {
        self.view
    }
}

impl Write for WritableByteView {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let len = buf.len();
        if len > self.view.len() - self.written {
            return Err(std::io::Error::new(
                std::io::ErrorKind::StorageFull,
                "buffer full",
            ));
        }
        // SAFETY: the bounds check above guarantees the copy stays inside the view, and
        // we hold the only handle to it, so `get_mut` cannot return None
        unsafe {
            std::ptr::copy_nonoverlapping(
                buf.as_ptr(),
                self.view
                    .get_mut()
                    .unwrap_unchecked()
                    .as_mut_ptr()
                    .add(self.written),
                len,
            );
            self.written += len;
        }
        Ok(len)
    }

    #[inline(always)]
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

pub fn varints_unsigned_encoded<const N: usize>(values: [u64; N]) -> ByteView {
    let mut buf =
        WritableByteView::with_size(values.into_iter().map(varint_unsigned_encoded_len).sum());
    for value in values {
        // cannot fail: the buffer was sized to fit every encoded value
        let _ = buf.write_varint(value);
    }
    buf.into_inner()
}

// computes the encoded length of an unsigned integer in the ordered_varint format
// see ordered_varint
pub fn varint_unsigned_encoded_len(value: u64) -> usize {
    let value = value.to_be_bytes();
    value
        .iter()
        .enumerate()
        .find_map(|(index, &byte)| {
            (byte > 0).then(|| {
                let extra_bytes = 7 - index;
                (byte < 16)
                    .then(|| extra_bytes + 1)
                    .unwrap_or_else(|| extra_bytes + 2)
            })
        })
        .unwrap_or(0)
        .max(1)
}
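
// A minimal usage sketch (illustrative module/test names, not part of the original code):
// round-trips a few values through the `WriteVariableExt` / `ReadVariableExt` extension
// traits above. A `Vec<u8>` is used as the writer so the example does not depend on how
// `varint_unsigned_encoded_len` sizes a `WritableByteView`.
#[cfg(test)]
mod varint_ext_example {
    use super::*;

    #[test]
    fn roundtrip_through_extension_traits() {
        let values = [0u64, 1, 300, 70_000, u64::MAX];

        // every `std::io::Write` gains `write_varint` through the blanket impl
        let mut encoded = Vec::new();
        for value in values {
            encoded.write_varint(value).unwrap();
        }

        // every `std::io::Read` gains `read_varint`; a byte slice implements `Read`
        let mut reader = encoded.as_slice();
        for expected in values {
            let decoded: u64 = reader.read_varint().unwrap();
            assert_eq!(decoded, expected);
        }
    }
}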

pub static CLOCK: std::sync::LazyLock<quanta::Clock> =
    std::sync::LazyLock::new(|| quanta::Clock::new());

/// simple thread-safe rate tracker using time buckets
/// divides time into fixed buckets (each `BUCKET_WINDOW` milliseconds wide) and rotates through them
#[derive(Debug)]
pub struct RateTracker<const BUCKET_WINDOW: u64> {
    buckets: Vec<AtomicU64>,
    last_bucket_time: AtomicU64,
    bucket_duration_nanos: u64,
    window_duration: Duration,
    start_time: u64, // raw time when tracker was created
}

pub type DefaultRateTracker = RateTracker<1000>;

impl<const BUCKET_WINDOW: u64> RateTracker<BUCKET_WINDOW> {
    /// create a new rate tracker with the specified time window
    pub fn new(window_duration: Duration) -> Self {
        let bucket_duration_nanos = Duration::from_millis(BUCKET_WINDOW).as_nanos() as u64;
        let num_buckets =
            (window_duration.as_nanos() as u64 / bucket_duration_nanos).max(1) as usize;

        let mut buckets = Vec::with_capacity(num_buckets);
        for _ in 0..num_buckets {
            buckets.push(AtomicU64::new(0));
        }

        let start_time = CLOCK.raw();
        Self {
            buckets,
            bucket_duration_nanos,
            window_duration,
            last_bucket_time: AtomicU64::new(0),
            start_time,
        }
    }

    #[inline(always)]
    fn elapsed(&self) -> u64 {
        CLOCK.delta_as_nanos(self.start_time, CLOCK.raw())
    }

    /// record an event
    pub fn observe(&self, count: u64) {
        self.maybe_advance_buckets();

        let bucket_index = self.get_current_bucket_index();
        self.buckets[bucket_index].fetch_add(count, Ordering::Relaxed);
    }

    /// get the current rate in events per second
    pub fn rate(&self) -> f64 {
        self.maybe_advance_buckets();

        let total_events: u64 = self
            .buckets
            .iter()
            .map(|bucket| bucket.load(Ordering::Relaxed))
            .sum();

        total_events as f64 / self.window_duration.as_secs_f64()
    }

    fn get_current_bucket_index(&self) -> usize {
        let bucket_number = self.elapsed() / self.bucket_duration_nanos;
        (bucket_number as usize) % self.buckets.len()
    }

    fn maybe_advance_buckets(&self) {
        let current_bucket_time =
            (self.elapsed() / self.bucket_duration_nanos) * self.bucket_duration_nanos;
        let last_bucket_time = self.last_bucket_time.load(Ordering::Relaxed);

        if current_bucket_time > last_bucket_time {
            // try to update the last bucket time
            if self
                .last_bucket_time
                .compare_exchange_weak(
                    last_bucket_time,
                    current_bucket_time,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                // clear buckets that are now too old
                let buckets_to_advance = ((current_bucket_time - last_bucket_time)
                    / self.bucket_duration_nanos)
                    .min(self.buckets.len() as u64);

                for i in 0..buckets_to_advance {
                    let bucket_time = last_bucket_time + (i + 1) * self.bucket_duration_nanos;
                    let bucket_index =
                        (bucket_time / self.bucket_duration_nanos) as usize % self.buckets.len();
                    self.buckets[bucket_index].store(0, Ordering::Relaxed);
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_rate_tracker_basic() {
        let tracker = DefaultRateTracker::new(Duration::from_secs(2));

        // record some events
        tracker.observe(3);

        let rate = tracker.rate();
        assert_eq!(rate, 1.5); // 3 events over 2 seconds = 1.5 events/sec
    }

    #[test]
    fn test_rate_tracker_burst() {
        let tracker = DefaultRateTracker::new(Duration::from_secs(1));

        // record a lot of events
        tracker.observe(1000);

        let rate = tracker.rate();
        assert_eq!(rate, 1000.0); // 1000 events in 1 second
    }

    #[test]
    fn test_rate_tracker_threading() {
        let tracker = Arc::new(DefaultRateTracker::new(Duration::from_secs(1)));
        let mut handles = vec![];

        for _ in 0..4 {
            let tracker_clone = Arc::clone(&tracker);
            let handle = thread::spawn(move || {
                tracker_clone.observe(10);
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let rate = tracker.rate();
        assert_eq!(rate, 40.0); // 40 events in 1 second
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeDirection {
    Backwards, // Past (default)
    Forwards,  // Future
}

impl Default for TimeDirection {
    fn default() -> Self {
        TimeDirection::Backwards
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelativeDateTime {
    duration: Duration,
    direction: TimeDirection,
}

impl RelativeDateTime {
    pub fn new(duration: Duration, direction: TimeDirection) -> Self {
        Self {
            duration,
            direction,
        }
    }

    /// treats `duration` as an absolute unix timestamp (duration since the epoch) and
    /// returns how far in the past or future it is relative to now
    pub fn from_now(duration: Duration) -> Self {
        let cur = get_time();
        if duration > cur {
            Self::new(duration - cur, TimeDirection::Forwards)
        } else {
            Self::new(cur - duration, TimeDirection::Backwards)
        }
    }
}

impl std::fmt::Display for RelativeDateTime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let secs = self.duration.as_secs();

        if secs == 0 {
            return write!(f, "now");
        }

        let (amount, unit) = match secs {
            0 => unreachable!(), // handled above
            1..=59 => (secs, "second"),
            60..=3599 => (secs / 60, "minute"),
            3600..=86399 => (secs / 3600, "hour"),
            86400..=2591999 => (secs / 86400, "day"), // up to 29 days
            2592000..=31535999 => (secs / 2592000, "month"), // 30 days to 364 days
            _ => (secs / 31536000, "year"), // 365 days+
        };

        let plural = if amount != 1 { "s" } else { "" };

        match self.direction {
            TimeDirection::Forwards => write!(f, "in {} {}{}", amount, unit, plural),
            TimeDirection::Backwards => write!(f, "{} {}{} ago", amount, unit, plural),
        }
    }
}
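
// A small sketch (illustrative module/test names) of how `RelativeDateTime` renders through
// its `Display` impl; the expected strings follow directly from the match arms above.
#[cfg(test)]
mod relative_datetime_example {
    use super::*;

    #[test]
    fn formats_past_and_future_durations() {
        // 90 seconds in the past rounds down to whole minutes
        let past = RelativeDateTime::new(Duration::from_secs(90), TimeDirection::Backwards);
        assert_eq!(past.to_string(), "1 minute ago");

        // two days in the future picks up the plural form
        let future =
            RelativeDateTime::new(Duration::from_secs(2 * 86400), TimeDirection::Forwards);
        assert_eq!(future.to_string(), "in 2 days");

        // a zero duration is rendered as "now" regardless of direction
        let now = RelativeDateTime::new(Duration::ZERO, TimeDirection::Backwards);
        assert_eq!(now.to_string(), "now");
    }
}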

pub type ArcliteSwap<T> = arc_swap::ArcSwapAny<ArcRefCnt<T>>;

pub struct ArcRefCnt<T>(Arc<T>);

impl<T> ArcRefCnt<T> {
    pub fn new(value: T) -> Self {
        Self(Arc::new(value))
    }
}

impl<T> Deref for ArcRefCnt<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> Clone for ArcRefCnt<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

// SAFETY: this mirrors arc_swap's `RefCnt` impl for `std::sync::Arc`; `rclite::Arc`
// provides the same `into_raw`/`from_raw` contract (a stable pointer to the shared value
// that round-trips without disturbing the reference count)
unsafe impl<T> RefCnt for ArcRefCnt<T> {
    type Base = T;

    fn into_ptr(me: Self) -> *mut Self::Base {
        Arc::into_raw(me.0) as *mut T
    }

    fn as_ptr(me: &Self) -> *mut Self::Base {
        // Slightly convoluted way to do this, but this avoids stacked borrows violations. The same
        // intention as
        //
        //     me as &T as *const T as *mut T
        //
        // We first create a "shallow copy" of me - one that doesn't really own its ref count
        // (that's OK, me _does_ own it, so it can't be destroyed in the meantime).
        // Then we can use into_raw (which preserves not having the ref count).
        //
        // We need to "revert" the changes we did. In the current std implementation, the combination
        // of from_raw and forget is a no-op. But formally, into_raw shall be paired with from_raw
        // and that read shall be paired with forget to properly "close the brackets". In future
        // versions of std, these may become something that's not really a no-op (unlikely, but
        // possible), so we future-proof it a bit.

        // SAFETY: &T cast to *const T will always be aligned, initialised and valid for reads
        let ptr = Arc::into_raw(unsafe { std::ptr::read(&me.0) });
        let ptr = ptr as *mut T;

        // SAFETY: we got the pointer from into_raw just above
        std::mem::forget(unsafe { Arc::from_raw(ptr) });

        ptr
    }

    unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
        Self(unsafe { Arc::from_raw(ptr) })
    }
}
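
// A brief sketch (illustrative module/test names) of using `ArcliteSwap`: it behaves like
// `arc_swap::ArcSwap`, except the stored pointer is an `rclite::Arc` wrapped in `ArcRefCnt`.
// Only `new`, `store` and `load` are exercised, which `arc_swap::ArcSwapAny` provides for
// any `RefCnt` type.
#[cfg(test)]
mod arclite_swap_example {
    use super::*;

    #[test]
    fn store_and_load() {
        let swap = ArcliteSwap::new(ArcRefCnt::new(String::from("first")));
        assert_eq!(swap.load().as_str(), "first");

        // replace the value atomically; existing guards keep the old value alive
        swap.store(ArcRefCnt::new(String::from("second")));
        assert_eq!(swap.load().as_str(), "second");
    }
}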