Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_async/connection.py: 26%
118 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1import itertools
2import logging
3import ssl
4from types import TracebackType
5from typing import Iterable, Iterator, Optional, Type
7from .._exceptions import ConnectError, ConnectionNotAvailable, ConnectTimeout
8from .._models import Origin, Request, Response
9from .._ssl import default_ssl_context
10from .._synchronization import AsyncLock
11from .._trace import Trace
12from ..backends.auto import AutoBackend
13from ..backends.base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream
14from .http11 import AsyncHTTP11Connection
15from .interfaces import AsyncConnectionInterface
# Multiplier for the delay between successive connect retries.
# With 0.5 the delays are: 0s, 0.5s, 1s, 2s, 4s, etc.
RETRIES_BACKOFF_FACTOR = 0.5


# Logger handed to every `Trace` context opened in this module.
logger = logging.getLogger("httpcore.connection")
def exponential_backoff(factor: float) -> Iterator[float]:
    """Yield an endless sequence of retry delays.

    The first delay is always ``0``; every following delay is ``factor``
    multiplied by successive powers of two, i.e.
    ``0, factor, 2 * factor, 4 * factor, ...``.

    Args:
        factor: Base delay that is doubled on each step after the first.

    Yields:
        The next delay value. The generator never terminates on its own.
    """
    # The first attempt after a failure is retried immediately.
    yield 0
    for exponent in itertools.count():
        yield factor * (2**exponent)
