Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/concurrent.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

84 statements  

1# 

2# Copyright 2012 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15"""Utilities for working with ``Future`` objects. 

16 

17Tornado previously provided its own ``Future`` class, but now uses 

18`asyncio.Future`. This module contains utility functions for working 

19with `asyncio.Future` in a way that is backwards-compatible with 

20Tornado's old ``Future`` implementation. 

21 

22While this module is an important part of Tornado's internal 

23implementation, applications rarely need to interact with it 

24directly. 

25 

26""" 

27 

28import asyncio 

29from concurrent import futures 

30import functools 

31import sys 

32import types 

33 

34from tornado.log import app_log 

35 

36import typing 

37from typing import Any, Callable, Optional, Tuple, Union 

38 

39_T = typing.TypeVar("_T") 

40 

41 

class ReturnValueIgnoredError(Exception):
    """Legacy exception type kept so that existing references keep working.

    No longer used; was previously used by ``@return_future``.
    """

45 

46 

# Tornado's ``Future`` is an alias for `asyncio.Future`.
Future = asyncio.Future

# The classes that `is_future` accepts: `concurrent.futures.Future` and the
# asyncio-based ``Future`` alias defined just above.
FUTURES = (futures.Future, Future)


def is_future(x: Any) -> bool:
    """Return ``True`` when ``x`` is an instance of one of the ``FUTURES`` classes."""
    matches_known_future_type = isinstance(x, FUTURES)
    return matches_known_future_type

54 

55 

class DummyExecutor(futures.Executor):
    """Executor that runs every submitted callable inline, inside ``submit``."""

    def submit(  # type: ignore[override]
        self, fn: Callable[..., _T], *args: Any, **kwargs: Any
    ) -> "futures.Future[_T]":
        """Call ``fn(*args, **kwargs)`` immediately and return a future for it.

        The return value is stored via ``future_set_result_unless_cancelled``.
        Anything derived from ``Exception`` raised along the way is recorded
        on the future via ``future_set_exc_info`` instead of propagating.
        """
        outcome: "futures.Future[_T]" = futures.Future()
        try:
            value = fn(*args, **kwargs)
            future_set_result_unless_cancelled(outcome, value)
        except Exception:
            failure = sys.exc_info()
            future_set_exc_info(outcome, failure)
        return outcome

    # The ``shutdown`` signature is selected by interpreter version: the
    # ``cancel_futures`` parameter is only declared on Python 3.9 and newer.
    if sys.version_info < (3, 9):

        def shutdown(self, wait: bool = True) -> None:
            """Do nothing; there are no resources to release."""
            return None

    else:

        def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None:
            """Do nothing; there are no resources to release."""
            return None


# Shared module-level instance of the inline executor.
dummy_executor = DummyExecutor()

79 

80 

def run_on_executor(*args: Any, **kwargs: Any) -> Callable:
    """Decorator to run a synchronous method asynchronously on an executor.

    Returns a future.

    The executor to be used is determined by the ``executor``
    attribute of ``self``. To use a different attribute name, pass a
    keyword argument to the decorator::

        @run_on_executor(executor='_thread_pool')
        def foo(self):
            pass

    This decorator should not be confused with the similarly-named
    `.IOLoop.run_in_executor`. In general, using ``run_in_executor``
    when *calling* a blocking method is recommended instead of using
    this decorator when *defining* a method. If compatibility with older
    versions of Tornado is required, consider defining an executor
    and using ``executor.submit()`` at the call site.

    Raises ``ValueError`` if positional and keyword arguments are
    combined, or if more than one positional argument is given.

    .. versionchanged:: 4.2
       Added keyword arguments to use alternative attributes.

    .. versionchanged:: 5.0
       Always uses the current IOLoop instead of ``self.io_loop``.

    .. versionchanged:: 5.1
       Returns a `.Future` compatible with ``await`` instead of a
       `concurrent.futures.Future`.

    .. deprecated:: 5.1

       The ``callback`` argument is deprecated and will be removed in
       6.0. The decorator itself is discouraged in new code but will
       not be removed in 6.0.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed.
    """

    # Fully type-checking decorators is tricky, and this one is
    # discouraged anyway so it doesn't have all the generic magic.
    def run_on_executor_decorator(fn: Callable) -> Callable[..., Future]:
        # Name of the attribute on ``self`` that holds the executor; it is
        # read from the decorator's keyword arguments at decoration time.
        executor = kwargs.get("executor", "executor")

        @functools.wraps(fn)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Future:
            # Submit the method to the executor found on ``self`` and hand
            # the caller a ``Future`` that is chained to the executor's one.
            async_future = Future()  # type: Future
            conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs)
            chain_future(conc_future, async_future)
            return async_future

        return wrapper

    if args and kwargs:
        raise ValueError("cannot combine positional and keyword args")
    if len(args) == 1:
        # Bare ``@run_on_executor`` usage: the sole argument is the method.
        return run_on_executor_decorator(args[0])
    elif len(args) != 0:
        # Interpolate the count into the message; passing it as a second
        # exception argument would leave the "%d" placeholder unformatted.
        raise ValueError("expected 1 argument, got %d" % len(args))
    # ``@run_on_executor(...)`` usage: return the decorator itself.
    return run_on_executor_decorator

143 

144 

# Module-level sentinel object.
_NO_RESULT = object()


def chain_future(
    a: Union["Future[_T]", "futures.Future[_T]"],
    b: Union["Future[_T]", "futures.Future[_T]"],
) -> None:
    """Arrange for the outcome of ``a`` to be copied onto ``b`` once ``a`` is done.

    Whatever ``a`` finishes with (a result or an exception) is transferred
    to ``b``. Nothing is copied if ``b`` is already done by the time ``a``
    finishes.

    .. versionchanged:: 5.0

       Now accepts both Tornado/asyncio `Future` objects and
       `concurrent.futures.Future`.

    """

    def copy(finished: "Future[_T]") -> None:
        # ``b`` may have been completed in the meantime; leave it alone.
        if b.done():
            return
        # Futures that expose ``exc_info()`` take precedence.
        if hasattr(finished, "exc_info") and finished.exc_info() is not None:  # type: ignore
            future_set_exc_info(b, finished.exc_info())  # type: ignore
            return
        error = finished.exception()
        if error is None:
            b.set_result(finished.result())
        else:
            b.set_exception(error)

    if not isinstance(a, Future):
        # concurrent.futures.Future
        from tornado.ioloop import IOLoop

        IOLoop.current().add_future(a, copy)
        return
    future_add_done_callback(a, copy)

183 

184 

def future_set_result_unless_cancelled(
    future: "Union[futures.Future[_T], Future[_T]]", value: _T
) -> None:
    """Store ``value`` as the result of ``future`` unless it was cancelled.

    A cancelled future is left untouched, which avoids the
    ``asyncio.InvalidStateError`` that ``set_result()`` raises on a
    cancelled `asyncio.Future`.

    .. versionadded:: 5.0
    """
    if future.cancelled():
        return
    future.set_result(value)

197 

198 

def future_set_exception_unless_cancelled(
    future: "Union[futures.Future[_T], Future[_T]]", exc: BaseException
) -> None:
    """Store ``exc`` as the exception of ``future`` unless it was cancelled.

    When the future has already been cancelled the exception is logged
    through ``app_log`` instead. Callers that do not want that logging
    should check the future's state themselves and call
    ``Future.set_exception`` directly rather than going through this
    wrapper.

    Avoids ``asyncio.InvalidStateError`` when calling ``set_exception()`` on
    a cancelled `asyncio.Future`.

    .. versionadded:: 6.0

    """
    if future.cancelled():
        # Nobody can receive the exception any more, so record it in the log.
        app_log.error("Exception after Future was cancelled", exc_info=exc)
        return
    future.set_exception(exc)

219 

220 

def future_set_exc_info(
    future: "Union[futures.Future[_T], Future[_T]]",
    exc_info: Tuple[
        Optional[type], Optional[BaseException], Optional[types.TracebackType]
    ],
) -> None:
    """Record the exception carried by ``exc_info`` on ``future``.

    Only the exception instance (``exc_info[1]``) is used; it is handed to
    ``future_set_exception_unless_cancelled``. An ``Exception`` is raised
    if that slot is ``None``.

    .. versionadded:: 5.0

    .. versionchanged:: 6.0

       If the future is already cancelled, this function is a no-op.
       (previously ``asyncio.InvalidStateError`` would be raised)

    """
    error = exc_info[1]
    if error is None:
        raise Exception("future_set_exc_info called with no exception")
    future_set_exception_unless_cancelled(future, error)

243 

244 

@typing.overload
def future_add_done_callback(
    future: "futures.Future[_T]", callback: Callable[["futures.Future[_T]"], None]
) -> None:
    ...


@typing.overload  # noqa: F811
def future_add_done_callback(
    future: "Future[_T]", callback: Callable[["Future[_T]"], None]
) -> None:
    ...


def future_add_done_callback(  # noqa: F811
    future: "Union[futures.Future[_T], Future[_T]]", callback: Callable[..., None]
) -> None:
    """Run ``callback(future)`` once ``future`` has completed.

    The callback receives a single argument: the ``future`` itself.

    A future that is already done triggers the callback synchronously,
    before this function returns. Otherwise the callback is registered via
    ``future.add_done_callback``. This may differ from the behavior of
    ``Future.add_done_callback``, which makes no such guarantee.

    .. versionadded:: 5.0
    """
    if not future.done():
        future.add_done_callback(callback)
        return
    callback(future)

274 else: 

275 future.add_done_callback(callback)