social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

add atproto identity basics, some cleanup.

zenfyr.dev b74b024b ed796641

verified
Changed files
+229 -29
atproto
bluesky
cross
mastodon
misskey
util
+101
atproto/identity.py
···
+
from typing import Any
+
import dns.resolver
+
import requests
+
from util.cache import TTLCache
+
from util.util import LOGGER
+
+
class DidResolver:
+
def __init__(self, plc_host: str) -> None:
+
self.plc_host: str = plc_host
+
self.__cache: TTLCache[str, dict[str, Any]] = TTLCache(ttl_seconds=12*60*60)
+
+
def try_resolve_plc(self, did: str) -> dict[str, Any] | None:
+
url = f"{self.plc_host}/{did}"
+
response = requests.get(url, timeout=10, allow_redirects=True)
+
+
if response.status_code == 200:
+
return response.json()
+
elif response.status_code == 404 or response.status_code == 410:
+
return None # tombstone or not registered
+
else:
+
response.raise_for_status()
+
+
def try_resolve_web(self, did: str) -> dict[str, Any] | None:
+
url = f"http://{did[len('did:web:'):]}/.well-known/did.json"
+
response = requests.get(url, timeout=10, allow_redirects=True)
+
+
if response.status_code == 200:
+
return response.json()
+
elif response.status_code == 404 or response.status_code == 410:
+
return None # tombstone or gone
+
else:
+
response.raise_for_status()
+
+
def resolve_did(self, did: str) -> dict[str, Any]:
+
cached = self.__cache.get(did)
+
if cached:
+
return cached
+
+
if did.startswith('did:plc:'):
+
from_plc = self.try_resolve_plc(did)
+
if from_plc:
+
self.__cache.set(did, from_plc)
+
return from_plc
+
elif did.startswith('did:web:'):
+
from_web = self.try_resolve_web(did)
+
if from_web:
+
self.__cache.set(did, from_web)
+
return from_web
+
raise Exception(f"Failed to resolve {did}!")
+
+
class HandleResolver:
+
def __init__(self) -> None:
+
self.__cache: TTLCache[str, str] = TTLCache()
+
+
def try_resolve_dns(self, handle: str) -> str | None:
+
try:
+
dns_query = f"_atproto.{handle}"
+
answers = dns.resolver.resolve(dns_query, "TXT")
+
+
for rdata in answers:
+
for txt_data in rdata.strings:
+
did = txt_data.decode('utf-8').strip()
+
if did.startswith("did="):
+
return did[4:]
+
except dns.resolver.NXDOMAIN:
+
LOGGER.debug(f"DNS record not found for _atproto.{handle}")
+
return None
+
except dns.resolver.NoAnswer:
+
LOGGER.debug(f"No TXT records found for _atproto.{handle}")
+
return None
+
+
def try_resolve_http(self, handle: str) -> str | None:
+
url = f"http://{handle}/.well-known/atproto-did"
+
response = requests.get(url, timeout=10, allow_redirects=True)
+
+
if response.status_code == 200:
+
did = response.text.strip()
+
if did.startswith("did:"):
+
return did
+
else:
+
raise ValueError(f"Got invalid did: from {url} = {did}!")
+
else:
+
response.raise_for_status()
+
+
+
def resolve_handle(self, handle: str) -> str:
+
cached = self.__cache.get(handle)
+
if cached:
+
return cached
+
+
from_dns = self.try_resolve_dns(handle)
+
if from_dns:
+
self.__cache.set(handle, from_dns)
+
return from_dns
+
+
from_http = self.try_resolve_http(handle)
+
if from_http:
+
self.__cache.set(handle, from_http)
+
return from_http
+
+
raise Exception(f"Failed to resolve handle {handle}!")
+19
bluesky/info.py
···
+
from abc import ABC
+
from typing import Any
+
from cross.service import Service
+
from util.util import normalize_service_url
+
+
def validate_and_transform(data: dict[str, Any]):
+
if not data["handle"] and not data["did"]:
+
raise KeyError("no 'handle' or 'did' specified for bluesky input!")
+
+
if "did" in data:
+
did = str(data["did"]) # only did:web and did:plc are supported
+
if not did.startswith("did:plc:") and not did.startswith("did:web:"):
+
raise ValueError(f"Invalid handle {did}!")
+
+
if "pds" in data:
+
data["pds"] = normalize_service_url(data["pds"])
+
+
class BlueskyService(ABC, Service):
+
pass
+38
bluesky/input.py
···
+
from abc import ABC
+
from dataclasses import dataclass, field
+
import re
+
from typing import Any, Callable, override
+
+
from bluesky.info import BlueskyService, validate_and_transform
+
from cross.service import InputService, OutputService
+
+
+
@dataclass(kw_only=True)
+
class BlueskyInputOptions:
+
handle: str | None
+
did: str | None
+
pds: str | None
+
filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
+
+
@classmethod
+
def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions":
+
validate_and_transform(data)
+
+
if "filters" in data:
+
data["filters"] = [re.compile(r) for r in data["filters"]]
+
+
return BlueskyInputOptions(**data)
+
+
+
class BlueskyBaseInputService(BlueskyService, InputService, ABC):
+
pass
+
+
+
class BlueskyJetstreamInputService(BlueskyBaseInputService):
+
@override
+
async def listen(
+
self,
+
outputs: list[OutputService],
+
submitter: Callable[[Callable[[], None]], None],
+
):
+
return await super().listen(outputs, submitter) # TODO
+4 -2
cross/service.py
···
import sqlite3
+
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Callable, cast
···
LOGGER.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id)
-
class InputService(Service):
+
class InputService(ABC, Service):
+
@abstractmethod
async def listen(
self,
outputs: list[OutputService],
submitter: Callable[[Callable[[], None]], None],
-
): # pyright: ignore[reportUnusedParameter]
+
):
pass
+2
env.py
···
import os
+
DEV = bool(os.environ.get("DEV")) or False
DATA_DIR = os.environ.get("DATA_DIR") or "./data"
MIGRATIONS_DIR = os.environ.get("MIGRATIONS_DIR") or "./migrations"
+
PLC_HOST = os.environ.get("PLC_HOST") or "http://plc.directory"
+6 -1
mastodon/info.py
···
import requests
from cross.service import Service
-
from util.util import LOGGER
+
from util.util import LOGGER, normalize_service_url
+
+
def validate_and_transform(data: dict[str, Any]):
+
if 'token' not in data or 'instance' not in data:
+
raise KeyError("Missing required values 'token' or 'instance'")
+
data["instance"] = normalize_service_url(data["instance"])
@dataclass(kw_only=True)
class InstanceInfo:
+5 -9
mastodon/input.py
···
import websockets
from cross.service import InputService, OutputService
-
from mastodon.info import MastodonService
+
from mastodon.info import MastodonService, validate_and_transform
from util.util import LOGGER
ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"]
···
allowed_visibility: list[str] = field(
default_factory=lambda: ALLOWED_VISIBILITY.copy()
)
-
regex_filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
+
filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MastodonInputOptions":
-
data["instance"] = (
-
data["instance"][:-1]
-
if data["instance"].endswith("/")
-
else data["instance"]
-
)
+
validate_and_transform(data)
if "allowed_visibility" in data:
for vis in data.get("allowed_visibility", []):
if vis not in ALLOWED_VISIBILITY:
raise ValueError(f"Invalid visibility option {vis}!")
-
if "regex_filters" in data:
-
data["regex_filters"] = [re.compile(r) for r in data["regex_filters"]]
+
if "filters" in data:
+
data["filters"] = [re.compile(r) for r in data["filters"]]
return MastodonInputOptions(**data)
+3 -7
mastodon/output.py
···
from typing import Any, override
from cross.service import OutputService
-
from mastodon.info import InstanceInfo, MastodonService
+
from mastodon.info import InstanceInfo, MastodonService, validate_and_transform
from util.util import LOGGER
ALLOWED_POSTING_VISIBILITY: list[str] = ["public", "unlisted", "private"]
···
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MastodonOutputOptions":
-
data["instance"] = (
-
data["instance"][:-1]
-
if data["instance"].endswith("/")
-
else data["instance"]
-
)
+
validate_and_transform(data)
if "visibility" in data:
if data["visibility"] not in ALLOWED_POSTING_VISIBILITY:
···
LOGGER.info("Getting %s configuration...", self.url)
responce = self.fetch_instance_info()
-
self.instance_info = InstanceInfo.from_api(responce)
+
self.instance_info: InstanceInfo = InstanceInfo.from_api(responce)
@override
def _get_token(self) -> str:
+5 -9
misskey/input.py
···
from cross.service import InputService, OutputService
from misskey.info import MisskeyService
-
from util.util import LOGGER
+
from util.util import LOGGER, normalize_service_url
ALLOWED_VISIBILITY = ["public", "home"]
···
allowed_visibility: list[str] = field(
default_factory=lambda: ALLOWED_VISIBILITY.copy()
)
-
regex_filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
+
filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MisskeyInputOptions":
-
data["instance"] = (
-
data["instance"][:-1]
-
if data["instance"].endswith("/")
-
else data["instance"]
-
)
+
data["instance"] = normalize_service_url(data["instance"])
if "allowed_visibility" in data:
for vis in data.get("allowed_visibility", []):
if vis not in ALLOWED_VISIBILITY:
raise ValueError(f"Invalid visibility option {vis}!")
-
if "regex_filters" in data:
-
data["regex_filters"] = [re.compile(r) for r in data["regex_filters"]]
+
if "filters" in data:
+
data["filters"] = [re.compile(r) for r in data["filters"]]
return MisskeyInputOptions(**data)
+1
pyproject.toml
···
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
+
"dnspython>=2.8.0",
"python-magic>=0.4.27",
"requests>=2.32.5",
"websockets>=15.0.1",
+25
util/cache.py
···
+
import time
+
from typing import Generic, TypeVar
+
+
K = TypeVar("K")
+
V = TypeVar("V")
+
+
class TTLCache(Generic[K, V]):
+
def __init__(self, ttl_seconds: int = 3600) -> None:
+
self.ttl: int = ttl_seconds
+
self.__cache: dict[K, tuple[V, float]] = {}
+
+
def get(self, key: K) -> V | None:
+
if key in self.__cache:
+
value, timestamp = self.__cache[key]
+
if time.time() - timestamp < self.ttl:
+
return value
+
else:
+
del self.__cache[key]
+
return None
+
+
def set(self, key: K, value: V) -> None:
+
self.__cache[key] = (value, time.time())
+
+
def clear(self) -> None:
+
self.__cache.clear()
+9 -1
util/util.py
···
import os
from typing import Any
-
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
+
import env
+
+
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG if env.DEV else logging.INFO)
LOGGER = logging.getLogger("XPost")
+
+
def normalize_service_url(url: str) -> str:
+
if not url.startswith("https://") and not url.startswith("http://"):
+
raise ValueError(f"Invalid service url {url}! Only http/https are supported.")
+
+
return url[:-1] if url.endswith('/') else url
def read_env(data: dict[str, Any]) -> None:
keys = list(data.keys())
+11
uv.lock
···
]
[[package]]
+
name = "dnspython"
+
version = "2.8.0"
+
source = { registry = "https://pypi.org/simple" }
+
sdist = { url = "https://files.pythonhosted.org/packages/8c/8b/57666417c0f90f08bcafa776861060426765fdb422eb10212086fb811d26/dnspython-2.8.0.tar.gz", hash = "sha256:181d3c6996452cb1189c4046c61599b84a5a86e099562ffde77d26984ff26d0f", size = 368251, upload-time = "2025-09-07T18:58:00.022Z" }
+
wheels = [
+
{ url = "https://files.pythonhosted.org/packages/ba/5a/18ad964b0086c6e62e2e7500f7edc89e3faa45033c71c1893d34eed2b2de/dnspython-2.8.0-py3-none-any.whl", hash = "sha256:01d9bbc4a2d76bf0db7c1f729812ded6d912bd318d3b1cf81d30c0f845dbf3af", size = 331094, upload-time = "2025-09-07T18:57:58.071Z" },
+
]
+
+
[[package]]
name = "idna"
version = "3.11"
source = { registry = "https://pypi.org/simple" }
···
version = "0.1.0"
source = { virtual = "." }
dependencies = [
+
{ name = "dnspython" },
{ name = "python-magic" },
{ name = "requests" },
{ name = "websockets" },
···
[package.metadata]
requires-dist = [
+
{ name = "dnspython", specifier = ">=2.8.0" },
{ name = "python-magic", specifier = ">=0.4.27" },
{ name = "requests", specifier = ">=2.32.5" },
{ name = "websockets", specifier = ">=15.0.1" },