Tracks lexicons and how many times each one has appeared on the Jetstream.
1use std::io::{self, Read, Write};
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Duration;
4
5use byteview::ByteView;
6use ordered_varint::Variable;
7
/// Returns the current wall-clock time as a [`Duration`] since the UNIX epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch.
pub fn get_time() -> Duration {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is set before the UNIX epoch")
}
13
14pub trait WriteVariableExt: Write {
15 fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
16 value.encode_variable(self)
17 }
18}
19impl<W: Write> WriteVariableExt for W {}
20
21pub trait ReadVariableExt: Read {
22 fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
23 T::decode_variable(self)
24 }
25}
26impl<R: Read> ReadVariableExt for R {}
27
28pub struct WritableByteView {
29 view: ByteView,
30 written: usize,
31}
32
33impl WritableByteView {
34 // returns None if the view already has a reference to it
35 pub fn with_size(capacity: usize) -> Self {
36 Self {
37 view: ByteView::with_size(capacity),
38 written: 0,
39 }
40 }
41
42 #[inline(always)]
43 pub fn into_inner(self) -> ByteView {
44 self.view
45 }
46}
47
48impl Write for WritableByteView {
49 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
50 let len = buf.len();
51 if len > self.view.len() - self.written {
52 return Err(std::io::Error::new(
53 std::io::ErrorKind::StorageFull,
54 "buffer full",
55 ));
56 }
57 // SAFETY: this is safe because we have checked that the buffer is not full
58 // SAFETY: we own the mutator so no other references to the view exist
59 unsafe {
60 std::ptr::copy_nonoverlapping(
61 buf.as_ptr(),
62 self.view
63 .get_mut()
64 .unwrap_unchecked()
65 .as_mut_ptr()
66 .add(self.written),
67 len,
68 );
69 self.written += len;
70 }
71 Ok(len)
72 }
73
74 #[inline(always)]
75 fn flush(&mut self) -> std::io::Result<()> {
76 Ok(())
77 }
78}
79
80pub fn varints_unsigned_encoded<const N: usize>(values: [u64; N]) -> ByteView {
81 let mut buf =
82 WritableByteView::with_size(values.into_iter().map(varint_unsigned_encoded_len).sum());
83 for value in values {
84 // cant fail
85 let _ = buf.write_varint(value);
86 }
87 buf.into_inner()
88}
89
/// Returns the number of bytes an `ordered_varint` encoding of the unsigned
/// integer `value` occupies.
///
/// The length rule computed here is: the first encoded byte holds up to 4
/// significant bits of the value and every additional byte holds 8 more. So
/// values that fit in 4 bits (including `0`) take 1 byte, and a value with
/// `bits` significant bits takes `1 + ceil((bits - 4) / 8)` bytes. The result
/// ranges from 1 (`0..=15`) to 9 (`u64::MAX`).
///
/// This must stay in sync with the `ordered_varint` crate's encoder.
pub fn varint_unsigned_encoded_len(value: u64) -> usize {
    // Bit length of `value`; 0 when `value == 0`.
    let significant_bits = (u64::BITS - value.leading_zeros()) as usize;
    if significant_bits <= 4 {
        1
    } else {
        1 + (significant_bits - 4).div_ceil(8)
    }
}
108
109pub static CLOCK: std::sync::LazyLock<quanta::Clock> =
110 std::sync::LazyLock::new(|| quanta::Clock::new());
111
112/// simple thread-safe rate tracker using time buckets
113/// divides time into fixed buckets and rotates through them
114#[derive(Debug)]
115pub struct RateTracker<const BUCKET_WINDOW: u64> {
116 buckets: Vec<AtomicU64>,
117 last_bucket_time: AtomicU64,
118 bucket_duration_nanos: u64,
119 window_duration: Duration,
120 start_time: u64, // raw time when tracker was created
121}
122
123pub type DefaultRateTracker = RateTracker<1000>;
124
125impl<const BUCKET_WINDOW: u64> RateTracker<BUCKET_WINDOW> {
126 /// create a new rate tracker with the specified time window
127 pub fn new(window_duration: Duration) -> Self {
128 let bucket_duration_nanos = Duration::from_millis(BUCKET_WINDOW).as_nanos() as u64;
129 let num_buckets =
130 (window_duration.as_nanos() as u64 / bucket_duration_nanos).max(1) as usize;
131
132 let mut buckets = Vec::with_capacity(num_buckets);
133 for _ in 0..num_buckets {
134 buckets.push(AtomicU64::new(0));
135 }
136
137 let start_time = CLOCK.raw();
138 Self {
139 buckets,
140 bucket_duration_nanos,
141 window_duration,
142 last_bucket_time: AtomicU64::new(0),
143 start_time,
144 }
145 }
146
147 #[inline(always)]
148 fn elapsed(&self) -> u64 {
149 CLOCK.delta_as_nanos(self.start_time, CLOCK.raw())
150 }
151
152 /// record an event
153 pub fn observe(&self, count: u64) {
154 self.maybe_advance_buckets();
155
156 let bucket_index = self.get_current_bucket_index();
157 self.buckets[bucket_index].fetch_add(count, Ordering::Relaxed);
158 }
159
160 /// get the current rate in events per second
161 pub fn rate(&self) -> f64 {
162 self.maybe_advance_buckets();
163
164 let total_events: u64 = self
165 .buckets
166 .iter()
167 .map(|bucket| bucket.load(Ordering::Relaxed))
168 .sum();
169
170 total_events as f64 / self.window_duration.as_secs_f64()
171 }
172
173 fn get_current_bucket_index(&self) -> usize {
174 let bucket_number = self.elapsed() / self.bucket_duration_nanos;
175 (bucket_number as usize) % self.buckets.len()
176 }
177
178 fn maybe_advance_buckets(&self) {
179 let current_bucket_time =
180 (self.elapsed() / self.bucket_duration_nanos) * self.bucket_duration_nanos;
181 let last_bucket_time = self.last_bucket_time.load(Ordering::Relaxed);
182
183 if current_bucket_time > last_bucket_time {
184 // try to update the last bucket time
185 if self
186 .last_bucket_time
187 .compare_exchange_weak(
188 last_bucket_time,
189 current_bucket_time,
190 Ordering::Relaxed,
191 Ordering::Relaxed,
192 )
193 .is_ok()
194 {
195 // clear buckets that are now too old
196 let buckets_to_advance = ((current_bucket_time - last_bucket_time)
197 / self.bucket_duration_nanos)
198 .min(self.buckets.len() as u64);
199
200 for i in 0..buckets_to_advance {
201 let bucket_time = last_bucket_time + (i + 1) * self.bucket_duration_nanos;
202 let bucket_index =
203 (bucket_time / self.bucket_duration_nanos) as usize % self.buckets.len();
204 self.buckets[bucket_index].store(0, Ordering::Relaxed);
205 }
206 }
207 }
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_rate_tracker_basic() {
        // 3 events spread over a 2 second window -> 1.5 events/sec
        let tracker = DefaultRateTracker::new(Duration::from_secs(2));
        tracker.observe(3);
        assert_eq!(tracker.rate(), 1.5);
    }

    #[test]
    fn test_rate_tracker_burst() {
        // one burst of 1000 events inside a 1 second window -> 1000 events/sec
        let tracker = DefaultRateTracker::new(Duration::from_secs(1));
        tracker.observe(1000);
        assert_eq!(tracker.rate(), 1000.0);
    }

    #[test]
    fn test_rate_tracker_threading() {
        // 4 threads x 10 events inside a 1 second window -> 40 events/sec
        let tracker = Arc::new(DefaultRateTracker::new(Duration::from_secs(1)));
        let workers: Vec<_> = (0..4)
            .map(|_| {
                let shared = Arc::clone(&tracker);
                thread::spawn(move || shared.observe(10))
            })
            .collect();
        workers
            .into_iter()
            .for_each(|worker| worker.join().unwrap());
        assert_eq!(tracker.rate(), 40.0);
    }
}
260
/// Which side of "now" a [`RelativeDateTime`] lies on.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum TimeDirection {
    /// In the past; this is the default.
    #[default]
    Backwards,
    /// In the future.
    Forwards,
}
272
273#[derive(Debug, Clone, PartialEq, Eq)]
274pub struct RelativeDateTime {
275 duration: Duration,
276 direction: TimeDirection,
277}
278
279impl RelativeDateTime {
280 pub fn new(duration: Duration, direction: TimeDirection) -> Self {
281 Self {
282 duration,
283 direction,
284 }
285 }
286
287 pub fn from_now(duration: Duration) -> Self {
288 let cur = get_time();
289 if duration > cur {
290 Self::new(duration - cur, TimeDirection::Forwards)
291 } else {
292 Self::new(cur - duration, TimeDirection::Backwards)
293 }
294 }
295}
296
297impl std::fmt::Display for RelativeDateTime {
298 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299 let secs = self.duration.as_secs();
300
301 if secs == 0 {
302 return write!(f, "now");
303 }
304
305 let (amount, unit) = match secs {
306 0 => unreachable!(), // handled above
307 1..=59 => (secs, "second"),
308 60..=3599 => (secs / 60, "minute"),
309 3600..=86399 => (secs / 3600, "hour"),
310 86400..=2591999 => (secs / 86400, "day"), // up to 29 days
311 2592000..=31535999 => (secs / 2592000, "month"), // 30 days to 364 days
312 _ => (secs / 31536000, "year"), // 365 days+
313 };
314
315 let plural = if amount != 1 { "s" } else { "" };
316
317 match self.direction {
318 TimeDirection::Forwards => write!(f, "in {} {}{}", amount, unit, plural),
319 TimeDirection::Backwards => write!(f, "{} {}{} ago", amount, unit, plural),
320 }
321 }
322}