# Source snapshot: 23.11-pre (36 kB).
"""Machine handles for the test driver.

Starts QEMU virtual machines via a start command/script and drives them
through the QEMU monitor socket, a guest root-shell socket and the serial
console (QEMU's stdio).
"""

from contextlib import _GeneratorContextManager, nullcontext
from pathlib import Path
from queue import Queue
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
import base64
import io
import os
import queue
import re
import select
import shlex
import shutil
import socket
import subprocess
import sys
import tempfile
import threading
import time

from test_driver.logger import rootlog

# Translation table from characters to the key names passed to the QEMU
# monitor's `sendkey` command (see Machine.send_key). Characters missing from
# this table (e.g. lowercase letters, digits) are passed through unchanged.
# The hex values are presumably keyboard scancodes -- TODO confirm.
CHAR_TO_KEY = {
    "A": "shift-a",
    "N": "shift-n",
    "-": "0x0C",
    "_": "shift-0x0C",
    "B": "shift-b",
    "O": "shift-o",
    "=": "0x0D",
    "+": "shift-0x0D",
    "C": "shift-c",
    "P": "shift-p",
    "[": "0x1A",
    "{": "shift-0x1A",
    "D": "shift-d",
    "Q": "shift-q",
    "]": "0x1B",
    "}": "shift-0x1B",
    "E": "shift-e",
    "R": "shift-r",
    ";": "0x27",
    ":": "shift-0x27",
    "F": "shift-f",
    "S": "shift-s",
    "'": "0x28",
    '"': "shift-0x28",
    "G": "shift-g",
    "T": "shift-t",
    "`": "0x29",
    "~": "shift-0x29",
    "H": "shift-h",
    "U": "shift-u",
    "\\": "0x2B",
    "|": "shift-0x2B",
    "I": "shift-i",
    "V": "shift-v",
    ",": "0x33",
    "<": "shift-0x33",
    "J": "shift-j",
    "W": "shift-w",
    ".": "0x34",
    ">": "shift-0x34",
    "K": "shift-k",
    "X": "shift-x",
    "/": "0x35",
    "?": "shift-0x35",
    "L": "shift-l",
    "Y": "shift-y",
    " ": "spc",
    "M": "shift-m",
    "Z": "shift-z",
    "\n": "ret",
    "!": "shift-0x02",
    "@": "shift-0x03",
    "#": "shift-0x04",
    "$": "shift-0x05",
    "%": "shift-0x06",
    "^": "shift-0x07",
    "&": "shift-0x08",
    "*": "shift-0x09",
    "(": "shift-0x0A",
    ")": "shift-0x0B",
}


def make_command(args: list) -> str:
    """Join ``args`` into one shell command string.

    Every argument is converted with ``str`` and quoted with
    ``shlex.quote``, so paths containing spaces or shell metacharacters are
    passed through literally.
    """
    return " ".join(shlex.quote(str(arg)) for arg in args)


def _perform_ocr_on_screenshot(
    screenshot_path: str, model_ids: Iterable[int]
) -> List[str]:
    """Run OCR on a screenshot once per tesseract engine mode.

    The screenshot is first preprocessed with ImageMagick ``convert`` into
    ``<screenshot_path>.tiff``; then ``tesseract`` is run once for every id in
    ``model_ids`` (passed as ``--oem``).

    Returns:
        The decoded stdout of each tesseract run, in ``model_ids`` order.

    Raises:
        Exception: if ``tesseract`` is not on PATH, or if the conversion or
            any OCR run exits non-zero.
    """
    if shutil.which("tesseract") is None:
        raise Exception("OCR requested but enableOCR is false")

    magick_args = (
        "-filter Catrom -density 72 -resample 300 "
        + "-contrast -normalize -despeckle -type grayscale "
        + "-sharpen 1 -posterize 3 -negate -gamma 100 "
        + "-blur 1x65535"
    )

    tess_args = "-c debug_file=/dev/null --psm 11"

    cmd = f"convert {magick_args} '{screenshot_path}' 'tiff:{screenshot_path}.tiff'"
    ret = subprocess.run(cmd, shell=True, capture_output=True)
    if ret.returncode != 0:
        raise Exception(f"TIFF conversion failed with exit code {ret.returncode}")

    model_results = []
    for model_id in model_ids:
        cmd = f"tesseract '{screenshot_path}.tiff' - {tess_args} --oem '{model_id}'"
        ret = subprocess.run(cmd, shell=True, capture_output=True)
        if ret.returncode != 0:
            raise Exception(f"OCR failed with exit code {ret.returncode}")
        model_results.append(ret.stdout.decode("utf-8"))

    return model_results


