from dataclasses import dataclass
from typing import Any, override

import requests

from cross.attachments import (
    LanguagesAttachment,
    QuoteAttachment,
    RemoteUrlAttachment,
    SensitiveAttachment,
)
from cross.post import Post
from cross.service import OutputService
from database.connection import DatabasePool
from mastodon.info import InstanceInfo, MastodonService, validate_and_transform

ALLOWED_POSTING_VISIBILITY: list[str] = ["public", "unlisted", "private"]

# Upper bound (seconds) for every HTTP call made from this module. Without an
# explicit timeout `requests` waits indefinitely on an unresponsive server.
REQUEST_TIMEOUT_SECONDS: float = 30.0


@dataclass(kw_only=True)
class MastodonOutputOptions:
    """Configuration for a single Mastodon output target."""

    # Access token; sent as the ``Bearer`` credential on API requests.
    token: str
    # Instance location; forwarded to ``MastodonService.__init__``.
    instance: str
    # Posting visibility; ``from_dict`` only accepts values listed in
    # ``ALLOWED_POSTING_VISIBILITY``.
    visibility: str = "public"

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MastodonOutputOptions":
        """Build options from a raw configuration mapping.

        ``data`` is first handed to ``validate_and_transform`` (its return
        value is not used, so any normalisation happens on ``data`` itself),
        then expanded into keyword arguments.

        Raises:
            ValueError: if ``data`` has a ``visibility`` entry that is not in
                ``ALLOWED_POSTING_VISIBILITY``.
        """
        validate_and_transform(data)

        if (
            "visibility" in data
            and data["visibility"] not in ALLOWED_POSTING_VISIBILITY
        ):
            raise ValueError(f"Invalid visibility option {data['visibility']}!")

        # `cls` rather than the hard-coded class name, so subclasses get
        # instances of their own type from this alternate constructor.
        return cls(**data)


