Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py: 27%
153 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import enum
2import time
3from types import TracebackType
4from typing import (
5 AsyncIterable,
6 AsyncIterator,
7 List,
8 Optional,
9 Tuple,
10 Type,
11 Union,
12 cast,
13)
15import h11
17from .._exceptions import (
18 ConnectionNotAvailable,
19 LocalProtocolError,
20 RemoteProtocolError,
21 map_exceptions,
22)
23from .._models import Origin, Request, Response
24from .._synchronization import AsyncLock
25from .._trace import Trace
26from ..backends.base import AsyncNetworkStream
27from .interfaces import AsyncConnectionInterface
# The `h11.Event` types that `_send_event` is expected to be given: the
# request line/headers, body data chunks, and the end-of-message marker.
H11SendEvent = Union[h11.Request, h11.Data, h11.EndOfMessage]
class HTTPConnectionState(enum.IntEnum):
    """Lifecycle states tracked for an HTTP/1.1 connection."""

    # Initial state assigned when the connection object is constructed.
    NEW = 0
    # A request/response cycle is currently in progress.
    ACTIVE = 1
    # The previous cycle completed cleanly; the connection may be reused.
    IDLE = 2
    # The connection has been closed.
    CLOSED = 3
class AsyncHTTP11Connection(AsyncConnectionInterface):
    """
    An HTTP/1.1 client connection layered over an `AsyncNetworkStream`.

    Protocol framing is delegated to an `h11.Connection` in the client role.
    A request is only accepted while the connection is in the NEW or IDLE
    state; once the response is closed the connection either returns to IDLE
    (when both h11 sides are DONE) or is closed.
    """

    # Maximum number of bytes requested from the network stream per read.
    READ_NUM_BYTES = 64 * 1024

    def __init__(
        self,
        origin: Origin,
        stream: AsyncNetworkStream,
        keepalive_expiry: Optional[float] = None,
    ) -> None:
        """
        Args:
            origin: The origin this connection serves; requests for any other
                origin are rejected by `handle_async_request`.
            stream: The network stream used for all reads and writes.
            keepalive_expiry: Seconds an idle connection is kept before
                `has_expired` reports it as expired. `None` disables the
                time-based expiry.
        """
        self._origin = origin
        self._network_stream = stream
        self._keepalive_expiry: Optional[float] = keepalive_expiry
        # Monotonic deadline set when the connection goes idle.
        self._expire_at: Optional[float] = None
        self._state = HTTPConnectionState.NEW
        self._state_lock = AsyncLock()
        self._request_count = 0
        self._h11_state = h11.Connection(our_role=h11.CLIENT)

    async def handle_async_request(self, request: Request) -> Response:
        """
        Send `request` on this connection and return the response.

        The request headers and body are written and the response headers are
        read before returning. The response body is not read here; it is
        exposed lazily through an `HTTP11ConnectionByteStream`.

        Raises:
            RuntimeError: If the request origin differs from this connection's.
            ConnectionNotAvailable: If the connection is not NEW or IDLE.
        """
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection "
                f"to {self._origin}"
            )

        async with self._state_lock:
            if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
                self._request_count += 1
                self._state = HTTPConnectionState.ACTIVE
                self._expire_at = None
            else:
                raise ConnectionNotAvailable()

        try:
            kwargs = {"request": request}
            async with Trace("http11.send_request_headers", request, kwargs):
                await self._send_request_headers(**kwargs)
            async with Trace("http11.send_request_body", request, kwargs):
                await self._send_request_body(**kwargs)
            async with Trace(
                "http11.receive_response_headers", request, kwargs
            ) as trace:
                (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
                ) = await self._receive_response_headers(**kwargs)
                trace.return_value = (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
                )

            return Response(
                status=status,
                headers=headers,
                content=HTTP11ConnectionByteStream(self, request),
                extensions={
                    "http_version": http_version,
                    "reason_phrase": reason_phrase,
                    "network_stream": self._network_stream,
                },
            )
        except BaseException as exc:
            # Any failure (including cancellation) before the response is
            # handed back must release or close the connection first.
            async with Trace("http11.response_closed", request):
                await self._response_closed()
            raise exc

    # Sending the request...

    async def _send_request_headers(self, request: Request) -> None:
        """Write the request line and headers, using the "write" timeout."""
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("write", None)

        with map_exceptions({h11.LocalProtocolError: LocalProtocolError}):
            event = h11.Request(
                method=request.method,
                target=request.url.target,
                headers=request.headers,
            )
        await self._send_event(event, timeout=timeout)

    async def _send_request_body(self, request: Request) -> None:
        """Write each chunk of the request stream, then the end-of-message."""
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("write", None)

        assert isinstance(request.stream, AsyncIterable)
        async for chunk in request.stream:
            event = h11.Data(data=chunk)
            await self._send_event(event, timeout=timeout)

        await self._send_event(h11.EndOfMessage(), timeout=timeout)

    async def _send_event(
        self, event: H11SendEvent, timeout: Optional[float] = None
    ) -> None:
        """Serialise `event` through h11 and write any resulting bytes."""
        bytes_to_send = self._h11_state.send(event)
        if bytes_to_send is not None:
            await self._network_stream.write(bytes_to_send, timeout=timeout)

    # Receiving the response...

    async def _receive_response_headers(
        self, request: Request
    ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]:
        """
        Read events until the final response head arrives.

        Informational responses are skipped, except for a 101 which is
        returned as the response.

        Returns:
            A `(http_version, status_code, reason, headers)` tuple, where
            `http_version` is prefixed with `b"HTTP/"`.
        """
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)

        while True:
            event = await self._receive_event(timeout=timeout)
            if isinstance(event, h11.Response):
                break
            if (
                isinstance(event, h11.InformationalResponse)
                and event.status_code == 101
            ):
                break

        http_version = b"HTTP/" + event.http_version

        # h11 version 0.11+ supports a `raw_items` interface to get the
        # raw header casing, rather than the enforced lowercase headers.
        headers = event.headers.raw_items()

        return http_version, event.status_code, event.reason, headers

    async def _receive_response_body(self, request: Request) -> AsyncIterator[bytes]:
        """Yield response body chunks until end-of-message or h11 pauses."""
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)

        while True:
            event = await self._receive_event(timeout=timeout)
            if isinstance(event, h11.Data):
                yield bytes(event.data)
            elif isinstance(event, (h11.EndOfMessage, h11.PAUSED)):
                break

    async def _receive_event(
        self, timeout: Optional[float] = None
    ) -> Union[h11.Event, Type[h11.PAUSED]]:
        """
        Return the next h11 event, reading from the network as required.

        Raises:
            RemoteProtocolError: If h11 rejects the incoming data, or if the
                stream returns no data while a response is still expected.
        """
        while True:
            with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
                event = self._h11_state.next_event()

            if event is h11.NEED_DATA:
                data = await self._network_stream.read(
                    self.READ_NUM_BYTES, timeout=timeout
                )

                # If we feed this case through h11 we'll raise an exception like:
                #
                #     httpcore.RemoteProtocolError: can't handle event type
                #     ConnectionClosed when role=SERVER and state=SEND_RESPONSE
                #
                # Which is accurate, but not very informative from an end-user
                # perspective. Instead we handle this case distinctly and raise
                # a RemoteProtocolError with a clearer message.
                if data == b"" and self._h11_state.their_state == h11.SEND_RESPONSE:
                    msg = "Server disconnected without sending a response."
                    raise RemoteProtocolError(msg)

                self._h11_state.receive_data(data)
            else:
                # mypy fails to narrow the type in the above if statement above
                return cast(Union[h11.Event, Type[h11.PAUSED]], event)

    async def _response_closed(self) -> None:
        """
        Finish the current request/response cycle.

        If both sides of the h11 exchange are DONE the connection becomes
        IDLE, h11 is reset for the next cycle and the keep-alive deadline is
        set (when configured). Otherwise the connection is closed.
        """
        async with self._state_lock:
            if (
                self._h11_state.our_state is h11.DONE
                and self._h11_state.their_state is h11.DONE
            ):
                self._state = HTTPConnectionState.IDLE
                self._h11_state.start_next_cycle()
                if self._keepalive_expiry is not None:
                    now = time.monotonic()
                    self._expire_at = now + self._keepalive_expiry
            else:
                await self.aclose()

    # Once the connection is no longer required...

    async def aclose(self) -> None:
        """Mark the connection CLOSED and close the network stream."""
        # Note that this method unilaterally closes the connection, and does
        # not have any kind of locking in place around it.
        self._state = HTTPConnectionState.CLOSED
        await self._network_stream.aclose()

    # The AsyncConnectionInterface methods provide information about the state of
    # the connection, allowing for a connection pooling implementation to
    # determine when to reuse and when to close the connection...

    def can_handle_request(self, origin: Origin) -> bool:
        """Return True if `origin` equals the origin of this connection."""
        return origin == self._origin

    def is_available(self) -> bool:
        """Return True if the connection is IDLE."""
        # Note that HTTP/1.1 connections in the "NEW" state are not treated as
        # being "available". The control flow which created the connection will
        # be able to send an outgoing request, but the connection will not be
        # acquired from the connection pool for any other request.
        return self._state == HTTPConnectionState.IDLE

    def has_expired(self) -> bool:
        """
        Return True if the keep-alive deadline has passed, or if the
        connection is idle while its socket reports as readable.
        """
        now = time.monotonic()
        keepalive_expired = self._expire_at is not None and now > self._expire_at

        # If the HTTP connection is idle but the socket is readable, then the
        # only valid state is that the socket is about to return b"", indicating
        # a server-initiated disconnect.
        #
        # The extra-info lookup is not guaranteed here to yield a bool, so it
        # is coerced to keep the declared `-> bool` return type honest.
        server_disconnected = self._state == HTTPConnectionState.IDLE and bool(
            self._network_stream.get_extra_info("is_readable")
        )

        return keepalive_expired or server_disconnected

    def is_idle(self) -> bool:
        """Return True if the connection is in the IDLE state."""
        return self._state == HTTPConnectionState.IDLE

    def is_closed(self) -> bool:
        """Return True if the connection is in the CLOSED state."""
        return self._state == HTTPConnectionState.CLOSED

    def info(self) -> str:
        """Return a one-line summary: origin, protocol, state, request count."""
        origin = str(self._origin)
        return (
            f"{origin!r}, HTTP/1.1, {self._state.name}, "
            f"Request Count: {self._request_count}"
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        origin = str(self._origin)
        return (
            f"<{class_name} [{origin!r}, {self._state.name}, "
            f"Request Count: {self._request_count}]>"
        )

    # These context managers are not used in the standard flow, but are
    # useful for testing or working with connection instances directly.

    async def __aenter__(self) -> "AsyncHTTP11Connection":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.aclose()
class HTTP11ConnectionByteStream:
    """
    Async-iterable response body bound to the connection that produced it.

    Iterating pulls body chunks from the connection. Closing the stream
    notifies the connection that the response has been closed, at most once.
    """

    def __init__(self, connection: AsyncHTTP11Connection, request: Request) -> None:
        self._connection = connection
        self._request = request
        # Guards against notifying the connection more than once.
        self._closed = False

    async def __aiter__(self) -> AsyncIterator[bytes]:
        trace_kwargs = {"request": self._request}
        try:
            async with Trace(
                "http11.receive_response_body", self._request, trace_kwargs
            ):
                body = self._connection._receive_response_body(**trace_kwargs)
                async for part in body:
                    yield part
        except BaseException as exc:
            # A failure part-way through streaming must close the response
            # (and possibly the connection) before the error propagates.
            await self.aclose()
            raise exc

    async def aclose(self) -> None:
        """Close the response; repeated calls after the first do nothing."""
        if self._closed:
            return

        self._closed = True
        async with Trace("http11.response_closed", self._request):
            await self._connection._response_closed()