Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/_parallel_backends.py: 35%


1""" 

2Backends for embarrassingly parallel code. 

3""" 

4 

5import contextlib 

6import gc 

7import os 

8import threading 

9import warnings 

10from abc import ABCMeta, abstractmethod 

11 

12from ._multiprocessing_helpers import mp 

13from ._utils import ( 

14 _retrieve_traceback_capturing_wrapped_call, 

15 _TracebackCapturingWrapper, 

16) 

17 

18if mp is not None: 

19 from multiprocessing.pool import ThreadPool 

20 

21 from .executor import get_memmapping_executor 

22 

23 # Import loky only if multiprocessing is present 

24 from .externals.loky import cpu_count, process_executor 

25 from .externals.loky.process_executor import ShutdownExecutorError 

26 from .pool import MemmappingPool 

27 

28 

class ParallelBackendBase(metaclass=ABCMeta):
    """Helper abc which defines all methods a ParallelBackend must implement"""

    default_n_jobs = 1

    supports_inner_max_num_threads = False

    # This flag was introduced for backward compatibility reasons.
    # New backends should always set it to True and implement the
    # `retrieve_result_callback` method.
    supports_retrieve_callback = False

    @property
    def supports_return_generator(self):
        return self.supports_retrieve_callback

    @property
    def supports_timeout(self):
        return self.supports_retrieve_callback

    nesting_level = None

    def __init__(
        self, nesting_level=None, inner_max_num_threads=None, **backend_kwargs
    ):
        super().__init__()
        self.nesting_level = nesting_level
        self.inner_max_num_threads = inner_max_num_threads
        self.backend_kwargs = backend_kwargs

    MAX_NUM_THREADS_VARS = [
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "BLIS_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
        "NUMBA_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
    ]

    TBB_ENABLE_IPC_VAR = "ENABLE_IPC"

    @abstractmethod
    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs that can actually run in parallel.

        n_jobs is the number of workers requested by the caller. Passing
        n_jobs=-1 means requesting all available workers, for instance
        matching the number of CPU cores on the worker host(s).

        This method should return a guesstimate of the number of workers
        that can actually perform work concurrently. The primary use case is
        to make it possible for the caller to know in how many chunks to
        slice the work.

        In general, working on larger data chunks is more efficient (less
        scheduling overhead and better use of CPU cache prefetching
        heuristics) as long as all the workers have enough work to do.
        """

    def apply_async(self, func, callback=None):
        """Deprecated: implement `submit` instead."""
        raise NotImplementedError("Implement `submit` instead.")

    def submit(self, func, callback=None):
        """Schedule a function to be run and return a future-like object.

        This method should return a future-like object that allows tracking
        the progress of the task.

        If ``supports_retrieve_callback`` is False, the return value of this
        method is passed to ``retrieve_result`` instead of calling
        ``retrieve_result_callback``.

        Parameters
        ----------
        func: callable
            The function to be run in parallel.

        callback: callable
            A callable that will be called when the task is completed. This
            callable is a wrapper around ``retrieve_result_callback``. It
            should be attached to the future-like object returned by this
            method, so that it is called when the task completes.

            For future-like backends, this can be achieved with something
            like ``future.add_done_callback(callback)``.

        Returns
        -------
        future: future-like
            A future-like object to track the execution of the submitted
            function.
        """
        warnings.warn(
            "`apply_async` is deprecated, implement and use `submit` instead.",
            DeprecationWarning,
        )
        return self.apply_async(func, callback)

    def retrieve_result_callback(self, out):
        """Called within the callback function passed to `submit`.

        This method can customise how the result of the function is retrieved
        from the future-like object.

        Parameters
        ----------
        out: future-like
            The future-like object returned by the `submit` method.

        Returns
        -------
        result: object
            The result of the function executed in parallel.
        """

    def retrieve_result(self, out, timeout=None):
        """Hook to retrieve the result when supports_retrieve_callback=False.

        The argument `out` is the result of the `submit` call. This method
        should return the result of the computation or raise an exception if
        the computation failed.
        """
        if self.supports_timeout:
            return out.get(timeout=timeout)
        else:
            return out.get()

    def configure(
        self, n_jobs=1, parallel=None, prefer=None, require=None, **backend_kwargs
    ):
        """Reconfigure the backend and return the number of workers.

        This makes it possible to reuse an existing backend instance for
        successive independent calls to Parallel with different parameters.
        """
        self.parallel = parallel
        return self.effective_n_jobs(n_jobs)

    def start_call(self):
        """Call-back method called at the beginning of a Parallel call"""

    def stop_call(self):
        """Call-back method called at the end of a Parallel call"""

    def terminate(self):
        """Shutdown the workers and free the shared memory."""

    def compute_batch_size(self):
        """Determine the optimal batch size"""
        return 1

    def batch_completed(self, batch_size, duration):
        """Callback indicating how long it took to run a batch"""

    def abort_everything(self, ensure_ready=True):
        """Abort any running tasks.

        This is called when an exception has been raised while executing a
        task: all the remaining tasks will be ignored and can therefore be
        aborted to spare computation resources.

        If ensure_ready is True, the backend should be left in an operating
        state as future tasks might be re-submitted via that same backend
        instance.

        If ensure_ready is False, the implementer of this method can decide
        to leave the backend in a closed / terminated state as no new tasks
        are expected to be submitted to this backend.

        Setting ensure_ready to False is an optimization that can be
        leveraged when aborting tasks via killing processes from a local
        process pool managed by the backend itself: if we expect no new
        tasks, there is no point in re-creating new workers.
        """
        # Does nothing by default: to be overridden in subclasses when
        # canceling tasks is possible.
        pass

    def get_nested_backend(self):
        """Backend instance to be used by nested Parallel calls.

        By default a thread-based backend is used for the first level of
        nesting. Beyond that, a sequential backend is used to avoid spawning
        too many threads on the host.
        """
        nesting_level = getattr(self, "nesting_level", 0) + 1
        if nesting_level > 1:
            return SequentialBackend(nesting_level=nesting_level), None
        else:
            return ThreadingBackend(nesting_level=nesting_level), None

    def _prepare_worker_env(self, n_jobs):
        """Return environment variables limiting threadpools in external libs.

        This function returns a dict containing environment variables to pass
        when creating a pool of processes. These environment variables limit
        the number of threads to `n_threads` for the OpenMP, MKL, Accelerate
        and OpenBLAS libraries in the child processes.
        """
        explicit_n_threads = self.inner_max_num_threads
        default_n_threads = max(cpu_count() // n_jobs, 1)

        # Set the inner environment variables to self.inner_max_num_threads
        # if it is given. Else, default to cpu_count // n_jobs unless the
        # variable is already present in the parent process environment.
        env = {}
        for var in self.MAX_NUM_THREADS_VARS:
            if explicit_n_threads is None:
                var_value = os.environ.get(var, default_n_threads)
            else:
                var_value = explicit_n_threads

            env[var] = str(var_value)

        if self.TBB_ENABLE_IPC_VAR not in os.environ:
            # To avoid over-subscription when using TBB, let the TBB
            # schedulers use Inter Process Communication to coordinate:
            env[self.TBB_ENABLE_IPC_VAR] = "1"
        return env

    @contextlib.contextmanager
    def retrieval_context(self):
        """Context manager to manage an execution context.

        Calls to Parallel.retrieve will be made inside this context.

        By default, this does nothing. It may be useful for subclasses to
        handle nested parallelism. In particular, it may be required to avoid
        deadlocks if a backend manages a fixed number of workers, when those
        workers may be asked to do nested Parallel calls. Without
        'retrieval_context' this could lead to deadlock, as all the workers
        managed by the backend may be "busy" waiting for the nested parallel
        calls to finish, but the backend has no free workers to execute those
        tasks.
        """
        yield

    @staticmethod
    def in_main_thread():
        return isinstance(threading.current_thread(), threading._MainThread)

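# ---------------------------------------------------------------------------
# Illustrative sketch, not part of joblib: a minimal backend implementing the
# ParallelBackendBase contract on top of concurrent.futures, assuming the
# modern `submit` + `retrieve_result_callback` protocol documented above.
# The class name `_ExampleFuturesBackend` is hypothetical.
# ---------------------------------------------------------------------------
class _ExampleFuturesBackend(ParallelBackendBase):
    supports_retrieve_callback = True

    def effective_n_jobs(self, n_jobs):
        # Mirror joblib's convention: -1 means "all CPUs", 0 is invalid.
        if n_jobs == 0:
            raise ValueError("n_jobs == 0 in Parallel has no meaning")
        if n_jobs is None:
            return 1
        if n_jobs < 0:
            n_jobs = max((os.cpu_count() or 1) + 1 + n_jobs, 1)
        return n_jobs

    def configure(self, n_jobs=1, parallel=None, **backend_kwargs):
        # Build the worker pool and report how many jobs will actually run.
        from concurrent.futures import ThreadPoolExecutor

        n_jobs = self.effective_n_jobs(n_jobs)
        self._executor = ThreadPoolExecutor(max_workers=n_jobs)
        self.parallel = parallel
        return n_jobs

    def submit(self, func, callback=None):
        # `callback` wraps retrieve_result_callback; attach it to the future.
        future = self._executor.submit(func)
        if callback is not None:
            future.add_done_callback(callback)
        return future

    def retrieve_result_callback(self, future):
        # Called with the future that was passed to the done-callback above.
        return future.result()

    def terminate(self):
        # Release the worker threads at the end of the Parallel call.
        self._executor.shutdown(wait=True)
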

class SequentialBackend(ParallelBackendBase):
    """A ParallelBackend which will execute all batches sequentially.

    Does not use/create any threading objects, and hence has minimal
    overhead. Used when n_jobs == 1.
    """

    uses_threads = True
    supports_timeout = False
    supports_retrieve_callback = False
    supports_sharedmem = True

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError("n_jobs == 0 in Parallel has no meaning")
        return 1

    def submit(self, func, callback=None):
        """Schedule a func to be run"""
        raise RuntimeError("Should never be called for SequentialBackend.")

    def retrieve_result_callback(self, out):
        raise RuntimeError("Should never be called for SequentialBackend.")

    def get_nested_backend(self):
        # The import is not top-level to avoid cyclic import errors.
        from .parallel import get_active_backend

        # SequentialBackend should not change the nesting level, the default
        # backend, or the number of jobs. Just return the current one.
        return get_active_backend()

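# Illustrative usage sketch (hedged, not part of joblib's test suite):
# Parallel(n_jobs=1) ends up on SequentialBackend because pool-based backends
# raise FallbackToBackend(SequentialBackend(...)) from configure() when the
# effective number of jobs is 1. Tasks then run inline in the calling thread
# and `submit` is never invoked.
def _example_sequential_fallback():
    from joblib import Parallel, delayed

    # Runs without creating any pool; returns [0, 1, 2].
    return Parallel(n_jobs=1)(delayed(abs)(-i) for i in range(3))
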

class PoolManagerMixin(object):
    """A helper class for managing a pool of workers."""

    _pool = None

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError("n_jobs == 0 in Parallel has no meaning")
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif n_jobs < 0:
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs

    def terminate(self):
        """Shutdown the process or thread pool"""
        if self._pool is not None:
            self._pool.close()
            self._pool.terminate()  # terminate does a join()
            self._pool = None

    def _get_pool(self):
        """Used by `submit` to make it possible to implement lazy init"""
        return self._pool

    def submit(self, func, callback=None):
        """Schedule a func to be run"""
        # Here, we need a wrapper to avoid crashes on KeyboardInterrupt.
        # We also call the callback on error, to make sure the pool does not
        # wait on crashed jobs.
        return self._get_pool().apply_async(
            _TracebackCapturingWrapper(func),
            (),
            callback=callback,
            error_callback=callback,
        )

    def retrieve_result_callback(self, result):
        """Mimic concurrent.futures results, raising an error if needed."""
        # In the multiprocessing Pool API, the callbacks are called with the
        # result value as an argument, so `result` (`out`) is the output of
        # job.get(). It's either the result or the exception raised while
        # collecting the result.
        return _retrieve_traceback_capturing_wrapped_call(result)

    def abort_everything(self, ensure_ready=True):
        """Shutdown the pool and restart a new one with the same parameters"""
        self.terminate()
        if ensure_ready:
            self.configure(
                n_jobs=self.parallel.n_jobs,
                parallel=self.parallel,
                **self.parallel._backend_kwargs,
            )

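# Illustrative sketch, not joblib's actual wrapper: multiprocessing pools
# call callbacks with the *result value*, while concurrent.futures passes
# the *future*. PoolManagerMixin.submit bridges this by registering the same
# callable as both `callback` and `error_callback`, and by wrapping `func` so
# worker-side exceptions travel back as values. The `_capture` helper below
# is a hypothetical stand-in for _TracebackCapturingWrapper.
def _example_pool_callback_bridge():
    from multiprocessing.pool import ThreadPool

    def _capture(func):
        def wrapped():
            try:
                return func()
            except BaseException as exc:
                # Ship the exception back as a value so the caller can
                # re-raise it deliberately instead of crashing the pool.
                return exc

        return wrapped

    with ThreadPool(2) as pool:
        out = pool.apply_async(_capture(lambda: 1 / 0), ()).get()
    return out  # a ZeroDivisionError instance rather than a pool crash
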

class AutoBatchingMixin(object):
    """A helper class for automagically batching jobs."""

    # In seconds, should be big enough to hide multiprocessing dispatching
    # overhead.
    # This setting was found by running benchmarks/bench_auto_batching.py
    # with various parameters on various platforms.
    MIN_IDEAL_BATCH_DURATION = 0.2

    # Should not be too high to avoid stragglers: long jobs running alone
    # on a single worker while other workers have no work to process any
    # more.
    MAX_IDEAL_BATCH_DURATION = 2

    # Batching counters default values
    _DEFAULT_EFFECTIVE_BATCH_SIZE = 1
    _DEFAULT_SMOOTHED_BATCH_DURATION = 0.0

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION

    def compute_batch_size(self):
        """Determine the optimal batch size"""
        old_batch_size = self._effective_batch_size
        batch_duration = self._smoothed_batch_duration
        if batch_duration > 0 and batch_duration < self.MIN_IDEAL_BATCH_DURATION:
            # The current batch size is too small: the duration of the
            # processing of a batch of tasks is not large enough to hide
            # the scheduling overhead.
            ideal_batch_size = int(
                old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration
            )
            # Multiply by two to limit oscillations between min and max.
            ideal_batch_size *= 2

            # Don't increase the batch size too fast, to limit huge batch
            # sizes that could lead to starving workers.
            batch_size = min(2 * old_batch_size, ideal_batch_size)

            batch_size = max(batch_size, 1)

            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too fast ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        elif batch_duration > self.MAX_IDEAL_BATCH_DURATION and old_batch_size >= 2:
            # The current batch size is too big. If we schedule overly long
            # running batches some CPUs might wait with nothing left to do
            # while a couple of CPUs are left processing a few long running
            # batches. Better reduce the batch size a bit to limit the
            # likelihood of scheduling such stragglers.

            # Decrease the batch size quickly to limit potential starving.
            ideal_batch_size = int(
                old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration
            )
            # Multiply by two to limit oscillations between min and max.
            batch_size = max(2 * ideal_batch_size, 1)
            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too slow ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        else:
            # No batch size adjustment
            batch_size = old_batch_size

        if batch_size != old_batch_size:
            # Reset estimation of the smoothed mean batch duration: this
            # estimate is updated in the multiprocessing apply_async
            # CallBack as long as the batch_size is constant. Therefore
            # we need to reset the estimate whenever we re-tune the batch
            # size.
            self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION

        return batch_size

    def batch_completed(self, batch_size, duration):
        """Callback indicating how long it took to run a batch"""
        if batch_size == self._effective_batch_size:
            # Update the smoothed streaming estimate of the duration of a
            # batch from dispatch to completion
            old_duration = self._smoothed_batch_duration
            if old_duration == self._DEFAULT_SMOOTHED_BATCH_DURATION:
                # First record of duration for this batch size after the
                # last reset.
                new_duration = duration
            else:
                # Update the exponentially weighted average of the duration
                # of batches for the current effective size.
                new_duration = 0.8 * old_duration + 0.2 * duration
            self._smoothed_batch_duration = new_duration

    def reset_batch_stats(self):
        """Reset batch statistics to default values.

        This avoids interference with future jobs.
        """
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION

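# Worked example of the adaptive schedule above (illustrative numbers only).
# With MIN_IDEAL_BATCH_DURATION = 0.2 s, a smoothed batch duration of 0.05 s
# and old_batch_size = 1:
#
#     ideal_batch_size = int(1 * 0.2 / 0.05) = 4
#     ideal_batch_size *= 2               # -> 8, damps min/max oscillations
#     batch_size = min(2 * 1, 8) = 2      # growth is capped at doubling
#
# Growth is capped at doubling per adjustment, so the size ramps
# 1 -> 2 -> 4 -> ... until the smoothed batch duration lands inside
# [0.2 s, 2 s], at which point the batch size is left untouched.
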

class ThreadingBackend(PoolManagerMixin, ParallelBackendBase):
    """A ParallelBackend which will use a thread pool to execute batches in.

    This is a low-overhead backend but it suffers from the Python Global
    Interpreter Lock if the called function relies a lot on Python objects.
    Mostly useful when the execution bottleneck is a compiled extension that
    explicitly releases the GIL (for instance a Cython loop wrapped in a
    "with nogil" block or an expensive call to a library such as NumPy).

    The thread pool is lazily initialized: its construction is delayed to
    the first call to apply_async.

    ThreadingBackend is used as the default backend for nested calls.
    """

    supports_retrieve_callback = True
    uses_threads = True
    supports_sharedmem = True

    def configure(self, n_jobs=1, parallel=None, **backend_kwargs):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            # Avoid unnecessary overhead and use the sequential backend
            # instead.
            raise FallbackToBackend(SequentialBackend(nesting_level=self.nesting_level))
        self.parallel = parallel
        self._n_jobs = n_jobs
        return n_jobs

    def _get_pool(self):
        """Lazily initialize the thread pool

        The actual pool of worker threads is only initialized at the first
        call to apply_async.
        """
        if self._pool is None:
            self._pool = ThreadPool(self._n_jobs)
        return self._pool

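# Hedged usage sketch (not part of joblib's public surface): the "threading"
# backend shines when the work releases the GIL, e.g. NumPy linear algebra,
# Cython `nogil` sections, or I/O-bound calls.
def _example_threading_backend_usage():
    from joblib import Parallel, delayed

    # Two worker threads sharing memory with the caller; nothing is pickled.
    return Parallel(n_jobs=2, backend="threading")(
        delayed(len)(s) for s in ("spam", "eggs")
    )
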

class MultiprocessingBackend(PoolManagerMixin, AutoBatchingMixin, ParallelBackendBase):
    """A ParallelBackend which will use a multiprocessing.Pool.

    Will introduce some communication and memory overhead when exchanging
    input and output data with the worker Python processes. However, it does
    not suffer from the Python Global Interpreter Lock.
    """

    supports_retrieve_callback = True
    supports_return_generator = False

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel.

        This also checks if we are attempting to create a nested parallel
        loop.
        """
        if mp is None:
            return 1

        if mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        "Multiprocessing-backed parallel loops "
                        "cannot be nested, setting n_jobs=1"
                    )
                warnings.warn(msg, stacklevel=3)
            return 1

        if process_executor._CURRENT_DEPTH > 0:
            # Mixing loky and multiprocessing in nested loops is not
            # supported
            if n_jobs != 1:
                warnings.warn(
                    "Multiprocessing-backed parallel loops cannot be nested"
                    " below loky, setting n_jobs=1",
                    stacklevel=3,
                )
            return 1

        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    "Multiprocessing-backed parallel loops cannot be nested"
                    " below threads, setting n_jobs=1",
                    stacklevel=3,
                )
            return 1

        return super(MultiprocessingBackend, self).effective_n_jobs(n_jobs)

    def configure(
        self,
        n_jobs=1,
        parallel=None,
        prefer=None,
        require=None,
        **memmapping_pool_kwargs,
    ):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(SequentialBackend(nesting_level=self.nesting_level))

        memmapping_pool_kwargs = {
            **self.backend_kwargs,
            **memmapping_pool_kwargs,
        }

        # Make sure to free as much memory as possible before forking
        gc.collect()
        self._pool = MemmappingPool(n_jobs, **memmapping_pool_kwargs)
        self.parallel = parallel
        return n_jobs

    def terminate(self):
        """Shutdown the process or thread pool"""
        super(MultiprocessingBackend, self).terminate()
        self.reset_batch_stats()

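# Hedged usage sketch: selecting the multiprocessing backend explicitly.
# Real scripts should keep such calls under an `if __name__ == "__main__":`
# guard, since this backend spawns or forks worker processes and pickles
# the inputs and outputs.
def _example_multiprocessing_backend_usage():
    from joblib import Parallel, delayed

    return Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(abs)(-i) for i in range(4)
    )
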

class LokyBackend(AutoBatchingMixin, ParallelBackendBase):
    """Managing a pool of workers with loky instead of multiprocessing."""

    supports_retrieve_callback = True
    supports_inner_max_num_threads = True

    def configure(
        self,
        n_jobs=1,
        parallel=None,
        prefer=None,
        require=None,
        idle_worker_timeout=None,
        **memmapping_executor_kwargs,
    ):
        """Build a process executor and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(SequentialBackend(nesting_level=self.nesting_level))

        memmapping_executor_kwargs = {
            **self.backend_kwargs,
            **memmapping_executor_kwargs,
        }

        # Prohibit the use of 'timeout' in the LokyBackend, as
        # 'idle_worker_timeout' better describes the backend's behavior.
        if "timeout" in memmapping_executor_kwargs:
            raise ValueError(
                "The 'timeout' parameter is not supported by the LokyBackend. "
                "Please use the `idle_worker_timeout` parameter instead."
            )
        if idle_worker_timeout is None:
            idle_worker_timeout = self.backend_kwargs.get("idle_worker_timeout", 300)

        self._workers = get_memmapping_executor(
            n_jobs,
            timeout=idle_worker_timeout,
            env=self._prepare_worker_env(n_jobs=n_jobs),
            context_id=parallel._id,
            **memmapping_executor_kwargs,
        )
        self.parallel = parallel
        return n_jobs

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError("n_jobs == 0 in Parallel has no meaning")
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        "Loky-backed parallel loops cannot be called within"
                        " multiprocessing, setting n_jobs=1"
                    )
                warnings.warn(msg, stacklevel=3)

            return 1
        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    "Loky-backed parallel loops cannot be nested below "
                    "threads, setting n_jobs=1",
                    stacklevel=3,
                )
            return 1
        elif n_jobs < 0:
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs

    def submit(self, func, callback=None):
        """Schedule a func to be run"""
        future = self._workers.submit(func)
        if callback is not None:
            future.add_done_callback(callback)
        return future

    def retrieve_result_callback(self, future):
        """Retrieve the result from the future given by `submit`."""
        try:
            return future.result()
        except ShutdownExecutorError:
            raise RuntimeError(
                "The executor underlying Parallel has been shutdown. "
                "This is likely due to the garbage collection of a previous "
                "generator from a call to Parallel with return_as='generator'."
                " Make sure the generator is not garbage collected when "
                "submitting a new job or that it is first properly exhausted."
            )

    def terminate(self):
        if self._workers is not None:
            # Don't terminate the workers as we want to reuse them in later
            # calls, but clean up the temporary resources that the Parallel
            # call created. This 'hack' requires a private, low-level
            # operation.
            self._workers._temp_folder_manager._clean_temporary_resources(
                context_id=self.parallel._id, force=False
            )
            self._workers = None

        self.reset_batch_stats()

    def abort_everything(self, ensure_ready=True):
        """Shutdown the workers and restart a new executor with the same
        parameters"""
        self._workers.terminate(kill_workers=True)
        self._workers = None

        if ensure_ready:
            self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel)

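# Hedged usage sketch (assumes joblib >= 1.3 for `parallel_config`):
# `inner_max_num_threads` feeds _prepare_worker_env(), which exports
# OMP_NUM_THREADS and friends to the loky child processes so that n_jobs
# processes times BLAS threads does not oversubscribe the CPUs.
def _example_loky_inner_threads():
    from joblib import Parallel, delayed, parallel_config

    # Each loky worker is limited to a single BLAS/OpenMP thread.
    with parallel_config(backend="loky", inner_max_num_threads=1):
        return Parallel(n_jobs=2)(delayed(abs)(-i) for i in range(4))
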

class FallbackToBackend(Exception):
    """Raised when configuration should fall back to another backend"""

    def __init__(self, backend):
        self.backend = backend


def inside_dask_worker():
    """Check whether the current function is executed inside a Dask worker."""
    # This function cannot live in joblib._dask because there would be a
    # circular import:
    # _dask imports _parallel_backends, which imports _dask ...
    try:
        from distributed import get_worker
    except ImportError:
        return False

    try:
        get_worker()
        return True
    except ValueError:
        return False
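

# Hedged sketch: plugging the illustrative `_ExampleFuturesBackend` from
# above into Parallel via joblib's real `register_parallel_backend` helper.
# The backend name "example_futures" is arbitrary.
def _example_register_custom_backend():
    from joblib import Parallel, delayed, register_parallel_backend

    register_parallel_backend("example_futures", _ExampleFuturesBackend)
    return Parallel(n_jobs=2, backend="example_futures")(
        delayed(abs)(-i) for i in range(4)
    )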