social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
import asyncio
import json
import re
import time
from html import unescape
from typing import Any, Callable

import requests
import websockets
from bs4 import BeautifulSoup, Tag
from bs4.element import NavigableString

import cross
import database
import markeddown
import media_util
import util
from database import DataBaseWorker
from util import LOGGER

# Fallback list of uploadable MIME types, used when the instance does not
# advertise `supported_mime_types` in its configuration.
POSSIBLE_MIMES = [
    'audio/ogg',
    'audio/mp3',
    'image/webp',
    'image/jpeg',
    'image/png',
    'video/mp4',
    'video/quicktime',
    'video/webm'
]

# Shared HTML -> Markdown converter; it is reset after every conversion in
# `tokenize_post`, so it carries no state between posts.
md_parser = markeddown.HTMLToMarkdownParser()
md_parser.preserve_spaces = True


def _require_setting(settings: dict, key: str) -> str:
    """Return the resolved value of ``settings[key]``.

    The raw value is passed through ``util.as_envvar`` first.

    Raises:
        ValueError: if the resolved value is missing or falsy.
    """
    value = util.as_envvar(settings.get(key))
    if not value:
        raise ValueError(f"'{key}' is required")
    return value


def tokenize_post(status: dict) -> list[cross.Token]:
    """Convert the HTML ``content`` of a status into a list of cross tokens.

    Anchors become ``MentionToken`` / ``TagToken`` when they match an entry of
    the status' ``mentions`` / ``tags`` lists, otherwise ``LinkToken``. Every
    other piece of markup is converted to Markdown and emitted as a
    ``TextToken``.

    Args:
        status: status dict; reads ``content``, ``tags`` and ``mentions``.

    Returns:
        The token list, or an empty list when the status has no content.
    """
    if not status.get('content'):
        return []

    soup = BeautifulSoup(status['content'], "html.parser")
    tokens: list[cross.Token] = []

    tags: list[dict] = status.get('tags', [])
    mentions: list[dict] = status.get('mentions', [])

    def mdd(html):
        # Feed one HTML fragment through the shared parser and reset it so
        # the next fragment starts from a clean state.
        md_parser.feed(unescape(html))
        md = md_parser.get_markdown()
        md_parser.reset()
        return md

    def recurse(node) -> None:
        if isinstance(node, NavigableString):
            tokens.append(cross.TextToken(str(node)))
            return

        if not isinstance(node, Tag):
            return

        if node.name.lower() == "a":
            href = node.get("href", "")
            inner_html = "".join(str(c) for c in node.contents)
            link_text_md = mdd(inner_html)

            if link_text_md.startswith('@'):
                as_mention = link_text_md[1:]
                for block in mentions:
                    # Match either by target URL or by the visible handle.
                    if (
                        href == block.get('url')
                        or as_mention == block.get('acct')
                        or as_mention == block.get('username')
                    ):
                        tokens.append(cross.MentionToken(block['acct'], block['url']))
                        return

            if link_text_md.startswith('#'):
                as_tag = link_text_md[1:].lower()
                if any(as_tag == block.get('name') for block in tags):
                    # Keep the author's original casing of the tag.
                    tokens.append(cross.TagToken(link_text_md[1:]))
                    return

            # idk if we can safely convert this to string
            tokens.append(cross.LinkToken(str(href), link_text_md))
            return

        # Containers holding anchors are walked child by child so that the
        # anchors can become dedicated tokens.
        if node.find("a") is not None:
            for child in node.contents:
                recurse(child)
            return

        # Anchor-free markup is converted to Markdown in one go.
        markdownified = mdd(str(node))
        if markdownified:
            tokens.append(cross.TextToken(markdownified))

    for child in soup.contents:
        recurse(child)

    if not tokens:
        return []

    # Drop a single trailing blank line from the final text token.
    last_token = tokens[-1]
    if isinstance(last_token, cross.TextToken) and last_token.text.endswith('\n\n'):
        tokens[-1] = cross.TextToken(last_token.text[:-2])

    return tokens


