# Social media crossposting tool. 3rd time's the charm.
# Keywords: mastodon, misskey, crossposting, bluesky
from atproto import client_utils, Request, AsyncFirehoseSubscribeReposClient, CAR, CID
from atproto_client import models
from atproto_client.models.utils import get_or_create as get_model_or_create
from atproto_client.models.blob_ref import BlobRef
from atproto_firehose import models as firehose_models, parse_subscribe_repos_message as parse_firehose
from atproto2 import Client2, resolve_identity
from httpx import Timeout
import json
import cross
import database
from database import DataBaseWorker
import util
import media_util
from util import LOGGER
import re
from typing import Callable, Any

# only for lexicon reference
SERVICE = 'https://bsky.app'

# TODO this is terrible and stupid
ADULT_PATTERN = re.compile(r"\b(sexual content|nsfw|erotic|adult only|18\+)\b", re.IGNORECASE)
PORN_PATTERN = re.compile(r"\b(porn|yiff|hentai|pornographic|fetish)\b", re.IGNORECASE)


def tokenize_post(post: dict) -> list[cross.Token]:
    """Turn a Bluesky post record into an ordered list of cross-post tokens.

    Facet ranges (``byteStart``/``byteEnd``) are applied to the UTF-8 encoded
    text, since they count bytes rather than characters. Only the first
    feature of every facet is used, and a facet that starts inside a
    previously accepted one is dropped.

    Args:
        post: Post record as a dict; its ``text`` and ``facets`` keys are read.

    Returns:
        Text, link, tag and mention tokens in reading order, or an empty list
        when the post has no text.
    """
    text: str = post.get('text', '')
    if not text:
        return []

    facets: list[dict] = post.get('facets', [])
    if not facets:
        return [cross.TextToken(text)]

    # facet feature type -> (token kind, feature key holding the value)
    feature_fields = {
        'app.bsky.richtext.facet#tag': ('tag', 'tag'),
        'app.bsky.richtext.facet#link': ('link', 'uri'),
        'app.bsky.richtext.facet#mention': ('mention', 'did'),
    }

    slices: list[tuple[int, int, str, str]] = []
    for facet in facets:
        features: list[dict] = facet.get('features', [])
        if not features:
            continue

        # we don't support overlapping facets/features
        feature = features[0]
        field = feature_fields.get(feature['$type'])
        index = facet['index']
        if field:
            kind, key = field
            slices.append((index['byteStart'], index['byteEnd'], kind, feature[key]))

    if not slices:
        return [cross.TextToken(text)]

    # keep facets in order of appearance and drop the ones that overlap
    slices.sort(key=lambda s: s[0])
    unique: list[tuple[int, int, str, str]] = []
    current_end = 0
    for start, end, ttype, val in slices:
        if start >= current_end:
            unique.append((start, end, ttype, val))
            current_end = end

    if not unique:
        return [cross.TextToken(text)]

    # offsets are byte offsets, so slice the encoded text and decode each piece
    raw = text.encode('utf-8')

    def cut(start: int, end: int | None = None) -> str:
        # 'replace' keeps a malformed offset from aborting the whole post
        return raw[start:end].decode('utf-8', errors='replace')

    tokens: list[cross.Token] = []
    prev = 0

    for start, end, ttype, val in unique:
        if start > prev:
            # text between facets
            tokens.append(cross.TextToken(cut(prev, start)))

        # facet token
        if ttype == 'link':
            label = cut(start, end)

            # try to unflatten links: a label that is just the (possibly
            # truncated) url without its scheme carries no information
            _, sep, rest = val.partition('://')
            is_flattened = bool(sep) and (
                rest.startswith(label)
                or (label.endswith('...') and rest.startswith(label[:-3]))
            )
            tokens.append(cross.LinkToken(val, '' if is_flattened else label))
        elif ttype == 'tag':
            tokens.append(cross.TagToken(val))
        elif ttype == 'mention':
            tokens.append(cross.MentionToken(cut(start, end), val))
        prev = end

    if prev < len(raw):
        tokens.append(cross.TextToken(cut(prev)))

    return tokens


