Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_sync/http11.py: 29%
156 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1import enum
2import logging
3import time
4from types import TracebackType
5from typing import (
6 Iterable,
7 Iterator,
8 List,
9 Optional,
10 Tuple,
11 Type,
12 Union,
13 cast,
14)
16import h11
18from .._exceptions import (
19 ConnectionNotAvailable,
20 LocalProtocolError,
21 RemoteProtocolError,
22 map_exceptions,
23)
24from .._models import Origin, Request, Response
25from .._synchronization import Lock
26from .._trace import Trace
27from ..backends.base import NetworkStream
28from .interfaces import ConnectionInterface
# Module-level logger for HTTP/1.1 connection tracing.
logger = logging.getLogger("httpcore.http11")


# The `h11.Event` subtypes that `_send_event` is expected to be given.
H11SendEvent = Union[h11.Request, h11.Data, h11.EndOfMessage]
class HTTPConnectionState(enum.IntEnum):
    """Lifecycle states tracked for an HTTP/1.1 connection."""

    NEW = 0  # Created; no request has been started yet.
    ACTIVE = 1  # A request/response cycle is in progress.
    IDLE = 2  # Last cycle finished cleanly; may take another request.
    CLOSED = 3  # The connection has been closed.
class HTTP11Connection(ConnectionInterface):
    """An HTTP/1.1 client connection layered over a `NetworkStream`.

    Protocol framing is delegated to an `h11.Connection` in the client role;
    this class moves bytes between h11 and the network stream and tracks the
    connection state (`HTTPConnectionState`). `handle_request` moves the
    connection from NEW/IDLE to ACTIVE, and `_response_closed` moves it to
    IDLE when both sides of the h11 state machine are done, or closes it
    otherwise.
    """

    # Maximum number of bytes requested from the network stream per read.
    READ_NUM_BYTES = 64 * 1024
    # Passed to h11 as `max_incomplete_event_size`.
    MAX_INCOMPLETE_EVENT_SIZE = 100 * 1024

    def __init__(
        self,
        origin: Origin,
        stream: NetworkStream,
        keepalive_expiry: Optional[float] = None,
    ) -> None:
        """Wrap `stream` as an HTTP/1.1 connection to `origin`.

        `keepalive_expiry` is the number of seconds (measured with
        `time.monotonic()`) after a response is closed at which an idle
        connection is reported as expired; `None` disables that expiry.
        """
        self._origin = origin
        self._network_stream = stream
        self._keepalive_expiry: Optional[float] = keepalive_expiry
        self._expire_at: Optional[float] = None
        self._state = HTTPConnectionState.NEW
        self._state_lock = Lock()
        self._request_count = 0
        self._h11_state = h11.Connection(
            our_role=h11.CLIENT,
            max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
        )

    def handle_request(self, request: Request) -> Response:
        """Send `request` and return a `Response` with a lazily read body.

        Raises:
            RuntimeError: the request's origin does not match this connection.
            ConnectionNotAvailable: the connection is neither NEW nor IDLE.

        Any exception raised while sending the request or receiving the
        response headers is re-raised after `_response_closed()` has run.
        """
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection "
                f"to {self._origin}"
            )

        with self._state_lock:
            if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
                self._request_count += 1
                self._state = HTTPConnectionState.ACTIVE
                self._expire_at = None
            else:
                raise ConnectionNotAvailable()

        try:
            kwargs = {"request": request}
            with Trace("send_request_headers", logger, request, kwargs):
                self._send_request_headers(**kwargs)
            with Trace("send_request_body", logger, request, kwargs):
                self._send_request_body(**kwargs)
            with Trace(
                "receive_response_headers", logger, request, kwargs
            ) as trace:
                (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
                ) = self._receive_response_headers(**kwargs)
                trace.return_value = (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
                )

            return Response(
                status=status,
                headers=headers,
                content=HTTP11ConnectionByteStream(self, request),
                extensions={
                    "http_version": http_version,
                    "reason_phrase": reason_phrase,
                    "network_stream": self._network_stream,
                },
            )
        except BaseException:
            # Release (or close) the connection before propagating the error.
            with Trace("response_closed", logger, request):
                self._response_closed()
            raise

    # Sending the request...

    def _send_request_headers(self, request: Request) -> None:
        """Send the request line and headers, using the "write" timeout.

        `h11.LocalProtocolError` is mapped to this package's
        `LocalProtocolError`.
        """
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("write", None)

        with map_exceptions({h11.LocalProtocolError: LocalProtocolError}):
            event = h11.Request(
                method=request.method,
                target=request.url.target,
                headers=request.headers,
            )
        self._send_event(event, timeout=timeout)

    def _send_request_body(self, request: Request) -> None:
        """Send each chunk of `request.stream`, then the end-of-message event."""
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("write", None)

        assert isinstance(request.stream, Iterable)
        for chunk in request.stream:
            event = h11.Data(data=chunk)
            self._send_event(event, timeout=timeout)

        self._send_event(h11.EndOfMessage(), timeout=timeout)

    def _send_event(
        self, event: H11SendEvent, timeout: Optional[float] = None
    ) -> None:
        """Serialize `event` through h11 and write any resulting bytes."""
        bytes_to_send = self._h11_state.send(event)
        if bytes_to_send is not None:
            self._network_stream.write(bytes_to_send, timeout=timeout)

    # Receiving the response...

    def _receive_response_headers(
        self, request: Request
    ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]:
        """Read events until the response head arrives.

        Informational responses are skipped, except for status 101, which is
        returned like a final response.

        Returns:
            `(http_version, status_code, reason, headers)`, where
            `http_version` is prefixed with `b"HTTP/"` and `headers` keeps
            the raw header casing.
        """
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)

        while True:
            event = self._receive_event(timeout=timeout)
            if isinstance(event, h11.Response):
                break
            if (
                isinstance(event, h11.InformationalResponse)
                and event.status_code == 101
            ):
                break

        http_version = b"HTTP/" + event.http_version

        # h11 version 0.11+ supports a `raw_items` interface to get the
        # raw header casing, rather than the enforced lowercase headers.
        headers = event.headers.raw_items()

        return http_version, event.status_code, event.reason, headers

    def _receive_response_body(self, request: Request) -> Iterator[bytes]:
        """Yield response body chunks until end-of-message or `h11.PAUSED`."""
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)

        while True:
            event = self._receive_event(timeout=timeout)
            if isinstance(event, h11.Data):
                yield bytes(event.data)
            elif isinstance(event, (h11.EndOfMessage, h11.PAUSED)):
                break

    def _receive_event(
        self, timeout: Optional[float] = None
    ) -> Union[h11.Event, Type[h11.PAUSED]]:
        """Return the next h11 event, reading from the network as needed.

        Raises:
            RemoteProtocolError: h11 rejected the incoming data, or the
                stream returned `b""` while h11 still expected the server
                to send a response.
        """
        while True:
            with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
                event = self._h11_state.next_event()

            if event is h11.NEED_DATA:
                data = self._network_stream.read(
                    self.READ_NUM_BYTES, timeout=timeout
                )

                # If we feed this case through h11 we'll raise an exception like:
                #
                #     httpcore.RemoteProtocolError: can't handle event type
                #     ConnectionClosed when role=SERVER and state=SEND_RESPONSE
                #
                # Which is accurate, but not very informative from an end-user
                # perspective. Instead we handle this case distinctly and raise
                # a RemoteProtocolError with a clearer message.
                if data == b"" and self._h11_state.their_state == h11.SEND_RESPONSE:
                    msg = "Server disconnected without sending a response."
                    raise RemoteProtocolError(msg)

                self._h11_state.receive_data(data)
            else:
                # mypy fails to narrow the type in the above if statement above
                return cast(Union[h11.Event, Type[h11.PAUSED]], event)

    def _response_closed(self) -> None:
        """Finish the current cycle: go IDLE for reuse, or close.

        The connection is reused only when both our side and the peer's side
        of the h11 state machine are `h11.DONE`; in that case the keep-alive
        deadline is set if a `keepalive_expiry` was configured.
        """
        with self._state_lock:
            if (
                self._h11_state.our_state is h11.DONE
                and self._h11_state.their_state is h11.DONE
            ):
                self._state = HTTPConnectionState.IDLE
                self._h11_state.start_next_cycle()
                if self._keepalive_expiry is not None:
                    now = time.monotonic()
                    self._expire_at = now + self._keepalive_expiry
            else:
                self.close()

    # Once the connection is no longer required...

    def close(self) -> None:
        """Mark the connection CLOSED and close the network stream."""
        # Note that this method unilaterally closes the connection, and does
        # not have any kind of locking in place around it.
        self._state = HTTPConnectionState.CLOSED
        self._network_stream.close()

    # The ConnectionInterface methods provide information about the state of
    # the connection, allowing for a connection pooling implementation to
    # determine when to reuse and when to close the connection...

    def can_handle_request(self, origin: Origin) -> bool:
        """Return whether `origin` equals the origin of this connection."""
        return origin == self._origin

    def is_available(self) -> bool:
        """Return whether the connection is IDLE."""
        # Note that HTTP/1.1 connections in the "NEW" state are not treated as
        # being "available". The control flow which created the connection will
        # be able to send an outgoing request, but the connection will not be
        # acquired from the connection pool for any other request.
        return self._state == HTTPConnectionState.IDLE

    def has_expired(self) -> bool:
        """Return whether the connection should no longer be reused.

        True when the keep-alive deadline has passed, or when the connection
        is IDLE and the network stream reports itself as readable.
        """
        now = time.monotonic()
        keepalive_expired = self._expire_at is not None and now > self._expire_at

        # If the HTTP connection is idle but the socket is readable, then the
        # only valid state is that the socket is about to return b"", indicating
        # a server-initiated disconnect.
        server_disconnected = (
            self._state == HTTPConnectionState.IDLE
            and self._network_stream.get_extra_info("is_readable")
        )

        # `get_extra_info` may hand back a non-bool (e.g. `None`); coerce so
        # the declared `bool` return type always holds.
        return bool(keepalive_expired or server_disconnected)

    def is_idle(self) -> bool:
        """Return whether the connection is IDLE."""
        return self._state == HTTPConnectionState.IDLE

    def is_closed(self) -> bool:
        """Return whether the connection is CLOSED."""
        return self._state == HTTPConnectionState.CLOSED

    def info(self) -> str:
        """Return a one-line summary: origin, protocol, state, request count."""
        origin = str(self._origin)
        return (
            f"{origin!r}, HTTP/1.1, {self._state.name}, "
            f"Request Count: {self._request_count}"
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        origin = str(self._origin)
        return (
            f"<{class_name} [{origin!r}, {self._state.name}, "
            f"Request Count: {self._request_count}]>"
        )

    # These context managers are not used in the standard flow, but are
    # useful for testing or working with connection instances directly.

    def __enter__(self) -> "HTTP11Connection":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        self.close()
class HTTP11ConnectionByteStream:
    """Response body stream bound to one request on an `HTTP11Connection`.

    Iterating yields the body chunks read by the connection. `close()`
    notifies the connection, at most once, that the response is finished.
    """

    def __init__(self, connection: HTTP11Connection, request: Request) -> None:
        self._closed = False
        self._request = request
        self._connection = connection

    def __iter__(self) -> Iterator[bytes]:
        trace_info = {"request": self._request}
        try:
            with Trace("receive_response_body", logger, self._request, trace_info):
                body = self._connection._receive_response_body(**trace_info)
                for part in body:
                    yield part
        except BaseException:
            # An error while streaming means the response must be closed
            # (and possibly the connection with it) before it propagates.
            self.close()
            raise

    def close(self) -> None:
        # Closing is idempotent: only the first call reaches the connection.
        if self._closed:
            return

        self._closed = True
        with Trace("response_closed", logger, self._request):
            self._connection._response_closed()