import re
from datetime import datetime, timezone
from typing import Callable, Any

from util.database import DataBaseWorker
from util.media import MediaInfo
from util.util import LOGGER, canonical_label

ALTERNATE = re.compile(r'\S+|\s+')
URL = re.compile(r'(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+', re.IGNORECASE)
MD_INLINE_LINK = re.compile(r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)", re.IGNORECASE)
MD_AUTOLINK = re.compile(r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE)
# NOTE: the two patterns below are reconstructed from how they are used in
# tokenize_markdown (group(1) is the tag text, group(0) is the full handle);
# the original bodies were lost.
HASHTAG = re.compile(r'(?<!\w)#(\w+)')
FEDIVERSE_HANDLE = re.compile(r'@\w+(?:@[\w.-]+)?')


# base class for every token produced by the tokenizer
class Token():
    def __init__(self, type: str) -> None:
        self.type = type


# token that represents plain text
class TextToken(Token):
    def __init__(self, text: str) -> None:
        super().__init__('text')
        self.text = text


# token that represents a link to a website. e.g. [link](https://google.com/)
class LinkToken(Token):
    def __init__(self, href: str, label: str) -> None:
        super().__init__('link')
        self.href = href
        self.label = label


# token that represents a hashtag. e.g. #SocialMedia
class TagToken(Token):
    def __init__(self, tag: str) -> None:
        super().__init__('tag')
        self.tag = tag


# token that represents a mention of a user.
class MentionToken(Token):
    def __init__(self, username: str, uri: str) -> None:
        super().__init__('mention')
        self.username = username
        self.uri = uri


class MediaMeta():
    def __init__(self, width: int, height: int, duration: float) -> None:
        self.width = width
        self.height = height
        self.duration = duration

    def get_width(self) -> int:
        return self.width

    def get_height(self) -> int:
        return self.height

    def get_duration(self) -> float:
        return self.duration


class Post():
    def __init__(self) -> None:
        pass

    def get_tokens(self) -> list[Token]:
        return []

    def get_parent_id(self) -> str | None:
        return None

    def get_post_date_iso(self) -> str:
        return datetime.now(timezone.utc).isoformat()

    def get_attachments(self) -> list[MediaInfo]:
        return []

    def get_id(self) -> str:
        return ''

    def get_cw(self) -> str:
        return ''

    def get_languages(self) -> list[str]:
        return []

    def is_sensitive(self) -> bool:
        return False


# generic input service.
# user and service for db queries
class Input():
    def __init__(self, service: str, user_id: str, settings: dict, db: DataBaseWorker) -> None:
        self.service = service
        self.user_id = user_id
        self.settings = settings
        self.db = db

    async def listen(self, outputs: list, handler: Callable[[Post], Any]):
        pass


class Output():
    def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None:
        self.input = input
        self.settings = settings
        self.db = db

    def accept_post(self, post: Post):
        LOGGER.warning('Not Implemented.. "posted" %s', post.get_id())

    def delete_post(self, identifier: str):
        LOGGER.warning('Not Implemented.. "deleted" %s', identifier)

    def accept_repost(self, repost_id: str, reposted_id: str):
        LOGGER.warning('Not Implemented.. "reblogged" %s, %s', repost_id, reposted_id)

    def delete_repost(self, repost_id: str):
        LOGGER.warning('Not Implemented.. "removed reblog" %s', repost_id)

    def accept_quote(self, quote: Post, quoted_id: str):
        LOGGER.warning('Not Implemented.. "quoted" %s, %s', quote.get_id(), quoted_id)

    def delete_quote(self, quote_id: str):
        LOGGER.warning('Not Implemented.. "removed quote" %s', quote_id)
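
# A minimal sketch of a concrete Output, illustrative only ("ConsoleOutput" is
# not part of the real codebase): it overrides accept_post/delete_post and
# renders the token stream back to markdown-ish text for the log instead of
# talking to a real service.
class ConsoleOutput(Output):
    def accept_post(self, post: Post):
        parts: list[str] = []
        for token in post.get_tokens():
            if isinstance(token, TextToken):
                parts.append(token.text)
            elif isinstance(token, LinkToken):
                parts.append(f'[{token.label}]({token.href})')
            elif isinstance(token, TagToken):
                parts.append('#' + token.tag)
            elif isinstance(token, MentionToken):
                parts.append(token.username)
        LOGGER.info('posted %s: %s', post.get_id(), ''.join(parts))

    def delete_post(self, identifier: str):
        LOGGER.info('deleted %s', identifier)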
"removed quote" %s', quote_id) def test_filters(tokens: list[Token], filters: list[re.Pattern[str]]): if not tokens or not filters: return True markdown = '' for token in tokens: if isinstance(token, TextToken): markdown += token.text elif isinstance(token, LinkToken): markdown += f'[{token.label}]({token.href})' elif isinstance(token, TagToken): markdown += '#' + token.tag elif isinstance(token, MentionToken): markdown += token.username for filter in filters: if filter.search(markdown): return False return True def tokenize_markdown(text: str, tags: list[str], handles: list[tuple[str, str]]) -> list[Token]: if not text: return [] index: int = 0 total: int = len(text) buffer: list[str] = [] tokens: list[Token] = [] def flush(): nonlocal buffer if buffer: tokens.append(TextToken(''.join(buffer))) buffer = [] while index < total: if text[index] == '[': md_inline = MD_INLINE_LINK.match(text, index) if md_inline: flush() label = md_inline.group(1) href = md_inline.group(2) tokens.append(LinkToken(href, label)) index = md_inline.end() continue if text[index] == '<': md_auto = MD_AUTOLINK.match(text, index) if md_auto: flush() href = md_auto.group(1) tokens.append(LinkToken(href, href)) index = md_auto.end() continue if text[index] == '#': tag = HASHTAG.match(text, index) if tag: tag_text = tag.group(1) if tag_text.lower() in tags: flush() tokens.append(TagToken(tag_text)) index = tag.end() continue if text[index] == '@': handle = FEDIVERSE_HANDLE.match(text, index) if handle: handle_text = handle.group(0) stripped_handle = handle_text.strip() match = next( (pair for pair in handles if stripped_handle in pair), None ) if match: flush() tokens.append(MentionToken(match[1], '')) # TODO: misskey doesn’t provide a uri index = handle.end() continue url = URL.match(text, index) if url: flush() href = url.group(0) tokens.append(LinkToken(href, href)) index = url.end() continue buffer.append(text[index]) index += 1 flush() return tokens def split_tokens(tokens: list[Token], max_chars: int, max_link_len: int = 35) -> list[list[Token]]: def new_block(): nonlocal blocks, block, length if block: blocks.append(block) block = [] length = 0 def append_text(text_segment): nonlocal block # if the last element in the current block is also text, just append to it if block and isinstance(block[-1], TextToken): block[-1].text += text_segment else: block.append(TextToken(text_segment)) blocks: list[list[Token]] = [] block: list[Token] = [] length = 0 for tk in tokens: # other token types are currently not supported if isinstance(tk, TagToken): tag_len = 1 + len(tk.tag) # (#) + tag if length + tag_len > max_chars: new_block() # create new block if the current one is too large block.append(tk) length += tag_len elif isinstance(tk, LinkToken): # TODO labels should proably be split too link_len = len(tk.label) if canonical_label(tk.label, tk.href): # cut down the link if the label is canonical link_len = min(link_len, max_link_len) if length + link_len > max_chars: new_block() block.append(tk) length += link_len elif isinstance(tk, TextToken): segments: list[str] = ALTERNATE.findall(tk.text) for seg in segments: seg_len: int = len(seg) if length + seg_len <= max_chars - (0 if seg.isspace() else 1): append_text(seg) length += seg_len continue if length > 0: new_block() if not seg.isspace(): while len(seg) > max_chars - 1: chunk = seg[: max_chars - 1] + "-" append_text(chunk) new_block() seg = seg[max_chars - 1 :] else: while len(seg) > max_chars: chunk = seg[: max_chars] append_text(chunk) new_block() seg = seg[max_chars 
def split_tokens(tokens: list[Token], max_chars: int, max_link_len: int = 35) -> list[list[Token]]:
    def new_block():
        nonlocal blocks, block, length
        if block:
            blocks.append(block)
        block = []
        length = 0

    def append_text(text_segment):
        nonlocal block
        # if the last element in the current block is also text, just append to it
        if block and isinstance(block[-1], TextToken):
            block[-1].text += text_segment
        else:
            block.append(TextToken(text_segment))

    blocks: list[list[Token]] = []
    block: list[Token] = []
    length = 0
    for tk in tokens:
        # other token types are currently not supported
        if isinstance(tk, TagToken):
            tag_len = 1 + len(tk.tag)  # (#) + tag
            if length + tag_len > max_chars:
                new_block()  # create a new block if the current one is too large
            block.append(tk)
            length += tag_len
        elif isinstance(tk, LinkToken):
            # TODO: labels should probably be split too
            link_len = len(tk.label)
            if canonical_label(tk.label, tk.href):
                # cut down the link if the label is canonical
                link_len = min(link_len, max_link_len)
            if length + link_len > max_chars:
                new_block()
            block.append(tk)
            length += link_len
        elif isinstance(tk, TextToken):
            segments: list[str] = ALTERNATE.findall(tk.text)
            for seg in segments:
                seg_len: int = len(seg)
                if length + seg_len <= max_chars - (0 if seg.isspace() else 1):
                    append_text(seg)
                    length += seg_len
                    continue
                if length > 0:
                    new_block()
                if not seg.isspace():
                    # hyphenate oversized words across blocks
                    while len(seg) > max_chars - 1:
                        chunk = seg[:max_chars - 1] + "-"
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars - 1:]
                else:
                    while len(seg) > max_chars:
                        chunk = seg[:max_chars]
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars:]
                if seg:
                    append_text(seg)
                    length = len(seg)
    if block:
        blocks.append(block)
    return blocks
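
# A minimal end-to-end sketch of the pipeline (sample text, tag list, and
# filter pattern are illustrative only): tokenize a post, drop it when a
# filter matches, then split the surviving tokens into blocks of at most
# 80 characters.
if __name__ == '__main__':
    sample = 'Release notes: [changelog](https://example.com/log) #News'
    sample_tokens = tokenize_markdown(sample, tags=['news'], handles=[])
    if test_filters(sample_tokens, [re.compile(r'\bspam\b', re.IGNORECASE)]):
        for i, blk in enumerate(split_tokens(sample_tokens, max_chars=80)):
            LOGGER.info('block %d: %d tokens', i, len(blk))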