Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/async_timeout/__init__.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

131 statements  

1import asyncio 

2import enum 

3import sys 

4from types import TracebackType 

5from typing import Optional, Type, final 

6 

7 

8__version__ = "5.0.1" 

9 

10 

11__all__ = ("timeout", "timeout_at", "Timeout") 

12 

13 

14def timeout(delay: Optional[float]) -> "Timeout": 

15 """timeout context manager. 

16 

17 Useful in cases when you want to apply timeout logic around block 

18 of code or in cases when asyncio.wait_for is not suitable. For example: 

19 

20 >>> async with timeout(0.001): 

21 ... async with aiohttp.get('https://github.com') as r: 

22 ... await r.text() 

23 

24 

25 delay - value in seconds or None to disable timeout logic 

26 """ 

27 loop = asyncio.get_running_loop() 

28 if delay is not None: 

29 deadline = loop.time() + delay # type: Optional[float] 

30 else: 

31 deadline = None 

32 return Timeout(deadline, loop) 

33 

34 

35def timeout_at(deadline: Optional[float]) -> "Timeout": 

36 """Schedule the timeout at absolute time. 

37 

38 deadline argument points on the time in the same clock system 

39 as loop.time(). 

40 

41 Please note: it is not POSIX time but a time with 

42 undefined starting base, e.g. the time of the system power on. 

43 

44 >>> async with timeout_at(loop.time() + 10): 

45 ... async with aiohttp.get('https://github.com') as r: 

46 ... await r.text() 

47 

48 

49 """ 

50 loop = asyncio.get_running_loop() 

51 return Timeout(deadline, loop) 

52 

53 

54class _State(enum.Enum): 

55 INIT = "INIT" 

56 ENTER = "ENTER" 

57 TIMEOUT = "TIMEOUT" 

58 EXIT = "EXIT" 

59 

60 

61if sys.version_info >= (3, 11): 

62 

63 class _Expired: 

64 __slots__ = ("_val",) 

65 

66 def __init__(self, val: bool) -> None: 

67 self._val = val 

68 

69 def __call__(self) -> bool: 

70 return self._val 

71 

72 def __bool__(self) -> bool: 

73 return self._val 

74 

75 def __repr__(self) -> str: 

76 return repr(self._val) 

77 

78 def __str__(self) -> str: 

79 return str(self._val) 

80 

81 @final 

82 class Timeout(asyncio.Timeout): # type: ignore[misc] 

83 # Supports full asyncio.Timeout API. 

84 # Also provides several asyncio_timeout specific methods 

85 # for backward compatibility. 

86 def __init__( 

87 self, deadline: Optional[float], loop: asyncio.AbstractEventLoop 

88 ) -> None: 

89 super().__init__(deadline) 

90 

91 @property 

92 def expired(self) -> _Expired: 

93 # a hacky property hat can provide both roles: 

94 # timeout.expired() from asyncio 

95 # timeout.expired from asyncio_timeout 

96 return _Expired(super().expired()) 

97 

98 @property 

99 def deadline(self) -> Optional[float]: 

100 return self.when() 

101 

102 def reject(self) -> None: 

103 """Reject scheduled timeout if any.""" 

104 # cancel is maybe better name but 

105 # task.cancel() raises CancelledError in asyncio world. 

106 self.reschedule(None) 

107 

108 def shift(self, delay: float) -> None: 

109 """Advance timeout on delay seconds. 

110 

111 The delay can be negative. 

112 

113 Raise RuntimeError if shift is called when deadline is not scheduled 

114 """ 

115 deadline = self.when() 

116 if deadline is None: 

117 raise RuntimeError("cannot shift timeout if deadline is not scheduled") 

118 self.reschedule(deadline + delay) 

119 

120 def update(self, deadline: float) -> None: 

121 """Set deadline to absolute value. 

122 

123 deadline argument points on the time in the same clock system 

124 as loop.time(). 

125 

126 If new deadline is in the past the timeout is raised immediately. 

127 

128 Please note: it is not POSIX time but a time with 

129 undefined starting base, e.g. the time of the system power on. 

130 """ 

131 self.reschedule(deadline) 

132 

133else: 

134 

135 @final 

136 class Timeout: 

137 # Internal class, please don't instantiate it directly 

138 # Use timeout() and timeout_at() public factories instead. 

139 # 

140 # Implementation note: `async with timeout()` is preferred 

141 # over `with timeout()`. 

142 # While technically the Timeout class implementation 

143 # doesn't need to be async at all, 

144 # the `async with` statement explicitly points that 

