Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/externals/loky/process_executor.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

569 statements  

1############################################################################### 

2# Re-implementation of the ProcessPoolExecutor more robust to faults 

3# 

4# author: Thomas Moreau and Olivier Grisel 

5# 

6# adapted from concurrent/futures/process_pool_executor.py (17/02/2017) 

7# * Add an extra management thread to detect executor_manager_thread failures, 

8# * Improve the shutdown process to avoid deadlocks, 

9# * Add timeout for workers, 

10# * More robust pickling process. 

11# 

12# Copyright 2009 Brian Quinlan. All Rights Reserved. 

13# Licensed to PSF under a Contributor Agreement. 

14 

15"""Implements ProcessPoolExecutor. 

16 

17The following diagram and text describe the data-flow through the system: 

18 

19|======================= In-process =====================|== Out-of-process ==| 

20 

21+----------+     +----------+       +--------+     +-----------+    +---------+

22|          |  => | Work Ids |       |        |     | Call Q    |    | Process |

23|          |     +----------+       |        |     +-----------+    |  Pool   |

24|          |     | ...      |       |        |     | ...       |    +---------+

25|          |     | 6        |    => |        |  => | 5, call() | => |         |

26|          |     | 7        |       |        |     | ...       |    |         |

27| Process  |     | ...      |       | Local  |     +-----------+    | Process |

28|  Pool    |     +----------+       | Worker |                      |  #1..n  |

29| Executor |                        | Thread |                      |         |

30|          |     +------------+     |        |     +-----------+    |         |

31|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |

32|          |     +------------+     |        |     +-----------+    |         |

33|          |     | 6: call()  |     |        |     | ...       |    |         |

34|          |     |    future  |     +--------+     | 4, result |    |         |

35|          |     | ...        |                    | 3, except |    |         |

36+----------+     +------------+                    +-----------+    +---------+

37 

38Executor.submit() called: 

39- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict 

40- adds the id of the _WorkItem to the "Work Ids" queue 

41 

42Local worker thread: 

43- reads work ids from the "Work Ids" queue and looks up the corresponding 

44 WorkItem from the "Work Items" dict: if the work item has been cancelled then 

45 it is simply removed from the dict, otherwise it is repackaged as a 

46 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" 

47 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because 

48 calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). 

49- reads _ResultItems from "Result Q", updates the future stored in the 

50 "Work Items" dict and deletes the dict entry 

51 

52Process #1..n: 

53- reads _CallItems from "Call Q", executes the calls, and puts the resulting 

54 _ResultItems in "Result Q" 

55""" 

56 

57 

58__author__ = "Thomas Moreau (thomas.moreau.2010@gmail.com)" 

59 

60 

61import faulthandler 

62import os 

63import gc 

64import sys 

65import queue 

66import struct 

67import weakref 

68import warnings 

69import itertools 

70import traceback 

71import threading 

72from time import time, sleep 

73import multiprocessing as mp 

74from functools import partial 

75from pickle import PicklingError 

76from concurrent.futures import Executor 

77from concurrent.futures._base import LOGGER 

78from concurrent.futures.process import BrokenProcessPool as _BPPException 

79from multiprocessing.connection import wait 

80 

81from ._base import Future 

82from .backend import get_context 

83from .backend.context import cpu_count, _MAX_WINDOWS_WORKERS 

84from .backend.queues import Queue, SimpleQueue 

85from .backend.reduction import set_loky_pickler, get_loky_pickler_name 

86from .backend.utils import kill_process_tree, get_exitcodes_terminated_worker 

87from .initializers import _prepare_initializer 

88 

89 

90# Mechanism to prevent infinite process spawning. When a worker of a 

91# ProcessPoolExecutor nested in MAX_DEPTH Executor tries to create a new 

92# Executor, a LokyRecursionError is raised 

93MAX_DEPTH = int(os.environ.get("LOKY_MAX_DEPTH", 10)) 

94_CURRENT_DEPTH = 0 

95 

96# Minimum time interval between two consecutive memory leak protection checks. 

97_MEMORY_LEAK_CHECK_DELAY = 1.0 

98 

99# Number of bytes of memory usage allowed over the reference process size. 

100_MAX_MEMORY_LEAK_SIZE = int(3e8) 

101 

102 

103try: 

104 from psutil import Process 

105 

106 _USE_PSUTIL = True 

107 

108 def _get_memory_usage(pid, force_gc=False): 

109 if force_gc: 

110 gc.collect() 

111 

112 mem_size = Process(pid).memory_info().rss 

113 mp.util.debug(f"psutil return memory size: {mem_size}") 

114 return mem_size 

115 

116except ImportError: 

117 _USE_PSUTIL = False 

118 

119 

120class _ThreadWakeup: 

121 def __init__(self): 

122 self._closed = False 

123 self._reader, self._writer = mp.Pipe(duplex=False) 

124 

125 def close(self): 

126 if not self._closed: 

127 self._closed = True 

128 self._writer.close() 

129 self._reader.close() 

130 

131 def wakeup(self): 

132 if not self._closed: 

133 self._writer.send_bytes(b"") 

134 

135 def clear(self): 

136 if not self._closed: 

137 while self._reader.poll(): 

138 self._reader.recv_bytes() 

139 

140 

141class _ExecutorFlags: 

142 """necessary references to maintain executor states without preventing gc 

143 

144 It permits to keep the information needed by executor_manager_thread 

145 and crash_detection_thread to maintain the pool without preventing the 

146 garbage collection of unreferenced executors. 

147 """ 

148 

149 def __init__(self, shutdown_lock): 

150 

151 self.shutdown = False 

152 self.broken = None 

153 self.kill_workers = False 

154 self.shutdown_lock = shutdown_lock 

155 

156 def flag_as_shutting_down(self, kill_workers=None): 

157 with self.shutdown_lock: 

158 self.shutdown = True 

159 if kill_workers is not None: 

160 self.kill_workers = kill_workers 

