Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py: 29%

156 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1import enum 

2import logging 

3import time 

4from types import TracebackType 

5from typing import ( 

6 AsyncIterable, 

7 AsyncIterator, 

8 List, 

9 Optional, 

10 Tuple, 

11 Type, 

12 Union, 

13 cast, 

14) 

15 

16import h11 

17 

18from .._exceptions import ( 

19 ConnectionNotAvailable, 

20 LocalProtocolError, 

21 RemoteProtocolError, 

22 map_exceptions, 

23) 

24from .._models import Origin, Request, Response 

25from .._synchronization import AsyncLock 

26from .._trace import Trace 

27from ..backends.base import AsyncNetworkStream 

28from .interfaces import AsyncConnectionInterface 

29 

30logger = logging.getLogger("httpcore.http11") 

31 

32 

33# A subset of `h11.Event` types supported by `_send_event` 

34H11SendEvent = Union[ 

35 h11.Request, 

36 h11.Data, 

37 h11.EndOfMessage, 

38] 

39 

40 

41class HTTPConnectionState(enum.IntEnum): 

42 NEW = 0 

43 ACTIVE = 1 

44 IDLE = 2 

45 CLOSED = 3 

46 

47 

48class AsyncHTTP11Connection(AsyncConnectionInterface): 

49 READ_NUM_BYTES = 64 * 1024 

50 MAX_INCOMPLETE_EVENT_SIZE = 100 * 1024 

51 

52 def __init__( 

53 self, 

54 origin: Origin, 

55 stream: AsyncNetworkStream, 

56 keepalive_expiry: Optional[float] = None, 

57 ) -> None: 

58 self._origin = origin 

59 self._network_stream = stream 

60 self._keepalive_expiry: Optional[float] = keepalive_expiry 

61 self._expire_at: Optional[float] = None 

62 self._state = HTTPConnectionState.NEW 

63 self._state_lock = AsyncLock() 

64 self._request_count = 0 

65 self._h11_state = h11.Connection( 

66 our_role=h11.CLIENT, 

67 max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE, 

68 ) 

69 

70 async def handle_async_request(self, request: Request) -> Response: 

71 if not self.can_handle_request(request.url.origin): 

72 raise RuntimeError( 

73 f"Attempted to send request to {request.url.origin} on connection " 

74 f"to {self._origin}" 

75 ) 

76 

77 async with self._state_lock: 

78 if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): 

79 self._request_count += 1 

80 self._state = HTTPConnectionState.ACTIVE 

81 self._expire_at = None 

82 else: 

83 raise ConnectionNotAvailable() 

84 

85 try: 

86 kwargs = {"request": request} 

87 async with Trace("send_request_headers", logger, request, kwargs) as trace: 

88 await self._send_request_headers(**kwargs) 

89 async with Trace("send_request_body", logger, request, kwargs) as trace: 

90 await self._send_request_body(**kwargs) 

91 async with Trace( 

92 "receive_response_headers", logger, request, kwargs 

93 ) as trace: 

94 ( 

95 http_version, 

96 status, 

97 reason_phrase, 

98 headers, 

99 ) = await self._receive_response_headers(**kwargs) 

100 trace.return_value = ( 

101 http_version, 

102 status, 

103 reason_phrase, 

104 headers, 

105 ) 

106 

107 return Response( 

108 status=status, 

109 headers=headers, 

110 content=HTTP11ConnectionByteStream(self, request), 

111 extensions={ 

112 "http_version": http_version, 

113 "reason_phrase": reason_phrase, 

114 "network_stream": self._network_stream, 

115 }, 

116 ) 

117 except BaseException as exc: 

118 async with Trace("response_closed", logger, request) as trace: 

119 await self._response_closed() 

120 raise exc 

121 

122 # Sending the request... 

123 

124 async def _send_request_headers(self, request: Request) -> None: 

125 timeouts = request.extensions.get("timeout", {}) 

