social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import asyncio
2import json
3import re
4from typing import Any, Callable
5
6import websockets
7from atproto_client import models
8from atproto_client.models.utils import get_or_create as get_model_or_create
9
10import cross
11import util.database as database
12from bluesky.atproto2 import resolve_identity
13from bluesky.common import SERVICE, BlueskyPost, tokenize_post
14from util.database import DataBaseWorker
15from util.media import MediaInfo, download_media
16from util.util import LOGGER, as_envvar
17
18
class BlueskyInputOptions:
    """Options for a Bluesky input, parsed from its ``options`` mapping."""

    def __init__(self, o: dict) -> None:
        """Compile the configured regex filters.

        Args:
            o: Options mapping. ``regex_filters`` (optional) is an iterable of
                pattern strings; when absent no filters are configured.
        """
        patterns = o.get("regex_filters", [])
        # Compile once here so the patterns are ready for every incoming post.
        self.filters = list(map(re.compile, patterns))
22
23
class BlueskyInput(cross.Input):
    """Base Bluesky input that forwards posts, reposts and deletions to outputs.

    Subclasses are expected to feed records into ``_on_post``, ``_on_repost``
    and ``_on_delete_post``. Each record dict must carry a
    ``"$xpost.strongRef"`` entry holding the record's ``uri`` and ``cid``.
    """

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        """Resolve the account identity and initialise the base input.

        Args:
            settings: Input configuration. Reads ``options``, ``handle``,
                ``did`` and ``pds``; the latter three are passed through
                ``as_envvar`` before identity resolution.
            db: Database worker forwarded to ``cross.Input``.
        """
        self.options = BlueskyInputOptions(settings.get("options", {}))
        did, pds = resolve_identity(
            handle=as_envvar(settings.get("handle")),
            did=as_envvar(settings.get("did")),
            pds=as_envvar(settings.get("pds")),
        )
        self.pds = pds

        # PDS is Not a service, the lexicon and rids are the same across pds
        super().__init__(SERVICE, did, settings, db)

    def _on_post(self, outputs: list[cross.Output], post: dict[str, Any]):
        """Record a newly created post and hand it to every output.

        The post is skipped (with a log line) when it quotes another post,
        when the database insert reports failure, when it fails the
        configured filters, or when one of its media blobs cannot be
        downloaded.

        Args:
            outputs: Outputs that receive the crosspost via ``accept_post``.
            post: The post record, including ``"$xpost.strongRef"``.
        """
        post_uri = post["$xpost.strongRef"]["uri"]
        post_cid = post["$xpost.strongRef"]["cid"]

        parent_uri = None
        if post.get("reply"):
            parent_uri = post["reply"]["parent"]["uri"]

        # Tolerate an explicit null embed as well as a missing one.
        embed = post.get("embed") or {}
        embed_type = embed.get("$type")

        # Downloadable media normally sits on the embed itself; recordWithMedia
        # keeps it under "media" instead.
        media = embed
        if embed_type in (
            "app.bsky.embed.record",
            "app.bsky.embed.recordWithMedia",
        ):
            quoted = embed["record"]
            if embed_type == "app.bsky.embed.recordWithMedia":
                # Here "record" is itself an app.bsky.embed.record wrapper, so
                # the strong ref (uri/cid) is one level deeper. Fall back to
                # the flat shape if the wrapper is missing.
                quoted = quoted.get("record", quoted)
                media = embed.get("media") or {}

            # at://<did>/<collection>/<rkey> -- only the collection matters.
            _, collection, _ = str(quoted["uri"][len("at://") :]).split("/")
            if collection == "app.bsky.feed.post":
                LOGGER.info("Skipping '%s'! Quote..", post_uri)
                return

        # NOTE(review): the post is stored before the filter and media checks,
        # so a post skipped below still stays in the db -- confirm intended.
        success = database.try_insert_post(
            self.db, post_uri, parent_uri, self.user_id, self.service
        )
        if not success:
            LOGGER.info("Skipping '%s' as parent post was not found in db!", post_uri)
            return
        database.store_data(
            self.db, post_uri, self.user_id, self.service, {"cid": post_cid}
        )

        tokens = tokenize_post(post)
        if not cross.test_filters(tokens, self.options.filters):
            LOGGER.info("Skipping '%s'. Matched a filter!", post_uri)
            return

        LOGGER.info("Crossposting '%s'...", post_uri)

        # Collect (blob cid, alt text) pairs first so images and video share
        # one download path.
        blobs: list[tuple[str, str]] = []
        media_type = media.get("$type")
        if media_type == "app.bsky.embed.images":
            model = get_model_or_create(media, model=models.AppBskyEmbedImages.Main)
            assert isinstance(model, models.AppBskyEmbedImages.Main)
            blobs = [(image.image.cid.encode(), image.alt) for image in model.images]
        elif media_type == "app.bsky.embed.video":
            model = get_model_or_create(media, model=models.AppBskyEmbedVideo.Main)
            assert isinstance(model, models.AppBskyEmbedVideo.Main)
            blobs = [(model.video.cid.encode(), model.alt if model.alt else "")]

        attachments: list[MediaInfo] = []
        for cid, alt in blobs:
            url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.user_id}&cid={cid}"
            LOGGER.info("Downloading %s...", url)
            io = download_media(url, alt)
            if not io:
                # One missing attachment aborts the whole crosspost.
                LOGGER.error("Skipping '%s'. Failed to download media!", post_uri)
                return
            attachments.append(io)

        cross_post = BlueskyPost(post, tokens, attachments)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], post_id: str, repost: bool):
        """Propagate the deletion of a known post or repost.

        Does nothing when ``post_id`` is not found in the database.

        Args:
            outputs: Outputs asked to delete their copy.
            post_id: AT URI of the deleted record.
            repost: ``True`` to call ``delete_repost`` on each output,
                ``False`` to call ``delete_post``.
        """
        post = database.find_post(self.db, post_id, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", post_id)
        if repost:
            for output in outputs:
                output.delete_repost(post_id)
        else:
            for output in outputs:
                output.delete_post(post_id)
        database.delete_post(self.db, post_id, self.user_id, self.service)

    def _on_repost(self, outputs: list[cross.Output], post: dict[str, Any]):
        """Record a repost and hand it to every output.

        The repost is skipped (with a log line) when the database insert
        reports failure.

        Args:
            outputs: Outputs that receive the repost via ``accept_repost``.
            post: The repost record, including ``"$xpost.strongRef"`` and a
                ``subject`` whose ``uri`` names the reposted post.
        """
        post_uri = post["$xpost.strongRef"]["uri"]
        post_cid = post["$xpost.strongRef"]["cid"]

        reposted_uri = post["subject"]["uri"]

        success = database.try_insert_repost(
            self.db, post_uri, reposted_uri, self.user_id, self.service
        )
        if not success:
            LOGGER.info("Skipping '%s' as reposted post was not found in db!", post_uri)
            return
        database.store_data(
            self.db, post_uri, self.user_id, self.service, {"cid": post_cid}
        )

        LOGGER.info("Crossposting '%s'...", post_uri)
        for output in outputs:
            output.accept_repost(post_uri, reposted_uri)
138
139
140class BlueskyJetstreamInput(BlueskyInput):
141 def __init__(self, settings: dict, db: DataBaseWorker) -> None:
142 super().__init__(settings, db)
143 self.jetstream = settings.get(
144 "jetstream", "wss://jetstream2.us-east.bsky.network/subscribe"
145 )
146
147 def __on_commit(self, outputs: list[cross.Output], msg: dict):
148 if msg.get("did") != self.user_id:
149 return
150
151 commit: dict = msg.get("commit", {})
152 if not commit:
153 return
154
155 commit_type = commit["operation"]
156 match commit_type:
157 case "create":
158 record = dict(commit.get("record", {}))
159 record["$xpost.strongRef"] = {
160 "cid": commit["cid"],
161 "uri": f"at://{self.user_id}/{commit['collection']}/{commit['rkey']}",
162 }
163
164 match commit["collection"]:
165 case "app.bsky.feed.post":
166 self._on_post(outputs, record)
167 case "app.bsky.feed.repost":
168 self._on_repost(outputs, record)
169 case "delete":
170 post_id: str = (
171 f"at://{self.user_id}/{commit['collection']}/{commit['rkey']}"
172 )
173 match commit["collection"]:
174 case "app.bsky.feed.post":
175 self._on_delete_post(outputs, post_id, False)
176 case "app.bsky.feed.repost":
177 self._on_delete_post(outputs, post_id, True)
178
179 async def listen(
180 self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]
181 ):
182 uri = self.jetstream + "?"
183 uri += "wantedCollections=app.bsky.feed.post"
184 uri += "&wantedCollections=app.bsky.feed.repost"
185 uri += f"&wantedDids={self.user_id}"
186
187 async for ws in websockets.connect(
188 uri, extra_headers={"User-Agent": "XPost/0.0.3"}
189 ):
190 try:
191 LOGGER.info("Listening to %s...", self.jetstream)
192
193 async def listen_for_messages():
194 async for msg in ws:
195 submit(lambda: self.__on_commit(outputs, json.loads(msg)))
196
197 listen = asyncio.create_task(listen_for_messages())
198
199 await asyncio.gather(listen)
200 except websockets.ConnectionClosedError as e:
201 LOGGER.error(e, stack_info=True, exc_info=True)
202 LOGGER.info("Reconnecting to %s...", self.jetstream)
203 continue