1from __future__ import annotations
2
3import math
4from collections.abc import Generator
5from contextlib import contextmanager
6from types import TracebackType
7
8from ..abc._tasks import TaskGroup, TaskStatus
9from ._eventloop import get_async_backend
10
11
class _IgnoredTaskStatus(TaskStatus[object]):
    """A :class:`TaskStatus` implementation whose :meth:`started` does nothing."""

    def started(self, value: object = None) -> None:
        """
        Accept the "task started" notification and discard it.

        :param value: ignored

        """
        return None
15
16
# Shared module-level instance of the no-op task status class defined above.
# NOTE(review): presumably intended as a default ``task_status`` argument for
# callables that may be run without anyone waiting on ``started()`` -- confirm
# against callers.
TASK_STATUS_IGNORED = _IgnoredTaskStatus()
18
19
class CancelScope:
    """
    A block of work that can be cancelled independently of its surroundings.

    Constructing this class does not run any logic of its own: ``__new__`` hands the
    arguments to the active async backend's ``create_cancel_scope()`` and returns
    whatever that call produces. Every other member defined here is a placeholder
    that raises :exc:`NotImplementedError`.

    :param deadline: The time (clock value) when this scope is cancelled automatically
    :param shield: ``True`` to shield the cancel scope from external cancellation
    """

    def __new__(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        # Delegate construction entirely to the currently running backend.
        backend = get_async_backend()
        return backend.create_cancel_scope(deadline=deadline, shield=shield)

    def cancel(self, reason: str | None = None) -> None:
        """
        Cancel this scope right away.

        :param reason: a message describing the reason for the cancellation

        """
        raise NotImplementedError()

    @property
    def deadline(self) -> float:
        """
        The clock value at which this scope gets cancelled automatically.

        Equals ``float('inf')`` when no timeout has been set.

        """
        raise NotImplementedError()

    @deadline.setter
    def deadline(self, value: float) -> None:
        raise NotImplementedError()

    @property
    def cancel_called(self) -> bool:
        """``True`` once :meth:`cancel` has been called."""
        raise NotImplementedError()

    @property
    def cancelled_caught(self) -> bool:
        """
        ``True`` if this scope swallowed a cancellation exception that it raised itself.

        Typically checked to find out whether any work was interrupted, or whether the
        scope was cancelled because its deadline was reached. The value is only
        ``True`` when the cancellation originated from this scope and not from an
        outer one.

        """
        raise NotImplementedError()

    @property
    def shield(self) -> bool:
        """
        ``True`` if this scope is shielded from external cancellation.

        A shielded scope does not receive cancellations coming from outside of it.

        """
        raise NotImplementedError()

    @shield.setter
    def shield(self, value: bool) -> None:
        raise NotImplementedError()

    def __enter__(self) -> CancelScope:
        """Enter the scope (context manager protocol)."""
        raise NotImplementedError()

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """
        Leave the scope (context manager protocol).

        :param exc_type: class of the exception leaving the block, if any
        :param exc_val: the exception instance leaving the block, if any
        :param exc_tb: traceback of that exception, if any
        :return: per the context manager protocol, a true value suppresses the
            exception

        """
        raise NotImplementedError()
98
99
@contextmanager
def fail_after(
    delay: float | None, shield: bool = False
) -> Generator[CancelScope, None, None]:
    """
    Create a context manager which raises a :class:`TimeoutError` if the enclosed
    block does not finish in time.

    :param delay: maximum allowed time (in seconds) before raising the exception, or
        ``None`` to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a context manager that yields a cancel scope
    :rtype: :class:`~typing.ContextManager`\\[:class:`~anyio.CancelScope`\\]

    """
    clock = get_async_backend().current_time

    # Turn the relative delay into an absolute clock value; no delay, no deadline.
    if delay is None:
        expires_at = math.inf
    else:
        expires_at = clock() + delay

    scope_cm = get_async_backend().create_cancel_scope(
        shield=shield, deadline=expires_at
    )
    with scope_cm as scope:
        yield scope

    # Only reached when no exception propagated out of the ``with`` block. The
    # deadline is re-read from the scope because it is a settable property.
    timed_out = scope.cancelled_caught and clock() >= scope.deadline
    if timed_out:
        raise TimeoutError
124
125
def move_on_after(delay: float | None, shield: bool = False) -> CancelScope:
    """
    Build a cancel scope whose deadline lies ``delay`` seconds in the future.

    :param delay: maximum allowed time (in seconds) before exiting the context block, or
        ``None`` to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a cancel scope

    """
    if delay is None:
        # No timeout requested: the scope never expires on its own.
        expires_at = math.inf
    else:
        expires_at = get_async_backend().current_time() + delay

    return get_async_backend().create_cancel_scope(shield=shield, deadline=expires_at)
140
141
def current_effective_deadline() -> float:
    """
    Return the nearest deadline among all the cancel scopes that are in effect for
    the current task.

    The value is obtained from the active async backend.

    :return: a clock value from the event loop's internal clock (or ``float('inf')`` if
        there is no deadline in effect, or ``float('-inf')`` if the current scope has
        been cancelled)
    :rtype: float

    """
    backend = get_async_backend()
    return backend.current_effective_deadline()
154
155
def create_task_group() -> TaskGroup:
    """
    Create a new task group using the active async backend.

    :return: a task group

    """
    backend = get_async_backend()
    return backend.create_task_group()