Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_proto.py: 19%

149 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1import asyncio 

2from contextlib import suppress 

3from typing import Any, Optional, Tuple 

4 

5from .base_protocol import BaseProtocol 

6from .client_exceptions import ( 

7 ClientOSError, 

8 ClientPayloadError, 

9 ServerDisconnectedError, 

10 ServerTimeoutError, 

11) 

12from .helpers import BaseTimerContext, status_code_must_be_empty_body 

13from .http import HttpResponseParser, RawResponseMessage 

14from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader 

15 

16 

17class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]): 

18 """Helper class to adapt between Protocol and StreamReader.""" 

19 

20 def __init__(self, loop: asyncio.AbstractEventLoop) -> None: 

21 BaseProtocol.__init__(self, loop=loop) 

22 DataQueue.__init__(self, loop) 

23 

24 self._should_close = False 

25 

26 self._payload: Optional[StreamReader] = None 

27 self._skip_payload = False 

28 self._payload_parser = None 

29 

30 self._timer = None 

31 

32 self._tail = b"" 

33 self._upgraded = False 

34 self._parser: Optional[HttpResponseParser] = None 

35 

36 self._read_timeout: Optional[float] = None 

37 self._read_timeout_handle: Optional[asyncio.TimerHandle] = None 

38 

39 self._timeout_ceil_threshold: Optional[float] = 5 

40 

41 @property 

42 def upgraded(self) -> bool: 

43 return self._upgraded 

44 

45 @property 

46 def should_close(self) -> bool: 

47 if self._payload is not None and not self._payload.is_eof() or self._upgraded: 

48 return True 

49 

50 return ( 

51 self._should_close 

52 or self._upgraded 

53 or self.exception() is not None 

54 or self._payload_parser is not None 

55 or len(self) > 0 

56 or bool(self._tail) 

57 ) 

58 

59 def force_close(self) -> None: 

60 self._should_close = True 

61 

62 def close(self) -> None: 

63 transport = self.transport 

64 if transport is not None: 

65 transport.close() 

66 self.transport = None 

67 self._payload = None 

68 self._drop_timeout() 

69 

70 def is_connected(self) -> bool: 

71 return self.transport is not None and not self.transport.is_closing() 

72 

73 def connection_lost(self, exc: Optional[BaseException]) -> None: 

74 self._drop_timeout() 

75 

76 if self._payload_parser is not None: 

77 with suppress(Exception): 

78 self._payload_parser.feed_eof() 

79 

80 uncompleted = None 

81 if self._parser is not None: 

82 try: 

83 uncompleted = self._parser.feed_eof() 

84 except Exception: 

85 if self._payload is not None: 

86 self._payload.set_exception( 

87 ClientPayloadError("Response payload is not completed") 

88 ) 

89 

90 if not self.is_eof(): 

91 if isinstance(exc, OSError): 

92 exc = ClientOSError(*exc.args) 

93 if exc is None: 

94 exc = ServerDisconnectedError(uncompleted) 

95 # assigns self._should_close to True as side effect, 

96 # we do it anyway below 

97 self.set_exception(exc) 

98 

99 self._should_close = True 

100 self._parser = None 

101 self._payload = None 

102 self._payload_parser = None 

103 self._reading_paused = False 

104 

105 super().connection_lost(exc) 

106 

107 def eof_received(self) -> None: 

108 # should call parser.feed_eof() most likely 

109 self._drop_timeout() 

110 

111 def pause_reading(self) -> None: 

112 super().pause_reading() 

113 self._drop_timeout() 

114 

115 def resume_reading(self) -> None: 

116 super().resume_reading() 

117 self._reschedule_timeout() 

118 

119 def set_exception(self, exc: BaseException) -> None: 

120 self._should_close = True 

121 self._drop_timeout() 

122 super().set_exception(exc) 

123 

124 def set_parser(self, parser: Any, payload: Any) -> None: 

125 # TODO: actual types are: 

126 # parser: WebSocketReader 

127 # payload: FlowControlDataQueue 

128 # but they are not generi enough 

129 # Need an ABC for both types 

130 self._payload = payload 

131 self._payload_parser = parser 

132 

133 self._drop_timeout() 

