Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/_parallel_backends.py: 35%
276 statements
coverage.py v7.3.2, created at 2023-12-12 06:31 +0000
1"""
2Backends for embarrassingly parallel code.
3"""
5import gc
6import os
7import warnings
8import threading
9import contextlib
10from abc import ABCMeta, abstractmethod
13from ._multiprocessing_helpers import mp
15if mp is not None:
16 from .pool import MemmappingPool
17 from multiprocessing.pool import ThreadPool
18 from .executor import get_memmapping_executor
20 # Import loky only if multiprocessing is present
21 from .externals.loky import process_executor, cpu_count
22 from .externals.loky.process_executor import ShutdownExecutorError
23 from .externals.loky.process_executor import _ExceptionWithTraceback


class ParallelBackendBase(metaclass=ABCMeta):
    """Helper abc which defines all methods a ParallelBackend must implement"""

    supports_inner_max_num_threads = False
    supports_retrieve_callback = False
    default_n_jobs = 1

    @property
    def supports_return_generator(self):
        return self.supports_retrieve_callback

    @property
    def supports_timeout(self):
        return self.supports_retrieve_callback

    nesting_level = None

    def __init__(self, nesting_level=None, inner_max_num_threads=None,
                 **kwargs):
        super().__init__(**kwargs)
        self.nesting_level = nesting_level
        self.inner_max_num_threads = inner_max_num_threads

    MAX_NUM_THREADS_VARS = [
        'OMP_NUM_THREADS', 'OPENBLAS_NUM_THREADS', 'MKL_NUM_THREADS',
        'BLIS_NUM_THREADS', 'VECLIB_MAXIMUM_THREADS', 'NUMBA_NUM_THREADS',
        'NUMEXPR_NUM_THREADS',
    ]

    TBB_ENABLE_IPC_VAR = "ENABLE_IPC"

    @abstractmethod
    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs that can actually run in parallel.

        n_jobs is the number of workers requested by the caller. Passing
        n_jobs=-1 means requesting all available workers, for instance
        matching the number of CPU cores on the worker host(s).

        This method should return a guesstimate of the number of workers
        that can actually perform work concurrently. The primary use case is
        to make it possible for the caller to know in how many chunks to
        slice the work.

        In general, working on larger data chunks is more efficient (less
        scheduling overhead and better use of CPU cache prefetching
        heuristics) as long as all the workers have enough work to do.
        """

    @abstractmethod
    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""

    def retrieve_result_callback(self, out):
        """Called within the callback function passed to apply_async.

        The argument of this function is the argument given to the callback
        in the considered backend. It is supposed to return the outcome of a
        task if it succeeded or raise the exception if it failed.
        """

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  **backend_args):
        """Reconfigure the backend and return the number of workers.

        This makes it possible to reuse an existing backend instance for
        successive independent calls to Parallel with different parameters.
        """
        self.parallel = parallel
        return self.effective_n_jobs(n_jobs)

    def start_call(self):
        """Callback method called at the beginning of a Parallel call"""

    def stop_call(self):
        """Callback method called at the end of a Parallel call"""

    def terminate(self):
        """Shutdown the workers and free the shared memory."""

    def compute_batch_size(self):
        """Determine the optimal batch size"""
        return 1

    def batch_completed(self, batch_size, duration):
        """Callback indicating how long it took to run a batch"""

    def get_exceptions(self):
        """List of exception types to be captured."""
        return []

    def abort_everything(self, ensure_ready=True):
        """Abort any running tasks.

        This is called when an exception has been raised when executing a
        task and all the remaining tasks will be ignored and can therefore
        be aborted to spare computation resources.

        If ensure_ready is True, the backend should be left in an operating
        state as future tasks might be re-submitted via that same backend
        instance.

        If ensure_ready is False, the implementer of this method can decide
        to leave the backend in a closed / terminated state as no new tasks
        are expected to be submitted to this backend.

        Setting ensure_ready to False is an optimization that can be
        leveraged when aborting tasks via killing processes from a local
        process pool managed by the backend itself: if we expect no new
        tasks, there is no point in re-creating new workers.
        """
        # Does nothing by default: to be overridden in subclasses when
        # canceling tasks is possible.
        pass

    def get_nested_backend(self):
        """Backend instance to be used by nested Parallel calls.

        By default a thread-based backend is used for the first level of
        nesting. Beyond that, switch to the sequential backend to avoid
        spawning too many threads on the host.
        """
        nesting_level = getattr(self, 'nesting_level', 0) + 1
        if nesting_level > 1:
            return SequentialBackend(nesting_level=nesting_level), None
        else:
            return ThreadingBackend(nesting_level=nesting_level), None
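
    # Illustration (behavior implied by the code above): a top-level
    # Parallel call at nesting_level=0 hands nested calls a ThreadingBackend
    # at nesting_level=1; any deeper nesting falls back to
    # SequentialBackend, so at most one extra layer of threads is spawned.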

    @contextlib.contextmanager
    def retrieval_context(self):
        """Context manager to manage an execution context.

        Calls to Parallel.retrieve will be made inside this context.

        By default, this does nothing. It may be useful for subclasses to
        handle nested parallelism. In particular, it may be required to
        avoid deadlocks if a backend manages a fixed number of workers,
        when those workers may be asked to do nested Parallel calls.
        Without 'retrieval_context' this could lead to deadlock, as all the
        workers managed by the backend may be "busy" waiting for the nested
        parallel calls to finish, but the backend has no free workers to
        execute those tasks.
        """
        yield
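
    # Hypothetical sketch (not part of joblib) of a subclass with a fixed
    # worker budget releasing its slot while Parallel.retrieve blocks:
    #
    #   @contextlib.contextmanager
    #   def retrieval_context(self):
    #       self._slots.release()  # hypothetical semaphore guarding workers
    #       try:
    #           yield
    #       finally:
    #           self._slots.acquire()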

    def _prepare_worker_env(self, n_jobs):
        """Return environment variables limiting threadpools in external libs.

        This function returns a dict containing environment variables to
        pass when creating a pool of processes. These environment variables
        limit the number of threads used by OpenMP, MKL, Accelerate and
        OpenBLAS in the child processes.
        """
        explicit_n_threads = self.inner_max_num_threads
        default_n_threads = str(max(cpu_count() // n_jobs, 1))

        # Set the inner environment variables to self.inner_max_num_threads
        # if it is given. Else, default to cpu_count // n_jobs unless the
        # variable is already present in the parent process environment.
        env = {}
        for var in self.MAX_NUM_THREADS_VARS:
            if explicit_n_threads is None:
                var_value = os.environ.get(var, None)
                if var_value is None:
                    var_value = default_n_threads
            else:
                var_value = str(explicit_n_threads)

            env[var] = var_value

        if self.TBB_ENABLE_IPC_VAR not in os.environ:
            # To avoid over-subscription when using TBB, let the TBB
            # schedulers use Inter Process Communication to coordinate:
            env[self.TBB_ENABLE_IPC_VAR] = "1"
        return env
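
    # Worked example (values assumed): on an 8-core host with n_jobs=2 and
    # inner_max_num_threads=None, default_n_threads == str(8 // 2) == '4',
    # so every variable from MAX_NUM_THREADS_VARS not already set in the
    # parent environment maps to '4', and ENABLE_IPC is set to '1':
    #
    #   >>> backend._prepare_worker_env(n_jobs=2)  # doctest: +SKIP
    #   {'OMP_NUM_THREADS': '4', ..., 'ENABLE_IPC': '1'}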

    @staticmethod
    def in_main_thread():
        return isinstance(threading.current_thread(), threading._MainThread)
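

# Minimal sketch of a custom backend (hypothetical, not part of joblib):
# a concrete subclass only has to provide effective_n_jobs and apply_async,
# and can then be plugged into Parallel via
# joblib.register_parallel_backend('inline', InlineBackend).
#
#   class InlineBackend(ParallelBackendBase):
#       supports_retrieve_callback = True
#
#       def effective_n_jobs(self, n_jobs):
#           return 1 if n_jobs is None else max(1, n_jobs)
#
#       def apply_async(self, func, callback=None):
#           result = func()  # run the batch inline, for demonstration only
#           if callback is not None:
#               callback(result)
#           return result
#
#       def retrieve_result_callback(self, out):
#           return out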


class SequentialBackend(ParallelBackendBase):
    """A ParallelBackend which will execute all batches sequentially.

    Does not use/create any threading objects, and hence has minimal
    overhead. Used when n_jobs == 1.
    """

    uses_threads = True
    supports_timeout = False
    supports_retrieve_callback = False
    supports_sharedmem = True

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        return 1

    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        raise RuntimeError("Should never be called for SequentialBackend.")

    def retrieve_result_callback(self, out):
        raise RuntimeError("Should never be called for SequentialBackend.")

    def get_nested_backend(self):
        # import is not top level to avoid cyclic import errors.
        from .parallel import get_active_backend

        # SequentialBackend should not change the nesting level, the
        # default backend, or the number of jobs. Just return the current
        # one.
        return get_active_backend()
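

# Note: Parallel(n_jobs=1) typically ends up on this backend: the
# process- and thread-based backends below raise
# FallbackToBackend(SequentialBackend(...)) from configure() when the
# effective number of jobs is 1.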


class PoolManagerMixin(object):
    """A helper class for managing pool of workers."""

    _pool = None

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif n_jobs < 0:
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs
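
    # Worked example for negative n_jobs, assuming cpu_count() == 8:
    #   n_jobs=-1 -> max(8 + 1 - 1, 1) == 8 (all cores)
    #   n_jobs=-2 -> max(8 + 1 - 2, 1) == 7 (all but one core)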

    def terminate(self):
        """Shutdown the process or thread pool"""
        if self._pool is not None:
            self._pool.close()
            self._pool.terminate()  # terminate does a join()
            self._pool = None

    def _get_pool(self):
        """Used by apply_async to make it possible to implement lazy init"""
        return self._pool

    @staticmethod
    def _wrap_func_call(func):
        """Protect function call and return error with traceback."""
        try:
            return func()
        except BaseException as e:
            return _ExceptionWithTraceback(e)

    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        # Here, we need a wrapper to avoid crashes on KeyboardInterrupt
        # errors. We also call the callback on error, to make sure the pool
        # does not wait on crashed jobs.
        return self._get_pool().apply_async(
            self._wrap_func_call, (func,),
            callback=callback, error_callback=callback
        )

    def retrieve_result_callback(self, out):
        """Mimic concurrent.futures results, raising an error if needed."""
        if isinstance(out, _ExceptionWithTraceback):
            rebuild, args = out.__reduce__()
            out = rebuild(*args)
        if isinstance(out, BaseException):
            raise out
        return out

    def abort_everything(self, ensure_ready=True):
        """Shutdown the pool and restart a new one with the same parameters"""
        self.terminate()
        if ensure_ready:
            self.configure(n_jobs=self.parallel.n_jobs,
                           parallel=self.parallel,
                           **self.parallel._backend_args)


class AutoBatchingMixin(object):
    """A helper class for automagically batching jobs."""

    # In seconds, should be big enough to hide multiprocessing dispatching
    # overhead.
    # This setting was found by running benchmarks/bench_auto_batching.py
    # with various parameters on various platforms.
    MIN_IDEAL_BATCH_DURATION = .2

    # Should not be too high to avoid stragglers: long jobs running alone
    # on a single worker while other workers have no work to process any
    # more.
    MAX_IDEAL_BATCH_DURATION = 2

    # Batching counters default values
    _DEFAULT_EFFECTIVE_BATCH_SIZE = 1
    _DEFAULT_SMOOTHED_BATCH_DURATION = 0.0

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION

    def compute_batch_size(self):
        """Determine the optimal batch size"""
        old_batch_size = self._effective_batch_size
        batch_duration = self._smoothed_batch_duration
        if (batch_duration > 0 and
                batch_duration < self.MIN_IDEAL_BATCH_DURATION):
            # The current batch size is too small: the duration of the
            # processing of a batch of tasks is not large enough to hide
            # the scheduling overhead.
            ideal_batch_size = int(old_batch_size *
                                   self.MIN_IDEAL_BATCH_DURATION /
                                   batch_duration)
            # Multiply by two to limit oscillations between min and max.
            ideal_batch_size *= 2

            # Don't increase the batch size too fast to limit huge batch
            # sizes potentially leading to starving workers.
            batch_size = min(2 * old_batch_size, ideal_batch_size)

            batch_size = max(batch_size, 1)

            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too fast ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        elif (batch_duration > self.MAX_IDEAL_BATCH_DURATION and
                old_batch_size >= 2):
            # The current batch size is too big. If we schedule overly long
            # running batches some CPUs might wait with nothing left to do
            # while a couple of CPUs are left processing a few long running
            # batches. Better reduce the batch size a bit to limit the
            # likelihood of scheduling such stragglers.

            # Decrease the batch size quickly to limit potential starving.
            ideal_batch_size = int(
                old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration
            )
            # Multiply by two to limit oscillations between min and max.
            batch_size = max(2 * ideal_batch_size, 1)
            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too slow ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        else:
            # No batch size adjustment
            batch_size = old_batch_size

        if batch_size != old_batch_size:
            # Reset estimation of the smoothed mean batch duration: this
            # estimate is updated in the multiprocessing apply_async
            # CallBack as long as the batch_size is constant. Therefore
            # we need to reset the estimate whenever we re-tune the batch
            # size.
            self._smoothed_batch_duration = \
                self._DEFAULT_SMOOTHED_BATCH_DURATION

        return batch_size
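
    # Worked example (numbers assumed): with old_batch_size == 1 and a
    # smoothed batch_duration of 0.05s (< MIN_IDEAL_BATCH_DURATION == 0.2):
    #   ideal_batch_size = int(1 * 0.2 / 0.05) * 2 == 8
    #   batch_size = max(min(2 * 1, 8), 1) == 2
    # so the batch size at most doubles per adjustment on the way up, while
    # the too-slow branch can shrink it much faster.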

    def batch_completed(self, batch_size, duration):
        """Callback indicating how long it took to run a batch"""
        if batch_size == self._effective_batch_size:
            # Update the smoothed streaming estimate of the duration of a
            # batch from dispatch to completion
            old_duration = self._smoothed_batch_duration
            if old_duration == self._DEFAULT_SMOOTHED_BATCH_DURATION:
                # First record of duration for this batch size after the
                # last reset.
                new_duration = duration
            else:
                # Update the exponentially weighted average of the duration
                # of batches for the current effective size.
                new_duration = 0.8 * old_duration + 0.2 * duration
            self._smoothed_batch_duration = new_duration
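
    # Worked example (numbers assumed): with a previous smoothed duration
    # of 1.0s and a new batch taking 2.0s, the estimate becomes
    # 0.8 * 1.0 + 0.2 * 2.0 == 1.2s, so a single outlier batch only moves
    # the average modestly.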

    def reset_batch_stats(self):
        """Reset batch statistics to default values.

        This avoids interference with future jobs.
        """
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION


class ThreadingBackend(PoolManagerMixin, ParallelBackendBase):
    """A ParallelBackend which will use a thread pool to execute batches in.

    This is a low-overhead backend but it suffers from the Python Global
    Interpreter Lock if the called function relies a lot on Python objects.
    It is mostly useful when the execution bottleneck is a compiled
    extension that explicitly releases the GIL (for instance a Cython loop
    wrapped in a "with nogil" block or an expensive call to a library such
    as NumPy).

    The thread pool is lazily initialized: its construction is delayed
    until the first call to apply_async.

    ThreadingBackend is used as the default backend for nested calls.
    """

    supports_retrieve_callback = True
    uses_threads = True
    supports_sharedmem = True

    def configure(self, n_jobs=1, parallel=None, **backend_args):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            # Avoid unnecessary overhead and use the sequential backend
            # instead.
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))
        self.parallel = parallel
        self._n_jobs = n_jobs
        return n_jobs

    def _get_pool(self):
        """Lazily initialize the thread pool.

        The actual pool of worker threads is only created at the first
        call to apply_async.
        """
        if self._pool is None:
            self._pool = ThreadPool(self._n_jobs)
        return self._pool
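

# Usage sketch (public joblib API, assuming compiled_fn releases the GIL,
# e.g. NumPy or Cython code in a "with nogil" block):
#
#   from joblib import Parallel, delayed
#   results = Parallel(n_jobs=4, backend="threading")(
#       delayed(compiled_fn)(chunk) for chunk in chunks)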


class MultiprocessingBackend(PoolManagerMixin, AutoBatchingMixin,
                             ParallelBackendBase):
    """A ParallelBackend which will use a multiprocessing.Pool.

    Will introduce some communication and memory overhead when exchanging
    input and output data with the worker Python processes. However, it
    does not suffer from the Python Global Interpreter Lock.
    """

    supports_retrieve_callback = True
    supports_return_generator = False

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel.

        This also checks if we are attempting to create a nested parallel
        loop.
        """
        if mp is None:
            return 1

        if mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        'Multiprocessing-backed parallel loops '
                        'cannot be nested, setting n_jobs=1'
                    )
                warnings.warn(msg, stacklevel=3)
            return 1

        if process_executor._CURRENT_DEPTH > 0:
            # Mixing loky and multiprocessing in nested loop is not
            # supported
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested'
                    ' below loky, setting n_jobs=1',
                    stacklevel=3)
            return 1

        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested'
                    ' below threads, setting n_jobs=1',
                    stacklevel=3)
            return 1

        return super(MultiprocessingBackend, self).effective_n_jobs(n_jobs)

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  **memmappingpool_args):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))

        # Make sure to free as much memory as possible before forking
        gc.collect()
        self._pool = MemmappingPool(n_jobs, **memmappingpool_args)
        self.parallel = parallel
        return n_jobs

    def terminate(self):
        """Shutdown the process or thread pool"""
        super(MultiprocessingBackend, self).terminate()
        self.reset_batch_stats()
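

# Usage sketch (public joblib API): worthwhile when pure_python_fn holds
# the GIL and each call is expensive enough to amortize the pickling and
# process communication overhead:
#
#   from joblib import Parallel, delayed
#   results = Parallel(n_jobs=2, backend="multiprocessing")(
#       delayed(pure_python_fn)(x) for x in data)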


class LokyBackend(AutoBatchingMixin, ParallelBackendBase):
    """Managing pool of workers with loky instead of multiprocessing."""

    supports_retrieve_callback = True
    supports_inner_max_num_threads = True

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  idle_worker_timeout=300, **memmappingexecutor_args):
        """Build a process executor and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))

        self._workers = get_memmapping_executor(
            n_jobs, timeout=idle_worker_timeout,
            env=self._prepare_worker_env(n_jobs=n_jobs),
            context_id=parallel._id, **memmappingexecutor_args)
        self.parallel = parallel
        return n_jobs
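
    # Usage sketch (public joblib API): inner_max_num_threads is forwarded
    # from the parallel_backend context manager, pinning each worker's
    # BLAS/OpenMP threadpools through _prepare_worker_env:
    #
    #   from joblib import Parallel, delayed, parallel_backend
    #   with parallel_backend("loky", inner_max_num_threads=1):
    #       Parallel(n_jobs=4)(delayed(np_heavy_fn)(x) for x in data)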

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        'Loky-backed parallel loops cannot be called from '
                        'a multiprocessing daemonic process, setting '
                        'n_jobs=1'
                    )
                warnings.warn(msg, stacklevel=3)

            return 1
        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    'Loky-backed parallel loops cannot be nested below '
                    'threads, setting n_jobs=1',
                    stacklevel=3)
            return 1
        elif n_jobs < 0:
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs

    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        future = self._workers.submit(func)
        if callback is not None:
            future.add_done_callback(callback)
        return future

    def retrieve_result_callback(self, out):
        try:
            return out.result()
        except ShutdownExecutorError:
            raise RuntimeError(
                "The executor underlying Parallel has been shutdown. "
                "This is likely due to the garbage collection of a previous "
                "generator from a call to Parallel with return_as='generator'."
                " Make sure the generator is not garbage collected when "
                "submitting a new job or that it is first properly exhausted."
            )

    def terminate(self):
        if self._workers is not None:
            # Don't terminate the workers as we want to reuse them in later
            # calls, but clean up the temporary resources that the Parallel
            # call created. This 'hack' requires a private, low-level
            # operation.
            self._workers._temp_folder_manager._clean_temporary_resources(
                context_id=self.parallel._id, force=False
            )
            self._workers = None

        self.reset_batch_stats()

    def abort_everything(self, ensure_ready=True):
        """Shutdown the workers and restart new ones with the same
        parameters.
        """
        self._workers.terminate(kill_workers=True)
        self._workers = None

        if ensure_ready:
            self.configure(n_jobs=self.parallel.n_jobs,
                           parallel=self.parallel)


class FallbackToBackend(Exception):
    """Raised when configuration should fall back to another backend"""

    def __init__(self, backend):
        self.backend = backend
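

# Control-flow note: Parallel catches FallbackToBackend raised from a
# backend's configure() and transparently switches to the backend instance
# it carries; the n_jobs == 1 fallbacks above all hand over a
# SequentialBackend.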


def inside_dask_worker():
    """Check whether the current function is executed inside a Dask worker.
    """
    # This function cannot live in joblib._dask because that would create a
    # circular import:
    # _dask imports _parallel_backends, which would import _dask ...
    try:
        from distributed import get_worker
    except ImportError:
        return False

    try:
        get_worker()
        return True
    except ValueError:
        return False