Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/parallel.py: 17%
616 statements
coverage.py v7.3.2, created at 2023-12-12 06:31 +0000
1"""
2Helpers for embarrassingly parallel code.
3"""
4# Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org >
5# Copyright: 2010, Gael Varoquaux
6# License: BSD 3 clause
8from __future__ import division
10import os
11import sys
12from math import sqrt
13import functools
14import collections
15import time
16import threading
17import itertools
18from uuid import uuid4
19from numbers import Integral
20import warnings
21import queue
22import weakref
23from contextlib import nullcontext
25from multiprocessing import TimeoutError
27from ._multiprocessing_helpers import mp
29from .logger import Logger, short_format_time
30from .disk import memstr_to_bytes
31from ._parallel_backends import (FallbackToBackend, MultiprocessingBackend,
32 ThreadingBackend, SequentialBackend,
33 LokyBackend)
34from ._utils import eval_expr, _Sentinel
36# Make sure that those two classes are part of the public joblib.parallel API
37# so that 3rd party backend implementers can import them from here.
38from ._parallel_backends import AutoBatchingMixin # noqa
39from ._parallel_backends import ParallelBackendBase # noqa
42IS_PYPY = hasattr(sys, "pypy_version_info")
45BACKENDS = {
46 'threading': ThreadingBackend,
47 'sequential': SequentialBackend,
48}
49# name of the backend used by default by Parallel outside of any context
50# managed by ``parallel_config`` or ``parallel_backend``.
52 # threading is the only backend that is always available everywhere
53DEFAULT_BACKEND = 'threading'
55MAYBE_AVAILABLE_BACKENDS = {'multiprocessing', 'loky'}
57 # if multiprocessing is available, so is loky; we set loky as the default
58# backend
59if mp is not None:
60 BACKENDS['multiprocessing'] = MultiprocessingBackend
61 from .externals import loky
62 BACKENDS['loky'] = LokyBackend
63 DEFAULT_BACKEND = 'loky'
66DEFAULT_THREAD_BACKEND = 'threading'
69# Thread local value that can be overridden by the ``parallel_config`` context
70# manager
71_backend = threading.local()
74def _register_dask():
75 """Register Dask Backend if called with parallel_config(backend="dask")"""
76 try:
77 from ._dask import DaskDistributedBackend
78 register_parallel_backend('dask', DaskDistributedBackend)
79 except ImportError as e:
80 msg = ("To use the dask.distributed backend you must install both "
81 "the `dask` and distributed modules.\n\n"
82 "See https://dask.pydata.org/en/latest/install.html for more "
83 "information.")
84 raise ImportError(msg) from e
87EXTERNAL_BACKENDS = {
88 'dask': _register_dask,
89}
92# Sentinels for the default values of the Parallel constructor and
93# the parallel_config and parallel_backend context managers
94default_parallel_config = {
95 "backend": _Sentinel(default_value=None),
96 "n_jobs": _Sentinel(default_value=None),
97 "verbose": _Sentinel(default_value=0),
98 "temp_folder": _Sentinel(default_value=None),
99 "max_nbytes": _Sentinel(default_value="1M"),
100 "mmap_mode": _Sentinel(default_value="r"),
101 "prefer": _Sentinel(default_value=None),
102 "require": _Sentinel(default_value=None),
103}
106VALID_BACKEND_HINTS = ('processes', 'threads', None)
107VALID_BACKEND_CONSTRAINTS = ('sharedmem', None)
110def _get_config_param(param, context_config, key):
111 """Return the value of a parallel config parameter
113 Explicitly setting it in Parallel has priority over setting in a
114 parallel_(config/backend) context manager.
115 """
116 if param is not default_parallel_config[key]:
117 # param is explicitly set, return it
118 return param
120 if context_config[key] is not default_parallel_config[key]:
121 # there's a context manager and the key is set, return it
122 return context_config[key]
124 # Otherwise, we are in the default_parallel_config,
125 # return the default value
126 return param.default_value
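# Illustrative precedence, matching the rules above (a sketch, not part of
# the module's execution):
#
#   with parallel_config(verbose=5):
#       Parallel(verbose=10)  # explicit argument wins     -> verbose=10
#       Parallel()            # picked up from the context -> verbose=5
#   Parallel()                # no context either          -> default, 0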
129def get_active_backend(
130 prefer=default_parallel_config["prefer"],
131 require=default_parallel_config["require"],
132 verbose=default_parallel_config["verbose"],
133):
134 """Return the active default backend"""
135 backend, config = _get_active_backend(prefer, require, verbose)
136 n_jobs = _get_config_param(
137 default_parallel_config['n_jobs'], config, "n_jobs"
138 )
139 return backend, n_jobs
142def _get_active_backend(
143 prefer=default_parallel_config["prefer"],
144 require=default_parallel_config["require"],
145 verbose=default_parallel_config["verbose"],
146):
147 """Return the active default backend"""
149 backend_config = getattr(_backend, "config", default_parallel_config)
151 backend = _get_config_param(
152 default_parallel_config['backend'], backend_config, "backend"
153 )
154 prefer = _get_config_param(prefer, backend_config, "prefer")
155 require = _get_config_param(require, backend_config, "require")
156 verbose = _get_config_param(verbose, backend_config, "verbose")
158 if prefer not in VALID_BACKEND_HINTS:
159 raise ValueError(
160 f"prefer={prefer} is not a valid backend hint, "
161 f"expected one of {VALID_BACKEND_HINTS}"
162 )
163 if require not in VALID_BACKEND_CONSTRAINTS:
164 raise ValueError(
165 f"require={require} is not a valid backend constraint, "
166 f"expected one of {VALID_BACKEND_CONSTRAINTS}"
167 )
168 if prefer == 'processes' and require == 'sharedmem':
169 raise ValueError(
170 "prefer == 'processes' and require == 'sharedmem'"
171 " are inconsistent settings"
172 )
174 explicit_backend = True
175 if backend is None:
177 # We are either outside of the scope of any parallel_(config/backend)
178 # context manager or the context manager did not set a backend.
179 # create the default backend instance now.
180 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
181 explicit_backend = False
183 # Try to use the backend set by the user with the context manager.
185 nesting_level = backend.nesting_level
186 uses_threads = getattr(backend, 'uses_threads', False)
187 supports_sharedmem = getattr(backend, 'supports_sharedmem', False)
188 # Force the use of a thread-based backend if the provided backend does not
189 # match the shared memory constraint or if the backend is not explicitly
190 # given and threads are preferred.
191 force_threads = (require == 'sharedmem' and not supports_sharedmem)
192 force_threads |= (
193 not explicit_backend and prefer == 'threads' and not uses_threads
194 )
195 if force_threads:
196 # This backend does not match the shared memory constraint:
197 # fall back to the default thread-based backend.
198 sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND](
199 nesting_level=nesting_level
200 )
201 # Warn the user if we forced the backend to thread-based, while the
202 # user explicitly specified a non-thread-based backend.
203 if verbose >= 10 and explicit_backend:
204 print(
205 f"Using {sharedmem_backend.__class__.__name__} as "
206 f"joblib backend instead of {backend.__class__.__name__} "
207 "as the latter does not provide shared memory semantics."
208 )
209 # Force n_jobs=1 by default
210 thread_config = backend_config.copy()
211 thread_config['n_jobs'] = 1
212 return sharedmem_backend, thread_config
214 return backend, backend_config
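# Illustrative consequence of the fallback above (a sketch, assuming loky is
# the default backend on this system):
#
#   with parallel_config(backend='loky'):
#       Parallel(require='sharedmem')  # -> ThreadingBackend, n_jobs forced to 1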
217class parallel_config:
218 """Set the default backend or configuration for :class:`~joblib.Parallel`.
220 This is an alternative to directly passing keyword arguments to the
221 :class:`~joblib.Parallel` class constructor. It is particularly useful when
222 calling into library code that uses joblib internally but does not expose
223 the various parallel configuration arguments in its own API.
225 Parameters
226 ----------
227 backend : str or ParallelBackendBase instance, default=None
228 If ``backend`` is a string it must match a previously registered
229 implementation using the :func:`~register_parallel_backend` function.
231 By default the following backends are available:
233 - 'loky': single-host, process-based parallelism (used by default),
234 - 'threading': single-host, thread-based parallelism,
235 - 'multiprocessing': legacy single-host, process-based parallelism.
237 'loky' is recommended to run functions that manipulate Python objects.
238 'threading' is a low-overhead alternative that is most efficient for
239 functions that release the Global Interpreter Lock: e.g. I/O-bound
240 code or CPU-bound code in a few calls to native code that explicitly
241 releases the GIL. Note that on some rare systems (such as pyodide),
242 multiprocessing and loky may not be available, in which case joblib
243 defaults to threading.
245 In addition, if the ``dask`` and ``distributed`` Python packages are
246 installed, it is possible to use the 'dask' backend for better
247 scheduling of nested parallel calls without over-subscription and
248 potentially distribute parallel calls over a networked cluster of
249 several hosts.
251 It is also possible to use the distributed 'ray' backend for
252 distributing the workload to a cluster of nodes. See more details
253 in the Examples section below.
255 Alternatively the backend can be passed directly as an instance.
257 n_jobs : int, default=None
258 The maximum number of concurrently running jobs, such as the number
259 of Python worker processes when ``backend="loky"`` or the size of the
260 thread-pool when ``backend="threading"``.
261 If -1 all CPUs are used. If 1 is given, no parallel computing code
262 is used at all, which is useful for debugging. For ``n_jobs`` below -1,
263 (n_cpus + 1 + n_jobs) are used. Thus for ``n_jobs=-2``, all
264 CPUs but one are used.
265 ``None`` is a marker for 'unset' that will be interpreted as
266 ``n_jobs=1`` in most backends.
268 verbose : int, default=0
269 The verbosity level: if non zero, progress messages are
270 printed. Above 50, the output is sent to stdout.
271 The frequency of the messages increases with the verbosity level.
272 If it is more than 10, all iterations are reported.
274 temp_folder : str, default=None
275 Folder to be used by the pool for memmapping large arrays
276 for sharing memory with worker processes. If None, this will try in
277 order:
279 - a folder pointed to by the ``JOBLIB_TEMP_FOLDER`` environment
280 variable,
281 - ``/dev/shm`` if the folder exists and is writable: this is a
282 RAM disk filesystem available by default on modern Linux
283 distributions,
284 - the default system temporary folder that can be
285 overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment
286 variables, typically ``/tmp`` under Unix operating systems.
288 max_nbytes : int, str, or None, optional, default='1M'
289 Threshold on the size of arrays passed to the workers that
290 triggers automated memory mapping in temp_folder. Can be an int
291 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
292 Use None to disable memmapping of large arrays.
294 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
295 Memmapping mode for numpy arrays passed to workers. None will
296 disable memmapping, other modes defined in the numpy.memmap doc:
297 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
298 Also, see 'max_nbytes' parameter documentation for more details.
300 prefer: str in {'processes', 'threads'} or None, default=None
301 Soft hint to choose the default backend.
302 The default process-based backend is 'loky' and the default
303 thread-based backend is 'threading'. Ignored if the ``backend``
304 parameter is specified.
306 require: 'sharedmem' or None, default=None
307 Hard constraint to select the backend. If set to 'sharedmem',
308 the selected backend will be single-host and thread-based.
310 inner_max_num_threads : int, default=None
311 If not None, overwrites the limit set on the number of threads
312 usable in some third-party library threadpools like OpenBLAS,
313 MKL or OpenMP. This is only used with the ``loky`` backend.
315 backend_params : dict
316 Additional parameters to pass to the backend constructor when
317 backend is a string.
319 Notes
320 -----
321 Joblib tries to limit the oversubscription by limiting the number of
322 threads usable in some third-party library threadpools like OpenBLAS, MKL
323 or OpenMP. The default limit in each worker is set to
324 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
325 overwritten with the ``inner_max_num_threads`` argument which will be used
326 to set this limit in the child processes.
328 .. versionadded:: 1.3
330 Examples
331 --------
332 >>> from operator import neg
333 >>> with parallel_config(backend='threading'):
334 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
335 ...
336 [-1, -2, -3, -4, -5]
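A soft hint and an explicit ``n_jobs`` can be combined in the same way
(illustrative):

>>> with parallel_config(prefer='threads', n_jobs=2):
...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
...
[-1, -2, -3, -4, -5]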
338 To use the 'ray' joblib backend add the following lines:
340 >>> from ray.util.joblib import register_ray # doctest: +SKIP
341 >>> register_ray() # doctest: +SKIP
342 >>> with parallel_config(backend="ray"): # doctest: +SKIP
343 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
344 [-1, -2, -3, -4, -5]
346 """
347 def __init__(
348 self,
349 backend=default_parallel_config["backend"],
350 *,
351 n_jobs=default_parallel_config["n_jobs"],
352 verbose=default_parallel_config["verbose"],
353 temp_folder=default_parallel_config["temp_folder"],
354 max_nbytes=default_parallel_config["max_nbytes"],
355 mmap_mode=default_parallel_config["mmap_mode"],
356 prefer=default_parallel_config["prefer"],
357 require=default_parallel_config["require"],
358 inner_max_num_threads=None,
359 **backend_params
360 ):
361 # Save the parallel info and set the active parallel config
362 self.old_parallel_config = getattr(
363 _backend, "config", default_parallel_config
364 )
366 backend = self._check_backend(
367 backend, inner_max_num_threads, **backend_params
368 )
370 new_config = {
371 "n_jobs": n_jobs,
372 "verbose": verbose,
373 "temp_folder": temp_folder,
374 "max_nbytes": max_nbytes,
375 "mmap_mode": mmap_mode,
376 "prefer": prefer,
377 "require": require,
378 "backend": backend
379 }
380 self.parallel_config = self.old_parallel_config.copy()
381 self.parallel_config.update({
382 k: v for k, v in new_config.items()
383 if not isinstance(v, _Sentinel)
384 })
386 setattr(_backend, "config", self.parallel_config)
388 def _check_backend(self, backend, inner_max_num_threads, **backend_params):
389 if backend is default_parallel_config['backend']:
390 if inner_max_num_threads is not None or len(backend_params) > 0:
391 raise ValueError(
392 "inner_max_num_threads and other constructor "
393 "parameters backend_params are only supported "
394 "when backend is not None."
395 )
396 return backend
398 if isinstance(backend, str):
399 # Handle non-registered or missing backends
400 if backend not in BACKENDS:
401 if backend in EXTERNAL_BACKENDS:
402 register = EXTERNAL_BACKENDS[backend]
403 register()
404 elif backend in MAYBE_AVAILABLE_BACKENDS:
405 warnings.warn(
406 f"joblib backend '{backend}' is not available on "
407 f"your system, falling back to {DEFAULT_BACKEND}.",
408 UserWarning,
409 stacklevel=2
410 )
411 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
412 else:
413 raise ValueError(
414 f"Invalid backend: {backend}, expected one of "
415 f"{sorted(BACKENDS.keys())}"
416 )
418 backend = BACKENDS[backend](**backend_params)
420 if inner_max_num_threads is not None:
421 msg = (
422 f"{backend.__class__.__name__} does not accept setting the "
423 "inner_max_num_threads argument."
424 )
425 assert backend.supports_inner_max_num_threads, msg
426 backend.inner_max_num_threads = inner_max_num_threads
428 # If the nesting_level of the backend was not set previously, use the
429 # nesting level from the previously active backend to set it
430 if backend.nesting_level is None:
431 parent_backend = self.old_parallel_config['backend']
432 if parent_backend is default_parallel_config['backend']:
433 nesting_level = 0
434 else:
435 nesting_level = parent_backend.nesting_level
436 backend.nesting_level = nesting_level
438 return backend
440 def __enter__(self):
441 return self.parallel_config
443 def __exit__(self, type, value, traceback):
444 self.unregister()
446 def unregister(self):
447 setattr(_backend, "config", self.old_parallel_config)
450class parallel_backend(parallel_config):
451 """Change the default backend used by Parallel inside a with block.
453 .. warning::
454 It is advised to use the :class:`~joblib.parallel_config` context
455 manager instead, which allows more fine-grained control over the
456 backend configuration.
458 If ``backend`` is a string it must match a previously registered
459 implementation using the :func:`~register_parallel_backend` function.
461 By default the following backends are available:
463 - 'loky': single-host, process-based parallelism (used by default),
464 - 'threading': single-host, thread-based parallelism,
465 - 'multiprocessing': legacy single-host, process-based parallelism.
467 'loky' is recommended to run functions that manipulate Python objects.
468 'threading' is a low-overhead alternative that is most efficient for
469 functions that release the Global Interpreter Lock: e.g. I/O-bound code or
470 CPU-bound code in a few calls to native code that explicitly releases the
471 GIL. Note that on some rare systems (such as Pyodide),
472 multiprocessing and loky may not be available, in which case joblib
473 defaults to threading.
475 You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib
476 backend to distribute work across machines. This works well with
477 scikit-learn estimators with the ``n_jobs`` parameter, for example::
479 >>> import joblib # doctest: +SKIP
480 >>> from sklearn.model_selection import GridSearchCV # doctest: +SKIP
481 >>> from dask.distributed import Client, LocalCluster # doctest: +SKIP
483 >>> # create a local Dask cluster
484 >>> cluster = LocalCluster() # doctest: +SKIP
485 >>> client = Client(cluster) # doctest: +SKIP
486 >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1)
487 ... # doctest: +SKIP
488 >>> with joblib.parallel_backend("dask", scatter=[X, y]): # doctest: +SKIP
489 ... grid_search.fit(X, y)
491 It is also possible to use the distributed 'ray' backend for distributing
492 the workload to a cluster of nodes. To use the 'ray' joblib backend add
493 the following lines::
495 >>> from ray.util.joblib import register_ray # doctest: +SKIP
496 >>> register_ray() # doctest: +SKIP
497 >>> with parallel_backend("ray"): # doctest: +SKIP
498 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
499 [-1, -2, -3, -4, -5]
501 Alternatively the backend can be passed directly as an instance.
503 By default all available workers will be used (``n_jobs=-1``) unless the
504 caller passes an explicit value for the ``n_jobs`` parameter.
506 This is an alternative to passing a ``backend='backend_name'`` argument to
507 the :class:`~Parallel` class constructor. It is particularly useful when
508 calling into library code that uses joblib internally but does not expose
509 the backend argument in its own API.
511 >>> from operator import neg
512 >>> with parallel_backend('threading'):
513 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
514 ...
515 [-1, -2, -3, -4, -5]
517 Joblib also tries to limit the oversubscription by limiting the number of
518 threads usable in some third-party library threadpools like OpenBLAS, MKL
519 or OpenMP. The default limit in each worker is set to
520 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
521 overwritten with the ``inner_max_num_threads`` argument which will be used
522 to set this limit in the child processes.
524 .. versionadded:: 0.10
526 See Also
527 --------
528 joblib.parallel_config : context manager to change the backend
529 configuration.
530 """
531 def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
532 **backend_params):
534 super().__init__(
535 backend=backend,
536 n_jobs=n_jobs,
537 inner_max_num_threads=inner_max_num_threads,
538 **backend_params
539 )
541 if self.old_parallel_config is None:
542 self.old_backend_and_jobs = None
543 else:
544 self.old_backend_and_jobs = (
545 self.old_parallel_config["backend"],
546 self.old_parallel_config["n_jobs"],
547 )
548 self.new_backend_and_jobs = (
549 self.parallel_config["backend"],
550 self.parallel_config["n_jobs"],
551 )
553 def __enter__(self):
554 return self.new_backend_and_jobs
557# Under Linux or OS X the default start method of multiprocessing
558# can cause third party libraries to crash. Under Python 3.4+ it is possible
559# to set an environment variable to switch the default start method from
560# 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost
561# of causing semantic changes and some additional pool instantiation overhead.
562DEFAULT_MP_CONTEXT = None
563if hasattr(mp, 'get_context'):
564 method = os.environ.get('JOBLIB_START_METHOD', '').strip() or None
565 if method is not None:
566 DEFAULT_MP_CONTEXT = mp.get_context(method=method)
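# For example (illustrative, `my_script.py` being a placeholder), the start
# method of the multiprocessing backend can be switched from the environment:
#
#   JOBLIB_START_METHOD=forkserver python my_script.py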
569class BatchedCalls(object):
570 """Wrap a sequence of (func, args, kwargs) tuples as a single callable"""
572 def __init__(self, iterator_slice, backend_and_jobs, reducer_callback=None,
573 pickle_cache=None):
574 self.items = list(iterator_slice)
575 self._size = len(self.items)
576 self._reducer_callback = reducer_callback
577 if isinstance(backend_and_jobs, tuple):
578 self._backend, self._n_jobs = backend_and_jobs
579 else:
580 # this is for backward compatibility purposes. Before 0.12.6,
581 # nested backends were returned without n_jobs indications.
582 self._backend, self._n_jobs = backend_and_jobs, None
583 self._pickle_cache = pickle_cache if pickle_cache is not None else {}
585 def __call__(self):
586 # Set the default nested backend to self._backend but do not
587 # change the default number of processes to -1
588 with parallel_config(backend=self._backend, n_jobs=self._n_jobs):
589 return [func(*args, **kwargs)
590 for func, args, kwargs in self.items]
592 def __reduce__(self):
593 if self._reducer_callback is not None:
594 self._reducer_callback()
595 # no need to pickle the callback.
596 return (
597 BatchedCalls,
598 (self.items, (self._backend, self._n_jobs), None,
599 self._pickle_cache)
600 )
602 def __len__(self):
603 return self._size
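# Illustrative usage of this internal helper (a sketch; the (backend, n_jobs)
# pair would normally come from the dispatching Parallel instance, e.g. via
# get_active_backend()):
#
#   calls = BatchedCalls([(abs, (-1,), {}), (abs, (-2,), {})],
#                        get_active_backend())
#   len(calls)  # -> 2
#   calls()     # -> [1, 2]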
606# Possible exit status for a task
607TASK_DONE = "Done"
608TASK_ERROR = "Error"
609TASK_PENDING = "Pending"
612###############################################################################
613# CPU count that works also when multiprocessing has been disabled via
614# the JOBLIB_MULTIPROCESSING environment variable
615def cpu_count(only_physical_cores=False):
616 """Return the number of CPUs.
618 This delegates to loky.cpu_count that takes into account additional
619 constraints such as Linux CFS scheduler quotas (typically set by container
620 runtimes such as docker) and CPU affinity (for instance using the taskset
621 command on Linux).
623 If only_physical_cores is True, do not take hyperthreading / SMT logical
624 cores into account.
625 """
626 if mp is None:
627 return 1
629 return loky.cpu_count(only_physical_cores=only_physical_cores)
632###############################################################################
633# For verbosity
635def _verbosity_filter(index, verbose):
636 """ Returns False for indices increasingly apart, the distance
637 depending on the value of verbose.
639 We use a lag increasing as the square of index
640 """
641 if not verbose:
642 return True
643 elif verbose > 10:
644 return False
645 if index == 0:
646 return False
647 verbose = .5 * (11 - verbose) ** 2
648 scale = sqrt(index / verbose)
649 next_scale = sqrt((index + 1) / verbose)
650 return (int(next_scale) == int(scale))
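# Illustrative behaviour: the indices that are *not* filtered out get sparser
# as the computation progresses. For instance:
#
#   [i for i in range(200) if not _verbosity_filter(i, 5)]  # -> [0, 17, 71, 161]
#
# while verbose > 10 reports every index and verbose=0 reports none.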
653###############################################################################
654def delayed(function):
655 """Decorator used to capture the arguments of a function."""
657 def delayed_function(*args, **kwargs):
658 return function, args, kwargs
659 try:
660 delayed_function = functools.wraps(function)(delayed_function)
661 except AttributeError:
662 " functools.wraps fails on some callable objects "
663 return delayed_function
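# Illustrative behaviour of the decorator: the wrapped call is not executed,
# only captured as a (function, args, kwargs) triple, e.g.
#
#   delayed(sqrt)(4)  # -> (sqrt, (4,), {})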
666###############################################################################
667class BatchCompletionCallBack(object):
668 """Callback to keep track of completed results and schedule the next tasks.
670 This callable is executed by the parent process whenever a worker process
671 has completed a batch of tasks.
673 It is used for progress reporting, to update the estimate of the batch
674 processing duration and to schedule the next batch of tasks to be
675 processed.
677 It is assumed that this callback will always be triggered by the backend
678 right after the end of a task, in case of success as well as in case of
679 failure.
680 """
682 ##########################################################################
683 # METHODS CALLED BY THE MAIN THREAD #
684 ##########################################################################
685 def __init__(self, dispatch_timestamp, batch_size, parallel):
686 self.dispatch_timestamp = dispatch_timestamp
687 self.batch_size = batch_size
688 self.parallel = parallel
689 self.parallel_call_id = parallel._call_id
691 # Internals to keep track of the status and outcome of the task.
693 # Used to hold a reference to the future-like object returned by the
694 # backend after launching this task
695 # This will be set later when calling `register_job`, as it is only
696 # created once the task has been submitted.
697 self.job = None
699 if not parallel._backend.supports_retrieve_callback:
700 # The status is only used for asynchronous result retrieval in the
701 # callback.
702 self.status = None
703 else:
704 # The initial status for the job is TASK_PENDING.
705 # Once it is done, it will be either TASK_DONE, or TASK_ERROR.
706 self.status = TASK_PENDING
708 def register_job(self, job):
709 """Register the object returned by `apply_async`."""
710 self.job = job
712 def get_result(self, timeout):
713 """Returns the raw result of the task that was submitted.
715 If the task raised an exception rather than returning, this same
716 exception will be raised instead.
718 If the backend supports the retrieval callback, it is assumed that this
719 method is only called after the result has been registered. It is
720 ensured by checking that `self.get_status(timeout)` does not return
721 TASK_PENDING. In this case, `get_result` directly returns the
722 registered result (or raises the registered exception).
724 For other backends, there are no such assumptions, but `get_result`
725 still needs to synchronously retrieve the result before it can
726 return it or raise. It will block at most `timeout` seconds
727 waiting for retrieval to complete, after which it raises a TimeoutError.
728 """
730 backend = self.parallel._backend
732 if backend.supports_retrieve_callback:
733 # We assume that the result has already been retrieved by the
734 # callback thread, and is stored internally. It's just waiting to
735 # be returned.
736 return self._return_or_raise()
738 # For other backends, the main thread needs to run the retrieval step.
739 try:
740 if backend.supports_timeout:
741 result = self.job.get(timeout=timeout)
742 else:
743 result = self.job.get()
744 outcome = dict(result=result, status=TASK_DONE)
745 except BaseException as e:
746 outcome = dict(result=e, status=TASK_ERROR)
747 self._register_outcome(outcome)
749 return self._return_or_raise()
751 def _return_or_raise(self):
752 try:
753 if self.status == TASK_ERROR:
754 raise self._result
755 return self._result
756 finally:
757 del self._result
759 def get_status(self, timeout):
760 """Get the status of the task.
762 This function also checks if the timeout has been reached and registers
763 the TimeoutError outcome when that is the case.
764 """
765 if timeout is None or self.status != TASK_PENDING:
766 return self.status
768 # The computation is running and the status is pending.
769 # Check that we did not wait for this job more than `timeout`.
770 now = time.time()
771 if not hasattr(self, "_completion_timeout_counter"):
772 self._completion_timeout_counter = now
774 if (now - self._completion_timeout_counter) > timeout:
775 outcome = dict(result=TimeoutError(), status=TASK_ERROR)
776 self._register_outcome(outcome)
778 return self.status
780 ##########################################################################
781 # METHODS CALLED BY CALLBACK THREADS #
782 ##########################################################################
783 def __call__(self, out):
784 """Function called by the callback thread after a job is completed."""
786 # If the backend doesn't support callback retrievals, the next batch of
787 # tasks is dispatched regardless. The result will be retrieved by the
788 # main thread when calling `get_result`.
789 if not self.parallel._backend.supports_retrieve_callback:
790 self._dispatch_new()
791 return
793 # If the backend supports retrieving the result in the callback, it
794 # registers the task outcome (TASK_ERROR or TASK_DONE), and schedules
795 # the next batch if needed.
796 with self.parallel._lock:
797 # Edge case: while the task was processing, the `parallel`
798 # instance has been reset and a new call has been issued, but the
799 # worker managed to complete the task and trigger this callback
800 # call just before being aborted by the reset.
801 if self.parallel._call_id != self.parallel_call_id:
802 return
804 # When aborting, stop as fast as possible and do not retrieve the
805 # result as it won't be returned by the Parallel call.
806 if self.parallel._aborting:
807 return
809 # Retrieves the result of the task in the main process and dispatch
810 # a new batch if needed.
811 job_succeeded = self._retrieve_result(out)
813 if job_succeeded:
814 self._dispatch_new()
816 def _dispatch_new(self):
817 """Schedule the next batch of tasks to be processed."""
819 # This step ensures that auto-batching works as expected.
820 this_batch_duration = time.time() - self.dispatch_timestamp
821 self.parallel._backend.batch_completed(self.batch_size,
822 this_batch_duration)
824 # Schedule the next batch of tasks.
825 with self.parallel._lock:
826 self.parallel.n_completed_tasks += self.batch_size
827 self.parallel.print_progress()
828 if self.parallel._original_iterator is not None:
829 self.parallel.dispatch_next()
831 def _retrieve_result(self, out):
832 """Fetch and register the outcome of a task.
834 Return True if the task succeeded, False otherwise.
835 This function is only called by backends that support retrieving
836 the task result in the callback thread.
837 """
838 try:
839 result = self.parallel._backend.retrieve_result_callback(out)
840 outcome = dict(status=TASK_DONE, result=result)
841 except BaseException as e:
842 # Avoid keeping references to parallel in the error.
843 e.__traceback__ = None
844 outcome = dict(result=e, status=TASK_ERROR)
846 self._register_outcome(outcome)
847 return outcome['status'] != TASK_ERROR
849 ##########################################################################
850 # This method can be called either in the main thread #
851 # or in the callback thread. #
852 ##########################################################################
853 def _register_outcome(self, outcome):
854 """Register the outcome of a task.
856 This method can be called only once; future calls will be ignored.
857 """
858 # Covers the edge case where the main thread tries to register a
859 # `TimeoutError` while the callback thread tries to register a result
860 # at the same time.
861 with self.parallel._lock:
862 if self.status not in (TASK_PENDING, None):
863 return
864 self.status = outcome["status"]
866 self._result = outcome["result"]
868 # Once the result and the status are extracted, the last reference to
869 # the job can be deleted.
870 self.job = None
872 # As soon as an error has been spotted, early stopping flags are sent to
873 # the `parallel` instance.
874 if self.status == TASK_ERROR:
875 self.parallel._exception = True
876 self.parallel._aborting = True
879###############################################################################
880def register_parallel_backend(name, factory, make_default=False):
881 """Register a new Parallel backend factory.
883 The new backend can then be selected by passing its name as the backend
884 argument to the :class:`~Parallel` class. Moreover, the default backend can
885 be overwritten globally by setting make_default=True.
887 The factory can be any callable that takes no argument and returns an
888 instance of ``ParallelBackendBase``.
890 Warning: this function is experimental and subject to change in a future
891 version of joblib.
893 .. versionadded:: 0.10
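For instance, an existing backend class can be registered under a new
name (an illustrative sketch; ``ThreadingBackend`` is already imported in
this module, third-party code would import it from
``joblib._parallel_backends``)::

    register_parallel_backend('my_threads', ThreadingBackend)
    with parallel_config(backend='my_threads', n_jobs=2):
        Parallel()(delayed(sqrt)(i) for i in range(3))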
894 """
895 BACKENDS[name] = factory
896 if make_default:
897 global DEFAULT_BACKEND
898 DEFAULT_BACKEND = name
901def effective_n_jobs(n_jobs=-1):
902 """Determine the number of jobs that can actually run in parallel
904 n_jobs is the number of workers requested by the caller. Passing n_jobs=-1
905 means requesting all available workers, for instance matching the number of
906 CPU cores on the worker host(s).
908 This method should return a guesstimate of the number of workers that can
909 actually perform work concurrently with the currently enabled default
910 backend. The primary use case is to make it possible for the caller to know
911 in how many chunks to slice the work.
913 In general working on larger data chunks is more efficient (less scheduling
914 overhead and better use of CPU cache prefetching heuristics) as long as all
915 the workers have enough work to do.
917 Warning: this function is experimental and subject to change in a future
918 version of joblib.
920 .. versionadded:: 0.10
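Examples
--------
A minimal illustration (the value returned for ``n_jobs=-1`` depends on the
host and the active backend, the one shown here is only an example):

>>> from joblib import effective_n_jobs
>>> effective_n_jobs(1)
1
>>> effective_n_jobs(-1)  # doctest: +SKIP
8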
921 """
922 if n_jobs == 1:
923 return 1
925 backend, backend_n_jobs = get_active_backend()
926 if n_jobs is None:
927 n_jobs = backend_n_jobs
928 return backend.effective_n_jobs(n_jobs=n_jobs)
931###############################################################################
932class Parallel(Logger):
933 ''' Helper class for readable parallel mapping.
935 Read more in the :ref:`User Guide <parallel>`.
937 Parameters
938 ----------
939 n_jobs: int, default: None
940 The maximum number of concurrently running jobs, such as the number
941 of Python worker processes when backend="multiprocessing"
942 or the size of the thread-pool when backend="threading".
943 If -1 all CPUs are used.
944 If 1 is given, no parallel computing code is used at all, and the
945 behavior amounts to a simple python `for` loop. This mode is not
946 compatible with `timeout`.
947 For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus for
948 n_jobs = -2, all CPUs but one are used.
949 None is a marker for 'unset' that will be interpreted as n_jobs=1
950 unless the call is performed under a :func:`~parallel_config`
951 context manager that sets another value for ``n_jobs``.
952 backend: str, ParallelBackendBase instance or None, default: 'loky'
953 Specify the parallelization backend implementation.
954 Supported backends are:
956 - "loky" used by default, can induce some
957 communication and memory overhead when exchanging input and
958 output data with the worker Python processes. On some rare
959 systems (such as Pyodide), the loky backend may not be
960 available.
961 - "multiprocessing" previous process-based backend based on
962 `multiprocessing.Pool`. Less robust than `loky`.
963 - "threading" is a very low-overhead backend but it suffers
964 from the Python Global Interpreter Lock if the called function
965 relies a lot on Python objects. "threading" is mostly useful
966 when the execution bottleneck is a compiled extension that
967 explicitly releases the GIL (for instance a Cython loop wrapped
968 in a "with nogil" block or an expensive call to a library such
969 as NumPy).
970 - finally, you can register backends by calling
971 :func:`~register_parallel_backend`. This will allow you to
972 implement a backend of your liking.
974 It is not recommended to hard-code the backend name in a call to
975 :class:`~Parallel` in a library. Instead it is recommended to set
976 soft hints (prefer) or hard constraints (require) so as to make it
977 possible for library users to change the backend from the outside
978 using the :func:`~parallel_config` context manager.
979 return_as: str in {'list', 'generator'}, default: 'list'
980 If 'list', calls to this instance will return a list, only when
981 all results have been processed and retrieved.
982 If 'generator', it will return a generator that yields the results
983 as soon as they are available, in the order in which the tasks were
984 submitted.
985 Future releases are planned to also support 'generator_unordered',
986 in which case the generator immediately yields available results
987 independently of the submission order.
988 prefer: str in {'processes', 'threads'} or None, default: None
989 Soft hint to choose the default backend if no specific backend
990 was selected with the :func:`~parallel_config` context manager.
991 The default process-based backend is 'loky' and the default
992 thread-based backend is 'threading'. Ignored if the ``backend``
993 parameter is specified.
994 require: 'sharedmem' or None, default None
995 Hard constraint to select the backend. If set to 'sharedmem',
996 the selected backend will be single-host and thread-based even
997 if the user asked for a non-thread based backend with
998 :func:`~joblib.parallel_config`.
999 verbose: int, optional
1000 The verbosity level: if non zero, progress messages are
1001 printed. Above 50, the output is sent to stdout.
1002 The frequency of the messages increases with the verbosity level.
1003 If it is more than 10, all iterations are reported.
1004 timeout: float, optional
1005 Timeout limit for each task to complete. If any task takes longer,
1006 a TimeoutError will be raised. Only applied when n_jobs != 1.
1007 pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}
1008 The number of batches (of tasks) to be pre-dispatched.
1009 Default is '2*n_jobs'. When batch_size="auto" this is a reasonable
1010 default and the workers should never starve. Note that only basic
1011 arithmetic is allowed here and no modules can be used in this
1012 expression.
1013 batch_size: int or 'auto', default: 'auto'
1014 The number of atomic tasks to dispatch at once to each
1015 worker. When individual evaluations are very fast, dispatching
1016 calls to workers can be slower than sequential computation because
1017 of the overhead. Batching fast computations together can mitigate
1018 this.
1019 The ``'auto'`` strategy keeps track of the time it takes for a
1020 batch to complete, and dynamically adjusts the batch size to keep
1021 the time on the order of half a second, using a heuristic. The
1022 initial batch size is 1.
1023 ``batch_size="auto"`` with ``backend="threading"`` will dispatch
1024 batches of a single task at a time as the threading backend has
1025 very little overhead and using larger batch size has not proved to
1026 bring any gain in that case.
1027 temp_folder: str, optional
1028 Folder to be used by the pool for memmapping large arrays
1029 for sharing memory with worker processes. If None, this will try in
1030 order:
1032 - a folder pointed to by the JOBLIB_TEMP_FOLDER environment
1033 variable,
1034 - /dev/shm if the folder exists and is writable: this is a
1035 RAM disk filesystem available by default on modern Linux
1036 distributions,
1037 - the default system temporary folder that can be
1038 overridden with TMP, TMPDIR or TEMP environment
1039 variables, typically /tmp under Unix operating systems.
1041 Only active when backend="loky" or "multiprocessing".
1042 max_nbytes : int, str, or None, optional, default='1M'
1043 Threshold on the size of arrays passed to the workers that
1044 triggers automated memory mapping in temp_folder. Can be an int
1045 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
1046 Use None to disable memmapping of large arrays.
1047 Only active when backend="loky" or "multiprocessing".
1048 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default: 'r'
1049 Memmapping mode for numpy arrays passed to workers. None will
1050 disable memmapping, other modes defined in the numpy.memmap doc:
1051 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
1052 Also, see 'max_nbytes' parameter documentation for more details.
1054 Notes
1055 -----
1057 This object uses workers to compute in parallel the application of a
1058 function to many different arguments. The main functionality it brings
1059 in addition to using the raw multiprocessing or concurrent.futures API
1060 are (see examples for details):
1062 * More readable code, in particular since it avoids
1063 constructing lists of arguments.
1065 * Easier debugging:
1066 - informative tracebacks even when the error happens on
1067 the client side
1068 - using 'n_jobs=1' makes it possible to turn off parallel computing
1069 for debugging without changing the codepath
1070 - early capture of pickling errors
1072 * An optional progress meter.
1074 * Interruption of multiprocess jobs with 'Ctrl-C'
1076 * Flexible pickling control for the communication to and from
1077 the worker processes.
1079 * Ability to use shared memory efficiently with worker
1080 processes for large numpy-based datastructures.
1082 Note that the intended usage is to run one call at a time. Multiple
1083 calls to the same Parallel object will result in a ``RuntimeError``.
1085 Examples
1086 --------
1088 A simple example:
1090 >>> from math import sqrt
1091 >>> from joblib import Parallel, delayed
1092 >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
1093 [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
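Results can also be consumed lazily, as they become available, by passing
``return_as="generator"`` (illustrative):

>>> out = Parallel(n_jobs=2, return_as="generator")(
...     delayed(sqrt)(i**2) for i in range(5))  # doctest: +SKIP
>>> list(out)  # doctest: +SKIP
[0.0, 1.0, 2.0, 3.0, 4.0]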
1095 Reshaping the output when the function has several return
1096 values:
1098 >>> from math import modf
1099 >>> from joblib import Parallel, delayed
1100 >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
1101 >>> res, i = zip(*r)
1102 >>> res
1103 (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
1104 >>> i
1105 (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)
1107 The progress meter: the higher the value of `verbose`, the more
1108 messages:
1110 >>> from time import sleep
1111 >>> from joblib import Parallel, delayed
1112 >>> r = Parallel(n_jobs=2, verbose=10)(
1113 ... delayed(sleep)(.2) for _ in range(10)) #doctest: +SKIP
1114 [Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.6s
1115 [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.8s
1116 [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 1.4s finished
1118 Traceback example, note how the line of the error is indicated
1119 as well as the values of the parameter passed to the function that
1120 triggered the exception, even though the traceback happens in the
1121 child process:
1123 >>> from heapq import nlargest
1124 >>> from joblib import Parallel, delayed
1125 >>> Parallel(n_jobs=2)(
1126 ... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3))
1127 ... # doctest: +SKIP
1128 -----------------------------------------------------------------------
1129 Sub-process traceback:
1130 -----------------------------------------------------------------------
1131 TypeError Mon Nov 12 11:37:46 2012
1132 PID: 12934 Python 2.7.3: /usr/bin/python
1133 ........................................................................
1134 /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
1135 419 if n >= size:
1136 420 return sorted(iterable, key=key, reverse=True)[:n]
1137 421
1138 422 # When key is none, use simpler decoration
1139 423 if key is None:
1140 --> 424 it = izip(iterable, count(0,-1)) # decorate
1141 425 result = _nlargest(n, it)
1142 426 return map(itemgetter(0), result) # undecorate
1143 427
1144 428 # General case, slowest method
1145 TypeError: izip argument #1 must support iteration
1146 _______________________________________________________________________
1149 Using pre_dispatch in a producer/consumer situation, where the
1150 data is generated on the fly. Note how the producer is first
1151 called 3 times before the parallel loop is initiated, and then
1152 called to generate new data on the fly:
1154 >>> from math import sqrt
1155 >>> from joblib import Parallel, delayed
1156 >>> def producer():
1157 ... for i in range(6):
1158 ... print('Produced %s' % i)
1159 ... yield i
1160 >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
1161 ... delayed(sqrt)(i) for i in producer()) #doctest: +SKIP
1162 Produced 0
1163 Produced 1
1164 Produced 2
1165 [Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s
1166 Produced 3
1167 [Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s
1168 Produced 4
1169 [Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s
1170 Produced 5
1171 [Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s
1172 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s remaining: 0.0s
1173 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished
1175 '''
1176 def __init__(
1177 self,
1178 n_jobs=default_parallel_config["n_jobs"],
1179 backend=default_parallel_config['backend'],
1180 return_as="list",
1181 verbose=default_parallel_config["verbose"],
1182 timeout=None,
1183 pre_dispatch='2 * n_jobs',
1184 batch_size='auto',
1185 temp_folder=default_parallel_config["temp_folder"],
1186 max_nbytes=default_parallel_config["max_nbytes"],
1187 mmap_mode=default_parallel_config["mmap_mode"],
1188 prefer=default_parallel_config["prefer"],
1189 require=default_parallel_config["require"],
1190 ):
1191 # Initiate parent Logger class state
1192 super().__init__()
1194 # Interpret n_jobs=None as 'unset'
1195 if n_jobs is None:
1196 n_jobs = default_parallel_config["n_jobs"]
1198 active_backend, context_config = _get_active_backend(
1199 prefer=prefer, require=require, verbose=verbose
1200 )
1202 nesting_level = active_backend.nesting_level
1204 self.verbose = _get_config_param(verbose, context_config, "verbose")
1205 self.timeout = timeout
1206 self.pre_dispatch = pre_dispatch
1208 if return_as not in {"list", "generator"}:
1209 raise ValueError(
1210 'Expected `return_as` parameter to be a string equal to "list"'
1211 f' or "generator", but got {return_as} instead'
1212 )
1213 self.return_as = return_as
1214 self.return_generator = return_as != "list"
1216 # Check if we are under a parallel_config or parallel_backend
1217 # context manager and use the config from the context manager
1218 # for arguments that are not explicitly set.
1219 self._backend_args = {
1220 k: _get_config_param(param, context_config, k) for param, k in [
1221 (max_nbytes, "max_nbytes"),
1222 (temp_folder, "temp_folder"),
1223 (mmap_mode, "mmap_mode"),
1224 (prefer, "prefer"),
1225 (require, "require"),
1226 (verbose, "verbose"),
1227 ]
1228 }
1230 if isinstance(self._backend_args["max_nbytes"], str):
1231 self._backend_args["max_nbytes"] = memstr_to_bytes(
1232 self._backend_args["max_nbytes"]
1233 )
1234 self._backend_args["verbose"] = max(
1235 0, self._backend_args["verbose"] - 50
1236 )
1238 if DEFAULT_MP_CONTEXT is not None:
1239 self._backend_args['context'] = DEFAULT_MP_CONTEXT
1240 elif hasattr(mp, "get_context"):
1241 self._backend_args['context'] = mp.get_context()
1243 if backend is default_parallel_config['backend'] or backend is None:
1244 backend = active_backend
1246 elif isinstance(backend, ParallelBackendBase):
1247 # Use provided backend as is, with the current nesting_level if it
1248 # is not set yet.
1249 if backend.nesting_level is None:
1250 backend.nesting_level = nesting_level
1252 elif hasattr(backend, 'Pool') and hasattr(backend, 'Lock'):
1253 # Make it possible to pass a custom multiprocessing context as
1254 # backend to change the start method to forkserver or spawn or
1255 # preload modules on the forkserver helper process.
1256 self._backend_args['context'] = backend
1257 backend = MultiprocessingBackend(nesting_level=nesting_level)
1259 elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
1260 warnings.warn(
1261 f"joblib backend '{backend}' is not available on "
1262 f"your system, falling back to {DEFAULT_BACKEND}.",
1263 UserWarning,
1264 stacklevel=2)
1265 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
1266 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)
1268 else:
1269 try:
1270 backend_factory = BACKENDS[backend]
1271 except KeyError as e:
1272 raise ValueError("Invalid backend: %s, expected one of %r"
1273 % (backend, sorted(BACKENDS.keys()))) from e
1274 backend = backend_factory(nesting_level=nesting_level)
1276 n_jobs = _get_config_param(n_jobs, context_config, "n_jobs")
1277 if n_jobs is None:
1278 # No specific context override and no specific value request:
1279 # default to the default of the backend.
1280 n_jobs = backend.default_n_jobs
1281 self.n_jobs = n_jobs
1283 if (require == 'sharedmem' and
1284 not getattr(backend, 'supports_sharedmem', False)):
1285 raise ValueError("Backend %s does not support shared memory"
1286 % backend)
1288 if (batch_size == 'auto' or isinstance(batch_size, Integral) and
1289 batch_size > 0):
1290 self.batch_size = batch_size
1291 else:
1292 raise ValueError(
1293 "batch_size must be 'auto' or a positive integer, got: %r"
1294 % batch_size)
1296 if not isinstance(backend, SequentialBackend):
1297 if self.return_generator and not backend.supports_return_generator:
1298 raise ValueError(
1299 "Backend {} does not support "
1300 "return_as={}".format(backend, return_as)
1301 )
1302 # This lock is used to coordinate the main thread of this process
1303 # with the async callback thread of the pool.
1304 self._lock = threading.RLock()
1305 self._jobs = collections.deque()
1306 self._pending_outputs = list()
1307 self._ready_batches = queue.Queue()
1308 self._reducer_callback = None
1310 # Internal variables
1311 self._backend = backend
1312 self._running = False
1313 self._managed_backend = False
1314 self._id = uuid4().hex
1315 self._call_ref = None
1317 def __enter__(self):
1318 self._managed_backend = True
1319 self._calling = False
1320 self._initialize_backend()
1321 return self
1323 def __exit__(self, exc_type, exc_value, traceback):
1324 self._managed_backend = False
1325 if self.return_generator and self._calling:
1326 self._abort()
1327 self._terminate_and_reset()
1329 def _initialize_backend(self):
1330 """Build a process or thread pool and return the number of workers"""
1331 try:
1332 n_jobs = self._backend.configure(n_jobs=self.n_jobs, parallel=self,
1333 **self._backend_args)
1334 if self.timeout is not None and not self._backend.supports_timeout:
1335 warnings.warn(
1336 'The backend class {!r} does not support timeout. '
1337 "You have set 'timeout={}' in Parallel but "
1338 "the 'timeout' parameter will not be used.".format(
1339 self._backend.__class__.__name__,
1340 self.timeout))
1342 except FallbackToBackend as e:
1343 # Recursively initialize the backend in case of requested fallback.
1344 self._backend = e.backend
1345 n_jobs = self._initialize_backend()
1347 return n_jobs
1349 def _effective_n_jobs(self):
1350 if self._backend:
1351 return self._backend.effective_n_jobs(self.n_jobs)
1352 return 1
1354 def _terminate_and_reset(self):
1355 if hasattr(self._backend, 'stop_call') and self._calling:
1356 self._backend.stop_call()
1357 self._calling = False
1358 if not self._managed_backend:
1359 self._backend.terminate()
1361 def _dispatch(self, batch):
1362 """Queue the batch for computing, with or without multiprocessing
1364 WARNING: this method is not thread-safe: it should only be called
1365 indirectly via dispatch_one_batch.
1367 """
1368 # If job.get() catches an exception, it closes the queue:
1369 if self._aborting:
1370 return
1372 batch_size = len(batch)
1374 self.n_dispatched_tasks += batch_size
1375 self.n_dispatched_batches += 1
1377 dispatch_timestamp = time.time()
1379 batch_tracker = BatchCompletionCallBack(
1380 dispatch_timestamp, batch_size, self
1381 )
1382 self._jobs.append(batch_tracker)
1384 job = self._backend.apply_async(batch, callback=batch_tracker)
1385 batch_tracker.register_job(job)
1387 def dispatch_next(self):
1388 """Dispatch more data for parallel processing
1390 This method is meant to be called concurrently by the multiprocessing
1391 callback. We rely on the thread-safety of dispatch_one_batch to protect
1392 against concurrent consumption of the unprotected iterator.
1394 """
1395 if not self.dispatch_one_batch(self._original_iterator):
1396 self._iterating = False
1397 self._original_iterator = None
1399 def dispatch_one_batch(self, iterator):
1400 """Prefetch the tasks for the next batch and dispatch them.
1402 The effective size of the batch is computed here.
1403 If there are no more jobs to dispatch, return False, else return True.
1405 The iterator consumption and dispatching are protected by the same
1406 lock, so calling this function should be thread-safe.
1408 """
1410 if self._aborting:
1411 return False
1413 batch_size = self._get_batch_size()
1415 with self._lock:
1416 # To ensure an even distribution of the workload between workers,
1417 # we look ahead in the original iterator more than batch_size
1418 # tasks - however, we keep consuming only one batch at each
1419 # dispatch_one_batch call. The extra tasks are stored in a local
1420 # queue, _ready_batches, that is looked up prior to re-consuming
1421 # tasks from the original iterator.
1422 try:
1423 tasks = self._ready_batches.get(block=False)
1424 except queue.Empty:
1425 # Slice the iterator n_jobs * batch_size items at a time. If the
1426 # slice returns less than that, then the current batch_size puts
1427 # too much weight on a subset of workers, while others may end
1428 # up starving. So in this case, re-scale the batch size
1429 # accordingly to distribute the last items evenly between all
1430 # workers.
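# Illustrative numbers (assuming n_jobs=4 and batch_size=10): islice
# grabs up to 40 items; if only 12 items remain in the original
# iterator, final_batch_size = max(1, 12 // (10 * 4)) = 1, whereas
# the same 12 items coming from the initial pre_dispatch slice give
# max(1, 12 // 4) = 3.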
1431 n_jobs = self._cached_effective_n_jobs
1432 big_batch_size = batch_size * n_jobs
1434 islice = list(itertools.islice(iterator, big_batch_size))
1435 if len(islice) == 0:
1436 return False
1437 elif (iterator is self._original_iterator and
1438 len(islice) < big_batch_size):
1439 # We reached the end of the original iterator (unless
1440 # iterator is the ``pre_dispatch``-long initial slice of
1441 # the original iterator) -- decrease the batch size to
1442 # account for potential variance in the batches running
1443 # time.
1444 final_batch_size = max(1, len(islice) // (10 * n_jobs))
1445 else:
1446 final_batch_size = max(1, len(islice) // n_jobs)
1448 # enqueue n_jobs batches in a local queue
1449 for i in range(0, len(islice), final_batch_size):
1450 tasks = BatchedCalls(islice[i:i + final_batch_size],
1451 self._backend.get_nested_backend(),
1452 self._reducer_callback,
1453 self._pickle_cache)
1454 self._ready_batches.put(tasks)
1456 # finally, get one task.
1457 tasks = self._ready_batches.get(block=False)
1458 if len(tasks) == 0:
1459 # No more tasks available in the iterator: tell caller to stop.
1460 return False
1461 else:
1462 self._dispatch(tasks)
1463 return True
1465 def _get_batch_size(self):
1466 """Returns the effective batch size for dispatch"""
1467 if self.batch_size == 'auto':
1468 return self._backend.compute_batch_size()
1469 else:
1470 # Fixed batch size strategy
1471 return self.batch_size
1473 def _print(self, msg):
1474 """Display the message on stout or stderr depending on verbosity"""
1475 # XXX: Not using the logger framework: need to
1476 # learn to use logger better.
1477 if not self.verbose:
1478 return
1479 if self.verbose < 50:
1480 writer = sys.stderr.write
1481 else:
1482 writer = sys.stdout.write
1483 writer(f"[{self}]: {msg}\n")
1485 def _is_completed(self):
1486 """Check if all tasks have been completed"""
1487 return self.n_completed_tasks == self.n_dispatched_tasks and not (
1488 self._iterating or self._aborting
1489 )
1491 def print_progress(self):
1492 """Display the process of the parallel execution only a fraction
1493 of time, controlled by self.verbose.
1494 """
1496 if not self.verbose:
1497 return
1499 elapsed_time = time.time() - self._start_time
1501 if self._is_completed():
1502 # Make sure that we get a last message telling us we are done
1503 self._print(
1504 f"Done {self.n_completed_tasks:3d} out of "
1505 f"{self.n_completed_tasks:3d} | elapsed: "
1506 f"{short_format_time(elapsed_time)} finished"
1507 )
1508 return
1510 # Original job iterator becomes None once it has been fully
1511 # consumed: at this point we know the total number of jobs and we are
1512 # able to display an estimation of the remaining time based on already
1513 # completed jobs. Otherwise, we simply display the number of completed
1514 # tasks.
1515 elif self._original_iterator is not None:
1516 if _verbosity_filter(self.n_dispatched_batches, self.verbose):
1517 return
1518 self._print(
1519 f"Done {self.n_completed_tasks:3d} tasks | elapsed: "
1520 f"{short_format_time(elapsed_time)}"
1521 )
1522 else:
1523 index = self.n_completed_tasks
1524 # We are finished dispatching
1525 total_tasks = self.n_dispatched_tasks
1526 # We always display the first loop
1527 if index != 0:
1528 # Display depending on the number of remaining items
1529 # A message as soon as we finish dispatching, cursor is 0
1530 cursor = (total_tasks - index + 1 -
1531 self._pre_dispatch_amount)
1532 frequency = (total_tasks // self.verbose) + 1
1533 is_last_item = (index + 1 == total_tasks)
1534 if (is_last_item or cursor % frequency):
1535 return
1536 remaining_time = (elapsed_time / index) * \
1537 (self.n_dispatched_tasks - index * 1.0)
1538 # only display status if the remaining time is greater than or equal to 0
1539 self._print(
1540 f"Done {index:3d} out of {total_tasks:3d} | elapsed: "
1541 f"{short_format_time(elapsed_time)} remaining: "
1542 f"{short_format_time(remaining_time)}"
1543 )
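A worked example of the remaining-time estimate printed above, with made-up numbers (30 completed tasks out of 90 dispatched after 12 seconds):

    elapsed_time = 12.0
    index = 30                   # completed tasks so far
    n_dispatched_tasks = 90      # tasks handed to the workers
    remaining_time = (elapsed_time / index) * (n_dispatched_tasks - index * 1.0)
    print(remaining_time)        # 24.0 seconds, assuming a constant task rate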
1545 def _abort(self):
1546 # Stop dispatching new jobs in the async callback thread
1547 self._aborting = True
1549 # If the backend allows it, cancel or kill remaining running
1550 # tasks without waiting for the results as we will raise
1551 # the exception we got back to the caller instead of returning
1552 # any result.
1553 backend = self._backend
1554 if (not self._aborted and hasattr(backend, 'abort_everything')):
1555 # If the backend is managed externally we need to make sure
1556 # to leave it in a working state to allow for future jobs
1557 # scheduling.
1558 ensure_ready = self._managed_backend
1559 backend.abort_everything(ensure_ready=ensure_ready)
1560 self._aborted = True
1562 def _start(self, iterator, pre_dispatch):
1563 # Only set self._iterating to True if at least a batch
1564 # was dispatched. In particular this covers the edge
1565 # case of Parallel used with an exhausted iterator. If
1566 # self._original_iterator is None, then this means either
1567 # that pre_dispatch == "all", n_jobs == 1 or that the first batch
1568 # was very quick and its callback already dispatched all the
1569 # remaining jobs.
1570 self._iterating = False
1571 if self.dispatch_one_batch(iterator):
1572 self._iterating = self._original_iterator is not None
1574 while self.dispatch_one_batch(iterator):
1575 pass
1577 if pre_dispatch == "all":
1578 # The iterable was consumed all at once by the above while loop.
1579 # No need to wait for async callbacks to trigger
1580 # consumption.
1581 self._iterating = False
1583 def _get_outputs(self, iterator, pre_dispatch):
1584 """Iterator returning the tasks' output as soon as they are ready."""
1585 dispatch_thread_id = threading.get_ident()
1586 detach_generator_exit = False
1587 try:
1588 self._start(iterator, pre_dispatch)
1589 # first yield returns None, for internal use only. This ensures
1590 # that we enter the try/except block and start dispatching the
1591 # tasks.
1592 yield
1594 with self._backend.retrieval_context():
1595 yield from self._retrieve()
1597 except GeneratorExit:
1598 # The generator has been garbage collected before being fully
1599 # consumed. This aborts the remaining tasks if possible and warns
1600 # the user if necessary.
1601 self._exception = True
1603 # In some interpreters such as PyPy, GeneratorExit can be raised in
1604 # a different thread than the one used to start the dispatch of the
1605 # parallel tasks. This can lead to a hang when a thread attempts to
1606 # join itself. As a workaround, we detach the execution of the
1607 # aborting code to a dedicated thread. We then need to make sure
1608 # the rest of the function does not call `_terminate_and_reset`
1609 # in finally.
1610 if dispatch_thread_id != threading.get_ident():
1611 if not IS_PYPY:
1612 warnings.warn(
1613 "A generator produced by joblib.Parallel has been "
1614 "gc'ed in an unexpected thread. This behavior should "
1615 "not cause major -issues but to make sure, please "
1616 "report this warning and your use case at "
1617 "https://github.com/joblib/joblib/issues so it can "
1618 "be investigated."
1619 )
1621 detach_generator_exit = True
1622 _parallel = self
1624 class _GeneratorExitThread(threading.Thread):
1625 def run(self):
1626 _parallel._abort()
1627 if _parallel.return_generator:
1628 _parallel._warn_exit_early()
1629 _parallel._terminate_and_reset()
1631 _GeneratorExitThread(
1632 name="GeneratorExitThread"
1633 ).start()
1634 return
1636 # Otherwise, we are in the thread that started the dispatch: we can
1637 # safely abort the execution and warn the user.
1638 self._abort()
1639 if self.return_generator:
1640 self._warn_exit_early()
1642 raise
1644 # Note: we catch any BaseException instead of just Exception instances
1645 # to also include KeyboardInterrupt
1646 except BaseException:
1647 self._exception = True
1648 self._abort()
1649 raise
1650 finally:
1651 # Store the unconsumed tasks and terminate the workers if necessary
1652 _remaining_outputs = ([] if self._exception else self._jobs)
1653 self._jobs = collections.deque()
1654 self._running = False
1655 if not detach_generator_exit:
1656 self._terminate_and_reset()
1658 while len(_remaining_outputs) > 0:
1659 batched_results = _remaining_outputs.popleft()
1660 batched_results = batched_results.get_result(self.timeout)
1661 for result in batched_results:
1662 yield result
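To see this generator path from the outside, the hedged example below assumes a recent joblib release where `Parallel` accepts `return_as='generator'`; dropping the generator before it is exhausted is what triggers the GeneratorExit handling above.

    from math import sqrt
    from joblib import Parallel, delayed

    gen = Parallel(n_jobs=2, return_as="generator")(
        delayed(sqrt)(i) for i in range(100)
    )
    first_ten = [next(gen) for _ in range(10)]  # results arrive lazily
    del gen  # remaining tasks are aborted and a warning may be emitted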
1664 def _wait_retrieval(self):
1665 """Return True if we need to continue retriving some tasks."""
1667 # If the input load is still being iterated over, it means that tasks
1668 # are still on the dispatch wait list and their results will need to
1669 # be retrieved later on.
1670 if self._iterating:
1671 return True
1673 # If some of the dispatched tasks are still being processed by the
1674 # workers, wait for the compute to finish before starting retrieval
1675 if self.n_completed_tasks < self.n_dispatched_tasks:
1676 return True
1678 # For backends that do not support asynchronously retrieving the
1679 # results in the main process, all results must be carefully retrieved
1680 # in the _retrieve loop in the main thread while the backend is alive.
1681 # For other backends, the actual retrieval is done asynchronously in
1682 # the callback thread, and we can terminate the backend before the
1683 # `self._jobs` result list has been emptied. The remaining results
1684 # will be collected in the `finally` step of the generator.
1685 if not self._backend.supports_retrieve_callback:
1686 if len(self._jobs) > 0:
1687 return True
1689 return False
1691 def _retrieve(self):
1692 while self._wait_retrieval():
1694 # If the callback thread of a worker has signaled that its task
1695 # triggered an exception, or if the retrieval loop has raised an
1696 # exception (e.g. `GeneratorExit`), exit the loop and surface the
1697 # worker traceback.
1698 if self._aborting:
1699 self._raise_error_fast()
1700 break
1702 # If the next job is not ready for retrieval yet, we just wait for
1703 # async callbacks to progress.
1704 if ((len(self._jobs) == 0) or
1705 (self._jobs[0].get_status(
1706 timeout=self.timeout) == TASK_PENDING)):
1707 time.sleep(0.01)
1708 continue
1710 # We need to be careful: the job list can be filling up as
1711 # we empty it, and Python lists are not thread-safe by
1712 # default, hence the use of the lock.
1713 with self._lock:
1714 batched_results = self._jobs.popleft()
1716 # Flatten the batched results to output one output at a time
1717 batched_results = batched_results.get_result(self.timeout)
1718 for result in batched_results:
1719 self._nb_consumed += 1
1720 yield result
1722 def _raise_error_fast(self):
1723 """If we are aborting, raise if a job caused an error."""
1725 # Find the first job whose status is TASK_ERROR if it exists.
1726 with self._lock:
1727 error_job = next((job for job in self._jobs
1728 if job.status == TASK_ERROR), None)
1730 # If this error job exists, immediately raise the error by
1731 # calling get_result. This job might not exist if abort has been
1732 # called directly or if the generator is gc'ed.
1733 if error_job is not None:
1734 error_job.get_result(self.timeout)
1736 def _warn_exit_early(self):
1737 """Warn the user if the generator is gc'ed before being consumned."""
1738 ready_outputs = self.n_completed_tasks - self._nb_consumed
1739 is_completed = self._is_completed()
1740 msg = ""
1741 if ready_outputs:
1742 msg += (
1743 f"{ready_outputs} tasks have been successfully executed "
1744 " but not used."
1745 )
1746 if not is_completed:
1747 msg += " Additionally, "
1749 if not is_completed:
1750 msg += (
1751 f"{self.n_dispatched_tasks - self.n_completed_tasks} tasks "
1752 "which were still being processed by the workers have been "
1753 "cancelled."
1754 )
1756 if msg:
1757 msg += (
1758 " You could benefit from adjusting the input task "
1759 "iterator to limit unnecessary computation time."
1760 )
1762 warnings.warn(msg)
1764 def _get_sequential_output(self, iterable):
1765 """Separate loop for sequential output.
1767 This simplifies the traceback in case of errors and reduces the
1768 overhead of calling sequential tasks with `joblib`.
1769 """
1770 try:
1771 self._iterating = True
1772 self._original_iterator = iterable
1773 batch_size = self._get_batch_size()
1775 if batch_size != 1:
1776 it = iter(iterable)
1777 iterable_batched = iter(
1778 lambda: tuple(itertools.islice(it, batch_size)), ()
1779 )
1780 iterable = (
1781 task for batch in iterable_batched for task in batch
1782 )
1784 # first yield returns None, for internal use only. This ensures
1785 # that we enter the try/except block and set up the generator.
1786 yield None
1788 # Sequentially call the tasks and yield the results.
1789 for func, args, kwargs in iterable:
1790 self.n_dispatched_batches += 1
1791 self.n_dispatched_tasks += 1
1792 res = func(*args, **kwargs)
1793 self.n_completed_tasks += 1
1794 self.print_progress()
1795 yield res
1796 self._nb_consumed += 1
1797 except BaseException:
1798 self._exception = True
1799 self._aborting = True
1800 self._aborted = True
1801 raise
1802 finally:
1803 self.print_progress()
1804 self._running = False
1805 self._iterating = False
1806 self._original_iterator = None
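The two-argument `iter` + `islice` idiom used above to re-batch an iterable is worth isolating; `chunked` below is a hypothetical stand-alone helper, not part of joblib:

    import itertools

    def chunked(iterable, batch_size):
        # islice is called repeatedly until it returns an empty tuple,
        # which two-argument iter() treats as the stop sentinel.
        it = iter(iterable)
        return iter(lambda: tuple(itertools.islice(it, batch_size)), ())

    print(list(chunked(range(7), 3)))  # [(0, 1, 2), (3, 4, 5), (6,)]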
1808 def _reset_run_tracking(self):
1809 """Reset the counters and flags used to track the execution."""
1811 # Make sure, in a thread-safe way, that the parallel instance was
1812 # not already running.
1813 with getattr(self, '_lock', nullcontext()):
1814 if self._running:
1815 msg = 'This Parallel instance is already running!'
1816 if self.return_generator is True:
1817 msg += (
1818 " Before submitting new tasks, you must wait for the "
1819 "completion of all the previous tasks, or clean all "
1820 "references to the output generator."
1821 )
1822 raise RuntimeError(msg)
1823 self._running = True
1825 # Counters to keep track of the tasks dispatched and completed.
1826 self.n_dispatched_batches = 0
1827 self.n_dispatched_tasks = 0
1828 self.n_completed_tasks = 0
1830 # The following count is incremented by one each time the user iterates
1831 # on the output generator; it is used to prepare an informative
1832 # warning message in case the generator is deleted before all the
1833 # dispatched tasks have been consumed.
1834 self._nb_consumed = 0
1836 # The following flags are used to synchronize the threads in case one
1837 # of the tasks errors out, to ensure that all workers abort quickly
1838 # and that the backend terminates properly.
1840 # Set to True as soon as a worker signals that a task errors out
1841 self._exception = False
1842 # Set to True in case of early termination following an incident
1843 self._aborting = False
1844 # Set to True after abortion is complete
1845 self._aborted = False
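A sketch of how this guard surfaces to users, assuming a joblib version that supports `return_as='generator'`: calling the same instance again before its previous generator has been consumed (or deleted) raises the RuntimeError prepared above.

    from joblib import Parallel, delayed

    p = Parallel(n_jobs=2, return_as="generator")
    gen = p(delayed(abs)(-i) for i in range(10))
    try:
        p(delayed(abs)(-i) for i in range(10))
    except RuntimeError as exc:
        print(exc)  # "This Parallel instance is already running! ..."
    results = list(gen)  # consuming the generator releases the instance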
1847 def __call__(self, iterable):
1848 """Main function to dispatch parallel tasks."""
1850 self._reset_run_tracking()
1851 self._start_time = time.time()
1853 if not self._managed_backend:
1854 n_jobs = self._initialize_backend()
1855 else:
1856 n_jobs = self._effective_n_jobs()
1858 if n_jobs == 1:
1859 # If n_jobs==1, run the computation sequentially and return
1860 # immediately to avoid overhead.
1861 output = self._get_sequential_output(iterable)
1862 next(output)
1863 return output if self.return_generator else list(output)
1865 # Let's create an ID that uniquely identifies the current call. If the
1866 # call is interrupted early and the same instance is immediately
1867 # re-used, this id will be used to prevent workers that were
1868 # concurrently finalizing a task from the previous call from running the
1869 # callback.
1870 with self._lock:
1871 self._call_id = uuid4().hex
1873 # self._effective_n_jobs should be called in the Parallel.__call__
1874 # thread only -- store its value in an attribute for further queries.
1875 self._cached_effective_n_jobs = n_jobs
1877 if isinstance(self._backend, LokyBackend):
1878 # For the loky backend, we add a callback executed when reducing
1879 # BatchedCalls, that makes the loky executor use a temporary folder
1880 # specific to this Parallel object when pickling temporary memmaps.
1881 # This callback is necessary to ensure that several Parallel
1882 # objects using the same reusable executor don't use the same
1883 # temporary resources.
1885 def _batched_calls_reducer_callback():
1886 # Relevant implementation detail: the following lines, called
1887 # when reducing BatchedCalls, are called in a thread-safe
1888 # situation, meaning that the context of the temporary folder
1889 # manager will not be changed in between the callback execution
1890 # and the end of the BatchedCalls pickling. The reason is that
1891 # pickling (the only place where set_current_context is used)
1892 # is done from a single thread (the queue_feeder_thread).
1893 self._backend._workers._temp_folder_manager.set_current_context( # noqa
1894 self._id
1895 )
1896 self._reducer_callback = _batched_calls_reducer_callback
1902 backend_name = self._backend.__class__.__name__
1903 if n_jobs == 0:
1904 raise RuntimeError("%s has no active worker." % backend_name)
1906 self._print(
1907 f"Using backend {backend_name} with {n_jobs} concurrent workers."
1908 )
1909 if hasattr(self._backend, 'start_call'):
1910 self._backend.start_call()
1912 # The following flag prevents double calls to `backend.stop_call`.
1913 self._calling = True
1915 iterator = iter(iterable)
1916 pre_dispatch = self.pre_dispatch
1918 if pre_dispatch == 'all':
1919 # prevent further dispatch via multiprocessing callback thread
1920 self._original_iterator = None
1921 self._pre_dispatch_amount = 0
1922 else:
1923 self._original_iterator = iterator
1924 if hasattr(pre_dispatch, 'endswith'):
1925 pre_dispatch = eval_expr(
1926 pre_dispatch.replace("n_jobs", str(n_jobs))
1927 )
1928 self._pre_dispatch_amount = pre_dispatch = int(pre_dispatch)
1930 # The main thread will consume the first pre_dispatch items and
1931 # the remaining items will later be lazily dispatched by async
1932 # callbacks upon task completions.
1934 # TODO: this iterator should be batch_size * n_jobs
1935 iterator = itertools.islice(iterator, self._pre_dispatch_amount)
1937 # Use a caching dict for callables that are pickled with cloudpickle to
1938 # improve performance. This cache is used only in the case of
1939 # functions that are defined in the __main__ module, functions that
1940 # are defined locally (inside another function) and lambda expressions.
1941 self._pickle_cache = dict()
1943 output = self._get_outputs(iterator, pre_dispatch)
1944 self._call_ref = weakref.ref(output)
1946 # The first item from the output is blank, but it makes the interpreter
1947 # progress until it enters the try/except block of the generator and
1948 # reaches the first `yield` statement. This starts the asynchronous
1949 # dispatch of the tasks to the workers.
1950 next(output)
1952 return output if self.return_generator else list(output)
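As a usage-level illustration of the `pre_dispatch` handling above (arbitrary workload; only documented Parallel parameters are used): the expression form is rewritten with the effective `n_jobs` and evaluated, so `'2*n_jobs'` with four workers keeps roughly eight tasks queued ahead of them.

    from math import sqrt
    from joblib import Parallel, delayed

    results = Parallel(n_jobs=4, pre_dispatch='2*n_jobs')(
        delayed(sqrt)(i) for i in range(100)
    )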
1954 def __repr__(self):
1955 return '%s(n_jobs=%s)' % (self.__class__.__name__, self.n_jobs)