Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py: 28%

161 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:38 +0000

1import enum 

2import logging 

3import time 

4from types import TracebackType 

5from typing import ( 

6 AsyncIterable, 

7 AsyncIterator, 

8 List, 

9 Optional, 

10 Tuple, 

11 Type, 

12 Union, 

13 cast, 

14) 

15 

16import h11 

17 

18from .._backends.base import AsyncNetworkStream 

19from .._exceptions import ( 

20 ConnectionNotAvailable, 

21 LocalProtocolError, 

22 RemoteProtocolError, 

23 WriteError, 

24 map_exceptions, 

25) 

26from .._models import Origin, Request, Response 

27from .._synchronization import AsyncLock, AsyncShieldCancellation 

28from .._trace import Trace 

29from .interfaces import AsyncConnectionInterface 

30 

31logger = logging.getLogger("httpcore.http11") 

32 

33 

34# A subset of `h11.Event` types supported by `_send_event` 

35H11SendEvent = Union[ 

36 h11.Request, 

37 h11.Data, 

38 h11.EndOfMessage, 

39] 

40 

41 

42class HTTPConnectionState(enum.IntEnum): 

43 NEW = 0 

44 ACTIVE = 1 

45 IDLE = 2 

46 CLOSED = 3 

47 

48 

49class AsyncHTTP11Connection(AsyncConnectionInterface): 

50 READ_NUM_BYTES = 64 * 1024 

51 MAX_INCOMPLETE_EVENT_SIZE = 100 * 1024 

52 

53 def __init__( 

54 self, 

55 origin: Origin, 

56 stream: AsyncNetworkStream, 

57 keepalive_expiry: Optional[float] = None, 

58 ) -> None: 

59 self._origin = origin 

60 self._network_stream = stream 

61 self._keepalive_expiry: Optional[float] = keepalive_expiry 

62 self._expire_at: Optional[float] = None 

63 self._state = HTTPConnectionState.NEW 

64 self._state_lock = AsyncLock() 

65 self._request_count = 0 

66 self._h11_state = h11.Connection( 

67 our_role=h11.CLIENT, 

68 max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE, 

69 ) 

70 

71 async def handle_async_request(self, request: Request) -> Response: 

72 if not self.can_handle_request(request.url.origin): 

73 raise RuntimeError( 

74 f"Attempted to send request to {request.url.origin} on connection " 

75 f"to {self._origin}" 

76 ) 

77 

78 async with self._state_lock: 

79 if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): 

80 self._request_count += 1 

81 self._state = HTTPConnectionState.ACTIVE 

82 self._expire_at = None 

83 else: 

84 raise ConnectionNotAvailable() 

85 

86 try: 

87 kwargs = {"request": request} 

88 try: 

89 async with Trace( 

90 "send_request_headers", logger, request, kwargs 

91 ) as trace: 

92 await self._send_request_headers(**kwargs) 

93 async with Trace("send_request_body", logger, request, kwargs) as trace: 

94 await self._send_request_body(**kwargs) 

95 except WriteError: 

96 # If we get a write error while we're writing the request, 

97 # then we supress this error and move on to attempting to 

98 # read the response. Servers can sometimes close the request 

99 # pre-emptively and then respond with a well formed HTTP 

100 # error response. 

101 pass 

102 

103 async with Trace( 

104 "receive_response_headers", logger, request, kwargs 

105 ) as trace: 

106 ( 

107 http_version, 

108 status, 

109 reason_phrase, 

110 headers, 

111 ) = await self._receive_response_headers(**kwargs) 

112 trace.return_value = ( 

113 http_version, 

114 status, 

115 reason_phrase, 

116 headers, 

117 ) 

118 

119 return Response( 

120 status=status, 

121 headers=headers, 

122 content=HTTP11ConnectionByteStream(self, request), 

123 extensions={ 

124 "http_version": http_version, 

125 "reason_phrase": reason_phrase, 

126 "network_stream": self._network_stream, 

127 }, 

128 ) 

129 except BaseException as exc: 

