Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/parallel.py: 16%

1""" 

2Helpers for embarrassingly parallel code. 

3""" 

4# Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org > 

5# Copyright: 2010, Gael Varoquaux 

6# License: BSD 3 clause 

7 

8from __future__ import division 

9 

10import os 

11import sys 

12from math import sqrt 

13import functools 

14import collections 

15import time 

16import threading 

17import itertools 

18from uuid import uuid4 

19from numbers import Integral 

20import warnings 

21import queue 

22import weakref 

23from contextlib import nullcontext 

24 

25from multiprocessing import TimeoutError 

26 

27from ._multiprocessing_helpers import mp 

28 

29from .logger import Logger, short_format_time 

30from .disk import memstr_to_bytes 

31from ._parallel_backends import (FallbackToBackend, MultiprocessingBackend, 

32 ThreadingBackend, SequentialBackend, 

33 LokyBackend) 

34from ._utils import eval_expr, _Sentinel 

35 

36# Make sure that those two classes are part of the public joblib.parallel API 

37# so that 3rd party backend implementers can import them from here. 

38from ._parallel_backends import AutoBatchingMixin # noqa 

39from ._parallel_backends import ParallelBackendBase # noqa 

40 

41 

42IS_PYPY = hasattr(sys, "pypy_version_info") 

43 

44 

45BACKENDS = { 

46 'threading': ThreadingBackend, 

47 'sequential': SequentialBackend, 

48} 

49# name of the backend used by default by Parallel outside of any context 

50# managed by ``parallel_config`` or ``parallel_backend``. 

51 

52# threading is the only backend that is always everywhere 

53DEFAULT_BACKEND = 'threading' 

54 

55MAYBE_AVAILABLE_BACKENDS = {'multiprocessing', 'loky'} 

56 

57# if multiprocessing is available, so is loky, we set it as the default 

58# backend 

59if mp is not None: 

60 BACKENDS['multiprocessing'] = MultiprocessingBackend 

61 from .externals import loky 

62 BACKENDS['loky'] = LokyBackend 

63 DEFAULT_BACKEND = 'loky' 

64 

65 

66DEFAULT_THREAD_BACKEND = 'threading' 

67 

68 

69# Thread local value that can be overridden by the ``parallel_config`` context 

70# manager 

71_backend = threading.local() 

72 

73 

74def _register_dask(): 

75 """Register Dask Backend if called with parallel_config(backend="dask")""" 

76 try: 

77 from ._dask import DaskDistributedBackend 

78 register_parallel_backend('dask', DaskDistributedBackend) 

79 except ImportError as e: 

80 msg = ("To use the dask.distributed backend you must install both " 

81 "the `dask` and distributed modules.\n\n" 

82 "See https://dask.pydata.org/en/latest/install.html for more " 

83 "information.") 

84 raise ImportError(msg) from e 

85 

86 

87EXTERNAL_BACKENDS = { 

88 'dask': _register_dask, 

89} 
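

# Usage sketch (illustrative only, not part of the joblib API): selecting the
# 'dask' backend triggers the lazy registration above. It assumes the optional
# `dask` and `distributed` packages are installed; `_example_dask_backend` is
# a hypothetical helper and the local cluster is started only for demonstration.
def _example_dask_backend():
    from joblib import Parallel, delayed, parallel_config
    from dask.distributed import Client

    client = Client(n_workers=2)               # small local cluster (assumption)
    try:
        with parallel_config(backend="dask"):  # registered on demand through
            # EXTERNAL_BACKENDS['dask'] -> _register_dask()
            return Parallel()(delayed(pow)(i, 2) for i in range(8))
    finally:
        client.close()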



# Sentinels for the default values of the Parallel constructor and
# the parallel_config and parallel_backend context managers
default_parallel_config = {
    "backend": _Sentinel(default_value=None),
    "n_jobs": _Sentinel(default_value=None),
    "verbose": _Sentinel(default_value=0),
    "temp_folder": _Sentinel(default_value=None),
    "max_nbytes": _Sentinel(default_value="1M"),
    "mmap_mode": _Sentinel(default_value="r"),
    "prefer": _Sentinel(default_value=None),
    "require": _Sentinel(default_value=None),
}


VALID_BACKEND_HINTS = ('processes', 'threads', None)
VALID_BACKEND_CONSTRAINTS = ('sharedmem', None)


def _get_config_param(param, context_config, key):
    """Return the value of a parallel config parameter

    Explicitly setting it in Parallel has priority over setting in a
    parallel_(config/backend) context manager.
    """
    if param is not default_parallel_config[key]:
        # param is explicitly set, return it
        return param

    if context_config[key] is not default_parallel_config[key]:
        # there's a context manager and the key is set, return it
        return context_config[key]

    # Otherwise, we are in the default_parallel_config,
    # return the default value
    return param.default_value
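

# Illustrative sketch (not part of the module): the resolution order
# implemented above, checked on the "verbose" entry. `_example_config_priority`
# is a hypothetical helper added here purely for illustration.
def _example_config_priority():
    unset = default_parallel_config["verbose"]
    # 1) a value explicitly passed to Parallel wins over everything
    assert _get_config_param(5, {"verbose": 11}, "verbose") == 5
    # 2) unset in Parallel, set by a context manager: the context value wins
    assert _get_config_param(unset, {"verbose": 11}, "verbose") == 11
    # 3) unset everywhere: fall back to the sentinel's default_value (0 here)
    assert _get_config_param(unset, default_parallel_config, "verbose") == 0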



def get_active_backend(
    prefer=default_parallel_config["prefer"],
    require=default_parallel_config["require"],
    verbose=default_parallel_config["verbose"],
):
    """Return the active default backend"""
    backend, config = _get_active_backend(prefer, require, verbose)
    n_jobs = _get_config_param(
        default_parallel_config['n_jobs'], config, "n_jobs"
    )
    return backend, n_jobs


def _get_active_backend(
    prefer=default_parallel_config["prefer"],
    require=default_parallel_config["require"],
    verbose=default_parallel_config["verbose"],
):
    """Return the active default backend"""

    backend_config = getattr(_backend, "config", default_parallel_config)

    backend = _get_config_param(
        default_parallel_config['backend'], backend_config, "backend"
    )
    prefer = _get_config_param(prefer, backend_config, "prefer")
    require = _get_config_param(require, backend_config, "require")
    verbose = _get_config_param(verbose, backend_config, "verbose")

    if prefer not in VALID_BACKEND_HINTS:
        raise ValueError(
            f"prefer={prefer} is not a valid backend hint, "
            f"expected one of {VALID_BACKEND_HINTS}"
        )
    if require not in VALID_BACKEND_CONSTRAINTS:
        raise ValueError(
            f"require={require} is not a valid backend constraint, "
            f"expected one of {VALID_BACKEND_CONSTRAINTS}"
        )
    if prefer == 'processes' and require == 'sharedmem':
        raise ValueError(
            "prefer == 'processes' and require == 'sharedmem'"
            " are inconsistent settings"
        )

    explicit_backend = True
    if backend is None:

        # We are either outside of the scope of any parallel_(config/backend)
        # context manager or the context manager did not set a backend.
        # create the default backend instance now.
        backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
        explicit_backend = False

    # Try to use the backend set by the user with the context manager.

    nesting_level = backend.nesting_level
    uses_threads = getattr(backend, 'uses_threads', False)
    supports_sharedmem = getattr(backend, 'supports_sharedmem', False)
    # Force to use thread-based backend if the provided backend does not
    # match the shared memory constraint or if the backend is not explicitly
    # given and threads are preferred.
    force_threads = (require == 'sharedmem' and not supports_sharedmem)
    force_threads |= (
        not explicit_backend and prefer == 'threads' and not uses_threads
    )
    if force_threads:
        # This backend does not match the shared memory constraint:
        # fallback to the default thread-based backend.
        sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND](
            nesting_level=nesting_level
        )
        # Warn the user if we forced the backend to thread-based, while the
        # user explicitly specified a non-thread-based backend.
        if verbose >= 10 and explicit_backend:
            print(
                f"Using {sharedmem_backend.__class__.__name__} as "
                f"joblib backend instead of {backend.__class__.__name__} "
                "as the latter does not provide shared memory semantics."
            )
        # Force to n_jobs=1 by default
        thread_config = backend_config.copy()
        thread_config['n_jobs'] = 1
        return sharedmem_backend, thread_config

    return backend, backend_config
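

# Illustrative sketch (not part of the module): the constraint handling above
# in practice. `_example_sharedmem_fallback` is a hypothetical helper.
def _example_sharedmem_fallback():
    # With the default process-based ('loky') backend active, requiring shared
    # memory falls back to a ThreadingBackend and forces n_jobs to 1.
    backend, config = _get_active_backend(require="sharedmem")
    print(type(backend).__name__, config["n_jobs"])   # ThreadingBackend 1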



class parallel_config:
    """Set the default backend or configuration for :class:`~joblib.Parallel`.

    This is an alternative to directly passing keyword arguments to the
    :class:`~joblib.Parallel` class constructor. It is particularly useful when
    calling into library code that uses joblib internally but does not expose
    the various parallel configuration arguments in its own API.

    Parameters
    ----------
    backend: str or ParallelBackendBase instance, default=None
        If ``backend`` is a string it must match a previously registered
        implementation using the :func:`~register_parallel_backend` function.

        By default the following backends are available:

        - 'loky': single-host, process-based parallelism (used by default),
        - 'threading': single-host, thread-based parallelism,
        - 'multiprocessing': legacy single-host, process-based parallelism.

        'loky' is recommended to run functions that manipulate Python objects.
        'threading' is a low-overhead alternative that is most efficient for
        functions that release the Global Interpreter Lock: e.g. I/O-bound
        code or CPU-bound code in a few calls to native code that explicitly
        releases the GIL. Note that on some rare systems (such as pyodide),
        multiprocessing and loky may not be available, in which case joblib
        defaults to threading.

        In addition, if the ``dask`` and ``distributed`` Python packages are
        installed, it is possible to use the 'dask' backend for better
        scheduling of nested parallel calls without over-subscription and
        potentially distribute parallel calls over a networked cluster of
        several hosts.

        It is also possible to use the distributed 'ray' backend for
        distributing the workload to a cluster of nodes. See more details
        in the Examples section below.

        Alternatively the backend can be passed directly as an instance.

    n_jobs: int, default=None
        The maximum number of concurrently running jobs, such as the number
        of Python worker processes when ``backend="loky"`` or the size of the
        thread-pool when ``backend="threading"``.
        This argument is converted to an integer, rounded below for float.
        If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
        ``n_cpus`` is obtained with :func:`~cpu_count`.
        For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
        using ``n_jobs=-2`` will result in all CPUs but one being used.
        This argument can also go above ``n_cpus``, which will cause
        oversubscription. In some cases, slight oversubscription can be
        beneficial, e.g., for tasks with large I/O operations.
        If 1 is given, no parallel computing code is used at all, and the
        behavior amounts to a simple python `for` loop. This mode is not
        compatible with `timeout`.
        None is a marker for 'unset' that will be interpreted as n_jobs=1
        unless the call is performed under a :func:`~parallel_config`
        context manager that sets another value for ``n_jobs``.
        If n_jobs = 0 then a ValueError is raised.

    verbose: int, default=0
        The verbosity level: if non zero, progress messages are
        printed. Above 50, the output is sent to stdout.
        The frequency of the messages increases with the verbosity level.
        If it is more than 10, all iterations are reported.

    temp_folder: str or None, default=None
        Folder to be used by the pool for memmapping large arrays
        for sharing memory with worker processes. If None, this will try in
        order:

        - a folder pointed by the ``JOBLIB_TEMP_FOLDER`` environment
          variable,
        - ``/dev/shm`` if the folder exists and is writable: this is a
          RAM disk filesystem available by default on modern Linux
          distributions,
        - the default system temporary folder that can be
          overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment
          variables, typically ``/tmp`` under Unix operating systems.

    max_nbytes: int, str, or None, optional, default='1M'
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder. Can be an int
        in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
        Use None to disable memmapping of large arrays.

    mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
        Memmapping mode for numpy arrays passed to workers. None will
        disable memmapping, other modes defined in the numpy.memmap doc:
        https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
        Also, see 'max_nbytes' parameter documentation for more details.

    prefer: str in {'processes', 'threads'} or None, default=None
        Soft hint to choose the default backend.
        The default process-based backend is 'loky' and the default
        thread-based backend is 'threading'. Ignored if the ``backend``
        parameter is specified.

    require: 'sharedmem' or None, default=None
        Hard constraint to select the backend. If set to 'sharedmem',
        the selected backend will be single-host and thread-based.

    inner_max_num_threads: int, default=None
        If not None, overwrites the limit set on the number of threads
        usable in some third-party library threadpools like OpenBLAS,
        MKL or OpenMP. This is only used with the ``loky`` backend.

    backend_params: dict
        Additional parameters to pass to the backend constructor when
        backend is a string.

    Notes
    -----
    Joblib tries to limit the oversubscription by limiting the number of
    threads usable in some third-party library threadpools like OpenBLAS, MKL
    or OpenMP. The default limit in each worker is set to
    ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
    overwritten with the ``inner_max_num_threads`` argument which will be used
    to set this limit in the child processes.

    .. versionadded:: 1.3

    Examples
    --------
    >>> from operator import neg
    >>> with parallel_config(backend='threading'):
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    ...
    [-1, -2, -3, -4, -5]

    To use the 'ray' joblib backend add the following lines:

    >>> from ray.util.joblib import register_ray  # doctest: +SKIP
    >>> register_ray()  # doctest: +SKIP
    >>> with parallel_config(backend="ray"):  # doctest: +SKIP
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    [-1, -2, -3, -4, -5]

    """
    def __init__(
        self,
        backend=default_parallel_config["backend"],
        *,
        n_jobs=default_parallel_config["n_jobs"],
        verbose=default_parallel_config["verbose"],
        temp_folder=default_parallel_config["temp_folder"],
        max_nbytes=default_parallel_config["max_nbytes"],
        mmap_mode=default_parallel_config["mmap_mode"],
        prefer=default_parallel_config["prefer"],
        require=default_parallel_config["require"],
        inner_max_num_threads=None,
        **backend_params
    ):
        # Save the parallel info and set the active parallel config
        self.old_parallel_config = getattr(
            _backend, "config", default_parallel_config
        )

        backend = self._check_backend(
            backend, inner_max_num_threads, **backend_params
        )

        new_config = {
            "n_jobs": n_jobs,
            "verbose": verbose,
            "temp_folder": temp_folder,
            "max_nbytes": max_nbytes,
            "mmap_mode": mmap_mode,
            "prefer": prefer,
            "require": require,
            "backend": backend
        }
        self.parallel_config = self.old_parallel_config.copy()
        self.parallel_config.update({
            k: v for k, v in new_config.items()
            if not isinstance(v, _Sentinel)
        })

        setattr(_backend, "config", self.parallel_config)

    def _check_backend(self, backend, inner_max_num_threads, **backend_params):
        if backend is default_parallel_config['backend']:
            if inner_max_num_threads is not None or len(backend_params) > 0:
                raise ValueError(
                    "inner_max_num_threads and other constructor "
                    "parameters backend_params are only supported "
                    "when backend is not None."
                )
            return backend

        if isinstance(backend, str):
            # Handle non-registered or missing backends
            if backend not in BACKENDS:
                if backend in EXTERNAL_BACKENDS:
                    register = EXTERNAL_BACKENDS[backend]
                    register()
                elif backend in MAYBE_AVAILABLE_BACKENDS:
                    warnings.warn(
                        f"joblib backend '{backend}' is not available on "
                        f"your system, falling back to {DEFAULT_BACKEND}.",
                        UserWarning,
                        stacklevel=2
                    )
                    BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
                else:
                    raise ValueError(
                        f"Invalid backend: {backend}, expected one of "
                        f"{sorted(BACKENDS.keys())}"
                    )

            backend = BACKENDS[backend](**backend_params)

        if inner_max_num_threads is not None:
            msg = (
                f"{backend.__class__.__name__} does not accept setting the "
                "inner_max_num_threads argument."
            )
            assert backend.supports_inner_max_num_threads, msg
            backend.inner_max_num_threads = inner_max_num_threads

        # If the nesting_level of the backend is not set previously, use the
        # nesting level from the previous active_backend to set it
        if backend.nesting_level is None:
            parent_backend = self.old_parallel_config['backend']
            if parent_backend is default_parallel_config['backend']:
                nesting_level = 0
            else:
                nesting_level = parent_backend.nesting_level
            backend.nesting_level = nesting_level

        return backend

    def __enter__(self):
        return self.parallel_config

    def __exit__(self, type, value, traceback):
        self.unregister()

    def unregister(self):
        setattr(_backend, "config", self.old_parallel_config)
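

# Illustrative sketch (not part of the module) of the intended usage pattern:
# library code avoids hard-coding a backend and callers tune it from the
# outside with parallel_config. `_square` and `_example_library_call` are
# hypothetical names used only for this sketch.
def _square(v):
    return v * v


def _example_library_call(values):
    # "library" side: only a soft hint, no hard-coded backend
    from joblib import Parallel, delayed
    return sum(Parallel(prefer="threads")(delayed(_square)(v) for v in values))

# "caller" side, for instance:
#     with parallel_config(backend="loky", n_jobs=2):
#         total = _example_library_call(range(10))   # 285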



class parallel_backend(parallel_config):
    """Change the default backend used by Parallel inside a with block.

    .. warning::
        It is advised to use the :class:`~joblib.parallel_config` context
        manager instead, which allows more fine-grained control over the
        backend configuration.

    If ``backend`` is a string it must match a previously registered
    implementation using the :func:`~register_parallel_backend` function.

    By default the following backends are available:

    - 'loky': single-host, process-based parallelism (used by default),
    - 'threading': single-host, thread-based parallelism,
    - 'multiprocessing': legacy single-host, process-based parallelism.

    'loky' is recommended to run functions that manipulate Python objects.
    'threading' is a low-overhead alternative that is most efficient for
    functions that release the Global Interpreter Lock: e.g. I/O-bound code or
    CPU-bound code in a few calls to native code that explicitly releases the
    GIL. Note that on some rare systems (such as Pyodide),
    multiprocessing and loky may not be available, in which case joblib
    defaults to threading.

    You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib
    backend to distribute work across machines. This works well with
    scikit-learn estimators with the ``n_jobs`` parameter, for example::

    >>> import joblib  # doctest: +SKIP
    >>> from sklearn.model_selection import GridSearchCV  # doctest: +SKIP
    >>> from dask.distributed import Client, LocalCluster  # doctest: +SKIP

    >>> # create a local Dask cluster
    >>> cluster = LocalCluster()  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1)
    ... # doctest: +SKIP
    >>> with joblib.parallel_backend("dask", scatter=[X, y]):  # doctest: +SKIP
    ...     grid_search.fit(X, y)

    It is also possible to use the distributed 'ray' backend for distributing
    the workload to a cluster of nodes. To use the 'ray' joblib backend add
    the following lines::

    >>> from ray.util.joblib import register_ray  # doctest: +SKIP
    >>> register_ray()  # doctest: +SKIP
    >>> with parallel_backend("ray"):  # doctest: +SKIP
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    [-1, -2, -3, -4, -5]

    Alternatively the backend can be passed directly as an instance.

    By default all available workers will be used (``n_jobs=-1``) unless the
    caller passes an explicit value for the ``n_jobs`` parameter.

    This is an alternative to passing a ``backend='backend_name'`` argument to
    the :class:`~Parallel` class constructor. It is particularly useful when
    calling into library code that uses joblib internally but does not expose
    the backend argument in its own API.

    >>> from operator import neg
    >>> with parallel_backend('threading'):
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    ...
    [-1, -2, -3, -4, -5]

    Joblib also tries to limit the oversubscription by limiting the number of
    threads usable in some third-party library threadpools like OpenBLAS, MKL
    or OpenMP. The default limit in each worker is set to
    ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
    overwritten with the ``inner_max_num_threads`` argument which will be used
    to set this limit in the child processes.

    .. versionadded:: 0.10

    See Also
    --------
    joblib.parallel_config: context manager to change the backend
        configuration.
    """
    def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
                 **backend_params):

        super().__init__(
            backend=backend,
            n_jobs=n_jobs,
            inner_max_num_threads=inner_max_num_threads,
            **backend_params
        )

        if self.old_parallel_config is None:
            self.old_backend_and_jobs = None
        else:
            self.old_backend_and_jobs = (
                self.old_parallel_config["backend"],
                self.old_parallel_config["n_jobs"],
            )
        self.new_backend_and_jobs = (
            self.parallel_config["backend"],
            self.parallel_config["n_jobs"],
        )

    def __enter__(self):
        return self.new_backend_and_jobs



# Under Linux or OS X the default start method of multiprocessing
# can cause third party libraries to crash. Under Python 3.4+ it is possible
# to set an environment variable to switch the default start method from
# 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost
# of causing semantic changes and some additional pool instantiation overhead.
DEFAULT_MP_CONTEXT = None
if hasattr(mp, 'get_context'):
    method = os.environ.get('JOBLIB_START_METHOD', '').strip() or None
    if method is not None:
        DEFAULT_MP_CONTEXT = mp.get_context(method=method)
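
# Usage sketch (illustrative only): the JOBLIB_START_METHOD environment
# variable has to be set before joblib is imported, e.g. in a user script:
#
#     import os
#     os.environ["JOBLIB_START_METHOD"] = "forkserver"   # or "spawn"
#     from joblib import Parallel, delayed                # imported afterwards
#
# 'forkserver' and 'spawn' are standard multiprocessing start method names;
# which one is appropriate depends on the platform and the libraries in use.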



class BatchedCalls(object):
    """Wrap a sequence of (func, args, kwargs) tuples as a single callable"""

    def __init__(self, iterator_slice, backend_and_jobs, reducer_callback=None,
                 pickle_cache=None):
        self.items = list(iterator_slice)
        self._size = len(self.items)
        self._reducer_callback = reducer_callback
        if isinstance(backend_and_jobs, tuple):
            self._backend, self._n_jobs = backend_and_jobs
        else:
            # this is for backward compatibility purposes. Before 0.12.6,
            # nested backends were returned without n_jobs indications.
            self._backend, self._n_jobs = backend_and_jobs, None
        self._pickle_cache = pickle_cache if pickle_cache is not None else {}

    def __call__(self):
        # Set the default nested backend to self._backend but do not change
        # the default number of processes to -1
        with parallel_config(backend=self._backend, n_jobs=self._n_jobs):
            return [func(*args, **kwargs)
                    for func, args, kwargs in self.items]

    def __reduce__(self):
        if self._reducer_callback is not None:
            self._reducer_callback()
        # no need to pickle the callback.
        return (
            BatchedCalls,
            (self.items, (self._backend, self._n_jobs), None,
             self._pickle_cache)
        )

    def __len__(self):
        return self._size
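

# Illustrative sketch (not part of the public API): BatchedCalls turns several
# (func, args, kwargs) triples, as produced by delayed(), into one picklable
# unit of work. `_example_batched_calls` is a hypothetical helper.
def _example_batched_calls():
    calls = [(pow, (2, 3), {}), (pow, (3, 2), {})]
    batch = BatchedCalls(calls, (ThreadingBackend(nesting_level=0), 1))
    assert len(batch) == 2
    assert batch() == [8, 9]   # both calls run sequentially inside one worker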



# Possible exit status for a task
TASK_DONE = "Done"
TASK_ERROR = "Error"
TASK_PENDING = "Pending"



###############################################################################
# CPU count that works also when multiprocessing has been disabled via
# the JOBLIB_MULTIPROCESSING environment variable
def cpu_count(only_physical_cores=False):
    """Return the number of CPUs.

    This delegates to loky.cpu_count that takes into account additional
    constraints such as Linux CFS scheduler quotas (typically set by container
    runtimes such as docker) and CPU affinity (for instance using the taskset
    command on Linux).

    If only_physical_cores is True, do not take hyperthreading / SMT logical
    cores into account.
    """
    if mp is None:
        return 1

    return loky.cpu_count(only_physical_cores=only_physical_cores)
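

# Usage sketch (illustrative only): comparing logical and physical core counts
# and sizing a worker budget from them. `_example_cpu_budget` is hypothetical.
def _example_cpu_budget():
    logical = cpu_count()                           # honors quotas / affinity
    physical = cpu_count(only_physical_cores=True)  # ignores SMT siblings
    # e.g. keep one logical core free for the rest of the system
    n_jobs = max(logical - 1, 1)
    return logical, physical, n_jobs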



###############################################################################
# For verbosity

def _verbosity_filter(index, verbose):
    """ Returns False for indices increasingly apart, the distance
        depending on the value of verbose.

        We use a lag increasing as the square of index
    """
    if not verbose:
        return True
    elif verbose > 10:
        return False
    if index == 0:
        return False
    verbose = .5 * (11 - verbose) ** 2
    scale = sqrt(index / verbose)
    next_scale = sqrt((index + 1) / verbose)
    return (int(next_scale) == int(scale))



###############################################################################
def delayed(function):
    """Decorator used to capture the arguments of a function."""

    def delayed_function(*args, **kwargs):
        return function, args, kwargs
    try:
        delayed_function = functools.wraps(function)(delayed_function)
    except AttributeError:
        " functools.wraps fails on some callable objects "
    return delayed_function
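

# Illustrative sketch: calling the wrapper returned by delayed() does not run
# the function, it only records the call. `_example_delayed` is hypothetical.
def _example_delayed():
    func, args, kwargs = delayed(abs)(-4)
    assert (func, args, kwargs) == (abs, (-4,), {})
    assert func(*args, **kwargs) == 4   # the actual call only happens here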



###############################################################################
class BatchCompletionCallBack(object):
    """Callback to keep track of completed results and schedule the next tasks.

    This callable is executed by the parent process whenever a worker process
    has completed a batch of tasks.

    It is used for progress reporting, to update estimate of the batch
    processing duration and to schedule the next batch of tasks to be
    processed.

    It is assumed that this callback will always be triggered by the backend
    right after the end of a task, in case of success as well as in case of
    failure.
    """

    ##########################################################################
    #                   METHODS CALLED BY THE MAIN THREAD                   #
    ##########################################################################
    def __init__(self, dispatch_timestamp, batch_size, parallel):
        self.dispatch_timestamp = dispatch_timestamp
        self.batch_size = batch_size
        self.parallel = parallel
        self.parallel_call_id = parallel._call_id

        # Internals to keep track of the status and outcome of the task.

        # Used to hold a reference to the future-like object returned by the
        # backend after launching this task
        # This will be set later when calling `register_job`, as it is only
        # created once the task has been submitted.
        self.job = None

        if not parallel._backend.supports_retrieve_callback:
            # The status is only used for asynchronous result retrieval in the
            # callback.
            self.status = None
        else:
            # The initial status for the job is TASK_PENDING.
            # Once it is done, it will be either TASK_DONE, or TASK_ERROR.
            self.status = TASK_PENDING

    def register_job(self, job):
        """Register the object returned by `apply_async`."""
        self.job = job

    def get_result(self, timeout):
        """Returns the raw result of the task that was submitted.

        If the task raised an exception rather than returning, this same
        exception will be raised instead.

        If the backend supports the retrieval callback, it is assumed that this
        method is only called after the result has been registered. It is
        ensured by checking that `self.status(timeout)` does not return
        TASK_PENDING. In this case, `get_result` directly returns the
        registered result (or raises the registered exception).

        For other backends, there are no such assumptions, but `get_result`
        still needs to synchronously retrieve the result before it can
        return it or raise. It will block at most `self.timeout` seconds
        waiting for retrieval to complete, after that it raises a TimeoutError.
        """

        backend = self.parallel._backend

        if backend.supports_retrieve_callback:
            # We assume that the result has already been retrieved by the
            # callback thread, and is stored internally. It's just waiting to
            # be returned.
            return self._return_or_raise()

        # For other backends, the main thread needs to run the retrieval step.
        try:
            if backend.supports_timeout:
                result = self.job.get(timeout=timeout)
            else:
                result = self.job.get()
            outcome = dict(result=result, status=TASK_DONE)
        except BaseException as e:
            outcome = dict(result=e, status=TASK_ERROR)
        self._register_outcome(outcome)

        return self._return_or_raise()

    def _return_or_raise(self):
        try:
            if self.status == TASK_ERROR:
                raise self._result
            return self._result
        finally:
            del self._result

    def get_status(self, timeout):
        """Get the status of the task.

        This function also checks if the timeout has been reached and registers
        the TimeoutError outcome when it is the case.
        """
        if timeout is None or self.status != TASK_PENDING:
            return self.status

        # The computation is running and the status is pending.
        # Check that we did not wait for this job more than `timeout`.
        now = time.time()
        if not hasattr(self, "_completion_timeout_counter"):
            self._completion_timeout_counter = now

        if (now - self._completion_timeout_counter) > timeout:
            outcome = dict(result=TimeoutError(), status=TASK_ERROR)
            self._register_outcome(outcome)

        return self.status

    ##########################################################################
    #                    METHODS CALLED BY CALLBACK THREADS                 #
    ##########################################################################
    def __call__(self, out):
        """Function called by the callback thread after a job is completed."""

        # If the backend doesn't support callback retrievals, the next batch of
        # tasks is dispatched regardless. The result will be retrieved by the
        # main thread when calling `get_result`.
        if not self.parallel._backend.supports_retrieve_callback:
            self._dispatch_new()
            return

        # If the backend supports retrieving the result in the callback, it
        # registers the task outcome (TASK_ERROR or TASK_DONE), and schedules
        # the next batch if needed.
        with self.parallel._lock:
            # Edge case where while the task was processing, the `parallel`
            # instance has been reset and a new call has been issued, but the
            # worker managed to complete the task and trigger this callback
            # call just before being aborted by the reset.
            if self.parallel._call_id != self.parallel_call_id:
                return

            # When aborting, stop as fast as possible and do not retrieve the
            # result as it won't be returned by the Parallel call.
            if self.parallel._aborting:
                return

            # Retrieves the result of the task in the main process and dispatch
            # a new batch if needed.
            job_succeeded = self._retrieve_result(out)

            if not self.parallel.return_ordered:
                # Append the job to the queue in the order of completion
                # instead of submission.
                self.parallel._jobs.append(self)

        if job_succeeded:
            self._dispatch_new()

    def _dispatch_new(self):
        """Schedule the next batch of tasks to be processed."""

        # This step ensures that auto-batching works as expected.
        this_batch_duration = time.time() - self.dispatch_timestamp
        self.parallel._backend.batch_completed(self.batch_size,
                                               this_batch_duration)

        # Schedule the next batch of tasks.
        with self.parallel._lock:
            self.parallel.n_completed_tasks += self.batch_size
            self.parallel.print_progress()
            if self.parallel._original_iterator is not None:
                self.parallel.dispatch_next()

    def _retrieve_result(self, out):
        """Fetch and register the outcome of a task.

        Return True if the task succeeded, False otherwise.
        This function is only called by backends that support retrieving
        the task result in the callback thread.
        """
        try:
            result = self.parallel._backend.retrieve_result_callback(out)
            outcome = dict(status=TASK_DONE, result=result)
        except BaseException as e:
            # Avoid keeping references to parallel in the error.
            e.__traceback__ = None
            outcome = dict(result=e, status=TASK_ERROR)

        self._register_outcome(outcome)
        return outcome['status'] != TASK_ERROR

    ##########################################################################
    #            This method can be called either in the main thread        #
    #                       or in the callback thread.                      #
    ##########################################################################
    def _register_outcome(self, outcome):
        """Register the outcome of a task.

        This method can be called only once, future calls will be ignored.
        """
        # Covers the edge case where the main thread tries to register a
        # `TimeoutError` while the callback thread tries to register a result
        # at the same time.
        with self.parallel._lock:
            if self.status not in (TASK_PENDING, None):
                return
            self.status = outcome["status"]

        self._result = outcome["result"]

        # Once the result and the status are extracted, the last reference to
        # the job can be deleted.
        self.job = None

        # As soon as an error has been spotted, early stopping flags are sent
        # to the `parallel` instance.
        if self.status == TASK_ERROR:
            self.parallel._exception = True
            self.parallel._aborting = True



###############################################################################
def register_parallel_backend(name, factory, make_default=False):
    """Register a new Parallel backend factory.

    The new backend can then be selected by passing its name as the backend
    argument to the :class:`~Parallel` class. Moreover, the default backend can
    be overwritten globally by setting make_default=True.

    The factory can be any callable that takes no argument and returns an
    instance of ``ParallelBackendBase``.

    Warning: this function is experimental and subject to change in a future
    version of joblib.

    .. versionadded:: 0.10
    """
    BACKENDS[name] = factory
    if make_default:
        global DEFAULT_BACKEND
        DEFAULT_BACKEND = name
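

# Usage sketch (illustrative only): registering an existing backend factory
# under a custom name and selecting it by that name. 'my_threads' is an
# arbitrary name and `_example_custom_backend_name` a hypothetical helper.
def _example_custom_backend_name():
    register_parallel_backend('my_threads', ThreadingBackend)
    from joblib import Parallel, delayed
    return Parallel(n_jobs=2, backend='my_threads')(
        delayed(str.upper)(s) for s in ["a", "b", "c"])   # ['A', 'B', 'C']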



def effective_n_jobs(n_jobs=-1):
    """Determine the number of jobs that can actually run in parallel

    n_jobs is the number of workers requested by the callers. Passing n_jobs=-1
    means requesting all available workers for instance matching the number of
    CPU cores on the worker host(s).

    This method should return a guesstimate of the number of workers that can
    actually perform work concurrently with the currently enabled default
    backend. The primary use case is to make it possible for the caller to know
    in how many chunks to slice the work.

    In general working on larger data chunks is more efficient (less scheduling
    overhead and better use of CPU cache prefetching heuristics) as long as all
    the workers have enough work to do.

    Warning: this function is experimental and subject to change in a future
    version of joblib.

    .. versionadded:: 0.10
    """
    if n_jobs == 1:
        return 1

    backend, backend_n_jobs = get_active_backend()
    if n_jobs is None:
        n_jobs = backend_n_jobs
    return backend.effective_n_jobs(n_jobs=n_jobs)
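

# Usage sketch (illustrative only) of the chunking pattern described above:
# slice the work into as many chunks as workers that can actually run.
# `_example_chunked_sums` is a hypothetical helper.
def _example_chunked_sums(data):
    from joblib import Parallel, delayed
    n_chunks = effective_n_jobs(-1)
    chunks = [data[i::n_chunks] for i in range(n_chunks)]
    partial_sums = Parallel(n_jobs=n_chunks)(
        delayed(sum)(chunk) for chunk in chunks)
    return sum(partial_sums)   # _example_chunked_sums(range(1000)) -> 499500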



###############################################################################
class Parallel(Logger):
    ''' Helper class for readable parallel mapping.

        Read more in the :ref:`User Guide <parallel>`.

        Parameters
        ----------
        n_jobs: int, default=None
            The maximum number of concurrently running jobs, such as the number
            of Python worker processes when ``backend="loky"`` or the size of
            the thread-pool when ``backend="threading"``.
            This argument is converted to an integer, rounded below for float.
            If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
            ``n_cpus`` is obtained with :func:`~cpu_count`.
            For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
            using ``n_jobs=-2`` will result in all CPUs but one being used.
            This argument can also go above ``n_cpus``, which will cause
            oversubscription. In some cases, slight oversubscription can be
            beneficial, e.g., for tasks with large I/O operations.
            If 1 is given, no parallel computing code is used at all, and the
            behavior amounts to a simple python `for` loop. This mode is not
            compatible with ``timeout``.
            None is a marker for 'unset' that will be interpreted as n_jobs=1
            unless the call is performed under a :func:`~parallel_config`
            context manager that sets another value for ``n_jobs``.
            If n_jobs = 0 then a ValueError is raised.
        backend: str, ParallelBackendBase instance or None, default='loky'
            Specify the parallelization backend implementation.
            Supported backends are:

            - "loky" used by default, can induce some
              communication and memory overhead when exchanging input and
              output data with the worker Python processes. On some rare
              systems (such as Pyodide), the loky backend may not be
              available.
            - "multiprocessing" previous process-based backend based on
              `multiprocessing.Pool`. Less robust than `loky`.
            - "threading" is a very low-overhead backend but it suffers
              from the Python Global Interpreter Lock if the called function
              relies a lot on Python objects. "threading" is mostly useful
              when the execution bottleneck is a compiled extension that
              explicitly releases the GIL (for instance a Cython loop wrapped
              in a "with nogil" block or an expensive call to a library such
              as NumPy).
            - finally, you can register backends by calling
              :func:`~register_parallel_backend`. This will allow you to
              implement a backend of your liking.

            It is not recommended to hard-code the backend name in a call to
            :class:`~Parallel` in a library. Instead it is recommended to set
            soft hints (prefer) or hard constraints (require) so as to make it
            possible for library users to change the backend from the outside
            using the :func:`~parallel_config` context manager.
        return_as: str in {'list', 'generator', 'generator_unordered'}, default='list'
            If 'list', calls to this instance will return a list, only when
            all results have been processed and retrieved.
            If 'generator', it will return a generator that yields the results
            as soon as they are available, in the order in which the tasks
            have been submitted.
            If 'generator_unordered', the generator will immediately yield
            available results independently of the submission order. The output
            order is not deterministic in this case because it depends on the
            concurrency of the workers.
        prefer: str in {'processes', 'threads'} or None, default=None
            Soft hint to choose the default backend if no specific backend
            was selected with the :func:`~parallel_config` context manager.
            The default process-based backend is 'loky' and the default
            thread-based backend is 'threading'. Ignored if the ``backend``
            parameter is specified.
        require: 'sharedmem' or None, default=None
            Hard constraint to select the backend. If set to 'sharedmem',
            the selected backend will be single-host and thread-based even
            if the user asked for a non-thread based backend with
            :func:`~joblib.parallel_config`.
        verbose: int, default=0
            The verbosity level: if non zero, progress messages are
            printed. Above 50, the output is sent to stdout.
            The frequency of the messages increases with the verbosity level.
            If it is more than 10, all iterations are reported.
        timeout: float or None, default=None
            Timeout limit for each task to complete. If any task takes longer,
            a TimeoutError will be raised. Only applied when n_jobs != 1
        pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}, default='2*n_jobs'
            The number of batches (of tasks) to be pre-dispatched.
            Default is '2*n_jobs'. When batch_size="auto" this is a reasonable
            default and the workers should never starve. Note that only basic
            arithmetics are allowed here and no modules can be used in this
            expression.
        batch_size: int or 'auto', default='auto'
            The number of atomic tasks to dispatch at once to each
            worker. When individual evaluations are very fast, dispatching
            calls to workers can be slower than sequential computation because
            of the overhead. Batching fast computations together can mitigate
            this.
            The ``'auto'`` strategy keeps track of the time it takes for a
            batch to complete, and dynamically adjusts the batch size to keep
            the time on the order of half a second, using a heuristic. The
            initial batch size is 1.
            ``batch_size="auto"`` with ``backend="threading"`` will dispatch
            batches of a single task at a time as the threading backend has
            very little overhead and using larger batch size has not proved to
            bring any gain in that case.
        temp_folder: str or None, default=None
            Folder to be used by the pool for memmapping large arrays
            for sharing memory with worker processes. If None, this will try in
            order:

            - a folder pointed by the JOBLIB_TEMP_FOLDER environment
              variable,
            - /dev/shm if the folder exists and is writable: this is a
              RAM disk filesystem available by default on modern Linux
              distributions,
            - the default system temporary folder that can be
              overridden with TMP, TMPDIR or TEMP environment
              variables, typically /tmp under Unix operating systems.

            Only active when ``backend="loky"`` or ``"multiprocessing"``.
        max_nbytes: int, str, or None, optional, default='1M'
            Threshold on the size of arrays passed to the workers that
            triggers automated memory mapping in temp_folder. Can be an int
            in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
            Use None to disable memmapping of large arrays.
            Only active when ``backend="loky"`` or ``"multiprocessing"``.
        mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
            Memmapping mode for numpy arrays passed to workers. None will
            disable memmapping, other modes defined in the numpy.memmap doc:
            https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
            Also, see 'max_nbytes' parameter documentation for more details.

        Notes
        -----

        This object uses workers to compute in parallel the application of a
        function to many different arguments. The main functionality it brings
        in addition to using the raw multiprocessing or concurrent.futures API
        are (see examples for details):

        * More readable code, in particular since it avoids
          constructing list of arguments.

        * Easier debugging:
            - informative tracebacks even when the error happens on
              the client side
            - using 'n_jobs=1' enables to turn off parallel computing
              for debugging without changing the codepath
            - early capture of pickling errors

        * An optional progress meter.

        * Interruption of multiprocesses jobs with 'Ctrl-C'

        * Flexible pickling control for the communication to and from
          the worker processes.

        * Ability to use shared memory efficiently with worker
          processes for large numpy-based datastructures.

        Note that the intended usage is to run one call at a time. Multiple
        calls to the same Parallel object will result in a ``RuntimeError``

        Examples
        --------

        A simple example:

        >>> from math import sqrt
        >>> from joblib import Parallel, delayed
        >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
        [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

        Reshaping the output when the function has several return
        values:

        >>> from math import modf
        >>> from joblib import Parallel, delayed
        >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
        >>> res, i = zip(*r)
        >>> res
        (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
        >>> i
        (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)

        The progress meter: the higher the value of `verbose`, the more
        messages:

        >>> from time import sleep
        >>> from joblib import Parallel, delayed
        >>> r = Parallel(n_jobs=2, verbose=10)(
        ...     delayed(sleep)(.2) for _ in range(10)) #doctest: +SKIP
        [Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    0.6s
        [Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    0.8s
        [Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    1.4s finished

        Traceback example, note how the line of the error is indicated
        as well as the values of the parameter passed to the function that
        triggered the exception, even though the traceback happens in the
        child process:

        >>> from heapq import nlargest
        >>> from joblib import Parallel, delayed
        >>> Parallel(n_jobs=2)(
        ... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3))
        ... # doctest: +SKIP
        -----------------------------------------------------------------------
        Sub-process traceback:
        -----------------------------------------------------------------------
        TypeError                                      Mon Nov 12 11:37:46 2012
        PID: 12934                           Python 2.7.3: /usr/bin/python
        ........................................................................
        /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
            419         if n >= size:
            420             return sorted(iterable, key=key, reverse=True)[:n]
            421
            422         # When key is none, use simpler decoration
            423         if key is None:
        --> 424             it = izip(iterable, count(0,-1))          # decorate
            425             result = _nlargest(n, it)
            426             return map(itemgetter(0), result)         # undecorate
            427
            428         # General case, slowest method
             TypeError: izip argument #1 must support iteration
        _______________________________________________________________________

        Using pre_dispatch in a producer/consumer situation, where the
        data is generated on the fly. Note how the producer is first
        called 3 times before the parallel loop is initiated, and then
        called to generate new data on the fly:

        >>> from math import sqrt
        >>> from joblib import Parallel, delayed
        >>> def producer():
        ...     for i in range(6):
        ...         print('Produced %s' % i)
        ...         yield i
        >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
        ...     delayed(sqrt)(i) for i in producer()) #doctest: +SKIP
        Produced 0
        Produced 1
        Produced 2
        [Parallel(n_jobs=2)]: Done 1 jobs     | elapsed:  0.0s
        Produced 3
        [Parallel(n_jobs=2)]: Done 2 jobs     | elapsed:  0.0s
        Produced 4
        [Parallel(n_jobs=2)]: Done 3 jobs     | elapsed:  0.0s
        Produced 5
        [Parallel(n_jobs=2)]: Done 4 jobs     | elapsed:  0.0s
        [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed:  0.0s remaining: 0.0s
        [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed:  0.0s finished

    '''  # noqa: E501

    def __init__(
        self,
        n_jobs=default_parallel_config["n_jobs"],
        backend=default_parallel_config['backend'],
        return_as="list",
        verbose=default_parallel_config["verbose"],
        timeout=None,
        pre_dispatch='2 * n_jobs',
        batch_size='auto',
        temp_folder=default_parallel_config["temp_folder"],
        max_nbytes=default_parallel_config["max_nbytes"],
        mmap_mode=default_parallel_config["mmap_mode"],
        prefer=default_parallel_config["prefer"],
        require=default_parallel_config["require"],
    ):
        # Initiate parent Logger class state
        super().__init__()

        # Interpret n_jobs=None as 'unset'
        if n_jobs is None:
            n_jobs = default_parallel_config["n_jobs"]

        active_backend, context_config = _get_active_backend(
            prefer=prefer, require=require, verbose=verbose
        )

        nesting_level = active_backend.nesting_level

        self.verbose = _get_config_param(verbose, context_config, "verbose")
        self.timeout = timeout
        self.pre_dispatch = pre_dispatch

        if return_as not in {"list", "generator", "generator_unordered"}:
            raise ValueError(
                'Expected `return_as` parameter to be a string equal to "list"'
                f',"generator" or "generator_unordered", but got {return_as} '
                "instead."
            )
        self.return_as = return_as
        self.return_generator = return_as != "list"
        self.return_ordered = return_as != "generator_unordered"

        # Check if we are under a parallel_config or parallel_backend
        # context manager and use the config from the context manager
        # for arguments that are not explicitly set.
        self._backend_args = {
            k: _get_config_param(param, context_config, k) for param, k in [
                (max_nbytes, "max_nbytes"),
                (temp_folder, "temp_folder"),
                (mmap_mode, "mmap_mode"),
                (prefer, "prefer"),
                (require, "require"),
                (verbose, "verbose"),
            ]
        }

        if isinstance(self._backend_args["max_nbytes"], str):
            self._backend_args["max_nbytes"] = memstr_to_bytes(
                self._backend_args["max_nbytes"]
            )
        self._backend_args["verbose"] = max(
            0, self._backend_args["verbose"] - 50
        )

        if DEFAULT_MP_CONTEXT is not None:
            self._backend_args['context'] = DEFAULT_MP_CONTEXT
        elif hasattr(mp, "get_context"):
            self._backend_args['context'] = mp.get_context()

        if backend is default_parallel_config['backend'] or backend is None:
            backend = active_backend

        elif isinstance(backend, ParallelBackendBase):
            # Use provided backend as is, with the current nesting_level if it
            # is not set yet.
            if backend.nesting_level is None:
                backend.nesting_level = nesting_level

        elif hasattr(backend, 'Pool') and hasattr(backend, 'Lock'):
            # Make it possible to pass a custom multiprocessing context as
            # backend to change the start method to forkserver or spawn or
            # preload modules on the forkserver helper process.
            self._backend_args['context'] = backend
            backend = MultiprocessingBackend(nesting_level=nesting_level)

        elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
            warnings.warn(
                f"joblib backend '{backend}' is not available on "
                f"your system, falling back to {DEFAULT_BACKEND}.",
                UserWarning,
                stacklevel=2)
            BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
            backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)

        else:
            try:
                backend_factory = BACKENDS[backend]
            except KeyError as e:
                raise ValueError("Invalid backend: %s, expected one of %r"
                                 % (backend, sorted(BACKENDS.keys()))) from e
            backend = backend_factory(nesting_level=nesting_level)

        n_jobs = _get_config_param(n_jobs, context_config, "n_jobs")
        if n_jobs is None:
            # No specific context override and no specific value request:
            # default to the default of the backend.
            n_jobs = backend.default_n_jobs
        try:
            n_jobs = int(n_jobs)
        except ValueError:
            raise ValueError("n_jobs could not be converted to int")
        self.n_jobs = n_jobs

        if (require == 'sharedmem' and
                not getattr(backend, 'supports_sharedmem', False)):
            raise ValueError("Backend %s does not support shared memory"
                             % backend)

        if (batch_size == 'auto' or isinstance(batch_size, Integral) and
                batch_size > 0):
            self.batch_size = batch_size
        else:
            raise ValueError(
                "batch_size must be 'auto' or a positive integer, got: %r"
                % batch_size)

        if not isinstance(backend, SequentialBackend):
            if self.return_generator and not backend.supports_return_generator:
                raise ValueError(
                    "Backend {} does not support "
                    "return_as={}".format(backend, return_as)
                )
            # This lock is used to coordinate the main thread of this process
            # with the async callback thread of the pool.
            self._lock = threading.RLock()
            self._jobs = collections.deque()
            self._pending_outputs = list()
            self._ready_batches = queue.Queue()
            self._reducer_callback = None

        # Internal variables
        self._backend = backend
        self._running = False
        self._managed_backend = False
        self._id = uuid4().hex
        self._call_ref = None

1343 

1344 def __enter__(self): 

1345 self._managed_backend = True 

1346 self._calling = False 

1347 self._initialize_backend() 

1348 return self 

1349 

1350 def __exit__(self, exc_type, exc_value, traceback): 

1351 self._managed_backend = False 

1352 if self.return_generator and self._calling: 

1353 self._abort() 

1354 self._terminate_and_reset() 

1355 

1356 def _initialize_backend(self): 

1357 """Build a process or thread pool and return the number of workers""" 

1358 try: 

1359 n_jobs = self._backend.configure(n_jobs=self.n_jobs, parallel=self, 

1360 **self._backend_args) 

1361 if self.timeout is not None and not self._backend.supports_timeout: 

1362 warnings.warn( 

1363 'The backend class {!r} does not support timeout. ' 

1364 "You have set 'timeout={}' in Parallel but " 

1365 "the 'timeout' parameter will not be used.".format( 

1366 self._backend.__class__.__name__, 

1367 self.timeout)) 

1368 

1369 except FallbackToBackend as e: 

1370 # Recursively initialize the backend in case of requested fallback. 

1371 self._backend = e.backend 

1372 n_jobs = self._initialize_backend() 

1373 

1374 return n_jobs 

1375 

1376 def _effective_n_jobs(self): 

1377 if self._backend: 

1378 return self._backend.effective_n_jobs(self.n_jobs) 

1379 return 1 

1380 

1381 def _terminate_and_reset(self): 

1382 if hasattr(self._backend, 'stop_call') and self._calling: 

1383 self._backend.stop_call() 

1384 self._calling = False 

1385 if not self._managed_backend: 

1386 self._backend.terminate() 

1387 

1388 def _dispatch(self, batch): 

1389 """Queue the batch for computing, with or without multiprocessing 

1390 

1391 WARNING: this method is not thread-safe: it should be only called 

1392 indirectly via dispatch_one_batch. 

1393 

1394 """ 

1395 # If job.get() catches an exception, it closes the queue: 

1396 if self._aborting: 

1397 return 

1398 

1399 batch_size = len(batch) 

1400 

1401 self.n_dispatched_tasks += batch_size 

1402 self.n_dispatched_batches += 1 

1403 

1404 dispatch_timestamp = time.time() 

1405 

1406 batch_tracker = BatchCompletionCallBack( 

1407 dispatch_timestamp, batch_size, self 

1408 ) 

1409 

1410 if self.return_ordered: 

1411 self._jobs.append(batch_tracker) 

1412 

1413 # If return_ordered is False, the batch_tracker is not stored in the 

1414 # jobs queue at the time of submission. Instead, it will append itself 

1415 # to the queue as soon as its callback is triggered, so that the 

1416 # results can be returned in the order of completion. 

1417 

1418 job = self._backend.apply_async(batch, callback=batch_tracker) 

1419 batch_tracker.register_job(job) 

1420 
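An illustrative sketch of the return_ordered distinction handled in _dispatch above, assuming a joblib version that exposes return_as='generator_unordered'.

import time
from joblib import Parallel, delayed

def task(i):
    time.sleep(0.2 if i == 0 else 0.0)
    return i

# With return_as='generator_unordered', completed batches append their own
# tracker to the jobs queue (as described above), so results arrive in
# completion order rather than submission order.
out = Parallel(n_jobs=2, return_as='generator_unordered')(
    delayed(task)(i) for i in range(4)
)
print(list(out))  # e.g. [1, 2, 3, 0], since task 0 finishes last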

1421 def dispatch_next(self): 

1422 """Dispatch more data for parallel processing 

1423 

1424 This method is meant to be called concurrently by the multiprocessing 

1425 callback. We rely on the thread-safety of dispatch_one_batch to protect 

1426 against concurrent consumption of the unprotected iterator. 

1427 

1428 """ 

1429 if not self.dispatch_one_batch(self._original_iterator): 

1430 self._iterating = False 

1431 self._original_iterator = None 

1432 

1433 def dispatch_one_batch(self, iterator): 

1434 """Prefetch the tasks for the next batch and dispatch them. 

1435 

1436 The effective size of the batch is computed here. 

1437 If there are no more jobs to dispatch, return False, else return True. 

1438 

1439 The iterator consumption and dispatching are protected by the same 

1440 lock, so calling this function should be thread-safe. 

1441 

1442 """ 

1443 

1444 if self._aborting: 

1445 return False 

1446 

1447 batch_size = self._get_batch_size() 

1448 

1449 with self._lock: 

1450 # to ensure an even distribution of the workload between workers, 

1451 # we look ahead in the original iterator by more than batch_size 

1452 # tasks. However, we keep consuming only one batch at each 

1453 # dispatch_one_batch call. The extra tasks are stored in a local 

1454 # queue, _ready_batches, that is looked up prior to re-consuming 

1455 # tasks from the original iterator. 

1456 try: 

1457 tasks = self._ready_batches.get(block=False) 

1458 except queue.Empty: 

1459 # slice the iterator n_jobs * batch_size items at a time. If the 

1460 # slice returns less than that, then the current batch_size puts 

1461 # too much weight on a subset of workers, while others may end 

1462 # up starving. So in this case, re-scale the batch size 

1463 # accordingly to evenly distribute the last items among all 

1464 # workers. 

1465 n_jobs = self._cached_effective_n_jobs 

1466 big_batch_size = batch_size * n_jobs 

1467 

1468 try: 

1469 islice = list(itertools.islice(iterator, big_batch_size)) 

1470 except Exception as e: 

1471 # Handle the fact that the generator of tasks raised an 

1472 # exception. As this part of the code can be executed in 

1473 # a thread internal to the backend, register a task with 

1474 # an error that will be raised in the user's thread. 

1475 if isinstance(e.__context__, queue.Empty): 

1476 # Suppress the cause of the exception if it is 

1477 # queue.Empty to avoid cluttered traceback. Only do it 

1478 # if the __context__ is really empty to avoid messing 

1479 # with causes of the original error. 

1480 e.__cause__ = None 

1481 batch_tracker = BatchCompletionCallBack( 

1482 0, batch_size, self 

1483 ) 

1484 self._jobs.append(batch_tracker) 

1485 batch_tracker._register_outcome(dict( 

1486 result=e, status=TASK_ERROR 

1487 )) 

1488 return True 

1489 

1490 if len(islice) == 0: 

1491 return False 

1492 elif (iterator is self._original_iterator and 

1493 len(islice) < big_batch_size): 

1494 # We reached the end of the original iterator (unless 

1495 # iterator is the ``pre_dispatch``-long initial slice of 

1496 # the original iterator) -- decrease the batch size to 

1497 # account for potential variance in the batches running 

1498 # time. 

1499 final_batch_size = max(1, len(islice) // (10 * n_jobs)) 

1500 else: 

1501 final_batch_size = max(1, len(islice) // n_jobs) 

1502 

1503 # enqueue n_jobs batches in a local queue 

1504 for i in range(0, len(islice), final_batch_size): 

1505 tasks = BatchedCalls(islice[i:i + final_batch_size], 

1506 self._backend.get_nested_backend(), 

1507 self._reducer_callback, 

1508 self._pickle_cache) 

1509 self._ready_batches.put(tasks) 

1510 

1511 # finally, get one batch of tasks. 

1512 tasks = self._ready_batches.get(block=False) 

1513 if len(tasks) == 0: 

1514 # No more tasks available in the iterator: tell caller to stop. 

1515 return False 

1516 else: 

1517 self._dispatch(tasks) 

1518 return True 

1519 
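A simplified, self-contained sketch (not joblib code) of the look-ahead strategy implemented above: consume up to n_jobs * batch_size tasks from the iterator, split them into batches, park the extras in a local queue, and hand out one batch per call.

import itertools
import queue

def slice_into_batches(iterator, batch_size, n_jobs):
    """Return a queue of batches drawn from the next n_jobs * batch_size items."""
    ready = queue.Queue()
    big_slice = list(itertools.islice(iterator, batch_size * n_jobs))
    if not big_slice:
        return ready
    if len(big_slice) < batch_size * n_jobs:
        # Near the end of the input, shrink the batches so the tail is
        # spread evenly over the workers instead of starving some of them.
        final_batch_size = max(1, len(big_slice) // n_jobs)
    else:
        final_batch_size = batch_size
    for i in range(0, len(big_slice), final_batch_size):
        ready.put(big_slice[i:i + final_batch_size])
    return ready

batches = slice_into_batches(iter(range(10)), batch_size=3, n_jobs=2)
while not batches.empty():
    print(batches.get())  # [0, 1, 2] then [3, 4, 5]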

1520 def _get_batch_size(self): 

1521 """Returns the effective batch size for dispatch""" 

1522 if self.batch_size == 'auto': 

1523 return self._backend.compute_batch_size() 

1524 else: 

1525 # Fixed batch size strategy 

1526 return self.batch_size 

1527 

1528 def _print(self, msg): 

1529 """Display the message on stout or stderr depending on verbosity""" 

1530 # XXX: Not using the logger framework: need to 

1531 # learn to use logger better. 

1532 if not self.verbose: 

1533 return 

1534 if self.verbose < 50: 

1535 writer = sys.stderr.write 

1536 else: 

1537 writer = sys.stdout.write 

1538 writer(f"[{self}]: {msg}\n") 

1539 
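For reference, a usage sketch (not part of this module) of the routing implemented by _print above: verbosity below 50 sends progress messages to stderr, 50 and above to stdout, and larger values print more often.

from joblib import Parallel, delayed

# Progress lines go to stderr here (verbose < 50)...
Parallel(n_jobs=2, verbose=10)(delayed(abs)(-i) for i in range(20))

# ...and to stdout here (verbose >= 50).
Parallel(n_jobs=2, verbose=60)(delayed(abs)(-i) for i in range(20))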

1540 def _is_completed(self): 

1541 """Check if all tasks have been completed""" 

1542 return self.n_completed_tasks == self.n_dispatched_tasks and not ( 

1543 self._iterating or self._aborting 

1544 ) 

1545 

1546 def print_progress(self): 

1547 """Display the process of the parallel execution only a fraction 

1548 of time, controlled by self.verbose. 

1549 """ 

1550 

1551 if not self.verbose: 

1552 return 

1553 

1554 elapsed_time = time.time() - self._start_time 

1555 

1556 if self._is_completed(): 

1557 # Make sure that we get a last message telling us we are done 

1558 self._print( 

1559 f"Done {self.n_completed_tasks:3d} out of " 

1560 f"{self.n_completed_tasks:3d} | elapsed: " 

1561 f"{short_format_time(elapsed_time)} finished" 

1562 ) 

1563 return 

1564 

1565 # Original job iterator becomes None once it has been fully 

1566 # consumed: at this point we know the total number of jobs and we are 

1567 # able to display an estimation of the remaining time based on already 

1568 # completed jobs. Otherwise, we simply display the number of completed 

1569 # tasks. 

1570 elif self._original_iterator is not None: 

1571 if _verbosity_filter(self.n_dispatched_batches, self.verbose): 

1572 return 

1573 self._print( 

1574 f"Done {self.n_completed_tasks:3d} tasks | elapsed: " 

1575 f"{short_format_time(elapsed_time)}" 

1576 ) 

1577 else: 

1578 index = self.n_completed_tasks 

1579 # We are finished dispatching 

1580 total_tasks = self.n_dispatched_tasks 

1581 # We always display the first loop 

1582 if not index == 0: 

1583 # Display depending on the number of remaining items 

1584 # A message as soon as we finish dispatching, cursor is 0 

1585 cursor = (total_tasks - index + 1 - 

1586 self._pre_dispatch_amount) 

1587 frequency = (total_tasks // self.verbose) + 1 

1588 is_last_item = (index + 1 == total_tasks) 

1589 if (is_last_item or cursor % frequency): 

1590 return 

1591 remaining_time = (elapsed_time / index) * \ 

1592 (self.n_dispatched_tasks - index * 1.0) 

1593 # only display status if remaining time is greater than or equal to 0 

1594 self._print( 

1595 f"Done {index:3d} out of {total_tasks:3d} | elapsed: " 

1596 f"{short_format_time(elapsed_time)} remaining: " 

1597 f"{short_format_time(remaining_time)}" 

1598 ) 

1599 

1600 def _abort(self): 

1601 # Stop dispatching new jobs in the async callback thread 

1602 self._aborting = True 

1603 

1604 # If the backend allows it, cancel or kill remaining running 

1605 # tasks without waiting for the results as we will raise 

1606 # the exception we got back to the caller instead of returning 

1607 # any result. 

1608 backend = self._backend 

1609 if (not self._aborted and hasattr(backend, 'abort_everything')): 

1610 # If the backend is managed externally we need to make sure 

1611 # to leave it in a working state to allow for future job 

1612 # scheduling. 

1613 ensure_ready = self._managed_backend 

1614 backend.abort_everything(ensure_ready=ensure_ready) 

1615 self._aborted = True 

1616 

1617 def _start(self, iterator, pre_dispatch): 

1618 # Only set self._iterating to True if at least one batch 

1619 # was dispatched. In particular this covers the edge 

1620 # case of Parallel used with an exhausted iterator. If 

1621 # self._original_iterator is None, then this means either 

1622 # that pre_dispatch == "all", n_jobs == 1 or that the first batch 

1623 # was very quick and its callback already dispatched all the 

1624 # remaining jobs. 

1625 self._iterating = False 

1626 if self.dispatch_one_batch(iterator): 

1627 self._iterating = self._original_iterator is not None 

1628 

1629 while self.dispatch_one_batch(iterator): 

1630 pass 

1631 

1632 if pre_dispatch == "all": 

1633 # The iterable was consumed all at once by the above while loop. 

1634 # No need to wait for async callbacks to trigger further 

1635 # consumption. 

1636 self._iterating = False 

1637 

1638 def _get_outputs(self, iterator, pre_dispatch): 

1639 """Iterator returning the tasks' output as soon as they are ready.""" 

1640 dispatch_thread_id = threading.get_ident() 

1641 detach_generator_exit = False 

1642 try: 

1643 self._start(iterator, pre_dispatch) 

1644 # first yield returns None, for internal use only. This ensures 

1645 # that we enter the try/except block and start dispatching the 

1646 # tasks. 

1647 yield 

1648 

1649 with self._backend.retrieval_context(): 

1650 yield from self._retrieve() 

1651 

1652 except GeneratorExit: 

1653 # The generator has been garbage collected before being fully 

1654 # consumed. This aborts the remaining tasks if possible and warns 

1655 # the user if necessary. 

1656 self._exception = True 

1657 

1658 # In some interpreters such as PyPy, GeneratorExit can be raised in 

1659 # a different thread than the one used to start the dispatch of the 

1660 # parallel tasks. This can lead to a hang when a thread attempts to 

1661 # join itself. As a workaround, we detach the execution of the 

1662 # aborting code to a dedicated thread. We then need to make sure 

1663 # the rest of the function does not call `_terminate_and_reset` 

1664 # in finally. 

1665 if dispatch_thread_id != threading.get_ident(): 

1666 if not IS_PYPY: 

1667 warnings.warn( 

1668 "A generator produced by joblib.Parallel has been " 

1669 "gc'ed in an unexpected thread. This behavior should " 

1670 "not cause major -issues but to make sure, please " 

1671 "report this warning and your use case at " 

1672 "https://github.com/joblib/joblib/issues so it can " 

1673 "be investigated." 

1674 ) 

1675 

1676 detach_generator_exit = True 

1677 _parallel = self 

1678 

1679 class _GeneratorExitThread(threading.Thread): 

1680 def run(self): 

1681 _parallel._abort() 

1682 if _parallel.return_generator: 

1683 _parallel._warn_exit_early() 

1684 _parallel._terminate_and_reset() 

1685 

1686 _GeneratorExitThread( 

1687 name="GeneratorExitThread" 

1688 ).start() 

1689 return 

1690 

1691 # Otherwise, we are in the thread that started the dispatch: we can 

1692 # safely abort the execution and warn the user. 

1693 self._abort() 

1694 if self.return_generator: 

1695 self._warn_exit_early() 

1696 

1697 raise 

1698 

1699 # Note: we catch any BaseException instead of just Exception instances 

1700 # to also include KeyboardInterrupt 

1701 except BaseException: 

1702 self._exception = True 

1703 self._abort() 

1704 raise 

1705 finally: 

1706 # Store the unconsumed tasks and terminate the workers if necessary 

1707 _remaining_outputs = ([] if self._exception else self._jobs) 

1708 self._jobs = collections.deque() 

1709 self._running = False 

1710 if not detach_generator_exit: 

1711 self._terminate_and_reset() 

1712 

1713 while len(_remaining_outputs) > 0: 

1714 batched_results = _remaining_outputs.popleft() 

1715 batched_results = batched_results.get_result(self.timeout) 

1716 for result in batched_results: 

1717 yield result 

1718 

1719 def _wait_retrieval(self): 

1720 """Return True if we need to continue retrieving some tasks.""" 

1721 

1722 # If the input load is still being iterated over, it means that tasks 

1723 # are still on the dispatch waitlist and their results will need to 

1724 # be retrieved later on. 

1725 if self._iterating: 

1726 return True 

1727 

1728 # If some of the dispatched tasks are still being processed by the 

1729 # workers, wait for the compute to finish before starting retrieval 

1730 if self.n_completed_tasks < self.n_dispatched_tasks: 

1731 return True 

1732 

1733 # For backends that do not support asynchronously retrieving the 

1734 # results in the main process, all results must be carefully retrieved 

1735 # in the _retrieve loop in the main thread while the backend is alive. 

1736 # For other backends, the actual retrieval is done asynchronously in 

1737 # the callback thread, and we can terminate the backend before the 

1738 # `self._jobs` result list has been emptied. The remaining results 

1739 # will be collected in the `finally` step of the generator. 

1740 if not self._backend.supports_retrieve_callback: 

1741 if len(self._jobs) > 0: 

1742 return True 

1743 

1744 return False 

1745 

1746 def _retrieve(self): 

1747 while self._wait_retrieval(): 

1748 

1749 # If the callback thread of a worker has signaled that its task 

1750 # triggered an exception, or if the retrieval loop has raised an 

1751 # exception (e.g. `GeneratorExit`), exit the loop and surface the 

1752 # worker traceback. 

1753 if self._aborting: 

1754 self._raise_error_fast() 

1755 break 

1756 

1757 # If the next job is not ready for retrieval yet, we just wait for 

1758 # async callbacks to progress. 

1759 if ((len(self._jobs) == 0) or 

1760 (self._jobs[0].get_status( 

1761 timeout=self.timeout) == TASK_PENDING)): 

1762 time.sleep(0.01) 

1763 continue 

1764 

1765 # We need to be careful: the job list can be filling up as 

1766 # we empty it, and Python lists are not thread-safe by 

1767 # default, hence the use of the lock. 

1768 with self._lock: 

1769 batched_results = self._jobs.popleft() 

1770 

1771 # Flatten the batched results to yield one result at a time 

1772 batched_results = batched_results.get_result(self.timeout) 

1773 for result in batched_results: 

1774 self._nb_consumed += 1 

1775 yield result 

1776 

1777 def _raise_error_fast(self): 

1778 """If we are aborting, raise if a job caused an error.""" 

1779 

1780 # Find the first job whose status is TASK_ERROR if it exists. 

1781 with self._lock: 

1782 error_job = next((job for job in self._jobs 

1783 if job.status == TASK_ERROR), None) 

1784 

1785 # If this error job exists, immediately raise the error by 

1786 # calling get_result. This job might not exist if abort has been 

1787 # called directly or if the generator is gc'ed. 

1788 if error_job is not None: 

1789 error_job.get_result(self.timeout) 

1790 

1791 def _warn_exit_early(self): 

1792 """Warn the user if the generator is gc'ed before being consumned.""" 

1793 ready_outputs = self.n_completed_tasks - self._nb_consumed 

1794 is_completed = self._is_completed() 

1795 msg = "" 

1796 if ready_outputs: 

1797 msg += ( 

1798 f"{ready_outputs} tasks have been successfully executed " 

1799 " but not used." 

1800 ) 

1801 if not is_completed: 

1802 msg += " Additionally, " 

1803 

1804 if not is_completed: 

1805 msg += ( 

1806 f"{self.n_dispatched_tasks - self.n_completed_tasks} tasks " 

1807 "which were still being processed by the workers have been " 

1808 "cancelled." 

1809 ) 

1810 

1811 if msg: 

1812 msg += ( 

1813 " You could benefit from adjusting the input task " 

1814 "iterator to limit unnecessary computation time." 

1815 ) 

1816 

1817 warnings.warn(msg) 

1818 
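A hedged sketch of when the warning above fires, assuming a joblib version with return_as='generator': the output generator is dropped before all dispatched tasks have been consumed.

from joblib import Parallel, delayed

gen = Parallel(n_jobs=2, return_as='generator')(
    delayed(pow)(i, 2) for i in range(100)
)
first = [next(gen) for _ in range(3)]
# Dropping the generator before exhausting it aborts the pending tasks and
# emits the UserWarning assembled above, mentioning how many results were
# computed but never consumed.
del gen
print(first)  # [0, 1, 4]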

1819 def _get_sequential_output(self, iterable): 

1820 """Separate loop for sequential output. 

1821 

1822 This simplifies the traceback in case of errors and reduces the 

1823 overhead of calling sequential tasks with `joblib`. 

1824 """ 

1825 try: 

1826 self._iterating = True 

1827 self._original_iterator = iterable 

1828 batch_size = self._get_batch_size() 

1829 

1830 if batch_size != 1: 

1831 it = iter(iterable) 

1832 iterable_batched = iter( 

1833 lambda: tuple(itertools.islice(it, batch_size)), () 

1834 ) 

1835 iterable = ( 

1836 task for batch in iterable_batched for task in batch 

1837 ) 

1838 

1839 # first yield returns None, for internal use only. This ensures 

1840 # that we enter the try/except block and set up the generator. 

1841 yield None 

1842 

1843 # Sequentially call the tasks and yield the results. 

1844 for func, args, kwargs in iterable: 

1845 self.n_dispatched_batches += 1 

1846 self.n_dispatched_tasks += 1 

1847 res = func(*args, **kwargs) 

1848 self.n_completed_tasks += 1 

1849 self.print_progress() 

1850 yield res 

1851 self._nb_consumed += 1 

1852 except BaseException: 

1853 self._exception = True 

1854 self._aborting = True 

1855 self._aborted = True 

1856 raise 

1857 finally: 

1858 self.print_progress() 

1859 self._running = False 

1860 self._iterating = False 

1861 self._original_iterator = None 

1862 
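A usage sketch (not part of this file) of the sequential path above: with an effective n_jobs of 1, tasks run inline in the calling thread and no worker pool is created.

from joblib import Parallel, delayed

# Runs in-process: short tracebacks, no serialization, no worker start-up.
results = Parallel(n_jobs=1)(delayed(sum)([i, i]) for i in range(5))
print(results)  # [0, 2, 4, 6, 8]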

1863 def _reset_run_tracking(self): 

1864 """Reset the counters and flags used to track the execution.""" 

1865 

1866 # Make sure, in a thread-safe way, that the parallel instance was 

1867 # not already running. 

1868 with getattr(self, '_lock', nullcontext()): 

1869 if self._running: 

1870 msg = 'This Parallel instance is already running!' 

1871 if self.return_generator is True: 

1872 msg += ( 

1873 " Before submitting new tasks, you must wait for the " 

1874 "completion of all the previous tasks, or clean all " 

1875 "references to the output generator." 

1876 ) 

1877 raise RuntimeError(msg) 

1878 self._running = True 

1879 

1880 # Counters to keep track of the tasks dispatched and completed. 

1881 self.n_dispatched_batches = 0 

1882 self.n_dispatched_tasks = 0 

1883 self.n_completed_tasks = 0 

1884 

1885 # The following count is incremented by one each time the user iterates 

1886 # on the output generator; it is used to prepare an informative 

1887 # warning message in case the generator is deleted before all the 

1888 # dispatched tasks have been consumed. 

1889 self._nb_consumed = 0 

1890 

1891 # The following flags are used to synchronize the threads in case one 

1892 # of the tasks errors out, to ensure that all workers abort fast and that 

1893 # the backend terminates properly. 

1894 

1895 # Set to True as soon as a worker signals that a task errors out 

1896 self._exception = False 

1897 # Set to True in case of early termination following an incident 

1898 self._aborting = False 

1899 # Set to True after abortion is complete 

1900 self._aborted = False 

1901 
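A hedged sketch of the guard above, assuming return_as='generator' is available: re-submitting work while a previous output generator is still pending raises RuntimeError.

from joblib import Parallel, delayed

parallel = Parallel(n_jobs=2, return_as='generator')
pending = parallel(delayed(abs)(-i) for i in range(100))

try:
    # The previous generator has not been consumed or collected, so the
    # instance is still flagged as running.
    parallel(delayed(abs)(-i) for i in range(10))
except RuntimeError as exc:
    print(exc)

list(pending)  # consuming (or deleting) the generator releases the instance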

1902 def __call__(self, iterable): 

1903 """Main function to dispatch parallel tasks.""" 

1904 

1905 self._reset_run_tracking() 

1906 self._start_time = time.time() 

1907 

1908 if not self._managed_backend: 

1909 n_jobs = self._initialize_backend() 

1910 else: 

1911 n_jobs = self._effective_n_jobs() 

1912 

1913 if n_jobs == 1: 

1914 # If n_jobs==1, run the computation sequentially and return 

1915 # immediately to avoid overheads. 

1916 output = self._get_sequential_output(iterable) 

1917 next(output) 

1918 return output if self.return_generator else list(output) 

1919 

1920 # Let's create an ID that uniquely identifies the current call. If the 

1921 # call is interrupted early and the same instance is immediately 

1922 # re-used, this id will be used to prevent workers that were 

1923 # concurrently finalizing a task from the previous call from running 

1924 # the callback. 

1925 with self._lock: 

1926 self._call_id = uuid4().hex 

1927 

1928 # self._effective_n_jobs should be called in the Parallel.__call__ 

1929 # thread only -- store its value in an attribute for further queries. 

1930 self._cached_effective_n_jobs = n_jobs 

1931 

1932 if isinstance(self._backend, LokyBackend): 

1933 # For the loky backend, we add a callback executed when reducing 

1934 # BatchedCalls, which makes the loky executor use a temporary folder 

1935 # specific to this Parallel object when pickling temporary memmaps. 

1936 # This callback is necessary to ensure that several Parallel 

1937 # objects using the same reusable executor don't use the same 

1938 # temporary resources. 

1939 

1940 def _batched_calls_reducer_callback(): 

1941 # Relevant implementation detail: the following lines, called 

1942 # when reducing BatchedCalls, are called in a thread-safe 

1943 # situation, meaning that the context of the temporary folder 

1944 # manager will not be changed in between the callback execution 

1945 # and the end of the BatchedCalls pickling. The reason is that 

1946 # pickling (the only place where set_current_context is used) 

1947 # is done from a single thread (the queue_feeder_thread). 

1948 self._backend._workers._temp_folder_manager.set_current_context( # noqa 

1949 self._id 

1950 ) 

1951 self._reducer_callback = _batched_calls_reducer_callback 

1952 

1953 # self._effective_n_jobs should be called in the Parallel.__call__ 

1954 # thread only -- store its value in an attribute for further queries. 

1955 self._cached_effective_n_jobs = n_jobs 

1956 

1957 backend_name = self._backend.__class__.__name__ 

1958 if n_jobs == 0: 

1959 raise RuntimeError("%s has no active worker." % backend_name) 

1960 

1961 self._print( 

1962 f"Using backend {backend_name} with {n_jobs} concurrent workers." 

1963 ) 

1964 if hasattr(self._backend, 'start_call'): 

1965 self._backend.start_call() 

1966 

1967 # Following flag prevents double calls to `backend.stop_call`. 

1968 self._calling = True 

1969 

1970 iterator = iter(iterable) 

1971 pre_dispatch = self.pre_dispatch 

1972 

1973 if pre_dispatch == 'all': 

1974 # prevent further dispatch via multiprocessing callback thread 

1975 self._original_iterator = None 

1976 self._pre_dispatch_amount = 0 

1977 else: 

1978 self._original_iterator = iterator 

1979 if hasattr(pre_dispatch, 'endswith'): 

1980 pre_dispatch = eval_expr( 

1981 pre_dispatch.replace("n_jobs", str(n_jobs)) 

1982 ) 

1983 self._pre_dispatch_amount = pre_dispatch = int(pre_dispatch) 

1984 

1985 # The main thread will consume the first pre_dispatch items and 

1986 # the remaining items will later be lazily dispatched by async 

1987 # callbacks upon task completion. 

1988 

1989 # TODO: this iterator should be batch_size * n_jobs 

1990 iterator = itertools.islice(iterator, self._pre_dispatch_amount) 

1991 

1992 # Use a caching dict for callables that are pickled with cloudpickle to 

1993 # improve performance. This cache is used only in the case of 

1994 # functions that are defined in the __main__ module, functions that 

1995 # are defined locally (inside another function) and lambda expressions. 

1996 self._pickle_cache = dict() 

1997 

1998 output = self._get_outputs(iterator, pre_dispatch) 

1999 self._call_ref = weakref.ref(output) 

2000 

2001 # The first item from the output is blank, but it makes the interpreter 

2002 # progress until it enters the try/except block of the generator and 

2003 # reaches the first `yield` statement. This starts the asynchronous 

2004 # dispatch of the tasks to the workers. 

2005 next(output) 

2006 

2007 return output if self.return_generator else list(output) 

2008 
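To round off __call__, a usage sketch (not from this file) of the pre_dispatch handling above: it accepts 'all', an integer, or a string expression in n_jobs that is evaluated with eval_expr after substituting the effective number of workers.

from joblib import Parallel, delayed

def load(i):
    return [i] * 1_000  # stand-in for an expensive-to-materialize item

# Only 2 * n_jobs tasks are pulled from the input generator ahead of time;
# the rest are consumed lazily as workers report completions, which bounds
# memory usage for long input streams.
tasks = (delayed(load)(i) for i in range(10_000))
results = Parallel(n_jobs=4, pre_dispatch='2*n_jobs')(tasks)
print(len(results))  # 10000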

2009 def __repr__(self): 

2010 return '%s(n_jobs=%s)' % (self.__class__.__name__, self.n_jobs)