"""Social media crossposting tool (3rd time's the charm).

Crossposts between Mastodon, Misskey, and Bluesky.
"""
import re
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Callable

from util.database import DataBaseWorker
from util.media import MediaInfo
from util.util import LOGGER, canonical_label

# Splits text into alternating runs of non-whitespace and whitespace,
# so words and the gaps between them can be measured independently.
ALTERNATE = re.compile(r"\S+|\s+")


# generic token
class Token:
    def __init__(self, type: str) -> None:
        self.type = type


# token that represents plain text
class TextToken(Token):
    def __init__(self, text: str) -> None:
        super().__init__("text")
        self.text = text


# token that represents a link to a website. e.g. [link](https://google.com/)
class LinkToken(Token):
    def __init__(self, href: str, label: str) -> None:
        super().__init__("link")
        self.href = href
        self.label = label


# token that represents a hashtag. e.g. #SocialMedia
class TagToken(Token):
    def __init__(self, tag: str) -> None:
        super().__init__("tag")
        self.tag = tag


# token that represents a mention of a user.
class MentionToken(Token):
    def __init__(self, username: str, uri: str) -> None:
        super().__init__("mention")
        self.username = username
        self.uri = uri


# basic metadata (dimensions and duration) for an attached media file
class MediaMeta:
    def __init__(self, width: int, height: int, duration: float) -> None:
        self.width = width
        self.height = height
        self.duration = duration

    def get_width(self) -> int:
        return self.width

    def get_height(self) -> int:
        return self.height

    def get_duration(self) -> float:
        return self.duration


class Post(ABC):
    """A single post read from an input service.

    Subclasses wrap a service-specific payload and expose it through
    these accessors; the optional getters have safe defaults.
    """

    @abstractmethod
    def get_id(self) -> str:
        return ""

    # identifier of the post this one replies to, or None for a top-level post
    @abstractmethod
    def get_parent_id(self) -> str | None:
        pass

    @abstractmethod
    def get_tokens(self) -> list[Token]:
        pass

    # returns input text type.
    # text/plain, text/markdown, text/x.misskeymarkdown
    @abstractmethod
    def get_text_type(self) -> str:
        pass

    # post iso timestamp
    @abstractmethod
    def get_timestamp(self) -> str:
        pass

    def get_attachments(self) -> list[MediaInfo]:
        return []

    # content-warning / spoiler text, or None when the post has none
    def get_spoiler(self) -> str | None:
        return None

    def get_languages(self) -> list[str]:
        return []

    def is_sensitive(self) -> bool:
        return False

    def get_post_url(self) -> str | None:
        return None


# generic input service.
# user and service for db queries
class Input:
    def __init__(
        self, service: str, user_id: str, settings: dict, db: DataBaseWorker
    ) -> None:
        self.service = service
        self.user_id = user_id
        self.settings = settings
        self.db = db

    # subclasses stream posts from the service and feed them to `handler`
    async def listen(self, outputs: list, handler: Callable[[Post], Any]):
        pass


# generic output service; the base class only logs what it would have done
class Output:
    def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None:
        self.input = input
        self.settings = settings
        self.db = db

    def accept_post(self, post: Post):
        LOGGER.warning('Not Implemented.. "posted" %s', post.get_id())

    def delete_post(self, identifier: str):
        LOGGER.warning('Not Implemented.. "deleted" %s', identifier)

    def accept_repost(self, repost_id: str, reposted_id: str):
        LOGGER.warning('Not Implemented.. "reblogged" %s, %s', repost_id, reposted_id)

    def delete_repost(self, repost_id: str):
        LOGGER.warning('Not Implemented.. "removed reblog" %s', repost_id)


def test_filters(tokens: list[Token], filters: list[re.Pattern[str]]):
    """Return False when any filter pattern matches the post's rendered
    markdown, True otherwise (including when tokens or filters are empty)."""
    if not tokens or not filters:
        return True

    # render the tokens back to a markdown-ish string the filters run against
    parts: list[str] = []
    for token in tokens:
        if isinstance(token, TextToken):
            parts.append(token.text)
        elif isinstance(token, LinkToken):
            parts.append(f"[{token.label}]({token.href})")
        elif isinstance(token, TagToken):
            parts.append("#" + token.tag)
        elif isinstance(token, MentionToken):
            parts.append(token.username)
    markdown = "".join(parts)

    # `pattern` (not `filter`) to avoid shadowing the builtin
    for pattern in filters:
        if pattern.search(markdown):
            return False

    return True


def split_tokens(
    tokens: list[Token], max_chars: int, max_link_len: int = 35
) -> list[list[Token]]:
    """Split a token stream into blocks of at most `max_chars` rendered
    characters each (for services with per-post length limits).

    Words longer than a block are hyphenated across blocks; canonical link
    labels are counted as at most `max_link_len` characters, assuming the
    target service shortens them. Mentions are currently appended without
    counting toward the length (see TODO below).
    """

    def new_block():
        # flush the current block (if non-empty) and start a fresh one
        nonlocal blocks, block, length
        if block:
            blocks.append(block)
        block = []
        length = 0

    def append_text(text_segment):
        nonlocal block
        # if the last element in the current block is also text, just append to it
        if block and isinstance(block[-1], TextToken):
            block[-1].text += text_segment
        else:
            block.append(TextToken(text_segment))

    blocks: list[list[Token]] = []
    block: list[Token] = []
    length = 0

    for tk in tokens:
        if isinstance(tk, TagToken):
            tag_len = 1 + len(tk.tag)  # (#) + tag
            if length + tag_len > max_chars:
                new_block()  # create new block if the current one is too large

            block.append(tk)
            length += tag_len
        elif isinstance(tk, LinkToken):  # TODO labels should probably be split too
            link_len = len(tk.label)
            if canonical_label(
                tk.label, tk.href
            ):  # cut down the link if the label is canonical
                link_len = min(link_len, max_link_len)

            if length + link_len > max_chars:
                new_block()
            block.append(tk)
            length += link_len
        elif isinstance(tk, TextToken):
            segments: list[str] = ALTERNATE.findall(tk.text)

            for seg in segments:
                seg_len: int = len(seg)
                # non-whitespace segments reserve one char (for a possible
                # continuation hyphen); whitespace may fill the block exactly
                if length + seg_len <= max_chars - (0 if seg.isspace() else 1):
                    append_text(seg)
                    length += seg_len
                    continue

                if length > 0:
                    new_block()

                if not seg.isspace():
                    # hyphenate an over-long word across full blocks
                    while len(seg) > max_chars - 1:
                        chunk = seg[: max_chars - 1] + "-"
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars - 1 :]
                else:
                    # over-long whitespace runs are chunked without hyphens
                    while len(seg) > max_chars:
                        chunk = seg[:max_chars]
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars:]

                if seg:
                    append_text(seg)
                    length = len(seg)
        else:  # TODO fix mentions
            block.append(tk)

    if block:
        blocks.append(block)

    return blocks