social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1from abc import ABC, abstractmethod 2from dataclasses import dataclass 3from typing import Any 4 5import requests 6 7from cross.service import Service 8from util.util import normalize_service_url 9 10def validate_and_transform(data: dict[str, Any]): 11 if 'token' not in data or 'instance' not in data: 12 raise KeyError("Missing required values 'token' or 'instance'") 13 14 data["instance"] = normalize_service_url(data["instance"]) 15 16@dataclass(kw_only=True) 17class InstanceInfo: 18 max_characters: int = 500 19 max_media_attachments: int = 4 20 characters_reserved_per_url: int = 23 21 22 image_size_limit: int = 16777216 23 video_size_limit: int = 103809024 24 25 @classmethod 26 def from_api(cls, data: dict[str, Any]) -> "InstanceInfo": 27 config: dict[str, Any] = {} 28 29 if "statuses" in data: 30 statuses_config: dict[str, Any] = data.get("statuses", {}) 31 if "max_characters" in statuses_config: 32 config["max_characters"] = statuses_config["max_characters"] 33 if "max_media_attachments" in statuses_config: 34 config["max_media_attachments"] = statuses_config[ 35 "max_media_attachments" 36 ] 37 if "characters_reserved_per_url" in statuses_config: 38 config["characters_reserved_per_url"] = statuses_config[ 39 "characters_reserved_per_url" 40 ] 41 42 if "media_attachments" in data: 43 media_config: dict[str, Any] = data.get("media_attachments", {}) 44 if "image_size_limit" in media_config: 45 config["image_size_limit"] = media_config["image_size_limit"] 46 if "video_size_limit" in media_config: 47 config["video_size_limit"] = media_config["video_size_limit"] 48 49 # *oma extensions 50 if "max_toot_chars" in data: 51 config["max_characters"] = data["max_toot_chars"] 52 if "upload_limit" in data: 53 config["image_size_limit"] = data["upload_limit"] 54 config["video_size_limit"] = data["upload_limit"] 55 56 return InstanceInfo(**config) 57 58 59class MastodonService(ABC, Service): 60 def verify_credentials(self): 61 token = self._get_token() 62 responce = requests.get( 63 f"{self.url}/api/v1/accounts/verify_credentials", 64 headers={"Authorization": f"Bearer {token}"}, 65 ) 66 if responce.status_code != 200: 67 self.log.error("Failed to validate user credentials!") 68 responce.raise_for_status() 69 return dict(responce.json()) 70 71 def fetch_instance_info(self): 72 token = self._get_token() 73 responce = requests.get( 74 f"{self.url}/api/v1/instance", 75 headers={"Authorization": f"Bearer {token}"}, 76 ) 77 if responce.status_code != 200: 78 self.log.error("Failed to get instance info!") 79 responce.raise_for_status() 80 return dict(responce.json()) 81 82 @abstractmethod 83 def _get_token(self) -> str: 84 pass