Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/parallel.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

666 statements  

1""" 

2Helpers for embarrassingly parallel code. 

3""" 

4# Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org > 

5# Copyright: 2010, Gael Varoquaux 

6# License: BSD 3 clause 

7 

8from __future__ import division 

9 

10import collections 

11import functools 

12import itertools 

13import os 

14import queue 

15import sys 

16import threading 

17import time 

18import warnings 

19import weakref 

20from contextlib import nullcontext 

21from math import floor, log10, sqrt 

22from multiprocessing import TimeoutError 

23from numbers import Integral 

24from uuid import uuid4 

25 

26from ._multiprocessing_helpers import mp 

27 

28# Make sure that those two classes are part of the public joblib.parallel API 

29# so that 3rd party backend implementers can import them from here. 

30from ._parallel_backends import ( 

31 AutoBatchingMixin, # noqa 

32 FallbackToBackend, 

33 LokyBackend, 

34 MultiprocessingBackend, 

35 ParallelBackendBase, # noqa 

36 SequentialBackend, 

37 ThreadingBackend, 

38) 

39from ._utils import _Sentinel, eval_expr 

40from .disk import memstr_to_bytes 

41from .logger import Logger, short_format_time 

42 

# Registry mapping a backend name to the class implementing it. Only the
# backends that do not depend on multiprocessing are registered
# unconditionally; the others are added below when ``mp`` is available.
BACKENDS = {
    "threading": ThreadingBackend,
    "sequential": SequentialBackend,
}
# name of the backend used by default by Parallel outside of any context
# managed by ``parallel_config`` or ``parallel_backend``.

# threading is the only backend that is always available everywhere
DEFAULT_BACKEND = "threading"
DEFAULT_THREAD_BACKEND = "threading"
DEFAULT_PROCESS_BACKEND = "threading"

# Backend names that are only registered when multiprocessing is available
# (see the ``if mp is not None`` block below).
MAYBE_AVAILABLE_BACKENDS = {"multiprocessing", "loky"}

# if multiprocessing is available, so is loky, we set it as the default
# backend
if mp is not None:
    BACKENDS["multiprocessing"] = MultiprocessingBackend
    from .externals import loky

    BACKENDS["loky"] = LokyBackend
    DEFAULT_BACKEND = "loky"
    DEFAULT_PROCESS_BACKEND = "loky"

# Thread local value that can be overridden by the ``parallel_config`` context
# manager
_backend = threading.local()

70 

71 

def _register_dask():
    """Register the Dask backend under the name ``"dask"``.

    Called when ``parallel_config(backend="dask")`` is requested and the
    backend has not been registered yet.

    Raises
    ------
    ImportError
        If the import (or the registration) raises ``ImportError``; the new
        error carries installation guidance and chains the original one.
    """
    try:
        from ._dask import DaskDistributedBackend

        register_parallel_backend("dask", DaskDistributedBackend)
    except ImportError as import_error:
        raise ImportError(
            "To use the dask.distributed backend you must install both "
            "the `dask` and distributed modules.\n\n"
            "See https://dask.pydata.org/en/latest/install.html for more "
            "information."
        ) from import_error

86 

87 

# Backends that are not registered at import time: maps a backend name to a
# zero-argument callable that registers it when it is first requested.
EXTERNAL_BACKENDS = {
    "dask": _register_dask,
}


# Sentinels for the default values of the Parallel constructor and
# the parallel_config and parallel_backend context managers.
# Identity comparison against these sentinels (``is`` / ``is not``) is how
# the code tells "left unset" apart from an explicitly passed value; each
# sentinel carries the value to fall back to in ``default_value``.
default_parallel_config = {
    "backend": _Sentinel(default_value=None),
    "n_jobs": _Sentinel(default_value=None),
    "verbose": _Sentinel(default_value=0),
    "temp_folder": _Sentinel(default_value=None),
    "max_nbytes": _Sentinel(default_value="1M"),
    "mmap_mode": _Sentinel(default_value="r"),
    "prefer": _Sentinel(default_value=None),
    "require": _Sentinel(default_value=None),
}


# Accepted values for the ``prefer`` and ``require`` parameters respectively.
VALID_BACKEND_HINTS = ("processes", "threads", None)
VALID_BACKEND_CONSTRAINTS = ("sharedmem", None)

109 

110 

def _get_config_param(param, context_config, key):
    """Resolve the effective value of one parallel configuration parameter.

    Precedence, from highest to lowest:

    1. ``param`` when it was explicitly set (i.e. it is not the sentinel
       stored in ``default_parallel_config[key]``),
    2. the value stored under ``key`` in ``context_config`` when a
       parallel_(config/backend) context manager set it,
    3. the ``default_value`` carried by the sentinel.
    """
    unset_marker = default_parallel_config[key]

    # An explicitly provided value always wins.
    if param is not unset_marker:
        return param

    # Next comes whatever the active context manager configured.
    context_value = context_config[key]
    if context_value is not unset_marker:
        return context_value

    # Nothing was set anywhere: ``param`` is the sentinel itself here.
    return param.default_value

128 

129 

def get_active_backend(
    prefer=default_parallel_config["prefer"],
    require=default_parallel_config["require"],
    verbose=default_parallel_config["verbose"],
):
    """Return the active default backend together with its ``n_jobs``.

    Returns
    -------
    tuple
        ``(backend, n_jobs)`` where ``n_jobs`` is resolved from the
        configuration returned by ``_get_active_backend``.
    """
    active_backend, active_config = _get_active_backend(prefer, require, verbose)
    return (
        active_backend,
        _get_config_param(default_parallel_config["n_jobs"], active_config, "n_jobs"),
    )

139 

140 

def _get_active_backend(
    prefer=default_parallel_config["prefer"],
    require=default_parallel_config["require"],
    verbose=default_parallel_config["verbose"],
):
    """Resolve the backend instance and the configuration currently in effect.

    Returns
    -------
    tuple
        ``(backend, config)``. ``config`` is the thread-local configuration,
        or a copy of it when a fallback backend had to be selected.

    Raises
    ------
    ValueError
        If ``prefer`` or ``require`` is not an accepted value, or if
        ``prefer='processes'`` is combined with ``require='sharedmem'``.
    """
    active_config = getattr(_backend, "config", default_parallel_config)

    backend = _get_config_param(
        default_parallel_config["backend"], active_config, "backend"
    )
    prefer = _get_config_param(prefer, active_config, "prefer")
    require = _get_config_param(require, active_config, "require")
    verbose = _get_config_param(verbose, active_config, "verbose")

    if prefer not in VALID_BACKEND_HINTS:
        raise ValueError(
            f"prefer={prefer} is not a valid backend hint, "
            f"expected one of {VALID_BACKEND_HINTS}"
        )
    if require not in VALID_BACKEND_CONSTRAINTS:
        raise ValueError(
            f"require={require} is not a valid backend constraint, "
            f"expected one of {VALID_BACKEND_CONSTRAINTS}"
        )
    if prefer == "processes" and require == "sharedmem":
        raise ValueError(
            "prefer == 'processes' and require == 'sharedmem' are inconsistent settings"
        )

    # Without a backend coming from a parallel_(config/backend) context
    # manager, instantiate the default backend now.
    explicit_backend = backend is not None
    if not explicit_backend:
        backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)

    nesting_level = backend.nesting_level
    uses_threads = getattr(backend, "uses_threads", False)
    supports_sharedmem = getattr(backend, "supports_sharedmem", False)

    # Threads are forced when the shared memory constraint is not met, or
    # when no backend was explicitly given and threads are preferred.
    sharedmem_violated = require == "sharedmem" and not supports_sharedmem
    force_threads = sharedmem_violated or (
        not explicit_backend and prefer == "threads" and not uses_threads
    )
    force_processes = not explicit_backend and prefer == "processes" and uses_threads

    if force_threads:
        # Fall back to the default thread-based backend.
        sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND](
            nesting_level=nesting_level
        )
        # Only warn when the user explicitly asked for a backend that is
        # being overridden.
        if verbose >= 10 and explicit_backend:
            print(
                f"Using {sharedmem_backend.__class__.__name__} as "
                f"joblib backend instead of {backend.__class__.__name__} "
                "as the latter does not provide shared memory semantics."
            )
        # Force to n_jobs=1 by default
        thread_config = active_config.copy()
        thread_config["n_jobs"] = 1
        return sharedmem_backend, thread_config

    if force_processes:
        # prefer="processes" is not satisfied by a thread-based default:
        # fall back to the default process-based backend.
        return (
            BACKENDS[DEFAULT_PROCESS_BACKEND](nesting_level=nesting_level),
            active_config.copy(),
        )

    return backend, active_config

221 

222 

