social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

more work on mastodon

zenfyr.dev 33d25a2a dba88a76

verified
Changed files
+167 -26
cross
mastodon
+5 -10
cross/service.py
···
self.conn.close()
class OutputService(Service):
-
def __init__(self, url: str, db: Path) -> None:
-
super().__init__(url, db)
-
def accept_post(self, post: Post):
-
LOGGER.warning("NOT IMPLEMENTED, accept_post %s", post.id)
def delete_post(self, post_id: str):
-
LOGGER.warning("NOT IMPLEMENTED, delete_post %s", post_id)
def accept_repost(self, repost_id: str, reposted_id: str):
-
LOGGER.warning("NOT IMPLEMENTED, accept_repost %s", repost_id)
def delete_repost(self, repost_id: str):
-
LOGGER.warning("NOT IMPLEMENTED, delete_repost %s", repost_id)
class InputService(Service):
-
def __init__(self, url: str, db: Path) -> None:
-
super().__init__(url, db)
-
async def listen(self, outputs: list[OutputService], submitter: Callable[[Callable[[], None]], None]):
pass
···
self.conn.close()
class OutputService(Service):
def accept_post(self, post: Post):
+
LOGGER.warning("NOT IMPLEMENTED (%s), accept_post %s", self.url, post.id)
def delete_post(self, post_id: str):
+
LOGGER.warning("NOT IMPLEMENTED (%s), delete_post %s", self.url, post_id)
def accept_repost(self, repost_id: str, reposted_id: str):
+
LOGGER.warning("NOT IMPLEMENTED (%s), accept_repost %s of %s", self.url, repost_id, reposted_id)
def delete_repost(self, repost_id: str):
+
LOGGER.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id)
class InputService(Service):
+
async def listen(self, outputs: list[OutputService], submitter: Callable[[Callable[[], None]], None]): # pyright: ignore[reportUnusedParameter]
pass
+79
mastodon/info.py
···
···
+
from abc import ABC, abstractmethod
+
from dataclasses import dataclass
+
from typing import Any
+
+
import requests
+
+
from cross.service import Service
+
from util.util import LOGGER
+
+
+
@dataclass(kw_only=True)
+
class InstanceInfo:
+
max_characters: int = 500
+
max_media_attachments: int = 4
+
characters_reserved_per_url: int = 23
+
+
image_size_limit: int = 16777216
+
video_size_limit: int = 103809024
+
+
@classmethod
+
def from_api(cls, data: dict[str, Any]) -> "InstanceInfo":
+
config: dict[str, Any] = {}
+
+
if "statuses" in data:
+
statuses_config: dict[str, Any] = data.get("statuses", {})
+
if "max_characters" in statuses_config:
+
config["max_characters"] = statuses_config["max_characters"]
+
if "max_media_attachments" in statuses_config:
+
config["max_media_attachments"] = statuses_config[
+
"max_media_attachments"
+
]
+
if "characters_reserved_per_url" in statuses_config:
+
config["characters_reserved_per_url"] = statuses_config[
+
"characters_reserved_per_url"
+
]
+
+
if "media_attachments" in data:
+
media_config: dict[str, Any] = data.get("media_attachments", {})
+
if "image_size_limit" in media_config:
+
config["image_size_limit"] = media_config["image_size_limit"]
+
if "video_size_limit" in media_config:
+
config["video_size_limit"] = media_config["video_size_limit"]
+
+
# *oma extensions
+
if "max_toot_chars" in data:
+
config["max_characters"] = data["max_toot_chars"]
+
if "upload_limit" in data:
+
config["image_size_limit"] = data["upload_limit"]
+
config["video_size_limit"] = data["upload_limit"]
+
+
return InstanceInfo(**config)
+
+
+
class MastodonService(ABC, Service):
+
def verify_credentials(self):
+
token = self._get_token()
+
responce = requests.get(
+
f"{self.url}/api/v1/accounts/verify_credentials",
+
headers={"Authorization": f"Bearer {token}"},
+
)
+
if responce.status_code != 200:
+
LOGGER.error("Failed to validate user credentials!")
+
responce.raise_for_status()
+
return dict(responce.json())
+
+
def fetch_instance_info(self):
+
token = self._get_token()
+
responce = requests.get(
+
f"{self.url}/api/v1/instance",
+
headers={"Authorization": f"Bearer {token}"},
+
)
+
if responce.status_code != 200:
+
LOGGER.error("Failed to get instance info!")
+
responce.raise_for_status()
+
return dict(responce.json())
+
+
@abstractmethod
+
def _get_token(self) -> str:
+
pass
+55 -10
mastodon/input.py
···
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, override
from cross.service import InputService, OutputService
from util.util import LOGGER
ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"]
@dataclass(kw_only=True)
class MastodonInputOptions:
token: str
instance: str
-
allowed_visibility: list[str] = field(default_factory=lambda: ALLOWED_VISIBILITY.copy())
regex_filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
@classmethod
-
def from_dict(cls, data: dict[str, Any]) -> 'MastodonInputOptions':
-
data['instance'] = data['instance'][:-1] if data['instance'].endswith("/") else data['instance']
-
if 'allowed_visibility' in data:
-
for vis in data.get('allowed_visibility', []):
if vis not in ALLOWED_VISIBILITY:
raise ValueError(f"Invalid visibility option {vis}!")
-
if 'regex_filters' in data:
-
data['regex_filters'] = [re.compile(r) for r in data['regex_filters']]
return MastodonInputOptions(**data)
-
class MastodonInputService(InputService):
def __init__(self, db: Path, options: MastodonInputOptions) -> None:
super().__init__(options.instance, db)
LOGGER.info("Verifying %s credentails...", self.url)
@override
-
async def listen(self, outputs: list[OutputService], submitter: Callable[[Callable[[], None]], None]):
-
return await super().listen(outputs, submitter) # TODO
···
+
import asyncio
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, override
+
+
import websockets
from cross.service import InputService, OutputService
+
from mastodon.info import MastodonService
from util.util import LOGGER
ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"]
+
@dataclass(kw_only=True)
class MastodonInputOptions:
token: str
instance: str
+
allowed_visibility: list[str] = field(
+
default_factory=lambda: ALLOWED_VISIBILITY.copy()
+
)
regex_filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
@classmethod
+
def from_dict(cls, data: dict[str, Any]) -> "MastodonInputOptions":
+
data["instance"] = (
+
data["instance"][:-1]
+
if data["instance"].endswith("/")
+
else data["instance"]
+
)
+
if "allowed_visibility" in data:
+
for vis in data.get("allowed_visibility", []):
if vis not in ALLOWED_VISIBILITY:
raise ValueError(f"Invalid visibility option {vis}!")
+
if "regex_filters" in data:
+
data["regex_filters"] = [re.compile(r) for r in data["regex_filters"]]
return MastodonInputOptions(**data)
+
+
class MastodonInputService(MastodonService, InputService):
def __init__(self, db: Path, options: MastodonInputOptions) -> None:
super().__init__(options.instance, db)
+
self.options: MastodonInputOptions = options
LOGGER.info("Verifying %s credentails...", self.url)
+
responce = self.verify_credentials()
+
self.user_id: str = responce["id"]
+
+
LOGGER.info("Getting %s configuration...", self.url)
+
responce = self.fetch_instance_info()
+
self.streaming_url: str = responce["urls"]["streaming_api"]
@override
+
def _get_token(self) -> str:
+
return self.options.token
+
+
@override
+
async def listen(
+
self,
+
outputs: list[OutputService],
+
submitter: Callable[[Callable[[], None]], None],
+
):
+
uri = f"{self.streaming_url}/api/v1/streaming?stream=user"
+
+
async for ws in websockets.connect(
+
uri, additional_headers={"Authorization": f"Bearer {self.options.token}"}
+
):
+
try:
+
LOGGER.info("Listening to %s...", self.streaming_url)
+
+
async def listen_for_messages():
+
async for msg in ws:
+
LOGGER.info(msg) # TODO
+
+
listen = asyncio.create_task(listen_for_messages())
+
+
_ = await asyncio.gather(listen)
+
except websockets.ConnectionClosedError as e:
+
LOGGER.error(e, stack_info=True, exc_info=True)
+
LOGGER.info("Reconnecting to %s...", self.streaming_url)
+
continue
+28 -6
mastodon/output.py
···
from dataclasses import dataclass
from pathlib import Path
-
from typing import Any
from cross.service import OutputService
ALLOWED_POSTING_VISIBILITY: list[str] = ["public", "unlisted", "private"]
@dataclass(kw_only=True)
class MastodonOutputOptions:
token: str
···
visibility: str = "public"
@classmethod
-
def from_dict(cls, data: dict[str, Any]) -> 'MastodonOutputOptions':
-
data['instance'] = data['instance'][:-1] if data['instance'].endswith("/") else data['instance']
-
if 'visibility' in data:
-
if data['visibility'] not in ALLOWED_POSTING_VISIBILITY:
raise ValueError(f"Invalid visibility option {data['visibility']}!")
return MastodonOutputOptions(**data)
# TODO
-
class MastodonOutputService(OutputService):
def __init__(self, db: Path, options: MastodonOutputOptions) -> None:
super().__init__(options.instance, db)
···
from dataclasses import dataclass
from pathlib import Path
+
from typing import Any, override
+
from cross.service import OutputService
+
from mastodon.info import InstanceInfo, MastodonService
+
from util.util import LOGGER
ALLOWED_POSTING_VISIBILITY: list[str] = ["public", "unlisted", "private"]
+
@dataclass(kw_only=True)
class MastodonOutputOptions:
token: str
···
visibility: str = "public"
@classmethod
+
def from_dict(cls, data: dict[str, Any]) -> "MastodonOutputOptions":
+
data["instance"] = (
+
data["instance"][:-1]
+
if data["instance"].endswith("/")
+
else data["instance"]
+
)
+
if "visibility" in data:
+
if data["visibility"] not in ALLOWED_POSTING_VISIBILITY:
raise ValueError(f"Invalid visibility option {data['visibility']}!")
return MastodonOutputOptions(**data)
+
# TODO
+
class MastodonOutputService(MastodonService, OutputService):
def __init__(self, db: Path, options: MastodonOutputOptions) -> None:
super().__init__(options.instance, db)
+
self.options: MastodonOutputOptions = options
+
+
LOGGER.info("Verifying %s credentails...", self.url)
+
responce = self.verify_credentials()
+
self.user_id: str = responce["id"]
+
+
LOGGER.info("Getting %s configuration...", self.url)
+
responce = self.fetch_instance_info()
+
self.instance_info = InstanceInfo.from_api(responce)
+
+
@override
+
def _get_token(self) -> str:
+
return self.options.token