def retry(fn: Callable, timeout: int = 900) -> None:
    """Call the given function repeatedly, with 1 second intervals,
    until it returns True or a timeout is reached.

    ``fn`` receives one bool argument: False for the regular attempts and
    True for a final "last chance" attempt, so it can log diagnostics before
    giving up. There are ``timeout`` regular attempts, each followed by a
    1 second sleep, so the total wait is roughly ``timeout`` seconds plus the
    time spent inside ``fn``.

    Raises:
        Exception: if the final attempt also returns a falsy value.
    """

    for _ in range(timeout):
        if fn(False):
            return
        time.sleep(1)

    if not fn(True):
        raise Exception(f"action timed out after {timeout} seconds")


class StartCommand:
    """The Base Start Command knows how to append the necessary
    runtime qemu options as determined by a particular test driver
    run. Any such start command is expected to happily receive and
    append additional qemu args.
    """

    # Base shell command; subclasses assign it in their __init__.
    _cmd: str

    def cmd(
        self,
        monitor_socket_path: Path,
        shell_socket_path: Path,
        allow_reboot: bool = False,
    ) -> str:
        """Return the full shell command line used to launch QEMU.

        Appends the monitor socket, the guest-shell chardev, fixed virtio
        devices, ``-no-reboot`` (unless ``allow_reboot``), the contents of
        ``$QEMU_OPTS`` and ``-nographic`` when neither ``$DISPLAY`` nor
        ``$WAYLAND_DISPLAY`` is set.
        """
        display_opts = ""
        display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"])
        if not display_available:
            display_opts += " -nographic"

        # qemu options
        qemu_opts = (
            " -device virtio-serial"
            # Note: virtconsole will map to /dev/hvc0 in Linux guests
            " -device virtconsole,chardev=shell"
            " -device virtio-rng-pci"
            " -serial stdio"
        )
        if not allow_reboot:
            qemu_opts += " -no-reboot"
        # TODO: qemu script already captures this env variable, legacy?
        qemu_opts += " " + os.environ.get("QEMU_OPTS", "")

        return (
            f"{self._cmd}"
            f" -monitor unix:{monitor_socket_path}"
            f" -chardev socket,id=shell,path={shell_socket_path}"
            f"{qemu_opts}"
            f"{display_opts}"
        )

    @staticmethod
    def build_environment(
        state_dir: Path,
        shared_dir: Path,
    ) -> dict:
        """Return a copy of ``os.environ`` extended with ``TMPDIR``,
        ``SHARED_DIR`` and ``USE_TMPDIR`` for the QEMU process."""
        # We make a copy to not update the current environment
        env = dict(os.environ)
        env.update(
            {
                "TMPDIR": str(state_dir),
                "SHARED_DIR": str(shared_dir),
                "USE_TMPDIR": "1",
            }
        )
        return env

    def run(
        self,
        state_dir: Path,
        shared_dir: Path,
        monitor_socket_path: Path,
        shell_socket_path: Path,
        allow_reboot: bool,
    ) -> subprocess.Popen:
        """Launch QEMU through a shell in ``state_dir``.

        stdin and stdout are pipes (stderr is merged into stdout); the
        returned Popen is what Machine uses for the serial console.
        """
        return subprocess.Popen(
            self.cmd(monitor_socket_path, shell_socket_path, allow_reboot),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=True,
            cwd=state_dir,
            env=self.build_environment(state_dir, shared_dir),
        )


class NixStartScript(StartCommand):
    """A start script from nixos/modules/virtualisation/qemu-vm.nix
    that also satisfies the requirement of the BaseStartCommand.
    These Nix commands have the particular characteristic that the
    machine name can be extracted out of them via a regex match.
    (Admittedly a _very_ implicit contract; TODO: possibly fix.)
    """

    def __init__(self, script: str):
        self._cmd = script

    @property
    def machine_name(self) -> str:
        """Machine name parsed from a script path ending in ``run-<name>-vm``,
        or ``"machine"`` if the path does not match."""
        match = re.search("run-(.+)-vm$", self._cmd)
        name = "machine"
        if match:
            name = match.group(1)
        return name


