···
5
+
from collections.abc import Iterator
6
+
from pathlib import Path
7
+
from queue import Queue
8
+
from typing import Any
10
+
logger = logging.getLogger(__name__)
13
+
class QMPAPIError(RuntimeError):
14
+
def __init__(self, message: dict[str, Any]):
15
+
assert "error" in message, "Not an error message!"
17
+
self.class_name = message["class"]
18
+
self.description = message["desc"]
19
+
# NOTE: Some errors can occur before the Server is able to read the
20
+
# id member; in these cases the id member will not be part of the
21
+
# error response, even if provided by the client.
22
+
self.transaction_id = message.get("id")
24
+
raise RuntimeError("Malformed QMP API error response")
26
+
def __str__(self) -> str:
27
+
return f"<QMP API error related to transaction {self.transaction_id} [{self.class_name}]: {self.description}>"
31
+
def __init__(self, sock: socket.socket) -> None:
33
+
self.results: Queue[dict[str, str]] = Queue()
34
+
self.pending_events: Queue[dict[str, Any]] = Queue()
35
+
self.reader = sock.makefile("r")
36
+
self.writer = sock.makefile("w")
37
+
# Make the reader non-blocking so we can kind of select on it.
38
+
os.set_blocking(self.reader.fileno(), False)
39
+
hello = self._wait_for_new_result()
40
+
logger.debug(f"Got greeting from QMP API: {hello}")
41
+
# The greeting message format is:
42
+
# { "QMP": { "version": json-object, "capabilities": json-array } }
43
+
assert "QMP" in hello, f"Unexpected result: {hello}"
44
+
self.send("qmp_capabilities")
47
+
def from_path(cls, path: Path) -> "QMPSession":
48
+
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
49
+
sock.connect(str(path))
52
+
def __del__(self) -> None:
55
+
def _wait_for_new_result(self) -> dict[str, str]:
56
+
assert self.results.empty(), "Results set is not empty, missed results!"
57
+
while self.results.empty():
58
+
self.read_pending_messages()
59
+
return self.results.get()
61
+
def read_pending_messages(self) -> None:
62
+
line = self.reader.readline()
65
+
evt_or_result = json.loads(line)
66
+
logger.debug(f"Received a message: {evt_or_result}")
69
+
if "return" in evt_or_result or "QMP" in evt_or_result:
70
+
self.results.put(evt_or_result)
72
+
elif "event" in evt_or_result:
73
+
self.pending_events.put(evt_or_result)
75
+
raise QMPAPIError(evt_or_result)
77
+
def wait_for_event(self, timeout: int = 10) -> dict[str, Any]:
78
+
while self.pending_events.empty():
79
+
self.read_pending_messages()
81
+
return self.pending_events.get(timeout=timeout)
83
+
def events(self, timeout: int = 10) -> Iterator[dict[str, Any]]:
84
+
while not self.pending_events.empty():
85
+
yield self.pending_events.get(timeout=timeout)
87
+
def send(self, cmd: str, args: dict[str, str] = {}) -> dict[str, str]:
88
+
self.read_pending_messages()
89
+
assert self.results.empty(), "Results set is not empty, missed results!"
90
+
data: dict[str, Any] = dict(execute=cmd)
92
+
data["arguments"] = args
94
+
logger.debug(f"Sending {data} to QMP...")
95
+
json.dump(data, self.writer)
96
+
self.writer.write("\n")
98
+
return self._wait_for_new_result()