Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/gen.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""``tornado.gen`` implements generator-based coroutines.
3.. note::
5 The "decorator and generator" approach in this module is a
6 precursor to native coroutines (using ``async def`` and ``await``)
7 which were introduced in Python 3.5. Applications that do not
8 require compatibility with older versions of Python should use
9 native coroutines instead. Some parts of this module are still
10 useful with native coroutines, notably `multi`, `sleep`,
11 `WaitIterator`, and `with_timeout`. Some of these functions have
12 counterparts in the `asyncio` module which may be used as well,
13 although the two may not necessarily be 100% compatible.
15Coroutines provide an easier way to work in an asynchronous
16environment than chaining callbacks. Code using coroutines is
17technically asynchronous, but it is written as a single generator
18instead of a collection of separate functions.
20For example, here's a coroutine-based handler:
22.. testcode::
24 class GenAsyncHandler(RequestHandler):
25 @gen.coroutine
26 def get(self):
27 http_client = AsyncHTTPClient()
28 response = yield http_client.fetch("http://example.com")
29 do_something_with_response(response)
30 self.render("template.html")
32.. testoutput::
33 :hide:
35Asynchronous functions in Tornado return an ``Awaitable`` or `.Future`;
36yielding this object returns its result.
38You can also yield a list or dict of other yieldable objects, which
39will be started at the same time and run in parallel; a list or dict
40of results will be returned when they are all finished:
42.. testcode::
44 @gen.coroutine
45 def get(self):
46 http_client = AsyncHTTPClient()
47 response1, response2 = yield [http_client.fetch(url1),
48 http_client.fetch(url2)]
49 response_dict = yield dict(response3=http_client.fetch(url3),
50 response4=http_client.fetch(url4))
51 response3 = response_dict['response3']
52 response4 = response_dict['response4']
54.. testoutput::
55 :hide:
57If ``tornado.platform.twisted`` is imported, it is also possible to
58yield Twisted's ``Deferred`` objects. See the `convert_yielded`
59function to extend this mechanism.
61.. versionchanged:: 3.2
62 Dict support added.
64.. versionchanged:: 4.1
65 Support added for yielding ``asyncio`` Futures and Twisted Deferreds
66 via ``singledispatch``.
68"""
70import asyncio
71import builtins
72import collections
73from collections.abc import Generator
74import concurrent.futures
75import datetime
76import functools
77from functools import singledispatch
78from inspect import isawaitable
79import sys
80import types
82from tornado.concurrent import (
83 Future,
84 is_future,
85 chain_future,
86 future_set_exc_info,
87 future_add_done_callback,
88 future_set_result_unless_cancelled,
89)
90from tornado.ioloop import IOLoop
91from tornado.log import app_log
92from tornado.util import TimeoutError
94try:
95 import contextvars
96except ImportError:
97 contextvars = None # type: ignore
99import typing
100from typing import Union, Any, Callable, List, Type, Tuple, Awaitable, Dict, overload
102if typing.TYPE_CHECKING:
103 from typing import Sequence, Deque, Optional, Set, Iterable # noqa: F401
# Generic type variable used in this module's signatures to tie a coroutine's
# return value to the ``Future[_T]`` that wraps it.
_T = typing.TypeVar("_T")

# Static-typing description of the objects accepted wherever a "yieldable" is
# expected in this module (e.g. `with_timeout`, `convert_yielded`, `Runner`).
_Yieldable = Union[
    None, Awaitable, List[Awaitable], Dict[Any, Awaitable], concurrent.futures.Future
]
class KeyReuseError(Exception):
    """Exception type exported by this module.

    NOTE(review): nothing in this module raises it; presumably it is kept
    for backward compatibility -- confirm before removing.
    """
class UnknownKeyError(Exception):
    """Exception type exported by this module.

    NOTE(review): nothing in this module raises it; presumably it is kept
    for backward compatibility -- confirm before removing.
    """
class LeakedCallbackError(Exception):
    """Exception type exported by this module.

    NOTE(review): nothing in this module raises it; presumably it is kept
    for backward compatibility -- confirm before removing.
    """
class BadYieldError(Exception):
    """Raised when a coroutine yields an object that cannot be awaited.

    `convert_yielded` raises this for any object it does not know how to
    turn into a `.Future`.
    """
class ReturnValueIgnoredError(Exception):
    """Exception type exported by this module.

    NOTE(review): nothing in this module raises it; presumably it is kept
    for backward compatibility -- confirm before removing.
    """
132def _value_from_stopiteration(e: Union[StopIteration, "Return"]) -> Any:
133 try:
134 # StopIteration has a value attribute beginning in py33.
135 # So does our Return class.
136 return e.value
137 except AttributeError:
138 pass
139 try:
140 # Cython backports coroutine functionality by putting the value in
141 # e.args[0].
142 return e.args[0]
143 except (AttributeError, IndexError):
144 return None
def _create_future() -> Future:
    """Create a new `.Future` whose recorded creation stack omits this module.

    If the future carries a ``_source_traceback`` (asyncio debug info), the
    trailing entries that point into this file are removed so the recorded
    stack ends at the caller's frame.
    """
    new_future = Future()  # type: Future
    # Fixup asyncio debug info by removing extraneous stack entries.
    frames = getattr(new_future, "_source_traceback", ())
    # Each traceback entry is equivalent to a
    # (filename, self.lineno, self.name, self.line) tuple, so index 0 of the
    # last entry is the innermost frame's filename.
    while frames and frames[-1][0] == __file__:
        del frames[-1]
    return new_future
162def _fake_ctx_run(f: Callable[..., _T], *args: Any, **kw: Any) -> _T:
163 return f(*args, **kw)
@overload
def coroutine(
    func: Callable[..., "Generator[Any, Any, _T]"]
) -> Callable[..., "Future[_T]"]: ...


@overload
def coroutine(func: Callable[..., _T]) -> Callable[..., "Future[_T]"]: ...


def coroutine(
    func: Union[Callable[..., "Generator[Any, Any, _T]"], Callable[..., _T]]
) -> Callable[..., "Future[_T]"]:
    """Decorator for asynchronous generators.

    The decorated function is called immediately. If it returns a
    generator, the generator is advanced once inline; a `Runner` is only
    created when that first step actually yields something. If it returns
    any other value, that value becomes the result of the returned
    `.Future`.

    For compatibility with older versions of Python, coroutines may
    also "return" by raising the special exception `Return(value)
    <Return>`.

    Functions with this decorator return a `.Future`.

    .. warning::

       When exceptions occur inside a coroutine, the exception
       information will be stored in the `.Future` object. You must
       examine the result of the `.Future` object, or the exception
       may go unnoticed by your code. This means yielding the function
       if called from another coroutine, using something like
       `.IOLoop.run_sync` for top-level calls, or passing the `.Future`
       to `.IOLoop.add_future`.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       awaitable object instead.

    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # type: (*Any, **Any) -> Future[_T]
        # This function is type-annotated with a comment to work around
        # https://bitbucket.org/pypy/pypy/issues/2868/segfault-with-args-type-annotation-in
        future = _create_future()
        # Run the function (and later every generator step) inside a copy of
        # the caller's context when contextvars is available; otherwise fall
        # back to a plain call.
        if contextvars is not None:
            ctx_run = contextvars.copy_context().run  # type: Callable
        else:
            ctx_run = _fake_ctx_run
        try:
            result = ctx_run(func, *args, **kwargs)
        except (Return, StopIteration) as e:
            # A non-generator function "returned" by raising; the value is
            # delivered by the common code at the bottom of wrapper.
            result = _value_from_stopiteration(e)
        except Exception:
            future_set_exc_info(future, sys.exc_info())
            try:
                return future
            finally:
                # Avoid circular references
                future = None  # type: ignore
        else:
            if isinstance(result, Generator):
                # Inline the first iteration of Runner.run.  This lets us
                # avoid the cost of creating a Runner when the coroutine
                # never actually yields, which in turn allows us to
                # use "optional" coroutines in critical path code without
                # performance penalty for the synchronous case.
                try:
                    yielded = ctx_run(next, result)
                except (StopIteration, Return) as e:
                    future_set_result_unless_cancelled(
                        future, _value_from_stopiteration(e)
                    )
                except Exception:
                    future_set_exc_info(future, sys.exc_info())
                else:
                    # Provide strong references to Runner objects as long
                    # as their result future objects also have strong
                    # references (typically from the parent coroutine's
                    # Runner). This keeps the coroutine's Runner alive.
                    # We do this by exploiting the public API
                    # add_done_callback() instead of putting a private
                    # attribute on the Future.
                    # (GitHub issues #1769, #2229).
                    runner = Runner(ctx_run, result, future, yielded)
                    future.add_done_callback(lambda _: runner)
                yielded = None
                try:
                    return future
                finally:
                    # Subtle memory optimization: if next() raised an exception,
                    # the future's exc_info contains a traceback which
                    # includes this stack frame.  This creates a cycle,
                    # which will be collected at the next full GC but has
                    # been shown to greatly increase memory usage of
                    # benchmarks (relative to the refcount-based scheme
                    # used in the absence of cycles).  We can avoid the
                    # cycle by clearing the local variable after we return it.
                    future = None  # type: ignore
        # Reached only for non-generator results (including values recovered
        # from Return/StopIteration above): resolve the future right away.
        future_set_result_unless_cancelled(future, result)
        return future

    wrapper.__wrapped__ = func  # type: ignore
    # Marker attribute read by is_coroutine_function().
    wrapper.__tornado_coroutine__ = True  # type: ignore
    return wrapper
