use std::collections::{BTreeMap, VecDeque};
use std::ops::RangeBounds;
use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::sync::Mutex;

#[derive(Debug, Error)]
pub enum EnqueueError<T> {
    #[error("queue output dropped")]
    OutputDropped(T),
}

/// blanket key bound: anything orderable and cloneable can index the queue
pub trait Key: Eq + Ord + Clone {}
impl<T: Eq + Ord + Clone> Key for T {}

#[derive(Debug)]
struct Queue<K: Key, T> {
    /// FIFO of (enqueue time, key); may hold stale keys for removed items
    queue: VecDeque<(Instant, K)>,
    /// live items by key; removal deletes here and leaves the queue entry
    items: BTreeMap<K, T>,
}

/// producing half of the queue: enqueue new items and remove pending ones
pub struct Input<K: Key, T> {
    q: Arc<Mutex<Queue<K, T>>>,
}

impl<K: Key, T> Input<K, T> {
    /// if a key is already present, its previous item will be overwritten,
    /// but the new item keeps the original release time: the first queue
    /// slot for the key is the one that fires, so a re-enqueue does *not*
    /// reset the delay.
    ///
    /// errors if the output half has been dropped
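    ///
    /// a sketch of the overwrite behavior (hypothetical key/items; `ignore`d
    /// because it needs an async context and a live output half):
    ///
    /// ```ignore
    /// input.enqueue(7, "first").await?;
    /// input.enqueue(7, "second").await?; // replaces "first" in the same slot
    /// ```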
    pub async fn enqueue(&self, key: K, item: T) -> Result<(), EnqueueError<T>> {
        if Arc::strong_count(&self.q) == 1 {
            return Err(EnqueueError::OutputDropped(item));
        }
        // TODO: try to push out an old element first
        // for now we just hope there's a listener
        let now = Instant::now();
        let mut q = self.q.lock().await;
        q.queue.push_back((now, key.clone()));
        q.items.insert(key, item);
        Ok(())
    }
    /// remove items from the queue whose keys fall within `range`
    ///
    /// the items themselves are removed, but their keys remain in the queue --
    /// they are simply skipped over when a new output item is requested. this
    /// keeps removal cheap (a `BTreeMap` range scan plus removes), for a bit
    /// of space overhead
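    ///
    /// a sketch with hypothetical integer keys (`ignore`d since it needs an
    /// async context):
    ///
    /// ```ignore
    /// // drop every pending item whose key is in 10..20
    /// input.remove_range(10..20).await;
    /// ```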
    pub async fn remove_range(&self, range: impl RangeBounds<K>) {
        let n = {
            let mut q = self.q.lock().await;
            let keys = q
                .items
                .range(range)
                .map(|(k, _)| k)
                .cloned()
                .collect::<Vec<_>>();
            for k in &keys {
                q.items.remove(k);
            }
            keys.len()
        };
        if n == 0 {
            metrics::counter!("delay_queue_remove_not_found").increment(1);
        } else {
            metrics::counter!("delay_queue_remove_total_records").increment(1);
            metrics::counter!("delay_queue_remove_total_links").increment(n as u64);
        }
    }
}

/// consuming half of the queue: emits items `delay` after they were enqueued
pub struct Output<K: Key, T> {
    delay: Duration,
    q: Arc<Mutex<Queue<K, T>>>,
}

impl<K: Key, T> Output<K, T> {
    /// wait for the next item to become ready, or `None` once the input half
    /// has been dropped and the queue is drained
    pub async fn next(&self) -> Option<T> {
        let get = || async {
            let mut q = self.q.lock().await;
            metrics::gauge!("delay_queue_queue_len").set(q.queue.len() as f64);
            metrics::gauge!("delay_queue_queue_capacity").set(q.queue.capacity() as f64);
            while let Some((t, k)) = q.queue.pop_front() {
                // skip over queued keys that were removed from items
                if let Some(item) = q.items.remove(&k) {
                    return Some((t, item));
                }
            }
            None
        };
        loop {
            if let Some((t, item)) = get().await {
                let now = Instant::now();
                let expected_release = t + self.delay;
                if expected_release.saturating_duration_since(now) > Duration::from_millis(1) {
                    // popped early: hold the item until its release time
                    tokio::time::sleep_until(expected_release.into()).await;
                    metrics::counter!("delay_queue_emit_total", "early" => "yes").increment(1);
                    metrics::histogram!("delay_queue_emit_overshoot").record(0.0);
                } else {
                    let overshoot = now.saturating_duration_since(expected_release);
                    metrics::counter!("delay_queue_emit_total", "early" => "no").increment(1);
                    metrics::histogram!("delay_queue_emit_overshoot")
                        .record(overshoot.as_secs_f64());
                }
                return Some(item);
            } else if Arc::strong_count(&self.q) == 1 {
                return None;
            }
            // the queue is empty: anything enqueued from here on can't be due
            // for at least `delay`, so sleeping that long before re-checking
            // can't release anything late
            tokio::time::sleep(self.delay).await;
            metrics::counter!("delay_queue_entirely_empty_total").increment(1);
        }
    }
}

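/// create a linked (`Input`, `Output`) pair sharing one delay queue
///
/// a minimal usage sketch (assumes a running tokio runtime; `ignore`d so
/// rustdoc doesn't need a concrete crate path):
///
/// ```ignore
/// let (input, output) = removable_delay_queue::<u64, String>(Duration::from_secs(5));
/// input.enqueue(1, "hello".to_string()).await?;
/// // roughly five seconds later:
/// assert_eq!(output.next().await, Some("hello".to_string()));
/// ```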
pub fn removable_delay_queue<K: Key, T>(delay: Duration) -> (Input<K, T>, Output<K, T>) {
    let q: Arc<Mutex<Queue<K, T>>> = Arc::new(Mutex::new(Queue {
        queue: VecDeque::new(),
        items: BTreeMap::new(),
    }));

    let input = Input::<K, T> { q: q.clone() };
    let output = Output::<K, T> { q, delay };
    (input, output)
}
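
// a sketch of end-to-end checks, not part of the original module: it assumes
// tokio's `macros` and `test-util` features (`start_paused` runs the test on
// a paused clock, so the delay elapses without real waiting)
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test(start_paused = true)]
    async fn removed_keys_are_skipped() {
        let (input, output) = removable_delay_queue::<u32, &str>(Duration::from_secs(1));
        input.enqueue(1, "a").await.unwrap();
        input.enqueue(2, "b").await.unwrap();

        // drop key 1 before release: its queue slot remains but gets skipped
        input.remove_range(1..2).await;

        assert_eq!(output.next().await, Some("b"));
    }

    #[tokio::test]
    async fn enqueue_fails_once_output_is_dropped() {
        let (input, output) = removable_delay_queue::<u32, &str>(Duration::from_millis(10));
        drop(output);
        assert!(matches!(
            input.enqueue(1, "a").await,
            Err(EnqueueError::OutputDropped("a"))
        ));
    }
}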