social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
import asyncio
import json
from typing import Any, Callable

import requests
import websockets
from bs4 import BeautifulSoup, Tag
from bs4.element import NavigableString
from markdownify import markdownify as md

from util import LOGGER
import util
import media_util
import cross
import database
from database import DataBaseWorker

# Maps the `type` field of a Mastodon media attachment to the attachment
# kind reported by MastodonAttachment.get_type(); anything not listed here
# is reported as 'other'.
FORMATS = {
    'video': 'video',
    'image': 'image',
    'gifv': 'gif',
    'audio': 'audio',
    'unknown': 'other'
}


def tokenize_post(status: dict) -> list[cross.Token]:
    """Split the HTML body of a Mastodon status into cross-posting tokens.

    The status' ``content`` is parsed as HTML and walked recursively:

    * plain text nodes become ``cross.TextToken``;
    * ``<a>`` elements become a ``cross.MentionToken`` / ``cross.TagToken``
      when they match an entry of the status' ``mentions`` / ``tags``
      lists, otherwise a ``cross.LinkToken``;
    * elements that contain a link somewhere below them are descended
      into, so the link can be tokenized on its own;
    * any other element is converted to markdown as a whole and emitted
      as a ``cross.TextToken`` (skipped when the markdown is empty).

    Args:
        status: status dict; ``content`` is required, ``tags`` and
            ``mentions`` are optional.

    Returns:
        The tokens in document order.

    Raises:
        KeyError: if ``status`` has no ``content`` key.
    """
    soup = BeautifulSoup(status['content'], "html.parser")
    tokens: list[cross.Token] = []

    tags: list[dict] = status.get('tags', [])
    mentions: list[dict] = status.get('mentions', [])

    def mdd(html):
        # Asterisks and underscores are left unescaped in the markdown.
        return md(html, escape_asterisks=False, escape_underscores=False)

    def tokenize_link(node: Tag) -> None:
        """Emit exactly one token for an <a> element."""
        href = node.get("href", "")
        inner_html = "".join(str(c) for c in node.contents)
        link_text_md = mdd(inner_html)

        if link_text_md.startswith('@'):
            as_mention = link_text_md[1:]
            for block in mentions:
                # A mention matches either by its profile URL or by the
                # handle shown in the link text.
                if (
                    href == block.get('url')
                    or as_mention == block.get('acct')
                    or as_mention == block.get('username')
                ):
                    tokens.append(cross.MentionToken(block['acct'], block['url']))
                    return

        if link_text_md.startswith('#'):
            # The link text is lowercased before it is compared with the
            # names in `tags`; the emitted token keeps the original casing.
            as_tag = link_text_md[1:].lower()
            if any(as_tag == block.get('name') for block in tags):
                tokens.append(cross.TagToken(link_text_md[1:]))
                return

        # idk if we can safely convert this to string
        tokens.append(cross.LinkToken(str(href), link_text_md))

    def recurse(node) -> None:
        if isinstance(node, NavigableString):
            tokens.append(cross.TextToken(str(node)))
            return

        if not isinstance(node, Tag):
            return

        if node.name.lower() == "a":
            tokenize_link(node)
            return

        if node.find("a") is not None:
            # A link is nested somewhere below: walk the children so the
            # link becomes its own token instead of flattened markdown.
            for child in node.contents:
                recurse(child)
            return

        markdownified = mdd(str(node))
        if markdownified:
            tokens.append(cross.TextToken(markdownified))

    for child in soup.contents:
        recurse(child)

    return tokens


