social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

feat: strip mfm from crossposted posts, leave a link to the original post. (this code sucks)
fix: html in markdown

zenfyr.dev c004136e 5918f9b4

+11
bluesky/common.py
···
     def get_attachments(self) -> list[MediaInfo]:
         return self.attachments
+    def get_text_type(self) -> str:
+        return "text/plain"
+
+    def get_post_url(self) -> str | None:
+        at_uri: str = self.post['$xpost.strongRef']['uri'][len("at://"):]
+
+        parts = at_uri.split("/")
+        did, _, post_id = parts
+
+        return f"https://bsky.app/profile/{did}/post/{post_id}"
+
 def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
     builder = client_utils.TextBuilder()
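Note: the three-way unpack in get_post_url assumes the strong-ref URI always has exactly the shape at://did/collection/rkey; any other segment count raises ValueError. A more defensive sketch (hypothetical, not what this commit does):

def get_post_url(self) -> str | None:
    uri: str = self.post['$xpost.strongRef']['uri']
    if not uri.startswith("at://"):
        return None
    parts = uri[len("at://"):].split("/")
    if len(parts) != 3:  # expected segments: did, collection, rkey
        return None
    did, _, post_id = parts
    return f"https://bsky.app/profile/{did}/post/{post_id}"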
+8 -1
bluesky/output.py
···
 from bluesky.common import SERVICE, ADULT_PATTERN, PORN_PATTERN, tokens_to_richtext
 import cross, util.database as database
+import misskey.mfm_util as mfm_util
 from util.util import LOGGER, as_envvar
 from util.media import MediaInfo, get_filename_from_url, get_media_meta, compress_image, convert_to_mp4
 from util.database import DataBaseWorker
···
                 f"[{get_filename_from_url(attachment.url)}]"
             ))
             tokens.append(cross.TextToken(' '))
-
+
+        if post.get_text_type() == "text/x.misskeymarkdown":
+            tokens, status = mfm_util.strip_mfm(tokens)
+            post_url = post.get_post_url()
+            if status and post_url:
+                tokens.append(cross.TextToken('\n'))
+                tokens.append(cross.LinkToken(post_url, "[Post contains MFM, see original]"))
         split_tokens: list[list[cross.Token]] = cross.split_tokens(tokens, 300)
         post_text: list[client_utils.TextBuilder] = []
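Note: a rough walk-through of the block above, with a hypothetical note and instance URL:

tokens = [cross.TextToken('$[tada Hello] world')]
tokens, status = mfm_util.strip_mfm(tokens)  # -> [TextToken('Hello world')], status=True
# status is truthy and get_post_url() returned a URL, so the fallback link is appended:
# [TextToken('Hello world'), TextToken('\n'),
#  LinkToken('https://example.social/notes/abc', '[Post contains MFM, see original]')]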
+10 -83
cross.py
···
 import re
 ALTERNATE = re.compile(r'\S+|\s+')
-URL = re.compile(r'(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+', re.IGNORECASE)
-MD_INLINE_LINK = re.compile(r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)", re.IGNORECASE)
-MD_AUTOLINK = re.compile(r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE)
-HASHTAG = re.compile(r'(?<!\w)\#([\w]+)')
-FEDIVERSE_HANDLE = re.compile(r'(?<![\w@])@([\w\.-]+)(?:@([\w\.-]+\.[\w\.-]+))?')
 # generic token
 class Token():
···
 class Post():
     def __init__(self) -> None:
+        self.now_timestamp = datetime.now(timezone.utc).isoformat()
         pass
     def get_tokens(self) -> list[Token]:
···
         return None
     def get_post_date_iso(self) -> str:
-        return datetime.now(timezone.utc).isoformat()
+        return self.now_timestamp
     def get_attachments(self) -> list[MediaInfo]:
         return []
···
     def is_sensitive(self) -> bool:
         return False
+    # returns input text type.
+    # text/plain, text/markdown, text/x.misskeymarkdown
+    def get_text_type(self) -> str:
+        return 'text/plain'
+
+    def get_post_url(self) -> str | None:
+        return None
+
 # generic input service.
 # user and service for db queries
 class Input():
···
         return False
     return True
-
-def tokenize_markdown(text: str, tags: list[str], handles: list[tuple[str, str]]) -> list[Token]:
-    if not text:
-        return []
-
-    index: int = 0
-    total: int = len(text)
-    buffer: list[str] = []
-
-    tokens: list[Token] = []
-
-    def flush():
-        nonlocal buffer
-        if buffer:
-            tokens.append(TextToken(''.join(buffer)))
-            buffer = []
-
-    while index < total:
-        if text[index] == '[':
-            md_inline = MD_INLINE_LINK.match(text, index)
-            if md_inline:
-                flush()
-                label = md_inline.group(1)
-                href = md_inline.group(2)
-                tokens.append(LinkToken(href, label))
-                index = md_inline.end()
-                continue
-
-        if text[index] == '<':
-            md_auto = MD_AUTOLINK.match(text, index)
-            if md_auto:
-                flush()
-                href = md_auto.group(1)
-                tokens.append(LinkToken(href, href))
-                index = md_auto.end()
-                continue
-
-        if text[index] == '#':
-            tag = HASHTAG.match(text, index)
-            if tag:
-                tag_text = tag.group(1)
-                if tag_text.lower() in tags:
-                    flush()
-                    tokens.append(TagToken(tag_text))
-                    index = tag.end()
-                    continue
-
-        if text[index] == '@':
-            handle = FEDIVERSE_HANDLE.match(text, index)
-            if handle:
-                handle_text = handle.group(0)
-                stripped_handle = handle_text.strip()
-
-                match = next(
-                    (pair for pair in handles if stripped_handle in pair),
-                    None
-                )
-
-                if match:
-                    flush()
-                    tokens.append(MentionToken(match[1], ''))  # TODO: misskey doesn’t provide a uri
-                    index = handle.end()
-                    continue
-
-        url = URL.match(text, index)
-        if url:
-            flush()
-            href = url.group(0)
-            tokens.append(LinkToken(href, href))
-            index = url.end()
-            continue
-
-        buffer.append(text[index])
-        index += 1
-
-    flush()
-    return tokens
 def split_tokens(tokens: list[Token], max_chars: int, max_link_len: int = 35) -> list[list[Token]]:
     def new_block():
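Note: caching now_timestamp in __init__ fixes a subtle inconsistency: the old fallback built a fresh datetime.now() on every call, so repeated reads of get_post_date_iso() on the same post could disagree. Minimal illustration:

post = Post()
# before: two calls could differ by microseconds
# after: the fallback timestamp is fixed at construction time
assert post.get_post_date_iso() == post.get_post_date_iso()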
+10 -6
mastodon/common.py
···
         self.status = status
         self.media_attachments = media_attachments
         self.tokens = tokens
+        self.content_type = status.get('content_type', 'text/plain')
     def get_tokens(self) -> list[cross.Token]:
         return self.tokens
···
         return self.status.get('in_reply_to_id')
     def get_post_date_iso(self) -> str:
-        date = self.status.get('created_at')
-        return date or super().get_post_date_iso()
+        return self.status.get('created_at') or self.now_timestamp
     def get_cw(self) -> str:
         return self.status.get('spoiler_text') or ''
···
         return self.status['id']
     def get_languages(self) -> list[str]:
-        if self.status.get('language'):
-            return [self.status['language']]
-        return []
+        return [self.status['language']] if self.status.get('language') else []
     def is_sensitive(self) -> bool:
         return self.status.get('sensitive', False)
     def get_attachments(self) -> list[MediaInfo]:
-        return self.media_attachments
+        return self.media_attachments
+
+    def get_text_type(self) -> str:
+        return self.content_type
+
+    def get_post_url(self) -> str | None:
+        return self.status.get('url')
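Note: content_type on a status object is a Pleroma/Akkoma-style extension; vanilla Mastodon doesn't send it, hence the 'text/plain' default. A sketch with a hypothetical status dict (constructor argument names assumed from the fields above):

status = {
    'content_type': 'text/x.misskeymarkdown',     # present on Akkoma-likes only
    'url': 'https://example.social/objects/123',  # hypothetical
}
post = MastodonPost(status=status, media_attachments=[], tokens=[])
post.get_text_type()  # 'text/x.misskeymarkdown'
post.get_post_url()   # 'https://example.social/objects/123'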
+13 -13
mastodon/html_util.py → util/html_util.py
···
         self.tokens: list[cross.Token] = []
         self.status: dict
+        self.mentions: list[tuple[str, str]]
+        self.tags: list[str]
+
         self.in_pre = False
         self.in_code = False
···
             self.anchor_data = []
             if anchor_data.startswith('#'):
-                tags: list[dict] = self.status.get('tags', [])
-
                 as_tag = anchor_data[1:].lower()
-                if any(as_tag == block.get('name') for block in tags):
+                if any(as_tag == block for block in self.tags):
                     self.tokens.append(cross.TagToken(anchor_data[1:]))
             elif anchor_data.startswith('@'):
-                mentions: list[dict] = self.status.get('mentions', [])
+                match = next(
+                    (pair for pair in self.mentions if anchor_data in pair),
+                    None
+                )
-                as_mention = anchor_data[1:]
-                for block in mentions:
-                    if href == block.get('url'):
-                        self.tokens.append(cross.MentionToken(block['acct'], block['url']))
-                        break
-                    elif as_mention == block.get('acct') or as_mention == block.get('username'):
-                        self.tokens.append(cross.MentionToken(block['acct'], block['url']))
-                        break
+                if match:
+                    self.tokens.append(cross.MentionToken(match[1], ''))
             else:
                 self.tokens.append(cross.LinkToken(href, anchor_data))
···
         """Reset the parser state for reuse."""
         super().reset()
         self.tokens = []
-        self.status = {}
+
+        self.mentions = []
+        self.tags = []
         self.in_pre = False
         self.in_code = False
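Note: the tokenizer no longer digs through a raw status dict; callers now inject pre-normalized (username, acct) mention pairs and lowercase tag names, which is what lets util/md_util.py reuse the same parser below. Typical wiring (mirrors mastodon/input.py; values hypothetical):

tokenizer = html_util.HTMLPostTokenizer()
tokenizer.mentions = [('@alice', '@alice@example.social')]  # (username, acct) pairs
tokenizer.tags = ['photography']                            # lowercase tag names
tokenizer.feed('<p>hi <a href="https://example.social/@alice">@alice</a> #photography</p>')
tokens = tokenizer.get_tokens()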
+6 -4
mastodon/input.py
···
 import asyncio
 from mastodon.common import MastodonPost
-import mastodon.html_util as html_util
+import util.html_util as html_util
+import util.md_util as md_util
 import cross, util.database as database
 from util.util import LOGGER, as_envvar
···
         mentions.append(('@' + mention['username'], '@' + mention['acct']))
     if raw_text and content_type in MARKDOWNY:
-        return cross.tokenize_markdown(raw_text, tags, mentions)
+        return md_util.tokenize_markdown(raw_text, tags, mentions)
     akkoma_ext: dict | None = status.get('akkoma', {}).get('source')
     if akkoma_ext:
         if akkoma_ext.get('mediaType') in MARKDOWNY:
-            return cross.tokenize_markdown(akkoma_ext["content"], tags, mentions)
+            return md_util.tokenize_markdown(akkoma_ext["content"], tags, mentions)
     tokenizer = html_util.HTMLPostTokenizer()
-    tokenizer.status = status
+    tokenizer.mentions = mentions
+    tokenizer.tags = tags
     tokenizer.feed(status.get('content', ""))
     return tokenizer.get_tokens()
+11 -2
mastodon/output.py
···
 import requests, time
 import cross, util.database as database
+import misskey.mfm_util as mfm_util
 from util.util import LOGGER, as_envvar, canonical_label
 from util.media import MediaInfo
 from util.database import DataBaseWorker
···
             lang = post.get_languages()[0]
         else:
             lang = 'en'
-
-        raw_statuses = self.split_tokens_media(post.get_tokens(), post.get_attachments())
+
+        post_tokens = post.get_tokens()
+        if post.get_text_type() == "text/x.misskeymarkdown":
+            post_tokens, status = mfm_util.strip_mfm(post_tokens)
+            post_url = post.get_post_url()
+            if status and post_url:
+                post_tokens.append(cross.TextToken('\n'))
+                post_tokens.append(cross.LinkToken(post_url, "[Post contains MFM, see original]"))
+
+        raw_statuses = self.split_tokens_media(post_tokens, post.get_attachments())
         if not raw_statuses:
             LOGGER.error("Failed to split post into statuses?")
             return None
+9 -2
misskey/common.py
···
 from util.media import MediaInfo
 class MisskeyPost(cross.Post):
-    def __init__(self, note: dict, tokens: list[cross.Token], files: list[MediaInfo]) -> None:
+    def __init__(self, instance_url: str, note: dict, tokens: list[cross.Token], files: list[MediaInfo]) -> None:
         super().__init__()
         self.note = note
         self.sensitive = any([a.get('isSensitive', False) for a in note.get('files', [])])
         self.media_attachments = files
         self.tokens = tokens
+        self.url = instance_url + '/notes/' + note['id']
     def get_tokens(self) -> list[cross.Token]:
         return self.tokens
···
         return []
     def is_sensitive(self) -> bool:
-        return self.sensitive
+        return self.sensitive
+
+    def get_text_type(self) -> str:
+        return "text/x.misskeymarkdown"
+
+    def get_post_url(self) -> str | None:
+        return self.url
+3 -2
misskey/input.py
···
 from misskey.common import MisskeyPost
 import cross, util.database as database
+import util.md_util as md_util
 from util.media import MediaInfo, download_media
 from util.util import LOGGER, as_envvar
···
     for key, value in mention_handles.items():
         handles.append((value, value))
-    tokens = cross.tokenize_markdown(note.get('text', ''), tags, handles)
+    tokens = md_util.tokenize_markdown(note.get('text', ''), tags, handles)
     if not cross.test_filters(tokens, self.options.filters):
         LOGGER.info("Skipping '%s'. Matched a filter!", note['id'])
         return
···
             return
         media_attachments.append(info)
-    cross_post = MisskeyPost(note, tokens, media_attachments)
+    cross_post = MisskeyPost(self.service, note, tokens, media_attachments)
     for output in outputs:
         output.accept_post(cross_post)
+35
misskey/mfm_util.py
···
+import re, cross
+
+MFM_PATTERN = re.compile(r'\$\[([^\[\]]+)\]')
+
+def strip_mfm(tokens: list[cross.Token]) -> tuple[list[cross.Token], bool]:
+    modified = False
+
+    for tk in tokens:
+        if isinstance(tk, cross.TextToken):
+            original = tk.text
+            cleaned = __strip_mfm(original)
+            if cleaned != original:
+                modified = True
+                tk.text = cleaned
+
+        elif isinstance(tk, cross.LinkToken):
+            original = tk.label
+            cleaned = __strip_mfm(original)
+            if cleaned != original:
+                modified = True
+                tk.label = cleaned
+
+    return tokens, modified
+
+def __strip_mfm(text: str) -> str:
+    def match_contents(match: re.Match[str]):
+        content = match.group(1).strip()
+        parts = content.split(' ', 1)
+        return parts[1] if len(parts) > 1 else ''
+
+    while MFM_PATTERN.search(text):
+        text = MFM_PATTERN.sub(match_contents, text)
+
+    return text
+
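Note: MFM_PATTERN only matches an innermost $[...] group (no brackets in the body), so the while loop peels nested decorations one layer per pass, and match_contents drops the effect name while keeping the decorated text. For example:

tokens = [cross.TextToken('$[x2 $[sparkle big]] text')]
tokens, modified = mfm_util.strip_mfm(tokens)
# pass 1 resolves the inner group:  '$[x2 big] text'
# pass 2 resolves the outer group:  'big text'
# -> tokens == [TextToken('big text')], modified == True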
+112
util/md_util.py
···
+import re
+
+import cross
+import util.html_util as html_util
+import util.util as util
+
+URL = re.compile(r'(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+', re.IGNORECASE)
+MD_INLINE_LINK = re.compile(r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)", re.IGNORECASE)
+MD_AUTOLINK = re.compile(r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE)
+HASHTAG = re.compile(r'(?<!\w)\#([\w]+)')
+FEDIVERSE_HANDLE = re.compile(r'(?<![\w@])@([\w\.-]+)(?:@([\w\.-]+\.[\w\.-]+))?')
+
+def tokenize_markdown(text: str, tags: list[str], handles: list[tuple[str, str]]) -> list[cross.Token]:
+    if not text:
+        return []
+
+    tokenizer = html_util.HTMLPostTokenizer()
+    tokenizer.mentions = handles
+    tokenizer.tags = tags
+    tokenizer.feed(text)
+    html_tokens = tokenizer.get_tokens()
+
+    tokens: list[cross.Token] = []
+
+    for tk in html_tokens:
+        if isinstance(tk, cross.TextToken):
+            tokens.extend(__tokenize_md(tk.text, tags, handles))
+        elif isinstance(tk, cross.LinkToken):
+            if not tk.label or util.canonical_label(tk.label, tk.href):
+                tokens.append(tk)
+                continue
+
+            tokens.extend(__tokenize_md(f"[{tk.label}]({tk.href})", tags, handles))
+        else:
+            tokens.append(tk)
+
+    return tokens
+
+
+def __tokenize_md(text: str, tags: list[str], handles: list[tuple[str, str]]) -> list[cross.Token]:
+    index: int = 0
+    total: int = len(text)
+    buffer: list[str] = []
+
+    tokens: list[cross.Token] = []
+
+    def flush():
+        nonlocal buffer
+        if buffer:
+            tokens.append(cross.TextToken(''.join(buffer)))
+            buffer = []
+
+    while index < total:
+        if text[index] == '[':
+            md_inline = MD_INLINE_LINK.match(text, index)
+            if md_inline:
+                flush()
+                label = md_inline.group(1)
+                href = md_inline.group(2)
+                tokens.append(cross.LinkToken(href, label))
+                index = md_inline.end()
+                continue
+
+        if text[index] == '<':
+            md_auto = MD_AUTOLINK.match(text, index)
+            if md_auto:
+                flush()
+                href = md_auto.group(1)
+                tokens.append(cross.LinkToken(href, href))
+                index = md_auto.end()
+                continue
+
+        if text[index] == '#':
+            tag = HASHTAG.match(text, index)
+            if tag:
+                tag_text = tag.group(1)
+                if tag_text.lower() in tags:
+                    flush()
+                    tokens.append(cross.TagToken(tag_text))
+                    index = tag.end()
+                    continue
+
+        if text[index] == '@':
+            handle = FEDIVERSE_HANDLE.match(text, index)
+            if handle:
+                handle_text = handle.group(0)
+                stripped_handle = handle_text.strip()
+
+                match = next(
+                    (pair for pair in handles if stripped_handle in pair),
+                    None
+                )
+
+                if match:
+                    flush()
+                    tokens.append(cross.MentionToken(match[1], ''))  # TODO: misskey doesn’t provide a uri
+                    index = handle.end()
+                    continue
+
+        url = URL.match(text, index)
+        if url:
+            flush()
+            href = url.group(0)
+            tokens.append(cross.LinkToken(href, href))
+            index = url.end()
+            continue
+
+        buffer.append(text[index])
+        index += 1
+
+    flush()
+    return tokens
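Note: tokenize_markdown is two-stage: the HTML pass strips markup first (the "fix: html in markdown" part of this commit), then __tokenize_md re-scans the surviving text and non-canonical link labels for markdown links, autolinks, hashtags, fediverse handles, and bare URLs. A hypothetical call (token boundaries are approximate; they depend on the HTML pass):

tokens = md_util.tokenize_markdown(
    'see <b>this</b> [post](https://example.social/notes/1) #news',
    tags=['news'],
    handles=[('@alice', '@alice@example.social')],
)
# -> roughly [TextToken('see this '), LinkToken('https://example.social/notes/1', 'post'),
#            TextToken(' '), TagToken('news')]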