social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import requests, websockets
2import asyncio
3import json, uuid
4import re
5
6from misskey.common import MisskeyPost
7
8import cross, util.database as database
9import util.md_util as md_util
10from util.media import MediaInfo, download_media
11from util.util import LOGGER, as_envvar
12
13from typing import Callable, Any
14
# Note visibilities that may be crossposted; configured values must be a subset.
ALLOWED_VISIBILITY = ['public', 'home']


class MisskeyInputOptions():
    """Parsed and validated options for the Misskey input.

    Attributes are set in ``__init__``:
        allowed_visibility: list of note visibilities that will be accepted.
        filters: compiled regular expressions built from 'regex_filters'.
    """

    def __init__(self, o: dict) -> None:
        """Read 'regex_filters' and 'allowed_visibility' from ``o``.

        Args:
            o: Options mapping. 'regex_filters' is an iterable of regex
                pattern strings (default: none). 'allowed_visibility' is a
                list of visibilities, or a single visibility string
                (default: every entry of ``ALLOWED_VISIBILITY``).

        Raises:
            ValueError: If 'allowed_visibility' contains a value that is not
                in ``ALLOWED_VISIBILITY``.
            re.error: If a pattern in 'regex_filters' does not compile.
        """
        # Copy so that mutating this instance never alters the shared default.
        self.allowed_visibility = list(ALLOWED_VISIBILITY)
        self.filters = [re.compile(f) for f in o.get('regex_filters', [])]

        allowed_visibility = o.get('allowed_visibility')
        if allowed_visibility is None:
            return

        # A bare string would otherwise be validated character by character.
        if isinstance(allowed_visibility, str):
            allowed_visibility = [allowed_visibility]

        if any(v not in ALLOWED_VISIBILITY for v in allowed_visibility):
            raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}")
        # Copy so later changes to the caller's list do not leak in here.
        self.allowed_visibility = list(allowed_visibility)
