Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/parallel.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

666 statements  

1""" 

2Helpers for embarrassingly parallel code. 

3""" 

4# Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org > 

5# Copyright: 2010, Gael Varoquaux 

6# License: BSD 3 clause 

7 

8from __future__ import division 

9 

10import collections 

11import functools 

12import itertools 

13import os 

14import queue 

15import sys 

16import threading 

17import time 

18import warnings 

19import weakref 

20from contextlib import nullcontext 

21from math import floor, log10, sqrt 

22from multiprocessing import TimeoutError 

23from numbers import Integral 

24from uuid import uuid4 

25 

26from ._multiprocessing_helpers import mp 

27 

28# Make sure that those two classes are part of the public joblib.parallel API 

29# so that 3rd party backend implementers can import them from here. 

30from ._parallel_backends import ( 

31 AutoBatchingMixin, # noqa 

32 FallbackToBackend, 

33 LokyBackend, 

34 MultiprocessingBackend, 

35 ParallelBackendBase, # noqa 

36 SequentialBackend, 

37 ThreadingBackend, 

38) 

39from ._utils import _Sentinel, eval_expr 

40from .disk import memstr_to_bytes 

41from .logger import Logger, short_format_time 

42 

43BACKENDS = { 

44 "threading": ThreadingBackend, 

45 "sequential": SequentialBackend, 

46} 

47# name of the backend used by default by Parallel outside of any context 

48# managed by ``parallel_config`` or ``parallel_backend``. 

49 

50# threading is the only backend that is always everywhere 

51DEFAULT_BACKEND = "threading" 

52DEFAULT_THREAD_BACKEND = "threading" 

53DEFAULT_PROCESS_BACKEND = "threading" 

54 

55MAYBE_AVAILABLE_BACKENDS = {"multiprocessing", "loky"} 

56 

57# if multiprocessing is available, so is loky, we set it as the default 

58# backend 

59if mp is not None: 

60 BACKENDS["multiprocessing"] = MultiprocessingBackend 

61 from .externals import loky 

62 

63 BACKENDS["loky"] = LokyBackend 

64 DEFAULT_BACKEND = "loky" 

65 DEFAULT_PROCESS_BACKEND = "loky" 

66 

67# Thread local value that can be overridden by the ``parallel_config`` context 

68# manager 

69_backend = threading.local() 

70 

71 

72def _register_dask(): 

73 """Register Dask Backend if called with parallel_config(backend="dask")""" 

74 try: 

75 from ._dask import DaskDistributedBackend 

76 

77 register_parallel_backend("dask", DaskDistributedBackend) 

78 except ImportError as e: 

79 msg = ( 

80 "To use the dask.distributed backend you must install both " 

81 "the `dask` and distributed modules.\n\n" 

82 "See https://dask.pydata.org/en/latest/install.html for more " 

83 "information." 

84 ) 

85 raise ImportError(msg) from e 

86 

87 

88EXTERNAL_BACKENDS = { 

89 "dask": _register_dask, 

90} 

91 

92 

93# Sentinels for the default values of the Parallel constructor and 

94# the parallel_config and parallel_backend context managers 

95default_parallel_config = { 

96 "backend": _Sentinel(default_value=None), 

97 "n_jobs": _Sentinel(default_value=None), 

98 "verbose": _Sentinel(default_value=0), 

99 "temp_folder": _Sentinel(default_value=None), 

100 "max_nbytes": _Sentinel(default_value="1M"), 

101 "mmap_mode": _Sentinel(default_value="r"), 

102 "prefer": _Sentinel(default_value=None), 

103 "require": _Sentinel(default_value=None), 

104} 

105 

106 

107VALID_BACKEND_HINTS = ("processes", "threads", None) 

108VALID_BACKEND_CONSTRAINTS = ("sharedmem", None) 

109 

110 

111def _get_config_param(param, context_config, key): 

112 """Return the value of a parallel config parameter 

113 

114 Explicitly setting it in Parallel has priority over setting in a 

115 parallel_(config/backend) context manager. 

116 """ 

117 if param is not default_parallel_config[key]: 

118 # param is explicitly set, return it 

119 return param 

120 

121 if context_config[key] is not default_parallel_config[key]: 

122 # there's a context manager and the key is set, return it 

123 return context_config[key] 

124 

125 # Otherwise, we are in the default_parallel_config, 

126 # return the default value 

127 return param.default_value 

128 

129 

130def get_active_backend( 

131 prefer=default_parallel_config["prefer"], 

132 require=default_parallel_config["require"], 

133 verbose=default_parallel_config["verbose"], 

134): 

135 """Return the active default backend""" 

136 backend, config = _get_active_backend(prefer, require, verbose) 

137 n_jobs = _get_config_param(default_parallel_config["n_jobs"], config, "n_jobs") 

138 return backend, n_jobs 

139 

140 

141def _get_active_backend( 

142 prefer=default_parallel_config["prefer"], 

143 require=default_parallel_config["require"], 

144 verbose=default_parallel_config["verbose"], 

145): 

146 """Return the active default backend""" 

147 

148 backend_config = getattr(_backend, "config", default_parallel_config) 

149 

150 backend = _get_config_param( 

151 default_parallel_config["backend"], backend_config, "backend" 

152 ) 

153 

154 prefer = _get_config_param(prefer, backend_config, "prefer") 

155 require = _get_config_param(require, backend_config, "require") 

156 verbose = _get_config_param(verbose, backend_config, "verbose") 

157 

158 if prefer not in VALID_BACKEND_HINTS: 

159 raise ValueError( 

160 f"prefer={prefer} is not a valid backend hint, " 

161 f"expected one of {VALID_BACKEND_HINTS}" 

162 ) 

163 if require not in VALID_BACKEND_CONSTRAINTS: 

164 raise ValueError( 

165 f"require={require} is not a valid backend constraint, " 

166 f"expected one of {VALID_BACKEND_CONSTRAINTS}" 

167 ) 

168 if prefer == "processes" and require == "sharedmem": 

169 raise ValueError( 

170 "prefer == 'processes' and require == 'sharedmem' are inconsistent settings" 

171 ) 

172 

173 explicit_backend = True 

174 if backend is None: 

175 # We are either outside of the scope of any parallel_(config/backend) 

176 # context manager or the context manager did not set a backend. 

177 # create the default backend instance now. 

178 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0) 

179 explicit_backend = False 

180 

181 # Try to use the backend set by the user with the context manager. 

182 

183 nesting_level = backend.nesting_level 

184 uses_threads = getattr(backend, "uses_threads", False) 

185 supports_sharedmem = getattr(backend, "supports_sharedmem", False) 

186 # Force to use thread-based backend if the provided backend does not 

187 # match the shared memory constraint or if the backend is not explicitly 

188 # given and threads are preferred. 

189 force_threads = (require == "sharedmem" and not supports_sharedmem) or ( 

190 not explicit_backend and prefer == "threads" and not uses_threads 

191 ) 

192 force_processes = not explicit_backend and prefer == "processes" and uses_threads 

193 

194 if force_threads: 

195 # This backend does not match the shared memory constraint: 

196 # fallback to the default thead-based backend. 

197 sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND]( 

198 nesting_level=nesting_level 

199 ) 

200 # Warn the user if we forced the backend to thread-based, while the 

201 # user explicitly specified a non-thread-based backend. 

202 if verbose >= 10 and explicit_backend: 

203 print( 

204 f"Using {sharedmem_backend.__class__.__name__} as " 

205 f"joblib backend instead of {backend.__class__.__name__} " 

206 "as the latter does not provide shared memory semantics." 

207 ) 

208 # Force to n_jobs=1 by default 

209 thread_config = backend_config.copy() 

210 thread_config["n_jobs"] = 1 

211 return sharedmem_backend, thread_config 

212 

213 if force_processes: 

214 # This backend does not match the prefer="processes" constraint: 

215 # fallback to the default process-based backend. 

216 process_backend = BACKENDS[DEFAULT_PROCESS_BACKEND](nesting_level=nesting_level) 

217 

218 return process_backend, backend_config.copy() 

219 

220 return backend, backend_config 

221 

222 

223class parallel_config: 

