social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

add misskey input (untested)

zenfyr.dev 793b1fd8 f223873a

verified
Changed files
+394 -114
cross
mastodon
misskey
util
+3
cross/fragments.py
···
@dataclass(kw_only=True)
class MentionFragment(Fragment):
    """Fragment marking a span of post text that mentions an account."""

    # Identifier of the mentioned account. NOTE(review): the parsers visible in
    # this change store the mention text with its leading "@" stripped rather
    # than a real URI -- confirm what consumers expect here.
    uri: str
+
+
+
NON_OVERLAPPING: set[type[Fragment]] = {LinkFragment, TagFragment, MentionFragment}
+1 -1
mastodon/input.py
···
)
for out in self.outputs:
-
self.submitter(lambda: out.accept_repost(status["id"], reposted["id"]))
+
self.submitter(lambda: out.accept_repost(status["id"], reblog["id"]))
def _on_delete_post(self, status_id: str):
post = self._get_post(self.url, self.user_id, status_id)
+25 -112
mastodon/parser.py
···
-
from html.parser import HTMLParser
from typing import override
import cross.fragments as f
+
from util.html import HTMLToFragmentsParser
-
class StatusParser(HTMLParser):
+
class StatusParser(HTMLToFragmentsParser):
def __init__(self) -> None:
super().__init__()
-
self.text: str = ""
-
self.fragments: list[f.Fragment] = []
-
-
self._tag_stack: dict[str, tuple[int, dict[str, str | None]]] = {}
-
self.in_pre: bool = False
-
self.in_code: bool = False
-
-
self.invisible: bool = False
@override
-
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
-
_attr = dict(attrs)
-
-
def append_newline():
-
if self.text and not self.text.endswith("\n"):
-
self.text += "\n"
-
-
if self.invisible:
-
return
-
-
match tag:
-
case "p":
-
cls = _attr.get('class', '')
-
if cls and 'quote-inline' in cls:
-
self.invisible = True
-
case "a":
-
self._tag_stack["a"] = (len(self.text), _attr)
-
case "code":
-
if not self.in_pre:
-
self.text += "`"
-
self.in_code = True
-
case "pre":
-
append_newline()
-
self.text += "```\n"
-
self.in_pre = True
-
case "blockquote":
-
append_newline()
-
self.text += "> "
-
case "strong" | "b":
-
self.text += "**"
-
case "em" | "i":
-
self.text += "*"
-
case "del" | "s":
-
self.text += "~~"
-
case "br":
-
self.text += "\n"
-
case _:
-
if tag in {"h1", "h2", "h3", "h4", "h5", "h6"}:
-
level = int(tag[1])
-
self.text += "\n" + "#" * level + " "
-
-
@override
-
def handle_endtag(self, tag: str) -> None:
-
if self.invisible:
-
if tag == "p":
-
self.invisible = False
-
return
-
+
def handle_a_endtag(self):
current_end = len(self.text)
-
match tag:
-
case "a":
-
if "a" in self._tag_stack:
-
start, _attr = self._tag_stack.pop("a")
+
start, _attr = self._tag_stack.pop("a")
-
href = _attr.get('href')
-
if href and current_end > start:
-
cls = _attr.get('class', '')
-
if cls:
-
if 'hashtag' in cls:
-
tag = self.text[start:current_end]
-
tag = tag[1:] if tag.startswith('#') else tag
-
-
self.fragments.append(
-
f.TagFragment(start=start, end=current_end, tag=tag)
-
)
-
return
-
if 'mention' in cls: # TODO put the full acct in the fragment
-
mention = self.text[start:current_end]
-
self.fragments.append(
-
f.MentionFragment(start=start, end=current_end, uri=mention)
-
)
-
return
-
self.fragments.append(
-
f.LinkFragment(start=start, end=current_end, url=href)
-
)
-
case "code":
-
if not self.in_pre and self.in_code:
-
self.text += "`"
-
self.in_code = False
-
case "pre":
-
self.text += "\n```\n"
-
self.in_pre = False
-
case "blockquote":
-
self.text += "\n"
-
case "strong" | "b":
-
self.text += "**"
-
case "em" | "i":
-
self.text += "*"
-
case "del" | "s":
-
self.text += "~~"
-
case "p":
-
self.text += "\n\n"
-
case _:
-
if tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
-
self.text += '\n'
+
href = _attr.get('href')
+
if href and current_end > start:
+
cls = _attr.get('class', '')
+
if cls:
+
if 'hashtag' in cls:
+
tag = self.text[start:current_end]
+
tag = tag[1:] if tag.startswith('#') else tag
-
@override
-
def handle_data(self, data: str) -> None:
-
if not self.invisible:
-
self.text += data
-
-
def get_result(self) -> tuple[str, list[f.Fragment]]:
-
if self.text.endswith('\n\n'):
-
return self.text[:-2], self.fragments
-
return self.text, self.fragments
+
self.fragments.append(
+
f.TagFragment(start=start, end=current_end, tag=tag)
+
)
+
return
+
if 'mention' in cls: # TODO put the full acct in the fragment
+
mention = self.text[start:current_end]
+
mention = mention[1:] if mention.startswith('@') else mention
+
self.fragments.append(
+
f.MentionFragment(start=start, end=current_end, uri=mention)
+
)
+
return
+
self.fragments.append(
+
f.LinkFragment(start=start, end=current_end, url=href)
+
)
+112 -1
misskey/input.py
···
import websockets
+
from cross.attachments import (
+
LabelsAttachment,
+
MediaAttachment,
+
RemoteUrlAttachment,
+
SensitiveAttachment,
+
)
+
from cross.media import Blob, download_blob
+
from cross.post import Post
from cross.service import InputService
from database.connection import DatabasePool
from misskey.info import MisskeyService
+
from util.markdown import MarkdownParser
from util.util import normalize_service_url
ALLOWED_VISIBILITY = ["public", "home"]
···
return self.options.token
def _on_note(self, note: dict[str, Any]) -> None:
    """Process a single Misskey note and forward it to every output.

    The note is ignored when it was written by another user, has a
    visibility outside ``self.options.allowed_visibility``, contains a poll,
    is a quote (a renote that also carries text), replies to another user,
    or replies to a note that ``self._get_post`` cannot find. Pure renotes
    are delegated to ``self._on_renote``.

    Otherwise the text is parsed into fragments, attachments are collected
    (source URL, sensitivity flag, content warning, downloaded media), the
    note is passed to ``self._insert_post`` and one ``accept_post`` job per
    output is handed to ``self.submitter``.

    Args:
        note: The note payload as a decoded JSON object.
    """
    if note["userId"] != self.user_id:
        return

    if note["visibility"] not in self.options.allowed_visibility:
        return

    if note.get("poll"):
        self.log.info("Skipping '%s'! Contains a poll..", note["id"])
        return

    renote: dict[str, Any] | None = note.get("renote")
    if renote:
        # A renote that carries its own text is a quote, which is not supported.
        if note.get("text") is not None:
            self.log.info("Skipping '%s'! Quote..", note["id"])
            return
        self._on_renote(note, renote)
        return

    reply: dict[str, Any] | None = note.get("reply")
    parent = None
    if reply:
        # Only self-reply threads are crossposted.
        if reply.get("userId") != self.user_id:
            self.log.info("Skipping '%s'! Reply to other user..", note["id"])
            return

        parent = self._get_post(self.url, self.user_id, reply["id"])
        if not parent:
            self.log.info(
                "Skipping %s, parent %s not found in db", note["id"], reply["id"]
            )
            return

    parser = MarkdownParser()  # TODO MFM parser
    # The renote check above already treats "text" as possibly None, so a
    # `.get("text", "")` default is not enough: the key can exist and be null.
    text, fragments = parser.parse(note.get("text") or "")
    post = Post(id=note["id"], parent_id=reply["id"] if reply else None, text=text)
    post.fragments.extend(fragments)

    # Same reasoning as for "text": tolerate an explicit null file list.
    files: list[dict[str, Any]] = note.get("files") or []

    post.attachments.put(RemoteUrlAttachment(url=self.url + "/notes/" + note["id"]))
    if any(media.get("isSensitive", False) for media in files):
        post.attachments.put(SensitiveAttachment(sensitive=True))
    if note.get("cw"):
        post.attachments.put(LabelsAttachment(labels=[note["cw"]]))

    blobs: list[Blob] = []
    for media in files:
        self.log.info("Downloading %s...", media["url"])
        blob: Blob | None = download_blob(media["url"], media.get("comment", ""))
        if not blob:
            # One failed download drops the whole note rather than posting
            # it with missing media.
            self.log.error(
                "Skipping %s! Failed to download media %s.",
                note["id"],
                media["url"],
            )
            return
        blobs.append(blob)

    if blobs:
        post.attachments.put(MediaAttachment(blobs=blobs))

    record: dict[str, Any] = {
        "user": self.user_id,
        "service": self.url,
        "identifier": note["id"],
    }
    if parent:
        record["parent"] = parent["id"]
        # The thread root is the parent's root, or the parent itself when the
        # parent has none.
        record["root"] = parent["root"] or parent["id"]
    self._insert_post(record)

    for out in self.outputs:
        # Bind `out` as a default argument: a plain `lambda: out...` looks the
        # variable up when the job finally runs, by which time the loop has
        # moved on and every job would target the last output.
        self.submitter(lambda out=out: out.accept_post(post))
