| # Copyright 2020 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Helpers related to multiprocessing. |
| |
| Based on: //tools/binary_size/libsupersize/parallel.py |
| """ |
| |
| import atexit |
| import logging |
| import multiprocessing |
| import os |
| import sys |
| import threading |
| import traceback |
| |
# When the DISABLE_ASYNC environment variable is exactly '1', the helpers in
# this module call the target function directly instead of using a pool.
DISABLE_ASYNC = os.getenv('DISABLE_ASYNC') == '1'
if DISABLE_ASYNC:
  logging.warning('Running in synchronous mode.')

# List of process pools that have been created and not yet retired. Stays None
# until the first pool is made.
_all_pools = None
# Set to True by _FuncWrapper; makes _TerminatePools() return early.
_is_child_process = False
# Once True, _CheckForException() ignores failures reported by workers.
_silence_exceptions = False

# Used to pass parameters to forked processes without pickling.
_fork_params = None
_fork_kwargs = None
| |
| |
class _ImmediateResult(object):
  """Result object wrapping a value that has already been computed.

  Offers the same get() / wait() / ready() / successful() methods that this
  module calls on results returned by pool.apply_async().
  """

  def __init__(self, value):
    self._value = value

  def get(self):
    """Returns the value supplied at construction time."""
    return self._value

  def wait(self):
    """Does nothing; the value is available immediately."""

  def ready(self):
    """Always True."""
    return True

  def successful(self):
    """Always True."""
    return True
| |
| |
class _ExceptionWrapper(object):
  """Used to marshal exception messages back to main process.

  Attributes:
    msg: Text describing the failure (callers in this file pass
        traceback.format_exc()).
    exception_type: Name of a builtin exception class to re-raise as, or None
        when the failure should not be re-raised.
  """

  def __init__(self, msg, exception_type=None):
    self.msg = msg
    self.exception_type = exception_type

  def MaybeThrow(self):
    """Re-raises the failure as the recorded builtin exception, if possible.

    Returns normally (without raising) when no exception type was recorded,
    when the recorded name does not refer to a builtin exception class, or
    when that class cannot be constructed from a single message argument
    (e.g. UnicodeDecodeError). The caller is then expected to report |msg|
    itself.

    Raises:
      The builtin exception named by |exception_type|, whose message is
      'Originally caused by: ' followed by |msg|.
    """
    if not self.exception_type:
      return

    # __builtins__ is the builtins module inside __main__ but a plain dict
    # inside imported modules (a CPython implementation detail), so normalize
    # it to a dict before looking the name up.
    namespace = __builtins__
    if not isinstance(namespace, dict):
      namespace = vars(namespace)
    exception_cls = namespace.get(self.exception_type)

    # Never "raise" a builtin that merely shares the name (e.g. a function).
    if not (isinstance(exception_cls, type)
            and issubclass(exception_cls, BaseException)):
      return

    message = 'Originally caused by: ' + self.msg
    try:
      error = exception_cls(message)
    except TypeError:
      # Some builtin exceptions need more than a message to be constructed.
      # Raising the resulting TypeError would hide the original traceback.
      return
    raise error
| |
| |
class _FuncWrapper(object):
  """Runs on the fork()'ed side to catch exceptions and spread *args.

  Instances are constructed by the caller and handed to the pool; calling the
  instance with a job index runs the wrapped function with the positional
  arguments stored at _fork_params[index] and the keyword arguments stored in
  _fork_kwargs.
  """

  def __init__(self, func):
    self._func = func

  @staticmethod
  def _BuiltinExceptionName(error):
    """Returns the class name of |error| if it is a builtin type, else None."""
    name = type(error).__name__
    # __builtins__ is a module inside __main__ but a dict inside imported
    # modules, so hasattr() on it is unreliable; normalize to a dict.
    namespace = __builtins__
    if not isinstance(namespace, dict):
      namespace = vars(namespace)
    # Compare identities so a user-defined class that merely shares its name
    # with a builtin is not mistaken for one.
    if namespace.get(name) is type(error):
      return name
    return None

  def __call__(self, index, _=None):
    """Runs job number |index|.

    Returns:
      The wrapped function's return value, or an _ExceptionWrapper describing
      the failure if it raised.
    """
    # Mark the process as a child here rather than in __init__: __init__ runs
    # in the process that builds the wrapper, whereas __call__ is what the
    # pool invokes to execute a job.
    global _is_child_process
    _is_child_process = True
    try:
      return self._func(*_fork_params[index], **_fork_kwargs)
    except Exception as e:
      # Only keep the exception type for builtin exception types or else risk
      # further marshalling exceptions.
      exception_type = self._BuiltinExceptionName(e)
      # multiprocessing is supposed to catch and return exceptions automatically
      # but it doesn't seem to work properly :(.
      return _ExceptionWrapper(traceback.format_exc(), exception_type)
    except:  # pylint: disable=bare-except
      # Non-Exception failures (e.g. KeyboardInterrupt) are reported without a
      # type so they are never re-raised under a builtin name.
      return _ExceptionWrapper(traceback.format_exc())
| |
| |
class _WrappedResult(object):
  """Result proxy that runs host-side logic once the work has finished.

  * wait() removes the associated pool (if any) from _all_pools.
  * get() passes the value through _CheckForException() before returning it.
  """

  def __init__(self, result, pool=None):
    self._result = result
    self._pool = pool

  def get(self):
    """Waits for the result, checks it for a wrapped exception, returns it."""
    self.wait()
    outcome = self._result.get()
    _CheckForException(outcome)
    return outcome

  def wait(self):
    """Blocks until the result is available, then unregisters the pool."""
    self._result.wait()
    finished_pool = self._pool
    if not finished_pool:
      return
    _all_pools.remove(finished_pool)
    # Cleared so that a later wait()/get() does not try to remove it again.
    self._pool = None

  def ready(self):
    """Forwards to the underlying result's ready()."""
    return self._result.ready()

  def successful(self):
    """Forwards to the underlying result's successful()."""
    return self._result.successful()
