Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_proto.py: 19%
155 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
1import asyncio
2from contextlib import suppress
3from typing import Any, Optional, Tuple
5from .base_protocol import BaseProtocol
6from .client_exceptions import (
7 ClientOSError,
8 ClientPayloadError,
9 ServerDisconnectedError,
10 SocketTimeoutError,
11)
12from .helpers import (
13 BaseTimerContext,
14 set_exception,
15 set_result,
16 status_code_must_be_empty_body,
17)
18from .http import HttpResponseParser, RawResponseMessage, WebSocketReader
19from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
    """Helper class to adapt between Protocol and StreamReader.

    Bytes delivered by the transport are fed either to an HTTP response
    parser (whose parsed ``(message, payload)`` pairs are pushed into this
    object's own data queue) or, once one is installed via ``set_parser``,
    to a custom payload parser.  Data arriving while no parser is usable is
    accumulated in ``_tail`` and replayed when a parser is set.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise both base classes and reset all per-connection state."""
        BaseProtocol.__init__(self, loop=loop)
        DataQueue.__init__(self, loop)

        self._should_close = False

        # Payload of the response currently being received, if any.
        self._payload: Optional[StreamReader] = None
        self._skip_payload = False
        # Custom (non-HTTP) parser installed through set_parser().
        self._payload_parser: Optional[WebSocketReader] = None

        self._timer = None

        # Bytes received while no suitable parser was available.
        self._tail = b""
        self._upgraded = False
        self._parser: Optional[HttpResponseParser] = None

        self._read_timeout: Optional[float] = None
        self._read_timeout_handle: Optional[asyncio.TimerHandle] = None

        self._timeout_ceil_threshold: Optional[float] = 5

        # Resolved (with a result or an exception) in connection_lost().
        self.closed: asyncio.Future[None] = self._loop.create_future()

    @property
    def upgraded(self) -> bool:
        """Return the ``upgraded`` flag reported by the last HTTP parse."""
        return self._upgraded

    @property
    def should_close(self) -> bool:
        """Return True if any recorded state marks the connection for closing.

        That is the case when a payload is still being read, when closing
        was explicitly requested, after an upgrade, after an exception, while
        a custom payload parser is installed, or when unconsumed messages or
        buffered tail bytes remain.
        """
        if self._payload is not None and not self._payload.is_eof():
            return True

        return (
            self._should_close
            or self._upgraded
            or self.exception() is not None
            or self._payload_parser is not None
            or len(self) > 0
            or bool(self._tail)
        )

    def force_close(self) -> None:
        """Mark the connection as one that must be closed."""
        self._should_close = True

    def close(self) -> None:
        """Close the transport (if any) and drop payload and read timer."""
        transport = self.transport
        if transport is not None:
            transport.close()
            self.transport = None
            self._payload = None
            self._drop_timeout()

    def is_connected(self) -> bool:
        """Return True while a transport is attached and not closing."""
        return self.transport is not None and not self.transport.is_closing()

    def connection_lost(self, exc: Optional[BaseException]) -> None:
        """Finalise parsers and propagate the connection loss.

        ``exc`` is the transport-level error, or ``None`` for a clean close.
        The ``closed`` future is resolved accordingly, both parsers are given
        an EOF, and, if the queue has not reached EOF, an exception is set on
        it: ``ClientOSError`` for an ``OSError``, ``ServerDisconnectedError``
        for a clean close, otherwise ``exc`` itself.
        """
        self._drop_timeout()

        if exc is not None:
            set_exception(self.closed, exc)
        else:
            set_result(self.closed, None)

        if self._payload_parser is not None:
            # Best effort: a failing custom parser must not block teardown.
            with suppress(Exception):
                self._payload_parser.feed_eof()

        uncompleted = None
        if self._parser is not None:
            try:
                uncompleted = self._parser.feed_eof()
            except Exception as underlying_exc:
                if self._payload is not None:
                    # Keep this error in its own name: rebinding ``exc`` here
                    # would make a clean close look like a failure to the
                    # queue and to BaseProtocol.connection_lost() below.
                    payload_exc = ClientPayloadError(
                        "Response payload is not completed"
                    )
                    payload_exc.__cause__ = underlying_exc
                    self._payload.set_exception(payload_exc)

        if not self.is_eof():
            if isinstance(exc, OSError):
                exc = ClientOSError(*exc.args)
            if exc is None:
                exc = ServerDisconnectedError(uncompleted)
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            self.set_exception(exc)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(exc)

    def eof_received(self) -> None:
        """Cancel the pending read timeout when the peer signals EOF."""
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        """Pause reading and cancel the read timeout while paused."""
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self) -> None:
        """Resume reading and re-arm the read timeout."""
        super().resume_reading()
        self._reschedule_timeout()

    def set_exception(self, exc: BaseException) -> None:
        """Record ``exc`` on the queue, flag for closing, cancel the timer."""
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc)

    def set_parser(self, parser: Any, payload: Any) -> None:
        """Install a custom payload parser and replay any buffered bytes."""
        # TODO: actual types are:
        #   parser: WebSocketReader
        #   payload: FlowControlDataQueue
        # but they are not generic enough
        # Need an ABC for both types
        self._payload = payload
        self._payload_parser = parser

        self._drop_timeout()

        if self._tail:
            # Clear the buffer before replaying so data_received() cannot
            # see the same bytes twice.
            data, self._tail = self._tail, b""
            self.data_received(data)

    def set_response_params(
        self,
        *,
        timer: Optional[BaseTimerContext] = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: Optional[float] = None,
        read_bufsize: int = 2**16,
        timeout_ceil_threshold: float = 5,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
    ) -> None:
        """Configure response handling and create the HTTP response parser.

        ``skip_payload``, ``read_timeout`` and ``timeout_ceil_threshold`` are
        stored on the instance; the remaining arguments are forwarded to
        ``HttpResponseParser`` (``skip_payload`` is passed negated as
        ``response_with_body``).  Bytes buffered in ``_tail`` are then
        replayed through ``data_received``.
        """
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout

        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
            max_line_size=max_line_size,
            max_field_size=max_field_size,
        )

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

    def _drop_timeout(self) -> None:
        """Cancel and forget the scheduled read-timeout callback, if any."""
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()
            self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        """Cancel any pending read timeout and re-arm it if one is set.

        A falsy ``_read_timeout`` (``None`` or ``0``) leaves no timer armed.
        """
        timeout = self._read_timeout
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()

        if timeout:
            self._read_timeout_handle = self._loop.call_later(
                timeout, self._on_read_timeout
            )
        else:
            self._read_timeout_handle = None

    def start_timeout(self) -> None:
        """Arm the read timeout using the configured ``_read_timeout``."""
        self._reschedule_timeout()

    def _on_read_timeout(self) -> None:
        """Fail the queue and the current payload with a socket timeout."""
        exc = SocketTimeoutError("Timeout on reading data from socket")
        self.set_exception(exc)
        if self._payload is not None:
            self._payload.set_exception(exc)

    def data_received(self, data: bytes) -> None:
        """Route incoming bytes to the active parser or buffer them.

        Every call re-arms the read timeout.  With a custom payload parser
        installed the bytes go there; otherwise they are parsed as HTTP, and
        each parsed message is pushed into the queue.  When the connection is
        upgraded or no HTTP parser exists yet, bytes are appended to
        ``_tail``.
        """
        self._reschedule_timeout()

        if not data:
            return

        # custom payload parser
        if self._payload_parser is not None:
            eof, tail = self._payload_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if tail:
                    # Bytes past the custom payload's end go back through
                    # the normal dispatch now that the parser is removed.
                    self.data_received(tail)
            return
        else:
            if self._upgraded or self._parser is None:
                # i.e. websocket connection, websocket parser is not set yet
                self._tail += data
            else:
                # parse http messages
                try:
                    messages, upgraded, tail = self._parser.feed_data(data)
                except BaseException as exc:
                    if self.transport is not None:
                        # connection.release() could be called BEFORE
                        # data_received(), the transport is already
                        # closed in this case
                        self.transport.close()
                    # should_close is True after the call
                    self.set_exception(exc)
                    return

                self._upgraded = upgraded

                payload: Optional[StreamReader] = None
                for message, payload in messages:
                    if message.should_close:
                        self._should_close = True

                    self._payload = payload

                    if self._skip_payload or status_code_must_be_empty_body(
                        message.code
                    ):
                        self.feed_data((message, EMPTY_PAYLOAD), 0)
                    else:
                        self.feed_data((message, payload), 0)
                if payload is not None:
                    # new message(s) was processed
                    # register timeout handler unsubscribing
                    # either on end-of-stream or immediately for
                    # EMPTY_PAYLOAD
                    if payload is not EMPTY_PAYLOAD:
                        payload.on_eof(self._drop_timeout)
                    else:
                        self._drop_timeout()

                if tail:
                    if upgraded:
                        self.data_received(tail)
                    else:
                        self._tail = tail