1############################################################################### 
    2# Reusable ProcessPoolExecutor 
    3# 
    4# author: Thomas Moreau and Olivier Grisel 
    5# 
    6import time 
    7import warnings 
    8import threading 
    9import multiprocessing as mp 
    10 
    11from .process_executor import ProcessPoolExecutor, EXTRA_QUEUED_CALLS 
    12from .backend.context import cpu_count 
    13from .backend import get_context 
    14 
# Only the factory function is part of the public API of this module.
__all__ = ["get_reusable_executor"]

# Singleton executor and id management
#
# Lock guarding the module-level state below. It is reentrant because the
# code holding it calls back into other code that acquires it again:
# `_get_next_executor_id`, the recursive call in
# `_ReusablePoolExecutor.get_reusable_executor`, and `_resize` (the same lock
# object is handed to the executor as its submit/resize lock).
_executor_lock = threading.RLock()
# Id that will be assigned to the next executor instance created.
_next_executor_id = 0
# Current singleton executor, or None when no executor has been created yet
# (or after the previous one has been discarded).
_executor = None
# Constructor keyword arguments used to build `_executor`; compared with the
# newly requested arguments when `reuse="auto"`.
_executor_kwargs = None
    22 
    23 
    24def _get_next_executor_id(): 
    25    """Ensure that each successive executor instance has a unique, monotonic id. 
    26 
    27    The purpose of this monotonic id is to help debug and test automated 
    28    instance creation. 
    29    """ 
    30    global _next_executor_id 
    31    with _executor_lock: 
    32        executor_id = _next_executor_id 
    33        _next_executor_id += 1 
    34        return executor_id 
    35 
    36 
    37def get_reusable_executor( 
    38    max_workers=None, 
    39    context=None, 
    40    timeout=10, 
    41    kill_workers=False, 
    42    reuse="auto", 
    43    job_reducers=None, 
    44    result_reducers=None, 
    45    initializer=None, 
    46    initargs=(), 
    47    env=None, 
    48): 
    49    """Return the current ReusableExectutor instance. 
    50 
    51    Start a new instance if it has not been started already or if the previous 
    52    instance was left in a broken state. 
    53 
    54    If the previous instance does not have the requested number of workers, the 
    55    executor is dynamically resized to adjust the number of workers prior to 
    56    returning. 
    57 
    58    Reusing a singleton instance spares the overhead of starting new worker 
    59    processes and importing common python packages each time. 
    60 
    61    ``max_workers`` controls the maximum number of tasks that can be running in 
    62    parallel in worker processes. By default this is set to the number of 
    63    CPUs on the host. 
    64 
    65    Setting ``timeout`` (in seconds) makes idle workers automatically shutdown 
    66    so as to release system resources. New workers are respawn upon submission 
    67    of new tasks so that ``max_workers`` are available to accept the newly 
    68    submitted tasks. Setting ``timeout`` to around 100 times the time required 
    69    to spawn new processes and import packages in them (on the order of 100ms) 
    70    ensures that the overhead of spawning workers is negligible. 
    71 
    72    Setting ``kill_workers=True`` makes it possible to forcibly interrupt 
    73    previously spawned jobs to get a new instance of the reusable executor 
    74    with new constructor argument values. 
    75 
    76    The ``job_reducers`` and ``result_reducers`` are used to customize the 
    77    pickling of tasks and results send to the executor. 
    78 
    79    When provided, the ``initializer`` is run first in newly spawned 
    80    processes with argument ``initargs``. 
    81 
    82    The environment variable in the child process are a copy of the values in 
    83    the main process. One can provide a dict ``{ENV: VAL}`` where ``ENV`` and 
    84    ``VAL`` are string literals to overwrite the environment variable ``ENV`` 
    85    in the child processes to value ``VAL``. The environment variables are set 
    86    in the children before any module is loaded. This only works with the 
    87    ``loky`` context. 
    88    """ 
    89    _executor, _ = _ReusablePoolExecutor.get_reusable_executor( 
    90        max_workers=max_workers, 
    91        context=context, 
    92        timeout=timeout, 
    93        kill_workers=kill_workers, 
    94        reuse=reuse, 
    95        job_reducers=job_reducers, 
    96        result_reducers=result_reducers, 
    97        initializer=initializer, 
    98        initargs=initargs, 
    99        env=env, 
    100    ) 
    101    return _executor 
    102 
    103 
    104class _ReusablePoolExecutor(ProcessPoolExecutor): 
    105    def __init__( 
    106        self, 
    107        submit_resize_lock, 
    108        max_workers=None, 
    109        context=None, 
    110        timeout=None, 
    111        executor_id=0, 
    112        job_reducers=None, 
    113        result_reducers=None, 
    114        initializer=None, 
    115        initargs=(), 
    116        env=None, 
    117    ): 
    118        super().__init__( 
    119            max_workers=max_workers, 
    120            context=context, 
    121            timeout=timeout, 
    122            job_reducers=job_reducers, 
    123            result_reducers=result_reducers, 
    124            initializer=initializer, 
    125            initargs=initargs, 
    126            env=env, 
    127        ) 
    128        self.executor_id = executor_id 
    129        self._submit_resize_lock = submit_resize_lock 
    130 
    131    @classmethod 
    132    def get_reusable_executor( 
    133        cls, 
    134        max_workers=None, 
    135        context=None, 
    136        timeout=10, 
    137        kill_workers=False, 
    138        reuse="auto", 
    139        job_reducers=None, 
    140        result_reducers=None, 
    141        initializer=None, 
    142        initargs=(), 
    143        env=None, 
    144    ): 
    145        with _executor_lock: 
    146            global _executor, _executor_kwargs 
    147            executor = _executor 
    148 
    149            if max_workers is None: 
    150                if reuse is True and executor is not None: 
    151                    max_workers = executor._max_workers 
    152                else: 
    153                    max_workers = cpu_count() 
    154            elif max_workers <= 0: 
    155                raise ValueError( 
    156                    f"max_workers must be greater than 0, got {max_workers}." 
    157                ) 
    158 
    159            if isinstance(context, str): 
    160                context = get_context(context) 
    161            if context is not None and context.get_start_method() == "fork": 
    162                raise ValueError( 
    163                    "Cannot use reusable executor with the 'fork' context" 
    164                ) 
    165 
    166            kwargs = dict( 
    167                context=context, 
    168                timeout=timeout, 
    169                job_reducers=job_reducers, 
    170                result_reducers=result_reducers, 
    171                initializer=initializer, 
    172                initargs=initargs, 
    173                env=env, 
    174            ) 
    175            if executor is None: 
    176                is_reused = False 
    177                mp.util.debug( 
    178                    f"Create a executor with max_workers={max_workers}." 
    179                ) 
    180                executor_id = _get_next_executor_id() 
    181                _executor_kwargs = kwargs 
    182                _executor = executor = cls( 
    183                    _executor_lock, 
    184                    max_workers=max_workers, 
    185                    executor_id=executor_id, 
    186                    **kwargs, 
    187                ) 
    188            else: 
    189                if reuse == "auto": 
    190                    reuse = kwargs == _executor_kwargs 
    191                if ( 
    192                    executor._flags.broken 
    193                    or executor._flags.shutdown 
    194                    or not reuse 
    195                    or executor.queue_size < max_workers 
    196                ): 
    197                    if executor._flags.broken: 
    198                        reason = "broken" 
    199                    elif executor._flags.shutdown: 
    200                        reason = "shutdown" 
    201                    elif executor.queue_size < max_workers: 
    202                        # Do not reuse the executor if the queue size is too 
    203                        # small as this would lead to limited parallelism. 
    204                        reason = "queue size is too small" 
    205                    else: 
    206                        reason = "arguments have changed" 
    207                    mp.util.debug( 
    208                        "Creating a new executor with max_workers=" 
    209                        f"{max_workers} as the previous instance cannot be " 
    210                        f"reused ({reason})." 
    211                    ) 
    212                    executor.shutdown(wait=True, kill_workers=kill_workers) 
    213                    _executor = executor = _executor_kwargs = None 
    214                    # Recursive call to build a new instance 
    215                    return cls.get_reusable_executor( 
    216                        max_workers=max_workers, **kwargs 
    217                    ) 
    218                else: 
    219                    mp.util.debug( 
    220                        "Reusing existing executor with " 
    221                        f"max_workers={executor._max_workers}." 
    222                    ) 
    223                    is_reused = True 
    224                    executor._resize(max_workers) 
    225 
    226        return executor, is_reused 
    227 
    228    def submit(self, fn, *args, **kwargs): 
    229        with self._submit_resize_lock: 
    230            return super().submit(fn, *args, **kwargs) 
    231 
    232    def _resize(self, max_workers): 
    233        with self._submit_resize_lock: 
    234            if max_workers is None: 
    235                raise ValueError("Trying to resize with max_workers=None") 
    236            elif max_workers == self._max_workers: 
    237                return 
    238 
    239            if self._executor_manager_thread is None: 
    240                # If the executor_manager_thread has not been started 
    241                # then no processes have been spawned and we can just 
    242                # update _max_workers and return 
    243                self._max_workers = max_workers 
    244                return 
    245 
    246            self._wait_job_completion() 
    247 
    248            # Some process might have returned due to timeout so check how many 
    249            # children are still alive. Use the _process_management_lock to 
    250            # ensure that no process are spawned or timeout during the resize. 
    251            with self._processes_management_lock: 
    252                processes = list(self._processes.values()) 
    253                nb_children_alive = sum(p.is_alive() for p in processes) 
    254                self._max_workers = max_workers 
    255                for _ in range(max_workers, nb_children_alive): 
    256                    self._call_queue.put(None) 
    257            while ( 
    258                len(self._processes) > max_workers and not self._flags.broken 
    259            ): 
    260                time.sleep(1e-3) 
    261 
    262            self._adjust_process_count() 
    263            processes = list(self._processes.values()) 
    264            while not all(p.is_alive() for p in processes): 
    265                time.sleep(1e-3) 
    266 
    267    def _wait_job_completion(self): 
    268        """Wait for the cache to be empty before resizing the pool.""" 
    269        # Issue a warning to the user about the bad effect of this usage. 
    270        if self._pending_work_items: 
    271            warnings.warn( 
    272                "Trying to resize an executor with running jobs: " 
    273                "waiting for jobs completion before resizing.", 
    274                UserWarning, 
    275            ) 
    276            mp.util.debug( 
    277                f"Executor {self.executor_id} waiting for jobs completion " 
    278                "before resizing" 
    279            ) 
    280        # Wait for the completion of the jobs 
    281        while self._pending_work_items: 
    282            time.sleep(1e-3) 
    283 
    284    def _setup_queues(self, job_reducers, result_reducers): 
    285        # As this executor can be resized, use a large queue size to avoid 
    286        # underestimating capacity and introducing overhead 
    287        # Also handle the case where the user set max_workers to a value larger 
    288        # than cpu_count(), to avoid limiting the number of parallel jobs. 
    289 
    290        min_queue_size = max(cpu_count(), self._max_workers) 
    291        self.queue_size = 2 * min_queue_size + EXTRA_QUEUED_CALLS 
    292        super()._setup_queues( 
    293            job_reducers, result_reducers, queue_size=self.queue_size 
    294        )