class LegacyStartCommand(StartCommand):
    """Used in some places to create an ad-hoc machine instead of
    using nix test instrumentation + module system for that purpose.
    Legacy.
    """

    def __init__(
        self,
        netBackendArgs: Optional[str] = None,
        netFrontendArgs: Optional[str] = None,
        hda: Optional[Tuple[Path, str]] = None,
        cdrom: Optional[str] = None,
        usb: Optional[str] = None,
        bios: Optional[str] = None,
        qemuBinary: Optional[str] = None,
        qemuFlags: Optional[str] = None,
    ):
        """Assemble a QEMU command line from the individual options.

        ``hda`` is a ``(path, interface)`` pair; the ``"scsi"`` interface is
        attached via a separate ``scsi-hd`` device, any other value is passed
        as the drive's ``if=``. Defaults to ``qemu-kvm`` with 384 MiB RAM.
        """
        if qemuBinary is not None:
            self._cmd = qemuBinary
        else:
            self._cmd = "qemu-kvm"

        self._cmd += " -m 384"

        # networking
        net_backend = "-netdev user,id=net0"
        net_frontend = "-device virtio-net-pci,netdev=net0"
        if netBackendArgs is not None:
            net_backend += "," + netBackendArgs
        if netFrontendArgs is not None:
            net_frontend += "," + netFrontendArgs
        self._cmd += f" {net_backend} {net_frontend}"

        # hda
        hda_cmd = ""
        if hda is not None:
            hda_path = hda[0].resolve()
            hda_interface = hda[1]
            if hda_interface == "scsi":
                hda_cmd += (
                    f" -drive id=hda,file={hda_path},werror=report,if=none"
                    " -device scsi-hd,drive=hda"
                )
            else:
                hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report"
        self._cmd += hda_cmd

        # cdrom
        if cdrom is not None:
            self._cmd += f" -cdrom {cdrom}"

        # usb
        usb_cmd = ""
        if usb is not None:
            # https://github.com/qemu/qemu/blob/master/docs/usb2.txt
            usb_cmd += (
                " -device usb-ehci"
                f" -drive id=usbdisk,file={usb},if=none,readonly"
                " -device usb-storage,drive=usbdisk "
            )
        self._cmd += usb_cmd

        # bios
        if bios is not None:
            self._cmd += f" -bios {bios}"

        # qemu flags
        if qemuFlags is not None:
            self._cmd += f" {qemuFlags}"


