import json
import logging
import os
import select
import socket
import time
from collections.abc import Iterator
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Optional

logger = logging.getLogger(__name__)


class QMPAPIError(RuntimeError):
    """Error response received from the QMP server.

    Attributes:
        class_name: value of the error's ``class`` member.
        description: value of the error's ``desc`` member.
        transaction_id: value of the response's ``id`` member, or ``None``
            when the response carries no ``id``.
    """

    def __init__(self, message: dict[str, Any]):
        """Build the error from a decoded QMP response.

        Args:
            message: decoded response; must contain an ``error`` member.

        Raises:
            RuntimeError: if the ``class`` or ``desc`` member is missing.
        """
        assert "error" in message, "Not an error message!"
        super().__init__(message)
        # The QMP wire format nests the details:
        #   { "error": { "class": ..., "desc": ... }, "id": ... }
        # The flat layout (class/desc next to "error") is still accepted so
        # existing callers that build such messages keep working.
        details = message["error"]
        if not isinstance(details, dict):
            details = message
        try:
            self.class_name = details["class"]
            self.description = details["desc"]
        except KeyError as err:
            raise RuntimeError("Malformed QMP API error response") from err
        # NOTE: Some errors can occur before the Server is able to read the
        # id member; in these cases the id member will not be part of the
        # error response, even if provided by the client.
        self.transaction_id = message.get("id")

    def __str__(self) -> str:
        return f"<QMP API error related to transaction {self.transaction_id} [{self.class_name}]: {self.description}>"


class QMPSession:
    """Line-oriented JSON session with a QMP server over a stream socket.

    Incoming messages are sorted into two queues: ``results`` (command
    replies and the greeting) and ``pending_events`` (asynchronous events).
    Error replies are raised as ``QMPAPIError`` by whichever method happened
    to read them.
    """

    def __init__(self, sock: socket.socket) -> None:
        """Wrap *sock*, wait for the greeting and send ``qmp_capabilities``.

        Args:
            sock: connected stream socket; the session closes it in
                ``close()`` / on garbage collection.
        """
        self.sock = sock
        self.results: Queue[dict[str, str]] = Queue()
        self.pending_events: Queue[dict[str, Any]] = Queue()
        self.reader = sock.makefile("r", encoding="utf-8")
        self.writer = sock.makefile("w", encoding="utf-8")
        # A non-blocking readline() may hand back a line fragment without its
        # terminating newline; fragments are accumulated here until complete.
        self._partial_line = ""
        # Make the reader non-blocking so readline() returns immediately when
        # no data is available; actual waiting is done with select().
        os.set_blocking(self.reader.fileno(), False)
        hello = self._wait_for_new_result()
        logger.debug("Got greeting from QMP API: %s", hello)
        # The greeting message format is:
        # { "QMP": { "version": json-object, "capabilities": json-array } }
        assert "QMP" in hello, f"Unexpected result: {hello}"
        self.send("qmp_capabilities")

    @classmethod
    def from_path(cls, path: Path) -> "QMPSession":
        """Connect to the UNIX stream socket at *path* and open a session."""
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(str(path))
        except OSError:
            # Do not leak the descriptor when the connection cannot be made.
            sock.close()
            raise
        return cls(sock)

    def close(self) -> None:
        """Close the file wrappers and the socket; safe to call repeatedly."""
        for name in ("reader", "writer", "sock"):
            resource = getattr(self, name, None)
            if resource is None:
                continue
            try:
                resource.close()
            except OSError:
                # Best-effort teardown: the peer may already be gone.
                pass

    def __del__(self) -> None:
        self.close()

    def _read_message(self) -> bool:
        """Read and dispatch at most one complete message without blocking.

        Returns:
            True if a message was queued, False if no complete line is
            available yet.

        Raises:
            QMPAPIError: if the message is neither a result, a greeting nor
                an event.
        """
        while True:
            fragment = self.reader.readline()
            if not fragment:
                return False
            self._partial_line += fragment
            if not self._partial_line.endswith("\n"):
                # Incomplete line: keep it and wait for the remainder.
                return False
            line, self._partial_line = self._partial_line, ""
            if line.strip():
                break
            # Blank line: nothing to decode, look at the next one.

        evt_or_result = json.loads(line)
        logger.debug("Received a message: %s", evt_or_result)

        # It's a result
        if "return" in evt_or_result or "QMP" in evt_or_result:
            self.results.put(evt_or_result)
        # It's an event
        elif "event" in evt_or_result:
            self.pending_events.put(evt_or_result)
        else:
            raise QMPAPIError(evt_or_result)
        return True

    def _wait_for_data(self, timeout: Optional[float]) -> None:
        """Block until the socket has data to read.

        Only call this after ``_read_message()`` returned False, i.e. once
        the reader's internal buffers hold no complete line.

        Args:
            timeout: seconds to wait, or ``None`` to wait indefinitely.

        Raises:
            queue.Empty: if nothing arrived within *timeout*.
            ConnectionError: if the peer closed the connection.
        """
        readable, _, _ = select.select([self.sock], [], [], timeout)
        if not readable:
            raise Empty
        try:
            # Peek so nothing is stolen from the buffered reader.
            at_eof = not self.sock.recv(1, socket.MSG_PEEK)
        except BlockingIOError:
            return  # Spurious wake-up; the caller simply retries.
        if at_eof:
            raise ConnectionError("QMP socket closed by peer")

    def _wait_for_new_result(self) -> dict[str, str]:
        """Block until a result (or greeting) arrives and return it.

        Events received in the meantime are queued in ``pending_events``.
        """
        assert self.results.empty(), "Results set is not empty, missed results!"
        while self.results.empty():
            if not self._read_message():
                self._wait_for_data(None)
        return self.results.get()

    def read_pending_messages(self) -> None:
        """Read at most one already-received message and queue it.

        Never blocks; returns silently when no complete message is available.

        Raises:
            QMPAPIError: if the message read is an error response.
        """
        self._read_message()

    def wait_for_event(self, timeout: Optional[float] = 10) -> dict[str, Any]:
        """Return the oldest pending event, waiting for one if necessary.

        Args:
            timeout: maximum number of seconds to wait; ``None`` waits
                indefinitely.

        Raises:
            queue.Empty: if no event arrived within *timeout*.
            ConnectionError: if the peer closed the connection.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while self.pending_events.empty():
            if self._read_message():
                continue
            if deadline is None:
                remaining = None
            else:
                remaining = max(0.0, deadline - time.monotonic())
            self._wait_for_data(remaining)

        return self.pending_events.get_nowait()

    def events(self, timeout: int = 10) -> Iterator[dict[str, Any]]:
        """Yield the events that are already queued, oldest first.

        Does not read from the socket; stops as soon as the queue is empty.
        """
        while not self.pending_events.empty():
            yield self.pending_events.get(timeout=timeout)

    def send(self, cmd: str, args: Optional[dict[str, Any]] = None) -> dict[str, str]:
        """Execute *cmd* on the server and return its result message.

        Args:
            cmd: name of the QMP command.
            args: command arguments; omitted from the request when empty.

        Raises:
            QMPAPIError: if the server answers with an error response.
            ConnectionError: if the peer closed the connection.
        """
        self.read_pending_messages()
        assert self.results.empty(), "Results set is not empty, missed results!"
        data: dict[str, Any] = {"execute": cmd}
        if args:
            data["arguments"] = args

        logger.debug("Sending %s to QMP...", data)
        json.dump(data, self.writer)
        self.writer.write("\n")
        self.writer.flush()
        return self._wait_for_new_result()