social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from util import LOGGER
2import requests, websockets
3import util, media_util, json, cross
4import database
5from database import DataBaseWorker
6from typing import Callable, Any
7import asyncio, time
8
9from bs4 import BeautifulSoup, Tag
10from bs4.element import NavigableString
11import markeddown
12from html import unescape
13import re
14
# MIME types assumed to be uploadable when an instance does not advertise its
# own `supported_mime_types` list.
POSSIBLE_MIMES = [
    # audio
    'audio/ogg', 'audio/mp3',
    # images
    'image/webp', 'image/jpeg', 'image/png',
    # video
    'video/mp4', 'video/quicktime', 'video/webm',
]
25
# Module-wide HTML -> Markdown converter. It is stateful, so tokenize_post()
# resets it after every conversion.
md_parser = markeddown.HTMLToMarkdownParser()
md_parser.preserve_spaces = True


def tokenize_post(status: dict) -> list[cross.Token]:
    """Convert the rendered HTML `content` of a status into cross-post tokens.

    Anchors become mention, tag or link tokens (matched against the status'
    `mentions` and `tags` lists); everything else is converted to markdown and
    emitted as text tokens.

    Args:
        status: status dict; reads `content`, `tags` and `mentions`.

    Returns:
        The token list, or an empty list when the status has no content.
    """
    content = status.get('content')
    if not content:
        return []

    tokens: list[cross.Token] = []
    known_tags: list[dict] = status.get('tags', [])
    known_mentions: list[dict] = status.get('mentions', [])

    def to_markdown(html):
        # Feed, read and reset so the shared parser never carries state over.
        md_parser.feed(unescape(html))
        rendered = md_parser.get_markdown()
        md_parser.reset()
        return rendered

    def walk(node) -> None:
        # Bare text is passed through untouched.
        if isinstance(node, NavigableString):
            tokens.append(cross.TextToken(str(node)))
            return

        if not isinstance(node, Tag):
            return

        if node.name.lower() != "a":
            if node.find("a") is None:
                # No links inside: convert the whole element in one go.
                rendered = to_markdown(str(node))
                if rendered:
                    tokens.append(cross.TextToken(rendered))
            else:
                # Links somewhere below: descend so each one gets its own token.
                for inner in node.contents:
                    walk(inner)
            return

        href = node.get("href", "")
        label = to_markdown("".join(str(part) for part in node.contents))

        if label.startswith('@'):
            handle = label[1:]
            for mention in known_mentions:
                if (
                    href == mention.get('url')
                    or handle == mention.get('acct')
                    or handle == mention.get('username')
                ):
                    tokens.append(cross.MentionToken(mention['acct'], mention['url']))
                    return

        if label.startswith('#'):
            lowered = label[1:].lower()
            for tag in known_tags:
                if lowered == tag.get('name'):
                    # The token keeps the casing used in the post text.
                    tokens.append(cross.TagToken(label[1:]))
                    return

        # idk if we can safely convert this to string
        tokens.append(cross.LinkToken(str(href), label))

    soup = BeautifulSoup(content, "html.parser")
    for top_level in soup.contents:
        walk(top_level)

    if not tokens:
        return []

    # Drop the paragraph break the markdown conversion leaves at the very end.
    tail = tokens[-1]
    if tail and isinstance(tail, cross.TextToken) and tail.text.endswith('\n\n'):
        tokens[-1] = cross.TextToken(tail.text[:-2])

    return tokens
