1from contextlib import _GeneratorContextManager, nullcontext
2from pathlib import Path
3from queue import Queue
4from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
5import base64
6import io
7import os
8import queue
9import re
10import select
11import shlex
12import shutil
13import socket
14import subprocess
15import sys
16import tempfile
17import threading
18import time
19
20from test_driver.logger import rootlog
21
22CHAR_TO_KEY = {
23 "A": "shift-a",
24 "N": "shift-n",
25 "-": "0x0C",
26 "_": "shift-0x0C",
27 "B": "shift-b",
28 "O": "shift-o",
29 "=": "0x0D",
30 "+": "shift-0x0D",
31 "C": "shift-c",
32 "P": "shift-p",
33 "[": "0x1A",
34 "{": "shift-0x1A",
35 "D": "shift-d",
36 "Q": "shift-q",
37 "]": "0x1B",
38 "}": "shift-0x1B",
39 "E": "shift-e",
40 "R": "shift-r",
41 ";": "0x27",
42 ":": "shift-0x27",
43 "F": "shift-f",
44 "S": "shift-s",
45 "'": "0x28",
46 '"': "shift-0x28",
47 "G": "shift-g",
48 "T": "shift-t",
49 "`": "0x29",
50 "~": "shift-0x29",
51 "H": "shift-h",
52 "U": "shift-u",
53 "\\": "0x2B",
54 "|": "shift-0x2B",
55 "I": "shift-i",
56 "V": "shift-v",
57 ",": "0x33",
58 "<": "shift-0x33",
59 "J": "shift-j",
60 "W": "shift-w",
61 ".": "0x34",
62 ">": "shift-0x34",
63 "K": "shift-k",
64 "X": "shift-x",
65 "/": "0x35",
66 "?": "shift-0x35",
67 "L": "shift-l",
68 "Y": "shift-y",
69 " ": "spc",
70 "M": "shift-m",
71 "Z": "shift-z",
72 "\n": "ret",
73 "!": "shift-0x02",
74 "@": "shift-0x03",
75 "#": "shift-0x04",
76 "$": "shift-0x05",
77 "%": "shift-0x06",
78 "^": "shift-0x07",
79 "&": "shift-0x08",
80 "*": "shift-0x09",
81 "(": "shift-0x0A",
82 ")": "shift-0x0B",
83}
84
85
86def make_command(args: list) -> str:
87 return " ".join(map(shlex.quote, (map(str, args))))
88
89
90def _perform_ocr_on_screenshot(
91 screenshot_path: str, model_ids: Iterable[int]
92) -> List[str]:
93 if shutil.which("tesseract") is None:
94 raise Exception("OCR requested but enableOCR is false")
95
96 magick_args = (
97 "-filter Catrom -density 72 -resample 300 "
98 + "-contrast -normalize -despeckle -type grayscale "
99 + "-sharpen 1 -posterize 3 -negate -gamma 100 "
100 + "-blur 1x65535"
101 )
102
103 tess_args = "-c debug_file=/dev/null --psm 11"
104
105 cmd = f"convert {magick_args} '{screenshot_path}' 'tiff:{screenshot_path}.tiff'"
106 ret = subprocess.run(cmd, shell=True, capture_output=True)
107 if ret.returncode != 0:
108 raise Exception(f"TIFF conversion failed with exit code {ret.returncode}")
109
110 model_results = []
111 for model_id in model_ids:
112 cmd = f"tesseract '{screenshot_path}.tiff' - {tess_args} --oem '{model_id}'"
113 ret = subprocess.run(cmd, shell=True, capture_output=True)
114 if ret.returncode != 0:
115 raise Exception(f"OCR failed with exit code {ret.returncode}")
116 model_results.append(ret.stdout.decode("utf-8"))
117
118 return model_results
119
120
121def retry(fn: Callable, timeout: int = 900) -> None:
122 """Call the given function repeatedly, with 1 second intervals,
123 until it returns True or a timeout is reached.
124 """
125
126 for _ in range(timeout):
127 if fn(False):
128 return
129 time.sleep(1)
130
131 if not fn(True):
132 raise Exception(f"action timed out after {timeout} seconds")
133
134
135class StartCommand:
136 """The Base Start Command knows how to append the necessary
137 runtime qemu options as determined by a particular test driver
138 run. Any such start command is expected to happily receive and
139 append additional qemu args.
140 """
141
142 _cmd: str
143
144 def cmd(
145 self,
146 monitor_socket_path: Path,
147 shell_socket_path: Path,
148 allow_reboot: bool = False,
149 ) -> str:
150 display_opts = ""
151 display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"])
152 if not display_available:
153 display_opts += " -nographic"
154
155 # qemu options
156 qemu_opts = (
157 " -device virtio-serial"
158 # Note: virtconsole will map to /dev/hvc0 in Linux guests
159 " -device virtconsole,chardev=shell"
160 " -device virtio-rng-pci"
161 " -serial stdio"
162 )
163 if not allow_reboot:
164 qemu_opts += " -no-reboot"
165 # TODO: qemu script already catpures this env variable, legacy?
166 qemu_opts += " " + os.environ.get("QEMU_OPTS", "")
167
168 return (
169 f"{self._cmd}"
170 f" -monitor unix:{monitor_socket_path}"
171 f" -chardev socket,id=shell,path={shell_socket_path}"
172 f"{qemu_opts}"
173 f"{display_opts}"
174 )
175
176 @staticmethod
177 def build_environment(
178 state_dir: Path,
179 shared_dir: Path,
180 ) -> dict:
181 # We make a copy to not update the current environment
182 env = dict(os.environ)
183 env.update(
184 {
185 "TMPDIR": str(state_dir),
186 "SHARED_DIR": str(shared_dir),
187 "USE_TMPDIR": "1",
188 }
189 )
190 return env
191
192 def run(
193 self,
194 state_dir: Path,
195 shared_dir: Path,
196 monitor_socket_path: Path,
197 shell_socket_path: Path,
198 allow_reboot: bool,
199 ) -> subprocess.Popen:
200 return subprocess.Popen(
201 self.cmd(monitor_socket_path, shell_socket_path, allow_reboot),
202 stdin=subprocess.PIPE,
203 stdout=subprocess.PIPE,
204 stderr=subprocess.STDOUT,
205 shell=True,
206 cwd=state_dir,
207 env=self.build_environment(state_dir, shared_dir),
208 )
209
210
211class NixStartScript(StartCommand):
212 """A start script from nixos/modules/virtualiation/qemu-vm.nix
213 that also satisfies the requirement of the BaseStartCommand.
214 These Nix commands have the particular characteristic that the
215 machine name can be extracted out of them via a regex match.
216 (Admittedly a _very_ implicit contract, evtl. TODO fix)
217 """
218
219 def __init__(self, script: str):
220 self._cmd = script
221
222 @property
223 def machine_name(self) -> str:
224 match = re.search("run-(.+)-vm$", self._cmd)
225 name = "machine"
226 if match:
227 name = match.group(1)
228 return name
229
230
231class LegacyStartCommand(StartCommand):
232 """Used in some places to create an ad-hoc machine instead of
233 using nix test instrumentation + module system for that purpose.
234 Legacy.
235 """
236
237 def __init__(
238 self,
239 netBackendArgs: Optional[str] = None,
240 netFrontendArgs: Optional[str] = None,
241 hda: Optional[Tuple[Path, str]] = None,
242 cdrom: Optional[str] = None,
243 usb: Optional[str] = None,
244 bios: Optional[str] = None,
245 qemuBinary: Optional[str] = None,
246 qemuFlags: Optional[str] = None,
247 ):
248 if qemuBinary is not None:
249 self._cmd = qemuBinary
250 else:
251 self._cmd = "qemu-kvm"
252
253 self._cmd += " -m 384"
254
255 # networking
256 net_backend = "-netdev user,id=net0"
257 net_frontend = "-device virtio-net-pci,netdev=net0"
258 if netBackendArgs is not None:
259 net_backend += "," + netBackendArgs
260 if netFrontendArgs is not None:
261 net_frontend += "," + netFrontendArgs
262 self._cmd += f" {net_backend} {net_frontend}"
263
264 # hda
265 hda_cmd = ""
266 if hda is not None:
267 hda_path = hda[0].resolve()
268 hda_interface = hda[1]
269 if hda_interface == "scsi":
270 hda_cmd += (
271 f" -drive id=hda,file={hda_path},werror=report,if=none"
272 " -device scsi-hd,drive=hda"
273 )
274 else:
275 hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report"
276 self._cmd += hda_cmd
277
278 # cdrom
279 if cdrom is not None:
280 self._cmd += f" -cdrom {cdrom}"
281
282 # usb
283 usb_cmd = ""
284 if usb is not None:
285 # https://github.com/qemu/qemu/blob/master/docs/usb2.txt
286 usb_cmd += (
287 " -device usb-ehci"
288 f" -drive id=usbdisk,file={usb},if=none,readonly"
289 " -device usb-storage,drive=usbdisk "
290 )
291 self._cmd += usb_cmd
292
293 # bios
294 if bios is not None:
295 self._cmd += f" -bios {bios}"
296
297 # qemu flags
298 if qemuFlags is not None:
299 self._cmd += f" {qemuFlags}"
300
301
302class Machine:
303 """A handle to the machine with this name, that also knows how to manage
304 the machine lifecycle with the help of a start script / command."""
305
306 name: str
307 out_dir: Path
308 tmp_dir: Path
309 shared_dir: Path
310 state_dir: Path
311 monitor_path: Path
312 shell_path: Path
313
314 start_command: StartCommand
315 keep_vm_state: bool
316
317 process: Optional[subprocess.Popen]
318 pid: Optional[int]
319 monitor: Optional[socket.socket]
320 shell: Optional[socket.socket]
321 serial_thread: Optional[threading.Thread]
322
323 booted: bool
324 connected: bool
325 # Store last serial console lines for use
326 # of wait_for_console_text
327 last_lines: Queue = Queue()
328 callbacks: List[Callable]
329
330 def __repr__(self) -> str:
331 return f"<Machine '{self.name}'>"
332
333 def __init__(
334 self,
335 out_dir: Path,
336 tmp_dir: Path,
337 start_command: StartCommand,
338 name: str = "machine",
339 keep_vm_state: bool = False,
340 callbacks: Optional[List[Callable]] = None,
341 ) -> None:
342 self.out_dir = out_dir
343 self.tmp_dir = tmp_dir
344 self.keep_vm_state = keep_vm_state
345 self.name = name
346 self.start_command = start_command
347 self.callbacks = callbacks if callbacks is not None else []
348
349 # set up directories
350 self.shared_dir = self.tmp_dir / "shared-xchg"
351 self.shared_dir.mkdir(mode=0o700, exist_ok=True)
352
353 self.state_dir = self.tmp_dir / f"vm-state-{self.name}"
354 self.monitor_path = self.state_dir / "monitor"
355 self.shell_path = self.state_dir / "shell"
356 if (not self.keep_vm_state) and self.state_dir.exists():
357 self.cleanup_statedir()
358 self.state_dir.mkdir(mode=0o700, exist_ok=True)
359
360 self.process = None
361 self.pid = None
362 self.monitor = None
363 self.shell = None
364 self.serial_thread = None
365
366 self.booted = False
367 self.connected = False
368
369 @staticmethod
370 def create_startcommand(args: Dict[str, str]) -> StartCommand:
371 rootlog.warning(
372 "Using legacy create_startcommand(),"
373 "please use proper nix test vm instrumentation, instead"
374 "to generate the appropriate nixos test vm qemu startup script"
375 )
376 hda = None
377 if args.get("hda"):
378 hda_arg: str = args.get("hda", "")
379 hda_arg_path: Path = Path(hda_arg)
380 hda = (hda_arg_path, args.get("hdaInterface", ""))
381 return LegacyStartCommand(
382 netBackendArgs=args.get("netBackendArgs"),
383 netFrontendArgs=args.get("netFrontendArgs"),
384 hda=hda,
385 cdrom=args.get("cdrom"),
386 usb=args.get("usb"),
387 bios=args.get("bios"),
388 qemuBinary=args.get("qemuBinary"),
389 qemuFlags=args.get("qemuFlags"),
390 )
391
392 def is_up(self) -> bool:
393 return self.booted and self.connected
394
395 def log(self, msg: str) -> None:
396 rootlog.log(msg, {"machine": self.name})
397
398 def log_serial(self, msg: str) -> None:
399 rootlog.log_serial(msg, self.name)
400
401 def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager:
402 my_attrs = {"machine": self.name}
403 my_attrs.update(attrs)
404 return rootlog.nested(msg, my_attrs)
405
406 def wait_for_monitor_prompt(self) -> str:
407 assert self.monitor is not None
408 answer = ""
409 while True:
410 undecoded_answer = self.monitor.recv(1024)
411 if not undecoded_answer:
412 break
413 answer += undecoded_answer.decode()
414 if answer.endswith("(qemu) "):
415 break
416 return answer
417
418 def send_monitor_command(self, command: str) -> str:
419 self.run_callbacks()
420 message = f"{command}\n".encode()
421 assert self.monitor is not None
422 self.monitor.send(message)
423 return self.wait_for_monitor_prompt()
424
425 def wait_for_unit(
426 self, unit: str, user: Optional[str] = None, timeout: int = 900
427 ) -> None:
428 """Wait for a systemd unit to get into "active" state.
429 Throws exceptions on "failed" and "inactive" states as well as
430 after timing out.
431 """
432
433 def check_active(_: Any) -> bool:
434 info = self.get_unit_info(unit, user)
435 state = info["ActiveState"]
436 if state == "failed":
437 raise Exception(f'unit "{unit}" reached state "{state}"')
438
439 if state == "inactive":
440 status, jobs = self.systemctl("list-jobs --full 2>&1", user)
441 if "No jobs" in jobs:
442 info = self.get_unit_info(unit, user)
443 if info["ActiveState"] == state:
444 raise Exception(
445 f'unit "{unit}" is inactive and there are no pending jobs'
446 )
447
448 return state == "active"
449
450 with self.nested(
451 f"waiting for unit {unit}"
452 + (f" with user {user}" if user is not None else "")
453 ):
454 retry(check_active, timeout)
455
456 def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]:
457 status, lines = self.systemctl(f'--no-pager show "{unit}"', user)
458 if status != 0:
459 raise Exception(
460 f'retrieving systemctl info for unit "{unit}"'
461 + ("" if user is None else f' under user "{user}"')
462 + f" failed with exit code {status}"
463 )
464
465 line_pattern = re.compile(r"^([^=]+)=(.*)$")
466
467 def tuple_from_line(line: str) -> Tuple[str, str]:
468 match = line_pattern.match(line)
469 assert match is not None
470 return match[1], match[2]
471
472 return dict(
473 tuple_from_line(line)
474 for line in lines.split("\n")
475 if line_pattern.match(line)
476 )
477
478 def systemctl(self, q: str, user: Optional[str] = None) -> Tuple[int, str]:
479 if user is not None:
480 q = q.replace("'", "\\'")
481 return self.execute(
482 f"su -l {user} --shell /bin/sh -c "
483 "$'XDG_RUNTIME_DIR=/run/user/`id -u` "
484 f"systemctl --user {q}'"
485 )
486 return self.execute(f"systemctl {q}")
487
488 def require_unit_state(self, unit: str, require_state: str = "active") -> None:
489 with self.nested(
490 f"checking if unit '{unit}' has reached state '{require_state}'"
491 ):
492 info = self.get_unit_info(unit)
493 state = info["ActiveState"]
494 if state != require_state:
495 raise Exception(
496 f"Expected unit '{unit}' to to be in state "
497 f"'{require_state}' but it is in state '{state}'"
498 )
499
500 def _next_newline_closed_block_from_shell(self) -> str:
501 assert self.shell
502 output_buffer = []
503 while True:
504 # This receives up to 4096 bytes from the socket
505 chunk = self.shell.recv(4096)
506 if not chunk:
507 # Probably a broken pipe, return the output we have
508 break
509
510 decoded = chunk.decode()
511 output_buffer += [decoded]
512 if decoded[-1] == "\n":
513 break
514 return "".join(output_buffer)
515
516 def execute(
517 self, command: str, check_return: bool = True, timeout: Optional[int] = 900
518 ) -> Tuple[int, str]:
519 self.run_callbacks()
520 self.connect()
521
522 # Always run command with shell opts
523 command = f"set -euo pipefail; {command}"
524
525 timeout_str = ""
526 if timeout is not None:
527 timeout_str = f"timeout {timeout}"
528
529 # While sh is bash on NixOS, this is not the case for every distro.
530 # We explicitly call bash here to allow for the driver to boot other distros as well.
531 out_command = (
532 f"{timeout_str} bash -c {shlex.quote(command)} | (base64 --wrap 0; echo)\n"
533 )
534
535 assert self.shell
536 self.shell.send(out_command.encode())
537
538 # Get the output
539 output = base64.b64decode(self._next_newline_closed_block_from_shell())
540
541 if not check_return:
542 return (-1, output.decode())
543
544 # Get the return code
545 self.shell.send("echo ${PIPESTATUS[0]}\n".encode())
546 rc = int(self._next_newline_closed_block_from_shell().strip())
547
548 return (rc, output.decode(errors="replace"))
549
550 def shell_interact(self, address: Optional[str] = None) -> None:
551 """Allows you to interact with the guest shell for debugging purposes.
552
553 @address string passed to socat that will be connected to the guest shell.
554 Check the `Running Tests interactivly` chapter of NixOS manual for an example.
555 """
556 self.connect()
557
558 if address is None:
559 address = "READLINE,prompt=$ "
560 self.log("Terminal is ready (there is no initial prompt):")
561
562 assert self.shell
563 try:
564 subprocess.run(
565 ["socat", address, f"FD:{self.shell.fileno()}"],
566 pass_fds=[self.shell.fileno()],
567 )
568 # allow users to cancel this command without breaking the test
569 except KeyboardInterrupt:
570 pass
571
572 def console_interact(self) -> None:
573 """Allows you to interact with QEMU's stdin
574
575 The shell can be exited with Ctrl+D. Note that Ctrl+C is not allowed to be used.
576 QEMU's stdout is read line-wise.
577
578 Should only be used during test development, not in the production test."""
579 self.log("Terminal is ready (there is no prompt):")
580
581 assert self.process
582 assert self.process.stdin
583
584 while True:
585 try:
586 char = sys.stdin.buffer.read(1)
587 except KeyboardInterrupt:
588 break
589 if char == b"": # ctrl+d
590 self.log("Closing connection to the console")
591 break
592 self.send_console(char.decode())
593
594 def succeed(self, *commands: str, timeout: Optional[int] = None) -> str:
595 """Execute each command and check that it succeeds."""
596 output = ""
597 for command in commands:
598 with self.nested(f"must succeed: {command}"):
599 (status, out) = self.execute(command, timeout=timeout)
600 if status != 0:
601 self.log(f"output: {out}")
602 raise Exception(f"command `{command}` failed (exit code {status})")
603 output += out
604 return output
605
606 def fail(self, *commands: str, timeout: Optional[int] = None) -> str:
607 """Execute each command and check that it fails."""
608 output = ""
609 for command in commands:
610 with self.nested(f"must fail: {command}"):
611 (status, out) = self.execute(command, timeout=timeout)
612 if status == 0:
613 raise Exception(f"command `{command}` unexpectedly succeeded")
614 output += out
615 return output
616
617 def wait_until_succeeds(self, command: str, timeout: int = 900) -> str:
618 """Wait until a command returns success and return its output.
619 Throws an exception on timeout.
620 """
621 output = ""
622
623 def check_success(_: Any) -> bool:
624 nonlocal output
625 status, output = self.execute(command, timeout=timeout)
626 return status == 0
627
628 with self.nested(f"waiting for success: {command}"):
629 retry(check_success, timeout)
630 return output
631
632 def wait_until_fails(self, command: str, timeout: int = 900) -> str:
633 """Wait until a command returns failure.
634 Throws an exception on timeout.
635 """
636 output = ""
637
638 def check_failure(_: Any) -> bool:
639 nonlocal output
640 status, output = self.execute(command, timeout=timeout)
641 return status != 0
642
643 with self.nested(f"waiting for failure: {command}"):
644 retry(check_failure)
645 return output
646
647 def wait_for_shutdown(self) -> None:
648 if not self.booted:
649 return
650
651 with self.nested("waiting for the VM to power off"):
652 sys.stdout.flush()
653 assert self.process
654 self.process.wait()
655
656 self.pid = None
657 self.booted = False
658 self.connected = False
659
660 def get_tty_text(self, tty: str) -> str:
661 status, output = self.execute(
662 f"fold -w$(stty -F /dev/tty{tty} size | "
663 f"awk '{{print $2}}') /dev/vcs{tty}"
664 )
665 return output
666
667 def wait_until_tty_matches(self, tty: str, regexp: str) -> None:
668 """Wait until the visible output on the chosen TTY matches regular
669 expression. Throws an exception on timeout.
670 """
671 matcher = re.compile(regexp)
672
673 def tty_matches(last: bool) -> bool:
674 text = self.get_tty_text(tty)
675 if last:
676 self.log(
677 f"Last chance to match /{regexp}/ on TTY{tty}, "
678 f"which currently contains: {text}"
679 )
680 return len(matcher.findall(text)) > 0
681
682 with self.nested(f"waiting for {regexp} to appear on tty {tty}"):
683 retry(tty_matches)
684
685 def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None:
686 with self.nested(f"sending keys {repr(chars)}"):
687 for char in chars:
688 self.send_key(char, delay, log=False)
689
690 def wait_for_file(self, filename: str) -> None:
691 """Waits until the file exists in machine's file system."""
692
693 def check_file(_: Any) -> bool:
694 status, _ = self.execute(f"test -e {filename}")
695 return status == 0
696
697 with self.nested(f"waiting for file '{filename}'"):
698 retry(check_file)
699
700 def wait_for_open_port(self, port: int, addr: str = "localhost") -> None:
701 def port_is_open(_: Any) -> bool:
702 status, _ = self.execute(f"nc -z {addr} {port}")
703 return status == 0
704
705 with self.nested(f"waiting for TCP port {port} on {addr}"):
706 retry(port_is_open)
707
708 def wait_for_closed_port(self, port: int, addr: str = "localhost") -> None:
709 def port_is_closed(_: Any) -> bool:
710 status, _ = self.execute(f"nc -z {addr} {port}")
711 return status != 0
712
713 with self.nested(f"waiting for TCP port {port} on {addr} to be closed"):
714 retry(port_is_closed)
715
716 def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
717 return self.systemctl(f"start {jobname}", user)
718
719 def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
720 return self.systemctl(f"stop {jobname}", user)
721
722 def wait_for_job(self, jobname: str) -> None:
723 self.wait_for_unit(jobname)
724
725 def connect(self) -> None:
726 def shell_ready(timeout_secs: int) -> bool:
727 """We sent some data from the backdoor service running on the guest
728 to indicate that the backdoor shell is ready.
729 As soon as we read some data from the socket here, we assume that
730 our root shell is operational.
731 """
732 (ready, _, _) = select.select([self.shell], [], [], timeout_secs)
733 return bool(ready)
734
735 if self.connected:
736 return
737
738 with self.nested("waiting for the VM to finish booting"):
739 self.start()
740
741 assert self.shell
742
743 tic = time.time()
744 # TODO: do we want to bail after a set number of attempts?
745 while not shell_ready(timeout_secs=30):
746 self.log("Guest root shell did not produce any data yet...")
747
748 self.log(self.shell.recv(1024).decode())
749 toc = time.time()
750
751 self.log("connected to guest root shell")
752 self.log(f"(connecting took {toc - tic:.2f} seconds)")
753 self.connected = True
754
755 def screenshot(self, filename: str) -> None:
756 if "." not in filename:
757 filename += ".png"
758 if "/" not in filename:
759 filename = os.path.join(self.out_dir, filename)
760 tmp = f"{filename}.ppm"
761
762 with self.nested(
763 f"making screenshot {filename}",
764 {"image": os.path.basename(filename)},
765 ):
766 self.send_monitor_command(f"screendump {tmp}")
767 ret = subprocess.run(f"pnmtopng '{tmp}' > '{filename}'", shell=True)
768 os.unlink(tmp)
769 if ret.returncode != 0:
770 raise Exception("Cannot convert screenshot")
771
772 def copy_from_host_via_shell(self, source: str, target: str) -> None:
773 """Copy a file from the host into the guest by piping it over the
774 shell into the destination file. Works without host-guest shared folder.
775 Prefer copy_from_host for whenever possible.
776 """
777 with open(source, "rb") as fh:
778 content_b64 = base64.b64encode(fh.read()).decode()
779 self.succeed(
780 f"mkdir -p $(dirname {target})",
781 f"echo -n {content_b64} | base64 -d > {target}",
782 )
783
784 def copy_from_host(self, source: str, target: str) -> None:
785 """Copy a file from the host into the guest via the `shared_dir` shared
786 among all the VMs (using a temporary directory).
787 """
788 host_src = Path(source)
789 vm_target = Path(target)
790 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
791 shared_temp = Path(shared_td)
792 host_intermediate = shared_temp / host_src.name
793 vm_shared_temp = Path("/tmp/shared") / shared_temp.name
794 vm_intermediate = vm_shared_temp / host_src.name
795
796 self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
797 if host_src.is_dir():
798 shutil.copytree(host_src, host_intermediate)
799 else:
800 shutil.copy(host_src, host_intermediate)
801 self.succeed(make_command(["mkdir", "-p", vm_target.parent]))
802 self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target]))
803
804 def copy_from_vm(self, source: str, target_dir: str = "") -> None:
805 """Copy a file from the VM (specified by an in-VM source path) to a path
806 relative to `$out`. The file is copied via the `shared_dir` shared among
807 all the VMs (using a temporary directory).
808 """
809 # Compute the source, target, and intermediate shared file names
810 vm_src = Path(source)
811 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
812 shared_temp = Path(shared_td)
813 vm_shared_temp = Path("/tmp/shared") / shared_temp.name
814 vm_intermediate = vm_shared_temp / vm_src.name
815 intermediate = shared_temp / vm_src.name
816 # Copy the file to the shared directory inside VM
817 self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
818 self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate]))
819 abs_target = self.out_dir / target_dir / vm_src.name
820 abs_target.parent.mkdir(exist_ok=True, parents=True)
821 # Copy the file from the shared directory outside VM
822 if intermediate.is_dir():
823 shutil.copytree(intermediate, abs_target)
824 else:
825 shutil.copy(intermediate, abs_target)
826
827 def dump_tty_contents(self, tty: str) -> None:
828 """Debugging: Dump the contents of the TTY<n>"""
829 self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat")
830
831 def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]:
832 with tempfile.TemporaryDirectory() as tmpdir:
833 screenshot_path = os.path.join(tmpdir, "ppm")
834 self.send_monitor_command(f"screendump {screenshot_path}")
835 return _perform_ocr_on_screenshot(screenshot_path, model_ids)
836
837 def get_screen_text_variants(self) -> List[str]:
838 return self._get_screen_text_variants([0, 1, 2])
839
840 def get_screen_text(self) -> str:
841 return self._get_screen_text_variants([2])[0]
842
843 def wait_for_text(self, regex: str) -> None:
844 def screen_matches(last: bool) -> bool:
845 variants = self.get_screen_text_variants()
846 for text in variants:
847 if re.search(regex, text) is not None:
848 return True
849
850 if last:
851 self.log(f"Last OCR attempt failed. Text was: {variants}")
852
853 return False
854
855 with self.nested(f"waiting for {regex} to appear on screen"):
856 retry(screen_matches)
857
858 def wait_for_console_text(self, regex: str) -> None:
859 with self.nested(f"waiting for {regex} to appear on console"):
860 # Buffer the console output, this is needed
861 # to match multiline regexes.
862 console = io.StringIO()
863 while True:
864 try:
865 console.write(self.last_lines.get())
866 except queue.Empty:
867 self.sleep(1)
868 continue
869 console.seek(0)
870 matches = re.search(regex, console.read())
871 if matches is not None:
872 return
873
874 def send_key(
875 self, key: str, delay: Optional[float] = 0.01, log: Optional[bool] = True
876 ) -> None:
877 key = CHAR_TO_KEY.get(key, key)
878 context = self.nested(f"sending key {repr(key)}") if log else nullcontext()
879 with context:
880 self.send_monitor_command(f"sendkey {key}")
881 if delay is not None:
882 time.sleep(delay)
883
884 def send_console(self, chars: str) -> None:
885 assert self.process
886 assert self.process.stdin
887 self.process.stdin.write(chars.encode())
888 self.process.stdin.flush()
889
890 def start(self, allow_reboot: bool = False) -> None:
891 if self.booted:
892 return
893
894 self.log("starting vm")
895
896 def clear(path: Path) -> Path:
897 if path.exists():
898 path.unlink()
899 return path
900
901 def create_socket(path: Path) -> socket.socket:
902 s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
903 s.bind(str(path))
904 s.listen(1)
905 return s
906
907 monitor_socket = create_socket(clear(self.monitor_path))
908 shell_socket = create_socket(clear(self.shell_path))
909 self.process = self.start_command.run(
910 self.state_dir,
911 self.shared_dir,
912 self.monitor_path,
913 self.shell_path,
914 allow_reboot,
915 )
916 self.monitor, _ = monitor_socket.accept()
917 self.shell, _ = shell_socket.accept()
918
919 # Store last serial console lines for use
920 # of wait_for_console_text
921 self.last_lines: Queue = Queue()
922
923 def process_serial_output() -> None:
924 assert self.process
925 assert self.process.stdout
926 for _line in self.process.stdout:
927 # Ignore undecodable bytes that may occur in boot menus
928 line = _line.decode(errors="ignore").replace("\r", "").rstrip()
929 self.last_lines.put(line)
930 self.log_serial(line)
931
932 self.serial_thread = threading.Thread(target=process_serial_output)
933 self.serial_thread.start()
934
935 self.wait_for_monitor_prompt()
936
937 self.pid = self.process.pid
938 self.booted = True
939
940 self.log(f"QEMU running (pid {self.pid})")
941
942 def cleanup_statedir(self) -> None:
943 shutil.rmtree(self.state_dir)
944 rootlog.log(f"deleting VM state directory {self.state_dir}")
945 rootlog.log("if you want to keep the VM state, pass --keep-vm-state")
946
947 def shutdown(self) -> None:
948 if not self.booted:
949 return
950
951 assert self.shell
952 self.shell.send("poweroff\n".encode())
953 self.wait_for_shutdown()
954
955 def crash(self) -> None:
956 if not self.booted:
957 return
958
959 self.log("forced crash")
960 self.send_monitor_command("quit")
961 self.wait_for_shutdown()
962
963 def reboot(self) -> None:
964 """Press Ctrl+Alt+Delete in the guest.
965
966 Prepares the machine to be reconnected which is useful if the
967 machine was started with `allow_reboot = True`
968 """
969 self.send_key("ctrl-alt-delete")
970 self.connected = False
971
972 def wait_for_x(self) -> None:
973 """Wait until it is possible to connect to the X server. Note that
974 testing the existence of /tmp/.X11-unix/X0 is insufficient.
975 """
976
977 def check_x(_: Any) -> bool:
978 cmd = (
979 "journalctl -b SYSLOG_IDENTIFIER=systemd | "
980 + 'grep "Reached target Current graphical"'
981 )
982 status, _ = self.execute(cmd)
983 if status != 0:
984 return False
985 status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
986 return status == 0
987
988 with self.nested("waiting for the X11 server"):
989 retry(check_x)
990
991 def get_window_names(self) -> List[str]:
992 return self.succeed(
993 r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'"
994 ).splitlines()
995
996 def wait_for_window(self, regexp: str) -> None:
997 pattern = re.compile(regexp)
998
999 def window_is_visible(last_try: bool) -> bool:
1000 names = self.get_window_names()
1001 if last_try:
1002 self.log(
1003 f"Last chance to match {regexp} on the window list,"
1004 + " which currently contains: "
1005 + ", ".join(names)
1006 )
1007 return any(pattern.search(name) for name in names)
1008
1009 with self.nested("waiting for a window to appear"):
1010 retry(window_is_visible)
1011
1012 def sleep(self, secs: int) -> None:
1013 # We want to sleep in *guest* time, not *host* time.
1014 self.succeed(f"sleep {secs}")
1015
1016 def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None:
1017 """Forward a TCP port on the host to a TCP port on the guest.
1018 Useful during interactive testing.
1019 """
1020 self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}")
1021
1022 def block(self) -> None:
1023 """Make the machine unreachable by shutting down eth1 (the multicast
1024 interface used to talk to the other VMs). We keep eth0 up so that
1025 the test driver can continue to talk to the machine.
1026 """
1027 self.send_monitor_command("set_link virtio-net-pci.1 off")
1028
1029 def unblock(self) -> None:
1030 """Make the machine reachable."""
1031 self.send_monitor_command("set_link virtio-net-pci.1 on")
1032
1033 def release(self) -> None:
1034 if self.pid is None:
1035 return
1036 rootlog.info(f"kill machine (pid {self.pid})")
1037 assert self.process
1038 assert self.shell
1039 assert self.monitor
1040 assert self.serial_thread
1041
1042 self.process.terminate()
1043 self.shell.close()
1044 self.monitor.close()
1045 self.serial_thread.join()
1046
1047 def run_callbacks(self) -> None:
1048 for callback in self.callbacks:
1049 callback()