social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
at master 16 kB view raw
1import time 2 3import requests 4 5import cross 6import misskey.mfm_util as mfm_util 7import util.database as database 8from util.database import DataBaseWorker 9from util.media import MediaInfo 10from util.util import LOGGER, as_envvar, canonical_label 11 12POSSIBLE_MIMES = [ 13 "audio/ogg", 14 "audio/mp3", 15 "image/webp", 16 "image/jpeg", 17 "image/png", 18 "video/mp4", 19 "video/quicktime", 20 "video/webm", 21] 22 23TEXT_MIMES = ["text/x.misskeymarkdown", "text/markdown", "text/plain"] 24 25ALLOWED_POSTING_VISIBILITY = ["public", "unlisted", "private"] 26 27 28class MastodonOutputOptions: 29 def __init__(self, o: dict) -> None: 30 self.visibility = "public" 31 32 visibility = o.get("visibility") 33 if visibility is not None: 34 if visibility not in ALLOWED_POSTING_VISIBILITY: 35 raise ValueError( 36 f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {visibility}" 37 ) 38 self.visibility = visibility 39 40 41class MastodonOutput(cross.Output): 42 def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None: 43 super().__init__(input, settings, db) 44 self.options = settings.get("options") or {} 45 self.token = as_envvar(settings.get("token")) or (_ for _ in ()).throw( 46 ValueError("'token' is required") 47 ) 48 instance: str = as_envvar(settings.get("instance")) or (_ for _ in ()).throw( 49 ValueError("'instance' is required") 50 ) 51 52 self.service = instance[:-1] if instance.endswith("/") else instance 53 54 LOGGER.info("Verifying %s credentails...", self.service) 55 responce = requests.get( 56 f"{self.service}/api/v1/accounts/verify_credentials", 57 headers={"Authorization": f"Bearer {self.token}"}, 58 ) 59 if responce.status_code != 200: 60 LOGGER.error("Failed to validate user credentials!") 61 responce.raise_for_status() 62 return 63 self.user_id: str = responce.json()["id"] 64 65 LOGGER.info("Getting %s configuration...", self.service) 66 responce = requests.get( 67 f"{self.service}/api/v1/instance", 68 headers={"Authorization": f"Bearer {self.token}"}, 69 ) 70 if responce.status_code != 200: 71 LOGGER.error("Failed to get instance info!") 72 responce.raise_for_status() 73 return 74 75 instance_info: dict = responce.json() 76 configuration: dict = instance_info["configuration"] 77 78 statuses_config: dict = configuration.get("statuses", {}) 79 self.max_characters: int = statuses_config.get("max_characters", 500) 80 self.max_media_attachments: int = statuses_config.get( 81 "max_media_attachments", 4 82 ) 83 self.characters_reserved_per_url: int = statuses_config.get( 84 "characters_reserved_per_url", 23 85 ) 86 87 media_config: dict = configuration.get("media_attachments", {}) 88 self.image_size_limit: int = media_config.get("image_size_limit", 16777216) 89 self.video_size_limit: int = media_config.get("video_size_limit", 103809024) 90 self.supported_mime_types: list[str] = media_config.get( 91 "supported_mime_types", POSSIBLE_MIMES 92 ) 93 94 # *oma: max post chars 95 max_toot_chars = instance_info.get("max_toot_chars") 96 if max_toot_chars: 97 self.max_characters: int = max_toot_chars 98 99 # *oma: max upload limit 100 upload_limit = instance_info.get("upload_limit") 101 if upload_limit: 102 self.image_size_limit: int = upload_limit 103 self.video_size_limit: int = upload_limit 104 105 # chuckya: supported text types 106 chuckya_text_mimes: list[str] = statuses_config.get("supported_mime_types", []) 107 self.text_format = next( 108 (mime for mime in TEXT_MIMES if mime in (chuckya_text_mimes)), "text/plain" 109 ) 110 111 # *oma ext: supported text types 112 pleroma = instance_info.get("pleroma") 113 if pleroma: 114 post_formats: list[str] = pleroma.get("metadata", {}).get( 115 "post_formats", [] 116 ) 117 self.text_format = next( 118 (mime for mime in TEXT_MIMES if mime in post_formats), self.text_format 119 ) 120 121 def upload_media(self, attachments: list[MediaInfo]) -> list[str] | None: 122 for a in attachments: 123 if a.mime.startswith("image/") and len(a.io) > self.image_size_limit: 124 return None 125 126 if a.mime.startswith("video/") and len(a.io) > self.video_size_limit: 127 return None 128 129 if not a.mime.startswith("image/") and not a.mime.startswith("video/"): 130 if len(a.io) > 7_000_000: 131 return None 132 133 uploads: list[dict] = [] 134 for a in attachments: 135 data = {} 136 if a.alt: 137 data["description"] = a.alt 138 139 req = requests.post( 140 f"{self.service}/api/v2/media", 141 headers={"Authorization": f"Bearer {self.token}"}, 142 files={"file": (a.name, a.io, a.mime)}, 143 data=data, 144 ) 145 146 if req.status_code == 200: 147 LOGGER.info("Uploaded %s! (%s)", a.name, req.json()["id"]) 148 uploads.append({"done": True, "id": req.json()["id"]}) 149 elif req.status_code == 202: 150 LOGGER.info("Waiting for %s to process!", a.name) 151 uploads.append({"done": False, "id": req.json()["id"]}) 152 else: 153 LOGGER.error("Failed to upload %s! %s", a.name, req.text) 154 req.raise_for_status() 155 156 while any([not val["done"] for val in uploads]): 157 LOGGER.info("Waiting for media to process...") 158 time.sleep(3) 159 for media in uploads: 160 if media["done"]: 161 continue 162 163 reqs = requests.get( 164 f"{self.service}/api/v1/media/{media['id']}", 165 headers={"Authorization": f"Bearer {self.token}"}, 166 ) 167 168 if reqs.status_code == 206: 169 continue 170 171 if reqs.status_code == 200: 172 media["done"] = True 173 continue 174 reqs.raise_for_status() 175 176 return [val["id"] for val in uploads] 177 178 def token_to_string(self, tokens: list[cross.Token]) -> str | None: 179 p_text: str = "" 180 181 for token in tokens: 182 if isinstance(token, cross.TextToken): 183 p_text += token.text 184 elif isinstance(token, cross.TagToken): 185 p_text += "#" + token.tag 186 elif isinstance(token, cross.LinkToken): 187 if canonical_label(token.label, token.href): 188 p_text += token.href 189 else: 190 if self.text_format == "text/plain": 191 p_text += f"{token.label} ({token.href})" 192 elif self.text_format in { 193 "text/x.misskeymarkdown", 194 "text/markdown", 195 }: 196 p_text += f"[{token.label}]({token.href})" 197 else: 198 return None 199 200 return p_text 201 202 def split_tokens_media(self, tokens: list[cross.Token], media: list[MediaInfo]): 203 split_tokens = cross.split_tokens( 204 tokens, self.max_characters, self.characters_reserved_per_url 205 ) 206 post_text: list[str] = [] 207 208 for block in split_tokens: 209 baked_text = self.token_to_string(block) 210 211 if baked_text is None: 212 return None 213 post_text.append(baked_text) 214 215 if not post_text: 216 post_text = [""] 217 218 posts: list[dict] = [ 219 {"text": post_text, "attachments": []} for post_text in post_text 220 ] 221 available_indices: list[int] = list(range(len(posts))) 222 223 current_image_post_idx: int | None = None 224 225 def make_blank_post() -> dict: 226 return {"text": "", "attachments": []} 227 228 def pop_next_empty_index() -> int: 229 if available_indices: 230 return available_indices.pop(0) 231 else: 232 new_idx = len(posts) 233 posts.append(make_blank_post()) 234 return new_idx 235 236 for att in media: 237 if ( 238 current_image_post_idx is not None 239 and len(posts[current_image_post_idx]["attachments"]) 240 < self.max_media_attachments 241 ): 242 posts[current_image_post_idx]["attachments"].append(att) 243 else: 244 idx = pop_next_empty_index() 245 posts[idx]["attachments"].append(att) 246 current_image_post_idx = idx 247 248 result: list[tuple[str, list[MediaInfo]]] = [] 249 250 for p in posts: 251 result.append((p["text"], p["attachments"])) 252 253 return result 254 255 def accept_post(self, post: cross.Post): 256 parent_id = post.get_parent_id() 257 258 new_root_id: int | None = None 259 new_parent_id: int | None = None 260 261 reply_ref: str | None = None 262 if parent_id: 263 thread_tuple = database.find_mapped_thread( 264 self.db, 265 parent_id, 266 self.input.user_id, 267 self.input.service, 268 self.user_id, 269 self.service, 270 ) 271 272 if not thread_tuple: 273 LOGGER.error("Failed to find thread tuple in the database!") 274 return None 275 276 _, reply_ref, new_root_id, new_parent_id = thread_tuple 277 278 lang: str 279 if post.get_languages(): 280 lang = post.get_languages()[0] 281 else: 282 lang = "en" 283 284 post_tokens = post.get_tokens() 285 if post.get_text_type() == "text/x.misskeymarkdown": 286 post_tokens, status = mfm_util.strip_mfm(post_tokens) 287 post_url = post.get_post_url() 288 if status and post_url: 289 post_tokens.append(cross.TextToken("\n")) 290 post_tokens.append( 291 cross.LinkToken(post_url, "[Post contains MFM, see original]") 292 ) 293 294 raw_statuses = self.split_tokens_media(post_tokens, post.get_attachments()) 295 if not raw_statuses: 296 LOGGER.error("Failed to split post into statuses?") 297 return None 298 baked_statuses = [] 299 300 for status, raw_media in raw_statuses: 301 media: list[str] | None = None 302 if raw_media: 303 media = self.upload_media(raw_media) 304 if not media: 305 LOGGER.error("Failed to upload attachments!") 306 return None 307 baked_statuses.append((status, media)) 308 continue 309 baked_statuses.append((status, [])) 310 311 created_statuses: list[str] = [] 312 313 for status, media in baked_statuses: 314 payload = { 315 "status": status, 316 "media_ids": media or [], 317 "spoiler_text": post.get_spoiler() or "", 318 "visibility": self.options.get("visibility", "public"), 319 "content_type": self.text_format, 320 "language": lang, 321 } 322 323 if media: 324 payload["sensitive"] = post.is_sensitive() 325 326 if post.get_spoiler(): 327 payload["sensitive"] = True 328 329 if not status: 330 payload["status"] = "🖼️" 331 332 if reply_ref: 333 payload["in_reply_to_id"] = reply_ref 334 335 reqs = requests.post( 336 f"{self.service}/api/v1/statuses", 337 headers={ 338 "Authorization": f"Bearer {self.token}", 339 "Content-Type": "application/json", 340 }, 341 json=payload, 342 ) 343 344 if reqs.status_code != 200: 345 LOGGER.info( 346 "Failed to post status! %s - %s", reqs.status_code, reqs.text 347 ) 348 reqs.raise_for_status() 349 350 reply_ref = reqs.json()["id"] 351 LOGGER.info("Created new status %s!", reply_ref) 352 353 created_statuses.append(reqs.json()["id"]) 354 355 db_post = database.find_post( 356 self.db, post.get_id(), self.input.user_id, self.input.service 357 ) 358 assert db_post, "ghghghhhhh" 359 360 if new_root_id is None or new_parent_id is None: 361 new_root_id = database.insert_post( 362 self.db, created_statuses[0], self.user_id, self.service 363 ) 364 new_parent_id = new_root_id 365 database.insert_mapping(self.db, db_post["id"], new_parent_id) 366 created_statuses = created_statuses[1:] 367 368 for db_id in created_statuses: 369 new_parent_id = database.insert_reply( 370 self.db, db_id, self.user_id, self.service, new_parent_id, new_root_id 371 ) 372 database.insert_mapping(self.db, db_post["id"], new_parent_id) 373 374 def delete_post(self, identifier: str): 375 post = database.find_post( 376 self.db, identifier, self.input.user_id, self.input.service 377 ) 378 if not post: 379 return 380 381 mappings = database.find_mappings( 382 self.db, post["id"], self.service, self.user_id 383 ) 384 for mapping in mappings[::-1]: 385 LOGGER.info("Deleting '%s'...", mapping[0]) 386 requests.delete( 387 f"{self.service}/api/v1/statuses/{mapping[0]}", 388 headers={"Authorization": f"Bearer {self.token}"}, 389 ) 390 database.delete_post(self.db, mapping[0], self.service, self.user_id) 391 392 def accept_repost(self, repost_id: str, reposted_id: str): 393 repost = self.__delete_repost(repost_id) 394 if not repost: 395 return None 396 397 reposted = database.find_post( 398 self.db, reposted_id, self.input.user_id, self.input.service 399 ) 400 if not reposted: 401 return 402 403 mappings = database.find_mappings( 404 self.db, reposted["id"], self.service, self.user_id 405 ) 406 if mappings: 407 rsp = requests.post( 408 f"{self.service}/api/v1/statuses/{mappings[0][0]}/reblog", 409 headers={"Authorization": f"Bearer {self.token}"}, 410 ) 411 412 if rsp.status_code != 200: 413 LOGGER.error( 414 "Failed to boost status! status_code: %s, msg: %s", 415 rsp.status_code, 416 rsp.content, 417 ) 418 return 419 420 internal_id = database.insert_repost( 421 self.db, rsp.json()["id"], reposted["id"], self.user_id, self.service 422 ) 423 database.insert_mapping(self.db, repost["id"], internal_id) 424 425 def __delete_repost(self, repost_id: str) -> dict | None: 426 repost = database.find_post( 427 self.db, repost_id, self.input.user_id, self.input.service 428 ) 429 if not repost: 430 return None 431 432 mappings = database.find_mappings( 433 self.db, repost["id"], self.service, self.user_id 434 ) 435 reposted_mappings = database.find_mappings( 436 self.db, repost["reposted_id"], self.service, self.user_id 437 ) 438 if mappings and reposted_mappings: 439 LOGGER.info("Deleting '%s'...", mappings[0][0]) 440 requests.post( 441 f"{self.service}/api/v1/statuses/{reposted_mappings[0][0]}/unreblog", 442 headers={"Authorization": f"Bearer {self.token}"}, 443 ) 444 database.delete_post(self.db, mappings[0][0], self.user_id, self.service) 445 return repost 446 447 def delete_repost(self, repost_id: str): 448 self.__delete_repost(repost_id)