Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/lowlevel.py: 47%

85 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import enum 

2import sys 

3from dataclasses import dataclass 

4from typing import Any, Dict, Generic, Set, TypeVar, Union, overload 

5from weakref import WeakKeyDictionary 

6 

7from ._core._eventloop import get_asynclib 

8 

9if sys.version_info >= (3, 8): 

10 from typing import Literal 

11else: 

12 from typing_extensions import Literal 

13 

# Generic placeholder for the value type held by a RunVar / RunvarToken.
T = TypeVar("T")

# Generic placeholder for the type of a caller-supplied fallback in RunVar.get().
D = TypeVar("D")

16 

17 

async def checkpoint() -> None:
    """
    Check whether the task has been cancelled and give the scheduler a chance to run
    another task.

    This has the same effect as the following pair of calls, but is more efficient::

        await checkpoint_if_cancelled()
        await cancel_shielded_checkpoint()

    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    backend = get_asynclib()
    await backend.checkpoint()

31 

32 

async def checkpoint_if_cancelled() -> None:
    """
    Enter a checkpoint only when the enclosing cancel scope has been cancelled.

    The scheduler is not given the opportunity to switch to a different task.

    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    backend = get_asynclib()
    await backend.checkpoint_if_cancelled()

43 

44 

async def cancel_shielded_checkpoint() -> None:
    """
    Let the scheduler switch to another task, without checking for cancellation.

    This has the same effect as the following, but is potentially more efficient::

        with CancelScope(shield=True):
            await checkpoint()

    .. versionadded:: 3.0

    """
    # Delegate to whichever async backend is currently running.
    backend = get_asynclib()
    await backend.cancel_shielded_checkpoint()

58 

59 

def current_token() -> object:
    """
    Return a backend specific token object that can be used to get back to the event loop.
    """
    backend = get_asynclib()
    return backend.current_token()

63 

64 

# Per-event-loop storage for RunVar values: maps an event loop token (or a wrapper
# around it) to a ``{variable name: value}`` dict. Keys are held weakly so the entry
# disappears together with its token. The annotation is quoted because
# WeakKeyDictionary is not subscriptable at runtime on older Python versions.
_run_vars: "WeakKeyDictionary[Any, Dict[str, Any]]" = WeakKeyDictionary()

# Module-level mapping from tokens to their _TokenWrapper objects.
_token_wrappers: Dict[Any, "_TokenWrapper"] = {}

67 

68 

@dataclass(frozen=True)
class _TokenWrapper:
    """
    Immutable, hashable, weak-referenceable holder for a backend token.

    Used as a stand-in key for tokens that cannot themselves be weakly referenced.
    """

    # "__weakref__" must be listed explicitly: a slotted class is not weak-referenceable
    # otherwise.
    __slots__ = ("_token", "__weakref__")

    # The wrapped backend token.
    _token: object

73 

74 

class _NoValueSet(enum.Enum):
    """Single-member enum whose member serves as the "no value" sentinel."""

    # An enum member (rather than a bare ``object()``) so it can be used in ``Literal[...]``.
    NO_VALUE_SET = enum.auto()

77 

78 

class RunvarToken(Generic[T]):
    """
    Handle returned by :meth:`RunVar.set` that lets :meth:`RunVar.reset` undo the change.

    It records the variable it was issued by, the value stored before the ``set()`` call
    (or the ``NO_VALUE_SET`` sentinel when there was none), and whether it has already
    been redeemed.
    """

    __slots__ = ("_var", "_value", "_redeemed")

    def __init__(
        self, var: "RunVar[T]", value: Union[T, Literal[_NoValueSet.NO_VALUE_SET]]
    ):
        # A freshly issued token has not been used for a reset yet.
        self._redeemed = False
        self._value: Union[T, Literal[_NoValueSet.NO_VALUE_SET]] = value
        self._var = var

88 

89 

class RunVar(Generic[T]):
    """
    Like a :class:`~contextvars.ContextVar`, except scoped to the running event loop.

    Values are stored per event loop token, under the variable's name.
    """

    __slots__ = ("_name", "_default")

    # Sentinel meaning "no value / no default was given".
    NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET

    # Strong references to the wrappers created for tokens that are not weak-referenceable;
    # shared by all RunVar instances.
    _token_wrappers: Set[_TokenWrapper] = set()

    def __init__(
        self,
        name: str,
        default: Union[T, Literal[_NoValueSet.NO_VALUE_SET]] = NO_VALUE_SET,
    ):
        """
        :param name: key under which the value is stored for each event loop
        :param default: value returned by :meth:`get` when nothing has been set
        """
        self._name = name
        self._default = default

    @property
    def _current_vars(self) -> Dict[str, T]:
        """Return the variable dict of the current event loop, creating it if needed."""
        key = current_token()
        while True:
            try:
                return _run_vars[key]
            except KeyError:
                # First access for this event loop: start with an empty dict.
                fresh_vars = {}
                _run_vars[key] = fresh_vars
                return fresh_vars
            except TypeError:
                # The token isn't weak referable (TrioToken), so retry with a wrapper that
                # is. Keeping the wrapper alive here means some memory will leak on Trio
                # until the problem is fixed on their end.
                key = _TokenWrapper(key)
                self._token_wrappers.add(key)

    @overload
    def get(self, default: D) -> Union[T, D]:
        ...

    @overload
    def get(self) -> T:
        ...

    def get(
        self, default: Union[D, Literal[_NoValueSet.NO_VALUE_SET]] = NO_VALUE_SET
    ) -> Union[T, D]:
        """
        Return the value of this variable for the current event loop.

        When no value has been set, fall back first to the ``default`` argument and then
        to the default given to the constructor.

        :raises LookupError: if there is neither a value nor any default
        """
        try:
            return self._current_vars[self._name]
        except KeyError:
            pass

        # The call-site default takes precedence over the constructor default.
        for fallback in (default, self._default):
            if fallback is not RunVar.NO_VALUE_SET:
                return fallback

        raise LookupError(
            f'Run variable "{self._name}" has no value and no default set'
        )

    def set(self, value: T) -> RunvarToken[T]:
        """
        Store ``value`` for the current event loop.

        :return: a token that can be passed to :meth:`reset` to restore the previous state
        """
        loop_vars = self._current_vars
        previous = loop_vars.get(self._name, RunVar.NO_VALUE_SET)
        restore_token = RunvarToken(self, previous)
        loop_vars[self._name] = value
        return restore_token

    def reset(self, token: RunvarToken[T]) -> None:
        """
        Restore the state this variable had before the ``set()`` call that issued ``token``.

        :raises ValueError: if the token was issued by another variable or was already used
        """
        if token._var is not self:
            raise ValueError("This token does not belong to this RunVar")

        if token._redeemed:
            raise ValueError("This token has already been used")

        loop_vars = self._current_vars
        if token._value is _NoValueSet.NO_VALUE_SET:
            # There was no value before set(); remove the entry if it still exists.
            loop_vars.pop(self._name, None)
        else:
            loop_vars[self._name] = token._value

        token._redeemed = True

    def __repr__(self) -> str:
        return f"<RunVar name={self._name!r}>"