1import base64
2import io
3import os
4import platform
5import queue
6import re
7import select
8import shlex
9import shutil
10import socket
11import subprocess
12import sys
13import tempfile
14import threading
15import time
16from collections.abc import Callable, Generator
17from contextlib import _GeneratorContextManager, contextmanager, nullcontext
18from pathlib import Path
19from queue import Queue
20from typing import Any
21
22from test_driver.errors import MachineError, RequestedAssertionFailed
23from test_driver.logger import AbstractLogger
24
25from .ocr import perform_ocr_on_screenshot, perform_ocr_variants_on_screenshot
26from .qmp import QMPSession
27
# Translation table for `Machine.send_key`: characters that cannot be handed
# to QEMU's `sendkey` monitor command verbatim are mapped to the key name (or
# hexadecimal key code) that produces them. Characters not listed here are
# passed through unchanged.
CHAR_TO_KEY = {
    # Upper-case letters are the lower-case key pressed together with shift.
    **{letter: f"shift-{letter.lower()}" for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"},
    # Punctuation keys given by hex key code, each followed by its shifted form.
    "-": "0x0C",
    "_": "shift-0x0C",
    "=": "0x0D",
    "+": "shift-0x0D",
    "[": "0x1A",
    "{": "shift-0x1A",
    "]": "0x1B",
    "}": "shift-0x1B",
    ";": "0x27",
    ":": "shift-0x27",
    "'": "0x28",
    '"': "shift-0x28",
    "`": "0x29",
    "~": "shift-0x29",
    "\\": "0x2B",
    "|": "shift-0x2B",
    ",": "0x33",
    "<": "shift-0x33",
    ".": "0x34",
    ">": "shift-0x34",
    "/": "0x35",
    "?": "shift-0x35",
    # Symbols typed as shift + a number-row key code.
    "!": "shift-0x02",
    "@": "shift-0x03",
    "#": "shift-0x04",
    "$": "shift-0x05",
    "%": "shift-0x06",
    "^": "shift-0x07",
    "&": "shift-0x08",
    "*": "shift-0x09",
    "(": "shift-0x0A",
    ")": "shift-0x0B",
    # Whitespace.
    " ": "spc",
    "\n": "ret",
}
90
91
def make_command(args: list) -> str:
    """Render ``args`` as one shell-safe command line.

    Each element is converted with ``str()`` and then shell-quoted with
    ``shlex.quote``, so the result can be handed to a shell without word
    splitting or expansion of the individual arguments.
    """
    quoted_args = [shlex.quote(str(arg)) for arg in args]
    return " ".join(quoted_args)
94
95
def retry(fn: Callable, timeout: int = 900) -> None:
    """Poll ``fn`` roughly once per second until it reports success.

    ``fn`` receives one boolean argument, ``last_try``. It is first called
    with ``False`` up to ``timeout`` times, with a one-second sleep after
    every unsuccessful call. It is then called one final time with ``True``
    so it can log diagnostics before the caller gives up.

    Raises:
        RequestedAssertionFailed: if that final call is also unsuccessful.
    """
    for _attempt in range(timeout):
        if fn(False):
            return
        time.sleep(1)

    if fn(True):
        return
    raise RequestedAssertionFailed(
        f"action timed out after {timeout} tries with one-second pause in-between"
    )
110
111
class StartCommand:
    """The Base Start Command knows how to append the necessary
    runtime qemu options as determined by a particular test driver
    run. Any such start command is expected to happily receive and
    append additional qemu args.

    Subclasses are expected to set ``_cmd`` to the base command line.
    """

    _cmd: str

    def cmd(
        self,
        monitor_socket_path: Path,
        qmp_socket_path: Path,
        shell_socket_path: Path,
        allow_reboot: bool = False,
    ) -> str:
        """Build the complete QEMU command line as a single shell string.

        Args:
            monitor_socket_path: Unix socket path given to ``-monitor``.
            qmp_socket_path: Unix socket path on which QEMU serves QMP
                (``server=on,wait=off``).
            shell_socket_path: Unix socket path of the ``shell`` chardev,
                which is exposed to the guest through a virtconsole device.
            allow_reboot: when False (the default) ``-no-reboot`` is added.

        Returns:
            ``self._cmd`` followed by the driver's QEMU options. ``-nographic``
            is appended when no display appears to be available (no
            ``DISPLAY``/``WAYLAND_DISPLAY``; on macOS, no ``TERM_PROGRAM``).
        """
        display_opts = ""

        display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"])
        if platform.system() == "Darwin":
            # We have no DISPLAY variables on macOS and seemingly no better way
            # to find out
            display_available = "TERM_PROGRAM" in os.environ

        if not display_available:
            display_opts += " -nographic"

        # qemu options
        qemu_opts = (
            " -device virtio-serial"
            # Note: virtconsole will map to /dev/hvc0 in Linux guests
            " -device virtconsole,chardev=shell"
            " -device virtio-rng-pci"
            " -serial stdio"
        )
        if not allow_reboot:
            qemu_opts += " -no-reboot"

        # run() executes this string with shell=True, so every argument that
        # embeds a path is shell-quoted. shlex.quote leaves strings made only
        # of safe characters untouched, so ordinary paths yield the same
        # command line as before.
        qmp_arg = shlex.quote(f"unix:{qmp_socket_path},server=on,wait=off")
        monitor_arg = shlex.quote(f"unix:{monitor_socket_path}")
        shell_chardev_arg = shlex.quote(f"socket,id=shell,path={shell_socket_path}")

        return (
            f"{self._cmd}"
            f" -qmp {qmp_arg}"
            f" -monitor {monitor_arg}"
            f" -chardev {shell_chardev_arg}"
            f"{qemu_opts}"
            f"{display_opts}"
        )

    @staticmethod
    def build_environment(
        state_dir: Path,
        shared_dir: Path,
    ) -> dict:
        """Return a copy of the current environment with the VM's
        ``TMPDIR``, ``SHARED_DIR`` and ``USE_TMPDIR=1`` set."""
        # We make a copy to not update the current environment
        env = dict(os.environ)
        env.update(
            {
                "TMPDIR": str(state_dir),
                "SHARED_DIR": str(shared_dir),
                "USE_TMPDIR": "1",
            }
        )
        return env

    def run(
        self,
        state_dir: Path,
        shared_dir: Path,
        monitor_socket_path: Path,
        qmp_socket_path: Path,
        shell_socket_path: Path,
        allow_reboot: bool,
    ) -> subprocess.Popen:
        """Launch the command from ``cmd()`` through a shell.

        The process runs in ``state_dir``, with the environment from
        ``build_environment`` and with piped stdin and stdout.
        """
        return subprocess.Popen(
            self.cmd(
                monitor_socket_path, qmp_socket_path, shell_socket_path, allow_reboot
            ),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            shell=True,
            cwd=state_dir,
            env=self.build_environment(state_dir, shared_dir),
        )
194
195
class NixStartScript(StartCommand):
    """A start script from nixos/modules/virtualisation/qemu-vm.nix
    that also satisfies the requirement of the BaseStartCommand.
    These Nix commands have the particular characteristic that the
    machine name can be extracted out of them via a regex match.
    (Admittedly a _very_ implicit contract, evtl. TODO fix)
    """

    def __init__(self, script: str):
        self._cmd = script

    @property
    def machine_name(self) -> str:
        """Machine name encoded in the script name ``run-<name>-vm``.

        Only the last path component is considered. Otherwise a ``run-``
        occurring earlier in the path (e.g. in a store directory name)
        would pull directory components into the name. Returns "machine"
        if the command does not end in ``run-<name>-vm``.
        """
        match = re.search(r"run-([^/]+)-vm$", self._cmd)
        return match.group(1) if match else "machine"
214
215
class Machine:
    """A handle to the machine with this name, that also knows how to manage
    the machine lifecycle with the help of a start script / command."""

    # Identity plus host-side directories and socket paths (set in __init__).
    name: str
    out_dir: Path
    tmp_dir: Path
    shared_dir: Path
    state_dir: Path
    monitor_path: Path
    qmp_path: Path
    shell_path: Path

    start_command: StartCommand
    keep_vm_state: bool

    # Runtime handles. __init__ sets all of them to None; start() assigns
    # `process` and `monitor`. The others are presumably also assigned while
    # starting/connecting — confirm in start().
    process: subprocess.Popen | None
    pid: int | None
    monitor: socket.socket | None
    qmp_client: QMPSession | None
    shell: socket.socket | None
    serial_thread: threading.Thread | None

    # Lifecycle flags; is_up() requires both to be True.
    booted: bool
    connected: bool
    # Store last serial console lines for use
    # of wait_for_console_text
    # NOTE(review): this default is one Queue object created at class
    # definition time, so it is shared by every instance that does not
    # assign its own `last_lines`.
    last_lines: Queue = Queue()
    # Store all console output for full log retrieval
    full_console_log: list[str]
    callbacks: list[Callable]
247
248 def __repr__(self) -> str:
249 return f"<Machine '{self.name}'>"
250
251 def __init__(
252 self,
253 out_dir: Path,
254 tmp_dir: Path,
255 start_command: StartCommand,
256 logger: AbstractLogger,
257 name: str = "machine",
258 keep_vm_state: bool = False,
259 callbacks: list[Callable] | None = None,
260 ) -> None:
261 self.out_dir = out_dir
262 self.tmp_dir = tmp_dir
263 self.keep_vm_state = keep_vm_state
264 self.name = name
265 self.start_command = start_command
266 self.callbacks = callbacks if callbacks is not None else []
267 self.logger = logger
268 self.full_console_log = []
269
270 # set up directories
271 self.shared_dir = self.tmp_dir / "shared-xchg"
272 self.shared_dir.mkdir(mode=0o700, exist_ok=True)
273
274 self.state_dir = self.tmp_dir / f"vm-state-{self.name}"
275 self.monitor_path = self.state_dir / "monitor"
276 self.qmp_path = self.state_dir / "qmp"
277 self.shell_path = self.state_dir / "shell"
278 if (not self.keep_vm_state) and self.state_dir.exists():
279 self.cleanup_statedir()
280 self.state_dir.mkdir(mode=0o700, exist_ok=True)
281
282 self.process = None
283 self.pid = None
284 self.monitor = None
285 self.qmp_client = None
286 self.shell = None
287 self.serial_thread = None
288
289 self.booted = False
290 self.connected = False
291
292 def is_up(self) -> bool:
293 return self.booted and self.connected
294
295 def log(self, msg: str) -> None:
296 self.logger.log(msg, {"machine": self.name})
297
298 def log_serial(self, msg: str) -> None:
299 self.logger.log_serial(msg, self.name)
300
301 def nested(self, msg: str, attrs: dict[str, str] = {}) -> _GeneratorContextManager:
302 my_attrs = {"machine": self.name}
303 my_attrs.update(attrs)
304 return self.logger.nested(msg, my_attrs)
305
306 def wait_for_monitor_prompt(self) -> str:
307 assert self.monitor is not None
308 answer = ""
309 while True:
310 undecoded_answer = self.monitor.recv(1024)
311 if not undecoded_answer:
312 break
313 answer += undecoded_answer.decode()
314 if answer.endswith("(qemu) "):
315 break
316 return answer
317
318 def send_monitor_command(self, command: str) -> str:
319 """
320 Send a command to the QEMU monitor. This allows attaching
321 virtual USB disks to a running machine, among other things.
322 """
323 self.run_callbacks()
324 message = f"{command}\n".encode()
325 assert self.monitor is not None
326 self.monitor.send(message)
327 return self.wait_for_monitor_prompt()
328
329 def wait_for_unit(
330 self, unit: str, user: str | None = None, timeout: int = 900
331 ) -> None:
332 """
333 Wait for a systemd unit to get into "active" state.
334 Throws exceptions on "failed" and "inactive" states as well as after
335 timing out.
336 """
337
338 def check_active(_last_try: bool) -> bool:
339 state = self.get_unit_property(unit, "ActiveState", user)
340 if state == "failed":
341 raise RequestedAssertionFailed(f'unit "{unit}" reached state "{state}"')
342
343 if state == "inactive":
344 status, jobs = self.systemctl("list-jobs --full 2>&1", user)
345 if "No jobs" in jobs:
346 info = self.get_unit_info(unit, user)
347 if info["ActiveState"] == state:
348 raise RequestedAssertionFailed(
349 f'unit "{unit}" is inactive and there are no pending jobs'
350 )
351
352 return state == "active"
353
354 with self.nested(
355 f"waiting for unit {unit}"
356 + (f" with user {user}" if user is not None else "")
357 ):
358 retry(check_active, timeout)
359
360 def get_unit_info(self, unit: str, user: str | None = None) -> dict[str, str]:
361 status, lines = self.systemctl(f'--no-pager show "{unit}"', user)
362 if status != 0:
363 raise RequestedAssertionFailed(
364 f'retrieving systemctl info for unit "{unit}"'
365 + ("" if user is None else f' under user "{user}"')
366 + f" failed with exit code {status}"
367 )
368
369 line_pattern = re.compile(r"^([^=]+)=(.*)$")
370
371 def tuple_from_line(line: str) -> tuple[str, str]:
372 match = line_pattern.match(line)
373 assert match is not None
374 return match[1], match[2]
375
376 return dict(
377 tuple_from_line(line)
378 for line in lines.split("\n")
379 if line_pattern.match(line)
380 )
381
382 def get_unit_property(
383 self,
384 unit: str,
385 property: str,
386 user: str | None = None,
387 ) -> str:
388 status, lines = self.systemctl(
389 f'--no-pager show "{unit}" --property="{property}"',
390 user,
391 )
392 if status != 0:
393 raise RequestedAssertionFailed(
394 f'retrieving systemctl property "{property}" for unit "{unit}"'
395 + ("" if user is None else f' under user "{user}"')
396 + f" failed with exit code {status}"
397 )
398
399 invalid_output_message = (
400 f'systemctl show --property "{property}" "{unit}"'
401 f"produced invalid output: {lines}"
402 )
403
404 line_pattern = re.compile(r"^([^=]+)=(.*)$")
405 match = line_pattern.match(lines)
406 assert match is not None, invalid_output_message
407
408 assert match[1] == property, invalid_output_message
409 return match[2]
410
411 def systemctl(self, q: str, user: str | None = None) -> tuple[int, str]:
412 """
413 Runs `systemctl` commands with optional support for
414 `systemctl --user`
415
416 ```py
417 # run `systemctl list-jobs --no-pager`
418 machine.systemctl("list-jobs --no-pager")
419
420 # spawn a shell for `any-user` and run
421 # `systemctl --user list-jobs --no-pager`
422 machine.systemctl("list-jobs --no-pager", "any-user")
423 ```
424 """
425 if user is not None:
426 q = q.replace("'", "\\'")
427 return self.execute(
428 f"su -l {user} --shell /bin/sh -c "
429 "$'XDG_RUNTIME_DIR=/run/user/`id -u` "
430 f"systemctl --user {q}'"
431 )
432 return self.execute(f"systemctl {q}")
433
434 def require_unit_state(self, unit: str, require_state: str = "active") -> None:
435 with self.nested(
436 f"checking if unit '{unit}' has reached state '{require_state}'"
437 ):
438 info = self.get_unit_info(unit)
439 state = info["ActiveState"]
440 if state != require_state:
441 raise RequestedAssertionFailed(
442 f"Expected unit '{unit}' to to be in state "
443 f"'{require_state}' but it is in state '{state}'"
444 )
445
446 def _next_newline_closed_block_from_shell(self) -> str:
447 assert self.shell
448 output_buffer = []
449 while True:
450 # This receives up to 4096 bytes from the socket
451 chunk = self.shell.recv(4096)
452 if not chunk:
453 # Probably a broken pipe, return the output we have
454 break
455
456 decoded = chunk.decode()
457 output_buffer += [decoded]
458 if decoded[-1] == "\n":
459 break
460 return "".join(output_buffer)
461
462 def execute(
463 self,
464 command: str,
465 check_return: bool = True,
466 check_output: bool = True,
467 timeout: int | None = 900,
468 ) -> tuple[int, str]:
469 """
470 Execute a shell command, returning a list `(status, stdout)`.
471
472 Commands are run with `set -euo pipefail` set:
473
474 - If several commands are separated by `;` and one fails, the
475 command as a whole will fail.
476
477 - For pipelines, the last non-zero exit status will be returned
478 (if there is one; otherwise zero will be returned).
479
480 - Dereferencing unset variables fails the command.
481
482 - It will wait for stdout to be closed.
483
484 If the command detaches, it must close stdout, as `execute` will wait
485 for this to consume all output reliably. This can be achieved by
486 redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or
487 a file. Examples of detaching commands are `sleep 365d &`, where the
488 shell forks a new process that can write to stdout and `xclip -i`, where
489 the `xclip` command itself forks without closing stdout.
490
491 Takes an optional parameter `check_return` that defaults to `True`.
492 Setting this parameter to `False` will not check for the return code
493 and return -1 instead. This can be used for commands that shut down
494 the VM and would therefore break the pipe that would be used for
495 retrieving the return code.
496
497 A timeout for the command can be specified (in seconds) using the optional
498 `timeout` parameter, e.g., `execute(cmd, timeout=10)` or
499 `execute(cmd, timeout=None)`. The default is 900 seconds.
500 """
501 self.run_callbacks()
502 self.connect()
503
504 # Always run command with shell opts
505 command = f"set -euo pipefail; {command}"
506
507 timeout_str = ""
508 if timeout is not None:
509 timeout_str = f"timeout {timeout}"
510
511 # While sh is bash on NixOS, this is not the case for every distro.
512 # We explicitly call bash here to allow for the driver to boot other distros as well.
513 out_command = (
514 f"{timeout_str} bash -c {shlex.quote(command)} | (base64 -w 0; echo)\n"
515 )
516
517 assert self.shell
518 self.shell.send(out_command.encode())
519
520 if not check_output:
521 return (-2, "")
522
523 # Get the output
524 output = base64.b64decode(self._next_newline_closed_block_from_shell())
525
526 if not check_return:
527 return (-1, output.decode())
528
529 # Get the return code
530 self.shell.send(b"echo ${PIPESTATUS[0]}\n")
531 rc = int(self._next_newline_closed_block_from_shell().strip())
532
533 return (rc, output.decode(errors="replace"))
534
535 def shell_interact(self, address: str | None = None) -> None:
536 """
537 Allows you to directly interact with the guest shell. This should
538 only be used during test development, not in production tests.
539 Killing the interactive session with `Ctrl-d` or `Ctrl-c` also ends
540 the guest session.
541 """
542 self.connect()
543
544 if address is None:
545 address = "READLINE,prompt=$ "
546 self.log("Terminal is ready (there is no initial prompt):")
547
548 assert self.shell
549 try:
550 subprocess.run(
551 ["socat", address, f"FD:{self.shell.fileno()}"],
552 pass_fds=[self.shell.fileno()],
553 )
554 # allow users to cancel this command without breaking the test
555 except KeyboardInterrupt:
556 pass
557
558 def console_interact(self) -> None:
559 """
560 Allows you to directly interact with QEMU's stdin, by forwarding
561 terminal input to the QEMU process.
562 This is for use with the interactive test driver, not for production
563 tests, which run unattended.
564 Output from QEMU is only read line-wise. `Ctrl-c` kills QEMU and
565 `Ctrl-d` closes console and returns to the test runner.
566 """
567 self.log("Terminal is ready (there is no prompt):")
568
569 assert self.process
570 assert self.process.stdin
571
572 while True:
573 try:
574 char = sys.stdin.buffer.read(1)
575 except KeyboardInterrupt:
576 break
577 if char == b"": # ctrl+d
578 self.log("Closing connection to the console")
579 break
580 self.send_console(char.decode())
581
582 def succeed(self, *commands: str, timeout: int | None = None) -> str:
583 """
584 Execute a shell command, raising an exception if the exit status is
585 not zero, otherwise returning the standard output. Similar to `execute`,
586 except that the timeout is `None` by default. See `execute` for details on
587 command execution.
588 """
589 output = ""
590 for command in commands:
591 with self.nested(f"must succeed: {command}"):
592 (status, out) = self.execute(command, timeout=timeout)
593 if status != 0:
594 self.log(f"output: {out}")
595 raise RequestedAssertionFailed(
596 f"command `{command}` failed (exit code {status})"
597 )
598 output += out
599 return output
600
601 def fail(self, *commands: str, timeout: int | None = None) -> str:
602 """
603 Like `succeed`, but raising an exception if the command returns a zero
604 status.
605 """
606 output = ""
607 for command in commands:
608 with self.nested(f"must fail: {command}"):
609 (status, out) = self.execute(command, timeout=timeout)
610 if status == 0:
611 raise RequestedAssertionFailed(
612 f"command `{command}` unexpectedly succeeded"
613 )
614 output += out
615 return output
616
617 def wait_until_succeeds(self, command: str, timeout: int = 900) -> str:
618 """
619 Repeat a shell command with 1-second intervals until it succeeds.
620 Has a default timeout of 900 seconds which can be modified, e.g.
621 `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on
622 command execution.
623 Throws an exception on timeout.
624 """
625 output = ""
626
627 def check_success(_last_try: bool) -> bool:
628 nonlocal output
629 status, output = self.execute(command, timeout=timeout)
630 return status == 0
631
632 with self.nested(f"waiting for success: {command}"):
633 retry(check_success, timeout)
634 return output
635
636 def wait_until_fails(self, command: str, timeout: int = 900) -> str:
637 """
638 Like `wait_until_succeeds`, but repeating the command until it fails.
639 """
640 output = ""
641
642 def check_failure(_last_try: bool) -> bool:
643 nonlocal output
644 status, output = self.execute(command, timeout=timeout)
645 return status != 0
646
647 with self.nested(f"waiting for failure: {command}"):
648 retry(check_failure, timeout)
649 return output
650
651 def wait_for_shutdown(self) -> None:
652 if not self.booted:
653 return
654
655 with self.nested("waiting for the VM to power off"):
656 sys.stdout.flush()
657 assert self.process
658 self.process.wait()
659
660 self.pid = None
661 self.booted = False
662 self.connected = False
663
664 def wait_for_qmp_event(
665 self, event_filter: Callable[[dict[str, Any]], bool], timeout: int = 60 * 10
666 ) -> dict[str, Any]:
667 """
668 Wait for a QMP event which you can filter with the `event_filter` function.
669 The function takes as an input a dictionary of the event and if it returns True, we return that event,
670 if it does not, we wait for the next event and retry.
671
672 It will skip all events received in the meantime, if you want to keep them,
673 you have to do the bookkeeping yourself and store them somewhere.
674
675 By default, it will wait up to 10 minutes, `timeout` is in seconds.
676 """
677 if self.qmp_client is None:
678 raise RuntimeError("QMP API is not ready yet, is the VM ready?")
679
680 start = time.time()
681 while True:
682 evt = self.qmp_client.wait_for_event(timeout=timeout)
683 if event_filter(evt):
684 return evt
685
686 elapsed = time.time() - start
687 if elapsed >= timeout:
688 raise TimeoutError
689
690 def get_tty_text(self, tty: str) -> str:
691 status, output = self.execute(
692 f"fold -w$(stty -F /dev/tty{tty} size | awk '{{print $2}}') /dev/vcs{tty}"
693 )
694 return output
695
696 def wait_until_tty_matches(self, tty: str, regexp: str, timeout: int = 900) -> None:
697 """Wait until the visible output on the chosen TTY matches regular
698 expression. Throws an exception on timeout.
699 """
700 matcher = re.compile(regexp)
701
702 def tty_matches(last_try: bool) -> bool:
703 text = self.get_tty_text(tty)
704 if last_try:
705 self.log(
706 f"Last chance to match /{regexp}/ on TTY{tty}, "
707 f"which currently contains: {text}"
708 )
709 return len(matcher.findall(text)) > 0
710
711 with self.nested(f"waiting for {regexp} to appear on tty {tty}"):
712 retry(tty_matches, timeout)
713
714 def send_chars(self, chars: str, delay: float | None = 0.01) -> None:
715 r"""
716 Simulate typing a sequence of characters on the virtual keyboard,
717 e.g., `send_chars("foobar\n")` will type the string `foobar`
718 followed by the Enter key.
719 """
720 with self.nested(f"sending keys {repr(chars)}"):
721 for char in chars:
722 self.send_key(char, delay, log=False)
723
724 def wait_for_file(self, filename: str, timeout: int = 900) -> None:
725 """
726 Waits until the file exists in the machine's file system.
727 """
728
729 def check_file(_last_try: bool) -> bool:
730 status, _ = self.execute(f"test -e {filename}")
731 return status == 0
732
733 with self.nested(f"waiting for file '{filename}'"):
734 retry(check_file, timeout)
735
736 def wait_for_open_port(
737 self, port: int, addr: str = "localhost", timeout: int = 900
738 ) -> None:
739 """
740 Wait until a process is listening on the given TCP port and IP address
741 (default `localhost`).
742 """
743
744 def port_is_open(_last_try: bool) -> bool:
745 status, _ = self.execute(f"nc -z {addr} {port}")
746 return status == 0
747
748 with self.nested(f"waiting for TCP port {port} on {addr}"):
749 retry(port_is_open, timeout)
750
751 def wait_for_open_unix_socket(
752 self, addr: str, is_datagram: bool = False, timeout: int = 900
753 ) -> None:
754 """
755 Wait until a process is listening on the given UNIX-domain socket
756 (default to a UNIX-domain stream socket).
757 """
758
759 nc_flags = [
760 "-z",
761 "-uU" if is_datagram else "-U",
762 ]
763
764 def socket_is_open(_last_try: bool) -> bool:
765 status, _ = self.execute(f"nc {' '.join(nc_flags)} {addr}")
766 return status == 0
767
768 with self.nested(
769 f"waiting for UNIX-domain {'datagram' if is_datagram else 'stream'} on '{addr}'"
770 ):
771 retry(socket_is_open, timeout)
772
773 def wait_for_closed_port(
774 self, port: int, addr: str = "localhost", timeout: int = 900
775 ) -> None:
776 """
777 Wait until nobody is listening on the given TCP port and IP address
778 (default `localhost`).
779 """
780
781 def port_is_closed(_last_try: bool) -> bool:
782 status, _ = self.execute(f"nc -z {addr} {port}")
783 return status != 0
784
785 with self.nested(f"waiting for TCP port {port} on {addr} to be closed"):
786 retry(port_is_closed, timeout)
787
788 def start_job(self, jobname: str, user: str | None = None) -> tuple[int, str]:
789 return self.systemctl(f"start {jobname}", user)
790
791 def stop_job(self, jobname: str, user: str | None = None) -> tuple[int, str]:
792 return self.systemctl(f"stop {jobname}", user)
793
794 def wait_for_job(self, jobname: str) -> None:
795 self.wait_for_unit(jobname)
796
797 def connect(self) -> None:
798 def shell_ready(timeout_secs: int) -> bool:
799 """We sent some data from the backdoor service running on the guest
800 to indicate that the backdoor shell is ready.
801 As soon as we read some data from the socket here, we assume that
802 our root shell is operational.
803 """
804 (ready, _, _) = select.select([self.shell], [], [], timeout_secs)
805 return bool(ready)
806
807 if self.connected:
808 return
809
810 with self.nested("waiting for the VM to finish booting"):
811 self.start()
812
813 assert self.shell
814
815 tic = time.time()
816 # TODO: do we want to bail after a set number of attempts?
817 while not shell_ready(timeout_secs=30):
818 self.log("Guest root shell did not produce any data yet...")
819 self.log(
820 " To debug, enter the VM and run 'systemctl status backdoor.service'."
821 )
822
823 while True:
824 chunk = self.shell.recv(1024)
825 # No need to print empty strings, it means we are waiting.
826 if len(chunk) == 0:
827 continue
828 self.log(f"Guest shell says: {chunk!r}")
829 # NOTE: for this to work, nothing must be printed after this line!
830 if b"Spawning backdoor root shell..." in chunk:
831 break
832
833 toc = time.time()
834
835 self.log("connected to guest root shell")
836 self.log(f"(connecting took {toc - tic:.2f} seconds)")
837 self.connected = True
838
839 @contextmanager
840 def _managed_screenshot(self) -> Generator[Path]:
841 """
842 Take a screenshot and yield the screenshot filepath.
843 The file will be deleted when leaving the generator.
844 """
845 with tempfile.TemporaryDirectory() as tmpdir:
846 screenshot_path: Path = Path(tmpdir) / "ppm"
847 self.send_monitor_command(f"screendump {screenshot_path}")
848 yield screenshot_path
849
850 def screenshot(self, filename: str) -> None:
851 """
852 Take a picture of the display of the virtual machine, in PNG format.
853 The screenshot will be available in the derivation output.
854 """
855 if "." not in filename:
856 filename += ".png"
857 if "/" not in filename:
858 filename = os.path.join(self.out_dir, filename)
859
860 with self.nested(
861 f"making screenshot {filename}",
862 {"image": os.path.basename(filename)},
863 ):
864 with self._managed_screenshot() as screenshot_path:
865 ret = subprocess.run(
866 f"pnmtopng '{screenshot_path}' > '{filename}'", shell=True
867 )
868 if ret.returncode != 0:
869 raise MachineError(
870 f"Cannot convert screenshot (pnmtopng returned code {ret.returncode})"
871 )
872
873 def copy_from_host_via_shell(self, source: str, target: str) -> None:
874 """Copy a file from the host into the guest by piping it over the
875 shell into the destination file. Works without host-guest shared folder.
876 Prefer copy_from_host for whenever possible.
877 """
878 with open(source, "rb") as fh:
879 content_b64 = base64.b64encode(fh.read()).decode()
880 self.succeed(
881 f"mkdir -p $(dirname {target})",
882 f"echo -n {content_b64} | base64 -d > {target}",
883 )
884
885 def copy_from_host(self, source: str, target: str) -> None:
886 """
887 Copies a file from host to machine, e.g.,
888 `copy_from_host("myfile", "/etc/my/important/file")`.
889
890 The first argument is the file on the host. Note that the "host" refers
891 to the environment in which the test driver runs, which is typically the
892 Nix build sandbox.
893
894 The second argument is the location of the file on the machine that will
895 be written to.
896
897 The file is copied via the `shared_dir` directory which is shared among
898 all the VMs (using a temporary directory).
899 The access rights bits will mimic the ones from the host file and
900 user:group will be root:root.
901 """
902 host_src = Path(source)
903 vm_target = Path(target)
904 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
905 shared_temp = Path(shared_td)
906 host_intermediate = shared_temp / host_src.name
907 vm_shared_temp = Path("/tmp/shared") / shared_temp.name
908 vm_intermediate = vm_shared_temp / host_src.name
909
910 self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
911 if host_src.is_dir():
912 shutil.copytree(host_src, host_intermediate)
913 else:
914 shutil.copy(host_src, host_intermediate)
915 self.succeed(make_command(["mkdir", "-p", vm_target.parent]))
916 self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target]))
917
918 def copy_from_vm(self, source: str, target_dir: str = "") -> None:
919 """Copy a file from the VM (specified by an in-VM source path) to a path
920 relative to `$out`. The file is copied via the `shared_dir` shared among
921 all the VMs (using a temporary directory).
922 """
923 # Compute the source, target, and intermediate shared file names
924 vm_src = Path(source)
925 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
926 shared_temp = Path(shared_td)
927 vm_shared_temp = Path("/tmp/shared") / shared_temp.name
928 vm_intermediate = vm_shared_temp / vm_src.name
929 intermediate = shared_temp / vm_src.name
930 # Copy the file to the shared directory inside VM
931 self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
932 self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate]))
933 abs_target = self.out_dir / target_dir / vm_src.name
934 abs_target.parent.mkdir(exist_ok=True, parents=True)
935 # Copy the file from the shared directory outside VM
936 if intermediate.is_dir():
937 shutil.copytree(intermediate, abs_target)
938 else:
939 shutil.copy(intermediate, abs_target)
940
941 def dump_tty_contents(self, tty: str) -> None:
942 """Debugging: Dump the contents of the TTY<n>"""
943 self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat")
944
945 def get_screen_text_variants(self) -> list[str]:
946 """
947 Return a list of different interpretations of what is currently
948 visible on the machine's screen using optical character
949 recognition. The number and order of the interpretations is not
950 specified and is subject to change, but if no exception is raised at
951 least one will be returned.
952
953 ::: {.note}
954 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
955 :::
956 """
957 with self._managed_screenshot() as screenshot_path:
958 return perform_ocr_variants_on_screenshot(screenshot_path)
959
960 def get_screen_text(self) -> str:
961 """
962 Return a textual representation of what is currently visible on the
963 machine's screen using optical character recognition.
964
965 ::: {.note}
966 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
967 :::
968 """
969 with self._managed_screenshot() as screenshot_path:
970 return perform_ocr_on_screenshot(screenshot_path)
971
972 def wait_for_text(self, regex: str, timeout: int = 900) -> None:
973 """
974 Wait until the supplied regular expressions matches the textual
975 contents of the screen by using optical character recognition (see
976 `get_screen_text` and `get_screen_text_variants`).
977
978 ::: {.note}
979 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
980 :::
981 """
982
983 def screen_matches(last_try: bool) -> bool:
984 variants = self.get_screen_text_variants()
985 for text in variants:
986 if re.search(regex, text) is not None:
987 return True
988
989 if last_try:
990 self.log(f"Last OCR attempt failed. Text was: {variants}")
991
992 return False
993
994 with self.nested(f"waiting for {regex} to appear on screen"):
995 retry(screen_matches, timeout)
996
997 def wait_for_console_text(self, regex: str, timeout: int | None = None) -> None:
998 """
999 Wait until the supplied regular expressions match a line of the
1000 serial console output.
1001 This method is useful when OCR is not possible or inaccurate.
1002
1003 When this method returns, the console output that includes the match has already become part of get_console_log().
1004 """
1005 # Buffer the console output, this is needed
1006 # to match multiline regexes.
1007 console = io.StringIO()
1008
1009 def console_matches(_last_try: bool) -> bool:
1010 nonlocal console
1011 try:
1012 # This will return as soon as possible and
1013 # sleep 1 second.
1014 console.write(self.last_lines.get(block=False))
1015 except queue.Empty:
1016 pass
1017 console.seek(0)
1018 matches = re.search(regex, console.read())
1019 return matches is not None
1020
1021 with self.nested(f"waiting for {regex} to appear on console"):
1022 if timeout is not None:
1023 retry(console_matches, timeout)
1024 else:
1025 while not console_matches(False):
1026 pass
1027
1028 def get_console_log(self) -> str:
1029 """
1030 Get the full console output from the machine since boot.
1031 Returns all serial console output as a single string.
1032 """
1033 return "\n".join(self.full_console_log)
1034
1035 def send_key(
1036 self, key: str, delay: float | None = 0.01, log: bool | None = True
1037 ) -> None:
1038 """
1039 Simulate pressing keys on the virtual keyboard, e.g.,
1040 `send_key("ctrl-alt-delete")`.
1041
1042 Please also refer to the QEMU documentation for more information on the
1043 input syntax: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys
1044 """
1045 key = CHAR_TO_KEY.get(key, key)
1046 context = self.nested(f"sending key {repr(key)}") if log else nullcontext()
1047 with context:
1048 self.send_monitor_command(f"sendkey {key}")
1049 if delay is not None:
1050 time.sleep(delay)
1051
1052 def send_console(self, chars: str) -> None:
1053 r"""
1054 Send keys to the kernel console. This allows interaction with the systemd
1055 emergency mode, for example. Takes a string that is sent, e.g.,
1056 `send_console("\n\nsystemctl default\n")`.
1057 """
1058 assert self.process
1059 assert self.process.stdin
1060 self.process.stdin.write(chars.encode())
1061 self.process.stdin.flush()
1062
1063 def start(self, allow_reboot: bool = False) -> None:
1064 """
1065 Start the virtual machine. This method is asynchronous --- it does
1066 not wait for the machine to finish booting.
1067 """
1068 if self.booted:
1069 return
1070
1071 self.log("starting vm")
1072
1073 def clear(path: Path) -> Path:
1074 if path.exists():
1075 path.unlink()
1076 return path
1077
1078 def create_socket(path: Path) -> socket.socket:
1079 s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
1080 s.bind(str(path))
1081 s.listen(1)
1082 return s
1083
1084 monitor_socket = create_socket(clear(self.monitor_path))
1085 shell_socket = create_socket(clear(self.shell_path))
1086 self.process = self.start_command.run(
1087 self.state_dir,
1088 self.shared_dir,
1089 self.monitor_path,
1090 self.qmp_path,
1091 self.shell_path,
1092 allow_reboot,
1093 )
1094 self.monitor, _ = monitor_socket.accept()
1095 self.shell, _ = shell_socket.accept()
1096 self.qmp_client = QMPSession.from_path(self.qmp_path)
1097
1098 # Store last serial console lines for use
1099 # of wait_for_console_text
1100 self.last_lines: Queue = Queue()
1101 # Re-initialize (if this is not the first start)
1102 self.full_console_log: list[str] = []
1103
1104 def process_serial_output() -> None:
1105 assert self.process
1106 assert self.process.stdout
1107 for _line in self.process.stdout:
1108 # Ignore undecodable bytes that may occur in boot menus
1109 line = _line.decode(errors="ignore").replace("\r", "").rstrip()
1110 self.full_console_log.append(line)
1111 # Put on queue after adding to full_console_log to guarantee ordering
1112 self.last_lines.put(line)
1113 self.log_serial(line)
1114
1115 self.serial_thread = threading.Thread(target=process_serial_output)
1116 self.serial_thread.start()
1117
1118 self.wait_for_monitor_prompt()
1119
1120 self.pid = self.process.pid
1121 self.booted = True
1122
1123 self.log(f"QEMU running (pid {self.pid})")
1124
1125 def cleanup_statedir(self) -> None:
1126 shutil.rmtree(self.state_dir)
1127 self.logger.log(f"deleting VM state directory {self.state_dir}")
1128 self.logger.log("if you want to keep the VM state, pass --keep-vm-state")
1129
1130 def shutdown(self) -> None:
1131 """
1132 Shut down the machine, waiting for the VM to exit.
1133 """
1134 if not self.booted:
1135 return
1136
1137 assert self.shell
1138 self.shell.send(b"poweroff\n")
1139 self.wait_for_shutdown()
1140
1141 def crash(self) -> None:
1142 """
1143 Simulate a sudden power failure, by telling the VM to exit immediately.
1144 """
1145 if not self.booted:
1146 return
1147
1148 self.log("forced crash")
1149 self.send_monitor_command("quit")
1150 self.wait_for_shutdown()
1151
1152 def reboot(self) -> None:
1153 """Press Ctrl+Alt+Delete in the guest.
1154
1155 Prepares the machine to be reconnected which is useful if the
1156 machine was started with `allow_reboot = True`
1157 """
1158 self.send_key("ctrl-alt-delete")
1159 self.connected = False
1160
1161 def wait_for_x(self, timeout: int = 900) -> None:
1162 """
1163 Wait until it is possible to connect to the X server.
1164 """
1165
1166 def check_x(_last_try: bool) -> bool:
1167 cmd = (
1168 "journalctl -b SYSLOG_IDENTIFIER=systemd | "
1169 + 'grep "Reached target Current graphical"'
1170 )
1171 status, _ = self.execute(cmd)
1172 if status != 0:
1173 return False
1174 status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
1175 return status == 0
1176
1177 with self.nested("waiting for the X11 server"):
1178 retry(check_x, timeout)
1179
1180 def get_window_names(self) -> list[str]:
1181 return self.succeed(
1182 r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'"
1183 ).splitlines()
1184
1185 def wait_for_window(self, regexp: str, timeout: int = 900) -> None:
1186 """
1187 Wait until an X11 window has appeared whose name matches the given
1188 regular expression, e.g., `wait_for_window("Terminal")`.
1189 """
1190 pattern = re.compile(regexp)
1191
1192 def window_is_visible(last_try: bool) -> bool:
1193 names = self.get_window_names()
1194 if last_try:
1195 self.log(
1196 f"Last chance to match {regexp} on the window list,"
1197 + " which currently contains: "
1198 + ", ".join(names)
1199 )
1200 return any(pattern.search(name) for name in names)
1201
1202 with self.nested("waiting for a window to appear"):
1203 retry(window_is_visible, timeout)
1204
1205 def sleep(self, secs: int) -> None:
1206 # We want to sleep in *guest* time, not *host* time.
1207 self.succeed(f"sleep {secs}")
1208
1209 def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None:
1210 """
1211 Forward a TCP port on the host to a TCP port on the guest.
1212 Useful during interactive testing.
1213 """
1214 self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}")
1215
1216 def block(self) -> None:
1217 """
1218 Simulate unplugging the Ethernet cable that connects the machine to
1219 the other machines.
1220 This happens by shutting down eth1 (the multicast interface used to talk
1221 to the other VMs). eth0 is kept online to still enable the test driver
1222 to communicate with the machine.
1223 """
1224 self.send_monitor_command("set_link virtio-net-pci.1 off")
1225
1226 def unblock(self) -> None:
1227 """
1228 Undo the effect of `block`.
1229 """
1230 self.send_monitor_command("set_link virtio-net-pci.1 on")
1231
1232 def release(self) -> None:
1233 if self.pid is None:
1234 return
1235 self.logger.info(f"kill machine (pid {self.pid})")
1236 assert self.process
1237 assert self.shell
1238 assert self.monitor
1239 assert self.serial_thread
1240
1241 self.process.terminate()
1242 self.shell.close()
1243 self.monitor.close()
1244 self.serial_thread.join()
1245
1246 if self.qmp_client:
1247 self.qmp_client.close()
1248
1249 def run_callbacks(self) -> None:
1250 for callback in self.callbacks:
1251 callback()
1252
    def switch_root(self) -> None:
        """
        Transition from stage 1 to stage 2. This requires the
        machine to be configured with `testing.initrdBackdoor = true`
        and `boot.initrd.systemd.enable = true`.

        Waits for `initrd.target`, triggers the switch to
        `initrd-switch-root.target` without waiting for it to finish,
        then marks the machine disconnected and calls `connect()`.
        """
        # Only switch once the initrd has reached its target.
        self.wait_for_unit("initrd.target")
        # --no-block: ask systemd to start the switch and return at once.
        # Output is discarded and check_return/check_output are disabled,
        # presumably because the connection to the initrd shell goes away
        # during the switch -- confirm against `execute`.
        self.execute(
            "systemctl isolate --no-block initrd-switch-root.target 2>/dev/null >/dev/null",
            check_return=False,
            check_output=False,
        )
        # Drop the stage-1 connection state and connect again (to stage 2).
        self.connected = False
        self.connect()