social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
from typing import Any

import dns.resolver
import requests

import env
from util.cache import TTLCache
from util.util import LOGGER, normalize_service_url


class DidDocument:
    """Thin wrapper around a raw DID document dict.

    Exposes the atproto personal data server (PDS) endpoint declared in the
    document's ``service`` list, if there is one.
    """

    def __init__(self, raw_doc: dict[str, Any]) -> None:
        self.raw: dict[str, Any] = raw_doc
        # Lookup cache: None = not looked up yet, "" = looked up and no
        # usable PDS service entry was found, otherwise the normalized URL.
        self.atproto_pds: str | None = None

    def get_atproto_pds(self) -> str | None:
        """Return the normalized PDS endpoint URL, or None if none is declared.

        The result of scanning the ``service`` list is remembered on the
        instance, including the "not found" outcome.
        """
        # `is not None` (rather than truthiness) so that the "" sentinel for
        # "already looked, nothing there" short-circuits as well.
        if self.atproto_pds is not None:
            return self.atproto_pds or None

        services = self.raw.get("service")
        if not services:
            return None

        for service in services:
            # The document comes from the network; skip malformed entries
            # instead of failing on `.get`.
            if not isinstance(service, dict):
                continue
            if (
                service.get("id") == "#atproto_pds"
                and service.get("type") == "AtprotoPersonalDataServer"
            ):
                endpoint = service.get("serviceEndpoint")
                if endpoint:
                    url = normalize_service_url(endpoint)
                    self.atproto_pds = url
                    return url

        self.atproto_pds = ""
        return None


class DidResolver:
    """Resolves ``did:plc:`` and ``did:web:`` identifiers to DID documents.

    Successful resolutions are kept in a TTL cache created with
    ``ttl_seconds`` equal to 12 hours.
    """

    def __init__(self, plc_host: str) -> None:
        self.plc_host: str = plc_host
        self.__cache: TTLCache[str, DidDocument] = TTLCache(ttl_seconds=12 * 60 * 60)

    @staticmethod
    def _fetch_document(url: str) -> DidDocument | None:
        """GET ``url`` and wrap the JSON body in a DidDocument.

        Returns None for 404/410. Any other error status raises
        ``requests.HTTPError`` via ``raise_for_status``.
        """
        response = requests.get(url, timeout=10, allow_redirects=True)

        if response.status_code == 200:
            return DidDocument(response.json())
        if response.status_code in (404, 410):
            return None  # tombstone, gone or not registered

        response.raise_for_status()
        # raise_for_status() only raises for 4xx/5xx; any other non-200
        # status is treated as "not resolved".
        return None

    def try_resolve_plc(self, did: str) -> DidDocument | None:
        """Look up ``did`` on the configured PLC host.

        Returns None when the host answers 404 or 410.
        """
        # Tolerate a trailing slash on the configured host.
        url = f"{self.plc_host.rstrip('/')}/{did}"
        return self._fetch_document(url)

    def try_resolve_web(self, did: str) -> DidDocument | None:
        """Fetch ``/.well-known/did.json`` from the host named by a did:web DID.

        Returns None when the host answers 404 or 410. Everything after the
        ``did:web:`` prefix is used verbatim as the host; path-style
        identifiers are not translated.
        """
        # HTTPS, not HTTP: the returned document decides which PDS is
        # trusted, so it must not be fetched over an unauthenticated channel.
        url = f"https://{did.removeprefix('did:web:')}/.well-known/did.json"
        return self._fetch_document(url)

    def resolve_did(self, did: str) -> DidDocument:
        """Resolve ``did`` to its document, consulting the cache first.

        Raises:
            Exception: if the DID method is unsupported or the document
                could not be found.
        """
        cached = self.__cache.get(did)
        if cached:
            return cached

        document: DidDocument | None = None
        if did.startswith("did:plc:"):
            document = self.try_resolve_plc(did)
        elif did.startswith("did:web:"):
            document = self.try_resolve_web(did)

        if document:
            self.__cache.set(did, document)
            return document
        raise Exception(f"Failed to resolve {did}!")


class HandleResolver:
    """Resolves atproto handles to DIDs via DNS TXT, then the well-known URL.

    Results are kept in a TTLCache constructed with its default settings.
    """

    def __init__(self) -> None:
        self.__cache: TTLCache[str, str] = TTLCache()

    def try_resolve_dns(self, handle: str) -> str | None:
        """Return the DID from the ``_atproto.<handle>`` TXT record, if any.

        Returns None when the record is missing, carries no ``did=`` value,
        or the DNS lookup itself fails, so the caller can fall back to the
        well-known lookup.
        """
        dns_query = f"_atproto.{handle}"
        try:
            answers = dns.resolver.resolve(dns_query, "TXT")
        except dns.resolver.NXDOMAIN:
            LOGGER.debug(f"DNS record not found for _atproto.{handle}")
            return None
        except dns.resolver.NoAnswer:
            LOGGER.debug(f"No TXT records found for _atproto.{handle}")
            return None
        except (dns.resolver.NoNameservers, dns.exception.Timeout) as err:
            # A broken or slow resolver should not prevent the well-known
            # fallback from being tried.
            LOGGER.debug(f"DNS lookup failed for _atproto.{handle}: {err}")
            return None

        for rdata in answers:
            for txt_data in rdata.strings:
                did = txt_data.decode("utf-8").strip()
                if did.startswith("did="):
                    return did[4:]
        return None

    def try_resolve_http(self, handle: str) -> str | None:
        """Return the DID served at ``/.well-known/atproto-did`` on the handle.

        Raises:
            requests.HTTPError: for 4xx/5xx responses.
            ValueError: if the response body does not start with ``did:``.
        """
        # HTTPS, not HTTP: the response decides which identity the handle
        # maps to, so it must not be fetched over an unauthenticated channel.
        url = f"https://{handle}/.well-known/atproto-did"
        response = requests.get(url, timeout=10, allow_redirects=True)

        if response.status_code != 200:
            response.raise_for_status()
            # Non-200 status that is not an error: nothing usable.
            return None

        did = response.text.strip()
        if not did.startswith("did:"):
            raise ValueError(f"Got invalid did: from {url} = {did}!")
        return did

    def resolve_handle(self, handle: str) -> str:
        """Resolve ``handle`` to a DID, trying cache, DNS, then the well-known URL.

        Raises:
            Exception: if neither lookup yields a DID. Errors raised by
                ``try_resolve_http`` propagate unchanged.
        """
        cached = self.__cache.get(handle)
        if cached:
            return cached

        for lookup in (self.try_resolve_dns, self.try_resolve_http):
            did = lookup(handle)
            if did:
                self.__cache.set(handle, did)
                return did

        raise Exception(f"Failed to resolve handle {handle}!")


handle_resolver = HandleResolver()
did_resolver = DidResolver(env.PLC_HOST)