161 

162 def flag_as_broken(self, broken): 

163 with self.shutdown_lock: 

164 self.shutdown = True 

165 self.broken = broken 

166 

167 

168# Prior to 3.9, executor_manager_thread is created as daemon thread. This means 

169# that it is not joined automatically when the interpreter is shutting down. 

170# To work around this problem, an exit handler is installed to tell the 

171# thread to exit when the interpreter is shutting down and then waits until 

172# it finishes. The thread needs to be daemonized because the atexit hooks are 

173# called after all non daemonized threads are joined. 

174# 

175# Starting 3.9, there exists a specific atexit hook to be called before joining 

176# the threads so the executor_manager_thread does not need to be daemonized 

177# anymore. 

178# 

179# The atexit hooks are registered when starting the first ProcessPoolExecutor 

180# to avoid import having an effect on the interpreter. 

181 

182_global_shutdown = False 

183_global_shutdown_lock = threading.Lock() 

184_threads_wakeups = weakref.WeakKeyDictionary() 

185 

186 

def _python_exit():
    """Flag the global shutdown and wait for the registered manager threads.

    Sets ``_global_shutdown``, wakes up every thread registered in
    ``_threads_wakeups`` (each entry maps a thread to its
    ``(shutdown_lock, thread_wakeup)`` pair) and then joins these threads.
    """
    global _global_shutdown
    _global_shutdown = True

    # Materialize the list of items to avoid error due to iterating over
    # changing size dictionary.
    items = list(_threads_wakeups.items())
    if items:
        # The trailing space keeps the count separated from the thread name
        # in the emitted message.
        mp.util.debug(
            f"Interpreter shutting down. Waking up {len(items)} "
            f"executor_manager_thread:\n{items}"
        )

    # Wake up the executor_manager_thread's so they can detect the interpreter
    # is shutting down and exit.
    for _, (shutdown_lock, thread_wakeup) in items:
        with shutdown_lock:
            thread_wakeup.wakeup()

    # Collect the executor_manager_thread's to make sure we exit cleanly.
    for thread, _ in items:
        # This lock is to prevent situations where an executor is gc'ed in one
        # thread while the atexit finalizer is running in another thread.
        with _global_shutdown_lock:
            thread.join()

212 

213 

214# With the fork context, _threads_wakeups is propagated to children. 

215# Clear it after fork to avoid situations that can cause a freeze 

216# when joining the workers. 

217mp.util.register_after_fork(_threads_wakeups, lambda obj: obj.clear()) 

218 

219 

220# Module variable to register the at_exit call 

221process_pool_executor_at_exit = None 

222 

223# Controls how many more calls than processes will be queued in the call queue. 

224# A smaller number will mean that processes spend more time idle waiting for 

225# work while a larger number will make Future.cancel() succeed less frequently 

226# (Futures in the call queue cannot be cancelled). 

227EXTRA_QUEUED_CALLS = 1 

228 

229 

230class _RemoteTraceback(Exception): 

231 """Embed stringification of remote traceback in local traceback""" 

232 

233 def __init__(self, tb=None): 

234 self.tb = f'\n"""\n{tb}"""' 

235 

236 def __str__(self): 

237 return self.tb 

238 

239 

240# Do not inherit from BaseException to mirror 

241# concurrent.futures.process._ExceptionWithTraceback 

242class _ExceptionWithTraceback: 

243 def __init__(self, exc): 

244 tb = getattr(exc, "__traceback__", None) 

245 if tb is None: 

246 _, _, tb = sys.exc_info() 

247 tb = traceback.format_exception(type(exc), exc, tb) 

248 tb = "".join(tb) 

249 self.exc = exc 

250 self.tb = tb 

251 

252 def __reduce__(self): 

253 return _rebuild_exc, (self.exc, self.tb) 

254 

255 

def _rebuild_exc(exc, tb):
    """Return ``exc`` with a ``_RemoteTraceback`` built from ``tb`` as cause."""
    remote_tb = _RemoteTraceback(tb)
    exc.__cause__ = remote_tb
    return exc

259 

260 

261class _WorkItem: 

262 

263 __slots__ = ["future", "fn", "args", "kwargs"] 

264 

265 def __init__(self, future, fn, args, kwargs): 

266 self.future = future 

267 self.fn = fn 

268 self.args = args 

269 self.kwargs = kwargs 

270 

271 

272class _ResultItem: 

273 def __init__(self, work_id, exception=None, result=None): 

274 self.work_id = work_id 

275 self.exception = exception 

276 self.result = result 

277 

278 

class _CallItem:
    """A call to perform, together with the id of its work item.

    Calling the instance runs ``fn(*args, **kwargs)`` after restoring the
    loky pickler that was active when the item was created.
    """

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

        # Store the current loky_pickler so it is correctly set in the worker
        self.loky_pickler = get_loky_pickler_name()

    def __call__(self):
        set_loky_pickler(self.loky_pickler)
        fn, args, kwargs = self.fn, self.args, self.kwargs
        return fn(*args, **kwargs)

    def __repr__(self):
        return "CallItem({}, {}, {}, {})".format(
            self.work_id, self.fn, self.args, self.kwargs
        )

297 

298 

