social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
at next 6.4 kB view raw
"""Mastodon output side of the social media crossposting tool."""

from dataclasses import dataclass
from typing import Any, override

import requests

from cross.attachments import (
    LanguagesAttachment,
    QuoteAttachment,
    RemoteUrlAttachment,
    SensitiveAttachment,
)
from cross.post import Post
from cross.service import OutputService
from database.connection import DatabasePool
from mastodon.info import InstanceInfo, MastodonService, validate_and_transform

ALLOWED_POSTING_VISIBILITY: list[str] = ["public", "unlisted", "private"]

# Seconds to wait on any single HTTP call to the instance. Without a timeout
# `requests` waits indefinitely, so one stalled instance would block the caller.
REQUEST_TIMEOUT: float = 30.0


@dataclass(kw_only=True)
class MastodonOutputOptions:
    """Configuration for a Mastodon output target."""

    token: str
    instance: str
    visibility: str = "public"

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MastodonOutputOptions":
        """Build options from a raw configuration mapping.

        ``data`` is first handed to ``validate_and_transform`` (its return
        value is not used, so it presumably validates/normalises in place --
        confirm against ``mastodon.info``).

        Raises:
            ValueError: if ``data`` has a ``visibility`` key whose value is
                not one of ``ALLOWED_POSTING_VISIBILITY``.
        """
        validate_and_transform(data)

        if (
            "visibility" in data
            and data["visibility"] not in ALLOWED_POSTING_VISIBILITY
        ):
            raise ValueError(f"Invalid visibility option {data['visibility']}!")

        # Use `cls` so a subclass calling from_dict gets an instance of itself.
        return cls(**data)