class parallel_config:
    """Set the default backend or configuration for :class:`~joblib.Parallel`.

    This is an alternative to directly passing keyword arguments to the
    :class:`~joblib.Parallel` class constructor. It is particularly useful when
    calling into library code that uses joblib internally but does not expose
    the various parallel configuration arguments in its own API.

    Parameters
    ----------
    backend: str or ParallelBackendBase instance, default=None
        If ``backend`` is a string it must match a previously registered
        implementation using the :func:`~register_parallel_backend` function.

        By default the following backends are available:

        - 'loky': single-host, process-based parallelism (used by default),
        - 'threading': single-host, thread-based parallelism,
        - 'multiprocessing': legacy single-host, process-based parallelism.

        'loky' is recommended to run functions that manipulate Python objects.
        'threading' is a low-overhead alternative that is most efficient for
        functions that release the Global Interpreter Lock: e.g. I/O-bound
        code or CPU-bound code in a few calls to native code that explicitly
        releases the GIL. Note that on some rare systems (such as pyodide),
        multiprocessing and loky may not be available, in which case joblib
        defaults to threading.

        In addition, if the ``dask`` and ``distributed`` Python packages are
        installed, it is possible to use the 'dask' backend for better
        scheduling of nested parallel calls without over-subscription and
        potentially distribute parallel calls over a networked cluster of
        several hosts.

        It is also possible to use the distributed 'ray' backend for
        distributing the workload to a cluster of nodes. See more details
        in the Examples section below.

        Alternatively the backend can be passed directly as an instance.

    n_jobs: int, default=None
        The maximum number of concurrently running jobs, such as the number
        of Python worker processes when ``backend="loky"`` or the size of the
        thread-pool when ``backend="threading"``.
        This argument is converted to an integer, rounded below for float.
        If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
        ``n_cpus`` is obtained with :func:`~cpu_count`.
        For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
        using ``n_jobs=-2`` will result in all CPUs but one being used.
        This argument can also go above ``n_cpus``, which will cause
        oversubscription. In some cases, slight oversubscription can be
        beneficial, e.g., for tasks with large I/O operations.
        If 1 is given, no parallel computing code is used at all, and the
        behavior amounts to a simple python `for` loop. This mode is not
        compatible with `timeout`.
        None is a marker for 'unset' that will be interpreted as n_jobs=1
        unless the call is performed under a :func:`~parallel_config`
        context manager that sets another value for ``n_jobs``.
        If n_jobs = 0 then a ValueError is raised.

    verbose: int, default=0
        The verbosity level: if non zero, progress messages are
        printed. Above 50, the output is sent to stdout.
        The frequency of the messages increases with the verbosity level.
        If it is more than 10, all iterations are reported.

    temp_folder: str or None, default=None
        Folder to be used by the pool for memmapping large arrays
        for sharing memory with worker processes. If None, this will try in
        order:

        - a folder pointed by the ``JOBLIB_TEMP_FOLDER`` environment
          variable,
        - ``/dev/shm`` if the folder exists and is writable: this is a
          RAM disk filesystem available by default on modern Linux
          distributions,
        - the default system temporary folder that can be
          overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment
          variables, typically ``/tmp`` under Unix operating systems.

    max_nbytes: int, str, or None, optional, default='1M'
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder. Can be an int
        in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
        Use None to disable memmapping of large arrays.

    mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
        Memmapping mode for numpy arrays passed to workers. None will
        disable memmapping, other modes defined in the numpy.memmap doc:
        https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
        Also, see 'max_nbytes' parameter documentation for more details.

    prefer: str in {'processes', 'threads'} or None, default=None
        Soft hint to choose the default backend.
        The default process-based backend is 'loky' and the default
        thread-based backend is 'threading'. Ignored if the ``backend``
        parameter is specified.

    require: 'sharedmem' or None, default=None
        Hard constraint to select the backend. If set to 'sharedmem',
        the selected backend will be single-host and thread-based.

    inner_max_num_threads: int, default=None
        If not None, overwrites the limit set on the number of threads
        usable in some third-party library threadpools like OpenBLAS,
        MKL or OpenMP. This is only used with the ``loky`` backend.

    backend_params: dict
        Additional parameters to pass to the backend constructor when
        backend is a string.

    Notes
    -----
    Joblib tries to limit the oversubscription by limiting the number of
    threads usable in some third-party library threadpools like OpenBLAS, MKL
    or OpenMP. The default limit in each worker is set to
    ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
    overwritten with the ``inner_max_num_threads`` argument which will be used
    to set this limit in the child processes.

    .. versionadded:: 1.3

    Examples
    --------
    >>> from operator import neg
    >>> with parallel_config(backend='threading'):
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    ...
    [-1, -2, -3, -4, -5]

    To use the 'ray' joblib backend add the following lines:

    >>> from ray.util.joblib import register_ray  # doctest: +SKIP
    >>> register_ray()  # doctest: +SKIP
    >>> with parallel_config(backend="ray"):  # doctest: +SKIP
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    [-1, -2, -3, -4, -5]

    """

    def __init__(
        self,
        backend=default_parallel_config["backend"],
        *,
        n_jobs=default_parallel_config["n_jobs"],
        verbose=default_parallel_config["verbose"],
        temp_folder=default_parallel_config["temp_folder"],
        max_nbytes=default_parallel_config["max_nbytes"],
        mmap_mode=default_parallel_config["mmap_mode"],
        prefer=default_parallel_config["prefer"],
        require=default_parallel_config["require"],
        inner_max_num_threads=None,
        **backend_params,
    ):
        # Save the parallel info and set the active parallel config.
        # Note that the new configuration is installed here, at construction
        # time, and not in ``__enter__``.
        self.old_parallel_config = getattr(_backend, "config", default_parallel_config)

        backend = self._check_backend(backend, inner_max_num_threads, **backend_params)

        new_config = {
            "n_jobs": n_jobs,
            "verbose": verbose,
            "temp_folder": temp_folder,
            "max_nbytes": max_nbytes,
            "mmap_mode": mmap_mode,
            "prefer": prefer,
            "require": require,
            "backend": backend,
        }
        # Start from the previous configuration and only override the keys
        # that were explicitly provided (i.e. that are not sentinels).
        self.parallel_config = self.old_parallel_config.copy()
        self.parallel_config.update(
            {k: v for k, v in new_config.items() if not isinstance(v, _Sentinel)}
        )

        setattr(_backend, "config", self.parallel_config)

    def _check_backend(self, backend, inner_max_num_threads, **backend_params):
        """Validate ``backend`` and return the object to store in the config.

        Returns the sentinel unchanged when no backend was requested,
        otherwise a backend instance (built from ``BACKENDS`` when ``backend``
        is a string) with ``nesting_level`` and, when requested,
        ``inner_max_num_threads`` set.

        Raises
        ------
        ValueError
            If extra parameters are given without a backend, if
            ``backend_params`` are given with a backend instance, or if the
            backend name is unknown.
        AssertionError
            If ``inner_max_num_threads`` is given for a backend that does not
            support it.
        """
        if backend is default_parallel_config["backend"]:
            if inner_max_num_threads is not None or len(backend_params) > 0:
                raise ValueError(
                    "inner_max_num_threads and other constructor "
                    "parameters backend_params are only supported "
                    "when backend is not None."
                )
            return backend

        if isinstance(backend, str):
            # Handle non-registered or missing backends
            if backend not in BACKENDS:
                if backend in EXTERNAL_BACKENDS:
                    register = EXTERNAL_BACKENDS[backend]
                    register()
                elif backend in MAYBE_AVAILABLE_BACKENDS:
                    warnings.warn(
                        f"joblib backend '{backend}' is not available on "
                        f"your system, falling back to {DEFAULT_BACKEND}.",
                        UserWarning,
                        stacklevel=2,
                    )
                    # Alias the unavailable name to the default backend class
                    # so that later lookups succeed.
                    BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
                else:
                    raise ValueError(
                        f"Invalid backend: {backend}, expected one of "
                        f"{sorted(BACKENDS.keys())}"
                    )

            backend = BACKENDS[backend](**backend_params)
        else:
            if len(backend_params) > 0:
                raise ValueError(
                    "Constructor parameters backend_params are only "
                    "supported when backend is a string."
                )

        if inner_max_num_threads is not None:
            # Raised explicitly rather than through an ``assert`` statement so
            # that the check is not stripped when Python runs with ``-O``; the
            # exception type is kept for backward compatibility.
            if not backend.supports_inner_max_num_threads:
                raise AssertionError(
                    f"{backend.__class__.__name__} does not accept setting the "
                    "inner_max_num_threads argument."
                )
            backend.inner_max_num_threads = inner_max_num_threads

        # If the nesting_level of the backend is not set previously, use the
        # nesting level from the previous active_backend to set it
        if backend.nesting_level is None:
            parent_backend = self.old_parallel_config["backend"]
            if parent_backend is default_parallel_config["backend"]:
                nesting_level = 0
            else:
                nesting_level = parent_backend.nesting_level
            backend.nesting_level = nesting_level

        return backend

    def __enter__(self):
        """Return the configuration dict installed by the constructor."""
        return self.parallel_config

    def __exit__(self, type, value, traceback):
        """Restore the configuration that was active before construction."""
        self.unregister()

    def unregister(self):
        """Reinstall the previously active configuration for this thread."""
        setattr(_backend, "config", self.old_parallel_config)

465 

466 

class parallel_backend(parallel_config):
    """Change the default backend used by Parallel inside a with block.

    .. warning::
        It is advised to use the :class:`~joblib.parallel_config` context
        manager instead, which allows more fine-grained control over the
        backend configuration.

    If ``backend`` is a string it must match a previously registered
    implementation using the :func:`~register_parallel_backend` function.

    By default the following backends are available:

    - 'loky': single-host, process-based parallelism (used by default),
    - 'threading': single-host, thread-based parallelism,
    - 'multiprocessing': legacy single-host, process-based parallelism.

    'loky' is recommended to run functions that manipulate Python objects.
    'threading' is a low-overhead alternative that is most efficient for
    functions that release the Global Interpreter Lock: e.g. I/O-bound code or
    CPU-bound code in a few calls to native code that explicitly releases the
    GIL. Note that on some rare systems (such as Pyodide),
    multiprocessing and loky may not be available, in which case joblib
    defaults to threading.

    You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib
    backend to distribute work across machines. This works well with
    scikit-learn estimators with the ``n_jobs`` parameter, for example::

    >>> import joblib  # doctest: +SKIP
    >>> from sklearn.model_selection import GridSearchCV  # doctest: +SKIP
    >>> from dask.distributed import Client, LocalCluster # doctest: +SKIP

    >>> # create a local Dask cluster
    >>> cluster = LocalCluster()  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1)
    ... # doctest: +SKIP
    >>> with joblib.parallel_backend("dask", scatter=[X, y]):  # doctest: +SKIP
    ...     grid_search.fit(X, y)

    It is also possible to use the distributed 'ray' backend for distributing
    the workload to a cluster of nodes. To use the 'ray' joblib backend add
    the following lines::

    >>> from ray.util.joblib import register_ray  # doctest: +SKIP
    >>> register_ray()  # doctest: +SKIP
    >>> with parallel_backend("ray"):  # doctest: +SKIP
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    [-1, -2, -3, -4, -5]

    Alternatively the backend can be passed directly as an instance.

    By default all available workers will be used (``n_jobs=-1``) unless the
    caller passes an explicit value for the ``n_jobs`` parameter.

    This is an alternative to passing a ``backend='backend_name'`` argument to
    the :class:`~Parallel` class constructor. It is particularly useful when
    calling into library code that uses joblib internally but does not expose
    the backend argument in its own API.

    >>> from operator import neg
    >>> with parallel_backend('threading'):
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    ...
    [-1, -2, -3, -4, -5]

    Joblib also tries to limit the oversubscription by limiting the number of
    threads usable in some third-party library threadpools like OpenBLAS, MKL
    or OpenMP. The default limit in each worker is set to
    ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
    overwritten with the ``inner_max_num_threads`` argument which will be used
    to set this limit in the child processes.

    .. versionadded:: 0.10

    See Also
    --------
    joblib.parallel_config: context manager to change the backend configuration.
    """

    def __init__(
        self, backend, n_jobs=-1, inner_max_num_threads=None, **backend_params
    ):
        super().__init__(
            backend=backend,
            n_jobs=n_jobs,
            inner_max_num_threads=inner_max_num_threads,
            **backend_params,
        )

        # Expose the previous and the new configuration as
        # ``(backend, n_jobs)`` pairs.
        previous_config = self.old_parallel_config
        if previous_config is not None:
            self.old_backend_and_jobs = (
                previous_config["backend"],
                previous_config["n_jobs"],
            )
        else:
            self.old_backend_and_jobs = None

        current_config = self.parallel_config
        self.new_backend_and_jobs = (
            current_config["backend"],
            current_config["n_jobs"],
        )

    def __enter__(self):
        """Return the ``(backend, n_jobs)`` pair that is now active."""
        return self.new_backend_and_jobs

