1from __future__ import annotations
2
3import sys
4from abc import ABCMeta, abstractmethod
5from collections.abc import Awaitable, Callable
6from types import TracebackType
7from typing import TYPE_CHECKING, Any, Protocol, overload
8
9if sys.version_info >= (3, 13):
10 from typing import TypeVar
11else:
12 from typing_extensions import TypeVar
13
14if sys.version_info >= (3, 11):
15 from typing import TypeVarTuple, Unpack
16else:
17 from typing_extensions import TypeVarTuple, Unpack
18
19if TYPE_CHECKING:
20 from .._core._tasks import CancelScope
21
# Generic return-value type variable.
# NOTE(review): not referenced in the visible portion of this file; presumably kept
# for other importers -- confirm before removing.
T_Retval = TypeVar("T_Retval")
# Type of the value accepted by ``TaskStatus.started()``. Declared contravariant,
# and defaults to ``None`` so that an unparametrized ``TaskStatus`` means
# ``TaskStatus[None]``.
T_contra = TypeVar("T_contra", contravariant=True, default=None)
# Captures the positional argument types that ``TaskGroup.start_soon()`` forwards
# to the callable it is given, so the two stay in agreement for type checkers.
PosArgsT = TypeVarTuple("PosArgsT")
25
26
class TaskStatus(Protocol[T_contra]):
    """
    Structural type of the object a task uses to report that it has started.

    The type parameter is the type of the value accepted by :meth:`started`.
    """

    # Calling ``started()`` without an argument is only valid for ``TaskStatus[None]``.
    @overload
    def started(self: TaskStatus[None]) -> None: ...

    # In every other parametrization, a value of the declared type must be given.
    @overload
    def started(self, value: T_contra) -> None: ...

    def started(self, value: T_contra | None = None) -> None:
        """
        Report that the task has finished starting up.

        :param value: the object to hand back to whoever started the task
        """
40
41
class TaskGroup(metaclass=ABCMeta):
    """
    A collection of asynchronous tasks that are managed together as one unit.

    :ivar cancel_scope: the cancel scope that every child task inherits
    :vartype cancel_scope: CancelScope

    .. note:: Eager task factories on asyncio are supported on an **experimental**
        basis only. New tasks are normally scheduled on the next iteration of the
        event loop; eager task factories do not follow those semantics, and can
        therefore cause unexpected behavior in code that was written with the usual
        semantics in mind.
    """

    cancel_scope: CancelScope

    @abstractmethod
    def start_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> None:
        """
        Launch a new task as a member of this task group.

        :param func: the coroutine function to run
        :param args: the positional arguments to pass to ``func``
        :param name: a name for the task, used for introspection and debugging

        .. versionadded:: 3.0
        """

    @abstractmethod
    async def start(
        self,
        func: Callable[..., Awaitable[Any]],
        *args: object,
        name: object = None,
    ) -> Any:
        """
        Launch a new task and wait for it to signal that it is ready.

        ``func`` has to accept a keyword argument named ``task_status`` (a
        :class:`TaskStatus`). The result of awaiting this method is the value that
        the task handed to ``task_status.started()``, which is ``None`` by default.

        .. note:: :class:`TaskStatus` is a generic class; its type argument should
            be the type of the value that gets passed to
            ``task_status.started()``.

        :param func: a coroutine function accepting the ``task_status`` keyword
            argument
        :param args: the positional arguments to pass to ``func``
        :param name: optional name for the task, used for introspection and
            debugging
        :return: the value that was passed to ``task_status.started()``
        :raises RuntimeError: if the task completes without ever calling
            ``task_status.started()``

        .. seealso:: :ref:`start_initialize`

        .. versionadded:: 3.0
        """

    @abstractmethod
    async def __aenter__(self) -> TaskGroup:
        """Enter the context of the task group, after which tasks can be started."""

    @abstractmethod
    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """Leave the context of the task group, waiting until every task is done."""