social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

add misskey base

zenfyr.dev ada6f7dd 33d25a2a

verified
Changed files
+119 -2
mastodon
misskey
+2 -2
mastodon/input.py
···
outputs: list[OutputService],
submitter: Callable[[Callable[[], None]], None],
):
-
uri = f"{self.streaming_url}/api/v1/streaming?stream=user"
async for ws in websockets.connect(
-
uri, additional_headers={"Authorization": f"Bearer {self.options.token}"}
):
try:
LOGGER.info("Listening to %s...", self.streaming_url)
···
outputs: list[OutputService],
submitter: Callable[[Callable[[], None]], None],
):
+
url = f"{self.streaming_url}/api/v1/streaming?stream=user"
async for ws in websockets.connect(
+
url, additional_headers={"Authorization": f"Bearer {self.options.token}"}
):
try:
LOGGER.info("Listening to %s...", self.streaming_url)
+22
misskey/info.py
···
···
+
from abc import ABC, abstractmethod
+
from cross.service import Service
+
+
import requests
+
+
from util.util import LOGGER
+
+
class MisskeyService(ABC, Service):
+
def verify_credentials(self):
+
responce = requests.post(
+
f"{self.url}/api/i",
+
json={"i": self._get_token()},
+
headers={"Content-Type": "application/json"},
+
)
+
if responce.status_code != 200:
+
LOGGER.error("Failed to validate user credentials!")
+
responce.raise_for_status()
+
return dict(responce.json())
+
+
@abstractmethod
+
def _get_token(self) -> str:
+
pass
+95
misskey/input.py
···
···
+
import asyncio
+
from dataclasses import dataclass, field
+
import json
+
from pathlib import Path
+
import re
+
from typing import Any, Callable, override
+
import uuid
+
+
import websockets
+
+
from cross.service import InputService, OutputService
+
from misskey.info import MisskeyService
+
from util.util import LOGGER
+
+
ALLOWED_VISIBILITY = ["public", "home"]
+
+
+
@dataclass
+
class MisskeyInputOptions:
+
token: str
+
instance: str
+
allowed_visibility: list[str] = field(
+
default_factory=lambda: ALLOWED_VISIBILITY.copy()
+
)
+
regex_filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
+
+
@classmethod
+
def from_dict(cls, data: dict[str, Any]) -> "MisskeyInputOptions":
+
data["instance"] = (
+
data["instance"][:-1]
+
if data["instance"].endswith("/")
+
else data["instance"]
+
)
+
+
if "allowed_visibility" in data:
+
for vis in data.get("allowed_visibility", []):
+
if vis not in ALLOWED_VISIBILITY:
+
raise ValueError(f"Invalid visibility option {vis}!")
+
+
if "regex_filters" in data:
+
data["regex_filters"] = [re.compile(r) for r in data["regex_filters"]]
+
+
return MisskeyInputOptions(**data)
+
+
+
class MisskeyInputService(MisskeyService, InputService):
+
def __init__(self, db: Path, options: MisskeyInputOptions) -> None:
+
super().__init__(options.instance, db)
+
self.options: MisskeyInputOptions = options
+
+
LOGGER.info("Verifying %s credentails...", self.url)
+
responce = self.verify_credentials()
+
self.user_id: str = responce["id"]
+
+
@override
+
def _get_token(self) -> str:
+
return self.options.token
+
+
async def _subscribe_to_home(self, ws: websockets.ClientConnection) -> None:
+
await ws.send(
+
json.dumps(
+
{
+
"type": "connect",
+
"body": {"channel": "homeTimeline", "id": str(uuid.uuid4())},
+
}
+
)
+
)
+
LOGGER.info("Subscribed to 'homeTimeline' channel...")
+
+
@override
+
async def listen(
+
self,
+
outputs: list[OutputService],
+
submitter: Callable[[Callable[[], None]], None],
+
):
+
streaming: str = f"{'wss' if self.url.startswith('https') else 'ws'}://{self.url.split('://', 1)[1]}"
+
url: str = f"{streaming}/streaming?i={self.options.token}"
+
+
async for ws in websockets.connect(url):
+
try:
+
LOGGER.info("Listening to %s...", streaming)
+
await self._subscribe_to_home(ws)
+
+
async def listen_for_messages():
+
async for msg in ws:
+
LOGGER.info(msg) # TODO
+
+
#keepalive = asyncio.create_task(self._send_keepalive(ws))
+
listen = asyncio.create_task(listen_for_messages())
+
+
_ = await asyncio.gather(listen)
+
except websockets.ConnectionClosedError as e:
+
LOGGER.error(e, stack_info=True, exc_info=True)
+
LOGGER.info("Reconnecting to %s...", streaming)
+
continue