1###############################################################################
2# Re-implementation of the ProcessPoolExecutor more robust to faults
3#
4# author: Thomas Moreau and Olivier Grisel
5#
6# adapted from concurrent/futures/process_pool_executor.py (17/02/2017)
7# * Add an extra management thread to detect executor_manager_thread failures,
8# * Improve the shutdown process to avoid deadlocks,
9# * Add timeout for workers,
10# * More robust pickling process.
11#
12# Copyright 2009 Brian Quinlan. All Rights Reserved.
13# Licensed to PSF under a Contributor Agreement.
14
15"""Implements ProcessPoolExecutor.
16
The following diagram and text describe the data-flow through the system:
18
19|======================= In-process =====================|== Out-of-process ==|
20
21+----------+ +----------+ +--------+ +-----------+ +---------+
22| | => | Work Ids | | | | Call Q | | Process |
23| | +----------+ | | +-----------+ | Pool |
24| | | ... | | | | ... | +---------+
25| | | 6 | => | | => | 5, call() | => | |
26| | | 7 | | | | ... | | |
27| Process | | ... | | Local | +-----------+ | Process |
28| Pool | +----------+ | Worker | | #1..n |
29| Executor | | Thread | | |
30| | +----------- + | | +-----------+ | |
31| | <=> | Work Items | <=> | | <= | Result Q | <= | |
32| | +------------+ | | +-----------+ | |
33| | | 6: call() | | | | ... | | |
34| | | future | +--------+ | 4, result | | |
35| | | ... | | 3, except | | |
36+----------+ +------------+ +-----------+ +---------+
37
38Executor.submit() called:
39- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
40- adds the id of the _WorkItem to the "Work Ids" queue
41
42Local worker thread:
43- reads work ids from the "Work Ids" queue and looks up the corresponding
44 WorkItem from the "Work Items" dict: if the work item has been cancelled then
45 it is simply removed from the dict, otherwise it is repackaged as a
46 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
47 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
48 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
49- reads _ResultItems from "Result Q", updates the future stored in the
50 "Work Items" dict and deletes the dict entry
51
52Process #1..n:
53- reads _CallItems from "Call Q", executes the calls, and puts the resulting
54 _ResultItems in "Result Q"
55"""
56
57
58__author__ = "Thomas Moreau (thomas.moreau.2010@gmail.com)"
59
60
61import faulthandler
62import os
63import gc
64import sys
65import queue
66import struct
67import weakref
68import warnings
69import itertools
70import traceback
71import threading
72from time import time, sleep
73import multiprocessing as mp
74from functools import partial
75from pickle import PicklingError
76from concurrent.futures import Executor
77from concurrent.futures._base import LOGGER
78from concurrent.futures.process import BrokenProcessPool as _BPPException
79from multiprocessing.connection import wait
80
81from ._base import Future
82from .backend import get_context
83from .backend.context import cpu_count, _MAX_WINDOWS_WORKERS
84from .backend.queues import Queue, SimpleQueue
85from .backend.reduction import set_loky_pickler, get_loky_pickler_name
86from .backend.utils import kill_process_tree, get_exitcodes_terminated_worker
87from .initializers import _prepare_initializer
88
89
# Mechanism to prevent infinite process spawning. When a worker of a
# ProcessPoolExecutor nested in MAX_DEPTH Executor tries to create a new
# Executor, a LokyRecursionError is raised.
# The limit is read once, at import time, from the LOKY_MAX_DEPTH environment
# variable and defaults to 10 when the variable is not set.
MAX_DEPTH = int(os.environ.get("LOKY_MAX_DEPTH", 10))
# Nesting level of the current process. It is 0 unless overwritten by
# _process_worker with the depth received from the parent.
_CURRENT_DEPTH = 0

# Minimum time interval between two consecutive memory leak protection checks.
# Compared against differences of time.time() values, hence in seconds.
_MEMORY_LEAK_CHECK_DELAY = 1.0

# Number of bytes of memory usage allowed over the reference process size.
_MAX_MEMORY_LEAK_SIZE = int(3e8)
101
102
# psutil is optional: _USE_PSUTIL records whether it could be imported, and
# _get_memory_usage only exists when it could.
try:
    from psutil import Process
except ImportError:
    _USE_PSUTIL = False
else:
    _USE_PSUTIL = True

    def _get_memory_usage(pid, force_gc=False):
        """Return the resident set size reported by psutil for ``pid``.

        When ``force_gc`` is true, a full garbage collection is run in the
        calling process before the measurement is taken.
        """
        if force_gc:
            gc.collect()

        rss = Process(pid).memory_info().rss
        mp.util.debug(f"psutil return memory size: {rss}")
        return rss
118
119
class _ThreadWakeup:
    """One-way pipe used to wake up a thread waiting on its read end.

    Every method becomes a no-op once ``close`` has been called.
    """

    def __init__(self):
        self._closed = False
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        """Close both ends of the pipe; subsequent calls do nothing."""
        if self._closed:
            return
        self._closed = True
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        """Send an empty message so the read end becomes ready."""
        if self._closed:
            return
        self._writer.send_bytes(b"")

    def clear(self):
        """Drain every message currently pending on the read end."""
        if self._closed:
            return
        reader = self._reader
        while reader.poll():
            reader.recv_bytes()
139
140
class _ExecutorFlags:
    """State flags of an executor, held outside of the executor itself.

    Keeping the flags in a separate object lets executor_manager_thread and
    crash_detection_thread maintain the pool without holding a reference
    that would prevent the garbage collection of unreferenced executors.
    """

    def __init__(self, shutdown_lock):
        # Lock taken around every flag update.
        self.shutdown_lock = shutdown_lock

        self.shutdown = False
        self.broken = None
        self.kill_workers = False

    def flag_as_shutting_down(self, kill_workers=None):
        """Mark the executor as shut down.

        ``kill_workers`` is only overwritten when a value other than None
        is given.
        """
        with self.shutdown_lock:
            self.shutdown = True
            if kill_workers is None:
                return
            self.kill_workers = kill_workers

    def flag_as_broken(self, broken):
        """Mark the executor as shut down and store ``broken`` as its cause."""
        with self.shutdown_lock:
            self.shutdown = True
            self.broken = broken
