social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from abc import ABC
2from dataclasses import dataclass, field
3import re
4from typing import Any, Callable, override
5
6from bluesky.info import BlueskyService, validate_and_transform
7from cross.service import InputService, OutputService
8
9
10@dataclass(kw_only=True)
11class BlueskyInputOptions:
12 handle: str | None
13 did: str | None
14 pds: str | None
15 filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
16
17 @classmethod
18 def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions":
19 validate_and_transform(data)
20
21 if "filters" in data:
22 data["filters"] = [re.compile(r) for r in data["filters"]]
23
24 return BlueskyInputOptions(**data)
25
26
27class BlueskyBaseInputService(BlueskyService, InputService, ABC):
28 pass
29
30
31class BlueskyJetstreamInputService(BlueskyBaseInputService):
32 @override
33 async def listen(
34 self,
35 outputs: list[OutputService],
36 submitter: Callable[[Callable[[], None]], None],
37 ):
38 return await super().listen(outputs, submitter) # TODO