social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from typing import Any
2import dns.resolver
3import requests
4from util.cache import TTLCache
5from util.util import LOGGER
6
7class DidResolver:
8 def __init__(self, plc_host: str) -> None:
9 self.plc_host: str = plc_host
10 self.__cache: TTLCache[str, dict[str, Any]] = TTLCache(ttl_seconds=12*60*60)
11
12 def try_resolve_plc(self, did: str) -> dict[str, Any] | None:
13 url = f"{self.plc_host}/{did}"
14 response = requests.get(url, timeout=10, allow_redirects=True)
15
16 if response.status_code == 200:
17 return response.json()
18 elif response.status_code == 404 or response.status_code == 410:
19 return None # tombstone or not registered
20 else:
21 response.raise_for_status()
22
23 def try_resolve_web(self, did: str) -> dict[str, Any] | None:
24 url = f"http://{did[len('did:web:'):]}/.well-known/did.json"
25 response = requests.get(url, timeout=10, allow_redirects=True)
26
27 if response.status_code == 200:
28 return response.json()
29 elif response.status_code == 404 or response.status_code == 410:
30 return None # tombstone or gone
31 else:
32 response.raise_for_status()
33
34 def resolve_did(self, did: str) -> dict[str, Any]:
35 cached = self.__cache.get(did)
36 if cached:
37 return cached
38
39 if did.startswith('did:plc:'):
40 from_plc = self.try_resolve_plc(did)
41 if from_plc:
42 self.__cache.set(did, from_plc)
43 return from_plc
44 elif did.startswith('did:web:'):
45 from_web = self.try_resolve_web(did)
46 if from_web:
47 self.__cache.set(did, from_web)
48 return from_web
49 raise Exception(f"Failed to resolve {did}!")
50
51class HandleResolver:
52 def __init__(self) -> None:
53 self.__cache: TTLCache[str, str] = TTLCache()
54
55 def try_resolve_dns(self, handle: str) -> str | None:
56 try:
57 dns_query = f"_atproto.{handle}"
58 answers = dns.resolver.resolve(dns_query, "TXT")
59
60 for rdata in answers:
61 for txt_data in rdata.strings:
62 did = txt_data.decode('utf-8').strip()
63 if did.startswith("did="):
64 return did[4:]
65 except dns.resolver.NXDOMAIN:
66 LOGGER.debug(f"DNS record not found for _atproto.{handle}")
67 return None
68 except dns.resolver.NoAnswer:
69 LOGGER.debug(f"No TXT records found for _atproto.{handle}")
70 return None
71
72 def try_resolve_http(self, handle: str) -> str | None:
73 url = f"http://{handle}/.well-known/atproto-did"
74 response = requests.get(url, timeout=10, allow_redirects=True)
75
76 if response.status_code == 200:
77 did = response.text.strip()
78 if did.startswith("did:"):
79 return did
80 else:
81 raise ValueError(f"Got invalid did: from {url} = {did}!")
82 else:
83 response.raise_for_status()
84
85
86 def resolve_handle(self, handle: str) -> str:
87 cached = self.__cache.get(handle)
88 if cached:
89 return cached
90
91 from_dns = self.try_resolve_dns(handle)
92 if from_dns:
93 self.__cache.set(handle, from_dns)
94 return from_dns
95
96 from_http = self.try_resolve_http(handle)
97 if from_http:
98 self.__cache.set(handle, from_http)
99 return from_http
100
101 raise Exception(f"Failed to resolve handle {handle}!")