social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1from typing import Callable, Any 2from util.database import DataBaseWorker 3from datetime import datetime, timezone 4from util.media import MediaInfo 5from util.util import LOGGER, canonical_label 6import re 7 8ALTERNATE = re.compile(r'\S+|\s+') 9URL = re.compile(r'(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+', re.IGNORECASE) 10MD_INLINE_LINK = re.compile(r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)", re.IGNORECASE) 11MD_AUTOLINK = re.compile(r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE) 12HASHTAG = re.compile(r'(?<!\w)\#([\w]+)') 13FEDIVERSE_HANDLE = re.compile(r'(?<![\w@])@([\w\.-]+)(?:@([\w\.-]+\.[\w\.-]+))?') 14 15# generic token 16class Token(): 17 def __init__(self, type: str) -> None: 18 self.type = type 19 20class TextToken(Token): 21 def __init__(self, text: str) -> None: 22 super().__init__('text') 23 self.text = text 24 25# token that represents a link to a website. e.g. [link](https://google.com/) 26class LinkToken(Token): 27 def __init__(self, href: str, label: str) -> None: 28 super().__init__('link') 29 self.href = href 30 self.label = label 31 32# token that represents a hashtag. e.g. #SocialMedia 33class TagToken(Token): 34 def __init__(self, tag: str) -> None: 35 super().__init__('tag') 36 self.tag = tag 37 38# token that represents a mention of a user. 39class MentionToken(Token): 40 def __init__(self, username: str, uri: str) -> None: 41 super().__init__('mention') 42 self.username = username 43 self.uri = uri 44 45class MediaMeta(): 46 def __init__(self, width: int, height: int, duration: float) -> None: 47 self.width = width 48 self.height = height 49 self.duration = duration 50 51 def get_width(self) -> int: 52 return self.width 53 54 def get_height(self) -> int: 55 return self.height 56 57 def get_duration(self) -> float: 58 return self.duration 59 60class Post(): 61 def __init__(self) -> None: 62 pass 63 64 def get_tokens(self) -> list[Token]: 65 return [] 66 67 def get_parent_id(self) -> str | None: 68 return None 69 70 def get_post_date_iso(self) -> str: 71 return datetime.now(timezone.utc).isoformat() 72 73 def get_attachments(self) -> list[MediaInfo]: 74 return [] 75 76 def get_id(self) -> str: 77 return '' 78 79 def get_cw(self) -> str: 80 return '' 81 82 def get_languages(self) -> list[str]: 83 return [] 84 85 def is_sensitive(self) -> bool: 86 return False 87 88# generic input service. 89# user and service for db queries 90class Input(): 91 def __init__(self, service: str, user_id: str, settings: dict, db: DataBaseWorker) -> None: 92 self.service = service 93 self.user_id = user_id 94 self.settings = settings 95 self.db = db 96 97 async def listen(self, outputs: list, handler: Callable[[Post], Any]): 98 pass 99 100class Output(): 101 def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None: 102 self.input = input 103 self.settings = settings 104 self.db = db 105 106 def accept_post(self, post: Post): 107 LOGGER.warning('Not Implemented.. "posted" %s', post.get_id()) 108 109 def delete_post(self, identifier: str): 110 LOGGER.warning('Not Implemented.. "deleted" %s', identifier) 111 112 def accept_repost(self, repost_id: str, reposted_id: str): 113 LOGGER.warning('Not Implemented.. "reblogged" %s, %s', repost_id, reposted_id) 114 115 def delete_repost(self, repost_id: str): 116 LOGGER.warning('Not Implemented.. "removed reblog" %s', repost_id) 117 118 def accept_quote(self, quote: Post, quoted_id: str): 119 LOGGER.warning('Not Implemented.. "quoted" %s, %s', quote.get_id(), quoted_id) 120 121 def delete_quote(self, quote_id: str): 122 LOGGER.warning('Not Implemented.. "removed quote" %s', quote_id) 123 124def test_filters(tokens: list[Token], filters: list[re.Pattern[str]]): 125 if not tokens or not filters: 126 return True 127 128 markdown = '' 129 130 for token in tokens: 131 if isinstance(token, TextToken): 132 markdown += token.text 133 elif isinstance(token, LinkToken): 134 markdown += f'[{token.label}]({token.href})' 135 elif isinstance(token, TagToken): 136 markdown += '#' + token.tag 137 elif isinstance(token, MentionToken): 138 markdown += token.username 139 140 for filter in filters: 141 if filter.search(markdown): 142 return False 143 144 return True 145 146def tokenize_markdown(text: str, tags: list[str], handles: list[tuple[str, str]]) -> list[Token]: 147 if not text: 148 return [] 149 150 index: int = 0 151 total: int = len(text) 152 buffer: list[str] = [] 153 154 tokens: list[Token] = [] 155 156 def flush(): 157 nonlocal buffer 158 if buffer: 159 tokens.append(TextToken(''.join(buffer))) 160 buffer = [] 161 162 while index < total: 163 if text[index] == '[': 164 md_inline = MD_INLINE_LINK.match(text, index) 165 if md_inline: 166 flush() 167 label = md_inline.group(1) 168 href = md_inline.group(2) 169 tokens.append(LinkToken(href, label)) 170 index = md_inline.end() 171 continue 172 173 if text[index] == '<': 174 md_auto = MD_AUTOLINK.match(text, index) 175 if md_auto: 176 flush() 177 href = md_auto.group(1) 178 tokens.append(LinkToken(href, href)) 179 index = md_auto.end() 180 continue 181 182 if text[index] == '#': 183 tag = HASHTAG.match(text, index) 184 if tag: 185 tag_text = tag.group(1) 186 if tag_text.lower() in tags: 187 flush() 188 tokens.append(TagToken(tag_text)) 189 index = tag.end() 190 continue 191 192 if text[index] == '@': 193 handle = FEDIVERSE_HANDLE.match(text, index) 194 if handle: 195 handle_text = handle.group(0) 196 stripped_handle = handle_text.strip() 197 198 match = next( 199 (pair for pair in handles if stripped_handle in pair), 200 None 201 ) 202 203 if match: 204 flush() 205 tokens.append(MentionToken(match[1], '')) # TODO: misskey doesn’t provide a uri 206 index = handle.end() 207 continue 208 209 url = URL.match(text, index) 210 if url: 211 flush() 212 href = url.group(0) 213 tokens.append(LinkToken(href, href)) 214 index = url.end() 215 continue 216 217 buffer.append(text[index]) 218 index += 1 219 220 flush() 221 return tokens 222 223def split_tokens(tokens: list[Token], max_chars: int, max_link_len: int = 35) -> list[list[Token]]: 224 def new_block(): 225 nonlocal blocks, block, length 226 if block: 227 blocks.append(block) 228 block = [] 229 length = 0 230 231 def append_text(text_segment): 232 nonlocal block 233 # if the last element in the current block is also text, just append to it 234 if block and isinstance(block[-1], TextToken): 235 block[-1].text += text_segment 236 else: 237 block.append(TextToken(text_segment)) 238 239 blocks: list[list[Token]] = [] 240 block: list[Token] = [] 241 length = 0 242 243 for tk in tokens: # other token types are currently not supported 244 if isinstance(tk, TagToken): 245 tag_len = 1 + len(tk.tag) # (#) + tag 246 if length + tag_len > max_chars: 247 new_block() # create new block if the current one is too large 248 249 block.append(tk) 250 length += tag_len 251 elif isinstance(tk, LinkToken): # TODO labels should proably be split too 252 link_len = len(tk.label) 253 if canonical_label(tk.label, tk.href): # cut down the link if the label is canonical 254 link_len = min(link_len, max_link_len) 255 256 if length + link_len > max_chars: 257 new_block() 258 block.append(tk) 259 length += link_len 260 elif isinstance(tk, TextToken): 261 segments: list[str] = ALTERNATE.findall(tk.text) 262 263 for seg in segments: 264 seg_len: int = len(seg) 265 if length + seg_len <= max_chars - (0 if seg.isspace() else 1): 266 append_text(seg) 267 length += seg_len 268 continue 269 270 if length > 0: 271 new_block() 272 273 if not seg.isspace(): 274 while len(seg) > max_chars - 1: 275 chunk = seg[: max_chars - 1] + "-" 276 append_text(chunk) 277 new_block() 278 seg = seg[max_chars - 1 :] 279 else: 280 while len(seg) > max_chars: 281 chunk = seg[: max_chars] 282 append_text(chunk) 283 new_block() 284 seg = seg[max_chars :] 285 286 if seg: 287 append_text(seg) 288 length = len(seg) 289 290 if block: 291 blocks.append(block) 292 293 return blocks