Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/tasks.py: 1%
468 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1"""Support for tasks, coroutines and the scheduler."""
3__all__ = (
4 'Task', 'create_task',
5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6 'wait', 'wait_for', 'as_completed', 'sleep',
7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8 'current_task', 'all_tasks',
9 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
10)
12import concurrent.futures
13import contextvars
14import functools
15import inspect
16import itertools
17import types
18import warnings
19import weakref
21from . import base_tasks
22from . import coroutines
23from . import events
24from . import exceptions
25from . import futures
26from .coroutines import _is_coroutine
28# Helper to generate new task names
29# This uses itertools.count() instead of a "+= 1" operation because the latter
30# is not thread safe. See bpo-11866 for a longer explanation.
31_task_name_counter = itertools.count(1).__next__
34def current_task(loop=None):
35 """Return a currently executed task."""
36 if loop is None:
37 loop = events.get_running_loop()
38 return _current_tasks.get(loop)
41def all_tasks(loop=None):
42 """Return a set of all tasks for the loop."""
43 if loop is None:
44 loop = events.get_running_loop()
45 # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
46 # thread while we do so. Therefore we cast it to list prior to filtering. The list
47 # cast itself requires iteration, so we repeat it several times ignoring
48 # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
49 # details.
50 i = 0
51 while True:
52 try:
53 tasks = list(_all_tasks)
54 except RuntimeError:
55 i += 1
56 if i >= 1000:
57 raise
58 else:
59 break
60 return {t for t in tasks
61 if futures._get_loop(t) is loop and not t.done()}
64def _all_tasks_compat(loop=None):
65 # Different from "all_task()" by returning *all* Tasks, including
66 # the completed ones. Used to implement deprecated "Tasks.all_task()"
67 # method.
68 if loop is None:
69 loop = events.get_event_loop()
70 # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
71 # thread while we do so. Therefore we cast it to list prior to filtering. The list
72 # cast itself requires iteration, so we repeat it several times ignoring
73 # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
74 # details.
75 i = 0
76 while True:
77 try:
78 tasks = list(_all_tasks)
79 except RuntimeError:
80 i += 1
81 if i >= 1000:
82 raise
83 else:
84 break
85 return {t for t in tasks if futures._get_loop(t) is loop}
88def _set_task_name(task, name):
89 if name is not None:
90 try:
91 set_name = task.set_name
92 except AttributeError:
93 pass
94 else:
95 set_name(name)
98class Task(futures._PyFuture): # Inherit Python Task implementation
99 # from a Python Future implementation.
101 """A coroutine wrapped in a Future."""
103 # An important invariant maintained while a Task not done:
104 #
105 # - Either _fut_waiter is None, and _step() is scheduled;
106 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
107 #
108 # The only transition from the latter to the former is through
109 # _wakeup(). When _fut_waiter is not None, one of its callbacks
110 # must be _wakeup().
112 # If False, don't log a message if the task is destroyed whereas its
113 # status is still pending
114 _log_destroy_pending = True
116 @classmethod
117 def current_task(cls, loop=None):
118 """Return the currently running task in an event loop or None.
120 By default the current task for the current event loop is returned.
122 None is returned when called not in the context of a Task.
123 """
124 warnings.warn("Task.current_task() is deprecated since Python 3.7, "
125 "use asyncio.current_task() instead",
126 DeprecationWarning,
127 stacklevel=2)
128 if loop is None:
129 loop = events.get_event_loop()
130 return current_task(loop)
132 @classmethod
133 def all_tasks(cls, loop=None):
134 """Return a set of all tasks for an event loop.
136 By default all tasks for the current event loop are returned.
137 """
138 warnings.warn("Task.all_tasks() is deprecated since Python 3.7, "
139 "use asyncio.all_tasks() instead",
140 DeprecationWarning,
141 stacklevel=2)
142 return _all_tasks_compat(loop)
144 def __init__(self, coro, *, loop=None, name=None):
145 super().__init__(loop=loop)
146 if self._source_traceback:
147 del self._source_traceback[-1]
148 if not coroutines.iscoroutine(coro):
149 # raise after Future.__init__(), attrs are required for __del__
150 # prevent logging for pending task in __del__
151 self._log_destroy_pending = False
152 raise TypeError(f"a coroutine was expected, got {coro!r}")
154 if name is None:
155 self._name = f'Task-{_task_name_counter()}'
156 else:
157 self._name = str(name)
159 self._must_cancel = False
160 self._fut_waiter = None
161 self._coro = coro
162 self._context = contextvars.copy_context()
164 self._loop.call_soon(self.__step, context=self._context)
165 _register_task(self)
167 def __del__(self):
168 if self._state == futures._PENDING and self._log_destroy_pending:
169 context = {
170 'task': self,
171 'message': 'Task was destroyed but it is pending!',
172 }
173 if self._source_traceback:
174 context['source_traceback'] = self._source_traceback
175 self._loop.call_exception_handler(context)
176 super().__del__()
178 def _repr_info(self):
179 return base_tasks._task_repr_info(self)
181 def get_coro(self):
182 return self._coro
184 def get_name(self):
185 return self._name
187 def set_name(self, value):
188 self._name = str(value)
190 def set_result(self, result):
191 raise RuntimeError('Task does not support set_result operation')
193 def set_exception(self, exception):
194 raise RuntimeError('Task does not support set_exception operation')
196 def get_stack(self, *, limit=None):
197 """Return the list of stack frames for this task's coroutine.
199 If the coroutine is not done, this returns the stack where it is
200 suspended. If the coroutine has completed successfully or was
201 cancelled, this returns an empty list. If the coroutine was
202 terminated by an exception, this returns the list of traceback
203 frames.
205 The frames are always ordered from oldest to newest.
207 The optional limit gives the maximum number of frames to
208 return; by default all available frames are returned. Its
209 meaning differs depending on whether a stack or a traceback is
210 returned: the newest frames of a stack are returned, but the
211 oldest frames of a traceback are returned. (This matches the
212 behavior of the traceback module.)
214 For reasons beyond our control, only one stack frame is
215 returned for a suspended coroutine.
216 """
217 return base_tasks._task_get_stack(self, limit)
219 def print_stack(self, *, limit=None, file=None):
220 """Print the stack or traceback for this task's coroutine.
222 This produces output similar to that of the traceback module,
223 for the frames retrieved by get_stack(). The limit argument
224 is passed to get_stack(). The file argument is an I/O stream
225 to which the output is written; by default output is written
226 to sys.stderr.
227 """
228 return base_tasks._task_print_stack(self, limit, file)
230 def cancel(self):
231 """Request that this task cancel itself.
233 This arranges for a CancelledError to be thrown into the
234 wrapped coroutine on the next cycle through the event loop.
235 The coroutine then has a chance to clean up or even deny
236 the request using try/except/finally.
238 Unlike Future.cancel, this does not guarantee that the
239 task will be cancelled: the exception might be caught and
240 acted upon, delaying cancellation of the task or preventing
241 cancellation completely. The task may also return a value or
242 raise a different exception.
244 Immediately after this method is called, Task.cancelled() will
245 not return True (unless the task was already cancelled). A
246 task will be marked as cancelled when the wrapped coroutine
247 terminates with a CancelledError exception (even if cancel()
248 was not called).
249 """
250 self._log_traceback = False
251 if self.done():
252 return False
253 if self._fut_waiter is not None:
254 if self._fut_waiter.cancel():
255 # Leave self._fut_waiter; it may be a Task that
256 # catches and ignores the cancellation so we may have
257 # to cancel it again later.
258 return True
259 # It must be the case that self.__step is already scheduled.
260 self._must_cancel = True
261 return True
263 def __step(self, exc=None):
264 if self.done():
265 raise exceptions.InvalidStateError(
266 f'_step(): already done: {self!r}, {exc!r}')
267 if self._must_cancel:
268 if not isinstance(exc, exceptions.CancelledError):
269 exc = exceptions.CancelledError()
270 self._must_cancel = False
271 coro = self._coro
272 self._fut_waiter = None
274 _enter_task(self._loop, self)
275 # Call either coro.throw(exc) or coro.send(None).
276 try:
277 if exc is None:
278 # We use the `send` method directly, because coroutines
279 # don't have `__iter__` and `__next__` methods.
280 result = coro.send(None)
281 else:
282 result = coro.throw(exc)
283 except StopIteration as exc:
284 if self._must_cancel:
285 # Task is cancelled right before coro stops.
286 self._must_cancel = False
287 super().cancel()
288 else:
289 super().set_result(exc.value)
290 except exceptions.CancelledError:
291 super().cancel() # I.e., Future.cancel(self).
292 except (KeyboardInterrupt, SystemExit) as exc:
293 super().set_exception(exc)
294 raise
295 except BaseException as exc:
296 super().set_exception(exc)
297 else:
298 blocking = getattr(result, '_asyncio_future_blocking', None)
299 if blocking is not None:
300 # Yielded Future must come from Future.__iter__().
301 if futures._get_loop(result) is not self._loop:
302 new_exc = RuntimeError(
303 f'Task {self!r} got Future '
304 f'{result!r} attached to a different loop')
305 self._loop.call_soon(
306 self.__step, new_exc, context=self._context)
307 elif blocking:
308 if result is self:
309 new_exc = RuntimeError(
310 f'Task cannot await on itself: {self!r}')
311 self._loop.call_soon(
312 self.__step, new_exc, context=self._context)
313 else:
314 result._asyncio_future_blocking = False
315 result.add_done_callback(
316 self.__wakeup, context=self._context)
317 self._fut_waiter = result
318 if self._must_cancel:
319 if self._fut_waiter.cancel():
320 self._must_cancel = False
321 else:
322 new_exc = RuntimeError(
323 f'yield was used instead of yield from '
324 f'in task {self!r} with {result!r}')
325 self._loop.call_soon(
326 self.__step, new_exc, context=self._context)
328 elif result is None:
329 # Bare yield relinquishes control for one event loop iteration.
330 self._loop.call_soon(self.__step, context=self._context)
331 elif inspect.isgenerator(result):
332 # Yielding a generator is just wrong.
333 new_exc = RuntimeError(
334 f'yield was used instead of yield from for '
335 f'generator in task {self!r} with {result!r}')
336 self._loop.call_soon(
337 self.__step, new_exc, context=self._context)
338 else:
339 # Yielding something else is an error.
340 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
341 self._loop.call_soon(
342 self.__step, new_exc, context=self._context)
343 finally:
344 _leave_task(self._loop, self)
345 self = None # Needed to break cycles when an exception occurs.
347 def __wakeup(self, future):
348 try:
349 future.result()
350 except BaseException as exc:
351 # This may also be a cancellation.
352 self.__step(exc)
353 else:
354 # Don't pass the value of `future.result()` explicitly,
355 # as `Future.__iter__` and `Future.__await__` don't need it.
356 # If we call `_step(value, None)` instead of `_step()`,
357 # Python eval loop would use `.send(value)` method call,
358 # instead of `__next__()`, which is slower for futures
359 # that return non-generator iterators from their `__iter__`.
360 self.__step()
361 self = None # Needed to break cycles when an exception occurs.
364_PyTask = Task
367try:
368 import _asyncio
369except ImportError:
370 pass
371else:
372 # _CTask is needed for tests.
373 Task = _CTask = _asyncio.Task
376def create_task(coro, *, name=None):
377 """Schedule the execution of a coroutine object in a spawn task.
379 Return a Task object.
380 """
381 loop = events.get_running_loop()
382 task = loop.create_task(coro)
383 _set_task_name(task, name)
384 return task
387# wait() and as_completed() similar to those in PEP 3148.
389FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
390FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
391ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
394async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
395 """Wait for the Futures and coroutines given by fs to complete.
397 The sequence futures must not be empty.
399 Coroutines will be wrapped in Tasks.
401 Returns two sets of Future: (done, pending).
403 Usage:
405 done, pending = await asyncio.wait(fs)
407 Note: This does not raise TimeoutError! Futures that aren't done
408 when the timeout occurs are returned in the second set.
409 """
410 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
411 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
412 if not fs:
413 raise ValueError('Set of coroutines/Futures is empty.')
414 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
415 raise ValueError(f'Invalid return_when value: {return_when}')
417 if loop is None:
418 loop = events.get_running_loop()
419 else:
420 warnings.warn("The loop argument is deprecated since Python 3.8, "
421 "and scheduled for removal in Python 3.10.",
422 DeprecationWarning, stacklevel=2)
424 fs = {ensure_future(f, loop=loop) for f in set(fs)}
426 return await _wait(fs, timeout, return_when, loop)
429def _release_waiter(waiter, *args):
430 if not waiter.done():
431 waiter.set_result(None)
434async def wait_for(fut, timeout, *, loop=None):
435 """Wait for the single Future or coroutine to complete, with timeout.
437 Coroutine will be wrapped in Task.
439 Returns result of the Future or coroutine. When a timeout occurs,
440 it cancels the task and raises TimeoutError. To avoid the task
441 cancellation, wrap it in shield().
443 If the wait is cancelled, the task is also cancelled.
445 This function is a coroutine.
446 """
447 if loop is None:
448 loop = events.get_running_loop()
449 else:
450 warnings.warn("The loop argument is deprecated since Python 3.8, "
451 "and scheduled for removal in Python 3.10.",
452 DeprecationWarning, stacklevel=2)
454 if timeout is None:
455 return await fut
457 if timeout <= 0:
458 fut = ensure_future(fut, loop=loop)
460 if fut.done():
461 return fut.result()
463 fut.cancel()
464 raise exceptions.TimeoutError()
466 waiter = loop.create_future()
467 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
468 cb = functools.partial(_release_waiter, waiter)
470 fut = ensure_future(fut, loop=loop)
471 fut.add_done_callback(cb)
473 try:
474 # wait until the future completes or the timeout
475 try:
476 await waiter
477 except exceptions.CancelledError:
478 fut.remove_done_callback(cb)
479 fut.cancel()
480 raise
482 if fut.done():
483 return fut.result()
484 else:
485 fut.remove_done_callback(cb)
486 # We must ensure that the task is not running
487 # after wait_for() returns.
488 # See https://bugs.python.org/issue32751
489 await _cancel_and_wait(fut, loop=loop)
490 raise exceptions.TimeoutError()
491 finally:
492 timeout_handle.cancel()
495async def _wait(fs, timeout, return_when, loop):
496 """Internal helper for wait().
498 The fs argument must be a collection of Futures.
499 """
500 assert fs, 'Set of Futures is empty.'
501 waiter = loop.create_future()
502 timeout_handle = None
503 if timeout is not None:
504 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
505 counter = len(fs)
507 def _on_completion(f):
508 nonlocal counter
509 counter -= 1
510 if (counter <= 0 or
511 return_when == FIRST_COMPLETED or
512 return_when == FIRST_EXCEPTION and (not f.cancelled() and
513 f.exception() is not None)):
514 if timeout_handle is not None:
515 timeout_handle.cancel()
516 if not waiter.done():
517 waiter.set_result(None)
519 for f in fs:
520 f.add_done_callback(_on_completion)
522 try:
523 await waiter
524 finally:
525 if timeout_handle is not None:
526 timeout_handle.cancel()
527 for f in fs:
528 f.remove_done_callback(_on_completion)
530 done, pending = set(), set()
531 for f in fs:
532 if f.done():
533 done.add(f)
534 else:
535 pending.add(f)
536 return done, pending
539async def _cancel_and_wait(fut, loop):
540 """Cancel the *fut* future or task and wait until it completes."""
542 waiter = loop.create_future()
543 cb = functools.partial(_release_waiter, waiter)
544 fut.add_done_callback(cb)
546 try:
547 fut.cancel()
548 # We cannot wait on *fut* directly to make
549 # sure _cancel_and_wait itself is reliably cancellable.
550 await waiter
551 finally:
552 fut.remove_done_callback(cb)
555# This is *not* a @coroutine! It is just an iterator (yielding Futures).
556def as_completed(fs, *, loop=None, timeout=None):
557 """Return an iterator whose values are coroutines.
559 When waiting for the yielded coroutines you'll get the results (or
560 exceptions!) of the original Futures (or coroutines), in the order
561 in which and as soon as they complete.
563 This differs from PEP 3148; the proper way to use this is:
565 for f in as_completed(fs):
566 result = await f # The 'await' may raise.
567 # Use result.
569 If a timeout is specified, the 'await' will raise
570 TimeoutError when the timeout occurs before all Futures are done.
572 Note: The futures 'f' are not necessarily members of fs.
573 """
574 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
575 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
577 from .queues import Queue # Import here to avoid circular import problem.
578 done = Queue(loop=loop)
580 if loop is None:
581 loop = events.get_event_loop()
582 else:
583 warnings.warn("The loop argument is deprecated since Python 3.8, "
584 "and scheduled for removal in Python 3.10.",
585 DeprecationWarning, stacklevel=2)
586 todo = {ensure_future(f, loop=loop) for f in set(fs)}
587 timeout_handle = None
589 def _on_timeout():
590 for f in todo:
591 f.remove_done_callback(_on_completion)
592 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
593 todo.clear() # Can't do todo.remove(f) in the loop.
595 def _on_completion(f):
596 if not todo:
597 return # _on_timeout() was here first.
598 todo.remove(f)
599 done.put_nowait(f)
600 if not todo and timeout_handle is not None:
601 timeout_handle.cancel()
603 async def _wait_for_one():
604 f = await done.get()
605 if f is None:
606 # Dummy value from _on_timeout().
607 raise exceptions.TimeoutError
608 return f.result() # May raise f.exception().
610 for f in todo:
611 f.add_done_callback(_on_completion)
612 if todo and timeout is not None:
613 timeout_handle = loop.call_later(timeout, _on_timeout)
614 for _ in range(len(todo)):
615 yield _wait_for_one()
618@types.coroutine
619def __sleep0():
620 """Skip one event loop run cycle.
622 This is a private helper for 'asyncio.sleep()', used
623 when the 'delay' is set to 0. It uses a bare 'yield'
624 expression (which Task.__step knows how to handle)
625 instead of creating a Future object.
626 """
627 yield
630async def sleep(delay, result=None, *, loop=None):
631 """Coroutine that completes after a given time (in seconds)."""
632 if delay <= 0:
633 await __sleep0()
634 return result
636 if loop is None:
637 loop = events.get_running_loop()
638 else:
639 warnings.warn("The loop argument is deprecated since Python 3.8, "
640 "and scheduled for removal in Python 3.10.",
641 DeprecationWarning, stacklevel=2)
643 future = loop.create_future()
644 h = loop.call_later(delay,
645 futures._set_result_unless_cancelled,
646 future, result)
647 try:
648 return await future
649 finally:
650 h.cancel()
653def ensure_future(coro_or_future, *, loop=None):
654 """Wrap a coroutine or an awaitable in a future.
656 If the argument is a Future, it is returned directly.
657 """
658 if coroutines.iscoroutine(coro_or_future):
659 if loop is None:
660 loop = events.get_event_loop()
661 task = loop.create_task(coro_or_future)
662 if task._source_traceback:
663 del task._source_traceback[-1]
664 return task
665 elif futures.isfuture(coro_or_future):
666 if loop is not None and loop is not futures._get_loop(coro_or_future):
667 raise ValueError('The future belongs to a different loop than '
668 'the one specified as the loop argument')
669 return coro_or_future
670 elif inspect.isawaitable(coro_or_future):
671 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
672 else:
673 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
674 'required')
677@types.coroutine
678def _wrap_awaitable(awaitable):
679 """Helper for asyncio.ensure_future().
681 Wraps awaitable (an object with __await__) into a coroutine
682 that will later be wrapped in a Task by ensure_future().
683 """
684 return (yield from awaitable.__await__())
686_wrap_awaitable._is_coroutine = _is_coroutine
689class _GatheringFuture(futures.Future):
690 """Helper for gather().
692 This overrides cancel() to cancel all the children and act more
693 like Task.cancel(), which doesn't immediately mark itself as
694 cancelled.
695 """
697 def __init__(self, children, *, loop=None):
698 super().__init__(loop=loop)
699 self._children = children
700 self._cancel_requested = False
702 def cancel(self):
703 if self.done():
704 return False
705 ret = False
706 for child in self._children:
707 if child.cancel():
708 ret = True
709 if ret:
710 # If any child tasks were actually cancelled, we should
711 # propagate the cancellation request regardless of
712 # *return_exceptions* argument. See issue 32684.
713 self._cancel_requested = True
714 return ret
717def gather(*coros_or_futures, loop=None, return_exceptions=False):
718 """Return a future aggregating results from the given coroutines/futures.
720 Coroutines will be wrapped in a future and scheduled in the event
721 loop. They will not necessarily be scheduled in the same order as
722 passed in.
724 All futures must share the same event loop. If all the tasks are
725 done successfully, the returned future's result is the list of
726 results (in the order of the original sequence, not necessarily
727 the order of results arrival). If *return_exceptions* is True,
728 exceptions in the tasks are treated the same as successful
729 results, and gathered in the result list; otherwise, the first
730 raised exception will be immediately propagated to the returned
731 future.
733 Cancellation: if the outer Future is cancelled, all children (that
734 have not completed yet) are also cancelled. If any child is
735 cancelled, this is treated as if it raised CancelledError --
736 the outer Future is *not* cancelled in this case. (This is to
737 prevent the cancellation of one child to cause other children to
738 be cancelled.)
739 """
740 if not coros_or_futures:
741 if loop is None:
742 loop = events.get_event_loop()
743 else:
744 warnings.warn("The loop argument is deprecated since Python 3.8, "
745 "and scheduled for removal in Python 3.10.",
746 DeprecationWarning, stacklevel=2)
747 outer = loop.create_future()
748 outer.set_result([])
749 return outer
751 def _done_callback(fut):
752 nonlocal nfinished
753 nfinished += 1
755 if outer.done():
756 if not fut.cancelled():
757 # Mark exception retrieved.
758 fut.exception()
759 return
761 if not return_exceptions:
762 if fut.cancelled():
763 # Check if 'fut' is cancelled first, as
764 # 'fut.exception()' will *raise* a CancelledError
765 # instead of returning it.
766 exc = exceptions.CancelledError()
767 outer.set_exception(exc)
768 return
769 else:
770 exc = fut.exception()
771 if exc is not None:
772 outer.set_exception(exc)
773 return
775 if nfinished == nfuts:
776 # All futures are done; create a list of results
777 # and set it to the 'outer' future.
778 results = []
780 for fut in children:
781 if fut.cancelled():
782 # Check if 'fut' is cancelled first, as
783 # 'fut.exception()' will *raise* a CancelledError
784 # instead of returning it.
785 res = exceptions.CancelledError()
786 else:
787 res = fut.exception()
788 if res is None:
789 res = fut.result()
790 results.append(res)
792 if outer._cancel_requested:
793 # If gather is being cancelled we must propagate the
794 # cancellation regardless of *return_exceptions* argument.
795 # See issue 32684.
796 outer.set_exception(exceptions.CancelledError())
797 else:
798 outer.set_result(results)
800 arg_to_fut = {}
801 children = []
802 nfuts = 0
803 nfinished = 0
804 for arg in coros_or_futures:
805 if arg not in arg_to_fut:
806 fut = ensure_future(arg, loop=loop)
807 if loop is None:
808 loop = futures._get_loop(fut)
809 if fut is not arg:
810 # 'arg' was not a Future, therefore, 'fut' is a new
811 # Future created specifically for 'arg'. Since the caller
812 # can't control it, disable the "destroy pending task"
813 # warning.
814 fut._log_destroy_pending = False
816 nfuts += 1
817 arg_to_fut[arg] = fut
818 fut.add_done_callback(_done_callback)
820 else:
821 # There's a duplicate Future object in coros_or_futures.
822 fut = arg_to_fut[arg]
824 children.append(fut)
826 outer = _GatheringFuture(children, loop=loop)
827 return outer
830def shield(arg, *, loop=None):
831 """Wait for a future, shielding it from cancellation.
833 The statement
835 res = await shield(something())
837 is exactly equivalent to the statement
839 res = await something()
841 *except* that if the coroutine containing it is cancelled, the
842 task running in something() is not cancelled. From the POV of
843 something(), the cancellation did not happen. But its caller is
844 still cancelled, so the yield-from expression still raises
845 CancelledError. Note: If something() is cancelled by other means
846 this will still cancel shield().
848 If you want to completely ignore cancellation (not recommended)
849 you can combine shield() with a try/except clause, as follows:
851 try:
852 res = await shield(something())
853 except CancelledError:
854 res = None
855 """
856 if loop is not None:
857 warnings.warn("The loop argument is deprecated since Python 3.8, "
858 "and scheduled for removal in Python 3.10.",
859 DeprecationWarning, stacklevel=2)
860 inner = ensure_future(arg, loop=loop)
861 if inner.done():
862 # Shortcut.
863 return inner
864 loop = futures._get_loop(inner)
865 outer = loop.create_future()
867 def _inner_done_callback(inner):
868 if outer.cancelled():
869 if not inner.cancelled():
870 # Mark inner's result as retrieved.
871 inner.exception()
872 return
874 if inner.cancelled():
875 outer.cancel()
876 else:
877 exc = inner.exception()
878 if exc is not None:
879 outer.set_exception(exc)
880 else:
881 outer.set_result(inner.result())
884 def _outer_done_callback(outer):
885 if not inner.done():
886 inner.remove_done_callback(_inner_done_callback)
888 inner.add_done_callback(_inner_done_callback)
889 outer.add_done_callback(_outer_done_callback)
890 return outer
893def run_coroutine_threadsafe(coro, loop):
894 """Submit a coroutine object to a given event loop.
896 Return a concurrent.futures.Future to access the result.
897 """
898 if not coroutines.iscoroutine(coro):
899 raise TypeError('A coroutine object is required')
900 future = concurrent.futures.Future()
902 def callback():
903 try:
904 futures._chain_future(ensure_future(coro, loop=loop), future)
905 except (SystemExit, KeyboardInterrupt):
906 raise
907 except BaseException as exc:
908 if future.set_running_or_notify_cancel():
909 future.set_exception(exc)
910 raise
912 loop.call_soon_threadsafe(callback)
913 return future
916# WeakSet containing all alive tasks.
917_all_tasks = weakref.WeakSet()
919# Dictionary containing tasks that are currently active in
920# all running event loops. {EventLoop: Task}
921_current_tasks = {}
924def _register_task(task):
925 """Register a new task in asyncio as executed by loop."""
926 _all_tasks.add(task)
929def _enter_task(loop, task):
930 current_task = _current_tasks.get(loop)
931 if current_task is not None:
932 raise RuntimeError(f"Cannot enter into task {task!r} while another "
933 f"task {current_task!r} is being executed.")
934 _current_tasks[loop] = task
937def _leave_task(loop, task):
938 current_task = _current_tasks.get(loop)
939 if current_task is not task:
940 raise RuntimeError(f"Leaving task {task!r} does not match "
941 f"the current task {current_task!r}.")
942 del _current_tasks[loop]
945def _unregister_task(task):
946 """Unregister a task."""
947 _all_tasks.discard(task)
950_py_register_task = _register_task
951_py_unregister_task = _unregister_task
952_py_enter_task = _enter_task
953_py_leave_task = _leave_task
956try:
957 from _asyncio import (_register_task, _unregister_task,
958 _enter_task, _leave_task,
959 _all_tasks, _current_tasks)
960except ImportError:
961 pass
962else:
963 _c_register_task = _register_task
964 _c_unregister_task = _unregister_task
965 _c_enter_task = _enter_task
966 _c_leave_task = _leave_task