Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/lowlevel.py: 48%
79 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
1from __future__ import annotations
3import enum
4from dataclasses import dataclass
5from typing import Any, Generic, Literal, TypeVar, overload
6from weakref import WeakKeyDictionary
8from ._core._eventloop import get_async_backend
# Type of the value stored in a RunVar / remembered by a RunvarToken.
T = TypeVar("T")
# Type of the caller-supplied fallback passed to RunVar.get().
D = TypeVar("D")
async def checkpoint() -> None:
    """
    Check for cancellation and allow the scheduler to switch to another task.

    Equivalent to (but more efficient than)::

        await checkpoint_if_cancelled()
        await cancel_shielded_checkpoint()


    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    await get_async_backend().checkpoint()
async def checkpoint_if_cancelled() -> None:
    """
    Enter a checkpoint if the enclosing cancel scope has been cancelled.

    This does not allow the scheduler to switch to a different task.

    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    await get_async_backend().checkpoint_if_cancelled()
async def cancel_shielded_checkpoint() -> None:
    """
    Allow the scheduler to switch to another task but without checking for cancellation.

    Equivalent to (but potentially more efficient than)::

        with CancelScope(shield=True):
            await checkpoint()


    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    await get_async_backend().cancel_shielded_checkpoint()
def current_token() -> object:
    """
    Return a backend specific token object that can be used to get back to the event
    loop.

    """
    return get_async_backend().current_token()
# Per-event-loop storage for RunVar values: maps the token returned by
# current_token() to a ``{variable name: value}`` dict. Keys are held weakly, so an
# entry disappears once its token object is garbage collected.
_run_vars: WeakKeyDictionary[Any, dict[str, Any]] = WeakKeyDictionary()
# Registry of _TokenWrapper instances; not referenced elsewhere in the visible code.
_token_wrappers: dict[Any, _TokenWrapper] = {}
@dataclass(frozen=True)
class _TokenWrapper:
    """
    Immutable, weak-referenceable holder for a single token object.

    ``__weakref__`` is listed in ``__slots__`` explicitly because slotted classes do
    not support weak references otherwise.
    """

    __slots__ = "_token", "__weakref__"
    _token: object
class _NoValueSet(enum.Enum):
    """Single-member enum whose member serves as the "no value" sentinel."""

    NO_VALUE_SET = enum.auto()
class RunvarToken(Generic[T]):
    """
    Record of a run variable's previous state, handed out by :meth:`RunVar.set`.

    :param var: the run variable this token was created for
    :param value: the value the variable held before it was set, or
        ``_NoValueSet.NO_VALUE_SET`` if it had none
    """

    __slots__ = "_var", "_value", "_redeemed"

    def __init__(self, var: RunVar[T], value: T | Literal[_NoValueSet.NO_VALUE_SET]):
        self._var = var
        self._value: T | Literal[_NoValueSet.NO_VALUE_SET] = value
        # Flipped to True by RunVar.reset() so a token can only be used once.
        self._redeemed = False
class RunVar(Generic[T]):
    """
    Like a :class:`~contextvars.ContextVar`, except scoped to the running event loop.

    Values are stored in a per-event-loop dict keyed by the variable's *name*, so two
    ``RunVar`` instances created with the same name read and write the same slot.

    :param name: key under which the value is stored
    :param default: value returned by :meth:`get` when nothing has been set; leave
        unset to make :meth:`get` raise :exc:`LookupError` instead
    """

    __slots__ = "_name", "_default"

    NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET

    # Not referenced by any method of this class; retained so existing attribute
    # access keeps working. Must stay above ``def set`` below, which shadows the
    # builtin ``set`` for the remainder of the class body.
    _token_wrappers: set[_TokenWrapper] = set()

    def __init__(
        self, name: str, default: T | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ):
        self._name = name
        self._default = default

    @property
    def _current_vars(self) -> dict[str, T]:
        """Return the value dict of the current event loop, creating it on first use."""
        token = current_token()
        try:
            return _run_vars[token]
        except KeyError:
            run_vars = _run_vars[token] = {}
            return run_vars

    @overload
    def get(self, default: D) -> T | D:
        ...

    @overload
    def get(self) -> T:
        ...

    def get(
        self, default: D | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ) -> T | D:
        """
        Return the variable's value for the current event loop.

        :param default: value to return if the variable has not been set; takes
            precedence over the default given to the constructor
        :raises LookupError: if the variable has no value and neither default was
            provided
        """
        try:
            return self._current_vars[self._name]
        except KeyError:
            if default is not RunVar.NO_VALUE_SET:
                return default
            elif self._default is not RunVar.NO_VALUE_SET:
                return self._default

        # Raised outside the ``except`` block so the internal KeyError is not
        # attached to the LookupError as exception context.
        raise LookupError(
            f'Run variable "{self._name}" has no value and no default set'
        )

    def set(self, value: T) -> RunvarToken[T]:
        """
        Set the variable's value for the current event loop.

        :param value: the new value
        :return: a token that can be passed to :meth:`reset` to restore the previous
            state
        """
        current_vars = self._current_vars
        # Capture the previous value (or the "unset" sentinel) before overwriting.
        token = RunvarToken(self, current_vars.get(self._name, RunVar.NO_VALUE_SET))
        current_vars[self._name] = value
        return token

    def reset(self, token: RunvarToken[T]) -> None:
        """
        Restore the variable to the state recorded in ``token``.

        :param token: a token previously returned by :meth:`set` on this variable
        :raises ValueError: if the token belongs to another variable or has already
            been used
        """
        if token._var is not self:
            raise ValueError("This token does not belong to this RunVar")

        if token._redeemed:
            raise ValueError("This token has already been used")

        if token._value is _NoValueSet.NO_VALUE_SET:
            # The variable was unset before set(); remove it again, tolerating the
            # case where it is already gone.
            self._current_vars.pop(self._name, None)
        else:
            self._current_vars[self._name] = token._value

        token._redeemed = True

    def __repr__(self) -> str:
        return f"<RunVar name={self._name!r}>"