Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/client_proto.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

187 statements  

1import asyncio 

2from contextlib import suppress 

3from typing import Any, Optional, Tuple, Union 

4 

5from .base_protocol import BaseProtocol 

6from .client_exceptions import ( 

7 ClientConnectionError, 

8 ClientOSError, 

9 ClientPayloadError, 

10 ServerDisconnectedError, 

11 SocketTimeoutError, 

12) 

13from .helpers import ( 

14 _EXC_SENTINEL, 

15 EMPTY_BODY_STATUS_CODES, 

16 BaseTimerContext, 

17 set_exception, 

18 set_result, 

19) 

20from .http import HttpResponseParser, RawResponseMessage 

21from .http_exceptions import HttpProcessingError 

22from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader 

23 

24 

class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
    """Helper class to adapt between Protocol and StreamReader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        # Initialize both bases explicitly: BaseProtocol supplies the
        # transport plumbing, DataQueue buffers the parsed
        # (message, payload) pairs for consumers.
        BaseProtocol.__init__(self, loop=loop)
        DataQueue.__init__(self, loop)

        # Set by force_close()/set_exception(); consulted by should_close.
        self._should_close = False

        # Stream for the body of the response currently being received.
        self._payload: Optional[StreamReader] = None
        # When True, data_received() substitutes EMPTY_PAYLOAD for bodies.
        self._skip_payload = False
        # Custom parser (e.g. websocket reader) installed by set_parser().
        self._payload_parser = None

        self._timer = None

        # Bytes received before a parser was configured; replayed once a
        # parser is installed (set_parser/set_response_params).
        self._tail = b""
        # True once the HTTP parser reports a protocol upgrade.
        self._upgraded = False
        self._parser: Optional[HttpResponseParser] = None

        # Read-timeout bookkeeping; see _reschedule_timeout()/_drop_timeout().
        self._read_timeout: Optional[float] = None
        self._read_timeout_handle: Optional[asyncio.TimerHandle] = None

        self._timeout_ceil_threshold: Optional[float] = 5

        # Lazily-created future resolved in connection_lost(); see `closed`.
        self._closed: Union[None, asyncio.Future[None]] = None
        self._connection_lost_called = False

51 

52 @property 

53 def closed(self) -> Union[None, asyncio.Future[None]]: 

54 """Future that is set when the connection is closed. 

55 

56 This property returns a Future that will be completed when the connection 

57 is closed. The Future is created lazily on first access to avoid creating 

58 futures that will never be awaited. 

59 

60 Returns: 

61 - A Future[None] if the connection is still open or was closed after 

62 this property was accessed 

63 - None if connection_lost() was already called before this property 

64 was ever accessed (indicating no one is waiting for the closure) 

