Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/lowlevel.py: 48%

86 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1from __future__ import annotations 

2 

3import enum 

4import sys 

5from dataclasses import dataclass 

6from typing import Any, Generic, TypeVar, overload 

7from weakref import WeakKeyDictionary 

8 

9from ._core._eventloop import get_asynclib 

10 

11if sys.version_info >= (3, 8): 

12 from typing import Literal 

13else: 

14 from typing_extensions import Literal 

15 

16T = TypeVar("T") 

17D = TypeVar("D") 

18 

19 

async def checkpoint() -> None:
    """
    Check for cancellation and allow the scheduler to switch to another task.

    Equivalent to (but more efficient than)::

        await checkpoint_if_cancelled()
        await cancel_shielded_checkpoint()


    .. versionadded:: 3.0

    """
    # Delegate to the backend module selected by get_asynclib().
    await get_asynclib().checkpoint()

34 

35 

async def checkpoint_if_cancelled() -> None:
    """
    Enter a checkpoint if the enclosing cancel scope has been cancelled.

    This does not allow the scheduler to switch to a different task.

    .. versionadded:: 3.0

    """
    # Delegate to the backend module selected by get_asynclib().
    await get_asynclib().checkpoint_if_cancelled()

46 

47 

async def cancel_shielded_checkpoint() -> None:
    """
    Allow the scheduler to switch to another task but without checking for cancellation.

    Equivalent to (but potentially more efficient than)::

        with CancelScope(shield=True):
            await checkpoint()


    .. versionadded:: 3.0

    """
    # Delegate to the backend module selected by get_asynclib().
    await get_asynclib().cancel_shielded_checkpoint()

62 

63 

def current_token() -> object:
    """Return a backend specific token object that can be used to get back to the event loop."""
    return get_asynclib().current_token()

67 

68 

# Storage for RunVar values: maps a token (the object returned by
# current_token(), or a _TokenWrapper around it when the token itself cannot
# be weakly referenced) to a dict of {run variable name: value}. Entries are
# held weakly by key, so they disappear once the token is garbage collected.
_run_vars: WeakKeyDictionary[Any, dict[str, Any]] = WeakKeyDictionary()
# NOTE(review): not referenced by any code visible in this file; RunVar keeps
# its own class-level ``_token_wrappers`` set instead.
_token_wrappers: dict[Any, _TokenWrapper] = {}

71 

72 

@dataclass(frozen=True)
class _TokenWrapper:
    """
    Immutable wrapper around a backend token.

    The ``__weakref__`` slot makes instances weakly referenceable, so a wrapper
    can stand in as a ``WeakKeyDictionary`` key for a token that cannot be
    weakly referenced itself. Equality and hashing are derived from the wrapped
    token (frozen dataclass semantics).
    """

    __slots__ = "_token", "__weakref__"
    _token: object

77 

78 

class _NoValueSet(enum.Enum):
    """Single-member enum used as a typed "no value" sentinel for run variables."""

    NO_VALUE_SET = enum.auto()

81 

82 

class RunvarToken(Generic[T]):
    """
    Token returned by :meth:`RunVar.set`, used to undo that assignment.

    It records the run variable it belongs to, the value the variable held
    before the assignment (or the ``NO_VALUE_SET`` sentinel if it had none),
    and whether it has already been passed to :meth:`RunVar.reset`.
    """

    __slots__ = "_var", "_value", "_redeemed"

    def __init__(self, var: RunVar[T], value: T | Literal[_NoValueSet.NO_VALUE_SET]):
        self._var = var
        self._value: T | Literal[_NoValueSet.NO_VALUE_SET] = value
        # Flipped to True by RunVar.reset() so a token can only be used once.
        self._redeemed = False

90 

91 

class RunVar(Generic[T]):
    """
    Like a :class:`~contextvars.ContextVar`, except scoped to the running event loop.
    """

    __slots__ = "_name", "_default"

    NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET

    # Strong references to the wrappers created in ``_current_vars``; without
    # them the weakly-keyed ``_run_vars`` entries would vanish immediately.
    _token_wrappers: set[_TokenWrapper] = set()

    def __init__(
        self,
        name: str,
        default: T | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET,
    ):
        """
        Create a run variable.

        :param name: key under which the value is stored in the per-token dict
        :param default: value returned by :meth:`get` when nothing has been set
            (omit to have :meth:`get` raise :exc:`LookupError` instead)
        """
        self._name = name
        self._default = default

    @property
    def _current_vars(self) -> dict[str, T]:
        """Return the variable dict for the current token, creating it if missing."""
        token = current_token()
        while True:
            try:
                return _run_vars[token]
            except TypeError:
                # Happens when token isn't weak referable (TrioToken).
                # This workaround does mean that some memory will leak on Trio until the problem
                # is fixed on their end.
                token = _TokenWrapper(token)
                self._token_wrappers.add(token)
            except KeyError:
                run_vars = _run_vars[token] = {}
                return run_vars

    @overload
    def get(self, default: D) -> T | D:
        ...

    @overload
    def get(self) -> T:
        ...

    def get(
        self, default: D | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ) -> T | D:
        """
        Return the current value of this run variable.

        If no value has been set, fall back first to ``default`` (when given)
        and then to the default passed to the constructor.

        :raises LookupError: if there is no value and neither default is set
        """
        try:
            return self._current_vars[self._name]
        except KeyError:
            if default is not RunVar.NO_VALUE_SET:
                return default
            elif self._default is not RunVar.NO_VALUE_SET:
                return self._default

        raise LookupError(
            f'Run variable "{self._name}" has no value and no default set'
        )

    def set(self, value: T) -> RunvarToken[T]:
        """
        Set the value of this run variable.

        :return: a token holding the previous value (or the ``NO_VALUE_SET``
            sentinel), which can be passed to :meth:`reset`
        """
        current_vars = self._current_vars
        token = RunvarToken(self, current_vars.get(self._name, RunVar.NO_VALUE_SET))
        current_vars[self._name] = value
        return token

    def reset(self, token: RunvarToken[T]) -> None:
        """
        Restore the state this run variable had before the matching :meth:`set`.

        :raises ValueError: if the token belongs to another run variable or has
            already been used
        """
        if token._var is not self:
            raise ValueError("This token does not belong to this RunVar")

        if token._redeemed:
            raise ValueError("This token has already been used")

        if token._value is _NoValueSet.NO_VALUE_SET:
            # The variable had no value before set(); remove it again.
            try:
                del self._current_vars[self._name]
            except KeyError:
                pass
        else:
            self._current_vars[self._name] = token._value

        token._redeemed = True

    def __repr__(self) -> str:
        return f"<RunVar name={self._name!r}>"