···
···
92
-
global log, machines, test_script
95
-
def eprint(*args: object, **kwargs: Any) -> None:
96
-
print(*args, file=sys.stderr, **kwargs)
99
-
def make_command(args: list) -> str:
100
-
return " ".join(map(shlex.quote, (map(str, args))))
103
-
def create_vlan(vlan_nr: str) -> Tuple[str, str, "subprocess.Popen[bytes]", Any]:
104
-
log.log("starting VDE switch for network {}".format(vlan_nr))
105
-
vde_socket = tempfile.mkdtemp(
106
-
prefix="nixos-test-vde-", suffix="-vde{}.ctl".format(vlan_nr)
108
-
pty_master, pty_slave = pty.openpty()
109
-
vde_process = subprocess.Popen(
110
-
["vde_switch", "-s", vde_socket, "--dirmode", "0700"],
112
-
stdout=subprocess.PIPE,
113
-
stderr=subprocess.PIPE,
116
-
fd = os.fdopen(pty_master, "w")
117
-
fd.write("version\n")
118
-
# TODO: perl version checks if this can be read from
119
-
# an if not, dies. we could hang here forever. Fix it.
120
-
assert vde_process.stdout is not None
121
-
vde_process.stdout.readline()
122
-
if not os.path.exists(os.path.join(vde_socket, "ctl")):
123
-
raise Exception("cannot start vde_switch")
125
-
return (vlan_nr, vde_socket, vde_process, fd)
128
-
def retry(fn: Callable, timeout: int = 900) -> None:
129
-
"""Call the given function repeatedly, with 1 second intervals,
130
-
until it returns True or a timeout is reached.
133
-
for _ in range(timeout):
139
-
raise Exception(f"action timed out after {timeout} seconds")
def __init__(self) -> None:
···
self.xml.startElement("logfile", attrs={})
self._print_serial_logs = True
105
+
def _eprint(*args: object, **kwargs: Any) -> None:
106
+
print(*args, file=sys.stderr, **kwargs)
self.xml.endElement("logfile")
···
self.xml.characters(message)
self.xml.endElement("line")
126
+
def info(self, *args, **kwargs) -> None: # type: ignore
127
+
self.log(*args, **kwargs)
129
+
def warning(self, *args, **kwargs) -> None: # type: ignore
130
+
self.log(*args, **kwargs)
132
+
def error(self, *args, **kwargs) -> None: # type: ignore
133
+
self.log(*args, **kwargs)
def log(self, message: str, attributes: Dict[str, str] = {}) -> None:
173
-
eprint(self.maybe_prefix(message, attributes))
137
+
self._eprint(self.maybe_prefix(message, attributes))
self.log_line(message, attributes)
def log_serial(self, message: str, machine: str) -> None:
self.enqueue({"msg": message, "machine": machine, "type": "serial"})
if self._print_serial_logs:
180
-
eprint(Style.DIM + "{} # {}".format(machine, message) + Style.RESET_ALL)
145
+
Style.DIM + "{} # {}".format(machine, message) + Style.RESET_ALL
def enqueue(self, item: Dict[str, str]) -> None:
···
def nested(self, message: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
197
-
eprint(self.maybe_prefix(message, attributes))
163
+
self._eprint(self.maybe_prefix(message, attributes))
self.xml.startElement("nest", attrs={})
self.xml.startElement("head", attributes)
···
self.xml.endElement("nest")
183
+
def make_command(args: list) -> str:
184
+
return " ".join(map(shlex.quote, (map(str, args))))
187
+
def retry(fn: Callable, timeout: int = 900) -> None:
188
+
"""Call the given function repeatedly, with 1 second intervals,
189
+
until it returns True or a timeout is reached.
192
+
for _ in range(timeout):
198
+
raise Exception(f"action timed out after {timeout} seconds")
def _perform_ocr_on_screenshot(
screenshot_path: str, model_ids: Iterable[int]
···
246
-
def __repr__(self) -> str:
247
-
return f"<Machine '{self.name}'>"
249
-
def __init__(self, args: Dict[str, Any]) -> None:
251
-
self.name = args["name"]
253
-
self.name = "machine"
254
-
cmd = args.get("startCommand", None)
256
-
match = re.search("run-(.+)-vm$", cmd)
258
-
self.name = match.group(1)
259
-
self.logger = args["log"]
260
-
self.script = args.get("startCommand", self.create_startcommand(args))
232
+
class StartCommand:
233
+
"""The Base Start Command knows how to append the necesary
234
+
runtime qemu options as determined by a particular test driver
235
+
run. Any such start command is expected to happily receive and
236
+
append additional qemu args.
262
-
tmp_dir = os.environ.get("TMPDIR", tempfile.gettempdir())
264
-
def create_dir(name: str) -> str:
265
-
path = os.path.join(tmp_dir, name)
266
-
os.makedirs(path, mode=0o700, exist_ok=True)
243
+
monitor_socket_path: pathlib.Path,
244
+
shell_socket_path: pathlib.Path,
245
+
allow_reboot: bool = False, # TODO: unused, legacy?
248
+
display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"])
249
+
if not display_available:
250
+
display_opts += " -nographic"
269
-
self.state_dir = os.path.join(tmp_dir, f"vm-state-{self.name}")
270
-
if not args.get("keepVmState", False):
271
-
self.cleanup_statedir()
272
-
os.makedirs(self.state_dir, mode=0o700, exist_ok=True)
273
-
self.shared_dir = create_dir("shared-xchg")
258
+
" -device virtio-serial"
259
+
" -device virtconsole,chardev=shell"
260
+
" -device virtio-rng-pci"
263
+
# TODO: qemu script already catpures this env variable, legacy?
264
+
qemu_opts += " " + os.environ.get("QEMU_OPTS", "")
275
-
self.booted = False
276
-
self.connected = False
277
-
self.pid: Optional[int] = None
279
-
self.monitor: Optional[socket.socket] = None
280
-
self.allow_reboot = args.get("allowReboot", False)
268
+
f" -monitor unix:{monitor_socket_path}"
269
+
f" -chardev socket,id=shell,path={shell_socket_path}"
283
-
def create_startcommand(args: Dict[str, str]) -> str:
284
-
net_backend = "-netdev user,id=net0"
285
-
net_frontend = "-device virtio-net-pci,netdev=net0"
275
+
def build_environment(
276
+
state_dir: pathlib.Path,
277
+
shared_dir: pathlib.Path,
279
+
# We make a copy to not update the current environment
280
+
env = dict(os.environ)
283
+
"TMPDIR": str(state_dir),
284
+
"SHARED_DIR": str(shared_dir),
287
-
if "netBackendArgs" in args:
288
-
net_backend += "," + args["netBackendArgs"]
292
+
state_dir: pathlib.Path,
293
+
shared_dir: pathlib.Path,
294
+
monitor_socket_path: pathlib.Path,
295
+
shell_socket_path: pathlib.Path,
296
+
) -> subprocess.Popen:
297
+
return subprocess.Popen(
298
+
self.cmd(monitor_socket_path, shell_socket_path),
299
+
stdin=subprocess.DEVNULL,
300
+
stdout=subprocess.PIPE,
301
+
stderr=subprocess.STDOUT,
304
+
env=self.build_environment(state_dir, shared_dir),
308
+
class NixStartScript(StartCommand):
309
+
"""A start script from nixos/modules/virtualiation/qemu-vm.nix
310
+
that also satisfies the requirement of the BaseStartCommand.
311
+
These Nix commands have the particular charactersitic that the
312
+
machine name can be extracted out of them via a regex match.
313
+
(Admittedly a _very_ implicit contract, evtl. TODO fix)
316
+
def __init__(self, script: str):
290
-
if "netFrontendArgs" in args:
291
-
net_frontend += "," + args["netFrontendArgs"]
320
+
def machine_name(self) -> str:
321
+
match = re.search("run-(.+)-vm$", self._cmd)
324
+
name = match.group(1)
294
-
args.get("qemuBinary", "qemu-kvm")
328
+
class LegacyStartCommand(StartCommand):
329
+
"""Used in some places to create an ad-hoc machine instead of
330
+
using nix test instrumentation + module system for that purpose.
303
-
hda_path = os.path.abspath(args["hda"])
304
-
if args.get("hdaInterface", "") == "scsi":
306
-
"-drive id=hda,file="
308
-
+ ",werror=report,if=none "
309
-
+ "-device scsi-hd,drive=hda "
336
+
netBackendArgs: Optional[str] = None,
337
+
netFrontendArgs: Optional[str] = None,
338
+
hda: Optional[Tuple[pathlib.Path, str]] = None,
339
+
cdrom: Optional[str] = None,
340
+
usb: Optional[str] = None,
341
+
bios: Optional[str] = None,
342
+
qemuFlags: Optional[str] = None,
344
+
self._cmd = "qemu-kvm -m 384"
347
+
net_backend = "-netdev user,id=net0"
348
+
net_frontend = "-device virtio-net-pci,netdev=net0"
349
+
if netBackendArgs is not None:
350
+
net_backend += "," + netBackendArgs
351
+
if netFrontendArgs is not None:
352
+
net_frontend += "," + netFrontendArgs
353
+
self._cmd += f" {net_backend} {net_frontend}"
357
+
if hda is not None:
358
+
hda_path = hda[0].resolve()
359
+
hda_interface = hda[1]
360
+
if hda_interface == "scsi":
362
+
f" -drive id=hda,file={hda_path},werror=report,if=none"
363
+
" -device scsi-hd,drive=hda"
316
-
+ args["hdaInterface"]
317
-
+ ",werror=report "
366
+
hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report"
367
+
self._cmd += hda_cmd
320
-
if "cdrom" in args:
321
-
start_command += "-cdrom " + args["cdrom"] + " "
370
+
if cdrom is not None:
371
+
self._cmd += f" -cdrom {cdrom}"
375
+
if usb is not None:
# https://github.com/qemu/qemu/blob/master/docs/usb2.txt
326
-
"-device usb-ehci -drive "
327
-
+ "id=usbdisk,file="
329
-
+ ",if=none,readonly "
330
-
+ "-device usb-storage,drive=usbdisk "
378
+
" -device usb-ehci"
379
+
f" -drive id=usbdisk,file={usb},if=none,readonly"
380
+
" -device usb-storage,drive=usbdisk "
333
-
start_command += "-bios " + args["bios"] + " "
382
+
self._cmd += usb_cmd
385
+
if bios is not None:
386
+
self._cmd += f" -bios {bios}"
389
+
if qemuFlags is not None:
390
+
self._cmd += f" {qemuFlags}"
394
+
"""A handle to the machine with this name, that also knows how to manage
395
+
the machine lifecycle with the help of a start script / command."""
398
+
tmp_dir: pathlib.Path
399
+
shared_dir: pathlib.Path
400
+
state_dir: pathlib.Path
401
+
monitor_path: pathlib.Path
402
+
shell_path: pathlib.Path
335
-
start_command += args.get("qemuFlags", "")
404
+
start_command: StartCommand
405
+
keep_vm_state: bool
337
-
return start_command
408
+
process: Optional[subprocess.Popen] = None
409
+
pid: Optional[int] = None
410
+
monitor: Optional[socket.socket] = None
411
+
shell: Optional[socket.socket] = None
413
+
booted: bool = False
414
+
connected: bool = False
415
+
# Store last serial console lines for use
416
+
# of wait_for_console_text
417
+
last_lines: Queue = Queue()
419
+
def __repr__(self) -> str:
420
+
return f"<Machine '{self.name}'>"
424
+
tmp_dir: pathlib.Path,
425
+
start_command: StartCommand,
426
+
name: str = "machine",
427
+
keep_vm_state: bool = False,
428
+
allow_reboot: bool = False,
430
+
self.tmp_dir = tmp_dir
431
+
self.keep_vm_state = keep_vm_state
432
+
self.allow_reboot = allow_reboot
434
+
self.start_command = start_command
436
+
# set up directories
437
+
self.shared_dir = self.tmp_dir / "shared-xchg"
438
+
self.shared_dir.mkdir(mode=0o700, exist_ok=True)
440
+
self.state_dir = self.tmp_dir / f"vm-state-{self.name}"
441
+
self.monitor_path = self.state_dir / "monitor"
442
+
self.shell_path = self.state_dir / "shell"
443
+
if (not self.keep_vm_state) and self.state_dir.exists():
444
+
self.cleanup_statedir()
445
+
self.state_dir.mkdir(mode=0o700, exist_ok=True)
448
+
def create_startcommand(args: Dict[str, str]) -> StartCommand:
450
+
"Using legacy create_startcommand(),"
451
+
"please use proper nix test vm instrumentation, instead"
452
+
"to generate the appropriate nixos test vm qemu startup script"
455
+
if args.get("hda"):
456
+
hda_arg: str = args.get("hda", "")
457
+
hda_arg_path: pathlib.Path = pathlib.Path(hda_arg)
458
+
hda = (hda_arg_path, args.get("hdaInterface", ""))
459
+
return LegacyStartCommand(
460
+
netBackendArgs=args.get("netBackendArgs"),
461
+
netFrontendArgs=args.get("netFrontendArgs"),
463
+
cdrom=args.get("cdrom"),
464
+
usb=args.get("usb"),
465
+
bios=args.get("bios"),
466
+
qemuFlags=args.get("qemuFlags"),
return self.booted and self.connected
def log(self, msg: str) -> None:
343
-
self.logger.log(msg, {"machine": self.name})
473
+
rootlog.log(msg, {"machine": self.name})
def log_serial(self, msg: str) -> None:
346
-
self.logger.log_serial(msg, self.name)
476
+
rootlog.log_serial(msg, self.name)
def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager:
my_attrs = {"machine": self.name}
351
-
return self.logger.nested(msg, my_attrs)
481
+
return rootlog.nested(msg, my_attrs)
def wait_for_monitor_prompt(self) -> str:
assert self.monitor is not None
···
out_command = "( set -euo pipefail; {} ); echo '|!=EOF' $?\n".format(command)
self.shell.send(out_command.encode())
···
Should only be used during test development, not in the production test."""
self.log("Terminal is ready (there is no prompt):")
["socat", "READLINE", f"FD:{self.shell.fileno()}"],
pass_fds=[self.shell.fileno()],
···
with self.nested("waiting for the VM to power off"):
670
+
assert self.process
···
with self.nested("waiting for the VM to finish booting"):
···
753
-
def create_socket(path: str) -> socket.socket:
754
-
if os.path.exists(path):
889
+
def clear(path: pathlib.Path) -> pathlib.Path:
894
+
def create_socket(path: pathlib.Path) -> socket.socket:
s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
761
-
monitor_path = os.path.join(self.state_dir, "monitor")
762
-
self.monitor_socket = create_socket(monitor_path)
764
-
shell_path = os.path.join(self.state_dir, "shell")
765
-
self.shell_socket = create_socket(shell_path)
767
-
display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"])
771
-
"" if self.allow_reboot else "-no-reboot",
772
-
"-monitor unix:{}".format(monitor_path),
773
-
"-chardev socket,id=shell,path={}".format(shell_path),
774
-
"-device virtio-serial",
775
-
"-device virtconsole,chardev=shell",
776
-
"-device virtio-rng-pci",
777
-
"-serial stdio" if display_available else "-nographic",
781
-
+ os.environ.get("QEMU_OPTS", "")
784
-
environment = dict(os.environ)
785
-
environment.update(
787
-
"TMPDIR": self.state_dir,
788
-
"SHARED_DIR": self.shared_dir,
790
-
"QEMU_OPTS": qemu_options,
794
-
self.process = subprocess.Popen(
796
-
stdin=subprocess.DEVNULL,
797
-
stdout=subprocess.PIPE,
798
-
stderr=subprocess.STDOUT,
800
-
cwd=self.state_dir,
900
+
monitor_socket = create_socket(clear(self.monitor_path))
901
+
shell_socket = create_socket(clear(self.shell_path))
902
+
self.process = self.start_command.run(
803
-
self.monitor, _ = self.monitor_socket.accept()
804
-
self.shell, _ = self.shell_socket.accept()
908
+
self.monitor, _ = monitor_socket.accept()
909
+
self.shell, _ = shell_socket.accept()
# Store last serial console lines for use
# of wait_for_console_text
self.last_lines: Queue = Queue()
def process_serial_output() -> None:
811
-
assert self.process.stdout is not None
916
+
assert self.process
917
+
assert self.process.stdout
for _line in self.process.stdout:
# Ignore undecodable bytes that may occur in boot menus
line = _line.decode(errors="ignore").replace("\r", "").rstrip()
···
self.log("QEMU running (pid {})".format(self.pid))
def cleanup_statedir(self) -> None:
828
-
if os.path.isdir(self.state_dir):
829
-
shutil.rmtree(self.state_dir)
830
-
self.logger.log(f"deleting VM state directory {self.state_dir}")
831
-
self.logger.log("if you want to keep the VM state, pass --keep-vm-state")
934
+
shutil.rmtree(self.state_dir)
935
+
rootlog.log(f"deleting VM state directory {self.state_dir}")
936
+
rootlog.log("if you want to keep the VM state, pass --keep-vm-state")
def shutdown(self) -> None:
self.shell.send("poweroff\n".encode())
···
"""Make the machine reachable."""
self.send_monitor_command("set_link virtio-net-pci.1 on")
1017
+
def release(self) -> None:
1018
+
if self.pid is None:
1020
+
rootlog.info(f"kill machine (pid {self.pid})")
1021
+
assert self.process
1023
+
assert self.monitor
1024
+
self.process.terminate()
1025
+
self.shell.close()
1026
+
self.monitor.close()
912
-
def create_machine(args: Dict[str, Any]) -> Machine:
914
-
return Machine(args)
1030
+
"""This class handles a VLAN that the run-vm scripts identify via its
1031
+
number handles. The network's lifetime equals the object's lifetime.
917
-
def start_all() -> None:
918
-
with log.nested("starting all VMs"):
919
-
for machine in machines:
1035
+
socket_dir: pathlib.Path
1037
+
process: subprocess.Popen
1041
+
def __repr__(self) -> str:
1042
+
return f"<Vlan Nr. {self.nr}>"
1044
+
def __init__(self, nr: int, tmp_dir: pathlib.Path):
1046
+
self.socket_dir = tmp_dir / f"vde{self.nr}.ctl"
1048
+
# TODO: don't side-effect environment here
1049
+
os.environ[f"QEMU_VDE_SOCKET_{self.nr}"] = str(self.socket_dir)
923
-
def join_all() -> None:
924
-
with log.nested("waiting for all VMs to finish"):
925
-
for machine in machines:
926
-
machine.wait_for_shutdown()
1051
+
rootlog.info("start vlan")
1052
+
pty_master, pty_slave = pty.openpty()
1054
+
self.process = subprocess.Popen(
1055
+
["vde_switch", "-s", self.socket_dir, "--dirmode", "0700"],
1057
+
stdout=subprocess.PIPE,
1058
+
stderr=subprocess.PIPE,
1061
+
self.pid = self.process.pid
1062
+
self.fd = os.fdopen(pty_master, "w")
1063
+
self.fd.write("version\n")
1065
+
# TODO: perl version checks if this can be read from
1066
+
# an if not, dies. we could hang here forever. Fix it.
1067
+
assert self.process.stdout is not None
1068
+
self.process.stdout.readline()
1069
+
if not (self.socket_dir / "ctl").exists():
1070
+
rootlog.error("cannot start vde_switch")
929
-
def run_tests(interactive: bool = False) -> None:
931
-
ptpython.repl.embed(test_symbols(), {})
1072
+
rootlog.info(f"running vlan (pid {self.pid})")
1074
+
def __del__(self) -> None:
1075
+
rootlog.info(f"kill vlan (pid {self.pid})")
1077
+
self.process.terminate()
1081
+
"""A handle to the driver that sets up the environment
1082
+
and runs the tests"""
1086
+
machines: List[Machine]
1090
+
start_scripts: List[str],
1093
+
keep_vm_state: bool = False,
1095
+
self.tests = tests
1097
+
tmp_dir = pathlib.Path(os.environ.get("TMPDIR", tempfile.gettempdir()))
1098
+
tmp_dir.mkdir(mode=0o700, exist_ok=True)
1100
+
with rootlog.nested("start all VLans"):
1101
+
self.vlans = [VLan(nr, tmp_dir) for nr in vlans]
1103
+
def cmd(scripts: List[str]) -> Iterator[NixStartScript]:
1105
+
yield NixStartScript(s)
1109
+
start_command=cmd,
1110
+
keep_vm_state=keep_vm_state,
1111
+
name=cmd.machine_name,
1114
+
for cmd in cmd(start_scripts)
1118
+
def clean_up() -> None:
1119
+
with rootlog.nested("clean up"):
1120
+
for machine in self.machines:
1123
+
def subtest(self, name: str) -> Iterator[None]:
1124
+
"""Group logs under a given test name"""
1125
+
with rootlog.nested(name):
1130
+
rootlog.error(f'Test "{name}" failed with error:')
1133
+
def test_symbols(self) -> Dict[str, Any]:
1135
+
def subtest(name: str) -> Iterator[None]:
1136
+
return self.subtest(name)
1138
+
general_symbols = dict(
1139
+
start_all=self.start_all,
1140
+
test_script=self.test_script,
1141
+
machines=self.machines,
1146
+
create_machine=self.create_machine,
1148
+
run_tests=self.run_tests,
1149
+
join_all=self.join_all,
1151
+
serial_stdout_off=self.serial_stdout_off,
1152
+
serial_stdout_on=self.serial_stdout_on,
1153
+
Machine=Machine, # for typing
1155
+
machine_symbols = {
1156
+
m.name: self.machines[idx] for idx, m in enumerate(self.machines)
1159
+
f"vlan{v.nr}": self.vlans[idx] for idx, v in enumerate(self.vlans)
1162
+
"additionally exposed symbols:\n "
1163
+
+ ", ".join(map(lambda m: m.name, self.machines))
1165
+
+ ", ".join(map(lambda v: f"vlan{v.nr}", self.vlans))
1167
+
+ ", ".join(list(general_symbols.keys()))
1169
+
return {**general_symbols, **machine_symbols, **vlan_symbols}
1171
+
def test_script(self) -> None:
1172
+
"""Run the test script"""
1173
+
with rootlog.nested("run the VM test script"):
1174
+
symbols = self.test_symbols() # call eagerly
1175
+
exec(self.tests, symbols, None)
1177
+
def run_tests(self) -> None:
1178
+
"""Run the test script (for non-interactive test runs)"""
1179
+
self.test_script()
# TODO: Collect coverage data
935
-
for machine in machines:
1181
+
for machine in self.machines:
1185
+
def start_all(self) -> None:
1186
+
"""Start all machines"""
1187
+
with rootlog.nested("start all VMs"):
1188
+
for machine in self.machines:
940
-
def serial_stdout_on() -> None:
941
-
log._print_serial_logs = True
1191
+
def join_all(self) -> None:
1192
+
"""Wait for all machines to shut down"""
1193
+
with rootlog.nested("wait for all VMs to finish"):
1194
+
for machine in self.machines:
1195
+
machine.wait_for_shutdown()
1197
+
def create_machine(self, args: Dict[str, Any]) -> Machine:
1199
+
"Using legacy create_machine(), please instantiate the"
1200
+
"Machine class directly, instead"
1202
+
tmp_dir = pathlib.Path(os.environ.get("TMPDIR", tempfile.gettempdir()))
1203
+
tmp_dir.mkdir(mode=0o700, exist_ok=True)
1205
+
if args.get("startCommand"):
1206
+
start_command: str = args.get("startCommand", "")
1207
+
cmd = NixStartScript(start_command)
1208
+
name = args.get("name", cmd.machine_name)
1210
+
cmd = Machine.create_startcommand(args) # type: ignore
1211
+
name = args.get("name", "machine")
944
-
def serial_stdout_off() -> None:
945
-
log._print_serial_logs = False
1215
+
start_command=cmd,
1217
+
keep_vm_state=args.get("keep_vm_state", False),
1218
+
allow_reboot=args.get("allow_reboot", False),
1221
+
def serial_stdout_on(self) -> None:
1222
+
rootlog._print_serial_logs = True
1224
+
def serial_stdout_off(self) -> None:
1225
+
rootlog._print_serial_logs = False
class EnvDefault(argparse.Action):
···
setattr(namespace, self.dest, values)
974
-
def subtest(name: str) -> Iterator[None]:
975
-
with log.nested(name):
979
-
except Exception as e:
980
-
log.log(f'Test "{name}" failed with error: "{e}"')
986
-
def _test_symbols() -> Dict[str, Any]:
987
-
general_symbols = dict(
988
-
start_all=start_all,
989
-
test_script=globals().get("test_script"), # same
990
-
machines=globals().get("machines"), # without being initialized
991
-
log=globals().get("log"), # extracting those symbol keys
993
-
create_machine=create_machine,
995
-
run_tests=run_tests,
998
-
serial_stdout_off=serial_stdout_off,
999
-
serial_stdout_on=serial_stdout_on,
1000
-
Machine=Machine, # for typing
1002
-
return general_symbols
1005
-
def test_symbols() -> Dict[str, Any]:
1007
-
general_symbols = _test_symbols()
1009
-
machine_symbols = {m.name: machines[idx] for idx, m in enumerate(machines)}
1011
-
"additionally exposed symbols:\n "
1012
-
+ ", ".join(map(lambda m: m.name, machines))
1014
-
+ ", ".join(list(general_symbols.keys()))
1016
-
return {**general_symbols, **machine_symbols}
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser(prog="nixos-test-driver")
···
args = arg_parser.parse_args()
1058
-
testscript = pathlib.Path(args.testscript).read_text()
1060
-
global log, machines, test_script
1293
+
if not args.keep_vm_state:
1294
+
rootlog.info("Machine state will be reset. To keep it, pass --keep-vm-state")
1297
+
args.start_scripts, args.vlans, args.testscript.read_text(), args.keep_vm_state
1064
-
vde_sockets = [create_vlan(v) for v in args.vlans]
1065
-
for nr, vde_socket, _, _ in vde_sockets:
1066
-
os.environ["QEMU_VDE_SOCKET_{}".format(nr)] = vde_socket
1069
-
create_machine({"startCommand": s, "keepVmState": args.keep_vm_state})
1070
-
for s in args.start_scripts
1073
-
"{0} = machines[{1}]".format(m.name, idx) for idx, m in enumerate(machines)
1075
-
exec("\n".join(machine_eval))
1078
-
def clean_up() -> None:
1079
-
with log.nested("cleaning up"):
1080
-
for machine in machines:
1081
-
if machine.pid is None:
1083
-
log.log("killing {} (pid {})".format(machine.name, machine.pid))
1084
-
machine.process.kill()
1085
-
for _, _, process, _ in vde_sockets:
1086
-
process.terminate()
1089
-
def test_script() -> None:
1090
-
with log.nested("running the VM test script"):
1091
-
symbols = test_symbols() # call eagerly
1092
-
exec(testscript, symbols, None)
1094
-
interactive = args.interactive or (not bool(testscript))
1096
-
run_tests(interactive)
1098
-
print("test script finished in {:.2f}s".format(toc - tic))
1300
+
if args.interactive:
1301
+
ptpython.repl.embed(driver.test_symbols(), {})
1304
+
driver.run_tests()
1306
+
rootlog.info(f"test script finished in {(toc-tic):.2f}s")