1###############################################################################
2# Re-implementation of the ProcessPoolExecutor more robust to faults
3#
4# author: Thomas Moreau and Olivier Grisel
5#
6# adapted from concurrent/futures/process_pool_executor.py (17/02/2017)
7# * Add an extra management thread to detect executor_manager_thread failures,
8# * Improve the shutdown process to avoid deadlocks,
9# * Add timeout for workers,
10# * More robust pickling process.
11#
12# Copyright 2009 Brian Quinlan. All Rights Reserved.
13# Licensed to PSF under a Contributor Agreement.
14
15"""Implements ProcessPoolExecutor.
16
17The follow diagram and text describe the data-flow through the system:
18
19|======================= In-process =====================|== Out-of-process ==|
20
21+----------+ +----------+ +--------+ +-----------+ +---------+
22| | => | Work Ids | | | | Call Q | | Process |
23| | +----------+ | | +-----------+ | Pool |
24| | | ... | | | | ... | +---------+
25| | | 6 | => | | => | 5, call() | => | |
26| | | 7 | | | | ... | | |
27| Process | | ... | | Local | +-----------+ | Process |
28| Pool | +----------+ | Worker | | #1..n |
29| Executor | | Thread | | |
30| | +----------- + | | +-----------+ | |
31| | <=> | Work Items | <=> | | <= | Result Q | <= | |
32| | +------------+ | | +-----------+ | |
33| | | 6: call() | | | | ... | | |
34| | | future | +--------+ | 4, result | | |
35| | | ... | | 3, except | | |
36+----------+ +------------+ +-----------+ +---------+
37
38Executor.submit() called:
39- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
40- adds the id of the _WorkItem to the "Work Ids" queue
41
42Local worker thread:
43- reads work ids from the "Work Ids" queue and looks up the corresponding
44 WorkItem from the "Work Items" dict: if the work item has been cancelled then
45 it is simply removed from the dict, otherwise it is repackaged as a
46 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
47 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
48 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
49- reads _ResultItems from "Result Q", updates the future stored in the
50 "Work Items" dict and deletes the dict entry
51
52Process #1..n:
53- reads _CallItems from "Call Q", executes the calls, and puts the resulting
54 _ResultItems in "Result Q"
55"""
56
57
58__author__ = "Thomas Moreau (thomas.moreau.2010@gmail.com)"
59
60
61import os
62import gc
63import sys
64import queue
65import struct
66import weakref
67import warnings
68import itertools
69import traceback
70import threading
71from time import time, sleep
72import multiprocessing as mp
73from functools import partial
74from pickle import PicklingError
75from concurrent.futures import Executor
76from concurrent.futures._base import LOGGER
77from concurrent.futures.process import BrokenProcessPool as _BPPException
78from multiprocessing.connection import wait
79
80from ._base import Future
81from .backend import get_context
82from .backend.context import cpu_count, _MAX_WINDOWS_WORKERS
83from .backend.queues import Queue, SimpleQueue
84from .backend.reduction import set_loky_pickler, get_loky_pickler_name
85from .backend.utils import kill_process_tree, get_exitcodes_terminated_worker
86from .initializers import _prepare_initializer
87
88
89# Mechanism to prevent infinite process spawning. When a worker of a
90# ProcessPoolExecutor nested in MAX_DEPTH Executor tries to create a new
91# Executor, a LokyRecursionError is raised
92MAX_DEPTH = int(os.environ.get("LOKY_MAX_DEPTH", 10))
93_CURRENT_DEPTH = 0
94
95# Minimum time interval between two consecutive memory leak protection checks.
96_MEMORY_LEAK_CHECK_DELAY = 1.0
97
98# Number of bytes of memory usage allowed over the reference process size.
99_MAX_MEMORY_LEAK_SIZE = int(3e8)
100
101
102try:
103 from psutil import Process
104
105 _USE_PSUTIL = True
106
107 def _get_memory_usage(pid, force_gc=False):
108 if force_gc:
109 gc.collect()
110
111 mem_size = Process(pid).memory_info().rss
112 mp.util.debug(f"psutil return memory size: {mem_size}")
113 return mem_size
114
115except ImportError:
116 _USE_PSUTIL = False
117
118
119class _ThreadWakeup:
120 def __init__(self):
121 self._closed = False
122 self._reader, self._writer = mp.Pipe(duplex=False)
123
124 def close(self):
125 if not self._closed:
126 self._closed = True
127 self._writer.close()
128 self._reader.close()
129
130 def wakeup(self):
131 if not self._closed:
132 self._writer.send_bytes(b"")
133
134 def clear(self):
135 if not self._closed:
136 while self._reader.poll():
137 self._reader.recv_bytes()
138
139
140class _ExecutorFlags:
141 """necessary references to maintain executor states without preventing gc
142
143 It permits to keep the information needed by executor_manager_thread
144 and crash_detection_thread to maintain the pool without preventing the
145 garbage collection of unreferenced executors.
146 """
147
148 def __init__(self, shutdown_lock):
149
150 self.shutdown = False
151 self.broken = None
152 self.kill_workers = False
153 self.shutdown_lock = shutdown_lock
154
155 def flag_as_shutting_down(self, kill_workers=None):
156 with self.shutdown_lock:
157 self.shutdown = True
158 if kill_workers is not None:
159 self.kill_workers = kill_workers
160
161 def flag_as_broken(self, broken):
162 with self.shutdown_lock:
163 self.shutdown = True
164 self.broken = broken
165
166
167# Prior to 3.9, executor_manager_thread is created as daemon thread. This means
168# that it is not joined automatically when the interpreter is shutting down.
169# To work around this problem, an exit handler is installed to tell the
170# thread to exit when the interpreter is shutting down and then waits until
171# it finishes. The thread needs to be daemonized because the atexit hooks are
172# called after all non daemonized threads are joined.
173#
174# Starting 3.9, there exists a specific atexit hook to be called before joining
175# the threads so the executor_manager_thread does not need to be daemonized
176# anymore.
177#
178# The atexit hooks are registered when starting the first ProcessPoolExecutor
179# to avoid import having an effect on the interpreter.
180
181_global_shutdown = False
182_global_shutdown_lock = threading.Lock()
183_threads_wakeups = weakref.WeakKeyDictionary()
184
185
186def _python_exit():
187 global _global_shutdown
188 _global_shutdown = True
189
190 # Materialize the list of items to avoid error due to iterating over
191 # changing size dictionary.
192 items = list(_threads_wakeups.items())
193 if len(items) > 0:
194 mp.util.debug(
195 "Interpreter shutting down. Waking up {len(items)}"
196 f"executor_manager_thread:\n{items}"
197 )
198
199 # Wake up the executor_manager_thread's so they can detect the interpreter
200 # is shutting down and exit.
201 for _, (shutdown_lock, thread_wakeup) in items:
202 with shutdown_lock:
203 thread_wakeup.wakeup()
204
205 # Collect the executor_manager_thread's to make sure we exit cleanly.
206 for thread, _ in items:
207 # This locks is to prevent situations where an executor is gc'ed in one
208 # thread while the atexit finalizer is running in another thread. This
209 # can happen when joblib is used in pypy for instance.
210 with _global_shutdown_lock:
211 thread.join()
212
213
214# With the fork context, _thread_wakeups is propagated to children.
215# Clear it after fork to avoid some situation that can cause some
216# freeze when joining the workers.
217mp.util.register_after_fork(_threads_wakeups, lambda obj: obj.clear())
218
219
220# Module variable to register the at_exit call
221process_pool_executor_at_exit = None
222
223# Controls how many more calls than processes will be queued in the call queue.
224# A smaller number will mean that processes spend more time idle waiting for
225# work while a larger number will make Future.cancel() succeed less frequently
226# (Futures in the call queue cannot be cancelled).
227EXTRA_QUEUED_CALLS = 1
228
229
230class _RemoteTraceback(Exception):
231 """Embed stringification of remote traceback in local traceback"""
232
233 def __init__(self, tb=None):
234 self.tb = f'\n"""\n{tb}"""'
235
236 def __str__(self):
237 return self.tb
238
239
240# Do not inherit from BaseException to mirror
241# concurrent.futures.process._ExceptionWithTraceback
242class _ExceptionWithTraceback:
243 def __init__(self, exc):
244 tb = getattr(exc, "__traceback__", None)
245 if tb is None:
246 _, _, tb = sys.exc_info()
247 tb = traceback.format_exception(type(exc), exc, tb)
248 tb = "".join(tb)
249 self.exc = exc
250 self.tb = tb
251
252 def __reduce__(self):
253 return _rebuild_exc, (self.exc, self.tb)
254
255
256def _rebuild_exc(exc, tb):
257 exc.__cause__ = _RemoteTraceback(tb)
258 return exc
259
260
261class _WorkItem:
262
263 __slots__ = ["future", "fn", "args", "kwargs"]
264
265 def __init__(self, future, fn, args, kwargs):
266 self.future = future
267 self.fn = fn
268 self.args = args
269 self.kwargs = kwargs
270
271
272class _ResultItem:
273 def __init__(self, work_id, exception=None, result=None):
274 self.work_id = work_id
275 self.exception = exception
276 self.result = result
277
278
279class _CallItem:
280 def __init__(self, work_id, fn, args, kwargs):
281 self.work_id = work_id
282 self.fn = fn
283 self.args = args
284 self.kwargs = kwargs
285
286 # Store the current loky_pickler so it is correctly set in the worker
287 self.loky_pickler = get_loky_pickler_name()
288
289 def __call__(self):
290 set_loky_pickler(self.loky_pickler)
291 return self.fn(*self.args, **self.kwargs)
292
293 def __repr__(self):
294 return (
295 f"CallItem({self.work_id}, {self.fn}, {self.args}, {self.kwargs})"
296 )
297
298
299class _SafeQueue(Queue):
300 """Safe Queue set exception to the future object linked to a job"""
301
302 def __init__(
303 self,
304 max_size=0,
305 ctx=None,
306 pending_work_items=None,
307 running_work_items=None,
308 thread_wakeup=None,
309 reducers=None,
310 ):
311 self.thread_wakeup = thread_wakeup
312 self.pending_work_items = pending_work_items
313 self.running_work_items = running_work_items
314 super().__init__(max_size, reducers=reducers, ctx=ctx)
315
316 def _on_queue_feeder_error(self, e, obj):
317 if isinstance(obj, _CallItem):
318 # format traceback only works on python3
319 if isinstance(e, struct.error):
320 raised_error = RuntimeError(
321 "The task could not be sent to the workers as it is too "
322 "large for `send_bytes`."
323 )
324 else:
325 raised_error = PicklingError(
326 "Could not pickle the task to send it to the workers."
327 )
328 tb = traceback.format_exception(
329 type(e), e, getattr(e, "__traceback__", None)
330 )
331 raised_error.__cause__ = _RemoteTraceback("".join(tb))
332 work_item = self.pending_work_items.pop(obj.work_id, None)
333 self.running_work_items.remove(obj.work_id)
334 # work_item can be None if another process terminated. In this
335 # case, the executor_manager_thread fails all work_items with
336 # BrokenProcessPool
337 if work_item is not None:
338 work_item.future.set_exception(raised_error)
339 del work_item
340 self.thread_wakeup.wakeup()
341 else:
342 super()._on_queue_feeder_error(e, obj)
343
344
345def _get_chunks(chunksize, *iterables):
346 """Iterates over zip()ed iterables in chunks."""
347 it = zip(*iterables)
348 while True:
349 chunk = tuple(itertools.islice(it, chunksize))
350 if not chunk:
351 return
352 yield chunk
353
354
355def _process_chunk(fn, chunk):
356 """Processes a chunk of an iterable passed to map.
357
358 Runs the function passed to map() on a chunk of the
359 iterable passed to map.
360
361 This function is run in a separate process.
362
363 """
364 return [fn(*args) for args in chunk]
365
366
367def _sendback_result(result_queue, work_id, result=None, exception=None):
368 """Safely send back the given result or exception"""
369 try:
370 result_queue.put(
371 _ResultItem(work_id, result=result, exception=exception)
372 )
373 except BaseException as e:
374 exc = _ExceptionWithTraceback(e)
375 result_queue.put(_ResultItem(work_id, exception=exc))
376
377
378def _process_worker(
379 call_queue,
380 result_queue,
381 initializer,
382 initargs,
383 processes_management_lock,
384 timeout,
385 worker_exit_lock,
386 current_depth,
387):
388 """Evaluates calls from call_queue and places the results in result_queue.
389
390 This worker is run in a separate process.
391
392 Args:
393 call_queue: A ctx.Queue of _CallItems that will be read and
394 evaluated by the worker.
395 result_queue: A ctx.Queue of _ResultItems that will written
396 to by the worker.
397 initializer: A callable initializer, or None
398 initargs: A tuple of args for the initializer
399 processes_management_lock: A ctx.Lock avoiding worker timeout while
400 some workers are being spawned.
401 timeout: maximum time to wait for a new item in the call_queue. If that
402 time is expired, the worker will shutdown.
403 worker_exit_lock: Lock to avoid flagging the executor as broken on
404 workers timeout.
405 current_depth: Nested parallelism level, to avoid infinite spawning.
406 """
407 if initializer is not None:
408 try:
409 initializer(*initargs)
410 except BaseException:
411 LOGGER.critical("Exception in initializer:", exc_info=True)
412 # The parent will notice that the process stopped and
413 # mark the pool broken
414 return
415
416 # set the global _CURRENT_DEPTH mechanism to limit recursive call
417 global _CURRENT_DEPTH
418 _CURRENT_DEPTH = current_depth
419 _process_reference_size = None
420 _last_memory_leak_check = None
421 pid = os.getpid()
422
423 mp.util.debug(f"Worker started with timeout={timeout}")
424 while True:
425 try:
426 call_item = call_queue.get(block=True, timeout=timeout)
427 if call_item is None:
428 mp.util.info("Shutting down worker on sentinel")
429 except queue.Empty:
430 mp.util.info(f"Shutting down worker after timeout {timeout:0.3f}s")
431 if processes_management_lock.acquire(block=False):
432 processes_management_lock.release()
433 call_item = None
434 else:
435 mp.util.info("Could not acquire processes_management_lock")
436 continue
437 except BaseException:
438 previous_tb = traceback.format_exc()
439 try:
440 result_queue.put(_RemoteTraceback(previous_tb))
441 except BaseException:
442 # If we cannot format correctly the exception, at least print
443 # the traceback.
444 print(previous_tb)
445 mp.util.debug("Exiting with code 1")
446 sys.exit(1)
447 if call_item is None:
448 # Notify queue management thread about worker shutdown
449 result_queue.put(pid)
450 is_clean = worker_exit_lock.acquire(True, timeout=30)
451
452 # Early notify any loky executor running in this worker process
453 # (nested parallelism) that this process is about to shutdown to
454 # avoid a deadlock waiting undifinitely for the worker to finish.
455 _python_exit()
456
457 if is_clean:
458 mp.util.debug("Exited cleanly")
459 else:
460 mp.util.info("Main process did not release worker_exit")
461 return
462 try:
463 r = call_item()
464 except BaseException as e:
465 exc = _ExceptionWithTraceback(e)
466 result_queue.put(_ResultItem(call_item.work_id, exception=exc))
467 else:
468 _sendback_result(result_queue, call_item.work_id, result=r)
469 del r
470
471 # Free the resource as soon as possible, to avoid holding onto
472 # open files or shared memory that is not needed anymore
473 del call_item
474
475 if _USE_PSUTIL:
476 if _process_reference_size is None:
477 # Make reference measurement after the first call
478 _process_reference_size = _get_memory_usage(pid, force_gc=True)
479 _last_memory_leak_check = time()
480 continue
481 if time() - _last_memory_leak_check > _MEMORY_LEAK_CHECK_DELAY:
482 mem_usage = _get_memory_usage(pid)
483 _last_memory_leak_check = time()
484 if mem_usage - _process_reference_size < _MAX_MEMORY_LEAK_SIZE:
485 # Memory usage stays within bounds: everything is fine.
486 continue
487
488 # Check again memory usage; this time take the measurement
489 # after a forced garbage collection to break any reference
490 # cycles.
491 mem_usage = _get_memory_usage(pid, force_gc=True)
492 _last_memory_leak_check = time()
493 if mem_usage - _process_reference_size < _MAX_MEMORY_LEAK_SIZE:
494 # The GC managed to free the memory: everything is fine.
495 continue
496
497 # The process is leaking memory: let the main process
498 # know that we need to start a new worker.
499 mp.util.info("Memory leak detected: shutting down worker")
500 result_queue.put(pid)
501 with worker_exit_lock:
502 mp.util.debug("Exit due to memory leak")
503 return
504 else:
505 # if psutil is not installed, trigger gc.collect events
506 # regularly to limit potential memory leaks due to reference cycles
507 if _last_memory_leak_check is None or (
508 time() - _last_memory_leak_check > _MEMORY_LEAK_CHECK_DELAY
509 ):
510 gc.collect()
511 _last_memory_leak_check = time()
512
513
514class _ExecutorManagerThread(threading.Thread):
515 """Manages the communication between this process and the worker processes.
516
517 The manager is run in a local thread.
518
519 Args:
520 executor: A reference to the ProcessPoolExecutor that owns
521 this thread. A weakref will be own by the manager as well as
522 references to internal objects used to introspect the state of
523 the executor.
524 """
525
526 def __init__(self, executor):
527 # Store references to necessary internals of the executor.
528
529 # A _ThreadWakeup to allow waking up the executor_manager_thread from
530 # the main Thread and avoid deadlocks caused by permanently
531 # locked queues.
532 self.thread_wakeup = executor._executor_manager_thread_wakeup
533 self.shutdown_lock = executor._shutdown_lock
534
535 # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
536 # to determine if the ProcessPoolExecutor has been garbage collected
537 # and that the manager can exit.
538 # When the executor gets garbage collected, the weakref callback
539 # will wake up the queue management thread so that it can terminate
540 # if there is no pending work item.
541 def weakref_cb(
542 _,
543 thread_wakeup=self.thread_wakeup,
544 shutdown_lock=self.shutdown_lock,
545 ):
546 if mp is not None:
547 # At this point, the multiprocessing module can already be
548 # garbage collected. We only log debug info when still
549 # possible.
550 mp.util.debug(
551 "Executor collected: triggering callback for"
552 " QueueManager wakeup"
553 )
554 with shutdown_lock:
555 thread_wakeup.wakeup()
556
557 self.executor_reference = weakref.ref(executor, weakref_cb)
558
559 # The flags of the executor
560 self.executor_flags = executor._flags
561
562 # A list of the ctx.Process instances used as workers.
563 self.processes = executor._processes
564
565 # A ctx.Queue that will be filled with _CallItems derived from
566 # _WorkItems for processing by the process workers.
567 self.call_queue = executor._call_queue
568
569 # A ctx.SimpleQueue of _ResultItems generated by the process workers.
570 self.result_queue = executor._result_queue
571
572 # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
573 self.work_ids_queue = executor._work_ids
574
575 # A dict mapping work ids to _WorkItems e.g.
576 # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
577 self.pending_work_items = executor._pending_work_items
578
579 # A list of the work_ids that are currently running
580 self.running_work_items = executor._running_work_items
581
582 # A lock to avoid concurrent shutdown of workers on timeout and spawn
583 # of new processes or shut down
584 self.processes_management_lock = executor._processes_management_lock
585
586 super().__init__(name="ExecutorManagerThread")
587 if sys.version_info < (3, 9):
588 self.daemon = True
589
590 def run(self):
591 # Main loop for the executor manager thread.
592
593 while True:
594 self.add_call_item_to_queue()
595
596 result_item, is_broken, bpe = self.wait_result_broken_or_wakeup()
597
598 if is_broken:
599 self.terminate_broken(bpe)
600 return
601 if result_item is not None:
602 self.process_result_item(result_item)
603 # Delete reference to result_item to avoid keeping references
604 # while waiting on new results.
605 del result_item
606
607 if self.is_shutting_down():
608 self.flag_executor_shutting_down()
609
610 # Since no new work items can be added, it is safe to shutdown
611 # this thread if there are no pending work items.
612 if not self.pending_work_items:
613 self.join_executor_internals()
614 return
615
616 def add_call_item_to_queue(self):
617 # Fills call_queue with _WorkItems from pending_work_items.
618 # This function never blocks.
619 while True:
620 if self.call_queue.full():
621 return
622 try:
623 work_id = self.work_ids_queue.get(block=False)
624 except queue.Empty:
625 return
626 else:
627 work_item = self.pending_work_items[work_id]
628
629 if work_item.future.set_running_or_notify_cancel():
630 self.running_work_items += [work_id]
631 self.call_queue.put(
632 _CallItem(
633 work_id,
634 work_item.fn,
635 work_item.args,
636 work_item.kwargs,
637 ),
638 block=True,
639 )
640 else:
641 del self.pending_work_items[work_id]
642 continue
643
644 def wait_result_broken_or_wakeup(self):
645 # Wait for a result to be ready in the result_queue while checking
646 # that all worker processes are still running, or for a wake up
647 # signal send. The wake up signals come either from new tasks being
648 # submitted, from the executor being shutdown/gc-ed, or from the
649 # shutdown of the python interpreter.
650 result_reader = self.result_queue._reader
651 wakeup_reader = self.thread_wakeup._reader
652 readers = [result_reader, wakeup_reader]
653 worker_sentinels = [p.sentinel for p in list(self.processes.values())]
654 ready = wait(readers + worker_sentinels)
655
656 bpe = None
657 is_broken = True
658 result_item = None
659 if result_reader in ready:
660 try:
661 result_item = result_reader.recv()
662 if isinstance(result_item, _RemoteTraceback):
663 bpe = BrokenProcessPool(
664 "A task has failed to un-serialize. Please ensure that"
665 " the arguments of the function are all picklable."
666 )
667 bpe.__cause__ = result_item
668 else:
669 is_broken = False
670 except BaseException as e:
671 bpe = BrokenProcessPool(
672 "A result has failed to un-serialize. Please ensure that "
673 "the objects returned by the function are always "
674 "picklable."
675 )
676 tb = traceback.format_exception(
677 type(e), e, getattr(e, "__traceback__", None)
678 )
679 bpe.__cause__ = _RemoteTraceback("".join(tb))
680
681 elif wakeup_reader in ready:
682 # This is simply a wake-up event that might either trigger putting
683 # more tasks in the queue or trigger the clean up of resources.
684 is_broken = False
685 else:
686 # A worker has terminated and we don't know why, set the state of
687 # the executor as broken
688 exit_codes = ""
689 if sys.platform != "win32":
690 # In Windows, introspecting terminated workers exitcodes seems
691 # unstable, therefore they are not appended in the exception
692 # message.
693 exit_codes = (
694 "\nThe exit codes of the workers are "
695 f"{get_exitcodes_terminated_worker(self.processes)}"
696 )
697 mp.util.debug(
698 "A worker unexpectedly terminated. Workers that "
699 "might have caused the breakage: "
700 + str(
701 {
702 p.name: p.exitcode
703 for p in list(self.processes.values())
704 if p is not None and p.sentinel in ready
705 }
706 )
707 )
708 bpe = TerminatedWorkerError(
709 "A worker process managed by the executor was unexpectedly "
710 "terminated. This could be caused by a segmentation fault "
711 "while calling the function or by an excessive memory usage "
712 "causing the Operating System to kill the worker.\n"
713 f"{exit_codes}"
714 )
715
716 self.thread_wakeup.clear()
717
718 return result_item, is_broken, bpe
719
720 def process_result_item(self, result_item):
721 # Process the received a result_item. This can be either the PID of a
722 # worker that exited gracefully or a _ResultItem
723
724 if isinstance(result_item, int):
725 # Clean shutdown of a worker using its PID, either on request
726 # by the executor.shutdown method or by the timeout of the worker
727 # itself: we should not mark the executor as broken.
728 with self.processes_management_lock:
729 p = self.processes.pop(result_item, None)
730
731 # p can be None if the executor is concurrently shutting down.
732 if p is not None:
733 p._worker_exit_lock.release()
734 mp.util.debug(
735 f"joining {p.name} when processing {p.pid} as result_item"
736 )
737 p.join()
738 del p
739
740 # Make sure the executor have the right number of worker, even if a
741 # worker timeout while some jobs were submitted. If some work is
742 # pending or there is less processes than running items, we need to
743 # start a new Process and raise a warning.
744 n_pending = len(self.pending_work_items)
745 n_running = len(self.running_work_items)
746 if n_pending - n_running > 0 or n_running > len(self.processes):
747 executor = self.executor_reference()
748 if (
749 executor is not None
750 and len(self.processes) < executor._max_workers
751 ):
752 warnings.warn(
753 "A worker stopped while some jobs were given to the "
754 "executor. This can be caused by a too short worker "
755 "timeout or by a memory leak.",
756 UserWarning,
757 )
758 with executor._processes_management_lock:
759 executor._adjust_process_count()
760 executor = None
761 else:
762 # Received a _ResultItem so mark the future as completed.
763 work_item = self.pending_work_items.pop(result_item.work_id, None)
764 # work_item can be None if another process terminated (see above)
765 if work_item is not None:
766 if result_item.exception:
767 work_item.future.set_exception(result_item.exception)
768 else:
769 work_item.future.set_result(result_item.result)
770 self.running_work_items.remove(result_item.work_id)
771
772 def is_shutting_down(self):
773 # Check whether we should start shutting down the executor.
774 executor = self.executor_reference()
775 # No more work items can be added if:
776 # - The interpreter is shutting down OR
777 # - The executor that owns this thread is not broken AND
778 # * The executor that owns this worker has been collected OR
779 # * The executor that owns this worker has been shutdown.
780 # If the executor is broken, it should be detected in the next loop.
781 return _global_shutdown or (
782 (executor is None or self.executor_flags.shutdown)
783 and not self.executor_flags.broken
784 )
785
786 def terminate_broken(self, bpe):
787 # Terminate the executor because it is in a broken state. The bpe
788 # argument can be used to display more information on the error that
789 # lead the executor into becoming broken.
790
791 # Mark the process pool broken so that submits fail right now.
792 self.executor_flags.flag_as_broken(bpe)
793
794 # Mark pending tasks as failed.
795 for work_item in self.pending_work_items.values():
796 work_item.future.set_exception(bpe)
797 # Delete references to object. See issue16284
798 del work_item
799 self.pending_work_items.clear()
800
801 # Terminate remaining workers forcibly: the queues or their
802 # locks may be in a dirty state and block forever.
803 self.kill_workers(reason="broken executor")
804
805 # clean up resources
806 self.join_executor_internals()
807
808 def flag_executor_shutting_down(self):
809 # Flag the executor as shutting down and cancel remaining tasks if
810 # requested as early as possible if it is not gc-ed yet.
811 self.executor_flags.flag_as_shutting_down()
812
813 # Cancel pending work items if requested.
814 if self.executor_flags.kill_workers:
815 while self.pending_work_items:
816 _, work_item = self.pending_work_items.popitem()
817 work_item.future.set_exception(
818 ShutdownExecutorError(
819 "The Executor was shutdown with `kill_workers=True` "
820 "before this job could complete."
821 )
822 )
823 del work_item
824
825 # Kill the remaining worker forcibly to no waste time joining them
826 self.kill_workers(reason="executor shutting down")
827
828 def kill_workers(self, reason=""):
829 # Terminate the remaining workers using SIGKILL. This function also
830 # terminates descendant workers of the children in case there is some
831 # nested parallelism.
832 while self.processes:
833 _, p = self.processes.popitem()
834 mp.util.debug(f"terminate process {p.name}, reason: {reason}")
835 try:
836 kill_process_tree(p)
837 except ProcessLookupError: # pragma: no cover
838 pass
839
840 def shutdown_workers(self):
841 # shutdown all workers in self.processes
842
843 # Create a list to avoid RuntimeError due to concurrent modification of
844 # processes. nb_children_alive is thus an upper bound. Also release the
845 # processes' _worker_exit_lock to accelerate the shutdown procedure, as
846 # there is no need for hand-shake here.
847 with self.processes_management_lock:
848 n_children_to_stop = 0
849 for p in list(self.processes.values()):
850 mp.util.debug(f"releasing worker exit lock on {p.name}")
851 p._worker_exit_lock.release()
852 n_children_to_stop += 1
853
854 mp.util.debug(f"found {n_children_to_stop} processes to stop")
855
856 # Send the right number of sentinels, to make sure all children are
857 # properly terminated. Do it with a mechanism that avoid hanging on
858 # Full queue when all workers have already been shutdown.
859 n_sentinels_sent = 0
860 cooldown_time = 0.001
861 while (
862 n_sentinels_sent < n_children_to_stop
863 and self.get_n_children_alive() > 0
864 ):
865 for _ in range(n_children_to_stop - n_sentinels_sent):
866 try:
867 self.call_queue.put_nowait(None)
868 n_sentinels_sent += 1
869 except queue.Full as e:
870 if cooldown_time > 5.0:
871 mp.util.info(
872 "failed to send all sentinels and exit with error."
873 f"\ncall_queue size={self.call_queue._maxsize}; "
874 f" full is {self.call_queue.full()}; "
875 )
876 raise e
877 mp.util.info(
878 "full call_queue prevented to send all sentinels at "
879 "once, waiting..."
880 )
881 sleep(cooldown_time)
882 cooldown_time *= 1.2
883 break
884
885 mp.util.debug(f"sent {n_sentinels_sent} sentinels to the call queue")
886
887 def join_executor_internals(self):
888 self.shutdown_workers()
889
890 # Release the queue's resources as soon as possible. Flag the feeder
891 # thread for clean exit to avoid having the crash detection thread flag
892 # the Executor as broken during the shutdown. This is safe as either:
893 # * We don't need to communicate with the workers anymore
894 # * There is nothing left in the Queue buffer except None sentinels
895 mp.util.debug("closing call_queue")
896 self.call_queue.close()
897 self.call_queue.join_thread()
898
899 # Closing result_queue
900 mp.util.debug("closing result_queue")
901 self.result_queue.close()
902
903 mp.util.debug("closing thread_wakeup")
904 with self.shutdown_lock:
905 self.thread_wakeup.close()
906
907 # If .join() is not called on the created processes then
908 # some ctx.Queue methods may deadlock on macOS.
909 with self.processes_management_lock:
910 mp.util.debug(f"joining {len(self.processes)} processes")
911 n_joined_processes = 0
912 while True:
913 try:
914 pid, p = self.processes.popitem()
915 mp.util.debug(f"joining process {p.name} with pid {pid}")
916 p.join()
917 n_joined_processes += 1
918 except KeyError:
919 break
920
921 mp.util.debug(
922 "executor management thread clean shutdown of "
923 f"{n_joined_processes} workers"
924 )
925
926 def get_n_children_alive(self):
927 # This is an upper bound on the number of children alive.
928 with self.processes_management_lock:
929 return sum(p.is_alive() for p in list(self.processes.values()))
930
931
932_system_limits_checked = False
933_system_limited = None
934
935
936def _check_system_limits():
937 global _system_limits_checked, _system_limited
938 if _system_limits_checked and _system_limited:
939 raise NotImplementedError(_system_limited)
940 _system_limits_checked = True
941 try:
942 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
943 except (AttributeError, ValueError):
944 # sysconf not available or setting not available
945 return
946 if nsems_max == -1:
947 # undetermined limit, assume that limit is determined
948 # by available memory only
949 return
950 if nsems_max >= 256:
951 # minimum number of semaphores available
952 # according to POSIX
953 return
954 _system_limited = (
955 f"system provides too few semaphores ({nsems_max} available, "
956 "256 necessary)"
957 )
958 raise NotImplementedError(_system_limited)
959
960
961def _chain_from_iterable_of_lists(iterable):
962 """
963 Specialized implementation of itertools.chain.from_iterable.
964 Each item in *iterable* should be a list. This function is
965 careful not to keep references to yielded objects.
966 """
967 for element in iterable:
968 element.reverse()
969 while element:
970 yield element.pop()
971
972
973def _check_max_depth(context):
974 # Limit the maxmal recursion level
975 global _CURRENT_DEPTH
976 if context.get_start_method() == "fork" and _CURRENT_DEPTH > 0:
977 raise LokyRecursionError(
978 "Could not spawn extra nested processes at depth superior to "
979 "MAX_DEPTH=1. It is not possible to increase this limit when "
980 "using the 'fork' start method."
981 )
982
983 if 0 < MAX_DEPTH and _CURRENT_DEPTH + 1 > MAX_DEPTH:
984 raise LokyRecursionError(
985 "Could not spawn extra nested processes at depth superior to "
986 f"MAX_DEPTH={MAX_DEPTH}. If this is intendend, you can change "
987 "this limit with the LOKY_MAX_DEPTH environment variable."
988 )
989
990
991class LokyRecursionError(RuntimeError):
992 """A process tries to spawn too many levels of nested processes."""
993
994
995class BrokenProcessPool(_BPPException):
996 """
997 Raised when the executor is broken while a future was in the running state.
998 The cause can an error raised when unpickling the task in the worker
999 process or when unpickling the result value in the parent process. It can
1000 also be caused by a worker process being terminated unexpectedly.
1001 """
1002
1003
1004class TerminatedWorkerError(BrokenProcessPool):
1005 """
1006 Raised when a process in a ProcessPoolExecutor terminated abruptly
1007 while a future was in the running state.
1008 """
1009
1010
1011# Alias for backward compat (for code written for loky 1.1.4 and earlier). Do
1012# not use in new code.
1013BrokenExecutor = BrokenProcessPool
1014
1015
1016class ShutdownExecutorError(RuntimeError):
1017
1018 """
1019 Raised when a ProcessPoolExecutor is shutdown while a future was in the
1020 running or pending state.
1021 """
1022
1023
1024class ProcessPoolExecutor(Executor):
1025
1026 _at_exit = None
1027
1028 def __init__(
1029 self,
1030 max_workers=None,
1031 job_reducers=None,
1032 result_reducers=None,
1033 timeout=None,
1034 context=None,
1035 initializer=None,
1036 initargs=(),
1037 env=None,
1038 ):
1039 """Initializes a new ProcessPoolExecutor instance.
1040
1041 Args:
1042 max_workers: int, optional (default: cpu_count())
1043 The maximum number of processes that can be used to execute the
1044 given calls. If None or not given then as many worker processes
1045 will be created as the number of CPUs the current process
1046 can use.
1047 job_reducers, result_reducers: dict(type: reducer_func)
1048 Custom reducer for pickling the jobs and the results from the
1049 Executor. If only `job_reducers` is provided, `result_reducer`
1050 will use the same reducers
1051 timeout: int, optional (default: None)
1052 Idle workers exit after timeout seconds. If a new job is
1053 submitted after the timeout, the executor will start enough
1054 new Python processes to make sure the pool of workers is full.
1055 context: A multiprocessing context to launch the workers. This
1056 object should provide SimpleQueue, Queue and Process.
1057 initializer: An callable used to initialize worker processes.
1058 initargs: A tuple of arguments to pass to the initializer.
1059 env: A dict of environment variable to overwrite in the child
1060 process. The environment variables are set before any module is
1061 loaded. Note that this only works with the loky context.
1062 """
1063 _check_system_limits()
1064
1065 if max_workers is None:
1066 self._max_workers = cpu_count()
1067 else:
1068 if max_workers <= 0:
1069 raise ValueError("max_workers must be greater than 0")
1070 self._max_workers = max_workers
1071
1072 if (
1073 sys.platform == "win32"
1074 and self._max_workers > _MAX_WINDOWS_WORKERS
1075 ):
1076 warnings.warn(
1077 f"On Windows, max_workers cannot exceed {_MAX_WINDOWS_WORKERS} "
1078 "due to limitations of the operating system."
1079 )
1080 self._max_workers = _MAX_WINDOWS_WORKERS
1081
1082 if context is None:
1083 context = get_context()
1084 self._context = context
1085 self._env = env
1086
1087 self._initializer, self._initargs = _prepare_initializer(
1088 initializer, initargs
1089 )
1090 _check_max_depth(self._context)
1091
1092 if result_reducers is None:
1093 result_reducers = job_reducers
1094
1095 # Timeout
1096 self._timeout = timeout
1097
1098 # Management thread
1099 self._executor_manager_thread = None
1100
1101 # Map of pids to processes
1102 self._processes = {}
1103
1104 # Internal variables of the ProcessPoolExecutor
1105 self._processes = {}
1106 self._queue_count = 0
1107 self._pending_work_items = {}
1108 self._running_work_items = []
1109 self._work_ids = queue.Queue()
1110 self._processes_management_lock = self._context.Lock()
1111 self._executor_manager_thread = None
1112 self._shutdown_lock = threading.Lock()
1113
1114 # _ThreadWakeup is a communication channel used to interrupt the wait
1115 # of the main loop of executor_manager_thread from another thread (e.g.
1116 # when calling executor.submit or executor.shutdown). We do not use the
1117 # _result_queue to send wakeup signals to the executor_manager_thread
1118 # as it could result in a deadlock if a worker process dies with the
1119 # _result_queue write lock still acquired.
1120 #
1121 # _shutdown_lock must be locked to access _ThreadWakeup.wakeup.
1122 self._executor_manager_thread_wakeup = _ThreadWakeup()
1123
1124 # Flag to hold the state of the Executor. This permits to introspect
1125 # the Executor state even once it has been garbage collected.
1126 self._flags = _ExecutorFlags(self._shutdown_lock)
1127
1128 # Finally setup the queues for interprocess communication
1129 self._setup_queues(job_reducers, result_reducers)
1130
1131 mp.util.debug("ProcessPoolExecutor is setup")
1132
1133 def _setup_queues(self, job_reducers, result_reducers, queue_size=None):
1134 # Make the call queue slightly larger than the number of processes to
1135 # prevent the worker processes from idling. But don't make it too big
1136 # because futures in the call queue cannot be cancelled.
1137 if queue_size is None:
1138 queue_size = 2 * self._max_workers + EXTRA_QUEUED_CALLS
1139 self._call_queue = _SafeQueue(
1140 max_size=queue_size,
1141 pending_work_items=self._pending_work_items,
1142 running_work_items=self._running_work_items,
1143 thread_wakeup=self._executor_manager_thread_wakeup,
1144 reducers=job_reducers,
1145 ctx=self._context,
1146 )
1147 # Killed worker processes can produce spurious "broken pipe"
1148 # tracebacks in the queue's own worker thread. But we detect killed
1149 # processes anyway, so silence the tracebacks.
1150 self._call_queue._ignore_epipe = True
1151
1152 self._result_queue = SimpleQueue(
1153 reducers=result_reducers, ctx=self._context
1154 )
1155
1156 def _start_executor_manager_thread(self):
1157 if self._executor_manager_thread is None:
1158 mp.util.debug("_start_executor_manager_thread called")
1159
1160 # Start the processes so that their sentinels are known.
1161 self._executor_manager_thread = _ExecutorManagerThread(self)
1162 self._executor_manager_thread.start()
1163
1164 # register this executor in a mechanism that ensures it will wakeup
1165 # when the interpreter is exiting.
1166 _threads_wakeups[self._executor_manager_thread] = (
1167 self._shutdown_lock,
1168 self._executor_manager_thread_wakeup,
1169 )
1170
1171 global process_pool_executor_at_exit
1172 if process_pool_executor_at_exit is None:
1173 # Ensure that the _python_exit function will be called before
1174 # the multiprocessing.Queue._close finalizers which have an
1175 # exitpriority of 10.
1176
1177 if sys.version_info < (3, 9):
1178 process_pool_executor_at_exit = mp.util.Finalize(
1179 None, _python_exit, exitpriority=20
1180 )
1181 else:
1182 process_pool_executor_at_exit = threading._register_atexit(
1183 _python_exit
1184 )
1185
1186 def _adjust_process_count(self):
1187 while len(self._processes) < self._max_workers:
1188 worker_exit_lock = self._context.BoundedSemaphore(1)
1189 args = (
1190 self._call_queue,
1191 self._result_queue,
1192 self._initializer,
1193 self._initargs,
1194 self._processes_management_lock,
1195 self._timeout,
1196 worker_exit_lock,
1197 _CURRENT_DEPTH + 1,
1198 )
1199 worker_exit_lock.acquire()
1200 try:
1201 # Try to spawn the process with some environment variable to
1202 # overwrite but it only works with the loky context for now.
1203 p = self._context.Process(
1204 target=_process_worker, args=args, env=self._env
1205 )
1206 except TypeError:
1207 p = self._context.Process(target=_process_worker, args=args)
1208 p._worker_exit_lock = worker_exit_lock
1209 p.start()
1210 self._processes[p.pid] = p
1211 mp.util.debug(
1212 f"Adjusted process count to {self._max_workers}: "
1213 f"{[(p.name, pid) for pid, p in self._processes.items()]}"
1214 )
1215
1216 def _ensure_executor_running(self):
1217 """ensures all workers and management thread are running"""
1218 with self._processes_management_lock:
1219 if len(self._processes) != self._max_workers:
1220 self._adjust_process_count()
1221 self._start_executor_manager_thread()
1222
1223 def submit(self, fn, *args, **kwargs):
1224 with self._flags.shutdown_lock:
1225 if self._flags.broken is not None:
1226 raise self._flags.broken
1227 if self._flags.shutdown:
1228 raise ShutdownExecutorError(
1229 "cannot schedule new futures after shutdown"
1230 )
1231
1232 # Cannot submit a new calls once the interpreter is shutting down.
1233 # This check avoids spawning new processes at exit.
1234 if _global_shutdown:
1235 raise RuntimeError(
1236 "cannot schedule new futures after " "interpreter shutdown"
1237 )
1238
1239 f = Future()
1240 w = _WorkItem(f, fn, args, kwargs)
1241
1242 self._pending_work_items[self._queue_count] = w
1243 self._work_ids.put(self._queue_count)
1244 self._queue_count += 1
1245 # Wake up queue management thread
1246 self._executor_manager_thread_wakeup.wakeup()
1247
1248 self._ensure_executor_running()
1249 return f
1250
1251 submit.__doc__ = Executor.submit.__doc__
1252
1253 def map(self, fn, *iterables, **kwargs):
1254 """Returns an iterator equivalent to map(fn, iter).
1255
1256 Args:
1257 fn: A callable that will take as many arguments as there are
1258 passed iterables.
1259 timeout: The maximum number of seconds to wait. If None, then there
1260 is no limit on the wait time.
1261 chunksize: If greater than one, the iterables will be chopped into
1262 chunks of size chunksize and submitted to the process pool.
1263 If set to one, the items in the list will be sent one at a
1264 time.
1265
1266 Returns:
1267 An iterator equivalent to: map(func, *iterables) but the calls may
1268 be evaluated out-of-order.
1269
1270 Raises:
1271 TimeoutError: If the entire result iterator could not be generated
1272 before the given timeout.
1273 Exception: If fn(*args) raises for any values.
1274 """
1275 timeout = kwargs.get("timeout", None)
1276 chunksize = kwargs.get("chunksize", 1)
1277 if chunksize < 1:
1278 raise ValueError("chunksize must be >= 1.")
1279
1280 results = super().map(
1281 partial(_process_chunk, fn),
1282 _get_chunks(chunksize, *iterables),
1283 timeout=timeout,
1284 )
1285 return _chain_from_iterable_of_lists(results)
1286
1287 def shutdown(self, wait=True, kill_workers=False):
1288 mp.util.debug(f"shutting down executor {self}")
1289
1290 self._flags.flag_as_shutting_down(kill_workers)
1291 executor_manager_thread = self._executor_manager_thread
1292 executor_manager_thread_wakeup = self._executor_manager_thread_wakeup
1293
1294 if executor_manager_thread_wakeup is not None:
1295 # Wake up queue management thread
1296 with self._shutdown_lock:
1297 self._executor_manager_thread_wakeup.wakeup()
1298
1299 if executor_manager_thread is not None and wait:
1300 # This locks avoids concurrent join if the interpreter
1301 # is shutting down.
1302 with _global_shutdown_lock:
1303 executor_manager_thread.join()
1304 _threads_wakeups.pop(executor_manager_thread, None)
1305
1306 # To reduce the risk of opening too many files, remove references to
1307 # objects that use file descriptors.
1308 self._executor_manager_thread = None
1309 self._executor_manager_thread_wakeup = None
1310 self._call_queue = None
1311 self._result_queue = None
1312 self._processes_management_lock = None
1313
1314 shutdown.__doc__ = Executor.shutdown.__doc__