Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/parallel.py: 16%
1"""
2Helpers for embarrassingly parallel code.
3"""
4# Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org >
5# Copyright: 2010, Gael Varoquaux
6# License: BSD 3 clause
8from __future__ import division
10import os
11import sys
12from math import sqrt
13import functools
14import collections
15import time
16import threading
17import itertools
18from uuid import uuid4
19from numbers import Integral
20import warnings
21import queue
22import weakref
23from contextlib import nullcontext
25from multiprocessing import TimeoutError
27from ._multiprocessing_helpers import mp
29from .logger import Logger, short_format_time
30from .disk import memstr_to_bytes
31from ._parallel_backends import (FallbackToBackend, MultiprocessingBackend,
32 ThreadingBackend, SequentialBackend,
33 LokyBackend)
34from ._utils import eval_expr, _Sentinel
36# Make sure that those two classes are part of the public joblib.parallel API
37# so that 3rd party backend implementers can import them from here.
38from ._parallel_backends import AutoBatchingMixin # noqa
39from ._parallel_backends import ParallelBackendBase # noqa
42IS_PYPY = hasattr(sys, "pypy_version_info")
45BACKENDS = {
46 'threading': ThreadingBackend,
47 'sequential': SequentialBackend,
48}
49# name of the backend used by default by Parallel outside of any context
50# managed by ``parallel_config`` or ``parallel_backend``.
52 # threading is the only backend that is always available everywhere
53DEFAULT_BACKEND = 'threading'
55MAYBE_AVAILABLE_BACKENDS = {'multiprocessing', 'loky'}
57 # if multiprocessing is available, so is loky, and we set loky as the
58 # default backend
59if mp is not None:
60 BACKENDS['multiprocessing'] = MultiprocessingBackend
61 from .externals import loky
62 BACKENDS['loky'] = LokyBackend
63 DEFAULT_BACKEND = 'loky'
66DEFAULT_THREAD_BACKEND = 'threading'
69# Thread local value that can be overridden by the ``parallel_config`` context
70# manager
71_backend = threading.local()
74def _register_dask():
75 """Register Dask Backend if called with parallel_config(backend="dask")"""
76 try:
77 from ._dask import DaskDistributedBackend
78 register_parallel_backend('dask', DaskDistributedBackend)
79 except ImportError as e:
80 msg = ("To use the dask.distributed backend you must install both "
81 "the `dask` and distributed modules.\n\n"
82 "See https://dask.pydata.org/en/latest/install.html for more "
83 "information.")
84 raise ImportError(msg) from e
87EXTERNAL_BACKENDS = {
88 'dask': _register_dask,
89}
92# Sentinels for the default values of the Parallel constructor and
93# the parallel_config and parallel_backend context managers
94default_parallel_config = {
95 "backend": _Sentinel(default_value=None),
96 "n_jobs": _Sentinel(default_value=None),
97 "verbose": _Sentinel(default_value=0),
98 "temp_folder": _Sentinel(default_value=None),
99 "max_nbytes": _Sentinel(default_value="1M"),
100 "mmap_mode": _Sentinel(default_value="r"),
101 "prefer": _Sentinel(default_value=None),
102 "require": _Sentinel(default_value=None),
103}
106VALID_BACKEND_HINTS = ('processes', 'threads', None)
107VALID_BACKEND_CONSTRAINTS = ('sharedmem', None)
110def _get_config_param(param, context_config, key):
111 """Return the value of a parallel config parameter
113 Explicitly setting it in Parallel has priority over setting in a
114 parallel_(config/backend) context manager.
115 """
116 if param is not default_parallel_config[key]:
117 # param is explicitly set, return it
118 return param
120 if context_config[key] is not default_parallel_config[key]:
121 # there's a context manager and the key is set, return it
122 return context_config[key]
124 # Otherwise, we are in the default_parallel_config,
125 # return the default value
126 return param.default_value
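# A small illustration of the precedence rule above, calling the private helper
# directly (illustration only, not part of the public API):
# >>> sentinel = default_parallel_config["n_jobs"]
# >>> _get_config_param(2, {"n_jobs": 8}, "n_jobs")        # explicit value wins
# 2
# >>> _get_config_param(sentinel, {"n_jobs": 8}, "n_jobs")  # context manager value
# 8
# >>> _get_config_param(sentinel, {"n_jobs": sentinel}, "n_jobs")  # default (None)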
129def get_active_backend(
130 prefer=default_parallel_config["prefer"],
131 require=default_parallel_config["require"],
132 verbose=default_parallel_config["verbose"],
133):
134 """Return the active default backend"""
135 backend, config = _get_active_backend(prefer, require, verbose)
136 n_jobs = _get_config_param(
137 default_parallel_config['n_jobs'], config, "n_jobs"
138 )
139 return backend, n_jobs
142def _get_active_backend(
143 prefer=default_parallel_config["prefer"],
144 require=default_parallel_config["require"],
145 verbose=default_parallel_config["verbose"],
146):
147 """Return the active default backend"""
149 backend_config = getattr(_backend, "config", default_parallel_config)
151 backend = _get_config_param(
152 default_parallel_config['backend'], backend_config, "backend"
153 )
154 prefer = _get_config_param(prefer, backend_config, "prefer")
155 require = _get_config_param(require, backend_config, "require")
156 verbose = _get_config_param(verbose, backend_config, "verbose")
158 if prefer not in VALID_BACKEND_HINTS:
159 raise ValueError(
160 f"prefer={prefer} is not a valid backend hint, "
161 f"expected one of {VALID_BACKEND_HINTS}"
162 )
163 if require not in VALID_BACKEND_CONSTRAINTS:
164 raise ValueError(
165 f"require={require} is not a valid backend constraint, "
166 f"expected one of {VALID_BACKEND_CONSTRAINTS}"
167 )
168 if prefer == 'processes' and require == 'sharedmem':
169 raise ValueError(
170 "prefer == 'processes' and require == 'sharedmem'"
171 " are inconsistent settings"
172 )
174 explicit_backend = True
175 if backend is None:
177 # We are either outside of the scope of any parallel_(config/backend)
178 # context manager or the context manager did not set a backend.
179 # create the default backend instance now.
180 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
181 explicit_backend = False
183 # Try to use the backend set by the user with the context manager.
185 nesting_level = backend.nesting_level
186 uses_threads = getattr(backend, 'uses_threads', False)
187 supports_sharedmem = getattr(backend, 'supports_sharedmem', False)
188 # Force to use thread-based backend if the provided backend does not
189 # match the shared memory constraint or if the backend is not explicitly
190 # given and threads are preferred.
191 force_threads = (require == 'sharedmem' and not supports_sharedmem)
192 force_threads |= (
193 not explicit_backend and prefer == 'threads' and not uses_threads
194 )
195 if force_threads:
196 # This backend does not match the shared memory constraint:
197 # fall back to the default thread-based backend.
198 sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND](
199 nesting_level=nesting_level
200 )
201 # Warn the user if we forced the backend to thread-based, while the
202 # user explicitly specified a non-thread-based backend.
203 if verbose >= 10 and explicit_backend:
204 print(
205 f"Using {sharedmem_backend.__class__.__name__} as "
206 f"joblib backend instead of {backend.__class__.__name__} "
207 "as the latter does not provide shared memory semantics."
208 )
209 # Force to n_jobs=1 by default
210 thread_config = backend_config.copy()
211 thread_config['n_jobs'] = 1
212 return sharedmem_backend, thread_config
214 return backend, backend_config
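# Illustration of the fallback behaviour implemented above, assuming loky is the
# default backend and no parallel_(config/backend) context manager is active
# (a sketch, not a specification):
# >>> backend, config = _get_active_backend(require="sharedmem")
# >>> type(backend).__name__, config["n_jobs"]
# ('ThreadingBackend', 1)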
217class parallel_config:
218 """Set the default backend or configuration for :class:`~joblib.Parallel`.
220 This is an alternative to directly passing keyword arguments to the
221 :class:`~joblib.Parallel` class constructor. It is particularly useful when
222 calling into library code that uses joblib internally but does not expose
223 the various parallel configuration arguments in its own API.
225 Parameters
226 ----------
227 backend: str or ParallelBackendBase instance, default=None
228 If ``backend`` is a string it must match a previously registered
229 implementation using the :func:`~register_parallel_backend` function.
231 By default the following backends are available:
233 - 'loky': single-host, process-based parallelism (used by default),
234 - 'threading': single-host, thread-based parallelism,
235 - 'multiprocessing': legacy single-host, process-based parallelism.
237 'loky' is recommended to run functions that manipulate Python objects.
238 'threading' is a low-overhead alternative that is most efficient for
239 functions that release the Global Interpreter Lock: e.g. I/O-bound
240 code or CPU-bound code in a few calls to native code that explicitly
241 releases the GIL. Note that on some rare systems (such as Pyodide),
242 multiprocessing and loky may not be available, in which case joblib
243 defaults to threading.
245 In addition, if the ``dask`` and ``distributed`` Python packages are
246 installed, it is possible to use the 'dask' backend for better
247 scheduling of nested parallel calls without over-subscription and
248 potentially distribute parallel calls over a networked cluster of
249 several hosts.
251 It is also possible to use the distributed 'ray' backend for
252 distributing the workload to a cluster of nodes. See more details
253 in the Examples section below.
255 Alternatively the backend can be passed directly as an instance.
257 n_jobs: int, default=None
258 The maximum number of concurrently running jobs, such as the number
259 of Python worker processes when ``backend="loky"`` or the size of the
260 thread-pool when ``backend="threading"``.
261 This argument is converted to an integer, rounded down for floats.
262 If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
263 ``n_cpus`` is obtained with :func:`~cpu_count`.
264 For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
265 using ``n_jobs=-2`` will result in all CPUs but one being used.
266 This argument can also go above ``n_cpus``, which will cause
267 oversubscription. In some cases, slight oversubscription can be
268 beneficial, e.g., for tasks with large I/O operations.
269 If 1 is given, no parallel computing code is used at all, and the
270 behavior amounts to a simple python `for` loop. This mode is not
271 compatible with `timeout`.
272 None is a marker for 'unset' that will be interpreted as n_jobs=1
273 unless the call is performed under a :func:`~parallel_config`
274 context manager that sets another value for ``n_jobs``.
275 If n_jobs = 0 then a ValueError is raised.
277 verbose: int, default=0
278 The verbosity level: if non zero, progress messages are
279 printed. Above 50, the output is sent to stdout.
280 The frequency of the messages increases with the verbosity level.
281 If it is more than 10, all iterations are reported.
283 temp_folder: str or None, default=None
284 Folder to be used by the pool for memmapping large arrays
285 for sharing memory with worker processes. If None, this will try in
286 order:
288 - a folder pointed to by the ``JOBLIB_TEMP_FOLDER`` environment
289 variable,
290 - ``/dev/shm`` if the folder exists and is writable: this is a
291 RAM disk filesystem available by default on modern Linux
292 distributions,
293 - the default system temporary folder that can be
294 overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment
295 variables, typically ``/tmp`` under Unix operating systems.
297 max_nbytes: int, str, or None, default='1M'
298 Threshold on the size of arrays passed to the workers that
299 triggers automated memory mapping in temp_folder. Can be an int
300 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
301 Use None to disable memmapping of large arrays.
303 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
304 Memmapping mode for numpy arrays passed to workers. None will
305 disable memmapping, other modes defined in the numpy.memmap doc:
306 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
307 Also, see 'max_nbytes' parameter documentation for more details.
309 prefer: str in {'processes', 'threads'} or None, default=None
310 Soft hint to choose the default backend.
311 The default process-based backend is 'loky' and the default
312 thread-based backend is 'threading'. Ignored if the ``backend``
313 parameter is specified.
315 require: 'sharedmem' or None, default=None
316 Hard constraint to select the backend. If set to 'sharedmem',
317 the selected backend will be single-host and thread-based.
319 inner_max_num_threads: int, default=None
320 If not None, overwrites the limit set on the number of threads
321 usable in some third-party library threadpools like OpenBLAS,
322 MKL or OpenMP. This is only used with the ``loky`` backend.
324 backend_params: dict
325 Additional parameters to pass to the backend constructor when
326 backend is a string.
328 Notes
329 -----
330 Joblib tries to limit the oversubscription by limiting the number of
331 threads usable in some third-party library threadpools like OpenBLAS, MKL
332 or OpenMP. The default limit in each worker is set to
333 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
334 overwritten with the ``inner_max_num_threads`` argument which will be used
335 to set this limit in the child processes.
337 .. versionadded:: 1.3
339 Examples
340 --------
341 >>> from operator import neg
342 >>> with parallel_config(backend='threading'):
343 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
344 ...
345 [-1, -2, -3, -4, -5]
347 To use the 'ray' joblib backend add the following lines:
349 >>> from ray.util.joblib import register_ray # doctest: +SKIP
350 >>> register_ray() # doctest: +SKIP
351 >>> with parallel_config(backend="ray"): # doctest: +SKIP
352 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
353 [-1, -2, -3, -4, -5]
355 """
356 def __init__(
357 self,
358 backend=default_parallel_config["backend"],
359 *,
360 n_jobs=default_parallel_config["n_jobs"],
361 verbose=default_parallel_config["verbose"],
362 temp_folder=default_parallel_config["temp_folder"],
363 max_nbytes=default_parallel_config["max_nbytes"],
364 mmap_mode=default_parallel_config["mmap_mode"],
365 prefer=default_parallel_config["prefer"],
366 require=default_parallel_config["require"],
367 inner_max_num_threads=None,
368 **backend_params
369 ):
370 # Save the parallel info and set the active parallel config
371 self.old_parallel_config = getattr(
372 _backend, "config", default_parallel_config
373 )
375 backend = self._check_backend(
376 backend, inner_max_num_threads, **backend_params
377 )
379 new_config = {
380 "n_jobs": n_jobs,
381 "verbose": verbose,
382 "temp_folder": temp_folder,
383 "max_nbytes": max_nbytes,
384 "mmap_mode": mmap_mode,
385 "prefer": prefer,
386 "require": require,
387 "backend": backend
388 }
389 self.parallel_config = self.old_parallel_config.copy()
390 self.parallel_config.update({
391 k: v for k, v in new_config.items()
392 if not isinstance(v, _Sentinel)
393 })
395 setattr(_backend, "config", self.parallel_config)
397 def _check_backend(self, backend, inner_max_num_threads, **backend_params):
398 if backend is default_parallel_config['backend']:
399 if inner_max_num_threads is not None or len(backend_params) > 0:
400 raise ValueError(
401 "inner_max_num_threads and other constructor "
402 "parameters backend_params are only supported "
403 "when backend is not None."
404 )
405 return backend
407 if isinstance(backend, str):
408 # Handle non-registered or missing backends
409 if backend not in BACKENDS:
410 if backend in EXTERNAL_BACKENDS:
411 register = EXTERNAL_BACKENDS[backend]
412 register()
413 elif backend in MAYBE_AVAILABLE_BACKENDS:
414 warnings.warn(
415 f"joblib backend '{backend}' is not available on "
416 f"your system, falling back to {DEFAULT_BACKEND}.",
417 UserWarning,
418 stacklevel=2
419 )
420 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
421 else:
422 raise ValueError(
423 f"Invalid backend: {backend}, expected one of "
424 f"{sorted(BACKENDS.keys())}"
425 )
427 backend = BACKENDS[backend](**backend_params)
429 if inner_max_num_threads is not None:
430 msg = (
431 f"{backend.__class__.__name__} does not accept setting the "
432 "inner_max_num_threads argument."
433 )
434 assert backend.supports_inner_max_num_threads, msg
435 backend.inner_max_num_threads = inner_max_num_threads
437 # If the nesting_level of the backend is not set previously, use the
438 # nesting level from the previous active_backend to set it
439 if backend.nesting_level is None:
440 parent_backend = self.old_parallel_config['backend']
441 if parent_backend is default_parallel_config['backend']:
442 nesting_level = 0
443 else:
444 nesting_level = parent_backend.nesting_level
445 backend.nesting_level = nesting_level
447 return backend
449 def __enter__(self):
450 return self.parallel_config
452 def __exit__(self, type, value, traceback):
453 self.unregister()
455 def unregister(self):
456 setattr(_backend, "config", self.old_parallel_config)
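# A short usage sketch combining several of the parameters documented above
# (the values are illustrative and assume the loky backend is available):
# >>> from joblib import Parallel, delayed
# >>> with parallel_config(backend="loky", n_jobs=2, inner_max_num_threads=1):
# ...     results = Parallel()(delayed(abs)(-i) for i in range(4))
# >>> results
# [0, 1, 2, 3]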
459class parallel_backend(parallel_config):
460 """Change the default backend used by Parallel inside a with block.
462 .. warning::
463 It is advised to use the :class:`~joblib.parallel_config` context
464 manager instead, which allows more fine-grained control over the
465 backend configuration.
467 If ``backend`` is a string it must match a previously registered
468 implementation using the :func:`~register_parallel_backend` function.
470 By default the following backends are available:
472 - 'loky': single-host, process-based parallelism (used by default),
473 - 'threading': single-host, thread-based parallelism,
474 - 'multiprocessing': legacy single-host, process-based parallelism.
476 'loky' is recommended to run functions that manipulate Python objects.
477 'threading' is a low-overhead alternative that is most efficient for
478 functions that release the Global Interpreter Lock: e.g. I/O-bound code or
479 CPU-bound code in a few calls to native code that explicitly releases the
480 GIL. Note that on some rare systems (such as Pyodide),
481 multiprocessing and loky may not be available, in which case joblib
482 defaults to threading.
484 You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib
485 backend to distribute work across machines. This works well with
486 scikit-learn estimators with the ``n_jobs`` parameter, for example::
488 >>> import joblib # doctest: +SKIP
489 >>> from sklearn.model_selection import GridSearchCV # doctest: +SKIP
490 >>> from dask.distributed import Client, LocalCluster # doctest: +SKIP
492 >>> # create a local Dask cluster
493 >>> cluster = LocalCluster() # doctest: +SKIP
494 >>> client = Client(cluster) # doctest: +SKIP
495 >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1)
496 ... # doctest: +SKIP
497 >>> with joblib.parallel_backend("dask", scatter=[X, y]): # doctest: +SKIP
498 ... grid_search.fit(X, y)
500 It is also possible to use the distributed 'ray' backend for distributing
501 the workload to a cluster of nodes. To use the 'ray' joblib backend add
502 the following lines::
504 >>> from ray.util.joblib import register_ray # doctest: +SKIP
505 >>> register_ray() # doctest: +SKIP
506 >>> with parallel_backend("ray"): # doctest: +SKIP
507 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
508 [-1, -2, -3, -4, -5]
510 Alternatively the backend can be passed directly as an instance.
512 By default all available workers will be used (``n_jobs=-1``) unless the
513 caller passes an explicit value for the ``n_jobs`` parameter.
515 This is an alternative to passing a ``backend='backend_name'`` argument to
516 the :class:`~Parallel` class constructor. It is particularly useful when
517 calling into library code that uses joblib internally but does not expose
518 the backend argument in its own API.
520 >>> from operator import neg
521 >>> with parallel_backend('threading'):
522 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
523 ...
524 [-1, -2, -3, -4, -5]
526 Joblib also tries to limit the oversubscription by limiting the number of
527 threads usable in some third-party library threadpools like OpenBLAS, MKL
528 or OpenMP. The default limit in each worker is set to
529 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
530 overwritten with the ``inner_max_num_threads`` argument which will be used
531 to set this limit in the child processes.
533 .. versionadded:: 0.10
535 See Also
536 --------
537 joblib.parallel_config: context manager to change the backend
538 configuration.
539 """
540 def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
541 **backend_params):
543 super().__init__(
544 backend=backend,
545 n_jobs=n_jobs,
546 inner_max_num_threads=inner_max_num_threads,
547 **backend_params
548 )
550 if self.old_parallel_config is None:
551 self.old_backend_and_jobs = None
552 else:
553 self.old_backend_and_jobs = (
554 self.old_parallel_config["backend"],
555 self.old_parallel_config["n_jobs"],
556 )
557 self.new_backend_and_jobs = (
558 self.parallel_config["backend"],
559 self.parallel_config["n_jobs"],
560 )
562 def __enter__(self):
563 return self.new_backend_and_jobs
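# Entering parallel_backend returns the (backend, n_jobs) pair, which library
# code sometimes uses to introspect the active configuration (illustrative
# sketch only):
# >>> with parallel_backend("threading", n_jobs=2) as (backend, n_jobs):
# ...     print(type(backend).__name__, n_jobs)
# ThreadingBackend 2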
566# Under Linux or OS X the default start method of multiprocessing
567 # can cause third-party libraries to crash. Under Python 3.4+ it is possible
568 # to set an environment variable to switch the default start method from
569 # 'fork' to 'forkserver' or 'spawn' to avoid this issue, albeit at the cost
570# of causing semantic changes and some additional pool instantiation overhead.
571DEFAULT_MP_CONTEXT = None
572if hasattr(mp, 'get_context'):
573 method = os.environ.get('JOBLIB_START_METHOD', '').strip() or None
574 if method is not None:
575 DEFAULT_MP_CONTEXT = mp.get_context(method=method)
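# Sketch of how the start method can be switched via the environment variable
# read above ('spawn' and 'forkserver' are the usual alternatives to 'fork');
# the variable must be set before joblib is imported, e.g. from the shell
# (my_script.py is a hypothetical script):
#
#     JOBLIB_START_METHOD=spawn python my_script.py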
578class BatchedCalls(object):
579 """Wrap a sequence of (func, args, kwargs) tuples as a single callable"""
581 def __init__(self, iterator_slice, backend_and_jobs, reducer_callback=None,
582 pickle_cache=None):
583 self.items = list(iterator_slice)
584 self._size = len(self.items)
585 self._reducer_callback = reducer_callback
586 if isinstance(backend_and_jobs, tuple):
587 self._backend, self._n_jobs = backend_and_jobs
588 else:
589 # this is for backward compatibility purposes. Before 0.12.6,
590 # nested backends were returned without n_jobs indications.
591 self._backend, self._n_jobs = backend_and_jobs, None
592 self._pickle_cache = pickle_cache if pickle_cache is not None else {}
594 def __call__(self):
595 # Set the default nested backend to self._backend but do not change
596 # the default number of processes to -1
597 with parallel_config(backend=self._backend, n_jobs=self._n_jobs):
598 return [func(*args, **kwargs)
599 for func, args, kwargs in self.items]
601 def __reduce__(self):
602 if self._reducer_callback is not None:
603 self._reducer_callback()
604 # no need to pickle the callback.
605 return (
606 BatchedCalls,
607 (self.items, (self._backend, self._n_jobs), None,
608 self._pickle_cache)
609 )
611 def __len__(self):
612 return self._size
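# Minimal illustration of the wrapper's calling convention (private API, shown
# only to clarify how a batch is executed; the sequential backend and n_jobs=1
# are arbitrary choices for the example):
# >>> calls = BatchedCalls(
# ...     [(abs, (-1,), {}), (abs, (-2,), {})],
# ...     (SequentialBackend(nesting_level=0), 1),
# ... )
# >>> len(calls), calls()
# (2, [1, 2])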
615# Possible exit status for a task
616TASK_DONE = "Done"
617TASK_ERROR = "Error"
618TASK_PENDING = "Pending"
621###############################################################################
622# CPU count that works also when multiprocessing has been disabled via
623# the JOBLIB_MULTIPROCESSING environment variable
624def cpu_count(only_physical_cores=False):
625 """Return the number of CPUs.
627 This delegates to loky.cpu_count that takes into account additional
628 constraints such as Linux CFS scheduler quotas (typically set by container
629 runtimes such as docker) and CPU affinity (for instance using the taskset
630 command on Linux).
632 If only_physical_cores is True, do not take hyperthreading / SMT logical
633 cores into account.
634 """
635 if mp is None:
636 return 1
638 return loky.cpu_count(only_physical_cores=only_physical_cores)
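# Example calls (returned values are machine dependent and also reflect CFS
# quotas and CPU affinity as explained above; 8 and 4 are made-up numbers):
# >>> cpu_count()
# 8
# >>> cpu_count(only_physical_cores=True)
# 4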
641###############################################################################
642# For verbosity
644def _verbosity_filter(index, verbose):
645 """ Returns False for indices increasingly apart, the distance
646 depending on the value of verbose.
648 We use a lag increasing as the square of index
649 """
650 if not verbose:
651 return True
652 elif verbose > 10:
653 return False
654 if index == 0:
655 return False
656 verbose = .5 * (11 - verbose) ** 2
657 scale = sqrt(index / verbose)
658 next_scale = sqrt((index + 1) / verbose)
659 return (int(next_scale) == int(scale))
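# Illustration of the spacing produced by the formula above: with verbose=5 the
# lag is 0.5 * (11 - 5) ** 2 == 18, so messages are let through (False) at
# index 0 and then whenever int(sqrt(index / 18)) differs from
# int(sqrt((index + 1) / 18)):
# >>> [i for i in range(200) if not _verbosity_filter(i, 5)]
# [0, 17, 71, 161]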
662###############################################################################
663def delayed(function):
664 """Decorator used to capture the arguments of a function."""
666 def delayed_function(*args, **kwargs):
667 return function, args, kwargs
668 try:
669 delayed_function = functools.wraps(function)(delayed_function)
670 except AttributeError:
671 " functools.wraps fails on some callable objects "
672 return delayed_function
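# The decorator simply captures the function and its arguments for later
# execution; a quick illustration:
# >>> from math import sqrt
# >>> delayed(sqrt)(4)
# (<built-in function sqrt>, (4,), {})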
675###############################################################################
676class BatchCompletionCallBack(object):
677 """Callback to keep track of completed results and schedule the next tasks.
679 This callable is executed by the parent process whenever a worker process
680 has completed a batch of tasks.
682 It is used for progress reporting, to update the estimate of the batch
683 processing duration and to schedule the next batch of tasks to be
684 processed.
686 It is assumed that this callback will always be triggered by the backend
687 right after the end of a task, in case of success as well as in case of
688 failure.
689 """
691 ##########################################################################
692 # METHODS CALLED BY THE MAIN THREAD #
693 ##########################################################################
694 def __init__(self, dispatch_timestamp, batch_size, parallel):
695 self.dispatch_timestamp = dispatch_timestamp
696 self.batch_size = batch_size
697 self.parallel = parallel
698 self.parallel_call_id = parallel._call_id
700 # Internals to keep track of the status and outcome of the task.
702 # Used to hold a reference to the future-like object returned by the
703 # backend after launching this task
704 # This will be set later when calling `register_job`, as it is only
705 # created once the task has been submitted.
706 self.job = None
708 if not parallel._backend.supports_retrieve_callback:
709 # The status is only used for asynchronous result retrieval in the
710 # callback.
711 self.status = None
712 else:
713 # The initial status for the job is TASK_PENDING.
714 # Once it is done, it will be either TASK_DONE, or TASK_ERROR.
715 self.status = TASK_PENDING
717 def register_job(self, job):
718 """Register the object returned by `apply_async`."""
719 self.job = job
721 def get_result(self, timeout):
722 """Returns the raw result of the task that was submitted.
724 If the task raised an exception rather than returning, this same
725 exception will be raised instead.
727 If the backend supports the retrieval callback, it is assumed that this
728 method is only called after the result has been registered. This is
729 ensured by checking that `self.get_status(timeout)` does not return
730 TASK_PENDING. In this case, `get_result` directly returns the
731 registered result (or raises the registered exception).
733 For other backends, there are no such assumptions, but `get_result`
734 still needs to synchronously retrieve the result before it can
735 return it or raise. It will block for at most `timeout` seconds
736 waiting for retrieval to complete, after that it raises a TimeoutError.
737 """
739 backend = self.parallel._backend
741 if backend.supports_retrieve_callback:
742 # We assume that the result has already been retrieved by the
743 # callback thread, and is stored internally. It's just waiting to
744 # be returned.
745 return self._return_or_raise()
747 # For other backends, the main thread needs to run the retrieval step.
748 try:
749 if backend.supports_timeout:
750 result = self.job.get(timeout=timeout)
751 else:
752 result = self.job.get()
753 outcome = dict(result=result, status=TASK_DONE)
754 except BaseException as e:
755 outcome = dict(result=e, status=TASK_ERROR)
756 self._register_outcome(outcome)
758 return self._return_or_raise()
760 def _return_or_raise(self):
761 try:
762 if self.status == TASK_ERROR:
763 raise self._result
764 return self._result
765 finally:
766 del self._result
768 def get_status(self, timeout):
769 """Get the status of the task.
771 This function also checks if the timeout has been reached and registers
772 the TimeoutError outcome when that is the case.
773 """
774 if timeout is None or self.status != TASK_PENDING:
775 return self.status
777 # The computation is running and the status is pending.
778 # Check that we did not wait for this job more than `timeout`.
779 now = time.time()
780 if not hasattr(self, "_completion_timeout_counter"):
781 self._completion_timeout_counter = now
783 if (now - self._completion_timeout_counter) > timeout:
784 outcome = dict(result=TimeoutError(), status=TASK_ERROR)
785 self._register_outcome(outcome)
787 return self.status
789 ##########################################################################
790 # METHODS CALLED BY CALLBACK THREADS #
791 ##########################################################################
792 def __call__(self, out):
793 """Function called by the callback thread after a job is completed."""
795 # If the backend doesn't support callback retrievals, the next batch of
796 # tasks is dispatched regardless. The result will be retrieved by the
797 # main thread when calling `get_result`.
798 if not self.parallel._backend.supports_retrieve_callback:
799 self._dispatch_new()
800 return
802 # If the backend supports retrieving the result in the callback, it
803 # registers the task outcome (TASK_ERROR or TASK_DONE), and schedules
804 # the next batch if needed.
805 with self.parallel._lock:
806 # Edge case: while the task was processing, the `parallel`
807 # instance has been reset and a new call has been issued, but the
808 # worker managed to complete the task and trigger this callback
809 # call just before being aborted by the reset.
810 if self.parallel._call_id != self.parallel_call_id:
811 return
813 # When aborting, stop as fast as possible and do not retrieve the
814 # result as it won't be returned by the Parallel call.
815 if self.parallel._aborting:
816 return
818 # Retrieves the result of the task in the main process and dispatch
819 # a new batch if needed.
820 job_succeeded = self._retrieve_result(out)
822 if not self.parallel.return_ordered:
823 # Append the job to the queue in the order of completion
824 # instead of submission.
825 self.parallel._jobs.append(self)
827 if job_succeeded:
828 self._dispatch_new()
830 def _dispatch_new(self):
831 """Schedule the next batch of tasks to be processed."""
833 # This step ensures that auto-batching works as expected.
834 this_batch_duration = time.time() - self.dispatch_timestamp
835 self.parallel._backend.batch_completed(self.batch_size,
836 this_batch_duration)
838 # Schedule the next batch of tasks.
839 with self.parallel._lock:
840 self.parallel.n_completed_tasks += self.batch_size
841 self.parallel.print_progress()
842 if self.parallel._original_iterator is not None:
843 self.parallel.dispatch_next()
845 def _retrieve_result(self, out):
846 """Fetch and register the outcome of a task.
848 Return True if the task succeeded, False otherwise.
849 This function is only called by backends that support retrieving
850 the task result in the callback thread.
851 """
852 try:
853 result = self.parallel._backend.retrieve_result_callback(out)
854 outcome = dict(status=TASK_DONE, result=result)
855 except BaseException as e:
856 # Avoid keeping references to parallel in the error.
857 e.__traceback__ = None
858 outcome = dict(result=e, status=TASK_ERROR)
860 self._register_outcome(outcome)
861 return outcome['status'] != TASK_ERROR
863 ##########################################################################
864 # This method can be called either in the main thread #
865 # or in the callback thread. #
866 ##########################################################################
867 def _register_outcome(self, outcome):
868 """Register the outcome of a task.
870 This method can be called only once; future calls will be ignored.
871 """
872 # Covers the edge case where the main thread tries to register a
873 # `TimeoutError` while the callback thread tries to register a result
874 # at the same time.
875 with self.parallel._lock:
876 if self.status not in (TASK_PENDING, None):
877 return
878 self.status = outcome["status"]
880 self._result = outcome["result"]
882 # Once the result and the status are extracted, the last reference to
883 # the job can be deleted.
884 self.job = None
886 # As soon as an error has been spotted, early stopping flags are sent to
887 # the `parallel` instance.
888 if self.status == TASK_ERROR:
889 self.parallel._exception = True
890 self.parallel._aborting = True
893###############################################################################
894def register_parallel_backend(name, factory, make_default=False):
895 """Register a new Parallel backend factory.
897 The new backend can then be selected by passing its name as the backend
898 argument to the :class:`~Parallel` class. Moreover, the default backend can
899 be overwritten globally by setting make_default=True.
901 The factory can be any callable that takes no argument and returns an
902 instance of ``ParallelBackendBase``.
904 Warning: this function is experimental and subject to change in a future
905 version of joblib.
907 .. versionadded:: 0.10
908 """
909 BACKENDS[name] = factory
910 if make_default:
911 global DEFAULT_BACKEND
912 DEFAULT_BACKEND = name
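# Hedged sketch of registering a custom backend under a hypothetical name
# 'custom_threads'; subclassing ThreadingBackend is only one of many possible
# ways to implement the factory:
# >>> class CustomThreadingBackend(ThreadingBackend):
# ...     pass
# >>> register_parallel_backend('custom_threads', CustomThreadingBackend)
# >>> Parallel(n_jobs=2, backend='custom_threads')(
# ...     delayed(abs)(-i) for i in range(3))
# [0, 1, 2]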
915def effective_n_jobs(n_jobs=-1):
916 """Determine the number of jobs that can actually run in parallel
918 n_jobs is the number of workers requested by the caller. Passing n_jobs=-1
919 means requesting all available workers, for instance matching the number of
920 CPU cores on the worker host(s).
922 This method should return a guesstimate of the number of workers that can
923 actually perform work concurrently with the currently enabled default
924 backend. The primary use case is to make it possible for the caller to know
925 in how many chunks to slice the work.
927 In general working on larger data chunks is more efficient (less scheduling
928 overhead and better use of CPU cache prefetching heuristics) as long as all
929 the workers have enough work to do.
931 Warning: this function is experimental and subject to change in a future
932 version of joblib.
934 .. versionadded:: 0.10
935 """
936 if n_jobs == 1:
937 return 1
939 backend, backend_n_jobs = get_active_backend()
940 if n_jobs is None:
941 n_jobs = backend_n_jobs
942 return backend.effective_n_jobs(n_jobs=n_jobs)
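# Example (the n_jobs=-1 case is machine dependent; 8 is a made-up value for a
# host with 8 CPUs, the default loky backend and a regular, non-nested session):
# >>> effective_n_jobs(2)
# 2
# >>> effective_n_jobs(-1)
# 8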
945###############################################################################
946class Parallel(Logger):
947 ''' Helper class for readable parallel mapping.
949 Read more in the :ref:`User Guide <parallel>`.
951 Parameters
952 ----------
953 n_jobs: int, default=None
954 The maximum number of concurrently running jobs, such as the number
955 of Python worker processes when ``backend="loky"`` or the size of
956 the thread-pool when ``backend="threading"``.
957 This argument is converted to an integer, rounded down for floats.
958 If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
959 ``n_cpus`` is obtained with :func:`~cpu_count`.
960 For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
961 using ``n_jobs=-2`` will result in all CPUs but one being used.
962 This argument can also go above ``n_cpus``, which will cause
963 oversubscription. In some cases, slight oversubscription can be
964 beneficial, e.g., for tasks with large I/O operations.
965 If 1 is given, no parallel computing code is used at all, and the
966 behavior amounts to a simple python `for` loop. This mode is not
967 compatible with ``timeout``.
968 None is a marker for 'unset' that will be interpreted as n_jobs=1
969 unless the call is performed under a :func:`~parallel_config`
970 context manager that sets another value for ``n_jobs``.
971 If n_jobs = 0 then a ValueError is raised.
972 backend: str, ParallelBackendBase instance or None, default='loky'
973 Specify the parallelization backend implementation.
974 Supported backends are:
976 - "loky" used by default, can induce some
977 communication and memory overhead when exchanging input and
978 output data with the worker Python processes. On some rare
979 systems (such as Pyodide), the loky backend may not be
980 available.
981 - "multiprocessing" previous process-based backend based on
982 `multiprocessing.Pool`. Less robust than `loky`.
983 - "threading" is a very low-overhead backend but it suffers
984 from the Python Global Interpreter Lock if the called function
985 relies a lot on Python objects. "threading" is mostly useful
986 when the execution bottleneck is a compiled extension that
987 explicitly releases the GIL (for instance a Cython loop wrapped
988 in a "with nogil" block or an expensive call to a library such
989 as NumPy).
990 - finally, you can register backends by calling
991 :func:`~register_parallel_backend`. This will allow you to
992 implement a backend of your liking.
994 It is not recommended to hard-code the backend name in a call to
995 :class:`~Parallel` in a library. Instead it is recommended to set
996 soft hints (prefer) or hard constraints (require) so as to make it
997 possible for library users to change the backend from the outside
998 using the :func:`~parallel_config` context manager.
999 return_as: str in {'list', 'generator', 'generator_unordered'}, default='list'
1000 If 'list', calls to this instance will return a list, only when
1001 all results have been processed and retrieved.
1002 If 'generator', it will return a generator that yields the results
1003 as soon as they are available, in the order the tasks have been
1004 submitted.
1005 If 'generator_unordered', the generator will immediately yield
1006 available results independently of the submission order. The output
1007 order is not deterministic in this case because it depends on the
1008 concurrency of the workers.
1009 prefer: str in {'processes', 'threads'} or None, default=None
1010 Soft hint to choose the default backend if no specific backend
1011 was selected with the :func:`~parallel_config` context manager.
1012 The default process-based backend is 'loky' and the default
1013 thread-based backend is 'threading'. Ignored if the ``backend``
1014 parameter is specified.
1015 require: 'sharedmem' or None, default=None
1016 Hard constraint to select the backend. If set to 'sharedmem',
1017 the selected backend will be single-host and thread-based even
1018 if the user asked for a non-thread based backend with
1019 :func:`~joblib.parallel_config`.
1020 verbose: int, default=0
1021 The verbosity level: if non zero, progress messages are
1022 printed. Above 50, the output is sent to stdout.
1023 The frequency of the messages increases with the verbosity level.
1024 If it is more than 10, all iterations are reported.
1025 timeout: float or None, default=None
1026 Timeout limit for each task to complete. If any task takes longer,
1027 a TimeoutError will be raised. Only applied when n_jobs != 1.
1028 pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}, default='2*n_jobs'
1029 The number of batches (of tasks) to be pre-dispatched.
1030 Default is '2*n_jobs'. When batch_size="auto" this is a reasonable
1031 default and the workers should never starve. Note that only basic
1032 arithmetic is allowed here and no modules can be used in this
1033 expression.
1034 batch_size: int or 'auto', default='auto'
1035 The number of atomic tasks to dispatch at once to each
1036 worker. When individual evaluations are very fast, dispatching
1037 calls to workers can be slower than sequential computation because
1038 of the overhead. Batching fast computations together can mitigate
1039 this.
1040 The ``'auto'`` strategy keeps track of the time it takes for a
1041 batch to complete, and dynamically adjusts the batch size to keep
1042 the time on the order of half a second, using a heuristic. The
1043 initial batch size is 1.
1044 ``batch_size="auto"`` with ``backend="threading"`` will dispatch
1045 batches of a single task at a time as the threading backend has
1046 very little overhead and using larger batch size has not proved to
1047 bring any gain in that case.
1048 temp_folder: str or None, default=None
1049 Folder to be used by the pool for memmapping large arrays
1050 for sharing memory with worker processes. If None, this will try in
1051 order:
1053 - a folder pointed to by the JOBLIB_TEMP_FOLDER environment
1054 variable,
1055 - /dev/shm if the folder exists and is writable: this is a
1056 RAM disk filesystem available by default on modern Linux
1057 distributions,
1058 - the default system temporary folder that can be
1059 overridden with TMP, TMPDIR or TEMP environment
1060 variables, typically /tmp under Unix operating systems.
1062 Only active when ``backend="loky"`` or ``"multiprocessing"``.
1063 max_nbytes: int, str, or None, default='1M'
1064 Threshold on the size of arrays passed to the workers that
1065 triggers automated memory mapping in temp_folder. Can be an int
1066 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
1067 Use None to disable memmapping of large arrays.
1068 Only active when ``backend="loky"`` or ``"multiprocessing"``.
1069 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
1070 Memmapping mode for numpy arrays passed to workers. None will
1071 disable memmapping, other modes defined in the numpy.memmap doc:
1072 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
1073 Also, see 'max_nbytes' parameter documentation for more details.
1075 Notes
1076 -----
1078 This object uses workers to compute in parallel the application of a
1079 function to many different arguments. The main functionality it brings
1080 in addition to using the raw multiprocessing or concurrent.futures API
1081 is (see examples for details):
1083 * More readable code, in particular since it avoids
1084 constructing lists of arguments.
1086 * Easier debugging:
1087 - informative tracebacks even when the error happens on
1088 the client side
1089 - using 'n_jobs=1' makes it possible to turn off parallel computing
1090 for debugging without changing the codepath
1091 - early capture of pickling errors
1093 * An optional progress meter.
1095 * Interruption of multiprocess jobs with 'Ctrl-C'
1097 * Flexible pickling control for the communication to and from
1098 the worker processes.
1100 * Ability to use shared memory efficiently with worker
1101 processes for large numpy-based datastructures.
1103 Note that the intended usage is to run one call at a time. Multiple
1104 calls to the same Parallel object will result in a ``RuntimeError``.
1106 Examples
1107 --------
1109 A simple example:
1111 >>> from math import sqrt
1112 >>> from joblib import Parallel, delayed
1113 >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
1114 [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
1116 Reshaping the output when the function has several return
1117 values:
1119 >>> from math import modf
1120 >>> from joblib import Parallel, delayed
1121 >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
1122 >>> res, i = zip(*r)
1123 >>> res
1124 (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
1125 >>> i
1126 (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)
1128 The progress meter: the higher the value of `verbose`, the more
1129 messages:
1131 >>> from time import sleep
1132 >>> from joblib import Parallel, delayed
1133 >>> r = Parallel(n_jobs=2, verbose=10)(
1134 ... delayed(sleep)(.2) for _ in range(10)) #doctest: +SKIP
1135 [Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.6s
1136 [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.8s
1137 [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 1.4s finished
1139 Traceback example, note how the line of the error is indicated
1140 as well as the values of the parameter passed to the function that
1141 triggered the exception, even though the traceback happens in the
1142 child process:
1144 >>> from heapq import nlargest
1145 >>> from joblib import Parallel, delayed
1146 >>> Parallel(n_jobs=2)(
1147 ... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3))
1148 ... # doctest: +SKIP
1149 -----------------------------------------------------------------------
1150 Sub-process traceback:
1151 -----------------------------------------------------------------------
1152 TypeError Mon Nov 12 11:37:46 2012
1153 PID: 12934 Python 2.7.3: /usr/bin/python
1154 ........................................................................
1155 /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
1156 419 if n >= size:
1157 420 return sorted(iterable, key=key, reverse=True)[:n]
1158 421
1159 422 # When key is none, use simpler decoration
1160 423 if key is None:
1161 --> 424 it = izip(iterable, count(0,-1)) # decorate
1162 425 result = _nlargest(n, it)
1163 426 return map(itemgetter(0), result) # undecorate
1164 427
1165 428 # General case, slowest method
1166 TypeError: izip argument #1 must support iteration
1167 _______________________________________________________________________
1170 Using pre_dispatch in a producer/consumer situation, where the
1171 data is generated on the fly. Note how the producer is first
1172 called 3 times before the parallel loop is initiated, and then
1173 called to generate new data on the fly:
1175 >>> from math import sqrt
1176 >>> from joblib import Parallel, delayed
1177 >>> def producer():
1178 ... for i in range(6):
1179 ... print('Produced %s' % i)
1180 ... yield i
1181 >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
1182 ... delayed(sqrt)(i) for i in producer()) #doctest: +SKIP
1183 Produced 0
1184 Produced 1
1185 Produced 2
1186 [Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s
1187 Produced 3
1188 [Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s
1189 Produced 4
1190 [Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s
1191 Produced 5
1192 [Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s
1193 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s remaining: 0.0s
1194 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished
1196 ''' # noqa: E501
1197 def __init__(
1198 self,
1199 n_jobs=default_parallel_config["n_jobs"],
1200 backend=default_parallel_config['backend'],
1201 return_as="list",
1202 verbose=default_parallel_config["verbose"],
1203 timeout=None,
1204 pre_dispatch='2 * n_jobs',
1205 batch_size='auto',
1206 temp_folder=default_parallel_config["temp_folder"],
1207 max_nbytes=default_parallel_config["max_nbytes"],
1208 mmap_mode=default_parallel_config["mmap_mode"],
1209 prefer=default_parallel_config["prefer"],
1210 require=default_parallel_config["require"],
1211 ):
1212 # Initiate parent Logger class state
1213 super().__init__()
1215 # Interpret n_jobs=None as 'unset'
1216 if n_jobs is None:
1217 n_jobs = default_parallel_config["n_jobs"]
1219 active_backend, context_config = _get_active_backend(
1220 prefer=prefer, require=require, verbose=verbose
1221 )
1223 nesting_level = active_backend.nesting_level
1225 self.verbose = _get_config_param(verbose, context_config, "verbose")
1226 self.timeout = timeout
1227 self.pre_dispatch = pre_dispatch
1229 if return_as not in {"list", "generator", "generator_unordered"}:
1230 raise ValueError(
1231 'Expected `return_as` parameter to be a string equal to "list"'
1232 f',"generator" or "generator_unordered", but got {return_as} '
1233 "instead."
1234 )
1235 self.return_as = return_as
1236 self.return_generator = return_as != "list"
1237 self.return_ordered = return_as != "generator_unordered"
1239 # Check if we are under a parallel_config or parallel_backend
1240 # context manager and use the config from the context manager
1241 # for arguments that are not explicitly set.
1242 self._backend_args = {
1243 k: _get_config_param(param, context_config, k) for param, k in [
1244 (max_nbytes, "max_nbytes"),
1245 (temp_folder, "temp_folder"),
1246 (mmap_mode, "mmap_mode"),
1247 (prefer, "prefer"),
1248 (require, "require"),
1249 (verbose, "verbose"),
1250 ]
1251 }
1253 if isinstance(self._backend_args["max_nbytes"], str):
1254 self._backend_args["max_nbytes"] = memstr_to_bytes(
1255 self._backend_args["max_nbytes"]
1256 )
1257 self._backend_args["verbose"] = max(
1258 0, self._backend_args["verbose"] - 50
1259 )
1261 if DEFAULT_MP_CONTEXT is not None:
1262 self._backend_args['context'] = DEFAULT_MP_CONTEXT
1263 elif hasattr(mp, "get_context"):
1264 self._backend_args['context'] = mp.get_context()
1266 if backend is default_parallel_config['backend'] or backend is None:
1267 backend = active_backend
1269 elif isinstance(backend, ParallelBackendBase):
1270 # Use provided backend as is, with the current nesting_level if it
1271 # is not set yet.
1272 if backend.nesting_level is None:
1273 backend.nesting_level = nesting_level
1275 elif hasattr(backend, 'Pool') and hasattr(backend, 'Lock'):
1276 # Make it possible to pass a custom multiprocessing context as
1277 # backend to change the start method to forkserver or spawn or
1278 # preload modules on the forkserver helper process.
1279 self._backend_args['context'] = backend
1280 backend = MultiprocessingBackend(nesting_level=nesting_level)
1282 elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
1283 warnings.warn(
1284 f"joblib backend '{backend}' is not available on "
1285 f"your system, falling back to {DEFAULT_BACKEND}.",
1286 UserWarning,
1287 stacklevel=2)
1288 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
1289 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)
1291 else:
1292 try:
1293 backend_factory = BACKENDS[backend]
1294 except KeyError as e:
1295 raise ValueError("Invalid backend: %s, expected one of %r"
1296 % (backend, sorted(BACKENDS.keys()))) from e
1297 backend = backend_factory(nesting_level=nesting_level)
1299 n_jobs = _get_config_param(n_jobs, context_config, "n_jobs")
1300 if n_jobs is None:
1301 # No specific context override and no specific value request:
1302 # default to the default of the backend.
1303 n_jobs = backend.default_n_jobs
1304 try:
1305 n_jobs = int(n_jobs)
1306 except ValueError:
1307 raise ValueError("n_jobs could not be converted to int")
1308 self.n_jobs = n_jobs
1310 if (require == 'sharedmem' and
1311 not getattr(backend, 'supports_sharedmem', False)):
1312 raise ValueError("Backend %s does not support shared memory"
1313 % backend)
1315 if (batch_size == 'auto' or isinstance(batch_size, Integral) and
1316 batch_size > 0):
1317 self.batch_size = batch_size
1318 else:
1319 raise ValueError(
1320 "batch_size must be 'auto' or a positive integer, got: %r"
1321 % batch_size)
1323 if not isinstance(backend, SequentialBackend):
1324 if self.return_generator and not backend.supports_return_generator:
1325 raise ValueError(
1326 "Backend {} does not support "
1327 "return_as={}".format(backend, return_as)
1328 )
1329 # This lock is used to coordinate the main thread of this process
1330 # with the async callback thread of the pool.
1331 self._lock = threading.RLock()
1332 self._jobs = collections.deque()
1333 self._pending_outputs = list()
1334 self._ready_batches = queue.Queue()
1335 self._reducer_callback = None
1337 # Internal variables
1338 self._backend = backend
1339 self._running = False
1340 self._managed_backend = False
1341 self._id = uuid4().hex
1342 self._call_ref = None
1344 def __enter__(self):
1345 self._managed_backend = True
1346 self._calling = False
1347 self._initialize_backend()
1348 return self
1350 def __exit__(self, exc_type, exc_value, traceback):
1351 self._managed_backend = False
1352 if self.return_generator and self._calling:
1353 self._abort()
1354 self._terminate_and_reset()
1356 def _initialize_backend(self):
1357 """Build a process or thread pool and return the number of workers"""
1358 try:
1359 n_jobs = self._backend.configure(n_jobs=self.n_jobs, parallel=self,
1360 **self._backend_args)
1361 if self.timeout is not None and not self._backend.supports_timeout:
1362 warnings.warn(
1363 'The backend class {!r} does not support timeout. '
1364 "You have set 'timeout={}' in Parallel but "
1365 "the 'timeout' parameter will not be used.".format(
1366 self._backend.__class__.__name__,
1367 self.timeout))
1369 except FallbackToBackend as e:
1370 # Recursively initialize the backend in case of requested fallback.
1371 self._backend = e.backend
1372 n_jobs = self._initialize_backend()
1374 return n_jobs
1376 def _effective_n_jobs(self):
1377 if self._backend:
1378 return self._backend.effective_n_jobs(self.n_jobs)
1379 return 1
1381 def _terminate_and_reset(self):
1382 if hasattr(self._backend, 'stop_call') and self._calling:
1383 self._backend.stop_call()
1384 self._calling = False
1385 if not self._managed_backend:
1386 self._backend.terminate()
1388 def _dispatch(self, batch):
1389 """Queue the batch for computing, with or without multiprocessing
1391 WARNING: this method is not thread-safe: it should be only called
1392 indirectly via dispatch_one_batch.
1394 """
1395 # If job.get() catches an exception, it closes the queue:
1396 if self._aborting:
1397 return
1399 batch_size = len(batch)
1401 self.n_dispatched_tasks += batch_size
1402 self.n_dispatched_batches += 1
1404 dispatch_timestamp = time.time()
1406 batch_tracker = BatchCompletionCallBack(
1407 dispatch_timestamp, batch_size, self
1408 )
1410 if self.return_ordered:
1411 self._jobs.append(batch_tracker)
1413 # If return_ordered is False, the batch_tracker is not stored in the
1414 # jobs queue at the time of submission. Instead, it will be appended to
1415 # the queue by itself as soon as the callback is triggered to be able
1416 # to return the results in the order of completion.
1418 job = self._backend.apply_async(batch, callback=batch_tracker)
1419 batch_tracker.register_job(job)
1421 def dispatch_next(self):
1422 """Dispatch more data for parallel processing
1424 This method is meant to be called concurrently by the multiprocessing
1425 callback. We rely on the thread-safety of dispatch_one_batch to protect
1426 against concurrent consumption of the unprotected iterator.
1428 """
1429 if not self.dispatch_one_batch(self._original_iterator):
1430 self._iterating = False
1431 self._original_iterator = None
1433 def dispatch_one_batch(self, iterator):
1434 """Prefetch the tasks for the next batch and dispatch them.
1436 The effective size of the batch is computed here.
1437 If there are no more jobs to dispatch, return False, else return True.
1439 The iterator consumption and dispatching is protected by the same
1440 lock so calling this function should be thread safe.
1442 """
1444 if self._aborting:
1445 return False
1447 batch_size = self._get_batch_size()
1449 with self._lock:
1450 # to ensure an even distribution of the workload between workers,
1451 # we look ahead in the original iterator more than batch_size
1452 # tasks - However, we keep consuming only one batch at each
1453 # dispatch_one_batch call. The extra tasks are stored in a local
1454 # queue, _ready_batches, that is looked-up prior to re-consuming
1455 # tasks from the original iterator.
1456 try:
1457 tasks = self._ready_batches.get(block=False)
1458 except queue.Empty:
1459 # slice the iterator n_jobs * batchsize items at a time. If the
1460 # slice returns less than that, then the current batchsize puts
1461 # too much weight on a subset of workers, while others may end
1462 # up starving. So in this case, re-scale the batch size
1463 # accordingly to distribute evenly the last items between all
1464 # workers.
1465 n_jobs = self._cached_effective_n_jobs
1466 big_batch_size = batch_size * n_jobs
1468 try:
1469 islice = list(itertools.islice(iterator, big_batch_size))
1470 except Exception as e:
1471 # Handle the fact that the generator of tasks raised an
1472 # exception. As this part of the code can be executed in
1473 # a thread internal to the backend, register a task with
1474 # an error that will be raised in the user's thread.
1475 if isinstance(e.__context__, queue.Empty):
1476 # Suppress the cause of the exception if it is
1477 # queue.Empty to avoid cluttered traceback. Only do it
1478 # if the __context__ is really empty to avoid messing
1479 # with causes of the original error.
1480 e.__cause__ = None
1481 batch_tracker = BatchCompletionCallBack(
1482 0, batch_size, self
1483 )
1484 self._jobs.append(batch_tracker)
1485 batch_tracker._register_outcome(dict(
1486 result=e, status=TASK_ERROR
1487 ))
1488 return True
1490 if len(islice) == 0:
1491 return False
1492 elif (iterator is self._original_iterator and
1493 len(islice) < big_batch_size):
1494 # We reached the end of the original iterator (unless
1495 # iterator is the ``pre_dispatch``-long initial slice of
1496 # the original iterator) -- decrease the batch size to
1497 # account for potential variance in the batches running
1498 # time.
1499 final_batch_size = max(1, len(islice) // (10 * n_jobs))
1500 else:
1501 final_batch_size = max(1, len(islice) // n_jobs)
1503 # enqueue n_jobs batches in a local queue
1504 for i in range(0, len(islice), final_batch_size):
1505 tasks = BatchedCalls(islice[i:i + final_batch_size],
1506 self._backend.get_nested_backend(),
1507 self._reducer_callback,
1508 self._pickle_cache)
1509 self._ready_batches.put(tasks)
1511 # finally, get one batch of tasks.
1512 tasks = self._ready_batches.get(block=False)
1513 if len(tasks) == 0:
1514 # No more tasks available in the iterator: tell caller to stop.
1515 return False
1516 else:
1517 self._dispatch(tasks)
1518 return True
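# Minimal standalone sketch (not from parallel.py) of the rescaling arithmetic
# used above: the iterator is sliced n_jobs * batch_size items at a time, and a
# short final slice is split into smaller batches to spread the tail evenly.
def split_batches(n_items, n_jobs, batch_size, exhausted):
    big_batch_size = batch_size * n_jobs
    n_items = min(n_items, big_batch_size)
    if exhausted and n_items < big_batch_size:
        final_batch_size = max(1, n_items // (10 * n_jobs))
    else:
        final_batch_size = max(1, n_items // n_jobs)
    full, rest = divmod(n_items, final_batch_size)
    return [final_batch_size] * full + ([rest] if rest else [])

# A full look-ahead slice is split into ~n_jobs batches of batch_size each.
print(split_batches(40, n_jobs=4, batch_size=10, exhausted=False))  # [10, 10, 10, 10]
# A short final slice is split into many 1-item batches to even out the load.
print(split_batches(13, n_jobs=4, batch_size=10, exhausted=True))   # [1] * 13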
1520 def _get_batch_size(self):
1521 """Returns the effective batch size for dispatch"""
1522 if self.batch_size == 'auto':
1523 return self._backend.compute_batch_size()
1524 else:
1525 # Fixed batch size strategy
1526 return self.batch_size
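# Brief usage sketch (not from parallel.py): batch_size is a Parallel
# constructor parameter. 'auto' lets the backend adapt batch sizes, roughly
# targeting a fixed batch duration, while an integer pins the size.
from joblib import Parallel, delayed

squares_auto = Parallel(n_jobs=2, batch_size='auto')(
    delayed(pow)(i, 2) for i in range(8)
)
squares_fixed = Parallel(n_jobs=2, batch_size=4)(
    delayed(pow)(i, 2) for i in range(8)
)
assert squares_auto == squares_fixed == [i ** 2 for i in range(8)]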
1528 def _print(self, msg):
1529 """Display the message on stout or stderr depending on verbosity"""
1530 # XXX: Not using the logger framework: need to
1531 # learn to use logger better.
1532 if not self.verbose:
1533 return
1534 if self.verbose < 50:
1535 writer = sys.stderr.write
1536 else:
1537 writer = sys.stdout.write
1538 writer(f"[{self}]: {msg}\n")
1540 def _is_completed(self):
1541 """Check if all tasks have been completed"""
1542 return self.n_completed_tasks == self.n_dispatched_tasks and not (
1543 self._iterating or self._aborting
1544 )
1546 def print_progress(self):
1547 """Display the process of the parallel execution only a fraction
1548 of time, controlled by self.verbose.
1549 """
1551 if not self.verbose:
1552 return
1554 elapsed_time = time.time() - self._start_time
1556 if self._is_completed():
1557 # Make sure that we get a last message telling us we are done
1558 self._print(
1559 f"Done {self.n_completed_tasks:3d} out of "
1560 f"{self.n_completed_tasks:3d} | elapsed: "
1561 f"{short_format_time(elapsed_time)} finished"
1562 )
1563 return
1565 # Original job iterator becomes None once it has been fully
1566 # consumed: at this point we know the total number of jobs and we are
1567 # able to display an estimation of the remaining time based on already
1568 # completed jobs. Otherwise, we simply display the number of completed
1569 # tasks.
1570 elif self._original_iterator is not None:
1571 if _verbosity_filter(self.n_dispatched_batches, self.verbose):
1572 return
1573 self._print(
1574 f"Done {self.n_completed_tasks:3d} tasks | elapsed: "
1575 f"{short_format_time(elapsed_time)}"
1576 )
1577 else:
1578 index = self.n_completed_tasks
1579 # We are finished dispatching
1580 total_tasks = self.n_dispatched_tasks
1581 # We always display the first loop
1582 if not index == 0:
1583 # Display depending on the number of remaining items
1584 # A message as soon as we finish dispatching, cursor is 0
1585 cursor = (total_tasks - index + 1 -
1586 self._pre_dispatch_amount)
1587 frequency = (total_tasks // self.verbose) + 1
1588 is_last_item = (index + 1 == total_tasks)
1589 if (is_last_item or cursor % frequency):
1590 return
1591 remaining_time = (elapsed_time / index) * \
1592 (self.n_dispatched_tasks - index * 1.0)
1593 # only display status if remaining time is greater or equal to 0
1594 self._print(
1595 f"Done {index:3d} out of {total_tasks:3d} | elapsed: "
1596 f"{short_format_time(elapsed_time)} remaining: "
1597 f"{short_format_time(remaining_time)}"
1598 )
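# Worked example (standalone, not from parallel.py) of the remaining-time
# estimate computed above: it linearly extrapolates from the tasks completed
# so far.
elapsed_time = 10.0   # seconds spent so far
index = 20            # completed tasks
total_tasks = 80      # dispatched tasks
remaining_time = (elapsed_time / index) * (total_tasks - index)
print(remaining_time)  # 30.0 seconds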
1600 def _abort(self):
1601 # Stop dispatching new jobs in the async callback thread
1602 self._aborting = True
1604 # If the backend allows it, cancel or kill remaining running
1605 # tasks without waiting for the results as we will raise
1606 # the exception we got back to the caller instead of returning
1607 # any result.
1608 backend = self._backend
1609 if (not self._aborted and hasattr(backend, 'abort_everything')):
1610 # If the backend is managed externally we need to make sure
1611 # to leave it in a working state to allow for future jobs
1612 # scheduling.
1613 ensure_ready = self._managed_backend
1614 backend.abort_everything(ensure_ready=ensure_ready)
1615 self._aborted = True
1617 def _start(self, iterator, pre_dispatch):
1618 # Only set self._iterating to True if at least a batch
1619 # was dispatched. In particular this covers the edge
1620 # case of Parallel used with an exhausted iterator. If
1621 # self._original_iterator is None, then this means either
1622 # that pre_dispatch == "all", n_jobs == 1 or that the first batch
1623 # was very quick and its callback already dispatched all the
1624 # remaining jobs.
1625 self._iterating = False
1626 if self.dispatch_one_batch(iterator):
1627 self._iterating = self._original_iterator is not None
1629 while self.dispatch_one_batch(iterator):
1630 pass
1632 if pre_dispatch == "all":
1633 # The iterable was consumed all at once by the while loop above.
1634 # No need to wait for async callbacks to trigger further
1635 # consumption.
1636 self._iterating = False
1638 def _get_outputs(self, iterator, pre_dispatch):
1639 """Iterator returning the tasks' output as soon as they are ready."""
1640 dispatch_thread_id = threading.get_ident()
1641 detach_generator_exit = False
1642 try:
1643 self._start(iterator, pre_dispatch)
1644 # first yield returns None, for internal use only. This ensures
1645 # that we enter the try/except block and start dispatching the
1646 # tasks.
1647 yield
1649 with self._backend.retrieval_context():
1650 yield from self._retrieve()
1652 except GeneratorExit:
1653 # The generator has been garbage collected before being fully
1654 # consumed. This aborts the remaining tasks if possible and warns
1655 # the user if necessary.
1656 self._exception = True
1658 # In some interpreters such as PyPy, GeneratorExit can be raised in
1659 # a different thread than the one used to start the dispatch of the
1660 # parallel tasks. This can lead to a hang when a thread attempts to
1661 # join itself. As a workaround, we detach the execution of the
1662 # aborting code to a dedicated thread. We then need to make sure
1663 # the rest of the function does not call `_terminate_and_reset`
1664 # in finally.
1665 if dispatch_thread_id != threading.get_ident():
1666 if not IS_PYPY:
1667 warnings.warn(
1668 "A generator produced by joblib.Parallel has been "
1669 "gc'ed in an unexpected thread. This behavior should "
1670 "not cause major -issues but to make sure, please "
1671 "report this warning and your use case at "
1672 "https://github.com/joblib/joblib/issues so it can "
1673 "be investigated."
1674 )
1676 detach_generator_exit = True
1677 _parallel = self
1679 class _GeneratorExitThread(threading.Thread):
1680 def run(self):
1681 _parallel._abort()
1682 if _parallel.return_generator:
1683 _parallel._warn_exit_early()
1684 _parallel._terminate_and_reset()
1686 _GeneratorExitThread(
1687 name="GeneratorExitThread"
1688 ).start()
1689 return
1691 # Otherwise, we are in the thread that started the dispatch: we can
1692 # safely abort the execution and warn the user.
1693 self._abort()
1694 if self.return_generator:
1695 self._warn_exit_early()
1697 raise
1699 # Note: we catch any BaseException instead of just Exception instances
1700 # to also include KeyboardInterrupt
1701 except BaseException:
1702 self._exception = True
1703 self._abort()
1704 raise
1705 finally:
1706 # Store the unconsumed tasks and terminate the workers if necessary
1707 _remaining_outputs = ([] if self._exception else self._jobs)
1708 self._jobs = collections.deque()
1709 self._running = False
1710 if not detach_generator_exit:
1711 self._terminate_and_reset()
1713 while len(_remaining_outputs) > 0:
1714 batched_results = _remaining_outputs.popleft()
1715 batched_results = batched_results.get_result(self.timeout)
1716 for result in batched_results:
1717 yield result
1719 def _wait_retrieval(self):
1720 """Return True if we need to continue retrieving some tasks."""
1722 # If the input load is still being iterated over, it means that tasks
1723 # are still on the dispatch waitlist and their results will need to
1724 # be retrieved later on.
1725 if self._iterating:
1726 return True
1728 # If some of the dispatched tasks are still being processed by the
1729 # workers, wait for the compute to finish before starting retrieval
1730 if self.n_completed_tasks < self.n_dispatched_tasks:
1731 return True
1733 # For backends that do not support retrieving the results asynchronously
1734 # in the main process, all results must be carefully retrieved
1735 # in the _retrieve loop in the main thread while the backend is alive.
1736 # For other backends, the actual retrieval is done asynchronously in
1737 # the callback thread, and we can terminate the backend before the
1738 # `self._jobs` result list has been emptied. The remaining results
1739 # will be collected in the `finally` step of the generator.
1740 if not self._backend.supports_retrieve_callback:
1741 if len(self._jobs) > 0:
1742 return True
1744 return False
1746 def _retrieve(self):
1747 while self._wait_retrieval():
1749 # If the callback thread of a worker has signaled that its task
1750 # triggered an exception, or if the retrieval loop has raised an
1751 # exception (e.g. `GeneratorExit`), exit the loop and surface the
1752 # worker traceback.
1753 if self._aborting:
1754 self._raise_error_fast()
1755 break
1757 # If the next job is not ready for retrieval yet, we just wait for
1758 # async callbacks to progress.
1759 if ((len(self._jobs) == 0) or
1760 (self._jobs[0].get_status(
1761 timeout=self.timeout) == TASK_PENDING)):
1762 time.sleep(0.01)
1763 continue
1765 # We need to be careful: the job list can be filling up as
1766 # we empty it and Python lists are not thread-safe by
1767 # default, hence the use of the lock
1768 with self._lock:
1769 batched_results = self._jobs.popleft()
1771 # Flatten the batched results to yield one output at a time
1772 batched_results = batched_results.get_result(self.timeout)
1773 for result in batched_results:
1774 self._nb_consumed += 1
1775 yield result
1777 def _raise_error_fast(self):
1778 """If we are aborting, raise if a job caused an error."""
1780 # Find the first job whose status is TASK_ERROR if it exists.
1781 with self._lock:
1782 error_job = next((job for job in self._jobs
1783 if job.status == TASK_ERROR), None)
1785 # If this error job exists, immediately raise the error by
1786 # calling get_result. This job might not exist if abort has been
1787 # called directly or if the generator is gc'ed.
1788 if error_job is not None:
1789 error_job.get_result(self.timeout)
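# Illustrative sketch (not from parallel.py): when a task raises, Parallel
# aborts the remaining work and re-raises the worker's exception in the
# calling thread, which is the fast error path handled above.
from joblib import Parallel, delayed

def may_fail(i):
    if i == 3:
        raise ValueError(f"task {i} failed")
    return i

try:
    Parallel(n_jobs=2)(delayed(may_fail)(i) for i in range(10))
except ValueError as exc:
    print(exc)  # "task 3 failed", surfaced in the caller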
1791 def _warn_exit_early(self):
1792 """Warn the user if the generator is gc'ed before being consumned."""
1793 ready_outputs = self.n_completed_tasks - self._nb_consumed
1794 is_completed = self._is_completed()
1795 msg = ""
1796 if ready_outputs:
1797 msg += (
1798 f"{ready_outputs} tasks have been successfully executed "
1799 " but not used."
1800 )
1801 if not is_completed:
1802 msg += " Additionally, "
1804 if not is_completed:
1805 msg += (
1806 f"{self.n_dispatched_tasks - self.n_completed_tasks} tasks "
1807 "which were still being processed by the workers have been "
1808 "cancelled."
1809 )
1811 if msg:
1812 msg += (
1813 " You could benefit from adjusting the input task "
1814 "iterator to limit unnecessary computation time."
1815 )
1817 warnings.warn(msg)
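# Illustrative sketch (not from parallel.py): dropping the output generator
# before it is fully consumed aborts the pending tasks and typically emits the
# warning built above. This assumes CPython reference counting and a joblib
# version that supports return_as="generator".
from joblib import Parallel, delayed

gen = Parallel(n_jobs=2, return_as="generator")(
    delayed(abs)(-i) for i in range(100)
)
print(next(gen))  # consume a single result
del gen           # remaining tasks are cancelled; a UserWarning is usually shown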
1819 def _get_sequential_output(self, iterable):
1820 """Separate loop for sequential output.
1822 This simplifies the traceback in case of errors and reduces the
1823 overhead of calling sequential tasks with `joblib`.
1824 """
1825 try:
1826 self._iterating = True
1827 self._original_iterator = iterable
1828 batch_size = self._get_batch_size()
1830 if batch_size != 1:
1831 it = iter(iterable)
1832 iterable_batched = iter(
1833 lambda: tuple(itertools.islice(it, batch_size)), ()
1834 )
1835 iterable = (
1836 task for batch in iterable_batched for task in batch
1837 )
1839 # first yield returns None, for internal use only. This ensures
1840 # that we enter the try/except block and set up the generator.
1841 yield None
1843 # Sequentially call the tasks and yield the results.
1844 for func, args, kwargs in iterable:
1845 self.n_dispatched_batches += 1
1846 self.n_dispatched_tasks += 1
1847 res = func(*args, **kwargs)
1848 self.n_completed_tasks += 1
1849 self.print_progress()
1850 yield res
1851 self._nb_consumed += 1
1852 except BaseException:
1853 self._exception = True
1854 self._aborting = True
1855 self._aborted = True
1856 raise
1857 finally:
1858 self.print_progress()
1859 self._running = False
1860 self._iterating = False
1861 self._original_iterator = None
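# Brief usage sketch (not from parallel.py): with n_jobs=1 the tasks run
# inline through the sequential path above, so there is no worker overhead and
# tracebacks point straight at the user function.
from joblib import Parallel, delayed

results = Parallel(n_jobs=1)(delayed(len)(s) for s in ["a", "bb", "ccc"])
print(results)  # [1, 2, 3]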
1863 def _reset_run_tracking(self):
1864 """Reset the counters and flags used to track the execution."""
1866 # Make sure the parallel instance was not previously running, in a
1867 # thread-safe way.
1868 with getattr(self, '_lock', nullcontext()):
1869 if self._running:
1870 msg = 'This Parallel instance is already running!'
1871 if self.return_generator is True:
1872 msg += (
1873 " Before submitting new tasks, you must wait for the "
1874 "completion of all the previous tasks, or clean all "
1875 "references to the output generator."
1876 )
1877 raise RuntimeError(msg)
1878 self._running = True
1880 # Counters to keep track of the tasks dispatched and completed.
1881 self.n_dispatched_batches = 0
1882 self.n_dispatched_tasks = 0
1883 self.n_completed_tasks = 0
1885 # The following count is incremented by one each time the user iterates
1886 # over the output generator; it is used to prepare an informative
1887 # warning message in case the generator is deleted before all the
1888 # dispatched tasks have been consumed.
1889 self._nb_consumed = 0
1891 # The following flags are used to synchronize the threads in case one of
1892 # the tasks errors out, to ensure that all workers abort fast and that
1893 # the backend terminates properly.
1895 # Set to True as soon as a worker signals that a task errors out
1896 self._exception = False
1897 # Set to True in case of early termination following an incident
1898 self._aborting = False
1899 # Set to True after abortion is complete
1900 self._aborted = False
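# Illustrative sketch (not from parallel.py): reusing the same Parallel
# instance while a previous generator run is still alive raises the
# RuntimeError prepared above (assuming return_as="generator" is available).
from joblib import Parallel, delayed

parallel = Parallel(n_jobs=2, return_as="generator")
gen = parallel(delayed(abs)(-i) for i in range(10))
try:
    parallel(delayed(abs)(-i) for i in range(10))  # second call while running
except RuntimeError as exc:
    print(exc)
list(gen)  # consuming (or deleting) the first generator frees the instance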
1902 def __call__(self, iterable):
1903 """Main function to dispatch parallel tasks."""
1905 self._reset_run_tracking()
1906 self._start_time = time.time()
1908 if not self._managed_backend:
1909 n_jobs = self._initialize_backend()
1910 else:
1911 n_jobs = self._effective_n_jobs()
1913 if n_jobs == 1:
1914 # If n_jobs==1, run the computation sequentially and return
1915 # immediately to avoid overheads.
1916 output = self._get_sequential_output(iterable)
1917 next(output)
1918 return output if self.return_generator else list(output)
1920 # Let's create an ID that uniquely identifies the current call. If the
1921 # call is interrupted early and the same instance is immediately
1922 # re-used, this id will be used to prevent workers that were
1923 # concurrently finalizing a task from the previous call from running the
1924 # callback.
1925 with self._lock:
1926 self._call_id = uuid4().hex
1928 # self._effective_n_jobs should be called in the Parallel.__call__
1929 # thread only -- store its value in an attribute for further queries.
1930 self._cached_effective_n_jobs = n_jobs
1932 if isinstance(self._backend, LokyBackend):
1933 # For the loky backend, we add a callback executed when reducing
1934 # BatchedCalls, that makes the loky executor use a temporary folder
1935 # specific to this Parallel object when pickling temporary memmaps.
1936 # This callback is necessary to ensure that several Parallel
1937 # objects using the same reusable executor don't use the same
1938 # temporary resources.
1940 def _batched_calls_reducer_callback():
1941 # Relevant implementation detail: the following lines, called
1942 # when reducing BatchedCalls, are called in a thread-safe
1943 # situation, meaning that the context of the temporary folder
1944 # manager will not be changed in between the callback execution
1945 # and the end of the BatchedCalls pickling. The reason is that
1946 # pickling (the only place where set_current_context is used)
1947 # is done from a single thread (the queue_feeder_thread).
1948 self._backend._workers._temp_folder_manager.set_current_context( # noqa
1949 self._id
1950 )
1951 self._reducer_callback = _batched_calls_reducer_callback
1953 # self._effective_n_jobs should be called in the Parallel.__call__
1954 # thread only -- store its value in an attribute for further queries.
1955 self._cached_effective_n_jobs = n_jobs
1957 backend_name = self._backend.__class__.__name__
1958 if n_jobs == 0:
1959 raise RuntimeError("%s has no active worker." % backend_name)
1961 self._print(
1962 f"Using backend {backend_name} with {n_jobs} concurrent workers."
1963 )
1964 if hasattr(self._backend, 'start_call'):
1965 self._backend.start_call()
1967 # Following flag prevents double calls to `backend.stop_call`.
1968 self._calling = True
1970 iterator = iter(iterable)
1971 pre_dispatch = self.pre_dispatch
1973 if pre_dispatch == 'all':
1974 # prevent further dispatch via multiprocessing callback thread
1975 self._original_iterator = None
1976 self._pre_dispatch_amount = 0
1977 else:
1978 self._original_iterator = iterator
1979 if hasattr(pre_dispatch, 'endswith'):
1980 pre_dispatch = eval_expr(
1981 pre_dispatch.replace("n_jobs", str(n_jobs))
1982 )
1983 self._pre_dispatch_amount = pre_dispatch = int(pre_dispatch)
1985 # The main thread will consume the first pre_dispatch items and
1986 # the remaining items will later be lazily dispatched by async
1987 # callbacks upon task completions.
1989 # TODO: this iterator should be batch_size * n_jobs
1990 iterator = itertools.islice(iterator, self._pre_dispatch_amount)
1992 # Use a caching dict for callables that are pickled with cloudpickle to
1993 # improve performance. This cache is used only in the case of
1994 # functions that are defined in the __main__ module, functions that
1995 # are defined locally (inside another function) and lambda expressions.
1996 self._pickle_cache = dict()
1998 output = self._get_outputs(iterator, pre_dispatch)
1999 self._call_ref = weakref.ref(output)
2001 # The first item from the output is blank, but it makes the interpreter
2002 # progress until it enters the try/except block of the generator and
2003 # reaches the first `yield` statement. This starts the asynchronous
2004 # dispatch of the tasks to the workers.
2005 next(output)
2007 return output if self.return_generator else list(output)
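# Brief usage sketch (not from parallel.py): pre_dispatch may be 'all', an
# integer, or an expression in n_jobs such as '2*n_jobs'; the string form is
# evaluated above with n_jobs substituted before slicing the input iterator.
from joblib import Parallel, delayed

results = Parallel(n_jobs=2, pre_dispatch='2*n_jobs')(
    delayed(str)(i) for i in range(6)
)
print(results)  # ['0', '1', '2', '3', '4', '5']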
2009 def __repr__(self):
2010 return '%s(n_jobs=%s)' % (self.__class__.__name__, self.n_jobs)