Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_proto.py: 19%

146 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:56 +0000

1import asyncio 

2from contextlib import suppress 

3from typing import Any, Optional, Tuple 

4 

5from .base_protocol import BaseProtocol 

6from .client_exceptions import ( 

7 ClientOSError, 

8 ClientPayloadError, 

9 ServerDisconnectedError, 

10 ServerTimeoutError, 

11) 

12from .helpers import BaseTimerContext 

13from .http import HttpResponseParser, RawResponseMessage 

14from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader 

15 

16 

class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
    """Helper class to adapt between Protocol and StreamReader.

    The handler is both the transport-facing protocol (``BaseProtocol``) and
    a ``DataQueue`` of ``(RawResponseMessage, StreamReader)`` pairs: bytes
    arriving in :meth:`data_received` are run through the configured parser
    and each parsed message is fed into the queue together with its payload.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise both base classes and reset all per-connection state."""
        BaseProtocol.__init__(self, loop=loop)
        DataQueue.__init__(self, loop)

        # Sticky flag: once True it is never reset by this class.
        self._should_close = False

        # Payload stream of the most recently parsed message (if any).
        self._payload: Optional[StreamReader] = None
        self._skip_payload = False
        # Custom payload parser installed through set_parser().
        self._payload_parser = None

        self._timer = None

        # Bytes received that no parser has consumed yet.
        self._tail = b""
        self._upgraded = False
        self._parser: Optional[HttpResponseParser] = None

        # Read timeout in seconds (falsy disables it) and its pending handle.
        self._read_timeout: Optional[float] = None
        self._read_timeout_handle: Optional[asyncio.TimerHandle] = None

    @property
    def upgraded(self) -> bool:
        """Return the ``upgraded`` flag last reported by the HTTP parser."""
        return self._upgraded

    @property
    def should_close(self) -> bool:
        """Return True if the connection should be closed.

        That is the case when any of the following holds: the current payload
        has not reached EOF, the connection was upgraded, closing was
        requested, an exception is stored on the queue, a custom payload
        parser is installed, parsed messages are still queued, or unparsed
        bytes remain in the tail buffer.
        """
        payload_pending = self._payload is not None and not self._payload.is_eof()
        if payload_pending or self._upgraded:
            return True

        return (
            self._should_close
            or self.exception() is not None
            or self._payload_parser is not None
            or len(self) > 0
            or bool(self._tail)
        )

    def force_close(self) -> None:
        """Mark the connection so that :attr:`should_close` reports True."""
        self._should_close = True

    def close(self) -> None:
        """Close the transport, if there is one, and drop related state.

        When a transport is attached it is closed and forgotten, the current
        payload reference is cleared and the read timeout is cancelled.
        Without a transport this is a no-op.
        """
        transport = self.transport
        if transport is not None:
            transport.close()
            self.transport = None
            self._payload = None
            self._drop_timeout()

    def is_connected(self) -> bool:
        """Return True if a transport is attached and it is not closing."""
        return self.transport is not None and not self.transport.is_closing()

    def connection_lost(self, exc: Optional[BaseException]) -> None:
        """Finalise parsers and propagate the disconnect to queue readers.

        Args:
            exc: The error that caused the disconnect, or ``None``. An
                ``OSError`` is re-wrapped as ``ClientOSError``; ``None`` is
                replaced by ``ServerDisconnectedError`` when the queue has
                not reached EOF. The (possibly replaced) exception is what
                gets forwarded to the base class.
        """
        self._drop_timeout()

        if self._payload_parser is not None:
            # Best effort: errors from the custom parser are ignored here.
            with suppress(Exception):
                self._payload_parser.feed_eof()

        uncompleted = None
        if self._parser is not None:
            try:
                uncompleted = self._parser.feed_eof()
            except Exception as underlying_exc:
                if self._payload is not None:
                    payload_exc = ClientPayloadError(
                        "Response payload is not completed"
                    )
                    # Keep the parser failure reachable for debugging
                    # instead of silently discarding it.
                    payload_exc.__cause__ = underlying_exc
                    self._payload.set_exception(payload_exc)

        if not self.is_eof():
            if isinstance(exc, OSError):
                exc = ClientOSError(*exc.args)
            if exc is None:
                exc = ServerDisconnectedError(uncompleted)
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            self.set_exception(exc)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(exc)

    def eof_received(self) -> None:
        """Cancel the read timeout when the peer signals EOF."""
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        """Pause reading via the base class and cancel the read timeout."""
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self) -> None:
        """Resume reading via the base class and re-arm the read timeout."""
        super().resume_reading()
        self._reschedule_timeout()

    def set_exception(self, exc: BaseException) -> None:
        """Flag the connection for closing, cancel the timeout, store *exc*."""
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc)

    def set_parser(self, parser: Any, payload: Any) -> None:
        """Install a custom payload parser and replay any buffered bytes.

        Args:
            parser: Object whose ``feed_data()`` / ``feed_eof()`` will receive
                all subsequent incoming bytes.
            payload: Object stored as the current payload.
        """
        # TODO: actual types are:
        #   parser: WebSocketReader
        #   payload: FlowControlDataQueue
        # but they are not generic enough
        # Need an ABC for both types
        self._payload = payload
        self._payload_parser = parser

        self._drop_timeout()

        if self._tail:
            # Clear the buffer before re-feeding so the data is not replayed
            # twice if data_received() appends to the tail again.
            data, self._tail = self._tail, b""
            self.data_received(data)

    def set_response_params(
        self,
        *,
        timer: Optional[BaseTimerContext] = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: Optional[float] = None,
        read_bufsize: int = 2**16,
    ) -> None:
        """Create the HTTP response parser and replay any buffered bytes.

        Args:
            timer: Forwarded to ``HttpResponseParser``.
            skip_payload: When True every parsed message is queued with
                ``EMPTY_PAYLOAD``; also forwarded (negated) to the parser as
                ``response_with_body``.
            read_until_eof: Forwarded to ``HttpResponseParser``.
            auto_decompress: Forwarded to ``HttpResponseParser``.
            read_timeout: Delay in seconds before a read timeout fires; a
                falsy value disables the timeout.
            read_bufsize: Forwarded to ``HttpResponseParser``.
        """
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout
        self._reschedule_timeout()

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
        )

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

    def _drop_timeout(self) -> None:
        """Cancel the pending read-timeout callback, if there is one."""
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()
            self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        """Cancel any pending read timeout and start a new one if enabled."""
        timeout = self._read_timeout
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()

        if timeout:
            self._read_timeout_handle = self._loop.call_later(
                timeout, self._on_read_timeout
            )
        else:
            self._read_timeout_handle = None

    def _on_read_timeout(self) -> None:
        """Fail the queue and the current payload with ServerTimeoutError."""
        exc = ServerTimeoutError("Timeout on reading data from socket")
        self.set_exception(exc)
        if self._payload is not None:
            self._payload.set_exception(exc)

    def data_received(self, data: bytes) -> None:
        """Route incoming bytes to the active parser or buffer them.

        The read timeout is re-armed on every call, including empty ones.
        Non-empty data goes to the custom payload parser when one is
        installed; otherwise it is buffered in the tail while the connection
        is upgraded or no HTTP parser exists yet; otherwise it is parsed as
        HTTP and each resulting message is fed into the queue.
        """
        self._reschedule_timeout()

        if not data:
            return

        # custom payload parser
        if self._payload_parser is not None:
            eof, tail = self._payload_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if tail:
                    # Bytes after the custom payload's end go through the
                    # regular dispatch again.
                    self.data_received(tail)
            return

        if self._upgraded or self._parser is None:
            # i.e. websocket connection, websocket parser is not set yet
            self._tail += data
            return

        # parse http messages
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except BaseException as exc:
            # BaseException is caught so that any parser failure is stored on
            # the queue instead of propagating out of this callback.
            if self.transport is not None:
                # connection.release() could be called BEFORE
                # data_received(), the transport is already
                # closed in this case
                self.transport.close()
            # should_close is True after the call
            self.set_exception(exc)
            return

        self._upgraded = upgraded

        payload: Optional[StreamReader] = None
        for message, payload in messages:
            if message.should_close:
                self._should_close = True

            self._payload = payload

            if self._skip_payload or message.code in (204, 304):
                self.feed_data((message, EMPTY_PAYLOAD), 0)
            else:
                self.feed_data((message, payload), 0)

        # After the loop ``payload`` is the payload of the last message, or
        # still None when no message was produced.
        if payload is not None:
            # new message(s) was processed
            # register timeout handler unsubscribing
            # either on end-of-stream or immediately for
            # EMPTY_PAYLOAD
            if payload is not EMPTY_PAYLOAD:
                payload.on_eof(self._drop_timeout)
            else:
                self._drop_timeout()

        if tail:
            if upgraded:
                self.data_received(tail)
            else:
                self._tail = tail