Social media crossposting tool. Third time's the charm.
mastodon misskey crossposting bluesky
from typing import Callable, Any
from database import DataBaseWorker
from datetime import datetime, timezone
from media_util import MediaInfo
from util import LOGGER
import util
import re

ALTERNATE = re.compile(r'\S+|\s+')
URL = re.compile(r'(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+', re.IGNORECASE)
MD_INLINE_LINK = re.compile(r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)", re.IGNORECASE)
MD_AUTOLINK = re.compile(r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE)
HASHTAG = re.compile(r'(?<!\w)\#([\w]+)')
FEDIVERSE_HANDLE = re.compile(r'(?<![\w@])@([\w\.-]+)(?:@([\w\.-]+\.[\w\.-]+))?')

# generic token
class Token:
    def __init__(self, type: str) -> None:
        self.type = type

# token that represents a run of plain text
class TextToken(Token):
    def __init__(self, text: str) -> None:
        super().__init__('text')
        self.text = text

# token that represents a link to a website. e.g. [link](https://google.com/)
class LinkToken(Token):
    def __init__(self, href: str, label: str) -> None:
        super().__init__('link')
        self.href = href
        self.label = label

# token that represents a hashtag. e.g. #SocialMedia
class TagToken(Token):
    def __init__(self, tag: str) -> None:
        super().__init__('tag')
        self.tag = tag

# token that represents a mention of a user.
class MentionToken(Token):
    def __init__(self, username: str, uri: str) -> None:
        super().__init__('mention')
        self.username = username
        self.uri = uri

class MediaMeta:
    def __init__(self, width: int, height: int, duration: float) -> None:
        self.width = width
        self.height = height
        self.duration = duration

    def get_width(self) -> int:
        return self.width

    def get_height(self) -> int:
        return self.height

    def get_duration(self) -> float:
        return self.duration

# generic post; concrete services override these accessors
class Post:
    def __init__(self) -> None:
        pass

    def get_tokens(self) -> list[Token]:
        return []

    def get_parent_id(self) -> str | None:
        return None

    def get_post_date_iso(self) -> str:
        return datetime.now(timezone.utc).isoformat()

    def get_attachments(self) -> list[MediaInfo]:
        return []

    def get_id(self) -> str:
        return ''

    def get_cw(self) -> str:
        return ''

    def get_languages(self) -> list[str]:
        return []

    def is_sensitive(self) -> bool:
        return False

# generic input service; `service` and `user_id` identify the account for db queries
class Input:
    def __init__(self, service: str, user_id: str, settings: dict, db: DataBaseWorker) -> None:
        self.service = service
        self.user_id = user_id
        self.settings = settings
        self.db = db

    async def listen(self, outputs: list, handler: Callable[[Post], Any]):
        pass

# generic output service; subclasses forward events from the input to a target service
class Output:
    def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None:
        self.input = input
        self.settings = settings
        self.db = db

    def accept_post(self, post: Post):
        LOGGER.warning('Not Implemented.. "posted" %s', post.get_id())

    def delete_post(self, identifier: str):
        LOGGER.warning('Not Implemented.. "deleted" %s', identifier)

    def accept_repost(self, repost_id: str, reposted_id: str):
        LOGGER.warning('Not Implemented.. "reblogged" %s, %s', repost_id, reposted_id)

    def delete_repost(self, repost_id: str):
        LOGGER.warning('Not Implemented.. "removed reblog" %s', repost_id)

    def accept_quote(self, quote: Post, quoted_id: str):
        LOGGER.warning('Not Implemented.. "quoted" %s, %s', quote.get_id(), quoted_id)

    def delete_quote(self, quote_id: str):
        LOGGER.warning('Not Implemented.. "removed quote" %s', quote_id)

# returns True when the post passes every filter, i.e. no filter pattern matches
# the post rendered back to markdown
def test_filters(tokens: list[Token], filters: list[re.Pattern[str]]) -> bool:
    if not tokens or not filters:
        return True

    markdown = ''

    for token in tokens:
        if isinstance(token, TextToken):
            markdown += token.text
        elif isinstance(token, LinkToken):
            markdown += f'[{token.label}]({token.href})'
        elif isinstance(token, TagToken):
            markdown += '#' + token.tag
        elif isinstance(token, MentionToken):
            markdown += token.username

    for pattern in filters:
        if pattern.search(markdown):
            return False

    return True

# parses raw post text into tokens, recognizing links plus only the hashtags and
# fediverse handles the caller supplies
def tokenize_markdown(text: str, tags: list[str], handles: list[tuple[str, str]]) -> list[Token]:
    if not text:
        return []

    index: int = 0
    total: int = len(text)
    buffer: list[str] = []

    tokens: list[Token] = []

    def flush():
        nonlocal buffer
        if buffer:
            tokens.append(TextToken(''.join(buffer)))
            buffer = []

    while index < total:
        if text[index] == '[':
            md_inline = MD_INLINE_LINK.match(text, index)
            if md_inline:
                flush()
                label = md_inline.group(1)
                href = md_inline.group(2)
                tokens.append(LinkToken(href, label))
                index = md_inline.end()
                continue

        if text[index] == '<':
            md_auto = MD_AUTOLINK.match(text, index)
            if md_auto:
                flush()
                href = md_auto.group(1)
                tokens.append(LinkToken(href, href))
                index = md_auto.end()
                continue

        if text[index] == '#':
            tag = HASHTAG.match(text, index)
            if tag:
                tag_text = tag.group(1)
                if tag_text.lower() in tags:
                    flush()
                    tokens.append(TagToken(tag_text))
                    index = tag.end()
                    continue

        if text[index] == '@':
            handle = FEDIVERSE_HANDLE.match(text, index)
            if handle:
                handle_text = handle.group(0)

                match = next(
                    (pair for pair in handles if handle_text in pair),
                    None
                )

                if match:
                    flush()
                    tokens.append(MentionToken(match[1], ''))  # TODO: misskey doesn’t provide a uri
                    index = handle.end()
                    continue

        url = URL.match(text, index)
        if url:
            flush()
            href = url.group(0)
            tokens.append(LinkToken(href, href))
            index = url.end()
            continue

        buffer.append(text[index])
        index += 1

    flush()
    return tokens

# splits a token stream into blocks whose rendered length is at most max_chars;
# when util.canonical_label(label, href) is truthy, the link counts as at most
# max_link_len characters (services typically shorten such links).
def split_tokens(tokens: list[Token], max_chars: int, max_link_len: int = 35) -> list[list[Token]]:
    def start_new_block():
        nonlocal current_block, blocks, current_length
        if current_block:
            blocks.append(current_block)
        current_block = []
        current_length = 0

    def append_text_to_block(text_segment):
        nonlocal current_block
        # if the last element in the current block is also text, just append to it
        if current_block and isinstance(current_block[-1], TextToken):
            current_block[-1].text += text_segment
        else:
            current_block.append(TextToken(text_segment))

    blocks: list[list[Token]] = []
    current_block: list[Token] = []
    current_length: int = 0

    for token in tokens:
        if isinstance(token, TextToken):
            # split content into alternating “words” (\S+) and “whitespace” (\s+).
            # this ensures every space/newline is treated as its own segment.
            segments: list[str] = ALTERNATE.findall(token.text)

            for seg in segments:
                if seg.isspace():
                    # whitespace segment: we count it, and if it doesn't fully fit,
                    # split the whitespace across blocks to preserve exact spacing.
                    seg_len: int = len(seg)
                    while seg_len > 0:
                        space_left = max_chars - current_length
                        if space_left == 0:
                            start_new_block()
                            continue

                        take = min(space_left, seg_len)
                        part = seg[:take]
                        append_text_to_block(part)

                        current_length += take
                        seg = seg[take:]
                        seg_len -= take

                        if current_length == max_chars:
                            start_new_block()

                else:
                    # seg is a “word” (no whitespace inside).
                    word: str = seg
                    wlen: int = len(word)

                    # if the word itself is longer than max_chars, we must split it with hyphens.
                    if wlen > max_chars:
                        # first, if we're in the middle of a block, close it & start fresh.
                        if current_length > 0:
                            start_new_block()

                        remaining = word
                        # carve off (max_chars - 1)-sized chunks + “-” so each chunk is max_chars chars.
                        while len(remaining) > (max_chars - 1):
                            chunk = remaining[: max_chars - 1] + '-'
                            append_text_to_block(chunk)
                            # that chunk fills the current block
                            start_new_block()
                            remaining = remaining[max_chars - 1 :]

                        # now whatever remains is ≤ max_chars characters
                        if remaining:
                            append_text_to_block(remaining)
                            current_length = len(remaining)

                    else:
                        # word fits fully within a block (≤ max_chars).
                        if current_length + wlen <= max_chars:
                            append_text_to_block(word)
                            current_length += wlen
                        else:
                            # not enough space in current block → start a new one
                            start_new_block()
                            append_text_to_block(word)
                            current_length = wlen

        elif isinstance(token, LinkToken):
            link_len = len(token.label)
            if util.canonical_label(token.label, token.href):
                link_len = min(link_len, max_link_len)

            if current_length + link_len <= max_chars:
                current_block.append(token)
                current_length += link_len
            else:
                start_new_block()
                current_block.append(token)
                current_length = link_len

        elif isinstance(token, TagToken):
            # we treat a hashtag like “#tagname” for counting.
            hashtag_len = 1 + len(token.tag)
            if current_length + hashtag_len <= max_chars:
                current_block.append(token)
                current_length += hashtag_len
            else:
                start_new_block()
                current_block.append(token)
                current_length = hashtag_len

        else:
            # other token types (e.g. mentions) are appended without affecting length.
            current_block.append(token)

    # append any remaining tokens as the final block
    if current_block:
        blocks.append(current_block)

    return blocks
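
# Minimal smoke-test sketch, runnable in-repo (split_tokens relies on this repo's
# own util.canonical_label). The sample text, tag list, and handle pair below are
# invented for illustration.
if __name__ == '__main__':
    sample = 'hello @alice@example.social, check [my site](https://example.com) #Updates'
    tokens = tokenize_markdown(sample, ['updates'], [('@alice@example.social', '@alice')])
    for i, block in enumerate(split_tokens(tokens, max_chars=50)):
        print(i, [type(t).__name__ for t in block])
    print('passes filters:', test_filters(tokens, [re.compile(r'spam')]))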