Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_tasks.py: 58%

59 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1from __future__ import annotations 

2 

3import math 

4from types import TracebackType 

5from warnings import warn 

6 

7from ..abc._tasks import TaskGroup, TaskStatus 

8from ._compat import ( 

9 DeprecatedAsyncContextManager, 

10 DeprecatedAwaitable, 

11 DeprecatedAwaitableFloat, 

12) 

13from ._eventloop import get_asynclib 

14 

15 

16class _IgnoredTaskStatus(TaskStatus[object]): 

17 def started(self, value: object = None) -> None: 

18 pass 

19 

20 

21TASK_STATUS_IGNORED = _IgnoredTaskStatus() 

22 

23 

24class CancelScope(DeprecatedAsyncContextManager["CancelScope"]): 

25 """ 

26 Wraps a unit of work that can be made separately cancellable. 

27 

28 :param deadline: The time (clock value) when this scope is cancelled automatically 

29 :param shield: ``True`` to shield the cancel scope from external cancellation 

30 """ 

31 

32 def __new__( 

33 cls, *, deadline: float = math.inf, shield: bool = False 

34 ) -> CancelScope: 

35 return get_asynclib().CancelScope(shield=shield, deadline=deadline) 

36 

37 def cancel(self) -> DeprecatedAwaitable: 

38 """Cancel this scope immediately.""" 

39 raise NotImplementedError 

40 

41 @property 

42 def deadline(self) -> float: 

43 """ 

44 The time (clock value) when this scope is cancelled automatically. 

45 

46 Will be ``float('inf')`` if no timeout has been set. 

47 

48 """ 

49 raise NotImplementedError 

50 

51 @deadline.setter 

52 def deadline(self, value: float) -> None: 

53 raise NotImplementedError 

54 

55 @property 

56 def cancel_called(self) -> bool: 

57 """``True`` if :meth:`cancel` has been called.""" 

58 raise NotImplementedError 

59 

60 @property 

61 def shield(self) -> bool: 

62 """ 

63 ``True`` if this scope is shielded from external cancellation. 

64 

65 While a scope is shielded, it will not receive cancellations from outside. 

66 

67 """ 

68 raise NotImplementedError 

69 

70 @shield.setter 

71 def shield(self, value: bool) -> None: 

72 raise NotImplementedError 

73 

74 def __enter__(self) -> CancelScope: 

75 raise NotImplementedError 

76 

77 def __exit__( 

78 self, 

79 exc_type: type[BaseException] | None, 

80 exc_val: BaseException | None, 

81 exc_tb: TracebackType | None, 

82 ) -> bool | None: 

83 raise NotImplementedError 

84 

85 

86def open_cancel_scope(*, shield: bool = False) -> CancelScope: 

87 """ 

88 Open a cancel scope. 

89 

90 :param shield: ``True`` to shield the cancel scope from external cancellation 

91 :return: a cancel scope 

92 

93 .. deprecated:: 3.0 

94 Use :class:`~CancelScope` directly. 

95 

96 """ 

97 warn( 

98 "open_cancel_scope() is deprecated -- use CancelScope() directly", 

99 DeprecationWarning, 

100 ) 

101 return get_asynclib().CancelScope(shield=shield) 

102 

103 

104class FailAfterContextManager(DeprecatedAsyncContextManager[CancelScope]): 

105 def __init__(self, cancel_scope: CancelScope): 

106 self._cancel_scope = cancel_scope 

107 

108 def __enter__(self) -> CancelScope: 

109 return self._cancel_scope.__enter__() 

110 

111 def __exit__( 

112 self, 

113 exc_type: type[BaseException] | None, 

114 exc_val: BaseException | None, 

115 exc_tb: TracebackType | None, 

116 ) -> bool | None: 

117 retval = self._cancel_scope.__exit__(exc_type, exc_val, exc_tb) 

118 if self._cancel_scope.cancel_called: 

119 raise TimeoutError 

120 

121 return retval 

122 

123 

124def fail_after(delay: float | None, shield: bool = False) -> FailAfterContextManager: 

125 """ 

126 Create a context manager which raises a :class:`TimeoutError` if does not finish in time. 

127 

128 :param delay: maximum allowed time (in seconds) before raising the exception, or ``None`` to 

129 disable the timeout 

130 :param shield: ``True`` to shield the cancel scope from external cancellation 

131 :return: a context manager that yields a cancel scope 

132 :rtype: :class:`~typing.ContextManager`\\[:class:`~anyio.abc.CancelScope`\\] 

133 

134 """ 

135 deadline = ( 

136 (get_asynclib().current_time() + delay) if delay is not None else math.inf 

137 ) 

138 cancel_scope = get_asynclib().CancelScope(deadline=deadline, shield=shield) 

139 return FailAfterContextManager(cancel_scope) 

140 

141 

142def move_on_after(delay: float | None, shield: bool = False) -> CancelScope: 

143 """ 

144 Create a cancel scope with a deadline that expires after the given delay. 

145 

146 :param delay: maximum allowed time (in seconds) before exiting the context block, or ``None`` 

147 to disable the timeout 

148 :param shield: ``True`` to shield the cancel scope from external cancellation 

149 :return: a cancel scope 

150 

151 """ 

152 deadline = ( 

153 (get_asynclib().current_time() + delay) if delay is not None else math.inf 

154 ) 

155 return get_asynclib().CancelScope(deadline=deadline, shield=shield) 

156 

157 

158def current_effective_deadline() -> DeprecatedAwaitableFloat: 

159 """ 

160 Return the nearest deadline among all the cancel scopes effective for the current task. 

161 

162 :return: a clock value from the event loop's internal clock (or ``float('inf')`` if 

163 there is no deadline in effect, or ``float('-inf')`` if the current scope has 

164 been cancelled) 

165 :rtype: float 

166 

167 """ 

168 return DeprecatedAwaitableFloat( 

169 get_asynclib().current_effective_deadline(), current_effective_deadline 

170 ) 

171 

172 

173def create_task_group() -> TaskGroup: 

174 """ 

175 Create a task group. 

176 

177 :return: a task group 

178 

179 """ 

180 return get_asynclib().TaskGroup()