class AsyncHTTPConnection(AsyncConnectionInterface):
    """Connection to a single origin that is established on first use.

    No network activity happens in ``__init__``. The first call to
    ``handle_async_request`` opens the network stream (TCP or Unix domain
    socket, optionally upgraded to TLS) and then wraps it in either an
    ``AsyncHTTP2Connection`` or an ``AsyncHTTP11Connection``. After that,
    requests and state queries are delegated to the wrapped connection.
    """

    def __init__(
        self,
        origin: Origin,
        ssl_context: Optional[ssl.SSLContext] = None,
        keepalive_expiry: Optional[float] = None,
        http1: bool = True,
        http2: bool = False,
        retries: int = 0,
        local_address: Optional[str] = None,
        uds: Optional[str] = None,
        network_backend: Optional[AsyncNetworkBackend] = None,
        socket_options: Optional[Iterable[SOCKET_OPTION]] = None,
    ) -> None:
        """Store the connection settings; nothing is connected yet.

        Args:
            origin: The origin this connection serves. Requests for any other
                origin are rejected by ``handle_async_request``.
            ssl_context: Context used for TLS when the origin scheme is
                ``https``. When ``None``, ``default_ssl_context()`` is used.
            keepalive_expiry: Forwarded unchanged to the underlying
                HTTP/1.1 or HTTP/2 connection.
            http1: Whether HTTP/1.1 may be used.
            http2: Whether HTTP/2 may be used. When set, ``"h2"`` is offered
                via ALPN on TLS connections.
            retries: How many additional connect attempts are made after a
                ``ConnectError`` or ``ConnectTimeout``.
            local_address: Forwarded to the backend's ``connect_tcp``.
            uds: When not ``None``, connect to this Unix domain socket path
                instead of opening a TCP connection.
            network_backend: Backend used for opening streams and sleeping.
                Defaults to ``AutoBackend()``.
            socket_options: Forwarded to the backend's connect call.
        """
        self._origin = origin
        self._ssl_context = ssl_context
        self._keepalive_expiry = keepalive_expiry
        self._http1 = http1
        self._http2 = http2
        self._retries = retries
        self._local_address = local_address
        self._uds = uds

        self._network_backend: AsyncNetworkBackend = (
            AutoBackend() if network_backend is None else network_backend
        )
        # Set once the network stream has been established.
        self._connection: Optional[AsyncConnectionInterface] = None
        # Set when establishing the connection raised.
        self._connect_failed: bool = False
        # Guards connection establishment so it runs at most once at a time.
        self._request_lock = AsyncLock()
        self._socket_options = socket_options

    async def handle_async_request(self, request: Request) -> Response:
        """Send ``request``, establishing the connection first if needed.

        Raises:
            RuntimeError: If the request's origin differs from this
                connection's origin.
            ConnectionNotAvailable: If a connection already exists but
                reports that it is not available.
        """
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection to {self._origin}"
            )

        async with self._request_lock:
            if self._connection is None:
                try:
                    stream = await self._connect(request)

                    ssl_object = stream.get_extra_info("ssl_object")
                    http2_negotiated = (
                        ssl_object is not None
                        and ssl_object.selected_alpn_protocol() == "h2"
                    )
                    # Use HTTP/2 when ALPN selected it, or when HTTP/2 is the
                    # only protocol enabled.
                    if http2_negotiated or (self._http2 and not self._http1):
                        from .http2 import AsyncHTTP2Connection

                        self._connection = AsyncHTTP2Connection(
                            origin=self._origin,
                            stream=stream,
                            keepalive_expiry=self._keepalive_expiry,
                        )
                    else:
                        self._connection = AsyncHTTP11Connection(
                            origin=self._origin,
                            stream=stream,
                            keepalive_expiry=self._keepalive_expiry,
                        )
                except BaseException:
                    # Catch BaseException, not just Exception: cancellation
                    # (asyncio.CancelledError) and KeyboardInterrupt do not
                    # derive from Exception. Without this, an interrupted
                    # connect would leave the object reporting "CONNECTING"
                    # forever and never closed/expired.
                    self._connect_failed = True
                    raise
            elif not self._connection.is_available():
                raise ConnectionNotAvailable()

        return await self._connection.handle_async_request(request)

    async def _connect(self, request: Request) -> AsyncNetworkStream:
        """Open the network stream for this origin, retrying on failure.

        Reads the ``"connect"`` entry of the request's ``"timeout"``
        extension and the ``"sni_hostname"`` extension. Connects over TCP,
        or over a Unix domain socket when ``uds`` was given, then upgrades
        the stream to TLS when the origin scheme is ``https``.

        ``ConnectError`` and ``ConnectTimeout`` trigger a retry after a
        backoff delay until the configured number of retries is used up,
        at which point the exception propagates.
        """
        timeouts = request.extensions.get("timeout", {})
        sni_hostname = request.extensions.get("sni_hostname", None)
        timeout = timeouts.get("connect", None)

        retries_left = self._retries
        delays = exponential_backoff(factor=RETRIES_BACKOFF_FACTOR)

        while True:
            try:
                if self._uds is None:
                    kwargs = {
                        "host": self._origin.host.decode("ascii"),
                        "port": self._origin.port,
                        "local_address": self._local_address,
                        "timeout": timeout,
                        "socket_options": self._socket_options,
                    }
                    async with Trace("connect_tcp", logger, request, kwargs) as trace:
                        stream = await self._network_backend.connect_tcp(**kwargs)
                        trace.return_value = stream
                else:
                    kwargs = {
                        "path": self._uds,
                        "timeout": timeout,
                        "socket_options": self._socket_options,
                    }
                    async with Trace(
                        "connect_unix_socket", logger, request, kwargs
                    ) as trace:
                        stream = await self._network_backend.connect_unix_socket(
                            **kwargs
                        )
                        trace.return_value = stream

                if self._origin.scheme == b"https":
                    ssl_context = (
                        default_ssl_context()
                        if self._ssl_context is None
                        else self._ssl_context
                    )
                    alpn_protocols = ["http/1.1", "h2"] if self._http2 else ["http/1.1"]
                    # NOTE: this sets ALPN protocols on the context object
                    # itself, including a caller-supplied `ssl_context`.
                    ssl_context.set_alpn_protocols(alpn_protocols)

                    kwargs = {
                        "ssl_context": ssl_context,
                        # An explicit "sni_hostname" extension takes
                        # precedence over the origin host.
                        "server_hostname": sni_hostname
                        or self._origin.host.decode("ascii"),
                        "timeout": timeout,
                    }
                    async with Trace("start_tls", logger, request, kwargs) as trace:
                        stream = await stream.start_tls(**kwargs)
                        trace.return_value = stream
                return stream
            except (ConnectError, ConnectTimeout):
                if retries_left <= 0:
                    raise
                retries_left -= 1
                delay = next(delays)
                # `kwargs` here is whichever argument dict was built last
                # before the failure (connect or start_tls).
                async with Trace("retry", logger, request, kwargs) as trace:
                    await self._network_backend.sleep(delay)

    def can_handle_request(self, origin: Origin) -> bool:
        """Return whether ``origin`` equals this connection's origin."""
        return origin == self._origin

    async def aclose(self) -> None:
        """Close the underlying connection, if one was established."""
        if self._connection is not None:
            async with Trace("close", logger, None, {}):
                await self._connection.aclose()

    def is_available(self) -> bool:
        """Return whether this connection can accept another request."""
        if self._connection is None:
            # If HTTP/2 support is enabled, and the resulting connection could
            # end up as HTTP/2 then we should indicate the connection as being
            # available to service multiple requests.
            return (
                self._http2
                and (self._origin.scheme == b"https" or not self._http1)
                and not self._connect_failed
            )
        return self._connection.is_available()

    def has_expired(self) -> bool:
        """Return the underlying connection's expiry state.

        Before a connection exists, this is True only if connecting failed.
        """
        if self._connection is None:
            return self._connect_failed
        return self._connection.has_expired()

    def is_idle(self) -> bool:
        """Return the underlying connection's idle state.

        Before a connection exists, this is True only if connecting failed.
        """
        if self._connection is None:
            return self._connect_failed
        return self._connection.is_idle()

    def is_closed(self) -> bool:
        """Return the underlying connection's closed state.

        Before a connection exists, this is True only if connecting failed.
        """
        if self._connection is None:
            return self._connect_failed
        return self._connection.is_closed()

    def info(self) -> str:
        """Return a short status string.

        Before a connection exists this is ``"CONNECTION FAILED"`` or
        ``"CONNECTING"``; afterwards it is the underlying connection's info.
        """
        if self._connection is None:
            return "CONNECTION FAILED" if self._connect_failed else "CONNECTING"
        return self._connection.info()

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} [{self.info()}]>"

    # These context managers are not used in the standard flow, but are
    # useful for testing or working with connection instances directly.

    async def __aenter__(self) -> "AsyncHTTPConnection":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.aclose()