social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import asyncio 2import json 3import re 4from abc import ABC 5from dataclasses import dataclass, field 6from typing import Any, cast, override 7 8import websockets 9 10from atproto.util import AtUri 11from bluesky.info import SERVICE, BlueskyService, validate_and_transform 12from cross.attachments import ( 13 LabelsAttachment, 14 LanguagesAttachment, 15 MediaAttachment, 16 RemoteUrlAttachment, 17) 18from cross.media import Blob, download_blob 19from cross.post import Post 20from cross.service import InputService 21from database.connection import DatabasePool 22from util.util import normalize_service_url 23 24 25@dataclass(kw_only=True) 26class BlueskyInputOptions: 27 handle: str | None = None 28 did: str | None = None 29 pds: str | None = None 30 filters: list[re.Pattern[str]] = field(default_factory=lambda: []) 31 32 @classmethod 33 def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions": 34 validate_and_transform(data) 35 36 if "filters" in data: 37 data["filters"] = [re.compile(r) for r in data["filters"]] 38 39 return BlueskyInputOptions(**data) 40 41 42@dataclass(kw_only=True) 43class BlueskyJetstreamInputOptions(BlueskyInputOptions): 44 jetstream: str = "wss://jetstream2.us-west.bsky.network/subscribe" 45 46 @classmethod 47 def from_dict(cls, data: dict[str, Any]) -> "BlueskyJetstreamInputOptions": 48 jetstream = data.pop("jetstream", None) 49 50 base = BlueskyInputOptions.from_dict(data).__dict__.copy() 51 if jetstream: 52 base["jetstream"] = normalize_service_url(jetstream) 53 54 return BlueskyJetstreamInputOptions(**base) 55 56 57class BlueskyBaseInputService(BlueskyService, InputService, ABC): 58 def __init__(self, db: DatabasePool) -> None: 59 super().__init__(SERVICE, db) 60 61 def _on_post(self, record: dict[str, Any]): 62 post_uri = cast(str, record["$xpost.strongRef"]["uri"]) 63 post_cid = cast(str, record["$xpost.strongRef"]["cid"]) 64 65 parent_uri = cast( 66 str, None if not record.get("reply") else record["reply"]["parent"]["uri"] 67 ) 68 parent = None 69 if parent_uri: 70 parent = self._get_post(self.url, self.did, parent_uri) 71 if not parent: 72 self.log.info( 73 "Skipping %s, parent %s not found in db", post_uri, parent_uri 74 ) 75 return 76 77 # TODO FRAGMENTS 78 post = Post(id=post_uri, parent_id=parent_uri, text=record["text"]) 79 did, _, rid = AtUri.record_uri(post_uri) 80 post.attachments.put( 81 RemoteUrlAttachment(url=f"https://bsky.app/profile/{did}/post/{rid}") 82 ) 83 84 embed = record.get("embed", {}) 85 if embed: 86 match cast(str, embed["$type"]): 87 case "app.bsky.embed.record" | "app.bsky.embed.recordWithMedia": 88 _, collection, _ = AtUri.record_uri( 89 cast(str, embed["record"]["uri"]) 90 ) 91 if collection == "app.bsky.feed.post": 92 self.log.info("Skipping '%s'! Quote..", post_uri) 93 return 94 case "app.bsky.embed.images": 95 blobs: list[Blob] = [] 96 for image in embed["images"]: 97 blob_cid = image["image"]["ref"]["$link"] 98 url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.did}&cid={blob_cid}" 99 self.log.info("Downloading %s...", blob_cid) 100 blob: Blob | None = download_blob(url, image.get("alt")) 101 if not blob: 102 self.log.error( 103 "Skipping %s! Failed to download blob %s.", 104 post_uri, 105 blob_cid, 106 ) 107 return 108 blobs.append(blob) 109 post.attachments.put(MediaAttachment(blobs=blobs)) 110 case "app.bsky.embed.video": 111 blob_cid = embed["video"]["ref"]["$link"] 112 url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.did}&cid={blob_cid}" 113 self.log.info("Downloading %s...", blob_cid) 114 blob: Blob | None = download_blob(url, embed.get("alt")) 115 if not blob: 116 self.log.error( 117 "Skipping %s! Failed to download blob %s.", 118 post_uri, 119 blob_cid, 120 ) 121 return 122 post.attachments.put(MediaAttachment(blobs=[blob])) 123 case _: 124 self.log.warning(f"Unhandled embedd type {embed['$type']}") 125 pass 126 127 if "langs" in record: 128 post.attachments.put(LanguagesAttachment(langs=record["langs"])) 129 if "labels" in record: 130 post.attachments.put( 131 LabelsAttachment( 132 labels=[ 133 label["val"].replace("-", " ") for label in record["values"] 134 ] 135 ), 136 ) 137 138 if parent: 139 self._insert_post( 140 { 141 "user": self.did, 142 "service": self.url, 143 "identifier": post_uri, 144 "parent": parent["id"], 145 "root": parent["id"] if not parent["root"] else parent["root"], 146 "extra_data": json.dumps({"cid": post_cid}), 147 } 148 ) 149 else: 150 self._insert_post( 151 { 152 "user": self.did, 153 "service": self.url, 154 "identifier": post_uri, 155 "extra_data": json.dumps({"cid": post_cid}), 156 } 157 ) 158 159 for out in self.outputs: 160 self.submitter(lambda: out.accept_post(post)) 161 162 def _on_repost(self, record: dict[str, Any]): 163 post_uri = cast(str, record["$xpost.strongRef"]["uri"]) 164 post_cid = cast(str, record["$xpost.strongRef"]["cid"]) 165 166 reposted_uri = cast(str, record["subject"]["uri"]) 167 reposted = self._get_post(self.url, self.did, reposted_uri) 168 if not reposted: 169 self.log.info( 170 "Skipping repost '%s' as reposted post '%s' was not found in the db.", 171 post_uri, 172 reposted_uri, 173 ) 174 return 175 176 self._insert_post( 177 { 178 "user": self.did, 179 "service": self.url, 180 "identifier": post_uri, 181 "reposted": reposted["id"], 182 "extra_data": json.dumps({"cid": post_cid}), 183 } 184 ) 185 186 for out in self.outputs: 187 self.submitter(lambda: out.accept_repost(post_uri, reposted_uri)) 188 189 def _on_delete_post(self, post_id: str, repost: bool): 190 post = self._get_post(self.url, self.did, post_id) 191 if not post: 192 return 193 194 if repost: 195 for output in self.outputs: 196 self.submitter(lambda: output.delete_repost(post_id)) 197 else: 198 for output in self.outputs: 199 self.submitter(lambda: output.delete_post(post_id)) 200 self._delete_post_by_id(post["id"]) 201 202 203class BlueskyJetstreamInputService(BlueskyBaseInputService): 204 def __init__(self, db: DatabasePool, options: BlueskyJetstreamInputOptions) -> None: 205 super().__init__(db) 206 self.options: BlueskyJetstreamInputOptions = options 207 self._init_identity() 208 209 @override 210 def get_identity_options(self) -> tuple[str | None, str | None, str | None]: 211 return (self.options.handle, self.options.did, self.options.pds) 212 213 def _accept_msg(self, msg: websockets.Data) -> None: 214 data: dict[str, Any] = cast(dict[str, Any], json.loads(msg)) 215 if data.get("did") != self.did: 216 return 217 commit: dict[str, Any] | None = data.get("commit") 218 if not commit: 219 return 220 221 commit_type: str = cast(str, commit["operation"]) 222 match commit_type: 223 case "create": 224 record: dict[str, Any] = cast(dict[str, Any], commit["record"]) 225 record["$xpost.strongRef"] = { 226 "cid": commit["cid"], 227 "uri": f"at://{self.did}/{commit['collection']}/{commit['rkey']}", 228 } 229 230 match cast(str, commit["collection"]): 231 case "app.bsky.feed.post": 232 self._on_post(record) 233 case "app.bsky.feed.repost": 234 self._on_repost(record) 235 case _: 236 pass 237 case "delete": 238 post_id: str = ( 239 f"at://{self.did}/{commit['collection']}/{commit['rkey']}" 240 ) 241 match cast(str, commit["collection"]): 242 case "app.bsky.feed.post": 243 self._on_delete_post(post_id, False) 244 case "app.bsky.feed.repost": 245 self._on_delete_post(post_id, True) 246 case _: 247 pass 248 case _: 249 pass 250 251 @override 252 async def listen(self): 253 url = self.options.jetstream + "?" 254 url += "wantedCollections=app.bsky.feed.post" 255 url += "&wantedCollections=app.bsky.feed.repost" 256 url += f"&wantedDids={self.did}" 257 258 async for ws in websockets.connect(url): 259 try: 260 self.log.info("Listening to %s...", self.options.jetstream) 261 262 async def listen_for_messages(): 263 async for msg in ws: 264 self.submitter(lambda: self._accept_msg(msg)) 265 266 listen = asyncio.create_task(listen_for_messages()) 267 268 _ = await asyncio.gather(listen) 269 except websockets.ConnectionClosedError as e: 270 self.log.error(e, stack_info=True, exc_info=True) 271 self.log.info("Reconnecting to %s...", self.options.jetstream) 272 continue