class Machine:
    """A handle to the machine with this name, that also knows how to manage
    the machine lifecycle with the help of a start script / command."""

    name: str
    out_dir: Path
    tmp_dir: Path
    shared_dir: Path
    state_dir: Path
    monitor_path: Path
    shell_path: Path

    start_command: StartCommand
    keep_vm_state: bool

    process: Optional[subprocess.Popen]
    pid: Optional[int]
    monitor: Optional[socket.socket]
    shell: Optional[socket.socket]
    serial_thread: Optional[threading.Thread]

    booted: bool
    connected: bool
    # Store last serial console lines for use of wait_for_console_text.
    # Created per instance (in __init__, and afresh in start()) instead of
    # as a class attribute, so that machines never share one queue.
    last_lines: Queue
    callbacks: List[Callable]

    def __repr__(self) -> str:
        return f"<Machine '{self.name}'>"

    def __init__(
        self,
        out_dir: Path,
        tmp_dir: Path,
        start_command: StartCommand,
        name: str = "machine",
        keep_vm_state: bool = False,
        callbacks: Optional[List[Callable]] = None,
    ) -> None:
        """Set up the machine's directories; does not start the VM.

        Creates ``<tmp_dir>/shared-xchg`` and ``<tmp_dir>/vm-state-<name>``.
        An existing state directory is deleted first unless
        ``keep_vm_state`` is set. ``callbacks`` are invoked before every
        monitor command and shell command (see run_callbacks).
        """
        self.out_dir = out_dir
        self.tmp_dir = tmp_dir
        self.keep_vm_state = keep_vm_state
        self.name = name
        self.start_command = start_command
        self.callbacks = callbacks if callbacks is not None else []

        # set up directories
        self.shared_dir = self.tmp_dir / "shared-xchg"
        self.shared_dir.mkdir(mode=0o700, exist_ok=True)

        self.state_dir = self.tmp_dir / f"vm-state-{self.name}"
        self.monitor_path = self.state_dir / "monitor"
        self.shell_path = self.state_dir / "shell"
        if (not self.keep_vm_state) and self.state_dir.exists():
            self.cleanup_statedir()
        self.state_dir.mkdir(mode=0o700, exist_ok=True)

        self.process = None
        self.pid = None
        self.monitor = None
        self.shell = None
        self.serial_thread = None

        self.booted = False
        self.connected = False
        self.last_lines = Queue()

    @staticmethod
    def create_startcommand(args: Dict[str, str]) -> StartCommand:
        """Build a LegacyStartCommand from a dict of legacy option names.

        ``hda`` (path) and ``hdaInterface`` are combined into the ``hda``
        tuple; every other key maps directly onto the constructor argument
        of the same name.
        """
        rootlog.warning(
            "Using legacy create_startcommand(), "
            "please use proper nix test vm instrumentation, instead "
            "to generate the appropriate nixos test vm qemu startup script"
        )
        hda = None
        if args.get("hda"):
            hda_arg: str = args.get("hda", "")
            hda_arg_path: Path = Path(hda_arg)
            hda = (hda_arg_path, args.get("hdaInterface", ""))
        return LegacyStartCommand(
            netBackendArgs=args.get("netBackendArgs"),
            netFrontendArgs=args.get("netFrontendArgs"),
            hda=hda,
            cdrom=args.get("cdrom"),
            usb=args.get("usb"),
            bios=args.get("bios"),
            qemuBinary=args.get("qemuBinary"),
            qemuFlags=args.get("qemuFlags"),
        )

    def is_up(self) -> bool:
        """True once the VM is booted and the guest root shell is connected."""
        return self.booted and self.connected

    def log(self, msg: str) -> None:
        """Log ``msg`` tagged with this machine's name."""
        rootlog.log(msg, {"machine": self.name})

    def log_serial(self, msg: str) -> None:
        """Log one line of serial console output for this machine."""
        rootlog.log_serial(msg, self.name)

    def nested(
        self, msg: str, attrs: Optional[Dict[str, str]] = None
    ) -> _GeneratorContextManager:
        """Return a nested-log context manager tagged with this machine.

        ``attrs`` are merged over ``{"machine": self.name}``; the caller's
        dict is never modified.
        """
        my_attrs = {"machine": self.name}
        if attrs is not None:
            my_attrs.update(attrs)
        return rootlog.nested(msg, my_attrs)

    def wait_for_monitor_prompt(self) -> str:
        """Read from the QEMU monitor until the ``(qemu) `` prompt appears
        (or the socket is closed) and return everything that was read."""
        assert self.monitor is not None
        answer = ""
        while True:
            undecoded_answer = self.monitor.recv(1024)
            if not undecoded_answer:
                break
            answer += undecoded_answer.decode()
            if answer.endswith("(qemu) "):
                break
        return answer

    def send_monitor_command(self, command: str) -> str:
        """Send ``command`` to the QEMU monitor and return its response
        (up to and including the next prompt)."""
        self.run_callbacks()
        message = f"{command}\n".encode()
        assert self.monitor is not None
        self.monitor.send(message)
        return self.wait_for_monitor_prompt()

    def wait_for_unit(
        self, unit: str, user: Optional[str] = None, timeout: int = 900
    ) -> None:
        """Wait for a systemd unit to get into "active" state.
        Throws exceptions on "failed" and "inactive" states as well as
        after timing out.

        An "inactive" unit only raises when systemd reports no pending jobs
        and the unit is still inactive on a second look.
        """

        def check_active(_: Any) -> bool:
            info = self.get_unit_info(unit, user)
            state = info["ActiveState"]
            if state == "failed":
                raise Exception(f'unit "{unit}" reached state "{state}"')

            if state == "inactive":
                status, jobs = self.systemctl("list-jobs --full 2>&1", user)
                if "No jobs" in jobs:
                    info = self.get_unit_info(unit, user)
                    if info["ActiveState"] == state:
                        raise Exception(
                            f'unit "{unit}" is inactive and there are no pending jobs'
                        )

            return state == "active"

        with self.nested(
            f"waiting for unit {unit}"
            + (f" with user {user}" if user is not None else "")
        ):
            retry(check_active, timeout)

    def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]:
        """Return the ``KEY=VALUE`` properties printed by ``systemctl show``.

        Raises:
            Exception: if ``systemctl show`` exits non-zero.
        """
        status, lines = self.systemctl(f'--no-pager show "{unit}"', user)
        if status != 0:
            raise Exception(
                f'retrieving systemctl info for unit "{unit}"'
                + ("" if user is None else f' under user "{user}"')
                + f" failed with exit code {status}"
            )

        line_pattern = re.compile(r"^([^=]+)=(.*)$")

        info: Dict[str, str] = {}
        for line in lines.split("\n"):
            match = line_pattern.match(line)
            if match is not None:
                # Later duplicates overwrite earlier ones, as with dict().
                info[match[1]] = match[2]
        return info

    def systemctl(self, q: str, user: Optional[str] = None) -> Tuple[int, str]:
        """Run ``systemctl <q>`` in the guest.

        With ``user`` set, runs ``systemctl --user`` as that user via
        ``su -l`` with ``XDG_RUNTIME_DIR`` pointed at ``/run/user/<uid>``.
        Single quotes in ``q`` are escaped for the ``$'...'`` quoting.
        """
        if user is not None:
            q = q.replace("'", "\\'")
            return self.execute(
                f"su -l {user} --shell /bin/sh -c "
                "$'XDG_RUNTIME_DIR=/run/user/`id -u` "
                f"systemctl --user {q}'"
            )
        return self.execute(f"systemctl {q}")

    def require_unit_state(self, unit: str, require_state: str = "active") -> None:
        """Raise unless ``unit`` is currently in ``require_state``."""
        with self.nested(
            f"checking if unit '{unit}' has reached state '{require_state}'"
        ):
            info = self.get_unit_info(unit)
            state = info["ActiveState"]
            if state != require_state:
                raise Exception(
                    f"Expected unit '{unit}' to be in state "
                    f"'{require_state}' but it is in state '{state}'"
                )

    def _next_newline_closed_block_from_shell(self) -> str:
        """Read from the guest shell until a received chunk ends with a
        newline (or the socket is closed) and return the text read."""
        assert self.shell
        output_buffer = []
        while True:
            # This receives up to 4096 bytes from the socket
            chunk = self.shell.recv(4096)
            if not chunk:
                # Probably a broken pipe, return the output we have
                break

            decoded = chunk.decode()
            output_buffer += [decoded]
            if decoded[-1] == "\n":
                break
        return "".join(output_buffer)

    def execute(
        self, command: str, check_return: bool = True, timeout: Optional[int] = 900
    ) -> Tuple[int, str]:
        """Run ``command`` in the guest root shell.

        The command runs under ``bash`` with ``set -euo pipefail``, wrapped
        in coreutils ``timeout`` unless ``timeout`` is None. Its stdout is
        base64-encoded on a single line so it can be framed reliably on the
        shell socket.

        Returns:
            ``(exit_code, output)``. If ``check_return`` is False the exit
            code is not queried and -1 is returned in its place. Undecodable
            output bytes are replaced in both cases.
        """
        self.run_callbacks()
        self.connect()

        # Always run command with shell opts
        command = f"set -euo pipefail; {command}"

        timeout_str = ""
        if timeout is not None:
            timeout_str = f"timeout {timeout}"

        # While sh is bash on NixOS, this is not the case for every distro.
        # We explicitly call bash here to allow for the driver to boot other distros as well.
        out_command = (
            f"{timeout_str} bash -c {shlex.quote(command)} | (base64 --wrap 0; echo)\n"
        )

        assert self.shell
        self.shell.send(out_command.encode())

        # Get the output
        output = base64.b64decode(self._next_newline_closed_block_from_shell())

        if not check_return:
            return (-1, output.decode(errors="replace"))

        # Get the return code of the first pipeline element (the command,
        # not the base64 encoder).
        self.shell.send("echo ${PIPESTATUS[0]}\n".encode())
        rc = int(self._next_newline_closed_block_from_shell().strip())

        return (rc, output.decode(errors="replace"))

    def shell_interact(self, address: Optional[str] = None) -> None:
        """Allows you to interact with the guest shell for debugging purposes.

        @address string passed to socat that will be connected to the guest shell.
        Check the `Running Tests interactively` chapter of NixOS manual for an example.
        """
        self.connect()

        if address is None:
            address = "READLINE,prompt=$ "
            self.log("Terminal is ready (there is no initial prompt):")

        assert self.shell
        try:
            subprocess.run(
                ["socat", address, f"FD:{self.shell.fileno()}"],
                pass_fds=[self.shell.fileno()],
            )
            # allow users to cancel this command without breaking the test
        except KeyboardInterrupt:
            pass

    def console_interact(self) -> None:
        """Allows you to interact with QEMU's stdin

        The shell can be exited with Ctrl+D. Note that Ctrl+C is not allowed to be used.
        QEMU's stdout is read line-wise.

        Should only be used during test development, not in the production test."""
        self.log("Terminal is ready (there is no prompt):")

        assert self.process
        assert self.process.stdin

        while True:
            try:
                char = sys.stdin.buffer.read(1)
            except KeyboardInterrupt:
                break
            if char == b"":  # ctrl+d
                self.log("Closing connection to the console")
                break
            self.send_console(char.decode())

    def succeed(self, *commands: str, timeout: Optional[int] = None) -> str:
        """Execute each command and check that it succeeds.

        Returns the concatenated output of all commands; raises on the first
        command that exits non-zero (after logging its output).
        """
        output = ""
        for command in commands:
            with self.nested(f"must succeed: {command}"):
                (status, out) = self.execute(command, timeout=timeout)
                if status != 0:
                    self.log(f"output: {out}")
                    raise Exception(f"command `{command}` failed (exit code {status})")
                output += out
        return output

    def fail(self, *commands: str, timeout: Optional[int] = None) -> str:
        """Execute each command and check that it fails.

        Returns the concatenated output of all commands; raises on the first
        command that exits with status 0.
        """
        output = ""
        for command in commands:
            with self.nested(f"must fail: {command}"):
                (status, out) = self.execute(command, timeout=timeout)
                if status == 0:
                    raise Exception(f"command `{command}` unexpectedly succeeded")
                output += out
        return output

    def wait_until_succeeds(self, command: str, timeout: int = 900) -> str:
        """Wait until a command returns success and return its output.
        Throws an exception on timeout.
        """
        output = ""

        def check_success(_: Any) -> bool:
            nonlocal output
            status, output = self.execute(command, timeout=timeout)
            return status == 0

        with self.nested(f"waiting for success: {command}"):
            retry(check_success, timeout)
            return output

    def wait_until_fails(self, command: str, timeout: int = 900) -> str:
        """Wait until a command returns failure and return its output.
        Throws an exception after ``timeout`` seconds (approximately).
        """
        output = ""

        def check_failure(_: Any) -> bool:
            nonlocal output
            status, output = self.execute(command, timeout=timeout)
            return status != 0

        with self.nested(f"waiting for failure: {command}"):
            # Honour the caller's timeout, as wait_until_succeeds does.
            retry(check_failure, timeout)
            return output

    def wait_for_shutdown(self) -> None:
        """Block until the QEMU process exits, then mark the machine as
        neither booted nor connected. No-op if the machine is not booted."""
        if not self.booted:
            return

        with self.nested("waiting for the VM to power off"):
            sys.stdout.flush()
            assert self.process
            self.process.wait()

            self.pid = None
            self.booted = False
            self.connected = False

    def get_tty_text(self, tty: str) -> str:
        """Return the contents of ``/dev/vcs<tty>`` wrapped to the width
        reported by ``stty`` for ``/dev/tty<tty>``."""
        status, output = self.execute(
            f"fold -w$(stty -F /dev/tty{tty} size | "
            f"awk '{{print $2}}') /dev/vcs{tty}"
        )
        return output

    def wait_until_tty_matches(self, tty: str, regexp: str) -> None:
        """Wait until the visible output on the chosen TTY matches regular
        expression. Throws an exception on timeout.
        """
        matcher = re.compile(regexp)

        def tty_matches(last: bool) -> bool:
            text = self.get_tty_text(tty)
            if last:
                self.log(
                    f"Last chance to match /{regexp}/ on TTY{tty}, "
                    f"which currently contains: {text}"
                )
            return len(matcher.findall(text)) > 0

        with self.nested(f"waiting for {regexp} to appear on tty {tty}"):
            retry(tty_matches)

    def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None:
        """Type ``chars`` one key at a time via send_key, waiting ``delay``
        seconds after each key."""
        with self.nested(f"sending keys {repr(chars)}"):
            for char in chars:
                self.send_key(char, delay, log=False)

    def wait_for_file(self, filename: str) -> None:
        """Waits until the file exists in machine's file system."""

        def check_file(_: Any) -> bool:
            status, _ = self.execute(f"test -e {filename}")
            return status == 0

        with self.nested(f"waiting for file '{filename}'"):
            retry(check_file)

    def wait_for_open_port(self, port: int, addr: str = "localhost") -> None:
        """Wait until ``nc -z`` in the guest can connect to ``addr:port``."""

        def port_is_open(_: Any) -> bool:
            status, _ = self.execute(f"nc -z {addr} {port}")
            return status == 0

        with self.nested(f"waiting for TCP port {port} on {addr}"):
            retry(port_is_open)

    def wait_for_closed_port(self, port: int, addr: str = "localhost") -> None:
        """Wait until ``nc -z`` in the guest can no longer connect to
        ``addr:port``."""

        def port_is_closed(_: Any) -> bool:
            status, _ = self.execute(f"nc -z {addr} {port}")
            return status != 0

        with self.nested(f"waiting for TCP port {port} on {addr} to be closed"):
            retry(port_is_closed)

    def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
        """Run ``systemctl start <jobname>`` (optionally as ``user``)."""
        return self.systemctl(f"start {jobname}", user)

    def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
        """Run ``systemctl stop <jobname>`` (optionally as ``user``)."""
        return self.systemctl(f"stop {jobname}", user)

    def wait_for_job(self, jobname: str) -> None:
        """Alias for wait_for_unit(jobname)."""
        self.wait_for_unit(jobname)

    def connect(self) -> None:
        """Start the VM if necessary and wait until the guest root shell
        sends its first data. No-op if already connected."""

        def shell_ready(timeout_secs: int) -> bool:
            """We sent some data from the backdoor service running on the guest
            to indicate that the backdoor shell is ready.
            As soon as we read some data from the socket here, we assume that
            our root shell is operational.
            """
            (ready, _, _) = select.select([self.shell], [], [], timeout_secs)
            return bool(ready)

        if self.connected:
            return

        with self.nested("waiting for the VM to finish booting"):
            self.start()

            assert self.shell

            tic = time.time()
            # TODO: do we want to bail after a set number of attempts?
            while not shell_ready(timeout_secs=30):
                self.log("Guest root shell did not produce any data yet...")

            self.log(self.shell.recv(1024).decode())
            toc = time.time()

            self.log("connected to guest root shell")
            self.log(f"(connecting took {toc - tic:.2f} seconds)")
            self.connected = True

    def screenshot(self, filename: str) -> None:
        """Save a PNG screenshot of the VM display.

        ``.png`` is appended when ``filename`` has no dot; a name without a
        ``/`` is placed in ``out_dir``. The QEMU ``screendump`` output is
        converted with ``pnmtopng``.
        """
        if "." not in filename:
            filename += ".png"
        if "/" not in filename:
            filename = os.path.join(self.out_dir, filename)
        tmp = f"{filename}.ppm"

        with self.nested(
            f"making screenshot {filename}",
            {"image": os.path.basename(filename)},
        ):
            self.send_monitor_command(f"screendump {tmp}")
            ret = subprocess.run(f"pnmtopng '{tmp}' > '{filename}'", shell=True)
            os.unlink(tmp)
            if ret.returncode != 0:
                raise Exception("Cannot convert screenshot")

    def copy_from_host_via_shell(self, source: str, target: str) -> None:
        """Copy a file from the host into the guest by piping it over the
        shell into the destination file. Works without host-guest shared folder.
        Prefer copy_from_host for whenever possible.
        """
        with open(source, "rb") as fh:
            content_b64 = base64.b64encode(fh.read()).decode()
        self.succeed(
            f"mkdir -p $(dirname {target})",
            f"echo -n {content_b64} | base64 -d > {target}",
        )

    def copy_from_host(self, source: str, target: str) -> None:
        """Copy a file from the host into the guest via the `shared_dir` shared
        among all the VMs (using a temporary directory).
        """
        host_src = Path(source)
        vm_target = Path(target)
        with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
            shared_temp = Path(shared_td)
            host_intermediate = shared_temp / host_src.name
            # The shared dir is visible at /tmp/shared inside the guest.
            vm_shared_temp = Path("/tmp/shared") / shared_temp.name
            vm_intermediate = vm_shared_temp / host_src.name

            self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
            if host_src.is_dir():
                shutil.copytree(host_src, host_intermediate)
            else:
                shutil.copy(host_src, host_intermediate)
            self.succeed(make_command(["mkdir", "-p", vm_target.parent]))
            self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target]))

    def copy_from_vm(self, source: str, target_dir: str = "") -> None:
        """Copy a file from the VM (specified by an in-VM source path) to a path
        relative to `$out`. The file is copied via the `shared_dir` shared among
        all the VMs (using a temporary directory).
        """
        # Compute the source, target, and intermediate shared file names
        vm_src = Path(source)
        with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
            shared_temp = Path(shared_td)
            vm_shared_temp = Path("/tmp/shared") / shared_temp.name
            vm_intermediate = vm_shared_temp / vm_src.name
            intermediate = shared_temp / vm_src.name
            # Copy the file to the shared directory inside VM
            self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
            self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate]))
            abs_target = self.out_dir / target_dir / vm_src.name
            abs_target.parent.mkdir(exist_ok=True, parents=True)
            # Copy the file from the shared directory outside VM
            if intermediate.is_dir():
                shutil.copytree(intermediate, abs_target)
            else:
                shutil.copy(intermediate, abs_target)

    def dump_tty_contents(self, tty: str) -> None:
        """Debugging: Dump the contents of the TTY<n>"""
        self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat")

    def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]:
        """Take a screendump into a temporary directory and OCR it once per
        entry in ``model_ids``."""
        with tempfile.TemporaryDirectory() as tmpdir:
            screenshot_path = os.path.join(tmpdir, "ppm")
            self.send_monitor_command(f"screendump {screenshot_path}")
            return _perform_ocr_on_screenshot(screenshot_path, model_ids)

    def get_screen_text_variants(self) -> List[str]:
        """OCR the screen with tesseract engine modes 0, 1 and 2."""
        return self._get_screen_text_variants([0, 1, 2])

    def get_screen_text(self) -> str:
        """OCR the screen with tesseract engine mode 2 only."""
        return self._get_screen_text_variants([2])[0]

    def wait_for_text(self, regex: str) -> None:
        """Wait until ``regex`` is found in any OCR variant of the screen."""

        def screen_matches(last: bool) -> bool:
            variants = self.get_screen_text_variants()
            for text in variants:
                if re.search(regex, text) is not None:
                    return True

            if last:
                self.log(f"Last OCR attempt failed. Text was: {variants}")

            return False

        with self.nested(f"waiting for {regex} to appear on screen"):
            retry(screen_matches)

    def wait_for_console_text(self, regex: str) -> None:
        """Block until ``regex`` matches the serial console output collected
        since this call started. There is no timeout."""
        with self.nested(f"waiting for {regex} to appear on console"):
            # Buffer the console output, this is needed
            # to match multiline regexes.
            console = io.StringIO()
            while True:
                # Queue.get() blocks until the serial reader thread delivers
                # the next line, so no polling/sleeping is needed here.
                # NOTE(review): lines are stored without their newline and are
                # concatenated as-is; existing regexes may rely on that.
                console.write(self.last_lines.get())
                console.seek(0)
                matches = re.search(regex, console.read())
                if matches is not None:
                    return

    def send_key(
        self, key: str, delay: Optional[float] = 0.01, log: Optional[bool] = True
    ) -> None:
        """Send one key via the monitor's ``sendkey``, translating characters
        through CHAR_TO_KEY, then sleep ``delay`` seconds (if not None)."""
        key = CHAR_TO_KEY.get(key, key)
        context = self.nested(f"sending key {repr(key)}") if log else nullcontext()
        with context:
            self.send_monitor_command(f"sendkey {key}")
            if delay is not None:
                time.sleep(delay)

    def send_console(self, chars: str) -> None:
        """Write ``chars`` to QEMU's stdin (the serial console)."""
        assert self.process
        assert self.process.stdin
        self.process.stdin.write(chars.encode())
        self.process.stdin.flush()

    def start(self, allow_reboot: bool = False) -> None:
        """Boot the VM unless it is already booted.

        Creates the monitor and shell Unix sockets, launches QEMU via the
        start command, accepts both connections, starts a thread that logs
        serial output and feeds ``last_lines``, and waits for the first
        monitor prompt.
        """
        if self.booted:
            return

        self.log("starting vm")

        def clear(path: Path) -> Path:
            if path.exists():
                path.unlink()
            return path

        def create_socket(path: Path) -> socket.socket:
            s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
            s.bind(str(path))
            s.listen(1)
            return s

        monitor_socket = create_socket(clear(self.monitor_path))
        shell_socket = create_socket(clear(self.shell_path))
        self.process = self.start_command.run(
            self.state_dir,
            self.shared_dir,
            self.monitor_path,
            self.shell_path,
            allow_reboot,
        )
        self.monitor, _ = monitor_socket.accept()
        self.shell, _ = shell_socket.accept()

        # Store last serial console lines for use
        # of wait_for_console_text
        self.last_lines = Queue()

        def process_serial_output() -> None:
            assert self.process
            assert self.process.stdout
            for _line in self.process.stdout:
                # Ignore undecodable bytes that may occur in boot menus
                line = _line.decode(errors="ignore").replace("\r", "").rstrip()
                self.last_lines.put(line)
                self.log_serial(line)

        self.serial_thread = threading.Thread(target=process_serial_output)
        self.serial_thread.start()

        self.wait_for_monitor_prompt()

        self.pid = self.process.pid
        self.booted = True

        self.log(f"QEMU running (pid {self.pid})")

    def cleanup_statedir(self) -> None:
        """Delete the VM state directory."""
        shutil.rmtree(self.state_dir)
        rootlog.log(f"deleting VM state directory {self.state_dir}")
        rootlog.log("if you want to keep the VM state, pass --keep-vm-state")

    def shutdown(self) -> None:
        """Send ``poweroff`` to the guest shell and wait for QEMU to exit."""
        if not self.booted:
            return

        assert self.shell
        self.shell.send("poweroff\n".encode())
        self.wait_for_shutdown()

    def crash(self) -> None:
        """Stop QEMU immediately via the monitor ``quit`` command."""
        if not self.booted:
            return

        self.log("forced crash")
        self.send_monitor_command("quit")
        self.wait_for_shutdown()

    def reboot(self) -> None:
        """Press Ctrl+Alt+Delete in the guest.

        Prepares the machine to be reconnected which is useful if the
        machine was started with `allow_reboot = True`
        """
        self.send_key("ctrl-alt-delete")
        self.connected = False

    def wait_for_x(self) -> None:
        """Wait until it is possible to connect to the X server. Note that
        testing the existence of /tmp/.X11-unix/X0 is insufficient.
        """

        def check_x(_: Any) -> bool:
            cmd = (
                "journalctl -b SYSLOG_IDENTIFIER=systemd | "
                + 'grep "Reached target Current graphical"'
            )
            status, _ = self.execute(cmd)
            if status != 0:
                return False
            status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
            return status == 0

        with self.nested("waiting for the X11 server"):
            retry(check_x)

    def get_window_names(self) -> List[str]:
        """Return the quoted window names listed by ``xwininfo -root -tree``."""
        return self.succeed(
            r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'"
        ).splitlines()

    def wait_for_window(self, regexp: str) -> None:
        """Wait until some window name matches ``regexp``."""
        pattern = re.compile(regexp)

        def window_is_visible(last_try: bool) -> bool:
            names = self.get_window_names()
            if last_try:
                self.log(
                    f"Last chance to match {regexp} on the window list,"
                    + " which currently contains: "
                    + ", ".join(names)
                )
            return any(pattern.search(name) for name in names)

        with self.nested("waiting for a window to appear"):
            retry(window_is_visible)

    def sleep(self, secs: int) -> None:
        """Sleep ``secs`` seconds by running ``sleep`` in the guest."""
        # We want to sleep in *guest* time, not *host* time.
        self.succeed(f"sleep {secs}")

    def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None:
        """Forward a TCP port on the host to a TCP port on the guest.
        Useful during interactive testing.
        """
        self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}")

    def block(self) -> None:
        """Make the machine unreachable by shutting down eth1 (the multicast
        interface used to talk to the other VMs). We keep eth0 up so that
        the test driver can continue to talk to the machine.
        """
        self.send_monitor_command("set_link virtio-net-pci.1 off")

    def unblock(self) -> None:
        """Make the machine reachable."""
        self.send_monitor_command("set_link virtio-net-pci.1 on")

    def release(self) -> None:
        """Terminate QEMU, close the shell/monitor sockets and join the
        serial reader thread. No-op if the machine was never started."""
        if self.pid is None:
            return
        rootlog.info(f"kill machine (pid {self.pid})")
        assert self.process
        assert self.shell
        assert self.monitor
        assert self.serial_thread

        self.process.terminate()
        self.shell.close()
        self.monitor.close()
        self.serial_thread.join()

    def run_callbacks(self) -> None:
        """Invoke every registered callback, in registration order."""
        for callback in self.callbacks:
            callback()