def is_coroutine_function(func: Any) -> bool:
    """Report whether *func* was produced by the `~.gen.coroutine` decorator.

    The check reads the ``__tornado_coroutine__`` marker attribute and
    returns ``False`` when the attribute is absent.

    .. versionadded:: 4.5
    """
    try:
        return func.__tornado_coroutine__
    except AttributeError:
        return False
class Return(Exception):
    """Special exception used to return a value from a `coroutine`.

    When a coroutine raises this exception, the ``value`` argument becomes
    the coroutine's result::

        @gen.coroutine
        def fetch_json(url):
            response = yield AsyncHTTPClient().fetch(url)
            raise gen.Return(json_decode(response.body))

    In Python 3.3, this exception is no longer necessary: the ``return``
    statement can be used directly to return a value (previously
    ``yield`` and ``return`` with a value could not be combined in the
    same function).

    By analogy with the return statement, the value argument is optional,
    but it is never necessary to ``raise gen.Return()``.  The ``return``
    statement can be used with no arguments instead.
    """

    def __init__(self, value: Any = None) -> None:
        # Passing the value to the base initializer stores it as
        # ``self.args == (value,)``.
        # Cython recognizes subclasses of StopIteration with a .args tuple.
        super().__init__(value)
        self.value = value
class WaitIterator:
    """Provides an iterator to yield the results of awaitables as they finish.

    Yielding a set of awaitables like this:

    ``results = yield [awaitable1, awaitable2]``

    pauses the coroutine until both ``awaitable1`` and ``awaitable2``
    return, and then restarts the coroutine with the results of both
    awaitables. If either awaitable raises an exception, the
    expression will raise that exception and all the results will be
    lost.

    If you need to get the result of each awaitable as soon as possible,
    or if you need the result of some awaitables even if others produce
    errors, you can use ``WaitIterator``::

      wait_iterator = gen.WaitIterator(awaitable1, awaitable2)
      while not wait_iterator.done():
          try:
              result = yield wait_iterator.next()
          except Exception as e:
              print("Error {} from {}".format(e, wait_iterator.current_future))
          else:
              print("Result {} received from {} at {}".format(
                  result, wait_iterator.current_future,
                  wait_iterator.current_index))

    Because results are returned as soon as they are available the
    output from the iterator *will not be in the same order as the
    input arguments*. If you need to know which future produced the
    current result, you can use the attributes
    ``WaitIterator.current_future``, or ``WaitIterator.current_index``
    to get the index of the awaitable from the input list. (if keyword
    arguments were used in the construction of the `WaitIterator`,
    ``current_index`` will use the corresponding keyword).

    `WaitIterator` also implements the async iterator protocol, so it
    can be used with the ``async for`` statement (note that in this
    version the entire iteration is aborted if any value raises an
    exception, while the previous example can continue past individual
    errors)::

      wait_iterator = gen.WaitIterator(future1, future2)
      async for result in wait_iterator:
          print("Result {} received from {} at {}".format(
              result, wait_iterator.current_future,
              wait_iterator.current_index))

    .. versionadded:: 4.1

    .. versionchanged:: 4.3
       Added ``async for`` support in Python 3.5.

    """

    # Class-level placeholder carrying the attribute's type; __init__ always
    # rebinds it to a fresh per-instance dict, so this object is never mutated
    # through an initialized instance.
    _unfinished = {}  # type: Dict[Future, Union[int, str]]

    def __init__(self, *args: Future, **kwargs: Future) -> None:
        """Start watching the given futures.

        Futures may be passed positionally (identified later by index) or
        by keyword (identified later by keyword name), but not both.

        :raises ValueError: if both positional and keyword futures are given.
        """
        if args and kwargs:
            raise ValueError("You must provide args or kwargs, not both")

        # Map each future back to the key (index or keyword) it was given as.
        if kwargs:
            self._unfinished = {f: k for (k, f) in kwargs.items()}
            futures = list(kwargs.values())  # type: Sequence[Future]
        else:
            self._unfinished = {f: i for (i, f) in enumerate(args)}
            futures = args

        # Futures that completed while no `next` call was waiting for them.
        self._finished = collections.deque()  # type: Deque[Future]
        self.current_index = None  # type: Optional[Union[str, int]]
        self.current_future = None  # type: Optional[Future]
        # The future handed out by the most recent `next` call, until it
        # has been connected to a finished input future.
        self._running_future = None  # type: Optional[Future]

        for future in futures:
            future_add_done_callback(future, self._done_callback)

    def done(self) -> bool:
        """Returns True if this iterator has no more results."""
        if self._finished or self._unfinished:
            return False
        # Clear the 'current' values when iteration is done.
        self.current_index = self.current_future = None
        return True

    def next(self) -> Future:
        """Returns a `.Future` that will yield the next available result.

        Note that this `.Future` will not be the same object as any of
        the inputs.
        """
        self._running_future = Future()

        # A result is already queued: hand it over immediately.
        if self._finished:
            return self._return_result(self._finished.popleft())

        return self._running_future

    def _done_callback(self, done: Future) -> None:
        """Deliver a finished input future to a waiter, or queue it."""
        if self._running_future and not self._running_future.done():
            self._return_result(done)
        else:
            self._finished.append(done)

    def _return_result(self, done: Future) -> Future:
        """Sets the returned future's state to that of the future we
        yielded, and sets the current future for the iterator.

        :raises Exception: if no future from `next` is currently waiting.
        """
        if self._running_future is None:
            raise Exception("no future is running")
        chain_future(done, self._running_future)

        res = self._running_future
        self._running_future = None
        self.current_future = done
        self.current_index = self._unfinished.pop(done)

        return res

    def __aiter__(self) -> typing.AsyncIterator:
        return self

    def __anext__(self) -> Future:
        """Return the next result future, ending iteration once `done`."""
        if self.done():
            raise StopAsyncIteration()
        return self.next()
