Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/_parallel_backends.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

263 statements  

1""" 

2Backends for embarrassingly parallel code. 

3""" 

4 

5import gc 

6import os 

7import warnings 

8import threading 

9import contextlib 

10from abc import ABCMeta, abstractmethod 

11 

12from ._utils import ( 

13 _TracebackCapturingWrapper, 

14 _retrieve_traceback_capturing_wrapped_call 

15) 

16 

17from ._multiprocessing_helpers import mp 

18 

19if mp is not None: 

20 from .pool import MemmappingPool 

21 from multiprocessing.pool import ThreadPool 

22 from .executor import get_memmapping_executor 

23 

24 # Import loky only if multiprocessing is present 

25 from .externals.loky import process_executor, cpu_count 

26 from .externals.loky.process_executor import ShutdownExecutorError 

27 

28 

class ParallelBackendBase(metaclass=ABCMeta):
    """Helper abc which defines all methods a ParallelBackend must implement.

    Subclasses must implement ``effective_n_jobs`` and ``apply_async``; every
    other hook has a do-nothing (or trivial) default implementation here.
    """

    # Capability flags, overridden by the concrete backends.
    supports_inner_max_num_threads = False
    supports_retrieve_callback = False
    default_n_jobs = 1

    @property
    def supports_return_generator(self):
        """Mirror ``supports_retrieve_callback`` unless a subclass overrides."""
        return self.supports_retrieve_callback

    @property
    def supports_timeout(self):
        """Mirror ``supports_retrieve_callback`` unless a subclass overrides."""
        return self.supports_retrieve_callback

    # Class-level default so the attribute exists even when __init__ has not
    # run; None means "not set yet".
    nesting_level = None

    def __init__(self, nesting_level=None, inner_max_num_threads=None,
                 **kwargs):
        """Store the nesting level and the inner thread limit.

        Remaining keyword arguments are forwarded to the next ``__init__`` in
        the MRO so that the class cooperates with mixins.
        """
        super().__init__(**kwargs)
        self.nesting_level = nesting_level
        self.inner_max_num_threads = inner_max_num_threads

    # Environment variables set by _prepare_worker_env to cap the thread
    # pools of external libraries in worker processes.
    MAX_NUM_THREADS_VARS = [
        'OMP_NUM_THREADS', 'OPENBLAS_NUM_THREADS', 'MKL_NUM_THREADS',
        'BLIS_NUM_THREADS', 'VECLIB_MAXIMUM_THREADS', 'NUMBA_NUM_THREADS',
        'NUMEXPR_NUM_THREADS',
    ]

    TBB_ENABLE_IPC_VAR = "ENABLE_IPC"

    @abstractmethod
    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs that can actually run in parallel

        n_jobs is the number of workers requested by the callers. Passing
        n_jobs=-1 means requesting all available workers for instance matching
        the number of CPU cores on the worker host(s).

        This method should return a guesstimate of the number of workers that
        can actually perform work concurrently. The primary use case is to make
        it possible for the caller to know in how many chunks to slice the
        work.

        In general working on larger data chunks is more efficient (less
        scheduling overhead and better use of CPU cache prefetching heuristics)
        as long as all the workers have enough work to do.
        """

    @abstractmethod
    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""

    def retrieve_result_callback(self, out):
        """Called within the callback function passed in apply_async.

        The argument of this function is the argument given to a callback in
        the considered backend. It is supposed to return the outcome of a task
        if it succeeded or raise the exception if it failed.
        """

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  **backend_args):
        """Reconfigure the backend and return the number of workers.

        This makes it possible to reuse an existing backend instance for
        successive independent calls to Parallel with different parameters.
        ``prefer``, ``require`` and ``backend_args`` are accepted but unused
        by this default implementation.
        """
        self.parallel = parallel
        return self.effective_n_jobs(n_jobs)

    def start_call(self):
        """Call-back method called at the beginning of a Parallel call"""

    def stop_call(self):
        """Call-back method called at the end of a Parallel call"""

    def terminate(self):
        """Shutdown the workers and free the shared memory."""

    def compute_batch_size(self):
        """Determine the optimal batch size (always 1 by default)."""
        return 1

    def batch_completed(self, batch_size, duration):
        """Callback indicating how long it took to run a batch"""

    def get_exceptions(self):
        """List of exception types to be captured (empty by default)."""
        return []

    def abort_everything(self, ensure_ready=True):
        """Abort any running tasks

        This is called when an exception has been raised when executing a task
        and all the remaining tasks will be ignored and can therefore be
        aborted to spare computation resources.

        If ensure_ready is True, the backend should be left in an operating
        state as future tasks might be re-submitted via that same backend
        instance.

        If ensure_ready is False, the implementer of this method can decide
        to leave the backend in a closed / terminated state as no new task
        are expected to be submitted to this backend.

        Setting ensure_ready to False is an optimization that can be leveraged
        when aborting tasks via killing processes from a local process pool
        managed by the backend it-self: if we expect no new tasks, there is no
        point in re-creating new workers.
        """
        # Does nothing by default: to be overridden in subclasses when
        # canceling tasks is possible.
        pass

    def get_nested_backend(self):
        """Backend instance to be used by nested Parallel calls.

        By default a thread-based backend is used for the first level of
        nesting. Beyond, switch to sequential backend to avoid spawning too
        many threads on the host.

        Returns a ``(backend, n_jobs)`` pair whose second item is always None
        here.
        """
        # The class-level default is None, so a plain getattr(..., 0) default
        # never applies: treat an unset level explicitly as level 0 instead of
        # failing on ``None + 1``.
        current_level = getattr(self, 'nesting_level', None)
        if current_level is None:
            current_level = 0
        nesting_level = current_level + 1
        if nesting_level > 1:
            return SequentialBackend(nesting_level=nesting_level), None
        else:
            return ThreadingBackend(nesting_level=nesting_level), None

    @contextlib.contextmanager
    def retrieval_context(self):
        """Context manager to manage an execution context.

        Calls to Parallel.retrieve will be made inside this context.

        By default, this does nothing. It may be useful for subclasses to
        handle nested parallelism. In particular, it may be required to avoid
        deadlocks if a backend manages a fixed number of workers, when those
        workers may be asked to do nested Parallel calls. Without
        'retrieval_context' this could lead to deadlock, as all the workers
        managed by the backend may be "busy" waiting for the nested parallel
        calls to finish, but the backend has no free workers to execute those
        tasks.
        """
        yield

    def _prepare_worker_env(self, n_jobs):
        """Return environment variables limiting threadpools in external libs.

        This function returns a dict containing environment variables to pass
        when creating a pool of processes. Every name listed in
        ``MAX_NUM_THREADS_VARS`` is set, as a string, to the thread limit to
        apply in the child processes.
        """
        explicit_n_threads = self.inner_max_num_threads
        default_n_threads = max(cpu_count() // n_jobs, 1)

        # Set the inner environment variables to self.inner_max_num_threads if
        # it is given. Else, default to cpu_count // n_jobs unless the variable
        # is already present in the parent process environment.
        env = {}
        for var in self.MAX_NUM_THREADS_VARS:
            if explicit_n_threads is None:
                var_value = os.environ.get(var, default_n_threads)
            else:
                var_value = explicit_n_threads

            env[var] = str(var_value)

        if self.TBB_ENABLE_IPC_VAR not in os.environ:
            # To avoid over-subscription when using TBB, let the TBB schedulers
            # use Inter Process Communication to coordinate:
            env[self.TBB_ENABLE_IPC_VAR] = "1"
        return env

    @staticmethod
    def in_main_thread():
        """Return True when the calling thread is a ``_MainThread`` instance."""
        return isinstance(threading.current_thread(), threading._MainThread)

206 

207 

class SequentialBackend(ParallelBackendBase):
    """Backend that never dispatches work: batches are run one after another.

    No threading object is used or created, which keeps the overhead
    minimal. Used when n_jobs == 1.
    """

    uses_threads = True
    supports_timeout = False
    supports_retrieve_callback = False
    supports_sharedmem = True

    def effective_n_jobs(self, n_jobs):
        """Always report a single worker; reject the meaningless n_jobs=0."""
        zero_requested = n_jobs == 0
        if zero_requested:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        return 1

    def apply_async(self, func, callback=None):
        """Unsupported on this backend: always raises RuntimeError."""
        raise RuntimeError("Should never be called for SequentialBackend.")

    def retrieve_result_callback(self, out):
        """Unsupported on this backend: always raises RuntimeError."""
        raise RuntimeError("Should never be called for SequentialBackend.")

    def get_nested_backend(self):
        """Return whatever ``get_active_backend()`` currently reports.

        A sequential backend must not alter the nesting level, the default
        backend or the number of jobs, so the active one is handed back as is.
        """
        # Imported lazily: a top-level import would be cyclic.
        from .parallel import get_active_backend

        return get_active_backend()

240 

241 

class PoolManagerMixin(object):
    """Mixin holding the logic shared by backends built on a worker pool."""

    # The pool object; None while no pool has been created.
    _pool = None

    def effective_n_jobs(self, n_jobs):
        """Translate the requested ``n_jobs`` into an actual worker count."""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        if mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        if n_jobs < 0:
            # Negative values count backwards from the number of CPUs
            # (-1 means all of them), never going below one worker.
            return max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs

    def terminate(self):
        """Close and terminate the pool, if there is one, then forget it."""
        pool = self._pool
        if pool is None:
            return
        pool.close()
        pool.terminate()  # terminate does a join()
        self._pool = None

    def _get_pool(self):
        """Pool accessor used by apply_async; a hook for lazy creation."""
        return self._pool

    def apply_async(self, func, callback=None):
        """Submit ``func`` to the pool and return the pool's async result."""
        # The wrapper avoids crashes on KeyboardInterruptErrors. The same
        # callback is registered for the error case so that the pool does
        # not wait on crashed jobs.
        wrapped_call = _TracebackCapturingWrapper(func)
        pool = self._get_pool()
        return pool.apply_async(
            wrapped_call, (),
            callback=callback, error_callback=callback
        )

    def retrieve_result_callback(self, out):
        """Mimic concurrent.futures results, raising an error if needed."""
        return _retrieve_traceback_capturing_wrapped_call(out)

    def abort_everything(self, ensure_ready=True):
        """Tear the pool down and, if asked, rebuild it with the same setup."""
        self.terminate()
        if not ensure_ready:
            return
        parallel = self.parallel
        self.configure(n_jobs=parallel.n_jobs, parallel=parallel,
                       **parallel._backend_args)

