"""DID and handle resolution with TTL-cached results.

Defines ``DidDocument`` (a thin wrapper around a raw DID document),
``DidResolver`` (``did:plc:`` / ``did:web:`` -> document) and
``HandleResolver`` (handle -> DID via DNS TXT, then HTTP well-known).
Module-level singletons are created at import time, their caches are loaded
from ``env.CACHE_DIR`` and a dump callback is appended to ``shutdown_hook``.
"""

from pathlib import Path
from typing import Any, override

import dns.exception
import dns.resolver
import requests

import env
from util.cache import Cacheable, TTLCache
from util.util import LOGGER, normalize_service_url, shutdown_hook


class DidDocument:
    """Wrapper around a raw DID document dictionary."""

    def __init__(self, raw_doc: dict[str, Any]) -> None:
        self.raw: dict[str, Any] = raw_doc
        # Memoized PDS endpoint: None = not looked up yet,
        # "" = looked up and nothing usable found, otherwise the URL.
        self.atproto_pds: str | None = None

    def get_atproto_pds(self) -> str | None:
        """Return the normalized atproto PDS endpoint of this document.

        Scans the ``service`` list for the entry whose ``id`` is
        ``#atproto_pds`` and whose ``type`` is ``AtprotoPersonalDataServer``
        and returns its ``serviceEndpoint`` passed through
        ``normalize_service_url``. The outcome (including "not found") is
        memoized on the instance.

        Returns:
            The endpoint URL, or ``None`` when the document declares none.
        """
        # `is not None` (rather than truthiness) so that the "" negative
        # marker is honoured and the service list is not rescanned each call.
        if self.atproto_pds is not None:
            return self.atproto_pds or None

        found = ""
        for service in self.raw.get("service") or ():
            # Skip malformed entries instead of failing on `.get`.
            if not isinstance(service, dict):
                continue
            if (
                service.get("id") == "#atproto_pds"
                and service.get("type") == "AtprotoPersonalDataServer"
            ):
                endpoint = service.get("serviceEndpoint")
                if endpoint:
                    found = normalize_service_url(endpoint)
                    break

        self.atproto_pds = found
        return found or None


def _fetch_did_document(url: str) -> DidDocument | None:
    """GET ``url`` and wrap the JSON body in a ``DidDocument``.

    Returns ``None`` for 404/410 (not registered, tombstoned or gone) and for
    any other non-200 status that ``raise_for_status`` does not treat as an
    error.

    Raises:
        requests.HTTPError: for 4xx/5xx statuses other than 404/410.
        requests.RequestException: for transport-level failures.
    """
    response = requests.get(url, timeout=10, allow_redirects=True)
    if response.status_code == 200:
        return DidDocument(response.json())
    if response.status_code in (404, 410):
        return None
    response.raise_for_status()
    return None


class DidResolver(Cacheable):
    """Resolves ``did:plc:`` and ``did:web:`` identifiers to DID documents.

    Successful resolutions are kept in a ``TTLCache`` created with a
    12 hour ``ttl_seconds``.
    """

    def __init__(self, plc_host: str) -> None:
        """Create a resolver that queries ``plc_host`` for ``did:plc:`` DIDs."""
        self.plc_host: str = plc_host
        self.__cache: TTLCache[str, DidDocument] = TTLCache(ttl_seconds=12 * 60 * 60)

    def try_resolve_plc(self, did: str) -> DidDocument | None:
        """Fetch ``did`` from the PLC host; ``None`` if tombstoned/unregistered."""
        return _fetch_did_document(f"{self.plc_host}/{did}")

    def try_resolve_web(self, did: str) -> DidDocument | None:
        """Fetch the ``/.well-known/did.json`` document for a ``did:web:`` DID.

        Returns ``None`` when the host answers 404 or 410.
        """
        host = did[len("did:web:"):]
        # did:web encodes an optional port as a percent-encoded colon. Only
        # that one escape is decoded (not a full unquote) so a DID cannot
        # smuggle path or query characters into the request URL.
        host = host.replace("%3A", ":").replace("%3a", ":")
        # NOTE(review): the did:web method specifies HTTPS; plain http is kept
        # here to preserve existing behaviour - confirm whether it can be
        # switched to https.
        return _fetch_did_document(f"http://{host}/.well-known/did.json")

    def resolve_did(self, did: str) -> DidDocument:
        """Resolve ``did`` to its document, consulting the cache first.

        Raises:
            Exception: if the DID method is unsupported or the document could
                not be found. HTTP/transport errors from ``requests``
                propagate unchanged.
        """
        cached = self.__cache.get(did)
        if cached:
            return cached

        document: DidDocument | None = None
        if did.startswith("did:plc:"):
            document = self.try_resolve_plc(did)
        elif did.startswith("did:web:"):
            document = self.try_resolve_web(did)

        if document:
            self.__cache.set(did, document)
            return document
        raise Exception(f"Failed to resolve {did}!")

    @override
    def dump_cache(self, path: Path):
        """Delegate to the underlying cache's ``dump_cache``."""
        self.__cache.dump_cache(path)

    @override
    def load_cache(self, path: Path):
        """Delegate to the underlying cache's ``load_cache``."""
        self.__cache.load_cache(path)


