Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_proto.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2from contextlib import suppress
3from typing import Any, Optional, Tuple, Type, Union
5from .base_protocol import BaseProtocol
6from .client_exceptions import (
7 ClientConnectionError,
8 ClientOSError,
9 ClientPayloadError,
10 ServerDisconnectedError,
11 SocketTimeoutError,
12)
13from .helpers import (
14 _EXC_SENTINEL,
15 EMPTY_BODY_STATUS_CODES,
16 BaseTimerContext,
17 set_exception,
18 set_result,
19)
20from .http import HttpResponseParser, RawResponseMessage, WebSocketReader
21from .http_exceptions import HttpProcessingError
22from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
25class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
26 """Helper class to adapt between Protocol and StreamReader."""
28 def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
29 BaseProtocol.__init__(self, loop=loop)
30 DataQueue.__init__(self, loop)
32 self._should_close = False
34 self._payload: Optional[StreamReader] = None
35 self._skip_payload = False
36 self._payload_parser: Optional[WebSocketReader] = None
38 self._timer = None
40 self._tail = b""
41 self._upgraded = False
42 self._parser: Optional[HttpResponseParser] = None
44 self._read_timeout: Optional[float] = None
45 self._read_timeout_handle: Optional[asyncio.TimerHandle] = None
47 self._timeout_ceil_threshold: Optional[float] = 5
49 self.closed: asyncio.Future[None] = self._loop.create_future()
51 @property
52 def upgraded(self) -> bool:
53 return self._upgraded
55 @property
56 def should_close(self) -> bool:
57 return bool(
58 self._should_close
59 or (self._payload is not None and not self._payload.is_eof())
60 or self._upgraded
61 or self._exception is not None
62 or self._payload_parser is not None
63 or self._buffer
64 or self._tail
65 )
67 def force_close(self) -> None:
68 self._should_close = True
70 def close(self) -> None:
71 self._exception = None # Break cyclic references
72 transport = self.transport
73 if transport is not None:
74 transport.close()
75 self.transport = None
76 self._payload = None
77 self._drop_timeout()
79 def is_connected(self) -> bool:
80 return self.transport is not None and not self.transport.is_closing()
82 def connection_lost(self, exc: Optional[BaseException]) -> None:
83 self._drop_timeout()
85 original_connection_error = exc
86 reraised_exc = original_connection_error
88 connection_closed_cleanly = original_connection_error is None
90 if connection_closed_cleanly:
91 set_result(self.closed, None)
92 else:
93 assert original_connection_error is not None
94 set_exception(
95 self.closed,
96 ClientConnectionError(
97 f"Connection lost: {original_connection_error !s}",
98 ),
99 original_connection_error,
100 )
102 if self._payload_parser is not None:
103 with suppress(Exception): # FIXME: log this somehow?
104 self._payload_parser.feed_eof()
106 uncompleted = None
107 if self._parser is not None:
108 try:
109 uncompleted = self._parser.feed_eof()
110 except Exception as underlying_exc:
111 if self._payload is not None:
112 client_payload_exc_msg = (
113 f"Response payload is not completed: {underlying_exc !r}"
114 )
115 if not connection_closed_cleanly:
116 client_payload_exc_msg = (
117 f"{client_payload_exc_msg !s}. "
118 f"{original_connection_error !r}"
119 )
120 set_exception(
121 self._payload,
122 ClientPayloadError(client_payload_exc_msg),
123 underlying_exc,
124 )
126 if not self.is_eof():
127 if isinstance(original_connection_error, OSError):
128 reraised_exc = ClientOSError(*original_connection_error.args)
129 if connection_closed_cleanly:
130 reraised_exc = ServerDisconnectedError(uncompleted)
131 # assigns self._should_close to True as side effect,
132 # we do it anyway below
133 underlying_non_eof_exc = (
134 _EXC_SENTINEL
135 if connection_closed_cleanly
136 else original_connection_error
137 )
138 assert underlying_non_eof_exc is not None
139 assert reraised_exc is not None
140 self.set_exception(reraised_exc, underlying_non_eof_exc)
142 self._should_close = True
143 self._parser = None
144 self._payload = None
145 self._payload_parser = None
146 self._reading_paused = False
148 super().connection_lost(reraised_exc)
150 def eof_received(self) -> None:
151 # should call parser.feed_eof() most likely
152 self._drop_timeout()
154 def pause_reading(self) -> None:
155 super().pause_reading()
156 self._drop_timeout()
158 def resume_reading(self) -> None:
159 super().resume_reading()
160 self._reschedule_timeout()
162 def set_exception(
163 self,
164 exc: Union[Type[BaseException], BaseException],
165 exc_cause: BaseException = _EXC_SENTINEL,
166 ) -> None:
167 self._should_close = True
168 self._drop_timeout()
169 super().set_exception(exc, exc_cause)
171 def set_parser(self, parser: Any, payload: Any) -> None:
172 # TODO: actual types are:
173 # parser: WebSocketReader
174 # payload: WebSocketDataQueue
175 # but they are not generi enough
176 # Need an ABC for both types
177 self._payload = payload
178 self._payload_parser = parser
180 self._drop_timeout()
182 if self._tail:
183 data, self._tail = self._tail, b""
184 self.data_received(data)
186 def set_response_params(
187 self,
188 *,
189 timer: Optional[BaseTimerContext] = None,
190 skip_payload: bool = False,
191 read_until_eof: bool = False,
192 auto_decompress: bool = True,
193 read_timeout: Optional[float] = None,
194 read_bufsize: int = 2**16,
195 timeout_ceil_threshold: float = 5,
196 max_line_size: int = 8190,
197 max_field_size: int = 8190,
198 ) -> None:
199 self._skip_payload = skip_payload
201 self._read_timeout = read_timeout
203 self._timeout_ceil_threshold = timeout_ceil_threshold
205 self._parser = HttpResponseParser(
206 self,
207 self._loop,
208 read_bufsize,
209 timer=timer,
210 payload_exception=ClientPayloadError,
211 response_with_body=not skip_payload,
212 read_until_eof=read_until_eof,
213 auto_decompress=auto_decompress,
214 max_line_size=max_line_size,
215 max_field_size=max_field_size,
216 )
218 if self._tail:
219 data, self._tail = self._tail, b""
220 self.data_received(data)
222 def _drop_timeout(self) -> None:
223 if self._read_timeout_handle is not None:
224 self._read_timeout_handle.cancel()
225 self._read_timeout_handle = None
227 def _reschedule_timeout(self) -> None:
228 timeout = self._read_timeout
229 if self._read_timeout_handle is not None:
230 self._read_timeout_handle.cancel()
232 if timeout:
233 self._read_timeout_handle = self._loop.call_later(
234 timeout, self._on_read_timeout
235 )
236 else:
237 self._read_timeout_handle = None
239 def start_timeout(self) -> None:
240 self._reschedule_timeout()
242 @property
243 def read_timeout(self) -> Optional[float]:
244 return self._read_timeout
246 @read_timeout.setter
247 def read_timeout(self, read_timeout: Optional[float]) -> None:
248 self._read_timeout = read_timeout
250 def _on_read_timeout(self) -> None:
251 exc = SocketTimeoutError("Timeout on reading data from socket")
252 self.set_exception(exc)
253 if self._payload is not None:
254 set_exception(self._payload, exc)
256 def data_received(self, data: bytes) -> None:
257 self._reschedule_timeout()
259 if not data:
260 return
262 # custom payload parser - currently always WebSocketReader
263 if self._payload_parser is not None:
264 eof, tail = self._payload_parser.feed_data(data)
265 if eof:
266 self._payload = None
267 self._payload_parser = None
269 if tail:
270 self.data_received(tail)
271 return
273 if self._upgraded or self._parser is None:
274 # i.e. websocket connection, websocket parser is not set yet
275 self._tail += data
276 return
278 # parse http messages
279 try:
280 messages, upgraded, tail = self._parser.feed_data(data)
281 except BaseException as underlying_exc:
282 if self.transport is not None:
283 # connection.release() could be called BEFORE
284 # data_received(), the transport is already
285 # closed in this case
286 self.transport.close()
287 # should_close is True after the call
288 if isinstance(underlying_exc, HttpProcessingError):
289 exc = HttpProcessingError(
290 code=underlying_exc.code,
291 message=underlying_exc.message,
292 headers=underlying_exc.headers,
293 )
294 else:
295 exc = HttpProcessingError()
296 self.set_exception(exc, underlying_exc)
297 return
299 self._upgraded = upgraded
301 payload: Optional[StreamReader] = None
302 for message, payload in messages:
303 if message.should_close:
304 self._should_close = True
306 self._payload = payload
308 if self._skip_payload or message.code in EMPTY_BODY_STATUS_CODES:
309 self.feed_data((message, EMPTY_PAYLOAD))
310 else:
311 self.feed_data((message, payload))
313 if payload is not None:
314 # new message(s) was processed
315 # register timeout handler unsubscribing
316 # either on end-of-stream or immediately for
317 # EMPTY_PAYLOAD
318 if payload is not EMPTY_PAYLOAD:
319 payload.on_eof(self._drop_timeout)
320 else:
321 self._drop_timeout()
323 if upgraded and tail:
324 self.data_received(tail)