Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/asyncio/tasks.py: 17%

476 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1"""Support for tasks, coroutines and the scheduler.""" 

2 

3__all__ = ( 

4 'Task', 'create_task', 

5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 

6 'wait', 'wait_for', 'as_completed', 'sleep', 

7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 

8 'current_task', 'all_tasks', 

9 '_register_task', '_unregister_task', '_enter_task', '_leave_task', 

10) 

11 

12import concurrent.futures 

13import contextvars 

14import functools 

15import inspect 

16import itertools 

17import types 

18import warnings 

19import weakref 

20 

21from . import base_tasks 

22from . import coroutines 

23from . import events 

24from . import exceptions 

25from . import futures 

26from .coroutines import _is_coroutine 

27 

# Helper to generate new task names.
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
# Each call returns the next integer, starting at 1; Task.__init__ uses it to
# build default names of the form "Task-<n>".
_task_name_counter = itertools.count(1).__next__

32 

33 

def current_task(loop=None):
    """Return the task currently being executed on *loop*, if any.

    When *loop* is omitted the running event loop is looked up (which
    raises RuntimeError if no loop is running).  The result is None when
    no task is registered as current for that loop.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)

39 

40 

# Number of failed snapshot attempts after which the RuntimeError raised
# while copying _all_tasks is propagated instead of retried.
_ALL_TASKS_SNAPSHOT_ATTEMPTS = 1000


def _snapshot_all_tasks():
    """Return a list copy of the module-level ``_all_tasks`` collection.

    Looping over a WeakSet (_all_tasks) isn't safe as it can be updated
    from another thread while we do so.  Therefore we cast it to list
    prior to filtering.  The list cast itself requires iteration, so we
    repeat it several times ignoring RuntimeErrors (which are not very
    likely to occur).  See issues 34970 and 36607 for details.

    Raises RuntimeError if the copy still fails on the final attempt.
    """
    failures = 0
    while True:
        try:
            return list(_all_tasks)
        except RuntimeError:
            failures += 1
            if failures >= _ALL_TASKS_SNAPSHOT_ATTEMPTS:
                raise


def all_tasks(loop=None):
    """Return a set of all not-yet-done tasks for the loop.

    When *loop* is None the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()
    return {t for t in _snapshot_all_tasks()
            if futures._get_loop(t) is loop and not t.done()}


def _all_tasks_compat(loop=None):
    """Return a set of *all* tasks for the loop, including completed ones.

    Differs from all_tasks() by not filtering out finished tasks, and by
    falling back to events.get_event_loop() when *loop* is None.  Used to
    implement the deprecated "Task.all_tasks()" method.
    """
    if loop is None:
        loop = events.get_event_loop()
    return {t for t in _snapshot_all_tasks() if futures._get_loop(t) is loop}

86 

87 

def _set_task_name(task, name):
    """Apply *name* to *task* when a name was given and the task supports it.

    Nothing happens when *name* is None or when *task* has no ``set_name``
    attribute.
    """
    if name is None:
        return
    try:
        rename = task.set_name
    except AttributeError:
        # The task object offers no set_name(); leave it unnamed.
        return
    rename(name)

96 

97 

class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        *name* becomes the task name (converted with str()); when it is
        None a name of the form "Task-<n>" is generated.

        Raises TypeError if *coro* is not a coroutine object.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the innermost recorded frame.
            # NOTE(review): presumably this is the frame of this __init__
            # call -- _source_traceback is set by the base class; confirm.
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        # Set by cancel() when the cancellation has to be delivered by the
        # next __step() call.
        self._must_cancel = False
        # The future this task is currently waiting on, if any.
        self._fut_waiter = None
        self._coro = coro
        # Every step of the coroutine is scheduled with this context.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a still-pending task to the loop's exception handler."""
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def __class_getitem__(cls, type):
        """Return *cls* unchanged, so that ``Task[...]`` can be written."""
        return cls

    def _repr_info(self):
        # Delegates to base_tasks._task_repr_info().
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the name of this task."""
        return self._name

    def set_name(self, value):
        """Set the name of this task to ``str(value)``."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: a Task's result cannot be set directly."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: a Task's exception cannot be set directly."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def __step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        Sends None into the coroutine, or throws *exc* into it when one is
        given (or when a cancellation is pending).  The outcome either
        completes the task, makes it wait on the yielded future, or
        schedules another step.

        Raises InvalidStateError if the task is already done.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A pending cancel() request replaces whatever was going to be
            # delivered, unless that already is a CancelledError.
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate to
            # the caller of this step.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            # The coroutine yielded *result* without finishing.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Wait on the future: __wakeup() resumes the task
                        # once the future is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was called while the coroutine was
                            # running; forward it to the new waiter.
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        If *future* finished with an exception (including cancellation),
        that exception is passed to __step(); otherwise __step() is called
        without arguments.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.

