Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_async/http2.py: 2%

236 statements  

coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

import enum
import time
import types
import typing

import h2.config
import h2.connection
import h2.events
import h2.exceptions
import h2.settings

from .._exceptions import (
    ConnectionNotAvailable,
    LocalProtocolError,
    RemoteProtocolError,
)
from .._models import Origin, Request, Response
from .._synchronization import AsyncLock, AsyncSemaphore
from .._trace import Trace
from ..backends.base import AsyncNetworkStream
from .interfaces import AsyncConnectionInterface


def has_body_headers(request: Request) -> bool:
    return any(
        k.lower() == b"content-length" or k.lower() == b"transfer-encoding"
        for k, v in request.headers
    )
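
# For example (illustrative): a POST carrying a 'Content-Length' or
# 'Transfer-Encoding' header returns True here, while a bare GET with neither
# header returns False, so the request is sent with no body frames at all.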


class HTTPConnectionState(enum.IntEnum):
    ACTIVE = 1
    IDLE = 2
    CLOSED = 3


class AsyncHTTP2Connection(AsyncConnectionInterface):
    READ_NUM_BYTES = 64 * 1024
    CONFIG = h2.config.H2Configuration(validate_inbound_headers=False)

    def __init__(
        self,
        origin: Origin,
        stream: AsyncNetworkStream,
        keepalive_expiry: typing.Optional[float] = None,
    ):
        self._origin = origin
        self._network_stream = stream
        self._keepalive_expiry: typing.Optional[float] = keepalive_expiry
        self._h2_state = h2.connection.H2Connection(config=self.CONFIG)
        self._state = HTTPConnectionState.IDLE
        self._expire_at: typing.Optional[float] = None
        self._request_count = 0
        self._init_lock = AsyncLock()
        self._state_lock = AsyncLock()
        self._read_lock = AsyncLock()
        self._write_lock = AsyncLock()
        self._sent_connection_init = False
        self._used_all_stream_ids = False
        self._connection_error = False
        self._events: typing.Dict[int, typing.List[h2.events.Event]] = {}
        self._read_exception: typing.Optional[Exception] = None
        self._write_exception: typing.Optional[Exception] = None
        self._connection_error_event: typing.Optional[h2.events.Event] = None

    async def handle_async_request(self, request: Request) -> Response:
        if not self.can_handle_request(request.url.origin):
            # This cannot occur in normal operation, since the connection pool
            # will only send requests on connections that handle them.
            # It's in place simply for resilience as a guard against incorrect
            # usage, for anyone working directly with httpcore connections.
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection "
                f"to {self._origin}"
            )

        async with self._state_lock:
            if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
                self._request_count += 1
                self._expire_at = None
                self._state = HTTPConnectionState.ACTIVE
            else:
                raise ConnectionNotAvailable()

        async with self._init_lock:
            if not self._sent_connection_init:
                kwargs = {"request": request}
                async with Trace("http2.send_connection_init", request, kwargs):
                    await self._send_connection_init(**kwargs)
                self._sent_connection_init = True
                max_streams = self._h2_state.local_settings.max_concurrent_streams
                self._max_streams_semaphore = AsyncSemaphore(max_streams)

        await self._max_streams_semaphore.acquire()

        try:
            stream_id = self._h2_state.get_next_available_stream_id()
            self._events[stream_id] = []
        except h2.exceptions.NoAvailableStreamIDError:  # pragma: nocover
            self._used_all_stream_ids = True
            raise ConnectionNotAvailable()

        try:
            kwargs = {"request": request, "stream_id": stream_id}
            async with Trace("http2.send_request_headers", request, kwargs):
                await self._send_request_headers(request=request, stream_id=stream_id)
            async with Trace("http2.send_request_body", request, kwargs):
                await self._send_request_body(request=request, stream_id=stream_id)
            async with Trace(
                "http2.receive_response_headers", request, kwargs
            ) as trace:
                status, headers = await self._receive_response(
                    request=request, stream_id=stream_id
                )
                trace.return_value = (status, headers)

            return Response(
                status=status,
                headers=headers,
                content=HTTP2ConnectionByteStream(self, request, stream_id=stream_id),
                extensions={"stream_id": stream_id, "http_version": b"HTTP/2"},
            )
        except Exception as exc:  # noqa: PIE786
            kwargs = {"stream_id": stream_id}
            async with Trace("http2.response_closed", request, kwargs):
                await self._response_closed(stream_id=stream_id)

            if isinstance(exc, h2.exceptions.ProtocolError):
                # One case where h2 can raise a protocol error is when a
                # closed frame has been seen by the state machine.
                #
                # This happens when one stream is reading, and encounters
                # a GOAWAY event. Other flows of control may then raise
                # a protocol error at any point they interact with the 'h2_state'.
                #
                # In this case we'll have stored the event, and should raise
                # it as a RemoteProtocolError.
                if self._connection_error_event:
                    raise RemoteProtocolError(self._connection_error_event)
                # If h2 raises a protocol error in some other state then we
                # must somehow have made a protocol violation.
                raise LocalProtocolError(exc)  # pragma: nocover

            raise exc

    async def _send_connection_init(self, request: Request) -> None:
        """
        The HTTP/2 connection requires some initial setup before we can start
        using individual request/response streams on it.
        """
        # Need to set these manually here instead of manipulating via
        # __setitem__() otherwise the H2Connection will emit SettingsUpdate
        # frames in addition to sending the undesired defaults.
        self._h2_state.local_settings = h2.settings.Settings(
            client=True,
            initial_values={
                # Disable PUSH_PROMISE frames from the server since we don't do anything
                # with them for now. Maybe when we support caching?
                h2.settings.SettingCodes.ENABLE_PUSH: 0,
                # These two are taken from h2 for safe defaults
                h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS: 100,
                h2.settings.SettingCodes.MAX_HEADER_LIST_SIZE: 65536,
            },
        )

        # Some websites (*cough* Yahoo *cough*) balk at this setting being
        # present in the initial handshake since it's not defined in the original
        # RFC despite the RFC mandating ignoring settings you don't know about.
        del self._h2_state.local_settings[
            h2.settings.SettingCodes.ENABLE_CONNECT_PROTOCOL
        ]

        self._h2_state.initiate_connection()
        self._h2_state.increment_flow_control_window(2**24)
        await self._write_outgoing_data(request)
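
    # A hedged summary of the preamble assembled above: initiate_connection()
    # queues the HTTP/2 client preface plus a SETTINGS frame carrying the local
    # settings configured in this method, and increment_flow_control_window(2**24)
    # queues a connection-level WINDOW_UPDATE raising the default 65,535-byte
    # receive window (RFC 7540, Section 6.9.2) by a further 16 MiB. Nothing is
    # sent until _write_outgoing_data() flushes h2's outgoing buffer.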

    # Sending the request...

    async def _send_request_headers(self, request: Request, stream_id: int) -> None:
        end_stream = not has_body_headers(request)

        # In HTTP/2 the ':authority' pseudo-header is used instead of 'Host'.
        # In order to gracefully handle HTTP/1.1 and HTTP/2 we always require
        # HTTP/1.1 style headers, and map them appropriately if we end up on
        # an HTTP/2 connection.
        authority = [v for k, v in request.headers if k.lower() == b"host"][0]

        headers = [
            (b":method", request.method),
            (b":authority", authority),
            (b":scheme", request.url.scheme),
            (b":path", request.url.target),
        ] + [
            (k.lower(), v)
            for k, v in request.headers
            if k.lower()
            not in (
                b"host",
                b"transfer-encoding",
            )
        ]

        self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
        self._h2_state.increment_flow_control_window(2**24, stream_id=stream_id)
        await self._write_outgoing_data(request)
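
    # Illustration (not part of the library): a request built with HTTP/1.1
    # style headers, e.g.
    #
    #     Request("GET", "https://example.com/path", headers=[(b"host", b"example.com")])
    #
    # is mapped above to the pseudo-headers
    #
    #     [(b":method", b"GET"), (b":authority", b"example.com"),
    #      (b":scheme", b"https"), (b":path", b"/path")]
    #
    # followed by the remaining headers lower-cased, with 'Host' and
    # 'Transfer-Encoding' dropped.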

    async def _send_request_body(self, request: Request, stream_id: int) -> None:
        if not has_body_headers(request):
            return

        assert isinstance(request.stream, typing.AsyncIterable)
        async for data in request.stream:
            while data:
                max_flow = await self._wait_for_outgoing_flow(request, stream_id)
                chunk_size = min(len(data), max_flow)
                chunk, data = data[:chunk_size], data[chunk_size:]
                self._h2_state.send_data(stream_id, chunk)
                await self._write_outgoing_data(request)

        self._h2_state.end_stream(stream_id)
        await self._write_outgoing_data(request)

    # Receiving the response...

    async def _receive_response(
        self, request: Request, stream_id: int
    ) -> typing.Tuple[int, typing.List[typing.Tuple[bytes, bytes]]]:
        while True:
            event = await self._receive_stream_event(request, stream_id)
            if isinstance(event, h2.events.ResponseReceived):
                break

        status_code = 200
        headers = []
        for k, v in event.headers:
            if k == b":status":
                status_code = int(v.decode("ascii", errors="ignore"))
            elif not k.startswith(b":"):
                headers.append((k, v))

        return (status_code, headers)
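
    # For example (illustrative only): response headers arriving as
    #
    #     [(b":status", b"200"), (b"content-type", b"text/html")]
    #
    # are returned as (200, [(b"content-type", b"text/html")]); the ':status'
    # pseudo-header becomes the integer status code and any other pseudo-headers
    # are stripped.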

    async def _receive_response_body(
        self, request: Request, stream_id: int
    ) -> typing.AsyncIterator[bytes]:
        while True:
            event = await self._receive_stream_event(request, stream_id)
            if isinstance(event, h2.events.DataReceived):
                amount = event.flow_controlled_length
                self._h2_state.acknowledge_received_data(amount, stream_id)
                await self._write_outgoing_data(request)
                yield event.data
            elif isinstance(event, (h2.events.StreamEnded, h2.events.StreamReset)):
                break

    async def _receive_stream_event(
        self, request: Request, stream_id: int
    ) -> h2.events.Event:
        while not self._events.get(stream_id):
            await self._receive_events(request, stream_id)
        event = self._events[stream_id].pop(0)
        # The StreamReset event applies to a single stream.
        if hasattr(event, "error_code"):
            raise RemoteProtocolError(event)
        return event

    async def _receive_events(
        self, request: Request, stream_id: typing.Optional[int] = None
    ) -> None:
        async with self._read_lock:
            if self._connection_error_event is not None:  # pragma: nocover
                raise RemoteProtocolError(self._connection_error_event)

            # This conditional is a bit icky. We don't want to block reading if we've
            # actually got an event to return for a given stream. We need to do that
            # check *within* the atomic read lock. Though it also needs to be optional,
            # because when we call it from `_wait_for_outgoing_flow` we *do* want to
            # block until we have available flow control, even when we have events
            # pending for the stream ID we're attempting to send on.
            if stream_id is None or not self._events.get(stream_id):
                events = await self._read_incoming_data(request)
                for event in events:
                    event_stream_id = getattr(event, "stream_id", 0)

                    # The ConnectionTerminatedEvent applies to the entire connection,
                    # and should be saved so it can be raised on all streams.
                    if hasattr(event, "error_code") and event_stream_id == 0:
                        self._connection_error_event = event
                        raise RemoteProtocolError(event)

                    if event_stream_id in self._events:
                        self._events[event_stream_id].append(event)

            await self._write_outgoing_data(request)

    async def _response_closed(self, stream_id: int) -> None:
        await self._max_streams_semaphore.release()
        del self._events[stream_id]
        async with self._state_lock:
            if self._state == HTTPConnectionState.ACTIVE and not self._events:
                self._state = HTTPConnectionState.IDLE
                if self._keepalive_expiry is not None:
                    now = time.monotonic()
                    self._expire_at = now + self._keepalive_expiry
                if self._used_all_stream_ids:  # pragma: nocover
                    await self.aclose()

    async def aclose(self) -> None:
        # Note that this method unilaterally closes the connection, and does
        # not have any kind of locking in place around it.
        self._h2_state.close_connection()
        self._state = HTTPConnectionState.CLOSED
        await self._network_stream.aclose()

    # Wrappers around network read/write operations...

    async def _read_incoming_data(
        self, request: Request
    ) -> typing.List[h2.events.Event]:
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)

        if self._read_exception is not None:
            raise self._read_exception  # pragma: nocover

        try:
            data = await self._network_stream.read(self.READ_NUM_BYTES, timeout)
            if data == b"":
                raise RemoteProtocolError("Server disconnected")
        except Exception as exc:
            # If we get a network error we should:
            #
            # 1. Save the exception and just raise it immediately on any future reads.
            #    (For example, this means that a single read timeout or disconnect will
            #    immediately close all pending streams. Without requiring multiple
            #    sequential timeouts.)
            # 2. Mark the connection as errored, so that we don't accept any other
            #    incoming requests.
            self._read_exception = exc
            self._connection_error = True
            raise exc

        events: typing.List[h2.events.Event] = self._h2_state.receive_data(data)

        return events

    async def _write_outgoing_data(self, request: Request) -> None:
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("write", None)

        async with self._write_lock:
            data_to_send = self._h2_state.data_to_send()

            if self._write_exception is not None:
                raise self._write_exception  # pragma: nocover

            try:
                await self._network_stream.write(data_to_send, timeout)
            except Exception as exc:  # pragma: nocover
                # If we get a network error we should:
                #
                # 1. Save the exception and just raise it immediately on any future write.
                #    (For example, this means that a single write timeout or disconnect will
                #    immediately close all pending streams. Without requiring multiple
                #    sequential timeouts.)
                # 2. Mark the connection as errored, so that we don't accept any other
                #    incoming requests.
                self._write_exception = exc
                self._connection_error = True
                raise exc

    # Flow control...

    async def _wait_for_outgoing_flow(self, request: Request, stream_id: int) -> int:
        """
        Returns the maximum allowable outgoing flow for a given stream.

        If the allowable flow is zero, then waits on the network until
        WindowUpdated frames have increased the flow rate.
        https://tools.ietf.org/html/rfc7540#section-6.9
        """
        local_flow: int = self._h2_state.local_flow_control_window(stream_id)
        max_frame_size: int = self._h2_state.max_outbound_frame_size
        flow = min(local_flow, max_frame_size)
        while flow == 0:
            await self._receive_events(request)
            local_flow = self._h2_state.local_flow_control_window(stream_id)
            max_frame_size = self._h2_state.max_outbound_frame_size
            flow = min(local_flow, max_frame_size)
        return flow
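
    # Worked example (illustrative, using the RFC 7540 defaults): with a stream
    # flow-control window of 65,535 bytes and a peer SETTINGS_MAX_FRAME_SIZE of
    # 16,384 bytes, the first call returns 16,384, so _send_request_body() emits
    # 16 kB DATA frames; once the stream window is exhausted the loop above
    # blocks in _receive_events() until a WINDOW_UPDATE arrives.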

    # Interface for connection pooling...

    def can_handle_request(self, origin: Origin) -> bool:
        return origin == self._origin

    def is_available(self) -> bool:
        return (
            self._state != HTTPConnectionState.CLOSED
            and not self._connection_error
            and not self._used_all_stream_ids
        )

    def has_expired(self) -> bool:
        now = time.monotonic()
        return self._expire_at is not None and now > self._expire_at

    def is_idle(self) -> bool:
        return self._state == HTTPConnectionState.IDLE

    def is_closed(self) -> bool:
        return self._state == HTTPConnectionState.CLOSED

    def info(self) -> str:
        origin = str(self._origin)
        return (
            f"{origin!r}, HTTP/2, {self._state.name}, "
            f"Request Count: {self._request_count}"
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        origin = str(self._origin)
        return (
            f"<{class_name} [{origin!r}, {self._state.name}, "
            f"Request Count: {self._request_count}]>"
        )

    # These context managers are not used in the standard flow, but are
    # useful for testing or working with connection instances directly.
    # (See the usage sketch at the end of this module.)

    async def __aenter__(self) -> "AsyncHTTP2Connection":
        return self

    async def __aexit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]] = None,
        exc_value: typing.Optional[BaseException] = None,
        traceback: typing.Optional[types.TracebackType] = None,
    ) -> None:
        await self.aclose()


class HTTP2ConnectionByteStream:
    def __init__(
        self, connection: AsyncHTTP2Connection, request: Request, stream_id: int
    ) -> None:
        self._connection = connection
        self._request = request
        self._stream_id = stream_id
        self._closed = False

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        kwargs = {"request": self._request, "stream_id": self._stream_id}
        try:
            async with Trace("http2.receive_response_body", self._request, kwargs):
                async for chunk in self._connection._receive_response_body(
                    request=self._request, stream_id=self._stream_id
                ):
                    yield chunk
        except BaseException as exc:
            # If we get an exception while streaming the response,
            # we want to close the response (and possibly the connection)
            # before raising that exception.
            await self.aclose()
            raise exc

    async def aclose(self) -> None:
        if not self._closed:
            self._closed = True
            kwargs = {"stream_id": self._stream_id}
            async with Trace("http2.response_closed", self._request, kwargs):
                await self._connection._response_closed(stream_id=self._stream_id)
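

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; nothing in the library calls this).
#
# As noted above, the context-manager support on AsyncHTTP2Connection exists
# mainly for tests and for working with connection instances directly, outside
# the connection pool. This sketch assumes the caller has already obtained a
# TLS-wrapped AsyncNetworkStream with "h2" negotiated via ALPN (that part is
# backend-specific and omitted), and it assumes the aread()/aclose() helpers
# on httpcore's Response model.
# ---------------------------------------------------------------------------
async def _example_direct_usage(network_stream: AsyncNetworkStream) -> bytes:
    origin = Origin(scheme=b"https", host=b"example.com", port=443)
    async with AsyncHTTP2Connection(origin=origin, stream=network_stream) as conn:
        # A 'Host' header is required: _send_request_headers() maps it onto the
        # ':authority' pseudo-header.
        request = Request(
            "GET", "https://example.com/", headers=[(b"host", b"example.com")]
        )
        response = await conn.handle_async_request(request)
        try:
            return await response.aread()
        finally:
            await response.aclose()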