166
167
# Prior to 3.9, executor_manager_thread is created as daemon thread. This means
# that it is not joined automatically when the interpreter is shutting down.
# To work around this problem, an exit handler is installed to tell the
# thread to exit when the interpreter is shutting down and then waits until
# it finishes. The thread needs to be daemonized because the atexit hooks are
# called after all non daemonized threads are joined.
#
# Starting 3.9, there exists a specific atexit hook to be called before joining
# the threads so the executor_manager_thread does not need to be daemonized
# anymore.
#
# The atexit hooks are registered when starting the first ProcessPoolExecutor
# to avoid import having an effect on the interpreter.

# Set to True by _python_exit; read by the manager threads to detect that
# no more work can be submitted.
_global_shutdown = False
# Held by _python_exit while joining each manager thread.
_global_shutdown_lock = threading.Lock()
# Weak mapping: manager thread -> (shutdown_lock, thread_wakeup), which is the
# layout _python_exit unpacks when waking up and joining the threads.
_threads_wakeups = weakref.WeakKeyDictionary()
185
186
def _python_exit():
    """Flag the global shutdown, then wake up and join the manager threads.

    Sets the module-level ``_global_shutdown`` flag, sends a wakeup to every
    executor manager thread registered in ``_threads_wakeups`` so that it can
    notice the flag, and finally joins each of these threads.
    """
    global _global_shutdown
    _global_shutdown = True

    # Materialize the list of items to avoid error due to iterating over
    # changing size dictionary.
    items = list(_threads_wakeups.items())
    if items:
        # The space after the count keeps the number and the thread name
        # apart in the logged message.
        mp.util.debug(
            f"Interpreter shutting down. Waking up {len(items)} "
            f"executor_manager_thread:\n{items}"
        )

    # Wake up the executor_manager_thread's so they can detect the interpreter
    # is shutting down and exit.
    for _, (shutdown_lock, thread_wakeup) in items:
        with shutdown_lock:
            thread_wakeup.wakeup()

    # Collect the executor_manager_thread's to make sure we exit cleanly.
    for thread, _ in items:
        # This lock is to prevent situations where an executor is gc'ed in one
        # thread while the atexit finalizer is running in another thread.
        with _global_shutdown_lock:
            thread.join()
212
213
# With the fork context, _threads_wakeups is propagated to children.
# Clear it after fork to avoid some situation that can cause some
# freeze when joining the workers.
mp.util.register_after_fork(_threads_wakeups, lambda obj: obj.clear())


# Module variable to register the at_exit call
# (None until something outside this section of the file assigns it).
process_pool_executor_at_exit = None

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
228
229
class _RemoteTraceback(Exception):
    """Exception whose text is a traceback formatted in another process.

    The formatted traceback is wrapped in triple quotes so that it stands
    out when displayed as the cause of a local exception.
    """

    def __init__(self, tb=None):
        self.tb = '\n"""\n{}"""'.format(tb)

    def __str__(self):
        return self.tb
238
239
# Deliberately not an exception class, to mirror
# concurrent.futures.process._ExceptionWithTraceback
class _ExceptionWithTraceback:
    """Pair an exception with the text of its formatted traceback.

    Reducing an instance yields a call to ``_rebuild_exc`` with the stored
    exception and traceback text.
    """

    def __init__(self, exc):
        trace = getattr(exc, "__traceback__", None)
        if trace is None:
            # Fall back on the traceback of the exception being handled.
            trace = sys.exc_info()[2]
        formatted = traceback.format_exception(type(exc), exc, trace)
        self.exc = exc
        self.tb = "".join(formatted)

    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)
254
255
def _rebuild_exc(exc, tb):
    """Return ``exc`` with a ``_RemoteTraceback`` built from ``tb`` as cause."""
    remote_traceback = _RemoteTraceback(tb)
    exc.__cause__ = remote_traceback
    return exc
