social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import asyncio
2import json
3import re
4from abc import ABC
5from dataclasses import dataclass, field
6from typing import Any, cast, override
7
8import websockets
9
10from atproto.util import AtUri
11from bluesky.facets import parse_facets
12from bluesky.info import SERVICE, BlueskyService, validate_and_transform
13from cross.attachments import (
14 LabelsAttachment,
15 LanguagesAttachment,
16 MediaAttachment,
17 QuoteAttachment,
18 RemoteUrlAttachment,
19)
20from cross.media import Blob, download_blob
21from cross.post import Post
22from cross.service import InputService
23from database.connection import DatabasePool
24from util.util import normalize_service_url
25
26
27@dataclass(kw_only=True)
28class BlueskyInputOptions:
29 handle: str | None = None
30 did: str | None = None
31 pds: str | None = None
32 filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
33
34 @classmethod
35 def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions":
36 validate_and_transform(data)
37
38 if "filters" in data:
39 data["filters"] = [re.compile(r) for r in data["filters"]]
40
41 return BlueskyInputOptions(**data)
42
43
@dataclass(kw_only=True)
class BlueskyJetstreamInputOptions(BlueskyInputOptions):
    """Bluesky input options plus the Jetstream websocket endpoint to subscribe to."""

    jetstream: str = "wss://jetstream2.us-west.bsky.network/subscribe"

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "BlueskyJetstreamInputOptions":
        """Build Jetstream options from a plain mapping.

        Args:
            data: Option mapping accepted by ``BlueskyInputOptions.from_dict``,
                with an optional ``"jetstream"`` endpoint URL. A missing or
                empty value keeps the default endpoint.

        Returns:
            An instance of ``cls`` populated from ``data``.
        """
        # Pop from a copy: removing "jetstream" from the caller's mapping would
        # make a second call on the same mapping silently fall back to the default.
        remaining = dict(data)
        jetstream = remaining.pop("jetstream", None)

        base = vars(BlueskyInputOptions.from_dict(remaining)).copy()
        if jetstream:
            base["jetstream"] = normalize_service_url(jetstream)

        return cls(**base)
