Social media crossposting tool. Third time's the charm.
mastodon
misskey
crossposting
bluesky
1from util import LOGGER
2import requests, websockets
3import util, media_util, json, cross
4import database
5from database import DataBaseWorker
6from typing import Callable, Any
7import asyncio
8
9from bs4 import BeautifulSoup, Tag
10from bs4.element import NavigableString
11from markdownify import markdownify as md
12
# Mastodon attachment ``type`` -> generic media category used by ``cross``.
# Types missing from this table are treated as 'other' by the lookup in
# MastodonAttachment.get_type.
FORMATS = dict(
    video='video',
    image='image',
    gifv='gif',
    audio='audio',
    unknown='other',
)
20
def tokenize_post(status: dict) -> list[cross.Token]:
    """Split a Mastodon status' HTML ``content`` into cross-platform tokens.

    The parsed HTML is walked in document order and produces:

    * ``cross.TextToken`` for bare text nodes, and for any element that
      contains no ``<a>`` (converted to Markdown with ``markdownify``);
    * ``cross.MentionToken`` for an ``<a>`` whose text starts with ``@`` and
      matches one of the status' ``mentions`` (by URL, ``acct`` or
      ``username``);
    * ``cross.TagToken`` for an ``<a>`` whose text starts with ``#`` and
      matches one of the status' ``tags`` (compared case-insensitively);
    * ``cross.LinkToken`` for every other ``<a>``.

    Elements that do contain an ``<a>`` are not converted as a whole; their
    children are visited individually instead.

    :param status: Mastodon status dict. ``content`` is required; ``tags``
        and ``mentions`` are optional.
    :returns: the tokens in document order.
    """
    soup = BeautifulSoup(status['content'], "html.parser")
    tokens: list[cross.Token] = []

    tags: list[dict] = status.get('tags') or []
    mentions: list[dict] = status.get('mentions') or []

    # Hashtags are matched case-insensitively on BOTH sides: the name the API
    # reports is not guaranteed to use the same capitalisation as the post
    # text. Built once instead of re-scanning `tags` for every link.
    known_tags = {str(block['name']).lower() for block in tags if block.get('name')}

    def mdd(html):
        # Do not backslash-escape `*` and `_` in the generated Markdown.
        return md(html, escape_asterisks=False, escape_underscores=False)

    def recurse(node) -> None:
        if isinstance(node, NavigableString):
            tokens.append(cross.TextToken(str(node)))
            return

        if not isinstance(node, Tag):
            return

        if node.name.lower() == "a":
            href = node.get("href", "")
            inner_html = "".join(str(c) for c in node.contents)
            link_text_md = mdd(inner_html)

            if link_text_md.startswith('@'):
                as_mention = link_text_md[1:]
                for block in mentions:
                    if href == block.get('url') or as_mention in (block.get('acct'), block.get('username')):
                        tokens.append(cross.MentionToken(block['acct'], block['url']))
                        return

            if link_text_md.startswith('#') and link_text_md[1:].lower() in known_tags:
                tokens.append(cross.TagToken(link_text_md[1:]))
                return

            # Coerce defensively in case bs4 hands back a non-str attribute value.
            tokens.append(cross.LinkToken(str(href), link_text_md))
            return

        if node.find("a") is not None:
            # Visit children one by one so nested links still become
            # Mention/Tag/Link tokens instead of being flattened into Markdown.
            for child in node.contents:
                recurse(child)
            return

        markdownified = mdd(str(node))
        if markdownified:
            tokens.append(cross.TextToken(markdownified))

    for child in soup.contents:
        recurse(child)

    return tokens
