social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
at next 5.3 kB view raw
1from pathlib import Path 2from typing import Any, override 3 4import dns.resolver 5import requests 6 7import env 8from util.cache import Cacheable, TTLCache 9from util.util import LOGGER, normalize_service_url, shutdown_hook 10 11 12class DidDocument(): 13 def __init__(self, raw_doc: dict[str, Any]) -> None: 14 self.raw: dict[str, Any] = raw_doc 15 self.atproto_pds: str | None = None 16 17 def get_atproto_pds(self) -> str | None: 18 if self.atproto_pds: 19 return self.atproto_pds 20 21 services = self.raw.get("service") 22 if not services: 23 return None 24 25 for service in services: 26 if ( 27 service.get("id") == "#atproto_pds" 28 and service.get("type") == "AtprotoPersonalDataServer" 29 ): 30 endpoint = service.get("serviceEndpoint") 31 if endpoint: 32 url = normalize_service_url(endpoint) 33 self.atproto_pds = url 34 return url 35 self.atproto_pds = "" 36 return None 37 38 39class DidResolver(Cacheable): 40 def __init__(self, plc_host: str) -> None: 41 self.plc_host: str = plc_host 42 self.__cache: TTLCache[str, DidDocument] = TTLCache(ttl_seconds=12 * 60 * 60) 43 44 def try_resolve_plc(self, did: str) -> DidDocument | None: 45 url = f"{self.plc_host}/{did}" 46 response = requests.get(url, timeout=10, allow_redirects=True) 47 48 if response.status_code == 200: 49 return DidDocument(response.json()) 50 elif response.status_code == 404 or response.status_code == 410: 51 return None # tombstone or not registered 52 else: 53 response.raise_for_status() 54 55 def try_resolve_web(self, did: str) -> DidDocument | None: 56 url = f"http://{did[len('did:web:') :]}/.well-known/did.json" 57 response = requests.get(url, timeout=10, allow_redirects=True) 58 59 if response.status_code == 200: 60 return DidDocument(response.json()) 61 elif response.status_code == 404 or response.status_code == 410: 62 return None # tombstone or gone 63 else: 64 response.raise_for_status() 65 66 def resolve_did(self, did: str) -> DidDocument: 67 cached = self.__cache.get(did) 68 if cached: 69 return cached 70 71 if did.startswith("did:plc:"): 72 from_plc = self.try_resolve_plc(did) 73 if from_plc: 74 self.__cache.set(did, from_plc) 75 return from_plc 76 elif did.startswith("did:web:"): 77 from_web = self.try_resolve_web(did) 78 if from_web: 79 self.__cache.set(did, from_web) 80 return from_web 81 raise Exception(f"Failed to resolve {did}!") 82 83 @override 84 def dump_cache(self, path: Path): 85 self.__cache.dump_cache(path) 86 87 @override 88 def load_cache(self, path: Path): 89 self.__cache.load_cache(path) 90 91class HandleResolver(Cacheable): 92 def __init__(self) -> None: 93 self.__cache: TTLCache[str, str] = TTLCache(ttl_seconds=12 * 60 * 60) 94 95 def try_resolve_dns(self, handle: str) -> str | None: 96 try: 97 dns_query = f"_atproto.{handle}" 98 answers = dns.resolver.resolve(dns_query, "TXT") 99 100 for rdata in answers: 101 for txt_data in rdata.strings: 102 did = txt_data.decode("utf-8").strip() 103 if did.startswith("did="): 104 return did[4:] 105 except dns.resolver.NXDOMAIN: 106 LOGGER.debug(f"DNS record not found for _atproto.{handle}") 107 return None 108 except dns.resolver.NoAnswer: 109 LOGGER.debug(f"No TXT records found for _atproto.{handle}") 110 return None 111 112 def try_resolve_http(self, handle: str) -> str | None: 113 url = f"http://{handle}/.well-known/atproto-did" 114 response = requests.get(url, timeout=10, allow_redirects=True) 115 116 if response.status_code == 200: 117 did = response.text.strip() 118 if did.startswith("did:"): 119 return did 120 else: 121 raise ValueError(f"Got invalid did: from {url} = {did}!") 122 else: 123 response.raise_for_status() 124 125 def resolve_handle(self, handle: str) -> str: 126 cached = self.__cache.get(handle) 127 if cached: 128 return cached 129 130 from_dns = self.try_resolve_dns(handle) 131 if from_dns: 132 self.__cache.set(handle, from_dns) 133 return from_dns 134 135 from_http = self.try_resolve_http(handle) 136 if from_http: 137 self.__cache.set(handle, from_http) 138 return from_http 139 140 raise Exception(f"Failed to resolve handle {handle}!") 141 142 @override 143 def dump_cache(self, path: Path): 144 self.__cache.dump_cache(path) 145 146 @override 147 def load_cache(self, path: Path): 148 self.__cache.load_cache(path) 149 150 151handle_resolver = HandleResolver() 152did_resolver = DidResolver(env.PLC_HOST) 153 154did_cache = env.CACHE_DIR.joinpath('did.cache') 155handle_cache = env.CACHE_DIR.joinpath('handle.cache') 156 157did_resolver.load_cache(did_cache) 158handle_resolver.load_cache(handle_cache) 159 160def cache_dump(): 161 did_resolver.dump_cache(did_cache) 162 handle_resolver.dump_cache(handle_cache) 163 164shutdown_hook.append(cache_dump)