def multi(
    children: Union[List[_Yieldable], Dict[Any, _Yieldable]],
    quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
) -> "Union[Future[List], Future[Dict]]":
    """Runs multiple asynchronous operations in parallel.

    ``children`` is either a list of yieldable objects or a dict whose
    values are yieldable objects. The return value is a new yieldable
    that resolves to a structure of the same shape holding the results:
    a list in the same order for a list, a dict with the same keys for a
    dict.

    In other words, ``results = yield multi(list_of_futures)`` produces
    the same ``results`` as::

        results = []
        for future in list_of_futures:
            results.append(yield future)

    If any children raise exceptions, ``multi()`` raises the first one
    (in ``children`` order). All others are logged, unless they are of
    types contained in the ``quiet_exceptions`` argument.

    In a ``yield``-based coroutine, it is not normally necessary to
    call this function directly, since the coroutine runner will
    do it automatically when a list or dict is yielded. However,
    it is necessary in ``await``-based coroutines, or to pass
    the ``quiet_exceptions`` argument.

    This function is available under the names ``multi()`` and ``Multi()``
    for historical reasons.

    Cancelling a `.Future` returned by ``multi()`` does not cancel its
    children. `asyncio.gather` is similar to ``multi()``, but it does
    cancel its children.

    .. versionchanged:: 4.2
       If multiple yieldables fail, any exceptions after the first
       (which is raised) will be logged. Added the ``quiet_exceptions``
       argument to suppress this logging for selected exception types.

    .. versionchanged:: 4.3
       Replaced the class ``Multi`` and the function ``multi_future``
       with a unified function ``multi``. Added support for yieldables
       other than ``YieldPoint`` and `.Future`.

    """
    # All of the work is delegated to multi_future.
    combined = multi_future(children, quiet_exceptions=quiet_exceptions)
    return combined


