···
def send_monitor_command(self, command: str) -> str:
423
-
with self.nested("sending monitor command: {}".format(command)):
424
-
message = ("{}\n".format(command)).encode()
423
+
with self.nested(f"sending monitor command: {command}"):
424
+
message = f"{command}\n".encode()
assert self.monitor is not None
self.monitor.send(message)
return self.wait_for_monitor_prompt()
···
info = self.get_unit_info(unit, user)
state = info["ActiveState"]
441
-
raise Exception('unit "{}" reached state "{}"'.format(unit, state))
441
+
raise Exception(f'unit "{unit}" reached state "{state}"')
status, jobs = self.systemctl("list-jobs --full 2>&1", user)
···
info = self.get_unit_info(unit, user)
if info["ActiveState"] == state:
450
-
'unit "{}" is inactive and there ' "are no pending jobs"
449
+
f'unit "{unit}" is inactive and there are no pending jobs'
457
-
"waiting for unit {}{}".format(
458
-
unit, f" with user {user}" if user is not None else ""
455
+
f"waiting for unit {unit}"
456
+
+ (f" with user {user}" if user is not None else "")
retry(check_active, timeout)
def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]:
464
-
status, lines = self.systemctl('--no-pager show "{}"'.format(unit), user)
461
+
status, lines = self.systemctl(f'--no-pager show "{unit}"', user)
467
-
'retrieving systemctl info for unit "{}" {} failed with exit code {}'.format(
468
-
unit, "" if user is None else 'under user "{}"'.format(user), status
464
+
f'retrieving systemctl info for unit "{unit}"'
465
+
+ ("" if user is None else f' under user "{user}"')
466
+
+ f" failed with exit code {status}"
line_pattern = re.compile(r"^([^=]+)=(.*)$")
···
q = q.replace("'", "\\'")
490
-
"su -l {} --shell /bin/sh -c "
491
-
"$'XDG_RUNTIME_DIR=/run/user/`id -u` "
492
-
"systemctl --user {}'"
486
+
f"su -l {user} --shell /bin/sh -c "
487
+
"$'XDG_RUNTIME_DIR=/run/user/`id -u` "
488
+
f"systemctl --user {q}'"
495
-
return self.execute("systemctl {}".format(q))
490
+
return self.execute(f"systemctl {q}")
def require_unit_state(self, unit: str, require_state: str = "active") -> None:
499
-
"checking if unit ‘{}’ has reached state '{}'".format(unit, require_state)
494
+
f"checking if unit ‘{unit}’ has reached state '{require_state}'"
info = self.get_unit_info(unit)
state = info["ActiveState"]
if state != require_state:
505
-
"Expected unit ‘{}’ to to be in state ".format(unit)
506
-
+ "'{}' but it is in state ‘{}’".format(require_state, state)
500
+
f"Expected unit ‘{unit}’ to to be in state "
501
+
f"'{require_state}' but it is in state ‘{state}’"
def _next_newline_closed_block_from_shell(self) -> str:
···
"""Execute each command and check that it succeeds."""
596
-
with self.nested("must succeed: {}".format(command)):
591
+
with self.nested(f"must succeed: {command}"):
(status, out) = self.execute(command, timeout=timeout)
599
-
self.log("output: {}".format(out))
601
-
"command `{}` failed (exit code {})".format(command, status)
594
+
self.log(f"output: {out}")
595
+
raise Exception(f"command `{command}` failed (exit code {status})")
···
"""Execute each command and check that it fails."""
610
-
with self.nested("must fail: {}".format(command)):
603
+
with self.nested(f"must fail: {command}"):
(status, out) = self.execute(command, timeout=timeout)
614
-
"command `{}` unexpectedly succeeded".format(command)
606
+
raise Exception(f"command `{command}` unexpectedly succeeded")
···
status, output = self.execute(command, timeout=timeout)
630
-
with self.nested("waiting for success: {}".format(command)):
621
+
with self.nested(f"waiting for success: {command}"):
retry(check_success, timeout)
···
status, output = self.execute(command, timeout=timeout)
645
-
with self.nested("waiting for failure: {}".format(command)):
636
+
with self.nested(f"waiting for failure: {command}"):
···
def get_tty_text(self, tty: str) -> str:
status, output = self.execute(
664
-
"fold -w$(stty -F /dev/tty{0} size | "
665
-
"awk '{{print $2}}') /dev/vcs{0}".format(tty)
655
+
f"fold -w$(stty -F /dev/tty{tty} size | "
656
+
f"awk '{{print $2}}') /dev/vcs{tty}"
···
return len(matcher.findall(text)) > 0
684
-
with self.nested("waiting for {} to appear on tty {}".format(regexp, tty)):
675
+
with self.nested(f"waiting for {regexp} to appear on tty {tty}"):
def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None:
688
-
with self.nested("sending keys ‘{}‘".format(chars)):
679
+
with self.nested(f"sending keys ‘{chars}‘"):
self.send_key(char, delay)
···
"""Waits until the file exists in machine's file system."""
def check_file(_: Any) -> bool:
696
-
status, _ = self.execute("test -e {}".format(filename))
687
+
status, _ = self.execute(f"test -e {filename}")
699
-
with self.nested("waiting for file ‘{}‘".format(filename)):
690
+
with self.nested(f"waiting for file ‘{filename}‘"):
def wait_for_open_port(self, port: int, addr: str = "localhost") -> None:
def port_is_open(_: Any) -> bool:
704
-
status, _ = self.execute("nc -z {} {}".format(addr, port))
695
+
status, _ = self.execute(f"nc -z {addr} {port}")
707
-
with self.nested("waiting for TCP port {} on {}".format(port, addr)):
698
+
with self.nested(f"waiting for TCP port {port} on {addr}"):
def wait_for_closed_port(self, port: int, addr: str = "localhost") -> None:
def port_is_closed(_: Any) -> bool:
712
-
status, _ = self.execute("nc -z {} {}".format(addr, port))
703
+
status, _ = self.execute(f"nc -z {addr} {port}")
716
-
"waiting for TCP port {} on {} to be closed".format(port, addr)
706
+
with self.nested(f"waiting for TCP port {port} on {addr} to be closed"):
def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
721
-
return self.systemctl("start {}".format(jobname), user)
710
+
return self.systemctl(f"start {jobname}", user)
def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
724
-
return self.systemctl("stop {}".format(jobname), user)
713
+
return self.systemctl(f"stop {jobname}", user)
def wait_for_job(self, jobname: str) -> None:
self.wait_for_unit(jobname)
···
self.log("connected to guest root shell")
744
-
self.log("(connecting took {:.2f} seconds)".format(toc - tic))
733
+
self.log(f"(connecting took {toc - tic:.2f} seconds)")
def screenshot(self, filename: str) -> None:
word_pattern = re.compile(r"^\w+$")
if word_pattern.match(filename):
750
-
filename = os.path.join(self.out_dir, "{}.png".format(filename))
751
-
tmp = "{}.ppm".format(filename)
739
+
filename = os.path.join(self.out_dir, f"{filename}.png")
740
+
tmp = f"{filename}.ppm"
754
-
"making screenshot {}".format(filename),
743
+
f"making screenshot {filename}",
{"image": os.path.basename(filename)},
757
-
self.send_monitor_command("screendump {}".format(tmp))
758
-
ret = subprocess.run("pnmtopng {} > {}".format(tmp, filename), shell=True)
746
+
self.send_monitor_command(f"screendump {tmp}")
747
+
ret = subprocess.run(f"pnmtopng {tmp} > {filename}", shell=True)
raise Exception("Cannot convert screenshot")
···
def dump_tty_contents(self, tty: str) -> None:
"""Debugging: Dump the contents of the TTY<n>"""
820
-
self.execute("fold -w 80 /dev/vcs{} | systemd-cat".format(tty))
809
+
self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat")
def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]:
with tempfile.TemporaryDirectory() as tmpdir:
···
842
-
self.log("Last OCR attempt failed. Text was: {}".format(variants))
831
+
self.log(f"Last OCR attempt failed. Text was: {variants}")
846
-
with self.nested("waiting for {} to appear on screen".format(regex)):
835
+
with self.nested(f"waiting for {regex} to appear on screen"):
def wait_for_console_text(self, regex: str) -> None:
850
-
with self.nested("waiting for {} to appear on console".format(regex)):
839
+
with self.nested(f"waiting for {regex} to appear on console"):
# Buffer the console output, this is needed
# to match multiline regexes.
···
def send_key(self, key: str, delay: Optional[float] = 0.01) -> None:
key = CHAR_TO_KEY.get(key, key)
867
-
self.send_monitor_command("sendkey {}".format(key))
856
+
self.send_monitor_command(f"sendkey {key}")
···
self.pid = self.process.pid
926
-
self.log("QEMU running (pid {})".format(self.pid))
915
+
self.log(f"QEMU running (pid {self.pid})")
def cleanup_statedir(self) -> None:
shutil.rmtree(self.state_dir)
···
names = self.get_window_names()
980
-
"Last chance to match {} on the window list,".format(regexp)
969
+
f"Last chance to match {regexp} on the window list,"
+ " which currently contains: "
···
"""Forward a TCP port on the host to a TCP port on the guest.
Useful during interactive testing.
997
-
self.send_monitor_command(
998
-
"hostfwd_add tcp::{}-:{}".format(host_port, guest_port)
986
+
self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}")
"""Make the machine unreachable by shutting down eth1 (the multicast