social media crossposting tool. 3rd time's the charm
topics: mastodon, misskey, crossposting, bluesky

switch from fragments to tokens

zenfyr.dev · 2a4fcb3c 98395f82 · verified
-87
bluesky/facets.py
···
-from typing import Any, override
-import cross.fragments as f
-from util.splitter import FragmentSplitter, canonical_label
-
-LINK = 'app.bsky.richtext.facet#link'
-TAG = 'app.bsky.richtext.facet#tag'
-MENTION = "app.bsky.richtext.facet#mention"
-
-class BskySplitter(FragmentSplitter):
-    def __init__(self):
-        super().__init__(300, 30)
-
-    @override
-    def normalize_link(self, label: str, url: str) -> str:
-        if canonical_label(label, url):
-            nlabel = url.split("://", 1)[1]
-            if len(nlabel) <= self.urllen:
-                return nlabel
-            return nlabel[: self.urllen - 1] + "…"
-        return label
-
-# TODO handle extending overlapping fragments somehow
-def parse_facets(
-    text: str,
-    facets: list[dict[str, Any]] | None
-) -> tuple[str, list[f.Fragment]]:
-    if not facets:
-        return text, []
-
-    btext = text.encode("utf-8")
-    nbytes = bytearray()
-    last_original_byte_index = 0
-    fragments: list[f.Fragment] = []
-
-    for facet in facets:
-        original_start: int = facet['index']['byteStart']
-        original_end: int = facet['index']['byteEnd']
-
-        if last_original_byte_index < original_start:
-            nbytes.extend(btext[last_original_byte_index:original_start])
-
-        fdict = {feat['$type']: feat for feat in facet.get('features', [])}
-
-        original_label_bytes = btext[original_start:original_end]
-        original_label_str = original_label_bytes.decode("utf-8")
-
-        nlabel_bytes = original_label_bytes
-
-        if LINK in fdict:
-            url: str = fdict.pop(LINK)['uri']
-            label = original_label_str
-
-            split = url.split("://", 1)
-            full_url = False
-            if len(split) > 1:
-                if split[1].startswith(label):
-                    full_url = True
-                if label.endswith("...") and split[1].startswith(label[:-3]):
-                    full_url = True
-
-            if full_url:
-                nlabel_bytes = url.encode("utf-8")
-
-            nstart = len(nbytes)
-            nbytes.extend(nlabel_bytes)
-            nend = len(nbytes)
-
-            fragments.append(f.LinkFragment(start=nstart, end=nend, url=url))
-        else:
-            nstart = len(nbytes)
-            nbytes.extend(nlabel_bytes)
-            nend = len(nbytes)
-
-        if TAG in fdict:
-            tag: str = fdict.pop(TAG)['tag']
-            fragments.append(f.TagFragment(start=nstart, end=nend, tag=tag))
-
-        if MENTION in fdict:
-            did: str = fdict.pop(MENTION)['did']
-            fragments.append(f.MentionFragment(start=nstart, end=nend, uri=did))
-
-        last_original_byte_index = original_end
-
-    if last_original_byte_index < len(btext):
-        nbytes.extend(btext[last_original_byte_index:])
-
-    return nbytes.decode("utf-8"), fragments
+3 -4
bluesky/input.py
···
 import websockets
 from atproto.util import AtUri
-from bluesky.facets import parse_facets
+from bluesky.tokens import tokenize_post
 from bluesky.info import SERVICE, BlueskyService, validate_and_transform
 from cross.attachments import (
     LabelsAttachment,
···
             )
             return
-        text, fragments = parse_facets(record["text"], record.get('facets'))
-        post = Post(id=post_uri, parent_id=parent_uri, text=text)
-        post.fragments.extend(fragments)
+        tokens = tokenize_post(record["text"], record.get('facets', []))
+        post = Post(id=post_uri, parent_id=parent_uri, tokens=tokens)
         did, _, rid = AtUri.record_uri(post_uri)
         post.attachments.put(
+95
bluesky/tokens.py
···
+from cross.tokens import LinkToken, MentionToken, TagToken, TextToken, Token
+
+
+def tokenize_post(text: str, facets: list[dict]) -> list[Token]:
+    def decode(ut8: bytes) -> str:
+        return ut8.decode(encoding="utf-8")
+
+    if not text:
+        return []
+    ut8_text = text.encode(encoding="utf-8")
+    if not facets:
+        return [TextToken(text=decode(ut8_text))]
+
+    slices: list[tuple[int, int, str, str]] = []
+
+    for facet in facets:
+        features: list[dict] = facet.get("features", [])
+        if not features:
+            continue
+
+        # we don't support overlapping facets/features
+        feature = features[0]
+        feature_type = feature["$type"]
+        index = facet["index"]
+        match feature_type:
+            case "app.bsky.richtext.facet#tag":
+                slices.append(
+                    (index["byteStart"], index["byteEnd"], "tag", feature["tag"])
+                )
+            case "app.bsky.richtext.facet#link":
+                slices.append(
+                    (index["byteStart"], index["byteEnd"], "link", feature["uri"])
+                )
+            case "app.bsky.richtext.facet#mention":
+                slices.append(
+                    (index["byteStart"], index["byteEnd"], "mention", feature["did"])
+                )
+
+    if not slices:
+        return [TextToken(text=decode(ut8_text))]
+
+    slices.sort(key=lambda s: s[0])
+    unique: list[tuple[int, int, str, str]] = []
+    current_end = 0
+    for start, end, ttype, val in slices:
+        if start >= current_end:
+            unique.append((start, end, ttype, val))
+            current_end = end
+
+    if not unique:
+        return [TextToken(text=decode(ut8_text))]
+
+    tokens: list[Token] = []
+    prev = 0
+
+    for start, end, ttype, val in unique:
+        if start > prev:
+            # text between facets
+            tokens.append(TextToken(text=decode(ut8_text[prev:start])))
+        # facet token
+        match ttype:
+            case "link":
+                label = decode(ut8_text[start:end])
+
+                # try to unflatten links
+                split = val.split("://", 1)
+                if len(split) > 1:
+                    if split[1].startswith(label):
+                        tokens.append(LinkToken(href=val))
+                        prev = end
+                        continue
+
+                    if label.endswith("...") and split[1].startswith(label[:-3]):
+                        tokens.append(LinkToken(href=val))
+                        prev = end
+                        continue
+
+                tokens.append(LinkToken(href=val, label=label))
+            case "tag":
+                tag = decode(ut8_text[start:end])
+                tokens.append(TagToken(tag=tag[1:] if tag.startswith("#") else tag))
+            case "mention":
+                mention = decode(ut8_text[start:end])
+                tokens.append(
+                    MentionToken(
+                        username=mention[1:] if mention.startswith("@") else mention,
+                        uri=val,
+                    )
+                )
+        prev = end
+
+    if prev < len(ut8_text):
+        tokens.append(TextToken(text=decode(ut8_text[prev:])))
+
+    return tokens
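
A quick sanity check of the new entry point. This is a minimal sketch: the facet dict follows the Bluesky record layout used above, and the byte offsets are hand-computed for this exact string.

```python
from bluesky.tokens import tokenize_post

text = "example.com says hi"
facets = [
    {
        # "example.com" occupies bytes 0..11 of the UTF-8 encoded text
        "index": {"byteStart": 0, "byteEnd": 11},
        "features": [
            {"$type": "app.bsky.richtext.facet#link", "uri": "https://example.com"}
        ],
    }
]

print(tokenize_post(text, facets))
# expected: [LinkToken(href='https://example.com', label=None), TextToken(text=' says hi')]
```

Because the label equals the URL minus its scheme, the link is "unflattened" back to a bare LinkToken with no label.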
-25
cross/fragments.py
···
-from dataclasses import dataclass
-
-
-@dataclass(kw_only=True)
-class Fragment:
-    start: int
-    end: int
-
-
-@dataclass(kw_only=True)
-class LinkFragment(Fragment):
-    url: str
-
-
-@dataclass(kw_only=True)
-class TagFragment(Fragment):
-    tag: str
-
-
-@dataclass(kw_only=True)
-class MentionFragment(Fragment):
-    uri: str
-
-
-NON_OVERLAPPING: set[type[Fragment]] = {LinkFragment, TagFragment, MentionFragment}
+2 -3
cross/post.py
···
 from typing import TypeVar
 from cross.attachments import Attachment
-from cross.fragments import Fragment
+from cross.tokens import Token
 T = TypeVar("T", bound=Attachment)
···
 class Post:
     id: str
     parent_id: str | None
-    text: str  # utf-8 text
+    tokens: list[Token]
     attachments: AttachmentKeeper = field(default_factory=AttachmentKeeper)
-    fragments: list[Fragment] = field(default_factory=list)
+23
cross/tokens.py
···
+from dataclasses import dataclass
+
+@dataclass(kw_only=True)
+class Token:
+    pass
+
+@dataclass(kw_only=True)
+class TextToken(Token):
+    text: str
+
+@dataclass(kw_only=True)
+class LinkToken(Token):
+    href: str
+    label: str | None = None
+
+@dataclass(kw_only=True)
+class TagToken(Token):
+    tag: str
+
+@dataclass(kw_only=True)
+class MentionToken(Token):
+    username: str
+    uri: str | None = None
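
These dataclasses are the whole cross-service contract: a post is now an ordered list of typed tokens, and each output service decides how to render them. A minimal sketch of a consumer follows; `render_plain` is a hypothetical helper for illustration, not part of this commit.

```python
from cross.tokens import LinkToken, MentionToken, TagToken, TextToken, Token


def render_plain(tokens: list[Token]) -> str:
    # hypothetical flattener, for illustration only
    parts: list[str] = []
    for tk in tokens:
        if isinstance(tk, TextToken):
            parts.append(tk.text)
        elif isinstance(tk, LinkToken):
            parts.append(tk.label or tk.href)
        elif isinstance(tk, TagToken):
            parts.append("#" + tk.tag)
        elif isinstance(tk, MentionToken):
            parts.append("@" + tk.username)
    return "".join(parts)


tokens: list[Token] = [
    TextToken(text="new build is live "),
    LinkToken(href="https://zenfyr.dev/"),
    TextToken(text=" "),
    TagToken(tag="release"),
]
print(render_plain(tokens))  # new build is live https://zenfyr.dev/ #release
```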
+3 -4
mastodon/input.py
···
                 "Skipping %s, parent %s not found in db", status["id"], in_reply
             )
             return
-        parser = StatusParser()
+        parser = StatusParser(status)
         parser.feed(status["content"])
-        text, fragments = parser.get_result()
+        tokens = parser.get_result()
-        post = Post(id=status["id"], parent_id=in_reply, text=text)
-        post.fragments.extend(fragments)
+        post = Post(id=status["id"], parent_id=in_reply, tokens=tokens)
         if quote:
             post.attachments.put(QuoteAttachment(quoted_id=quote['id'], quoted_user=self.user_id))
+14 -22
mastodon/parser.py
···
 from typing import Any, override
-import cross.fragments as f
-from util.html import HTMLToFragmentsParser
+
+from cross.tokens import LinkToken, MentionToken, TagToken
+from util.html import HTMLToTokensParser
-class StatusParser(HTMLToFragmentsParser):
+class StatusParser(HTMLToTokensParser):
     def __init__(self, status: dict[str, Any]) -> None:
         super().__init__()
         self.tags: set[str] = set(tag["url"] for tag in status.get("tags", []))
···
     @override
     def handle_a_endtag(self):
-        current_end = len(self.builder)
-        start, _attr = self._tag_stack.pop("a")
+        label, _attr = self._tag_stack.pop("a")
         href = _attr.get("href")
-        if href and current_end > start:
+        if href:
             cls = _attr.get("class", "")
             if cls:
                 if "hashtag" in cls and href in self.tags:
-                    tag = self.builder[start:current_end]
-                    tag = tag[1:] if tag.startswith(b"#") else tag
-                    self.fragments.append(
-                        f.TagFragment(
-                            start=start, end=current_end, tag=tag.decode("utf-8")
-                        )
-                    )
+                    tag = label[1:] if label.startswith("#") else label
+                    self.tokens.append(TagToken(tag=tag))
                     return
-                if "mention" in cls:
-                    if href in self.mentions:
-                        self.fragments.append(
-                            f.MentionFragment(start=start, end=current_end, uri=href)
-                        )
-                        return
-        self.fragments.append(
-            f.LinkFragment(start=start, end=current_end, url=href)
-        )
+                if "mention" in cls and href in self.mentions:
+                    username = label[1:] if label.startswith("@") else label
+
+                    self.tokens.append(MentionToken(username=username, uri=href))
+                    return
+        self.tokens.append(LinkToken(href=href, label=label))
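
Rough usage sketch for the reworked StatusParser. The payload is a trimmed, made-up Mastodon status, and it assumes the elided part of the constructor also collects mention URLs from status["mentions"].

```python
from mastodon.parser import StatusParser

status = {
    "content": '<p>hi <a href="https://mastodon.example/tags/cats" class="mention hashtag">#cats</a></p>',
    "tags": [{"name": "cats", "url": "https://mastodon.example/tags/cats"}],
    "mentions": [],
}

parser = StatusParser(status)
parser.feed(status["content"])
print(parser.get_result())
# roughly: [TextToken(text='hi '), TagToken(tag='cats')]
```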
+9 -3
misskey/input.py
···
             )
             return
+        mention_handles: dict = note.get("mentionHandles") or {}
+        tags: list[str] = note.get("tags") or []
+
+        handles: list[tuple[str, str]] = []
+        for value in mention_handles.values():
+            handles.append((value, value))
+
         parser = MarkdownParser()  # TODO MFM parser
-        text, fragments = parser.parse(note.get("text", ""))
-        post = Post(id=note["id"], parent_id=reply["id"] if reply else None, text=text)
-        post.fragments.extend(fragments)
+        tokens = parser.parse(note.get("text", ""), tags, handles)
+        post = Post(id=note["id"], parent_id=reply["id"] if reply else None, tokens=tokens)
         post.attachments.put(RemoteUrlAttachment(url=self.url + "/notes/" + note["id"]))
         if renote:
-32
tests/util/html_test.py
···
-import html
-from util.html import HTMLToFragmentsParser
-import cross.fragments as f
-import pytest
-
-
-@pytest.fixture()
-def parser():
-    return HTMLToFragmentsParser()
-
-
-def test_html(parser: HTMLToFragmentsParser):
-    input = '<p><del>excuse</del> <em>me</em>, <strong>test</strong> post</p><blockquote><p>very testy <a href="https://google.com" target="_blank" rel="nofollow noopener">post</a></p></blockquote><pre><code>cat &lt;&lt; food<br></code></pre>'
-    parser.feed(input)
-    text, frgs = parser.get_result()
-
-    excepted = "~~excuse~~ *me*, **test** post\n\n> very testy post\n\n\n```\ncat << food\n```"
-    assert text == excepted
-    assert len(frgs) == 1
-
-    assert isinstance(frgs[0], f.LinkFragment)
-    assert frgs[0].start == 46 and frgs[0].end == 50
-    assert frgs[0].url == "https://google.com"
-
-
-def test_keep_autolink(parser: HTMLToFragmentsParser):
-    input = "<https://google.com>"
-    parser.feed(input)
-    text, frgs = parser.get_result()
-
-    # TODO
-    # assert text == input
-160
tests/util/markdown_test.py
···
-from util.markdown import MarkdownParser
-import cross.fragments as f
-import pytest
-
-EMOJI = "🤬🤬"
-
-
-@pytest.fixture()
-def parser():
-    return MarkdownParser()
-
-
-def test_empty(parser: MarkdownParser):
-    text, frgs = parser.parse("")
-    assert text == ""
-    assert frgs == []
-
-
-def test_no_formatting(parser: MarkdownParser):
-    text, frgs = parser.parse("text no formatting!")
-    assert text == "text no formatting!"
-    assert frgs == []
-
-
-def test_link(parser: MarkdownParser):
-    text, frgs = parser.parse("https://google.com")
-    assert text == "https://google.com"
-    assert len(frgs) == 1
-
-    assert isinstance(frgs[0], f.LinkFragment)
-    assert frgs[0].start == 0 and frgs[0].end == 18
-    assert frgs[0].url == "https://google.com"
-
-
-def test_link_emojis(parser: MarkdownParser):
-    input = f"{EMOJI} https://google.com"
-    text, frgs = parser.parse(input)
-    assert text == input
-    assert len(frgs) == 1
-
-    assert isinstance(frgs[0], f.LinkFragment)
-    assert frgs[0].start == 9 and frgs[0].end == 27
-    assert frgs[0].url == "https://google.com"
-
-
-def test_label_link(parser: MarkdownParser):
-    text, frgs = parser.parse("[hello](https://google.com)")
-    assert text == "hello"
-    assert len(frgs) == 1
-
-    assert isinstance(frgs[0], f.LinkFragment)
-    assert frgs[0].start == 0 and frgs[0].end == 5
-    assert frgs[0].url == "https://google.com"
-
-
-def test_label_link_emojis(parser: MarkdownParser):
-    input = f"[{EMOJI}]( https://google.com)"
-    text, frgs = parser.parse(input)
-    assert text == EMOJI
-    assert len(frgs) == 1
-
-    assert isinstance(frgs[0], f.LinkFragment)
-    assert frgs[0].start == 0 and frgs[0].end == 8
-    assert frgs[0].url == "https://google.com"
-
-
-def test_tag(parser: MarkdownParser):
-    input = "#testing"
-    text, frgs = parser.parse(input)
-    assert text == input
-    assert len(frgs) == 1
-
-    assert isinstance(frgs[0], f.TagFragment)
-    assert frgs[0].start == 0 and frgs[0].end == 8
-    assert frgs[0].tag == "testing"
-
-
-def test_tag_emojis(parser: MarkdownParser):
-    input = f"{EMOJI} #testing"
-    text, frgs = parser.parse(input)
-    assert text == input
-    assert len(frgs) == 1
-
-    assert isinstance(frgs[0], f.TagFragment)
-    assert frgs[0].start == 9 and frgs[0].end == 17
-    assert frgs[0].tag == "testing"
-
-
-def test_mention(parser: MarkdownParser):
-    input = "@zen@merping.synth.download"
-    text, frgs = parser.parse(input)
-    assert text == input
-    assert len(frgs) == 1
-
-    assert isinstance(frgs[0], f.MentionFragment)
-    assert frgs[0].start == 0 and frgs[0].end == 27
-    assert frgs[0].uri == "zen@merping.synth.download"
-
-
-def test_mention_emojis(parser: MarkdownParser):
-    input = f"{EMOJI} @zen@merping.synth.download"
-    text, frgs = parser.parse(input)
-    assert text == input
-    assert len(frgs) == 1
-
-    assert isinstance(frgs[0], f.MentionFragment)
-    assert frgs[0].start == 9 and frgs[0].end == 36
-    assert frgs[0].uri == "zen@merping.synth.download"
-
-
-def test_mixed(parser: MarkdownParser):
-    input = "#testing_tag @zen@merping.synth.download [hello](https://zenfyr.dev/) hii! https://example.com"
-    text, frgs = parser.parse(input)
-
-    expected_text = (
-        "#testing_tag @zen@merping.synth.download hello hii! https://example.com"
-    )
-    assert text == expected_text
-    assert len(frgs) == 4
-
-    assert isinstance(frgs[0], f.TagFragment)
-    assert frgs[0].start == 0 and frgs[0].end == 12
-    assert frgs[0].tag == "testing_tag"
-
-    assert isinstance(frgs[1], f.MentionFragment)
-    assert frgs[1].start == 13 and frgs[1].end == 40
-    assert frgs[1].uri == "zen@merping.synth.download"
-
-    assert isinstance(frgs[2], f.LinkFragment)
-    assert frgs[2].start == 41 and frgs[2].end == 46
-    assert frgs[2].url == "https://zenfyr.dev/"
-
-    assert isinstance(frgs[3], f.LinkFragment)
-    assert frgs[3].start == 52 and frgs[3].end == 71
-    assert frgs[3].url == "https://example.com"
-
-
-def test_mixed_html(parser: MarkdownParser):
-    input = f'<p>#testing_tag @zen@merping.synth.download</p> {EMOJI} <a href="https://zenfyr.dev/"><b>hello</b></a> hii! https://example.com'
-    text, frgs = parser.parse(input)
-
-    expected_text = f"#testing_tag @zen@merping.synth.download\n\n {EMOJI} **hello** hii! https://example.com"
-    assert text == expected_text
-    assert len(frgs) == 4
-
-    assert isinstance(frgs[0], f.TagFragment)
-    assert frgs[0].start == 0 and frgs[0].end == 12
-    assert frgs[0].tag == "testing_tag"
-
-    assert isinstance(frgs[1], f.MentionFragment)
-    assert frgs[1].start == 13 and frgs[1].end == 40
-    assert frgs[1].uri == "zen@merping.synth.download"
-
-    assert isinstance(frgs[2], f.LinkFragment)
-    assert frgs[2].start == 52 and frgs[2].end == 61
-    assert frgs[2].url == "https://zenfyr.dev/"
-
-    assert isinstance(frgs[3], f.LinkFragment)
-    assert frgs[3].start == 67 and frgs[3].end == 86
-    assert frgs[3].url == "https://example.com"
+78 -44
util/html.py
···
 from html.parser import HTMLParser
 from typing import override
-import cross.fragments as f
-class HTMLToFragmentsParser(HTMLParser):
+from cross.tokens import LinkToken, TextToken, Token
+from util.splitter import canonical_label
+
+
+class HTMLToTokensParser(HTMLParser):
     def __init__(self) -> None:
         super().__init__()
-        self.builder: bytearray = bytearray()
-        self.fragments: list[f.Fragment] = []
-        self._tag_stack: dict[str, tuple[int, dict[str, str | None]]] = {}
+        self.tokens: list[Token] = []
+        self._tag_stack: dict[str, tuple[str, dict[str, str | None]]] = {}
         self.in_pre: bool = False
         self.in_code: bool = False
         self.invisible: bool = False

     def handle_a_endtag(self):
-        current_end = len(self.builder)
-        start, _attr = self._tag_stack.pop("a")
-        href = _attr.get('href')
-        if href and current_end > start:
-            self.fragments.append(
-                f.LinkFragment(start=start, end=current_end, url=href)
-            )
+        label, _attr = self._tag_stack.pop("a")
+
+        href = _attr.get("href")
+        if href:
+            if canonical_label(label, href):
+                self.tokens.append(LinkToken(href=href))
+            else:
+                self.tokens.append(LinkToken(href=href, label=label))
+
+    def append_text(self, text: str):
+        self.tokens.append(TextToken(text=text))

     def append_newline(self):
-        if self.builder and not self.builder.endswith(b"\n"):
-            self.builder.extend(b"\n")
+        if self.tokens:
+            last_token = self.tokens[-1]
+            if isinstance(last_token, TextToken) and not last_token.text.endswith("\n"):
+                self.tokens.append(TextToken(text="\n"))

     @override
     def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
···
         match tag:
             case "p":
-                cls = _attr.get('class', '')
-                if cls and 'quote-inline' in cls:
+                cls = _attr.get("class", "")
+                if cls and "quote-inline" in cls:
                     self.invisible = True
             case "a":
-                self._tag_stack["a"] = (len(self.builder), _attr)
+                self._tag_stack["a"] = ("", _attr)
             case "code":
                 if not self.in_pre:
-                    self.builder.extend(b"`")
+                    self.append_text("`")
                 self.in_code = True
             case "pre":
                 self.append_newline()
-                self.builder.extend(b"```\n")
+                self.append_text("```\n")
                 self.in_pre = True
             case "blockquote":
                 self.append_newline()
-                self.builder.extend(b"> ")
+                self.append_text("> ")
             case "strong" | "b":
-                self.builder.extend(b"**")
+                self.append_text("**")
             case "em" | "i":
-                self.builder.extend(b"*")
+                self.append_text("*")
             case "del" | "s":
-                self.builder.extend(b"~~")
+                self.append_text("~~")
             case "br":
-                self.builder.extend(b"\n")
+                self.append_text("\n")
             case "h1" | "h2" | "h3" | "h4" | "h5" | "h6":
                 level = int(tag[1])
-                self.builder.extend(("\n" + "#" * level + " ").encode('utf-8'))
+                self.append_text("\n" + "#" * level + " ")
             case _:
-                #self.builder.extend(f"<{tag}>".encode("utf-8"))
+                # self.builder.extend(f"<{tag}>".encode("utf-8"))
                 pass
-
     @override
     def handle_endtag(self, tag: str) -> None:
···
                 self.handle_a_endtag()
             case "code":
                 if not self.in_pre and self.in_code:
-                    self.builder.extend(b"`")
+                    self.append_text("`")
                     self.in_code = False
             case "pre":
                 self.append_newline()
-                self.builder.extend(b"```\n")
+                self.append_text("```\n")
                 self.in_pre = False
             case "blockquote":
-                self.builder.extend(b"\n")
+                self.append_text("\n")
             case "strong" | "b":
-                self.builder.extend(b"**")
+                self.append_text("**")
             case "em" | "i":
-                self.builder.extend(b"*")
+                self.append_text("*")
             case "del" | "s":
-                self.builder.extend(b"~~")
+                self.append_text("~~")
             case "p":
-                self.builder.extend(b"\n\n")
+                self.append_text("\n\n")
             case "h1" | "h2" | "h3" | "h4" | "h5" | "h6":
-                self.builder.extend(b'\n')
+                self.append_text("\n")
             case _:
-                #self.builder.extend(f"</{tag}>".encode("utf-8"))
+                # self.builder.extend(f"</{tag}>".encode("utf-8"))
                 pass

     @override
     def handle_data(self, data: str) -> None:
-        if not self.invisible:
-            self.builder.extend(data.encode('utf-8'))
+        if self.invisible:
+            return
+
+        if self._tag_stack.get('a'):
+            label, _attr = self._tag_stack.pop("a")
+            self._tag_stack["a"] = (label + data, _attr)
+            return
+
+        self.append_text(data)

-    def get_result(self) -> tuple[str, list[f.Fragment]]:
-        if self.builder.endswith(b'\n\n'):
-            return self.builder[:-2].decode('utf-8'), self.fragments
-        if self.builder.endswith(b'\n'):
-            return self.builder[:-1].decode('utf-8'), self.fragments
-        return self.builder.decode('utf-8'), self.fragments
+    def get_result(self) -> list[Token]:
+        if not self.tokens:
+            return []
+
+        combined: list[Token] = []
+        buffer: list[str] = []
+
+        def flush_buffer():
+            if buffer:
+                merged = "".join(buffer)
+                combined.append(TextToken(text=merged))
+                buffer.clear()
+
+        for token in self.tokens:
+            if isinstance(token, TextToken):
+                buffer.append(token.text)
+            else:
+                flush_buffer()
+                combined.append(token)
+
+        flush_buffer()
+
+        if combined and isinstance(combined[-1], TextToken):
+            if combined[-1].text.endswith("\n\n"):
+                combined[-1] = TextToken(text=combined[-1].text[:-2])
+            if combined[-1].text.endswith("\n"):
+                combined[-1] = TextToken(text=combined[-1].text[:-1])
+        return combined
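
A small sketch of the parser in its new shape, assuming canonical_label treats a label like "there" as non-canonical for the given href:

```python
from util.html import HTMLToTokensParser

parser = HTMLToTokensParser()
parser.feed('hello <a href="https://example.com">there</a> friend')
print(parser.get_result())
# expected shape:
# [TextToken(text='hello '),
#  LinkToken(href='https://example.com', label='there'),
#  TextToken(text=' friend')]
```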
+92 -122
util/markdown.py
···
-from dataclasses import replace
 import re
-import cross.fragments as f
-from util.html import HTMLToFragmentsParser
-URL = re.compile(rb"(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+", re.IGNORECASE)
+from cross.tokens import LinkToken, MentionToken, TagToken, TextToken, Token
+from util.html import HTMLToTokensParser
+from util.splitter import canonical_label
+
+
+URL = re.compile(r"(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+", re.IGNORECASE)
 MD_INLINE_LINK = re.compile(
-    rb"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)",
+    r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)",
     re.IGNORECASE,
 )
 MD_AUTOLINK = re.compile(
-    rb"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE
+    r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE
 )
-HASHTAG = re.compile(rb"(?<!\w)\#([\w]+)")
-FEDIVERSE_HANDLE = re.compile(rb"(?<![\w@])@([\w\.-]+)(?:@([\w\.-]+\.[\w\.-]+))?")
+HASHTAG = re.compile(r"(?<!\w)\#([\w]+)")
+FEDIVERSE_HANDLE = re.compile(r"(?<![\w@])@([\w\.-]+)(?:@([\w\.-]+\.[\w\.-]+))?")
 REGEXES = [URL, MD_INLINE_LINK, MD_AUTOLINK, HASHTAG, FEDIVERSE_HANDLE]


 # TODO autolinks are broken by the html parser
 class MarkdownParser:
-    def parse(self, text: str) -> tuple[str, list[f.Fragment]]:
+    def parse(
+        self, text: str, tags: list[str], handles: list[tuple[str, str]]
+    ) -> list[Token]:
         if not text:
-            return "", []
-
-        html_parser = HTMLToFragmentsParser()
-        html_parser.feed(text)
-        markdown, fragments = html_parser.get_result()
-
-        markdown_bytes: bytes = markdown.encode("utf-8")
-
-        index: int = 0
-        total: int = len(markdown_bytes)
-
-        events: list[tuple[int, int, re.Match[bytes] | f.Fragment, str]] = []
-        events.extend([(fg.start, fg.end, fg, "html") for fg in fragments])
-
-        while index < total:
-            ch: int = markdown_bytes[index]
-            rmatch: re.Match[bytes] | None = None
-            kind = None
-
-            if ch == b"["[0]:
-                rmatch = MD_INLINE_LINK.match(markdown_bytes, index)
-                kind = "inline_link"
-            # elif ch == b"<"[0]:
-            #     rmatch = MD_AUTOLINK.match(markdown_bytes, index)
-            #     kind = "autolink"
-            elif ch == b"#"[0]:
-                rmatch = HASHTAG.match(markdown_bytes, index)
-                kind = "hashtag"
-            elif ch == b"@"[0]:
-                rmatch = FEDIVERSE_HANDLE.match(markdown_bytes, index)
-                kind = "mention"
-            else:
-                rmatch = URL.match(markdown_bytes, index)
-                kind = "url"
-
-            if rmatch:
-                start, end = rmatch.start(), rmatch.end()
-                if end == index:
-                    index += 1
-                    continue
-                events.append((start, end, rmatch, kind))
-                index = end
-                continue
-            index += 1
-
-        events.sort(key=lambda x: x[0])
-
-        last_end: int = 0
-        for start, end, _, _ in events:
-            if start > end:
-                raise Exception(f"Invalid fragment position start={start}, end={end}")
-            if last_end > start:
-                raise Exception(
-                    f"Overlapping text fragments at position end={last_end}, start={start}"
-                )
-            last_end = end
-
-        ntext: bytearray = bytearray()
-        nfragments: list[f.Fragment] = []
-
-        offset: int = 0
-        last_index: int = 0
-
-        for start, end, rmatch, event in events:
-            ntext.extend(markdown_bytes[last_index:start])
-
-            if isinstance(rmatch, f.Fragment):
-                ntext.extend(markdown_bytes[start:end])
-                nfg = replace(rmatch, start=start + offset, end=end + offset)
-                nfragments.append(nfg)
-                last_index = end
-                continue
-
-            nstart = start + offset
-            match event:
-                case "inline_link":
-                    label_bytes: bytes = rmatch.group(1)
-                    href_bytes: bytes = rmatch.group(2)
-
-                    ntext.extend(label_bytes)
-
-                    delta = len(label_bytes) - (end - start)
-                    offset += delta
-
-                    nend = nstart + len(label_bytes)
-                    nfragments.append(
-                        f.LinkFragment(
-                            start=nstart, end=nend, url=href_bytes.decode("utf-8")
-                        )
-                    )
-                case "hashtag":
-                    tag_bytes: bytes = rmatch.group(1)
-                    ntext.extend(markdown_bytes[start:end])
-                    nend = end + offset
-                    nfragments.append(
-                        f.TagFragment(
-                            start=nstart, end=nend, tag=tag_bytes.decode("utf-8")
-                        )
-                    )
-                case "mention":
-                    mention_bytes: bytes = rmatch.group(0)
-                    ntext.extend(markdown_bytes[start:end])
-
-                    mention_str = mention_bytes.decode("utf-8")
-                    mention_str = (
-                        mention_str[1:] if mention_str.startswith("@") else mention_str
-                    )
-
-                    nend = end + offset
-                    nfragments.append(
-                        f.MentionFragment(start=nstart, end=nend, uri=mention_str)
-                    )
-                case "url":
-                    url_bytes: bytes = rmatch.group(0)
-                    ntext.extend(markdown_bytes[start:end])
-                    nend = end + offset
-                    nfragments.append(
-                        f.LinkFragment(
-                            start=nstart, end=nend, url=url_bytes.decode("utf-8")
-                        )
-                    )
-                case _:
-                    pass
-            last_index = end
-
-        ntext.extend(markdown_bytes[last_index:])
-
-        return ntext.decode("utf-8"), nfragments
+            return []
+        tokenizer = HTMLToTokensParser()
+        tokenizer.feed(text)
+        html_tokens = tokenizer.get_result()
+        tokens: list[Token] = []
+        for tk in html_tokens:
+            if isinstance(tk, TextToken):
+                tokens.extend(self.__tokenize_md(tk.text, tags, handles))
+            elif isinstance(tk, LinkToken):
+                if not tk.label or canonical_label(tk.label, tk.href):
+                    tokens.append(tk)
+                    continue
+                tokens.extend(
+                    self.__tokenize_md(f"[{tk.label}]({tk.href})", tags, handles)
+                )
+            else:
+                tokens.append(tk)
+        return tokens
+
+    def __tokenize_md(
+        self, text: str, tags: list[str], handles: list[tuple[str, str]]
+    ) -> list[Token]:
+        index: int = 0
+        total: int = len(text)
+        buffer: list[str] = []
+        tokens: list[Token] = []
+
+        def flush():
+            nonlocal buffer
+            if buffer:
+                tokens.append(TextToken(text="".join(buffer)))
+                buffer = []
+
+        while index < total:
+            if text[index] == "[":
+                md_inline = MD_INLINE_LINK.match(text, index)
+                if md_inline:
+                    flush()
+                    label = md_inline.group(1)
+                    href = md_inline.group(2)
+                    tokens.append(LinkToken(href=href, label=label))
+                    index = md_inline.end()
+                    continue
+            if text[index] == "<":
+                md_auto = MD_AUTOLINK.match(text, index)
+                if md_auto:
+                    flush()
+                    href = md_auto.group(1)
+                    tokens.append(LinkToken(href=href, label=None))
+                    index = md_auto.end()
+                    continue
+            if text[index] == "#":
+                tag = HASHTAG.match(text, index)
+                if tag:
+                    tag_text = tag.group(1)
+                    if tag_text.lower() in tags:
+                        flush()
+                        tokens.append(TagToken(tag=tag_text))
+                        index = tag.end()
+                        continue
+            if text[index] == "@":
+                handle = FEDIVERSE_HANDLE.match(text, index)
+                if handle:
+                    handle_text = handle.group(0)
+                    stripped_handle = handle_text.strip()
+                    match = next(
+                        (pair for pair in handles if stripped_handle in pair), None
+                    )
+                    if match:
+                        flush()
+                        tokens.append(
+                            MentionToken(username=match[1], uri=None)
+                        )  # TODO: misskey doesn't provide a uri
+                        index = handle.end()
+                        continue
+            url = URL.match(text, index)
+            if url:
+                flush()
+                href = url.group(0)
+                tokens.append(LinkToken(href=href, label=None))
+                index = url.end()
+                continue
+            buffer.append(text[index])
+            index += 1
+
+        flush()
+        return tokens
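
Usage sketch under the new signature. The tags and handles arguments come from the Misskey note in misskey/input.py; the values here are made up.

```python
from util.markdown import MarkdownParser

parser = MarkdownParser()
tokens = parser.parse("hi #testing https://example.com", ["testing"], [])
# roughly:
# [TextToken(text='hi '), TagToken(tag='testing'),
#  TextToken(text=' '), LinkToken(href='https://example.com', label=None)]
```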
+89 -45
util/splitter.py
···
+import re
+from dataclasses import replace
+
 import grapheme
-from cross.fragments import Fragment, LinkFragment
-from dataclasses import replace
+
+from cross.tokens import LinkToken, TagToken, TextToken, Token
 def canonical_label(label: str | None, href: str):
···
     return False

-class FragmentSplitter:
-    def __init__(self, climit: int, urllen: int):
-        self.climit: int = climit
-        self.urllen: int = urllen
-
-    def normalize_link(self, label: str, url: str) -> str:
-        return label
-
-    def tally_lenght(self, post: tuple[str, list[Fragment]]):
-        return grapheme.length(post[0])
-
-    def url_normalize(
-        self, text: str, fragments: list[Fragment]
-    ) -> tuple[str, list[Fragment]]:
-        if self.urllen == -1:
-            return text, fragments
-        btext = text.encode('utf-8')
-
-        nbytes = bytearray()
-        nfragments: list[Fragment] = []
-
-        fragments = [fg for fg in fragments]
-        fragments.sort(key=lambda x: x.start)
-
-        last_index = 0
-
-        for fg in fragments:
-            if last_index < fg.start:
-                nbytes.extend(btext[last_index:fg.start])
-
-            label_bytes = btext[fg.start:fg.end]
-            label = label_bytes.decode('utf-8')
-
-            nlabel = label
-            if isinstance(fg, LinkFragment):
-                nlabel = self.normalize_link(nlabel, fg.url)
-
-            nlabel_bytes = nlabel.encode('utf-8')
-
-            nstart = len(nbytes)
-            nbytes.extend(nlabel_bytes)
-            nend = len(nbytes)
-
-            nfg = replace(fg, start=nstart, end=nend)
-            nfragments.append(nfg)
-
-            last_index = fg.end
-
-        if last_index < len(btext):
-            nbytes.extend(btext[last_index:])
-
-        return nbytes.decode('utf-8'), nfragments
-
-    def split(
-        self, text: str, fragments: list[Fragment]
-    ) -> list[tuple[str, list[Fragment]]]:
-        text, fragments = self.url_normalize(text, fragments)
-        if self.tally_lenght((text, fragments)) <= self.climit:
-            return [(text, fragments)]
+ALTERNATE = re.compile(r"\S+|\s+")
+
+
+def split_tokens(
+    tokens: list[Token],
+    max_chars: int,
+    max_link_len: int = 35,
+) -> list[list[Token]]:
+    def new_block() -> None:
+        nonlocal blocks, block, length
+        if block:
+            blocks.append(block)
+        block, length = [], 0
+
+    def append_text(text: str) -> None:
+        nonlocal block
+        if block and isinstance(block[-1], TextToken):
+            block[-1] = replace(block[-1], text=block[-1].text + text)
+        else:
+            block.append(TextToken(text=text))
+
+    blocks: list[list[Token]] = []
+    block: list[Token] = []
+    length: int = 0
+
+    for tk in tokens:
+        if isinstance(tk, TagToken):
+            tag_len = 1 + grapheme.length(tk.tag)
+            if length + tag_len > max_chars:
+                new_block()
+            block.append(tk)
+            length += tag_len
+            continue
+        if isinstance(tk, LinkToken):
+            label_text = tk.label or ""
+            link_len = grapheme.length(label_text)
+            if canonical_label(tk.label, tk.href):
+                link_len = min(link_len, max_link_len)
+            if length + link_len <= max_chars:
+                block.append(tk)
+                length += link_len
+                continue
+            if length:
+                new_block()
+            remaining = label_text
+            while remaining:
+                room = (
+                    max_chars
+                    - length
+                    - (0 if grapheme.length(remaining) <= max_chars else 1)
+                )
+                chunk = grapheme.slice(remaining, 0, room)
+                if grapheme.length(remaining) > room:
+                    chunk += "-"
+                block.append(replace(tk, label=chunk))
+                length += grapheme.length(chunk)
+                remaining = grapheme.slice(remaining, room, grapheme.length(remaining))
+                if remaining:
+                    new_block()
+            continue
+        if isinstance(tk, TextToken):
+            for seg in ALTERNATE.findall(tk.text):
+                seg_len = grapheme.length(seg)
+                if length + seg_len <= max_chars - (0 if seg.isspace() else 1):
+                    append_text(seg)
+                    length += seg_len
+                    continue
+                if length:
+                    new_block()
+                if not seg.isspace():
+                    while grapheme.length(seg) > max_chars - 1:
+                        chunk = grapheme.slice(seg, 0, max_chars - 1) + "-"
+                        append_text(chunk)
+                        new_block()
+                        seg = grapheme.slice(seg, max_chars - 1, grapheme.length(seg))
+                else:
+                    while grapheme.length(seg) > max_chars:
+                        chunk = grapheme.slice(seg, 0, max_chars)
+                        append_text(chunk)
+                        new_block()
+                        seg = grapheme.slice(seg, max_chars, grapheme.length(seg))
+                if seg:
+                    append_text(seg)
+                    length = grapheme.length(seg)
+            continue
+        block.append(tk)
+
+    if block:
+        blocks.append(block)
+
+    return blocks
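
And a sketch of the splitter that replaces FragmentSplitter.split: blocks are capped by grapheme count, words are kept whole where possible, and overlong words are hyphenated across blocks.

```python
from cross.tokens import TextToken
from util.splitter import split_tokens

blocks = split_tokens([TextToken(text="aaaa bbbb cccc")], max_chars=10)
# roughly: [[TextToken(text='aaaa bbbb ')], [TextToken(text='cccc')]]
```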