social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
import asyncio
import json
import re
import time
from html import unescape
from typing import Any, Callable

import requests
import websockets
from bs4 import BeautifulSoup, Tag
from bs4.element import NavigableString

import util
import media_util
import cross
import database
import markeddown
from database import DataBaseWorker
from util import LOGGER

# Fallback list of upload MIME types, used when the instance does not
# advertise `supported_mime_types` in its configuration.
POSSIBLE_MIMES = [
    'audio/ogg',
    'audio/mp3',
    'image/webp',
    'image/jpeg',
    'image/png',
    'video/mp4',
    'video/quicktime',
    'video/webm'
]

# Shared HTML -> Markdown converter; it is reset after every conversion.
md_parser = markeddown.HTMLToMarkdownParser()
md_parser.preserve_spaces = True


def _required_setting(settings: dict, key: str) -> str:
    """Return the setting `key` passed through `util.as_envvar`.

    Raises:
        ValueError: if the resolved value is missing or empty.
    """
    value = util.as_envvar(settings.get(key))
    if not value:
        raise ValueError(f"'{key}' is required")
    return value


def _verify_credentials(service: str, token: str) -> str:
    """Check `token` against `service` and return the account id.

    Raises:
        requests.HTTPError: if the server does not answer with HTTP 200.
    """
    LOGGER.info("Verifying %s credentails...", service)
    response = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={
        'Authorization': f'Bearer {token}'
    })
    if response.status_code != 200:
        LOGGER.error("Failed to validate user credentials!")
        response.raise_for_status()
        # raise_for_status() only raises for 4xx/5xx; any other non-200
        # answer is still unusable, so fail loudly instead of continuing
        # with a half-initialised object.
        raise requests.HTTPError(
            f"Unexpected status {response.status_code} while verifying credentials",
            response=response
        )
    return response.json()["id"]


def tokenize_post(status: dict) -> list[cross.Token]:
    """Convert the HTML `content` of a status into a list of cross tokens.

    Anchors are turned into mention, tag or link tokens (matched against the
    status' `mentions` and `tags` entries); every other element is converted
    to Markdown text. A trailing blank line on the last text token is
    removed.

    Args:
        status: status dict; must contain `content`, may contain `tags` and
            `mentions`.

    Returns:
        The tokens in document order; an empty list when the content yields
        no tokens.
    """
    soup = BeautifulSoup(status['content'], "html.parser")
    tokens: list[cross.Token] = []

    tags: list[dict] = status.get('tags') or []
    mentions: list[dict] = status.get('mentions') or []

    def mdd(html):
        # Always reset the shared parser, even if feeding it fails, so one
        # bad fragment cannot leak state into the next conversion.
        try:
            md_parser.feed(unescape(html))
            return md_parser.get_markdown()
        finally:
            md_parser.reset()

    def recurse(node) -> None:
        if isinstance(node, NavigableString):
            tokens.append(cross.TextToken(str(node)))
            return

        if not isinstance(node, Tag):
            return

        if node.name.lower() == "a":
            href = node.get("href", "")
            inner_html = "".join(str(c) for c in node.contents)
            link_text_md = mdd(inner_html)

            if link_text_md.startswith('@'):
                as_mention = link_text_md[1:]
                for block in mentions:
                    # Match by profile URL first, then by handle.
                    if (
                        href == block.get('url')
                        or as_mention == block.get('acct')
                        or as_mention == block.get('username')
                    ):
                        tokens.append(cross.MentionToken(block['acct'], block['url']))
                        return

            if link_text_md.startswith('#'):
                as_tag = link_text_md[1:].lower()
                if any(as_tag == block.get('name') for block in tags):
                    tokens.append(cross.TagToken(link_text_md[1:]))
                    return

            # idk if we can safely convert this to string
            tokens.append(cross.LinkToken(str(href), link_text_md))
            return

        # Elements that contain anchors are walked child by child so the
        # anchors become their own tokens.
        if node.find("a") is not None:
            for child in node.contents:
                recurse(child)
            return

        markdownified = mdd(str(node))
        if markdownified:
            tokens.append(cross.TextToken(markdownified))

    for child in soup.contents:
        recurse(child)

    # Guard against empty content (e.g. media-only posts): there is no last
    # token to trim in that case.
    if tokens:
        last_token = tokens[-1]
        if isinstance(last_token, cross.TextToken) and last_token.text.endswith('\n\n'):
            tokens[-1] = cross.TextToken(last_token.text[:-2])

    return tokens