# Content types whose raw source can go straight to the Markdown tokenizer.
MARKDOWNY = ['text/x.misskeymarkdown', 'text/markdown', 'text/plain']


class MastodonPost(cross.Post):
    """``cross.Post`` backed by a status dict plus pre-computed tokens/media."""

    def __init__(self, status: dict, tokens: list[cross.Token], media_attachments: list[media_util.MediaInfo]) -> None:
        super().__init__()
        self.status = status
        self.media_attachments = media_attachments
        self.tokens = tokens

    def get_tokens(self) -> list[cross.Token]:
        """Return the tokens supplied at construction time."""
        return self.tokens

    def get_parent_id(self) -> str | None:
        """Return ``in_reply_to_id`` of the status, if any."""
        return self.status.get('in_reply_to_id')

    def get_post_date_iso(self) -> str:
        """Return ``created_at``, falling back to the base class value."""
        date = self.status.get('created_at')
        return date or super().get_post_date_iso()

    def get_cw(self) -> str:
        """Return ``spoiler_text`` or an empty string."""
        return self.status.get('spoiler_text') or ''

    def get_id(self) -> str:
        """Return the status id."""
        return self.status['id']

    def get_languages(self) -> list[str]:
        """Return the status language as a one-element list, or ``[]``."""
        language = self.status.get('language')
        return [language] if language else []

    def is_sensitive(self) -> bool:
        """Return the ``sensitive`` flag (``False`` when absent)."""
        return self.status.get('sensitive', False)

    def get_attachments(self) -> list[media_util.MediaInfo]:
        """Return the media supplied at construction time."""
        return self.media_attachments


ALLOWED_VISIBILITY = ['public', 'unlisted']


class MastodonInputOptions():
    """Validated options for ``MastodonInput``.

    Attributes are ``allowed_visibility`` (defaults to ``ALLOWED_VISIBILITY``)
    and ``filters`` (compiled from ``regex_filters``).

    Raises:
        ValueError: if ``allowed_visibility`` contains an unsupported value.
    """

    def __init__(self, o: dict) -> None:
        self.allowed_visibility = ALLOWED_VISIBILITY
        self.filters = [re.compile(f) for f in o.get('regex_filters', [])]

        allowed_visibility = o.get('allowed_visibility')
        if allowed_visibility is not None:
            if any(v not in ALLOWED_VISIBILITY for v in allowed_visibility):
                raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}")
            self.allowed_visibility = allowed_visibility