259
260
class _WorkItem:
    """A submitted call: its future plus the callable and its arguments."""

    __slots__ = ["future", "fn", "args", "kwargs"]

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
270
271
class _ResultItem:
    """Outcome of a call: its work id with either an exception or a result."""

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception, self.result = exception, result
277
278
class _CallItem:
    """A call to run later, together with the loky pickler to use for it."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id, self.fn = work_id, fn
        self.args, self.kwargs = args, kwargs

        # Remember the pickler name active at creation time so that it can
        # be restored right before the call is executed.
        self.loky_pickler = get_loky_pickler_name()

    def __call__(self):
        set_loky_pickler(self.loky_pickler)
        fn, args, kwargs = self.fn, self.args, self.kwargs
        return fn(*args, **kwargs)

    def __repr__(self):
        fields = f"{self.work_id}, {self.fn}, {self.args}, {self.kwargs}"
        return f"CallItem({fields})"
297
298
class _SafeQueue(Queue):
    """Queue that reports feeder errors on the future of the failing job."""

    def __init__(
        self,
        max_size=0,
        ctx=None,
        pending_work_items=None,
        running_work_items=None,
        thread_wakeup=None,
        shutdown_lock=None,
        reducers=None,
    ):
        self.pending_work_items = pending_work_items
        self.running_work_items = running_work_items
        self.shutdown_lock = shutdown_lock
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, reducers=reducers, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        """Fail the future of the _CallItem ``obj`` that could not be sent.

        Anything that is not a _CallItem is delegated to the parent class.
        """
        if not isinstance(obj, _CallItem):
            super()._on_queue_feeder_error(e, obj)
            return

        # A struct.error is reported as a task too large for `send_bytes`,
        # any other error as a pickling failure.
        if isinstance(e, struct.error):
            failure = RuntimeError(
                "The task could not be sent to the workers as it is too "
                "large for `send_bytes`."
            )
        else:
            failure = PicklingError(
                "Could not pickle the task to send it to the workers."
            )
        formatted = traceback.format_exception(
            type(e), e, getattr(e, "__traceback__", None)
        )
        failure.__cause__ = _RemoteTraceback("".join(formatted))

        work_id = obj.work_id
        work_item = self.pending_work_items.pop(work_id, None)
        self.running_work_items.remove(work_id)
        # work_item can be None if another process terminated. In this
        # case, the executor_manager_thread fails all work_items with
        # BrokenProcessPool
        if work_item is not None:
            work_item.future.set_exception(failure)
            del work_item
        with self.shutdown_lock:
            self.thread_wakeup.wakeup()
346
347
def _get_chunks(chunksize, *iterables):
    """Yield tuples of at most ``chunksize`` items from zip(*iterables)."""
    zipped = zip(*iterables)

    def next_chunk():
        return tuple(itertools.islice(zipped, chunksize))

    # iter(callable, sentinel) stops at the first empty chunk, i.e. once the
    # zipped iterator is exhausted.
    yield from iter(next_chunk, ())
356
357
def _process_chunk(fn, chunk):
    """Apply ``fn`` to every argument tuple of ``chunk`` and list the results.

    ``chunk`` is one piece of the iterable given to map(); each of its
    elements is unpacked as the positional arguments of ``fn``.

    This function is run in a separate process.

    """
    return list(itertools.starmap(fn, chunk))
368
369
def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Put the outcome of ``work_id`` on ``result_queue``.

    If sending the outcome raises, a _ResultItem carrying that error (with
    its traceback) is sent instead.
    """
    try:
        outcome = _ResultItem(work_id, result=result, exception=exception)
        result_queue.put(outcome)
    except BaseException as err:
        wrapped = _ExceptionWithTraceback(err)
        result_queue.put(_ResultItem(work_id, exception=wrapped))
379
380
def _enable_faulthandler_if_needed():
    """Enable faulthandler unless it is already configured.

    Nothing is changed when the PYTHONFAULTHANDLER environment variable is
    set (whatever its value) or when faulthandler is already enabled.
    """
    if "PYTHONFAULTHANDLER" in os.environ:
        # Respect the environment variable to configure faulthandler. This
        # makes it possible to never enable faulthandler in the loky workers by
        # setting PYTHONFAULTHANDLER=0 explicitly in the environment.
        mp.util.debug(
            f"faulthandler explicitly configured by environment variable: "
            f"PYTHONFAULTHANDLER={os.environ['PYTHONFAULTHANDLER']}."
        )
        return

    if faulthandler.is_enabled():
        # Already enabled, possibly by a custom initializer that customized
        # its behavior: leave it untouched.
        mp.util.debug("faulthandler already enabled.")
        return

    # Otherwise enable faulthandler with its default parameters.
    mp.util.debug(
        "Enabling faulthandler to report tracebacks on worker crashes."
    )
    faulthandler.enable()
