# Web-viewer header residue: file shown at revision "24.11-pre", 43 kB (the "view raw" link text was captured with it).
1import base64 2import io 3import os 4import queue 5import re 6import select 7import shlex 8import shutil 9import socket 10import subprocess 11import sys 12import tempfile 13import threading 14import time 15from contextlib import _GeneratorContextManager, nullcontext 16from pathlib import Path 17from queue import Queue 18from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple 19 20from test_driver.logger import AbstractLogger 21 22from .qmp import QMPSession 23 24CHAR_TO_KEY = { 25 "A": "shift-a", 26 "N": "shift-n", 27 "-": "0x0C", 28 "_": "shift-0x0C", 29 "B": "shift-b", 30 "O": "shift-o", 31 "=": "0x0D", 32 "+": "shift-0x0D", 33 "C": "shift-c", 34 "P": "shift-p", 35 "[": "0x1A", 36 "{": "shift-0x1A", 37 "D": "shift-d", 38 "Q": "shift-q", 39 "]": "0x1B", 40 "}": "shift-0x1B", 41 "E": "shift-e", 42 "R": "shift-r", 43 ";": "0x27", 44 ":": "shift-0x27", 45 "F": "shift-f", 46 "S": "shift-s", 47 "'": "0x28", 48 '"': "shift-0x28", 49 "G": "shift-g", 50 "T": "shift-t", 51 "`": "0x29", 52 "~": "shift-0x29", 53 "H": "shift-h", 54 "U": "shift-u", 55 "\\": "0x2B", 56 "|": "shift-0x2B", 57 "I": "shift-i", 58 "V": "shift-v", 59 ",": "0x33", 60 "<": "shift-0x33", 61 "J": "shift-j", 62 "W": "shift-w", 63 ".": "0x34", 64 ">": "shift-0x34", 65 "K": "shift-k", 66 "X": "shift-x", 67 "/": "0x35", 68 "?": "shift-0x35", 69 "L": "shift-l", 70 "Y": "shift-y", 71 " ": "spc", 72 "M": "shift-m", 73 "Z": "shift-z", 74 "\n": "ret", 75 "!": "shift-0x02", 76 "@": "shift-0x03", 77 "#": "shift-0x04", 78 "$": "shift-0x05", 79 "%": "shift-0x06", 80 "^": "shift-0x07", 81 "&": "shift-0x08", 82 "*": "shift-0x09", 83 "(": "shift-0x0A", 84 ")": "shift-0x0B", 85} 86 87 88def make_command(args: list) -> str: 89 return " ".join(map(shlex.quote, (map(str, args)))) 90 91 92def _perform_ocr_on_screenshot( 93 screenshot_path: str, model_ids: Iterable[int] 94) -> List[str]: 95 if shutil.which("tesseract") is None: 96 raise Exception("OCR requested but enableOCR is false") 97 98 magick_args = ( 
99 "-filter Catrom -density 72 -resample 300 " 100 + "-contrast -normalize -despeckle -type grayscale " 101 + "-sharpen 1 -posterize 3 -negate -gamma 100 " 102 + "-blur 1x65535" 103 ) 104 105 tess_args = "-c debug_file=/dev/null --psm 11" 106 107 cmd = f"convert {magick_args} '{screenshot_path}' 'tiff:{screenshot_path}.tiff'" 108 ret = subprocess.run(cmd, shell=True, capture_output=True) 109 if ret.returncode != 0: 110 raise Exception(f"TIFF conversion failed with exit code {ret.returncode}") 111 112 model_results = [] 113 for model_id in model_ids: 114 cmd = f"tesseract '{screenshot_path}.tiff' - {tess_args} --oem '{model_id}'" 115 ret = subprocess.run(cmd, shell=True, capture_output=True) 116 if ret.returncode != 0: 117 raise Exception(f"OCR failed with exit code {ret.returncode}") 118 model_results.append(ret.stdout.decode("utf-8")) 119 120 return model_results 121 122 123def retry(fn: Callable, timeout: int = 900) -> None: 124 """Call the given function repeatedly, with 1 second intervals, 125 until it returns True or a timeout is reached. 126 """ 127 128 for _ in range(timeout): 129 if fn(False): 130 return 131 time.sleep(1) 132 133 if not fn(True): 134 raise Exception(f"action timed out after {timeout} seconds") 135 136 137class StartCommand: 138 """The Base Start Command knows how to append the necessary 139 runtime qemu options as determined by a particular test driver 140 run. Any such start command is expected to happily receive and 141 append additional qemu args. 
142 """ 143 144 _cmd: str 145 146 def cmd( 147 self, 148 monitor_socket_path: Path, 149 qmp_socket_path: Path, 150 shell_socket_path: Path, 151 allow_reboot: bool = False, 152 ) -> str: 153 display_opts = "" 154 display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"]) 155 if not display_available: 156 display_opts += " -nographic" 157 158 # qemu options 159 qemu_opts = ( 160 " -device virtio-serial" 161 # Note: virtconsole will map to /dev/hvc0 in Linux guests 162 " -device virtconsole,chardev=shell" 163 " -device virtio-rng-pci" 164 " -serial stdio" 165 ) 166 if not allow_reboot: 167 qemu_opts += " -no-reboot" 168 169 return ( 170 f"{self._cmd}" 171 f" -qmp unix:{qmp_socket_path},server=on,wait=off" 172 f" -monitor unix:{monitor_socket_path}" 173 f" -chardev socket,id=shell,path={shell_socket_path}" 174 f"{qemu_opts}" 175 f"{display_opts}" 176 ) 177 178 @staticmethod 179 def build_environment( 180 state_dir: Path, 181 shared_dir: Path, 182 ) -> dict: 183 # We make a copy to not update the current environment 184 env = dict(os.environ) 185 env.update( 186 { 187 "TMPDIR": str(state_dir), 188 "SHARED_DIR": str(shared_dir), 189 "USE_TMPDIR": "1", 190 } 191 ) 192 return env 193 194 def run( 195 self, 196 state_dir: Path, 197 shared_dir: Path, 198 monitor_socket_path: Path, 199 qmp_socket_path: Path, 200 shell_socket_path: Path, 201 allow_reboot: bool, 202 ) -> subprocess.Popen: 203 return subprocess.Popen( 204 self.cmd( 205 monitor_socket_path, qmp_socket_path, shell_socket_path, allow_reboot 206 ), 207 stdin=subprocess.PIPE, 208 stdout=subprocess.PIPE, 209 shell=True, 210 cwd=state_dir, 211 env=self.build_environment(state_dir, shared_dir), 212 ) 213 214 215class NixStartScript(StartCommand): 216 """A start script from nixos/modules/virtualiation/qemu-vm.nix 217 that also satisfies the requirement of the BaseStartCommand. 
218 These Nix commands have the particular characteristic that the 219 machine name can be extracted out of them via a regex match. 220 (Admittedly a _very_ implicit contract, evtl. TODO fix) 221 """ 222 223 def __init__(self, script: str): 224 self._cmd = script 225 226 @property 227 def machine_name(self) -> str: 228 match = re.search("run-(.+)-vm$", self._cmd) 229 name = "machine" 230 if match: 231 name = match.group(1) 232 return name 233 234 235class Machine: 236 """A handle to the machine with this name, that also knows how to manage 237 the machine lifecycle with the help of a start script / command.""" 238 239 name: str 240 out_dir: Path 241 tmp_dir: Path 242 shared_dir: Path 243 state_dir: Path 244 monitor_path: Path 245 qmp_path: Path 246 shell_path: Path 247 248 start_command: StartCommand 249 keep_vm_state: bool 250 251 process: Optional[subprocess.Popen] 252 pid: Optional[int] 253 monitor: Optional[socket.socket] 254 qmp_client: Optional[QMPSession] 255 shell: Optional[socket.socket] 256 serial_thread: Optional[threading.Thread] 257 258 booted: bool 259 connected: bool 260 # Store last serial console lines for use 261 # of wait_for_console_text 262 last_lines: Queue = Queue() 263 callbacks: List[Callable] 264 265 def __repr__(self) -> str: 266 return f"<Machine '{self.name}'>" 267 268 def __init__( 269 self, 270 out_dir: Path, 271 tmp_dir: Path, 272 start_command: StartCommand, 273 logger: AbstractLogger, 274 name: str = "machine", 275 keep_vm_state: bool = False, 276 callbacks: Optional[List[Callable]] = None, 277 ) -> None: 278 self.out_dir = out_dir 279 self.tmp_dir = tmp_dir 280 self.keep_vm_state = keep_vm_state 281 self.name = name 282 self.start_command = start_command 283 self.callbacks = callbacks if callbacks is not None else [] 284 self.logger = logger 285 286 # set up directories 287 self.shared_dir = self.tmp_dir / "shared-xchg" 288 self.shared_dir.mkdir(mode=0o700, exist_ok=True) 289 290 self.state_dir = self.tmp_dir / 
f"vm-state-{self.name}" 291 self.monitor_path = self.state_dir / "monitor" 292 self.qmp_path = self.state_dir / "qmp" 293 self.shell_path = self.state_dir / "shell" 294 if (not self.keep_vm_state) and self.state_dir.exists(): 295 self.cleanup_statedir() 296 self.state_dir.mkdir(mode=0o700, exist_ok=True) 297 298 self.process = None 299 self.pid = None 300 self.monitor = None 301 self.qmp_client = None 302 self.shell = None 303 self.serial_thread = None 304 305 self.booted = False 306 self.connected = False 307 308 def is_up(self) -> bool: 309 return self.booted and self.connected 310 311 def log(self, msg: str) -> None: 312 self.logger.log(msg, {"machine": self.name}) 313 314 def log_serial(self, msg: str) -> None: 315 self.logger.log_serial(msg, self.name) 316 317 def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager: 318 my_attrs = {"machine": self.name} 319 my_attrs.update(attrs) 320 return self.logger.nested(msg, my_attrs) 321 322 def wait_for_monitor_prompt(self) -> str: 323 assert self.monitor is not None 324 answer = "" 325 while True: 326 undecoded_answer = self.monitor.recv(1024) 327 if not undecoded_answer: 328 break 329 answer += undecoded_answer.decode() 330 if answer.endswith("(qemu) "): 331 break 332 return answer 333 334 def send_monitor_command(self, command: str) -> str: 335 """ 336 Send a command to the QEMU monitor. This allows attaching 337 virtual USB disks to a running machine, among other things. 338 """ 339 self.run_callbacks() 340 message = f"{command}\n".encode() 341 assert self.monitor is not None 342 self.monitor.send(message) 343 return self.wait_for_monitor_prompt() 344 345 def wait_for_unit( 346 self, unit: str, user: Optional[str] = None, timeout: int = 900 347 ) -> None: 348 """ 349 Wait for a systemd unit to get into "active" state. 350 Throws exceptions on "failed" and "inactive" states as well as after 351 timing out. 
352 """ 353 354 def check_active(_: Any) -> bool: 355 state = self.get_unit_property(unit, "ActiveState", user) 356 if state == "failed": 357 raise Exception(f'unit "{unit}" reached state "{state}"') 358 359 if state == "inactive": 360 status, jobs = self.systemctl("list-jobs --full 2>&1", user) 361 if "No jobs" in jobs: 362 info = self.get_unit_info(unit, user) 363 if info["ActiveState"] == state: 364 raise Exception( 365 f'unit "{unit}" is inactive and there are no pending jobs' 366 ) 367 368 return state == "active" 369 370 with self.nested( 371 f"waiting for unit {unit}" 372 + (f" with user {user}" if user is not None else "") 373 ): 374 retry(check_active, timeout) 375 376 def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]: 377 status, lines = self.systemctl(f'--no-pager show "{unit}"', user) 378 if status != 0: 379 raise Exception( 380 f'retrieving systemctl info for unit "{unit}"' 381 + ("" if user is None else f' under user "{user}"') 382 + f" failed with exit code {status}" 383 ) 384 385 line_pattern = re.compile(r"^([^=]+)=(.*)$") 386 387 def tuple_from_line(line: str) -> Tuple[str, str]: 388 match = line_pattern.match(line) 389 assert match is not None 390 return match[1], match[2] 391 392 return dict( 393 tuple_from_line(line) 394 for line in lines.split("\n") 395 if line_pattern.match(line) 396 ) 397 398 def get_unit_property( 399 self, 400 unit: str, 401 property: str, 402 user: Optional[str] = None, 403 ) -> str: 404 status, lines = self.systemctl( 405 f'--no-pager show "{unit}" --property="{property}"', 406 user, 407 ) 408 if status != 0: 409 raise Exception( 410 f'retrieving systemctl property "{property}" for unit "{unit}"' 411 + ("" if user is None else f' under user "{user}"') 412 + f" failed with exit code {status}" 413 ) 414 415 invalid_output_message = ( 416 f'systemctl show --property "{property}" "{unit}"' 417 f"produced invalid output: {lines}" 418 ) 419 420 line_pattern = re.compile(r"^([^=]+)=(.*)$") 421 
match = line_pattern.match(lines) 422 assert match is not None, invalid_output_message 423 424 assert match[1] == property, invalid_output_message 425 return match[2] 426 427 def systemctl(self, q: str, user: Optional[str] = None) -> Tuple[int, str]: 428 """ 429 Runs `systemctl` commands with optional support for 430 `systemctl --user` 431 432 ```py 433 # run `systemctl list-jobs --no-pager` 434 machine.systemctl("list-jobs --no-pager") 435 436 # spawn a shell for `any-user` and run 437 # `systemctl --user list-jobs --no-pager` 438 machine.systemctl("list-jobs --no-pager", "any-user") 439 ``` 440 """ 441 if user is not None: 442 q = q.replace("'", "\\'") 443 return self.execute( 444 f"su -l {user} --shell /bin/sh -c " 445 "$'XDG_RUNTIME_DIR=/run/user/`id -u` " 446 f"systemctl --user {q}'" 447 ) 448 return self.execute(f"systemctl {q}") 449 450 def require_unit_state(self, unit: str, require_state: str = "active") -> None: 451 with self.nested( 452 f"checking if unit '{unit}' has reached state '{require_state}'" 453 ): 454 info = self.get_unit_info(unit) 455 state = info["ActiveState"] 456 if state != require_state: 457 raise Exception( 458 f"Expected unit '{unit}' to to be in state " 459 f"'{require_state}' but it is in state '{state}'" 460 ) 461 462 def _next_newline_closed_block_from_shell(self) -> str: 463 assert self.shell 464 output_buffer = [] 465 while True: 466 # This receives up to 4096 bytes from the socket 467 chunk = self.shell.recv(4096) 468 if not chunk: 469 # Probably a broken pipe, return the output we have 470 break 471 472 decoded = chunk.decode() 473 output_buffer += [decoded] 474 if decoded[-1] == "\n": 475 break 476 return "".join(output_buffer) 477 478 def execute( 479 self, 480 command: str, 481 check_return: bool = True, 482 check_output: bool = True, 483 timeout: Optional[int] = 900, 484 ) -> Tuple[int, str]: 485 """ 486 Execute a shell command, returning a list `(status, stdout)`. 
487 488 Commands are run with `set -euo pipefail` set: 489 490 - If several commands are separated by `;` and one fails, the 491 command as a whole will fail. 492 493 - For pipelines, the last non-zero exit status will be returned 494 (if there is one; otherwise zero will be returned). 495 496 - Dereferencing unset variables fails the command. 497 498 - It will wait for stdout to be closed. 499 500 If the command detaches, it must close stdout, as `execute` will wait 501 for this to consume all output reliably. This can be achieved by 502 redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or 503 a file. Examples of detaching commands are `sleep 365d &`, where the 504 shell forks a new process that can write to stdout and `xclip -i`, where 505 the `xclip` command itself forks without closing stdout. 506 507 Takes an optional parameter `check_return` that defaults to `True`. 508 Setting this parameter to `False` will not check for the return code 509 and return -1 instead. This can be used for commands that shut down 510 the VM and would therefore break the pipe that would be used for 511 retrieving the return code. 512 513 A timeout for the command can be specified (in seconds) using the optional 514 `timeout` parameter, e.g., `execute(cmd, timeout=10)` or 515 `execute(cmd, timeout=None)`. The default is 900 seconds. 516 """ 517 self.run_callbacks() 518 self.connect() 519 520 # Always run command with shell opts 521 command = f"set -euo pipefail; {command}" 522 523 timeout_str = "" 524 if timeout is not None: 525 timeout_str = f"timeout {timeout}" 526 527 # While sh is bash on NixOS, this is not the case for every distro. 528 # We explicitly call bash here to allow for the driver to boot other distros as well. 
529 out_command = ( 530 f"{timeout_str} bash -c {shlex.quote(command)} | (base64 -w 0; echo)\n" 531 ) 532 533 assert self.shell 534 self.shell.send(out_command.encode()) 535 536 if not check_output: 537 return (-2, "") 538 539 # Get the output 540 output = base64.b64decode(self._next_newline_closed_block_from_shell()) 541 542 if not check_return: 543 return (-1, output.decode()) 544 545 # Get the return code 546 self.shell.send(b"echo ${PIPESTATUS[0]}\n") 547 rc = int(self._next_newline_closed_block_from_shell().strip()) 548 549 return (rc, output.decode(errors="replace")) 550 551 def shell_interact(self, address: Optional[str] = None) -> None: 552 """ 553 Allows you to directly interact with the guest shell. This should 554 only be used during test development, not in production tests. 555 Killing the interactive session with `Ctrl-d` or `Ctrl-c` also ends 556 the guest session. 557 """ 558 self.connect() 559 560 if address is None: 561 address = "READLINE,prompt=$ " 562 self.log("Terminal is ready (there is no initial prompt):") 563 564 assert self.shell 565 try: 566 subprocess.run( 567 ["socat", address, f"FD:{self.shell.fileno()}"], 568 pass_fds=[self.shell.fileno()], 569 ) 570 # allow users to cancel this command without breaking the test 571 except KeyboardInterrupt: 572 pass 573 574 def console_interact(self) -> None: 575 """ 576 Allows you to directly interact with QEMU's stdin, by forwarding 577 terminal input to the QEMU process. 578 This is for use with the interactive test driver, not for production 579 tests, which run unattended. 580 Output from QEMU is only read line-wise. `Ctrl-c` kills QEMU and 581 `Ctrl-d` closes console and returns to the test runner. 
582 """ 583 self.log("Terminal is ready (there is no prompt):") 584 585 assert self.process 586 assert self.process.stdin 587 588 while True: 589 try: 590 char = sys.stdin.buffer.read(1) 591 except KeyboardInterrupt: 592 break 593 if char == b"": # ctrl+d 594 self.log("Closing connection to the console") 595 break 596 self.send_console(char.decode()) 597 598 def succeed(self, *commands: str, timeout: Optional[int] = None) -> str: 599 """ 600 Execute a shell command, raising an exception if the exit status is 601 not zero, otherwise returning the standard output. Similar to `execute`, 602 except that the timeout is `None` by default. See `execute` for details on 603 command execution. 604 """ 605 output = "" 606 for command in commands: 607 with self.nested(f"must succeed: {command}"): 608 (status, out) = self.execute(command, timeout=timeout) 609 if status != 0: 610 self.log(f"output: {out}") 611 raise Exception(f"command `{command}` failed (exit code {status})") 612 output += out 613 return output 614 615 def fail(self, *commands: str, timeout: Optional[int] = None) -> str: 616 """ 617 Like `succeed`, but raising an exception if the command returns a zero 618 status. 619 """ 620 output = "" 621 for command in commands: 622 with self.nested(f"must fail: {command}"): 623 (status, out) = self.execute(command, timeout=timeout) 624 if status == 0: 625 raise Exception(f"command `{command}` unexpectedly succeeded") 626 output += out 627 return output 628 629 def wait_until_succeeds(self, command: str, timeout: int = 900) -> str: 630 """ 631 Repeat a shell command with 1-second intervals until it succeeds. 632 Has a default timeout of 900 seconds which can be modified, e.g. 633 `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on 634 command execution. 635 Throws an exception on timeout. 
636 """ 637 output = "" 638 639 def check_success(_: Any) -> bool: 640 nonlocal output 641 status, output = self.execute(command, timeout=timeout) 642 return status == 0 643 644 with self.nested(f"waiting for success: {command}"): 645 retry(check_success, timeout) 646 return output 647 648 def wait_until_fails(self, command: str, timeout: int = 900) -> str: 649 """ 650 Like `wait_until_succeeds`, but repeating the command until it fails. 651 """ 652 output = "" 653 654 def check_failure(_: Any) -> bool: 655 nonlocal output 656 status, output = self.execute(command, timeout=timeout) 657 return status != 0 658 659 with self.nested(f"waiting for failure: {command}"): 660 retry(check_failure, timeout) 661 return output 662 663 def wait_for_shutdown(self) -> None: 664 if not self.booted: 665 return 666 667 with self.nested("waiting for the VM to power off"): 668 sys.stdout.flush() 669 assert self.process 670 self.process.wait() 671 672 self.pid = None 673 self.booted = False 674 self.connected = False 675 676 def wait_for_qmp_event( 677 self, event_filter: Callable[[dict[str, Any]], bool], timeout: int = 60 * 10 678 ) -> dict[str, Any]: 679 """ 680 Wait for a QMP event which you can filter with the `event_filter` function. 681 The function takes as an input a dictionary of the event and if it returns True, we return that event, 682 if it does not, we wait for the next event and retry. 683 684 It will skip all events received in the meantime, if you want to keep them, 685 you have to do the bookkeeping yourself and store them somewhere. 686 687 By default, it will wait up to 10 minutes, `timeout` is in seconds. 
688 """ 689 if self.qmp_client is None: 690 raise RuntimeError("QMP API is not ready yet, is the VM ready?") 691 692 start = time.time() 693 while True: 694 evt = self.qmp_client.wait_for_event(timeout=timeout) 695 if event_filter(evt): 696 return evt 697 698 elapsed = time.time() - start 699 if elapsed >= timeout: 700 raise TimeoutError 701 702 def get_tty_text(self, tty: str) -> str: 703 status, output = self.execute( 704 f"fold -w$(stty -F /dev/tty{tty} size | " 705 f"awk '{{print $2}}') /dev/vcs{tty}" 706 ) 707 return output 708 709 def wait_until_tty_matches(self, tty: str, regexp: str, timeout: int = 900) -> None: 710 """Wait until the visible output on the chosen TTY matches regular 711 expression. Throws an exception on timeout. 712 """ 713 matcher = re.compile(regexp) 714 715 def tty_matches(last: bool) -> bool: 716 text = self.get_tty_text(tty) 717 if last: 718 self.log( 719 f"Last chance to match /{regexp}/ on TTY{tty}, " 720 f"which currently contains: {text}" 721 ) 722 return len(matcher.findall(text)) > 0 723 724 with self.nested(f"waiting for {regexp} to appear on tty {tty}"): 725 retry(tty_matches, timeout) 726 727 def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None: 728 """ 729 Simulate typing a sequence of characters on the virtual keyboard, 730 e.g., `send_chars("foobar\n")` will type the string `foobar` 731 followed by the Enter key. 732 """ 733 with self.nested(f"sending keys {repr(chars)}"): 734 for char in chars: 735 self.send_key(char, delay, log=False) 736 737 def wait_for_file(self, filename: str, timeout: int = 900) -> None: 738 """ 739 Waits until the file exists in the machine's file system. 
740 """ 741 742 def check_file(_: Any) -> bool: 743 status, _ = self.execute(f"test -e {filename}") 744 return status == 0 745 746 with self.nested(f"waiting for file '{filename}'"): 747 retry(check_file, timeout) 748 749 def wait_for_open_port( 750 self, port: int, addr: str = "localhost", timeout: int = 900 751 ) -> None: 752 """ 753 Wait until a process is listening on the given TCP port and IP address 754 (default `localhost`). 755 """ 756 757 def port_is_open(_: Any) -> bool: 758 status, _ = self.execute(f"nc -z {addr} {port}") 759 return status == 0 760 761 with self.nested(f"waiting for TCP port {port} on {addr}"): 762 retry(port_is_open, timeout) 763 764 def wait_for_open_unix_socket( 765 self, addr: str, is_datagram: bool = False, timeout: int = 900 766 ) -> None: 767 """ 768 Wait until a process is listening on the given UNIX-domain socket 769 (default to a UNIX-domain stream socket). 770 """ 771 772 nc_flags = [ 773 "-z", 774 "-uU" if is_datagram else "-U", 775 ] 776 777 def socket_is_open(_: Any) -> bool: 778 status, _ = self.execute(f"nc {' '.join(nc_flags)} {addr}") 779 return status == 0 780 781 with self.nested( 782 f"waiting for UNIX-domain {'datagram' if is_datagram else 'stream'} on '{addr}'" 783 ): 784 retry(socket_is_open, timeout) 785 786 def wait_for_closed_port( 787 self, port: int, addr: str = "localhost", timeout: int = 900 788 ) -> None: 789 """ 790 Wait until nobody is listening on the given TCP port and IP address 791 (default `localhost`). 
792 """ 793 794 def port_is_closed(_: Any) -> bool: 795 status, _ = self.execute(f"nc -z {addr} {port}") 796 return status != 0 797 798 with self.nested(f"waiting for TCP port {port} on {addr} to be closed"): 799 retry(port_is_closed, timeout) 800 801 def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]: 802 return self.systemctl(f"start {jobname}", user) 803 804 def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]: 805 return self.systemctl(f"stop {jobname}", user) 806 807 def wait_for_job(self, jobname: str) -> None: 808 self.wait_for_unit(jobname) 809 810 def connect(self) -> None: 811 def shell_ready(timeout_secs: int) -> bool: 812 """We sent some data from the backdoor service running on the guest 813 to indicate that the backdoor shell is ready. 814 As soon as we read some data from the socket here, we assume that 815 our root shell is operational. 816 """ 817 (ready, _, _) = select.select([self.shell], [], [], timeout_secs) 818 return bool(ready) 819 820 if self.connected: 821 return 822 823 with self.nested("waiting for the VM to finish booting"): 824 self.start() 825 826 assert self.shell 827 828 tic = time.time() 829 # TODO: do we want to bail after a set number of attempts? 830 while not shell_ready(timeout_secs=30): 831 self.log("Guest root shell did not produce any data yet...") 832 self.log( 833 " To debug, enter the VM and run 'systemctl status backdoor.service'." 834 ) 835 836 while True: 837 chunk = self.shell.recv(1024) 838 # No need to print empty strings, it means we are waiting. 839 if len(chunk) == 0: 840 continue 841 self.log(f"Guest shell says: {chunk!r}") 842 # NOTE: for this to work, nothing must be printed after this line! 843 if b"Spawning backdoor root shell..." 
in chunk: 844 break 845 846 toc = time.time() 847 848 self.log("connected to guest root shell") 849 self.log(f"(connecting took {toc - tic:.2f} seconds)") 850 self.connected = True 851 852 def screenshot(self, filename: str) -> None: 853 """ 854 Take a picture of the display of the virtual machine, in PNG format. 855 The screenshot will be available in the derivation output. 856 """ 857 if "." not in filename: 858 filename += ".png" 859 if "/" not in filename: 860 filename = os.path.join(self.out_dir, filename) 861 tmp = f"{filename}.ppm" 862 863 with self.nested( 864 f"making screenshot {filename}", 865 {"image": os.path.basename(filename)}, 866 ): 867 self.send_monitor_command(f"screendump {tmp}") 868 ret = subprocess.run(f"pnmtopng '{tmp}' > '{filename}'", shell=True) 869 os.unlink(tmp) 870 if ret.returncode != 0: 871 raise Exception("Cannot convert screenshot") 872 873 def copy_from_host_via_shell(self, source: str, target: str) -> None: 874 """Copy a file from the host into the guest by piping it over the 875 shell into the destination file. Works without host-guest shared folder. 876 Prefer copy_from_host for whenever possible. 877 """ 878 with open(source, "rb") as fh: 879 content_b64 = base64.b64encode(fh.read()).decode() 880 self.succeed( 881 f"mkdir -p $(dirname {target})", 882 f"echo -n {content_b64} | base64 -d > {target}", 883 ) 884 885 def copy_from_host(self, source: str, target: str) -> None: 886 """ 887 Copies a file from host to machine, e.g., 888 `copy_from_host("myfile", "/etc/my/important/file")`. 889 890 The first argument is the file on the host. Note that the "host" refers 891 to the environment in which the test driver runs, which is typically the 892 Nix build sandbox. 893 894 The second argument is the location of the file on the machine that will 895 be written to. 896 897 The file is copied via the `shared_dir` directory which is shared among 898 all the VMs (using a temporary directory). 
899 The access rights bits will mimic the ones from the host file and 900 user:group will be root:root. 901 """ 902 host_src = Path(source) 903 vm_target = Path(target) 904 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: 905 shared_temp = Path(shared_td) 906 host_intermediate = shared_temp / host_src.name 907 vm_shared_temp = Path("/tmp/shared") / shared_temp.name 908 vm_intermediate = vm_shared_temp / host_src.name 909 910 self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) 911 if host_src.is_dir(): 912 shutil.copytree(host_src, host_intermediate) 913 else: 914 shutil.copy(host_src, host_intermediate) 915 self.succeed(make_command(["mkdir", "-p", vm_target.parent])) 916 self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target])) 917 918 def copy_from_vm(self, source: str, target_dir: str = "") -> None: 919 """Copy a file from the VM (specified by an in-VM source path) to a path 920 relative to `$out`. The file is copied via the `shared_dir` shared among 921 all the VMs (using a temporary directory). 
922 """ 923 # Compute the source, target, and intermediate shared file names 924 vm_src = Path(source) 925 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: 926 shared_temp = Path(shared_td) 927 vm_shared_temp = Path("/tmp/shared") / shared_temp.name 928 vm_intermediate = vm_shared_temp / vm_src.name 929 intermediate = shared_temp / vm_src.name 930 # Copy the file to the shared directory inside VM 931 self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) 932 self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate])) 933 abs_target = self.out_dir / target_dir / vm_src.name 934 abs_target.parent.mkdir(exist_ok=True, parents=True) 935 # Copy the file from the shared directory outside VM 936 if intermediate.is_dir(): 937 shutil.copytree(intermediate, abs_target) 938 else: 939 shutil.copy(intermediate, abs_target) 940 941 def dump_tty_contents(self, tty: str) -> None: 942 """Debugging: Dump the contents of the TTY<n>""" 943 self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat") 944 945 def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]: 946 with tempfile.TemporaryDirectory() as tmpdir: 947 screenshot_path = os.path.join(tmpdir, "ppm") 948 self.send_monitor_command(f"screendump {screenshot_path}") 949 return _perform_ocr_on_screenshot(screenshot_path, model_ids) 950 951 def get_screen_text_variants(self) -> List[str]: 952 """ 953 Return a list of different interpretations of what is currently 954 visible on the machine's screen using optical character 955 recognition. The number and order of the interpretations is not 956 specified and is subject to change, but if no exception is raised at 957 least one will be returned. 958 959 ::: {.note} 960 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. 
961 ::: 962 """ 963 return self._get_screen_text_variants([0, 1, 2]) 964 965 def get_screen_text(self) -> str: 966 """ 967 Return a textual representation of what is currently visible on the 968 machine's screen using optical character recognition. 969 970 ::: {.note} 971 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. 972 ::: 973 """ 974 return self._get_screen_text_variants([2])[0] 975 976 def wait_for_text(self, regex: str, timeout: int = 900) -> None: 977 """ 978 Wait until the supplied regular expressions matches the textual 979 contents of the screen by using optical character recognition (see 980 `get_screen_text` and `get_screen_text_variants`). 981 982 ::: {.note} 983 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. 984 ::: 985 """ 986 987 def screen_matches(last: bool) -> bool: 988 variants = self.get_screen_text_variants() 989 for text in variants: 990 if re.search(regex, text) is not None: 991 return True 992 993 if last: 994 self.log(f"Last OCR attempt failed. Text was: {variants}") 995 996 return False 997 998 with self.nested(f"waiting for {regex} to appear on screen"): 999 retry(screen_matches, timeout) 1000 1001 def wait_for_console_text(self, regex: str, timeout: int | None = None) -> None: 1002 """ 1003 Wait until the supplied regular expressions match a line of the 1004 serial console output. 1005 This method is useful when OCR is not possible or inaccurate. 1006 """ 1007 # Buffer the console output, this is needed 1008 # to match multiline regexes. 1009 console = io.StringIO() 1010 1011 def console_matches(_: Any) -> bool: 1012 nonlocal console 1013 try: 1014 # This will return as soon as possible and 1015 # sleep 1 second. 
1016 console.write(self.last_lines.get(block=False)) 1017 except queue.Empty: 1018 pass 1019 console.seek(0) 1020 matches = re.search(regex, console.read()) 1021 return matches is not None 1022 1023 with self.nested(f"waiting for {regex} to appear on console"): 1024 if timeout is not None: 1025 retry(console_matches, timeout) 1026 else: 1027 while not console_matches(False): 1028 pass 1029 1030 def send_key( 1031 self, key: str, delay: Optional[float] = 0.01, log: Optional[bool] = True 1032 ) -> None: 1033 """ 1034 Simulate pressing keys on the virtual keyboard, e.g., 1035 `send_key("ctrl-alt-delete")`. 1036 1037 Please also refer to the QEMU documentation for more information on the 1038 input syntax: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys 1039 """ 1040 key = CHAR_TO_KEY.get(key, key) 1041 context = self.nested(f"sending key {repr(key)}") if log else nullcontext() 1042 with context: 1043 self.send_monitor_command(f"sendkey {key}") 1044 if delay is not None: 1045 time.sleep(delay) 1046 1047 def send_console(self, chars: str) -> None: 1048 r""" 1049 Send keys to the kernel console. This allows interaction with the systemd 1050 emergency mode, for example. Takes a string that is sent, e.g., 1051 `send_console("\n\nsystemctl default\n")`. 1052 """ 1053 assert self.process 1054 assert self.process.stdin 1055 self.process.stdin.write(chars.encode()) 1056 self.process.stdin.flush() 1057 1058 def start(self, allow_reboot: bool = False) -> None: 1059 """ 1060 Start the virtual machine. This method is asynchronous --- it does 1061 not wait for the machine to finish booting. 
1062 """ 1063 if self.booted: 1064 return 1065 1066 self.log("starting vm") 1067 1068 def clear(path: Path) -> Path: 1069 if path.exists(): 1070 path.unlink() 1071 return path 1072 1073 def create_socket(path: Path) -> socket.socket: 1074 s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) 1075 s.bind(str(path)) 1076 s.listen(1) 1077 return s 1078 1079 monitor_socket = create_socket(clear(self.monitor_path)) 1080 shell_socket = create_socket(clear(self.shell_path)) 1081 self.process = self.start_command.run( 1082 self.state_dir, 1083 self.shared_dir, 1084 self.monitor_path, 1085 self.qmp_path, 1086 self.shell_path, 1087 allow_reboot, 1088 ) 1089 self.monitor, _ = monitor_socket.accept() 1090 self.shell, _ = shell_socket.accept() 1091 self.qmp_client = QMPSession.from_path(self.qmp_path) 1092 1093 # Store last serial console lines for use 1094 # of wait_for_console_text 1095 self.last_lines: Queue = Queue() 1096 1097 def process_serial_output() -> None: 1098 assert self.process 1099 assert self.process.stdout 1100 for _line in self.process.stdout: 1101 # Ignore undecodable bytes that may occur in boot menus 1102 line = _line.decode(errors="ignore").replace("\r", "").rstrip() 1103 self.last_lines.put(line) 1104 self.log_serial(line) 1105 1106 self.serial_thread = threading.Thread(target=process_serial_output) 1107 self.serial_thread.start() 1108 1109 self.wait_for_monitor_prompt() 1110 1111 self.pid = self.process.pid 1112 self.booted = True 1113 1114 self.log(f"QEMU running (pid {self.pid})") 1115 1116 def cleanup_statedir(self) -> None: 1117 shutil.rmtree(self.state_dir) 1118 self.logger.log(f"deleting VM state directory {self.state_dir}") 1119 self.logger.log("if you want to keep the VM state, pass --keep-vm-state") 1120 1121 def shutdown(self) -> None: 1122 """ 1123 Shut down the machine, waiting for the VM to exit. 
1124 """ 1125 if not self.booted: 1126 return 1127 1128 assert self.shell 1129 self.shell.send(b"poweroff\n") 1130 self.wait_for_shutdown() 1131 1132 def crash(self) -> None: 1133 """ 1134 Simulate a sudden power failure, by telling the VM to exit immediately. 1135 """ 1136 if not self.booted: 1137 return 1138 1139 self.log("forced crash") 1140 self.send_monitor_command("quit") 1141 self.wait_for_shutdown() 1142 1143 def reboot(self) -> None: 1144 """Press Ctrl+Alt+Delete in the guest. 1145 1146 Prepares the machine to be reconnected which is useful if the 1147 machine was started with `allow_reboot = True` 1148 """ 1149 self.send_key("ctrl-alt-delete") 1150 self.connected = False 1151 1152 def wait_for_x(self, timeout: int = 900) -> None: 1153 """ 1154 Wait until it is possible to connect to the X server. 1155 """ 1156 1157 def check_x(_: Any) -> bool: 1158 cmd = ( 1159 "journalctl -b SYSLOG_IDENTIFIER=systemd | " 1160 + 'grep "Reached target Current graphical"' 1161 ) 1162 status, _ = self.execute(cmd) 1163 if status != 0: 1164 return False 1165 status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]") 1166 return status == 0 1167 1168 with self.nested("waiting for the X11 server"): 1169 retry(check_x, timeout) 1170 1171 def get_window_names(self) -> List[str]: 1172 return self.succeed( 1173 r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'" 1174 ).splitlines() 1175 1176 def wait_for_window(self, regexp: str, timeout: int = 900) -> None: 1177 """ 1178 Wait until an X11 window has appeared whose name matches the given 1179 regular expression, e.g., `wait_for_window("Terminal")`. 
1180 """ 1181 pattern = re.compile(regexp) 1182 1183 def window_is_visible(last_try: bool) -> bool: 1184 names = self.get_window_names() 1185 if last_try: 1186 self.log( 1187 f"Last chance to match {regexp} on the window list," 1188 + " which currently contains: " 1189 + ", ".join(names) 1190 ) 1191 return any(pattern.search(name) for name in names) 1192 1193 with self.nested("waiting for a window to appear"): 1194 retry(window_is_visible, timeout) 1195 1196 def sleep(self, secs: int) -> None: 1197 # We want to sleep in *guest* time, not *host* time. 1198 self.succeed(f"sleep {secs}") 1199 1200 def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None: 1201 """ 1202 Forward a TCP port on the host to a TCP port on the guest. 1203 Useful during interactive testing. 1204 """ 1205 self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}") 1206 1207 def block(self) -> None: 1208 """ 1209 Simulate unplugging the Ethernet cable that connects the machine to 1210 the other machines. 1211 This happens by shutting down eth1 (the multicast interface used to talk 1212 to the other VMs). eth0 is kept online to still enable the test driver 1213 to communicate with the machine. 1214 """ 1215 self.send_monitor_command("set_link virtio-net-pci.1 off") 1216 1217 def unblock(self) -> None: 1218 """ 1219 Undo the effect of `block`. 1220 """ 1221 self.send_monitor_command("set_link virtio-net-pci.1 on") 1222 1223 def release(self) -> None: 1224 if self.pid is None: 1225 return 1226 self.logger.info(f"kill machine (pid {self.pid})") 1227 assert self.process 1228 assert self.shell 1229 assert self.monitor 1230 assert self.serial_thread 1231 1232 self.process.terminate() 1233 self.shell.close() 1234 self.monitor.close() 1235 self.serial_thread.join() 1236 1237 def run_callbacks(self) -> None: 1238 for callback in self.callbacks: 1239 callback() 1240 1241 def switch_root(self) -> None: 1242 """ 1243 Transition from stage 1 to stage 2. 
This requires the 1244 machine to be configured with `testing.initrdBackdoor = true` 1245 and `boot.initrd.systemd.enable = true`. 1246 """ 1247 self.wait_for_unit("initrd.target") 1248 self.execute( 1249 "systemctl isolate --no-block initrd-switch-root.target 2>/dev/null >/dev/null", 1250 check_return=False, 1251 check_output=False, 1252 ) 1253 self.connected = False 1254 self.connect()