use hickory_resolver::{ResolveError, TokioResolver};
use std::collections::{HashSet, VecDeque};
use std::path::Path;
use std::sync::Arc;
/// for now we're gonna just keep doing more cache
///
/// plc.directory x foyer, ttl kept with data, refresh deferred to background on fetch
///
/// things we need:
///
/// 1. handle -> DID resolution: getRecord must accept a handle for `repo` param
/// 2. DID -> PDS resolution: so we know where to getRecord
/// 3. DID -> handle resolution: for bidirectional handle validation and in case we want to offer this
use std::time::Duration;
use tokio::sync::Mutex;

use crate::error::IdentityError;
use atrium_api::{
    did_doc::DidDocument,
    types::string::{Did, Handle},
};
use atrium_common::resolver::Resolver;
use atrium_identity::{
    did::{CommonDidResolver, CommonDidResolverConfig, DEFAULT_PLC_DIRECTORY_URL},
    handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig, DnsTxtResolver},
};
use atrium_oauth::DefaultHttpClient; // it's probably not worth bringing all of atrium_oauth for this but
use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder};
use serde::{Deserialize, Serialize};
use time::UtcDateTime;

/// once we have something resolved, don't re-resolve until after this period
const MIN_TTL: Duration = Duration::from_secs(4 * 3600); // probably should have a max ttl
const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60);

#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
enum IdentityKey {
    Handle(Handle),
    Did(Did),
}

#[derive(Debug, Serialize, Deserialize)]
struct IdentityVal(UtcDateTime, IdentityData);

#[derive(Debug, Serialize, Deserialize)]
enum IdentityData {
    NotFound,
    Did(Did),
    Doc(PartialMiniDoc),
}

/// partial representation of a com.bad-example.identity mini atproto doc
///
/// partial because the handle is not verified
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PartialMiniDoc {
    /// an atproto handle (**unverified**)
    ///
    /// the first valid atproto handle from the did doc's aka
    unverified_handle: Handle,
    /// the did's atproto pds url (TODO: type this?)
    ///
    /// note: atrium *does* actually parse it into a URI, it just doesn't return
    /// that for some reason
    pds: String,
    /// for now we're just pulling this straight from the did doc
    ///
    /// would be nice to type and validate it
    ///
    /// this is the publicKeyMultibase from the did doc.
    /// legacy key encoding not supported.
    /// `id`, `type`, and `controller` must be checked, but aren't stored.
    signing_key: String,
}

impl TryFrom<DidDocument> for PartialMiniDoc {
    type Error = String;
    fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> {
        // must use the first valid handle
        let mut unverified_handle = None;
        let Some(ref doc_akas) = did_doc.also_known_as else {
            return Err("did doc missing `also_known_as`".to_string());
        };
        for aka in doc_akas {
            let Some(maybe_handle) = aka.strip_prefix("at://") else {
                continue;
            };
            let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else {
                continue;
            };
            unverified_handle = Some(valid_handle);
            break;
        }
        let Some(unverified_handle) = unverified_handle else {
            return Err("no valid atproto handles in `also_known_as`".to_string());
        };

        // atrium provides service endpoint getters, so use those
        let Some(pds) = did_doc.get_pds_endpoint() else {
            return Err("no valid pds service found".to_string());
        };

        // TODO: can't use atrium's get_signing_key() because it fails to check type and controller,
        // so if we check those and reject it, we might miss a later valid key in the array
        // (todo is to fix atrium)
        // actually: atrium might be flexible for legacy representations. for now we're rejecting the legacy rep.

        // must use the first valid signing key
        let mut signing_key = None;
        let Some(verification_methods) = did_doc.verification_method else {
            return Err("no verification methods found".to_string());
        };
        for method in verification_methods {
            if method.id != format!("{}#atproto", did_doc.id) {
                continue;
            }
            if method.r#type != "Multikey" {
                continue;
            }
            if method.controller != did_doc.id {
                continue;
            }
            let Some(key) = method.public_key_multibase else {
                continue;
            };
            signing_key = Some(key);
            break;
        }
        let Some(signing_key) = signing_key else {
            return Err("no valid atproto signing key found in verification methods".to_string());
        };

        Ok(PartialMiniDoc {
            unverified_handle,
            pds,
            signing_key,
        })
    }
}

/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
///
/// the hashset allows testing for presence of items in the queue.
/// this has absolutely no support for multiple queue consumers.
#[derive(Debug, Default)]
struct RefreshQueue {
    queue: VecDeque<IdentityKey>,
    items: HashSet<IdentityKey>,
}

/// cached identity resolution (handle -> DID, DID -> PDS) with deferred background refresh
#[derive(Clone)]
pub struct Identity {
    handle_resolver: Arc<AtprotoHandleResolver<HickoryDnsTxtResolver, DefaultHttpClient>>,
    did_resolver: Arc<CommonDidResolver<DefaultHttpClient>>,
    cache: HybridCache<IdentityKey, IdentityVal>,
    /// multi-producer *single consumer* queue
    refresh_queue: Arc<Mutex<RefreshQueue>>,
    /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher)
    refresher: Arc<Mutex<()>>,
}

impl Identity {
    pub async fn new(cache_dir: impl AsRef<Path>) -> Result<Self, IdentityError> {
        let http_client = Arc::new(DefaultHttpClient::default());
        let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig {
            dns_txt_resolver: HickoryDnsTxtResolver::new().unwrap(),
            http_client: http_client.clone(),
        });
        let did_resolver = CommonDidResolver::new(CommonDidResolverConfig {
            plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(),
            http_client: http_client.clone(),
        });

        let cache = HybridCacheBuilder::new()
            .with_name("identity")
            .memory(16 * 2_usize.pow(20))
            .with_weighter(|k, v| std::mem::size_of_val(k) + std::mem::size_of_val(v))
            .storage(Engine::large())
            .with_device_options(DirectFsDeviceOptions::new(cache_dir))
            .build()
            .await?;

        Ok(Self {
            handle_resolver: Arc::new(handle_resolver),
            did_resolver: Arc::new(did_resolver),
            cache,
            refresh_queue: Default::default(),
            refresher: Default::default(),
        })
    }

    /// Resolve (and verify!) an atproto handle to a DID
    ///
    /// The result can be stale
    ///
    /// `None` if the handle can't be found or verification fails
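    ///
    /// A rough usage sketch (not compiled as a doctest; the handle value and
    /// cache path are made up for illustration):
    ///
    /// ```ignore
    /// let identity = Identity::new("/tmp/identity-cache").await?;
    /// let handle = Handle::new("someone.example.com".to_string()).unwrap();
    /// match identity.handle_to_did(handle).await? {
    ///     Some(did) => println!("verified handle -> {did:?}"),
    ///     None => println!("not found, or bidirectional verification failed"),
    /// }
    /// ```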
    pub async fn handle_to_did(&self, handle: Handle) -> Result<Option<Did>, IdentityError> {
        let Some(did) = self.handle_to_unverified_did(&handle).await? else {
            return Ok(None);
        };
        let Some(doc) = self.did_to_partial_mini_doc(&did).await? else {
            return Ok(None);
        };
        if doc.unverified_handle != handle {
            return Ok(None);
        }
        Ok(Some(did))
    }

    /// Resolve (and verify!) a DID to a pds url
    ///
    /// This *also* incidentally resolves and verifies the handle, which might
    /// make it slower than expected
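    ///
    /// A rough usage sketch (not compiled as a doctest; `did` here is assumed to
    /// come from `handle_to_did` above):
    ///
    /// ```ignore
    /// if let Some(pds) = identity.did_to_pds(did).await? {
    ///     // base url of the account's pds, i.e. where getRecord calls should go
    ///     println!("pds: {pds}");
    /// }
    /// ```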
    pub async fn did_to_pds(&self, did: Did) -> Result<Option<String>, IdentityError> {
        let Some(mini_doc) = self.did_to_partial_mini_doc(&did).await? else {
            return Ok(None);
        };
        Ok(Some(mini_doc.pds))
    }

    /// Resolve (and cache but **not verify**) a handle to a DID
    async fn handle_to_unverified_did(
        &self,
        handle: &Handle,
    ) -> Result<Option<Did>, IdentityError> {
        let key = IdentityKey::Handle(handle.clone());
        let entry = self
            .cache
            .fetch(key.clone(), {
                let handle = handle.clone();
                let resolver = self.handle_resolver.clone();
                || async move {
                    match resolver.resolve(&handle).await {
                        Ok(did) => Ok(IdentityVal(UtcDateTime::now(), IdentityData::Did(did))),
                        Err(atrium_identity::Error::NotFound) => {
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
                        }
                        Err(other) => Err(foyer::Error::Other(Box::new(
                            IdentityError::ResolutionFailed(other),
                        ))),
                    }
                }
            })
            .await?;

        let now = UtcDateTime::now();
        let IdentityVal(last_fetch, data) = entry.value();
        match data {
            IdentityData::Doc(_) => {
                log::error!("identity value mixup: got a doc from a handle key (should be a did)");
                Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
            }
            IdentityData::NotFound => {
                if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(None)
            }
            IdentityData::Did(did) => {
                if (now - *last_fetch) >= MIN_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(Some(did.clone()))
            }
        }
    }

    /// Fetch (and cache) a partial mini doc from a did
    async fn did_to_partial_mini_doc(
        &self,
        did: &Did,
    ) -> Result<Option<PartialMiniDoc>, IdentityError> {
        let key = IdentityKey::Did(did.clone());
        let entry = self
            .cache
            .fetch(key.clone(), {
                let did = did.clone();
                let resolver = self.did_resolver.clone();
                || async move {
                    match resolver.resolve(&did).await {
                        Ok(did_doc) => {
                            // TODO: fix in atrium: should verify id is did
                            if did_doc.id != did.to_string() {
                                return Err(foyer::Error::other(Box::new(
                                    IdentityError::BadDidDoc(
                                        "did doc's id did not match did".to_string(),
                                    ),
                                )));
                            }
                            let mini_doc = did_doc.try_into().map_err(|e| {
                                foyer::Error::Other(Box::new(IdentityError::BadDidDoc(e)))
                            })?;
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)))
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
                        }
                        Err(other) => Err(foyer::Error::Other(Box::new(
                            IdentityError::ResolutionFailed(other),
                        ))),
                    }
                }
            })
            .await?;

        let now = UtcDateTime::now();
        let IdentityVal(last_fetch, data) = entry.value();
        match data {
            IdentityData::Did(_) => {
                log::error!("identity value mixup: got a did from a did key (should be a doc)");
                Err(IdentityError::IdentityValTypeMixup(did.to_string()))
            }
            IdentityData::NotFound => {
                if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(None)
            }
            IdentityData::Doc(mini_doc) => {
                if (now - *last_fetch) >= MIN_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(Some(mini_doc.clone()))
            }
        }
    }

    /// put a refresh task on the queue
    ///
    /// this can be safely called from multiple concurrent tasks
    async fn queue_refresh(&self, key: IdentityKey) {
        // todo: max queue size
        let mut q = self.refresh_queue.lock().await;
        if !q.items.contains(&key) {
            q.items.insert(key.clone());
            q.queue.push_back(key);
        }
    }

    /// find out what's next in the queue. concurrent consumers are not allowed.
    ///
    /// the intent is to leave the item in the queue while refreshing, so that a
    /// producer will not re-add it while it's in progress. there are definitely
    /// better ways to do this, but it keeps things ~simple for as far as a
    /// single consumer can take us.
    ///
    /// we could take it from the queue but leave it in the set and remove it
    /// from the set later, but splitting them apart feels more bug-prone.
    async fn peek_refresh(&self) -> Option<IdentityKey> {
        let q = self.refresh_queue.lock().await;
        q.queue.front().cloned()
    }

    /// call to clear the just-refreshed key from the front of the refresh queue.
    /// concurrent consumers not allowed.
    ///
    /// must provide the last peeked refresh queue item as a small safety check
    async fn complete_refresh(&self, key: &IdentityKey) -> Result<(), IdentityError> {
        let mut q = self.refresh_queue.lock().await;

        let Some(queue_key) = q.queue.pop_front() else {
            // gone from queue + since we're in an error condition, make sure it's not stuck in items
            // (not toctou because we have the lock)
            // bolder here than below about removing from items, because if the queue is *empty*,
            // then we know it hasn't been re-added since losing sync.
            if q.items.remove(key) {
                log::error!("identity refresh: queue de-sync: key was in items but not in the queue");
            } else {
                log::warn!(
                    "identity refresh: tried to complete with wrong key. are multiple queue consumers running?"
                );
            }
            return Err(IdentityError::RefreshQueueKeyError("no key in queue"));
        };

        if queue_key != *key {
            // extra weird case here, what's the most defensive behaviour?
            // we have two keys: ours should have been first but isn't. this shouldn't happen, so let's
            // just leave items alone for it. risks unbounded growth but we're in a bad place already.
            // the other key is the one we just popped. we didn't want it, so maybe we should put it
            // back, BUT if we somehow ended up with concurrent consumers, we have bigger problems. take
            // responsibility for taking it instead: remove it from items as well, and just drop it.
            //
            // hope that whoever calls us takes this error seriously.
            if q.items.remove(&queue_key) {
                log::warn!(
                    "identity refresh: queue de-sync + dropping a bystander key without refreshing it!"
                );
            } else {
                // you thought things couldn't get weirder? (i mean hopefully they can't)
                log::error!("identity refresh: queue de-sync + bystander key also de-sync!?");
            }
            return Err(IdentityError::RefreshQueueKeyError(
                "wrong key at front of queue",
            ));
        }

        if q.items.remove(key) {
            Ok(())
        } else {
            log::error!("identity refresh: queue de-sync: key not in items");
            Err(IdentityError::RefreshQueueKeyError("key not in items"))
        }
    }

    /// run the refresh queue consumer
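    ///
    /// A rough sketch of how this is intended to be driven (not compiled as a
    /// doctest; the cache path is made up, and a tokio runtime is assumed):
    ///
    /// ```ignore
    /// let identity = Identity::new("/var/cache/identity").await?;
    /// // exactly one refresher task: a second call would panic on the try_lock below
    /// let refresher = tokio::spawn({
    ///     let identity = identity.clone();
    ///     async move { identity.run_refresher().await }
    /// });
    /// ```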
    pub async fn run_refresher(&self) -> Result<(), IdentityError> {
        let _guard = self
            .refresher
            .try_lock()
            .expect("there to only be one refresher running");
        loop {
            let Some(task_key) = self.peek_refresh().await else {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                continue;
            };
            match task_key {
                IdentityKey::Handle(ref handle) => {
                    log::trace!("refreshing handle {handle:?}");
                    match self.handle_resolver.resolve(handle).await {
                        Ok(did) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::Did(did)),
                            );
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
                            );
                        }
                        Err(err) => {
                            log::warn!(
                                "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)"
                            );
                        }
                    }
                    self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
                }
                IdentityKey::Did(ref did) => {
                    log::trace!("refreshing did doc: {did:?}");

                    match self.did_resolver.resolve(did).await {
                        Ok(did_doc) => {
                            // TODO: fix in atrium: should verify id is did
                            if did_doc.id != did.to_string() {
                                log::warn!(
                                    "did doc refresh failed: wrong did doc id. dropping refresh."
                                );
                                // clear the queue entry before skipping, or we'd spin on this key forever
                                self.complete_refresh(&task_key).await?;
                                continue;
                            }
                            let mini_doc = match did_doc.try_into() {
                                Ok(md) => md,
                                Err(e) => {
                                    log::warn!(
                                        "converting mini doc failed: {e:?}. dropping refresh."
                                    );
                                    // clear the queue entry before skipping, or we'd spin on this key forever
                                    self.complete_refresh(&task_key).await?;
                                    continue;
                                }
                            };
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)),
                            );
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
                            );
                        }
                        Err(err) => {
                            log::warn!(
                                "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
                            );
                        }
                    }

                    self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
                }
            }
        }
    }
}

/// DNS TXT resolver for atproto handle resolution, backed by hickory-resolver
pub struct HickoryDnsTxtResolver(TokioResolver);

impl HickoryDnsTxtResolver {
    fn new() -> Result<Self, ResolveError> {
        Ok(Self(TokioResolver::builder_tokio()?.build()))
    }
}

impl DnsTxtResolver for HickoryDnsTxtResolver {
    async fn resolve(
        &self,
        query: &str,
    ) -> core::result::Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
        match self.0.txt_lookup(query).await {
            Ok(r) => {
                metrics::counter!("whoami_resolve_dns_txt", "success" => "true").increment(1);
                Ok(r.iter().map(|r| r.to_string()).collect())
            }
            Err(e) => {
                metrics::counter!("whoami_resolve_dns_txt", "success" => "false").increment(1);
                Err(e.into())
            }
        }
    }
}
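
// A rough test sketch for the PartialMiniDoc conversion above. It hand-builds an
// atrium DidDocument, so it assumes atrium_api's did_doc field layout (`context`,
// and `Service { id, r#type, service_endpoint }`) and that get_pds_endpoint()
// matches the `#atproto_pds` / `AtprotoPersonalDataServer` service; the did,
// handle, pds url, and key values are made up.
#[cfg(test)]
mod tests {
    use super::*;
    use atrium_api::did_doc::{Service, VerificationMethod};

    fn sample_did_doc() -> DidDocument {
        let did = "did:plc:test123".to_string();
        DidDocument {
            context: None,
            id: did.clone(),
            also_known_as: Some(vec![
                // not an at:// uri, must be skipped
                "https://example.com".to_string(),
                // the first valid atproto handle wins
                "at://alice.example.com".to_string(),
            ]),
            verification_method: Some(vec![VerificationMethod {
                id: format!("{did}#atproto"),
                r#type: "Multikey".to_string(),
                controller: did.clone(),
                public_key_multibase: Some("zQ3shunexample".to_string()),
            }]),
            service: Some(vec![Service {
                id: "#atproto_pds".to_string(),
                r#type: "AtprotoPersonalDataServer".to_string(),
                service_endpoint: "https://pds.example.com".to_string(),
            }]),
        }
    }

    #[test]
    fn mini_doc_takes_first_valid_handle_pds_and_key() {
        let doc: PartialMiniDoc = sample_did_doc().try_into().expect("should convert");
        assert_eq!(
            doc.unverified_handle,
            Handle::new("alice.example.com".to_string()).unwrap()
        );
        assert_eq!(doc.pds, "https://pds.example.com");
        assert_eq!(doc.signing_key, "zQ3shunexample");
    }

    #[test]
    fn mini_doc_requires_a_valid_handle() {
        let mut did_doc = sample_did_doc();
        did_doc.also_known_as = Some(vec!["https://example.com".to_string()]);
        assert!(PartialMiniDoc::try_from(did_doc).is_err());
    }
}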