···
from dataclasses import dataclass
from typing import Any, override
+
from cross.attachments import (
+
from cross.post import Post
from cross.service import OutputService
from database.connection import DatabasePool
from mastodon.info import InstanceInfo, MastodonService, validate_and_transform
···
self.log.info("Getting %s configuration...", self.url)
response = self.fetch_instance_info()
self.instance_info: InstanceInfo = InstanceInfo.from_api(response)
+
def accept_post(self, service: str, user: str, post: Post):
+
new_root_id: int | None = None
+
new_parent_id: int | None = None
+
reply_ref: str | None = None
+
thread = self._find_mapped_thread(
+
post.parent_id, service, user, self.url, self.user_id
+
self.log.error("Failed to find thread tuple in the database!")
+
_, reply_ref, new_root_id, new_parent_id = thread
+
quote = post.attachments.get(QuoteAttachment)
+
if quote.quoted_user != user:
+
self.log.info("Quoted other user, skipping!")
+
quoted_post = self._get_post(service, user, quote.quoted_id)
+
self.log.error("Failed to find quoted post in the database!")
+
quoted_mappings = self._get_mappings(quoted_post["id"], self.url, self.user_id)
+
if not quoted_mappings:
+
self.log.error("Failed to find mappings for quoted post!")
+
quoted_local_id = quoted_mappings[-1][0]
+
# TODO resolve service identifier
+
post_tokens = post.tokens.copy()
+
remote_url = post.attachments.get(RemoteUrlAttachment)
+
if remote_url and remote_url.url and post.text_type == "text/x.misskeymarkdown":
+
raw_statuses = [] # TODO split tokens and media across posts
+
self.log.error("Failed to split post into statuses!")
+
langs = post.attachments.get(LanguagesAttachment)
+
sensitive = post.attachments.get(SensitiveAttachment)
+
if langs and langs.langs:
+
if sensitive and sensitive.sensitive:
+
def delete_post(self, service: str, user: str, post_id: str):
+
post = self._get_post(service, user, post_id)
+
self.log.info("Post not found in db, skipping delete..")
+
mappings = self._get_mappings(post["id"], self.url, self.user_id)
+
for mapping in mappings[::-1]:
+
self.log.info("Deleting '%s'...", mapping["identifier"])
+
f"{self.url}/api/v1/statuses/{mapping['identifier']}",
+
headers={"Authorization": f"Bearer {self._get_token()}"},
+
self._delete_post_by_id(mapping["id"])
+
def accept_repost(self, service: str, user: str, repost_id: str, reposted_id: str):
+
reposted = self._get_post(service, user, reposted_id)
+
self.log.info("Post not found in db, skipping repost..")
+
mappings = self._get_mappings(reposted["id"], self.url, self.user_id)
+
f"{self.url}/api/v1/statuses/{mappings[0]['identifier']}/reblog",
+
headers={"Authorization": f"Bearer {self._get_token()}"},
+
if rsp.status_code != 200:
+
"Failed to boost status! status_code: %s, msg: %s",
+
"identifier": rsp.json()["id"],
+
"reposted": mappings[0]["id"],
+
inserted = self._get_post(self.url, self.user_id, rsp.json()["id"])
+
raise ValueError("Inserted post not found!")
+
self._insert_post_mapping(reposted["id"], inserted["id"])
+
def delete_repost(self, service: str, user: str, repost_id: str):
+
repost = self._get_post(service, user, repost_id)
+
self.log.info("Repost not found in db, skipping delete..")
+
mappings = self._get_mappings(repost["id"], self.url, self.user_id)
+
rmappings = self._get_mappings(repost["reposted"], self.url, self.user_id)
+
if mappings and rmappings:
+
"Removing '%s' Repost of '%s'...",
+
mappings[0]["identifier"],
+
rmappings[0]["identifier"],
+
f"{self.url}/api/v1/statuses/{rmappings[0]['identifier']}/unreblog",
+
headers={"Authorization": f"Bearer {self._get_token()}"},
+
self._delete_post_by_id(mappings[0]["id"])
def _get_token(self) -> str: