Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_sync/http2.py: 2%
236 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import enum
2import time
3import types
4import typing
6import h2.config
7import h2.connection
8import h2.events
9import h2.exceptions
10import h2.settings
12from .._exceptions import (
13 ConnectionNotAvailable,
14 LocalProtocolError,
15 RemoteProtocolError,
16)
17from .._models import Origin, Request, Response
18from .._synchronization import Lock, Semaphore
19from .._trace import Trace
20from ..backends.base import NetworkStream
21from .interfaces import ConnectionInterface
def has_body_headers(request: Request) -> bool:
    """
    Return True if the request declares a body via its headers.

    A request is treated as carrying a body when it has a `Content-Length`
    or a `Transfer-Encoding` header. Header names are compared
    case-insensitively; header values are not inspected.
    """
    body_header_names = (b"content-length", b"transfer-encoding")
    return any(name.lower() in body_header_names for name, _ in request.headers)
class HTTPConnectionState(enum.IntEnum):
    """Lifecycle states tracked for an HTTP/2 connection."""

    # At least one request/response stream is in flight.
    ACTIVE = 1
    # The connection is open but has no in-flight streams.
    IDLE = 2
    # The connection has been closed and must not be reused.
    CLOSED = 3
class HTTP2Connection(ConnectionInterface):
    """
    A single HTTP/2 connection to one origin.

    Requests are multiplexed as separate streams over one network stream.
    The `h2` state machine holds the protocol state, while this class does
    the network reads and writes and dispatches incoming events to the
    stream they belong to.
    """

    # Maximum number of bytes requested from the network stream per read.
    READ_NUM_BYTES = 64 * 1024
    # Configuration passed to the h2 state machine; inbound header
    # validation is switched off.
    CONFIG = h2.config.H2Configuration(validate_inbound_headers=False)

    def __init__(
        self,
        origin: Origin,
        stream: NetworkStream,
        keepalive_expiry: typing.Optional[float] = None,
    ):
        """
        Parameters:
            origin: The origin that this connection sends requests to.
            stream: The network stream used for all reads and writes.
            keepalive_expiry: Number of seconds an idle connection is kept
                before `has_expired()` reports True. `None` disables expiry.
        """
        self._origin = origin
        self._network_stream = stream
        self._keepalive_expiry: typing.Optional[float] = keepalive_expiry
        self._h2_state = h2.connection.H2Connection(config=self.CONFIG)
        self._state = HTTPConnectionState.IDLE
        self._expire_at: typing.Optional[float] = None
        self._request_count = 0
        self._init_lock = Lock()
        self._state_lock = Lock()
        self._read_lock = Lock()
        self._write_lock = Lock()
        self._sent_connection_init = False
        self._used_all_stream_ids = False
        self._connection_error = False
        # Pending events for each open stream, keyed by stream ID.
        self._events: typing.Dict[int, typing.List[h2.events.Event]] = {}
        self._read_exception: typing.Optional[Exception] = None
        self._write_exception: typing.Optional[Exception] = None
        self._connection_error_event: typing.Optional[h2.events.Event] = None

    def handle_request(self, request: Request) -> Response:
        """
        Send a request on a new stream and return the response.

        The returned response carries a byte stream that reads the body
        lazily; closing that stream releases the HTTP/2 stream slot.

        Raises:
            RuntimeError: If the request's origin differs from this
                connection's origin.
            ConnectionNotAvailable: If the connection is closed, or has run
                out of stream IDs.
            RemoteProtocolError: If the remote end reported a connection or
                stream error.
            LocalProtocolError: If h2 reports a protocol violation that was
                not caused by a stored connection error event.
        """
        if not self.can_handle_request(request.url.origin):
            # This cannot occur in normal operation, since the connection pool
            # will only send requests on connections that handle them.
            # It's in place simply for resilience as a guard against incorrect
            # usage, for anyone working directly with httpcore connections.
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection "
                f"to {self._origin}"
            )

        with self._state_lock:
            if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
                self._request_count += 1
                self._expire_at = None
                self._state = HTTPConnectionState.ACTIVE
            else:
                raise ConnectionNotAvailable()

        with self._init_lock:
            if not self._sent_connection_init:
                try:
                    kwargs = {"request": request}
                    with Trace("http2.send_connection_init", request, kwargs):
                        self._send_connection_init(**kwargs)
                except BaseException as exc:
                    # The handshake did not complete, so the connection is
                    # unusable. Close it rather than leaving it marked as
                    # ACTIVE without a stream semaphore.
                    self.close()
                    raise exc
                self._sent_connection_init = True
                max_streams = self._h2_state.local_settings.max_concurrent_streams
                self._max_streams_semaphore = Semaphore(max_streams)

        self._max_streams_semaphore.acquire()

        try:
            stream_id = self._h2_state.get_next_available_stream_id()
            self._events[stream_id] = []
        except h2.exceptions.NoAvailableStreamIDError:  # pragma: nocover
            self._used_all_stream_ids = True
            # No stream was opened, so undo the bookkeeping done above:
            # the request is not handled here and the permit must be returned.
            with self._state_lock:
                self._request_count -= 1
            self._max_streams_semaphore.release()
            raise ConnectionNotAvailable()

        try:
            kwargs = {"request": request, "stream_id": stream_id}
            with Trace("http2.send_request_headers", request, kwargs):
                self._send_request_headers(request=request, stream_id=stream_id)
            with Trace("http2.send_request_body", request, kwargs):
                self._send_request_body(request=request, stream_id=stream_id)
            with Trace(
                "http2.receive_response_headers", request, kwargs
            ) as trace:
                status, headers = self._receive_response(
                    request=request, stream_id=stream_id
                )
                trace.return_value = (status, headers)

            return Response(
                status=status,
                headers=headers,
                content=HTTP2ConnectionByteStream(self, request, stream_id=stream_id),
                extensions={"stream_id": stream_id, "http_version": b"HTTP/2"},
            )
        except BaseException as exc:  # noqa: PIE786
            # Catch BaseException so that the stream slot is also released
            # on interrupts, matching the response byte stream's cleanup.
            kwargs = {"stream_id": stream_id}
            with Trace("http2.response_closed", request, kwargs):
                self._response_closed(stream_id=stream_id)

            if isinstance(exc, h2.exceptions.ProtocolError):
                # One case where h2 can raise a protocol error is when a
                # closed frame has been seen by the state machine.
                #
                # This happens when one stream is reading, and encounters
                # a GOAWAY event. Other flows of control may then raise
                # a protocol error at any point they interact with the 'h2_state'.
                #
                # In this case we'll have stored the event, and should raise
                # it as a RemoteProtocolError.
                if self._connection_error_event:
                    raise RemoteProtocolError(self._connection_error_event)
                # If h2 raises a protocol error in some other state then we
                # must somehow have made a protocol violation.
                raise LocalProtocolError(exc)  # pragma: nocover

            raise exc

    def _send_connection_init(self, request: Request) -> None:
        """
        The HTTP/2 connection requires some initial setup before we can start
        using individual request/response streams on it.
        """
        # Need to set these manually here instead of manipulating via
        # __setitem__() otherwise the H2Connection will emit SettingsUpdate
        # frames in addition to sending the undesired defaults.
        self._h2_state.local_settings = h2.settings.Settings(
            client=True,
            initial_values={
                # Disable PUSH_PROMISE frames from the server since we don't do anything
                # with them for now.  Maybe when we support caching?
                h2.settings.SettingCodes.ENABLE_PUSH: 0,
                # These two are taken from h2 for safe defaults
                h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS: 100,
                h2.settings.SettingCodes.MAX_HEADER_LIST_SIZE: 65536,
            },
        )

        # Some websites (*cough* Yahoo *cough*) balk at this setting being
        # present in the initial handshake since it's not defined in the original
        # RFC despite the RFC mandating ignoring settings you don't know about.
        del self._h2_state.local_settings[
            h2.settings.SettingCodes.ENABLE_CONNECT_PROTOCOL
        ]

        self._h2_state.initiate_connection()
        self._h2_state.increment_flow_control_window(2**24)
        self._write_outgoing_data(request)

    # Sending the request...

    def _send_request_headers(self, request: Request, stream_id: int) -> None:
        """
        Send the request headers on the given stream.

        The stream is ended straight away when the request headers do not
        declare a body.
        """
        end_stream = not has_body_headers(request)

        # In HTTP/2 the ':authority' pseudo-header is used instead of 'Host'.
        # In order to gracefully handle HTTP/1.1 and HTTP/2 we always require
        # HTTP/1.1 style headers, and map them appropriately if we end up on
        # an HTTP/2 connection.
        #
        # Indexing with [0] raises IndexError if no 'Host' header is present.
        authority = [v for k, v in request.headers if k.lower() == b"host"][0]

        headers = [
            (b":method", request.method),
            (b":authority", authority),
            (b":scheme", request.url.scheme),
            (b":path", request.url.target),
        ] + [
            (k.lower(), v)
            for k, v in request.headers
            if k.lower()
            not in (
                b"host",
                b"transfer-encoding",
            )
        ]

        self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
        self._h2_state.increment_flow_control_window(2**24, stream_id=stream_id)
        self._write_outgoing_data(request)

    def _send_request_body(self, request: Request, stream_id: int) -> None:
        """
        Send the request body on the given stream, then end the stream.

        Each piece of body data is split into chunks that fit within the
        currently available outgoing flow. Does nothing when the request
        headers do not declare a body.
        """
        if not has_body_headers(request):
            return

        assert isinstance(request.stream, typing.Iterable)
        for data in request.stream:
            while data:
                max_flow = self._wait_for_outgoing_flow(request, stream_id)
                chunk_size = min(len(data), max_flow)
                chunk, data = data[:chunk_size], data[chunk_size:]
                self._h2_state.send_data(stream_id, chunk)
                self._write_outgoing_data(request)

        self._h2_state.end_stream(stream_id)
        self._write_outgoing_data(request)

    # Receiving the response...

    def _receive_response(
        self, request: Request, stream_id: int
    ) -> typing.Tuple[int, typing.List[typing.Tuple[bytes, bytes]]]:
        """
        Wait for the response headers on the given stream.

        Returns a `(status_code, headers)` pair. Pseudo-headers (names
        starting with ':') are left out of the returned header list.
        """
        # Skip over any earlier events until the response headers arrive.
        while True:
            event = self._receive_stream_event(request, stream_id)
            if isinstance(event, h2.events.ResponseReceived):
                break

        # The status stays at 200 if no ':status' pseudo-header is present.
        status_code = 200
        headers = []
        for k, v in event.headers:
            if k == b":status":
                status_code = int(v.decode("ascii", errors="ignore"))
            elif not k.startswith(b":"):
                headers.append((k, v))

        return (status_code, headers)

    def _receive_response_body(
        self, request: Request, stream_id: int
    ) -> typing.Iterator[bytes]:
        """
        Yield the response body chunks for the given stream until it ends.
        """
        while True:
            event = self._receive_stream_event(request, stream_id)
            if isinstance(event, h2.events.DataReceived):
                # Acknowledge the data so that the flow control window is
                # replenished, and flush the resulting frames.
                amount = event.flow_controlled_length
                self._h2_state.acknowledge_received_data(amount, stream_id)
                self._write_outgoing_data(request)
                yield event.data
            elif isinstance(event, (h2.events.StreamEnded, h2.events.StreamReset)):
                break

    def _receive_stream_event(
        self, request: Request, stream_id: int
    ) -> h2.events.Event:
        """
        Return the next pending event for the given stream, reading from the
        network until one is available.

        Raises:
            RemoteProtocolError: If the event carries an `error_code`.
        """
        while not self._events.get(stream_id):
            self._receive_events(request, stream_id)
        event = self._events[stream_id].pop(0)
        # The StreamReset event applies to a single stream.
        if hasattr(event, "error_code"):
            raise RemoteProtocolError(event)
        return event

    def _receive_events(
        self, request: Request, stream_id: typing.Optional[int] = None
    ) -> None:
        """
        Read incoming data from the network and queue the resulting events
        against the stream they belong to.

        When `stream_id` is given, the read is skipped if that stream already
        has pending events. Events for streams that are not being tracked
        are dropped.

        Raises:
            RemoteProtocolError: If a connection-level error event has been
                received, either now or on an earlier read.
        """
        with self._read_lock:
            if self._connection_error_event is not None:  # pragma: nocover
                raise RemoteProtocolError(self._connection_error_event)

            # This conditional is a bit icky. We don't want to block reading if we've
            # actually got an event to return for a given stream. We need to do that
            # check *within* the atomic read lock. Though it also needs to be optional,
            # because when we call it from `_wait_for_outgoing_flow` we *do* want to
            # block until we've available flow control, even when we have events
            # pending for the stream ID we're attempting to send on.
            if stream_id is None or not self._events.get(stream_id):
                events = self._read_incoming_data(request)
                for event in events:
                    event_stream_id = getattr(event, "stream_id", 0)

                    # The ConnectionTerminatedEvent applies to the entire connection,
                    # and should be saved so it can be raised on all streams.
                    if hasattr(event, "error_code") and event_stream_id == 0:
                        self._connection_error_event = event
                        raise RemoteProtocolError(event)

                    if event_stream_id in self._events:
                        self._events[event_stream_id].append(event)

        self._write_outgoing_data(request)

    def _response_closed(self, stream_id: int) -> None:
        """
        Release the resources held for a stream once its response is done.

        When this was the last open stream the connection becomes IDLE, the
        keep-alive expiry is started, and the connection is closed if it has
        run out of stream IDs.
        """
        self._max_streams_semaphore.release()
        del self._events[stream_id]
        with self._state_lock:
            if self._state == HTTPConnectionState.ACTIVE and not self._events:
                self._state = HTTPConnectionState.IDLE
                if self._keepalive_expiry is not None:
                    now = time.monotonic()
                    self._expire_at = now + self._keepalive_expiry
                if self._used_all_stream_ids:  # pragma: nocover
                    self.close()

    def close(self) -> None:
        """
        Close the connection and the underlying network stream.
        """
        # Note that this method unilaterally closes the connection, and does
        # not have any kind of locking in place around it.
        self._h2_state.close_connection()
        self._state = HTTPConnectionState.CLOSED
        self._network_stream.close()

    # Wrappers around network read/write operations...

    def _read_incoming_data(
        self, request: Request
    ) -> typing.List[h2.events.Event]:
        """
        Read from the network stream and return the events that h2 produced
        from the data.

        The read timeout is taken from the request's "timeout" extension.

        Raises:
            RemoteProtocolError: If the read returns no data.
            Exception: Any error from the read, which is also stored and
                raised again on every later call.
        """
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)

        if self._read_exception is not None:
            raise self._read_exception  # pragma: nocover

        try:
            data = self._network_stream.read(self.READ_NUM_BYTES, timeout)
            if data == b"":
                raise RemoteProtocolError("Server disconnected")
        except Exception as exc:
            # If we get a network error we should:
            #
            # 1. Save the exception and just raise it immediately on any future reads.
            #    (For example, this means that a single read timeout or disconnect will
            #    immediately close all pending streams. Without requiring multiple
            #    sequential timeouts.)
            # 2. Mark the connection as errored, so that we don't accept any other
            #    incoming requests.
            self._read_exception = exc
            self._connection_error = True
            raise exc

        events: typing.List[h2.events.Event] = self._h2_state.receive_data(data)

        return events

    def _write_outgoing_data(self, request: Request) -> None:
        """
        Write whatever data h2 has queued up to the network stream.

        The write timeout is taken from the request's "timeout" extension.

        Raises:
            Exception: Any error from the write, which is also stored and
                raised again on every later call.
        """
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("write", None)

        with self._write_lock:
            data_to_send = self._h2_state.data_to_send()

            if self._write_exception is not None:
                raise self._write_exception  # pragma: nocover

            try:
                self._network_stream.write(data_to_send, timeout)
            except Exception as exc:  # pragma: nocover
                # If we get a network error we should:
                #
                # 1. Save the exception and just raise it immediately on any future write.
                #    (For example, this means that a single write timeout or disconnect will
                #    immediately close all pending streams. Without requiring multiple
                #    sequential timeouts.)
                # 2. Mark the connection as errored, so that we don't accept any other
                #    incoming requests.
                self._write_exception = exc
                self._connection_error = True
                raise exc

    # Flow control...

    def _wait_for_outgoing_flow(self, request: Request, stream_id: int) -> int:
        """
        Returns the maximum allowable outgoing flow for a given stream.

        If the allowable flow is zero, then waits on the network until
        WindowUpdated frames have increased the flow rate.
        https://tools.ietf.org/html/rfc7540#section-6.9
        """
        local_flow: int = self._h2_state.local_flow_control_window(stream_id)
        max_frame_size: int = self._h2_state.max_outbound_frame_size
        flow = min(local_flow, max_frame_size)
        while flow == 0:
            self._receive_events(request)
            local_flow = self._h2_state.local_flow_control_window(stream_id)
            max_frame_size = self._h2_state.max_outbound_frame_size
            flow = min(local_flow, max_frame_size)
        return flow

    # Interface for connection pooling...

    def can_handle_request(self, origin: Origin) -> bool:
        """Return True if `origin` matches this connection's origin."""
        return origin == self._origin

    def is_available(self) -> bool:
        """
        Return True if the connection can accept further requests: it is not
        closed, has not hit a network error, and still has stream IDs left.
        """
        return (
            self._state != HTTPConnectionState.CLOSED
            and not self._connection_error
            and not self._used_all_stream_ids
        )

    def has_expired(self) -> bool:
        """Return True if the keep-alive expiry time has been set and passed."""
        now = time.monotonic()
        return self._expire_at is not None and now > self._expire_at

    def is_idle(self) -> bool:
        """Return True if the connection is in the IDLE state."""
        return self._state == HTTPConnectionState.IDLE

    def is_closed(self) -> bool:
        """Return True if the connection is in the CLOSED state."""
        return self._state == HTTPConnectionState.CLOSED

    def info(self) -> str:
        """Return a short description of the connection."""
        origin = str(self._origin)
        return (
            f"{origin!r}, HTTP/2, {self._state.name}, "
            f"Request Count: {self._request_count}"
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        origin = str(self._origin)
        return (
            f"<{class_name} [{origin!r}, {self._state.name}, "
            f"Request Count: {self._request_count}]>"
        )

    # These context managers are not used in the standard flow, but are
    # useful for testing or working with connection instances directly.

    def __enter__(self) -> "HTTP2Connection":
        return self

    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]] = None,
        exc_value: typing.Optional[BaseException] = None,
        traceback: typing.Optional[types.TracebackType] = None,
    ) -> None:
        self.close()
