Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_proto.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

159 statements  

1import asyncio 

2from contextlib import suppress 

3from typing import Any, Optional, Tuple 

4 

5from .base_protocol import BaseProtocol 

6from .client_exceptions import ( 

7 ClientOSError, 

8 ClientPayloadError, 

9 ServerDisconnectedError, 

10 ServerTimeoutError, 

11) 

12from .helpers import ( 

13 _EXC_SENTINEL, 

14 BaseTimerContext, 

15 set_exception, 

16 status_code_must_be_empty_body, 

17) 

18from .http import HttpResponseParser, RawResponseMessage 

19from .http_exceptions import HttpProcessingError 

20from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader 

21 

22 

class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
    """Helper class to adapt between Protocol and StreamReader.

    Bytes delivered through ``data_received`` are handed either to a custom
    payload parser (installed with ``set_parser``) or to an
    ``HttpResponseParser`` (created by ``set_response_params``).  Parsed
    ``(message, payload)`` pairs are queued on this object via ``feed_data``.
    Bytes that arrive while no suitable parser is installed are accumulated
    in ``_tail`` and replayed once a parser is set.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise both base classes and reset all per-connection state."""
        BaseProtocol.__init__(self, loop=loop)
        DataQueue.__init__(self, loop)

        # Connection-reuse flag; set by force_close(), set_exception(),
        # connection_lost() and by messages whose should_close is true.
        self._should_close = False

        # Payload of the response currently being received, if any.
        self._payload: Optional[StreamReader] = None
        self._skip_payload = False
        # Custom (non-HTTP) parser installed through set_parser().
        self._payload_parser = None

        self._timer = None

        # Bytes received before a suitable parser was available.
        self._tail = b""
        self._upgraded = False
        self._parser: Optional[HttpResponseParser] = None

        # Read-timeout duration and the pending loop timer for it.
        self._read_timeout: Optional[float] = None
        self._read_timeout_handle: Optional[asyncio.TimerHandle] = None

        self._timeout_ceil_threshold: Optional[float] = 5

    @property
    def upgraded(self) -> bool:
        """Value of the ``upgraded`` flag last reported by the HTTP parser."""
        return self._upgraded

    @property
    def should_close(self) -> bool:
        """Report whether the connection should be closed.

        True when a payload is still being read, the connection was upgraded,
        closing was requested, an exception is stored, a custom payload
        parser is installed, messages are still queued, or unparsed bytes
        remain in the tail buffer.
        """
        current_payload = self._payload
        if current_payload is not None and not current_payload.is_eof():
            return True
        if self._upgraded:
            return True

        return (
            self._should_close
            or self.exception() is not None
            or self._payload_parser is not None
            or len(self) > 0
            or bool(self._tail)
        )

    def force_close(self) -> None:
        """Mark the connection so that ``should_close`` reports True."""
        self._should_close = True

    def close(self) -> None:
        """Close the transport, if there is one, and forget payload and timer."""
        active_transport = self.transport
        if active_transport is None:
            return

        active_transport.close()
        self.transport = None
        self._payload = None
        self._drop_timeout()

    def is_connected(self) -> bool:
        """Return True while a transport is attached and is not closing."""
        active_transport = self.transport
        if active_transport is None:
            return False
        return not active_transport.is_closing()

    def connection_lost(self, exc: Optional[BaseException]) -> None:
        """Finalise the parsers and propagate the reason the connection ended.

        ``exc`` is ``None`` for a clean close.  If the message queue has not
        reached EOF, an exception is stored on this queue:
        ``ServerDisconnectedError`` for a clean close, ``ClientOSError`` when
        ``exc`` is an ``OSError``, otherwise ``exc`` itself.  The same
        exception (or the original ``exc`` when nothing was stored) is passed
        to the base class ``connection_lost``.
        """
        self._drop_timeout()

        closed_cleanly = exc is None
        reported_exc = exc

        custom_parser = self._payload_parser
        if custom_parser is not None:
            with suppress(Exception):  # FIXME: log this somehow?
                custom_parser.feed_eof()

        uncompleted = None
        if self._parser is not None:
            try:
                uncompleted = self._parser.feed_eof()
            except Exception as parse_error:
                if self._payload is not None:
                    # Tell the payload reader that the body was cut short,
                    # appending the transport error when there was one.
                    message = f"Response payload is not completed: {parse_error!r}"
                    if not closed_cleanly:
                        message = f"{message!s}. {exc!r}"
                    set_exception(
                        self._payload,
                        ClientPayloadError(message),
                        parse_error,
                    )

        if not self.is_eof():
            if closed_cleanly:
                reported_exc = ServerDisconnectedError(uncompleted)
                cause = _EXC_SENTINEL
            else:
                if isinstance(exc, OSError):
                    reported_exc = ClientOSError(*exc.args)
                cause = exc
            assert cause is not None
            assert reported_exc is not None
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            self.set_exception(reported_exc, cause)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(reported_exc)

    def eof_received(self) -> None:
        """Cancel the pending read timeout when EOF is signalled."""
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        """Pause reading via the base class, then cancel the read timeout."""
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self) -> None:
        """Resume reading via the base class, then re-arm the read timeout."""
        super().resume_reading()
        self._reschedule_timeout()

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store ``exc`` on the queue, flag the connection for closing and
        cancel the read timeout."""
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc, exc_cause)

    def set_parser(self, parser: Any, payload: Any) -> None:
        """Install a custom payload parser and replay any buffered bytes.

        After this call ``data_received`` routes incoming bytes to
        ``parser.feed_data`` until that parser reports EOF.
        """
        # TODO: actual types are:
        #   parser: WebSocketReader
        #   payload: FlowControlDataQueue
        # but they are not generic enough
        # Need an ABC for both types
        self._payload = payload
        self._payload_parser = parser

        self._drop_timeout()

        buffered = self._tail
        if buffered:
            # Clear the buffer first: data_received() may refill it.
            self._tail = b""
            self.data_received(buffered)

    def set_response_params(
        self,
        *,
        timer: Optional[BaseTimerContext] = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: Optional[float] = None,
        read_bufsize: int = 2**16,
        timeout_ceil_threshold: float = 5,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
    ) -> None:
        """Configure response handling and create the HTTP response parser.

        Stores ``skip_payload``, ``read_timeout`` and
        ``timeout_ceil_threshold`` on the instance, forwards the remaining
        options to ``HttpResponseParser``, and finally replays any bytes that
        were buffered before the parser existed.
        """
        self._skip_payload = skip_payload
        self._read_timeout = read_timeout
        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
            max_line_size=max_line_size,
            max_field_size=max_field_size,
        )

        buffered = self._tail
        if buffered:
            # Clear the buffer first: data_received() may refill it.
            self._tail = b""
            self.data_received(buffered)

    def _drop_timeout(self) -> None:
        """Cancel and forget the pending read-timeout timer, if any."""
        handle = self._read_timeout_handle
        if handle is None:
            return
        handle.cancel()
        self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        """Cancel the current read-timeout timer and start a new one.

        No new timer is scheduled when ``_read_timeout`` is falsy
        (``None`` or ``0``); the handle is reset to ``None`` instead.
        """
        delay = self._read_timeout
        stale_handle = self._read_timeout_handle
        if stale_handle is not None:
            stale_handle.cancel()

        self._read_timeout_handle = (
            self._loop.call_later(delay, self._on_read_timeout) if delay else None
        )

    def start_timeout(self) -> None:
        """Arm (or re-arm) the read timeout."""
        self._reschedule_timeout()

    def _on_read_timeout(self) -> None:
        """Timer callback: fail the queue and the current payload with
        ``ServerTimeoutError``."""
        timeout_error = ServerTimeoutError("Timeout on reading data from socket")
        self.set_exception(timeout_error)
        current_payload = self._payload
        if current_payload is not None:
            set_exception(current_payload, timeout_error)

    def data_received(self, data: bytes) -> None:
        """Route incoming bytes to the active parser.

        The read timeout is re-armed on every call, including calls with
        empty ``data``.  Non-empty data goes to the custom payload parser if
        one is installed; otherwise it is buffered in ``_tail`` when the
        connection is upgraded or no HTTP parser exists yet; otherwise it is
        parsed as HTTP and every resulting message is queued.
        """
        self._reschedule_timeout()

        if not data:
            return

        custom_parser = self._payload_parser
        if custom_parser is not None:
            # custom payload parser
            eof, leftover = custom_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if leftover:
                    # The custom parser is gone now, so the remaining bytes
                    # take the regular path.
                    self.data_received(leftover)
            return

        if self._upgraded or self._parser is None:
            # i.e. websocket connection, websocket parser is not set yet
            self._tail += data
            return

        # parse http messages
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except BaseException as parse_error:
            if self.transport is not None:
                # connection.release() could be called BEFORE
                # data_received(), the transport is already
                # closed in this case
                self.transport.close()
            # should_close is True after the call
            self.set_exception(HttpProcessingError(), parse_error)
            return

        self._upgraded = upgraded

        last_payload: Optional[StreamReader] = None
        for message, last_payload in messages:
            if message.should_close:
                self._should_close = True

            self._payload = last_payload

            # Consumers get an empty payload when the body is skipped or the
            # status code must not carry one.
            body_is_empty = self._skip_payload or status_code_must_be_empty_body(
                message.code
            )
            queued_payload = EMPTY_PAYLOAD if body_is_empty else last_payload
            self.feed_data((message, queued_payload), 0)

        if last_payload is not None:
            # new message(s) was processed
            # register timeout handler unsubscribing
            # either on end-of-stream or immediately for
            # EMPTY_PAYLOAD
            if last_payload is EMPTY_PAYLOAD:
                self._drop_timeout()
            else:
                last_payload.on_eof(self._drop_timeout)

        if tail:
            if upgraded:
                self.data_received(tail)
            else:
                self._tail = tail