Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/tasks.py: 1%

468 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

"""Support for tasks, coroutines and the scheduler."""

# Names exported by "from asyncio.tasks import *".  The underscore-prefixed
# task bookkeeping hooks are listed here deliberately.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)

11 

12import concurrent.futures 

13import contextvars 

14import functools 

15import inspect 

16import itertools 

17import types 

18import warnings 

19import weakref 

20 

21from . import base_tasks 

22from . import coroutines 

23from . import events 

24from . import exceptions 

25from . import futures 

26from .coroutines import _is_coroutine 

27 

28# Helper to generate new task names 

29# This uses itertools.count() instead of a "+= 1" operation because the latter 

30# is not thread safe. See bpo-11866 for a longer explanation. 

31_task_name_counter = itertools.count(1).__next__ 

32 

33 

def current_task(loop=None):
    """Return the task currently being executed by *loop*, or None.

    When *loop* is None, the loop is obtained from
    events.get_running_loop().  The result is whatever _current_tasks maps
    the loop to, i.e. None when no task is registered as running on it.
    """
    if loop is None:
        loop = events.get_running_loop()
    return _current_tasks.get(loop)

39 

40 

def all_tasks(loop=None):
    """Return a set of all not-yet-done tasks for the loop.

    When *loop* is None, the loop is obtained from
    events.get_running_loop().
    """
    if loop is None:
        loop = events.get_running_loop()
    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated
    # from another thread while we do so.  Therefore we cast it to list prior
    # to filtering.  The list cast itself requires iteration, so we repeat it
    # several times ignoring RuntimeErrors (which are not very likely to
    # occur).  See issues 34970 and 36607 for details.
    failures = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            failures += 1
            if failures >= 1000:
                # Give up and surface the error after 1000 failed snapshots.
                raise
        else:
            break
    return {t for t in tasks
            if futures._get_loop(t) is loop and not t.done()}

62 

63 

def _all_tasks_compat(loop=None):
    """Return every task of *loop*, including the completed ones.

    Differs from all_tasks() by not filtering out done tasks, and by
    defaulting *loop* to events.get_event_loop().  Used to implement the
    deprecated Task.all_tasks() method.
    """
    if loop is None:
        loop = events.get_event_loop()
    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated
    # from another thread while we do so.  Therefore we cast it to list prior
    # to filtering.  The list cast itself requires iteration, so we repeat it
    # several times ignoring RuntimeErrors (which are not very likely to
    # occur).  See issues 34970 and 36607 for details.
    failures = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            failures += 1
            if failures >= 1000:
                # Give up and surface the error after 1000 failed snapshots.
                raise
        else:
            break
    return {t for t in tasks if futures._get_loop(t) is loop}

86 

87 

88def _set_task_name(task, name): 

89 if name is not None: 

90 try: 

91 set_name = task.set_name 

92 except AttributeError: 

93 pass 

94 else: 

95 set_name(name) 

96 

97 

