// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
//! for now we're gonna just keep doing more cache
//!
//! plc.directory x foyer, ttl kept with the data, refresh deferred to the background on fetch
//!
//! things we need:
//!
//! 1. handle -> DID resolution: getRecord must accept a handle for the `repo` param
//! 2. DID -> PDS resolution: so we know where to getRecord
//! 3. DID -> handle resolution: for bidirectional handle validation, and in case we want to offer this

use hickory_resolver::{ResolveError, TokioResolver};
use std::collections::{HashSet, VecDeque};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

use crate::error::IdentityError;
use atrium_api::{
    did_doc::DidDocument,
    types::string::{Did, Handle},
};
use atrium_common::resolver::Resolver;
use atrium_identity::{
    did::{CommonDidResolver, CommonDidResolverConfig, DEFAULT_PLC_DIRECTORY_URL},
    handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig, DnsTxtResolver},
};
use atrium_oauth::DefaultHttpClient; // probably not worth bringing in all of atrium_oauth just for this, but here we are
use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder};
use serde::{Deserialize, Serialize};
use time::UtcDateTime;

/// once we have something resolved, don't re-resolve it until after this period
const MIN_TTL: Duration = Duration::from_secs(4 * 3600); // probably should have a max ttl
const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60);

#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
enum IdentityKey {
    Handle(Handle),
    Did(Did),
}

/// a cached resolution result, tagged with the time it was fetched
#[derive(Debug, Serialize, Deserialize)]
struct IdentityVal(UtcDateTime, IdentityData);

#[derive(Debug, Serialize, Deserialize)]
enum IdentityData {
    NotFound,
    Did(Did),
    Doc(PartialMiniDoc),
}
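// A minimal sketch (hypothetical helper, not called by the code below): the staleness rule the
// read paths apply before queueing a background refresh. Subtracting two `UtcDateTime`s yields a
// `time::Duration`, which the `time` crate lets us compare directly against the
// `std::time::Duration` TTL constants above.
#[allow(dead_code)]
fn is_stale(last_fetch: UtcDateTime, data: &IdentityData) -> bool {
    let ttl = match data {
        // not-found results are retried much sooner than successful resolutions
        IdentityData::NotFound => MIN_NOT_FOUND_TTL,
        IdentityData::Did(_) | IdentityData::Doc(_) => MIN_TTL,
    };
    (UtcDateTime::now() - last_fetch) >= ttl
}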
/// partial representation of a com.bad-example.identity mini atproto doc
///
/// partial because the handle is not verified
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PartialMiniDoc {
    /// an atproto handle (**unverified**)
    ///
    /// the first valid atproto handle from the did doc's `also_known_as`
    unverified_handle: Handle,
    /// the did's atproto pds url (TODO: type this?)
    ///
    /// note: atrium *does* actually parse it into a URI, it just doesn't return
    /// that for some reason
    pds: String,
    /// for now we're just pulling this straight from the did doc
    ///
    /// would be nice to type and validate it
    ///
    /// this is the publicKeyMultibase from the did doc.
    /// legacy key encoding is not supported.
    /// `id`, `type`, and `controller` must be checked, but aren't stored.
    signing_key: String,
}

impl TryFrom<DidDocument> for PartialMiniDoc {
    type Error = String;
    fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> {
        // must use the first valid handle
        let mut unverified_handle = None;
        let Some(ref doc_akas) = did_doc.also_known_as else {
            return Err("did doc missing `also_known_as`".to_string());
        };
        for aka in doc_akas {
            let Some(maybe_handle) = aka.strip_prefix("at://") else {
                continue;
            };
            let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else {
                continue;
            };
            unverified_handle = Some(valid_handle);
            break;
        }
        let Some(unverified_handle) = unverified_handle else {
            return Err("no valid atproto handles in `also_known_as`".to_string());
        };

        // atrium conveniently has getters for service endpoints
        let Some(pds) = did_doc.get_pds_endpoint() else {
            return Err("no valid pds service found".to_string());
        };

        // TODO: can't use atrium's get_signing_key() because it fails to check `type` and
        // `controller`, so if we checked those ourselves and rejected the key, we might miss a
        // later valid key in the array (the todo is to fix atrium).
        // actually: atrium might be flexible for legacy representations. for now we're rejecting
        // the legacy representation.

        // must use the first valid signing key
        let mut signing_key = None;
        let Some(verification_methods) = did_doc.verification_method else {
            return Err("no verification methods found".to_string());
        };
        for method in verification_methods {
            if method.id != format!("{}#atproto", did_doc.id) {
                continue;
            }
            if method.r#type != "Multikey" {
                continue;
            }
            if method.controller != did_doc.id {
                continue;
            }
            let Some(key) = method.public_key_multibase else {
                continue;
            };
            signing_key = Some(key);
            break;
        }
        let Some(signing_key) = signing_key else {
            return Err("no valid atproto signing key found in verification methods".to_string());
        };

        Ok(PartialMiniDoc {
            unverified_handle,
            pds,
            signing_key,
        })
    }
}
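// A minimal sketch test of the conversion above (assumes `DidDocument`'s public fields as
// exposed by atrium_api's `did_doc` module; the multibase key below is an arbitrary placeholder,
// not a real key).
#[cfg(test)]
mod partial_mini_doc_tests {
    use super::*;
    use atrium_api::did_doc::{Service, VerificationMethod};

    #[test]
    fn first_valid_handle_and_first_valid_key_win() {
        let did = "did:plc:ewvi7nxzyoun6zhxrhs64oiz".to_string();
        let doc = DidDocument {
            context: None,
            id: did.clone(),
            also_known_as: Some(vec![
                "https://example.com/not-a-handle".to_string(), // skipped: no at:// prefix
                "at://alice.example.com".to_string(),           // first valid handle wins
                "at://bob.example.com".to_string(),             // never reached
            ]),
            verification_method: Some(vec![VerificationMethod {
                id: format!("{did}#atproto"),
                r#type: "Multikey".to_string(),
                controller: did.clone(),
                public_key_multibase: Some("zExamplePlaceholderKey".to_string()),
            }]),
            service: Some(vec![Service {
                id: "#atproto_pds".to_string(),
                r#type: "AtprotoPersonalDataServer".to_string(),
                service_endpoint: "https://pds.example.com".to_string(),
            }]),
        };
        let mini_doc = PartialMiniDoc::try_from(doc).expect("doc should convert");
        assert_eq!(
            mini_doc.unverified_handle,
            Handle::new("alice.example.com".to_string()).unwrap()
        );
        assert_eq!(mini_doc.pds, "https://pds.example.com");
        assert_eq!(mini_doc.signing_key, "zExamplePlaceholderKey");
    }
}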
/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
///
/// the hashset allows testing for presence of items in the queue.
/// this has absolutely no support for multiple queue consumers.
#[derive(Debug, Default)]
struct RefreshQueue {
    queue: VecDeque<IdentityKey>,
    items: HashSet<IdentityKey>,
}

#[derive(Clone)]
pub struct Identity {
    handle_resolver: Arc<AtprotoHandleResolver<HickoryDnsTxtResolver, DefaultHttpClient>>,
    did_resolver: Arc<CommonDidResolver<DefaultHttpClient>>,
    cache: HybridCache<IdentityKey, IdentityVal>,
    /// multi-producer *single-consumer* queue
    refresh_queue: Arc<Mutex<RefreshQueue>>,
    /// just a lock to ensure only one refresher (queue consumer) is running
    /// (to be improved with a better refresher)
    refresher: Arc<Mutex<()>>,
}
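// A minimal usage sketch (hypothetical: the cache path and handle are placeholders, and error
// handling is elided): build an `Identity`, spawn the background refresher, and resolve a handle
// to its DID and PDS. `run_refresher` loops forever, so it gets its own task.
#[allow(dead_code)]
async fn example_usage() -> Result<(), IdentityError> {
    let identity = Identity::new("/tmp/identity-cache").await?;

    // exactly one refresher (queue consumer) may run at a time
    let refresher = identity.clone();
    let _refresher_task = tokio::spawn(async move { refresher.run_refresher().await });

    let handle = Handle::new("alice.example.com".to_string()).expect("a valid handle");
    if let Some(did) = identity.handle_to_did(handle).await? {
        if let Some(pds) = identity.did_to_pds(did).await? {
            println!("pds: {pds}");
        }
    }
    Ok(())
}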
impl Identity {
    pub async fn new(cache_dir: impl AsRef<Path>) -> Result<Self, IdentityError> {
        let http_client = Arc::new(DefaultHttpClient::default());
        let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig {
            dns_txt_resolver: HickoryDnsTxtResolver::new().expect("failed to build DNS resolver"),
            http_client: http_client.clone(),
        });
        let did_resolver = CommonDidResolver::new(CommonDidResolverConfig {
            plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(),
            http_client: http_client.clone(),
        });

        let cache = HybridCacheBuilder::new()
            .with_name("identity")
            .memory(16 * 2_usize.pow(20)) // 16 MiB in-memory capacity
            // approximate weight: shallow sizes only, string heap contents aren't counted
            .with_weighter(|k, v| std::mem::size_of_val(k) + std::mem::size_of_val(v))
            .storage(Engine::large())
            .with_device_options(DirectFsDeviceOptions::new(cache_dir))
            .build()
            .await?;

        Ok(Self {
            handle_resolver: Arc::new(handle_resolver),
            did_resolver: Arc::new(did_resolver),
            cache,
            refresh_queue: Default::default(),
            refresher: Default::default(),
        })
    }

    /// Resolve (and verify!) an atproto handle to a DID
    ///
    /// The result can be stale.
    ///
    /// `None` if the handle can't be found or verification fails
    pub async fn handle_to_did(&self, handle: Handle) -> Result<Option<Did>, IdentityError> {
        let Some(did) = self.handle_to_unverified_did(&handle).await? else {
            return Ok(None);
        };
        let Some(doc) = self.did_to_partial_mini_doc(&did).await? else {
            return Ok(None);
        };
        if doc.unverified_handle != handle {
            return Ok(None);
        }
        Ok(Some(did))
    }

    /// Resolve (and verify!) a DID to a PDS url
    ///
    /// This *also* incidentally resolves and verifies the handle, which might
    /// make it slower than expected
    pub async fn did_to_pds(&self, did: Did) -> Result<Option<String>, IdentityError> {
        let Some(mini_doc) = self.did_to_partial_mini_doc(&did).await? else {
            return Ok(None);
        };
        Ok(Some(mini_doc.pds))
    }

    /// Resolve (and cache, but **not verify**) a handle to a DID
    async fn handle_to_unverified_did(
        &self,
        handle: &Handle,
    ) -> Result<Option<Did>, IdentityError> {
        let key = IdentityKey::Handle(handle.clone());
        let entry = self
            .cache
            .fetch(key.clone(), {
                let handle = handle.clone();
                let resolver = self.handle_resolver.clone();
                || async move {
                    match resolver.resolve(&handle).await {
                        Ok(did) => Ok(IdentityVal(UtcDateTime::now(), IdentityData::Did(did))),
                        Err(atrium_identity::Error::NotFound) => {
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
                        }
                        Err(other) => Err(foyer::Error::Other(Box::new(
                            IdentityError::ResolutionFailed(other),
                        ))),
                    }
                }
            })
            .await?;

        let now = UtcDateTime::now();
        let IdentityVal(last_fetch, data) = entry.value();
        match data {
            IdentityData::Doc(_) => {
                log::error!("identity value mixup: got a doc from a handle key (should be a did)");
                Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
            }
            IdentityData::NotFound => {
                // serve the cached not-found, but queue a background refresh once it's stale
                if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(None)
            }
            IdentityData::Did(did) => {
                if (now - *last_fetch) >= MIN_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(Some(did.clone()))
            }
        }
    }

    /// Fetch (and cache) a partial mini doc from a DID
    async fn did_to_partial_mini_doc(
        &self,
        did: &Did,
    ) -> Result<Option<PartialMiniDoc>, IdentityError> {
        let key = IdentityKey::Did(did.clone());
        let entry = self
            .cache
            .fetch(key.clone(), {
                let did = did.clone();
                let resolver = self.did_resolver.clone();
                || async move {
                    match resolver.resolve(&did).await {
                        Ok(did_doc) => {
                            // TODO: fix in atrium: it should verify that the doc's id is the did
                            if did_doc.id != did.to_string() {
                                return Err(foyer::Error::other(Box::new(
                                    IdentityError::BadDidDoc(
                                        "did doc's id did not match did".to_string(),
                                    ),
                                )));
                            }
                            let mini_doc = did_doc.try_into().map_err(|e| {
                                foyer::Error::Other(Box::new(IdentityError::BadDidDoc(e)))
                            })?;
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)))
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
                        }
                        Err(other) => Err(foyer::Error::Other(Box::new(
                            IdentityError::ResolutionFailed(other),
                        ))),
                    }
                }
            })
            .await?;

        let now = UtcDateTime::now();
        let IdentityVal(last_fetch, data) = entry.value();
        match data {
            IdentityData::Did(_) => {
                log::error!("identity value mixup: got a did from a did key (should be a doc)");
                Err(IdentityError::IdentityValTypeMixup(did.to_string()))
            }
            IdentityData::NotFound => {
                if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(None)
            }
            IdentityData::Doc(mini_doc) => {
                if (now - *last_fetch) >= MIN_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(Some(mini_doc.clone()))
            }
        }
    }

    /// put a refresh task on the queue
    ///
    /// this can be safely called from multiple concurrent tasks
    async fn queue_refresh(&self, key: IdentityKey) {
        // todo: max queue size
        let mut q = self.refresh_queue.lock().await;
        if q.items.insert(key.clone()) {
            q.queue.push_back(key);
        }
    }
    /// find out what's next in the queue. concurrent consumers are not allowed.
    ///
    /// the intent is to leave the item in the queue while refreshing, so that a
    /// producer will not re-add it while it's in progress. there are definitely
    /// better ways to do this, but this is ~simple for as far as a single
    /// consumer can take us.
    ///
    /// we could take it from the queue but leave it in the set and remove it from
    /// the set later, but splitting them apart feels more bug-prone.
    async fn peek_refresh(&self) -> Option<IdentityKey> {
        let q = self.refresh_queue.lock().await;
        q.queue.front().cloned()
    }

    /// call to clear the latest key from the refresh queue. concurrent consumers are not allowed.
    ///
    /// the last peeked refresh-queue item must be provided, as a small safety check
    async fn complete_refresh(&self, key: &IdentityKey) -> Result<(), IdentityError> {
        let mut q = self.refresh_queue.lock().await;

        let Some(queue_key) = q.queue.pop_front() else {
            // gone from the queue + since we're in an error condition, make sure it's not stuck
            // in items (not a toctou risk because we hold the lock).
            // we're bolder here than below about removing from items: if the queue is *empty*,
            // we know the key hasn't been re-added since losing sync.
            if q.items.remove(key) {
                log::error!("identity refresh: queue de-sync: key not in queue but still in items");
            } else {
                log::warn!(
                    "identity refresh: tried to complete with wrong key. are multiple queue consumers running?"
                );
            }
            return Err(IdentityError::RefreshQueueKeyError("no key in queue"));
        };

        if queue_key != *key {
            // extra weird case here, what's the most defensive behaviour?
            // we have two keys: ours should have been first but isn't. this shouldn't happen, so
            // let's just leave items alone for it. risks unbounded growth, but we're in a bad
            // place already.
            // the other key is the one we just popped. we didn't want it, so maybe we should put
            // it back, BUT if we somehow ended up with concurrent consumers, we have bigger
            // problems. take responsibility for taking it instead: remove it from items as well,
            // and just drop it.
            //
            // hope that whoever calls us takes this error seriously.
            if q.items.remove(&queue_key) {
                log::warn!(
                    "identity refresh: queue de-sync + dropping a bystander key without refreshing it!"
                );
            } else {
                // you thought things couldn't get weirder? (i mean, hopefully they can't)
                log::error!("identity refresh: queue de-sync + bystander key also de-sync!?");
            }
            return Err(IdentityError::RefreshQueueKeyError(
                "wrong key at front of queue",
            ));
        }

        if q.items.remove(key) {
            Ok(())
        } else {
            log::error!("identity refresh: queue de-sync: key not in items");
            Err(IdentityError::RefreshQueueKeyError("key not in items"))
        }
    }
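    // A minimal sketch (hypothetical helper, not called anywhere): the consumer protocol that
    // `peek_refresh` / `complete_refresh` assume. peek while leaving the key in the queue (so
    // producers won't re-enqueue it while the refresh is in flight), do the refresh work, then
    // complete with the same key as a sanity check. `run_refresher` below is the real consumer.
    #[allow(dead_code)]
    async fn consume_one_sketch(&self) -> Result<bool, IdentityError> {
        let Some(key) = self.peek_refresh().await else {
            return Ok(false); // queue is empty
        };
        // ... re-resolve `key` and insert the fresh value into the cache here ...
        self.complete_refresh(&key).await?;
        Ok(true)
    }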
    /// run the refresh queue consumer
    pub async fn run_refresher(&self) -> Result<(), IdentityError> {
        let _guard = self
            .refresher
            .try_lock()
            .expect("there to only be one refresher running");
        loop {
            let Some(task_key) = self.peek_refresh().await else {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                continue;
            };
            match task_key {
                IdentityKey::Handle(ref handle) => {
                    log::trace!("refreshing handle {handle:?}");
                    match self.handle_resolver.resolve(handle).await {
                        Ok(did) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::Did(did)),
                            );
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
                            );
                        }
                        Err(err) => {
                            log::warn!(
                                "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)"
                            );
                        }
                    }
                    self.complete_refresh(&task_key).await?; // failures are bugs, so bail out of the loop
                }
                IdentityKey::Did(ref did) => {
                    log::trace!("refreshing did doc: {did:?}");

                    match self.did_resolver.resolve(did).await {
                        Ok(did_doc) => {
                            // TODO: fix in atrium: it should verify that the doc's id is the did
                            if did_doc.id != did.to_string() {
                                log::warn!(
                                    "refreshed did doc failed: wrong did doc id. dropping refresh."
                                );
                                // complete before continuing, or this key would stay at the front
                                // of the queue and spin forever
                                self.complete_refresh(&task_key).await?;
                                continue;
                            }
                            let mini_doc = match did_doc.try_into() {
                                Ok(md) => md,
                                Err(e) => {
                                    log::warn!(
                                        "converting mini doc failed: {e:?}. dropping refresh."
                                    );
                                    self.complete_refresh(&task_key).await?;
                                    continue;
                                }
                            };
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)),
                            );
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
                            );
                        }
                        Err(err) => {
                            log::warn!(
                                "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
                            );
                        }
                    }

                    self.complete_refresh(&task_key).await?; // failures are bugs, so bail out of the loop
                }
            }
        }
    }
}

pub struct HickoryDnsTxtResolver(TokioResolver);

impl HickoryDnsTxtResolver {
    fn new() -> Result<Self, ResolveError> {
        Ok(Self(TokioResolver::builder_tokio()?.build()))
    }
}

impl DnsTxtResolver for HickoryDnsTxtResolver {
    async fn resolve(
        &self,
        query: &str,
    ) -> core::result::Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
        match self.0.txt_lookup(query).await {
            Ok(records) => {
                metrics::counter!("whoami_resolve_dns_txt", "success" => "true").increment(1);
                Ok(records.iter().map(|record| record.to_string()).collect())
            }
            Err(e) => {
                metrics::counter!("whoami_resolve_dns_txt", "success" => "false").increment(1);
                Err(e.into())
            }
        }
    }
}
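// A minimal sketch (hypothetical helper, not wired into anything above): what the DNS leg of
// handle resolution looks like with this resolver. atproto publishes a TXT record at
// `_atproto.<handle>` whose value is `did=<did>`; `AtprotoHandleResolver` already performs this
// lookup (plus the https well-known fallback) internally, so this is only for illustration.
#[allow(dead_code)]
async fn lookup_handle_txt(handle: &str) -> Option<String> {
    let resolver = HickoryDnsTxtResolver::new().ok()?;
    let records = resolver.resolve(&format!("_atproto.{handle}")).await.ok()?;
    records
        .into_iter()
        .find_map(|record| record.strip_prefix("did=").map(str::to_string))
}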