Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/lowlevel.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

77 statements  

1from __future__ import annotations 

2 

3import enum 

4from dataclasses import dataclass 

5from typing import Any, Generic, Literal, TypeVar, overload 

6from weakref import WeakKeyDictionary 

7 

8from ._core._eventloop import get_async_backend 

9 

10T = TypeVar("T") 

11D = TypeVar("D") 

12 

13 

14async def checkpoint() -> None: 

15 """ 

16 Check for cancellation and allow the scheduler to switch to another task. 

17 

18 Equivalent to (but more efficient than):: 

19 

20 await checkpoint_if_cancelled() 

21 await cancel_shielded_checkpoint() 

22 

23 

24 .. versionadded:: 3.0 

25 

26 """ 

27 await get_async_backend().checkpoint() 

28 

29 

30async def checkpoint_if_cancelled() -> None: 

31 """ 

32 Enter a checkpoint if the enclosing cancel scope has been cancelled. 

33 

34 This does not allow the scheduler to switch to a different task. 

35 

36 .. versionadded:: 3.0 

37 

38 """ 

39 await get_async_backend().checkpoint_if_cancelled() 

40 

41 

42async def cancel_shielded_checkpoint() -> None: 

43 """ 

44 Allow the scheduler to switch to another task but without checking for cancellation. 

45 

46 Equivalent to (but potentially more efficient than):: 

47 

48 with CancelScope(shield=True): 

49 await checkpoint() 

50 

51 

52 .. versionadded:: 3.0 

53 

54 """ 

55 await get_async_backend().cancel_shielded_checkpoint() 

56 

57 

58def current_token() -> object: 

59 """ 

60 Return a backend specific token object that can be used to get back to the event 

61 loop. 

62 

63 """ 

64 return get_async_backend().current_token() 

65 

66 

67_run_vars: WeakKeyDictionary[Any, dict[str, Any]] = WeakKeyDictionary() 

68_token_wrappers: dict[Any, _TokenWrapper] = {} 

69 

70 

71@dataclass(frozen=True) 

72class _TokenWrapper: 

73 __slots__ = "_token", "__weakref__" 

74 _token: object 

75 

76 

77class _NoValueSet(enum.Enum): 

78 NO_VALUE_SET = enum.auto() 

79 

80 

81class RunvarToken(Generic[T]): 

82 __slots__ = "_var", "_value", "_redeemed" 

83 

84 def __init__(self, var: RunVar[T], value: T | Literal[_NoValueSet.NO_VALUE_SET]): 

85 self._var = var 

86 self._value: T | Literal[_NoValueSet.NO_VALUE_SET] = value 

87 self._redeemed = False 

88 

89 

90class RunVar(Generic[T]): 

91 """ 

92 Like a :class:`~contextvars.ContextVar`, except scoped to the running event loop. 

93 """ 

94 

95 __slots__ = "_name", "_default" 

96 

97 NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET 

98 

99 _token_wrappers: set[_TokenWrapper] = set() 

100 

101 def __init__( 

102 self, name: str, default: T | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET 

103 ): 

104 self._name = name 

105 self._default = default 

106 

107 @property 

108 def _current_vars(self) -> dict[str, T]: 

109 token = current_token() 

110 try: 

111 return _run_vars[token] 

112 except KeyError: 

113 run_vars = _run_vars[token] = {} 

114 return run_vars 

115 

116 @overload 

117 def get(self, default: D) -> T | D: ... 

118 

119 @overload 

120 def get(self) -> T: ... 

121 

122 def get( 

123 self, default: D | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET 

124 ) -> T | D: 

125 try: 

126 return self._current_vars[self._name] 

127 except KeyError: 

128 if default is not RunVar.NO_VALUE_SET: 

129 return default 

130 elif self._default is not RunVar.NO_VALUE_SET: 

131 return self._default 

132 

133 raise LookupError( 

134 f'Run variable "{self._name}" has no value and no default set' 

135 ) 

136 

137 def set(self, value: T) -> RunvarToken[T]: 

138 current_vars = self._current_vars 

139 token = RunvarToken(self, current_vars.get(self._name, RunVar.NO_VALUE_SET)) 

140 current_vars[self._name] = value 

141 return token 

142 

143 def reset(self, token: RunvarToken[T]) -> None: 

144 if token._var is not self: 

145 raise ValueError("This token does not belong to this RunVar") 

146 

147 if token._redeemed: 

148 raise ValueError("This token has already been used") 

149 

150 if token._value is _NoValueSet.NO_VALUE_SET: 

151 try: 

152 del self._current_vars[self._name] 

153 except KeyError: 

154 pass 

155 else: 

156 self._current_vars[self._name] = token._value 

157 

158 token._redeemed = True 

159 

160 def __repr__(self) -> str: 

161 return f"<RunVar name={self._name!r}>"