···
from contextlib import contextmanager
from typing import Any, Callable, ContextManager, Dict, Iterator, List, Optional, Union
···
polling_conditions: List[PollingCondition]
47
+
race_timer: threading.Timer
···
keep_vm_state: bool = False,
56
+
global_timeout: int = 24 * 60 * 60 * 7,
60
+
self.global_timeout = global_timeout
61
+
self.race_timer = threading.Timer(global_timeout, self.terminate_test)
···
def __exit__(self, *_: Any) -> None:
with rootlog.nested("cleanup"):
92
+
self.race_timer.cancel()
for machine in self.machines:
···
def run_tests(self) -> None:
"""Run the test script (for non-interactive test runs)"""
156
+
f"Test will time out and terminate in {self.global_timeout} seconds"
158
+
self.race_timer.start()
# TODO: Collect coverage data
for machine in self.machines:
···
with rootlog.nested("wait for all VMs to finish"):
for machine in self.machines:
machine.wait_for_shutdown()
176
+
self.race_timer.cancel()
178
+
def terminate_test(self) -> None:
179
+
# This will be usually running in another thread than
180
+
# the thread actually executing the test script.
181
+
with rootlog.nested("timeout reached; test terminating..."):
182
+
for machine in self.machines:
184
+
# As we cannot `sys.exit` from another thread
185
+
# We can at least force the main thread to get SIGTERM'ed.
186
+
# This will prevent any user who caught all the exceptions
187
+
# to swallow them and prevent itself from terminating.
188
+
os.kill(os.getpid(), signal.SIGTERM)
def create_machine(self, args: Dict[str, Any]) -> Machine: