social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
at master 7.8 kB view raw
1import asyncio 2import json 3import re 4from typing import Any, Callable 5 6import websockets 7from atproto_client import models 8from atproto_client.models.utils import get_or_create as get_model_or_create 9 10import cross 11import util.database as database 12from bluesky.atproto2 import resolve_identity 13from bluesky.common import SERVICE, BlueskyPost, tokenize_post 14from util.database import DataBaseWorker 15from util.media import MediaInfo, download_media 16from util.util import LOGGER, as_envvar 17 18 19class BlueskyInputOptions: 20 def __init__(self, o: dict) -> None: 21 self.filters = [re.compile(f) for f in o.get("regex_filters", [])] 22 23 24class BlueskyInput(cross.Input): 25 def __init__(self, settings: dict, db: DataBaseWorker) -> None: 26 self.options = BlueskyInputOptions(settings.get("options", {})) 27 did, pds = resolve_identity( 28 handle=as_envvar(settings.get("handle")), 29 did=as_envvar(settings.get("did")), 30 pds=as_envvar(settings.get("pds")), 31 ) 32 self.pds = pds 33 34 # PDS is Not a service, the lexicon and rids are the same across pds 35 super().__init__(SERVICE, did, settings, db) 36 37 def _on_post(self, outputs: list[cross.Output], post: dict[str, Any]): 38 post_uri = post["$xpost.strongRef"]["uri"] 39 post_cid = post["$xpost.strongRef"]["cid"] 40 41 parent_uri = None 42 if post.get("reply"): 43 parent_uri = post["reply"]["parent"]["uri"] 44 45 embed = post.get("embed", {}) 46 if embed.get("$type") in ( 47 "app.bsky.embed.record", 48 "app.bsky.embed.recordWithMedia", 49 ): 50 did, collection, rid = str(embed["record"]["uri"][len("at://") :]).split( 51 "/" 52 ) 53 if collection == "app.bsky.feed.post": 54 LOGGER.info("Skipping '%s'! Quote..", post_uri) 55 return 56 57 success = database.try_insert_post( 58 self.db, post_uri, parent_uri, self.user_id, self.service 59 ) 60 if not success: 61 LOGGER.info("Skipping '%s' as parent post was not found in db!", post_uri) 62 return 63 database.store_data( 64 self.db, post_uri, self.user_id, self.service, {"cid": post_cid} 65 ) 66 67 tokens = tokenize_post(post) 68 if not cross.test_filters(tokens, self.options.filters): 69 LOGGER.info("Skipping '%s'. Matched a filter!", post_uri) 70 return 71 72 LOGGER.info("Crossposting '%s'...", post_uri) 73 74 def get_blob_url(blob: str): 75 return f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.user_id}&cid={blob}" 76 77 attachments: list[MediaInfo] = [] 78 if embed.get("$type") == "app.bsky.embed.images": 79 model = get_model_or_create(embed, model=models.AppBskyEmbedImages.Main) 80 assert isinstance(model, models.AppBskyEmbedImages.Main) 81 82 for image in model.images: 83 url = get_blob_url(image.image.cid.encode()) 84 LOGGER.info("Downloading %s...", url) 85 io = download_media(url, image.alt) 86 if not io: 87 LOGGER.error("Skipping '%s'. Failed to download media!", post_uri) 88 return 89 attachments.append(io) 90 elif embed.get("$type") == "app.bsky.embed.video": 91 model = get_model_or_create(embed, model=models.AppBskyEmbedVideo.Main) 92 assert isinstance(model, models.AppBskyEmbedVideo.Main) 93 url = get_blob_url(model.video.cid.encode()) 94 LOGGER.info("Downloading %s...", url) 95 io = download_media(url, model.alt if model.alt else "") 96 if not io: 97 LOGGER.error("Skipping '%s'. Failed to download media!", post_uri) 98 return 99 attachments.append(io) 100 101 cross_post = BlueskyPost(post, tokens, attachments) 102 for output in outputs: 103 output.accept_post(cross_post) 104 105 def _on_delete_post(self, outputs: list[cross.Output], post_id: str, repost: bool): 106 post = database.find_post(self.db, post_id, self.user_id, self.service) 107 if not post: 108 return 109 110 LOGGER.info("Deleting '%s'...", post_id) 111 if repost: 112 for output in outputs: 113 output.delete_repost(post_id) 114 else: 115 for output in outputs: 116 output.delete_post(post_id) 117 database.delete_post(self.db, post_id, self.user_id, self.service) 118 119 def _on_repost(self, outputs: list[cross.Output], post: dict[str, Any]): 120 post_uri = post["$xpost.strongRef"]["uri"] 121 post_cid = post["$xpost.strongRef"]["cid"] 122 123 reposted_uri = post["subject"]["uri"] 124 125 success = database.try_insert_repost( 126 self.db, post_uri, reposted_uri, self.user_id, self.service 127 ) 128 if not success: 129 LOGGER.info("Skipping '%s' as reposted post was not found in db!", post_uri) 130 return 131 database.store_data( 132 self.db, post_uri, self.user_id, self.service, {"cid": post_cid} 133 ) 134 135 LOGGER.info("Crossposting '%s'...", post_uri) 136 for output in outputs: 137 output.accept_repost(post_uri, reposted_uri) 138 139 140class BlueskyJetstreamInput(BlueskyInput): 141 def __init__(self, settings: dict, db: DataBaseWorker) -> None: 142 super().__init__(settings, db) 143 self.jetstream = settings.get( 144 "jetstream", "wss://jetstream2.us-east.bsky.network/subscribe" 145 ) 146 147 def __on_commit(self, outputs: list[cross.Output], msg: dict): 148 if msg.get("did") != self.user_id: 149 return 150 151 commit: dict = msg.get("commit", {}) 152 if not commit: 153 return 154 155 commit_type = commit["operation"] 156 match commit_type: 157 case "create": 158 record = dict(commit.get("record", {})) 159 record["$xpost.strongRef"] = { 160 "cid": commit["cid"], 161 "uri": f"at://{self.user_id}/{commit['collection']}/{commit['rkey']}", 162 } 163 164 match commit["collection"]: 165 case "app.bsky.feed.post": 166 self._on_post(outputs, record) 167 case "app.bsky.feed.repost": 168 self._on_repost(outputs, record) 169 case "delete": 170 post_id: str = ( 171 f"at://{self.user_id}/{commit['collection']}/{commit['rkey']}" 172 ) 173 match commit["collection"]: 174 case "app.bsky.feed.post": 175 self._on_delete_post(outputs, post_id, False) 176 case "app.bsky.feed.repost": 177 self._on_delete_post(outputs, post_id, True) 178 179 async def listen( 180 self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any] 181 ): 182 uri = self.jetstream + "?" 183 uri += "wantedCollections=app.bsky.feed.post" 184 uri += "&wantedCollections=app.bsky.feed.repost" 185 uri += f"&wantedDids={self.user_id}" 186 187 async for ws in websockets.connect( 188 uri, extra_headers={"User-Agent": "XPost/0.0.3"} 189 ): 190 try: 191 LOGGER.info("Listening to %s...", self.jetstream) 192 193 async def listen_for_messages(): 194 async for msg in ws: 195 submit(lambda: self.__on_commit(outputs, json.loads(msg))) 196 197 listen = asyncio.create_task(listen_for_messages()) 198 199 await asyncio.gather(listen) 200 except websockets.ConnectionClosedError as e: 201 LOGGER.error(e, stack_info=True, exc_info=True) 202 LOGGER.info("Reconnecting to %s...", self.jetstream) 203 continue