social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
import asyncio
import json
import re
import uuid
from typing import Any, Callable

import requests
import websockets

import cross
import database
import media_util
import util
from util import LOGGER


class MisskeyPost(cross.Post):
    """Wrap a Misskey note dict behind the ``cross.Post`` accessor methods."""

    def __init__(self, note: dict, tokens: list[cross.Token], files: list[media_util.MediaInfo]) -> None:
        """Store the raw note, its tokenized text and its downloaded media.

        Args:
            note: Note payload as received from the Misskey streaming API.
            tokens: Tokenized form of the note text.
            files: Media already downloaded for this note.
        """
        super().__init__()
        self.note = note
        # A post is sensitive as soon as any attached file is flagged.
        # `or []` also covers a 'files' key that is present but null.
        self.sensitive = any(a.get('isSensitive', False) for a in note.get('files') or [])
        self.media_attachments = files
        self.tokens = tokens

    def get_tokens(self) -> list[cross.Token]:
        """Return the tokens passed at construction."""
        return self.tokens

    def get_parent_id(self) -> str | None:
        """Return the note's 'replyId', or None when it is absent."""
        return self.note.get('replyId')

    def get_post_date_iso(self) -> str:
        """Return the note's 'createdAt', falling back to the base class value."""
        date = self.note.get('createdAt')
        return date or super().get_post_date_iso()

    def get_attachments(self) -> list[media_util.MediaInfo]:
        """Return the media passed at construction."""
        return self.media_attachments

    def get_id(self) -> str:
        """Return the note's 'id'."""
        return self.note['id']

    def get_cw(self) -> str:
        """Return the note's content warning ('cw'), or '' when unset."""
        return self.note.get('cw') or ''

    def get_languages(self) -> list[str]:
        """Return an empty list; no language data is read from the note."""
        return []

    def is_sensitive(self) -> bool:
        """Return True when any attached file was flagged 'isSensitive'."""
        return self.sensitive


# Visibilities that may be crossposted; also the default selection.
ALLOWED_VISIBILITY = ['public', 'home']


class MisskeyInputOptions():
    """Parsed 'options' section of a Misskey input configuration."""

    def __init__(self, o: dict) -> None:
        """Read 'regex_filters' and 'allowed_visibility' from ``o``.

        Raises:
            ValueError: If 'allowed_visibility' holds a value that is not
                listed in ``ALLOWED_VISIBILITY``.
        """
        self.allowed_visibility = ALLOWED_VISIBILITY
        self.filters = [re.compile(f) for f in o.get('regex_filters', [])]

        allowed_visibility = o.get('allowed_visibility')
        if allowed_visibility is not None:
            if any(v not in ALLOWED_VISIBILITY for v in allowed_visibility):
                raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}")
            self.allowed_visibility = allowed_visibility


