social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import asyncio 2from dataclasses import dataclass, field 3import json 4from pathlib import Path 5import re 6from typing import Any, Callable, override 7import uuid 8 9import websockets 10 11from cross.service import InputService, OutputService 12from misskey.info import MisskeyService 13from util.util import LOGGER 14 15ALLOWED_VISIBILITY = ["public", "home"] 16 17 18@dataclass 19class MisskeyInputOptions: 20 token: str 21 instance: str 22 allowed_visibility: list[str] = field( 23 default_factory=lambda: ALLOWED_VISIBILITY.copy() 24 ) 25 regex_filters: list[re.Pattern[str]] = field(default_factory=lambda: []) 26 27 @classmethod 28 def from_dict(cls, data: dict[str, Any]) -> "MisskeyInputOptions": 29 data["instance"] = ( 30 data["instance"][:-1] 31 if data["instance"].endswith("/") 32 else data["instance"] 33 ) 34 35 if "allowed_visibility" in data: 36 for vis in data.get("allowed_visibility", []): 37 if vis not in ALLOWED_VISIBILITY: 38 raise ValueError(f"Invalid visibility option {vis}!") 39 40 if "regex_filters" in data: 41 data["regex_filters"] = [re.compile(r) for r in data["regex_filters"]] 42 43 return MisskeyInputOptions(**data) 44 45 46class MisskeyInputService(MisskeyService, InputService): 47 def __init__(self, db: Path, options: MisskeyInputOptions) -> None: 48 super().__init__(options.instance, db) 49 self.options: MisskeyInputOptions = options 50 51 LOGGER.info("Verifying %s credentails...", self.url) 52 responce = self.verify_credentials() 53 self.user_id: str = responce["id"] 54 55 @override 56 def _get_token(self) -> str: 57 return self.options.token 58 59 async def _subscribe_to_home(self, ws: websockets.ClientConnection) -> None: 60 await ws.send( 61 json.dumps( 62 { 63 "type": "connect", 64 "body": {"channel": "homeTimeline", "id": str(uuid.uuid4())}, 65 } 66 ) 67 ) 68 LOGGER.info("Subscribed to 'homeTimeline' channel...") 69 70 @override 71 async def listen( 72 self, 73 outputs: list[OutputService], 74 submitter: Callable[[Callable[[], None]], None], 75 ): 76 streaming: str = f"{'wss' if self.url.startswith('https') else 'ws'}://{self.url.split('://', 1)[1]}" 77 url: str = f"{streaming}/streaming?i={self.options.token}" 78 79 async for ws in websockets.connect(url): 80 try: 81 LOGGER.info("Listening to %s...", streaming) 82 await self._subscribe_to_home(ws) 83 84 async def listen_for_messages(): 85 async for msg in ws: 86 LOGGER.info(msg) # TODO 87 88 #keepalive = asyncio.create_task(self._send_keepalive(ws)) 89 listen = asyncio.create_task(listen_for_messages()) 90 91 _ = await asyncio.gather(listen) 92 except websockets.ConnectionClosedError as e: 93 LOGGER.error(e, stack_info=True, exc_info=True) 94 LOGGER.info("Reconnecting to %s...", streaming) 95 continue