341 

342 

# Keep the pure-Python implementation reachable under its own name, because
# ``Task`` is rebound below when the C implementation can be imported.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # The _asyncio module is not available; keep the pure-Python Task.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task

353 

354 

def create_task(coro, *, name=None):
    """Schedule the coroutine object *coro* as a task on the running loop.

    If *name* is not None it is applied to the new task (when the task
    object supports naming).  Raises RuntimeError when no event loop is
    running.

    Return the Task object.
    """
    new_task = events.get_running_loop().create_task(coro)
    _set_task_name(new_task, name)
    return new_task

364 

365 

# wait() and as_completed() similar to those in PEP 3148.

# The values accepted by wait() for its *return_when* argument; they are
# the very same objects as the concurrent.futures constants.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED

371 

372 

async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines in the iterable *fs*.

    *fs* must be a non-empty iterable (a single future or coroutine is
    rejected with TypeError, an empty iterable with ValueError).
    Coroutines are wrapped in Tasks; passing them is deprecated.

    *return_when* must be one of FIRST_COMPLETED, FIRST_EXCEPTION or
    ALL_COMPLETED, otherwise ValueError is raised.

    Returns a pair of sets of futures, ``(done, pending)``:

        done, pending = await asyncio.wait(fs)

    Note: no TimeoutError is raised here.  Futures that have not finished
    when *timeout* expires are simply reported in the second set.
    """
    is_single_awaitable = futures.isfuture(fs) or coroutines.iscoroutine(fs)
    if is_single_awaitable:
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    # De-duplicate before wrapping so each awaitable is scheduled once.
    unique = set(fs)

    for item in unique:
        if coroutines.iscoroutine(item):
            warnings.warn("The explicit passing of coroutine objects to "
                          "asyncio.wait() is deprecated since Python 3.8, and "
                          "scheduled for removal in Python 3.11.",
                          DeprecationWarning, stacklevel=2)
            break

    wrapped = {ensure_future(item, loop=loop) for item in unique}

    return await _wait(wrapped, timeout, return_when, loop)

414 

415 

def _release_waiter(waiter, *args):
    """Resolve *waiter* with None unless it is already done.

    Extra positional arguments are accepted and ignored.
    """
    if waiter.done():
        return
    waiter.set_result(None)

419 

420 

async def wait_for(fut, timeout, *, loop=None):
    """Wait for a single Future or coroutine to complete, with a timeout.

    A coroutine is wrapped in a Task.  The result of the awaitable is
    returned.  If *timeout* elapses first, the task is cancelled and
    TimeoutError is raised; wrap the awaitable in shield() to avoid the
    cancellation.  With ``timeout=None`` the awaitable is simply awaited.

    If the wait itself is cancelled, the task is cancelled as well.

    This function is a coroutine.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        # No time to wait: either the result is already there, or the
        # awaitable is cancelled right away.
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        await _cancel_and_wait(fut, loop=loop)
        try:
            fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc
        raise exceptions.TimeoutError()

    # The wakeup future is released by whichever happens first: the timer
    # firing or the awaited future completing.
    wakeup = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, wakeup)
    on_done = functools.partial(_release_waiter, wakeup)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(on_done)

    try:
        # wait until the future completes or the timeout
        try:
            await wakeup
        except exceptions.CancelledError:
            if fut.done():
                return fut.result()
            fut.remove_done_callback(on_done)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            raise

        if fut.done():
            return fut.result()

        fut.remove_done_callback(on_done)
        # We must ensure that the task is not running
        # after wait_for() returns.
        # See https://bugs.python.org/issue32751
        await _cancel_and_wait(fut, loop=loop)
        # In case task cancellation failed with some
        # exception, we should re-raise it
        # See https://bugs.python.org/issue40607
        try:
            fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc
        raise exceptions.TimeoutError()
    finally:
        timer.cancel()

499 

500 

