Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/concurrent.py: 47%

81 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1# 

2# Copyright 2012 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15"""Utilities for working with ``Future`` objects. 

16 

17Tornado previously provided its own ``Future`` class, but now uses 

18`asyncio.Future`. This module contains utility functions for working 

19with `asyncio.Future` in a way that is backwards-compatible with 

20Tornado's old ``Future`` implementation. 

21 

22While this module is an important part of Tornado's internal 

23implementation, applications rarely need to interact with it 

24directly. 

25 

26""" 

27 

28import asyncio 

29from concurrent import futures 

30import functools 

31import sys 

32import types 

33 

34from tornado.log import app_log 

35 

36import typing 

37from typing import Any, Callable, Optional, Tuple, Union 

38 

39_T = typing.TypeVar("_T") 

40 

41 

class ReturnValueIgnoredError(Exception):
    """Legacy exception type.

    No longer used by this module; it was previously used by
    ``@return_future``.
    """

45 

46 

# ``Future`` is an alias for the asyncio implementation.
Future = asyncio.Future

# Tuple of the classes recognised as futures by ``isinstance`` checks:
# the thread-pool style ``concurrent.futures.Future`` and ``asyncio.Future``.
FUTURES = (futures.Future, Future)

50 

51 

def is_future(x: Any) -> bool:
    """Return ``True`` if ``x`` is an instance of any class in ``FUTURES``."""
    return isinstance(x, FUTURES)

54 

55 

class DummyExecutor(futures.Executor):
    """Executor that runs every submitted callable synchronously.

    ``submit`` calls the function in the calling thread before returning,
    so the returned `concurrent.futures.Future` already holds the outcome.
    """

    def submit(
        self, fn: Callable[..., _T], *args: Any, **kwargs: Any
    ) -> "futures.Future[_T]":
        """Call ``fn(*args, **kwargs)`` immediately and wrap the outcome.

        Returns a `concurrent.futures.Future` holding either the value
        returned by ``fn`` or the exception it raised.
        """
        future = futures.Future()  # type: futures.Future[_T]
        try:
            future_set_result_unless_cancelled(future, fn(*args, **kwargs))
        except Exception:
            # Only ``Exception`` subclasses are captured on the future;
            # other ``BaseException`` types propagate to the caller.
            future_set_exc_info(future, sys.exc_info())
        return future

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        """Do nothing; there are no worker resources to release.

        ``wait`` and ``cancel_futures`` are accepted for compatibility with
        ``concurrent.futures.Executor.shutdown`` (``cancel_futures`` was
        added there in Python 3.9) and are ignored.
        """
        pass

69 

70 

# Module-level instance of the synchronous executor defined above.
dummy_executor = DummyExecutor()

72 

73 

def run_on_executor(*args: Any, **kwargs: Any) -> Callable:
    """Decorator to run a synchronous method asynchronously on an executor.

    Returns a future.

    The executor to be used is determined by the ``executor``
    attribute of ``self``. To use a different attribute name, pass a
    keyword argument to the decorator::

        @run_on_executor(executor='_thread_pool')
        def foo(self):
            pass

    This decorator should not be confused with the similarly-named
    `.IOLoop.run_in_executor`. In general, using ``run_in_executor``
    when *calling* a blocking method is recommended instead of using
    this decorator when *defining* a method. If compatibility with older
    versions of Tornado is required, consider defining an executor
    and using ``executor.submit()`` at the call site.

    The decorator may be applied bare (``@run_on_executor``) or called
    with keyword arguments only. A `ValueError` is raised if positional
    and keyword arguments are combined, or if more than one positional
    argument is given.

    .. versionchanged:: 4.2
       Added keyword arguments to use alternative attributes.

    .. versionchanged:: 5.0
       Always uses the current IOLoop instead of ``self.io_loop``.

    .. versionchanged:: 5.1
       Returns a `.Future` compatible with ``await`` instead of a
       `concurrent.futures.Future`.

    .. deprecated:: 5.1

       The ``callback`` argument is deprecated and will be removed in
       6.0. The decorator itself is discouraged in new code but will
       not be removed in 6.0.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed.
    """
    # Fully type-checking decorators is tricky, and this one is
    # discouraged anyway so it doesn't have all the generic magic.
    def run_on_executor_decorator(fn: Callable) -> Callable[..., Future]:
        # Name of the attribute on ``self`` that holds the executor.
        executor = kwargs.get("executor", "executor")

        @functools.wraps(fn)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Future:
            async_future = Future()  # type: Future
            conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs)
            # Copy the executor future's outcome onto the future we return.
            chain_future(conc_future, async_future)
            return async_future

        return wrapper

    if args and kwargs:
        raise ValueError("cannot combine positional and keyword args")
    if len(args) == 1:
        # Bare ``@run_on_executor`` usage: the single argument is the method.
        return run_on_executor_decorator(args[0])
    if args:
        # Interpolate the count into the message; passing it as a second
        # exception argument would leave the "%d" placeholder unformatted.
        raise ValueError("expected 1 argument, got %d" % len(args))
    return run_on_executor_decorator

135 

136 

# Unique sentinel object; it is only ever equal (and identical) to itself.
_NO_RESULT = object()

138 

139 

def chain_future(a: "Future[_T]", b: "Future[_T]") -> None:
    """Propagate the outcome of future ``a`` to future ``b``.

    When ``a`` finishes, its result or exception is copied onto ``b``.
    Nothing is copied if ``b`` is already done (completed or cancelled)
    by that time.

    .. versionchanged:: 5.0

       Now accepts both Tornado/asyncio `Future` objects and
       `concurrent.futures.Future`.

    """

    def copy(future: "Future[_T]") -> None:
        # The callback must only ever fire for the source future.
        assert future is a
        if b.done():
            return
        # Futures exposing an ``exc_info()`` method get their full
        # exc_info tuple forwarded when one is available.
        if hasattr(a, "exc_info") and a.exc_info() is not None:  # type: ignore
            future_set_exc_info(b, a.exc_info())  # type: ignore
            return
        error = a.exception()
        if error is None:
            b.set_result(a.result())
        else:
            b.set_exception(error)

    if not isinstance(a, Future):
        # concurrent.futures.Future: register the callback through the
        # current IOLoop instead of directly on the future.
        from tornado.ioloop import IOLoop

        IOLoop.current().add_future(a, copy)
        return
    future_add_done_callback(a, copy)

173 

174 

def future_set_result_unless_cancelled(
    future: "Union[futures.Future[_T], Future[_T]]", value: _T
) -> None:
    """Store ``value`` as the result of ``future`` unless it was cancelled.

    Skipping cancelled futures avoids the ``asyncio.InvalidStateError``
    that ``set_result()`` raises on a cancelled `asyncio.Future`.

    .. versionadded:: 5.0
    """
    if future.cancelled():
        return
    future.set_result(value)

187 

188 

def future_set_exception_unless_cancelled(
    future: "Union[futures.Future[_T], Future[_T]]", exc: BaseException
) -> None:
    """Store ``exc`` as the exception of ``future`` unless it was cancelled.

    For a cancelled future the exception is logged through ``app_log``
    instead of being set. Callers that do not want this logging should
    check the future's state themselves and call
    ``Future.set_exception`` directly.

    Skipping cancelled futures avoids the ``asyncio.InvalidStateError``
    that ``set_exception()`` raises on a cancelled `asyncio.Future`.

    .. versionadded:: 6.0

    """
    if future.cancelled():
        app_log.error("Exception after Future was cancelled", exc_info=exc)
        return
    future.set_exception(exc)

209 

210 

def future_set_exc_info(
    future: "Union[futures.Future[_T], Future[_T]]",
    exc_info: Tuple[
        Optional[type], Optional[BaseException], Optional[types.TracebackType]
    ],
) -> None:
    """Set the exception carried by ``exc_info`` on ``future``.

    Only the exception instance (the second element of ``exc_info``) is
    used; it is passed to ``future_set_exception_unless_cancelled``.
    Raises ``Exception`` if that element is ``None``.

    .. versionadded:: 5.0

    .. versionchanged:: 6.0

       If the future is already cancelled, this function is a no-op.
       (previously ``asyncio.InvalidStateError`` would be raised)

    """
    exc = exc_info[1]
    if exc is None:
        raise Exception("future_set_exc_info called with no exception")
    future_set_exception_unless_cancelled(future, exc)

233 

234 

@typing.overload
def future_add_done_callback(
    future: "futures.Future[_T]", callback: Callable[["futures.Future[_T]"], None]
) -> None:
    ...


@typing.overload  # noqa: F811
def future_add_done_callback(
    future: "Future[_T]", callback: Callable[["Future[_T]"], None]
) -> None:
    ...


def future_add_done_callback(  # noqa: F811
    future: "Union[futures.Future[_T], Future[_T]]", callback: Callable[..., None]
) -> None:
    """Run ``callback(future)`` once ``future`` has completed.

    ``callback`` receives a single argument, the ``future`` itself.

    When ``future`` is already done, ``callback`` is called right away,
    before this function returns. This may differ from the behavior of
    ``Future.add_done_callback``, which makes no such guarantee.

    .. versionadded:: 5.0
    """
    if not future.done():
        future.add_done_callback(callback)
        return
    callback(future)