Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/parallel.py: 16%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Helpers for embarrassingly parallel code.
3"""
4# Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org >
5# Copyright: 2010, Gael Varoquaux
6# License: BSD 3 clause
8from __future__ import division
10import collections
11import functools
12import itertools
13import os
14import queue
15import sys
16import threading
17import time
18import warnings
19import weakref
20from contextlib import nullcontext
21from math import floor, log10, sqrt
22from multiprocessing import TimeoutError
23from numbers import Integral
24from uuid import uuid4
26from ._multiprocessing_helpers import mp
28# Make sure that those two classes are part of the public joblib.parallel API
29# so that 3rd party backend implementers can import them from here.
30from ._parallel_backends import (
31 AutoBatchingMixin, # noqa
32 FallbackToBackend,
33 LokyBackend,
34 MultiprocessingBackend,
35 ParallelBackendBase, # noqa
36 SequentialBackend,
37 ThreadingBackend,
38)
39from ._utils import _Sentinel, eval_expr
40from .disk import memstr_to_bytes
41from .logger import Logger, short_format_time
43BACKENDS = {
44 "threading": ThreadingBackend,
45 "sequential": SequentialBackend,
46}
47# name of the backend used by default by Parallel outside of any context
48# managed by ``parallel_config`` or ``parallel_backend``.
50# threading is the only backend that is always everywhere
51DEFAULT_BACKEND = "threading"
52DEFAULT_THREAD_BACKEND = "threading"
53DEFAULT_PROCESS_BACKEND = "threading"
55MAYBE_AVAILABLE_BACKENDS = {"multiprocessing", "loky"}
57# if multiprocessing is available, so is loky, we set it as the default
58# backend
59if mp is not None:
60 BACKENDS["multiprocessing"] = MultiprocessingBackend
61 from .externals import loky
63 BACKENDS["loky"] = LokyBackend
64 DEFAULT_BACKEND = "loky"
65 DEFAULT_PROCESS_BACKEND = "loky"
67# Thread local value that can be overridden by the ``parallel_config`` context
68# manager
69_backend = threading.local()
72def _register_dask():
73 """Register Dask Backend if called with parallel_config(backend="dask")"""
74 try:
75 from ._dask import DaskDistributedBackend
77 register_parallel_backend("dask", DaskDistributedBackend)
78 except ImportError as e:
79 msg = (
80 "To use the dask.distributed backend you must install both "
81 "the `dask` and distributed modules.\n\n"
82 "See https://dask.pydata.org/en/latest/install.html for more "
83 "information."
84 )
85 raise ImportError(msg) from e
88EXTERNAL_BACKENDS = {
89 "dask": _register_dask,
90}
93# Sentinels for the default values of the Parallel constructor and
94# the parallel_config and parallel_backend context managers
95default_parallel_config = {
96 "backend": _Sentinel(default_value=None),
97 "n_jobs": _Sentinel(default_value=None),
98 "verbose": _Sentinel(default_value=0),
99 "temp_folder": _Sentinel(default_value=None),
100 "max_nbytes": _Sentinel(default_value="1M"),
101 "mmap_mode": _Sentinel(default_value="r"),
102 "prefer": _Sentinel(default_value=None),
103 "require": _Sentinel(default_value=None),
104}
107VALID_BACKEND_HINTS = ("processes", "threads", None)
108VALID_BACKEND_CONSTRAINTS = ("sharedmem", None)
111def _get_config_param(param, context_config, key):
112 """Return the value of a parallel config parameter
114 Explicitly setting it in Parallel has priority over setting in a
115 parallel_(config/backend) context manager.
116 """
117 if param is not default_parallel_config[key]:
118 # param is explicitly set, return it
119 return param
121 if context_config[key] is not default_parallel_config[key]:
122 # there's a context manager and the key is set, return it
123 return context_config[key]
125 # Otherwise, we are in the default_parallel_config,
126 # return the default value
127 return param.default_value
130def get_active_backend(
131 prefer=default_parallel_config["prefer"],
132 require=default_parallel_config["require"],
133 verbose=default_parallel_config["verbose"],
134):
135 """Return the active default backend"""
136 backend, config = _get_active_backend(prefer, require, verbose)
137 n_jobs = _get_config_param(default_parallel_config["n_jobs"], config, "n_jobs")
138 return backend, n_jobs
141def _get_active_backend(
142 prefer=default_parallel_config["prefer"],
143 require=default_parallel_config["require"],
144 verbose=default_parallel_config["verbose"],
145):
146 """Return the active default backend"""
148 backend_config = getattr(_backend, "config", default_parallel_config)
150 backend = _get_config_param(
151 default_parallel_config["backend"], backend_config, "backend"
152 )
154 prefer = _get_config_param(prefer, backend_config, "prefer")
155 require = _get_config_param(require, backend_config, "require")
156 verbose = _get_config_param(verbose, backend_config, "verbose")
158 if prefer not in VALID_BACKEND_HINTS:
159 raise ValueError(
160 f"prefer={prefer} is not a valid backend hint, "
161 f"expected one of {VALID_BACKEND_HINTS}"
162 )
163 if require not in VALID_BACKEND_CONSTRAINTS:
164 raise ValueError(
165 f"require={require} is not a valid backend constraint, "
166 f"expected one of {VALID_BACKEND_CONSTRAINTS}"
167 )
168 if prefer == "processes" and require == "sharedmem":
169 raise ValueError(
170 "prefer == 'processes' and require == 'sharedmem' are inconsistent settings"
171 )
173 explicit_backend = True
174 if backend is None:
175 # We are either outside of the scope of any parallel_(config/backend)
176 # context manager or the context manager did not set a backend.
177 # create the default backend instance now.
178 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
179 explicit_backend = False
181 # Try to use the backend set by the user with the context manager.
183 nesting_level = backend.nesting_level
184 uses_threads = getattr(backend, "uses_threads", False)
185 supports_sharedmem = getattr(backend, "supports_sharedmem", False)
186 # Force to use thread-based backend if the provided backend does not
187 # match the shared memory constraint or if the backend is not explicitly
188 # given and threads are preferred.
189 force_threads = (require == "sharedmem" and not supports_sharedmem) or (
190 not explicit_backend and prefer == "threads" and not uses_threads
191 )
192 force_processes = not explicit_backend and prefer == "processes" and uses_threads
194 if force_threads:
195 # This backend does not match the shared memory constraint:
196 # fallback to the default thead-based backend.
197 sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND](
198 nesting_level=nesting_level
199 )
200 # Warn the user if we forced the backend to thread-based, while the
201 # user explicitly specified a non-thread-based backend.
202 if verbose >= 10 and explicit_backend:
203 print(
204 f"Using {sharedmem_backend.__class__.__name__} as "
205 f"joblib backend instead of {backend.__class__.__name__} "
206 "as the latter does not provide shared memory semantics."
207 )
208 # Force to n_jobs=1 by default
209 thread_config = backend_config.copy()
210 thread_config["n_jobs"] = 1
211 return sharedmem_backend, thread_config
213 if force_processes:
214 # This backend does not match the prefer="processes" constraint:
215 # fallback to the default process-based backend.
216 process_backend = BACKENDS[DEFAULT_PROCESS_BACKEND](nesting_level=nesting_level)
218 return process_backend, backend_config.copy()
220 return backend, backend_config
223class parallel_config:
224 """Set the default backend or configuration for :class:`~joblib.Parallel`.
226 This is an alternative to directly passing keyword arguments to the
227 :class:`~joblib.Parallel` class constructor. It is particularly useful when
228 calling into library code that uses joblib internally but does not expose
229 the various parallel configuration arguments in its own API.
231 Parameters
232 ----------
233 backend: str or ParallelBackendBase instance, default=None
234 If ``backend`` is a string it must match a previously registered
235 implementation using the :func:`~register_parallel_backend` function.
237 By default the following backends are available:
239 - 'loky': single-host, process-based parallelism (used by default),
240 - 'threading': single-host, thread-based parallelism,
241 - 'multiprocessing': legacy single-host, process-based parallelism.
243 'loky' is recommended to run functions that manipulate Python objects.
244 'threading' is a low-overhead alternative that is most efficient for
245 functions that release the Global Interpreter Lock: e.g. I/O-bound
246 code or CPU-bound code in a few calls to native code that explicitly
247 releases the GIL. Note that on some rare systems (such as pyodide),
248 multiprocessing and loky may not be available, in which case joblib
249 defaults to threading.
251 In addition, if the ``dask`` and ``distributed`` Python packages are
252 installed, it is possible to use the 'dask' backend for better
253 scheduling of nested parallel calls without over-subscription and
254 potentially distribute parallel calls over a networked cluster of
255 several hosts.
257 It is also possible to use the distributed 'ray' backend for
258 distributing the workload to a cluster of nodes. See more details
259 in the Examples section below.
261 Alternatively the backend can be passed directly as an instance.
263 n_jobs: int, default=None
264 The maximum number of concurrently running jobs, such as the number
265 of Python worker processes when ``backend="loky"`` or the size of the
266 thread-pool when ``backend="threading"``.
267 This argument is converted to an integer, rounded below for float.
268 If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
269 ``n_cpus`` is obtained with :func:`~cpu_count`.
270 For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
271 using ``n_jobs=-2`` will result in all CPUs but one being used.
272 This argument can also go above ``n_cpus``, which will cause
273 oversubscription. In some cases, slight oversubscription can be
274 beneficial, e.g., for tasks with large I/O operations.
275 If 1 is given, no parallel computing code is used at all, and the
276 behavior amounts to a simple python `for` loop. This mode is not
277 compatible with `timeout`.
278 None is a marker for 'unset' that will be interpreted as n_jobs=1
279 unless the call is performed under a :func:`~parallel_config`
280 context manager that sets another value for ``n_jobs``.
281 If n_jobs = 0 then a ValueError is raised.
283 verbose: int, default=0
284 The verbosity level: if non zero, progress messages are
285 printed. Above 50, the output is sent to stdout.
286 The frequency of the messages increases with the verbosity level.
287 If it more than 10, all iterations are reported.
289 temp_folder: str or None, default=None
290 Folder to be used by the pool for memmapping large arrays
291 for sharing memory with worker processes. If None, this will try in
292 order:
294 - a folder pointed by the ``JOBLIB_TEMP_FOLDER`` environment
295 variable,
296 - ``/dev/shm`` if the folder exists and is writable: this is a
297 RAM disk filesystem available by default on modern Linux
298 distributions,
299 - the default system temporary folder that can be
300 overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment
301 variables, typically ``/tmp`` under Unix operating systems.
303 max_nbytes: int, str, or None, optional, default='1M'
304 Threshold on the size of arrays passed to the workers that
305 triggers automated memory mapping in temp_folder. Can be an int
306 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
307 Use None to disable memmapping of large arrays.
309 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
310 Memmapping mode for numpy arrays passed to workers. None will
311 disable memmapping, other modes defined in the numpy.memmap doc:
312 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
313 Also, see 'max_nbytes' parameter documentation for more details.
315 prefer: str in {'processes', 'threads'} or None, default=None
316 Soft hint to choose the default backend.
317 The default process-based backend is 'loky' and the default
318 thread-based backend is 'threading'. Ignored if the ``backend``
319 parameter is specified.
321 require: 'sharedmem' or None, default=None
322 Hard constraint to select the backend. If set to 'sharedmem',
323 the selected backend will be single-host and thread-based.
325 inner_max_num_threads: int, default=None
326 If not None, overwrites the limit set on the number of threads
327 usable in some third-party library threadpools like OpenBLAS,
328 MKL or OpenMP. This is only used with the ``loky`` backend.
330 backend_params: dict
331 Additional parameters to pass to the backend constructor when
332 backend is a string.
334 Notes
335 -----
336 Joblib tries to limit the oversubscription by limiting the number of
337 threads usable in some third-party library threadpools like OpenBLAS, MKL
338 or OpenMP. The default limit in each worker is set to
339 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
340 overwritten with the ``inner_max_num_threads`` argument which will be used
341 to set this limit in the child processes.
343 .. versionadded:: 1.3
345 Examples
346 --------
347 >>> from operator import neg
348 >>> with parallel_config(backend='threading'):
349 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
350 ...
351 [-1, -2, -3, -4, -5]
353 To use the 'ray' joblib backend add the following lines:
355 >>> from ray.util.joblib import register_ray # doctest: +SKIP
356 >>> register_ray() # doctest: +SKIP
357 >>> with parallel_config(backend="ray"): # doctest: +SKIP
358 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
359 [-1, -2, -3, -4, -5]
361 """
363 def __init__(
364 self,
365 backend=default_parallel_config["backend"],
366 *,
367 n_jobs=default_parallel_config["n_jobs"],
368 verbose=default_parallel_config["verbose"],
369 temp_folder=default_parallel_config["temp_folder"],
370 max_nbytes=default_parallel_config["max_nbytes"],
371 mmap_mode=default_parallel_config["mmap_mode"],
372 prefer=default_parallel_config["prefer"],
373 require=default_parallel_config["require"],
374 inner_max_num_threads=None,
375 **backend_params,
376 ):
377 # Save the parallel info and set the active parallel config
378 self.old_parallel_config = getattr(_backend, "config", default_parallel_config)
380 backend = self._check_backend(backend, inner_max_num_threads, **backend_params)
382 new_config = {
383 "n_jobs": n_jobs,
384 "verbose": verbose,
385 "temp_folder": temp_folder,
386 "max_nbytes": max_nbytes,
387 "mmap_mode": mmap_mode,
388 "prefer": prefer,
389 "require": require,
390 "backend": backend,
391 }
392 self.parallel_config = self.old_parallel_config.copy()
393 self.parallel_config.update(
394 {k: v for k, v in new_config.items() if not isinstance(v, _Sentinel)}
395 )
397 setattr(_backend, "config", self.parallel_config)
399 def _check_backend(self, backend, inner_max_num_threads, **backend_params):
400 if backend is default_parallel_config["backend"]:
401 if inner_max_num_threads is not None or len(backend_params) > 0:
402 raise ValueError(
403 "inner_max_num_threads and other constructor "
404 "parameters backend_params are only supported "
405 "when backend is not None."
406 )
407 return backend
409 if isinstance(backend, str):
410 # Handle non-registered or missing backends
411 if backend not in BACKENDS:
412 if backend in EXTERNAL_BACKENDS:
413 register = EXTERNAL_BACKENDS[backend]
414 register()
415 elif backend in MAYBE_AVAILABLE_BACKENDS:
416 warnings.warn(
417 f"joblib backend '{backend}' is not available on "
418 f"your system, falling back to {DEFAULT_BACKEND}.",
419 UserWarning,
420 stacklevel=2,
421 )
422 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
423 else:
424 raise ValueError(
425 f"Invalid backend: {backend}, expected one of "
426 f"{sorted(BACKENDS.keys())}"
427 )
429 backend = BACKENDS[backend](**backend_params)
430 else:
431 if len(backend_params) > 0:
432 raise ValueError(
433 "Constructor parameters backend_params are only "
434 "supported when backend is a string."
435 )
437 if inner_max_num_threads is not None:
438 msg = (
439 f"{backend.__class__.__name__} does not accept setting the "
440 "inner_max_num_threads argument."
441 )
442 assert backend.supports_inner_max_num_threads, msg
443 backend.inner_max_num_threads = inner_max_num_threads
445 # If the nesting_level of the backend is not set previously, use the
446 # nesting level from the previous active_backend to set it
447 if backend.nesting_level is None:
448 parent_backend = self.old_parallel_config["backend"]
449 if parent_backend is default_parallel_config["backend"]:
450 nesting_level = 0
451 else:
452 nesting_level = parent_backend.nesting_level
453 backend.nesting_level = nesting_level
455 return backend
457 def __enter__(self):
458 return self.parallel_config
460 def __exit__(self, type, value, traceback):
461 self.unregister()
463 def unregister(self):
464 setattr(_backend, "config", self.old_parallel_config)
467class parallel_backend(parallel_config):
468 """Change the default backend used by Parallel inside a with block.
470 .. warning::
471 It is advised to use the :class:`~joblib.parallel_config` context
472 manager instead, which allows more fine-grained control over the
473 backend configuration.
475 If ``backend`` is a string it must match a previously registered
476 implementation using the :func:`~register_parallel_backend` function.
478 By default the following backends are available:
480 - 'loky': single-host, process-based parallelism (used by default),
481 - 'threading': single-host, thread-based parallelism,
482 - 'multiprocessing': legacy single-host, process-based parallelism.
484 'loky' is recommended to run functions that manipulate Python objects.
485 'threading' is a low-overhead alternative that is most efficient for
486 functions that release the Global Interpreter Lock: e.g. I/O-bound code or
487 CPU-bound code in a few calls to native code that explicitly releases the
488 GIL. Note that on some rare systems (such as Pyodide),
489 multiprocessing and loky may not be available, in which case joblib
490 defaults to threading.
492 You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib
493 backend to distribute work across machines. This works well with
494 scikit-learn estimators with the ``n_jobs`` parameter, for example::
496 >>> import joblib # doctest: +SKIP
497 >>> from sklearn.model_selection import GridSearchCV # doctest: +SKIP
498 >>> from dask.distributed import Client, LocalCluster # doctest: +SKIP
500 >>> # create a local Dask cluster
501 >>> cluster = LocalCluster() # doctest: +SKIP
502 >>> client = Client(cluster) # doctest: +SKIP
503 >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1)
504 ... # doctest: +SKIP
505 >>> with joblib.parallel_backend("dask", scatter=[X, y]): # doctest: +SKIP
506 ... grid_search.fit(X, y)
508 It is also possible to use the distributed 'ray' backend for distributing
509 the workload to a cluster of nodes. To use the 'ray' joblib backend add
510 the following lines::
512 >>> from ray.util.joblib import register_ray # doctest: +SKIP
513 >>> register_ray() # doctest: +SKIP
514 >>> with parallel_backend("ray"): # doctest: +SKIP
515 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
516 [-1, -2, -3, -4, -5]
518 Alternatively the backend can be passed directly as an instance.
520 By default all available workers will be used (``n_jobs=-1``) unless the
521 caller passes an explicit value for the ``n_jobs`` parameter.
523 This is an alternative to passing a ``backend='backend_name'`` argument to
524 the :class:`~Parallel` class constructor. It is particularly useful when
525 calling into library code that uses joblib internally but does not expose
526 the backend argument in its own API.
528 >>> from operator import neg
529 >>> with parallel_backend('threading'):
530 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
531 ...
532 [-1, -2, -3, -4, -5]
534 Joblib also tries to limit the oversubscription by limiting the number of
535 threads usable in some third-party library threadpools like OpenBLAS, MKL
536 or OpenMP. The default limit in each worker is set to
537 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
538 overwritten with the ``inner_max_num_threads`` argument which will be used
539 to set this limit in the child processes.
541 .. versionadded:: 0.10
543 See Also
544 --------
545 joblib.parallel_config: context manager to change the backend configuration.
546 """
548 def __init__(
549 self, backend, n_jobs=-1, inner_max_num_threads=None, **backend_params
550 ):
551 super().__init__(
552 backend=backend,
553 n_jobs=n_jobs,
554 inner_max_num_threads=inner_max_num_threads,
555 **backend_params,
556 )
558 if self.old_parallel_config is None:
559 self.old_backend_and_jobs = None
560 else:
561 self.old_backend_and_jobs = (
562 self.old_parallel_config["backend"],
563 self.old_parallel_config["n_jobs"],
564 )
565 self.new_backend_and_jobs = (
566 self.parallel_config["backend"],
567 self.parallel_config["n_jobs"],
568 )
570 def __enter__(self):
571 return self.new_backend_and_jobs
574# Under Linux or OS X the default start method of multiprocessing
575# can cause third party libraries to crash. Under Python 3.4+ it is possible
576# to set an environment variable to switch the default start method from
577# 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost
578# of causing semantic changes and some additional pool instantiation overhead.
579DEFAULT_MP_CONTEXT = None
580if hasattr(mp, "get_context"):
581 method = os.environ.get("JOBLIB_START_METHOD", "").strip() or None
582 if method is not None:
583 DEFAULT_MP_CONTEXT = mp.get_context(method=method)
586class BatchedCalls(object):
587 """Wrap a sequence of (func, args, kwargs) tuples as a single callable"""
589 def __init__(
590 self, iterator_slice, backend_and_jobs, reducer_callback=None, pickle_cache=None
591 ):
592 self.items = list(iterator_slice)
593 self._size = len(self.items)
594 self._reducer_callback = reducer_callback
595 if isinstance(backend_and_jobs, tuple):
596 self._backend, self._n_jobs = backend_and_jobs
597 else:
598 # this is for backward compatibility purposes. Before 0.12.6,
599 # nested backends were returned without n_jobs indications.
600 self._backend, self._n_jobs = backend_and_jobs, None
601 self._pickle_cache = pickle_cache if pickle_cache is not None else {}
603 def __call__(self):
604 # Set the default nested backend to self._backend but do not set the
605 # change the default number of processes to -1
606 with parallel_config(backend=self._backend, n_jobs=self._n_jobs):
607 return [func(*args, **kwargs) for func, args, kwargs in self.items]
609 def __reduce__(self):
610 if self._reducer_callback is not None:
611 self._reducer_callback()
612 # no need to pickle the callback.
613 return (
614 BatchedCalls,
615 (self.items, (self._backend, self._n_jobs), None, self._pickle_cache),
616 )
618 def __len__(self):
619 return self._size
622# Possible exit status for a task
623TASK_DONE = "Done"
624TASK_ERROR = "Error"
625TASK_PENDING = "Pending"
628###############################################################################
629# CPU count that works also when multiprocessing has been disabled via
630# the JOBLIB_MULTIPROCESSING environment variable
631def cpu_count(only_physical_cores=False):
632 """Return the number of CPUs.
634 This delegates to loky.cpu_count that takes into account additional
635 constraints such as Linux CFS scheduler quotas (typically set by container
636 runtimes such as docker) and CPU affinity (for instance using the taskset
637 command on Linux).
639 Parameters
640 ----------
641 only_physical_cores : boolean, default=False
642 If True, does not take hyperthreading / SMT logical cores into account.
644 """
645 if mp is None:
646 return 1
648 return loky.cpu_count(only_physical_cores=only_physical_cores)
651###############################################################################
652# For verbosity
655def _verbosity_filter(index, verbose):
656 """Returns False for indices increasingly apart, the distance
657 depending on the value of verbose.
659 We use a lag increasing as the square of index
660 """
661 if not verbose:
662 return True
663 elif verbose > 10:
664 return False
665 if index == 0:
666 return False
667 verbose = 0.5 * (11 - verbose) ** 2
668 scale = sqrt(index / verbose)
669 next_scale = sqrt((index + 1) / verbose)
670 return int(next_scale) == int(scale)
673###############################################################################
674def delayed(function):
675 """Decorator used to capture the arguments of a function."""
677 def delayed_function(*args, **kwargs):
678 return function, args, kwargs
680 try:
681 delayed_function = functools.wraps(function)(delayed_function)
682 except AttributeError:
683 " functools.wraps fails on some callable objects "
684 return delayed_function
687###############################################################################
688class BatchCompletionCallBack(object):
689 """Callback to keep track of completed results and schedule the next tasks.
691 This callable is executed by the parent process whenever a worker process
692 has completed a batch of tasks.
694 It is used for progress reporting, to update estimate of the batch
695 processing duration and to schedule the next batch of tasks to be
696 processed.
698 It is assumed that this callback will always be triggered by the backend
699 right after the end of a task, in case of success as well as in case of
700 failure.
701 """
703 ##########################################################################
704 # METHODS CALLED BY THE MAIN THREAD #
705 ##########################################################################
706 def __init__(self, dispatch_timestamp, batch_size, parallel):
707 self.dispatch_timestamp = dispatch_timestamp
708 self.batch_size = batch_size
709 self.parallel = parallel
710 self.parallel_call_id = parallel._call_id
711 self._completion_timeout_counter = None
713 # Internals to keep track of the status and outcome of the task.
715 # Used to hold a reference to the future-like object returned by the
716 # backend after launching this task
717 # This will be set later when calling `register_job`, as it is only
718 # created once the task has been submitted.
719 self.job = None
721 if not parallel._backend.supports_retrieve_callback:
722 # The status is only used for asynchronous result retrieval in the
723 # callback.
724 self.status = None
725 else:
726 # The initial status for the job is TASK_PENDING.
727 # Once it is done, it will be either TASK_DONE, or TASK_ERROR.
728 self.status = TASK_PENDING
730 def register_job(self, job):
731 """Register the object returned by `submit`."""
732 self.job = job
734 def get_result(self, timeout):
735 """Returns the raw result of the task that was submitted.
737 If the task raised an exception rather than returning, this same
738 exception will be raised instead.
740 If the backend supports the retrieval callback, it is assumed that this
741 method is only called after the result has been registered. It is
742 ensured by checking that `self.status(timeout)` does not return
743 TASK_PENDING. In this case, `get_result` directly returns the
744 registered result (or raise the registered exception).
746 For other backends, there are no such assumptions, but `get_result`
747 still needs to synchronously retrieve the result before it can
748 return it or raise. It will block at most `self.timeout` seconds
749 waiting for retrieval to complete, after that it raises a TimeoutError.
750 """
752 backend = self.parallel._backend
754 if backend.supports_retrieve_callback:
755 # We assume that the result has already been retrieved by the
756 # callback thread, and is stored internally. It's just waiting to
757 # be returned.
758 return self._return_or_raise()
760 # For other backends, the main thread needs to run the retrieval step.
761 try:
762 result = backend.retrieve_result(self.job, timeout=timeout)
763 outcome = dict(result=result, status=TASK_DONE)
764 except BaseException as e:
765 outcome = dict(result=e, status=TASK_ERROR)
766 self._register_outcome(outcome)
768 return self._return_or_raise()
770 def _return_or_raise(self):
771 try:
772 if self.status == TASK_ERROR:
773 raise self._result
774 return self._result
775 finally:
776 del self._result
778 def get_status(self, timeout):
779 """Get the status of the task.
781 This function also checks if the timeout has been reached and register
782 the TimeoutError outcome when it is the case.
783 """
784 if timeout is None or self.status != TASK_PENDING:
785 return self.status
787 # The computation are running and the status is pending.
788 # Check that we did not wait for this jobs more than `timeout`.
789 now = time.time()
790 if self._completion_timeout_counter is None:
791 self._completion_timeout_counter = now
793 if (now - self._completion_timeout_counter) > timeout:
794 outcome = dict(result=TimeoutError(), status=TASK_ERROR)
795 self._register_outcome(outcome)
797 return self.status
799 ##########################################################################
800 # METHODS CALLED BY CALLBACK THREADS #
801 ##########################################################################
802 def __call__(self, *args, **kwargs):
803 """Function called by the callback thread after a job is completed."""
805 # If the backend doesn't support callback retrievals, the next batch of
806 # tasks is dispatched regardless. The result will be retrieved by the
807 # main thread when calling `get_result`.
808 if not self.parallel._backend.supports_retrieve_callback:
809 self._dispatch_new()
810 return
812 # If the backend supports retrieving the result in the callback, it
813 # registers the task outcome (TASK_ERROR or TASK_DONE), and schedules
814 # the next batch if needed.
815 with self.parallel._lock:
816 # Edge case where while the task was processing, the `parallel`
817 # instance has been reset and a new call has been issued, but the
818 # worker managed to complete the task and trigger this callback
819 # call just before being aborted by the reset.
820 if self.parallel._call_id != self.parallel_call_id:
821 return
823 # When aborting, stop as fast as possible and do not retrieve the
824 # result as it won't be returned by the Parallel call.
825 if self.parallel._aborting:
826 return
828 # Retrieves the result of the task in the main process and dispatch
829 # a new batch if needed.
830 job_succeeded = self._retrieve_result(*args, **kwargs)
832 if job_succeeded:
833 self._dispatch_new()
835 def _dispatch_new(self):
836 """Schedule the next batch of tasks to be processed."""
838 # This steps ensure that auto-batching works as expected.
839 this_batch_duration = time.time() - self.dispatch_timestamp
840 self.parallel._backend.batch_completed(self.batch_size, this_batch_duration)
842 # Schedule the next batch of tasks.
843 with self.parallel._lock:
844 self.parallel.n_completed_tasks += self.batch_size
845 self.parallel.print_progress()
846 if self.parallel._original_iterator is not None:
847 self.parallel.dispatch_next()
849 def _retrieve_result(self, out):
850 """Fetch and register the outcome of a task.
852 Return True if the task succeeded, False otherwise.
853 This function is only called by backends that support retrieving
854 the task result in the callback thread.
855 """
856 try:
857 result = self.parallel._backend.retrieve_result_callback(out)
858 outcome = dict(status=TASK_DONE, result=result)
859 except BaseException as e:
860 # Avoid keeping references to parallel in the error.
861 e.__traceback__ = None
862 outcome = dict(result=e, status=TASK_ERROR)
864 self._register_outcome(outcome)
865 return outcome["status"] != TASK_ERROR
867 ##########################################################################
868 # This method can be called either in the main thread #
869 # or in the callback thread. #
870 ##########################################################################
871 def _register_outcome(self, outcome):
872 """Register the outcome of a task.
874 This method can be called only once, future calls will be ignored.
875 """
876 # Covers the edge case where the main thread tries to register a
877 # `TimeoutError` while the callback thread tries to register a result
878 # at the same time.
879 with self.parallel._lock:
880 if self.status not in (TASK_PENDING, None):
881 return
882 self.status = outcome["status"]
884 self._result = outcome["result"]
886 # Once the result and the status are extracted, the last reference to
887 # the job can be deleted.
888 self.job = None
890 # As soon as an error as been spotted, early stopping flags are sent to
891 # the `parallel` instance.
892 if self.status == TASK_ERROR:
893 self.parallel._exception = True
894 self.parallel._aborting = True
896 if self.parallel.return_ordered:
897 return
899 with self.parallel._lock:
900 # For `return_as=generator_unordered`, append the job to the queue
901 # in the order of completion instead of submission.
902 self.parallel._jobs.append(self)
905###############################################################################
906def register_parallel_backend(name, factory, make_default=False):
907 """Register a new Parallel backend factory.
909 The new backend can then be selected by passing its name as the backend
910 argument to the :class:`~Parallel` class. Moreover, the default backend can
911 be overwritten globally by setting make_default=True.
913 The factory can be any callable that takes no argument and return an
914 instance of ``ParallelBackendBase``.
916 Warning: this function is experimental and subject to change in a future
917 version of joblib.
919 .. versionadded:: 0.10
920 """
921 BACKENDS[name] = factory
922 if make_default:
923 global DEFAULT_BACKEND
924 DEFAULT_BACKEND = name
927def effective_n_jobs(n_jobs=-1):
928 """Determine the number of jobs that can actually run in parallel
930 n_jobs is the number of workers requested by the callers. Passing n_jobs=-1
931 means requesting all available workers for instance matching the number of
932 CPU cores on the worker host(s).
934 This method should return a guesstimate of the number of workers that can
935 actually perform work concurrently with the currently enabled default
936 backend. The primary use case is to make it possible for the caller to know
937 in how many chunks to slice the work.
939 In general working on larger data chunks is more efficient (less scheduling
940 overhead and better use of CPU cache prefetching heuristics) as long as all
941 the workers have enough work to do.
943 Warning: this function is experimental and subject to change in a future
944 version of joblib.
946 .. versionadded:: 0.10
947 """
948 if n_jobs == 1:
949 return 1
951 backend, backend_n_jobs = get_active_backend()
952 if n_jobs is None:
953 n_jobs = backend_n_jobs
954 return backend.effective_n_jobs(n_jobs=n_jobs)
957###############################################################################
958class Parallel(Logger):
959 """Helper class for readable parallel mapping.
961 Read more in the :ref:`User Guide <parallel>`.
963 Parameters
964 ----------
965 n_jobs: int, default=None
966 The maximum number of concurrently running jobs, such as the number
967 of Python worker processes when ``backend="loky"`` or the size of
968 the thread-pool when ``backend="threading"``.
969 This argument is converted to an integer, rounded below for float.
970 If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
971 ``n_cpus`` is obtained with :func:`~cpu_count`.
972 For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
973 using ``n_jobs=-2`` will result in all CPUs but one being used.
974 This argument can also go above ``n_cpus``, which will cause
975 oversubscription. In some cases, slight oversubscription can be
976 beneficial, e.g., for tasks with large I/O operations.
977 If 1 is given, no parallel computing code is used at all, and the
978 behavior amounts to a simple python `for` loop. This mode is not
979 compatible with ``timeout``.
980 None is a marker for 'unset' that will be interpreted as n_jobs=1
981 unless the call is performed under a :func:`~parallel_config`
982 context manager that sets another value for ``n_jobs``.
983 If n_jobs = 0 then a ValueError is raised.
984 backend: str, ParallelBackendBase instance or None, default='loky'
985 Specify the parallelization backend implementation.
986 Supported backends are:
988 - "loky" used by default, can induce some
989 communication and memory overhead when exchanging input and
990 output data with the worker Python processes. On some rare
991 systems (such as Pyiodide), the loky backend may not be
992 available.
993 - "multiprocessing" previous process-based backend based on
994 `multiprocessing.Pool`. Less robust than `loky`.
995 - "threading" is a very low-overhead backend but it suffers
996 from the Python Global Interpreter Lock if the called function
997 relies a lot on Python objects. "threading" is mostly useful
998 when the execution bottleneck is a compiled extension that
999 explicitly releases the GIL (for instance a Cython loop wrapped
1000 in a "with nogil" block or an expensive call to a library such
1001 as NumPy).
1002 - finally, you can register backends by calling
1003 :func:`~register_parallel_backend`. This will allow you to
1004 implement a backend of your liking.
1006 It is not recommended to hard-code the backend name in a call to
1007 :class:`~Parallel` in a library. Instead it is recommended to set
1008 soft hints (prefer) or hard constraints (require) so as to make it
1009 possible for library users to change the backend from the outside
1010 using the :func:`~parallel_config` context manager.
1011 return_as: str in {'list', 'generator', 'generator_unordered'}, default='list'
1012 If 'list', calls to this instance will return a list, only when
1013 all results have been processed and retrieved.
1014 If 'generator', it will return a generator that yields the results
1015 as soon as they are available, in the order the tasks have been
1016 submitted with.
1017 If 'generator_unordered', the generator will immediately yield
1018 available results independently of the submission order. The output
1019 order is not deterministic in this case because it depends on the
1020 concurrency of the workers.
1021 prefer: str in {'processes', 'threads'} or None, default=None
1022 Soft hint to choose the default backend if no specific backend
1023 was selected with the :func:`~parallel_config` context manager.
1024 The default process-based backend is 'loky' and the default
1025 thread-based backend is 'threading'. Ignored if the ``backend``
1026 parameter is specified.
1027 require: 'sharedmem' or None, default=None
1028 Hard constraint to select the backend. If set to 'sharedmem',
1029 the selected backend will be single-host and thread-based even
1030 if the user asked for a non-thread based backend with
1031 :func:`~joblib.parallel_config`.
1032 verbose: int, default=0
1033 The verbosity level: if non zero, progress messages are
1034 printed. Above 50, the output is sent to stdout.
1035 The frequency of the messages increases with the verbosity level.
1036 If it more than 10, all iterations are reported.
1037 timeout: float or None, default=None
1038 Timeout limit for each task to complete. If any task takes longer
1039 a TimeOutError will be raised. Only applied when n_jobs != 1
1040 pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}, default='2*n_jobs'
1041 The number of batches (of tasks) to be pre-dispatched.
1042 Default is '2*n_jobs'. When batch_size="auto" this is reasonable
1043 default and the workers should never starve. Note that only basic
1044 arithmetic are allowed here and no modules can be used in this
1045 expression.
1046 batch_size: int or 'auto', default='auto'
1047 The number of atomic tasks to dispatch at once to each
1048 worker. When individual evaluations are very fast, dispatching
1049 calls to workers can be slower than sequential computation because
1050 of the overhead. Batching fast computations together can mitigate
1051 this.
1052 The ``'auto'`` strategy keeps track of the time it takes for a
1053 batch to complete, and dynamically adjusts the batch size to keep
1054 the time on the order of half a second, using a heuristic. The
1055 initial batch size is 1.
1056 ``batch_size="auto"`` with ``backend="threading"`` will dispatch
1057 batches of a single task at a time as the threading backend has
1058 very little overhead and using larger batch size has not proved to
1059 bring any gain in that case.
1060 temp_folder: str or None, default=None
1061 Folder to be used by the pool for memmapping large arrays
1062 for sharing memory with worker processes. If None, this will try in
1063 order:
1065 - a folder pointed by the JOBLIB_TEMP_FOLDER environment
1066 variable,
1067 - /dev/shm if the folder exists and is writable: this is a
1068 RAM disk filesystem available by default on modern Linux
1069 distributions,
1070 - the default system temporary folder that can be
1071 overridden with TMP, TMPDIR or TEMP environment
1072 variables, typically /tmp under Unix operating systems.
1074 Only active when ``backend="loky"`` or ``"multiprocessing"``.
1075 max_nbytes int, str, or None, optional, default='1M'
1076 Threshold on the size of arrays passed to the workers that
1077 triggers automated memory mapping in temp_folder. Can be an int
1078 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
1079 Use None to disable memmapping of large arrays.
1080 Only active when ``backend="loky"`` or ``"multiprocessing"``.
1081 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
1082 Memmapping mode for numpy arrays passed to workers. None will
1083 disable memmapping, other modes defined in the numpy.memmap doc:
1084 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
1085 Also, see 'max_nbytes' parameter documentation for more details.
1086 backend_kwargs: dict, optional
1087 Additional parameters to pass to the backend `configure` method.
1089 Notes
1090 -----
1092 This object uses workers to compute in parallel the application of a
1093 function to many different arguments. The main functionality it brings
1094 in addition to using the raw multiprocessing or concurrent.futures API
1095 are (see examples for details):
1097 * More readable code, in particular since it avoids
1098 constructing list of arguments.
1100 * Easier debugging:
1101 - informative tracebacks even when the error happens on
1102 the client side
1103 - using 'n_jobs=1' enables to turn off parallel computing
1104 for debugging without changing the codepath
1105 - early capture of pickling errors
1107 * An optional progress meter.
1109 * Interruption of multiprocesses jobs with 'Ctrl-C'
1111 * Flexible pickling control for the communication to and from
1112 the worker processes.
1114 * Ability to use shared memory efficiently with worker
1115 processes for large numpy-based datastructures.
1117 Note that the intended usage is to run one call at a time. Multiple
1118 calls to the same Parallel object will result in a ``RuntimeError``
1120 Examples
1121 --------
1123 A simple example:
1125 >>> from math import sqrt
1126 >>> from joblib import Parallel, delayed
1127 >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
1128 [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
1130 Reshaping the output when the function has several return
1131 values:
1133 >>> from math import modf
1134 >>> from joblib import Parallel, delayed
1135 >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
1136 >>> res, i = zip(*r)
1137 >>> res
1138 (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
1139 >>> i
1140 (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)
1142 The progress meter: the higher the value of `verbose`, the more
1143 messages:
1145 >>> from time import sleep
1146 >>> from joblib import Parallel, delayed
1147 >>> r = Parallel(n_jobs=2, verbose=10)(
1148 ... delayed(sleep)(.2) for _ in range(10)) #doctest: +SKIP
1149 [Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.6s
1150 [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.8s
1151 [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 1.4s finished
1153 Traceback example, note how the line of the error is indicated
1154 as well as the values of the parameter passed to the function that
1155 triggered the exception, even though the traceback happens in the
1156 child process:
1158 >>> from heapq import nlargest
1159 >>> from joblib import Parallel, delayed
1160 >>> Parallel(n_jobs=2)(
1161 ... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3))
1162 ... # doctest: +SKIP
1163 -----------------------------------------------------------------------
1164 Sub-process traceback:
1165 -----------------------------------------------------------------------
1166 TypeError Mon Nov 12 11:37:46 2012
1167 PID: 12934 Python 2.7.3: /usr/bin/python
1168 ........................................................................
1169 /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
1170 419 if n >= size:
1171 420 return sorted(iterable, key=key, reverse=True)[:n]
1172 421
1173 422 # When key is none, use simpler decoration
1174 423 if key is None:
1175 --> 424 it = izip(iterable, count(0,-1)) # decorate
1176 425 result = _nlargest(n, it)
1177 426 return map(itemgetter(0), result) # undecorate
1178 427
1179 428 # General case, slowest method
1180 TypeError: izip argument #1 must support iteration
1181 _______________________________________________________________________
1184 Using pre_dispatch in a producer/consumer situation, where the
1185 data is generated on the fly. Note how the producer is first
1186 called 3 times before the parallel loop is initiated, and then
1187 called to generate new data on the fly:
1189 >>> from math import sqrt
1190 >>> from joblib import Parallel, delayed
1191 >>> def producer():
1192 ... for i in range(6):
1193 ... print('Produced %s' % i)
1194 ... yield i
1195 >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
1196 ... delayed(sqrt)(i) for i in producer()) #doctest: +SKIP
1197 Produced 0
1198 Produced 1
1199 Produced 2
1200 [Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s
1201 Produced 3
1202 [Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s
1203 Produced 4
1204 [Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s
1205 Produced 5
1206 [Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s
1207 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s remaining: 0.0s
1208 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished
1210 """ # noqa: E501
1212 def __init__(
1213 self,
1214 n_jobs=default_parallel_config["n_jobs"],
1215 backend=default_parallel_config["backend"],
1216 return_as="list",
1217 verbose=default_parallel_config["verbose"],
1218 timeout=None,
1219 pre_dispatch="2 * n_jobs",
1220 batch_size="auto",
1221 temp_folder=default_parallel_config["temp_folder"],
1222 max_nbytes=default_parallel_config["max_nbytes"],
1223 mmap_mode=default_parallel_config["mmap_mode"],
1224 prefer=default_parallel_config["prefer"],
1225 require=default_parallel_config["require"],
1226 **backend_kwargs,
1227 ):
1228 # Initiate parent Logger class state
1229 super().__init__()
1231 # Interpret n_jobs=None as 'unset'
1232 if n_jobs is None:
1233 n_jobs = default_parallel_config["n_jobs"]
1235 active_backend, context_config = _get_active_backend(
1236 prefer=prefer, require=require, verbose=verbose
1237 )
1239 nesting_level = active_backend.nesting_level
1241 self.verbose = _get_config_param(verbose, context_config, "verbose")
1242 self.timeout = timeout
1243 self.pre_dispatch = pre_dispatch
1245 if return_as not in {"list", "generator", "generator_unordered"}:
1246 raise ValueError(
1247 'Expected `return_as` parameter to be a string equal to "list"'
1248 f',"generator" or "generator_unordered", but got {return_as} '
1249 "instead."
1250 )
1251 self.return_as = return_as
1252 self.return_generator = return_as != "list"
1253 self.return_ordered = return_as != "generator_unordered"
1255 # Check if we are under a parallel_config or parallel_backend
1256 # context manager and use the config from the context manager
1257 # for arguments that are not explicitly set.
1258 self._backend_kwargs = {
1259 **backend_kwargs,
1260 **{
1261 k: _get_config_param(param, context_config, k)
1262 for param, k in [
1263 (max_nbytes, "max_nbytes"),
1264 (temp_folder, "temp_folder"),
1265 (mmap_mode, "mmap_mode"),
1266 (prefer, "prefer"),
1267 (require, "require"),
1268 (verbose, "verbose"),
1269 ]
1270 },
1271 }
1273 if isinstance(self._backend_kwargs["max_nbytes"], str):
1274 self._backend_kwargs["max_nbytes"] = memstr_to_bytes(
1275 self._backend_kwargs["max_nbytes"]
1276 )
1277 self._backend_kwargs["verbose"] = max(0, self._backend_kwargs["verbose"] - 50)
1279 if DEFAULT_MP_CONTEXT is not None:
1280 self._backend_kwargs["context"] = DEFAULT_MP_CONTEXT
1281 elif hasattr(mp, "get_context"):
1282 self._backend_kwargs["context"] = mp.get_context()
1284 if backend is default_parallel_config["backend"] or backend is None:
1285 backend = active_backend
1287 elif isinstance(backend, ParallelBackendBase):
1288 # Use provided backend as is, with the current nesting_level if it
1289 # is not set yet.
1290 if backend.nesting_level is None:
1291 backend.nesting_level = nesting_level
1293 elif hasattr(backend, "Pool") and hasattr(backend, "Lock"):
1294 # Make it possible to pass a custom multiprocessing context as
1295 # backend to change the start method to forkserver or spawn or
1296 # preload modules on the forkserver helper process.
1297 self._backend_kwargs["context"] = backend
1298 backend = MultiprocessingBackend(nesting_level=nesting_level)
1300 elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
1301 warnings.warn(
1302 f"joblib backend '{backend}' is not available on "
1303 f"your system, falling back to {DEFAULT_BACKEND}.",
1304 UserWarning,
1305 stacklevel=2,
1306 )
1307 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
1308 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)
1310 else:
1311 try:
1312 backend_factory = BACKENDS[backend]
1313 except KeyError as e:
1314 raise ValueError(
1315 "Invalid backend: %s, expected one of %r"
1316 % (backend, sorted(BACKENDS.keys()))
1317 ) from e
1318 backend = backend_factory(nesting_level=nesting_level)
1320 n_jobs = _get_config_param(n_jobs, context_config, "n_jobs")
1321 if n_jobs is None:
1322 # No specific context override and no specific value request:
1323 # default to the default of the backend.
1324 n_jobs = backend.default_n_jobs
1325 try:
1326 n_jobs = int(n_jobs)
1327 except ValueError:
1328 raise ValueError("n_jobs could not be converted to int")
1329 self.n_jobs = n_jobs
1331 if require == "sharedmem" and not getattr(backend, "supports_sharedmem", False):
1332 raise ValueError("Backend %s does not support shared memory" % backend)
1334 if batch_size == "auto" or isinstance(batch_size, Integral) and batch_size > 0:
1335 self.batch_size = batch_size
1336 else:
1337 raise ValueError(
1338 "batch_size must be 'auto' or a positive integer, got: %r" % batch_size
1339 )
1341 if not isinstance(backend, SequentialBackend):
1342 if self.return_generator and not backend.supports_return_generator:
1343 raise ValueError(
1344 "Backend {} does not support return_as={}".format(
1345 backend, return_as
1346 )
1347 )
1348 # This lock is used to coordinate the main thread of this process
1349 # with the async callback thread of our the pool.
1350 self._lock = threading.RLock()
1351 self._jobs = collections.deque()
1352 self._jobs_set = set()
1353 self._pending_outputs = list()
1354 self._ready_batches = queue.Queue()
1355 self._reducer_callback = None
1357 # Internal variables
1358 self._backend = backend
1359 self._running = False
1360 self._managed_backend = False
1361 self._id = uuid4().hex
1362 self._call_ref = None
1364 def __enter__(self):
1365 self._managed_backend = True
1366 self._calling = False
1367 self._initialize_backend()
1368 return self
1370 def __exit__(self, exc_type, exc_value, traceback):
1371 self._managed_backend = False
1372 if self.return_generator and self._calling:
1373 self._abort()
1374 self._terminate_and_reset()
1376 def _initialize_backend(self):
1377 """Build a process or thread pool and return the number of workers"""
1378 try:
1379 n_jobs = self._backend.configure(
1380 n_jobs=self.n_jobs, parallel=self, **self._backend_kwargs
1381 )
1382 if self.timeout is not None and not self._backend.supports_timeout:
1383 warnings.warn(
1384 "The backend class {!r} does not support timeout. "
1385 "You have set 'timeout={}' in Parallel but "
1386 "the 'timeout' parameter will not be used.".format(
1387 self._backend.__class__.__name__, self.timeout
1388 )
1389 )
1391 except FallbackToBackend as e:
1392 # Recursively initialize the backend in case of requested fallback.
1393 self._backend = e.backend
1394 n_jobs = self._initialize_backend()
1396 return n_jobs
1398 def _effective_n_jobs(self):
1399 if self._backend:
1400 return self._backend.effective_n_jobs(self.n_jobs)
1401 return 1
1403 def _terminate_and_reset(self):
1404 if hasattr(self._backend, "stop_call") and self._calling:
1405 self._backend.stop_call()
1406 self._calling = False
1407 if not self._managed_backend:
1408 self._backend.terminate()
1410 def _dispatch(self, batch):
1411 """Queue the batch for computing, with or without multiprocessing
1413 WARNING: this method is not thread-safe: it should be only called
1414 indirectly via dispatch_one_batch.
1416 """
1417 # If job.get() catches an exception, it closes the queue:
1418 if self._aborting:
1419 return
1421 batch_size = len(batch)
1423 self.n_dispatched_tasks += batch_size
1424 self.n_dispatched_batches += 1
1426 dispatch_timestamp = time.time()
1428 batch_tracker = BatchCompletionCallBack(dispatch_timestamp, batch_size, self)
1430 self._register_new_job(batch_tracker)
1432 # If return_ordered is False, the batch_tracker is not stored in the
1433 # jobs queue at the time of submission. Instead, it will be appended to
1434 # the queue by itself as soon as the callback is triggered to be able
1435 # to return the results in the order of completion.
1437 job = self._backend.submit(batch, callback=batch_tracker)
1438 batch_tracker.register_job(job)
1440 def _register_new_job(self, batch_tracker):
1441 if self.return_ordered:
1442 self._jobs.append(batch_tracker)
1443 else:
1444 self._jobs_set.add(batch_tracker)
1446 def dispatch_next(self):
1447 """Dispatch more data for parallel processing
1449 This method is meant to be called concurrently by the multiprocessing
1450 callback. We rely on the thread-safety of dispatch_one_batch to protect
1451 against concurrent consumption of the unprotected iterator.
1452 """
1453 if not self.dispatch_one_batch(self._original_iterator):
1454 self._iterating = False
1455 self._original_iterator = None
1457 def dispatch_one_batch(self, iterator):
1458 """Prefetch the tasks for the next batch and dispatch them.
1460 The effective size of the batch is computed here.
1461 If there are no more jobs to dispatch, return False, else return True.
1463 The iterator consumption and dispatching is protected by the same
1464 lock so calling this function should be thread safe.
1466 """
1468 if self._aborting:
1469 return False
1471 batch_size = self._get_batch_size()
1473 with self._lock:
1474 # to ensure an even distribution of the workload between workers,
1475 # we look ahead in the original iterators more than batch_size
1476 # tasks - However, we keep consuming only one batch at each
1477 # dispatch_one_batch call. The extra tasks are stored in a local
1478 # queue, _ready_batches, that is looked-up prior to re-consuming
1479 # tasks from the origal iterator.
1480 try:
1481 tasks = self._ready_batches.get(block=False)
1482 except queue.Empty:
1483 # slice the iterator n_jobs * batchsize items at a time. If the
1484 # slice returns less than that, then the current batchsize puts
1485 # too much weight on a subset of workers, while other may end
1486 # up starving. So in this case, re-scale the batch size
1487 # accordingly to distribute evenly the last items between all
1488 # workers.
1489 n_jobs = self._cached_effective_n_jobs
1490 big_batch_size = batch_size * n_jobs
1492 try:
1493 islice = list(itertools.islice(iterator, big_batch_size))
1494 except Exception as e:
1495 # Handle the fact that the generator of task raised an
1496 # exception. As this part of the code can be executed in
1497 # a thread internal to the backend, register a task with
1498 # an error that will be raised in the user's thread.
1499 if isinstance(e.__context__, queue.Empty):
1500 # Suppress the cause of the exception if it is
1501 # queue.Empty to avoid cluttered traceback. Only do it
1502 # if the __context__ is really empty to avoid messing
1503 # with causes of the original error.
1504 e.__cause__ = None
1505 batch_tracker = BatchCompletionCallBack(0, batch_size, self)
1506 self._register_new_job(batch_tracker)
1507 batch_tracker._register_outcome(dict(result=e, status=TASK_ERROR))
1508 return True
1510 if len(islice) == 0:
1511 return False
1512 elif (
1513 iterator is self._original_iterator and len(islice) < big_batch_size
1514 ):
1515 # We reached the end of the original iterator (unless
1516 # iterator is the ``pre_dispatch``-long initial slice of
1517 # the original iterator) -- decrease the batch size to
1518 # account for potential variance in the batches running
1519 # time.
1520 final_batch_size = max(1, len(islice) // (10 * n_jobs))
1521 else:
1522 final_batch_size = max(1, len(islice) // n_jobs)
1524 # enqueue n_jobs batches in a local queue
1525 for i in range(0, len(islice), final_batch_size):
1526 tasks = BatchedCalls(
1527 islice[i : i + final_batch_size],
1528 self._backend.get_nested_backend(),
1529 self._reducer_callback,
1530 self._pickle_cache,
1531 )
1532 self._ready_batches.put(tasks)
1534 # finally, get one task.
1535 tasks = self._ready_batches.get(block=False)
1536 if len(tasks) == 0:
1537 # No more tasks available in the iterator: tell caller to stop.
1538 return False
1539 else:
1540 self._dispatch(tasks)
1541 return True
1543 def _get_batch_size(self):
1544 """Returns the effective batch size for dispatch"""
1545 if self.batch_size == "auto":
1546 return self._backend.compute_batch_size()
1547 else:
1548 # Fixed batch size strategy
1549 return self.batch_size
1551 def _print(self, msg):
1552 """Display the message on stout or stderr depending on verbosity"""
1553 # XXX: Not using the logger framework: need to
1554 # learn to use logger better.
1555 if not self.verbose:
1556 return
1557 if self.verbose < 50:
1558 writer = sys.stderr.write
1559 else:
1560 writer = sys.stdout.write
1561 writer(f"[{self}]: {msg}\n")
1563 def _is_completed(self):
1564 """Check if all tasks have been completed"""
1565 return self.n_completed_tasks == self.n_dispatched_tasks and not (
1566 self._iterating or self._aborting
1567 )
1569 def print_progress(self):
1570 """Display the process of the parallel execution only a fraction
1571 of time, controlled by self.verbose.
1572 """
1574 if not self.verbose:
1575 return
1577 if self.n_tasks is not None and self.n_tasks > 0:
1578 width = floor(log10(self.n_tasks)) + 1
1579 else:
1580 width = 3
1581 elapsed_time = time.time() - self._start_time
1583 if self._is_completed():
1584 # Make sure that we get a last message telling us we are done
1585 self._print(
1586 f"Done {self.n_completed_tasks:{width}d} out of "
1587 f"{self.n_completed_tasks:{width}d} | elapsed: "
1588 f"{short_format_time(elapsed_time)} finished"
1589 )
1590 return
1592 # Original job iterator becomes None once it has been fully
1593 # consumed: at this point we know the total number of jobs and we are
1594 # able to display an estimation of the remaining time based on already
1595 # completed jobs. Otherwise, we simply display the number of completed
1596 # tasks.
1597 elif self._original_iterator is not None:
1598 if _verbosity_filter(self.n_dispatched_batches, self.verbose):
1599 return
1600 fmt_time = f"| elapsed: {short_format_time(elapsed_time)}"
1601 index = self.n_completed_tasks
1602 if self.n_tasks is not None:
1603 self._print(
1604 f"Done {index:{width}d} out of {self.n_tasks:{width}d} {fmt_time}"
1605 )
1606 else:
1607 pad = " " * (len("out of ") + width - len("tasks"))
1608 self._print(f"Done {index:{width}d} tasks {pad}{fmt_time}")
1609 else:
1610 index = self.n_completed_tasks
1611 # We are finished dispatching
1612 total_tasks = self.n_dispatched_tasks
1613 # We always display the first loop
1614 if index != 0:
1615 # Display depending on the number of remaining items
1616 # A message as soon as we finish dispatching, cursor is 0
1617 cursor = total_tasks - index + 1 - self._pre_dispatch_amount
1618 frequency = (total_tasks // self.verbose) + 1
1619 is_last_item = index + 1 == total_tasks
1620 if is_last_item or cursor % frequency:
1621 return
1622 remaining_time = (elapsed_time / max(index, 1)) * (
1623 self.n_dispatched_tasks - index
1624 )
1625 # only display status if remaining time is greater or equal to 0
1626 self._print(
1627 f"Done {index:{width}d} out of {total_tasks:{width}d} "
1628 f"| elapsed: {short_format_time(elapsed_time)} remaining: "
1629 f"{short_format_time(remaining_time)}"
1630 )
1632 def _abort(self):
1633 # Stop dispatching new jobs in the async callback thread
1634 self._aborting = True
1636 # If the backend allows it, cancel or kill remaining running
1637 # tasks without waiting for the results as we will raise
1638 # the exception we got back to the caller instead of returning
1639 # any result.
1640 backend = self._backend
1641 if not self._aborted and hasattr(backend, "abort_everything"):
1642 # If the backend is managed externally we need to make sure
1643 # to leave it in a working state to allow for future jobs
1644 # scheduling.
1645 ensure_ready = self._managed_backend
1646 backend.abort_everything(ensure_ready=ensure_ready)
1647 self._aborted = True
1649 def _start(self, iterator, pre_dispatch):
1650 # Only set self._iterating to True if at least a batch
1651 # was dispatched. In particular this covers the edge
1652 # case of Parallel used with an exhausted iterator. If
1653 # self._original_iterator is None, then this means either
1654 # that pre_dispatch == "all", n_jobs == 1 or that the first batch
1655 # was very quick and its callback already dispatched all the
1656 # remaining jobs.
1657 self._iterating = False
1658 if self.dispatch_one_batch(iterator):
1659 self._iterating = self._original_iterator is not None
1661 while self.dispatch_one_batch(iterator):
1662 pass
1664 if pre_dispatch == "all":
1665 # The iterable was consumed all at once by the above for loop.
1666 # No need to wait for async callbacks to trigger to
1667 # consumption.
1668 self._iterating = False
1670 def _get_outputs(self, iterator, pre_dispatch):
1671 """Iterator returning the tasks' output as soon as they are ready."""
1672 dispatch_thread_id = threading.get_ident()
1673 detach_generator_exit = False
1674 try:
1675 self._start(iterator, pre_dispatch)
1676 # first yield returns None, for internal use only. This ensures
1677 # that we enter the try/except block and start dispatching the
1678 # tasks.
1679 yield
1681 with self._backend.retrieval_context():
1682 yield from self._retrieve()
1684 except GeneratorExit:
1685 # The generator has been garbage collected before being fully
1686 # consumed. This aborts the remaining tasks if possible and warn
1687 # the user if necessary.
1688 self._exception = True
1690 # In some interpreters such as PyPy, GeneratorExit can be raised in
1691 # a different thread than the one used to start the dispatch of the
1692 # parallel tasks. This can lead to hang when a thread attempts to
1693 # join itself. As workaround, we detach the execution of the
1694 # aborting code to a dedicated thread. We then need to make sure
1695 # the rest of the function does not call `_terminate_and_reset`
1696 # in finally.
1697 if dispatch_thread_id != threading.get_ident():
1698 warnings.warn(
1699 "A generator produced by joblib.Parallel has been "
1700 "gc'ed in an unexpected thread. This behavior should "
1701 "not cause major -issues but to make sure, please "
1702 "report this warning and your use case at "
1703 "https://github.com/joblib/joblib/issues so it can "
1704 "be investigated."
1705 )
1707 detach_generator_exit = True
1708 _parallel = self
1710 class _GeneratorExitThread(threading.Thread):
1711 def run(self):
1712 _parallel._abort()
1713 if _parallel.return_generator:
1714 _parallel._warn_exit_early()
1715 _parallel._terminate_and_reset()
1717 _GeneratorExitThread(name="GeneratorExitThread").start()
1718 return
1720 # Otherwise, we are in the thread that started the dispatch: we can
1721 # safely abort the execution and warn the user.
1722 self._abort()
1723 if self.return_generator:
1724 self._warn_exit_early()
1726 raise
1728 # Note: we catch any BaseException instead of just Exception instances
1729 # to also include KeyboardInterrupt
1730 except BaseException:
1731 self._exception = True
1732 self._abort()
1733 raise
1734 finally:
1735 # Store the unconsumed tasks and terminate the workers if necessary
1736 _remaining_outputs = [] if self._exception else self._jobs
1737 self._jobs = collections.deque()
1738 self._jobs_set = set()
1739 self._running = False
1740 if not detach_generator_exit:
1741 self._terminate_and_reset()
1743 while len(_remaining_outputs) > 0:
1744 batched_results = _remaining_outputs.popleft()
1745 batched_results = batched_results.get_result(self.timeout)
1746 for result in batched_results:
1747 yield result
1749 def _wait_retrieval(self):
1750 """Return True if we need to continue retrieving some tasks."""
1752 # If the input load is still being iterated over, it means that tasks
1753 # are still on the dispatch waitlist and their results will need to
1754 # be retrieved later on.
1755 if self._iterating:
1756 return True
1758 # If some of the dispatched tasks are still being processed by the
1759 # workers, wait for the compute to finish before starting retrieval
1760 if self.n_completed_tasks < self.n_dispatched_tasks:
1761 return True
1763 # For backends that does not support retrieving asynchronously the
1764 # result to the main process, all results must be carefully retrieved
1765 # in the _retrieve loop in the main thread while the backend is alive.
1766 # For other backends, the actual retrieval is done asynchronously in
1767 # the callback thread, and we can terminate the backend before the
1768 # `self._jobs` result list has been emptied. The remaining results
1769 # will be collected in the `finally` step of the generator.
1770 if not self._backend.supports_retrieve_callback:
1771 if len(self._jobs) > 0:
1772 return True
1774 return False
1776 def _retrieve(self):
1777 timeout_control_job = None
1778 while self._wait_retrieval():
1779 # If the callback thread of a worker has signaled that its task
1780 # triggered an exception, or if the retrieval loop has raised an
1781 # exception (e.g. `GeneratorExit`), exit the loop and surface the
1782 # worker traceback.
1783 if self._aborting:
1784 self._raise_error_fast()
1785 break
1787 nb_jobs = len(self._jobs)
1788 # Now wait for a job to be ready for retrieval.
1789 if self.return_ordered:
1790 # Case ordered: wait for completion (or error) of the next job
1791 # that have been dispatched and not retrieved yet. If no job
1792 # have been dispatched yet, wait for dispatch.
1793 # We assume that the time to wait for the next job to be
1794 # dispatched is always low, so that the timeout
1795 # control only have to be done on the amount of time the next
1796 # dispatched job is pending.
1797 if (nb_jobs == 0) or (
1798 self._jobs[0].get_status(timeout=self.timeout) == TASK_PENDING
1799 ):
1800 time.sleep(0.01)
1801 continue
1803 elif nb_jobs == 0:
1804 # Case unordered: jobs are added to the list of jobs to
1805 # retrieve `self._jobs` only once completed or in error, which
1806 # is too late to enable timeout control in the same way than in
1807 # the previous case.
1808 # Instead, if no job is ready to be retrieved yet, we
1809 # arbitrarily pick a dispatched job, and the timeout control is
1810 # done such that an error is raised if this control job
1811 # timeouts before any other dispatched job has completed and
1812 # been added to `self._jobs` to be retrieved.
1813 if timeout_control_job is None:
1814 timeout_control_job = next(iter(self._jobs_set), None)
1816 # NB: it can be None if no job has been dispatched yet.
1817 if timeout_control_job is not None:
1818 timeout_control_job.get_status(timeout=self.timeout)
1820 time.sleep(0.01)
1821 continue
1823 elif timeout_control_job is not None:
1824 # Case unordered, when `nb_jobs > 0`:
1825 # It means that a job is ready to be retrieved, so no timeout
1826 # will occur during this iteration.
1827 # Before proceeding to retrieval of the next ready job, reset
1828 # the timeout control state to prepare the next iteration.
1829 timeout_control_job._completion_timeout_counter = None
1830 timeout_control_job = None
1832 # We need to be careful: the job list can be filling up as
1833 # we empty it and Python list are not thread-safe by
1834 # default hence the use of the lock
1835 with self._lock:
1836 batched_results = self._jobs.popleft()
1837 if not self.return_ordered:
1838 self._jobs_set.remove(batched_results)
1840 # Flatten the batched results to output one output at a time
1841 batched_results = batched_results.get_result(self.timeout)
1842 for result in batched_results:
1843 self._nb_consumed += 1
1844 yield result
1846 def _raise_error_fast(self):
1847 """If we are aborting, raise if a job caused an error."""
1849 # Find the first job whose status is TASK_ERROR if it exists.
1850 with self._lock:
1851 error_job = next(
1852 (job for job in self._jobs if job.status == TASK_ERROR), None
1853 )
1855 # If this error job exists, immediately raise the error by
1856 # calling get_result. This job might not exists if abort has been
1857 # called directly or if the generator is gc'ed.
1858 if error_job is not None:
1859 error_job.get_result(self.timeout)
1861 def _warn_exit_early(self):
1862 """Warn the user if the generator is gc'ed before being consumned."""
1863 ready_outputs = self.n_completed_tasks - self._nb_consumed
1864 is_completed = self._is_completed()
1865 msg = ""
1866 if ready_outputs:
1867 msg += (
1868 f"{ready_outputs} tasks have been successfully executed but not used."
1869 )
1870 if not is_completed:
1871 msg += " Additionally, "
1873 if not is_completed:
1874 msg += (
1875 f"{self.n_dispatched_tasks - self.n_completed_tasks} tasks "
1876 "which were still being processed by the workers have been "
1877 "cancelled."
1878 )
1880 if msg:
1881 msg += (
1882 " You could benefit from adjusting the input task "
1883 "iterator to limit unnecessary computation time."
1884 )
1886 warnings.warn(msg)
1888 def _get_sequential_output(self, iterable):
1889 """Separate loop for sequential output.
1891 This simplifies the traceback in case of errors and reduces the
1892 overhead of calling sequential tasks with `joblib`.
1893 """
1894 try:
1895 self._iterating = True
1896 self._original_iterator = iterable
1897 batch_size = self._get_batch_size()
1899 if batch_size != 1:
1900 it = iter(iterable)
1901 iterable_batched = iter(
1902 lambda: tuple(itertools.islice(it, batch_size)), ()
1903 )
1904 iterable = (task for batch in iterable_batched for task in batch)
1906 # first yield returns None, for internal use only. This ensures
1907 # that we enter the try/except block and setup the generator.
1908 yield None
1910 # Sequentially call the tasks and yield the results.
1911 for func, args, kwargs in iterable:
1912 self.n_dispatched_batches += 1
1913 self.n_dispatched_tasks += 1
1914 res = func(*args, **kwargs)
1915 self.n_completed_tasks += 1
1916 self.print_progress()
1917 yield res
1918 self._nb_consumed += 1
1919 except BaseException:
1920 self._exception = True
1921 self._aborting = True
1922 self._aborted = True
1923 raise
1924 finally:
1925 self._running = False
1926 self._iterating = False
1927 self._original_iterator = None
1928 self.print_progress()
1930 def _reset_run_tracking(self):
1931 """Reset the counters and flags used to track the execution."""
1933 # Makes sur the parallel instance was not previously running in a
1934 # thread-safe way.
1935 with getattr(self, "_lock", nullcontext()):
1936 if self._running:
1937 msg = "This Parallel instance is already running !"
1938 if self.return_generator is True:
1939 msg += (
1940 " Before submitting new tasks, you must wait for the "
1941 "completion of all the previous tasks, or clean all "
1942 "references to the output generator."
1943 )
1944 raise RuntimeError(msg)
1945 self._running = True
1947 # Counter to keep track of the task dispatched and completed.
1948 self.n_dispatched_batches = 0
1949 self.n_dispatched_tasks = 0
1950 self.n_completed_tasks = 0
1952 # Following count is incremented by one each time the user iterates
1953 # on the output generator, it is used to prepare an informative
1954 # warning message in case the generator is deleted before all the
1955 # dispatched tasks have been consumed.
1956 self._nb_consumed = 0
1958 # Following flags are used to synchronize the threads in case one of
1959 # the tasks error-out to ensure that all workers abort fast and that
1960 # the backend terminates properly.
1962 # Set to True as soon as a worker signals that a task errors-out
1963 self._exception = False
1964 # Set to True in case of early termination following an incident
1965 self._aborting = False
1966 # Set to True after abortion is complete
1967 self._aborted = False
1969 def __call__(self, iterable):
1970 """Main function to dispatch parallel tasks."""
1972 self._reset_run_tracking()
1973 self.n_tasks = len(iterable) if hasattr(iterable, "__len__") else None
1974 self._start_time = time.time()
1976 if not self._managed_backend:
1977 n_jobs = self._initialize_backend()
1978 else:
1979 n_jobs = self._effective_n_jobs()
1981 if n_jobs == 1:
1982 # If n_jobs==1, run the computation sequentially and return
1983 # immediately to avoid overheads.
1984 output = self._get_sequential_output(iterable)
1985 next(output)
1986 return output if self.return_generator else list(output)
1988 # Let's create an ID that uniquely identifies the current call. If the
1989 # call is interrupted early and that the same instance is immediately
1990 # reused, this id will be used to prevent workers that were
1991 # concurrently finalizing a task from the previous call to run the
1992 # callback.
1993 with self._lock:
1994 self._call_id = uuid4().hex
1996 # self._effective_n_jobs should be called in the Parallel.__call__
1997 # thread only -- store its value in an attribute for further queries.
1998 self._cached_effective_n_jobs = n_jobs
2000 if isinstance(self._backend, LokyBackend):
2001 # For the loky backend, we add a callback executed when reducing
2002 # BatchCalls, that makes the loky executor use a temporary folder
2003 # specific to this Parallel object when pickling temporary memmaps.
2004 # This callback is necessary to ensure that several Parallel
2005 # objects using the same reusable executor don't use the same
2006 # temporary resources.
2008 def _batched_calls_reducer_callback():
2009 # Relevant implementation detail: the following lines, called
2010 # when reducing BatchedCalls, are called in a thread-safe
2011 # situation, meaning that the context of the temporary folder
2012 # manager will not be changed in between the callback execution
2013 # and the end of the BatchedCalls pickling. The reason is that
2014 # pickling (the only place where set_current_context is used)
2015 # is done from a single thread (the queue_feeder_thread).
2016 self._backend._workers._temp_folder_manager.set_current_context( # noqa
2017 self._id
2018 )
2020 self._reducer_callback = _batched_calls_reducer_callback
2022 # self._effective_n_jobs should be called in the Parallel.__call__
2023 # thread only -- store its value in an attribute for further queries.
2024 self._cached_effective_n_jobs = n_jobs
2026 backend_name = self._backend.__class__.__name__
2027 if n_jobs == 0:
2028 raise RuntimeError("%s has no active worker." % backend_name)
2030 self._print(f"Using backend {backend_name} with {n_jobs} concurrent workers.")
2031 if hasattr(self._backend, "start_call"):
2032 self._backend.start_call()
2034 # Following flag prevents double calls to `backend.stop_call`.
2035 self._calling = True
2037 iterator = iter(iterable)
2038 pre_dispatch = self.pre_dispatch
2040 if pre_dispatch == "all":
2041 # prevent further dispatch via multiprocessing callback thread
2042 self._original_iterator = None
2043 self._pre_dispatch_amount = 0
2044 else:
2045 self._original_iterator = iterator
2046 if hasattr(pre_dispatch, "endswith"):
2047 pre_dispatch = eval_expr(pre_dispatch.replace("n_jobs", str(n_jobs)))
2048 self._pre_dispatch_amount = pre_dispatch = int(pre_dispatch)
2050 # The main thread will consume the first pre_dispatch items and
2051 # the remaining items will later be lazily dispatched by async
2052 # callbacks upon task completions.
2054 # TODO: this iterator should be batch_size * n_jobs
2055 iterator = itertools.islice(iterator, self._pre_dispatch_amount)
2057 # Use a caching dict for callables that are pickled with cloudpickle to
2058 # improve performances. This cache is used only in the case of
2059 # functions that are defined in the __main__ module, functions that
2060 # are defined locally (inside another function) and lambda expressions.
2061 self._pickle_cache = dict()
2063 output = self._get_outputs(iterator, pre_dispatch)
2064 self._call_ref = weakref.ref(output)
2066 # The first item from the output is blank, but it makes the interpreter
2067 # progress until it enters the Try/Except block of the generator and
2068 # reaches the first `yield` statement. This starts the asynchronous
2069 # dispatch of the tasks to the workers.
2070 next(output)
2072 return output if self.return_generator else list(output)
2074 def __repr__(self):
2075 return "%s(n_jobs=%s)" % (self.__class__.__name__, self.n_jobs)