at 23.11-beta 45 kB view raw
1import base64 2import io 3import os 4import queue 5import re 6import select 7import shlex 8import shutil 9import socket 10import subprocess 11import sys 12import tempfile 13import threading 14import time 15from contextlib import _GeneratorContextManager, nullcontext 16from pathlib import Path 17from queue import Queue 18from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple 19 20from test_driver.logger import rootlog 21 22from .qmp import QMPSession 23 24CHAR_TO_KEY = { 25 "A": "shift-a", 26 "N": "shift-n", 27 "-": "0x0C", 28 "_": "shift-0x0C", 29 "B": "shift-b", 30 "O": "shift-o", 31 "=": "0x0D", 32 "+": "shift-0x0D", 33 "C": "shift-c", 34 "P": "shift-p", 35 "[": "0x1A", 36 "{": "shift-0x1A", 37 "D": "shift-d", 38 "Q": "shift-q", 39 "]": "0x1B", 40 "}": "shift-0x1B", 41 "E": "shift-e", 42 "R": "shift-r", 43 ";": "0x27", 44 ":": "shift-0x27", 45 "F": "shift-f", 46 "S": "shift-s", 47 "'": "0x28", 48 '"': "shift-0x28", 49 "G": "shift-g", 50 "T": "shift-t", 51 "`": "0x29", 52 "~": "shift-0x29", 53 "H": "shift-h", 54 "U": "shift-u", 55 "\\": "0x2B", 56 "|": "shift-0x2B", 57 "I": "shift-i", 58 "V": "shift-v", 59 ",": "0x33", 60 "<": "shift-0x33", 61 "J": "shift-j", 62 "W": "shift-w", 63 ".": "0x34", 64 ">": "shift-0x34", 65 "K": "shift-k", 66 "X": "shift-x", 67 "/": "0x35", 68 "?": "shift-0x35", 69 "L": "shift-l", 70 "Y": "shift-y", 71 " ": "spc", 72 "M": "shift-m", 73 "Z": "shift-z", 74 "\n": "ret", 75 "!": "shift-0x02", 76 "@": "shift-0x03", 77 "#": "shift-0x04", 78 "$": "shift-0x05", 79 "%": "shift-0x06", 80 "^": "shift-0x07", 81 "&": "shift-0x08", 82 "*": "shift-0x09", 83 "(": "shift-0x0A", 84 ")": "shift-0x0B", 85} 86 87 88def make_command(args: list) -> str: 89 return " ".join(map(shlex.quote, (map(str, args)))) 90 91 92def _perform_ocr_on_screenshot( 93 screenshot_path: str, model_ids: Iterable[int] 94) -> List[str]: 95 if shutil.which("tesseract") is None: 96 raise Exception("OCR requested but enableOCR is false") 97 98 magick_args = ( 99 "-filter Catrom -density 72 -resample 300 " 100 + "-contrast -normalize -despeckle -type grayscale " 101 + "-sharpen 1 -posterize 3 -negate -gamma 100 " 102 + "-blur 1x65535" 103 ) 104 105 tess_args = "-c debug_file=/dev/null --psm 11" 106 107 cmd = f"convert {magick_args} '{screenshot_path}' 'tiff:{screenshot_path}.tiff'" 108 ret = subprocess.run(cmd, shell=True, capture_output=True) 109 if ret.returncode != 0: 110 raise Exception(f"TIFF conversion failed with exit code {ret.returncode}") 111 112 model_results = [] 113 for model_id in model_ids: 114 cmd = f"tesseract '{screenshot_path}.tiff' - {tess_args} --oem '{model_id}'" 115 ret = subprocess.run(cmd, shell=True, capture_output=True) 116 if ret.returncode != 0: 117 raise Exception(f"OCR failed with exit code {ret.returncode}") 118 model_results.append(ret.stdout.decode("utf-8")) 119 120 return model_results 121 122 123def retry(fn: Callable, timeout: int = 900) -> None: 124 """Call the given function repeatedly, with 1 second intervals, 125 until it returns True or a timeout is reached. 126 """ 127 128 for _ in range(timeout): 129 if fn(False): 130 return 131 time.sleep(1) 132 133 if not fn(True): 134 raise Exception(f"action timed out after {timeout} seconds") 135 136 137class StartCommand: 138 """The Base Start Command knows how to append the necessary 139 runtime qemu options as determined by a particular test driver 140 run. Any such start command is expected to happily receive and 141 append additional qemu args. 142 """ 143 144 _cmd: str 145 146 def cmd( 147 self, 148 monitor_socket_path: Path, 149 qmp_socket_path: Path, 150 shell_socket_path: Path, 151 allow_reboot: bool = False, 152 ) -> str: 153 display_opts = "" 154 display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"]) 155 if not display_available: 156 display_opts += " -nographic" 157 158 # qemu options 159 qemu_opts = ( 160 " -device virtio-serial" 161 # Note: virtconsole will map to /dev/hvc0 in Linux guests 162 " -device virtconsole,chardev=shell" 163 " -device virtio-rng-pci" 164 " -serial stdio" 165 ) 166 if not allow_reboot: 167 qemu_opts += " -no-reboot" 168 # TODO: qemu script already catpures this env variable, legacy? 169 qemu_opts += " " + os.environ.get("QEMU_OPTS", "") 170 171 return ( 172 f"{self._cmd}" 173 f" -qmp unix:{qmp_socket_path},server=on,wait=off" 174 f" -monitor unix:{monitor_socket_path}" 175 f" -chardev socket,id=shell,path={shell_socket_path}" 176 f"{qemu_opts}" 177 f"{display_opts}" 178 ) 179 180 @staticmethod 181 def build_environment( 182 state_dir: Path, 183 shared_dir: Path, 184 ) -> dict: 185 # We make a copy to not update the current environment 186 env = dict(os.environ) 187 env.update( 188 { 189 "TMPDIR": str(state_dir), 190 "SHARED_DIR": str(shared_dir), 191 "USE_TMPDIR": "1", 192 } 193 ) 194 return env 195 196 def run( 197 self, 198 state_dir: Path, 199 shared_dir: Path, 200 monitor_socket_path: Path, 201 qmp_socket_path: Path, 202 shell_socket_path: Path, 203 allow_reboot: bool, 204 ) -> subprocess.Popen: 205 return subprocess.Popen( 206 self.cmd( 207 monitor_socket_path, qmp_socket_path, shell_socket_path, allow_reboot 208 ), 209 stdin=subprocess.PIPE, 210 stdout=subprocess.PIPE, 211 stderr=subprocess.STDOUT, 212 shell=True, 213 cwd=state_dir, 214 env=self.build_environment(state_dir, shared_dir), 215 ) 216 217 218class NixStartScript(StartCommand): 219 """A start script from nixos/modules/virtualiation/qemu-vm.nix 220 that also satisfies the requirement of the BaseStartCommand. 221 These Nix commands have the particular characteristic that the 222 machine name can be extracted out of them via a regex match. 223 (Admittedly a _very_ implicit contract, evtl. TODO fix) 224 """ 225 226 def __init__(self, script: str): 227 self._cmd = script 228 229 @property 230 def machine_name(self) -> str: 231 match = re.search("run-(.+)-vm$", self._cmd) 232 name = "machine" 233 if match: 234 name = match.group(1) 235 return name 236 237 238class LegacyStartCommand(StartCommand): 239 """Used in some places to create an ad-hoc machine instead of 240 using nix test instrumentation + module system for that purpose. 241 Legacy. 242 """ 243 244 def __init__( 245 self, 246 netBackendArgs: Optional[str] = None, # noqa: N803 247 netFrontendArgs: Optional[str] = None, # noqa: N803 248 hda: Optional[Tuple[Path, str]] = None, 249 cdrom: Optional[str] = None, 250 usb: Optional[str] = None, 251 bios: Optional[str] = None, 252 qemuBinary: Optional[str] = None, # noqa: N803 253 qemuFlags: Optional[str] = None, # noqa: N803 254 ): 255 if qemuBinary is not None: 256 self._cmd = qemuBinary 257 else: 258 self._cmd = "qemu-kvm" 259 260 self._cmd += " -m 384" 261 262 # networking 263 net_backend = "-netdev user,id=net0" 264 net_frontend = "-device virtio-net-pci,netdev=net0" 265 if netBackendArgs is not None: 266 net_backend += "," + netBackendArgs 267 if netFrontendArgs is not None: 268 net_frontend += "," + netFrontendArgs 269 self._cmd += f" {net_backend} {net_frontend}" 270 271 # hda 272 hda_cmd = "" 273 if hda is not None: 274 hda_path = hda[0].resolve() 275 hda_interface = hda[1] 276 if hda_interface == "scsi": 277 hda_cmd += ( 278 f" -drive id=hda,file={hda_path},werror=report,if=none" 279 " -device scsi-hd,drive=hda" 280 ) 281 else: 282 hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report" 283 self._cmd += hda_cmd 284 285 # cdrom 286 if cdrom is not None: 287 self._cmd += f" -cdrom {cdrom}" 288 289 # usb 290 usb_cmd = "" 291 if usb is not None: 292 # https://github.com/qemu/qemu/blob/master/docs/usb2.txt 293 usb_cmd += ( 294 " -device usb-ehci" 295 f" -drive id=usbdisk,file={usb},if=none,readonly" 296 " -device usb-storage,drive=usbdisk " 297 ) 298 self._cmd += usb_cmd 299 300 # bios 301 if bios is not None: 302 self._cmd += f" -bios {bios}" 303 304 # qemu flags 305 if qemuFlags is not None: 306 self._cmd += f" {qemuFlags}" 307 308 309class Machine: 310 """A handle to the machine with this name, that also knows how to manage 311 the machine lifecycle with the help of a start script / command.""" 312 313 name: str 314 out_dir: Path 315 tmp_dir: Path 316 shared_dir: Path 317 state_dir: Path 318 monitor_path: Path 319 qmp_path: Path 320 shell_path: Path 321 322 start_command: StartCommand 323 keep_vm_state: bool 324 325 process: Optional[subprocess.Popen] 326 pid: Optional[int] 327 monitor: Optional[socket.socket] 328 qmp_client: Optional[QMPSession] 329 shell: Optional[socket.socket] 330 serial_thread: Optional[threading.Thread] 331 332 booted: bool 333 connected: bool 334 # Store last serial console lines for use 335 # of wait_for_console_text 336 last_lines: Queue = Queue() 337 callbacks: List[Callable] 338 339 def __repr__(self) -> str: 340 return f"<Machine '{self.name}'>" 341 342 def __init__( 343 self, 344 out_dir: Path, 345 tmp_dir: Path, 346 start_command: StartCommand, 347 name: str = "machine", 348 keep_vm_state: bool = False, 349 callbacks: Optional[List[Callable]] = None, 350 ) -> None: 351 self.out_dir = out_dir 352 self.tmp_dir = tmp_dir 353 self.keep_vm_state = keep_vm_state 354 self.name = name 355 self.start_command = start_command 356 self.callbacks = callbacks if callbacks is not None else [] 357 358 # set up directories 359 self.shared_dir = self.tmp_dir / "shared-xchg" 360 self.shared_dir.mkdir(mode=0o700, exist_ok=True) 361 362 self.state_dir = self.tmp_dir / f"vm-state-{self.name}" 363 self.monitor_path = self.state_dir / "monitor" 364 self.qmp_path = self.state_dir / "qmp" 365 self.shell_path = self.state_dir / "shell" 366 if (not self.keep_vm_state) and self.state_dir.exists(): 367 self.cleanup_statedir() 368 self.state_dir.mkdir(mode=0o700, exist_ok=True) 369 370 self.process = None 371 self.pid = None 372 self.monitor = None 373 self.qmp_client = None 374 self.shell = None 375 self.serial_thread = None 376 377 self.booted = False 378 self.connected = False 379 380 @staticmethod 381 def create_startcommand(args: Dict[str, str]) -> StartCommand: 382 rootlog.warning( 383 "Using legacy create_startcommand(), " 384 "please use proper nix test vm instrumentation, instead " 385 "to generate the appropriate nixos test vm qemu startup script" 386 ) 387 hda = None 388 if args.get("hda"): 389 hda_arg: str = args.get("hda", "") 390 hda_arg_path: Path = Path(hda_arg) 391 hda = (hda_arg_path, args.get("hdaInterface", "")) 392 return LegacyStartCommand( 393 netBackendArgs=args.get("netBackendArgs"), 394 netFrontendArgs=args.get("netFrontendArgs"), 395 hda=hda, 396 cdrom=args.get("cdrom"), 397 usb=args.get("usb"), 398 bios=args.get("bios"), 399 qemuBinary=args.get("qemuBinary"), 400 qemuFlags=args.get("qemuFlags"), 401 ) 402 403 def is_up(self) -> bool: 404 return self.booted and self.connected 405 406 def log(self, msg: str) -> None: 407 rootlog.log(msg, {"machine": self.name}) 408 409 def log_serial(self, msg: str) -> None: 410 rootlog.log_serial(msg, self.name) 411 412 def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager: 413 my_attrs = {"machine": self.name} 414 my_attrs.update(attrs) 415 return rootlog.nested(msg, my_attrs) 416 417 def wait_for_monitor_prompt(self) -> str: 418 assert self.monitor is not None 419 answer = "" 420 while True: 421 undecoded_answer = self.monitor.recv(1024) 422 if not undecoded_answer: 423 break 424 answer += undecoded_answer.decode() 425 if answer.endswith("(qemu) "): 426 break 427 return answer 428 429 def send_monitor_command(self, command: str) -> str: 430 """ 431 Send a command to the QEMU monitor. This allows attaching 432 virtual USB disks to a running machine, among other things. 433 """ 434 self.run_callbacks() 435 message = f"{command}\n".encode() 436 assert self.monitor is not None 437 self.monitor.send(message) 438 return self.wait_for_monitor_prompt() 439 440 def wait_for_unit( 441 self, unit: str, user: Optional[str] = None, timeout: int = 900 442 ) -> None: 443 """ 444 Wait for a systemd unit to get into "active" state. 445 Throws exceptions on "failed" and "inactive" states as well as after 446 timing out. 447 """ 448 449 def check_active(_: Any) -> bool: 450 info = self.get_unit_info(unit, user) 451 state = info["ActiveState"] 452 if state == "failed": 453 raise Exception(f'unit "{unit}" reached state "{state}"') 454 455 if state == "inactive": 456 status, jobs = self.systemctl("list-jobs --full 2>&1", user) 457 if "No jobs" in jobs: 458 info = self.get_unit_info(unit, user) 459 if info["ActiveState"] == state: 460 raise Exception( 461 f'unit "{unit}" is inactive and there are no pending jobs' 462 ) 463 464 return state == "active" 465 466 with self.nested( 467 f"waiting for unit {unit}" 468 + (f" with user {user}" if user is not None else "") 469 ): 470 retry(check_active, timeout) 471 472 def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]: 473 status, lines = self.systemctl(f'--no-pager show "{unit}"', user) 474 if status != 0: 475 raise Exception( 476 f'retrieving systemctl info for unit "{unit}"' 477 + ("" if user is None else f' under user "{user}"') 478 + f" failed with exit code {status}" 479 ) 480 481 line_pattern = re.compile(r"^([^=]+)=(.*)$") 482 483 def tuple_from_line(line: str) -> Tuple[str, str]: 484 match = line_pattern.match(line) 485 assert match is not None 486 return match[1], match[2] 487 488 return dict( 489 tuple_from_line(line) 490 for line in lines.split("\n") 491 if line_pattern.match(line) 492 ) 493 494 def systemctl(self, q: str, user: Optional[str] = None) -> Tuple[int, str]: 495 """ 496 Runs `systemctl` commands with optional support for 497 `systemctl --user` 498 499 ```py 500 # run `systemctl list-jobs --no-pager` 501 machine.systemctl("list-jobs --no-pager") 502 503 # spawn a shell for `any-user` and run 504 # `systemctl --user list-jobs --no-pager` 505 machine.systemctl("list-jobs --no-pager", "any-user") 506 ``` 507 """ 508 if user is not None: 509 q = q.replace("'", "\\'") 510 return self.execute( 511 f"su -l {user} --shell /bin/sh -c " 512 "$'XDG_RUNTIME_DIR=/run/user/`id -u` " 513 f"systemctl --user {q}'" 514 ) 515 return self.execute(f"systemctl {q}") 516 517 def require_unit_state(self, unit: str, require_state: str = "active") -> None: 518 with self.nested( 519 f"checking if unit '{unit}' has reached state '{require_state}'" 520 ): 521 info = self.get_unit_info(unit) 522 state = info["ActiveState"] 523 if state != require_state: 524 raise Exception( 525 f"Expected unit '{unit}' to to be in state " 526 f"'{require_state}' but it is in state '{state}'" 527 ) 528 529 def _next_newline_closed_block_from_shell(self) -> str: 530 assert self.shell 531 output_buffer = [] 532 while True: 533 # This receives up to 4096 bytes from the socket 534 chunk = self.shell.recv(4096) 535 if not chunk: 536 # Probably a broken pipe, return the output we have 537 break 538 539 decoded = chunk.decode() 540 output_buffer += [decoded] 541 if decoded[-1] == "\n": 542 break 543 return "".join(output_buffer) 544 545 def execute( 546 self, 547 command: str, 548 check_return: bool = True, 549 check_output: bool = True, 550 timeout: Optional[int] = 900, 551 ) -> Tuple[int, str]: 552 """ 553 Execute a shell command, returning a list `(status, stdout)`. 554 555 Commands are run with `set -euo pipefail` set: 556 557 - If several commands are separated by `;` and one fails, the 558 command as a whole will fail. 559 560 - For pipelines, the last non-zero exit status will be returned 561 (if there is one; otherwise zero will be returned). 562 563 - Dereferencing unset variables fails the command. 564 565 - It will wait for stdout to be closed. 566 567 If the command detaches, it must close stdout, as `execute` will wait 568 for this to consume all output reliably. This can be achieved by 569 redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or 570 a file. Examples of detaching commands are `sleep 365d &`, where the 571 shell forks a new process that can write to stdout and `xclip -i`, where 572 the `xclip` command itself forks without closing stdout. 573 574 Takes an optional parameter `check_return` that defaults to `True`. 575 Setting this parameter to `False` will not check for the return code 576 and return -1 instead. This can be used for commands that shut down 577 the VM and would therefore break the pipe that would be used for 578 retrieving the return code. 579 580 A timeout for the command can be specified (in seconds) using the optional 581 `timeout` parameter, e.g., `execute(cmd, timeout=10)` or 582 `execute(cmd, timeout=None)`. The default is 900 seconds. 583 """ 584 self.run_callbacks() 585 self.connect() 586 587 # Always run command with shell opts 588 command = f"set -euo pipefail; {command}" 589 590 timeout_str = "" 591 if timeout is not None: 592 timeout_str = f"timeout {timeout}" 593 594 # While sh is bash on NixOS, this is not the case for every distro. 595 # We explicitly call bash here to allow for the driver to boot other distros as well. 596 out_command = ( 597 f"{timeout_str} bash -c {shlex.quote(command)} | (base64 -w 0; echo)\n" 598 ) 599 600 assert self.shell 601 self.shell.send(out_command.encode()) 602 603 if not check_output: 604 return (-2, "") 605 606 # Get the output 607 output = base64.b64decode(self._next_newline_closed_block_from_shell()) 608 609 if not check_return: 610 return (-1, output.decode()) 611 612 # Get the return code 613 self.shell.send(b"echo ${PIPESTATUS[0]}\n") 614 rc = int(self._next_newline_closed_block_from_shell().strip()) 615 616 return (rc, output.decode(errors="replace")) 617 618 def shell_interact(self, address: Optional[str] = None) -> None: 619 """ 620 Allows you to directly interact with the guest shell. This should 621 only be used during test development, not in production tests. 622 Killing the interactive session with `Ctrl-d` or `Ctrl-c` also ends 623 the guest session. 624 """ 625 self.connect() 626 627 if address is None: 628 address = "READLINE,prompt=$ " 629 self.log("Terminal is ready (there is no initial prompt):") 630 631 assert self.shell 632 try: 633 subprocess.run( 634 ["socat", address, f"FD:{self.shell.fileno()}"], 635 pass_fds=[self.shell.fileno()], 636 ) 637 # allow users to cancel this command without breaking the test 638 except KeyboardInterrupt: 639 pass 640 641 def console_interact(self) -> None: 642 """ 643 Allows you to directly interact with QEMU's stdin, by forwarding 644 terminal input to the QEMU process. 645 This is for use with the interactive test driver, not for production 646 tests, which run unattended. 647 Output from QEMU is only read line-wise. `Ctrl-c` kills QEMU and 648 `Ctrl-d` closes console and returns to the test runner. 649 """ 650 self.log("Terminal is ready (there is no prompt):") 651 652 assert self.process 653 assert self.process.stdin 654 655 while True: 656 try: 657 char = sys.stdin.buffer.read(1) 658 except KeyboardInterrupt: 659 break 660 if char == b"": # ctrl+d 661 self.log("Closing connection to the console") 662 break 663 self.send_console(char.decode()) 664 665 def succeed(self, *commands: str, timeout: Optional[int] = None) -> str: 666 """ 667 Execute a shell command, raising an exception if the exit status is 668 not zero, otherwise returning the standard output. Similar to `execute`, 669 except that the timeout is `None` by default. See `execute` for details on 670 command execution. 671 """ 672 output = "" 673 for command in commands: 674 with self.nested(f"must succeed: {command}"): 675 (status, out) = self.execute(command, timeout=timeout) 676 if status != 0: 677 self.log(f"output: {out}") 678 raise Exception(f"command `{command}` failed (exit code {status})") 679 output += out 680 return output 681 682 def fail(self, *commands: str, timeout: Optional[int] = None) -> str: 683 """ 684 Like `succeed`, but raising an exception if the command returns a zero 685 status. 686 """ 687 output = "" 688 for command in commands: 689 with self.nested(f"must fail: {command}"): 690 (status, out) = self.execute(command, timeout=timeout) 691 if status == 0: 692 raise Exception(f"command `{command}` unexpectedly succeeded") 693 output += out 694 return output 695 696 def wait_until_succeeds(self, command: str, timeout: int = 900) -> str: 697 """ 698 Repeat a shell command with 1-second intervals until it succeeds. 699 Has a default timeout of 900 seconds which can be modified, e.g. 700 `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on 701 command execution. 702 Throws an exception on timeout. 703 """ 704 output = "" 705 706 def check_success(_: Any) -> bool: 707 nonlocal output 708 status, output = self.execute(command, timeout=timeout) 709 return status == 0 710 711 with self.nested(f"waiting for success: {command}"): 712 retry(check_success, timeout) 713 return output 714 715 def wait_until_fails(self, command: str, timeout: int = 900) -> str: 716 """ 717 Like `wait_until_succeeds`, but repeating the command until it fails. 718 """ 719 output = "" 720 721 def check_failure(_: Any) -> bool: 722 nonlocal output 723 status, output = self.execute(command, timeout=timeout) 724 return status != 0 725 726 with self.nested(f"waiting for failure: {command}"): 727 retry(check_failure, timeout) 728 return output 729 730 def wait_for_shutdown(self) -> None: 731 if not self.booted: 732 return 733 734 with self.nested("waiting for the VM to power off"): 735 sys.stdout.flush() 736 assert self.process 737 self.process.wait() 738 739 self.pid = None 740 self.booted = False 741 self.connected = False 742 743 def get_tty_text(self, tty: str) -> str: 744 status, output = self.execute( 745 f"fold -w$(stty -F /dev/tty{tty} size | " 746 f"awk '{{print $2}}') /dev/vcs{tty}" 747 ) 748 return output 749 750 def wait_until_tty_matches(self, tty: str, regexp: str, timeout: int = 900) -> None: 751 """Wait until the visible output on the chosen TTY matches regular 752 expression. Throws an exception on timeout. 753 """ 754 matcher = re.compile(regexp) 755 756 def tty_matches(last: bool) -> bool: 757 text = self.get_tty_text(tty) 758 if last: 759 self.log( 760 f"Last chance to match /{regexp}/ on TTY{tty}, " 761 f"which currently contains: {text}" 762 ) 763 return len(matcher.findall(text)) > 0 764 765 with self.nested(f"waiting for {regexp} to appear on tty {tty}"): 766 retry(tty_matches, timeout) 767 768 def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None: 769 """ 770 Simulate typing a sequence of characters on the virtual keyboard, 771 e.g., `send_chars("foobar\n")` will type the string `foobar` 772 followed by the Enter key. 773 """ 774 with self.nested(f"sending keys {repr(chars)}"): 775 for char in chars: 776 self.send_key(char, delay, log=False) 777 778 def wait_for_file(self, filename: str, timeout: int = 900) -> None: 779 """ 780 Waits until the file exists in the machine's file system. 781 """ 782 783 def check_file(_: Any) -> bool: 784 status, _ = self.execute(f"test -e {filename}") 785 return status == 0 786 787 with self.nested(f"waiting for file '{filename}'"): 788 retry(check_file, timeout) 789 790 def wait_for_open_port( 791 self, port: int, addr: str = "localhost", timeout: int = 900 792 ) -> None: 793 """ 794 Wait until a process is listening on the given TCP port and IP address 795 (default `localhost`). 796 """ 797 798 def port_is_open(_: Any) -> bool: 799 status, _ = self.execute(f"nc -z {addr} {port}") 800 return status == 0 801 802 with self.nested(f"waiting for TCP port {port} on {addr}"): 803 retry(port_is_open, timeout) 804 805 def wait_for_open_unix_socket( 806 self, addr: str, is_datagram: bool = False, timeout: int = 900 807 ) -> None: 808 """ 809 Wait until a process is listening on the given UNIX-domain socket 810 (default to a UNIX-domain stream socket). 811 """ 812 813 nc_flags = [ 814 "-z", 815 "-uU" if is_datagram else "-U", 816 ] 817 818 def socket_is_open(_: Any) -> bool: 819 status, _ = self.execute(f"nc {' '.join(nc_flags)} {addr}") 820 return status == 0 821 822 with self.nested( 823 f"waiting for UNIX-domain {'datagram' if is_datagram else 'stream'} on '{addr}'" 824 ): 825 retry(socket_is_open, timeout) 826 827 def wait_for_closed_port( 828 self, port: int, addr: str = "localhost", timeout: int = 900 829 ) -> None: 830 """ 831 Wait until nobody is listening on the given TCP port and IP address 832 (default `localhost`). 833 """ 834 835 def port_is_closed(_: Any) -> bool: 836 status, _ = self.execute(f"nc -z {addr} {port}") 837 return status != 0 838 839 with self.nested(f"waiting for TCP port {port} on {addr} to be closed"): 840 retry(port_is_closed, timeout) 841 842 def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]: 843 return self.systemctl(f"start {jobname}", user) 844 845 def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]: 846 return self.systemctl(f"stop {jobname}", user) 847 848 def wait_for_job(self, jobname: str) -> None: 849 self.wait_for_unit(jobname) 850 851 def connect(self) -> None: 852 def shell_ready(timeout_secs: int) -> bool: 853 """We sent some data from the backdoor service running on the guest 854 to indicate that the backdoor shell is ready. 855 As soon as we read some data from the socket here, we assume that 856 our root shell is operational. 857 """ 858 (ready, _, _) = select.select([self.shell], [], [], timeout_secs) 859 return bool(ready) 860 861 if self.connected: 862 return 863 864 with self.nested("waiting for the VM to finish booting"): 865 self.start() 866 867 assert self.shell 868 869 tic = time.time() 870 # TODO: do we want to bail after a set number of attempts? 871 while not shell_ready(timeout_secs=30): 872 self.log("Guest root shell did not produce any data yet...") 873 self.log( 874 " To debug, enter the VM and run 'systemctl status backdoor.service'." 875 ) 876 877 while True: 878 chunk = self.shell.recv(1024) 879 # No need to print empty strings, it means we are waiting. 880 if len(chunk) == 0: 881 continue 882 self.log(f"Guest shell says: {chunk!r}") 883 # NOTE: for this to work, nothing must be printed after this line! 884 if b"Spawning backdoor root shell..." in chunk: 885 break 886 887 toc = time.time() 888 889 self.log("connected to guest root shell") 890 self.log(f"(connecting took {toc - tic:.2f} seconds)") 891 self.connected = True 892 893 def screenshot(self, filename: str) -> None: 894 """ 895 Take a picture of the display of the virtual machine, in PNG format. 896 The screenshot will be available in the derivation output. 897 """ 898 if "." not in filename: 899 filename += ".png" 900 if "/" not in filename: 901 filename = os.path.join(self.out_dir, filename) 902 tmp = f"{filename}.ppm" 903 904 with self.nested( 905 f"making screenshot {filename}", 906 {"image": os.path.basename(filename)}, 907 ): 908 self.send_monitor_command(f"screendump {tmp}") 909 ret = subprocess.run(f"pnmtopng '{tmp}' > '{filename}'", shell=True) 910 os.unlink(tmp) 911 if ret.returncode != 0: 912 raise Exception("Cannot convert screenshot") 913 914 def copy_from_host_via_shell(self, source: str, target: str) -> None: 915 """Copy a file from the host into the guest by piping it over the 916 shell into the destination file. Works without host-guest shared folder. 917 Prefer copy_from_host for whenever possible. 918 """ 919 with open(source, "rb") as fh: 920 content_b64 = base64.b64encode(fh.read()).decode() 921 self.succeed( 922 f"mkdir -p $(dirname {target})", 923 f"echo -n {content_b64} | base64 -d > {target}", 924 ) 925 926 def copy_from_host(self, source: str, target: str) -> None: 927 """ 928 Copies a file from host to machine, e.g., 929 `copy_from_host("myfile", "/etc/my/important/file")`. 930 931 The first argument is the file on the host. Note that the "host" refers 932 to the environment in which the test driver runs, which is typically the 933 Nix build sandbox. 934 935 The second argument is the location of the file on the machine that will 936 be written to. 937 938 The file is copied via the `shared_dir` directory which is shared among 939 all the VMs (using a temporary directory). 940 The access rights bits will mimic the ones from the host file and 941 user:group will be root:root. 942 """ 943 host_src = Path(source) 944 vm_target = Path(target) 945 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: 946 shared_temp = Path(shared_td) 947 host_intermediate = shared_temp / host_src.name 948 vm_shared_temp = Path("/tmp/shared") / shared_temp.name 949 vm_intermediate = vm_shared_temp / host_src.name 950 951 self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) 952 if host_src.is_dir(): 953 shutil.copytree(host_src, host_intermediate) 954 else: 955 shutil.copy(host_src, host_intermediate) 956 self.succeed(make_command(["mkdir", "-p", vm_target.parent])) 957 self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target])) 958 959 def copy_from_vm(self, source: str, target_dir: str = "") -> None: 960 """Copy a file from the VM (specified by an in-VM source path) to a path 961 relative to `$out`. The file is copied via the `shared_dir` shared among 962 all the VMs (using a temporary directory). 963 """ 964 # Compute the source, target, and intermediate shared file names 965 vm_src = Path(source) 966 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: 967 shared_temp = Path(shared_td) 968 vm_shared_temp = Path("/tmp/shared") / shared_temp.name 969 vm_intermediate = vm_shared_temp / vm_src.name 970 intermediate = shared_temp / vm_src.name 971 # Copy the file to the shared directory inside VM 972 self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) 973 self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate])) 974 abs_target = self.out_dir / target_dir / vm_src.name 975 abs_target.parent.mkdir(exist_ok=True, parents=True) 976 # Copy the file from the shared directory outside VM 977 if intermediate.is_dir(): 978 shutil.copytree(intermediate, abs_target) 979 else: 980 shutil.copy(intermediate, abs_target) 981 982 def dump_tty_contents(self, tty: str) -> None: 983 """Debugging: Dump the contents of the TTY<n>""" 984 self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat") 985 986 def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]: 987 with tempfile.TemporaryDirectory() as tmpdir: 988 screenshot_path = os.path.join(tmpdir, "ppm") 989 self.send_monitor_command(f"screendump {screenshot_path}") 990 return _perform_ocr_on_screenshot(screenshot_path, model_ids) 991 992 def get_screen_text_variants(self) -> List[str]: 993 """ 994 Return a list of different interpretations of what is currently 995 visible on the machine's screen using optical character 996 recognition. The number and order of the interpretations is not 997 specified and is subject to change, but if no exception is raised at 998 least one will be returned. 999 1000 ::: {.note} 1001 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. 1002 ::: 1003 """ 1004 return self._get_screen_text_variants([0, 1, 2]) 1005 1006 def get_screen_text(self) -> str: 1007 """ 1008 Return a textual representation of what is currently visible on the 1009 machine's screen using optical character recognition. 1010 1011 ::: {.note} 1012 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. 1013 ::: 1014 """ 1015 return self._get_screen_text_variants([2])[0] 1016 1017 def wait_for_text(self, regex: str, timeout: int = 900) -> None: 1018 """ 1019 Wait until the supplied regular expressions matches the textual 1020 contents of the screen by using optical character recognition (see 1021 `get_screen_text` and `get_screen_text_variants`). 1022 1023 ::: {.note} 1024 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. 1025 ::: 1026 """ 1027 1028 def screen_matches(last: bool) -> bool: 1029 variants = self.get_screen_text_variants() 1030 for text in variants: 1031 if re.search(regex, text) is not None: 1032 return True 1033 1034 if last: 1035 self.log(f"Last OCR attempt failed. Text was: {variants}") 1036 1037 return False 1038 1039 with self.nested(f"waiting for {regex} to appear on screen"): 1040 retry(screen_matches, timeout) 1041 1042 def wait_for_console_text(self, regex: str, timeout: int | None = None) -> None: 1043 """ 1044 Wait until the supplied regular expressions match a line of the 1045 serial console output. 1046 This method is useful when OCR is not possible or inaccurate. 1047 """ 1048 # Buffer the console output, this is needed 1049 # to match multiline regexes. 1050 console = io.StringIO() 1051 1052 def console_matches(_: Any) -> bool: 1053 nonlocal console 1054 try: 1055 # This will return as soon as possible and 1056 # sleep 1 second. 1057 console.write(self.last_lines.get(block=False)) 1058 except queue.Empty: 1059 pass 1060 console.seek(0) 1061 matches = re.search(regex, console.read()) 1062 return matches is not None 1063 1064 with self.nested(f"waiting for {regex} to appear on console"): 1065 if timeout is not None: 1066 retry(console_matches, timeout) 1067 else: 1068 while not console_matches(False): 1069 pass 1070 1071 def send_key( 1072 self, key: str, delay: Optional[float] = 0.01, log: Optional[bool] = True 1073 ) -> None: 1074 """ 1075 Simulate pressing keys on the virtual keyboard, e.g., 1076 `send_key("ctrl-alt-delete")`. 1077 1078 Please also refer to the QEMU documentation for more information on the 1079 input syntax: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys 1080 """ 1081 key = CHAR_TO_KEY.get(key, key) 1082 context = self.nested(f"sending key {repr(key)}") if log else nullcontext() 1083 with context: 1084 self.send_monitor_command(f"sendkey {key}") 1085 if delay is not None: 1086 time.sleep(delay) 1087 1088 def send_console(self, chars: str) -> None: 1089 r""" 1090 Send keys to the kernel console. This allows interaction with the systemd 1091 emergency mode, for example. Takes a string that is sent, e.g., 1092 `send_console("\n\nsystemctl default\n")`. 1093 """ 1094 assert self.process 1095 assert self.process.stdin 1096 self.process.stdin.write(chars.encode()) 1097 self.process.stdin.flush() 1098 1099 def start(self, allow_reboot: bool = False) -> None: 1100 """ 1101 Start the virtual machine. This method is asynchronous --- it does 1102 not wait for the machine to finish booting. 1103 """ 1104 if self.booted: 1105 return 1106 1107 self.log("starting vm") 1108 1109 def clear(path: Path) -> Path: 1110 if path.exists(): 1111 path.unlink() 1112 return path 1113 1114 def create_socket(path: Path) -> socket.socket: 1115 s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) 1116 s.bind(str(path)) 1117 s.listen(1) 1118 return s 1119 1120 monitor_socket = create_socket(clear(self.monitor_path)) 1121 shell_socket = create_socket(clear(self.shell_path)) 1122 self.process = self.start_command.run( 1123 self.state_dir, 1124 self.shared_dir, 1125 self.monitor_path, 1126 self.qmp_path, 1127 self.shell_path, 1128 allow_reboot, 1129 ) 1130 self.monitor, _ = monitor_socket.accept() 1131 self.shell, _ = shell_socket.accept() 1132 self.qmp_client = QMPSession.from_path(self.qmp_path) 1133 1134 # Store last serial console lines for use 1135 # of wait_for_console_text 1136 self.last_lines: Queue = Queue() 1137 1138 def process_serial_output() -> None: 1139 assert self.process 1140 assert self.process.stdout 1141 for _line in self.process.stdout: 1142 # Ignore undecodable bytes that may occur in boot menus 1143 line = _line.decode(errors="ignore").replace("\r", "").rstrip() 1144 self.last_lines.put(line) 1145 self.log_serial(line) 1146 1147 self.serial_thread = threading.Thread(target=process_serial_output) 1148 self.serial_thread.start() 1149 1150 self.wait_for_monitor_prompt() 1151 1152 self.pid = self.process.pid 1153 self.booted = True 1154 1155 self.log(f"QEMU running (pid {self.pid})") 1156 1157 def cleanup_statedir(self) -> None: 1158 shutil.rmtree(self.state_dir) 1159 rootlog.log(f"deleting VM state directory {self.state_dir}") 1160 rootlog.log("if you want to keep the VM state, pass --keep-vm-state") 1161 1162 def shutdown(self) -> None: 1163 """ 1164 Shut down the machine, waiting for the VM to exit. 1165 """ 1166 if not self.booted: 1167 return 1168 1169 assert self.shell 1170 self.shell.send(b"poweroff\n") 1171 self.wait_for_shutdown() 1172 1173 def crash(self) -> None: 1174 """ 1175 Simulate a sudden power failure, by telling the VM to exit immediately. 1176 """ 1177 if not self.booted: 1178 return 1179 1180 self.log("forced crash") 1181 self.send_monitor_command("quit") 1182 self.wait_for_shutdown() 1183 1184 def reboot(self) -> None: 1185 """Press Ctrl+Alt+Delete in the guest. 1186 1187 Prepares the machine to be reconnected which is useful if the 1188 machine was started with `allow_reboot = True` 1189 """ 1190 self.send_key("ctrl-alt-delete") 1191 self.connected = False 1192 1193 def wait_for_x(self, timeout: int = 900) -> None: 1194 """ 1195 Wait until it is possible to connect to the X server. 1196 """ 1197 1198 def check_x(_: Any) -> bool: 1199 cmd = ( 1200 "journalctl -b SYSLOG_IDENTIFIER=systemd | " 1201 + 'grep "Reached target Current graphical"' 1202 ) 1203 status, _ = self.execute(cmd) 1204 if status != 0: 1205 return False 1206 status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]") 1207 return status == 0 1208 1209 with self.nested("waiting for the X11 server"): 1210 retry(check_x, timeout) 1211 1212 def get_window_names(self) -> List[str]: 1213 return self.succeed( 1214 r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'" 1215 ).splitlines() 1216 1217 def wait_for_window(self, regexp: str, timeout: int = 900) -> None: 1218 """ 1219 Wait until an X11 window has appeared whose name matches the given 1220 regular expression, e.g., `wait_for_window("Terminal")`. 1221 """ 1222 pattern = re.compile(regexp) 1223 1224 def window_is_visible(last_try: bool) -> bool: 1225 names = self.get_window_names() 1226 if last_try: 1227 self.log( 1228 f"Last chance to match {regexp} on the window list," 1229 + " which currently contains: " 1230 + ", ".join(names) 1231 ) 1232 return any(pattern.search(name) for name in names) 1233 1234 with self.nested("waiting for a window to appear"): 1235 retry(window_is_visible, timeout) 1236 1237 def sleep(self, secs: int) -> None: 1238 # We want to sleep in *guest* time, not *host* time. 1239 self.succeed(f"sleep {secs}") 1240 1241 def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None: 1242 """ 1243 Forward a TCP port on the host to a TCP port on the guest. 1244 Useful during interactive testing. 1245 """ 1246 self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}") 1247 1248 def block(self) -> None: 1249 """ 1250 Simulate unplugging the Ethernet cable that connects the machine to 1251 the other machines. 1252 This happens by shutting down eth1 (the multicast interface used to talk 1253 to the other VMs). eth0 is kept online to still enable the test driver 1254 to communicate with the machine. 1255 """ 1256 self.send_monitor_command("set_link virtio-net-pci.1 off") 1257 1258 def unblock(self) -> None: 1259 """ 1260 Undo the effect of `block`. 1261 """ 1262 self.send_monitor_command("set_link virtio-net-pci.1 on") 1263 1264 def release(self) -> None: 1265 if self.pid is None: 1266 return 1267 rootlog.info(f"kill machine (pid {self.pid})") 1268 assert self.process 1269 assert self.shell 1270 assert self.monitor 1271 assert self.serial_thread 1272 1273 self.process.terminate() 1274 self.shell.close() 1275 self.monitor.close() 1276 self.serial_thread.join() 1277 1278 def run_callbacks(self) -> None: 1279 for callback in self.callbacks: 1280 callback() 1281 1282 def switch_root(self) -> None: 1283 """ 1284 Transition from stage 1 to stage 2. This requires the 1285 machine to be configured with `testing.initrdBackdoor = true` 1286 and `boot.initrd.systemd.enable = true`. 1287 """ 1288 self.wait_for_unit("initrd.target") 1289 self.execute( 1290 "systemctl isolate --no-block initrd-switch-root.target 2>/dev/null >/dev/null", 1291 check_return=False, 1292 check_output=False, 1293 ) 1294 self.wait_for_console_text(r"systemd\[1\]:.*Switching root\.") 1295 self.connected = False 1296 self.connect()