Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/concurrent.py: 47%
81 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1#
2# Copyright 2012 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
"""Utilities for working with ``Future`` objects.

Tornado previously provided its own ``Future`` class, but now uses
`asyncio.Future`. This module contains utility functions for working
with `asyncio.Future` in a way that is backwards-compatible with
Tornado's old ``Future`` implementation.

While this module is an important part of Tornado's internal
implementation, applications rarely need to interact with it
directly.

"""
28import asyncio
29from concurrent import futures
30import functools
31import sys
32import types
34from tornado.log import app_log
36import typing
37from typing import Any, Callable, Optional, Tuple, Union
39_T = typing.TypeVar("_T")
class ReturnValueIgnoredError(Exception):
    """Legacy exception type that is no longer used.

    It was previously used by ``@return_future``.
    """
# ``Future`` in this module is an alias for the asyncio implementation.
Future = asyncio.Future

# The classes recognised by ``is_future``: ``concurrent.futures.Future``
# and the asyncio-based ``Future`` alias defined just above.
FUTURES = (futures.Future, Future)


def is_future(x: Any) -> bool:
    """Return ``True`` when ``x`` is an instance of one of the ``FUTURES`` types."""
    matches_known_type = isinstance(x, FUTURES)
    return matches_known_type
class DummyExecutor(futures.Executor):
    """Executor whose ``submit`` calls the function directly in the caller."""

    def submit(
        self, fn: Callable[..., _T], *args: Any, **kwargs: Any
    ) -> "futures.Future[_T]":
        """Call ``fn(*args, **kwargs)`` immediately and wrap the outcome.

        The return value of ``fn`` is stored on a new
        `concurrent.futures.Future`; if an ``Exception`` is raised, the
        current ``sys.exc_info()`` is handed to ``future_set_exc_info``
        for that future instead.
        """
        outcome = futures.Future()  # type: futures.Future[_T]
        try:
            value = fn(*args, **kwargs)
            future_set_result_unless_cancelled(outcome, value)
        except Exception:
            # Only ``Exception`` subclasses are caught here; other
            # ``BaseException`` types propagate to the caller.
            future_set_exc_info(outcome, sys.exc_info())
        return outcome

    def shutdown(self, wait: bool = True) -> None:
        """Do nothing; ``wait`` is accepted but ignored."""
        return None


# Module-level ``DummyExecutor`` instance.
dummy_executor = DummyExecutor()
def run_on_executor(*args: Any, **kwargs: Any) -> Callable:
    """Decorator to run a synchronous method asynchronously on an executor.

    Returns a future.

    The executor to be used is determined by the ``executor``
    attribute of ``self``. To use a different attribute name, pass a
    keyword argument to the decorator::

        @run_on_executor(executor='_thread_pool')
        def foo(self):
            pass

    This decorator should not be confused with the similarly-named
    `.IOLoop.run_in_executor`. In general, using ``run_in_executor``
    when *calling* a blocking method is recommended instead of using
    this decorator when *defining* a method. If compatibility with older
    versions of Tornado is required, consider defining an executor
    and using ``executor.submit()`` at the call site.

    :raises ValueError: if positional and keyword arguments are combined,
        or if more than one positional argument is given.

    .. versionchanged:: 4.2
       Added keyword arguments to use alternative attributes.

    .. versionchanged:: 5.0
       Always uses the current IOLoop instead of ``self.io_loop``.

    .. versionchanged:: 5.1
       Returns a `.Future` compatible with ``await`` instead of a
       `concurrent.futures.Future`.

    .. deprecated:: 5.1

       The ``callback`` argument is deprecated and will be removed in
       6.0. The decorator itself is discouraged in new code but will
       not be removed in 6.0.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed.
    """
    # Reject unsupported call forms before building anything.
    if args and kwargs:
        raise ValueError("cannot combine positional and keyword args")
    if len(args) > 1:
        # Interpolate the count into the message; passing it as a second
        # exception argument would leave the "%d" placeholder unformatted.
        raise ValueError("expected 1 argument, got %d" % len(args))

    # Fully type-checking decorators is tricky, and this one is
    # discouraged anyway so it doesn't have all the generic magic.
    def run_on_executor_decorator(fn: Callable) -> Callable[..., Future]:
        # Name of the attribute on ``self`` that holds the executor.
        executor = kwargs.get("executor", "executor")

        @functools.wraps(fn)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Future:
            async_future = Future()  # type: Future
            conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs)
            # Copy the executor future's outcome onto the awaitable future.
            chain_future(conc_future, async_future)
            return async_future

        return wrapper

    if args:
        # Bare ``@run_on_executor`` form: the single positional argument
        # is the function being decorated.
        return run_on_executor_decorator(args[0])
    # ``@run_on_executor(...)`` form: hand back the decorator itself.
    return run_on_executor_decorator
