Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/gen.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

317 statements  

1"""``tornado.gen`` implements generator-based coroutines. 

2 

3.. note:: 

4 

5 The "decorator and generator" approach in this module is a 

6 precursor to native coroutines (using ``async def`` and ``await``) 

7 which were introduced in Python 3.5. Applications that do not 

8 require compatibility with older versions of Python should use 

9 native coroutines instead. Some parts of this module are still 

10 useful with native coroutines, notably `multi`, `sleep`, 

11 `WaitIterator`, and `with_timeout`. Some of these functions have 

12 counterparts in the `asyncio` module which may be used as well, 

13 although the two may not necessarily be 100% compatible. 

14 

15Coroutines provide an easier way to work in an asynchronous 

16environment than chaining callbacks. Code using coroutines is 

17technically asynchronous, but it is written as a single generator 

18instead of a collection of separate functions. 

19 

20For example, here's a coroutine-based handler: 

21 

22.. testcode:: 

23 

24 class GenAsyncHandler(RequestHandler): 

25 @gen.coroutine 

26 def get(self): 

27 http_client = AsyncHTTPClient() 

28 response = yield http_client.fetch("http://example.com") 

29 do_something_with_response(response) 

30 self.render("template.html") 

31 

32.. testoutput:: 

33 :hide: 

34 

35Asynchronous functions in Tornado return an ``Awaitable`` or `.Future`; 

36yielding this object returns its result. 

37 

38You can also yield a list or dict of other yieldable objects, which 

39will be started at the same time and run in parallel; a list or dict 

40of results will be returned when they are all finished: 

41 

42.. testcode:: 

43 

44 @gen.coroutine 

45 def get(self): 

46 http_client = AsyncHTTPClient() 

47 response1, response2 = yield [http_client.fetch(url1), 

48 http_client.fetch(url2)] 

49 response_dict = yield dict(response3=http_client.fetch(url3), 

50 response4=http_client.fetch(url4)) 

51 response3 = response_dict['response3'] 

52 response4 = response_dict['response4'] 

53 

54.. testoutput:: 

55 :hide: 

56 

57If ``tornado.platform.twisted`` is imported, it is also possible to 

58yield Twisted's ``Deferred`` objects. See the `convert_yielded` 

59function to extend this mechanism. 

60 

61.. versionchanged:: 3.2 

62 Dict support added. 

63 

64.. versionchanged:: 4.1 

65 Support added for yielding ``asyncio`` Futures and Twisted Deferreds 

66 via ``singledispatch``. 

67 

68""" 

69 

70import asyncio 

71import builtins 

72import collections 

73from collections.abc import Generator 

74import concurrent.futures 

75import datetime 

76import functools 

77from functools import singledispatch 

78from inspect import isawaitable 

79import sys 

80import types 

81 

82from tornado.concurrent import ( 

83 Future, 

84 is_future, 

85 chain_future, 

86 future_set_exc_info, 

87 future_add_done_callback, 

88 future_set_result_unless_cancelled, 

89) 

90from tornado.ioloop import IOLoop 

91from tornado.log import app_log 

92from tornado.util import TimeoutError 

93 

94try: 

95 import contextvars 

96except ImportError: 

97 contextvars = None # type: ignore 

98 

99import typing 

100from typing import Union, Any, Callable, List, Type, Tuple, Awaitable, Dict, overload 

101 

102if typing.TYPE_CHECKING: 

103 from typing import Sequence, Deque, Optional, Set, Iterable # noqa: F401 

104 

105_T = typing.TypeVar("_T") 

106 

107_Yieldable = Union[ 

108 None, Awaitable, List[Awaitable], Dict[Any, Awaitable], concurrent.futures.Future 

109] 

110 

111 

112class KeyReuseError(Exception): 

113 pass 

114 

115 

116class UnknownKeyError(Exception): 

117 pass 

118 

119 

120class LeakedCallbackError(Exception): 

121 pass 

122 

123 

124class BadYieldError(Exception): 

