social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
from atproto import client_utils, Client, AtUri, IdResolver
from atproto_client import models
import json
import cross
import database
from database import DataBaseWorker
import util
import media_util
from util import LOGGER

# only for lexicon reference
SERVICE = 'https://bsky.app'

# lowercase substrings looked for in a post's content warning
ADULT_LABEL = ["sexual content", "nsfw"]
PORN_LABEL = ["porn", "yiff"]


class BlueskyOutput(cross.Output):
    """Output that re-publishes posts coming from a `cross.Input` on a Bluesky account."""

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """Resolve the account's DID and PDS (unless configured) and log in.

        Reads 'options', 'app-password', 'did', 'handle' and 'pds' from
        `settings` (the latter four through `util.get_or_envvar`).

        Raises:
            Exception: when the app password or handle is missing, or when
                the DID, DID document or PDS endpoint cannot be resolved.
        """
        super().__init__(input, settings, db)
        self.options = util.safe_get(settings, 'options', {})

        app_password = util.get_or_envvar(settings, 'app-password')
        if not app_password:
            raise Exception("Account app password not provided!")

        resolver = IdResolver()
        did: str | None = util.get_or_envvar(settings, 'did')
        if not did:
            handle = util.get_or_envvar(settings, 'handle')
            if not handle:
                raise Exception("ATP handle not specified!")
            LOGGER.info("Resolving ATP identity for %s...", handle)
            did = resolver.handle.resolve(handle)
            if not did:
                raise Exception("Failed to resolve DID!")

        pds: str | None = util.get_or_envvar(settings, 'pds')
        if not pds:
            LOGGER.info("Resolving PDS from DID document...")
            did_doc = resolver.did.resolve(did)
            if not did_doc:
                # Exception() does not %-format its arguments, so build the message here
                raise Exception(f"Failed to resolve DID doc for '{did}'")
            pds = did_doc.get_pds_endpoint()
            if not pds:
                raise Exception("Failed to resolve PDS!")

        self.client = Client(pds)
        self.client.login(did, app_password)
        self.bsky = Bluesky(self.client)

    def _find_parent(self, parent_id: str):
        """Look up the already crossposted parent of a reply.

        Returns a `(root_ref, reply_ref, root_db_id, parent_db_id)` tuple, or
        None when the parent (or its root) has no mapping for this account.
        The reply ref points at the LAST mapped record of the parent, the
        root ref at the FIRST mapped record of the root (or of the parent
        itself when the parent has no root_id).
        """
        login = self.client.me
        if not login:
            raise Exception("Client not logged in!")

        reply_data = database.find_post(self.db, parent_id, self.input.user_id, self.input.service)
        assert reply_data, "reply_data requested, but doesn't exist in db (should've been skipped bt firehose)"

        reply_mappings = [
            json.loads(data[0])
            for data in database.find_mappings(self.db, reply_data['id'], SERVICE, login.did)
        ]
        if not reply_mappings:
            LOGGER.error("Failed to find mappings for a post in the db!")
            return None

        reply_record = models.AppBskyFeedPost.CreateRecordResponse(
            uri=str(reply_mappings[-1]['uri']),
            cid=str(reply_mappings[-1]['cid'])
        )
        root_record = models.AppBskyFeedPost.CreateRecordResponse(
            uri=str(reply_mappings[0]['uri']),
            cid=str(reply_mappings[0]['cid'])
        )
        if reply_data['root_id']:
            root_data = database.find_post_by_id(self.db, reply_data['root_id'])
            assert root_data, "root_data requested but doesn't exist in db"

            root_mappings = [
                json.loads(data[0])
                for data in database.find_mappings(self.db, reply_data['root_id'], SERVICE, login.did)
            ]
            if not root_mappings:
                LOGGER.error("Failed to find mappings for a post in the db!")
                return None
            root_record = models.AppBskyFeedPost.CreateRecordResponse(
                uri=str(root_mappings[0]['uri']),
                cid=str(root_mappings[0]['cid'])
            )

        return (
            models.create_strong_ref(root_record),
            models.create_strong_ref(reply_record),
            reply_data['root_id'],
            reply_data['id']
        )

    def _split_attachments(self, attachments: list[cross.MediaAttachment]):
        """Partition attachments into `(supported, unsupported)`.

        Images and videos are supported; attachments without a type are
        dropped; everything else is unsupported.
        """
        sup_media: list[cross.MediaAttachment] = []
        unsup_media: list[cross.MediaAttachment] = []

        for attachment in attachments:
            attachment_type = attachment.get_type()
            if not attachment_type:
                continue

            if attachment_type in {'video', 'image'}:  # TODO convert gifs to videos
                sup_media.append(attachment)
            else:
                unsup_media.append(attachment)

        return (sup_media, unsup_media)

    def _split_media_per_post(
        self,
        tokens: list[client_utils.TextBuilder],
        media: list[cross.MediaAttachment]
    ) -> list[tuple[client_utils.TextBuilder, list[cross.MediaAttachment]]]:
        """Distribute media over the text chunks of a thread.

        Every video gets a post of its own; images are grouped up to four per
        post. Text chunks are filled in order, and blank posts are appended
        once the text chunks run out.
        """
        posts: list[dict] = [{"tokens": chunk, "attachments": []} for chunk in tokens]
        available_indices: list[int] = list(range(len(posts)))

        current_image_post_idx: int | None = None

        def make_blank_post() -> dict:
            # a single TextBuilder, same shape as the entries built from `tokens`
            return {
                "tokens": client_utils.TextBuilder().text(''),
                "attachments": []
            }

        def pop_next_empty_index() -> int:
            if available_indices:
                return available_indices.pop(0)
            posts.append(make_blank_post())
            return len(posts) - 1

        for att in media:
            att_type = att.get_type()
            if att_type == 'video':
                # a video ends the current image group
                current_image_post_idx = None
                posts[pop_next_empty_index()]["attachments"].append(att)
            elif att_type == 'image':
                if (
                    current_image_post_idx is not None
                    and len(posts[current_image_post_idx]["attachments"]) < 4
                ):
                    posts[current_image_post_idx]["attachments"].append(att)
                else:
                    idx = pop_next_empty_index()
                    posts[idx]["attachments"].append(att)
                    current_image_post_idx = idx

        return [(p["tokens"], p["attachments"]) for p in posts]

    def _download_media(self, media: list[cross.MediaAttachment], post_id) -> bool:
        """Fetch bytes for every attachment that has none yet.

        Returns False (after logging) on the first failed download.
        """
        # per-type caps handed to media_util.download_blob
        max_bytes = {'image': 2_000_000, 'video': 100_000_000}

        for m in media:
            if m.bytes:
                continue
            limit = max_bytes.get(m.get_type())
            if limit is None:
                continue

            blob = media_util.download_blob(m.get_url(), max_bytes=limit)
            if not blob:
                LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post_id)
                return False
            m.bytes = blob
        return True

    def _send_image_post(
        self,
        text: client_utils.TextBuilder,
        attachments: list[cross.MediaAttachment],
        reply_to: models.AppBskyFeedPost.ReplyRef | None,
        labels: models.ComAtprotoLabelDefs.SelfLabels
    ) -> models.AppBskyFeedPost.CreateRecordResponse:
        """Publish one post carrying `attachments` as an image embed."""
        images: list[bytes] = []
        image_alts: list[str] = []
        image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []

        for attachment in attachments:
            assert attachment.bytes
            image_io = media_util.compress_image(attachment.bytes, quality=100)
            metadata = attachment.create_meta(image_io)

            if len(image_io) > 1_000_000:
                # NOTE(review): this only logs; the oversized image is still
                # uploaded as-is. A lower-quality compress_image pass looks
                # intended here -- confirm against media_util.
                LOGGER.info("Compressing %s...", attachment.get_url())

            images.append(image_io)
            image_alts.append(attachment.get_alt())
            image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(
                width=metadata.get_width(),
                height=metadata.get_height()
            ))

        return self.bsky.send_images(
            text=text,
            images=images,
            image_alts=image_alts,
            image_aspect_ratios=image_aspect_ratios,
            reply_to=reply_to,
            labels=labels
        )

    def _send_video_post(
        self,
        text: client_utils.TextBuilder,
        video_data: cross.MediaAttachment,
        metadata,
        reply_to: models.AppBskyFeedPost.ReplyRef | None,
        labels: models.ComAtprotoLabelDefs.SelfLabels
    ) -> models.AppBskyFeedPost.CreateRecordResponse:
        """Publish one post carrying `video_data` as a video embed.

        `metadata` is the result of `video_data.create_meta(...)` on the
        original bytes; it supplies the aspect ratio.
        """
        assert video_data.bytes
        video_io = video_data.bytes

        probe = media_util.probe_bytes(video_io)
        format_name = probe['format']['format_name']
        if 'mp4' not in format_name.split(','):
            LOGGER.info("Converting %s to mp4...", video_data.get_url())
            video_io = media_util.convert_to_mp4(video_io)

        aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
            width=metadata.get_width(),
            height=metadata.get_height()
        )

        return self.bsky.send_video(
            text=text,
            video=video_io,
            video_aspect_ratio=aspect_ratio,
            video_alt=video_data.get_alt(),
            reply_to=reply_to,
            labels=labels
        )

    def accept_post(self, post: cross.Post):
        """Crosspost `post` as one or more chained Bluesky posts and record them in the db.

        The post is skipped (nothing is sent) when its parent has no mapping,
        when it contains tokens that cannot be turned into rich text, when an
        attachment fails to download, or when a video is too long.

        Raises:
            Exception: when the client is not logged in.
        """
        login = self.client.me
        if not login:
            raise Exception("Client not logged in!")

        parent_id = post.get_parent_id()

        # used for db insertion
        new_root_id = None
        new_parent_id = None

        root_ref = None
        reply_ref = None
        if parent_id:
            parents = self._find_parent(parent_id)
            if not parents:
                return
            root_ref, reply_ref, new_root_id, new_parent_id = parents

        # work on a copy: the CW prefix and attachment links added below must
        # reach the splitter without being written back into the shared post
        tokens = list(post.get_tokens())

        unique_labels: set[str] = set()
        cw = post.get_cw()
        if cw:
            tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
            unique_labels.add('graphic-media')

            # the tag lists are lowercase, so match case-insensitively
            cw_folded = cw.lower()
            if any(tag in cw_folded for tag in ADULT_LABEL):
                unique_labels.add('sexual')

            if any(tag in cw_folded for tag in PORN_LABEL):
                unique_labels.add('porn')

        if post.is_sensitive():
            unique_labels.add('graphic-media')

        labels = models.ComAtprotoLabelDefs.SelfLabels(
            values=[models.ComAtprotoLabelDefs.SelfLabel(val=label) for label in sorted(unique_labels)]
        )

        sup_media, unsup_media = self._split_attachments(post.get_attachments())

        # unsupported attachments are appended to the text as links
        if unsup_media:
            if tokens:
                tokens.append(cross.TextToken('\n'))
            for attachment in unsup_media:
                url = attachment.get_url()
                tokens.append(cross.LinkToken(
                    url,
                    f"[{media_util.get_filename_from_url(url)}]"
                ))
                tokens.append(cross.TextToken(' '))

        split_tokens: list[list[cross.Token]] = util.split_tokens(tokens, 300)
        post_text: list[client_utils.TextBuilder] = []

        # convert tokens into rich text. skip post if contains unsupported tokens
        for block in split_tokens:
            rich_text = tokens_to_richtext(block)

            if not rich_text:
                LOGGER.error("Skipping '%s' as it contains invalid rich text types!", post.get_id())
                return
            post_text.append(rich_text)

        if not post_text:
            post_text = [client_utils.TextBuilder().text('')]

        # download media first. increased RAM usage, but more reliable
        if not self._download_media(sup_media, post.get_id()):
            return

        # check every video before anything is published, so a too-long video
        # cannot abort the thread after earlier chunks were already sent
        video_meta: dict[int, object] = {}
        for m in sup_media:
            if m.get_type() != 'video':
                continue
            assert m.bytes
            meta = m.create_meta(m.bytes)
            if meta.get_duration() > 180:
                LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
                return
            video_meta[id(m)] = meta

        created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []
        baked_media = self._split_media_per_post(post_text, sup_media)

        for text, attachments in baked_media:
            reply_to = (
                models.AppBskyFeedPost.ReplyRef(parent=reply_ref, root=root_ref)
                if root_ref and reply_ref else None
            )

            if not attachments:
                new_post = self.bsky.send_post(text, reply_to=reply_to, labels=labels)
            elif attachments[0].get_type() == 'image':
                # if a single post is an image - everything else is an image
                new_post = self._send_image_post(text, attachments, reply_to, labels)
            else:
                # a video is guaranteed to be alone in its post
                video_data = attachments[0]
                new_post = self._send_video_post(
                    text, video_data, video_meta[id(video_data)], reply_to, labels
                )

            # the first record of a fresh thread becomes the root of the rest
            if not root_ref:
                root_ref = models.create_strong_ref(new_post)

            self.bsky.create_gates(self.options, new_post.uri)
            reply_ref = models.create_strong_ref(new_post)
            created_records.append(new_post)

        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        assert db_post, "crossposted post is missing from the db"

        db_identifiers = [json.dumps(cr.model_dump(), sort_keys=True) for cr in created_records]

        # NOTE(review): _find_parent returns root_id=None for a parent that is
        # itself a root, so a reply to such a parent is stored as a new root
        # here -- confirm this is intended.
        if new_root_id is None or new_parent_id is None:
            new_root_id = database.insert_post(
                self.db,
                db_identifiers[0],
                login.did,
                SERVICE
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            db_identifiers = db_identifiers[1:]

        for db_id in db_identifiers:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                login.did,
                SERVICE,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every Bluesky record mapped to the source post `identifier`.

        Records are removed newest-first; a post unknown to the db is ignored.

        Raises:
            Exception: when the client is not logged in.
        """
        login = self.client.me
        if not login:
            raise Exception("Client not logged in!")

        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
        for mapping in mappings[::-1]:
            self.client.delete_post(json.loads(mapping[0])['uri'])
            database.delete_post(self.db, mapping[0], SERVICE, login.did)


class Bluesky():
    """Thin wrapper around an atproto `Client` that adds self-labels and gates to posts."""

    def __init__(self, client: Client) -> None:
        self.client = client

    def send_video(
        self,
        text: str | client_utils.TextBuilder,
        video: bytes,
        video_alt: str | None = None,
        video_aspect_ratio: models.AppBskyEmbedDefs.AspectRatio | None = None,
        reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
        langs: list[str] | None = None,
        facets: list[models.AppBskyRichtextFacet.Main] | None = None,
        labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
    ) -> models.AppBskyFeedPost.CreateRecordResponse:
        """Upload `video` as a blob and publish a post embedding it."""
        if video_alt is None:
            video_alt = ''

        upload = self.client.upload_blob(video)

        return self.send_post(
            text,
            reply_to=reply_to,
            embed=models.AppBskyEmbedVideo.Main(
                video=upload.blob,
                alt=video_alt,
                aspect_ratio=video_aspect_ratio
            ),
            langs=langs,
            facets=facets,
            labels=labels
        )

    def send_images(
        self,
        text: str | client_utils.TextBuilder,
        images: list[bytes],
        image_alts: list[str] | None = None,
        image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] | None = None,
        reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
        langs: list[str] | None = None,
        facets: list[models.AppBskyRichtextFacet.Main] | None = None,
        labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
    ) -> models.AppBskyFeedPost.CreateRecordResponse:
        """Upload `images` as blobs and publish a post embedding them.

        `image_alts` and `image_aspect_ratios` are padded (with '' and None)
        up to the number of images when they are shorter or omitted.
        """
        # pad without touching the caller's lists; a negative count pads nothing
        alts = list(image_alts or [])
        alts += [''] * (len(images) - len(alts))

        aligned_image_aspect_ratios = list(image_aspect_ratios or [])
        aligned_image_aspect_ratios += [None] * (len(images) - len(aligned_image_aspect_ratios))

        uploads = [self.client.upload_blob(image) for image in images]

        embed_images = [
            models.AppBskyEmbedImages.Image(alt=alt, image=upload.blob, aspect_ratio=aspect_ratio)
            for alt, upload, aspect_ratio in zip(alts, uploads, aligned_image_aspect_ratios)
        ]

        return self.send_post(
            text,
            reply_to=reply_to,
            embed=models.AppBskyEmbedImages.Main(images=embed_images),
            langs=langs,
            facets=facets,
            labels=labels
        )

    def send_post(
        self,
        text: str | client_utils.TextBuilder,
        reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
        embed:
            None |
            models.AppBskyEmbedImages.Main |
            models.AppBskyEmbedExternal.Main |
            models.AppBskyEmbedRecord.Main |
            models.AppBskyEmbedRecordWithMedia.Main |
            models.AppBskyEmbedVideo.Main = None,
        langs: list[str] | None = None,
        facets: list[models.AppBskyRichtextFacet.Main] | None = None,
        labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
    ) -> models.AppBskyFeedPost.CreateRecordResponse:
        """Create an app.bsky.feed.post record in the logged-in account's repo.

        When `text` is a TextBuilder its built facets replace `facets`.
        `langs` falls back to ['en'] when empty or omitted.

        Raises:
            Exception: when the client is not logged in.
        """
        if isinstance(text, client_utils.TextBuilder):
            facets = text.build_facets()
            text = text.build_text()

        repo = self.client.me and self.client.me.did
        if not repo:
            raise Exception("Client not logged in!")

        if not langs:
            langs = ['en']

        record = models.AppBskyFeedPost.Record(
            created_at=self.client.get_current_time_iso(),
            text=text,
            reply=reply_to,
            embed=embed,
            langs=langs,
            facets=facets,
            labels=labels
        )
        return self.client.app.bsky.feed.post.create(repo, record)

    def create_gates(self, options: dict, post_uri: str):
        """Create the thread gate and/or quote (post) gate configured in `options`.

        'thread_gate' lists who may reply ('everybody', 'following',
        'followers', 'mentioned'); unless it contains 'everybody' a thread
        gate with the matching rules is created. A truthy 'quote_gate'
        creates a post gate with a DisableRule.

        Raises:
            Exception: when the client is not logged in.
        """
        account = self.client.me
        if not account:
            raise Exception("Client not logged in!")

        rkey = AtUri.from_str(post_uri).rkey
        time = self.client.get_current_time_iso()

        # NOTE(review): a missing 'thread_gate' option defaults to [], which
        # yields a gate with an empty allow list -- confirm that default.
        thread_gate_opts = options.get('thread_gate', [])
        if 'everybody' not in thread_gate_opts:
            rule_types = (
                ('following', models.AppBskyFeedThreadgate.FollowingRule),
                ('followers', models.AppBskyFeedThreadgate.FollowerRule),
                ('mentioned', models.AppBskyFeedThreadgate.MentionRule),
            )
            allow = [rule() for name, rule in rule_types if name in thread_gate_opts]

            thread_gate = models.AppBskyFeedThreadgate.Record(
                post=post_uri,
                created_at=time,
                allow=allow
            )

            self.client.app.bsky.feed.threadgate.create(account.did, thread_gate, rkey)

        if options.get('quote_gate', False):
            post_gate = models.AppBskyFeedPostgate.Record(
                post=post_uri,
                created_at=time,
                embedding_rules=[
                    models.AppBskyFeedPostgate.DisableRule()
                ]
            )

            self.client.app.bsky.feed.postgate.create(account.did, post_gate, rkey)


def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
    """Convert crosspost tokens into an atproto TextBuilder.

    Text, link and tag tokens are supported. Links whose label passes
    `util.canonical_label` are shown as a shortened form of their href.
    Returns None as soon as an unsupported token type is met.
    """
    builder = client_utils.TextBuilder()

    def flatten_link(href: str):
        # drop the scheme and cap the visible part at 32 characters
        split = href.split('://', 1)
        if len(split) > 1:
            href = split[1]

        if len(href) > 32:
            href = href[:32] + '...'

        return href

    for token in tokens:
        if isinstance(token, cross.TextToken):
            builder.text(token.text)
        elif isinstance(token, cross.LinkToken):
            if util.canonical_label(token.label, token.href):
                builder.link(flatten_link(token.href), token.href)
            else:
                builder.link(token.label, token.href)
        elif isinstance(token, cross.TagToken):
            builder.tag('#' + token.tag, token.tag)
        else:
            # fail on unsupported tokens
            return None

    return builder