social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
"""Mastodon-API output for the crossposting tool.

Defines :class:`MastodonOutput`, which publishes posts, boosts and deletions
to a server through the ``/api/v1`` and ``/api/v2`` HTTP endpoints used below.
"""
import time

import requests

import cross
import util.database as database
import misskey.mfm_util as mfm_util
from util.util import LOGGER, as_envvar, canonical_label
from util.media import MediaInfo
from util.database import DataBaseWorker

# Fallback list of MIME types, used when the instance configuration does not
# report 'supported_mime_types'.
POSSIBLE_MIMES = [
    'audio/ogg',
    'audio/mp3',
    'image/webp',
    'image/jpeg',
    'image/png',
    'video/mp4',
    'video/quicktime',
    'video/webm'
]

# Visibility values accepted by MastodonOutputOptions.
ALLOWED_POSTING_VISIBILITY = ['public', 'unlisted', 'private']


class MastodonOutputOptions():
    """Validated view of the ``options`` mapping of an output.

    NOTE(review): nothing in this file instantiates this class;
    ``MastodonOutput`` keeps the raw options dict and reads ``visibility``
    from it without validation. Confirm whether it should be wired in.
    """

    def __init__(self, o: dict) -> None:
        """Read ``visibility`` from *o*, defaulting to ``'public'``.

        Raises:
            ValueError: if ``visibility`` is present but not one of
                ``ALLOWED_POSTING_VISIBILITY``.
        """
        self.visibility = 'public'

        visibility = o.get('visibility')
        if visibility is not None:
            if visibility not in ALLOWED_POSTING_VISIBILITY:
                raise ValueError(f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {visibility}")
            self.visibility = visibility


class MastodonOutput(cross.Output):
    """Output that mirrors posts from a ``cross.Input`` to one account."""

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """Verify the credentials and load the instance limits.

        Args:
            input: the input whose posts are mirrored.
            settings: output settings; ``token`` and ``instance`` are
                required (both are passed through ``as_envvar``), ``options``
                is optional.
            db: database worker used for post/mapping bookkeeping.

        Raises:
            ValueError: if ``token`` or ``instance`` is missing or empty.
            requests.HTTPError: if the server answers either start-up
                request with a 4xx/5xx status.
        """
        super().__init__(input, settings, db)
        self.options = settings.get('options') or {}

        token = as_envvar(settings.get('token'))
        if not token:
            raise ValueError("'token' is required")
        self.token = token

        instance: str = as_envvar(settings.get('instance'))
        if not instance:
            raise ValueError("'instance' is required")

        # Drop a single trailing slash so URLs can be built with '/api/...'.
        self.service = instance[:-1] if instance.endswith('/') else instance

        LOGGER.info("Verifying %s credentails...", self.service)
        responce = requests.get(
            f"{self.service}/api/v1/accounts/verify_credentials",
            headers=self.__auth_headers()
        )
        if responce.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            responce.raise_for_status()
            # NOTE(review): raise_for_status() only raises for 4xx/5xx; any
            # other non-200 status ends up here and leaves the object
            # partially initialised.
            return
        self.user_id: str = responce.json()["id"]

        LOGGER.info("Getting %s configuration...", self.service)
        responce = requests.get(
            f"{self.service}/api/v1/instance",
            headers=self.__auth_headers()
        )
        if responce.status_code != 200:
            LOGGER.error("Failed to get instance info!")
            responce.raise_for_status()
            return

        instance_info: dict = responce.json()
        configuration: dict = instance_info['configuration']

        statuses_config: dict = configuration.get('statuses', {})
        self.max_characters: int = statuses_config.get('max_characters', 500)
        self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4)
        self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23)

        media_config: dict = configuration.get('media_attachments', {})
        self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
        self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
        self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES)

        # *oma: max post chars (overrides the value read above when present)
        max_toot_chars = instance_info.get('max_toot_chars')
        if max_toot_chars:
            self.max_characters = max_toot_chars

        # *oma: max upload limit (applies to both images and videos)
        upload_limit = instance_info.get('upload_limit')
        if upload_limit:
            self.image_size_limit = upload_limit
            self.video_size_limit = upload_limit

        # *oma ext: supported text types; prefer MFM, then markdown
        self.text_format = 'text/plain'
        pleroma = instance_info.get('pleroma')
        if pleroma:
            post_formats: list[str] = pleroma.get('metadata', {}).get('post_formats', [])
            if 'text/x.misskeymarkdown' in post_formats:
                self.text_format = 'text/x.misskeymarkdown'
            elif 'text/markdown' in post_formats:
                self.text_format = 'text/markdown'

    def __auth_headers(self) -> dict:
        """Return the bearer-token header sent with every request."""
        return {'Authorization': f'Bearer {self.token}'}

    def upload_media(self, attachments: list[MediaInfo]) -> list[str] | None:
        """Upload *attachments* and wait until the server has processed them.

        Returns:
            The uploaded media ids, in upload order, or ``None`` when an
            attachment exceeds its size limit or an upload is answered with
            an unexpected status that does not raise.

        Raises:
            requests.HTTPError: if an upload or a processing poll is
                answered with a 4xx/5xx status.
        """
        # Check every size before uploading anything.
        for a in attachments:
            if a.mime.startswith('image/'):
                limit = self.image_size_limit
            elif a.mime.startswith('video/'):
                limit = self.video_size_limit
            else:
                # Hard-coded cap for everything that is not image/video.
                limit = 7_000_000
            if len(a.io) > limit:
                return None

        uploads: list[dict] = []
        for a in attachments:
            data = {}
            if a.alt:
                data['description'] = a.alt

            req = requests.post(
                f"{self.service}/api/v2/media",
                headers=self.__auth_headers(),
                files={'file': (a.name, a.io, a.mime)},
                data=data
            )

            if req.status_code == 200:
                media_id = req.json()['id']
                LOGGER.info("Uploaded %s! (%s)", a.name, media_id)
                uploads.append({'done': True, 'id': media_id})
            elif req.status_code == 202:
                # Accepted, but processing is still pending; polled below.
                LOGGER.info("Waiting for %s to process!", a.name)
                uploads.append({'done': False, 'id': req.json()['id']})
            else:
                LOGGER.error("Failed to upload %s! %s", a.name, req.text)
                req.raise_for_status()
                # raise_for_status() is a no-op below 400; report failure
                # instead of silently posting without this attachment.
                return None

        while any(not val['done'] for val in uploads):
            LOGGER.info("Waiting for media to process...")
            time.sleep(3)
            for media in uploads:
                if media['done']:
                    continue

                reqs = requests.get(
                    f"{self.service}/api/v1/media/{media['id']}",
                    headers=self.__auth_headers()
                )

                # 206 is treated as "still processing"; poll again later.
                if reqs.status_code == 206:
                    continue

                if reqs.status_code == 200:
                    media['done'] = True
                    continue
                reqs.raise_for_status()

        return [val['id'] for val in uploads]

    def token_to_string(self, tokens: list[cross.Token]) -> str | None:
        """Render *tokens* as text in ``self.text_format``.

        Returns:
            The rendered text, or ``None`` if a token is not a text, tag or
            link token.
        """
        parts: list[str] = []

        for token in tokens:
            if isinstance(token, cross.TextToken):
                parts.append(token.text)
            elif isinstance(token, cross.TagToken):
                parts.append('#' + token.tag)
            elif isinstance(token, cross.LinkToken):
                if canonical_label(token.label, token.href):
                    parts.append(token.href)
                else:
                    if self.text_format == 'text/plain':
                        parts.append(f'{token.label}: {token.href}')
                    elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}:
                        parts.append(f'[{token.label}]({token.href})')
            else:
                return None

        return ''.join(parts)

    def split_tokens_media(self, tokens: list[cross.Token], media: list[MediaInfo]):
        """Split a post into statuses and distribute *media* across them.

        The text is split with ``cross.split_tokens``. Attachments fill one
        status up to ``self.max_media_attachments`` before moving on to the
        next status that has none yet; extra text-less statuses are appended
        when the text statuses run out.

        Returns:
            A list of ``(text, attachments)`` tuples, or ``None`` if a text
            block cannot be rendered.
        """
        split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)
        post_text: list[str] = []

        for block in split_tokens:
            baked_text = self.token_to_string(block)

            if baked_text is None:
                return None
            post_text.append(baked_text)

        # A media-only post still needs one (empty) status to attach to.
        if not post_text:
            post_text = ['']

        posts: list[dict] = [{"text": text, "attachments": []} for text in post_text]
        # Indices of statuses that have not received attachments yet.
        available_indices: list[int] = list(range(len(posts)))

        current_image_post_idx: int | None = None

        def make_blank_post() -> dict:
            return {
                "text": '',
                "attachments": []
            }

        def pop_next_empty_index() -> int:
            if available_indices:
                return available_indices.pop(0)
            new_idx = len(posts)
            posts.append(make_blank_post())
            return new_idx

        for att in media:
            if (
                current_image_post_idx is not None
                and len(posts[current_image_post_idx]["attachments"]) < self.max_media_attachments
            ):
                posts[current_image_post_idx]["attachments"].append(att)
            else:
                idx = pop_next_empty_index()
                posts[idx]["attachments"].append(att)
                current_image_post_idx = idx

        result: list[tuple[str, list[MediaInfo]]] = [
            (p['text'], p["attachments"]) for p in posts
        ]

        return result

    def accept_post(self, post: cross.Post):
        """Publish *post* as one or more chained statuses and record them.

        All attachments are uploaded before the first status is created.
        Each status after the first replies to the previous one. The created
        statuses are then stored in the database and mapped to the original
        post.

        Returns ``None`` in every case; failures that do not raise are
        logged before returning early.

        Raises:
            requests.HTTPError: if an upload or a status creation is
                answered with a 4xx/5xx status.
        """
        parent_id = post.get_parent_id()

        new_root_id: int | None = None
        new_parent_id: int | None = None

        reply_ref: str | None = None
        if parent_id:
            thread_tuple = database.find_mapped_thread(
                self.db,
                parent_id,
                self.input.user_id,
                self.input.service,
                self.user_id,
                self.service
            )

            if not thread_tuple:
                LOGGER.error("Failed to find thread tuple in the database!")
                return None

            _, reply_ref, new_root_id, new_parent_id = thread_tuple

        lang: str
        if post.get_languages():
            lang = post.get_languages()[0]
        else:
            lang = 'en'

        post_tokens = post.get_tokens()
        if post.get_text_type() == "text/x.misskeymarkdown":
            post_tokens, status = mfm_util.strip_mfm(post_tokens)
            post_url = post.get_post_url()
            # Link back to the original when strip_mfm reports a truthy status.
            if status and post_url:
                post_tokens.append(cross.TextToken('\n'))
                post_tokens.append(cross.LinkToken(post_url, "[Post contains MFM, see original]"))

        raw_statuses = self.split_tokens_media(post_tokens, post.get_attachments())
        if not raw_statuses:
            LOGGER.error("Failed to split post into statuses?")
            return None
        baked_statuses = []

        for status, raw_media in raw_statuses:
            if raw_media:
                media: list[str] | None = self.upload_media(raw_media)
                if not media:
                    LOGGER.error("Failed to upload attachments!")
                    return None
                baked_statuses.append((status, media))
            else:
                baked_statuses.append((status, []))

        created_statuses: list[str] = []

        for status, media in baked_statuses:
            payload = {
                'status': status,
                'media_ids': media or [],
                'spoiler_text': post.get_cw(),
                'visibility': self.options.get('visibility', 'public'),
                'content_type': self.text_format,
                'language': lang
            }

            if media:
                payload['sensitive'] = post.is_sensitive()

            if post.get_cw():
                payload['sensitive'] = True

            # Placeholder text for statuses that carry only attachments.
            if not status:
                payload['status'] = '🖼️'

            if reply_ref:
                payload['in_reply_to_id'] = reply_ref

            reqs = requests.post(f'{self.service}/api/v1/statuses', headers={
                'Authorization': f'Bearer {self.token}',
                'Content-Type': 'application/json'
            }, json=payload)

            if reqs.status_code != 200:
                LOGGER.error("Failed to post status! %s - %s", reqs.status_code, reqs.text)
                reqs.raise_for_status()

            # The next status of this post replies to the one just created.
            reply_ref = reqs.json()['id']
            LOGGER.info("Created new status %s!", reply_ref)

            created_statuses.append(reqs.json()['id'])

        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        # NOTE(review): asserts are stripped under `python -O`; the statuses
        # above already exist remotely by the time this check runs.
        assert db_post, "ghghghhhhh"

        # No mapped thread: the first created status becomes the thread root.
        if new_root_id is None or new_parent_id is None:
            new_root_id = database.insert_post(
                self.db,
                created_statuses[0],
                self.user_id,
                self.service
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            created_statuses = created_statuses[1:]

        for db_id in created_statuses:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                self.user_id,
                self.service,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every status mapped to the input post *identifier*.

        Does nothing when the post is not found in the database. The HTTP
        response of each delete request is not inspected.
        """
        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id)
        # Walk the mappings in reverse order.
        for mapping in mappings[::-1]:
            LOGGER.info("Deleting '%s'...", mapping[0])
            requests.delete(
                f'{self.service}/api/v1/statuses/{mapping[0]}',
                headers=self.__auth_headers()
            )
            # NOTE(review): (service, user_id) here, but __delete_repost
            # passes (user_id, service) to the same helper; confirm which
            # order matches database.delete_post.
            database.delete_post(self.db, mapping[0], self.service, self.user_id)

    def accept_repost(self, repost_id: str, reposted_id: str):
        """Boost the status mapped to *reposted_id* and record the boost.

        Any existing mapped boost for *repost_id* is undone first via
        ``__delete_repost``. Returns without boosting when either post is
        missing from the database or the reposted post has no mapping.
        """
        repost = self.__delete_repost(repost_id)
        if not repost:
            return None

        reposted = database.find_post(self.db, reposted_id, self.input.user_id, self.input.service)
        if not reposted:
            return

        mappings = database.find_mappings(self.db, reposted['id'], self.service, self.user_id)
        if mappings:
            rsp = requests.post(
                f'{self.service}/api/v1/statuses/{mappings[0][0]}/reblog',
                headers=self.__auth_headers()
            )

            if rsp.status_code != 200:
                LOGGER.error("Failed to boost status! status_code: %s, msg: %s", rsp.status_code, rsp.content)
                return

            internal_id = database.insert_repost(
                self.db,
                rsp.json()['id'],
                reposted['id'],
                self.user_id,
                self.service)
            database.insert_mapping(self.db, repost['id'], internal_id)

    def __delete_repost(self, repost_id: str) -> dict | None:
        """Undo the boost mapped to *repost_id*, if there is one.

        Returns:
            The database row of the repost, or ``None`` when it is not in
            the database. The row is returned whether or not anything was
            unboosted.
        """
        repost = database.find_post(self.db, repost_id, self.input.user_id, self.input.service)
        if not repost:
            return None

        mappings = database.find_mappings(self.db, repost['id'], self.service, self.user_id)
        reposted_mappings = database.find_mappings(self.db, repost['reposted_id'], self.service, self.user_id)
        if mappings and reposted_mappings:
            LOGGER.info("Deleting '%s'...", mappings[0][0])
            requests.post(
                f'{self.service}/api/v1/statuses/{reposted_mappings[0][0]}/unreblog',
                headers=self.__auth_headers()
            )
            # NOTE(review): (user_id, service) here, but delete_post passes
            # (service, user_id) to the same helper; confirm which order
            # matches database.delete_post.
            database.delete_post(self.db, mappings[0][0], self.user_id, self.service)
        return repost

    def delete_repost(self, repost_id: str):
        """Undo the boost mapped to *repost_id*, if there is one."""
        self.__delete_repost(repost_id)