// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1//! cached record storage 2 3use crate::{Identity, error::RecordError}; 4use atrium_api::types::string::{Cid, Did, Nsid, RecordKey}; 5use reqwest::Client; 6use serde::{Deserialize, Serialize}; 7use serde_json::value::RawValue; 8use std::str::FromStr; 9use std::time::Duration; 10use url::Url; 11 12#[derive(Debug, Serialize, Deserialize)] 13pub struct RawRecord { 14 cid: Cid, 15 record: String, 16} 17 18// TODO: should be able to do typed CID 19impl From<(Cid, Box<RawValue>)> for RawRecord { 20 fn from((cid, rv): (Cid, Box<RawValue>)) -> Self { 21 Self { 22 cid, 23 record: rv.get().to_string(), 24 } 25 } 26} 27 28/// only for use with stored (validated) values, not general strings 29impl From<&RawRecord> for (Cid, Box<RawValue>) { 30 fn from(RawRecord { cid, record }: &RawRecord) -> Self { 31 ( 32 cid.clone(), 33 RawValue::from_string(record.to_string()) 34 .expect("stored string from RawValue to be valid"), 35 ) 36 } 37} 38 39#[derive(Debug, Serialize, Deserialize)] 40pub enum CachedRecord { 41 Found(RawRecord), 42 Deleted, 43} 44 45//////// upstream record fetching 46 47#[derive(Deserialize)] 48struct RecordResponseObject { 49 #[allow(dead_code)] // expect it to be there but we ignore it 50 uri: String, 51 /// CID for this exact version of the record 52 /// 53 /// this is optional in the spec and that's potentially TODO for slingshot 54 cid: Option<String>, 55 /// the record itself as JSON 56 value: Box<RawValue>, 57} 58 59#[derive(Clone)] 60pub struct Repo { 61 identity: Identity, 62 client: Client, 63} 64 65impl Repo { 66 pub fn new(identity: Identity) -> Self { 67 let client = Client::builder() 68 .user_agent(format!( 69 "microcosm slingshot v{} (dev: @bad-example.com)", 70 env!("CARGO_PKG_VERSION") 71 )) 72 .no_proxy() 73 .timeout(Duration::from_secs(10)) 74 .build() 75 .unwrap(); 76 Repo { identity, client } 77 } 78 79 pub async fn get_record( 80 &self, 81 did: &Did, 82 collection: &Nsid, 83 rkey: &RecordKey, 84 cid: &Option<Cid>, 85 ) -> Result<CachedRecord, 
RecordError> { 86 let Some(pds) = self.identity.did_to_pds(did.clone()).await? else { 87 return Err(RecordError::NotFound("could not get pds for DID")); 88 }; 89 90 // TODO: throttle by host probably, generally guard against outgoing requests 91 92 let mut params = vec![ 93 ("repo", did.to_string()), 94 ("collection", collection.to_string()), 95 ("rkey", rkey.to_string()), 96 ]; 97 if let Some(cid) = cid { 98 params.push(("cid", cid.as_ref().to_string())); 99 } 100 let mut url = Url::parse_with_params(&pds, &params)?; 101 url.set_path("/xrpc/com.atproto.repo.getRecord"); 102 103 let res = self 104 .client 105 .get(url) 106 .send() 107 .await 108 .map_err(RecordError::SendError)? 109 .error_for_status() 110 .map_err(RecordError::StatusError)? // TODO atproto error handling (think about handling not found) 111 .json::<RecordResponseObject>() 112 .await 113 .map_err(RecordError::ParseJsonError)?; // todo... 114 115 let Some(cid) = res.cid else { 116 return Err(RecordError::MissingUpstreamCid); 117 }; 118 let cid = Cid::from_str(&cid).map_err(|e| RecordError::BadUpstreamCid(e.to_string()))?; 119 120 Ok(CachedRecord::Found(RawRecord { 121 cid, 122 record: res.value.to_string(), 123 })) 124 } 125}