# Social media crossposting tool. Third time's the charm.
# Topics: mastodon, misskey, crossposting, bluesky
"""Mastodon (and *oma-compatible) input and output for the crossposter.

``MastodonInput`` listens to the account's streaming API and hands the
account's own qualifying statuses to every configured output.
``MastodonOutput`` publishes posts coming from an input as statuses
(split into a self-reply thread when needed).
"""
import asyncio
import json
import re
import time
from functools import partial
from html import unescape
from typing import Any, Callable, NoReturn

import requests
import websockets
from bs4 import BeautifulSoup, Tag
from bs4.element import NavigableString
import markeddown

import cross
import database
import media_util
import util
from database import DataBaseWorker
from util import LOGGER

# MIME types assumed uploadable when the instance does not advertise its own list.
POSSIBLE_MIMES = [
    'audio/ogg',
    'audio/mp3',
    'image/webp',
    'image/jpeg',
    'image/png',
    'video/mp4',
    'video/quicktime',
    'video/webm'
]

# Shared HTML -> Markdown converter used by tokenize_post (reset after every use).
md_parser = markeddown.HTMLToMarkdownParser()
md_parser.preserve_spaces = True


def tokenize_post(status: dict) -> list[cross.Token]:
    """Convert a status' HTML ``content`` into crossposting tokens.

    * ``<a>`` whose Markdown text starts with ``@`` and matches an entry of
      ``status['mentions']`` (by ``url``, ``acct`` or ``username``) becomes a
      ``cross.MentionToken``.
    * ``<a>`` whose text is ``#tag`` with ``tag`` (lower-cased) in
      ``status['tags']`` becomes a ``cross.TagToken``.
    * Any other ``<a>`` becomes a ``cross.LinkToken``.
    * Everything else is converted to Markdown ``cross.TextToken``s.

    A trailing ``"\\n\\n"`` on the final text token is trimmed.

    :param status: a Mastodon status entity; ``content`` is required.
    :returns: the tokens, possibly an empty list.
    """
    soup = BeautifulSoup(status['content'], "html.parser")
    tokens: list[cross.Token] = []

    tags: list[dict] = status.get('tags') or []
    mentions: list[dict] = status.get('mentions') or []

    def mdd(html: str) -> str:
        # md_parser is module-global and stateful: always reset it, even when
        # feeding fails, so one bad fragment cannot corrupt the next conversion.
        try:
            md_parser.feed(unescape(html))
            return md_parser.get_markdown()
        finally:
            md_parser.reset()

    def handle_link(node: Tag) -> None:
        href = node.get("href", "")
        inner_html = "".join(str(c) for c in node.contents)
        link_text_md = mdd(inner_html)

        if link_text_md.startswith('@'):
            as_mention = link_text_md[1:]
            for block in mentions:
                if href == block.get('url') or as_mention in (block.get('acct'), block.get('username')):
                    tokens.append(cross.MentionToken(block['acct'], block['url']))
                    return

        if link_text_md.startswith('#'):
            as_tag = link_text_md[1:].lower()
            if any(as_tag == block.get('name') for block in tags):
                tokens.append(cross.TagToken(link_text_md[1:]))
                return

        # bs4 types attribute values loosely; coerce href to str for the token.
        tokens.append(cross.LinkToken(str(href), link_text_md))

    def recurse(node) -> None:
        if isinstance(node, NavigableString):
            tokens.append(cross.TextToken(str(node)))
            return
        if not isinstance(node, Tag):
            return
        if node.name.lower() == "a":
            handle_link(node)
            return
        if node.find("a") is not None:
            # Descend only when a link is inside, so links become their own
            # tokens; link-free elements are converted to Markdown in one piece.
            for child in node.contents:
                recurse(child)
            return
        markdownified = mdd(str(node))
        if markdownified:
            tokens.append(cross.TextToken(markdownified))

    for child in soup.contents:
        recurse(child)

    # Empty content yields no tokens; guard before looking at the last one.
    if tokens:
        last_token = tokens[-1]
        if isinstance(last_token, cross.TextToken) and last_token.text.endswith('\n\n'):
            tokens[-1] = cross.TextToken(last_token.text[:-2])

    return tokens


# Content types whose raw source is tokenized as Markdown, in order of preference.
MARKDOWNY = ['text/x.misskeymarkdown', 'text/markdown', 'text/plain']


