Social media crossposting tool. Third time's the charm.
mastodon
misskey
crossposting
bluesky
import time

import requests

import cross
import misskey.mfm_util as mfm_util
import util.database as database
from util.database import DataBaseWorker
from util.media import MediaInfo
from util.util import LOGGER, as_envvar, canonical_label
11
# Fallback attachment MIME types, used by MastodonOutput when the instance does
# not advertise `configuration.media_attachments.supported_mime_types`.
POSSIBLE_MIMES = [
    # audio
    "audio/ogg",
    "audio/mp3",
    # images
    "image/webp",
    "image/jpeg",
    "image/png",
    # video
    "video/mp4",
    "video/quicktime",
    "video/webm",
]

# Post text formats in order of preference: the first entry the instance
# reports as supported is used, falling back to plain text.
TEXT_MIMES = [
    "text/x.misskeymarkdown",
    "text/markdown",
    "text/plain",
]

# Values accepted for the `visibility` output option.
ALLOWED_POSTING_VISIBILITY = [
    "public",
    "unlisted",
    "private",
]
26
27
class MastodonOutputOptions:
    """Validated posting options for a Mastodon output.

    Only ``visibility`` is read from the options dict. It defaults to
    ``"public"`` when absent or ``None``. Otherwise it must be one of
    ``ALLOWED_POSTING_VISIBILITY``, or ``ValueError`` is raised.
    """

    def __init__(self, o: dict) -> None:
        requested = o.get("visibility")
        if requested is None:
            self.visibility = "public"
            return

        if requested not in ALLOWED_POSTING_VISIBILITY:
            allowed = ", ".join(ALLOWED_POSTING_VISIBILITY)
            raise ValueError(
                f"'visibility' only accepts {allowed}, got: {requested}"
            )

        self.visibility = requested
