social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1from abc import ABC 2from dataclasses import dataclass, field 3import re 4from typing import Any, Callable, override 5 6from bluesky.info import BlueskyService, validate_and_transform 7from cross.service import InputService, OutputService 8 9 10@dataclass(kw_only=True) 11class BlueskyInputOptions: 12 handle: str | None 13 did: str | None 14 pds: str | None 15 filters: list[re.Pattern[str]] = field(default_factory=lambda: []) 16 17 @classmethod 18 def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions": 19 validate_and_transform(data) 20 21 if "filters" in data: 22 data["filters"] = [re.compile(r) for r in data["filters"]] 23 24 return BlueskyInputOptions(**data) 25 26 27class BlueskyBaseInputService(BlueskyService, InputService, ABC): 28 pass 29 30 31class BlueskyJetstreamInputService(BlueskyBaseInputService): 32 @override 33 async def listen( 34 self, 35 outputs: list[OutputService], 36 submitter: Callable[[Callable[[], None]], None], 37 ): 38 return await super().listen(outputs, submitter) # TODO