import re
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Callable

from util.database import DataBaseWorker
from util.media import MediaInfo
from util.util import LOGGER, canonical_label

# splits text into alternating runs of non-whitespace and whitespace
ALTERNATE = re.compile(r"\S+|\s+")


# generic token
class Token:
    def __init__(self, type: str) -> None:
        self.type = type


class TextToken(Token):
    def __init__(self, text: str) -> None:
        super().__init__("text")
        self.text = text


# token that represents a link to a website. e.g. [link](https://google.com/)
class LinkToken(Token):
    def __init__(self, href: str, label: str) -> None:
        super().__init__("link")
        self.href = href
        self.label = label


# token that represents a hashtag. e.g. #SocialMedia
class TagToken(Token):
    def __init__(self, tag: str) -> None:
        super().__init__("tag")
        self.tag = tag


# token that represents a mention of a user.
class MentionToken(Token):
    def __init__(self, username: str, uri: str) -> None:
        super().__init__("mention")
        self.username = username
        self.uri = uri


class MediaMeta:
    def __init__(self, width: int, height: int, duration: float) -> None:
        self.width = width
        self.height = height
        self.duration = duration

    def get_width(self) -> int:
        return self.width

    def get_height(self) -> int:
        return self.height

    def get_duration(self) -> float:
        return self.duration


class Post(ABC):
    @abstractmethod
    def get_id(self) -> str:
        return ""

    @abstractmethod
    def get_parent_id(self) -> str | None:
        pass

    @abstractmethod
    def get_tokens(self) -> list[Token]:
        pass

    # returns the input text type:
    # text/plain, text/markdown, text/x.misskeymarkdown
    @abstractmethod
    def get_text_type(self) -> str:
        pass

    # post ISO timestamp
    @abstractmethod
    def get_timestamp(self) -> str:
        pass

    def get_attachments(self) -> list[MediaInfo]:
        return []

    def get_spoiler(self) -> str | None:
        return None

    def get_languages(self) -> list[str]:
        return []

    def is_sensitive(self) -> bool:
        return False

    def get_post_url(self) -> str | None:
        return None


# generic input service.
# service and user_id identify the account for db queries
class Input:
    def __init__(
        self, service: str, user_id: str, settings: dict, db: DataBaseWorker
    ) -> None:
        self.service = service
        self.user_id = user_id
        self.settings = settings
        self.db = db

    async def listen(self, outputs: list, handler: Callable[[Post], Any]):
        pass


class Output:
    def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None:
        self.input = input
        self.settings = settings
        self.db = db

    def accept_post(self, post: Post):
        LOGGER.warning('Not Implemented.. "posted" %s', post.get_id())

    def delete_post(self, identifier: str):
        LOGGER.warning('Not Implemented.. "deleted" %s', identifier)

    def accept_repost(self, repost_id: str, reposted_id: str):
        LOGGER.warning('Not Implemented.. "reblogged" %s, %s', repost_id, reposted_id)

    def delete_repost(self, repost_id: str):
        LOGGER.warning('Not Implemented.. "removed reblog" %s', repost_id)


def test_filters(tokens: list[Token], filters: list[re.Pattern[str]]) -> bool:
    if not tokens or not filters:
        return True
    # render the tokens back to markdown so filters can match the full text
    markdown = ""
    for token in tokens:
        if isinstance(token, TextToken):
            markdown += token.text
        elif isinstance(token, LinkToken):
            markdown += f"[{token.label}]({token.href})"
        elif isinstance(token, TagToken):
            markdown += "#" + token.tag
        elif isinstance(token, MentionToken):
            markdown += token.username
    for pattern in filters:
        if pattern.search(markdown):
            return False
    return True


def split_tokens(
    tokens: list[Token], max_chars: int, max_link_len: int = 35
) -> list[list[Token]]:
    def new_block():
        nonlocal blocks, block, length
        if block:
            blocks.append(block)
        block = []
        length = 0

    def append_text(text_segment):
        nonlocal block
        # if the last element in the current block is also text, just append to it
        if block and isinstance(block[-1], TextToken):
            block[-1].text += text_segment
        else:
            block.append(TextToken(text_segment))

    blocks: list[list[Token]] = []
    block: list[Token] = []
    length = 0

    for tk in tokens:
        if isinstance(tk, TagToken):
            tag_len = 1 + len(tk.tag)  # (#) + tag
            if length + tag_len > max_chars:
                new_block()  # create new block if the current one is too large
            block.append(tk)
            length += tag_len
        elif isinstance(tk, LinkToken):
            # TODO labels should probably be split too
            link_len = len(tk.label)
            # cut down the counted length if the label is just the canonical URL
            if canonical_label(tk.label, tk.href):
                link_len = min(link_len, max_link_len)
            if length + link_len > max_chars:
                new_block()
            block.append(tk)
            length += link_len
        elif isinstance(tk, TextToken):
            segments: list[str] = ALTERNATE.findall(tk.text)
            for seg in segments:
                seg_len: int = len(seg)
                # keep one char of headroom for non-space segments
                if length + seg_len <= max_chars - (0 if seg.isspace() else 1):
                    append_text(seg)
                    length += seg_len
                    continue
                if length > 0:
                    new_block()
                if not seg.isspace():
                    # hard-wrap an over-long word, hyphenating each chunk
                    while len(seg) > max_chars - 1:
                        chunk = seg[: max_chars - 1] + "-"
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars - 1 :]
                else:
                    # whitespace runs can be split without a hyphen
                    while len(seg) > max_chars:
                        chunk = seg[:max_chars]
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars:]
                if seg:
                    append_text(seg)
                    length = len(seg)
        else:
            # TODO fix mentions
            block.append(tk)
    if block:
        blocks.append(block)
    return blocks
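# Illustrative sketch, not part of the original module: a minimal concrete
# Output subclass showing how the no-op hooks on Output are meant to be
# overridden. The class name "LogOutput" and its log-only behavior are
# assumptions for illustration; a real backend would call its service's API.
class LogOutput(Output):
    def accept_post(self, post: Post):
        # flatten the post's tokens back to plain text for the log line
        text = "".join(
            tok.text if isinstance(tok, TextToken) else "" for tok in post.get_tokens()
        )
        LOGGER.info("posted %s: %s", post.get_id(), text)

    def delete_post(self, identifier: str):
        LOGGER.info("deleted %s", identifier)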