···
from queue import Queue, Empty
from typing import Tuple, Any, Callable, Dict, Iterator, Optional, List, Iterable
from xml.sax.saxutils import XMLGenerator
···
self.xml.startElement("logfile", attrs={})
self.xml.endElement("logfile")
···
self.log_line(message, attributes)
-
def enqueue(self, message: Dict[str, str]) -> None:
-
self.queue.put(message)
def drain_log_queue(self) -> None:
item = self.queue.get_nowait()
-
attributes = {"machine": item["machine"], "type": "serial"}
-
self.log_line(self.sanitise(item["msg"]), attributes)
···
def log(self, msg: str) -> None:
self.logger.log(msg, {"machine": self.name})
def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager:
my_attrs = {"machine": self.name}
···
# Ignore undecodable bytes that may occur in boot menus
line = _line.decode(errors="ignore").replace("\r", "").rstrip()
self.last_lines.put(line)
-
eprint("{} # {}".format(self.name, line))
-
self.logger.enqueue({"msg": line, "machine": self.name})
_thread.start_new_thread(process_serial_output, ())
···
···
from queue import Queue, Empty
from typing import Tuple, Any, Callable, Dict, Iterator, Optional, List, Iterable
from xml.sax.saxutils import XMLGenerator
+
from colorama import Style
···
self.xml.startElement("logfile", attrs={})
+
self._print_serial_logs = True
self.xml.endElement("logfile")
···
self.log_line(message, attributes)
+
def log_serial(self, message: str, machine: str) -> None:
+
self.enqueue({"msg": message, "machine": machine, "type": "serial"})
+
if self._print_serial_logs:
+
eprint(Style.DIM + "{} # {}".format(machine, message) + Style.RESET_ALL)
+
def enqueue(self, item: Dict[str, str]) -> None:
def drain_log_queue(self) -> None:
item = self.queue.get_nowait()
+
msg = self.sanitise(item["msg"])
+
self.log_line(msg, item)
···
def log(self, msg: str) -> None:
self.logger.log(msg, {"machine": self.name})
+
def log_serial(self, msg: str) -> None:
+
self.logger.log_serial(msg, self.name)
def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager:
my_attrs = {"machine": self.name}
···
# Ignore undecodable bytes that may occur in boot menus
line = _line.decode(errors="ignore").replace("\r", "").rstrip()
self.last_lines.put(line)
_thread.start_new_thread(process_serial_output, ())
···
+
def serial_stdout_on() -> None:
+
log._print_serial_logs = True
+
def serial_stdout_off() -> None:
+
log._print_serial_logs = False