class _SafeQueue(Queue):
    """Queue that reports feeder errors on the future of the affected job."""

    def __init__(
        self,
        max_size=0,
        ctx=None,
        pending_work_items=None,
        running_work_items=None,
        thread_wakeup=None,
        shutdown_lock=None,
        reducers=None,
    ):
        self.pending_work_items = pending_work_items
        self.running_work_items = running_work_items
        self.shutdown_lock = shutdown_lock
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, reducers=reducers, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        """Handle error ``e`` raised while the feeder was processing ``obj``.

        For a ``_CallItem`` the matching future, if still pending, receives
        an exception describing the failure, and the thread waiting on
        ``thread_wakeup`` is woken up. Any other object is delegated to the
        parent class.
        """
        if not isinstance(obj, _CallItem):
            super()._on_queue_feeder_error(e, obj)
            return

        # format traceback only works on python3
        if isinstance(e, struct.error):
            raised_error = RuntimeError(
                "The task could not be sent to the workers as it is too "
                "large for `send_bytes`."
            )
        else:
            raised_error = PicklingError(
                "Could not pickle the task to send it to the workers."
            )
        formatted = traceback.format_exception(
            type(e), e, getattr(e, "__traceback__", None)
        )
        raised_error.__cause__ = _RemoteTraceback("".join(formatted))

        work_id = obj.work_id
        work_item = self.pending_work_items.pop(work_id, None)
        self.running_work_items.remove(work_id)
        # work_item can be None if another process terminated. In this
        # case, the executor_manager_thread fails all work_items with
        # BrokenProcessPool
        if work_item is not None:
            work_item.future.set_exception(raised_error)
            del work_item
        with self.shutdown_lock:
            self.thread_wakeup.wakeup()

346 

347 

348def _get_chunks(chunksize, *iterables): 

349 """Iterates over zip()ed iterables in chunks.""" 

350 it = zip(*iterables) 

351 while True: 

352 chunk = tuple(itertools.islice(it, chunksize)) 

353 if not chunk: 

354 return 

355 yield chunk 

356 

357 

358def _process_chunk(fn, chunk): 

359 """Processes a chunk of an iterable passed to map. 

360 

361 Runs the function passed to map() on a chunk of the 

362 iterable passed to map. 

363 

364 This function is run in a separate process. 

365 

366 """ 

367 return [fn(*args) for args in chunk] 

368 

369 

def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Put the outcome of ``work_id`` on ``result_queue``.

    If building or sending the item fails, the error raised by that attempt
    is sent back instead, as the exception of the work item.
    """
    try:
        item = _ResultItem(work_id, result=result, exception=exception)
        result_queue.put(item)
    except BaseException as e:
        failure = _ExceptionWithTraceback(e)
        result_queue.put(_ResultItem(work_id, exception=failure))

379 

380 

381def _enable_faulthandler_if_needed(): 

382 if "PYTHONFAULTHANDLER" in os.environ: 

383 # Respect the environment variable to configure faulthandler. This 

384 # makes it possible to never enable faulthandler in the loky workers by 

385 # setting PYTHONFAULTHANDLER=0 explicitly in the environment. 

386 mp.util.debug( 

387 f"faulthandler explicitly configured by environment variable: " 

388 f"PYTHONFAULTHANDLER={os.environ['PYTHONFAULTHANDLER']}." 

389 ) 

390 else: 

391 if faulthandler.is_enabled(): 

392 # Fault handler is already enabled, possibly via a custom 

393 # initializer to customize the behavior. 

394 mp.util.debug("faulthandler already enabled.") 

395 else: 

396 # Enable faulthandler by default with default paramaters otherwise. 

397 mp.util.debug( 

398 "Enabling faulthandler to report tracebacks on worker crashes." 

399 ) 

400 faulthandler.enable() 

401 

402 

def _process_worker(
    call_queue,
    result_queue,
    initializer,
    initargs,
    processes_management_lock,
    timeout,
    worker_exit_lock,
    current_depth,
):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
        processes_management_lock: A ctx.Lock avoiding worker timeout while
            some workers are being spawned.
        timeout: maximum time to wait for a new item in the call_queue. If that
            time is expired, the worker will shutdown.
        worker_exit_lock: Lock to avoid flagging the executor as broken on
            workers timeout.
        current_depth: Nested parallelism level, to avoid infinite spawning.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            LOGGER.critical("Exception in initializer:", exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return

    # set the global _CURRENT_DEPTH mechanism to limit recursive call
    global _CURRENT_DEPTH
    _CURRENT_DEPTH = current_depth
    reference_size = None
    last_check = None
    pid = os.getpid()

    mp.util.debug(f"Worker started with timeout={timeout}")
    _enable_faulthandler_if_needed()

    while True:
        try:
            call_item = call_queue.get(block=True, timeout=timeout)
            if call_item is None:
                mp.util.info("Shutting down worker on sentinel")
        except queue.Empty:
            mp.util.info(f"Shutting down worker after timeout {timeout:0.3f}s")
            if not processes_management_lock.acquire(block=False):
                # The lock is held elsewhere: keep waiting for work.
                mp.util.info("Could not acquire processes_management_lock")
                continue
            processes_management_lock.release()
            call_item = None
        except BaseException:
            tb_text = traceback.format_exc()
            try:
                result_queue.put(_RemoteTraceback(tb_text))
            except BaseException:
                # If we cannot format correctly the exception, at least print
                # the traceback.
                print(tb_text)
            mp.util.debug("Exiting with code 1")
            sys.exit(1)

        if call_item is None:
            # Notify queue management thread about worker shutdown
            result_queue.put(pid)
            is_clean = worker_exit_lock.acquire(True, timeout=30)

            # Early notify any loky executor running in this worker process
            # (nested parallelism) that this process is about to shutdown to
            # avoid a deadlock waiting indefinitely for the worker to finish.
            _python_exit()

            if is_clean:
                mp.util.debug("Exited cleanly")
            else:
                mp.util.info("Main process did not release worker_exit")
            return

        try:
            outcome = call_item()
        except BaseException as e:
            failure = _ExceptionWithTraceback(e)
            result_queue.put(
                _ResultItem(call_item.work_id, exception=failure)
            )
        else:
            _sendback_result(result_queue, call_item.work_id, result=outcome)
            del outcome

        # Free the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item

        if not _USE_PSUTIL:
            # if psutil is not installed, trigger gc.collect events
            # regularly to limit potential memory leaks due to reference cycles
            if last_check is None or (
                time() - last_check > _MEMORY_LEAK_CHECK_DELAY
            ):
                gc.collect()
                last_check = time()
            continue

        if reference_size is None:
            # Make reference measurement after the first call
            reference_size = _get_memory_usage(pid, force_gc=True)
            last_check = time()
            continue

        if time() - last_check <= _MEMORY_LEAK_CHECK_DELAY:
            # Too early for another memory check.
            continue

        mem_usage = _get_memory_usage(pid)
        last_check = time()
        if mem_usage - reference_size < _MAX_MEMORY_LEAK_SIZE:
            # Memory usage stays within bounds: everything is fine.
            continue

        # Check again memory usage; this time take the measurement
        # after a forced garbage collection to break any reference
        # cycles.
        mem_usage = _get_memory_usage(pid, force_gc=True)
        last_check = time()
        if mem_usage - reference_size < _MAX_MEMORY_LEAK_SIZE:
            # The GC managed to free the memory: everything is fine.
            continue

        # The process is leaking memory: let the master process
        # know that we need to start a new worker.
        mp.util.info("Memory leak detected: shutting down worker")
        result_queue.put(pid)
        with worker_exit_lock:
            mp.util.debug("Exit due to memory leak")
            return

539 

540 

541class _ExecutorManagerThread(threading.Thread): 

542 """Manages the communication between this process and the worker processes. 