572 

573 

# Under Linux or OS X the default start method of multiprocessing
# can cause third party libraries to crash. Under Python 3.4+ it is possible
# to set an environment variable to switch the default start method from
# 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost
# of causing semantic changes and some additional pool instantiation overhead.
DEFAULT_MP_CONTEXT = None
# ``hasattr`` is also False when ``mp`` is None, so nothing is looked up when
# multiprocessing is unavailable.
if hasattr(mp, "get_context"):
    # An unset, empty or whitespace-only JOBLIB_START_METHOD leaves the
    # context at None.
    method = os.environ.get("JOBLIB_START_METHOD", "").strip() or None
    if method is not None:
        DEFAULT_MP_CONTEXT = mp.get_context(method=method)

584 

585 

class BatchedCalls:
    """Bundle several ``(func, args, kwargs)`` tuples into a single callable.

    Calling the instance runs every wrapped call in order, under a
    ``parallel_config`` context set to the stored backend and number of jobs,
    and returns the list of results.
    """

    def __init__(
        self, iterator_slice, backend_and_jobs, reducer_callback=None, pickle_cache=None
    ):
        self.items = list(iterator_slice)
        self._size = len(self.items)
        self._reducer_callback = reducer_callback

        # Backward compatibility: before 0.12.6, nested backends were given
        # without any n_jobs indication, i.e. not wrapped in a tuple.
        if not isinstance(backend_and_jobs, tuple):
            backend_and_jobs = (backend_and_jobs, None)
        self._backend, self._n_jobs = backend_and_jobs

        if pickle_cache is None:
            pickle_cache = {}
        self._pickle_cache = pickle_cache

    def __call__(self):
        # Make self._backend the default nested backend, without switching
        # the default number of processes to -1.
        with parallel_config(backend=self._backend, n_jobs=self._n_jobs):
            results = []
            for func, args, kwargs in self.items:
                results.append(func(*args, **kwargs))
            return results

    def __reduce__(self):
        callback = self._reducer_callback
        if callback is not None:
            callback()
        # The callback itself is not pickled: None is passed in its place.
        rebuild_args = (
            self.items,
            (self._backend, self._n_jobs),
            None,
            self._pickle_cache,
        )
        return BatchedCalls, rebuild_args

    def __len__(self):
        return self._size

620 

621 

# Possible exit status for a task. A task whose status is tracked starts as
# TASK_PENDING and ends up as either TASK_DONE or TASK_ERROR.
TASK_DONE = "Done"
TASK_ERROR = "Error"
TASK_PENDING = "Pending"

626 

627 

628############################################################################### 

629# CPU count that works also when multiprocessing has been disabled via 

630# the JOBLIB_MULTIPROCESSING environment variable 

def cpu_count(only_physical_cores=False):
    """Return the number of CPUs.

    The work is delegated to loky.cpu_count, which takes into account
    additional constraints such as Linux CFS scheduler quotas (typically set
    by container runtimes such as docker) and CPU affinity (for instance
    using the taskset command on Linux).

    Parameters
    ----------
    only_physical_cores : boolean, default=False
        If True, does not take hyperthreading / SMT logical cores into account.

    Returns
    -------
    int
        1 when multiprocessing is not available (``mp`` is None), otherwise
        whatever loky.cpu_count reports.
    """
    if mp is not None:
        return loky.cpu_count(only_physical_cores=only_physical_cores)

    # Without multiprocessing, report a single CPU.
    return 1

649 

650 

651############################################################################### 

652# For verbosity 

653 

654 

655def _verbosity_filter(index, verbose): 

656 """Returns False for indices increasingly apart, the distance 

657 depending on the value of verbose. 

658 

659 We use a lag increasing as the square of index 

660 """ 

661 if not verbose: 

662 return True 

663 elif verbose > 10: 

664 return False 

665 if index == 0: 

666 return False 

667 verbose = 0.5 * (11 - verbose) ** 2 

668 scale = sqrt(index / verbose) 

669 next_scale = sqrt((index + 1) / verbose) 

670 return int(next_scale) == int(scale) 

671 

672 

673############################################################################### 

def delayed(function):
    """Decorator used to capture the arguments of a function.

    The returned callable does not execute ``function``: it returns the
    tuple ``(function, args, kwargs)`` describing the call.
    """

    def delayed_function(*args, **kwargs):
        return function, args, kwargs

    try:
        delayed_function = functools.wraps(function)(delayed_function)
    except AttributeError:
        # functools.wraps fails on some callable objects; in that case the
        # unwrapped helper is returned as is.
        pass
    return delayed_function

685 

686 

687############################################################################### 

688class BatchCompletionCallBack(object): 

689 """Callback to keep track of completed results and schedule the next tasks. 

690 

691 This callable is executed by the parent process whenever a worker process 

692 has completed a batch of tasks. 

693 

694 It is used for progress reporting, to update estimate of the batch 

695 processing duration and to schedule the next batch of tasks to be 

696 processed. 

697 

698 It is assumed that this callback will always be triggered by the backend 

699 right after the end of a task, in case of success as well as in case of 

700 failure. 

701 """ 

702 

703 ########################################################################## 

704 # METHODS CALLED BY THE MAIN THREAD # 

705 ########################################################################## 

706 def __init__(self, dispatch_timestamp, batch_size, parallel): 

707 self.dispatch_timestamp = dispatch_timestamp 

708 self.batch_size = batch_size 

709 self.parallel = parallel 

710 self.parallel_call_id = parallel._call_id 

711 self._completion_timeout_counter = None 

712 

713 # Internals to keep track of the status and outcome of the task. 

714 

715 # Used to hold a reference to the future-like object returned by the 

716 # backend after launching this task 

717 # This will be set later when calling `register_job`, as it is only 

718 # created once the task has been submitted. 

719 self.job = None 

720 

721 if not parallel._backend.supports_retrieve_callback: 

722 # The status is only used for asynchronous result retrieval in the 

723 # callback. 

724 self.status = None 

725 else: 

726 # The initial status for the job is TASK_PENDING. 

727 # Once it is done, it will be either TASK_DONE, or TASK_ERROR. 

728 self.status = TASK_PENDING 

729 

730 def register_job(self, job): 

731 """Register the object returned by `submit`.""" 

732 self.job = job 

733 

734 def get_result(self, timeout): 

735 """Returns the raw result of the task that was submitted. 

736 

737 If the task raised an exception rather than returning, this same 

738 exception will be raised instead. 

739 

740 If the backend supports the retrieval callback, it is assumed that this 

741 method is only called after the result has been registered. It is 

742 ensured by checking that `self.status(timeout)` does not return 

743 TASK_PENDING. In this case, `get_result` directly returns the 

744 registered result (or raise the registered exception). 

745 

746 For other backends, there are no such assumptions, but `get_result` 

747 still needs to synchronously retrieve the result before it can 

748 return it or raise. It will block at most `self.timeout` seconds 

749 waiting for retrieval to complete, after that it raises a TimeoutError. 

750 """ 

751 

752 backend = self.parallel._backend 

753 

754 if backend.supports_retrieve_callback: 

755 # We assume that the result has already been retrieved by the 

756 # callback thread, and is stored internally. It's just waiting to 

757 # be returned. 

758 return self._return_or_raise() 

759 

760 # For other backends, the main thread needs to run the retrieval step. 

761 try: 

762 result = backend.retrieve_result(self.job, timeout=timeout) 

763 outcome = dict(result=result, status=TASK_DONE) 

764 except BaseException as e: 

765 outcome = dict(result=e, status=TASK_ERROR) 

766 self._register_outcome(outcome) 

767 

768 return self._return_or_raise() 

769 

770 def _return_or_raise(self): 

771 try: 

772 if self.status == TASK_ERROR: 

773 raise self._result 

774 return self._result 

775 finally: 

776 del self._result 

777 

778 def get_status(self, timeout): 

779 """Get the status of the task. 

780 

781 This function also checks if the timeout has been reached and register 

782 the TimeoutError outcome when it is the case. 

783 """ 

784 if timeout is None or self.status != TASK_PENDING: 

785 return self.status 

786 

787 # The computation are running and the status is pending. 

788 # Check that we did not wait for this jobs more than `timeout`. 

789 now = time.time() 

790 if self._completion_timeout_counter is None: 

791 self._completion_timeout_counter = now 

792 

793 if (now - self._completion_timeout_counter) > timeout: 

794 outcome = dict(result=TimeoutError(), status=TASK_ERROR) 

795 self._register_outcome(outcome) 