125 pass 

126 

127 

128class ReturnValueIgnoredError(Exception): 

129 pass 

130 

131 

132def _value_from_stopiteration(e: Union[StopIteration, "Return"]) -> Any: 

133 try: 

134 # StopIteration has a value attribute beginning in py33. 

135 # So does our Return class. 

136 return e.value 

137 except AttributeError: 

138 pass 

139 try: 

140 # Cython backports coroutine functionality by putting the value in 

141 # e.args[0]. 

142 return e.args[0] 

143 except (AttributeError, IndexError): 

144 return None 

145 

146 

147def _create_future() -> Future: 

148 future = Future() # type: Future 

149 # Fixup asyncio debug info by removing extraneous stack entries 

150 source_traceback = getattr(future, "_source_traceback", ()) 

151 while source_traceback: 

152 # Each traceback entry is equivalent to a 

153 # (filename, self.lineno, self.name, self.line) tuple 

154 filename = source_traceback[-1][0] 

155 if filename == __file__: 

156 del source_traceback[-1] 

157 else: 

158 break 

159 return future 

160 

161 

162def _fake_ctx_run(f: Callable[..., _T], *args: Any, **kw: Any) -> _T: 

163 return f(*args, **kw) 

164 

165 

166@overload 

167def coroutine( 

168 func: Callable[..., "Generator[Any, Any, _T]"] 

169) -> Callable[..., "Future[_T]"]: ... 

170 

171 

172@overload 

173def coroutine(func: Callable[..., _T]) -> Callable[..., "Future[_T]"]: ... 

174 

175 

176def coroutine( 

177 func: Union[Callable[..., "Generator[Any, Any, _T]"], Callable[..., _T]] 

178) -> Callable[..., "Future[_T]"]: 

179 """Decorator for asynchronous generators. 

180 

181 For compatibility with older versions of Python, coroutines may 

182 also "return" by raising the special exception `Return(value) 

183 <Return>`. 

184 

185 Functions with this decorator return a `.Future`. 

186 

187 .. warning:: 

188 

189 When exceptions occur inside a coroutine, the exception 

190 information will be stored in the `.Future` object. You must 

191 examine the result of the `.Future` object, or the exception 

192 may go unnoticed by your code. This means yielding the function 

193 if called from another coroutine, using something like 

194 `.IOLoop.run_sync` for top-level calls, or passing the `.Future` 

195 to `.IOLoop.add_future`. 

196 

197 .. versionchanged:: 6.0 

198 

199 The ``callback`` argument was removed. Use the returned 

200 awaitable object instead. 

201 

202 """ 

203 

204 @functools.wraps(func) 

205 def wrapper(*args, **kwargs): 

206 # type: (*Any, **Any) -> Future[_T] 

207 # This function is type-annotated with a comment to work around 

208 # https://bitbucket.org/pypy/pypy/issues/2868/segfault-with-args-type-annotation-in 

209 future = _create_future() 

210 if contextvars is not None: 

211 ctx_run = contextvars.copy_context().run # type: Callable 

212 else: 

213 ctx_run = _fake_ctx_run 

214 try: 

215 result = ctx_run(func, *args, **kwargs) 

216 except (Return, StopIteration) as e: 

217 result = _value_from_stopiteration(e) 

218 except Exception: 

219 future_set_exc_info(future, sys.exc_info()) 

220 try: 

221 return future 

222 finally: 

223 # Avoid circular references 

224 future = None # type: ignore 

225 else: 

226 if isinstance(result, Generator): 

227 # Inline the first iteration of Runner.run. This lets us 

228 # avoid the cost of creating a Runner when the coroutine 

229 # never actually yields, which in turn allows us to 

230 # use "optional" coroutines in critical path code without 

231 # performance penalty for the synchronous case. 

232 try: 

233 yielded = ctx_run(next, result) 

234 except (StopIteration, Return) as e: 

235 future_set_result_unless_cancelled( 

236 future, _value_from_stopiteration(e) 

237 ) 