401
402
def _process_worker(
    call_queue,
    result_queue,
    initializer,
    initargs,
    processes_management_lock,
    timeout,
    worker_exit_lock,
    current_depth,
):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    The loop ends when a None sentinel is received, when no call arrives
    within ``timeout`` (and processes_management_lock is free), or when the
    memory usage check concludes that the worker leaks memory. In these cases
    the pid of the worker is put in result_queue before returning. If reading
    from call_queue raises any other exception, the formatted traceback is
    sent back as a _RemoteTraceback and the process exits with code 1.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
        processes_management_lock: A ctx.Lock avoiding worker timeout while
            some workers are being spawned.
        timeout: maximum time to wait for a new item in the call_queue. If that
            time is expired, the worker will shutdown.
        worker_exit_lock: Lock to avoid flagging the executor as broken on
            workers timeout.
        current_depth: Nested parallelism level, to avoid infinite spawning.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            LOGGER.critical("Exception in initializer:", exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return

    # set the global _CURRENT_DEPTH mechanism to limit recursive call
    global _CURRENT_DEPTH
    _CURRENT_DEPTH = current_depth
    # Memory usage measured after the first call; None until then.
    _process_reference_size = None
    # Timestamp of the last memory check / gc.collect; None until the first.
    _last_memory_leak_check = None
    pid = os.getpid()

    mp.util.debug(f"Worker started with timeout={timeout}")
    _enable_faulthandler_if_needed()

    while True:
        try:
            call_item = call_queue.get(block=True, timeout=timeout)
            if call_item is None:
                mp.util.info("Shutting down worker on sentinel")
        except queue.Empty:
            mp.util.info(f"Shutting down worker after timeout {timeout:0.3f}s")
            # Only time out if the management lock is free; otherwise keep
            # waiting for calls.
            if processes_management_lock.acquire(block=False):
                processes_management_lock.release()
                call_item = None
            else:
                mp.util.info("Could not acquire processes_management_lock")
                continue
        except BaseException:
            previous_tb = traceback.format_exc()
            try:
                result_queue.put(_RemoteTraceback(previous_tb))
            except BaseException:
                # If we cannot format correctly the exception, at least print
                # the traceback.
                print(previous_tb)
            mp.util.debug("Exiting with code 1")
            sys.exit(1)
        if call_item is None:
            # Notify queue management thread about worker shutdown
            result_queue.put(pid)
            # Wait up to 30 seconds for worker_exit_lock to be released.
            is_clean = worker_exit_lock.acquire(True, timeout=30)

            # Early notify any loky executor running in this worker process
            # (nested parallelism) that this process is about to shutdown to
            # avoid a deadlock waiting indefinitely for the worker to finish.
            _python_exit()

            if is_clean:
                mp.util.debug("Exited cleanly")
            else:
                mp.util.info("Main process did not release worker_exit")
            return
        try:
            r = call_item()
        except BaseException as e:
            exc = _ExceptionWithTraceback(e)
            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
        else:
            _sendback_result(result_queue, call_item.work_id, result=r)
            del r

        # Free the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item

        if _USE_PSUTIL:
            if _process_reference_size is None:
                # Make reference measurement after the first call
                _process_reference_size = _get_memory_usage(pid, force_gc=True)
                _last_memory_leak_check = time()
                continue
            # Rate-limit the checks to one per _MEMORY_LEAK_CHECK_DELAY.
            if time() - _last_memory_leak_check > _MEMORY_LEAK_CHECK_DELAY:
                mem_usage = _get_memory_usage(pid)
                _last_memory_leak_check = time()
                if mem_usage - _process_reference_size < _MAX_MEMORY_LEAK_SIZE:
                    # Memory usage stays within bounds: everything is fine.
                    continue

                # Check again memory usage; this time take the measurement
                # after a forced garbage collection to break any reference
                # cycles.
                mem_usage = _get_memory_usage(pid, force_gc=True)
                _last_memory_leak_check = time()
                if mem_usage - _process_reference_size < _MAX_MEMORY_LEAK_SIZE:
                    # The GC managed to free the memory: everything is fine.
                    continue

                # The process is leaking memory: let the master process
                # know that we need to start a new worker.
                mp.util.info("Memory leak detected: shutting down worker")
                result_queue.put(pid)
                # Return only once worker_exit_lock could be acquired.
                with worker_exit_lock:
                    mp.util.debug("Exit due to memory leak")
                    return
        else:
            # if psutil is not installed, trigger gc.collect events
            # regularly to limit potential memory leaks due to reference cycles
            if _last_memory_leak_check is None or (
                time() - _last_memory_leak_check > _MEMORY_LEAK_CHECK_DELAY
            ):
                gc.collect()
                _last_memory_leak_check = time()
539
540
class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be owned by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    """

    def __init__(self, executor):
        # Store references to necessary internals of the executor.

        # A _ThreadWakeup to allow waking up the executor_manager_thread from
        # the main Thread and avoid deadlocks caused by permanently
        # locked queues.
        self.thread_wakeup = executor._executor_manager_thread_wakeup
        self.shutdown_lock = executor._shutdown_lock

        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
        # to determine if the ProcessPoolExecutor has been garbage collected
        # and that the manager can exit.
        # When the executor gets garbage collected, the weakref callback
        # will wake up the queue management thread so that it can terminate
        # if there is no pending work item.
        # thread_wakeup and shutdown_lock are bound as default arguments so
        # that the callback does not need a reference to self.
        def weakref_cb(
            _,
            thread_wakeup=self.thread_wakeup,
            shutdown_lock=self.shutdown_lock,
        ):
            if mp is not None:
                # At this point, the multiprocessing module can already be
                # garbage collected. We only log debug info when still
                # possible.
                mp.util.debug(
                    "Executor collected: triggering callback for"
                    " QueueManager wakeup"
                )
            with shutdown_lock:
                thread_wakeup.wakeup()

        self.executor_reference = weakref.ref(executor, weakref_cb)

        # The flags of the executor
        self.executor_flags = executor._flags

        # A list of the ctx.Process instances used as workers.
        # NOTE(review): it is used below as a mapping (values(), pop(pid),
        # popitem()), presumably keyed by worker pid -- confirm.
        self.processes = executor._processes

        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue

        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
        self.result_queue = executor._result_queue

        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        self.work_ids_queue = executor._work_ids

        # A dict mapping work ids to _WorkItems e.g.
        #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        self.pending_work_items = executor._pending_work_items

        # A list of the work_ids that are currently running
        self.running_work_items = executor._running_work_items

        # A lock to avoid concurrent shutdown of workers on timeout and spawn
        # of new processes or shut down
        self.processes_management_lock = executor._processes_management_lock

        super().__init__(name="ExecutorManagerThread")
        # Before Python 3.9 the thread has to be a daemon thread.
        if sys.version_info < (3, 9):
            self.daemon = True

    def run(self):
        # Main loop for the executor manager thread.
        # Each iteration feeds the call queue, waits for an event, handles a
        # received result if any, then checks whether to shut down. The loop
        # ends when the executor is broken or when it is shutting down with
        # no pending work item left.

        while True:
            self.add_call_item_to_queue()

            result_item, is_broken, bpe = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(bpe)
                return
            if result_item is not None:
                self.process_result_item(result_item)
                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        # Fills call_queue with _WorkItems from pending_work_items.
        # This function never blocks.
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]

                # False means the future was cancelled: drop the work item
                # instead of sending it to the workers.
                if work_item.future.set_running_or_notify_cancel():
                    self.running_work_items += [work_id]
                    self.call_queue.put(
                        _CallItem(
                            work_id,
                            work_item.fn,
                            work_item.args,
                            work_item.kwargs,
                        ),
                        block=True,
                    )
                else:
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal send. The wake up signals come either from new tasks being
        # submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        #
        # Returns a (result_item, is_broken, bpe) tuple: the received item or
        # None, whether the executor must be considered broken, and the
        # exception describing the breakage (None when not broken).
        result_reader = self.result_queue._reader
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in list(self.processes.values())]
        ready = wait(readers + worker_sentinels)

        # Broken by default: only a successfully received result or a plain
        # wakeup clears the flag.
        bpe = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                # A _RemoteTraceback is what a worker sends when reading from
                # the call queue failed on its side.
                if isinstance(result_item, _RemoteTraceback):
                    bpe = BrokenProcessPool(
                        "A task has failed to un-serialize. Please ensure that"
                        " the arguments of the function are all picklable."
                    )
                    bpe.__cause__ = result_item
                else:
                    is_broken = False
            except BaseException as e:
                bpe = BrokenProcessPool(
                    "A result has failed to un-serialize. Please ensure that "
                    "the objects returned by the function are always "
                    "picklable."
                )
                tb = traceback.format_exception(
                    type(e), e, getattr(e, "__traceback__", None)
                )
                bpe.__cause__ = _RemoteTraceback("".join(tb))

        elif wakeup_reader in ready:
            # This is simply a wake-up event that might either trigger putting
            # more tasks in the queue or trigger the clean up of resources.
            is_broken = False
        else:
            # A worker has terminated and we don't know why, set the state of
            # the executor as broken
            exit_codes = ""
            if sys.platform != "win32":
                # In Windows, introspecting terminated workers exitcodes seems
                # unstable, therefore they are not appended in the exception
                # message.
                exit_codes = (
                    "\nThe exit codes of the workers are "
                    f"{get_exitcodes_terminated_worker(self.processes)}"
                )
            mp.util.debug(
                "A worker unexpectedly terminated. Workers that "
                "might have caused the breakage: "
                + str(
                    {
                        p.name: p.exitcode
                        for p in list(self.processes.values())
                        if p is not None and p.sentinel in ready
                    }
                )
            )
            bpe = TerminatedWorkerError(
                "A worker process managed by the executor was unexpectedly "
                "terminated. This could be caused by a segmentation fault "
                "while calling the function or by an excessive memory usage "
                "causing the Operating System to kill the worker.\n"
                f"{exit_codes}\n"
                "Detailed tracebacks of the workers should have been printed "
                "to stderr in the executor process if faulthandler was not "
                "disabled."
            )

        # Drain the pending wakeup messages in every case.
        self.thread_wakeup.clear()

        return result_item, is_broken, bpe

    def process_result_item(self, result_item):
        # Process the received result_item. This can be either the PID of a
        # worker that exited gracefully or a _ResultItem

        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID, either on request
            # by the executor.shutdown method or by the timeout of the worker
            # itself: we should not mark the executor as broken.
            with self.processes_management_lock:
                p = self.processes.pop(result_item, None)

            # p can be None if the executor is concurrently shutting down.
            if p is not None:
                # Release the exit lock the worker waits on, then join it.
                p._worker_exit_lock.release()
                mp.util.debug(
                    f"joining {p.name} when processing {p.pid} as result_item"
                )
                p.join()
                del p

            # Make sure the executor have the right number of worker, even if a
            # worker timeout while some jobs were submitted. If some work is
            # pending or there is less processes than running items, we need to
            # start a new Process and raise a warning.
            n_pending = len(self.pending_work_items)
            n_running = len(self.running_work_items)
            if n_pending - n_running > 0 or n_running > len(self.processes):
                executor = self.executor_reference()
                if (
                    executor is not None
                    and len(self.processes) < executor._max_workers
                ):
                    warnings.warn(
                        "A worker stopped while some jobs were given to the "
                        "executor. This can be caused by a too short worker "
                        "timeout or by a memory leak.",
                        UserWarning,
                    )
                    with executor._processes_management_lock:
                        executor._adjust_process_count()
                    # Drop the strong reference to the executor.
                    executor = None
        else:
            # Received a _ResultItem so mark the future as completed.
            work_item = self.pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                self.running_work_items.remove(result_item.work_id)

    def is_shutting_down(self):
        # Check whether we should start shutting down the executor.
        executor = self.executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this thread is not broken AND
        #        * The executor that owns this worker has been collected OR
        #        * The executor that owns this worker has been shutdown.
        # If the executor is broken, it should be detected in the next loop.
        return _global_shutdown or (
            (executor is None or self.executor_flags.shutdown)
            and not self.executor_flags.broken
        )

    def terminate_broken(self, bpe):
        # Terminate the executor because it is in a broken state. The bpe
        # argument can be used to display more information on the error that
        # lead the executor into becoming broken.

        # Mark the process pool broken so that submits fail right now.
        self.executor_flags.flag_as_broken(bpe)

        # Mark pending tasks as failed.
        for work_item in self.pending_work_items.values():
            work_item.future.set_exception(bpe)
            # Delete references to object. See issue16284
            del work_item
        self.pending_work_items.clear()

        # Terminate remaining workers forcibly: the queues or their
        # locks may be in a dirty state and block forever.
        self.kill_workers(reason="broken executor")

        # clean up resources
        self.join_executor_internals()

    def flag_executor_shutting_down(self):
        # Flag the executor as shutting down and cancel remaining tasks if
        # requested as early as possible if it is not gc-ed yet.
        self.executor_flags.flag_as_shutting_down()

        # Cancel pending work items if requested.
        if self.executor_flags.kill_workers:
            while self.pending_work_items:
                _, work_item = self.pending_work_items.popitem()
                work_item.future.set_exception(
                    ShutdownExecutorError(
                        "The Executor was shutdown with `kill_workers=True` "
                        "before this job could complete."
                    )
                )
                del work_item

            # Kill the remaining workers forcibly to not waste time joining
            # them
            self.kill_workers(reason="executor shutting down")

    def kill_workers(self, reason=""):
        # Terminate the remaining workers using SIGKILL. This function also
        # terminates descendant workers of the children in case there is some
        # nested parallelism.
        # self.processes is emptied as the workers are killed.
        while self.processes:
            _, p = self.processes.popitem()
            mp.util.debug(f"terminate process {p.name}, reason: {reason}")
            try:
                kill_process_tree(p)
            except ProcessLookupError:  # pragma: no cover
                pass

    def shutdown_workers(self):
        # shutdown all workers in self.processes

        # Create a list to avoid RuntimeError due to concurrent modification of
        # processes. nb_children_alive is thus an upper bound. Also release the
        # processes' _worker_exit_lock to accelerate the shutdown procedure, as
        # there is no need for hand-shake here.
        with self.processes_management_lock:
            n_children_to_stop = 0
            for p in list(self.processes.values()):
                mp.util.debug(f"releasing worker exit lock on {p.name}")
                p._worker_exit_lock.release()
                n_children_to_stop += 1

        mp.util.debug(f"found {n_children_to_stop} processes to stop")

        # Send the right number of sentinels, to make sure all children are
        # properly terminated. Do it with a mechanism that avoid hanging on
        # Full queue when all workers have already been shutdown.
        n_sentinels_sent = 0
        cooldown_time = 0.001
        while (
            n_sentinels_sent < n_children_to_stop
            and self.get_n_children_alive() > 0
        ):
            for _ in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full as e:
                    # Back off with a growing delay, and give up once the
                    # delay exceeds 5 seconds.
                    if cooldown_time > 5.0:
                        mp.util.info(
                            "failed to send all sentinels and exit with error."
                            f"\ncall_queue size={self.call_queue._maxsize}; "
                            f" full is {self.call_queue.full()}; "
                        )
                        raise e
                    mp.util.info(
                        "full call_queue prevented to send all sentinels at "
                        "once, waiting..."
                    )
                    sleep(cooldown_time)
                    cooldown_time *= 1.2
                    # Leave the for loop so that the while condition is
                    # evaluated again before retrying.
                    break

        mp.util.debug(f"sent {n_sentinels_sent} sentinels to the call queue")

    def join_executor_internals(self):
        # Stop the workers, close the queues and the wakeup pipe, then join
        # the worker processes still registered in self.processes.
        self.shutdown_workers()

        # Release the queue's resources as soon as possible. Flag the feeder
        # thread for clean exit to avoid having the crash detection thread flag
        # the Executor as broken during the shutdown. This is safe as either:
        #  * We don't need to communicate with the workers anymore
        #  * There is nothing left in the Queue buffer except None sentinels
        mp.util.debug("closing call_queue")
        self.call_queue.close()
        self.call_queue.join_thread()

        # Closing result_queue
        mp.util.debug("closing result_queue")
        self.result_queue.close()

        mp.util.debug("closing thread_wakeup")
        with self.shutdown_lock:
            self.thread_wakeup.close()

        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on macOS.
        with self.processes_management_lock:
            mp.util.debug(f"joining {len(self.processes)} processes")
            n_joined_processes = 0
            while True:
                try:
                    pid, p = self.processes.popitem()
                    mp.util.debug(f"joining process {p.name} with pid {pid}")
                    p.join()
                    n_joined_processes += 1
                except KeyError:
                    # popitem() raises KeyError once no process is left.
                    break

            mp.util.debug(
                "executor management thread clean shutdown of "
                f"{n_joined_processes} workers"
            )

    def get_n_children_alive(self):
        # This is an upper bound on the number of children alive.
        with self.processes_management_lock:
            return sum(p.is_alive() for p in list(self.processes.values()))
960
961
# Cache used by _check_system_limits():
# * _system_limited holds the error message once the system has been found
#   to provide too few semaphores, and stays None otherwise;
# * _system_limits_checked records that the check has already been run.
_system_limited = None
_system_limits_checked = False
964
965
def _check_system_limits():
    """Raise NotImplementedError if the system provides too few semaphores.

    The verdict of the first completed check is cached in the module-level
    ``_system_limits_checked`` / ``_system_limited`` globals: later calls
    replay it without querying ``os.sysconf`` again.

    Raises:
        NotImplementedError: if ``SC_SEM_NSEMS_MAX`` is known and lower than
            256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        # Replay the cached verdict instead of querying the system again.
        if _system_limited:
            raise NotImplementedError(_system_limited)
        return
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        nsems_max = None
    # Only cache the verdict once the query itself has completed, so that an
    # unexpected error raised by sysconf is retried on the next call.
    _system_limits_checked = True
    if nsems_max is None:
        return
    if nsems_max == -1:
        # undetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = (
        f"system provides too few semaphores ({nsems_max} available, "
        "256 necessary)"
    )
    raise NotImplementedError(_system_limited)