126 timeout = timeouts.get("write", None) 

127 

128 with map_exceptions({h11.LocalProtocolError: LocalProtocolError}): 

129 event = h11.Request( 

130 method=request.method, 

131 target=request.url.target, 

132 headers=request.headers, 

133 ) 

134 await self._send_event(event, timeout=timeout) 

135 

136 async def _send_request_body(self, request: Request) -> None: 

137 timeouts = request.extensions.get("timeout", {}) 

138 timeout = timeouts.get("write", None) 

139 

140 assert isinstance(request.stream, AsyncIterable) 

141 async for chunk in request.stream: 

142 event = h11.Data(data=chunk) 

143 await self._send_event(event, timeout=timeout) 

144 

145 await self._send_event(h11.EndOfMessage(), timeout=timeout) 

146 

147 async def _send_event( 

148 self, event: h11.Event, timeout: Optional[float] = None 

149 ) -> None: 

150 bytes_to_send = self._h11_state.send(event) 

151 if bytes_to_send is not None: 

152 await self._network_stream.write(bytes_to_send, timeout=timeout) 

153 

154 # Receiving the response... 

155 

156 async def _receive_response_headers( 

157 self, request: Request 

158 ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]: 

159 timeouts = request.extensions.get("timeout", {}) 

160 timeout = timeouts.get("read", None) 

161 

162 while True: 

163 event = await self._receive_event(timeout=timeout) 

164 if isinstance(event, h11.Response): 

165 break 

166 if ( 

167 isinstance(event, h11.InformationalResponse) 

168 and event.status_code == 101 

169 ): 

170 break 

171 

172 http_version = b"HTTP/" + event.http_version 

173 

174 # h11 version 0.11+ supports a `raw_items` interface to get the 

175 # raw header casing, rather than the enforced lowercase headers. 

176 headers = event.headers.raw_items() 

177 

178 return http_version, event.status_code, event.reason, headers 

179 

180 async def _receive_response_body(self, request: Request) -> AsyncIterator[bytes]: 

181 timeouts = request.extensions.get("timeout", {}) 

182 timeout = timeouts.get("read", None) 

183 

184 while True: 

185 event = await self._receive_event(timeout=timeout) 

186 if isinstance(event, h11.Data): 

187 yield bytes(event.data) 

188 elif isinstance(event, (h11.EndOfMessage, h11.PAUSED)): 

189 break 

190 

191 async def _receive_event( 

192 self, timeout: Optional[float] = None 

193 ) -> Union[h11.Event, Type[h11.PAUSED]]: 

194 while True: 

195 with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}): 

196 event = self._h11_state.next_event() 

197 

198 if event is h11.NEED_DATA: 

199 data = await self._network_stream.read( 

200 self.READ_NUM_BYTES, timeout=timeout 

201 ) 

202 

203 # If we feed this case through h11 we'll raise an exception like: 

204 # 

205 # httpcore.RemoteProtocolError: can't handle event type 

206 # ConnectionClosed when role=SERVER and state=SEND_RESPONSE 

207 # 

208 # Which is accurate, but not very informative from an end-user 

209 # perspective. Instead we handle this case distinctly and treat 

210 # it as a ConnectError. 

211 if data == b"" and self._h11_state.their_state == h11.SEND_RESPONSE: 

212 msg = "Server disconnected without sending a response." 

213 raise RemoteProtocolError(msg) 

214 

215 self._h11_state.receive_data(data) 

216 else: 

217 # mypy fails to narrow the type in the above if statement above 

218 return cast(Union[h11.Event, Type[h11.PAUSED]], event) 

219 

220 async def _response_closed(self) -> None: 

221 async with self._state_lock: 

222 if ( 

223 self._h11_state.our_state is h11.DONE 

224 and self._h11_state.their_state is h11.DONE 

225 ): 

226 self._state = HTTPConnectionState.IDLE 

