Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/lowlevel.py: 47%
85 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import enum
2import sys
3from dataclasses import dataclass
4from typing import Any, Dict, Generic, Set, TypeVar, Union, overload
5from weakref import WeakKeyDictionary
7from ._core._eventloop import get_asynclib
9if sys.version_info >= (3, 8):
10 from typing import Literal
11else:
12 from typing_extensions import Literal
14T = TypeVar("T")
15D = TypeVar("D")
18async def checkpoint() -> None:
19 """
20 Check for cancellation and allow the scheduler to switch to another task.
22 Equivalent to (but more efficient than)::
24 await checkpoint_if_cancelled()
25 await cancel_shielded_checkpoint()
27 .. versionadded:: 3.0
29 """
30 await get_asynclib().checkpoint()
33async def checkpoint_if_cancelled() -> None:
34 """
35 Enter a checkpoint if the enclosing cancel scope has been cancelled.
37 This does not allow the scheduler to switch to a different task.
39 .. versionadded:: 3.0
41 """
42 await get_asynclib().checkpoint_if_cancelled()
45async def cancel_shielded_checkpoint() -> None:
46 """
47 Allow the scheduler to switch to another task but without checking for cancellation.
49 Equivalent to (but potentially more efficient than)::
51 with CancelScope(shield=True):
52 await checkpoint()
54 .. versionadded:: 3.0
56 """
57 await get_asynclib().cancel_shielded_checkpoint()
60def current_token() -> object:
61 """Return a backend specific token object that can be used to get back to the event loop."""
62 return get_asynclib().current_token()
65_run_vars = WeakKeyDictionary() # type: WeakKeyDictionary[Any, Dict[str, Any]]
66_token_wrappers: Dict[Any, "_TokenWrapper"] = {}
69@dataclass(frozen=True)
70class _TokenWrapper:
71 __slots__ = "_token", "__weakref__"
72 _token: object
75class _NoValueSet(enum.Enum):
76 NO_VALUE_SET = enum.auto()
79class RunvarToken(Generic[T]):
80 __slots__ = "_var", "_value", "_redeemed"
82 def __init__(
83 self, var: "RunVar[T]", value: Union[T, Literal[_NoValueSet.NO_VALUE_SET]]
84 ):
85 self._var = var
86 self._value: Union[T, Literal[_NoValueSet.NO_VALUE_SET]] = value
87 self._redeemed = False
90class RunVar(Generic[T]):
91 """Like a :class:`~contextvars.ContextVar`, expect scoped to the running event loop."""
93 __slots__ = "_name", "_default"
95 NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET
97 _token_wrappers: Set[_TokenWrapper] = set()
99 def __init__(
100 self,
101 name: str,
102 default: Union[T, Literal[_NoValueSet.NO_VALUE_SET]] = NO_VALUE_SET,
103 ):
104 self._name = name
105 self._default = default
107 @property
108 def _current_vars(self) -> Dict[str, T]:
109 token = current_token()
110 while True:
111 try:
112 return _run_vars[token]
113 except TypeError:
114 # Happens when token isn't weak referable (TrioToken).
115 # This workaround does mean that some memory will leak on Trio until the problem
116 # is fixed on their end.
117 token = _TokenWrapper(token)
118 self._token_wrappers.add(token)
119 except KeyError:
120 run_vars = _run_vars[token] = {}
121 return run_vars
123 @overload
124 def get(self, default: D) -> Union[T, D]:
125 ...
127 @overload
128 def get(self) -> T:
129 ...
131 def get(
132 self, default: Union[D, Literal[_NoValueSet.NO_VALUE_SET]] = NO_VALUE_SET
133 ) -> Union[T, D]:
134 try:
135 return self._current_vars[self._name]
136 except KeyError:
137 if default is not RunVar.NO_VALUE_SET:
138 return default
139 elif self._default is not RunVar.NO_VALUE_SET:
140 return self._default
142 raise LookupError(
143 f'Run variable "{self._name}" has no value and no default set'
144 )
146 def set(self, value: T) -> RunvarToken[T]:
147 current_vars = self._current_vars
148 token = RunvarToken(self, current_vars.get(self._name, RunVar.NO_VALUE_SET))
149 current_vars[self._name] = value
150 return token
152 def reset(self, token: RunvarToken[T]) -> None:
153 if token._var is not self:
154 raise ValueError("This token does not belong to this RunVar")
156 if token._redeemed:
157 raise ValueError("This token has already been used")
159 if token._value is _NoValueSet.NO_VALUE_SET:
160 try:
161 del self._current_vars[self._name]
162 except KeyError:
163 pass
164 else:
165 self._current_vars[self._name] = token._value
167 token._redeemed = True
169 def __repr__(self) -> str:
170 return f"<RunVar name={self._name!r}>"