145 # the context manager should be used from async function context. 

146 # 

147 # This design allows to avoid many silly misusages. 

148 # 

149 # TimeoutError is raised immediately when scheduled 

150 # if the deadline is passed. 

151 # The purpose is to time out as soon as possible 

152 # without waiting for the next await expression. 

153 

154 __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler", "_task") 

155 

156 def __init__( 

157 self, deadline: Optional[float], loop: asyncio.AbstractEventLoop 

158 ) -> None: 

159 self._loop = loop 

160 self._state = _State.INIT 

161 

162 self._task: Optional["asyncio.Task[object]"] = None 

163 self._timeout_handler = None # type: Optional[asyncio.Handle] 

164 if deadline is None: 

165 self._deadline = None # type: Optional[float] 

166 else: 

167 self.update(deadline) 

168 

169 async def __aenter__(self) -> "Timeout": 

170 self._do_enter() 

171 return self 

172 

173 async def __aexit__( 

174 self, 

175 exc_type: Optional[Type[BaseException]], 

176 exc_val: Optional[BaseException], 

177 exc_tb: Optional[TracebackType], 

178 ) -> Optional[bool]: 

179 self._do_exit(exc_type) 

180 return None 

181 

182 @property 

183 def expired(self) -> bool: 

184 """Is timeout expired during execution?""" 

185 return self._state == _State.TIMEOUT 

186 

187 @property 

188 def deadline(self) -> Optional[float]: 

189 return self._deadline 

190 

191 def reject(self) -> None: 

192 """Reject scheduled timeout if any.""" 

193 # cancel is maybe better name but 

194 # task.cancel() raises CancelledError in asyncio world. 

195 if self._state not in (_State.INIT, _State.ENTER): 

196 raise RuntimeError(f"invalid state {self._state.value}") 

197 self._reject() 

198 

199 def _reject(self) -> None: 

200 self._task = None 

201 if self._timeout_handler is not None: 

202 self._timeout_handler.cancel() 

203 self._timeout_handler = None 

204 

205 def shift(self, delay: float) -> None: 

206 """Advance timeout on delay seconds. 

207 

208 The delay can be negative. 

209 

210 Raise RuntimeError if shift is called when deadline is not scheduled 

211 """ 

212 deadline = self._deadline 

213 if deadline is None: 

214 raise RuntimeError("cannot shift timeout if deadline is not scheduled") 

215 self.update(deadline + delay) 

216 

217 def update(self, deadline: float) -> None: 

218 """Set deadline to absolute value. 

219 

220 deadline argument points on the time in the same clock system 

221 as loop.time(). 

222 

223 If new deadline is in the past the timeout is raised immediately. 

224 

225 Please note: it is not POSIX time but a time with 

226 undefined starting base, e.g. the time of the system power on. 

227 """ 

228 if self._state == _State.EXIT: 

229 raise RuntimeError("cannot reschedule after exit from context manager") 

230 if self._state == _State.TIMEOUT: 

231 raise RuntimeError("cannot reschedule expired timeout") 

232 if self._timeout_handler is not None: 

233 self._timeout_handler.cancel() 

234 self._deadline = deadline 

235 if self._state != _State.INIT: 

236 self._reschedule() 

237 

238 def _reschedule(self) -> None: 

239 assert self._state == _State.ENTER 

240 deadline = self._deadline 

241 if deadline is None: 

242 return 

243 

244 now = self._loop.time() 

245 if self._timeout_handler is not None: 

246 self._timeout_handler.cancel() 

247 

248 self._task = asyncio.current_task() 

249 if deadline <= now: 

250 self._timeout_handler = self._loop.call_soon(self._on_timeout) 

251 else: 

252 self._timeout_handler = self._loop.call_at(deadline, self._on_timeout) 

253 

254 def _do_enter(self) -> None: 

255 if self._state != _State.INIT: 

256 raise RuntimeError(f"invalid state {self._state.value}") 

257 self._state = _State.ENTER 

258 self._reschedule() 

259 

260 def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None: 

261 if exc_type is asyncio.CancelledError and self._state == _State.TIMEOUT: 

262 assert self._task is not None 

263 self._timeout_handler = None 

264 self._task = None 

265 raise asyncio.TimeoutError 

266 # timeout has not expired 

267 self._state = _State.EXIT 

268 self._reject() 

269 return None 

270 

271 def _on_timeout(self) -> None: 

272 assert self._task is not None 

273 self._task.cancel() 

274 self._state = _State.TIMEOUT 

275 # drop the reference early 

276 self._timeout_handler = None