1import base64 2import io 3import os 4import platform 5import queue 6import re 7import select 8import shlex 9import shutil 10import socket 11import subprocess 12import sys 13import tempfile 14import threading 15import time 16from collections.abc import Callable, Generator 17from contextlib import _GeneratorContextManager, contextmanager, nullcontext 18from pathlib import Path 19from queue import Queue 20from typing import Any 21 22from test_driver.errors import MachineError, RequestedAssertionFailed 23from test_driver.logger import AbstractLogger 24 25from .ocr import perform_ocr_on_screenshot, perform_ocr_variants_on_screenshot 26from .qmp import QMPSession 27 28CHAR_TO_KEY = { 29 "A": "shift-a", 30 "N": "shift-n", 31 "-": "0x0C", 32 "_": "shift-0x0C", 33 "B": "shift-b", 34 "O": "shift-o", 35 "=": "0x0D", 36 "+": "shift-0x0D", 37 "C": "shift-c", 38 "P": "shift-p", 39 "[": "0x1A", 40 "{": "shift-0x1A", 41 "D": "shift-d", 42 "Q": "shift-q", 43 "]": "0x1B", 44 "}": "shift-0x1B", 45 "E": "shift-e", 46 "R": "shift-r", 47 ";": "0x27", 48 ":": "shift-0x27", 49 "F": "shift-f", 50 "S": "shift-s", 51 "'": "0x28", 52 '"': "shift-0x28", 53 "G": "shift-g", 54 "T": "shift-t", 55 "`": "0x29", 56 "~": "shift-0x29", 57 "H": "shift-h", 58 "U": "shift-u", 59 "\\": "0x2B", 60 "|": "shift-0x2B", 61 "I": "shift-i", 62 "V": "shift-v", 63 ",": "0x33", 64 "<": "shift-0x33", 65 "J": "shift-j", 66 "W": "shift-w", 67 ".": "0x34", 68 ">": "shift-0x34", 69 "K": "shift-k", 70 "X": "shift-x", 71 "/": "0x35", 72 "?": "shift-0x35", 73 "L": "shift-l", 74 "Y": "shift-y", 75 " ": "spc", 76 "M": "shift-m", 77 "Z": "shift-z", 78 "\n": "ret", 79 "!": "shift-0x02", 80 "@": "shift-0x03", 81 "#": "shift-0x04", 82 "$": "shift-0x05", 83 "%": "shift-0x06", 84 "^": "shift-0x07", 85 "&": "shift-0x08", 86 "*": "shift-0x09", 87 "(": "shift-0x0A", 88 ")": "shift-0x0B", 89} 90 91 92def make_command(args: list) -> str: 93 return " ".join(map(shlex.quote, (map(str, args)))) 94 95 96def retry(fn: Callable, timeout: int = 900) -> None: 97 """Call the given function repeatedly, with 1 second intervals, 98 until it returns True or a timeout is reached. 99 """ 100 101 for _ in range(timeout): 102 if fn(False): 103 return 104 time.sleep(1) 105 106 if not fn(True): 107 raise RequestedAssertionFailed( 108 f"action timed out after {timeout} tries with one-second pause in-between" 109 ) 110 111 112class StartCommand: 113 """The Base Start Command knows how to append the necessary 114 runtime qemu options as determined by a particular test driver 115 run. Any such start command is expected to happily receive and 116 append additional qemu args. 117 """ 118 119 _cmd: str 120 121 def cmd( 122 self, 123 monitor_socket_path: Path, 124 qmp_socket_path: Path, 125 shell_socket_path: Path, 126 allow_reboot: bool = False, 127 ) -> str: 128 display_opts = "" 129 130 display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"]) 131 if platform.system() == "Darwin": 132 # We have no DISPLAY variables on macOS and seemingly no better way 133 # to find out 134 display_available = "TERM_PROGRAM" in os.environ 135 136 if not display_available: 137 display_opts += " -nographic" 138 139 # qemu options 140 qemu_opts = ( 141 " -device virtio-serial" 142 # Note: virtconsole will map to /dev/hvc0 in Linux guests 143 " -device virtconsole,chardev=shell" 144 " -device virtio-rng-pci" 145 " -serial stdio" 146 ) 147 if not allow_reboot: 148 qemu_opts += " -no-reboot" 149 150 return ( 151 f"{self._cmd}" 152 f" -qmp unix:{qmp_socket_path},server=on,wait=off" 153 f" -monitor unix:{monitor_socket_path}" 154 f" -chardev socket,id=shell,path={shell_socket_path}" 155 f"{qemu_opts}" 156 f"{display_opts}" 157 ) 158 159 @staticmethod 160 def build_environment( 161 state_dir: Path, 162 shared_dir: Path, 163 ) -> dict: 164 # We make a copy to not update the current environment 165 env = dict(os.environ) 166 env.update( 167 { 168 "TMPDIR": str(state_dir), 169 "SHARED_DIR": str(shared_dir), 170 "USE_TMPDIR": "1", 171 } 172 ) 173 return env 174 175 def run( 176 self, 177 state_dir: Path, 178 shared_dir: Path, 179 monitor_socket_path: Path, 180 qmp_socket_path: Path, 181 shell_socket_path: Path, 182 allow_reboot: bool, 183 ) -> subprocess.Popen: 184 return subprocess.Popen( 185 self.cmd( 186 monitor_socket_path, qmp_socket_path, shell_socket_path, allow_reboot 187 ), 188 stdin=subprocess.PIPE, 189 stdout=subprocess.PIPE, 190 shell=True, 191 cwd=state_dir, 192 env=self.build_environment(state_dir, shared_dir), 193 ) 194 195 196class NixStartScript(StartCommand): 197 """A start script from nixos/modules/virtualiation/qemu-vm.nix 198 that also satisfies the requirement of the BaseStartCommand. 199 These Nix commands have the particular characteristic that the 200 machine name can be extracted out of them via a regex match. 201 (Admittedly a _very_ implicit contract, evtl. TODO fix) 202 """ 203 204 def __init__(self, script: str): 205 self._cmd = script 206 207 @property 208 def machine_name(self) -> str: 209 match = re.search("run-(.+)-vm$", self._cmd) 210 name = "machine" 211 if match: 212 name = match.group(1) 213 return name 214 215 216class Machine: 217 """A handle to the machine with this name, that also knows how to manage 218 the machine lifecycle with the help of a start script / command.""" 219 220 name: str 221 out_dir: Path 222 tmp_dir: Path 223 shared_dir: Path 224 state_dir: Path 225 monitor_path: Path 226 qmp_path: Path 227 shell_path: Path 228 229 start_command: StartCommand 230 keep_vm_state: bool 231 232 process: subprocess.Popen | None 233 pid: int | None 234 monitor: socket.socket | None 235 qmp_client: QMPSession | None 236 shell: socket.socket | None 237 serial_thread: threading.Thread | None 238 239 booted: bool 240 connected: bool 241 # Store last serial console lines for use 242 # of wait_for_console_text 243 last_lines: Queue = Queue() 244 # Store all console output for full log retrieval 245 full_console_log: list[str] 246 callbacks: list[Callable] 247 248 def __repr__(self) -> str: 249 return f"<Machine '{self.name}'>" 250 251 def __init__( 252 self, 253 out_dir: Path, 254 tmp_dir: Path, 255 start_command: StartCommand, 256 logger: AbstractLogger, 257 name: str = "machine", 258 keep_vm_state: bool = False, 259 callbacks: list[Callable] | None = None, 260 ) -> None: 261 self.out_dir = out_dir 262 self.tmp_dir = tmp_dir 263 self.keep_vm_state = keep_vm_state 264 self.name = name 265 self.start_command = start_command 266 self.callbacks = callbacks if callbacks is not None else [] 267 self.logger = logger 268 self.full_console_log = [] 269 270 # set up directories 271 self.shared_dir = self.tmp_dir / "shared-xchg" 272 self.shared_dir.mkdir(mode=0o700, exist_ok=True) 273 274 self.state_dir = self.tmp_dir / f"vm-state-{self.name}" 275 self.monitor_path = self.state_dir / "monitor" 276 self.qmp_path = self.state_dir / "qmp" 277 self.shell_path = self.state_dir / "shell" 278 if (not self.keep_vm_state) and self.state_dir.exists(): 279 self.cleanup_statedir() 280 self.state_dir.mkdir(mode=0o700, exist_ok=True) 281 282 self.process = None 283 self.pid = None 284 self.monitor = None 285 self.qmp_client = None 286 self.shell = None 287 self.serial_thread = None 288 289 self.booted = False 290 self.connected = False 291 292 def is_up(self) -> bool: 293 return self.booted and self.connected 294 295 def log(self, msg: str) -> None: 296 self.logger.log(msg, {"machine": self.name}) 297 298 def log_serial(self, msg: str) -> None: 299 self.logger.log_serial(msg, self.name) 300 301 def nested(self, msg: str, attrs: dict[str, str] = {}) -> _GeneratorContextManager: 302 my_attrs = {"machine": self.name} 303 my_attrs.update(attrs) 304 return self.logger.nested(msg, my_attrs) 305 306 def wait_for_monitor_prompt(self) -> str: 307 assert self.monitor is not None 308 answer = "" 309 while True: 310 undecoded_answer = self.monitor.recv(1024) 311 if not undecoded_answer: 312 break 313 answer += undecoded_answer.decode() 314 if answer.endswith("(qemu) "): 315 break 316 return answer 317 318 def send_monitor_command(self, command: str) -> str: 319 """ 320 Send a command to the QEMU monitor. This allows attaching 321 virtual USB disks to a running machine, among other things. 322 """ 323 self.run_callbacks() 324 message = f"{command}\n".encode() 325 assert self.monitor is not None 326 self.monitor.send(message) 327 return self.wait_for_monitor_prompt() 328 329 def wait_for_unit( 330 self, unit: str, user: str | None = None, timeout: int = 900 331 ) -> None: 332 """ 333 Wait for a systemd unit to get into "active" state. 334 Throws exceptions on "failed" and "inactive" states as well as after 335 timing out. 336 """ 337 338 def check_active(_last_try: bool) -> bool: 339 state = self.get_unit_property(unit, "ActiveState", user) 340 if state == "failed": 341 raise RequestedAssertionFailed(f'unit "{unit}" reached state "{state}"') 342 343 if state == "inactive": 344 status, jobs = self.systemctl("list-jobs --full 2>&1", user) 345 if "No jobs" in jobs: 346 info = self.get_unit_info(unit, user) 347 if info["ActiveState"] == state: 348 raise RequestedAssertionFailed( 349 f'unit "{unit}" is inactive and there are no pending jobs' 350 ) 351 352 return state == "active" 353 354 with self.nested( 355 f"waiting for unit {unit}" 356 + (f" with user {user}" if user is not None else "") 357 ): 358 retry(check_active, timeout) 359 360 def get_unit_info(self, unit: str, user: str | None = None) -> dict[str, str]: 361 status, lines = self.systemctl(f'--no-pager show "{unit}"', user) 362 if status != 0: 363 raise RequestedAssertionFailed( 364 f'retrieving systemctl info for unit "{unit}"' 365 + ("" if user is None else f' under user "{user}"') 366 + f" failed with exit code {status}" 367 ) 368 369 line_pattern = re.compile(r"^([^=]+)=(.*)$") 370 371 def tuple_from_line(line: str) -> tuple[str, str]: 372 match = line_pattern.match(line) 373 assert match is not None 374 return match[1], match[2] 375 376 return dict( 377 tuple_from_line(line) 378 for line in lines.split("\n") 379 if line_pattern.match(line) 380 ) 381 382 def get_unit_property( 383 self, 384 unit: str, 385 property: str, 386 user: str | None = None, 387 ) -> str: 388 status, lines = self.systemctl( 389 f'--no-pager show "{unit}" --property="{property}"', 390 user, 391 ) 392 if status != 0: 393 raise RequestedAssertionFailed( 394 f'retrieving systemctl property "{property}" for unit "{unit}"' 395 + ("" if user is None else f' under user "{user}"') 396 + f" failed with exit code {status}" 397 ) 398 399 invalid_output_message = ( 400 f'systemctl show --property "{property}" "{unit}"' 401 f"produced invalid output: {lines}" 402 ) 403 404 line_pattern = re.compile(r"^([^=]+)=(.*)$") 405 match = line_pattern.match(lines) 406 assert match is not None, invalid_output_message 407 408 assert match[1] == property, invalid_output_message 409 return match[2] 410 411 def systemctl(self, q: str, user: str | None = None) -> tuple[int, str]: 412 """ 413 Runs `systemctl` commands with optional support for 414 `systemctl --user` 415 416 ```py 417 # run `systemctl list-jobs --no-pager` 418 machine.systemctl("list-jobs --no-pager") 419 420 # spawn a shell for `any-user` and run 421 # `systemctl --user list-jobs --no-pager` 422 machine.systemctl("list-jobs --no-pager", "any-user") 423 ``` 424 """ 425 if user is not None: 426 q = q.replace("'", "\\'") 427 return self.execute( 428 f"su -l {user} --shell /bin/sh -c " 429 "$'XDG_RUNTIME_DIR=/run/user/`id -u` " 430 f"systemctl --user {q}'" 431 ) 432 return self.execute(f"systemctl {q}") 433 434 def require_unit_state(self, unit: str, require_state: str = "active") -> None: 435 with self.nested( 436 f"checking if unit '{unit}' has reached state '{require_state}'" 437 ): 438 info = self.get_unit_info(unit) 439 state = info["ActiveState"] 440 if state != require_state: 441 raise RequestedAssertionFailed( 442 f"Expected unit '{unit}' to to be in state " 443 f"'{require_state}' but it is in state '{state}'" 444 ) 445 446 def _next_newline_closed_block_from_shell(self) -> str: 447 assert self.shell 448 output_buffer = [] 449 while True: 450 # This receives up to 4096 bytes from the socket 451 chunk = self.shell.recv(4096) 452 if not chunk: 453 # Probably a broken pipe, return the output we have 454 break 455 456 decoded = chunk.decode() 457 output_buffer += [decoded] 458 if decoded[-1] == "\n": 459 break 460 return "".join(output_buffer) 461 462 def execute( 463 self, 464 command: str, 465 check_return: bool = True, 466 check_output: bool = True, 467 timeout: int | None = 900, 468 ) -> tuple[int, str]: 469 """ 470 Execute a shell command, returning a list `(status, stdout)`. 471 472 Commands are run with `set -euo pipefail` set: 473 474 - If several commands are separated by `;` and one fails, the 475 command as a whole will fail. 476 477 - For pipelines, the last non-zero exit status will be returned 478 (if there is one; otherwise zero will be returned). 479 480 - Dereferencing unset variables fails the command. 481 482 - It will wait for stdout to be closed. 483 484 If the command detaches, it must close stdout, as `execute` will wait 485 for this to consume all output reliably. This can be achieved by 486 redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or 487 a file. Examples of detaching commands are `sleep 365d &`, where the 488 shell forks a new process that can write to stdout and `xclip -i`, where 489 the `xclip` command itself forks without closing stdout. 490 491 Takes an optional parameter `check_return` that defaults to `True`. 492 Setting this parameter to `False` will not check for the return code 493 and return -1 instead. This can be used for commands that shut down 494 the VM and would therefore break the pipe that would be used for 495 retrieving the return code. 496 497 A timeout for the command can be specified (in seconds) using the optional 498 `timeout` parameter, e.g., `execute(cmd, timeout=10)` or 499 `execute(cmd, timeout=None)`. The default is 900 seconds. 500 """ 501 self.run_callbacks() 502 self.connect() 503 504 # Always run command with shell opts 505 command = f"set -euo pipefail; {command}" 506 507 timeout_str = "" 508 if timeout is not None: 509 timeout_str = f"timeout {timeout}" 510 511 # While sh is bash on NixOS, this is not the case for every distro. 512 # We explicitly call bash here to allow for the driver to boot other distros as well. 513 out_command = ( 514 f"{timeout_str} bash -c {shlex.quote(command)} | (base64 -w 0; echo)\n" 515 ) 516 517 assert self.shell 518 self.shell.send(out_command.encode()) 519 520 if not check_output: 521 return (-2, "") 522 523 # Get the output 524 output = base64.b64decode(self._next_newline_closed_block_from_shell()) 525 526 if not check_return: 527 return (-1, output.decode()) 528 529 # Get the return code 530 self.shell.send(b"echo ${PIPESTATUS[0]}\n") 531 rc = int(self._next_newline_closed_block_from_shell().strip()) 532 533 return (rc, output.decode(errors="replace")) 534 535 def shell_interact(self, address: str | None = None) -> None: 536 """ 537 Allows you to directly interact with the guest shell. This should 538 only be used during test development, not in production tests. 539 Killing the interactive session with `Ctrl-d` or `Ctrl-c` also ends 540 the guest session. 541 """ 542 self.connect() 543 544 if address is None: 545 address = "READLINE,prompt=$ " 546 self.log("Terminal is ready (there is no initial prompt):") 547 548 assert self.shell 549 try: 550 subprocess.run( 551 ["socat", address, f"FD:{self.shell.fileno()}"], 552 pass_fds=[self.shell.fileno()], 553 ) 554 # allow users to cancel this command without breaking the test 555 except KeyboardInterrupt: 556 pass 557 558 def console_interact(self) -> None: 559 """ 560 Allows you to directly interact with QEMU's stdin, by forwarding 561 terminal input to the QEMU process. 562 This is for use with the interactive test driver, not for production 563 tests, which run unattended. 564 Output from QEMU is only read line-wise. `Ctrl-c` kills QEMU and 565 `Ctrl-d` closes console and returns to the test runner. 566 """ 567 self.log("Terminal is ready (there is no prompt):") 568 569 assert self.process 570 assert self.process.stdin 571 572 while True: 573 try: 574 char = sys.stdin.buffer.read(1) 575 except KeyboardInterrupt: 576 break 577 if char == b"": # ctrl+d 578 self.log("Closing connection to the console") 579 break 580 self.send_console(char.decode()) 581 582 def succeed(self, *commands: str, timeout: int | None = None) -> str: 583 """ 584 Execute a shell command, raising an exception if the exit status is 585 not zero, otherwise returning the standard output. Similar to `execute`, 586 except that the timeout is `None` by default. See `execute` for details on 587 command execution. 588 """ 589 output = "" 590 for command in commands: 591 with self.nested(f"must succeed: {command}"): 592 (status, out) = self.execute(command, timeout=timeout) 593 if status != 0: 594 self.log(f"output: {out}") 595 raise RequestedAssertionFailed( 596 f"command `{command}` failed (exit code {status})" 597 ) 598 output += out 599 return output 600 601 def fail(self, *commands: str, timeout: int | None = None) -> str: 602 """ 603 Like `succeed`, but raising an exception if the command returns a zero 604 status. 605 """ 606 output = "" 607 for command in commands: 608 with self.nested(f"must fail: {command}"): 609 (status, out) = self.execute(command, timeout=timeout) 610 if status == 0: 611 raise RequestedAssertionFailed( 612 f"command `{command}` unexpectedly succeeded" 613 ) 614 output += out 615 return output 616 617 def wait_until_succeeds(self, command: str, timeout: int = 900) -> str: 618 """ 619 Repeat a shell command with 1-second intervals until it succeeds. 620 Has a default timeout of 900 seconds which can be modified, e.g. 621 `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on 622 command execution. 623 Throws an exception on timeout. 624 """ 625 output = "" 626 627 def check_success(_last_try: bool) -> bool: 628 nonlocal output 629 status, output = self.execute(command, timeout=timeout) 630 return status == 0 631 632 with self.nested(f"waiting for success: {command}"): 633 retry(check_success, timeout) 634 return output 635 636 def wait_until_fails(self, command: str, timeout: int = 900) -> str: 637 """ 638 Like `wait_until_succeeds`, but repeating the command until it fails. 639 """ 640 output = "" 641 642 def check_failure(_last_try: bool) -> bool: 643 nonlocal output 644 status, output = self.execute(command, timeout=timeout) 645 return status != 0 646 647 with self.nested(f"waiting for failure: {command}"): 648 retry(check_failure, timeout) 649 return output 650 651 def wait_for_shutdown(self) -> None: 652 if not self.booted: 653 return 654 655 with self.nested("waiting for the VM to power off"): 656 sys.stdout.flush() 657 assert self.process 658 self.process.wait() 659 660 self.pid = None 661 self.booted = False 662 self.connected = False 663 664 def wait_for_qmp_event( 665 self, event_filter: Callable[[dict[str, Any]], bool], timeout: int = 60 * 10 666 ) -> dict[str, Any]: 667 """ 668 Wait for a QMP event which you can filter with the `event_filter` function. 669 The function takes as an input a dictionary of the event and if it returns True, we return that event, 670 if it does not, we wait for the next event and retry. 671 672 It will skip all events received in the meantime, if you want to keep them, 673 you have to do the bookkeeping yourself and store them somewhere. 674 675 By default, it will wait up to 10 minutes, `timeout` is in seconds. 676 """ 677 if self.qmp_client is None: 678 raise RuntimeError("QMP API is not ready yet, is the VM ready?") 679 680 start = time.time() 681 while True: 682 evt = self.qmp_client.wait_for_event(timeout=timeout) 683 if event_filter(evt): 684 return evt 685 686 elapsed = time.time() - start 687 if elapsed >= timeout: 688 raise TimeoutError 689 690 def get_tty_text(self, tty: str) -> str: 691 status, output = self.execute( 692 f"fold -w$(stty -F /dev/tty{tty} size | awk '{{print $2}}') /dev/vcs{tty}" 693 ) 694 return output 695 696 def wait_until_tty_matches(self, tty: str, regexp: str, timeout: int = 900) -> None: 697 """Wait until the visible output on the chosen TTY matches regular 698 expression. Throws an exception on timeout. 699 """ 700 matcher = re.compile(regexp) 701 702 def tty_matches(last_try: bool) -> bool: 703 text = self.get_tty_text(tty) 704 if last_try: 705 self.log( 706 f"Last chance to match /{regexp}/ on TTY{tty}, " 707 f"which currently contains: {text}" 708 ) 709 return len(matcher.findall(text)) > 0 710 711 with self.nested(f"waiting for {regexp} to appear on tty {tty}"): 712 retry(tty_matches, timeout) 713 714 def send_chars(self, chars: str, delay: float | None = 0.01) -> None: 715 r""" 716 Simulate typing a sequence of characters on the virtual keyboard, 717 e.g., `send_chars("foobar\n")` will type the string `foobar` 718 followed by the Enter key. 719 """ 720 with self.nested(f"sending keys {repr(chars)}"): 721 for char in chars: 722 self.send_key(char, delay, log=False) 723 724 def wait_for_file(self, filename: str, timeout: int = 900) -> None: 725 """ 726 Waits until the file exists in the machine's file system. 727 """ 728 729 def check_file(_last_try: bool) -> bool: 730 status, _ = self.execute(f"test -e {filename}") 731 return status == 0 732 733 with self.nested(f"waiting for file '{filename}'"): 734 retry(check_file, timeout) 735 736 def wait_for_open_port( 737 self, port: int, addr: str = "localhost", timeout: int = 900 738 ) -> None: 739 """ 740 Wait until a process is listening on the given TCP port and IP address 741 (default `localhost`). 742 """ 743 744 def port_is_open(_last_try: bool) -> bool: 745 status, _ = self.execute(f"nc -z {addr} {port}") 746 return status == 0 747 748 with self.nested(f"waiting for TCP port {port} on {addr}"): 749 retry(port_is_open, timeout) 750 751 def wait_for_open_unix_socket( 752 self, addr: str, is_datagram: bool = False, timeout: int = 900 753 ) -> None: 754 """ 755 Wait until a process is listening on the given UNIX-domain socket 756 (default to a UNIX-domain stream socket). 757 """ 758 759 nc_flags = [ 760 "-z", 761 "-uU" if is_datagram else "-U", 762 ] 763 764 def socket_is_open(_last_try: bool) -> bool: 765 status, _ = self.execute(f"nc {' '.join(nc_flags)} {addr}") 766 return status == 0 767 768 with self.nested( 769 f"waiting for UNIX-domain {'datagram' if is_datagram else 'stream'} on '{addr}'" 770 ): 771 retry(socket_is_open, timeout) 772 773 def wait_for_closed_port( 774 self, port: int, addr: str = "localhost", timeout: int = 900 775 ) -> None: 776 """ 777 Wait until nobody is listening on the given TCP port and IP address 778 (default `localhost`). 779 """ 780 781 def port_is_closed(_last_try: bool) -> bool: 782 status, _ = self.execute(f"nc -z {addr} {port}") 783 return status != 0 784 785 with self.nested(f"waiting for TCP port {port} on {addr} to be closed"): 786 retry(port_is_closed, timeout) 787 788 def start_job(self, jobname: str, user: str | None = None) -> tuple[int, str]: 789 return self.systemctl(f"start {jobname}", user) 790 791 def stop_job(self, jobname: str, user: str | None = None) -> tuple[int, str]: 792 return self.systemctl(f"stop {jobname}", user) 793 794 def wait_for_job(self, jobname: str) -> None: 795 self.wait_for_unit(jobname) 796 797 def connect(self) -> None: 798 def shell_ready(timeout_secs: int) -> bool: 799 """We sent some data from the backdoor service running on the guest 800 to indicate that the backdoor shell is ready. 801 As soon as we read some data from the socket here, we assume that 802 our root shell is operational. 803 """ 804 (ready, _, _) = select.select([self.shell], [], [], timeout_secs) 805 return bool(ready) 806 807 if self.connected: 808 return 809 810 with self.nested("waiting for the VM to finish booting"): 811 self.start() 812 813 assert self.shell 814 815 tic = time.time() 816 # TODO: do we want to bail after a set number of attempts? 817 while not shell_ready(timeout_secs=30): 818 self.log("Guest root shell did not produce any data yet...") 819 self.log( 820 " To debug, enter the VM and run 'systemctl status backdoor.service'." 821 ) 822 823 while True: 824 chunk = self.shell.recv(1024) 825 # No need to print empty strings, it means we are waiting. 826 if len(chunk) == 0: 827 continue 828 self.log(f"Guest shell says: {chunk!r}") 829 # NOTE: for this to work, nothing must be printed after this line! 830 if b"Spawning backdoor root shell..." in chunk: 831 break 832 833 toc = time.time() 834 835 self.log("connected to guest root shell") 836 self.log(f"(connecting took {toc - tic:.2f} seconds)") 837 self.connected = True 838 839 @contextmanager 840 def _managed_screenshot(self) -> Generator[Path]: 841 """ 842 Take a screenshot and yield the screenshot filepath. 843 The file will be deleted when leaving the generator. 844 """ 845 with tempfile.TemporaryDirectory() as tmpdir: 846 screenshot_path: Path = Path(tmpdir) / "ppm" 847 self.send_monitor_command(f"screendump {screenshot_path}") 848 yield screenshot_path 849 850 def screenshot(self, filename: str) -> None: 851 """ 852 Take a picture of the display of the virtual machine, in PNG format. 853 The screenshot will be available in the derivation output. 854 """ 855 if "." not in filename: 856 filename += ".png" 857 if "/" not in filename: 858 filename = os.path.join(self.out_dir, filename) 859 860 with self.nested( 861 f"making screenshot {filename}", 862 {"image": os.path.basename(filename)}, 863 ): 864 with self._managed_screenshot() as screenshot_path: 865 ret = subprocess.run( 866 f"pnmtopng '{screenshot_path}' > '{filename}'", shell=True 867 ) 868 if ret.returncode != 0: 869 raise MachineError( 870 f"Cannot convert screenshot (pnmtopng returned code {ret.returncode})" 871 ) 872 873 def copy_from_host_via_shell(self, source: str, target: str) -> None: 874 """Copy a file from the host into the guest by piping it over the 875 shell into the destination file. Works without host-guest shared folder. 876 Prefer copy_from_host for whenever possible. 877 """ 878 with open(source, "rb") as fh: 879 content_b64 = base64.b64encode(fh.read()).decode() 880 self.succeed( 881 f"mkdir -p $(dirname {target})", 882 f"echo -n {content_b64} | base64 -d > {target}", 883 ) 884 885 def copy_from_host(self, source: str, target: str) -> None: 886 """ 887 Copies a file from host to machine, e.g., 888 `copy_from_host("myfile", "/etc/my/important/file")`. 889 890 The first argument is the file on the host. Note that the "host" refers 891 to the environment in which the test driver runs, which is typically the 892 Nix build sandbox. 893 894 The second argument is the location of the file on the machine that will 895 be written to. 896 897 The file is copied via the `shared_dir` directory which is shared among 898 all the VMs (using a temporary directory). 899 The access rights bits will mimic the ones from the host file and 900 user:group will be root:root. 901 """ 902 host_src = Path(source) 903 vm_target = Path(target) 904 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: 905 shared_temp = Path(shared_td) 906 host_intermediate = shared_temp / host_src.name 907 vm_shared_temp = Path("/tmp/shared") / shared_temp.name 908 vm_intermediate = vm_shared_temp / host_src.name 909 910 self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) 911 if host_src.is_dir(): 912 shutil.copytree(host_src, host_intermediate) 913 else: 914 shutil.copy(host_src, host_intermediate) 915 self.succeed(make_command(["mkdir", "-p", vm_target.parent])) 916 self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target])) 917 918 def copy_from_vm(self, source: str, target_dir: str = "") -> None: 919 """Copy a file from the VM (specified by an in-VM source path) to a path 920 relative to `$out`. The file is copied via the `shared_dir` shared among 921 all the VMs (using a temporary directory). 922 """ 923 # Compute the source, target, and intermediate shared file names 924 vm_src = Path(source) 925 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: 926 shared_temp = Path(shared_td) 927 vm_shared_temp = Path("/tmp/shared") / shared_temp.name 928 vm_intermediate = vm_shared_temp / vm_src.name 929 intermediate = shared_temp / vm_src.name 930 # Copy the file to the shared directory inside VM 931 self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) 932 self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate])) 933 abs_target = self.out_dir / target_dir / vm_src.name 934 abs_target.parent.mkdir(exist_ok=True, parents=True) 935 # Copy the file from the shared directory outside VM 936 if intermediate.is_dir(): 937 shutil.copytree(intermediate, abs_target) 938 else: 939 shutil.copy(intermediate, abs_target) 940 941 def dump_tty_contents(self, tty: str) -> None: 942 """Debugging: Dump the contents of the TTY<n>""" 943 self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat") 944 945 def get_screen_text_variants(self) -> list[str]: 946 """ 947 Return a list of different interpretations of what is currently 948 visible on the machine's screen using optical character 949 recognition. The number and order of the interpretations is not 950 specified and is subject to change, but if no exception is raised at 951 least one will be returned. 952 953 ::: {.note} 954 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. 955 ::: 956 """ 957 with self._managed_screenshot() as screenshot_path: 958 return perform_ocr_variants_on_screenshot(screenshot_path) 959 960 def get_screen_text(self) -> str: 961 """ 962 Return a textual representation of what is currently visible on the 963 machine's screen using optical character recognition. 964 965 ::: {.note} 966 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. 967 ::: 968 """ 969 with self._managed_screenshot() as screenshot_path: 970 return perform_ocr_on_screenshot(screenshot_path) 971 972 def wait_for_text(self, regex: str, timeout: int = 900) -> None: 973 """ 974 Wait until the supplied regular expressions matches the textual 975 contents of the screen by using optical character recognition (see 976 `get_screen_text` and `get_screen_text_variants`). 977 978 ::: {.note} 979 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. 980 ::: 981 """ 982 983 def screen_matches(last_try: bool) -> bool: 984 variants = self.get_screen_text_variants() 985 for text in variants: 986 if re.search(regex, text) is not None: 987 return True 988 989 if last_try: 990 self.log(f"Last OCR attempt failed. Text was: {variants}") 991 992 return False 993 994 with self.nested(f"waiting for {regex} to appear on screen"): 995 retry(screen_matches, timeout) 996 997 def wait_for_console_text(self, regex: str, timeout: int | None = None) -> None: 998 """ 999 Wait until the supplied regular expressions match a line of the 1000 serial console output. 1001 This method is useful when OCR is not possible or inaccurate. 1002 1003 When this method returns, the console output that includes the match has already become part of get_console_log(). 1004 """ 1005 # Buffer the console output, this is needed 1006 # to match multiline regexes. 1007 console = io.StringIO() 1008 1009 def console_matches(_last_try: bool) -> bool: 1010 nonlocal console 1011 try: 1012 # This will return as soon as possible and 1013 # sleep 1 second. 1014 console.write(self.last_lines.get(block=False)) 1015 except queue.Empty: 1016 pass 1017 console.seek(0) 1018 matches = re.search(regex, console.read()) 1019 return matches is not None 1020 1021 with self.nested(f"waiting for {regex} to appear on console"): 1022 if timeout is not None: 1023 retry(console_matches, timeout) 1024 else: 1025 while not console_matches(False): 1026 pass 1027 1028 def get_console_log(self) -> str: 1029 """ 1030 Get the full console output from the machine since boot. 1031 Returns all serial console output as a single string. 1032 """ 1033 return "\n".join(self.full_console_log) 1034 1035 def send_key( 1036 self, key: str, delay: float | None = 0.01, log: bool | None = True 1037 ) -> None: 1038 """ 1039 Simulate pressing keys on the virtual keyboard, e.g., 1040 `send_key("ctrl-alt-delete")`. 1041 1042 Please also refer to the QEMU documentation for more information on the 1043 input syntax: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys 1044 """ 1045 key = CHAR_TO_KEY.get(key, key) 1046 context = self.nested(f"sending key {repr(key)}") if log else nullcontext() 1047 with context: 1048 self.send_monitor_command(f"sendkey {key}") 1049 if delay is not None: 1050 time.sleep(delay) 1051 1052 def send_console(self, chars: str) -> None: 1053 r""" 1054 Send keys to the kernel console. This allows interaction with the systemd 1055 emergency mode, for example. Takes a string that is sent, e.g., 1056 `send_console("\n\nsystemctl default\n")`. 1057 """ 1058 assert self.process 1059 assert self.process.stdin 1060 self.process.stdin.write(chars.encode()) 1061 self.process.stdin.flush() 1062 1063 def start(self, allow_reboot: bool = False) -> None: 1064 """ 1065 Start the virtual machine. This method is asynchronous --- it does 1066 not wait for the machine to finish booting. 1067 """ 1068 if self.booted: 1069 return 1070 1071 self.log("starting vm") 1072 1073 def clear(path: Path) -> Path: 1074 if path.exists(): 1075 path.unlink() 1076 return path 1077 1078 def create_socket(path: Path) -> socket.socket: 1079 s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) 1080 s.bind(str(path)) 1081 s.listen(1) 1082 return s 1083 1084 monitor_socket = create_socket(clear(self.monitor_path)) 1085 shell_socket = create_socket(clear(self.shell_path)) 1086 self.process = self.start_command.run( 1087 self.state_dir, 1088 self.shared_dir, 1089 self.monitor_path, 1090 self.qmp_path, 1091 self.shell_path, 1092 allow_reboot, 1093 ) 1094 self.monitor, _ = monitor_socket.accept() 1095 self.shell, _ = shell_socket.accept() 1096 self.qmp_client = QMPSession.from_path(self.qmp_path) 1097 1098 # Store last serial console lines for use 1099 # of wait_for_console_text 1100 self.last_lines: Queue = Queue() 1101 # Re-initialize (if this is not the first start) 1102 self.full_console_log: list[str] = [] 1103 1104 def process_serial_output() -> None: 1105 assert self.process 1106 assert self.process.stdout 1107 for _line in self.process.stdout: 1108 # Ignore undecodable bytes that may occur in boot menus 1109 line = _line.decode(errors="ignore").replace("\r", "").rstrip() 1110 self.full_console_log.append(line) 1111 # Put on queue after adding to full_console_log to guarantee ordering 1112 self.last_lines.put(line) 1113 self.log_serial(line) 1114 1115 self.serial_thread = threading.Thread(target=process_serial_output) 1116 self.serial_thread.start() 1117 1118 self.wait_for_monitor_prompt() 1119 1120 self.pid = self.process.pid 1121 self.booted = True 1122 1123 self.log(f"QEMU running (pid {self.pid})") 1124 1125 def cleanup_statedir(self) -> None: 1126 shutil.rmtree(self.state_dir) 1127 self.logger.log(f"deleting VM state directory {self.state_dir}") 1128 self.logger.log("if you want to keep the VM state, pass --keep-vm-state") 1129 1130 def shutdown(self) -> None: 1131 """ 1132 Shut down the machine, waiting for the VM to exit. 1133 """ 1134 if not self.booted: 1135 return 1136 1137 assert self.shell 1138 self.shell.send(b"poweroff\n") 1139 self.wait_for_shutdown() 1140 1141 def crash(self) -> None: 1142 """ 1143 Simulate a sudden power failure, by telling the VM to exit immediately. 1144 """ 1145 if not self.booted: 1146 return 1147 1148 self.log("forced crash") 1149 self.send_monitor_command("quit") 1150 self.wait_for_shutdown() 1151 1152 def reboot(self) -> None: 1153 """Press Ctrl+Alt+Delete in the guest. 1154 1155 Prepares the machine to be reconnected which is useful if the 1156 machine was started with `allow_reboot = True` 1157 """ 1158 self.send_key("ctrl-alt-delete") 1159 self.connected = False 1160 1161 def wait_for_x(self, timeout: int = 900) -> None: 1162 """ 1163 Wait until it is possible to connect to the X server. 1164 """ 1165 1166 def check_x(_last_try: bool) -> bool: 1167 cmd = ( 1168 "journalctl -b SYSLOG_IDENTIFIER=systemd | " 1169 + 'grep "Reached target Current graphical"' 1170 ) 1171 status, _ = self.execute(cmd) 1172 if status != 0: 1173 return False 1174 status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]") 1175 return status == 0 1176 1177 with self.nested("waiting for the X11 server"): 1178 retry(check_x, timeout) 1179 1180 def get_window_names(self) -> list[str]: 1181 return self.succeed( 1182 r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'" 1183 ).splitlines() 1184 1185 def wait_for_window(self, regexp: str, timeout: int = 900) -> None: 1186 """ 1187 Wait until an X11 window has appeared whose name matches the given 1188 regular expression, e.g., `wait_for_window("Terminal")`. 1189 """ 1190 pattern = re.compile(regexp) 1191 1192 def window_is_visible(last_try: bool) -> bool: 1193 names = self.get_window_names() 1194 if last_try: 1195 self.log( 1196 f"Last chance to match {regexp} on the window list," 1197 + " which currently contains: " 1198 + ", ".join(names) 1199 ) 1200 return any(pattern.search(name) for name in names) 1201 1202 with self.nested("waiting for a window to appear"): 1203 retry(window_is_visible, timeout) 1204 1205 def sleep(self, secs: int) -> None: 1206 # We want to sleep in *guest* time, not *host* time. 1207 self.succeed(f"sleep {secs}") 1208 1209 def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None: 1210 """ 1211 Forward a TCP port on the host to a TCP port on the guest. 1212 Useful during interactive testing. 1213 """ 1214 self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}") 1215 1216 def block(self) -> None: 1217 """ 1218 Simulate unplugging the Ethernet cable that connects the machine to 1219 the other machines. 1220 This happens by shutting down eth1 (the multicast interface used to talk 1221 to the other VMs). eth0 is kept online to still enable the test driver 1222 to communicate with the machine. 1223 """ 1224 self.send_monitor_command("set_link virtio-net-pci.1 off") 1225 1226 def unblock(self) -> None: 1227 """ 1228 Undo the effect of `block`. 1229 """ 1230 self.send_monitor_command("set_link virtio-net-pci.1 on") 1231 1232 def release(self) -> None: 1233 if self.pid is None: 1234 return 1235 self.logger.info(f"kill machine (pid {self.pid})") 1236 assert self.process 1237 assert self.shell 1238 assert self.monitor 1239 assert self.serial_thread 1240 1241 self.process.terminate() 1242 self.shell.close() 1243 self.monitor.close() 1244 self.serial_thread.join() 1245 1246 if self.qmp_client: 1247 self.qmp_client.close() 1248 1249 def run_callbacks(self) -> None: 1250 for callback in self.callbacks: 1251 callback() 1252 1253 def switch_root(self) -> None: 1254 """ 1255 Transition from stage 1 to stage 2. This requires the 1256 machine to be configured with `testing.initrdBackdoor = true` 1257 and `boot.initrd.systemd.enable = true`. 1258 """ 1259 self.wait_for_unit("initrd.target") 1260 self.execute( 1261 "systemctl isolate --no-block initrd-switch-root.target 2>/dev/null >/dev/null", 1262 check_return=False, 1263 check_output=False, 1264 ) 1265 self.connected = False 1266 self.connect()