Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/client_proto.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2from contextlib import suppress
3from typing import Any, Callable
5from .base_protocol import BaseProtocol
6from .client_exceptions import (
7 ClientConnectionError,
8 ClientOSError,
9 ClientPayloadError,
10 ServerDisconnectedError,
11 SocketTimeoutError,
12)
13from .helpers import (
14 _EXC_SENTINEL,
15 DEFAULT_CHUNK_SIZE,
16 EMPTY_BODY_STATUS_CODES,
17 BaseTimerContext,
18 set_exception,
19 set_result,
20)
21from .http import HttpResponseParser, RawResponseMessage
22from .http_exceptions import HttpProcessingError
23from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
26class ResponseHandler(BaseProtocol, DataQueue[tuple[RawResponseMessage, StreamReader]]):
27 """Helper class to adapt between Protocol and StreamReader."""
29 def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
30 BaseProtocol.__init__(self, loop=loop, parser=None)
31 DataQueue.__init__(self, loop)
33 self._should_close = False
35 self._payload: StreamReader | None = None
36 self._skip_payload = False
37 self._payload_parser = None
38 self._data_received_cb: Callable[[], None] | None = None
40 self._timer = None
41 self._tail = b""
43 self._read_timeout: float | None = None
44 self._read_timeout_handle: asyncio.TimerHandle | None = None
46 self._timeout_ceil_threshold: float | None = 5
48 self._closed: None | asyncio.Future[None] = None
49 self._connection_lost_called = False
51 @property
52 def closed(self) -> None | asyncio.Future[None]:
53 """Future that is set when the connection is closed.
55 This property returns a Future that will be completed when the connection
56 is closed. The Future is created lazily on first access to avoid creating
57 futures that will never be awaited.
59 Returns:
60 - A Future[None] if the connection is still open or was closed after
61 this property was accessed
62 - None if connection_lost() was already called before this property
63 was ever accessed (indicating no one is waiting for the closure)
64 """
65 if self._closed is None and not self._connection_lost_called:
66 self._closed = self._loop.create_future()
67 return self._closed
69 @property
70 def upgraded(self) -> bool:
71 return self._upgraded
73 @property
74 def should_close(self) -> bool:
75 return bool(
76 self._should_close
77 or (self._payload is not None and not self._payload.is_eof())
78 or self._upgraded
79 or self._exception is not None
80 or self._payload_parser is not None
81 or self._buffer
82 or self._tail
83 )
85 def force_close(self) -> None:
86 self._should_close = True
88 def close(self) -> None:
89 self._exception = None # Break cyclic references
90 transport = self.transport
91 if transport is not None:
92 transport.close()
93 self.transport = None
94 self._payload = None
95 self._drop_timeout()
97 def abort(self) -> None:
98 self._exception = None # Break cyclic references
99 transport = self.transport
100 if transport is not None:
101 transport.abort()
102 self.transport = None
103 self._payload = None
104 self._drop_timeout()
106 def is_connected(self) -> bool:
107 return self.transport is not None and not self.transport.is_closing()
109 def connection_lost(self, exc: BaseException | None) -> None:
110 self._connection_lost_called = True
111 self._drop_timeout()
113 original_connection_error = exc
114 reraised_exc = original_connection_error
116 connection_closed_cleanly = original_connection_error is None
118 if self._closed is not None:
119 # If someone is waiting for the closed future,
120 # we should set it to None or an exception. If
121 # self._closed is None, it means that
122 # connection_lost() was called already
123 # or nobody is waiting for it.
124 if connection_closed_cleanly:
125 set_result(self._closed, None)
126 else:
127 assert original_connection_error is not None
128 set_exception(
129 self._closed,
130 ClientConnectionError(
131 f"Connection lost: {original_connection_error !s}",
132 ),
133 original_connection_error,
134 )
136 if self._payload_parser is not None:
137 with suppress(Exception): # FIXME: log this somehow?
138 self._payload_parser.feed_eof()
140 uncompleted = None
141 if self._parser is not None:
142 try:
143 uncompleted = self._parser.feed_eof()
144 except Exception as underlying_exc:
145 if self._payload is not None:
146 client_payload_exc_msg = (
147 f"Response payload is not completed: {underlying_exc !r}"
148 )
149 if not connection_closed_cleanly:
150 client_payload_exc_msg = (
151 f"{client_payload_exc_msg !s}. "
152 f"{original_connection_error !r}"
153 )
154 set_exception(
155 self._payload,
156 ClientPayloadError(client_payload_exc_msg),
157 underlying_exc,
158 )
160 if not self.is_eof():
161 if isinstance(original_connection_error, OSError):
162 reraised_exc = ClientOSError(*original_connection_error.args)
163 if connection_closed_cleanly:
164 reraised_exc = ServerDisconnectedError(uncompleted)
165 # assigns self._should_close to True as side effect,
166 # we do it anyway below
167 underlying_non_eof_exc = (
168 _EXC_SENTINEL
169 if connection_closed_cleanly
170 else original_connection_error
171 )
172 assert underlying_non_eof_exc is not None
173 assert reraised_exc is not None
174 self.set_exception(reraised_exc, underlying_non_eof_exc)
176 self._should_close = True
177 self._parser = None
178 self._payload = None
179 self._payload_parser = None
180 self._reading_paused = False
182 super().connection_lost(reraised_exc)
184 def eof_received(self) -> None:
185 # should call parser.feed_eof() most likely
186 self._drop_timeout()
188 def pause_reading(self) -> None:
189 super().pause_reading()
190 self._drop_timeout()
192 def resume_reading(self, resume_parser: bool = True) -> None:
193 super().resume_reading(resume_parser)
194 self._reschedule_timeout()
196 def set_exception(
197 self,
198 exc: BaseException,
199 exc_cause: BaseException = _EXC_SENTINEL,
200 ) -> None:
201 self._should_close = True
202 self._drop_timeout()
203 super().set_exception(exc, exc_cause)
205 def set_parser(
206 self,
207 parser: Any,
208 payload: Any,
209 data_received_cb: Callable[[], None] | None = None,
210 ) -> None:
211 # TODO: actual types are:
212 # parser: WebSocketReader
213 # payload: WebSocketDataQueue
214 # but they are not generi enough
215 # Need an ABC for both types
216 self._payload = payload
217 self._payload_parser = parser
218 self._data_received_cb = data_received_cb
220 self._drop_timeout()
222 if self._tail:
223 data, self._tail = self._tail, b""
224 self.data_received(data)
226 def set_response_params(
227 self,
228 *,
229 timer: BaseTimerContext | None = None,
230 skip_payload: bool = False,
231 read_until_eof: bool = False,
232 auto_decompress: bool = True,
233 read_timeout: float | None = None,
234 read_bufsize: int = DEFAULT_CHUNK_SIZE,
235 timeout_ceil_threshold: float = 5,
236 max_line_size: int = 8190,
237 max_field_size: int = 8190,
238 max_headers: int = 128,
239 ) -> None:
240 self._skip_payload = skip_payload
242 self._read_timeout = read_timeout
244 self._timeout_ceil_threshold = timeout_ceil_threshold
246 self._parser = HttpResponseParser(
247 self,
248 self._loop,
249 read_bufsize,
250 timer=timer,
251 payload_exception=ClientPayloadError,
252 response_with_body=not skip_payload,
253 read_until_eof=read_until_eof,
254 auto_decompress=auto_decompress,
255 max_line_size=max_line_size,
256 max_field_size=max_field_size,
257 max_headers=max_headers,
258 )
260 if self._tail:
261 data, self._tail = self._tail, b""
262 self.data_received(data)
264 def _drop_timeout(self) -> None:
265 if self._read_timeout_handle is not None:
266 self._read_timeout_handle.cancel()
267 self._read_timeout_handle = None
269 def _reschedule_timeout(self) -> None:
270 timeout = self._read_timeout
271 if self._read_timeout_handle is not None:
272 self._read_timeout_handle.cancel()
274 if timeout:
275 self._read_timeout_handle = self._loop.call_later(
276 timeout, self._on_read_timeout
277 )
278 else:
279 self._read_timeout_handle = None
281 def start_timeout(self) -> None:
282 self._reschedule_timeout()
284 @property
285 def read_timeout(self) -> float | None:
286 return self._read_timeout
288 @read_timeout.setter
289 def read_timeout(self, read_timeout: float | None) -> None:
290 self._read_timeout = read_timeout
292 def _on_read_timeout(self) -> None:
293 exc = SocketTimeoutError("Timeout on reading data from socket")
294 self.set_exception(exc)
295 if self._payload is not None:
296 set_exception(self._payload, exc)
298 def data_received(self, data: bytes) -> None:
299 # If no data, then we are resuming decompression. We haven't received
300 # data from the socket, so we can avoid the reschedule overhead.
301 if data:
302 self._reschedule_timeout()
304 # custom payload parser - currently always WebSocketReader
305 if self._payload_parser is not None:
306 if self._data_received_cb is not None:
307 self._data_received_cb()
308 eof, tail = self._payload_parser.feed_data(data)
309 if eof:
310 self._payload = None
311 self._payload_parser = None
313 if tail:
314 self.data_received(tail)
315 return
317 if self._upgraded or self._parser is None:
318 # i.e. websocket connection, websocket parser is not set yet
319 self._tail += data
320 return
322 # parse http messages
323 try:
324 messages, upgraded, tail = self._parser.feed_data(data)
325 except BaseException as underlying_exc:
326 if self.transport is not None:
327 # connection.release() could be called BEFORE
328 # data_received(), the transport is already
329 # closed in this case
330 self.transport.close()
331 if not isinstance(underlying_exc, Exception):
332 raise
333 # should_close is True after the call
334 if isinstance(underlying_exc, HttpProcessingError):
335 exc = HttpProcessingError(
336 code=underlying_exc.code,
337 message=underlying_exc.message,
338 headers=underlying_exc.headers,
339 )
340 else:
341 exc = HttpProcessingError()
342 self.set_exception(exc, underlying_exc)
343 return
345 self._upgraded = upgraded
347 payload: StreamReader | None = None
348 for message, payload in messages:
349 if message.should_close:
350 self._should_close = True
352 self._payload = payload
354 if self._skip_payload or message.code in EMPTY_BODY_STATUS_CODES:
355 self.feed_data((message, EMPTY_PAYLOAD), 0)
356 else:
357 self.feed_data((message, payload), 0)
359 if payload is not None:
360 # new message(s) was processed
361 # register timeout handler unsubscribing
362 # either on end-of-stream or immediately for
363 # EMPTY_PAYLOAD
364 if payload is not EMPTY_PAYLOAD:
365 payload.on_eof(self._drop_timeout)
366 else:
367 self._drop_timeout()
369 if upgraded and tail:
370 self.data_received(tail)