from util import LOGGER import requests, websockets import util, media_util, json, cross import database from database import DataBaseWorker from typing import Callable, Any import asyncio, time from bs4 import BeautifulSoup, Tag from bs4.element import NavigableString import markeddown from html import unescape import re POSSIBLE_MIMES = [ 'audio/ogg', 'audio/mp3', 'image/webp', 'image/jpeg', 'image/png', 'video/mp4', 'video/quicktime', 'video/webm' ] md_parser = markeddown.HTMLToMarkdownParser() md_parser.preserve_spaces = True def tokenize_post(status: dict) -> list[cross.Token]: soup = BeautifulSoup(status['content'], "html.parser") tokens: list[cross.Token] = [] tags: list[dict] = status.get('tags', []) mentions: list[dict] = status.get('mentions', []) def mdd(html): md_parser.feed(unescape(html)) md = md_parser.get_markdown() md_parser.reset() return md def recurse(node) -> None: if isinstance(node, NavigableString): tokens.append(cross.TextToken(str(node))) return if isinstance(node, Tag): if node.name.lower() == "a": href = node.get("href", "") inner_html = "".join(str(c) for c in node.contents) link_text_md = mdd(inner_html) if link_text_md.startswith('@'): as_mention = link_text_md[1:] for block in mentions: if href == block.get('url'): tokens.append(cross.MentionToken(block['acct'], block['url'])) return elif as_mention == block.get('acct') or as_mention == block.get('username'): tokens.append(cross.MentionToken(block['acct'], block['url'])) return if link_text_md.startswith('#'): as_tag = link_text_md[1:].lower() if any(as_tag == block.get('name') for block in tags): tokens.append(cross.TagToken(link_text_md[1:])) return # idk if we can safely convert this to string tokens.append(cross.LinkToken(str(href), link_text_md)) return if node.find("a") is not None: for child in node.contents: recurse(child) return serialized = str(node) markdownified = mdd(serialized) if markdownified: tokens.append(cross.TextToken(markdownified)) return return for child in soup.contents: recurse(child) last_token = tokens[-1] if last_token and isinstance(last_token, cross.TextToken) and last_token.text.endswith('\n\n'): tokens[-1] = cross.TextToken(last_token.text[:-2]) return tokens MARKDOWNY = ['text/x.misskeymarkdown', 'text/markdown', 'text/plain'] class MastodonPost(cross.Post): def __init__(self, status: dict, tokens: list[cross.Token], media_attachments: list[media_util.MediaInfo]) -> None: super().__init__() self.status = status self.media_attachments = media_attachments self.tokens = tokens def get_tokens(self) -> list[cross.Token]: return self.tokens def get_parent_id(self) -> str | None: return self.status.get('in_reply_to_id') def get_post_date_iso(self) -> str: date = self.status.get('created_at') return date or super().get_post_date_iso() def get_cw(self) -> str: return self.status.get('spoiler_text') or '' def get_id(self) -> str: return self.status['id'] def get_languages(self) -> list[str]: if self.status.get('language'): return [self.status['language']] return [] def is_sensitive(self) -> bool: return self.status.get('sensitive', False) def get_attachments(self) -> list[media_util.MediaInfo]: return self.media_attachments ALLOWED_VISIBILITY = ['public', 'unlisted'] class MastodonInputOptions(): def __init__(self, o: dict) -> None: self.allowed_visibility = ALLOWED_VISIBILITY self.filters = [re.compile(f) for f in o.get('regex_filters', [])] allowed_visibility = o.get('allowed_visibility') if allowed_visibility is not None: if any([v not in ALLOWED_VISIBILITY for v in allowed_visibility]): raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}") self.allowed_visibility = allowed_visibility class MastodonInput(cross.Input): def __init__(self, settings: dict, db: DataBaseWorker) -> None: self.options = MastodonInputOptions(settings.get('options', {})) self.token = util.as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required")) instance: str = util.as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required")) service = instance[:-1] if instance.endswith('/') else instance LOGGER.info("Verifying %s credentails...", service) responce = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={ 'Authorization': f'Bearer {self.token}' }) if responce.status_code != 200: LOGGER.error("Failed to validate user credentials!") responce.raise_for_status() return super().__init__(service, responce.json()["id"], settings, db) self.streaming = self._get_streaming_url() if not self.streaming: raise Exception("Instance %s does not support streaming!", service) def _get_streaming_url(self): response = requests.get(f"{self.service}/api/v1/instance") response.raise_for_status() data: dict = response.json() return (data.get('urls') or {}).get('streaming_api') def __to_tokens(self, status: dict): content_type = status.get('content_type', 'text/plain') raw_text = status.get('text') tags: list[str] = [] for tag in status.get('tags', []): tags.append(tag['name']) mentions: list[tuple[str, str]] = [] for mention in status.get('mentions', []): mentions.append(('@' + mention['username'], '@' + mention['acct'])) if raw_text and content_type in MARKDOWNY: return cross.tokenize_markdown(raw_text, tags, mentions) akkoma_ext: dict | None = status.get('akkoma', {}).get('source') if akkoma_ext: if akkoma_ext.get('mediaType') in MARKDOWNY: return cross.tokenize_markdown(akkoma_ext["content"], tags, mentions) return tokenize_post(status) def _on_create_post(self, outputs: list[cross.Output], status: dict): # skip events from other users if (status.get('account') or {})['id'] != self.user_id: return if status.get('reblog') or (status.get('quote_id') or status.get('quote')) or status.get('poll'): # TODO polls not supported on bsky. maybe 3rd party? skip for now # we don't handle reblogs. possible with bridgy(?) and self # we don't handle quotes. LOGGER.info("Skipping '%s'! Reblog, quote or poll..", status['id']) return in_reply: str | None = status.get('in_reply_to_id') in_reply_to: str | None = status.get('in_reply_to_account_id') if in_reply_to and in_reply_to != self.user_id: # We don't support replies. LOGGER.info("Skipping '%s'! Reply to other user..", status['id']) return if status.get('visibility') not in self.options.allowed_visibility: # Skip f/o and direct posts LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility')) return success = database.try_insert_post(self.db, status['id'], in_reply, self.user_id, self.service) if not success: LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id']) return tokens = self.__to_tokens(status) if not cross.test_filters(tokens, self.options.filters): LOGGER.info("Skipping '%s'. Matched a filter!", status['id']) return LOGGER.info("Crossposting '%s'...", status['id']) media_attachments: list[media_util.MediaInfo] = [] for attachment in status.get('media_attachments', []): LOGGER.info("Downloading %s...", attachment['url']) info = media_util.download_media(attachment['url'], attachment.get('description') or '') if not info: LOGGER.error("Skipping '%s'. Failed to download media!", status['id']) return media_attachments.append(info) cross_post = MastodonPost(status, tokens, media_attachments) for output in outputs: output.accept_post(cross_post) def _on_delete_post(self, outputs: list[cross.Output], identifier: str): post = database.find_post(self.db, identifier, self.user_id, self.service) if not post: return LOGGER.info("Deleting '%s'...", identifier) for output in outputs: output.delete_post(identifier) database.delete_post(self.db, identifier, self.user_id, self.service) def _on_post(self, outputs: list[cross.Output], event: str, payload: str): if event == 'update': self._on_create_post(outputs, json.loads(payload)) elif event == 'delete': self._on_delete_post(outputs, payload) async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]): uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}" async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.3"}): try: LOGGER.info("Listening to %s...", self.streaming) async def listen_for_messages(): async for msg in ws: data = json.loads(msg) event: str = data.get('event') payload: str = data.get('payload') submit(lambda: self._on_post(outputs, str(event), str(payload))) listen = asyncio.create_task(listen_for_messages()) await asyncio.gather(listen) except websockets.ConnectionClosedError as e: LOGGER.error(e, stack_info=True, exc_info=True) LOGGER.info("Reconnecting to %s...", self.streaming) continue ALLOWED_POSTING_VISIBILITY = ['public', 'unlisted', 'private'] class MastodonOutputOptions(): def __init__(self, o: dict) -> None: self.visibility = 'public' visibility = o.get('visibility') if visibility is not None: if visibility not in ALLOWED_POSTING_VISIBILITY: raise ValueError(f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {visibility}") self.visibility = visibility class MastodonOutput(cross.Output): def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None: super().__init__(input, settings, db) self.options = settings.get('options') or {} self.token = util.as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required")) instance: str = util.as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required")) self.service = instance[:-1] if instance.endswith('/') else instance LOGGER.info("Verifying %s credentails...", self.service) responce = requests.get(f"{self.service}/api/v1/accounts/verify_credentials", headers={ 'Authorization': f'Bearer {self.token}' }) if responce.status_code != 200: LOGGER.error("Failed to validate user credentials!") responce.raise_for_status() return self.user_id: str = responce.json()["id"] LOGGER.info("Getting %s configuration...", self.service) responce = requests.get(f"{self.service}/api/v1/instance", headers={ 'Authorization': f'Bearer {self.token}' }) if responce.status_code != 200: LOGGER.error("Failed to get instance info!") responce.raise_for_status() return instance_info: dict = responce.json() configuration: dict = instance_info['configuration'] statuses_config: dict = configuration.get('statuses', {}) self.max_characters: int = statuses_config.get('max_characters', 500) self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4) self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23) media_config: dict = configuration.get('media_attachments', {}) self.image_size_limit: int = media_config.get('image_size_limit', 16777216) self.video_size_limit: int = media_config.get('video_size_limit', 103809024) self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES) # *oma: max post chars max_toot_chars = instance_info.get('max_toot_chars') if max_toot_chars: self.max_characters: int = max_toot_chars # *oma: max upload limit upload_limit = instance_info.get('upload_limit') if upload_limit: self.image_size_limit: int = upload_limit self.video_size_limit: int = upload_limit # *oma ext: supported text types self.text_format = 'text/plain' pleroma = instance_info.get('pleroma') if pleroma: post_formats: list[str] = pleroma.get('metadata', {}).get('post_formats', []) if 'text/x.misskeymarkdown' in post_formats: self.text_format = 'text/x.misskeymarkdown' elif 'text/markdown' in post_formats: self.text_format = 'text/markdown' def upload_media(self, attachments: list[media_util.MediaInfo]) -> list[str] | None: for a in attachments: if a.mime.startswith('image/') and len(a.io) > self.image_size_limit: return None if a.mime.startswith('video/') and len(a.io) > self.video_size_limit: return None if not a.mime.startswith('image/') and not a.mime.startswith('video/'): if len(a.io) > 7_000_000: return None uploads: list[dict] = [] for a in attachments: data = {} if a.alt: data['description'] = a.alt req = requests.post(f"{self.service}/api/v2/media", headers= { 'Authorization': f'Bearer {self.token}' }, files={'file': (a.name, a.io, a.mime)}, data=data) if req.status_code == 200: LOGGER.info("Uploaded %s! (%s)", a.name, req.json()['id']) uploads.append({ 'done': True, 'id': req.json()['id'] }) elif req.status_code == 202: LOGGER.info("Waiting for %s to process!", a.name) uploads.append({ 'done': False, 'id': req.json()['id'] }) else: LOGGER.error("Failed to upload %s! %s", a.name, req.text) req.raise_for_status() while any([not val['done'] for val in uploads]): LOGGER.info("Waiting for media to process...") time.sleep(3) for media in uploads: if media['done']: continue reqs = requests.get(f'{self.service}/api/v1/media/{media['id']}', headers={ 'Authorization': f'Bearer {self.token}' }) if reqs.status_code == 206: continue if reqs.status_code == 200: media['done'] = True continue reqs.raise_for_status() return [val['id'] for val in uploads] def token_to_string(self, tokens: list[cross.Token]) -> str | None: p_text: str = '' for token in tokens: if isinstance(token, cross.TextToken): p_text += token.text elif isinstance(token, cross.TagToken): p_text += '#' + token.tag elif isinstance(token, cross.LinkToken): if util.canonical_label(token.label, token.href): p_text += token.href else: if self.text_format == 'text/plain': p_text += f'{token.label}: {token.href}' elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}: p_text += f'[{token.label}]({token.href})' else: return None return p_text def split_tokens_media(self, tokens: list[cross.Token], media: list[media_util.MediaInfo]): split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url) post_text: list[str] = [] for block in split_tokens: baked_text = self.token_to_string(block) if baked_text is None: return None post_text.append(baked_text) if not post_text: post_text = [''] posts: list[dict] = [{"text": post_text, "attachments": []} for post_text in post_text] available_indices: list[int] = list(range(len(posts))) current_image_post_idx: int | None = None def make_blank_post() -> dict: return { "text": '', "attachments": [] } def pop_next_empty_index() -> int: if available_indices: return available_indices.pop(0) else: new_idx = len(posts) posts.append(make_blank_post()) return new_idx for att in media: if ( current_image_post_idx is not None and len(posts[current_image_post_idx]["attachments"]) < self.max_media_attachments ): posts[current_image_post_idx]["attachments"].append(att) else: idx = pop_next_empty_index() posts[idx]["attachments"].append(att) current_image_post_idx = idx result: list[tuple[str, list[media_util.MediaInfo]]] = [] for p in posts: result.append((p['text'], p["attachments"])) return result def accept_post(self, post: cross.Post): parent_id = post.get_parent_id() new_root_id: int | None = None new_parent_id: int | None = None reply_ref: str | None = None if parent_id: thread_tuple = database.find_mapped_thread( self.db, parent_id, self.input.user_id, self.input.service, self.user_id, self.service ) if not thread_tuple: LOGGER.error("Failed to find thread tuple in the database!") return None _, reply_ref, new_root_id, new_parent_id = thread_tuple lang: str if post.get_languages(): lang = post.get_languages()[0] else: lang = 'en' raw_statuses = self.split_tokens_media(post.get_tokens(), post.get_attachments()) if not raw_statuses: LOGGER.error("Failed to split post into statuses?") return None baked_statuses = [] for status, raw_media in raw_statuses: media: list[str] | None = None if raw_media: media = self.upload_media(raw_media) if not media: LOGGER.error("Failed to upload attachments!") return None baked_statuses.append((status, media)) continue baked_statuses.append((status,[])) created_statuses: list[str] = [] for status, media in baked_statuses: payload = { 'status': status, 'media_ids': media or [], 'spoiler_text': post.get_cw(), 'visibility': self.options.get('visibility', 'public'), 'content_type': self.text_format, 'language': lang } if media: payload['sensitive'] = post.is_sensitive() if post.get_cw(): payload['sensitive'] = True if not status: payload['status'] = '🖼️' if reply_ref: payload['in_reply_to_id'] = reply_ref reqs = requests.post(f'{self.service}/api/v1/statuses', headers={ 'Authorization': f'Bearer {self.token}', 'Content-Type': 'application/json' }, json=payload) if reqs.status_code != 200: LOGGER.info("Failed to post status! %s - %s", reqs.status_code, reqs.text) reqs.raise_for_status() reply_ref = reqs.json()['id'] LOGGER.info("Created new status %s!", reply_ref) created_statuses.append(reqs.json()['id']) db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service) assert db_post, "ghghghhhhh" if new_root_id is None or new_parent_id is None: new_root_id = database.insert_post( self.db, created_statuses[0], self.user_id, self.service ) new_parent_id = new_root_id database.insert_mapping(self.db, db_post['id'], new_parent_id) created_statuses = created_statuses[1:] for db_id in created_statuses: new_parent_id = database.insert_reply( self.db, db_id, self.user_id, self.service, new_parent_id, new_root_id ) database.insert_mapping(self.db, db_post['id'], new_parent_id) def delete_post(self, identifier: str): post = database.find_post(self.db, identifier, self.input.user_id, self.input.service) if not post: return mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id) for mapping in mappings[::-1]: LOGGER.info("Deleting '%s'...", mapping[0]) requests.delete(f'{self.service}/api/v1/statuses/{mapping[0]}', headers={ 'Authorization': f'Bearer {self.token}' }) database.delete_post(self.db, mapping[0], self.service, self.user_id)