796 

797 return self.status 

798 

799 ########################################################################## 

800 # METHODS CALLED BY CALLBACK THREADS # 

801 ########################################################################## 

def __call__(self, *args, **kwargs):
    """Callback run by the callback thread once a job is completed."""
    parallel = self.parallel

    if parallel._backend.supports_retrieve_callback:
        # The backend can retrieve the result in the callback: register the
        # task outcome (TASK_ERROR or TASK_DONE) and schedule the next
        # batch if needed.
        with parallel._lock:
            # A mismatching call id covers the edge case where the
            # `parallel` instance was reset and a new call was issued while
            # this task was processing, but the worker still managed to
            # trigger this callback just before being aborted.
            stale_call = parallel._call_id != self.parallel_call_id

            # When aborting, stop as fast as possible: the result would not
            # be returned by the Parallel call anyway.
            if stale_call or parallel._aborting:
                return

            # Retrieve the result of the task and dispatch a new batch
            # only if the job succeeded.
            if self._retrieve_result(*args, **kwargs):
                self._dispatch_new()
        return

    # Without callback retrieval, the next batch of tasks is dispatched
    # regardless. The result will be retrieved by the main thread when
    # calling `get_result`.
    self._dispatch_new()

834 

def _dispatch_new(self):
    """Account for this completed batch and schedule the next one."""
    parallel = self.parallel

    # Report the batch duration to the backend: this step ensures that
    # auto-batching works as expected.
    elapsed = time.time() - self.dispatch_timestamp
    parallel._backend.batch_completed(self.batch_size, elapsed)

    with parallel._lock:
        parallel.n_completed_tasks += self.batch_size
        parallel.print_progress()
        if parallel._original_iterator is None:
            # No iterator is set on the `parallel` instance: nothing
            # further to dispatch from here.
            return
        parallel.dispatch_next()

848 

def _retrieve_result(self, out):
    """Collect the outcome of a finished task and register it.

    Returns True when the task succeeded and False when retrieving its
    result raised. This function is only called by backends that support
    retrieving the task result in the callback thread.
    """
    try:
        value = self.parallel._backend.retrieve_result_callback(out)
        outcome = {"status": TASK_DONE, "result": value}
    except BaseException as exc:
        # Clear the traceback to avoid keeping references to parallel in
        # the error.
        exc.__traceback__ = None
        outcome = {"result": exc, "status": TASK_ERROR}

    self._register_outcome(outcome)
    failed = outcome["status"] == TASK_ERROR
    return not failed

866 

867 ########################################################################## 

868 # This method can be called either in the main thread # 

869 # or in the callback thread. # 

870 ########################################################################## 

def _register_outcome(self, outcome):
    """Store the outcome of the task on this tracker.

    Only the first call has an effect: once the status is neither
    ``TASK_PENDING`` nor ``None``, later calls return without changing
    anything.
    """
    parallel = self.parallel

    # The lock covers the edge case where the main thread tries to register
    # a `TimeoutError` while the callback thread tries to register a result
    # at the same time.
    with parallel._lock:
        if self.status not in (TASK_PENDING, None):
            return
        self.status = outcome["status"]

    self._result = outcome["result"]

    # The result and the status are extracted: the last reference to the
    # job can be dropped.
    self.job = None

    # As soon as an error has been spotted, the early stopping flags are
    # raised on the `parallel` instance.
    if self.status == TASK_ERROR:
        parallel._exception = parallel._aborting = True

    if not parallel.return_ordered:
        # For `return_as=generator_unordered`, the job is appended to the
        # queue in the order of completion instead of submission.
        with parallel._lock:
            parallel._jobs.append(self)

903 

904 

905############################################################################### 

def register_parallel_backend(name, factory, make_default=False):
    """Register a new Parallel backend factory under ``name``.

    Once registered, the backend can be selected by passing ``name`` as the
    backend argument to the :class:`~Parallel` class. With
    ``make_default=True`` the default backend is also overwritten globally.

    The factory can be any callable that takes no argument and return an
    instance of ``ParallelBackendBase``.

    Warning: this function is experimental and subject to change in a future
    version of joblib.

    .. versionadded:: 0.10
    """
    global DEFAULT_BACKEND

    BACKENDS[name] = factory
    if not make_default:
        return
    DEFAULT_BACKEND = name

925 

926 

def effective_n_jobs(n_jobs=-1):
    """Determine the number of jobs that can actually run in parallel

    n_jobs is the number of workers requested by the callers. Passing n_jobs=-1
    means requesting all available workers for instance matching the number of
    CPU cores on the worker host(s).

    This method should return a guesstimate of the number of workers that can
    actually perform work concurrently with the currently enabled default
    backend. The primary use case is to make it possible for the caller to know
    in how many chunks to slice the work.

    In general working on larger data chunks is more efficient (less scheduling
    overhead and better use of CPU cache prefetching heuristics) as long as all
    the workers have enough work to do.

    Warning: this function is experimental and subject to change in a future
    version of joblib.

    .. versionadded:: 0.10
    """
    # A single job never needs to consult the active backend.
    if n_jobs == 1:
        return 1

    backend, backend_n_jobs = get_active_backend()
    # n_jobs=None falls back to the value attached to the active backend.
    requested = backend_n_jobs if n_jobs is None else n_jobs
    return backend.effective_n_jobs(n_jobs=requested)

955 

956 

957############################################################################### 

958class Parallel(Logger): 

