# Social media crossposting tool. Third time's the charm.
# Topics: mastodon, misskey, crossposting, bluesky
from atproto import client_utils, Request, AsyncFirehoseSubscribeReposClient, CAR, CID
from atproto_client import models
from atproto_client.models.utils import get_or_create as get_model_or_create
from atproto_client.models.blob_ref import BlobRef
from atproto_firehose import models as firehose_models, parse_subscribe_repos_message as parse_firehose
from atproto2 import Client2, resolve_identity
from httpx import Timeout
import json
import cross
import database
from database import DataBaseWorker
import util
import media_util
from util import LOGGER
import re
from typing import Callable, Any

# only for lexicon reference
SERVICE = 'https://bsky.app'

# Upload limits checked before anything is sent.
MAX_IMAGE_BYTES = 1_000_000
MAX_VIDEO_SECONDS = 180

# TODO this is terrible and stupid
# Keyword heuristics run against content warnings to choose self-labels.
# (?<!\w)/(?!\w) are used instead of \b because \b can never match right
# after a non-word character such as the '+' in "18+".
ADULT_PATTERN = re.compile(r"(?<!\w)(sexual content|nsfw|erotic|adult only|18\+)(?!\w)", re.IGNORECASE)
PORN_PATTERN = re.compile(r"(?<!\w)(porn|yiff|hentai|pornographic|fetish)(?!\w)", re.IGNORECASE)


def tokenize_post(post: dict) -> list[cross.Token]:
    """Convert a Bluesky post record into cross-service tokens.

    Tag, link and mention facets become TagToken, LinkToken and MentionToken;
    the text around them becomes TextToken. Facet ranges are
    ``byteStart``/``byteEnd`` offsets into the UTF-8 encoding of ``text``,
    so all slicing is done on the encoded bytes. Only the first feature of
    each facet is used, and a facet that starts before the previous kept
    facet ends is dropped.

    Args:
        post: the post record as a plain dict.

    Returns:
        The token list; empty when the post has no text.
    """
    text: str = post.get('text', '')
    if not text:
        return []

    facets: list[dict] = post.get('facets', [])
    if not facets:
        return [cross.TextToken(text)]

    # Facet indices count UTF-8 bytes, not code points.
    encoded = text.encode('utf-8')

    def byte_slice(start: int, end: int | None = None) -> str:
        # 'replace' keeps a malformed offset that splits a multi-byte
        # character from raising.
        return encoded[start:end].decode('utf-8', errors='replace')

    slices: list[tuple[int, int, str, str]] = []
    for facet in facets:
        features: list[dict] = facet.get('features', [])
        if not features:
            continue

        # we don't support overlapping facets/features
        feature = features[0]
        feature_type = feature['$type']
        index = facet['index']
        if feature_type == 'app.bsky.richtext.facet#tag':
            slices.append((index['byteStart'], index['byteEnd'], 'tag', feature['tag']))
        elif feature_type == 'app.bsky.richtext.facet#link':
            slices.append((index['byteStart'], index['byteEnd'], 'link', feature['uri']))
        elif feature_type == 'app.bsky.richtext.facet#mention':
            slices.append((index['byteStart'], index['byteEnd'], 'mention', feature['did']))

    if not slices:
        return [cross.TextToken(text)]

    # Keep facets in start order, skipping any that overlap a kept one.
    slices.sort(key=lambda s: s[0])
    unique: list[tuple[int, int, str, str]] = []
    current_end = 0
    for start, end, ttype, val in slices:
        if start >= current_end:
            unique.append((start, end, ttype, val))
            current_end = end

    if not unique:
        return [cross.TextToken(text)]

    tokens: list[cross.Token] = []
    prev = 0
    for start, end, ttype, val in unique:
        if start > prev:
            # text between facets
            tokens.append(cross.TextToken(byte_slice(prev, start)))

        if ttype == 'link':
            label = byte_slice(start, end)
            # Try to unflatten links: a label that is just the URL without
            # its scheme (optionally truncated with '...') becomes an
            # unlabeled link. Anything else keeps its label.
            _, sep, rest = val.partition('://')
            is_flattened = bool(sep) and (
                rest.startswith(label)
                or (label.endswith('...') and rest.startswith(label[:-3]))
            )
            tokens.append(cross.LinkToken(val, '' if is_flattened else label))
        elif ttype == 'tag':
            tokens.append(cross.TagToken(val))
        elif ttype == 'mention':
            tokens.append(cross.MentionToken(byte_slice(start, end), val))
        prev = end

    if prev < len(encoded):
        tokens.append(cross.TextToken(byte_slice(prev)))

    return tokens


