# Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/concurrent/futures/_base.py: 26%
# 302 statements
# coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
# Module author metadata.
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import collections
import logging
import threading
import time
import types
# Values accepted by the ``return_when`` argument of wait().
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Internal-only condition passed by as_completed() when installing waiters.
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]

# Human-readable state names used by Future.__repr__(); both cancelled
# states are rendered identically.
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
class Error(Exception):
    """Base class for all future-related exceptions."""
    pass
class CancelledError(Error):
    """The Future was cancelled."""
    pass
class TimeoutError(Error):
    """The operation exceeded the given deadline."""
    pass
class InvalidStateError(Error):
    """The operation is not allowed in this state."""
    pass
class _Waiter(object):
    """Provides the event that wait() and as_completed() block on.

    This base class only records each reported future in
    ``finished_futures``; it never sets ``event`` itself.
    """

    def __init__(self):
        self.event = threading.Event()
        self.finished_futures = []

    def add_result(self, future):
        """Record *future* as done after it produced a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record *future* as done after it raised an exception."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record *future* as done after it was cancelled."""
        self.finished_futures.append(future)
class _AsCompletedWaiter(_Waiter):
    """Used by as_completed().

    Every notification appends the future and sets the event while holding
    ``lock``, so a consumer holding the same lock can swap out
    ``finished_futures`` and clear the event atomically.
    """

    def __init__(self):
        super().__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        with self.lock:
            super().add_result(future)
            self.event.set()

    def add_exception(self, future):
        with self.lock:
            super().add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        with self.lock:
            super().add_cancelled(future)
            self.event.set()
class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED).

    Any notification (result, exception or cancellation) sets the event.
    """

    def add_result(self, future):
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self.event.set()
class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).

    Args:
        num_pending_calls: Number of notifications to expect before the
            event is set.
        stop_on_exception: If true, the first add_exception() call sets the
            event immediately instead of decrementing the counter.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception
        self.lock = threading.Lock()
        super().__init__()

    def _decrement_pending_calls(self):
        # The counter is updated under the lock; the event is set once it
        # reaches zero.
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_result(self, future):
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        super().add_exception(future)
        if self.stop_on_exception:
            self.event.set()
        else:
            self._decrement_pending_calls()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self._decrement_pending_calls()
class _AcquireFutures(object):
    """A context manager that does an ordered acquire of Future conditions.

    The futures are sorted by ``id()`` so that every caller acquires the
    ``_condition`` objects in the same order.
    """

    def __init__(self, futures):
        self.futures = sorted(futures, key=id)

    def __enter__(self):
        for future in self.futures:
            future._condition.acquire()

    def __exit__(self, *args):
        for future in self.futures:
            future._condition.release()
def _create_and_install_waiters(fs, return_when):
    """Create the waiter matching *return_when* and attach it to each future.

    Args:
        fs: The futures to attach the waiter to. It is iterated more than
            once, so it must be a re-iterable collection.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter, already appended to every future's
        ``_waiters`` list.

    Raises:
        ValueError: If *return_when* is not one of the known conditions.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Only futures that have not completed yet will notify the waiter.
        pending_count = sum(
                f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)

        if return_when == FIRST_EXCEPTION:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
        elif return_when == ALL_COMPLETED:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
        else:
            raise ValueError("Invalid return condition: %r" % return_when)

    for f in fs:
        f._waiters.append(waiter)

    return waiter
def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.
    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.
    """
    while fs:
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        with f._condition:
            f._waiters.remove(waiter)
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.monotonic()

    # Deduplicate up front; the set is also what the finally clause iterates.
    fs = set(fs)
    total_futures = len(fs)
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        yield from _yield_finished_futures(finished, waiter,
                                           ref_collect=(fs,))

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take the batch and reset the event under the waiter's lock so
            # a concurrent notification is not lost between the two steps.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs, pending))

    finally:
        # Remove waiter from unfinished futures
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
# Result type of wait(): the completed and the still-pending futures.
DoneAndNotDoneFutures = collections.namedtuple(
        'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon. Any iterable is accepted; duplicated futures are
            considered only once.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If *return_when* is not one of the documented options
            (raised when the waiter is created).
    """
    # Materialise once: *fs* is iterated several times below, which would
    # exhaust a generator, and duplicates would inflate both len(fs) and the
    # pending count so that ALL_COMPLETED could never be satisfied.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        # A failing callback is logged and does not stop the remaining ones.
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                # Compare against None rather than relying on truthiness:
                # an exception object may define __len__/__bool__.
                if self._exception is not None:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run after the condition has been released.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        # ``is not None`` so that a falsy exception object is still raised
        # instead of silently returning self._result.
        if self._exception is not None:
            try:
                raise self._exception
            finally:
                # Break a reference cycle with the exception in self._exception
                self = None
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call immediately, outside the condition.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """

        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    # Allows subscripting the class itself, e.g. Future[int].
    __class_getitem__ = classmethod(types.GenericAlias)
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, /, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                # Runs on exhaustion, error or early close: cancel whatever
                # has not been handed out yet.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        # Returning False lets an exception from the with-body propagate.
        return False
class BrokenExecutor(RuntimeError):
    """
    Raised when an executor has become non-functional after a severe failure.
    """