Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_proto.py: 19%
146 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
1import asyncio
2from contextlib import suppress
3from typing import Any, Optional, Tuple
5from .base_protocol import BaseProtocol
6from .client_exceptions import (
7 ClientOSError,
8 ClientPayloadError,
9 ServerDisconnectedError,
10 ServerTimeoutError,
11)
12from .helpers import BaseTimerContext
13from .http import HttpResponseParser, RawResponseMessage
14from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
    """Helper class to adapt between Protocol and StreamReader.

    Bytes arriving through ``data_received`` are handed to an
    ``HttpResponseParser`` (or to a custom payload parser installed with
    ``set_parser``).  Parsed ``(message, payload)`` pairs are pushed into
    this object's own queue through ``feed_data``.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise both base classes and reset all per-connection state."""
        BaseProtocol.__init__(self, loop=loop)
        DataQueue.__init__(self, loop)

        self._should_close = False

        # Payload of the most recently parsed message, if any.
        self._payload: Optional[StreamReader] = None
        self._skip_payload = False
        # Custom parser installed by set_parser(); None while parsing HTTP.
        self._payload_parser = None

        self._timer = None

        # Bytes received while no parser was able to consume them.
        self._tail = b""
        self._upgraded = False
        self._parser: Optional[HttpResponseParser] = None

        self._read_timeout: Optional[float] = None
        self._read_timeout_handle: Optional[asyncio.TimerHandle] = None

    @property
    def upgraded(self) -> bool:
        """Value of the ``upgraded`` flag last reported by the HTTP parser."""
        return self._upgraded

    @property
    def should_close(self) -> bool:
        """Return True when the connection has to be closed.

        That is the case when a payload has not reached EOF, the connection
        was upgraded, a close was requested, an exception is stored, a custom
        payload parser is installed, or unconsumed messages/bytes remain.
        """
        payload_unfinished = self._payload is not None and not self._payload.is_eof()
        if payload_unfinished or self._upgraded:
            return True

        # self._upgraded is known to be False here, so it is not re-tested.
        return (
            self._should_close
            or self.exception() is not None
            or self._payload_parser is not None
            or len(self) > 0
            or bool(self._tail)
        )

    def force_close(self) -> None:
        """Mark the connection so that ``should_close`` reports True."""
        self._should_close = True

    def close(self) -> None:
        """Close the transport, if there is one, and forget payload and timer."""
        transport = self.transport
        if transport is not None:
            transport.close()
            self.transport = None
            self._payload = None
            self._drop_timeout()

    def is_connected(self) -> bool:
        """Return True while a transport is attached and it is not closing."""
        return self.transport is not None and not self.transport.is_closing()

    def connection_lost(self, exc: Optional[BaseException]) -> None:
        """Flush the parsers, publish the failure and reset parser state.

        ``exc`` is the error that closed the connection, or None.  An
        ``OSError`` is re-wrapped as ``ClientOSError``; a missing error
        becomes ``ServerDisconnectedError``.  The resulting exception is
        stored on the queue only if the queue has not reached EOF, and is
        then passed on to ``BaseProtocol.connection_lost``.
        """
        self._drop_timeout()

        if self._payload_parser is not None:
            # Best effort: a failing custom parser must not block the cleanup.
            with suppress(Exception):
                self._payload_parser.feed_eof()

        uncompleted = None
        if self._parser is not None:
            try:
                uncompleted = self._parser.feed_eof()
            except Exception as underlying_exc:
                if self._payload is not None:
                    payload_exc = ClientPayloadError(
                        "Response payload is not completed"
                    )
                    # Keep the parser failure reachable for debugging.
                    payload_exc.__cause__ = underlying_exc
                    self._payload.set_exception(payload_exc)

        if not self.is_eof():
            if isinstance(exc, OSError):
                os_error = ClientOSError(*exc.args)
                # Preserve the original low-level error as the cause.
                os_error.__cause__ = exc
                exc = os_error
            if exc is None:
                exc = ServerDisconnectedError(uncompleted)
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            self.set_exception(exc)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(exc)

    def eof_received(self) -> None:
        """Handle EOF from the peer by cancelling the read timeout."""
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        """Pause reading and cancel the read timeout."""
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self) -> None:
        """Resume reading and re-arm the read timeout."""
        super().resume_reading()
        self._reschedule_timeout()

    def set_exception(self, exc: BaseException) -> None:
        """Store ``exc`` on the queue; also forces close and drops the timeout."""
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc)

    def set_parser(self, parser: Any, payload: Any) -> None:
        """Install a custom payload parser and replay any buffered bytes."""
        # TODO: actual types are:
        #   parser: WebSocketReader
        #   payload: FlowControlDataQueue
        # but they are not generic enough
        # Need an ABC for both types
        self._payload = payload
        self._payload_parser = parser

        self._drop_timeout()

        if self._tail:
            # Clear the buffer before re-feeding so the data is not kept twice.
            data, self._tail = self._tail, b""
            self.data_received(data)

    def set_response_params(
        self,
        *,
        timer: Optional[BaseTimerContext] = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: Optional[float] = None,
        read_bufsize: int = 2**16,
    ) -> None:
        """Create a fresh HTTP response parser for the next response.

        Stores ``skip_payload`` and ``read_timeout``, re-arms the read
        timeout, builds the ``HttpResponseParser`` from the remaining
        arguments and finally replays any bytes buffered in ``self._tail``.
        """
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout
        self._reschedule_timeout()

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
        )

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

    def _drop_timeout(self) -> None:
        """Cancel the pending read-timeout callback, if one is scheduled."""
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()
            self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        """Restart the read timeout; a falsy timeout leaves it disabled."""
        self._drop_timeout()

        timeout = self._read_timeout
        if timeout:
            self._read_timeout_handle = self._loop.call_later(
                timeout, self._on_read_timeout
            )

    def _on_read_timeout(self) -> None:
        """Timer callback: fail the queue and the current payload, if any."""
        exc = ServerTimeoutError("Timeout on reading data from socket")
        self.set_exception(exc)
        if self._payload is not None:
            self._payload.set_exception(exc)

    def data_received(self, data: bytes) -> None:
        """Route incoming bytes to the active parser.

        A custom payload parser takes precedence.  Without one, bytes are
        buffered in ``self._tail`` while the connection is upgraded or no
        HTTP parser exists; otherwise they are parsed as HTTP and every
        resulting message is queued through ``feed_data``.
        """
        self._reschedule_timeout()

        if not data:
            return

        # custom payload parser
        if self._payload_parser is not None:
            eof, tail = self._payload_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if tail:
                    self.data_received(tail)
            return

        if self._upgraded or self._parser is None:
            # i.e. websocket connection, websocket parser is not set yet
            self._tail += data
            return

        # parse http messages
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except BaseException as exc:
            if self.transport is not None:
                # connection.release() could be called BEFORE
                # data_received(), the transport is already
                # closed in this case
                self.transport.close()
            # should_close is True after the call
            self.set_exception(exc)
            return

        self._upgraded = upgraded

        payload: Optional[StreamReader] = None
        for message, payload in messages:
            if message.should_close:
                self._should_close = True

            self._payload = payload

            if self._skip_payload or message.code in (204, 304):
                self.feed_data((message, EMPTY_PAYLOAD), 0)
            else:
                self.feed_data((message, payload), 0)

        if payload is not None:
            # new message(s) was processed
            # register timeout handler unsubscribing
            # either on end-of-stream or immediately for
            # EMPTY_PAYLOAD
            if payload is not EMPTY_PAYLOAD:
                payload.on_eof(self._drop_timeout)
            else:
                self._drop_timeout()

        if tail:
            if upgraded:
                self.data_received(tail)
            else:
                self._tail = tail