social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

More work: convert the parsers to use UTF-8 byte offsets.

zenfyr.dev 749c26db 7e647c4b

verified
Changed files
+224 -104
bluesky
util
+87
bluesky/facets.py
···
···
+
from typing import Any, override
+
import cross.fragments as f
+
from util.splitter import FragmentSplitter, canonical_label
+
+
# Bluesky rich-text facet feature `$type` identifiers, as defined by the
# app.bsky.richtext.facet lexicon. Used to pick features out of a facet's
# `features` list in parse_facets().
LINK = 'app.bsky.richtext.facet#link'
TAG = 'app.bsky.richtext.facet#tag'
MENTION = "app.bsky.richtext.facet#mention"
+
+
class BskySplitter(FragmentSplitter):
    """Fragment splitter tuned to Bluesky's limits: 300 characters per
    post, link labels shortened to 30 characters."""

    def __init__(self):
        super().__init__(300, 30)

    @override
    def normalize_link(self, label: str, url: str) -> str:
        """Shorten a link label that is just a rendering of its URL.

        When `canonical_label` reports the label as canonical for `url`,
        replace it with the scheme-less URL, truncated to `self.urllen`
        with a trailing ellipsis; otherwise keep the author's label.
        """
        if canonical_label(label, url):
            # Not every link URL contains "://" — the markdown parser
            # also emits "mailto:" URLs — so guard the split instead of
            # indexing [1] unconditionally (which raised IndexError).
            parts = url.split("://", 1)
            nlabel = parts[1] if len(parts) > 1 else url
            if len(nlabel) <= self.urllen:
                return nlabel
            return nlabel[: self.urllen - 1] + "…"
        return label
+
+
# TODO handle extending overlapping fragments somehow
def parse_facets(
    text: str,
    facets: list[dict[str, Any]] | None
) -> tuple[str, list[f.Fragment]]:
    """Convert a Bluesky post's rich-text facets into internal fragments.

    Facet indices are UTF-8 byte offsets into `text`; the returned
    fragments carry UTF-8 byte offsets into the returned text, which can
    differ from the input when a truncated link label is replaced by the
    full URL.

    Returns the (possibly rewritten) text and the fragment list.
    """
    if not facets:
        return text, []

    btext = text.encode("utf-8")
    nbytes = bytearray()
    last_original_byte_index = 0
    fragments: list[f.Fragment] = []

    # The record format does not guarantee facet ordering; the rebuild
    # below assumes ascending byte offsets, so sort first.
    for facet in sorted(facets, key=lambda fc: fc['index']['byteStart']):
        original_start: int = facet['index']['byteStart']
        original_end: int = facet['index']['byteEnd']

        # Skip facets overlapping an already-consumed span; emitting
        # them would duplicate or garble text (see TODO above).
        if original_start < last_original_byte_index:
            continue

        if last_original_byte_index < original_start:
            nbytes.extend(btext[last_original_byte_index:original_start])

        fdict = {feat['$type']: feat for feat in facet.get('features', [])}

        original_label_bytes = btext[original_start:original_end]
        original_label_str = original_label_bytes.decode("utf-8")

        nlabel_bytes = original_label_bytes

        if LINK in fdict:
            url: str = fdict.pop(LINK)['uri']
            label = original_label_str

            # Detect labels that are just the (possibly "..."-truncated)
            # URL with the scheme stripped, e.g. "example.com/pa..." for
            # "https://example.com/path".
            split = url.split("://", 1)
            full_url = False
            if len(split) > 1:
                if split[1].startswith(label):
                    full_url = True
                if label.endswith("...") and split[1].startswith(label[:-3]):
                    full_url = True

            if full_url:
                # Restore the full URL as the label so downstream
                # renderers can re-truncate it to their own limits.
                nlabel_bytes = url.encode("utf-8")

            nstart = len(nbytes)
            nbytes.extend(nlabel_bytes)
            nend = len(nbytes)

            fragments.append(f.LinkFragment(start=nstart, end=nend, url=url))
        else:
            nstart = len(nbytes)
            nbytes.extend(nlabel_bytes)
            nend = len(nbytes)

        if TAG in fdict:
            tag: str = fdict.pop(TAG)['tag']
            fragments.append(f.TagFragment(start=nstart, end=nend, tag=tag))

        if MENTION in fdict:
            did: str = fdict.pop(MENTION)['did']
            fragments.append(f.MentionFragment(start=nstart, end=nend, uri=did))

        last_original_byte_index = original_end

    if last_original_byte_index < len(btext):
        nbytes.extend(btext[last_original_byte_index:])

    return nbytes.decode("utf-8"), fragments