class BlueskyPost(cross.Post):
    """A Bluesky post record wrapped as a ``cross.Post``.

    The post id and parent id are the JSON dumps (sorted keys) of the
    ``$xpost.strongRef`` entry and of ``reply.parent`` respectively.
    """

    def __init__(self, post: dict, tokens: list[cross.Token], attachments: list[media_util.MediaInfo]) -> None:
        """Store the record, its tokens and attachments and derive id/cw.

        Args:
            post: Post record; must contain ``$xpost.strongRef``.
            tokens: Tokens produced for the post text.
            attachments: Media already downloaded for this post.
        """
        super().__init__()
        self.post = post
        self.tokens = tokens

        self.id = json.dumps(self.post['$xpost.strongRef'], sort_keys=True)

        self.parent_id = None
        if self.post.get('reply'):
            self.parent_id = json.dumps(self.post['reply']['parent'], sort_keys=True)

        # 'labels' may be present but empty/None
        labels = (self.post.get('labels') or {}).get('values')
        self.cw = ''
        if labels:
            # the label values double as the content warning text
            self.cw = ', '.join([str(label['val']).replace('-', ' ') for label in labels])
        self.attachments = attachments

    def get_tokens(self) -> list[cross.Token]:
        """Return the tokens given at construction."""
        return self.tokens

    def get_parent_id(self) -> str | None:
        """Return the serialized parent reference, or None for a non-reply."""
        return self.parent_id

    def get_post_date_iso(self) -> str:
        """Return the record's ``createdAt``, else the base class value."""
        return self.post.get('createdAt') or super().get_post_date_iso()

    def get_cw(self) -> str:
        """Return the content warning built from the labels ('' if none)."""
        return self.cw or ''

    def get_id(self) -> str:
        """Return the serialized strong reference of this post."""
        return self.id

    def get_languages(self) -> list[str]:
        """Return the record's ``langs`` list (empty when missing)."""
        return self.post.get('langs', []) or []

    def is_sensitive(self) -> bool:
        """Return True when the record carries at least one label value."""
        return bool((self.post.get('labels') or {}).get('values'))

    def get_attachments(self) -> list[media_util.MediaInfo]:
        """Return the attachments given at construction."""
        return self.attachments


class BlueskyInputOptions():
    """Options of a Bluesky input, read from the ``options`` dict."""

    def __init__(self, o: dict) -> None:
        """Compile every pattern listed under ``regex_filters``."""
        self.filters = [re.compile(f) for f in o.get('regex_filters', [])]


class BlueskyInput(cross.Input):
    """Base Bluesky input: turns post records into crossposts."""

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        """Resolve the account identity and initialise the base input.

        Args:
            settings: Input settings; reads ``options``, ``handle``, ``did``
                and ``pds`` (the last three go through ``util.as_envvar``).
            db: Database worker handed to the base class.
        """
        self.options = BlueskyInputOptions(settings.get('options', {}))
        did, pds = resolve_identity(
            handle=util.as_envvar(settings.get('handle')),
            did=util.as_envvar(settings.get('did')),
            pds=util.as_envvar(settings.get('pds'))
        )
        self.pds = pds

        # PDS is Not a service, the lexicon and rids are the same across pds
        super().__init__(SERVICE, did, settings, db)

    def _on_post(self, outputs: list[cross.Output], post: dict[str, Any]):
        """Record a new post, download its media and hand it to the outputs.

        The post is skipped (with a log message) when the database insert
        fails, when a filter matches, or when a media download fails.

        Args:
            outputs: Outputs that receive the post via ``accept_post``.
            post: Post record including the ``$xpost.strongRef`` entry.
        """
        post_ref = json.dumps(post['$xpost.strongRef'], sort_keys=True)

        parent_ref = None
        if post.get('reply'):
            parent_ref = json.dumps(post['reply']['parent'], sort_keys=True)

        success = database.try_insert_post(self.db, post_ref, parent_ref, self.user_id, self.service)
        if not success:
            LOGGER.info("Skipping '%s' as parent post was not found in db!", post_ref)
            return

        tokens = tokenize_post(post)
        if not cross.test_filters(tokens, self.options.filters):
            LOGGER.info("Skipping '%s'. Matched a filter!", post_ref)
            return

        LOGGER.info("Crossposting '%s'...", post_ref)

        def get_blob_url(blob: str):
            return f'{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.user_id}&cid={blob}'

        attachments: list[media_util.MediaInfo] = []
        # 'embed' may be missing or None
        embed = post.get('embed') or {}
        if embed.get('$type') == 'app.bsky.embed.images':
            model = get_model_or_create(embed, model=models.AppBskyEmbedImages.Main)
            assert isinstance(model, models.AppBskyEmbedImages.Main)

            for image in model.images:
                url = get_blob_url(image.image.cid.encode())
                LOGGER.info("Downloading %s...", url)
                io = media_util.download_media(url, image.alt)
                if not io:
                    LOGGER.error("Skipping '%s'. Failed to download media!", post_ref)
                    return
                attachments.append(io)
        elif embed.get('$type') == 'app.bsky.embed.video':
            model = get_model_or_create(embed, model=models.AppBskyEmbedVideo.Main)
            assert isinstance(model, models.AppBskyEmbedVideo.Main)
            url = get_blob_url(model.video.cid.encode())
            LOGGER.info("Downloading %s...", url)
            io = media_util.download_media(url, model.alt if model.alt else '')
            if not io:
                LOGGER.error("Skipping '%s'. Failed to download media!", post_ref)
                return
            attachments.append(io)

        cross_post = BlueskyPost(post, tokens, attachments)
        for output in outputs:
            output.accept_post(cross_post)
        return

    def _on_delete_post(self, outputs: list[cross.Output], post_id: dict):
        """Propagate a deletion to the outputs and drop the post from the db.

        Nothing happens when the post is not found in the database.

        Args:
            outputs: Outputs that receive ``delete_post``.
            post_id: Reference dict of the deleted post (``cid`` and ``uri``).
        """
        identifier = json.dumps(post_id, sort_keys=True)
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        database.delete_post(self.db, identifier, self.user_id, self.service)


