social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import requests, time
2
3import cross, util.database as database
4import misskey.mfm_util as mfm_util
5from util.util import LOGGER, as_envvar, canonical_label
6from util.media import MediaInfo
7from util.database import DataBaseWorker
8
9POSSIBLE_MIMES = [
10 'audio/ogg',
11 'audio/mp3',
12 'image/webp',
13 'image/jpeg',
14 'image/png',
15 'video/mp4',
16 'video/quicktime',
17 'video/webm'
18]
19
20ALLOWED_POSTING_VISIBILITY = ['public', 'unlisted', 'private']
21
22class MastodonOutputOptions():
23 def __init__(self, o: dict) -> None:
24 self.visibility = 'public'
25
26 visibility = o.get('visibility')
27 if visibility is not None:
28 if visibility not in ALLOWED_POSTING_VISIBILITY:
29 raise ValueError(f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {visibility}")
30 self.visibility = visibility
31
32class MastodonOutput(cross.Output):
33 def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
34 super().__init__(input, settings, db)
35 self.options = settings.get('options') or {}
36 self.token = as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
37 instance: str = as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
38
39 self.service = instance[:-1] if instance.endswith('/') else instance
40
41 LOGGER.info("Verifying %s credentails...", self.service)
42 responce = requests.get(f"{self.service}/api/v1/accounts/verify_credentials", headers={
43 'Authorization': f'Bearer {self.token}'
44 })
45 if responce.status_code != 200:
46 LOGGER.error("Failed to validate user credentials!")
47 responce.raise_for_status()
48 return
49 self.user_id: str = responce.json()["id"]
50
51 LOGGER.info("Getting %s configuration...", self.service)
52 responce = requests.get(f"{self.service}/api/v1/instance", headers={
53 'Authorization': f'Bearer {self.token}'
54 })
55 if responce.status_code != 200:
56 LOGGER.error("Failed to get instance info!")
57 responce.raise_for_status()
58 return
59
60 instance_info: dict = responce.json()
61 configuration: dict = instance_info['configuration']
62
63 statuses_config: dict = configuration.get('statuses', {})
64 self.max_characters: int = statuses_config.get('max_characters', 500)
65 self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4)
66 self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23)
67
68 media_config: dict = configuration.get('media_attachments', {})
69 self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
70 self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
71 self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES)
72
73 # *oma: max post chars
74 max_toot_chars = instance_info.get('max_toot_chars')
75 if max_toot_chars:
76 self.max_characters: int = max_toot_chars
77
78 # *oma: max upload limit
79 upload_limit = instance_info.get('upload_limit')
80 if upload_limit:
81 self.image_size_limit: int = upload_limit
82 self.video_size_limit: int = upload_limit
83
84 # *oma ext: supported text types
85 self.text_format = 'text/plain'
86 pleroma = instance_info.get('pleroma')
87 if pleroma:
88 post_formats: list[str] = pleroma.get('metadata', {}).get('post_formats', [])
89 if 'text/x.misskeymarkdown' in post_formats:
90 self.text_format = 'text/x.misskeymarkdown'
91 elif 'text/markdown' in post_formats:
92 self.text_format = 'text/markdown'
93
94 def upload_media(self, attachments: list[MediaInfo]) -> list[str] | None:
95 for a in attachments:
96 if a.mime.startswith('image/') and len(a.io) > self.image_size_limit:
97 return None
98
99 if a.mime.startswith('video/') and len(a.io) > self.video_size_limit:
100 return None
101
102 if not a.mime.startswith('image/') and not a.mime.startswith('video/'):
103 if len(a.io) > 7_000_000:
104 return None
105
106 uploads: list[dict] = []
107 for a in attachments:
108 data = {}
109 if a.alt:
110 data['description'] = a.alt
111
112 req = requests.post(f"{self.service}/api/v2/media", headers= {
113 'Authorization': f'Bearer {self.token}'
114 }, files={'file': (a.name, a.io, a.mime)}, data=data)
115
116 if req.status_code == 200:
117 LOGGER.info("Uploaded %s! (%s)", a.name, req.json()['id'])
118 uploads.append({
119 'done': True,
120 'id': req.json()['id']
121 })
122 elif req.status_code == 202:
123 LOGGER.info("Waiting for %s to process!", a.name)
124 uploads.append({
125 'done': False,
126 'id': req.json()['id']
127 })
128 else:
129 LOGGER.error("Failed to upload %s! %s", a.name, req.text)
130 req.raise_for_status()
131
132 while any([not val['done'] for val in uploads]):
133 LOGGER.info("Waiting for media to process...")
134 time.sleep(3)
135 for media in uploads:
136 if media['done']:
137 continue
138
139 reqs = requests.get(f'{self.service}/api/v1/media/{media['id']}', headers={
140 'Authorization': f'Bearer {self.token}'
141 })
142
143 if reqs.status_code == 206:
144 continue
145
146 if reqs.status_code == 200:
147 media['done'] = True
148 continue
149 reqs.raise_for_status()
150
151 return [val['id'] for val in uploads]
152
153 def token_to_string(self, tokens: list[cross.Token]) -> str | None:
154 p_text: str = ''
155
156 for token in tokens:
157 if isinstance(token, cross.TextToken):
158 p_text += token.text
159 elif isinstance(token, cross.TagToken):
160 p_text += '#' + token.tag
161 elif isinstance(token, cross.LinkToken):
162 if canonical_label(token.label, token.href):
163 p_text += token.href
164 else:
165 if self.text_format == 'text/plain':
166 p_text += f'{token.label}: {token.href}'
167 elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}:
168 p_text += f'[{token.label}]({token.href})'
169 else:
170 return None
171
172 return p_text
173
174 def split_tokens_media(self, tokens: list[cross.Token], media: list[MediaInfo]):
175 split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)
176 post_text: list[str] = []
177
178 for block in split_tokens:
179 baked_text = self.token_to_string(block)
180
181 if baked_text is None:
182 return None
183 post_text.append(baked_text)
184
185 if not post_text:
186 post_text = ['']
187
188 posts: list[dict] = [{"text": post_text, "attachments": []} for post_text in post_text]
189 available_indices: list[int] = list(range(len(posts)))
190
191 current_image_post_idx: int | None = None
192
193 def make_blank_post() -> dict:
194 return {
195 "text": '',
196 "attachments": []
197 }
198
199 def pop_next_empty_index() -> int:
200 if available_indices:
201 return available_indices.pop(0)
202 else:
203 new_idx = len(posts)
204 posts.append(make_blank_post())
205 return new_idx
206
207 for att in media:
208 if (
209 current_image_post_idx is not None
210 and len(posts[current_image_post_idx]["attachments"]) < self.max_media_attachments
211 ):
212 posts[current_image_post_idx]["attachments"].append(att)
213 else:
214 idx = pop_next_empty_index()
215 posts[idx]["attachments"].append(att)
216 current_image_post_idx = idx
217
218 result: list[tuple[str, list[MediaInfo]]] = []
219
220 for p in posts:
221 result.append((p['text'], p["attachments"]))
222
223 return result
224
225 def accept_post(self, post: cross.Post):
226 parent_id = post.get_parent_id()
227
228 new_root_id: int | None = None
229 new_parent_id: int | None = None
230
231 reply_ref: str | None = None
232 if parent_id:
233 thread_tuple = database.find_mapped_thread(
234 self.db,
235 parent_id,
236 self.input.user_id,
237 self.input.service,
238 self.user_id,
239 self.service
240 )
241
242 if not thread_tuple:
243 LOGGER.error("Failed to find thread tuple in the database!")
244 return None
245
246 _, reply_ref, new_root_id, new_parent_id = thread_tuple
247
248 lang: str
249 if post.get_languages():
250 lang = post.get_languages()[0]
251 else:
252 lang = 'en'
253
254 post_tokens = post.get_tokens()
255 if post.get_text_type() == "text/x.misskeymarkdown":
256 post_tokens, status = mfm_util.strip_mfm(post_tokens)
257 post_url = post.get_post_url()
258 if status and post_url:
259 post_tokens.append(cross.TextToken('\n'))
260 post_tokens.append(cross.LinkToken(post_url, "[Post contains MFM, see original]"))
261
262 raw_statuses = self.split_tokens_media(post_tokens, post.get_attachments())
263 if not raw_statuses:
264 LOGGER.error("Failed to split post into statuses?")
265 return None
266 baked_statuses = []
267
268 for status, raw_media in raw_statuses:
269 media: list[str] | None = None
270 if raw_media:
271 media = self.upload_media(raw_media)
272 if not media:
273 LOGGER.error("Failed to upload attachments!")
274 return None
275 baked_statuses.append((status, media))
276 continue
277 baked_statuses.append((status,[]))
278
279 created_statuses: list[str] = []
280
281 for status, media in baked_statuses:
282 payload = {
283 'status': status,
284 'media_ids': media or [],
285 'spoiler_text': post.get_cw(),
286 'visibility': self.options.get('visibility', 'public'),
287 'content_type': self.text_format,
288 'language': lang
289 }
290
291 if media:
292 payload['sensitive'] = post.is_sensitive()
293
294 if post.get_cw():
295 payload['sensitive'] = True
296
297 if not status:
298 payload['status'] = '🖼️'
299
300 if reply_ref:
301 payload['in_reply_to_id'] = reply_ref
302
303 reqs = requests.post(f'{self.service}/api/v1/statuses', headers={
304 'Authorization': f'Bearer {self.token}',
305 'Content-Type': 'application/json'
306 }, json=payload)
307
308 if reqs.status_code != 200:
309 LOGGER.info("Failed to post status! %s - %s", reqs.status_code, reqs.text)
310 reqs.raise_for_status()
311
312 reply_ref = reqs.json()['id']
313 LOGGER.info("Created new status %s!", reply_ref)
314
315 created_statuses.append(reqs.json()['id'])
316
317 db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
318 assert db_post, "ghghghhhhh"
319
320 if new_root_id is None or new_parent_id is None:
321 new_root_id = database.insert_post(
322 self.db,
323 created_statuses[0],
324 self.user_id,
325 self.service
326 )
327 new_parent_id = new_root_id
328 database.insert_mapping(self.db, db_post['id'], new_parent_id)
329 created_statuses = created_statuses[1:]
330
331 for db_id in created_statuses:
332 new_parent_id = database.insert_reply(
333 self.db,
334 db_id,
335 self.user_id,
336 self.service,
337 new_parent_id,
338 new_root_id
339 )
340 database.insert_mapping(self.db, db_post['id'], new_parent_id)
341
342 def delete_post(self, identifier: str):
343 post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
344 if not post:
345 return
346
347 mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id)
348 for mapping in mappings[::-1]:
349 LOGGER.info("Deleting '%s'...", mapping[0])
350 requests.delete(f'{self.service}/api/v1/statuses/{mapping[0]}', headers={
351 'Authorization': f'Bearer {self.token}'
352 })
353 database.delete_post(self.db, mapping[0], self.service, self.user_id)
354
355 def accept_repost(self, repost_id: str, reposted_id: str):
356 repost = self.__delete_repost(repost_id)
357 if not repost:
358 return None
359
360 reposted = database.find_post(self.db, reposted_id, self.input.user_id, self.input.service)
361 if not reposted:
362 return
363
364 mappings = database.find_mappings(self.db, reposted['id'], self.service, self.user_id)
365 if mappings:
366 rsp = requests.post(f'{self.service}/api/v1/statuses/{mappings[0][0]}/reblog', headers={
367 'Authorization': f'Bearer {self.token}'
368 })
369
370 if rsp.status_code != 200:
371 LOGGER.error("Failed to boost status! status_code: %s, msg: %s", rsp.status_code, rsp.content)
372 return
373
374 internal_id = database.insert_repost(
375 self.db,
376 rsp.json()['id'],
377 reposted['id'],
378 self.user_id,
379 self.service)
380 database.insert_mapping(self.db, repost['id'], internal_id)
381
382 def __delete_repost(self, repost_id: str) -> dict | None:
383 repost = database.find_post(self.db, repost_id, self.input.user_id, self.input.service)
384 if not repost:
385 return None
386
387 mappings = database.find_mappings(self.db, repost['id'], self.service, self.user_id)
388 reposted_mappings = database.find_mappings(self.db, repost['reposted_id'], self.service, self.user_id)
389 if mappings and reposted_mappings:
390 LOGGER.info("Deleting '%s'...", mappings[0][0])
391 requests.post(f'{self.service}/api/v1/statuses/{reposted_mappings[0][0]}/unreblog', headers={
392 'Authorization': f'Bearer {self.token}'
393 })
394 database.delete_post(self.db, mappings[0][0], self.user_id, self.service)
395 return repost
396
397 def delete_repost(self, repost_id: str):
398 self.__delete_repost(repost_id)