227 self._h11_state.start_next_cycle() 

228 if self._keepalive_expiry is not None: 

229 now = time.monotonic() 

230 self._expire_at = now + self._keepalive_expiry 

231 else: 

232 await self.aclose() 

233 

234 # Once the connection is no longer required... 

235 

236 async def aclose(self) -> None: 

237 # Note that this method unilaterally closes the connection, and does 

238 # not have any kind of locking in place around it. 

239 self._state = HTTPConnectionState.CLOSED 

240 await self._network_stream.aclose() 

241 

242 # The AsyncConnectionInterface methods provide information about the state of 

243 # the connection, allowing for a connection pooling implementation to 

244 # determine when to reuse and when to close the connection... 

245 

246 def can_handle_request(self, origin: Origin) -> bool: 

247 return origin == self._origin 

248 

249 def is_available(self) -> bool: 

250 # Note that HTTP/1.1 connections in the "NEW" state are not treated as 

251 # being "available". The control flow which created the connection will 

252 # be able to send an outgoing request, but the connection will not be 

253 # acquired from the connection pool for any other request. 

254 return self._state == HTTPConnectionState.IDLE 

255 

256 def has_expired(self) -> bool: 

257 now = time.monotonic() 

258 keepalive_expired = self._expire_at is not None and now > self._expire_at 

259 

260 # If the HTTP connection is idle but the socket is readable, then the 

261 # only valid state is that the socket is about to return b"", indicating 

262 # a server-initiated disconnect. 

263 server_disconnected = ( 

264 self._state == HTTPConnectionState.IDLE 

265 and self._network_stream.get_extra_info("is_readable") 

266 ) 

267 

268 return keepalive_expired or server_disconnected 

269 

270 def is_idle(self) -> bool: 

271 return self._state == HTTPConnectionState.IDLE 

272 

273 def is_closed(self) -> bool: 

274 return self._state == HTTPConnectionState.CLOSED 

275 

276 def info(self) -> str: 

277 origin = str(self._origin) 

278 return ( 

279 f"{origin!r}, HTTP/1.1, {self._state.name}, " 

280 f"Request Count: {self._request_count}" 

281 ) 

282 

283 def __repr__(self) -> str: 

284 class_name = self.__class__.__name__ 

285 origin = str(self._origin) 

286 return ( 

287 f"<{class_name} [{origin!r}, {self._state.name}, " 

288 f"Request Count: {self._request_count}]>" 

289 ) 

290 

291 # These context managers are not used in the standard flow, but are 

292 # useful for testing or working with connection instances directly. 

293 

294 async def __aenter__(self) -> "AsyncHTTP11Connection": 

295 return self 

296 

297 async def __aexit__( 

298 self, 

299 exc_type: Optional[Type[BaseException]] = None, 

300 exc_value: Optional[BaseException] = None, 

301 traceback: Optional[TracebackType] = None, 

302 ) -> None: 

303 await self.aclose() 

304 

305 

306class HTTP11ConnectionByteStream: 

307 def __init__(self, connection: AsyncHTTP11Connection, request: Request) -> None: 

308 self._connection = connection 

309 self._request = request 

310 self._closed = False 

311 

312 async def __aiter__(self) -> AsyncIterator[bytes]: 

313 kwargs = {"request": self._request} 

314 try: 

315 async with Trace("receive_response_body", logger, self._request, kwargs): 

316 async for chunk in self._connection._receive_response_body(**kwargs): 

317 yield chunk 

318 except BaseException as exc: 

319 # If we get an exception while streaming the response, 

320 # we want to close the response (and possibly the connection) 

321 # before raising that exception. 

322 await self.aclose() 

323 raise exc 

324 

325 async def aclose(self) -> None: 

326 if not self._closed: 

327 self._closed = True 

328 async with Trace("response_closed", logger, self._request): 

329 await self._connection._response_closed()