A better Rust ATProto crate
at main 6.0 kB view raw
1use super::LexiconSource; 2use crate::lexicon::LexiconDoc; 3use jacquard_api::com_atproto::repo::list_records::ListRecords; 4use jacquard_common::IntoStatic; 5use jacquard_common::types::ident::AtIdentifier; 6use jacquard_common::types::string::Nsid; 7use jacquard_common::xrpc::XrpcExt; 8use jacquard_identity::JacquardResolver; 9use jacquard_identity::resolver::{IdentityResolver, ResolverOptions}; 10use miette::{Result, miette}; 11use std::collections::HashMap; 12 13#[derive(Debug, Clone)] 14pub struct AtProtoSource { 15 pub endpoint: String, 16 pub slice: Option<String>, 17} 18 19impl AtProtoSource { 20 fn parse_lexicon_record( 21 record_data: &jacquard_common::types::value::Data<'_>, 22 ) -> Option<LexiconDoc<'static>> { 23 // Extract the 'value' field from the record 24 let value = match record_data { 25 jacquard_common::types::value::Data::Object(map) => map.0.get("value")?, 26 _ => { 27 eprintln!("Warning: Record is not an object"); 28 return None; 29 } 30 }; 31 32 match serde_json::to_string(value) { 33 Ok(json) => match serde_json::from_str::<LexiconDoc>(&json) { 34 Ok(doc) => Some(doc.into_static()), 35 Err(e) => { 36 eprintln!("Warning: Failed to parse lexicon from record value: {}", e); 37 None 38 } 39 }, 40 Err(e) => { 41 eprintln!("Warning: Failed to serialize record value: {}", e); 42 None 43 } 44 } 45 } 46} 47 48impl LexiconSource for AtProtoSource { 49 async fn fetch(&self) -> Result<HashMap<String, LexiconDoc<'_>>> { 50 let http = reqwest::Client::new(); 51 let resolver = JacquardResolver::new(http, ResolverOptions::default()); 52 53 // Parse endpoint as at-identifier (handle or DID) 54 let identifier = AtIdentifier::new(&self.endpoint) 55 .map_err(|e| miette!("Invalid endpoint '{}': {}", self.endpoint, e))?; 56 57 // Resolve to get PDS endpoint 58 let did = match &identifier { 59 AtIdentifier::Did(d) => d.clone().into_static(), 60 AtIdentifier::Handle(h) => resolver.resolve_handle(h).await?, 61 }; 62 63 let did_doc_resp = resolver.resolve_did_doc(&did).await?; 64 65 let did_doc = did_doc_resp.parse()?; 66 67 let pds = did_doc 68 .pds_endpoint() 69 .ok_or_else(|| miette!("No PDS endpoint found for {}", did))?; 70 71 // Determine repo - use slice if provided, otherwise use the resolved DID 72 let repo = if let Some(ref slice) = self.slice { 73 AtIdentifier::new(slice) 74 .map_err(|e| miette!("Invalid slice '{}': {}", slice, e))? 75 .into_static() 76 } else { 77 AtIdentifier::Did(did.clone()) 78 }; 79 80 let collection = Nsid::new("com.atproto.lexicon.schema") 81 .map_err(|e| miette!("Invalid collection NSID: {}", e))?; 82 83 let mut lexicons = HashMap::new(); 84 85 // Try to fetch all records at once first 86 let req = ListRecords::new() 87 .repo(repo.clone().into_static()) 88 .collection(collection.clone().into_static()) 89 .build(); 90 91 let resp = resolver.xrpc(pds.clone()).send(&req).await?; 92 93 match resp.into_output() { 94 Ok(output) => { 95 // Batch fetch succeeded 96 for record_data in output.records { 97 if let Some(doc) = Self::parse_lexicon_record(&record_data) { 98 let nsid = doc.id.to_string(); 99 lexicons.insert(nsid, doc); 100 } 101 } 102 } 103 Err(e) => { 104 // Batch decode failed, try one-by-one with cursor 105 eprintln!("Warning: Batch decode failed from {}: {}", self.endpoint, e); 106 eprintln!("Retrying with limit=1 to skip invalid records..."); 107 108 let mut cursor: Option<String> = None; 109 loop { 110 let req = if let Some(ref c) = cursor { 111 ListRecords::new() 112 .repo(repo.clone().into_static()) 113 .collection(collection.clone().into_static()) 114 .limit(1) 115 .cursor(c.clone()) 116 .build() 117 } else { 118 ListRecords::new() 119 .repo(repo.clone().into_static()) 120 .collection(collection.clone().into_static()) 121 .limit(1) 122 .build() 123 }; 124 let resp = resolver.xrpc(pds.clone()).send(&req).await?; 125 126 match resp.into_output() { 127 Ok(output) => { 128 for record_data in output.records { 129 if let Some(doc) = Self::parse_lexicon_record(&record_data) { 130 let nsid = doc.id.to_string(); 131 lexicons.insert(nsid, doc); 132 } 133 } 134 135 if let Some(next_cursor) = output.cursor { 136 cursor = Some(next_cursor.to_string()); 137 } else { 138 break; 139 } 140 } 141 Err(e) => { 142 eprintln!("Warning: Failed to decode record (skipping): {}", e); 143 // Try to continue with next record if possible 144 // This is a bit tricky since we don't have the cursor from failed decode 145 // For now, just break 146 break; 147 } 148 } 149 } 150 } 151 } 152 153 Ok(lexicons) 154 } 155}