+5 -2
bluesky/input.py
···
import websockets
from atproto.util import AtUri
from bluesky.info import SERVICE, BlueskyService, validate_and_transform
from cross.attachments import (
LabelsAttachment,
···
)
return
-
# TODO FRAGMENTS
-
post = Post(id=post_uri, parent_id=parent_uri, text=record["text"])
did, _, rid = AtUri.record_uri(post_uri)
post.attachments.put(
RemoteUrlAttachment(url=f"https://bsky.app/profile/{did}/post/{rid}")
···
import websockets
from atproto.util import AtUri
+
from bluesky.facets import parse_facets
from bluesky.info import SERVICE, BlueskyService, validate_and_transform
from cross.attachments import (
LabelsAttachment,
···
)
return
+
text, fragments = parse_facets(record["text"], record.get('facets'))
+
post = Post(id=post_uri, parent_id=parent_uri, text=text)
+
post.fragments.extend(fragments)
+
did, _, rid = AtUri.record_uri(post_uri)
post.attachments.put(
RemoteUrlAttachment(url=f"https://bsky.app/profile/{did}/post/{rid}")
+25 -27
util/html.py
···
from typing import override
import cross.fragments as f
-
class HTMLToFragmentsParser(HTMLParser):
def __init__(self) -> None:
super().__init__()
-
self.text: str = ""
self.fragments: list[f.Fragment] = []
self._tag_stack: dict[str, tuple[int, dict[str, str | None]]] = {}
self.in_pre: bool = False
self.in_code: bool = False
-
self.invisible: bool = False
def handle_a_endtag(self):
-
current_end = len(self.text)
start, _attr = self._tag_stack.pop("a")
href = _attr.get('href')
···
_attr = dict(attrs)
def append_newline():
-
if self.text and not self.text.endswith("\n"):
-
self.text += "\n"
if self.invisible:
return
···
if cls and 'quote-inline' in cls:
self.invisible = True
case "a":
-
self._tag_stack["a"] = (len(self.text), _attr)
case "code":
if not self.in_pre:
-
self.text += "`"
self.in_code = True
case "pre":
append_newline()
-
self.text += "```\n"
self.in_pre = True
case "blockquote":
append_newline()
-
self.text += "> "
case "strong" | "b":
-
self.text += "**"
case "em" | "i":
-
self.text += "*"
case "del" | "s":
-
self.text += "~~"
case "br":
-
self.text += "\n"
case _:
if tag in {"h1", "h2", "h3", "h4", "h5", "h6"}:
level = int(tag[1])
-
self.text += "\n" + "#" * level + " "
@override
def handle_endtag(self, tag: str) -> None:
···
self.handle_a_endtag()
case "code":
if not self.in_pre and self.in_code:
-
self.text += "`"
self.in_code = False
case "pre":
-
self.text += "\n```\n"
self.in_pre = False
case "blockquote":
-
self.text += "\n"
case "strong" | "b":
-
self.text += "**"
case "em" | "i":
-
self.text += "*"
case "del" | "s":
-
self.text += "~~"
case "p":
-
self.text += "\n\n"
case _:
if tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
-
self.text += '\n'
@override
def handle_data(self, data: str) -> None:
if not self.invisible:
-
self.text += data
def get_result(self) -> tuple[str, list[f.Fragment]]:
-
if self.text.endswith('\n\n'):
-
return self.text[:-2], self.fragments
-
return self.text, self.fragments
···
from typing import override
import cross.fragments as f
class HTMLToFragmentsParser(HTMLParser):
def __init__(self) -> None:
super().__init__()
+
self.builder: bytearray = bytearray()
self.fragments: list[f.Fragment] = []
self._tag_stack: dict[str, tuple[int, dict[str, str | None]]] = {}
self.in_pre: bool = False
self.in_code: bool = False
self.invisible: bool = False
def handle_a_endtag(self):
+
current_end = len(self.builder)
start, _attr = self._tag_stack.pop("a")
href = _attr.get('href')
···
_attr = dict(attrs)
def append_newline():
+
if self.builder and not self.builder.endswith(b"\n"):
+
self.builder.extend(b"\n")
if self.invisible:
return
···
if cls and 'quote-inline' in cls:
self.invisible = True
case "a":
+
self._tag_stack["a"] = (len(self.builder), _attr)
case "code":
if not self.in_pre:
+
self.builder.extend(b"`")
self.in_code = True
case "pre":
append_newline()
+
self.builder.extend(b"```\n")
self.in_pre = True
case "blockquote":
append_newline()
+
self.builder.extend(b"> ")
case "strong" | "b":
+
self.builder.extend(b"**")
case "em" | "i":
+
self.builder.extend(b"*")
case "del" | "s":
+
self.builder.extend(b"~~")
case "br":
+
self.builder.extend(b"\n")
case _:
if tag in {"h1", "h2", "h3", "h4", "h5", "h6"}:
level = int(tag[1])
+
self.builder.extend(("\n" + "#" * level + " ").encode('utf-8'))
@override
def handle_endtag(self, tag: str) -> None:
···
self.handle_a_endtag()
case "code":
if not self.in_pre and self.in_code:
+
self.builder.extend(b"`")
self.in_code = False
case "pre":
+
self.builder.extend(b"\n```\n")
self.in_pre = False
case "blockquote":
+
self.builder.extend(b"\n")
case "strong" | "b":
+
self.builder.extend(b"**")
case "em" | "i":
+
self.builder.extend(b"*")
case "del" | "s":
+
self.builder.extend(b"~~")
case "p":
+
self.builder.extend(b"\n\n")
case _:
if tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
+
self.builder.extend(b'\n')
@override
def handle_data(self, data: str) -> None:
    """Append text content to the UTF-8 buffer, unless we are inside an
    invisible span (e.g. a quote-inline element)."""
    if self.invisible:
        return
    self.builder.extend(data.encode('utf-8'))
def get_result(self) -> tuple[str, list[f.Fragment]]:
    """Finish parsing: decode the UTF-8 buffer, drop a single trailing
    paragraph break, and return the text with the collected fragments."""
    text = self.builder.decode('utf-8')
    if text.endswith('\n\n'):
        text = text[:-2]
    return text, self.fragments
+71 -43
util/markdown.py
···
import cross.fragments as f
from util.html import HTMLToFragmentsParser
-
URL = re.compile(r"(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+", re.IGNORECASE)
MD_INLINE_LINK = re.compile(
-
r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)",
re.IGNORECASE,
)
MD_AUTOLINK = re.compile(
-
r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE
)
-
HASHTAG = re.compile(r"(?<!\w)\#([\w]+)")
-
FEDIVERSE_HANDLE = re.compile(r"(?<![\w@])@([\w\.-]+)(?:@([\w\.-]+\.[\w\.-]+))?")
REGEXES = [URL, MD_INLINE_LINK, MD_AUTOLINK, HASHTAG, FEDIVERSE_HANDLE]
···
html_parser.feed(text)
markdown, fragments = html_parser.get_result()
index: int = 0
-
total: int = len(markdown)
-
# no match == processed fragments
-
events: list[tuple[int, int, re.Match[str] | f.Fragment, str]] = []
events.extend([(fg.start, fg.end, fg, "html") for fg in fragments])
while index < total:
-
ch = markdown[index]
-
rmatch = None
kind = None
-
if ch == "[":
-
rmatch = MD_INLINE_LINK.match(markdown, index)
kind = "inline_link"
-
# elif ch == '<':
-
# rmatch = MD_AUTOLINK.match(markdown, index)
-
# kind = "autolink"
-
elif ch == "#":
-
rmatch = HASHTAG.match(markdown, index)
kind = "hashtag"
-
elif ch == "@":
-
rmatch = FEDIVERSE_HANDLE.match(markdown, index)
kind = "mention"
else:
-
rmatch = URL.match(markdown, index)
kind = "url"
if rmatch:
···
events.sort(key=lambda x: x[0])
-
# validate fragment positions
last_end: int = 0
for start, end, _, _ in events:
if start > end:
···
)
last_end = end
-
ntext: list[str] = []
nfragments: list[f.Fragment] = []
offset: int = 0
last_index: int = 0
-
events.sort(key=lambda x: x[0])
for start, end, rmatch, event in events:
-
ntext.append(markdown[last_index:start])
if isinstance(rmatch, f.Fragment):
-
ntext.append(markdown[start:end])
nfg = replace(rmatch, start=start + offset, end=end + offset)
nfragments.append(nfg)
last_index = end
continue
nstart = start + offset
-
nend = end + offset
match event:
case "inline_link":
-
label = rmatch.group(1)
-
href = rmatch.group(2)
-
ntext.append(label)
-
delta = len(label) - (end - start)
offset += delta
-
nfragments.append(f.LinkFragment(start=nstart, end=nstart + len(label), url=href))
case "hashtag":
-
tag = rmatch.group(1)
-
ntext.append(markdown[start:end])
-
nfragments.append(f.TagFragment(start=nstart, end=nend, tag=tag))
case "mention":
-
mention = rmatch.group(0)
-
ntext.append(markdown[start:end])
-
mention = mention[1:] if mention.startswith("@") else mention
-
nfragments.append(f.MentionFragment(start=nstart, end=nend, uri=mention))
case "url":
-
url = rmatch.group(0)
-
ntext.append(markdown[start:end])
-
nfragments.append(f.LinkFragment(start=nstart, end=nend, url=url))
case _:
pass
last_index = end
-
ntext.append(markdown[last_index:])
-
return ''.join(ntext), nfragments
···
import cross.fragments as f
from util.html import HTMLToFragmentsParser
+
# All patterns operate on UTF-8 *bytes* so that match positions are byte
# offsets, matching the offset convention used by the fragment types.
# NOTE(review): in bytes patterns, `\w` is ASCII-only — non-ASCII hashtags
# and handles that the previous str-based patterns matched will no longer
# match. Confirm this is intended.
#
# Scheme-prefixed or mailto: URL, up to the next whitespace.
URL = re.compile(rb"(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+", re.IGNORECASE)
# Markdown inline link: [label](url), url restricted to scheme/mailto form.
MD_INLINE_LINK = re.compile(
    rb"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)",
    re.IGNORECASE,
)
# Markdown autolink: <url>.
MD_AUTOLINK = re.compile(
    rb"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE
)
# "#tag" not preceded by a word character.
HASHTAG = re.compile(rb"(?<!\w)\#([\w]+)")
# "@user" or "@user@host.domain" fediverse-style handle.
FEDIVERSE_HANDLE = re.compile(rb"(?<![\w@])@([\w\.-]+)(?:@([\w\.-]+\.[\w\.-]+))?")
REGEXES = [URL, MD_INLINE_LINK, MD_AUTOLINK, HASHTAG, FEDIVERSE_HANDLE]
···
html_parser.feed(text)
markdown, fragments = html_parser.get_result()
+
markdown_bytes: bytes = markdown.encode("utf-8")
+
index: int = 0
+
total: int = len(markdown_bytes)
+
events: list[tuple[int, int, re.Match[bytes] | f.Fragment, str]] = []
events.extend([(fg.start, fg.end, fg, "html") for fg in fragments])
+
while index < total:
+
ch: int = markdown_bytes[index]
+
rmatch: re.Match[bytes] | None = None
kind = None
+
if ch == b"["[0]:
+
rmatch = MD_INLINE_LINK.match(markdown_bytes, index)
kind = "inline_link"
+
# elif ch == b"<"[0]:
+
# rmatch = MD_AUTOLINK.match(markdown_bytes, index)
+
# kind = "autolink"
+
elif ch == b"#"[0]:
+
rmatch = HASHTAG.match(markdown_bytes, index)
kind = "hashtag"
+
elif ch == b"@"[0]:
+
rmatch = FEDIVERSE_HANDLE.match(markdown_bytes, index)
kind = "mention"
else:
+
rmatch = URL.match(markdown_bytes, index)
kind = "url"
if rmatch:
···
events.sort(key=lambda x: x[0])
last_end: int = 0
for start, end, _, _ in events:
if start > end:
···
)
last_end = end
+
ntext: bytearray = bytearray()
nfragments: list[f.Fragment] = []
offset: int = 0
last_index: int = 0
for start, end, rmatch, event in events:
+
ntext.extend(markdown_bytes[last_index:start])
if isinstance(rmatch, f.Fragment):
+
ntext.extend(markdown_bytes[start:end])
nfg = replace(rmatch, start=start + offset, end=end + offset)
nfragments.append(nfg)
last_index = end
continue
nstart = start + offset
match event:
case "inline_link":
+
label_bytes: bytes = rmatch.group(1)
+
href_bytes: bytes = rmatch.group(2)
+
ntext.extend(label_bytes)
+
+
delta = len(label_bytes) - (end - start)
offset += delta
+
nend = nstart + len(label_bytes)
+
nfragments.append(
+
f.LinkFragment(
+
start=nstart, end=nend, url=href_bytes.decode("utf-8")
+
)
+
)
+
case "hashtag":
+
tag_bytes: bytes = rmatch.group(1)
+
ntext.extend(markdown_bytes[start:end])
+
nend = end + offset
+
nfragments.append(
+
f.TagFragment(
+
start=nstart, end=nend, tag=tag_bytes.decode("utf-8")
+
)
+
)
+
case "mention":
+
mention_bytes: bytes = rmatch.group(0)
+
ntext.extend(markdown_bytes[start:end])
+
+
mention_str = mention_bytes.decode("utf-8")
+
mention_str = (
+
mention_str[1:] if mention_str.startswith("@") else mention_str
+
)
+
+
nend = end + offset
+
nfragments.append(
+
f.MentionFragment(start=nstart, end=nend, uri=mention_str)
+
)
+
case "url":
+
url_bytes: bytes = rmatch.group(0)
+
ntext.extend(markdown_bytes[start:end])
+
nend = end + offset
+
nfragments.append(
+
f.LinkFragment(
+
start=nstart, end=nend, url=url_bytes.decode("utf-8")
+
)
+
)
+
case _:
pass
last_index = end
+
+
ntext.extend(markdown_bytes[last_index:])
+
return ntext.decode("utf-8"), nfragments
+36 -32
util/splitter.py
···
self.urllen: int = urllen
def normalize_link(self, label: str, url: str) -> str:
-
#if canonical_label(label, url):
-
# if self.urltrunc == "dotted":
-
# nlabel = url.split("://", 1)[1]
-
# if len(nlabel) <= self.urllen:
-
# return nlabel
-
# return nlabel[: self.urllen - 1] + "…"
return label
def url_normalize(
-
self, text: str, fragments: list[Fragment]
-
) -> tuple[str, list[Fragment]]:
-
if self.urllen == -1:
-
return text, fragments
-
ntext: list[str] = []
-
nfragments: list[Fragment] = []
-
offset: int = 0
-
last_index: int = 0
-
fragments = [fg for fg in fragments]
-
fragments.sort(key=lambda x: x.start)
-
for fg in fragments:
-
ntext.append(text[last_index:fg.start])
-
label = text[fg.start:fg.end]
-
nlabel = label
-
if isinstance(fg, LinkFragment):
-
nlabel = self.normalize_link(nlabel, fg.url)
-
ntext.append(nlabel)
-
nfg = replace(fg, start=fg.start + offset)
-
change = len(nlabel) - len(label)
-
offset += change
-
nfg = replace(nfg, end=fg.end + offset)
-
nfragments.append(nfg)
-
last_index = fg.end
-
ntext.append(text[last_index:])
-
return ''.join(ntext), nfragments
def split(
self, text: str, fragments: list[Fragment]
) -> list[tuple[str, list[Fragment]]]:
text, fragments = self.url_normalize(text, fragments)
-
if grapheme.length(text) <= self.climit:
return [(text, fragments)]
···
self.urllen: int = urllen
def normalize_link(self, label: str, url: str) -> str:
    # Default: keep link labels unchanged. Service-specific subclasses
    # (e.g. BskySplitter) override this to shorten canonical URL labels.
    return label