78
class MastodonPost(cross.Post):
    """Expose a raw Mastodon status dict through the ``cross.Post`` interface."""

    def __init__(self, status: dict) -> None:
        super().__init__()
        self.status = status
        # One MastodonAttachment per entry of the status' `media_attachments`.
        self.media_attachments = [
            MastodonAttachment(raw) for raw in status.get('media_attachments', [])
        ]
        self.tokens = tokenize_post(status)

    def get_tokens(self) -> list[cross.Token]:
        """Tokens computed from the status content at construction time."""
        return self.tokens

    def get_parent_id(self) -> str | None:
        """The `in_reply_to_id` of the status, or None when absent."""
        return self.status.get('in_reply_to_id')

    def get_cw(self) -> str:
        """Content warning (`spoiler_text`), defaulting to an empty string."""
        return util.safe_get(self.status, 'spoiler_text', '')

    def get_id(self) -> str:
        """The status id."""
        return self.status['id']

    def get_languages(self) -> list[str]:
        """A one-element list with the status language, or [] if unset/empty."""
        lang = self.status.get('language')
        return [lang] if lang else []

    def is_sensitive(self) -> bool:
        """The status' `sensitive` flag (False when missing)."""
        return self.status.get('sensitive', False)

    def get_attachments(self) -> list[cross.MediaAttachment]:
        """Media attachments wrapped at construction time."""
        return self.media_attachments
114
class MastodonAttachment(cross.MediaAttachment):
    """Expose a Mastodon media-attachment dict through ``cross.MediaAttachment``.

    For ``image`` and ``video`` attachments a ``meta_generator`` callable is
    installed that builds a ``cross.MediaMeta``:

    * from the API's ``meta.original`` block when the server provided it, or
    * by probing the downloaded bytes with ``media_util.get_media_meta``.

    Other attachment types get no generator. ``create_meta`` then returns a
    ``MediaMeta`` with every field set to -1.
    """

    def __init__(self, attachment: dict) -> None:
        super().__init__()
        self.attachment = attachment

        if attachment.get('type') in ('video', 'image'):
            original_meta = (attachment.get('meta') or {}).get('original')
            if original_meta:
                def from_status(data: bytes) -> cross.MediaMeta:
                    # The server already reported the dimensions/duration, so
                    # the downloaded bytes are not needed.
                    return cross.MediaMeta(
                        original_meta['width'],
                        original_meta['height'],
                        original_meta.get('duration', -1),
                    )
                self.meta_generator = from_status
            else:
                def from_bytes(data: bytes) -> cross.MediaMeta:
                    probed = media_util.get_media_meta(data)
                    return cross.MediaMeta(probed['width'], probed['height'], probed.get('duration', -1))
                self.meta_generator = from_bytes

    # URL to download the attachment from ('' when missing).
    def get_url(self) -> str:
        return self.attachment.get('url', '')

    # Generic media category, mapped through FORMATS; unknown types -> 'other'.
    def get_type(self) -> str | None:
        return FORMATS.get(self.attachment.get('type', 'other'), 'other')

    def create_meta(self, bytes: bytes) -> cross.MediaMeta:
        """Build media metadata, using the generator chosen in ``__init__``.

        ``__init__`` only assigns ``meta_generator`` for image/video
        attachments, so it is looked up defensively here. Other types fall
        back to all -1 values instead of raising AttributeError.
        """
        generator = getattr(self, 'meta_generator', None)
        if generator:
            return generator(bytes)
        return cross.MediaMeta(-1, -1, -1)

    # Media description (alt text), defaulting to ''.
    def get_alt(self) -> str:
        return util.safe_get(self.attachment, 'description', '')
