Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/lowlevel.py: 49%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3__all__ = (
4 "EventLoopToken",
5 "RunvarToken",
6 "RunVar",
7 "checkpoint",
8 "checkpoint_if_cancelled",
9 "cancel_shielded_checkpoint",
10 "current_token",
11)
13import enum
14from dataclasses import dataclass
15from types import TracebackType
16from typing import Any, Generic, Literal, TypeVar, final, overload
17from weakref import WeakKeyDictionary
19from ._core._eventloop import get_async_backend
20from .abc import AsyncBackend
22T = TypeVar("T")
23D = TypeVar("D")
async def checkpoint() -> None:
    """
    Check for cancellation and give the scheduler a chance to run another task.

    This has the same effect as the following two calls, but is more efficient::

        await checkpoint_if_cancelled()
        await cancel_shielded_checkpoint()

    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend get_async_backend() resolves to.
    backend = get_async_backend()
    await backend.checkpoint()
async def checkpoint_if_cancelled() -> None:
    """
    Enter a checkpoint only if the enclosing cancel scope has been cancelled.

    The scheduler is not allowed to switch to a different task here.

    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend get_async_backend() resolves to.
    backend = get_async_backend()
    await backend.checkpoint_if_cancelled()
async def cancel_shielded_checkpoint() -> None:
    """
    Let the scheduler switch to another task without checking for cancellation.

    This has the same effect as the following, but is potentially more
    efficient::

        with CancelScope(shield=True):
            await checkpoint()

    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend get_async_backend() resolves to.
    backend = get_async_backend()
    await backend.cancel_shielded_checkpoint()
@final
@dataclass(frozen=True, repr=False)
class EventLoopToken:
    """
    Opaque handle that keeps a reference to an event loop.

    Instances are immutable (``frozen=True``) and keep the default object
    ``repr`` (``repr=False``).

    .. versionadded:: 4.11.0
    """

    # The backend class this token was created with.
    backend_class: type[AsyncBackend]
    # The token object obtained from the backend; opaque to this module.
    native_token: object
def current_token() -> EventLoopToken:
    """
    Build a token that can be used to call code in the current event loop from
    another thread.

    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    .. versionadded:: 4.11.0

    """
    backend = get_async_backend()
    # Pair the backend with the token it reports for the current event loop.
    return EventLoopToken(
        backend_class=backend,
        native_token=backend.current_token(),
    )
97_run_vars: WeakKeyDictionary[object, dict[RunVar[Any], Any]] = WeakKeyDictionary()
100class _NoValueSet(enum.Enum):
101 NO_VALUE_SET = enum.auto()
class RunvarToken(Generic[T]):
    """
    Records the value a :class:`RunVar` held before a ``set()`` call.

    The token is also a context manager: leaving the ``with`` block passes the
    token to the ``reset()`` method of the variable it was created for.
    """

    __slots__ = "_var", "_value", "_redeemed"

    def __init__(self, var: RunVar[T], value: T | Literal[_NoValueSet.NO_VALUE_SET]):
        """
        :param var: the run variable this token was created for
        :param value: the value to restore on reset, or the ``NO_VALUE_SET``
            sentinel if the variable had no value
        """
        # A token starts out unused; RunVar.reset() flips this flag.
        self._redeemed = False
        self._value: T | Literal[_NoValueSet.NO_VALUE_SET] = value
        self._var = var

    def __enter__(self) -> RunvarToken[T]:
        """Return the token itself."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Reset the owning variable using this token; exceptions are not suppressed."""
        owner = self._var
        owner.reset(self)
class RunVar(Generic[T]):
    """
    Like a :class:`~contextvars.ContextVar`, except scoped to the running event loop.

    Values are stored in the module-level ``_run_vars`` mapping, keyed by the
    native token of the current event loop. The token returned by :meth:`set`
    can be used as a context manager that resets the variable to its previous
    value when the context block is exited.
    """

    __slots__ = "_name", "_default"

    NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET

    def __init__(
        self, name: str, default: T | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ):
        """
        :param name: name of the variable, used in ``repr()`` and error messages
        :param default: value returned by :meth:`get` when nothing has been set
            and the caller supplied no default of its own
        """
        self._default = default
        self._name = name

    @property
    def _current_vars(self) -> dict[RunVar[T], T]:
        """Return the variable mapping of the current event loop, creating it if needed."""
        loop_key = current_token().native_token
        # setdefault returns the existing mapping or stores and returns a new one.
        return _run_vars.setdefault(loop_key, {})

    @overload
    def get(self, default: D) -> T | D: ...

    @overload
    def get(self) -> T: ...

    def get(
        self, default: D | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ) -> T | D:
        """
        Return the value of this variable in the current event loop.

        If no value is set, fall back to ``default`` if given, and otherwise to
        the default passed to the constructor.

        :raises LookupError: if there is no value and neither default is available
        """
        try:
            return self._current_vars[self]
        except KeyError:
            # No value stored for this event loop: try the fallbacks below.
            pass

        # The call-site default takes precedence over the constructor default.
        if default is not RunVar.NO_VALUE_SET:
            return default

        if self._default is not RunVar.NO_VALUE_SET:
            return self._default

        raise LookupError(
            f'Run variable "{self._name}" has no value and no default set'
        )

    def set(self, value: T) -> RunvarToken[T]:
        """
        Set the variable for the current event loop.

        :return: a token that restores the previous state when passed to :meth:`reset`
        """
        loop_vars = self._current_vars
        # Capture the previous value (or the sentinel) before overwriting it.
        previous = loop_vars.get(self, RunVar.NO_VALUE_SET)
        restore_token = RunvarToken(self, previous)
        loop_vars[self] = value
        return restore_token

    def reset(self, token: RunvarToken[T]) -> None:
        """
        Restore the state recorded in ``token``.

        :raises ValueError: if the token was created by another variable or has
            already been used
        """
        if token._var is not self:
            raise ValueError("This token does not belong to this RunVar")

        if token._redeemed:
            raise ValueError("This token has already been used")

        previous = token._value
        loop_vars = self._current_vars
        if previous is _NoValueSet.NO_VALUE_SET:
            # The variable was unset before; removing a missing entry is a no-op.
            loop_vars.pop(self, None)
        else:
            loop_vars[self] = previous

        token._redeemed = True

    def __repr__(self) -> str:
        """Return a short description containing the variable's name."""
        return f"<RunVar name={self._name!r}>"