1from contextlib import _GeneratorContextManager
2from pathlib import Path
3from queue import Queue
4from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
5import base64
6import io
7import os
8import queue
9import re
10import shlex
11import shutil
12import socket
13import subprocess
14import sys
15import tempfile
16import threading
17import time
18
19from test_driver.logger import rootlog
20
# Maps a single character to the key name that `Machine.send_key` passes to
# the QEMU monitor `sendkey` command. Characters that are not listed here are
# sent unchanged.
CHAR_TO_KEY = {
    # Upper-case letters: shift plus the lower-case letter key.
    **{
        letter: "shift-" + letter.lower()
        for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    },
    # Punctuation keys, as (plain, shifted) pairs sharing one scancode.
    "-": "0x0C",
    "_": "shift-0x0C",
    "=": "0x0D",
    "+": "shift-0x0D",
    "[": "0x1A",
    "{": "shift-0x1A",
    "]": "0x1B",
    "}": "shift-0x1B",
    ";": "0x27",
    ":": "shift-0x27",
    "'": "0x28",
    '"': "shift-0x28",
    "`": "0x29",
    "~": "shift-0x29",
    "\\": "0x2B",
    "|": "shift-0x2B",
    ",": "0x33",
    "<": "shift-0x33",
    ".": "0x34",
    ">": "shift-0x34",
    "/": "0x35",
    "?": "shift-0x35",
    # Shifted symbols of the number row.
    "!": "shift-0x02",
    "@": "shift-0x03",
    "#": "shift-0x04",
    "$": "shift-0x05",
    "%": "shift-0x06",
    "^": "shift-0x07",
    "&": "shift-0x08",
    "*": "shift-0x09",
    "(": "shift-0x0A",
    ")": "shift-0x0B",
    # Whitespace.
    " ": "spc",
    "\n": "ret",
}
83
84
def make_command(args: list) -> str:
    """Render *args* as a single shell command line.

    Each element is converted with ``str()`` and quoted with
    ``shlex.quote``; the quoted words are joined by single spaces.
    """
    quoted_words = [shlex.quote(str(word)) for word in args]
    return " ".join(quoted_words)
87
88
def _perform_ocr_on_screenshot(
    screenshot_path: str, model_ids: Iterable[int]
) -> List[str]:
    """Run OCR on a screenshot once per requested tesseract engine mode.

    The screenshot is first converted with ``convert`` into
    ``<screenshot_path>.tiff``; ``tesseract`` is then run on that file once
    for every entry of ``model_ids`` (passed as ``--oem``).

    Returns the decoded stdout of each tesseract run, in the order of
    ``model_ids``.

    Raises ``Exception`` if ``tesseract`` is not on ``PATH`` or if either
    tool exits with a non-zero status.
    """
    if shutil.which("tesseract") is None:
        raise Exception("OCR requested but enableOCR is false")

    tiff_path = f"{screenshot_path}.tiff"

    magick_args = [
        "-filter", "Catrom", "-density", "72", "-resample", "300",
        "-contrast", "-normalize", "-despeckle", "-type", "grayscale",
        "-sharpen", "1", "-posterize", "3", "-negate", "-gamma", "100",
        "-blur", "1x65535",
    ]
    tess_args = ["-c", "debug_file=/dev/null", "--psm", "11"]

    # Argument lists (no shell) so that paths containing spaces or shell
    # metacharacters are passed through verbatim.
    ret = subprocess.run(
        ["convert", *magick_args, screenshot_path, f"tiff:{tiff_path}"],
        capture_output=True,
    )
    if ret.returncode != 0:
        raise Exception(f"TIFF conversion failed with exit code {ret.returncode}")

    model_results = []
    for model_id in model_ids:
        ret = subprocess.run(
            ["tesseract", tiff_path, "-", *tess_args, "--oem", str(model_id)],
            capture_output=True,
        )
        if ret.returncode != 0:
            raise Exception(f"OCR failed with exit code {ret.returncode}")
        model_results.append(ret.stdout.decode("utf-8"))

    return model_results
118
119
def retry(fn: Callable, timeout: int = 900) -> None:
    """Poll ``fn`` once per second until it succeeds or time runs out.

    ``fn`` is called with ``False`` up to ``timeout`` times, sleeping one
    second after every unsuccessful call. If none of those calls returned a
    truthy value, ``fn`` is called one final time with ``True`` (so it can
    log diagnostics); if that also fails an ``Exception`` is raised.
    """
    for _attempt in range(timeout):
        succeeded = fn(False)
        if succeeded:
            return
        time.sleep(1)

    # Last chance: tell the callback that this is the final attempt.
    if fn(True):
        return
    raise Exception(f"action timed out after {timeout} seconds")
