Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/lowlevel.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

79 statements  

1from __future__ import annotations 

2 

3import enum 

4from dataclasses import dataclass 

5from typing import Any, Generic, Literal, TypeVar, final, overload 

6from weakref import WeakKeyDictionary 

7 

8from ._core._eventloop import get_async_backend 

9from .abc import AsyncBackend 

10 

11T = TypeVar("T") 

12D = TypeVar("D") 

13 

14 

15async def checkpoint() -> None: 

16 """ 

17 Check for cancellation and allow the scheduler to switch to another task. 

18 

19 Equivalent to (but more efficient than):: 

20 

21 await checkpoint_if_cancelled() 

22 await cancel_shielded_checkpoint() 

23 

24 

25 .. versionadded:: 3.0 

26 

27 """ 

28 await get_async_backend().checkpoint() 

29 

30 

31async def checkpoint_if_cancelled() -> None: 

32 """ 

33 Enter a checkpoint if the enclosing cancel scope has been cancelled. 

34 

35 This does not allow the scheduler to switch to a different task. 

36 

37 .. versionadded:: 3.0 

38 

39 """ 

40 await get_async_backend().checkpoint_if_cancelled() 

41 

42 

43async def cancel_shielded_checkpoint() -> None: 

44 """ 

45 Allow the scheduler to switch to another task but without checking for cancellation. 

46 

47 Equivalent to (but potentially more efficient than):: 

48 

49 with CancelScope(shield=True): 

50 await checkpoint() 

51 

52 

53 .. versionadded:: 3.0 

54 

55 """ 

56 await get_async_backend().cancel_shielded_checkpoint() 

57 

58 

59@final 

60@dataclass(frozen=True, repr=False) 

61class EventLoopToken: 

62 """ 

63 An opaque object that holds a reference to an event loop. 

64 

65 .. versionadded:: 4.11.0 

66 """ 

67 

68 backend_class: type[AsyncBackend] 

69 native_token: object 

70 

71 

72def current_token() -> EventLoopToken: 

73 """ 

74 Return a token object that can be used to call code in the current event loop from 

75 another thread. 

76 

77 .. versionadded:: 4.11.0 

78 

79 """ 

80 backend_class = get_async_backend() 

81 raw_token = backend_class.current_token() 

82 return EventLoopToken(backend_class, raw_token) 

83 

84 

85_run_vars: WeakKeyDictionary[object, dict[RunVar[Any], Any]] = WeakKeyDictionary() 

86 

87 

88class _NoValueSet(enum.Enum): 

89 NO_VALUE_SET = enum.auto() 

90 

91 

92class RunvarToken(Generic[T]): 

93 __slots__ = "_var", "_value", "_redeemed" 

94 

95 def __init__(self, var: RunVar[T], value: T | Literal[_NoValueSet.NO_VALUE_SET]): 

96 self._var = var 

97 self._value: T | Literal[_NoValueSet.NO_VALUE_SET] = value 

98 self._redeemed = False 

99 

100 

101class RunVar(Generic[T]): 

102 """ 

103 Like a :class:`~contextvars.ContextVar`, except scoped to the running event loop. 

104 """ 

105 

106 __slots__ = "_name", "_default" 

107 

108 NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET 

109 

110 def __init__( 

111 self, name: str, default: T | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET 

112 ): 

113 self._name = name 

114 self._default = default 

115 

116 @property 

117 def _current_vars(self) -> dict[RunVar[T], T]: 

118 native_token = current_token().native_token 

119 try: 

120 return _run_vars[native_token] 

121 except KeyError: 

122 run_vars = _run_vars[native_token] = {} 

123 return run_vars 

124 

125 @overload 

126 def get(self, default: D) -> T | D: ... 

127 

128 @overload 

129 def get(self) -> T: ... 

130 

131 def get( 

132 self, default: D | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET 

133 ) -> T | D: 

134 try: 

135 return self._current_vars[self] 

136 except KeyError: 

137 if default is not RunVar.NO_VALUE_SET: 

138 return default 

139 elif self._default is not RunVar.NO_VALUE_SET: 

140 return self._default 

141 

142 raise LookupError( 

143 f'Run variable "{self._name}" has no value and no default set' 

144 ) 

145 

146 def set(self, value: T) -> RunvarToken[T]: 

147 current_vars = self._current_vars 

148 token = RunvarToken(self, current_vars.get(self, RunVar.NO_VALUE_SET)) 

149 current_vars[self] = value 

150 return token 

151 

152 def reset(self, token: RunvarToken[T]) -> None: 

153 if token._var is not self: 

154 raise ValueError("This token does not belong to this RunVar") 

155 

156 if token._redeemed: 

157 raise ValueError("This token has already been used") 

158 

159 if token._value is _NoValueSet.NO_VALUE_SET: 

160 try: 

161 del self._current_vars[self] 

162 except KeyError: 

163 pass 

164 else: 

165 self._current_vars[self] = token._value 

166 

167 token._redeemed = True 

168 

169 def __repr__(self) -> str: 

170 return f"<RunVar name={self._name!r}>"