Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_tasks.py: 60%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

53 statements  

1from __future__ import annotations 

2 

3import math 

4from collections.abc import Generator 

5from contextlib import contextmanager 

6from types import TracebackType 

7 

8from ..abc._tasks import TaskGroup, TaskStatus 

9from ._eventloop import get_async_backend 

10 

11 

class _IgnoredTaskStatus(TaskStatus[object]):
    """Task status whose :meth:`started` call does nothing."""

    def started(self, value: object = None) -> None:
        """Accept *value* and discard it."""
        return None


# Module-level instance of the no-op task status defined above.
TASK_STATUS_IGNORED = _IgnoredTaskStatus()

18 

19 

class CancelScope:
    """
    A block of work that can be cancelled independently of its surroundings.

    Instantiating this class does not build an object here: construction is handed
    to the active async backend, and every other member defined on this class
    raises :exc:`NotImplementedError`.

    :param deadline: The time (clock value) when this scope is cancelled automatically
    :param shield: ``True`` to shield the cancel scope from external cancellation
    """

    def __new__(
        cls,
        *,
        deadline: float = math.inf,
        shield: bool = False,
    ) -> CancelScope:
        # Whatever the backend's factory returns is what the caller receives.
        backend = get_async_backend()
        return backend.create_cancel_scope(shield=shield, deadline=deadline)

    def cancel(self) -> None:
        """Cancel this scope right away."""
        raise NotImplementedError()

    @property
    def deadline(self) -> float:
        """
        The clock value at which this scope gets cancelled automatically.

        Equals ``float('inf')`` when no timeout has been set.

        """
        raise NotImplementedError()

    @deadline.setter
    def deadline(self, value: float) -> None:
        raise NotImplementedError()

    @property
    def cancel_called(self) -> bool:
        """``True`` once :meth:`cancel` has been called."""
        raise NotImplementedError()

    @property
    def cancelled_caught(self) -> bool:
        """
        ``True`` if this scope suppressed a cancellation exception it raised itself.

        Useful for finding out whether any work was interrupted, or whether the scope
        was cancelled because its deadline was reached. The value is only ``True``
        when the cancellation originated from this scope, not from an outer one.

        """
        raise NotImplementedError()

    @property
    def shield(self) -> bool:
        """
        ``True`` if this scope is shielded from external cancellation.

        A shielded scope does not receive cancellations coming from outside of it.

        """
        raise NotImplementedError()

    @shield.setter
    def shield(self, value: bool) -> None:
        raise NotImplementedError()

    def __enter__(self) -> CancelScope:
        raise NotImplementedError()

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        raise NotImplementedError()

93 

94 

@contextmanager
def fail_after(
    delay: float | None, shield: bool = False
) -> Generator[CancelScope, None, None]:
    """
    Create a context manager which raises a :class:`TimeoutError` if the enclosed
    block does not finish in time.

    :param delay: maximum allowed time (in seconds) before raising the exception, or
        ``None`` to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a context manager that yields a cancel scope
    :rtype: :class:`~typing.ContextManager`\\[:class:`~anyio.CancelScope`\\]

    """
    now = get_async_backend().current_time
    if delay is None:
        expires_at = math.inf
    else:
        expires_at = now() + delay

    scope_cm = get_async_backend().create_cancel_scope(
        deadline=expires_at, shield=shield
    )
    with scope_cm as scope:
        yield scope

    # Evaluated only after the scope has exited: report a timeout when the scope
    # caught its own cancellation and the backend clock has reached its deadline.
    timed_out = scope.cancelled_caught and now() >= scope.deadline
    if timed_out:
        raise TimeoutError

119 

120 

def move_on_after(delay: float | None, shield: bool = False) -> CancelScope:
    """
    Build a cancel scope whose deadline lies ``delay`` seconds after the backend's
    current time.

    :param delay: maximum allowed time (in seconds) before exiting the context block, or
        ``None`` to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a cancel scope

    """
    if delay is None:
        # No timeout requested: use an infinite deadline.
        deadline = math.inf
    else:
        deadline = get_async_backend().current_time() + delay

    return get_async_backend().create_cancel_scope(deadline=deadline, shield=shield)

135 

136 

def current_effective_deadline() -> float:
    """
    Report the nearest deadline among all cancel scopes that apply to the current
    task, as answered by the active async backend.

    :return: a clock value from the event loop's internal clock (or ``float('inf')`` if
        there is no deadline in effect, or ``float('-inf')`` if the current scope has
        been cancelled)
    :rtype: float

    """
    backend = get_async_backend()
    return backend.current_effective_deadline()

149 

150 

def create_task_group() -> TaskGroup:
    """
    Ask the active async backend for a new task group.

    :return: a task group

    """
    backend = get_async_backend()
    return backend.create_task_group()