224 """Set the default backend or configuration for :class:`~joblib.Parallel`. 

225 

226 This is an alternative to directly passing keyword arguments to the 

227 :class:`~joblib.Parallel` class constructor. It is particularly useful when 

228 calling into library code that uses joblib internally but does not expose 

229 the various parallel configuration arguments in its own API. 

230 

231 Parameters 

232 ---------- 

233 backend: str or ParallelBackendBase instance, default=None 

234 If ``backend`` is a string it must match a previously registered 

235 implementation using the :func:`~register_parallel_backend` function. 

236 

237 By default the following backends are available: 

238 

239 - 'loky': single-host, process-based parallelism (used by default), 

240 - 'threading': single-host, thread-based parallelism, 

241 - 'multiprocessing': legacy single-host, process-based parallelism. 

242 

243 'loky' is recommended to run functions that manipulate Python objects. 

244 'threading' is a low-overhead alternative that is most efficient for 

245 functions that release the Global Interpreter Lock: e.g. I/O-bound 

246 code or CPU-bound code in a few calls to native code that explicitly 

247 releases the GIL. Note that on some rare systems (such as pyodide), 

248 multiprocessing and loky may not be available, in which case joblib 

249 defaults to threading. 

250 

251 In addition, if the ``dask`` and ``distributed`` Python packages are 

252 installed, it is possible to use the 'dask' backend for better 

253 scheduling of nested parallel calls without over-subscription and 

254 potentially distribute parallel calls over a networked cluster of 

255 several hosts. 

256 

257 It is also possible to use the distributed 'ray' backend for 

258 distributing the workload to a cluster of nodes. See more details 

259 in the Examples section below. 

260 

261 Alternatively the backend can be passed directly as an instance. 

262 

263 n_jobs: int, default=None 

264 The maximum number of concurrently running jobs, such as the number 

265 of Python worker processes when ``backend="loky"`` or the size of the 

266 thread-pool when ``backend="threading"``. 

267 This argument is converted to an integer, rounded below for float. 

268 If -1 is given, `joblib` tries to use all CPUs. The number of CPUs 

269 ``n_cpus`` is obtained with :func:`~cpu_count`. 

270 For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance, 

271 using ``n_jobs=-2`` will result in all CPUs but one being used. 

272 This argument can also go above ``n_cpus``, which will cause 

273 oversubscription. In some cases, slight oversubscription can be 

274 beneficial, e.g., for tasks with large I/O operations. 

275 If 1 is given, no parallel computing code is used at all, and the 

276 behavior amounts to a simple python `for` loop. This mode is not 

277 compatible with `timeout`. 

278 None is a marker for 'unset' that will be interpreted as n_jobs=1 

279 unless the call is performed under a :func:`~parallel_config` 

280 context manager that sets another value for ``n_jobs``. 

281 If n_jobs = 0 then a ValueError is raised. 

282 

283 verbose: int, default=0 

284 The verbosity level: if non zero, progress messages are 

285 printed. Above 50, the output is sent to stdout. 

286 The frequency of the messages increases with the verbosity level. 

287 If it more than 10, all iterations are reported. 

288 

289 temp_folder: str or None, default=None 

290 Folder to be used by the pool for memmapping large arrays 

291 for sharing memory with worker processes. If None, this will try in 

292 order: 

293 

294 - a folder pointed by the ``JOBLIB_TEMP_FOLDER`` environment 

295 variable, 

296 - ``/dev/shm`` if the folder exists and is writable: this is a 

297 RAM disk filesystem available by default on modern Linux 

298 distributions, 

299 - the default system temporary folder that can be 

300 overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment 

301 variables, typically ``/tmp`` under Unix operating systems. 

302 

303 max_nbytes: int, str, or None, optional, default='1M' 

304 Threshold on the size of arrays passed to the workers that 

305 triggers automated memory mapping in temp_folder. Can be an int 

306 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte. 

307 Use None to disable memmapping of large arrays. 

308 

309 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r' 

310 Memmapping mode for numpy arrays passed to workers. None will 

311 disable memmapping, other modes defined in the numpy.memmap doc: 

312 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html 

313 Also, see 'max_nbytes' parameter documentation for more details. 

314 

315 prefer: str in {'processes', 'threads'} or None, default=None 

316 Soft hint to choose the default backend. 

317 The default process-based backend is 'loky' and the default 

318 thread-based backend is 'threading'. Ignored if the ``backend`` 

319 parameter is specified. 

320 

321 require: 'sharedmem' or None, default=None 

322 Hard constraint to select the backend. If set to 'sharedmem', 

323 the selected backend will be single-host and thread-based. 

324 

325 inner_max_num_threads: int, default=None 

326 If not None, overwrites the limit set on the number of threads 

327 usable in some third-party library threadpools like OpenBLAS, 

328 MKL or OpenMP. This is only used with the ``loky`` backend. 

329 

330 backend_params: dict 

331 Additional parameters to pass to the backend constructor when 

332 backend is a string. 

333 

334 Notes 

335 ----- 

336 Joblib tries to limit the oversubscription by limiting the number of 

337 threads usable in some third-party library threadpools like OpenBLAS, MKL 

338 or OpenMP. The default limit in each worker is set to 

339 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be 

340 overwritten with the ``inner_max_num_threads`` argument which will be used 

341 to set this limit in the child processes. 

342 

343 .. versionadded:: 1.3 

344 

345 Examples 

346 -------- 

347 >>> from operator import neg 

348 >>> with parallel_config(backend='threading'): 

349 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5))) 

350 ... 

351 [-1, -2, -3, -4, -5] 

352 

353 To use the 'ray' joblib backend add the following lines: 

354 

355 >>> from ray.util.joblib import register_ray # doctest: +SKIP 

356 >>> register_ray() # doctest: +SKIP 

357 >>> with parallel_config(backend="ray"): # doctest: +SKIP 

358 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5))) 

359 [-1, -2, -3, -4, -5] 

360 

361 """ 

362 

363 def __init__( 

364 self, 

365 backend=default_parallel_config["backend"], 

366 *, 

367 n_jobs=default_parallel_config["n_jobs"], 

368 verbose=default_parallel_config["verbose"], 

369 temp_folder=default_parallel_config["temp_folder"], 

370 max_nbytes=default_parallel_config["max_nbytes"], 

371 mmap_mode=default_parallel_config["mmap_mode"], 

372 prefer=default_parallel_config["prefer"], 

373 require=default_parallel_config["require"], 

374 inner_max_num_threads=None, 

375 **backend_params, 

376 ): 

377 # Save the parallel info and set the active parallel config 

378 self.old_parallel_config = getattr(_backend, "config", default_parallel_config) 

379 

380 backend = self._check_backend(backend, inner_max_num_threads, **backend_params) 

381 

382 new_config = { 

383 "n_jobs": n_jobs, 

384 "verbose": verbose, 

385 "temp_folder": temp_folder, 

386 "max_nbytes": max_nbytes, 

387 "mmap_mode": mmap_mode, 

388 "prefer": prefer, 

389 "require": require, 

390 "backend": backend, 

391 } 

392 self.parallel_config = self.old_parallel_config.copy() 

393 self.parallel_config.update( 

394 {k: v for k, v in new_config.items() if not isinstance(v, _Sentinel)} 

395 ) 

396 

397 setattr(_backend, "config", self.parallel_config) 

398 

399 def _check_backend(self, backend, inner_max_num_threads, **backend_params): 

400 if backend is default_parallel_config["backend"]: 

401 if inner_max_num_threads is not None or len(backend_params) > 0: 

402 raise ValueError( 

403 "inner_max_num_threads and other constructor " 

404 "parameters backend_params are only supported " 

405 "when backend is not None." 

406 ) 

407 return backend 

408 

409 if isinstance(backend, str): 

410 # Handle non-registered or missing backends 

411 if backend not in BACKENDS: 

412 if backend in EXTERNAL_BACKENDS: 

413 register = EXTERNAL_BACKENDS[backend] 

414 register() 

415 elif backend in MAYBE_AVAILABLE_BACKENDS: 

416 warnings.warn( 

417 f"joblib backend '{backend}' is not available on " 

418 f"your system, falling back to {DEFAULT_BACKEND}.", 

419 UserWarning, 

420 stacklevel=2, 

421 ) 

422 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND] 

423 else: 

424 raise ValueError( 

425 f"Invalid backend: {backend}, expected one of " 

426 f"{sorted(BACKENDS.keys())}" 

427 ) 

428 

429 backend = BACKENDS[backend](**backend_params) 

430 else: 

431 if len(backend_params) > 0: 

432 raise ValueError( 

433 "Constructor parameters backend_params are only " 

434 "supported when backend is a string." 

435 ) 

436 

437 if inner_max_num_threads is not None: 

438 msg = ( 

439 f"{backend.__class__.__name__} does not accept setting the " 

440 "inner_max_num_threads argument." 

441 ) 

442 assert backend.supports_inner_max_num_threads, msg 

443 backend.inner_max_num_threads = inner_max_num_threads 

444 

445 # If the nesting_level of the backend is not set previously, use the 

446 # nesting level from the previous active_backend to set it 

447 if backend.nesting_level is None: 

448 parent_backend = self.old_parallel_config["backend"] 

449 if parent_backend is default_parallel_config["backend"]: 

450 nesting_level = 0 

451 else: 

452 nesting_level = parent_backend.nesting_level 

453 backend.nesting_level = nesting_level 

454 

455 return backend 

456 

457 def __enter__(self): 

458 return self.parallel_config 

459 

460 def __exit__(self, type, value, traceback): 

461 self.unregister() 

462 

463 def unregister(self): 

464 setattr(_backend, "config", self.old_parallel_config) 

465 

466 

467class parallel_backend(parallel_config): 

468 """Change the default backend used by Parallel inside a with block. 

469 

470 .. warning:: 

471 It is advised to use the :class:`~joblib.parallel_config` context 

472 manager instead, which allows more fine-grained control over the 

473 backend configuration. 

474 

475 If ``backend`` is a string it must match a previously registered 

476 implementation using the :func:`~register_parallel_backend` function. 

477 

478 By default the following backends are available: 

479 

480 - 'loky': single-host, process-based parallelism (used by default), 

481 - 'threading': single-host, thread-based parallelism, 

482 - 'multiprocessing': legacy single-host, process-based parallelism. 

483 

484 'loky' is recommended to run functions that manipulate Python objects. 

485 'threading' is a low-overhead alternative that is most efficient for 

486 functions that release the Global Interpreter Lock: e.g. I/O-bound code or 

487 CPU-bound code in a few calls to native code that explicitly releases the 

488 GIL. Note that on some rare systems (such as Pyodide), 

489 multiprocessing and loky may not be available, in which case joblib 

490 defaults to threading. 

491 

492 You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib 

493 backend to distribute work across machines. This works well with 

494 scikit-learn estimators with the ``n_jobs`` parameter, for example:: 

495 

496 >>> import joblib # doctest: +SKIP 

497 >>> from sklearn.model_selection import GridSearchCV # doctest: +SKIP 

498 >>> from dask.distributed import Client, LocalCluster # doctest: +SKIP 

499 

500 >>> # create a local Dask cluster 

501 >>> cluster = LocalCluster() # doctest: +SKIP 

502 >>> client = Client(cluster) # doctest: +SKIP 

503 >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1) 

504 ... # doctest: +SKIP 

505 >>> with joblib.parallel_backend("dask", scatter=[X, y]): # doctest: +SKIP 

506 ... grid_search.fit(X, y) 

507 

508 It is also possible to use the distributed 'ray' backend for distributing 

509 the workload to a cluster of nodes. To use the 'ray' joblib backend add 

510 the following lines:: 

511 

512 >>> from ray.util.joblib import register_ray # doctest: +SKIP 

513 >>> register_ray() # doctest: +SKIP 

514 >>> with parallel_backend("ray"): # doctest: +SKIP 

515 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5))) 

516 [-1, -2, -3, -4, -5] 

517 

518 Alternatively the backend can be passed directly as an instance. 

519 

520 By default all available workers will be used (``n_jobs=-1``) unless the 

521 caller passes an explicit value for the ``n_jobs`` parameter. 

522 

523 This is an alternative to passing a ``backend='backend_name'`` argument to 

524 the :class:`~Parallel` class constructor. It is particularly useful when 

525 calling into library code that uses joblib internally but does not expose 

526 the backend argument in its own API. 

527 

528 >>> from operator import neg 

529 >>> with parallel_backend('threading'): 

530 ... print(Parallel()(delayed(neg)(i + 1) for i in range(5))) 

531 ... 

532 [-1, -2, -3, -4, -5] 

533 

534 Joblib also tries to limit the oversubscription by limiting the number of 

535 threads usable in some third-party library threadpools like OpenBLAS, MKL 

536 or OpenMP. The default limit in each worker is set to 

537 ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be 

538 overwritten with the ``inner_max_num_threads`` argument which will be used 

539 to set this limit in the child processes. 

540 

541 .. versionadded:: 0.10 

542 

543 See Also 

544 -------- 

545 joblib.parallel_config: context manager to change the backend configuration. 

546 """ 