class BlueskyPost(cross.Post):
    """A Bluesky post record exposed through the cross.Post interface."""

    def __init__(self, pds_url: str, did: str, post: dict) -> None:
        """
        Args:
            pds_url: base URL of the author's PDS; used to build blob URLs.
            did: the author's DID; used to build blob URLs.
            post: the post record. Must contain the '$xpost.strongRef' key
                (a {'cid', 'uri'} dict) that the input adds.
        """
        super().__init__()
        self.post = post
        self.tokens = tokenize_post(post)

        # Identifiers are the JSON-serialised strong refs (cid + uri).
        self.id = json.dumps(self.post['$xpost.strongRef'], sort_keys=True)

        self.parent_id = None
        if self.post.get('reply'):
            self.parent_id = json.dumps(self.post['reply']['parent'], sort_keys=True)

        # Self-labels become the content warning ('graphic-media' -> 'graphic media').
        labels = (self.post.get('labels') or {}).get('values')
        self.cw = ''
        if labels:
            self.cw = ', '.join(str(label['val']).replace('-', ' ') for label in labels)

        def get_blob_url(blob: str) -> str:
            return f'{pds_url}/xrpc/com.atproto.sync.getBlob?did={did}&cid={blob}'

        attachments: list[cross.MediaAttachment] = []
        embed = self.post.get('embed') or {}
        embed_type = embed.get('$type')
        if embed_type == 'app.bsky.embed.images':
            model = get_model_or_create(embed, model=models.AppBskyEmbedImages.Main)
            assert isinstance(model, models.AppBskyEmbedImages.Main)

            for image in model.images:
                attachments.append(BlueskyAttachment(
                    get_blob_url(image.image.cid.encode()),
                    'image', image.alt
                ))
        elif embed_type == 'app.bsky.embed.video':
            model = get_model_or_create(embed, model=models.AppBskyEmbedVideo.Main)
            assert isinstance(model, models.AppBskyEmbedVideo.Main)

            attachments.append(BlueskyAttachment(
                get_blob_url(model.video.cid.encode()),
                'video', model.alt if model.alt else ''
            ))
        self.attachments = attachments

    def get_tokens(self) -> list[cross.Token]:
        return self.tokens

    def get_parent_id(self) -> str | None:
        return self.parent_id

    def get_post_date_iso(self) -> str:
        return self.post.get('createdAt') or super().get_post_date_iso()

    def get_cw(self) -> str:
        return self.cw or ''

    def get_id(self) -> str:
        return self.id

    def get_languages(self) -> list[str]:
        return self.post.get('langs', []) or []

    def is_sensitive(self) -> bool:
        """True when the record carries any self-label values."""
        return bool((self.post.get('labels') or {}).get('values'))

    def get_attachments(self) -> list[cross.MediaAttachment]:
        return self.attachments or []


class BlueskyAttachment(cross.MediaAttachment):
    """A media attachment referenced by URL (a PDS getBlob URL for Bluesky posts)."""

    def __init__(self, url: str, type: str, alt: str) -> None:
        super().__init__()
        self.url = url
        self.type = type
        self.alt = alt

    def get_url(self) -> str:
        return self.url

    def get_type(self) -> str | None:
        return self.type

    def create_meta(self, bytes: bytes) -> cross.MediaMeta:
        """Build MediaMeta from raw media bytes; duration is -1 when unknown."""
        o_meta = media_util.get_media_meta(bytes)
        return cross.MediaMeta(o_meta['width'], o_meta['height'], o_meta.get('duration', -1))

    def get_alt(self) -> str:
        return self.alt


