social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import asyncio 2import json 3import re 4from dataclasses import dataclass, field 5from typing import Any, cast, override 6 7import websockets 8 9from cross.service import InputService 10from database.connection import DatabasePool 11from mastodon.info import MastodonService, validate_and_transform 12 13ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"] 14 15 16@dataclass(kw_only=True) 17class MastodonInputOptions: 18 token: str 19 instance: str 20 allowed_visibility: list[str] = field( 21 default_factory=lambda: ALLOWED_VISIBILITY.copy() 22 ) 23 filters: list[re.Pattern[str]] = field(default_factory=lambda: []) 24 25 @classmethod 26 def from_dict(cls, data: dict[str, Any]) -> "MastodonInputOptions": 27 validate_and_transform(data) 28 29 if "allowed_visibility" in data: 30 for vis in data.get("allowed_visibility", []): 31 if vis not in ALLOWED_VISIBILITY: 32 raise ValueError(f"Invalid visibility option {vis}!") 33 34 if "filters" in data: 35 data["filters"] = [re.compile(r) for r in data["filters"]] 36 37 return MastodonInputOptions(**data) 38 39 40class MastodonInputService(MastodonService, InputService): 41 def __init__(self, db: DatabasePool, options: MastodonInputOptions) -> None: 42 super().__init__(options.instance, db) 43 self.options: MastodonInputOptions = options 44 45 self.log.info("Verifying %s credentails...", self.url) 46 responce = self.verify_credentials() 47 self.user_id: str = responce["id"] 48 49 self.log.info("Getting %s configuration...", self.url) 50 responce = self.fetch_instance_info() 51 self.streaming_url: str = responce["urls"]["streaming_api"] 52 53 @override 54 def _get_token(self) -> str: 55 return self.options.token 56 57 def _on_create_post(self, status: dict[str, Any]): 58 self.log.info(status) # TODO 59 60 def _on_delete_post(self, status_id: str): 61 self.log.info(status_id) # TODO 62 63 def _accept_msg(self, msg: websockets.Data) -> None: 64 data: dict[str, Any] = cast(dict[str, Any], json.loads(msg)) 65 event: str = cast(str, data['event']) 66 payload: str = cast(str, data['payload']) 67 68 if event == "update": 69 self._on_create_post(json.loads(payload)) 70 elif event == "delete": 71 self._on_delete_post(payload) 72 73 @override 74 async def listen(self): 75 url = f"{self.streaming_url}/api/v1/streaming?stream=user" 76 77 async for ws in websockets.connect( 78 url, additional_headers={"Authorization": f"Bearer {self.options.token}"} 79 ): 80 try: 81 self.log.info("Listening to %s...", self.streaming_url) 82 83 async def listen_for_messages(): 84 async for msg in ws: 85 self.submitter(lambda: self._accept_msg(msg)) 86 87 listen = asyncio.create_task(listen_for_messages()) 88 89 _ = await asyncio.gather(listen) 90 except websockets.ConnectionClosedError as e: 91 self.log.error(e, stack_info=True, exc_info=True) 92 self.log.info("Reconnecting to %s...", self.streaming_url) 93 continue