async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    *fs* must be a non-empty collection of Futures.  Returns the pair of
    sets ``(done, pending)`` once the *return_when* condition is met or
    *timeout* (if not None) has elapsed.
    """
    assert fs, 'Set of Futures is empty.'
    wakeup = loop.create_future()
    timer = (loop.call_later(timeout, _release_waiter, wakeup)
             if timeout is not None else None)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if remaining > 0 and return_when != FIRST_COMPLETED:
            # Futures are still outstanding; only a real failure under
            # FIRST_EXCEPTION ends the wait early.
            failed = (return_when == FIRST_EXCEPTION
                      and not f.cancelled()
                      and f.exception() is not None)
            if not failed:
                return
        if timer is not None:
            timer.cancel()
        if not wakeup.done():
            wakeup.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await wakeup
    finally:
        if timer is not None:
            timer.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        (done if f.done() else pending).add(f)
    return done, pending

543 

544 

async def _cancel_and_wait(fut, loop):
    """Cancel the future or task *fut*, then wait until it is done."""

    finished = loop.create_future()
    notify = functools.partial(_release_waiter, finished)
    fut.add_done_callback(notify)

    try:
        fut.cancel()
        # Awaiting a separate future instead of *fut* itself keeps
        # _cancel_and_wait reliably cancellable.
        await finished
    finally:
        fut.remove_done_callback(notify)

559 

560 

# This is *not* a coroutine function!  It is a plain generator whose items
# are coroutines to be awaited by the caller.
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Awaiting the yielded coroutines gives the results (or raises the
    exceptions!) of the original Futures or coroutines, in the order in
    which they complete.  Intended usage (this differs from PEP 3148):

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If *timeout* is given, the 'await' raises TimeoutError when the
    timeout occurs before all Futures are done.

    Note: The yielded 'f' objects are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()
    outstanding = {ensure_future(item, loop=loop) for item in set(fs)}
    timer = None

    def _on_timeout():
        for pending in outstanding:
            pending.remove_done_callback(_on_completion)
            # Queue a dummy value for _wait_for_one().
            finished.put_nowait(None)
        # Emptied in one go: removing items while looping is not possible.
        outstanding.clear()

    def _on_completion(fut):
        if not outstanding:
            return  # _on_timeout() was here first.
        outstanding.remove(fut)
        finished.put_nowait(fut)
        if timer is not None and not outstanding:
            timer.cancel()

    async def _wait_for_one():
        item = await finished.get()
        if item is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return item.result()  # May raise item.exception().

    for fut in outstanding:
        fut.add_done_callback(_on_completion)
    if outstanding and timeout is not None:
        timer = loop.call_later(timeout, _on_timeout)
    for _ in range(len(outstanding)):
        yield _wait_for_one()

622 

623 

@types.coroutine
def __sleep0():
    """Give up control for exactly one event loop cycle.

    Private helper for 'asyncio.sleep()' for the case where 'delay' is 0
    (or negative).  A bare 'yield' is used, which Task.__step knows how
    to handle, so that no Future object has to be created.
    """
    yield

634 

635 

async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after *delay* seconds, returning *result*.

    A *delay* of zero or less only yields to the event loop once.
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    wakeup = loop.create_future()
    timer = loop.call_later(delay,
                            futures._set_result_unless_cancelled,
                            wakeup, result)
    try:
        return await wakeup
    finally:
        # Make sure the timer does not outlive this coroutine.
        timer.cancel()

657 

658 

def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    A Future argument is returned as is (ValueError is raised if *loop*
    is given and is not the future's loop).  A coroutine is scheduled as
    a task on *loop*, or on the loop returned by events.get_event_loop()
    when *loop* is None.  Any other awaitable is first wrapped in a
    coroutine.  Anything else raises TypeError.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is not None:
            if loop is not futures._get_loop(coro_or_future):
                raise ValueError('The future belongs to a different loop than '
                                 'the one specified as the loop argument')
        return coro_or_future

    if inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)

    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                    'required')

681 

682 

@types.coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Turns *awaitable* (an object with __await__) into a coroutine, which
    ensure_future() then wraps in a Task.
    """
    outcome = yield from awaitable.__await__()
    return outcome

# Tag the helper with the marker object imported from the coroutines module.
_wrap_awaitable._is_coroutine = _is_coroutine

693 

694 

class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden so that it cancels all the children instead
    of marking this future as cancelled right away, which makes it
    behave more like Task.cancel().
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

    def cancel(self, msg=None):
        """Forward the cancellation to every child.

        Returns False if this future is already done or if no child
        accepted the cancellation, True otherwise.
        """
        if self.done():
            return False
        # Every child must receive the request, so collect all the answers
        # before combining them.
        answers = [child.cancel(msg=msg) for child in self._children]
        any_cancelled = any(answers)
        if any_cancelled:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return any_cancelled

721 

722 

