Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/async_timeout/__init__.py: 50%

116 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1import asyncio 

2import enum 

3import sys 

4import warnings 

5from types import TracebackType 

6from typing import Optional, Type 

7 

8 

9if sys.version_info >= (3, 8): 

10 from typing import final 

11else: 

12 from typing_extensions import final 

13 

14 

if sys.version_info < (3, 11):

    def _uncancel_task(task: "asyncio.Task[object]") -> None:
        """Do nothing; this interpreter branch never calls ``uncancel()``."""
        return None

else:

    def _uncancel_task(task: "asyncio.Task[object]") -> None:
        """Call ``task.uncancel()`` and discard whatever it returns."""
        task.uncancel()

24 

25 

# Version string of this package.
__version__ = "4.0.3"


# Public names exported by a star import of this module.
__all__ = ("timeout", "timeout_at", "Timeout")

30 

31 

def timeout(delay: Optional[float]) -> "Timeout":
    """Build a timeout context manager expiring *delay* seconds from now.

    Handy when timeout logic has to wrap a whole block of code, or when
    asyncio.wait_for does not fit. For example:

    >>> async with timeout(0.001):
    ...     async with aiohttp.get('https://github.com') as r:
    ...         await r.text()

    The running event loop is looked up with ``asyncio.get_running_loop()``,
    so the call has to happen while a loop is running.

    delay - number of seconds, or None to switch the timeout logic off
    """
    running_loop = asyncio.get_running_loop()
    # Convert the relative delay into an absolute point on the loop's clock.
    deadline: Optional[float] = (
        None if delay is None else running_loop.time() + delay
    )
    return Timeout(deadline, running_loop)

51 

52 

def timeout_at(deadline: Optional[float]) -> "Timeout":
    """Build a timeout context manager that expires at an absolute time.

    *deadline* is expressed on the same clock as ``loop.time()``.

    Note that this is not POSIX time: the clock has an undefined starting
    point, e.g. the moment the system was powered on.

    >>> async with timeout_at(loop.time() + 10):
    ...     async with aiohttp.get('https://github.com') as r:
    ...         await r.text()

    The running event loop is looked up with ``asyncio.get_running_loop()``,
    so the call has to happen while a loop is running.
    """
    current_loop = asyncio.get_running_loop()
    return Timeout(deadline=deadline, loop=current_loop)

70 

71 

class _State(enum.Enum):
    """Lifecycle phases tracked by a ``Timeout`` instance."""

    # Constructed, context not entered yet.
    INIT = "INIT"
    # Context entered; a timeout callback may be scheduled.
    ENTER = "ENTER"
    # The timeout callback has run and cancelled the task.
    TIMEOUT = "TIMEOUT"
    # Context left without converting a cancellation into a timeout.
    EXIT = "EXIT"

77 

78 