class MastodonPost(cross.Post):
    """A ``cross.Post`` backed by a Mastodon status entity."""

    def __init__(self, status: dict, tokens: list[cross.Token], media_attachments: list[media_util.MediaInfo]) -> None:
        super().__init__()
        self.status = status
        self.media_attachments = media_attachments
        self.tokens = tokens

    def get_tokens(self) -> list[cross.Token]:
        return self.tokens

    def get_parent_id(self) -> str | None:
        return self.status.get('in_reply_to_id')

    def get_post_date_iso(self) -> str:
        # Fall back to the base class when the status carries no created_at.
        date = self.status.get('created_at')
        return date or super().get_post_date_iso()

    def get_cw(self) -> str:
        return self.status.get('spoiler_text') or ''

    def get_id(self) -> str:
        return self.status['id']

    def get_languages(self) -> list[str]:
        if self.status.get('language'):
            return [self.status['language']]
        return []

    def is_sensitive(self) -> bool:
        return self.status.get('sensitive', False)

    def get_attachments(self) -> list[media_util.MediaInfo]:
        return self.media_attachments


ALLOWED_VISIBILITY = ['public', 'unlisted']


class MastodonInputOptions:
    """Validated options for ``MastodonInput``.

    Keys read from ``o``:
      * ``regex_filters``: regex strings, compiled into ``self.filters``.
      * ``allowed_visibility``: subset of ``ALLOWED_VISIBILITY`` (default: all).

    :raises ValueError: if ``allowed_visibility`` contains an unsupported value.
    """

    def __init__(self, o: dict) -> None:
        # Copy so mutating this instance's list cannot alter the module default.
        self.allowed_visibility: list[str] = list(ALLOWED_VISIBILITY)
        self.filters: list[re.Pattern] = [re.compile(f) for f in o.get('regex_filters', [])]

        allowed_visibility = o.get('allowed_visibility')
        if allowed_visibility is not None:
            if any(v not in ALLOWED_VISIBILITY for v in allowed_visibility):
                raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}")
            self.allowed_visibility = allowed_visibility


def _require_setting(settings: dict, key: str) -> str:
    """Return ``settings[key]`` resolved through ``util.as_envvar``.

    :raises ValueError: if the resolved value is missing or empty.
    """
    value = util.as_envvar(settings.get(key))
    if not value:
        raise ValueError(f"'{key}' is required")
    return value


def _normalize_instance(instance: str) -> str:
    """Strip trailing slashes so API paths can be appended as ``f"{service}/api/..."``."""
    return instance.rstrip('/')


def _raise_unexpected(response: requests.Response, what: str) -> NoReturn:
    """Raise for a response whose status code the caller did not expect.

    ``raise_for_status`` only raises for 4xx/5xx. For any other unexpected code,
    fall back to a RuntimeError instead of continuing in an inconsistent state.
    """
    response.raise_for_status()
    raise RuntimeError(f"{what}: unexpected HTTP status {response.status_code}")


def _verify_credentials(service: str, token: str) -> dict:
    """Return the authenticated account from ``/api/v1/accounts/verify_credentials``.

    :raises requests.HTTPError: on a 4xx/5xx response.
    :raises RuntimeError: on any other non-200 response.
    """
    LOGGER.info("Verifying %s credentails...", service)
    response = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={
        'Authorization': f'Bearer {token}'
    })
    if response.status_code != 200:
        LOGGER.error("Failed to validate user credentials!")
        _raise_unexpected(response, "verify_credentials")
    return response.json()


