Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/async_timeout/__init__.py: 35%

118 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import asyncio 

2import enum 

3import sys 

4import warnings 

5from types import TracebackType 

6from typing import Any, Optional, Type 

7 

8 

9if sys.version_info >= (3, 8): 

10 from typing import final 

11else: 

12 from typing_extensions import final 

13 

14 

15__version__ = "4.0.2" 

16 

17 

18__all__ = ("timeout", "timeout_at", "Timeout") 

19 

20 

21def timeout(delay: Optional[float]) -> "Timeout": 

22 """timeout context manager. 

23 

24 Useful in cases when you want to apply timeout logic around block 

25 of code or in cases when asyncio.wait_for is not suitable. For example: 

26 

27 >>> async with timeout(0.001): 

28 ... async with aiohttp.get('https://github.com') as r: 

29 ... await r.text() 

30 

31 

32 delay - value in seconds or None to disable timeout logic 

33 """ 

34 loop = _get_running_loop() 

35 if delay is not None: 

36 deadline = loop.time() + delay # type: Optional[float] 

37 else: 

38 deadline = None 

39 return Timeout(deadline, loop) 

40 

41 

42def timeout_at(deadline: Optional[float]) -> "Timeout": 

43 """Schedule the timeout at absolute time. 

44 

45 deadline argument points on the time in the same clock system 

46 as loop.time(). 

47 

48 Please note: it is not POSIX time but a time with 

49 undefined starting base, e.g. the time of the system power on. 

50 

51 >>> async with timeout_at(loop.time() + 10): 

52 ... async with aiohttp.get('https://github.com') as r: 

53 ... await r.text() 

54 

55 

56 """ 

57 loop = _get_running_loop() 

58 return Timeout(deadline, loop) 

59 

60 

61class _State(enum.Enum): 

62 INIT = "INIT" 

63 ENTER = "ENTER" 

64 TIMEOUT = "TIMEOUT" 

65 EXIT = "EXIT" 

66 

67 

68@final 

69class Timeout: 

70 # Internal class, please don't instantiate it directly 

71 # Use timeout() and timeout_at() public factories instead. 

72 # 

73 # Implementation note: `async with timeout()` is preferred 

74 # over `with timeout()`. 

75 # While technically the Timeout class implementation 

76 # doesn't need to be async at all, 

77 # the `async with` statement explicitly points that 

78 # the context manager should be used from async function context. 

79 # 

80 # This design allows to avoid many silly misusages. 

81 # 

82 # TimeoutError is raised immadiatelly when scheduled 

83 # if the deadline is passed. 

84 # The purpose is to time out as sson as possible 

85 # without waiting for the next await expression. 

86 

87 __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler") 

88 

89 def __init__( 

90 self, deadline: Optional[float], loop: asyncio.AbstractEventLoop 

91 ) -> None: 

92 self._loop = loop 

93 self._state = _State.INIT 

94 

95 self._timeout_handler = None # type: Optional[asyncio.Handle] 

96 if deadline is None: 

97 self._deadline = None # type: Optional[float] 

98 else: 

99 self.update(deadline) 

100 

101 def __enter__(self) -> "Timeout": 

102 warnings.warn( 

103 "with timeout() is deprecated, use async with timeout() instead", 

104 DeprecationWarning, 

105 stacklevel=2, 

106 ) 

107 self._do_enter() 

108 return self 

109 

110 def __exit__( 

111 self, 

112 exc_type: Optional[Type[BaseException]], 

113 exc_val: Optional[BaseException], 

114 exc_tb: Optional[TracebackType], 

115 ) -> Optional[bool]: 

116 self._do_exit(exc_type) 

117 return None 

118 

119 async def __aenter__(self) -> "Timeout": 

120 self._do_enter() 

121 return self 

122 

123 async def __aexit__( 

124 self, 

125 exc_type: Optional[Type[BaseException]], 

126 exc_val: Optional[BaseException], 

127 exc_tb: Optional[TracebackType], 

128 ) -> Optional[bool]: 

129 self._do_exit(exc_type) 

130 return None 

131 

132 @property 

133 def expired(self) -> bool: 

