Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_proto.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

191 statements  

1import asyncio 

2from contextlib import suppress 

3from typing import Callable, Protocol 

4 

5from ._websocket.reader import WebSocketDataQueue 

6from .base_protocol import BaseProtocol 

7from .client_exceptions import ( 

8 ClientConnectionError, 

9 ClientOSError, 

10 ClientPayloadError, 

11 ServerDisconnectedError, 

12 SocketTimeoutError, 

13) 

14from .helpers import ( 

15 _EXC_SENTINEL, 

16 DEFAULT_CHUNK_SIZE, 

17 EMPTY_BODY_STATUS_CODES, 

18 BaseTimerContext, 

19 ErrorableProtocol, 

20 set_exception, 

21 set_result, 

22) 

23from .http import HttpResponseParser, RawResponseMessage, WebSocketReader 

24from .http_exceptions import HttpProcessingError 

25from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader 

26 

27 

class _Payload(ErrorableProtocol, Protocol):
    """Structural type of the payload object a response handler tracks.

    Anything satisfying ``ErrorableProtocol`` that also offers ``is_eof()``
    matches this protocol.
    """

    def is_eof(self) -> bool:
        """Return ``True`` once the payload has reached end-of-stream."""

30 

31 

class ResponseHandler(BaseProtocol, DataQueue[tuple[RawResponseMessage, StreamReader]]):
    """Client-side protocol that turns transport bytes into response messages.

    Incoming bytes are routed either to an HTTP response parser (whose
    ``(message, payload)`` pairs are pushed into this object's data queue) or,
    once ``set_parser()`` has been called, to a custom payload parser
    (currently always a ``WebSocketReader``).  The handler also owns the
    socket read timeout and tracks whether the connection may be reused.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise both base classes and reset all per-connection state."""
        BaseProtocol.__init__(self, loop=loop, parser=None)
        DataQueue.__init__(self, loop)

        # Sticky flag: once set, the connection must not be reused.
        self._should_close = False

        # Payload / parser state for the response currently being read.
        self._payload: _Payload | None = None
        self._skip_payload = False
        self._payload_parser: WebSocketReader | None = None
        self._data_received_cb: Callable[[], None] | None = None

        self._timer = None
        # Bytes received while no parser was able to consume them.
        self._tail = b""

        # Read timeout value and the loop handle of the armed timer, if any.
        self._read_timeout: float | None = None
        self._read_timeout_handle: asyncio.TimerHandle | None = None

        self._timeout_ceil_threshold: float | None = 5

        # Lazily created "connection closed" future; see the ``closed`` property.
        self._closed: None | asyncio.Future[None] = None
        self._connection_lost_called = False

    @property
    def closed(self) -> None | asyncio.Future[None]:
        """Future resolved when the connection goes away.

        The future is only created on first access so that connections nobody
        waits on never allocate one.

        Returns:
            - a ``Future[None]`` when the connection is still open, or when it
              was closed after this property had already been read;
            - ``None`` when ``connection_lost()`` ran before the property was
              ever read, i.e. nobody was waiting for the closure.
        """
        future = self._closed
        if future is not None or self._connection_lost_called:
            return future
        future = self._closed = self._loop.create_future()
        return future

    @property
    def upgraded(self) -> bool:
        """Whether the parser reported a protocol upgrade on this connection."""
        return self._upgraded

    @property
    def should_close(self) -> bool:
        """Whether the connection is unfit for reuse.

        True when closing was requested, a payload is still unfinished, the
        connection was upgraded, an exception is recorded, a custom payload
        parser is installed, or unread data remains in the queue or the tail.
        """
        if self._should_close:
            return True
        payload = self._payload
        if payload is not None and not payload.is_eof():
            return True
        if self._upgraded or self._exception is not None:
            return True
        if self._payload_parser is not None:
            return True
        return bool(self._buffer or self._tail)

    def force_close(self) -> None:
        """Mark the connection as not reusable."""
        self._should_close = True

    def close(self) -> None:
        """Close the transport gracefully and detach from it.

        Does nothing beyond clearing the stored exception when there is no
        transport.
        """
        self._exception = None  # Break cyclic references
        transport = self.transport
        if transport is None:
            return
        transport.close()
        self.transport = None
        self._payload = None
        self._drop_timeout()

    def abort(self) -> None:
        """Abort the transport and detach from it.

        Same bookkeeping as ``close()``, but calls ``transport.abort()``.
        """
        self._exception = None  # Break cyclic references
        transport = self.transport
        if transport is None:
            return
        transport.abort()
        self.transport = None
        self._payload = None
        self._drop_timeout()

    def is_connected(self) -> bool:
        """Return ``True`` while a transport is attached and not closing."""
        transport = self.transport
        return transport is not None and not transport.is_closing()

    def connection_lost(self, exc: BaseException | None) -> None:
        """Propagate the end of the connection to every interested party.

        Resolves the ``closed`` future (if one was handed out), feeds EOF to
        the installed parsers, records an exception on this queue when data
        was still expected, resets parser/payload state and finally delegates
        to the base protocol.

        Args:
            exc: the error that terminated the connection, or ``None`` when it
                was closed cleanly.
        """
        self._connection_lost_called = True
        self._drop_timeout()

        cause = exc
        reraised_exc = cause
        clean_close = cause is None

        closed_future = self._closed
        if closed_future is not None:
            # Somebody obtained the future via ``closed``: complete it with
            # ``None`` on a clean close, or with a wrapped error otherwise.
            # ``self._closed`` being ``None`` means nobody is waiting for it.
            if clean_close:
                set_result(closed_future, None)
            else:
                assert cause is not None
                set_exception(
                    closed_future,
                    ClientConnectionError(f"Connection lost: {cause!s}"),
                    cause,
                )

        custom_parser = self._payload_parser
        if custom_parser is not None:
            with suppress(Exception):  # FIXME: log this somehow?
                custom_parser.feed_eof()

        uncompleted = None
        http_parser = self._parser
        if http_parser is not None:
            try:
                uncompleted = http_parser.feed_eof()
            except Exception as parse_error:
                payload = self._payload
                if payload is not None:
                    # The body was cut short: tell the payload reader, and
                    # include the transport error when there was one.
                    payload_msg = (
                        f"Response payload is not completed: {parse_error!r}"
                    )
                    if not clean_close:
                        payload_msg = f"{payload_msg!s}. {cause!r}"
                    set_exception(
                        payload,
                        ClientPayloadError(payload_msg),
                        parse_error,
                    )

        if not self.is_eof():
            # A clean close has ``cause is None``, so the two rewrites of
            # ``reraised_exc`` below can never both apply.
            if clean_close:
                reraised_exc = ServerDisconnectedError(uncompleted)
                underlying_non_eof_exc = _EXC_SENTINEL
            else:
                if isinstance(cause, OSError):
                    reraised_exc = ClientOSError(*cause.args)
                underlying_non_eof_exc = cause
            assert underlying_non_eof_exc is not None
            assert reraised_exc is not None
            # set_exception() also sets self._should_close as a side effect;
            # it is set again unconditionally below.
            self.set_exception(reraised_exc, underlying_non_eof_exc)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(reraised_exc)

    def eof_received(self) -> None:
        """Handle the peer's EOF by cancelling the read timeout."""
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        """Pause reading from the transport and cancel the read timeout."""
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self, resume_parser: bool = True) -> None:
        """Resume reading from the transport and re-arm the read timeout."""
        super().resume_reading(resume_parser)
        self._reschedule_timeout()

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Record *exc* on the queue and mark the connection as not reusable.

        Args:
            exc: the exception (or exception type) to record.
            exc_cause: passed through unchanged to the base implementation.
        """
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc, exc_cause)

    def set_parser(
        self,
        parser: WebSocketReader,
        payload: WebSocketDataQueue,
        data_received_cb: Callable[[], None] | None = None,
    ) -> None:
        """Install a custom payload parser for all further incoming data.

        Args:
            parser: parser that will receive every subsequent chunk.
            payload: queue tracked as the current payload.
            data_received_cb: optional callback invoked before each chunk is
                handed to *parser*.
        """
        self._payload = payload
        self._payload_parser = parser
        self._data_received_cb = data_received_cb

        self._drop_timeout()

        # Replay bytes that arrived before the parser was installed.
        pending = self._tail
        if pending:
            self._tail = b""
            self.data_received(pending)

    def set_response_params(
        self,
        *,
        timer: BaseTimerContext | None = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: float | None = None,
        read_bufsize: int = DEFAULT_CHUNK_SIZE,
        timeout_ceil_threshold: float = 5,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        max_headers: int = 128,
    ) -> None:
        """Configure the handler for the next response and create its parser.

        ``skip_payload``, ``read_timeout`` and ``timeout_ceil_threshold`` are
        stored on the handler; the remaining arguments are forwarded to a new
        ``HttpResponseParser`` (with ``response_with_body`` being the negation
        of ``skip_payload``).  Buffered tail bytes are then replayed through
        ``data_received()``.
        """
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout

        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
            max_line_size=max_line_size,
            max_field_size=max_field_size,
            max_headers=max_headers,
        )

        # Replay bytes that arrived before the parser existed.
        pending = self._tail
        if pending:
            self._tail = b""
            self.data_received(pending)

    def _drop_timeout(self) -> None:
        """Cancel the armed read-timeout timer, if there is one."""
        handle = self._read_timeout_handle
        if handle is None:
            return
        handle.cancel()
        self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        """Cancel any armed timer and arm a new one if a timeout is set."""
        previous = self._read_timeout_handle
        if previous is not None:
            previous.cancel()

        timeout = self._read_timeout
        # A falsy timeout (``None`` or ``0``) means no timer is armed.
        self._read_timeout_handle = (
            self._loop.call_later(timeout, self._on_read_timeout) if timeout else None
        )

    def start_timeout(self) -> None:
        """Arm the read timeout using the currently configured value."""
        self._reschedule_timeout()

    @property
    def read_timeout(self) -> float | None:
        """Currently configured read timeout, or ``None``."""
        return self._read_timeout

    @read_timeout.setter
    def read_timeout(self, read_timeout: float | None) -> None:
        # Only the stored value changes; an already armed timer is untouched.
        self._read_timeout = read_timeout

    def _on_read_timeout(self) -> None:
        """Timer callback: fail the queue and the active payload with a timeout."""
        timeout_error = SocketTimeoutError("Timeout on reading data from socket")
        self.set_exception(timeout_error)
        if self._payload is not None:
            set_exception(self._payload, timeout_error)

    def data_received(self, data: bytes) -> None:
        """Dispatch a chunk of bytes to whichever parser is active.

        Args:
            data: bytes from the transport, or replayed tail bytes.  An empty
                chunk does not re-arm the read timeout.
        """
        # If no data, then we are resuming decompression. We haven't received
        # data from the socket, so we can avoid the reschedule overhead.
        if data:
            self._reschedule_timeout()

        # custom payload parser - currently always WebSocketReader
        custom_parser = self._payload_parser
        if custom_parser is not None:
            notify = self._data_received_cb
            if notify is not None:
                notify()
            eof, tail = custom_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if tail:
                    self.data_received(tail)
            return

        if self._upgraded or self._parser is None:
            # i.e. websocket connection, websocket parser is not set yet
            self._tail += data
            return

        # parse http messages
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except Exception as parse_error:
            if self.transport is not None:
                # connection.release() could be called BEFORE
                # data_received(), the transport is already
                # closed in this case
                self.transport.close()
            # should_close is True after the call
            if isinstance(parse_error, HttpProcessingError):
                http_error = HttpProcessingError(
                    code=parse_error.code,
                    message=parse_error.message,
                    headers=parse_error.headers,
                )
            else:
                http_error = HttpProcessingError()
            self.set_exception(http_error, parse_error)
            return

        self._upgraded = upgraded

        payload: StreamReader | None = None
        for message, payload in messages:
            if message.should_close:
                self._should_close = True

            self._payload = payload

            body_is_empty = (
                self._skip_payload or message.code in EMPTY_BODY_STATUS_CODES
            )
            self.feed_data((message, EMPTY_PAYLOAD if body_is_empty else payload))

        # ``payload`` is now the payload of the last parsed message, if any:
        # stop the read timeout immediately for EMPTY_PAYLOAD, otherwise when
        # that payload reaches end-of-stream.
        if payload is EMPTY_PAYLOAD:
            self._drop_timeout()
        elif payload is not None:
            payload.on_eof(self._drop_timeout)

        if upgraded and tail:
            self.data_received(tail)

364 payload.on_eof(self._drop_timeout) 

365 else: 

366 self._drop_timeout() 

367 

368 if upgraded and tail: 

369 self.data_received(tail)