from atproto import Request, client_utils
from atproto_client import models
from httpx import Timeout

import cross
import misskey.mfm_util as mfm_util
import util.database as database
from bluesky.atproto2 import Client2, resolve_identity
from bluesky.common import ADULT_PATTERN, PORN_PATTERN, SERVICE, tokens_to_richtext
from util.database import DataBaseWorker
from util.media import (
    MediaInfo,
    compress_image,
    convert_to_mp4,
    get_filename_from_url,
    get_media_meta,
)
from util.util import LOGGER, as_envvar

# Values accepted inside the 'thread_gate' option list.
ALLOWED_GATES = ["mentioned", "following", "followers", "everybody"]


class BlueskyOutputOptions:
    """Options parsed from the ``options`` dict of a Bluesky output.

    Attributes:
        quote_gate: Defaults to ``False``; overridden by ``o["quote_gate"]``.
        thread_gate: Defaults to ``["everybody"]``; overridden by
            ``o["thread_gate"]``, whose entries must all be in
            ``ALLOWED_GATES`` (an empty list is accepted).
        encode_videos: Defaults to ``True``; when ``False``, posts with
            non-mp4 videos are skipped instead of converted.

    Raises:
        ValueError: If ``thread_gate`` contains a value outside
            ``ALLOWED_GATES``.
    """

    def __init__(self, o: dict) -> None:
        self.quote_gate: bool = False
        self.thread_gate: list[str] = ["everybody"]
        self.encode_videos: bool = True

        quote_gate = o.get("quote_gate")
        if quote_gate is not None:
            self.quote_gate = bool(quote_gate)

        thread_gate = o.get("thread_gate")
        if thread_gate is not None:
            if any(v not in ALLOWED_GATES for v in thread_gate):
                raise ValueError(
                    f"'thread_gate' only accepts {', '.join(ALLOWED_GATES)} or [], got: {thread_gate}"
                )
            self.thread_gate = thread_gate

        encode_videos = o.get("encode_videos")
        if encode_videos is not None:
            self.encode_videos = bool(encode_videos)