class BlueskyPdsInput(BlueskyInput):
    """Bluesky input fed by the PDS ``subscribeRepos`` firehose."""

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        """See ``BlueskyInput.__init__``."""
        super().__init__(settings, db)

    def __on_commit(self, outputs: list[cross.Output], message: firehose_models.MessageFrame):
        """Handle one ``#commit`` frame: dispatch post creations/deletions.

        Args:
            outputs: Outputs forwarded to ``_on_post``/``_on_delete_post``.
            message: Raw firehose frame; ignored when it has no blocks or
                does not parse into a ``Commit``.
        """
        blocks = message.body.get('blocks')
        if not blocks:
            return

        parsed = parse_firehose(message)
        if not isinstance(parsed, models.ComAtprotoSyncSubscribeRepos.Commit):
            return
        blocks = parsed.blocks

        car = None

        def get_lazy_repo() -> CAR:
            # decode the CAR only once, and only if some op needs a record
            nonlocal car, blocks

            if car is None:
                if isinstance(blocks, str):
                    blocks = blocks.encode()
                assert blocks
                car = CAR.from_bytes(blocks)
            return car

        for op in parsed.ops:
            if op.action == 'delete':
                if not op.prev:
                    continue

                if not op.path.startswith('app.bsky.feed.post'):
                    continue

                self._on_delete_post(outputs, {
                    'cid': op.prev.encode(),
                    'uri': f'at://{parsed.repo}/{op.path}'
                })
                continue

            if op.action != 'create':
                continue

            if not op.cid:
                continue

            record_data = get_lazy_repo().blocks.get(op.cid)
            if not record_data:
                continue

            record_dict = dict(record_data)
            record_dict['$xpost.strongRef'] = {
                'cid': op.cid.encode(),
                'uri': f'at://{parsed.repo}/{op.path}'
            }
            # records without a '$type' are ignored rather than raising
            if record_dict.get('$type') == 'app.bsky.feed.post':
                self._on_post(outputs, record_dict)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Subscribe to the PDS firehose and submit matching commits.

        Only ``#commit`` frames whose ``repo`` equals this input's user id
        are processed; frames flagged ``tooBig`` are logged and dropped.

        Args:
            outputs: Outputs passed on to the commit handler.
            submit: Called with a zero-argument callable that handles one
                commit.
        """
        # single quotes inside the f-string keep this valid before Python 3.12
        streaming: str = f"wss://{self.pds.split('://', 1)[1]}/xrpc"

        client = AsyncFirehoseSubscribeReposClient(base_uri=streaming)

        async def on_message(message: firehose_models.MessageFrame):
            if message.header.t != '#commit':
                return

            if message.body.get('repo') != self.user_id:
                return

            if message.body.get('tooBig'):
                LOGGER.error("#commit message is tooBig!")
                return

            submit(lambda: self.__on_commit(outputs, message))
            return

        LOGGER.info("Listening to %s...", streaming + '/com.atproto.sync.subscribeRepos')
        await client.start(on_message)


ALLOWED_GATES = ['mentioned', 'following', 'followers', 'everybody']


class BlueskyOutputOptions:
    """Options of a Bluesky output, read from the ``options`` dict."""

    def __init__(self, o: dict) -> None:
        """Apply ``quote_gate``, ``thread_gate`` and ``encode_videos``.

        Raises:
            ValueError: If ``thread_gate`` holds a value not in
                ``ALLOWED_GATES``.
        """
        self.quote_gate: bool = False
        self.thread_gate: list[str] = ['everybody']
        self.encode_videos: bool = True

        quote_gate = o.get('quote_gate')
        if quote_gate is not None:
            self.quote_gate = bool(quote_gate)

        thread_gate = o.get('thread_gate')
        if thread_gate is not None:
            if any(v not in ALLOWED_GATES for v in thread_gate):
                raise ValueError(f"'thread_gate' only accepts {', '.join(ALLOWED_GATES)} or [], got: {thread_gate}")
            self.thread_gate = thread_gate

        encode_videos = o.get('encode_videos')
        if encode_videos is not None:
            self.encode_videos = bool(encode_videos)


class BlueskyOutput(cross.Output):
    """Output that republishes posts to a Bluesky account."""

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """Resolve the account identity and log in with the app password.

        Args:
            input: The input this output mirrors.
            settings: Output settings; reads ``options``, ``app-password``,
                ``handle``, ``did`` and ``pds``.
            db: Database worker handed to the base class.

        Raises:
            Exception: If no app password is configured.
        """
        super().__init__(input, settings, db)
        self.options = BlueskyOutputOptions(settings.get('options') or {})

        if not util.as_envvar(settings.get('app-password')):
            raise Exception("Account app password not provided!")

        did, pds = resolve_identity(
            handle=util.as_envvar(settings.get('handle')),
            did=util.as_envvar(settings.get('did')),
            pds=util.as_envvar(settings.get('pds'))
        )

        reqs = Request(timeout=Timeout(None, connect=30.0))

        self.bsky = Client2(pds, request=reqs)
        self.bsky.login(did, util.as_envvar(settings.get('app-password')))

    def _find_parent(self, parent_id: str):
        """Look up the mapped thread for ``parent_id`` in the database.

        Returns:
            ``(root strong ref, reply strong ref, thread_tuple[2],
            thread_tuple[3])``, or None when no mapping exists. The last two
            values are used by ``accept_post`` as root/parent db ids.

        Raises:
            Exception: If the client is not logged in.
        """
        login = self.bsky.me
        if not login:
            raise Exception("Client not logged in!")

        thread_tuple = database.find_mapped_thread(
            self.db,
            parent_id,
            self.input.user_id,
            self.input.service,
            login.did,
            SERVICE
        )

        if not thread_tuple:
            LOGGER.error("Failed to find thread tuple in the database!")
            return None

        root_ref = json.loads(thread_tuple[0])
        reply_ref = json.loads(thread_tuple[1])

        root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_ref['uri']), cid=str(root_ref['cid']))
        reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_ref['uri']), cid=str(reply_ref['cid']))

        return (
            models.create_strong_ref(root_record),
            models.create_strong_ref(reply_record),
            thread_tuple[2],
            thread_tuple[3]
        )

    def _split_attachments(self, attachments: list[media_util.MediaInfo]):
        """Split attachments into (images and videos, everything else)."""
        sup_media: list[media_util.MediaInfo] = []
        unsup_media: list[media_util.MediaInfo] = []

        for a in attachments:
            if a.mime.startswith(('image/', 'video/')):  # TODO convert gifs to videos
                sup_media.append(a)
            else:
                unsup_media.append(a)

        return (sup_media, unsup_media)

    def _split_media_per_post(
        self,
        tokens: list[client_utils.TextBuilder],
        media: list[media_util.MediaInfo]
    ) -> list[tuple[client_utils.TextBuilder, list[media_util.MediaInfo]]]:
        """Distribute media over the text posts, adding blank posts if needed.

        A video always takes a post of its own; images are grouped up to
        four per post. Posts are filled in order, and once every text post
        holds media further media goes into appended posts with empty text.

        Args:
            tokens: One text builder per post to publish.
            media: Supported (image/video) attachments, in order.

        Returns:
            One ``(text builder, attachments)`` pair per post.
        """
        posts: list[dict] = [{"tokens": builder, "attachments": []} for builder in tokens]
        available_indices: list[int] = list(range(len(posts)))

        current_image_post_idx: int | None = None

        def make_blank_post() -> dict:
            # "tokens" must be a TextBuilder, same as for the regular posts
            return {
                "tokens": client_utils.TextBuilder().text(''),
                "attachments": []
            }

        def pop_next_empty_index() -> int:
            if available_indices:
                return available_indices.pop(0)
            posts.append(make_blank_post())
            return len(posts) - 1

        for att in media:
            if att.mime.startswith('video/'):
                # a video ends the current image group
                current_image_post_idx = None
                idx = pop_next_empty_index()
                posts[idx]["attachments"].append(att)
            elif att.mime.startswith('image/'):
                if (
                    current_image_post_idx is not None
                    and len(posts[current_image_post_idx]["attachments"]) < 4
                ):
                    posts[current_image_post_idx]["attachments"].append(att)
                else:
                    idx = pop_next_empty_index()
                    posts[idx]["attachments"].append(att)
                    current_image_post_idx = idx

        return [(p["tokens"], p["attachments"]) for p in posts]

    def accept_post(self, post: cross.Post):
        """Publish ``post`` as one or more Bluesky posts and record them.

        The tokens are split with ``cross.split_tokens(tokens, 300)``; each
        chunk becomes one post of a thread, media is spread over these posts,
        and the created records are stored in the database and mapped to the
        original post. The post is skipped (with a log message) when its
        parent has no mapping, when it contains unsupported tokens, or when
        an attachment is rejected.

        Args:
            post: The post to publish.

        Raises:
            Exception: If the client is not logged in.
        """
        login = self.bsky.me
        if not login:
            raise Exception("Client not logged in!")

        parent_id = post.get_parent_id()

        # used for db insertion
        new_root_id = None
        new_parent_id = None

        root_ref = None
        reply_ref = None
        if parent_id:
            parents = self._find_parent(parent_id)
            if not parents:
                return
            root_ref, reply_ref, new_root_id, new_parent_id = parents

        tokens = post.get_tokens().copy()

        unique_labels: set[str] = set()
        cw = post.get_cw()
        if cw:
            tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
            unique_labels.add('graphic-media')

            # from bsky.app, a post can only have one of those labels
            if PORN_PATTERN.search(cw):
                unique_labels.add('porn')
            elif ADULT_PATTERN.search(cw):
                unique_labels.add('sexual')

        if post.is_sensitive():
            unique_labels.add('graphic-media')

        # sorted so the emitted label order does not depend on set iteration
        labels = models.ComAtprotoLabelDefs.SelfLabels(
            values=[models.ComAtprotoLabelDefs.SelfLabel(val=label) for label in sorted(unique_labels)]
        )

        sup_media, unsup_media = self._split_attachments(post.get_attachments())

        # unsupported attachments are appended as links to the original file
        if unsup_media:
            if tokens:
                tokens.append(cross.TextToken('\n'))
            for attachment in unsup_media:
                tokens.append(cross.LinkToken(
                    attachment.url,
                    f"[{media_util.get_filename_from_url(attachment.url)}]"
                ))
                tokens.append(cross.TextToken(' '))

        split_tokens: list[list[cross.Token]] = cross.split_tokens(tokens, 300)
        post_text: list[client_utils.TextBuilder] = []

        # convert tokens into rich text. skip post if contains unsupported tokens
        for block in split_tokens:
            rich_text = tokens_to_richtext(block)

            if not rich_text:
                LOGGER.error("Skipping '%s' as it contains invalid rich text types!", post.get_id())
                return
            post_text.append(rich_text)

        if not post_text:
            post_text = [client_utils.TextBuilder().text('')]

        # reject the whole post up front if any attachment cannot be sent
        for m in sup_media:
            if m.mime.startswith('image/'):
                if len(m.io) > 2_000_000:
                    LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large.", post.get_id())
                    return

            if m.mime.startswith('video/'):
                if m.mime != 'video/mp4' and not self.options.encode_videos:
                    LOGGER.info("Video is not mp4, but encoding is disabled. Skipping '%s'...", post.get_id())
                    return

                if len(m.io) > 100_000_000:
                    LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
                    return

        created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []
        baked_media = self._split_media_per_post(post_text, sup_media)

        for text, attachments in baked_media:
            reply_to = models.AppBskyFeedPost.ReplyRef(
                parent=reply_ref,
                root=root_ref
            ) if root_ref and reply_ref else None

            if not attachments:
                if reply_to is not None:
                    new_post = self.bsky.send_post(
                        text,
                        reply_to=reply_to,
                        labels=labels,
                        time_iso=post.get_post_date_iso()
                    )
                else:
                    new_post = self.bsky.send_post(text, labels=labels, time_iso=post.get_post_date_iso())
            # if a single post is an image - everything else is an image
            elif attachments[0].mime.startswith('image/'):
                images: list[bytes] = []
                image_alts: list[str] = []
                image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []

                for attachment in attachments:
                    image_io = media_util.compress_image(attachment.io, quality=100)
                    metadata = media_util.get_media_meta(image_io)

                    if len(image_io) > 1_000_000:
                        LOGGER.info("Compressing %s...", attachment.name)
                        image_io = media_util.compress_image(image_io)

                    images.append(image_io)
                    image_alts.append(attachment.alt)
                    image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(
                        width=metadata['width'],
                        height=metadata['height']
                    ))

                # use this post's own text, not the first chunk of the thread
                new_post = self.bsky.send_images(
                    text=text,
                    images=images,
                    image_alts=image_alts,
                    image_aspect_ratios=image_aspect_ratios,
                    reply_to=reply_to,
                    labels=labels,
                    time_iso=post.get_post_date_iso()
                )
            else:  # video is guaranteed to be one
                metadata = media_util.get_media_meta(attachments[0].io)
                if metadata['duration'] > 180:
                    # NOTE(review): posts already sent for earlier chunks are
                    # neither deleted nor recorded when we bail out here
                    LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
                    return

                video_io = attachments[0].io
                if attachments[0].mime != 'video/mp4':
                    LOGGER.info("Converting %s to mp4...", attachments[0].name)
                    video_io = media_util.convert_to_mp4(video_io)

                aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
                    width=metadata['width'],
                    height=metadata['height']
                )

                # use this post's own text, not the first chunk of the thread
                new_post = self.bsky.send_video(
                    text=text,
                    video=video_io,
                    video_aspect_ratio=aspect_ratio,
                    video_alt=attachments[0].alt,
                    reply_to=reply_to,
                    labels=labels,
                    time_iso=post.get_post_date_iso()
                )

            # the first post sent outside an existing thread becomes the root
            if reply_to is None:
                root_ref = models.create_strong_ref(new_post)

            self.bsky.create_gates(
                self.options.thread_gate,
                self.options.quote_gate,
                new_post.uri,
                time_iso=post.get_post_date_iso()
            )
            # the next chunk replies to the post that was just created
            reply_ref = models.create_strong_ref(new_post)
            created_records.append(new_post)

        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        assert db_post, "ghghghhhhh"

        db_identifiers = [json.dumps(cr.model_dump(), sort_keys=True) for cr in created_records]

        # no known parent: the first created record starts a new thread
        if new_root_id is None or new_parent_id is None:
            new_root_id = database.insert_post(
                self.db,
                db_identifiers[0],
                login.did,
                SERVICE
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            db_identifiers = db_identifiers[1:]

        # remaining records are chained as replies, each to the previous one
        for db_id in db_identifiers:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                login.did,
                SERVICE,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every Bluesky post mapped to the original ``identifier``.

        Mappings are processed in reverse order. Nothing happens when the
        original post is not in the database.

        Raises:
            Exception: If the client is not logged in.
        """
        login = self.bsky.me
        if not login:
            raise Exception("Client not logged in!")

        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
        for mapping in mappings[::-1]:
            LOGGER.info("Deleting '%s'...", mapping[0])
            self.bsky.delete_post(json.loads(mapping[0])['uri'])
            # (user_id, service) order, matching the other post calls in
            # this file (find_post, insert_post, BlueskyInput's delete_post)
            database.delete_post(self.db, mapping[0], login.did, SERVICE)


def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
    """Build Bluesky rich text from text, link and tag tokens.

    Args:
        tokens: Tokens of a single post.

    Returns:
        The populated text builder, or None as soon as a token of any other
        type is encountered.
    """
    builder = client_utils.TextBuilder()

    def flatten_link(href: str):
        # display form: scheme stripped, cut to 32 characters plus '...'
        split = href.split('://', 1)
        if len(split) > 1:
            href = split[1]

        if len(href) > 32:
            href = href[:32] + '...'

        return href

    for token in tokens:
        if isinstance(token, cross.TextToken):
            builder.text(token.text)
        elif isinstance(token, cross.LinkToken):
            if util.canonical_label(token.label, token.href):
                builder.link(flatten_link(token.href), token.href)
                continue

            builder.link(token.label, token.href)
        elif isinstance(token, cross.TagToken):
            builder.tag('#' + token.tag, token.tag)
        else:
            # fail on unsupported tokens
            return None

    return builder