238 except Exception: 

239 future_set_exc_info(future, sys.exc_info()) 

240 else: 

241 # Provide strong references to Runner objects as long 

242 # as their result future objects also have strong 

243 # references (typically from the parent coroutine's 

244 # Runner). This keeps the coroutine's Runner alive. 

245 # We do this by exploiting the public API 

246 # add_done_callback() instead of putting a private 

247 # attribute on the Future. 

248 # (GitHub issues #1769, #2229). 

249 runner = Runner(ctx_run, result, future, yielded) 

250 future.add_done_callback(lambda _: runner) 

251 yielded = None 

252 try: 

253 return future 

254 finally: 

255 # Subtle memory optimization: if next() raised an exception, 

256 # the future's exc_info contains a traceback which 

257 # includes this stack frame. This creates a cycle, 

258 # which will be collected at the next full GC but has 

259 # been shown to greatly increase memory usage of 

260 # benchmarks (relative to the refcount-based scheme 

261 # used in the absence of cycles). We can avoid the 

262 # cycle by clearing the local variable after we return it. 

263 future = None # type: ignore 

264 future_set_result_unless_cancelled(future, result) 

265 return future 

266 

267 wrapper.__wrapped__ = func # type: ignore 

268 wrapper.__tornado_coroutine__ = True # type: ignore 

269 return wrapper 

270 

271 

272def is_coroutine_function(func: Any) -> bool: 

273 """Return whether *func* is a coroutine function, i.e. a function 

274 wrapped with `~.gen.coroutine`. 

275 

276 .. versionadded:: 4.5 

277 """ 

278 return getattr(func, "__tornado_coroutine__", False) 

279 

280 

281class Return(Exception): 

282 """Special exception to return a value from a `coroutine`. 

283 

284 If this exception is raised, its value argument is used as the 

285 result of the coroutine:: 

286 

287 @gen.coroutine 

288 def fetch_json(url): 

289 response = yield AsyncHTTPClient().fetch(url) 

290 raise gen.Return(json_decode(response.body)) 

291 

292 In Python 3.3, this exception is no longer necessary: the ``return`` 

293 statement can be used directly to return a value (previously 

294 ``yield`` and ``return`` with a value could not be combined in the 

295 same function). 

296 

297 By analogy with the return statement, the value argument is optional, 

298 but it is never necessary to ``raise gen.Return()``. The ``return`` 

299 statement can be used with no arguments instead. 

300 """ 

301 

302 def __init__(self, value: Any = None) -> None: 

303 super().__init__() 

304 self.value = value 

305 # Cython recognizes subclasses of StopIteration with a .args tuple. 

306 self.args = (value,) 

307 

308 

309class WaitIterator(object): 

310 """Provides an iterator to yield the results of awaitables as they finish. 

311 

312 Yielding a set of awaitables like this: 

313 

314 ``results = yield [awaitable1, awaitable2]`` 

315 

316 pauses the coroutine until both ``awaitable1`` and ``awaitable2`` 

317 return, and then restarts the coroutine with the results of both 

318 awaitables. If either awaitable raises an exception, the 

319 expression will raise that exception and all the results will be 

320 lost. 

321 

322 If you need to get the result of each awaitable as soon as possible, 

323 or if you need the result of some awaitables even if others produce 

324 errors, you can use ``WaitIterator``:: 

325 

326 wait_iterator = gen.WaitIterator(awaitable1, awaitable2) 

327 while not wait_iterator.done(): 

328 try: 

329 result = yield wait_iterator.next() 

330 except Exception as e: 

331 print("Error {} from {}".format(e, wait_iterator.current_future)) 

332 else: 

333 print("Result {} received from {} at {}".format( 

334 result, wait_iterator.current_future, 

335 wait_iterator.current_index)) 

336 

337 Because results are returned as soon as they are available the 

338 output from the iterator *will not be in the same order as the 

339 input arguments*. If you need to know which future produced the 

340 current result, you can use the attributes 

341 ``WaitIterator.current_future``, or ``WaitIterator.current_index`` 

342 to get the index of the awaitable from the input list. (if keyword 

343 arguments were used in the construction of the `WaitIterator`, 

344 ``current_index`` will use the corresponding keyword). 

345 

346 On Python 3.5, `WaitIterator` implements the async iterator 

347 protocol, so it can be used with the ``async for`` statement (note 

348 that in this version the entire iteration is aborted if any value 

349 raises an exception, while the previous example can continue past 

350 individual errors):: 

351 

352 async for result in gen.WaitIterator(future1, future2): 

353 print("Result {} received from {} at {}".format( 

354 result, wait_iterator.current_future, 

355 wait_iterator.current_index)) 

356 

357 .. versionadded:: 4.1 

358 

359 .. versionchanged:: 4.3 

360 Added ``async for`` support in Python 3.5. 

361 

362 """ 

