social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from pathlib import Path
2from typing import Any, override
3
4import dns.resolver
5import requests
6
7import env
8from util.cache import Cacheable, TTLCache
9from util.util import LOGGER, normalize_service_url, shutdown_hook
10
11
12class DidDocument():
13 def __init__(self, raw_doc: dict[str, Any]) -> None:
14 self.raw: dict[str, Any] = raw_doc
15 self.atproto_pds: str | None = None
16
17 def get_atproto_pds(self) -> str | None:
18 if self.atproto_pds:
19 return self.atproto_pds
20
21 services = self.raw.get("service")
22 if not services:
23 return None
24
25 for service in services:
26 if (
27 service.get("id") == "#atproto_pds"
28 and service.get("type") == "AtprotoPersonalDataServer"
29 ):
30 endpoint = service.get("serviceEndpoint")
31 if endpoint:
32 url = normalize_service_url(endpoint)
33 self.atproto_pds = url
34 return url
35 self.atproto_pds = ""
36 return None
37
38
39class DidResolver(Cacheable):
40 def __init__(self, plc_host: str) -> None:
41 self.plc_host: str = plc_host
42 self.__cache: TTLCache[str, DidDocument] = TTLCache(ttl_seconds=12 * 60 * 60)
43
44 def try_resolve_plc(self, did: str) -> DidDocument | None:
45 url = f"{self.plc_host}/{did}"
46 response = requests.get(url, timeout=10, allow_redirects=True)
47
48 if response.status_code == 200:
49 return DidDocument(response.json())
50 elif response.status_code == 404 or response.status_code == 410:
51 return None # tombstone or not registered
52 else:
53 response.raise_for_status()
54
55 def try_resolve_web(self, did: str) -> DidDocument | None:
56 url = f"http://{did[len('did:web:') :]}/.well-known/did.json"
57 response = requests.get(url, timeout=10, allow_redirects=True)
58
59 if response.status_code == 200:
60 return DidDocument(response.json())
61 elif response.status_code == 404 or response.status_code == 410:
62 return None # tombstone or gone
63 else:
64 response.raise_for_status()
65
66 def resolve_did(self, did: str) -> DidDocument:
67 cached = self.__cache.get(did)
68 if cached:
69 return cached
70
71 if did.startswith("did:plc:"):
72 from_plc = self.try_resolve_plc(did)
73 if from_plc:
74 self.__cache.set(did, from_plc)
75 return from_plc
76 elif did.startswith("did:web:"):
77 from_web = self.try_resolve_web(did)
78 if from_web:
79 self.__cache.set(did, from_web)
80 return from_web
81 raise Exception(f"Failed to resolve {did}!")
82
83 @override
84 def dump_cache(self, path: Path):
85 self.__cache.dump_cache(path)
86
87 @override
88 def load_cache(self, path: Path):
89 self.__cache.load_cache(path)
90
91class HandleResolver(Cacheable):
92 def __init__(self) -> None:
93 self.__cache: TTLCache[str, str] = TTLCache(ttl_seconds=12 * 60 * 60)
94
95 def try_resolve_dns(self, handle: str) -> str | None:
96 try:
97 dns_query = f"_atproto.{handle}"
98 answers = dns.resolver.resolve(dns_query, "TXT")
99
100 for rdata in answers:
101 for txt_data in rdata.strings:
102 did = txt_data.decode("utf-8").strip()
103 if did.startswith("did="):
104 return did[4:]
105 except dns.resolver.NXDOMAIN:
106 LOGGER.debug(f"DNS record not found for _atproto.{handle}")
107 return None
108 except dns.resolver.NoAnswer:
109 LOGGER.debug(f"No TXT records found for _atproto.{handle}")
110 return None
111
112 def try_resolve_http(self, handle: str) -> str | None:
113 url = f"http://{handle}/.well-known/atproto-did"
114 response = requests.get(url, timeout=10, allow_redirects=True)
115
116 if response.status_code == 200:
117 did = response.text.strip()
118 if did.startswith("did:"):
119 return did
120 else:
121 raise ValueError(f"Got invalid did: from {url} = {did}!")
122 else:
123 response.raise_for_status()
124
125 def resolve_handle(self, handle: str) -> str:
126 cached = self.__cache.get(handle)
127 if cached:
128 return cached
129
130 from_dns = self.try_resolve_dns(handle)
131 if from_dns:
132 self.__cache.set(handle, from_dns)
133 return from_dns
134
135 from_http = self.try_resolve_http(handle)
136 if from_http:
137 self.__cache.set(handle, from_http)
138 return from_http
139
140 raise Exception(f"Failed to resolve handle {handle}!")
141
142 @override
143 def dump_cache(self, path: Path):
144 self.__cache.dump_cache(path)
145
146 @override
147 def load_cache(self, path: Path):
148 self.__cache.load_cache(path)
149
150
151handle_resolver = HandleResolver()
152did_resolver = DidResolver(env.PLC_HOST)
153
154did_cache = env.CACHE_DIR.joinpath('did.cache')
155handle_cache = env.CACHE_DIR.joinpath('handle.cache')
156
157did_resolver.load_cache(did_cache)
158handle_resolver.load_cache(handle_cache)
159
160def cache_dump():
161 did_resolver.dump_cache(did_cache)
162 handle_resolver.dump_cache(handle_cache)
163
164shutdown_hook.append(cache_dump)