//! Tracks lexicons and how many times they appeared on the jetstream.
1use std::io::{self, Read, Write};
2use std::ops::Deref;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::Duration;
5
6use arc_swap::RefCnt;
7use byteview::ByteView;
8use ordered_varint::Variable;
9use rclite::Arc;
10
/// Current wall-clock time as a [`Duration`] since the UNIX epoch.
///
/// Panics only if the system clock reads earlier than the epoch.
pub fn get_time() -> Duration {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
}
16
/// Extension trait adding `write_varint` to any [`Write`] sink.
pub trait WriteVariableExt: Write {
    /// Encode `value` with `ordered_varint`'s variable-length format and
    /// write it to `self`, returning the number of bytes written.
    fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
        value.encode_variable(self)
    }
}
/// Blanket impl: every writer picks up [`WriteVariableExt::write_varint`].
impl<W: Write> WriteVariableExt for W {}
23
/// Extension trait adding `read_varint` to any [`Read`] source.
pub trait ReadVariableExt: Read {
    /// Decode a `T` from `ordered_varint`'s variable-length format.
    fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
        T::decode_variable(self)
    }
}
/// Blanket impl: every reader picks up [`ReadVariableExt::read_varint`].
impl<R: Read> ReadVariableExt for R {}
30
/// A fixed-capacity [`ByteView`] that is filled through the [`Write`] trait.
pub struct WritableByteView {
    /// backing buffer; its length is the total capacity
    view: ByteView,
    /// number of bytes written so far (never exceeds `view.len()`)
    written: usize,
}
35
impl WritableByteView {
    /// Create a writer over a freshly allocated view of `capacity` bytes.
    // NOTE(review): the old comment here ("returns None if the view already
    // has a reference to it") described a different API — this constructor
    // always returns a new value.
    pub fn with_size(capacity: usize) -> Self {
        Self {
            view: ByteView::with_size(capacity),
            written: 0,
        }
    }

    /// Consume the writer and return the underlying [`ByteView`],
    /// including any tail bytes that were never written to.
    #[inline(always)]
    pub fn into_inner(self) -> ByteView {
        self.view
    }
}
50
impl Write for WritableByteView {
    /// Copy all of `buf` into the view at the current write position.
    ///
    /// NOTE(review): this deviates from the usual `Write` contract — rather
    /// than performing a short write, a `buf` that does not fit entirely is
    /// rejected with `ErrorKind::StorageFull` and nothing is copied.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let len = buf.len();
        // `written` never exceeds `view.len()`, so the subtraction is safe
        if len > self.view.len() - self.written {
            return Err(std::io::Error::new(
                std::io::ErrorKind::StorageFull,
                "buffer full",
            ));
        }
        // SAFETY: the capacity check above guarantees `written + len` stays
        // within the view, so the copy is in-bounds and non-overlapping.
        // SAFETY: `get_mut()` is `Some` because this writer holds the only
        // handle to the view (created in `with_size`, never shared), and we
        // hold `&mut self`, so no other access to the bytes can exist.
        unsafe {
            std::ptr::copy_nonoverlapping(
                buf.as_ptr(),
                self.view
                    .get_mut()
                    .unwrap_unchecked()
                    .as_mut_ptr()
                    .add(self.written),
                len,
            );
            self.written += len;
        }
        Ok(len)
    }

    /// No-op: writes land directly in the backing buffer.
    #[inline(always)]
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
82
83pub fn varints_unsigned_encoded<const N: usize>(values: [u64; N]) -> ByteView {
84 let mut buf =
85 WritableByteView::with_size(values.into_iter().map(varint_unsigned_encoded_len).sum());
86 for value in values {
87 // cant fail
88 let _ = buf.write_varint(value);
89 }
90 buf.into_inner()
91}
92
/// Returns the encoded length, in bytes, of `value` in `ordered_varint`'s
/// unsigned variable-length format (always at least 1, at most 9).
pub fn varint_unsigned_encoded_len(value: u64) -> usize {
    // bit length of `value` (0 when value == 0)
    let bits = 64 - value.leading_zeros() as usize;
    // values whose top significant byte fits in a nibble need one byte per
    // significant byte; otherwise one extra. That is ceil((bits + 4) / 8),
    // folded into integer division as (bits + 4 + 7) / 8 — which also yields
    // the 1-byte minimum for value == 0.
    (bits + 11) / 8
}
111
112pub static CLOCK: std::sync::LazyLock<quanta::Clock> =
113 std::sync::LazyLock::new(|| quanta::Clock::new());
114
/// simple thread-safe rate tracker using time buckets
/// divides time into fixed buckets and rotates through them
///
/// `BUCKET_WINDOW` is the width of a single bucket in milliseconds.
#[derive(Debug)]
pub struct RateTracker<const BUCKET_WINDOW: u64> {
    /// one event counter per bucket, indexed round-robin by elapsed time
    buckets: Vec<AtomicU64>,
    /// start of the most recently advanced bucket, in ns since creation
    last_bucket_time: AtomicU64,
    /// width of a single bucket in nanoseconds
    bucket_duration_nanos: u64,
    /// total window the rate is averaged over
    window_duration: Duration,
    start_time: u64, // raw time when tracker was created
}