363 

364 _unfinished = {} # type: Dict[Future, Union[int, str]] 

365 

366 def __init__(self, *args: Future, **kwargs: Future) -> None: 

367 if args and kwargs: 

368 raise ValueError("You must provide args or kwargs, not both") 

369 

370 if kwargs: 

371 self._unfinished = dict((f, k) for (k, f) in kwargs.items()) 

372 futures = list(kwargs.values()) # type: Sequence[Future] 

373 else: 

374 self._unfinished = dict((f, i) for (i, f) in enumerate(args)) 

375 futures = args 

376 

377 self._finished = collections.deque() # type: Deque[Future] 

378 self.current_index = None # type: Optional[Union[str, int]] 

379 self.current_future = None # type: Optional[Future] 

380 self._running_future = None # type: Optional[Future] 

381 

382 for future in futures: 

383 future_add_done_callback(future, self._done_callback) 

384 

385 def done(self) -> bool: 

386 """Returns True if this iterator has no more results.""" 

387 if self._finished or self._unfinished: 

388 return False 

389 # Clear the 'current' values when iteration is done. 

390 self.current_index = self.current_future = None 

391 return True 

392 

393 def next(self) -> Future: 

394 """Returns a `.Future` that will yield the next available result. 

395 

396 Note that this `.Future` will not be the same object as any of 

397 the inputs. 

398 """ 

399 self._running_future = Future() 

400 

401 if self._finished: 

402 return self._return_result(self._finished.popleft()) 

403 

404 return self._running_future 

405 

406 def _done_callback(self, done: Future) -> None: 

407 if self._running_future and not self._running_future.done(): 

408 self._return_result(done) 

409 else: 

410 self._finished.append(done) 

411 

412 def _return_result(self, done: Future) -> Future: 

413 """Called set the returned future's state that of the future 

414 we yielded, and set the current future for the iterator. 

415 """ 

416 if self._running_future is None: 

417 raise Exception("no future is running") 

418 chain_future(done, self._running_future) 

419 

420 res = self._running_future 

421 self._running_future = None 

422 self.current_future = done 

423 self.current_index = self._unfinished.pop(done) 

424 

425 return res 

426 

427 def __aiter__(self) -> typing.AsyncIterator: 

428 return self 

429 

430 def __anext__(self) -> Future: 

431 if self.done(): 

432 # Lookup by name to silence pyflakes on older versions. 

433 raise getattr(builtins, "StopAsyncIteration")() 

434 return self.next() 

435 

436 

437def multi( 

438 children: Union[List[_Yieldable], Dict[Any, _Yieldable]], 

439 quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (), 

440) -> "Union[Future[List], Future[Dict]]": 

