"""
Backends for embarrassingly parallel code.
"""

import contextlib
import gc
import os
import threading
import warnings
from abc import ABCMeta, abstractmethod

from ._multiprocessing_helpers import mp
from ._utils import (
    _retrieve_traceback_capturing_wrapped_call,
    _TracebackCapturingWrapper,
)

if mp is not None:
    from multiprocessing.pool import ThreadPool

    from .executor import get_memmapping_executor

    # Import loky only if multiprocessing is present
    from .externals.loky import cpu_count, process_executor
    from .externals.loky.process_executor import ShutdownExecutorError
    from .pool import MemmappingPool


class ParallelBackendBase(metaclass=ABCMeta):
    """Helper abc which defines all methods a ParallelBackend must implement"""

    default_n_jobs = 1

    supports_inner_max_num_threads = False

    # This flag was introduced for backward compatibility reasons.
    # New backends should always set it to True and implement the
    # `retrieve_result_callback` method.
    supports_retrieve_callback = False

    @property
    def supports_return_generator(self):
        return self.supports_retrieve_callback

    @property
    def supports_timeout(self):
        return self.supports_retrieve_callback

    nesting_level = None

    def __init__(
        self, nesting_level=None, inner_max_num_threads=None, **backend_kwargs
    ):
        super().__init__()
        self.nesting_level = nesting_level
        self.inner_max_num_threads = inner_max_num_threads
        self.backend_kwargs = backend_kwargs

    MAX_NUM_THREADS_VARS = [
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "BLIS_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
        "NUMBA_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
    ]

    TBB_ENABLE_IPC_VAR = "ENABLE_IPC"

    @abstractmethod
    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs that can actually run in parallel.

        n_jobs is the number of workers requested by the caller. Passing
        n_jobs=-1 means requesting all available workers, for instance
        matching the number of CPU cores on the worker host(s).

        This method should return a guesstimate of the number of workers that
        can actually perform work concurrently. The primary use case is to
        make it possible for the caller to know in how many chunks to slice
        the work.

        In general, working on larger data chunks is more efficient (less
        scheduling overhead and better use of CPU cache prefetching
        heuristics) as long as all the workers have enough work to do.
        """

    def apply_async(self, func, callback=None):
        """Deprecated: implement `submit` instead."""
        raise NotImplementedError("Implement `submit` instead.")

    def submit(self, func, callback=None):
        """Schedule a function to be run and return a future-like object.

        This method should return a future-like object that allows tracking
        the progress of the task.

        If ``supports_retrieve_callback`` is False, the return value of this
        method is passed to ``retrieve_result`` instead of calling
        ``retrieve_result_callback``.

        Parameters
        ----------
        func: callable
            The function to be run in parallel.

        callback: callable
            A callable that will be called when the task is completed. This
            callable is a wrapper around ``retrieve_result_callback``. It
            should be added to the future-like object returned by this
            method, so that the callback is called when the task is
            completed.

            For future-like backends, this can be achieved with something
            like ``future.add_done_callback(callback)``.

        Returns
        -------
        future: future-like
            A future-like object to track the execution of the submitted
            function.
        """
        warnings.warn(
            "`apply_async` is deprecated, implement and use `submit` instead.",
            DeprecationWarning,
        )
        return self.apply_async(func, callback)

    def retrieve_result_callback(self, out):
        """Called within the callback function passed to `submit`.

        This method can customise how the result of the function is
        retrieved from the future-like object.

        Parameters
        ----------
        out: future-like
            The future-like object returned by the `submit` method.

        Returns
        -------
        result: object
            The result of the function executed in parallel.
        """

    def retrieve_result(self, out, timeout=None):
        """Hook to retrieve the result when supports_retrieve_callback=False.

        The argument `out` is the result of the `submit` call. This method
        should return the result of the computation or raise an exception if
        the computation failed.
        """
        if self.supports_timeout:
            return out.get(timeout=timeout)
        else:
            return out.get()
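
    # The default implementation assumes an `AsyncResult`-like object (as
    # returned by `multiprocessing.pool.Pool.apply_async`): its
    # `get(timeout=...)` call raises `multiprocessing.TimeoutError` when the
    # task does not complete within `timeout` seconds.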

    def configure(
        self, n_jobs=1, parallel=None, prefer=None, require=None, **backend_kwargs
    ):
        """Reconfigure the backend and return the number of workers.

        This makes it possible to reuse an existing backend instance for
        successive independent calls to Parallel with different parameters.
        """
        self.parallel = parallel
        return self.effective_n_jobs(n_jobs)

    def start_call(self):
        """Call-back method called at the beginning of a Parallel call"""

    def stop_call(self):
        """Call-back method called at the end of a Parallel call"""

    def terminate(self):
        """Shutdown the workers and free the shared memory."""

    def compute_batch_size(self):
        """Determine the optimal batch size"""
        return 1

    def batch_completed(self, batch_size, duration):
        """Callback indicating how long it took to run a batch"""

    def abort_everything(self, ensure_ready=True):
        """Abort any running tasks

        This is called when an exception has been raised when executing a task
        and all the remaining tasks will be ignored and can therefore be
        aborted to spare computation resources.

        If ensure_ready is True, the backend should be left in an operating
        state as future tasks might be re-submitted via that same backend
        instance.

        If ensure_ready is False, the implementer of this method can decide
        to leave the backend in a closed / terminated state as no new tasks
        are expected to be submitted to this backend.

        Setting ensure_ready to False is an optimization that can be
        leveraged when aborting tasks via killing processes from a local
        process pool managed by the backend itself: if we expect no new
        tasks, there is no point in re-creating new workers.
        """
        # Does nothing by default: to be overridden in subclasses when
        # canceling tasks is possible.
        pass

    def get_nested_backend(self):
        """Backend instance to be used by nested Parallel calls.

        By default a thread-based backend is used for the first level of
        nesting. Beyond that, switch to the sequential backend to avoid
        spawning too many threads on the host.
        """
        nesting_level = getattr(self, "nesting_level", 0) + 1
        if nesting_level > 1:
            return SequentialBackend(nesting_level=nesting_level), None
        else:
            return ThreadingBackend(nesting_level=nesting_level), None

    def _prepare_worker_env(self, n_jobs):
        """Return environment variables limiting threadpools in external libs.

        This function returns a dict containing environment variables to pass
        when creating a pool of processes. These environment variables limit
        the number of threads used by OpenMP, OpenBLAS, MKL, BLIS, Accelerate,
        Numba and NumExpr in the child processes.
        """
        explicit_n_threads = self.inner_max_num_threads
        default_n_threads = max(cpu_count() // n_jobs, 1)

        # Set the inner environment variables to self.inner_max_num_threads if
        # it is given. Else, default to cpu_count // n_jobs unless the variable
        # is already present in the parent process environment.
        env = {}
        for var in self.MAX_NUM_THREADS_VARS:
            if explicit_n_threads is None:
                var_value = os.environ.get(var, default_n_threads)
            else:
                var_value = explicit_n_threads

            env[var] = str(var_value)

        if self.TBB_ENABLE_IPC_VAR not in os.environ:
            # To avoid over-subscription when using TBB, let the TBB schedulers
            # use Inter Process Communication to coordinate:
            env[self.TBB_ENABLE_IPC_VAR] = "1"
        return env

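    # Illustrative example (assuming an 8-core host): with n_jobs=2 and no
    # inner_max_num_threads, each worker gets e.g. OMP_NUM_THREADS=4
    # (cpu_count() // n_jobs), unless the variable is already set in the
    # parent environment; with inner_max_num_threads=1, every variable in
    # MAX_NUM_THREADS_VARS is forced to "1".
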
    @contextlib.contextmanager
    def retrieval_context(self):
        """Context manager to manage an execution context.

        Calls to Parallel.retrieve will be made inside this context.

        By default, this does nothing. It may be useful for subclasses to
        handle nested parallelism. In particular, it may be required to avoid
        deadlocks if a backend manages a fixed number of workers, when those
        workers may be asked to do nested Parallel calls. Without
        'retrieval_context' this could lead to deadlock, as all the workers
        managed by the backend may be "busy" waiting for the nested parallel
        calls to finish, but the backend has no free workers to execute those
        tasks.
        """
        yield

    @staticmethod
    def in_main_thread():
        return isinstance(threading.current_thread(), threading._MainThread)

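
# The `submit` / `retrieve_result_callback` protocol documented above can be
# illustrated with a minimal backend built on the standard library
# `concurrent.futures.ThreadPoolExecutor`. This is a hedged sketch kept here
# for documentation only: the `_example_minimal_backend` name and the class
# it returns are illustrative assumptions and are not used by joblib itself.
def _example_minimal_backend():
    """Return an illustrative ParallelBackendBase subclass (documentation only)."""
    from concurrent.futures import ThreadPoolExecutor

    class _ExampleFuturesBackend(ParallelBackendBase):
        # Opt in to the callback-based result retrieval path.
        supports_retrieve_callback = True

        def configure(self, n_jobs=1, parallel=None, **backend_kwargs):
            self.parallel = parallel
            n_jobs = self.effective_n_jobs(n_jobs)
            self._executor = ThreadPoolExecutor(max_workers=n_jobs)
            return n_jobs

        def effective_n_jobs(self, n_jobs):
            # Simplified: treat any missing or non-positive value as 1.
            return n_jobs if n_jobs and n_jobs > 0 else 1

        def submit(self, func, callback=None):
            future = self._executor.submit(func)
            if callback is not None:
                # `callback` wraps `retrieve_result_callback`; it must be
                # invoked once the task is done.
                future.add_done_callback(callback)
            return future

        def retrieve_result_callback(self, future):
            # Return the result or re-raise the exception of the task.
            return future.result()

        def terminate(self):
            if getattr(self, "_executor", None) is not None:
                self._executor.shutdown(wait=True)
                self._executor = None

    return _ExampleFuturesBackend


# Such a class could then be registered with `joblib.register_parallel_backend`
# and selected through `parallel_config(backend=...)`.
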

class SequentialBackend(ParallelBackendBase):
    """A ParallelBackend which will execute all batches sequentially.

    Does not use/create any threading objects, and hence has minimal
    overhead. Used when n_jobs == 1.
    """

    uses_threads = True
    supports_timeout = False
    supports_retrieve_callback = False
    supports_sharedmem = True

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError("n_jobs == 0 in Parallel has no meaning")
        return 1

    def submit(self, func, callback=None):
        """Schedule a func to be run"""
        raise RuntimeError("Should never be called for SequentialBackend.")

    def retrieve_result_callback(self, out):
        raise RuntimeError("Should never be called for SequentialBackend.")

    def get_nested_backend(self):
        # import is not top level to avoid cyclic import errors.
        from .parallel import get_active_backend

        # SequentialBackend should change neither the nesting level, the
        # default backend, nor the number of jobs. Just return the current
        # one.
        return get_active_backend()


class PoolManagerMixin(object):
    """A helper class for managing a pool of workers."""

    _pool = None

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError("n_jobs == 0 in Parallel has no meaning")
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif n_jobs < 0:
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs

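    # Worked example (illustrative numbers): with cpu_count() == 8, n_jobs=-1
    # maps to max(8 + 1 + (-1), 1) == 8 workers and n_jobs=-2 maps to 7,
    # i.e. all CPUs but one.
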
    def terminate(self):
        """Shutdown the process or thread pool"""
        if self._pool is not None:
            self._pool.close()
            self._pool.terminate()  # terminate does a join()
            self._pool = None

    def _get_pool(self):
        """Used by `submit` to make it possible to implement lazy init"""
        return self._pool

    def submit(self, func, callback=None):
        """Schedule a func to be run"""
        # Here, we need a wrapper to avoid crashes on KeyboardInterrupt.
        # We also call the callback on error, to make sure the pool does not
        # wait on crashed jobs.
        return self._get_pool().apply_async(
            _TracebackCapturingWrapper(func),
            (),
            callback=callback,
            error_callback=callback,
        )

    def retrieve_result_callback(self, result):
        """Mimic concurrent.futures results, raising an error if needed."""
        # In the multiprocessing Pool API, the callback is called with the
        # result value as an argument so `result` (`out`) is the output of
        # job.get(). It's either the result or the exception raised while
        # collecting the result.
        return _retrieve_traceback_capturing_wrapped_call(result)

    def abort_everything(self, ensure_ready=True):
        """Shutdown the pool and restart a new one with the same parameters"""
        self.terminate()
        if ensure_ready:
            self.configure(
                n_jobs=self.parallel.n_jobs,
                parallel=self.parallel,
                **self.parallel._backend_kwargs,
            )


class AutoBatchingMixin(object):
    """A helper class for automagically batching jobs."""

    # In seconds, should be big enough to hide multiprocessing dispatching
    # overhead.
    # This setting was found by running benchmarks/bench_auto_batching.py
    # with various parameters on various platforms.
    MIN_IDEAL_BATCH_DURATION = 0.2

    # Should not be too high to avoid stragglers: long jobs running alone
    # on a single worker while other workers have no work to process any more.
    MAX_IDEAL_BATCH_DURATION = 2

    # Batching counters default values
    _DEFAULT_EFFECTIVE_BATCH_SIZE = 1
    _DEFAULT_SMOOTHED_BATCH_DURATION = 0.0

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION

    def compute_batch_size(self):
        """Determine the optimal batch size"""
        old_batch_size = self._effective_batch_size
        batch_duration = self._smoothed_batch_duration
        if batch_duration > 0 and batch_duration < self.MIN_IDEAL_BATCH_DURATION:
            # The current batch size is too small: the duration of the
            # processing of a batch of tasks is not large enough to hide
            # the scheduling overhead.
            ideal_batch_size = int(
                old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration
            )
            # Multiply by two to limit oscillations between min and max.
            ideal_batch_size *= 2

            # Don't increase the batch size too fast to avoid huge batch
            # sizes that could lead to starving workers.
            batch_size = min(2 * old_batch_size, ideal_batch_size)

            batch_size = max(batch_size, 1)

            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too fast ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        elif batch_duration > self.MAX_IDEAL_BATCH_DURATION and old_batch_size >= 2:
            # The current batch size is too big. If we schedule overly long
            # running batches some CPUs might wait with nothing left to do
            # while a couple of CPUs are left processing a few long running
            # batches. Better reduce the batch size a bit to limit the
            # likelihood of scheduling such stragglers.

            # Decrease the batch size quickly to limit potential starving.
            ideal_batch_size = int(
                old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration
            )
            # Multiply by two to limit oscillations between min and max.
            batch_size = max(2 * ideal_batch_size, 1)
            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too slow ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        else:
            # No batch size adjustment
            batch_size = old_batch_size

        if batch_size != old_batch_size:
            # Reset estimation of the smoothed mean batch duration: this
            # estimate is updated in the multiprocessing apply_async
            # callback as long as the batch_size is constant. Therefore
            # we need to reset the estimate whenever we re-tune the batch
            # size.
            self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION

        return batch_size

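    # Worked example (illustrative numbers): if the smoothed batch duration is
    # 0.05s with a current batch size of 1, the ideal size is
    # int(1 * 0.2 / 0.05) * 2 == 8, but the growth is capped at
    # min(2 * 1, 8) == 2, so the next batches contain 2 tasks each.
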
    def batch_completed(self, batch_size, duration):
        """Callback indicating how long it took to run a batch"""
        if batch_size == self._effective_batch_size:
            # Update the smoothed streaming estimate of the duration of a batch
            # from dispatch to completion
            old_duration = self._smoothed_batch_duration
            if old_duration == self._DEFAULT_SMOOTHED_BATCH_DURATION:
                # First record of duration for this batch size after the last
                # reset.
                new_duration = duration
            else:
                # Update the exponentially weighted average of the batch
                # duration for the current effective size.
                new_duration = 0.8 * old_duration + 0.2 * duration
            self._smoothed_batch_duration = new_duration
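            # For example, old_duration == 1.0s and duration == 0.5s yield
            # 0.8 * 1.0 + 0.2 * 0.5 == 0.9s as the new smoothed estimate.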

    def reset_batch_stats(self):
        """Reset batch statistics to default values.

        This avoids interference with future jobs.
        """
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION


class ThreadingBackend(PoolManagerMixin, ParallelBackendBase):
    """A ParallelBackend which will use a thread pool to execute batches in.

    This is a low-overhead backend but it suffers from the Python Global
    Interpreter Lock if the called function relies a lot on Python objects.
    Mostly useful when the execution bottleneck is a compiled extension that
    explicitly releases the GIL (for instance a Cython loop wrapped in a "with
    nogil" block or an expensive call to a library such as NumPy).

    The thread pool is lazily initialized: its construction is delayed until
    the first call to `submit`.

    ThreadingBackend is used as the default backend for nested calls.
    """

    supports_retrieve_callback = True
    uses_threads = True
    supports_sharedmem = True

    def configure(self, n_jobs=1, parallel=None, **backend_kwargs):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            # Avoid unnecessary overhead and use sequential backend instead.
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level)
            )
        self.parallel = parallel
        self._n_jobs = n_jobs
        return n_jobs

    def _get_pool(self):
        """Lazily initialize the thread pool

        The actual pool of worker threads is only initialized at the first
        call to `submit`.
        """
        if self._pool is None:
            self._pool = ThreadPool(self._n_jobs)
        return self._pool

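
# Hedged usage sketch (documentation only, never called at import time): the
# threading backend pays off when the parallel function releases the GIL, as
# NumPy's linear algebra routines do. The helper name below and the use of
# NumPy are illustrative assumptions, not part of joblib's API.
def _example_threading_backend_usage():
    """Run a small GIL-releasing workload with the threading backend."""
    import numpy as np

    from joblib import Parallel, delayed

    rng = np.random.default_rng(0)
    matrices = [rng.normal(size=(200, 200)) for _ in range(8)]
    # prefer="threads" hints Parallel to pick ThreadingBackend.
    return Parallel(n_jobs=4, prefer="threads")(
        delayed(np.linalg.svd)(m, compute_uv=False) for m in matrices
    )
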

class MultiprocessingBackend(PoolManagerMixin, AutoBatchingMixin, ParallelBackendBase):
    """A ParallelBackend which will use a multiprocessing.Pool.

    Will introduce some communication and memory overhead when exchanging
    input and output data with the worker Python processes.
    However, does not suffer from the Python Global Interpreter Lock.
    """

    supports_retrieve_callback = True
    supports_return_generator = False

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel.

        This also checks if we are attempting to create a nested parallel
        loop.
        """
        if mp is None:
            return 1

        if mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        "Multiprocessing-backed parallel loops "
                        "cannot be nested, setting n_jobs=1"
                    )
                warnings.warn(msg, stacklevel=3)
            return 1

        if process_executor._CURRENT_DEPTH > 0:
            # Mixing loky and multiprocessing in nested loops is not supported
            if n_jobs != 1:
                warnings.warn(
                    "Multiprocessing-backed parallel loops cannot be nested"
                    " below loky, setting n_jobs=1",
                    stacklevel=3,
                )
            return 1

        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    "Multiprocessing-backed parallel loops cannot be nested"
                    " below threads, setting n_jobs=1",
                    stacklevel=3,
                )
            return 1

        return super(MultiprocessingBackend, self).effective_n_jobs(n_jobs)

    def configure(
        self,
        n_jobs=1,
        parallel=None,
        prefer=None,
        require=None,
        **memmapping_pool_kwargs,
    ):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level)
            )

        memmapping_pool_kwargs = {
            **self.backend_kwargs,
            **memmapping_pool_kwargs,
        }

        # Make sure to free as much memory as possible before forking
        gc.collect()
        self._pool = MemmappingPool(n_jobs, **memmapping_pool_kwargs)
        self.parallel = parallel
        return n_jobs

    def terminate(self):
        """Shutdown the process or thread pool"""
        super(MultiprocessingBackend, self).terminate()
        self.reset_batch_stats()


class LokyBackend(AutoBatchingMixin, ParallelBackendBase):
    """Managing a pool of workers with loky instead of multiprocessing."""

    supports_retrieve_callback = True
    supports_inner_max_num_threads = True

    def configure(
        self,
        n_jobs=1,
        parallel=None,
        prefer=None,
        require=None,
        idle_worker_timeout=None,
        **memmapping_executor_kwargs,
    ):
        """Build a process executor and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level)
            )

        memmapping_executor_kwargs = {
            **self.backend_kwargs,
            **memmapping_executor_kwargs,
        }

        # Prohibit the use of 'timeout' in the LokyBackend, as 'idle_worker_timeout'
        # better describes the backend's behavior.
        if "timeout" in memmapping_executor_kwargs:
            raise ValueError(
                "The 'timeout' parameter is not supported by the LokyBackend. "
                "Please use the `idle_worker_timeout` parameter instead."
            )
        if idle_worker_timeout is None:
            idle_worker_timeout = self.backend_kwargs.get("idle_worker_timeout", 300)

        self._workers = get_memmapping_executor(
            n_jobs,
            timeout=idle_worker_timeout,
            env=self._prepare_worker_env(n_jobs=n_jobs),
            context_id=parallel._id,
            **memmapping_executor_kwargs,
        )
        self.parallel = parallel
        return n_jobs

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError("n_jobs == 0 in Parallel has no meaning")
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        "Loky-backed parallel loops cannot be called in a"
                        " multiprocessing context, setting n_jobs=1"
                    )
                warnings.warn(msg, stacklevel=3)

            return 1
        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    "Loky-backed parallel loops cannot be nested below "
                    "threads, setting n_jobs=1",
                    stacklevel=3,
                )
            return 1
        elif n_jobs < 0:
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs

    def submit(self, func, callback=None):
        """Schedule a func to be run"""
        future = self._workers.submit(func)
        if callback is not None:
            future.add_done_callback(callback)
        return future

    def retrieve_result_callback(self, future):
        """Retrieve the result from the future given by `submit`."""
        try:
            return future.result()
        except ShutdownExecutorError:
            raise RuntimeError(
                "The executor underlying Parallel has been shutdown. "
                "This is likely due to the garbage collection of a previous "
                "generator from a call to Parallel with return_as='generator'."
                " Make sure the generator is not garbage collected when "
                "submitting a new job or that it is first properly exhausted."
            )

    def terminate(self):
        if self._workers is not None:
            # Don't terminate the workers as we want to reuse them in later
            # calls, but cleanup the temporary resources that the Parallel call
            # created. This 'hack' requires a private, low-level operation.
            self._workers._temp_folder_manager._clean_temporary_resources(
                context_id=self.parallel._id, force=False
            )
            self._workers = None

        self.reset_batch_stats()

    def abort_everything(self, ensure_ready=True):
        """Shutdown the workers and restart new ones with the same parameters"""
        self._workers.terminate(kill_workers=True)
        self._workers = None

        if ensure_ready:
            self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel)

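
# Hedged usage sketch (documentation only, never called at import time): the
# loky backend honours `inner_max_num_threads`, which feeds
# `_prepare_worker_env` above so that the BLAS/OpenMP threadpools of the
# workers are capped. The helper name below is an illustrative assumption,
# not part of joblib's API.
def _example_loky_inner_threads_usage():
    """Cap native threadpools inside loky workers to avoid oversubscription."""
    from joblib import Parallel, delayed, parallel_config

    def work(i):
        # Placeholder for a BLAS/OpenMP heavy task.
        return i * i

    # Each of the 4 loky workers will see OMP_NUM_THREADS=1 (and the other
    # MAX_NUM_THREADS_VARS set to "1") in its environment.
    with parallel_config(backend="loky", inner_max_num_threads=1):
        return Parallel(n_jobs=4)(delayed(work)(i) for i in range(16))
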

class FallbackToBackend(Exception):
    """Raised when configuration should fall back to another backend"""

    def __init__(self, backend):
        self.backend = backend


def inside_dask_worker():
    """Check whether the current function is executed inside a Dask worker."""
    # This function cannot be in joblib._dask because there would be a
    # circular import:
    # _dask imports _parallel_backend that imports _dask ...
    try:
        from distributed import get_worker
    except ImportError:
        return False

    try:
        get_worker()
        return True
    except ValueError:
        return False