132
133
class StartCommand:
    """The base start command knows how to append the necessary
    runtime qemu options as determined by a particular test driver
    run. Any such start command is expected to happily receive and
    append additional qemu args.

    Subclasses are expected to set ``_cmd`` to the command prefix.
    """

    _cmd: str

    def cmd(
        self,
        monitor_socket_path: Path,
        shell_socket_path: Path,
        allow_reboot: bool = False,  # TODO: unused, legacy?
    ) -> str:
        """Build the full shell command line used to launch QEMU.

        ``monitor_socket_path`` and ``shell_socket_path`` are the unix
        sockets QEMU connects to for the monitor and for the ``shell``
        chardev. ``-no-reboot`` is added unless ``allow_reboot`` is true.
        The contents of the ``QEMU_OPTS`` environment variable are appended,
        and ``-nographic`` is added when neither ``DISPLAY`` nor
        ``WAYLAND_DISPLAY`` is set.
        """
        display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"])
        display_opts = "" if display_available else " -nographic"

        # qemu options
        # Only -no-reboot depends on allow_reboot. The device options must
        # always be present: the virtconsole device is what consumes the
        # `shell` chardev declared in the returned command line.
        qemu_opts = "" if allow_reboot else " -no-reboot"
        qemu_opts += (
            " -device virtio-serial"
            " -device virtconsole,chardev=shell"
            " -device virtio-rng-pci"
            " -serial stdio"
        )
        # TODO: qemu script already captures this env variable, legacy?
        qemu_opts += " " + os.environ.get("QEMU_OPTS", "")

        return (
            f"{self._cmd}"
            f" -monitor unix:{monitor_socket_path}"
            f" -chardev socket,id=shell,path={shell_socket_path}"
            f"{qemu_opts}"
            f"{display_opts}"
        )

    @staticmethod
    def build_environment(
        state_dir: Path,
        shared_dir: Path,
    ) -> dict:
        """Return a copy of the current environment extended with
        ``TMPDIR`` (the state dir), ``SHARED_DIR`` and ``USE_TMPDIR=1``.
        """
        # We make a copy to not update the current environment
        env = dict(os.environ)
        env.update(
            {
                "TMPDIR": str(state_dir),
                "SHARED_DIR": str(shared_dir),
                "USE_TMPDIR": "1",
            }
        )
        return env

    def run(
        self,
        state_dir: Path,
        shared_dir: Path,
        monitor_socket_path: Path,
        shell_socket_path: Path,
    ) -> subprocess.Popen:
        """Start the command through the shell and return the process.

        The process runs with ``state_dir`` as working directory and the
        environment from ``build_environment``. stdin and stdout are pipes
        and stderr is merged into stdout.
        """
        return subprocess.Popen(
            self.cmd(monitor_socket_path, shell_socket_path),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=True,
            cwd=state_dir,
            env=self.build_environment(state_dir, shared_dir),
        )
208
209
class NixStartScript(StartCommand):
    """A start script from nixos/modules/virtualisation/qemu-vm.nix
    that also satisfies the requirement of the base StartCommand.
    These Nix commands have the particular characteristic that the
    machine name can be extracted out of them via a regex match.
    (Admittedly a _very_ implicit contract, eventually TODO fix)
    """

    def __init__(self, script: str):
        self._cmd = script

    @property
    def machine_name(self) -> str:
        """Name captured from a trailing ``run-<name>-vm`` in the script
        path, or ``"machine"`` when the script does not end that way.
        """
        found = re.search("run-(.+)-vm$", self._cmd)
        if found is None:
            return "machine"
        return found.group(1)
228
229
class LegacyStartCommand(StartCommand):
    """Used in some places to create an ad-hoc machine instead of
    using nix test instrumentation + module system for that purpose.
    Legacy.
    """

    def __init__(
        self,
        netBackendArgs: Optional[str] = None,
        netFrontendArgs: Optional[str] = None,
        hda: Optional[Tuple[Path, str]] = None,
        cdrom: Optional[str] = None,
        usb: Optional[str] = None,
        bios: Optional[str] = None,
        qemuBinary: Optional[str] = None,
        qemuFlags: Optional[str] = None,
    ):
        """Assemble the qemu command prefix from the optional pieces.

        Every argument that is ``None`` is simply left out; ``qemuBinary``
        falls back to ``qemu-kvm``. ``hda`` is a ``(path, interface)`` pair.
        """
        # Fragments are collected in order and concatenated at the end.
        fragments = ["qemu-kvm" if qemuBinary is None else qemuBinary, " -m 384"]

        # networking
        backend = "-netdev user,id=net0"
        if netBackendArgs is not None:
            backend = backend + "," + netBackendArgs
        frontend = "-device virtio-net-pci,netdev=net0"
        if netFrontendArgs is not None:
            frontend = frontend + "," + netFrontendArgs
        fragments.append(f" {backend} {frontend}")

        # hda
        if hda is not None:
            disk_path = hda[0].resolve()
            disk_interface = hda[1]
            if disk_interface != "scsi":
                fragments.append(
                    f" -drive file={disk_path},if={disk_interface},werror=report"
                )
            else:
                fragments.append(
                    f" -drive id=hda,file={disk_path},werror=report,if=none"
                    " -device scsi-hd,drive=hda"
                )

        # cdrom
        if cdrom is not None:
            fragments.append(f" -cdrom {cdrom}")

        # usb
        if usb is not None:
            # https://github.com/qemu/qemu/blob/master/docs/usb2.txt
            fragments.append(
                " -device usb-ehci"
                f" -drive id=usbdisk,file={usb},if=none,readonly"
                " -device usb-storage,drive=usbdisk "
            )

        # bios
        if bios is not None:
            fragments.append(f" -bios {bios}")

        # qemu flags
        if qemuFlags is not None:
            fragments.append(f" {qemuFlags}")

        self._cmd = "".join(fragments)