class BlueskyOutput(cross.Output):
    """Output that mirrors posts, reposts and deletions to a Bluesky account."""

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """Resolve the account identity and log in to its PDS.

        Args:
            input: Source input this output is attached to.
            settings: Output settings; reads ``options``, ``app-password``,
                ``handle``, ``did``, ``pds`` and ``bsky_appview``.
            db: Database worker used for post/mapping bookkeeping.

        Raises:
            Exception: If no app password is configured.
            ValueError: If ``options.thread_gate`` is invalid.
        """
        super().__init__(input, settings, db)
        self.options = BlueskyOutputOptions(settings.get("options") or {})

        app_password = as_envvar(settings.get("app-password"))
        if not app_password:
            raise Exception("Account app password not provided!")

        did, pds = resolve_identity(
            handle=as_envvar(settings.get("handle")),
            did=as_envvar(settings.get("did")),
            pds=as_envvar(settings.get("pds")),
        )

        # No overall timeout (uploads may be slow), but bound the connect phase.
        reqs = Request(timeout=Timeout(None, connect=30.0))

        self.bsky = Client2(pds, request=reqs)
        self.bsky.configure_proxy_header(
            service_type="bsky_appview",
            did=as_envvar(settings.get("bsky_appview")) or "did:web:api.bsky.app",
        )
        self.bsky.login(did, app_password)

    def __check_login(self):
        """Return the logged-in profile (``self.bsky.me``).

        Raises:
            Exception: If the client has no logged-in profile.
        """
        login = self.bsky.me
        if not login:
            raise Exception("Client not logged in!")
        return login

    def _find_parent(self, parent_id: str):
        """Look up the Bluesky thread that mirrors the source post ``parent_id``.

        Returns:
            ``(root_ref, reply_ref, root_db_id, parent_db_id)`` where the two
            refs are strong refs built from the stored URI/CID pairs and the
            last two items are elements 2 and 3 of the mapped thread tuple,
            or ``None`` when no mapped thread exists.
        """
        login = self.__check_login()

        thread_tuple = database.find_mapped_thread(
            self.db,
            parent_id,
            self.input.user_id,
            self.input.service,
            login.did,
            SERVICE,
        )

        if not thread_tuple:
            LOGGER.error("Failed to find thread tuple in the database!")
            return None

        root_uri: str = thread_tuple[0]
        reply_uri: str = thread_tuple[1]

        root_cid = database.fetch_data(self.db, root_uri, login.did, SERVICE)["cid"]
        # The parent ref must carry the parent's own CID, not the root's.
        reply_cid = database.fetch_data(self.db, reply_uri, login.did, SERVICE)["cid"]

        root_record = models.AppBskyFeedPost.CreateRecordResponse(
            uri=root_uri, cid=root_cid
        )
        reply_record = models.AppBskyFeedPost.CreateRecordResponse(
            uri=reply_uri, cid=reply_cid
        )

        return (
            models.create_strong_ref(root_record),
            models.create_strong_ref(reply_record),
            thread_tuple[2],
            thread_tuple[3],
        )

    def _split_attachments(self, attachments: list[MediaInfo]):
        """Partition attachments into ``(supported, unsupported)`` lists.

        Attachments whose mime type starts with ``image/`` or ``video/`` are
        supported; everything else is unsupported. Order is preserved.
        """
        sup_media: list[MediaInfo] = []
        unsup_media: list[MediaInfo] = []

        for a in attachments:
            # TODO convert gifs to videos
            if a.mime.startswith(("image/", "video/")):
                sup_media.append(a)
            else:
                unsup_media.append(a)

        return (sup_media, unsup_media)

    def _split_media_per_post(
        self, tokens: list[client_utils.TextBuilder], media: list[MediaInfo]
    ) -> list[tuple[client_utils.TextBuilder, list[MediaInfo]]]:
        """Distribute media over the text chunks of a thread.

        Each video takes the next post that has no media yet. Consecutive
        images share a post until it holds four, then move on to the next
        media-free post. When the text chunks run out, posts with empty text
        are appended to carry the remaining media.

        Args:
            tokens: One rich-text builder per post of the thread.
            media: Supported (image/video) attachments, in order.

        Returns:
            One ``(text, attachments)`` pair per post to send.
        """
        posts: list[dict] = [{"tokens": chunk, "attachments": []} for chunk in tokens]
        # Index of the first post that has not been claimed for media yet.
        next_free_idx = 0
        current_image_post_idx: int | None = None

        def claim_next_post() -> int:
            nonlocal next_free_idx
            if next_free_idx == len(posts):
                # Out of text chunks: add a text-less post for the media.
                # The text is a bare TextBuilder, like every other entry.
                posts.append(
                    {"tokens": client_utils.TextBuilder().text(""), "attachments": []}
                )
            idx = next_free_idx
            next_free_idx += 1
            return idx

        for att in media:
            if att.mime.startswith("video/"):
                # A video ends the current image group and gets its own post.
                current_image_post_idx = None
                idx = claim_next_post()
                posts[idx]["attachments"].append(att)
            elif att.mime.startswith("image/"):
                if (
                    current_image_post_idx is not None
                    and len(posts[current_image_post_idx]["attachments"]) < 4
                ):
                    posts[current_image_post_idx]["attachments"].append(att)
                else:
                    idx = claim_next_post()
                    posts[idx]["attachments"].append(att)
                    current_image_post_idx = idx

        return [(p["tokens"], p["attachments"]) for p in posts]

    def _send_image_post(self, text, attachments: list[MediaInfo], reply_to, labels, time_iso):
        """Send one post that embeds ``attachments`` as images.

        Every image is passed through ``compress_image`` at quality 100 and,
        if the result is still larger than 1,000,000 bytes, compressed again
        with the default settings.

        Returns:
            The response of ``self.bsky.send_images``.
        """
        images: list[bytes] = []
        image_alts: list[str] = []
        image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []

        for attachment in attachments:
            image_io = compress_image(attachment.io, quality=100)
            metadata = get_media_meta(image_io)

            if len(image_io) > 1_000_000:
                LOGGER.info("Compressing %s...", attachment.name)
                image_io = compress_image(image_io)

            images.append(image_io)
            image_alts.append(attachment.alt)
            image_aspect_ratios.append(
                models.AppBskyEmbedDefs.AspectRatio(
                    width=metadata["width"], height=metadata["height"]
                )
            )

        return self.bsky.send_images(
            text=text,
            images=images,
            image_alts=image_alts,
            image_aspect_ratios=image_aspect_ratios,
            reply_to=reply_to,
            labels=labels,
            time_iso=time_iso,
        )

    def _send_video_post(self, text, attachment: MediaInfo, reply_to, labels, post: cross.Post):
        """Send one post that embeds ``attachment`` as a video.

        Non-mp4 videos are converted with ``convert_to_mp4`` first.

        Returns:
            The response of ``self.bsky.send_video``, or ``None`` when the
            reported duration exceeds 180 and the video was not sent.
        """
        metadata = get_media_meta(attachment.io)
        if metadata["duration"] > 180:
            LOGGER.info(
                "Skipping post_id '%s', video attachment too long!",
                post.get_id(),
            )
            return None

        video_io = attachment.io
        if attachment.mime != "video/mp4":
            LOGGER.info("Converting %s to mp4...", attachment.name)
            video_io = convert_to_mp4(video_io)

        aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
            width=metadata["width"], height=metadata["height"]
        )

        return self.bsky.send_video(
            text=text,
            video=video_io,
            video_aspect_ratio=aspect_ratio,
            video_alt=attachment.alt,
            reply_to=reply_to,
            labels=labels,
            time_iso=post.get_timestamp(),
        )

    def accept_post(self, post: cross.Post):
        """Mirror ``post`` to Bluesky as one or more chained posts.

        The text is split into chunks of at most 300 (see
        ``cross.split_tokens``), media is distributed over those chunks, each
        resulting post is sent as a reply to the previous one, and every
        created record is stored in the database and mapped to the source
        post.

        The post is skipped (nothing is sent) when its parent has no mapped
        thread, when it contains tokens that cannot be converted to rich
        text, when an image exceeds 2,000,000 bytes, when a video exceeds
        100,000,000 bytes, or when a non-mp4 video is present while video
        encoding is disabled.

        Raises:
            Exception: If the client is not logged in.
            AssertionError: If the source post is missing from the database.
        """
        login = self.__check_login()

        parent_id = post.get_parent_id()

        # used for db insertion
        new_root_id = None
        new_parent_id = None

        root_ref = None
        reply_ref = None
        if parent_id:
            parents = self._find_parent(parent_id)
            if not parents:
                return
            root_ref, reply_ref, new_root_id, new_parent_id = parents

        tokens = post.get_tokens().copy()

        unique_labels: set[str] = set()
        cw = post.get_spoiler()
        if cw:
            tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
            unique_labels.add("graphic-media")

            # from bsky.app, a post can only have one of those labels
            if PORN_PATTERN.search(cw):
                unique_labels.add("porn")
            elif ADULT_PATTERN.search(cw):
                unique_labels.add("sexual")

        if post.is_sensitive():
            unique_labels.add("graphic-media")

        labels = (
            models.ComAtprotoLabelDefs.SelfLabels(
                values=[
                    models.ComAtprotoLabelDefs.SelfLabel(val=label)
                    # sorted so the emitted label order is deterministic
                    for label in sorted(unique_labels)
                ]
            )
            if unique_labels
            else None
        )

        sup_media, unsup_media = self._split_attachments(post.get_attachments())

        # Attachments that cannot be embedded are appended as links instead.
        if unsup_media:
            if tokens:
                tokens.append(cross.TextToken("\n"))
            for attachment in unsup_media:
                tokens.append(
                    cross.LinkToken(
                        attachment.url, f"[{get_filename_from_url(attachment.url)}]"
                    )
                )
                tokens.append(cross.TextToken(" "))

        if post.get_text_type() == "text/x.misskeymarkdown":
            tokens, status = mfm_util.strip_mfm(tokens)
            post_url = post.get_post_url()
            if status and post_url:
                tokens.append(cross.TextToken("\n"))
                tokens.append(
                    cross.LinkToken(post_url, "[Post contains MFM, see original]")
                )

        split_tokens: list[list[cross.Token]] = cross.split_tokens(tokens, 300)
        post_text: list[client_utils.TextBuilder] = []

        # convert tokens into rich text. skip post if contains unsupported tokens
        for block in split_tokens:
            rich_text = tokens_to_richtext(block)

            if not rich_text:
                LOGGER.error(
                    "Skipping '%s' as it contains invalid rich text types!",
                    post.get_id(),
                )
                return
            post_text.append(rich_text)

        if not post_text:
            post_text = [client_utils.TextBuilder().text("")]

        # Validate all media before anything is sent.
        for m in sup_media:
            if m.mime.startswith("image/"):
                if len(m.io) > 2_000_000:
                    LOGGER.error(
                        "Skipping post_id '%s', failed to download attachment! File too large.",
                        post.get_id(),
                    )
                    return

            if m.mime.startswith("video/"):
                if m.mime != "video/mp4" and not self.options.encode_videos:
                    LOGGER.info(
                        "Video is not mp4, but encoding is disabled. Skipping '%s'...",
                        post.get_id(),
                    )
                    return

                if len(m.io) > 100_000_000:
                    LOGGER.error(
                        "Skipping post_id '%s', failed to download attachment! File too large?",
                        post.get_id(),
                    )
                    return

        created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []
        baked_media = self._split_media_per_post(post_text, sup_media)

        for text, attachments in baked_media:
            reply_to = (
                models.AppBskyFeedPost.ReplyRef(parent=reply_ref, root=root_ref)
                if root_ref and reply_ref
                else None
            )

            if not attachments:
                if reply_to is not None:
                    new_post = self.bsky.send_post(
                        text,
                        reply_to=reply_to,
                        labels=labels,
                        time_iso=post.get_timestamp(),
                    )
                else:
                    new_post = self.bsky.send_post(
                        text, labels=labels, time_iso=post.get_timestamp()
                    )
            elif attachments[0].mime.startswith("image/"):
                # if a single post is an image - everything else is an image
                # Each post carries its own text chunk, not the first one.
                new_post = self._send_image_post(
                    text, attachments, reply_to, labels, post.get_timestamp()
                )
            else:
                # video is guaranteed to be the only attachment
                new_post = self._send_video_post(
                    text, attachments[0], reply_to, labels, post
                )
                if new_post is None:
                    # NOTE(review): earlier posts of this thread may already
                    # be live and are not recorded in the database when we
                    # bail out here; consider validating the duration
                    # before sending anything.
                    return

            if not root_ref:
                # First post of a brand-new thread becomes the thread root.
                root_ref = models.create_strong_ref(new_post)

            self.bsky.create_gates(
                self.options.thread_gate,
                self.options.quote_gate,
                new_post.uri,
                time_iso=post.get_timestamp(),
            )
            reply_ref = models.create_strong_ref(new_post)
            created_records.append(new_post)

        db_post = database.find_post(
            self.db, post.get_id(), self.input.user_id, self.input.service
        )
        if not db_post:
            # Explicit raise so the check is not stripped under `python -O`.
            raise AssertionError(
                f"Source post '{post.get_id()}' not found in the database!"
            )

        if new_root_id is None or new_parent_id is None:
            # Not a reply to a known thread: the first record is the root.
            new_root_id = database.insert_post(
                self.db, created_records[0].uri, login.did, SERVICE
            )
            database.store_data(
                self.db,
                created_records[0].uri,
                login.did,
                SERVICE,
                {"cid": created_records[0].cid},
            )

            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post["id"], new_parent_id)
            created_records = created_records[1:]

        for record in created_records:
            new_parent_id = database.insert_reply(
                self.db, record.uri, login.did, SERVICE, new_parent_id, new_root_id
            )
            database.store_data(
                self.db, record.uri, login.did, SERVICE, {"cid": record.cid}
            )
            database.insert_mapping(self.db, db_post["id"], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every Bluesky post mapped to the source post ``identifier``.

        Mappings are processed in reverse order. Does nothing when the source
        post is not in the database.
        """
        login = self.__check_login()

        post = database.find_post(
            self.db, identifier, self.input.user_id, self.input.service
        )
        if not post:
            return

        mappings = database.find_mappings(self.db, post["id"], SERVICE, login.did)
        for mapping in reversed(mappings):
            LOGGER.info("Deleting '%s'...", mapping[0])
            self.bsky.delete_post(mapping[0])
            database.delete_post(self.db, mapping[0], SERVICE, login.did)

    def accept_repost(self, repost_id: str, reposted_id: str):
        """Repost on Bluesky the mirror of ``reposted_id``.

        Any existing Bluesky repost mapped to ``repost_id`` is removed first.
        Does nothing when either source post is unknown to the database or
        when the reposted post has no Bluesky mapping.
        """
        login, repost = self.__delete_repost(repost_id)
        if not (login and repost):
            return

        reposted = database.find_post(
            self.db, reposted_id, self.input.user_id, self.input.service
        )
        if not reposted:
            return

        # mappings of the reposted post
        mappings = database.find_mappings(self.db, reposted["id"], SERVICE, login.did)
        if not mappings:
            return

        target_uri = mappings[0][0]
        cid = database.fetch_data(self.db, target_uri, login.did, SERVICE)["cid"]
        rsp = self.bsky.repost(target_uri, cid)

        internal_id = database.insert_repost(
            self.db, rsp.uri, reposted["id"], login.did, SERVICE
        )
        database.store_data(self.db, rsp.uri, login.did, SERVICE, {"cid": rsp.cid})
        database.insert_mapping(self.db, repost["id"], internal_id)

    def __delete_repost(
        self, repost_id: str
    ) -> tuple[models.AppBskyActorDefs.ProfileViewDetailed | None, dict | None]:
        """Remove the Bluesky repost mapped to the source repost ``repost_id``.

        Returns:
            ``(login, repost)`` where ``repost`` is the database row of the
            source repost, or ``(None, None)`` when that row does not exist.
        """
        login = self.__check_login()

        repost = database.find_post(
            self.db, repost_id, self.input.user_id, self.input.service
        )
        if not repost:
            return None, None

        mappings = database.find_mappings(self.db, repost["id"], SERVICE, login.did)
        if mappings:
            LOGGER.info("Deleting '%s'...", mappings[0][0])
            self.bsky.unrepost(mappings[0][0])
            # NOTE(review): delete_post() in this class passes
            # (SERVICE, login.did) to database.delete_post, here the order is
            # (login.did, SERVICE) - confirm which one matches the signature.
            database.delete_post(self.db, mappings[0][0], login.did, SERVICE)
        return login, repost

    def delete_repost(self, repost_id: str):
        """Remove the Bluesky repost mapped to the source repost ``repost_id``."""
        self.__delete_repost(repost_id)