social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
"""Misskey input service: streams the user's own notes and forwards them to outputs."""

import asyncio
import json
import re
import uuid
from dataclasses import dataclass, field
from typing import Any, cast, override

import websockets

from cross.attachments import (
    LabelsAttachment,
    MediaAttachment,
    RemoteUrlAttachment,
    SensitiveAttachment,
)
from cross.media import Blob, download_blob
from cross.post import Post
from cross.service import InputService
from database.connection import DatabasePool
from misskey.info import MisskeyService
from util.markdown import MarkdownParser
from util.util import normalize_service_url

# Note visibilities that may be configured for crossposting.
ALLOWED_VISIBILITY = ["public", "home"]


@dataclass
class MisskeyInputOptions:
    """Configuration for :class:`MisskeyInputService`."""

    # API token; returned by `_get_token` and sent as the `i` query parameter
    # of the streaming URL.
    token: str
    # Instance base URL (passed through `normalize_service_url` by `from_dict`).
    instance: str
    # Notes whose visibility is not listed here are ignored.
    allowed_visibility: list[str] = field(
        default_factory=lambda: ALLOWED_VISIBILITY.copy()
    )
    # Compiled regex filters.
    # NOTE(review): not referenced by the code visible in this module - confirm
    # where (or whether) they are applied.
    filters: list[re.Pattern[str]] = field(default_factory=lambda: [])

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MisskeyInputOptions":
        """Build options from a plain mapping (e.g. parsed configuration).

        The instance URL is normalized, every visibility is validated against
        ``ALLOWED_VISIBILITY`` and filter strings are compiled to patterns.
        The mapping passed in is left unmodified.

        Raises:
            KeyError: if ``"instance"`` is missing.
            ValueError: if a visibility is not in ``ALLOWED_VISIBILITY``.
            re.error: if a filter is not a valid regular expression.
            TypeError: if required fields are missing or unknown keys are
                present (raised by the dataclass constructor).
        """
        # Work on a shallow copy so the caller's dict is not mutated.
        options = dict(data)
        options["instance"] = normalize_service_url(options["instance"])

        if "allowed_visibility" in options:
            for vis in options["allowed_visibility"]:
                if vis not in ALLOWED_VISIBILITY:
                    raise ValueError(f"Invalid visibility option {vis}!")

        if "filters" in options:
            options["filters"] = [re.compile(r) for r in options["filters"]]

        return cls(**options)


