social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import asyncio
2import json
3import re
4import uuid
5from dataclasses import dataclass, field
6from typing import Any, cast, override
7
8import websockets
9
10from cross.service import InputService
11from database.connection import DatabasePool
12from misskey.info import MisskeyService
13from util.util import normalize_service_url
14
15ALLOWED_VISIBILITY = ["public", "home"]
16
17
18@dataclass
19class MisskeyInputOptions:
20 token: str
21 instance: str
22 allowed_visibility: list[str] = field(
23 default_factory=lambda: ALLOWED_VISIBILITY.copy()
24 )
25 filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
26
27 @classmethod
28 def from_dict(cls, data: dict[str, Any]) -> "MisskeyInputOptions":
29 data["instance"] = normalize_service_url(data["instance"])
30
31 if "allowed_visibility" in data:
32 for vis in data.get("allowed_visibility", []):
33 if vis not in ALLOWED_VISIBILITY:
34 raise ValueError(f"Invalid visibility option {vis}!")
35
36 if "filters" in data:
37 data["filters"] = [re.compile(r) for r in data["filters"]]
38
39 return MisskeyInputOptions(**data)
40
41
42class MisskeyInputService(MisskeyService, InputService):
43 def __init__(self, db: DatabasePool, options: MisskeyInputOptions) -> None:
44 super().__init__(options.instance, db)
45 self.options: MisskeyInputOptions = options
46
47 self.log.info("Verifying %s credentails...", self.url)
48 responce = self.verify_credentials()
49 self.user_id: str = responce["id"]
50
51 @override
52 def _get_token(self) -> str:
53 return self.options.token
54
55 def _on_note(self, note: dict[str, Any]):
56 self.log.info(note) # TODO
57
58 def _accept_msg(self, msg: websockets.Data) -> None:
59 data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))
60
61 if data["type"] == "channel":
62 type: str = cast(str, data["body"]["type"])
63 if type == "note" or type == "reply":
64 note_body = data["body"]["body"]
65 self._on_note(note_body)
66 return
67
68 async def _subscribe_to_home(self, ws: websockets.ClientConnection) -> None:
69 await ws.send(
70 json.dumps(
71 {
72 "type": "connect",
73 "body": {"channel": "homeTimeline", "id": str(uuid.uuid4())},
74 }
75 )
76 )
77 self.log.info("Subscribed to 'homeTimeline' channel...")
78
79 @override
80 async def listen(self):
81 streaming: str = f"{'wss' if self.url.startswith('https') else 'ws'}://{self.url.split('://', 1)[1]}"
82 url: str = f"{streaming}/streaming?i={self.options.token}"
83
84 async for ws in websockets.connect(url):
85 try:
86 self.log.info("Listening to %s...", streaming)
87 await self._subscribe_to_home(ws)
88
89 async def listen_for_messages():
90 async for msg in ws:
91 self.submitter(lambda: self._accept_msg(msg))
92
93 listen = asyncio.create_task(listen_for_messages())
94
95 _ = await asyncio.gather(listen)
96 except websockets.ConnectionClosedError as e:
97 self.log.error(e, stack_info=True, exc_info=True)
98 self.log.info("Reconnecting to %s...", streaming)
99 continue