959 """Helper class for readable parallel mapping. 

960 

961 Read more in the :ref:`User Guide <parallel>`. 

962 

963 Parameters 

964 ---------- 

965 n_jobs: int, default=None 

966 The maximum number of concurrently running jobs, such as the number 

967 of Python worker processes when ``backend="loky"`` or the size of 

968 the thread-pool when ``backend="threading"``. 

969 This argument is converted to an integer, rounded below for float. 

970 If -1 is given, `joblib` tries to use all CPUs. The number of CPUs 

971 ``n_cpus`` is obtained with :func:`~cpu_count`. 

972 For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance, 

973 using ``n_jobs=-2`` will result in all CPUs but one being used. 

974 This argument can also go above ``n_cpus``, which will cause 

975 oversubscription. In some cases, slight oversubscription can be 

976 beneficial, e.g., for tasks with large I/O operations. 

977 If 1 is given, no parallel computing code is used at all, and the 

978 behavior amounts to a simple python `for` loop. This mode is not 

979 compatible with ``timeout``. 

980 None is a marker for 'unset' that will be interpreted as n_jobs=1 

981 unless the call is performed under a :func:`~parallel_config` 

982 context manager that sets another value for ``n_jobs``. 

983 If n_jobs = 0 then a ValueError is raised. 

984 backend: str, ParallelBackendBase instance or None, default='loky' 

985 Specify the parallelization backend implementation. 

986 Supported backends are: 

987 

988 - "loky" used by default, can induce some 

989 communication and memory overhead when exchanging input and 

990 output data with the worker Python processes. On some rare 

991 systems (such as Pyodide), the loky backend may not be

992 available. 

993 - "multiprocessing" previous process-based backend based on 

994 `multiprocessing.Pool`. Less robust than `loky`. 

995 - "threading" is a very low-overhead backend but it suffers 

996 from the Python Global Interpreter Lock if the called function 

997 relies a lot on Python objects. "threading" is mostly useful 

998 when the execution bottleneck is a compiled extension that 

999 explicitly releases the GIL (for instance a Cython loop wrapped 

1000 in a "with nogil" block or an expensive call to a library such 

1001 as NumPy). 

1002 - finally, you can register backends by calling 

1003 :func:`~register_parallel_backend`. This will allow you to 

1004 implement a backend of your liking. 

1005 

1006 It is not recommended to hard-code the backend name in a call to 

1007 :class:`~Parallel` in a library. Instead it is recommended to set 

1008 soft hints (prefer) or hard constraints (require) so as to make it 

1009 possible for library users to change the backend from the outside 

1010 using the :func:`~parallel_config` context manager. 

1011 return_as: str in {'list', 'generator', 'generator_unordered'}, default='list' 

1012 If 'list', calls to this instance will return a list, only when 

1013 all results have been processed and retrieved. 

1014 If 'generator', it will return a generator that yields the results 

1015 as soon as they are available, in the order the tasks have been 

1016 submitted with. 

1017 If 'generator_unordered', the generator will immediately yield 

1018 available results independently of the submission order. The output 

1019 order is not deterministic in this case because it depends on the 

1020 concurrency of the workers. 

1021 prefer: str in {'processes', 'threads'} or None, default=None 

1022 Soft hint to choose the default backend if no specific backend 

1023 was selected with the :func:`~parallel_config` context manager. 

1024 The default process-based backend is 'loky' and the default 

1025 thread-based backend is 'threading'. Ignored if the ``backend`` 

1026 parameter is specified. 

1027 require: 'sharedmem' or None, default=None 

1028 Hard constraint to select the backend. If set to 'sharedmem', 

1029 the selected backend will be single-host and thread-based even 

1030 if the user asked for a non-thread based backend with 

1031 :func:`~joblib.parallel_config`. 

1032 verbose: int, default=0 

1033 The verbosity level: if non zero, progress messages are 

1034 printed. Above 50, the output is sent to stdout. 

1035 The frequency of the messages increases with the verbosity level. 

1036 If it is more than 10, all iterations are reported.

1037 timeout: float or None, default=None 

1038 Timeout limit for each task to complete. If any task takes longer 

1039 a TimeoutError will be raised. Only applied when n_jobs != 1.

1040 pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}, default='2*n_jobs' 

1041 The number of batches (of tasks) to be pre-dispatched. 

1042 Default is '2*n_jobs'. When batch_size="auto" this is a reasonable

1043 default and the workers should never starve. Note that only basic 

1044 arithmetic is allowed here and no modules can be used in this

1045 expression. 

1046 batch_size: int or 'auto', default='auto' 

1047 The number of atomic tasks to dispatch at once to each 

1048 worker. When individual evaluations are very fast, dispatching 

1049 calls to workers can be slower than sequential computation because 

1050 of the overhead. Batching fast computations together can mitigate 

1051 this. 

1052 The ``'auto'`` strategy keeps track of the time it takes for a 

1053 batch to complete, and dynamically adjusts the batch size to keep 

1054 the time on the order of half a second, using a heuristic. The 

1055 initial batch size is 1. 

1056 ``batch_size="auto"`` with ``backend="threading"`` will dispatch 

1057 batches of a single task at a time as the threading backend has 

1058 very little overhead and using larger batch size has not proved to 

1059 bring any gain in that case. 

1060 temp_folder: str or None, default=None 

1061 Folder to be used by the pool for memmapping large arrays 

1062 for sharing memory with worker processes. If None, this will try in 

1063 order: 

1064 

1065 - a folder pointed by the JOBLIB_TEMP_FOLDER environment 

1066 variable, 

1067 - /dev/shm if the folder exists and is writable: this is a 

1068 RAM disk filesystem available by default on modern Linux 

1069 distributions, 

1070 - the default system temporary folder that can be 

1071 overridden with TMP, TMPDIR or TEMP environment 

1072 variables, typically /tmp under Unix operating systems. 

1073 

1074 Only active when ``backend="loky"`` or ``"multiprocessing"``. 

1075 max_nbytes: int, str, or None, optional, default='1M'

1076 Threshold on the size of arrays passed to the workers that 

1077 triggers automated memory mapping in temp_folder. Can be an int 

1078 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte. 

1079 Use None to disable memmapping of large arrays. 

1080 Only active when ``backend="loky"`` or ``"multiprocessing"``. 

1081 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r' 

1082 Memmapping mode for numpy arrays passed to workers. None will 

1083 disable memmapping, other modes defined in the numpy.memmap doc: 

1084 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html 

1085 Also, see 'max_nbytes' parameter documentation for more details. 

1086 backend_kwargs: dict, optional 

1087 Additional parameters to pass to the backend `configure` method. 

1088 

1089 Notes 

1090 ----- 

1091 

1092 This object uses workers to compute in parallel the application of a 

1093 function to many different arguments. The main functionality it brings 

1094 in addition to using the raw multiprocessing or concurrent.futures API 

1095 are (see examples for details): 

1096 

1097 * More readable code, in particular since it avoids 

1098 constructing list of arguments. 

1099 

1100 * Easier debugging: 

1101 - informative tracebacks even when the error happens on 

1102 the client side 

1103 - using 'n_jobs=1' enables to turn off parallel computing 

1104 for debugging without changing the codepath 

1105 - early capture of pickling errors 

1106 

1107 * An optional progress meter. 

1108 

1109 * Interruption of multiprocesses jobs with 'Ctrl-C' 

1110 

1111 * Flexible pickling control for the communication to and from 

1112 the worker processes. 

1113 

1114 * Ability to use shared memory efficiently with worker 

1115 processes for large numpy-based datastructures. 

1116 

1117 Note that the intended usage is to run one call at a time. Multiple 

1118 calls to the same Parallel object will result in a ``RuntimeError`` 

1119 

1120 Examples 

1121 -------- 

1122 

1123 A simple example: 

1124 

1125 >>> from math import sqrt 

1126 >>> from joblib import Parallel, delayed 

1127 >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) 

1128 [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] 

1129 

1130 Reshaping the output when the function has several return 

1131 values: 

1132 

1133 >>> from math import modf 

1134 >>> from joblib import Parallel, delayed 

1135 >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10)) 

1136 >>> res, i = zip(*r) 

1137 >>> res 

1138 (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5) 

1139 >>> i 

1140 (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0) 

1141 

1142 The progress meter: the higher the value of `verbose`, the more 

1143 messages: 

1144 

1145 >>> from time import sleep 

1146 >>> from joblib import Parallel, delayed 

1147 >>> r = Parallel(n_jobs=2, verbose=10)( 

1148 ... delayed(sleep)(.2) for _ in range(10)) #doctest: +SKIP 

1149 [Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.6s 

1150 [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.8s 

1151 [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 1.4s finished 

1152 

1153 Traceback example, note how the line of the error is indicated 

1154 as well as the values of the parameter passed to the function that 

1155 triggered the exception, even though the traceback happens in the 

1156 child process: 

1157 

1158 >>> from heapq import nlargest 

1159 >>> from joblib import Parallel, delayed 

1160 >>> Parallel(n_jobs=2)( 

1161 ... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3)) 

1162 ... # doctest: +SKIP 

1163 ----------------------------------------------------------------------- 

1164 Sub-process traceback: 

1165 ----------------------------------------------------------------------- 

1166 TypeError Mon Nov 12 11:37:46 2012 

1167 PID: 12934 Python 2.7.3: /usr/bin/python 

1168 ........................................................................ 

1169 /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None) 

1170 419 if n >= size: 

1171 420 return sorted(iterable, key=key, reverse=True)[:n] 

1172 421 

1173 422 # When key is none, use simpler decoration 

1174 423 if key is None: 

1175 --> 424 it = izip(iterable, count(0,-1)) # decorate 

1176 425 result = _nlargest(n, it) 

1177 426 return map(itemgetter(0), result) # undecorate 

1178 427 

1179 428 # General case, slowest method 

1180 TypeError: izip argument #1 must support iteration 

1181 _______________________________________________________________________ 

1182 

1183 

1184 Using pre_dispatch in a producer/consumer situation, where the 

1185 data is generated on the fly. Note how the producer is first 

1186 called 3 times before the parallel loop is initiated, and then 

1187 called to generate new data on the fly: 

1188 

1189 >>> from math import sqrt 

1190 >>> from joblib import Parallel, delayed 

1191 >>> def producer(): 

1192 ... for i in range(6): 

1193 ... print('Produced %s' % i) 

1194 ... yield i 

1195 >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')( 

1196 ... delayed(sqrt)(i) for i in producer()) #doctest: +SKIP 

1197 Produced 0 

1198 Produced 1 

1199 Produced 2 

1200 [Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s 

1201 Produced 3 

1202 [Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s 

1203 Produced 4 

1204 [Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s 

1205 Produced 5 

1206 [Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s 

1207 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s remaining: 0.0s 

1208 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished 

1209 

1210 """ # noqa: E501 

1211 

def __init__(
    self,
    n_jobs=default_parallel_config["n_jobs"],
    backend=default_parallel_config["backend"],
    return_as="list",
    verbose=default_parallel_config["verbose"],
    timeout=None,
    pre_dispatch="2 * n_jobs",
    batch_size="auto",
    temp_folder=default_parallel_config["temp_folder"],
    max_nbytes=default_parallel_config["max_nbytes"],
    mmap_mode=default_parallel_config["mmap_mode"],
    prefer=default_parallel_config["prefer"],
    require=default_parallel_config["require"],
    **backend_kwargs,
):
    """Validate the arguments and resolve the backend to use.

    The parameters are described in the class docstring.
    """
    # Set up the state of the parent Logger class.
    super().__init__()

    # n_jobs=None is interpreted as 'unset'.
    if n_jobs is None:
        n_jobs = default_parallel_config["n_jobs"]

    ctx_backend, ctx_config = _get_active_backend(
        prefer=prefer, require=require, verbose=verbose
    )
    nesting_level = ctx_backend.nesting_level

    self.verbose = _get_config_param(verbose, ctx_config, "verbose")
    self.timeout = timeout
    self.pre_dispatch = pre_dispatch

    if return_as not in {"list", "generator", "generator_unordered"}:
        raise ValueError(
            'Expected `return_as` parameter to be a string equal to "list"'
            f',"generator" or "generator_unordered", but got {return_as} '
            "instead."
        )
    self.return_as = return_as
    self.return_generator = return_as != "list"
    self.return_ordered = return_as != "generator_unordered"

    # Check if we are under a parallel_config or parallel_backend context
    # manager and use the config from the context manager for arguments
    # that are not explicitly set.
    options = dict(backend_kwargs)
    for key, value in (
        ("max_nbytes", max_nbytes),
        ("temp_folder", temp_folder),
        ("mmap_mode", mmap_mode),
        ("prefer", prefer),
        ("require", require),
        ("verbose", verbose),
    ):
        options[key] = _get_config_param(value, ctx_config, key)
    self._backend_kwargs = options

    # A string value for max_nbytes is converted with memstr_to_bytes.
    if isinstance(options["max_nbytes"], str):
        options["max_nbytes"] = memstr_to_bytes(options["max_nbytes"])
    options["verbose"] = max(0, options["verbose"] - 50)

    if DEFAULT_MP_CONTEXT is not None:
        options["context"] = DEFAULT_MP_CONTEXT
    elif hasattr(mp, "get_context"):
        options["context"] = mp.get_context()

    if backend is default_parallel_config["backend"] or backend is None:
        backend = ctx_backend

    elif isinstance(backend, ParallelBackendBase):
        # Use provided backend as is, with the current nesting_level if it
        # is not set yet.
        if backend.nesting_level is None:
            backend.nesting_level = nesting_level

    elif hasattr(backend, "Pool") and hasattr(backend, "Lock"):
        # Make it possible to pass a custom multiprocessing context as
        # backend to change the start method to forkserver or spawn or
        # preload modules on the forkserver helper process.
        options["context"] = backend
        backend = MultiprocessingBackend(nesting_level=nesting_level)

    elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
        warnings.warn(
            f"joblib backend '{backend}' is not available on "
            f"your system, falling back to {DEFAULT_BACKEND}.",
            UserWarning,
            stacklevel=2,
        )
        # Alias the unavailable name to the default backend factory.
        BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
        backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)

    else:
        try:
            factory = BACKENDS[backend]
        except KeyError as e:
            raise ValueError(
                "Invalid backend: %s, expected one of %r"
                % (backend, sorted(BACKENDS.keys()))
            ) from e
        backend = factory(nesting_level=nesting_level)

    n_jobs = _get_config_param(n_jobs, ctx_config, "n_jobs")
    if n_jobs is None:
        # No specific context override and no specific value request:
        # default to the default of the backend.
        n_jobs = backend.default_n_jobs
    try:
        n_jobs = int(n_jobs)
    except ValueError:
        raise ValueError("n_jobs could not be converted to int")
    self.n_jobs = n_jobs

    if require == "sharedmem" and not getattr(backend, "supports_sharedmem", False):
        raise ValueError("Backend %s does not support shared memory" % backend)

    valid_batch_size = batch_size == "auto" or (
        isinstance(batch_size, Integral) and batch_size > 0
    )
    if not valid_batch_size:
        raise ValueError(
            "batch_size must be 'auto' or a positive integer, got: %r" % batch_size
        )
    self.batch_size = batch_size

    if not isinstance(backend, SequentialBackend):
        if self.return_generator and not backend.supports_return_generator:
            raise ValueError(
                "Backend {} does not support return_as={}".format(
                    backend, return_as
                )
            )
        # This lock is used to coordinate the main thread of this process
        # with the async callback thread of the pool.
        self._lock = threading.RLock()
        self._jobs = collections.deque()
        self._jobs_set = set()
        self._pending_outputs = []
        self._ready_batches = queue.Queue()
        self._reducer_callback = None

    # Internal variables
    self._backend = backend
    self._running = False
    self._managed_backend = False
    self._id = uuid4().hex
    self._call_ref = None

