Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_proto.py: 19%

153 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import asyncio 

2from contextlib import suppress 

3from typing import Any, Optional, Tuple 

4 

5from .base_protocol import BaseProtocol 

6from .client_exceptions import ( 

7 ClientOSError, 

8 ClientPayloadError, 

9 ServerDisconnectedError, 

10 ServerTimeoutError, 

11) 

12from .helpers import BaseTimerContext, set_exception, set_result 

13from .http import HttpResponseParser, RawResponseMessage 

14from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader 

15 

16 

17class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]): 

18 """Helper class to adapt between Protocol and StreamReader.""" 

19 

20 def __init__(self, loop: asyncio.AbstractEventLoop) -> None: 

21 BaseProtocol.__init__(self, loop=loop) 

22 DataQueue.__init__(self, loop) 

23 

24 self._should_close = False 

25 

26 self._payload: Optional[StreamReader] = None 

27 self._skip_payload = False 

28 self._payload_parser = None 

29 

30 self._timer = None 

31 

32 self._tail = b"" 

33 self._upgraded = False 

34 self._parser: Optional[HttpResponseParser] = None 

35 

36 self._read_timeout: Optional[float] = None 

37 self._read_timeout_handle: Optional[asyncio.TimerHandle] = None 

38 

39 self._timeout_ceil_threshold: Optional[float] = 5 

40 

41 self.closed: asyncio.Future[None] = self._loop.create_future() 

42 

43 @property 

44 def upgraded(self) -> bool: 

45 return self._upgraded 

46 

47 @property 

48 def should_close(self) -> bool: 

49 if self._payload is not None and not self._payload.is_eof() or self._upgraded: 

50 return True 

51 

52 return ( 

53 self._should_close 

54 or self._upgraded 

55 or self.exception() is not None 

56 or self._payload_parser is not None 

57 or len(self) > 0 

58 or bool(self._tail) 

59 ) 

60 

61 def force_close(self) -> None: 

62 self._should_close = True 

63 

64 def close(self) -> None: 

65 transport = self.transport 

66 if transport is not None: 

67 transport.close() 

68 self.transport = None 

69 self._payload = None 

70 self._drop_timeout() 

71 

72 def is_connected(self) -> bool: 

73 return self.transport is not None and not self.transport.is_closing() 

74 

75 def connection_lost(self, exc: Optional[BaseException]) -> None: 

76 self._drop_timeout() 

77 

78 if exc is not None: 

79 set_exception(self.closed, exc) 

80 else: 

81 set_result(self.closed, None) 

82 

83 if self._payload_parser is not None: 

84 with suppress(Exception): 

85 self._payload_parser.feed_eof() 

86 

87 uncompleted = None 

88 if self._parser is not None: 

89 try: 

90 uncompleted = self._parser.feed_eof() 

91 except Exception: 

92 if self._payload is not None: 

93 self._payload.set_exception( 

94 ClientPayloadError("Response payload is not completed") 

95 ) 

96 

97 if not self.is_eof(): 

98 if isinstance(exc, OSError): 

99 exc = ClientOSError(*exc.args) 

100 if exc is None: 

101 exc = ServerDisconnectedError(uncompleted) 

102 # assigns self._should_close to True as side effect, 

103 # we do it anyway below 

104 self.set_exception(exc) 

105 

106 self._should_close = True 

107 self._parser = None 

108 self._payload = None 

109 self._payload_parser = None 

110 self._reading_paused = False 

111 

112 super().connection_lost(exc) 

113 

114 def eof_received(self) -> None: 

115 # should call parser.feed_eof() most likely 

116 self._drop_timeout() 

117 

118 def pause_reading(self) -> None: 

119 super().pause_reading() 

120 self._drop_timeout() 

121 

122 def resume_reading(self) -> None: 

123 super().resume_reading() 

124 self._reschedule_timeout() 

125 

126 def set_exception(self, exc: BaseException) -> None: 

127 self._should_close = True 

128 self._drop_timeout() 

129 super().set_exception(exc) 

130 

131 def set_parser(self, parser: Any, payload: Any) -> None: 

132 # TODO: actual types are: 

133 # parser: WebSocketReader 

134 # payload: FlowControlDataQueue 

135 # but they are not generi enough 