# Historical alias for `multi`.
Multi = multi
def multi_future(
    children: Union[List[_Yieldable], Dict[Any, _Yieldable]],
    quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
) -> "Union[Future[List], Future[Dict]]":
    """Wait for multiple asynchronous futures in parallel.

    Every child is converted with `convert_yielded`. The returned future
    resolves once all children have finished: to a list of results for a
    list input, or to a dict with the same keys for a dict input. An
    empty input resolves immediately.

    Since Tornado 6.0, this function is exactly the same as `multi`.

    .. versionadded:: 4.0

    .. versionchanged:: 4.2
       If multiple ``Futures`` fail, any exceptions after the first (which is
       raised) will be logged. Added the ``quiet_exceptions``
       argument to suppress this logging for selected exception types.

    .. deprecated:: 4.3
       Use `multi` instead.
    """
    if isinstance(children, dict):
        keys = list(children.keys())  # type: Optional[List]
        children_seq = children.values()  # type: Iterable
    else:
        keys, children_seq = None, children
    children_futs = [convert_yielded(child) for child in children_seq]
    assert all(is_future(c) or isinstance(c, _NullFuture) for c in children_futs)
    pending = set(children_futs)

    future = _create_future()
    if not children_futs:
        empty_result = {} if keys is not None else []
        future_set_result_unless_cancelled(future, empty_result)

    def callback(fut: Future) -> None:
        pending.remove(fut)
        if pending:
            # Still waiting for at least one other child.
            return
        results = []
        for child in children_futs:
            try:
                results.append(child.result())
            except Exception as e:
                if not future.done():
                    # First failure (in children order) becomes the outcome.
                    future_set_exc_info(future, sys.exc_info())
                elif not isinstance(e, quiet_exceptions):
                    app_log.error(
                        "Multiple exceptions in yield list", exc_info=True
                    )
        if future.done():
            return
        outcome = results if keys is None else dict(zip(keys, results))
        future_set_result_unless_cancelled(future, outcome)

    # The same future may appear more than once in children; attach the
    # callback only once per distinct future.
    subscribed = set()  # type: Set[Future]
    for child in children_futs:
        if child in subscribed:
            continue
        subscribed.add(child)
        future_add_done_callback(child, callback)
    return future
