import asyncio
import json
import re
import uuid
from dataclasses import dataclass, field
from typing import Any, cast, override

import websockets

from cross.service import InputService
from database.connection import DatabasePool
from misskey.info import MisskeyService
from util.util import normalize_service_url

# Visibility values that `MisskeyInputOptions.from_dict` accepts; also the
# default for `MisskeyInputOptions.allowed_visibility`.
ALLOWED_VISIBILITY = ["public", "home"]


@dataclass
class MisskeyInputOptions:
    """Configuration for `MisskeyInputService`."""

    # Sent as the `i` query parameter of the streaming URL and returned by
    # `MisskeyInputService._get_token`.
    token: str
    # Instance URL; `from_dict` passes it through `normalize_service_url`.
    instance: str
    # Each instance gets its own copy of ALLOWED_VISIBILITY by default.
    allowed_visibility: list[str] = field(
        default_factory=lambda: ALLOWED_VISIBILITY.copy()
    )
    # Compiled patterns; `from_dict` builds them from the "filters" entry.
    filters: list[re.Pattern[str]] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MisskeyInputOptions":
        """Build options from a plain mapping.

        The mapping's keys are forwarded as keyword arguments to the
        constructor. The caller's dict is left untouched.

        Args:
            data: Must contain "instance" (and "token", as required by the
                constructor). May contain "allowed_visibility" (values from
                ALLOWED_VISIBILITY) and "filters" (regex source strings).

        Raises:
            KeyError: If "instance" is missing.
            ValueError: If "allowed_visibility" holds an unsupported value.
            re.error: If a filter is not a valid regular expression.
        """
        # Work on a copy so normalisation and regex compilation do not leak
        # back into the caller's dictionary.
        options = dict(data)
        options["instance"] = normalize_service_url(options["instance"])

        if "allowed_visibility" in options:
            visibility = list(options["allowed_visibility"])
            for vis in visibility:
                if vis not in ALLOWED_VISIBILITY:
                    raise ValueError(f"Invalid visibility option {vis}!")
            options["allowed_visibility"] = visibility

        if "filters" in options:
            options["filters"] = [re.compile(r) for r in options["filters"]]

        # `cls` rather than the hard-coded class so subclasses construct
        # themselves.
        return cls(**options)


class MisskeyInputService(MisskeyService, InputService):
    """Input service that listens to a Misskey home timeline over websocket.

    `self.log`, `self.url`, `self.verify_credentials` and `self.submitter`
    are not defined in this file; presumably they come from the base
    classes.
    """

    def __init__(self, db: DatabasePool, options: MisskeyInputOptions) -> None:
        """Store the options and verify the credentials.

        Args:
            db: Database pool forwarded to the base initialiser.
            options: Instance URL, token and filtering configuration.
        """
        super().__init__(options.instance, db)
        self.options: MisskeyInputOptions = options

        self.log.info("Verifying %s credentails...", self.url)
        response = self.verify_credentials()
        self.user_id: str = response["id"]

    @override
    def _get_token(self) -> str:
        """Return the configured access token."""
        return self.options.token

    def _on_note(self, note: dict[str, Any]):
        """Handle one note payload; currently it is only logged."""
        self.log.info(note)  # TODO

    def _accept_msg(self, msg: websockets.Data) -> None:
        """Decode one streaming message and dispatch note events.

        Only messages whose top-level "type" is "channel" and whose body
        "type" is "note" or "reply" reach `_on_note`; all others are
        ignored.

        Args:
            msg: Raw websocket frame containing a JSON document.
        """
        data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))

        if data["type"] != "channel":
            return

        event_type: str = cast(str, data["body"]["type"])
        if event_type in ("note", "reply"):
            self._on_note(data["body"]["body"])

    async def _subscribe_to_home(self, ws: websockets.ClientConnection) -> None:
        """Send the "connect" request for the "homeTimeline" channel.

        Args:
            ws: Open websocket connection to the streaming endpoint.
        """
        await ws.send(
            json.dumps(
                {
                    "type": "connect",
                    "body": {"channel": "homeTimeline", "id": str(uuid.uuid4())},
                }
            )
        )
        self.log.info("Subscribed to 'homeTimeline' channel...")

    @override
    async def listen(self):
        """Stream home-timeline messages and hand each to the submitter.

        Iterates over `websockets.connect(url)`, so the loop body runs once
        per connection. When the connection closes with an error, the error
        is logged and the loop moves on to the next connection.
        """
        # Same host as the service URL, with the websocket scheme.
        scheme = "wss" if self.url.startswith("https") else "ws"
        streaming: str = f"{scheme}://{self.url.split('://', 1)[1]}"
        url: str = f"{streaming}/streaming?i={self.options.token}"

        async for ws in websockets.connect(url):
            try:
                self.log.info("Listening to %s...", streaming)
                await self._subscribe_to_home(ws)

                async def listen_for_messages():
                    async for msg in ws:
                        # Bind `msg` as a default argument: a plain closure
                        # would read the loop variable when the callable
                        # finally runs, which may be a later message if the
                        # submitter defers execution.
                        self.submitter(lambda msg=msg: self._accept_msg(msg))

                receiver = asyncio.create_task(listen_for_messages())
                _ = await asyncio.gather(receiver)
            except websockets.ConnectionClosedError as e:
                self.log.error(e, stack_info=True, exc_info=True)
                self.log.info("Reconnecting to %s...", streaming)
                continue