Social media crossposting tool — third time's the charm.
Tags: mastodon, misskey, crossposting, bluesky
1import re
2from abc import ABC, abstractmethod
3from datetime import datetime, timezone
4from typing import Any, Callable
5
6from util.database import DataBaseWorker
7from util.media import MediaInfo
8from util.util import LOGGER, canonical_label
9
# Matches alternating runs of non-whitespace and whitespace; findall() with
# this pattern partitions a string losslessly (used by split_tokens).
ALTERNATE = re.compile(r"\S+|\s+")
11
12
class Token:
    """Generic content token; `type` is the discriminator string set by subclasses."""

    def __init__(self, type: str) -> None:
        self.type = type
17
18
class TextToken(Token):
    """A run of plain text within a post."""

    def __init__(self, text: str) -> None:
        super().__init__("text")
        self.text = text
23
24
class LinkToken(Token):
    """A hyperlink to a website, e.g. [link](https://google.com/)."""

    def __init__(self, href: str, label: str) -> None:
        super().__init__("link")
        self.label = label
        self.href = href
31
32
class TagToken(Token):
    """A hashtag, e.g. #SocialMedia; `tag` is stored without the leading '#'."""

    def __init__(self, tag: str) -> None:
        super().__init__("tag")
        self.tag = tag
38
39
class MentionToken(Token):
    """A mention of a user, carrying their handle and a URI for them."""

    def __init__(self, username: str, uri: str) -> None:
        super().__init__("mention")
        self.uri = uri
        self.username = username
46
47
class MediaMeta:
    """Holder for a media item's dimensions and duration.

    Attributes are public; the getters are kept for callers that use them.
    """

    def __init__(self, width: int, height: int, duration: float) -> None:
        self.width, self.height, self.duration = width, height, duration

    def get_width(self) -> int:
        """Return the stored width."""
        return self.width

    def get_height(self) -> int:
        """Return the stored height."""
        return self.height

    def get_duration(self) -> float:
        """Return the stored duration (units set by the caller — presumably seconds)."""
        return self.duration
62
63
class Post(ABC):
    """Abstract post produced by an Input and handed to Outputs.

    Subclasses implement the abstract accessors; the concrete methods
    provide neutral defaults (no attachments, no spoiler, not sensitive).
    """

    @abstractmethod
    def get_id(self) -> str:
        """Service-local identifier of the post (base default is "")."""
        return ""

    @abstractmethod
    def get_parent_id(self) -> str | None:
        """Identifier of the parent post, or None — presumably the reply target; confirm in subclasses."""
        pass

    @abstractmethod
    def get_tokens(self) -> list[Token]:
        """Post content parsed into a flat token list."""
        pass

    # returns input text type.
    # text/plain, text/markdown, text/x.misskeymarkdown
    @abstractmethod
    def get_text_type(self) -> str:
        """Source text type: text/plain, text/markdown, or text/x.misskeymarkdown."""
        pass

    # post iso timestamp
    @abstractmethod
    def get_timestamp(self) -> str:
        """ISO-format timestamp of the post."""
        pass

    def get_attachments(self) -> list[MediaInfo]:
        """Media attached to the post; default is none."""
        return []

    def get_spoiler(self) -> str | None:
        """Spoiler / content-warning text, or None when absent."""
        return None

    def get_languages(self) -> list[str]:
        """Language codes for the post; default is unspecified."""
        return []

    def is_sensitive(self) -> bool:
        """Whether the post is flagged sensitive; default False."""
        return False

    def get_post_url(self) -> str | None:
        """Public URL of the post, or None when unknown."""
        return None
102
103
class Input:
    """Generic input service.

    `service` and `user_id` identify this account for database queries;
    `settings` carries service-specific configuration.
    """

    def __init__(
        self, service: str, user_id: str, settings: dict, db: DataBaseWorker
    ) -> None:
        self.db = db
        self.service = service
        self.user_id = user_id
        self.settings = settings

    async def listen(self, outputs: list, handler: Callable[[Post], Any]):
        """Run the service's listen loop, passing posts to `handler`; no-op in the base class."""