def maybe_future(x: Any) -> Future:
    """Converts ``x`` into a `.Future`.

    A value that is already a `.Future` is returned unchanged; anything
    else is placed in a new, already-resolved `.Future`. This is suitable
    for use as ``result = yield gen.maybe_future(f())`` when you don't know
    whether ``f()`` returns a `.Future` or not.

    .. deprecated:: 4.3
       This function only handles ``Futures``, not other yieldable objects.
       Instead of `maybe_future`, check for the non-future result types
       you expect (often just ``None``), and ``yield`` anything unknown.
    """
    if is_future(x):
        return x
    wrapped = _create_future()
    wrapped.set_result(x)
    return wrapped
def with_timeout(
    timeout: Union[float, datetime.timedelta],
    future: _Yieldable,
    quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
) -> Future:
    """Wraps a `.Future` (or other yieldable object) in a timeout.

    Raises `tornado.util.TimeoutError` if the input future does not
    complete before ``timeout``, which may be specified in any form
    allowed by `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or
    an absolute time relative to `.IOLoop.time`)

    If the wrapped `.Future` fails after it has timed out, the exception
    will be logged unless it is either of a type contained in
    ``quiet_exceptions`` (which may be an exception type or a sequence of
    types), or an ``asyncio.CancelledError``.

    The wrapped `.Future` is not canceled when the timeout expires,
    permitting it to be reused. `asyncio.wait_for` is similar to this
    function but it does cancel the wrapped `.Future` on timeout.

    .. versionadded:: 4.0

    .. versionchanged:: 4.1
       Added the ``quiet_exceptions`` argument and the logging of unhandled
       exceptions.

    .. versionchanged:: 4.4
       Added support for yieldable objects other than `.Future`.

    .. versionchanged:: 6.0.3
       ``asyncio.CancelledError`` is now always considered "quiet".

    .. versionchanged:: 6.2
       ``tornado.util.TimeoutError`` is now an alias to ``asyncio.TimeoutError``.

    """
    # It's tempting to optimize this by cancelling the input future on timeout
    # instead of creating a new one, but A) we can't know if we are the only
    # one waiting on the input future, so cancelling it might disrupt other
    # callers and B) concurrent futures can only be cancelled while they are
    # in the queue, so cancellation cannot reliably bound our waiting time.
    future_converted = convert_yielded(future)
    result = _create_future()
    chain_future(future_converted, result)
    io_loop = IOLoop.current()

    def error_callback(fut: Future) -> None:
        # Runs once the wrapped future finishes *after* the timeout fired.
        try:
            fut.result()
        except asyncio.CancelledError:
            return
        except Exception as e:
            if isinstance(e, quiet_exceptions):
                return
            app_log.error(
                "Exception in Future %r after timeout", fut, exc_info=True
            )

    def timeout_callback() -> None:
        if not result.done():
            result.set_exception(TimeoutError("Timeout"))
        # In case the wrapped future goes on to fail, log it.
        future_add_done_callback(future_converted, error_callback)

    timeout_handle = io_loop.add_timeout(timeout, timeout_callback)

    def cancel_timeout(_finished: Any) -> None:
        # The wrapped future completed, so the pending timeout is moot.
        io_loop.remove_timeout(timeout_handle)

    if isinstance(future_converted, Future):
        # We know this future will resolve on the IOLoop, so we don't
        # need the extra thread-safety of IOLoop.add_future (and we also
        # don't care about StackContext here).
        future_add_done_callback(future_converted, cancel_timeout)
    else:
        # concurrent.futures.Futures may resolve on any thread, so we
        # need to route them back to the IOLoop.
        io_loop.add_future(future_converted, cancel_timeout)
    return result
