forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1//! cached record storage
2
3use crate::{Identity, error::RecordError};
4use atrium_api::types::string::{Cid, Did, Nsid, RecordKey};
5use reqwest::{Client, StatusCode};
6use serde::{Deserialize, Serialize};
7use serde_json::value::RawValue;
8use std::str::FromStr;
9use std::time::Duration;
10use url::Url;
11
12#[derive(Debug, Serialize, Deserialize)]
13pub struct RawRecord {
14 cid: Cid,
15 record: String,
16}
17
18// TODO: should be able to do typed CID
19impl From<(Cid, Box<RawValue>)> for RawRecord {
20 fn from((cid, rv): (Cid, Box<RawValue>)) -> Self {
21 Self {
22 cid,
23 record: rv.get().to_string(),
24 }
25 }
26}
27
28/// only for use with stored (validated) values, not general strings
29impl From<&RawRecord> for (Cid, Box<RawValue>) {
30 fn from(RawRecord { cid, record }: &RawRecord) -> Self {
31 (
32 cid.clone(),
33 RawValue::from_string(record.to_string())
34 .expect("stored string from RawValue to be valid"),
35 )
36 }
37}
38
39#[derive(Debug, Serialize, Deserialize)]
40pub enum CachedRecord {
41 Found(RawRecord),
42 Deleted,
43}
44
45//////// upstream record fetching
46
47#[derive(Deserialize)]
48struct RecordResponseObject {
49 #[allow(dead_code)] // expect it to be there but we ignore it
50 uri: String,
51 /// CID for this exact version of the record
52 ///
53 /// this is optional in the spec and that's potentially TODO for slingshot
54 cid: Option<String>,
55 /// the record itself as JSON
56 value: Box<RawValue>,
57}
58
59#[derive(Debug, Deserialize)]
60pub struct ErrorResponseObject {
61 pub error: String,
62 pub message: String,
63}
64
65#[derive(Clone)]
66pub struct Repo {
67 identity: Identity,
68 client: Client,
69}
70
71impl Repo {
72 pub fn new(identity: Identity) -> Self {
73 let client = Client::builder()
74 .user_agent(format!(
75 "microcosm slingshot v{} (dev: @bad-example.com)",
76 env!("CARGO_PKG_VERSION")
77 ))
78 .no_proxy()
79 .timeout(Duration::from_secs(10))
80 .build()
81 .unwrap();
82 Repo { identity, client }
83 }
84
85 pub async fn get_record(
86 &self,
87 did: &Did,
88 collection: &Nsid,
89 rkey: &RecordKey,
90 cid: &Option<Cid>,
91 ) -> Result<CachedRecord, RecordError> {
92 let Some(pds) = self.identity.did_to_pds(did.clone()).await? else {
93 return Err(RecordError::NotFound("could not get pds for DID"));
94 };
95
96 // cid gets set to None for a retry, if it's Some and we got NotFound
97 let mut cid = cid;
98
99 let res = loop {
100 // TODO: throttle outgoing requests by host probably, generally guard against outgoing requests
101 let mut params = vec![
102 ("repo", did.to_string()),
103 ("collection", collection.to_string()),
104 ("rkey", rkey.to_string()),
105 ];
106 if let Some(cid) = cid {
107 params.push(("cid", cid.as_ref().to_string()));
108 }
109 let mut url = Url::parse_with_params(&pds, ¶ms)?;
110 url.set_path("/xrpc/com.atproto.repo.getRecord");
111
112 let res = self
113 .client
114 .get(url.clone())
115 .send()
116 .await
117 .map_err(RecordError::SendError)?;
118
119 if res.status() == StatusCode::BAD_REQUEST {
120 // 1. if we're not able to parse json, it's not something we can handle
121 let err = res
122 .json::<ErrorResponseObject>()
123 .await
124 .map_err(RecordError::UpstreamBadBadNotGoodRequest)?;
125 // 2. if we are, is it a NotFound? and if so, did we try with a CID?
126 // if so, retry with no CID (api handler will reject for mismatch but
127 // with a nice error + warm cache)
128 if err.error == "NotFound" && cid.is_some() {
129 cid = &None;
130 continue;
131 } else {
132 return Err(RecordError::UpstreamBadRequest(err));
133 }
134 }
135 break res;
136 };
137
138 let data = res
139 .error_for_status()
140 .map_err(RecordError::StatusError)? // TODO atproto error handling (think about handling not found)
141 .json::<RecordResponseObject>()
142 .await
143 .map_err(RecordError::ParseJsonError)?; // todo...
144
145 let Some(cid) = data.cid else {
146 return Err(RecordError::MissingUpstreamCid);
147 };
148 let cid = Cid::from_str(&cid).map_err(|e| RecordError::BadUpstreamCid(e.to_string()))?;
149
150 Ok(CachedRecord::Found(RawRecord {
151 cid,
152 record: data.value.to_string(),
153 }))
154 }
155}