Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_tasks.py: 91%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

33 statements  

1from __future__ import annotations 

2 

3import sys 

4from abc import ABCMeta, abstractmethod 

5from collections.abc import Awaitable, Callable 

6from types import TracebackType 

7from typing import TYPE_CHECKING, Any, Protocol, overload 

8 

9if sys.version_info >= (3, 13): 

10 from typing import TypeVar 

11else: 

12 from typing_extensions import TypeVar 

13 

14if sys.version_info >= (3, 11): 

15 from typing import TypeVarTuple, Unpack 

16else: 

17 from typing_extensions import TypeVarTuple, Unpack 

18 

19if TYPE_CHECKING: 

20 from .._core._tasks import CancelScope 

21 

22T_Retval = TypeVar("T_Retval") 

23T_contra = TypeVar("T_contra", contravariant=True, default=None) 

24PosArgsT = TypeVarTuple("PosArgsT") 

25 

26 

27class TaskStatus(Protocol[T_contra]): 

28 @overload 

29 def started(self: TaskStatus[None]) -> None: ... 

30 

31 @overload 

32 def started(self, value: T_contra) -> None: ... 

33 

34 def started(self, value: T_contra | None = None) -> None: 

35 """ 

36 Signal that the task has started. 

37 

38 :param value: object passed back to the starter of the task 

39 """ 

40 

41 

42class TaskGroup(metaclass=ABCMeta): 

43 """ 

44 Groups several asynchronous tasks together. 

45 

46 :ivar cancel_scope: the cancel scope inherited by all child tasks 

47 :vartype cancel_scope: CancelScope 

48 

49 .. note:: On asyncio, support for eager task factories is considered to be 

50 **experimental**. In particular, they don't follow the usual semantics of new 

51 tasks being scheduled on the next iteration of the event loop, and may thus 

52 cause unexpected behavior in code that wasn't written with such semantics in 

53 mind. 

54 """ 

55 

56 cancel_scope: CancelScope 

57 

58 @abstractmethod 

59 def start_soon( 

60 self, 

61 func: Callable[[Unpack[PosArgsT]], Awaitable[Any]], 

62 *args: Unpack[PosArgsT], 

63 name: object = None, 

64 ) -> None: 

65 """ 

66 Start a new task in this task group. 

67 

68 :param func: a coroutine function 

69 :param args: positional arguments to call the function with 

70 :param name: name of the task, for the purposes of introspection and debugging 

71 

72 .. versionadded:: 3.0 

73 """ 

74 

75 @abstractmethod 

76 async def start( 

77 self, 

78 func: Callable[..., Awaitable[Any]], 

79 *args: object, 

80 name: object = None, 

81 ) -> Any: 

82 """ 

83 Start a new task and wait until it signals for readiness. 

84 

85 The target callable must accept a keyword argument ``task_status`` (of type 

86 :class:`TaskStatus`). Awaiting on this method will return whatever was passed to 

87 ``task_status.started()`` (``None`` by default). 

88 

89 .. note:: The :class:`TaskStatus` class is generic, and the type argument should 

90 indicate the type of the value that will be passed to 

91 ``task_status.started()``. 

92 

93 :param func: a coroutine function that accepts the ``task_status`` keyword 

94 argument 

95 :param args: positional arguments to call the function with 

96 :param name: an optional name for the task, for introspection and debugging 

97 :return: the value passed to ``task_status.started()`` 

98 :raises RuntimeError: if the task finishes without calling 

99 ``task_status.started()`` 

100 

101 .. seealso:: :ref:`start_initialize` 

102 

103 .. versionadded:: 3.0 

104 """ 

105 

106 @abstractmethod 

107 async def __aenter__(self) -> TaskGroup: 

108 """Enter the task group context and allow starting new tasks.""" 

109 

110 @abstractmethod 

111 async def __aexit__( 

112 self, 

113 exc_type: type[BaseException] | None, 

114 exc_val: BaseException | None, 

115 exc_tb: TracebackType | None, 

116 ) -> bool: 

117 """Exit the task group context waiting for all tasks to finish."""