Social media crossposting tool. Third time's the charm.
mastodon
misskey
crossposting
bluesky
1import asyncio
2import json
3import re
4import uuid
5from dataclasses import dataclass, field
6from typing import Any, cast, override
7
8import websockets
9
10from cross.attachments import (
11 LabelsAttachment,
12 MediaAttachment,
13 RemoteUrlAttachment,
14 SensitiveAttachment,
15)
16from cross.media import Blob, download_blob
17from cross.post import Post
18from cross.service import InputService
19from database.connection import DatabasePool
20from misskey.info import MisskeyService
21from util.markdown import MarkdownParser
22from util.util import normalize_service_url
23
# Note visibilities this input may crosspost; `from_dict` rejects any other value.
ALLOWED_VISIBILITY = ["public", "home"]


@dataclass
class MisskeyInputOptions:
    """Options for the Misskey input service.

    Attributes:
        token: API token for the account whose notes are read.
        instance: Base URL of the Misskey instance (normalized by ``from_dict``).
        allowed_visibility: Note visibilities to crosspost. Defaults to a
            fresh copy of ``ALLOWED_VISIBILITY`` so instances never share it.
        filters: Regular expressions compiled from the ``filters`` config
            entry. NOTE(review): not consulted anywhere in the code visible
            in this module -- confirm where (or whether) they are applied.
    """

    token: str
    instance: str
    allowed_visibility: list[str] = field(
        default_factory=lambda: ALLOWED_VISIBILITY.copy()
    )
    filters: list[re.Pattern[str]] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MisskeyInputOptions":
        """Build options from a raw configuration mapping.

        ``data`` is copied first, so the caller's mapping is never modified.

        Args:
            data: Mapping with ``token`` and ``instance`` keys and optional
                ``allowed_visibility`` (list of visibility names) and
                ``filters`` (list of regex source strings).

        Returns:
            A new instance of ``cls``.

        Raises:
            ValueError: if ``allowed_visibility`` contains a value that is not
                in ``ALLOWED_VISIBILITY``.
            KeyError: if ``instance`` is missing.
            TypeError: if required fields are missing or unknown keys are given.
            re.error: if a filter is not a valid regular expression.
        """
        options = dict(data)

        # Cheap static validation first, so bad config fails before any other work.
        for vis in options.get("allowed_visibility", []):
            if vis not in ALLOWED_VISIBILITY:
                raise ValueError(f"Invalid visibility option {vis}!")

        options["instance"] = normalize_service_url(options["instance"])

        if "filters" in options:
            options["filters"] = [re.compile(r) for r in options["filters"]]

        return cls(**options)
