social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
"""Mastodon input side: option parsing and the user-stream listener."""

import asyncio
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, override

import websockets

from cross.service import InputService, OutputService
from mastodon.info import MastodonService, validate_and_transform
from util.util import LOGGER

# Visibility values accepted in the options; also the default selection.
ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"]


@dataclass(kw_only=True)
class MastodonInputOptions:
    """Configuration for :class:`MastodonInputService`."""

    # Sent as a bearer token when connecting to the streaming endpoint.
    token: str
    # Passed to ``MastodonService.__init__`` as the instance to talk to.
    instance: str
    # Subset of ``ALLOWED_VISIBILITY``; defaults to a copy of the full list.
    allowed_visibility: list[str] = field(
        default_factory=lambda: ALLOWED_VISIBILITY.copy()
    )
    # Compiled regular expressions; how they are applied is not visible in
    # this module (presumably used to filter incoming posts -- TODO confirm).
    filters: list[re.Pattern[str]] = field(default_factory=lambda: [])

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MastodonInputOptions":
        """Build options from a raw configuration mapping.

        ``data`` is first handed to ``validate_and_transform`` (defined in
        ``mastodon.info``). If it has a ``"filters"`` entry, that entry is
        replaced in place with compiled patterns, so the caller's dict is
        modified.

        Args:
            data: Keyword arguments for the dataclass, with ``filters`` given
                as regular-expression source strings.

        Returns:
            A new options instance of the class the method was called on.

        Raises:
            ValueError: If ``allowed_visibility`` contains a value that is
                not in ``ALLOWED_VISIBILITY``.
            re.error: If one of the filter patterns fails to compile.
        """
        validate_and_transform(data)

        # ``.get`` with an empty default already covers the missing-key case.
        for vis in data.get("allowed_visibility", []):
            if vis not in ALLOWED_VISIBILITY:
                raise ValueError(f"Invalid visibility option {vis}!")

        if "filters" in data:
            data["filters"] = [re.compile(r) for r in data["filters"]]

        # Construct through ``cls`` so subclasses get their own type back.
        return cls(**data)


class MastodonInputService(MastodonService, InputService):
    """Input service that reads messages from a Mastodon user stream."""

    def __init__(self, db: Path, options: MastodonInputOptions) -> None:
        """Verify the credentials and look up the streaming endpoint.

        Calls ``verify_credentials`` and ``fetch_instance_info`` during
        construction, storing the account id as ``user_id`` and the
        ``urls.streaming_api`` value as ``streaming_url``.

        Args:
            db: Database path forwarded to the base-class initializer.
            options: Token, instance and filtering options for this service.
        """
        super().__init__(options.instance, db)
        self.options: MastodonInputOptions = options

        LOGGER.info("Verifying %s credentials...", self.url)
        response = self.verify_credentials()
        self.user_id: str = response["id"]

        LOGGER.info("Getting %s configuration...", self.url)
        response = self.fetch_instance_info()
        self.streaming_url: str = response["urls"]["streaming_api"]

    @override
    def _get_token(self) -> str:
        """Return the access token from the configured options."""
        return self.options.token

    @override
    async def listen(
        self,
        outputs: list[OutputService],
        submitter: Callable[[Callable[[], None]], None],
    ) -> None:
        """Connect to the user stream and log every received message.

        Iterating ``websockets.connect`` yields a new connection each time
        the loop body finishes, so the stream is re-established after it
        closes. Message handling is still a TODO: messages are only logged.

        Args:
            outputs: Output services; not used yet.
            submitter: Callback that accepts a zero-argument job; not used
                yet.
        """
        url = f"{self.streaming_url}/api/v1/streaming?stream=user"
        headers = {"Authorization": f"Bearer {self.options.token}"}

        async for ws in websockets.connect(url, additional_headers=headers):
            try:
                LOGGER.info("Listening to %s...", self.streaming_url)

                async def listen_for_messages():
                    async for msg in ws:
                        LOGGER.info(msg)  # TODO

                # Named ``reader`` so the local does not shadow this method.
                reader = asyncio.create_task(listen_for_messages())

                _ = await asyncio.gather(reader)
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.streaming_url)
                continue