class MastodonPost(cross.Post):
    """``cross.Post`` view over a Mastodon status dict."""

    def __init__(self, status: dict) -> None:
        """Wrap ``status``, tokenizing its content and wrapping its media.

        Raises:
            KeyError: if ``status`` has no ``content`` key (raised by
                ``tokenize_post``).
        """
        super().__init__()
        self.status = status
        self.media_attachments: list[cross.MediaAttachment] = [
            MastodonAttachment(attachment)
            for attachment in status.get('media_attachments', [])
        ]
        self.tokens = tokenize_post(status)

    def get_tokens(self) -> list[cross.Token]:
        """Return the tokens produced from the status content."""
        return self.tokens

    def get_parent_id(self) -> str | None:
        """Return ``in_reply_to_id`` of the status, or None when absent."""
        return self.status.get('in_reply_to_id')

    def get_cw(self) -> str:
        """Return the status' ``spoiler_text`` (content warning) via util.safe_get."""
        return util.safe_get(self.status, 'spoiler_text', '')

    def get_id(self) -> str:
        """Return the status id."""
        return self.status['id']

    def get_languages(self) -> list[str]:
        """Return the status language as a one-element list, or [] if unset."""
        language = self.status.get('language')
        return [language] if language else []

    def is_sensitive(self) -> bool:
        """Return the status' ``sensitive`` flag (False when missing)."""
        return self.status.get('sensitive', False)

    def get_attachments(self) -> list[cross.MediaAttachment]:
        """Return the wrapped media attachments of the status."""
        return self.media_attachments


class MastodonAttachment(cross.MediaAttachment):
    """``cross.MediaAttachment`` view over a Mastodon media attachment dict."""

    def __init__(self, attachment: dict) -> None:
        """Wrap ``attachment`` and pick a metadata source for it.

        For 'video' and 'image' attachments, metadata is taken from the
        attachment's own ``meta.original`` entry when present; otherwise
        it is computed from the downloaded bytes with
        ``media_util.get_media_meta``. Other attachment types get no
        generator assigned here.
        """
        super().__init__()
        self.attachment = attachment

        if attachment.get('type') in ('video', 'image'):
            if attachment.get('meta') and attachment.get('meta', {}).get('original'):
                def from_status(data: bytes) -> cross.MediaMeta:
                    # The file contents are not needed: the server already
                    # reported the dimensions.
                    o_meta = attachment.get('meta', {}).get('original')
                    return cross.MediaMeta(o_meta['width'], o_meta['height'], o_meta.get('duration', -1))
                self.meta_generator = from_status
            else:
                def from_bytes(data: bytes) -> cross.MediaMeta:
                    o_meta = media_util.get_media_meta(data)
                    return cross.MediaMeta(o_meta['width'], o_meta['height'], o_meta.get('duration', -1))
                self.meta_generator = from_bytes

    # URL to download the attachment from
    def get_url(self) -> str:
        return self.attachment.get('url', '')

    # type of attachment
    def get_type(self) -> str | None:
        return FORMATS.get(self.attachment.get('type', 'other'), 'other')

    # create file metadata from bytes or other
    def create_meta(self, bytes: bytes) -> cross.MediaMeta:
        # __init__ only assigns a generator for video/image attachments, and
        # whether the base class provides a default is not visible from this
        # module, so look the attribute up defensively.
        generator = getattr(self, 'meta_generator', None)
        if generator:
            return generator(bytes)
        return cross.MediaMeta(-1, -1, -1)

    # get media description
    def get_alt(self) -> str:
        return util.safe_get(self.attachment, 'description', '')


