social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from util import LOGGER
2import requests, websockets
3import util, media_util, json, cross
4import database
5from database import DataBaseWorker
6from typing import Callable, Any
7import asyncio, time
8
9from bs4 import BeautifulSoup, Tag
10from bs4.element import NavigableString
11import markeddown
12from html import unescape
13import re
14
# Fallback list of uploadable MIME types, used when an instance's
# configuration does not provide `supported_mime_types`.
POSSIBLE_MIMES = [
    # audio
    'audio/ogg', 'audio/mp3',
    # images
    'image/webp', 'image/jpeg', 'image/png',
    # video
    'video/mp4', 'video/quicktime', 'video/webm',
]
25
# Shared HTML -> Markdown converter. It is stateful, so `tokenize_post`
# resets it after every conversion.
md_parser = markeddown.HTMLToMarkdownParser()
md_parser.preserve_spaces = True


def tokenize_post(status: dict) -> list[cross.Token]:
    """Convert the HTML ``content`` of a status into a flat token list.

    Anchors become mention, tag or link tokens; every other piece of markup
    is converted to markdown and emitted as text.

    Args:
        status: Status dict. ``content`` (HTML) is required; ``tags`` and
            ``mentions`` are used to recognise hashtag and mention anchors.

    Returns:
        The tokens in document order. An empty list when the content holds
        no nodes (e.g. a media-only post).
    """
    soup = BeautifulSoup(status['content'], "html.parser")
    tokens: list[cross.Token] = []

    tags: list[dict] = status.get('tags') or []
    mentions: list[dict] = status.get('mentions') or []

    def mdd(html):
        """Render an HTML fragment to markdown with the shared parser."""
        try:
            md_parser.feed(unescape(html))
            return md_parser.get_markdown()
        finally:
            # Always reset, so a failed conversion cannot leak partial state
            # into the next one.
            md_parser.reset()

    def tokenize_anchor(node: Tag) -> None:
        """Emit a mention, tag or link token for an ``<a>`` element."""
        href = node.get("href", "")
        inner_html = "".join(str(c) for c in node.contents)
        link_text_md = mdd(inner_html)

        if link_text_md.startswith('@'):
            as_mention = link_text_md[1:]
            for block in mentions:
                # Match by profile URL first, then by handle.
                if (
                    href == block.get('url')
                    or as_mention == block.get('acct')
                    or as_mention == block.get('username')
                ):
                    tokens.append(cross.MentionToken(block['acct'], block['url']))
                    return

        if link_text_md.startswith('#'):
            # Compared lower-cased against the tag names listed on the status.
            as_tag = link_text_md[1:].lower()
            if any(as_tag == block.get('name') for block in tags):
                tokens.append(cross.TagToken(link_text_md[1:]))
                return

        # idk if we can safely convert this to string
        tokens.append(cross.LinkToken(str(href), link_text_md))

    def recurse(node) -> None:
        if isinstance(node, NavigableString):
            tokens.append(cross.TextToken(str(node)))
            return

        if not isinstance(node, Tag):
            return

        if node.name.lower() == "a":
            tokenize_anchor(node)
            return

        # Elements containing anchors are walked child by child so the
        # anchors become their own tokens; anything else is rendered whole.
        if node.find("a") is not None:
            for child in node.contents:
                recurse(child)
            return

        markdownified = mdd(str(node))
        if markdownified:
            tokens.append(cross.TextToken(markdownified))

    for child in soup.contents:
        recurse(child)

    # Drop the trailing blank line left by the final block element. The
    # `tokens` guard covers empty content, where there is no last token.
    if tokens:
        last_token = tokens[-1]
        if isinstance(last_token, cross.TextToken) and last_token.text.endswith('\n\n'):
            tokens[-1] = cross.TextToken(last_token.text[:-2])

    return tokens
