Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/async_timeout/__init__.py: 35%
118 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import asyncio
2import enum
3import sys
4import warnings
5from types import TracebackType
6from typing import Any, Optional, Type
9if sys.version_info >= (3, 8):
10 from typing import final
11else:
12 from typing_extensions import final
15__version__ = "4.0.2"
18__all__ = ("timeout", "timeout_at", "Timeout")
21def timeout(delay: Optional[float]) -> "Timeout":
22 """timeout context manager.
24 Useful in cases when you want to apply timeout logic around block
25 of code or in cases when asyncio.wait_for is not suitable. For example:
27 >>> async with timeout(0.001):
28 ... async with aiohttp.get('https://github.com') as r:
29 ... await r.text()
32 delay - value in seconds or None to disable timeout logic
33 """
34 loop = _get_running_loop()
35 if delay is not None:
36 deadline = loop.time() + delay # type: Optional[float]
37 else:
38 deadline = None
39 return Timeout(deadline, loop)
42def timeout_at(deadline: Optional[float]) -> "Timeout":
43 """Schedule the timeout at absolute time.
45 deadline argument points on the time in the same clock system
46 as loop.time().
48 Please note: it is not POSIX time but a time with
49 undefined starting base, e.g. the time of the system power on.
51 >>> async with timeout_at(loop.time() + 10):
52 ... async with aiohttp.get('https://github.com') as r:
53 ... await r.text()
56 """
57 loop = _get_running_loop()
58 return Timeout(deadline, loop)
61class _State(enum.Enum):
62 INIT = "INIT"
63 ENTER = "ENTER"
64 TIMEOUT = "TIMEOUT"
65 EXIT = "EXIT"
68@final
69class Timeout:
70 # Internal class, please don't instantiate it directly
71 # Use timeout() and timeout_at() public factories instead.
72 #
73 # Implementation note: `async with timeout()` is preferred
74 # over `with timeout()`.
75 # While technically the Timeout class implementation
76 # doesn't need to be async at all,
77 # the `async with` statement explicitly points that
78 # the context manager should be used from async function context.
79 #
80 # This design allows to avoid many silly misusages.
81 #
82 # TimeoutError is raised immadiatelly when scheduled
83 # if the deadline is passed.
84 # The purpose is to time out as sson as possible
85 # without waiting for the next await expression.
87 __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler")
89 def __init__(
90 self, deadline: Optional[float], loop: asyncio.AbstractEventLoop
91 ) -> None:
92 self._loop = loop
93 self._state = _State.INIT
95 self._timeout_handler = None # type: Optional[asyncio.Handle]
96 if deadline is None:
97 self._deadline = None # type: Optional[float]
98 else:
99 self.update(deadline)
101 def __enter__(self) -> "Timeout":
102 warnings.warn(
103 "with timeout() is deprecated, use async with timeout() instead",
104 DeprecationWarning,
105 stacklevel=2,
106 )
107 self._do_enter()
108 return self
110 def __exit__(
111 self,
112 exc_type: Optional[Type[BaseException]],
113 exc_val: Optional[BaseException],
114 exc_tb: Optional[TracebackType],
115 ) -> Optional[bool]:
116 self._do_exit(exc_type)
117 return None
119 async def __aenter__(self) -> "Timeout":
120 self._do_enter()
121 return self
123 async def __aexit__(
124 self,
125 exc_type: Optional[Type[BaseException]],
126 exc_val: Optional[BaseException],
127 exc_tb: Optional[TracebackType],
128 ) -> Optional[bool]:
129 self._do_exit(exc_type)
130 return None
132 @property
133 def expired(self) -> bool:
134 """Is timeout expired during execution?"""
135 return self._state == _State.TIMEOUT
137 @property
138 def deadline(self) -> Optional[float]:
139 return self._deadline
141 def reject(self) -> None:
142 """Reject scheduled timeout if any."""
143 # cancel is maybe better name but
144 # task.cancel() raises CancelledError in asyncio world.
145 if self._state not in (_State.INIT, _State.ENTER):
146 raise RuntimeError(f"invalid state {self._state.value}")
147 self._reject()
149 def _reject(self) -> None:
150 if self._timeout_handler is not None:
151 self._timeout_handler.cancel()
152 self._timeout_handler = None
154 def shift(self, delay: float) -> None:
155 """Advance timeout on delay seconds.
157 The delay can be negative.
159 Raise RuntimeError if shift is called when deadline is not scheduled
160 """
161 deadline = self._deadline
162 if deadline is None:
163 raise RuntimeError("cannot shift timeout if deadline is not scheduled")
164 self.update(deadline + delay)
166 def update(self, deadline: float) -> None:
167 """Set deadline to absolute value.
169 deadline argument points on the time in the same clock system
170 as loop.time().
172 If new deadline is in the past the timeout is raised immediatelly.
174 Please note: it is not POSIX time but a time with
175 undefined starting base, e.g. the time of the system power on.
176 """
177 if self._state == _State.EXIT:
178 raise RuntimeError("cannot reschedule after exit from context manager")
179 if self._state == _State.TIMEOUT:
180 raise RuntimeError("cannot reschedule expired timeout")
181 if self._timeout_handler is not None:
182 self._timeout_handler.cancel()
183 self._deadline = deadline
184 if self._state != _State.INIT:
185 self._reschedule()
187 def _reschedule(self) -> None:
188 assert self._state == _State.ENTER
189 deadline = self._deadline
190 if deadline is None:
191 return
193 now = self._loop.time()
194 if self._timeout_handler is not None:
195 self._timeout_handler.cancel()
197 task = _current_task(self._loop)
198 if deadline <= now:
199 self._timeout_handler = self._loop.call_soon(self._on_timeout, task)
200 else:
201 self._timeout_handler = self._loop.call_at(deadline, self._on_timeout, task)
203 def _do_enter(self) -> None:
204 if self._state != _State.INIT:
205 raise RuntimeError(f"invalid state {self._state.value}")
206 self._state = _State.ENTER
207 self._reschedule()
209 def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None:
210 if exc_type is asyncio.CancelledError and self._state == _State.TIMEOUT:
211 self._timeout_handler = None
212 raise asyncio.TimeoutError
213 # timeout has not expired
214 self._state = _State.EXIT
215 self._reject()
216 return None
218 def _on_timeout(self, task: "asyncio.Task[None]") -> None:
219 task.cancel()
220 self._state = _State.TIMEOUT
221 # drop the reference early
222 self._timeout_handler = None
225if sys.version_info >= (3, 7):
227 def _current_task(loop: asyncio.AbstractEventLoop) -> "Optional[asyncio.Task[Any]]":
228 return asyncio.current_task(loop=loop)
230else:
232 def _current_task(loop: asyncio.AbstractEventLoop) -> "Optional[asyncio.Task[Any]]":
233 return asyncio.Task.current_task(loop=loop)
236if sys.version_info >= (3, 7):
238 def _get_running_loop() -> asyncio.AbstractEventLoop:
239 return asyncio.get_running_loop()
241else:
243 def _get_running_loop() -> asyncio.AbstractEventLoop:
244 loop = asyncio.get_event_loop()
245 if not loop.is_running():
246 raise RuntimeError("no running event loop")
247 return loop