social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import asyncio
2import re
3from dataclasses import dataclass, field
4from pathlib import Path
5from typing import Any, Callable, override
6
7import websockets
8
9from cross.service import InputService, OutputService
10from mastodon.info import MastodonService, validate_and_transform
11from util.util import LOGGER
12
13ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"]
14
15
16@dataclass(kw_only=True)
17class MastodonInputOptions:
18 token: str
19 instance: str
20 allowed_visibility: list[str] = field(
21 default_factory=lambda: ALLOWED_VISIBILITY.copy()
22 )
23 filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
24
25 @classmethod
26 def from_dict(cls, data: dict[str, Any]) -> "MastodonInputOptions":
27 validate_and_transform(data)
28
29 if "allowed_visibility" in data:
30 for vis in data.get("allowed_visibility", []):
31 if vis not in ALLOWED_VISIBILITY:
32 raise ValueError(f"Invalid visibility option {vis}!")
33
34 if "filters" in data:
35 data["filters"] = [re.compile(r) for r in data["filters"]]
36
37 return MastodonInputOptions(**data)
38
39
40class MastodonInputService(MastodonService, InputService):
41 def __init__(self, db: Path, options: MastodonInputOptions) -> None:
42 super().__init__(options.instance, db)
43 self.options: MastodonInputOptions = options
44
45 LOGGER.info("Verifying %s credentails...", self.url)
46 responce = self.verify_credentials()
47 self.user_id: str = responce["id"]
48
49 LOGGER.info("Getting %s configuration...", self.url)
50 responce = self.fetch_instance_info()
51 self.streaming_url: str = responce["urls"]["streaming_api"]
52
53 @override
54 def _get_token(self) -> str:
55 return self.options.token
56
57 @override
58 async def listen(
59 self,
60 outputs: list[OutputService],
61 submitter: Callable[[Callable[[], None]], None],
62 ):
63 url = f"{self.streaming_url}/api/v1/streaming?stream=user"
64
65 async for ws in websockets.connect(
66 url, additional_headers={"Authorization": f"Bearer {self.options.token}"}
67 ):
68 try:
69 LOGGER.info("Listening to %s...", self.streaming_url)
70
71 async def listen_for_messages():
72 async for msg in ws:
73 LOGGER.info(msg) # TODO
74
75 listen = asyncio.create_task(listen_for_messages())
76
77 _ = await asyncio.gather(listen)
78 except websockets.ConnectionClosedError as e:
79 LOGGER.error(e, stack_info=True, exc_info=True)
80 LOGGER.info("Reconnecting to %s...", self.streaming_url)
81 continue