441 """Runs multiple asynchronous operations in parallel. 

442 

443 ``children`` may either be a list or a dict whose values are 

444 yieldable objects. ``multi()`` returns a new yieldable 

445 object that resolves to a parallel structure containing their 

446 results. If ``children`` is a list, the result is a list of 

447 results in the same order; if it is a dict, the result is a dict 

448 with the same keys. 

449 

450 That is, ``results = yield multi(list_of_futures)`` is equivalent 

451 to:: 

452 

453 results = [] 

454 for future in list_of_futures: 

455 results.append(yield future) 

456 

457 If any children raise exceptions, ``multi()`` will raise the first 

458 one. All others will be logged, unless they are of types 

459 contained in the ``quiet_exceptions`` argument. 

460 

461 In a ``yield``-based coroutine, it is not normally necessary to 

462 call this function directly, since the coroutine runner will 

463 do it automatically when a list or dict is yielded. However, 

464 it is necessary in ``await``-based coroutines, or to pass 

465 the ``quiet_exceptions`` argument. 

466 

467 This function is available under the names ``multi()`` and ``Multi()`` 

468 for historical reasons. 

469 

470 Cancelling a `.Future` returned by ``multi()`` does not cancel its 

471 children. `asyncio.gather` is similar to ``multi()``, but it does 

472 cancel its children. 

473 

474 .. versionchanged:: 4.2 

475 If multiple yieldables fail, any exceptions after the first 

476 (which is raised) will be logged. Added the ``quiet_exceptions`` 

477 argument to suppress this logging for selected exception types. 

478 

479 .. versionchanged:: 4.3 

480 Replaced the class ``Multi`` and the function ``multi_future`` 

481 with a unified function ``multi``. Added support for yieldables 

482 other than ``YieldPoint`` and `.Future`. 

483 

484 """ 

485 return multi_future(children, quiet_exceptions=quiet_exceptions) 

486 

487 

488Multi = multi 

489 

490 

491def multi_future( 

492 children: Union[List[_Yieldable], Dict[Any, _Yieldable]], 

493 quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (), 

494) -> "Union[Future[List], Future[Dict]]": 

495 """Wait for multiple asynchronous futures in parallel. 

496 

497 Since Tornado 6.0, this function is exactly the same as `multi`. 

498 

499 .. versionadded:: 4.0 

500 

501 .. versionchanged:: 4.2 

502 If multiple ``Futures`` fail, any exceptions after the first (which is 

503 raised) will be logged. Added the ``quiet_exceptions`` 

504 argument to suppress this logging for selected exception types. 

505 

506 .. deprecated:: 4.3 

507 Use `multi` instead. 

508 """ 

509 if isinstance(children, dict): 

510 keys = list(children.keys()) # type: Optional[List] 

511 children_seq = children.values() # type: Iterable 

512 else: 

513 keys = None 

514 children_seq = children 

515 children_futs = list(map(convert_yielded, children_seq)) 

516 assert all(is_future(i) or isinstance(i, _NullFuture) for i in children_futs) 

517 unfinished_children = set(children_futs) 

518 

519 future = _create_future() 

520 if not children_futs: 

521 future_set_result_unless_cancelled(future, {} if keys is not None else []) 

522 

523 def callback(fut: Future) -> None: 

524 unfinished_children.remove(fut) 

525 if not unfinished_children: 

526 result_list = [] 

527 for f in children_futs: 

528 try: 

529 result_list.append(f.result()) 

530 except Exception as e: 

531 if future.done(): 

532 if not isinstance(e, quiet_exceptions): 

533 app_log.error( 

534 "Multiple exceptions in yield list", exc_info=True 

535 ) 

536 else: 

537 future_set_exc_info(future, sys.exc_info()) 

538 if not future.done(): 

539 if keys is not None: 

540 future_set_result_unless_cancelled( 

541 future, dict(zip(keys, result_list)) 

542 ) 

543 else: 

544 future_set_result_unless_cancelled(future, result_list) 

545 

546 listening = set() # type: Set[Future] 

547 for f in children_futs: 

548 if f not in listening: 

549 listening.add(f) 

550 future_add_done_callback(f, callback) 

551 return future 

552 

553 

554def maybe_future(x: Any) -> Future: 

