Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_proto.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2from contextlib import suppress
3from typing import Callable, Protocol
5from ._websocket.reader import WebSocketDataQueue
6from .base_protocol import BaseProtocol
7from .client_exceptions import (
8 ClientConnectionError,
9 ClientOSError,
10 ClientPayloadError,
11 ServerDisconnectedError,
12 SocketTimeoutError,
13)
14from .helpers import (
15 _EXC_SENTINEL,
16 DEFAULT_CHUNK_SIZE,
17 EMPTY_BODY_STATUS_CODES,
18 BaseTimerContext,
19 ErrorableProtocol,
20 set_exception,
21 set_result,
22)
23from .http import HttpResponseParser, RawResponseMessage, WebSocketReader
24from .http_exceptions import HttpProcessingError
25from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
class _Payload(ErrorableProtocol, Protocol):
    """Structural type for the payload object a response handler keeps track of.

    On top of whatever ``ErrorableProtocol`` requires, an implementation must
    provide ``is_eof()``.
    """

    def is_eof(self) -> bool:
        """Return whether the payload has reached end-of-stream."""
class ResponseHandler(BaseProtocol, DataQueue[tuple[RawResponseMessage, StreamReader]]):
    """Helper class to adapt between Protocol and StreamReader.

    Bytes delivered through ``data_received()`` are run through an
    ``HttpResponseParser``; every parsed ``(message, payload)`` pair is pushed
    into the inherited ``DataQueue`` with ``feed_data()``. After a custom
    payload parser has been installed with ``set_parser()`` (currently always
    a ``WebSocketReader``), incoming bytes are handed to that parser instead.

    NOTE(review): ``transport``, ``_loop``, ``_parser``, ``_upgraded``,
    ``_reading_paused``, ``_exception`` and ``_buffer`` are used here but not
    assigned in ``__init__``; presumably the base classes provide them.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise both base classes and reset all per-connection state."""
        BaseProtocol.__init__(self, loop=loop, parser=None)
        DataQueue.__init__(self, loop)

        self._should_close = False

        self._payload: _Payload | None = None
        self._skip_payload = False
        self._payload_parser: WebSocketReader | None = None
        self._data_received_cb: Callable[[], None] | None = None

        self._timer = None
        # Bytes received while no parser could consume them; replayed later.
        self._tail = b""

        self._read_timeout: float | None = None
        self._read_timeout_handle: asyncio.TimerHandle | None = None

        self._timeout_ceil_threshold: float | None = 5

        # Created lazily by the ``closed`` property.
        self._closed: None | asyncio.Future[None] = None
        self._connection_lost_called = False

    @property
    def closed(self) -> None | asyncio.Future[None]:
        """Future that is set when the connection is closed.

        This property returns a Future that will be completed when the connection
        is closed. The Future is created lazily on first access to avoid creating
        futures that will never be awaited.

        Returns:
            - A Future[None] if the connection is still open or was closed after
              this property was accessed
            - None if connection_lost() was already called before this property
              was ever accessed (indicating no one is waiting for the closure)
        """
        if self._closed is None and not self._connection_lost_called:
            self._closed = self._loop.create_future()
        return self._closed

    @property
    def upgraded(self) -> bool:
        """Value of the ``upgraded`` flag reported by the last HTTP parse."""
        return self._upgraded

    @property
    def should_close(self) -> bool:
        """Whether the connection must be closed.

        True if any of the following holds: closing was requested or forced,
        the current payload has not reached EOF, the connection was upgraded,
        an exception is stored, a custom payload parser is installed, or
        there is still data in ``_buffer`` or ``_tail``.
        """
        return bool(
            self._should_close
            or (self._payload is not None and not self._payload.is_eof())
            or self._upgraded
            or self._exception is not None
            or self._payload_parser is not None
            or self._buffer
            or self._tail
        )

    def force_close(self) -> None:
        """Flag the connection so that ``should_close`` reports True."""
        self._should_close = True

    def close(self) -> None:
        """Close the transport (if any) and drop the state bound to it."""
        self._shutdown_transport(abort=False)

    def abort(self) -> None:
        """Abort the transport (if any) and drop the state bound to it."""
        self._shutdown_transport(abort=True)

    def _shutdown_transport(self, *, abort: bool) -> None:
        """Shared teardown behind ``close()`` and ``abort()``.

        The stored exception is always cleared. When a transport is attached
        it is aborted (``abort=True``) or closed (``abort=False``), then the
        transport and payload references are dropped and the read timeout is
        cancelled. Without a transport nothing else happens.
        """
        self._exception = None  # Break cyclic references
        transport = self.transport
        if transport is None:
            return
        if abort:
            transport.abort()
        else:
            transport.close()
        self.transport = None
        self._payload = None
        self._drop_timeout()

    def is_connected(self) -> bool:
        """Return True while a transport is attached and is not closing."""
        return self.transport is not None and not self.transport.is_closing()

    def connection_lost(self, exc: BaseException | None) -> None:
        """Handle loss of the connection.

        Resolves the ``closed`` future if one was requested, feeds EOF to the
        installed parsers, and, unless the queue already reached EOF, stores
        an exception describing the disconnect before resetting parser state
        and delegating to the base class.

        Args:
            exc: The error that caused the connection to drop, or ``None``
                when it was closed cleanly.
        """
        self._connection_lost_called = True
        self._drop_timeout()

        original_connection_error = exc
        reraised_exc = original_connection_error

        connection_closed_cleanly = original_connection_error is None

        if self._closed is not None:
            # If someone is waiting for the closed future,
            # we should set it to None or an exception. If
            # self._closed is None, it means that
            # connection_lost() was called already
            # or nobody is waiting for it.
            if connection_closed_cleanly:
                set_result(self._closed, None)
            else:
                assert original_connection_error is not None
                set_exception(
                    self._closed,
                    ClientConnectionError(
                        f"Connection lost: {original_connection_error!s}",
                    ),
                    original_connection_error,
                )

        if self._payload_parser is not None:
            # Deliberately best-effort: errors raised here are discarded.
            with suppress(Exception):  # FIXME: log this somehow?
                self._payload_parser.feed_eof()

        uncompleted = None
        if self._parser is not None:
            try:
                uncompleted = self._parser.feed_eof()
            except Exception as underlying_exc:
                # Report the parser failure on the payload, if there is one.
                if self._payload is not None:
                    client_payload_exc_msg = (
                        f"Response payload is not completed: {underlying_exc!r}"
                    )
                    if not connection_closed_cleanly:
                        client_payload_exc_msg = (
                            f"{client_payload_exc_msg!s}. "
                            f"{original_connection_error!r}"
                        )
                    set_exception(
                        self._payload,
                        ClientPayloadError(client_payload_exc_msg),
                        underlying_exc,
                    )

        if not self.is_eof():
            if isinstance(original_connection_error, OSError):
                reraised_exc = ClientOSError(*original_connection_error.args)
            if connection_closed_cleanly:
                reraised_exc = ServerDisconnectedError(uncompleted)
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            underlying_non_eof_exc = (
                _EXC_SENTINEL
                if connection_closed_cleanly
                else original_connection_error
            )
            assert underlying_non_eof_exc is not None
            assert reraised_exc is not None
            self.set_exception(reraised_exc, underlying_non_eof_exc)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(reraised_exc)

    def eof_received(self) -> None:
        """Cancel the pending read timeout when EOF is received."""
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        """Pause reading and cancel the read timeout."""
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self, resume_parser: bool = True) -> None:
        """Resume reading and re-arm the read timeout.

        Args:
            resume_parser: Passed through unchanged to the base implementation.
        """
        super().resume_reading(resume_parser)
        self._reschedule_timeout()

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store ``exc`` on the queue, marking the connection for closing.

        Also cancels the read timeout before delegating to the base class.
        """
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc, exc_cause)

    def set_parser(
        self,
        parser: WebSocketReader,
        payload: WebSocketDataQueue,
        data_received_cb: Callable[[], None] | None = None,
    ) -> None:
        """Install a custom payload parser and replay any buffered bytes.

        Args:
            parser: Parser that receives every subsequent chunk of data.
            payload: Queue tracked as the current payload.
            data_received_cb: Optional callback invoked by ``data_received()``
                before each chunk is fed to ``parser``.
        """
        self._payload = payload
        self._payload_parser = parser
        self._data_received_cb = data_received_cb

        self._drop_timeout()

        self._replay_tail()

    def set_response_params(
        self,
        *,
        timer: BaseTimerContext | None = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: float | None = None,
        read_bufsize: int = DEFAULT_CHUNK_SIZE,
        timeout_ceil_threshold: float = 5,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        max_headers: int = 128,
    ) -> None:
        """Configure response handling and create a fresh HTTP parser.

        ``skip_payload``, ``read_timeout`` and ``timeout_ceil_threshold`` are
        stored on the handler. ``skip_payload`` is also passed to the new
        ``HttpResponseParser`` (inverted, as ``response_with_body``), and the
        remaining arguments are forwarded to it as given. Bytes buffered in
        ``_tail`` are replayed through ``data_received()`` afterwards.
        """
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout

        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
            max_line_size=max_line_size,
            max_field_size=max_field_size,
            max_headers=max_headers,
        )

        self._replay_tail()

    def _replay_tail(self) -> None:
        """Feed bytes buffered in ``_tail`` back through ``data_received()``."""
        if self._tail:
            # Clear the buffer first so the nested call starts from empty.
            data, self._tail = self._tail, b""
            self.data_received(data)

    def _drop_timeout(self) -> None:
        """Cancel and forget the pending read-timeout handle, if any."""
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()
            self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        """Restart the read-timeout timer.

        Any pending handle is cancelled. A new one is scheduled only when
        ``_read_timeout`` is truthy, so ``None`` and ``0`` both leave the
        timer disabled.
        """
        self._drop_timeout()
        timeout = self._read_timeout
        if timeout:
            self._read_timeout_handle = self._loop.call_later(
                timeout, self._on_read_timeout
            )

    def start_timeout(self) -> None:
        """Arm the read-timeout timer."""
        self._reschedule_timeout()

    @property
    def read_timeout(self) -> float | None:
        """Read timeout used the next time the timer is (re)scheduled."""
        return self._read_timeout

    @read_timeout.setter
    def read_timeout(self, read_timeout: float | None) -> None:
        # Only stores the value; an already scheduled timer is left untouched.
        self._read_timeout = read_timeout

    def _on_read_timeout(self) -> None:
        """Timer callback: fail the queue and the current payload."""
        exc = SocketTimeoutError("Timeout on reading data from socket")
        self.set_exception(exc)
        if self._payload is not None:
            set_exception(self._payload, exc)

    def data_received(self, data: bytes) -> None:
        """Route incoming bytes to whichever parser is active.

        Order of precedence:

        1. A custom payload parser, if installed, receives the data.
        2. If the connection is upgraded or no HTTP parser exists yet, the
           data is appended to ``_tail`` for later replay.
        3. Otherwise the HTTP parser runs and every parsed message is queued.
        """
        # If no data, then we are resuming decompression. We haven't received
        # data from the socket, so we can avoid the reschedule overhead.
        if data:
            self._reschedule_timeout()

        # custom payload parser - currently always WebSocketReader
        if self._payload_parser is not None:
            if self._data_received_cb is not None:
                self._data_received_cb()
            eof, tail = self._payload_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if tail:
                    # The parser is gone now, so the leftover bytes take
                    # one of the other paths below.
                    self.data_received(tail)
            return

        if self._upgraded or self._parser is None:
            # i.e. websocket connection, websocket parser is not set yet
            self._tail += data
            return

        # parse http messages
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except Exception as underlying_exc:
            if self.transport is not None:
                # connection.release() could be called BEFORE
                # data_received(), the transport is already
                # closed in this case
                self.transport.close()
            # should_close is True after the call
            if isinstance(underlying_exc, HttpProcessingError):
                exc = HttpProcessingError(
                    code=underlying_exc.code,
                    message=underlying_exc.message,
                    headers=underlying_exc.headers,
                )
            else:
                exc = HttpProcessingError()
            self.set_exception(exc, underlying_exc)
            return

        self._upgraded = upgraded

        # After the loop ``payload`` refers to the last parsed message's
        # payload, or stays None when nothing was parsed.
        payload: StreamReader | None = None
        for message, payload in messages:
            if message.should_close:
                self._should_close = True

            self._payload = payload

            if self._skip_payload or message.code in EMPTY_BODY_STATUS_CODES:
                self.feed_data((message, EMPTY_PAYLOAD))
            else:
                self.feed_data((message, payload))

        if payload is not None:
            # new message(s) was processed
            # register timeout handler unsubscribing
            # either on end-of-stream or immediately for
            # EMPTY_PAYLOAD
            if payload is not EMPTY_PAYLOAD:
                payload.on_eof(self._drop_timeout)
            else:
                self._drop_timeout()

        if upgraded and tail:
            self.data_received(tail)