class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        warnings.warn("Task.current_task() is deprecated since Python 3.7, "
                      "use asyncio.current_task() instead",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        warnings.warn("Task.all_tasks() is deprecated since Python 3.7, "
                      "use asyncio.all_tasks() instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* and schedule its first step on the loop.

        Raises TypeError when *coro* is not a coroutine.  When *name* is
        None a name of the form 'Task-N' is generated.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the traceback entry for this constructor frame.
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a still-pending task to the loop's exception handler."""
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the name of this task."""
        return self._name

    def set_name(self, value):
        """Set the name of this task to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: not supported on a Task."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: not supported on a Task."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        """Advance the coroutine by one step and act on what it yields."""
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = exceptions.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel()
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback of the awaited future: resume the coroutine."""
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.

362 

363 

# Keep a reference to the pure-Python implementation under its own name.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    pass
else:
    # Prefer the C implementation when it is available.
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task

374 

375 

def create_task(coro, *, name=None):
    """Schedule the execution of a coroutine object in a spawn task.

    The task is created on the loop returned by events.get_running_loop()
    and, when *name* is not None, named via _set_task_name().

    Return a Task object.
    """
    loop = events.get_running_loop()
    task = loop.create_task(coro)
    _set_task_name(task, name)
    return task

385 

386 

# wait() and as_completed() similar to those in PEP 3148.

FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED

392 

393 

async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The sequence futures must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError when *fs* is itself a future or coroutine, and
    ValueError when *fs* is empty or *return_when* is not one of
    FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    # set(fs) removes duplicate entries before they are wrapped.
    fs = {ensure_future(f, loop=loop) for f in set(fs)}

    return await _wait(fs, timeout, return_when, loop)

427 

428 

429def _release_waiter(waiter, *args): 

430 if not waiter.done(): 

431 waiter.set_result(None) 

432 

433 

async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    With timeout=None the future is simply awaited.  With timeout <= 0 an
    already finished future yields its result; otherwise it is cancelled
    and TimeoutError is raised.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        # Same guarantee as the timed path below: the task must not be
        # running after wait_for() returns, so wait for the cancellation
        # to be processed instead of only requesting it.
        # See https://bugs.python.org/issue32751
        await _cancel_and_wait(fut, loop=loop)
        raise exceptions.TimeoutError()

    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except exceptions.CancelledError:
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            raise exceptions.TimeoutError()
    finally:
        timeout_handle.cancel()

493 

494 

495async def _wait(fs, timeout, return_when, loop): 

496 """Internal helper for wait(). 

497 

498 The fs argument must be a collection of Futures. 

499 """ 

500 assert fs, 'Set of Futures is empty.' 

501 waiter = loop.create_future() 

502 timeout_handle = None 

503 if timeout is not None: 

504 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 

505 counter = len(fs) 

506 

507 def _on_completion(f): 

508 nonlocal counter 

509 counter -= 1 

510 if (counter <= 0 or 

511 return_when == FIRST_COMPLETED or 

512 return_when == FIRST_EXCEPTION and (not f.cancelled() and 

513 f.exception() is not None)): 

514 if timeout_handle is not None: 

515 timeout_handle.cancel() 

516 if not waiter.done(): 

517 waiter.set_result(None) 

518 

519 for f in fs: 

520 f.add_done_callback(_on_completion) 

521 

522 try: 

523 await waiter 

524 finally: 

525 if timeout_handle is not None: 

526 timeout_handle.cancel() 

527 for f in fs: 

528 f.remove_done_callback(_on_completion) 

529 

530 done, pending = set(), set() 

531 for f in fs: 

532 if f.done(): 

533 done.add(f) 

534 else: 

535 pending.add(f) 

536 return done, pending 

537 

538 

539async def _cancel_and_wait(fut, loop): 

540 """Cancel the *fut* future or task and wait until it completes.""" 

541 

542 waiter = loop.create_future() 

543 cb = functools.partial(_release_waiter, waiter) 

544 fut.add_done_callback(cb) 

545 

546 try: 

547 fut.cancel() 

548 # We cannot wait on *fut* directly to make 

549 # sure _cancel_and_wait itself is reliably cancellable. 

550 await waiter 

551 finally: 

552 fut.remove_done_callback(cb) 

553 

554 

# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue(loop=loop)

    if loop is None:
        loop = events.get_event_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    timeout_handle = None

    def _on_timeout():
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    async def _wait_for_one():
        f = await done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return f.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # One awaitable is yielded per distinct future.
    for _ in range(len(todo)):
        yield _wait_for_one()

616 

617 

618@types.coroutine 

619def __sleep0(): 

620 """Skip one event loop run cycle. 

621 

622 This is a private helper for 'asyncio.sleep()', used 

623 when the 'delay' is set to 0. It uses a bare 'yield' 

624 expression (which Task.__step knows how to handle) 

625 instead of creating a Future object. 

626 """ 

627 yield 

628 

629 

async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed.  When
    *delay* is zero or negative the coroutine only yields to the event loop
    once and returns; *loop* is not consulted on that path.
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        # Drop the timer if the sleep ends early (e.g. it is cancelled).
        h.cancel()

651 

652 

def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.

    A coroutine is scheduled as a task on *loop* (events.get_event_loop()
    when *loop* is None).  Any other awaitable is first wrapped with
    _wrap_awaitable().  Raises ValueError when a Future belongs to a loop
    other than the *loop* given, and TypeError for unsupported arguments.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            # Drop the traceback entry for this helper frame.
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

675 

676 

677@types.coroutine 

678def _wrap_awaitable(awaitable): 

679 """Helper for asyncio.ensure_future(). 

680 

681 Wraps awaitable (an object with __await__) into a coroutine 

682 that will later be wrapped in a Task by ensure_future(). 

683 """ 

684 return (yield from awaitable.__await__()) 

685 

686_wrap_awaitable._is_coroutine = _is_coroutine 

687 

688 

689class _GatheringFuture(futures.Future): 

690 """Helper for gather(). 

691 

692 This overrides cancel() to cancel all the children and act more 

693 like Task.cancel(), which doesn't immediately mark itself as 

694 cancelled. 

695 """ 

696 

697 def __init__(self, children, *, loop=None): 

698 super().__init__(loop=loop) 

699 self._children = children 

700 self._cancel_requested = False 

701 

702 def cancel(self): 

703 if self.done(): 

704 return False 

705 ret = False 

706 for child in self._children: 

707 if child.cancel(): 

708 ret = True 

709 if ret: 

710 # If any child tasks were actually cancelled, we should 

711 # propagate the cancellation request regardless of 

712 # *return_exceptions* argument. See issue 32684. 

713 self._cancel_requested = True 

714 return ret 

715 

716 

def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)
    """
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        # 'outer' is still None when ensure_future() raised for a later
        # argument and gather() never returned; the children registered
        # before the failure still run this callback.
        if outer is None or outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = exceptions.CancelledError()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as
                    # 'fut.exception()' will *raise* a CancelledError
                    # instead of returning it.
                    res = exceptions.CancelledError()
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                outer.set_exception(exceptions.CancelledError())
            else:
                outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    # Bound before any callback is registered so that _done_callback never
    # sees an unbound closure variable (bpo-46672).
    outer = None
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer

828 

829 

def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _inner_done_callback(inner):
        # Copy the outcome of the shielded future onto the outer one.
        if outer.cancelled():
            if not inner.cancelled():
                # Mark inner's result as retrieved.
                inner.exception()
            return

        if inner.cancelled():
            outer.cancel()
        else:
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())

    def _outer_done_callback(outer):
        # Stop listening to the inner future once the outer one is done.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer

891 

892 

def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.

    Raises TypeError when *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Hand the failure to the waiting thread, unless the
            # concurrent future was already cancelled, then re-raise.
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future

914 

915 

916# WeakSet containing all alive tasks. 

917_all_tasks = weakref.WeakSet() 

918 

919# Dictionary containing tasks that are currently active in 

920# all running event loops. {EventLoop: Task} 

921_current_tasks = {} 

922 

923 

def _register_task(task):
    """Register a new task in asyncio as executed by loop."""
    _all_tasks.add(task)

927 

928 

def _enter_task(loop, task):
    """Record *task* as the task currently running on *loop*.

    Raises RuntimeError when another task is already recorded for the loop.
    """
    current_task = _current_tasks.get(loop)
    if current_task is not None:
        raise RuntimeError(f"Cannot enter into task {task!r} while another "
                           f"task {current_task!r} is being executed.")
    _current_tasks[loop] = task

935 

936 

def _leave_task(loop, task):
    """Clear the current-task record of *loop*.

    Raises RuntimeError when *task* is not the task recorded for the loop.
    """
    current_task = _current_tasks.get(loop)
    if current_task is not task:
        raise RuntimeError(f"Leaving task {task!r} does not match "
                           f"the current task {current_task!r}.")
    del _current_tasks[loop]

943 

944 

def _unregister_task(task):
    """Unregister a task."""
    _all_tasks.discard(task)

948 

949 

# Keep the pure-Python hooks reachable under explicit names before the
# names below are (possibly) rebound to the C implementations.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task