def sleep(duration: float) -> "Future[None]":
    """Return a `.Future` that resolves after the given number of seconds.

    Yielding the result from a coroutine gives a non-blocking
    counterpart to `time.sleep` (which should not be used in coroutines
    because it is blocking)::

        yield gen.sleep(0.5)

    Note that calling this function on its own does nothing; you must
    wait on the `.Future` it returns (usually by yielding it).

    .. versionadded:: 4.1
    """
    f = _create_future()

    def wake() -> None:
        # Resolve with None unless the caller cancelled the future meanwhile.
        future_set_result_unless_cancelled(f, None)

    IOLoop.current().call_later(duration, wake)
    return f
677class _NullFuture(object):
678 """_NullFuture resembles a Future that finished with a result of None.
680 It's not actually a `Future` to avoid depending on a particular event loop.
681 Handled as a special case in the coroutine runner.
683 We lie and tell the type checker that a _NullFuture is a Future so
684 we don't have to leak _NullFuture into lots of public APIs. But
685 this means that the type checker can't warn us when we're passing
686 a _NullFuture into a code path that doesn't understand what to do
687 with it.
688 """
690 def result(self) -> None:
691 return None
693 def done(self) -> bool:
694 return True
# _null_future is used as a dummy value in the coroutine runner. It differs
# from moment in that moment always adds a delay of one IOLoop iteration
# while _null_future is processed as soon as possible.
#
# Both sentinels are _NullFuture instances that are cast to Future purely for
# the type checker; code in this module recognizes them by identity (``is``).
_null_future = typing.cast(Future, _NullFuture())

