Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py: 28%
161 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
1import enum
2import logging
3import time
4from types import TracebackType
5from typing import (
6 AsyncIterable,
7 AsyncIterator,
8 List,
9 Optional,
10 Tuple,
11 Type,
12 Union,
13 cast,
14)
16import h11
18from .._backends.base import AsyncNetworkStream
19from .._exceptions import (
20 ConnectionNotAvailable,
21 LocalProtocolError,
22 RemoteProtocolError,
23 WriteError,
24 map_exceptions,
25)
26from .._models import Origin, Request, Response
27from .._synchronization import AsyncLock, AsyncShieldCancellation
28from .._trace import Trace
29from .interfaces import AsyncConnectionInterface
# Module-level logger handed to every `Trace` created in this module.
logger = logging.getLogger("httpcore.http11")


# A subset of `h11.Event` types supported by `_send_event`
H11SendEvent = Union[h11.Request, h11.Data, h11.EndOfMessage]
class HTTPConnectionState(enum.IntEnum):
    """Lifecycle states tracked by an HTTP/1.1 connection in this module."""

    # Initial state assigned when a connection object is constructed.
    NEW = 0
    # Set while a request/response cycle is in progress.
    ACTIVE = 1
    # Set once a cycle completed cleanly and the connection may be reused.
    IDLE = 2
    # Set when the connection has been closed.
    CLOSED = 3