# Private sentinel object.
# NOTE(review): nothing in the visible part of this module reads it.
_NO_RESULT = object()


def chain_future(a: "Future[_T]", b: "Future[_T]") -> None:
    """Chain two futures together so that when one completes, so does the other.

    The result (success or failure) of ``a`` will be copied to ``b``, unless
    ``b`` has already been completed or cancelled by the time ``a`` finishes.

    .. versionchanged:: 5.0

       Now accepts both Tornado/asyncio `Future` objects and
       `concurrent.futures.Future`.

    """

    def copy(future: "Future[_T]") -> None:
        # Done-callback for ``a``: mirror its outcome onto ``b``.
        assert future is a
        if b.done():
            # ``b`` already has an outcome; leave it alone.
            return
        # Futures exposing ``exc_info()`` take the exc_info path when it
        # returns something other than ``None``.
        carries_exc_info = (
            hasattr(a, "exc_info") and a.exc_info() is not None  # type: ignore
        )
        if carries_exc_info:
            future_set_exc_info(b, a.exc_info())  # type: ignore
            return
        failure = a.exception()
        if failure is None:
            b.set_result(a.result())
        else:
            b.set_exception(failure)

    if not isinstance(a, Future):
        # concurrent.futures.Future
        from tornado.ioloop import IOLoop

        IOLoop.current().add_future(a, copy)
        return
    future_add_done_callback(a, copy)
def future_set_result_unless_cancelled(
    future: "Union[futures.Future[_T], Future[_T]]", value: _T
) -> None:
    """Set the given ``value`` as the `Future`'s result, if not cancelled.

    Avoids ``asyncio.InvalidStateError`` when calling ``set_result()`` on
    a cancelled `asyncio.Future`.

    .. versionadded:: 5.0
    """
    if future.cancelled():
        # Nothing to do: a cancelled future is left untouched.
        return
    future.set_result(value)
def future_set_exception_unless_cancelled(
    future: "Union[futures.Future[_T], Future[_T]]", exc: BaseException
) -> None:
    """Set the given ``exc`` as the `Future`'s exception.

    If the Future is already canceled, logs the exception instead. If
    this logging is not desired, the caller should explicitly check
    the state of the Future and call ``Future.set_exception`` instead of
    this wrapper.

    Avoids ``asyncio.InvalidStateError`` when calling ``set_exception()`` on
    a cancelled `asyncio.Future`.

    .. versionadded:: 6.0

    """
    if future.cancelled():
        # The future can no longer carry the error, so log it instead.
        app_log.error("Exception after Future was cancelled", exc_info=exc)
        return
    future.set_exception(exc)
def future_set_exc_info(
    future: "Union[futures.Future[_T], Future[_T]]",
    exc_info: Tuple[
        Optional[type], Optional[BaseException], Optional[types.TracebackType]
    ],
) -> None:
    """Set the given ``exc_info`` as the `Future`'s exception.

    Understands both `asyncio.Future` and the extensions in older
    versions of Tornado to enable better tracebacks on Python 2.

    Only the exception instance (the second element of ``exc_info``) is
    used; an ``Exception`` is raised if that element is ``None``.

    .. versionadded:: 5.0

    .. versionchanged:: 6.0

       If the future is already cancelled, this function is a no-op.
       (previously ``asyncio.InvalidStateError`` would be raised)

    """
    error = exc_info[1]
    if error is None:
        raise Exception("future_set_exc_info called with no exception")
    future_set_exception_unless_cancelled(future, error)
@typing.overload
def future_add_done_callback(
    future: "futures.Future[_T]", callback: Callable[["futures.Future[_T]"], None]
) -> None:
    ...


@typing.overload  # noqa: F811
def future_add_done_callback(
    future: "Future[_T]", callback: Callable[["Future[_T]"], None]
) -> None:
    ...


def future_add_done_callback(  # noqa: F811
    future: "Union[futures.Future[_T], Future[_T]]", callback: Callable[..., None]
) -> None:
    """Arrange to call ``callback`` when ``future`` is complete.

    ``callback`` is invoked with one argument, the ``future``.

    If ``future`` is already done, ``callback`` is invoked immediately.
    This may differ from the behavior of ``Future.add_done_callback``,
    which makes no such guarantee.

    .. versionadded:: 5.0
    """
    if not future.done():
        # Still pending: let the future invoke the callback when it finishes.
        future.add_done_callback(callback)
        return
    # Already complete: run the callback synchronously, right now.
    callback(future)