299
300
class Machine:
    """A handle to the machine with this name, that also knows how to manage
    the machine lifecycle with the help of a start script / command."""

    name: str
    out_dir: Path
    tmp_dir: Path
    shared_dir: Path
    state_dir: Path
    monitor_path: Path
    shell_path: Path

    start_command: StartCommand
    keep_vm_state: bool
    allow_reboot: bool

    process: Optional[subprocess.Popen]
    pid: Optional[int]
    monitor: Optional[socket.socket]
    shell: Optional[socket.socket]
    serial_thread: Optional[threading.Thread]

    booted: bool
    connected: bool
    # Store last serial console lines for use
    # of wait_for_console_text. Created per instance in __init__ (and again
    # on every start()) so that machines never share one queue.
    last_lines: Queue
    callbacks: List[Callable]

    def __repr__(self) -> str:
        return f"<Machine '{self.name}'>"

    def __init__(
        self,
        out_dir: Path,
        tmp_dir: Path,
        start_command: StartCommand,
        name: str = "machine",
        keep_vm_state: bool = False,
        allow_reboot: bool = False,
        callbacks: Optional[List[Callable]] = None,
    ) -> None:
        """Record the configuration and prepare the on-disk directories.

        Creates ``<tmp_dir>/shared-xchg`` and ``<tmp_dir>/vm-state-<name>``
        (mode 0700). An existing state directory is deleted first unless
        ``keep_vm_state`` is set. The VM itself is not started here.
        """
        self.out_dir = out_dir
        self.tmp_dir = tmp_dir
        self.keep_vm_state = keep_vm_state
        self.allow_reboot = allow_reboot
        self.name = name
        self.start_command = start_command
        self.callbacks = callbacks if callbacks is not None else []

        # set up directories
        self.shared_dir = self.tmp_dir / "shared-xchg"
        self.shared_dir.mkdir(mode=0o700, exist_ok=True)

        self.state_dir = self.tmp_dir / f"vm-state-{self.name}"
        self.monitor_path = self.state_dir / "monitor"
        self.shell_path = self.state_dir / "shell"
        if (not self.keep_vm_state) and self.state_dir.exists():
            self.cleanup_statedir()
        self.state_dir.mkdir(mode=0o700, exist_ok=True)

        self.process = None
        self.pid = None
        self.monitor = None
        self.shell = None
        self.serial_thread = None

        self.booted = False
        self.connected = False
        self.last_lines = Queue()

    @staticmethod
    def create_startcommand(args: Dict[str, str]) -> StartCommand:
        """Build a LegacyStartCommand from a dict of optional settings.

        Logs a warning on every call. ``hda`` is only used when it is
        present and non-empty; it is paired with ``hdaInterface``.
        """
        rootlog.warning(
            "Using legacy create_startcommand(), "
            "please use proper nix test vm instrumentation, instead "
            "to generate the appropriate nixos test vm qemu startup script"
        )
        hda = None
        hda_arg = args.get("hda")
        if hda_arg:
            hda = (Path(hda_arg), args.get("hdaInterface", ""))
        return LegacyStartCommand(
            netBackendArgs=args.get("netBackendArgs"),
            netFrontendArgs=args.get("netFrontendArgs"),
            hda=hda,
            cdrom=args.get("cdrom"),
            usb=args.get("usb"),
            bios=args.get("bios"),
            qemuBinary=args.get("qemuBinary"),
            qemuFlags=args.get("qemuFlags"),
        )

    def is_up(self) -> bool:
        """Return True once the VM is started and the shell is connected."""
        return self.booted and self.connected

    def log(self, msg: str) -> None:
        """Log ``msg`` tagged with this machine's name."""
        rootlog.log(msg, {"machine": self.name})

    def log_serial(self, msg: str) -> None:
        """Log one line of serial console output for this machine."""
        rootlog.log_serial(msg, self.name)

    def nested(
        self, msg: str, attrs: Optional[Dict[str, str]] = None
    ) -> _GeneratorContextManager:
        """Open a nested log section tagged with this machine's name.

        ``attrs`` are merged on top of the ``machine`` attribute.
        """
        my_attrs = {"machine": self.name}
        if attrs is not None:
            my_attrs.update(attrs)
        return rootlog.nested(msg, my_attrs)

    def wait_for_monitor_prompt(self) -> str:
        """Read from the monitor socket until the ``(qemu) `` prompt.

        Returns everything received, prompt included. Also stops when the
        peer closes the connection.
        """
        with self.nested("waiting for monitor prompt"):
            assert self.monitor is not None
            # Accumulate raw bytes and decode once, so a multi-byte
            # character split across two recv() calls cannot break decoding.
            received = b""
            while True:
                chunk = self.monitor.recv(1024)
                if not chunk:
                    break
                received += chunk
                if received.endswith(b"(qemu) "):
                    break
            return received.decode()

    def send_monitor_command(self, command: str) -> str:
        """Send one command line to the QEMU monitor and return its output
        up to and including the next prompt. Runs the callbacks first.
        """
        self.run_callbacks()
        with self.nested("sending monitor command: {}".format(command)):
            message = ("{}\n".format(command)).encode()
            assert self.monitor is not None
            # sendall: plain send() may transmit only part of the message.
            self.monitor.sendall(message)
            return self.wait_for_monitor_prompt()

    def wait_for_unit(
        self, unit: str, user: Optional[str] = None, timeout: int = 900
    ) -> None:
        """Wait for a systemd unit to get into "active" state.
        Throws exceptions on "failed" and "inactive" states as well as
        after timing out.
        """

        def check_active(_: Any) -> bool:
            info = self.get_unit_info(unit, user)
            state = info["ActiveState"]
            if state == "failed":
                raise Exception('unit "{}" reached state "{}"'.format(unit, state))

            if state == "inactive":
                _, jobs = self.systemctl("list-jobs --full 2>&1", user)
                if "No jobs" in jobs:
                    # Re-read the state: the unit may have changed state
                    # between the first query and the job listing.
                    info = self.get_unit_info(unit, user)
                    if info["ActiveState"] == state:
                        raise Exception(
                            (
                                'unit "{}" is inactive and there ' "are no pending jobs"
                            ).format(unit)
                        )

            return state == "active"

        with self.nested(
            "waiting for unit {}{}".format(
                unit, f" with user {user}" if user is not None else ""
            )
        ):
            retry(check_active, timeout)

    def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]:
        """Return the ``key=value`` lines of ``systemctl show <unit>`` as a
        dict. Raises ``Exception`` when systemctl exits non-zero.
        """
        status, lines = self.systemctl('--no-pager show "{}"'.format(unit), user)
        if status != 0:
            raise Exception(
                'retrieving systemctl info for unit "{}" {} failed with exit code {}'.format(
                    unit, "" if user is None else 'under user "{}"'.format(user), status
                )
            )

        line_pattern = re.compile(r"^([^=]+)=(.*)$")

        unit_info: Dict[str, str] = {}
        for line in lines.split("\n"):
            match = line_pattern.match(line)
            # Lines that are not of the form key=value are skipped.
            if match is not None:
                unit_info[match[1]] = match[2]
        return unit_info

    def systemctl(self, q: str, user: Optional[str] = None) -> Tuple[int, str]:
        """Run ``systemctl <q>`` in the guest and return (status, output).

        With ``user`` the command is run as ``systemctl --user`` through
        ``su -l <user>`` with ``XDG_RUNTIME_DIR`` set for that user.
        """
        if user is not None:
            # The query ends up inside a $'...' string, so escape quotes.
            q = q.replace("'", "\\'")
            return self.execute(
                (
                    "su -l {} --shell /bin/sh -c "
                    "$'XDG_RUNTIME_DIR=/run/user/`id -u` "
                    "systemctl --user {}'"
                ).format(user, q)
            )
        return self.execute("systemctl {}".format(q))

    def require_unit_state(self, unit: str, require_state: str = "active") -> None:
        """Raise ``Exception`` unless the unit's ActiveState currently
        equals ``require_state``. Does not wait.
        """
        with self.nested(
            "checking if unit ‘{}’ has reached state '{}'".format(unit, require_state)
        ):
            info = self.get_unit_info(unit)
            state = info["ActiveState"]
            if state != require_state:
                raise Exception(
                    "Expected unit ‘{}’ to to be in state ".format(unit)
                    + "'{}' but it is in state ‘{}’".format(require_state, state)
                )

    def _next_newline_closed_block_from_shell(self) -> str:
        """Read from the shell socket until a chunk ends with a newline
        (or the peer closes the connection) and return the decoded text.
        """
        assert self.shell
        chunks = []
        while True:
            # This receives up to 4096 bytes from the socket
            chunk = self.shell.recv(4096)
            if not chunk:
                # Probably a broken pipe, return the output we have
                break

            chunks.append(chunk)
            if chunk.endswith(b"\n"):
                break
        # Decode once at the end so a multi-byte character split across
        # two chunks is handled correctly.
        return b"".join(chunks).decode()

    def execute(
        self, command: str, check_return: bool = True, timeout: Optional[int] = 900
    ) -> Tuple[int, str]:
        """Run ``command`` in the guest shell and return (status, output).

        The command is run via ``sh -c`` with ``set -euo pipefail``, wrapped
        in ``timeout <timeout>`` unless ``timeout`` is None. Its stdout is
        transported base64-encoded. When ``check_return`` is False the exit
        status is not queried and ``-1`` is returned in its place.
        """
        self.run_callbacks()
        self.connect()

        # Always run command with shell opts
        command = f"set -euo pipefail; {command}"

        timeout_str = ""
        if timeout is not None:
            timeout_str = f"timeout {timeout}"

        # base64 keeps the output on a single line; the trailing `echo`
        # supplies the newline that terminates the block we read back.
        out_command = (
            f"{timeout_str} sh -c {shlex.quote(command)} | (base64 --wrap 0; echo)\n"
        )

        assert self.shell
        # sendall: plain send() may transmit only part of a long command.
        self.shell.sendall(out_command.encode())

        # Get the output
        output = base64.b64decode(self._next_newline_closed_block_from_shell())

        if not check_return:
            return (-1, output.decode())

        # Get the return code
        self.shell.sendall("echo ${PIPESTATUS[0]}\n".encode())
        rc = int(self._next_newline_closed_block_from_shell().strip())

        return (rc, output.decode())

    def shell_interact(self) -> None:
        """Allows you to interact with the guest shell

        Should only be used during test development, not in the production test."""
        self.connect()
        self.log("Terminal is ready (there is no initial prompt):")

        assert self.shell
        subprocess.run(
            ["socat", "READLINE,prompt=$ ", f"FD:{self.shell.fileno()}"],
            pass_fds=[self.shell.fileno()],
        )

    def console_interact(self) -> None:
        """Allows you to interact with QEMU's stdin

        The shell can be exited with Ctrl+D. Note that Ctrl+C is not allowed to be used.
        QEMU's stdout is read line-wise.

        Should only be used during test development, not in the production test."""
        self.log("Terminal is ready (there is no prompt):")

        assert self.process
        assert self.process.stdin

        while True:
            try:
                char = sys.stdin.buffer.read(1)
            except KeyboardInterrupt:
                break
            if char == b"":  # ctrl+d
                self.log("Closing connection to the console")
                break
            self.send_console(char.decode())

    def succeed(self, *commands: str, timeout: Optional[int] = None) -> str:
        """Execute each command and check that it succeeds.

        Returns the concatenated output; raises ``Exception`` on the first
        command with a non-zero exit status.
        """
        output = ""
        for command in commands:
            with self.nested("must succeed: {}".format(command)):
                (status, out) = self.execute(command, timeout=timeout)
                if status != 0:
                    self.log("output: {}".format(out))
                    raise Exception(
                        "command `{}` failed (exit code {})".format(command, status)
                    )
                output += out
        return output

    def fail(self, *commands: str, timeout: Optional[int] = None) -> str:
        """Execute each command and check that it fails.

        Returns the concatenated output; raises ``Exception`` on the first
        command that exits with status zero.
        """
        output = ""
        for command in commands:
            with self.nested("must fail: {}".format(command)):
                (status, out) = self.execute(command, timeout=timeout)
                if status == 0:
                    raise Exception(
                        "command `{}` unexpectedly succeeded".format(command)
                    )
                output += out
        return output

    def wait_until_succeeds(self, command: str, timeout: int = 900) -> str:
        """Wait until a command returns success and return its output.
        Throws an exception on timeout.
        """
        output = ""

        def check_success(_: Any) -> bool:
            nonlocal output
            status, output = self.execute(command, timeout=timeout)
            return status == 0

        with self.nested("waiting for success: {}".format(command)):
            retry(check_success, timeout)
            return output

    def wait_until_fails(self, command: str, timeout: int = 900) -> str:
        """Wait until a command returns failure and return its output.
        Throws an exception on timeout.
        """
        output = ""

        def check_failure(_: Any) -> bool:
            nonlocal output
            status, output = self.execute(command, timeout=timeout)
            return status != 0

        with self.nested("waiting for failure: {}".format(command)):
            # Honour the caller's timeout for the retry loop as well.
            retry(check_failure, timeout)
            return output

    def wait_for_shutdown(self) -> None:
        """Block until the QEMU process exits, then mark the machine as
        neither booted nor connected. No-op if the VM was not started.
        """
        if not self.booted:
            return

        with self.nested("waiting for the VM to power off"):
            sys.stdout.flush()
            assert self.process
            self.process.wait()

            self.pid = None
            self.booted = False
            self.connected = False

    def get_tty_text(self, tty: str) -> str:
        """Return the contents of ``/dev/vcs<tty>``, folded to the width
        that ``stty`` reports for ``/dev/tty<tty>``.
        """
        _, output = self.execute(
            "fold -w$(stty -F /dev/tty{0} size | "
            "awk '{{print $2}}') /dev/vcs{0}".format(tty)
        )
        return output

    def wait_until_tty_matches(self, tty: str, regexp: str) -> None:
        """Wait until the visible output on the chosen TTY matches regular
        expression. Throws an exception on timeout.
        """
        matcher = re.compile(regexp)

        def tty_matches(last: bool) -> bool:
            text = self.get_tty_text(tty)
            if last:
                self.log(
                    f"Last chance to match /{regexp}/ on TTY{tty}, "
                    f"which currently contains: {text}"
                )
            return matcher.search(text) is not None

        with self.nested("waiting for {} to appear on tty {}".format(regexp, tty)):
            retry(tty_matches)

    def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None:
        """Send each character of ``chars`` as a key press via send_key."""
        with self.nested("sending keys ‘{}‘".format(chars)):
            for char in chars:
                self.send_key(char, delay)

    def wait_for_file(self, filename: str) -> None:
        """Waits until the file exists in machine's file system."""

        def check_file(_: Any) -> bool:
            status, _ = self.execute("test -e {}".format(filename))
            return status == 0

        with self.nested("waiting for file ‘{}‘".format(filename)):
            retry(check_file)

    def wait_for_open_port(self, port: int) -> None:
        """Wait until ``nc -z localhost <port>`` succeeds in the guest."""

        def port_is_open(_: Any) -> bool:
            status, _ = self.execute("nc -z localhost {}".format(port))
            return status == 0

        with self.nested("waiting for TCP port {}".format(port)):
            retry(port_is_open)

    def wait_for_closed_port(self, port: int) -> None:
        """Wait until ``nc -z localhost <port>`` fails in the guest."""

        def port_is_closed(_: Any) -> bool:
            status, _ = self.execute("nc -z localhost {}".format(port))
            return status != 0

        with self.nested("waiting for TCP port {} to be closed".format(port)):
            retry(port_is_closed)

    def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
        """Run ``systemctl start <jobname>``; returns (status, output)."""
        return self.systemctl("start {}".format(jobname), user)

    def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
        """Run ``systemctl stop <jobname>``; returns (status, output)."""
        return self.systemctl("stop {}".format(jobname), user)

    def wait_for_job(self, jobname: str) -> None:
        """Alias for wait_for_unit with the default user and timeout."""
        self.wait_for_unit(jobname)

    def connect(self) -> None:
        """Start the VM if necessary and wait for the first bytes on the
        shell socket. No-op when already connected.
        """
        if self.connected:
            return

        with self.nested("waiting for the VM to finish booting"):
            self.start()

            assert self.shell

            tic = time.time()
            # Blocks until the guest side writes something to the socket.
            self.shell.recv(1024)
            # TODO: Timeout
            toc = time.time()

            self.log("connected to guest root shell")
            self.log("(connecting took {:.2f} seconds)".format(toc - tic))
            self.connected = True

    def screenshot(self, filename: str) -> None:
        """Take a screenshot of the VM display and store it as PNG.

        A ``filename`` consisting only of word characters is placed in
        ``out_dir`` with a ``.png`` suffix; anything else is used as the
        output path unchanged. Raises ``Exception`` if the conversion from
        PPM fails.
        """
        word_pattern = re.compile(r"^\w+$")
        if word_pattern.match(filename):
            filename = os.path.join(self.out_dir, "{}.png".format(filename))
        tmp = "{}.ppm".format(filename)

        with self.nested(
            "making screenshot {}".format(filename),
            {"image": os.path.basename(filename)},
        ):
            self.send_monitor_command("screendump {}".format(tmp))
            # Quote both paths so spaces or shell metacharacters in them
            # cannot break or alter the shell command.
            ret = subprocess.run(
                "pnmtopng {} > {}".format(shlex.quote(tmp), shlex.quote(filename)),
                shell=True,
            )
            os.unlink(tmp)
            if ret.returncode != 0:
                raise Exception("Cannot convert screenshot")

    def copy_from_host_via_shell(self, source: str, target: str) -> None:
        """Copy a file from the host into the guest by piping it over the
        shell into the destination file. Works without host-guest shared folder.
        Prefer copy_from_host for whenever possible.
        """
        with open(source, "rb") as fh:
            content_b64 = base64.b64encode(fh.read()).decode()
            self.succeed(
                f"mkdir -p $(dirname {target})",
                f"echo -n {content_b64} | base64 -d > {target}",
            )

    def copy_from_host(self, source: str, target: str) -> None:
        """Copy a file from the host into the guest via the `shared_dir` shared
        among all the VMs (using a temporary directory).
        """
        host_src = Path(source)
        vm_target = Path(target)
        with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
            shared_temp = Path(shared_td)
            host_intermediate = shared_temp / host_src.name
            # The same temporary directory as addressed from inside the guest.
            vm_shared_temp = Path("/tmp/shared") / shared_temp.name
            vm_intermediate = vm_shared_temp / host_src.name

            self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
            if host_src.is_dir():
                shutil.copytree(host_src, host_intermediate)
            else:
                shutil.copy(host_src, host_intermediate)
            self.succeed(make_command(["mkdir", "-p", vm_target.parent]))
            self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target]))

    def copy_from_vm(self, source: str, target_dir: str = "") -> None:
        """Copy a file from the VM (specified by an in-VM source path) to a path
        relative to `$out`. The file is copied via the `shared_dir` shared among
        all the VMs (using a temporary directory).
        """
        # Compute the source, target, and intermediate shared file names
        vm_src = Path(source)
        with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
            shared_temp = Path(shared_td)
            vm_shared_temp = Path("/tmp/shared") / shared_temp.name
            vm_intermediate = vm_shared_temp / vm_src.name
            intermediate = shared_temp / vm_src.name
            # Copy the file to the shared directory inside VM
            self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
            self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate]))
            abs_target = self.out_dir / target_dir / vm_src.name
            abs_target.parent.mkdir(exist_ok=True, parents=True)
            # Copy the file from the shared directory outside VM
            if intermediate.is_dir():
                shutil.copytree(intermediate, abs_target)
            else:
                shutil.copy(intermediate, abs_target)

    def dump_tty_contents(self, tty: str) -> None:
        """Debugging: Dump the contents of the TTY<n>"""
        self.execute("fold -w 80 /dev/vcs{} | systemd-cat".format(tty))

    def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]:
        """Dump the screen into a temporary directory and return the OCR
        result for each of the given tesseract engine modes.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            screenshot_path = os.path.join(tmpdir, "ppm")
            self.send_monitor_command(f"screendump {screenshot_path}")
            return _perform_ocr_on_screenshot(screenshot_path, model_ids)

    def get_screen_text_variants(self) -> List[str]:
        """Return the OCR text of the screen for engine modes 0, 1 and 2."""
        return self._get_screen_text_variants([0, 1, 2])

    def get_screen_text(self) -> str:
        """Return the OCR text of the screen using engine mode 2."""
        return self._get_screen_text_variants([2])[0]

    def wait_for_text(self, regex: str) -> None:
        """Wait until ``regex`` matches any OCR variant of the screen.
        Throws an exception on timeout.
        """

        def screen_matches(last: bool) -> bool:
            variants = self.get_screen_text_variants()
            for text in variants:
                if re.search(regex, text) is not None:
                    return True

            if last:
                self.log("Last OCR attempt failed. Text was: {}".format(variants))

            return False

        with self.nested("waiting for {} to appear on screen".format(regex)):
            retry(screen_matches)

    def wait_for_console_text(self, regex: str) -> None:
        """Block until ``regex`` matches the serial console output
        accumulated since this call started consuming queued lines.
        There is no timeout.
        """
        with self.nested("waiting for {} to appear on console".format(regex)):
            # Buffer the console output, this is needed
            # to match multiline regexes.
            # NOTE: the serial thread strips each line before queueing it,
            # so the buffer holds the lines back to back.
            console = io.StringIO()
            while True:
                # Queue.get() blocks until the serial thread delivers the
                # next line; it never raises queue.Empty in blocking mode.
                console.write(self.last_lines.get())
                console.seek(0)
                matches = re.search(regex, console.read())
                if matches is not None:
                    return

    def send_key(self, key: str, delay: Optional[float] = 0.01) -> None:
        """Send one key through the monitor ``sendkey`` command.

        ``key`` is translated through CHAR_TO_KEY when listed there.
        Sleeps ``delay`` seconds afterwards unless ``delay`` is None.
        """
        key = CHAR_TO_KEY.get(key, key)
        self.send_monitor_command("sendkey {}".format(key))
        if delay is not None:
            time.sleep(delay)

    def send_console(self, chars: str) -> None:
        """Write ``chars`` to the stdin of the QEMU process."""
        assert self.process
        assert self.process.stdin
        self.process.stdin.write(chars.encode())
        self.process.stdin.flush()

    def start(self) -> None:
        """Launch the VM process and connect monitor and shell sockets.

        Also starts the thread that forwards serial output to the log and
        to ``last_lines``. No-op when the VM is already booted.
        """
        if self.booted:
            return

        self.log("starting vm")

        def clear(path: Path) -> Path:
            if path.exists():
                path.unlink()
            return path

        def create_socket(path: Path) -> socket.socket:
            s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
            s.bind(str(path))
            s.listen(1)
            return s

        # We listen first and let the started command connect to us.
        monitor_socket = create_socket(clear(self.monitor_path))
        shell_socket = create_socket(clear(self.shell_path))
        try:
            self.process = self.start_command.run(
                self.state_dir,
                self.shared_dir,
                self.monitor_path,
                self.shell_path,
            )
            self.monitor, _ = monitor_socket.accept()
            self.shell, _ = shell_socket.accept()
        finally:
            # Only the accepted connections are used from here on.
            monitor_socket.close()
            shell_socket.close()

        # Store last serial console lines for use
        # of wait_for_console_text
        self.last_lines = Queue()

        def process_serial_output() -> None:
            assert self.process
            assert self.process.stdout
            for _line in self.process.stdout:
                # Ignore undecodable bytes that may occur in boot menus
                line = _line.decode(errors="ignore").replace("\r", "").rstrip()
                self.last_lines.put(line)
                self.log_serial(line)

        self.serial_thread = threading.Thread(target=process_serial_output)
        self.serial_thread.start()

        self.wait_for_monitor_prompt()

        self.pid = self.process.pid
        self.booted = True

        self.log("QEMU running (pid {})".format(self.pid))

    def cleanup_statedir(self) -> None:
        """Delete the VM state directory and log that this happened."""
        shutil.rmtree(self.state_dir)
        rootlog.log(f"deleting VM state directory {self.state_dir}")
        rootlog.log("if you want to keep the VM state, pass --keep-vm-state")

    def shutdown(self) -> None:
        """Send ``poweroff`` to the guest shell and wait for QEMU to exit.
        No-op when the VM is not booted.
        """
        if not self.booted:
            return

        assert self.shell
        self.shell.sendall("poweroff\n".encode())
        self.wait_for_shutdown()

    def crash(self) -> None:
        """Stop the VM immediately via the monitor ``quit`` command.
        No-op when the VM is not booted.
        """
        if not self.booted:
            return

        self.log("forced crash")
        self.send_monitor_command("quit")
        self.wait_for_shutdown()

    def wait_for_x(self) -> None:
        """Wait until it is possible to connect to the X server. Note that
        testing the existence of /tmp/.X11-unix/X0 is insufficient.
        """

        def check_x(_: Any) -> bool:
            cmd = (
                "journalctl -b SYSLOG_IDENTIFIER=systemd | "
                + 'grep "Reached target Current graphical"'
            )
            status, _ = self.execute(cmd)
            if status != 0:
                return False
            status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
            return status == 0

        with self.nested("waiting for the X11 server"):
            retry(check_x)

    def get_window_names(self) -> List[str]:
        """Return the quoted window titles printed by
        ``xwininfo -root -tree`` in the guest, one per list entry.
        """
        return self.succeed(
            r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'"
        ).splitlines()

    def wait_for_window(self, regexp: str) -> None:
        """Wait until some window title matches ``regexp``.
        Throws an exception on timeout.
        """
        pattern = re.compile(regexp)

        def window_is_visible(last_try: bool) -> bool:
            names = self.get_window_names()
            if last_try:
                self.log(
                    "Last chance to match {} on the window list,".format(regexp)
                    + " which currently contains: "
                    + ", ".join(names)
                )
            return any(pattern.search(name) for name in names)

        with self.nested("waiting for a window to appear"):
            retry(window_is_visible)

    def sleep(self, secs: int) -> None:
        """Sleep for ``secs`` seconds by running ``sleep`` in the guest."""
        # We want to sleep in *guest* time, not *host* time.
        self.succeed(f"sleep {secs}")

    def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None:
        """Forward a TCP port on the host to a TCP port on the guest.
        Useful during interactive testing.
        """
        self.send_monitor_command(
            "hostfwd_add tcp::{}-:{}".format(host_port, guest_port)
        )

    def block(self) -> None:
        """Make the machine unreachable by shutting down eth1 (the multicast
        interface used to talk to the other VMs). We keep eth0 up so that
        the test driver can continue to talk to the machine.
        """
        self.send_monitor_command("set_link virtio-net-pci.1 off")

    def unblock(self) -> None:
        """Make the machine reachable."""
        self.send_monitor_command("set_link virtio-net-pci.1 on")

    def release(self) -> None:
        """Terminate the QEMU process, close both sockets and join the
        serial thread. No-op when no process id is recorded.
        """
        if self.pid is None:
            return
        rootlog.info(f"kill machine (pid {self.pid})")
        assert self.process
        assert self.shell
        assert self.monitor
        assert self.serial_thread

        self.process.terminate()
        self.shell.close()
        self.monitor.close()
        self.serial_thread.join()

    def run_callbacks(self) -> None:
        """Invoke every registered callback, in order, without arguments."""
        for callback in self.callbacks:
            callback()