social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

parse raw text if available

zenfyr.dev f361003e 8bd14026

+82
cross.py
···
import re
ALTERNATE = re.compile(r'\S+|\s+')
+URL = re.compile(r'(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+', re.IGNORECASE)
+MD_INLINE_LINK = re.compile(r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)", re.IGNORECASE)
+MD_AUTOLINK = re.compile(r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE)
+HASHTAG = re.compile(r'(?<!\w)\#([\w]+)')
+FEDIVERSE_HANDLE = re.compile(r'(?<![\w@])@([\w-]+)(?:@([\w\.-]+\.[\w\.-]+))?')
# generic token
class Token():
···
    def delete_quote(self, quote_id: str):
        LOGGER.warning('Not Implemented.. "removed quote" %s', quote_id)
+
+def tokenize_markdown(text: str, tags: list[str], handles: list[tuple[str, str]]) -> list[Token]:
+    if not text:
+        return []
+
+    index: int = 0
+    total: int = len(text)
+    buffer: list[str] = []
+
+    tokens: list[Token] = []
+
+    def flush():
+        nonlocal buffer
+        if buffer:
+            tokens.append(TextToken(''.join(buffer)))
+            buffer = []
+
+    while index < total:
+        if text[index] == '[':
+            md_inline = MD_INLINE_LINK.match(text, index)
+            if md_inline:
+                flush()
+                label = md_inline.group(1)
+                href = md_inline.group(2)
+                tokens.append(LinkToken(href, label))
+                index = md_inline.end()
+                continue
+
+        if text[index] == '<':
+            md_auto = MD_AUTOLINK.match(text, index)
+            if md_auto:
+                flush()
+                href = md_auto.group(1)
+                tokens.append(LinkToken(href, href))
+                index = md_auto.end()
+                continue
+
+        if text[index] == '#':
+            tag = HASHTAG.match(text, index)
+            if tag:
+                tag_text = tag.group(1)
+                if tag_text.lower() in tags:
+                    flush()
+                    tokens.append(TagToken(tag_text))
+                    index = tag.end()
+                    continue
+
+        if text[index] == '@':
+            handle = FEDIVERSE_HANDLE.match(text, index)
+            if handle:
+                handle_text = handle.group(0)
+                stripped_handle = handle_text.strip()
+
+                match = next(
+                    (pair for pair in handles if stripped_handle in pair),
+                    None
+                )
+
+                if match:
+                    flush()
+                    tokens.append(MentionToken(match[1], ''))  # TODO: misskey doesn't provide a uri
+                    index = handle.end()
+                    continue
+
+        url = URL.match(text, index)
+        if url:
+            flush()
+            href = url.group(0)
+            tokens.append(LinkToken(href, href))
+            index = url.end()
+            continue
+
+        buffer.append(text[index])
+        index += 1
+
+    flush()
+    return tokens

def split_tokens(tokens: list[Token], max_chars: int, max_link_len: int = 35) -> list[list[Token]]:
    def start_new_block():
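
A quick sketch of how the shared tokenizer behaves, with an invented sample string, tag list, and handle pair (token constructors as used above):

text = 'hi @alice@example.social, read [docs](https://example.com/docs) #news'
tokens = tokenize_markdown(text, ['news'], [('@alice', '@alice@example.social')])
# -> TextToken('hi ')
#    MentionToken('@alice@example.social', '')  # matched against either tuple slot
#    TextToken(', read ')
#    LinkToken('https://example.com/docs', 'docs')
#    TextToken(' ')
#    TagToken('news')

Note that tags are compared lowercased (tag_text.lower() in tags), so the tag list is expected to already be lowercase.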
+30 -1
mastodon.py
···
        recurse(child)
    return tokens
+
+MARKDOWNY = ['text/x.misskeymarkdown', 'text/markdown', 'text/plain']
class MastodonPost(cross.Post):
    def __init__(self, status: dict, media_attachments: list[media_util.MediaInfo]) -> None:
        super().__init__()
        self.status = status
        self.media_attachments = media_attachments
-        self.tokens = tokenize_post(status)
+        self.tokens = self.__to_tokens()
+
+
+    def __to_tokens(self):
+        content_type = self.status.get('content_type', 'text/plain')
+        raw_text = self.status.get('text')
+
+        tags: list[str] = []
+        for tag in self.status.get('tags', []):
+            tags.append(tag['name'])
+
+        mentions: list[tuple[str, str]] = []
+        for mention in self.status.get('mentions', []):
+            mentions.append(('@' + mention['username'], '@' + mention['acct']))
+
+        if raw_text and content_type in MARKDOWNY:
+            return cross.tokenize_markdown(raw_text, tags, mentions)
+
+        pleroma_ext: dict | None = self.status.get('pleroma', {}).get('content')
+        if pleroma_ext:
+            for ctype in MARKDOWNY:
+                if ctype not in pleroma_ext:
+                    continue
+
+                return cross.tokenize_markdown(pleroma_ext[ctype], tags, mentions)
+
+        return tokenize_post(self.status)
+
    def get_tokens(self) -> list[cross.Token]:
        return self.tokens
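
For reference, the two status shapes __to_tokens prefers, sketched with invented values; content_type and pleroma.content are Pleroma/Akkoma extensions, while tags/mentions follow the Mastodon entity layout the loops above expect:

status = {
    'text': 'hello #fedi',            # raw source, when the server exposes it
    'content_type': 'text/markdown',  # Pleroma/Akkoma extension field
    'tags': [{'name': 'fedi'}],
    'mentions': [],
}
# -> cross.tokenize_markdown('hello #fedi', ['fedi'], [])

status = {
    'pleroma': {'content': {'text/plain': 'hello #fedi'}},  # mimetype -> rendering
    'tags': [{'name': 'fedi'}],
    'mentions': [],
}
# -> first MARKDOWNY mimetype present wins

Anything else still goes through the old HTML path, tokenize_post(status).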
+10 -85
misskey.py
···
import cross, media_util, util, database
from util import LOGGER
import requests, websockets
-import re
from typing import Callable, Any
import asyncio
import json, uuid
-
-URL = re.compile(r'(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+', re.IGNORECASE)
-MD_INLINE_LINK = re.compile(r"\[([^\]]+)\]\(([^\)]+)\)")
-MD_AUTOLINK = re.compile(r"<((?:https?://[^\s>]+|mailto:[^\s>]+))>")
-HASHTAG = re.compile(r'(?<!\w)\#([\w]+)')
-FEDIVERSE_HANDLE = re.compile(r'(?<![\w@])@([\w-]+)(?:@([\w\.-]+\.[\w\.-]+))?')
-
-def tokenize_note(note: dict) -> list[cross.Token]:
-    text: str = note.get('text', '')
-    if not text:
-        return []
-    mention_handles: dict = note.get('mentionHandles') or {}
-    tags: list[str] = note.get('tags') or []
-
-    handles: list[str] = []
-    for key, value in mention_handles.items():
-        handles.append(value)
-
-    index: int = 0
-    total: int = len(text)
-    buffer: list[str] = []
-
-    tokens: list[cross.Token] = []
-
-    def flush():
-        nonlocal buffer
-        if buffer:
-            tokens.append(cross.TextToken(''.join(buffer)))
-            buffer = []
-
-    while index < total:
-        if text[index] == '[':
-            md_inline = MD_INLINE_LINK.match(text, index)
-            if md_inline:
-                flush()
-                label = md_inline.group(1)
-                href = md_inline.group(2)
-                tokens.append(cross.LinkToken(href, label))
-                index = md_inline.end()
-                continue
-
-        if text[index] == '<':
-            md_auto = MD_AUTOLINK.match(text, index)
-            if md_auto:
-                flush()
-                href = md_auto.group(1)
-                tokens.append(cross.LinkToken(href, href))
-                index = md_auto.end()
-                continue
-
-        if text[index] == '#':
-            tag = HASHTAG.match(text, index)
-            if tag:
-                tag_text = tag.group(1)
-                if tag_text.lower() in tags:
-                    flush()
-                    tokens.append(cross.TagToken(tag_text))
-                    index = tag.end()
-                    continue
-
-        if text[index] == '@':
-            handle = FEDIVERSE_HANDLE.match(text, index)
-            if handle:
-                handle_text = handle.group(0)
-                if handle_text.strip() in handles:
-                    flush()
-                    tokens.append(cross.MentionToken(handle_text, ''))  # TODO misskey doesn't provide a uri
-                    index = handle.end()
-                    continue
-
-        url = URL.match(text, index)
-        if url:
-            flush()
-            href = url.group(0)
-            tokens.append(cross.LinkToken(href, href))
-            index = url.end()
-            continue
-
-        buffer.append(text[index])
-        index += 1
-
-    flush()
-    return tokens
+
class MisskeyPost(cross.Post):
    def __init__(self, note: dict, files: list[media_util.MediaInfo]) -> None:
···
        self.note = note
        self.sensitive = any([a.get('isSensitive', False) for a in note.get('files', [])])
        self.media_attachments = files
-        self.tokens = tokenize_note(self.note)
+
+        mention_handles: dict = note.get('mentionHandles') or {}
+        tags: list[str] = note.get('tags') or []
+
+        handles: list[tuple[str, str]] = []
+        for key, value in mention_handles.items():
+            handles.append((value, value))
+
+        self.tokens = cross.tokenize_markdown(note.get('text', ''), tags, handles)

    def get_tokens(self) -> list[cross.Token]:
        return self.tokens
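
And the Misskey side, sketched with an invented note payload (the elided parts of __init__ may read more fields); mentionHandles, as read here, maps a user id to a full handle, and both tuple slots get the same string, so the emitted MentionToken text is always the full handle:

note = {
    'text': 'cc @bob@remote.tld',
    'tags': [],
    'mentionHandles': {'9abc123': '@bob@remote.tld'},  # id -> handle
}
# __init__ builds handles == [('@bob@remote.tld', '@bob@remote.tld')], then
# cross.tokenize_markdown(...) yields [TextToken('cc '), MentionToken('@bob@remote.tld', '')]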
-2
util.py
···
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
LOGGER = logging.getLogger("XPost")
-import json
-
def as_json(obj, indent=None,sort_keys=False) -> str:
    return json.dumps(
        obj.__dict__ if not isinstance(obj, dict) else obj,