99
# Content types whose raw post source is handed to the markdown tokenizer
# instead of being parsed from rendered HTML.
MARKDOWNY = [
    'text/x.misskeymarkdown',
    'text/markdown',
    'text/plain',
]
101
class MastodonPost(cross.Post):
    """A status received from a Mastodon-compatible server, ready for outputs.

    Wraps the raw status dict together with its already-tokenized text and
    already-downloaded media.
    """

    def __init__(self, status: dict, tokens: list[cross.Token], media_attachments: list[media_util.MediaInfo]) -> None:
        super().__init__()
        self.status = status
        self.media_attachments = media_attachments
        self.tokens = tokens

    def get_tokens(self) -> list[cross.Token]:
        """Return the tokenized post text."""
        return self.tokens

    def get_parent_id(self) -> str | None:
        """Return the id of the status this one replies to, if any."""
        return self.status.get('in_reply_to_id')

    def get_post_date_iso(self) -> str:
        """Return the status' `created_at`, or the base class value when it is missing."""
        created_at = self.status.get('created_at')
        if created_at:
            return created_at
        return super().get_post_date_iso()

    def get_cw(self) -> str:
        """Return the content warning (`spoiler_text`), or '' when there is none."""
        spoiler = self.status.get('spoiler_text')
        return spoiler if spoiler else ''

    def get_id(self) -> str:
        """Return the status id."""
        return self.status['id']

    def get_languages(self) -> list[str]:
        """Return the status language as a one-element list, or [] when unset."""
        language = self.status.get('language')
        return [language] if language else []

    def is_sensitive(self) -> bool:
        """Return the status' `sensitive` flag (False when absent)."""
        return self.status.get('sensitive', False)

    def get_attachments(self) -> list[media_util.MediaInfo]:
        """Return the downloaded media attachments."""
        return self.media_attachments
135
# Visibility levels an input may be configured to crosspost.
ALLOWED_VISIBILITY = ['public', 'unlisted']


class MastodonInputOptions():
    """Parsed `options` section of a Mastodon input.

    Attributes set:
        allowed_visibility: visibilities that get crossposted; defaults to
            ALLOWED_VISIBILITY.
        filters: compiled patterns from `regex_filters`.

    Raises:
        ValueError: if `allowed_visibility` contains an unsupported value.
    """

    def __init__(self, o: dict) -> None:
        self.allowed_visibility = ALLOWED_VISIBILITY
        self.filters = [re.compile(pattern) for pattern in o.get('regex_filters', [])]

        requested = o.get('allowed_visibility')
        if requested is None:
            return

        # Reject the whole setting as soon as one entry is not supported.
        for level in requested:
            if level not in ALLOWED_VISIBILITY:
                raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {requested}")

        self.allowed_visibility = requested