class HandleResolver(Cacheable):
    """Resolves handles to DIDs via DNS TXT records, then HTTP well-known.

    Successful resolutions are kept in a ``TTLCache`` created with a
    12 hour ``ttl_seconds``.
    """

    def __init__(self) -> None:
        self.__cache: TTLCache[str, str] = TTLCache(ttl_seconds=12 * 60 * 60)

    def try_resolve_dns(self, handle: str) -> str | None:
        """Look up the ``_atproto.<handle>`` TXT record.

        Returns:
            The value following ``did=`` in the first matching TXT string, or
            ``None`` when the lookup fails or no such string exists.
        """
        dns_query = f"_atproto.{handle}"
        try:
            answers = dns.resolver.resolve(dns_query, "TXT")
        except dns.resolver.NXDOMAIN:
            LOGGER.debug(f"DNS record not found for _atproto.{handle}")
            return None
        except dns.resolver.NoAnswer:
            LOGGER.debug(f"No TXT records found for _atproto.{handle}")
            return None
        except dns.exception.DNSException as err:
            # Timeouts, unreachable nameservers, malformed names, ...: treat
            # as "no DNS answer" so resolve_handle can still try HTTP.
            LOGGER.debug(f"DNS lookup failed for _atproto.{handle}: {err}")
            return None

        for rdata in answers:
            for txt_data in rdata.strings:
                # errors="replace": an undecodable TXT string must not abort
                # the whole resolution; it simply will not match "did=".
                did = txt_data.decode("utf-8", errors="replace").strip()
                if did.startswith("did="):
                    return did[4:]
        return None

    def try_resolve_http(self, handle: str) -> str | None:
        """Fetch ``/.well-known/atproto-did`` from the handle's host.

        Returns:
            The DID from the response body, or ``None`` for a non-200 status
            that ``raise_for_status`` does not treat as an error.

        Raises:
            ValueError: if the 200 response body does not start with ``did:``.
            requests.HTTPError: for 4xx/5xx responses.
            requests.RequestException: for transport-level failures.
        """
        # NOTE(review): the atproto handle spec describes an HTTPS well-known
        # lookup; plain http is kept here to preserve existing behaviour -
        # confirm whether it can be switched to https.
        url = f"http://{handle}/.well-known/atproto-did"
        response = requests.get(url, timeout=10, allow_redirects=True)
        if response.status_code != 200:
            response.raise_for_status()
            return None

        did = response.text.strip()
        if not did.startswith("did:"):
            raise ValueError(f"Got invalid did: from {url} = {did}!")
        return did

    def resolve_handle(self, handle: str) -> str:
        """Resolve ``handle`` to a DID: cache first, then DNS, then HTTP.

        Raises:
            Exception: if neither method yields a DID. Errors raised by
                ``try_resolve_http`` propagate unchanged.
        """
        cached = self.__cache.get(handle)
        if cached:
            return cached

        # `or` keeps the original laziness: HTTP is only attempted when DNS
        # produced nothing.
        did = self.try_resolve_dns(handle) or self.try_resolve_http(handle)
        if did:
            self.__cache.set(handle, did)
            return did
        raise Exception(f"Failed to resolve handle {handle}!")

    @override
    def dump_cache(self, path: Path):
        """Delegate to the underlying cache's ``dump_cache``."""
        self.__cache.dump_cache(path)

    @override
    def load_cache(self, path: Path):
        """Delegate to the underlying cache's ``load_cache``."""
        self.__cache.load_cache(path)


# Shared resolver instances, created at import time.
handle_resolver = HandleResolver()
did_resolver = DidResolver(env.PLC_HOST)

# On-disk locations of the two caches.
did_cache = env.CACHE_DIR.joinpath('did.cache')
handle_cache = env.CACHE_DIR.joinpath('handle.cache')

did_resolver.load_cache(did_cache)
handle_resolver.load_cache(handle_cache)


def cache_dump():
    """Write both resolver caches to their cache files."""
    did_resolver.dump_cache(did_cache)
    handle_resolver.dump_cache(handle_cache)


shutdown_hook.append(cache_dump)