# Social media crossposting tool. 3rd time's the charm.
# Topics: mastodon, misskey, crossposting, bluesky
1import asyncio 2import json 3import re 4from abc import ABC 5from dataclasses import dataclass, field 6from typing import Any, cast, override 7 8import websockets 9 10from bluesky.info import SERVICE, BlueskyService, validate_and_transform 11from cross.service import InputService 12from database.connection import DatabasePool 13from util.util import LOGGER, normalize_service_url 14 15 16@dataclass(kw_only=True) 17class BlueskyInputOptions: 18 handle: str | None = None 19 did: str | None = None 20 pds: str | None = None 21 filters: list[re.Pattern[str]] = field(default_factory=lambda: []) 22 23 @classmethod 24 def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions": 25 validate_and_transform(data) 26 27 if "filters" in data: 28 data["filters"] = [re.compile(r) for r in data["filters"]] 29 30 return BlueskyInputOptions(**data) 31 32 33@dataclass(kw_only=True) 34class BlueskyJetstreamInputOptions(BlueskyInputOptions): 35 jetstream: str = "wss://jetstream2.us-west.bsky.network/subscribe" 36 37 @classmethod 38 def from_dict(cls, data: dict[str, Any]) -> "BlueskyJetstreamInputOptions": 39 jetstream = data.pop("jetstream", None) 40 41 base = BlueskyInputOptions.from_dict(data).__dict__.copy() 42 if jetstream: 43 base["jetstream"] = normalize_service_url(jetstream) 44 45 return BlueskyJetstreamInputOptions(**base) 46 47 48class BlueskyBaseInputService(BlueskyService, InputService, ABC): 49 def __init__(self, db: DatabasePool) -> None: 50 super().__init__(SERVICE, db) 51 52 def _on_post(self, record: dict[str, Any]): 53 LOGGER.info(record) # TODO 54 55 def _on_repost(self, record: dict[str, Any]): 56 LOGGER.info(record) # TODO 57 58 def _on_delete_post(self, post_id: str, repost: bool): 59 LOGGER.info("%s | %s", post_id, repost) # TODO 60 61 62class BlueskyJetstreamInputService(BlueskyBaseInputService): 63 def __init__(self, db: DatabasePool, options: BlueskyJetstreamInputOptions) -> None: 64 super().__init__(db) 65 self.options: BlueskyJetstreamInputOptions = 
options 66 self._init_identity() 67 68 @override 69 def get_identity_options(self) -> tuple[str | None, str | None, str | None]: 70 return (self.options.handle, self.options.did, self.options.pds) 71 72 def _accept_msg(self, msg: websockets.Data) -> None: 73 data: dict[str, Any] = cast(dict[str, Any], json.loads(msg)) 74 if data.get("did") != self.did: 75 return 76 commit: dict[str, Any] | None = data.get("commit") 77 if not commit: 78 return 79 80 commit_type: str = cast(str, commit["operation"]) 81 match commit_type: 82 case "create": 83 record: dict[str, Any] = cast(dict[str, Any], commit["record"]) 84 record["$xpost.strongRef"] = { 85 "cid": commit["cid"], 86 "uri": f"at://{self.did}/{commit['collection']}/{commit['rkey']}", 87 } 88 89 match cast(str, commit["collection"]): 90 case "app.bsky.feed.post": 91 self._on_post(record) 92 case "app.bsky.feed.repost": 93 self._on_repost(record) 94 case _: 95 pass 96 case "delete": 97 post_id: str = ( 98 f"at://{self.did}/{commit['collection']}/{commit['rkey']}" 99 ) 100 match cast(str, commit["collection"]): 101 case "app.bsky.feed.post": 102 self._on_delete_post(post_id, False) 103 case "app.bsky.feed.repost": 104 self._on_delete_post(post_id, True) 105 case _: 106 pass 107 case _: 108 pass 109 110 @override 111 async def listen(self): 112 url = self.options.jetstream + "?" 113 url += "wantedCollections=app.bsky.feed.post" 114 url += "&wantedCollections=app.bsky.feed.repost" 115 url += f"&wantedDids={self.did}" 116 117 async for ws in websockets.connect(url): 118 try: 119 LOGGER.info("Listening to %s...", self.options.jetstream) 120 121 async def listen_for_messages(): 122 async for msg in ws: 123 self.submitter(lambda: self._accept_msg(msg)) 124 125 listen = asyncio.create_task(listen_for_messages()) 126 127 _ = await asyncio.gather(listen) 128 except websockets.ConnectionClosedError as e: 129 LOGGER.error(e, stack_info=True, exc_info=True) 130 LOGGER.info("Reconnecting to %s...", self.options.jetstream) 131 continue