543 

544 The manager is run in a local thread. 

545 

546 Args: 

547 executor: A reference to the ProcessPoolExecutor that owns 

548 this thread. A weakref will be own by the manager as well as 

549 references to internal objects used to introspect the state of 

550 the executor. 

551 """ 

552 

553 def __init__(self, executor): 

554 # Store references to necessary internals of the executor. 

555 

556 # A _ThreadWakeup to allow waking up the executor_manager_thread from 

557 # the main Thread and avoid deadlocks caused by permanently 

558 # locked queues. 

559 self.thread_wakeup = executor._executor_manager_thread_wakeup 

560 self.shutdown_lock = executor._shutdown_lock 

561 

562 # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used 

563 # to determine if the ProcessPoolExecutor has been garbage collected 

564 # and that the manager can exit. 

565 # When the executor gets garbage collected, the weakref callback 

566 # will wake up the queue management thread so that it can terminate 

567 # if there is no pending work item. 

568 def weakref_cb( 

569 _, 

570 thread_wakeup=self.thread_wakeup, 

571 shutdown_lock=self.shutdown_lock, 

572 ): 

573 if mp is not None: 

574 # At this point, the multiprocessing module can already be 

575 # garbage collected. We only log debug info when still 

576 # possible. 

577 mp.util.debug( 

578 "Executor collected: triggering callback for" 

579 " QueueManager wakeup" 

580 ) 

581 with shutdown_lock: 

582 thread_wakeup.wakeup() 

583 

584 self.executor_reference = weakref.ref(executor, weakref_cb) 

585 

586 # The flags of the executor 

587 self.executor_flags = executor._flags 

588 

589 # A list of the ctx.Process instances used as workers. 

590 self.processes = executor._processes 

591 

592 # A ctx.Queue that will be filled with _CallItems derived from 

593 # _WorkItems for processing by the process workers. 

594 self.call_queue = executor._call_queue 

595 

596 # A ctx.SimpleQueue of _ResultItems generated by the process workers. 

597 self.result_queue = executor._result_queue 

598 

599 # A queue.Queue of work ids e.g. Queue([5, 6, ...]). 

600 self.work_ids_queue = executor._work_ids 

601 

602 # A dict mapping work ids to _WorkItems e.g. 

603 # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} 

604 self.pending_work_items = executor._pending_work_items 

605 

606 # A list of the work_ids that are currently running 

607 self.running_work_items = executor._running_work_items 

608 

609 # A lock to avoid concurrent shutdown of workers on timeout and spawn 

610 # of new processes or shut down 

611 self.processes_management_lock = executor._processes_management_lock 

612 

613 super().__init__(name="ExecutorManagerThread") 

614 if sys.version_info < (3, 9): 

615 self.daemon = True 

616 

617 def run(self): 

618 # Main loop for the executor manager thread. 

619 

620 while True: 

621 self.add_call_item_to_queue() 

622 

623 result_item, is_broken, bpe = self.wait_result_broken_or_wakeup() 

624 

625 if is_broken: 

626 self.terminate_broken(bpe) 

627 return 

628 if result_item is not None: 

629 self.process_result_item(result_item) 

630 # Delete reference to result_item to avoid keeping references 

631 # while waiting on new results. 

632 del result_item 

633 

634 if self.is_shutting_down(): 

635 self.flag_executor_shutting_down() 

636 

637 # Since no new work items can be added, it is safe to shutdown 

638 # this thread if there are no pending work items. 

639 if not self.pending_work_items: 

640 self.join_executor_internals() 

641 return 

642 

643 def add_call_item_to_queue(self): 

644 # Fills call_queue with _WorkItems from pending_work_items. 

645 # This function never blocks. 

646 while True: 

647 if self.call_queue.full(): 

648 return 

649 try: 

650 work_id = self.work_ids_queue.get(block=False) 

651 except queue.Empty: 

652 return 

653 else: 

654 work_item = self.pending_work_items[work_id] 

655 

656 if work_item.future.set_running_or_notify_cancel(): 

657 self.running_work_items += [work_id] 

658 self.call_queue.put( 

659 _CallItem( 

660 work_id, 

661 work_item.fn, 

662 work_item.args, 

663 work_item.kwargs, 

664 ), 

665 block=True, 

666 ) 

667 else: 

668 del self.pending_work_items[work_id] 

669 continue 

670 

671 def wait_result_broken_or_wakeup(self): 

672 # Wait for a result to be ready in the result_queue while checking 

673 # that all worker processes are still running, or for a wake up 

674 # signal send. The wake up signals come either from new tasks being 

675 # submitted, from the executor being shutdown/gc-ed, or from the 

676 # shutdown of the python interpreter. 

677 result_reader = self.result_queue._reader 

678 wakeup_reader = self.thread_wakeup._reader 

679 readers = [result_reader, wakeup_reader] 

680 worker_sentinels = [p.sentinel for p in list(self.processes.values())] 

681 ready = wait(readers + worker_sentinels) 

682 

683 bpe = None 

684 is_broken = True 

685 result_item = None 

686 if result_reader in ready: 

687 try: 

688 result_item = result_reader.recv() 

689 if isinstance(result_item, _RemoteTraceback): 

690 bpe = BrokenProcessPool( 

691 "A task has failed to un-serialize. Please ensure that" 

692 " the arguments of the function are all picklable." 

693 ) 

694 bpe.__cause__ = result_item 

695 else: 

696 is_broken = False 

697 except BaseException as e: 

698 bpe = BrokenProcessPool( 

699 "A result has failed to un-serialize. Please ensure that " 

700 "the objects returned by the function are always " 

701 "picklable." 

702 ) 

703 tb = traceback.format_exception( 

704 type(e), e, getattr(e, "__traceback__", None) 

705 ) 

706 bpe.__cause__ = _RemoteTraceback("".join(tb)) 

707 

708 elif wakeup_reader in ready: 

709 # This is simply a wake-up event that might either trigger putting 

710 # more tasks in the queue or trigger the clean up of resources. 

711 is_broken = False 

712 else: 

713 # A worker has terminated and we don't know why, set the state of 

714 # the executor as broken 

715 exit_codes = "" 

716 if sys.platform != "win32": 

717 # In Windows, introspecting terminated workers exitcodes seems 

718 # unstable, therefore they are not appended in the exception 

719 # message. 

720 exit_codes = ( 

721 "\nThe exit codes of the workers are " 

722 f"{get_exitcodes_terminated_worker(self.processes)}" 

723 ) 

724 mp.util.debug( 

725 "A worker unexpectedly terminated. Workers that " 

726 "might have caused the breakage: " 

727 + str( 

728 { 

729 p.name: p.exitcode 

730 for p in list(self.processes.values()) 

731 if p is not None and p.sentinel in ready 

732 } 

733 ) 

734 ) 

735 bpe = TerminatedWorkerError( 

736 "A worker process managed by the executor was unexpectedly " 

737 "terminated. This could be caused by a segmentation fault " 

738 "while calling the function or by an excessive memory usage " 

739 "causing the Operating System to kill the worker.\n" 

740 f"{exit_codes}\n" 

741 "Detailed tracebacks of the workers should have been printed " 

742 "to stderr in the executor process if faulthandler was not " 

743 "disabled." 

744 ) 

745 

746 self.thread_wakeup.clear() 

747 

748 return result_item, is_broken, bpe 

749 

750 def process_result_item(self, result_item): 

751 # Process the received a result_item. This can be either the PID of a 

752 # worker that exited gracefully or a _ResultItem 

753 

754 if isinstance(result_item, int): 

755 # Clean shutdown of a worker using its PID, either on request 

756 # by the executor.shutdown method or by the timeout of the worker 

757 # itself: we should not mark the executor as broken. 

758 with self.processes_management_lock: 

759 p = self.processes.pop(result_item, None) 

760 

761 # p can be None if the executor is concurrently shutting down. 

762 if p is not None: 

763 p._worker_exit_lock.release() 

764 mp.util.debug( 

765 f"joining {p.name} when processing {p.pid} as result_item" 

766 ) 

767 p.join() 

768 del p 

769 

770 # Make sure the executor have the right number of worker, even if a 

771 # worker timeout while some jobs were submitted. If some work is 

772 # pending or there is less processes than running items, we need to 

773 # start a new Process and raise a warning. 

774 n_pending = len(self.pending_work_items) 

775 n_running = len(self.running_work_items) 

776 if n_pending - n_running > 0 or n_running > len(self.processes): 

777 executor = self.executor_reference() 

778 if ( 

779 executor is not None 

780 and len(self.processes) < executor._max_workers 

781 ): 

782 warnings.warn( 

783 "A worker stopped while some jobs were given to the " 

784 "executor. This can be caused by a too short worker " 

785 "timeout or by a memory leak.", 

786 UserWarning, 

787 ) 

788 with executor._processes_management_lock: 

789 executor._adjust_process_count() 

790 executor = None 

791 else: 

792 # Received a _ResultItem so mark the future as completed. 

793 work_item = self.pending_work_items.pop(result_item.work_id, None) 

794 # work_item can be None if another process terminated (see above) 

795 if work_item is not None: 

796 if result_item.exception: 

797 work_item.future.set_exception(result_item.exception) 

798 else: 

799 work_item.future.set_result(result_item.result) 

800 self.running_work_items.remove(result_item.work_id) 

801 

802 def is_shutting_down(self): 

803 # Check whether we should start shutting down the executor. 

804 executor = self.executor_reference() 

805 # No more work items can be added if: 

806 # - The interpreter is shutting down OR 

807 # - The executor that owns this thread is not broken AND 

808 # * The executor that owns this worker has been collected OR 

809 # * The executor that owns this worker has been shutdown. 

810 # If the executor is broken, it should be detected in the next loop. 

811 return _global_shutdown or ( 

812 (executor is None or self.executor_flags.shutdown) 

813 and not self.executor_flags.broken 

814 ) 

815 

816 def terminate_broken(self, bpe): 

817 # Terminate the executor because it is in a broken state. The bpe 

818 # argument can be used to display more information on the error that 

819 # lead the executor into becoming broken. 

820 

821 # Mark the process pool broken so that submits fail right now. 

822 self.executor_flags.flag_as_broken(bpe) 

823 

824 # Mark pending tasks as failed. 

825 for work_item in self.pending_work_items.values(): 

826 work_item.future.set_exception(bpe) 

827 # Delete references to object. See issue16284 

828 del work_item 

829 self.pending_work_items.clear() 

830 

831 # Terminate remaining workers forcibly: the queues or their 

832 # locks may be in a dirty state and block forever. 

833 self.kill_workers(reason="broken executor") 

834 

835 # clean up resources 

836 self.join_executor_internals() 

837 

838 def flag_executor_shutting_down(self): 

839 # Flag the executor as shutting down and cancel remaining tasks if 

840 # requested as early as possible if it is not gc-ed yet. 

841 self.executor_flags.flag_as_shutting_down() 

842 

843 # Cancel pending work items if requested. 

844 if self.executor_flags.kill_workers: 

845 while self.pending_work_items: 

846 _, work_item = self.pending_work_items.popitem() 

847 work_item.future.set_exception( 

848 ShutdownExecutorError( 

849 "The Executor was shutdown with `kill_workers=True` " 

850 "before this job could complete." 

851 ) 

852 ) 

853 del work_item 

854 

855 # Kill the remaining worker forcibly to no waste time joining them 

856 self.kill_workers(reason="executor shutting down") 

857 

858 def kill_workers(self, reason=""): 

859 # Terminate the remaining workers using SIGKILL. This function also 

860 # terminates descendant workers of the children in case there is some 

861 # nested parallelism. 

862 while self.processes: 

863 _, p = self.processes.popitem() 

864 mp.util.debug(f"terminate process {p.name}, reason: {reason}") 

865 try: 

866 kill_process_tree(p) 

867 except ProcessLookupError: # pragma: no cover 

868 pass 

869 

def shutdown_workers(self):
    """Ask every worker in ``self.processes`` to exit.

    The workers' exit locks are released first, then one ``None`` sentinel
    per worker is pushed on the call queue. When the queue is full the
    method backs off (sleeping a little longer each time) and retries for
    as long as some child is still alive.

    Raises:
        queue.Full: if the queue is still full once the cool-down delay has
            grown past 5 seconds.
    """
    # Work on a snapshot to avoid a RuntimeError caused by concurrent
    # modification of the processes dict; the count is thus an upper bound.
    # Releasing each _worker_exit_lock speeds up shutdown because no
    # hand-shake is needed here.
    with self.processes_management_lock:
        workers = list(self.processes.values())
        for worker in workers:
            mp.util.debug(f"releasing worker exit lock on {worker.name}")
            worker._worker_exit_lock.release()
        n_children_to_stop = len(workers)

    mp.util.debug(f"found {n_children_to_stop} processes to stop")

    # Send exactly one sentinel per child, without hanging on a full queue
    # when all the workers have already exited.
    n_sentinels_sent = 0
    cooldown_time = 0.001
    while (
        n_sentinels_sent < n_children_to_stop
        and self.get_n_children_alive() > 0
    ):
        backlog = n_children_to_stop - n_sentinels_sent
        for _ in range(backlog):
            try:
                self.call_queue.put_nowait(None)
            except queue.Full as exc:
                if cooldown_time > 5.0:
                    mp.util.info(
                        "failed to send all sentinels and exit with error."
                        f"\ncall_queue size={self.call_queue._maxsize}; "
                        f" full is {self.call_queue.full()}; "
                    )
                    raise exc
                mp.util.info(
                    "full call_queue prevented to send all sentinels at "
                    "once, waiting..."
                )
                sleep(cooldown_time)
                cooldown_time *= 1.2
                # Re-evaluate how many children are still alive.
                break
            else:
                n_sentinels_sent += 1

    mp.util.debug(f"sent {n_sentinels_sent} sentinels to the call queue")

916 

def join_executor_internals(self):
    """Stop the workers and release the executor's communication resources.

    Order of operations: signal the workers to exit, close the call queue
    and wait for its feeder thread, close the result queue, close the
    wakeup channel (under the shutdown lock) and finally join every
    remaining worker process.
    """
    self.shutdown_workers()

    # Release the queue's resources as soon as possible. Flagging the feeder
    # thread for a clean exit avoids the crash detection thread marking the
    # Executor as broken during shutdown. This is safe as either:
    #    * We don't need to communicate with the workers anymore
    #    * There is nothing left in the Queue buffer except None sentinels
    mp.util.debug("closing call_queue")
    call_queue = self.call_queue
    call_queue.close()
    call_queue.join_thread()

    mp.util.debug("closing result_queue")
    self.result_queue.close()

    mp.util.debug("closing thread_wakeup")
    with self.shutdown_lock:
        self.thread_wakeup.close()

    # If .join() is not called on the created processes then
    # some ctx.Queue methods may deadlock on macOS.
    with self.processes_management_lock:
        mp.util.debug(f"joining {len(self.processes)} processes")
        n_joined_processes = 0
        while self.processes:
            pid, worker = self.processes.popitem()
            mp.util.debug(f"joining process {worker.name} with pid {pid}")
            worker.join()
            n_joined_processes += 1

        mp.util.debug(
            "executor management thread clean shutdown of "
            f"{n_joined_processes} workers"
        )

955 

def get_n_children_alive(self):
    """Return how many tracked worker processes report being alive.

    The count is taken on a snapshot of ``self.processes`` while holding the
    processes management lock; it is an upper bound on the number of
    children actually alive.
    """
    with self.processes_management_lock:
        snapshot = list(self.processes.values())
        alive = 0
        for worker in snapshot:
            if worker.is_alive():
                alive += 1
        return alive

960 

961 

962_system_limits_checked = False 

963_system_limited = None 

964 

965 

966def _check_system_limits(): 

967 global _system_limits_checked, _system_limited 

968 if _system_limits_checked and _system_limited: 

969 raise NotImplementedError(_system_limited) 

970 _system_limits_checked = True 

971 try: 

972 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") 

973 except (AttributeError, ValueError): 

974 # sysconf not available or setting not available 

975 return 

976 if nsems_max == -1: 

977 # undetermined limit, assume that limit is determined 

978 # by available memory only 

979 return 

980 if nsems_max >= 256: 

981 # minimum number of semaphores available 

982 # according to POSIX 

983 return 

984 _system_limited = ( 

985 f"system provides too few semaphores ({nsems_max} available, " 

986 "256 necessary)" 

987 ) 

988 raise NotImplementedError(_system_limited) 

989 

990 

991def _chain_from_iterable_of_lists(iterable): 

992 """ 

