Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

spawn blocking instead of block_in_place

Changed files
+36 -9
constellation
src
server
+36 -9
constellation/src/server/mod.rs
···
use std::collections::{HashMap, HashSet};
use std::time::{Duration, UNIX_EPOCH};
use tokio::net::{TcpListener, ToSocketAddrs};
-
use tokio::task::block_in_place;
+
use tokio::task::spawn_blocking;
use tokio_util::sync::CancellationToken;
use crate::storage::{LinkReader, StorageStats};
···
const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this
+
fn to500(e: tokio::task::JoinError) -> http::StatusCode {
+
eprintln!("handler join error: {e}");
+
http::StatusCode::INTERNAL_SERVER_ERROR
+
}
+
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
where
S: LinkReader,
···
"/",
get({
let store = store.clone();
-
move |accept| async { block_in_place(|| hello(accept, store)) }
+
move |accept| async {
+
spawn_blocking(|| hello(accept, store))
+
.await
+
.map_err(to500)?
+
}
}),
)
.route(
"/links/count",
get({
let store = store.clone();
-
move |accept, query| async { block_in_place(|| count_links(accept, query, store)) }
+
move |accept, query| async {
+
spawn_blocking(|| count_links(accept, query, store))
+
.await
+
.map_err(to500)?
+
}
}),
)
.route(
···
get({
let store = store.clone();
move |accept, query| async {
-
block_in_place(|| count_distinct_dids(accept, query, store))
+
spawn_blocking(|| count_distinct_dids(accept, query, store))
+
.await
+
.map_err(to500)?
}
}),
)
···
get({
let store = store.clone();
move |accept, query| async {
-
block_in_place(|| get_backlinks(accept, query, store))
+
spawn_blocking(|| get_backlinks(accept, query, store))
+
.await
+
.map_err(to500)?
}
}),
)
···
"/links",
get({
let store = store.clone();
-
move |accept, query| async { block_in_place(|| get_links(accept, query, store)) }
+
move |accept, query| async {
+
spawn_blocking(|| get_links(accept, query, store))
+
.await
+
.map_err(to500)?
+
}
}),
)
.route(
···
get({
let store = store.clone();
move |accept, query| async {
-
block_in_place(|| get_distinct_dids(accept, query, store))
+
spawn_blocking(|| get_distinct_dids(accept, query, store))
+
.await
+
.map_err(to500)?
}
}),
)
···
get({
let store = store.clone();
move |accept, query| async {
-
block_in_place(|| count_all_links(accept, query, store))
+
spawn_blocking(|| count_all_links(accept, query, store))
+
.await
+
.map_err(to500)?
}
}),
)
···
get({
let store = store.clone();
move |accept, query| async {
-
block_in_place(|| explore_links(accept, query, store))
+
spawn_blocking(|| explore_links(accept, query, store))
+
.await
+
.map_err(to500)?
}
}),
)