| |
| |
def _TerminatePools():
  """Calls .terminate() on all active process pools.

  Not supposed to be necessary according to the docs, but seems to be required
  when child process throws an exception or Ctrl-C is hit.

  Also sets _silence_exceptions, so failures reported afterwards are ignored
  by _CheckForException().
  """
  global _silence_exceptions
  _silence_exceptions = True

  def terminate_quietly(target_pool):
    # Best effort: nothing useful can be done if terminate() itself fails.
    try:
      target_pool.terminate()
    except:  # pylint: disable=bare-except
      pass

  # Child processes cannot have pools, but atexit runs this function because
  # it was registered before fork()ing.
  if not _is_child_process:
    for index, active_pool in enumerate(_all_pools):
      # Without calling terminate() on a separate thread, the call can block
      # forever.
      worker = threading.Thread(target=terminate_quietly,
                                args=(active_pool, ),
                                name='Pool-Terminate-{}'.format(index))
      worker.daemon = True
      worker.start()
| |
| |
def _CheckForException(value):
  """Reacts to |value| if it is an _ExceptionWrapper returned by a worker.

  Any other value, or any failure seen after exceptions have been silenced,
  is ignored. Otherwise the wrapper gets a chance to re-raise the original
  exception; if it does not, the failure is logged and the process exits with
  status 1.
  """
  global _silence_exceptions
  if not isinstance(value, _ExceptionWrapper):
    return
  if _silence_exceptions:
    return
  value.MaybeThrow()
  # Only the first failure is reported; later ones are silenced.
  _silence_exceptions = True
  logging.error('Subprocess raised an exception:\n%s', value.msg)
  sys.exit(1)
| |
| |
def _MakeProcessPool(job_params, **job_kwargs):
  """Creates a process pool and registers it in _all_pools.

  The job parameters are stored in the module-level _fork_params and
  _fork_kwargs while the pool is being constructed, so that they can be passed
  to forked processes without pickling. Both globals are cleared again before
  returning.

  Args:
    job_params: Sized, indexable collection holding one tuple of positional
        arguments per job. Its length caps the pool size.
    **job_kwargs: Keyword arguments shared by every job.

  Returns:
    The newly created multiprocessing.Pool.

  Raises:
    ValueError: Propagated from multiprocessing.Pool when |job_params| is
        empty (pool size 0).
  """
  global _all_pools
  global _fork_params
  global _fork_kwargs
  assert _fork_params is None
  assert _fork_kwargs is None
  pool_size = min(len(job_params), multiprocessing.cpu_count())
  _fork_params = job_params
  _fork_kwargs = job_kwargs
  try:
    ret = multiprocessing.Pool(pool_size)
  finally:
    # Always clear the hand-off globals, even if pool creation fails.
    # Otherwise the asserts above would reject every later call.
    _fork_params = None
    _fork_kwargs = None
  if _all_pools is None:
    _all_pools = []
    # Registered once, on first use, so leftover pools get terminated at exit.
    atexit.register(_TerminatePools)
  _all_pools.append(ret)
  return ret
| |
| |
def ForkAndCall(func, args):
  """Runs |func| in a fork'ed process.

  When DISABLE_ASYNC is set, |func| is instead called right away in the
  current process and its return value is wrapped.

  Args:
    func: Callable to run.
    args: Positional arguments to call |func| with.

  Returns:
    A Result object (call .get() to get the return value)
  """
  if not DISABLE_ASYNC:
    pool = _MakeProcessPool([args])  # Omit |kwargs|.
    pending = pool.apply_async(_FuncWrapper(func), (0, ))
    # No further jobs will be submitted to this single-use pool.
    pool.close()
    return _WrappedResult(pending, pool=pool)

  return _WrappedResult(_ImmediateResult(func(*args)), pool=None)
| |
| |
def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.

  When DISABLE_ASYNC is set, the calls are made one after another in the
  current process instead.

  Args:
    func: Callable to run for every entry of |arg_tuples|.
    arg_tuples: Iterable of positional-argument tuples, one per call.
    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values in order.
  """
  jobs = list(arg_tuples)
  if not jobs:
    return

  if DISABLE_ASYNC:
    for job_args in jobs:
      yield func(*job_args, **kwargs)
  else:
    pool = _MakeProcessPool(jobs, **kwargs)
    runner = _FuncWrapper(func)
    try:
      # Workers are handed only the index of the job to run.
      job_indices = range(len(jobs))
      for value in pool.imap(runner, job_indices):
        _CheckForException(value)
        yield value
    finally:
      pool.close()
      pool.join()
      _all_pools.remove(pool)