@final
class Timeout:
    # Internal helper: obtain instances through the public timeout() and
    # timeout_at() factories rather than constructing the class directly.
    #
    # Both `with` and `async with` are supported, but `async with timeout()`
    # is the preferred spelling.  Nothing in the implementation has to be
    # asynchronous; the async form merely makes it explicit that the
    # manager belongs inside an async function, which avoids many
    # accidental misuses.
    #
    # If the deadline has already passed when the timeout gets scheduled,
    # the timeout callback is queued with call_soon() instead of call_at(),
    # so that the block times out as early as possible.

    __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler", "_task")

    def __init__(
        self, deadline: Optional[float], loop: asyncio.AbstractEventLoop
    ) -> None:
        self._loop = loop
        self._state = _State.INIT

        self._task: Optional["asyncio.Task[object]"] = None
        self._timeout_handler: Optional[asyncio.Handle] = None
        self._deadline: Optional[float] = None
        if deadline is not None:
            # In the INIT state update() only records the deadline.
            self.update(deadline)

    def __enter__(self) -> "Timeout":
        # The warning must be issued from this frame: stacklevel=2 makes it
        # point at the `with` statement of the caller.
        warnings.warn(
            "with timeout() is deprecated, use async with timeout() instead",
            DeprecationWarning,
            stacklevel=2,
        )
        self._do_enter()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        self._do_exit(exc_type)
        return None

    async def __aenter__(self) -> "Timeout":
        self._do_enter()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        self._do_exit(exc_type)
        return None

    @property
    def expired(self) -> bool:
        """Tell whether the timeout fired while the block was executing."""
        return self._state is _State.TIMEOUT

    @property
    def deadline(self) -> Optional[float]:
        """Absolute deadline on the loop clock, or None when not set."""
        return self._deadline

    def reject(self) -> None:
        """Drop the scheduled timeout, if there is one.

        Raise RuntimeError unless the manager is in the INIT or ENTER state.
        """
        # "cancel" might read better, but in asyncio task.cancel() means
        # raising CancelledError, so a different verb is used here.
        if self._state in (_State.INIT, _State.ENTER):
            self._reject()
            return
        raise RuntimeError(f"invalid state {self._state.value}")

    def _reject(self) -> None:
        """Forget the tracked task and cancel the pending timer handle."""
        self._task = None
        handle = self._timeout_handler
        if handle is None:
            return
        handle.cancel()
        self._timeout_handler = None

    def shift(self, delay: float) -> None:
        """Move the deadline by *delay* seconds.

        A negative *delay* is allowed.

        Raise RuntimeError when no deadline is currently set.
        """
        if self._deadline is None:
            raise RuntimeError("cannot shift timeout if deadline is not scheduled")
        self.update(self._deadline + delay)

    def update(self, deadline: float) -> None:
        """Replace the deadline with a new absolute value.

        *deadline* is expressed on the same clock as ``loop.time()``.

        A deadline that already lies in the past makes the timeout fire
        as soon as possible.

        Note that this is not POSIX time: the clock has an undefined
        starting point, e.g. the moment the system was powered on.

        Raise RuntimeError when called after the context manager has been
        exited or after the timeout has expired.
        """
        state = self._state
        if state is _State.EXIT:
            raise RuntimeError("cannot reschedule after exit from context manager")
        if state is _State.TIMEOUT:
            raise RuntimeError("cannot reschedule expired timeout")
        pending = self._timeout_handler
        if pending is not None:
            pending.cancel()
        self._deadline = deadline
        # Before the context is entered there is nothing to schedule yet.
        if state is not _State.INIT:
            self._reschedule()

    def _reschedule(self) -> None:
        """(Re)arm the timer callback for the current deadline."""
        assert self._state == _State.ENTER
        target = self._deadline
        if target is None:
            return

        loop = self._loop
        current_time = loop.time()
        stale = self._timeout_handler
        if stale is not None:
            stale.cancel()

        # The task running right now is the one cancelled on timeout.
        self._task = asyncio.current_task()
        self._timeout_handler = (
            loop.call_soon(self._on_timeout)
            if target <= current_time
            else loop.call_at(target, self._on_timeout)
        )

    def _do_enter(self) -> None:
        """Switch INIT -> ENTER and arm the timer; shared by both enters."""
        if self._state is not _State.INIT:
            raise RuntimeError(f"invalid state {self._state.value}")
        self._state = _State.ENTER
        self._reschedule()

    def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None:
        """Finish the context; turn our own cancellation into TimeoutError."""
        timed_out = (
            exc_type is asyncio.CancelledError and self._state is _State.TIMEOUT
        )
        if not timed_out:
            # The timeout did not cause this exit: mark the manager as
            # exited and disarm the timer.
            self._state = _State.EXIT
            self._reject()
            return None

        assert self._task is not None
        _uncancel_task(self._task)
        self._timeout_handler = None
        self._task = None
        raise asyncio.TimeoutError

    def _on_timeout(self) -> None:
        """Timer callback: cancel the tracked task and record the expiry."""
        assert self._task is not None
        self._task.cancel()
        self._state = _State.TIMEOUT
        # Release the handle reference right away.
        self._timeout_handler = None