+
+
def tally_lenght(self, post: tuple[str, list[Fragment]]):
    # Grapheme-cluster length of the post's text, used by split() to
    # compare against the character limit.
    # NOTE(review): "lenght" is a typo of "length"; kept because split()
    # calls it by this name — consider a follow-up rename of both.
    return grapheme.length(post[0])
def url_normalize(
    self, text: str, fragments: list[Fragment]
) -> tuple[str, list[Fragment]]:
    """Rewrite link labels through normalize_link(), rebuilding every
    fragment's offsets as UTF-8 byte positions in the rewritten text."""
    if self.urllen == -1:
        # URL shortening disabled for this service; nothing to rewrite.
        return text, fragments

    source = text.encode('utf-8')
    out = bytearray()
    rebuilt: list[Fragment] = []
    cursor = 0

    for fragment in sorted(fragments, key=lambda frag: frag.start):
        # Copy plain text between the previous fragment and this one
        # (an empty slice when the fragments are adjacent).
        out.extend(source[cursor:fragment.start])

        label = source[fragment.start:fragment.end].decode('utf-8')
        if isinstance(fragment, LinkFragment):
            label = self.normalize_link(label, fragment.url)

        begin = len(out)
        out.extend(label.encode('utf-8'))
        rebuilt.append(replace(fragment, start=begin, end=len(out)))

        cursor = fragment.end

    # Trailing text after the last fragment.
    out.extend(source[cursor:])
    return out.decode('utf-8'), rebuilt
def split(
self, text: str, fragments: list[Fragment]
) -> list[tuple[str, list[Fragment]]]:
text, fragments = self.url_normalize(text, fragments)
+
if self.tally_lenght((text, fragments)) <= self.climit:
return [(text, fragments)]