social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
"""Bluesky input side of the crossposter: option parsing and a Jetstream listener."""

import asyncio
import re
from abc import ABC
from dataclasses import dataclass, field
from typing import Any, Callable, override

import websockets

from bluesky.info import SERVICE, BlueskyService, validate_and_transform
from cross.service import InputService, OutputService
from database.connection import DatabasePool
from util.util import LOGGER, normalize_service_url


@dataclass(kw_only=True)
class BlueskyInputOptions:
    """Identity and filter settings shared by Bluesky input services.

    Attributes are keyword-only. ``filters`` holds compiled regular
    expressions; how they are applied is not decided in this module.
    """

    handle: str | None = None
    did: str | None = None
    pds: str | None = None
    filters: list[re.Pattern[str]] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions":
        """Build options from a plain mapping.

        The mapping is first passed to ``validate_and_transform`` (called for
        its in-place effect; its return value is not used), then every entry
        of ``"filters"``, if present, is compiled with ``re.compile``.

        Args:
            data: Raw option values keyed by field name. It is not modified;
                all rewriting happens on a shallow copy.

        Returns:
            A new instance of ``cls`` populated from the processed mapping.

        Raises:
            re.error: If a filter is not a valid regular expression.
            TypeError: If the mapping contains keys that are not fields.
        """
        # Shallow copy: the caller's dict must survive being parsed again.
        options = dict(data)
        validate_and_transform(options)

        if "filters" in options:
            options["filters"] = [
                re.compile(pattern) for pattern in options["filters"]
            ]

        return cls(**options)


@dataclass(kw_only=True)
class BlueskyJetstreamInputOptions(BlueskyInputOptions):
    """Input options plus the Jetstream websocket endpoint to subscribe to."""

    jetstream: str = "wss://jetstream2.us-west.bsky.network/subscribe"

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "BlueskyJetstreamInputOptions":
        """Build Jetstream options from a plain mapping.

        ``"jetstream"`` is taken out before the remaining keys are parsed by
        ``BlueskyInputOptions.from_dict``. A truthy value is passed through
        ``normalize_service_url``; a missing or falsy value keeps the class
        default.

        Args:
            data: Raw option values keyed by field name. It is not modified.

        Returns:
            A new instance of ``cls``.
        """
        # Shallow copy so popping "jetstream" does not strip it from the
        # caller's configuration.
        remaining = dict(data)
        jetstream = remaining.pop("jetstream", None)

        values = vars(BlueskyInputOptions.from_dict(remaining)).copy()
        if jetstream:
            values["jetstream"] = normalize_service_url(jetstream)

        return cls(**values)


class BlueskyBaseInputService(BlueskyService, InputService, ABC):
    """Abstract base for Bluesky input services, bound to ``SERVICE``."""

    def __init__(self, db: DatabasePool) -> None:
        """Initialise the parent services with ``SERVICE`` and ``db``."""
        super().__init__(SERVICE, db)


class BlueskyJetstreamInputService(BlueskyBaseInputService):
    """Input service that reads a Jetstream websocket subscription."""

    def __init__(self, db: DatabasePool, options: BlueskyJetstreamInputOptions) -> None:
        """Store ``options`` and run ``_init_identity``.

        Args:
            db: Database pool handed to the base service.
            options: Parsed Jetstream input options.
        """
        super().__init__(db)
        self.options: BlueskyJetstreamInputOptions = options
        self._init_identity()

    @override
    def get_identity_options(self) -> tuple[str | None, str | None, str | None]:
        """Return the configured ``(handle, did, pds)`` triple."""
        return (self.options.handle, self.options.did, self.options.pds)

    @override
    async def listen(
        self,
        outputs: list[OutputService],
        submitter: Callable[[Callable[[], None]], None],
    ) -> None:
        """Subscribe to Jetstream and log every message received.

        The subscription asks for ``app.bsky.feed.post`` and
        ``app.bsky.feed.repost`` records of ``self.did``. When the connection
        closes with an error, the error is logged and the loop moves on to
        the next connection yielded by ``websockets.connect``.

        Args:
            outputs: Output services; not used yet (message handling is TODO).
            submitter: Callback scheduler; not used yet.
        """
        # NOTE(review): self.did is not assigned in this module; presumably
        # set by _init_identity() in the base service -- confirm.
        query = "&".join(
            (
                "wantedCollections=app.bsky.feed.post",
                "wantedCollections=app.bsky.feed.repost",
                f"wantedDids={self.did}",
            )
        )
        url = f"{self.options.jetstream}?{query}"

        async for ws in websockets.connect(url):
            try:
                LOGGER.info("Listening to %s...", self.options.jetstream)

                async def listen_for_messages() -> None:
                    async for msg in ws:
                        LOGGER.info(msg)  # TODO: messages are only logged for now

                # Named `listener` so it does not shadow this method's name.
                listener = asyncio.create_task(listen_for_messages())

                _ = await asyncio.gather(listener)
            except websockets.ConnectionClosedError as e:
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.options.jetstream)
                continue