Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/lowlevel.py: 49%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
# Names exported by a star-import of this module.
__all__ = (
    "EventLoopToken",
    "RunvarToken",
    "RunVar",
    "checkpoint",
    "checkpoint_if_cancelled",
    "cancel_shielded_checkpoint",
    "current_token",
)
13import enum
14from dataclasses import dataclass
15from types import TracebackType
16from typing import Any, Generic, Literal, TypeVar, final, overload
17from weakref import WeakKeyDictionary
19from ._core._eventloop import get_async_backend
20from .abc import AsyncBackend
# T: type of the value held by a RunVar / remembered by a RunvarToken.
T = TypeVar("T")
# D: type of the fallback value a caller passes to RunVar.get().
D = TypeVar("D")
async def checkpoint() -> None:
    """
    Check for cancellation and give the scheduler a chance to run another task.

    The effect is the same as the following two calls, but this function is
    more efficient::

        await checkpoint_if_cancelled()
        await cancel_shielded_checkpoint()


    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    backend = get_async_backend()
    await backend.checkpoint()
async def checkpoint_if_cancelled() -> None:
    """
    Enter a checkpoint only when the enclosing cancel scope has been cancelled.

    The scheduler is not given the opportunity to switch to a different task.

    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    backend = get_async_backend()
    await backend.checkpoint_if_cancelled()
async def cancel_shielded_checkpoint() -> None:
    """
    Let the scheduler switch to another task without checking for cancellation.

    The effect is the same as the following, but this function is potentially
    more efficient::

        with CancelScope(shield=True):
            await checkpoint()


    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    backend = get_async_backend()
    await backend.cancel_shielded_checkpoint()
@final
@dataclass(frozen=True, repr=False)
class EventLoopToken:
    """
    Opaque, immutable handle that refers to an event loop.

    Instances are built by :func:`current_token`.

    .. versionadded:: 4.11.0
    """

    # The async backend class the token was obtained from.
    backend_class: type[AsyncBackend]
    # The backend's own token object (what ``backend_class.current_token()``
    # returned when the instance was built by ``current_token()``).
    native_token: object
def current_token() -> EventLoopToken:
    """
    Build a token for the current event loop.

    The returned object can be used to call code in the current event loop from
    another thread.

    .. versionadded:: 4.11.0

    """
    backend = get_async_backend()
    # Pair the backend with its own native token so both travel together.
    return EventLoopToken(backend, backend.current_token())
# Storage for RunVar values: maps the native token of an event loop (see
# ``current_token()``) to that loop's ``{RunVar: value}`` dict. The outer
# mapping holds its keys weakly, so it does not keep a native token alive.
_run_vars: WeakKeyDictionary[object, dict[RunVar[Any], Any]] = WeakKeyDictionary()
class _NoValueSet(enum.Enum):
    """Single-member enum whose member is the "no value set" sentinel."""

    NO_VALUE_SET = enum.auto()
class RunvarToken(Generic[T]):
    """
    Token remembering the value a :class:`RunVar` held before it was changed.

    It can be used as a context manager: leaving the ``with`` block passes this
    token to the ``reset()`` method of the variable it was created for.
    """

    __slots__ = ("_var", "_value", "_redeemed")

    def __init__(self, var: RunVar[T], value: T | Literal[_NoValueSet.NO_VALUE_SET]):
        """
        :param var: the run variable this token belongs to
        :param value: the value to restore, or the ``NO_VALUE_SET`` sentinel
        """
        # A token starts out unused.
        self._redeemed = False
        self._value: T | Literal[_NoValueSet.NO_VALUE_SET] = value
        self._var = var

    def __enter__(self) -> RunvarToken[T]:
        """Return the token itself."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Reset the owning variable with this token; exceptions propagate."""
        owner = self._var
        owner.reset(self)
class RunVar(Generic[T]):
    """
    Like a :class:`~contextvars.ContextVar`, except scoped to the running event loop.

    The token returned by :meth:`set` can be used as a context manager that will
    reset the variable to its previous value when the context block is exited.
    """

    __slots__ = "_name", "_default"

    # Sentinel meaning "no value"; distinct from ``None``, which is a valid value.
    NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET

    def __init__(
        self, name: str, default: T | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ):
        """
        :param name: name of the variable, used in ``repr()`` and error messages
        :param default: value returned by :meth:`get` when no value has been set
            and the caller passes no default of their own
        """
        self._name = name
        self._default = default

    @property
    def _current_vars(self) -> dict[RunVar[T], T]:
        """Return the value mapping of the current event loop, creating it if needed."""
        native_token = current_token().native_token
        try:
            return _run_vars[native_token]
        except KeyError:
            # First access from this event loop: start with an empty mapping.
            run_vars = _run_vars[native_token] = {}
            return run_vars

    @overload
    def get(self, default: D) -> T | D: ...

    @overload
    def get(self) -> T: ...

    def get(
        self, default: D | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ) -> T | D:
        """
        Return the value of this variable in the current event loop.

        :param default: value to return if the variable has no value; takes
            precedence over the default given to the constructor
        :return: the current value, else ``default``, else the constructor default
        :raises LookupError: if the variable has no value and no default applies
        """
        try:
            return self._current_vars[self]
        except KeyError:
            if default is not RunVar.NO_VALUE_SET:
                return default
            if self._default is not RunVar.NO_VALUE_SET:
                return self._default

            # The KeyError is an implementation detail of the lookup; suppress
            # it so the traceback shows only the LookupError.
            raise LookupError(
                f'Run variable "{self._name}" has no value and no default set'
            ) from None

    def set(self, value: T) -> RunvarToken[T]:
        """
        Set the value of this variable in the current event loop.

        :param value: the new value
        :return: a token that restores the previous state when passed to
            :meth:`reset` or used as a context manager
        """
        current_vars = self._current_vars
        # Remember the previous value (or the sentinel if there was none).
        token = RunvarToken(self, current_vars.get(self, RunVar.NO_VALUE_SET))
        current_vars[self] = value
        return token

    def reset(self, token: RunvarToken[T]) -> None:
        """
        Restore the state this variable had before the ``set()`` that made ``token``.

        :param token: a token previously returned by :meth:`set` on this variable
        :raises ValueError: if the token belongs to another variable or has
            already been used
        """
        if token._var is not self:
            raise ValueError("This token does not belong to this RunVar")

        if token._redeemed:
            raise ValueError("This token has already been used")

        current_vars = self._current_vars
        if token._value is RunVar.NO_VALUE_SET:
            # There was no value before; removing a missing entry is a no-op.
            current_vars.pop(self, None)
        else:
            current_vars[self] = token._value

        token._redeemed = True

    def __repr__(self) -> str:
        return f"<RunVar name={self._name!r}>"