social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import asyncio
2from dataclasses import dataclass, field
3import json
4from pathlib import Path
5import re
6from typing import Any, Callable, override
7import uuid
8
9import websockets
10
11from cross.service import InputService, OutputService
12from misskey.info import MisskeyService
13from util.util import LOGGER
14
# Visibility values that `MisskeyInputOptions.from_dict` accepts in the
# "allowed_visibility" entry; also the default for that field.
ALLOWED_VISIBILITY = ["public", "home"]


@dataclass
class MisskeyInputOptions:
    """Configuration for a Misskey input service.

    Instances are normally built from a parsed configuration mapping via
    `from_dict`, which validates and normalizes the raw values.
    """

    # Access token for the Misskey account.
    token: str
    # Base URL of the instance; `from_dict` strips trailing slashes.
    instance: str
    # Visibility values enabled for this input; each instance gets its own
    # copy of ALLOWED_VISIBILITY by default.
    allowed_visibility: list[str] = field(
        default_factory=lambda: ALLOWED_VISIBILITY.copy()
    )
    # Compiled patterns; how they are applied is not visible in this block.
    regex_filters: list[re.Pattern[str]] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MisskeyInputOptions":
        """Build options from a plain configuration mapping.

        The mapping is not modified: normalization happens on a copy, so the
        caller can keep using (or re-parsing) its own dict afterwards.

        Args:
            data: Mapping with ``token`` and ``instance`` keys and, optionally,
                ``allowed_visibility`` (list of strings) and ``regex_filters``
                (list of regular-expression source strings).

        Returns:
            A new instance of ``cls`` (so subclasses get their own type).

        Raises:
            KeyError: ``instance`` is missing from ``data``.
            ValueError: an ``allowed_visibility`` entry is not one of
                ``ALLOWED_VISIBILITY``.
            re.error: a ``regex_filters`` entry is not a valid pattern.
            TypeError: ``data`` has keys the dataclass does not declare, or
                lacks ``token``.
        """
        # Shallow copy: every value replaced below is rebuilt, never edited
        # in place, so the caller's mapping stays untouched.
        opts = dict(data)

        # Normalize "https://host/" to "https://host" so paths can be
        # appended with a single "/".
        opts["instance"] = opts["instance"].rstrip("/")

        if "allowed_visibility" in opts:
            visibilities = list(opts["allowed_visibility"])
            for vis in visibilities:
                if vis not in ALLOWED_VISIBILITY:
                    raise ValueError(f"Invalid visibility option {vis}!")
            opts["allowed_visibility"] = visibilities

        if "regex_filters" in opts:
            opts["regex_filters"] = [re.compile(r) for r in opts["regex_filters"]]

        return cls(**opts)
44
45
class MisskeyInputService(MisskeyService, InputService):
    """Input service that follows one Misskey account's home timeline.

    Connects to the instance's streaming endpoint over a websocket and
    subscribes to the ``homeTimeline`` channel. Message handling is still a
    stub: incoming frames are only written to the log.
    """

    def __init__(self, db: Path, options: MisskeyInputOptions) -> None:
        """Store the options and look up the account id.

        Args:
            db: Forwarded, together with ``options.instance``, to the
                base-class initializer.
            options: Token, instance URL and filters for this input.
        """
        super().__init__(options.instance, db)
        # Must be set before verify_credentials(): presumably that helper
        # obtains the token through _get_token() -- confirm in MisskeyService.
        self.options: MisskeyInputOptions = options

        LOGGER.info("Verifying %s credentails...", self.url)
        account = self.verify_credentials()
        self.user_id: str = account["id"]

    @override
    def _get_token(self) -> str:
        """Return the access token taken from the configured options."""
        token = self.options.token
        return token

    async def _subscribe_to_home(self, ws: websockets.ClientConnection) -> None:
        """Send a ``connect`` request for the ``homeTimeline`` channel.

        A fresh random UUID is used as the channel connection id.
        """
        request = {
            "type": "connect",
            "body": {"channel": "homeTimeline", "id": str(uuid.uuid4())},
        }
        payload = json.dumps(request)
        await ws.send(payload)
        LOGGER.info("Subscribed to 'homeTimeline' channel...")

    @override
    async def listen(
        self,
        outputs: list[OutputService],
        submitter: Callable[[Callable[[], None]], None],
    ):
        """Stream the home timeline, reconnecting after abnormal closes.

        Args:
            outputs: Not used by the current implementation.
            submitter: Not used by the current implementation.
        """
        # Same host as the HTTP(S) base URL, but with a websocket scheme.
        scheme = "wss" if self.url.startswith("https") else "ws"
        host = self.url.split("://", 1)[1]
        streaming: str = f"{scheme}://{host}"
        # The token goes in the query string; only `streaming` (without the
        # token) is ever logged.
        url: str = f"{streaming}/streaming?i={self.options.token}"

        async def log_incoming(conn):
            # Placeholder handler: just log every received frame.
            async for msg in conn:
                LOGGER.info(msg)  # TODO

        # Each iteration of the connect() iterator yields a new connection.
        async for ws in websockets.connect(url):
            try:
                LOGGER.info("Listening to %s...", streaming)
                await self._subscribe_to_home(ws)

                # TODO: a keepalive task was sketched here but is disabled;
                # no `_send_keepalive` helper is defined in the visible code.
                reader = asyncio.create_task(log_incoming(ws))

                _ = await asyncio.gather(reader)
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", streaming)
                # Falling through to the next iteration opens a new connection.