Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 2.6 kB view raw
1use dashmap::DashMap; 2use rand::{Rng, distr::Alphanumeric}; 3use std::sync::Arc; 4use std::time::Duration; 5use tokio::task::{JoinHandle, spawn}; 6use tokio::time::sleep; 7use tokio_util::sync::{CancellationToken, DropGuard}; 8 9pub struct ExpiringTaskMap<T>(TaskMap<T>); 10 11/// need to manually implement clone because T is allowed to not be clone 12impl<T> Clone for ExpiringTaskMap<T> { 13 fn clone(&self) -> Self { 14 Self(self.0.clone()) 15 } 16} 17 18impl<T: Send + 'static> ExpiringTaskMap<T> { 19 pub fn new(expiration: Duration) -> Self { 20 let map = TaskMap { 21 map: Arc::new(DashMap::new()), 22 expiration, 23 }; 24 Self(map) 25 } 26 27 pub fn dispatch<F>(&self, task: F, cancel: CancellationToken) -> String 28 where 29 F: Future<Output = T> + Send + 'static, 30 { 31 let TaskMap { 32 ref map, 33 expiration, 34 } = self.0; 35 let task_key: String = rand::rng() 36 .sample_iter(&Alphanumeric) 37 .take(24) 38 .map(char::from) 39 .collect(); 40 41 // spawn a tokio task and put the join handle in the map for later retrieval 42 map.insert(task_key.clone(), (cancel.clone().drop_guard(), spawn(task))); 43 44 // spawn a second task to clean up the map in case it doesn't get claimed 45 let k = task_key.clone(); 46 let map = map.clone(); 47 spawn(async move { 48 if cancel 49 .run_until_cancelled(sleep(expiration)) 50 .await 51 .is_some() 52 // the (sleep) task completed first 53 { 54 map.remove(&k); 55 cancel.cancel(); 56 metrics::counter!("whoami_task_map_completions", "result" => "expired") 57 .increment(1); 58 } 59 }); 60 61 task_key 62 } 63 64 pub fn take(&self, key: &str) -> Option<JoinHandle<T>> { 65 if let Some((_key, (_guard, handle))) = self.0.map.remove(key) { 66 // when the _guard drops, it cancels the token for us 67 metrics::counter!("whoami_task_map_completions", "result" => "retrieved").increment(1); 68 Some(handle) 69 } else { 70 metrics::counter!("whoami_task_map_gones").increment(1); 71 None 72 } 73 } 74} 75 76struct TaskMap<T> { 77 map: Arc<DashMap<String, (DropGuard, JoinHandle<T>)>>, 78 expiration: Duration, 79} 80 81/// need to manually implement clone because T is allowed to not be clone 82impl<T> Clone for TaskMap<T> { 83 fn clone(&self) -> Self { 84 Self { 85 map: self.map.clone(), 86 expiration: self.expiration, 87 } 88 } 89}