1363 

def __enter__(self):
    """Enter the context: initialize the backend and return ``self``."""
    self._calling = False
    # Flag the backend as managed by the context manager.
    self._managed_backend = True
    self._initialize_backend()
    return self

1369 

def __exit__(self, exc_type, exc_value, traceback):
    """Leave the context: abort a running generator call, then reset."""
    # The backend is no longer managed, so the reset below terminates it.
    self._managed_backend = False
    if self.return_generator:
        if self._calling:
            self._abort()
    self._terminate_and_reset()

1375 

def _initialize_backend(self):
    """Configure the backend and return the number of workers it reports."""
    try:
        n_workers = self._backend.configure(
            n_jobs=self.n_jobs, parallel=self, **self._backend_kwargs
        )
        timeout_ignored = (
            self.timeout is not None and not self._backend.supports_timeout
        )
        if timeout_ignored:
            warnings.warn(
                "The backend class {!r} does not support timeout. "
                "You have set 'timeout={}' in Parallel but "
                "the 'timeout' parameter will not be used.".format(
                    self._backend.__class__.__name__, self.timeout
                )
            )
    except FallbackToBackend as fallback:
        # Recursively initialize the backend in case of requested fallback.
        self._backend = fallback.backend
        return self._initialize_backend()

    return n_workers

1397 

def _effective_n_jobs(self):
    """Return the backend's effective number of jobs, or 1 without backend."""
    backend = self._backend
    return backend.effective_n_jobs(self.n_jobs) if backend else 1

1402 

def _terminate_and_reset(self):
    """Stop the running call on the backend and terminate it if unmanaged."""
    # `stop_call` is optional on backends and only relevant during a call.
    if hasattr(self._backend, "stop_call") and self._calling:
        self._backend.stop_call()
    self._calling = False

    if self._managed_backend:
        # The backend is managed (see `__enter__`): leave it running.
        return
    self._backend.terminate()

1409 

def _dispatch(self, batch):
    """Submit ``batch`` to the backend and start tracking its completion.

    WARNING: this method is not thread-safe: it should be only called
    indirectly via dispatch_one_batch.
    """
    # Nothing is submitted anymore once the call is aborting.
    if self._aborting:
        return

    n_tasks_in_batch = len(batch)
    self.n_dispatched_tasks += n_tasks_in_batch
    self.n_dispatched_batches += 1

    # The tracker is created with the dispatch timestamp of the batch.
    tracker = BatchCompletionCallBack(time.time(), n_tasks_in_batch, self)

    # If return_ordered is False, the tracker is not stored in the jobs
    # queue at the time of submission. Instead, it appends itself to the
    # queue as soon as the callback is triggered, so that the results can
    # be returned in the order of completion.
    self._register_new_job(tracker)

    tracker.register_job(self._backend.submit(batch, callback=tracker))

1439 

def _register_new_job(self, batch_tracker):
    """Store ``batch_tracker`` in the container matching the return order."""
    if not self.return_ordered:
        self._jobs_set.add(batch_tracker)
        return
    self._jobs.append(batch_tracker)

1445 

def dispatch_next(self):
    """Dispatch more data for parallel processing

    This method is meant to be called concurrently by the multiprocessing
    callback. We rely on the thread-safety of dispatch_one_batch to protect
    against concurrent consumption of the unprotected iterator.
    """
    dispatched = self.dispatch_one_batch(self._original_iterator)
    if dispatched:
        return

    # Nothing was dispatched: stop iterating and drop the iterator.
    self._iterating = False
    self._original_iterator = None

1456 

