Social media crossposting tool — third time's the charm.
Crossposts between Mastodon, Misskey, and Bluesky.

new: rewrite html parser to tokenize directly

zenfyr.dev 56f0d921 af49b8a2

verified
Changed files
+199 -222
mastodon
+1 -79
mastodon/common.py
···
-
from bs4 import BeautifulSoup, Tag
-
from bs4.element import NavigableString
-
-
import mastodon.markeddown as markeddown
-
import cross
from util.media import MediaInfo
-
-
md_parser = markeddown.HTMLToMarkdownParser()
class MastodonPost(cross.Post):
def __init__(self, status: dict, tokens: list[cross.Token], media_attachments: list[MediaInfo]) -> None:
···
return self.status.get('sensitive', False)
def get_attachments(self) -> list[MediaInfo]:
-
return self.media_attachments
-
-
def tokenize_post(status: dict) -> list[cross.Token]:
-
if not status.get('content'):
-
return []
-
-
soup = BeautifulSoup(status['content'], "html.parser")
-
tokens: list[cross.Token] = []
-
-
tags: list[dict] = status.get('tags', [])
-
mentions: list[dict] = status.get('mentions', [])
-
-
def mdd(html):
-
md_parser.feed(html)
-
md = md_parser.get_markdown()
-
md_parser.reset()
-
return md
-
-
def recurse(node) -> None:
-
if isinstance(node, NavigableString):
-
tokens.append(cross.TextToken(str(node)))
-
return
-
-
if isinstance(node, Tag):
-
if node.name.lower() == "a":
-
href = node.get("href", "")
-
inner_html = "".join(str(c) for c in node.contents)
-
link_text_md = mdd(inner_html)
-
-
if link_text_md.startswith('@'):
-
as_mention = link_text_md[1:]
-
for block in mentions:
-
if href == block.get('url'):
-
tokens.append(cross.MentionToken(block['acct'], block['url']))
-
return
-
elif as_mention == block.get('acct') or as_mention == block.get('username'):
-
tokens.append(cross.MentionToken(block['acct'], block['url']))
-
return
-
-
if link_text_md.startswith('#'):
-
as_tag = link_text_md[1:].lower()
-
if any(as_tag == block.get('name') for block in tags):
-
tokens.append(cross.TagToken(link_text_md[1:]))
-
return
-
-
# idk if we can safely convert this to string
-
tokens.append(cross.LinkToken(str(href), link_text_md))
-
return
-
-
if node.find("a") is not None:
-
for child in node.contents:
-
recurse(child)
-
return
-
-
serialized = str(node)
-
markdownified = mdd(serialized)
-
if markdownified:
-
tokens.append(cross.TextToken(markdownified))
-
return
-
return
-
-
for child in soup.contents:
-
recurse(child)
-
-
if not tokens:
-
return []
-
-
last_token = tokens[-1]
-
if last_token and isinstance(last_token, cross.TextToken) and last_token.text.endswith('\n\n'):
-
tokens[-1] = cross.TextToken(last_token.text[:-2])
-
-
return tokens
+
return self.media_attachments
+191
mastodon/html_util.py
···
+
from html.parser import HTMLParser
+
import cross
+
+
class HTMLPostTokenizer(HTMLParser):
    """Tokenize a Mastodon status' HTML content into a list of cross.Token.

    Usage: assign ``status`` (the source status dict, used to resolve
    hashtags and mentions), ``feed()`` the HTML, then call ``get_tokens()``.
    Call ``reset()`` before reusing the instance for another status.
    """

    # Heading tags all share the same markdown prefix rule.
    _HEADING_TAGS = frozenset({'h1', 'h2', 'h3', 'h4', 'h5', 'h6'})

    def __init__(self) -> None:
        super().__init__()
        self.tokens: list[cross.Token] = []
        # Default to {} so tag/mention lookups don't raise AttributeError
        # if feed() is called before the caller assigns a status.
        self.status: dict = {}

        # markdown formatting state
        self.in_pre = False
        self.in_code = False

        self.current_tag_stack: list[str] = []
        self.list_stack: list[str] = []

        # Anchors buffer their inner text so the href and the label can be
        # emitted together when the closing </a> arrives.
        self.anchor_stack: list[str] = []
        self.anchor_data: list[str] = []

    def _ensure_newline(self) -> None:
        """Append a newline unless the output already ends with one."""
        if self.tokens:
            last_token = self.tokens[-1]
            if isinstance(last_token, cross.TextToken) and not last_token.text.endswith('\n'):
                self.tokens.append(cross.TextToken('\n'))

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        attrs_dict = dict(attrs)

        if tag == 'br':
            self.tokens.append(cross.TextToken(' \n'))

        elif tag == 'a':
            href = attrs_dict.get('href', '')
            self.anchor_stack.append(href)

        elif tag in ('strong', 'b'):
            self.tokens.append(cross.TextToken('**'))

        elif tag in ('em', 'i'):
            self.tokens.append(cross.TextToken('*'))

        elif tag in ('del', 's'):
            self.tokens.append(cross.TextToken('~~'))

        elif tag == 'code':
            # Inside <pre> the fence already covers the code formatting.
            if not self.in_pre:
                self.tokens.append(cross.TextToken('`'))
                self.in_code = True

        elif tag == 'pre':
            self._ensure_newline()
            self.tokens.append(cross.TextToken('```\n'))
            self.in_pre = True

        elif tag == 'blockquote':
            self._ensure_newline()
            self.tokens.append(cross.TextToken('\n> '))

        elif tag in ('ul', 'ol'):
            self.list_stack.append(tag)
            self.tokens.append(cross.TextToken('\n'))

        elif tag == 'li':
            indent = ' ' * (len(self.list_stack) - 1)
            if self.list_stack and self.list_stack[-1] == 'ul':
                self.tokens.append(cross.TextToken(f'{indent}- '))
            elif self.list_stack and self.list_stack[-1] == 'ol':
                self.tokens.append(cross.TextToken(f'{indent}1. '))

        # BUG FIX: was `tag == {'h1', ...}` — comparing a str to a set is
        # always False, so headings were silently dropped.
        elif tag in self._HEADING_TAGS:
            level = int(tag[1])
            self.tokens.append(cross.TextToken("\n" + "#" * level + " "))

        self.current_tag_stack.append(tag)

    def handle_data(self, data: str) -> None:
        # Inside an anchor the text is buffered until the closing tag.
        if self.anchor_stack:
            self.anchor_data.append(data)
        else:
            self.tokens.append(cross.TextToken(data))

    def _close_anchor(self) -> None:
        """Emit the token for a finished <a>…</a> element.

        Resolves the anchor against the status' tags/mentions; anything
        unrecognized falls back to a plain LinkToken so no text is lost.
        """
        # Guard the pop: malformed HTML may contain a stray </a>.
        href = self.anchor_stack.pop() if self.anchor_stack else ''
        anchor_data = ''.join(self.anchor_data)
        # BUG FIX: previously this buffer was never cleared, so each
        # subsequent link's label accumulated all earlier anchor text.
        self.anchor_data.clear()

        if anchor_data.startswith('#'):
            tags: list[dict] = self.status.get('tags', [])
            as_tag = anchor_data[1:].lower()
            if any(as_tag == block.get('name') for block in tags):
                self.tokens.append(cross.TagToken(as_tag))
            else:
                # Not a known hashtag — keep it as a regular link.
                self.tokens.append(cross.LinkToken(href, anchor_data))

        elif anchor_data.startswith('@'):
            mentions: list[dict] = self.status.get('mentions', [])
            as_mention = anchor_data[1:]
            for block in mentions:
                if href == block.get('url') or as_mention in (block.get('acct'), block.get('username')):
                    self.tokens.append(cross.MentionToken(block['acct'], block['url']))
                    break
            else:
                # No mention matched — keep it as a regular link.
                self.tokens.append(cross.LinkToken(href, anchor_data))

        else:
            self.tokens.append(cross.LinkToken(href, anchor_data))

    def handle_endtag(self, tag: str) -> None:
        if not self.current_tag_stack:
            return

        if tag in self.current_tag_stack:
            self.current_tag_stack.remove(tag)

        if tag == 'p':
            self.tokens.append(cross.TextToken('\n\n'))

        elif tag == 'a':
            self._close_anchor()

        elif tag in ('strong', 'b'):
            self.tokens.append(cross.TextToken('**'))

        elif tag in ('em', 'i'):
            self.tokens.append(cross.TextToken('*'))

        elif tag in ('del', 's'):
            self.tokens.append(cross.TextToken('~~'))

        elif tag == 'code':
            if not self.in_pre and self.in_code:
                self.tokens.append(cross.TextToken('`'))
                self.in_code = False

        elif tag == 'pre':
            self.tokens.append(cross.TextToken('\n```\n'))
            self.in_pre = False

        elif tag == 'blockquote':
            self.tokens.append(cross.TextToken('\n'))

        elif tag in ('ul', 'ol'):
            if self.list_stack:
                self.list_stack.pop()
            self.tokens.append(cross.TextToken('\n'))

        elif tag == 'li':
            self.tokens.append(cross.TextToken('\n'))

        elif tag in self._HEADING_TAGS:
            self.tokens.append(cross.TextToken('\n'))

    def get_tokens(self) -> list[cross.Token]:
        """Return the parsed tokens with adjacent TextTokens merged.

        A single trailing paragraph break ('\\n\\n') is stripped from the
        final text token so posts don't end with a blank line.
        """
        if not self.tokens:
            return []

        combined: list[cross.Token] = []
        buffer: list[str] = []

        def flush_buffer():
            if buffer:
                merged = ''.join(buffer)
                combined.append(cross.TextToken(text=merged))
                buffer.clear()

        for token in self.tokens:
            if isinstance(token, cross.TextToken):
                buffer.append(token.text)
            else:
                flush_buffer()
                combined.append(token)

        flush_buffer()

        if combined and isinstance(combined[-1], cross.TextToken):
            if combined[-1].text.endswith('\n\n'):
                combined[-1] = cross.TextToken(combined[-1].text[:-2])
        return combined

    def reset(self):
        """Reset the parser state for reuse."""
        super().reset()
        self.tokens = []
        self.status = {}

        self.in_pre = False
        self.in_code = False

        self.current_tag_stack = []
        self.anchor_stack = []
        # BUG FIX: anchor_data was previously not cleared here, leaking
        # anchor text across reuses of the parser instance.
        self.anchor_data = []
        self.list_stack = []
