Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/async_timeout/__init__.py: 50%
116 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1import asyncio
2import enum
3import sys
4import warnings
5from types import TracebackType
6from typing import Optional, Type
9if sys.version_info >= (3, 8):
10 from typing import final
11else:
12 from typing_extensions import final
if sys.version_info < (3, 11):

    def _uncancel_task(task: "asyncio.Task[object]") -> None:
        """Do nothing; this variant is selected on Python older than 3.11."""

else:

    def _uncancel_task(task: "asyncio.Task[object]") -> None:
        """Call ``task.uncancel()``; this variant is selected on Python 3.11+."""
        task.uncancel()
# Version string of this package.
__version__ = "4.0.3"

# Public names exported by a star import of this module.
__all__ = ("timeout", "timeout_at", "Timeout")
def timeout(delay: Optional[float]) -> "Timeout":
    """Build a timeout context manager that expires ``delay`` seconds from now.

    Handy for putting a time limit around a block of code, or in places
    where asyncio.wait_for is not a good fit. For example:

    >>> async with timeout(0.001):
    ...     async with aiohttp.get('https://github.com') as r:
    ...         await r.text()

    The deadline is computed from the running event loop's clock, so this
    function calls asyncio.get_running_loop() first.

    delay - number of seconds, or None to switch the timeout logic off
    """
    running_loop = asyncio.get_running_loop()
    # A delay of None means "no deadline"; the loop clock is only read
    # when there is an actual delay to add to it.
    deadline: Optional[float] = (
        None if delay is None else running_loop.time() + delay
    )
    return Timeout(deadline, running_loop)
def timeout_at(deadline: Optional[float]) -> "Timeout":
    """Build a timeout context manager that expires at an absolute time.

    ``deadline`` is expressed in the same clock system as loop.time().

    Please note: that is not POSIX time, it is a time with an undefined
    starting point, e.g. the moment the system was powered on.

    >>> async with timeout_at(loop.time() + 10):
    ...     async with aiohttp.get('https://github.com') as r:
    ...         await r.text()

    The running event loop is looked up with asyncio.get_running_loop()
    and handed to the returned Timeout together with ``deadline``.
    """
    running_loop = asyncio.get_running_loop()
    return Timeout(deadline, running_loop)
class _State(enum.Enum):
    """Lifecycle stages tracked by the Timeout context manager."""

    # Freshly constructed, context not entered yet.
    INIT = "INIT"
    # The context has been entered.
    ENTER = "ENTER"
    # The timeout callback has run.
    TIMEOUT = "TIMEOUT"
    # The context has been left without converting to TimeoutError.
    EXIT = "EXIT"
