Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/lowlevel.py: 48%
86 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1from __future__ import annotations
3import enum
4import sys
5from dataclasses import dataclass
6from typing import Any, Generic, TypeVar, overload
7from weakref import WeakKeyDictionary
9from ._core._eventloop import get_asynclib
11if sys.version_info >= (3, 8):
12 from typing import Literal
13else:
14 from typing_extensions import Literal
16T = TypeVar("T")
17D = TypeVar("D")
20async def checkpoint() -> None:
21 """
22 Check for cancellation and allow the scheduler to switch to another task.
24 Equivalent to (but more efficient than)::
26 await checkpoint_if_cancelled()
27 await cancel_shielded_checkpoint()
30 .. versionadded:: 3.0
32 """
33 await get_asynclib().checkpoint()
36async def checkpoint_if_cancelled() -> None:
37 """
38 Enter a checkpoint if the enclosing cancel scope has been cancelled.
40 This does not allow the scheduler to switch to a different task.
42 .. versionadded:: 3.0
44 """
45 await get_asynclib().checkpoint_if_cancelled()
48async def cancel_shielded_checkpoint() -> None:
49 """
50 Allow the scheduler to switch to another task but without checking for cancellation.
52 Equivalent to (but potentially more efficient than)::
54 with CancelScope(shield=True):
55 await checkpoint()
58 .. versionadded:: 3.0
60 """
61 await get_asynclib().cancel_shielded_checkpoint()
64def current_token() -> object:
65 """Return a backend specific token object that can be used to get back to the event loop."""
66 return get_asynclib().current_token()
69_run_vars: WeakKeyDictionary[Any, dict[str, Any]] = WeakKeyDictionary()
70_token_wrappers: dict[Any, _TokenWrapper] = {}
73@dataclass(frozen=True)
74class _TokenWrapper:
75 __slots__ = "_token", "__weakref__"
76 _token: object
79class _NoValueSet(enum.Enum):
80 NO_VALUE_SET = enum.auto()
83class RunvarToken(Generic[T]):
84 __slots__ = "_var", "_value", "_redeemed"
86 def __init__(self, var: RunVar[T], value: T | Literal[_NoValueSet.NO_VALUE_SET]):
87 self._var = var
88 self._value: T | Literal[_NoValueSet.NO_VALUE_SET] = value
89 self._redeemed = False
92class RunVar(Generic[T]):
93 """
94 Like a :class:`~contextvars.ContextVar`, except scoped to the running event loop.
95 """
97 __slots__ = "_name", "_default"
99 NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET
101 _token_wrappers: set[_TokenWrapper] = set()
103 def __init__(
104 self,
105 name: str,
106 default: T | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET,
107 ):
108 self._name = name
109 self._default = default
111 @property
112 def _current_vars(self) -> dict[str, T]:
113 token = current_token()
114 while True:
115 try:
116 return _run_vars[token]
117 except TypeError:
118 # Happens when token isn't weak referable (TrioToken).
119 # This workaround does mean that some memory will leak on Trio until the problem
120 # is fixed on their end.
121 token = _TokenWrapper(token)
122 self._token_wrappers.add(token)
123 except KeyError:
124 run_vars = _run_vars[token] = {}
125 return run_vars
127 @overload
128 def get(self, default: D) -> T | D:
129 ...
131 @overload
132 def get(self) -> T:
133 ...
135 def get(
136 self, default: D | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
137 ) -> T | D:
138 try:
139 return self._current_vars[self._name]
140 except KeyError:
141 if default is not RunVar.NO_VALUE_SET:
142 return default
143 elif self._default is not RunVar.NO_VALUE_SET:
144 return self._default
146 raise LookupError(
147 f'Run variable "{self._name}" has no value and no default set'
148 )
150 def set(self, value: T) -> RunvarToken[T]:
151 current_vars = self._current_vars
152 token = RunvarToken(self, current_vars.get(self._name, RunVar.NO_VALUE_SET))
153 current_vars[self._name] = value
154 return token
156 def reset(self, token: RunvarToken[T]) -> None:
157 if token._var is not self:
158 raise ValueError("This token does not belong to this RunVar")
160 if token._redeemed:
161 raise ValueError("This token has already been used")
163 if token._value is _NoValueSet.NO_VALUE_SET:
164 try:
165 del self._current_vars[self._name]
166 except KeyError:
167 pass
168 else:
169 self._current_vars[self._name] = token._value
171 token._redeemed = True
173 def __repr__(self) -> str:
174 return f"<RunVar name={self._name!r}>"