Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

retry upstream getRecord without CID if NotFound

Changed files
+57 -21
slingshot
+5
slingshot/src/error.rs
···
+
use crate::ErrorResponseObject;
use thiserror::Error;
#[derive(Debug, Error)]
···
MissingUpstreamCid,
#[error("upstream CID was not valid: {0}")]
BadUpstreamCid(String),
+
#[error("upstream atproto-looking bad request")]
+
UpstreamBadRequest(ErrorResponseObject),
+
#[error("upstream non-atproto bad request")]
+
UpstreamBadBadNotGoodRequest(reqwest::Error),
}
+1 -1
slingshot/src/lib.rs
···
pub use consumer::consume;
pub use firehose_cache::firehose_cache;
pub use identity::Identity;
-
pub use record::{CachedRecord, Repo};
+
pub use record::{CachedRecord, ErrorResponseObject, Repo};
pub use server::serve;
+51 -20
slingshot/src/record.rs
···
use crate::{Identity, error::RecordError};
use atrium_api::types::string::{Cid, Did, Nsid, RecordKey};
-
use reqwest::Client;
+
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use std::str::FromStr;
···
value: Box<RawValue>,
}
+
#[derive(Debug, Deserialize)]
+
pub struct ErrorResponseObject {
+
error: String,
+
#[allow(dead_code)]
+
message: String,
+
}
+
#[derive(Clone)]
pub struct Repo {
identity: Identity,
···
return Err(RecordError::NotFound("could not get pds for DID"));
};
-
// TODO: throttle by host probably, generally guard against outgoing requests
+
// cid gets set to None for a retry, if it's Some and we got NotFound
+
let mut cid = cid;
-
let mut params = vec![
-
("repo", did.to_string()),
-
("collection", collection.to_string()),
-
("rkey", rkey.to_string()),
-
];
-
if let Some(cid) = cid {
-
params.push(("cid", cid.as_ref().to_string()));
-
}
-
let mut url = Url::parse_with_params(&pds, &params)?;
-
url.set_path("/xrpc/com.atproto.repo.getRecord");
+
let res = loop {
+
// TODO: throttle outgoing requests by host probably, generally guard against outgoing requests
+
let mut params = vec![
+
("repo", did.to_string()),
+
("collection", collection.to_string()),
+
("rkey", rkey.to_string()),
+
];
+
if let Some(cid) = cid {
+
params.push(("cid", cid.as_ref().to_string()));
+
}
+
let mut url = Url::parse_with_params(&pds, &params)?;
+
url.set_path("/xrpc/com.atproto.repo.getRecord");
+
+
let res = self
+
.client
+
.get(url.clone())
+
.send()
+
.await
+
.map_err(RecordError::SendError)?;
+
+
if res.status() == StatusCode::BAD_REQUEST {
+
// 1. if we're not able to parse json, it's not something we can handle
+
let err = res
+
.json::<ErrorResponseObject>()
+
.await
+
.map_err(RecordError::UpstreamBadBadNotGoodRequest)?;
+
// 2. if we are, is it a NotFound? and if so, did we try with a CID?
+
// if so, retry with no CID (api handler will reject for mismatch but
+
// with a nice error + warm cache)
+
if err.error == "NotFound" && cid.is_some() {
+
cid = &None;
+
continue;
+
} else {
+
return Err(RecordError::UpstreamBadRequest(err));
+
}
+
}
+
break res;
+
};
-
let res = self
-
.client
-
.get(url)
-
.send()
-
.await
-
.map_err(RecordError::SendError)?
+
let data = res
.error_for_status()
.map_err(RecordError::StatusError)? // TODO atproto error handling (think about handling not found)
.json::<RecordResponseObject>()
.await
.map_err(RecordError::ParseJsonError)?; // todo...
-
let Some(cid) = res.cid else {
+
let Some(cid) = data.cid else {
return Err(RecordError::MissingUpstreamCid);
};
let cid = Cid::from_str(&cid).map_err(|e| RecordError::BadUpstreamCid(e.to_string()))?;
Ok(CachedRecord::Found(RawRecord {
cid,
-
record: res.value.to_string(),
+
record: data.value.to_string(),
}))
}
}