65 """ 

66 if self._closed is None and not self._connection_lost_called: 

67 self._closed = self._loop.create_future() 

68 return self._closed 

69 

    @property
    def upgraded(self) -> bool:
        # True once the HTTP parser reported a protocol upgrade
        # (assigned in data_received()).
        return self._upgraded

73 

74 @property 

75 def should_close(self) -> bool: 

76 return bool( 

77 self._should_close 

78 or (self._payload is not None and not self._payload.is_eof()) 

79 or self._upgraded 

80 or self._exception is not None 

81 or self._payload_parser is not None 

82 or self._buffer 

83 or self._tail 

84 ) 

85 

    def force_close(self) -> None:
        # Mark the connection as non-reusable; should_close becomes True.
        self._should_close = True

88 

89 def close(self) -> None: 

90 self._exception = None # Break cyclic references 

91 transport = self.transport 

92 if transport is not None: 

93 transport.close() 

94 self.transport = None 

95 self._payload = None 

96 self._drop_timeout() 

97 

98 def abort(self) -> None: 

99 self._exception = None # Break cyclic references 

100 transport = self.transport 

101 if transport is not None: 

102 transport.abort() 

103 self.transport = None 

104 self._payload = None 

105 self._drop_timeout() 

106 

107 def is_connected(self) -> bool: 

108 return self.transport is not None and not self.transport.is_closing() 

109 

    def connection_lost(self, exc: Optional[BaseException]) -> None:
        """Handle loss of the transport.

        ``exc`` is None for a clean close, otherwise the underlying
        transport error.  Resolves the ``closed`` future (if anyone
        created it), flushes the parsers, and translates the raw error
        into client-level exceptions for payload/queue readers.
        """
        self._connection_lost_called = True
        self._drop_timeout()

        original_connection_error = exc
        reraised_exc = original_connection_error

        connection_closed_cleanly = original_connection_error is None

        if self._closed is not None:
            # If someone is waiting for the closed future,
            # we should set it to None or an exception. If
            # self._closed is None, it means that
            # connection_lost() was called already
            # or nobody is waiting for it.
            if connection_closed_cleanly:
                set_result(self._closed, None)
            else:
                assert original_connection_error is not None
                set_exception(
                    self._closed,
                    ClientConnectionError(
                        f"Connection lost: {original_connection_error !s}",
                    ),
                    original_connection_error,
                )

        if self._payload_parser is not None:
            with suppress(Exception):  # FIXME: log this somehow?
                self._payload_parser.feed_eof()

        uncompleted = None
        if self._parser is not None:
            try:
                uncompleted = self._parser.feed_eof()
            except Exception as underlying_exc:
                # The HTTP parser rejected EOF: the response was cut
                # short.  Surface that on the payload stream so readers
                # fail instead of hanging.
                if self._payload is not None:
                    client_payload_exc_msg = (
                        f"Response payload is not completed: {underlying_exc !r}"
                    )
                    if not connection_closed_cleanly:
                        client_payload_exc_msg = (
                            f"{client_payload_exc_msg !s}. "
                            f"{original_connection_error !r}"
                        )
                    set_exception(
                        self._payload,
                        ClientPayloadError(client_payload_exc_msg),
                        underlying_exc,
                    )

        if not self.is_eof():
            # More data was still expected; translate the raw error into
            # a client-level exception for queue consumers.
            if isinstance(original_connection_error, OSError):
                reraised_exc = ClientOSError(*original_connection_error.args)
            if connection_closed_cleanly:
                reraised_exc = ServerDisconnectedError(uncompleted)
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            underlying_non_eof_exc = (
                _EXC_SENTINEL
                if connection_closed_cleanly
                else original_connection_error
            )
            assert underlying_non_eof_exc is not None
            assert reraised_exc is not None
            self.set_exception(reraised_exc, underlying_non_eof_exc)

        # Reset per-connection state; the handler must not be reused.
        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(reraised_exc)

184 

    def eof_received(self) -> None:
        # should call parser.feed_eof() most likely
        # Returning None (falsy) lets asyncio close the transport.
        self._drop_timeout()

188 

    def pause_reading(self) -> None:
        # No data can arrive while paused, so the read timeout must not
        # fire; it is re-armed in resume_reading().
        super().pause_reading()
        self._drop_timeout()

192 

    def resume_reading(self) -> None:
        # Re-arm the read timeout now that data may flow again.
        super().resume_reading()
        self._reschedule_timeout()

196 

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Record *exc* on the data queue and poison the connection.

        Marks the connection non-reusable and cancels any pending read
        timeout before delegating to the DataQueue implementation.
        """
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc, exc_cause)

205 

    def set_parser(self, parser: Any, payload: Any) -> None:
        """Install a custom payload parser (used for websocket upgrades).

        Bytes buffered in ``self._tail`` are replayed through
        data_received() so the new parser sees them immediately.
        """
        # TODO: actual types are:
        # parser: WebSocketReader
        # payload: WebSocketDataQueue
        # but they are not generic enough
        # Need an ABC for both types
        self._payload = payload
        self._payload_parser = parser

        # A custom parser manages its own liveness; stop the read timeout.
        self._drop_timeout()

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

