Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/lowlevel.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import enum
4from dataclasses import dataclass
5from typing import Any, Generic, Literal, TypeVar, final, overload
6from weakref import WeakKeyDictionary
8from ._core._eventloop import get_async_backend
9from .abc import AsyncBackend
# Type of the value held by a RunVar (and remembered by a RunvarToken).
T = TypeVar("T")
# Type of the fallback value a caller may pass to RunVar.get().
D = TypeVar("D")
async def checkpoint() -> None:
    """
    Check whether the task has been cancelled, then give the scheduler a chance to
    run another task.

    This does the same as the following pair of calls, only more efficiently::

        await checkpoint_if_cancelled()
        await cancel_shielded_checkpoint()

    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    backend = get_async_backend()
    await backend.checkpoint()
async def checkpoint_if_cancelled() -> None:
    """
    Enter a checkpoint only when the enclosing cancel scope has been cancelled.

    The scheduler is not given the opportunity to switch to a different task.

    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    backend = get_async_backend()
    await backend.checkpoint_if_cancelled()
async def cancel_shielded_checkpoint() -> None:
    """
    Let the scheduler switch to another task without checking for cancellation.

    This does the same as the following, only potentially more efficiently::

        with CancelScope(shield=True):
            await checkpoint()

    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    backend = get_async_backend()
    await backend.cancel_shielded_checkpoint()
@final
@dataclass(repr=False, frozen=True)
class EventLoopToken:
    """
    Opaque, immutable handle that keeps a reference to an event loop.

    Instances are produced by :func:`current_token`; the fields are not meant to be
    interpreted by callers.

    .. versionadded:: 4.11.0
    """

    # Backend class that produced ``native_token``.
    backend_class: type[AsyncBackend]
    # Backend-specific token object identifying the event loop.
    native_token: object
def current_token() -> EventLoopToken:
    """
    Build a token for the current event loop.

    The returned object can be used to call code in this event loop from another
    thread.

    .. versionadded:: 4.11.0

    """
    backend = get_async_backend()
    # Pair the backend with its own native token so the loop can be found later.
    return EventLoopToken(backend, backend.current_token())
# Storage for RunVar values: maps an event loop's native token to a dict of
# {RunVar: value} for that loop. Keys are held weakly (WeakKeyDictionary), so an
# entry does not keep its token alive.
_run_vars: WeakKeyDictionary[object, dict[RunVar[Any], Any]] = WeakKeyDictionary()
class _NoValueSet(enum.Enum):
    """Single-member enum whose member serves as the "no value set" sentinel."""

    # Used as a Literal type and compared by identity in RunVar / RunvarToken.
    NO_VALUE_SET = enum.auto()
class RunvarToken(Generic[T]):
    """
    Remembers what a :class:`RunVar` held before a ``set()`` call.

    Pass the token to :meth:`RunVar.reset` to restore that earlier state. A token
    can be used only once.
    """

    __slots__ = ("_var", "_value", "_redeemed")

    def __init__(self, var: RunVar[T], value: T | Literal[_NoValueSet.NO_VALUE_SET]):
        # A fresh token has not been passed to RunVar.reset() yet.
        self._redeemed = False
        # Previous value, or the sentinel if the variable had no value.
        self._value: T | Literal[_NoValueSet.NO_VALUE_SET] = value
        # The variable this token belongs to.
        self._var = var
class RunVar(Generic[T]):
    """
    A variable whose value is scoped to the running event loop, otherwise similar to
    a :class:`~contextvars.ContextVar`.
    """

    __slots__ = ("_name", "_default")

    # Sentinel meaning "no value / no default was provided".
    NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET

    def __init__(
        self, name: str, default: T | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ):
        """
        :param name: name of the variable, used in ``repr()`` and error messages
        :param default: value returned by :meth:`get` when nothing has been set

        """
        self._name = name
        self._default = default

    @property
    def _current_vars(self) -> dict[RunVar[T], T]:
        """Return the value mapping of the current event loop, creating it if needed."""
        loop_key = current_token().native_token
        loop_vars = _run_vars.get(loop_key)
        if loop_vars is None:
            # First access from this event loop: register an empty mapping.
            loop_vars = _run_vars[loop_key] = {}

        return loop_vars

    @overload
    def get(self, default: D) -> T | D: ...

    @overload
    def get(self) -> T: ...

    def get(
        self, default: D | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ) -> T | D:
        """
        Return the variable's value in the current event loop.

        When no value has been set, fall back to ``default`` if given, then to the
        default passed to the constructor.

        :raises LookupError: if there is no value and no default of either kind

        """
        try:
            return self._current_vars[self]
        except KeyError:
            pass

        # The call-site default takes precedence over the constructor default.
        if default is not RunVar.NO_VALUE_SET:
            return default

        if self._default is not RunVar.NO_VALUE_SET:
            return self._default

        raise LookupError(
            f'Run variable "{self._name}" has no value and no default set'
        )

    def set(self, value: T) -> RunvarToken[T]:
        """
        Set the variable's value in the current event loop.

        :return: a token that can be passed to :meth:`reset` to undo this call

        """
        loop_vars = self._current_vars
        previous = loop_vars.get(self, RunVar.NO_VALUE_SET)
        token = RunvarToken(self, previous)
        loop_vars[self] = value
        return token

    def reset(self, token: RunvarToken[T]) -> None:
        """
        Restore the state the variable had before the ``set()`` call that produced
        ``token``.

        :raises ValueError: if the token belongs to another variable or has already
            been used

        """
        if token._var is not self:
            raise ValueError("This token does not belong to this RunVar")

        if token._redeemed:
            raise ValueError("This token has already been used")

        previous = token._value
        if previous is not _NoValueSet.NO_VALUE_SET:
            self._current_vars[self] = previous
        else:
            # There was no value before set(); remove the entry if it still exists.
            try:
                del self._current_vars[self]
            except KeyError:
                pass

        token._redeemed = True

    def __repr__(self) -> str:
        return f"<RunVar name={self._name!r}>"