Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use std::collections::{BTreeMap, VecDeque}; 2use std::ops::RangeBounds; 3use std::sync::Arc; 4use std::time::{Duration, Instant}; 5use thiserror::Error; 6use tokio::sync::Mutex; 7 8#[derive(Debug, Error)] 9pub enum EnqueueError<T> { 10 #[error("queue ouput dropped")] 11 OutputDropped(T), 12} 13 14pub trait Key: Eq + Ord + Clone {} 15impl<T: Eq + Ord + Clone> Key for T {} 16 17#[derive(Debug)] 18struct Queue<K: Key, T> { 19 queue: VecDeque<(Instant, K)>, 20 items: BTreeMap<K, T>, 21} 22 23pub struct Input<K: Key, T> { 24 q: Arc<Mutex<Queue<K, T>>>, 25} 26 27impl<K: Key, T> Input<K, T> { 28 /// if a key is already present, its previous item will be overwritten and 29 /// its delay time will be reset for the new item. 30 /// 31 /// errors if the remover has been dropped 32 pub async fn enqueue(&self, key: K, item: T) -> Result<(), EnqueueError<T>> { 33 if Arc::strong_count(&self.q) == 1 { 34 return Err(EnqueueError::OutputDropped(item)); 35 } 36 // TODO: try to push out an old element first 37 // for now we just hope there's a listener 38 let now = Instant::now(); 39 let mut q = self.q.lock().await; 40 q.queue.push_back((now, key.clone())); 41 q.items.insert(key, item); 42 Ok(()) 43 } 44 /// remove an item from the queue, by key 45 /// 46 /// the item itself is removed, but the key will remain in the queue -- it 47 /// will simply be skipped over when a new output item is requested. this 48 /// keeps the removal cheap (=btreemap remove), for a bit of space overhead 49 pub async fn remove_range(&self, range: impl RangeBounds<K>) { 50 let n = { 51 let mut q = self.q.lock().await; 52 let keys = q 53 .items 54 .range(range) 55 .map(|(k, _)| k) 56 .cloned() 57 .collect::<Vec<_>>(); 58 for k in &keys { 59 q.items.remove(k); 60 } 61 keys.len() 62 }; 63 if n == 0 { 64 metrics::counter!("delay_queue_remove_not_found").increment(1); 65 } else { 66 metrics::counter!("delay_queue_remove_total_records").increment(1); 67 metrics::counter!("delay_queue_remove_total_links").increment(n as u64); 68 } 69 } 70} 71 72pub struct Output<K: Key, T> { 73 delay: Duration, 74 q: Arc<Mutex<Queue<K, T>>>, 75} 76 77impl<K: Key, T> Output<K, T> { 78 pub async fn next(&self) -> Option<T> { 79 let get = || async { 80 let mut q = self.q.lock().await; 81 metrics::gauge!("delay_queue_queue_len").set(q.queue.len() as f64); 82 metrics::gauge!("delay_queue_queue_capacity").set(q.queue.capacity() as f64); 83 while let Some((t, k)) = q.queue.pop_front() { 84 // skip over queued keys that were removed from items 85 if let Some(item) = q.items.remove(&k) { 86 return Some((t, item)); 87 } 88 } 89 None 90 }; 91 loop { 92 if let Some((t, item)) = get().await { 93 let now = Instant::now(); 94 let expected_release = t + self.delay; 95 if expected_release.saturating_duration_since(now) > Duration::from_millis(1) { 96 tokio::time::sleep_until(expected_release.into()).await; 97 metrics::counter!("delay_queue_emit_total", "early" => "yes").increment(1); 98 metrics::histogram!("delay_queue_emit_overshoot").record(0); 99 } else { 100 let overshoot = now.saturating_duration_since(expected_release); 101 metrics::counter!("delay_queue_emit_total", "early" => "no").increment(1); 102 metrics::histogram!("delay_queue_emit_overshoot") 103 .record(overshoot.as_secs_f64()); 104 } 105 return Some(item); 106 } else if Arc::strong_count(&self.q) == 1 { 107 return None; 108 } 109 // the queue is *empty*, so we need to wait at least as long as the current delay 110 tokio::time::sleep(self.delay).await; 111 metrics::counter!("delay_queue_entirely_empty_total").increment(1); 112 } 113 } 114} 115 116pub fn removable_delay_queue<K: Key, T>(delay: Duration) -> (Input<K, T>, Output<K, T>) { 117 let q: Arc<Mutex<Queue<K, T>>> = Arc::new(Mutex::new(Queue { 118 queue: VecDeque::new(), 119 items: BTreeMap::new(), 120 })); 121 122 let input = Input::<K, T> { q: q.clone() }; 123 let output = Output::<K, T> { q, delay }; 124 (input, output) 125}