+
+
def _on_renote(self, note: dict[str, Any], renote: dict[str, Any]) -> None:
    """Forward a pure renote (boost) to every output.

    The renoted note must already be known to ``self._get_post``; otherwise
    the renote is logged and skipped. On success the renote is passed to
    ``self._insert_post`` and one ``accept_repost`` job per output is handed
    to ``self.submitter``.

    Args:
        note: The renote itself (the wrapper note).
        renote: The note being renoted.
    """
    note_id = note["id"]
    renote_id = renote["id"]

    reposted = self._get_post(self.url, self.user_id, renote_id)
    if not reposted:
        self.log.info(
            "Skipping repost '%s' as reposted post '%s' was not found in the db.",
            note_id,
            renote_id,
        )
        return

    self._insert_post(
        {
            "user": self.user_id,
            "service": self.url,
            "identifier": note_id,
            "reposted": reposted["id"],
        }
    )

    for out in self.outputs:
        # Bind `out` as a default argument: a plain `lambda: out...` looks the
        # variable up when the job finally runs, by which time the loop has
        # moved on and every job would target the last output.
        self.submitter(lambda out=out: out.accept_repost(note_id, renote_id))
def _accept_msg(self, msg: websockets.Data) -> None:
data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))
+110
util/html.py
···
+
from html.parser import HTMLParser
+
from typing import override
+
import cross.fragments as f
+
+
+
class HTMLToFragmentsParser(HTMLParser):
+
def __init__(self) -> None:
+
super().__init__()
+
self.text: str = ""
+
self.fragments: list[f.Fragment] = []
+
+
self._tag_stack: dict[str, tuple[int, dict[str, str | None]]] = {}
+
self.in_pre: bool = False
+
self.in_code: bool = False
+
+
self.invisible: bool = False
+
+
def handle_a_endtag(self):
+
current_end = len(self.text)
+
start, _attr = self._tag_stack.pop("a")
+
+
href = _attr.get('href')
+
if href and current_end > start:
+
self.fragments.append(
+
f.LinkFragment(start=start, end=current_end, url=href)
+
)
+
+
@override
+
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
+
_attr = dict(attrs)
+
+
def append_newline():
+
if self.text and not self.text.endswith("\n"):
+
self.text += "\n"
+
+
if self.invisible:
+
return
+
+
match tag:
+
case "p":
+
cls = _attr.get('class', '')
+
if cls and 'quote-inline' in cls:
+
self.invisible = True
+
case "a":
+
self._tag_stack["a"] = (len(self.text), _attr)
+
case "code":
+
if not self.in_pre:
+
self.text += "`"
+
self.in_code = True
+
case "pre":
+
append_newline()
+
self.text += "```\n"
+
self.in_pre = True
+
case "blockquote":
+
append_newline()
+
self.text += "> "
+
case "strong" | "b":
+
self.text += "**"
+
case "em" | "i":
+
self.text += "*"
+
case "del" | "s":
+
self.text += "~~"
+
case "br":
+
self.text += "\n"
+
case _:
+
if tag in {"h1", "h2", "h3", "h4", "h5", "h6"}:
+
level = int(tag[1])
+
self.text += "\n" + "#" * level + " "
+
+
@override
+
def handle_endtag(self, tag: str) -> None:
+
if self.invisible:
+
if tag == "p":
+
self.invisible = False
+
return
+
+
match tag:
+
case "a":
+
if "a" in self._tag_stack:
+
self.handle_a_endtag()
+
case "code":
+
if not self.in_pre and self.in_code:
+
self.text += "`"
+
self.in_code = False
+
case "pre":
+
self.text += "\n```\n"
+
self.in_pre = False
+
case "blockquote":
+
self.text += "\n"
+
case "strong" | "b":
+
self.text += "**"
+
case "em" | "i":
+
self.text += "*"
+
case "del" | "s":
+
self.text += "~~"
+
case "p":
+
self.text += "\n\n"
+
case _:
+
if tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
+
self.text += '\n'
+
+
@override
+
def handle_data(self, data: str) -> None:
+
if not self.invisible:
+
self.text += data
+
+
def get_result(self) -> tuple[str, list[f.Fragment]]:
+
if self.text.endswith('\n\n'):
+
return self.text[:-2], self.fragments
+
return self.text, self.fragments
+143
util/markdown.py
···
+
import re
+
import cross.fragments as f
+
from util.html import HTMLToFragmentsParser
+
+
# Bare URL: a "scheme://" prefix (or "mailto:") followed by one or more
# non-whitespace characters.
URL = re.compile(r"(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+", re.IGNORECASE)
# Markdown inline link "[label](url)". Group 1 is the label (anything but "]"),
# group 2 the URL (no whitespace or ")"); whitespace around the URL inside the
# parentheses is allowed.
MD_INLINE_LINK = re.compile(
    r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)",
    re.IGNORECASE,
)
# Markdown autolink "<url>". Group 1 is the URL without the angle brackets.
MD_AUTOLINK = re.compile(
    r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE
)
# "#tag" not preceded by a word character. Group 1 is the tag without "#".
HASHTAG = re.compile(r"(?<!\w)\#([\w]+)")
# "@user" or "@user@host.tld", not preceded by a word character or "@".
# Group 1 is the user part, group 2 the optional host.
FEDIVERSE_HANDLE = re.compile(r"(?<![\w@])@([\w\.-]+)(?:@([\w\.-]+\.[\w\.-]+))?")

