social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import requests, websockets 2import asyncio 3import json, uuid 4import re 5 6from misskey.common import MisskeyPost 7 8import cross, util.database as database 9import util.md_util as md_util 10from util.media import MediaInfo, download_media 11from util.util import LOGGER, as_envvar 12 13from typing import Callable, Any 14 15ALLOWED_VISIBILITY = ['public', 'home'] 16 17class MisskeyInputOptions(): 18 def __init__(self, o: dict) -> None: 19 self.allowed_visibility = ALLOWED_VISIBILITY 20 self.filters = [re.compile(f) for f in o.get('regex_filters', [])] 21 22 allowed_visibility = o.get('allowed_visibility') 23 if allowed_visibility is not None: 24 if any([v not in ALLOWED_VISIBILITY for v in allowed_visibility]): 25 raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}") 26 self.allowed_visibility = allowed_visibility 27 28class MisskeyInput(cross.Input): 29 def __init__(self, settings: dict, db: cross.DataBaseWorker) -> None: 30 self.options = MisskeyInputOptions(settings.get('options', {})) 31 self.token = as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required")) 32 instance: str = as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required")) 33 34 service = instance[:-1] if instance.endswith('/') else instance 35 36 LOGGER.info("Verifying %s credentails...", service) 37 responce = requests.post(f"{instance}/api/i", json={ 'i': self.token }, headers={ 38 "Content-Type": "application/json" 39 }) 40 if responce.status_code != 200: 41 LOGGER.error("Failed to validate user credentials!") 42 responce.raise_for_status() 43 return 44 45 super().__init__(service, responce.json()["id"], settings, db) 46 47 def _on_note(self, outputs: list[cross.Output], note: dict): 48 if note['userId'] != self.user_id: 49 return 50 51 if note.get('visibility') not in self.options.allowed_visibility: 52 LOGGER.info("Skipping '%s'! '%s' visibility..", note['id'], note.get('visibility')) 53 return 54 55 # TODO polls not supported on bsky. maybe 3rd party? skip for now 56 # we don't handle reblogs. possible with bridgy(?) and self 57 if note.get('poll'): 58 LOGGER.info("Skipping '%s'! Contains a poll..", note['id']) 59 return 60 61 renote: dict | None = note.get('renote') 62 if renote: 63 if note.get('text') is not None: 64 LOGGER.info("Skipping '%s'! Quote..", note['id']) 65 return 66 67 if renote.get('userId') != self.user_id: 68 LOGGER.info("Skipping '%s'! Reblog of other user..", note['id']) 69 return 70 71 success = database.try_insert_repost(self.db, note['id'], renote['id'], self.user_id, self.service) 72 if not success: 73 LOGGER.info("Skipping '%s' as renoted note was not found in db!", note['id']) 74 return 75 76 for output in outputs: 77 output.accept_repost(note['id'], renote['id']) 78 return 79 80 reply_id: str | None = note.get('replyId') 81 if reply_id: 82 if note.get('reply', {}).get('userId') != self.user_id: 83 LOGGER.info("Skipping '%s'! Reply to other user..", note['id']) 84 return 85 86 success = database.try_insert_post(self.db, note['id'], reply_id, self.user_id, self.service) 87 if not success: 88 LOGGER.info("Skipping '%s' as parent note was not found in db!", note['id']) 89 return 90 91 mention_handles: dict = note.get('mentionHandles') or {} 92 tags: list[str] = note.get('tags') or [] 93 94 handles: list[tuple[str, str]] = [] 95 for key, value in mention_handles.items(): 96 handles.append((value, value)) 97 98 tokens = md_util.tokenize_markdown(note.get('text', ''), tags, handles) 99 if not cross.test_filters(tokens, self.options.filters): 100 LOGGER.info("Skipping '%s'. Matched a filter!", note['id']) 101 return 102 103 LOGGER.info("Crossposting '%s'...", note['id']) 104 105 media_attachments: list[MediaInfo] = [] 106 for attachment in note.get('files', []): 107 LOGGER.info("Downloading %s...", attachment['url']) 108 info = download_media(attachment['url'], attachment.get('comment') or '') 109 if not info: 110 LOGGER.error("Skipping '%s'. Failed to download media!", note['id']) 111 return 112 media_attachments.append(info) 113 114 cross_post = MisskeyPost(self.service, note, tokens, media_attachments) 115 for output in outputs: 116 output.accept_post(cross_post) 117 118 def _on_delete(self, outputs: list[cross.Output], note: dict): 119 # TODO handle deletes 120 pass 121 122 def _on_message(self, outputs: list[cross.Output], data: dict): 123 124 if data['type'] == 'channel': 125 type: str = data['body']['type'] 126 if type == 'note' or type == 'reply': 127 note_body = data['body']['body'] 128 self._on_note(outputs, note_body) 129 return 130 131 pass 132 133 async def _send_keepalive(self, ws: websockets.WebSocketClientProtocol): 134 while ws.open: 135 try: 136 await asyncio.sleep(120) 137 if ws.open: 138 await ws.send("h") 139 LOGGER.debug("Sent keepalive h..") 140 else: 141 LOGGER.info("WebSocket is closed, stopping keepalive task.") 142 break 143 except Exception as e: 144 LOGGER.error(f"Error sending keepalive: {e}") 145 break 146 147 async def _subscribe_to_home(self, ws: websockets.WebSocketClientProtocol): 148 await ws.send(json.dumps({ 149 "type": "connect", 150 "body": { 151 "channel": "homeTimeline", 152 "id": str(uuid.uuid4()) 153 } 154 })) 155 LOGGER.info("Subscribed to 'homeTimeline' channel...") 156 157 158 async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]): 159 streaming: str = f"wss://{self.service.split("://", 1)[1]}" 160 url: str = f"{streaming}/streaming?i={self.token}" 161 162 async for ws in websockets.connect(url, extra_headers={"User-Agent": "XPost/0.0.3"}): 163 try: 164 LOGGER.info("Listening to %s...", streaming) 165 await self._subscribe_to_home(ws) 166 167 async def listen_for_messages(): 168 async for msg in ws: 169 # TODO listen to deletes somehow 170 submit(lambda: self._on_message(outputs, json.loads(msg))) 171 172 keepalive = asyncio.create_task(self._send_keepalive(ws)) 173 listen = asyncio.create_task(listen_for_messages()) 174 175 await asyncio.gather(keepalive, listen) 176 except websockets.ConnectionClosedError as e: 177 LOGGER.error(e, stack_info=True, exc_info=True) 178 LOGGER.info("Reconnecting to %s...", streaming) 179 continue