1from __future__ import annotations
2
3import math
4from collections.abc import Generator
5from contextlib import contextmanager
6from types import TracebackType
7
8from ..abc._tasks import TaskGroup, TaskStatus
9from ._eventloop import get_async_backend
10
11
class _IgnoredTaskStatus(TaskStatus[object]):
    """Task status object whose :meth:`started` call has no effect."""

    def started(self, value: object = None) -> None:
        """
        Accept the start notification and discard it.

        :param value: ignored

        """
        return None
15
16
# Module-level instance of the no-op task status defined above.
TASK_STATUS_IGNORED: _IgnoredTaskStatus = _IgnoredTaskStatus()
18
19
class CancelScope:
    """
    A unit of work that can be cancelled independently of the code around it.

    Construction is delegated to the current async backend: ``__new__`` returns
    whatever the backend's ``create_cancel_scope()`` produces. Every other member
    defined here only declares the interface and raises :exc:`NotImplementedError`.

    :param deadline: clock value at which the scope gets cancelled automatically
        (defaults to ``float('inf')``, i.e. no deadline)
    :param shield: ``True`` to shield the scope from external cancellation
    """

    def __new__(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        # Instantiation is handed over to whichever backend is currently active.
        backend = get_async_backend()
        return backend.create_cancel_scope(deadline=deadline, shield=shield)

    def cancel(self) -> None:
        """Cancel this scope right away."""
        raise NotImplementedError

    @property
    def deadline(self) -> float:
        """
        The clock value at which this scope is automatically cancelled.

        Equals ``float('inf')`` when no timeout has been set.

        """
        raise NotImplementedError

    @deadline.setter
    def deadline(self, value: float) -> None:
        raise NotImplementedError

    @property
    def cancel_called(self) -> bool:
        """``True`` once :meth:`cancel` has been called."""
        raise NotImplementedError

    @property
    def cancelled_caught(self) -> bool:
        """
        ``True`` if this scope suppressed a cancellation exception that it raised
        itself.

        Useful for finding out whether any work was interrupted, or whether the
        scope was cancelled because its deadline was reached. The value is only
        ``True`` when the cancellation originated from this scope, not from an
        outer one.

        """
        raise NotImplementedError

    @property
    def shield(self) -> bool:
        """
        ``True`` if this scope is shielded from external cancellation.

        A shielded scope does not receive cancellations coming from outside.

        """
        raise NotImplementedError

    @shield.setter
    def shield(self, value: bool) -> None:
        raise NotImplementedError

    def __enter__(self) -> CancelScope:
        raise NotImplementedError

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        raise NotImplementedError
93
94
@contextmanager
def fail_after(
    delay: float | None, shield: bool = False
) -> Generator[CancelScope, None, None]:
    """
    Create a context manager which raises a :class:`TimeoutError` if the enclosed
    block does not finish in time.

    :param delay: maximum allowed time (in seconds) before raising the exception, or
        ``None`` to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a context manager that yields a cancel scope
    :rtype: :class:`~typing.ContextManager`\\[:class:`~anyio.CancelScope`\\]
    :raises TimeoutError: if the scope caught its own cancellation and the backend
        clock has reached the scope's deadline

    """
    clock = get_async_backend().current_time
    if delay is None:
        expires_at = math.inf
    else:
        expires_at = clock() + delay

    scope_cm = get_async_backend().create_cancel_scope(
        deadline=expires_at, shield=shield
    )
    with scope_cm as scope:
        yield scope

    # The scope's deadline is re-read here rather than reusing ``expires_at``, and
    # the clock is only consulted when the scope caught its own cancellation.
    timed_out = scope.cancelled_caught and clock() >= scope.deadline
    if timed_out:
        raise TimeoutError
119
120
def move_on_after(delay: float | None, shield: bool = False) -> CancelScope:
    """
    Create a cancel scope whose deadline expires after the given delay.

    :param delay: maximum allowed time (in seconds) before exiting the context block,
        or ``None`` to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a cancel scope

    """
    if delay is None:
        # No timeout requested: the scope never expires on its own.
        expires_at = math.inf
    else:
        expires_at = get_async_backend().current_time() + delay

    return get_async_backend().create_cancel_scope(
        deadline=expires_at, shield=shield
    )
135
136
def current_effective_deadline() -> float:
    """
    Return the nearest deadline among all the cancel scopes that are in effect for
    the current task.

    The value is obtained from the current async backend.

    :return: a clock value from the event loop's internal clock (or ``float('inf')``
        if there is no deadline in effect, or ``float('-inf')`` if the current scope
        has been cancelled)
    :rtype: float

    """
    backend = get_async_backend()
    return backend.current_effective_deadline()
149
150
def create_task_group() -> TaskGroup:
    """
    Create a task group using the current async backend.

    :return: a task group

    """
    backend = get_async_backend()
    return backend.create_task_group()