39
40
class MastodonOutput(cross.Output):
    """Publish posts, replies, boosts and deletions to a Mastodon-API server.

    Limits (post length, attachment count and size) and the post text format
    are read from the instance when the output is constructed. Besides vanilla
    Mastodon fields, the *oma (``max_toot_chars``, ``upload_limit``,
    ``pleroma.metadata.post_formats``) and Chuckya
    (``configuration.statuses.supported_mime_types``) extensions are honoured.
    """

    # Seconds before an HTTP request is abandoned. `requests` has no default
    # timeout, so without this a stalled server blocks the output forever.
    REQUEST_TIMEOUT: float = 60

    # Seconds to sleep between polls while uploaded media is still processing.
    MEDIA_POLL_INTERVAL: float = 3

    # Size cap in bytes for attachments that are neither images nor videos.
    OTHER_MEDIA_SIZE_LIMIT: int = 7_000_000

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """Validate settings, verify the token and load instance limits.

        Raises:
            ValueError: ``token`` or ``instance`` is missing, or
                ``options.visibility`` is not an allowed value.
            requests.HTTPError: credential verification or the instance
                lookup did not return HTTP 200.
        """
        super().__init__(input, settings, db)
        self.options = settings.get("options") or {}
        # Validate the options once, up front, so a bad visibility fails at
        # startup instead of being forwarded verbatim to the server.
        self._post_options = MastodonOutputOptions(self.options)

        token = as_envvar(settings.get("token"))
        if not token:
            raise ValueError("'token' is required")
        self.token = token

        instance = as_envvar(settings.get("instance"))
        if not instance:
            raise ValueError("'instance' is required")
        # Strip every trailing slash so f"{self.service}/api/..." never has "//".
        self.service: str = instance.rstrip("/")

        LOGGER.info("Verifying %s credentials...", self.service)
        response = requests.get(
            f"{self.service}/api/v1/accounts/verify_credentials",
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            # Never continue with a half-initialised object (no user_id/limits).
            self._raise_unexpected(response)
        self.user_id: str = response.json()["id"]

        LOGGER.info("Getting %s configuration...", self.service)
        response = requests.get(
            f"{self.service}/api/v1/instance",
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            LOGGER.error("Failed to get instance info!")
            self._raise_unexpected(response)
        self._apply_instance_info(response.json())

    def _auth_headers(self) -> dict[str, str]:
        """Return the bearer-token header sent with every API request."""
        return {"Authorization": f"Bearer {self.token}"}

    @staticmethod
    def _raise_unexpected(response: requests.Response) -> None:
        """Raise ``requests.HTTPError`` for a response the caller did not expect.

        ``raise_for_status()`` only raises for 4xx/5xx, so an unexpected
        non-error status would otherwise slip through silently.
        """
        response.raise_for_status()
        raise requests.HTTPError(
            f"Unexpected HTTP {response.status_code} from {response.url}",
            response=response,
        )

    def _apply_instance_info(self, instance_info: dict) -> None:
        """Derive posting limits and the text format from ``/api/v1/instance``.

        Missing or ``null`` sections fall back to Mastodon's defaults.
        """
        # Some *oma servers omit `configuration`; treat it as empty.
        configuration: dict = instance_info.get("configuration") or {}

        statuses_config: dict = configuration.get("statuses") or {}
        self.max_characters: int = statuses_config.get("max_characters", 500)
        self.max_media_attachments: int = statuses_config.get(
            "max_media_attachments", 4
        )
        self.characters_reserved_per_url: int = statuses_config.get(
            "characters_reserved_per_url", 23
        )

        media_config: dict = configuration.get("media_attachments") or {}
        self.image_size_limit: int = media_config.get("image_size_limit", 16777216)
        self.video_size_limit: int = media_config.get("video_size_limit", 103809024)
        self.supported_mime_types: list[str] = media_config.get(
            "supported_mime_types", POSSIBLE_MIMES
        )

        # *oma: max post chars
        max_toot_chars = instance_info.get("max_toot_chars")
        if max_toot_chars:
            self.max_characters = max_toot_chars

        # *oma: max upload limit (one limit for every media kind)
        upload_limit = instance_info.get("upload_limit")
        if upload_limit:
            self.image_size_limit = upload_limit
            self.video_size_limit = upload_limit

        # chuckya: supported text types
        chuckya_text_mimes: list[str] = statuses_config.get("supported_mime_types") or []
        self.text_format: str = next(
            (mime for mime in TEXT_MIMES if mime in chuckya_text_mimes), "text/plain"
        )

        # *oma ext: supported text types (override chuckya's choice if present)
        pleroma = instance_info.get("pleroma")
        if pleroma:
            metadata: dict = pleroma.get("metadata") or {}
            post_formats: list[str] = metadata.get("post_formats") or []
            self.text_format = next(
                (mime for mime in TEXT_MIMES if mime in post_formats), self.text_format
            )

    def _size_limit_for(self, mime: str) -> int:
        """Return the maximum upload size in bytes for an attachment of *mime*."""
        if mime.startswith("image/"):
            return self.image_size_limit
        if mime.startswith("video/"):
            return self.video_size_limit
        return self.OTHER_MEDIA_SIZE_LIMIT

    def upload_media(self, attachments: list[MediaInfo]) -> list[str] | None:
        """Upload *attachments* and return their media ids once processed.

        Returns ``None`` without uploading anything if any attachment exceeds
        the size limit for its kind.

        Raises:
            requests.HTTPError: an upload or a processing-status check returned
                an unexpected status.
        """
        for attachment in attachments:
            if len(attachment.io) > self._size_limit_for(attachment.mime):
                LOGGER.error("%s exceeds the instance size limit!", attachment.name)
                return None

        uploads: list[dict] = []
        for attachment in attachments:
            data = {"description": attachment.alt} if attachment.alt else {}

            response = requests.post(
                f"{self.service}/api/v2/media",
                headers=self._auth_headers(),
                files={"file": (attachment.name, attachment.io, attachment.mime)},
                data=data,
                timeout=self.REQUEST_TIMEOUT,
            )

            if response.status_code == 200:
                media_id = response.json()["id"]
                LOGGER.info("Uploaded %s! (%s)", attachment.name, media_id)
                uploads.append({"done": True, "id": media_id})
            elif response.status_code == 202:
                # Accepted but still processing asynchronously; poll below.
                LOGGER.info("Waiting for %s to process!", attachment.name)
                uploads.append({"done": False, "id": response.json()["id"]})
            else:
                LOGGER.error("Failed to upload %s! %s", attachment.name, response.text)
                # Never silently drop an attachment on an unexpected 2xx/3xx.
                self._raise_unexpected(response)

        while any(not upload["done"] for upload in uploads):
            LOGGER.info("Waiting for media to process...")
            time.sleep(self.MEDIA_POLL_INTERVAL)
            for upload in uploads:
                if upload["done"]:
                    continue

                response = requests.get(
                    f"{self.service}/api/v1/media/{upload['id']}",
                    headers=self._auth_headers(),
                    timeout=self.REQUEST_TIMEOUT,
                )
                if response.status_code == 206:
                    continue  # still processing
                if response.status_code == 200:
                    upload["done"] = True
                    continue
                # Any other status would otherwise make this loop spin forever.
                self._raise_unexpected(response)

        return [upload["id"] for upload in uploads]

    def token_to_string(self, tokens: list[cross.Token]) -> str | None:
        """Render one block of tokens as status text in ``self.text_format``.

        Text and tag tokens are emitted verbatim (tags prefixed with ``#``);
        links are shown bare when their label is canonical, otherwise in the
        format's link syntax. Returns ``None`` if a link cannot be expressed in
        the current format. Other token types are skipped.
        """
        parts: list[str] = []

        for token in tokens:
            if isinstance(token, cross.TextToken):
                parts.append(token.text)
            elif isinstance(token, cross.TagToken):
                parts.append("#" + token.tag)
            elif isinstance(token, cross.LinkToken):
                if canonical_label(token.label, token.href):
                    parts.append(token.href)
                elif self.text_format == "text/plain":
                    parts.append(f"{token.label} ({token.href})")
                elif self.text_format in {"text/x.misskeymarkdown", "text/markdown"}:
                    parts.append(f"[{token.label}]({token.href})")
                else:
                    return None

        return "".join(parts)

    def split_tokens_media(
        self,
        tokens: list[cross.Token],
        media: list[MediaInfo],
        reserved_characters: int = 0,
    ) -> list[tuple[str, list[MediaInfo]]] | None:
        """Split a post into ``(text, attachments)`` pairs, one per status.

        Text is split to fit ``max_characters - reserved_characters``.
        Attachments fill statuses in order, ``max_media_attachments`` at a time.
        Blank statuses are appended if there is more media than text statuses.
        Returns ``None`` if any block cannot be rendered.

        Args:
            tokens: the post body.
            media: attachments to distribute across the statuses.
            reserved_characters: characters each status must leave free, e.g.
                for a content warning that counts toward the limit.
        """
        budget = self.max_characters - reserved_characters
        if budget <= 0:
            # The reserved text alone already exceeds the limit; the server
            # will reject the status either way, so don't shrink further.
            budget = self.max_characters

        texts: list[str] = []
        for block in cross.split_tokens(tokens, budget, self.characters_reserved_per_url):
            text = self.token_to_string(block)
            if text is None:
                return None
            texts.append(text)

        if not texts:
            texts = [""]

        posts: list[tuple[str, list[MediaInfo]]] = [(text, []) for text in texts]

        # Statuses receive media strictly in order, so the next status to
        # start filling is always the one after the current one.
        current: list[MediaInfo] | None = None
        next_index = 0
        for attachment in media:
            if current is None or len(current) >= self.max_media_attachments:
                if next_index == len(posts):
                    posts.append(("", []))
                current = posts[next_index][1]
                next_index += 1
            current.append(attachment)

        return posts

    def accept_post(self, post: cross.Post):
        """Cross-post *post* as a status or thread, mapping the results in the DB.

        Returns ``None`` without posting if the parent thread is unknown, the
        post cannot be split, or an attachment cannot be uploaded. All media is
        uploaded before any status is created.

        Raises:
            requests.HTTPError: a status could not be created. Statuses already
                created are still recorded so they can be deleted later.
        """
        parent_id = post.get_parent_id()

        new_root_id: int | None = None
        new_parent_id: int | None = None
        reply_ref: str | None = None
        if parent_id:
            thread_tuple = database.find_mapped_thread(
                self.db,
                parent_id,
                self.input.user_id,
                self.input.service,
                self.user_id,
                self.service,
            )

            if not thread_tuple:
                LOGGER.error("Failed to find thread tuple in the database!")
                return None

            _, reply_ref, new_root_id, new_parent_id = thread_tuple

        languages = post.get_languages()
        lang: str = languages[0] if languages else "en"

        post_tokens = post.get_tokens()
        if post.get_text_type() == "text/x.misskeymarkdown":
            post_tokens, had_mfm = mfm_util.strip_mfm(post_tokens)
            post_url = post.get_post_url()
            if had_mfm and post_url:
                post_tokens.append(cross.TextToken("\n"))
                post_tokens.append(
                    cross.LinkToken(post_url, "[Post contains MFM, see original]")
                )

        spoiler: str = post.get_spoiler() or ""

        # Mastodon's length check includes the content warning, so reserve room
        # for it in every status of the thread.
        raw_statuses = self.split_tokens_media(
            post_tokens, post.get_attachments(), reserved_characters=len(spoiler)
        )
        if not raw_statuses:
            LOGGER.error("Failed to split post into statuses?")
            return None

        baked_statuses: list[tuple[str, list[str]]] = []
        for text, raw_media in raw_statuses:
            media_ids: list[str] = []
            if raw_media:
                uploaded = self.upload_media(raw_media)
                if not uploaded:
                    LOGGER.error("Failed to upload attachments!")
                    return None
                media_ids = uploaded
            baked_statuses.append((text, media_ids))

        created_statuses: list[str] = []
        try:
            for text, media_ids in baked_statuses:
                payload = {
                    # Attachment-only statuses still need some text.
                    "status": text or "🖼️",
                    "media_ids": media_ids,
                    "spoiler_text": spoiler,
                    "visibility": self._post_options.visibility,
                    "content_type": self.text_format,
                    "language": lang,
                }

                if media_ids:
                    payload["sensitive"] = post.is_sensitive()

                if spoiler:
                    payload["sensitive"] = True

                if reply_ref:
                    payload["in_reply_to_id"] = reply_ref

                response = requests.post(
                    f"{self.service}/api/v1/statuses",
                    headers={
                        **self._auth_headers(),
                        "Content-Type": "application/json",
                    },
                    json=payload,
                    timeout=self.REQUEST_TIMEOUT,
                )

                if response.status_code != 200:
                    LOGGER.error(
                        "Failed to post status! %s - %s",
                        response.status_code,
                        response.text,
                    )
                    response.raise_for_status()

                # Each subsequent status replies to the one just created.
                reply_ref = response.json()["id"]
                LOGGER.info("Created new status %s!", reply_ref)
                created_statuses.append(reply_ref)
        finally:
            # Record whatever was published, even when a later status failed,
            # so a partial thread can still be deleted through delete_post.
            if created_statuses:
                self._record_statuses(post, created_statuses, new_root_id, new_parent_id)

    def _record_statuses(
        self,
        post: cross.Post,
        created_statuses: list[str],
        new_root_id: int | None,
        new_parent_id: int | None,
    ) -> None:
        """Insert *created_statuses* as a reply chain mapped to *post*.

        When no thread root/parent is known, the first status becomes the new
        root and the rest are chained beneath it.
        """
        db_post = database.find_post(
            self.db, post.get_id(), self.input.user_id, self.input.service
        )
        if not db_post:
            LOGGER.error(
                "Failed to find source post %s in the database!", post.get_id()
            )
            return

        remaining = created_statuses
        if new_root_id is None or new_parent_id is None:
            new_root_id = database.insert_post(
                self.db, created_statuses[0], self.user_id, self.service
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post["id"], new_parent_id)
            remaining = created_statuses[1:]

        for status_id in remaining:
            new_parent_id = database.insert_reply(
                self.db, status_id, self.user_id, self.service, new_parent_id, new_root_id
            )
            database.insert_mapping(self.db, db_post["id"], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every status that *identifier* was cross-posted as.

        Deletion is best effort: a failed API call is logged and the local
        record is still removed.
        """
        post = database.find_post(
            self.db, identifier, self.input.user_id, self.input.service
        )
        if not post:
            return

        mappings = database.find_mappings(
            self.db, post["id"], self.service, self.user_id
        )
        # Walk mappings in reverse (presumably newest first, so replies are
        # removed before their parents).
        for mapping in mappings[::-1]:
            status_id = mapping[0]
            LOGGER.info("Deleting '%s'...", status_id)
            response = requests.delete(
                f"{self.service}/api/v1/statuses/{status_id}",
                headers=self._auth_headers(),
                timeout=self.REQUEST_TIMEOUT,
            )
            if not response.ok:
                LOGGER.warning(
                    "Failed to delete status %s! %s - %s",
                    status_id,
                    response.status_code,
                    response.text,
                )
            # NOTE(review): (user_id, service) order matches find_post,
            # insert_post, insert_reply and the repost path below; this call
            # previously passed (service, user_id). Confirm against
            # util.database.delete_post.
            database.delete_post(self.db, status_id, self.user_id, self.service)

    def accept_repost(self, repost_id: str, reposted_id: str):
        """Boost the cross-posted copy of *reposted_id* and record the boost.

        Does nothing if the repost or the reposted post is unknown, or the
        reposted post was never cross-posted here.
        """
        repost = self.__delete_repost(repost_id)
        if not repost:
            return None

        reposted = database.find_post(
            self.db, reposted_id, self.input.user_id, self.input.service
        )
        if not reposted:
            return

        mappings = database.find_mappings(
            self.db, reposted["id"], self.service, self.user_id
        )
        if not mappings:
            return

        response = requests.post(
            f"{self.service}/api/v1/statuses/{mappings[0][0]}/reblog",
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            LOGGER.error(
                "Failed to boost status! status_code: %s, msg: %s",
                response.status_code,
                response.content,
            )
            return

        internal_id = database.insert_repost(
            self.db, response.json()["id"], reposted["id"], self.user_id, self.service
        )
        database.insert_mapping(self.db, repost["id"], internal_id)

    def __delete_repost(self, repost_id: str) -> dict | None:
        """Undo an existing boost for *repost_id*, if any.

        Returns the repost's DB row, or ``None`` if it is unknown.
        """
        repost = database.find_post(
            self.db, repost_id, self.input.user_id, self.input.service
        )
        if not repost:
            return None

        mappings = database.find_mappings(
            self.db, repost["id"], self.service, self.user_id
        )
        reposted_mappings = database.find_mappings(
            self.db, repost["reposted_id"], self.service, self.user_id
        )
        if mappings and reposted_mappings:
            LOGGER.info("Deleting '%s'...", mappings[0][0])
            requests.post(
                f"{self.service}/api/v1/statuses/{reposted_mappings[0][0]}/unreblog",
                headers=self._auth_headers(),
                timeout=self.REQUEST_TIMEOUT,
            )
            database.delete_post(self.db, mappings[0][0], self.user_id, self.service)
        return repost

    def delete_repost(self, repost_id: str):
        """Undo the boost that mirrors *repost_id*."""
        self.__delete_repost(repost_id)
444 database.delete_post(self.db, mappings[0][0], self.user_id, self.service)
445 return repost
446
447 def delete_repost(self, repost_id: str):
448 self.__delete_repost(repost_id)