forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1//! cached record storage
2
3use crate::{Identity, error::RecordError};
4use atrium_api::types::string::{Cid, Did, Nsid, RecordKey};
5use reqwest::Client;
6use serde::{Deserialize, Serialize};
7use serde_json::value::RawValue;
8use std::str::FromStr;
9use std::time::Duration;
10use url::Url;
11
12#[derive(Debug, Serialize, Deserialize)]
13pub struct RawRecord {
14 cid: Cid,
15 record: String,
16}
17
18// TODO: should be able to do typed CID
19impl From<(Cid, Box<RawValue>)> for RawRecord {
20 fn from((cid, rv): (Cid, Box<RawValue>)) -> Self {
21 Self {
22 cid,
23 record: rv.get().to_string(),
24 }
25 }
26}
27
28/// only for use with stored (validated) values, not general strings
29impl From<&RawRecord> for (Cid, Box<RawValue>) {
30 fn from(RawRecord { cid, record }: &RawRecord) -> Self {
31 (
32 cid.clone(),
33 RawValue::from_string(record.to_string())
34 .expect("stored string from RawValue to be valid"),
35 )
36 }
37}
38
39#[derive(Debug, Serialize, Deserialize)]
40pub enum CachedRecord {
41 Found(RawRecord),
42 Deleted,
43}
44
45//////// upstream record fetching
46
47#[derive(Deserialize)]
48struct RecordResponseObject {
49 #[allow(dead_code)] // expect it to be there but we ignore it
50 uri: String,
51 /// CID for this exact version of the record
52 ///
53 /// this is optional in the spec and that's potentially TODO for slingshot
54 cid: Option<String>,
55 /// the record itself as JSON
56 value: Box<RawValue>,
57}
58
59#[derive(Clone)]
60pub struct Repo {
61 identity: Identity,
62 client: Client,
63}
64
65impl Repo {
66 pub fn new(identity: Identity) -> Self {
67 let client = Client::builder()
68 .user_agent(format!(
69 "microcosm slingshot v{} (dev: @bad-example.com)",
70 env!("CARGO_PKG_VERSION")
71 ))
72 .no_proxy()
73 .timeout(Duration::from_secs(10))
74 .build()
75 .unwrap();
76 Repo { identity, client }
77 }
78
79 pub async fn get_record(
80 &self,
81 did: &Did,
82 collection: &Nsid,
83 rkey: &RecordKey,
84 cid: &Option<Cid>,
85 ) -> Result<CachedRecord, RecordError> {
86 let Some(pds) = self.identity.did_to_pds(did.clone()).await? else {
87 return Err(RecordError::NotFound("could not get pds for DID"));
88 };
89
90 // TODO: throttle by host probably, generally guard against outgoing requests
91
92 let mut params = vec![
93 ("repo", did.to_string()),
94 ("collection", collection.to_string()),
95 ("rkey", rkey.to_string()),
96 ];
97 if let Some(cid) = cid {
98 params.push(("cid", cid.as_ref().to_string()));
99 }
100 let mut url = Url::parse_with_params(&pds, ¶ms)?;
101 url.set_path("/xrpc/com.atproto.repo.getRecord");
102
103 let res = self
104 .client
105 .get(url)
106 .send()
107 .await
108 .map_err(RecordError::SendError)?
109 .error_for_status()
110 .map_err(RecordError::StatusError)? // TODO atproto error handling (think about handling not found)
111 .json::<RecordResponseObject>()
112 .await
113 .map_err(RecordError::ParseJsonError)?; // todo...
114
115 let Some(cid) = res.cid else {
116 return Err(RecordError::MissingUpstreamCid);
117 };
118 let cid = Cid::from_str(&cid).map_err(|e| RecordError::BadUpstreamCid(e.to_string()))?;
119
120 Ok(CachedRecord::Found(RawRecord {
121 cid,
122 record: res.value.to_string(),
123 }))
124 }
125}