# Content types whose raw source text is tokenized as Markdown.
MARKDOWNY = ['text/x.misskeymarkdown', 'text/markdown', 'text/plain']


class MastodonPost(cross.Post):
    """A `cross.Post` backed by a status dict, its tokens and its media."""

    def __init__(self, status: dict, tokens: list[cross.Token], media_attachments: list[media_util.MediaInfo]) -> None:
        super().__init__()
        self.status = status
        self.media_attachments = media_attachments
        self.tokens = tokens

    def get_tokens(self) -> list[cross.Token]:
        return self.tokens

    def get_parent_id(self) -> str | None:
        return self.status.get('in_reply_to_id')

    def get_post_date_iso(self) -> str:
        # Fall back to the base implementation when the status has no date.
        date = self.status.get('created_at')
        return date or super().get_post_date_iso()

    def get_cw(self) -> str:
        return self.status.get('spoiler_text') or ''

    def get_id(self) -> str:
        return self.status['id']

    def get_languages(self) -> list[str]:
        language = self.status.get('language')
        return [language] if language else []

    def is_sensitive(self) -> bool:
        return self.status.get('sensitive', False)

    def get_attachments(self) -> list[media_util.MediaInfo]:
        return self.media_attachments


ALLOWED_VISIBILITY = ['public', 'unlisted']


class MastodonInputOptions():
    """Validated options for `MastodonInput`.

    Reads `regex_filters` (compiled with `re.compile`) and
    `allowed_visibility` from the given dict.

    Raises:
        ValueError: if `allowed_visibility` contains a value outside
            `ALLOWED_VISIBILITY`.
    """

    def __init__(self, o: dict) -> None:
        self.allowed_visibility = ALLOWED_VISIBILITY
        self.filters = [re.compile(f) for f in o.get('regex_filters', [])]

        allowed_visibility = o.get('allowed_visibility')
        if allowed_visibility is not None:
            if any(v not in ALLOWED_VISIBILITY for v in allowed_visibility):
                raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}")
            self.allowed_visibility = allowed_visibility