class AsyncHTTP11Connection(AsyncConnectionInterface):
    """
    A single HTTP/1.1 connection, driven by an `h11.Connection` state machine
    on top of an `AsyncNetworkStream`.

    The connection serves one request/response cycle at a time and records
    its lifecycle in `self._state` (see `HTTPConnectionState`).
    """

    # Maximum number of bytes requested from the network stream per read.
    READ_NUM_BYTES = 64 * 1024
    # Passed to `h11.Connection` as `max_incomplete_event_size`.
    MAX_INCOMPLETE_EVENT_SIZE = 100 * 1024

    def __init__(
        self,
        origin: Origin,
        stream: AsyncNetworkStream,
        keepalive_expiry: Optional[float] = None,
    ) -> None:
        """
        Create a connection bound to `origin` that communicates over `stream`.

        `keepalive_expiry`, when not None, is added to `time.monotonic()` each
        time the connection becomes idle, to compute when it expires.
        """
        self._origin = origin
        self._network_stream = stream
        self._keepalive_expiry: Optional[float] = keepalive_expiry
        self._expire_at: Optional[float] = None
        self._state = HTTPConnectionState.NEW
        self._state_lock = AsyncLock()
        self._request_count = 0
        self._h11_state = h11.Connection(
            our_role=h11.CLIENT,
            max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
        )

    async def handle_async_request(self, request: Request) -> Response:
        """
        Send `request` over this connection and return the response.

        The response headers are read before returning; the body is exposed
        lazily through an `HTTP11ConnectionByteStream`.

        Raises `RuntimeError` if the request origin does not match this
        connection, and `ConnectionNotAvailable` if the connection is neither
        NEW nor IDLE. Any exception raised while sending the request or
        reading the response headers closes the response before propagating.
        """
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection "
                f"to {self._origin}"
            )

        async with self._state_lock:
            if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
                self._request_count += 1
                self._state = HTTPConnectionState.ACTIVE
                self._expire_at = None
            else:
                raise ConnectionNotAvailable()

        try:
            kwargs = {"request": request}
            try:
                async with Trace("send_request_headers", logger, request, kwargs):
                    await self._send_request_headers(**kwargs)
                async with Trace("send_request_body", logger, request, kwargs):
                    await self._send_request_body(**kwargs)
            except WriteError:
                # If we get a write error while we're writing the request,
                # then we suppress this error and move on to attempting to
                # read the response. Servers can sometimes close the request
                # pre-emptively and then respond with a well formed HTTP
                # error response.
                pass

            async with Trace(
                "receive_response_headers", logger, request, kwargs
            ) as trace:
                (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
                ) = await self._receive_response_headers(**kwargs)
                trace.return_value = (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
                )

            return Response(
                status=status,
                headers=headers,
                content=HTTP11ConnectionByteStream(self, request),
                extensions={
                    "http_version": http_version,
                    "reason_phrase": reason_phrase,
                    "network_stream": self._network_stream,
                },
            )
        except BaseException:
            # Run the close-down logic before propagating, then re-raise the
            # in-flight exception unchanged with a bare `raise`.
            with AsyncShieldCancellation():
                async with Trace("response_closed", logger, request):
                    await self._response_closed()
            raise

    # Sending the request...

    async def _send_request_headers(self, request: Request) -> None:
        """Send the request line and headers, using the "write" timeout."""
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("write", None)

        with map_exceptions({h11.LocalProtocolError: LocalProtocolError}):
            event = h11.Request(
                method=request.method,
                target=request.url.target,
                headers=request.headers,
            )
        await self._send_event(event, timeout=timeout)

    async def _send_request_body(self, request: Request) -> None:
        """Send each chunk of the request stream, then the end-of-message."""
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("write", None)

        assert isinstance(request.stream, AsyncIterable)
        async for chunk in request.stream:
            event = h11.Data(data=chunk)
            await self._send_event(event, timeout=timeout)

        await self._send_event(h11.EndOfMessage(), timeout=timeout)

    async def _send_event(
        self, event: h11.Event, timeout: Optional[float] = None
    ) -> None:
        """Serialize `event` through h11 and write any resulting bytes."""
        bytes_to_send = self._h11_state.send(event)
        if bytes_to_send is not None:
            await self._network_stream.write(bytes_to_send, timeout=timeout)

    # Receiving the response...

    async def _receive_response_headers(
        self, request: Request
    ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]:
        """
        Read events until a response arrives, using the "read" timeout.

        Returns `(http_version, status_code, reason, headers)`. Informational
        responses are skipped, except for a 101 which is returned as-is.
        """
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)

        while True:
            event = await self._receive_event(timeout=timeout)
            if isinstance(event, h11.Response):
                break
            if (
                isinstance(event, h11.InformationalResponse)
                and event.status_code == 101
            ):
                break

        http_version = b"HTTP/" + event.http_version

        # h11 version 0.11+ supports a `raw_items` interface to get the
        # raw header casing, rather than the enforced lowercase headers.
        headers = event.headers.raw_items()

        return http_version, event.status_code, event.reason, headers

    async def _receive_response_body(self, request: Request) -> AsyncIterator[bytes]:
        """
        Yield response body chunks, using the "read" timeout, until h11
        reports `EndOfMessage` or `PAUSED`.
        """
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)

        while True:
            event = await self._receive_event(timeout=timeout)
            if isinstance(event, h11.Data):
                yield bytes(event.data)
            elif isinstance(event, (h11.EndOfMessage, h11.PAUSED)):
                break

    async def _receive_event(
        self, timeout: Optional[float] = None
    ) -> Union[h11.Event, Type[h11.PAUSED]]:
        """
        Return the next h11 event, reading from the network stream whenever
        h11 reports that it needs more data.

        Raises `RemoteProtocolError` for h11 remote protocol errors, and when
        the stream returns no data while a response is still expected.
        """
        while True:
            with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
                event = self._h11_state.next_event()

            if event is h11.NEED_DATA:
                data = await self._network_stream.read(
                    self.READ_NUM_BYTES, timeout=timeout
                )

                # If we feed this case through h11 we'll raise an exception like:
                #
                #     httpcore.RemoteProtocolError: can't handle event type
                #     ConnectionClosed when role=SERVER and state=SEND_RESPONSE
                #
                # Which is accurate, but not very informative from an end-user
                # perspective. Instead we handle this case distinctly and raise
                # a RemoteProtocolError with a clearer message.
                if data == b"" and self._h11_state.their_state == h11.SEND_RESPONSE:
                    msg = "Server disconnected without sending a response."
                    raise RemoteProtocolError(msg)

                self._h11_state.receive_data(data)
            else:
                # mypy fails to narrow the type in the if statement above
                return cast(Union[h11.Event, Type[h11.PAUSED]], event)

    async def _response_closed(self) -> None:
        """
        Finish the current request/response cycle.

        If both sides of the h11 state machine are DONE the connection becomes
        IDLE (with a fresh expiry when keep-alive is configured); otherwise
        the connection is closed.
        """
        async with self._state_lock:
            if (
                self._h11_state.our_state is h11.DONE
                and self._h11_state.their_state is h11.DONE
            ):
                self._state = HTTPConnectionState.IDLE
                self._h11_state.start_next_cycle()
                if self._keepalive_expiry is not None:
                    now = time.monotonic()
                    self._expire_at = now + self._keepalive_expiry
            else:
                await self.aclose()

    # Once the connection is no longer required...

    async def aclose(self) -> None:
        """Mark the connection CLOSED and close the underlying stream."""
        # Note that this method unilaterally closes the connection, and does
        # not have any kind of locking in place around it.
        self._state = HTTPConnectionState.CLOSED
        await self._network_stream.aclose()

    # The AsyncConnectionInterface methods provide information about the state of
    # the connection, allowing for a connection pooling implementation to
    # determine when to reuse and when to close the connection...

    def can_handle_request(self, origin: Origin) -> bool:
        """Return True if `origin` equals the origin of this connection."""
        return origin == self._origin

    def is_available(self) -> bool:
        """Return True if the connection is IDLE."""
        # Note that HTTP/1.1 connections in the "NEW" state are not treated as
        # being "available". The control flow which created the connection will
        # be able to send an outgoing request, but the connection will not be
        # acquired from the connection pool for any other request.
        return self._state == HTTPConnectionState.IDLE

    def has_expired(self) -> bool:
        """
        Return True if the keep-alive deadline has passed, or if the
        connection is idle while its socket reports as readable.
        """
        now = time.monotonic()
        keepalive_expired = self._expire_at is not None and now > self._expire_at

        # If the HTTP connection is idle but the socket is readable, then the
        # only valid state is that the socket is about to return b"", indicating
        # a server-initiated disconnect.
        server_disconnected = (
            self._state == HTTPConnectionState.IDLE
            and self._network_stream.get_extra_info("is_readable")
        )

        # `get_extra_info` is not guaranteed to return a bool, so coerce the
        # result to honour the declared return type.
        return bool(keepalive_expired or server_disconnected)

    def is_idle(self) -> bool:
        """Return True if the connection is IDLE."""
        return self._state == HTTPConnectionState.IDLE

    def is_closed(self) -> bool:
        """Return True if the connection is CLOSED."""
        return self._state == HTTPConnectionState.CLOSED

    def info(self) -> str:
        """Return a one-line summary: origin, protocol, state, request count."""
        origin = str(self._origin)
        return (
            f"{origin!r}, HTTP/1.1, {self._state.name}, "
            f"Request Count: {self._request_count}"
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        origin = str(self._origin)
        return (
            f"<{class_name} [{origin!r}, {self._state.name}, "
            f"Request Count: {self._request_count}]>"
        )

    # These context managers are not used in the standard flow, but are
    # useful for testing or working with connection instances directly.

    async def __aenter__(self) -> "AsyncHTTP11Connection":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Close the connection when leaving the `async with` block."""
        await self.aclose()
class HTTP11ConnectionByteStream:
    """
    Async byte stream for a response body read from an
    `AsyncHTTP11Connection`.

    Iterating yields the body chunks; `aclose()` notifies the connection that
    the response is finished, at most once.
    """

    def __init__(self, connection: AsyncHTTP11Connection, request: Request) -> None:
        self._connection = connection
        self._request = request
        # Guards `aclose()` so the connection is only notified once.
        self._closed = False

    async def __aiter__(self) -> AsyncIterator[bytes]:
        """Yield the response body chunks received for `self._request`."""
        kwargs = {"request": self._request}
        try:
            async with Trace("receive_response_body", logger, self._request, kwargs):
                async for chunk in self._connection._receive_response_body(**kwargs):
                    yield chunk
        except BaseException:
            # If we get an exception while streaming the response,
            # we want to close the response (and possibly the connection)
            # before raising that exception.
            with AsyncShieldCancellation():
                await self.aclose()
            # Bare `raise` re-raises the in-flight exception unchanged.
            raise

    async def aclose(self) -> None:
        """Close the response; calls after the first one are no-ops."""
        if not self._closed:
            self._closed = True
            async with Trace("response_closed", logger, self._request):
                await self._connection._response_closed()