import os
import shutil
import subprocess
from concurrent.futures import Future, ThreadPoolExecutor
from pathlib import Path

from test_driver.errors import MachineError


def perform_ocr_on_screenshot(screenshot_path: Path) -> str:
    """
    Perform OCR on a screenshot that contains text.

    Only the unmodified screenshot is processed (no preprocessed variants).
    Returns a string with all words that could be found.
    Raises MachineError if `tesseract` is not available or exits non-zero.
    """
    return perform_ocr_variants_on_screenshot(screenshot_path, False)[0]


def _ocr_worker_count() -> int:
    """
    Return how many tesseract invocations may run concurrently.

    The core budget is taken from NIX_BUILD_CORES when it holds a positive
    integer; otherwise (unset, malformed, or `0`, which Nix documents as
    "use all available cores") it falls back to the CPU count reported by
    the OS. The result is never lower than 1.
    """
    cores = os.cpu_count() or 1

    nix_cores = os.environ.get("NIX_BUILD_CORES")
    if nix_cores is not None:
        try:
            requested = int(nix_cores)
        except ValueError:
            # A malformed value must not abort the OCR request with an
            # unrelated ValueError; treat it like an unset variable.
            requested = 0
        if requested > 0:
            cores = requested

    # Tesseract runs parallel on up to 4 cores.
    # Docs suggest to run it with OMP_THREAD_LIMIT=1 for hundreds of parallel
    # runs. Our average test run is somewhere in between.
    # https://github.com/tesseract-ocr/tesseract/issues/3109
    return max(1, cores // 4)


def perform_ocr_variants_on_screenshot(
    screenshot_path: Path, variants: bool = True
) -> list[str]:
    """
    Same as perform_ocr_on_screenshot but will create variants of the image
    that can lead to more words being detected.

    Returns a list with one string of detected words per processed image:
    first the raw screenshot and, when `variants` is true, the preprocessed
    screenshot followed by the preprocessed and negated screenshot.

    Raises MachineError if `tesseract` is not available, if `magick` is not
    available while variants are requested, or if either tool exits non-zero.
    """
    if shutil.which("tesseract") is None:
        raise MachineError("OCR requested but `tesseract` is not available")

    with ThreadPoolExecutor(max_workers=_ocr_worker_count()) as executor:
        # The idea here is to let the first tesseract call run on the raw image
        # while the other two are preprocessed + tesseracted in parallel
        future_results: list[Future[str]] = [
            executor.submit(_run_tesseract, screenshot_path)
        ]
        if variants:

            def tesseract_processed(inverted: bool) -> str:
                return _run_tesseract(_preprocess_screenshot(screenshot_path, inverted))

            future_results.append(executor.submit(tesseract_processed, False))
            future_results.append(executor.submit(tesseract_processed, True))

        # Collect in submission order; result() re-raises any MachineError
        # that occurred inside a worker thread.
        return [future.result() for future in future_results]


def _run_tesseract(image: Path) -> str:
    """
    Run tesseract on `image` and return the recognised text from its stdout.

    Raises MachineError if tesseract exits with a non-zero status.
    """
    # tesseract --help-oem
    # OCR Engine modes (OEM):
    #  0|tesseract_only          Legacy engine only.
    #  1|lstm_only               Neural nets LSTM engine only.
    #  2|tesseract_lstm_combined Legacy + LSTM engines.
    #  3|default                 Default, based on what is available.
    ocr_engine_mode = 2

    ret = subprocess.run(
        [
            "tesseract",
            image,
            "-",  # write the recognised text to stdout
            "--oem",
            str(ocr_engine_mode),
            "-c",
            "debug_file=/dev/null",
            "--psm",
            "11",
        ],
        capture_output=True,
    )
    if ret.returncode != 0:
        # Report stderr as well, like the image preprocessing step does, so a
        # failing run can be diagnosed. It may be empty because debug output
        # is redirected to /dev/null above.
        raise MachineError(
            f"OCR failed with exit code {ret.returncode}, "
            f"stderr: {ret.stderr.decode(errors='replace')}"
        )
    return ret.stdout.decode("utf-8")


def _preprocess_screenshot(screenshot_path: Path, negate: bool = False) -> Path:
    """
    Create a preprocessed copy of the screenshot using ImageMagick.

    The copy is written next to the original as `<stem>.positive.png`, or as
    `<stem>.negative.png` when `negate` is true (colours inverted).
    Returns the path of the generated image.

    Raises MachineError if `magick` is not available or exits non-zero.
    """
    if shutil.which("magick") is None:
        raise MachineError("OCR requested but `magick` is not available")

    magick_args = [
        "-filter",
        "Catrom",
        "-density",
        "72",
        "-resample",
        "300",
        "-contrast",
        "-normalize",
        "-despeckle",
        "-type",
        "grayscale",
        "-sharpen",
        "1",
        "-posterize",
        "3",
    ]

    if negate:
        magick_args.append("-negate")
        out_file = screenshot_path.with_name(f"{screenshot_path.stem}.negative.png")
    else:
        out_file = screenshot_path.with_name(f"{screenshot_path.stem}.positive.png")

    magick_args += [
        "-gamma",
        "100",
        "-blur",
        "1x65535",
    ]

    ret = subprocess.run(
        ["magick", "convert"] + magick_args + [screenshot_path, out_file],
        capture_output=True,
    )

    if ret.returncode != 0:
        # Decode leniently: undecodable tool output must not replace the
        # MachineError with a UnicodeDecodeError.
        stdout = ret.stdout.decode(errors="replace")
        stderr = ret.stderr.decode(errors="replace")
        raise MachineError(
            f"Image processing failed with exit code {ret.returncode}, stdout: {stdout}, stderr: {stderr}"
        )

    return out_file