149
class MastodonInput(cross.Input):
    """Relay a Mastodon account's own posts and deletions to ``cross.Output``s.

    On construction it verifies the access token against the instance and
    discovers the instance's streaming endpoint. ``listen`` then consumes the
    user stream and forwards ``update``/``delete`` events to every output.
    """

    # Seconds to wait on the instance's REST API. Without a timeout, `requests`
    # can block forever on an unresponsive server.
    _HTTP_TIMEOUT = 30

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        self.options = settings.get('options', {})
        self.token = util.get_or_envvar(settings, 'token')
        instance: str = util.get_or_envvar(settings, 'instance')

        # Normalise the base URL so f"{service}/api/..." never yields '//'.
        service = instance.rstrip('/')

        LOGGER.info("Verifying %s credentials...", service)
        response = requests.get(
            f"{service}/api/v1/accounts/verify_credentials",
            headers={'Authorization': f'Bearer {self.token}'},
            timeout=self._HTTP_TIMEOUT,
        )
        if response.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            # raise_for_status() only raises for 4xx/5xx. Any other non-200
            # status must still abort; otherwise the object would be left
            # without the base-class state initialised below.
            response.raise_for_status()
            raise requests.HTTPError(
                f"Unexpected status {response.status_code} while verifying credentials on {service}",
                response=response,
            )

        super().__init__(service, response.json()["id"], settings, db)
        self.streaming = self._get_streaming_url()

        if not self.streaming:
            raise Exception(f"Instance {service} does not support streaming!")

    def _get_streaming_url(self):
        """Return `urls.streaming_api` from /api/v1/instance (None if absent)."""
        response = requests.get(f"{self.service}/api/v1/instance", timeout=self._HTTP_TIMEOUT)
        response.raise_for_status()
        data: dict = response.json()
        return util.safe_get(data, "urls", {}).get("streaming_api")

    def _on_create_post(self, outputs: list[cross.Output], status: dict) -> None:
        """Record `status` in the db and hand it to every output, unless filtered.

        Skipped:
        * statuses by other accounts;
        * reblogs and polls;
        * replies to other accounts;
        * visibilities not listed in options['allowed_visibility'];
        * self-replies whose parent is not in the db.
        """
        # skip events from other users (tolerate a missing account/id)
        account = util.safe_get(status, 'account', {})
        if account.get('id') != self.user_id:
            return

        if status.get('reblog') or status.get('poll'):
            # TODO polls not supported on bsky. maybe 3rd party? skip for now
            # we don't handle reblogs. possible with bridgy(?) and self
            LOGGER.info("Skipping '%s'! Reblog or poll..", status['id'])
            return

        in_reply: str | None = status.get('in_reply_to_id')
        in_reply_to: str | None = status.get('in_reply_to_account_id')
        if in_reply_to and in_reply_to != self.user_id:
            # We don't support replies.
            LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
            return

        if status.get('visibility') not in self.options.get('allowed_visibility', []):
            # Skip f/o and direct posts
            LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
            return

        root_id = None
        parent_id = None
        if in_reply:
            parent_post = database.find_post(self.db, in_reply, self.user_id, self.service)
            if not parent_post:
                LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id'])
                return

            # The parent's own root (if any) is the thread root; otherwise the
            # parent itself starts the thread.
            parent_id = parent_post['id']
            root_id = parent_post['root_id'] or parent_id

        LOGGER.info("Crossposting '%s'...", status['id'])
        if root_id and parent_id:
            database.insert_reply(
                self.db,
                status['id'],
                self.user_id,
                self.service,
                parent_id,
                root_id
            )
        else:
            database.insert_post(
                self.db,
                status['id'],
                self.user_id,
                self.service
            )

        cross_post = MastodonPost(status)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], identifier: str) -> None:
        """Propagate a deletion to all outputs if the post is known in the db."""
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        database.delete_post(self.db, identifier, self.user_id, self.service)

    def _on_post(self, outputs: list[cross.Output], event: str, payload: str) -> None:
        """Dispatch one streaming event; events other than update/delete are ignored."""
        if event == 'update':
            self._on_create_post(outputs, json.loads(payload))
        elif event == 'delete':
            self._on_delete_post(outputs, payload)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Consume the user stream forever, handing each event to `submit`.

        The `async for` over `websockets.connect` reconnects after the body
        finishes. A ConnectionClosedError is logged before reconnecting.
        """
        uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"

        async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.2"}):
            try:
                LOGGER.info("Listening to %s...", self.streaming)

                async for msg in ws:
                    data = json.loads(msg)
                    event: str = data.get('event')
                    payload: str = data.get('payload')

                    # Bind this message's values as defaults. `submit` may run
                    # the callable later, after `event`/`payload` have been
                    # reassigned by the next iteration (late-binding closure).
                    submit(lambda event=event, payload=payload: self._on_post(outputs, str(event), str(payload)))
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.streaming)
                continue
270 await asyncio.gather(listen)
271 except websockets.ConnectionClosedError as e:
272 LOGGER.error(e, stack_info=True, exc_info=True)
273 LOGGER.info("Reconnecting to %s...", self.streaming)
274 continue