class BlueskyInput(cross.Input):
    """Common base for Bluesky inputs: identity resolution and DB bookkeeping."""

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        self.options = settings.get('options', {})
        did, pds = resolve_identity(
            # 'hanlde' is the historical misspelt key; still accepted so
            # existing configs keep working.
            handle=util.as_envvar(settings.get('handle') or settings.get('hanlde')),
            did=util.as_envvar(settings.get('did')),
            pds=util.as_envvar(settings.get('pds'))
        )
        self.pds = pds

        # PDS is Not a service, the lexicon and rids are the same across pds
        super().__init__(SERVICE, did, settings, db)

    def _on_post(self, outputs: list[cross.Output], post: dict[str, Any]):
        """Record a new post in the DB and forward it to every output.

        Replies whose parent is not in the DB are skipped. Otherwise the post
        is stored as a reply (carrying the parent's root) or as a root post.
        """
        post_ref = json.dumps(post['$xpost.strongRef'], sort_keys=True)

        parent_ref = None
        if post.get('reply'):
            parent_ref = json.dumps(post['reply']['parent'], sort_keys=True)

        root_id = None
        parent_id = None
        if parent_ref:
            parent_post = database.find_post(self.db, parent_ref, self.user_id, self.service)
            if not parent_post:
                LOGGER.info("Skipping '%s' as parent post was not found in db!", post_ref)
                return

            root_id = parent_post['id']
            parent_id = root_id
            if parent_post['root_id']:
                root_id = parent_post['root_id']

        LOGGER.info("Crossposting '%s'...", post_ref)
        if root_id and parent_id:
            database.insert_reply(
                self.db,
                post_ref,
                self.user_id,
                self.service,
                parent_id,
                root_id
            )
        else:
            database.insert_post(
                self.db,
                post_ref,
                self.user_id,
                self.service
            )

        cross_post = BlueskyPost(self.pds, self.user_id, post)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], post_id: dict):
        """Propagate a deletion to all outputs, then drop the post from the DB.

        Deletions of posts that are not in the DB are ignored.
        """
        identifier = json.dumps(post_id, sort_keys=True)
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        database.delete_post(self.db, identifier, self.user_id, self.service)