class MastodonInput(cross.Input):
    """Listens to a Mastodon user stream and forwards the user's own posts.

    NOTE(review): ``self.service``, ``self.user_id`` and ``self.db`` are
    used below but never assigned in this module; presumably
    ``cross.Input.__init__`` sets them from its arguments - confirm.
    """

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        """Verify the configured credentials and resolve the streaming URL.

        Args:
            settings: input configuration; ``token`` and ``instance`` are
                read through ``util.get_or_envvar``, ``options`` directly.
            db: database worker passed on to ``cross.Input``.

        Raises:
            requests.HTTPError: if credential verification or the instance
                lookup returns a 4xx/5xx response.
            Exception: if credential verification returns any other
                non-200 status, or the instance reports no streaming API.
        """
        self.options = settings.get('options', {})
        self.token = util.get_or_envvar(settings, 'token')
        instance: str = util.get_or_envvar(settings, 'instance')

        # Drop a single trailing slash so URLs can be built with f"{service}/...".
        service = instance.removesuffix('/')

        LOGGER.info("Verifying %s credentails...", service)
        responce = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={
            'Authorization': f'Bearer {self.token}'
        })
        if responce.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            responce.raise_for_status()
            # raise_for_status() only raises for 4xx/5xx. Any other non-200
            # answer must still abort construction: returning here would
            # leave the object half-initialized (super().__init__ not run).
            raise Exception(
                f"Unexpected status code {responce.status_code} while verifying credentials!"
            )

        super().__init__(service, responce.json()["id"], settings, db)
        self.streaming = self._get_streaming_url()

        if not self.streaming:
            # Exception() does not apply %-formatting to its arguments, so
            # the message has to be formatted before it is raised.
            raise Exception(f"Instance {service} does not support streaming!")

    def _get_streaming_url(self):
        """Return ``urls.streaming_api`` from the instance info, or None.

        Raises:
            requests.HTTPError: if the instance endpoint returns 4xx/5xx.
        """
        response = requests.get(f"{self.service}/api/v1/instance")
        response.raise_for_status()
        data: dict = response.json()
        return util.safe_get(data, "urls", {}).get("streaming_api")

    def _on_create_post(self, outputs: list[cross.Output], status: dict):
        """Record a new status in the database and hand it to every output.

        The status is skipped (nothing recorded, nothing forwarded) when it
        was written by another account, is a reblog or a poll, replies to
        another account, has a visibility that is not listed in the
        ``allowed_visibility`` option, or replies to a post that is not in
        the database.
        """
        # skip events from other users
        # (.get() so a payload without an account id is skipped instead of
        # raising KeyError)
        if util.safe_get(status, 'account', {}).get('id') != self.user_id:
            return

        if status.get('reblog') or status.get('poll'):
            # TODO polls not supported on bsky. maybe 3rd party? skip for now
            # we don't handle reblogs. possible with bridgy(?) and self
            LOGGER.info("Skipping '%s'! Reblog or poll..", status['id'])
            return

        in_reply: str | None = status.get('in_reply_to_id')
        in_reply_to: str | None = status.get('in_reply_to_account_id')
        if in_reply_to and in_reply_to != self.user_id:
            # We don't support replies.
            LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
            return

        if status.get('visibility') not in self.options.get('allowed_visibility', []):
            # Skip f/o and direct posts
            LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
            return

        root_id = None
        parent_id = None
        if in_reply:
            parent_post = database.find_post(self.db, in_reply, self.user_id, self.service)
            if not parent_post:
                LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id'])
                return

            parent_id = parent_post['id']
            # The thread root is the parent's root, or the parent itself
            # when the parent has no root recorded.
            root_id = parent_post['root_id'] or parent_id

        LOGGER.info("Crossposting '%s'...", status['id'])
        if root_id and parent_id:
            database.insert_reply(
                self.db,
                status['id'],
                self.user_id,
                self.service,
                parent_id,
                root_id
            )
        else:
            database.insert_post(
                self.db,
                status['id'],
                self.user_id,
                self.service
            )

        cross_post = MastodonPost(status)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], identifier: str):
        """Propagate the deletion of a known post to every output.

        Posts that are not found in the database are ignored.
        """
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        database.delete_post(self.db, identifier, self.user_id, self.service)

    def _on_post(self, outputs: list[cross.Output], event: str, payload: str):
        """Dispatch one streaming event.

        'update' payloads are decoded as JSON and treated as new statuses,
        'delete' payloads are passed on unchanged as the post identifier;
        every other event is ignored.
        """
        if event == 'update':
            self._on_create_post(outputs, json.loads(payload))
        elif event == 'delete':
            self._on_delete_post(outputs, payload)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Stream user events and submit a handler call for each message.

        Args:
            outputs: outputs that receive created / deleted posts.
            submit: called with a zero-argument callable for every received
                message; this method does not run the handler itself.
        """
        uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"

        async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.2"}):
            try:
                LOGGER.info("Listening to %s...", self.streaming)

                async for msg in ws:
                    data = json.loads(msg)
                    event = str(data.get('event'))
                    payload = str(data.get('payload'))

                    # Bind the current values as defaults: a plain closure
                    # would read `event` / `payload` only when the callback
                    # runs, by which time the loop may already have moved on
                    # to a later message.
                    submit(lambda event=event, payload=payload: self._on_post(outputs, event, payload))
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.streaming)
                continue