@final
class Timeout:
    """Context manager that cancels the current task once a deadline passes.

    Internal class: please do not instantiate it directly, use the
    timeout() and timeout_at() public factories instead.
    """

    # Implementation note: `async with timeout()` is preferred over
    # `with timeout()`.
    # Nothing in the implementation strictly has to be asynchronous, but
    # spelling it `async with` states explicitly that the context manager
    # belongs inside an async function, which avoids many silly mistakes.
    #
    # If the deadline has already passed at scheduling time, the timeout
    # callback is queued with loop.call_soon(); the point is to time out
    # as soon as possible.

    __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler", "_task")

    def __init__(
        self, deadline: Optional[float], loop: asyncio.AbstractEventLoop
    ) -> None:
        """Store the loop and the optional absolute deadline.

        deadline - absolute time on the loop.time() clock, or None for
        no deadline.
        loop - the event loop used for scheduling the timeout callback.
        """
        self._loop = loop
        self._state = _State.INIT
        self._task: Optional["asyncio.Task[object]"] = None
        self._timeout_handler: Optional[asyncio.Handle] = None
        self._deadline: Optional[float] = None
        if deadline is not None:
            # The state is still INIT here, so update() merely records
            # the deadline; nothing is scheduled before entering.
            self.update(deadline)

    def __enter__(self) -> "Timeout":
        """Enter the context synchronously (deprecated spelling)."""
        warnings.warn(
            "with timeout() is deprecated, use async with timeout() instead",
            DeprecationWarning,
            stacklevel=2,
        )
        self._do_enter()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        """Leave the context; only the exception type is inspected."""
        self._do_exit(exc_type)
        return None

    async def __aenter__(self) -> "Timeout":
        """Enter the context; same work as __enter__ without the warning."""
        self._do_enter()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        """Leave the context; only the exception type is inspected."""
        self._do_exit(exc_type)
        return None

    @property
    def expired(self) -> bool:
        """Is timeout expired during execution?"""
        return self._state is _State.TIMEOUT

    @property
    def deadline(self) -> Optional[float]:
        """Absolute deadline on the loop clock, or None when unset."""
        return self._deadline

    def reject(self) -> None:
        """Reject scheduled timeout if any.

        Raise RuntimeError unless the state is INIT or ENTER.
        """
        # "cancel" would arguably be a nicer name, but in the asyncio
        # world task.cancel() raises CancelledError.
        state = self._state
        if state is not _State.INIT and state is not _State.ENTER:
            raise RuntimeError(f"invalid state {state.value}")
        self._reject()

    def _reject(self) -> None:
        """Forget the task and cancel the pending timeout callback, if any."""
        self._task = None
        handler = self._timeout_handler
        if handler is None:
            return
        handler.cancel()
        self._timeout_handler = None

    def shift(self, delay: float) -> None:
        """Advance timeout on delay seconds.

        The delay can be negative.

        Raise RuntimeError if shift is called when deadline is not scheduled
        """
        current = self._deadline
        if current is None:
            raise RuntimeError("cannot shift timeout if deadline is not scheduled")
        self.update(current + delay)

    def update(self, deadline: float) -> None:
        """Set deadline to absolute value.

        deadline argument points on the time in the same clock system
        as loop.time().

        If new deadline is in the past the timeout is raised immediately.

        Please note: it is not POSIX time but a time with
        undefined starting base, e.g. the time of the system power on.

        Raise RuntimeError when the context has already been exited or
        the timeout has already expired.
        """
        state = self._state
        if state is _State.EXIT:
            raise RuntimeError("cannot reschedule after exit from context manager")
        if state is _State.TIMEOUT:
            raise RuntimeError("cannot reschedule expired timeout")
        handler = self._timeout_handler
        if handler is not None:
            handler.cancel()
        self._deadline = deadline
        # Before the context is entered there is nothing to schedule yet;
        # _do_enter() takes care of it later.
        if state is not _State.INIT:
            self._reschedule()

    def _reschedule(self) -> None:
        """(Re)arm the timeout callback for the stored deadline."""
        assert self._state is _State.ENTER
        when = self._deadline
        if when is None:
            return

        loop = self._loop
        now = loop.time()
        pending = self._timeout_handler
        if pending is not None:
            pending.cancel()

        # The task current at scheduling time is the one _on_timeout()
        # will cancel.
        self._task = asyncio.current_task()
        self._timeout_handler = (
            loop.call_soon(self._on_timeout)
            if when <= now
            else loop.call_at(when, self._on_timeout)
        )

    def _do_enter(self) -> None:
        """Switch from INIT to ENTER and arm the timeout.

        Raise RuntimeError if the state is anything other than INIT.
        """
        state = self._state
        if state is not _State.INIT:
            raise RuntimeError(f"invalid state {state.value}")
        self._state = _State.ENTER
        self._reschedule()

    def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None:
        """Finish the context.

        Raise asyncio.TimeoutError when the block ends with CancelledError
        after the timeout callback has run; otherwise mark the context as
        exited and drop any pending callback.
        """
        if exc_type is not asyncio.CancelledError or self._state is not _State.TIMEOUT:
            # timeout has not expired
            self._state = _State.EXIT
            self._reject()
            return None

        task = self._task
        assert task is not None
        _uncancel_task(task)
        self._timeout_handler = None
        self._task = None
        raise asyncio.TimeoutError

    def _on_timeout(self) -> None:
        """Loop callback: cancel the recorded task and mark the timeout expired."""
        task = self._task
        assert task is not None
        task.cancel()
        self._state = _State.TIMEOUT
        # drop the reference early
        self._timeout_handler = None