Social media crossposting tool — third time's the charm.
Crossposting between Mastodon, Misskey, and Bluesky.
at master 6.5 kB view raw
import re
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Callable

from util.database import DataBaseWorker
from util.media import MediaInfo
from util.util import LOGGER, canonical_label

# Alternating runs of non-whitespace and whitespace; findall() on this pattern
# partitions a string losslessly (used by split_tokens for word wrapping).
ALTERNATE = re.compile(r"\S+|\s+")


class Token:
    """Generic token of a parsed post; `type` discriminates the subclass."""

    def __init__(self, type: str) -> None:
        self.type = type


class TextToken(Token):
    """Plain-text token."""

    def __init__(self, text: str) -> None:
        super().__init__("text")
        self.text = text


class LinkToken(Token):
    """A link to a website, e.g. [link](https://google.com/)."""

    def __init__(self, href: str, label: str) -> None:
        super().__init__("link")
        self.href = href
        self.label = label


class TagToken(Token):
    """A hashtag, e.g. #SocialMedia. `tag` is stored without the leading '#'."""

    def __init__(self, tag: str) -> None:
        super().__init__("tag")
        self.tag = tag


class MentionToken(Token):
    """A mention of a user."""

    def __init__(self, username: str, uri: str) -> None:
        super().__init__("mention")
        self.username = username
        self.uri = uri


class MediaMeta:
    """Dimensions and duration of a media attachment."""

    def __init__(self, width: int, height: int, duration: float) -> None:
        self.width = width
        self.height = height
        self.duration = duration

    def get_width(self) -> int:
        return self.width

    def get_height(self) -> int:
        return self.height

    def get_duration(self) -> float:
        return self.duration


class Post(ABC):
    """Service-agnostic view of a post; inputs produce these, outputs consume them."""

    @abstractmethod
    def get_id(self) -> str:
        """Service-local identifier of the post."""
        return ""

    @abstractmethod
    def get_parent_id(self) -> str | None:
        """Id of the post this one replies to, or None for a top-level post."""

    @abstractmethod
    def get_tokens(self) -> list[Token]:
        """Post body as a token stream."""

    @abstractmethod
    def get_text_type(self) -> str:
        """Input text type: text/plain, text/markdown, or text/x.misskeymarkdown."""

    @abstractmethod
    def get_timestamp(self) -> str:
        """Post creation time as an ISO timestamp."""

    def get_attachments(self) -> list[MediaInfo]:
        return []

    def get_spoiler(self) -> str | None:
        return None

    def get_languages(self) -> list[str]:
        return []

    def is_sensitive(self) -> bool:
        return False

    def get_post_url(self) -> str | None:
        return None


class Input:
    """Generic input service; `service` and `user_id` key the db queries."""

    def __init__(
        self, service: str, user_id: str, settings: dict, db: DataBaseWorker
    ) -> None:
        self.service = service
        self.user_id = user_id
        self.settings = settings
        self.db = db

    async def listen(self, outputs: list, handler: Callable[[Post], Any]) -> None:
        """Listen for new posts and feed each to `handler`; subclasses override."""


class Output:
    """Generic output service; default methods only log, subclasses override."""

    def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None:
        self.input = input
        self.settings = settings
        self.db = db

    def accept_post(self, post: Post):
        LOGGER.warning('Not Implemented.. "posted" %s', post.get_id())

    def delete_post(self, identifier: str):
        LOGGER.warning('Not Implemented.. "deleted" %s', identifier)

    def accept_repost(self, repost_id: str, reposted_id: str):
        LOGGER.warning('Not Implemented.. "reblogged" %s, %s', repost_id, reposted_id)

    def delete_repost(self, repost_id: str):
        LOGGER.warning('Not Implemented.. "removed reblog" %s', repost_id)


def test_filters(tokens: list[Token], filters: list[re.Pattern[str]]) -> bool:
    """Return False when the tokens' markdown rendering matches any filter.

    An empty token list or empty filter list always passes (returns True).
    """
    if not tokens or not filters:
        return True

    # Render the token stream back to markdown once, then run every filter
    # against it. str.join avoids repeated string concatenation.
    parts: list[str] = []
    for token in tokens:
        if isinstance(token, TextToken):
            parts.append(token.text)
        elif isinstance(token, LinkToken):
            parts.append(f"[{token.label}]({token.href})")
        elif isinstance(token, TagToken):
            parts.append("#" + token.tag)
        elif isinstance(token, MentionToken):
            parts.append(token.username)
    markdown = "".join(parts)

    # `pattern`, not `filter`: the original loop variable shadowed the builtin.
    return not any(pattern.search(markdown) for pattern in filters)


def split_tokens(
    tokens: list[Token], max_chars: int, max_link_len: int = 35
) -> list[list[Token]]:
    """Split a token stream into blocks of at most `max_chars` rendered characters.

    Tags count as '#' + tag; links count their label length, capped at
    `max_link_len` when the label is canonical for the href (services shorten
    such links). Text is wrapped at whitespace boundaries; over-long words are
    hyphenated across blocks. Returns the list of non-empty blocks.
    """

    def new_block():
        # Flush the current block (if non-empty) and reset the length counter.
        nonlocal blocks, block, length
        if block:
            blocks.append(block)
        block = []
        length = 0

    def append_text(text_segment):
        nonlocal block
        # If the last element in the current block is also text, just append to it.
        if block and isinstance(block[-1], TextToken):
            block[-1].text += text_segment
        else:
            block.append(TextToken(text_segment))

    blocks: list[list[Token]] = []
    block: list[Token] = []
    length = 0

    for tk in tokens:
        if isinstance(tk, TagToken):
            tag_len = 1 + len(tk.tag)  # (#) + tag
            if length + tag_len > max_chars:
                new_block()  # create new block if the current one is too large

            block.append(tk)
            length += tag_len
        elif isinstance(tk, LinkToken):  # TODO labels should probably be split too
            link_len = len(tk.label)
            if canonical_label(
                tk.label, tk.href
            ):  # cut down the link if the label is canonical
                link_len = min(link_len, max_link_len)

            if length + link_len > max_chars:
                new_block()
            block.append(tk)
            length += link_len
        elif isinstance(tk, TextToken):
            # Lossless split into word/whitespace runs; see ALTERNATE above.
            segments: list[str] = ALTERNATE.findall(tk.text)

            for seg in segments:
                seg_len: int = len(seg)
                # Non-space segments reserve one char of headroom.
                # NOTE(review): this also pushes out words that would fit
                # exactly; presumably deliberate to match the hyphen slot used
                # below — confirm before changing.
                if length + seg_len <= max_chars - (0 if seg.isspace() else 1):
                    append_text(seg)
                    length += seg_len
                    continue

                if length > 0:
                    new_block()

                if not seg.isspace():
                    # Hyphenate words longer than a whole block.
                    while len(seg) > max_chars - 1:
                        chunk = seg[: max_chars - 1] + "-"
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars - 1 :]
                else:
                    # Whitespace runs are chunked without a hyphen.
                    while len(seg) > max_chars:
                        chunk = seg[:max_chars]
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars:]

                if seg:
                    append_text(seg)
                    length = len(seg)
        else:  # TODO fix mentions
            block.append(tk)

    if block:
        blocks.append(block)

    return blocks