1import base64
2import io
3import os
4import queue
5import re
6import select
7import shlex
8import shutil
9import socket
10import subprocess
11import sys
12import tempfile
13import threading
14import time
15from contextlib import _GeneratorContextManager, nullcontext
16from pathlib import Path
17from queue import Queue
18from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
19
20from test_driver.logger import rootlog
21
22from .qmp import QMPSession
23
24CHAR_TO_KEY = {
25 "A": "shift-a",
26 "N": "shift-n",
27 "-": "0x0C",
28 "_": "shift-0x0C",
29 "B": "shift-b",
30 "O": "shift-o",
31 "=": "0x0D",
32 "+": "shift-0x0D",
33 "C": "shift-c",
34 "P": "shift-p",
35 "[": "0x1A",
36 "{": "shift-0x1A",
37 "D": "shift-d",
38 "Q": "shift-q",
39 "]": "0x1B",
40 "}": "shift-0x1B",
41 "E": "shift-e",
42 "R": "shift-r",
43 ";": "0x27",
44 ":": "shift-0x27",
45 "F": "shift-f",
46 "S": "shift-s",
47 "'": "0x28",
48 '"': "shift-0x28",
49 "G": "shift-g",
50 "T": "shift-t",
51 "`": "0x29",
52 "~": "shift-0x29",
53 "H": "shift-h",
54 "U": "shift-u",
55 "\\": "0x2B",
56 "|": "shift-0x2B",
57 "I": "shift-i",
58 "V": "shift-v",
59 ",": "0x33",
60 "<": "shift-0x33",
61 "J": "shift-j",
62 "W": "shift-w",
63 ".": "0x34",
64 ">": "shift-0x34",
65 "K": "shift-k",
66 "X": "shift-x",
67 "/": "0x35",
68 "?": "shift-0x35",
69 "L": "shift-l",
70 "Y": "shift-y",
71 " ": "spc",
72 "M": "shift-m",
73 "Z": "shift-z",
74 "\n": "ret",
75 "!": "shift-0x02",
76 "@": "shift-0x03",
77 "#": "shift-0x04",
78 "$": "shift-0x05",
79 "%": "shift-0x06",
80 "^": "shift-0x07",
81 "&": "shift-0x08",
82 "*": "shift-0x09",
83 "(": "shift-0x0A",
84 ")": "shift-0x0B",
85}
86
87
88def make_command(args: list) -> str:
89 return " ".join(map(shlex.quote, (map(str, args))))
90
91
92def _perform_ocr_on_screenshot(
93 screenshot_path: str, model_ids: Iterable[int]
94) -> List[str]:
95 if shutil.which("tesseract") is None:
96 raise Exception("OCR requested but enableOCR is false")
97
98 magick_args = (
99 "-filter Catrom -density 72 -resample 300 "
100 + "-contrast -normalize -despeckle -type grayscale "
101 + "-sharpen 1 -posterize 3 -negate -gamma 100 "
102 + "-blur 1x65535"
103 )
104
105 tess_args = "-c debug_file=/dev/null --psm 11"
106
107 cmd = f"convert {magick_args} '{screenshot_path}' 'tiff:{screenshot_path}.tiff'"
108 ret = subprocess.run(cmd, shell=True, capture_output=True)
109 if ret.returncode != 0:
110 raise Exception(f"TIFF conversion failed with exit code {ret.returncode}")
111
112 model_results = []
113 for model_id in model_ids:
114 cmd = f"tesseract '{screenshot_path}.tiff' - {tess_args} --oem '{model_id}'"
115 ret = subprocess.run(cmd, shell=True, capture_output=True)
116 if ret.returncode != 0:
117 raise Exception(f"OCR failed with exit code {ret.returncode}")
118 model_results.append(ret.stdout.decode("utf-8"))
119
120 return model_results
121
122
123def retry(fn: Callable, timeout: int = 900) -> None:
124 """Call the given function repeatedly, with 1 second intervals,
125 until it returns True or a timeout is reached.
126 """
127
128 for _ in range(timeout):
129 if fn(False):
130 return
131 time.sleep(1)
132
133 if not fn(True):
134 raise Exception(f"action timed out after {timeout} seconds")
135
136
137class StartCommand:
138 """The Base Start Command knows how to append the necessary
139 runtime qemu options as determined by a particular test driver
140 run. Any such start command is expected to happily receive and
141 append additional qemu args.
142 """
143
144 _cmd: str
145
146 def cmd(
147 self,
148 monitor_socket_path: Path,
149 qmp_socket_path: Path,
150 shell_socket_path: Path,
151 allow_reboot: bool = False,
152 ) -> str:
153 display_opts = ""
154 display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"])
155 if not display_available:
156 display_opts += " -nographic"
157
158 # qemu options
159 qemu_opts = (
160 " -device virtio-serial"
161 # Note: virtconsole will map to /dev/hvc0 in Linux guests
162 " -device virtconsole,chardev=shell"
163 " -device virtio-rng-pci"
164 " -serial stdio"
165 )
166 if not allow_reboot:
167 qemu_opts += " -no-reboot"
168 # TODO: qemu script already catpures this env variable, legacy?
169 qemu_opts += " " + os.environ.get("QEMU_OPTS", "")
170
171 return (
172 f"{self._cmd}"
173 f" -qmp unix:{qmp_socket_path},server=on,wait=off"
174 f" -monitor unix:{monitor_socket_path}"
175 f" -chardev socket,id=shell,path={shell_socket_path}"
176 f"{qemu_opts}"
177 f"{display_opts}"
178 )
179
180 @staticmethod
181 def build_environment(
182 state_dir: Path,
183 shared_dir: Path,
184 ) -> dict:
185 # We make a copy to not update the current environment
186 env = dict(os.environ)
187 env.update(
188 {
189 "TMPDIR": str(state_dir),
190 "SHARED_DIR": str(shared_dir),
191 "USE_TMPDIR": "1",
192 }
193 )
194 return env
195
196 def run(
197 self,
198 state_dir: Path,
199 shared_dir: Path,
200 monitor_socket_path: Path,
201 qmp_socket_path: Path,
202 shell_socket_path: Path,
203 allow_reboot: bool,
204 ) -> subprocess.Popen:
205 return subprocess.Popen(
206 self.cmd(
207 monitor_socket_path, qmp_socket_path, shell_socket_path, allow_reboot
208 ),
209 stdin=subprocess.PIPE,
210 stdout=subprocess.PIPE,
211 stderr=subprocess.STDOUT,
212 shell=True,
213 cwd=state_dir,
214 env=self.build_environment(state_dir, shared_dir),
215 )
216
217
218class NixStartScript(StartCommand):
219 """A start script from nixos/modules/virtualiation/qemu-vm.nix
220 that also satisfies the requirement of the BaseStartCommand.
221 These Nix commands have the particular characteristic that the
222 machine name can be extracted out of them via a regex match.
223 (Admittedly a _very_ implicit contract, evtl. TODO fix)
224 """
225
226 def __init__(self, script: str):
227 self._cmd = script
228
229 @property
230 def machine_name(self) -> str:
231 match = re.search("run-(.+)-vm$", self._cmd)
232 name = "machine"
233 if match:
234 name = match.group(1)
235 return name
236
237
238class LegacyStartCommand(StartCommand):
239 """Used in some places to create an ad-hoc machine instead of
240 using nix test instrumentation + module system for that purpose.
241 Legacy.
242 """
243
244 def __init__(
245 self,
246 netBackendArgs: Optional[str] = None, # noqa: N803
247 netFrontendArgs: Optional[str] = None, # noqa: N803
248 hda: Optional[Tuple[Path, str]] = None,
249 cdrom: Optional[str] = None,
250 usb: Optional[str] = None,
251 bios: Optional[str] = None,
252 qemuBinary: Optional[str] = None, # noqa: N803
253 qemuFlags: Optional[str] = None, # noqa: N803
254 ):
255 if qemuBinary is not None:
256 self._cmd = qemuBinary
257 else:
258 self._cmd = "qemu-kvm"
259
260 self._cmd += " -m 384"
261
262 # networking
263 net_backend = "-netdev user,id=net0"
264 net_frontend = "-device virtio-net-pci,netdev=net0"
265 if netBackendArgs is not None:
266 net_backend += "," + netBackendArgs
267 if netFrontendArgs is not None:
268 net_frontend += "," + netFrontendArgs
269 self._cmd += f" {net_backend} {net_frontend}"
270
271 # hda
272 hda_cmd = ""
273 if hda is not None:
274 hda_path = hda[0].resolve()
275 hda_interface = hda[1]
276 if hda_interface == "scsi":
277 hda_cmd += (
278 f" -drive id=hda,file={hda_path},werror=report,if=none"
279 " -device scsi-hd,drive=hda"
280 )
281 else:
282 hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report"
283 self._cmd += hda_cmd
284
285 # cdrom
286 if cdrom is not None:
287 self._cmd += f" -cdrom {cdrom}"
288
289 # usb
290 usb_cmd = ""
291 if usb is not None:
292 # https://github.com/qemu/qemu/blob/master/docs/usb2.txt
293 usb_cmd += (
294 " -device usb-ehci"
295 f" -drive id=usbdisk,file={usb},if=none,readonly"
296 " -device usb-storage,drive=usbdisk "
297 )
298 self._cmd += usb_cmd
299
300 # bios
301 if bios is not None:
302 self._cmd += f" -bios {bios}"
303
304 # qemu flags
305 if qemuFlags is not None:
306 self._cmd += f" {qemuFlags}"
307
308
309class Machine:
310 """A handle to the machine with this name, that also knows how to manage
311 the machine lifecycle with the help of a start script / command."""
312
313 name: str
314 out_dir: Path
315 tmp_dir: Path
316 shared_dir: Path
317 state_dir: Path
318 monitor_path: Path
319 qmp_path: Path
320 shell_path: Path
321
322 start_command: StartCommand
323 keep_vm_state: bool
324
325 process: Optional[subprocess.Popen]
326 pid: Optional[int]
327 monitor: Optional[socket.socket]
328 qmp_client: Optional[QMPSession]
329 shell: Optional[socket.socket]
330 serial_thread: Optional[threading.Thread]
331
332 booted: bool
333 connected: bool
334 # Store last serial console lines for use
335 # of wait_for_console_text
336 last_lines: Queue = Queue()
337 callbacks: List[Callable]
338
339 def __repr__(self) -> str:
340 return f"<Machine '{self.name}'>"
341
342 def __init__(
343 self,
344 out_dir: Path,
345 tmp_dir: Path,
346 start_command: StartCommand,
347 name: str = "machine",
348 keep_vm_state: bool = False,
349 callbacks: Optional[List[Callable]] = None,
350 ) -> None:
351 self.out_dir = out_dir
352 self.tmp_dir = tmp_dir
353 self.keep_vm_state = keep_vm_state
354 self.name = name
355 self.start_command = start_command
356 self.callbacks = callbacks if callbacks is not None else []
357
358 # set up directories
359 self.shared_dir = self.tmp_dir / "shared-xchg"
360 self.shared_dir.mkdir(mode=0o700, exist_ok=True)
361
362 self.state_dir = self.tmp_dir / f"vm-state-{self.name}"
363 self.monitor_path = self.state_dir / "monitor"
364 self.qmp_path = self.state_dir / "qmp"
365 self.shell_path = self.state_dir / "shell"
366 if (not self.keep_vm_state) and self.state_dir.exists():
367 self.cleanup_statedir()
368 self.state_dir.mkdir(mode=0o700, exist_ok=True)
369
370 self.process = None
371 self.pid = None
372 self.monitor = None
373 self.qmp_client = None
374 self.shell = None
375 self.serial_thread = None
376
377 self.booted = False
378 self.connected = False
379
380 @staticmethod
381 def create_startcommand(args: Dict[str, str]) -> StartCommand:
382 rootlog.warning(
383 "Using legacy create_startcommand(), "
384 "please use proper nix test vm instrumentation, instead "
385 "to generate the appropriate nixos test vm qemu startup script"
386 )
387 hda = None
388 if args.get("hda"):
389 hda_arg: str = args.get("hda", "")
390 hda_arg_path: Path = Path(hda_arg)
391 hda = (hda_arg_path, args.get("hdaInterface", ""))
392 return LegacyStartCommand(
393 netBackendArgs=args.get("netBackendArgs"),
394 netFrontendArgs=args.get("netFrontendArgs"),
395 hda=hda,
396 cdrom=args.get("cdrom"),
397 usb=args.get("usb"),
398 bios=args.get("bios"),
399 qemuBinary=args.get("qemuBinary"),
400 qemuFlags=args.get("qemuFlags"),
401 )
402
403 def is_up(self) -> bool:
404 return self.booted and self.connected
405
406 def log(self, msg: str) -> None:
407 rootlog.log(msg, {"machine": self.name})
408
409 def log_serial(self, msg: str) -> None:
410 rootlog.log_serial(msg, self.name)
411
412 def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager:
413 my_attrs = {"machine": self.name}
414 my_attrs.update(attrs)
415 return rootlog.nested(msg, my_attrs)
416
417 def wait_for_monitor_prompt(self) -> str:
418 assert self.monitor is not None
419 answer = ""
420 while True:
421 undecoded_answer = self.monitor.recv(1024)
422 if not undecoded_answer:
423 break
424 answer += undecoded_answer.decode()
425 if answer.endswith("(qemu) "):
426 break
427 return answer
428
429 def send_monitor_command(self, command: str) -> str:
430 """
431 Send a command to the QEMU monitor. This allows attaching
432 virtual USB disks to a running machine, among other things.
433 """
434 self.run_callbacks()
435 message = f"{command}\n".encode()
436 assert self.monitor is not None
437 self.monitor.send(message)
438 return self.wait_for_monitor_prompt()
439
440 def wait_for_unit(
441 self, unit: str, user: Optional[str] = None, timeout: int = 900
442 ) -> None:
443 """
444 Wait for a systemd unit to get into "active" state.
445 Throws exceptions on "failed" and "inactive" states as well as after
446 timing out.
447 """
448
449 def check_active(_: Any) -> bool:
450 info = self.get_unit_info(unit, user)
451 state = info["ActiveState"]
452 if state == "failed":
453 raise Exception(f'unit "{unit}" reached state "{state}"')
454
455 if state == "inactive":
456 status, jobs = self.systemctl("list-jobs --full 2>&1", user)
457 if "No jobs" in jobs:
458 info = self.get_unit_info(unit, user)
459 if info["ActiveState"] == state:
460 raise Exception(
461 f'unit "{unit}" is inactive and there are no pending jobs'
462 )
463
464 return state == "active"
465
466 with self.nested(
467 f"waiting for unit {unit}"
468 + (f" with user {user}" if user is not None else "")
469 ):
470 retry(check_active, timeout)
471
472 def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]:
473 status, lines = self.systemctl(f'--no-pager show "{unit}"', user)
474 if status != 0:
475 raise Exception(
476 f'retrieving systemctl info for unit "{unit}"'
477 + ("" if user is None else f' under user "{user}"')
478 + f" failed with exit code {status}"
479 )
480
481 line_pattern = re.compile(r"^([^=]+)=(.*)$")
482
483 def tuple_from_line(line: str) -> Tuple[str, str]:
484 match = line_pattern.match(line)
485 assert match is not None
486 return match[1], match[2]
487
488 return dict(
489 tuple_from_line(line)
490 for line in lines.split("\n")
491 if line_pattern.match(line)
492 )
493
494 def systemctl(self, q: str, user: Optional[str] = None) -> Tuple[int, str]:
495 """
496 Runs `systemctl` commands with optional support for
497 `systemctl --user`
498
499 ```py
500 # run `systemctl list-jobs --no-pager`
501 machine.systemctl("list-jobs --no-pager")
502
503 # spawn a shell for `any-user` and run
504 # `systemctl --user list-jobs --no-pager`
505 machine.systemctl("list-jobs --no-pager", "any-user")
506 ```
507 """
508 if user is not None:
509 q = q.replace("'", "\\'")
510 return self.execute(
511 f"su -l {user} --shell /bin/sh -c "
512 "$'XDG_RUNTIME_DIR=/run/user/`id -u` "
513 f"systemctl --user {q}'"
514 )
515 return self.execute(f"systemctl {q}")
516
517 def require_unit_state(self, unit: str, require_state: str = "active") -> None:
518 with self.nested(
519 f"checking if unit '{unit}' has reached state '{require_state}'"
520 ):
521 info = self.get_unit_info(unit)
522 state = info["ActiveState"]
523 if state != require_state:
524 raise Exception(
525 f"Expected unit '{unit}' to to be in state "
526 f"'{require_state}' but it is in state '{state}'"
527 )
528
529 def _next_newline_closed_block_from_shell(self) -> str:
530 assert self.shell
531 output_buffer = []
532 while True:
533 # This receives up to 4096 bytes from the socket
534 chunk = self.shell.recv(4096)
535 if not chunk:
536 # Probably a broken pipe, return the output we have
537 break
538
539 decoded = chunk.decode()
540 output_buffer += [decoded]
541 if decoded[-1] == "\n":
542 break
543 return "".join(output_buffer)
544
545 def execute(
546 self,
547 command: str,
548 check_return: bool = True,
549 check_output: bool = True,
550 timeout: Optional[int] = 900,
551 ) -> Tuple[int, str]:
552 """
553 Execute a shell command, returning a list `(status, stdout)`.
554
555 Commands are run with `set -euo pipefail` set:
556
557 - If several commands are separated by `;` and one fails, the
558 command as a whole will fail.
559
560 - For pipelines, the last non-zero exit status will be returned
561 (if there is one; otherwise zero will be returned).
562
563 - Dereferencing unset variables fails the command.
564
565 - It will wait for stdout to be closed.
566
567 If the command detaches, it must close stdout, as `execute` will wait
568 for this to consume all output reliably. This can be achieved by
569 redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or
570 a file. Examples of detaching commands are `sleep 365d &`, where the
571 shell forks a new process that can write to stdout and `xclip -i`, where
572 the `xclip` command itself forks without closing stdout.
573
574 Takes an optional parameter `check_return` that defaults to `True`.
575 Setting this parameter to `False` will not check for the return code
576 and return -1 instead. This can be used for commands that shut down
577 the VM and would therefore break the pipe that would be used for
578 retrieving the return code.
579
580 A timeout for the command can be specified (in seconds) using the optional
581 `timeout` parameter, e.g., `execute(cmd, timeout=10)` or
582 `execute(cmd, timeout=None)`. The default is 900 seconds.
583 """
584 self.run_callbacks()
585 self.connect()
586
587 # Always run command with shell opts
588 command = f"set -euo pipefail; {command}"
589
590 timeout_str = ""
591 if timeout is not None:
592 timeout_str = f"timeout {timeout}"
593
594 # While sh is bash on NixOS, this is not the case for every distro.
595 # We explicitly call bash here to allow for the driver to boot other distros as well.
596 out_command = (
597 f"{timeout_str} bash -c {shlex.quote(command)} | (base64 -w 0; echo)\n"
598 )
599
600 assert self.shell
601 self.shell.send(out_command.encode())
602
603 if not check_output:
604 return (-2, "")
605
606 # Get the output
607 output = base64.b64decode(self._next_newline_closed_block_from_shell())
608
609 if not check_return:
610 return (-1, output.decode())
611
612 # Get the return code
613 self.shell.send(b"echo ${PIPESTATUS[0]}\n")
614 rc = int(self._next_newline_closed_block_from_shell().strip())
615
616 return (rc, output.decode(errors="replace"))
617
618 def shell_interact(self, address: Optional[str] = None) -> None:
619 """
620 Allows you to directly interact with the guest shell. This should
621 only be used during test development, not in production tests.
622 Killing the interactive session with `Ctrl-d` or `Ctrl-c` also ends
623 the guest session.
624 """
625 self.connect()
626
627 if address is None:
628 address = "READLINE,prompt=$ "
629 self.log("Terminal is ready (there is no initial prompt):")
630
631 assert self.shell
632 try:
633 subprocess.run(
634 ["socat", address, f"FD:{self.shell.fileno()}"],
635 pass_fds=[self.shell.fileno()],
636 )
637 # allow users to cancel this command without breaking the test
638 except KeyboardInterrupt:
639 pass
640
641 def console_interact(self) -> None:
642 """
643 Allows you to directly interact with QEMU's stdin, by forwarding
644 terminal input to the QEMU process.
645 This is for use with the interactive test driver, not for production
646 tests, which run unattended.
647 Output from QEMU is only read line-wise. `Ctrl-c` kills QEMU and
648 `Ctrl-d` closes console and returns to the test runner.
649 """
650 self.log("Terminal is ready (there is no prompt):")
651
652 assert self.process
653 assert self.process.stdin
654
655 while True:
656 try:
657 char = sys.stdin.buffer.read(1)
658 except KeyboardInterrupt:
659 break
660 if char == b"": # ctrl+d
661 self.log("Closing connection to the console")
662 break
663 self.send_console(char.decode())
664
665 def succeed(self, *commands: str, timeout: Optional[int] = None) -> str:
666 """
667 Execute a shell command, raising an exception if the exit status is
668 not zero, otherwise returning the standard output. Similar to `execute`,
669 except that the timeout is `None` by default. See `execute` for details on
670 command execution.
671 """
672 output = ""
673 for command in commands:
674 with self.nested(f"must succeed: {command}"):
675 (status, out) = self.execute(command, timeout=timeout)
676 if status != 0:
677 self.log(f"output: {out}")
678 raise Exception(f"command `{command}` failed (exit code {status})")
679 output += out
680 return output
681
682 def fail(self, *commands: str, timeout: Optional[int] = None) -> str:
683 """
684 Like `succeed`, but raising an exception if the command returns a zero
685 status.
686 """
687 output = ""
688 for command in commands:
689 with self.nested(f"must fail: {command}"):
690 (status, out) = self.execute(command, timeout=timeout)
691 if status == 0:
692 raise Exception(f"command `{command}` unexpectedly succeeded")
693 output += out
694 return output
695
696 def wait_until_succeeds(self, command: str, timeout: int = 900) -> str:
697 """
698 Repeat a shell command with 1-second intervals until it succeeds.
699 Has a default timeout of 900 seconds which can be modified, e.g.
700 `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on
701 command execution.
702 Throws an exception on timeout.
703 """
704 output = ""
705
706 def check_success(_: Any) -> bool:
707 nonlocal output
708 status, output = self.execute(command, timeout=timeout)
709 return status == 0
710
711 with self.nested(f"waiting for success: {command}"):
712 retry(check_success, timeout)
713 return output
714
715 def wait_until_fails(self, command: str, timeout: int = 900) -> str:
716 """
717 Like `wait_until_succeeds`, but repeating the command until it fails.
718 """
719 output = ""
720
721 def check_failure(_: Any) -> bool:
722 nonlocal output
723 status, output = self.execute(command, timeout=timeout)
724 return status != 0
725
726 with self.nested(f"waiting for failure: {command}"):
727 retry(check_failure, timeout)
728 return output
729
730 def wait_for_shutdown(self) -> None:
731 if not self.booted:
732 return
733
734 with self.nested("waiting for the VM to power off"):
735 sys.stdout.flush()
736 assert self.process
737 self.process.wait()
738
739 self.pid = None
740 self.booted = False
741 self.connected = False
742
743 def get_tty_text(self, tty: str) -> str:
744 status, output = self.execute(
745 f"fold -w$(stty -F /dev/tty{tty} size | "
746 f"awk '{{print $2}}') /dev/vcs{tty}"
747 )
748 return output
749
750 def wait_until_tty_matches(self, tty: str, regexp: str, timeout: int = 900) -> None:
751 """Wait until the visible output on the chosen TTY matches regular
752 expression. Throws an exception on timeout.
753 """
754 matcher = re.compile(regexp)
755
756 def tty_matches(last: bool) -> bool:
757 text = self.get_tty_text(tty)
758 if last:
759 self.log(
760 f"Last chance to match /{regexp}/ on TTY{tty}, "
761 f"which currently contains: {text}"
762 )
763 return len(matcher.findall(text)) > 0
764
765 with self.nested(f"waiting for {regexp} to appear on tty {tty}"):
766 retry(tty_matches, timeout)
767
768 def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None:
769 """
770 Simulate typing a sequence of characters on the virtual keyboard,
771 e.g., `send_chars("foobar\n")` will type the string `foobar`
772 followed by the Enter key.
773 """
774 with self.nested(f"sending keys {repr(chars)}"):
775 for char in chars:
776 self.send_key(char, delay, log=False)
777
778 def wait_for_file(self, filename: str, timeout: int = 900) -> None:
779 """
780 Waits until the file exists in the machine's file system.
781 """
782
783 def check_file(_: Any) -> bool:
784 status, _ = self.execute(f"test -e {filename}")
785 return status == 0
786
787 with self.nested(f"waiting for file '{filename}'"):
788 retry(check_file, timeout)
789
790 def wait_for_open_port(
791 self, port: int, addr: str = "localhost", timeout: int = 900
792 ) -> None:
793 """
794 Wait until a process is listening on the given TCP port and IP address
795 (default `localhost`).
796 """
797
798 def port_is_open(_: Any) -> bool:
799 status, _ = self.execute(f"nc -z {addr} {port}")
800 return status == 0
801
802 with self.nested(f"waiting for TCP port {port} on {addr}"):
803 retry(port_is_open, timeout)
804
805 def wait_for_open_unix_socket(
806 self, addr: str, is_datagram: bool = False, timeout: int = 900
807 ) -> None:
808 """
809 Wait until a process is listening on the given UNIX-domain socket
810 (default to a UNIX-domain stream socket).
811 """
812
813 nc_flags = [
814 "-z",
815 "-uU" if is_datagram else "-U",
816 ]
817
818 def socket_is_open(_: Any) -> bool:
819 status, _ = self.execute(f"nc {' '.join(nc_flags)} {addr}")
820 return status == 0
821
822 with self.nested(
823 f"waiting for UNIX-domain {'datagram' if is_datagram else 'stream'} on '{addr}'"
824 ):
825 retry(socket_is_open, timeout)
826
827 def wait_for_closed_port(
828 self, port: int, addr: str = "localhost", timeout: int = 900
829 ) -> None:
830 """
831 Wait until nobody is listening on the given TCP port and IP address
832 (default `localhost`).
833 """
834
835 def port_is_closed(_: Any) -> bool:
836 status, _ = self.execute(f"nc -z {addr} {port}")
837 return status != 0
838
839 with self.nested(f"waiting for TCP port {port} on {addr} to be closed"):
840 retry(port_is_closed, timeout)
841
842 def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
843 return self.systemctl(f"start {jobname}", user)
844
845 def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
846 return self.systemctl(f"stop {jobname}", user)
847
848 def wait_for_job(self, jobname: str) -> None:
849 self.wait_for_unit(jobname)
850
851 def connect(self) -> None:
852 def shell_ready(timeout_secs: int) -> bool:
853 """We sent some data from the backdoor service running on the guest
854 to indicate that the backdoor shell is ready.
855 As soon as we read some data from the socket here, we assume that
856 our root shell is operational.
857 """
858 (ready, _, _) = select.select([self.shell], [], [], timeout_secs)
859 return bool(ready)
860
861 if self.connected:
862 return
863
864 with self.nested("waiting for the VM to finish booting"):
865 self.start()
866
867 assert self.shell
868
869 tic = time.time()
870 # TODO: do we want to bail after a set number of attempts?
871 while not shell_ready(timeout_secs=30):
872 self.log("Guest root shell did not produce any data yet...")
873 self.log(
874 " To debug, enter the VM and run 'systemctl status backdoor.service'."
875 )
876
877 while True:
878 chunk = self.shell.recv(1024)
879 # No need to print empty strings, it means we are waiting.
880 if len(chunk) == 0:
881 continue
882 self.log(f"Guest shell says: {chunk!r}")
883 # NOTE: for this to work, nothing must be printed after this line!
884 if b"Spawning backdoor root shell..." in chunk:
885 break
886
887 toc = time.time()
888
889 self.log("connected to guest root shell")
890 self.log(f"(connecting took {toc - tic:.2f} seconds)")
891 self.connected = True
892
893 def screenshot(self, filename: str) -> None:
894 """
895 Take a picture of the display of the virtual machine, in PNG format.
896 The screenshot will be available in the derivation output.
897 """
898 if "." not in filename:
899 filename += ".png"
900 if "/" not in filename:
901 filename = os.path.join(self.out_dir, filename)
902 tmp = f"{filename}.ppm"
903
904 with self.nested(
905 f"making screenshot {filename}",
906 {"image": os.path.basename(filename)},
907 ):
908 self.send_monitor_command(f"screendump {tmp}")
909 ret = subprocess.run(f"pnmtopng '{tmp}' > '{filename}'", shell=True)
910 os.unlink(tmp)
911 if ret.returncode != 0:
912 raise Exception("Cannot convert screenshot")
913
914 def copy_from_host_via_shell(self, source: str, target: str) -> None:
915 """Copy a file from the host into the guest by piping it over the
916 shell into the destination file. Works without host-guest shared folder.
917 Prefer copy_from_host for whenever possible.
918 """
919 with open(source, "rb") as fh:
920 content_b64 = base64.b64encode(fh.read()).decode()
921 self.succeed(
922 f"mkdir -p $(dirname {target})",
923 f"echo -n {content_b64} | base64 -d > {target}",
924 )
925
926 def copy_from_host(self, source: str, target: str) -> None:
927 """
928 Copies a file from host to machine, e.g.,
929 `copy_from_host("myfile", "/etc/my/important/file")`.
930
931 The first argument is the file on the host. Note that the "host" refers
932 to the environment in which the test driver runs, which is typically the
933 Nix build sandbox.
934
935 The second argument is the location of the file on the machine that will
936 be written to.
937
938 The file is copied via the `shared_dir` directory which is shared among
939 all the VMs (using a temporary directory).
940 The access rights bits will mimic the ones from the host file and
941 user:group will be root:root.
942 """
943 host_src = Path(source)
944 vm_target = Path(target)
945 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
946 shared_temp = Path(shared_td)
947 host_intermediate = shared_temp / host_src.name
948 vm_shared_temp = Path("/tmp/shared") / shared_temp.name
949 vm_intermediate = vm_shared_temp / host_src.name
950
951 self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
952 if host_src.is_dir():
953 shutil.copytree(host_src, host_intermediate)
954 else:
955 shutil.copy(host_src, host_intermediate)
956 self.succeed(make_command(["mkdir", "-p", vm_target.parent]))
957 self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target]))
958
959 def copy_from_vm(self, source: str, target_dir: str = "") -> None:
960 """Copy a file from the VM (specified by an in-VM source path) to a path
961 relative to `$out`. The file is copied via the `shared_dir` shared among
962 all the VMs (using a temporary directory).
963 """
964 # Compute the source, target, and intermediate shared file names
965 vm_src = Path(source)
966 with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
967 shared_temp = Path(shared_td)
968 vm_shared_temp = Path("/tmp/shared") / shared_temp.name
969 vm_intermediate = vm_shared_temp / vm_src.name
970 intermediate = shared_temp / vm_src.name
971 # Copy the file to the shared directory inside VM
972 self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
973 self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate]))
974 abs_target = self.out_dir / target_dir / vm_src.name
975 abs_target.parent.mkdir(exist_ok=True, parents=True)
976 # Copy the file from the shared directory outside VM
977 if intermediate.is_dir():
978 shutil.copytree(intermediate, abs_target)
979 else:
980 shutil.copy(intermediate, abs_target)
981
982 def dump_tty_contents(self, tty: str) -> None:
983 """Debugging: Dump the contents of the TTY<n>"""
984 self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat")
985
986 def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]:
987 with tempfile.TemporaryDirectory() as tmpdir:
988 screenshot_path = os.path.join(tmpdir, "ppm")
989 self.send_monitor_command(f"screendump {screenshot_path}")
990 return _perform_ocr_on_screenshot(screenshot_path, model_ids)
991
992 def get_screen_text_variants(self) -> List[str]:
993 """
994 Return a list of different interpretations of what is currently
995 visible on the machine's screen using optical character
996 recognition. The number and order of the interpretations is not
997 specified and is subject to change, but if no exception is raised at
998 least one will be returned.
999
1000 ::: {.note}
1001 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
1002 :::
1003 """
1004 return self._get_screen_text_variants([0, 1, 2])
1005
1006 def get_screen_text(self) -> str:
1007 """
1008 Return a textual representation of what is currently visible on the
1009 machine's screen using optical character recognition.
1010
1011 ::: {.note}
1012 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
1013 :::
1014 """
1015 return self._get_screen_text_variants([2])[0]
1016
1017 def wait_for_text(self, regex: str, timeout: int = 900) -> None:
1018 """
1019 Wait until the supplied regular expressions matches the textual
1020 contents of the screen by using optical character recognition (see
1021 `get_screen_text` and `get_screen_text_variants`).
1022
1023 ::: {.note}
1024 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
1025 :::
1026 """
1027
1028 def screen_matches(last: bool) -> bool:
1029 variants = self.get_screen_text_variants()
1030 for text in variants:
1031 if re.search(regex, text) is not None:
1032 return True
1033
1034 if last:
1035 self.log(f"Last OCR attempt failed. Text was: {variants}")
1036
1037 return False
1038
1039 with self.nested(f"waiting for {regex} to appear on screen"):
1040 retry(screen_matches, timeout)
1041
1042 def wait_for_console_text(self, regex: str, timeout: int | None = None) -> None:
1043 """
1044 Wait until the supplied regular expressions match a line of the
1045 serial console output.
1046 This method is useful when OCR is not possible or inaccurate.
1047 """
1048 # Buffer the console output, this is needed
1049 # to match multiline regexes.
1050 console = io.StringIO()
1051
1052 def console_matches(_: Any) -> bool:
1053 nonlocal console
1054 try:
1055 # This will return as soon as possible and
1056 # sleep 1 second.
1057 console.write(self.last_lines.get(block=False))
1058 except queue.Empty:
1059 pass
1060 console.seek(0)
1061 matches = re.search(regex, console.read())
1062 return matches is not None
1063
1064 with self.nested(f"waiting for {regex} to appear on console"):
1065 if timeout is not None:
1066 retry(console_matches, timeout)
1067 else:
1068 while not console_matches(False):
1069 pass
1070
1071 def send_key(
1072 self, key: str, delay: Optional[float] = 0.01, log: Optional[bool] = True
1073 ) -> None:
1074 """
1075 Simulate pressing keys on the virtual keyboard, e.g.,
1076 `send_key("ctrl-alt-delete")`.
1077
1078 Please also refer to the QEMU documentation for more information on the
1079 input syntax: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys
1080 """
1081 key = CHAR_TO_KEY.get(key, key)
1082 context = self.nested(f"sending key {repr(key)}") if log else nullcontext()
1083 with context:
1084 self.send_monitor_command(f"sendkey {key}")
1085 if delay is not None:
1086 time.sleep(delay)
1087
1088 def send_console(self, chars: str) -> None:
1089 r"""
1090 Send keys to the kernel console. This allows interaction with the systemd
1091 emergency mode, for example. Takes a string that is sent, e.g.,
1092 `send_console("\n\nsystemctl default\n")`.
1093 """
1094 assert self.process
1095 assert self.process.stdin
1096 self.process.stdin.write(chars.encode())
1097 self.process.stdin.flush()
1098
1099 def start(self, allow_reboot: bool = False) -> None:
1100 """
1101 Start the virtual machine. This method is asynchronous --- it does
1102 not wait for the machine to finish booting.
1103 """
1104 if self.booted:
1105 return
1106
1107 self.log("starting vm")
1108
1109 def clear(path: Path) -> Path:
1110 if path.exists():
1111 path.unlink()
1112 return path
1113
1114 def create_socket(path: Path) -> socket.socket:
1115 s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
1116 s.bind(str(path))
1117 s.listen(1)
1118 return s
1119
1120 monitor_socket = create_socket(clear(self.monitor_path))
1121 shell_socket = create_socket(clear(self.shell_path))
1122 self.process = self.start_command.run(
1123 self.state_dir,
1124 self.shared_dir,
1125 self.monitor_path,
1126 self.qmp_path,
1127 self.shell_path,
1128 allow_reboot,
1129 )
1130 self.monitor, _ = monitor_socket.accept()
1131 self.shell, _ = shell_socket.accept()
1132 self.qmp_client = QMPSession.from_path(self.qmp_path)
1133
1134 # Store last serial console lines for use
1135 # of wait_for_console_text
1136 self.last_lines: Queue = Queue()
1137
1138 def process_serial_output() -> None:
1139 assert self.process
1140 assert self.process.stdout
1141 for _line in self.process.stdout:
1142 # Ignore undecodable bytes that may occur in boot menus
1143 line = _line.decode(errors="ignore").replace("\r", "").rstrip()
1144 self.last_lines.put(line)
1145 self.log_serial(line)
1146
1147 self.serial_thread = threading.Thread(target=process_serial_output)
1148 self.serial_thread.start()
1149
1150 self.wait_for_monitor_prompt()
1151
1152 self.pid = self.process.pid
1153 self.booted = True
1154
1155 self.log(f"QEMU running (pid {self.pid})")
1156
1157 def cleanup_statedir(self) -> None:
1158 shutil.rmtree(self.state_dir)
1159 rootlog.log(f"deleting VM state directory {self.state_dir}")
1160 rootlog.log("if you want to keep the VM state, pass --keep-vm-state")
1161
1162 def shutdown(self) -> None:
1163 """
1164 Shut down the machine, waiting for the VM to exit.
1165 """
1166 if not self.booted:
1167 return
1168
1169 assert self.shell
1170 self.shell.send(b"poweroff\n")
1171 self.wait_for_shutdown()
1172
1173 def crash(self) -> None:
1174 """
1175 Simulate a sudden power failure, by telling the VM to exit immediately.
1176 """
1177 if not self.booted:
1178 return
1179
1180 self.log("forced crash")
1181 self.send_monitor_command("quit")
1182 self.wait_for_shutdown()
1183
1184 def reboot(self) -> None:
1185 """Press Ctrl+Alt+Delete in the guest.
1186
1187 Prepares the machine to be reconnected which is useful if the
1188 machine was started with `allow_reboot = True`
1189 """
1190 self.send_key("ctrl-alt-delete")
1191 self.connected = False
1192
1193 def wait_for_x(self, timeout: int = 900) -> None:
1194 """
1195 Wait until it is possible to connect to the X server.
1196 """
1197
1198 def check_x(_: Any) -> bool:
1199 cmd = (
1200 "journalctl -b SYSLOG_IDENTIFIER=systemd | "
1201 + 'grep "Reached target Current graphical"'
1202 )
1203 status, _ = self.execute(cmd)
1204 if status != 0:
1205 return False
1206 status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
1207 return status == 0
1208
1209 with self.nested("waiting for the X11 server"):
1210 retry(check_x, timeout)
1211
1212 def get_window_names(self) -> List[str]:
1213 return self.succeed(
1214 r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'"
1215 ).splitlines()
1216
1217 def wait_for_window(self, regexp: str, timeout: int = 900) -> None:
1218 """
1219 Wait until an X11 window has appeared whose name matches the given
1220 regular expression, e.g., `wait_for_window("Terminal")`.
1221 """
1222 pattern = re.compile(regexp)
1223
1224 def window_is_visible(last_try: bool) -> bool:
1225 names = self.get_window_names()
1226 if last_try:
1227 self.log(
1228 f"Last chance to match {regexp} on the window list,"
1229 + " which currently contains: "
1230 + ", ".join(names)
1231 )
1232 return any(pattern.search(name) for name in names)
1233
1234 with self.nested("waiting for a window to appear"):
1235 retry(window_is_visible, timeout)
1236
1237 def sleep(self, secs: int) -> None:
1238 # We want to sleep in *guest* time, not *host* time.
1239 self.succeed(f"sleep {secs}")
1240
1241 def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None:
1242 """
1243 Forward a TCP port on the host to a TCP port on the guest.
1244 Useful during interactive testing.
1245 """
1246 self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}")
1247
1248 def block(self) -> None:
1249 """
1250 Simulate unplugging the Ethernet cable that connects the machine to
1251 the other machines.
1252 This happens by shutting down eth1 (the multicast interface used to talk
1253 to the other VMs). eth0 is kept online to still enable the test driver
1254 to communicate with the machine.
1255 """
1256 self.send_monitor_command("set_link virtio-net-pci.1 off")
1257
1258 def unblock(self) -> None:
1259 """
1260 Undo the effect of `block`.
1261 """
1262 self.send_monitor_command("set_link virtio-net-pci.1 on")
1263
1264 def release(self) -> None:
1265 if self.pid is None:
1266 return
1267 rootlog.info(f"kill machine (pid {self.pid})")
1268 assert self.process
1269 assert self.shell
1270 assert self.monitor
1271 assert self.serial_thread
1272
1273 self.process.terminate()
1274 self.shell.close()
1275 self.monitor.close()
1276 self.serial_thread.join()
1277
1278 def run_callbacks(self) -> None:
1279 for callback in self.callbacks:
1280 callback()
1281
1282 def switch_root(self) -> None:
1283 """
1284 Transition from stage 1 to stage 2. This requires the
1285 machine to be configured with `testing.initrdBackdoor = true`
1286 and `boot.initrd.systemd.enable = true`.
1287 """
1288 self.wait_for_unit("initrd.target")
1289 self.execute(
1290 "systemctl isolate --no-block initrd-switch-root.target 2>/dev/null >/dev/null",
1291 check_return=False,
1292 check_output=False,
1293 )
1294 self.wait_for_console_text(r"systemd\[1\]:.*Switching root\.")
1295 self.connected = False
1296 self.connect()