Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py: 27%

153 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import enum 

2import time 

3from types import TracebackType 

4from typing import ( 

5 AsyncIterable, 

6 AsyncIterator, 

7 List, 

8 Optional, 

9 Tuple, 

10 Type, 

11 Union, 

12 cast, 

13) 

14 

15import h11 

16 

17from .._exceptions import ( 

18 ConnectionNotAvailable, 

19 LocalProtocolError, 

20 RemoteProtocolError, 

21 map_exceptions, 

22) 

23from .._models import Origin, Request, Response 

24from .._synchronization import AsyncLock 

25from .._trace import Trace 

26from ..backends.base import AsyncNetworkStream 

27from .interfaces import AsyncConnectionInterface 

28 

# The `h11.Event` subclasses that this module passes to `_send_event`:
# the request line + headers, body chunks, and the end-of-message marker.
H11SendEvent = Union[h11.Request, h11.Data, h11.EndOfMessage]

35 

36 

class HTTPConnectionState(enum.IntEnum):
    """Lifecycle states tracked for a single HTTP/1.1 connection."""

    # Created, no request has been started on it yet.
    NEW = 0
    # A request/response cycle is currently in progress.
    ACTIVE = 1
    # The previous cycle completed cleanly; the connection may be reused.
    IDLE = 2
    # The connection has been closed.
    CLOSED = 3

42 

43 