class MastodonInput(cross.Input):
    """Listens to a Mastodon-compatible user stream and forwards own posts."""

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        """Verify credentials and resolve the streaming endpoint.

        Args:
            settings: must provide ``token`` and ``instance``; may provide
                ``options`` (see ``MastodonInputOptions``).
            db: database worker handed to ``cross.Input``.

        Raises:
            ValueError: if ``token`` or ``instance`` is missing.
            requests.HTTPError: if an API request returns an error status.
            Exception: if the instance advertises no streaming API.
        """
        self.options = MastodonInputOptions(settings.get('options', {}))
        self.token = _require_setting(settings, 'token')
        instance: str = _require_setting(settings, 'instance')

        service = instance[:-1] if instance.endswith('/') else instance

        LOGGER.info("Verifying %s credentails...", service)
        response = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={
            'Authorization': f'Bearer {self.token}'
        })
        if response.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            response.raise_for_status()
            # NOTE(review): reached only for a non-200 status that is not an
            # HTTP error; the instance is then left partially initialised.
            return

        # presumably sets self.service / self.user_id / self.db, which the
        # methods below read - confirm against cross.Input.
        super().__init__(service, response.json()["id"], settings, db)
        self.streaming = self._get_streaming_url()

        if not self.streaming:
            # Exception() does not apply %-formatting, so build the message here.
            raise Exception(f"Instance {service} does not support streaming!")

    def _get_streaming_url(self):
        """Return ``urls.streaming_api`` from the instance info, or ``None``."""
        response = requests.get(f"{self.service}/api/v1/instance")
        response.raise_for_status()
        data: dict = response.json()
        return (data.get('urls') or {}).get('streaming_api')

    def __to_tokens(self, status: dict):
        """Tokenize a status, preferring its Markdown source over its HTML."""
        content_type = status.get('content_type', 'text/plain')
        raw_text = status.get('text')

        tags: list[str] = [tag['name'] for tag in status.get('tags', [])]
        mentions: list[tuple[str, str]] = [
            ('@' + mention['username'], '@' + mention['acct'])
            for mention in status.get('mentions', [])
        ]

        if raw_text and content_type in MARKDOWNY:
            return cross.tokenize_markdown(raw_text, tags, mentions)

        # `akkoma` may be absent or explicitly null.
        akkoma_ext: dict | None = (status.get('akkoma') or {}).get('source')
        if akkoma_ext and akkoma_ext.get('mediaType') in MARKDOWNY:
            return cross.tokenize_markdown(akkoma_ext["content"], tags, mentions)

        return tokenize_post(status)

    def _on_create_post(self, outputs: list[cross.Output], status: dict):
        """Filter a new status and hand it to every output if it qualifies."""
        # skip events from other users (or events without an account id)
        if (status.get('account') or {}).get('id') != self.user_id:
            return

        if status.get('reblog') or (status.get('quote_id') or status.get('quote')) or status.get('poll'):
            # TODO polls not supported on bsky. maybe 3rd party? skip for now
            # we don't handle reblogs. possible with bridgy(?) and self
            # we don't handle quotes.
            LOGGER.info("Skipping '%s'! Reblog, quote or poll..", status['id'])
            return

        in_reply: str | None = status.get('in_reply_to_id')
        in_reply_to: str | None = status.get('in_reply_to_account_id')
        if in_reply_to and in_reply_to != self.user_id:
            # We don't support replies.
            LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
            return

        if status.get('visibility') not in self.options.allowed_visibility:
            # Skip f/o and direct posts
            LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
            return

        success = database.try_insert_post(self.db, status['id'], in_reply, self.user_id, self.service)
        if not success:
            LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id'])
            return

        tokens = self.__to_tokens(status)
        if not cross.test_filters(tokens, self.options.filters):
            LOGGER.info("Skipping '%s'. Matched a filter!", status['id'])
            return

        LOGGER.info("Crossposting '%s'...", status['id'])

        media_attachments: list[media_util.MediaInfo] = []
        for attachment in status.get('media_attachments', []):
            LOGGER.info("Downloading %s...", attachment['url'])
            info = media_util.download_media(attachment['url'], attachment.get('description') or '')
            if not info:
                LOGGER.error("Skipping '%s'. Failed to download media!", status['id'])
                return
            media_attachments.append(info)

        cross_post = MastodonPost(status, tokens, media_attachments)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], identifier: str):
        """Propagate a deletion to every output, then drop the db record."""
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        database.delete_post(self.db, identifier, self.user_id, self.service)

    def _on_post(self, outputs: list[cross.Output], event: str, payload: str):
        """Dispatch a stream event; only 'update' and 'delete' are handled."""
        if event == 'update':
            self._on_create_post(outputs, json.loads(payload))
        elif event == 'delete':
            self._on_delete_post(outputs, payload)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Consume the user stream, reconnecting when the connection drops.

        Args:
            outputs: outputs that receive created/deleted posts.
            submit: called with a zero-argument callable for every event.
        """
        uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"

        async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.3"}):
            try:
                LOGGER.info("Listening to %s...", self.streaming)

                async def listen_for_messages():
                    async for msg in ws:
                        data = json.loads(msg)
                        event: str = data.get('event')
                        payload: str = data.get('payload')

                        # Bind the current values as defaults: `submit` may
                        # run the callable after the loop has moved on to the
                        # next message and rebound `event` / `payload`.
                        submit(lambda event=event, payload=payload: self._on_post(outputs, str(event), str(payload)))

                listener = asyncio.create_task(listen_for_messages())

                await asyncio.gather(listener)
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.streaming)
                continue


ALLOWED_POSTING_VISIBILITY = ['public', 'unlisted', 'private']


class MastodonOutputOptions():
    """Validated options for posting: ``visibility`` (default ``'public'``).

    NOTE(review): not referenced anywhere else in this file.

    Raises:
        ValueError: if ``visibility`` is not in ``ALLOWED_POSTING_VISIBILITY``.
    """

    def __init__(self, o: dict) -> None:
        self.visibility = 'public'

        visibility = o.get('visibility')
        if visibility is not None:
            if visibility not in ALLOWED_POSTING_VISIBILITY:
                raise ValueError(f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {visibility}")
            self.visibility = visibility


class MastodonOutput(cross.Output):
    """Publishes cross-posted content to a Mastodon-compatible instance."""

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """Verify credentials and read the instance limits.

        Args:
            input: the input whose posts are mirrored.
            settings: must provide ``token`` and ``instance``; may provide
                ``options`` (a dict; ``visibility`` is read when posting).
            db: database worker handed to ``cross.Output``.

        Raises:
            ValueError: if ``token`` or ``instance`` is missing.
            requests.HTTPError: if an API request returns an error status.
        """
        super().__init__(input, settings, db)
        self.options = settings.get('options') or {}
        self.token = _require_setting(settings, 'token')
        instance: str = _require_setting(settings, 'instance')

        self.service = instance[:-1] if instance.endswith('/') else instance
        auth_headers = {'Authorization': f'Bearer {self.token}'}

        LOGGER.info("Verifying %s credentails...", self.service)
        response = requests.get(f"{self.service}/api/v1/accounts/verify_credentials", headers=auth_headers)
        if response.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            response.raise_for_status()
            # NOTE(review): reached only for a non-200 status that is not an
            # HTTP error; the instance is then left partially initialised.
            return
        self.user_id: str = response.json()["id"]

        LOGGER.info("Getting %s configuration...", self.service)
        response = requests.get(f"{self.service}/api/v1/instance", headers=auth_headers)
        if response.status_code != 200:
            LOGGER.error("Failed to get instance info!")
            response.raise_for_status()
            return

        instance_info: dict = response.json()
        configuration: dict = instance_info['configuration']

        statuses_config: dict = configuration.get('statuses', {})
        self.max_characters: int = statuses_config.get('max_characters', 500)
        self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4)
        self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23)

        media_config: dict = configuration.get('media_attachments', {})
        self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
        self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
        self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES)

        # *oma: max post chars
        max_toot_chars = instance_info.get('max_toot_chars')
        if max_toot_chars:
            self.max_characters = max_toot_chars

        # *oma: max upload limit
        upload_limit = instance_info.get('upload_limit')
        if upload_limit:
            self.image_size_limit = upload_limit
            self.video_size_limit = upload_limit

        # *oma ext: supported text types
        self.text_format = 'text/plain'
        pleroma = instance_info.get('pleroma')
        if pleroma:
            post_formats: list[str] = pleroma.get('metadata', {}).get('post_formats', [])
            if 'text/x.misskeymarkdown' in post_formats:
                self.text_format = 'text/x.misskeymarkdown'
            elif 'text/markdown' in post_formats:
                self.text_format = 'text/markdown'

    def upload_media(self, attachments: list[media_util.MediaInfo]) -> list[str] | None:
        """Upload attachments and wait until the server has processed them.

        Returns:
            The media ids in upload order, or ``None`` (before uploading
            anything) when an attachment exceeds a size limit.

        Raises:
            requests.HTTPError: if an upload or status poll returns an error.
        """
        # assumes a.io supports len() (e.g. bytes) - TODO confirm
        for a in attachments:
            size = len(a.io)
            if a.mime.startswith('image/'):
                if size > self.image_size_limit:
                    return None
            elif a.mime.startswith('video/'):
                if size > self.video_size_limit:
                    return None
            elif size > 7_000_000:
                return None

        auth_headers = {'Authorization': f'Bearer {self.token}'}

        uploads: list[dict] = []
        for a in attachments:
            data = {}
            if a.alt:
                data['description'] = a.alt

            req = requests.post(
                f"{self.service}/api/v2/media",
                headers=auth_headers,
                files={'file': (a.name, a.io, a.mime)},
                data=data
            )

            if req.status_code == 200:
                LOGGER.info("Uploaded %s! (%s)", a.name, req.json()['id'])
                uploads.append({
                    'done': True,
                    'id': req.json()['id']
                })
            elif req.status_code == 202:
                # Accepted, but the server is still processing the file.
                LOGGER.info("Waiting for %s to process!", a.name)
                uploads.append({
                    'done': False,
                    'id': req.json()['id']
                })
            else:
                LOGGER.error("Failed to upload %s! %s", a.name, req.text)
                req.raise_for_status()

        while not all(val['done'] for val in uploads):
            LOGGER.info("Waiting for media to process...")
            time.sleep(3)
            for media in uploads:
                if media['done']:
                    continue

                # Double-quoted f-string: reusing the outer quote inside the
                # replacement field is a SyntaxError before Python 3.12.
                reqs = requests.get(f"{self.service}/api/v1/media/{media['id']}", headers=auth_headers)

                # 206: still processing, poll again on the next round.
                if reqs.status_code == 206:
                    continue

                if reqs.status_code == 200:
                    media['done'] = True
                    continue
                reqs.raise_for_status()

        return [val['id'] for val in uploads]

    def token_to_string(self, tokens: list[cross.Token]) -> str | None:
        """Render tokens as post text in the instance's text format.

        Returns:
            The rendered text, or ``None`` when a token is not a text, tag or
            link token.
        """
        parts: list[str] = []

        for token in tokens:
            if isinstance(token, cross.TextToken):
                parts.append(token.text)
            elif isinstance(token, cross.TagToken):
                parts.append('#' + token.tag)
            elif isinstance(token, cross.LinkToken):
                if util.canonical_label(token.label, token.href):
                    parts.append(token.href)
                elif self.text_format == 'text/plain':
                    parts.append(f'{token.label}: {token.href}')
                elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}:
                    parts.append(f'[{token.label}]({token.href})')
            else:
                return None

        return ''.join(parts)

    def split_tokens_media(self, tokens: list[cross.Token], media: list[media_util.MediaInfo]):
        """Split a post into ``(text, attachments)`` tuples, one per status.

        Text is split with ``cross.split_tokens``; attachments are spread over
        the resulting statuses in order, at most ``max_media_attachments`` per
        status, appending text-less statuses when needed.

        Returns:
            A list of ``(text, attachments)`` tuples, or ``None`` when a text
            block cannot be rendered.
        """
        split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)
        post_texts: list[str] = []

        for block in split_tokens:
            baked_text = self.token_to_string(block)
            if baked_text is None:
                return None
            post_texts.append(baked_text)

        if not post_texts:
            post_texts = ['']

        posts: list[dict] = [{"text": text, "attachments": []} for text in post_texts]
        # Posts that have not yet been chosen as an attachment carrier.
        available_indices: list[int] = list(range(len(posts)))

        current_image_post_idx: int | None = None

        def pop_next_empty_index() -> int:
            if available_indices:
                return available_indices.pop(0)
            posts.append({"text": '', "attachments": []})
            return len(posts) - 1

        for att in media:
            if (
                current_image_post_idx is None
                or len(posts[current_image_post_idx]["attachments"]) >= self.max_media_attachments
            ):
                # No carrier yet, or the current one is full: move on.
                current_image_post_idx = pop_next_empty_index()
            posts[current_image_post_idx]["attachments"].append(att)

        result: list[tuple[str, list[media_util.MediaInfo]]] = [
            (p['text'], p["attachments"]) for p in posts
        ]
        return result

    def accept_post(self, post: cross.Post):
        """Publish a post (as a thread if it had to be split) and record it.

        Returns ``None`` early when the parent mapping is missing, the post
        cannot be split, or attachments cannot be uploaded.

        Raises:
            requests.HTTPError: if creating a status returns an error status.
        """
        parent_id = post.get_parent_id()

        new_root_id: int | None = None
        new_parent_id: int | None = None

        reply_ref: str | None = None
        if parent_id:
            thread_tuple = database.find_mapped_thread(
                self.db,
                parent_id,
                self.input.user_id,
                self.input.service,
                self.user_id,
                self.service
            )

            if not thread_tuple:
                LOGGER.error("Failed to find thread tuple in the database!")
                return None

            _, reply_ref, new_root_id, new_parent_id = thread_tuple

        languages = post.get_languages()
        lang: str = languages[0] if languages else 'en'

        raw_statuses = self.split_tokens_media(post.get_tokens(), post.get_attachments())
        if not raw_statuses:
            LOGGER.error("Failed to split post into statuses?")
            return None

        # Upload all media up front so nothing is posted if an upload fails.
        baked_statuses = []
        for status, raw_media in raw_statuses:
            if not raw_media:
                baked_statuses.append((status, []))
                continue

            media = self.upload_media(raw_media)
            if not media:
                LOGGER.error("Failed to upload attachments!")
                return None
            baked_statuses.append((status, media))

        created_statuses: list[str] = []

        for status, media in baked_statuses:
            payload = {
                'status': status,
                'media_ids': media or [],
                'spoiler_text': post.get_cw(),
                'visibility': self.options.get('visibility', 'public'),
                'content_type': self.text_format,
                'language': lang
            }

            if media:
                payload['sensitive'] = post.is_sensitive()

            if post.get_cw():
                payload['sensitive'] = True

            if not status:
                # Placeholder text for media-only statuses.
                payload['status'] = '🖼️'

            if reply_ref:
                payload['in_reply_to_id'] = reply_ref

            reqs = requests.post(f'{self.service}/api/v1/statuses', headers={
                'Authorization': f'Bearer {self.token}',
                'Content-Type': 'application/json'
            }, json=payload)

            if reqs.status_code != 200:
                LOGGER.error("Failed to post status! %s - %s", reqs.status_code, reqs.text)
                reqs.raise_for_status()

            # Chain the next part of the thread onto the status just created.
            reply_ref = reqs.json()['id']
            LOGGER.info("Created new status %s!", reply_ref)

            created_statuses.append(reply_ref)

        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        assert db_post, "ghghghhhhh"

        if new_root_id is None or new_parent_id is None:
            # Not a reply to a mapped thread: the first status is the root.
            new_root_id = database.insert_post(
                self.db,
                created_statuses[0],
                self.user_id,
                self.service
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            created_statuses = created_statuses[1:]

        for db_id in created_statuses:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                self.user_id,
                self.service,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every status mapped to the given input post, newest first."""
        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id)
        for mapping in mappings[::-1]:
            LOGGER.info("Deleting '%s'...", mapping[0])
            requests.delete(f'{self.service}/api/v1/statuses/{mapping[0]}', headers={
                'Authorization': f'Bearer {self.token}'
            })
            # (user_id, service) order, matching MastodonInput._on_delete_post
            # and the other post-level database calls in this file.
            # NOTE(review): confirm against database.delete_post's signature.
            database.delete_post(self.db, mapping[0], self.user_id, self.service)