class MastodonInput(cross.Input):
    """Listens to a user's streaming feed and forwards posts to outputs."""

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        """Verify the credentials and resolve the streaming endpoint.

        Raises:
            ValueError: if `token` or `instance` is missing, or the options
                are invalid.
            requests.HTTPError: if credential verification or the instance
                lookup fails.
            Exception: if the instance does not advertise a streaming API.
        """
        self.options = MastodonInputOptions(settings.get('options') or {})
        self.token = _required_setting(settings, 'token')
        instance: str = _required_setting(settings, 'instance')

        service = instance.removesuffix('/')
        user_id = _verify_credentials(service, self.token)

        super().__init__(service, user_id, settings, db)
        self.streaming = self._get_streaming_url()

        if not self.streaming:
            raise Exception(f"Instance {service} does not support streaming!")

    def _get_streaming_url(self):
        """Return the instance's `urls.streaming_api` value, or None."""
        response = requests.get(f"{self.service}/api/v1/instance")
        response.raise_for_status()
        data: dict = response.json()
        return (data.get('urls') or {}).get('streaming_api')

    def __to_tokens(self, status: dict):
        """Tokenize a status, preferring its Markdown source over its HTML."""
        content_type = status.get('content_type', 'text/plain')
        raw_text = status.get('text')

        tags: list[str] = [tag['name'] for tag in status.get('tags', [])]
        mentions: list[tuple[str, str]] = [
            ('@' + mention['username'], '@' + mention['acct'])
            for mention in status.get('mentions', [])
        ]

        if raw_text and content_type in MARKDOWNY:
            return cross.tokenize_markdown(raw_text, tags, mentions)

        # `akkoma` may be absent or null, hence the `or {}`.
        akkoma_ext: dict | None = (status.get('akkoma') or {}).get('source')
        if akkoma_ext and akkoma_ext.get('mediaType') in MARKDOWNY:
            return cross.tokenize_markdown(akkoma_ext["content"], tags, mentions)

        return tokenize_post(status)

    def _on_create_post(self, outputs: list[cross.Output], status: dict):
        """Filter a new status and, if accepted, hand it to every output."""
        # skip events from other users
        if (status.get('account') or {}).get('id') != self.user_id:
            return

        if status.get('reblog') or (status.get('quote_id') or status.get('quote')) or status.get('poll'):
            # TODO polls not supported on bsky. maybe 3rd party? skip for now
            # we don't handle reblogs. possible with bridgy(?) and self
            # we don't handle quotes.
            LOGGER.info("Skipping '%s'! Reblog, quote or poll..", status['id'])
            return

        in_reply: str | None = status.get('in_reply_to_id')
        in_reply_to: str | None = status.get('in_reply_to_account_id')
        if in_reply_to and in_reply_to != self.user_id:
            # We don't support replies.
            LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
            return

        if status.get('visibility') not in self.options.allowed_visibility:
            # Skip f/o and direct posts
            LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
            return

        success = database.try_insert_post(self.db, status['id'], in_reply, self.user_id, self.service)
        if not success:
            LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id'])
            return

        tokens = self.__to_tokens(status)
        if not cross.test_filters(tokens, self.options.filters):
            LOGGER.info("Skipping '%s'. Matched a filter!", status['id'])
            return

        LOGGER.info("Crossposting '%s'...", status['id'])

        media_attachments: list[media_util.MediaInfo] = []
        for attachment in status.get('media_attachments', []):
            LOGGER.info("Downloading %s...", attachment['url'])
            info = media_util.download_media(attachment['url'], attachment.get('description') or '')
            if not info:
                LOGGER.error("Skipping '%s'. Failed to download media!", status['id'])
                return
            media_attachments.append(info)

        cross_post = MastodonPost(status, tokens, media_attachments)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], identifier: str):
        """Propagate a deletion to every output, then drop the DB record."""
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        database.delete_post(self.db, identifier, self.user_id, self.service)

    def _on_post(self, outputs: list[cross.Output], event: str, payload: str):
        """Dispatch a streaming event; only `update` and `delete` are handled."""
        if event == 'update':
            self._on_create_post(outputs, json.loads(payload))
        elif event == 'delete':
            self._on_delete_post(outputs, payload)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Stream user events and pass each one to `submit` as a callable.

        Reconnects whenever the websocket closes with an error.
        """
        uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"

        async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.3"}):
            try:
                LOGGER.info("Listening to %s...", self.streaming)

                async for msg in ws:
                    data = json.loads(msg)
                    event = str(data.get('event'))
                    payload = str(data.get('payload'))

                    # Bind the current values as defaults: `submit` may run
                    # the callable later, after the loop variables have been
                    # rebound by a newer message.
                    submit(lambda event=event, payload=payload: self._on_post(outputs, event, payload))
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.streaming)
                continue


ALLOWED_POSTING_VISIBILITY = ['public', 'unlisted', 'private']


class MastodonOutputOptions():
    """Validated options for `MastodonOutput`.

    Raises:
        ValueError: if `visibility` is not in `ALLOWED_POSTING_VISIBILITY`.
    """

    def __init__(self, o: dict) -> None:
        self.visibility = 'public'

        visibility = o.get('visibility')
        if visibility is not None:
            if visibility not in ALLOWED_POSTING_VISIBILITY:
                raise ValueError(f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {visibility}")
            self.visibility = visibility


class MastodonOutput(cross.Output):
    """Publishes crossposted posts (and their deletions) to an instance."""

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """Verify the credentials and read the instance's posting limits.

        Raises:
            ValueError: if `token` or `instance` is missing, or `visibility`
                is not an allowed value.
            requests.HTTPError: if credential verification or the instance
                lookup fails.
        """
        super().__init__(input, settings, db)
        # `options` stays a plain dict for backward compatibility; the
        # validated view lives in `posting_options`.
        self.options = settings.get('options') or {}
        self.posting_options = MastodonOutputOptions(self.options)
        self.token = _required_setting(settings, 'token')
        instance: str = _required_setting(settings, 'instance')

        self.service = instance.removesuffix('/')
        self.user_id: str = _verify_credentials(self.service, self.token)

        LOGGER.info("Getting %s configuration...", self.service)
        response = requests.get(f"{self.service}/api/v1/instance", headers={
            'Authorization': f'Bearer {self.token}'
        })
        if response.status_code != 200:
            LOGGER.error("Failed to get instance info!")
            response.raise_for_status()
            raise requests.HTTPError(
                f"Unexpected status {response.status_code} while getting instance info",
                response=response
            )

        instance_info: dict = response.json()
        # Not every server exposes `configuration`; fall back to defaults
        # and to the legacy fields handled below.
        configuration: dict = instance_info.get('configuration') or {}

        statuses_config: dict = configuration.get('statuses', {})
        self.max_characters: int = statuses_config.get('max_characters', 500)
        self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4)
        self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23)

        media_config: dict = configuration.get('media_attachments', {})
        self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
        self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
        self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES)

        # *oma: max post chars
        max_toot_chars = instance_info.get('max_toot_chars')
        if max_toot_chars:
            self.max_characters = max_toot_chars

        # *oma: max upload limit
        upload_limit = instance_info.get('upload_limit')
        if upload_limit:
            self.image_size_limit = upload_limit
            self.video_size_limit = upload_limit

        # *oma ext: supported text types
        self.text_format = 'text/plain'
        pleroma = instance_info.get('pleroma')
        if pleroma:
            post_formats: list[str] = pleroma.get('metadata', {}).get('post_formats', [])
            if 'text/x.misskeymarkdown' in post_formats:
                self.text_format = 'text/x.misskeymarkdown'
            elif 'text/markdown' in post_formats:
                self.text_format = 'text/markdown'

    def upload_media(self, attachments: list[media_util.MediaInfo]) -> list[str] | None:
        """Upload attachments and wait until the server has processed them.

        Returns:
            The media ids in upload order, or None when an attachment
            exceeds a size limit or the server answers with an unexpected
            status.

        Raises:
            requests.HTTPError: if an upload or a status poll fails.
        """
        for a in attachments:
            if a.mime.startswith('image/'):
                limit = self.image_size_limit
            elif a.mime.startswith('video/'):
                limit = self.video_size_limit
            else:
                # hard-coded cap for everything that is not image or video
                limit = 7_000_000

            if len(a.io) > limit:
                return None

        uploads: list[dict] = []
        for a in attachments:
            data = {}
            if a.alt:
                data['description'] = a.alt

            req = requests.post(f"{self.service}/api/v2/media", headers={
                'Authorization': f'Bearer {self.token}'
            }, files={'file': (a.name, a.io, a.mime)}, data=data)

            if req.status_code == 200:
                media_id = req.json()['id']
                LOGGER.info("Uploaded %s! (%s)", a.name, media_id)
                uploads.append({'done': True, 'id': media_id})
            elif req.status_code == 202:
                # accepted, but still being processed server-side
                LOGGER.info("Waiting for %s to process!", a.name)
                uploads.append({'done': False, 'id': req.json()['id']})
            else:
                LOGGER.error("Failed to upload %s! %s", a.name, req.text)
                req.raise_for_status()
                # Non-error but unexpected status: do not silently drop the
                # attachment from the post.
                return None

        while any(not val['done'] for val in uploads):
            LOGGER.info("Waiting for media to process...")
            time.sleep(3)
            for media in uploads:
                if media['done']:
                    continue

                reqs = requests.get(f"{self.service}/api/v1/media/{media['id']}", headers={
                    'Authorization': f'Bearer {self.token}'
                })

                if reqs.status_code == 206:
                    # still processing
                    continue

                if reqs.status_code == 200:
                    media['done'] = True
                    continue

                reqs.raise_for_status()
                # Unexpected non-error status: bail out rather than poll
                # forever.
                return None

        return [val['id'] for val in uploads]

    def token_to_string(self, tokens: list[cross.Token]) -> str | None:
        """Render tokens as post text in the instance's text format.

        Returns:
            The rendered text, or None when a token type other than text,
            tag or link is encountered.
        """
        parts: list[str] = []

        for token in tokens:
            if isinstance(token, cross.TextToken):
                parts.append(token.text)
            elif isinstance(token, cross.TagToken):
                parts.append('#' + token.tag)
            elif isinstance(token, cross.LinkToken):
                if util.canonical_label(token.label, token.href):
                    parts.append(token.href)
                elif self.text_format == 'text/plain':
                    parts.append(f'{token.label}: {token.href}')
                elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}:
                    parts.append(f'[{token.label}]({token.href})')
            else:
                # NOTE(review): reconstructed as the fallback for unsupported
                # token types (e.g. mentions) -- confirm against the original
                # indentation.
                return None

        return ''.join(parts)

    def split_tokens_media(self, tokens: list[cross.Token], media: list[media_util.MediaInfo]):
        """Split a post into (text, attachments) chunks that fit the limits.

        Text is split with `cross.split_tokens`; attachments are then spread
        over the chunks in order, at most `max_media_attachments` per chunk,
        appending text-less chunks when the text chunks run out.

        Returns:
            A list of `(text, attachments)` tuples, or None when a chunk
            cannot be rendered to text.
        """
        split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)
        texts: list[str] = []

        for block in split_tokens:
            baked_text = self.token_to_string(block)
            if baked_text is None:
                return None
            texts.append(baked_text)

        if not texts:
            texts = ['']

        posts: list[dict] = [{"text": text, "attachments": []} for text in texts]
        next_free_idx = 0
        current_idx: int | None = None

        for att in media:
            if (
                current_idx is None
                or len(posts[current_idx]["attachments"]) >= self.max_media_attachments
            ):
                # Move on to the next chunk that has no media yet, creating
                # a blank one if every existing chunk is used up.
                if next_free_idx == len(posts):
                    posts.append({"text": '', "attachments": []})
                current_idx = next_free_idx
                next_free_idx += 1
            posts[current_idx]["attachments"].append(att)

        return [(p['text'], p['attachments']) for p in posts]

    def accept_post(self, post: cross.Post):
        """Publish `post` as one or more chained statuses and record them.

        Raises:
            requests.HTTPError: if uploading media or creating a status
                fails.
            AssertionError: if the source post has no record in the
                database.
        """
        parent_id = post.get_parent_id()

        new_root_id: int | None = None
        new_parent_id: int | None = None

        reply_ref: str | None = None
        if parent_id:
            thread_tuple = database.find_mapped_thread(
                self.db,
                parent_id,
                self.input.user_id,
                self.input.service,
                self.user_id,
                self.service
            )

            if not thread_tuple:
                LOGGER.error("Failed to find thread tuple in the database!")
                return None

            _, reply_ref, new_root_id, new_parent_id = thread_tuple

        languages = post.get_languages()
        lang: str = languages[0] if languages else 'en'

        raw_statuses = self.split_tokens_media(post.get_tokens(), post.get_attachments())
        if not raw_statuses:
            LOGGER.error("Failed to split post into statuses?")
            return None

        # Upload all media up front so nothing is posted if an upload fails.
        baked_statuses = []
        for status, raw_media in raw_statuses:
            media: list[str] = []
            if raw_media:
                uploaded = self.upload_media(raw_media)
                if not uploaded:
                    LOGGER.error("Failed to upload attachments!")
                    return None
                media = uploaded
            baked_statuses.append((status, media))

        created_statuses: list[str] = []

        for status, media in baked_statuses:
            payload = {
                'status': status,
                'media_ids': media or [],
                'spoiler_text': post.get_cw(),
                'visibility': self.posting_options.visibility,
                'content_type': self.text_format,
                'language': lang
            }

            if media:
                payload['sensitive'] = post.is_sensitive()

            if post.get_cw():
                payload['sensitive'] = True

            if not status:
                # placeholder text for media-only statuses
                payload['status'] = '🖼️'

            if reply_ref:
                payload['in_reply_to_id'] = reply_ref

            reqs = requests.post(f'{self.service}/api/v1/statuses', headers={
                'Authorization': f'Bearer {self.token}',
                'Content-Type': 'application/json'
            }, json=payload)

            if reqs.status_code != 200:
                LOGGER.info("Failed to post status! %s - %s", reqs.status_code, reqs.text)
                reqs.raise_for_status()

            # Chain the next chunk as a reply to the one just created.
            reply_ref = reqs.json()['id']
            LOGGER.info("Created new status %s!", reply_ref)

            created_statuses.append(reply_ref)

        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        if not db_post:
            # Explicit raise instead of `assert`, which is stripped under -O.
            raise AssertionError(f"Source post '{post.get_id()}' was not found in the database!")

        if new_root_id is None or new_parent_id is None:
            # Not part of a mapped thread: the first status becomes the root.
            new_root_id = database.insert_post(
                self.db,
                created_statuses[0],
                self.user_id,
                self.service
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            created_statuses = created_statuses[1:]

        for db_id in created_statuses:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                self.user_id,
                self.service,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every status mapped to the source post `identifier`."""
        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id)
        # Delete in reverse order of the returned mappings.
        for mapping in mappings[::-1]:
            LOGGER.info("Deleting '%s'...", mapping[0])
            response = requests.delete(f'{self.service}/api/v1/statuses/{mapping[0]}', headers={
                'Authorization': f'Bearer {self.token}'
            })
            if not response.ok:
                LOGGER.error("Failed to delete '%s'! %s - %s", mapping[0], response.status_code, response.text)
            # NOTE(review): argument order aligned with the other
            # database.delete_post call in this file (user_id, service); the
            # original passed (service, user_id) here -- confirm against the
            # database module.
            database.delete_post(self.db, mapping[0], self.user_id, self.service)