class BlueskyPdsInput(BlueskyInput):
    """Bluesky input that follows the user's repo via the PDS firehose."""

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        super().__init__(settings, db)

    def __on_commit(self, outputs: list[cross.Output], message: firehose_models.MessageFrame):
        """Handle one #commit frame.

        Post creations are forwarded to _on_post and post deletions to
        _on_delete_post. Other actions and collections are ignored.
        """
        blocks = message.body.get('blocks')
        if not blocks:
            return

        parsed = parse_firehose(message)
        if not isinstance(parsed, models.ComAtprotoSyncSubscribeRepos.Commit):
            return
        blocks = parsed.blocks

        # The CAR is only decoded when a create op actually needs a record.
        car = None

        def get_lazy_repo() -> CAR:
            nonlocal car, blocks

            if isinstance(blocks, str):
                blocks = blocks.encode()
            assert blocks

            if car:
                return car
            car = CAR.from_bytes(blocks)
            return car

        for op in parsed.ops:
            if op.action == 'delete':
                if not op.prev:
                    continue

                if not op.path.startswith('app.bsky.feed.post'):
                    continue

                self._on_delete_post(outputs, {
                    'cid': op.prev.encode(),
                    'uri': f'at://{parsed.repo}/{op.path}'
                })
                continue

            if op.action != 'create':
                continue

            if not op.cid:
                continue

            record_data = get_lazy_repo().blocks.get(op.cid)
            if not record_data:
                continue

            record_dict = dict(record_data)
            record_dict['$xpost.strongRef'] = {
                'cid': op.cid.encode(),
                'uri': f'at://{parsed.repo}/{op.path}'
            }
            if record_dict.get('$type') == 'app.bsky.feed.post':
                self._on_post(outputs, record_dict)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Subscribe to the PDS repo firehose and dispatch this user's commits.

        Matching frames are handed to ``submit`` as zero-argument callables
        rather than processed inside the websocket callback.
        """
        # The firehose lives on the PDS host; swap the http(s) scheme for wss.
        _, sep, host = self.pds.partition('://')
        host = (host if sep else self.pds).rstrip('/')
        streaming: str = f'wss://{host}/xrpc'

        client = AsyncFirehoseSubscribeReposClient(base_uri=streaming)

        async def on_message(message: firehose_models.MessageFrame):
            if message.header.t != '#commit':
                return

            if message.body.get('repo') != self.user_id:
                return

            if message.body.get('tooBig'):
                LOGGER.error("#commit message is tooBig!")
                return

            submit(lambda: self.__on_commit(outputs, message))

        LOGGER.info("Listening to %s...", streaming + '/com.atproto.sync.subscribeRepos')
        await client.start(on_message)


class BlueskyOutput(cross.Output):
    """Publishes crossposts to a Bluesky account using an app password."""

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        super().__init__(input, settings, db)
        self.options = settings.get('options') or {}

        app_password = util.as_envvar(settings.get('app-password'))
        if not app_password:
            raise Exception("Account app password not provided!")

        did, pds = resolve_identity(
            # 'hanlde' is the historical misspelt key; still accepted so
            # existing configs keep working.
            handle=util.as_envvar(settings.get('handle') or settings.get('hanlde')),
            did=util.as_envvar(settings.get('did')),
            pds=util.as_envvar(settings.get('pds'))
        )

        # No overall timeout; only the connect phase is bounded.
        reqs = Request(timeout=Timeout(None, connect=30.0))

        self.bsky = Client2(pds, request=reqs)
        self.bsky.login(did, app_password)

    def _find_parent(self, parent_id: str):
        """Look up the Bluesky thread that the source parent post was mapped to.

        Returns:
            (root strong ref, parent strong ref, root DB id, parent DB id),
            or None when no mapping exists.
        """
        login = self.bsky.me
        if not login:
            raise Exception("Client not logged in!")

        thread_tuple = database.find_mapped_thread(
            self.db,
            parent_id,
            self.input.user_id,
            self.input.service,
            login.did,
            SERVICE
        )

        if not thread_tuple:
            LOGGER.error("Failed to find thread tuple in the database!")
            return None

        root_ref = json.loads(thread_tuple[0])
        reply_ref = json.loads(thread_tuple[1])

        root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_ref['uri']), cid=str(root_ref['cid']))
        reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_ref['uri']), cid=str(reply_ref['cid']))

        return (
            models.create_strong_ref(root_record),
            models.create_strong_ref(reply_record),
            thread_tuple[2],
            thread_tuple[3]
        )

    def _split_attachments(self, attachments: list[cross.MediaAttachment]):
        """Partition attachments into (embeddable, link-only).

        Images and videos are embeddable. Attachments with no type are dropped.
        """
        sup_media: list[cross.MediaAttachment] = []
        unsup_media: list[cross.MediaAttachment] = []

        for attachment in attachments:
            attachment_type = attachment.get_type()
            if not attachment_type:
                continue

            if attachment_type in {'video', 'image'}:  # TODO convert gifs to videos
                sup_media.append(attachment)
            else:
                unsup_media.append(attachment)

        return (sup_media, unsup_media)

    def _split_media_per_post(
            self,
            tokens: list[client_utils.TextBuilder],
            media: list[cross.MediaAttachment]):
        """Distribute media across the text chunks of a thread.

        Each video gets its own post. Consecutive images share a post, up to
        4 per post. Media is placed on the text chunks in order; when they
        run out, blank-text posts are appended.

        Returns:
            A list of (text, attachments) pairs, one per post to send.
        """
        posts: list[dict] = [{"tokens": chunk, "attachments": []} for chunk in tokens]
        available_indices: list[int] = list(range(len(posts)))

        current_image_post_idx: int | None = None

        def make_blank_post() -> dict:
            # Same shape as the chunk posts above: a single TextBuilder, not a list.
            return {
                "tokens": client_utils.TextBuilder().text(''),
                "attachments": []
            }

        def pop_next_empty_index() -> int:
            if available_indices:
                return available_indices.pop(0)
            new_idx = len(posts)
            posts.append(make_blank_post())
            return new_idx

        for att in media:
            if att.get_type() == 'video':
                current_image_post_idx = None
                idx = pop_next_empty_index()
                posts[idx]["attachments"].append(att)
            elif att.get_type() == 'image':
                if (
                    current_image_post_idx is not None
                    and len(posts[current_image_post_idx]["attachments"]) < 4
                ):
                    posts[current_image_post_idx]["attachments"].append(att)
                else:
                    idx = pop_next_empty_index()
                    posts[idx]["attachments"].append(att)
                    current_image_post_idx = idx

        result: list[tuple[client_utils.TextBuilder, list[cross.MediaAttachment]]] = [
            (p["tokens"], p["attachments"]) for p in posts
        ]
        return result

    def _compress_image(self, attachment: cross.MediaAttachment) -> bytes | None:
        """Re-encode an image, lowering quality until it fits MAX_IMAGE_BYTES.

        Returns the encoded bytes, or None if quality 20 still does not fit.
        Assumes media_util.compress_image output shrinks as quality drops.
        """
        assert attachment.bytes
        image_io = media_util.compress_image(attachment.bytes, quality=100)
        if len(image_io) <= MAX_IMAGE_BYTES:
            return image_io

        LOGGER.info("Compressing %s...", attachment.get_url())
        for quality in range(90, 10, -10):
            image_io = media_util.compress_image(attachment.bytes, quality=quality)
            if len(image_io) <= MAX_IMAGE_BYTES:
                return image_io
        return None

    def _prepare_media(
            self,
            post: cross.Post,
            media: list[cross.MediaAttachment]) -> dict[int, tuple[bytes, cross.MediaMeta]] | None:
        """Download, validate and encode every embeddable attachment.

        This runs before anything is published, so a rejected post never
        leaves a half-sent thread behind. Downloaded bytes are stored on
        ``attachment.bytes``.

        Returns:
            A map from ``id(attachment)`` to ``(upload_bytes, metadata)``,
            or None (after logging) when the post must be skipped.
        """
        prepared: dict[int, tuple[bytes, cross.MediaMeta]] = {}

        # download media first. increased RAM usage, but more reliable
        for m in media:
            is_image = m.get_type() == 'image'
            if not m.bytes:
                data = media_util.download_blob(
                    m.get_url(),
                    max_bytes=2_000_000 if is_image else 100_000_000
                )
                if not data:
                    LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
                    return None
                m.bytes = data

            if is_image:
                image_io = self._compress_image(m)
                if image_io is None:
                    LOGGER.error("Skipping post_id '%s', image attachment could not be compressed below %d bytes!",
                                 post.get_id(), MAX_IMAGE_BYTES)
                    return None
                prepared[id(m)] = (image_io, m.create_meta(image_io))
            else:
                # _split_attachments only lets images and videos through
                video_io = m.bytes
                metadata = m.create_meta(video_io)
                if metadata.get_duration() > MAX_VIDEO_SECONDS:
                    LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
                    return None

                probe = media_util.probe_bytes(video_io)
                format_name = probe['format']['format_name']
                if 'mp4' not in format_name.split(','):
                    LOGGER.info("Converting %s to mp4...", m.get_url())
                    video_io = media_util.convert_to_mp4(video_io)
                prepared[id(m)] = (video_io, metadata)

        return prepared

    def accept_post(self, post: cross.Post):
        """Crosspost ``post`` to Bluesky and record the resulting mappings.

        The text is split into 300-length chunks and media is distributed
        over them, so one source post can become a self-reply thread. The
        post is skipped when its parent has no Bluesky mapping, when a token
        cannot be rendered as rich text, or when media preparation fails.
        """
        login = self.bsky.me
        if not login:
            raise Exception("Client not logged in!")

        parent_id = post.get_parent_id()

        # used for db insertion
        new_root_id = None
        new_parent_id = None

        root_ref = None
        reply_ref = None
        if parent_id:
            parents = self._find_parent(parent_id)
            if not parents:
                return
            root_ref, reply_ref, new_root_id, new_parent_id = parents

        tokens = post.get_tokens().copy()

        unique_labels: set[str] = set()
        cw = post.get_cw()
        if cw:
            tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
            unique_labels.add('graphic-media')

            if ADULT_PATTERN.search(cw):
                unique_labels.add('sexual')

            if PORN_PATTERN.search(cw):
                unique_labels.add('porn')

        if post.is_sensitive():
            unique_labels.add('graphic-media')

        labels = models.ComAtprotoLabelDefs.SelfLabels(
            values=[models.ComAtprotoLabelDefs.SelfLabel(val=label) for label in sorted(unique_labels)]
        )

        sup_media, unsup_media = self._split_attachments(post.get_attachments())

        # Media Bluesky cannot embed is appended as "[filename]" links.
        if unsup_media:
            if tokens:
                tokens.append(cross.TextToken('\n'))
            for attachment in unsup_media:
                tokens.append(cross.LinkToken(
                    attachment.get_url(),
                    f"[{media_util.get_filename_from_url(attachment.get_url())}]"
                ))
                tokens.append(cross.TextToken(' '))

        split_tokens: list[list[cross.Token]] = util.split_tokens(tokens, 300)
        post_text: list[client_utils.TextBuilder] = []

        # convert tokens into rich text. skip post if contains unsupported tokens
        for block in split_tokens:
            rich_text = tokens_to_richtext(block)

            if not rich_text:
                LOGGER.error("Skipping '%s' as it contains invalid rich text types!", post.get_id())
                return
            post_text.append(rich_text)

        if not post_text:
            post_text = [client_utils.TextBuilder().text('')]

        prepared = self._prepare_media(post, sup_media)
        if prepared is None:
            return

        created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []
        baked_media = self._split_media_per_post(post_text, sup_media)

        for text, attachments in baked_media:
            time_iso = post.get_post_date_iso()
            reply_to = models.AppBskyFeedPost.ReplyRef(
                parent=reply_ref,
                root=root_ref
            ) if root_ref and reply_ref else None

            if not attachments:
                new_post = self.bsky.send_post(text, reply_to=reply_to, labels=labels, time_iso=time_iso)
            elif attachments[0].get_type() == 'image':
                # if a single post is an image - everything else is an image
                new_post = self.bsky.send_images(
                    text=text,
                    images=[prepared[id(a)][0] for a in attachments],
                    image_alts=[a.get_alt() for a in attachments],
                    image_aspect_ratios=[
                        models.AppBskyEmbedDefs.AspectRatio(
                            width=prepared[id(a)][1].get_width(),
                            height=prepared[id(a)][1].get_height()
                        )
                        for a in attachments
                    ],
                    reply_to=reply_to,
                    labels=labels,
                    time_iso=time_iso
                )
            else:
                # a video post always carries exactly one video
                video_data = attachments[0]
                video_io, metadata = prepared[id(video_data)]
                new_post = self.bsky.send_video(
                    text=text,
                    video=video_io,
                    video_aspect_ratio=models.AppBskyEmbedDefs.AspectRatio(
                        width=metadata.get_width(),
                        height=metadata.get_height()
                    ),
                    video_alt=video_data.get_alt(),
                    reply_to=reply_to,
                    labels=labels,
                    time_iso=time_iso
                )

            if not root_ref:
                root_ref = models.create_strong_ref(new_post)

            self.bsky.create_gates(self.options, new_post.uri, time_iso=time_iso)
            reply_ref = models.create_strong_ref(new_post)
            created_records.append(new_post)

        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        if not db_post:
            LOGGER.error("Post '%s' was crossposted but is missing from the database; mappings not saved!", post.get_id())
            return

        db_identifiers = [json.dumps(cr.model_dump(), sort_keys=True) for cr in created_records]

        if new_root_id is None or new_parent_id is None:
            new_root_id = database.insert_post(
                self.db,
                db_identifiers[0],
                login.did,
                SERVICE
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            db_identifiers = db_identifiers[1:]

        for db_id in db_identifiers:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                login.did,
                SERVICE,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every Bluesky post mapped to the source post ``identifier``.

        Mappings are processed last-first and removed from the DB as they go.
        """
        login = self.bsky.me
        if not login:
            raise Exception("Client not logged in!")

        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
        for mapping in mappings[::-1]:
            LOGGER.info("Deleting '%s'...", mapping[0])
            self.bsky.delete_post(json.loads(mapping[0])['uri'])
            # (db, identifier, user, service) — the order used by every other
            # database.* call in this module and by insert_post above.
            database.delete_post(self.db, mapping[0], login.did, SERVICE)


def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
    """Render tokens into a Bluesky rich-text builder.

    Links whose label is canonical for their href are shown flattened (scheme
    stripped, truncated to 32 characters plus '...'). Returns None if any
    token is not a TextToken, LinkToken or TagToken.
    """
    builder = client_utils.TextBuilder()

    def flatten_link(href: str):
        split = href.split('://', 1)
        if len(split) > 1:
            href = split[1]

        if len(href) > 32:
            href = href[:32] + '...'

        return href

    for token in tokens:
        if isinstance(token, cross.TextToken):
            builder.text(token.text)
        elif isinstance(token, cross.LinkToken):
            if util.canonical_label(token.label, token.href):
                builder.link(flatten_link(token.href), token.href)
                continue

            builder.link(token.label, token.href)
        elif isinstance(token, cross.TagToken):
            builder.tag('#' + token.tag, token.tag)
        else:
            # fail on unsupported tokens
            return None

    return builder