Social media crossposting tool. Third time's the charm.
mastodon
misskey
crossposting
bluesky
1from util import LOGGER
2import requests, websockets
3import util, media_util, json, cross
4import database
5from database import DataBaseWorker
6from typing import Callable, Any
7import asyncio, time
8
9from bs4 import BeautifulSoup, Tag
10from bs4.element import NavigableString
11import markeddown
12from html import unescape
13import re
14
# Fallback list of attachment MIME types, used as the output's
# `supported_mime_types` when the target instance does not advertise one.
POSSIBLE_MIMES = [
    # audio
    'audio/ogg', 'audio/mp3',
    # images
    'image/webp', 'image/jpeg', 'image/png',
    # video
    'video/mp4', 'video/quicktime', 'video/webm',
]
25
# Module-wide HTML -> Markdown converter used by tokenize_post(); its mdd()
# helper feeds one fragment at a time and reset()s the parser afterwards.
# NOTE(review): preserve_spaces presumably keeps source whitespace in the
# generated Markdown — confirm against markeddown's documentation.
md_parser = markeddown.HTMLToMarkdownParser()
md_parser.preserve_spaces = True
28
def tokenize_post(status: dict) -> list[cross.Token]:
    """Split a Mastodon status' rendered HTML ``content`` into crosspost tokens.

    Anchors become:
      * ``cross.MentionToken`` when the link URL, or the text after ``@``,
        matches an entry of ``status['mentions']`` (by ``url``, ``acct`` or
        ``username``);
      * ``cross.TagToken`` when the text after ``#`` matches (case-insensitively)
        the ``name`` of an entry of ``status['tags']``;
      * ``cross.LinkToken`` otherwise.
    Elements containing anchors are descended into; all other elements are
    converted to Markdown and emitted as ``cross.TextToken``.

    Args:
        status: Status dict from the Mastodon API. ``content`` (HTML),
            ``tags`` and ``mentions`` are read; missing or null values are
            treated as empty.

    Returns:
        The token list, with a trailing ``'\\n\\n'`` removed from the final
        text token. Empty content yields an empty list.
    """
    soup = BeautifulSoup(status.get('content') or '', "html.parser")
    tokens: list[cross.Token] = []

    tags: list[dict] = status.get('tags') or []
    mentions: list[dict] = status.get('mentions') or []
    # Lowercase both sides of the comparison, in case a server keeps the
    # author's casing in `tags[].name`. Empty names are ignored.
    tag_names = {str(block['name']).lower() for block in tags if block.get('name')}

    def mdd(html: str) -> str:
        """Convert an HTML fragment to Markdown with the shared parser."""
        try:
            md_parser.feed(unescape(html))
            return md_parser.get_markdown()
        finally:
            # Always reset so a failed conversion cannot leak into the next one.
            md_parser.reset()

    def find_mention(href, link_text_md: str) -> dict | None:
        """Return the first mention entry matching this anchor, or None."""
        handle = link_text_md[1:]
        for block in mentions:
            if href == block.get('url') or handle in (block.get('acct'), block.get('username')):
                return block
        return None

    def recurse(node) -> None:
        if isinstance(node, NavigableString):
            tokens.append(cross.TextToken(str(node)))
            return

        if not isinstance(node, Tag):
            return

        if node.name.lower() == "a":
            href = node.get("href", "")
            link_text_md = mdd("".join(str(c) for c in node.contents))

            if link_text_md.startswith('@'):
                block = find_mention(href, link_text_md)
                if block is not None:
                    tokens.append(cross.MentionToken(block['acct'], block['url']))
                    return

            if link_text_md.startswith('#') and link_text_md[1:].lower() in tag_names:
                tokens.append(cross.TagToken(link_text_md[1:]))
                return

            # NOTE(review): href is assumed to be a single string attribute.
            tokens.append(cross.LinkToken(str(href), link_text_md))
            return

        if node.find("a") is not None:
            # Descend so every nested anchor becomes its own token.
            for child in node.contents:
                recurse(child)
            return

        markdownified = mdd(str(node))
        if markdownified:
            tokens.append(cross.TextToken(markdownified))

    for child in soup.contents:
        recurse(child)

    # Drop a trailing '\n\n' (e.g. from converting a final paragraph); guard
    # against an empty token list, which previously raised IndexError.
    if tokens:
        last_token = tokens[-1]
        if isinstance(last_token, cross.TextToken) and last_token.text.endswith('\n\n'):
            tokens[-1] = cross.TextToken(last_token.text[:-2])

    return tokens
