social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

mastodon input

zenfyr.dev f7c69e5d 6095131e

verified
Changed files
+258 -5
bluesky
mastodon
+3 -1
bluesky/input.py
···
reposted = self._get_post(self.url, self.did, reposted_uri)
if not reposted:
self.log.info(
-
"Skipping repost '%s' as reposted post '%s' was not found in the db."
+
"Skipping repost '%s' as reposted post '%s' was not found in the db.",
+
post_uri,
+
reposted_uri,
)
return
+132 -4
mastodon/input.py
···
import websockets
+
from cross.attachments import (
+
LabelsAttachment,
+
LanguagesAttachment,
+
MediaAttachment,
+
RemoteUrlAttachment,
+
SensitiveAttachment,
+
)
+
from cross.media import Blob, download_blob
+
from cross.post import Post
from cross.service import InputService
from database.connection import DatabasePool
from mastodon.info import MastodonService, validate_and_transform
+
from mastodon.parser import StatusParser
ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"]
···
return self.options.token
def _on_create_post(self, status: dict[str, Any]):
    """Handle a streamed status: filter it, convert it to a Post and fan it out.

    A status is dropped (nothing stored, nothing submitted) when it was not
    authored by ``self.user_id``, its visibility is not in
    ``self.options.allowed_visibility``, it carries a poll or a quote, it
    replies to another account, its parent is not in the database, or one
    of its media attachments fails to download. Reblogs are delegated to
    ``_on_reblog``.

    Args:
        status: Decoded status payload taken from the streaming event.
    """
    if status["account"]["id"] != self.user_id:
        return

    if status["visibility"] not in self.options.allowed_visibility:
        return

    reblog: dict[str, Any] | None = status.get("reblog")
    if reblog:
        # Only boosts of our own posts can be mirrored as reposts.
        if reblog["account"]["id"] != self.user_id:
            return
        self._on_reblog(status, reblog)
        return

    if status.get("poll"):
        self.log.info("Skipping '%s'! Contains a poll..", status["id"])
        return

    if status.get("quote"):
        self.log.info("Skipping '%s'! Quote..", status["id"])
        return

    in_reply: str | None = status.get("in_reply_to_id")
    in_reply_to: str | None = status.get("in_reply_to_account_id")
    if in_reply_to and in_reply_to != self.user_id:
        # Replies to other accounts are not crossposted; only self-threads.
        return

    parent = None
    if in_reply:
        parent = self._get_post(self.url, self.user_id, in_reply)
        if not parent:
            self.log.info(
                "Skipping %s, parent %s not found in db", status["id"], in_reply
            )
            return

    parser = StatusParser()
    parser.feed(status["content"])
    text, fragments = parser.get_result()

    post = Post(id=status["id"], parent_id=in_reply, text=text)
    post.fragments.extend(fragments)

    if status.get("url"):
        post.attachments.put(RemoteUrlAttachment(url=status["url"]))
    if status.get("sensitive"):
        post.attachments.put(SensitiveAttachment(sensitive=True))
    if status.get("language"):
        post.attachments.put(LanguagesAttachment(langs=[status["language"]]))

    # Mastodon's Status entity names the content-warning field "spoiler_text";
    # the legacy "spoiler" key is still honoured as a fallback.
    spoiler = status.get("spoiler_text") or status.get("spoiler")
    if spoiler:
        post.attachments.put(LabelsAttachment(labels=[spoiler]))

    blobs: list[Blob] = []
    for media in status.get("media_attachments", []):
        self.log.info("Downloading %s...", media["url"])
        blob: Blob | None = download_blob(media["url"], media.get("alt"))
        if not blob:
            # A partially mirrored post is worse than none: drop the whole status.
            self.log.error(
                "Skipping %s! Failed to download media %s.",
                status["id"],
                media["url"],
            )
            return
        blobs.append(blob)

    if blobs:
        post.attachments.put(MediaAttachment(blobs=blobs))

    if parent:
        # NOTE(review): reads parent["root"] while _on_delete_post reads
        # post["reposted_id"] -- confirm the row's column naming.
        self._insert_post(
            {
                "user": self.user_id,
                "service": self.url,
                "identifier": status["id"],
                "parent": parent["id"],
                "root": parent["id"] if not parent["root"] else parent["root"],
            }
        )
    else:
        self._insert_post(
            {
                "user": self.user_id,
                "service": self.url,
                "identifier": status["id"],
            }
        )

    for out in self.outputs:
        # Bind `out` now: a plain closure would see only the last output if
        # the submitter runs the job after the loop has moved on.
        self.submitter(lambda out=out: out.accept_post(post))