290 

291 

class AutoBatchingMixin(object):
    """Mixin that tunes the batch size from the observed batch durations."""

    # Lower bound, in seconds, for the duration of a batch: it should be big
    # enough to hide the multiprocessing dispatching overhead.
    # The value was found by running benchmarks/bench_auto_batching.py
    # with various parameters on various platforms.
    MIN_IDEAL_BATCH_DURATION = .2

    # Upper bound, in seconds: kept low to avoid stragglers, i.e. long jobs
    # running alone on one worker while the others have nothing left to do.
    MAX_IDEAL_BATCH_DURATION = 2

    # Initial values of the batching counters
    _DEFAULT_EFFECTIVE_BATCH_SIZE = 1
    _DEFAULT_SMOOTHED_BATCH_DURATION = 0.0

    def __init__(self, **kwargs):
        """Initialise the batching counters to their default values."""
        super().__init__(**kwargs)
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION

    def compute_batch_size(self):
        """Return the batch size to use next, re-tuning it when needed."""
        old_batch_size = self._effective_batch_size
        batch_duration = self._smoothed_batch_duration

        if 0 < batch_duration < self.MIN_IDEAL_BATCH_DURATION:
            # Batches finish too quickly to hide the scheduling overhead:
            # grow the batch. The estimate is doubled to limit oscillations
            # between min and max.
            ideal_batch_size = 2 * int(old_batch_size *
                                       self.MIN_IDEAL_BATCH_DURATION /
                                       batch_duration)
            # Never more than double at once, to avoid huge batches that
            # could starve some workers; and never go below one.
            batch_size = max(min(2 * old_batch_size, ideal_batch_size), 1)

            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too fast ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        elif (batch_duration > self.MAX_IDEAL_BATCH_DURATION and
                old_batch_size >= 2):
            # Batches run for too long: some CPUs could end up idle while a
            # few others are still busy with long running batches. Shrink
            # the batch quickly to limit the likelihood of such stragglers.
            ideal_batch_size = int(
                old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration
            )
            # Doubled to limit oscillations between min and max.
            batch_size = max(2 * ideal_batch_size, 1)

            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too slow ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        else:
            # Nothing to adjust: the duration estimate stays valid too.
            return old_batch_size

        if batch_size != old_batch_size:
            # The smoothed duration is only meaningful for a constant batch
            # size (see batch_completed), so restart the estimate whenever
            # the size is re-tuned.
            self._smoothed_batch_duration = \
                self._DEFAULT_SMOOTHED_BATCH_DURATION

        return batch_size

    def batch_completed(self, batch_size, duration):
        """Record the duration of a batch of the current effective size."""
        if batch_size != self._effective_batch_size:
            # Measurement taken with another batch size: ignore it.
            return

        previous = self._smoothed_batch_duration
        if previous == self._DEFAULT_SMOOTHED_BATCH_DURATION:
            # First measurement since the last reset: take it as is.
            self._smoothed_batch_duration = duration
        else:
            # Exponentially weighted average of the batch durations for
            # the current effective size.
            self._smoothed_batch_duration = 0.8 * previous + 0.2 * duration

    def reset_batch_stats(self):
        """Put the batching counters back to their default values.

        This avoids interferences with future jobs.
        """
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION

399 

400 

class ThreadingBackend(PoolManagerMixin, ParallelBackendBase):
    """Backend running the batches in a pool of threads.

    This is a low-overhead backend but it suffers from the Python Global
    Interpreter Lock if the called function relies a lot on Python objects.
    Mostly useful when the execution bottleneck is a compiled extension that
    explicitly releases the GIL (for instance a Cython loop wrapped in a "with
    nogil" block or an expensive call to a library such as NumPy).

    The thread pool is created lazily, on the first call to apply_async.

    ThreadingBackend is used as the default backend for nested calls.
    """

    supports_retrieve_callback = True
    uses_threads = True
    supports_sharedmem = True

    def configure(self, n_jobs=1, parallel=None, **backend_args):
        """Record the worker count and return it; no pool is built here.

        Raises FallbackToBackend carrying a SequentialBackend when only one
        worker would be used.
        """
        worker_count = self.effective_n_jobs(n_jobs)
        if worker_count == 1:
            # Avoid unnecessary overhead and use sequential backend instead.
            sequential = SequentialBackend(nesting_level=self.nesting_level)
            raise FallbackToBackend(sequential)
        self.parallel = parallel
        self._n_jobs = worker_count
        return worker_count

    def _get_pool(self):
        """Return the thread pool, creating it on first use."""
        pool = self._pool
        if pool is None:
            pool = self._pool = ThreadPool(self._n_jobs)
        return pool