/// Rate tracker with one-second (1000 ms) buckets.
pub type DefaultRateTracker = RateTracker<1000>;
127
impl<const BUCKET_WINDOW: u64> RateTracker<BUCKET_WINDOW> {
    /// create a new rate tracker with the specified time window
    ///
    /// The window is split into buckets of `BUCKET_WINDOW` milliseconds; at
    /// least one bucket is always allocated, even for very short windows.
    pub fn new(window_duration: Duration) -> Self {
        let bucket_duration_nanos = Duration::from_millis(BUCKET_WINDOW).as_nanos() as u64;
        let num_buckets =
            (window_duration.as_nanos() as u64 / bucket_duration_nanos).max(1) as usize;

        let mut buckets = Vec::with_capacity(num_buckets);
        for _ in 0..num_buckets {
            buckets.push(AtomicU64::new(0));
        }

        let start_time = CLOCK.raw();
        Self {
            buckets,
            bucket_duration_nanos,
            window_duration,
            last_bucket_time: AtomicU64::new(0),
            start_time,
        }
    }

    /// nanoseconds elapsed since the tracker was created
    #[inline(always)]
    fn elapsed(&self) -> u64 {
        CLOCK.delta_as_nanos(self.start_time, CLOCK.raw())
    }

    /// record an event
    pub fn observe(&self, count: u64) {
        // rotate stale buckets out first so we never add into expired data
        self.maybe_advance_buckets();

        let bucket_index = self.get_current_bucket_index();
        self.buckets[bucket_index].fetch_add(count, Ordering::Relaxed);
    }

    /// get the current rate in events per second
    ///
    /// NOTE(review): this always divides by the full window, so a tracker
    /// younger than its window under-reports the instantaneous rate — the
    /// unit tests below rely on exactly this behavior.
    pub fn rate(&self) -> f64 {
        self.maybe_advance_buckets();

        let total_events: u64 = self
            .buckets
            .iter()
            .map(|bucket| bucket.load(Ordering::Relaxed))
            .sum();

        total_events as f64 / self.window_duration.as_secs_f64()
    }

    /// index of the bucket covering the current instant
    fn get_current_bucket_index(&self) -> usize {
        let bucket_number = self.elapsed() / self.bucket_duration_nanos;
        (bucket_number as usize) % self.buckets.len()
    }

