Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/asyncio/futures.py: 26%

220 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1"""A Future class similar to the one in PEP 3148.""" 

2 

3__all__ = ( 

4 'Future', 'wrap_future', 'isfuture', 

5) 

6 

7import concurrent.futures 

8import contextvars 

9import logging 

10import sys 

11 

12from . import base_futures 

13from . import events 

14from . import exceptions 

15from . import format_helpers 

16 

17 

18isfuture = base_futures.isfuture 

19 

20 

21_PENDING = base_futures._PENDING 

22_CANCELLED = base_futures._CANCELLED 

23_FINISHED = base_futures._FINISHED 

24 

25 

26STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging 

27 

28 

29class Future: 

30 """This class is *almost* compatible with concurrent.futures.Future. 

31 

32 Differences: 

33 

34 - This class is not thread-safe. 

35 

36 - result() and exception() do not take a timeout argument and 

37 raise an exception when the future isn't done yet. 

38 

39 - Callbacks registered with add_done_callback() are always called 

40 via the event loop's call_soon(). 

41 

42 - This class is not compatible with the wait() and as_completed() 

43 methods in the concurrent.futures package. 

44 

45 (In Python 3.4 or later we may be able to unify the implementations.) 

46 """ 

47 

48 # Class variables serving as defaults for instance variables. 

49 _state = _PENDING 

50 _result = None 

51 _exception = None 

52 _loop = None 

53 _source_traceback = None 

54 _cancel_message = None 

55 # A saved CancelledError for later chaining as an exception context. 

56 _cancelled_exc = None 

57 

58 # This field is used for a dual purpose: 

59 # - Its presence is a marker to declare that a class implements 

60 # the Future protocol (i.e. is intended to be duck-type compatible). 

61 # The value must also be not-None, to enable a subclass to declare 

62 # that it is not compatible by setting this to None. 

63 # - It is set by __iter__() below so that Task._step() can tell 

64 # the difference between 

65 # `await Future()` or`yield from Future()` (correct) vs. 

66 # `yield Future()` (incorrect). 

67 _asyncio_future_blocking = False 

68 

69 __log_traceback = False 

70 

71 def __init__(self, *, loop=None): 

72 """Initialize the future. 

73 

74 The optional event_loop argument allows explicitly setting the event 

75 loop object used by the future. If it's not provided, the future uses 

76 the default event loop. 

77 """ 

78 if loop is None: 

79 self._loop = events.get_event_loop() 

80 else: 

81 self._loop = loop 

82 self._callbacks = [] 

83 if self._loop.get_debug(): 

84 self._source_traceback = format_helpers.extract_stack( 

85 sys._getframe(1)) 

86 

87 _repr_info = base_futures._future_repr_info 

88 

89 def __repr__(self): 

90 return '<{} {}>'.format(self.__class__.__name__, 

91 ' '.join(self._repr_info())) 

92 

93 def __del__(self): 

94 if not self.__log_traceback: 

95 # set_exception() was not called, or result() or exception() 

96 # has consumed the exception 

97 return 

98 exc = self._exception 

99 context = { 

100 'message': 

101 f'{self.__class__.__name__} exception was never retrieved', 

102 'exception': exc, 

103 'future': self, 

104 } 

105 if self._source_traceback: 

106 context['source_traceback'] = self._source_traceback 

107 self._loop.call_exception_handler(context) 

108 

109 def __class_getitem__(cls, type): 

110 return cls 

111 

112 @property 

113 def _log_traceback(self): 

114 return self.__log_traceback 

115 

116 @_log_traceback.setter 

117 def _log_traceback(self, val): 

118 if bool(val): 

119 raise ValueError('_log_traceback can only be set to False') 

120 self.__log_traceback = False 

121 

122 def get_loop(self): 

123 """Return the event loop the Future is bound to.""" 

124 loop = self._loop 

125 if loop is None: 

126 raise RuntimeError("Future object is not initialized.") 

127 return loop 

128 

129 def _make_cancelled_error(self): 

130 """Create the CancelledError to raise if the Future is cancelled. 

131 

132 This should only be called once when handling a cancellation since 

133 it erases the saved context exception value. 

134 """ 

135 if self._cancel_message is None: 

136 exc = exceptions.CancelledError() 

137 else: 

138 exc = exceptions.CancelledError(self._cancel_message) 

139 exc.__context__ = self._cancelled_exc 

140 # Remove the reference since we don't need this anymore. 

141 self._cancelled_exc = None 

142 return exc 

143 

144 def cancel(self, msg=None): 

145 """Cancel the future and schedule callbacks. 

146 

147 If the future is already done or cancelled, return False. Otherwise, 

148 change the future's state to cancelled, schedule the callbacks and 

149 return True. 

150 """ 

