Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/parallel.py: 16%
1"""
2Helpers for embarrassingly parallel code.
3"""
4# Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org >
5# Copyright: 2010, Gael Varoquaux
6# License: BSD 3 clause
8from __future__ import division
10import collections
11import functools
12import itertools
13import os
14import queue
15import sys
16import threading
17import time
18import warnings
19import weakref
20from contextlib import nullcontext
21from math import floor, log10, sqrt
22from multiprocessing import TimeoutError
23from numbers import Integral
24from uuid import uuid4
26from ._multiprocessing_helpers import mp
28# Make sure that those two classes are part of the public joblib.parallel API
29# so that 3rd party backend implementers can import them from here.
30from ._parallel_backends import (
31 AutoBatchingMixin, # noqa
32 FallbackToBackend,
33 LokyBackend,
34 MultiprocessingBackend,
35 ParallelBackendBase, # noqa
36 SequentialBackend,
37 ThreadingBackend,
38)
39from ._utils import _Sentinel, eval_expr
40from .disk import memstr_to_bytes
41from .logger import Logger, short_format_time
43BACKENDS = {
44 "threading": ThreadingBackend,
45 "sequential": SequentialBackend,
46}
47# name of the backend used by default by Parallel outside of any context
48# managed by ``parallel_config`` or ``parallel_backend``.
50# threading is the only backend that is always available everywhere
51DEFAULT_BACKEND = "threading"
52DEFAULT_THREAD_BACKEND = "threading"
53DEFAULT_PROCESS_BACKEND = "threading"
55MAYBE_AVAILABLE_BACKENDS = {"multiprocessing", "loky"}
57# If multiprocessing is available, so is loky; in that case loky is used as
58# the default backend.
59if mp is not None:
60 BACKENDS["multiprocessing"] = MultiprocessingBackend
61 from .externals import loky
63 BACKENDS["loky"] = LokyBackend
64 DEFAULT_BACKEND = "loky"
65 DEFAULT_PROCESS_BACKEND = "loky"
67# Thread local value that can be overridden by the ``parallel_config`` context
68# manager
69_backend = threading.local()
72def _register_dask():
73 """Register Dask Backend if called with parallel_config(backend="dask")"""
74 try:
75 from ._dask import DaskDistributedBackend
77 register_parallel_backend("dask", DaskDistributedBackend)
78 except ImportError as e:
79 msg = (
80 "To use the dask.distributed backend you must install both "
81 "the `dask` and distributed modules.\n\n"
82 "See https://dask.pydata.org/en/latest/install.html for more "
83 "information."
84 )
85 raise ImportError(msg) from e
88EXTERNAL_BACKENDS = {
89 "dask": _register_dask,
90}
93# Sentinels for the default values of the Parallel constructor and
94# the parallel_config and parallel_backend context managers
95default_parallel_config = {
96 "backend": _Sentinel(default_value=None),
97 "n_jobs": _Sentinel(default_value=None),
98 "verbose": _Sentinel(default_value=0),
99 "temp_folder": _Sentinel(default_value=None),
100 "max_nbytes": _Sentinel(default_value="1M"),
101 "mmap_mode": _Sentinel(default_value="r"),
102 "prefer": _Sentinel(default_value=None),
103 "require": _Sentinel(default_value=None),
104}
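# --- Editorial sketch (not part of the original source): how the sentinel
# defaults above are meant to be used; the variable names are illustrative only.
#
#     unset_n_jobs = default_parallel_config["n_jobs"]      # _Sentinel, "not set"
#     explicit_n_jobs = 4                                    # user-provided value
#     unset_n_jobs is default_parallel_config["n_jobs"]      # True  -> fall back
#                                                            # to the context
#     explicit_n_jobs is default_parallel_config["n_jobs"]   # False -> use 4
#
# This identity check is exactly what `_get_config_param` below relies on to
# tell "argument left unset" apart from "argument explicitly set to None".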
107VALID_BACKEND_HINTS = ("processes", "threads", None)
108VALID_BACKEND_CONSTRAINTS = ("sharedmem", None)
111def _get_config_param(param, context_config, key):
112 """Return the value of a parallel config parameter
114 Explicitly setting it in Parallel has priority over setting in a
115 parallel_(config/backend) context manager.
116 """
117 if param is not default_parallel_config[key]:
118 # param is explicitly set, return it
119 return param
121 if context_config[key] is not default_parallel_config[key]:
122 # there's a context manager and the key is set, return it
123 return context_config[key]
125 # Otherwise, we are in the default_parallel_config,
126 # return the default value
127 return param.default_value
130def get_active_backend(
131 prefer=default_parallel_config["prefer"],
132 require=default_parallel_config["require"],
133 verbose=default_parallel_config["verbose"],
134):
135 """Return the active default backend"""
136 backend, config = _get_active_backend(prefer, require, verbose)
137 n_jobs = _get_config_param(default_parallel_config["n_jobs"], config, "n_jobs")
138 return backend, n_jobs
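# --- Editorial usage sketch (not part of the original source; assumes an
# enclosing parallel_config context):
#
#     with parallel_config(backend="threading", n_jobs=2):
#         backend, n_jobs = get_active_backend()
#         # backend is a ThreadingBackend instance and n_jobs == 2,
#         # mirroring the enclosing context manager.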
141def _get_active_backend(
142 prefer=default_parallel_config["prefer"],
143 require=default_parallel_config["require"],
144 verbose=default_parallel_config["verbose"],
145):
146 """Return the active default backend"""
148 backend_config = getattr(_backend, "config", default_parallel_config)
150 backend = _get_config_param(
151 default_parallel_config["backend"], backend_config, "backend"
152 )
154 prefer = _get_config_param(prefer, backend_config, "prefer")
155 require = _get_config_param(require, backend_config, "require")
156 verbose = _get_config_param(verbose, backend_config, "verbose")
158 if prefer not in VALID_BACKEND_HINTS:
159 raise ValueError(
160 f"prefer={prefer} is not a valid backend hint, "
161 f"expected one of {VALID_BACKEND_HINTS}"
162 )
163 if require not in VALID_BACKEND_CONSTRAINTS:
164 raise ValueError(
165 f"require={require} is not a valid backend constraint, "
166 f"expected one of {VALID_BACKEND_CONSTRAINTS}"
167 )
168 if prefer == "processes" and require == "sharedmem":
169 raise ValueError(
170 "prefer == 'processes' and require == 'sharedmem' are inconsistent settings"
171 )
173 explicit_backend = True
174 if backend is None:
175 # We are either outside of the scope of any parallel_(config/backend)
176 # context manager or the context manager did not set a backend.
177 # create the default backend instance now.
178 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
179 explicit_backend = False
181 # Try to use the backend set by the user with the context manager.
183 nesting_level = backend.nesting_level
184 uses_threads = getattr(backend, "uses_threads", False)
185 supports_sharedmem = getattr(backend, "supports_sharedmem", False)
186 # Force to use thread-based backend if the provided backend does not
187 # match the shared memory constraint or if the backend is not explicitly
188 # given and threads are preferred.
189 force_threads = (require == "sharedmem" and not supports_sharedmem) or (
190 not explicit_backend and prefer == "threads" and not uses_threads
191 )
192 force_processes = not explicit_backend and prefer == "processes" and uses_threads
194 if force_threads:
195 # This backend does not match the shared memory constraint:
196 # fallback to the default thread-based backend.
197 sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND](
198 nesting_level=nesting_level
199 )
200 # Warn the user if we forced the backend to thread-based, while the
201 # user explicitly specified a non-thread-based backend.
202 if verbose >= 10 and explicit_backend:
203 print(
204 f"Using {sharedmem_backend.__class__.__name__} as "
205 f"joblib backend instead of {backend.__class__.__name__} "
206 "as the latter does not provide shared memory semantics."
207 )
208 # Force to n_jobs=1 by default
209 thread_config = backend_config.copy()
210 thread_config["n_jobs"] = 1
211 return sharedmem_backend, thread_config
213 if force_processes:
214 # This backend does not match the prefer="processes" constraint:
215 # fallback to the default process-based backend.
216 process_backend = BACKENDS[DEFAULT_PROCESS_BACKEND](nesting_level=nesting_level)
218 return process_backend, backend_config.copy()
220 return backend, backend_config
223class parallel_config:
224 """Set the default backend or configuration for :class:`~joblib.Parallel`.
226 This is an alternative to directly passing keyword arguments to the
227 :class:`~joblib.Parallel` class constructor. It is particularly useful when
228 calling into library code that uses joblib internally but does not expose
229 the various parallel configuration arguments in its own API.
231 Parameters
232 ----------
233 backend: str or ParallelBackendBase instance, default=None
234 If ``backend`` is a string it must match a previously registered
235 implementation using the :func:`~register_parallel_backend` function.
237 By default the following backends are available:
239 - 'loky': single-host, process-based parallelism (used by default),
240 - 'threading': single-host, thread-based parallelism,
241 - 'multiprocessing': legacy single-host, process-based parallelism.
243 'loky' is recommended to run functions that manipulate Python objects.
244 'threading' is a low-overhead alternative that is most efficient for
245 functions that release the Global Interpreter Lock: e.g. I/O-bound
246 code or CPU-bound code in a few calls to native code that explicitly
247 releases the GIL. Note that on some rare systems (such as pyodide),
248 multiprocessing and loky may not be available, in which case joblib
249 defaults to threading.
251 In addition, if the ``dask`` and ``distributed`` Python packages are
252 installed, it is possible to use the 'dask' backend for better
253 scheduling of nested parallel calls without over-subscription and
254 potentially distribute parallel calls over a networked cluster of
255 several hosts.
257 It is also possible to use the distributed 'ray' backend for
258 distributing the workload to a cluster of nodes. See more details
259 in the Examples section below.
261 Alternatively the backend can be passed directly as an instance.
263 n_jobs: int, default=None
264 The maximum number of concurrently running jobs, such as the number
265 of Python worker processes when ``backend="loky"`` or the size of the
266 thread-pool when ``backend="threading"``.
267 This argument is converted to an integer; floats are rounded down.
268 If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
269 ``n_cpus`` is obtained with :func:`~cpu_count`.
270 For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
271 using ``n_jobs=-2`` will result in all CPUs but one being used.
272 This argument can also go above ``n_cpus``, which will cause
273 oversubscription. In some cases, slight oversubscription can be
274 beneficial, e.g., for tasks with large I/O operations.
275 If 1 is given, no parallel computing code is used at all, and the
276 behavior amounts to a simple python `for` loop. This mode is not
277 compatible with `timeout`.
278 None is a marker for 'unset' that will be interpreted as n_jobs=1
279 unless the call is performed under a :func:`~parallel_config`
280 context manager that sets another value for ``n_jobs``.
281 If n_jobs = 0 then a ValueError is raised.
283 verbose: int, default=0
284 The verbosity level: if non zero, progress messages are
285 printed. Above 50, the output is sent to stdout.
286 The frequency of the messages increases with the verbosity level.
287 If it is more than 10, all iterations are reported.
289 temp_folder: str or None, default=None
290 Folder to be used by the pool for memmapping large arrays
291 for sharing memory with worker processes. If None, this will try in
292 order:
294 - a folder pointed by the ``JOBLIB_TEMP_FOLDER`` environment
295 variable,
296 - ``/dev/shm`` if the folder exists and is writable: this is a
297 RAM disk filesystem available by default on modern Linux
298 distributions,
299 - the default system temporary folder that can be
300 overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment
301 variables, typically ``/tmp`` under Unix operating systems.
303 max_nbytes: int, str, or None, optional, default='1M'
304 Threshold on the size of arrays passed to the workers that
305 triggers automated memory mapping in temp_folder. Can be an int
306 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
307 Use None to disable memmapping of large arrays.
309 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
310 Memmapping mode for numpy arrays passed to workers. None will
311 disable memmapping, other modes defined in the numpy.memmap doc:
312 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
313 Also, see 'max_nbytes' parameter documentation for more details.
315 prefer: str in {'processes', 'threads'} or None, default=None
316 Soft hint to choose the default backend.
317 The default process-based backend is 'loky' and the default
318 thread-based backend is 'threading'. Ignored if the ``backend``
319 parameter is specified.
321 require: 'sharedmem' or None, default=None
322 Hard constraint to select the backend. If set to 'sharedmem',
323 the selected backend will be single-host and thread-based.
325 inner_max_num_threads: int, default=None
326 If not None, overwrites the limit set on the number of threads
327 usable in some third-party library threadpools like OpenBLAS,
328 MKL or OpenMP. This is only used with the ``loky`` backend.
330 backend_params: dict
331 Additional parameters to pass to the backend constructor when
332 backend is a string.
334 Notes
335 -----
336 Joblib tries to limit the oversubscription by limiting the number of
337 threads usable in some third-party library threadpools like OpenBLAS, MKL
338 or OpenMP. The default limit in each worker is set to
339 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
340 overwritten with the ``inner_max_num_threads`` argument which will be used
341 to set this limit in the child processes.
343 .. versionadded:: 1.3
345 Examples
346 --------
347 >>> from operator import neg
348 >>> with parallel_config(backend='threading'):
349 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
350 ...
351 [-1, -2, -3, -4, -5]
353 To use the 'ray' joblib backend add the following lines:
355 >>> from ray.util.joblib import register_ray # doctest: +SKIP
356 >>> register_ray() # doctest: +SKIP
357 >>> with parallel_config(backend="ray"): # doctest: +SKIP
358 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
359 [-1, -2, -3, -4, -5]
361 """
363 def __init__(
364 self,
365 backend=default_parallel_config["backend"],
366 *,
367 n_jobs=default_parallel_config["n_jobs"],
368 verbose=default_parallel_config["verbose"],
369 temp_folder=default_parallel_config["temp_folder"],
370 max_nbytes=default_parallel_config["max_nbytes"],
371 mmap_mode=default_parallel_config["mmap_mode"],
372 prefer=default_parallel_config["prefer"],
373 require=default_parallel_config["require"],
374 inner_max_num_threads=None,
375 **backend_params,
376 ):
377 # Save the parallel info and set the active parallel config
378 self.old_parallel_config = getattr(_backend, "config", default_parallel_config)
380 backend = self._check_backend(backend, inner_max_num_threads, **backend_params)
382 new_config = {
383 "n_jobs": n_jobs,
384 "verbose": verbose,
385 "temp_folder": temp_folder,
386 "max_nbytes": max_nbytes,
387 "mmap_mode": mmap_mode,
388 "prefer": prefer,
389 "require": require,
390 "backend": backend,
391 }
392 self.parallel_config = self.old_parallel_config.copy()
393 self.parallel_config.update(
394 {k: v for k, v in new_config.items() if not isinstance(v, _Sentinel)}
395 )
397 setattr(_backend, "config", self.parallel_config)
399 def _check_backend(self, backend, inner_max_num_threads, **backend_params):
400 if backend is default_parallel_config["backend"]:
401 if inner_max_num_threads is not None or len(backend_params) > 0:
402 raise ValueError(
403 "inner_max_num_threads and other constructor "
404 "parameters backend_params are only supported "
405 "when backend is not None."
406 )
407 return backend
409 if isinstance(backend, str):
410 # Handle non-registered or missing backends
411 if backend not in BACKENDS:
412 if backend in EXTERNAL_BACKENDS:
413 register = EXTERNAL_BACKENDS[backend]
414 register()
415 elif backend in MAYBE_AVAILABLE_BACKENDS:
416 warnings.warn(
417 f"joblib backend '{backend}' is not available on "
418 f"your system, falling back to {DEFAULT_BACKEND}.",
419 UserWarning,
420 stacklevel=2,
421 )
422 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
423 else:
424 raise ValueError(
425 f"Invalid backend: {backend}, expected one of "
426 f"{sorted(BACKENDS.keys())}"
427 )
429 backend = BACKENDS[backend](**backend_params)
430 else:
431 if len(backend_params) > 0:
432 raise ValueError(
433 "Constructor parameters backend_params are only "
434 "supported when backend is a string."
435 )
437 if inner_max_num_threads is not None:
438 msg = (
439 f"{backend.__class__.__name__} does not accept setting the "
440 "inner_max_num_threads argument."
441 )
442 assert backend.supports_inner_max_num_threads, msg
443 backend.inner_max_num_threads = inner_max_num_threads
445 # If the nesting_level of the backend is not set previously, use the
446 # nesting level from the previous active_backend to set it
447 if backend.nesting_level is None:
448 parent_backend = self.old_parallel_config["backend"]
449 if parent_backend is default_parallel_config["backend"]:
450 nesting_level = 0
451 else:
452 nesting_level = parent_backend.nesting_level
453 backend.nesting_level = nesting_level
455 return backend
457 def __enter__(self):
458 return self.parallel_config
460 def __exit__(self, type, value, traceback):
461 self.unregister()
463 def unregister(self):
464 setattr(_backend, "config", self.old_parallel_config)
467class parallel_backend(parallel_config):
468 """Change the default backend used by Parallel inside a with block.
470 .. warning::
471 It is advised to use the :class:`~joblib.parallel_config` context
472 manager instead, which allows more fine-grained control over the
473 backend configuration.
475 If ``backend`` is a string it must match a previously registered
476 implementation using the :func:`~register_parallel_backend` function.
478 By default the following backends are available:
480 - 'loky': single-host, process-based parallelism (used by default),
481 - 'threading': single-host, thread-based parallelism,
482 - 'multiprocessing': legacy single-host, process-based parallelism.
484 'loky' is recommended to run functions that manipulate Python objects.
485 'threading' is a low-overhead alternative that is most efficient for
486 functions that release the Global Interpreter Lock: e.g. I/O-bound code or
487 CPU-bound code in a few calls to native code that explicitly releases the
488 GIL. Note that on some rare systems (such as Pyodide),
489 multiprocessing and loky may not be available, in which case joblib
490 defaults to threading.
492 You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib
493 backend to distribute work across machines. This works well with
494 scikit-learn estimators with the ``n_jobs`` parameter, for example::
496 >>> import joblib # doctest: +SKIP
497 >>> from sklearn.model_selection import GridSearchCV # doctest: +SKIP
498 >>> from dask.distributed import Client, LocalCluster # doctest: +SKIP
500 >>> # create a local Dask cluster
501 >>> cluster = LocalCluster() # doctest: +SKIP
502 >>> client = Client(cluster) # doctest: +SKIP
503 >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1)
504 ... # doctest: +SKIP
505 >>> with joblib.parallel_backend("dask", scatter=[X, y]): # doctest: +SKIP
506 ... grid_search.fit(X, y)
508 It is also possible to use the distributed 'ray' backend for distributing
509 the workload to a cluster of nodes. To use the 'ray' joblib backend add
510 the following lines::
512 >>> from ray.util.joblib import register_ray # doctest: +SKIP
513 >>> register_ray() # doctest: +SKIP
514 >>> with parallel_backend("ray"): # doctest: +SKIP
515 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
516 [-1, -2, -3, -4, -5]
518 Alternatively the backend can be passed directly as an instance.
520 By default all available workers will be used (``n_jobs=-1``) unless the
521 caller passes an explicit value for the ``n_jobs`` parameter.
523 This is an alternative to passing a ``backend='backend_name'`` argument to
524 the :class:`~Parallel` class constructor. It is particularly useful when
525 calling into library code that uses joblib internally but does not expose
526 the backend argument in its own API.
528 >>> from operator import neg
529 >>> with parallel_backend('threading'):
530 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
531 ...
532 [-1, -2, -3, -4, -5]
534 Joblib also tries to limit the oversubscription by limiting the number of
535 threads usable in some third-party library threadpools like OpenBLAS, MKL
536 or OpenMP. The default limit in each worker is set to
537 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
538 overwritten with the ``inner_max_num_threads`` argument which will be used
539 to set this limit in the child processes.
541 .. versionadded:: 0.10
543 See Also
544 --------
545 joblib.parallel_config: context manager to change the backend configuration.
546 """
548 def __init__(
549 self, backend, n_jobs=-1, inner_max_num_threads=None, **backend_params
550 ):
551 super().__init__(
552 backend=backend,
553 n_jobs=n_jobs,
554 inner_max_num_threads=inner_max_num_threads,
555 **backend_params,
556 )
558 if self.old_parallel_config is None:
559 self.old_backend_and_jobs = None
560 else:
561 self.old_backend_and_jobs = (
562 self.old_parallel_config["backend"],
563 self.old_parallel_config["n_jobs"],
564 )
565 self.new_backend_and_jobs = (
566 self.parallel_config["backend"],
567 self.parallel_config["n_jobs"],
568 )
570 def __enter__(self):
571 return self.new_backend_and_jobs
574# Under Linux or OS X the default start method of multiprocessing
575# can cause third party libraries to crash. Under Python 3.4+ it is possible
576# to set an environment variable to switch the default start method from
577# 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost
578# of causing semantic changes and some additional pool instantiation overhead.
579DEFAULT_MP_CONTEXT = None
580if hasattr(mp, "get_context"):
581 method = os.environ.get("JOBLIB_START_METHOD", "").strip() or None
582 if method is not None:
583 DEFAULT_MP_CONTEXT = mp.get_context(method=method)
586class BatchedCalls(object):
587 """Wrap a sequence of (func, args, kwargs) tuples as a single callable"""
589 def __init__(
590 self, iterator_slice, backend_and_jobs, reducer_callback=None, pickle_cache=None
591 ):
592 self.items = list(iterator_slice)
593 self._size = len(self.items)
594 self._reducer_callback = reducer_callback
595 if isinstance(backend_and_jobs, tuple):
596 self._backend, self._n_jobs = backend_and_jobs
597 else:
598 # this is for backward compatibility purposes. Before 0.12.6,
599 # nested backends were returned without n_jobs indications.
600 self._backend, self._n_jobs = backend_and_jobs, None
601 self._pickle_cache = pickle_cache if pickle_cache is not None else {}
603 def __call__(self):
604 # Set the default nested backend to self._backend but do not
605 # change the default number of processes to -1.
606 with parallel_config(backend=self._backend, n_jobs=self._n_jobs):
607 return [func(*args, **kwargs) for func, args, kwargs in self.items]
609 def __reduce__(self):
610 if self._reducer_callback is not None:
611 self._reducer_callback()
612 # no need to pickle the callback.
613 return (
614 BatchedCalls,
615 (self.items, (self._backend, self._n_jobs), None, self._pickle_cache),
616 )
618 def __len__(self):
619 return self._size
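# --- Editorial sketch (not part of the original source): a BatchedCalls object
# bundles several (func, args, kwargs) tuples into one picklable callable; the
# backend/n_jobs pair shown here is illustrative only.
#
#     batch = BatchedCalls(
#         [(abs, (-1,), {}), (abs, (-2,), {})],
#         backend_and_jobs=(SequentialBackend(nesting_level=0), 1),
#     )
#     batch()     # -> [1, 2], evaluated under a nested parallel_config
#     len(batch)  # -> 2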
622# Possible exit status for a task
623TASK_DONE = "Done"
624TASK_ERROR = "Error"
625TASK_PENDING = "Pending"
628###############################################################################
629# CPU count that works also when multiprocessing has been disabled via
630# the JOBLIB_MULTIPROCESSING environment variable
631def cpu_count(only_physical_cores=False):
632 """Return the number of CPUs.
634 This delegates to loky.cpu_count that takes into account additional
635 constraints such as Linux CFS scheduler quotas (typically set by container
636 runtimes such as docker) and CPU affinity (for instance using the taskset
637 command on Linux).
639 Parameters
640 ----------
641 only_physical_cores : boolean, default=False
642 If True, does not take hyperthreading / SMT logical cores into account.
644 """
645 if mp is None:
646 return 1
648 return loky.cpu_count(only_physical_cores=only_physical_cores)
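# --- Editorial usage sketch (not part of the original source):
#
#     from joblib import cpu_count
#     cpu_count()                           # logical cores, honoring cgroup
#                                           # quotas and CPU affinity
#     cpu_count(only_physical_cores=True)   # ignore hyperthreading/SMT siblings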
651###############################################################################
652# For verbosity
655def _verbosity_filter(index, verbose):
656 """Returns False for indices increasingly apart, the distance
657 depending on the value of verbose.
659 We use a lag increasing as the square of index
660 """
661 if not verbose:
662 return True
663 elif verbose > 10:
664 return False
665 if index == 0:
666 return False
667 verbose = 0.5 * (11 - verbose) ** 2
668 scale = sqrt(index / verbose)
669 next_scale = sqrt((index + 1) / verbose)
670 return int(next_scale) == int(scale)
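# --- Editorial example (not part of the original source): with verbose=5 the
# effective lag constant is 0.5 * (11 - 5) ** 2 == 18, so messages are let
# through roughly at indices 0, 17, 71, 161, ... (whenever sqrt(index / 18)
# crosses a new integer), and filtered (True) everywhere in between.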
673###############################################################################
674def delayed(function):
675 """Decorator used to capture the arguments of a function.
677 Parameters
678 ----------
679 function: callable
680 The function to be decorated.
682 Returns
683 -------
684 callable
685 A new function ``F`` such that calling ``F(*args, **kwargs)``
686 returns a tuple ``(function, args, kwargs)``, allowing the later
687 execution of ``function(*args, **kwargs)``.
689 Notes
690 -----
691 Be careful about the order in which decorators are applied, especially
692 when using :meth:`Memory.cache<joblib.Memory.cache>`. For instance,
693 ``Memory.cache(delayed(func))`` will cache the outputs of
694 ``delayed(func)``, that is, tuples of the form ``(func, args, kwargs)``.
695 To cache the outputs of ``func`` itself, you must instead use
696 ``delayed(Memory.cache(func))``.
697 """
699 def delayed_function(*args, **kwargs):
700 return function, args, kwargs
702 try:
703 delayed_function = functools.wraps(function)(delayed_function)
704 except AttributeError:
705 " functools.wraps fails on some callable objects "
706 return delayed_function
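# --- Editorial usage sketch (not part of the original source):
#
#     from joblib import Parallel, delayed
#     task = delayed(pow)(2, 10)      # -> (pow, (2, 10), {}), nothing runs yet
#     Parallel(n_jobs=2)([task])      # -> [1024], executed by the workers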
709###############################################################################
710class BatchCompletionCallBack(object):
711 """Callback to keep track of completed results and schedule the next tasks.
713 This callable is executed by the parent process whenever a worker process
714 has completed a batch of tasks.
716 It is used for progress reporting, to update the estimate of the batch
717 processing duration and to schedule the next batch of tasks to be
718 processed.
720 It is assumed that this callback will always be triggered by the backend
721 right after the end of a task, in case of success as well as in case of
722 failure.
723 """
725 ##########################################################################
726 # METHODS CALLED BY THE MAIN THREAD #
727 ##########################################################################
728 def __init__(self, dispatch_timestamp, batch_size, parallel):
729 self.dispatch_timestamp = dispatch_timestamp
730 self.batch_size = batch_size
731 self.parallel = parallel
732 self.parallel_call_id = parallel._call_id
733 self._completion_timeout_counter = None
735 # Internals to keep track of the status and outcome of the task.
737 # Used to hold a reference to the future-like object returned by the
738 # backend after launching this task
739 # This will be set later when calling `register_job`, as it is only
740 # created once the task has been submitted.
741 self.job = None
743 if not parallel._backend.supports_retrieve_callback:
744 # The status is only used for asynchronous result retrieval in the
745 # callback.
746 self.status = None
747 else:
748 # The initial status for the job is TASK_PENDING.
749 # Once it is done, it will be either TASK_DONE, or TASK_ERROR.
750 self.status = TASK_PENDING
752 def register_job(self, job):
753 """Register the object returned by `submit`."""
754 self.job = job
756 def get_result(self, timeout):
757 """Returns the raw result of the task that was submitted.
759 If the task raised an exception rather than returning, this same
760 exception will be raised instead.
762 If the backend supports the retrieval callback, it is assumed that this
763 method is only called after the result has been registered. This is
764 ensured by checking that `self.get_status(timeout)` does not return
765 TASK_PENDING. In this case, `get_result` directly returns the
766 registered result (or raises the registered exception).
768 For other backends, there are no such assumptions, but `get_result`
769 still needs to synchronously retrieve the result before it can
770 return it or raise. It will block at most `timeout` seconds
771 waiting for retrieval to complete, after which it raises a TimeoutError.
772 """
774 backend = self.parallel._backend
776 if backend.supports_retrieve_callback:
777 # We assume that the result has already been retrieved by the
778 # callback thread, and is stored internally. It's just waiting to
779 # be returned.
780 return self._return_or_raise()
782 # For other backends, the main thread needs to run the retrieval step.
783 try:
784 result = backend.retrieve_result(self.job, timeout=timeout)
785 outcome = dict(result=result, status=TASK_DONE)
786 except BaseException as e:
787 outcome = dict(result=e, status=TASK_ERROR)
788 self._register_outcome(outcome)
790 return self._return_or_raise()
792 def _return_or_raise(self):
793 try:
794 if self.status == TASK_ERROR:
795 raise self._result
796 return self._result
797 finally:
798 del self._result
800 def get_status(self, timeout):
801 """Get the status of the task.
803 This function also checks whether the timeout has been reached and
804 registers a TimeoutError outcome when that is the case.
805 """
806 if timeout is None or self.status != TASK_PENDING:
807 return self.status
809 # The computation is still running and the status is pending.
810 # Check that we have not been waiting for this job more than `timeout`.
811 now = time.time()
812 if self._completion_timeout_counter is None:
813 self._completion_timeout_counter = now
815 if (now - self._completion_timeout_counter) > timeout:
816 outcome = dict(result=TimeoutError(), status=TASK_ERROR)
817 self._register_outcome(outcome)
819 return self.status
821 ##########################################################################
822 # METHODS CALLED BY CALLBACK THREADS #
823 ##########################################################################
824 def __call__(self, *args, **kwargs):
825 """Function called by the callback thread after a job is completed."""
827 # If the backend doesn't support callback retrievals, the next batch of
828 # tasks is dispatched regardless. The result will be retrieved by the
829 # main thread when calling `get_result`.
830 if not self.parallel._backend.supports_retrieve_callback:
831 self._dispatch_new()
832 return
834 # If the backend supports retrieving the result in the callback, it
835 # registers the task outcome (TASK_ERROR or TASK_DONE), and schedules
836 # the next batch if needed.
837 with self.parallel._lock:
838 # Edge case: while the task was processing, the `parallel`
839 # instance was reset and a new call was issued, but the
840 # worker managed to complete the task and trigger this callback
841 # just before being aborted by the reset.
842 if self.parallel._call_id != self.parallel_call_id:
843 return
845 # When aborting, stop as fast as possible and do not retrieve the
846 # result as it won't be returned by the Parallel call.
847 if self.parallel._aborting:
848 return
850 # Retrieve the result of the task in the main process and dispatch
851 # a new batch if needed.
852 job_succeeded = self._retrieve_result(*args, **kwargs)
854 if job_succeeded:
855 self._dispatch_new()
857 def _dispatch_new(self):
858 """Schedule the next batch of tasks to be processed."""
860 # This step ensures that auto-batching works as expected.
861 this_batch_duration = time.time() - self.dispatch_timestamp
862 self.parallel._backend.batch_completed(self.batch_size, this_batch_duration)
864 # Schedule the next batch of tasks.
865 with self.parallel._lock:
866 self.parallel.n_completed_tasks += self.batch_size
867 self.parallel.print_progress()
868 if self.parallel._original_iterator is not None:
869 self.parallel.dispatch_next()
871 def _retrieve_result(self, out):
872 """Fetch and register the outcome of a task.
874 Return True if the task succeeded, False otherwise.
875 This function is only called by backends that support retrieving
876 the task result in the callback thread.
877 """
878 try:
879 result = self.parallel._backend.retrieve_result_callback(out)
880 outcome = dict(status=TASK_DONE, result=result)
881 except BaseException as e:
882 # Avoid keeping references to parallel in the error.
883 e.__traceback__ = None
884 outcome = dict(result=e, status=TASK_ERROR)
886 self._register_outcome(outcome)
887 return outcome["status"] != TASK_ERROR
889 ##########################################################################
890 # This method can be called either in the main thread #
891 # or in the callback thread. #
892 ##########################################################################
893 def _register_outcome(self, outcome):
894 """Register the outcome of a task.
896 This method can be called only once; subsequent calls are ignored.
897 """
898 # Covers the edge case where the main thread tries to register a
899 # `TimeoutError` while the callback thread tries to register a result
900 # at the same time.
901 with self.parallel._lock:
902 if self.status not in (TASK_PENDING, None):
903 return
904 self.status = outcome["status"]
906 self._result = outcome["result"]
908 # Once the result and the status are extracted, the last reference to
909 # the job can be deleted.
910 self.job = None
912 # As soon as an error has been spotted, early stopping flags are sent
913 # to the `parallel` instance.
914 if self.status == TASK_ERROR:
915 self.parallel._exception = True
916 self.parallel._aborting = True
918 if self.parallel.return_ordered:
919 return
921 with self.parallel._lock:
922 # For `return_as=generator_unordered`, append the job to the queue
923 # in the order of completion instead of submission.
924 self.parallel._jobs.append(self)
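# --- Editorial lifecycle summary of BatchCompletionCallBack (not part of the
# original source):
#   1. Parallel._dispatch creates the callback with the dispatch timestamp and
#      the batch size;
#   2. the batch is submitted with backend.submit(batch, callback=cb) and the
#      returned future-like object is stored via cb.register_job(job);
#   3. the backend invokes cb(...) when the batch finishes, which records the
#      outcome (and, for callback-retrieval backends, fetches the result) and
#      may dispatch the next batch;
#   4. the main thread polls cb.get_status(timeout) and collects the value or
#      re-raises the error through cb.get_result(timeout).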
927###############################################################################
928def register_parallel_backend(name, factory, make_default=False):
929 """Register a new Parallel backend factory.
931 The new backend can then be selected by passing its name as the backend
932 argument to the :class:`~Parallel` class. Moreover, the default backend can
933 be overwritten globally by setting make_default=True.
935 The factory can be any callable that takes no argument and returns an
936 instance of ``ParallelBackendBase``.
938 Warning: this function is experimental and subject to change in a future
939 version of joblib.
941 .. versionadded:: 0.10
942 """
943 BACKENDS[name] = factory
944 if make_default:
945 global DEFAULT_BACKEND
946 DEFAULT_BACKEND = name
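# --- Editorial usage sketch (not part of the original source; the backend name
# "custom_threads" is hypothetical):
#
#     register_parallel_backend("custom_threads", ThreadingBackend)
#     with parallel_config(backend="custom_threads"):
#         Parallel(n_jobs=2)(delayed(abs)(-i) for i in range(3))   # -> [0, 1, 2]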
949def effective_n_jobs(n_jobs=-1):
950 """Determine the number of jobs that can actually run in parallel
952 n_jobs is the number of workers requested by the caller. Passing n_jobs=-1
953 means requesting all available workers, for instance matching the number of
954 CPU cores on the worker host(s).
956 This method should return a guesstimate of the number of workers that can
957 actually perform work concurrently with the currently enabled default
958 backend. The primary use case is to make it possible for the caller to know
959 in how many chunks to slice the work.
961 In general working on larger data chunks is more efficient (less scheduling
962 overhead and better use of CPU cache prefetching heuristics) as long as all
963 the workers have enough work to do.
965 Warning: this function is experimental and subject to change in a future
966 version of joblib.
968 .. versionadded:: 0.10
969 """
970 if n_jobs == 1:
971 return 1
973 backend, backend_n_jobs = get_active_backend()
974 if n_jobs is None:
975 n_jobs = backend_n_jobs
976 return backend.effective_n_jobs(n_jobs=n_jobs)
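# --- Editorial usage sketch (not part of the original source; actual values
# depend on the machine and the active backend):
#
#     effective_n_jobs(1)     # -> 1, always, without querying the backend
#     effective_n_jobs(-1)    # e.g. the number of CPUs with the loky backend
#     effective_n_jobs(None)  # falls back to the n_jobs of the active config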
979###############################################################################
980class Parallel(Logger):
981 """Helper class for readable parallel mapping.
983 Read more in the :ref:`User Guide <parallel>`.
985 Parameters
986 ----------
987 n_jobs: int, default=None
988 The maximum number of concurrently running jobs, such as the number
989 of Python worker processes when ``backend="loky"`` or the size of
990 the thread-pool when ``backend="threading"``.
991 This argument is converted to an integer; floats are rounded down.
992 If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
993 ``n_cpus`` is obtained with :func:`~cpu_count`.
994 For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
995 using ``n_jobs=-2`` will result in all CPUs but one being used.
996 This argument can also go above ``n_cpus``, which will cause
997 oversubscription. In some cases, slight oversubscription can be
998 beneficial, e.g., for tasks with large I/O operations.
999 If 1 is given, no parallel computing code is used at all, and the
1000 behavior amounts to a simple python `for` loop. This mode is not
1001 compatible with ``timeout``.
1002 None is a marker for 'unset' that will be interpreted as n_jobs=1
1003 unless the call is performed under a :func:`~parallel_config`
1004 context manager that sets another value for ``n_jobs``.
1005 If n_jobs = 0 then a ValueError is raised.
1006 backend: str, ParallelBackendBase instance or None, default='loky'
1007 Specify the parallelization backend implementation.
1008 Supported backends are:
1010 - "loky" used by default, can induce some
1011 communication and memory overhead when exchanging input and
1012 output data with the worker Python processes. On some rare
1013 systems (such as Pyodide), the loky backend may not be
1014 available.
1015 - "multiprocessing" previous process-based backend based on
1016 `multiprocessing.Pool`. Less robust than `loky`.
1017 - "threading" is a very low-overhead backend but it suffers
1018 from the Python Global Interpreter Lock if the called function
1019 relies heavily on Python objects. "threading" is mostly useful
1020 when the execution bottleneck is a compiled extension that
1021 explicitly releases the GIL (for instance a Cython loop wrapped
1022 in a "with nogil" block or an expensive call to a library such
1023 as NumPy).
1024 - finally, you can register backends by calling
1025 :func:`~register_parallel_backend`. This will allow you to
1026 implement a backend of your liking.
1028 It is not recommended to hard-code the backend name in a call to
1029 :class:`~Parallel` in a library. Instead it is recommended to set
1030 soft hints (prefer) or hard constraints (require) so as to make it
1031 possible for library users to change the backend from the outside
1032 using the :func:`~parallel_config` context manager.
1033 return_as: str in {'list', 'generator', 'generator_unordered'}, default='list'
1034 If 'list', calls to this instance will return a list, only when
1035 all results have been processed and retrieved.
1036 If 'generator', it will return a generator that yields the results
1037 as soon as they are available, in the order in which the tasks were
1038 submitted.
1039 If 'generator_unordered', the generator will immediately yield
1040 available results independently of the submission order. The output
1041 order is not deterministic in this case because it depends on the
1042 concurrency of the workers.
1043 prefer: str in {'processes', 'threads'} or None, default=None
1044 Soft hint to choose the default backend if no specific backend
1045 was selected with the :func:`~parallel_config` context manager.
1046 The default process-based backend is 'loky' and the default
1047 thread-based backend is 'threading'. Ignored if the ``backend``
1048 parameter is specified.
1049 require: 'sharedmem' or None, default=None
1050 Hard constraint to select the backend. If set to 'sharedmem',
1051 the selected backend will be single-host and thread-based even
1052 if the user asked for a non-thread based backend with
1053 :func:`~joblib.parallel_config`.
1054 verbose: int, default=0
1055 The verbosity level: if non zero, progress messages are
1056 printed. Above 50, the output is sent to stdout.
1057 The frequency of the messages increases with the verbosity level.
1058 If it is more than 10, all iterations are reported.
1059 timeout: float or None, default=None
1060 Timeout limit for each task to complete. If any task takes longer,
1061 a TimeoutError will be raised. Only applied when n_jobs != 1.
1062 pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}, default='2*n_jobs'
1063 The number of batches (of tasks) to be pre-dispatched.
1064 Default is '2*n_jobs'. When batch_size="auto" this is a reasonable
1065 default and the workers should never starve. Note that only basic
1066 arithmetic is allowed here and no modules can be used in this
1067 expression.
1068 batch_size: int or 'auto', default='auto'
1069 The number of atomic tasks to dispatch at once to each
1070 worker. When individual evaluations are very fast, dispatching
1071 calls to workers can be slower than sequential computation because
1072 of the overhead. Batching fast computations together can mitigate
1073 this.
1074 The ``'auto'`` strategy keeps track of the time it takes for a
1075 batch to complete, and dynamically adjusts the batch size to keep
1076 the time on the order of half a second, using a heuristic. The
1077 initial batch size is 1.
1078 ``batch_size="auto"`` with ``backend="threading"`` will dispatch
1079 batches of a single task at a time as the threading backend has
1080 very little overhead and using larger batch size has not proved to
1081 bring any gain in that case.
1082 temp_folder: str or None, default=None
1083 Folder to be used by the pool for memmapping large arrays
1084 for sharing memory with worker processes. If None, this will try in
1085 order:
1087 - a folder pointed by the JOBLIB_TEMP_FOLDER environment
1088 variable,
1089 - /dev/shm if the folder exists and is writable: this is a
1090 RAM disk filesystem available by default on modern Linux
1091 distributions,
1092 - the default system temporary folder that can be
1093 overridden with TMP, TMPDIR or TEMP environment
1094 variables, typically /tmp under Unix operating systems.
1096 Only active when ``backend="loky"`` or ``"multiprocessing"``.
1097 max_nbytes: int, str, or None, optional, default='1M'
1098 Threshold on the size of arrays passed to the workers that
1099 triggers automated memory mapping in temp_folder. Can be an int
1100 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
1101 Use None to disable memmapping of large arrays.
1102 Only active when ``backend="loky"`` or ``"multiprocessing"``.
1103 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
1104 Memmapping mode for numpy arrays passed to workers. None will
1105 disable memmapping, other modes defined in the numpy.memmap doc:
1106 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
1107 Also, see 'max_nbytes' parameter documentation for more details.
1108 backend_kwargs: dict, optional
1109 Additional parameters to pass to the backend `configure` method.
1111 Notes
1112 -----
1114 This object uses workers to compute in parallel the application of a
1115 function to many different arguments. The main features it brings,
1116 in addition to using the raw multiprocessing or concurrent.futures API,
1117 are (see examples for details):
1119 * More readable code, in particular since it avoids
1120 constructing lists of arguments.
1122 * Easier debugging:
1123 - informative tracebacks even when the error happens on
1124 the client side
1125 - using 'n_jobs=1' makes it possible to turn off parallel computing
1126 for debugging without changing the codepath
1127 - early capture of pickling errors
1129 * An optional progress meter.
1131 * Interruption of multiprocess jobs with 'Ctrl-C'
1133 * Flexible pickling control for the communication to and from
1134 the worker processes.
1136 * Ability to use shared memory efficiently with worker
1137 processes for large numpy-based datastructures.
1139 Note that the intended usage is to run one call at a time. Multiple
1140 calls to the same Parallel object will result in a ``RuntimeError``.
1142 Examples
1143 --------
1145 A simple example:
1147 >>> from math import sqrt
1148 >>> from joblib import Parallel, delayed
1149 >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
1150 [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
1152 Reshaping the output when the function has several return
1153 values:
1155 >>> from math import modf
1156 >>> from joblib import Parallel, delayed
1157 >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
1158 >>> res, i = zip(*r)
1159 >>> res
1160 (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
1161 >>> i
1162 (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)
1164 The progress meter: the higher the value of `verbose`, the more
1165 messages:
1167 >>> from time import sleep
1168 >>> from joblib import Parallel, delayed
1169 >>> r = Parallel(n_jobs=2, verbose=10)(
1170 ... delayed(sleep)(.2) for _ in range(10)) #doctest: +SKIP
1171 [Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.6s
1172 [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.8s
1173 [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 1.4s finished
1175 Traceback example, note how the line of the error is indicated
1176 as well as the values of the parameter passed to the function that
1177 triggered the exception, even though the traceback happens in the
1178 child process:
1180 >>> from heapq import nlargest
1181 >>> from joblib import Parallel, delayed
1182 >>> Parallel(n_jobs=2)(
1183 ... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3))
1184 ... # doctest: +SKIP
1185 -----------------------------------------------------------------------
1186 Sub-process traceback:
1187 -----------------------------------------------------------------------
1188 TypeError Mon Nov 12 11:37:46 2012
1189 PID: 12934 Python 2.7.3: /usr/bin/python
1190 ........................................................................
1191 /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
1192 419 if n >= size:
1193 420 return sorted(iterable, key=key, reverse=True)[:n]
1194 421
1195 422 # When key is none, use simpler decoration
1196 423 if key is None:
1197 --> 424 it = izip(iterable, count(0,-1)) # decorate
1198 425 result = _nlargest(n, it)
1199 426 return map(itemgetter(0), result) # undecorate
1200 427
1201 428 # General case, slowest method
1202 TypeError: izip argument #1 must support iteration
1203 _______________________________________________________________________
1206 Using pre_dispatch in a producer/consumer situation, where the
1207 data is generated on the fly. Note how the producer is first
1208 called 3 times before the parallel loop is initiated, and then
1209 called to generate new data on the fly:
1211 >>> from math import sqrt
1212 >>> from joblib import Parallel, delayed
1213 >>> def producer():
1214 ... for i in range(6):
1215 ... print('Produced %s' % i)
1216 ... yield i
1217 >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
1218 ... delayed(sqrt)(i) for i in producer()) #doctest: +SKIP
1219 Produced 0
1220 Produced 1
1221 Produced 2
1222 [Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s
1223 Produced 3
1224 [Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s
1225 Produced 4
1226 [Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s
1227 Produced 5
1228 [Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s
1229 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s remaining: 0.0s
1230 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished
1232 """ # noqa: E501
1234 def __init__(
1235 self,
1236 n_jobs=default_parallel_config["n_jobs"],
1237 backend=default_parallel_config["backend"],
1238 return_as="list",
1239 verbose=default_parallel_config["verbose"],
1240 timeout=None,
1241 pre_dispatch="2 * n_jobs",
1242 batch_size="auto",
1243 temp_folder=default_parallel_config["temp_folder"],
1244 max_nbytes=default_parallel_config["max_nbytes"],
1245 mmap_mode=default_parallel_config["mmap_mode"],
1246 prefer=default_parallel_config["prefer"],
1247 require=default_parallel_config["require"],
1248 **backend_kwargs,
1249 ):
1250 # Initiate parent Logger class state
1251 super().__init__()
1253 # Interpret n_jobs=None as 'unset'
1254 if n_jobs is None:
1255 n_jobs = default_parallel_config["n_jobs"]
1257 active_backend, context_config = _get_active_backend(
1258 prefer=prefer, require=require, verbose=verbose
1259 )
1261 nesting_level = active_backend.nesting_level
1263 self.verbose = _get_config_param(verbose, context_config, "verbose")
1264 self.timeout = timeout
1265 self.pre_dispatch = pre_dispatch
1267 if return_as not in {"list", "generator", "generator_unordered"}:
1268 raise ValueError(
1269 'Expected `return_as` parameter to be a string equal to "list"'
1270 f',"generator" or "generator_unordered", but got {return_as} '
1271 "instead."
1272 )
1273 self.return_as = return_as
1274 self.return_generator = return_as != "list"
1275 self.return_ordered = return_as != "generator_unordered"
1277 # Check if we are under a parallel_config or parallel_backend
1278 # context manager and use the config from the context manager
1279 # for arguments that are not explicitly set.
1280 self._backend_kwargs = {
1281 **backend_kwargs,
1282 **{
1283 k: _get_config_param(param, context_config, k)
1284 for param, k in [
1285 (max_nbytes, "max_nbytes"),
1286 (temp_folder, "temp_folder"),
1287 (mmap_mode, "mmap_mode"),
1288 (prefer, "prefer"),
1289 (require, "require"),
1290 (verbose, "verbose"),
1291 ]
1292 },
1293 }
1295 if isinstance(self._backend_kwargs["max_nbytes"], str):
1296 self._backend_kwargs["max_nbytes"] = memstr_to_bytes(
1297 self._backend_kwargs["max_nbytes"]
1298 )
1299 self._backend_kwargs["verbose"] = max(0, self._backend_kwargs["verbose"] - 50)
1301 if DEFAULT_MP_CONTEXT is not None:
1302 self._backend_kwargs["context"] = DEFAULT_MP_CONTEXT
1303 elif hasattr(mp, "get_context"):
1304 self._backend_kwargs["context"] = mp.get_context()
1306 if backend is default_parallel_config["backend"] or backend is None:
1307 backend = active_backend
1309 elif isinstance(backend, ParallelBackendBase):
1310 # Use provided backend as is, with the current nesting_level if it
1311 # is not set yet.
1312 if backend.nesting_level is None:
1313 backend.nesting_level = nesting_level
1315 elif hasattr(backend, "Pool") and hasattr(backend, "Lock"):
1316 # Make it possible to pass a custom multiprocessing context as
1317 # backend to change the start method to forkserver or spawn or
1318 # preload modules on the forkserver helper process.
1319 self._backend_kwargs["context"] = backend
1320 backend = MultiprocessingBackend(nesting_level=nesting_level)
1322 elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
1323 warnings.warn(
1324 f"joblib backend '{backend}' is not available on "
1325 f"your system, falling back to {DEFAULT_BACKEND}.",
1326 UserWarning,
1327 stacklevel=2,
1328 )
1329 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
1330 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)
1332 else:
1333 try:
1334 backend_factory = BACKENDS[backend]
1335 except KeyError as e:
1336 raise ValueError(
1337 "Invalid backend: %s, expected one of %r"
1338 % (backend, sorted(BACKENDS.keys()))
1339 ) from e
1340 backend = backend_factory(nesting_level=nesting_level)
1342 n_jobs = _get_config_param(n_jobs, context_config, "n_jobs")
1343 if n_jobs is None:
1344 # No specific context override and no specific value request:
1345 # default to the default of the backend.
1346 n_jobs = backend.default_n_jobs
1347 try:
1348 n_jobs = int(n_jobs)
1349 except ValueError:
1350 raise ValueError("n_jobs could not be converted to int")
1351 self.n_jobs = n_jobs
1353 if require == "sharedmem" and not getattr(backend, "supports_sharedmem", False):
1354 raise ValueError("Backend %s does not support shared memory" % backend)
1356 if batch_size == "auto" or isinstance(batch_size, Integral) and batch_size > 0:
1357 self.batch_size = batch_size
1358 else:
1359 raise ValueError(
1360 "batch_size must be 'auto' or a positive integer, got: %r" % batch_size
1361 )
1363 if not isinstance(backend, SequentialBackend):
1364 if self.return_generator and not backend.supports_return_generator:
1365 raise ValueError(
1366 "Backend {} does not support return_as={}".format(
1367 backend, return_as
1368 )
1369 )
1370 # This lock is used to coordinate the main thread of this process
1371 # with the async callback thread of our pool.
1372 self._lock = threading.RLock()
1373 self._jobs = collections.deque()
1374 self._jobs_set = set()
1375 self._pending_outputs = list()
1376 self._ready_batches = queue.Queue()
1377 self._reducer_callback = None
1379 # Internal variables
1380 self._backend = backend
1381 self._running = False
1382 self._managed_backend = False
1383 self._id = uuid4().hex
1384 self._call_ref = None
1386 def __enter__(self):
1387 self._managed_backend = True
1388 self._calling = False
1389 self._initialize_backend()
1390 return self
1392 def __exit__(self, exc_type, exc_value, traceback):
1393 self._managed_backend = False
1394 if self.return_generator and self._calling:
1395 self._abort()
1396 self._terminate_and_reset()
1398 def _initialize_backend(self):
1399 """Build a process or thread pool and return the number of workers"""
1400 try:
1401 n_jobs = self._backend.configure(
1402 n_jobs=self.n_jobs, parallel=self, **self._backend_kwargs
1403 )
1404 if self.timeout is not None and not self._backend.supports_timeout:
1405 warnings.warn(
1406 "The backend class {!r} does not support timeout. "
1407 "You have set 'timeout={}' in Parallel but "
1408 "the 'timeout' parameter will not be used.".format(
1409 self._backend.__class__.__name__, self.timeout
1410 )
1411 )
1413 except FallbackToBackend as e:
1414 # Recursively initialize the backend in case of requested fallback.
1415 self._backend = e.backend
1416 n_jobs = self._initialize_backend()
1418 return n_jobs
1420 def _effective_n_jobs(self):
1421 if self._backend:
1422 return self._backend.effective_n_jobs(self.n_jobs)
1423 return 1
1425 def _terminate_and_reset(self):
1426 if hasattr(self._backend, "stop_call") and self._calling:
1427 self._backend.stop_call()
1428 self._calling = False
1429 if not self._managed_backend:
1430 self._backend.terminate()
1432 def _dispatch(self, batch):
1433 """Queue the batch for computing, with or without multiprocessing
1435 WARNING: this method is not thread-safe: it should only be called
1436 indirectly via dispatch_one_batch.
1438 """
1439 # If job.get() catches an exception, it closes the queue:
1440 if self._aborting:
1441 return
1443 batch_size = len(batch)
1445 self.n_dispatched_tasks += batch_size
1446 self.n_dispatched_batches += 1
1448 dispatch_timestamp = time.time()
1450 batch_tracker = BatchCompletionCallBack(dispatch_timestamp, batch_size, self)
1452 self._register_new_job(batch_tracker)
1454 # If return_ordered is False, the batch_tracker is not stored in the
1455 # jobs queue at the time of submission. Instead, it will be appended to
1456 # the queue by itself as soon as the callback is triggered to be able
1457 # to return the results in the order of completion.
1459 job = self._backend.submit(batch, callback=batch_tracker)
1460 batch_tracker.register_job(job)
1462 def _register_new_job(self, batch_tracker):
1463 if self.return_ordered:
1464 self._jobs.append(batch_tracker)
1465 else:
1466 self._jobs_set.add(batch_tracker)
1468 def dispatch_next(self):
1469 """Dispatch more data for parallel processing
1471 This method is meant to be called concurrently by the multiprocessing
1472 callback. We rely on the thread-safety of dispatch_one_batch to protect
1473 against concurrent consumption of the unprotected iterator.
1474 """
1475 if not self.dispatch_one_batch(self._original_iterator):
1476 self._iterating = False
1477 self._original_iterator = None
1479 def dispatch_one_batch(self, iterator):
1480 """Prefetch the tasks for the next batch and dispatch them.
1482 The effective size of the batch is computed here.
1483 If there are no more jobs to dispatch, return False, else return True.
1485 The iterator consumption and dispatching is protected by the same
1486 lock so calling this function should be thread safe.
1488 """
1490 if self._aborting:
1491 return False
1493 batch_size = self._get_batch_size()
1495 with self._lock:
1496 # To ensure an even distribution of the workload between workers,
1497 # we look ahead in the original iterator by more than batch_size
1498 # tasks. However, we keep consuming only one batch at each
1499 # dispatch_one_batch call. The extra tasks are stored in a local
1500 # queue, _ready_batches, that is looked up prior to re-consuming
1501 # tasks from the original iterator.
1502 try:
1503 tasks = self._ready_batches.get(block=False)
1504 except queue.Empty:
1505 # slice the iterator n_jobs * batch_size items at a time. If the
1506 # slice returns less than that, then the current batch_size puts
1507 # too much weight on a subset of workers, while others may end
1508 # up starving. So in this case, re-scale the batch size
1509 # accordingly to distribute the last items evenly among all
1510 # workers.
1511 n_jobs = self._cached_effective_n_jobs
1512 big_batch_size = batch_size * n_jobs
1514 try:
1515 islice = list(itertools.islice(iterator, big_batch_size))
1516 except Exception as e:
1517 # Handle the fact that the generator of tasks raised an
1518 # exception. As this part of the code can be executed in
1519 # a thread internal to the backend, register a task with
1520 # an error that will be raised in the user's thread.
1521 if isinstance(e.__context__, queue.Empty):
1522 # Suppress the cause of the exception if it is
1523 # queue.Empty to avoid cluttered traceback. Only do it
1524 # if the __context__ is really empty to avoid messing
1525 # with causes of the original error.
1526 e.__cause__ = None
1527 batch_tracker = BatchCompletionCallBack(0, batch_size, self)
1528 self._register_new_job(batch_tracker)
1529 batch_tracker._register_outcome(dict(result=e, status=TASK_ERROR))
1530 return True
1532 if len(islice) == 0:
1533 return False
1534 elif (
1535 iterator is self._original_iterator and len(islice) < big_batch_size
1536 ):
1537 # We reached the end of the original iterator (unless
1538 # iterator is the ``pre_dispatch``-long initial slice of
1539 # the original iterator) -- decrease the batch size to
1540 # account for potential variance in the batches' running
1541 # time.
1542 final_batch_size = max(1, len(islice) // (10 * n_jobs))
1543 else:
1544 final_batch_size = max(1, len(islice) // n_jobs)
1546 # enqueue n_jobs batches in a local queue
1547 for i in range(0, len(islice), final_batch_size):
1548 tasks = BatchedCalls(
1549 islice[i : i + final_batch_size],
1550 self._backend.get_nested_backend(),
1551 self._reducer_callback,
1552 self._pickle_cache,
1553 )
1554 self._ready_batches.put(tasks)
1556 # finally, get one batch of tasks.
1557 tasks = self._ready_batches.get(block=False)
1558 if len(tasks) == 0:
1559 # No more tasks available in the iterator: tell caller to stop.
1560 return False
1561 else:
1562 self._dispatch(tasks)
1563 return True
1565 def _get_batch_size(self):
1566 """Returns the effective batch size for dispatch"""
1567 if self.batch_size == "auto":
1568 return self._backend.compute_batch_size()
1569 else:
1570 # Fixed batch size strategy
1571 return self.batch_size
1573 def _print(self, msg):
1574 """Display the message on stout or stderr depending on verbosity"""
1575 # XXX: Not using the logger framework: need to
1576 # learn to use logger better.
1577 if not self.verbose:
1578 return
1579 if self.verbose < 50:
1580 writer = sys.stderr.write
1581 else:
1582 writer = sys.stdout.write
1583 writer(f"[{self}]: {msg}\n")
1585 def _is_completed(self):
1586 """Check if all tasks have been completed"""
1587 return self.n_completed_tasks == self.n_dispatched_tasks and not (
1588 self._iterating or self._aborting
1589 )
1591 def print_progress(self):
1592 """Display the process of the parallel execution only a fraction
1593 of time, controlled by self.verbose.
1594 """
1596 if not self.verbose:
1597 return
1599 if self.n_tasks is not None and self.n_tasks > 0:
1600 width = floor(log10(self.n_tasks)) + 1
1601 else:
1602 width = 3
1603 elapsed_time = time.time() - self._start_time
1605 if self._is_completed():
1606 # Make sure that we get a last message telling us we are done
1607 self._print(
1608 f"Done {self.n_completed_tasks:{width}d} out of "
1609 f"{self.n_completed_tasks:{width}d} | elapsed: "
1610 f"{short_format_time(elapsed_time)} finished"
1611 )
1612 return
1614 # Original job iterator becomes None once it has been fully
1615 # consumed: at this point we know the total number of jobs and we are
1616 # able to display an estimation of the remaining time based on already
1617 # completed jobs. Otherwise, we simply display the number of completed
1618 # tasks.
1619 elif self._original_iterator is not None:
1620 if _verbosity_filter(self.n_dispatched_batches, self.verbose):
1621 return
1622 fmt_time = f"| elapsed: {short_format_time(elapsed_time)}"
1623 index = self.n_completed_tasks
1624 if self.n_tasks is not None:
1625 self._print(
1626 f"Done {index:{width}d} out of {self.n_tasks:{width}d} {fmt_time}"
1627 )
1628 else:
1629 pad = " " * (len("out of ") + width - len("tasks"))
1630 self._print(f"Done {index:{width}d} tasks {pad}{fmt_time}")
1631 else:
1632 index = self.n_completed_tasks
1633 # We are finished dispatching
1634 total_tasks = self.n_dispatched_tasks
1635 # We always display the first loop
1636 if index != 0:
1637 # Display depending on the number of remaining items
1638 # A message as soon as we finish dispatching, cursor is 0
1639 cursor = total_tasks - index + 1 - self._pre_dispatch_amount
1640 frequency = (total_tasks // self.verbose) + 1
1641 is_last_item = index + 1 == total_tasks
1642 if is_last_item or cursor % frequency:
1643 return
1644 remaining_time = (elapsed_time / max(index, 1)) * (
1645 self.n_dispatched_tasks - index
1646 )
1647 # only display status if the remaining time is greater than or equal to 0
1648 self._print(
1649 f"Done {index:{width}d} out of {total_tasks:{width}d} "
1650 f"| elapsed: {short_format_time(elapsed_time)} remaining: "
1651 f"{short_format_time(remaining_time)}"
1652 )
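# -------------------------------------------------------------------------
# Editorial sketch of the progress-throttling arithmetic above, with
# hypothetical numbers: once dispatching is finished, with total_tasks=100,
# verbose=10 and _pre_dispatch_amount=0, a status line is printed only when
# cursor % frequency == 0, i.e. roughly every 11 completed tasks.
#
#     total_tasks, verbose, pre_dispatch_amount = 100, 10, 0
#     frequency = (total_tasks // verbose) + 1                   # 11
#     index = 45                                                 # tasks done so far
#     cursor = total_tasks - index + 1 - pre_dispatch_amount     # 56
#     prints = (cursor % frequency == 0)                         # 56 % 11 == 1 -> skipped
# -------------------------------------------------------------------------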
1654 def _abort(self):
1655 # Stop dispatching new jobs in the async callback thread
1656 self._aborting = True
1658 # If the backend allows it, cancel or kill remaining running
1659 # tasks without waiting for the results as we will raise
1660 # the exception we got back to the caller instead of returning
1661 # any result.
1662 backend = self._backend
1663 if not self._aborted and hasattr(backend, "abort_everything"):
1664 # If the backend is managed externally we need to make sure
1665 # to leave it in a working state to allow for future jobs
1666 # scheduling.
1667 ensure_ready = self._managed_backend
1668 backend.abort_everything(ensure_ready=ensure_ready)
1669 self._aborted = True
1671 def _start(self, iterator, pre_dispatch):
1672 # Only set self._iterating to True if at least a batch
1673 # was dispatched. In particular this covers the edge
1674 # case of Parallel used with an exhausted iterator. If
1675 # self._original_iterator is None, then this means either
1676 # that pre_dispatch == "all", n_jobs == 1 or that the first batch
1677 # was very quick and its callback already dispatched all the
1678 # remaining jobs.
1679 self._iterating = False
1680 if self.dispatch_one_batch(iterator):
1681 self._iterating = self._original_iterator is not None
1683 while self.dispatch_one_batch(iterator):
1684 pass
1686 if pre_dispatch == "all":
1687 # The iterable was consumed all at once by the above while loop.
1688 # No need to wait for async callbacks to trigger
1689 # consumption.
1690 self._iterating = False
1692 def _get_outputs(self, iterator, pre_dispatch):
1693 """Iterator returning the tasks' output as soon as they are ready."""
1694 dispatch_thread_id = threading.get_ident()
1695 detach_generator_exit = False
1696 try:
1697 self._start(iterator, pre_dispatch)
1698 # first yield returns None, for internal use only. This ensures
1699 # that we enter the try/except block and start dispatching the
1700 # tasks.
1701 yield
1703 with self._backend.retrieval_context():
1704 yield from self._retrieve()
1706 except GeneratorExit:
1707 # The generator has been garbage collected before being fully
1708 # consumed. This aborts the remaining tasks if possible and warns
1709 # the user if necessary.
1710 self._exception = True
1712 # In some interpreters such as PyPy, GeneratorExit can be raised in
1713 # a different thread than the one used to start the dispatch of the
1714 # parallel tasks. This can lead to a hang when a thread attempts to
1715 # join itself. As a workaround, we detach the execution of the
1716 # aborting code to a dedicated thread. We then need to make sure
1717 # the rest of the function does not call `_terminate_and_reset`
1718 # in the finally clause.
1719 if dispatch_thread_id != threading.get_ident():
1720 warnings.warn(
1721 "A generator produced by joblib.Parallel has been "
1722 "gc'ed in an unexpected thread. This behavior should "
1723 "not cause major -issues but to make sure, please "
1724 "report this warning and your use case at "
1725 "https://github.com/joblib/joblib/issues so it can "
1726 "be investigated."
1727 )
1729 detach_generator_exit = True
1730 _parallel = self
1732 class _GeneratorExitThread(threading.Thread):
1733 def run(self):
1734 _parallel._abort()
1735 if _parallel.return_generator:
1736 _parallel._warn_exit_early()
1737 _parallel._terminate_and_reset()
1739 _GeneratorExitThread(name="GeneratorExitThread").start()
1740 return
1742 # Otherwise, we are in the thread that started the dispatch: we can
1743 # safely abort the execution and warn the user.
1744 self._abort()
1745 if self.return_generator:
1746 self._warn_exit_early()
1748 raise
1750 # Note: we catch any BaseException instead of just Exception instances
1751 # to also include KeyboardInterrupt
1752 except BaseException:
1753 self._exception = True
1754 self._abort()
1755 raise
1756 finally:
1757 # Store the unconsumed tasks and terminate the workers if necessary
1758 _remaining_outputs = [] if self._exception else self._jobs
1759 self._jobs = collections.deque()
1760 self._jobs_set = set()
1761 self._running = False
1762 if not detach_generator_exit:
1763 self._terminate_and_reset()
1765 while len(_remaining_outputs) > 0:
1766 batched_results = _remaining_outputs.popleft()
1767 batched_results = batched_results.get_result(self.timeout)
1768 for result in batched_results:
1769 yield result
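# -------------------------------------------------------------------------
# Editorial sketch (not part of joblib's source): dropping the generator
# returned by Parallel before consuming it triggers the GeneratorExit path
# above, which aborts the remaining tasks and emits the warning built by
# _warn_exit_early. Assumes the public return_as="generator" parameter of
# recent joblib versions.
#
#     from joblib import Parallel, delayed
#
#     gen = Parallel(n_jobs=2, return_as="generator")(
#         delayed(pow)(i, 2) for i in range(100)
#     )
#     first = next(gen)     # consume a single result
#     del gen               # gc'ing the generator aborts the pending tasks
# -------------------------------------------------------------------------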
1771 def _wait_retrieval(self):
1772 """Return True if we need to continue retrieving some tasks."""
1774 # If the input load is still being iterated over, it means that tasks
1775 # are still on the dispatch waitlist and their results will need to
1776 # be retrieved later on.
1777 if self._iterating:
1778 return True
1780 # If some of the dispatched tasks are still being processed by the
1781 # workers, wait for the compute to finish before starting retrieval
1782 if self.n_completed_tasks < self.n_dispatched_tasks:
1783 return True
1785 # For backends that do not support asynchronously retrieving the
1786 # result back to the main process, all results must be carefully retrieved
1787 # in the _retrieve loop in the main thread while the backend is alive.
1788 # For other backends, the actual retrieval is done asynchronously in
1789 # the callback thread, and we can terminate the backend before the
1790 # `self._jobs` result list has been emptied. The remaining results
1791 # will be collected in the `finally` step of the generator.
1792 if not self._backend.supports_retrieve_callback:
1793 if len(self._jobs) > 0:
1794 return True
1796 return False
1798 def _retrieve(self):
1799 timeout_control_job = None
1800 while self._wait_retrieval():
1801 # If the callback thread of a worker has signaled that its task
1802 # triggered an exception, or if the retrieval loop has raised an
1803 # exception (e.g. `GeneratorExit`), exit the loop and surface the
1804 # worker traceback.
1805 if self._aborting:
1806 self._raise_error_fast()
1807 break
1809 nb_jobs = len(self._jobs)
1810 # Now wait for a job to be ready for retrieval.
1811 if self.return_ordered:
1812 # Case ordered: wait for completion (or error) of the next job
1813 # that has been dispatched and not retrieved yet. If no job
1814 # has been dispatched yet, wait for dispatch.
1815 # We assume that the time to wait for the next job to be
1816 # dispatched is always low, so that the timeout
1817 # control only has to be done on the amount of time the next
1818 # dispatched job is pending.
1819 if (nb_jobs == 0) or (
1820 self._jobs[0].get_status(timeout=self.timeout) == TASK_PENDING
1821 ):
1822 time.sleep(0.01)
1823 continue
1825 elif nb_jobs == 0:
1826 # Case unordered: jobs are added to the list of jobs to
1827 # retrieve `self._jobs` only once completed or in error, which
1828 # is too late to enable timeout control in the same way as in
1829 # the previous case.
1830 # Instead, if no job is ready to be retrieved yet, we
1831 # arbitrarily pick a dispatched job, and the timeout control is
1832 # done such that an error is raised if this control job
1833 # times out before any other dispatched job has completed and
1834 # been added to `self._jobs` to be retrieved.
1835 if timeout_control_job is None:
1836 timeout_control_job = next(iter(self._jobs_set), None)
1838 # NB: it can be None if no job has been dispatched yet.
1839 if timeout_control_job is not None:
1840 timeout_control_job.get_status(timeout=self.timeout)
1842 time.sleep(0.01)
1843 continue
1845 elif timeout_control_job is not None:
1846 # Case unordered, when `nb_jobs > 0`:
1847 # It means that a job is ready to be retrieved, so no timeout
1848 # will occur during this iteration.
1849 # Before proceeding to retrieval of the next ready job, reset
1850 # the timeout control state to prepare the next iteration.
1851 timeout_control_job._completion_timeout_counter = None
1852 timeout_control_job = None
1854 # We need to be careful: the job list can be filling up as
1855 # we empty it, and Python lists are not thread-safe by
1856 # default, hence the use of the lock.
1857 with self._lock:
1858 batched_results = self._jobs.popleft()
1859 if not self.return_ordered:
1860 self._jobs_set.remove(batched_results)
1862 # Flatten the batched results to yield one output at a time
1863 batched_results = batched_results.get_result(self.timeout)
1864 for result in batched_results:
1865 self._nb_consumed += 1
1866 yield result
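# -------------------------------------------------------------------------
# Editorial sketch (not part of joblib's source): the unordered branch
# above (return_ordered False) corresponds to the public
# return_as="generator_unordered" option in recent joblib versions, where
# results are yielded in completion order rather than submission order.
#
#     import time
#     from joblib import Parallel, delayed
#
#     def slow_identity(x):
#         time.sleep(0.1 * (5 - x))   # later tasks finish first
#         return x
#
#     out = Parallel(n_jobs=2, return_as="generator_unordered")(
#         delayed(slow_identity)(i) for i in range(5)
#     )
#     print(list(out))   # likely not [0, 1, 2, 3, 4]
# -------------------------------------------------------------------------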
1868 def _raise_error_fast(self):
1869 """If we are aborting, raise if a job caused an error."""
1871 # Find the first job whose status is TASK_ERROR if it exists.
1872 with self._lock:
1873 error_job = next(
1874 (job for job in self._jobs if job.status == TASK_ERROR), None
1875 )
1877 # If this error job exists, immediately raise the error by
1878 # calling get_result. This job might not exist if abort has been
1879 # called directly or if the generator is gc'ed.
1880 if error_job is not None:
1881 error_job.get_result(self.timeout)
1883 def _warn_exit_early(self):
1884 """Warn the user if the generator is gc'ed before being consumned."""
1885 ready_outputs = self.n_completed_tasks - self._nb_consumed
1886 is_completed = self._is_completed()
1887 msg = ""
1888 if ready_outputs:
1889 msg += (
1890 f"{ready_outputs} tasks have been successfully executed but not used."
1891 )
1892 if not is_completed:
1893 msg += " Additionally, "
1895 if not is_completed:
1896 msg += (
1897 f"{self.n_dispatched_tasks - self.n_completed_tasks} tasks "
1898 "which were still being processed by the workers have been "
1899 "cancelled."
1900 )
1902 if msg:
1903 msg += (
1904 " You could benefit from adjusting the input task "
1905 "iterator to limit unnecessary computation time."
1906 )
1908 warnings.warn(msg)
1910 def _get_sequential_output(self, iterable):
1911 """Separate loop for sequential output.
1913 This simplifies the traceback in case of errors and reduces the
1914 overhead of calling sequential tasks with `joblib`.
1915 """
1916 try:
1917 self._iterating = True
1918 self._original_iterator = iterable
1919 batch_size = self._get_batch_size()
1921 if batch_size != 1:
1922 it = iter(iterable)
1923 iterable_batched = iter(
1924 lambda: tuple(itertools.islice(it, batch_size)), ()
1925 )
1926 iterable = (task for batch in iterable_batched for task in batch)
1928 # first yield returns None, for internal use only. This ensures
1929 # that we enter the try/except block and set up the generator.
1930 yield None
1932 # Sequentially call the tasks and yield the results.
1933 for func, args, kwargs in iterable:
1934 self.n_dispatched_batches += 1
1935 self.n_dispatched_tasks += 1
1936 res = func(*args, **kwargs)
1937 self.n_completed_tasks += 1
1938 self.print_progress()
1939 yield res
1940 self._nb_consumed += 1
1941 except BaseException:
1942 self._exception = True
1943 self._aborting = True
1944 self._aborted = True
1945 raise
1946 finally:
1947 self._running = False
1948 self._iterating = False
1949 self._original_iterator = None
1950 self.print_progress()
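# -------------------------------------------------------------------------
# Editorial sketch of the two-argument iter()/islice idiom used above to
# group a flat iterable into fixed-size batches (standalone example,
# hypothetical data):
#
#     import itertools
#
#     it = iter(range(10))
#     batch_size = 3
#     batches = iter(lambda: tuple(itertools.islice(it, batch_size)), ())
#     print(list(batches))   # [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]
# -------------------------------------------------------------------------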
1952 def _reset_run_tracking(self):
1953 """Reset the counters and flags used to track the execution."""
1955 # Make sure, in a thread-safe way, that the parallel instance was
1956 # not already running.
1957 with getattr(self, "_lock", nullcontext()):
1958 if self._running:
1959 msg = "This Parallel instance is already running !"
1960 if self.return_generator is True:
1961 msg += (
1962 " Before submitting new tasks, you must wait for the "
1963 "completion of all the previous tasks, or clean all "
1964 "references to the output generator."
1965 )
1966 raise RuntimeError(msg)
1967 self._running = True
1969 # Counters to keep track of the tasks dispatched and completed.
1970 self.n_dispatched_batches = 0
1971 self.n_dispatched_tasks = 0
1972 self.n_completed_tasks = 0
1974 # The following count is incremented by one each time the user iterates
1975 # on the output generator; it is used to prepare an informative
1976 # warning message in case the generator is deleted before all the
1977 # dispatched tasks have been consumed.
1978 self._nb_consumed = 0
1980 # The following flags are used to synchronize the threads in case one of
1981 # the tasks errors out, to ensure that all workers abort fast and that
1982 # the backend terminates properly.
1984 # Set to True as soon as a worker signals that a task errors out
1985 self._exception = False
1986 # Set to True in case of early termination following an incident
1987 self._aborting = False
1988 # Set to True after abortion is complete
1989 self._aborted = False
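# -------------------------------------------------------------------------
# Editorial sketch (not part of joblib's source): _reset_run_tracking
# guards against reusing an instance whose previous generator output is
# still alive. Assumes the public return_as="generator" parameter of
# recent joblib versions.
#
#     from joblib import Parallel, delayed
#
#     parallel = Parallel(n_jobs=2, return_as="generator")
#     gen = parallel(delayed(pow)(i, 2) for i in range(10))
#     # Calling the same instance again before `gen` is fully consumed
#     # (or deleted) raises:
#     # RuntimeError("This Parallel instance is already running! ...")
# -------------------------------------------------------------------------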
1991 def __call__(self, iterable):
1992 """Main function to dispatch parallel tasks."""
1994 self._reset_run_tracking()
1995 self.n_tasks = len(iterable) if hasattr(iterable, "__len__") else None
1996 self._start_time = time.time()
1998 if not self._managed_backend:
1999 n_jobs = self._initialize_backend()
2000 else:
2001 n_jobs = self._effective_n_jobs()
2003 if n_jobs == 1:
2004 # If n_jobs==1, run the computation sequentially and return
2005 # immediately to avoid overheads.
2006 output = self._get_sequential_output(iterable)
2007 next(output)
2008 return output if self.return_generator else list(output)
2010 # Let's create an ID that uniquely identifies the current call. If the
2011 # call is interrupted early and the same instance is immediately
2012 # reused, this id will be used to prevent workers that were
2013 # concurrently finalizing a task from the previous call from running
2014 # the callback.
2015 with self._lock:
2016 self._call_id = uuid4().hex
2018 # self._effective_n_jobs should be called in the Parallel.__call__
2019 # thread only -- store its value in an attribute for further queries.
2020 self._cached_effective_n_jobs = n_jobs
2022 if isinstance(self._backend, LokyBackend):
2023 # For the loky backend, we add a callback executed when reducing
2024 # BatchedCalls, that makes the loky executor use a temporary folder
2025 # specific to this Parallel object when pickling temporary memmaps.
2026 # This callback is necessary to ensure that several Parallel
2027 # objects using the same reusable executor don't use the same
2028 # temporary resources.
2030 def _batched_calls_reducer_callback():
2031 # Relevant implementation detail: the following lines, called
2032 # when reducing BatchedCalls, are called in a thread-safe
2033 # situation, meaning that the context of the temporary folder
2034 # manager will not be changed in between the callback execution
2035 # and the end of the BatchedCalls pickling. The reason is that
2036 # pickling (the only place where set_current_context is used)
2037 # is done from a single thread (the queue_feeder_thread).
2038 self._backend._workers._temp_folder_manager.set_current_context( # noqa
2039 self._id
2040 )
2042 self._reducer_callback = _batched_calls_reducer_callback
2044 # self._effective_n_jobs should be called in the Parallel.__call__
2045 # thread only -- store its value in an attribute for further queries.
2046 self._cached_effective_n_jobs = n_jobs
2048 backend_name = self._backend.__class__.__name__
2049 if n_jobs == 0:
2050 raise RuntimeError("%s has no active worker." % backend_name)
2052 self._print(f"Using backend {backend_name} with {n_jobs} concurrent workers.")
2053 if hasattr(self._backend, "start_call"):
2054 self._backend.start_call()
2056 # The following flag prevents double calls to `backend.stop_call`.
2057 self._calling = True
2059 iterator = iter(iterable)
2060 pre_dispatch = self.pre_dispatch
2062 if pre_dispatch == "all":
2063 # prevent further dispatch via multiprocessing callback thread
2064 self._original_iterator = None
2065 self._pre_dispatch_amount = 0
2066 else:
2067 self._original_iterator = iterator
2068 if hasattr(pre_dispatch, "endswith"):
2069 pre_dispatch = eval_expr(pre_dispatch.replace("n_jobs", str(n_jobs)))
2070 self._pre_dispatch_amount = pre_dispatch = int(pre_dispatch)
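# Editorial worked example (hypothetical numbers): with
# pre_dispatch="2*n_jobs" and n_jobs=4, the string becomes "2*4" and
# eval_expr("2*4") == 8, so only 8 tasks are consumed from the iterator
# up front; the rest are dispatched lazily by the completion callbacks.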
2072 # The main thread will consume the first pre_dispatch items and
2073 # the remaining items will later be lazily dispatched by async
2074 # callbacks upon task completions.
2076 # TODO: this iterator should be batch_size * n_jobs
2077 iterator = itertools.islice(iterator, self._pre_dispatch_amount)
2079 # Use a caching dict for callables that are pickled with cloudpickle to
2080 # improve performance. This cache is used only in the case of
2081 # functions that are defined in the __main__ module, functions that
2082 # are defined locally (inside another function) and lambda expressions.
2083 self._pickle_cache = dict()
2085 output = self._get_outputs(iterator, pre_dispatch)
2086 self._call_ref = weakref.ref(output)
2088 # The first item from the output is blank, but it makes the interpreter
2089 # progress until it enters the try/except block of the generator and
2090 # reaches the first `yield` statement. This starts the asynchronous
2091 # dispatch of the tasks to the workers.
2092 next(output)
2094 return output if self.return_generator else list(output)
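# -------------------------------------------------------------------------
# Editorial sketch (not part of joblib's source) of the two paths taken by
# __call__ above: with n_jobs=1 the work runs through
# _get_sequential_output with no worker pool, while n_jobs > 1 goes through
# the dispatching/retrieval machinery. Assumes only the public joblib API
# (Parallel, delayed).
#
#     from joblib import Parallel, delayed
#
#     seq = Parallel(n_jobs=1)(delayed(abs)(-i) for i in range(5))
#     par = Parallel(n_jobs=2)(delayed(abs)(-i) for i in range(5))
#     assert seq == par == [0, 1, 2, 3, 4]
# -------------------------------------------------------------------------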
2096 def __repr__(self):
2097 return "%s(n_jobs=%s)" % (self.__class__.__name__, self.n_jobs)