93
# Content types whose raw source text is tokenized as markdown instead of
# going through the HTML tokenizer.
MARKDOWNY = [
    'text/x.misskeymarkdown',
    'text/markdown',
    'text/plain',
]
95
class MastodonPost(cross.Post):
    """A status dict bundled with its tokens and downloaded attachments."""

    def __init__(self, status: dict, tokens: list[cross.Token], media_attachments: list[media_util.MediaInfo]) -> None:
        super().__init__()
        self.tokens = tokens
        self.media_attachments = media_attachments
        self.status = status

    def get_tokens(self) -> list[cross.Token]:
        """Return the tokens supplied at construction."""
        return self.tokens

    def get_parent_id(self) -> str | None:
        """Return ``in_reply_to_id`` of the status, or None when absent."""
        return self.status.get('in_reply_to_id')

    def get_post_date_iso(self) -> str:
        """Return ``created_at`` of the status, else the base-class value."""
        created_at = self.status.get('created_at')
        if created_at:
            return created_at
        return super().get_post_date_iso()

    def get_cw(self) -> str:
        """Return the spoiler text, or an empty string when there is none."""
        spoiler = self.status.get('spoiler_text')
        return spoiler if spoiler else ''

    def get_id(self) -> str:
        """Return the status id."""
        return self.status['id']

    def get_languages(self) -> list[str]:
        """Return the status language as a one-item list, or an empty list."""
        language = self.status.get('language')
        return [language] if language else []

    def is_sensitive(self) -> bool:
        """Return the ``sensitive`` flag of the status (False when missing)."""
        return self.status.get('sensitive', False)

    def get_attachments(self) -> list[media_util.MediaInfo]:
        """Return the attachments supplied at construction."""
        return self.media_attachments
129
# Visibilities the input is allowed to crosspost from.
ALLOWED_VISIBILITY = ['public', 'unlisted']


class MastodonInputOptions():
    """Validated options for the Mastodon input.

    Attributes:
        allowed_visibility: Status visibilities that may be crossposted.
        filters: Compiled regular expressions from ``regex_filters``.
    """

    def __init__(self, o: dict) -> None:
        """Parse and validate the raw options dict.

        Args:
            o: Raw options. Recognised keys are ``regex_filters`` (list of
                pattern strings) and ``allowed_visibility`` (list of
                visibility names).

        Raises:
            ValueError: If ``allowed_visibility`` contains a value that is
                not in ``ALLOWED_VISIBILITY``.
        """
        # Copy the default so mutating one instance's list can never change
        # the module-level constant shared by every other instance.
        self.allowed_visibility = list(ALLOWED_VISIBILITY)
        # `or []` also covers an explicit null in the config.
        self.filters = [re.compile(f) for f in o.get('regex_filters') or []]

        allowed_visibility = o.get('allowed_visibility')
        if allowed_visibility is not None:
            if any(v not in ALLOWED_VISIBILITY for v in allowed_visibility):
                raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}")
            self.allowed_visibility = allowed_visibility
