social media crossposting tool. 3rd time's the charm
mastodon, misskey, crossposting, bluesky

from typing import Callable, Any
from database import DataBaseWorker
from datetime import datetime, timezone
from media_util import MediaInfo
from util import LOGGER
import util
import re

ALTERNATE = re.compile(r'\S+|\s+')
URL = re.compile(r'(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+', re.IGNORECASE)
MD_INLINE_LINK = re.compile(r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)", re.IGNORECASE)
MD_AUTOLINK = re.compile(r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE)
HASHTAG = re.compile(r'(?<!\w)#(\w+)')
FEDIVERSE_HANDLE = re.compile(r'(?<![\w@])@([\w.-]+)(?:@([\w.-]+\.[\w.-]+))?')

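# quick reference for the patterns above (illustrative matches, not exhaustive):
#   URL              'https://example.com/a?b=c', 'mailto:me@example.com'
#   MD_INLINE_LINK   '[label](https://example.com)'
#   MD_AUTOLINK      '<https://example.com>'
#   HASHTAG          '#SocialMedia' (captures 'SocialMedia')
#   FEDIVERSE_HANDLE '@user' or '@user@instance.tld'
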
# generic token
class Token:
    def __init__(self, type: str) -> None:
        self.type = type

# token that represents a plain run of text
class TextToken(Token):
    def __init__(self, text: str) -> None:
        super().__init__('text')
        self.text = text

# token that represents a link to a website. e.g. [link](https://google.com/)
class LinkToken(Token):
    def __init__(self, href: str, label: str) -> None:
        super().__init__('link')
        self.href = href
        self.label = label

# token that represents a hashtag. e.g. #SocialMedia
class TagToken(Token):
    def __init__(self, tag: str) -> None:
        super().__init__('tag')
        self.tag = tag

# token that represents a mention of a user. e.g. @user@instance.tld
class MentionToken(Token):
    def __init__(self, username: str, uri: str) -> None:
        super().__init__('mention')
        self.username = username
        self.uri = uri

# basic metadata for a media attachment (dimensions and duration)
class MediaMeta:
    def __init__(self, width: int, height: int, duration: float) -> None:
        self.width = width
        self.height = height
        self.duration = duration

    def get_width(self) -> int:
        return self.width

    def get_height(self) -> int:
        return self.height

    def get_duration(self) -> float:
        return self.duration

# generic post. service-specific posts subclass this and override the accessors.
class Post:
    def get_tokens(self) -> list[Token]:
        return []

    def get_parent_id(self) -> str | None:
        return None

    def get_post_date_iso(self) -> str:
        return datetime.now(timezone.utc).isoformat()

    def get_attachments(self) -> list[MediaInfo]:
        return []

    def get_id(self) -> str:
        return ''

    def get_cw(self) -> str:
        return ''

    def get_languages(self) -> list[str]:
        return []

    def is_sensitive(self) -> bool:
        return False

# generic input service.
# service and user_id identify the account for db queries.
class Input:
    def __init__(self, service: str, user_id: str, settings: dict, db: DataBaseWorker) -> None:
        self.service = service
        self.user_id = user_id
        self.settings = settings
        self.db = db

    async def listen(self, outputs: list, handler: Callable[[Post], Any]):
        pass

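# a concrete input subclasses Input and invokes the handler for each new post.
# minimal sketch (illustrative only; fetch_new_posts is a hypothetical helper):
#
#   class PollingInput(Input):
#       async def listen(self, outputs: list, handler: Callable[[Post], Any]):
#           while True:
#               for post in await self.fetch_new_posts():  # hypothetical
#                   handler(post)
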
# generic output service.
class Output:
    def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None:
        self.input = input
        self.settings = settings
        self.db = db

    # the default hooks below only log a warning, so an output that doesn't
    # support an event degrades gracefully instead of crashing.
    def accept_post(self, post: Post):
        LOGGER.warning('Not Implemented.. "posted" %s', post.get_id())

    def delete_post(self, identifier: str):
        LOGGER.warning('Not Implemented.. "deleted" %s', identifier)

    def accept_repost(self, repost_id: str, reposted_id: str):
        LOGGER.warning('Not Implemented.. "reblogged" %s, %s', repost_id, reposted_id)

    def delete_repost(self, repost_id: str):
        LOGGER.warning('Not Implemented.. "removed reblog" %s', repost_id)

    def accept_quote(self, quote: Post, quoted_id: str):
        LOGGER.warning('Not Implemented.. "quoted" %s, %s', quote.get_id(), quoted_id)

    def delete_quote(self, quote_id: str):
        LOGGER.warning('Not Implemented.. "removed quote" %s', quote_id)

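# a concrete output overrides the hooks it supports.
# minimal sketch (illustrative only; not a real service in this repo):
#
#   class ConsoleOutput(Output):
#       def accept_post(self, post: Post):
#           LOGGER.info('posted %s', post.get_id())
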
# render the tokens back to markdown-ish text and test it against the user's
# regex filters. returns True when the post passes (no filter matched).
def test_filters(tokens: list[Token], filters: list[re.Pattern[str]]) -> bool:
    if not tokens or not filters:
        return True

    markdown = ''

    for token in tokens:
        if isinstance(token, TextToken):
            markdown += token.text
        elif isinstance(token, LinkToken):
            markdown += f'[{token.label}]({token.href})'
        elif isinstance(token, TagToken):
            markdown += '#' + token.tag
        elif isinstance(token, MentionToken):
            markdown += token.username

    for pattern in filters:
        if pattern.search(markdown):
            return False

    return True

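# example (illustrative): a post mentioning 'linux' is dropped by a
# case-insensitive filter:
#
#   test_filters([TextToken('check out #linux')], [re.compile('linux', re.I)])
#   -> False
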
# scan markdown-ish text left to right into a flat token stream. only hashtags
# listed in `tags` and mentions listed in `handles` become tokens; everything
# else stays plain text.
def tokenize_markdown(text: str, tags: list[str], handles: list[tuple[str, str]]) -> list[Token]:
    if not text:
        return []

    index: int = 0
    total: int = len(text)
    buffer: list[str] = []

    tokens: list[Token] = []

    # emit buffered plain text as a single TextToken
    def flush():
        nonlocal buffer
        if buffer:
            tokens.append(TextToken(''.join(buffer)))
            buffer = []

    while index < total:
        if text[index] == '[':
            md_inline = MD_INLINE_LINK.match(text, index)
            if md_inline:
                flush()
                label = md_inline.group(1)
                href = md_inline.group(2)
                tokens.append(LinkToken(href, label))
                index = md_inline.end()
                continue

        if text[index] == '<':
            md_auto = MD_AUTOLINK.match(text, index)
            if md_auto:
                flush()
                href = md_auto.group(1)
                tokens.append(LinkToken(href, href))
                index = md_auto.end()
                continue

        if text[index] == '#':
            tag = HASHTAG.match(text, index)
            if tag:
                tag_text = tag.group(1)
                if tag_text.lower() in tags:
                    flush()
                    tokens.append(TagToken(tag_text))
                    index = tag.end()
                    continue

        if text[index] == '@':
            handle = FEDIVERSE_HANDLE.match(text, index)
            if handle:
                handle_text = handle.group(0)
                stripped_handle = handle_text.strip()

                # accept the mention only if it matches one of the known handle
                # pairs; the pair's second element is used as the username
                match = next(
                    (pair for pair in handles if stripped_handle in pair),
                    None
                )

                if match:
                    flush()
                    tokens.append(MentionToken(match[1], ''))  # TODO: misskey doesn't provide a uri
                    index = handle.end()
                    continue

        url = URL.match(text, index)
        if url:
            flush()
            href = url.group(0)
            tokens.append(LinkToken(href, href))
            index = url.end()
            continue

        buffer.append(text[index])
        index += 1

    flush()
    return tokens

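# example (illustrative):
#
#   tokenize_markdown('hi [site](https://example.com) #Tag', ['tag'], [])
#   -> [TextToken('hi '), LinkToken('https://example.com', 'site'),
#       TextToken(' '), TagToken('Tag')]
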
# split a token stream into blocks whose rendered length is at most max_chars,
# so one long source post can be threaded across posts on the output service.
def split_tokens(tokens: list[Token], max_chars: int, max_link_len: int = 35) -> list[list[Token]]:
    def start_new_block():
        nonlocal current_block, blocks, current_length
        if current_block:
            blocks.append(current_block)
        current_block = []
        current_length = 0

    def append_text_to_block(text_segment):
        nonlocal current_block
        # if the last element in the current block is also text, just append to it
        if current_block and isinstance(current_block[-1], TextToken):
            current_block[-1].text += text_segment
        else:
            current_block.append(TextToken(text_segment))

    blocks: list[list[Token]] = []
    current_block: list[Token] = []
    current_length: int = 0

    for token in tokens:
        if isinstance(token, TextToken):
            # split content into alternating "words" (\S+) and "whitespace" (\s+).
            # this ensures every space/newline is treated as its own segment.
            segments: list[str] = ALTERNATE.findall(token.text)

            for seg in segments:
                if seg.isspace():
                    # whitespace segment: we count it, and if it doesn't fully fit,
                    # split the whitespace across blocks to preserve exact spacing.
                    seg_len: int = len(seg)
                    while seg_len > 0:
                        space_left = max_chars - current_length
                        if space_left == 0:
                            start_new_block()
                            continue

                        take = min(space_left, seg_len)
                        part = seg[:take]
                        append_text_to_block(part)

                        current_length += len(part)
                        seg = seg[take:]
                        seg_len -= take

                        if current_length == max_chars:
                            start_new_block()

                else:
                    # seg is a "word" (no whitespace inside).
                    word: str = seg
                    wlen: int = len(word)

                    # if the word itself is longer than max_chars, we must split it with hyphens.
                    if wlen > max_chars:
                        # first, if we're in the middle of a block, close it & start fresh.
                        if current_length > 0:
                            start_new_block()

                        remaining = word
                        # carve off (max_chars - 1)-sized chunks + '-' so each chunk is exactly max_chars.
                        while len(remaining) > (max_chars - 1):
                            chunk = remaining[: max_chars - 1] + '-'
                            append_text_to_block(chunk)
                            # that chunk fills the current block
                            start_new_block()
                            remaining = remaining[max_chars - 1 :]

                        # whatever remains is at most max_chars - 1 characters
                        if remaining:
                            append_text_to_block(remaining)
                            current_length = len(remaining)

                    else:
                        # word fits fully within a block (<= max_chars).
                        if current_length + wlen <= max_chars:
                            append_text_to_block(word)
                            current_length += wlen
                        else:
                            # not enough space in the current block -> start a new one
                            start_new_block()
                            append_text_to_block(word)
                            current_length = wlen

        elif isinstance(token, LinkToken):
            # canonical url labels are counted as at most max_link_len characters
            link_len = len(token.label)
            if util.canonical_label(token.label, token.href):
                link_len = min(link_len, max_link_len)

            if current_length + link_len <= max_chars:
                current_block.append(token)
                current_length += link_len
            else:
                start_new_block()
                current_block.append(token)
                current_length = link_len

        elif isinstance(token, TagToken):
            # a hashtag is counted as '#tagname'.
            hashtag_len = 1 + len(token.tag)
            if current_length + hashtag_len <= max_chars:
                current_block.append(token)
                current_length += hashtag_len
            else:
                start_new_block()
                current_block.append(token)
                current_length = hashtag_len

        else:
            # any other token type (currently MentionToken) is appended without
            # affecting the count; its rendered length is service-specific.
            current_block.append(token)

    # append the in-progress block as the final one
    if current_block:
        blocks.append(current_block)

    return blocks
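
# quick smoke test (illustrative; the tiny limit is only for the demo.
# run inside the project so the sibling imports at the top resolve):
if __name__ == '__main__':
    parts = split_tokens(tokenize_markdown('hello world this is a demo', [], []), 11)
    for block in parts:
        print(''.join(t.text for t in block if isinstance(t, TextToken)))
    # expected: 'hello world', ' this is a ', 'demo'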