class AsyncHTTP11Connection(AsyncConnectionInterface):
    """
    A single HTTP/1.1 connection to one origin.

    Protocol framing is delegated to an `h11.Connection` acting as the client;
    bytes are moved over the `AsyncNetworkStream` supplied at construction.
    Only one request/response cycle may be in flight at a time.
    """

    # Maximum number of bytes requested from the network stream per read.
    READ_NUM_BYTES = 64 * 1024

    def __init__(
        self,
        origin: Origin,
        stream: AsyncNetworkStream,
        keepalive_expiry: Optional[float] = None,
    ) -> None:
        """
        Args:
            origin: The origin this connection talks to.
            stream: The network stream used for reading and writing.
            keepalive_expiry: Seconds an idle connection stays valid after a
                response is closed, or `None` for no keep-alive deadline.
        """
        self._origin = origin
        self._network_stream = stream
        self._keepalive_expiry: Optional[float] = keepalive_expiry
        # Monotonic deadline after which an idle connection counts as expired.
        self._expire_at: Optional[float] = None
        self._state = HTTPConnectionState.NEW
        self._state_lock = AsyncLock()
        self._request_count = 0
        self._h11_state = h11.Connection(our_role=h11.CLIENT)

    async def handle_async_request(self, request: Request) -> Response:
        """
        Send `request` over this connection and return the response.

        The returned `Response` carries a lazily-read body stream; the
        connection is only released once that stream is closed.

        Raises:
            RuntimeError: If the request's origin differs from this connection's.
            ConnectionNotAvailable: If the connection is neither NEW nor IDLE.
        """
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection "
                f"to {self._origin}"
            )

        async with self._state_lock:
            if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
                self._request_count += 1
                self._state = HTTPConnectionState.ACTIVE
                self._expire_at = None
            else:
                raise ConnectionNotAvailable()

        try:
            kwargs = {"request": request}
            async with Trace("http11.send_request_headers", request, kwargs):
                await self._send_request_headers(**kwargs)
            async with Trace("http11.send_request_body", request, kwargs):
                await self._send_request_body(**kwargs)
            async with Trace(
                "http11.receive_response_headers", request, kwargs
            ) as trace:
                (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
                ) = await self._receive_response_headers(**kwargs)
                trace.return_value = (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
                )

            return Response(
                status=status,
                headers=headers,
                content=HTTP11ConnectionByteStream(self, request),
                extensions={
                    "http_version": http_version,
                    "reason_phrase": reason_phrase,
                    "network_stream": self._network_stream,
                },
            )
        except BaseException:
            # Any failure (including cancellation) before the response is
            # handed back must release or close the connection first.
            async with Trace("http11.response_closed", request):
                await self._response_closed()
            # Bare `raise` re-raises the in-flight exception without adding a
            # redundant traceback entry for this frame.
            raise

    # Sending the request...

    async def _send_request_headers(self, request: Request) -> None:
        """Build the `h11.Request` event for `request` and write it out."""
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("write", None)

        # Only event construction is wrapped: invalid method/target/headers
        # surface as this package's `LocalProtocolError`.
        with map_exceptions({h11.LocalProtocolError: LocalProtocolError}):
            event = h11.Request(
                method=request.method,
                target=request.url.target,
                headers=request.headers,
            )
        await self._send_event(event, timeout=timeout)

    async def _send_request_body(self, request: Request) -> None:
        """Stream every chunk of the request body, then send end-of-message."""
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("write", None)

        assert isinstance(request.stream, AsyncIterable)
        async for chunk in request.stream:
            event = h11.Data(data=chunk)
            await self._send_event(event, timeout=timeout)

        await self._send_event(h11.EndOfMessage(), timeout=timeout)

    async def _send_event(
        self, event: h11.Event, timeout: Optional[float] = None
    ) -> None:
        """Serialize `event` through h11 and write any resulting bytes."""
        bytes_to_send = self._h11_state.send(event)
        if bytes_to_send is not None:
            await self._network_stream.write(bytes_to_send, timeout=timeout)

    # Receiving the response...

    async def _receive_response_headers(
        self, request: Request
    ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]:
        """
        Read events until the response head arrives.

        Returns:
            `(http_version, status_code, reason_phrase, headers)`, where
            `http_version` is prefixed with `b"HTTP/"`.
        """
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)

        while True:
            event = await self._receive_event(timeout=timeout)
            if isinstance(event, h11.Response):
                break
            # A 101 informational response is returned to the caller; every
            # other event seen here is skipped and the loop keeps reading.
            if (
                isinstance(event, h11.InformationalResponse)
                and event.status_code == 101
            ):
                break

        http_version = b"HTTP/" + event.http_version

        # h11 version 0.11+ supports a `raw_items` interface to get the
        # raw header casing, rather than the enforced lowercase headers.
        headers = event.headers.raw_items()

        return http_version, event.status_code, event.reason, headers

    async def _receive_response_body(self, request: Request) -> AsyncIterator[bytes]:
        """Yield response body chunks until end-of-message (or h11 pauses)."""
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)

        while True:
            event = await self._receive_event(timeout=timeout)
            if isinstance(event, h11.Data):
                yield bytes(event.data)
            elif isinstance(event, (h11.EndOfMessage, h11.PAUSED)):
                break

    async def _receive_event(
        self, timeout: Optional[float] = None
    ) -> Union[h11.Event, Type[h11.PAUSED]]:
        """
        Return the next h11 event, reading from the network as required.

        Raises:
            RemoteProtocolError: If h11 rejects the incoming data, or the
                server closes the connection before sending any response.
        """
        while True:
            with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
                event = self._h11_state.next_event()

            if event is h11.NEED_DATA:
                data = await self._network_stream.read(
                    self.READ_NUM_BYTES, timeout=timeout
                )

                # If we feed this case through h11 we'll raise an exception like:
                #
                #     httpcore.RemoteProtocolError: can't handle event type
                #     ConnectionClosed when role=SERVER and state=SEND_RESPONSE
                #
                # Which is accurate, but not very informative from an end-user
                # perspective. Instead we handle this case distinctly and raise
                # a RemoteProtocolError with a clearer message.
                if data == b"" and self._h11_state.their_state == h11.SEND_RESPONSE:
                    msg = "Server disconnected without sending a response."
                    raise RemoteProtocolError(msg)

                self._h11_state.receive_data(data)
            else:
                # mypy fails to narrow the type in the above if statement above
                return cast(Union[h11.Event, Type[h11.PAUSED]], event)

    async def _response_closed(self) -> None:
        """
        Release the connection after a response finishes or fails.

        If both sides of the h11 state machine are DONE the connection goes
        back to IDLE for reuse; in every other case it is closed.
        """
        async with self._state_lock:
            if (
                self._h11_state.our_state is h11.DONE
                and self._h11_state.their_state is h11.DONE
            ):
                self._state = HTTPConnectionState.IDLE
                self._h11_state.start_next_cycle()
                if self._keepalive_expiry is not None:
                    now = time.monotonic()
                    self._expire_at = now + self._keepalive_expiry
            else:
                await self.aclose()

    # Once the connection is no longer required...

    async def aclose(self) -> None:
        """Mark the connection CLOSED and close the underlying stream."""
        # Note that this method unilaterally closes the connection, and does
        # not have any kind of locking in place around it.
        self._state = HTTPConnectionState.CLOSED
        await self._network_stream.aclose()

    # The AsyncConnectionInterface methods provide information about the state of
    # the connection, allowing for a connection pooling implementation to
    # determine when to reuse and when to close the connection...

    def can_handle_request(self, origin: Origin) -> bool:
        """Return True if `origin` matches this connection's origin."""
        return origin == self._origin

    def is_available(self) -> bool:
        """Return True if the connection is idle and may take a new request."""
        # Note that HTTP/1.1 connections in the "NEW" state are not treated as
        # being "available". The control flow which created the connection will
        # be able to send an outgoing request, but the connection will not be
        # acquired from the connection pool for any other request.
        return self._state == HTTPConnectionState.IDLE

    def has_expired(self) -> bool:
        """
        Return True if the connection should no longer be reused.

        That is the case when the keep-alive deadline has passed, or when an
        idle connection's socket reports itself as readable.
        """
        now = time.monotonic()
        keepalive_expired = self._expire_at is not None and now > self._expire_at

        # If the HTTP connection is idle but the socket is readable, then the
        # only valid state is that the socket is about to return b"", indicating
        # a server-initiated disconnect.
        #
        # `get_extra_info` may return `None` (or any other non-bool value), so
        # coerce explicitly to honour the declared `bool` return type.
        server_disconnected = self._state == HTTPConnectionState.IDLE and bool(
            self._network_stream.get_extra_info("is_readable")
        )

        return keepalive_expired or server_disconnected

    def is_idle(self) -> bool:
        """Return True if the connection is in the IDLE state."""
        return self._state == HTTPConnectionState.IDLE

    def is_closed(self) -> bool:
        """Return True if the connection is in the CLOSED state."""
        return self._state == HTTPConnectionState.CLOSED

    def info(self) -> str:
        """Return a short human-readable summary of the connection."""
        origin = str(self._origin)
        return (
            f"{origin!r}, HTTP/1.1, {self._state.name}, "
            f"Request Count: {self._request_count}"
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        origin = str(self._origin)
        return (
            f"<{class_name} [{origin!r}, {self._state.name}, "
            f"Request Count: {self._request_count}]>"
        )

    # These context managers are not used in the standard flow, but are
    # useful for testing or working with connection instances directly.

    async def __aenter__(self) -> "AsyncHTTP11Connection":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.aclose()

296 

297 

class HTTP11ConnectionByteStream:
    """
    Async byte stream over the body of one HTTP/1.1 response.

    Closing the stream notifies the owning connection that the response is
    finished; repeated calls to `aclose()` are no-ops.
    """

    def __init__(self, connection: AsyncHTTP11Connection, request: Request) -> None:
        self._connection = connection
        self._request = request
        # Flipped by the first `aclose()` call so later calls do nothing.
        self._closed = False

    async def __aiter__(self) -> AsyncIterator[bytes]:
        """Yield body chunks as the connection receives them."""
        call_kwargs = {"request": self._request}
        try:
            async with Trace(
                "http11.receive_response_body", self._request, call_kwargs
            ):
                body = self._connection._receive_response_body(**call_kwargs)
                async for piece in body:
                    yield piece
        except BaseException as exc:
            # A failure part-way through the body must close the response
            # (and possibly the connection) before the error propagates.
            await self.aclose()
            raise exc

    async def aclose(self) -> None:
        """Tell the connection the response is closed, at most once."""
        if self._closed:
            return

        self._closed = True
        async with Trace("http11.response_closed", self._request):
            await self._connection._response_closed()