Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_tasks.py: 58%

59 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import math 

2from types import TracebackType 

3from typing import Optional, Type 

4from warnings import warn 

5 

6from ..abc._tasks import TaskGroup, TaskStatus 

7from ._compat import ( 

8 DeprecatedAsyncContextManager, 

9 DeprecatedAwaitable, 

10 DeprecatedAwaitableFloat, 

11) 

12from ._eventloop import get_asynclib 

13 

14 

class _IgnoredTaskStatus(TaskStatus):
    """Task status implementation whose :meth:`started` call does nothing."""

    def started(self, value: object = None) -> None:
        """Accept the start notification and discard ``value``."""
        return None


# Module-level instance of the no-op task status.
TASK_STATUS_IGNORED = _IgnoredTaskStatus()

21 

22 

class CancelScope(DeprecatedAsyncContextManager["CancelScope"]):
    """
    Wraps a unit of work that can be made separately cancellable.

    Instantiating this class returns the ``CancelScope`` object built by the module that
    ``get_asynclib()`` provides. Every other member declared here is a placeholder that
    raises :exc:`NotImplementedError`.

    :param deadline: The time (clock value) when this scope is cancelled automatically
    :param shield: ``True`` to shield the cancel scope from external cancellation
    """

    def __new__(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> "CancelScope":
        # Construction is delegated entirely to the asynclib module's own class.
        backend = get_asynclib()
        return backend.CancelScope(shield=shield, deadline=deadline)

    def cancel(self) -> DeprecatedAwaitable:
        """Cancel this scope immediately."""
        raise NotImplementedError

    @property
    def deadline(self) -> float:
        """
        The time (clock value) when this scope is cancelled automatically.

        Will be ``float('inf')`` if no timeout has been set.

        """
        raise NotImplementedError

    @deadline.setter
    def deadline(self, value: float) -> None:
        # Placeholder setter for the deadline property.
        raise NotImplementedError

    @property
    def cancel_called(self) -> bool:
        """``True`` if :meth:`cancel` has been called."""
        raise NotImplementedError

    @property
    def shield(self) -> bool:
        """
        ``True`` if this scope is shielded from external cancellation.

        While a scope is shielded, it will not receive cancellations from outside.

        """
        raise NotImplementedError

    @shield.setter
    def shield(self, value: bool) -> None:
        # Placeholder setter for the shield property.
        raise NotImplementedError

    def __enter__(self) -> "CancelScope":
        """Placeholder for entering the scope; always raises here."""
        raise NotImplementedError

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        """Placeholder for leaving the scope; always raises here."""
        raise NotImplementedError

83 

84 

def open_cancel_scope(*, shield: bool = False) -> CancelScope:
    """
    Open a cancel scope.

    Issues a :exc:`DeprecationWarning` attributed to the caller's line, so that the
    deprecated call site can be located, then returns a new cancel scope obtained from
    the module returned by ``get_asynclib()``.

    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a cancel scope

    .. deprecated:: 3.0
       Use :class:`~CancelScope` directly.

    """
    warn(
        "open_cancel_scope() is deprecated -- use CancelScope() directly",
        DeprecationWarning,
        # Report the warning against the code that called this function rather than
        # against this module.
        stacklevel=2,
    )
    return get_asynclib().CancelScope(shield=shield)

101 

102 

class FailAfterContextManager(DeprecatedAsyncContextManager[CancelScope]):
    """
    Context manager wrapping a cancel scope.

    Entering and exiting are forwarded to the wrapped scope. On exit,
    :exc:`TimeoutError` is raised if the scope's ``cancel_called`` flag is set.
    """

    def __init__(self, cancel_scope: CancelScope):
        self._cancel_scope = cancel_scope

    def __enter__(self) -> CancelScope:
        """Enter the wrapped cancel scope and return whatever it returns."""
        scope = self._cancel_scope
        return scope.__enter__()

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        """
        Exit the wrapped cancel scope.

        :return: the value returned by the wrapped scope's ``__exit__``
        :raises TimeoutError: if ``cancel_called`` is true on the wrapped scope after it
            has been exited
        """
        scope = self._cancel_scope
        # The wrapped scope must be exited first; its flag is inspected only afterwards.
        result = scope.__exit__(exc_type, exc_val, exc_tb)
        if not scope.cancel_called:
            return result

        raise TimeoutError

121 

122 

def fail_after(delay: Optional[float], shield: bool = False) -> FailAfterContextManager:
    """
    Create a context manager which raises a :class:`TimeoutError` if does not finish in time.

    The deadline is the asynclib module's current time plus ``delay``, or ``math.inf`` when
    ``delay`` is ``None``.

    :param delay: maximum allowed time (in seconds) before raising the exception, or ``None`` to
        disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a context manager that yields a cancel scope
    :rtype: :class:`~typing.ContextManager`\\[:class:`~anyio.abc.CancelScope`\\]

    """
    if delay is None:
        deadline = math.inf
    else:
        deadline = get_asynclib().current_time() + delay

    scope = get_asynclib().CancelScope(deadline=deadline, shield=shield)
    return FailAfterContextManager(scope)

139 

140 

def move_on_after(delay: Optional[float], shield: bool = False) -> CancelScope:
    """
    Create a cancel scope with a deadline that expires after the given delay.

    The deadline is the asynclib module's current time plus ``delay``, or ``math.inf`` when
    ``delay`` is ``None``.

    :param delay: maximum allowed time (in seconds) before exiting the context block, or ``None``
        to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a cancel scope

    """
    if delay is None:
        deadline = math.inf
    else:
        deadline = get_asynclib().current_time() + delay

    return get_asynclib().CancelScope(deadline=deadline, shield=shield)

155 

156 

def current_effective_deadline() -> DeprecatedAwaitableFloat:
    """
    Return the nearest deadline among all the cancel scopes effective for the current task.

    The value is obtained from the asynclib module and wrapped in
    ``DeprecatedAwaitableFloat`` together with a reference to this function.

    :return: a clock value from the event loop's internal clock (``float('inf')`` if there is no
        deadline in effect)
    :rtype: float

    """
    nearest = get_asynclib().current_effective_deadline()
    return DeprecatedAwaitableFloat(nearest, current_effective_deadline)

169 

170 

def create_task_group() -> "TaskGroup":
    """
    Create a task group.

    The task group is instantiated from the ``TaskGroup`` class of the module returned by
    ``get_asynclib()``.

    :return: a task group

    """
    backend = get_asynclib()
    return backend.TaskGroup()