27
class MisskeyInput(cross.Input):
    """Input that listens to a Misskey instance and forwards notes to outputs.

    Connects to the instance's streaming endpoint, subscribes to the
    'homeTimeline' channel and hands every note authored by the
    authenticated user to the configured outputs, after applying the
    visibility, poll, quote, reply and regex-filter rules in ``_on_note``.

    NOTE(review): ``self.user_id``, ``self.service`` and ``self.db`` are read
    by the methods below but never assigned here; presumably
    ``cross.Input.__init__`` stores them - confirm against the base class.
    """

    def __init__(self, settings: dict, db: cross.DataBaseWorker) -> None:
        """Validate settings, verify the token and initialise the base input.

        Args:
            settings: Service settings; reads 'options', 'token' and
                'instance' (the latter two are passed through ``as_envvar``).
            db: Database worker handed on to ``cross.Input.__init__``.

        Raises:
            ValueError: If 'token' or 'instance' resolves to an empty value,
                or if the credential check answers with a non-200 status
                that is not an HTTP error.
            requests.HTTPError: If the credential check returns an HTTP
                error status.
        """
        self.options = MisskeyInputOptions(settings.get('options') or {})

        token = as_envvar(settings.get('token'))
        if not token:
            raise ValueError("'token' is required")
        self.token = token

        instance: str = as_envvar(settings.get('instance'))
        if not instance:
            raise ValueError("'instance' is required")

        # Normalise away trailing slashes so URLs built below never get '//'.
        service = instance.rstrip('/')

        LOGGER.info("Verifying %s credentails...", service)
        response = requests.post(
            f"{service}/api/i",
            json={'i': self.token},
            headers={"Content-Type": "application/json"},
            # Without a timeout an unresponsive instance would hang startup.
            timeout=30,
        )
        if response.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            response.raise_for_status()
            # raise_for_status() only raises for 4xx/5xx; never continue with
            # a half-initialised object on any other unexpected status.
            raise ValueError(
                f"Unexpected status {response.status_code} while verifying credentials"
            )

        super().__init__(service, response.json()["id"], settings, db)

    def _on_note(self, outputs: list[cross.Output], note: dict):
        """Decide whether a note is crossposted and hand it to the outputs.

        Notes are skipped when they were written by another user, have a
        visibility that is not allowed, contain a poll, are quotes, are
        renotes of or replies to another user, reference a note missing
        from the database, match a regex filter, or have media that fails
        to download.

        Args:
            outputs: Outputs that receive the post or repost.
            note: Note payload from the streaming API.
        """
        if note['userId'] != self.user_id:
            return

        note_id = note['id']

        visibility = note.get('visibility')
        if visibility not in self.options.allowed_visibility:
            LOGGER.info("Skipping '%s'! '%s' visibility..", note_id, visibility)
            return

        # TODO polls not supported on bsky. maybe 3rd party? skip for now
        # we don't handle reblogs. possible with bridgy(?) and self
        if note.get('poll'):
            LOGGER.info("Skipping '%s'! Contains a poll..", note_id)
            return

        renote: dict | None = note.get('renote')
        if renote:
            # A renote that carries its own text is a quote.
            if note.get('text') is not None:
                LOGGER.info("Skipping '%s'! Quote..", note_id)
                return

            if renote.get('userId') != self.user_id:
                LOGGER.info("Skipping '%s'! Reblog of other user..", note_id)
                return

            success = database.try_insert_repost(self.db, note_id, renote['id'], self.user_id, self.service)
            if not success:
                LOGGER.info("Skipping '%s' as renoted note was not found in db!", note_id)
                return

            for output in outputs:
                output.accept_repost(note_id, renote['id'])
            return

        reply_id: str | None = note.get('replyId')
        if reply_id:
            # 'reply' may be present but null, so do not rely on a .get default.
            parent = note.get('reply') or {}
            if parent.get('userId') != self.user_id:
                LOGGER.info("Skipping '%s'! Reply to other user..", note_id)
                return

        success = database.try_insert_post(self.db, note_id, reply_id, self.user_id, self.service)
        if not success:
            LOGGER.info("Skipping '%s' as parent note was not found in db!", note_id)
            return

        mention_handles: dict = note.get('mentionHandles') or {}
        tags: list[str] = note.get('tags') or []
        handles: list[tuple[str, str]] = [
            (handle, handle) for handle in mention_handles.values()
        ]

        # 'text' may be present but null (e.g. media-only notes); tokenize an
        # empty string in that case.
        text = note.get('text') or ''
        tokens = md_util.tokenize_markdown(text, tags, handles)
        if not cross.test_filters(tokens, self.options.filters):
            LOGGER.info("Skipping '%s'. Matched a filter!", note_id)
            return

        LOGGER.info("Crossposting '%s'...", note_id)

        media_attachments: list[MediaInfo] = []
        for attachment in note.get('files') or []:
            LOGGER.info("Downloading %s...", attachment['url'])
            info = download_media(attachment['url'], attachment.get('comment') or '')
            if not info:
                LOGGER.error("Skipping '%s'. Failed to download media!", note_id)
                return
            media_attachments.append(info)

        cross_post = MisskeyPost(self.service, note, tokens, media_attachments)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete(self, outputs: list[cross.Output], note: dict):
        """Placeholder for note deletions; currently does nothing."""
        # TODO handle deletes
        pass

    def _on_message(self, outputs: list[cross.Output], data: dict):
        """Dispatch one decoded streaming message.

        Only 'channel' messages whose body type is 'note' or 'reply' are
        forwarded to ``_on_note``; everything else is ignored.

        Args:
            outputs: Outputs passed through to ``_on_note``.
            data: Decoded JSON message from the websocket.
        """
        if data['type'] != 'channel':
            return

        event_type: str = data['body']['type']
        if event_type in ('note', 'reply'):
            self._on_note(outputs, data['body']['body'])

    async def _send_keepalive(self, ws: websockets.WebSocketClientProtocol):
        """Send "h" over ``ws`` every 120 seconds while it stays open.

        Stops when the socket is found closed or when sending fails; the
        failure is logged and not re-raised.
        """
        while ws.open:
            try:
                await asyncio.sleep(120)
                # The socket may have closed while this task was sleeping.
                if not ws.open:
                    LOGGER.info("WebSocket is closed, stopping keepalive task.")
                    break
                await ws.send("h")
                LOGGER.debug("Sent keepalive h..")
            except Exception as e:
                # Best effort: a failed keepalive must not take the reader down.
                LOGGER.error("Error sending keepalive: %s", e)
                break

    async def _subscribe_to_home(self, ws: websockets.WebSocketClientProtocol):
        """Send a 'connect' request for the 'homeTimeline' channel on ``ws``."""
        await ws.send(json.dumps({
            "type": "connect",
            "body": {
                "channel": "homeTimeline",
                "id": str(uuid.uuid4())
            }
        }))
        LOGGER.info("Subscribed to 'homeTimeline' channel...")

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Stream the home timeline and submit each message for processing.

        Iterates over ``websockets.connect`` so that a new connection is
        opened whenever the previous one ends.

        Args:
            outputs: Outputs passed through to ``_on_message``.
            submit: Called with a zero-argument callable for every received
                message; the callable decodes the message and runs
                ``_on_message``. NOTE(review): whether ``submit`` runs it
                immediately or later is not visible here.
        """
        # Derive the websocket URL without nesting quotes inside an f-string,
        # which only parses on Python 3.12+.
        scheme, sep, host = self.service.partition("://")
        if not sep:
            # No scheme configured: treat the whole value as the host.
            scheme, host = "https", self.service
        ws_scheme = "ws" if scheme.lower() == "http" else "wss"
        streaming: str = f"{ws_scheme}://{host}"
        url: str = f"{streaming}/streaming?i={self.token}"

        async for ws in websockets.connect(url, extra_headers={"User-Agent": "XPost/0.0.3"}):
            keepalive = None
            try:
                LOGGER.info("Listening to %s...", streaming)
                await self._subscribe_to_home(ws)

                keepalive = asyncio.create_task(self._send_keepalive(ws))

                async for msg in ws:
                    # TODO listen to deletes somehow
                    # Bind msg as a default so a deferred call still sees this
                    # message and not a later one (late-binding closure).
                    submit(lambda msg=msg: self._on_message(outputs, json.loads(msg)))
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", streaming)
            finally:
                # Do not leave the keepalive sleeping on a dead connection;
                # this also lets the next connection attempt start at once.
                if keepalive is not None:
                    keepalive.cancel()