Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/client_proto.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

190 statements  

1import asyncio 

2from contextlib import suppress 

3from typing import Any, Callable 

4 

5from .base_protocol import BaseProtocol 

6from .client_exceptions import ( 

7 ClientConnectionError, 

8 ClientOSError, 

9 ClientPayloadError, 

10 ServerDisconnectedError, 

11 SocketTimeoutError, 

12) 

13from .helpers import ( 

14 _EXC_SENTINEL, 

15 DEFAULT_CHUNK_SIZE, 

16 EMPTY_BODY_STATUS_CODES, 

17 BaseTimerContext, 

18 set_exception, 

19 set_result, 

20) 

21from .http import HttpResponseParser, RawResponseMessage 

22from .http_exceptions import HttpProcessingError 

23from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader 

24 

25 

class ResponseHandler(BaseProtocol, DataQueue[tuple[RawResponseMessage, StreamReader]]):
    """Adapter between the protocol callbacks and StreamReader payloads.

    Parsed ``(RawResponseMessage, StreamReader)`` pairs are pushed into the
    ``DataQueue`` side of this object through ``feed_data``. Bytes that
    arrive while the connection is upgraded or no HTTP parser is installed
    are accumulated in ``_tail`` and replayed as soon as a parser is set.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise both base classes and reset all per-connection state."""
        BaseProtocol.__init__(self, loop=loop, parser=None)
        DataQueue.__init__(self, loop)

        # Connection re-use flag; see the ``should_close`` property.
        self._should_close = False

        # Payload currently being read and the way it is parsed.
        self._payload: StreamReader | None = None
        self._skip_payload = False
        self._payload_parser = None
        self._data_received_cb: Callable[[], None] | None = None

        self._timer = None
        # Bytes received before a parser able to consume them was installed.
        self._tail = b""

        # Read timeout in seconds and the pending timer created for it.
        self._read_timeout: float | None = None
        self._read_timeout_handle: asyncio.TimerHandle | None = None

        self._timeout_ceil_threshold: float | None = 5

        # Lazily created by the ``closed`` property.
        self._closed: None | asyncio.Future[None] = None
        self._connection_lost_called = False

    @property
    def closed(self) -> None | asyncio.Future[None]:
        """Future that is resolved when the connection is closed.

        The future is created on first access, so nothing is allocated when
        nobody waits for the closure.

        Returns:
          - A Future[None] if the connection is still open, or if it was
            closed after this property had already been accessed
          - None if connection_lost() ran before the property was ever
            accessed (nobody was waiting for the closure)
        """
        if self._closed is not None or self._connection_lost_called:
            return self._closed
        self._closed = self._loop.create_future()
        return self._closed

    @property
    def upgraded(self) -> bool:
        """Value of the ``_upgraded`` flag."""
        return self._upgraded

    @property
    def should_close(self) -> bool:
        """Whether any recorded state asks for the connection to be closed.

        True when closing was requested explicitly, the current payload has
        not reached EOF, the connection is upgraded, an exception is stored,
        a custom payload parser is installed, or unconsumed data remains in
        the queue buffer or in the tail.
        """
        if self._should_close:
            return True
        if self._payload is not None and not self._payload.is_eof():
            return True
        if self._upgraded or self._exception is not None:
            return True
        if self._payload_parser is not None:
            return True
        return bool(self._buffer or self._tail)

    def force_close(self) -> None:
        """Flag the connection so that ``should_close`` reports True."""
        self._should_close = True

    def close(self) -> None:
        """Close the transport, if any, and drop the state bound to it."""
        self._exception = None  # Break cyclic references
        transport = self.transport
        if transport is None:
            return
        transport.close()
        self.transport = None
        self._payload = None
        self._drop_timeout()

    def abort(self) -> None:
        """Abort the transport, if any, and drop the state bound to it."""
        self._exception = None  # Break cyclic references
        transport = self.transport
        if transport is None:
            return
        transport.abort()
        self.transport = None
        self._payload = None
        self._drop_timeout()

    def is_connected(self) -> bool:
        """Return True if a transport is attached and it is not closing."""
        if self.transport is None:
            return False
        return not self.transport.is_closing()

    def connection_lost(self, exc: BaseException | None) -> None:
        """Handle loss of the connection.

        In order: resolve the ``closed`` future if one was created, feed EOF
        to the custom payload parser and to the HTTP parser, store an
        exception on the queue when it has not reached EOF, reset the parsing
        state and finally delegate to the base implementation.

        ``exc`` is ``None`` when the connection was closed cleanly.
        """
        self._connection_lost_called = True
        self._drop_timeout()

        original_connection_error = exc
        closed_cleanly = original_connection_error is None
        # Exception handed to the base class; replaced below when a more
        # specific client exception applies.
        forwarded_exc = original_connection_error

        closed_fut = self._closed
        if closed_fut is not None:
            # The future exists only if someone accessed ``closed`` before
            # the connection was lost, i.e. someone may be waiting for it.
            # Resolve it with None or with an exception.
            if closed_cleanly:
                set_result(closed_fut, None)
            else:
                assert original_connection_error is not None
                set_exception(
                    closed_fut,
                    ClientConnectionError(
                        f"Connection lost: {original_connection_error !s}",
                    ),
                    original_connection_error,
                )

        if self._payload_parser is not None:
            with suppress(Exception):  # FIXME: log this somehow?
                self._payload_parser.feed_eof()

        unfinished = None
        if self._parser is not None:
            try:
                unfinished = self._parser.feed_eof()
            except Exception as underlying_exc:
                current_payload = self._payload
                if current_payload is not None:
                    client_payload_exc_msg = (
                        f"Response payload is not completed: {underlying_exc !r}"
                    )
                    if not closed_cleanly:
                        client_payload_exc_msg = (
                            f"{client_payload_exc_msg !s}. "
                            f"{original_connection_error !r}"
                        )
                    set_exception(
                        current_payload,
                        ClientPayloadError(client_payload_exc_msg),
                        underlying_exc,
                    )

        if not self.is_eof():
            if closed_cleanly:
                forwarded_exc = ServerDisconnectedError(unfinished)
                cause = _EXC_SENTINEL
            else:
                if isinstance(original_connection_error, OSError):
                    forwarded_exc = ClientOSError(*original_connection_error.args)
                cause = original_connection_error
            assert cause is not None
            assert forwarded_exc is not None
            # set_exception() also sets self._should_close as a side effect;
            # it is set unconditionally right below anyway.
            self.set_exception(forwarded_exc, cause)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(forwarded_exc)

    def eof_received(self) -> None:
        """Cancel the pending read timeout when the peer signals EOF."""
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        """Pause reading via the base class, then cancel the read timeout."""
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self, resume_parser: bool = True) -> None:
        """Resume reading via the base class, then re-arm the read timeout."""
        super().resume_reading(resume_parser)
        self._reschedule_timeout()

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store ``exc`` on the queue and mark the connection for closing.

        The pending read timeout is cancelled before delegating to the base
        implementation.
        """
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc, exc_cause)

    def set_parser(
        self,
        parser: Any,
        payload: Any,
        data_received_cb: Callable[[], None] | None = None,
    ) -> None:
        """Install a custom payload parser and replay any buffered tail.

        ``data_received_cb``, when given, is invoked by ``data_received``
        before each chunk is handed to ``parser``.
        """
        # TODO: actual types are:
        #   parser: WebSocketReader
        #   payload: WebSocketDataQueue
        # but they are not generic enough
        # Need an ABC for both types
        self._payload = payload
        self._payload_parser = parser
        self._data_received_cb = data_received_cb

        self._drop_timeout()

        buffered = self._tail
        if buffered:
            self._tail = b""
            self.data_received(buffered)

    def set_response_params(
        self,
        *,
        timer: BaseTimerContext | None = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: float | None = None,
        read_bufsize: int = DEFAULT_CHUNK_SIZE,
        timeout_ceil_threshold: float = 5,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        max_headers: int = 128,
    ) -> None:
        """Configure response handling and create a new HTTP response parser.

        ``skip_payload``, ``read_timeout`` and ``timeout_ceil_threshold`` are
        stored on the instance; the remaining options are forwarded to
        ``HttpResponseParser``. Any buffered tail is replayed through
        ``data_received`` afterwards.
        """
        self._skip_payload = skip_payload
        self._read_timeout = read_timeout
        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
            max_line_size=max_line_size,
            max_field_size=max_field_size,
            max_headers=max_headers,
        )

        buffered = self._tail
        if buffered:
            self._tail = b""
            self.data_received(buffered)

    def _drop_timeout(self) -> None:
        """Cancel and forget the pending read-timeout timer, if any."""
        handle = self._read_timeout_handle
        if handle is None:
            return
        handle.cancel()
        self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        """Cancel the pending timer and start a new one if a timeout is set."""
        timeout = self._read_timeout
        stale_handle = self._read_timeout_handle
        if stale_handle is not None:
            stale_handle.cancel()

        # A falsy timeout (None or 0) leaves no timer armed.
        self._read_timeout_handle = (
            self._loop.call_later(timeout, self._on_read_timeout) if timeout else None
        )

    def start_timeout(self) -> None:
        """Arm (or re-arm) the read timeout."""
        self._reschedule_timeout()

    @property
    def read_timeout(self) -> float | None:
        """Currently configured read timeout."""
        return self._read_timeout

    @read_timeout.setter
    def read_timeout(self, read_timeout: float | None) -> None:
        # Only the stored value changes; an already armed timer is untouched.
        self._read_timeout = read_timeout

    def _on_read_timeout(self) -> None:
        """Timer callback: report the read timeout on the queue and payload."""
        timeout_exc = SocketTimeoutError("Timeout on reading data from socket")
        self.set_exception(timeout_exc)
        if self._payload is not None:
            set_exception(self._payload, timeout_exc)

    def data_received(self, data: bytes) -> None:
        """Route incoming bytes to the active parser.

        Bytes go to the custom payload parser when one is installed. If the
        connection is upgraded or no HTTP parser exists yet, they are
        appended to the tail. Otherwise they are parsed as HTTP and every
        resulting message is fed into the queue.
        """
        # If no data, then we are resuming decompression. We haven't received
        # data from the socket, so we can avoid the reschedule overhead.
        if data:
            self._reschedule_timeout()

        # custom payload parser - currently always WebSocketReader
        if self._payload_parser is not None:
            notify = self._data_received_cb
            if notify is not None:
                notify()
            finished, leftover = self._payload_parser.feed_data(data)
            if finished:
                self._payload = None
                self._payload_parser = None

                if leftover:
                    self.data_received(leftover)
            return

        if self._upgraded or self._parser is None:
            # i.e. websocket connection, websocket parser is not set yet
            self._tail += data
            return

        # parse http messages
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except BaseException as underlying_exc:
            if self.transport is not None:
                # connection.release() could be called BEFORE
                # data_received(), the transport is already
                # closed in this case
                self.transport.close()
            if not isinstance(underlying_exc, Exception):
                raise
            # should_close is True after the set_exception() call below
            wrapped_exc = (
                HttpProcessingError(
                    code=underlying_exc.code,
                    message=underlying_exc.message,
                    headers=underlying_exc.headers,
                )
                if isinstance(underlying_exc, HttpProcessingError)
                else HttpProcessingError()
            )
            self.set_exception(wrapped_exc, underlying_exc)
            return

        self._upgraded = upgraded

        last_payload: StreamReader | None = None
        for message, message_payload in messages:
            last_payload = message_payload
            if message.should_close:
                self._should_close = True

            self._payload = message_payload

            without_body = (
                self._skip_payload or message.code in EMPTY_BODY_STATUS_CODES
            )
            self.feed_data(
                (message, EMPTY_PAYLOAD if without_body else message_payload), 0
            )

        if last_payload is not None:
            # new message(s) was processed
            # unsubscribe the timeout handler either on end-of-stream
            # or immediately for EMPTY_PAYLOAD
            if last_payload is EMPTY_PAYLOAD:
                self._drop_timeout()
            else:
                last_payload.on_eof(self._drop_timeout)

        if upgraded and tail:
            self.data_received(tail)