555 """Converts ``x`` into a `.Future`. 

556 

557 If ``x`` is already a `.Future`, it is simply returned; otherwise 

558 it is wrapped in a new `.Future`. This is suitable for use as 

559 ``result = yield gen.maybe_future(f())`` when you don't know whether 

560 ``f()`` returns a `.Future` or not. 

561 

562 .. deprecated:: 4.3 

563 This function only handles ``Futures``, not other yieldable objects. 

564 Instead of `maybe_future`, check for the non-future result types 

565 you expect (often just ``None``), and ``yield`` anything unknown. 

566 """ 

567 if is_future(x): 

568 return x 

569 else: 

570 fut = _create_future() 

571 fut.set_result(x) 

572 return fut 

573 

574 

575def with_timeout( 

576 timeout: Union[float, datetime.timedelta], 

577 future: _Yieldable, 

578 quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (), 

579) -> Future: 

580 """Wraps a `.Future` (or other yieldable object) in a timeout. 

581 

582 Raises `tornado.util.TimeoutError` if the input future does not 

583 complete before ``timeout``, which may be specified in any form 

584 allowed by `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or 

585 an absolute time relative to `.IOLoop.time`) 

586 

587 If the wrapped `.Future` fails after it has timed out, the exception 

588 will be logged unless it is either of a type contained in 

589 ``quiet_exceptions`` (which may be an exception type or a sequence of 

590 types), or an ``asyncio.CancelledError``. 

591 

592 The wrapped `.Future` is not canceled when the timeout expires, 

593 permitting it to be reused. `asyncio.wait_for` is similar to this 

594 function but it does cancel the wrapped `.Future` on timeout. 

595 

596 .. versionadded:: 4.0 

597 

598 .. versionchanged:: 4.1 

599 Added the ``quiet_exceptions`` argument and the logging of unhandled 

600 exceptions. 

601 

602 .. versionchanged:: 4.4 

603 Added support for yieldable objects other than `.Future`. 

604 

605 .. versionchanged:: 6.0.3 

606 ``asyncio.CancelledError`` is now always considered "quiet". 

607 

608 .. versionchanged:: 6.2 

609 ``tornado.util.TimeoutError`` is now an alias to ``asyncio.TimeoutError``. 

610 

611 """ 

612 # It's tempting to optimize this by cancelling the input future on timeout 

613 # instead of creating a new one, but A) we can't know if we are the only 

614 # one waiting on the input future, so cancelling it might disrupt other 

615 # callers and B) concurrent futures can only be cancelled while they are 

616 # in the queue, so cancellation cannot reliably bound our waiting time. 

617 future_converted = convert_yielded(future) 

618 result = _create_future() 

619 chain_future(future_converted, result) 

620 io_loop = IOLoop.current() 

621 

622 def error_callback(future: Future) -> None: 

623 try: 

624 future.result() 

625 except asyncio.CancelledError: 

626 pass 

627 except Exception as e: 

628 if not isinstance(e, quiet_exceptions): 

629 app_log.error( 

630 "Exception in Future %r after timeout", future, exc_info=True 

631 ) 

632 

633 def timeout_callback() -> None: 

634 if not result.done(): 

635 result.set_exception(TimeoutError("Timeout")) 

636 # In case the wrapped future goes on to fail, log it. 

637 future_add_done_callback(future_converted, error_callback) 

638 

639 timeout_handle = io_loop.add_timeout(timeout, timeout_callback) 

640 if isinstance(future_converted, Future): 

641 # We know this future will resolve on the IOLoop, so we don't 

642 # need the extra thread-safety of IOLoop.add_future (and we also 

643 # don't care about StackContext here. 

644 future_add_done_callback( 

645 future_converted, lambda future: io_loop.remove_timeout(timeout_handle) 

646 ) 

647 else: 

648 # concurrent.futures.Futures may resolve on any thread, so we 

649 # need to route them back to the IOLoop. 

650 io_loop.add_future( 

651 future_converted, lambda future: io_loop.remove_timeout(timeout_handle) 

652 ) 

