Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

use hickory_resolver::{ResolveError, TokioResolver};
use std::collections::{HashSet, VecDeque};
use std::path::Path;
use std::sync::Arc;
/// for now we're gonna just keep doing more cache
///
/// plc.directory x foyer, ttl kept with data, refresh deferred to background on fetch
///
/// things we need:
///
/// 1. handle -> DID resolution: getRecord must accept a handle for `repo` param
/// 2. DID -> PDS resolution: so we know where to getRecord
/// 3. DID -> handle resolution: for bidirectional handle validation and in case we want to offer this
use std::time::Duration;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

use crate::error::IdentityError;
use atrium_api::{
    did_doc::DidDocument,
    types::string::{Did, Handle},
};
use atrium_common::resolver::Resolver;
use atrium_identity::{
    did::{CommonDidResolver, CommonDidResolverConfig, DEFAULT_PLC_DIRECTORY_URL},
    handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig, DnsTxtResolver},
};
use atrium_oauth::DefaultHttpClient; // it's probably not worth bringing all of atrium_oauth for this but
use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder};
use serde::{Deserialize, Serialize};
use time::UtcDateTime;

/// once we have something resolved, don't re-resolve until after this period
const MIN_TTL: Duration = Duration::from_secs(4 * 3600); // probably should have a max ttl
const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60);

#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
enum IdentityKey {
    Handle(Handle),
    Did(Did),
}

#[derive(Debug, Serialize, Deserialize)]
struct IdentityVal(UtcDateTime, IdentityData);

#[derive(Debug, Serialize, Deserialize)]
enum IdentityData {
    NotFound,
    Did(Did),
    Doc(PartialMiniDoc),
}

/// partial representation of a com.bad-example.identity mini atproto doc
///
/// partial because the handle is not verified
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartialMiniDoc {
    /// an atproto handle (**unverified**)
    ///
    /// the first valid atproto handle from the did doc's aka
    pub unverified_handle: Handle,
    /// the did's atproto pds url (TODO: type this?)
    ///
    /// note: atrium *does* actually parse it into a URI, it just doesn't return
    /// that for some reason
    pub pds: String,
    /// for now we're just pulling this straight from the did doc
    ///
    /// would be nice to type and validate it
    ///
    /// this is the publicKeyMultibase from the did doc.
    /// legacy key encoding not supported.
    /// `id`, `type`, and `controller` must be checked, but aren't stored.
    pub signing_key: String,
}
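
// For orientation, a trimmed-down DID document of the shape this conversion expects.
// Illustrative sketch only: the handle, DID, key, and endpoint values are made up and
// not from the original; only the fields read below matter.
//
//   {
//     "id": "did:plc:abc123",                        // must equal the did we resolved
//     "alsoKnownAs": ["at://alice.example.com"],     // -> unverified_handle (first valid at:// entry)
//     "verificationMethod": [{
//       "id": "did:plc:abc123#atproto",              // must be `<did>#atproto`
//       "type": "Multikey",
//       "controller": "did:plc:abc123",              // must equal the did
//       "publicKeyMultibase": "zQ3sh..."             // -> signing_key (placeholder value)
//     }],
//     "service": [{
//       "id": "#atproto_pds",
//       "type": "AtprotoPersonalDataServer",
//       "serviceEndpoint": "https://pds.example.com" // -> pds (via get_pds_endpoint)
//     }]
//   }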
impl TryFrom<DidDocument> for PartialMiniDoc {
    type Error = String;
    fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> {
        // must use the first valid handle
        let mut unverified_handle = None;
        let Some(ref doc_akas) = did_doc.also_known_as else {
            return Err("did doc missing `also_known_as`".to_string());
        };
        for aka in doc_akas {
            let Some(maybe_handle) = aka.strip_prefix("at://") else {
                continue;
            };
            let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else {
                continue;
            };
            unverified_handle = Some(valid_handle);
            break;
        }
        let Some(unverified_handle) = unverified_handle else {
            return Err("no valid atproto handles in `also_known_as`".to_string());
        };

        // atrium does have service endpoint getters
        let Some(pds) = did_doc.get_pds_endpoint() else {
            return Err("no valid pds service found".to_string());
        };

        // TODO can't use atrium's get_signing_key() because it fails to check type and controller
        // so if we check those and reject it, we might miss a later valid key in the array
        // (todo is to fix atrium)
        // actually: atrium might be flexible for legacy reps. for now we're rejecting the legacy rep.

        // must use the first valid signing key
        let mut signing_key = None;
        let Some(verification_methods) = did_doc.verification_method else {
            return Err("no verification methods found".to_string());
        };
        for method in verification_methods {
            if method.id != format!("{}#atproto", did_doc.id) {
                continue;
            }
            if method.r#type != "Multikey" {
                continue;
            }
            if method.controller != did_doc.id {
                continue;
            }
            let Some(key) = method.public_key_multibase else {
                continue;
            };
            signing_key = Some(key);
            break;
        }
        let Some(signing_key) = signing_key else {
            return Err("no valid atproto signing key found in verification methods".to_string());
        };

        Ok(PartialMiniDoc {
            unverified_handle,
            pds,
            signing_key,
        })
    }
}

/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
///
/// the hashset allows testing for presence of items in the queue.
/// this has absolutely no support for multiple queue consumers.
#[derive(Debug, Default)]
struct RefreshQueue {
    queue: VecDeque<IdentityKey>,
    items: HashSet<IdentityKey>,
}
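
// A sketch of the intended refresh lifecycle, pieced together from the methods below
// (summary added for orientation, not part of the original):
//
//   1. a read path (handle_to_unverified_did / did_to_partial_mini_doc) serves whatever
//      is cached, and calls queue_refresh(key) when the entry is older than its ttl
//   2. queue_refresh pushes the key onto `queue` only if it isn't already in `items`,
//      so a key is never queued twice
//   3. the single consumer (run_refresher) peeks the front key, re-resolves it, writes
//      the fresh value into the cache, then calls complete_refresh to pop the key and
//      drop it from `items`
//
// the key stays in the queue while its refresh is in flight, which is what keeps
// producers from re-adding it mid-refresh.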

#[derive(Clone)]
pub struct Identity {
    handle_resolver: Arc<AtprotoHandleResolver<HickoryDnsTxtResolver, DefaultHttpClient>>,
    did_resolver: Arc<CommonDidResolver<DefaultHttpClient>>,
    cache: HybridCache<IdentityKey, IdentityVal>,
    /// multi-producer *single consumer* queue
    refresh_queue: Arc<Mutex<RefreshQueue>>,
    /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher)
    refresher: Arc<Mutex<()>>,
}

impl Identity {
    pub async fn new(cache_dir: impl AsRef<Path>) -> Result<Self, IdentityError> {
        let http_client = Arc::new(DefaultHttpClient::default());
        let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig {
            dns_txt_resolver: HickoryDnsTxtResolver::new().unwrap(),
            http_client: http_client.clone(),
        });
        let did_resolver = CommonDidResolver::new(CommonDidResolverConfig {
            plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(),
            http_client: http_client.clone(),
        });

        let cache = HybridCacheBuilder::new()
            .with_name("identity")
            .memory(16 * 2_usize.pow(20))
            .with_weighter(|k, v| std::mem::size_of_val(k) + std::mem::size_of_val(v))
            .storage(Engine::small())
            .with_device_options(
                DirectFsDeviceOptions::new(cache_dir)
                    .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something)
                    .with_file_size(2_usize.pow(20)), // note: this also caps the max cached item size; beware jumbo records
            )
            .build()
            .await?;

        Ok(Self {
            handle_resolver: Arc::new(handle_resolver),
            did_resolver: Arc::new(did_resolver),
            cache,
            refresh_queue: Default::default(),
            refresher: Default::default(),
        })
    }
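
    // Illustrative usage sketch (not from the original; the cache dir path and the
    // handle are made up). The refresher loop is expected to run alongside these
    // resolution calls, see the sketch above run_refresher below.
    //
    //     let identity = Identity::new("/tmp/identity-cache").await?;
    //     let handle = Handle::new("alice.example.com".to_string()).expect("valid handle");
    //     if let Some(did) = identity.handle_to_did(handle).await? {
    //         let pds = identity.did_to_pds(did).await?; // Option<String>, e.g. an https url
    //     }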

    /// Resolve (and verify!) an atproto handle to a DID
    ///
    /// The result can be stale
    ///
    /// `None` if the handle can't be found or verification fails
    pub async fn handle_to_did(&self, handle: Handle) -> Result<Option<Did>, IdentityError> {
        let Some(did) = self.handle_to_unverified_did(&handle).await? else {
            return Ok(None);
        };
        let Some(doc) = self.did_to_partial_mini_doc(&did).await? else {
            return Ok(None);
        };
        if doc.unverified_handle != handle {
            return Ok(None);
        }
        Ok(Some(did))
    }

    /// Resolve a DID to a pds url
    ///
    /// This *also* incidentally resolves and verifies the handle, which might
    /// make it slower than expected
    pub async fn did_to_pds(&self, did: Did) -> Result<Option<String>, IdentityError> {
        let Some(mini_doc) = self.did_to_partial_mini_doc(&did).await? else {
            return Ok(None);
        };
        Ok(Some(mini_doc.pds))
    }

    /// Resolve (and cache but **not verify**) a handle to a DID
    async fn handle_to_unverified_did(
        &self,
        handle: &Handle,
    ) -> Result<Option<Did>, IdentityError> {
        let key = IdentityKey::Handle(handle.clone());
        let entry = self
            .cache
            .fetch(key.clone(), {
                let handle = handle.clone();
                let resolver = self.handle_resolver.clone();
                || async move {
                    match resolver.resolve(&handle).await {
                        Ok(did) => Ok(IdentityVal(UtcDateTime::now(), IdentityData::Did(did))),
                        Err(atrium_identity::Error::NotFound) => {
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
                        }
                        Err(other) => Err(foyer::Error::Other(Box::new({
                            log::debug!("other error resolving handle: {other:?}");
                            IdentityError::ResolutionFailed(other)
                        }))),
                    }
                }
            })
            .await?;

        let now = UtcDateTime::now();
        let IdentityVal(last_fetch, data) = entry.value();
        match data {
            IdentityData::Doc(_) => {
                log::error!("identity value mixup: got a doc from a handle key (should be a did)");
                Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
            }
            IdentityData::NotFound => {
                if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(None)
            }
            IdentityData::Did(did) => {
                if (now - *last_fetch) >= MIN_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(Some(did.clone()))
            }
        }
    }

    /// Fetch (and cache) a partial mini doc from a did
    pub async fn did_to_partial_mini_doc(
        &self,
        did: &Did,
    ) -> Result<Option<PartialMiniDoc>, IdentityError> {
        let key = IdentityKey::Did(did.clone());
        let entry = self
            .cache
            .fetch(key.clone(), {
                let did = did.clone();
                let resolver = self.did_resolver.clone();
                || async move {
                    match resolver.resolve(&did).await {
                        Ok(did_doc) => {
                            // TODO: fix in atrium: should verify id is did
                            if did_doc.id != did.to_string() {
                                return Err(foyer::Error::other(Box::new(
                                    IdentityError::BadDidDoc(
                                        "did doc's id did not match did".to_string(),
                                    ),
                                )));
                            }
                            let mini_doc = did_doc.try_into().map_err(|e| {
                                foyer::Error::Other(Box::new(IdentityError::BadDidDoc(e)))
                            })?;
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)))
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
                        }
                        Err(other) => Err(foyer::Error::Other(Box::new(
                            IdentityError::ResolutionFailed(other),
                        ))),
                    }
                }
            })
            .await?;

        let now = UtcDateTime::now();
        let IdentityVal(last_fetch, data) = entry.value();
        match data {
            IdentityData::Did(_) => {
                log::error!("identity value mixup: got a did from a did key (should be a doc)");
                Err(IdentityError::IdentityValTypeMixup(did.to_string()))
            }
            IdentityData::NotFound => {
                if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(None)
            }
            IdentityData::Doc(mini_doc) => {
                if (now - *last_fetch) >= MIN_TTL {
                    self.queue_refresh(key).await;
                }
                Ok(Some(mini_doc.clone()))
            }
        }
    }

    /// put a refresh task on the queue
    ///
    /// this can be safely called from multiple concurrent tasks
    async fn queue_refresh(&self, key: IdentityKey) {
        // todo: max queue size
        let mut q = self.refresh_queue.lock().await;
        if !q.items.contains(&key) {
            q.items.insert(key.clone());
            q.queue.push_back(key);
        }
    }

    /// find out what's next in the queue. concurrent consumers are not allowed.
    ///
    /// intent is to leave the item in the queue while refreshing, so that a
    /// producer will not re-add it if it's in progress. there's definitely
    /// better ways to do this, but this is ~simple for as far as a single
    /// consumer can take us.
    ///
    /// we could take it from the queue but leave it in the set and remove from
    /// set later, but splitting them apart feels more bug-prone.
    async fn peek_refresh(&self) -> Option<IdentityKey> {
        let q = self.refresh_queue.lock().await;
        q.queue.front().cloned()
    }

    /// call to clear the latest key from the refresh queue. concurrent consumers not allowed.
    ///
    /// must provide the last peeked refresh queue item as a small safety check
    async fn complete_refresh(&self, key: &IdentityKey) -> Result<(), IdentityError> {
        let mut q = self.refresh_queue.lock().await;

        let Some(queue_key) = q.queue.pop_front() else {
            // gone from queue + since we're in an error condition, make sure it's not stuck in items
            // (not toctou because we have the lock)
            // bolder here than below and removing from items because if the queue is *empty*, then we
            // know it hasn't been re-added since losing sync.
            if q.items.remove(key) {
                log::error!("identity refresh: queue de-sync: key was in items but not in the queue");
            } else {
                log::warn!(
                    "identity refresh: tried to complete with wrong key. are multiple queue consumers running?"
                );
            }
            return Err(IdentityError::RefreshQueueKeyError("no key in queue"));
        };

        if queue_key != *key {
            // extra weird case here, what's the most defensive behaviour?
            // we have two keys: ours should have been first but isn't. this shouldn't happen, so let's
            // just leave items alone for it. risks unbounded growth but we're in a bad place already.
            // the other key is the one we just popped. we didn't want it, so maybe we should put it
            // back, BUT if we somehow ended up with concurrent consumers, we have bigger problems. take
            // responsibility for taking it instead: remove it from items as well, and just drop it.
            //
            // hope that whoever calls us takes this error seriously.
            if q.items.remove(&queue_key) {
                log::warn!(
                    "identity refresh: queue de-sync + dropping a bystander key without refreshing it!"
                );
            } else {
                // you thought things couldn't get weirder? (i mean hopefully they can't)
                log::error!("identity refresh: queue de-sync + bystander key also de-sync!?");
            }
            return Err(IdentityError::RefreshQueueKeyError(
                "wrong key at front of queue",
            ));
        }

        if q.items.remove(key) {
            Ok(())
        } else {
            log::error!("identity refresh: queue de-sync: key not in items");
            Err(IdentityError::RefreshQueueKeyError("key not in items"))
        }
    }
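
    // How the consumer below is expected to be driven (illustrative sketch, not from
    // the original; the spawn site and token wiring are assumptions):
    //
    //     let identity = Identity::new(cache_dir).await?;
    //     let shutdown = CancellationToken::new();
    //     let refresher = tokio::spawn({
    //         let identity = identity.clone();
    //         let shutdown = shutdown.clone();
    //         async move { identity.run_refresher(shutdown).await }
    //     });
    //     // ... serve requests using `identity` ...
    //     shutdown.cancel();
    //     refresher.await??;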

    /// run the refresh queue consumer
    pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> {
        let _guard = self
            .refresher
            .try_lock()
            .expect("there to only be one refresher running");
        loop {
            if shutdown.is_cancelled() {
                log::info!("identity refresher: exiting for shutdown: closing cache...");
                if let Err(e) = self.cache.close().await {
                    log::error!("cache close errored: {e}");
                } else {
                    log::info!("identity cache closed.")
                }
                return Ok(());
            }
            let Some(task_key) = self.peek_refresh().await else {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                continue;
            };
            match task_key {
                IdentityKey::Handle(ref handle) => {
                    log::trace!("refreshing handle {handle:?}");
                    match self.handle_resolver.resolve(handle).await {
                        Ok(did) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::Did(did)),
                            );
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
                            );
                        }
                        Err(err) => {
                            log::warn!(
                                "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)"
                            );
                        }
                    }
                    self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
                }
                IdentityKey::Did(ref did) => {
                    log::trace!("refreshing did doc: {did:?}");

                    match self.did_resolver.resolve(did).await {
                        Ok(did_doc) => {
                            // TODO: fix in atrium: should verify id is did
                            if did_doc.id != did.to_string() {
                                log::warn!(
                                    "refreshed did doc failed: wrong did doc id. dropping refresh."
                                );
                                // actually drop it: clear the key from the queue so we don't
                                // immediately peek the same key again and spin on it
                                self.complete_refresh(&task_key).await?;
                                continue;
                            }
                            let mini_doc = match did_doc.try_into() {
                                Ok(md) => md,
                                Err(e) => {
                                    log::warn!(
                                        "converting mini doc failed: {e:?}. dropping refresh."
                                    );
                                    // same as above: clear the key before moving on
                                    self.complete_refresh(&task_key).await?;
                                    continue;
                                }
                            };
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)),
                            );
                        }
                        Err(atrium_identity::Error::NotFound) => {
                            self.cache.insert(
                                task_key.clone(),
                                IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
                            );
                        }
                        Err(err) => {
                            log::warn!(
                                "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
                            );
                        }
                    }

                    self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
                }
            }
        }
    }
}

pub struct HickoryDnsTxtResolver(TokioResolver);

impl HickoryDnsTxtResolver {
    fn new() -> Result<Self, ResolveError> {
        Ok(Self(TokioResolver::builder_tokio()?.build()))
    }
}

impl DnsTxtResolver for HickoryDnsTxtResolver {
    async fn resolve(
        &self,
        query: &str,
    ) -> core::result::Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
        match self.0.txt_lookup(query).await {
            Ok(r) => {
                metrics::counter!("whoami_resolve_dns_txt", "success" => "true").increment(1);
                Ok(r.iter().map(|r| r.to_string()).collect())
            }
            Err(e) => {
                metrics::counter!("whoami_resolve_dns_txt", "success" => "false").increment(1);
                Err(e.into())
            }
        }
    }
}