class MisskeyInput(cross.Input):
    """Input that listens to a Misskey home timeline and forwards own notes."""

    def __init__(self, settings: dict, db: cross.DataBaseWorker) -> None:
        """Validate the settings and verify the token against the instance.

        Args:
            settings: Input configuration; 'token' and 'instance' are
                required (each is resolved through ``util.as_envvar``),
                'options' is optional.
            db: Database worker handed through to ``cross.Input``.

        Raises:
            ValueError: If 'token' or 'instance' is missing or empty.
            requests.HTTPError: If the credential check does not answer 200.
        """
        self.options = MisskeyInputOptions(settings.get('options', {}))

        token = util.as_envvar(settings.get('token'))
        if not token:
            raise ValueError("'token' is required")
        self.token = token

        instance: str = util.as_envvar(settings.get('instance'))
        if not instance:
            raise ValueError("'instance' is required")

        # Drop a single trailing slash so URLs built below do not contain '//'.
        service = instance[:-1] if instance.endswith('/') else instance

        LOGGER.info("Verifying %s credentails...", service)
        response = requests.post(
            f"{service}/api/i",
            json={'i': self.token},
            headers={"Content-Type": "application/json"},
            timeout=30,  # never hang startup forever on an unresponsive instance
        )
        if response.status_code != 200:
            LOGGER.error("Failed to validate user credentials!")
            response.raise_for_status()
            # raise_for_status() only raises for 4xx/5xx. Any other non-200
            # answer must still abort, otherwise the object would be returned
            # without the base class ever being initialised.
            raise requests.HTTPError(
                f"Unexpected status {response.status_code} from {service}/api/i",
                response=response,
            )

        super().__init__(service, response.json()["id"], settings, db)

    def _on_note(self, outputs: list[cross.Output], note: dict):
        """Filter one incoming note and hand it to every output.

        The note is dropped when it is not authored by this user, is a renote
        or poll, replies to another user, has a visibility that is not
        allowed, cannot be inserted into the database, matches a regex
        filter, or has media that fails to download.
        """
        if note['userId'] != self.user_id:
            return

        if note.get('renoteId') or note.get('poll'):
            # TODO polls not supported on bsky. maybe 3rd party? skip for now
            # we don't handle reblogs. possible with bridgy(?) and self
            LOGGER.info("Skipping '%s'! Renote or poll..", note['id'])
            return

        reply_id: str | None = note.get('replyId')
        if reply_id:
            # 'reply' may be present but null, so do not rely on the
            # dict.get default.
            if (note.get('reply') or {}).get('userId') != self.user_id:
                LOGGER.info("Skipping '%s'! Reply to other user..", note['id'])
                return

        if note.get('visibility') not in self.options.allowed_visibility:
            LOGGER.info("Skipping '%s'! '%s' visibility..", note['id'], note.get('visibility'))
            return

        success = database.try_insert_post(self.db, note['id'], reply_id, self.user_id, self.service)
        if not success:
            LOGGER.info("Skipping '%s' as parent note was not found in db!", note['id'])
            return

        mention_handles: dict = note.get('mentionHandles') or {}
        tags: list[str] = note.get('tags') or []

        # Each handle is passed as an identical (value, value) pair.
        handles: list[tuple[str, str]] = [(handle, handle) for handle in mention_handles.values()]

        # 'text' is null for notes without a body (e.g. media only); the
        # tokenizer is given an empty string in that case.
        tokens = cross.tokenize_markdown(note.get('text') or '', tags, handles)
        if not cross.test_filters(tokens, self.options.filters):
            LOGGER.info("Skipping '%s'. Matched a filter!", note['id'])
            return

        LOGGER.info("Crossposting '%s'...", note['id'])

        media_attachments: list[media_util.MediaInfo] = []
        for attachment in note.get('files') or []:
            LOGGER.info("Downloading %s...", attachment['url'])
            info = media_util.download_media(attachment['url'], attachment.get('comment') or '')
            if not info:
                # One failed download aborts the whole post.
                LOGGER.error("Skipping '%s'. Failed to download media!", note['id'])
                return
            media_attachments.append(info)

        cross_post = MisskeyPost(note, tokens, media_attachments)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete(self, outputs: list[cross.Output], note: dict):
        """Placeholder for delete handling; currently does nothing."""
        # TODO handle deletes
        pass

    def _on_message(self, outputs: list[cross.Output], data: dict):
        """Dispatch a decoded streaming message.

        Only 'channel' messages whose body type is 'note' or 'reply' are
        forwarded to ``_on_note``; everything else is ignored.
        """
        if data.get('type') != 'channel':
            return

        body = data.get('body') or {}
        if body.get('type') in ('note', 'reply'):
            self._on_note(outputs, body['body'])

    async def _send_keepalive(self, ws: websockets.WebSocketClientProtocol):
        """Send "h" over ``ws`` every 120 seconds until the socket closes."""
        while ws.open:
            try:
                await asyncio.sleep(120)
                if ws.open:
                    await ws.send("h")
                    LOGGER.debug("Sent keepalive h..")
                else:
                    LOGGER.info("WebSocket is closed, stopping keepalive task.")
                    break
            except Exception as e:
                # Background-task boundary: log and stop rather than crash.
                LOGGER.error("Error sending keepalive: %s", e)
                break

    async def _subscribe_to_home(self, ws: websockets.WebSocketClientProtocol):
        """Send a 'connect' request for the 'homeTimeline' channel."""
        await ws.send(json.dumps({
            "type": "connect",
            "body": {
                "channel": "homeTimeline",
                "id": str(uuid.uuid4())
            }
        }))
        LOGGER.info("Subscribed to 'homeTimeline' channel...")

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Stream the home timeline and submit each message for handling.

        Args:
            outputs: Outputs that receive accepted posts.
            submit: Called with a zero-argument callable for every received
                message (presumably a job queue - confirm against caller).
        """
        # Built outside the f-string: reusing the outer quote inside a
        # replacement field only parses on Python 3.12+.
        host = self.service.split("://", 1)[1]
        streaming: str = f"wss://{host}"
        url: str = f"{streaming}/streaming?i={self.token}"

        async for ws in websockets.connect(url, extra_headers={"User-Agent": "XPost/0.0.3"}):
            keepalive: asyncio.Task | None = None
            try:
                LOGGER.info("Listening to %s...", streaming)
                await self._subscribe_to_home(ws)

                async def listen_for_messages():
                    async for msg in ws:
                        # TODO listen to deletes somehow
                        # Bind msg as a default argument: a plain closure
                        # would see whatever message the loop has moved on to
                        # by the time the submitted job actually runs.
                        submit(lambda msg=msg: self._on_message(outputs, json.loads(msg)))

                keepalive = asyncio.create_task(self._send_keepalive(ws))
                listener = asyncio.create_task(listen_for_messages())

                await asyncio.gather(keepalive, listener)
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", streaming)
                continue
            finally:
                # Do not leave the keepalive of a dead connection sleeping in
                # the background; cancelling a finished task is a no-op.
                if keepalive is not None:
                    keepalive.cancel()