547 

548 def __init__( 

549 self, backend, n_jobs=-1, inner_max_num_threads=None, **backend_params 

550 ): 

551 super().__init__( 

552 backend=backend, 

553 n_jobs=n_jobs, 

554 inner_max_num_threads=inner_max_num_threads, 

555 **backend_params, 

556 ) 

557 

558 if self.old_parallel_config is None: 

559 self.old_backend_and_jobs = None 

560 else: 

561 self.old_backend_and_jobs = ( 

562 self.old_parallel_config["backend"], 

563 self.old_parallel_config["n_jobs"], 

564 ) 

565 self.new_backend_and_jobs = ( 

566 self.parallel_config["backend"], 

567 self.parallel_config["n_jobs"], 

568 ) 

569 

570 def __enter__(self): 

571 return self.new_backend_and_jobs 

572 

573 

574# Under Linux or OS X the default start method of multiprocessing 

575# can cause third party libraries to crash. Under Python 3.4+ it is possible 

576# to set an environment variable to switch the default start method from 

577# 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost 

578# of causing semantic changes and some additional pool instantiation overhead. 

579DEFAULT_MP_CONTEXT = None 

580if hasattr(mp, "get_context"): 

581 method = os.environ.get("JOBLIB_START_METHOD", "").strip() or None 

582 if method is not None: 

583 DEFAULT_MP_CONTEXT = mp.get_context(method=method) 

584 

585 

586class BatchedCalls(object): 

587 """Wrap a sequence of (func, args, kwargs) tuples as a single callable""" 

588 

589 def __init__( 

590 self, iterator_slice, backend_and_jobs, reducer_callback=None, pickle_cache=None 

591 ): 

592 self.items = list(iterator_slice) 

593 self._size = len(self.items) 

594 self._reducer_callback = reducer_callback 

595 if isinstance(backend_and_jobs, tuple): 

596 self._backend, self._n_jobs = backend_and_jobs 

597 else: 

598 # this is for backward compatibility purposes. Before 0.12.6, 

599 # nested backends were returned without n_jobs indications. 

600 self._backend, self._n_jobs = backend_and_jobs, None 

601 self._pickle_cache = pickle_cache if pickle_cache is not None else {} 

602 

603 def __call__(self): 

604 # Set the default nested backend to self._backend but do not set the 

605 # change the default number of processes to -1 

606 with parallel_config(backend=self._backend, n_jobs=self._n_jobs): 

607 return [func(*args, **kwargs) for func, args, kwargs in self.items] 

608 

609 def __reduce__(self): 

610 if self._reducer_callback is not None: 

611 self._reducer_callback() 

612 # no need to pickle the callback. 

613 return ( 

614 BatchedCalls, 

615 (self.items, (self._backend, self._n_jobs), None, self._pickle_cache), 

616 ) 

617 

618 def __len__(self): 

619 return self._size 

620 

621 

622# Possible exit status for a task 

623TASK_DONE = "Done" 

624TASK_ERROR = "Error" 

625TASK_PENDING = "Pending" 

626 

627 

628############################################################################### 

629# CPU count that works also when multiprocessing has been disabled via 

630# the JOBLIB_MULTIPROCESSING environment variable 

631def cpu_count(only_physical_cores=False): 

632 """Return the number of CPUs. 

633 

634 This delegates to loky.cpu_count that takes into account additional 

635 constraints such as Linux CFS scheduler quotas (typically set by container 

636 runtimes such as docker) and CPU affinity (for instance using the taskset 

637 command on Linux). 

638 

639 Parameters 

640 ---------- 

641 only_physical_cores : boolean, default=False 

642 If True, does not take hyperthreading / SMT logical cores into account. 

643 

644 """ 

645 if mp is None: 

646 return 1 

647 

648 return loky.cpu_count(only_physical_cores=only_physical_cores) 

649 

650 

651############################################################################### 

652# For verbosity 

653 

654 

655def _verbosity_filter(index, verbose): 

656 """Returns False for indices increasingly apart, the distance 

657 depending on the value of verbose. 

658 

659 We use a lag increasing as the square of index 

660 """ 

661 if not verbose: 

662 return True 

663 elif verbose > 10: 

664 return False 

665 if index == 0: 

666 return False 

667 verbose = 0.5 * (11 - verbose) ** 2 

668 scale = sqrt(index / verbose) 

669 next_scale = sqrt((index + 1) / verbose) 

670 return int(next_scale) == int(scale) 

671 

672 

673############################################################################### 

674def delayed(function): 

675 """Decorator used to capture the arguments of a function. 

676 

677 Parameters 

678 ---------- 

679 function: callable 

680 The function to be decorated. 

681 

682 Returns 

683 ------- 

684 callable 

685 A new function ``F`` such that calling ``F(*args, **kwargs)`` 

686 returns a tuple ``(function, args, kwargs)``, allowing the later 

687 execution of ``function(*args, **kwargs)``. 

688 

689 Notes 

690 ----- 

691 Be careful about the order in which decorators are applied, especially 

692 when using :meth:`Memory.cache<joblib.Memory.cache>`. For instance, 

693 ``Memory.cache(delayed(func))`` will cache the outputs of 

694 ``delayed(func)``, that is, tuples of the form ``(func, args, kwargs)``. 

695 To cache the outputs of ``func`` itself, you must instead use 

696 ``delayed(Memory.cache(func))``. 

697 """ 

698 

699 def delayed_function(*args, **kwargs): 

700 return function, args, kwargs 

701 

702 try: 

703 delayed_function = functools.wraps(function)(delayed_function) 

704 except AttributeError: 

705 " functools.wraps fails on some callable objects " 

706 return delayed_function 

707 

708 

709############################################################################### 

710class BatchCompletionCallBack(object): 

711 """Callback to keep track of completed results and schedule the next tasks. 

712 

713 This callable is executed by the parent process whenever a worker process 

714 has completed a batch of tasks. 

715 

716 It is used for progress reporting, to update estimate of the batch 

717 processing duration and to schedule the next batch of tasks to be 

718 processed. 

719 

720 It is assumed that this callback will always be triggered by the backend 

721 right after the end of a task, in case of success as well as in case of 

722 failure. 

723 """ 

724 

725 ########################################################################## 

726 # METHODS CALLED BY THE MAIN THREAD # 

727 ########################################################################## 

728 def __init__(self, dispatch_timestamp, batch_size, parallel): 

729 self.dispatch_timestamp = dispatch_timestamp 

730 self.batch_size = batch_size 

731 self.parallel = parallel 

732 self.parallel_call_id = parallel._call_id 

733 self._completion_timeout_counter = None 

734 

735 # Internals to keep track of the status and outcome of the task. 

736 

737 # Used to hold a reference to the future-like object returned by the 

738 # backend after launching this task 

739 # This will be set later when calling `register_job`, as it is only 

740 # created once the task has been submitted. 

741 self.job = None 

742 

743 if not parallel._backend.supports_retrieve_callback: 

744 # The status is only used for asynchronous result retrieval in the 

745 # callback. 

746 self.status = None 

747 else: 

748 # The initial status for the job is TASK_PENDING. 

749 # Once it is done, it will be either TASK_DONE, or TASK_ERROR. 

750 self.status = TASK_PENDING 

751 

752 def register_job(self, job): 

753 """Register the object returned by `submit`.""" 

754 self.job = job 

755 

756 def get_result(self, timeout): 

757 """Returns the raw result of the task that was submitted. 

758 

759 If the task raised an exception rather than returning, this same 

760 exception will be raised instead. 

761 

762 If the backend supports the retrieval callback, it is assumed that this 

763 method is only called after the result has been registered. It is 

764 ensured by checking that `self.status(timeout)` does not return 

765 TASK_PENDING. In this case, `get_result` directly returns the 

766 registered result (or raise the registered exception). 

767 

768 For other backends, there are no such assumptions, but `get_result` 

769 still needs to synchronously retrieve the result before it can 

770 return it or raise. It will block at most `self.timeout` seconds 

771 waiting for retrieval to complete, after that it raises a TimeoutError. 

772 """ 

773 

774 backend = self.parallel._backend 

775 

776 if backend.supports_retrieve_callback: 

777 # We assume that the result has already been retrieved by the 

778 # callback thread, and is stored internally. It's just waiting to 

779 # be returned. 

780 return self._return_or_raise() 

781 

782 # For other backends, the main thread needs to run the retrieval step. 

783 try: 

784 result = backend.retrieve_result(self.job, timeout=timeout) 

785 outcome = dict(result=result, status=TASK_DONE) 

786 except BaseException as e: 

787 outcome = dict(result=e, status=TASK_ERROR) 

788 self._register_outcome(outcome) 

789 

790 return self._return_or_raise() 

791 

792 def _return_or_raise(self): 

793 try: 

794 if self.status == TASK_ERROR: 

795 raise self._result 

796 return self._result 

797 finally: 

798 del self._result 

799 

800 def get_status(self, timeout): 

801 """Get the status of the task. 

802 

803 This function also checks if the timeout has been reached and register 

804 the TimeoutError outcome when it is the case. 

805 """ 

806 if timeout is None or self.status != TASK_PENDING: 

807 return self.status 

808 

809 # The computation are running and the status is pending. 

810 # Check that we did not wait for this jobs more than `timeout`. 

811 now = time.time() 

812 if self._completion_timeout_counter is None: 

813 self._completion_timeout_counter = now 

814 

815 if (now - self._completion_timeout_counter) > timeout: 

816 outcome = dict(result=TimeoutError(), status=TASK_ERROR) 

817 self._register_outcome(outcome) 

818 

819 return self.status 

820 

821 ########################################################################## 

822 # METHODS CALLED BY CALLBACK THREADS # 

823 ########################################################################## 

824 def __call__(self, *args, **kwargs): 

