social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from typing import Any
2
3import dns.resolver
4import requests
5
6import env
7from util.cache import TTLCache
8from util.util import LOGGER, normalize_service_url
9
10
11class DidDocument:
12 def __init__(self, raw_doc: dict[str, Any]) -> None:
13 self.raw: dict[str, Any] = raw_doc
14 self.atproto_pds: str | None = None
15
16 def get_atproto_pds(self) -> str | None:
17 if self.atproto_pds:
18 return self.atproto_pds
19
20 services = self.raw.get("service")
21 if not services:
22 return None
23
24 for service in services:
25 if (
26 service.get("id") == "#atproto_pds"
27 and service.get("type") == "AtprotoPersonalDataServer"
28 ):
29 endpoint = service.get("serviceEndpoint")
30 if endpoint:
31 url = normalize_service_url(endpoint)
32 self.atproto_pds = url
33 return url
34 self.atproto_pds = ""
35 return None
36
37
class DidResolver:
    """Resolve ``did:plc`` and ``did:web`` identifiers to DID documents.

    Successfully resolved documents are kept in a TTL cache (12 hours),
    keyed by the DID string.
    """

    def __init__(self, plc_host: str) -> None:
        # Base URL of the PLC directory; the DID is appended as a path segment.
        self.plc_host: str = plc_host
        self.__cache: TTLCache[str, DidDocument] = TTLCache(ttl_seconds=12 * 60 * 60)

    @staticmethod
    def _did_web_url(did: str) -> str:
        """Build the ``did.json`` URL for a hostname-level ``did:web`` DID."""
        # did:web percent-encodes the colon in front of an optional port.
        host = did[len("did:web:") :].replace("%3A", ":").replace("%3a", ":")
        # The did:web method mandates HTTPS; plain HTTP is kept only for
        # localhost so local development setups keep working.
        scheme = "http" if host.partition(":")[0] == "localhost" else "https"
        return f"{scheme}://{host}/.well-known/did.json"

    @staticmethod
    def _fetch_document(url: str) -> "DidDocument | None":
        """GET *url* and wrap a 200 JSON response in a ``DidDocument``."""
        response = requests.get(url, timeout=10, allow_redirects=True)

        if response.status_code == 200:
            return DidDocument(response.json())
        if response.status_code in (404, 410):
            return None  # tombstone, gone or not registered
        # Raises for 4xx/5xx; any other status means "nothing resolved".
        response.raise_for_status()
        return None

    def try_resolve_plc(self, did: str) -> "DidDocument | None":
        """Look *did* up in the PLC directory.

        Returns:
            The document, or ``None`` when the directory answers 404/410.

        Raises:
            requests.HTTPError: for any other 4xx/5xx response.
        """
        # Tolerate a configured host with a trailing slash.
        return self._fetch_document(f"{self.plc_host.rstrip('/')}/{did}")

    def try_resolve_web(self, did: str) -> "DidDocument | None":
        """Fetch the document of a ``did:web`` DID from its well-known URL.

        Returns:
            The document, or ``None`` when the host answers 404/410.

        Raises:
            requests.HTTPError: for any other 4xx/5xx response.
        """
        return self._fetch_document(self._did_web_url(did))

    def resolve_did(self, did: str) -> "DidDocument":
        """Resolve *did*, consulting the cache first.

        Raises:
            Exception: when the DID method is unsupported or the lookup
                yields no document.
        """
        cached = self.__cache.get(did)
        if cached:
            return cached

        if did.startswith("did:plc:"):
            document = self.try_resolve_plc(did)
        elif did.startswith("did:web:"):
            document = self.try_resolve_web(did)
        else:
            document = None

        if document:
            self.__cache.set(did, document)
            return document
        raise Exception(f"Failed to resolve {did}!")
81
82
83class HandleResolver:
84 def __init__(self) -> None:
85 self.__cache: TTLCache[str, str] = TTLCache()
86
87 def try_resolve_dns(self, handle: str) -> str | None:
88 try:
89 dns_query = f"_atproto.{handle}"
90 answers = dns.resolver.resolve(dns_query, "TXT")
91
92 for rdata in answers:
93 for txt_data in rdata.strings:
94 did = txt_data.decode("utf-8").strip()
95 if did.startswith("did="):
96 return did[4:]
97 except dns.resolver.NXDOMAIN:
98 LOGGER.debug(f"DNS record not found for _atproto.{handle}")
99 return None
100 except dns.resolver.NoAnswer:
101 LOGGER.debug(f"No TXT records found for _atproto.{handle}")
102 return None
103
104 def try_resolve_http(self, handle: str) -> str | None:
105 url = f"http://{handle}/.well-known/atproto-did"
106 response = requests.get(url, timeout=10, allow_redirects=True)
107
108 if response.status_code == 200:
109 did = response.text.strip()
110 if did.startswith("did:"):
111 return did
112 else:
113 raise ValueError(f"Got invalid did: from {url} = {did}!")
114 else:
115 response.raise_for_status()
116
117 def resolve_handle(self, handle: str) -> str:
118 cached = self.__cache.get(handle)
119 if cached:
120 return cached
121
122 from_dns = self.try_resolve_dns(handle)
123 if from_dns:
124 self.__cache.set(handle, from_dns)
125 return from_dns
126
127 from_http = self.try_resolve_http(handle)
128 if from_http:
129 self.__cache.set(handle, from_http)
130 return from_http
131
132 raise Exception(f"Failed to resolve handle {handle}!")
133
134
# Module-level resolver instances created at import time.
handle_resolver = HandleResolver()
# The PLC directory base URL is read from env.PLC_HOST.
did_resolver = DidResolver(env.PLC_HOST)