    /// lazily rotate the ring: zero every bucket whose time slot has expired
    /// since the last caller advanced it
    fn maybe_advance_buckets(&self) {
        // start timestamp (ns, floored to a bucket boundary) of the bucket
        // we are currently in
        let current_bucket_time =
            (self.elapsed() / self.bucket_duration_nanos) * self.bucket_duration_nanos;
        let last_bucket_time = self.last_bucket_time.load(Ordering::Relaxed);

        if current_bucket_time > last_bucket_time {
            // try to update the last bucket time
            // a weak CAS with no retry loop is fine here: whichever caller
            // wins does the clearing, and a spurious failure just defers the
            // advance to the next observe()/rate() call
            if self
                .last_bucket_time
                .compare_exchange_weak(
                    last_bucket_time,
                    current_bucket_time,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                // clear buckets that are now too old
                // capped at buckets.len(): after a long idle gap every bucket
                // is stale, and clearing each slot once is enough
                let buckets_to_advance = ((current_bucket_time - last_bucket_time)
                    / self.bucket_duration_nanos)
                    .min(self.buckets.len() as u64);

                for i in 0..buckets_to_advance {
                    let bucket_time = last_bucket_time + (i + 1) * self.bucket_duration_nanos;
                    let bucket_index =
                        (bucket_time / self.bucket_duration_nanos) as usize % self.buckets.len();
                    // NOTE(review): a concurrent observe() landing in this
                    // bucket can have its counts wiped by this store — looks
                    // like an accepted approximation; confirm it's intentional
                    self.buckets[bucket_index].store(0, Ordering::Relaxed);
                }
            }
        }
    }
}
213
#[cfg(test)]
mod tests {
    use super::*;
    // std::sync::Arc shadows rclite::Arc inside this module only
    use std::sync::Arc;
    use std::thread;

    // NOTE(review): these tests assume no bucket boundary is crossed between
    // observe() and rate(); with 1000 ms buckets that is near-certain but not
    // guaranteed, so rare flakes are possible.

    #[test]
    fn test_rate_tracker_basic() {
        let tracker = DefaultRateTracker::new(Duration::from_secs(2));

        // record some events
        tracker.observe(3);

        let rate = tracker.rate();
        assert_eq!(rate, 1.5); // 3 events over 2 seconds = 1.5 events/sec
    }

    #[test]
    fn test_rate_tracker_burst() {
        let tracker = DefaultRateTracker::new(Duration::from_secs(1));

        // record a lot of events
        tracker.observe(1000);

        let rate = tracker.rate();
        assert_eq!(rate, 1000.0); // 1000 events in 1 second
    }

    #[test]
    fn test_rate_tracker_threading() {
        let tracker = Arc::new(DefaultRateTracker::new(Duration::from_secs(1)));
        let mut handles = vec![];

        // four threads each record 10 events concurrently
        for _ in 0..4 {
            let tracker_clone = Arc::clone(&tracker);
            let handle = thread::spawn(move || {
                tracker_clone.observe(10);
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let rate = tracker.rate();
        assert_eq!(rate, 40.0); // 40 events in 1 second
    }
}
263
/// Direction of a relative time offset.
// `Default` is derived with `#[default]` instead of a hand-written impl;
// the default remains `Backwards`, so callers are unaffected.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimeDirection {
    /// In the past (the default).
    #[default]
    Backwards,
    /// In the future.
    Forwards,
}
275
/// An offset from "now", with a direction (past or future).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelativeDateTime {
    /// magnitude of the offset
    duration: Duration,
    /// whether the offset points into the past or the future
    direction: TimeDirection,
}
281
282impl RelativeDateTime {
283 pub fn new(duration: Duration, direction: TimeDirection) -> Self {
284 Self {
285 duration,
286 direction,
287 }
288 }
289
290 pub fn from_now(duration: Duration) -> Self {
291 let cur = get_time();
292 if duration > cur {
293 Self::new(duration - cur, TimeDirection::Forwards)
294 } else {
295 Self::new(cur - duration, TimeDirection::Backwards)
296 }
297 }
298}
299
300impl std::fmt::Display for RelativeDateTime {
301 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302 let secs = self.duration.as_secs();
303
304 if secs == 0 {
305 return write!(f, "now");
306 }
307
308 let (amount, unit) = match secs {
309 0 => unreachable!(), // handled above
310 1..=59 => (secs, "second"),
311 60..=3599 => (secs / 60, "minute"),
312 3600..=86399 => (secs / 3600, "hour"),
313 86400..=2591999 => (secs / 86400, "day"), // up to 29 days
314 2592000..=31535999 => (secs / 2592000, "month"), // 30 days to 364 days
315 _ => (secs / 31536000, "year"), // 365 days+
316 };
317
318 let plural = if amount != 1 { "s" } else { "" };
319
320 match self.direction {
321 TimeDirection::Forwards => write!(f, "in {} {}{}", amount, unit, plural),
322 TimeDirection::Backwards => write!(f, "{} {}{} ago", amount, unit, plural),
323 }
324 }
325}
326
/// [`arc_swap::ArcSwapAny`] specialised to [`rclite::Arc`] via [`ArcRefCnt`].
pub type ArcliteSwap<T> = arc_swap::ArcSwapAny<ArcRefCnt<T>>;

