# Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/concurrent/futures/_base.py: 25%
# 305 statements
# Report generated by coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import collections
import logging
import threading
import time
# Values accepted by wait() for its *return_when* argument.
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Internal return condition passed by as_completed() when it installs
# its waiter.
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]

# Human-readable state names used by Future.__repr__(); both cancelled
# states are rendered as "cancelled".
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
class Error(Exception):
    """Base class for all future-related exceptions."""
    pass
class CancelledError(Error):
    """The Future was cancelled."""
    pass
class TimeoutError(Error):
    """The operation exceeded the given deadline."""
    pass
class InvalidStateError(Error):
    """The operation is not allowed in this state."""
    pass
class _Waiter(object):
    """Provides the event that wait() and as_completed() block on.

    The base class only records completed futures in ``finished_futures``;
    it never sets ``event`` itself. Subclasses decide when to set it.
    """

    def __init__(self):
        self.event = threading.Event()
        self.finished_futures = []

    def add_result(self, future):
        """Record *future* as finished with a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record *future* as finished with an exception."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record *future* as cancelled."""
        self.finished_futures.append(future)
class _AsCompletedWaiter(_Waiter):
    """Used by as_completed().

    Every notification is recorded and the event is set while holding
    ``lock``, so that as_completed() can swap out ``finished_futures`` and
    clear the event under the same lock.
    """

    def __init__(self):
        super().__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        with self.lock:
            super().add_result(future)
            self.event.set()

    def add_exception(self, future):
        with self.lock:
            super().add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        with self.lock:
            super().add_cancelled(future)
            self.event.set()
class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED).

    The event is set on the first notification of any kind.
    """

    def add_result(self, future):
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self.event.set()
class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).

    Args:
        num_pending_calls: Number of notifications to receive before the
            event is set.
        stop_on_exception: If true, the event is set as soon as any future
            reports an exception, without touching the pending count.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception
        self.lock = threading.Lock()
        super().__init__()

    def _decrement_pending_calls(self):
        # The counter is updated under the lock; the event is set once it
        # reaches zero.
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_result(self, future):
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        super().add_exception(future)
        if self.stop_on_exception:
            self.event.set()
        else:
            self._decrement_pending_calls()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self._decrement_pending_calls()
class _AcquireFutures(object):
    """A context manager that does an ordered acquire of Future conditions.

    The futures are sorted by ``id()`` so that every caller acquires the
    ``_condition`` objects in the same order.
    """

    def __init__(self, futures):
        self.futures = sorted(futures, key=id)

    def __enter__(self):
        for future in self.futures:
            future._condition.acquire()

    def __exit__(self, *args):
        for future in self.futures:
            future._condition.release()
def _create_and_install_waiters(fs, return_when):
    """Create the waiter matching *return_when* and attach it to each future.

    Args:
        fs: The futures to attach the waiter to; each one gets the waiter
            appended to its ``_waiters`` list.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter.

    Raises:
        ValueError: If *return_when* is not one of the values above.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Only futures that have not yet completed will notify the waiter.
        pending_count = sum(
                f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)

        if return_when == FIRST_EXCEPTION:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
        elif return_when == ALL_COMPLETED:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
        else:
            raise ValueError("Invalid return condition: %r" % return_when)

    for f in fs:
        f._waiters.append(waiter)

    return waiter
def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.
    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.
    """
    while fs:
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        with f._condition:
            f._waiters.remove(waiter)
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.monotonic()

    # Deduplicate so that each future is yielded (and un-registered) once.
    fs = set(fs)
    total_futures = len(fs)
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        # Futures that were already complete when we were called.
        yield from _yield_finished_futures(finished, waiter,
                                           ref_collect=(fs,))

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take the batch collected so far and reset the waiter under its
            # lock.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs, pending))

    finally:
        # Remove waiter from unfinished futures
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
# Return type of wait(): a pair of sets named 'done' and 'not_done'.
DoneAndNotDoneFutures = collections.namedtuple(
        'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon. Duplicate futures are considered only once.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If *return_when* is not a recognised condition and the
            call did not return early.
    """
    # Deduplicate up front. With duplicates, len(done) could never equal
    # len(fs) once every future had finished, so the early return below was
    # skipped and a waiter expecting zero notifications was installed, which
    # blocks forever when timeout is None. Materialising also makes one-shot
    # iterables safe to traverse several times.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # Guards every state transition; result() and exception() wait on it.
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        # Waiter objects installed by wait() / as_completed().
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        # A failing callback is logged and does not prevent the remaining
        # callbacks from running.
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run after the condition has been released.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        if self._exception:
            try:
                raise self._exception
            finally:
                # Break a reference cycle with the exception in self._exception
                self = None
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call immediately, outside the condition.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """

        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(*args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.

        Raises:
            TypeError: If no callable was supplied.
            NotImplementedError: Always, once the arguments are accepted;
                this base implementation does not schedule anything.
        """
        # `self` and `fn` are taken from *args so that keyword arguments
        # named "self" or "fn" can still be forwarded in **kwargs.
        if len(args) >= 2:
            pass
        elif not args:
            raise TypeError("descriptor 'submit' of 'Executor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            import warnings
            warnings.warn("Passing 'fn' as keyword argument is deprecated",
                          DeprecationWarning, stacklevel=2)
        else:
            raise TypeError('submit expected at least 1 positional argument, '
                            'got %d' % (len(args)-1))

        raise NotImplementedError()
    submit.__text_signature__ = '($self, fn, /, *args, **kwargs)'

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                # Cancel whatever has not been consumed yet.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        # Never suppress an exception raised inside the with-block.
        return False
class BrokenExecutor(RuntimeError):
    """
    Raised when an executor has become non-functional after a severe failure.
    """