social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import asyncio
2import json
3import re
4from abc import ABC
5from dataclasses import dataclass, field
6from typing import Any, cast, override
7
8import websockets
9
10from bluesky.info import SERVICE, BlueskyService, validate_and_transform
11from cross.service import InputService
12from database.connection import DatabasePool
13from util.util import LOGGER, normalize_service_url
14
15
16@dataclass(kw_only=True)
17class BlueskyInputOptions:
18 handle: str | None = None
19 did: str | None = None
20 pds: str | None = None
21 filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
22
23 @classmethod
24 def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions":
25 validate_and_transform(data)
26
27 if "filters" in data:
28 data["filters"] = [re.compile(r) for r in data["filters"]]
29
30 return BlueskyInputOptions(**data)
31
32
33@dataclass(kw_only=True)
34class BlueskyJetstreamInputOptions(BlueskyInputOptions):
35 jetstream: str = "wss://jetstream2.us-west.bsky.network/subscribe"
36
37 @classmethod
38 def from_dict(cls, data: dict[str, Any]) -> "BlueskyJetstreamInputOptions":
39 jetstream = data.pop("jetstream", None)
40
41 base = BlueskyInputOptions.from_dict(data).__dict__.copy()
42 if jetstream:
43 base["jetstream"] = normalize_service_url(jetstream)
44
45 return BlueskyJetstreamInputOptions(**base)
46
47
48class BlueskyBaseInputService(BlueskyService, InputService, ABC):
49 def __init__(self, db: DatabasePool) -> None:
50 super().__init__(SERVICE, db)
51
52 def _on_post(self, record: dict[str, Any]):
53 LOGGER.info(record) # TODO
54
55 def _on_repost(self, record: dict[str, Any]):
56 LOGGER.info(record) # TODO
57
58 def _on_delete_post(self, post_id: str, repost: bool):
59 LOGGER.info("%s | %s", post_id, repost) # TODO
60
61
62class BlueskyJetstreamInputService(BlueskyBaseInputService):
63 def __init__(self, db: DatabasePool, options: BlueskyJetstreamInputOptions) -> None:
64 super().__init__(db)
65 self.options: BlueskyJetstreamInputOptions = options
66 self._init_identity()
67
68 @override
69 def get_identity_options(self) -> tuple[str | None, str | None, str | None]:
70 return (self.options.handle, self.options.did, self.options.pds)
71
72 def _accept_msg(self, msg: websockets.Data) -> None:
73 data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))
74 if data.get("did") != self.did:
75 return
76 commit: dict[str, Any] | None = data.get("commit")
77 if not commit:
78 return
79
80 commit_type: str = cast(str, commit["operation"])
81 match commit_type:
82 case "create":
83 record: dict[str, Any] = cast(dict[str, Any], commit["record"])
84 record["$xpost.strongRef"] = {
85 "cid": commit["cid"],
86 "uri": f"at://{self.did}/{commit['collection']}/{commit['rkey']}",
87 }
88
89 match cast(str, commit["collection"]):
90 case "app.bsky.feed.post":
91 self._on_post(record)
92 case "app.bsky.feed.repost":
93 self._on_repost(record)
94 case _:
95 pass
96 case "delete":
97 post_id: str = (
98 f"at://{self.did}/{commit['collection']}/{commit['rkey']}"
99 )
100 match cast(str, commit["collection"]):
101 case "app.bsky.feed.post":
102 self._on_delete_post(post_id, False)
103 case "app.bsky.feed.repost":
104 self._on_delete_post(post_id, True)
105 case _:
106 pass
107 case _:
108 pass
109
110 @override
111 async def listen(self):
112 url = self.options.jetstream + "?"
113 url += "wantedCollections=app.bsky.feed.post"
114 url += "&wantedCollections=app.bsky.feed.repost"
115 url += f"&wantedDids={self.did}"
116
117 async for ws in websockets.connect(url):
118 try:
119 LOGGER.info("Listening to %s...", self.options.jetstream)
120
121 async def listen_for_messages():
122 async for msg in ws:
123 self.submitter(lambda: self._accept_msg(msg))
124
125 listen = asyncio.create_task(listen_for_messages())
126
127 _ = await asyncio.gather(listen)
128 except websockets.ConnectionClosedError as e:
129 LOGGER.error(e, stack_info=True, exc_info=True)
130 LOGGER.info("Reconnecting to %s...", self.options.jetstream)
131 continue