653 return result 

654 

655 

656def sleep(duration: float) -> "Future[None]": 

657 """Return a `.Future` that resolves after the given number of seconds. 

658 

659 When used with ``yield`` in a coroutine, this is a non-blocking 

660 analogue to `time.sleep` (which should not be used in coroutines 

661 because it is blocking):: 

662 

663 yield gen.sleep(0.5) 

664 

665 Note that calling this function on its own does nothing; you must 

666 wait on the `.Future` it returns (usually by yielding it). 

667 

668 .. versionadded:: 4.1 

669 """ 

670 f = _create_future() 

671 IOLoop.current().call_later( 

672 duration, lambda: future_set_result_unless_cancelled(f, None) 

673 ) 

674 return f 

675 

676 

677class _NullFuture(object): 

678 """_NullFuture resembles a Future that finished with a result of None. 

679 

680 It's not actually a `Future` to avoid depending on a particular event loop. 

681 Handled as a special case in the coroutine runner. 

682 

683 We lie and tell the type checker that a _NullFuture is a Future so 

684 we don't have to leak _NullFuture into lots of public APIs. But 

685 this means that the type checker can't warn us when we're passing 

686 a _NullFuture into a code path that doesn't understand what to do 

687 with it. 

688 """ 

689 

690 def result(self) -> None: 

691 return None 

692 

693 def done(self) -> bool: 

694 return True 

695 

696 

697# _null_future is used as a dummy value in the coroutine runner. It differs 

698# from moment in that moment always adds a delay of one IOLoop iteration 

699# while _null_future is processed as soon as possible. 

700_null_future = typing.cast(Future, _NullFuture()) 

701 

702moment = typing.cast(Future, _NullFuture()) 

703moment.__doc__ = """A special object which may be yielded to allow the IOLoop to run for 

704one iteration. 

705 

706This is not needed in normal use but it can be helpful in long-running 

707coroutines that are likely to yield Futures that are ready instantly. 

708 

709Usage: ``yield gen.moment`` 

710 

711In native coroutines, the equivalent of ``yield gen.moment`` is 

712``await asyncio.sleep(0)``. 

713 

714.. versionadded:: 4.0 

715 

716.. deprecated:: 4.5 

717 ``yield None`` (or ``yield`` with no argument) is now equivalent to 

718 ``yield gen.moment``. 

719""" 

720 

721 

722class Runner(object): 

723 """Internal implementation of `tornado.gen.coroutine`. 

724 

725 Maintains information about pending callbacks and their results. 

726 

727 The results of the generator are stored in ``result_future`` (a 

728 `.Future`) 

729 """ 

730 

731 def __init__( 

732 self, 

733 ctx_run: Callable, 

734 gen: "Generator[_Yieldable, Any, _T]", 

735 result_future: "Future[_T]", 

736 first_yielded: _Yieldable, 

737 ) -> None: 

738 self.ctx_run = ctx_run 

739 self.gen = gen 

740 self.result_future = result_future 

741 self.future = _null_future # type: Union[None, Future] 

742 self.running = False 

743 self.finished = False 

744 self.io_loop = IOLoop.current() 

745 if self.ctx_run(self.handle_yield, first_yielded): 

746 gen = result_future = first_yielded = None # type: ignore 

747 self.ctx_run(self.run) 

748 

749 def run(self) -> None: 

750 """Starts or resumes the generator, running until it reaches a 

751 yield point that is not ready. 

752 """ 

753 if self.running or self.finished: 

754 return 

755 try: 

756 self.running = True 

757 while True: 

758 future = self.future 

759 if future is None: 

760 raise Exception("No pending future") 

761 if not future.done(): 

762 return 

763 self.future = None 

764 try: 

765 try: 

766 value = future.result() 

767 except Exception as e: 

768 # Save the exception for later. It's important that 

769 # gen.throw() not be called inside this try/except block 

770 # because that makes sys.exc_info behave unexpectedly. 