151 self.__log_traceback = False 

152 if self._state != _PENDING: 

153 return False 

154 self._state = _CANCELLED 

155 self._cancel_message = msg 

156 self.__schedule_callbacks() 

157 return True 

158 

159 def __schedule_callbacks(self): 

160 """Internal: Ask the event loop to call all callbacks. 

161 

162 The callbacks are scheduled to be called as soon as possible. Also 

163 clears the callback list. 

164 """ 

165 callbacks = self._callbacks[:] 

166 if not callbacks: 

167 return 

168 

169 self._callbacks[:] = [] 

170 for callback, ctx in callbacks: 

171 self._loop.call_soon(callback, self, context=ctx) 

172 

173 def cancelled(self): 

174 """Return True if the future was cancelled.""" 

175 return self._state == _CANCELLED 

176 

177 # Don't implement running(); see http://bugs.python.org/issue18699 

178 

179 def done(self): 

180 """Return True if the future is done. 

181 

182 Done means either that a result / exception are available, or that the 

183 future was cancelled. 

184 """ 

185 return self._state != _PENDING 

186 

187 def result(self): 

188 """Return the result this future represents. 

189 

190 If the future has been cancelled, raises CancelledError. If the 

191 future's result isn't yet available, raises InvalidStateError. If 

192 the future is done and has an exception set, this exception is raised. 

193 """ 

194 if self._state == _CANCELLED: 

195 exc = self._make_cancelled_error() 

196 raise exc 

197 if self._state != _FINISHED: 

198 raise exceptions.InvalidStateError('Result is not ready.') 

199 self.__log_traceback = False 

200 if self._exception is not None: 

201 raise self._exception 

202 return self._result 

203 

204 def exception(self): 

205 """Return the exception that was set on this future. 

206 

207 The exception (or None if no exception was set) is returned only if 

208 the future is done. If the future has been cancelled, raises 

209 CancelledError. If the future isn't done yet, raises 

210 InvalidStateError. 

211 """ 

212 if self._state == _CANCELLED: 

213 exc = self._make_cancelled_error() 

214 raise exc 

215 if self._state != _FINISHED: 

216 raise exceptions.InvalidStateError('Exception is not set.') 

217 self.__log_traceback = False 

218 return self._exception 

219 

220 def add_done_callback(self, fn, *, context=None): 

221 """Add a callback to be run when the future becomes done. 

222 

223 The callback is called with a single argument - the future object. If 

224 the future is already done when this is called, the callback is 

225 scheduled with call_soon. 

226 """ 

227 if self._state != _PENDING: 

228 self._loop.call_soon(fn, self, context=context) 

229 else: 

230 if context is None: 

231 context = contextvars.copy_context() 

232 self._callbacks.append((fn, context)) 

233 

234 # New method not in PEP 3148. 

235 

236 def remove_done_callback(self, fn): 

237 """Remove all instances of a callback from the "call when done" list. 

238 

239 Returns the number of callbacks removed. 

240 """ 

241 filtered_callbacks = [(f, ctx) 

242 for (f, ctx) in self._callbacks 

243 if f != fn] 

244 removed_count = len(self._callbacks) - len(filtered_callbacks) 

245 if removed_count: 

246 self._callbacks[:] = filtered_callbacks 

247 return removed_count 

248 

249 # So-called internal methods (note: no set_running_or_notify_cancel()). 

250 

251 def set_result(self, result): 

252 """Mark the future done and set its result. 

253 

254 If the future is already done when this method is called, raises 

255 InvalidStateError. 

256 """ 

257 if self._state != _PENDING: 

258 raise exceptions.InvalidStateError(f'{self._state}: {self!r}') 

259 self._result = result 

260 self._state = _FINISHED 

261 self.__schedule_callbacks() 

262 

263 def set_exception(self, exception): 

264 """Mark the future done and set an exception. 

265 

266 If the future is already done when this method is called, raises 

267 InvalidStateError. 

268 """ 

269 if self._state != _PENDING: 

270 raise exceptions.InvalidStateError(f'{self._state}: {self!r}') 

271 if isinstance(exception, type): 

272 exception = exception() 

273 if type(exception) is StopIteration: 

274 raise TypeError("StopIteration interacts badly with generators " 

275 "and cannot be raised into a Future") 

276 self._exception = exception 

277 self._state = _FINISHED 

278 self.__schedule_callbacks() 

279 self.__log_traceback = True 

280 

281 def __await__(self): 

282 if not self.done(): 

283 self._asyncio_future_blocking = True 

284 yield self # This tells Task to wait for completion. 

