Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/_core/_tasks.py: 60%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

53 statements  

1from __future__ import annotations 

2 

3import math 

4from collections.abc import Generator 

5from contextlib import contextmanager 

6from types import TracebackType 

7 

8from ..abc._tasks import TaskGroup, TaskStatus 

9from ._eventloop import get_async_backend 

10 

11 

class _IgnoredTaskStatus(TaskStatus[object]):
    """A :class:`TaskStatus` implementation whose ``started()`` is a no-op."""

    def started(self, value: object = None) -> None:
        """Accept *value* and discard it without doing anything."""
        return None


# Single module-level instance of the no-op task status.
TASK_STATUS_IGNORED = _IgnoredTaskStatus()

18 

19 

class CancelScope:
    """
    A block of work that can be cancelled independently of its surroundings.

    Instantiating this class does not produce an instance of this exact class body;
    ``__new__`` hands construction over to the currently running async backend. Every
    other member defined here raises :exc:`NotImplementedError`.

    :param deadline: The time (clock value) when this scope is cancelled automatically
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread
    """

    def __new__(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        # Construction is delegated: the backend decides which concrete object to
        # return for the given deadline/shield pair.
        backend = get_async_backend()
        return backend.create_cancel_scope(deadline=deadline, shield=shield)

    def cancel(self, reason: str | None = None) -> None:
        """
        Request immediate cancellation of this scope.

        :param reason: a message describing the reason for the cancellation

        """
        raise NotImplementedError

    @property
    def deadline(self) -> float:
        """
        Clock value at which this scope gets cancelled automatically.

        The value is ``float('inf')`` when no timeout has been set.

        """
        raise NotImplementedError

    @deadline.setter
    def deadline(self, value: float) -> None:
        # Stub: not implemented in this class body.
        raise NotImplementedError

    @property
    def cancel_called(self) -> bool:
        """``True`` once :meth:`cancel` has been called."""
        raise NotImplementedError

    @property
    def cancelled_caught(self) -> bool:
        """
        ``True`` if this scope suppressed a cancellation exception it itself raised.

        Useful for finding out whether any work was interrupted, or whether the scope
        was cancelled because its deadline was reached. It is only ``True`` when the
        cancellation originated from this scope, not from an outer one.

        """
        raise NotImplementedError

    @property
    def shield(self) -> bool:
        """
        ``True`` if this scope is shielded from external cancellation.

        A shielded scope does not receive cancellations from outside.

        """
        raise NotImplementedError

    @shield.setter
    def shield(self, value: bool) -> None:
        # Stub: not implemented in this class body.
        raise NotImplementedError

    def __enter__(self) -> CancelScope:
        # Stub: not implemented in this class body.
        raise NotImplementedError

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        # Stub: not implemented in this class body.
        raise NotImplementedError

100 

101 

@contextmanager
def fail_after(
    delay: float | None, shield: bool = False
) -> Generator[CancelScope, None, None]:
    """
    Create a context manager which raises a :class:`TimeoutError` if the enclosed
    block does not finish in time.

    :param delay: maximum allowed time (in seconds) before raising the exception, or
        ``None`` to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a context manager that yields a cancel scope
    :rtype: :class:`~typing.ContextManager`\\[:class:`~anyio.CancelScope`\\]
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()
    now = backend.current_time

    # No delay means the scope never expires on its own.
    if delay is None:
        expires_at = math.inf
    else:
        expires_at = now() + delay

    with backend.create_cancel_scope(deadline=expires_at, shield=shield) as scope:
        yield scope

    # Only report a timeout when the scope swallowed its own cancellation AND the
    # scope's deadline (as it stands at exit time) has actually passed.
    timed_out = scope.cancelled_caught and now() >= scope.deadline
    if timed_out:
        raise TimeoutError

128 

129 

def move_on_after(delay: float | None, shield: bool = False) -> CancelScope:
    """
    Build a cancel scope whose deadline lies ``delay`` seconds in the future.

    :param delay: maximum allowed time (in seconds) before exiting the context block, or
        ``None`` to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a cancel scope
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()

    # No delay means an infinite deadline; otherwise offset from the backend's clock.
    if delay is None:
        expires_at = math.inf
    else:
        expires_at = backend.current_time() + delay

    return backend.create_cancel_scope(deadline=expires_at, shield=shield)

146 

147 

def current_effective_deadline() -> float:
    """
    Return the nearest deadline among all the cancel scopes effective for the current
    task.

    The lookup is delegated entirely to the running async backend.

    :return: a clock value from the event loop's internal clock (or ``float('inf')`` if
        there is no deadline in effect, or ``float('-inf')`` if the current scope has
        been cancelled)
    :rtype: float
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()
    return backend.current_effective_deadline()

162 

163 

def create_task_group() -> TaskGroup:
    """
    Create a task group using the running async backend.

    :return: a task group
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()
    return backend.create_task_group()