117
118
class Output:
    """Generic output service fed by a single Input.

    The base implementation only logs each event; subclasses perform the
    actual service calls.
    """

    def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None:
        self.db = db
        self.input = input
        self.settings = settings

    def accept_post(self, post: Post):
        """Mirror a new post (base class just logs)."""
        LOGGER.warning('Not Implemented.. "posted" %s', post.get_id())

    def delete_post(self, identifier: str):
        """Remove a previously mirrored post (base class just logs)."""
        LOGGER.warning('Not Implemented.. "deleted" %s', identifier)

    def accept_repost(self, repost_id: str, reposted_id: str):
        """Mirror a repost of `reposted_id` (base class just logs)."""
        LOGGER.warning('Not Implemented.. "reblogged" %s, %s', repost_id, reposted_id)

    def delete_repost(self, repost_id: str):
        """Undo a previously mirrored repost (base class just logs)."""
        LOGGER.warning('Not Implemented.. "removed reblog" %s', repost_id)
136
137
def test_filters(tokens: list[Token], filters: list[re.Pattern[str]]) -> bool:
    """Return True when the post passes all filters (i.e. should be crossposted).

    The tokens are rendered back to a markdown-ish string and each filter
    pattern is searched against it; any match rejects the post. An empty
    token list or empty filter list always passes.
    """
    if not tokens or not filters:
        return True

    # Build the rendered text with a join instead of quadratic `+=` in a loop.
    parts: list[str] = []
    for token in tokens:
        if isinstance(token, TextToken):
            parts.append(token.text)
        elif isinstance(token, LinkToken):
            parts.append(f"[{token.label}]({token.href})")
        elif isinstance(token, TagToken):
            parts.append("#" + token.tag)
        elif isinstance(token, MentionToken):
            parts.append(token.username)

    markdown = "".join(parts)

    # `pattern` instead of `filter` — avoid shadowing the builtin.
    return not any(pattern.search(markdown) for pattern in filters)
159
160
def split_tokens(
    tokens: list[Token], max_chars: int, max_link_len: int = 35
) -> list[list[Token]]:
    """Split a token stream into blocks whose rendered length is <= `max_chars`.

    Used to thread a long post across multiple posts on services with a
    character limit. Over-long words are hard-split with a trailing "-"
    continuation marker. Links whose label is canonical for the href count
    as at most `max_link_len` characters.
    """

    def new_block():
        # Flush the current block (if non-empty) into `blocks` and reset counters.
        nonlocal blocks, block, length
        if block:
            blocks.append(block)
        block = []
        length = 0

    def append_text(text_segment):
        nonlocal block
        # if the last element in the current block is also text, just append to it
        if block and isinstance(block[-1], TextToken):
            block[-1].text += text_segment
        else:
            block.append(TextToken(text_segment))

    blocks: list[list[Token]] = []
    block: list[Token] = []
    length = 0  # rendered character count of the current block

    for tk in tokens:
        if isinstance(tk, TagToken):
            tag_len = 1 + len(tk.tag)  # (#) + tag
            if length + tag_len > max_chars:
                new_block()  # create new block if the current one is too large

            block.append(tk)
            length += tag_len
        elif isinstance(tk, LinkToken):  # TODO labels should probably be split too
            link_len = len(tk.label)
            if canonical_label(
                tk.label, tk.href
            ):  # cut down the link if the label is canonical
                link_len = min(link_len, max_link_len)

            if length + link_len > max_chars:
                new_block()
            block.append(tk)
            length += link_len
        elif isinstance(tk, TextToken):
            # Alternating runs of non-whitespace / whitespace; reflowed run by run.
            segments: list[str] = ALTERNATE.findall(tk.text)

            for seg in segments:
                seg_len: int = len(seg)
                # Non-space runs reserve one character (for a potential "-" marker);
                # whitespace runs may fill the block completely.
                if length + seg_len <= max_chars - (0 if seg.isspace() else 1):
                    append_text(seg)
                    length += seg_len
                    continue

                # Segment does not fit: start a fresh block first.
                if length > 0:
                    new_block()

                if not seg.isspace():
                    # Word longer than a whole block: hard-split, appending "-" to
                    # each full chunk as a continuation marker.
                    while len(seg) > max_chars - 1:
                        chunk = seg[: max_chars - 1] + "-"
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars - 1 :]
                else:
                    # Whitespace run longer than a block: split without markers.
                    while len(seg) > max_chars:
                        chunk = seg[:max_chars]
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars:]

                # Remainder starts the new block; `length` is reset to it because
                # the current block is empty at this point.
                if seg:
                    append_text(seg)
                    length = len(seg)
        else:  # TODO fix mentions
            block.append(tk)

    # Flush the trailing partial block.
    if block:
        blocks.append(block)

    return blocks