1import asyncio
2import enum
3import sys
4from types import TracebackType
5from typing import Optional, Type, final
6
7
8__version__ = "5.0.1"
9
10
11__all__ = ("timeout", "timeout_at", "Timeout")
12
13
14def timeout(delay: Optional[float]) -> "Timeout":
15 """timeout context manager.
16
17 Useful in cases when you want to apply timeout logic around block
18 of code or in cases when asyncio.wait_for is not suitable. For example:
19
20 >>> async with timeout(0.001):
21 ... async with aiohttp.get('https://github.com') as r:
22 ... await r.text()
23
24
25 delay - value in seconds or None to disable timeout logic
26 """
27 loop = asyncio.get_running_loop()
28 if delay is not None:
29 deadline = loop.time() + delay # type: Optional[float]
30 else:
31 deadline = None
32 return Timeout(deadline, loop)
33
34
35def timeout_at(deadline: Optional[float]) -> "Timeout":
36 """Schedule the timeout at absolute time.
37
38 deadline argument points on the time in the same clock system
39 as loop.time().
40
41 Please note: it is not POSIX time but a time with
42 undefined starting base, e.g. the time of the system power on.
43
44 >>> async with timeout_at(loop.time() + 10):
45 ... async with aiohttp.get('https://github.com') as r:
46 ... await r.text()
47
48
49 """
50 loop = asyncio.get_running_loop()
51 return Timeout(deadline, loop)
52
53
54class _State(enum.Enum):
55 INIT = "INIT"
56 ENTER = "ENTER"
57 TIMEOUT = "TIMEOUT"
58 EXIT = "EXIT"
59
60
61if sys.version_info >= (3, 11):
62
63 class _Expired:
64 __slots__ = ("_val",)
65
66 def __init__(self, val: bool) -> None:
67 self._val = val
68
69 def __call__(self) -> bool:
70 return self._val
71
72 def __bool__(self) -> bool:
73 return self._val
74
75 def __repr__(self) -> str:
76 return repr(self._val)
77
78 def __str__(self) -> str:
79 return str(self._val)
80
81 @final
82 class Timeout(asyncio.Timeout): # type: ignore[misc]
83 # Supports full asyncio.Timeout API.
84 # Also provides several asyncio_timeout specific methods
85 # for backward compatibility.
86 def __init__(
87 self, deadline: Optional[float], loop: asyncio.AbstractEventLoop
88 ) -> None:
89 super().__init__(deadline)
90
91 @property
92 def expired(self) -> _Expired:
93 # a hacky property hat can provide both roles:
94 # timeout.expired() from asyncio
95 # timeout.expired from asyncio_timeout
96 return _Expired(super().expired())
97
98 @property
99 def deadline(self) -> Optional[float]:
100 return self.when()
101
102 def reject(self) -> None:
103 """Reject scheduled timeout if any."""
104 # cancel is maybe better name but
105 # task.cancel() raises CancelledError in asyncio world.
106 self.reschedule(None)
107
108 def shift(self, delay: float) -> None:
109 """Advance timeout on delay seconds.
110
111 The delay can be negative.
112
113 Raise RuntimeError if shift is called when deadline is not scheduled
114 """
115 deadline = self.when()
116 if deadline is None:
117 raise RuntimeError("cannot shift timeout if deadline is not scheduled")
118 self.reschedule(deadline + delay)
119
120 def update(self, deadline: float) -> None:
121 """Set deadline to absolute value.
122
123 deadline argument points on the time in the same clock system
124 as loop.time().
125
126 If new deadline is in the past the timeout is raised immediately.
127
128 Please note: it is not POSIX time but a time with
129 undefined starting base, e.g. the time of the system power on.
130 """
131 self.reschedule(deadline)
132
133else:
134
135 @final
136 class Timeout:
137 # Internal class, please don't instantiate it directly
138 # Use timeout() and timeout_at() public factories instead.
139 #
140 # Implementation note: `async with timeout()` is preferred
141 # over `with timeout()`.
142 # While technically the Timeout class implementation
143 # doesn't need to be async at all,
144 # the `async with` statement explicitly points that
145 # the context manager should be used from async function context.
146 #
147 # This design allows to avoid many silly misusages.
148 #
149 # TimeoutError is raised immediately when scheduled
150 # if the deadline is passed.
151 # The purpose is to time out as soon as possible
152 # without waiting for the next await expression.
153
154 __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler", "_task")
155
156 def __init__(
157 self, deadline: Optional[float], loop: asyncio.AbstractEventLoop
158 ) -> None:
159 self._loop = loop
160 self._state = _State.INIT
161
162 self._task: Optional["asyncio.Task[object]"] = None
163 self._timeout_handler = None # type: Optional[asyncio.Handle]
164 if deadline is None:
165 self._deadline = None # type: Optional[float]
166 else:
167 self.update(deadline)
168
169 async def __aenter__(self) -> "Timeout":
170 self._do_enter()
171 return self
172
173 async def __aexit__(
174 self,
175 exc_type: Optional[Type[BaseException]],
176 exc_val: Optional[BaseException],
177 exc_tb: Optional[TracebackType],
178 ) -> Optional[bool]:
179 self._do_exit(exc_type)
180 return None
181
182 @property
183 def expired(self) -> bool:
184 """Is timeout expired during execution?"""
185 return self._state == _State.TIMEOUT
186
187 @property
188 def deadline(self) -> Optional[float]:
189 return self._deadline
190
191 def reject(self) -> None:
192 """Reject scheduled timeout if any."""
193 # cancel is maybe better name but
194 # task.cancel() raises CancelledError in asyncio world.
195 if self._state not in (_State.INIT, _State.ENTER):
196 raise RuntimeError(f"invalid state {self._state.value}")
197 self._reject()
198
199 def _reject(self) -> None:
200 self._task = None
201 if self._timeout_handler is not None:
202 self._timeout_handler.cancel()
203 self._timeout_handler = None
204
205 def shift(self, delay: float) -> None:
206 """Advance timeout on delay seconds.
207
208 The delay can be negative.
209
210 Raise RuntimeError if shift is called when deadline is not scheduled
211 """
212 deadline = self._deadline
213 if deadline is None:
214 raise RuntimeError("cannot shift timeout if deadline is not scheduled")
215 self.update(deadline + delay)
216
217 def update(self, deadline: float) -> None:
218 """Set deadline to absolute value.
219
220 deadline argument points on the time in the same clock system
221 as loop.time().
222
223 If new deadline is in the past the timeout is raised immediately.
224
225 Please note: it is not POSIX time but a time with
226 undefined starting base, e.g. the time of the system power on.
227 """
228 if self._state == _State.EXIT:
229 raise RuntimeError("cannot reschedule after exit from context manager")
230 if self._state == _State.TIMEOUT:
231 raise RuntimeError("cannot reschedule expired timeout")
232 if self._timeout_handler is not None:
233 self._timeout_handler.cancel()
234 self._deadline = deadline
235 if self._state != _State.INIT:
236 self._reschedule()
237
238 def _reschedule(self) -> None:
239 assert self._state == _State.ENTER
240 deadline = self._deadline
241 if deadline is None:
242 return
243
244 now = self._loop.time()
245 if self._timeout_handler is not None:
246 self._timeout_handler.cancel()
247
248 self._task = asyncio.current_task()
249 if deadline <= now:
250 self._timeout_handler = self._loop.call_soon(self._on_timeout)
251 else:
252 self._timeout_handler = self._loop.call_at(deadline, self._on_timeout)
253
254 def _do_enter(self) -> None:
255 if self._state != _State.INIT:
256 raise RuntimeError(f"invalid state {self._state.value}")
257 self._state = _State.ENTER
258 self._reschedule()
259
260 def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None:
261 if exc_type is asyncio.CancelledError and self._state == _State.TIMEOUT:
262 assert self._task is not None
263 self._timeout_handler = None
264 self._task = None
265 raise asyncio.TimeoutError
266 # timeout has not expired
267 self._state = _State.EXIT
268 self._reject()
269 return None
270
271 def _on_timeout(self) -> None:
272 assert self._task is not None
273 self._task.cancel()
274 self._state = _State.TIMEOUT
275 # drop the reference early
276 self._timeout_handler = None