771 exc: Optional[Exception] = e 

772 else: 

773 exc = None 

774 finally: 

775 future = None 

776 

777 if exc is not None: 

778 try: 

779 yielded = self.gen.throw(exc) 

780 finally: 

781 # Break up a circular reference for faster GC on 

782 # CPython. 

783 del exc 

784 else: 

785 yielded = self.gen.send(value) 

786 

787 except (StopIteration, Return) as e: 

788 self.finished = True 

789 self.future = _null_future 

790 future_set_result_unless_cancelled( 

791 self.result_future, _value_from_stopiteration(e) 

792 ) 

793 self.result_future = None # type: ignore 

794 return 

795 except Exception: 

796 self.finished = True 

797 self.future = _null_future 

798 future_set_exc_info(self.result_future, sys.exc_info()) 

799 self.result_future = None # type: ignore 

800 return 

801 if not self.handle_yield(yielded): 

802 return 

803 yielded = None 

804 finally: 

805 self.running = False 

806 

807 def handle_yield(self, yielded: _Yieldable) -> bool: 

808 try: 

809 self.future = convert_yielded(yielded) 

810 except BadYieldError: 

811 self.future = Future() 

812 future_set_exc_info(self.future, sys.exc_info()) 

813 

814 if self.future is moment: 

815 self.io_loop.add_callback(self.ctx_run, self.run) 

816 return False 

817 elif self.future is None: 

818 raise Exception("no pending future") 

819 elif not self.future.done(): 

820 

821 def inner(f: Any) -> None: 

822 # Break a reference cycle to speed GC. 

823 f = None # noqa: F841 

824 self.ctx_run(self.run) 

825 

826 self.io_loop.add_future(self.future, inner) 

827 return False 

828 return True 

829 

830 def handle_exception( 

831 self, typ: Type[Exception], value: Exception, tb: types.TracebackType 

832 ) -> bool: 

833 if not self.running and not self.finished: 

834 self.future = Future() 

835 future_set_exc_info(self.future, (typ, value, tb)) 

836 self.ctx_run(self.run) 

837 return True 

838 else: 

839 return False 

840 

841 

842def _wrap_awaitable(awaitable: Awaitable) -> Future: 

843 # Convert Awaitables into Futures. 

844 # Note that we use ensure_future, which handles both awaitables 

845 # and coroutines, rather than create_task, which only accepts 

846 # coroutines. (ensure_future calls create_task if given a coroutine) 

847 fut = asyncio.ensure_future(awaitable) 

848 # See comments on IOLoop._pending_tasks. 

849 loop = IOLoop.current() 

850 loop._register_task(fut) 

851 fut.add_done_callback(lambda f: loop._unregister_task(f)) 

852 return fut 

853 

854 

855def convert_yielded(yielded: _Yieldable) -> Future: 

856 """Convert a yielded object into a `.Future`. 

857 

858 The default implementation accepts lists, dictionaries, and 

859 Futures. This has the side effect of starting any coroutines that 

860 did not start themselves, similar to `asyncio.ensure_future`. 

861 

862 If the `~functools.singledispatch` library is available, this function 

863 may be extended to support additional types. For example:: 

864 

865 @convert_yielded.register(asyncio.Future) 

866 def _(asyncio_future): 

867 return tornado.platform.asyncio.to_tornado_future(asyncio_future) 

868 

869 .. versionadded:: 4.1 

870 

871 """ 

872 if yielded is None or yielded is moment: 

873 return moment 

874 elif yielded is _null_future: 

875 return _null_future 

876 elif isinstance(yielded, (list, dict)): 

877 return multi(yielded) # type: ignore 

878 elif is_future(yielded): 

879 return typing.cast(Future, yielded) 

880 elif isawaitable(yielded): 

881 return _wrap_awaitable(yielded) # type: ignore 

882 else: 

883 raise BadYieldError("yielded unknown object %r" % (yielded,)) 

884 

885 

886convert_yielded = singledispatch(convert_yielded)