social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
"""Misskey input service: receives notes from a Misskey home timeline stream."""

import asyncio
import json
import re
import uuid
from dataclasses import dataclass, field
from typing import Any, cast, override

import websockets

from cross.service import InputService
from database.connection import DatabasePool
from misskey.info import MisskeyService
from util.util import normalize_service_url

# Visibility values accepted in ``MisskeyInputOptions.allowed_visibility``.
ALLOWED_VISIBILITY = ["public", "home"]


@dataclass
class MisskeyInputOptions:
    """Configuration for :class:`MisskeyInputService`.

    Attributes are documented inline below; build instances from raw config
    mappings with :meth:`from_dict`.
    """

    # API token; returned by ``_get_token`` and sent as the ``i`` query
    # parameter of the streaming URL.
    token: str
    # Instance URL; ``from_dict`` passes it through ``normalize_service_url``.
    instance: str
    # Visibility levels to accept; defaults to a copy of ALLOWED_VISIBILITY.
    # NOTE(review): not consulted anywhere in this module yet -- presumably
    # intended for note filtering once ``_on_note`` is implemented.
    allowed_visibility: list[str] = field(
        default_factory=lambda: ALLOWED_VISIBILITY.copy()
    )
    # Compiled regular expressions.
    # NOTE(review): not consulted anywhere in this module yet -- confirm use.
    filters: list[re.Pattern[str]] = field(default_factory=lambda: [])

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MisskeyInputOptions":
        """Build options from a raw mapping without modifying it.

        Args:
            data: Mapping with ``token`` and ``instance`` and, optionally,
                ``allowed_visibility`` (list of visibility names) and
                ``filters`` (list of regular-expression source strings).

        Returns:
            A new options instance with the instance URL normalized and the
            filters compiled.

        Raises:
            KeyError: If ``instance`` is missing.
            ValueError: If ``allowed_visibility`` contains a value that is
                not in ``ALLOWED_VISIBILITY``.
            re.error: If a filter is not a valid regular expression.
        """
        # Work on a shallow copy so the caller's mapping is left untouched.
        options = dict(data)
        options["instance"] = normalize_service_url(options["instance"])

        for vis in options.get("allowed_visibility", []):
            if vis not in ALLOWED_VISIBILITY:
                raise ValueError(f"Invalid visibility option {vis}!")

        if "filters" in options:
            options["filters"] = [re.compile(r) for r in options["filters"]]

        return cls(**options)


class MisskeyInputService(MisskeyService, InputService):
    """Input service that listens to a Misskey ``homeTimeline`` stream."""

    def __init__(self, db: DatabasePool, options: MisskeyInputOptions) -> None:
        """Store the options and verify the configured credentials.

        Args:
            db: Database pool forwarded to the base-class initializer.
            options: Connection settings for this service.
        """
        super().__init__(options.instance, db)
        self.options: MisskeyInputOptions = options

        self.log.info("Verifying %s credentails...", self.url)
        # NOTE(review): ``verify_credentials`` is inherited; it is assumed to
        # return a mapping that contains the account's ``id``.
        response = self.verify_credentials()
        self.user_id: str = response["id"]

    @override
    def _get_token(self) -> str:
        """Return the API token from the configured options."""
        return self.options.token

    def _on_note(self, note: dict[str, Any]) -> None:
        """Handle one note payload; currently it is only logged."""
        self.log.info(note)  # TODO

    def _accept_msg(self, msg: websockets.Data) -> None:
        """Decode one streaming frame and dispatch note events.

        Only ``channel`` frames whose inner event type is ``note`` or
        ``reply`` are forwarded to ``_on_note``; every other frame is
        ignored.

        Args:
            msg: Raw JSON frame received from the websocket.
        """
        data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))

        if data["type"] != "channel":
            return

        # Named ``event_type`` so the builtin ``type`` is not shadowed.
        event_type: str = cast(str, data["body"]["type"])
        if event_type in ("note", "reply"):
            self._on_note(data["body"]["body"])

    async def _subscribe_to_home(self, ws: websockets.ClientConnection) -> None:
        """Send a ``connect`` request for the ``homeTimeline`` channel.

        Args:
            ws: Open websocket connection to the streaming endpoint.
        """
        request = {
            "type": "connect",
            "body": {"channel": "homeTimeline", "id": str(uuid.uuid4())},
        }
        await ws.send(json.dumps(request))
        self.log.info("Subscribed to 'homeTimeline' channel...")

    @override
    async def listen(self) -> None:
        """Connect to the streaming endpoint and submit incoming frames.

        Iterates over ``websockets.connect`` so that a new connection is
        obtained after the previous one ends. A ``ConnectionClosedError`` is
        logged before moving on to the next connection.
        """
        # Mirror the instance URL's scheme: https -> wss, anything else -> ws.
        scheme = "wss" if self.url.startswith("https") else "ws"
        streaming: str = f"{scheme}://{self.url.split('://', 1)[1]}"
        url: str = f"{streaming}/streaming?i={self.options.token}"

        async for ws in websockets.connect(url):
            try:
                # Log ``streaming`` rather than ``url`` so the token stays
                # out of the log output.
                self.log.info("Listening to %s...", streaming)
                await self._subscribe_to_home(ws)

                async def listen_for_messages() -> None:
                    async for msg in ws:
                        # Bind ``msg`` at definition time: a plain closure
                        # would read the loop variable when the callback
                        # runs, which may be after the next frame arrived.
                        self.submitter(lambda msg=msg: self._accept_msg(msg))

                reader = asyncio.create_task(listen_for_messages())

                _ = await asyncio.gather(reader)
            except websockets.ConnectionClosedError as e:
                self.log.error(e, stack_info=True, exc_info=True)
                self.log.info("Reconnecting to %s...", streaming)
                continue