825 """Function called by the callback thread after a job is completed.""" 

826 

827 # If the backend doesn't support callback retrievals, the next batch of 

828 # tasks is dispatched regardless. The result will be retrieved by the 

829 # main thread when calling `get_result`. 

830 if not self.parallel._backend.supports_retrieve_callback: 

831 self._dispatch_new() 

832 return 

833 

834 # If the backend supports retrieving the result in the callback, it 

835 # registers the task outcome (TASK_ERROR or TASK_DONE), and schedules 

836 # the next batch if needed. 

837 with self.parallel._lock: 

838 # Edge case where while the task was processing, the `parallel` 

839 # instance has been reset and a new call has been issued, but the 

840 # worker managed to complete the task and trigger this callback 

841 # call just before being aborted by the reset. 

842 if self.parallel._call_id != self.parallel_call_id: 

843 return 

844 

845 # When aborting, stop as fast as possible and do not retrieve the 

846 # result as it won't be returned by the Parallel call. 

847 if self.parallel._aborting: 

848 return 

849 

850 # Retrieves the result of the task in the main process and dispatch 

851 # a new batch if needed. 

852 job_succeeded = self._retrieve_result(*args, **kwargs) 

853 

854 if job_succeeded: 

855 self._dispatch_new() 

856 

857 def _dispatch_new(self): 

858 """Schedule the next batch of tasks to be processed.""" 

859 

860 # This steps ensure that auto-batching works as expected. 

861 this_batch_duration = time.time() - self.dispatch_timestamp 

862 self.parallel._backend.batch_completed(self.batch_size, this_batch_duration) 

863 

864 # Schedule the next batch of tasks. 

865 with self.parallel._lock: 

866 self.parallel.n_completed_tasks += self.batch_size 

867 self.parallel.print_progress() 

868 if self.parallel._original_iterator is not None: 

869 self.parallel.dispatch_next() 

870 

871 def _retrieve_result(self, out): 

872 """Fetch and register the outcome of a task. 

873 

874 Return True if the task succeeded, False otherwise. 

875 This function is only called by backends that support retrieving 

876 the task result in the callback thread. 

877 """ 

878 try: 

879 result = self.parallel._backend.retrieve_result_callback(out) 

880 outcome = dict(status=TASK_DONE, result=result) 

881 except BaseException as e: 

882 # Avoid keeping references to parallel in the error. 

883 e.__traceback__ = None 

884 outcome = dict(result=e, status=TASK_ERROR) 

885 

886 self._register_outcome(outcome) 

887 return outcome["status"] != TASK_ERROR 

888 

889 ########################################################################## 

890 # This method can be called either in the main thread # 

891 # or in the callback thread. # 

892 ########################################################################## 

893 def _register_outcome(self, outcome): 

894 """Register the outcome of a task. 

895 

896 This method can be called only once, future calls will be ignored. 

897 """ 

898 # Covers the edge case where the main thread tries to register a 

899 # `TimeoutError` while the callback thread tries to register a result 

900 # at the same time. 

901 with self.parallel._lock: 

902 if self.status not in (TASK_PENDING, None): 

903 return 

904 self.status = outcome["status"] 

905 

906 self._result = outcome["result"] 

907 

908 # Once the result and the status are extracted, the last reference to 

909 # the job can be deleted. 

910 self.job = None 

911 

912 # As soon as an error as been spotted, early stopping flags are sent to 

913 # the `parallel` instance. 

914 if self.status == TASK_ERROR: 

915 self.parallel._exception = True 

916 self.parallel._aborting = True 

917 

918 if self.parallel.return_ordered: 

919 return 

920 

921 with self.parallel._lock: 

922 # For `return_as=generator_unordered`, append the job to the queue 

923 # in the order of completion instead of submission. 

924 self.parallel._jobs.append(self) 

925 

926 

927############################################################################### 

928def register_parallel_backend(name, factory, make_default=False): 

929 """Register a new Parallel backend factory. 

930 

931 The new backend can then be selected by passing its name as the backend 

932 argument to the :class:`~Parallel` class. Moreover, the default backend can 

933 be overwritten globally by setting make_default=True. 

934 

935 The factory can be any callable that takes no argument and return an 

936 instance of ``ParallelBackendBase``. 

937 

938 Warning: this function is experimental and subject to change in a future 

939 version of joblib. 

940 

941 .. versionadded:: 0.10 

942 """ 

943 BACKENDS[name] = factory 

944 if make_default: 

945 global DEFAULT_BACKEND 

946 DEFAULT_BACKEND = name 

947 

948 

949def effective_n_jobs(n_jobs=-1): 

950 """Determine the number of jobs that can actually run in parallel 

951 

952 n_jobs is the number of workers requested by the callers. Passing n_jobs=-1 

953 means requesting all available workers for instance matching the number of 

954 CPU cores on the worker host(s). 

955 

956 This method should return a guesstimate of the number of workers that can 

957 actually perform work concurrently with the currently enabled default 

958 backend. The primary use case is to make it possible for the caller to know 

959 in how many chunks to slice the work. 

960 

961 In general working on larger data chunks is more efficient (less scheduling 

962 overhead and better use of CPU cache prefetching heuristics) as long as all 

963 the workers have enough work to do. 

964 

965 Warning: this function is experimental and subject to change in a future 

966 version of joblib. 

967 

968 .. versionadded:: 0.10 

969 """ 

970 if n_jobs == 1: 

971 return 1 

972 

973 backend, backend_n_jobs = get_active_backend() 

974 if n_jobs is None: 

975 n_jobs = backend_n_jobs 

976 return backend.effective_n_jobs(n_jobs=n_jobs) 

977 

978 

979############################################################################### 

980class Parallel(Logger): 

