Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_proto.py: 19%

155 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-26 06:16 +0000

1import asyncio 

2from contextlib import suppress 

3from typing import Any, Optional, Tuple 

4 

5from .base_protocol import BaseProtocol 

6from .client_exceptions import ( 

7 ClientOSError, 

8 ClientPayloadError, 

9 ServerDisconnectedError, 

10 SocketTimeoutError, 

11) 

12from .helpers import ( 

13 BaseTimerContext, 

14 set_exception, 

15 set_result, 

16 status_code_must_be_empty_body, 

17) 

18from .http import HttpResponseParser, RawResponseMessage, WebSocketReader 

19from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader 

20 

21 

class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
    """Helper class to adapt between Protocol and StreamReader.

    Bytes handed over by the transport are fed to an ``HttpResponseParser``;
    every parsed ``(message, payload)`` pair is queued on this object through
    ``feed_data``.  Once a custom payload parser has been installed with
    ``set_parser`` the incoming bytes are routed to that parser instead.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise both base classes and reset all per-connection state."""
        BaseProtocol.__init__(self, loop=loop)
        DataQueue.__init__(self, loop)

        self._should_close = False

        # Payload of the response currently being received, if any.
        self._payload: Optional[StreamReader] = None
        self._skip_payload = False
        # Custom parser installed by set_parser(); when set, data_received()
        # feeds it instead of the HTTP parser.
        self._payload_parser: Optional[WebSocketReader] = None

        self._timer = None

        # Bytes received while no parser was able to consume them.
        self._tail = b""
        self._upgraded = False
        self._parser: Optional[HttpResponseParser] = None

        self._read_timeout: Optional[float] = None
        self._read_timeout_handle: Optional[asyncio.TimerHandle] = None

        self._timeout_ceil_threshold: Optional[float] = 5

        # Resolved (or failed) from connection_lost().
        self.closed: asyncio.Future[None] = self._loop.create_future()

    @property
    def upgraded(self) -> bool:
        """Return the ``upgraded`` flag last reported by the HTTP parser."""
        return self._upgraded

    @property
    def should_close(self) -> bool:
        """Return True when the connection must not be reused.

        That is the case when the current payload has not reached EOF, or
        when any of the following holds: a close was requested, the
        connection was upgraded, an exception is stored on the queue, a
        custom payload parser is installed, queued items remain, or
        unparsed bytes are left in the tail buffer.
        """
        if self._payload is not None and not self._payload.is_eof():
            return True

        return (
            self._should_close
            or self._upgraded
            or self.exception() is not None
            or self._payload_parser is not None
            or len(self) > 0
            or bool(self._tail)
        )

    def force_close(self) -> None:
        """Mark the connection so that ``should_close`` reports True."""
        self._should_close = True

    def close(self) -> None:
        """Close the transport (if any), drop the payload and the read timeout."""
        transport = self.transport
        if transport is not None:
            transport.close()
            self.transport = None
            self._payload = None
            self._drop_timeout()

    def is_connected(self) -> bool:
        """Return True while a transport is attached and is not closing."""
        return self.transport is not None and not self.transport.is_closing()

    def connection_lost(self, exc: Optional[BaseException]) -> None:
        """Finalise the handler after the transport has gone away.

        ``exc`` is the loss reason passed in by the caller; ``None`` is
        treated as a clean close.  The ``closed`` future is completed with
        it, both parsers are told about EOF, and -- unless the queue is
        already at EOF -- an exception describing the loss is stored on the
        queue: ``ClientOSError`` for an ``OSError``,
        ``ServerDisconnectedError`` for a clean close, otherwise ``exc``
        itself.  The same exception is forwarded to the base class.

        A failure of the HTTP parser's ``feed_eof`` is reported on the
        current payload as ``ClientPayloadError`` only; it deliberately does
        not replace the connection-loss reason.
        """
        self._drop_timeout()

        if exc is not None:
            set_exception(self.closed, exc)
        else:
            set_result(self.closed, None)

        if self._payload_parser is not None:
            # Best effort: the connection is gone whatever the parser says.
            with suppress(Exception):
                self._payload_parser.feed_eof()

        uncompleted = None
        if self._parser is not None:
            try:
                uncompleted = self._parser.feed_eof()
            except Exception as underlying_exc:
                if self._payload is not None:
                    # Bound to its own name on purpose: rebinding ``exc``
                    # here would hide the real loss reason from the
                    # translation below and from the base class.
                    payload_exc = ClientPayloadError(
                        "Response payload is not completed"
                    )
                    payload_exc.__cause__ = underlying_exc
                    self._payload.set_exception(payload_exc)

        reraised_exc: Optional[BaseException] = exc
        if not self.is_eof():
            if exc is None:
                reraised_exc = ServerDisconnectedError(uncompleted)
            elif isinstance(exc, OSError):
                reraised_exc = ClientOSError(*exc.args)
            else:
                reraised_exc = exc
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            self.set_exception(reraised_exc)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(reraised_exc)

    def eof_received(self) -> None:
        """Cancel the pending read timeout when the peer signals EOF."""
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        """Pause reading and cancel the pending read timeout."""
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self) -> None:
        """Resume reading and re-arm the read timeout."""
        super().resume_reading()
        self._reschedule_timeout()

    def set_exception(self, exc: BaseException) -> None:
        """Store ``exc`` on the queue, flag the connection for closing and
        cancel the pending read timeout."""
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc)

    def set_parser(self, parser: Any, payload: Any) -> None:
        """Install a custom payload parser together with its payload.

        The pending read timeout is cancelled and any bytes buffered in the
        tail are replayed through ``data_received``.
        """
        # TODO: actual types are:
        #   parser: WebSocketReader
        #   payload: FlowControlDataQueue
        # but they are not generic enough
        # Need an ABC for both types
        self._payload = payload
        self._payload_parser = parser

        self._drop_timeout()

        if self._tail:
            # Clear the buffer before replaying so the bytes are seen once.
            data, self._tail = self._tail, b""
            self.data_received(data)

    def set_response_params(
        self,
        *,
        timer: Optional[BaseTimerContext] = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: Optional[float] = None,
        read_bufsize: int = 2**16,
        timeout_ceil_threshold: float = 5,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
    ) -> None:
        """Configure the handler for the next response and create its parser.

        ``skip_payload``, ``read_timeout`` and ``timeout_ceil_threshold`` are
        stored on the handler; ``skip_payload`` is also passed to the parser,
        inverted, as ``response_with_body``.  The remaining arguments are
        passed to ``HttpResponseParser``.  Bytes buffered in the tail are
        then replayed through ``data_received``.
        """
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout

        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
            max_line_size=max_line_size,
            max_field_size=max_field_size,
        )

        if self._tail:
            # Clear the buffer before replaying so the bytes are seen once.
            data, self._tail = self._tail, b""
            self.data_received(data)

    def _drop_timeout(self) -> None:
        """Cancel and forget the pending read-timeout handle, if any."""
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()
            self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        """Cancel the pending read timeout and schedule a fresh one.

        A falsy ``_read_timeout`` (``None`` or ``0``) leaves no timeout armed.
        """
        timeout = self._read_timeout
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()

        if timeout:
            self._read_timeout_handle = self._loop.call_later(
                timeout, self._on_read_timeout
            )
        else:
            self._read_timeout_handle = None

    def start_timeout(self) -> None:
        """Arm the read timeout (public entry point)."""
        self._reschedule_timeout()

    def _on_read_timeout(self) -> None:
        """Fail the queue and the current payload with ``SocketTimeoutError``."""
        exc = SocketTimeoutError("Timeout on reading data from socket")
        self.set_exception(exc)
        if self._payload is not None:
            self._payload.set_exception(exc)

    def data_received(self, data: bytes) -> None:
        """Route bytes received from the transport to the active parser.

        Every call re-arms the read timeout first.  Empty ``data`` is
        otherwise ignored.  With a custom payload parser installed the bytes
        go to it; without any usable HTTP parser (or after an upgrade) they
        are appended to the tail buffer; otherwise they are parsed as HTTP
        and each resulting message is queued.
        """
        self._reschedule_timeout()

        if not data:
            return

        # custom payload parser
        if self._payload_parser is not None:
            eof, tail = self._payload_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if tail:
                    # The parser is now uninstalled, so the leftover bytes
                    # take the regular path below.
                    self.data_received(tail)
            return
        else:
            if self._upgraded or self._parser is None:
                # i.e. websocket connection, websocket parser is not set yet
                self._tail += data
            else:
                # parse http messages
                try:
                    messages, upgraded, tail = self._parser.feed_data(data)
                except BaseException as exc:
                    if self.transport is not None:
                        # connection.release() could be called BEFORE
                        # data_received(), the transport is already
                        # closed in this case
                        self.transport.close()
                    # should_close is True after the call
                    self.set_exception(exc)
                    return

                self._upgraded = upgraded

                payload: Optional[StreamReader] = None
                for message, payload in messages:
                    if message.should_close:
                        self._should_close = True

                    self._payload = payload

                    if self._skip_payload or status_code_must_be_empty_body(
                        message.code
                    ):
                        self.feed_data((message, EMPTY_PAYLOAD), 0)
                    else:
                        self.feed_data((message, payload), 0)
                # ``payload`` is the one of the last message parsed, or None
                # when this chunk produced no message at all.
                if payload is not None:
                    # new message(s) was processed
                    # register timeout handler unsubscribing
                    # either on end-of-stream or immediately for
                    # EMPTY_PAYLOAD
                    if payload is not EMPTY_PAYLOAD:
                        payload.on_eof(self._drop_timeout)
                    else:
                        self._drop_timeout()

                if tail:
                    if upgraded:
                        self.data_received(tail)
                    else:
                        self._tail = tail

275 else: 

276 self._tail = tail