142
class MastodonInput(cross.Input):
    """Input that follows the user stream of a Mastodon-API instance.

    Statuses created by the authenticated account are tokenized, their media
    downloaded, and the result handed to every output; delete events are
    forwarded to the outputs as well.
    """

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        """Verify the credentials and look up the streaming endpoint.

        Args:
            settings: Input settings; ``token`` and ``instance`` are
                required, ``options`` is optional.
            db: Database worker passed on to the base class.

        Raises:
            ValueError: If ``token`` or ``instance`` is missing, or the
                options are invalid.
            requests.HTTPError: If the credential check does not return 200.
            Exception: If the instance does not advertise a streaming URL.
        """
        self.options = MastodonInputOptions(settings.get('options') or {})

        token = util.as_envvar(settings.get('token'))
        if not token:
            raise ValueError("'token' is required")
        self.token = token

        instance: str = util.as_envvar(settings.get('instance'))
        if not instance:
            raise ValueError("'instance' is required")

        service = instance[:-1] if instance.endswith('/') else instance

        LOGGER.info("Verifying %s credentails...", service)
        response = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={
            'Authorization': f'Bearer {self.token}'
        })
        if response.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            response.raise_for_status()
            # raise_for_status() only raises for 4xx/5xx. Fail explicitly for
            # any other non-200 code instead of returning a half-built object.
            raise requests.HTTPError(
                f"Unexpected status {response.status_code} while verifying credentials",
                response=response
            )

        super().__init__(service, response.json()["id"], settings, db)
        self.streaming = self._get_streaming_url()

        if not self.streaming:
            # Exception() does not interpolate %-style arguments, so the
            # message has to be formatted before it is raised.
            raise Exception(f"Instance {service} does not support streaming!")

    def _get_streaming_url(self):
        """Return ``urls.streaming_api`` from the instance info, or None."""
        response = requests.get(f"{self.service}/api/v1/instance")
        response.raise_for_status()
        data: dict = response.json()
        return (data.get('urls') or {}).get('streaming_api')

    def __to_tokens(self, status: dict):
        """Tokenize a status, preferring its markdown source over its HTML."""
        content_type = status.get('content_type', 'text/plain')
        raw_text = status.get('text')

        tags: list[str] = [tag['name'] for tag in status.get('tags') or []]
        mentions: list[tuple[str, str]] = [
            ('@' + mention['username'], '@' + mention['acct'])
            for mention in status.get('mentions') or []
        ]

        if raw_text and content_type in MARKDOWNY:
            return cross.tokenize_markdown(raw_text, tags, mentions)

        # `or {}` also covers an explicit null for the 'akkoma' key.
        akkoma_ext: dict | None = (status.get('akkoma') or {}).get('source')
        if akkoma_ext and akkoma_ext.get('mediaType') in MARKDOWNY:
            return cross.tokenize_markdown(akkoma_ext["content"], tags, mentions)

        return tokenize_post(status)

    def _on_create_post(self, outputs: list[cross.Output], status: dict):
        """Filter a newly received status and hand it to every output."""
        # skip events from other users (a payload without an account id is
        # treated the same way instead of raising KeyError)
        if (status.get('account') or {}).get('id') != self.user_id:
            return

        if status.get('reblog') or (status.get('quote_id') or status.get('quote')) or status.get('poll'):
            # TODO polls not supported on bsky. maybe 3rd party? skip for now
            # we don't handle reblogs. possible with bridgy(?) and self
            # we don't handle quotes.
            LOGGER.info("Skipping '%s'! Reblog, quote or poll..", status['id'])
            return

        in_reply: str | None = status.get('in_reply_to_id')
        in_reply_to: str | None = status.get('in_reply_to_account_id')
        if in_reply_to and in_reply_to != self.user_id:
            # We don't support replies.
            LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
            return

        if status.get('visibility') not in self.options.allowed_visibility:
            # Skip f/o and direct posts
            LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
            return

        success = database.try_insert_post(self.db, status['id'], in_reply, self.user_id, self.service)
        if not success:
            LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id'])
            return

        tokens = self.__to_tokens(status)
        if not cross.test_filters(tokens, self.options.filters):
            LOGGER.info("Skipping '%s'. Matched a filter!", status['id'])
            return

        LOGGER.info("Crossposting '%s'...", status['id'])

        media_attachments: list[media_util.MediaInfo] = []
        for attachment in status.get('media_attachments') or []:
            LOGGER.info("Downloading %s...", attachment['url'])
            info = media_util.download_media(attachment['url'], attachment.get('description') or '')
            if not info:
                LOGGER.error("Skipping '%s'. Failed to download media!", status['id'])
                return
            media_attachments.append(info)

        cross_post = MastodonPost(status, tokens, media_attachments)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], identifier: str):
        """Forward a deletion to every output, then drop the post from the db."""
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        database.delete_post(self.db, identifier, self.user_id, self.service)

    def _on_post(self, outputs: list[cross.Output], event: str, payload: str):
        """Dispatch a streaming event; anything but update/delete is ignored."""
        if event == 'update':
            self._on_create_post(outputs, json.loads(payload))
        elif event == 'delete':
            self._on_delete_post(outputs, payload)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Consume the user stream, reconnecting after abnormal closes.

        Args:
            outputs: Outputs that receive created and deleted posts.
            submit: Called with a zero-argument callable for each message;
                that callable performs the actual handling.
        """
        uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"

        async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.3"}):
            try:
                LOGGER.info("Listening to %s...", self.streaming)

                async for msg in ws:
                    data = json.loads(msg)
                    event = str(data.get('event'))
                    payload = str(data.get('payload'))

                    # Bind the current values as defaults: a plain closure
                    # would read `event`/`payload` when the callback runs,
                    # by which time a later message may have rebound them.
                    submit(lambda event=event, payload=payload: self._on_post(outputs, event, payload))
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.streaming)
                continue
281
# Visibilities the output accepts for the statuses it creates.
ALLOWED_POSTING_VISIBILITY = ['public', 'unlisted', 'private']


class MastodonOutputOptions():
    """Validated options for the Mastodon output.

    ``visibility`` defaults to ``'public'`` and must be one of
    ``ALLOWED_POSTING_VISIBILITY`` when given.
    """

    def __init__(self, o: dict) -> None:
        """Read ``visibility`` from *o*; raise ValueError if it is not allowed."""
        self.visibility = 'public'

        visibility = o.get('visibility')
        if visibility is None:
            # Nothing configured: keep the default.
            return

        if visibility in ALLOWED_POSTING_VISIBILITY:
            self.visibility = visibility
            return

        raise ValueError(f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {visibility}")
293
class MastodonOutput(cross.Output):
    """Output that publishes crossposts to a Mastodon-API compatible instance.

    Construction verifies the access token and reads the instance limits
    (status length, attachment count, upload sizes, MIME types, text format)
    from ``/api/v1/instance``.
    """

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """Verify the credentials and load the instance configuration.

        Args:
            input: The input whose posts this output mirrors.
            settings: Output settings; ``token`` and ``instance`` are
                required, ``options`` is optional.
            db: Database worker passed on to the base class.

        Raises:
            ValueError: If ``token`` or ``instance`` is missing, or the
                configured ``visibility`` is not allowed.
            requests.HTTPError: If either startup request does not return 200.
        """
        super().__init__(input, settings, db)
        self.options = settings.get('options') or {}
        # Validate the configured visibility once, at startup, instead of
        # sending an unchecked value with every status.
        self.visibility: str = MastodonOutputOptions(self.options).visibility

        token = util.as_envvar(settings.get('token'))
        if not token:
            raise ValueError("'token' is required")
        self.token = token

        instance: str = util.as_envvar(settings.get('instance'))
        if not instance:
            raise ValueError("'instance' is required")

        self.service = instance[:-1] if instance.endswith('/') else instance

        LOGGER.info("Verifying %s credentails...", self.service)
        response = requests.get(f"{self.service}/api/v1/accounts/verify_credentials", headers={
            'Authorization': f'Bearer {self.token}'
        })
        if response.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            response.raise_for_status()
            # raise_for_status() only raises for 4xx/5xx. Fail explicitly for
            # any other non-200 code instead of returning a half-built object.
            raise requests.HTTPError(
                f"Unexpected status {response.status_code} while verifying credentials",
                response=response
            )
        self.user_id: str = response.json()["id"]

        LOGGER.info("Getting %s configuration...", self.service)
        response = requests.get(f"{self.service}/api/v1/instance", headers={
            'Authorization': f'Bearer {self.token}'
        })
        if response.status_code != 200:
            LOGGER.error("Failed to get instance info!")
            response.raise_for_status()
            raise requests.HTTPError(
                f"Unexpected status {response.status_code} while fetching instance info",
                response=response
            )

        instance_info: dict = response.json()
        # Not every server reports a 'configuration' object; fall back to the
        # defaults below instead of raising KeyError.
        configuration: dict = instance_info.get('configuration') or {}

        statuses_config: dict = configuration.get('statuses') or {}
        self.max_characters: int = statuses_config.get('max_characters', 500)
        self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4)
        self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23)

        media_config: dict = configuration.get('media_attachments') or {}
        self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
        self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
        self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES)

        # *oma: max post chars
        max_toot_chars = instance_info.get('max_toot_chars')
        if max_toot_chars:
            self.max_characters = max_toot_chars

        # *oma: max upload limit
        upload_limit = instance_info.get('upload_limit')
        if upload_limit:
            self.image_size_limit = upload_limit
            self.video_size_limit = upload_limit

        # *oma ext: supported text types
        self.text_format = 'text/plain'
        pleroma = instance_info.get('pleroma')
        if pleroma:
            post_formats: list[str] = (pleroma.get('metadata') or {}).get('post_formats') or []
            if 'text/x.misskeymarkdown' in post_formats:
                self.text_format = 'text/x.misskeymarkdown'
            elif 'text/markdown' in post_formats:
                self.text_format = 'text/markdown'

    def upload_media(self, attachments: list[media_util.MediaInfo]) -> list[str] | None:
        """Upload attachments and wait until the server has processed them.

        Args:
            attachments: Media to upload.

        Returns:
            The media ids in upload order, or None when any attachment
            exceeds the size limit for its type (nothing is uploaded then).

        Raises:
            requests.HTTPError: If an upload or a processing poll fails.
        """
        # Check every size first so nothing is uploaded if one is too big.
        for a in attachments:
            if a.mime.startswith('image/'):
                limit = self.image_size_limit
            elif a.mime.startswith('video/'):
                limit = self.video_size_limit
            else:
                limit = 7_000_000
            if len(a.io) > limit:
                return None

        uploads: list[dict] = []
        for a in attachments:
            data = {}
            if a.alt:
                data['description'] = a.alt

            req = requests.post(f"{self.service}/api/v2/media", headers={
                'Authorization': f'Bearer {self.token}'
            }, files={'file': (a.name, a.io, a.mime)}, data=data)

            if req.status_code == 200:
                LOGGER.info("Uploaded %s! (%s)", a.name, req.json()['id'])
                uploads.append({
                    'done': True,
                    'id': req.json()['id']
                })
            elif req.status_code == 202:
                # Accepted, but the server is still processing the file.
                LOGGER.info("Waiting for %s to process!", a.name)
                uploads.append({
                    'done': False,
                    'id': req.json()['id']
                })
            else:
                LOGGER.error("Failed to upload %s! %s", a.name, req.text)
                req.raise_for_status()
                # Never continue with a silently missing attachment.
                raise requests.HTTPError(
                    f"Unexpected status {req.status_code} while uploading {a.name}",
                    response=req
                )

        while any(not upload['done'] for upload in uploads):
            LOGGER.info("Waiting for media to process...")
            time.sleep(3)
            for upload in uploads:
                if upload['done']:
                    continue

                # Double quotes outside: reusing the same quote inside an
                # f-string is a syntax error before Python 3.12.
                reqs = requests.get(f"{self.service}/api/v1/media/{upload['id']}", headers={
                    'Authorization': f'Bearer {self.token}'
                })

                if reqs.status_code == 206:
                    # Still processing; poll again on the next round.
                    continue

                if reqs.status_code == 200:
                    upload['done'] = True
                    continue
                reqs.raise_for_status()

        return [upload['id'] for upload in uploads]

    def token_to_string(self, tokens: list[cross.Token]) -> str | None:
        """Render tokens as status text in the instance's text format.

        Returns:
            The rendered text, or None if a token is not a text, tag or
            link token.
        """
        parts: list[str] = []

        for token in tokens:
            if isinstance(token, cross.TextToken):
                parts.append(token.text)
            elif isinstance(token, cross.TagToken):
                parts.append('#' + token.tag)
            elif isinstance(token, cross.LinkToken):
                if util.canonical_label(token.label, token.href):
                    parts.append(token.href)
                elif self.text_format == 'text/plain':
                    parts.append(f'{token.label}: {token.href}')
                elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}:
                    parts.append(f'[{token.label}]({token.href})')
            else:
                return None

        return ''.join(parts)

    def split_tokens_media(self, tokens: list[cross.Token], media: list[media_util.MediaInfo]):
        """Split a post into (text, attachments) pairs that fit the limits.

        Text is split with ``cross.split_tokens``. Attachments fill the posts
        in order, up to ``max_media_attachments`` each, and extra text-less
        posts are appended when the text posts run out.

        Returns:
            A list of ``(text, attachments)`` tuples, or None when a token
            block cannot be rendered.
        """
        split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)
        post_text: list[str] = []

        for block in split_tokens:
            baked_text = self.token_to_string(block)
            if baked_text is None:
                return None
            post_text.append(baked_text)

        if not post_text:
            # A media-only post still needs one (empty) status to attach to.
            post_text = ['']

        posts: list[dict] = [{"text": text, "attachments": []} for text in post_text]
        available_indices: list[int] = list(range(len(posts)))

        current_image_post_idx: int | None = None

        def pop_next_empty_index() -> int:
            """Return the next post without media, appending a blank one if needed."""
            if available_indices:
                return available_indices.pop(0)
            posts.append({"text": '', "attachments": []})
            return len(posts) - 1

        for att in media:
            if (
                current_image_post_idx is None
                or len(posts[current_image_post_idx]["attachments"]) >= self.max_media_attachments
            ):
                # First attachment, or the current post is full: move on.
                current_image_post_idx = pop_next_empty_index()
            posts[current_image_post_idx]["attachments"].append(att)

        result: list[tuple[str, list[media_util.MediaInfo]]] = [
            (p['text'], p["attachments"]) for p in posts
        ]
        return result

    def accept_post(self, post: cross.Post):
        """Publish *post* as one or more chained statuses and record them.

        Every status after the first replies to the previously created one.
        Created statuses are stored in the database and mapped to the
        original post.

        Raises:
            requests.HTTPError: If media upload or status creation fails.
        """
        parent_id = post.get_parent_id()

        new_root_id: int | None = None
        new_parent_id: int | None = None

        reply_ref: str | None = None
        if parent_id:
            thread_tuple = database.find_mapped_thread(
                self.db,
                parent_id,
                self.input.user_id,
                self.input.service,
                self.user_id,
                self.service
            )

            if not thread_tuple:
                LOGGER.error("Failed to find thread tuple in the database!")
                return None

            _, reply_ref, new_root_id, new_parent_id = thread_tuple

        languages = post.get_languages()
        lang: str = languages[0] if languages else 'en'

        raw_statuses = self.split_tokens_media(post.get_tokens(), post.get_attachments())
        if not raw_statuses:
            LOGGER.error("Failed to split post into statuses?")
            return None

        # Upload all media before creating any status.
        baked_statuses: list[tuple[str, list[str]]] = []
        for status, raw_media in raw_statuses:
            media_ids: list[str] = []
            if raw_media:
                uploaded = self.upload_media(raw_media)
                if not uploaded:
                    LOGGER.error("Failed to upload attachments!")
                    return None
                media_ids = uploaded
            baked_statuses.append((status, media_ids))

        created_statuses: list[str] = []

        for status, media in baked_statuses:
            payload = {
                'status': status,
                'media_ids': media or [],
                'spoiler_text': post.get_cw(),
                'visibility': self.visibility,
                'content_type': self.text_format,
                'language': lang
            }

            if media:
                payload['sensitive'] = post.is_sensitive()

            if post.get_cw():
                payload['sensitive'] = True

            if not status:
                # Placeholder text for statuses that only carry media.
                payload['status'] = '🖼️'

            if reply_ref:
                payload['in_reply_to_id'] = reply_ref

            reqs = requests.post(f'{self.service}/api/v1/statuses', headers={
                'Authorization': f'Bearer {self.token}',
                'Content-Type': 'application/json'
            }, json=payload)

            if reqs.status_code != 200:
                LOGGER.error("Failed to post status! %s - %s", reqs.status_code, reqs.text)
                reqs.raise_for_status()

            # The next status in the chain replies to the one just created.
            reply_ref = reqs.json()['id']
            LOGGER.info("Created new status %s!", reply_ref)

            created_statuses.append(reply_ref)

        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        assert db_post, f"original post '{post.get_id()}' is missing from the database"

        if new_root_id is None or new_parent_id is None:
            # No mapped parent thread: the first created status is the root.
            new_root_id = database.insert_post(
                self.db,
                created_statuses[0],
                self.user_id,
                self.service
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            created_statuses = created_statuses[1:]

        for db_id in created_statuses:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                self.user_id,
                self.service,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every status that was created for the original post.

        Args:
            identifier: Id of the original post on the input service.
        """
        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id)
        # Walk the mappings in reverse order.
        for mapping in mappings[::-1]:
            LOGGER.info("Deleting '%s'...", mapping[0])
            requests.delete(f'{self.service}/api/v1/statuses/{mapping[0]}', headers={
                'Authorization': f'Bearer {self.token}'
            })
            # NOTE(review): argument order changed to (user_id, service) to
            # match the other database.delete_post/find_post/insert_post
            # calls in this file -- confirm against database.delete_post.
            database.delete_post(self.db, mapping[0], self.user_id, self.service)
608