# TODO
class MastodonOutputService(MastodonService, OutputService):
    """Output service that mirrors posts, boosts and deletions to Mastodon."""

    def __init__(self, db: DatabasePool, options: MastodonOutputOptions) -> None:
        """Store ``options``, then query the instance for account and config.

        Calls ``verify_credentials()`` to obtain the account id and
        ``fetch_instance_info()`` to populate ``instance_info``.
        """
        super().__init__(options.instance, db)
        self.options: MastodonOutputOptions = options

        self.log.info("Verifying %s credentails...", self.url)
        response = self.verify_credentials()
        self.user_id: str = response["id"]

        self.log.info("Getting %s configuration...", self.url)
        response = self.fetch_instance_info()
        self.instance_info: InstanceInfo = InstanceInfo.from_api(response)

    def accept_post(self, service: str, user: str, post: Post):
        """Handle a new post coming from ``service``/``user``.

        Work in progress: ``raw_statuses`` is always empty, so the method
        currently logs "Failed to split post into statuses!" and returns
        before anything is sent. The locals that are assigned but not yet
        read are scaffolding for the TODOs below.
        """
        new_root_id: int | None = None
        new_parent_id: int | None = None

        reply_ref: str | None = None
        if post.parent_id:
            thread = self._find_mapped_thread(
                post.parent_id, service, user, self.url, self.user_id
            )

            if not thread:
                self.log.error("Failed to find thread tuple in the database!")
                return
            _, reply_ref, new_root_id, new_parent_id = thread

        quote = post.attachments.get(QuoteAttachment)
        if quote:
            # Only self-quotes are handled; quotes of other users are skipped.
            if quote.quoted_user != user:
                self.log.info("Quoted other user, skipping!")
                return

            quoted_post = self._get_post(service, user, quote.quoted_id)
            if not quoted_post:
                self.log.error("Failed to find quoted post in the database!")
                return

            quoted_mappings = self._get_mappings(
                quoted_post["id"], self.url, self.user_id
            )
            if not quoted_mappings:
                self.log.error("Failed to find mappings for quoted post!")
                return

            quoted_local_id = quoted_mappings[-1][0]
            # TODO resolve service identifier

        post_tokens = post.tokens.copy()

        remote_url = post.attachments.get(RemoteUrlAttachment)
        if remote_url and remote_url.url and post.text_type == "text/x.misskeymarkdown":
            # TODO stip mfm
            pass

        raw_statuses = []  # TODO split tokens and media across posts
        if not raw_statuses:
            self.log.error("Failed to split post into statuses!")
            return

        # Not reachable until the splitting TODO above is implemented.
        langs = post.attachments.get(LanguagesAttachment)
        sensitive = post.attachments.get(SensitiveAttachment)

        if langs and langs.langs:
            pass  # TODO

        if sensitive and sensitive.sensitive:
            pass  # TODO

    def delete_post(self, service: str, user: str, post_id: str):
        """Delete every status mapped to the given source post.

        Mappings are processed in reverse order. For each one a DELETE is
        sent to the instance and the local record is removed. A non-2xx
        response is logged, but the local record is still removed, so the
        clean-up stays best-effort.
        """
        post = self._get_post(service, user, post_id)
        if not post:
            self.log.info("Post not found in db, skipping delete..")
            return

        mappings = self._get_mappings(post["id"], self.url, self.user_id)
        for mapping in reversed(mappings):
            self.log.info("Deleting '%s'...", mapping["identifier"])
            rsp = requests.delete(
                f"{self.url}/api/v1/statuses/{mapping['identifier']}",
                headers={"Authorization": f"Bearer {self._get_token()}"},
                timeout=REQUEST_TIMEOUT,
            )
            if not rsp.ok:
                # Surface the failure instead of silently dropping it.
                self.log.error(
                    "Failed to delete status! status_code: %s, msg: %s",
                    rsp.status_code,
                    rsp.content,
                )
            self._delete_post_by_id(mapping["id"])

    def accept_repost(self, service: str, user: str, repost_id: str, reposted_id: str):
        """Boost the status mapped to ``reposted_id`` and record the boost.

        Does nothing if the reposted post or its mapping is unknown. Returns
        early (after logging) when the reblog call does not answer 200.

        Raises:
            ValueError: if the freshly inserted boost cannot be read back.
        """
        # NOTE(review): `repost_id` is unused and the mapping below is keyed
        # on the reposted post's row, while delete_repost looks mappings up by
        # the repost's own row -- confirm this is intended.
        reposted = self._get_post(service, user, reposted_id)
        if not reposted:
            self.log.info("Post not found in db, skipping repost..")
            return

        mappings = self._get_mappings(reposted["id"], self.url, self.user_id)
        if not mappings:
            return

        rsp = requests.post(
            f"{self.url}/api/v1/statuses/{mappings[0]['identifier']}/reblog",
            headers={"Authorization": f"Bearer {self._get_token()}"},
            timeout=REQUEST_TIMEOUT,
        )

        if rsp.status_code != 200:
            self.log.error(
                "Failed to boost status! status_code: %s, msg: %s",
                rsp.status_code,
                rsp.content,
            )
            return

        # Decode the body once and reuse the id for insert and lookup.
        boost_id = rsp.json()["id"]

        self._insert_post(
            {
                "user": self.user_id,
                "service": self.url,
                "identifier": boost_id,
                "reposted": mappings[0]["id"],
            }
        )
        inserted = self._get_post(self.url, self.user_id, boost_id)
        if not inserted:
            raise ValueError("Inserted post not found!")
        self._insert_post_mapping(reposted["id"], inserted["id"])

    def delete_repost(self, service: str, user: str, repost_id: str):
        """Undo the boost mapped to the given source repost.

        Requires a mapping for both the repost and the post it reposted;
        otherwise nothing happens. A non-2xx unreblog response is logged, but
        the local record is still removed.
        """
        repost = self._get_post(service, user, repost_id)
        if not repost:
            self.log.info("Repost not found in db, skipping delete..")
            return

        mappings = self._get_mappings(repost["id"], self.url, self.user_id)
        rmappings = self._get_mappings(repost["reposted"], self.url, self.user_id)

        if not (mappings and rmappings):
            return

        self.log.info(
            "Removing '%s' Repost of '%s'...",
            mappings[0]["identifier"],
            rmappings[0]["identifier"],
        )
        rsp = requests.post(
            f"{self.url}/api/v1/statuses/{rmappings[0]['identifier']}/unreblog",
            headers={"Authorization": f"Bearer {self._get_token()}"},
            timeout=REQUEST_TIMEOUT,
        )
        if not rsp.ok:
            # Surface the failure instead of silently dropping it.
            self.log.error(
                "Failed to remove boost! status_code: %s, msg: %s",
                rsp.status_code,
                rsp.content,
            )
        self._delete_post_by_id(mappings[0]["id"])

    @override
    def _get_token(self) -> str:
        """Return the access token from the configured options."""
        return self.options.token