+7 -3
mastodon/input.py
···
import re
import asyncio
-
from mastodon.common import MastodonPost, tokenize_post
+
from mastodon.common import MastodonPost
+
import mastodon.html_util as html_util
import cross, util.database as database
from util.util import LOGGER, as_envvar
···
if akkoma_ext:
if akkoma_ext.get('mediaType') in MARKDOWNY:
return cross.tokenize_markdown(akkoma_ext["content"], tags, mentions)
-
-
return tokenize_post(status)
+
+
tokenizer = html_util.HTMLPostTokenizer()
+
tokenizer.status = status
+
tokenizer.feed(status.get('content', ""))
+
return tokenizer.get_tokens()
def _on_create_post(self, outputs: list[cross.Output], status: dict):
# skip events from other users
-140
mastodon/markeddown.py
···
-
import re
-
from html.parser import HTMLParser
-
-
WHITESPACE = re.compile(r'[\r\n\t]+')
-
-
class HTMLToMarkdownParser(HTMLParser):
-
def __init__(self) -> None:
-
super().__init__()
-
self.markdown = []
-
self.current_tag_stack = []
-
-
self.in_pre = False
-
self.in_code = False
-
-
self.list_stack = []
-
self.link_stack = []
-
-
def get_markdown(self):
-
return ''.join(self.markdown)
-
-
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
-
attrs_dict = dict(attrs)
-
-
if tag == 'br':
-
self.markdown.append(' \n')
-
-
elif tag == 'a':
-
href = attrs_dict.get('href', '')
-
self.link_stack.append(href)
-
self.markdown.append('[')
-
-
elif tag == 'strong' or tag == 'b':
-
self.markdown.append('**')
-
-
elif tag == 'em' or tag == 'i':
-
self.markdown.append('*')
-
-
elif tag == 'del' or tag == 's':
-
self.markdown.append('~~')
-
-
elif tag == 'code':
-
if not self.in_pre:
-
self.markdown.append('`')
-
self.in_code = True
-
-
elif tag == 'pre':
-
if self.markdown and not str(self.markdown[-1]).endswith('\n'):
-
self.markdown.append('\n')
-
-
self.markdown.append('```\n')
-
self.in_pre = True
-
-
elif tag == 'blockquote':
-
if self.markdown and not str(self.markdown[-1]).endswith('\n'):
-
self.markdown.append('\n')
-
-
self.markdown.append('\n> ')
-
-
elif tag == 'ul':
-
self.list_stack.append('ul')
-
self.markdown.append('\n')
-
-
elif tag == 'ol':
-
self.list_stack.append('ol')
-
self.markdown.append('\n')
-
-
elif tag == 'li':
-
indent = ' ' * (len(self.list_stack) - 1)
-
if self.list_stack and self.list_stack[-1] == 'ul':
-
self.markdown.append(f'{indent}- ')
-
elif self.list_stack and self.list_stack[-1] == 'ol':
-
self.markdown.append(f'{indent}1. ')
-
-
elif tag == {'h1', 'h2', 'h3', 'h4', 'h5', 'h6'}:
-
level = int(tag[1])
-
self.markdown.append("\n" + "#" * level + " ")
-
-
self.current_tag_stack.append(tag)
-
-
def handle_endtag(self, tag: str) -> None:
-
if not self.current_tag_stack:
-
return
-
-
if tag in self.current_tag_stack:
-
self.current_tag_stack.remove(tag)
-
-
if tag == 'p':
-
self.markdown.append('\n\n')
-
-
elif tag == 'a':
-
if self.link_stack:
-
href = self.link_stack.pop()
-
self.markdown.append(f']({href})')
-
-
elif tag == 'strong' or tag == 'b':
-
self.markdown.append('**')
-
-
elif tag == 'em' or tag == 'i':
-
self.markdown.append('*')
-
-
elif tag == 'del' or tag == 's':
-
self.markdown.append('~~')
-
-
elif tag == 'code':
-
if not self.in_pre and self.in_code:
-
self.markdown.append('`')
-
self.in_code = False
-
-
elif tag == 'pre':
-
self.markdown.append('\n```\n')
-
self.in_pre = False
-
-
elif tag == 'blockquote':
-
self.markdown.append('\n')
-
-
elif tag == 'ul' or tag == 'ol':
-
if self.list_stack:
-
self.list_stack.pop()
-
self.markdown.append('\n')
-
-
elif tag == 'li':
-
self.markdown.append('\n')
-
-
elif tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
-
self.markdown.append('\n')
-
-
def handle_data(self, data):
-
self.markdown.append(data)
-
-
def reset(self):
-
"""Reset the parser state for reuse."""
-
super().reset()
-
self.markdown = []
-
self.current_tag_stack = []
-
-
self.in_pre = False
-
self.in_code = False
-
-
self.link_stack = []
-
self.list_stack = []