Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/_parallel_backends.py: 35%


1""" 

2Backends for embarrassingly parallel code. 

3""" 

4 

5import gc 

6import os 

7import warnings 

8import threading 

9import contextlib 

10from abc import ABCMeta, abstractmethod 

11 

12 

13from ._multiprocessing_helpers import mp 

14 

15if mp is not None: 

16 from .pool import MemmappingPool 

17 from multiprocessing.pool import ThreadPool 

18 from .executor import get_memmapping_executor 

19 

20 # Import loky only if multiprocessing is present 

21 from .externals.loky import process_executor, cpu_count 

22 from .externals.loky.process_executor import ShutdownExecutorError 

23 from .externals.loky.process_executor import _ExceptionWithTraceback 

24 

25 

26class ParallelBackendBase(metaclass=ABCMeta): 

27 """Helper abc which defines all methods a ParallelBackend must implement""" 

28 

29 supports_inner_max_num_threads = False 

30 supports_retrieve_callback = False 

31 default_n_jobs = 1 

32 

33 @property 

34 def supports_return_generator(self): 

35 return self.supports_retrieve_callback 

36 

37 @property 

38 def supports_timeout(self): 

39 return self.supports_retrieve_callback 

40 

41 nesting_level = None 

42 

43 def __init__(self, nesting_level=None, inner_max_num_threads=None, 

44 **kwargs): 

45 super().__init__(**kwargs) 

46 self.nesting_level = nesting_level 

47 self.inner_max_num_threads = inner_max_num_threads 

48 

49 MAX_NUM_THREADS_VARS = [ 

50 'OMP_NUM_THREADS', 'OPENBLAS_NUM_THREADS', 'MKL_NUM_THREADS', 

51 'BLIS_NUM_THREADS', 'VECLIB_MAXIMUM_THREADS', 'NUMBA_NUM_THREADS', 

52 'NUMEXPR_NUM_THREADS', 

53 ] 

54 

55 TBB_ENABLE_IPC_VAR = "ENABLE_IPC" 

56 

57 @abstractmethod 

58 def effective_n_jobs(self, n_jobs): 

59 """Determine the number of jobs that can actually run in parallel 

60 

61 n_jobs is the number of workers requested by the callers. Passing 

62 n_jobs=-1 means requesting all available workers for instance matching 

63 the number of CPU cores on the worker host(s). 

64 

65 This method should return a guesstimate of the number of workers that 

66 can actually perform work concurrently. The primary use case is to make 

67 it possible for the caller to know in how many chunks to slice the 

68 work. 

69 

70 In general working on larger data chunks is more efficient (less 

71 scheduling overhead and better use of CPU cache prefetching heuristics) 

72 as long as all the workers have enough work to do. 

73 """ 

74 

75 @abstractmethod 

76 def apply_async(self, func, callback=None): 

77 """Schedule a func to be run""" 

78 

79 def retrieve_result_callback(self, out): 

80 """Called within the callback function passed in apply_async. 

81 

82 The argument of this function is the argument given to a callback in 

83 the considered backend. It is supposed to return the outcome of a task 

84 if it succeeded or raise the exception if it failed. 

85 """ 

86 

87 def configure(self, n_jobs=1, parallel=None, prefer=None, require=None, 

88 **backend_args): 

89 """Reconfigure the backend and return the number of workers. 

90 

91 This makes it possible to reuse an existing backend instance for 

92 successive independent calls to Parallel with different parameters. 

93 """ 

94 self.parallel = parallel 

95 return self.effective_n_jobs(n_jobs) 

96 

97 def start_call(self): 

98 """Call-back method called at the beginning of a Parallel call""" 

99 

100 def stop_call(self): 

101 """Call-back method called at the end of a Parallel call""" 

102 

103 def terminate(self): 

104 """Shutdown the workers and free the shared memory.""" 

105 

106 def compute_batch_size(self): 

107 """Determine the optimal batch size""" 

108 return 1 

109 

110 def batch_completed(self, batch_size, duration): 

111 """Callback indicate how long it took to run a batch""" 

112 

113 def get_exceptions(self): 

114 """List of exception types to be captured.""" 

115 return [] 

116 

117 def abort_everything(self, ensure_ready=True): 

118 """Abort any running tasks 

119 

120 This is called when an exception has been raised when executing a task 

121 and all the remaining tasks will be ignored and can therefore be 

122 aborted to spare computation resources. 

123 

124 If ensure_ready is True, the backend should be left in an operating 

125 state as future tasks might be re-submitted via that same backend 

126 instance. 

127 

128 If ensure_ready is False, the implementer of this method can decide 

129 to leave the backend in a closed / terminated state as no new task 

130 are expected to be submitted to this backend. 

131 

132 Setting ensure_ready to False is an optimization that can be leveraged 

133 when aborting tasks via killing processes from a local process pool 

134 managed by the backend it-self: if we expect no new tasks, there is no 

135 point in re-creating new workers. 

136 """ 

137 # Does nothing by default: to be overridden in subclasses when 

138 # canceling tasks is possible. 

139 pass 

140 

141 def get_nested_backend(self): 

142 """Backend instance to be used by nested Parallel calls. 

143 

144 By default a thread-based backend is used for the first level of 

145 nesting. Beyond, switch to sequential backend to avoid spawning too 

146 many threads on the host. 

147 """ 

148 nesting_level = getattr(self, 'nesting_level', 0) + 1 

149 if nesting_level > 1: 

150 return SequentialBackend(nesting_level=nesting_level), None 

151 else: 

152 return ThreadingBackend(nesting_level=nesting_level), None 

153 

154 @contextlib.contextmanager 

155 def retrieval_context(self): 

156 """Context manager to manage an execution context. 

157 

158 Calls to Parallel.retrieve will be made inside this context. 

159 

160 By default, this does nothing. It may be useful for subclasses to 

161 handle nested parallelism. In particular, it may be required to avoid 

162 deadlocks if a backend manages a fixed number of workers, when those 

163 workers may be asked to do nested Parallel calls. Without 

164 'retrieval_context' this could lead to deadlock, as all the workers 

165 managed by the backend may be "busy" waiting for the nested parallel 

166 calls to finish, but the backend has no free workers to execute those 

167 tasks. 

168 """ 

169 yield 

170 

171 def _prepare_worker_env(self, n_jobs): 

172 """Return environment variables limiting threadpools in external libs. 

173 

174 This function return a dict containing environment variables to pass 

175 when creating a pool of process. These environment variables limit the 

176 number of threads to `n_threads` for OpenMP, MKL, Accelerated and 

177 OpenBLAS libraries in the child processes. 

178 """ 

179 explicit_n_threads = self.inner_max_num_threads 

180 default_n_threads = str(max(cpu_count() // n_jobs, 1)) 

181 

182 # Set the inner environment variables to self.inner_max_num_threads if 

183 # it is given. Else, default to cpu_count // n_jobs unless the variable 

184 # is already present in the parent process environment. 

185 env = {} 

186 for var in self.MAX_NUM_THREADS_VARS: 

187 if explicit_n_threads is None: 

188 var_value = os.environ.get(var, None) 

189 if var_value is None: 

190 var_value = default_n_threads 

191 else: 

192 var_value = str(explicit_n_threads) 

193 

194 env[var] = var_value 

195 

196 if self.TBB_ENABLE_IPC_VAR not in os.environ: 

197 # To avoid over-subscription when using TBB, let the TBB schedulers 

198 # use Inter Process Communication to coordinate: 

199 env[self.TBB_ENABLE_IPC_VAR] = "1" 

200 return env 

201 
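
    # Illustrative note (not in the original source): with cpu_count() == 8
    # and n_jobs == 2, and no explicit inner_max_num_threads, each worker
    # would get OMP_NUM_THREADS=4, MKL_NUM_THREADS=4, etc., so the two
    # workers together use at most the 8 available cores instead of
    # oversubscribing with 2 x 8 threads.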

    @staticmethod
    def in_main_thread():
        return isinstance(threading.current_thread(), threading._MainThread)
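
# --- Hedged illustration (not part of the original joblib source) ----------
# A minimal sketch of a custom backend built on the ABC above. It executes
# every task eagerly in the calling process and still exercises the
# callback/retrieve protocol documented in ParallelBackendBase. The names
# `EagerBackend` and `_EagerOutcome` are hypothetical.

class _EagerOutcome:
    """Object handed to the apply_async callback and unwrapped later."""

    def __init__(self, value=None, exception=None):
        self.value = value
        self.exception = exception


class EagerBackend(ParallelBackendBase):
    supports_retrieve_callback = True

    def effective_n_jobs(self, n_jobs):
        # Everything runs inline, so there is never more than one worker.
        return 1

    def apply_async(self, func, callback=None):
        # Run the task immediately and route the outcome through the
        # callback, mirroring the contract described in the ABC.
        try:
            outcome = _EagerOutcome(value=func())
        except BaseException as e:
            outcome = _EagerOutcome(exception=e)
        if callback is not None:
            callback(outcome)
        return outcome

    def retrieve_result_callback(self, out):
        # `out` is the object given to the callback in apply_async.
        if out.exception is not None:
            raise out.exception
        return out.value

# Such a backend could presumably be registered and selected by name with
# joblib's public helper, e.g.:
#
#     from joblib import Parallel, delayed, register_parallel_backend
#     register_parallel_backend('eager', EagerBackend)
#     results = Parallel(backend='eager')(delayed(abs)(-i) for i in range(3))
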

class SequentialBackend(ParallelBackendBase):
    """A ParallelBackend which will execute all batches sequentially.

    Does not use/create any threading objects, and hence has minimal
    overhead. Used when n_jobs == 1.
    """

    uses_threads = True
    supports_timeout = False
    supports_retrieve_callback = False
    supports_sharedmem = True

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        return 1

    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        raise RuntimeError("Should never be called for SequentialBackend.")

    def retrieve_result_callback(self, out):
        raise RuntimeError("Should never be called for SequentialBackend.")

    def get_nested_backend(self):
        # import is not top level to avoid cyclic import errors.
        from .parallel import get_active_backend

        # SequentialBackend should not change the nesting level, the default
        # backend, or the number of jobs. Just return the current one.
        return get_active_backend()


class PoolManagerMixin(object):
    """A helper class for managing a pool of workers."""

    _pool = None

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif n_jobs < 0:
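            # Illustrative note (not in the original source): with 8 CPUs,
            # n_jobs=-1 maps to 8 workers and n_jobs=-2 maps to 7,
            # i.e. "all but one".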
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs

    def terminate(self):
        """Shutdown the process or thread pool"""
        if self._pool is not None:
            self._pool.close()
            self._pool.terminate()  # terminate does a join()
            self._pool = None

    def _get_pool(self):
        """Used by apply_async to make it possible to implement lazy init"""
        return self._pool

    @staticmethod
    def _wrap_func_call(func):
        """Protect the function call and return errors with tracebacks."""
        try:
            return func()
        except BaseException as e:
            return _ExceptionWithTraceback(e)

    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        # Here, we need a wrapper to avoid crashes on KeyboardInterrupt.
        # We also call the callback on error, to make sure the pool does not
        # wait on crashed jobs.
        return self._get_pool().apply_async(
            self._wrap_func_call, (func,),
            callback=callback, error_callback=callback
        )

    def retrieve_result_callback(self, out):
        """Mimic concurrent.futures results, raising an error if needed."""
        if isinstance(out, _ExceptionWithTraceback):
            rebuild, args = out.__reduce__()
            out = rebuild(*args)
        if isinstance(out, BaseException):
            raise out
        return out

    def abort_everything(self, ensure_ready=True):
        """Shutdown the pool and restart a new one with the same parameters"""
        self.terminate()
        if ensure_ready:
            self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel,
                           **self.parallel._backend_args)


class AutoBatchingMixin(object):
    """A helper class for automagically batching jobs."""

    # In seconds, should be big enough to hide multiprocessing dispatching
    # overhead.
    # This setting was found by running benchmarks/bench_auto_batching.py
    # with various parameters on various platforms.
    MIN_IDEAL_BATCH_DURATION = .2

    # Should not be too high to avoid stragglers: long jobs running alone
    # on a single worker while other workers have no work to process
    # anymore.
    MAX_IDEAL_BATCH_DURATION = 2

    # Batching counters default values
    _DEFAULT_EFFECTIVE_BATCH_SIZE = 1
    _DEFAULT_SMOOTHED_BATCH_DURATION = 0.0

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION

    def compute_batch_size(self):
        """Determine the optimal batch size"""
        old_batch_size = self._effective_batch_size
        batch_duration = self._smoothed_batch_duration
        if (batch_duration > 0 and
                batch_duration < self.MIN_IDEAL_BATCH_DURATION):
            # The current batch size is too small: the duration of the
            # processing of a batch of tasks is not large enough to hide
            # the scheduling overhead.
            ideal_batch_size = int(old_batch_size *
                                   self.MIN_IDEAL_BATCH_DURATION /
                                   batch_duration)
            # Multiply by two to limit oscillations between min and max.
            ideal_batch_size *= 2

            # Don't increase the batch size too fast, to avoid huge batch
            # sizes potentially leading to starving workers.
            batch_size = min(2 * old_batch_size, ideal_batch_size)

            batch_size = max(batch_size, 1)

            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too fast ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        elif (batch_duration > self.MAX_IDEAL_BATCH_DURATION and
                old_batch_size >= 2):
            # The current batch size is too big. If we schedule overly long
            # running batches some CPUs might wait with nothing left to do
            # while a couple of CPUs are left processing a few long running
            # batches. Better reduce the batch size a bit to limit the
            # likelihood of scheduling such stragglers.

            # decrease the batch size quickly to limit potential starving
            ideal_batch_size = int(
                old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration
            )
            # Multiply by two to limit oscillations between min and max.
            batch_size = max(2 * ideal_batch_size, 1)
            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too slow ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        else:
            # No batch size adjustment
            batch_size = old_batch_size

        if batch_size != old_batch_size:
            # Reset estimation of the smoothed mean batch duration: this
            # estimate is updated in the multiprocessing apply_async
            # CallBack as long as the batch_size is constant. Therefore
            # we need to reset the estimate whenever we re-tune the batch
            # size.
            self._smoothed_batch_duration = \
                self._DEFAULT_SMOOTHED_BATCH_DURATION

        return batch_size

    def batch_completed(self, batch_size, duration):
        """Callback indicating how long it took to run a batch"""
        if batch_size == self._effective_batch_size:
            # Update the smoothed streaming estimate of the duration of a
            # batch from dispatch to completion
            old_duration = self._smoothed_batch_duration
            if old_duration == self._DEFAULT_SMOOTHED_BATCH_DURATION:
                # First record of duration for this batch size after the
                # last reset.
                new_duration = duration
            else:
                # Update the exponentially weighted average of the duration
                # of a batch for the current effective size.
                new_duration = 0.8 * old_duration + 0.2 * duration
            self._smoothed_batch_duration = new_duration

    def reset_batch_stats(self):
        """Reset batch statistics to default values.

        This avoids interference with future jobs.
        """
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION
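
# Worked example (illustrative, not from the original source) of the
# auto-batching heuristic above: suppose the smoothed duration of a batch
# of 4 tasks is 0.05s, below MIN_IDEAL_BATCH_DURATION (0.2s). Then
# ideal_batch_size = int(4 * 0.2 / 0.05) * 2 = 32, capped by the growth
# limit min(2 * 4, 32) = 8, so the next batches hold 8 tasks. Once the
# batch size changes, the smoothed duration is reset and then re-estimated
# with the EWMA new = 0.8 * old + 0.2 * sample.
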

class ThreadingBackend(PoolManagerMixin, ParallelBackendBase):
    """A ParallelBackend which will use a thread pool to execute batches in.

    This is a low-overhead backend but it suffers from the Python Global
    Interpreter Lock if the called function relies a lot on Python objects.
    It is mostly useful when the execution bottleneck is a compiled extension
    that explicitly releases the GIL (for instance a Cython loop wrapped in a
    "with nogil" block or an expensive call to a library such as NumPy).

    The thread pool is lazily initialized: its construction is delayed until
    the first call to apply_async.

    ThreadingBackend is used as the default backend for nested calls.
    """

    supports_retrieve_callback = True
    uses_threads = True
    supports_sharedmem = True

    def configure(self, n_jobs=1, parallel=None, **backend_args):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            # Avoid unnecessary overhead and use the sequential backend
            # instead.
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))
        self.parallel = parallel
        self._n_jobs = n_jobs
        return n_jobs

    def _get_pool(self):
        """Lazily initialize the thread pool

        The actual pool of worker threads is only initialized at the first
        call to apply_async.
        """
        if self._pool is None:
            self._pool = ThreadPool(self._n_jobs)
        return self._pool
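
# Usage sketch (illustrative, not from the original source): callers do not
# normally instantiate this class; Parallel picks it when threads are
# requested, e.g.:
#
#     from joblib import Parallel, delayed
#     import math
#     out = Parallel(n_jobs=4, prefer="threads")(
#         delayed(math.sqrt)(i) for i in range(10))
#
# or explicitly with backend="threading".
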

class MultiprocessingBackend(PoolManagerMixin, AutoBatchingMixin,
                             ParallelBackendBase):
    """A ParallelBackend which will use a multiprocessing.Pool.

    Will introduce some communication and memory overhead when exchanging
    input and output data with the worker Python processes. However, it does
    not suffer from the Python Global Interpreter Lock.
    """

    supports_retrieve_callback = True
    supports_return_generator = False

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel.

        This also checks if we are attempting to create a nested parallel
        loop.
        """
        if mp is None:
            return 1

        if mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        'Multiprocessing-backed parallel loops '
                        'cannot be nested, setting n_jobs=1'
                    )
                warnings.warn(msg, stacklevel=3)
            return 1

        if process_executor._CURRENT_DEPTH > 0:
            # Mixing loky and multiprocessing in nested loops is not
            # supported
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested'
                    ' below loky, setting n_jobs=1',
                    stacklevel=3)
            return 1

        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested'
                    ' below threads, setting n_jobs=1',
                    stacklevel=3)
            return 1

        return super(MultiprocessingBackend, self).effective_n_jobs(n_jobs)

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  **memmappingpool_args):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))

        # Make sure to free as much memory as possible before forking
        gc.collect()
        self._pool = MemmappingPool(n_jobs, **memmappingpool_args)
        self.parallel = parallel
        return n_jobs

    def terminate(self):
        """Shutdown the process or thread pool"""
        super(MultiprocessingBackend, self).terminate()
        self.reset_batch_stats()
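
# Usage sketch (illustrative, not from the original source): this backend is
# selected by name, e.g.:
#
#     from joblib import Parallel, delayed
#     out = Parallel(n_jobs=2, backend="multiprocessing")(
#         delayed(pow)(i, 2) for i in range(10))
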

class LokyBackend(AutoBatchingMixin, ParallelBackendBase):
    """Manage a pool of workers with loky instead of multiprocessing."""

    supports_retrieve_callback = True
    supports_inner_max_num_threads = True

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  idle_worker_timeout=300, **memmappingexecutor_args):
        """Build a process executor and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))

        self._workers = get_memmapping_executor(
            n_jobs, timeout=idle_worker_timeout,
            env=self._prepare_worker_env(n_jobs=n_jobs),
            context_id=parallel._id, **memmappingexecutor_args)
        self.parallel = parallel
        return n_jobs

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        'Loky-backed parallel loops cannot be called in a'
                        ' multiprocessing context, setting n_jobs=1'
                    )
                warnings.warn(msg, stacklevel=3)

            return 1
        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    'Loky-backed parallel loops cannot be nested below '
                    'threads, setting n_jobs=1',
                    stacklevel=3)
            return 1
        elif n_jobs < 0:
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs

    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        future = self._workers.submit(func)
        if callback is not None:
            future.add_done_callback(callback)
        return future

    def retrieve_result_callback(self, out):
        try:
            return out.result()
        except ShutdownExecutorError:
            raise RuntimeError(
                "The executor underlying Parallel has been shutdown. "
                "This is likely due to the garbage collection of a previous "
                "generator from a call to Parallel with return_as='generator'."
                " Make sure the generator is not garbage collected when "
                "submitting a new job or that it is first properly exhausted."
            )

    def terminate(self):
        if self._workers is not None:
            # Don't terminate the workers as we want to reuse them in later
            # calls, but clean up the temporary resources that the Parallel
            # call created. This 'hack' requires a private, low-level
            # operation.
            self._workers._temp_folder_manager._clean_temporary_resources(
                context_id=self.parallel._id, force=False
            )
            self._workers = None

        self.reset_batch_stats()

    def abort_everything(self, ensure_ready=True):
        """Shutdown the workers and restart new ones with the same parameters
        """
        self._workers.terminate(kill_workers=True)
        self._workers = None

        if ensure_ready:
            self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel)
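
# Usage sketch (illustrative, not from the original source): loky is
# joblib's default process backend, so a plain Parallel call uses it, and
# inner_max_num_threads can be set through the public configuration helper:
#
#     from joblib import Parallel, delayed, parallel_config
#     with parallel_config(backend="loky", inner_max_num_threads=2):
#         out = Parallel(n_jobs=-1)(delayed(sum)([i, i]) for i in range(8))
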

class FallbackToBackend(Exception):
    """Raised when configuration should fall back to another backend"""

    def __init__(self, backend):
        self.backend = backend


def inside_dask_worker():
    """Check whether the current function is executed inside a Dask worker.
    """
    # This function can not be in joblib._dask because there would be a
    # circular import:
    # _dask imports _parallel_backends that imports _dask ...
    try:
        from distributed import get_worker
    except ImportError:
        return False

    try:
        get_worker()
        return True
    except ValueError:
        return False
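
# Behavior sketch (illustrative, not from the original source):
# inside_dask_worker() returns False when `distributed` is not installed or
# when called outside a worker (distributed's get_worker raises ValueError
# there), and True only in code running on a Dask worker, e.g. inside a
# task submitted via distributed.Client.submit.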