981 """Helper class for readable parallel mapping. 

982 

983 Read more in the :ref:`User Guide <parallel>`. 

984 

985 Parameters 

986 ---------- 

987 n_jobs: int, default=None 

988 The maximum number of concurrently running jobs, such as the number 

989 of Python worker processes when ``backend="loky"`` or the size of 

990 the thread-pool when ``backend="threading"``. 

991 This argument is converted to an integer, rounded below for float. 

992 If -1 is given, `joblib` tries to use all CPUs. The number of CPUs 

993 ``n_cpus`` is obtained with :func:`~cpu_count`. 

994 For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance, 

995 using ``n_jobs=-2`` will result in all CPUs but one being used. 

996 This argument can also go above ``n_cpus``, which will cause 

997 oversubscription. In some cases, slight oversubscription can be 

998 beneficial, e.g., for tasks with large I/O operations. 

999 If 1 is given, no parallel computing code is used at all, and the 

1000 behavior amounts to a simple python `for` loop. This mode is not 

1001 compatible with ``timeout``. 

1002 None is a marker for 'unset' that will be interpreted as n_jobs=1 

1003 unless the call is performed under a :func:`~parallel_config` 

1004 context manager that sets another value for ``n_jobs``. 

1005 If n_jobs = 0 then a ValueError is raised. 

1006 backend: str, ParallelBackendBase instance or None, default='loky' 

1007 Specify the parallelization backend implementation. 

1008 Supported backends are: 

1009 

1010 - "loky" used by default, can induce some 

1011 communication and memory overhead when exchanging input and 

1012 output data with the worker Python processes. On some rare 

1013 systems (such as Pyiodide), the loky backend may not be 

1014 available. 

1015 - "multiprocessing" previous process-based backend based on 

1016 `multiprocessing.Pool`. Less robust than `loky`. 

1017 - "threading" is a very low-overhead backend but it suffers 

1018 from the Python Global Interpreter Lock if the called function 

1019 relies a lot on Python objects. "threading" is mostly useful 

1020 when the execution bottleneck is a compiled extension that 

1021 explicitly releases the GIL (for instance a Cython loop wrapped 

1022 in a "with nogil" block or an expensive call to a library such 

1023 as NumPy). 

1024 - finally, you can register backends by calling 

1025 :func:`~register_parallel_backend`. This will allow you to 

1026 implement a backend of your liking. 

1027 

1028 It is not recommended to hard-code the backend name in a call to 

1029 :class:`~Parallel` in a library. Instead it is recommended to set 

1030 soft hints (prefer) or hard constraints (require) so as to make it 

1031 possible for library users to change the backend from the outside 

1032 using the :func:`~parallel_config` context manager. 

1033 return_as: str in {'list', 'generator', 'generator_unordered'}, default='list' 

1034 If 'list', calls to this instance will return a list, only when 

1035 all results have been processed and retrieved. 

1036 If 'generator', it will return a generator that yields the results 

1037 as soon as they are available, in the order the tasks have been 

1038 submitted with. 

1039 If 'generator_unordered', the generator will immediately yield 

1040 available results independently of the submission order. The output 

1041 order is not deterministic in this case because it depends on the 

1042 concurrency of the workers. 

1043 prefer: str in {'processes', 'threads'} or None, default=None 

1044 Soft hint to choose the default backend if no specific backend 

1045 was selected with the :func:`~parallel_config` context manager. 

1046 The default process-based backend is 'loky' and the default 

1047 thread-based backend is 'threading'. Ignored if the ``backend`` 

1048 parameter is specified. 

1049 require: 'sharedmem' or None, default=None 

1050 Hard constraint to select the backend. If set to 'sharedmem', 

1051 the selected backend will be single-host and thread-based even 

1052 if the user asked for a non-thread based backend with 

1053 :func:`~joblib.parallel_config`. 

1054 verbose: int, default=0 

1055 The verbosity level: if non zero, progress messages are 

1056 printed. Above 50, the output is sent to stdout. 

1057 The frequency of the messages increases with the verbosity level. 

1058 If it more than 10, all iterations are reported. 

1059 timeout: float or None, default=None 

1060 Timeout limit for each task to complete. If any task takes longer 

1061 a TimeOutError will be raised. Only applied when n_jobs != 1 

1062 pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}, default='2*n_jobs' 

1063 The number of batches (of tasks) to be pre-dispatched. 

1064 Default is '2*n_jobs'. When batch_size="auto" this is reasonable 

1065 default and the workers should never starve. Note that only basic 

1066 arithmetic are allowed here and no modules can be used in this 

1067 expression. 

1068 batch_size: int or 'auto', default='auto' 

1069 The number of atomic tasks to dispatch at once to each 

1070 worker. When individual evaluations are very fast, dispatching 

1071 calls to workers can be slower than sequential computation because 

1072 of the overhead. Batching fast computations together can mitigate 

1073 this. 

1074 The ``'auto'`` strategy keeps track of the time it takes for a 

1075 batch to complete, and dynamically adjusts the batch size to keep 

1076 the time on the order of half a second, using a heuristic. The 

1077 initial batch size is 1. 

1078 ``batch_size="auto"`` with ``backend="threading"`` will dispatch 

1079 batches of a single task at a time as the threading backend has 

1080 very little overhead and using larger batch size has not proved to 

1081 bring any gain in that case. 

1082 temp_folder: str or None, default=None 

1083 Folder to be used by the pool for memmapping large arrays 

1084 for sharing memory with worker processes. If None, this will try in 

1085 order: 

1086 

1087 - a folder pointed by the JOBLIB_TEMP_FOLDER environment 

1088 variable, 

1089 - /dev/shm if the folder exists and is writable: this is a 

1090 RAM disk filesystem available by default on modern Linux 

1091 distributions, 

1092 - the default system temporary folder that can be 

1093 overridden with TMP, TMPDIR or TEMP environment 

1094 variables, typically /tmp under Unix operating systems. 

1095 

1096 Only active when ``backend="loky"`` or ``"multiprocessing"``. 

1097 max_nbytes int, str, or None, optional, default='1M' 

1098 Threshold on the size of arrays passed to the workers that 

1099 triggers automated memory mapping in temp_folder. Can be an int 

1100 in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte. 

1101 Use None to disable memmapping of large arrays. 

1102 Only active when ``backend="loky"`` or ``"multiprocessing"``. 

1103 mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r' 

1104 Memmapping mode for numpy arrays passed to workers. None will 

1105 disable memmapping, other modes defined in the numpy.memmap doc: 

1106 https://numpy.org/doc/stable/reference/generated/numpy.memmap.html 

1107 Also, see 'max_nbytes' parameter documentation for more details. 

1108 backend_kwargs: dict, optional 

1109 Additional parameters to pass to the backend `configure` method. 

1110 

1111 Notes 

1112 ----- 

1113 

1114 This object uses workers to compute in parallel the application of a 

1115 function to many different arguments. The main functionality it brings 

1116 in addition to using the raw multiprocessing or concurrent.futures API 

1117 are (see examples for details): 

1118 

1119 * More readable code, in particular since it avoids 

1120 constructing list of arguments. 

1121 

1122 * Easier debugging: 

1123 - informative tracebacks even when the error happens on 

1124 the client side 

1125 - using 'n_jobs=1' enables to turn off parallel computing 

1126 for debugging without changing the codepath 

1127 - early capture of pickling errors 

1128 

1129 * An optional progress meter. 

1130 

1131 * Interruption of multiprocesses jobs with 'Ctrl-C' 

1132 

1133 * Flexible pickling control for the communication to and from 

1134 the worker processes. 

1135 

1136 * Ability to use shared memory efficiently with worker 

1137 processes for large numpy-based datastructures. 

1138 

1139 Note that the intended usage is to run one call at a time. Multiple 

1140 calls to the same Parallel object will result in a ``RuntimeError`` 

1141 

1142 Examples 

1143 -------- 

1144 

1145 A simple example: 

1146 

1147 >>> from math import sqrt 

1148 >>> from joblib import Parallel, delayed 

1149 >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) 

1150 [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] 

1151 

1152 Reshaping the output when the function has several return 

1153 values: 

1154 

1155 >>> from math import modf 

1156 >>> from joblib import Parallel, delayed 

1157 >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10)) 

1158 >>> res, i = zip(*r) 

1159 >>> res 

1160 (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5) 

1161 >>> i 

1162 (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0) 

1163 

1164 The progress meter: the higher the value of `verbose`, the more 

1165 messages: 

1166 

1167 >>> from time import sleep 

1168 >>> from joblib import Parallel, delayed 

1169 >>> r = Parallel(n_jobs=2, verbose=10)( 

1170 ... delayed(sleep)(.2) for _ in range(10)) #doctest: +SKIP 

1171 [Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.6s 

1172 [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.8s 

1173 [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 1.4s finished 

1174 

1175 Traceback example, note how the line of the error is indicated 

1176 as well as the values of the parameter passed to the function that 

1177 triggered the exception, even though the traceback happens in the 

1178 child process: 

1179 

1180 >>> from heapq import nlargest 

1181 >>> from joblib import Parallel, delayed 

1182 >>> Parallel(n_jobs=2)( 

1183 ... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3)) 

1184 ... # doctest: +SKIP 

1185 ----------------------------------------------------------------------- 

1186 Sub-process traceback: 

1187 ----------------------------------------------------------------------- 

1188 TypeError Mon Nov 12 11:37:46 2012 

1189 PID: 12934 Python 2.7.3: /usr/bin/python 

1190 ........................................................................ 

1191 /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None) 

1192 419 if n >= size: 

1193 420 return sorted(iterable, key=key, reverse=True)[:n] 

1194 421 

1195 422 # When key is none, use simpler decoration 

1196 423 if key is None: 

1197 --> 424 it = izip(iterable, count(0,-1)) # decorate 

1198 425 result = _nlargest(n, it) 

1199 426 return map(itemgetter(0), result) # undecorate 

1200 427 

1201 428 # General case, slowest method 

1202 TypeError: izip argument #1 must support iteration 

1203 _______________________________________________________________________ 

1204 

1205 

1206 Using pre_dispatch in a producer/consumer situation, where the 

1207 data is generated on the fly. Note how the producer is first 

1208 called 3 times before the parallel loop is initiated, and then 

1209 called to generate new data on the fly: 

1210 

1211 >>> from math import sqrt 

1212 >>> from joblib import Parallel, delayed 

1213 >>> def producer(): 

1214 ... for i in range(6): 

1215 ... print('Produced %s' % i) 

1216 ... yield i 

1217 >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')( 

1218 ... delayed(sqrt)(i) for i in producer()) #doctest: +SKIP 

1219 Produced 0 

1220 Produced 1 

1221 Produced 2 

1222 [Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s 

1223 Produced 3 

1224 [Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s 

1225 Produced 4 

1226 [Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s 

1227 Produced 5 

1228 [Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s 

1229 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s remaining: 0.0s 

1230 [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished 

1231 

1232 """ # noqa: E501 

1233 

1234 def __init__( 

1235 self, 

1236 n_jobs=default_parallel_config["n_jobs"], 

1237 backend=default_parallel_config["backend"], 

1238 return_as="list", 

1239 verbose=default_parallel_config["verbose"], 

1240 timeout=None, 

1241 pre_dispatch="2 * n_jobs", 

1242 batch_size="auto", 

1243 temp_folder=default_parallel_config["temp_folder"], 

1244 max_nbytes=default_parallel_config["max_nbytes"], 

1245 mmap_mode=default_parallel_config["mmap_mode"], 

1246 prefer=default_parallel_config["prefer"], 

1247 require=default_parallel_config["require"], 

1248 **backend_kwargs, 

1249 ): 

1250 # Initiate parent Logger class state 

1251 super().__init__() 

1252 

1253 # Interpret n_jobs=None as 'unset' 

1254 if n_jobs is None: 

1255 n_jobs = default_parallel_config["n_jobs"] 

1256 

1257 active_backend, context_config = _get_active_backend( 

1258 prefer=prefer, require=require, verbose=verbose 

1259 ) 

1260 

1261 nesting_level = active_backend.nesting_level 

1262 

1263 self.verbose = _get_config_param(verbose, context_config, "verbose") 

1264 self.timeout = timeout 

1265 self.pre_dispatch = pre_dispatch 

1266 

1267 if return_as not in {"list", "generator", "generator_unordered"}: 

1268 raise ValueError( 

1269 'Expected `return_as` parameter to be a string equal to "list"' 

1270 f',"generator" or "generator_unordered", but got {return_as} ' 

1271 "instead." 

1272 ) 

1273 self.return_as = return_as 

1274 self.return_generator = return_as != "list" 

1275 self.return_ordered = return_as != "generator_unordered" 

1276 

1277 # Check if we are under a parallel_config or parallel_backend 

1278 # context manager and use the config from the context manager 

1279 # for arguments that are not explicitly set. 

1280 self._backend_kwargs = { 

1281 **backend_kwargs, 

1282 **{ 

1283 k: _get_config_param(param, context_config, k) 

1284 for param, k in [ 

1285 (max_nbytes, "max_nbytes"), 

1286 (temp_folder, "temp_folder"), 

1287 (mmap_mode, "mmap_mode"), 

1288 (prefer, "prefer"), 

1289 (require, "require"), 

1290 (verbose, "verbose"), 

1291 ] 

1292 }, 

1293 } 

1294 

1295 if isinstance(self._backend_kwargs["max_nbytes"], str): 

1296 self._backend_kwargs["max_nbytes"] = memstr_to_bytes( 

1297 self._backend_kwargs["max_nbytes"] 

1298 ) 

1299 self._backend_kwargs["verbose"] = max(0, self._backend_kwargs["verbose"] - 50) 

1300 

1301 if DEFAULT_MP_CONTEXT is not None: 

1302 self._backend_kwargs["context"] = DEFAULT_MP_CONTEXT 

1303 elif hasattr(mp, "get_context"): 

1304 self._backend_kwargs["context"] = mp.get_context() 

1305 

1306 if backend is default_parallel_config["backend"] or backend is None: 

1307 backend = active_backend 

1308 

1309 elif isinstance(backend, ParallelBackendBase): 

1310 # Use provided backend as is, with the current nesting_level if it 

1311 # is not set yet. 

1312 if backend.nesting_level is None: 

1313 backend.nesting_level = nesting_level 

1314 

1315 elif hasattr(backend, "Pool") and hasattr(backend, "Lock"): 

1316 # Make it possible to pass a custom multiprocessing context as 

1317 # backend to change the start method to forkserver or spawn or 

1318 # preload modules on the forkserver helper process. 

1319 self._backend_kwargs["context"] = backend 

1320 backend = MultiprocessingBackend(nesting_level=nesting_level) 

1321 

1322 elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS: 

1323 warnings.warn( 

1324 f"joblib backend '{backend}' is not available on " 

1325 f"your system, falling back to {DEFAULT_BACKEND}.", 

1326 UserWarning, 

1327 stacklevel=2, 

1328 ) 

1329 BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND] 