class MastodonInput(cross.Input):
    """Stream the authenticated account's statuses and forward them to outputs."""

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        """
        :param settings: must contain ``token`` and ``instance`` (each resolved
            via ``util.as_envvar``); may contain ``options``.
        :raises ValueError: on missing settings or invalid options.
        :raises requests.HTTPError | RuntimeError: if credential verification
            fails or the instance exposes no streaming URL.
        """
        self.options = MastodonInputOptions(settings.get('options') or {})
        self.token = _require_setting(settings, 'token')
        service = _normalize_instance(_require_setting(settings, 'instance'))

        account = _verify_credentials(service, self.token)
        super().__init__(service, account["id"], settings, db)

        self.streaming = self._get_streaming_url()
        if not self.streaming:
            raise RuntimeError(f"Instance {service} does not support streaming!")

    def _get_streaming_url(self) -> str | None:
        """Return ``urls.streaming_api`` from ``/api/v1/instance``, or None if absent."""
        response = requests.get(f"{self.service}/api/v1/instance")
        response.raise_for_status()
        data: dict = response.json()
        return (data.get('urls') or {}).get('streaming_api')

    def __to_tokens(self, status: dict) -> list[cross.Token]:
        """Tokenize a status, preferring Markdown/plain source over rendered HTML.

        Order tried:
          1. ``status['text']`` when ``content_type`` is in ``MARKDOWNY``.
          2. The first ``MARKDOWNY`` type present in the *oma ``pleroma.content``.
          3. The HTML ``content``, via ``tokenize_post``.
        """
        content_type = status.get('content_type', 'text/plain')
        raw_text = status.get('text')

        tags: list[str] = [tag['name'] for tag in status.get('tags') or []]
        mentions: list[tuple[str, str]] = [
            ('@' + mention['username'], '@' + mention['acct'])
            for mention in status.get('mentions') or []
        ]

        if raw_text and content_type in MARKDOWNY:
            return cross.tokenize_markdown(raw_text, tags, mentions)

        # `pleroma` may be absent or null; treat both as "no extension".
        pleroma_ext: dict | None = (status.get('pleroma') or {}).get('content')
        if pleroma_ext:
            for ctype in MARKDOWNY:
                if ctype in pleroma_ext:
                    return cross.tokenize_markdown(pleroma_ext[ctype], tags, mentions)

        return tokenize_post(status)

    def _on_create_post(self, outputs: list[cross.Output], status: dict) -> None:
        """Crosspost a newly created status to every output, if it qualifies.

        The status is skipped when any of these holds:
          * it was written by another account;
          * it is a reblog, quote or poll;
          * it replies to another account;
          * its visibility is not in ``options.allowed_visibility``;
          * its parent is not in the database;
          * it matches a regex filter;
          * one of its media downloads fails.
        """
        # Ignore statuses authored by anyone else.
        if (status.get('account') or {}).get('id') != self.user_id:
            return

        if status.get('reblog') or status.get('quote_id') or status.get('quote') or status.get('poll'):
            # TODO polls not supported on bsky. maybe 3rd party? skip for now
            # we don't handle reblogs. possible with bridgy(?) and self
            # we don't handle quotes.
            LOGGER.info("Skipping '%s'! Reblog, quote or poll..", status['id'])
            return

        in_reply: str | None = status.get('in_reply_to_id')
        in_reply_to: str | None = status.get('in_reply_to_account_id')
        if in_reply_to and in_reply_to != self.user_id:
            # Only self-replies (threads) are supported.
            LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
            return

        if status.get('visibility') not in self.options.allowed_visibility:
            # Skip f/o and direct posts
            LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
            return

        success = database.try_insert_post(self.db, status['id'], in_reply, self.user_id, self.service)
        if not success:
            LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id'])
            return

        tokens = self.__to_tokens(status)
        if not cross.test_filters(tokens, self.options.filters):
            LOGGER.info("Skipping '%s'. Matched a filter!", status['id'])
            return

        LOGGER.info("Crossposting '%s'...", status['id'])

        media_attachments: list[media_util.MediaInfo] = []
        for attachment in status.get('media_attachments') or []:
            LOGGER.info("Downloading %s...", attachment['url'])
            info = media_util.download_media(attachment['url'], attachment.get('description') or '')
            if not info:
                LOGGER.error("Skipping '%s'. Failed to download media!", status['id'])
                return
            media_attachments.append(info)

        cross_post = MastodonPost(status, tokens, media_attachments)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], identifier: str) -> None:
        """Propagate deletion of a known status to every output, then forget it."""
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        database.delete_post(self.db, identifier, self.user_id, self.service)

    def _on_post(self, outputs: list[cross.Output], event: str, payload: str) -> None:
        """Dispatch one streaming event: ``update`` creates, ``delete`` deletes."""
        if event == 'update':
            self._on_create_post(outputs, json.loads(payload))
        elif event == 'delete':
            self._on_delete_post(outputs, payload)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]) -> None:
        """Consume the user stream, reconnecting after abnormal closes.

        Each event is passed to ``submit`` as a zero-argument callable. The
        actual processing (``_on_post``) runs wherever ``submit`` executes it.
        """
        uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"

        async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.3"}):
            try:
                LOGGER.info("Listening to %s...", self.streaming)
                async for msg in ws:
                    try:
                        data = json.loads(msg)
                    except json.JSONDecodeError:
                        # One bad frame should not tear down the whole listener.
                        LOGGER.error("Ignoring malformed streaming message: %r", msg)
                        continue
                    event: str = data.get('event')
                    payload: str = data.get('payload')

                    # Bind the values now. A lambda would read `event`/`payload`
                    # when `submit` runs it, possibly after the next message has
                    # reassigned them (late-binding closure).
                    submit(partial(self._on_post, outputs, str(event), str(payload)))
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.streaming)
                continue


