forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use dashmap::DashMap;
2use rand::{Rng, distr::Alphanumeric};
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::task::{JoinHandle, spawn};
6use tokio::time::sleep;
7use tokio_util::sync::{CancellationToken, DropGuard};
8
9pub struct ExpiringTaskMap<T>(TaskMap<T>);
10
11/// need to manually implement clone because T is allowed to not be clone
12impl<T> Clone for ExpiringTaskMap<T> {
13 fn clone(&self) -> Self {
14 Self(self.0.clone())
15 }
16}
17
18impl<T: Send + 'static> ExpiringTaskMap<T> {
19 pub fn new(expiration: Duration) -> Self {
20 let map = TaskMap {
21 map: Arc::new(DashMap::new()),
22 expiration,
23 };
24 Self(map)
25 }
26
27 pub fn dispatch<F>(&self, task: F, cancel: CancellationToken) -> String
28 where
29 F: Future<Output = T> + Send + 'static,
30 {
31 let TaskMap {
32 ref map,
33 expiration,
34 } = self.0;
35 let task_key: String = rand::rng()
36 .sample_iter(&Alphanumeric)
37 .take(24)
38 .map(char::from)
39 .collect();
40
41 // spawn a tokio task and put the join handle in the map for later retrieval
42 map.insert(task_key.clone(), (cancel.clone().drop_guard(), spawn(task)));
43
44 // spawn a second task to clean up the map in case it doesn't get claimed
45 let k = task_key.clone();
46 let map = map.clone();
47 spawn(async move {
48 if cancel
49 .run_until_cancelled(sleep(expiration))
50 .await
51 .is_some()
52 // the (sleep) task completed first
53 {
54 map.remove(&k);
55 cancel.cancel();
56 metrics::counter!("whoami_task_map_completions", "result" => "expired")
57 .increment(1);
58 }
59 });
60
61 task_key
62 }
63
64 pub fn take(&self, key: &str) -> Option<JoinHandle<T>> {
65 if let Some((_key, (_guard, handle))) = self.0.map.remove(key) {
66 // when the _guard drops, it cancels the token for us
67 metrics::counter!("whoami_task_map_completions", "result" => "retrieved").increment(1);
68 Some(handle)
69 } else {
70 metrics::counter!("whoami_task_map_gones").increment(1);
71 None
72 }
73 }
74}
75
76struct TaskMap<T> {
77 map: Arc<DashMap<String, (DropGuard, JoinHandle<T>)>>,
78 expiration: Duration,
79}
80
81/// need to manually implement clone because T is allowed to not be clone
82impl<T> Clone for TaskMap<T> {
83 fn clone(&self) -> Self {
84 Self {
85 map: self.map.clone(),
86 expiration: self.expiration,
87 }
88 }
89}