# Every pattern above in one list. NOTE(review): nothing visible in this file
# reads it -- confirm whether it is used elsewhere.
REGEXES = [URL, MD_INLINE_LINK, MD_AUTOLINK, HASHTAG, FEDIVERSE_HANDLE]
+
+
+
# TODO autolinks are broken by the html parser
+
class MarkdownParser:
    """Extract link, hashtag and mention fragments from Markdown-like text."""

    def parse(self, text: str) -> tuple[str, list[f.Fragment]]:
        """Parse *text* into display text and the fragments that annotate it.

        The input first goes through ``HTMLToFragmentsParser`` (embedded HTML
        is rewritten and ``<a>`` tags become fragments). The result is then
        scanned for inline links ``[label](url)``, hashtags, fediverse
        handles and bare URLs. Inline links are collapsed to their label;
        everything else keeps its text.

        Markdown autolinks (``<url>``) are not recognised: the HTML pass
        consumes the angle brackets before this scan sees them.

        Args:
            text: Raw post text. An empty value yields ``("", [])``.

        Returns:
            ``(new_text, fragments)`` where every fragment's ``start``/``end``
            are offsets into ``new_text``. HTML fragments come first, followed
            by the scanned ones in text order.

        Raises:
            ValueError: If the HTML pass reports fragments that are inverted
                or overlap each other.
        """
        if not text:
            return "", []

        html_parser = HTMLToFragmentsParser()
        html_parser.feed(text)
        # feed() may hold back trailing input (for instance after a dangling
        # "&" or "<") while it waits for more data; close() flushes it so the
        # end of the post is not silently lost.
        html_parser.close()
        markdown, fragments = html_parser.get_result()

        # Spans already claimed by <a> tags, in `markdown` coordinates. The
        # scan below never matches inside or across them.
        claimed = sorted((fg.start, fg.end) for fg in fragments)

        # (start, end, kind, regex match or None, HTML fragment or None)
        events: list[
            tuple[int, int, str, re.Match[str] | None, f.Fragment | None]
        ] = [(fg.start, fg.end, "html", None, fg) for fg in fragments]

        index = 0
        total = len(markdown)
        nxt = 0  # first claimed span that ends after `index`
        while index < total:
            while nxt < len(claimed) and claimed[nxt][1] <= index:
                nxt += 1
            if nxt < len(claimed) and claimed[nxt][0] <= index:
                # Inside an HTML link: jump past it.
                index = claimed[nxt][1]
                continue

            ch = markdown[index]
            if ch == "[":
                pattern, kind = MD_INLINE_LINK, "inline_link"
            elif ch == "#":
                pattern, kind = HASHTAG, "hashtag"
            elif ch == "@":
                pattern, kind = FEDIVERSE_HANDLE, "mention"
            else:
                pattern, kind = URL, "url"

            rmatch = pattern.match(markdown, index)
            if rmatch is None or rmatch.end() == index:
                index += 1
                continue
            if nxt < len(claimed) and rmatch.end() > claimed[nxt][0]:
                # The match would run into an HTML link; the HTML link wins.
                index += 1
                continue

            events.append((index, rmatch.end(), kind, rmatch, None))
            index = rmatch.end()

        events.sort(key=lambda event: event[0])

        # Validate positions before touching anything. Scanned matches cannot
        # overlap by construction, so this only trips on bad HTML fragments.
        last_end = 0
        for start, end, _, _, _ in events:
            if start > end:
                raise ValueError(f"Invalid fragment position start={start}, end={end}")
            if last_end > start:
                raise ValueError(
                    f"Overlapping text fragments at position end={last_end}, start={start}"
                )
            last_end = end

        # Rebuild the text. Every fragment is positioned from the length of
        # the output written so far, so text removed by an earlier inline
        # link shifts everything after it, whatever its origin.
        pieces: list[str] = []
        out_pos = 0  # length of the output so far
        cursor = 0  # how much of `markdown` has been consumed
        for start, end, kind, rmatch, html_fg in events:
            pieces.append(markdown[cursor:start])
            out_pos += start - cursor
            cursor = end

            if rmatch is None:
                visible = markdown[start:end]
                if html_fg is not None:
                    html_fg.start = out_pos
                    html_fg.end = out_pos + len(visible)
            elif kind == "inline_link":
                # Only the label stays in the text; the URL moves to the fragment.
                visible = rmatch.group(1)
                fragments.append(
                    f.LinkFragment(
                        start=out_pos,
                        end=out_pos + len(visible),
                        url=rmatch.group(2),
                    )
                )
            else:
                visible = rmatch.group(0)
                new_end = out_pos + len(visible)
                if kind == "hashtag":
                    fragments.append(
                        f.TagFragment(start=out_pos, end=new_end, tag=rmatch.group(1))
                    )
                elif kind == "mention":
                    fragments.append(
                        f.MentionFragment(
                            start=out_pos,
                            end=new_end,
                            uri=visible.removeprefix("@"),
                        )
                    )
                else:
                    fragments.append(
                        f.LinkFragment(start=out_pos, end=new_end, url=visible)
                    )

            pieces.append(visible)
            out_pos += len(visible)

        pieces.append(markdown[cursor:])
        return "".join(pieces), fragments