Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_proto.py: 19%
153 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import asyncio
2from contextlib import suppress
3from typing import Any, Optional, Tuple
5from .base_protocol import BaseProtocol
6from .client_exceptions import (
7 ClientOSError,
8 ClientPayloadError,
9 ServerDisconnectedError,
10 ServerTimeoutError,
11)
12from .helpers import BaseTimerContext, set_exception, set_result
13from .http import HttpResponseParser, RawResponseMessage
14from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
17class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
18 """Helper class to adapt between Protocol and StreamReader."""
20 def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
21 BaseProtocol.__init__(self, loop=loop)
22 DataQueue.__init__(self, loop)
24 self._should_close = False
26 self._payload: Optional[StreamReader] = None
27 self._skip_payload = False
28 self._payload_parser = None
30 self._timer = None
32 self._tail = b""
33 self._upgraded = False
34 self._parser: Optional[HttpResponseParser] = None
36 self._read_timeout: Optional[float] = None
37 self._read_timeout_handle: Optional[asyncio.TimerHandle] = None
39 self._timeout_ceil_threshold: Optional[float] = 5
41 self.closed: asyncio.Future[None] = self._loop.create_future()
43 @property
44 def upgraded(self) -> bool:
45 return self._upgraded
47 @property
48 def should_close(self) -> bool:
49 if self._payload is not None and not self._payload.is_eof() or self._upgraded:
50 return True
52 return (
53 self._should_close
54 or self._upgraded
55 or self.exception() is not None
56 or self._payload_parser is not None
57 or len(self) > 0
58 or bool(self._tail)
59 )
61 def force_close(self) -> None:
62 self._should_close = True
64 def close(self) -> None:
65 transport = self.transport
66 if transport is not None:
67 transport.close()
68 self.transport = None
69 self._payload = None
70 self._drop_timeout()
72 def is_connected(self) -> bool:
73 return self.transport is not None and not self.transport.is_closing()
75 def connection_lost(self, exc: Optional[BaseException]) -> None:
76 self._drop_timeout()
78 if exc is not None:
79 set_exception(self.closed, exc)
80 else:
81 set_result(self.closed, None)
83 if self._payload_parser is not None:
84 with suppress(Exception):
85 self._payload_parser.feed_eof()
87 uncompleted = None
88 if self._parser is not None:
89 try:
90 uncompleted = self._parser.feed_eof()
91 except Exception:
92 if self._payload is not None:
93 self._payload.set_exception(
94 ClientPayloadError("Response payload is not completed")
95 )
97 if not self.is_eof():
98 if isinstance(exc, OSError):
99 exc = ClientOSError(*exc.args)
100 if exc is None:
101 exc = ServerDisconnectedError(uncompleted)
102 # assigns self._should_close to True as side effect,
103 # we do it anyway below
104 self.set_exception(exc)
106 self._should_close = True
107 self._parser = None
108 self._payload = None
109 self._payload_parser = None
110 self._reading_paused = False
112 super().connection_lost(exc)
114 def eof_received(self) -> None:
115 # should call parser.feed_eof() most likely
116 self._drop_timeout()
118 def pause_reading(self) -> None:
119 super().pause_reading()
120 self._drop_timeout()
122 def resume_reading(self) -> None:
123 super().resume_reading()
124 self._reschedule_timeout()
126 def set_exception(self, exc: BaseException) -> None:
127 self._should_close = True
128 self._drop_timeout()
129 super().set_exception(exc)
131 def set_parser(self, parser: Any, payload: Any) -> None:
132 # TODO: actual types are:
133 # parser: WebSocketReader
134 # payload: FlowControlDataQueue
135 # but they are not generi enough
136 # Need an ABC for both types
137 self._payload = payload
138 self._payload_parser = parser
140 self._drop_timeout()
142 if self._tail:
143 data, self._tail = self._tail, b""
144 self.data_received(data)
146 def set_response_params(
147 self,
148 *,
149 timer: Optional[BaseTimerContext] = None,
150 skip_payload: bool = False,
151 read_until_eof: bool = False,
152 auto_decompress: bool = True,
153 read_timeout: Optional[float] = None,
154 read_bufsize: int = 2**16,
155 timeout_ceil_threshold: float = 5,
156 max_line_size: int = 8190,
157 max_field_size: int = 8190,
158 ) -> None:
159 self._skip_payload = skip_payload
161 self._read_timeout = read_timeout
163 self._timeout_ceil_threshold = timeout_ceil_threshold
165 self._parser = HttpResponseParser(
166 self,
167 self._loop,
168 read_bufsize,
169 timer=timer,
170 payload_exception=ClientPayloadError,
171 response_with_body=not skip_payload,
172 read_until_eof=read_until_eof,
173 auto_decompress=auto_decompress,
174 max_line_size=max_line_size,
175 max_field_size=max_field_size,
176 )
178 if self._tail:
179 data, self._tail = self._tail, b""
180 self.data_received(data)
182 def _drop_timeout(self) -> None:
183 if self._read_timeout_handle is not None:
184 self._read_timeout_handle.cancel()
185 self._read_timeout_handle = None
187 def _reschedule_timeout(self) -> None:
188 timeout = self._read_timeout
189 if self._read_timeout_handle is not None:
190 self._read_timeout_handle.cancel()
192 if timeout:
193 self._read_timeout_handle = self._loop.call_later(
194 timeout, self._on_read_timeout
195 )
196 else:
197 self._read_timeout_handle = None
199 def start_timeout(self) -> None:
200 self._reschedule_timeout()
202 def _on_read_timeout(self) -> None:
203 exc = ServerTimeoutError("Timeout on reading data from socket")
204 self.set_exception(exc)
205 if self._payload is not None:
206 self._payload.set_exception(exc)
208 def data_received(self, data: bytes) -> None:
209 self._reschedule_timeout()
211 if not data:
212 return
214 # custom payload parser
215 if self._payload_parser is not None:
216 eof, tail = self._payload_parser.feed_data(data)
217 if eof:
218 self._payload = None
219 self._payload_parser = None
221 if tail:
222 self.data_received(tail)
223 return
224 else:
225 if self._upgraded or self._parser is None:
226 # i.e. websocket connection, websocket parser is not set yet
227 self._tail += data
228 else:
229 # parse http messages
230 try:
231 messages, upgraded, tail = self._parser.feed_data(data)
232 except BaseException as exc:
233 if self.transport is not None:
234 # connection.release() could be called BEFORE
235 # data_received(), the transport is already
236 # closed in this case
237 self.transport.close()
238 # should_close is True after the call
239 self.set_exception(exc)
240 return
242 self._upgraded = upgraded
244 payload: Optional[StreamReader] = None
245 for message, payload in messages:
246 if message.should_close:
247 self._should_close = True
249 self._payload = payload
251 if self._skip_payload or message.code in (204, 304):
252 self.feed_data((message, EMPTY_PAYLOAD), 0)
253 else:
254 self.feed_data((message, payload), 0)
255 if payload is not None:
256 # new message(s) was processed
257 # register timeout handler unsubscribing
258 # either on end-of-stream or immediately for
259 # EMPTY_PAYLOAD
260 if payload is not EMPTY_PAYLOAD:
261 payload.on_eof(self._drop_timeout)
262 else:
263 self._drop_timeout()
265 if tail:
266 if upgraded:
267 self.data_received(tail)
268 else:
269 self._tail = tail