989
990
def _chain_from_iterable_of_lists(iterable):
    """Yield the items of each list of *iterable*, one list after another.

    This is a specialized itertools.chain.from_iterable: every element of
    *iterable* should be a list. Each list is emptied in place while its
    items are handed out, so that this generator does not keep references
    to yielded objects.
    """
    for chunk in iterable:
        # Reversing first lets pop() hand the items out in their original
        # order.
        chunk.reverse()
        take_next = chunk.pop
        while chunk:
            yield take_next()
1001
1002
def _check_max_depth(context):
    """Refuse to create an executor nested deeper than allowed.

    Args:
        context: object whose ``get_start_method()`` returns the name of the
            start method used to launch the workers.

    Raises:
        LokyRecursionError: if the start method is "fork" and the current
            depth is already positive, or if MAX_DEPTH is positive and one
            more nesting level would exceed it.
    """
    # Limit the maximal recursion level
    uses_fork = context.get_start_method() == "fork"
    if uses_fork and _CURRENT_DEPTH > 0:
        raise LokyRecursionError(
            "Could not spawn extra nested processes at depth superior to "
            "MAX_DEPTH=1. It is not possible to increase this limit when "
            "using the 'fork' start method."
        )

    # A MAX_DEPTH that is not positive disables this second check.
    if MAX_DEPTH > 0 and MAX_DEPTH < _CURRENT_DEPTH + 1:
        raise LokyRecursionError(
            "Could not spawn extra nested processes at depth superior to "
            f"MAX_DEPTH={MAX_DEPTH}. If this is intendend, you can change "
            "this limit with the LOKY_MAX_DEPTH environment variable."
        )