130 with AsyncShieldCancellation(): 

131 async with Trace("response_closed", logger, request) as trace: 

132 await self._response_closed() 

133 raise exc 

134 

135 # Sending the request... 

136 

137 async def _send_request_headers(self, request: Request) -> None: 

138 timeouts = request.extensions.get("timeout", {}) 

139 timeout = timeouts.get("write", None) 

140 

141 with map_exceptions({h11.LocalProtocolError: LocalProtocolError}): 

142 event = h11.Request( 

143 method=request.method, 

144 target=request.url.target, 

145 headers=request.headers, 

146 ) 

147 await self._send_event(event, timeout=timeout) 

148 

149 async def _send_request_body(self, request: Request) -> None: 

150 timeouts = request.extensions.get("timeout", {}) 

151 timeout = timeouts.get("write", None) 

152 

153 assert isinstance(request.stream, AsyncIterable) 

154 async for chunk in request.stream: 

155 event = h11.Data(data=chunk) 

156 await self._send_event(event, timeout=timeout) 

157 

158 await self._send_event(h11.EndOfMessage(), timeout=timeout) 

159 

160 async def _send_event( 

161 self, event: h11.Event, timeout: Optional[float] = None 

162 ) -> None: 

163 bytes_to_send = self._h11_state.send(event) 

164 if bytes_to_send is not None: 

165 await self._network_stream.write(bytes_to_send, timeout=timeout) 

166 

167 # Receiving the response... 

168 

169 async def _receive_response_headers( 

170 self, request: Request 

171 ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]: 

172 timeouts = request.extensions.get("timeout", {}) 

173 timeout = timeouts.get("read", None) 

174 

175 while True: 

176 event = await self._receive_event(timeout=timeout) 

177 if isinstance(event, h11.Response): 

178 break 

179 if ( 

180 isinstance(event, h11.InformationalResponse) 

181 and event.status_code == 101 

182 ): 

183 break 

184 

185 http_version = b"HTTP/" + event.http_version 

186 

187 # h11 version 0.11+ supports a `raw_items` interface to get the 

188 # raw header casing, rather than the enforced lowercase headers. 

189 headers = event.headers.raw_items() 

190 

191 return http_version, event.status_code, event.reason, headers 

192 

193 async def _receive_response_body(self, request: Request) -> AsyncIterator[bytes]: 

194 timeouts = request.extensions.get("timeout", {}) 

195 timeout = timeouts.get("read", None) 

196 

197 while True: 

198 event = await self._receive_event(timeout=timeout) 

199 if isinstance(event, h11.Data): 

200 yield bytes(event.data) 

201 elif isinstance(event, (h11.EndOfMessage, h11.PAUSED)): 

202 break 

203 

204 async def _receive_event( 

205 self, timeout: Optional[float] = None 

206 ) -> Union[h11.Event, Type[h11.PAUSED]]: 

207 while True: 

208 with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}): 

209 event = self._h11_state.next_event() 

210 

211 if event is h11.NEED_DATA: 

212 data = await self._network_stream.read( 

213 self.READ_NUM_BYTES, timeout=timeout 

214 ) 

215 

216 # If we feed this case through h11 we'll raise an exception like: 

217 # 

218 # httpcore.RemoteProtocolError: can't handle event type 

219 # ConnectionClosed when role=SERVER and state=SEND_RESPONSE 

220 # 

221 # Which is accurate, but not very informative from an end-user 

222 # perspective. Instead we handle this case distinctly and treat 

223 # it as a ConnectError. 

224 if data == b"" and self._h11_state.their_state == h11.SEND_RESPONSE: 

225 msg = "Server disconnected without sending a response." 

226 raise RemoteProtocolError(msg) 

227 

228 self._h11_state.receive_data(data) 

229 else: 

230 # mypy fails to narrow the type in the above if statement above 

231 return cast(Union[h11.Event, Type[h11.PAUSED]], event) 

232 

233 async def _response_closed(self) -> None: 

234 async with self._state_lock: 