134 """Is timeout expired during execution?""" 

135 return self._state == _State.TIMEOUT 

136 

137 @property 

138 def deadline(self) -> Optional[float]: 

139 return self._deadline 

140 

141 def reject(self) -> None: 

142 """Reject scheduled timeout if any.""" 

143 # cancel is maybe better name but 

144 # task.cancel() raises CancelledError in asyncio world. 

145 if self._state not in (_State.INIT, _State.ENTER): 

146 raise RuntimeError(f"invalid state {self._state.value}") 

147 self._reject() 

148 

149 def _reject(self) -> None: 

150 if self._timeout_handler is not None: 

151 self._timeout_handler.cancel() 

152 self._timeout_handler = None 

153 

154 def shift(self, delay: float) -> None: 

155 """Advance timeout on delay seconds. 

156 

157 The delay can be negative. 

158 

159 Raise RuntimeError if shift is called when deadline is not scheduled 

160 """ 

161 deadline = self._deadline 

162 if deadline is None: 

163 raise RuntimeError("cannot shift timeout if deadline is not scheduled") 

164 self.update(deadline + delay) 

165 

166 def update(self, deadline: float) -> None: 

167 """Set deadline to absolute value. 

168 

169 deadline argument points on the time in the same clock system 

170 as loop.time(). 

171 

172 If new deadline is in the past the timeout is raised immediatelly. 

173 

174 Please note: it is not POSIX time but a time with 

175 undefined starting base, e.g. the time of the system power on. 

176 """ 

177 if self._state == _State.EXIT: 

178 raise RuntimeError("cannot reschedule after exit from context manager") 

179 if self._state == _State.TIMEOUT: 

180 raise RuntimeError("cannot reschedule expired timeout") 

181 if self._timeout_handler is not None: 

182 self._timeout_handler.cancel() 

183 self._deadline = deadline 

184 if self._state != _State.INIT: 

185 self._reschedule() 

186 

187 def _reschedule(self) -> None: 

188 assert self._state == _State.ENTER 

189 deadline = self._deadline 

190 if deadline is None: 

191 return 

192 

193 now = self._loop.time() 

194 if self._timeout_handler is not None: 

195 self._timeout_handler.cancel() 

196 

197 task = _current_task(self._loop) 

198 if deadline <= now: 

199 self._timeout_handler = self._loop.call_soon(self._on_timeout, task) 

200 else: 

201 self._timeout_handler = self._loop.call_at(deadline, self._on_timeout, task) 

202 

203 def _do_enter(self) -> None: 

204 if self._state != _State.INIT: 

205 raise RuntimeError(f"invalid state {self._state.value}") 

206 self._state = _State.ENTER 

207 self._reschedule() 

208 

209 def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None: 

210 if exc_type is asyncio.CancelledError and self._state == _State.TIMEOUT: 

211 self._timeout_handler = None 

212 raise asyncio.TimeoutError 

213 # timeout has not expired 

214 self._state = _State.EXIT 

215 self._reject() 

216 return None 

217 

218 def _on_timeout(self, task: "asyncio.Task[None]") -> None: 

219 task.cancel() 

220 self._state = _State.TIMEOUT 

221 # drop the reference early 

222 self._timeout_handler = None 

223 

224 

225if sys.version_info >= (3, 7): 

226 

227 def _current_task(loop: asyncio.AbstractEventLoop) -> "Optional[asyncio.Task[Any]]": 

228 return asyncio.current_task(loop=loop) 

229 

230else: 

231 

232 def _current_task(loop: asyncio.AbstractEventLoop) -> "Optional[asyncio.Task[Any]]": 

233 return asyncio.Task.current_task(loop=loop) 

234 

235 

236if sys.version_info >= (3, 7): 

237 

238 def _get_running_loop() -> asyncio.AbstractEventLoop: 

239 return asyncio.get_running_loop() 

240 

241else: 

242 

243 def _get_running_loop() -> asyncio.AbstractEventLoop: 

244 loop = asyncio.get_event_loop() 

245 if not loop.is_running(): 

246 raise RuntimeError("no running event loop") 

247 return loop