def dispatch_one_batch(self, iterator):
    """Prefetch the tasks for the next batch and dispatch them.

    The effective size of the batch is computed here.
    Returns False if there are no more jobs to dispatch, True otherwise.

    The iterator consumption and the dispatching are protected by the same
    lock, so calling this function should be thread safe.
    """
    if self._aborting:
        return False

    batch_size = self._get_batch_size()

    with self._lock:
        # To ensure an even distribution of the workload between workers,
        # we look ahead in the original iterator more than batch_size
        # tasks. However, only one batch is consumed per call. The extra
        # tasks are stored in a local queue, _ready_batches, that is
        # looked up prior to re-consuming tasks from the original iterator.
        try:
            tasks = self._ready_batches.get(block=False)
        except queue.Empty:
            # Slice the iterator n_jobs * batch_size items at a time. If
            # the slice returns less than that, then the current batch size
            # puts too much weight on a subset of workers, while others may
            # end up starving. So in this case, re-scale the batch size
            # accordingly to distribute evenly the last items between all
            # workers.
            n_jobs = self._cached_effective_n_jobs
            big_batch_size = batch_size * n_jobs

            try:
                pending_calls = list(itertools.islice(iterator, big_batch_size))
            except Exception as exc:
                # The generator of tasks raised an exception. As this part
                # of the code can be executed in a thread internal to the
                # backend, register a task with an error that will be
                # raised in the user's thread.
                if isinstance(exc.__context__, queue.Empty):
                    # Suppress the cause of the exception if it is
                    # queue.Empty to avoid a cluttered traceback. Only do
                    # it when the __context__ really is queue.Empty to
                    # avoid messing with causes of the original error.
                    exc.__cause__ = None
                failed_tracker = BatchCompletionCallBack(0, batch_size, self)
                self._register_new_job(failed_tracker)
                failed_tracker._register_outcome(
                    {"result": exc, "status": TASK_ERROR}
                )
                return True

            n_pending = len(pending_calls)
            if n_pending == 0:
                return False

            reached_end = (
                iterator is self._original_iterator and n_pending < big_batch_size
            )
            if reached_end:
                # We reached the end of the original iterator (unless
                # iterator is the ``pre_dispatch``-long initial slice of
                # the original iterator) -- decrease the batch size to
                # account for potential variance in the batches running
                # time.
                final_batch_size = max(1, n_pending // (10 * n_jobs))
            else:
                final_batch_size = max(1, n_pending // n_jobs)

            # Enqueue the sliced tasks as batches in the local queue.
            for start in range(0, n_pending, final_batch_size):
                self._ready_batches.put(
                    BatchedCalls(
                        pending_calls[start : start + final_batch_size],
                        self._backend.get_nested_backend(),
                        self._reducer_callback,
                        self._pickle_cache,
                    )
                )

            # Finally, get one batch of tasks.
            tasks = self._ready_batches.get(block=False)

        if len(tasks) == 0:
            # No more tasks available in the iterator: tell caller to stop.
            return False

        self._dispatch(tasks)
        return True

1542 

def _get_batch_size(self):
    """Return the effective batch size to use for dispatching."""
    if self.batch_size != "auto":
        # Fixed batch size strategy
        return self.batch_size
    # With 'auto', the backend computes the batch size.
    return self._backend.compute_batch_size()

1550 

def _print(self, msg):
    """Display the message on stdout or stderr depending on verbosity"""
    # XXX: Not using the logger framework: need to
    # learn to use logger better.
    if not self.verbose:
        return
    # Verbosity of 50 and above goes to stdout, anything lower to stderr.
    stream = sys.stdout if self.verbose >= 50 else sys.stderr
    stream.write(f"[{self}]: {msg}\n")

1562 

def _is_completed(self):
    """Check if all tasks have been completed"""
    all_tasks_done = self.n_completed_tasks == self.n_dispatched_tasks
    # Completed means every dispatched task is done while the call is
    # neither still iterating nor aborting.
    return all_tasks_done and not self._iterating and not self._aborting

1568 

def print_progress(self):
    """Report progress through ``self._print``, throttled by ``self.verbose``.

    Nothing is reported when ``self.verbose`` is falsy. Otherwise one of
    three messages is emitted:

    * a final "finished" line once ``self._is_completed()`` is true;
    * while ``self._original_iterator`` is still set, the number of
      completed tasks (with the total when ``self.n_tasks`` is known),
      unless ``_verbosity_filter`` asks to skip this call;
    * once the original iterator has been dropped, the completed count
      out of the dispatched count together with an estimate of the
      remaining time, shown only for a fraction of the calls.
    """
    if not self.verbose:
        return

    # Counters are padded to the number of digits of the task count when
    # it is known and positive, and to 3 characters otherwise.
    total_is_known = self.n_tasks is not None and self.n_tasks > 0
    width = floor(log10(self.n_tasks)) + 1 if total_is_known else 3
    elapsed_time = time.time() - self._start_time

    if self._is_completed():
        # Make sure that we get a last message telling us we are done
        self._print(
            f"Done {self.n_completed_tasks:{width}d} out of "
            f"{self.n_completed_tasks:{width}d} | elapsed: "
            f"{short_format_time(elapsed_time)} finished"
        )
        return

    # Original job iterator becomes None once it has been fully
    # consumed: at this point we know the total number of jobs and we are
    # able to display an estimation of the remaining time based on already
    # completed jobs. Otherwise, we simply display the number of completed
    # tasks.
    if self._original_iterator is not None:
        if _verbosity_filter(self.n_dispatched_batches, self.verbose):
            return
        elapsed_msg = f"| elapsed: {short_format_time(elapsed_time)}"
        n_done = self.n_completed_tasks
        if self.n_tasks is None:
            # Keep the elapsed-time column aligned with the "out of" form.
            filler = " " * (len("out of ") + width - len("tasks"))
            self._print(f"Done {n_done:{width}d} tasks {filler}{elapsed_msg}")
        else:
            self._print(
                f"Done {n_done:{width}d} out of {self.n_tasks:{width}d} {elapsed_msg}"
            )
        return

    # We are finished dispatching
    n_done = self.n_completed_tasks
    n_total = self.n_dispatched_tasks
    # We always display the first loop
    if n_done != 0:
        # Display depending on the number of remaining items
        # A message as soon as we finish dispatching, cursor is 0
        cursor = n_total - n_done + 1 - self._pre_dispatch_amount
        frequency = (n_total // self.verbose) + 1
        if n_done + 1 == n_total or cursor % frequency:
            return
    # Extrapolate from the average time per completed task so far.
    remaining_time = (elapsed_time / max(n_done, 1)) * (
        self.n_dispatched_tasks - n_done
    )
    self._print(
        f"Done {n_done:{width}d} out of {n_total:{width}d} "
        f"| elapsed: {short_format_time(elapsed_time)} remaining: "
        f"{short_format_time(remaining_time)}"
    )

1631 

1632 def _abort(self): 

1633 # Stop dispatching new jobs in the async callback thread 

1634 self._aborting = True 

1635 

1636 # If the backend allows it, cancel or kill remaining running 

1637 # tasks without waiting for the results as we will raise 

1638 # the exception we got back to the caller instead of returning 

1639 # any result. 

1640 backend = self._backend 

1641 if not self._aborted and hasattr(backend, "abort_everything"): 

1642 # If the backend is managed externally we need to make sure 

1643 # to leave it in a working state to allow for future jobs 

1644 # scheduling. 

1645 ensure_ready = self._managed_backend 

1646 backend.abort_everything(ensure_ready=ensure_ready) 

1647 self._aborted = True 

1648 

1649 def _start(self, iterator, pre_dispatch): 

1650 # Only set self._iterating to True if at least a batch 

1651 # was dispatched. In particular this covers the edge 

1652 # case of Parallel used with an exhausted iterator. If 

1653 # self._original_iterator is None, then this means either 

1654 # that pre_dispatch == "all", n_jobs == 1 or that the first batch 

1655 # was very quick and its callback already dispatched all the 

1656 # remaining jobs. 

1657 self._iterating = False 

1658 if self.dispatch_one_batch(iterator): 

1659 self._iterating = self._original_iterator is not None 

1660 

1661 while self.dispatch_one_batch(iterator): 

1662 pass 

1663 

1664 if pre_dispatch == "all": 

1665 # The iterable was consumed all at once by the above for loop. 

1666 # No need to wait for async callbacks to trigger to 

1667 # consumption. 

1668 self._iterating = False 

1669 

def _get_outputs(self, iterator, pre_dispatch):
    """Iterator returning the tasks' output as soon as they are ready.

    The generator first starts the dispatch through ``self._start`` and
    yields a bare ``None`` (for internal use by the caller), then yields
    the results produced by ``self._retrieve()`` inside the backend's
    ``retrieval_context()``.

    On any exception, including ``GeneratorExit`` and
    ``KeyboardInterrupt``, ``self._exception`` is set and the run is
    aborted before the exception is re-raised. The one exception to the
    re-raise is a ``GeneratorExit`` received in a thread other than the one
    that started the dispatch: the abort is then delegated to a helper
    thread and the generator simply returns.

    After the ``finally`` clean-up, the results of the jobs that were
    still stored in ``self._jobs`` (none if an exception occurred) are
    yielded.
    """
    # Remember which thread started the dispatch so that GeneratorExit
    # raised from another thread can be detected below.
    dispatch_thread_id = threading.get_ident()
    # Set to True when the abort/termination is handed over to a helper
    # thread; the `finally` clause must then skip `_terminate_and_reset`.
    detach_generator_exit = False
    try:
        self._start(iterator, pre_dispatch)
        # first yield returns None, for internal use only. This ensures
        # that we enter the try/except block and start dispatching the
        # tasks.
        yield

        with self._backend.retrieval_context():
            yield from self._retrieve()

    except GeneratorExit:
        # The generator has been garbage collected before being fully
        # consumed. This aborts the remaining tasks if possible and warn
        # the user if necessary.
        self._exception = True

        # In some interpreters such as PyPy, GeneratorExit can be raised in
        # a different thread than the one used to start the dispatch of the
        # parallel tasks. This can lead to hang when a thread attempts to
        # join itself. As workaround, we detach the execution of the
        # aborting code to a dedicated thread. We then need to make sure
        # the rest of the function does not call `_terminate_and_reset`
        # in finally.
        if dispatch_thread_id != threading.get_ident():
            warnings.warn(
                "A generator produced by joblib.Parallel has been "
                "gc'ed in an unexpected thread. This behavior should "
                "not cause major -issues but to make sure, please "
                "report this warning and your use case at "
                "https://github.com/joblib/joblib/issues so it can "
                "be investigated."
            )

            detach_generator_exit = True
            # Alias needed because `self` inside `run` below refers to the
            # thread object, not to this Parallel instance.
            _parallel = self

            class _GeneratorExitThread(threading.Thread):
                def run(self):
                    _parallel._abort()
                    if _parallel.return_generator:
                        _parallel._warn_exit_early()
                    _parallel._terminate_and_reset()

            _GeneratorExitThread(name="GeneratorExitThread").start()
            return

        # Otherwise, we are in the thread that started the dispatch: we can
        # safely abort the execution and warn the user.
        self._abort()
        if self.return_generator:
            self._warn_exit_early()

        raise

    # Note: we catch any BaseException instead of just Exception instances
    # to also include KeyboardInterrupt
    except BaseException:
        self._exception = True
        self._abort()
        raise
    finally:
        # Store the unconsumed tasks and terminate the workers if necessary
        _remaining_outputs = [] if self._exception else self._jobs
        self._jobs = collections.deque()
        self._jobs_set = set()
        self._running = False
        if not detach_generator_exit:
            self._terminate_and_reset()

    # Only reached when no exception propagated: drain the jobs that were
    # left in the queue at the time of the clean-up above.
    while len(_remaining_outputs) > 0:
        batched_results = _remaining_outputs.popleft()
        batched_results = batched_results.get_result(self.timeout)
        for result in batched_results:
            yield result

1748 

1749 def _wait_retrieval(self): 

1750 """Return True if we need to continue retrieving some tasks.""" 

1751 

1752 # If the input load is still being iterated over, it means that tasks 

1753 # are still on the dispatch waitlist and their results will need to 

1754 # be retrieved later on. 

1755 if self._iterating: 

1756 return True 

1757 

1758 # If some of the dispatched tasks are still being processed by the 

1759 # workers, wait for the compute to finish before starting retrieval 

1760 if self.n_completed_tasks < self.n_dispatched_tasks: 

1761 return True 

1762 

1763 # For backends that does not support retrieving asynchronously the 

1764 # result to the main process, all results must be carefully retrieved 

1765 # in the _retrieve loop in the main thread while the backend is alive. 

1766 # For other backends, the actual retrieval is done asynchronously in 

1767 # the callback thread, and we can terminate the backend before the 

1768 # `self._jobs` result list has been emptied. The remaining results 

1769 # will be collected in the `finally` step of the generator. 

1770 if not self._backend.supports_retrieve_callback: 

1771 if len(self._jobs) > 0: 

1772 return True 

1773 

1774 return False 

1775 

def _retrieve(self):
    """Generator yielding the individual task results taken from ``self._jobs``.

    The loop polls, sleeping 10 ms between attempts, for as long as
    ``self._wait_retrieval()`` is true. Each ready batch is popped from
    the left of ``self._jobs`` under ``self._lock`` and its results are
    yielded one at a time, incrementing ``self._nb_consumed`` for each.
    The loop stops early when ``self._aborting`` is set, after giving
    ``self._raise_error_fast()`` the chance to raise a job's error.
    """
    # Job used for timeout control in the unordered case (see below).
    timeout_control_job = None
    while self._wait_retrieval():
        # If the callback thread of a worker has signaled that its task
        # triggered an exception, or if the retrieval loop has raised an
        # exception (e.g. `GeneratorExit`), exit the loop and surface the
        # worker traceback.
        if self._aborting:
            self._raise_error_fast()
            break

        nb_jobs = len(self._jobs)
        # Now wait for a job to be ready for retrieval.
        if self.return_ordered:
            # Case ordered: wait for completion (or error) of the next job
            # that have been dispatched and not retrieved yet. If no job
            # have been dispatched yet, wait for dispatch.
            # We assume that the time to wait for the next job to be
            # dispatched is always low, so that the timeout
            # control only have to be done on the amount of time the next
            # dispatched job is pending.
            if (nb_jobs == 0) or (
                self._jobs[0].get_status(timeout=self.timeout) == TASK_PENDING
            ):
                time.sleep(0.01)
                continue

        elif nb_jobs == 0:
            # Case unordered: jobs are added to the list of jobs to
            # retrieve `self._jobs` only once completed or in error, which
            # is too late to enable timeout control in the same way than in
            # the previous case.
            # Instead, if no job is ready to be retrieved yet, we
            # arbitrarily pick a dispatched job, and the timeout control is
            # done such that an error is raised if this control job
            # timeouts before any other dispatched job has completed and
            # been added to `self._jobs` to be retrieved.
            if timeout_control_job is None:
                timeout_control_job = next(iter(self._jobs_set), None)

            # NB: it can be None if no job has been dispatched yet.
            if timeout_control_job is not None:
                timeout_control_job.get_status(timeout=self.timeout)

            time.sleep(0.01)
            continue

        elif timeout_control_job is not None:
            # Case unordered, when `nb_jobs > 0`:
            # It means that a job is ready to be retrieved, so no timeout
            # will occur during this iteration.
            # Before proceeding to retrieval of the next ready job, reset
            # the timeout control state to prepare the next iteration.
            timeout_control_job._completion_timeout_counter = None
            timeout_control_job = None

        # We need to be careful: the job list can be filling up as
        # we empty it and Python list are not thread-safe by
        # default hence the use of the lock
        with self._lock:
            batched_results = self._jobs.popleft()
            if not self.return_ordered:
                self._jobs_set.remove(batched_results)

        # Flatten the batched results to output one output at a time
        batched_results = batched_results.get_result(self.timeout)
        for result in batched_results:
            self._nb_consumed += 1
            yield result

1845 

1846 def _raise_error_fast(self): 

1847 """If we are aborting, raise if a job caused an error.""" 

1848 

1849 # Find the first job whose status is TASK_ERROR if it exists. 

1850 with self._lock: 

1851 error_job = next( 

1852 (job for job in self._jobs if job.status == TASK_ERROR), None 

1853 ) 

1854 

1855 # If this error job exists, immediately raise the error by 

1856 # calling get_result. This job might not exists if abort has been 

1857 # called directly or if the generator is gc'ed. 

1858 if error_job is not None: 

1859 error_job.get_result(self.timeout) 

1860 

1861 def _warn_exit_early(self): 

1862 """Warn the user if the generator is gc'ed before being consumned.""" 

1863 ready_outputs = self.n_completed_tasks - self._nb_consumed 

1864 is_completed = self._is_completed() 

1865 msg = "" 

1866 if ready_outputs: 

1867 msg += ( 

1868 f"{ready_outputs} tasks have been successfully executed but not used." 

1869 ) 

1870 if not is_completed: 

1871 msg += " Additionally, " 

1872 

1873 if not is_completed: 

1874 msg += ( 

1875 f"{self.n_dispatched_tasks - self.n_completed_tasks} tasks " 

1876 "which were still being processed by the workers have been " 

1877 "cancelled." 

1878 ) 

1879 

1880 if msg: 

1881 msg += ( 

1882 " You could benefit from adjusting the input task " 

1883 "iterator to limit unnecessary computation time." 

1884 ) 

1885 

1886 warnings.warn(msg) 

1887 

1888 def _get_sequential_output(self, iterable): 

1889 """Separate loop for sequential output. 

1890 

1891 This simplifies the traceback in case of errors and reduces the 

1892 overhead of calling sequential tasks with `joblib`. 

1893 """ 

1894 try: 

1895 self._iterating = True 

1896 self._original_iterator = iterable 

1897 batch_size = self._get_batch_size() 

1898 

1899 if batch_size != 1: 

1900 it = iter(iterable) 

1901 iterable_batched = iter( 

1902 lambda: tuple(itertools.islice(it, batch_size)), () 

1903 ) 

1904 iterable = (task for batch in iterable_batched for task in batch) 

1905 

1906 # first yield returns None, for internal use only. This ensures 

1907 # that we enter the try/except block and setup the generator. 

1908 yield None 

1909 

1910 # Sequentially call the tasks and yield the results. 

1911 for func, args, kwargs in iterable: 

1912 self.n_dispatched_batches += 1 

1913 self.n_dispatched_tasks += 1 

1914 res = func(*args, **kwargs) 

1915 self.n_completed_tasks += 1 

1916 self.print_progress() 

1917 yield res 

1918 self._nb_consumed += 1 

1919 except BaseException: 

1920 self._exception = True 

1921 self._aborting = True 

1922 self._aborted = True 

1923 raise 

1924 finally: 

1925 self._running = False 

1926 self._iterating = False 

1927 self._original_iterator = None 

1928 self.print_progress() 

1929 

1930 def _reset_run_tracking(self): 

1931 """Reset the counters and flags used to track the execution.""" 

1932 

1933 # Makes sur the parallel instance was not previously running in a 

1934 # thread-safe way. 

1935 with getattr(self, "_lock", nullcontext()): 

1936 if self._running: 

1937 msg = "This Parallel instance is already running !" 

1938 if self.return_generator is True: 

1939 msg += ( 

1940 " Before submitting new tasks, you must wait for the " 

1941 "completion of all the previous tasks, or clean all " 

1942 "references to the output generator." 

1943 ) 

1944 raise RuntimeError(msg) 

1945 self._running = True 

1946 

1947 # Counter to keep track of the task dispatched and completed. 

1948 self.n_dispatched_batches = 0 

1949 self.n_dispatched_tasks = 0 

1950 self.n_completed_tasks = 0 

1951 

1952 # Following count is incremented by one each time the user iterates 

1953 # on the output generator, it is used to prepare an informative 

1954 # warning message in case the generator is deleted before all the 

1955 # dispatched tasks have been consumed. 

1956 self._nb_consumed = 0 

1957 

1958 # Following flags are used to synchronize the threads in case one of 

1959 # the tasks error-out to ensure that all workers abort fast and that 

1960 # the backend terminates properly. 

1961 

1962 # Set to True as soon as a worker signals that a task errors-out 

1963 self._exception = False 

1964 # Set to True in case of early termination following an incident 

1965 self._aborting = False 

1966 # Set to True after abortion is complete 

1967 self._aborted = False 

1968 

def __call__(self, iterable):
    """Main function to dispatch parallel tasks.

    Parameters
    ----------
    iterable
        The tasks to run. Its length is recorded in ``self.n_tasks`` when
        it defines ``__len__``; otherwise ``self.n_tasks`` is ``None``.

    Returns
    -------
    The output generator when ``self.return_generator`` is truthy,
    otherwise a list holding all the results.

    Raises
    ------
    RuntimeError
        If the effective number of jobs is 0 (no active worker). Raised by
        ``_reset_run_tracking`` too if the instance is already running.
    """

    self._reset_run_tracking()
    self.n_tasks = len(iterable) if hasattr(iterable, "__len__") else None
    self._start_time = time.time()

    # Backends that are not managed externally are initialized here.
    if not self._managed_backend:
        n_jobs = self._initialize_backend()
    else:
        n_jobs = self._effective_n_jobs()

    if n_jobs == 1:
        # If n_jobs==1, run the computation sequentially and return
        # immediately to avoid overheads.
        output = self._get_sequential_output(iterable)
        # Consume the initial internal `None` so the generator is set up.
        next(output)
        return output if self.return_generator else list(output)

    # Let's create an ID that uniquely identifies the current call. If the
    # call is interrupted early and that the same instance is immediately
    # reused, this id will be used to prevent workers that were
    # concurrently finalizing a task from the previous call to run the
    # callback.
    with self._lock:
        self._call_id = uuid4().hex

    # self._effective_n_jobs should be called in the Parallel.__call__
    # thread only -- store its value in an attribute for further queries.
    self._cached_effective_n_jobs = n_jobs

    if isinstance(self._backend, LokyBackend):
        # For the loky backend, we add a callback executed when reducing
        # BatchCalls, that makes the loky executor use a temporary folder
        # specific to this Parallel object when pickling temporary memmaps.
        # This callback is necessary to ensure that several Parallel
        # objects using the same reusable executor don't use the same
        # temporary resources.

        def _batched_calls_reducer_callback():
            # Relevant implementation detail: the following lines, called
            # when reducing BatchedCalls, are called in a thread-safe
            # situation, meaning that the context of the temporary folder
            # manager will not be changed in between the callback execution
            # and the end of the BatchedCalls pickling. The reason is that
            # pickling (the only place where set_current_context is used)
            # is done from a single thread (the queue_feeder_thread).
            self._backend._workers._temp_folder_manager.set_current_context(  # noqa
                self._id
            )

        self._reducer_callback = _batched_calls_reducer_callback

    # self._effective_n_jobs should be called in the Parallel.__call__
    # thread only -- store its value in an attribute for further queries.
    # NOTE(review): this repeats the assignment made a few lines above with
    # the same value; it looks redundant -- confirm before removing.
    self._cached_effective_n_jobs = n_jobs

    backend_name = self._backend.__class__.__name__
    if n_jobs == 0:
        raise RuntimeError("%s has no active worker." % backend_name)

    self._print(f"Using backend {backend_name} with {n_jobs} concurrent workers.")
    # `start_call` is optional on backends.
    if hasattr(self._backend, "start_call"):
        self._backend.start_call()

    # Following flag prevents double calls to `backend.stop_call`.
    self._calling = True

    iterator = iter(iterable)
    pre_dispatch = self.pre_dispatch

    if pre_dispatch == "all":
        # prevent further dispatch via multiprocessing callback thread
        self._original_iterator = None
        self._pre_dispatch_amount = 0
    else:
        self._original_iterator = iterator
        # String values are expressions that may mention "n_jobs"; the
        # name is substituted by its value before evaluation.
        if hasattr(pre_dispatch, "endswith"):
            pre_dispatch = eval_expr(pre_dispatch.replace("n_jobs", str(n_jobs)))
        self._pre_dispatch_amount = pre_dispatch = int(pre_dispatch)

        # The main thread will consume the first pre_dispatch items and
        # the remaining items will later be lazily dispatched by async
        # callbacks upon task completions.

        # TODO: this iterator should be batch_size * n_jobs
        iterator = itertools.islice(iterator, self._pre_dispatch_amount)

    # Use a caching dict for callables that are pickled with cloudpickle to
    # improve performances. This cache is used only in the case of
    # functions that are defined in the __main__ module, functions that
    # are defined locally (inside another function) and lambda expressions.
    self._pickle_cache = dict()

    output = self._get_outputs(iterator, pre_dispatch)
    # Only a weak reference is stored, so this attribute does not keep the
    # output generator alive.
    self._call_ref = weakref.ref(output)

    # The first item from the output is blank, but it makes the interpreter
    # progress until it enters the try/except block of the generator and
    # reaches the first `yield` statement. This starts the asynchronous
    # dispatch of the tasks to the workers.
    next(output)

    return output if self.return_generator else list(output)

2073 

2074 def __repr__(self): 

2075 return "%s(n_jobs=%s)" % (self.__class__.__name__, self.n_jobs)