93
# Content types whose raw source can go straight to the markdown tokenizer
# instead of re-parsing rendered HTML. When scanning a per-type content map,
# the first type present in this order wins.
MARKDOWNY = [
    'text/x.misskeymarkdown',
    'text/markdown',
    'text/plain',
]
95
class MastodonPost(cross.Post):
    """Adapter exposing a Mastodon status dict through the ``cross.Post`` API.

    Tokens and downloaded media are prepared by the caller and stored as-is;
    every other accessor reads the corresponding field of the raw status.
    """

    def __init__(self, status: dict, tokens: list[cross.Token], media_attachments: list[media_util.MediaInfo]) -> None:
        super().__init__()
        self.status = status
        self.tokens = tokens
        self.media_attachments = media_attachments

    def get_tokens(self) -> list[cross.Token]:
        """Return the pre-computed content tokens."""
        return self.tokens

    def get_parent_id(self) -> str | None:
        """Return the id of the status this one replies to, if any."""
        return self.status.get('in_reply_to_id')

    def get_post_date_iso(self) -> str:
        """Return ``created_at``, or the base class' value when it is missing."""
        created_at = self.status.get('created_at')
        if created_at:
            return created_at
        return super().get_post_date_iso()

    def get_cw(self) -> str:
        """Return the content-warning text, or '' when there is none."""
        spoiler = self.status.get('spoiler_text')
        return spoiler if spoiler else ''

    def get_id(self) -> str:
        """Return the status id."""
        return self.status['id']

    def get_languages(self) -> list[str]:
        """Return ``[language]`` when the status declares one, else ``[]``."""
        language = self.status.get('language')
        return [language] if language else []

    def is_sensitive(self) -> bool:
        """Return the status' ``sensitive`` flag (False when absent)."""
        return self.status.get('sensitive', False)

    def get_attachments(self) -> list[media_util.MediaInfo]:
        """Return the downloaded media attachments."""
        return self.media_attachments
129
# Visibilities the Mastodon input may be configured to crosspost.
ALLOWED_VISIBILITY = ['public', 'unlisted']


class MastodonInputOptions():
    """Validated options for the Mastodon input.

    Recognised keys of ``o``:
        regex_filters: list of regular expressions, compiled into ``filters``.
        allowed_visibility: a visibility name or a list of names, each of
            which must be in ``ALLOWED_VISIBILITY``. Defaults to all of them.

    Raises:
        ValueError: if ``allowed_visibility`` contains an unsupported value.
        re.error: if a filter is not a valid regular expression.
    """

    def __init__(self, o: dict) -> None:
        # Copy, so mutating one instance's list never alters the module default.
        self.allowed_visibility: list[str] = list(ALLOWED_VISIBILITY)
        self.filters: list[re.Pattern] = [re.compile(f) for f in o.get('regex_filters') or []]

        allowed_visibility = o.get('allowed_visibility')
        if allowed_visibility is not None:
            # A single string would otherwise be iterated character by character.
            values = [allowed_visibility] if isinstance(allowed_visibility, str) else list(allowed_visibility)
            if any(v not in ALLOWED_VISIBILITY for v in values):
                raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}")
            self.allowed_visibility = values
