Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/externals/loky/process_executor.py: 20%

552 statements  

1############################################################################### 

2# Re-implementation of the ProcessPoolExecutor more robust to faults 

3# 

4# author: Thomas Moreau and Olivier Grisel 

5# 

6# adapted from concurrent/futures/process_pool_executor.py (17/02/2017) 

7# * Add an extra management thread to detect executor_manager_thread failures, 

8# * Improve the shutdown process to avoid deadlocks, 

9# * Add timeout for workers, 

10# * More robust pickling process. 

11# 

12# Copyright 2009 Brian Quinlan. All Rights Reserved. 

13# Licensed to PSF under a Contributor Agreement. 

14 

15"""Implements ProcessPoolExecutor. 

16 

17The following diagram and text describe the data-flow through the system: 

18 

19|======================= In-process =====================|== Out-of-process ==|
20
21+----------+     +----------+       +--------+     +-----------+    +---------+
22|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
23|          |     +----------+       |        |     +-----------+    |  Pool   |
24|          |     | ...      |       |        |     | ...       |    +---------+
25|          |     | 6        |    => |        |  => | 5, call() | => |         |
26|          |     | 7        |       |        |     | ...       |    |         |
27| Process  |     | ...      |       | Local  |     +-----------+    | Process |
28|  Pool    |     +----------+       | Worker |                      |  #1..n  |
29| Executor |                        | Thread |                      |         |
30|          |     +----------- +     |        |     +-----------+    |         |
31|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
32|          |     +------------+     |        |     +-----------+    |         |
33|          |     | 6: call() |      |        |     | ...       |    |         |
34|          |     |    future |      +--------+     | 4, result |    |         |
35|          |     | ...       |                     | 3, except |    |         |
36+----------+     +------------+                    +-----------+    +---------+

37 

38Executor.submit() called: 

39- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict 

40- adds the id of the _WorkItem to the "Work Ids" queue 

41 

42Local worker thread: 

43- reads work ids from the "Work Ids" queue and looks up the corresponding 

44 WorkItem from the "Work Items" dict: if the work item has been cancelled then 

45 it is simply removed from the dict, otherwise it is repackaged as a 

46 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" 

47 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because 

48 calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). 

49- reads _ResultItems from "Result Q", updates the future stored in the 

50 "Work Items" dict and deletes the dict entry 

51 

52Process #1..n: 

53- reads _CallItems from "Call Q", executes the calls, and puts the resulting 

54 _ResultItems in "Result Q" 

55""" 

56 

57 

58__author__ = "Thomas Moreau (thomas.moreau.2010@gmail.com)" 

59 

60 

61import os 

62import gc 

63import sys 

64import queue 

65import struct 

66import weakref 

67import warnings 

68import itertools 

69import traceback 

70import threading 

71from time import time, sleep 

72import multiprocessing as mp 

73from functools import partial 

74from pickle import PicklingError 

75from concurrent.futures import Executor 

76from concurrent.futures._base import LOGGER 

77from concurrent.futures.process import BrokenProcessPool as _BPPException 

78from multiprocessing.connection import wait 

79 

80from ._base import Future 

81from .backend import get_context 

82from .backend.context import cpu_count, _MAX_WINDOWS_WORKERS 

83from .backend.queues import Queue, SimpleQueue 

84from .backend.reduction import set_loky_pickler, get_loky_pickler_name 

85from .backend.utils import kill_process_tree, get_exitcodes_terminated_worker 

86from .initializers import _prepare_initializer 

87 

88 

89# Mechanism to prevent infinite process spawning. When a worker of a 

90# ProcessPoolExecutor nested in MAX_DEPTH Executors tries to create a new 

91# Executor, a LokyRecursionError is raised 

92MAX_DEPTH = int(os.environ.get("LOKY_MAX_DEPTH", 10)) 

93_CURRENT_DEPTH = 0 

94 

95# Minimum time interval between two consecutive memory leak protection checks. 

96_MEMORY_LEAK_CHECK_DELAY = 1.0 

97 

98# Number of bytes of memory usage allowed over the reference process size. 

99_MAX_MEMORY_LEAK_SIZE = int(3e8) 
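
# Editorial worked numbers for the defaults above: int(3e8) is 300_000_000
# bytes, i.e. roughly 286 MiB of growth over the reference RSS measured after a
# worker's first task; the check itself runs at most once per
# _MEMORY_LEAK_CHECK_DELAY second (see _process_worker below).
#
#     assert _MAX_MEMORY_LEAK_SIZE == 300_000_000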

100 

101 

102try: 

103 from psutil import Process 

104 

105 _USE_PSUTIL = True 

106 

107 def _get_memory_usage(pid, force_gc=False): 

108 if force_gc: 

109 gc.collect() 

110 

111 mem_size = Process(pid).memory_info().rss 

112 mp.util.debug(f"psutil return memory size: {mem_size}") 

113 return mem_size 

114 

115except ImportError: 

116 _USE_PSUTIL = False 

117 

118 

119class _ThreadWakeup: 

120 def __init__(self): 

121 self._closed = False 

122 self._reader, self._writer = mp.Pipe(duplex=False) 

123 

124 def close(self): 

125 if not self._closed: 

126 self._closed = True 

127 self._writer.close() 

128 self._reader.close() 

129 

130 def wakeup(self): 

131 if not self._closed: 

132 self._writer.send_bytes(b"") 

133 

134 def clear(self): 

135 if not self._closed: 

136 while self._reader.poll(): 

137 self._reader.recv_bytes() 
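
# Editorial sketch of how _ThreadWakeup is meant to be used (mirrors
# wait_result_broken_or_wakeup further down; illustration only, not loky code):
#
#     wakeup = _ThreadWakeup()
#     threading.Timer(1.0, wakeup.wakeup).start()  # another thread signals us
#     ready = wait([wakeup._reader])               # returns once wakeup() ran
#     wakeup.clear()                               # drain the pipe for reuse
#     wakeup.close()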

138 

139 

140class _ExecutorFlags: 

141 """necessary references to maintain executor states without preventing gc 

142 

143 It permits to keep the information needed by executor_manager_thread 

144 and crash_detection_thread to maintain the pool without preventing the 

145 garbage collection of unreferenced executors. 

146 """ 

147 

148 def __init__(self, shutdown_lock): 

149 

150 self.shutdown = False 

151 self.broken = None 

152 self.kill_workers = False 

153 self.shutdown_lock = shutdown_lock 

154 

155 def flag_as_shutting_down(self, kill_workers=None): 

156 with self.shutdown_lock: 

157 self.shutdown = True 

158 if kill_workers is not None: 

159 self.kill_workers = kill_workers 

160 

161 def flag_as_broken(self, broken): 

162 with self.shutdown_lock: 

163 self.shutdown = True 

164 self.broken = broken 

165 

166 

167# Prior to 3.9, executor_manager_thread is created as daemon thread. This means 

168# that it is not joined automatically when the interpreter is shutting down. 

169# To work around this problem, an exit handler is installed to tell the 

170# thread to exit when the interpreter is shutting down and then waits until 

171# it finishes. The thread needs to be daemonized because the atexit hooks are 

172# called after all non daemonized threads are joined. 

173# 

174# Starting with 3.9, there exists a specific atexit hook to be called before joining 

175# the threads so the executor_manager_thread does not need to be daemonized 

176# anymore. 

177# 

178# The atexit hooks are registered when starting the first ProcessPoolExecutor 

179# so that merely importing this module has no effect on the interpreter. 

180 

181_global_shutdown = False 

182_global_shutdown_lock = threading.Lock() 

183_threads_wakeups = weakref.WeakKeyDictionary() 

184 

185 

186def _python_exit(): 

187 global _global_shutdown 

188 _global_shutdown = True 

189 

190 # Materialize the list of items to avoid error due to iterating over 

191 # changing size dictionary. 

192 items = list(_threads_wakeups.items()) 

193 if len(items) > 0: 

194 mp.util.debug( 

195 "Interpreter shutting down. Waking up {len(items)}" 

196 f"executor_manager_thread:\n{items}" 

197 ) 

198 

199 # Wake up the executor_manager_threads so they can detect the interpreter 

200 # is shutting down and exit. 

201 for _, (shutdown_lock, thread_wakeup) in items: 

202 with shutdown_lock: 

203 thread_wakeup.wakeup() 

204 

205 # Collect the executor_manager_threads to make sure we exit cleanly. 

206 for thread, _ in items: 

207 # This lock is to prevent situations where an executor is gc'ed in one 

208 # thread while the atexit finalizer is running in another thread. This 

209 # can happen when joblib is used in pypy for instance. 

210 with _global_shutdown_lock: 

211 thread.join() 

212 

213 

214# With the fork context, _threads_wakeups is propagated to children. 

215# Clear it after fork to avoid situations that can cause a 

216# freeze when joining the workers. 

217mp.util.register_after_fork(_threads_wakeups, lambda obj: obj.clear()) 

218 

219 

220# Module variable to register the at_exit call 

221process_pool_executor_at_exit = None 

222 

223# Controls how many more calls than processes will be queued in the call queue. 

224# A smaller number will mean that processes spend more time idle waiting for 

225# work while a larger number will make Future.cancel() succeed less frequently 

226# (Futures in the call queue cannot be cancelled). 

227EXTRA_QUEUED_CALLS = 1 

228 

229 

230class _RemoteTraceback(Exception): 

231 """Embed stringification of remote traceback in local traceback""" 

232 

233 def __init__(self, tb=None): 

234 self.tb = f'\n"""\n{tb}"""' 

235 

236 def __str__(self): 

237 return self.tb 

238 

239 

240# Do not inherit from BaseException to mirror 

241# concurrent.futures.process._ExceptionWithTraceback 

242class _ExceptionWithTraceback: 

243 def __init__(self, exc): 

244 tb = getattr(exc, "__traceback__", None) 

245 if tb is None: 

246 _, _, tb = sys.exc_info() 

247 tb = traceback.format_exception(type(exc), exc, tb) 

248 tb = "".join(tb) 

249 self.exc = exc 

250 self.tb = tb 

251 

252 def __reduce__(self): 

253 return _rebuild_exc, (self.exc, self.tb) 

254 

255 

256def _rebuild_exc(exc, tb): 

257 exc.__cause__ = _RemoteTraceback(tb) 

258 return exc 
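
# Editorial sketch of the round-trip performed by the two helpers above: an
# exception pickled in a worker comes back with its formatted remote traceback
# attached as __cause__ (illustration only, not loky code):
#
#     import pickle
#     try:
#         1 / 0
#     except ZeroDivisionError as e:
#         wrapped = _ExceptionWithTraceback(e)
#     exc = pickle.loads(pickle.dumps(wrapped))  # unpickling calls _rebuild_exc
#     assert isinstance(exc, ZeroDivisionError)
#     assert isinstance(exc.__cause__, _RemoteTraceback)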

259 

260 

261class _WorkItem: 

262 

263 __slots__ = ["future", "fn", "args", "kwargs"] 

264 

265 def __init__(self, future, fn, args, kwargs): 

266 self.future = future 

267 self.fn = fn 

268 self.args = args 

269 self.kwargs = kwargs 

270 

271 

272class _ResultItem: 

273 def __init__(self, work_id, exception=None, result=None): 

274 self.work_id = work_id 

275 self.exception = exception 

276 self.result = result 

277 

278 

279class _CallItem: 

280 def __init__(self, work_id, fn, args, kwargs): 

281 self.work_id = work_id 

282 self.fn = fn 

283 self.args = args 

284 self.kwargs = kwargs 

285 

286 # Store the current loky_pickler so it is correctly set in the worker 

287 self.loky_pickler = get_loky_pickler_name() 

288 

289 def __call__(self): 

290 set_loky_pickler(self.loky_pickler) 

291 return self.fn(*self.args, **self.kwargs) 

292 

293 def __repr__(self): 

294 return ( 

295 f"CallItem({self.work_id}, {self.fn}, {self.args}, {self.kwargs})" 

296 ) 

297 

298 

299class _SafeQueue(Queue): 

300 """Safe Queue set exception to the future object linked to a job""" 

301 

302 def __init__( 

303 self, 

304 max_size=0, 

305 ctx=None, 

306 pending_work_items=None, 

307 running_work_items=None, 

308 thread_wakeup=None, 

309 reducers=None, 

310 ): 

311 self.thread_wakeup = thread_wakeup 

312 self.pending_work_items = pending_work_items 

313 self.running_work_items = running_work_items 

314 super().__init__(max_size, reducers=reducers, ctx=ctx) 

315 

316 def _on_queue_feeder_error(self, e, obj): 

317 if isinstance(obj, _CallItem): 

318 # Format the cause and attach it to the error reported on the future 

319 if isinstance(e, struct.error): 

320 raised_error = RuntimeError( 

321 "The task could not be sent to the workers as it is too " 

322 "large for `send_bytes`." 

323 ) 

324 else: 

325 raised_error = PicklingError( 

326 "Could not pickle the task to send it to the workers." 

327 ) 

328 tb = traceback.format_exception( 

329 type(e), e, getattr(e, "__traceback__", None) 

330 ) 

331 raised_error.__cause__ = _RemoteTraceback("".join(tb)) 

332 work_item = self.pending_work_items.pop(obj.work_id, None) 

333 self.running_work_items.remove(obj.work_id) 

334 # work_item can be None if another process terminated. In this 

335 # case, the executor_manager_thread fails all work_items with 

336 # BrokenProcessPool 

337 if work_item is not None: 

338 work_item.future.set_exception(raised_error) 

339 del work_item 

340 self.thread_wakeup.wakeup() 

341 else: 

342 super()._on_queue_feeder_error(e, obj) 
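
# Editorial sketch of the user-visible effect of the hook above: a task whose
# arguments cannot be pickled does not break the executor; its future receives
# the PicklingError built here (hedged illustration, not a test from loky):
#
#     with ProcessPoolExecutor(max_workers=1) as executor:
#         future = executor.submit(id, threading.Lock())  # locks are unpicklable
#         print(future.exception())
#         # -> PicklingError("Could not pickle the task to send it to the workers.")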

343 

344 

345def _get_chunks(chunksize, *iterables): 

346 """Iterates over zip()ed iterables in chunks.""" 

347 it = zip(*iterables) 

348 while True: 

349 chunk = tuple(itertools.islice(it, chunksize)) 

350 if not chunk: 

351 return 

352 yield chunk 

353 

354 

355def _process_chunk(fn, chunk): 

356 """Processes a chunk of an iterable passed to map. 

357 

358 Runs the function passed to map() on a chunk of the 

359 iterable passed to map. 

360 

361 This function is run in a separate process. 

362 

363 """ 

364 return [fn(*args) for args in chunk] 
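
# Editorial worked example for the two helpers above (used by map() below):
# with chunksize=2, five map items travel to the workers as three _CallItems.
#
#     list(_get_chunks(2, range(5)))
#     # -> [((0,), (1,)), ((2,), (3,)), ((4,),)]
#     _process_chunk(pow, [(2, 3), (2, 4)])
#     # -> [8, 16]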

365 

366 

367def _sendback_result(result_queue, work_id, result=None, exception=None): 

368 """Safely send back the given result or exception""" 

369 try: 

370 result_queue.put( 

371 _ResultItem(work_id, result=result, exception=exception) 

372 ) 

373 except BaseException as e: 

374 exc = _ExceptionWithTraceback(e) 

375 result_queue.put(_ResultItem(work_id, exception=exc)) 

376 

377 

378def _process_worker( 

379 call_queue, 

380 result_queue, 

381 initializer, 

382 initargs, 

383 processes_management_lock, 

384 timeout, 

385 worker_exit_lock, 

386 current_depth, 

387): 

388 """Evaluates calls from call_queue and places the results in result_queue. 

389 

390 This worker is run in a separate process. 

391 

392 Args: 

393 call_queue: A ctx.Queue of _CallItems that will be read and 

394 evaluated by the worker. 

395 result_queue: A ctx.Queue of _ResultItems that will be written 

396 to by the worker. 

397 initializer: A callable initializer, or None 

398 initargs: A tuple of args for the initializer 

399 processes_management_lock: A ctx.Lock avoiding worker timeout while 

400 some workers are being spawned. 

401 timeout: maximum time to wait for a new item in the call_queue. If that 

402 time expires, the worker will shut down. 

403 worker_exit_lock: Lock to avoid flagging the executor as broken on 

404 worker timeout. 

405 current_depth: Nested parallelism level, to avoid infinite spawning. 

406 """ 

407 if initializer is not None: 

408 try: 

409 initializer(*initargs) 

410 except BaseException: 

411 LOGGER.critical("Exception in initializer:", exc_info=True) 

412 # The parent will notice that the process stopped and 

413 # mark the pool broken 

414 return 

415 

416 # set the global _CURRENT_DEPTH mechanism to limit recursive spawning 

417 global _CURRENT_DEPTH 

418 _CURRENT_DEPTH = current_depth 

419 _process_reference_size = None 

420 _last_memory_leak_check = None 

421 pid = os.getpid() 

422 

423 mp.util.debug(f"Worker started with timeout={timeout}") 

424 while True: 

425 try: 

426 call_item = call_queue.get(block=True, timeout=timeout) 

427 if call_item is None: 

428 mp.util.info("Shutting down worker on sentinel") 

429 except queue.Empty: 

430 mp.util.info(f"Shutting down worker after timeout {timeout:0.3f}s") 

431 if processes_management_lock.acquire(block=False): 

432 processes_management_lock.release() 

433 call_item = None 

434 else: 

435 mp.util.info("Could not acquire processes_management_lock") 

436 continue 

437 except BaseException: 

438 previous_tb = traceback.format_exc() 

439 try: 

440 result_queue.put(_RemoteTraceback(previous_tb)) 

441 except BaseException: 

442 # If we cannot format the exception correctly, at least print 

443 # the traceback. 

444 print(previous_tb) 

445 mp.util.debug("Exiting with code 1") 

446 sys.exit(1) 

447 if call_item is None: 

448 # Notify queue management thread about worker shutdown 

449 result_queue.put(pid) 

450 is_clean = worker_exit_lock.acquire(True, timeout=30) 

451 

452 # Notify early any loky executor running in this worker process 

453 # (nested parallelism) that this process is about to shut down, to 

454 # avoid a deadlock waiting indefinitely for the worker to finish. 

455 _python_exit() 

456 

457 if is_clean: 

458 mp.util.debug("Exited cleanly") 

459 else: 

460 mp.util.info("Main process did not release worker_exit") 

461 return 

462 try: 

463 r = call_item() 

464 except BaseException as e: 

465 exc = _ExceptionWithTraceback(e) 

466 result_queue.put(_ResultItem(call_item.work_id, exception=exc)) 

467 else: 

468 _sendback_result(result_queue, call_item.work_id, result=r) 

469 del r 

470 

471 # Free the resource as soon as possible, to avoid holding onto 

472 # open files or shared memory that is not needed anymore 

473 del call_item 

474 

475 if _USE_PSUTIL: 

476 if _process_reference_size is None: 

477 # Make reference measurement after the first call 

478 _process_reference_size = _get_memory_usage(pid, force_gc=True) 

479 _last_memory_leak_check = time() 

480 continue 

481 if time() - _last_memory_leak_check > _MEMORY_LEAK_CHECK_DELAY: 

482 mem_usage = _get_memory_usage(pid) 

483 _last_memory_leak_check = time() 

484 if mem_usage - _process_reference_size < _MAX_MEMORY_LEAK_SIZE: 

485 # Memory usage stays within bounds: everything is fine. 

486 continue 

487 

488 # Check memory usage again; this time take the measurement 

489 # after a forced garbage collection to break any reference 

490 # cycles. 

491 mem_usage = _get_memory_usage(pid, force_gc=True) 

492 _last_memory_leak_check = time() 

493 if mem_usage - _process_reference_size < _MAX_MEMORY_LEAK_SIZE: 

494 # The GC managed to free the memory: everything is fine. 

495 continue 

496 

497 # The process is leaking memory: let the main process 

498 # know that we need to start a new worker. 

499 mp.util.info("Memory leak detected: shutting down worker") 

500 result_queue.put(pid) 

501 with worker_exit_lock: 

502 mp.util.debug("Exit due to memory leak") 

503 return 

504 else: 

505 # if psutil is not installed, trigger gc.collect events 

506 # regularly to limit potential memory leaks due to reference cycles 

507 if _last_memory_leak_check is None or ( 

508 time() - _last_memory_leak_check > _MEMORY_LEAK_CHECK_DELAY 

509 ): 

510 gc.collect() 

511 _last_memory_leak_check = time() 

512 

513 

514class _ExecutorManagerThread(threading.Thread): 

515 """Manages the communication between this process and the worker processes. 

516 

517 The manager is run in a local thread. 

518 

519 Args: 

520 executor: A reference to the ProcessPoolExecutor that owns 

521 this thread. A weakref will be owned by the manager as well as 

522 references to internal objects used to introspect the state of 

523 the executor. 

524 """ 

525 

526 def __init__(self, executor): 

527 # Store references to necessary internals of the executor. 

528 

529 # A _ThreadWakeup to allow waking up the executor_manager_thread from 

530 # the main Thread and avoid deadlocks caused by permanently 

531 # locked queues. 

532 self.thread_wakeup = executor._executor_manager_thread_wakeup 

533 self.shutdown_lock = executor._shutdown_lock 

534 

535 # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used 

536 # to determine if the ProcessPoolExecutor has been garbage collected 

537 # and that the manager can exit. 

538 # When the executor gets garbage collected, the weakref callback 

539 # will wake up the queue management thread so that it can terminate 

540 # if there is no pending work item. 

541 def weakref_cb( 

542 _, 

543 thread_wakeup=self.thread_wakeup, 

544 shutdown_lock=self.shutdown_lock, 

545 ): 

546 if mp is not None: 

547 # At this point, the multiprocessing module can already be 

548 # garbage collected. We only log debug info when still 

549 # possible. 

550 mp.util.debug( 

551 "Executor collected: triggering callback for" 

552 " QueueManager wakeup" 

553 ) 

554 with shutdown_lock: 

555 thread_wakeup.wakeup() 

556 

557 self.executor_reference = weakref.ref(executor, weakref_cb) 

558 

559 # The flags of the executor 

560 self.executor_flags = executor._flags 

561 

562 # A list of the ctx.Process instances used as workers. 

563 self.processes = executor._processes 

564 

565 # A ctx.Queue that will be filled with _CallItems derived from 

566 # _WorkItems for processing by the process workers. 

567 self.call_queue = executor._call_queue 

568 

569 # A ctx.SimpleQueue of _ResultItems generated by the process workers. 

570 self.result_queue = executor._result_queue 

571 

572 # A queue.Queue of work ids e.g. Queue([5, 6, ...]). 

573 self.work_ids_queue = executor._work_ids 

574 

575 # A dict mapping work ids to _WorkItems e.g. 

576 # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} 

577 self.pending_work_items = executor._pending_work_items 

578 

579 # A list of the work_ids that are currently running 

580 self.running_work_items = executor._running_work_items 

581 

582 # A lock to avoid workers shutting down on timeout concurrently with the 

583 # spawning of new processes or the executor shutting down 

584 self.processes_management_lock = executor._processes_management_lock 

585 

586 super().__init__(name="ExecutorManagerThread") 

587 if sys.version_info < (3, 9): 

588 self.daemon = True 

589 

590 def run(self): 

591 # Main loop for the executor manager thread. 

592 

593 while True: 

594 self.add_call_item_to_queue() 

595 

596 result_item, is_broken, bpe = self.wait_result_broken_or_wakeup() 

597 

598 if is_broken: 

599 self.terminate_broken(bpe) 

600 return 

601 if result_item is not None: 

602 self.process_result_item(result_item) 

603 # Delete reference to result_item to avoid keeping references 

604 # while waiting on new results. 

605 del result_item 

606 

607 if self.is_shutting_down(): 

608 self.flag_executor_shutting_down() 

609 

610 # Since no new work items can be added, it is safe to shutdown 

611 # this thread if there are no pending work items. 

612 if not self.pending_work_items: 

613 self.join_executor_internals() 

614 return 

615 

616 def add_call_item_to_queue(self): 

617 # Fills call_queue with _WorkItems from pending_work_items. 

618 # This function never blocks. 

619 while True: 

620 if self.call_queue.full(): 

621 return 

622 try: 

623 work_id = self.work_ids_queue.get(block=False) 

624 except queue.Empty: 

625 return 

626 else: 

627 work_item = self.pending_work_items[work_id] 

628 

629 if work_item.future.set_running_or_notify_cancel(): 

630 self.running_work_items += [work_id] 

631 self.call_queue.put( 

632 _CallItem( 

633 work_id, 

634 work_item.fn, 

635 work_item.args, 

636 work_item.kwargs, 

637 ), 

638 block=True, 

639 ) 

640 else: 

641 del self.pending_work_items[work_id] 

642 continue 

643 

644 def wait_result_broken_or_wakeup(self): 

645 # Wait for a result to be ready in the result_queue while checking 

646 # that all worker processes are still running, or for a wake up 

647 # signal being sent. The wake up signals come either from new tasks being 

648 # submitted, from the executor being shutdown/gc-ed, or from the 

649 # shutdown of the python interpreter. 

650 result_reader = self.result_queue._reader 

651 wakeup_reader = self.thread_wakeup._reader 

652 readers = [result_reader, wakeup_reader] 

653 worker_sentinels = [p.sentinel for p in list(self.processes.values())] 

654 ready = wait(readers + worker_sentinels) 

655 

656 bpe = None 

657 is_broken = True 

658 result_item = None 

659 if result_reader in ready: 

660 try: 

661 result_item = result_reader.recv() 

662 if isinstance(result_item, _RemoteTraceback): 

663 bpe = BrokenProcessPool( 

664 "A task has failed to un-serialize. Please ensure that" 

665 " the arguments of the function are all picklable." 

666 ) 

667 bpe.__cause__ = result_item 

668 else: 

669 is_broken = False 

670 except BaseException as e: 

671 bpe = BrokenProcessPool( 

672 "A result has failed to un-serialize. Please ensure that " 

673 "the objects returned by the function are always " 

674 "picklable." 

675 ) 

676 tb = traceback.format_exception( 

677 type(e), e, getattr(e, "__traceback__", None) 

678 ) 

679 bpe.__cause__ = _RemoteTraceback("".join(tb)) 

680 

681 elif wakeup_reader in ready: 

682 # This is simply a wake-up event that might either trigger putting 

683 # more tasks in the queue or trigger the clean up of resources. 

684 is_broken = False 

685 else: 

686 # A worker has terminated and we don't know why, set the state of 

687 # the executor as broken 

688 exit_codes = "" 

689 if sys.platform != "win32": 

690 # On Windows, introspecting terminated workers' exitcodes seems 

691 # unstable, therefore they are not appended to the exception 

692 # message. 

693 exit_codes = ( 

694 "\nThe exit codes of the workers are " 

695 f"{get_exitcodes_terminated_worker(self.processes)}" 

696 ) 

697 mp.util.debug( 

698 "A worker unexpectedly terminated. Workers that " 

699 "might have caused the breakage: " 

700 + str( 

701 { 

702 p.name: p.exitcode 

703 for p in list(self.processes.values()) 

704 if p is not None and p.sentinel in ready 

705 } 

706 ) 

707 ) 

708 bpe = TerminatedWorkerError( 

709 "A worker process managed by the executor was unexpectedly " 

710 "terminated. This could be caused by a segmentation fault " 

711 "while calling the function or by an excessive memory usage " 

712 "causing the Operating System to kill the worker.\n" 

713 f"{exit_codes}" 

714 ) 

715 

716 self.thread_wakeup.clear() 

717 

718 return result_item, is_broken, bpe 

719 

720 def process_result_item(self, result_item): 

721 # Process the received result_item. This can be either the PID of a 

722 # worker that exited gracefully or a _ResultItem 

723 

724 if isinstance(result_item, int): 

725 # Clean shutdown of a worker using its PID, either on request 

726 # by the executor.shutdown method or by the timeout of the worker 

727 # itself: we should not mark the executor as broken. 

728 with self.processes_management_lock: 

729 p = self.processes.pop(result_item, None) 

730 

731 # p can be None if the executor is concurrently shutting down. 

732 if p is not None: 

733 p._worker_exit_lock.release() 

734 mp.util.debug( 

735 f"joining {p.name} when processing {p.pid} as result_item" 

736 ) 

737 p.join() 

738 del p 

739 

740 # Make sure the executor has the right number of workers, even if a 

741 # worker timed out while some jobs were submitted. If some work is 

742 # pending or there are fewer processes than running items, we need to 

743 # start a new Process and raise a warning. 

744 n_pending = len(self.pending_work_items) 

745 n_running = len(self.running_work_items) 

746 if n_pending - n_running > 0 or n_running > len(self.processes): 

747 executor = self.executor_reference() 

748 if ( 

749 executor is not None 

750 and len(self.processes) < executor._max_workers 

751 ): 

752 warnings.warn( 

753 "A worker stopped while some jobs were given to the " 

754 "executor. This can be caused by a too short worker " 

755 "timeout or by a memory leak.", 

756 UserWarning, 

757 ) 

758 with executor._processes_management_lock: 

759 executor._adjust_process_count() 

760 executor = None 

761 else: 

762 # Received a _ResultItem so mark the future as completed. 

763 work_item = self.pending_work_items.pop(result_item.work_id, None) 

764 # work_item can be None if another process terminated (see above) 

765 if work_item is not None: 

766 if result_item.exception: 

767 work_item.future.set_exception(result_item.exception) 

768 else: 

769 work_item.future.set_result(result_item.result) 

770 self.running_work_items.remove(result_item.work_id) 

771 

772 def is_shutting_down(self): 

773 # Check whether we should start shutting down the executor. 

774 executor = self.executor_reference() 

775 # No more work items can be added if: 

776 # - The interpreter is shutting down OR 

777 # - The executor that owns this thread is not broken AND 

778 # * The executor that owns this worker has been collected OR 

779 # * The executor that owns this worker has been shutdown. 

780 # If the executor is broken, it should be detected in the next loop. 

781 return _global_shutdown or ( 

782 (executor is None or self.executor_flags.shutdown) 

783 and not self.executor_flags.broken 

784 ) 

785 

786 def terminate_broken(self, bpe): 

787 # Terminate the executor because it is in a broken state. The bpe 

788 # argument can be used to display more information on the error that 

789 # led the executor into becoming broken. 

790 

791 # Mark the process pool broken so that submits fail right now. 

792 self.executor_flags.flag_as_broken(bpe) 

793 

794 # Mark pending tasks as failed. 

795 for work_item in self.pending_work_items.values(): 

796 work_item.future.set_exception(bpe) 

797 # Delete references to object. See issue16284 

798 del work_item 

799 self.pending_work_items.clear() 

800 

801 # Terminate remaining workers forcibly: the queues or their 

802 # locks may be in a dirty state and block forever. 

803 self.kill_workers(reason="broken executor") 

804 

805 # clean up resources 

806 self.join_executor_internals() 

807 

808 def flag_executor_shutting_down(self): 

809 # Flag the executor as shutting down and, if requested, cancel the 

810 # remaining tasks as early as possible (if it is not gc-ed yet). 

811 self.executor_flags.flag_as_shutting_down() 

812 

813 # Cancel pending work items if requested. 

814 if self.executor_flags.kill_workers: 

815 while self.pending_work_items: 

816 _, work_item = self.pending_work_items.popitem() 

817 work_item.future.set_exception( 

818 ShutdownExecutorError( 

819 "The Executor was shutdown with `kill_workers=True` " 

820 "before this job could complete." 

821 ) 

822 ) 

823 del work_item 

824 

825 # Kill the remaining workers forcibly to not waste time joining them 

826 self.kill_workers(reason="executor shutting down") 

827 

828 def kill_workers(self, reason=""): 

829 # Terminate the remaining workers using SIGKILL. This function also 

830 # terminates descendant workers of the children in case there is some 

831 # nested parallelism. 

832 while self.processes: 

833 _, p = self.processes.popitem() 

834 mp.util.debug(f"terminate process {p.name}, reason: {reason}") 

835 try: 

836 kill_process_tree(p) 

837 except ProcessLookupError: # pragma: no cover 

838 pass 

839 

840 def shutdown_workers(self): 

841 # shutdown all workers in self.processes 

842 

843 # Create a list to avoid RuntimeError due to concurrent modification of 

844 # processes. n_children_to_stop is thus an upper bound. Also release the 

845 # processes' _worker_exit_lock to accelerate the shutdown procedure, as 

846 # there is no need for hand-shake here. 

847 with self.processes_management_lock: 

848 n_children_to_stop = 0 

849 for p in list(self.processes.values()): 

850 mp.util.debug(f"releasing worker exit lock on {p.name}") 

851 p._worker_exit_lock.release() 

852 n_children_to_stop += 1 

853 

854 mp.util.debug(f"found {n_children_to_stop} processes to stop") 

855 

856 # Send the right number of sentinels, to make sure all children are 

857 # properly terminated. Do it with a mechanism that avoids hanging on 

858 # a full queue when all workers have already been shut down. 

859 n_sentinels_sent = 0 

860 cooldown_time = 0.001 

861 while ( 

862 n_sentinels_sent < n_children_to_stop 

863 and self.get_n_children_alive() > 0 

864 ): 

865 for _ in range(n_children_to_stop - n_sentinels_sent): 

866 try: 

867 self.call_queue.put_nowait(None) 

868 n_sentinels_sent += 1 

869 except queue.Full as e: 

870 if cooldown_time > 5.0: 

871 mp.util.info( 

872 "failed to send all sentinels and exit with error." 

873 f"\ncall_queue size={self.call_queue._maxsize}; " 

874 f" full is {self.call_queue.full()}; " 

875 ) 

876 raise e 

877 mp.util.info( 

878 "full call_queue prevented to send all sentinels at " 

879 "once, waiting..." 

880 ) 

881 sleep(cooldown_time) 

882 cooldown_time *= 1.2 

883 break 

884 

885 mp.util.debug(f"sent {n_sentinels_sent} sentinels to the call queue") 

886 

887 def join_executor_internals(self): 

888 self.shutdown_workers() 

889 

890 # Release the queue's resources as soon as possible. Flag the feeder 

891 # thread for clean exit to avoid having the crash detection thread flag 

892 # the Executor as broken during the shutdown. This is safe as either: 

893 # * We don't need to communicate with the workers anymore 

894 # * There is nothing left in the Queue buffer except None sentinels 

895 mp.util.debug("closing call_queue") 

896 self.call_queue.close() 

897 self.call_queue.join_thread() 

898 

899 # Closing result_queue 

900 mp.util.debug("closing result_queue") 

901 self.result_queue.close() 

902 

903 mp.util.debug("closing thread_wakeup") 

904 with self.shutdown_lock: 

905 self.thread_wakeup.close() 

906 

907 # If .join() is not called on the created processes then 

908 # some ctx.Queue methods may deadlock on macOS. 

909 with self.processes_management_lock: 

910 mp.util.debug(f"joining {len(self.processes)} processes") 

911 n_joined_processes = 0 

912 while True: 

913 try: 

914 pid, p = self.processes.popitem() 

915 mp.util.debug(f"joining process {p.name} with pid {pid}") 

916 p.join() 

917 n_joined_processes += 1 

918 except KeyError: 

919 break 

920 

921 mp.util.debug( 

922 "executor management thread clean shutdown of " 

923 f"{n_joined_processes} workers" 

924 ) 

925 

926 def get_n_children_alive(self): 

927 # This is an upper bound on the number of children alive. 

928 with self.processes_management_lock: 

929 return sum(p.is_alive() for p in list(self.processes.values())) 

930 

931 

932_system_limits_checked = False 

933_system_limited = None 

934 

935 

936def _check_system_limits(): 

937 global _system_limits_checked, _system_limited 

938 if _system_limits_checked and _system_limited: 

939 raise NotImplementedError(_system_limited) 

940 _system_limits_checked = True 

941 try: 

942 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") 

943 except (AttributeError, ValueError): 

944 # sysconf not available or setting not available 

945 return 

946 if nsems_max == -1: 

947 # undetermined limit, assume that limit is determined 

948 # by available memory only 

949 return 

950 if nsems_max >= 256: 

951 # minimum number of semaphores available 

952 # according to POSIX 

953 return 

954 _system_limited = ( 

955 f"system provides too few semaphores ({nsems_max} available, " 

956 "256 necessary)" 

957 ) 

958 raise NotImplementedError(_system_limited) 

959 

960 

961def _chain_from_iterable_of_lists(iterable): 

962 """ 

963 Specialized implementation of itertools.chain.from_iterable. 

964 Each item in *iterable* should be a list. This function is 

965 careful not to keep references to yielded objects. 

966 """ 

967 for element in iterable: 

968 element.reverse() 

969 while element: 

970 yield element.pop() 
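
# Editorial behavioural note: equivalent to itertools.chain.from_iterable for
# lists, except each yielded element is popped so the source list drops its
# reference as soon as the element has been consumed.
#
#     list(_chain_from_iterable_of_lists(iter([[1, 2], [3]])))  # -> [1, 2, 3]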

971 

972 

973def _check_max_depth(context): 

974 # Limit the maximal recursion level 

975 global _CURRENT_DEPTH 

976 if context.get_start_method() == "fork" and _CURRENT_DEPTH > 0: 

977 raise LokyRecursionError( 

978 "Could not spawn extra nested processes at depth superior to " 

979 "MAX_DEPTH=1. It is not possible to increase this limit when " 

980 "using the 'fork' start method." 

981 ) 

982 

983 if 0 < MAX_DEPTH and _CURRENT_DEPTH + 1 > MAX_DEPTH: 

984 raise LokyRecursionError( 

985 "Could not spawn extra nested processes at depth superior to " 

986 f"MAX_DEPTH={MAX_DEPTH}. If this is intendend, you can change " 

987 "this limit with the LOKY_MAX_DEPTH environment variable." 

988 ) 

989 

990 

991class LokyRecursionError(RuntimeError): 

992 """A process tries to spawn too many levels of nested processes.""" 

993 

994 

995class BrokenProcessPool(_BPPException): 

996 """ 

997 Raised when the executor is broken while a future was in the running state. 

998 The cause can be an error raised when unpickling the task in the worker 

999 process or when unpickling the result value in the parent process. It can 

1000 also be caused by a worker process being terminated unexpectedly. 

1001 """ 

1002 

1003 

1004class TerminatedWorkerError(BrokenProcessPool): 

1005 """ 

1006 Raised when a process in a ProcessPoolExecutor terminated abruptly 

1007 while a future was in the running state. 

1008 """ 

1009 

1010 

1011# Alias for backward compat (for code written for loky 1.1.4 and earlier). Do 

1012# not use in new code. 

1013BrokenExecutor = BrokenProcessPool 

1014 

1015 

1016class ShutdownExecutorError(RuntimeError): 

1017 

1018 """ 

1019 Raised when a ProcessPoolExecutor is shutdown while a future was in the 

1020 running or pending state. 

1021 """ 

1022 

1023 

1024class ProcessPoolExecutor(Executor): 

1025 

1026 _at_exit = None 

1027 

1028 def __init__( 

1029 self, 

1030 max_workers=None, 

1031 job_reducers=None, 

1032 result_reducers=None, 

1033 timeout=None, 

1034 context=None, 

1035 initializer=None, 

1036 initargs=(), 

1037 env=None, 

1038 ): 

1039 """Initializes a new ProcessPoolExecutor instance. 

1040 

1041 Args: 

1042 max_workers: int, optional (default: cpu_count()) 

1043 The maximum number of processes that can be used to execute the 

1044 given calls. If None or not given then as many worker processes 

1045 will be created as the number of CPUs the current process 

1046 can use. 

1047 job_reducers, result_reducers: dict(type: reducer_func) 

1048 Custom reducers for pickling the jobs and the results from the 

1049 Executor. If only `job_reducers` is provided, `result_reducers` 

1050 will use the same reducers. 

1051 timeout: int, optional (default: None) 

1052 Idle workers exit after timeout seconds. If a new job is 

1053 submitted after the timeout, the executor will start enough 

1054 new Python processes to make sure the pool of workers is full. 

1055 context: A multiprocessing context to launch the workers. This 

1056 object should provide SimpleQueue, Queue and Process. 

1057 initializer: A callable used to initialize worker processes. 

1058 initargs: A tuple of arguments to pass to the initializer. 

1059 env: A dict of environment variables to overwrite in the child 

1060 process. The environment variables are set before any module is 

1061 loaded. Note that this only works with the loky context. 

1062 """ 

1063 _check_system_limits() 

1064 

1065 if max_workers is None: 

1066 self._max_workers = cpu_count() 

1067 else: 

1068 if max_workers <= 0: 

1069 raise ValueError("max_workers must be greater than 0") 

1070 self._max_workers = max_workers 

1071 

1072 if ( 

1073 sys.platform == "win32" 

1074 and self._max_workers > _MAX_WINDOWS_WORKERS 

1075 ): 

1076 warnings.warn( 

1077 f"On Windows, max_workers cannot exceed {_MAX_WINDOWS_WORKERS} " 

1078 "due to limitations of the operating system." 

1079 ) 

1080 self._max_workers = _MAX_WINDOWS_WORKERS 

1081 

1082 if context is None: 

1083 context = get_context() 

1084 self._context = context 

1085 self._env = env 

1086 

1087 self._initializer, self._initargs = _prepare_initializer( 

1088 initializer, initargs 

1089 ) 

1090 _check_max_depth(self._context) 

1091 

1092 if result_reducers is None: 

1093 result_reducers = job_reducers 

1094 

1095 # Timeout 

1096 self._timeout = timeout 

1097 

1098 # Management thread 

1099 self._executor_manager_thread = None 

1100 

1101 # Map of pids to processes 

1102 self._processes = {} 

1103 

1104 # Internal variables of the ProcessPoolExecutor 

1105 self._processes = {} 

1106 self._queue_count = 0 

1107 self._pending_work_items = {} 

1108 self._running_work_items = [] 

1109 self._work_ids = queue.Queue() 

1110 self._processes_management_lock = self._context.Lock() 

1111 self._executor_manager_thread = None 

1112 self._shutdown_lock = threading.Lock() 

1113 

1114 # _ThreadWakeup is a communication channel used to interrupt the wait 

1115 # of the main loop of executor_manager_thread from another thread (e.g. 

1116 # when calling executor.submit or executor.shutdown). We do not use the 

1117 # _result_queue to send wakeup signals to the executor_manager_thread 

1118 # as it could result in a deadlock if a worker process dies with the 

1119 # _result_queue write lock still acquired. 

1120 # 

1121 # _shutdown_lock must be locked to access _ThreadWakeup.wakeup. 

1122 self._executor_manager_thread_wakeup = _ThreadWakeup() 

1123 

1124 # Flag to hold the state of the Executor. This permits introspection of 

1125 # the Executor state even once it has been garbage collected. 

1126 self._flags = _ExecutorFlags(self._shutdown_lock) 

1127 

1128 # Finally setup the queues for interprocess communication 

1129 self._setup_queues(job_reducers, result_reducers) 

1130 

1131 mp.util.debug("ProcessPoolExecutor is setup") 

1132 

1133 def _setup_queues(self, job_reducers, result_reducers, queue_size=None): 

1134 # Make the call queue slightly larger than the number of processes to 

1135 # prevent the worker processes from idling. But don't make it too big 

1136 # because futures in the call queue cannot be cancelled. 

1137 if queue_size is None: 

1138 queue_size = 2 * self._max_workers + EXTRA_QUEUED_CALLS 

1139 self._call_queue = _SafeQueue( 

1140 max_size=queue_size, 

1141 pending_work_items=self._pending_work_items, 

1142 running_work_items=self._running_work_items, 

1143 thread_wakeup=self._executor_manager_thread_wakeup, 

1144 reducers=job_reducers, 

1145 ctx=self._context, 

1146 ) 

1147 # Killed worker processes can produce spurious "broken pipe" 

1148 # tracebacks in the queue's own worker thread. But we detect killed 

1149 # processes anyway, so silence the tracebacks. 

1150 self._call_queue._ignore_epipe = True 

1151 

1152 self._result_queue = SimpleQueue( 

1153 reducers=result_reducers, ctx=self._context 

1154 ) 

1155 

1156 def _start_executor_manager_thread(self): 

1157 if self._executor_manager_thread is None: 

1158 mp.util.debug("_start_executor_manager_thread called") 

1159 

1160 # Start the processes so that their sentinels are known. 

1161 self._executor_manager_thread = _ExecutorManagerThread(self) 

1162 self._executor_manager_thread.start() 

1163 

1164 # register this executor in a mechanism that ensures it will wake up 

1165 # when the interpreter is exiting. 

1166 _threads_wakeups[self._executor_manager_thread] = ( 

1167 self._shutdown_lock, 

1168 self._executor_manager_thread_wakeup, 

1169 ) 

1170 

1171 global process_pool_executor_at_exit 

1172 if process_pool_executor_at_exit is None: 

1173 # Ensure that the _python_exit function will be called before 

1174 # the multiprocessing.Queue._close finalizers which have an 

1175 # exitpriority of 10. 

1176 

1177 if sys.version_info < (3, 9): 

1178 process_pool_executor_at_exit = mp.util.Finalize( 

1179 None, _python_exit, exitpriority=20 

1180 ) 

1181 else: 

1182 process_pool_executor_at_exit = threading._register_atexit( 

1183 _python_exit 

1184 ) 

1185 

1186 def _adjust_process_count(self): 

1187 while len(self._processes) < self._max_workers: 

1188 worker_exit_lock = self._context.BoundedSemaphore(1) 

1189 args = ( 

1190 self._call_queue, 

1191 self._result_queue, 

1192 self._initializer, 

1193 self._initargs, 

1194 self._processes_management_lock, 

1195 self._timeout, 

1196 worker_exit_lock, 

1197 _CURRENT_DEPTH + 1, 

1198 ) 

1199 worker_exit_lock.acquire() 

1200 try: 

1201 # Try to spawn the process with some environment variables to 

1202 # overwrite, but this only works with the loky context for now. 

1203 p = self._context.Process( 

1204 target=_process_worker, args=args, env=self._env 

1205 ) 

1206 except TypeError: 

1207 p = self._context.Process(target=_process_worker, args=args) 

1208 p._worker_exit_lock = worker_exit_lock 

1209 p.start() 

1210 self._processes[p.pid] = p 

1211 mp.util.debug( 

1212 f"Adjusted process count to {self._max_workers}: " 

1213 f"{[(p.name, pid) for pid, p in self._processes.items()]}" 

1214 ) 

1215 

1216 def _ensure_executor_running(self): 

1217 """ensures all workers and management thread are running""" 

1218 with self._processes_management_lock: 

1219 if len(self._processes) != self._max_workers: 

1220 self._adjust_process_count() 

1221 self._start_executor_manager_thread() 

1222 

1223 def submit(self, fn, *args, **kwargs): 

1224 with self._flags.shutdown_lock: 

1225 if self._flags.broken is not None: 

1226 raise self._flags.broken 

1227 if self._flags.shutdown: 

1228 raise ShutdownExecutorError( 

1229 "cannot schedule new futures after shutdown" 

1230 ) 

1231 

1232 # Cannot submit new calls once the interpreter is shutting down. 

1233 # This check avoids spawning new processes at exit. 

1234 if _global_shutdown: 

1235 raise RuntimeError( 

1236 "cannot schedule new futures after " "interpreter shutdown" 

1237 ) 

1238 

1239 f = Future() 

1240 w = _WorkItem(f, fn, args, kwargs) 

1241 

1242 self._pending_work_items[self._queue_count] = w 

1243 self._work_ids.put(self._queue_count) 

1244 self._queue_count += 1 

1245 # Wake up queue management thread 

1246 self._executor_manager_thread_wakeup.wakeup() 

1247 

1248 self._ensure_executor_running() 

1249 return f 

1250 

1251 submit.__doc__ = Executor.submit.__doc__ 

1252 

1253 def map(self, fn, *iterables, **kwargs): 

1254 """Returns an iterator equivalent to map(fn, iter). 

1255 

1256 Args: 

1257 fn: A callable that will take as many arguments as there are 

1258 passed iterables. 

1259 timeout: The maximum number of seconds to wait. If None, then there 

1260 is no limit on the wait time. 

1261 chunksize: If greater than one, the iterables will be chopped into 

1262 chunks of size chunksize and submitted to the process pool. 

1263 If set to one, the items in the list will be sent one at a 

1264 time. 

1265 

1266 Returns: 

1267 An iterator equivalent to: map(func, *iterables) but the calls may 

1268 be evaluated out-of-order. 

1269 

1270 Raises: 

1271 TimeoutError: If the entire result iterator could not be generated 

1272 before the given timeout. 

1273 Exception: If fn(*args) raises for any values. 

1274 """ 

1275 timeout = kwargs.get("timeout", None) 

1276 chunksize = kwargs.get("chunksize", 1) 

1277 if chunksize < 1: 

1278 raise ValueError("chunksize must be >= 1.") 

1279 

1280 results = super().map( 

1281 partial(_process_chunk, fn), 

1282 _get_chunks(chunksize, *iterables), 

1283 timeout=timeout, 

1284 ) 

1285 return _chain_from_iterable_of_lists(results) 
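
# Editorial usage sketch for map() above (illustration only, not loky code):
# chunking trades scheduling overhead against cancellation granularity, since a
# whole chunk travels as a single _CallItem.
#
#     with ProcessPoolExecutor(max_workers=4) as executor:
#         for value in executor.map(abs, [-1, 2, -3], chunksize=2):
#             print(value)  # 1, 2, 3 -- results keep the input order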

1286 

1287 def shutdown(self, wait=True, kill_workers=False): 

1288 mp.util.debug(f"shutting down executor {self}") 

1289 

1290 self._flags.flag_as_shutting_down(kill_workers) 

1291 executor_manager_thread = self._executor_manager_thread 

1292 executor_manager_thread_wakeup = self._executor_manager_thread_wakeup 

1293 

1294 if executor_manager_thread_wakeup is not None: 

1295 # Wake up queue management thread 

1296 with self._shutdown_lock: 

1297 self._executor_manager_thread_wakeup.wakeup() 

1298 

1299 if executor_manager_thread is not None and wait: 

1300 # This lock avoids concurrent join if the interpreter 

1301 # is shutting down. 

1302 with _global_shutdown_lock: 

1303 executor_manager_thread.join() 

1304 _threads_wakeups.pop(executor_manager_thread, None) 

1305 

1306 # To reduce the risk of opening too many files, remove references to 

1307 # objects that use file descriptors. 

1308 self._executor_manager_thread = None 

1309 self._executor_manager_thread_wakeup = None 

1310 self._call_queue = None 

1311 self._result_queue = None 

1312 self._processes_management_lock = None 

1313 

1314 shutdown.__doc__ = Executor.shutdown.__doc__