285 if not self.done(): 

286 raise RuntimeError("await wasn't used with future") 

287 return self.result() # May raise too. 

288 

289 __iter__ = __await__ # make compatible with 'yield from'. 

290 

291 

292# Needed for testing purposes. 

293_PyFuture = Future 

294 

295 

296def _get_loop(fut): 

297 # Tries to call Future.get_loop() if it's available. 

298 # Otherwise fallbacks to using the old '_loop' property. 

299 try: 

300 get_loop = fut.get_loop 

301 except AttributeError: 

302 pass 

303 else: 

304 return get_loop() 

305 return fut._loop 

306 

307 

308def _set_result_unless_cancelled(fut, result): 

309 """Helper setting the result only if the future was not cancelled.""" 

310 if fut.cancelled(): 

311 return 

312 fut.set_result(result) 

313 

314 

315def _convert_future_exc(exc): 

316 exc_class = type(exc) 

317 if exc_class is concurrent.futures.CancelledError: 

318 return exceptions.CancelledError(*exc.args) 

319 elif exc_class is concurrent.futures.TimeoutError: 

320 return exceptions.TimeoutError(*exc.args) 

321 elif exc_class is concurrent.futures.InvalidStateError: 

322 return exceptions.InvalidStateError(*exc.args) 

323 else: 

324 return exc 

325 

326 

327def _set_concurrent_future_state(concurrent, source): 

328 """Copy state from a future to a concurrent.futures.Future.""" 

329 assert source.done() 

330 if source.cancelled(): 

331 concurrent.cancel() 

332 if not concurrent.set_running_or_notify_cancel(): 

333 return 

334 exception = source.exception() 

335 if exception is not None: 

336 concurrent.set_exception(_convert_future_exc(exception)) 

337 else: 

338 result = source.result() 

339 concurrent.set_result(result) 

340 

341 

342def _copy_future_state(source, dest): 

343 """Internal helper to copy state from another Future. 

344 

345 The other Future may be a concurrent.futures.Future. 

346 """ 

347 assert source.done() 

348 if dest.cancelled(): 

349 return 

350 assert not dest.done() 

351 if source.cancelled(): 

352 dest.cancel() 

353 else: 

354 exception = source.exception() 

355 if exception is not None: 

356 dest.set_exception(_convert_future_exc(exception)) 

357 else: 

358 result = source.result() 

359 dest.set_result(result) 

360 

361 

362def _chain_future(source, destination): 

363 """Chain two futures so that when one completes, so does the other. 

364 

365 The result (or exception) of source will be copied to destination. 

366 If destination is cancelled, source gets cancelled too. 

367 Compatible with both asyncio.Future and concurrent.futures.Future. 

368 """ 

369 if not isfuture(source) and not isinstance(source, 

370 concurrent.futures.Future): 

371 raise TypeError('A future is required for source argument') 

372 if not isfuture(destination) and not isinstance(destination, 

373 concurrent.futures.Future): 

374 raise TypeError('A future is required for destination argument') 

375 source_loop = _get_loop(source) if isfuture(source) else None 

376 dest_loop = _get_loop(destination) if isfuture(destination) else None 

377 

378 def _set_state(future, other): 

379 if isfuture(future): 

380 _copy_future_state(other, future) 

381 else: 

382 _set_concurrent_future_state(future, other) 

383 

384 def _call_check_cancel(destination): 

385 if destination.cancelled(): 

386 if source_loop is None or source_loop is dest_loop: 

387 source.cancel() 

388 else: 

389 source_loop.call_soon_threadsafe(source.cancel) 

390 

391 def _call_set_state(source): 

392 if (destination.cancelled() and 

393 dest_loop is not None and dest_loop.is_closed()): 

394 return 

395 if dest_loop is None or dest_loop is source_loop: 

396 _set_state(destination, source) 

397 else: 

398 dest_loop.call_soon_threadsafe(_set_state, destination, source) 

399 

400 destination.add_done_callback(_call_check_cancel) 

401 source.add_done_callback(_call_set_state) 

402 

403 

404def wrap_future(future, *, loop=None): 

405 """Wrap concurrent.futures.Future object.""" 

406 if isfuture(future): 

407 return future 

408 assert isinstance(future, concurrent.futures.Future), \ 

409 f'concurrent.futures.Future is expected, got {future!r}' 

410 if loop is None: 

411 loop = events.get_event_loop() 

412 new_future = loop.create_future() 

413 _chain_future(future, new_future) 

414 return new_future 

415 

416 

417try: 

418 import _asyncio 

419except ImportError: 

420 pass 

421else: 

422 # _CFuture is needed for tests. 

423 Future = _CFuture = _asyncio.Future