1#
2# Copyright 2012 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15"""Utilities for working with ``Future`` objects.
16
17Tornado previously provided its own ``Future`` class, but now uses
18`asyncio.Future`. This module contains utility functions for working
19with `asyncio.Future` in a way that is backwards-compatible with
20Tornado's old ``Future`` implementation.
21
22While this module is an important part of Tornado's internal
23implementation, applications rarely need to interact with it
24directly.
25
26"""
27
28import asyncio
29from concurrent import futures
30import functools
31import sys
32import types
33
34from tornado.log import app_log
35
36import typing
37from typing import Any, Callable, Optional, Tuple, Union
38
# Generic type variable used by the annotations in this module to tie a
# future's result type to the value or callable it is paired with.
_T = typing.TypeVar("_T")
40
41
class ReturnValueIgnoredError(Exception):
    """Exception type that is no longer used.

    It was previously used by ``@return_future``.
    """
45
46
# ``Future`` in this module is an alias for `asyncio.Future`.
Future = asyncio.Future

# The future classes recognized by `is_future`:
# `concurrent.futures.Future` and `asyncio.Future`.
FUTURES = (futures.Future, Future)
50
51
def is_future(x: Any) -> bool:
    """Return ``True`` if ``x`` is an instance of one of the ``FUTURES`` classes."""
    recognized = isinstance(x, FUTURES)
    return recognized
54
55
class DummyExecutor(futures.Executor):
    """Executor that runs every submitted callable synchronously.

    ``submit`` invokes the callable in the calling thread before it
    returns, so the future it hands back already holds the outcome.
    """

    def submit(  # type: ignore[override]
        self, fn: Callable[..., _T], *args: Any, **kwargs: Any
    ) -> "futures.Future[_T]":
        """Call ``fn(*args, **kwargs)`` immediately and wrap the outcome.

        The return value is stored as the future's result. Any
        `Exception` raised along the way is stored on the future instead
        of being raised to the caller.
        """
        completed = futures.Future()  # type: futures.Future[_T]
        try:
            outcome = fn(*args, **kwargs)
            future_set_result_unless_cancelled(completed, outcome)
        except Exception:
            # Record the failure on the future rather than raising it here.
            future_set_exc_info(completed, sys.exc_info())
        return completed

    # ``Executor.shutdown`` gained ``cancel_futures`` in Python 3.9; define
    # the signature that matches the running interpreter.
    if sys.version_info < (3, 9):

        def shutdown(self, wait: bool = True) -> None:
            """Do nothing; there is nothing to shut down."""

    else:

        def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None:
            """Do nothing; there is nothing to shut down."""
76
77
# Shared module-level instance of `DummyExecutor`.
dummy_executor = DummyExecutor()
79
80
def run_on_executor(*args: Any, **kwargs: Any) -> Callable:
    """Decorator to run a synchronous method asynchronously on an executor.

    Returns a future.

    The executor to be used is determined by the ``executor``
    attributes of ``self``. To use a different attribute name, pass a
    keyword argument to the decorator::

        @run_on_executor(executor='_thread_pool')
        def foo(self):
            pass

    The decorator may be applied bare (``@run_on_executor``), in which
    case the decorated function is its single positional argument, or
    called with keyword arguments only, in which case it returns the
    actual decorator.

    This decorator should not be confused with the similarly-named
    `.IOLoop.run_in_executor`. In general, using ``run_in_executor``
    when *calling* a blocking method is recommended instead of using
    this decorator when *defining* a method. If compatibility with older
    versions of Tornado is required, consider defining an executor
    and using ``executor.submit()`` at the call site.

    :raises ValueError: if positional and keyword arguments are
        combined, or if more than one positional argument is given.

    .. versionchanged:: 4.2
       Added keyword arguments to use alternative attributes.

    .. versionchanged:: 5.0
       Always uses the current IOLoop instead of ``self.io_loop``.

    .. versionchanged:: 5.1
       Returns a `.Future` compatible with ``await`` instead of a
       `concurrent.futures.Future`.

    .. deprecated:: 5.1

       The ``callback`` argument is deprecated and will be removed in
       6.0. The decorator itself is discouraged in new code but will
       not be removed in 6.0.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed.
    """

    # Fully type-checking decorators is tricky, and this one is
    # discouraged anyway so it doesn't have all the generic magic.
    def run_on_executor_decorator(fn: Callable) -> Callable[..., Future]:
        # Name of the attribute on ``self`` that holds the executor.
        executor = kwargs.get("executor", "executor")

        @functools.wraps(fn)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Future:
            async_future = Future()  # type: Future
            conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs)
            # Copy the outcome of the executor's future onto the future
            # that is handed back to the caller.
            chain_future(conc_future, async_future)
            return async_future

        return wrapper

    if args and kwargs:
        raise ValueError("cannot combine positional and keyword args")
    if len(args) == 1:
        # Bare ``@run_on_executor``: the lone argument is the function.
        return run_on_executor_decorator(args[0])
    elif len(args) != 0:
        # Interpolate the count into the message; passing it as a second
        # exception argument would leave the "%d" placeholder unformatted.
        raise ValueError("expected 1 argument, got %d" % len(args))
    return run_on_executor_decorator
143
144
# Unique sentinel object.
# NOTE(review): nothing in the visible part of this module references it;
# confirm there are no external users before removing.
_NO_RESULT = object()
146
147
def chain_future(
    a: Union["Future[_T]", "futures.Future[_T]"],
    b: Union["Future[_T]", "futures.Future[_T]"],
) -> None:
    """Link ``a`` to ``b`` so that ``b`` finishes with the outcome of ``a``.

    Once ``a`` is done, its result or exception is copied onto ``b``.
    Nothing is copied if ``b`` is already done (completed or cancelled)
    by then.

    .. versionchanged:: 5.0

       Now accepts both Tornado/asyncio `Future` objects and
       `concurrent.futures.Future`.

    """

    def _propagate(source: "Future[_T]") -> None:
        # ``b`` finished on its own in the meantime; leave it untouched.
        if b.done():
            return
        # A future that exposes ``exc_info()`` and reports a failure
        # through it has that exc_info forwarded as-is.
        if hasattr(source, "exc_info") and source.exc_info() is not None:  # type: ignore
            future_set_exc_info(b, source.exc_info())  # type: ignore
            return
        error = source.exception()
        if error is None:
            b.set_result(source.result())
        else:
            b.set_exception(error)

    if not isinstance(a, Future):
        # concurrent.futures.Future: register the callback through the
        # current IOLoop.
        from tornado.ioloop import IOLoop

        IOLoop.current().add_future(a, _propagate)
        return
    future_add_done_callback(a, _propagate)
183
184
def future_set_result_unless_cancelled(
    future: "Union[futures.Future[_T], Future[_T]]", value: _T
) -> None:
    """Store ``value`` as the result of ``future`` unless it was cancelled.

    A cancelled future is left untouched, which avoids the
    ``asyncio.InvalidStateError`` that ``set_result()`` raises on a
    cancelled `asyncio.Future`.

    .. versionadded:: 5.0
    """
    if future.cancelled():
        return
    future.set_result(value)
197
198
def future_set_exception_unless_cancelled(
    future: "Union[futures.Future[_T], Future[_T]]", exc: BaseException
) -> None:
    """Store ``exc`` as the exception of ``future`` unless it was cancelled.

    When the future has already been cancelled the exception is logged
    instead of being set. Callers that do not want this logging should
    check the state of the future themselves and call
    ``Future.set_exception`` directly rather than going through this
    wrapper.

    Avoids ``asyncio.InvalidStateError`` when calling ``set_exception()`` on
    a cancelled `asyncio.Future`.

    .. versionadded:: 6.0

    """
    if future.cancelled():
        # The future cannot carry the error any more, so log it instead.
        app_log.error("Exception after Future was cancelled", exc_info=exc)
        return
    future.set_exception(exc)
219
220
def future_set_exc_info(
    future: "Union[futures.Future[_T], Future[_T]]",
    exc_info: Tuple[
        Optional[type], Optional[BaseException], Optional[types.TracebackType]
    ],
) -> None:
    """Store the exception carried by ``exc_info`` on ``future``.

    Only the exception instance (``exc_info[1]``) is used; it is handed
    to `future_set_exception_unless_cancelled`.

    :raises Exception: if ``exc_info`` carries no exception instance.

    .. versionadded:: 5.0

    .. versionchanged:: 6.0

       If the future is already cancelled, this function is a no-op.
       (previously ``asyncio.InvalidStateError`` would be raised)

    """
    exc = exc_info[1]
    if exc is None:
        raise Exception("future_set_exc_info called with no exception")
    future_set_exception_unless_cancelled(future, exc)
243
244
@typing.overload
def future_add_done_callback(
    future: "futures.Future[_T]", callback: Callable[["futures.Future[_T]"], None]
) -> None: ...


@typing.overload  # noqa: F811
def future_add_done_callback(
    future: "Future[_T]", callback: Callable[["Future[_T]"], None]
) -> None: ...


def future_add_done_callback(  # noqa: F811
    future: "Union[futures.Future[_T], Future[_T]]", callback: Callable[..., None]
) -> None:
    """Run ``callback`` once ``future`` has completed.

    ``callback`` receives a single argument, the ``future`` itself.

    When ``future`` is already done, ``callback`` is called right away,
    before this function returns. This may differ from the behavior of
    ``Future.add_done_callback``, which makes no such guarantee.

    .. versionadded:: 5.0
    """
    if not future.done():
        # Still pending: let the future invoke the callback on completion.
        future.add_done_callback(callback)
        return
    callback(future)