Social media crossposting tool. Third time's the charm.
mastodon misskey crossposting bluesky
1import asyncio 2import json 3import re 4from abc import ABC 5from dataclasses import dataclass, field 6from typing import Any, cast, override 7 8import websockets 9 10from atproto.util import AtUri 11from bluesky.facets import parse_facets 12from bluesky.info import SERVICE, BlueskyService, validate_and_transform 13from cross.attachments import ( 14 LabelsAttachment, 15 LanguagesAttachment, 16 MediaAttachment, 17 QuoteAttachment, 18 RemoteUrlAttachment, 19) 20from cross.media import Blob, download_blob 21from cross.post import Post 22from cross.service import InputService 23from database.connection import DatabasePool 24from util.util import normalize_service_url 25 26 27@dataclass(kw_only=True) 28class BlueskyInputOptions: 29 handle: str | None = None 30 did: str | None = None 31 pds: str | None = None 32 filters: list[re.Pattern[str]] = field(default_factory=lambda: []) 33 34 @classmethod 35 def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions": 36 validate_and_transform(data) 37 38 if "filters" in data: 39 data["filters"] = [re.compile(r) for r in data["filters"]] 40 41 return BlueskyInputOptions(**data) 42 43 44@dataclass(kw_only=True) 45class BlueskyJetstreamInputOptions(BlueskyInputOptions): 46 jetstream: str = "wss://jetstream2.us-west.bsky.network/subscribe" 47 48 @classmethod 49 def from_dict(cls, data: dict[str, Any]) -> "BlueskyJetstreamInputOptions": 50 jetstream = data.pop("jetstream", None) 51 52 base = BlueskyInputOptions.from_dict(data).__dict__.copy() 53 if jetstream: 54 base["jetstream"] = normalize_service_url(jetstream) 55 56 return BlueskyJetstreamInputOptions(**base) 57 58 59class BlueskyBaseInputService(BlueskyService, InputService, ABC): 60 def __init__(self, db: DatabasePool) -> None: 61 super().__init__(SERVICE, db) 62 63 def _on_post(self, record: dict[str, Any]): 64 post_uri = cast(str, record["$xpost.strongRef"]["uri"]) 65 post_cid = cast(str, record["$xpost.strongRef"]["cid"]) 66 67 parent_uri = cast( 68 str, None if not 
record.get("reply") else record["reply"]["parent"]["uri"] 69 ) 70 parent = None 71 if parent_uri: 72 parent = self._get_post(self.url, self.did, parent_uri) 73 if not parent: 74 self.log.info( 75 "Skipping %s, parent %s not found in db", post_uri, parent_uri 76 ) 77 return 78 79 text, fragments = parse_facets(record["text"], record.get('facets')) 80 post = Post(id=post_uri, parent_id=parent_uri, text=text) 81 post.fragments.extend(fragments) 82 83 did, _, rid = AtUri.record_uri(post_uri) 84 post.attachments.put( 85 RemoteUrlAttachment(url=f"https://bsky.app/profile/{did}/post/{rid}") 86 ) 87 88 embed: dict[str, Any] = record.get("embed", {}) 89 blob_urls: list[tuple[str, str, str | None]] = [] 90 def handle_embeds(embed: dict[str, Any]) -> str | None: 91 nonlocal blob_urls, post 92 match cast(str, embed["$type"]): 93 case "app.bsky.embed.record" | "app.bsky.embed.recordWithMedia": 94 rcrd = embed['record']['record'] if embed['record'].get('record') else embed['record'] 95 did, collection, _ = AtUri.record_uri(rcrd["uri"]) 96 if collection != "app.bsky.feed.post": 97 return f"Unhandled record collection {collection}" 98 if did != self.did: 99 return "" 100 101 rquote = self._get_post(self.url, did, rcrd["uri"]) 102 if not rquote: 103 return f"Quote {rcrd["uri"]} not found in the db" 104 post.attachments.put(QuoteAttachment(quoted_id=rcrd["uri"], quoted_user=did)) 105 106 if embed.get('media'): 107 return handle_embeds(embed["media"]) 108 case "app.bsky.embed.images": 109 for image in embed["images"]: 110 blob_cid = image["image"]["ref"]["$link"] 111 url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.did}&cid={blob_cid}" 112 blob_urls.append((url, blob_cid, image.get("alt"))) 113 case "app.bsky.embed.video": 114 blob_cid = embed["video"]["ref"]["$link"] 115 url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.did}&cid={blob_cid}" 116 blob_urls.append((url, blob_cid, embed.get("alt"))) 117 case _: 118 self.log.warning(f"Unhandled embed type 
{embed['$type']}") 119 120 if embed: 121 fexit = handle_embeds(embed) 122 if fexit is not None: 123 self.log.info("Skipping %s! %s", post_uri, fexit) 124 return 125 126 if blob_urls: 127 blobs: list[Blob] = [] 128 for url, cid, alt in blob_urls: 129 self.log.info("Downloading %s...", cid) 130 blob: Blob | None = download_blob(url, alt) 131 if not blob: 132 self.log.error( 133 "Skipping %s! Failed to download blob %s.", post_uri, cid 134 ) 135 return 136 blobs.append(blob) 137 post.attachments.put(MediaAttachment(blobs=blobs)) 138 139 if "langs" in record: 140 post.attachments.put(LanguagesAttachment(langs=record["langs"])) 141 if "labels" in record: 142 post.attachments.put( 143 LabelsAttachment( 144 labels=[ 145 label["val"].replace("-", " ") for label in record["values"] 146 ] 147 ), 148 ) 149 150 if parent: 151 self._insert_post( 152 { 153 "user": self.did, 154 "service": self.url, 155 "identifier": post_uri, 156 "parent": parent["id"], 157 "root": parent["id"] if not parent["root"] else parent["root"], 158 "extra_data": json.dumps({"cid": post_cid}), 159 } 160 ) 161 else: 162 self._insert_post( 163 { 164 "user": self.did, 165 "service": self.url, 166 "identifier": post_uri, 167 "extra_data": json.dumps({"cid": post_cid}), 168 } 169 ) 170 171 for out in self.outputs: 172 self.submitter(lambda: out.accept_post(post)) 173 174 def _on_repost(self, record: dict[str, Any]): 175 post_uri = cast(str, record["$xpost.strongRef"]["uri"]) 176 post_cid = cast(str, record["$xpost.strongRef"]["cid"]) 177 178 reposted_uri = cast(str, record["subject"]["uri"]) 179 reposted = self._get_post(self.url, self.did, reposted_uri) 180 if not reposted: 181 self.log.info( 182 "Skipping repost '%s' as reposted post '%s' was not found in the db.", 183 post_uri, 184 reposted_uri, 185 ) 186 return 187 188 self._insert_post( 189 { 190 "user": self.did, 191 "service": self.url, 192 "identifier": post_uri, 193 "reposted": reposted["id"], 194 "extra_data": json.dumps({"cid": post_cid}), 195 } 
196 ) 197 198 for out in self.outputs: 199 self.submitter(lambda: out.accept_repost(post_uri, reposted_uri)) 200 201 def _on_delete_post(self, post_id: str, repost: bool): 202 post = self._get_post(self.url, self.did, post_id) 203 if not post: 204 return 205 206 if repost: 207 for output in self.outputs: 208 self.submitter(lambda: output.delete_repost(post_id)) 209 else: 210 for output in self.outputs: 211 self.submitter(lambda: output.delete_post(post_id)) 212 self._delete_post_by_id(post["id"]) 213 214 215class BlueskyJetstreamInputService(BlueskyBaseInputService): 216 def __init__(self, db: DatabasePool, options: BlueskyJetstreamInputOptions) -> None: 217 super().__init__(db) 218 self.options: BlueskyJetstreamInputOptions = options 219 self._init_identity() 220 221 @override 222 def get_identity_options(self) -> tuple[str | None, str | None, str | None]: 223 return (self.options.handle, self.options.did, self.options.pds) 224 225 def _accept_msg(self, msg: websockets.Data) -> None: 226 data: dict[str, Any] = cast(dict[str, Any], json.loads(msg)) 227 if data.get("did") != self.did: 228 return 229 commit: dict[str, Any] | None = data.get("commit") 230 if not commit: 231 return 232 233 commit_type: str = cast(str, commit["operation"]) 234 match commit_type: 235 case "create": 236 record: dict[str, Any] = cast(dict[str, Any], commit["record"]) 237 record["$xpost.strongRef"] = { 238 "cid": commit["cid"], 239 "uri": f"at://{self.did}/{commit['collection']}/{commit['rkey']}", 240 } 241 242 match cast(str, commit["collection"]): 243 case "app.bsky.feed.post": 244 self._on_post(record) 245 case "app.bsky.feed.repost": 246 self._on_repost(record) 247 case _: 248 pass 249 case "delete": 250 post_id: str = ( 251 f"at://{self.did}/{commit['collection']}/{commit['rkey']}" 252 ) 253 match cast(str, commit["collection"]): 254 case "app.bsky.feed.post": 255 self._on_delete_post(post_id, False) 256 case "app.bsky.feed.repost": 257 self._on_delete_post(post_id, True) 258 case _: 
259 pass 260 case _: 261 pass 262 263 @override 264 async def listen(self): 265 url = self.options.jetstream + "?" 266 url += "wantedCollections=app.bsky.feed.post" 267 url += "&wantedCollections=app.bsky.feed.repost" 268 url += f"&wantedDids={self.did}" 269 270 async for ws in websockets.connect(url): 271 try: 272 self.log.info("Listening to %s...", self.options.jetstream) 273 274 async def listen_for_messages(): 275 async for msg in ws: 276 self.submitter(lambda: self._accept_msg(msg)) 277 278 listen = asyncio.create_task(listen_for_messages()) 279 280 _ = await asyncio.gather(listen) 281 except websockets.ConnectionClosedError as e: 282 self.log.error(e, stack_info=True, exc_info=True) 283 self.log.info("Reconnecting to %s...", self.options.jetstream) 284 continue