235 if ( 

236 self._h11_state.our_state is h11.DONE 

237 and self._h11_state.their_state is h11.DONE 

238 ): 

239 self._state = HTTPConnectionState.IDLE 

240 self._h11_state.start_next_cycle() 

241 if self._keepalive_expiry is not None: 

242 now = time.monotonic() 

243 self._expire_at = now + self._keepalive_expiry 

244 else: 

245 await self.aclose() 

246 

247 # Once the connection is no longer required... 

248 

249 async def aclose(self) -> None: 

250 # Note that this method unilaterally closes the connection, and does 

251 # not have any kind of locking in place around it. 

252 self._state = HTTPConnectionState.CLOSED 

253 await self._network_stream.aclose() 

254 

255 # The AsyncConnectionInterface methods provide information about the state of 

256 # the connection, allowing for a connection pooling implementation to 

257 # determine when to reuse and when to close the connection... 

258 

259 def can_handle_request(self, origin: Origin) -> bool: 

260 return origin == self._origin 

261 

262 def is_available(self) -> bool: 

263 # Note that HTTP/1.1 connections in the "NEW" state are not treated as 

264 # being "available". The control flow which created the connection will 

265 # be able to send an outgoing request, but the connection will not be 

266 # acquired from the connection pool for any other request. 

267 return self._state == HTTPConnectionState.IDLE 

268 

269 def has_expired(self) -> bool: 

270 now = time.monotonic() 

271 keepalive_expired = self._expire_at is not None and now > self._expire_at 

272 

273 # If the HTTP connection is idle but the socket is readable, then the 

274 # only valid state is that the socket is about to return b"", indicating 

275 # a server-initiated disconnect. 

276 server_disconnected = ( 

277 self._state == HTTPConnectionState.IDLE 

278 and self._network_stream.get_extra_info("is_readable") 

279 ) 

280 

281 return keepalive_expired or server_disconnected 

282 

283 def is_idle(self) -> bool: 

284 return self._state == HTTPConnectionState.IDLE 

285 

286 def is_closed(self) -> bool: 

287 return self._state == HTTPConnectionState.CLOSED 

288 

289 def info(self) -> str: 

290 origin = str(self._origin) 

291 return ( 

292 f"{origin!r}, HTTP/1.1, {self._state.name}, " 

293 f"Request Count: {self._request_count}" 

294 ) 

295 

296 def __repr__(self) -> str: 

297 class_name = self.__class__.__name__ 

298 origin = str(self._origin) 

299 return ( 

300 f"<{class_name} [{origin!r}, {self._state.name}, " 

301 f"Request Count: {self._request_count}]>" 

302 ) 

303 

304 # These context managers are not used in the standard flow, but are 

305 # useful for testing or working with connection instances directly. 

306 

307 async def __aenter__(self) -> "AsyncHTTP11Connection": 

308 return self 

309 

310 async def __aexit__( 

311 self, 

312 exc_type: Optional[Type[BaseException]] = None, 

313 exc_value: Optional[BaseException] = None, 

314 traceback: Optional[TracebackType] = None, 

315 ) -> None: 

316 await self.aclose() 

317 

318 

319class HTTP11ConnectionByteStream: 

320 def __init__(self, connection: AsyncHTTP11Connection, request: Request) -> None: 

321 self._connection = connection 

322 self._request = request 

323 self._closed = False 

324 

325 async def __aiter__(self) -> AsyncIterator[bytes]: 

326 kwargs = {"request": self._request} 

327 try: 

328 async with Trace("receive_response_body", logger, self._request, kwargs): 

329 async for chunk in self._connection._receive_response_body(**kwargs): 

330 yield chunk 

331 except BaseException as exc: 

332 # If we get an exception while streaming the response, 

333 # we want to close the response (and possibly the connection) 

334 # before raising that exception. 

335 with AsyncShieldCancellation(): 

336 await self.aclose() 

337 raise exc 

338 

339 async def aclose(self) -> None: 

340 if not self._closed: 

341 self._closed = True 

342 async with Trace("response_closed", logger, self._request): 

343 await self._connection._response_closed()