134 

135 if self._tail: 

136 data, self._tail = self._tail, b"" 

137 self.data_received(data) 

138 

139 def set_response_params( 

140 self, 

141 *, 

142 timer: Optional[BaseTimerContext] = None, 

143 skip_payload: bool = False, 

144 read_until_eof: bool = False, 

145 auto_decompress: bool = True, 

146 read_timeout: Optional[float] = None, 

147 read_bufsize: int = 2**16, 

148 timeout_ceil_threshold: float = 5, 

149 max_line_size: int = 8190, 

150 max_field_size: int = 8190, 

151 ) -> None: 

152 self._skip_payload = skip_payload 

153 

154 self._read_timeout = read_timeout 

155 

156 self._timeout_ceil_threshold = timeout_ceil_threshold 

157 

158 self._parser = HttpResponseParser( 

159 self, 

160 self._loop, 

161 read_bufsize, 

162 timer=timer, 

163 payload_exception=ClientPayloadError, 

164 response_with_body=not skip_payload, 

165 read_until_eof=read_until_eof, 

166 auto_decompress=auto_decompress, 

167 max_line_size=max_line_size, 

168 max_field_size=max_field_size, 

169 ) 

170 

171 if self._tail: 

172 data, self._tail = self._tail, b"" 

173 self.data_received(data) 

174 

175 def _drop_timeout(self) -> None: 

176 if self._read_timeout_handle is not None: 

177 self._read_timeout_handle.cancel() 

178 self._read_timeout_handle = None 

179 

180 def _reschedule_timeout(self) -> None: 

181 timeout = self._read_timeout 

182 if self._read_timeout_handle is not None: 

183 self._read_timeout_handle.cancel() 

184 

185 if timeout: 

186 self._read_timeout_handle = self._loop.call_later( 

187 timeout, self._on_read_timeout 

188 ) 

189 else: 

190 self._read_timeout_handle = None 

191 

192 def start_timeout(self) -> None: 

193 self._reschedule_timeout() 

194 

195 def _on_read_timeout(self) -> None: 

196 exc = ServerTimeoutError("Timeout on reading data from socket") 

197 self.set_exception(exc) 

198 if self._payload is not None: 

199 self._payload.set_exception(exc) 

200 

201 def data_received(self, data: bytes) -> None: 

202 self._reschedule_timeout() 

203 

204 if not data: 

205 return 

206 

207 # custom payload parser 

208 if self._payload_parser is not None: 

209 eof, tail = self._payload_parser.feed_data(data) 

210 if eof: 

211 self._payload = None 

212 self._payload_parser = None 

213 

214 if tail: 

215 self.data_received(tail) 

216 return 

217 else: 

218 if self._upgraded or self._parser is None: 

219 # i.e. websocket connection, websocket parser is not set yet 

220 self._tail += data 

221 else: 

222 # parse http messages 

223 try: 

224 messages, upgraded, tail = self._parser.feed_data(data) 

225 except BaseException as exc: 

226 if self.transport is not None: 

227 # connection.release() could be called BEFORE 

228 # data_received(), the transport is already 

229 # closed in this case 

230 self.transport.close() 

231 # should_close is True after the call 

232 self.set_exception(exc) 

233 return 

234 

235 self._upgraded = upgraded 

236 

237 payload: Optional[StreamReader] = None 

238 for message, payload in messages: 

239 if message.should_close: 

240 self._should_close = True 

241 

242 self._payload = payload 

243 

244 if self._skip_payload or status_code_must_be_empty_body( 

245 message.code 

246 ): 

247 self.feed_data((message, EMPTY_PAYLOAD), 0) 

248 else: 

249 self.feed_data((message, payload), 0) 

250 if payload is not None: 

251 # new message(s) was processed 

252 # register timeout handler unsubscribing 

253 # either on end-of-stream or immediately for 

254 # EMPTY_PAYLOAD 

255 if payload is not EMPTY_PAYLOAD: 

256 payload.on_eof(self._drop_timeout) 

257 else: 

258 self._drop_timeout() 

259 

260 if tail: 

261 if upgraded: 

262 self.data_received(tail) 

263 else: 

264 self._tail = tail