# this module only has to exist because cpython has a global interpreter lock
# and markdown-it is pure python code. ideally we'd just use thread pools, but
# the GIL prohibits this.

import multiprocessing

from typing import Any, Callable, Iterable, Optional, TypeVar

R = TypeVar('R')
S = TypeVar('S')
T = TypeVar('T')
A = TypeVar('A')

# number of worker processes to use in `map`. `None` disables multiprocessing
# and makes `map` run everything in the calling process.
pool_processes: Optional[int] = None

# this thing is impossible to type because there's so much global state involved.
# wrapping in a class to get access to Generic[] parameters is not sufficient
# because mypy is too weak, and unnecessarily obscures how much global state is
# needed in each worker to make this whole brouhaha work.
_map_worker_fn: Any = None
_map_worker_state_fn: Any = None
_map_worker_state_arg: Any = None


def _map_worker_init(*args: Any) -> None:
    """
    pool initializer. expects exactly `(fn, state_fn, state_arg)` and stores the three
    values in the module globals read by `_map_worker_step`.
    """
    global _map_worker_fn, _map_worker_state_fn, _map_worker_state_arg
    fn, state_fn, state_arg = args
    _map_worker_fn = fn
    _map_worker_state_fn = state_fn
    _map_worker_state_arg = state_arg


# NOTE: the state argument is never passed by any caller, we only use it as a localized
# cache for the created state in lieu of another global. it is effectively a global though.
def _map_worker_step(arg: Any, state: Any = []) -> Any:
    """
    apply the stored worker function to a single item, creating the worker state
    on the first call and reusing it on every later call.
    """
    # if a Pool initializer throws it'll just be retried, leading to endless loops.
    # doing the proper initialization only on first use avoids this.
    if not state:
        created = _map_worker_state_fn(_map_worker_state_arg)
        state.append(created)
    cached = state[0]
    return _map_worker_fn(cached, arg)


def map(fn: Callable[[S, T], R], d: Iterable[T], chunk_size: int,
        state_fn: Callable[[A], S], state_arg: A) -> list[R]:
    """
    `[ fn(state, i) for i in d ]` where `state = state_fn(state_arg)`, but using multiprocessing
    if `pool_processes` is not `None`. when multiprocessing is used, `multiprocessing.Pool.imap`
    distributes the items (with `chunk_size` as its chunk size) and the state function is run
    in each worker process the first time that worker handles an item. without multiprocessing
    `chunk_size` is ignored.

    **NOTE:** neither `state_fn` nor `fn` are allowed to mutate global state! doing so will cause
    discrepancies if `pool_processes` is not None, since each worker will have its own copy.

    **NOTE**: all data types that potentially cross a process boundary (so, all of them) must be
    pickle-able. this excludes lambdas, bound functions, local functions, and a number of other
    types depending on their exact internal structure. *theoretically* the pool constructor
    can transfer non-pickleable data to worker processes, but this only works when using the
    `fork` spawn method (and is thus not available on darwin or windows).
    """
    if pool_processes is not None:
        init_args = (fn, state_fn, state_arg)
        with multiprocessing.Pool(pool_processes, _map_worker_init, init_args) as pool:
            return list(pool.imap(_map_worker_step, d, chunk_size))
    # serial path: build the state once in this process and apply fn item by item.
    shared = state_fn(state_arg)
    return [ fn(shared, item) for item in d ]