social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import asyncio 2import re 3from dataclasses import dataclass, field 4from pathlib import Path 5from typing import Any, Callable, override 6 7import websockets 8 9from cross.service import InputService, OutputService 10from mastodon.info import MastodonService 11from util.util import LOGGER 12 13ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"] 14 15 16@dataclass(kw_only=True) 17class MastodonInputOptions: 18 token: str 19 instance: str 20 allowed_visibility: list[str] = field( 21 default_factory=lambda: ALLOWED_VISIBILITY.copy() 22 ) 23 regex_filters: list[re.Pattern[str]] = field(default_factory=lambda: []) 24 25 @classmethod 26 def from_dict(cls, data: dict[str, Any]) -> "MastodonInputOptions": 27 data["instance"] = ( 28 data["instance"][:-1] 29 if data["instance"].endswith("/") 30 else data["instance"] 31 ) 32 33 if "allowed_visibility" in data: 34 for vis in data.get("allowed_visibility", []): 35 if vis not in ALLOWED_VISIBILITY: 36 raise ValueError(f"Invalid visibility option {vis}!") 37 38 if "regex_filters" in data: 39 data["regex_filters"] = [re.compile(r) for r in data["regex_filters"]] 40 41 return MastodonInputOptions(**data) 42 43 44class MastodonInputService(MastodonService, InputService): 45 def __init__(self, db: Path, options: MastodonInputOptions) -> None: 46 super().__init__(options.instance, db) 47 self.options: MastodonInputOptions = options 48 49 LOGGER.info("Verifying %s credentails...", self.url) 50 responce = self.verify_credentials() 51 self.user_id: str = responce["id"] 52 53 LOGGER.info("Getting %s configuration...", self.url) 54 responce = self.fetch_instance_info() 55 self.streaming_url: str = responce["urls"]["streaming_api"] 56 57 @override 58 def _get_token(self) -> str: 59 return self.options.token 60 61 @override 62 async def listen( 63 self, 64 outputs: list[OutputService], 65 submitter: Callable[[Callable[[], None]], None], 66 ): 67 url = f"{self.streaming_url}/api/v1/streaming?stream=user" 68 69 async for ws in websockets.connect( 70 url, additional_headers={"Authorization": f"Bearer {self.options.token}"} 71 ): 72 try: 73 LOGGER.info("Listening to %s...", self.streaming_url) 74 75 async def listen_for_messages(): 76 async for msg in ws: 77 LOGGER.info(msg) # TODO 78 79 listen = asyncio.create_task(listen_for_messages()) 80 81 _ = await asyncio.gather(listen) 82 except websockets.ConnectionClosedError as e: 83 LOGGER.error(e, stack_info=True, exc_info=True) 84 LOGGER.info("Reconnecting to %s...", self.streaming_url) 85 continue