social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
from typing import Callable, Any
from util.database import DataBaseWorker
from datetime import datetime, timezone
from util.media import MediaInfo
from util.util import LOGGER, canonical_label
import re

ALTERNATE = re.compile(r'\S+|\s+')  # alternating non-whitespace / whitespace runs
# bare scheme:// or mailto: links
URL = re.compile(r'(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+', re.IGNORECASE)
# markdown [label](url)
MD_INLINE_LINK = re.compile(r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)", re.IGNORECASE)
# markdown <url>
MD_AUTOLINK = re.compile(r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE)
HASHTAG = re.compile(r'(?<!\w)\#([\w]+)')
# @user or @user@instance.tld
FEDIVERSE_HANDLE = re.compile(r'(?<![\w@])@([\w\.-]+)(?:@([\w\.-]+\.[\w\.-]+))?')

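# quick sanity sketch for the patterns above; runs when this file is
# executed directly (the example strings are placeholders, not real accounts):
if __name__ == '__main__':
    assert URL.match('https://example.com/page')
    assert MD_INLINE_LINK.match('[label](https://example.com)')
    assert MD_AUTOLINK.match('<mailto:user@example.com>')
    assert HASHTAG.match('#SocialMedia').group(1) == 'SocialMedia'
    assert FEDIVERSE_HANDLE.match('@user@example.social').group(1) == 'user'
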
# generic token
class Token:
    def __init__(self, type: str) -> None:
        self.type = type

class TextToken(Token):
    def __init__(self, text: str) -> None:
        super().__init__('text')
        self.text = text

# token that represents a link to a website. e.g. [link](https://google.com/)
class LinkToken(Token):
    def __init__(self, href: str, label: str) -> None:
        super().__init__('link')
        self.href = href
        self.label = label

# token that represents a hashtag. e.g. #SocialMedia
class TagToken(Token):
    def __init__(self, tag: str) -> None:
        super().__init__('tag')
        self.tag = tag

# token that represents a mention of a user.
class MentionToken(Token):
    def __init__(self, username: str, uri: str) -> None:
        super().__init__('mention')
        self.username = username
        self.uri = uri

class MediaMeta:
    def __init__(self, width: int, height: int, duration: float) -> None:
        self.width = width
        self.height = height
        self.duration = duration

    def get_width(self) -> int:
        return self.width

    def get_height(self) -> int:
        return self.height

    def get_duration(self) -> float:
        return self.duration

# generic post. input services subclass this and map their service's
# post data onto these getters
class Post:
    def __init__(self) -> None:
        pass

    def get_tokens(self) -> list[Token]:
        return []

    def get_parent_id(self) -> str | None:
        return None

    def get_post_date_iso(self) -> str:
        return datetime.now(timezone.utc).isoformat()

    def get_attachments(self) -> list[MediaInfo]:
        return []

    def get_id(self) -> str:
        return ''

    def get_cw(self) -> str:
        return ''

    def get_languages(self) -> list[str]:
        return []

    def is_sensitive(self) -> bool:
        return False
# generic input service.
# service and user_id identify the account for db queries
class Input:
    def __init__(self, service: str, user_id: str, settings: dict, db: DataBaseWorker) -> None:
        self.service = service
        self.user_id = user_id
        self.settings = settings
        self.db = db

    async def listen(self, outputs: list, handler: Callable[[Post], Any]):
        pass

# generic output service. the default handlers just log, so an event a
# subclass doesn't implement is visible instead of silently dropped
class Output:
    def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None:
        self.input = input
        self.settings = settings
        self.db = db

    def accept_post(self, post: Post):
        LOGGER.warning('Not Implemented.. "posted" %s', post.get_id())

    def delete_post(self, identifier: str):
        LOGGER.warning('Not Implemented.. "deleted" %s', identifier)

    def accept_repost(self, repost_id: str, reposted_id: str):
        LOGGER.warning('Not Implemented.. "reblogged" %s, %s', repost_id, reposted_id)

    def delete_repost(self, repost_id: str):
        LOGGER.warning('Not Implemented.. "removed reblog" %s', repost_id)

    def accept_quote(self, quote: Post, quoted_id: str):
        LOGGER.warning('Not Implemented.. "quoted" %s, %s', quote.get_id(), quoted_id)

    def delete_quote(self, quote_id: str):
        LOGGER.warning('Not Implemented.. "removed quote" %s', quote_id)

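# a hypothetical output that only overrides accept_post; everything else
# falls through to the warning stubs above. passing None for the wiring is
# just for the demo, and LOGGER is assumed to be a standard logging.Logger
# (it is already used with .warning above):
if __name__ == '__main__':
    class LogOutput(Output):
        def accept_post(self, post: Post):
            LOGGER.info('would crosspost %s', post.get_id())

    out = LogOutput(input=None, settings={}, db=None)  # type: ignore[arg-type]
    out.accept_post(Post())  # handled by the override
    out.delete_post('123')   # falls through to the warning stub
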
# render the tokens back to markdown-ish text and check it against the
# given filters. returns True if the post passes (no filter matched)
def test_filters(tokens: list[Token], filters: list[re.Pattern[str]]):
    if not tokens or not filters:
        return True

    markdown = ''

    for token in tokens:
        if isinstance(token, TextToken):
            markdown += token.text
        elif isinstance(token, LinkToken):
            markdown += f'[{token.label}]({token.href})'
        elif isinstance(token, TagToken):
            markdown += '#' + token.tag
        elif isinstance(token, MentionToken):
            markdown += token.username

    for pattern in filters:
        if pattern.search(markdown):
            return False

    return True

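# e.g. suppressing a post that matches a muted-word filter before it gets
# crossposted (sketch):
if __name__ == '__main__':
    toks = [TextToken('no spoilers here')]
    assert test_filters(toks, [re.compile(r'spoilers')]) is False
    assert test_filters(toks, [re.compile(r'politics')]) is True
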
def tokenize_markdown(text: str, tags: list[str], handles: list[tuple[str, str]]) -> list[Token]:
    if not text:
        return []

    index: int = 0
    total: int = len(text)
    buffer: list[str] = []

    tokens: list[Token] = []

    # emit any buffered plain text as a single TextToken
    def flush():
        nonlocal buffer
        if buffer:
            tokens.append(TextToken(''.join(buffer)))
            buffer = []

    while index < total:
        if text[index] == '[':
            md_inline = MD_INLINE_LINK.match(text, index)
            if md_inline:
                flush()
                label = md_inline.group(1)
                href = md_inline.group(2)
                tokens.append(LinkToken(href, label))
                index = md_inline.end()
                continue

        if text[index] == '<':
            md_auto = MD_AUTOLINK.match(text, index)
            if md_auto:
                flush()
                href = md_auto.group(1)
                tokens.append(LinkToken(href, href))
                index = md_auto.end()
                continue

        if text[index] == '#':
            tag = HASHTAG.match(text, index)
            if tag:
                tag_text = tag.group(1)
                # only tokenize tags from the provided list (expected lowercase)
                if tag_text.lower() in tags:
                    flush()
                    tokens.append(TagToken(tag_text))
                    index = tag.end()
                    continue

        if text[index] == '@':
            handle = FEDIVERSE_HANDLE.match(text, index)
            if handle:
                handle_text = handle.group(0)
                stripped_handle = handle_text.strip()

                # only tokenize known handles (either element of a pair may match)
                match = next(
                    (pair for pair in handles if stripped_handle in pair),
                    None
                )

                if match:
                    flush()
                    tokens.append(MentionToken(match[1], ''))  # TODO: misskey doesn't provide a uri
                    index = handle.end()
                    continue

        url = URL.match(text, index)
        if url:
            flush()
            href = url.group(0)
            tokens.append(LinkToken(href, href))
            index = url.end()
            continue

        buffer.append(text[index])
        index += 1

    flush()
    return tokens

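# a quick sketch of the tokenizer on a post mixing all four token kinds.
# assumes `tags` entries are lowercase and each `handles` pair is
# (handle as written, username), matching how the pairs are used above:
if __name__ == '__main__':
    toks = tokenize_markdown(
        'read [this](https://example.com) #News from @alice@example.social',
        tags=['news'],
        handles=[('@alice@example.social', 'alice')],
    )
    assert [t.type for t in toks] == ['text', 'link', 'text', 'tag', 'text', 'mention']
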
def split_tokens(tokens: list[Token], max_chars: int, max_link_len: int = 35) -> list[list[Token]]:
    # close the current block and start a fresh one
    def new_block():
        nonlocal blocks, block, length
        if block:
            blocks.append(block)
        block = []
        length = 0

    def append_text(text_segment):
        nonlocal block
        # if the last element in the current block is also text, just append to it
        if block and isinstance(block[-1], TextToken):
            block[-1].text += text_segment
        else:
            block.append(TextToken(text_segment))

    blocks: list[list[Token]] = []
    block: list[Token] = []
    length = 0

    for tk in tokens:  # other token types are currently not supported
        if isinstance(tk, TagToken):
            tag_len = 1 + len(tk.tag)  # (#) + tag
            if length + tag_len > max_chars:
                new_block()  # create new block if the current one is too large

            block.append(tk)
            length += tag_len
        elif isinstance(tk, LinkToken):  # TODO: labels should probably be split too
            link_len = len(tk.label)
            if canonical_label(tk.label, tk.href):  # cut down the link if the label is canonical
                link_len = min(link_len, max_link_len)

            if length + link_len > max_chars:
                new_block()
            block.append(tk)
            length += link_len
        elif isinstance(tk, TextToken):
            # split the text into alternating word / whitespace segments
            segments: list[str] = ALTERNATE.findall(tk.text)

            for seg in segments:
                seg_len: int = len(seg)
                # non-whitespace segments keep one char of headroom
                if length + seg_len <= max_chars - (0 if seg.isspace() else 1):
                    append_text(seg)
                    length += seg_len
                    continue

                if length > 0:
                    new_block()

                if not seg.isspace():
                    # hyphenate words that are longer than a whole block
                    while len(seg) > max_chars - 1:
                        chunk = seg[: max_chars - 1] + "-"
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars - 1 :]
                else:
                    while len(seg) > max_chars:
                        chunk = seg[: max_chars]
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars :]

                if seg:
                    append_text(seg)
                    length = len(seg)

    if block:
        blocks.append(block)

    return blocks
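
# sketch: with max_chars=10, a 12-char word is hyphenated across blocks and
# the short tail starts the next block:
if __name__ == '__main__':
    toks = tokenize_markdown('a' * 12 + ' tail', [], [])
    blocks = split_tokens(toks, max_chars=10)
    texts = [''.join(t.text for t in b if isinstance(t, TextToken)) for b in blocks]
    assert texts == ['aaaaaaaaa-', 'aaa tail']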