1import os
2import shutil
3import subprocess
4from concurrent.futures import Future, ThreadPoolExecutor
5from pathlib import Path
6
7from test_driver.errors import MachineError
8
9
def perform_ocr_on_screenshot(screenshot_path: Path) -> str:
    """
    Run OCR over a single screenshot without generating image variants.

    Delegates to perform_ocr_variants_on_screenshot with variants disabled
    and returns the first (and, in that mode, only) result: one string with
    all the text that was recognised in the unmodified image.
    """
    results = perform_ocr_variants_on_screenshot(screenshot_path, variants=False)
    return results[0]
16
17
def perform_ocr_variants_on_screenshot(
    screenshot_path: Path, variants: bool = True
) -> list[str]:
    """
    Same as perform_ocr_on_screenshot but will create variants of the images
    that can lead to more words being detected.

    Returns a list of strings, one per processed image: the first entry is
    the OCR result of the raw screenshot; when `variants` is true it is
    followed by the results for the preprocessed image and for the
    preprocessed + negated image, in that order.

    Raises MachineError if `tesseract` is not available. Errors raised by
    the individual OCR / preprocessing tasks propagate to the caller when
    their results are collected.
    """
    if shutil.which("tesseract") is None:
        raise MachineError("OCR requested but `tesseract` is not available")

    # Tesseract runs parallel on up to 4 cores.
    # Docs suggest to run it with OMP_THREAD_LIMIT=1 for hundreds of parallel
    # runs. Our average test run is somewhere in between.
    # https://github.com/tesseract-ocr/tesseract/issues/3109
    nix_cores: str | None = os.environ.get("NIX_BUILD_CORES")
    cores: int = 0 if nix_cores is None else int(nix_cores)
    if cores <= 0:
        # Either the variable is unset, or it is 0, which Nix documents as
        # "use all available cores". In both cases ask the OS instead of
        # treating the value as a literal core count.
        cores = os.cpu_count() or 1
    # One worker per group of 4 cores, but never fewer than one.
    workers: int = max(1, cores // 4)

    with ThreadPoolExecutor(max_workers=workers) as e:
        # The idea here is to let the first tesseract call run on the raw image
        # while the other two are preprocessed + tesseracted in parallel
        future_results: list[Future[str]] = [
            e.submit(_run_tesseract, screenshot_path)
        ]
        if variants:

            def tesseract_processed(inverted: bool) -> str:
                return _run_tesseract(_preprocess_screenshot(screenshot_path, inverted))

            future_results.append(e.submit(tesseract_processed, False))
            future_results.append(e.submit(tesseract_processed, True))
        # Collect in submission order so the raw-image result stays first.
        return [future.result() for future in future_results]
49
50
def _run_tesseract(image: Path) -> str:
    """
    Run the `tesseract` command line tool on `image` and return whatever it
    wrote to stdout, decoded as UTF-8.

    Raises MachineError if tesseract exits with a non-zero status.
    """
    # OCR Engine modes (OEM), as listed by `tesseract --help-oem`:
    #   0|tesseract_only          Legacy engine only.
    #   1|lstm_only               Neural nets LSTM engine only.
    #   2|tesseract_lstm_combined Legacy + LSTM engines.
    #   3|default                 Default, based on what is available.
    engine_mode = 2

    command = [
        "tesseract",
        image,
        # Output name "-": the recognised text is read back from stdout below.
        "-",
        "--oem",
        str(engine_mode),
        "-c",
        "debug_file=/dev/null",
        "--psm",
        "11",
    ]
    completed = subprocess.run(command, capture_output=True)

    if completed.returncode == 0:
        return completed.stdout.decode("utf-8")
    raise MachineError(f"OCR failed with exit code {completed.returncode}")
77
78
def _preprocess_screenshot(screenshot_path: Path, negate: bool = False) -> Path:
    """
    Produce a preprocessed copy of a screenshot using ImageMagick.

    The copy is written next to the original as `<stem>.positive.png`, or as
    `<stem>.negative.png` when `negate` is true, in which case `-negate` is
    added to the ImageMagick options. Returns the path of the written file.

    Raises MachineError if `magick` is not available or exits with a
    non-zero status.
    """
    if shutil.which("magick") is None:
        raise MachineError("OCR requested but `magick` is not available")

    variant = "negative" if negate else "positive"
    target = screenshot_path.with_name(f"{screenshot_path.stem}.{variant}.png")

    # Option order matters to ImageMagick, so the sequence is kept fixed:
    # common options, then the optional negation, then gamma and blur.
    options = [
        "-filter",
        "Catrom",
        "-density",
        "72",
        "-resample",
        "300",
        "-contrast",
        "-normalize",
        "-despeckle",
        "-type",
        "grayscale",
        "-sharpen",
        "1",
        "-posterize",
        "3",
    ]
    if negate:
        options.append("-negate")
    options.extend(["-gamma", "100", "-blur", "1x65535"])

    completed = subprocess.run(
        ["magick", "convert", *options, screenshot_path, target],
        capture_output=True,
    )
    if completed.returncode != 0:
        raise MachineError(
            f"Image processing failed with exit code {completed.returncode}, stdout: {completed.stdout.decode()}, stderr: {completed.stderr.decode()}"
        )

    return target