moment = typing.cast(Future, _NullFuture())
# The documentation is attached to the instance because ``moment`` is an
# object rather than a function or class.
moment.__doc__ = """A special object which may be yielded to allow the IOLoop to run for
one iteration.

This is not needed in normal use but it can be helpful in long-running
coroutines that are likely to yield Futures that are ready instantly.

Usage: ``yield gen.moment``

In native coroutines, the equivalent of ``yield gen.moment`` is
``await asyncio.sleep(0)``.

.. versionadded:: 4.0

.. deprecated:: 4.5
   ``yield None`` (or ``yield`` with no argument) is now equivalent to
    ``yield gen.moment``.
"""
class Runner(object):
    """Internal implementation of `tornado.gen.coroutine`.

    Drives a generator to completion: each yielded object is converted to
    a future, and the generator is resumed with that future's result (or
    has its exception thrown into it) once the future is done.

    The results of the generator are stored in ``result_future`` (a
    `.Future`)
    """

    def __init__(
        self,
        ctx_run: Callable,
        gen: "Generator[_Yieldable, Any, _T]",
        result_future: "Future[_T]",
        first_yielded: _Yieldable,
    ) -> None:
        """Take over a generator that has already produced its first value.

        :arg ctx_run: callable used to invoke `run` and `handle_yield`
            (``ctx_run(f, *args)``).
        :arg gen: the generator to drive.
        :arg result_future: future that receives the generator's final
            result or exception.
        :arg first_yielded: the value the generator yielded on its first
            step, which was performed by the caller.
        """
        self.ctx_run = ctx_run
        self.gen = gen
        self.result_future = result_future
        # The future the generator is currently waiting on.
        self.future = _null_future  # type: Union[None, Future]
        # True while run() is executing; guards against re-entrant calls.
        self.running = False
        # True once the generator has returned or raised.
        self.finished = False
        self.io_loop = IOLoop.current()
        # handle_yield returns True when the first yielded future is already
        # done, in which case the generator can be resumed right away.
        if self.ctx_run(self.handle_yield, first_yielded):
            # Drop the local references before entering run().
            gen = result_future = first_yielded = None  # type: ignore
            self.ctx_run(self.run)

    def run(self) -> None:
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if future is None:
                    raise Exception("No pending future")
                if not future.done():
                    return
                self.future = None
                try:
                    try:
                        value = future.result()
                    except Exception as e:
                        # Save the exception for later. It's important that
                        # gen.throw() not be called inside this try/except block
                        # because that makes sys.exc_info behave unexpectedly.
                        exc: Optional[Exception] = e
                    else:
                        exc = None
                    finally:
                        future = None

                    # Resume the generator: throw the saved exception into
                    # it, or send it the future's result.
                    if exc is not None:
                        try:
                            yielded = self.gen.throw(exc)
                        finally:
                            # Break up a circular reference for faster GC on
                            # CPython.
                            del exc
                    else:
                        yielded = self.gen.send(value)

                except (StopIteration, Return) as e:
                    # The generator finished normally; publish its value.
                    self.finished = True
                    self.future = _null_future
                    future_set_result_unless_cancelled(
                        self.result_future, _value_from_stopiteration(e)
                    )
                    self.result_future = None  # type: ignore
                    return
                except Exception:
                    # The generator raised; publish the exception.
                    self.finished = True
                    self.future = _null_future
                    future_set_exc_info(self.result_future, sys.exc_info())
                    self.result_future = None  # type: ignore
                    return
                # A False return means the new future is not ready yet and a
                # later call to run() has been scheduled.
                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False

    def handle_yield(self, yielded: _Yieldable) -> bool:
        """Store the future for a yielded object and arrange to resume.

        Returns True if the stored future is already done (the caller may
        continue immediately), or False if a later call to `run` has been
        scheduled instead.
        """
        try:
            self.future = convert_yielded(yielded)
        except BadYieldError:
            # Deliver the error into the generator through a failed future.
            self.future = Future()
            future_set_exc_info(self.future, sys.exc_info())

        if self.future is moment:
            # Resume from an IOLoop callback rather than immediately.
            self.io_loop.add_callback(self.ctx_run, self.run)
            return False
        elif self.future is None:
            raise Exception("no pending future")
        elif not self.future.done():

            def inner(f: Any) -> None:
                # Break a reference cycle to speed GC.
                f = None  # noqa: F841
                self.ctx_run(self.run)

            self.io_loop.add_future(self.future, inner)
            return False
        return True

    def handle_exception(
        self, typ: Type[Exception], value: Exception, tb: types.TracebackType
    ) -> bool:
        """Inject an exception into the generator if it is currently idle.

        When the runner is neither running nor finished, the pending future
        is replaced by one that failed with the given exception info and
        `run` is invoked; True is returned. Otherwise nothing happens and
        False is returned.
        """
        if not self.running and not self.finished:
            self.future = Future()
            future_set_exc_info(self.future, (typ, value, tb))
            self.ctx_run(self.run)
            return True
        else:
            return False