1330 backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level) 

1331 

1332 else: 

1333 try: 

1334 backend_factory = BACKENDS[backend] 

1335 except KeyError as e: 

1336 raise ValueError( 

1337 "Invalid backend: %s, expected one of %r" 

1338 % (backend, sorted(BACKENDS.keys())) 

1339 ) from e 

1340 backend = backend_factory(nesting_level=nesting_level) 

1341 

1342 n_jobs = _get_config_param(n_jobs, context_config, "n_jobs") 

1343 if n_jobs is None: 

1344 # No specific context override and no specific value request: 

1345 # default to the default of the backend. 

1346 n_jobs = backend.default_n_jobs 

1347 try: 

1348 n_jobs = int(n_jobs) 

1349 except ValueError: 

1350 raise ValueError("n_jobs could not be converted to int") 

1351 self.n_jobs = n_jobs 

1352 

1353 if require == "sharedmem" and not getattr(backend, "supports_sharedmem", False): 

1354 raise ValueError("Backend %s does not support shared memory" % backend) 

1355 

1356 if batch_size == "auto" or isinstance(batch_size, Integral) and batch_size > 0: 

1357 self.batch_size = batch_size 

1358 else: 

1359 raise ValueError( 

1360 "batch_size must be 'auto' or a positive integer, got: %r" % batch_size 

1361 ) 

1362 

1363 if not isinstance(backend, SequentialBackend): 

1364 if self.return_generator and not backend.supports_return_generator: 

1365 raise ValueError( 

1366 "Backend {} does not support return_as={}".format( 

1367 backend, return_as 

1368 ) 

1369 ) 

1370 # This lock is used to coordinate the main thread of this process 

1371 # with the async callback thread of our the pool. 

1372 self._lock = threading.RLock() 

1373 self._jobs = collections.deque() 

1374 self._jobs_set = set() 

1375 self._pending_outputs = list() 

1376 self._ready_batches = queue.Queue() 

1377 self._reducer_callback = None 

1378 

1379 # Internal variables 

1380 self._backend = backend 

1381 self._running = False 

1382 self._managed_backend = False 

1383 self._id = uuid4().hex 

1384 self._call_ref = None 

1385 

1386 def __enter__(self): 

1387 self._managed_backend = True 

1388 self._calling = False 

1389 self._initialize_backend() 

1390 return self 

1391 

1392 def __exit__(self, exc_type, exc_value, traceback): 

1393 self._managed_backend = False 

1394 if self.return_generator and self._calling: 

1395 self._abort() 

1396 self._terminate_and_reset() 

1397 

1398 def _initialize_backend(self): 

1399 """Build a process or thread pool and return the number of workers""" 

1400 try: 

1401 n_jobs = self._backend.configure( 

1402 n_jobs=self.n_jobs, parallel=self, **self._backend_kwargs 

1403 ) 

1404 if self.timeout is not None and not self._backend.supports_timeout: 

1405 warnings.warn( 

1406 "The backend class {!r} does not support timeout. " 

1407 "You have set 'timeout={}' in Parallel but " 

1408 "the 'timeout' parameter will not be used.".format( 

1409 self._backend.__class__.__name__, self.timeout 

1410 ) 

1411 ) 

1412 

1413 except FallbackToBackend as e: 

1414 # Recursively initialize the backend in case of requested fallback. 

1415 self._backend = e.backend 

1416 n_jobs = self._initialize_backend() 

1417 

1418 return n_jobs 

1419 

1420 def _effective_n_jobs(self): 

1421 if self._backend: 

1422 return self._backend.effective_n_jobs(self.n_jobs) 

1423 return 1 

1424 

1425 def _terminate_and_reset(self): 

1426 if hasattr(self._backend, "stop_call") and self._calling: 

1427 self._backend.stop_call() 

1428 self._calling = False 

1429 if not self._managed_backend: 

1430 self._backend.terminate() 

1431 

1432 def _dispatch(self, batch): 

1433 """Queue the batch for computing, with or without multiprocessing 

1434 

1435 WARNING: this method is not thread-safe: it should be only called 

1436 indirectly via dispatch_one_batch. 

1437 

1438 """ 

1439 # If job.get() catches an exception, it closes the queue: 

1440 if self._aborting: 

1441 return 

1442 

1443 batch_size = len(batch) 

1444 

1445 self.n_dispatched_tasks += batch_size 

1446 self.n_dispatched_batches += 1 

1447 

1448 dispatch_timestamp = time.time() 

1449 

1450 batch_tracker = BatchCompletionCallBack(dispatch_timestamp, batch_size, self) 

1451 

1452 self._register_new_job(batch_tracker) 

1453 

1454 # If return_ordered is False, the batch_tracker is not stored in the 

1455 # jobs queue at the time of submission. Instead, it will be appended to 

1456 # the queue by itself as soon as the callback is triggered to be able 

1457 # to return the results in the order of completion. 

1458 

1459 job = self._backend.submit(batch, callback=batch_tracker) 

1460 batch_tracker.register_job(job) 

1461 

1462 def _register_new_job(self, batch_tracker): 

1463 if self.return_ordered: 

1464 self._jobs.append(batch_tracker) 

1465 else: 

1466 self._jobs_set.add(batch_tracker) 

1467 

1468 def dispatch_next(self): 

1469 """Dispatch more data for parallel processing 

1470 

1471 This method is meant to be called concurrently by the multiprocessing 

1472 callback. We rely on the thread-safety of dispatch_one_batch to protect 

1473 against concurrent consumption of the unprotected iterator. 

1474 """ 

1475 if not self.dispatch_one_batch(self._original_iterator): 

1476 self._iterating = False 

1477 self._original_iterator = None 

1478 

1479 def dispatch_one_batch(self, iterator): 

1480 """Prefetch the tasks for the next batch and dispatch them. 

1481 

1482 The effective size of the batch is computed here. 

1483 If there are no more jobs to dispatch, return False, else return True. 

1484 

1485 The iterator consumption and dispatching is protected by the same 

1486 lock so calling this function should be thread safe. 

1487 

1488 """ 

1489 

1490 if self._aborting: 

1491 return False 

1492 

1493 batch_size = self._get_batch_size() 

1494 

1495 with self._lock: 

1496 # to ensure an even distribution of the workload between workers, 

1497 # we look ahead in the original iterators more than batch_size 

1498 # tasks - However, we keep consuming only one batch at each 

1499 # dispatch_one_batch call. The extra tasks are stored in a local 

1500 # queue, _ready_batches, that is looked-up prior to re-consuming 

1501 # tasks from the origal iterator. 

1502 try: 

1503 tasks = self._ready_batches.get(block=False) 

1504 except queue.Empty: 

1505 # slice the iterator n_jobs * batchsize items at a time. If the 

1506 # slice returns less than that, then the current batchsize puts 

1507 # too much weight on a subset of workers, while other may end 

1508 # up starving. So in this case, re-scale the batch size 

1509 # accordingly to distribute evenly the last items between all 

1510 # workers. 

1511 n_jobs = self._cached_effective_n_jobs 

1512 big_batch_size = batch_size * n_jobs 

1513 

1514 try: 

1515 islice = list(itertools.islice(iterator, big_batch_size)) 

1516 except Exception as e: 

1517 # Handle the fact that the generator of task raised an 

1518 # exception. As this part of the code can be executed in 

1519 # a thread internal to the backend, register a task with 

1520 # an error that will be raised in the user's thread. 

1521 if isinstance(e.__context__, queue.Empty): 

1522 # Suppress the cause of the exception if it is 

1523 # queue.Empty to avoid cluttered traceback. Only do it 

1524 # if the __context__ is really empty to avoid messing 

1525 # with causes of the original error. 

1526 e.__cause__ = None 

1527 batch_tracker = BatchCompletionCallBack(0, batch_size, self) 

1528 self._register_new_job(batch_tracker) 

1529 batch_tracker._register_outcome(dict(result=e, status=TASK_ERROR)) 

1530 return True 

1531 

1532 if len(islice) == 0: 

1533 return False 

1534 elif ( 

1535 iterator is self._original_iterator and len(islice) < big_batch_size 

1536 ): 

1537 # We reached the end of the original iterator (unless 

1538 # iterator is the ``pre_dispatch``-long initial slice of 

1539 # the original iterator) -- decrease the batch size to 

1540 # account for potential variance in the batches running 

1541 # time. 