class MisskeyInputService(MisskeyService, InputService):
    """Listens to a Misskey home timeline and forwards the user's own notes."""

    def __init__(self, db: DatabasePool, options: MisskeyInputOptions) -> None:
        """Store the options and resolve the id of the authenticated user."""
        super().__init__(options.instance, db)
        self.options: MisskeyInputOptions = options

        self.log.info("Verifying %s credentails...", self.url)
        response = self.verify_credentials()
        self.user_id: str = response["id"]

    @override
    def _get_token(self) -> str:
        return self.options.token

    def _on_note(self, note: dict[str, Any]) -> None:
        """Convert a streamed note into a ``Post`` and submit it to every output.

        The note is ignored when it belongs to another user, has a visibility
        that is not allowed, contains a poll, is a quote, replies to another
        user, replies to a note unknown to the database, or when one of its
        files cannot be downloaded. Pure renotes are delegated to
        ``_on_renote``.
        """
        if note["userId"] != self.user_id:
            return

        if note["visibility"] not in self.options.allowed_visibility:
            return

        if note.get("poll"):
            self.log.info("Skipping '%s'! Contains a poll..", note["id"])
            return

        renote: dict[str, Any] | None = note.get("renote")
        if renote:
            # A renote that carries its own text is a quote.
            if note.get("text") is not None:
                self.log.info("Skipping '%s'! Quote..", note["id"])
                return
            self._on_renote(note, renote)
            return

        reply: dict[str, Any] | None = note.get("reply")
        parent = None
        if reply:
            if reply.get("userId") != self.user_id:
                self.log.info("Skipping '%s'! Reply to other user..", note["id"])
                return

            parent = self._get_post(self.url, self.user_id, reply["id"])
            if not parent:
                self.log.info(
                    "Skipping %s, parent %s not found in db", note["id"], reply["id"]
                )
                return

        parser = MarkdownParser()  # TODO MFM parser
        # "text" may be present but null (the renote branch above relies on
        # that), so `.get("text", "")` would still hand None to the parser.
        text, fragments = parser.parse(note.get("text") or "")
        post = Post(id=note["id"], parent_id=reply["id"] if reply else None, text=text)
        post.fragments.extend(fragments)

        files: list[dict[str, Any]] = note.get("files") or []

        post.attachments.put(RemoteUrlAttachment(url=self.url + "/notes/" + note["id"]))
        if any(a.get("isSensitive", False) for a in files):
            post.attachments.put(SensitiveAttachment(sensitive=True))
        if note.get("cw"):
            post.attachments.put(LabelsAttachment(labels=[note["cw"]]))

        blobs: list[Blob] = []
        for media in files:
            self.log.info("Downloading %s...", media["url"])
            blob: Blob | None = download_blob(media["url"], media.get("comment", ""))
            if not blob:
                # All-or-nothing: a note with a missing attachment is dropped.
                self.log.error(
                    "Skipping %s! Failed to download media %s.",
                    note["id"],
                    media["url"],
                )
                return
            blobs.append(blob)

        if blobs:
            post.attachments.put(MediaAttachment(blobs=blobs))

        if parent:
            self._insert_post(
                {
                    "user": self.user_id,
                    "service": self.url,
                    "identifier": note["id"],
                    "parent": parent["id"],
                    # Inherit the thread root; a parent without a root is the root.
                    "root": parent["id"] if not parent["root"] else parent["root"],
                }
            )
        else:
            self._insert_post(
                {
                    "user": self.user_id,
                    "service": self.url,
                    "identifier": note["id"],
                }
            )

        for out in self.outputs:
            # Bind `out` as a default argument: the submitter may run the
            # callback after the loop has advanced, and a plain closure would
            # then see only the last output.
            self.submitter(lambda out=out: out.accept_post(post))

    def _on_renote(self, note: dict[str, Any], renote: dict[str, Any]) -> None:
        """Record a pure renote and submit it to every output as a repost.

        Skipped when the renoted note is not found in the database.
        """
        reposted = self._get_post(self.url, self.user_id, renote["id"])
        if not reposted:
            self.log.info(
                "Skipping repost '%s' as reposted post '%s' was not found in the db.",
                note["id"],
                renote["id"],
            )
            return

        self._insert_post(
            {
                "user": self.user_id,
                "service": self.url,
                "identifier": note["id"],
                "reposted": reposted["id"],
            }
        )

        for out in self.outputs:
            # Default-argument binding avoids the late-binding closure pitfall.
            self.submitter(
                lambda out=out: out.accept_repost(note["id"], renote["id"])
            )

    def _accept_msg(self, msg: websockets.Data) -> None:
        """Decode one streaming message and dispatch note/reply events."""
        data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))

        if data["type"] != "channel":
            return

        event_type: str = cast(str, data["body"]["type"])
        if event_type in ("note", "reply"):
            self._on_note(data["body"]["body"])

    async def _subscribe_to_home(self, ws: websockets.ClientConnection) -> None:
        """Send the ``connect`` request for the ``homeTimeline`` channel."""
        await ws.send(
            json.dumps(
                {
                    "type": "connect",
                    "body": {"channel": "homeTimeline", "id": str(uuid.uuid4())},
                }
            )
        )
        self.log.info("Subscribed to 'homeTimeline' channel...")

    @override
    async def listen(self):
        """Stream the home timeline, reconnecting on ``ConnectionClosedError``."""
        scheme = "wss" if self.url.startswith("https") else "ws"
        streaming: str = f"{scheme}://{self.url.split('://', 1)[1]}"
        # The token is part of the URL, so only `streaming` is ever logged.
        url: str = f"{streaming}/streaming?i={self.options.token}"

        async for ws in websockets.connect(url):
            try:
                self.log.info("Listening to %s...", streaming)
                await self._subscribe_to_home(ws)

                async def listen_for_messages():
                    async for msg in ws:
                        # Bind `msg` now: a plain closure would read whatever
                        # message is current when the callback finally runs.
                        self.submitter(lambda msg=msg: self._accept_msg(msg))

                listener = asyncio.create_task(listen_for_messages())

                _ = await asyncio.gather(listener)
            except websockets.ConnectionClosedError as e:
                self.log.error(e, stack_info=True, exc_info=True)
                self.log.info("Reconnecting to %s...", streaming)
                continue