Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

resolve handle early and return uri with did

Changed files
+102 -37
slingshot
+9 -2
slingshot/src/main.rs
···
Ok(())
});
-
let repo = Repo::new(identity);
let server_shutdown = shutdown.clone();
let server_cache_handle = cache.clone();
tasks.spawn(async move {
-
serve(server_cache_handle, repo, args.host, server_shutdown).await?;
Ok(())
});
···
Ok(())
});
+
let repo = Repo::new(identity.clone());
let server_shutdown = shutdown.clone();
let server_cache_handle = cache.clone();
tasks.spawn(async move {
+
serve(
+
server_cache_handle,
+
identity,
+
repo,
+
args.host,
+
server_shutdown,
+
)
+
.await?;
Ok(())
});
+8 -18
slingshot/src/record.rs
···
//! cached record storage
use crate::{Identity, error::RecordError};
-
use atrium_api::types::string::{Cid, Did, Handle};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
···
pub async fn get_record(
&self,
-
did_or_handle: String,
-
collection: String,
-
rkey: String,
-
cid: Option<String>,
) -> Result<CachedRecord, RecordError> {
-
let did = match Did::new(did_or_handle.clone()) {
-
Ok(did) => did,
-
Err(_) => {
-
let handle = Handle::new(did_or_handle).map_err(|_| RecordError::BadRepo)?;
-
let Some(did) = self.identity.handle_to_did(handle).await? else {
-
return Err(RecordError::NotFound("could not resolve and verify handle"));
-
};
-
did
-
}
-
};
let Some(pds) = self.identity.did_to_pds(did.clone()).await? else {
return Err(RecordError::NotFound("could not get pds for DID"));
};
···
let mut params = vec![
("repo", did.to_string()),
-
("collection", collection),
-
("rkey", rkey),
];
if let Some(cid) = cid {
-
params.push(("cid", cid));
}
let mut url = Url::parse_with_params(&pds, &params)?;
url.set_path("/xrpc/com.atproto.repo.getRecord");
···
//! cached record storage
use crate::{Identity, error::RecordError};
+
use atrium_api::types::string::{Cid, Did, Nsid, RecordKey};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
···
pub async fn get_record(
&self,
+
did: &Did,
+
collection: &Nsid,
+
rkey: &RecordKey,
+
cid: &Option<Cid>,
) -> Result<CachedRecord, RecordError> {
let Some(pds) = self.identity.did_to_pds(did.clone()).await? else {
return Err(RecordError::NotFound("could not get pds for DID"));
};
···
let mut params = vec![
("repo", did.to_string()),
+
("collection", collection.to_string()),
+
("rkey", rkey.to_string()),
];
if let Some(cid) = cid {
+
params.push(("cid", cid.as_ref().to_string()));
}
let mut url = Url::parse_with_params(&pds, &params)?;
url.set_path("/xrpc/com.atproto.repo.getRecord");
+85 -17
slingshot/src/server.rs
···
-
use crate::{CachedRecord, Repo, error::ServerError};
use foyer::HybridCache;
use serde::Serialize;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
···
message: "This record was deleted".to_string(),
}
}
}
fn bad_request_handler(err: poem::Error) -> GetRecordResponse {
···
/// also list `InvalidRequest`, `ExpiredToken`, and `InvalidToken`. Of
/// these, slingshot will only return `RecordNotFound` or `InvalidRequest`.
#[oai(status = 400)]
-
BadRequest(Json<XrpcErrorResponseObject>),
}
struct Xrpc {
cache: HybridCache<String, CachedRecord>,
repo: Arc<Repo>,
}
···
/// record.
Query(cid): Query<Option<String>>,
) -> GetRecordResponse {
-
// TODO: yeah yeah
-
let at_uri = format!("at://{repo}/{collection}/{rkey}");
let entry = self
.cache
···
let repo_api = self.repo.clone();
|| async move {
repo_api
-
.get_record(repo, collection, rkey, cid)
.await
.map_err(|e| foyer::Error::Other(Box::new(e)))
}
···
match *entry {
CachedRecord::Found(ref raw) => {
let (found_cid, raw_value) = raw.into();
-
let found_cid = found_cid.as_ref().to_string();
if cid.clone().map(|c| c != found_cid).unwrap_or(false) {
return GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
error: "RecordNotFound".to_string(),
···
serde_json::from_str(raw_value.get()).expect("RawValue to be valid json");
GetRecordResponse::Ok(Json(FoundRecordResponseObject {
uri: at_uri,
-
cid: Some(found_cid),
value,
}))
}
···
pub async fn serve(
cache: HybridCache<String, CachedRecord>,
repo: Repo,
host: Option<String>,
_shutdown: CancellationToken,
) -> Result<(), ServerError> {
let repo = Arc::new(repo);
-
let api_service =
-
OpenApiService::new(Xrpc { cache, repo }, "Slingshot", env!("CARGO_PKG_VERSION"))
-
.server("http://localhost:3000")
-
.url_prefix("/xrpc");
let mut app = Route::new()
.nest("/", api_service.scalar())
···
.install_default()
.expect("alskfjalksdjf");
-
app = app
-
.at("/.well-known/did.json", get_did_doc(&host));
let auto_cert = AutoCert::builder()
.directory_url(LETS_ENCRYPT_PRODUCTION)
···
async fn run<L>(listener: L, app: Route) -> Result<(), ServerError>
where
-
L: Listener + 'static
{
let app = app
-
.with(Cors::new()
-
.allow_method(Method::GET)
-
.allow_credentials(false))
.with(Tracing);
Server::new(listener)
.name("slingshot")
···
+
use crate::{CachedRecord, Identity, Repo, error::ServerError};
+
use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
use foyer::HybridCache;
use serde::Serialize;
+
use std::str::FromStr;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
···
message: "This record was deleted".to_string(),
}
}
+
}
+
type XrpcError = Json<XrpcErrorResponseObject>;
+
fn xrpc_error(error: impl AsRef<str>, message: impl AsRef<str>) -> XrpcError {
+
Json(XrpcErrorResponseObject {
+
error: error.as_ref().to_string(),
+
message: message.as_ref().to_string(),
+
})
}
fn bad_request_handler(err: poem::Error) -> GetRecordResponse {
···
/// also list `InvalidRequest`, `ExpiredToken`, and `InvalidToken`. Of
/// these, slingshot will only return `RecordNotFound` or `InvalidRequest`.
#[oai(status = 400)]
+
BadRequest(XrpcError),
+
/// Just using 500 for potentially upstream errors for now
+
#[oai(status = 500)]
+
ServerError(XrpcError),
}
struct Xrpc {
cache: HybridCache<String, CachedRecord>,
+
identity: Identity,
repo: Arc<Repo>,
}
···
/// record.
Query(cid): Query<Option<String>>,
) -> GetRecordResponse {
+
let did = match Did::new(repo.clone()) {
+
Ok(did) => did,
+
Err(_) => {
+
let Ok(handle) = Handle::new(repo) else {
+
return GetRecordResponse::BadRequest(xrpc_error(
+
"InvalidRequest",
+
"repo was not a valid DID or handle",
+
));
+
};
+
if let Ok(res) = self.identity.handle_to_did(handle).await {
+
if let Some(did) = res {
+
did
+
} else {
+
return GetRecordResponse::BadRequest(xrpc_error(
+
"InvalidRequest",
+
"Could not resolve handle repo to a DID",
+
));
+
}
+
} else {
+
return GetRecordResponse::ServerError(xrpc_error(
+
"ResolutionFailed",
+
"errored while trying to resolve handle to DID",
+
));
+
}
+
}
+
};
+
+
let Ok(collection) = Nsid::new(collection) else {
+
return GetRecordResponse::BadRequest(xrpc_error(
+
"InvalidRequest",
+
"invalid NSID for collection",
+
));
+
};
+
+
let Ok(rkey) = RecordKey::new(rkey) else {
+
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey"));
+
};
+
+
let cid: Option<Cid> = if let Some(cid) = cid {
+
let Ok(cid) = Cid::from_str(&cid) else {
+
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID"));
+
};
+
Some(cid)
+
} else {
+
None
+
};
+
+
let at_uri = format!("at://{}/{}/{}", &*did, &*collection, &*rkey);
let entry = self
.cache
···
let repo_api = self.repo.clone();
|| async move {
repo_api
+
.get_record(&did, &collection, &rkey, &cid)
.await
.map_err(|e| foyer::Error::Other(Box::new(e)))
}
···
match *entry {
CachedRecord::Found(ref raw) => {
let (found_cid, raw_value) = raw.into();
if cid.clone().map(|c| c != found_cid).unwrap_or(false) {
return GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
error: "RecordNotFound".to_string(),
···
serde_json::from_str(raw_value.get()).expect("RawValue to be valid json");
GetRecordResponse::Ok(Json(FoundRecordResponseObject {
uri: at_uri,
+
cid: Some(found_cid.as_ref().to_string()),
value,
}))
}
···
pub async fn serve(
cache: HybridCache<String, CachedRecord>,
+
identity: Identity,
repo: Repo,
host: Option<String>,
_shutdown: CancellationToken,
) -> Result<(), ServerError> {
let repo = Arc::new(repo);
+
let api_service = OpenApiService::new(
+
Xrpc {
+
cache,
+
identity,
+
repo,
+
},
+
"Slingshot",
+
env!("CARGO_PKG_VERSION"),
+
)
+
.server("http://localhost:3000")
+
.url_prefix("/xrpc");
let mut app = Route::new()
.nest("/", api_service.scalar())
···
.install_default()
.expect("alskfjalksdjf");
+
app = app.at("/.well-known/did.json", get_did_doc(&host));
let auto_cert = AutoCert::builder()
.directory_url(LETS_ENCRYPT_PRODUCTION)
···
async fn run<L>(listener: L, app: Route) -> Result<(), ServerError>
where
+
L: Listener + 'static,
{
let app = app
+
.with(
+
Cors::new()
+
.allow_origin("*")
+
.allow_methods([Method::GET])
+
.allow_credentials(false),
+
)
.with(Tracing);
Server::new(listener)
.name("slingshot")