440 

441 

class MultiprocessingBackend(PoolManagerMixin, AutoBatchingMixin,
                             ParallelBackendBase):
    """A ParallelBackend which will use a multiprocessing.Pool.

    Will introduce some communication and memory overhead when exchanging
    input and output data with the worker Python processes.
    However, does not suffer from the Python Global Interpreter Lock.
    """

    supports_retrieve_callback = True
    supports_return_generator = False

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel.

        This also checks if we are attempting to create a nested parallel
        loop: in every unsupported nesting situation 1 is returned, after a
        warning when more than one job was requested.
        """
        if mp is None:
            return 1

        if mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    # One work-around per line: each bullet must end with
                    # a newline.
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        'Multiprocessing-backed parallel loops '
                        'cannot be nested, setting n_jobs=1'
                    )
                warnings.warn(msg, stacklevel=3)
            return 1

        if process_executor._CURRENT_DEPTH > 0:
            # Mixing loky and multiprocessing in nested loop is not supported
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested,'
                    ' below loky, setting n_jobs=1',
                    stacklevel=3)
            return 1

        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside in non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested'
                    ' below threads, setting n_jobs=1',
                    stacklevel=3)
            return 1

        return super().effective_n_jobs(n_jobs)

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  **memmappingpool_args):
        """Build a MemmappingPool and return the number of workers.

        Extra keyword arguments are forwarded to MemmappingPool. Raises
        FallbackToBackend carrying a SequentialBackend when only one worker
        would be used.
        """
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))

        # Make sure to free as much memory as possible before forking
        gc.collect()
        self._pool = MemmappingPool(n_jobs, **memmappingpool_args)
        self.parallel = parallel
        return n_jobs

    def terminate(self):
        """Shutdown the pool and reset the auto-batching statistics."""
        super().terminate()
        self.reset_batch_stats()

522 

523 

class LokyBackend(AutoBatchingMixin, ParallelBackendBase):
    """Managing pool of workers with loky instead of multiprocessing."""

    supports_retrieve_callback = True
    supports_inner_max_num_threads = True

    # Executor handle; None until configure() succeeds and again after
    # terminate() / abort_everything(). The class-level default keeps those
    # two methods safe on a backend that was never configured.
    _workers = None

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  idle_worker_timeout=300, **memmappingexecutor_args):
        """Build a process executor and return the number of workers.

        ``idle_worker_timeout`` is passed as ``timeout`` to
        get_memmapping_executor and the extra keyword arguments are forwarded
        to it as well. Raises FallbackToBackend carrying a SequentialBackend
        when only one worker would be used.
        """
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))

        self._workers = get_memmapping_executor(
            n_jobs, timeout=idle_worker_timeout,
            env=self._prepare_worker_env(n_jobs=n_jobs),
            context_id=parallel._id, **memmappingexecutor_args)
        self.parallel = parallel
        return n_jobs

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel.

        Returns 1, after a warning when more than one job was requested,
        whenever the executor cannot be used from the current process or
        thread. Raises ValueError for ``n_jobs == 0``.
        """
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        'Loky-backed parallel loops cannot be called in a'
                        ' multiprocessing, setting n_jobs=1'
                    )
                warnings.warn(msg, stacklevel=3)

            return 1
        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside in non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    'Loky-backed parallel loops cannot be nested below '
                    'threads, setting n_jobs=1',
                    stacklevel=3)
            return 1
        elif n_jobs < 0:
            # Negative values count backwards from the number of CPUs.
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs

    def apply_async(self, func, callback=None):
        """Submit ``func`` to the executor and return the resulting future.

        When given, ``callback`` is attached as a done-callback of the future.
        """
        future = self._workers.submit(func)
        if callback is not None:
            future.add_done_callback(callback)
        return future

    def retrieve_result_callback(self, out):
        """Return ``out.result()``.

        A ShutdownExecutorError is turned into a RuntimeError carrying an
        explanatory message; other exceptions propagate unchanged.
        """
        try:
            return out.result()
        except ShutdownExecutorError:
            raise RuntimeError(
                "The executor underlying Parallel has been shutdown. "
                "This is likely due to the garbage collection of a previous "
                "generator from a call to Parallel with return_as='generator'."
                " Make sure the generator is not garbage collected when "
                "submitting a new job or that it is first properly exhausted."
            )

    def terminate(self):
        """Release the executor handle and reset the batching statistics."""
        if self._workers is not None:
            # Don't terminate the workers as we want to reuse them in later
            # calls, but cleanup the temporary resources that the Parallel call
            # created. This 'hack' requires a private, low-level operation.
            self._workers._temp_folder_manager._clean_temporary_resources(
                context_id=self.parallel._id, force=False
            )
            self._workers = None

        self.reset_batch_stats()

    def abort_everything(self, ensure_ready=True):
        """Shutdown the workers and restart a new one with the same parameters
        """
        workers = self._workers
        if workers is not None:
            # Nothing to kill when the handle was already released (or never
            # created): skip straight to the optional reconfiguration.
            workers.terminate(kill_workers=True)
            self._workers = None

        if ensure_ready:
            self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel)

625 

626 

class FallbackToBackend(Exception):
    """Raised when configuration should fallback to another backend.

    The replacement backend is available as the ``backend`` attribute.
    """

    def __init__(self, backend):
        # Forward the argument to Exception so that ``args`` is populated:
        # pickling/copying an exception re-invokes the class with ``args``,
        # which fails for a required parameter when ``args`` is empty.
        super().__init__(backend)
        self.backend = backend

632 

633 

def inside_dask_worker():
    """Tell whether the caller is running inside a Dask worker.

    Returns False when ``distributed`` cannot be imported or when
    ``get_worker()`` raises ValueError, and True otherwise.
    """
    # Kept here rather than in joblib._dask, which would create an import
    # cycle: _dask imports _parallel_backend that imports _dask ...
    try:
        from distributed import get_worker
    except ImportError:
        return False

    try:
        get_worker()
    except ValueError:
        return False
    return True