1""" 
    2Backends for embarrassingly parallel code. 
    3""" 
    4 
    5import contextlib 
    6import gc 
    7import os 
    8import threading 
    9import warnings 
    10from abc import ABCMeta, abstractmethod 
    11 
    12from ._multiprocessing_helpers import mp 
    13from ._utils import ( 
    14    _retrieve_traceback_capturing_wrapped_call, 
    15    _TracebackCapturingWrapper, 
    16) 
    17 
    18if mp is not None: 
    19    from multiprocessing.pool import ThreadPool 
    20 
    21    from .executor import get_memmapping_executor 
    22 
    23    # Import loky only if multiprocessing is present 
    24    from .externals.loky import cpu_count, process_executor 
    25    from .externals.loky.process_executor import ShutdownExecutorError 
    26    from .pool import MemmappingPool 
    27 
    28 
    29class ParallelBackendBase(metaclass=ABCMeta): 
    30    """Helper abc which defines all methods a ParallelBackend must implement""" 
    31 
    32    default_n_jobs = 1 
    33 
    34    supports_inner_max_num_threads = False 
    35 
    36    # This flag was introduced for backward compatibility reasons. 
    37    # New backends should always set it to True and implement the 
    38    # `retrieve_result_callback` method. 
    39    supports_retrieve_callback = False 
    40 
    41    @property 
    42    def supports_return_generator(self): 
    43        return self.supports_retrieve_callback 
    44 
    45    @property 
    46    def supports_timeout(self): 
    47        return self.supports_retrieve_callback 
    48 
    49    nesting_level = None 
    50 
    51    def __init__( 
    52        self, nesting_level=None, inner_max_num_threads=None, **backend_kwargs 
    53    ): 
    54        super().__init__() 
    55        self.nesting_level = nesting_level 
    56        self.inner_max_num_threads = inner_max_num_threads 
    57        self.backend_kwargs = backend_kwargs 
    58 
    59    MAX_NUM_THREADS_VARS = [ 
    60        "OMP_NUM_THREADS", 
    61        "OPENBLAS_NUM_THREADS", 
    62        "MKL_NUM_THREADS", 
    63        "BLIS_NUM_THREADS", 
    64        "VECLIB_MAXIMUM_THREADS", 
    65        "NUMBA_NUM_THREADS", 
    66        "NUMEXPR_NUM_THREADS", 
    67    ] 
    68 
    69    TBB_ENABLE_IPC_VAR = "ENABLE_IPC" 
    70 
    71    @abstractmethod 
    72    def effective_n_jobs(self, n_jobs): 
    73        """Determine the number of jobs that can actually run in parallel 
    74 
        n_jobs is the number of workers requested by the caller. Passing
        n_jobs=-1 means requesting all available workers, for instance
        matching the number of CPU cores on the worker host(s).
    78 
    79        This method should return a guesstimate of the number of workers that 
    80        can actually perform work concurrently. The primary use case is to make 
    81        it possible for the caller to know in how many chunks to slice the 
    82        work. 
    83 
    84        In general working on larger data chunks is more efficient (less 
    85        scheduling overhead and better use of CPU cache prefetching heuristics) 
    86        as long as all the workers have enough work to do. 
    87        """ 
    88 
    89    def apply_async(self, func, callback=None): 
    90        """Deprecated: implement `submit` instead.""" 
    91        raise NotImplementedError("Implement `submit` instead.") 
    92 
    93    def submit(self, func, callback=None): 
    94        """Schedule a function to be run and return a future-like object. 
    95 
        This method should return a future-like object that allows tracking
        the progress of the task (see the commented sketch after this method).
    98 
    99        If ``supports_retrieve_callback`` is False, the return value of this 
    100        method is passed to ``retrieve_result`` instead of calling 
    101        ``retrieve_result_callback``. 
    102 
    103        Parameters 
    104        ---------- 
    105        func: callable 
    106            The function to be run in parallel. 
    107 
        callback: callable
            A callable that will be called when the task is completed. This
            callable is a wrapper around ``retrieve_result_callback``. It
            should be attached to the future-like object returned by this
            method so that it is invoked once the task completes.
    113 
    114            For future-like backends, this can be achieved with something like 
    115            ``future.add_done_callback(callback)``. 
    116 
    117        Returns 
    118        ------- 
    119        future: future-like 
    120            A future-like object to track the execution of the submitted function. 
    121        """ 
    122        warnings.warn( 
    123            "`apply_async` is deprecated, implement and use `submit` instead.", 
    124            DeprecationWarning, 
    125        ) 
    126        return self.apply_async(func, callback) 
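
    # Illustrative sketch (not part of joblib): a minimal future-based backend
    # could implement `submit` on top of concurrent.futures by forwarding the
    # wrapped callback to `add_done_callback`. `MyExecutorBackend` is a
    # hypothetical name used only for this example:
    #
    #   from concurrent.futures import ThreadPoolExecutor
    #
    #   class MyExecutorBackend(ParallelBackendBase):
    #       supports_retrieve_callback = True
    #
    #       def effective_n_jobs(self, n_jobs):
    #           return 1 if n_jobs is None else max(1, n_jobs)
    #
    #       def configure(self, n_jobs=1, parallel=None, **kwargs):
    #           self.parallel = parallel
    #           n_jobs = self.effective_n_jobs(n_jobs)
    #           self._executor = ThreadPoolExecutor(max_workers=n_jobs)
    #           return n_jobs
    #
    #       def submit(self, func, callback=None):
    #           future = self._executor.submit(func)
    #           if callback is not None:
    #               future.add_done_callback(callback)
    #           return future
    #
    #       def retrieve_result_callback(self, future):
    #           return future.result()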
    127 
    128    def retrieve_result_callback(self, out): 
    129        """Called within the callback function passed to `submit`. 
    130 
    131        This method can customise how the result of the function is retrieved 
    132        from the future-like object. 
    133 
    134        Parameters 
    135        ---------- 
        out: future-like or object
            The argument passed to the callback by the backend, typically the
            future-like object returned by the `submit` method.
    138 
    139        Returns 
    140        ------- 
    141        result: object 
    142            The result of the function executed in parallel. 
    143        """ 
    144 
    145    def retrieve_result(self, out, timeout=None): 
        """Hook to retrieve the result when supports_retrieve_callback=False.
    147 
    148        The argument `out` is the result of the `submit` call. This method 
    149        should return the result of the computation or raise an exception if 
    150        the computation failed. 
    151        """ 
    152        if self.supports_timeout: 
    153            return out.get(timeout=timeout) 
    154        else: 
    155            return out.get() 
    156 
    157    def configure( 
    158        self, n_jobs=1, parallel=None, prefer=None, require=None, **backend_kwargs 
    159    ): 
    160        """Reconfigure the backend and return the number of workers. 
    161 
    162        This makes it possible to reuse an existing backend instance for 
    163        successive independent calls to Parallel with different parameters. 
    164        """ 
    165        self.parallel = parallel 
    166        return self.effective_n_jobs(n_jobs) 
    167 
    168    def start_call(self): 
    169        """Call-back method called at the beginning of a Parallel call""" 
    170 
    171    def stop_call(self): 
    172        """Call-back method called at the end of a Parallel call""" 
    173 
    174    def terminate(self): 
    175        """Shutdown the workers and free the shared memory.""" 
    176 
    177    def compute_batch_size(self): 
    178        """Determine the optimal batch size""" 
    179        return 1 
    180 
    181    def batch_completed(self, batch_size, duration): 
        """Callback to indicate how long it took to run a batch"""
    183 
    184    def abort_everything(self, ensure_ready=True): 
    185        """Abort any running tasks 
    186 
        This is called when an exception has been raised while executing a
        task; the remaining tasks will be ignored and can therefore be
        aborted to spare computation resources.

        If ensure_ready is True, the backend should be left in an operating
        state as future tasks might be re-submitted via that same backend
        instance.

        If ensure_ready is False, the implementer of this method can decide
        to leave the backend in a closed / terminated state as no new tasks
        are expected to be submitted to this backend.

        Setting ensure_ready to False is an optimization that can be leveraged
        when aborting tasks via killing processes from a local process pool
        managed by the backend itself: if we expect no new tasks, there is no
        point in re-creating new workers.
    203        """ 
    204        # Does nothing by default: to be overridden in subclasses when 
    205        # canceling tasks is possible. 
    206        pass 
    207 
    208    def get_nested_backend(self): 
    209        """Backend instance to be used by nested Parallel calls. 
    210 
        By default a thread-based backend is used for the first level of
        nesting. Beyond that, a sequential backend is used to avoid spawning
        too many threads on the host.
    214        """ 
    215        nesting_level = getattr(self, "nesting_level", 0) + 1 
    216        if nesting_level > 1: 
    217            return SequentialBackend(nesting_level=nesting_level), None 
    218        else: 
    219            return ThreadingBackend(nesting_level=nesting_level), None 
    220 
    221    def _prepare_worker_env(self, n_jobs): 
    222        """Return environment variables limiting threadpools in external libs. 
    223 
        This function returns a dict containing environment variables to pass
        when creating a pool of processes. These environment variables limit
        the number of threads used by the OpenMP, MKL, Accelerate and
        OpenBLAS libraries in the child processes.
    228        """ 
    229        explicit_n_threads = self.inner_max_num_threads 
    230        default_n_threads = max(cpu_count() // n_jobs, 1) 
    231 
    232        # Set the inner environment variables to self.inner_max_num_threads if 
    233        # it is given. Else, default to cpu_count // n_jobs unless the variable 
    234        # is already present in the parent process environment. 
    235        env = {} 
    236        for var in self.MAX_NUM_THREADS_VARS: 
    237            if explicit_n_threads is None: 
    238                var_value = os.environ.get(var, default_n_threads) 
    239            else: 
    240                var_value = explicit_n_threads 
    241 
    242            env[var] = str(var_value) 
    243 
    244        if self.TBB_ENABLE_IPC_VAR not in os.environ: 
    245            # To avoid over-subscription when using TBB, let the TBB schedulers 
    246            # use Inter Process Communication to coordinate: 
    247            env[self.TBB_ENABLE_IPC_VAR] = "1" 
    248        return env 
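
    # For illustration only: on a hypothetical 8-core host, with none of the
    # variables above (nor ENABLE_IPC) already set and without an explicit
    # inner_max_num_threads, _prepare_worker_env(n_jobs=2) would return
    # something like:
    #
    #   {'OMP_NUM_THREADS': '4', 'OPENBLAS_NUM_THREADS': '4',
    #    'MKL_NUM_THREADS': '4', 'BLIS_NUM_THREADS': '4',
    #    'VECLIB_MAXIMUM_THREADS': '4', 'NUMBA_NUM_THREADS': '4',
    #    'NUMEXPR_NUM_THREADS': '4', 'ENABLE_IPC': '1'}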
    249 
    250    @contextlib.contextmanager 
    251    def retrieval_context(self): 
    252        """Context manager to manage an execution context. 
    253 
    254        Calls to Parallel.retrieve will be made inside this context. 
    255 
    256        By default, this does nothing. It may be useful for subclasses to 
    257        handle nested parallelism. In particular, it may be required to avoid 
    258        deadlocks if a backend manages a fixed number of workers, when those 
    259        workers may be asked to do nested Parallel calls. Without 
    260        'retrieval_context' this could lead to deadlock, as all the workers 
    261        managed by the backend may be "busy" waiting for the nested parallel 
    262        calls to finish, but the backend has no free workers to execute those 
    263        tasks. 
    264        """ 
    265        yield 
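
    # Illustrative sketch: a backend managing a fixed pool of workers could
    # override `retrieval_context` to release the current worker's slot while
    # results are retrieved, and reclaim it afterwards. The helper methods
    # below are hypothetical:
    #
    #   @contextlib.contextmanager
    #   def retrieval_context(self):
    #       self._release_worker_slot()
    #       try:
    #           yield
    #       finally:
    #           self._acquire_worker_slot()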
    266 
    267    @staticmethod 
    268    def in_main_thread(): 
    269        return isinstance(threading.current_thread(), threading._MainThread) 
    270 
    271 
    272class SequentialBackend(ParallelBackendBase): 
    273    """A ParallelBackend which will execute all batches sequentially. 
    274 
    275    Does not use/create any threading objects, and hence has minimal 
    276    overhead. Used when n_jobs == 1. 
    277    """ 
    278 
    279    uses_threads = True 
    280    supports_timeout = False 
    281    supports_retrieve_callback = False 
    282    supports_sharedmem = True 
    283 
    284    def effective_n_jobs(self, n_jobs): 
    285        """Determine the number of jobs which are going to run in parallel""" 
    286        if n_jobs == 0: 
    287            raise ValueError("n_jobs == 0 in Parallel has no meaning") 
    288        return 1 
    289 
    290    def submit(self, func, callback=None): 
    291        """Schedule a func to be run""" 
    292        raise RuntimeError("Should never be called for SequentialBackend.") 
    293 
    294    def retrieve_result_callback(self, out): 
    295        raise RuntimeError("Should never be called for SequentialBackend.") 
    296 
    297    def get_nested_backend(self): 
    298        # import is not top level to avoid cyclic import errors. 
    299        from .parallel import get_active_backend 
    300 
        # SequentialBackend should not change the nesting level, the default
        # backend or the number of jobs. Just return the current backend.
    303        return get_active_backend() 
    304 
    305 
    306class PoolManagerMixin(object): 
    """A helper class for managing a pool of workers."""
    308 
    309    _pool = None 
    310 
    311    def effective_n_jobs(self, n_jobs): 
    312        """Determine the number of jobs which are going to run in parallel""" 
    313        if n_jobs == 0: 
    314            raise ValueError("n_jobs == 0 in Parallel has no meaning") 
    315        elif mp is None or n_jobs is None: 
    316            # multiprocessing is not available or disabled, fallback 
    317            # to sequential mode 
    318            return 1 
    319        elif n_jobs < 0: 
    320            n_jobs = max(cpu_count() + 1 + n_jobs, 1) 
    321        return n_jobs 
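
    # For example, on a host where cpu_count() == 8 (illustrative value):
    #   effective_n_jobs(-1)   -> max(8 + 1 - 1, 1) == 8  (all cores)
    #   effective_n_jobs(-2)   -> max(8 + 1 - 2, 1) == 7  (all but one core)
    #   effective_n_jobs(None) -> 1                       (sequential fallback)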
    322 
    323    def terminate(self): 
    324        """Shutdown the process or thread pool""" 
    325        if self._pool is not None: 
    326            self._pool.close() 
    327            self._pool.terminate()  # terminate does a join() 
    328            self._pool = None 
    329 
    330    def _get_pool(self): 
    331        """Used by `submit` to make it possible to implement lazy init""" 
    332        return self._pool 
    333 
    334    def submit(self, func, callback=None): 
    335        """Schedule a func to be run""" 
        # Here, we need a wrapper to avoid crashes on KeyboardInterrupt errors.
    337        # We also call the callback on error, to make sure the pool does not 
    338        # wait on crashed jobs. 
    339        return self._get_pool().apply_async( 
    340            _TracebackCapturingWrapper(func), 
    341            (), 
    342            callback=callback, 
    343            error_callback=callback, 
    344        ) 
    345 
    346    def retrieve_result_callback(self, result): 
    347        """Mimic concurrent.futures results, raising an error if needed.""" 
        # In the multiprocessing Pool API, the callback is called with the
        # result value as an argument so `result` (`out`) is the output of
        # job.get(). It's either the result or the exception raised while
        # collecting the result.
    352        return _retrieve_traceback_capturing_wrapped_call(result) 
    353 
    354    def abort_everything(self, ensure_ready=True): 
    355        """Shutdown the pool and restart a new one with the same parameters""" 
    356        self.terminate() 
    357        if ensure_ready: 
    358            self.configure( 
    359                n_jobs=self.parallel.n_jobs, 
    360                parallel=self.parallel, 
    361                **self.parallel._backend_kwargs, 
    362            ) 
    363 
    364 
    365class AutoBatchingMixin(object): 
    366    """A helper class for automagically batching jobs.""" 
    367 
    368    # In seconds, should be big enough to hide multiprocessing dispatching 
    369    # overhead. 
    # This setting was found by running benchmarks/bench_auto_batching.py
    371    # with various parameters on various platforms. 
    372    MIN_IDEAL_BATCH_DURATION = 0.2 
    373 
    374    # Should not be too high to avoid stragglers: long jobs running alone 
    375    # on a single worker while other workers have no work to process any more. 
    376    MAX_IDEAL_BATCH_DURATION = 2 
    377 
    378    # Batching counters default values 
    379    _DEFAULT_EFFECTIVE_BATCH_SIZE = 1 
    380    _DEFAULT_SMOOTHED_BATCH_DURATION = 0.0 
    381 
    382    def __init__(self, **kwargs): 
    383        super().__init__(**kwargs) 
    384        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE 
    385        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION 
    386 
    387    def compute_batch_size(self): 
    388        """Determine the optimal batch size""" 
    389        old_batch_size = self._effective_batch_size 
    390        batch_duration = self._smoothed_batch_duration 
    391        if batch_duration > 0 and batch_duration < self.MIN_IDEAL_BATCH_DURATION: 
    392            # The current batch size is too small: the duration of the 
            # processing of a batch of tasks is not large enough to hide
    394            # the scheduling overhead. 
    395            ideal_batch_size = int( 
    396                old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration 
    397            ) 
            # Multiply by two to limit oscillations between min and max.
    399            ideal_batch_size *= 2 
    400 
            # don't increase the batch size too fast to avoid huge batch sizes
            # potentially leading to starving workers
    403            batch_size = min(2 * old_batch_size, ideal_batch_size) 
    404 
    405            batch_size = max(batch_size, 1) 
    406 
    407            self._effective_batch_size = batch_size 
    408            if self.parallel.verbose >= 10: 
    409                self.parallel._print( 
    410                    f"Batch computation too fast ({batch_duration}s.) " 
    411                    f"Setting batch_size={batch_size}." 
    412                ) 
    413        elif batch_duration > self.MAX_IDEAL_BATCH_DURATION and old_batch_size >= 2: 
    414            # The current batch size is too big. If we schedule overly long 
    415            # running batches some CPUs might wait with nothing left to do 
            # while a couple of CPUs are left processing a few long running
    417            # batches. Better reduce the batch size a bit to limit the 
    418            # likelihood of scheduling such stragglers. 
    419 
    420            # decrease the batch size quickly to limit potential starving 
    421            ideal_batch_size = int( 
    422                old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration 
    423            ) 
            # Multiply by two to limit oscillations between min and max.
    425            batch_size = max(2 * ideal_batch_size, 1) 
    426            self._effective_batch_size = batch_size 
    427            if self.parallel.verbose >= 10: 
    428                self.parallel._print( 
    429                    f"Batch computation too slow ({batch_duration}s.) " 
    430                    f"Setting batch_size={batch_size}." 
    431                ) 
    432        else: 
    433            # No batch size adjustment 
    434            batch_size = old_batch_size 
    435 
    436        if batch_size != old_batch_size: 
    437            # Reset estimation of the smoothed mean batch duration: this 
    438            # estimate is updated in the multiprocessing apply_async 
            # callback as long as the batch_size is constant. Therefore
    440            # we need to reset the estimate whenever we re-tune the batch 
    441            # size. 
    442            self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION 
    443 
    444        return batch_size 
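
    # Worked example with illustrative numbers: if old_batch_size == 1 and the
    # smoothed batch duration is 0.02s (below MIN_IDEAL_BATCH_DURATION), then
    # ideal_batch_size == int(1 * 0.2 / 0.02) * 2 == 20, but growth is capped
    # at min(2 * 1, 20) == 2, so the batch size at most doubles per adjustment.
    # Conversely, if old_batch_size == 64 and the duration is 4s (above
    # MAX_IDEAL_BATCH_DURATION), the new size is max(2 * int(64 * 0.2 / 4), 1)
    # == 6.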
    445 
    446    def batch_completed(self, batch_size, duration): 
        """Callback to indicate how long it took to run a batch"""
    448        if batch_size == self._effective_batch_size: 
    449            # Update the smoothed streaming estimate of the duration of a batch 
    450            # from dispatch to completion 
    451            old_duration = self._smoothed_batch_duration 
    452            if old_duration == self._DEFAULT_SMOOTHED_BATCH_DURATION: 
    453                # First record of duration for this batch size after the last 
    454                # reset. 
    455                new_duration = duration 
    456            else: 
                # Update the exponentially weighted average of the duration
                # of a batch for the current effective size.
    459                new_duration = 0.8 * old_duration + 0.2 * duration 
    460            self._smoothed_batch_duration = new_duration 
    461 
    462    def reset_batch_stats(self): 
    463        """Reset batch statistics to default values. 
    464 
        This avoids interference with future jobs.
    466        """ 
    467        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE 
    468        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION 
    469 
    470 
    471class ThreadingBackend(PoolManagerMixin, ParallelBackendBase): 
    """A ParallelBackend which will use a thread pool to execute batches.
    473 
    474    This is a low-overhead backend but it suffers from the Python Global 
    475    Interpreter Lock if the called function relies a lot on Python objects. 
    476    Mostly useful when the execution bottleneck is a compiled extension that 
    477    explicitly releases the GIL (for instance a Cython loop wrapped in a "with 
    478    nogil" block or an expensive call to a library such as NumPy). 
    479 
    The actual thread pool is lazily initialized: its construction is delayed
    until the first task is submitted.
    482 
    483    ThreadingBackend is used as the default backend for nested calls. 
    484    """ 
    485 
    486    supports_retrieve_callback = True 
    487    uses_threads = True 
    488    supports_sharedmem = True 
    489 
    490    def configure(self, n_jobs=1, parallel=None, **backend_kwargs): 
    491        """Build a process or thread pool and return the number of workers""" 
    492        n_jobs = self.effective_n_jobs(n_jobs) 
    493        if n_jobs == 1: 
    494            # Avoid unnecessary overhead and use sequential backend instead. 
    495            raise FallbackToBackend(SequentialBackend(nesting_level=self.nesting_level)) 
    496        self.parallel = parallel 
    497        self._n_jobs = n_jobs 
    498        return n_jobs 
    499 
    500    def _get_pool(self): 
    501        """Lazily initialize the thread pool 
    502 
        The actual pool of worker threads is only created when the first task
        is submitted.
    505        """ 
    506        if self._pool is None: 
    507            self._pool = ThreadPool(self._n_jobs) 
    508        return self._pool 
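
    # Usage sketch (function and data names are placeholders): this backend is
    # typically selected with ``backend="threading"`` or ``prefer="threads"``:
    #
    #   from joblib import Parallel, delayed
    #
    #   results = Parallel(n_jobs=4, prefer="threads")(
    #       delayed(gil_releasing_function)(item) for item in data
    #   )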
    509 
    510 
    511class MultiprocessingBackend(PoolManagerMixin, AutoBatchingMixin, ParallelBackendBase): 
    512    """A ParallelBackend which will use a multiprocessing.Pool. 
    513 
    514    Will introduce some communication and memory overhead when exchanging 
    input and output data with the worker Python processes.
    516    However, does not suffer from the Python Global Interpreter Lock. 
    517    """ 
    518 
    519    supports_retrieve_callback = True 
    520    supports_return_generator = False 
    521 
    522    def effective_n_jobs(self, n_jobs): 
    523        """Determine the number of jobs which are going to run in parallel. 
    524 
    525        This also checks if we are attempting to create a nested parallel 
    526        loop. 
    527        """ 
    528        if mp is None: 
    529            return 1 
    530 
    531        if mp.current_process().daemon: 
    532            # Daemonic processes cannot have children 
    533            if n_jobs != 1: 
    534                if inside_dask_worker(): 
    535                    msg = ( 
    536                        "Inside a Dask worker with daemon=True, " 
    537                        "setting n_jobs=1.\nPossible work-arounds:\n" 
    538                        "- dask.config.set(" 
                        "{'distributed.worker.daemon': False})\n"
    540                        "- set the environment variable " 
    541                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n" 
    542                        "before creating your Dask cluster." 
    543                    ) 
    544                else: 
    545                    msg = ( 
    546                        "Multiprocessing-backed parallel loops " 
    547                        "cannot be nested, setting n_jobs=1" 
    548                    ) 
    549                warnings.warn(msg, stacklevel=3) 
    550            return 1 
    551 
    552        if process_executor._CURRENT_DEPTH > 0: 
            # Mixing loky and multiprocessing in nested loops is not supported
    554            if n_jobs != 1: 
    555                warnings.warn( 
    556                    "Multiprocessing-backed parallel loops cannot be nested," 
    557                    " below loky, setting n_jobs=1", 
    558                    stacklevel=3, 
    559                ) 
    560            return 1 
    561 
    562        elif not (self.in_main_thread() or self.nesting_level == 0): 
            # Prevent posix fork inside non-main posix threads
    564            if n_jobs != 1: 
    565                warnings.warn( 
    566                    "Multiprocessing-backed parallel loops cannot be nested" 
    567                    " below threads, setting n_jobs=1", 
    568                    stacklevel=3, 
    569                ) 
    570            return 1 
    571 
    572        return super(MultiprocessingBackend, self).effective_n_jobs(n_jobs) 
    573 
    574    def configure( 
    575        self, 
    576        n_jobs=1, 
    577        parallel=None, 
    578        prefer=None, 
    579        require=None, 
    580        **memmapping_pool_kwargs, 
    581    ): 
    582        """Build a process or thread pool and return the number of workers""" 
    583        n_jobs = self.effective_n_jobs(n_jobs) 
    584        if n_jobs == 1: 
    585            raise FallbackToBackend(SequentialBackend(nesting_level=self.nesting_level)) 
    586 
    587        memmapping_pool_kwargs = { 
    588            **self.backend_kwargs, 
    589            **memmapping_pool_kwargs, 
    590        } 
    591 
    592        # Make sure to free as much memory as possible before forking 
    593        gc.collect() 
    594        self._pool = MemmappingPool(n_jobs, **memmapping_pool_kwargs) 
    595        self.parallel = parallel 
    596        return n_jobs 
    597 
    598    def terminate(self): 
    599        """Shutdown the process or thread pool""" 
    600        super(MultiprocessingBackend, self).terminate() 
    601        self.reset_batch_stats() 
    602 
    603 
    604class LokyBackend(AutoBatchingMixin, ParallelBackendBase): 
    """Manage a pool of workers with loky instead of multiprocessing."""
    606 
    607    supports_retrieve_callback = True 
    608    supports_inner_max_num_threads = True 
    609 
    610    def configure( 
    611        self, 
    612        n_jobs=1, 
    613        parallel=None, 
    614        prefer=None, 
    615        require=None, 
    616        idle_worker_timeout=None, 
    617        **memmapping_executor_kwargs, 
    618    ): 
    619        """Build a process executor and return the number of workers""" 
    620        n_jobs = self.effective_n_jobs(n_jobs) 
    621        if n_jobs == 1: 
    622            raise FallbackToBackend(SequentialBackend(nesting_level=self.nesting_level)) 
    623 
    624        memmapping_executor_kwargs = { 
    625            **self.backend_kwargs, 
    626            **memmapping_executor_kwargs, 
    627        } 
    628 
    629        # Prohibit the use of 'timeout' in the LokyBackend, as 'idle_worker_timeout' 
    630        # better describes the backend's behavior. 
    631        if "timeout" in memmapping_executor_kwargs: 
    632            raise ValueError( 
    633                "The 'timeout' parameter is not supported by the LokyBackend. " 
    634                "Please use the `idle_worker_timeout` parameter instead." 
    635            ) 
    636        if idle_worker_timeout is None: 
    637            idle_worker_timeout = self.backend_kwargs.get("idle_worker_timeout", 300) 
    638 
    639        self._workers = get_memmapping_executor( 
    640            n_jobs, 
    641            timeout=idle_worker_timeout, 
    642            env=self._prepare_worker_env(n_jobs=n_jobs), 
    643            context_id=parallel._id, 
    644            **memmapping_executor_kwargs, 
    645        ) 
    646        self.parallel = parallel 
    647        return n_jobs 
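
    # Usage sketch (``fn`` and ``inputs`` are placeholders): loky is joblib's
    # default backend; thanks to supports_inner_max_num_threads, the thread
    # count of BLAS/OpenMP libraries in the workers can be bounded through
    # ``parallel_config``:
    #
    #   from joblib import Parallel, delayed, parallel_config
    #
    #   with parallel_config(backend="loky", inner_max_num_threads=2):
    #       results = Parallel(n_jobs=4)(delayed(fn)(x) for x in inputs)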
    648 
    649    def effective_n_jobs(self, n_jobs): 
    650        """Determine the number of jobs which are going to run in parallel""" 
    651        if n_jobs == 0: 
    652            raise ValueError("n_jobs == 0 in Parallel has no meaning") 
    653        elif mp is None or n_jobs is None: 
    654            # multiprocessing is not available or disabled, fallback 
    655            # to sequential mode 
    656            return 1 
    657        elif mp.current_process().daemon: 
    658            # Daemonic processes cannot have children 
    659            if n_jobs != 1: 
    660                if inside_dask_worker(): 
    661                    msg = ( 
    662                        "Inside a Dask worker with daemon=True, " 
    663                        "setting n_jobs=1.\nPossible work-arounds:\n" 
    664                        "- dask.config.set(" 
    665                        "{'distributed.worker.daemon': False})\n" 
    666                        "- set the environment variable " 
    667                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n" 
    668                        "before creating your Dask cluster." 
    669                    ) 
    670                else: 
    671                    msg = ( 
                        "Loky-backed parallel loops cannot be called in a"
                        " multiprocessing worker, setting n_jobs=1"
    674                    ) 
    675                warnings.warn(msg, stacklevel=3) 
    676 
    677            return 1 
    678        elif not (self.in_main_thread() or self.nesting_level == 0): 
            # Prevent posix fork inside non-main posix threads
    680            if n_jobs != 1: 
    681                warnings.warn( 
    682                    "Loky-backed parallel loops cannot be nested below " 
    683                    "threads, setting n_jobs=1", 
    684                    stacklevel=3, 
    685                ) 
    686            return 1 
    687        elif n_jobs < 0: 
    688            n_jobs = max(cpu_count() + 1 + n_jobs, 1) 
    689        return n_jobs 
    690 
    691    def submit(self, func, callback=None): 
    692        """Schedule a func to be run""" 
    693        future = self._workers.submit(func) 
    694        if callback is not None: 
    695            future.add_done_callback(callback) 
    696        return future 
    697 
    698    def retrieve_result_callback(self, future): 
        """Retrieve the result; `future` is the future-like object returned by `submit`."""
    700        try: 
    701            return future.result() 
    702        except ShutdownExecutorError: 
    703            raise RuntimeError( 
    704                "The executor underlying Parallel has been shutdown. " 
    705                "This is likely due to the garbage collection of a previous " 
    706                "generator from a call to Parallel with return_as='generator'." 
    707                " Make sure the generator is not garbage collected when " 
    708                "submitting a new job or that it is first properly exhausted." 
    709            ) 
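
    # Illustration of the failure mode described above (sketch; ``fn`` is a
    # placeholder): a generator created with ``return_as="generator"`` that is
    # dropped before being exhausted may release the executor's resources, so
    # a later submission can hit the RuntimeError raised here:
    #
    #   gen = Parallel(n_jobs=2, return_as="generator")(
    #       delayed(fn)(i) for i in range(10)
    #   )
    #   del gen  # may trigger cleanup of the underlying executor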
    710 
    711    def terminate(self): 
    712        if self._workers is not None: 
    713            # Don't terminate the workers as we want to reuse them in later 
            # calls, but clean up the temporary resources that the Parallel call
    715            # created. This 'hack' requires a private, low-level operation. 
    716            self._workers._temp_folder_manager._clean_temporary_resources( 
    717                context_id=self.parallel._id, force=False 
    718            ) 
    719            self._workers = None 
    720 
    721        self.reset_batch_stats() 
    722 
    723    def abort_everything(self, ensure_ready=True): 
        """Shutdown the workers and restart new ones with the same parameters"""
    725        self._workers.terminate(kill_workers=True) 
    726        self._workers = None 
    727 
    728        if ensure_ready: 
    729            self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel) 
    730 
    731 
    732class FallbackToBackend(Exception): 
    733    """Raised when configuration should fallback to another backend""" 
    734 
    735    def __init__(self, backend): 
    736        self.backend = backend 
    737 
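# Sketch of how the fallback is typically consumed (``parallel_instance`` is a
# placeholder for the calling Parallel object): a backend's `configure` raises
# FallbackToBackend when only one worker is usable, and the caller swaps in
# the carried backend:
#
#   try:
#       n_jobs = backend.configure(n_jobs=1, parallel=parallel_instance)
#   except FallbackToBackend as e:
#       backend = e.backend  # e.g. a SequentialBackend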
    738 
    739def inside_dask_worker(): 
    740    """Check whether the current function is executed inside a Dask worker.""" 
    # This function cannot live in joblib._dask because there would be a
    # circular import:
    # _dask imports _parallel_backends, which imports _dask ...
    744    try: 
    745        from distributed import get_worker 
    746    except ImportError: 
    747        return False 
    748 
    749    try: 
    750        get_worker() 
    751        return True 
    752    except ValueError: 
    753        return False