ALLOWED_POSTING_VISIBILITY = ['public', 'unlisted', 'private']


class MastodonOutputOptions:
    """Validated options for ``MastodonOutput``.

    ``visibility`` must be one of ``ALLOWED_POSTING_VISIBILITY`` and defaults
    to ``'public'``.

    :raises ValueError: on an unsupported visibility.
    """

    def __init__(self, o: dict) -> None:
        self.visibility = 'public'

        visibility = o.get('visibility')
        if visibility is not None:
            if visibility not in ALLOWED_POSTING_VISIBILITY:
                raise ValueError(f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {visibility}")
            self.visibility = visibility


class MastodonOutput(cross.Output):
    """Publish crossposted posts as statuses on a Mastodon-compatible instance."""

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """
        :param settings: must contain ``token`` and ``instance`` (each resolved
            via ``util.as_envvar``); may contain ``options``.
        :raises ValueError: on missing settings or an invalid ``visibility``.
        :raises requests.HTTPError | RuntimeError: if credential verification or
            the instance-info request fails.
        """
        super().__init__(input, settings, db)
        self.options = settings.get('options') or {}
        # Validated against ALLOWED_POSTING_VISIBILITY; defaults to 'public'.
        self.post_visibility: str = MastodonOutputOptions(self.options).visibility
        self.token = _require_setting(settings, 'token')
        self.service = _normalize_instance(_require_setting(settings, 'instance'))

        self.user_id: str = _verify_credentials(self.service, self.token)["id"]

        LOGGER.info("Getting %s configuration...", self.service)
        response = requests.get(f"{self.service}/api/v1/instance", headers={
            'Authorization': f'Bearer {self.token}'
        })
        if response.status_code != 200:
            LOGGER.error("Failed to get instance info!")
            _raise_unexpected(response, "instance info")

        instance_info: dict = response.json()
        # Mastodon nests limits under `configuration`. Servers that omit it
        # (possibly *oma) fall back to the defaults and extensions below.
        configuration: dict = instance_info.get('configuration') or {}

        statuses_config: dict = configuration.get('statuses') or {}
        self.max_characters: int = statuses_config.get('max_characters', 500)
        self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4)
        self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23)

        media_config: dict = configuration.get('media_attachments') or {}
        self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
        self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
        self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES)

        # *oma: max post chars
        max_toot_chars = instance_info.get('max_toot_chars')
        if max_toot_chars:
            self.max_characters = max_toot_chars

        # *oma: a single upload limit applies to images and videos alike
        upload_limit = instance_info.get('upload_limit')
        if upload_limit:
            self.image_size_limit = upload_limit
            self.video_size_limit = upload_limit

        # *oma ext: prefer a Markdown flavour when the server accepts one
        self.text_format = 'text/plain'
        pleroma = instance_info.get('pleroma')
        if pleroma:
            post_formats: list[str] = (pleroma.get('metadata') or {}).get('post_formats') or []
            if 'text/x.misskeymarkdown' in post_formats:
                self.text_format = 'text/x.misskeymarkdown'
            elif 'text/markdown' in post_formats:
                self.text_format = 'text/markdown'

    def upload_media(self, attachments: list[media_util.MediaInfo]) -> list[str] | None:
        """Upload attachments and wait until the server has processed them.

        :returns: media ids in attachment order, or None (uploading nothing) if
            any attachment exceeds this instance's size limits.
        :raises requests.HTTPError | RuntimeError: on an unexpected upload response.
        """
        for a in attachments:
            size = len(a.io)
            if a.mime.startswith('image/'):
                if size > self.image_size_limit:
                    return None
            elif a.mime.startswith('video/'):
                if size > self.video_size_limit:
                    return None
            elif size > 7_000_000:
                # No instance-provided limit is read for other types (e.g. audio).
                return None

        uploads: list[dict] = []
        for a in attachments:
            data = {}
            if a.alt:
                data['description'] = a.alt

            req = requests.post(f"{self.service}/api/v2/media", headers={
                'Authorization': f'Bearer {self.token}'
            }, files={'file': (a.name, a.io, a.mime)}, data=data)

            if req.status_code == 200:
                media_id = req.json()['id']
                LOGGER.info("Uploaded %s! (%s)", a.name, media_id)
                uploads.append({'done': True, 'id': media_id})
            elif req.status_code == 202:
                # Accepted but still processing; polled below.
                LOGGER.info("Waiting for %s to process!", a.name)
                uploads.append({'done': False, 'id': req.json()['id']})
            else:
                # Never silently drop an attachment.
                LOGGER.error("Failed to upload %s! %s", a.name, req.text)
                _raise_unexpected(req, f"upload of {a.name}")

        while any(not media['done'] for media in uploads):
            LOGGER.info("Waiting for media to process...")
            time.sleep(3)
            for media in uploads:
                if media['done']:
                    continue

                # Double quotes inside: reusing the f-string's own quote
                # character is only valid syntax on Python 3.12+.
                reqs = requests.get(f'{self.service}/api/v1/media/{media["id"]}', headers={
                    'Authorization': f'Bearer {self.token}'
                })

                if reqs.status_code == 206:  # still processing
                    continue

                if reqs.status_code == 200:
                    media['done'] = True
                    continue
                reqs.raise_for_status()

        return [media['id'] for media in uploads]

    def token_to_string(self, tokens: list[cross.Token]) -> str | None:
        """Render tokens as status text in ``self.text_format``.

        Rendering rules:
          * Tags become ``#tag``.
          * A link for which ``util.canonical_label(label, href)`` is truthy
            becomes the bare URL.
          * Other links become ``label: url`` (plain text) or ``[label](url)``
            (Markdown).

        :returns: the text, or None if a labelled link is met and
            ``text_format`` is unrecognized.
        """
        parts: list[str] = []
        for token in tokens:
            if isinstance(token, cross.TextToken):
                parts.append(token.text)
            elif isinstance(token, cross.TagToken):
                parts.append('#' + token.tag)
            elif isinstance(token, cross.LinkToken):
                if util.canonical_label(token.label, token.href):
                    parts.append(token.href)
                elif self.text_format == 'text/plain':
                    parts.append(f'{token.label}: {token.href}')
                elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}:
                    parts.append(f'[{token.label}]({token.href})')
                else:
                    return None
            # NOTE(review): other token types (e.g. cross.MentionToken) are
            # dropped here; confirm that is intended.
        return ''.join(parts)

    def split_tokens_media(self, tokens: list[cross.Token], media: list[media_util.MediaInfo]) -> list[tuple[str, list[media_util.MediaInfo]]] | None:
        """Split a post into a thread of ``(text, attachments)`` chunks.

        Text is split by ``cross.split_tokens`` using this instance's character
        and per-URL limits. Attachments fill chunks in order, at most
        ``max_media_attachments`` per chunk. Text-less chunks are appended once
        the text chunks run out.

        :returns: the chunks, or None if any chunk fails to render (see
            ``token_to_string``).
        """
        split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)

        texts: list[str] = []
        for block in split_tokens:
            baked_text = self.token_to_string(block)
            if baked_text is None:
                return None
            texts.append(baked_text)

        if not texts:
            texts = ['']

        posts: list[dict] = [{"text": text, "attachments": []} for text in texts]
        available_indices: list[int] = list(range(len(posts)))
        current_image_post_idx: int | None = None

        def pop_next_empty_index() -> int:
            # Reuse existing text chunks first; then grow with blank chunks.
            if available_indices:
                return available_indices.pop(0)
            posts.append({"text": '', "attachments": []})
            return len(posts) - 1

        for att in media:
            if (
                current_image_post_idx is not None
                and len(posts[current_image_post_idx]["attachments"]) < self.max_media_attachments
            ):
                posts[current_image_post_idx]["attachments"].append(att)
            else:
                idx = pop_next_empty_index()
                posts[idx]["attachments"].append(att)
                current_image_post_idx = idx

        return [(p["text"], p["attachments"]) for p in posts]

    def accept_post(self, post: cross.Post) -> None:
        """Publish ``post`` (as a self-reply thread if needed) and record mappings.

        A reply is published only when its parent has a mapping on this output.
        Otherwise it is skipped with an error log. Media for every chunk is
        uploaded before any status is created.

        :raises requests.HTTPError: if creating a status fails.
        :raises RuntimeError: if the source post is missing from the database.
        """
        parent_id = post.get_parent_id()

        new_root_id: int | None = None
        new_parent_id: int | None = None

        reply_ref: str | None = None
        if parent_id:
            thread_tuple = database.find_mapped_thread(
                self.db,
                parent_id,
                self.input.user_id,
                self.input.service,
                self.user_id,
                self.service
            )

            if not thread_tuple:
                LOGGER.error("Failed to find thread tuple in the database!")
                return None

            _, reply_ref, new_root_id, new_parent_id = thread_tuple

        languages = post.get_languages()
        lang: str = languages[0] if languages else 'en'

        raw_statuses = self.split_tokens_media(post.get_tokens(), post.get_attachments())
        if not raw_statuses:
            LOGGER.error("Failed to split post into statuses?")
            return None

        baked_statuses: list[tuple[str, list[str]]] = []
        for status, raw_media in raw_statuses:
            media_ids: list[str] = []
            if raw_media:
                uploaded = self.upload_media(raw_media)
                if not uploaded:
                    LOGGER.error("Failed to upload attachments!")
                    return None
                media_ids = uploaded
            baked_statuses.append((status, media_ids))

        created_statuses: list[str] = []

        for status, media_ids in baked_statuses:
            payload = {
                # A status needs some text; use a placeholder for media-only chunks.
                'status': status or '🖼️',
                'media_ids': media_ids,
                'spoiler_text': post.get_cw(),
                'visibility': self.post_visibility,
                'content_type': self.text_format,
                'language': lang
            }

            if media_ids:
                payload['sensitive'] = post.is_sensitive()

            # Always mark a status carrying a content warning as sensitive.
            if post.get_cw():
                payload['sensitive'] = True

            if reply_ref:
                payload['in_reply_to_id'] = reply_ref

            reqs = requests.post(f'{self.service}/api/v1/statuses', headers={
                'Authorization': f'Bearer {self.token}',
                'Content-Type': 'application/json'
            }, json=payload)

            if reqs.status_code != 200:
                LOGGER.error("Failed to post status! %s - %s", reqs.status_code, reqs.text)
                reqs.raise_for_status()

            # Each chunk replies to the previous one, forming a thread.
            reply_ref = reqs.json()['id']
            LOGGER.info("Created new status %s!", reply_ref)
            created_statuses.append(reply_ref)

        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        if not db_post:
            # An explicit raise, unlike `assert`, survives `python -O`.
            raise RuntimeError(f"Source post '{post.get_id()}' not found in the database after crossposting!")

        if new_root_id is None or new_parent_id is None:
            # No existing thread to attach to: the first status becomes the root.
            new_root_id = database.insert_post(
                self.db,
                created_statuses[0],
                self.user_id,
                self.service
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            created_statuses = created_statuses[1:]

        for status_id in created_statuses:
            new_parent_id = database.insert_reply(
                self.db,
                status_id,
                self.user_id,
                self.service,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str) -> None:
        """Delete every status this output created for input post ``identifier``.

        Deletion is best effort: a failed remote delete is logged and the local
        record is removed anyway.
        """
        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        # NOTE(review): (service, user_id) order differs from the other database
        # helpers used in this file; confirm against database.find_mappings.
        mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id)
        # Walk in reverse; presumably so thread replies go before their parents.
        for mapping in mappings[::-1]:
            status_id = mapping[0]
            LOGGER.info("Deleting '%s'...", status_id)
            response = requests.delete(f'{self.service}/api/v1/statuses/{status_id}', headers={
                'Authorization': f'Bearer {self.token}'
            })
            if not response.ok:
                LOGGER.error("Failed to delete '%s'! %s - %s", status_id, response.status_code, response.text)
            # Same (user_id, service) order as every other database call here.
            database.delete_post(self.db, status_id, self.user_id, self.service)