1use super::LexiconSource;
2use crate::lexicon::LexiconDoc;
3use jacquard_api::com_atproto::repo::list_records::ListRecords;
4use jacquard_common::IntoStatic;
5use jacquard_common::types::ident::AtIdentifier;
6use jacquard_common::types::string::Nsid;
7use jacquard_common::xrpc::XrpcExt;
8use jacquard_identity::JacquardResolver;
9use jacquard_identity::resolver::{IdentityResolver, ResolverOptions};
10use miette::{Result, miette};
11use std::collections::HashMap;
12
/// Lexicon source backed by records stored in an AT Protocol repository.
#[derive(Clone, Debug)]
pub struct AtProtoSource {
    /// Handle or DID of the account to resolve; `fetch` parses this as an
    /// at-identifier and uses it to locate the PDS.
    pub endpoint: String,
    /// Optional at-identifier of the repo to list records from. When `None`,
    /// `fetch` uses the DID resolved from `endpoint` as the repo.
    pub slice: Option<String>,
}
18
19impl AtProtoSource {
20 fn parse_lexicon_record(
21 record_data: &jacquard_common::types::value::Data<'_>,
22 ) -> Option<LexiconDoc<'static>> {
23 // Extract the 'value' field from the record
24 let value = match record_data {
25 jacquard_common::types::value::Data::Object(map) => map.0.get("value")?,
26 _ => {
27 eprintln!("Warning: Record is not an object");
28 return None;
29 }
30 };
31
32 match serde_json::to_string(value) {
33 Ok(json) => match serde_json::from_str::<LexiconDoc>(&json) {
34 Ok(doc) => Some(doc.into_static()),
35 Err(e) => {
36 eprintln!("Warning: Failed to parse lexicon from record value: {}", e);
37 None
38 }
39 },
40 Err(e) => {
41 eprintln!("Warning: Failed to serialize record value: {}", e);
42 None
43 }
44 }
45 }
46}
47
48impl LexiconSource for AtProtoSource {
49 async fn fetch(&self) -> Result<HashMap<String, LexiconDoc<'_>>> {
50 let http = reqwest::Client::new();
51 let resolver = JacquardResolver::new(http, ResolverOptions::default());
52
53 // Parse endpoint as at-identifier (handle or DID)
54 let identifier = AtIdentifier::new(&self.endpoint)
55 .map_err(|e| miette!("Invalid endpoint '{}': {}", self.endpoint, e))?;
56
57 // Resolve to get PDS endpoint
58 let did = match &identifier {
59 AtIdentifier::Did(d) => d.clone().into_static(),
60 AtIdentifier::Handle(h) => resolver.resolve_handle(h).await?,
61 };
62
63 let did_doc_resp = resolver.resolve_did_doc(&did).await?;
64
65 let did_doc = did_doc_resp.parse()?;
66
67 let pds = did_doc
68 .pds_endpoint()
69 .ok_or_else(|| miette!("No PDS endpoint found for {}", did))?;
70
71 // Determine repo - use slice if provided, otherwise use the resolved DID
72 let repo = if let Some(ref slice) = self.slice {
73 AtIdentifier::new(slice)
74 .map_err(|e| miette!("Invalid slice '{}': {}", slice, e))?
75 .into_static()
76 } else {
77 AtIdentifier::Did(did.clone())
78 };
79
80 let collection = Nsid::new("com.atproto.lexicon.schema")
81 .map_err(|e| miette!("Invalid collection NSID: {}", e))?;
82
83 let mut lexicons = HashMap::new();
84
85 // Try to fetch all records at once first
86 let req = ListRecords::new()
87 .repo(repo.clone().into_static())
88 .collection(collection.clone().into_static())
89 .build();
90
91 let resp = resolver.xrpc(pds.clone()).send(&req).await?;
92
93 match resp.into_output() {
94 Ok(output) => {
95 // Batch fetch succeeded
96 for record_data in output.records {
97 if let Some(doc) = Self::parse_lexicon_record(&record_data) {
98 let nsid = doc.id.to_string();
99 lexicons.insert(nsid, doc);
100 }
101 }
102 }
103 Err(e) => {
104 // Batch decode failed, try one-by-one with cursor
105 eprintln!("Warning: Batch decode failed from {}: {}", self.endpoint, e);
106 eprintln!("Retrying with limit=1 to skip invalid records...");
107
108 let mut cursor: Option<String> = None;
109 loop {
110 let req = if let Some(ref c) = cursor {
111 ListRecords::new()
112 .repo(repo.clone().into_static())
113 .collection(collection.clone().into_static())
114 .limit(1)
115 .cursor(c.clone())
116 .build()
117 } else {
118 ListRecords::new()
119 .repo(repo.clone().into_static())
120 .collection(collection.clone().into_static())
121 .limit(1)
122 .build()
123 };
124 let resp = resolver.xrpc(pds.clone()).send(&req).await?;
125
126 match resp.into_output() {
127 Ok(output) => {
128 for record_data in output.records {
129 if let Some(doc) = Self::parse_lexicon_record(&record_data) {
130 let nsid = doc.id.to_string();
131 lexicons.insert(nsid, doc);
132 }
133 }
134
135 if let Some(next_cursor) = output.cursor {
136 cursor = Some(next_cursor.to_string());
137 } else {
138 break;
139 }
140 }
141 Err(e) => {
142 eprintln!("Warning: Failed to decode record (skipping): {}", e);
143 // Try to continue with next record if possible
144 // This is a bit tricky since we don't have the cursor from failed decode
145 // For now, just break
146 break;
147 }
148 }
149 }
150 }
151 }
152
153 Ok(lexicons)
154 }
155}