220 

    def set_response_params(
        self,
        *,
        timer: Optional[BaseTimerContext] = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: Optional[float] = None,
        read_bufsize: int = 2**16,
        timeout_ceil_threshold: float = 5,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        max_headers: int = 128,
    ) -> None:
        """Configure and create the HTTP response parser.

        Keyword arguments mirror HttpResponseParser's configuration.
        ``skip_payload`` additionally makes data_received() substitute
        EMPTY_PAYLOAD for parsed bodies; ``read_timeout`` feeds the
        read-timeout machinery.  Bytes buffered in ``self._tail`` are
        replayed through the newly created parser.
        """
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout

        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
            max_line_size=max_line_size,
            max_field_size=max_field_size,
            max_headers=max_headers,
        )

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

258 

259 def _drop_timeout(self) -> None: 

260 if self._read_timeout_handle is not None: 

261 self._read_timeout_handle.cancel() 

262 self._read_timeout_handle = None 

263 

264 def _reschedule_timeout(self) -> None: 

265 timeout = self._read_timeout 

266 if self._read_timeout_handle is not None: 

267 self._read_timeout_handle.cancel() 

268 

269 if timeout: 

270 self._read_timeout_handle = self._loop.call_later( 

271 timeout, self._on_read_timeout 

272 ) 

273 else: 

274 self._read_timeout_handle = None 

275 

    def start_timeout(self) -> None:
        # Public hook to arm the read timeout for a new request cycle.
        self._reschedule_timeout()

278 

    @property
    def read_timeout(self) -> Optional[float]:
        # Seconds of read inactivity allowed; None/0 disables the timer.
        return self._read_timeout

282 

    @read_timeout.setter
    def read_timeout(self, read_timeout: Optional[float]) -> None:
        # Takes effect on the next _reschedule_timeout() call.
        self._read_timeout = read_timeout

286 

287 def _on_read_timeout(self) -> None: 

288 exc = SocketTimeoutError("Timeout on reading data from socket") 

289 self.set_exception(exc) 

290 if self._payload is not None: 

291 set_exception(self._payload, exc) 

292 

    def data_received(self, data: bytes) -> None:
        """Feed raw bytes from the transport into the active parser.

        Depending on connection state the bytes go to the custom payload
        parser (websocket), into ``self._tail`` (upgrade pending or no
        parser configured yet), or through the HTTP response parser,
        which may emit complete (message, payload) pairs onto the queue.
        """
        self._reschedule_timeout()

        if not data:
            return

        # custom payload parser - currently always WebSocketReader
        if self._payload_parser is not None:
            eof, tail = self._payload_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                # Leftover bytes belong to the next protocol state.
                if tail:
                    self.data_received(tail)
            return

        if self._upgraded or self._parser is None:
            # i.e. websocket connection, websocket parser is not set yet
            self._tail += data
            return

        # parse http messages
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except BaseException as underlying_exc:
            if self.transport is not None:
                # connection.release() could be called BEFORE
                # data_received(), the transport is already
                # closed in this case
                self.transport.close()
            # should_close is True after the call
            if isinstance(underlying_exc, HttpProcessingError):
                exc = HttpProcessingError(
                    code=underlying_exc.code,
                    message=underlying_exc.message,
                    headers=underlying_exc.headers,
                )
            else:
                exc = HttpProcessingError()
            self.set_exception(exc, underlying_exc)
            return

        self._upgraded = upgraded

        payload: Optional[StreamReader] = None
        for message, payload in messages:
            if message.should_close:
                self._should_close = True

            self._payload = payload

            # Substitute an empty body when the caller asked to skip it
            # or the status code forbids a body (e.g. 204, 304).
            if self._skip_payload or message.code in EMPTY_BODY_STATUS_CODES:
                self.feed_data((message, EMPTY_PAYLOAD), 0)
            else:
                self.feed_data((message, payload), 0)

        if payload is not None:
            # new message(s) was processed
            # register timeout handler unsubscribing
            # either on end-of-stream or immediately for
            # EMPTY_PAYLOAD
            if payload is not EMPTY_PAYLOAD:
                payload.on_eof(self._drop_timeout)
            else:
                self._drop_timeout()

        # After an upgrade, remaining bytes re-enter via the tail path.
        if upgraded and tail:
            self.data_received(tail)