social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from abc import ABC, abstractmethod
2from dataclasses import dataclass
3from typing import Any
4
5import requests
6
7from cross.service import Service
8from util.util import LOGGER, normalize_service_url
9
def validate_and_transform(data: dict[str, Any]):
    """Check the service options and normalize the instance URL in place.

    Args:
        data: Options mapping; must contain both ``token`` and ``instance``.
            The ``instance`` entry is overwritten with the result of
            ``normalize_service_url``.

    Raises:
        KeyError: If ``token`` or ``instance`` is absent from ``data``.
    """
    required_keys = ("token", "instance")
    if not all(key in data for key in required_keys):
        raise KeyError("Missing required values 'token' or 'instance'")

    instance_url = data["instance"]
    data["instance"] = normalize_service_url(instance_url)
15
16@dataclass(kw_only=True)
17class InstanceInfo:
18 max_characters: int = 500
19 max_media_attachments: int = 4
20 characters_reserved_per_url: int = 23
21
22 image_size_limit: int = 16777216
23 video_size_limit: int = 103809024
24
25 @classmethod
26 def from_api(cls, data: dict[str, Any]) -> "InstanceInfo":
27 config: dict[str, Any] = {}
28
29 if "statuses" in data:
30 statuses_config: dict[str, Any] = data.get("statuses", {})
31 if "max_characters" in statuses_config:
32 config["max_characters"] = statuses_config["max_characters"]
33 if "max_media_attachments" in statuses_config:
34 config["max_media_attachments"] = statuses_config[
35 "max_media_attachments"
36 ]
37 if "characters_reserved_per_url" in statuses_config:
38 config["characters_reserved_per_url"] = statuses_config[
39 "characters_reserved_per_url"
40 ]
41
42 if "media_attachments" in data:
43 media_config: dict[str, Any] = data.get("media_attachments", {})
44 if "image_size_limit" in media_config:
45 config["image_size_limit"] = media_config["image_size_limit"]
46 if "video_size_limit" in media_config:
47 config["video_size_limit"] = media_config["video_size_limit"]
48
49 # *oma extensions
50 if "max_toot_chars" in data:
51 config["max_characters"] = data["max_toot_chars"]
52 if "upload_limit" in data:
53 config["image_size_limit"] = data["upload_limit"]
54 config["video_size_limit"] = data["upload_limit"]
55
56 return InstanceInfo(**config)
57
58
class MastodonService(ABC, Service):
    """Shared authenticated API calls for Mastodon-compatible services.

    Subclasses supply the access token through ``_get_token``. The instance
    base URL is read from ``self.url`` (not defined in this class; presumably
    provided by ``Service`` -- verify against the base class).
    """

    # Seconds to wait on the instance. ``requests`` applies no timeout by
    # default, so without this a stalled server would block the caller forever.
    _REQUEST_TIMEOUT = 30

    def verify_credentials(self):
        """Return the account data for the configured token.

        Raises:
            requests.HTTPError: If the instance answers with a 4xx/5xx status.
            requests.Timeout: If the instance does not answer in time.
        """
        return self._get_json(
            "/api/v1/accounts/verify_credentials",
            "Failed to validate user credentials!",
        )

    def fetch_instance_info(self):
        """Return the instance description from ``/api/v1/instance``.

        Raises:
            requests.HTTPError: If the instance answers with a 4xx/5xx status.
            requests.Timeout: If the instance does not answer in time.
        """
        return self._get_json(
            "/api/v1/instance",
            "Failed to get instance info!",
        )

    def _get_json(self, path: str, failure_message: str) -> dict[str, Any]:
        """GET ``path`` with the bearer token and return the decoded JSON body.

        Any status other than 200 is logged with ``failure_message`` before
        ``raise_for_status`` is given the chance to raise.
        """
        token = self._get_token()
        response = requests.get(
            f"{self.url}{path}",
            headers={"Authorization": f"Bearer {token}"},
            timeout=self._REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            LOGGER.error(failure_message)
            response.raise_for_status()
        return dict(response.json())

    @abstractmethod
    def _get_token(self) -> str:
        """Return the access token sent as the ``Bearer`` credential."""
        pass