def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines are wrapped in a future and scheduled in the event loop;
    the scheduling order is not necessarily the order they were passed
    in.  All futures must share the same event loop.

    When every awaitable finishes successfully, the result of the
    returned future is the list of results, in the order of the
    arguments (not the order of completion).  With *return_exceptions*
    set to True, exceptions are treated like successful results and are
    placed in the result list; otherwise the first raised exception is
    immediately propagated to the returned future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError -- the
    outer Future is *not* cancelled in this case.  (This is to prevent
    the cancellation of one child to cause other children to be
    cancelled.)

    If *return_exceptions* is False, cancelling gather() after it has
    been marked done won't cancel any submitted awaitables.  For
    instance, gather can be marked done after propagating an exception
    to the caller, therefore, calling ``gather.cancel()`` after catching
    an exception (raised by one of the awaitables) from gather won't
    cancel any other awaitables.
    """
    if not coros_or_futures:
        # Nothing to wait for: hand back an already completed future.
        if loop is not None:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _outcome(child):
        # The value that represents *child* in the result list.
        if child.cancelled():
            # Check if 'child' is cancelled first, as 'child.exception()'
            # will *raise* a CancelledError instead of returning it.
            # Also, since we're adding the exception return value
            # to the results instead of raising it, don't bother
            # setting __context__.  This also lets us preserve
            # calling '_make_cancelled_error()' at most once.
            message = child._cancel_message
            return exceptions.CancelledError(
                '' if message is None else message)
        error = child.exception()
        return child.result() if error is None else error

    def _done_callback(fut):
        nonlocal finished_count
        finished_count += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                outer.set_exception(fut._make_cancelled_error())
                return
            error = fut.exception()
            if error is not None:
                outer.set_exception(error)
                return

        if finished_count != unique_count:
            return

        # All futures are done; create a list of results
        # and set it to the 'outer' future.
        results = [_outcome(child) for child in children]

        if outer._cancel_requested:
            # If gather is being cancelled we must propagate the
            # cancellation regardless of *return_exceptions* argument.
            # See issue 32684.
            # The error is built from the last child in the list.
            outer.set_exception(children[-1]._make_cancelled_error())
        else:
            outer.set_result(results)

    seen = {}
    children = []
    unique_count = 0
    finished_count = 0
    for arg in coros_or_futures:
        if arg in seen:
            # There's a duplicate Future object in coros_or_futures.
            child = seen[arg]
        else:
            child = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(child)
            if child is not arg:
                # 'arg' was not a Future, therefore, 'child' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                child._log_destroy_pending = False

            unique_count += 1
            seen[arg] = child
            child.add_done_callback(_done_callback)

        children.append(child)

    outer = _GatheringFuture(children, loop=loop)
    return outer

847 

848 

def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the task
    running in something() is not cancelled.  From the point of view of
    something(), the cancellation did not happen.  Its caller is still
    cancelled though, so the await expression still raises
    CancelledError.  Note: if something() is cancelled by other means,
    shield() is cancelled too.

    To ignore cancellation completely (not recommended), combine
    shield() with a try/except clause:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _inner_done_callback(finished):
        if outer.cancelled():
            if not finished.cancelled():
                # Mark inner's result as retrieved.
                finished.exception()
            return

        if finished.cancelled():
            outer.cancel()
            return

        error = finished.exception()
        if error is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(error)

    def _outer_done_callback(_completed_outer):
        # Once the outer future is done, stop listening to the inner one.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer

910 

911 

def run_coroutine_threadsafe(coro, loop):
    """Submit the coroutine object *coro* to the event loop *loop*.

    Return a concurrent.futures.Future to access the result.
    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    result_future = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Report the failure through the returned future as well,
            # unless it has been cancelled in the meantime.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future

933 

934 

# WeakSet containing all alive tasks.  Being weak, it does not by itself
# keep a registered task alive.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
# Entries are added by _enter_task() and removed by _leave_task().
_current_tasks = {}

941 

942 

def _register_task(task):
    """Add *task* to the module-wide collection of all tasks."""
    _all_tasks.add(task)

946 

947 

def _enter_task(loop, task):
    """Record *task* as the task currently executing on *loop*.

    Raises RuntimeError if another task is already recorded for *loop*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {current_task!r} is being executed.")

954 

955 

def _leave_task(loop, task):
    """Clear the current-task record of *loop*, which must be *task*.

    Raises RuntimeError if the task recorded for *loop* is not *task*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {current_task!r}.")

962 

963 

def _unregister_task(task):
    """Remove *task* from the collection of all tasks, if it is in there."""
    _all_tasks.discard(task)

967 

968 

969_py_register_task = _register_task 

970_py_unregister_task = _unregister_task 

971_py_enter_task = _enter_task 

972_py_leave_task = _leave_task 

973 

974 

975try: 

976 from _asyncio import (_register_task, _unregister_task, 

977 _enter_task, _leave_task, 

978 _all_tasks, _current_tasks) 

979except ImportError: 

980 pass 

981else: 

982 _c_register_task = _register_task 

983 _c_unregister_task = _unregister_task 

984 _c_enter_task = _enter_task 

985 _c_leave_task = _leave_task