Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_proto.py: 19%
149 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1import asyncio
2from contextlib import suppress
3from typing import Any, Optional, Tuple
5from .base_protocol import BaseProtocol
6from .client_exceptions import (
7 ClientOSError,
8 ClientPayloadError,
9 ServerDisconnectedError,
10 ServerTimeoutError,
11)
12from .helpers import BaseTimerContext, status_code_must_be_empty_body
13from .http import HttpResponseParser, RawResponseMessage
14from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
17class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
18 """Helper class to adapt between Protocol and StreamReader."""
20 def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
21 BaseProtocol.__init__(self, loop=loop)
22 DataQueue.__init__(self, loop)
24 self._should_close = False
26 self._payload: Optional[StreamReader] = None
27 self._skip_payload = False
28 self._payload_parser = None
30 self._timer = None
32 self._tail = b""
33 self._upgraded = False
34 self._parser: Optional[HttpResponseParser] = None
36 self._read_timeout: Optional[float] = None
37 self._read_timeout_handle: Optional[asyncio.TimerHandle] = None
39 self._timeout_ceil_threshold: Optional[float] = 5
41 @property
42 def upgraded(self) -> bool:
43 return self._upgraded
45 @property
46 def should_close(self) -> bool:
47 if self._payload is not None and not self._payload.is_eof() or self._upgraded:
48 return True
50 return (
51 self._should_close
52 or self._upgraded
53 or self.exception() is not None
54 or self._payload_parser is not None
55 or len(self) > 0
56 or bool(self._tail)
57 )
59 def force_close(self) -> None:
60 self._should_close = True
62 def close(self) -> None:
63 transport = self.transport
64 if transport is not None:
65 transport.close()
66 self.transport = None
67 self._payload = None
68 self._drop_timeout()
70 def is_connected(self) -> bool:
71 return self.transport is not None and not self.transport.is_closing()
73 def connection_lost(self, exc: Optional[BaseException]) -> None:
74 self._drop_timeout()
76 if self._payload_parser is not None:
77 with suppress(Exception):
78 self._payload_parser.feed_eof()
80 uncompleted = None
81 if self._parser is not None:
82 try:
83 uncompleted = self._parser.feed_eof()
84 except Exception:
85 if self._payload is not None:
86 self._payload.set_exception(
87 ClientPayloadError("Response payload is not completed")
88 )
90 if not self.is_eof():
91 if isinstance(exc, OSError):
92 exc = ClientOSError(*exc.args)
93 if exc is None:
94 exc = ServerDisconnectedError(uncompleted)
95 # assigns self._should_close to True as side effect,
96 # we do it anyway below
97 self.set_exception(exc)
99 self._should_close = True
100 self._parser = None
101 self._payload = None
102 self._payload_parser = None
103 self._reading_paused = False
105 super().connection_lost(exc)
107 def eof_received(self) -> None:
108 # should call parser.feed_eof() most likely
109 self._drop_timeout()
111 def pause_reading(self) -> None:
112 super().pause_reading()
113 self._drop_timeout()
115 def resume_reading(self) -> None:
116 super().resume_reading()
117 self._reschedule_timeout()
119 def set_exception(self, exc: BaseException) -> None:
120 self._should_close = True
121 self._drop_timeout()
122 super().set_exception(exc)
124 def set_parser(self, parser: Any, payload: Any) -> None:
125 # TODO: actual types are:
126 # parser: WebSocketReader
127 # payload: FlowControlDataQueue
128 # but they are not generi enough
129 # Need an ABC for both types
130 self._payload = payload
131 self._payload_parser = parser
133 self._drop_timeout()
135 if self._tail:
136 data, self._tail = self._tail, b""
137 self.data_received(data)
139 def set_response_params(
140 self,
141 *,
142 timer: Optional[BaseTimerContext] = None,
143 skip_payload: bool = False,
144 read_until_eof: bool = False,
145 auto_decompress: bool = True,
146 read_timeout: Optional[float] = None,
147 read_bufsize: int = 2**16,
148 timeout_ceil_threshold: float = 5,
149 max_line_size: int = 8190,
150 max_field_size: int = 8190,
151 ) -> None:
152 self._skip_payload = skip_payload
154 self._read_timeout = read_timeout
156 self._timeout_ceil_threshold = timeout_ceil_threshold
158 self._parser = HttpResponseParser(
159 self,
160 self._loop,
161 read_bufsize,
162 timer=timer,
163 payload_exception=ClientPayloadError,
164 response_with_body=not skip_payload,
165 read_until_eof=read_until_eof,
166 auto_decompress=auto_decompress,
167 max_line_size=max_line_size,
168 max_field_size=max_field_size,
169 )
171 if self._tail:
172 data, self._tail = self._tail, b""
173 self.data_received(data)
175 def _drop_timeout(self) -> None:
176 if self._read_timeout_handle is not None:
177 self._read_timeout_handle.cancel()
178 self._read_timeout_handle = None
180 def _reschedule_timeout(self) -> None:
181 timeout = self._read_timeout
182 if self._read_timeout_handle is not None:
183 self._read_timeout_handle.cancel()
185 if timeout:
186 self._read_timeout_handle = self._loop.call_later(
187 timeout, self._on_read_timeout
188 )
189 else:
190 self._read_timeout_handle = None
192 def start_timeout(self) -> None:
193 self._reschedule_timeout()
195 def _on_read_timeout(self) -> None:
196 exc = ServerTimeoutError("Timeout on reading data from socket")
197 self.set_exception(exc)
198 if self._payload is not None:
199 self._payload.set_exception(exc)
201 def data_received(self, data: bytes) -> None:
202 self._reschedule_timeout()
204 if not data:
205 return
207 # custom payload parser
208 if self._payload_parser is not None:
209 eof, tail = self._payload_parser.feed_data(data)
210 if eof:
211 self._payload = None
212 self._payload_parser = None
214 if tail:
215 self.data_received(tail)
216 return
217 else:
218 if self._upgraded or self._parser is None:
219 # i.e. websocket connection, websocket parser is not set yet
220 self._tail += data
221 else:
222 # parse http messages
223 try:
224 messages, upgraded, tail = self._parser.feed_data(data)
225 except BaseException as exc:
226 if self.transport is not None:
227 # connection.release() could be called BEFORE
228 # data_received(), the transport is already
229 # closed in this case
230 self.transport.close()
231 # should_close is True after the call
232 self.set_exception(exc)
233 return
235 self._upgraded = upgraded
237 payload: Optional[StreamReader] = None
238 for message, payload in messages:
239 if message.should_close:
240 self._should_close = True
242 self._payload = payload
244 if self._skip_payload or status_code_must_be_empty_body(
245 message.code
246 ):
247 self.feed_data((message, EMPTY_PAYLOAD), 0)
248 else:
249 self.feed_data((message, payload), 0)
250 if payload is not None:
251 # new message(s) was processed
252 # register timeout handler unsubscribing
253 # either on end-of-stream or immediately for
254 # EMPTY_PAYLOAD
255 if payload is not EMPTY_PAYLOAD:
256 payload.on_eof(self._drop_timeout)
257 else:
258 self._drop_timeout()
260 if tail:
261 if upgraded:
262 self.data_received(tail)
263 else:
264 self._tail = tail