/// Newtype over [`rclite::Arc`] so it can implement `arc_swap`'s `RefCnt`.
pub struct ArcRefCnt<T>(Arc<T>);
330
331impl<T> ArcRefCnt<T> {
332 pub fn new(value: T) -> Self {
333 Self(Arc::new(value))
334 }
335}
336
impl<T> Deref for ArcRefCnt<T> {
    type Target = T;

    // forwards through the inner Arc so `ArcRefCnt<T>` reads like a smart
    // pointer to `T`
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
344
impl<T> Clone for ArcRefCnt<T> {
    // manual impl rather than `#[derive(Clone)]`: cloning only bumps the
    // refcount, so no `T: Clone` bound is required (derive would add one)
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}
350
// SAFETY: mirrors arc_swap's own `RefCnt` impl for `std::sync::Arc`:
// `into_ptr`/`from_ptr` round-trip one owned reference count through
// `Arc::into_raw`/`Arc::from_raw`, and `as_ptr` exposes the pointer without
// creating or consuming a count (see the inline explanation below).
// NOTE(review): this assumes `rclite::Arc`'s raw-pointer API carries the same
// contract as `std::sync::Arc`'s — confirm against rclite's documentation.
unsafe impl<T> RefCnt for ArcRefCnt<T> {
    type Base = T;

    // transfers ownership of one reference count to the returned raw pointer
    fn into_ptr(me: Self) -> *mut Self::Base {
        Arc::into_raw(me.0) as *mut T
    }

    // borrows the pointer; does not consume or add a reference count
    fn as_ptr(me: &Self) -> *mut Self::Base {
        // Slightly convoluted way to do this, but this avoids stacked borrows violations. The same
        // intention as
        //
        // me as &T as *const T as *mut T
        //
        // We first create a "shallow copy" of me - one that doesn't really own its ref count
        // (that's OK, me _does_ own it, so it can't be destroyed in the meantime).
        // Then we can use into_raw (which preserves not having the ref count).
        //
        // We need to "revert" the changes we did. In current std implementation, the combination
        // of from_raw and forget is no-op. But formally, into_raw shall be paired with from_raw
        // and that read shall be paired with forget to properly "close the brackets". In future
        // versions of STD, these may become something else that's not really no-op (unlikely, but
        // possible), so we future-proof it a bit.

        // SAFETY: &T cast to *const T will always be aligned, initialised and valid for reads
        let ptr = Arc::into_raw(unsafe { std::ptr::read(&me.0) });
        let ptr = ptr as *mut T;

        // SAFETY: We got the pointer from into_raw just above
        std::mem::forget(unsafe { Arc::from_raw(ptr) });

        ptr
    }

    /// # Safety
    /// `ptr` must originate from [`RefCnt::into_ptr`] and carry one owned
    /// reference count, which this call takes ownership of.
    unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
        Self(unsafe { Arc::from_raw(ptr) })
    }
}