57
58
59class BlueskyBaseInputService(BlueskyService, InputService, ABC):
60 def __init__(self, db: DatabasePool) -> None:
61 super().__init__(SERVICE, db)
62
63 def _on_post(self, record: dict[str, Any]):
64 post_uri = cast(str, record["$xpost.strongRef"]["uri"])
65 post_cid = cast(str, record["$xpost.strongRef"]["cid"])
66
67 parent_uri = cast(
68 str, None if not record.get("reply") else record["reply"]["parent"]["uri"]
69 )
70 parent = None
71 if parent_uri:
72 parent = self._get_post(self.url, self.did, parent_uri)
73 if not parent:
74 self.log.info(
75 "Skipping %s, parent %s not found in db", post_uri, parent_uri
76 )
77 return
78
79 text, fragments = parse_facets(record["text"], record.get('facets'))
80 post = Post(id=post_uri, parent_id=parent_uri, text=text)
81 post.fragments.extend(fragments)
82
83 did, _, rid = AtUri.record_uri(post_uri)
84 post.attachments.put(
85 RemoteUrlAttachment(url=f"https://bsky.app/profile/{did}/post/{rid}")
86 )
87
88 embed: dict[str, Any] = record.get("embed", {})
89 blob_urls: list[tuple[str, str, str | None]] = []
90 def handle_embeds(embed: dict[str, Any]) -> str | None:
91 nonlocal blob_urls, post
92 match cast(str, embed["$type"]):
93 case "app.bsky.embed.record" | "app.bsky.embed.recordWithMedia":
94 rcrd = embed['record']['record'] if embed['record'].get('record') else embed['record']
95 did, collection, _ = AtUri.record_uri(rcrd["uri"])
96 if collection != "app.bsky.feed.post":
97 return f"Unhandled record collection {collection}"
98 if did != self.did:
99 return ""
100
101 rquote = self._get_post(self.url, did, rcrd["uri"])
102 if not rquote:
103 return f"Quote {rcrd["uri"]} not found in the db"
104 post.attachments.put(QuoteAttachment(quoted_id=rcrd["uri"], quoted_user=did))
105
106 if embed.get('media'):
107 return handle_embeds(embed["media"])
108 case "app.bsky.embed.images":
109 for image in embed["images"]:
110 blob_cid = image["image"]["ref"]["$link"]
111 url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.did}&cid={blob_cid}"
112 blob_urls.append((url, blob_cid, image.get("alt")))
113 case "app.bsky.embed.video":
114 blob_cid = embed["video"]["ref"]["$link"]
115 url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.did}&cid={blob_cid}"
116 blob_urls.append((url, blob_cid, embed.get("alt")))
117 case _:
118 self.log.warning(f"Unhandled embed type {embed['$type']}")
119
120 if embed:
121 fexit = handle_embeds(embed)
122 if fexit is not None:
123 self.log.info("Skipping %s! %s", post_uri, fexit)
124 return
125
126 if blob_urls:
127 blobs: list[Blob] = []
128 for url, cid, alt in blob_urls:
129 self.log.info("Downloading %s...", cid)
130 blob: Blob | None = download_blob(url, alt)
131 if not blob:
132 self.log.error(
133 "Skipping %s! Failed to download blob %s.", post_uri, cid
134 )
135 return
136 blobs.append(blob)
137 post.attachments.put(MediaAttachment(blobs=blobs))
138
139 if "langs" in record:
140 post.attachments.put(LanguagesAttachment(langs=record["langs"]))
141 if "labels" in record:
142 post.attachments.put(
143 LabelsAttachment(
144 labels=[
145 label["val"].replace("-", " ") for label in record["values"]
146 ]
147 ),
148 )
149
150 if parent:
151 self._insert_post(
152 {
153 "user": self.did,
154 "service": self.url,
155 "identifier": post_uri,
156 "parent": parent["id"],
157 "root": parent["id"] if not parent["root"] else parent["root"],
158 "extra_data": json.dumps({"cid": post_cid}),
159 }
160 )
161 else:
162 self._insert_post(
163 {
164 "user": self.did,
165 "service": self.url,
166 "identifier": post_uri,
167 "extra_data": json.dumps({"cid": post_cid}),
168 }
169 )
170
171 for out in self.outputs:
172 self.submitter(lambda: out.accept_post(post))
173
174 def _on_repost(self, record: dict[str, Any]):
175 post_uri = cast(str, record["$xpost.strongRef"]["uri"])
176 post_cid = cast(str, record["$xpost.strongRef"]["cid"])
177
178 reposted_uri = cast(str, record["subject"]["uri"])
179 reposted = self._get_post(self.url, self.did, reposted_uri)
180 if not reposted:
181 self.log.info(
182 "Skipping repost '%s' as reposted post '%s' was not found in the db.",
183 post_uri,
184 reposted_uri,
185 )
186 return
187
188 self._insert_post(
189 {
190 "user": self.did,
191 "service": self.url,
192 "identifier": post_uri,
193 "reposted": reposted["id"],
194 "extra_data": json.dumps({"cid": post_cid}),
195 }
196 )
197
198 for out in self.outputs:
199 self.submitter(lambda: out.accept_repost(post_uri, reposted_uri))
200
201 def _on_delete_post(self, post_id: str, repost: bool):
202 post = self._get_post(self.url, self.did, post_id)
203 if not post:
204 return
205
206 if repost:
207 for output in self.outputs:
208 self.submitter(lambda: output.delete_repost(post_id))
209 else:
210 for output in self.outputs:
211 self.submitter(lambda: output.delete_post(post_id))
212 self._delete_post_by_id(post["id"])
213
214
215class BlueskyJetstreamInputService(BlueskyBaseInputService):
216 def __init__(self, db: DatabasePool, options: BlueskyJetstreamInputOptions) -> None:
217 super().__init__(db)
218 self.options: BlueskyJetstreamInputOptions = options
219 self._init_identity()
220
221 @override
222 def get_identity_options(self) -> tuple[str | None, str | None, str | None]:
223 return (self.options.handle, self.options.did, self.options.pds)
224
225 def _accept_msg(self, msg: websockets.Data) -> None:
226 data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))
227 if data.get("did") != self.did:
228 return
229 commit: dict[str, Any] | None = data.get("commit")
230 if not commit:
231 return
232
233 commit_type: str = cast(str, commit["operation"])
234 match commit_type:
235 case "create":
236 record: dict[str, Any] = cast(dict[str, Any], commit["record"])
237 record["$xpost.strongRef"] = {
238 "cid": commit["cid"],
239 "uri": f"at://{self.did}/{commit['collection']}/{commit['rkey']}",
240 }
241
242 match cast(str, commit["collection"]):
243 case "app.bsky.feed.post":
244 self._on_post(record)
245 case "app.bsky.feed.repost":
246 self._on_repost(record)
247 case _:
248 pass
249 case "delete":
250 post_id: str = (
251 f"at://{self.did}/{commit['collection']}/{commit['rkey']}"
252 )
253 match cast(str, commit["collection"]):
254 case "app.bsky.feed.post":
255 self._on_delete_post(post_id, False)
256 case "app.bsky.feed.repost":
257 self._on_delete_post(post_id, True)
258 case _:
259 pass
260 case _:
261 pass
262
263 @override
264 async def listen(self):
265 url = self.options.jetstream + "?"
266 url += "wantedCollections=app.bsky.feed.post"
267 url += "&wantedCollections=app.bsky.feed.repost"
268 url += f"&wantedDids={self.did}"
269
270 async for ws in websockets.connect(url):
271 try:
272 self.log.info("Listening to %s...", self.options.jetstream)
273
274 async def listen_for_messages():
275 async for msg in ws:
276 self.submitter(lambda: self._accept_msg(msg))
277
278 listen = asyncio.create_task(listen_for_messages())
279
280 _ = await asyncio.gather(listen)
281 except websockets.ConnectionClosedError as e:
282 self.log.error(e, stack_info=True, exc_info=True)
283 self.log.info("Reconnecting to %s...", self.options.jetstream)
284 continue