136 # Need an ABC for both types 

137 self._payload = payload 

138 self._payload_parser = parser 

139 

140 self._drop_timeout() 

141 

142 if self._tail: 

143 data, self._tail = self._tail, b"" 

144 self.data_received(data) 

145 

146 def set_response_params( 

147 self, 

148 *, 

149 timer: Optional[BaseTimerContext] = None, 

150 skip_payload: bool = False, 

151 read_until_eof: bool = False, 

152 auto_decompress: bool = True, 

153 read_timeout: Optional[float] = None, 

154 read_bufsize: int = 2**16, 

155 timeout_ceil_threshold: float = 5, 

156 max_line_size: int = 8190, 

157 max_field_size: int = 8190, 

158 ) -> None: 

159 self._skip_payload = skip_payload 

160 

161 self._read_timeout = read_timeout 

162 

163 self._timeout_ceil_threshold = timeout_ceil_threshold 

164 

165 self._parser = HttpResponseParser( 

166 self, 

167 self._loop, 

168 read_bufsize, 

169 timer=timer, 

170 payload_exception=ClientPayloadError, 

171 response_with_body=not skip_payload, 

172 read_until_eof=read_until_eof, 

173 auto_decompress=auto_decompress, 

174 max_line_size=max_line_size, 

175 max_field_size=max_field_size, 

176 ) 

177 

178 if self._tail: 

179 data, self._tail = self._tail, b"" 

180 self.data_received(data) 

181 

182 def _drop_timeout(self) -> None: 

183 if self._read_timeout_handle is not None: 

184 self._read_timeout_handle.cancel() 

185 self._read_timeout_handle = None 

186 

187 def _reschedule_timeout(self) -> None: 

188 timeout = self._read_timeout 

189 if self._read_timeout_handle is not None: 

190 self._read_timeout_handle.cancel() 

191 

192 if timeout: 

193 self._read_timeout_handle = self._loop.call_later( 

194 timeout, self._on_read_timeout 

195 ) 

196 else: 

197 self._read_timeout_handle = None 

198 

199 def start_timeout(self) -> None: 

200 self._reschedule_timeout() 

201 

202 def _on_read_timeout(self) -> None: 

203 exc = ServerTimeoutError("Timeout on reading data from socket") 

204 self.set_exception(exc) 

205 if self._payload is not None: 

206 self._payload.set_exception(exc) 

207 

208 def data_received(self, data: bytes) -> None: 

209 self._reschedule_timeout() 

210 

211 if not data: 

212 return 

213 

214 # custom payload parser 

215 if self._payload_parser is not None: 

216 eof, tail = self._payload_parser.feed_data(data) 

217 if eof: 

218 self._payload = None 

219 self._payload_parser = None 

220 

221 if tail: 

222 self.data_received(tail) 

223 return 

224 else: 

225 if self._upgraded or self._parser is None: 

226 # i.e. websocket connection, websocket parser is not set yet 

227 self._tail += data 

228 else: 

229 # parse http messages 

230 try: 

231 messages, upgraded, tail = self._parser.feed_data(data) 

232 except BaseException as exc: 

233 if self.transport is not None: 

234 # connection.release() could be called BEFORE 

235 # data_received(), the transport is already 

236 # closed in this case 

237 self.transport.close() 

238 # should_close is True after the call 

239 self.set_exception(exc) 

240 return 

241 

242 self._upgraded = upgraded 

243 

244 payload: Optional[StreamReader] = None 

245 for message, payload in messages: 

246 if message.should_close: 

247 self._should_close = True 

248 

249 self._payload = payload 

250 

251 if self._skip_payload or message.code in (204, 304): 

252 self.feed_data((message, EMPTY_PAYLOAD), 0) 

253 else: 

254 self.feed_data((message, payload), 0) 

255 if payload is not None: 

256 # new message(s) was processed 

257 # register timeout handler unsubscribing 

258 # either on end-of-stream or immediately for 

259 # EMPTY_PAYLOAD 

260 if payload is not EMPTY_PAYLOAD: 

261 payload.on_eof(self._drop_timeout) 

262 else: 

263 self._drop_timeout() 

264 

265 if tail: 

266 if upgraded: 

267 self.data_received(tail) 

268 else: 

269 self._tail = tail