at 23.05-pre 35 kB view raw
1from contextlib import _GeneratorContextManager 2from pathlib import Path 3from queue import Queue 4from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple 5import base64 6import io 7import os 8import queue 9import re 10import shlex 11import shutil 12import socket 13import subprocess 14import sys 15import tempfile 16import threading 17import time 18 19from test_driver.logger import rootlog 20 21CHAR_TO_KEY = { 22 "A": "shift-a", 23 "N": "shift-n", 24 "-": "0x0C", 25 "_": "shift-0x0C", 26 "B": "shift-b", 27 "O": "shift-o", 28 "=": "0x0D", 29 "+": "shift-0x0D", 30 "C": "shift-c", 31 "P": "shift-p", 32 "[": "0x1A", 33 "{": "shift-0x1A", 34 "D": "shift-d", 35 "Q": "shift-q", 36 "]": "0x1B", 37 "}": "shift-0x1B", 38 "E": "shift-e", 39 "R": "shift-r", 40 ";": "0x27", 41 ":": "shift-0x27", 42 "F": "shift-f", 43 "S": "shift-s", 44 "'": "0x28", 45 '"': "shift-0x28", 46 "G": "shift-g", 47 "T": "shift-t", 48 "`": "0x29", 49 "~": "shift-0x29", 50 "H": "shift-h", 51 "U": "shift-u", 52 "\\": "0x2B", 53 "|": "shift-0x2B", 54 "I": "shift-i", 55 "V": "shift-v", 56 ",": "0x33", 57 "<": "shift-0x33", 58 "J": "shift-j", 59 "W": "shift-w", 60 ".": "0x34", 61 ">": "shift-0x34", 62 "K": "shift-k", 63 "X": "shift-x", 64 "/": "0x35", 65 "?": "shift-0x35", 66 "L": "shift-l", 67 "Y": "shift-y", 68 " ": "spc", 69 "M": "shift-m", 70 "Z": "shift-z", 71 "\n": "ret", 72 "!": "shift-0x02", 73 "@": "shift-0x03", 74 "#": "shift-0x04", 75 "$": "shift-0x05", 76 "%": "shift-0x06", 77 "^": "shift-0x07", 78 "&": "shift-0x08", 79 "*": "shift-0x09", 80 "(": "shift-0x0A", 81 ")": "shift-0x0B", 82} 83 84 85def make_command(args: list) -> str: 86 return " ".join(map(shlex.quote, (map(str, args)))) 87 88 89def _perform_ocr_on_screenshot( 90 screenshot_path: str, model_ids: Iterable[int] 91) -> List[str]: 92 if shutil.which("tesseract") is None: 93 raise Exception("OCR requested but enableOCR is false") 94 95 magick_args = ( 96 "-filter Catrom -density 72 -resample 300 " 97 + "-contrast -normalize -despeckle -type grayscale " 98 + "-sharpen 1 -posterize 3 -negate -gamma 100 " 99 + "-blur 1x65535" 100 ) 101 102 tess_args = f"-c debug_file=/dev/null --psm 11" 103 104 cmd = f"convert {magick_args} {screenshot_path} tiff:{screenshot_path}.tiff" 105 ret = subprocess.run(cmd, shell=True, capture_output=True) 106 if ret.returncode != 0: 107 raise Exception(f"TIFF conversion failed with exit code {ret.returncode}") 108 109 model_results = [] 110 for model_id in model_ids: 111 cmd = f"tesseract {screenshot_path}.tiff - {tess_args} --oem {model_id}" 112 ret = subprocess.run(cmd, shell=True, capture_output=True) 113 if ret.returncode != 0: 114 raise Exception(f"OCR failed with exit code {ret.returncode}") 115 model_results.append(ret.stdout.decode("utf-8")) 116 117 return model_results 118 119 120def retry(fn: Callable, timeout: int = 900) -> None: 121 """Call the given function repeatedly, with 1 second intervals, 122 until it returns True or a timeout is reached. 123 """ 124 125 for _ in range(timeout): 126 if fn(False): 127 return 128 time.sleep(1) 129 130 if not fn(True): 131 raise Exception(f"action timed out after {timeout} seconds") 132 133 134class StartCommand: 135 """The Base Start Command knows how to append the necesary 136 runtime qemu options as determined by a particular test driver 137 run. Any such start command is expected to happily receive and 138 append additional qemu args. 139 """ 140 141 _cmd: str 142 143 def cmd( 144 self, 145 monitor_socket_path: Path, 146 shell_socket_path: Path, 147 allow_reboot: bool = False, # TODO: unused, legacy? 148 ) -> str: 149 display_opts = "" 150 display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"]) 151 if not display_available: 152 display_opts += " -nographic" 153 154 # qemu options 155 qemu_opts = "" 156 qemu_opts += ( 157 "" 158 if allow_reboot 159 else " -no-reboot" 160 " -device virtio-serial" 161 " -device virtconsole,chardev=shell" 162 " -device virtio-rng-pci" 163 " -serial stdio" 164 ) 165 # TODO: qemu script already catpures this env variable, legacy? 166 qemu_opts += " " + os.environ.get("QEMU_OPTS", "") 167 168 return ( 169 f"{self._cmd}" 170 f" -monitor unix:{monitor_socket_path}" 171 f" -chardev socket,id=shell,path={shell_socket_path}" 172 f"{qemu_opts}" 173 f"{display_opts}" 174 ) 175 176 @staticmethod 177 def build_environment( 178 state_dir: Path, 179 shared_dir: Path, 180 ) -> dict: 181 # We make a copy to not update the current environment 182 env = dict(os.environ) 183 env.update( 184 { 185 "TMPDIR": str(state_dir), 186 "SHARED_DIR": str(shared_dir), 187 "USE_TMPDIR": "1", 188 } 189 ) 190 return env 191 192 def run( 193 self, 194 state_dir: Path, 195 shared_dir: Path, 196 monitor_socket_path: Path, 197 shell_socket_path: Path, 198 ) -> subprocess.Popen: 199 return subprocess.Popen( 200 self.cmd(monitor_socket_path, shell_socket_path), 201 stdin=subprocess.PIPE, 202 stdout=subprocess.PIPE, 203 stderr=subprocess.STDOUT, 204 shell=True, 205 cwd=state_dir, 206 env=self.build_environment(state_dir, shared_dir), 207 ) 208 209 210class NixStartScript(StartCommand): 211 """A start script from nixos/modules/virtualiation/qemu-vm.nix 212 that also satisfies the requirement of the BaseStartCommand. 213 These Nix commands have the particular charactersitic that the 214 machine name can be extracted out of them via a regex match. 215 (Admittedly a _very_ implicit contract, evtl. TODO fix) 216 """ 217 218 def __init__(self, script: str): 219 self._cmd = script 220 221 @property 222 def machine_name(self) -> str: 223 match = re.search("run-(.+)-vm$", self._cmd) 224 name = "machine" 225 if match: 226 name = match.group(1) 227 return name 228 229 230class LegacyStartCommand(StartCommand): 231 """Used in some places to create an ad-hoc machine instead of 232 using nix test instrumentation + module system for that purpose. 233 Legacy. 234 """ 235 236 def __init__( 237 self, 238 netBackendArgs: Optional[str] = None, 239 netFrontendArgs: Optional[str] = None, 240 hda: Optional[Tuple[Path, str]] = None, 241 cdrom: Optional[str] = None, 242 usb: Optional[str] = None, 243 bios: Optional[str] = None, 244 qemuBinary: Optional[str] = None, 245 qemuFlags: Optional[str] = None, 246 ): 247 if qemuBinary is not None: 248 self._cmd = qemuBinary 249 else: 250 self._cmd = "qemu-kvm" 251 252 self._cmd += " -m 384" 253 254 # networking 255 net_backend = "-netdev user,id=net0" 256 net_frontend = "-device virtio-net-pci,netdev=net0" 257 if netBackendArgs is not None: 258 net_backend += "," + netBackendArgs 259 if netFrontendArgs is not None: 260 net_frontend += "," + netFrontendArgs 261 self._cmd += f" {net_backend} {net_frontend}" 262 263 # hda 264 hda_cmd = "" 265 if hda is not None: 266 hda_path = hda[0].resolve() 267 hda_interface = hda[1] 268 if hda_interface == "scsi": 269 hda_cmd += ( 270 f" -drive id=hda,file={hda_path},werror=report,if=none" 271 " -device scsi-hd,drive=hda" 272 ) 273 else: 274 hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report" 275 self._cmd += hda_cmd 276 277 # cdrom 278 if cdrom is not None: 279 self._cmd += f" -cdrom {cdrom}" 280 281 # usb 282 usb_cmd = "" 283 if usb is not None: 284 # https://github.com/qemu/qemu/blob/master/docs/usb2.txt 285 usb_cmd += ( 286 " -device usb-ehci" 287 f" -drive id=usbdisk,file={usb},if=none,readonly" 288 " -device usb-storage,drive=usbdisk " 289 ) 290 self._cmd += usb_cmd 291 292 # bios 293 if bios is not None: 294 self._cmd += f" -bios {bios}" 295 296 # qemu flags 297 if qemuFlags is not None: 298 self._cmd += f" {qemuFlags}" 299 300 301class Machine: 302 """A handle to the machine with this name, that also knows how to manage 303 the machine lifecycle with the help of a start script / command.""" 304 305 name: str 306 out_dir: Path 307 tmp_dir: Path 308 shared_dir: Path 309 state_dir: Path 310 monitor_path: Path 311 shell_path: Path 312 313 start_command: StartCommand 314 keep_vm_state: bool 315 allow_reboot: bool 316 317 process: Optional[subprocess.Popen] 318 pid: Optional[int] 319 monitor: Optional[socket.socket] 320 shell: Optional[socket.socket] 321 serial_thread: Optional[threading.Thread] 322 323 booted: bool 324 connected: bool 325 # Store last serial console lines for use 326 # of wait_for_console_text 327 last_lines: Queue = Queue() 328 callbacks: List[Callable] 329 330 def __repr__(self) -> str: 331 return f"<Machine '{self.name}'>" 332 333 def __init__( 334 self, 335 out_dir: Path, 336 tmp_dir: Path, 337 start_command: StartCommand, 338 name: str = "machine", 339 keep_vm_state: bool = False, 340 allow_reboot: bool = False, 341 callbacks: Optional[List[Callable]] = None, 342 ) -> None: 343 self.out_dir = out_dir 344 self.tmp_dir = tmp_dir 345 self.keep_vm_state = keep_vm_state 346 self.allow_reboot = allow_reboot 347 self.name = name 348 self.start_command = start_command 349 self.callbacks = callbacks if callbacks is not None else [] 350 351 # set up directories 352 self.shared_dir = self.tmp_dir / "shared-xchg" 353 self.shared_dir.mkdir(mode=0o700, exist_ok=True) 354 355 self.state_dir = self.tmp_dir / f"vm-state-{self.name}" 356 self.monitor_path = self.state_dir / "monitor" 357 self.shell_path = self.state_dir / "shell" 358 if (not self.keep_vm_state) and self.state_dir.exists(): 359 self.cleanup_statedir() 360 self.state_dir.mkdir(mode=0o700, exist_ok=True) 361 362 self.process = None 363 self.pid = None 364 self.monitor = None 365 self.shell = None 366 self.serial_thread = None 367 368 self.booted = False 369 self.connected = False 370 371 @staticmethod 372 def create_startcommand(args: Dict[str, str]) -> StartCommand: 373 rootlog.warning( 374 "Using legacy create_startcommand()," 375 "please use proper nix test vm instrumentation, instead" 376 "to generate the appropriate nixos test vm qemu startup script" 377 ) 378 hda = None 379 if args.get("hda"): 380 hda_arg: str = args.get("hda", "") 381 hda_arg_path: Path = Path(hda_arg) 382 hda = (hda_arg_path, args.get("hdaInterface", "")) 383 return LegacyStartCommand( 384 netBackendArgs=args.get("netBackendArgs"), 385 netFrontendArgs=args.get("netFrontendArgs"), 386 hda=hda, 387 cdrom=args.get("cdrom"), 388 usb=args.get("usb"), 389 bios=args.get("bios"), 390 qemuBinary=args.get("qemuBinary"), 391 qemuFlags=args.get("qemuFlags"), 392 ) 393 394 def is_up(self) -> bool: 395 return self.booted and self.connected 396 397 def log(self, msg: str) -> None: 398 rootlog.log(msg, {"machine": self.name}) 399 400 def log_serial(self, msg: str) -> None: 401 rootlog.log_serial(msg, self.name) 402 403 def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager: 404 my_attrs = {"machine": self.name} 405 my_attrs.update(attrs) 406 return rootlog.nested(msg, my_attrs) 407 408 def wait_for_monitor_prompt(self) -> str: 409 with self.nested("waiting for monitor prompt"): 410 assert self.monitor is not None 411 answer = "" 412 while True: 413 undecoded_answer = self.monitor.recv(1024) 414 if not undecoded_answer: 415 break 416 answer += undecoded_answer.decode() 417 if answer.endswith("(qemu) "): 418 break 419 return answer 420 421 def send_monitor_command(self, command: str) -> str: 422 self.run_callbacks() 423 with self.nested("sending monitor command: {}".format(command)): 424 message = ("{}\n".format(command)).encode() 425 assert self.monitor is not None 426 self.monitor.send(message) 427 return self.wait_for_monitor_prompt() 428 429 def wait_for_unit( 430 self, unit: str, user: Optional[str] = None, timeout: int = 900 431 ) -> None: 432 """Wait for a systemd unit to get into "active" state. 433 Throws exceptions on "failed" and "inactive" states as well as 434 after timing out. 435 """ 436 437 def check_active(_: Any) -> bool: 438 info = self.get_unit_info(unit, user) 439 state = info["ActiveState"] 440 if state == "failed": 441 raise Exception('unit "{}" reached state "{}"'.format(unit, state)) 442 443 if state == "inactive": 444 status, jobs = self.systemctl("list-jobs --full 2>&1", user) 445 if "No jobs" in jobs: 446 info = self.get_unit_info(unit, user) 447 if info["ActiveState"] == state: 448 raise Exception( 449 ( 450 'unit "{}" is inactive and there ' "are no pending jobs" 451 ).format(unit) 452 ) 453 454 return state == "active" 455 456 with self.nested( 457 "waiting for unit {}{}".format( 458 unit, f" with user {user}" if user is not None else "" 459 ) 460 ): 461 retry(check_active, timeout) 462 463 def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]: 464 status, lines = self.systemctl('--no-pager show "{}"'.format(unit), user) 465 if status != 0: 466 raise Exception( 467 'retrieving systemctl info for unit "{}" {} failed with exit code {}'.format( 468 unit, "" if user is None else 'under user "{}"'.format(user), status 469 ) 470 ) 471 472 line_pattern = re.compile(r"^([^=]+)=(.*)$") 473 474 def tuple_from_line(line: str) -> Tuple[str, str]: 475 match = line_pattern.match(line) 476 assert match is not None 477 return match[1], match[2] 478 479 return dict( 480 tuple_from_line(line) 481 for line in lines.split("\n") 482 if line_pattern.match(line) 483 ) 484 485 def systemctl(self, q: str, user: Optional[str] = None) -> Tuple[int, str]: 486 if user is not None: 487 q = q.replace("'", "\\'") 488 return self.execute( 489 ( 490 "su -l {} --shell /bin/sh -c " 491 "$'XDG_RUNTIME_DIR=/run/user/`id -u` " 492 "systemctl --user {}'" 493 ).format(user, q) 494 ) 495 return self.execute("systemctl {}".format(q)) 496 497 def require_unit_state(self, unit: str, require_state: str = "active") -> None: 498 with self.nested( 499 "checking if unit ‘{}’ has reached state '{}'".format(unit, require_state) 500 ): 501 info = self.get_unit_info(unit) 502 state = info["ActiveState"] 503 if state != require_state: 504 raise Exception( 505 "Expected unit ‘{}’ to to be in state ".format(unit) 506 + "'{}' but it is in state ‘{}".format(require_state, state) 507 ) 508 509 def _next_newline_closed_block_from_shell(self) -> str: 510 assert self.shell 511 output_buffer = [] 512 while True: 513 # This receives up to 4096 bytes from the socket 514 chunk = self.shell.recv(4096) 515 if not chunk: 516 # Probably a broken pipe, return the output we have 517 break 518 519 decoded = chunk.decode() 520 output_buffer += [decoded] 521 if decoded[-1] == "\n": 522 break 523 return "".join(output_buffer) 524 525 def execute( 526 self, command: str, check_return: bool = True, timeout: Optional[int] = 900 527 ) -> Tuple[int, str]: 528 self.run_callbacks() 529 self.connect() 530 531 # Always run command with shell opts 532 command = f"set -euo pipefail; {command}" 533 534 timeout_str = "" 535 if timeout is not None: 536 timeout_str = f"timeout {timeout}" 537 538 out_command = ( 539 f"{timeout_str} sh -c {shlex.quote(command)} | (base64 --wrap 0; echo)\n" 540 ) 541 542 assert self.shell 543 self.shell.send(out_command.encode()) 544 545 # Get the output 546 output = base64.b64decode(self._next_newline_closed_block_from_shell()) 547 548 if not check_return: 549 return (-1, output.decode()) 550 551 # Get the return code 552 self.shell.send("echo ${PIPESTATUS[0]}\n".encode()) 553 rc = int(self._next_newline_closed_block_from_shell().strip()) 554 555 return (rc, output.decode()) 556 557 def shell_interact(self) -> None: 558 """Allows you to interact with the guest shell 559 560 Should only be used during test development, not in the production test.""" 561 self.connect() 562 self.log("Terminal is ready (there is no initial prompt):") 563 564 assert self.shell 565 subprocess.run( 566 ["socat", "READLINE,prompt=$ ", f"FD:{self.shell.fileno()}"], 567 pass_fds=[self.shell.fileno()], 568 ) 569 570 def console_interact(self) -> None: 571 """Allows you to interact with QEMU's stdin 572 573 The shell can be exited with Ctrl+D. Note that Ctrl+C is not allowed to be used. 574 QEMU's stdout is read line-wise. 575 576 Should only be used during test development, not in the production test.""" 577 self.log("Terminal is ready (there is no prompt):") 578 579 assert self.process 580 assert self.process.stdin 581 582 while True: 583 try: 584 char = sys.stdin.buffer.read(1) 585 except KeyboardInterrupt: 586 break 587 if char == b"": # ctrl+d 588 self.log("Closing connection to the console") 589 break 590 self.send_console(char.decode()) 591 592 def succeed(self, *commands: str, timeout: Optional[int] = None) -> str: 593 """Execute each command and check that it succeeds.""" 594 output = "" 595 for command in commands: 596 with self.nested("must succeed: {}".format(command)): 597 (status, out) = self.execute(command, timeout=timeout) 598 if status != 0: 599 self.log("output: {}".format(out)) 600 raise Exception( 601 "command `{}` failed (exit code {})".format(command, status) 602 ) 603 output += out 604 return output 605 606 def fail(self, *commands: str, timeout: Optional[int] = None) -> str: 607 """Execute each command and check that it fails.""" 608 output = "" 609 for command in commands: 610 with self.nested("must fail: {}".format(command)): 611 (status, out) = self.execute(command, timeout=timeout) 612 if status == 0: 613 raise Exception( 614 "command `{}` unexpectedly succeeded".format(command) 615 ) 616 output += out 617 return output 618 619 def wait_until_succeeds(self, command: str, timeout: int = 900) -> str: 620 """Wait until a command returns success and return its output. 621 Throws an exception on timeout. 622 """ 623 output = "" 624 625 def check_success(_: Any) -> bool: 626 nonlocal output 627 status, output = self.execute(command, timeout=timeout) 628 return status == 0 629 630 with self.nested("waiting for success: {}".format(command)): 631 retry(check_success, timeout) 632 return output 633 634 def wait_until_fails(self, command: str, timeout: int = 900) -> str: 635 """Wait until a command returns failure. 636 Throws an exception on timeout. 637 """ 638 output = "" 639 640 def check_failure(_: Any) -> bool: 641 nonlocal output 642 status, output = self.execute(command, timeout=timeout) 643 return status != 0 644 645 with self.nested("waiting for failure: {}".format(command)): 646 retry(check_failure) 647 return output 648 649 def wait_for_shutdown(self) -> None: 650 if not self.booted: 651 return 652 653 with self.nested("waiting for the VM to power off"): 654 sys.stdout.flush() 655 assert self.process 656 self.process.wait() 657 658 self.pid = None 659 self.booted = False 660 self.connected = False 661 662 def get_tty_text(self, tty: str) -> str: 663 status, output = self.execute( 664 "fold -w$(stty -F /dev/tty{0} size | " 665 "awk '{{print $2}}') /dev/vcs{0}".format(tty) 666 ) 667 return output 668 669 def wait_until_tty_matches(self, tty: str, regexp: str) -> None: 670 """Wait until the visible output on the chosen TTY matches regular 671 expression. Throws an exception on timeout. 672 """ 673 matcher = re.compile(regexp) 674 675 def tty_matches(last: bool) -> bool: 676 text = self.get_tty_text(tty) 677 if last: 678 self.log( 679 f"Last chance to match /{regexp}/ on TTY{tty}, " 680 f"which currently contains: {text}" 681 ) 682 return len(matcher.findall(text)) > 0 683 684 with self.nested("waiting for {} to appear on tty {}".format(regexp, tty)): 685 retry(tty_matches) 686 687 def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None: 688 with self.nested("sending keys ‘{}".format(chars)): 689 for char in chars: 690 self.send_key(char, delay) 691 692 def wait_for_file(self, filename: str) -> None: 693 """Waits until the file exists in machine's file system.""" 694 695 def check_file(_: Any) -> bool: 696 status, _ = self.execute("test -e {}".format(filename)) 697 return status == 0 698 699 with self.nested("waiting for file ‘{}".format(filename)): 700 retry(check_file) 701 702 def wait_for_open_port(self, port: int) -> None: 703 def port_is_open(_: Any) -> bool: 704 status, _ = self.execute("nc -z localhost {}".format(port)) 705 return status == 0 706 707 with self.nested("waiting for TCP port {}".format(port)): 708 retry(port_is_open) 709 710 def wait_for_closed_port(self, port: int) -> None: 711 def port_is_closed(_: Any) -> bool: 712 status, _ = self.execute("nc -z localhost {}".format(port)) 713 return status != 0 714 715 with self.nested("waiting for TCP port {} to be closed".format(port)): 716 retry(port_is_closed) 717 718 def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]: 719 return self.systemctl("start {}".format(jobname), user) 720 721 def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]: 722 return self.systemctl("stop {}".format(jobname), user) 723 724 def wait_for_job(self, jobname: str) -> None: 725 self.wait_for_unit(jobname) 726 727 def connect(self) -> None: 728 if self.connected: 729 return 730 731 with self.nested("waiting for the VM to finish booting"): 732 self.start() 733 734 assert self.shell 735 736 tic = time.time() 737 self.shell.recv(1024) 738 # TODO: Timeout 739 toc = time.time() 740 741 self.log("connected to guest root shell") 742 self.log("(connecting took {:.2f} seconds)".format(toc - tic)) 743 self.connected = True 744 745 def screenshot(self, filename: str) -> None: 746 word_pattern = re.compile(r"^\w+$") 747 if word_pattern.match(filename): 748 filename = os.path.join(self.out_dir, "{}.png".format(filename)) 749 tmp = "{}.ppm".format(filename) 750 751 with self.nested( 752 "making screenshot {}".format(filename), 753 {"image": os.path.basename(filename)}, 754 ): 755 self.send_monitor_command("screendump {}".format(tmp)) 756 ret = subprocess.run("pnmtopng {} > {}".format(tmp, filename), shell=True) 757 os.unlink(tmp) 758 if ret.returncode != 0: 759 raise Exception("Cannot convert screenshot") 760 761 def copy_from_host_via_shell(self, source: str, target: str) -> None: 762 """Copy a file from the host into the guest by piping it over the 763 shell into the destination file. Works without host-guest shared folder. 764 Prefer copy_from_host for whenever possible. 765 """ 766 with open(source, "rb") as fh: 767 content_b64 = base64.b64encode(fh.read()).decode() 768 self.succeed( 769 f"mkdir -p $(dirname {target})", 770 f"echo -n {content_b64} | base64 -d > {target}", 771 ) 772 773 def copy_from_host(self, source: str, target: str) -> None: 774 """Copy a file from the host into the guest via the `shared_dir` shared 775 among all the VMs (using a temporary directory). 776 """ 777 host_src = Path(source) 778 vm_target = Path(target) 779 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: 780 shared_temp = Path(shared_td) 781 host_intermediate = shared_temp / host_src.name 782 vm_shared_temp = Path("/tmp/shared") / shared_temp.name 783 vm_intermediate = vm_shared_temp / host_src.name 784 785 self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) 786 if host_src.is_dir(): 787 shutil.copytree(host_src, host_intermediate) 788 else: 789 shutil.copy(host_src, host_intermediate) 790 self.succeed(make_command(["mkdir", "-p", vm_target.parent])) 791 self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target])) 792 793 def copy_from_vm(self, source: str, target_dir: str = "") -> None: 794 """Copy a file from the VM (specified by an in-VM source path) to a path 795 relative to `$out`. The file is copied via the `shared_dir` shared among 796 all the VMs (using a temporary directory). 797 """ 798 # Compute the source, target, and intermediate shared file names 799 vm_src = Path(source) 800 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: 801 shared_temp = Path(shared_td) 802 vm_shared_temp = Path("/tmp/shared") / shared_temp.name 803 vm_intermediate = vm_shared_temp / vm_src.name 804 intermediate = shared_temp / vm_src.name 805 # Copy the file to the shared directory inside VM 806 self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) 807 self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate])) 808 abs_target = self.out_dir / target_dir / vm_src.name 809 abs_target.parent.mkdir(exist_ok=True, parents=True) 810 # Copy the file from the shared directory outside VM 811 if intermediate.is_dir(): 812 shutil.copytree(intermediate, abs_target) 813 else: 814 shutil.copy(intermediate, abs_target) 815 816 def dump_tty_contents(self, tty: str) -> None: 817 """Debugging: Dump the contents of the TTY<n>""" 818 self.execute("fold -w 80 /dev/vcs{} | systemd-cat".format(tty)) 819 820 def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]: 821 with tempfile.TemporaryDirectory() as tmpdir: 822 screenshot_path = os.path.join(tmpdir, "ppm") 823 self.send_monitor_command(f"screendump {screenshot_path}") 824 return _perform_ocr_on_screenshot(screenshot_path, model_ids) 825 826 def get_screen_text_variants(self) -> List[str]: 827 return self._get_screen_text_variants([0, 1, 2]) 828 829 def get_screen_text(self) -> str: 830 return self._get_screen_text_variants([2])[0] 831 832 def wait_for_text(self, regex: str) -> None: 833 def screen_matches(last: bool) -> bool: 834 variants = self.get_screen_text_variants() 835 for text in variants: 836 if re.search(regex, text) is not None: 837 return True 838 839 if last: 840 self.log("Last OCR attempt failed. Text was: {}".format(variants)) 841 842 return False 843 844 with self.nested("waiting for {} to appear on screen".format(regex)): 845 retry(screen_matches) 846 847 def wait_for_console_text(self, regex: str) -> None: 848 with self.nested("waiting for {} to appear on console".format(regex)): 849 # Buffer the console output, this is needed 850 # to match multiline regexes. 851 console = io.StringIO() 852 while True: 853 try: 854 console.write(self.last_lines.get()) 855 except queue.Empty: 856 self.sleep(1) 857 continue 858 console.seek(0) 859 matches = re.search(regex, console.read()) 860 if matches is not None: 861 return 862 863 def send_key(self, key: str, delay: Optional[float] = 0.01) -> None: 864 key = CHAR_TO_KEY.get(key, key) 865 self.send_monitor_command("sendkey {}".format(key)) 866 if delay is not None: 867 time.sleep(delay) 868 869 def send_console(self, chars: str) -> None: 870 assert self.process 871 assert self.process.stdin 872 self.process.stdin.write(chars.encode()) 873 self.process.stdin.flush() 874 875 def start(self) -> None: 876 if self.booted: 877 return 878 879 self.log("starting vm") 880 881 def clear(path: Path) -> Path: 882 if path.exists(): 883 path.unlink() 884 return path 885 886 def create_socket(path: Path) -> socket.socket: 887 s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) 888 s.bind(str(path)) 889 s.listen(1) 890 return s 891 892 monitor_socket = create_socket(clear(self.monitor_path)) 893 shell_socket = create_socket(clear(self.shell_path)) 894 self.process = self.start_command.run( 895 self.state_dir, 896 self.shared_dir, 897 self.monitor_path, 898 self.shell_path, 899 ) 900 self.monitor, _ = monitor_socket.accept() 901 self.shell, _ = shell_socket.accept() 902 903 # Store last serial console lines for use 904 # of wait_for_console_text 905 self.last_lines: Queue = Queue() 906 907 def process_serial_output() -> None: 908 assert self.process 909 assert self.process.stdout 910 for _line in self.process.stdout: 911 # Ignore undecodable bytes that may occur in boot menus 912 line = _line.decode(errors="ignore").replace("\r", "").rstrip() 913 self.last_lines.put(line) 914 self.log_serial(line) 915 916 self.serial_thread = threading.Thread(target=process_serial_output) 917 self.serial_thread.start() 918 919 self.wait_for_monitor_prompt() 920 921 self.pid = self.process.pid 922 self.booted = True 923 924 self.log("QEMU running (pid {})".format(self.pid)) 925 926 def cleanup_statedir(self) -> None: 927 shutil.rmtree(self.state_dir) 928 rootlog.log(f"deleting VM state directory {self.state_dir}") 929 rootlog.log("if you want to keep the VM state, pass --keep-vm-state") 930 931 def shutdown(self) -> None: 932 if not self.booted: 933 return 934 935 assert self.shell 936 self.shell.send("poweroff\n".encode()) 937 self.wait_for_shutdown() 938 939 def crash(self) -> None: 940 if not self.booted: 941 return 942 943 self.log("forced crash") 944 self.send_monitor_command("quit") 945 self.wait_for_shutdown() 946 947 def wait_for_x(self) -> None: 948 """Wait until it is possible to connect to the X server. Note that 949 testing the existence of /tmp/.X11-unix/X0 is insufficient. 950 """ 951 952 def check_x(_: Any) -> bool: 953 cmd = ( 954 "journalctl -b SYSLOG_IDENTIFIER=systemd | " 955 + 'grep "Reached target Current graphical"' 956 ) 957 status, _ = self.execute(cmd) 958 if status != 0: 959 return False 960 status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]") 961 return status == 0 962 963 with self.nested("waiting for the X11 server"): 964 retry(check_x) 965 966 def get_window_names(self) -> List[str]: 967 return self.succeed( 968 r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'" 969 ).splitlines() 970 971 def wait_for_window(self, regexp: str) -> None: 972 pattern = re.compile(regexp) 973 974 def window_is_visible(last_try: bool) -> bool: 975 names = self.get_window_names() 976 if last_try: 977 self.log( 978 "Last chance to match {} on the window list,".format(regexp) 979 + " which currently contains: " 980 + ", ".join(names) 981 ) 982 return any(pattern.search(name) for name in names) 983 984 with self.nested("waiting for a window to appear"): 985 retry(window_is_visible) 986 987 def sleep(self, secs: int) -> None: 988 # We want to sleep in *guest* time, not *host* time. 989 self.succeed(f"sleep {secs}") 990 991 def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None: 992 """Forward a TCP port on the host to a TCP port on the guest. 993 Useful during interactive testing. 994 """ 995 self.send_monitor_command( 996 "hostfwd_add tcp::{}-:{}".format(host_port, guest_port) 997 ) 998 999 def block(self) -> None: 1000 """Make the machine unreachable by shutting down eth1 (the multicast 1001 interface used to talk to the other VMs). We keep eth0 up so that 1002 the test driver can continue to talk to the machine. 1003 """ 1004 self.send_monitor_command("set_link virtio-net-pci.1 off") 1005 1006 def unblock(self) -> None: 1007 """Make the machine reachable.""" 1008 self.send_monitor_command("set_link virtio-net-pci.1 on") 1009 1010 def release(self) -> None: 1011 if self.pid is None: 1012 return 1013 rootlog.info(f"kill machine (pid {self.pid})") 1014 assert self.process 1015 assert self.shell 1016 assert self.monitor 1017 assert self.serial_thread 1018 1019 self.process.terminate() 1020 self.shell.close() 1021 self.monitor.close() 1022 self.serial_thread.join() 1023 1024 def run_callbacks(self) -> None: 1025 for callback in self.callbacks: 1026 callback()