Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/lowlevel.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

85 statements  

1from __future__ import annotations 

2 

# Names exported by ``from <this module> import *``.
__all__ = (
    "EventLoopToken",
    "RunvarToken",
    "RunVar",
    "checkpoint",
    "checkpoint_if_cancelled",
    "cancel_shielded_checkpoint",
    "current_token",
)

12 

13import enum 

14from dataclasses import dataclass 

15from types import TracebackType 

16from typing import Any, Generic, Literal, TypeVar, final, overload 

17from weakref import WeakKeyDictionary 

18 

19from ._core._eventloop import get_async_backend 

20from .abc import AsyncBackend 

21 

# T: type of the value held by a RunVar / RunvarToken.
T = TypeVar("T")
# D: type of the fallback value a caller may pass to RunVar.get().
D = TypeVar("D")

24 

25 

async def checkpoint() -> None:
    """
    Check for cancellation and give the scheduler a chance to run another task.

    Delegates to the ``checkpoint()`` method of the current async backend.
    Equivalent to (but more efficient than)::

        await checkpoint_if_cancelled()
        await cancel_shielded_checkpoint()

    .. versionadded:: 3.0

    """
    backend = get_async_backend()
    await backend.checkpoint()

39 

40 

async def checkpoint_if_cancelled() -> None:
    """
    Enter a checkpoint only if the enclosing cancel scope has been cancelled.

    Delegates to the ``checkpoint_if_cancelled()`` method of the current async
    backend. This does not allow the scheduler to switch to a different task.

    .. versionadded:: 3.0

    """
    backend = get_async_backend()
    await backend.checkpoint_if_cancelled()

51 

52 

async def cancel_shielded_checkpoint() -> None:
    """
    Let the scheduler switch to another task without checking for cancellation.

    Delegates to the ``cancel_shielded_checkpoint()`` method of the current
    async backend. Equivalent to (but potentially more efficient than)::

        with CancelScope(shield=True):
            await checkpoint()

    .. versionadded:: 3.0

    """
    backend = get_async_backend()
    await backend.cancel_shielded_checkpoint()

66 

67 

@final
@dataclass(repr=False, frozen=True)
class EventLoopToken:
    """
    An opaque object that holds a reference to an event loop.

    Instances are immutable (frozen dataclass) and use the default object
    ``repr`` rather than a field-by-field one.

    .. versionadded:: 4.11.0
    """

    # The async backend class that produced ``native_token``.
    backend_class: type[AsyncBackend]
    # The backend's own token object, as returned by its ``current_token()``.
    native_token: object

79 

80 

def current_token() -> EventLoopToken:
    """
    Return a token object that can be used to call code in the current event
    loop from another thread.

    The token pairs the current async backend class with the native token
    obtained from that backend's ``current_token()``.

    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    .. versionadded:: 4.11.0

    """
    backend = get_async_backend()
    return EventLoopToken(
        backend_class=backend,
        native_token=backend.current_token(),
    )

95 

96 

# Per-event-loop storage for RunVar values: each key is held weakly and maps
# to a dict of {RunVar: current value}.
_run_vars: WeakKeyDictionary[object, dict[RunVar[Any], Any]] = WeakKeyDictionary()

98 

99 

class _NoValueSet(enum.Enum):
    """Single-member enum whose member serves as a "no value" sentinel."""

    NO_VALUE_SET = enum.auto()

102 

103 

class RunvarToken(Generic[T]):
    """
    Records the state a :class:`RunVar` should be restored to.

    Holds the variable, the value to restore (or the ``NO_VALUE_SET`` sentinel)
    and a flag telling whether the token has already been redeemed. Used as a
    context manager, leaving the block passes the token to the variable's
    ``reset()``.
    """

    __slots__ = ("_var", "_value", "_redeemed")

    def __init__(self, var: RunVar[T], value: T | Literal[_NoValueSet.NO_VALUE_SET]):
        self._var = var
        self._value: T | Literal[_NoValueSet.NO_VALUE_SET] = value
        # Flipped to True by RunVar.reset() once this token has been used.
        self._redeemed = False

    def __enter__(self) -> RunvarToken[T]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Runs regardless of whether the block raised; returning None means
        # any in-flight exception keeps propagating.
        owner = self._var
        owner.reset(self)

122 

123 

class RunVar(Generic[T]):
    """
    Like a :class:`~contextvars.ContextVar`, except scoped to the running event loop.

    The token returned by :meth:`set` can be used as a context manager that
    resets the variable to its previous value when the context block is exited.
    """

    __slots__ = ("_name", "_default")

    # Sentinel meaning "no value / no default supplied".
    NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET

    def __init__(
        self, name: str, default: T | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ):
        self._name = name
        self._default = default

    @property
    def _current_vars(self) -> dict[RunVar[T], T]:
        """Return the value mapping for the current event loop, creating it if needed."""
        loop_key = current_token().native_token
        values = _run_vars.get(loop_key)
        if values is None:
            values = _run_vars[loop_key] = {}

        return values

    @overload
    def get(self, default: D) -> T | D: ...

    @overload
    def get(self) -> T: ...

    def get(
        self, default: D | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ) -> T | D:
        """
        Return the variable's value for the current event loop.

        Falls back first to the ``default`` argument, then to the default given
        at construction time.

        :raises LookupError: if no value is set and neither default is available
        """
        try:
            return self._current_vars[self]
        except KeyError:
            pass

        # Call-site default takes precedence over the constructor default.
        for fallback in (default, self._default):
            if fallback is not RunVar.NO_VALUE_SET:
                return fallback

        raise LookupError(
            f'Run variable "{self._name}" has no value and no default set'
        )

    def set(self, value: T) -> RunvarToken[T]:
        """
        Set the variable's value for the current event loop.

        :return: a token capturing the previous value (or the "no value"
            sentinel), which can be passed to :meth:`reset`
        """
        values = self._current_vars
        previous = values.get(self, RunVar.NO_VALUE_SET)
        values[self] = value
        return RunvarToken(self, previous)

    def reset(self, token: RunvarToken[T]) -> None:
        """
        Restore the state recorded in ``token`` and mark the token as used.

        :raises ValueError: if the token belongs to another variable or has
            already been used
        """
        if token._var is not self:
            raise ValueError("This token does not belong to this RunVar")

        if token._redeemed:
            raise ValueError("This token has already been used")

        values = self._current_vars
        if token._value is _NoValueSet.NO_VALUE_SET:
            # There was no value before set(); drop the entry if still present.
            values.pop(self, None)
        else:
            values[self] = token._value

        token._redeemed = True

    def __repr__(self) -> str:
        return f"<RunVar name={self._name!r}>"