1019
1020
class LokyRecursionError(RuntimeError):
    """Raised when a process tries to spawn too many levels of nested
    processes.
    """
1023
1024
class BrokenProcessPool(_BPPException):
    """
    Raised when the executor is broken while a future was in the running state.
    The cause can be an error raised when unpickling the task in the worker
    process or when unpickling the result value in the parent process. It can
    also be caused by a worker process being terminated unexpectedly.
    """
1032
1033
class TerminatedWorkerError(BrokenProcessPool):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.

    As a subclass of BrokenProcessPool, it is also caught by handlers
    written for BrokenProcessPool.
    """
1039
1040
# Alias kept for backward compatibility (for code written for loky 1.1.4 and
# earlier). Do not use it in new code: use BrokenProcessPool instead.
BrokenExecutor = BrokenProcessPool
1044
1045
class ShutdownExecutorError(RuntimeError):
    """
    Raised when a ProcessPoolExecutor is shutdown while a future was in the
    running or pending state.

    It is also the error raised by ``submit`` when it is called on an
    executor that has already been shut down.
    """
1051
1052
1053class ProcessPoolExecutor(Executor):
1054
    # NOTE(review): not referenced by any method visible in this part of the
    # class; presumably a leftover -- confirm before removing.
    _at_exit = None
1056
1057 def __init__(
1058 self,
1059 max_workers=None,
1060 job_reducers=None,
1061 result_reducers=None,
1062 timeout=None,
1063 context=None,
1064 initializer=None,
1065 initargs=(),
1066 env=None,
1067 ):
1068 """Initializes a new ProcessPoolExecutor instance.
1069
1070 Args:
1071 max_workers: int, optional (default: cpu_count())
1072 The maximum number of processes that can be used to execute the
1073 given calls. If None or not given then as many worker processes
1074 will be created as the number of CPUs the current process
1075 can use.
1076 job_reducers, result_reducers: dict(type: reducer_func)
1077 Custom reducer for pickling the jobs and the results from the
1078 Executor. If only `job_reducers` is provided, `result_reducer`
1079 will use the same reducers
1080 timeout: int, optional (default: None)
1081 Idle workers exit after timeout seconds. If a new job is
1082 submitted after the timeout, the executor will start enough
1083 new Python processes to make sure the pool of workers is full.
1084 context: A multiprocessing context to launch the workers. This
1085 object should provide SimpleQueue, Queue and Process.
1086 initializer: An callable used to initialize worker processes.
1087 initargs: A tuple of arguments to pass to the initializer.
1088 env: A dict of environment variable to overwrite in the child
1089 process. The environment variables are set before any module is
1090 loaded. Note that this only works with the loky context.
1091 """
1092 _check_system_limits()
1093
1094 if max_workers is None:
1095 self._max_workers = cpu_count()
1096 else:
1097 if max_workers <= 0:
1098 raise ValueError("max_workers must be greater than 0")
1099 self._max_workers = max_workers
1100
1101 if (
1102 sys.platform == "win32"
1103 and self._max_workers > _MAX_WINDOWS_WORKERS
1104 ):
1105 warnings.warn(
1106 f"On Windows, max_workers cannot exceed {_MAX_WINDOWS_WORKERS} "
1107 "due to limitations of the operating system."
1108 )
1109 self._max_workers = _MAX_WINDOWS_WORKERS
1110
1111 if context is None:
1112 context = get_context()
1113 self._context = context
1114 self._env = env
1115
1116 self._initializer, self._initargs = _prepare_initializer(
1117 initializer, initargs
1118 )
1119 _check_max_depth(self._context)
1120
1121 if result_reducers is None:
1122 result_reducers = job_reducers
1123
1124 # Timeout
1125 self._timeout = timeout
1126
1127 # Management thread
1128 self._executor_manager_thread = None
1129
1130 # Map of pids to processes
1131 self._processes = {}
1132
1133 # Internal variables of the ProcessPoolExecutor
1134 self._processes = {}
1135 self._queue_count = 0
1136 self._pending_work_items = {}
1137 self._running_work_items = []
1138 self._work_ids = queue.Queue()
1139 self._processes_management_lock = self._context.Lock()
1140 self._executor_manager_thread = None
1141 self._shutdown_lock = threading.Lock()
1142
1143 # _ThreadWakeup is a communication channel used to interrupt the wait
1144 # of the main loop of executor_manager_thread from another thread (e.g.
1145 # when calling executor.submit or executor.shutdown). We do not use the
1146 # _result_queue to send wakeup signals to the executor_manager_thread
1147 # as it could result in a deadlock if a worker process dies with the
1148 # _result_queue write lock still acquired.
1149 #
1150 # _shutdown_lock must be locked to access _ThreadWakeup.wakeup.
1151 self._executor_manager_thread_wakeup = _ThreadWakeup()
1152
1153 # Flag to hold the state of the Executor. This permits to introspect
1154 # the Executor state even once it has been garbage collected.
1155 self._flags = _ExecutorFlags(self._shutdown_lock)
1156
1157 # Finally setup the queues for interprocess communication
1158 self._setup_queues(job_reducers, result_reducers)
1159
1160 mp.util.debug("ProcessPoolExecutor is setup")
1161
1162 def _setup_queues(self, job_reducers, result_reducers, queue_size=None):
1163 # Make the call queue slightly larger than the number of processes to
1164 # prevent the worker processes from idling. But don't make it too big
1165 # because futures in the call queue cannot be cancelled.
1166 if queue_size is None:
1167 queue_size = 2 * self._max_workers + EXTRA_QUEUED_CALLS
1168 self._call_queue = _SafeQueue(
1169 max_size=queue_size,
1170 pending_work_items=self._pending_work_items,
1171 running_work_items=self._running_work_items,
1172 thread_wakeup=self._executor_manager_thread_wakeup,
1173 shutdown_lock=self._shutdown_lock,
1174 reducers=job_reducers,
1175 ctx=self._context,
1176 )
1177 # Killed worker processes can produce spurious "broken pipe"
1178 # tracebacks in the queue's own worker thread. But we detect killed
1179 # processes anyway, so silence the tracebacks.
1180 self._call_queue._ignore_epipe = True
1181
1182 self._result_queue = SimpleQueue(
1183 reducers=result_reducers, ctx=self._context
1184 )
1185
1186 def _start_executor_manager_thread(self):
1187 if self._executor_manager_thread is None:
1188 mp.util.debug("_start_executor_manager_thread called")
1189
1190 # Start the processes so that their sentinels are known.
1191 self._executor_manager_thread = _ExecutorManagerThread(self)
1192 self._executor_manager_thread.start()
1193
1194 # register this executor in a mechanism that ensures it will wakeup
1195 # when the interpreter is exiting.
1196 _threads_wakeups[self._executor_manager_thread] = (
1197 self._shutdown_lock,
1198 self._executor_manager_thread_wakeup,
1199 )
1200
1201 global process_pool_executor_at_exit
1202 if process_pool_executor_at_exit is None:
1203 # Ensure that the _python_exit function will be called before
1204 # the multiprocessing.Queue._close finalizers which have an
1205 # exitpriority of 10.
1206
1207 if sys.version_info < (3, 9):
1208 process_pool_executor_at_exit = mp.util.Finalize(
1209 None, _python_exit, exitpriority=20
1210 )
1211 else:
1212 process_pool_executor_at_exit = threading._register_atexit(
1213 _python_exit
1214 )
1215
1216 def _adjust_process_count(self):
1217 while len(self._processes) < self._max_workers:
1218 worker_exit_lock = self._context.BoundedSemaphore(1)
1219 args = (
1220 self._call_queue,
1221 self._result_queue,
1222 self._initializer,
1223 self._initargs,
1224 self._processes_management_lock,
1225 self._timeout,
1226 worker_exit_lock,
1227 _CURRENT_DEPTH + 1,
1228 )
1229 worker_exit_lock.acquire()
1230 try:
1231 # Try to spawn the process with some environment variable to
1232 # overwrite but it only works with the loky context for now.
1233 p = self._context.Process(
1234 target=_process_worker, args=args, env=self._env
1235 )
1236 except TypeError:
1237 p = self._context.Process(target=_process_worker, args=args)
1238 p._worker_exit_lock = worker_exit_lock
1239 p.start()
1240 self._processes[p.pid] = p
1241 mp.util.debug(
1242 f"Adjusted process count to {self._max_workers}: "
1243 f"{[(p.name, pid) for pid, p in self._processes.items()]}"
1244 )
1245
1246 def _ensure_executor_running(self):
1247 """ensures all workers and management thread are running"""
1248 with self._processes_management_lock:
1249 if len(self._processes) != self._max_workers:
1250 self._adjust_process_count()
1251 self._start_executor_manager_thread()
1252
1253 def submit(self, fn, *args, **kwargs):
1254 with self._flags.shutdown_lock:
1255 if self._flags.broken is not None:
1256 raise self._flags.broken
1257 if self._flags.shutdown:
1258 raise ShutdownExecutorError(
1259 "cannot schedule new futures after shutdown"
1260 )
1261
1262 # Cannot submit a new calls once the interpreter is shutting down.
1263 # This check avoids spawning new processes at exit.
1264 if _global_shutdown:
1265 raise RuntimeError(
1266 "cannot schedule new futures after interpreter shutdown"
1267 )
1268
1269 f = Future()
1270 w = _WorkItem(f, fn, args, kwargs)
1271
1272 self._pending_work_items[self._queue_count] = w
1273 self._work_ids.put(self._queue_count)
1274 self._queue_count += 1
1275 # Wake up queue management thread
1276 self._executor_manager_thread_wakeup.wakeup()
1277
1278 self._ensure_executor_running()
1279 return f
1280
    # Expose the documentation of Executor.submit on this override.
    submit.__doc__ = Executor.submit.__doc__
1282
1283 def map(self, fn, *iterables, **kwargs):
1284 """Returns an iterator equivalent to map(fn, iter).
1285
1286 Args:
1287 fn: A callable that will take as many arguments as there are
1288 passed iterables.
1289 timeout: The maximum number of seconds to wait. If None, then there
1290 is no limit on the wait time.
1291 chunksize: If greater than one, the iterables will be chopped into
1292 chunks of size chunksize and submitted to the process pool.
1293 If set to one, the items in the list will be sent one at a
1294 time.
1295
1296 Returns:
1297 An iterator equivalent to: map(func, *iterables) but the calls may
1298 be evaluated out-of-order.
1299
1300 Raises:
1301 TimeoutError: If the entire result iterator could not be generated
1302 before the given timeout.
1303 Exception: If fn(*args) raises for any values.
1304 """
1305 timeout = kwargs.get("timeout", None)
1306 chunksize = kwargs.get("chunksize", 1)
1307 if chunksize < 1:
1308 raise ValueError("chunksize must be >= 1.")
1309
1310 results = super().map(
1311 partial(_process_chunk, fn),
1312 _get_chunks(chunksize, *iterables),
1313 timeout=timeout,
1314 )
1315 return _chain_from_iterable_of_lists(results)
1316
1317 def shutdown(self, wait=True, kill_workers=False):
1318 mp.util.debug(f"shutting down executor {self}")
1319
1320 self._flags.flag_as_shutting_down(kill_workers)
1321 executor_manager_thread = self._executor_manager_thread
1322 executor_manager_thread_wakeup = self._executor_manager_thread_wakeup
1323
1324 if executor_manager_thread_wakeup is not None:
1325 # Wake up queue management thread
1326 with self._shutdown_lock:
1327 self._executor_manager_thread_wakeup.wakeup()
1328
1329 if executor_manager_thread is not None and wait:
1330 # This locks avoids concurrent join if the interpreter
1331 # is shutting down.
1332 with _global_shutdown_lock:
1333 executor_manager_thread.join()
1334 _threads_wakeups.pop(executor_manager_thread, None)
1335
1336 # To reduce the risk of opening too many files, remove references to
1337 # objects that use file descriptors.
1338 self._executor_manager_thread = None
1339 self._executor_manager_thread_wakeup = None
1340 self._call_queue = None
1341 self._result_queue = None
1342 self._processes_management_lock = None
1343
    # Expose the documentation of Executor.shutdown on this override. Note
    # that the extra kill_workers parameter is therefore not described there.
    shutdown.__doc__ = Executor.shutdown.__doc__