Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/parallel.py: 17%

616 statements  

coverage.py v7.3.2, created at 2023-12-12 06:31 +0000

1""" 

2Helpers for embarrassingly parallel code. 

3""" 

4# Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org > 

5# Copyright: 2010, Gael Varoquaux 

6# License: BSD 3 clause 

7 

8from __future__ import division 

9 

10import os 

11import sys 

12from math import sqrt 

13import functools 

14import collections 

15import time 

16import threading 

17import itertools 

18from uuid import uuid4 

19from numbers import Integral 

20import warnings 

21import queue 

22import weakref 

23from contextlib import nullcontext 

24 

25from multiprocessing import TimeoutError 

26 

27from ._multiprocessing_helpers import mp 

28 

29from .logger import Logger, short_format_time 

30from .disk import memstr_to_bytes 

31from ._parallel_backends import (FallbackToBackend, MultiprocessingBackend, 

32 ThreadingBackend, SequentialBackend, 

33 LokyBackend) 

34from ._utils import eval_expr, _Sentinel 

35 

36# Make sure that those two classes are part of the public joblib.parallel API 

37# so that 3rd party backend implementers can import them from here. 

38from ._parallel_backends import AutoBatchingMixin # noqa 

39from ._parallel_backends import ParallelBackendBase # noqa 

40 

41 

42IS_PYPY = hasattr(sys, "pypy_version_info") 

43 

44 

45BACKENDS = { 

46 'threading': ThreadingBackend, 

47 'sequential': SequentialBackend, 

48} 

49# name of the backend used by default by Parallel outside of any context 

50# managed by ``parallel_config`` or ``parallel_backend``. 

51 

52# threading is the only backend that is always available everywhere

53DEFAULT_BACKEND = 'threading' 

54 

55MAYBE_AVAILABLE_BACKENDS = {'multiprocessing', 'loky'} 

56 

57# if multiprocessing is available, so is loky, and we set loky as the default

58# backend

59if mp is not None: 

60 BACKENDS['multiprocessing'] = MultiprocessingBackend 

61 from .externals import loky 

62 BACKENDS['loky'] = LokyBackend 

63 DEFAULT_BACKEND = 'loky' 

64 

65 

66DEFAULT_THREAD_BACKEND = 'threading' 

67 

68 

69# Thread local value that can be overridden by the ``parallel_config`` context 

70# manager 

71_backend = threading.local() 

72 

73 

74def _register_dask(): 

75 """Register Dask Backend if called with parallel_config(backend="dask")""" 

76 try: 

77 from ._dask import DaskDistributedBackend 

78 register_parallel_backend('dask', DaskDistributedBackend) 

79 except ImportError as e: 

80 msg = ("To use the dask.distributed backend you must install both " 

81 "the `dask` and distributed modules.\n\n" 

82 "See https://dask.pydata.org/en/latest/install.html for more " 

83 "information.") 

84 raise ImportError(msg) from e 

85 

86 

87EXTERNAL_BACKENDS = { 

88 'dask': _register_dask, 

89} 

90 

91 

92# Sentinels for the default values of the Parallel constructor and 

93# the parallel_config and parallel_backend context managers 

94default_parallel_config = { 

95 "backend": _Sentinel(default_value=None), 

96 "n_jobs": _Sentinel(default_value=None), 

97 "verbose": _Sentinel(default_value=0), 

98 "temp_folder": _Sentinel(default_value=None), 

99 "max_nbytes": _Sentinel(default_value="1M"), 

100 "mmap_mode": _Sentinel(default_value="r"), 

101 "prefer": _Sentinel(default_value=None), 

102 "require": _Sentinel(default_value=None), 

103} 

104 

105 

106VALID_BACKEND_HINTS = ('processes', 'threads', None) 

107VALID_BACKEND_CONSTRAINTS = ('sharedmem', None) 

108 

109 

110def _get_config_param(param, context_config, key): 

111 """Return the value of a parallel config parameter 

112 

113 Explicitly setting it in Parallel has priority over setting in a 

114 parallel_(config/backend) context manager. 

115 """ 

116 if param is not default_parallel_config[key]: 

117 # param is explicitly set, return it 

118 return param 

119 

120 if context_config[key] is not default_parallel_config[key]: 

121 # there's a context manager and the key is set, return it 

122 return context_config[key] 

123 

124 # Otherwise, we are in the default_parallel_config, 

125 # return the default value 

126 return param.default_value 

127 
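A minimal usage sketch of the precedence rule implemented above (editorial illustration, not part of this module, assuming joblib >= 1.3): an argument passed explicitly to Parallel wins over a value set by the parallel_config context manager, which in turn wins over the sentinel default.

from joblib import Parallel, delayed, parallel_config

with parallel_config(verbose=10):
    # verbose=0 is explicit in the Parallel call, so it overrides the
    # context value of 10; n_jobs falls back to the context/default value.
    results = Parallel(n_jobs=2, verbose=0)(delayed(abs)(-i) for i in range(3))
print(results)  # [0, 1, 2]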

128 

129def get_active_backend( 

130 prefer=default_parallel_config["prefer"], 

131 require=default_parallel_config["require"], 

132 verbose=default_parallel_config["verbose"], 

133): 

134 """Return the active default backend""" 

135 backend, config = _get_active_backend(prefer, require, verbose) 

136 n_jobs = _get_config_param( 

137 default_parallel_config['n_jobs'], config, "n_jobs" 

138 ) 

139 return backend, n_jobs 

140 
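An illustrative sketch (not part of this module) of inspecting the active backend; get_active_backend is importable from joblib.parallel and reflects any enclosing parallel_config context.

from joblib import parallel_config
from joblib.parallel import get_active_backend

backend, n_jobs = get_active_backend()
print(type(backend).__name__, n_jobs)      # e.g. LokyBackend None

with parallel_config(backend="threading", n_jobs=4):
    backend, n_jobs = get_active_backend()
    print(type(backend).__name__, n_jobs)  # ThreadingBackend 4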

141 

142def _get_active_backend( 

143 prefer=default_parallel_config["prefer"], 

144 require=default_parallel_config["require"], 

145 verbose=default_parallel_config["verbose"], 

146): 

147 """Return the active default backend""" 

148 

149 backend_config = getattr(_backend, "config", default_parallel_config) 

150 

151 backend = _get_config_param( 

152 default_parallel_config['backend'], backend_config, "backend" 

153 ) 

154 prefer = _get_config_param(prefer, backend_config, "prefer") 

155 require = _get_config_param(require, backend_config, "require") 

156 verbose = _get_config_param(verbose, backend_config, "verbose") 

157 

158 if prefer not in VALID_BACKEND_HINTS: 

159 raise ValueError( 

160 f"prefer={prefer} is not a valid backend hint, " 

161 f"expected one of {VALID_BACKEND_HINTS}" 

162 ) 

163 if require not in VALID_BACKEND_CONSTRAINTS: 

164 raise ValueError( 

165 f"require={require} is not a valid backend constraint, " 

166 f"expected one of {VALID_BACKEND_CONSTRAINTS}" 

167 ) 

168 if prefer == 'processes' and require == 'sharedmem': 

169 raise ValueError( 

170 "prefer == 'processes' and require == 'sharedmem'" 

171 " are inconsistent settings" 

172 ) 

173 

174 explicit_backend = True 

175 if backend is None: 

176 

177 # We are either outside of the scope of any parallel_(config/backend) 

178 # context manager or the context manager did not set a backend. 

179 # create the default backend instance now. 

180 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0) 

181 explicit_backend = False 

182 

183 # Try to use the backend set by the user with the context manager. 

184 

185 nesting_level = backend.nesting_level 

186 uses_threads = getattr(backend, 'uses_threads', False) 

187 supports_sharedmem = getattr(backend, 'supports_sharedmem', False) 

188 # Force to use thread-based backend if the provided backend does not 

189 # match the shared memory constraint or if the backend is not explicitly 

190 # given and threads are preferred. 

191 force_threads = (require == 'sharedmem' and not supports_sharedmem) 

192 force_threads |= ( 

193 not explicit_backend and prefer == 'threads' and not uses_threads 

194 ) 

195 if force_threads: 

196 # This backend does not match the shared memory constraint: 

197 # fall back to the default thread-based backend. 

198 sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND]( 

199 nesting_level=nesting_level 

200 ) 

201 # Warn the user if we forced the backend to thread-based, while the 

202 # user explicitly specified a non-thread-based backend. 

203 if verbose >= 10 and explicit_backend: 

204 print( 

205 f"Using {sharedmem_backend.__class__.__name__} as " 

206 f"joblib backend instead of {backend.__class__.__name__} " 

207 "as the latter does not provide shared memory semantics." 

208 ) 

209 # Force to n_jobs=1 by default 

210 thread_config = backend_config.copy() 

211 thread_config['n_jobs'] = 1 

212 return sharedmem_backend, thread_config 

213 

214 return backend, backend_config 

215 
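A small sketch of the fallback described above (editorial illustration, not part of this module): require='sharedmem' silently replaces a process-based backend with the thread-based one so that workers can mutate shared Python objects.

from joblib import Parallel, delayed, parallel_config

shared = []

def record(i):
    shared.append(i)  # only safe because the workers share memory (threads)

with parallel_config(backend="loky"):
    # The sharedmem constraint forces a fallback to the threading backend.
    Parallel(n_jobs=2, require="sharedmem")(delayed(record)(i) for i in range(4))

print(sorted(shared))  # [0, 1, 2, 3]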

216 

217class parallel_config: 

218 """Set the default backend or configuration for :class:`~joblib.Parallel`. 

219 

220 This is an alternative to directly passing keyword arguments to the 

221 :class:`~joblib.Parallel` class constructor. It is particularly useful when 

222 calling into library code that uses joblib internally but does not expose 

223 the various parallel configuration arguments in its own API. 

224 

225 Parameters 

226 ---------- 

227 backend : str or ParallelBackendBase instance, default=None 

228 If ``backend`` is a string it must match a previously registered 

229 implementation using the :func:`~register_parallel_backend` function. 

230 

231 By default the following backends are available: 

232 

233 - 'loky': single-host, process-based parallelism (used by default), 

234 - 'threading': single-host, thread-based parallelism, 

235 - 'multiprocessing': legacy single-host, process-based parallelism. 

236 

237 'loky' is recommended to run functions that manipulate Python objects. 

238 'threading' is a low-overhead alternative that is most efficient for 

239 functions that release the Global Interpreter Lock: e.g. I/O-bound 

240 code or CPU-bound code in a few calls to native code that explicitly 

241 releases the GIL. Note that on some rare systems (such as Pyodide), 

242 multiprocessing and loky may not be available, in which case joblib 

243 defaults to threading. 

244 

245 In addition, if the ``dask`` and ``distributed`` Python packages are 

246 installed, it is possible to use the 'dask' backend for better 

247 scheduling of nested parallel calls without over-subscription and 

248 potentially distribute parallel calls over a networked cluster of 

249 several hosts. 

250 

251 It is also possible to use the distributed 'ray' backend for 

252 distributing the workload to a cluster of nodes. See more details 

253 in the Examples section below. 

254 

255 Alternatively the backend can be passed directly as an instance. 

256 

257 n_jobs : int, default=None 

258 The maximum number of concurrently running jobs, such as the number 

259 of Python worker processes when ``backend="loky"`` or the size of the 

260 thread-pool when ``backend="threading"``. 

261 If -1 all CPUs are used. If 1 is given, no parallel computing code 

262 is used at all, which is useful for debugging. For ``n_jobs`` below -1, 

263 (n_cpus + 1 + n_jobs) are used. Thus for ``n_jobs=-2``, all 

264 CPUs but one are used. 

265 ``None`` is a marker for 'unset' that will be interpreted as 

266 ``n_jobs=1`` in most backends. 

267 

268 verbose : int, default=0 

269 The verbosity level: if non zero, progress messages are 

270 printed. Above 50, the output is sent to stdout. 

271 The frequency of the messages increases with the verbosity level. 

272 If it is more than 10, all iterations are reported. 

273 

274 temp_folder : str, default=None 

275 Folder to be used by the pool for memmapping large arrays 

276 for sharing memory with worker processes. If None, this will try in 

277 order: 

278 

279 - a folder pointed by the ``JOBLIB_TEMP_FOLDER`` environment 

280 variable, 

281 - ``/dev/shm`` if the folder exists and is writable: this is a 

282 RAM disk filesystem available by default on modern Linux 

283 distributions, 

284 - the default system temporary folder that can be 

285 overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment 

286 variables, typically ``/tmp`` under Unix operating systems. 

287 

288 max_nbytes : int, str, or None, optional, default='1M' 

289 Threshold on the size of arrays passed to the workers that 

290 triggers automated memory mapping in temp_folder. Can be an int 

291 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte. 

292 Use None to disable memmapping of large arrays. 

293 

294 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r' 

295 Memmapping mode for numpy arrays passed to workers. None will 

296 disable memmapping, other modes defined in the numpy.memmap doc: 

297 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html 

298 Also, see 'max_nbytes' parameter documentation for more details. 

299 

300 prefer: str in {'processes', 'threads'} or None, default=None 

301 Soft hint to choose the default backend. 

302 The default process-based backend is 'loky' and the default 

303 thread-based backend is 'threading'. Ignored if the ``backend`` 

304 parameter is specified. 

305 

306 require: 'sharedmem' or None, default=None 

307 Hard constraint to select the backend. If set to 'sharedmem', 

308 the selected backend will be single-host and thread-based. 

309 

310 inner_max_num_threads : int, default=None 

311 If not None, overwrites the limit set on the number of threads 

312 usable in some third-party library threadpools like OpenBLAS, 

313 MKL or OpenMP. This is only used with the ``loky`` backend. 

314 

315 backend_params : dict 

316 Additional parameters to pass to the backend constructor when 

317 backend is a string. 

318 

319 Notes 

320 ----- 

321 Joblib tries to limit the oversubscription by limiting the number of 

322 threads usable in some third-party library threadpools like OpenBLAS, MKL 

323 or OpenMP. The default limit in each worker is set to 

324 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be 

325 overwritten with the ``inner_max_num_threads`` argument which will be used 

326 to set this limit in the child processes. 

327 

328 .. versionadded:: 1.3 

329 

330 Examples 

331 -------- 

332 >>> from operator import neg 

333 >>> with parallel_config(backend='threading'): 

334 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5))) 

335 ... 

336 [-1, -2, -3, -4, -5] 

337 

338 To use the 'ray' joblib backend add the following lines: 

339 

340 >>> from ray.util.joblib import register_ray # doctest: +SKIP 

341 >>> register_ray() # doctest: +SKIP 

342 >>> with parallel_config(backend="ray"): # doctest: +SKIP 

343 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5))) 

344 [-1, -2, -3, -4, -5] 

345 

346 """ 

347 def __init__( 

348 self, 

349 backend=default_parallel_config["backend"], 

350 *, 

351 n_jobs=default_parallel_config["n_jobs"], 

352 verbose=default_parallel_config["verbose"], 

353 temp_folder=default_parallel_config["temp_folder"], 

354 max_nbytes=default_parallel_config["max_nbytes"], 

355 mmap_mode=default_parallel_config["mmap_mode"], 

356 prefer=default_parallel_config["prefer"], 

357 require=default_parallel_config["require"], 

358 inner_max_num_threads=None, 

359 **backend_params 

360 ): 

361 # Save the parallel info and set the active parallel config 

362 self.old_parallel_config = getattr( 

363 _backend, "config", default_parallel_config 

364 ) 

365 

366 backend = self._check_backend( 

367 backend, inner_max_num_threads, **backend_params 

368 ) 

369 

370 new_config = { 

371 "n_jobs": n_jobs, 

372 "verbose": verbose, 

373 "temp_folder": temp_folder, 

374 "max_nbytes": max_nbytes, 

375 "mmap_mode": mmap_mode, 

376 "prefer": prefer, 

377 "require": require, 

378 "backend": backend 

379 } 

380 self.parallel_config = self.old_parallel_config.copy() 

381 self.parallel_config.update({ 

382 k: v for k, v in new_config.items() 

383 if not isinstance(v, _Sentinel) 

384 }) 

385 

386 setattr(_backend, "config", self.parallel_config) 

387 

388 def _check_backend(self, backend, inner_max_num_threads, **backend_params): 

389 if backend is default_parallel_config['backend']: 

390 if inner_max_num_threads is not None or len(backend_params) > 0: 

391 raise ValueError( 

392 "inner_max_num_threads and other constructor " 

393 "parameters backend_params are only supported " 

394 "when backend is not None." 

395 ) 

396 return backend 

397 

398 if isinstance(backend, str): 

399 # Handle non-registered or missing backends 

400 if backend not in BACKENDS: 

401 if backend in EXTERNAL_BACKENDS: 

402 register = EXTERNAL_BACKENDS[backend] 

403 register() 

404 elif backend in MAYBE_AVAILABLE_BACKENDS: 

405 warnings.warn( 

406 f"joblib backend '{backend}' is not available on " 

407 f"your system, falling back to {DEFAULT_BACKEND}.", 

408 UserWarning, 

409 stacklevel=2 

410 ) 

411 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND] 

412 else: 

413 raise ValueError( 

414 f"Invalid backend: {backend}, expected one of " 

415 f"{sorted(BACKENDS.keys())}" 

416 ) 

417 

418 backend = BACKENDS[backend](**backend_params) 

419 

420 if inner_max_num_threads is not None: 

421 msg = ( 

422 f"{backend.__class__.__name__} does not accept setting the " 

423 "inner_max_num_threads argument." 

424 ) 

425 assert backend.supports_inner_max_num_threads, msg 

426 backend.inner_max_num_threads = inner_max_num_threads 

427 

428 # If the nesting_level of the backend is not set previously, use the 

429 # nesting level from the previous active_backend to set it 

430 if backend.nesting_level is None: 

431 parent_backend = self.old_parallel_config['backend'] 

432 if parent_backend is default_parallel_config['backend']: 

433 nesting_level = 0 

434 else: 

435 nesting_level = parent_backend.nesting_level 

436 backend.nesting_level = nesting_level 

437 

438 return backend 

439 

440 def __enter__(self): 

441 return self.parallel_config 

442 

443 def __exit__(self, type, value, traceback): 

444 self.unregister() 

445 

446 def unregister(self): 

447 setattr(_backend, "config", self.old_parallel_config) 

448 
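A short sketch (not part of this module) of the inner_max_num_threads knob handled by _check_backend above, assuming the loky backend is available:

from joblib import Parallel, delayed, parallel_config

# Cap BLAS/OpenMP threadpools to one thread per loky worker so that
# n_jobs=4 workers do not oversubscribe the CPUs.
with parallel_config(backend="loky", inner_max_num_threads=1):
    Parallel(n_jobs=4)(delayed(pow)(i, 2) for i in range(8))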

449 

450class parallel_backend(parallel_config): 

451 """Change the default backend used by Parallel inside a with block. 

452 

453 .. warning:: 

454 It is advised to use the :class:`~joblib.parallel_config` context 

455 manager instead, which allows more fine-grained control over the 

456 backend configuration. 

457 

458 If ``backend`` is a string it must match a previously registered 

459 implementation using the :func:`~register_parallel_backend` function. 

460 

461 By default the following backends are available: 

462 

463 - 'loky': single-host, process-based parallelism (used by default), 

464 - 'threading': single-host, thread-based parallelism, 

465 - 'multiprocessing': legacy single-host, process-based parallelism. 

466 

467 'loky' is recommended to run functions that manipulate Python objects. 

468 'threading' is a low-overhead alternative that is most efficient for 

469 functions that release the Global Interpreter Lock: e.g. I/O-bound code or 

470 CPU-bound code in a few calls to native code that explicitly releases the 

471 GIL. Note that on some rare systems (such as Pyodide), 

472 multiprocessing and loky may not be available, in which case joblib 

473 defaults to threading. 

474 

475 You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib 

476 backend to distribute work across machines. This works well with 

477 scikit-learn estimators with the ``n_jobs`` parameter, for example:: 

478 

479 >>> import joblib # doctest: +SKIP 

480 >>> from sklearn.model_selection import GridSearchCV # doctest: +SKIP 

481 >>> from dask.distributed import Client, LocalCluster # doctest: +SKIP 

482 

483 >>> # create a local Dask cluster 

484 >>> cluster = LocalCluster() # doctest: +SKIP 

485 >>> client = Client(cluster) # doctest: +SKIP 

486 >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1) 

487 ... # doctest: +SKIP 

488 >>> with joblib.parallel_backend("dask", scatter=[X, y]): # doctest: +SKIP 

489 ... grid_search.fit(X, y) 

490 

491 It is also possible to use the distributed 'ray' backend for distributing 

492 the workload to a cluster of nodes. To use the 'ray' joblib backend add 

493 the following lines:: 

494 

495 >>> from ray.util.joblib import register_ray # doctest: +SKIP 

496 >>> register_ray() # doctest: +SKIP 

497 >>> with parallel_backend("ray"): # doctest: +SKIP 

498 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5))) 

499 [-1, -2, -3, -4, -5] 

500 

501 Alternatively the backend can be passed directly as an instance. 

502 

503 By default all available workers will be used (``n_jobs=-1``) unless the 

504 caller passes an explicit value for the ``n_jobs`` parameter. 

505 

506 This is an alternative to passing a ``backend='backend_name'`` argument to 

507 the :class:`~Parallel` class constructor. It is particularly useful when 

508 calling into library code that uses joblib internally but does not expose 

509 the backend argument in its own API. 

510 

511 >>> from operator import neg 

512 >>> with parallel_backend('threading'): 

513 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5))) 

514 ... 

515 [-1, -2, -3, -4, -5] 

516 

517 Joblib also tries to limit the oversubscription by limiting the number of 

518 threads usable in some third-party library threadpools like OpenBLAS, MKL 

519 or OpenMP. The default limit in each worker is set to 

520 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be 

521 overwritten with the ``inner_max_num_threads`` argument which will be used 

522 to set this limit in the child processes. 

523 

524 .. versionadded:: 0.10 

525 

526 See Also 

527 -------- 

528 joblib.parallel_config : context manager to change the backend 

529 configuration. 

530 """ 

531 def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None, 

532 **backend_params): 

533 

534 super().__init__( 

535 backend=backend, 

536 n_jobs=n_jobs, 

537 inner_max_num_threads=inner_max_num_threads, 

538 **backend_params 

539 ) 

540 

541 if self.old_parallel_config is None: 

542 self.old_backend_and_jobs = None 

543 else: 

544 self.old_backend_and_jobs = ( 

545 self.old_parallel_config["backend"], 

546 self.old_parallel_config["n_jobs"], 

547 ) 

548 self.new_backend_and_jobs = ( 

549 self.parallel_config["backend"], 

550 self.parallel_config["n_jobs"], 

551 ) 

552 

553 def __enter__(self): 

554 return self.new_backend_and_jobs 

555 
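Unlike parallel_config, entering parallel_backend yields a (backend, n_jobs) tuple, as the __enter__ method above shows; a minimal sketch (not part of this module):

from joblib import parallel_backend

with parallel_backend("threading", n_jobs=2) as (backend, n_jobs):
    print(type(backend).__name__, n_jobs)  # ThreadingBackend 2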

556 

557# Under Linux or OS X the default start method of multiprocessing 

558# can cause third party libraries to crash. Under Python 3.4+ it is possible 

559# to set an environment variable to switch the default start method from 

560# 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost 

561# of causing semantic changes and some additional pool instantiation overhead. 

562DEFAULT_MP_CONTEXT = None 

563if hasattr(mp, 'get_context'): 

564 method = os.environ.get('JOBLIB_START_METHOD', '').strip() or None 

565 if method is not None: 

566 DEFAULT_MP_CONTEXT = mp.get_context(method=method) 

567 

568 

569class BatchedCalls(object): 

570 """Wrap a sequence of (func, args, kwargs) tuples as a single callable""" 

571 

572 def __init__(self, iterator_slice, backend_and_jobs, reducer_callback=None, 

573 pickle_cache=None): 

574 self.items = list(iterator_slice) 

575 self._size = len(self.items) 

576 self._reducer_callback = reducer_callback 

577 if isinstance(backend_and_jobs, tuple): 

578 self._backend, self._n_jobs = backend_and_jobs 

579 else: 

580 # this is for backward compatibility purposes. Before 0.12.6, 

581 # nested backends were returned without n_jobs indications. 

582 self._backend, self._n_jobs = backend_and_jobs, None 

583 self._pickle_cache = pickle_cache if pickle_cache is not None else {} 

584 

585 def __call__(self): 

586 # Set the default nested backend to self._backend but do not 

587 # change the default number of processes to -1 

588 with parallel_config(backend=self._backend, n_jobs=self._n_jobs): 

589 return [func(*args, **kwargs) 

590 for func, args, kwargs in self.items] 

591 

592 def __reduce__(self): 

593 if self._reducer_callback is not None: 

594 self._reducer_callback() 

595 # no need to pickle the callback. 

596 return ( 

597 BatchedCalls, 

598 (self.items, (self._backend, self._n_jobs), None, 

599 self._pickle_cache) 

600 ) 

601 

602 def __len__(self): 

603 return self._size 

604 
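An illustrative sketch of the internal BatchedCalls wrapper (private API, not part of the public surface; shown only to clarify the (func, args, kwargs) batching described above):

from joblib import delayed
from joblib.parallel import BatchedCalls
from joblib._parallel_backends import SequentialBackend

# Three (func, args, kwargs) tuples, wrapped into one callable that a
# worker executes sequentially.
batch = BatchedCalls(
    [delayed(pow)(i, 2) for i in range(3)],
    (SequentialBackend(nesting_level=0), 1),
)
print(len(batch))  # 3
print(batch())     # [0, 1, 4]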

605 

606# Possible exit status for a task 

607TASK_DONE = "Done" 

608TASK_ERROR = "Error" 

609TASK_PENDING = "Pending" 

610 

611 

612############################################################################### 

613# CPU count that works also when multiprocessing has been disabled via 

614# the JOBLIB_MULTIPROCESSING environment variable 

615def cpu_count(only_physical_cores=False): 

616 """Return the number of CPUs. 

617 

618 This delegates to loky.cpu_count that takes into account additional 

619 constraints such as Linux CFS scheduler quotas (typically set by container 

620 runtimes such as docker) and CPU affinity (for instance using the taskset 

621 command on Linux). 

622 

623 If only_physical_cores is True, do not take hyperthreading / SMT logical 

624 cores into account. 

625 """ 

626 if mp is None: 

627 return 1 

628 

629 return loky.cpu_count(only_physical_cores=only_physical_cores) 

630 
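A quick sketch of the helper above (not part of this module); the exact numbers depend on the machine and on any container CPU quota:

from joblib import cpu_count

print(cpu_count())                          # logical CPUs usable by this process
print(cpu_count(only_physical_cores=True))  # ignores hyperthreading siblings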

631 

632############################################################################### 

633# For verbosity 

634 

635def _verbosity_filter(index, verbose): 

636 """ Returns False for indices increasingly apart, the distance 

637 depending on the value of verbose. 

638 

639 We use a lag increasing as the square of index 

640 """ 

641 if not verbose: 

642 return True 

643 elif verbose > 10: 

644 return False 

645 if index == 0: 

646 return False 

647 verbose = .5 * (11 - verbose) ** 2 

648 scale = sqrt(index / verbose) 

649 next_scale = sqrt((index + 1) / verbose) 

650 return (int(next_scale) == int(scale)) 

651 
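An illustrative sketch of the filter above (private helper, not part of the public API): at verbose=5, only increasingly spaced task indices get a progress message.

from joblib.parallel import _verbosity_filter

reported = [i for i in range(50) if not _verbosity_filter(i, verbose=5)]
print(reported)  # [0, 17] -- reported indices grow quadratically further apart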

652 

653############################################################################### 

654def delayed(function): 

655 """Decorator used to capture the arguments of a function.""" 

656 

657 def delayed_function(*args, **kwargs): 

658 return function, args, kwargs 

659 try: 

660 delayed_function = functools.wraps(function)(delayed_function) 

661 except AttributeError: 

662 " functools.wraps fails on some callable objects " 

663 return delayed_function 

664 
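A minimal sketch (not part of this module) of what delayed actually produces: a (function, args, kwargs) triple that Parallel later unpacks.

from joblib import delayed

task = delayed(divmod)(7, 3)
func, args, kwargs = task
print(func, args, kwargs)     # <built-in function divmod> (7, 3) {}
print(func(*args, **kwargs))  # (2, 1)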

665 

666############################################################################### 

667class BatchCompletionCallBack(object): 

668 """Callback to keep track of completed results and schedule the next tasks. 

669 

670 This callable is executed by the parent process whenever a worker process 

671 has completed a batch of tasks. 

672 

673 It is used for progress reporting, to update the estimate of the batch 

674 processing duration and to schedule the next batch of tasks to be 

675 processed. 

676 

677 It is assumed that this callback will always be triggered by the backend 

678 right after the end of a task, in case of success as well as in case of 

679 failure. 

680 """ 

681 

682 ########################################################################## 

683 # METHODS CALLED BY THE MAIN THREAD # 

684 ########################################################################## 

685 def __init__(self, dispatch_timestamp, batch_size, parallel): 

686 self.dispatch_timestamp = dispatch_timestamp 

687 self.batch_size = batch_size 

688 self.parallel = parallel 

689 self.parallel_call_id = parallel._call_id 

690 

691 # Internals to keep track of the status and outcome of the task. 

692 

693 # Used to hold a reference to the future-like object returned by the 

694 # backend after launching this task 

695 # This will be set later when calling `register_job`, as it is only 

696 # created once the task has been submitted. 

697 self.job = None 

698 

699 if not parallel._backend.supports_retrieve_callback: 

700 # The status is only used for asynchronous result retrieval in the 

701 # callback. 

702 self.status = None 

703 else: 

704 # The initial status for the job is TASK_PENDING. 

705 # Once it is done, it will be either TASK_DONE, or TASK_ERROR. 

706 self.status = TASK_PENDING 

707 

708 def register_job(self, job): 

709 """Register the object returned by `apply_async`.""" 

710 self.job = job 

711 

712 def get_result(self, timeout): 

713 """Returns the raw result of the task that was submitted. 

714 

715 If the task raised an exception rather than returning, this same 

716 exception will be raised instead. 

717 

718 If the backend supports the retrieval callback, it is assumed that this 

719 method is only called after the result has been registered. This is 

720 ensured by checking that `self.get_status(timeout)` does not return 

721 TASK_PENDING. In this case, `get_result` directly returns the 

722 registered result (or raises the registered exception). 

723 

724 For other backends, there are no such assumptions, but `get_result` 

725 still needs to synchronously retrieve the result before it can 

726 return it or raise. It will block at most `self.timeout` seconds 

727 waiting for retrieval to complete, after that it raises a TimeoutError. 

728 """ 

729 

730 backend = self.parallel._backend 

731 

732 if backend.supports_retrieve_callback: 

733 # We assume that the result has already been retrieved by the 

734 # callback thread, and is stored internally. It's just waiting to 

735 # be returned. 

736 return self._return_or_raise() 

737 

738 # For other backends, the main thread needs to run the retrieval step. 

739 try: 

740 if backend.supports_timeout: 

741 result = self.job.get(timeout=timeout) 

742 else: 

743 result = self.job.get() 

744 outcome = dict(result=result, status=TASK_DONE) 

745 except BaseException as e: 

746 outcome = dict(result=e, status=TASK_ERROR) 

747 self._register_outcome(outcome) 

748 

749 return self._return_or_raise() 

750 

751 def _return_or_raise(self): 

752 try: 

753 if self.status == TASK_ERROR: 

754 raise self._result 

755 return self._result 

756 finally: 

757 del self._result 

758 

759 def get_status(self, timeout): 

760 """Get the status of the task. 

761 

762 This function also checks if the timeout has been reached and registers 

763 the TimeoutError outcome when that is the case. 

764 """ 

765 if timeout is None or self.status != TASK_PENDING: 

766 return self.status 

767 

768 # The computation is running and the status is pending. 

769 # Check that we did not wait for this job more than `timeout`. 

770 now = time.time() 

771 if not hasattr(self, "_completion_timeout_counter"): 

772 self._completion_timeout_counter = now 

773 

774 if (now - self._completion_timeout_counter) > timeout: 

775 outcome = dict(result=TimeoutError(), status=TASK_ERROR) 

776 self._register_outcome(outcome) 

777 

778 return self.status 

779 

780 ########################################################################## 

781 # METHODS CALLED BY CALLBACK THREADS # 

782 ########################################################################## 

783 def __call__(self, out): 

784 """Function called by the callback thread after a job is completed.""" 

785 

786 # If the backend doesn't support callback retrievals, the next batch of 

787 # tasks is dispatched regardless. The result will be retrieved by the 

788 # main thread when calling `get_result`. 

789 if not self.parallel._backend.supports_retrieve_callback: 

790 self._dispatch_new() 

791 return 

792 

793 # If the backend supports retrieving the result in the callback, it 

794 # registers the task outcome (TASK_ERROR or TASK_DONE), and schedules 

795 # the next batch if needed. 

796 with self.parallel._lock: 

797 # Edge case: while the task was processing, the `parallel` 

798 # instance was reset and a new call was issued, but the 

799 # worker managed to complete the task and trigger this callback 

800 # just before being aborted by the reset. 

801 if self.parallel._call_id != self.parallel_call_id: 

802 return 

803 

804 # When aborting, stop as fast as possible and do not retrieve the 

805 # result as it won't be returned by the Parallel call. 

806 if self.parallel._aborting: 

807 return 

808 

809 # Retrieve the result of the task in the main process and dispatch 

810 # a new batch if needed. 

811 job_succeeded = self._retrieve_result(out) 

812 

813 if job_succeeded: 

814 self._dispatch_new() 

815 

816 def _dispatch_new(self): 

817 """Schedule the next batch of tasks to be processed.""" 

818 

819 # This step ensures that auto-batching works as expected. 

820 this_batch_duration = time.time() - self.dispatch_timestamp 

821 self.parallel._backend.batch_completed(self.batch_size, 

822 this_batch_duration) 

823 

824 # Schedule the next batch of tasks. 

825 with self.parallel._lock: 

826 self.parallel.n_completed_tasks += self.batch_size 

827 self.parallel.print_progress() 

828 if self.parallel._original_iterator is not None: 

829 self.parallel.dispatch_next() 

830 

831 def _retrieve_result(self, out): 

832 """Fetch and register the outcome of a task. 

833 

834 Return True if the task succeeded, False otherwise. 

835 This function is only called by backends that support retrieving 

836 the task result in the callback thread. 

837 """ 

838 try: 

839 result = self.parallel._backend.retrieve_result_callback(out) 

840 outcome = dict(status=TASK_DONE, result=result) 

841 except BaseException as e: 

842 # Avoid keeping references to parallel in the error. 

843 e.__traceback__ = None 

844 outcome = dict(result=e, status=TASK_ERROR) 

845 

846 self._register_outcome(outcome) 

847 return outcome['status'] != TASK_ERROR 

848 

849 ########################################################################## 

850 # This method can be called either in the main thread # 

851 # or in the callback thread. # 

852 ########################################################################## 

853 def _register_outcome(self, outcome): 

854 """Register the outcome of a task. 

855 

856 This method can be called only once, future calls will be ignored. 

857 """ 

858 # Covers the edge case where the main thread tries to register a 

859 # `TimeoutError` while the callback thread tries to register a result 

860 # at the same time. 

861 with self.parallel._lock: 

862 if self.status not in (TASK_PENDING, None): 

863 return 

864 self.status = outcome["status"] 

865 

866 self._result = outcome["result"] 

867 

868 # Once the result and the status are extracted, the last reference to 

869 # the job can be deleted. 

870 self.job = None 

871 

872 # As soon as an error has been spotted, early stopping flags are sent to 

873 # the `parallel` instance. 

874 if self.status == TASK_ERROR: 

875 self.parallel._exception = True 

876 self.parallel._aborting = True 

877 

878 

879############################################################################### 

880def register_parallel_backend(name, factory, make_default=False): 

881 """Register a new Parallel backend factory. 

882 

883 The new backend can then be selected by passing its name as the backend 

884 argument to the :class:`~Parallel` class. Moreover, the default backend can 

885 be overwritten globally by setting make_default=True. 

886 

887 The factory can be any callable that takes no argument and returns an 

888 instance of ``ParallelBackendBase``. 

889 

890 Warning: this function is experimental and subject to change in a future 

891 version of joblib. 

892 

893 .. versionadded:: 0.10 

894 """ 

895 BACKENDS[name] = factory 

896 if make_default: 

897 global DEFAULT_BACKEND 

898 DEFAULT_BACKEND = name 

899 
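A short sketch of registering a custom backend (editorial illustration only; MyThreadingBackend is a hypothetical name, not part of joblib):

from joblib import Parallel, delayed, register_parallel_backend
from joblib._parallel_backends import ThreadingBackend

class MyThreadingBackend(ThreadingBackend):
    """Toy backend, identical to 'threading' but registered under a new name."""

register_parallel_backend("my_threads", MyThreadingBackend)
out = Parallel(n_jobs=2, backend="my_threads")(delayed(len)(s) for s in ("a", "bb"))
print(out)  # [1, 2]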

900 

901def effective_n_jobs(n_jobs=-1): 

902 """Determine the number of jobs that can actually run in parallel 

903 

904 n_jobs is the number of workers requested by the caller. Passing n_jobs=-1 

905 means requesting all available workers, for instance matching the number of 

906 CPU cores on the worker host(s). 

907 

908 This method should return a guesstimate of the number of workers that can 

909 actually perform work concurrently with the currently enabled default 

910 backend. The primary use case is to make it possible for the caller to know 

911 in how many chunks to slice the work. 

912 

913 In general working on larger data chunks is more efficient (less scheduling 

914 overhead and better use of CPU cache prefetching heuristics) as long as all 

915 the workers have enough work to do. 

916 

917 Warning: this function is experimental and subject to change in a future 

918 version of joblib. 

919 

920 .. versionadded:: 0.10 

921 """ 

922 if n_jobs == 1: 

923 return 1 

924 

925 backend, backend_n_jobs = get_active_backend() 

926 if n_jobs is None: 

927 n_jobs = backend_n_jobs 

928 return backend.effective_n_jobs(n_jobs=n_jobs) 

929 
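An illustrative sketch (not part of this module) of effective_n_jobs under different settings; the first value depends on the number of CPUs of the host:

from joblib import effective_n_jobs, parallel_config

print(effective_n_jobs(-1))  # number of CPUs seen by the default backend
print(effective_n_jobs(2))   # 2

with parallel_config(backend="threading", n_jobs=3):
    print(effective_n_jobs(None))  # 3, taken from the active context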

930 

931############################################################################### 

932class Parallel(Logger): 

933 ''' Helper class for readable parallel mapping. 

934 

935 Read more in the :ref:`User Guide <parallel>`. 

936 

937 Parameters 

938 ---------- 

939 n_jobs: int, default: None 

940 The maximum number of concurrently running jobs, such as the number 

941 of Python worker processes when backend="multiprocessing" 

942 or the size of the thread-pool when backend="threading". 

943 If -1 all CPUs are used. 

944 If 1 is given, no parallel computing code is used at all, and the 

945 behavior amounts to a simple python `for` loop. This mode is not 

946 compatible with `timeout`. 

947 For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus for 

948 n_jobs = -2, all CPUs but one are used. 

949 None is a marker for 'unset' that will be interpreted as n_jobs=1 

950 unless the call is performed under a :func:`~parallel_config` 

951 context manager that sets another value for ``n_jobs``. 

952 backend: str, ParallelBackendBase instance or None, default: 'loky' 

953 Specify the parallelization backend implementation. 

954 Supported backends are: 

955 

956 - "loky" used by default, can induce some 

957 communication and memory overhead when exchanging input and 

958 output data with the worker Python processes. On some rare 

959 systems (such as Pyodide), the loky backend may not be 

960 available. 

961 - "multiprocessing" previous process-based backend based on 

962 `multiprocessing.Pool`. Less robust than `loky`. 

963 - "threading" is a very low-overhead backend but it suffers 

964 from the Python Global Interpreter Lock if the called function 

965 relies a lot on Python objects. "threading" is mostly useful 

966 when the execution bottleneck is a compiled extension that 

967 explicitly releases the GIL (for instance a Cython loop wrapped 

968 in a "with nogil" block or an expensive call to a library such 

969 as NumPy). 

970 - finally, you can register backends by calling 

971 :func:`~register_parallel_backend`. This will allow you to 

972 implement a backend of your liking. 

973 

974 It is not recommended to hard-code the backend name in a call to 

975 :class:`~Parallel` in a library. Instead it is recommended to set 

976 soft hints (prefer) or hard constraints (require) so as to make it 

977 possible for library users to change the backend from the outside 

978 using the :func:`~parallel_config` context manager. 

979 return_as: str in {'list', 'generator'}, default: 'list' 

980 If 'list', calls to this instance will return a list, only when 

981 all results have been processed and retrieved. 

982 If 'generator', it will return a generator that yields the results 

983 as soon as they are available, in the order in which the tasks have 

984 been submitted. 

985 Future releases are planned to also support 'generator_unordered', 

986 in which case the generator immediately yields available results 

987 independently of the submission order. 

988 prefer: str in {'processes', 'threads'} or None, default: None 

989 Soft hint to choose the default backend if no specific backend 

990 was selected with the :func:`~parallel_config` context manager. 

991 The default process-based backend is 'loky' and the default 

992 thread-based backend is 'threading'. Ignored if the ``backend`` 

993 parameter is specified. 

994 require: 'sharedmem' or None, default None 

995 Hard constraint to select the backend. If set to 'sharedmem', 

996 the selected backend will be single-host and thread-based even 

997 if the user asked for a non-thread based backend with 

998 :func:`~joblib.parallel_config`. 

999 verbose: int, optional 

1000 The verbosity level: if non zero, progress messages are 

1001 printed. Above 50, the output is sent to stdout. 

1002 The frequency of the messages increases with the verbosity level. 

1003 If it is more than 10, all iterations are reported. 

1004 timeout: float, optional 

1005 Timeout limit for each task to complete. If any task takes longer, 

1006 a TimeoutError will be raised. Only applied when n_jobs != 1. 

1007 pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'} 

1008 The number of batches (of tasks) to be pre-dispatched. 

1009 Default is '2*n_jobs'. When batch_size="auto" this is a reasonable 

1010 default and the workers should never starve. Note that only basic 

1011 arithmetic is allowed here and no modules can be used in this 

1012 expression. 

1013 batch_size: int or 'auto', default: 'auto' 

1014 The number of atomic tasks to dispatch at once to each 

1015 worker. When individual evaluations are very fast, dispatching 

1016 calls to workers can be slower than sequential computation because 

1017 of the overhead. Batching fast computations together can mitigate 

1018 this. 

1019 The ``'auto'`` strategy keeps track of the time it takes for a 

1020 batch to complete, and dynamically adjusts the batch size to keep 

1021 the time on the order of half a second, using a heuristic. The 

1022 initial batch size is 1. 

1023 ``batch_size="auto"`` with ``backend="threading"`` will dispatch 

1024 batches of a single task at a time as the threading backend has 

1025 very little overhead and using larger batch size has not proved to 

1026 bring any gain in that case. 

1027 temp_folder: str, optional 

1028 Folder to be used by the pool for memmapping large arrays 

1029 for sharing memory with worker processes. If None, this will try in 

1030 order: 

1031 

1032 - a folder pointed by the JOBLIB_TEMP_FOLDER environment 

1033 variable, 

1034 - /dev/shm if the folder exists and is writable: this is a 

1035 RAM disk filesystem available by default on modern Linux 

1036 distributions, 

1037 - the default system temporary folder that can be 

1038 overridden with TMP, TMPDIR or TEMP environment 

1039 variables, typically /tmp under Unix operating systems. 

1040 

1041 Only active when backend="loky" or "multiprocessing". 

1042 max_nbytes: int, str, or None, optional, 1M by default 

1043 Threshold on the size of arrays passed to the workers that 

1044 triggers automated memory mapping in temp_folder. Can be an int 

1045 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte. 

1046 Use None to disable memmapping of large arrays. 

1047 Only active when backend="loky" or "multiprocessing". 

1048 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default: 'r' 

1049 Memmapping mode for numpy arrays passed to workers. None will 

1050 disable memmapping, other modes defined in the numpy.memmap doc: 

1051 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html 

1052 Also, see 'max_nbytes' parameter documentation for more details. 

1053 

1054 Notes 

1055 ----- 

1056 

1057 This object uses workers to compute in parallel the application of a 

1058 function to many different arguments. The main features it brings 

1059 in addition to using the raw multiprocessing or concurrent.futures API 

1060 are (see examples for details): 

1061 

1062 * More readable code, in particular since it avoids 

1063 constructing lists of arguments. 

1064 

1065 * Easier debugging: 

1066 - informative tracebacks even when the error happens on 

1067 the client side 

1068 - using 'n_jobs=1' makes it possible to turn off parallel computing 

1069 for debugging without changing the codepath 

1070 - early capture of pickling errors 

1071 

1072 * An optional progress meter. 

1073 

1074 * Interruption of multiprocess jobs with 'Ctrl-C' 

1075 

1076 * Flexible pickling control for the communication to and from 

1077 the worker processes. 

1078 

1079 * Ability to use shared memory efficiently with worker 

1080 processes for large numpy-based datastructures. 

1081 

1082 Note that the intended usage is to run one call at a time. Multiple 

1083 calls to the same Parallel object will result in a ``RuntimeError`` 

1084 

1085 Examples 

1086 -------- 

1087 

1088 A simple example: 

1089 

1090 >>> from math import sqrt 

1091 >>> from joblib import Parallel, delayed 

1092 >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) 

1093 [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] 

1094 

1095 Reshaping the output when the function has several return 

1096 values: 

1097 

1098 >>> from math import modf 

1099 >>> from joblib import Parallel, delayed 

1100 >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10)) 

1101 >>> res, i = zip(*r) 

1102 >>> res 

1103 (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5) 

1104 >>> i 

1105 (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0) 

1106 

1107 The progress meter: the higher the value of `verbose`, the more 

1108 messages: 

1109 

1110 >>> from time import sleep 

1111 >>> from joblib import Parallel, delayed 

1112 >>> r = Parallel(n_jobs=2, verbose=10)( 

1113 ... delayed(sleep)(.2) for _ in range(10)) #doctest: +SKIP 

1114 [Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.6s 

1115 [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.8s 

1116 [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 1.4s finished 

1117 

1118 Traceback example, note how the line of the error is indicated 

1119 as well as the values of the parameter passed to the function that 

1120 triggered the exception, even though the traceback happens in the 

1121 child process: 

1122 

1123 >>> from heapq import nlargest 

1124 >>> from joblib import Parallel, delayed 

1125 >>> Parallel(n_jobs=2)( 

1126 ... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3)) 

1127 ... # doctest: +SKIP 

1128 ----------------------------------------------------------------------- 

1129 Sub-process traceback: 

1130 ----------------------------------------------------------------------- 

1131 TypeError Mon Nov 12 11:37:46 2012 

1132 PID: 12934 Python 2.7.3: /usr/bin/python 

1133 ........................................................................ 

1134 /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None) 

1135 419 if n >= size: 

1136 420 return sorted(iterable, key=key, reverse=True)[:n] 

1137 421 

1138 422 # When key is none, use simpler decoration 

1139 423 if key is None: 

1140 --> 424 it = izip(iterable, count(0,-1)) # decorate 

1141 425 result = _nlargest(n, it) 

1142 426 return map(itemgetter(0), result) # undecorate 

1143 427 

1144 428 # General case, slowest method 

1145 TypeError: izip argument #1 must support iteration 

1146 _______________________________________________________________________ 

1147 

1148 

1149 Using pre_dispatch in a producer/consumer situation, where the 

1150 data is generated on the fly. Note how the producer is first 

1151 called 3 times before the parallel loop is initiated, and then 

1152 called to generate new data on the fly: 

1153 

1154 >>> from math import sqrt 

1155 >>> from joblib import Parallel, delayed 

1156 >>> def producer(): 

1157 ... for i in range(6): 

1158 ... print('Produced %s' % i) 

1159 ... yield i 

1160 >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')( 

1161 ... delayed(sqrt)(i) for i in producer()) #doctest: +SKIP 

1162 Produced 0 

1163 Produced 1 

1164 Produced 2 

1165 [Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s 

1166 Produced 3 

1167 [Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s 

1168 Produced 4 

1169 [Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s 

1170 Produced 5 

1171 [Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s 

1172 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s remaining: 0.0s 

1173 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished 

1174 

1175 ''' 

1176 def __init__( 

1177 self, 

1178 n_jobs=default_parallel_config["n_jobs"], 

1179 backend=default_parallel_config['backend'], 

1180 return_as="list", 

1181 verbose=default_parallel_config["verbose"], 

1182 timeout=None, 

1183 pre_dispatch='2 * n_jobs', 

1184 batch_size='auto', 

1185 temp_folder=default_parallel_config["temp_folder"], 

1186 max_nbytes=default_parallel_config["max_nbytes"], 

1187 mmap_mode=default_parallel_config["mmap_mode"], 

1188 prefer=default_parallel_config["prefer"], 

1189 require=default_parallel_config["require"], 

1190 ): 

1191 # Initiate parent Logger class state 

1192 super().__init__() 

1193 

1194 # Interpret n_jobs=None as 'unset' 

1195 if n_jobs is None: 

1196 n_jobs = default_parallel_config["n_jobs"] 

1197 

1198 active_backend, context_config = _get_active_backend( 

1199 prefer=prefer, require=require, verbose=verbose 

1200 ) 

1201 

1202 nesting_level = active_backend.nesting_level 

1203 

1204 self.verbose = _get_config_param(verbose, context_config, "verbose") 

1205 self.timeout = timeout 

1206 self.pre_dispatch = pre_dispatch 

1207 

1208 if return_as not in {"list", "generator"}: 

1209 raise ValueError( 

1210 'Expected `return_as` parameter to be a string equal to "list"' 

1211 f' or "generator", but got {return_as} instead' 

1212 ) 

1213 self.return_as = return_as 

1214 self.return_generator = return_as != "list" 

1215 

1216 # Check if we are under a parallel_config or parallel_backend 

1217 # context manager and use the config from the context manager 

1218 # for arguments that are not explicitly set. 

1219 self._backend_args = { 

1220 k: _get_config_param(param, context_config, k) for param, k in [ 

1221 (max_nbytes, "max_nbytes"), 

1222 (temp_folder, "temp_folder"), 

1223 (mmap_mode, "mmap_mode"), 

1224 (prefer, "prefer"), 

1225 (require, "require"), 

1226 (verbose, "verbose"), 

1227 ] 

1228 } 

1229 

1230 if isinstance(self._backend_args["max_nbytes"], str): 

1231 self._backend_args["max_nbytes"] = memstr_to_bytes( 

1232 self._backend_args["max_nbytes"] 

1233 ) 

1234 self._backend_args["verbose"] = max( 

1235 0, self._backend_args["verbose"] - 50 

1236 ) 

1237 

1238 if DEFAULT_MP_CONTEXT is not None: 

1239 self._backend_args['context'] = DEFAULT_MP_CONTEXT 

1240 elif hasattr(mp, "get_context"): 

1241 self._backend_args['context'] = mp.get_context() 

1242 

1243 if backend is default_parallel_config['backend'] or backend is None: 

1244 backend = active_backend 

1245 

1246 elif isinstance(backend, ParallelBackendBase): 

1247 # Use provided backend as is, with the current nesting_level if it 

1248 # is not set yet. 

1249 if backend.nesting_level is None: 

1250 backend.nesting_level = nesting_level 

1251 

1252 elif hasattr(backend, 'Pool') and hasattr(backend, 'Lock'): 

1253 # Make it possible to pass a custom multiprocessing context as 

1254 # backend to change the start method to forkserver or spawn or 

1255 # preload modules on the forkserver helper process. 

1256 self._backend_args['context'] = backend 

1257 backend = MultiprocessingBackend(nesting_level=nesting_level) 

1258 

1259 elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS: 

1260 warnings.warn( 

1261 f"joblib backend '{backend}' is not available on " 

1262 f"your system, falling back to {DEFAULT_BACKEND}.", 

1263 UserWarning, 

1264 stacklevel=2) 

1265 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND] 

1266 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level) 

1267 

1268 else: 

1269 try: 

1270 backend_factory = BACKENDS[backend] 

1271 except KeyError as e: 

1272 raise ValueError("Invalid backend: %s, expected one of %r" 

1273 % (backend, sorted(BACKENDS.keys()))) from e 

1274 backend = backend_factory(nesting_level=nesting_level) 

1275 

1276 n_jobs = _get_config_param(n_jobs, context_config, "n_jobs") 

1277 if n_jobs is None: 

1278 # No specific context override and no specific value request: 

1279 # default to the default of the backend. 

1280 n_jobs = backend.default_n_jobs 

1281 self.n_jobs = n_jobs 

1282 

1283 if (require == 'sharedmem' and 

1284 not getattr(backend, 'supports_sharedmem', False)): 

1285 raise ValueError("Backend %s does not support shared memory" 

1286 % backend) 

1287 

1288 if (batch_size == 'auto' or isinstance(batch_size, Integral) and 

1289 batch_size > 0): 

1290 self.batch_size = batch_size 

1291 else: 

1292 raise ValueError( 

1293 "batch_size must be 'auto' or a positive integer, got: %r" 

1294 % batch_size) 

1295 

1296 if not isinstance(backend, SequentialBackend): 

1297 if self.return_generator and not backend.supports_return_generator: 

1298 raise ValueError( 

1299 "Backend {} does not support " 

1300 "return_as={}".format(backend, return_as) 

1301 ) 

1302 # This lock is used to coordinate the main thread of this process 

1303 # with the async callback thread of the pool. 

1304 self._lock = threading.RLock() 

1305 self._jobs = collections.deque() 

1306 self._pending_outputs = list() 

1307 self._ready_batches = queue.Queue() 

1308 self._reducer_callback = None 

1309 

1310 # Internal variables 

1311 self._backend = backend 

1312 self._running = False 

1313 self._managed_backend = False 

1314 self._id = uuid4().hex 

1315 self._call_ref = None 

1316 

1317 def __enter__(self): 

1318 self._managed_backend = True 

1319 self._calling = False 

1320 self._initialize_backend() 

1321 return self 

1322 

1323 def __exit__(self, exc_type, exc_value, traceback): 

1324 self._managed_backend = False 

1325 if self.return_generator and self._calling: 

1326 self._abort() 

1327 self._terminate_and_reset() 

1328 
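A minimal sketch (not part of this module) of the context-manager usage handled by __enter__/__exit__ above: reusing one Parallel instance keeps the worker pool alive across successive calls.

from joblib import Parallel, delayed

with Parallel(n_jobs=2) as parallel:
    first = parallel(delayed(abs)(-i) for i in range(3))
    second = parallel(delayed(abs)(-i) for i in range(3, 6))
print(first, second)  # [0, 1, 2] [3, 4, 5]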

1329 def _initialize_backend(self): 

1330 """Build a process or thread pool and return the number of workers""" 

1331 try: 

1332 n_jobs = self._backend.configure(n_jobs=self.n_jobs, parallel=self, 

1333 **self._backend_args) 

1334 if self.timeout is not None and not self._backend.supports_timeout: 

1335 warnings.warn( 

1336 'The backend class {!r} does not support timeout. ' 

1337 "You have set 'timeout={}' in Parallel but " 

1338 "the 'timeout' parameter will not be used.".format( 

1339 self._backend.__class__.__name__, 

1340 self.timeout)) 

1341 

1342 except FallbackToBackend as e: 

1343 # Recursively initialize the backend in case of requested fallback. 

1344 self._backend = e.backend 

1345 n_jobs = self._initialize_backend() 

1346 

1347 return n_jobs 

1348 

1349 def _effective_n_jobs(self): 

1350 if self._backend: 

1351 return self._backend.effective_n_jobs(self.n_jobs) 

1352 return 1 

1353 

1354 def _terminate_and_reset(self): 

1355 if hasattr(self._backend, 'stop_call') and self._calling: 

1356 self._backend.stop_call() 

1357 self._calling = False 

1358 if not self._managed_backend: 

1359 self._backend.terminate() 

1360 

1361 def _dispatch(self, batch): 

1362 """Queue the batch for computing, with or without multiprocessing 

1363 

1364 WARNING: this method is not thread-safe: it should only be called 

1365 indirectly via dispatch_one_batch. 

1366 

1367 """ 

1368 # If job.get() catches an exception, it closes the queue: 

1369 if self._aborting: 

1370 return 

1371 

1372 batch_size = len(batch) 

1373 

1374 self.n_dispatched_tasks += batch_size 

1375 self.n_dispatched_batches += 1 

1376 

1377 dispatch_timestamp = time.time() 

1378 

1379 batch_tracker = BatchCompletionCallBack( 

1380 dispatch_timestamp, batch_size, self 

1381 ) 

1382 self._jobs.append(batch_tracker) 

1383 

1384 job = self._backend.apply_async(batch, callback=batch_tracker) 

1385 batch_tracker.register_job(job) 

1386 

1387 def dispatch_next(self): 

1388 """Dispatch more data for parallel processing 

1389 

1390 This method is meant to be called concurrently by the multiprocessing 

1391 callback. We rely on the thread-safety of dispatch_one_batch to protect 

1392 against concurrent consumption of the unprotected iterator. 

1393 

1394 """ 

1395 if not self.dispatch_one_batch(self._original_iterator): 

1396 self._iterating = False 

1397 self._original_iterator = None 

1398 

1399 def dispatch_one_batch(self, iterator): 

1400 """Prefetch the tasks for the next batch and dispatch them. 

1401 

1402 The effective size of the batch is computed here. 

1403 If there are no more jobs to dispatch, return False, else return True. 

1404 

1405 The iterator consumption and dispatching is protected by the same 

1406 lock so calling this function should be thread safe. 

1407 

1408 """ 

1409 

1410 if self._aborting: 

1411 return False 

1412 

1413 batch_size = self._get_batch_size() 

1414 

1415 with self._lock: 

1416 # To ensure an even distribution of the workload between workers, 

1417 # we look ahead in the original iterator by more than batch_size 

1418 # tasks. However, we keep consuming only one batch at each 

1419 # dispatch_one_batch call. The extra tasks are stored in a local 

1420 # queue, _ready_batches, that is looked up prior to re-consuming 

1421 # tasks from the original iterator. 

1422 try: 

1423 tasks = self._ready_batches.get(block=False) 

1424 except queue.Empty: 

1425 # Slice the iterator n_jobs * batch_size items at a time. If the 

1426 # slice returns fewer items than that, the current batch_size puts 

1427 # too much weight on a subset of workers, while others may end 

1428 # up starving. So in this case, re-scale the batch size 

1429 # accordingly to distribute the last items evenly between all 

1430 # workers. 

1431 n_jobs = self._cached_effective_n_jobs 

1432 big_batch_size = batch_size * n_jobs 

1433 

1434 islice = list(itertools.islice(iterator, big_batch_size)) 

1435 if len(islice) == 0: 

1436 return False 

1437 elif (iterator is self._original_iterator and 

1438 len(islice) < big_batch_size): 

1439 # We reached the end of the original iterator (unless 

1440 # iterator is the ``pre_dispatch``-long initial slice of 

1441 # the original iterator) -- decrease the batch size to 

1442 # account for potential variance in the batches' running 

1443 # times. 

1444 final_batch_size = max(1, len(islice) // (10 * n_jobs)) 

1445 else: 

1446 final_batch_size = max(1, len(islice) // n_jobs) 

1447 

1448 # enqueue n_jobs batches in a local queue 

1449 for i in range(0, len(islice), final_batch_size): 

1450 tasks = BatchedCalls(islice[i:i + final_batch_size], 

1451 self._backend.get_nested_backend(), 

1452 self._reducer_callback, 

1453 self._pickle_cache) 

1454 self._ready_batches.put(tasks) 

1455 

1456 # finally, get one task. 

1457 tasks = self._ready_batches.get(block=False) 

1458 if len(tasks) == 0: 

1459 # No more tasks available in the iterator: tell caller to stop. 

1460 return False 

1461 else: 

1462 self._dispatch(tasks) 

1463 return True 

1464 
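As a standalone illustration of the rescaling rule used in dispatch_one_batch (the numbers and helper below are illustrative, not taken from this module): with batch_size=8 and n_jobs=4 the method looks ahead 32 tasks at a time, and if only 12 tasks remain at the end of the iterator, the tail is re-cut into batches of max(1, 12 // (10 * 4)) = 1 so the last items spread evenly over the workers.

def recut_tail(items, batch_size, n_jobs, at_end_of_iterator):
    # Mirrors the rule above: a short final slice is split into many tiny
    # batches, a full slice into roughly one batch per worker.
    if at_end_of_iterator and len(items) < batch_size * n_jobs:
        final_batch_size = max(1, len(items) // (10 * n_jobs))
    else:
        final_batch_size = max(1, len(items) // n_jobs)
    return [items[i:i + final_batch_size]
            for i in range(0, len(items), final_batch_size)]

print(recut_tail(list(range(12)), batch_size=8, n_jobs=4, at_end_of_iterator=True))
# -> twelve one-item batches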

1465 def _get_batch_size(self): 

1466 """Returns the effective batch size for dispatch""" 

1467 if self.batch_size == 'auto': 

1468 return self._backend.compute_batch_size() 

1469 else: 

1470 # Fixed batch size strategy 

1471 return self.batch_size 

1472 
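At the user level this corresponds to the public batch_size argument of Parallel: 'auto' (the default) lets the backend pick and adapt the batch size, while an integer fixes it. A small usage sketch (illustrative example, not part of this module):

from joblib import Parallel, delayed

# 'auto' delegates sizing to the backend's compute_batch_size() heuristic.
auto_sized = Parallel(n_jobs=2, batch_size='auto')(
    delayed(abs)(i) for i in range(100))

# A fixed integer bypasses the heuristic entirely.
fixed_sized = Parallel(n_jobs=2, batch_size=10)(
    delayed(abs)(i) for i in range(100))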

1473 def _print(self, msg): 

1474 """Display the message on stout or stderr depending on verbosity""" 

1475 # XXX: Not using the logger framework: need to 

1476 # learn to use logger better. 

1477 if not self.verbose: 

1478 return 

1479 if self.verbose < 50: 

1480 writer = sys.stderr.write 

1481 else: 

1482 writer = sys.stdout.write 

1483 writer(f"[{self}]: {msg}\n") 

1484 
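In practice this means that progress messages go to stderr for moderate verbosity and switch to stdout once verbose reaches 50. A tiny illustration (the exact messages depend on the backend):

from joblib import Parallel, delayed

# verbose=10 -> progress lines on stderr; verbose >= 50 -> stdout.
Parallel(n_jobs=2, verbose=10)(delayed(abs)(i) for i in range(20))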

1485 def _is_completed(self): 

1486 """Check if all tasks have been completed""" 

1487 return self.n_completed_tasks == self.n_dispatched_tasks and not ( 

1488 self._iterating or self._aborting 

1489 ) 

1490 

1491 def print_progress(self): 

1492 """Display the process of the parallel execution only a fraction 

1493 of time, controlled by self.verbose. 

1494 """ 

1495 

1496 if not self.verbose: 

1497 return 

1498 

1499 elapsed_time = time.time() - self._start_time 

1500 

1501 if self._is_completed(): 

1502 # Make sure that we get a last message telling us we are done 

1503 self._print( 

1504 f"Done {self.n_completed_tasks:3d} out of " 

1505 f"{self.n_completed_tasks:3d} | elapsed: " 

1506 f"{short_format_time(elapsed_time)} finished" 

1507 ) 

1508 return 

1509 

1510 # The original job iterator becomes None once it has been fully 

1511 # consumed: at this point we know the total number of jobs and we are 

1512 # able to display an estimation of the remaining time based on already 

1513 # completed jobs. Otherwise, we simply display the number of completed 

1514 # tasks. 

1515 elif self._original_iterator is not None: 

1516 if _verbosity_filter(self.n_dispatched_batches, self.verbose): 

1517 return 

1518 self._print( 

1519 f"Done {self.n_completed_tasks:3d} tasks | elapsed: " 

1520 f"{short_format_time(elapsed_time)}" 

1521 ) 

1522 else: 

1523 index = self.n_completed_tasks 

1524 # We are finished dispatching 

1525 total_tasks = self.n_dispatched_tasks 

1526 # We always display the first loop 

1527 if not index == 0: 

1528 # Display depending on the number of remaining items 

1529 # A message as soon as we finish dispatching, cursor is 0 

1530 cursor = (total_tasks - index + 1 - 

1531 self._pre_dispatch_amount) 

1532 frequency = (total_tasks // self.verbose) + 1 

1533 is_last_item = (index + 1 == total_tasks) 

1534 if (is_last_item or cursor % frequency): 

1535 return 

1536 remaining_time = (elapsed_time / index) * \ 

1537 (self.n_dispatched_tasks - index * 1.0) 

1538 # only display status if the remaining time is greater than or equal to 0 

1539 self._print( 

1540 f"Done {index:3d} out of {total_tasks:3d} | elapsed: " 

1541 f"{short_format_time(elapsed_time)} remaining: " 

1542 f"{short_format_time(remaining_time)}" 

1543 ) 

1544 
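The remaining-time estimate above is a simple linear extrapolation from the tasks completed so far. A worked example with illustrative numbers:

elapsed_time, index, total_tasks = 5.0, 20, 80   # 20 of 80 tasks done in 5 s
remaining_time = (elapsed_time / index) * (total_tasks - index)
print(remaining_time)   # -> 15.0 seconds left, at 0.25 s per task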

1545 def _abort(self): 

1546 # Stop dispatching new jobs in the async callback thread 

1547 self._aborting = True 

1548 

1549 # If the backend allows it, cancel or kill remaining running 

1550 # tasks without waiting for the results as we will raise 

1551 # the exception we got back to the caller instead of returning 

1552 # any result. 

1553 backend = self._backend 

1554 if (not self._aborted and hasattr(backend, 'abort_everything')): 

1555 # If the backend is managed externally we need to make sure 

1556 # to leave it in a working state to allow for future jobs 

1557 # scheduling. 

1558 ensure_ready = self._managed_backend 

1559 backend.abort_everything(ensure_ready=ensure_ready) 

1560 self._aborted = True 

1561 

1562 def _start(self, iterator, pre_dispatch): 

1563 # Only set self._iterating to True if at least one batch 

1564 # was dispatched. In particular this covers the edge 

1565 # case of Parallel used with an exhausted iterator. If 

1566 # self._original_iterator is None, then this means either 

1567 # that pre_dispatch == "all", n_jobs == 1 or that the first batch 

1568 # was very quick and its callback already dispatched all the 

1569 # remaining jobs. 

1570 self._iterating = False 

1571 if self.dispatch_one_batch(iterator): 

1572 self._iterating = self._original_iterator is not None 

1573 

1574 while self.dispatch_one_batch(iterator): 

1575 pass 

1576 

1577 if pre_dispatch == "all": 

1578 # The iterable was consumed all at once by the while loop above. 

1579 # No need to wait for async callbacks to trigger the 

1580 # consumption. 

1581 self._iterating = False 

1582 

1583 def _get_outputs(self, iterator, pre_dispatch): 

1584 """Iterator returning the tasks' output as soon as they are ready.""" 

1585 dispatch_thread_id = threading.get_ident() 

1586 detach_generator_exit = False 

1587 try: 

1588 self._start(iterator, pre_dispatch) 

1589 # first yield returns None, for internal use only. This ensures 

1590 # that we enter the try/except block and start dispatching the 

1591 # tasks. 

1592 yield 

1593 

1594 with self._backend.retrieval_context(): 

1595 yield from self._retrieve() 

1596 

1597 except GeneratorExit: 

1598 # The generator has been garbage collected before being fully 

1599 # consumed. This aborts the remaining tasks if possible and warns 

1600 # the user if necessary. 

1601 self._exception = True 

1602 

1603 # In some interpreters such as PyPy, GeneratorExit can be raised in 

1604 # a different thread than the one used to start the dispatch of the 

1605 # parallel tasks. This can lead to a hang when a thread attempts to 

1606 # join itself. As a workaround, we detach the execution of the 

1607 # aborting code to a dedicated thread. We then need to make sure 

1608 # the rest of the function does not call `_terminate_and_reset` 

1609 # in the finally block. 

1610 if dispatch_thread_id != threading.get_ident(): 

1611 if not IS_PYPY: 

1612 warnings.warn( 

1613 "A generator produced by joblib.Parallel has been " 

1614 "gc'ed in an unexpected thread. This behavior should " 

1615 "not cause major -issues but to make sure, please " 

1616 "report this warning and your use case at " 

1617 "https://github.com/joblib/joblib/issues so it can " 

1618 "be investigated." 

1619 ) 

1620 

1621 detach_generator_exit = True 

1622 _parallel = self 

1623 

1624 class _GeneratorExitThread(threading.Thread): 

1625 def run(self): 

1626 _parallel._abort() 

1627 if _parallel.return_generator: 

1628 _parallel._warn_exit_early() 

1629 _parallel._terminate_and_reset() 

1630 

1631 _GeneratorExitThread( 

1632 name="GeneratorExitThread" 

1633 ).start() 

1634 return 

1635 

1636 # Otherwise, we are in the thread that started the dispatch: we can 

1637 # safely abort the execution and warn the user. 

1638 self._abort() 

1639 if self.return_generator: 

1640 self._warn_exit_early() 

1641 

1642 raise 

1643 

1644 # Note: we catch any BaseException instead of just Exception instances 

1645 # to also include KeyboardInterrupt 

1646 except BaseException: 

1647 self._exception = True 

1648 self._abort() 

1649 raise 

1650 finally: 

1651 # Store the unconsumed tasks and terminate the workers if necessary 

1652 _remaining_outputs = ([] if self._exception else self._jobs) 

1653 self._jobs = collections.deque() 

1654 self._running = False 

1655 if not detach_generator_exit: 

1656 self._terminate_and_reset() 

1657 

1658 while len(_remaining_outputs) > 0: 

1659 batched_results = _remaining_outputs.popleft() 

1660 batched_results = batched_results.get_result(self.timeout) 

1661 for result in batched_results: 

1662 yield result 

1663 

1664 def _wait_retrieval(self): 

1665 """Return True if we need to continue retriving some tasks.""" 

1666 

1667 # If the input load is still being iterated over, it means that tasks 

1668 # are still on the dispatch wait list and their results will need to 

1669 # be retrieved later on. 

1670 if self._iterating: 

1671 return True 

1672 

1673 # If some of the dispatched tasks are still being processed by the 

1674 # workers, wait for the compute to finish before starting retrieval 

1675 if self.n_completed_tasks < self.n_dispatched_tasks: 

1676 return True 

1677 

1678 # For backends that do not support asynchronously retrieving the 

1679 # results back to the main process, all results must be carefully 

1680 # retrieved in the _retrieve loop in the main thread while the backend is alive. 

1681 # For other backends, the actual retrieval is done asynchronously in 

1682 # the callback thread, and we can terminate the backend before the 

1683 # `self._jobs` result list has been emptied. The remaining results 

1684 # will be collected in the `finally` step of the generator. 

1685 if not self._backend.supports_retrieve_callback: 

1686 if len(self._jobs) > 0: 

1687 return True 

1688 

1689 return False 

1690 

1691 def _retrieve(self): 

1692 while self._wait_retrieval(): 

1693 

1694 # If the callback thread of a worker has signaled that its task 

1695 # triggered an exception, or if the retrieval loop has raised an 

1696 # exception (e.g. `GeneratorExit`), exit the loop and surface the 

1697 # worker traceback. 

1698 if self._aborting: 

1699 self._raise_error_fast() 

1700 break 

1701 

1702 # If the next job is not ready for retrieval yet, we just wait for 

1703 # async callbacks to progress. 

1704 if ((len(self._jobs) == 0) or 

1705 (self._jobs[0].get_status( 

1706 timeout=self.timeout) == TASK_PENDING)): 

1707 time.sleep(0.01) 

1708 continue 

1709 

1710 # We need to be careful: the job list can be filling up as 

1711 # we empty it, and Python lists are not thread-safe by 

1712 # default, hence the use of the lock. 

1713 with self._lock: 

1714 batched_results = self._jobs.popleft() 

1715 

1716 # Flatten the batched results to output one output at a time 

1717 batched_results = batched_results.get_result(self.timeout) 

1718 for result in batched_results: 

1719 self._nb_consumed += 1 

1720 yield result 

1721 

1722 def _raise_error_fast(self): 

1723 """If we are aborting, raise if a job caused an error.""" 

1724 

1725 # Find the first job whose status is TASK_ERROR if it exists. 

1726 with self._lock: 

1727 error_job = next((job for job in self._jobs 

1728 if job.status == TASK_ERROR), None) 

1729 

1730 # If this error job exists, immediately raise the error by 

1731 # calling get_result. This job might not exist if abort has been 

1732 # called directly or if the generator is gc'ed. 

1733 if error_job is not None: 

1734 error_job.get_result(self.timeout) 

1735 

1736 def _warn_exit_early(self): 

1737 """Warn the user if the generator is gc'ed before being consumned.""" 

1738 ready_outputs = self.n_completed_tasks - self._nb_consumed 

1739 is_completed = self._is_completed() 

1740 msg = "" 

1741 if ready_outputs: 

1742 msg += ( 

1743 f"{ready_outputs} tasks have been successfully executed " 

1744 " but not used." 

1745 ) 

1746 if not is_completed: 

1747 msg += " Additionally, " 

1748 

1749 if not is_completed: 

1750 msg += ( 

1751 f"{self.n_dispatched_tasks - self.n_completed_tasks} tasks " 

1752 "which were still being processed by the workers have been " 

1753 "cancelled." 

1754 ) 

1755 

1756 if msg: 

1757 msg += ( 

1758 " You could benefit from adjusting the input task " 

1759 "iterator to limit unnecessary computation time." 

1760 ) 

1761 

1762 warnings.warn(msg) 

1763 

1764 def _get_sequential_output(self, iterable): 

1765 """Separate loop for sequential output. 

1766 

1767 This simplifies the traceback in case of errors and reduces the 

1768 overhead of calling sequential tasks with `joblib`. 

1769 """ 

1770 try: 

1771 self._iterating = True 

1772 self._original_iterator = iterable 

1773 batch_size = self._get_batch_size() 

1774 

1775 if batch_size != 1: 

1776 it = iter(iterable) 

1777 iterable_batched = iter( 

1778 lambda: tuple(itertools.islice(it, batch_size)), () 

1779 ) 

1780 iterable = ( 

1781 task for batch in iterable_batched for task in batch 

1782 ) 

1783 

1784 # first yield returns None, for internal use only. This ensures 

1785 # that we enter the try/except block and set up the generator. 

1786 yield None 

1787 

1788 # Sequentially call the tasks and yield the results. 

1789 for func, args, kwargs in iterable: 

1790 self.n_dispatched_batches += 1 

1791 self.n_dispatched_tasks += 1 

1792 res = func(*args, **kwargs) 

1793 self.n_completed_tasks += 1 

1794 self.print_progress() 

1795 yield res 

1796 self._nb_consumed += 1 

1797 except BaseException: 

1798 self._exception = True 

1799 self._aborting = True 

1800 self._aborted = True 

1801 raise 

1802 finally: 

1803 self.print_progress() 

1804 self._running = False 

1805 self._iterating = False 

1806 self._original_iterator = None 

1807 
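The re-batching step in _get_sequential_output relies on the iter(callable, sentinel) idiom: it keeps slicing batch_size items off a shared iterator until an empty tuple signals exhaustion. A standalone sketch of that idiom (variable names are illustrative):

import itertools

it = iter(range(7))
batches = iter(lambda: tuple(itertools.islice(it, 3)), ())
print(list(batches))   # [(0, 1, 2), (3, 4, 5), (6,)]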

1808 def _reset_run_tracking(self): 

1809 """Reset the counters and flags used to track the execution.""" 

1810 

1811 # Make sure, in a thread-safe way, that the parallel instance was 

1812 # not previously running. 

1813 with getattr(self, '_lock', nullcontext()): 

1814 if self._running: 

1815 msg = 'This Parallel instance is already running !' 

1816 if self.return_generator is True: 

1817 msg += ( 

1818 " Before submitting new tasks, you must wait for the " 

1819 "completion of all the previous tasks, or clean all " 

1820 "references to the output generator." 

1821 ) 

1822 raise RuntimeError(msg) 

1823 self._running = True 

1824 

1825 # Counters to keep track of the tasks dispatched and completed. 

1826 self.n_dispatched_batches = 0 

1827 self.n_dispatched_tasks = 0 

1828 self.n_completed_tasks = 0 

1829 

1830 # The following count is incremented by one each time the user 

1831 # iterates on the output generator; it is used to prepare an informative 

1832 # warning message in case the generator is deleted before all the 

1833 # dispatched tasks have been consumed. 

1834 self._nb_consumed = 0 

1835 

1836 # The following flags are used to synchronize the threads in case one 

1837 # of the tasks errors out, to ensure that all workers abort fast and 

1838 # that the backend terminates properly. 

1839 

1840 # Set to True as soon as a worker signals that a task errors out 

1841 self._exception = False 

1842 # Set to True in case of early termination following an incident 

1843 self._aborting = False 

1844 # Set to True after abortion is complete 

1845 self._aborted = False 

1846 
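The getattr(self, '_lock', nullcontext()) construction above makes the running-state check safe even if the lock attribute has not been created yet: when _lock is missing, the with statement degrades to a no-op context. A minimal sketch of the same defensive pattern (the class and names below are illustrative, not joblib internals):

import threading
from contextlib import nullcontext

class MaybeLocked:
    def guarded(self):
        # Acquire the lock when it exists, otherwise do nothing.
        with getattr(self, '_lock', nullcontext()):
            return "ran with or without a lock"

obj = MaybeLocked()
print(obj.guarded())           # no _lock attribute yet -> nullcontext path
obj._lock = threading.Lock()
print(obj.guarded())           # now the real lock is acquired and released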

1847 def __call__(self, iterable): 

1848 """Main function to dispatch parallel tasks.""" 

1849 

1850 self._reset_run_tracking() 

1851 self._start_time = time.time() 

1852 

1853 if not self._managed_backend: 

1854 n_jobs = self._initialize_backend() 

1855 else: 

1856 n_jobs = self._effective_n_jobs() 

1857 

1858 if n_jobs == 1: 

1859 # If n_jobs == 1, run the computation sequentially and return 

1860 # immediately to avoid overhead. 

1861 output = self._get_sequential_output(iterable) 

1862 next(output) 

1863 return output if self.return_generator else list(output) 

1864 

1865 # Create an ID that uniquely identifies the current call. If the 

1866 # call is interrupted early and the same instance is immediately 

1867 # re-used, this id is used to prevent workers that were 

1868 # concurrently finalizing a task from the previous call from running 

1869 # the callback. 

1870 with self._lock: 

1871 self._call_id = uuid4().hex 

1872 

1873 # self._effective_n_jobs should be called in the Parallel.__call__ 

1874 # thread only -- store its value in an attribute for further queries. 

1875 self._cached_effective_n_jobs = n_jobs 

1876 

1877 if isinstance(self._backend, LokyBackend): 

1878 # For the loky backend, we add a callback executed when reducing 

1879 # BatchedCalls, which makes the loky executor use a temporary folder 

1880 # specific to this Parallel object when pickling temporary memmaps. 

1881 # This callback is necessary to ensure that several Parallel 

1882 # objects using the same reusable executor don't use the same 

1883 # temporary resources. 

1884 

1885 def _batched_calls_reducer_callback(): 

1886 # Relevant implementation detail: the following lines, called 

1887 # when reducing BatchedCalls, are called in a thread-safe 

1888 # situation, meaning that the context of the temporary folder 

1889 # manager will not be changed in between the callback execution 

1890 # and the end of the BatchedCalls pickling. The reason is that 

1891 # pickling (the only place where set_current_context is used) 

1892 # is done from a single thread (the queue_feeder_thread). 

1893 self._backend._workers._temp_folder_manager.set_current_context( # noqa 

1894 self._id 

1895 ) 

1896 self._reducer_callback = _batched_calls_reducer_callback 

1897 

1898 # self._effective_n_jobs should be called in the Parallel.__call__ 

1899 # thread only -- store its value in an attribute for further queries. 

1900 self._cached_effective_n_jobs = n_jobs 

1901 

1902 backend_name = self._backend.__class__.__name__ 

1903 if n_jobs == 0: 

1904 raise RuntimeError("%s has no active worker." % backend_name) 

1905 

1906 self._print( 

1907 f"Using backend {backend_name} with {n_jobs} concurrent workers." 

1908 ) 

1909 if hasattr(self._backend, 'start_call'): 

1910 self._backend.start_call() 

1911 

1912 # The following flag prevents double calls to `backend.stop_call`. 

1913 self._calling = True 

1914 

1915 iterator = iter(iterable) 

1916 pre_dispatch = self.pre_dispatch 

1917 

1918 if pre_dispatch == 'all': 

1919 # prevent further dispatch via multiprocessing callback thread 

1920 self._original_iterator = None 

1921 self._pre_dispatch_amount = 0 

1922 else: 

1923 self._original_iterator = iterator 

1924 if hasattr(pre_dispatch, 'endswith'): 

1925 pre_dispatch = eval_expr( 

1926 pre_dispatch.replace("n_jobs", str(n_jobs)) 

1927 ) 

1928 self._pre_dispatch_amount = pre_dispatch = int(pre_dispatch) 
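# Illustrative note (not part of the original source): with n_jobs=4 and
# pre_dispatch='2*n_jobs', the replace() call above turns the expression
# into '2*4', eval_expr evaluates it to 8, and self._pre_dispatch_amount
# becomes 8, so only eight tasks are materialized up front.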

1929 

1930 # The main thread will consume the first pre_dispatch items, and 

1931 # the remaining items will later be lazily dispatched by async 

1932 # callbacks upon task completion. 

1933 

1934 # TODO: this iterator should be batch_size * n_jobs 

1935 iterator = itertools.islice(iterator, self._pre_dispatch_amount) 

1936 

1937 # Use a caching dict for callables that are pickled with cloudpickle to 

1938 # improve performance. This cache is only used for 

1939 # functions that are defined in the __main__ module, functions that 

1940 # are defined locally (inside another function), and lambda expressions. 

1941 self._pickle_cache = dict() 

1942 

1943 output = self._get_outputs(iterator, pre_dispatch) 

1944 self._call_ref = weakref.ref(output) 

1945 

1946 # The first item from the output is blank, but it makes the interpreter 

1947 # progress until it enters the try/except block of the generator and 

1948 # reaches the first `yield` statement. This starts the asynchronous 

1949 # dispatch of the tasks to the workers. 

1950 next(output) 

1951 

1952 return output if self.return_generator else list(output) 

1953 
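When return_generator is set, __call__ hands back the generator itself so results can be consumed as workers finish, which is what the early-exit handling earlier in this class protects. A hedged usage sketch, assuming a joblib version that exposes the return_as='generator' option:

from joblib import Parallel, delayed

gen = Parallel(n_jobs=2, return_as='generator')(
    delayed(abs)(i) for i in range(100))
first_ten = [next(gen) for _ in range(10)]   # results arrive lazily
del gen   # remaining tasks are aborted and an early-exit warning may be emitted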

1954 def __repr__(self): 

1955 return '%s(n_jobs=%s)' % (self.__class__.__name__, self.n_jobs)