social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
from typing import Any

import dns.exception
import dns.resolver
import requests

from util.cache import TTLCache
from util.util import LOGGER

# Seconds to wait on any single HTTP request before giving up.
_HTTP_TIMEOUT = 10


def _fetch_did_document(url: str) -> dict[str, Any] | None:
    """GET *url* and return the decoded JSON body, or ``None`` if it is absent.

    Returns:
        The parsed JSON on HTTP 200; ``None`` on 404/410 or on any other
        status that is not an HTTP error.

    Raises:
        requests.HTTPError: for any other 4xx/5xx status.
        requests.RequestException: on connection problems or timeouts.
    """
    response = requests.get(url, timeout=_HTTP_TIMEOUT, allow_redirects=True)
    if response.status_code == 200:
        return response.json()
    if response.status_code in (404, 410):
        return None  # tombstone, gone, or never registered
    response.raise_for_status()
    # A non-error status other than 200 (e.g. 204) carries no document.
    return None


class DidResolver:
    """Resolve ``did:plc:`` and ``did:web:`` identifiers to DID documents.

    Successful lookups are stored in a ``TTLCache`` created with a
    12-hour ``ttl_seconds``.
    """

    def __init__(self, plc_host: str) -> None:
        # Base URL of the PLC host; the DID is appended as a path segment.
        self.plc_host: str = plc_host
        self.__cache: TTLCache[str, dict[str, Any]] = TTLCache(ttl_seconds=12 * 60 * 60)

    def try_resolve_plc(self, did: str) -> dict[str, Any] | None:
        """Fetch the document for a ``did:plc:`` identifier from ``plc_host``.

        Returns ``None`` when the host answers 404 or 410; raises
        ``requests.HTTPError`` for other error statuses.
        """
        # Tolerate a trailing slash on the configured host to avoid "//did:plc:...".
        url = f"{self.plc_host.rstrip('/')}/{did}"
        return _fetch_did_document(url)

    def try_resolve_web(self, did: str) -> dict[str, Any] | None:
        """Fetch ``/.well-known/did.json`` for a ``did:web:`` identifier.

        Returns ``None`` when the host answers 404 or 410; raises
        ``requests.HTTPError`` for other error statuses.
        """
        # did:web percent-encodes the port separator, so only that escape is decoded.
        authority = did[len("did:web:"):].replace("%3A", ":").replace("%3a", ":")
        hostname = authority.split(":", 1)[0]
        # did:web documents must be served over HTTPS; plain HTTP is kept only
        # for localhost so local development servers remain reachable.
        scheme = "http" if hostname == "localhost" else "https"
        url = f"{scheme}://{authority}/.well-known/did.json"
        return _fetch_did_document(url)

    def resolve_did(self, did: str) -> dict[str, Any]:
        """Return the DID document for *did*, consulting the cache first.

        Raises:
            Exception: if the DID method is unsupported or no document was found.
            requests.RequestException: propagated from the underlying HTTP lookup.
        """
        cached = self.__cache.get(did)
        if cached:
            return cached

        if did.startswith("did:plc:"):
            document = self.try_resolve_plc(did)
        elif did.startswith("did:web:"):
            document = self.try_resolve_web(did)
        else:
            document = None  # unsupported DID method

        if document:
            self.__cache.set(did, document)
            return document
        raise Exception(f"Failed to resolve {did}!")


class HandleResolver:
    """Resolve handles to DIDs via DNS TXT records, then a well-known URL."""

    def __init__(self) -> None:
        self.__cache: TTLCache[str, str] = TTLCache()

    def try_resolve_dns(self, handle: str) -> str | None:
        """Look up the ``_atproto.<handle>`` TXT record and return its DID.

        Returns ``None`` when the lookup fails or no ``did=`` record exists,
        so the caller can fall back to another resolution method.
        """
        dns_query = f"_atproto.{handle}"
        try:
            answers = dns.resolver.resolve(dns_query, "TXT")
        except dns.resolver.NXDOMAIN:
            LOGGER.debug(f"DNS record not found for _atproto.{handle}")
            return None
        except dns.resolver.NoAnswer:
            LOGGER.debug(f"No TXT records found for _atproto.{handle}")
            return None
        except (dns.resolver.NoNameservers, dns.exception.Timeout) as err:
            # A broken or slow resolver must not block the well-known fallback.
            LOGGER.debug(f"DNS lookup failed for _atproto.{handle}: {err}")
            return None

        for rdata in answers:
            for txt_data in rdata.strings:
                try:
                    record = txt_data.decode("utf-8").strip()
                except UnicodeDecodeError:
                    continue  # not a text record we can interpret
                if record.startswith("did="):
                    return record[4:]
        return None

    def try_resolve_http(self, handle: str) -> str | None:
        """Fetch ``/.well-known/atproto-did`` from the handle's host over HTTPS.

        Returns:
            The DID from the response body, or ``None`` for a non-error
            status other than 200.

        Raises:
            ValueError: if the body does not start with ``did:``.
            requests.HTTPError: for 4xx/5xx responses.
        """
        # HTTPS, so an on-path attacker cannot substitute the DID.
        url = f"https://{handle}/.well-known/atproto-did"
        response = requests.get(url, timeout=_HTTP_TIMEOUT, allow_redirects=True)

        if response.status_code != 200:
            response.raise_for_status()
            return None

        did = response.text.strip()
        if not did.startswith("did:"):
            raise ValueError(f"Got invalid did: from {url} = {did}!")
        return did

    def resolve_handle(self, handle: str) -> str:
        """Return the DID for *handle*, trying cache, DNS, then the well-known URL.

        Raises:
            Exception: if neither method yields a DID.
            ValueError, requests.RequestException: propagated from the HTTP lookup.
        """
        cached = self.__cache.get(handle)
        if cached:
            return cached

        from_dns = self.try_resolve_dns(handle)
        if from_dns:
            self.__cache.set(handle, from_dns)
            return from_dns

        from_http = self.try_resolve_http(handle)
        if from_http:
            self.__cache.set(handle, from_http)
            return from_http

        raise Exception(f"Failed to resolve handle {handle}!")