class HTTP2ConnectionByteStream:
    """
    Iterable over the response body of one HTTP/2 stream.

    Closing the byte stream tells the owning connection that the response
    on this stream is finished.
    """

    def __init__(
        self, connection: HTTP2Connection, request: Request, stream_id: int
    ) -> None:
        """
        Parameters:
            connection: The connection that the response is read from.
            request: The request that this response belongs to.
            stream_id: The ID of the HTTP/2 stream carrying the response.
        """
        self._connection = connection
        self._request = request
        self._stream_id = stream_id
        self._closed = False

    def __iter__(self) -> typing.Iterator[bytes]:
        """Yield the response body chunks read from the connection."""
        kwargs = {"request": self._request, "stream_id": self._stream_id}
        try:
            with Trace("http2.receive_response_body", self._request, kwargs):
                for chunk in self._connection._receive_response_body(
                    request=self._request, stream_id=self._stream_id
                ):
                    yield chunk
        except BaseException as exc:
            # If we get an exception while streaming the response,
            # we want to close the response (and possibly the connection)
            # before raising that exception.
            self.close()
            raise exc

    def close(self) -> None:
        """
        Mark the response as closed on the owning connection.

        Calling this more than once is safe; only the first call has any
        effect.
        """
        if self._closed:
            return

        self._closed = True
        kwargs = {"stream_id": self._stream_id}
        with Trace("http2.response_closed", self._request, kwargs):
            self._connection._response_closed(stream_id=self._stream_id)