142
class MastodonInput(cross.Input):
    """Crossposting input that streams the authenticated account's own
    statuses from a Mastodon-API server and forwards eligible ones to the
    outputs."""

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        """Validate settings, verify the token and discover the streaming URL.

        Raises:
            ValueError: 'token' or 'instance' is missing, or options are invalid.
            requests.HTTPError: credential verification or instance lookup failed.
            RuntimeError: credential verification returned an unexpected
                non-error status.
            Exception: the instance advertises no streaming API.
        """
        self.options = MastodonInputOptions(settings.get('options') or {})

        token = util.as_envvar(settings.get('token'))
        if not token:
            raise ValueError("'token' is required")
        self.token = token

        instance = util.as_envvar(settings.get('instance'))
        if not instance:
            raise ValueError("'instance' is required")

        service = instance.rstrip('/')

        LOGGER.info("Verifying %s credentails...", service)
        response = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={
            'Authorization': f'Bearer {self.token}'
        })
        if response.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            response.raise_for_status()
            # raise_for_status() ignores non-4xx/5xx codes; never continue with
            # a half-initialised input (super().__init__ has not run yet).
            raise RuntimeError(
                f"Unexpected HTTP {response.status_code} while verifying credentials on {service}")

        super().__init__(service, response.json()["id"], settings, db)
        self.streaming = self._get_streaming_url()

        if not self.streaming:
            # Format the message ourselves: Exception() does not apply %-args.
            raise Exception(f"Instance {service} does not support streaming!")

    def _get_streaming_url(self) -> str | None:
        """Return the instance's advertised ``urls.streaming_api``, if any."""
        response = requests.get(f"{self.service}/api/v1/instance")
        response.raise_for_status()
        data: dict = response.json()
        return (data.get('urls') or {}).get('streaming_api')

    def __to_tokens(self, status: dict):
        """Tokenize a status, preferring its markdown/plain source over HTML."""
        content_type = status.get('content_type', 'text/plain')
        raw_text = status.get('text')

        tags: list[str] = [tag['name'] for tag in status.get('tags') or []]
        mentions: list[tuple[str, str]] = [
            ('@' + mention['username'], '@' + mention['acct'])
            for mention in status.get('mentions') or []
        ]

        if raw_text and content_type in MARKDOWNY:
            return cross.tokenize_markdown(raw_text, tags, mentions)

        # Per-content-type source under `pleroma.content`; `or {}` also guards
        # against an explicit null `pleroma` field.
        pleroma_ext: dict | None = (status.get('pleroma') or {}).get('content')
        if pleroma_ext:
            for ctype in MARKDOWNY:
                if ctype in pleroma_ext:
                    return cross.tokenize_markdown(pleroma_ext[ctype], tags, mentions)

        return tokenize_post(status)

    def _on_create_post(self, outputs: list[cross.Output], status: dict):
        """Crosspost a newly created status if it passes every check below."""
        # skip events from other users (and events without an account)
        if (status.get('account') or {}).get('id') != self.user_id:
            return

        if status.get('reblog') or (status.get('quote_id') or status.get('quote')) or status.get('poll'):
            # TODO polls not supported on bsky. maybe 3rd party? skip for now
            # we don't handle reblogs. possible with bridgy(?) and self
            # we don't handle quotes.
            LOGGER.info("Skipping '%s'! Reblog, quote or poll..", status['id'])
            return

        in_reply: str | None = status.get('in_reply_to_id')
        in_reply_to: str | None = status.get('in_reply_to_account_id')
        if in_reply_to and in_reply_to != self.user_id:
            # We don't support replies to other accounts; self-threads are fine.
            LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
            return

        if status.get('visibility') not in self.options.allowed_visibility:
            # Skip f/o and direct posts
            LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
            return

        # NOTE(review): the post is recorded before filtering / media download,
        # so a later skip still leaves it in the database.
        success = database.try_insert_post(self.db, status['id'], in_reply, self.user_id, self.service)
        if not success:
            LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id'])
            return

        tokens = self.__to_tokens(status)
        if not cross.test_filters(tokens, self.options.filters):
            LOGGER.info("Skipping '%s'. Matched a filter!", status['id'])
            return

        LOGGER.info("Crossposting '%s'...", status['id'])

        media_attachments: list[media_util.MediaInfo] = []
        for attachment in status.get('media_attachments') or []:
            LOGGER.info("Downloading %s...", attachment['url'])
            info = media_util.download_media(attachment['url'], attachment.get('description') or '')
            if not info:
                LOGGER.error("Skipping '%s'. Failed to download media!", status['id'])
                return
            media_attachments.append(info)

        cross_post = MastodonPost(status, tokens, media_attachments)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], identifier: str):
        """Propagate a deletion of a previously recorded status to the outputs."""
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        database.delete_post(self.db, identifier, self.user_id, self.service)

    def _on_post(self, outputs: list[cross.Output], event: str, payload: str):
        """Dispatch a streaming event: 'update' creates, 'delete' deletes."""
        if event == 'update':
            self._on_create_post(outputs, json.loads(payload))
        elif event == 'delete':
            self._on_delete_post(outputs, payload)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Consume the user stream forever, handing each event to ``submit``.

        Reconnects after ``websockets.ConnectionClosedError``.
        """
        uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"

        async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.3"}):
            try:
                LOGGER.info("Listening to %s...", self.streaming)

                async for msg in ws:
                    try:
                        data = json.loads(msg)
                    except json.JSONDecodeError:
                        LOGGER.error("Ignoring malformed streaming message: %r", msg)
                        continue
                    event: str = data.get('event')
                    payload: str = data.get('payload')

                    # Bind the current values now: submit() may run the callable
                    # after later messages have rebound event/payload.
                    submit(lambda event=event, payload=payload: self._on_post(outputs, str(event), str(payload)))
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.streaming)
                continue
278
279 await asyncio.gather(listen)
280 except websockets.ConnectionClosedError as e:
281 LOGGER.error(e, stack_info=True, exc_info=True)
282 LOGGER.info("Reconnecting to %s...", self.streaming)
283 continue
284
285ALLOWED_POSTING_VISIBILITY = ['public', 'unlisted', 'private']
286
287class MastodonOutputOptions():
288 def __init__(self, o: dict) -> None:
289 self.visibility = 'public'
290
291 visibility = o.get('visibility')
292 if visibility is not None:
293 if visibility not in ALLOWED_POSTING_VISIBILITY:
294 raise ValueError(f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {visibility}")
295 self.visibility = visibility
296
297class MastodonOutput(cross.Output):
298 def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
299 super().__init__(input, settings, db)
300 self.options = settings.get('options') or {}
301 self.token = util.as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
302 instance: str = util.as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
303
304 self.service = instance[:-1] if instance.endswith('/') else instance
305
306 LOGGER.info("Verifying %s credentails...", self.service)
307 responce = requests.get(f"{self.service}/api/v1/accounts/verify_credentials", headers={
308 'Authorization': f'Bearer {self.token}'
309 })
310 if responce.status_code != 200:
311 LOGGER.error("Failed to validate user credentials!")
312 responce.raise_for_status()
313 return
314 self.user_id: str = responce.json()["id"]
315
316 LOGGER.info("Getting %s configuration...", self.service)
317 responce = requests.get(f"{self.service}/api/v1/instance", headers={
318 'Authorization': f'Bearer {self.token}'
319 })
320 if responce.status_code != 200:
321 LOGGER.error("Failed to get instance info!")
322 responce.raise_for_status()
323 return
324
325 instance_info: dict = responce.json()
326 configuration: dict = instance_info['configuration']
327
328 statuses_config: dict = configuration.get('statuses', {})
329 self.max_characters: int = statuses_config.get('max_characters', 500)
330 self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4)
331 self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23)
332
333 media_config: dict = configuration.get('media_attachments', {})
334 self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
335 self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
336 self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES)
337
338 # *oma: max post chars
339 max_toot_chars = instance_info.get('max_toot_chars')
340 if max_toot_chars:
341 self.max_characters: int = max_toot_chars
342
343 # *oma: max upload limit
344 upload_limit = instance_info.get('upload_limit')
345 if upload_limit:
346 self.image_size_limit: int = upload_limit
347 self.video_size_limit: int = upload_limit
348
349 # *oma ext: supported text types
350 self.text_format = 'text/plain'
351 pleroma = instance_info.get('pleroma')
352 if pleroma:
353 post_formats: list[str] = pleroma.get('metadata', {}).get('post_formats', [])
354 if 'text/x.misskeymarkdown' in post_formats:
355 self.text_format = 'text/x.misskeymarkdown'
356 elif 'text/markdown' in post_formats:
357 self.text_format = 'text/markdown'
358
359 def upload_media(self, attachments: list[media_util.MediaInfo]) -> list[str] | None:
360 for a in attachments:
361 if a.mime.startswith('image/') and len(a.io) > self.image_size_limit:
362 return None
363
364 if a.mime.startswith('video/') and len(a.io) > self.video_size_limit:
365 return None
366
367 if not a.mime.startswith('image/') and not a.mime.startswith('video/'):
368 if len(a.io) > 7_000_000:
369 return None
370
371 uploads: list[dict] = []
372 for a in attachments:
373 data = {}
374 if a.alt:
375 data['description'] = a.alt
376
377 req = requests.post(f"{self.service}/api/v2/media", headers= {
378 'Authorization': f'Bearer {self.token}'
379 }, files={'file': (a.name, a.io, a.mime)}, data=data)
380
381 if req.status_code == 200:
382 LOGGER.info("Uploaded %s! (%s)", a.name, req.json()['id'])
383 uploads.append({
384 'done': True,
385 'id': req.json()['id']
386 })
387 elif req.status_code == 202:
388 LOGGER.info("Waiting for %s to process!", a.name)
389 uploads.append({
390 'done': False,
391 'id': req.json()['id']
392 })
393 else:
394 LOGGER.error("Failed to upload %s! %s", a.name, req.text)
395 req.raise_for_status()
396
397 while any([not val['done'] for val in uploads]):
398 LOGGER.info("Waiting for media to process...")
399 time.sleep(3)
400 for media in uploads:
401 if media['done']:
402 continue
403
404 reqs = requests.get(f'{self.service}/api/v1/media/{media['id']}', headers={
405 'Authorization': f'Bearer {self.token}'
406 })
407
408 if reqs.status_code == 206:
409 continue
410
411 if reqs.status_code == 200:
412 media['done'] = True
413 continue
414 reqs.raise_for_status()
415
416 return [val['id'] for val in uploads]
417
418 def token_to_string(self, tokens: list[cross.Token]) -> str | None:
419 p_text: str = ''
420
421 for token in tokens:
422 if isinstance(token, cross.TextToken):
423 p_text += token.text
424 elif isinstance(token, cross.TagToken):
425 p_text += '#' + token.tag
426 elif isinstance(token, cross.LinkToken):
427 if util.canonical_label(token.label, token.href):
428 p_text += token.href
429 else:
430 if self.text_format == 'text/plain':
431 p_text += f'{token.label}: {token.href}'
432 elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}:
433 p_text += f'[{token.label}]({token.href})'
434 else:
435 return None
436
437 return p_text
438
439 def split_tokens_media(self, tokens: list[cross.Token], media: list[media_util.MediaInfo]):
440 split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)
441 post_text: list[str] = []
442
443 for block in split_tokens:
444 baked_text = self.token_to_string(block)
445
446 if baked_text is None:
447 return None
448 post_text.append(baked_text)
449
450 if not post_text:
451 post_text = ['']
452
453 posts: list[dict] = [{"text": post_text, "attachments": []} for post_text in post_text]
454 available_indices: list[int] = list(range(len(posts)))
455
456 current_image_post_idx: int | None = None
457
458 def make_blank_post() -> dict:
459 return {
460 "text": '',
461 "attachments": []
462 }
463
464 def pop_next_empty_index() -> int:
465 if available_indices:
466 return available_indices.pop(0)
467 else:
468 new_idx = len(posts)
469 posts.append(make_blank_post())
470 return new_idx
471
472 for att in media:
473 if (
474 current_image_post_idx is not None
475 and len(posts[current_image_post_idx]["attachments"]) < self.max_media_attachments
476 ):
477 posts[current_image_post_idx]["attachments"].append(att)
478 else:
479 idx = pop_next_empty_index()
480 posts[idx]["attachments"].append(att)
481 current_image_post_idx = idx
482
483 result: list[tuple[str, list[media_util.MediaInfo]]] = []
484
485 for p in posts:
486 result.append((p['text'], p["attachments"]))
487
488 return result
489
490 def accept_post(self, post: cross.Post):
491 parent_id = post.get_parent_id()
492
493 new_root_id: int | None = None
494 new_parent_id: int | None = None
495
496 reply_ref: str | None = None
497 if parent_id:
498 thread_tuple = database.find_mapped_thread(
499 self.db,
500 parent_id,
501 self.input.user_id,
502 self.input.service,
503 self.user_id,
504 self.service
505 )
506
507 if not thread_tuple:
508 LOGGER.error("Failed to find thread tuple in the database!")
509 return None
510
511 _, reply_ref, new_root_id, new_parent_id = thread_tuple
512
513 lang: str
514 if post.get_languages():
515 lang = post.get_languages()[0]
516 else:
517 lang = 'en'
518
519 raw_statuses = self.split_tokens_media(post.get_tokens(), post.get_attachments())
520 if not raw_statuses:
521 LOGGER.error("Failed to split post into statuses?")
522 return None
523 baked_statuses = []
524
525 for status, raw_media in raw_statuses:
526 media: list[str] | None = None
527 if raw_media:
528 media = self.upload_media(raw_media)
529 if not media:
530 LOGGER.error("Failed to upload attachments!")
531 return None
532 baked_statuses.append((status, media))
533 continue
534 baked_statuses.append((status,[]))
535
536 created_statuses: list[str] = []
537
538 for status, media in baked_statuses:
539 payload = {
540 'status': status,
541 'media_ids': media or [],
542 'spoiler_text': post.get_cw(),
543 'visibility': self.options.get('visibility', 'public'),
544 'content_type': self.text_format,
545 'language': lang
546 }
547
548 if media:
549 payload['sensitive'] = post.is_sensitive()
550
551 if post.get_cw():
552 payload['sensitive'] = True
553
554 if not status:
555 payload['status'] = '🖼️'
556
557 if reply_ref:
558 payload['in_reply_to_id'] = reply_ref
559
560 reqs = requests.post(f'{self.service}/api/v1/statuses', headers={
561 'Authorization': f'Bearer {self.token}',
562 'Content-Type': 'application/json'
563 }, json=payload)
564
565 if reqs.status_code != 200:
566 LOGGER.info("Failed to post status! %s - %s", reqs.status_code, reqs.text)
567 reqs.raise_for_status()
568
569 reply_ref = reqs.json()['id']
570 LOGGER.info("Created new status %s!", reply_ref)
571
572 created_statuses.append(reqs.json()['id'])
573
574 db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
575 assert db_post, "ghghghhhhh"
576
577 if new_root_id is None or new_parent_id is None:
578 new_root_id = database.insert_post(
579 self.db,
580 created_statuses[0],
581 self.user_id,
582 self.service
583 )
584 new_parent_id = new_root_id
585 database.insert_mapping(self.db, db_post['id'], new_parent_id)
586 created_statuses = created_statuses[1:]
587
588 for db_id in created_statuses:
589 new_parent_id = database.insert_reply(
590 self.db,
591 db_id,
592 self.user_id,
593 self.service,
594 new_parent_id,
595 new_root_id
596 )
597 database.insert_mapping(self.db, db_post['id'], new_parent_id)
598
599 def delete_post(self, identifier: str):
600 post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
601 if not post:
602 return
603
604 mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id)
605 for mapping in mappings[::-1]:
606 LOGGER.info("Deleting '%s'...", mapping[0])
607 requests.delete(f'{self.service}/api/v1/statuses/{mapping[0]}', headers={
608 'Authorization': f'Bearer {self.token}'
609 })
610 database.delete_post(self.db, mapping[0], self.service, self.user_id)
611