social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import asyncio
2import re
3from dataclasses import dataclass, field
4from pathlib import Path
5from typing import Any, Callable, override
6
7import websockets
8
9from cross.service import InputService, OutputService
10from mastodon.info import MastodonService
11from util.util import LOGGER
12
13ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"]
14
15
16@dataclass(kw_only=True)
17class MastodonInputOptions:
18 token: str
19 instance: str
20 allowed_visibility: list[str] = field(
21 default_factory=lambda: ALLOWED_VISIBILITY.copy()
22 )
23 regex_filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
24
25 @classmethod
26 def from_dict(cls, data: dict[str, Any]) -> "MastodonInputOptions":
27 data["instance"] = (
28 data["instance"][:-1]
29 if data["instance"].endswith("/")
30 else data["instance"]
31 )
32
33 if "allowed_visibility" in data:
34 for vis in data.get("allowed_visibility", []):
35 if vis not in ALLOWED_VISIBILITY:
36 raise ValueError(f"Invalid visibility option {vis}!")
37
38 if "regex_filters" in data:
39 data["regex_filters"] = [re.compile(r) for r in data["regex_filters"]]
40
41 return MastodonInputOptions(**data)
42
43
class MastodonInputService(MastodonService, InputService):
    """Input service that reads a Mastodon account's user stream over a websocket."""

    def __init__(self, db: Path, options: MastodonInputOptions) -> None:
        """Store the options, verify the credentials and look up the streaming URL.

        Args:
            db: Passed through to the base-class initialiser.
            options: Instance address, token and filters for this input.
        """
        super().__init__(options.instance, db)
        self.options: MastodonInputOptions = options

        LOGGER.info("Verifying %s credentails...", self.url)
        account = self.verify_credentials()
        self.user_id: str = account["id"]

        LOGGER.info("Getting %s configuration...", self.url)
        instance_info = self.fetch_instance_info()
        self.streaming_url: str = instance_info["urls"]["streaming_api"]

    @override
    def _get_token(self) -> str:
        """Return the access token taken from the configured options."""
        return self.options.token

    @override
    async def listen(
        self,
        outputs: list[OutputService],
        submitter: Callable[[Callable[[], None]], None],
    ):
        """Connect to the user stream and log every message received.

        ``outputs`` and ``submitter`` are not used yet: incoming messages are
        only logged (see the TODO below).
        """
        endpoint = f"{self.streaming_url}/api/v1/streaming?stream=user"
        auth_headers = {"Authorization": f"Bearer {self.options.token}"}

        async def log_messages(connection) -> None:
            # Drain one connection, logging each raw message as it arrives.
            async for message in connection:
                LOGGER.info(message)  # TODO

        # Each iteration yields a connection; per the websockets docs, moving
        # on to the next iteration after a failure opens a new connection.
        async for ws in websockets.connect(endpoint, additional_headers=auth_headers):
            try:
                LOGGER.info("Listening to %s...", self.streaming_url)
                reader = asyncio.create_task(log_messages(ws))
                await asyncio.gather(reader)
            except websockets.ConnectionClosedError as exc:
                # Log the failure, then fall through to the next iteration.
                LOGGER.error(exc, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.streaming_url)