social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import asyncio
2import re
3from abc import ABC
4from dataclasses import dataclass, field
5from typing import Any, Callable, override
6
7import websockets
8
9from bluesky.info import SERVICE, BlueskyService, validate_and_transform
10from cross.service import InputService, OutputService
11from database.connection import DatabasePool
12from util.util import LOGGER, normalize_service_url
13
14
15@dataclass(kw_only=True)
16class BlueskyInputOptions:
17 handle: str | None = None
18 did: str | None = None
19 pds: str | None = None
20 filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
21
22 @classmethod
23 def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions":
24 validate_and_transform(data)
25
26 if "filters" in data:
27 data["filters"] = [re.compile(r) for r in data["filters"]]
28
29 return BlueskyInputOptions(**data)
30
31
@dataclass(kw_only=True)
class BlueskyJetstreamInputOptions(BlueskyInputOptions):
    """Bluesky input options extended with a Jetstream endpoint URL."""

    # Websocket endpoint used when ``from_dict`` receives no "jetstream" value.
    jetstream: str = "wss://jetstream2.us-west.bsky.network/subscribe"

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "BlueskyJetstreamInputOptions":
        """Build Jetstream options from a raw configuration mapping.

        The ``"jetstream"`` key is removed from ``data`` (the caller's dict is
        modified) and the remaining keys are parsed by
        ``BlueskyInputOptions.from_dict``. A truthy ``"jetstream"`` value is
        passed through ``normalize_service_url``; a missing or falsy one
        leaves the field default in place.

        Args:
            data: Mapping of option names to raw values.

        Returns:
            An instance of ``cls`` carrying the parsed base options plus the
            Jetstream endpoint.
        """
        # Pop first: the base dataclass has no "jetstream" field and would
        # reject it as an unexpected keyword argument.
        endpoint = data.pop("jetstream", None)

        kwargs = BlueskyInputOptions.from_dict(data).__dict__.copy()
        if endpoint:
            kwargs["jetstream"] = normalize_service_url(endpoint)

        # Construct via ``cls`` so a subclass inheriting this constructor gets
        # an instance of itself rather than of this class.
        return cls(**kwargs)
45
46
class BlueskyBaseInputService(BlueskyService, InputService, ABC):
    """Abstract base class for Bluesky input services.

    Inherits from both ``BlueskyService`` and ``InputService`` and fixes the
    first constructor argument to the imported ``SERVICE`` value.
    """

    def __init__(self, db: DatabasePool) -> None:
        """Initialize the parent classes with ``SERVICE`` and ``db``."""
        # Cooperative super() call: which parent __init__ runs first is
        # decided by the MRO of the concrete class.
        super().__init__(SERVICE, db)
50
51
class BlueskyJetstreamInputService(BlueskyBaseInputService):
    """Bluesky input service that subscribes to a Jetstream websocket."""

    def __init__(self, db: DatabasePool, options: BlueskyJetstreamInputOptions) -> None:
        """Store ``options`` and run the inherited identity initialization."""
        super().__init__(db)
        self.options: BlueskyJetstreamInputOptions = options
        # Presumably resolves ``self.did`` from the identity options below --
        # confirm against BlueskyService.
        self._init_identity()

    @override
    def get_identity_options(self) -> tuple[str | None, str | None, str | None]:
        """Return the configured ``(handle, did, pds)`` triple."""
        opts = self.options
        return opts.handle, opts.did, opts.pds

    @override
    async def listen(
        self,
        outputs: list[OutputService],
        submitter: Callable[[Callable[[], None]], None],
    ):
        """Subscribe to the Jetstream endpoint and log each received message.

        The subscription asks for post and repost records of ``self.did``.
        When a connection ends with ``ConnectionClosedError`` the error is
        logged and the loop moves on to the next connection.

        ``outputs`` and ``submitter`` are accepted but not used yet; message
        handling is still a TODO.
        """
        url = "".join(
            (
                self.options.jetstream,
                "?",
                "wantedCollections=app.bsky.feed.post",
                "&wantedCollections=app.bsky.feed.repost",
                f"&wantedDids={self.did}",
            )
        )

        async def read_messages(socket):
            # Drain the socket until it closes; messages are only logged.
            async for message in socket:
                LOGGER.info(message)  # TODO

        # Each iteration of this loop works with a new connection object.
        async for connection in websockets.connect(url):
            try:
                LOGGER.info("Listening to %s...", self.options.jetstream)

                reader = asyncio.create_task(read_messages(connection))

                _ = await asyncio.gather(reader)
            except websockets.ConnectionClosedError as exc:
                LOGGER.error(exc, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.options.jetstream)
                continue