Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_proto.py: 20%


import asyncio
from contextlib import suppress
from typing import Any, Optional, Tuple, Type, Union

from .base_protocol import BaseProtocol
from .client_exceptions import (
    ClientConnectionError,
    ClientOSError,
    ClientPayloadError,
    ServerDisconnectedError,
    SocketTimeoutError,
)
from .helpers import (
    _EXC_SENTINEL,
    EMPTY_BODY_STATUS_CODES,
    BaseTimerContext,
    set_exception,
    set_result,
)
from .http import HttpResponseParser, RawResponseMessage, WebSocketReader
from .http_exceptions import HttpProcessingError
from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader


class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
    """Helper class to adapt between Protocol and StreamReader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        BaseProtocol.__init__(self, loop=loop)
        DataQueue.__init__(self, loop)

        self._should_close = False

        self._payload: Optional[StreamReader] = None
        self._skip_payload = False
        self._payload_parser: Optional[WebSocketReader] = None

        self._timer = None

        self._tail = b""
        self._upgraded = False
        self._parser: Optional[HttpResponseParser] = None

        self._read_timeout: Optional[float] = None
        self._read_timeout_handle: Optional[asyncio.TimerHandle] = None

        self._timeout_ceil_threshold: Optional[float] = 5

        self.closed: asyncio.Future[None] = self._loop.create_future()

    @property
    def upgraded(self) -> bool:
        return self._upgraded

    @property
    def should_close(self) -> bool:
        return bool(
            self._should_close
            or (self._payload is not None and not self._payload.is_eof())
            or self._upgraded
            or self._exception is not None
            or self._payload_parser is not None
            or self._buffer
            or self._tail
        )

    def force_close(self) -> None:
        self._should_close = True

    def close(self) -> None:
        self._exception = None  # Break cyclic references
        transport = self.transport
        if transport is not None:
            transport.close()
            self.transport = None
            self._payload = None
        self._drop_timeout()

    def is_connected(self) -> bool:
        return self.transport is not None and not self.transport.is_closing()

    def connection_lost(self, exc: Optional[BaseException]) -> None:
        self._drop_timeout()

        original_connection_error = exc
        reraised_exc = original_connection_error

        connection_closed_cleanly = original_connection_error is None

        if connection_closed_cleanly:
            set_result(self.closed, None)
        else:
            assert original_connection_error is not None
            set_exception(
                self.closed,
                ClientConnectionError(
                    f"Connection lost: {original_connection_error !s}",
                ),
                original_connection_error,
            )

        if self._payload_parser is not None:
            with suppress(Exception):  # FIXME: log this somehow?
                self._payload_parser.feed_eof()

        uncompleted = None
        if self._parser is not None:
            try:
                uncompleted = self._parser.feed_eof()
            except Exception as underlying_exc:
                if self._payload is not None:
                    client_payload_exc_msg = (
                        f"Response payload is not completed: {underlying_exc !r}"
                    )
                    if not connection_closed_cleanly:
                        client_payload_exc_msg = (
                            f"{client_payload_exc_msg !s}. "
                            f"{original_connection_error !r}"
                        )
                    set_exception(
                        self._payload,
                        ClientPayloadError(client_payload_exc_msg),
                        underlying_exc,
                    )

        if not self.is_eof():
            if isinstance(original_connection_error, OSError):
                reraised_exc = ClientOSError(*original_connection_error.args)
            if connection_closed_cleanly:
                reraised_exc = ServerDisconnectedError(uncompleted)
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            underlying_non_eof_exc = (
                _EXC_SENTINEL
                if connection_closed_cleanly
                else original_connection_error
            )
            assert underlying_non_eof_exc is not None
            assert reraised_exc is not None
            self.set_exception(reraised_exc, underlying_non_eof_exc)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(reraised_exc)

    def eof_received(self) -> None:
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self) -> None:
        super().resume_reading()
        self._reschedule_timeout()

    def set_exception(
        self,
        exc: Union[Type[BaseException], BaseException],
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc, exc_cause)

    def set_parser(self, parser: Any, payload: Any) -> None:
        # TODO: actual types are:
        #   parser: WebSocketReader
        #   payload: WebSocketDataQueue
        # but they are not generic enough
        # Need an ABC for both types
        self._payload = payload
        self._payload_parser = parser

        self._drop_timeout()

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

    def set_response_params(
        self,
        *,
        timer: Optional[BaseTimerContext] = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: Optional[float] = None,
        read_bufsize: int = 2**16,
        timeout_ceil_threshold: float = 5,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
    ) -> None:
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout

        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
            max_line_size=max_line_size,
            max_field_size=max_field_size,
        )

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

    def _drop_timeout(self) -> None:
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()
            self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        timeout = self._read_timeout
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()

        if timeout:
            self._read_timeout_handle = self._loop.call_later(
                timeout, self._on_read_timeout
            )
        else:
            self._read_timeout_handle = None

    def start_timeout(self) -> None:
        self._reschedule_timeout()

    @property
    def read_timeout(self) -> Optional[float]:
        return self._read_timeout

    @read_timeout.setter
    def read_timeout(self, read_timeout: Optional[float]) -> None:
        self._read_timeout = read_timeout

    def _on_read_timeout(self) -> None:
        exc = SocketTimeoutError("Timeout on reading data from socket")
        self.set_exception(exc)
        if self._payload is not None:
            set_exception(self._payload, exc)

    def data_received(self, data: bytes) -> None:
        self._reschedule_timeout()

        if not data:
            return

        # custom payload parser - currently always WebSocketReader
        if self._payload_parser is not None:
            eof, tail = self._payload_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if tail:
                    self.data_received(tail)
            return

        if self._upgraded or self._parser is None:
            # i.e. websocket connection, websocket parser is not set yet
            self._tail += data
            return

        # parse http messages
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except BaseException as underlying_exc:
            if self.transport is not None:
                # connection.release() could be called BEFORE
                # data_received(), the transport is already
                # closed in this case
                self.transport.close()
            # should_close is True after the call
            if isinstance(underlying_exc, HttpProcessingError):
                exc = HttpProcessingError(
                    code=underlying_exc.code,
                    message=underlying_exc.message,
                    headers=underlying_exc.headers,
                )
            else:
                exc = HttpProcessingError()
            self.set_exception(exc, underlying_exc)
            return

        self._upgraded = upgraded

        payload: Optional[StreamReader] = None
        for message, payload in messages:
            if message.should_close:
                self._should_close = True

            self._payload = payload

            if self._skip_payload or message.code in EMPTY_BODY_STATUS_CODES:
                self.feed_data((message, EMPTY_PAYLOAD))
            else:
                self.feed_data((message, payload))

        if payload is not None:
            # new message(s) was processed
            # register timeout handler unsubscribing
            # either on end-of-stream or immediately for
            # EMPTY_PAYLOAD
            if payload is not EMPTY_PAYLOAD:
                payload.on_eof(self._drop_timeout)
            else:
                self._drop_timeout()

        if upgraded and tail:
            self.data_received(tail)
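

The listing above is aiohttp's client-side protocol class; in normal use it is created and configured by the connector and ClientSession rather than by hand. The sketch below is a rough, hedged illustration (not part of client_proto.py) of how a ResponseHandler could be wired to a plain TCP transport directly with asyncio. The helper name fetch_once, the example.com host and port, the hand-built request bytes, and the 30-second read timeout are assumptions made for the example only.

import asyncio


async def fetch_once(host: str, port: int = 80) -> None:
    loop = asyncio.get_running_loop()
    # create_connection() calls connection_made() on the protocol for us.
    transport, protocol = await loop.create_connection(
        lambda: ResponseHandler(loop=loop), host, port
    )
    try:
        # Configure the HTTP response parser before data starts arriving;
        # anything received earlier is buffered in _tail and replayed.
        protocol.set_response_params(read_timeout=30.0)
        transport.write(
            b"GET / HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % host.encode()
        )
        # ResponseHandler is a DataQueue of (RawResponseMessage, StreamReader)
        # pairs, so read() yields the parsed status line and headers plus the
        # body stream for one response.
        message, payload = await protocol.read()
        print(message.code, message.reason)
        print(await payload.read())
    finally:
        protocol.close()


# asyncio.run(fetch_once("example.com"))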