social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import cross, media_util, util, database
2from util import LOGGER
3import requests, websockets
4from typing import Callable, Any
5import asyncio
6import json, uuid
7import re
8
9
10class MisskeyPost(cross.Post):
11 def __init__(self, note: dict, tokens: list[cross.Token], files: list[media_util.MediaInfo]) -> None:
12 super().__init__()
13 self.note = note
14 self.sensitive = any([a.get('isSensitive', False) for a in note.get('files', [])])
15 self.media_attachments = files
16 self.tokens = tokens
17
18 def get_tokens(self) -> list[cross.Token]:
19 return self.tokens
20
21 def get_parent_id(self) -> str | None:
22 return self.note.get('replyId')
23
24 def get_post_date_iso(self) -> str:
25 date = self.note.get('createdAt')
26 return date or super().get_post_date_iso()
27
28 def get_attachments(self) -> list[media_util.MediaInfo]:
29 return self.media_attachments
30
31 def get_id(self) -> str:
32 return self.note['id']
33
34 def get_cw(self) -> str:
35 return self.note.get('cw') or ''
36
37 def get_languages(self) -> list[str]:
38 return []
39
40 def is_sensitive(self) -> bool:
41 return self.sensitive
42
43ALLOWED_VISIBILITY = ['public', 'home']
44
45class MisskeyInputOptions():
46 def __init__(self, o: dict) -> None:
47 self.allowed_visibility = ALLOWED_VISIBILITY
48 self.filters = [re.compile(f) for f in o.get('regex_filters', [])]
49
50 allowed_visibility = o.get('allowed_visibility')
51 if allowed_visibility is not None:
52 if any([v not in ALLOWED_VISIBILITY for v in allowed_visibility]):
53 raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}")
54 self.allowed_visibility = allowed_visibility
55
56class MisskeyInput(cross.Input):
57 def __init__(self, settings: dict, db: cross.DataBaseWorker) -> None:
58 self.options = MisskeyInputOptions(settings.get('options', {}))
59 self.token = util.as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
60 instance: str = util.as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
61
62 service = instance[:-1] if instance.endswith('/') else instance
63
64 LOGGER.info("Verifying %s credentails...", service)
65 responce = requests.post(f"{instance}/api/i", json={ 'i': self.token }, headers={
66 "Content-Type": "application/json"
67 })
68 if responce.status_code != 200:
69 LOGGER.error("Failed to validate user credentials!")
70 responce.raise_for_status()
71 return
72
73 super().__init__(service, responce.json()["id"], settings, db)
74
75 def _on_note(self, outputs: list[cross.Output], note: dict):
76 if note['userId'] != self.user_id:
77 return
78
79 if note.get('renoteId') or note.get('poll'):
80 # TODO polls not supported on bsky. maybe 3rd party? skip for now
81 # we don't handle reblogs. possible with bridgy(?) and self
82 LOGGER.info("Skipping '%s'! Renote or poll..", note['id'])
83 return
84
85 reply_id: str | None = note.get('replyId')
86 if reply_id:
87 if note.get('reply', {}).get('userId') != self.user_id:
88 LOGGER.info("Skipping '%s'! Reply to other user..", note['id'])
89 return
90
91 if note.get('visibility') not in self.options.allowed_visibility:
92 LOGGER.info("Skipping '%s'! '%s' visibility..", note['id'], note.get('visibility'))
93 return
94
95 success = database.try_insert_post(self.db, note['id'], reply_id, self.user_id, self.service)
96 if not success:
97 LOGGER.info("Skipping '%s' as parent note was not found in db!", note['id'])
98 return
99
100 mention_handles: dict = note.get('mentionHandles') or {}
101 tags: list[str] = note.get('tags') or []
102
103 handles: list[tuple[str, str]] = []
104 for key, value in mention_handles.items():
105 handles.append((value, value))
106
107 tokens = cross.tokenize_markdown(note.get('text', ''), tags, handles)
108 if not cross.test_filters(tokens, self.options.filters):
109 LOGGER.info("Skipping '%s'. Matched a filter!", note['id'])
110 return
111
112 LOGGER.info("Crossposting '%s'...", note['id'])
113
114 media_attachments: list[media_util.MediaInfo] = []
115 for attachment in note.get('files', []):
116 LOGGER.info("Downloading %s...", attachment['url'])
117 info = media_util.download_media(attachment['url'], attachment.get('comment') or '')
118 if not info:
119 LOGGER.error("Skipping '%s'. Failed to download media!", note['id'])
120 return
121 media_attachments.append(info)
122
123 cross_post = MisskeyPost(note, tokens, media_attachments)
124 for output in outputs:
125 output.accept_post(cross_post)
126
127 def _on_delete(self, outputs: list[cross.Output], note: dict):
128 # TODO handle deletes
129 pass
130
131 def _on_message(self, outputs: list[cross.Output], data: dict):
132
133 if data['type'] == 'channel':
134 type: str = data['body']['type']
135 if type == 'note' or type == 'reply':
136 note_body = data['body']['body']
137 self._on_note(outputs, note_body)
138 return
139
140 pass
141
142 async def _send_keepalive(self, ws: websockets.WebSocketClientProtocol):
143 while ws.open:
144 try:
145 await asyncio.sleep(120)
146 if ws.open:
147 await ws.send("h")
148 LOGGER.debug("Sent keepalive h..")
149 else:
150 LOGGER.info("WebSocket is closed, stopping keepalive task.")
151 break
152 except Exception as e:
153 LOGGER.error(f"Error sending keepalive: {e}")
154 break
155
156 async def _subscribe_to_home(self, ws: websockets.WebSocketClientProtocol):
157 await ws.send(json.dumps({
158 "type": "connect",
159 "body": {
160 "channel": "homeTimeline",
161 "id": str(uuid.uuid4())
162 }
163 }))
164 LOGGER.info("Subscribed to 'homeTimeline' channel...")
165
166
167 async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
168 streaming: str = f"wss://{self.service.split("://", 1)[1]}"
169 url: str = f"{streaming}/streaming?i={self.token}"
170
171 async for ws in websockets.connect(url, extra_headers={"User-Agent": "XPost/0.0.3"}):
172 try:
173 LOGGER.info("Listening to %s...", streaming)
174 await self._subscribe_to_home(ws)
175
176 async def listen_for_messages():
177 async for msg in ws:
178 # TODO listen to deletes somehow
179 submit(lambda: self._on_message(outputs, json.loads(msg)))
180
181 keepalive = asyncio.create_task(self._send_keepalive(ws))
182 listen = asyncio.create_task(listen_for_messages())
183
184 await asyncio.gather(keepalive, listen)
185 except websockets.ConnectionClosedError as e:
186 LOGGER.error(e, stack_info=True, exc_info=True)
187 LOGGER.info("Reconnecting to %s...", streaming)
188 continue