993 Specialized implementation of itertools.chain.from_iterable. 

994 Each item in *iterable* should be a list. This function is 

995 careful not to keep references to yielded objects. 

996 """ 

997 for element in iterable: 

998 element.reverse() 

999 while element: 

1000 yield element.pop() 

1001 

1002 

def _check_max_depth(context):
    """Refuse to create an executor nested too deeply.

    Args:
        context: multiprocessing context; only ``get_start_method()`` is used.

    Raises:
        LokyRecursionError: if the start method is "fork" and the current
            process is already a nested worker, or if a positive MAX_DEPTH
            would be exceeded by one more level of nesting.
    """
    # Limit the maximal recursion level.
    uses_fork = context.get_start_method() == "fork"
    if uses_fork and _CURRENT_DEPTH > 0:
        raise LokyRecursionError(
            "Could not spawn extra nested processes at depth superior to "
            "MAX_DEPTH=1. It is not possible to increase this limit when "
            "using the 'fork' start method."
        )

    # A non-positive MAX_DEPTH disables this second check.
    next_depth = _CURRENT_DEPTH + 1
    if MAX_DEPTH > 0 and next_depth > MAX_DEPTH:
        raise LokyRecursionError(
            "Could not spawn extra nested processes at depth superior to "
            f"MAX_DEPTH={MAX_DEPTH}. If this is intendend, you can change "
            "this limit with the LOKY_MAX_DEPTH environment variable."
        )

1019 

1020 

class LokyRecursionError(RuntimeError):
    """Raised when a process attempts to nest worker processes too deeply."""

1023 

1024 

class BrokenProcessPool(_BPPException):
    """
    Raised when the executor breaks while a future is in the running state.

    Possible causes are an error raised while unpickling the task in the
    worker process, an error raised while unpickling the result value in the
    parent process, or a worker process being terminated unexpectedly.
    """

1032 

1033 

class TerminatedWorkerError(BrokenProcessPool):
    """
    Raised when a worker process of a ProcessPoolExecutor terminates
    abruptly while a future is in the running state.
    """

1039 

1040 

# Backward-compatibility alias, kept for code written for loky 1.1.4 and
# earlier. Both names refer to the same exception class. Do not use in new
# code.
BrokenExecutor = BrokenProcessPool

1044 

1045 

class ShutdownExecutorError(RuntimeError):
    """
    Raised when a ProcessPoolExecutor is shut down while a future is still
    pending or running.
    """

1051 

1052 

class ProcessPoolExecutor(Executor):
    """Executor that runs submitted calls in a pool of worker processes.

    Worker processes and the management thread are not created by the
    constructor; they are started by the first call to ``submit``.
    """

    # NOTE(review): not referenced anywhere in this class; presumably kept
    # for backward compatibility -- confirm before removing.
    _at_exit = None

    def __init__(
        self,
        max_workers=None,
        job_reducers=None,
        result_reducers=None,
        timeout=None,
        context=None,
        initializer=None,
        initargs=(),
        env=None,
    ):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: int, optional (default: cpu_count())
                The maximum number of processes that can be used to execute
                the given calls. If None or not given then as many worker
                processes will be created as the number of CPUs the current
                process can use.
            job_reducers, result_reducers: dict(type: reducer_func)
                Custom reducers for pickling the jobs and the results from
                the Executor. If only `job_reducers` is provided,
                `result_reducers` will use the same reducers.
            timeout: int, optional (default: None)
                Idle workers exit after timeout seconds. If a new job is
                submitted after the timeout, the executor will start enough
                new Python processes to make sure the pool of workers is
                full.
            context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
            env: A dict of environment variables to overwrite in the child
                process. The environment variables are set before any module
                is loaded. Note that this only works with the loky context.

        Raises:
            ValueError: if `max_workers` is not strictly positive.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = cpu_count()
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            self._max_workers = max_workers

        if (
            sys.platform == "win32"
            and self._max_workers > _MAX_WINDOWS_WORKERS
        ):
            warnings.warn(
                f"On Windows, max_workers cannot exceed {_MAX_WINDOWS_WORKERS} "
                "due to limitations of the operating system."
            )
            self._max_workers = _MAX_WINDOWS_WORKERS

        if context is None:
            context = get_context()
        self._context = context
        self._env = env

        self._initializer, self._initargs = _prepare_initializer(
            initializer, initargs
        )
        _check_max_depth(self._context)

        # Results are pickled with the job reducers unless told otherwise.
        if result_reducers is None:
            result_reducers = job_reducers

        # Idle timeout of the workers
        self._timeout = timeout

        # Management thread, created lazily on first submit
        self._executor_manager_thread = None

        # Map of pids to processes
        self._processes = {}

        # Internal variables of the ProcessPoolExecutor
        self._queue_count = 0
        self._pending_work_items = {}
        self._running_work_items = []
        self._work_ids = queue.Queue()
        self._processes_management_lock = self._context.Lock()
        self._shutdown_lock = threading.Lock()

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of executor_manager_thread from another thread
        # (e.g. when calling executor.submit or executor.shutdown). We do not
        # use the _result_queue to send wakeup signals to the
        # executor_manager_thread as it could result in a deadlock if a worker
        # process dies with the _result_queue write lock still acquired.
        #
        # _shutdown_lock must be locked to access _ThreadWakeup.wakeup.
        self._executor_manager_thread_wakeup = _ThreadWakeup()

        # Flag to hold the state of the Executor. This permits to introspect
        # the Executor state even once it has been garbage collected.
        self._flags = _ExecutorFlags(self._shutdown_lock)

        # Finally setup the queues for interprocess communication
        self._setup_queues(job_reducers, result_reducers)

        mp.util.debug("ProcessPoolExecutor is setup")

    def _setup_queues(self, job_reducers, result_reducers, queue_size=None):
        """Create the call queue and the result queue.

        Args:
            job_reducers: reducers used to pickle the calls.
            result_reducers: reducers used to pickle the results.
            queue_size: maximum size of the call queue; defaults to
                ``2 * max_workers + EXTRA_QUEUED_CALLS``.
        """
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        if queue_size is None:
            queue_size = 2 * self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size,
            pending_work_items=self._pending_work_items,
            running_work_items=self._running_work_items,
            thread_wakeup=self._executor_manager_thread_wakeup,
            shutdown_lock=self._shutdown_lock,
            reducers=job_reducers,
            ctx=self._context,
        )
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True

        self._result_queue = SimpleQueue(
            reducers=result_reducers, ctx=self._context
        )

    def _start_executor_manager_thread(self):
        """Start the management thread if it is not running yet.

        Also registers the thread's wakeup channel for interpreter exit and
        installs the module-level exit hook the first time it is needed.
        """
        global process_pool_executor_at_exit

        if self._executor_manager_thread is not None:
            return

        mp.util.debug("_start_executor_manager_thread called")

        # Start the processes so that their sentinels are known.
        self._executor_manager_thread = _ExecutorManagerThread(self)
        self._executor_manager_thread.start()

        # register this executor in a mechanism that ensures it will wakeup
        # when the interpreter is exiting.
        _threads_wakeups[self._executor_manager_thread] = (
            self._shutdown_lock,
            self._executor_manager_thread_wakeup,
        )

        if process_pool_executor_at_exit is None:
            # Ensure that the _python_exit function will be called before
            # the multiprocessing.Queue._close finalizers which have an
            # exitpriority of 10.
            if sys.version_info < (3, 9):
                process_pool_executor_at_exit = mp.util.Finalize(
                    None, _python_exit, exitpriority=20
                )
            else:
                # threading._register_atexit returns None: storing its result
                # would leave the guard unset and register _python_exit again
                # for every new management thread. Record a truthy sentinel
                # instead so that the hook is registered only once.
                threading._register_atexit(_python_exit)
                process_pool_executor_at_exit = True

    def _adjust_process_count(self):
        """Spawn worker processes until ``max_workers`` of them are tracked."""
        while len(self._processes) < self._max_workers:
            worker_exit_lock = self._context.BoundedSemaphore(1)
            args = (
                self._call_queue,
                self._result_queue,
                self._initializer,
                self._initargs,
                self._processes_management_lock,
                self._timeout,
                worker_exit_lock,
                _CURRENT_DEPTH + 1,
            )
            # The lock is handed to the worker already acquired; it is
            # released by the management side when shutting the worker down.
            worker_exit_lock.acquire()
            try:
                # Try to spawn the process with some environment variable to
                # overwrite but it only works with the loky context for now.
                p = self._context.Process(
                    target=_process_worker, args=args, env=self._env
                )
            except TypeError:
                # This context's Process does not accept the `env` argument.
                p = self._context.Process(target=_process_worker, args=args)
            p._worker_exit_lock = worker_exit_lock
            p.start()
            self._processes[p.pid] = p
        mp.util.debug(
            f"Adjusted process count to {self._max_workers}: "
            f"{[(p.name, pid) for pid, p in self._processes.items()]}"
        )

    def _ensure_executor_running(self):
        """ensures all workers and management thread are running"""
        with self._processes_management_lock:
            if len(self._processes) != self._max_workers:
                self._adjust_process_count()
            self._start_executor_manager_thread()

    def submit(self, fn, *args, **kwargs):
        # The whole submission happens under the shutdown lock so that it
        # cannot interleave with a concurrent shutdown.
        with self._flags.shutdown_lock:
            if self._flags.broken is not None:
                raise self._flags.broken
            if self._flags.shutdown:
                raise ShutdownExecutorError(
                    "cannot schedule new futures after shutdown"
                )

            # Cannot submit a new calls once the interpreter is shutting down.
            # This check avoids spawning new processes at exit.
            if _global_shutdown:
                raise RuntimeError(
                    "cannot schedule new futures after interpreter shutdown"
                )

            f = Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            self._ensure_executor_running()
            return f

    submit.__doc__ = Executor.submit.__doc__

    def map(self, fn, *iterables, **kwargs):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then
                there is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a
                time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            ValueError: If chunksize is lower than 1.
            Exception: If fn(*args) raises for any values.
        """
        # Only `timeout` and `chunksize` are read; other keyword arguments
        # are ignored.
        timeout = kwargs.get("timeout", None)
        chunksize = kwargs.get("chunksize", 1)
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(
            partial(_process_chunk, fn),
            _get_chunks(chunksize, *iterables),
            timeout=timeout,
        )
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True, kill_workers=False):
        mp.util.debug(f"shutting down executor {self}")

        self._flags.flag_as_shutting_down(kill_workers)
        # Snapshot both attributes: they are reset to None below and another
        # thread may be running shutdown concurrently.
        executor_manager_thread = self._executor_manager_thread
        executor_manager_thread_wakeup = self._executor_manager_thread_wakeup

        if executor_manager_thread_wakeup is not None:
            # Wake up queue management thread. Use the snapshot rather than
            # the attribute, which may have been cleared since the check.
            with self._shutdown_lock:
                executor_manager_thread_wakeup.wakeup()

        if executor_manager_thread is not None and wait:
            # This locks avoids concurrent join if the interpreter
            # is shutting down.
            with _global_shutdown_lock:
                executor_manager_thread.join()
                _threads_wakeups.pop(executor_manager_thread, None)

        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._executor_manager_thread = None
        self._executor_manager_thread_wakeup = None
        self._call_queue = None
        self._result_queue = None
        self._processes_management_lock = None

    shutdown.__doc__ = Executor.shutdown.__doc__