social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from dataclasses import dataclass
2from typing import Any, override
3
4import requests
5
6from cross.attachments import (
7 LanguagesAttachment,
8 QuoteAttachment,
9 RemoteUrlAttachment,
10 SensitiveAttachment,
11)
12from cross.post import Post
13from cross.service import OutputService
14from database.connection import DatabasePool
15from mastodon.info import InstanceInfo, MastodonService, validate_and_transform
16
# Values accepted for the "visibility" output option; anything else is
# rejected by MastodonOutputOptions.from_dict.
ALLOWED_POSTING_VISIBILITY: list[str] = [
    "public",
    "unlisted",
    "private",
]
18
19
20@dataclass(kw_only=True)
21class MastodonOutputOptions:
22 token: str
23 instance: str
24 visibility: str = "public"
25
26 @classmethod
27 def from_dict(cls, data: dict[str, Any]) -> "MastodonOutputOptions":
28 validate_and_transform(data)
29
30 if "visibility" in data:
31 if data["visibility"] not in ALLOWED_POSTING_VISIBILITY:
32 raise ValueError(f"Invalid visibility option {data['visibility']}!")
33
34 return MastodonOutputOptions(**data)
35
36
37# TODO
class MastodonOutputService(MastodonService, OutputService):
    """Output side of the crossposter for a single Mastodon account.

    Deletes, boosts and un-boosts are mirrored to the configured instance
    through its REST API. Publishing new statuses (``accept_post``) is
    still unfinished.

    NOTE(review): none of the ``requests`` calls below set a timeout, so a
    stalled connection can block indefinitely -- consider adding one.
    """

    def __init__(self, db: DatabasePool, options: MastodonOutputOptions) -> None:
        """Verify the credentials and cache the instance configuration.

        Args:
            db: Database pool handed to the base-class constructor.
            options: Token, instance and visibility settings for the
                target account.
        """
        super().__init__(options.instance, db)
        self.options: MastodonOutputOptions = options

        self.log.info("Verifying %s credentials...", self.url)
        response = self.verify_credentials()
        self.user_id: str = response["id"]

        self.log.info("Getting %s configuration...", self.url)
        response = self.fetch_instance_info()
        self.instance_info: InstanceInfo = InstanceInfo.from_api(response)

    def accept_post(self, service: str, user: str, post: Post) -> None:
        """Crosspost ``post`` to this account (work in progress).

        The reply and quote lookups are in place, but splitting the post
        into statuses is not implemented: ``raw_statuses`` is always empty,
        so a call that gets past those lookups ends at the "Failed to split
        post into statuses!" error and nothing is published.

        Args:
            service: Identifier of the service the post came from.
            user: Identifier of the source user on that service.
            post: The post to publish.
        """
        new_root_id: int | None = None
        new_parent_id: int | None = None

        reply_ref: str | None = None
        if post.parent_id:
            thread = self._find_mapped_thread(
                post.parent_id, service, user, self.url, self.user_id
            )

            if not thread:
                self.log.error("Failed to find thread tuple in the database!")
                return
            _, reply_ref, new_root_id, new_parent_id = thread

        quote = post.attachments.get(QuoteAttachment)
        if quote:
            # Only quotes of the user's own posts are carried over.
            if quote.quoted_user != user:
                self.log.info("Quoted other user, skipping!")
                return

            quoted_post = self._get_post(service, user, quote.quoted_id)
            if not quoted_post:
                self.log.error("Failed to find quoted post in the database!")
                return

            quoted_mappings = self._get_mappings(
                quoted_post["id"], self.url, self.user_id
            )
            if not quoted_mappings:
                self.log.error("Failed to find mappings for quoted post!")
                return

            quoted_local_id = quoted_mappings[-1][0]
            # TODO resolve service identifier

        post_tokens = post.tokens.copy()

        remote_url = post.attachments.get(RemoteUrlAttachment)
        if (
            remote_url
            and remote_url.url
            and post.text_type == "text/x.misskeymarkdown"
        ):
            # TODO strip mfm
            pass

        raw_statuses = []  # TODO split tokens and media across posts
        if not raw_statuses:
            self.log.error("Failed to split post into statuses!")
            return

        langs = post.attachments.get(LanguagesAttachment)
        sensitive = post.attachments.get(SensitiveAttachment)

        if langs and langs.langs:
            pass  # TODO

        if sensitive and sensitive.sensitive:
            pass  # TODO

    def delete_post(self, service: str, user: str, post_id: str) -> None:
        """Delete every status on this account that mirrors a source post.

        Mapped statuses are removed in reverse mapping order. The local
        row is dropped even when the remote delete fails; the failure is
        only logged.

        Args:
            service: Identifier of the service the post came from.
            user: Identifier of the source user on that service.
            post_id: Identifier of the source post.
        """
        post = self._get_post(service, user, post_id)
        if not post:
            self.log.info("Post not found in db, skipping delete..")
            return

        mappings = self._get_mappings(post["id"], self.url, self.user_id)
        for mapping in reversed(mappings):
            self.log.info("Deleting '%s'...", mapping["identifier"])
            rsp = requests.delete(
                f"{self.url}/api/v1/statuses/{mapping['identifier']}",
                headers={"Authorization": f"Bearer {self._get_token()}"},
            )
            if not rsp.ok:
                self.log.error(
                    "Failed to delete status! status_code: %s, msg: %s",
                    rsp.status_code,
                    rsp.content,
                )
            self._delete_post_by_id(mapping["id"])

    def accept_repost(
        self, service: str, user: str, repost_id: str, reposted_id: str
    ) -> None:
        """Boost the mirrored copy of a post the source user reposted.

        Nothing happens when the reposted post, or its mapping onto this
        account, is unknown. On success the boost is stored as a post row
        and mapped to the reposted source post.

        NOTE(review): ``repost_id`` is not used; the mapping is keyed on the
        reposted post rather than on the source repost -- confirm that
        ``delete_repost`` can still find it.

        Args:
            service: Identifier of the service the repost came from.
            user: Identifier of the source user on that service.
            repost_id: Identifier of the source repost (currently unused).
            reposted_id: Identifier of the source post that was reposted.

        Raises:
            ValueError: If the freshly inserted boost row cannot be read
                back from the database.
        """
        reposted = self._get_post(service, user, reposted_id)
        if not reposted:
            self.log.info("Post not found in db, skipping repost..")
            return

        mappings = self._get_mappings(reposted["id"], self.url, self.user_id)
        if not mappings:
            return

        target = mappings[0]
        rsp = requests.post(
            f"{self.url}/api/v1/statuses/{target['identifier']}/reblog",
            headers={"Authorization": f"Bearer {self._get_token()}"},
        )

        if rsp.status_code != 200:
            self.log.error(
                "Failed to boost status! status_code: %s, msg: %s",
                rsp.status_code,
                rsp.content,
            )
            return

        # Decode the response body once and reuse the id below.
        boost_id = rsp.json()["id"]

        self._insert_post(
            {
                "user": self.user_id,
                "service": self.url,
                "identifier": boost_id,
                "reposted": target["id"],
            }
        )
        inserted = self._get_post(self.url, self.user_id, boost_id)
        if not inserted:
            raise ValueError("Inserted post not found!")
        self._insert_post_mapping(reposted["id"], inserted["id"])

    def delete_repost(self, service: str, user: str, repost_id: str) -> None:
        """Undo the boost that mirrors a source repost.

        The un-boost is sent only when both the repost and the post it
        points at have mappings on this account. The local boost row is
        dropped even when the remote call fails; the failure is only logged.

        Args:
            service: Identifier of the service the repost came from.
            user: Identifier of the source user on that service.
            repost_id: Identifier of the source repost.
        """
        repost = self._get_post(service, user, repost_id)
        if not repost:
            self.log.info("Repost not found in db, skipping delete..")
            return

        mappings = self._get_mappings(repost["id"], self.url, self.user_id)
        rmappings = self._get_mappings(repost["reposted"], self.url, self.user_id)

        if mappings and rmappings:
            self.log.info(
                "Removing '%s' Repost of '%s'...",
                mappings[0]["identifier"],
                rmappings[0]["identifier"],
            )
            rsp = requests.post(
                f"{self.url}/api/v1/statuses/{rmappings[0]['identifier']}/unreblog",
                headers={"Authorization": f"Bearer {self._get_token()}"},
            )
            if not rsp.ok:
                self.log.error(
                    "Failed to remove boost! status_code: %s, msg: %s",
                    rsp.status_code,
                    rsp.content,
                )
            self._delete_post_by_id(mappings[0]["id"])

    @override
    def _get_token(self) -> str:
        """Return the access token from the configured options."""
        return self.options.token