1542 final_batch_size = max(1, len(islice) // (10 * n_jobs)) 

1543 else: 

1544 final_batch_size = max(1, len(islice) // n_jobs) 

1545 

1546 # enqueue n_jobs batches in a local queue 

1547 for i in range(0, len(islice), final_batch_size): 

1548 tasks = BatchedCalls( 

1549 islice[i : i + final_batch_size], 

1550 self._backend.get_nested_backend(), 

1551 self._reducer_callback, 

1552 self._pickle_cache, 

1553 ) 

1554 self._ready_batches.put(tasks) 

1555 

1556 # finally, get one task. 

1557 tasks = self._ready_batches.get(block=False) 

1558 if len(tasks) == 0: 

1559 # No more tasks available in the iterator: tell caller to stop. 

1560 return False 

1561 else: 

1562 self._dispatch(tasks) 

1563 return True 

1564 

1565 def _get_batch_size(self): 

1566 """Returns the effective batch size for dispatch""" 

1567 if self.batch_size == "auto": 

1568 return self._backend.compute_batch_size() 

1569 else: 

1570 # Fixed batch size strategy 

1571 return self.batch_size 

1572 

1573 def _print(self, msg): 

1574 """Display the message on stout or stderr depending on verbosity""" 

1575 # XXX: Not using the logger framework: need to 

1576 # learn to use logger better. 

1577 if not self.verbose: 

1578 return 

1579 if self.verbose < 50: 

1580 writer = sys.stderr.write 

1581 else: 

1582 writer = sys.stdout.write 

1583 writer(f"[{self}]: {msg}\n") 

1584 

1585 def _is_completed(self): 

1586 """Check if all tasks have been completed""" 

1587 return self.n_completed_tasks == self.n_dispatched_tasks and not ( 

1588 self._iterating or self._aborting 

1589 ) 

1590 

1591 def print_progress(self): 

1592 """Display the process of the parallel execution only a fraction 

1593 of time, controlled by self.verbose. 

1594 """ 

1595 

1596 if not self.verbose: 

1597 return 

1598 

1599 if self.n_tasks is not None and self.n_tasks > 0: 

1600 width = floor(log10(self.n_tasks)) + 1 

1601 else: 

1602 width = 3 

1603 elapsed_time = time.time() - self._start_time 

1604 

1605 if self._is_completed(): 

1606 # Make sure that we get a last message telling us we are done 

1607 self._print( 

1608 f"Done {self.n_completed_tasks:{width}d} out of " 

1609 f"{self.n_completed_tasks:{width}d} | elapsed: " 

1610 f"{short_format_time(elapsed_time)} finished" 

1611 ) 

1612 return 

1613 

1614 # Original job iterator becomes None once it has been fully 

1615 # consumed: at this point we know the total number of jobs and we are 

1616 # able to display an estimation of the remaining time based on already 

1617 # completed jobs. Otherwise, we simply display the number of completed 

1618 # tasks. 

1619 elif self._original_iterator is not None: 

1620 if _verbosity_filter(self.n_dispatched_batches, self.verbose): 

1621 return 

1622 fmt_time = f"| elapsed: {short_format_time(elapsed_time)}" 

1623 index = self.n_completed_tasks 

1624 if self.n_tasks is not None: 

1625 self._print( 

1626 f"Done {index:{width}d} out of {self.n_tasks:{width}d} {fmt_time}" 

1627 ) 

1628 else: 

1629 pad = " " * (len("out of ") + width - len("tasks")) 

1630 self._print(f"Done {index:{width}d} tasks {pad}{fmt_time}") 

1631 else: 

1632 index = self.n_completed_tasks 

1633 # We are finished dispatching 

1634 total_tasks = self.n_dispatched_tasks 

1635 # We always display the first loop 

1636 if index != 0: 

1637 # Display depending on the number of remaining items 

1638 # A message as soon as we finish dispatching, cursor is 0 

1639 cursor = total_tasks - index + 1 - self._pre_dispatch_amount 

1640 frequency = (total_tasks // self.verbose) + 1 

1641 is_last_item = index + 1 == total_tasks 

1642 if is_last_item or cursor % frequency: 

1643 return 

1644 remaining_time = (elapsed_time / max(index, 1)) * ( 

1645 self.n_dispatched_tasks - index 

1646 ) 

1647 # only display status if remaining time is greater or equal to 0 

1648 self._print( 

1649 f"Done {index:{width}d} out of {total_tasks:{width}d} " 

1650 f"| elapsed: {short_format_time(elapsed_time)} remaining: " 

1651 f"{short_format_time(remaining_time)}" 

1652 ) 

1653 

1654 def _abort(self): 

1655 # Stop dispatching new jobs in the async callback thread 

1656 self._aborting = True 

1657 

1658 # If the backend allows it, cancel or kill remaining running 

1659 # tasks without waiting for the results as we will raise 

1660 # the exception we got back to the caller instead of returning 

1661 # any result. 

1662 backend = self._backend 

1663 if not self._aborted and hasattr(backend, "abort_everything"): 

1664 # If the backend is managed externally we need to make sure 

1665 # to leave it in a working state to allow for future jobs 

1666 # scheduling. 

1667 ensure_ready = self._managed_backend 

1668 backend.abort_everything(ensure_ready=ensure_ready) 

1669 self._aborted = True 

1670 

1671 def _start(self, iterator, pre_dispatch): 

1672 # Only set self._iterating to True if at least a batch 

1673 # was dispatched. In particular this covers the edge 

1674 # case of Parallel used with an exhausted iterator. If 

1675 # self._original_iterator is None, then this means either 

1676 # that pre_dispatch == "all", n_jobs == 1 or that the first batch 

1677 # was very quick and its callback already dispatched all the 

1678 # remaining jobs. 

1679 self._iterating = False 

1680 if self.dispatch_one_batch(iterator): 

1681 self._iterating = self._original_iterator is not None 

1682 

1683 while self.dispatch_one_batch(iterator): 

1684 pass 

1685 

1686 if pre_dispatch == "all": 

1687 # The iterable was consumed all at once by the above for loop. 

1688 # No need to wait for async callbacks to trigger to 

1689 # consumption. 

1690 self._iterating = False 

1691 

1692 def _get_outputs(self, iterator, pre_dispatch): 

1693 """Iterator returning the tasks' output as soon as they are ready.""" 

1694 dispatch_thread_id = threading.get_ident() 

1695 detach_generator_exit = False 

1696 try: 

1697 self._start(iterator, pre_dispatch) 

1698 # first yield returns None, for internal use only. This ensures 

1699 # that we enter the try/except block and start dispatching the 

1700 # tasks. 

1701 yield 

1702 

1703 with self._backend.retrieval_context(): 

1704 yield from self._retrieve() 

1705 

1706 except GeneratorExit: 

1707 # The generator has been garbage collected before being fully 

1708 # consumed. This aborts the remaining tasks if possible and warn 

1709 # the user if necessary. 

1710 self._exception = True 

1711 

1712 # In some interpreters such as PyPy, GeneratorExit can be raised in 

1713 # a different thread than the one used to start the dispatch of the 

1714 # parallel tasks. This can lead to hang when a thread attempts to 

1715 # join itself. As workaround, we detach the execution of the 

1716 # aborting code to a dedicated thread. We then need to make sure 

1717 # the rest of the function does not call `_terminate_and_reset` 

1718 # in finally. 

1719 if dispatch_thread_id != threading.get_ident(): 

1720 warnings.warn( 

1721 "A generator produced by joblib.Parallel has been " 

1722 "gc'ed in an unexpected thread. This behavior should " 

1723 "not cause major -issues but to make sure, please " 

1724 "report this warning and your use case at " 

1725 "https://github.com/joblib/joblib/issues so it can " 

1726 "be investigated." 

1727 ) 

1728 

1729 detach_generator_exit = True 

1730 _parallel = self 

1731 

1732 class _GeneratorExitThread(threading.Thread): 

1733 def run(self): 

1734 _parallel._abort() 

1735 if _parallel.return_generator: 

1736 _parallel._warn_exit_early() 

1737 _parallel._terminate_and_reset() 

1738 

1739 _GeneratorExitThread(name="GeneratorExitThread").start() 

1740 return 

1741 

1742 # Otherwise, we are in the thread that started the dispatch: we can 

1743 # safely abort the execution and warn the user. 

1744 self._abort() 

1745 if self.return_generator: 

1746 self._warn_exit_early() 

1747 

1748 raise 

1749 

1750 # Note: we catch any BaseException instead of just Exception instances 

1751 # to also include KeyboardInterrupt 

1752 except BaseException: 

1753 self._exception = True 

1754 self._abort() 

1755 raise 

1756 finally: 

1757 # Store the unconsumed tasks and terminate the workers if necessary 

1758 _remaining_outputs = [] if self._exception else self._jobs 

1759 self._jobs = collections.deque() 

1760 self._jobs_set = set() 

1761 self._running = False 

1762 if not detach_generator_exit: 

1763 self._terminate_and_reset() 

1764 

1765 while len(_remaining_outputs) > 0: 

1766 batched_results = _remaining_outputs.popleft() 

1767 batched_results = batched_results.get_result(self.timeout) 

1768 for result in batched_results: 

1769 yield result 

1770 

1771 def _wait_retrieval(self): 

1772 """Return True if we need to continue retrieving some tasks.""" 

1773 

1774 # If the input load is still being iterated over, it means that tasks 

1775 # are still on the dispatch waitlist and their results will need to 

1776 # be retrieved later on. 

1777 if self._iterating: 

1778 return True 

1779 

1780 # If some of the dispatched tasks are still being processed by the 

1781 # workers, wait for the compute to finish before starting retrieval 

1782 if self.n_completed_tasks < self.n_dispatched_tasks: 

1783 return True 

1784 

1785 # For backends that does not support retrieving asynchronously the 

1786 # result to the main process, all results must be carefully retrieved 

1787 # in the _retrieve loop in the main thread while the backend is alive. 

1788 # For other backends, the actual retrieval is done asynchronously in 

1789 # the callback thread, and we can terminate the backend before the 

1790 # `self._jobs` result list has been emptied. The remaining results 

1791 # will be collected in the `finally` step of the generator. 

1792 if not self._backend.supports_retrieve_callback: 

1793 if len(self._jobs) > 0: 

1794 return True 

1795 

1796 return False 

1797 

1798 def _retrieve(self): 

1799 timeout_control_job = None 

1800 while self._wait_retrieval(): 

1801 # If the callback thread of a worker has signaled that its task 

1802 # triggered an exception, or if the retrieval loop has raised an 

1803 # exception (e.g. `GeneratorExit`), exit the loop and surface the 

1804 # worker traceback. 

1805 if self._aborting: 

1806 self._raise_error_fast() 

1807 break 

1808 

1809 nb_jobs = len(self._jobs) 

1810 # Now wait for a job to be ready for retrieval. 

1811 if self.return_ordered: 

1812 # Case ordered: wait for completion (or error) of the next job 

1813 # that have been dispatched and not retrieved yet. If no job 

1814 # have been dispatched yet, wait for dispatch. 

1815 # We assume that the time to wait for the next job to be 

1816 # dispatched is always low, so that the timeout 

1817 # control only have to be done on the amount of time the next 

1818 # dispatched job is pending. 

1819 if (nb_jobs == 0) or ( 

1820 self._jobs[0].get_status(timeout=self.timeout) == TASK_PENDING 

1821 ): 

1822 time.sleep(0.01) 

1823 continue 

1824 

1825 elif nb_jobs == 0: 

1826 # Case unordered: jobs are added to the list of jobs to 

1827 # retrieve `self._jobs` only once completed or in error, which 

1828 # is too late to enable timeout control in the same way than in 

1829 # the previous case. 

1830 # Instead, if no job is ready to be retrieved yet, we 

1831 # arbitrarily pick a dispatched job, and the timeout control is 

1832 # done such that an error is raised if this control job 

1833 # timeouts before any other dispatched job has completed and 

1834 # been added to `self._jobs` to be retrieved. 

1835 if timeout_control_job is None: 

1836 timeout_control_job = next(iter(self._jobs_set), None) 

1837 

1838 # NB: it can be None if no job has been dispatched yet. 

1839 if timeout_control_job is not None: 

1840 timeout_control_job.get_status(timeout=self.timeout) 

1841 

1842 time.sleep(0.01) 

1843 continue 

1844 

1845 elif timeout_control_job is not None: 

1846 # Case unordered, when `nb_jobs > 0`: 

1847 # It means that a job is ready to be retrieved, so no timeout 

1848 # will occur during this iteration. 

1849 # Before proceeding to retrieval of the next ready job, reset 

1850 # the timeout control state to prepare the next iteration. 

1851 timeout_control_job._completion_timeout_counter = None 

1852 timeout_control_job = None 

1853 

1854 # We need to be careful: the job list can be filling up as 

1855 # we empty it and Python list are not thread-safe by 

1856 # default hence the use of the lock 

1857 with self._lock: 

1858 batched_results = self._jobs.popleft() 

1859 if not self.return_ordered: 

1860 self._jobs_set.remove(batched_results) 

1861 

1862 # Flatten the batched results to output one output at a time 

1863 batched_results = batched_results.get_result(self.timeout) 

1864 for result in batched_results: 

1865 self._nb_consumed += 1 

1866 yield result 

1867 

1868 def _raise_error_fast(self): 

1869 """If we are aborting, raise if a job caused an error.""" 

1870 

1871 # Find the first job whose status is TASK_ERROR if it exists. 

1872 with self._lock: 

1873 error_job = next( 

1874 (job for job in self._jobs if job.status == TASK_ERROR), None 

1875 ) 

1876 

1877 # If this error job exists, immediately raise the error by 

1878 # calling get_result. This job might not exists if abort has been 

1879 # called directly or if the generator is gc'ed. 

1880 if error_job is not None: 

1881 error_job.get_result(self.timeout) 

1882 

1883 def _warn_exit_early(self): 

1884 """Warn the user if the generator is gc'ed before being consumned.""" 

1885 ready_outputs = self.n_completed_tasks - self._nb_consumed 

1886 is_completed = self._is_completed() 

1887 msg = "" 

1888 if ready_outputs: 

1889 msg += ( 

1890 f"{ready_outputs} tasks have been successfully executed but not used." 

1891 ) 

1892 if not is_completed: 

1893 msg += " Additionally, " 

1894 

1895 if not is_completed: 

1896 msg += ( 

1897 f"{self.n_dispatched_tasks - self.n_completed_tasks} tasks " 

1898 "which were still being processed by the workers have been " 

1899 "cancelled." 

1900 ) 

1901 

1902 if msg: 

1903 msg += ( 

1904 " You could benefit from adjusting the input task " 

1905 "iterator to limit unnecessary computation time." 

1906 ) 

1907 

1908 warnings.warn(msg) 

1909 

1910 def _get_sequential_output(self, iterable): 

1911 """Separate loop for sequential output. 

1912 

1913 This simplifies the traceback in case of errors and reduces the 

1914 overhead of calling sequential tasks with `joblib`. 

1915 """ 

1916 try: 

1917 self._iterating = True 

1918 self._original_iterator = iterable 

1919 batch_size = self._get_batch_size() 

1920 

1921 if batch_size != 1: 

1922 it = iter(iterable) 

1923 iterable_batched = iter( 

1924 lambda: tuple(itertools.islice(it, batch_size)), () 

1925 ) 

1926 iterable = (task for batch in iterable_batched for task in batch) 

1927 

1928 # first yield returns None, for internal use only. This ensures 

1929 # that we enter the try/except block and setup the generator. 

1930 yield None 

1931 

1932 # Sequentially call the tasks and yield the results. 

1933 for func, args, kwargs in iterable: 

1934 self.n_dispatched_batches += 1 

1935 self.n_dispatched_tasks += 1 

1936 res = func(*args, **kwargs) 

1937 self.n_completed_tasks += 1 

1938 self.print_progress() 

1939 yield res 

1940 self._nb_consumed += 1 

1941 except BaseException: 

1942 self._exception = True 

1943 self._aborting = True 

1944 self._aborted = True 

1945 raise 

1946 finally: 

1947 self._running = False 

1948 self._iterating = False 

1949 self._original_iterator = None 

1950 self.print_progress() 

1951 

1952 def _reset_run_tracking(self): 

1953 """Reset the counters and flags used to track the execution.""" 

1954 

1955 # Makes sur the parallel instance was not previously running in a 

1956 # thread-safe way. 

1957 with getattr(self, "_lock", nullcontext()): 

1958 if self._running: 

1959 msg = "This Parallel instance is already running !" 

1960 if self.return_generator is True: 

1961 msg += ( 

1962 " Before submitting new tasks, you must wait for the " 

1963 "completion of all the previous tasks, or clean all " 

1964 "references to the output generator." 

1965 ) 

1966 raise RuntimeError(msg) 

1967 self._running = True 

1968 

1969 # Counter to keep track of the task dispatched and completed. 

1970 self.n_dispatched_batches = 0 

1971 self.n_dispatched_tasks = 0 

1972 self.n_completed_tasks = 0 

1973 

1974 # Following count is incremented by one each time the user iterates 

1975 # on the output generator, it is used to prepare an informative 

1976 # warning message in case the generator is deleted before all the 

1977 # dispatched tasks have been consumed. 

1978 self._nb_consumed = 0 

1979 

1980 # Following flags are used to synchronize the threads in case one of 

1981 # the tasks error-out to ensure that all workers abort fast and that 

1982 # the backend terminates properly. 

1983 

1984 # Set to True as soon as a worker signals that a task errors-out 

1985 self._exception = False 

1986 # Set to True in case of early termination following an incident 

1987 self._aborting = False 

1988 # Set to True after abortion is complete 

1989 self._aborted = False 

1990 

1991 def __call__(self, iterable): 

1992 """Main function to dispatch parallel tasks.""" 

1993 

1994 self._reset_run_tracking() 

1995 self.n_tasks = len(iterable) if hasattr(iterable, "__len__") else None 

1996 self._start_time = time.time() 

1997 

1998 if not self._managed_backend: 

1999 n_jobs = self._initialize_backend() 

2000 else: 

2001 n_jobs = self._effective_n_jobs() 

2002 

2003 if n_jobs == 1: 

2004 # If n_jobs==1, run the computation sequentially and return 

2005 # immediately to avoid overheads. 

2006 output = self._get_sequential_output(iterable) 

2007 next(output) 

2008 return output if self.return_generator else list(output) 

2009 

2010 # Let's create an ID that uniquely identifies the current call. If the 

2011 # call is interrupted early and that the same instance is immediately 

2012 # reused, this id will be used to prevent workers that were 

2013 # concurrently finalizing a task from the previous call to run the 

2014 # callback. 

2015 with self._lock: 

2016 self._call_id = uuid4().hex 

2017 

2018 # self._effective_n_jobs should be called in the Parallel.__call__ 

2019 # thread only -- store its value in an attribute for further queries. 

2020 self._cached_effective_n_jobs = n_jobs 

2021 

2022 if isinstance(self._backend, LokyBackend): 

2023 # For the loky backend, we add a callback executed when reducing 

2024 # BatchCalls, that makes the loky executor use a temporary folder 

2025 # specific to this Parallel object when pickling temporary memmaps. 

2026 # This callback is necessary to ensure that several Parallel 

2027 # objects using the same reusable executor don't use the same 

2028 # temporary resources. 

2029 

2030 def _batched_calls_reducer_callback(): 

2031 # Relevant implementation detail: the following lines, called 

2032 # when reducing BatchedCalls, are called in a thread-safe 

2033 # situation, meaning that the context of the temporary folder 

2034 # manager will not be changed in between the callback execution 

2035 # and the end of the BatchedCalls pickling. The reason is that 

2036 # pickling (the only place where set_current_context is used) 

2037 # is done from a single thread (the queue_feeder_thread). 

2038 self._backend._workers._temp_folder_manager.set_current_context( # noqa 

2039 self._id 

2040 ) 

2041 

2042 self._reducer_callback = _batched_calls_reducer_callback 

2043 

2044 # self._effective_n_jobs should be called in the Parallel.__call__ 

2045 # thread only -- store its value in an attribute for further queries. 

2046 self._cached_effective_n_jobs = n_jobs 

2047 

2048 backend_name = self._backend.__class__.__name__ 

2049 if n_jobs == 0: 

2050 raise RuntimeError("%s has no active worker." % backend_name) 

2051 

2052 self._print(f"Using backend {backend_name} with {n_jobs} concurrent workers.") 

2053 if hasattr(self._backend, "start_call"): 

2054 self._backend.start_call() 

2055 

2056 # Following flag prevents double calls to `backend.stop_call`. 

2057 self._calling = True 

2058 

2059 iterator = iter(iterable) 

2060 pre_dispatch = self.pre_dispatch 

2061 

2062 if pre_dispatch == "all": 

2063 # prevent further dispatch via multiprocessing callback thread 

2064 self._original_iterator = None 

2065 self._pre_dispatch_amount = 0 

2066 else: 

2067 self._original_iterator = iterator 

2068 if hasattr(pre_dispatch, "endswith"): 

2069 pre_dispatch = eval_expr(pre_dispatch.replace("n_jobs", str(n_jobs))) 

2070 self._pre_dispatch_amount = pre_dispatch = int(pre_dispatch) 

2071 

2072 # The main thread will consume the first pre_dispatch items and 

2073 # the remaining items will later be lazily dispatched by async 

2074 # callbacks upon task completions. 

2075 

2076 # TODO: this iterator should be batch_size * n_jobs 

2077 iterator = itertools.islice(iterator, self._pre_dispatch_amount) 

2078 

2079 # Use a caching dict for callables that are pickled with cloudpickle to 

2080 # improve performances. This cache is used only in the case of 

2081 # functions that are defined in the __main__ module, functions that 

2082 # are defined locally (inside another function) and lambda expressions. 

2083 self._pickle_cache = dict() 

2084 

2085 output = self._get_outputs(iterator, pre_dispatch) 

2086 self._call_ref = weakref.ref(output) 

2087 

2088 # The first item from the output is blank, but it makes the interpreter 

2089 # progress until it enters the Try/Except block of the generator and 

2090 # reaches the first `yield` statement. This starts the asynchronous 

2091 # dispatch of the tasks to the workers. 

2092 next(output) 

2093 

2094 return output if self.return_generator else list(output) 

2095 

2096 def __repr__(self): 

2097 return "%s(n_jobs=%s)" % (self.__class__.__name__, self.n_jobs)