from abc import ABC
from dataclasses import dataclass, field
import re
from typing import Any, Callable, override

from bluesky.info import BlueskyService, validate_and_transform
from cross.service import InputService, OutputService


@dataclass(kw_only=True)
class BlueskyInputOptions:
    """Keyword-only configuration values for a Bluesky input service.

    ``handle``, ``did`` and ``pds`` have no defaults, so all three must be
    supplied (each may be ``None``) when constructing an instance.
    """

    # NOTE(review): presumably the account handle, its DID and the PDS
    # address -- the exact meaning is defined outside this file; confirm
    # against ``bluesky.info.validate_and_transform``.
    handle: str | None
    did: str | None
    pds: str | None
    # Compiled regular expressions; every instance gets its own empty list
    # by default. How the patterns are applied is not visible in this file.
    filters: list[re.Pattern[str]] = field(default_factory=lambda: [])

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions":
        """Build an options object from a plain mapping.

        ``data`` is first passed to ``validate_and_transform`` (its return
        value is ignored, so it presumably works in place -- TODO confirm).
        If a ``"filters"`` key is present, each of its entries is compiled
        with ``re.compile``.

        Args:
            data: Mapping whose keys are used as the constructor's keyword
                arguments.

        Returns:
            A new instance of ``cls`` (so subclasses get their own type).

        Raises:
            TypeError: If required fields are missing from, or unknown keys
                are present in, the mapping handed to the constructor.
            re.error: If a filter entry is not a valid regular expression.
        """
        validate_and_transform(data)

        # Work on a shallow copy so the caller's "filters" entry is not
        # replaced with compiled pattern objects behind their back.
        options = dict(data)
        if "filters" in options:
            options["filters"] = [
                re.compile(pattern) for pattern in options["filters"]
            ]
        return cls(**options)


class BlueskyBaseInputService(BlueskyService, InputService, ABC):
    """Abstract base combining ``BlueskyService`` and ``InputService``."""


class BlueskyJetstreamInputService(BlueskyBaseInputService):
    """Input service variant; currently adds no behavior of its own."""

    @override
    async def listen(
        self,
        outputs: list[OutputService],
        submitter: Callable[[Callable[[], None]], None],
    ):
        """Delegate to the parent ``listen`` implementation.

        Placeholder: the original code marks this method with a TODO, so a
        dedicated implementation is still pending.

        Args:
            outputs: Output services forwarded unchanged to the parent.
            submitter: Callable accepting a zero-argument callable, forwarded
                unchanged to the parent.

        Returns:
            Whatever the parent ``listen`` coroutine returns.
        """
        return await super().listen(outputs, submitter)  # TODO