Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/concurrent.py: 47%

83 statements  

« prev     ^ index     » next       coverage.py v7.3.3, created at 2023-12-15 06:13 +0000

#
# Copyright 2012 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Utilities for working with ``Future`` objects.

Tornado previously provided its own ``Future`` class, but now uses
`asyncio.Future`. This module contains utility functions for working
with `asyncio.Future` in a way that is backwards-compatible with
Tornado's old ``Future`` implementation.

While this module is an important part of Tornado's internal
implementation, applications rarely need to interact with it
directly.

"""

27 

28import asyncio 

29from concurrent import futures 

30import functools 

31import sys 

32import types 

33 

34from tornado.log import app_log 

35 

36import typing 

37from typing import Any, Callable, Optional, Tuple, Union 

38 

39_T = typing.TypeVar("_T") 

40 

41 

class ReturnValueIgnoredError(Exception):
    """Exception type that is no longer raised by this module.

    No longer used; was previously used by ``@return_future``.
    """

45 

46 

# ``Future`` is a plain alias of `asyncio.Future`.
Future = asyncio.Future

# The classes that `is_future` accepts: the thread-based
# `concurrent.futures.Future` and the asyncio-based ``Future`` alias above.
FUTURES = (futures.Future, Future)


def is_future(x: Any) -> bool:
    """Return ``True`` if ``x`` is an instance of one of the ``FUTURES`` classes.

    Both `concurrent.futures.Future` and `asyncio.Future` instances
    (including subclasses) are accepted; anything else yields ``False``.
    """
    return isinstance(x, FUTURES)

54 

55 

class DummyExecutor(futures.Executor):
    """Executor that runs every submitted callable synchronously.

    ``submit`` calls ``fn`` directly in the calling thread instead of
    handing it to a worker, and returns a `concurrent.futures.Future`
    that already holds the outcome. ``shutdown`` is a no-op because there
    are no workers to stop.
    """

    def submit(  # type: ignore[override]
        self, fn: Callable[..., _T], *args: Any, **kwargs: Any
    ) -> "futures.Future[_T]":
        """Call ``fn(*args, **kwargs)`` immediately and wrap the outcome.

        Returns a `concurrent.futures.Future` whose result is the return
        value of ``fn``, or whose exception is the `Exception` it raised.
        Exceptions that do not derive from `Exception` (for example
        `KeyboardInterrupt`) are not captured and propagate to the caller.
        """
        future = futures.Future()  # type: futures.Future[_T]
        try:
            future_set_result_unless_cancelled(future, fn(*args, **kwargs))
        except Exception:
            # Report the failure through the future rather than raising,
            # which is how a real executor surfaces errors from ``fn``.
            future_set_exc_info(future, sys.exc_info())
        return future

    # `concurrent.futures.Executor.shutdown` gained the ``cancel_futures``
    # parameter in Python 3.9; mirror whichever signature the running
    # interpreter's base class has.
    if sys.version_info >= (3, 9):

        def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None:
            """Do nothing; there are no worker resources to release."""
            pass

    else:

        def shutdown(self, wait: bool = True) -> None:
            """Do nothing; there are no worker resources to release."""
            pass

76 

77 

# Shared module-level ``DummyExecutor`` instance.
dummy_executor = DummyExecutor()

79 

80 

def run_on_executor(*args: Any, **kwargs: Any) -> Callable:
    """Decorator to run a synchronous method asynchronously on an executor.

    Returns a future.

    The executor to be used is determined by the ``executor``
    attributes of ``self``. To use a different attribute name, pass a
    keyword argument to the decorator::

        @run_on_executor(executor='_thread_pool')
        def foo(self):
            pass

    The decorator may be applied bare (``@run_on_executor``) or called
    with keyword arguments only. Mixing a positional argument with
    keyword arguments, or passing more than one positional argument,
    raises `ValueError`.

    This decorator should not be confused with the similarly-named
    `.IOLoop.run_in_executor`. In general, using ``run_in_executor``
    when *calling* a blocking method is recommended instead of using
    this decorator when *defining* a method. If compatibility with older
    versions of Tornado is required, consider defining an executor
    and using ``executor.submit()`` at the call site.

    .. versionchanged:: 4.2
       Added keyword arguments to use alternative attributes.

    .. versionchanged:: 5.0
       Always uses the current IOLoop instead of ``self.io_loop``.

    .. versionchanged:: 5.1
       Returns a `.Future` compatible with ``await`` instead of a
       `concurrent.futures.Future`.

    .. deprecated:: 5.1

       The ``callback`` argument is deprecated and will be removed in
       6.0. The decorator itself is discouraged in new code but will
       not be removed in 6.0.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed.
    """
    # Validate the decorator's own arguments before building anything.
    if args and kwargs:
        raise ValueError("cannot combine positional and keyword args")
    if len(args) > 1:
        # Interpolate the count into the message; passing it as a second
        # exception argument would leave the "%d" placeholder unformatted.
        raise ValueError("expected 1 argument, got %d" % len(args))

    # Fully type-checking decorators is tricky, and this one is
    # discouraged anyway so it doesn't have all the generic magic.
    def run_on_executor_decorator(fn: Callable) -> Callable[..., Future]:
        # Name of the attribute on ``self`` that holds the executor.
        executor = kwargs.get("executor", "executor")

        @functools.wraps(fn)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Future:
            async_future = Future()  # type: Future
            conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs)
            # Copy the executor future's outcome onto the future we return.
            chain_future(conc_future, async_future)
            return async_future

        return wrapper

    if args:
        # Bare ``@run_on_executor``: the single positional argument is the
        # method being decorated.
        return run_on_executor_decorator(args[0])
    # ``@run_on_executor(...)``: hand back the decorator itself.
    return run_on_executor_decorator

142 

143 

# Unique sentinel object; nothing else compares identical to it.
_NO_RESULT = object()

145 

146 

def chain_future(a: "Future[_T]", b: "Future[_T]") -> None:
    """Chain two futures together so that when one completes, so does the other.

    The result (success or failure) of ``a`` will be copied to ``b``, unless
    ``b`` has already been completed or cancelled by the time ``a`` finishes.

    .. versionchanged:: 5.0

       Now accepts both Tornado/asyncio `Future` objects and
       `concurrent.futures.Future`.

    """

    def copy(a: "Future[_T]") -> None:
        # Runs once ``a`` is done; never overwrite an outcome ``b`` already has.
        if b.done():
            return
        # NOTE(review): ``exc_info`` is not part of asyncio/concurrent
        # futures; presumably this branch serves the legacy Tornado Future
        # extension - confirm before removing.
        if hasattr(a, "exc_info") and a.exc_info() is not None:  # type: ignore
            future_set_exc_info(b, a.exc_info())  # type: ignore
        else:
            a_exc = a.exception()
            if a_exc is not None:
                b.set_exception(a_exc)
            else:
                b.set_result(a.result())

    if isinstance(a, Future):
        future_add_done_callback(a, copy)
    else:
        # concurrent.futures.Future
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid an import cycle - confirm.
        from tornado.ioloop import IOLoop

        IOLoop.current().add_future(a, copy)

179 

180 

def future_set_result_unless_cancelled(
    future: "Union[futures.Future[_T], Future[_T]]", value: _T
) -> None:
    """Set the given ``value`` as the `Future`'s result, if not cancelled.

    Avoids ``asyncio.InvalidStateError`` when calling ``set_result()`` on
    a cancelled `asyncio.Future`. If ``future`` is cancelled, ``value`` is
    silently dropped.

    .. versionadded:: 5.0
    """
    if not future.cancelled():
        future.set_result(value)

193 

194 

def future_set_exception_unless_cancelled(
    future: "Union[futures.Future[_T], Future[_T]]", exc: BaseException
) -> None:
    """Set the given ``exc`` as the `Future`'s exception.

    If the Future is already canceled, logs the exception instead. If
    this logging is not desired, the caller should explicitly check
    the state of the Future and call ``Future.set_exception`` instead of
    this wrapper.

    Avoids ``asyncio.InvalidStateError`` when calling ``set_exception()`` on
    a cancelled `asyncio.Future`.

    .. versionadded:: 6.0

    """
    if not future.cancelled():
        future.set_exception(exc)
    else:
        # Nobody can receive the error through the future any more, so log
        # it (with its traceback) rather than dropping it silently.
        app_log.error("Exception after Future was cancelled", exc_info=exc)

215 

216 

def future_set_exc_info(
    future: "Union[futures.Future[_T], Future[_T]]",
    exc_info: Tuple[
        Optional[type], Optional[BaseException], Optional[types.TracebackType]
    ],
) -> None:
    """Set the given ``exc_info`` as the `Future`'s exception.

    Understands both `asyncio.Future` and the extensions in older
    versions of Tornado to enable better tracebacks on Python 2.

    Only the exception instance (``exc_info[1]``) is used. An
    `Exception` is raised if that element is ``None``, i.e. the tuple
    does not describe an active exception.

    .. versionadded:: 5.0

    .. versionchanged:: 6.0

       If the future is already cancelled, this function is a no-op.
       (previously ``asyncio.InvalidStateError`` would be raised)

    """
    if exc_info[1] is None:
        raise Exception("future_set_exc_info called with no exception")
    future_set_exception_unless_cancelled(future, exc_info[1])

239 

240 

@typing.overload
def future_add_done_callback(
    future: "futures.Future[_T]", callback: Callable[["futures.Future[_T]"], None]
) -> None:
    pass


@typing.overload  # noqa: F811
def future_add_done_callback(
    future: "Future[_T]", callback: Callable[["Future[_T]"], None]
) -> None:
    pass


def future_add_done_callback(  # noqa: F811
    future: "Union[futures.Future[_T], Future[_T]]", callback: Callable[..., None]
) -> None:
    """Arrange to call ``callback`` when ``future`` is complete.

    ``callback`` is invoked with one argument, the ``future``.

    If ``future`` is already done, ``callback`` is invoked immediately.
    This may differ from the behavior of ``Future.add_done_callback``,
    which makes no such guarantee.

    The two overloads above exist only for type checkers: they tie the
    callback's parameter type to the kind of future that was passed in.

    .. versionadded:: 5.0
    """
    if future.done():
        # Already finished: run the callback synchronously, right now.
        callback(future)
    else:
        future.add_done_callback(callback)

270 else: 

271 future.add_done_callback(callback)