Social media crossposting tool. Third time's the charm.
Topics: mastodon, misskey, crossposting, bluesky

from abc import ABC, abstractmethod
from typing import Callable, Any
from util.database import DataBaseWorker
from datetime import datetime, timezone
from util.media import MediaInfo
from util.util import LOGGER, canonical_label
import re

# matches alternating runs of non-whitespace and whitespace characters
ALTERNATE = re.compile(r'\S+|\s+')

# generic token
class Token():
    def __init__(self, type: str) -> None:
        self.type = type

# token that represents a run of plain text
class TextToken(Token):
    def __init__(self, text: str) -> None:
        super().__init__('text')
        self.text = text

# token that represents a link to a website. e.g. [link](https://google.com/)
class LinkToken(Token):
    def __init__(self, href: str, label: str) -> None:
        super().__init__('link')
        self.href = href
        self.label = label

# token that represents a hashtag. e.g. #SocialMedia
class TagToken(Token):
    def __init__(self, tag: str) -> None:
        super().__init__('tag')
        self.tag = tag

# token that represents a mention of a user.
class MentionToken(Token):
    def __init__(self, username: str, uri: str) -> None:
        super().__init__('mention')
        self.username = username
        self.uri = uri

class MediaMeta():
    def __init__(self, width: int, height: int, duration: float) -> None:
        self.width = width
        self.height = height
        self.duration = duration

    def get_width(self) -> int:
        return self.width

    def get_height(self) -> int:
        return self.height

    def get_duration(self) -> float:
        return self.duration

class Post(ABC):
    @abstractmethod
    def get_id(self) -> str:
        return ''

    @abstractmethod
    def get_parent_id(self) -> str | None:
        pass

    @abstractmethod
    def get_tokens(self) -> list[Token]:
        pass

    # returns the source text type:
    # text/plain, text/markdown, or text/x.misskeymarkdown
    @abstractmethod
    def get_text_type(self) -> str:
        pass

    # post creation time as an ISO 8601 timestamp
    @abstractmethod
    def get_timestamp(self) -> str:
        pass

    def get_attachments(self) -> list[MediaInfo]:
        return []

    def get_spoiler(self) -> str | None:
        return None

    def get_languages(self) -> list[str]:
        return []

    def is_sensitive(self) -> bool:
        return False

    def get_post_url(self) -> str | None:
        return None

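# Illustrative sketch, not part of the original module: a minimal concrete Post
# showing how the abstract methods above are meant to be implemented by an input
# service. `PlainTextPost` is a hypothetical name used only for this example.
class PlainTextPost(Post):
    def __init__(self, post_id: str, text: str) -> None:
        self.post_id = post_id
        self.text = text

    def get_id(self) -> str:
        return self.post_id

    def get_parent_id(self) -> str | None:
        return None  # top-level post, not a reply

    def get_tokens(self) -> list[Token]:
        return [TextToken(self.text)]

    def get_text_type(self) -> str:
        return 'text/plain'

    def get_timestamp(self) -> str:
        return datetime.now(timezone.utc).isoformat()
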
# generic input service.
# `service` and `user_id` identify the account for db queries
class Input():
    def __init__(self, service: str, user_id: str, settings: dict, db: DataBaseWorker) -> None:
        self.service = service
        self.user_id = user_id
        self.settings = settings
        self.db = db

    # subclasses listen for new posts and pass each one to the handler
    async def listen(self, outputs: list, handler: Callable[[Post], Any]):
        pass

class Output():
    def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None:
        self.input = input
        self.settings = settings
        self.db = db

    def accept_post(self, post: Post):
        LOGGER.warning('Not Implemented.. "posted" %s', post.get_id())

    def delete_post(self, identifier: str):
        LOGGER.warning('Not Implemented.. "deleted" %s', identifier)

    def accept_repost(self, repost_id: str, reposted_id: str):
        LOGGER.warning('Not Implemented.. "reblogged" %s, %s', repost_id, reposted_id)

    def delete_repost(self, repost_id: str):
        LOGGER.warning('Not Implemented.. "removed reblog" %s', repost_id)

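# Illustrative sketch, not part of the original module: an Output subclass only
# overrides the hooks it needs. `LogOutput` is a hypothetical name; a real output
# would call a service API. Assumes LOGGER is a standard logging.Logger.
class LogOutput(Output):
    def accept_post(self, post: Post):
        LOGGER.info('would crosspost %s (%d tokens)', post.get_id(), len(post.get_tokens()))

    def delete_post(self, identifier: str):
        LOGGER.info('would delete %s', identifier)
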
# returns False if any filter pattern matches the post's rendered markdown, True otherwise
def test_filters(tokens: list[Token], filters: list[re.Pattern[str]]) -> bool:
    if not tokens or not filters:
        return True

    # render the tokens back into a single markdown string to match against
    markdown = ''

    for token in tokens:
        if isinstance(token, TextToken):
            markdown += token.text
        elif isinstance(token, LinkToken):
            markdown += f'[{token.label}]({token.href})'
        elif isinstance(token, TagToken):
            markdown += '#' + token.tag
        elif isinstance(token, MentionToken):
            markdown += token.username

    for pattern in filters:
        if pattern.search(markdown):
            return False

    return True

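# Hedged usage sketch for test_filters, not part of the original module:
# a post is dropped (False) as soon as any filter pattern matches its rendered text.
if __name__ == '__main__':
    _toks: list[Token] = [TextToken('weekly update '), TagToken('SocialMedia')]
    print(test_filters(_toks, [re.compile(r'#spoiler', re.IGNORECASE)]))      # True: no filter matches
    print(test_filters(_toks, [re.compile(r'#socialmedia', re.IGNORECASE)]))  # False: the tag matches
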
# splits a token list into blocks that each fit within max_chars where possible;
# words longer than a whole block are hyphenated across blocks
def split_tokens(tokens: list[Token], max_chars: int, max_link_len: int = 35) -> list[list[Token]]:
    def new_block():
        nonlocal blocks, block, length
        if block:
            blocks.append(block)
        block = []
        length = 0

    def append_text(text_segment):
        nonlocal block
        # if the last element in the current block is also text, just append to it
        if block and isinstance(block[-1], TextToken):
            block[-1].text += text_segment
        else:
            block.append(TextToken(text_segment))

    blocks: list[list[Token]] = []
    block: list[Token] = []
    length = 0

    for tk in tokens:
        if isinstance(tk, TagToken):
            tag_len = 1 + len(tk.tag)  # '#' + tag
            if length + tag_len > max_chars:
                new_block()  # start a new block if the current one would overflow

            block.append(tk)
            length += tag_len
        elif isinstance(tk, LinkToken):  # TODO labels should probably be split too
            link_len = len(tk.label)
            if canonical_label(tk.label, tk.href):  # cut down the link if the label is canonical
                link_len = min(link_len, max_link_len)

            if length + link_len > max_chars:
                new_block()
            block.append(tk)
            length += link_len
        elif isinstance(tk, TextToken):
            # alternating runs of words and whitespace
            segments: list[str] = ALTERNATE.findall(tk.text)

            for seg in segments:
                seg_len: int = len(seg)
                if length + seg_len <= max_chars - (0 if seg.isspace() else 1):
                    append_text(seg)
                    length += seg_len
                    continue

                if length > 0:
                    new_block()

                if not seg.isspace():
                    # hyphenate words that are longer than a whole block
                    while len(seg) > max_chars - 1:
                        chunk = seg[: max_chars - 1] + "-"
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars - 1 :]
                else:
                    while len(seg) > max_chars:
                        chunk = seg[: max_chars]
                        append_text(chunk)
                        new_block()
                        seg = seg[max_chars :]

                if seg:
                    append_text(seg)
                    length = len(seg)
        else:  # TODO fix mentions
            block.append(tk)

    if block:
        blocks.append(block)

    return blocks
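
# Hedged usage sketch, not part of the original module: split a token list into
# blocks that fit a per-post character budget. The 20-character limit is arbitrary.
if __name__ == '__main__':
    demo_tokens: list[Token] = [
        TextToken('a fairly long sentence that will not fit in one post '),
        TagToken('crossposting'),
    ]
    for i, blk in enumerate(split_tokens(demo_tokens, max_chars=20)):
        print(i, [(t.type, getattr(t, 'text', '') or getattr(t, 'tag', '')) for t in blk])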