148
class MastodonInput(cross.Input):
    """Input that follows a Mastodon-compatible user stream.

    Verifies the configured token, resolves the instance's streaming endpoint
    and then forwards the account's own new and deleted statuses to the outputs.
    """

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        """Validate the settings and credentials and locate the streaming API.

        Args:
            settings: input configuration; reads `options`, `token` and
                `instance` (the latter two are resolved via util.as_envvar).
            db: database worker handed to the base class.

        Raises:
            ValueError: if `token` or `instance` is missing, or the options
                are invalid.
            requests.HTTPError: if a request to the instance fails.
            RuntimeError: if credential verification answers with an
                unexpected non-error status.
            Exception: if the instance does not expose a streaming API.
        """
        self.options = MastodonInputOptions(settings.get('options', {}))

        token = util.as_envvar(settings.get('token'))
        if not token:
            raise ValueError("'token' is required")
        self.token = token

        instance = util.as_envvar(settings.get('instance'))
        if not instance:
            raise ValueError("'instance' is required")

        # Drop a single trailing slash so URLs can be built with f"{service}/...".
        service = instance[:-1] if instance.endswith('/') else instance

        LOGGER.info("Verifying %s credentails...", service)
        responce = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={
            'Authorization': f'Bearer {self.token}'
        })
        if responce.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            responce.raise_for_status()
            # raise_for_status() only raises for 4xx/5xx. Returning here would
            # hand back an object whose base class was never initialised.
            raise RuntimeError(
                f"Unexpected status {responce.status_code} while verifying credentials on {service}"
            )

        super().__init__(service, responce.json()["id"], settings, db)
        self.streaming = self._get_streaming_url()

        if not self.streaming:
            # Exception() does not apply %-formatting, so format the message here.
            raise Exception(f"Instance {service} does not support streaming!")

    def _get_streaming_url(self):
        """Return `urls.streaming_api` from the instance info, or None if absent."""
        response = requests.get(f"{self.service}/api/v1/instance")
        response.raise_for_status()
        data: dict = response.json()
        return (data.get('urls') or {}).get('streaming_api')

    def __to_tokens(self, status: dict):
        """Tokenize a status, preferring its raw markdown source over rendered HTML."""
        content_type = status.get('content_type', 'text/plain')
        raw_text = status.get('text')

        tags: list[str] = [tag['name'] for tag in status.get('tags', [])]
        mentions: list[tuple[str, str]] = [
            ('@' + mention['username'], '@' + mention['acct'])
            for mention in status.get('mentions', [])
        ]

        if raw_text and content_type in MARKDOWNY:
            return cross.tokenize_markdown(raw_text, tags, mentions)

        # `akkoma` may be missing or null; either way there is no source to use.
        akkoma_ext: dict | None = (status.get('akkoma') or {}).get('source')
        if akkoma_ext and akkoma_ext.get('mediaType') in MARKDOWNY:
            return cross.tokenize_markdown(akkoma_ext["content"], tags, mentions)

        return tokenize_post(status)

    def _on_create_post(self, outputs: list[cross.Output], status: dict):
        """Filter a new status and, if eligible, hand it to every output."""
        # skip events from other users (a status without an account is not ours either)
        if (status.get('account') or {}).get('id') != self.user_id:
            return

        if status.get('reblog') or (status.get('quote_id') or status.get('quote')) or status.get('poll'):
            # TODO polls not supported on bsky. maybe 3rd party? skip for now
            # we don't handle reblogs. possible with bridgy(?) and self
            # we don't handle quotes.
            LOGGER.info("Skipping '%s'! Reblog, quote or poll..", status['id'])
            return

        in_reply: str | None = status.get('in_reply_to_id')
        in_reply_to: str | None = status.get('in_reply_to_account_id')
        if in_reply_to and in_reply_to != self.user_id:
            # We don't support replies.
            LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
            return

        if status.get('visibility') not in self.options.allowed_visibility:
            # Skip f/o and direct posts
            LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
            return

        success = database.try_insert_post(self.db, status['id'], in_reply, self.user_id, self.service)
        if not success:
            LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id'])
            return

        tokens = self.__to_tokens(status)
        if not cross.test_filters(tokens, self.options.filters):
            LOGGER.info("Skipping '%s'. Matched a filter!", status['id'])
            return

        LOGGER.info("Crossposting '%s'...", status['id'])

        media_attachments: list[media_util.MediaInfo] = []
        for attachment in status.get('media_attachments', []):
            LOGGER.info("Downloading %s...", attachment['url'])
            info = media_util.download_media(attachment['url'], attachment.get('description') or '')
            if not info:
                LOGGER.error("Skipping '%s'. Failed to download media!", status['id'])
                return
            media_attachments.append(info)

        cross_post = MastodonPost(status, tokens, media_attachments)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], identifier: str):
        """Propagate the deletion of a known post to the outputs and the database."""
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        database.delete_post(self.db, identifier, self.user_id, self.service)

    def _on_post(self, outputs: list[cross.Output], event: str, payload: str):
        """Dispatch one streaming event; only 'update' and 'delete' are handled."""
        if event == 'update':
            self._on_create_post(outputs, json.loads(payload))
        elif event == 'delete':
            self._on_delete_post(outputs, payload)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Stream user events and submit a handler call for each of them.

        Args:
            outputs: outputs that receive accepted posts and deletions.
            submit: callable that takes a zero-argument callback and runs it.

        Reconnects whenever the connection is closed with an error.
        """
        uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"

        async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.3"}):
            try:
                LOGGER.info("Listening to %s...", self.streaming)

                async for msg in ws:
                    data = json.loads(msg)
                    event = str(data.get('event'))
                    payload = str(data.get('payload'))

                    # Bind the current values as defaults: a plain closure would
                    # see whatever message is being processed when `submit`
                    # eventually runs the callback.
                    submit(lambda event=event, payload=payload: self._on_post(outputs, event, payload))
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.streaming)
                continue
287
# Visibility levels an output may be configured to post with.
ALLOWED_POSTING_VISIBILITY = ['public', 'unlisted', 'private']


class MastodonOutputOptions():
    """Parsed `options` section of a Mastodon output.

    Attributes set:
        visibility: visibility for created statuses; 'public' unless
            configured otherwise.

    Raises:
        ValueError: if `visibility` is not in ALLOWED_POSTING_VISIBILITY.
    """

    def __init__(self, o: dict) -> None:
        requested = o.get('visibility')

        if requested is not None and requested not in ALLOWED_POSTING_VISIBILITY:
            raise ValueError(f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {requested}")

        self.visibility = 'public' if requested is None else requested
299
class MastodonOutput(cross.Output):
    """Output that publishes crossposted content to a Mastodon-compatible server."""

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """Validate credentials and read the instance's posting limits.

        Args:
            input: the input this output is attached to.
            settings: output configuration; reads `options`, `token` and
                `instance` (the latter two are resolved via util.as_envvar).
            db: database worker handed to the base class.

        Raises:
            ValueError: if `token` or `instance` is missing.
            requests.HTTPError: if a request to the instance fails.
            RuntimeError: if the instance answers with an unexpected
                non-error status.
        """
        super().__init__(input, settings, db)
        # NOTE(review): MastodonOutputOptions is not used here, so a configured
        # `visibility` is passed to the server without validation - confirm intent.
        self.options = settings.get('options') or {}

        token = util.as_envvar(settings.get('token'))
        if not token:
            raise ValueError("'token' is required")
        self.token = token

        instance = util.as_envvar(settings.get('instance'))
        if not instance:
            raise ValueError("'instance' is required")

        # Drop a single trailing slash so URLs can be built with f"{service}/...".
        self.service = instance[:-1] if instance.endswith('/') else instance

        LOGGER.info("Verifying %s credentails...", self.service)
        responce = requests.get(
            f"{self.service}/api/v1/accounts/verify_credentials",
            headers=self._auth_headers(),
        )
        if responce.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            responce.raise_for_status()
            # raise_for_status() only raises for 4xx/5xx; never continue with a
            # half-initialised object.
            raise RuntimeError(
                f"Unexpected status {responce.status_code} while verifying credentials on {self.service}"
            )
        self.user_id: str = responce.json()["id"]

        LOGGER.info("Getting %s configuration...", self.service)
        responce = requests.get(f"{self.service}/api/v1/instance", headers=self._auth_headers())
        if responce.status_code != 200:
            LOGGER.error("Failed to get instance info!")
            responce.raise_for_status()
            raise RuntimeError(
                f"Unexpected status {responce.status_code} while reading instance info of {self.service}"
            )

        instance_info: dict = responce.json()
        # Every limit below has a default, so a missing `configuration` object
        # must not be fatal.
        configuration: dict = instance_info.get('configuration') or {}

        statuses_config: dict = configuration.get('statuses') or {}
        self.max_characters: int = statuses_config.get('max_characters', 500)
        self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4)
        self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23)

        media_config: dict = configuration.get('media_attachments') or {}
        self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
        self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
        self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES)

        # *oma: max post chars
        max_toot_chars = instance_info.get('max_toot_chars')
        if max_toot_chars:
            self.max_characters = max_toot_chars

        # *oma: max upload limit
        upload_limit = instance_info.get('upload_limit')
        if upload_limit:
            self.image_size_limit = upload_limit
            self.video_size_limit = upload_limit

        # *oma ext: supported text types
        self.text_format = 'text/plain'
        pleroma = instance_info.get('pleroma')
        if pleroma:
            post_formats: list[str] = pleroma.get('metadata', {}).get('post_formats', [])
            if 'text/x.misskeymarkdown' in post_formats:
                self.text_format = 'text/x.misskeymarkdown'
            elif 'text/markdown' in post_formats:
                self.text_format = 'text/markdown'

    def _auth_headers(self) -> dict:
        """Return the bearer-token header sent with every API request."""
        return {'Authorization': f'Bearer {self.token}'}

    def upload_media(self, attachments: list[media_util.MediaInfo]) -> list[str] | None:
        """Upload attachments and wait until the server finished processing them.

        Args:
            attachments: media to upload; reads `mime`, `io`, `name` and `alt`.

        Returns:
            The media ids in upload order, or None when an attachment exceeds
            the size limit for its kind or an upload was not accepted.

        Raises:
            requests.HTTPError: if the server answers an upload or a status
                poll with an error status.
        """
        # Check every size first so nothing is uploaded for a post that will
        # be rejected anyway.
        for a in attachments:
            if a.mime.startswith('image/'):
                limit = self.image_size_limit
            elif a.mime.startswith('video/'):
                limit = self.video_size_limit
            else:
                limit = 7_000_000
            if len(a.io) > limit:
                return None

        uploads: list[dict] = []
        for a in attachments:
            data = {}
            if a.alt:
                data['description'] = a.alt

            req = requests.post(
                f"{self.service}/api/v2/media",
                headers=self._auth_headers(),
                files={'file': (a.name, a.io, a.mime)},
                data=data,
            )

            if req.status_code == 200:
                media_id = req.json()['id']
                LOGGER.info("Uploaded %s! (%s)", a.name, media_id)
                uploads.append({'done': True, 'id': media_id})
            elif req.status_code == 202:
                # Accepted, but the server is still processing the file.
                LOGGER.info("Waiting for %s to process!", a.name)
                uploads.append({'done': False, 'id': req.json()['id']})
            else:
                LOGGER.error("Failed to upload %s! %s", a.name, req.text)
                req.raise_for_status()
                # Unexpected non-error status: report failure instead of
                # silently returning fewer ids than attachments.
                return None

        while any(not item['done'] for item in uploads):
            LOGGER.info("Waiting for media to process...")
            time.sleep(3)
            for item in uploads:
                if item['done']:
                    continue

                # Double-quoted f-string: reusing the outer quote inside the
                # replacement field is a SyntaxError before Python 3.12.
                reqs = requests.get(
                    f"{self.service}/api/v1/media/{item['id']}",
                    headers=self._auth_headers(),
                )

                if reqs.status_code == 206:
                    # Still processing.
                    continue

                if reqs.status_code == 200:
                    item['done'] = True
                    continue
                reqs.raise_for_status()

        return [item['id'] for item in uploads]

    def token_to_string(self, tokens: list[cross.Token]) -> str | None:
        """Render tokens to post text in the instance's text format.

        Returns:
            The rendered text, or None when the list contains a token type
            other than text, tag or link.
        """
        parts: list[str] = []

        for token in tokens:
            if isinstance(token, cross.TextToken):
                parts.append(token.text)
            elif isinstance(token, cross.TagToken):
                parts.append('#' + token.tag)
            elif isinstance(token, cross.LinkToken):
                if util.canonical_label(token.label, token.href):
                    parts.append(token.href)
                elif self.text_format == 'text/plain':
                    parts.append(f'{token.label}: {token.href}')
                elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}:
                    parts.append(f'[{token.label}]({token.href})')
            else:
                # Unsupported token type: the caller treats None as failure.
                return None

        return ''.join(parts)

    def split_tokens_media(
        self,
        tokens: list[cross.Token],
        media: list[media_util.MediaInfo],
    ) -> list[tuple[str, list[media_util.MediaInfo]]] | None:
        """Split a post into (text, attachments) statuses that fit the limits.

        Text is split with cross.split_tokens. Attachments fill the statuses
        in order, at most `max_media_attachments` each, and extra text-less
        statuses are appended when the text statuses run out.

        Returns:
            One (text, attachments) tuple per status (always at least one),
            or None when a token block cannot be rendered.
        """
        split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)

        texts: list[str] = []
        for block in split_tokens:
            baked_text = self.token_to_string(block)
            if baked_text is None:
                return None
            texts.append(baked_text)

        if not texts:
            texts = ['']

        posts: list[dict] = [{"text": text, "attachments": []} for text in texts]

        next_free_idx = 0  # first status that has not been given attachments yet
        current_idx: int | None = None

        for att in media:
            if (
                current_idx is None
                or len(posts[current_idx]["attachments"]) >= self.max_media_attachments
            ):
                # Current status is full (or there is none yet): move on,
                # creating a blank status once the text statuses are used up.
                if next_free_idx == len(posts):
                    posts.append({"text": '', "attachments": []})
                current_idx = next_free_idx
                next_free_idx += 1
            posts[current_idx]["attachments"].append(att)

        return [(p['text'], p['attachments']) for p in posts]

    def accept_post(self, post: cross.Post):
        """Publish a post as one or more statuses and record the mapping.

        A post that does not fit into a single status becomes a self-reply
        chain. Returns None in every case; failures that are logged and
        skipped are an unknown parent thread, unrenderable tokens and rejected
        attachments.

        Raises:
            RuntimeError: if the source post is not present in the database.
            requests.HTTPError: if the server rejects an upload or a status.
        """
        parent_id = post.get_parent_id()

        new_root_id: int | None = None
        new_parent_id: int | None = None

        reply_ref: str | None = None
        if parent_id:
            thread_tuple = database.find_mapped_thread(
                self.db,
                parent_id,
                self.input.user_id,
                self.input.service,
                self.user_id,
                self.service
            )

            if not thread_tuple:
                LOGGER.error("Failed to find thread tuple in the database!")
                return None

            _, reply_ref, new_root_id, new_parent_id = thread_tuple

        languages = post.get_languages()
        lang: str = languages[0] if languages else 'en'

        raw_statuses = self.split_tokens_media(post.get_tokens(), post.get_attachments())
        if not raw_statuses:
            LOGGER.error("Failed to split post into statuses?")
            return None

        # Resolve the source post before anything is uploaded or published, so
        # a missing row cannot leave untracked statuses on the server. An
        # explicit raise is used because `assert` is stripped under -O.
        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        if not db_post:
            raise RuntimeError(f"Post '{post.get_id()}' was not found in the database!")

        baked_statuses = []
        for status, raw_media in raw_statuses:
            if not raw_media:
                baked_statuses.append((status, []))
                continue

            media = self.upload_media(raw_media)
            if not media:
                LOGGER.error("Failed to upload attachments!")
                return None
            baked_statuses.append((status, media))

        created_statuses: list[str] = []
        cw = post.get_cw()

        for status, media in baked_statuses:
            payload = {
                # Attachment-only statuses get a placeholder as text.
                'status': status or '🖼️',
                'media_ids': media or [],
                'spoiler_text': cw,
                'visibility': self.options.get('visibility', 'public'),
                'content_type': self.text_format,
                'language': lang
            }

            if media:
                payload['sensitive'] = post.is_sensitive()

            if cw:
                payload['sensitive'] = True

            if reply_ref:
                payload['in_reply_to_id'] = reply_ref

            reqs = requests.post(f'{self.service}/api/v1/statuses', headers={
                'Authorization': f'Bearer {self.token}',
                'Content-Type': 'application/json'
            }, json=payload)

            if reqs.status_code != 200:
                LOGGER.error("Failed to post status! %s - %s", reqs.status_code, reqs.text)
                reqs.raise_for_status()

            # Each status replies to the previous one so the parts form a thread.
            reply_ref = reqs.json()['id']
            LOGGER.info("Created new status %s!", reply_ref)

            created_statuses.append(reply_ref)

        if new_root_id is None or new_parent_id is None:
            # No mapped parent: the first created status becomes the thread root.
            new_root_id = database.insert_post(
                self.db,
                created_statuses[0],
                self.user_id,
                self.service
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            created_statuses = created_statuses[1:]

        for db_id in created_statuses:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                self.user_id,
                self.service,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every status that was created for the given source post.

        Args:
            identifier: id of the post on the input service.
        """
        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id)
        # Newest first, so replies are removed before the status they reply to.
        for mapping in mappings[::-1]:
            LOGGER.info("Deleting '%s'...", mapping[0])
            responce = requests.delete(
                f'{self.service}/api/v1/statuses/{mapping[0]}',
                headers=self._auth_headers(),
            )
            if not responce.ok:
                # Best effort: report it, but still forget the mapping locally.
                LOGGER.error("Failed to delete '%s'! %s - %s", mapping[0], responce.status_code, responce.text)
            # Argument order (user_id, then service) matches every other
            # delete_post/find_post/insert_post call in this file.
            # NOTE(review): confirm against database.delete_post's signature.
            database.delete_post(self.db, mapping[0], self.user_id, self.service)
614