def _wrap_awaitable(awaitable: Awaitable) -> Future:
    """Convert an awaitable into a future tracked by the current IOLoop."""
    # Note that we use ensure_future, which handles both awaitables
    # and coroutines, rather than create_task, which only accepts
    # coroutines. (ensure_future calls create_task if given a coroutine)
    task = asyncio.ensure_future(awaitable)
    # See comments on IOLoop._pending_tasks.
    current_loop = IOLoop.current()
    current_loop._register_task(task)

    def forget(finished: Any) -> None:
        # Unregister the task again once it has completed.
        current_loop._unregister_task(finished)

    task.add_done_callback(forget)
    return task
def convert_yielded(yielded: _Yieldable) -> Future:
    """Convert a yielded object into a `.Future`.

    The default implementation accepts ``None``, lists, dictionaries,
    Futures and other awaitables. This has the side effect of starting
    any coroutines that did not start themselves, similar to
    `asyncio.ensure_future`.

    This function is wrapped with `functools.singledispatch`, so it may
    be extended to support additional types.  For example::

        @convert_yielded.register(asyncio.Future)
        def _(asyncio_future):
            return tornado.platform.asyncio.to_tornado_future(asyncio_future)

    Raises `BadYieldError` for objects that cannot be converted.

    .. versionadded:: 4.1

    """
    # Sentinels first: they are compared by identity.
    if yielded is None or yielded is moment:
        return moment
    if yielded is _null_future:
        return _null_future
    if isinstance(yielded, (list, dict)):
        return multi(yielded)  # type: ignore
    if is_future(yielded):
        return typing.cast(Future, yielded)
    if isawaitable(yielded):
        return _wrap_awaitable(yielded)  # type: ignore
    raise BadYieldError("yielded unknown object %r" % (yielded,))


convert_yielded = singledispatch(convert_yielded)