social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
from abc import ABC, abstractmethod
from typing import Callable, Any
from util.database import DataBaseWorker
from datetime import datetime, timezone
from util.media import MediaInfo
from util.util import LOGGER, canonical_label
import re

# matches alternating runs of non-whitespace and whitespace
ALTERNATE = re.compile(r'\S+|\s+')

# generic token
class Token():
    def __init__(self, type: str) -> None:
        self.type = type

class TextToken(Token):
    def __init__(self, text: str) -> None:
        super().__init__('text')
        self.text = text

# token that represents a link to a website. e.g. [link](https://google.com/)
class LinkToken(Token):
    def __init__(self, href: str, label: str) -> None:
        super().__init__('link')
        self.href = href
        self.label = label

# token that represents a hashtag. e.g. #SocialMedia
class TagToken(Token):
    def __init__(self, tag: str) -> None:
        super().__init__('tag')
        self.tag = tag

# token that represents a mention of a user.
class MentionToken(Token):
    def __init__(self, username: str, uri: str) -> None:
        super().__init__('mention')
        self.username = username
        self.uri = uri

# width/height/duration metadata for a media attachment
class MediaMeta():
    def __init__(self, width: int, height: int, duration: float) -> None:
        self.width = width
        self.height = height
        self.duration = duration

    def get_width(self) -> int:
        return self.width

    def get_height(self) -> int:
        return self.height

    def get_duration(self) -> float:
        return self.duration

class Post(ABC):
    @abstractmethod
    def get_id(self) -> str:
        return ''

    @abstractmethod
    def get_parent_id(self) -> str | None:
        pass

    @abstractmethod
    def get_tokens(self) -> list[Token]:
        pass

    # returns input text type.
    # text/plain, text/markdown, text/x.misskeymarkdown
    @abstractmethod
    def get_text_type(self) -> str:
        pass

    # post iso timestamp
    @abstractmethod
    def get_timestamp(self) -> str:
        pass

    def get_attachments(self) -> list[MediaInfo]:
        return []

    def get_spoiler(self) -> str | None:
        return None

    def get_languages(self) -> list[str]:
        return []

    def is_sensitive(self) -> bool:
        return False

    def get_post_url(self) -> str | None:
        return None

# generic input service.
# user and service for db queries
class Input():
    def __init__(self, service: str, user_id: str, settings: dict, db: DataBaseWorker) -> None:
        self.service = service
        self.user_id = user_id
        self.settings = settings
        self.db = db

    async def listen(self, outputs: list, handler: Callable[[Post], Any]):
        pass

# generic output service. default implementations only log.
class Output():
    def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None:
        self.input = input
        self.settings = settings
        self.db = db

    def accept_post(self, post: Post):
        LOGGER.warning('Not Implemented.. "posted" %s', post.get_id())

    def delete_post(self, identifier: str):
        LOGGER.warning('Not Implemented.. "deleted" %s', identifier)

    def accept_repost(self, repost_id: str, reposted_id: str):
        LOGGER.warning('Not Implemented.. "reblogged" %s, %s', repost_id, reposted_id)

    def delete_repost(self, repost_id: str):
        LOGGER.warning('Not Implemented.. "removed reblog" %s', repost_id)

# returns True if no filter pattern matches the post's reconstructed markdown, False otherwise
def test_filters(tokens: list[Token], filters: list[re.Pattern[str]]) -> bool:
    if not tokens or not filters:
        return True

    markdown = ''

    for token in tokens:
        if isinstance(token, TextToken):
            markdown += token.text
        elif isinstance(token, LinkToken):
            markdown += f'[{token.label}]({token.href})'
        elif isinstance(token, TagToken):
            markdown += '#' + token.tag
        elif isinstance(token, MentionToken):
            markdown += token.username

    for filter in filters:
        if filter.search(markdown):
            return False

    return True

# splits a token stream into blocks whose rendered length stays within max_chars.
# long words are hyphenated across blocks; canonical link labels count as at most max_link_len chars.
def split_tokens(tokens: list[Token], max_chars: int, max_link_len: int = 35) -> list[list[Token]]:
    def new_block():
        nonlocal blocks, block, length
        if block:
            blocks.append(block)
        block = []
        length = 0

    def append_text(text_segment):
        nonlocal block
        # if the last element in the current block is also text, just append to it
        if block and isinstance(block[-1], TextToken):
            block[-1].text += text_segment
        else:
            block.append(TextToken(text_segment))

    blocks: list[list[Token]] = []
    block: list[Token] = []
    length = 0

    for tk in tokens:
        if isinstance(tk, TagToken):
            tag_len = 1 + len(tk.tag) # (#) + tag
            if length + tag_len > max_chars:
                new_block() # create new block if the current one is too large

            block.append(tk)
            length += tag_len
        elif isinstance(tk, LinkToken): # TODO labels should probably be split too
            link_len = len(tk.label)
            if canonical_label(tk.label, tk.href): # cut down the link if the label is canonical
                link_len = min(link_len, max_link_len)

            if length + link_len > max_chars:
                new_block()
            block.append(tk)
            length += link_len
        elif isinstance(tk, TextToken):
            segments: list[str] = ALTERNATE.findall(tk.text)

            for seg in segments:
                seg_len: int = len(seg)
                if length + seg_len <= max_chars - (0 if seg.isspace() else 1):
                    append_text(seg)
                    length += seg_len
                    continue

                if length > 0:
                    new_block()

                # hard-wrap segments that are longer than a whole block on their own
                if not seg.isspace():
                    while len(seg) > max_chars - 1:
                        chunk = seg[: max_chars - 1] + "-"
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars - 1 :]
                else:
                    while len(seg) > max_chars:
                        chunk = seg[: max_chars]
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars :]

                if seg:
                    append_text(seg)
                    length = len(seg)
        else: # TODO fix mentions
            block.append(tk)

    if block:
        blocks.append(block)

    return blocks