1import asyncio
2from contextlib import suppress
3from typing import Any, Optional, Tuple
4
5from .base_protocol import BaseProtocol
6from .client_exceptions import (
7 ClientOSError,
8 ClientPayloadError,
9 ServerDisconnectedError,
10 ServerTimeoutError,
11)
12from .helpers import (
13 _EXC_SENTINEL,
14 BaseTimerContext,
15 set_exception,
16 status_code_must_be_empty_body,
17)
18from .http import HttpResponseParser, RawResponseMessage
19from .http_exceptions import HttpProcessingError
20from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
21
22
class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
    """Helper class to adapt between Protocol and StreamReader.

    Incoming bytes are routed to one of three places:

    * the custom payload parser installed with ``set_parser()``;
    * the ``HttpResponseParser`` created by ``set_response_params()``;
    * the ``_tail`` buffer, while no parser is able to consume them.

    Every parsed ``(message, payload)`` pair is queued on this object with
    ``feed_data()``.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise both base classes and reset all per-connection state."""
        BaseProtocol.__init__(self, loop=loop)
        DataQueue.__init__(self, loop)

        # Sticky flag: once set it is never cleared by this class.
        self._should_close = False

        # Payload of the most recently parsed message (or the one handed to
        # set_parser()), and the custom parser installed by set_parser().
        self._payload: Optional[StreamReader] = None
        self._skip_payload = False
        self._payload_parser = None

        self._timer = None

        # Bytes received while no parser could consume them.
        self._tail = b""
        self._upgraded = False
        self._parser: Optional[HttpResponseParser] = None

        # Read timeout in seconds and the handle of the scheduled callback.
        self._read_timeout: Optional[float] = None
        self._read_timeout_handle: Optional[asyncio.TimerHandle] = None

        self._timeout_ceil_threshold: Optional[float] = 5

    @property
    def upgraded(self) -> bool:
        """Value of the ``upgraded`` flag reported by the HTTP parser."""
        return self._upgraded

    @property
    def should_close(self) -> bool:
        """Return True if any condition that forbids keeping the connection holds.

        The conditions are: the current payload has not reached EOF, the
        connection was upgraded, ``force_close()``/``set_exception()`` was
        called or a message asked for closing, an exception is stored, a
        custom payload parser is installed, items are still queued, or
        unparsed bytes are buffered in ``_tail``.
        """
        payload = self._payload
        if (payload is not None and not payload.is_eof()) or self._upgraded:
            return True

        return (
            self._should_close
            or self.exception() is not None
            or self._payload_parser is not None
            or len(self) > 0
            or bool(self._tail)
        )

    def force_close(self) -> None:
        """Make ``should_close`` report True from now on."""
        self._should_close = True

    def close(self) -> None:
        """Close the transport, if there is one, and drop payload and timeout.

        Nothing happens when no transport is attached.
        """
        transport = self.transport
        if transport is not None:
            transport.close()
            self.transport = None
            self._payload = None
            self._drop_timeout()

    def is_connected(self) -> bool:
        """Return True if a transport is attached and it is not closing."""
        return self.transport is not None and not self.transport.is_closing()

    def connection_lost(self, exc: Optional[BaseException]) -> None:
        """Flush the parsers, publish a suitable error and reset the state.

        ``exc`` is ``None`` for a clean close.  If the queue has not reached
        EOF, a clean close is reported as ``ServerDisconnectedError``, an
        ``OSError`` as ``ClientOSError`` and any other error unchanged.  The
        resulting exception (or ``None``) is forwarded to the base class.
        """
        self._drop_timeout()

        original_connection_error = exc
        reraised_exc = original_connection_error

        connection_closed_cleanly = original_connection_error is None

        if self._payload_parser is not None:
            with suppress(Exception):  # FIXME: log this somehow?
                self._payload_parser.feed_eof()

        uncompleted = None
        if self._parser is not None:
            try:
                uncompleted = self._parser.feed_eof()
            except Exception as underlying_exc:
                if self._payload is not None:
                    client_payload_exc_msg = (
                        f"Response payload is not completed: {underlying_exc!r}"
                    )
                    if not connection_closed_cleanly:
                        client_payload_exc_msg = (
                            f"{client_payload_exc_msg!s}. "
                            f"{original_connection_error!r}"
                        )

                    set_exception(
                        self._payload,
                        ClientPayloadError(client_payload_exc_msg),
                        underlying_exc,
                    )

        if not self.is_eof():
            if isinstance(original_connection_error, OSError):
                reraised_exc = ClientOSError(*original_connection_error.args)
            if connection_closed_cleanly:
                reraised_exc = ServerDisconnectedError(uncompleted)
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            underlying_non_eof_exc = (
                _EXC_SENTINEL
                if connection_closed_cleanly
                else original_connection_error
            )
            # Both hold by construction: a clean close uses the sentinel and
            # ServerDisconnectedError, otherwise the original error is set.
            assert underlying_non_eof_exc is not None
            assert reraised_exc is not None
            self.set_exception(reraised_exc, underlying_non_eof_exc)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(reraised_exc)

    def eof_received(self) -> None:
        """Cancel the pending read timeout.

        Returns ``None``.
        """
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        """Pause reading and cancel the read timeout while paused."""
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self) -> None:
        """Resume reading and re-arm the read timeout."""
        super().resume_reading()
        self._reschedule_timeout()

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store ``exc`` on the queue, mark the connection for closing.

        The read timeout is cancelled before delegating to the base class.
        """
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc, exc_cause)

    def set_parser(self, parser: Any, payload: Any) -> None:
        """Install a custom payload parser and replay any buffered bytes.

        After this call ``data_received()`` feeds all data to ``parser`` until
        it reports EOF.
        """
        # TODO: actual types are:
        #   parser: WebSocketReader
        #   payload: FlowControlDataQueue
        # but they are not generic enough
        # Need an ABC for both types
        self._payload = payload
        self._payload_parser = parser

        self._drop_timeout()

        if self._tail:
            # Clear the buffer before re-feeding so the data is not replayed
            # twice if data_received() buffers again.
            data, self._tail = self._tail, b""
            self.data_received(data)

    def set_response_params(
        self,
        *,
        timer: Optional[BaseTimerContext] = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: Optional[float] = None,
        read_bufsize: int = 2**16,
        timeout_ceil_threshold: float = 5,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
    ) -> None:
        """Create the HTTP response parser and replay any buffered bytes.

        ``skip_payload``, ``read_timeout`` and ``timeout_ceil_threshold`` are
        stored on the instance.  ``skip_payload`` is also passed to the parser
        negated, as ``response_with_body``.  All other arguments are forwarded
        to ``HttpResponseParser`` unchanged.
        """
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout

        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
            max_line_size=max_line_size,
            max_field_size=max_field_size,
        )

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

    def _drop_timeout(self) -> None:
        """Cancel the scheduled read-timeout callback, if any."""
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()
            self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        """Restart the read-timeout countdown.

        Any pending callback is cancelled.  A new one is scheduled only when
        ``_read_timeout`` is truthy, so ``None`` and ``0`` both disable it.
        """
        self._drop_timeout()

        timeout = self._read_timeout
        if timeout:
            self._read_timeout_handle = self._loop.call_later(
                timeout, self._on_read_timeout
            )

    def start_timeout(self) -> None:
        """Arm the read timeout (public wrapper for ``_reschedule_timeout``)."""
        self._reschedule_timeout()

    def _on_read_timeout(self) -> None:
        """Timer callback: fail the queue and the current payload."""
        exc = ServerTimeoutError("Timeout on reading data from socket")
        self.set_exception(exc)
        if self._payload is not None:
            set_exception(self._payload, exc)

    def data_received(self, data: bytes) -> None:
        """Route received bytes to the active parser or buffer them.

        Each call re-arms the read timeout.  Empty ``data`` is otherwise
        ignored.
        """
        self._reschedule_timeout()

        if not data:
            return

        # custom payload parser
        if self._payload_parser is not None:
            eof, tail = self._payload_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if tail:
                    # Bytes after the custom parser's EOF go through the
                    # normal routing again.
                    self.data_received(tail)
            return

        if self._upgraded or self._parser is None:
            # i.e. websocket connection, websocket parser is not set yet
            self._tail += data
            return

        # parse http messages
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except BaseException as underlying_exc:
            if self.transport is not None:
                # connection.release() could be called BEFORE
                # data_received(), the transport is already
                # closed in this case
                self.transport.close()
            # Keep the details of a parser-raised HTTP error instead of
            # replacing them with an empty HttpProcessingError.
            if isinstance(underlying_exc, HttpProcessingError):
                exc = HttpProcessingError(
                    code=underlying_exc.code,
                    message=underlying_exc.message,
                    headers=underlying_exc.headers,
                )
            else:
                exc = HttpProcessingError()
            # should_close is True after the call
            self.set_exception(exc, underlying_exc)
            return

        self._upgraded = upgraded

        payload: Optional[StreamReader] = None
        for message, payload in messages:
            if message.should_close:
                self._should_close = True

            self._payload = payload

            if self._skip_payload or status_code_must_be_empty_body(message.code):
                self.feed_data((message, EMPTY_PAYLOAD), 0)
            else:
                self.feed_data((message, payload), 0)

        if payload is not None:
            # new message(s) was processed
            # register timeout handler unsubscribing
            # either on end-of-stream or immediately for
            # EMPTY_PAYLOAD
            if payload is not EMPTY_PAYLOAD:
                payload.on_eof(self._drop_timeout)
            else:
                self._drop_timeout()

        if tail:
            if upgraded:
                # Goes back through the routing above, which buffers the
                # bytes in _tail until set_parser() installs a parser.
                self.data_received(tail)
            else:
                self._tail = tail