1from __future__ import annotations
2
3import sys
4from abc import ABCMeta, abstractmethod
5from collections.abc import Awaitable, Callable
6from types import TracebackType
7from typing import TYPE_CHECKING, Any, Protocol, TypeVar, overload
8
9if sys.version_info >= (3, 11):
10 from typing import TypeVarTuple, Unpack
11else:
12 from typing_extensions import TypeVarTuple, Unpack
13
14if TYPE_CHECKING:
15 from .._core._tasks import CancelScope
16
17T_Retval = TypeVar("T_Retval")
18T_contra = TypeVar("T_contra", contravariant=True)
19PosArgsT = TypeVarTuple("PosArgsT")
20
21
22class TaskStatus(Protocol[T_contra]):
23 @overload
24 def started(self: TaskStatus[None]) -> None: ...
25
26 @overload
27 def started(self, value: T_contra) -> None: ...
28
29 def started(self, value: T_contra | None = None) -> None:
30 """
31 Signal that the task has started.
32
33 :param value: object passed back to the starter of the task
34 """
35
36
37class TaskGroup(metaclass=ABCMeta):
38 """
39 Groups several asynchronous tasks together.
40
41 :ivar cancel_scope: the cancel scope inherited by all child tasks
42 :vartype cancel_scope: CancelScope
43
44 .. note:: On asyncio, support for eager task factories is considered to be
45 **experimental**. In particular, they don't follow the usual semantics of new
46 tasks being scheduled on the next iteration of the event loop, and may thus
47 cause unexpected behavior in code that wasn't written with such semantics in
48 mind.
49 """
50
51 cancel_scope: CancelScope
52
53 @abstractmethod
54 def start_soon(
55 self,
56 func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
57 *args: Unpack[PosArgsT],
58 name: object = None,
59 ) -> None:
60 """
61 Start a new task in this task group.
62
63 :param func: a coroutine function
64 :param args: positional arguments to call the function with
65 :param name: name of the task, for the purposes of introspection and debugging
66
67 .. versionadded:: 3.0
68 """
69
70 @abstractmethod
71 async def start(
72 self,
73 func: Callable[..., Awaitable[Any]],
74 *args: object,
75 name: object = None,
76 ) -> Any:
77 """
78 Start a new task and wait until it signals for readiness.
79
80 The target callable must accept a keyword argument ``task_status`` (of type
81 :class:`TaskStatus`). Awaiting on this method will return whatever was passed to
82 ``task_status.started()`` (``None`` by default).
83
84 .. note:: The :class:`TaskStatus` class is generic, and the type argument should
85 indicate the type of the value that will be passed to
86 ``task_status.started()``.
87
88 :param func: a coroutine function that accepts the ``task_status`` keyword
89 argument
90 :param args: positional arguments to call the function with
91 :param name: an optional name for the task, for introspection and debugging
92 :return: the value passed to ``task_status.started()``
93 :raises RuntimeError: if the task finishes without calling
94 ``task_status.started()``
95
96 .. seealso:: :ref:`start_initialize`
97
98 .. versionadded:: 3.0
99 """
100
101 @abstractmethod
102 async def __aenter__(self) -> TaskGroup:
103 """Enter the task group context and allow starting new tasks."""
104
105 @abstractmethod
106 async def __aexit__(
107 self,
108 exc_type: type[BaseException] | None,
109 exc_val: BaseException | None,
110 exc_tb: TracebackType | None,
111 ) -> bool:
112 """Exit the task group context waiting for all tasks to finish."""