use hickory_resolver::{ResolveError, TokioResolver};
use std::collections::{HashSet, VecDeque};
use std::path::Path;
use std::sync::Arc;
/// for now we're gonna just keep doing more cache
///
/// plc.directory x foyer, ttl kept with data, refresh deferred to background on fetch
///
/// things we need:
///
/// 1. handle -> DID resolution: getRecord must accept a handle for `repo` param
/// 2. DID -> PDS resolution: so we know where to getRecord
/// 3. DID -> handle resolution: for bidirectional handle validation and in case we want to offer this
use std::time::Duration;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

use crate::error::IdentityError;
use atrium_api::{
    did_doc::DidDocument,
    types::string::{Did, Handle},
};
use atrium_common::resolver::Resolver;
use atrium_identity::{
    did::{CommonDidResolver, CommonDidResolverConfig, DEFAULT_PLC_DIRECTORY_URL},
    handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig, DnsTxtResolver},
};
use atrium_oauth::DefaultHttpClient; // it's probably not worth bringing all of atrium_oauth for this but
use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder};
use serde::{Deserialize, Serialize};
use time::UtcDateTime;

/// once we have something resolved, don't re-resolve until after this period
const MIN_TTL: Duration = Duration::from_secs(4 * 3600); // probably should have a max ttl
const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60);

#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
enum IdentityKey {
    Handle(Handle),
    Did(Did),
}

#[derive(Debug, Serialize, Deserialize)]
struct IdentityVal(UtcDateTime, IdentityData);

#[derive(Debug, Serialize, Deserialize)]
enum IdentityData {
    NotFound,
    Did(Did),
    Doc(PartialMiniDoc),
}

/// partial representation of a com.bad-example.identity mini atproto doc
///
/// partial because the handle is not verified
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartialMiniDoc {
    /// an atproto handle (**unverified**)
    ///
    /// the first valid atproto handle from the did doc's aka
    pub unverified_handle: Handle,
    /// the did's atproto pds url (TODO: type this?)
    ///
    /// note: atrium *does* actually parse it into a URI, it just doesn't return
    /// that for some reason
    pub pds: String,
    /// for now we're just pulling this straight from the did doc
    ///
    /// would be nice to type and validate it
    ///
    /// this is the publicKeyMultibase from the did doc.
    /// legacy key encoding not supported.
    /// `id`, `type`, and `controller` must be checked, but aren't stored.
    pub signing_key: String,
}

impl TryFrom<DidDocument> for PartialMiniDoc {
    type Error = String;
    fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> {
        // must use the first valid handle
        let mut unverified_handle = None;
        let Some(ref doc_akas) = did_doc.also_known_as else {
            return Err("did doc missing `also_known_as`".to_string());
        };
        for aka in doc_akas {
            let Some(maybe_handle) = aka.strip_prefix("at://") else {
                continue;
            };
            let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else {
                continue;
            };
            unverified_handle = Some(valid_handle);
            break;
        }
        let Some(unverified_handle) = unverified_handle else {
            return Err("no valid atproto handles in `also_known_as`".to_string());
        };

        // atrium already has service endpoint getters
        let Some(pds) = did_doc.get_pds_endpoint() else {
            return Err("no valid pds service found".to_string());
        };

        // TODO can't use atrium's get_signing_key() because it fails to check type and controller,
        // so if we check those and reject it, we might miss a later valid key in the array
        // (todo is to fix atrium)
        // actually: atrium might be flexible for legacy reps. for now we're rejecting legacy rep.

        // must use the first valid signing key
        let mut signing_key = None;
        let Some(verification_methods) = did_doc.verification_method else {
            return Err("no verification methods found".to_string());
        };
        for method in verification_methods {
            if method.id != format!("{}#atproto", did_doc.id) {
                continue;
            }
            if method.r#type != "Multikey" {
                continue;
            }
            if method.controller != did_doc.id {
                continue;
            }
            let Some(key) = method.public_key_multibase else {
                continue;
            };
            signing_key = Some(key);
            break;
        }
        let Some(signing_key) = signing_key else {
            return Err("no valid atproto signing key found in verification methods".to_string());
        };

        Ok(PartialMiniDoc {
            unverified_handle,
            pds,
            signing_key,
        })
    }
}

/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
///
/// the hashset allows testing for presence of items in the queue.
/// this has absolutely no support for multiple queue consumers.
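///
/// invariant (outside of error recovery): `queue` and `items` hold exactly the
/// same keys; `items` only exists for O(1) membership checks on enqueue.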
#[derive(Debug, Default)]
struct RefreshQueue {
    queue: VecDeque<IdentityKey>,
    items: HashSet<IdentityKey>,
}

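/// cached atproto identity resolution: handle -> DID, DID -> PDS, DID -> (unverified) handle
///
/// a rough usage sketch, not a tested doctest: the cache path and handle below
/// are placeholders, and a tokio runtime is assumed
///
/// ```ignore
/// let identity = Identity::new("/var/cache/identity").await?;
///
/// // exactly one refresher may run; it does the deferred background re-resolution
/// let shutdown = CancellationToken::new();
/// tokio::spawn({
///     let identity = identity.clone();
///     let shutdown = shutdown.clone();
///     async move { identity.run_refresher(shutdown).await }
/// });
///
/// // resolve + verify a handle, then find where to getRecord
/// let handle = Handle::new("example.bsky.social".to_string()).expect("valid handle");
/// if let Some(did) = identity.handle_to_did(handle).await? {
///     if let Some(pds) = identity.did_to_pds(did).await? {
///         println!("pds: {pds}");
///     }
/// }
/// ```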
#[derive(Clone)]
pub struct Identity {
    handle_resolver: Arc<AtprotoHandleResolver<HickoryDnsTxtResolver, DefaultHttpClient>>,
    did_resolver: Arc<CommonDidResolver<DefaultHttpClient>>,
    cache: HybridCache<IdentityKey, IdentityVal>,
    /// multi-producer *single consumer* queue
    refresh_queue: Arc<Mutex<RefreshQueue>>,
    /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher)
    refresher: Arc<Mutex<()>>,
}

impl Identity {
    pub async fn new(cache_dir: impl AsRef<Path>) -> Result<Self, IdentityError> {
        let http_client = Arc::new(DefaultHttpClient::default());
        let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig {
            dns_txt_resolver: HickoryDnsTxtResolver::new().unwrap(),
            http_client: http_client.clone(),
        });
        let did_resolver = CommonDidResolver::new(CommonDidResolverConfig {
            plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(),
            http_client: http_client.clone(),
        });

        let cache = HybridCacheBuilder::new()
            .with_name("identity")
            .memory(16 * 2_usize.pow(20))
            .with_weighter(|k, v| std::mem::size_of_val(k) + std::mem::size_of_val(v))
            .storage(Engine::small())
            .with_device_options(
                DirectFsDeviceOptions::new(cache_dir)
                    .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something)
                    .with_file_size(2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records
            )
            .build()
            .await?;

        Ok(Self {
            handle_resolver: Arc::new(handle_resolver),
            did_resolver: Arc::new(did_resolver),
            cache,
            refresh_queue: Default::default(),
            refresher: Default::default(),
        })
    }

    /// Resolve (and verify!) an atproto handle to a DID
    ///
    /// The result can be stale
    ///
    /// `None` if the handle can't be found or verification fails
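    ///
    /// verification is bidirectional: the handle resolves to a DID, and the DID
    /// doc's first valid `at://` alias must name the same handle, or we return `None`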
    pub async fn handle_to_did(&self, handle: Handle) -> Result<Option<Did>, IdentityError> {
        let Some(did) = self.handle_to_unverified_did(&handle).await? else {
            return Ok(None);
        };
        let Some(doc) = self.did_to_partial_mini_doc(&did).await? else {
            return Ok(None);
        };
        if doc.unverified_handle != handle {
            return Ok(None);
        }
        Ok(Some(did))
    }

    /// Resolve a DID to a pds url
    ///
    /// This *also* incidentally resolves and verifies the handle, which might
    /// make it slower than expected
    pub async fn did_to_pds(&self, did: Did) -> Result<Option<String>, IdentityError> {
        let Some(mini_doc) = self.did_to_partial_mini_doc(&did).await? else {
            return Ok(None);
        };
        Ok(Some(mini_doc.pds))
    }

    /// Resolve (and cache but **not verify**) a handle to a DID
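    ///
    /// serves whatever is cached (possibly stale); when the entry is older than
    /// its ttl, a background refresh is queued instead of blocking the caller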
    async fn handle_to_unverified_did(
        &self,
        handle: &Handle,
    ) -> Result<Option<Did>, IdentityError> {
        let key = IdentityKey::Handle(handle.clone());
        let entry = self
            .cache
            .fetch(key.clone(), {
                let handle = handle.clone();
                let resolver = self.handle_resolver.clone();
                || async move {
                    match resolver.resolve(&handle).await {
                        Ok(did) => Ok(IdentityVal(UtcDateTime::now(), IdentityData::Did(did))),
                        Err(atrium_identity::Error::NotFound) => {
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
                        }
                        Err(other) => Err(foyer::Error::Other(Box::new({
                            log::debug!("other error resolving handle: {other:?}");
                            IdentityError::ResolutionFailed(other)
                        }))),
                    }
                }
            })
            .await?;

        let now = UtcDateTime::now();
        let IdentityVal(last_fetch, data) = entry.value();
        match data {
            IdentityData::Doc(_) => {
                log::error!("identity value mixup: got a doc from a handle key (should be a did)");
                Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
            }
            IdentityData::NotFound => {
                if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(None)
            }
            IdentityData::Did(did) => {
                if (now - *last_fetch) >= MIN_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(Some(did.clone()))
            }
        }
    }

    /// Fetch (and cache) a partial mini doc from a did
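    ///
    /// same caching behaviour as handle resolution: stale entries are served
    /// as-is and a background refresh is queued once the ttl has passed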
    pub async fn did_to_partial_mini_doc(
        &self,
        did: &Did,
    ) -> Result<Option<PartialMiniDoc>, IdentityError> {
        let key = IdentityKey::Did(did.clone());
        let entry = self
            .cache
            .fetch(key.clone(), {
                let did = did.clone();
                let resolver = self.did_resolver.clone();
                || async move {
                    match resolver.resolve(&did).await {
                        Ok(did_doc) => {
                            // TODO: fix in atrium: should verify id is did
                            if did_doc.id != did.to_string() {
                                return Err(foyer::Error::other(Box::new(
                                    IdentityError::BadDidDoc(
                                        "did doc's id did not match did".to_string(),
                                    ),
                                )));
                            }
                            let mini_doc = did_doc.try_into().map_err(|e| {
                                foyer::Error::Other(Box::new(IdentityError::BadDidDoc(e)))
                            })?;
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)))
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
                        }
                        Err(other) => Err(foyer::Error::Other(Box::new(
                            IdentityError::ResolutionFailed(other),
                        ))),
                    }
                }
            })
            .await?;

        let now = UtcDateTime::now();
        let IdentityVal(last_fetch, data) = entry.value();
        match data {
            IdentityData::Did(_) => {
                log::error!("identity value mixup: got a did from a did key (should be a doc)");
                Err(IdentityError::IdentityValTypeMixup(did.to_string()))
            }
            IdentityData::NotFound => {
                if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(None)
            }
            IdentityData::Doc(mini_did) => {
                if (now - *last_fetch) >= MIN_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(Some(mini_did.clone()))
            }
        }
    }

    /// put a refresh task on the queue
    ///
    /// this can be safely called from multiple concurrent tasks
    async fn queue_refresh(&self, key: IdentityKey) {
        // todo: max queue size
        let mut q = self.refresh_queue.lock().await;
        if !q.items.contains(&key) {
            q.items.insert(key.clone());
            q.queue.push_back(key);
        }
    }

    /// find out what's next in the queue. concurrent consumers are not allowed.
    ///
    /// intent is to leave the item in the queue while refreshing, so that a
    /// producer will not re-add it if it's in progress. there's definitely
    /// better ways to do this, but this is ~simple for as far as a single
    /// consumer can take us.
    ///
    /// we could take it from the queue but leave it in the set and remove from
    /// set later, but splitting them apart feels more bug-prone.
    async fn peek_refresh(&self) -> Option<IdentityKey> {
        let q = self.refresh_queue.lock().await;
        q.queue.front().cloned()
    }

    /// call to clear the latest key from the refresh queue. concurrent consumers not allowed.
    ///
    /// must provide the last peeked refresh queue item as a small safety check
    async fn complete_refresh(&self, key: &IdentityKey) -> Result<(), IdentityError> {
        let mut q = self.refresh_queue.lock().await;

        let Some(queue_key) = q.queue.pop_front() else {
            // gone from queue + since we're in an error condition, make sure it's not stuck in items
            // (not toctou because we have the lock)
            // bolder here than below and removing from items because if the queue is *empty*, then we
            // know it hasn't been re-added since losing sync.
            if q.items.remove(key) {
                log::error!("identity refresh: queue de-sync: key was in items but not in the queue");
            } else {
                log::warn!(
                    "identity refresh: tried to complete with wrong key. are multiple queue consumers running?"
                );
            }
            return Err(IdentityError::RefreshQueueKeyError("no key in queue"));
        };

        if queue_key != *key {
            // extra weird case here, what's the most defensive behaviour?
            // we have two keys: ours should have been first but isn't. this shouldn't happen, so let's
            // just leave items alone for it. risks unbounded growth but we're in a bad place already.
            // the other key is the one we just popped. we didn't want it, so maybe we should put it
            // back, BUT if we somehow ended up with concurrent consumers, we have bigger problems. take
            // responsibility for taking it instead: remove it from items as well, and just drop it.
            //
            // hope that whoever calls us takes this error seriously.
            if q.items.remove(&queue_key) {
                log::warn!(
                    "identity refresh: queue de-sync + dropping a bystander key without refreshing it!"
                );
            } else {
                // you thought things couldn't get weirder? (i mean hopefully they can't)
                log::error!("identity refresh: queue de-sync + bystander key also de-sync!?");
            }
            return Err(IdentityError::RefreshQueueKeyError(
                "wrong key at front of queue",
            ));
        }

        if q.items.remove(key) {
            Ok(())
        } else {
            log::error!("identity refresh: queue de-sync: key not in items");
            Err(IdentityError::RefreshQueueKeyError("key not in items"))
        }
    }

    /// run the refresh queue consumer
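    ///
    /// intended to be spawned as a single long-lived task: starting a second
    /// concurrent refresher panics on the internal lock. when `shutdown` is
    /// cancelled it also closes the cache before returning.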
    pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> {
        let _guard = self
            .refresher
            .try_lock()
            .expect("there to only be one refresher running");
        loop {
            if shutdown.is_cancelled() {
                log::info!("identity refresher: exiting for shutdown: closing cache...");
                if let Err(e) = self.cache.close().await {
                    log::error!("cache close errored: {e}");
                } else {
                    log::info!("identity cache closed.")
                }
                return Ok(());
            }
            let Some(task_key) = self.peek_refresh().await else {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                continue;
            };
            match task_key {
                IdentityKey::Handle(ref handle) => {
                    log::trace!("refreshing handle {handle:?}");
                    match self.handle_resolver.resolve(handle).await {
                        Ok(did) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::Did(did)),
                            );
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
                            );
                        }
                        Err(err) => {
                            log::warn!(
                                "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)"
                            );
                        }
                    }
                    self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
                }
                IdentityKey::Did(ref did) => {
                    log::trace!("refreshing did doc: {did:?}");

                    match self.did_resolver.resolve(did).await {
                        Ok(did_doc) => {
                            // TODO: fix in atrium: should verify id is did
                            if did_doc.id != did.to_string() {
                                log::warn!(
                                    "refreshed did doc failed: wrong did doc id. dropping refresh."
                                );
                                continue;
                            }
                            let mini_doc = match did_doc.try_into() {
                                Ok(md) => md,
                                Err(e) => {
                                    log::warn!(
                                        "converting mini doc failed: {e:?}. dropping refresh."
                                    );
                                    continue;
                                }
                            };
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)),
                            );
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
                            );
                        }
                        Err(err) => {
                            log::warn!(
                                "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
                            );
                        }
                    }

                    self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
                }
            }
        }
    }
}

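/// thin adapter so atrium's handle resolver can use hickory for its DNS TXT lookups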
pub struct HickoryDnsTxtResolver(TokioResolver);

impl HickoryDnsTxtResolver {
    fn new() -> Result<Self, ResolveError> {
        Ok(Self(TokioResolver::builder_tokio()?.build()))
    }
}

impl DnsTxtResolver for HickoryDnsTxtResolver {
    async fn resolve(
        &self,
        query: &str,
    ) -> core::result::Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
        match self.0.txt_lookup(query).await {
            Ok(r) => {
                metrics::counter!("whoami_resolve_dns_txt", "success" => "true").increment(1);
                Ok(r.iter().map(|r| r.to_string()).collect())
            }
            Err(e) => {
                metrics::counter!("whoami_resolve_dns_txt", "success" => "false").increment(1);
                Err(e.into())
            }
        }
    }
}