+
def _on_reblog(self, status: dict[str, Any], reblog: dict[str, Any]):
    """Record a boost of one of our own posts and forward it as a repost.

    The boost is skipped (with an info log) when the boosted status has no
    row in the database.

    Args:
        status: The wrapping status that represents the boost itself.
        reblog: The boosted status embedded in ``status``.
    """
    reposted = self._get_post(self.url, self.user_id, reblog["id"])
    if not reposted:
        self.log.info(
            "Skipping repost '%s' as reposted post '%s' was not found in the db.",
            status["id"],
            reblog["id"],
        )
        return

    self._insert_post(
        {
            "user": self.user_id,
            "service": self.url,
            "identifier": status["id"],
            "reposted": reposted["id"],
        }
    )

    repost_id = status["id"]
    reposted_id = reposted["id"]
    for out in self.outputs:
        # Bind `out` per iteration; a bare closure would resolve it only when
        # the job runs, i.e. possibly after the loop reached the last output.
        self.submitter(lambda out=out: out.accept_repost(repost_id, reposted_id))
def _on_delete_post(self, status_id: str):
    """Propagate the deletion of a status to every output, then drop its row.

    Nothing happens when the status has no row in the database. Rows with a
    truthy ``reposted_id`` are forwarded as repost deletions, all others as
    post deletions.

    Args:
        status_id: Identifier of the deleted status on this service.
    """
    post = self._get_post(self.url, self.user_id, status_id)
    if not post:
        return

    is_repost = bool(post["reposted_id"])
    for output in self.outputs:
        # Bind `output` per iteration so a deferred job cannot end up calling
        # whichever output the loop variable pointed at last.
        if is_repost:
            self.submitter(lambda output=output: output.delete_repost(status_id))
        else:
            self.submitter(lambda output=output: output.delete_post(status_id))

    self._delete_post_by_id(post["id"])
def _accept_msg(self, msg: websockets.Data) -> None:
data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))
-
event: str = cast(str, data['event'])
-
payload: str = cast(str, data['payload'])
+
event: str = cast(str, data["event"])
+
payload: str = cast(str, data["payload"])
if event == "update":
self._on_create_post(json.loads(payload))
+123
mastodon/parser.py
···
+
from html.parser import HTMLParser
+
from typing import override
+
import cross.fragments as f
+
+
+
class StatusParser(HTMLParser):
+
def __init__(self) -> None:
+
super().__init__()
+
self.text: str = ""
+
self.fragments: list[f.Fragment] = []
+
+
self._tag_stack: dict[str, tuple[int, dict[str, str | None]]] = {}
+
self.in_pre: bool = False
+
self.in_code: bool = False
+
+
self.invisible: bool = False
+
+
@override
+
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
+
_attr = dict(attrs)
+
+
def append_newline():
+
if self.text and not self.text.endswith("\n"):
+
self.text += "\n"
+
+
if self.invisible:
+
return
+
+
match tag:
+
case "p":
+
cls = _attr.get('class', '')
+
if cls and 'quote-inline' in cls:
+
self.invisible = True
+
case "a":
+
self._tag_stack["a"] = (len(self.text), _attr)
+
case "code":
+
if not self.in_pre:
+
self.text += "`"
+
self.in_code = True
+
case "pre":
+
append_newline()
+
self.text += "```\n"
+
self.in_pre = True
+
case "blockquote":
+
append_newline()
+
self.text += "> "
+
case "strong" | "b":
+
self.text += "**"
+
case "em" | "i":
+
self.text += "*"
+
case "del" | "s":
+
self.text += "~~"
+
case "br":
+
self.text += "\n"
+
case _:
+
if tag in {"h1", "h2", "h3", "h4", "h5", "h6"}:
+
level = int(tag[1])
+
self.text += "\n" + "#" * level + " "
+
+
@override
+
def handle_endtag(self, tag: str) -> None:
+
if self.invisible:
+
if tag == "p":
+
self.invisible = False
+
return
+
+
current_end = len(self.text)
+
match tag:
+
case "a":
+
if "a" in self._tag_stack:
+
start, _attr = self._tag_stack.pop("a")
+
+
href = _attr.get('href')
+
if href and current_end > start:
+
cls = _attr.get('class', '')
+
if cls:
+
if 'hashtag' in cls:
+
tag = self.text[start:current_end]
+
tag = tag[1:] if tag.startswith('#') else tag
+
+
self.fragments.append(
+
f.TagFragment(start=start, end=current_end, tag=tag)
+
)
+
return
+
if 'mention' in cls: # TODO put the full acct in the fragment
+
mention = self.text[start:current_end]
+
self.fragments.append(
+
f.MentionFragment(start=start, end=current_end, uri=mention)
+
)
+
return
+
self.fragments.append(
+
f.LinkFragment(start=start, end=current_end, url=href)
+
)
+
case "code":
+
if not self.in_pre and self.in_code:
+
self.text += "`"
+
self.in_code = False
+
case "pre":
+
self.text += "\n```\n"
+
self.in_pre = False
+
case "blockquote":
+
self.text += "\n"
+
case "strong" | "b":
+
self.text += "**"
+
case "em" | "i":
+
self.text += "*"
+
case "del" | "s":
+
self.text += "~~"
+
case "p":
+
self.text += "\n\n"
+
case _:
+
if tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
+
self.text += '\n'
+
+
@override
+
def handle_data(self, data: str) -> None:
+
if not self.invisible:
+
self.text += data
+
+
def get_result(self) -> tuple[str, list[f.Fragment]]:
+
if self.text.endswith('\n\n'):
+
return self.text[:-2], self.fragments
+
return self.text, self.fragments