# TODO
class MastodonOutputService(MastodonService, OutputService):
    """Output service that pushes deletions and boosts to a Mastodon API.

    Posting new statuses (``accept_post``) is not implemented yet; see the
    TODO markers in that method.
    """

    def __init__(self, db: DatabasePool, options: MastodonOutputOptions) -> None:
        """Verify the configured credentials and load instance information.

        Both ``verify_credentials`` and ``fetch_instance_info`` are called
        during construction, so building this object performs whatever work
        those inherited helpers do.
        """
        super().__init__(options.instance, db)
        self.options: MastodonOutputOptions = options

        self.log.info("Verifying %s credentails...", self.url)
        response = self.verify_credentials()
        self.user_id: str = response["id"]

        self.log.info("Getting %s configuration...", self.url)
        response = self.fetch_instance_info()
        self.instance_info: InstanceInfo = InstanceInfo.from_api(response)

    def accept_post(self, service: str, user: str, post: Post) -> None:
        """Handle a new post coming from ``service``/``user``.

        NOTE: this method is an unfinished stub. It resolves the reply thread
        and the quoted post, but ``raw_statuses`` is never populated, so every
        call currently ends at the "Failed to split post into statuses!" error
        and nothing is sent to the instance.
        """
        new_root_id: int | None = None
        new_parent_id: int | None = None

        reply_ref: str | None = None
        if post.parent_id:
            thread = self._find_mapped_thread(
                post.parent_id, service, user, self.url, self.user_id
            )

            if not thread:
                self.log.error("Failed to find thread tuple in the database!")
                return
            # First element of the thread tuple is not needed here.
            _, reply_ref, new_root_id, new_parent_id = thread

        quote = post.attachments.get(QuoteAttachment)
        if quote:
            # Only quotes of the same user's own posts are handled.
            if quote.quoted_user != user:
                self.log.info("Quoted other user, skipping!")
                return

            quoted_post = self._get_post(service, user, quote.quoted_id)
            if not quoted_post:
                self.log.error("Failed to find quoted post in the database!")
                return

            quoted_mappings = self._get_mappings(
                quoted_post["id"], self.url, self.user_id
            )
            if not quoted_mappings:
                self.log.error("Failed to find mappings for quoted post!")
                return

            # Last mapping row, first column -- presumably the local row id;
            # TODO confirm against `_get_mappings`.
            quoted_local_id = quoted_mappings[-1][0]
            # TODO resolve service identifier

        post_tokens = post.tokens.copy()

        remote_url = post.attachments.get(RemoteUrlAttachment)
        if remote_url and remote_url.url and post.text_type == "text/x.misskeymarkdown":
            # TODO strip mfm
            pass

        raw_statuses = []
        # TODO split tokens and media across posts
        if not raw_statuses:
            self.log.error("Failed to split post into statuses!")
            return

        langs = post.attachments.get(LanguagesAttachment)
        sensitive = post.attachments.get(SensitiveAttachment)

        if langs and langs.langs:
            pass  # TODO

        if sensitive and sensitive.sensitive:
            pass  # TODO

    def delete_post(self, service: str, user: str, post_id: str) -> None:
        """Delete every status on this instance that is mapped to a post.

        Mappings are processed in reverse order. Removal is best-effort: a
        non-success HTTP response is logged, and the local row is removed
        regardless. Exceptions raised by ``requests`` (including timeouts)
        are not caught here.
        """
        post = self._get_post(service, user, post_id)
        if not post:
            self.log.info("Post not found in db, skipping delete..")
            return

        mappings = self._get_mappings(post["id"], self.url, self.user_id)
        for mapping in reversed(mappings):
            self.log.info("Deleting '%s'...", mapping["identifier"])
            rsp = requests.delete(
                f"{self.url}/api/v1/statuses/{mapping['identifier']}",
                headers={"Authorization": f"Bearer {self._get_token()}"},
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            if not rsp.ok:
                # Surface the failure instead of ignoring the response; the
                # local row is still dropped below.
                self.log.error(
                    "Failed to delete status! status_code: %s, msg: %s",
                    rsp.status_code,
                    rsp.content,
                )
            self._delete_post_by_id(mapping["id"])

    def accept_repost(
        self, service: str, user: str, repost_id: str, reposted_id: str
    ) -> None:
        """Boost the local counterpart of a post that was reposted upstream.

        Does nothing when the reposted post is unknown or has no mapping on
        this instance. On success the boost is stored as a post and linked to
        the reposted post.

        Raises:
            ValueError: if the freshly inserted boost cannot be read back.
        """
        # NOTE(review): `repost_id` is never used, and the new mapping is keyed
        # on the reposted post rather than on the repost itself. Confirm that
        # `delete_repost` can still locate the boost through `repost_id`.
        reposted = self._get_post(service, user, reposted_id)
        if not reposted:
            self.log.info("Post not found in db, skipping repost..")
            return

        mappings = self._get_mappings(reposted["id"], self.url, self.user_id)
        if not mappings:
            return

        target = mappings[0]
        rsp = requests.post(
            f"{self.url}/api/v1/statuses/{target['identifier']}/reblog",
            headers={"Authorization": f"Bearer {self._get_token()}"},
            timeout=REQUEST_TIMEOUT_SECONDS,
        )

        if rsp.status_code != 200:
            self.log.error(
                "Failed to boost status! status_code: %s, msg: %s",
                rsp.status_code,
                rsp.content,
            )
            return

        # Decode the response body once and reuse the id.
        boost_id = rsp.json()["id"]

        self._insert_post(
            {
                "user": self.user_id,
                "service": self.url,
                "identifier": boost_id,
                "reposted": target["id"],
            }
        )

        inserted = self._get_post(self.url, self.user_id, boost_id)
        if not inserted:
            raise ValueError("Inserted post not found!")
        self._insert_post_mapping(reposted["id"], inserted["id"])

    def delete_repost(self, service: str, user: str, repost_id: str) -> None:
        """Undo a boost on this instance and remove its local row.

        Does nothing unless mappings exist for both the repost and the post it
        points to. A non-success HTTP response is logged and the local row is
        removed regardless. Exceptions raised by ``requests`` (including
        timeouts) are not caught here.
        """
        repost = self._get_post(service, user, repost_id)
        if not repost:
            self.log.info("Repost not found in db, skipping delete..")
            return

        mappings = self._get_mappings(repost["id"], self.url, self.user_id)
        rmappings = self._get_mappings(repost["reposted"], self.url, self.user_id)

        if not (mappings and rmappings):
            return

        self.log.info(
            "Removing '%s' Repost of '%s'...",
            mappings[0]["identifier"],
            rmappings[0]["identifier"],
        )
        rsp = requests.post(
            f"{self.url}/api/v1/statuses/{rmappings[0]['identifier']}/unreblog",
            headers={"Authorization": f"Bearer {self._get_token()}"},
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        if not rsp.ok:
            self.log.error(
                "Failed to remove boost! status_code: %s, msg: %s",
                rsp.status_code,
                rsp.content,
            )
        self._delete_post_by_id(mappings[0]["id"])

    @override
    def _get_token(self) -> str:
        """Return the access token from the configured options."""
        return self.options.token