49
50
class MisskeyInputService(MisskeyService, InputService):
    """Input service that streams the configured account's notes from Misskey.

    Messages arrive over the instance's streaming websocket on the
    ``homeTimeline`` channel. Notes authored by the configured account that
    pass the visibility/poll/quote/reply checks are converted to ``Post``
    objects (or reposts), recorded via ``_insert_post`` and handed to every
    entry in ``self.outputs`` through ``self.submitter``.
    """

    def __init__(self, db: DatabasePool, options: MisskeyInputOptions) -> None:
        """Create the service and resolve the account id.

        Calls ``verify_credentials()`` (defined on ``MisskeyService``) during
        construction and stores the returned ``id`` as ``self.user_id``.
        """
        super().__init__(options.instance, db)
        self.options: MisskeyInputOptions = options

        self.log.info("Verifying %s credentials...", self.url)
        response = self.verify_credentials()
        # Only notes authored by this id are crossposted (see `_on_note`).
        self.user_id: str = response["id"]

    @override
    def _get_token(self) -> str:
        """Return the API token from the options."""
        return self.options.token

    def _on_note(self, note: dict[str, Any]) -> None:
        """Handle one note received from the stream.

        The note is skipped when it is not authored by this account, its
        visibility is not in ``allowed_visibility``, it has a poll, it is a
        quote, it replies to another user's note, it replies to a note with
        no database entry, or one of its files fails to download. Pure
        renotes are delegated to ``_on_renote``.

        Args:
            note: Note object taken from a streaming ``channel`` message body.
        """
        if note["userId"] != self.user_id:
            return

        if note["visibility"] not in self.options.allowed_visibility:
            return

        if note.get("poll"):
            self.log.info("Skipping '%s'! Contains a poll..", note["id"])
            return

        renote: dict[str, Any] | None = note.get("renote")
        if renote:
            # A renote that carries its own text, CW or files is a quote rather
            # than a plain renote; quotes are not crossposted.
            if (
                note.get("text") is not None
                or note.get("cw") is not None
                or note.get("files")
            ):
                self.log.info("Skipping '%s'! Quote..", note["id"])
                return
            self._on_renote(note, renote)
            return

        reply: dict[str, Any] | None = note.get("reply")
        parent = None
        if reply:
            if reply.get("userId") != self.user_id:
                self.log.info("Skipping '%s'! Reply to other user..", note["id"])
                return

            # Self-replies are only crossposted when the parent has a db entry.
            parent = self._get_post(self.url, self.user_id, reply["id"])
            if not parent:
                self.log.info(
                    "Skipping %s, parent %s not found in db", note["id"], reply["id"]
                )
                return

        parser = MarkdownParser()  # TODO MFM parser
        # `text` may be present but null (the quote check above tests for None),
        # so fall back to "" for a null value as well as a missing key.
        text, fragments = parser.parse(note.get("text") or "")
        post = Post(id=note["id"], parent_id=reply["id"] if reply else None, text=text)
        post.fragments.extend(fragments)

        post.attachments.put(RemoteUrlAttachment(url=self.url + "/notes/" + note["id"]))

        files: list[dict[str, Any]] = note.get("files") or []
        if any(f.get("isSensitive", False) for f in files):
            post.attachments.put(SensitiveAttachment(sensitive=True))
        if note.get("cw"):
            post.attachments.put(LabelsAttachment(labels=[note["cw"]]))

        blobs: list[Blob] = []
        for media in files:
            self.log.info("Downloading %s...", media["url"])
            blob: Blob | None = download_blob(media["url"], media.get("comment", ""))
            if not blob:
                self.log.error(
                    "Skipping %s! Failed to download media %s.",
                    note["id"],
                    media["url"],
                )
                return
            blobs.append(blob)

        if blobs:
            post.attachments.put(MediaAttachment(blobs=blobs))

        record: dict[str, Any] = {
            "user": self.user_id,
            "service": self.url,
            "identifier": note["id"],
        }
        if parent:
            record["parent"] = parent["id"]
            # Use the parent's root; fall back to the parent's own id when it
            # has no root recorded.
            record["root"] = parent["root"] or parent["id"]
        self._insert_post(record)

        for out in self.outputs:
            # Bind `out` now: a deferred callable would otherwise see only
            # the last output of the loop.
            self.submitter(lambda out=out: out.accept_post(post))

    def _on_renote(self, note: dict[str, Any], renote: dict[str, Any]) -> None:
        """Record a pure renote and forward it as a repost to every output.

        Skipped when the renoted note has no database entry for this service
        and user.

        Args:
            note: The renote itself.
            renote: The note being renoted.
        """
        reposted = self._get_post(self.url, self.user_id, renote["id"])
        if not reposted:
            self.log.info(
                "Skipping repost '%s' as reposted post '%s' was not found in the db.",
                note["id"],
                renote["id"],
            )
            return

        self._insert_post(
            {
                "user": self.user_id,
                "service": self.url,
                "identifier": note["id"],
                "reposted": reposted["id"],
            }
        )

        for out in self.outputs:
            # Bind `out` now so each output receives its own repost call.
            self.submitter(
                lambda out=out: out.accept_repost(note["id"], renote["id"])
            )

    def _accept_msg(self, msg: websockets.Data) -> None:
        """Decode one streaming frame and dispatch note events to `_on_note`.

        Only ``channel`` messages whose body type is ``note`` or ``reply`` are
        handled; every other message is ignored.
        """
        data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))
        if data["type"] != "channel":
            return

        event_type: str = cast(str, data["body"]["type"])
        if event_type in ("note", "reply"):
            self._on_note(data["body"]["body"])

    async def _subscribe_to_home(self, ws: websockets.ClientConnection) -> None:
        """Send a ``connect`` request for the ``homeTimeline`` channel on `ws`."""
        await ws.send(
            json.dumps(
                {
                    "type": "connect",
                    "body": {"channel": "homeTimeline", "id": str(uuid.uuid4())},
                }
            )
        )
        self.log.info("Subscribed to 'homeTimeline' channel...")

    @override
    async def listen(self) -> None:
        """Stream the home timeline, reconnecting after connection errors.

        The token is passed as the ``i`` query parameter; only the
        token-free streaming URL is logged. Each received frame is handed to
        ``self.submitter`` for ``_accept_msg``.
        """
        scheme = "wss" if self.url.startswith("https") else "ws"
        streaming: str = f"{scheme}://{self.url.split('://', 1)[1]}"
        url: str = f"{streaming}/streaming?i={self.options.token}"

        # Iterating `websockets.connect` yields a new connection after each
        # one ends, which provides the reconnect loop.
        async for ws in websockets.connect(url):
            try:
                self.log.info("Listening to %s...", streaming)
                await self._subscribe_to_home(ws)

                async for msg in ws:
                    # Bind `msg` now: a deferred callable would otherwise see
                    # whichever frame arrived last.
                    self.submitter(lambda msg=msg: self._accept_msg(msg))
            except websockets.ConnectionClosedError as e:
                self.log.error(e, stack_info=True, exc_info=True)
                self.log.info("Reconnecting to %s...", streaming)