···
from typing import Tuple, Any, Callable, Dict, Iterator, Optional, List, Iterable
from xml.sax.saxutils import XMLGenerator
from colorama import Style
7
+
from pathlib import Path
···
···
242
-
monitor_socket_path: pathlib.Path,
243
-
shell_socket_path: pathlib.Path,
242
+
monitor_socket_path: Path,
243
+
shell_socket_path: Path,
allow_reboot: bool = False, # TODO: unused, legacy?
···
275
-
state_dir: pathlib.Path,
276
-
shared_dir: pathlib.Path,
# We make a copy to not update the current environment
···
291
-
state_dir: pathlib.Path,
292
-
shared_dir: pathlib.Path,
293
-
monitor_socket_path: pathlib.Path,
294
-
shell_socket_path: pathlib.Path,
293
+
monitor_socket_path: Path,
294
+
shell_socket_path: Path,
self.cmd(monitor_socket_path, shell_socket_path),
···
netBackendArgs: Optional[str] = None,
netFrontendArgs: Optional[str] = None,
337
-
hda: Optional[Tuple[pathlib.Path, str]] = None,
337
+
hda: Optional[Tuple[Path, str]] = None,
cdrom: Optional[str] = None,
usb: Optional[str] = None,
bios: Optional[str] = None,
···
the machine lifecycle with the help of a start script / command."""
397
-
tmp_dir: pathlib.Path
398
-
shared_dir: pathlib.Path
399
-
state_dir: pathlib.Path
400
-
monitor_path: pathlib.Path
401
-
shell_path: pathlib.Path
start_command: StartCommand
···
424
-
tmp_dir: pathlib.Path,
start_command: StartCommand,
keep_vm_state: bool = False,
···
hda_arg: str = args.get("hda", "")
466
-
hda_arg_path: pathlib.Path = pathlib.Path(hda_arg)
466
+
hda_arg_path: Path = Path(hda_arg)
hda = (hda_arg_path, args.get("hdaInterface", ""))
return LegacyStartCommand(
netBackendArgs=args.get("netBackendArgs"),
···
"""Copy a file from the host into the guest via the `shared_dir` shared
among all the VMs (using a temporary directory).
817
-
host_src = pathlib.Path(source)
818
-
vm_target = pathlib.Path(target)
817
+
host_src = Path(source)
818
+
vm_target = Path(target)
with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
820
-
shared_temp = pathlib.Path(shared_td)
820
+
shared_temp = Path(shared_td)
host_intermediate = shared_temp / host_src.name
822
-
vm_shared_temp = pathlib.Path("/tmp/shared") / shared_temp.name
822
+
vm_shared_temp = Path("/tmp/shared") / shared_temp.name
vm_intermediate = vm_shared_temp / host_src.name
self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
···
all the VMs (using a temporary directory).
# Compute the source, target, and intermediate shared file names
839
-
out_dir = pathlib.Path(os.environ.get("out", os.getcwd()))
840
-
vm_src = pathlib.Path(source)
839
+
out_dir = Path(os.environ.get("out", os.getcwd()))
840
+
vm_src = Path(source)
with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
842
-
shared_temp = pathlib.Path(shared_td)
843
-
vm_shared_temp = pathlib.Path("/tmp/shared") / shared_temp.name
842
+
shared_temp = Path(shared_td)
843
+
vm_shared_temp = Path("/tmp/shared") / shared_temp.name
vm_intermediate = vm_shared_temp / vm_src.name
intermediate = shared_temp / vm_src.name
# Copy the file to the shared directory inside VM
···
914
-
def clear(path: pathlib.Path) -> pathlib.Path:
914
+
def clear(path: Path) -> Path:
919
-
def create_socket(path: pathlib.Path) -> socket.socket:
919
+
def create_socket(path: Path) -> socket.socket:
s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
···
1064
-
socket_dir: pathlib.Path
process: subprocess.Popen
···
def __repr__(self) -> str:
return f"<Vlan Nr. {self.nr}>"
1073
-
def __init__(self, nr: int, tmp_dir: pathlib.Path):
1073
+
def __init__(self, nr: int, tmp_dir: Path):
self.socket_dir = tmp_dir / f"vde{self.nr}.ctl"
···
1126
-
tmp_dir = pathlib.Path(os.environ.get("TMPDIR", tempfile.gettempdir()))
1126
+
tmp_dir = Path(os.environ.get("TMPDIR", tempfile.gettempdir()))
tmp_dir.mkdir(mode=0o700, exist_ok=True)
with rootlog.nested("start all VLans"):
···
"Using legacy create_machine(), please instantiate the"
"Machine class directly, instead"
1235
-
tmp_dir = pathlib.Path(os.environ.get("TMPDIR", tempfile.gettempdir()))
1235
+
tmp_dir = Path(os.environ.get("TMPDIR", tempfile.gettempdir()))
tmp_dir.mkdir(mode=0o700, exist_ok=True)
if args.get("startCommand"):
···
help="the test script to run",
1321
-
type=pathlib.Path,
args = arg_parser.parse_args()