Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_config.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import logging
4import os
5import ssl
6import typing
7from pathlib import Path
9import certifi
11from ._compat import set_minimum_tls_version_1_2
12from ._models import Headers
13from ._types import CertTypes, HeaderTypes, TimeoutTypes, VerifyTypes
14from ._urls import URL
15from ._utils import get_ca_bundle_from_env
# Names exported by `from <this module> import *`.
__all__ = [
    "Limits",
    "Proxy",
    "Timeout",
    "create_ssl_context",
]
# Colon-separated OpenSSL cipher list handed to `SSLContext.set_ciphers()`.
# Entries are listed in preference order; the "!"-prefixed entries at the end
# exclude any suite matching that keyword.
DEFAULT_CIPHERS = (
    "ECDHE+AESGCM:ECDHE+CHACHA20:"
    "DHE+AESGCM:DHE+CHACHA20:"
    "ECDH+AESGCM:DH+AESGCM:"
    "ECDH+AES:DH+AES:"
    "RSA+AESGCM:RSA+AES:"
    "!aNULL:!eNULL:!MD5:!DSS"
)


# Logger shared by this module; records are emitted under the "httpx" name.
logger = logging.getLogger("httpx")
class UnsetType:
    """
    Type of the `UNSET` sentinel.

    Lets a parameter default be told apart from an explicit `None`.
    """


# Sentinel instance used as the "argument not supplied" default value.
UNSET = UnsetType()
def create_ssl_context(
    cert: CertTypes | None = None,
    verify: VerifyTypes = True,
    trust_env: bool = True,
    http2: bool = False,
) -> ssl.SSLContext:
    """
    Build an `ssl.SSLContext` from the given options.

    All arguments are forwarded unchanged, as keywords, to `SSLConfig`; the
    context that `SSLConfig` constructs is returned.
    """
    config = SSLConfig(cert=cert, verify=verify, trust_env=trust_env, http2=http2)
    return config.ssl_context
class SSLConfig:
    """
    SSL Configuration.

    The `ssl.SSLContext` is built when the instance is created and stored on
    the `ssl_context` attribute.

    **Parameters:**

    * **cert** - Optional client certificate: a path to a certificate file, a
    `(certfile, keyfile)` tuple, or a `(certfile, keyfile, password)` tuple.
    * **verify** - `True` to verify against `DEFAULT_CA_BUNDLE_PATH`, a falsy
    value to disable verification, a path to a CA bundle file or directory,
    or a pre-configured `ssl.SSLContext`.
    * **trust_env** - When true, the CA bundle reported by
    `get_ca_bundle_from_env()` and the `SSLKEYLOGFILE` environment variable
    are honoured.
    * **http2** - When true, `h2` is offered via ALPN alongside `http/1.1`.
    """

    DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())

    def __init__(
        self,
        *,
        cert: CertTypes | None = None,
        verify: VerifyTypes = True,
        trust_env: bool = True,
        http2: bool = False,
    ) -> None:
        self.cert = cert
        self.verify = verify
        self.trust_env = trust_env
        self.http2 = http2
        # Built eagerly, so configuration errors surface at construction time.
        self.ssl_context = self.load_ssl_context()

    def load_ssl_context(self) -> ssl.SSLContext:
        """
        Return the SSL context selected by the truthiness of `self.verify`.
        """
        # Keep the private key password out of the debug log, in the same way
        # that `Proxy.__repr__` masks the proxy password.
        logged_cert = self.cert
        if (
            isinstance(logged_cert, tuple)
            and len(logged_cert) == 3
            and logged_cert[2] is not None
        ):
            logged_cert = (logged_cert[0], logged_cert[1], "********")

        logger.debug(
            "load_ssl_context verify=%r cert=%r trust_env=%r http2=%r",
            self.verify,
            logged_cert,
            self.trust_env,
            self.http2,
        )

        if self.verify:
            return self.load_ssl_context_verify()
        return self.load_ssl_context_no_verify()

    def load_ssl_context_no_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context for unverified connections.
        """
        context = self._create_default_ssl_context()
        # `check_hostname` has to be switched off before `verify_mode` may be
        # set to CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        self._load_client_certs(context)
        return context

    def load_ssl_context_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context for verified connections.

        Raises `IOError` if `self.verify` is a path that does not exist.
        """
        if self.trust_env and self.verify is True:
            # Only the plain `verify=True` default may be overridden by the
            # environment; an explicit path or context always wins.
            ca_bundle = get_ca_bundle_from_env()
            if ca_bundle is not None:
                self.verify = ca_bundle

        if isinstance(self.verify, ssl.SSLContext):
            # Allow passing in our own SSLContext object that's pre-configured.
            context = self.verify
            self._load_client_certs(context)
            return context
        elif isinstance(self.verify, bool):
            ca_bundle_path = self.DEFAULT_CA_BUNDLE_PATH
        elif Path(self.verify).exists():
            ca_bundle_path = Path(self.verify)
        else:
            raise IOError(
                "Could not find a suitable TLS CA certificate bundle, "
                f"invalid path: {self.verify}"
            )

        context = self._create_default_ssl_context()
        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = True

        # Signal to server support for PHA in TLS 1.3. Raises an
        # AttributeError if only read-only access is implemented.
        try:
            context.post_handshake_auth = True
        except AttributeError:  # pragma: no cover
            pass

        # Disable using 'commonName' for SSLContext.check_hostname
        # when the 'subjectAltName' extension isn't available.
        try:
            context.hostname_checks_common_name = False
        except AttributeError:  # pragma: no cover
            pass

        if ca_bundle_path.is_file():
            cafile = str(ca_bundle_path)
            logger.debug("load_verify_locations cafile=%r", cafile)
            context.load_verify_locations(cafile=cafile)
        elif ca_bundle_path.is_dir():
            capath = str(ca_bundle_path)
            logger.debug("load_verify_locations capath=%r", capath)
            context.load_verify_locations(capath=capath)

        self._load_client_certs(context)

        return context

    def _create_default_ssl_context(self) -> ssl.SSLContext:
        """
        Creates the default SSLContext object that's used for both verified
        and unverified connections.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        set_minimum_tls_version_1_2(context)
        context.options |= ssl.OP_NO_COMPRESSION
        context.set_ciphers(DEFAULT_CIPHERS)

        if ssl.HAS_ALPN:
            alpn_idents = ["http/1.1", "h2"] if self.http2 else ["http/1.1"]
            context.set_alpn_protocols(alpn_idents)

        # Key logging is only enabled when environment settings are trusted.
        keylogfile = os.environ.get("SSLKEYLOGFILE")
        if keylogfile and self.trust_env:
            context.keylog_filename = keylogfile

        return context

    def _load_client_certs(self, ssl_context: ssl.SSLContext) -> None:
        """
        Loads client certificates into our SSLContext object

        `self.cert` may be a single path (`str`, `bytes` or `os.PathLike`), a
        `(certfile, keyfile)` tuple or a `(certfile, keyfile, password)`
        tuple. Any other value is ignored, and a warning is logged so the
        missing client certificate does not go unnoticed.
        """
        cert = self.cert
        if cert is None:
            return

        if isinstance(cert, (str, bytes, os.PathLike)):
            # Path-like and bytes paths are accepted by `load_cert_chain` too;
            # previously they fell through and were silently dropped.
            ssl_context.load_cert_chain(certfile=cert)
        elif isinstance(cert, tuple) and len(cert) == 2:
            ssl_context.load_cert_chain(certfile=cert[0], keyfile=cert[1])
        elif isinstance(cert, tuple) and len(cert) == 3:
            ssl_context.load_cert_chain(
                certfile=cert[0],
                keyfile=cert[1],
                password=cert[2],
            )
        else:
            # Log the type only: the value itself may contain a password.
            logger.warning(
                "Ignoring unsupported client certificate configuration "
                "of type %s",
                type(cert).__name__,
            )
class Timeout:
    """
    Timeout configuration.

    **Usage**:

    Timeout(None)               # No timeouts.
    Timeout(5.0)                # 5s timeout on all operations.
    Timeout(None, connect=5.0)  # 5s timeout on connect, no other timeouts.
    Timeout(5.0, connect=10.0)  # 10s timeout on connect. 5s timeout elsewhere.
    Timeout(5.0, pool=None)     # No timeout on acquiring connection from pool.
                                # 5s timeout elsewhere.

    The first argument may also be an existing `Timeout` (copied as-is) or a
    `(connect, read[, write[, pool]])` tuple; with a tuple, missing trailing
    entries become `None` and the keyword arguments are not consulted.
    """

    def __init__(
        self,
        timeout: TimeoutTypes | UnsetType = UNSET,
        *,
        connect: None | float | UnsetType = UNSET,
        read: None | float | UnsetType = UNSET,
        write: None | float | UnsetType = UNSET,
        pool: None | float | UnsetType = UNSET,
    ) -> None:
        if isinstance(timeout, Timeout):
            # Copying an existing Timeout: per-operation overrides must not
            # be supplied alongside it.
            assert connect is UNSET
            assert read is UNSET
            assert write is UNSET
            assert pool is UNSET
            self.connect: float | None = timeout.connect
            self.read: float | None = timeout.read
            self.write: float | None = timeout.write
            self.pool: float | None = timeout.pool
            return

        if isinstance(timeout, tuple):
            # Positional tuple form; the third and fourth entries are optional.
            size = len(timeout)
            self.connect = timeout[0]
            self.read = timeout[1]
            self.write = timeout[2] if size >= 3 else None
            self.pool = timeout[3] if size >= 4 else None
            return

        explicit = (connect, read, write, pool)
        if not any(isinstance(value, UnsetType) for value in explicit):
            # All four values were given explicitly, so no default is needed.
            self.connect, self.read, self.write, self.pool = explicit
            return

        if isinstance(timeout, UnsetType):
            raise ValueError(
                "httpx.Timeout must either include a default, or set all "
                "four parameters explicitly."
            )

        # `timeout` fills in for whichever values were left unset.
        self.connect = connect if not isinstance(connect, UnsetType) else timeout
        self.read = read if not isinstance(read, UnsetType) else timeout
        self.write = write if not isinstance(write, UnsetType) else timeout
        self.pool = pool if not isinstance(pool, UnsetType) else timeout

    def as_dict(self) -> dict[str, float | None]:
        """Return the four timeout values keyed by operation name."""
        return dict(
            connect=self.connect,
            read=self.read,
            write=self.write,
            pool=self.pool,
        )

    def __eq__(self, other: typing.Any) -> bool:
        """Equal when `other` is an instance of this class with the same values."""
        if not isinstance(other, self.__class__):
            return False
        return (
            self.connect == other.connect
            and self.read == other.read
            and self.write == other.write
            and self.pool == other.pool
        )

    def __repr__(self) -> str:
        """Use the short `timeout=` form when all four values coincide."""
        name = self.__class__.__name__
        distinct = {self.connect, self.read, self.write, self.pool}
        if len(distinct) == 1:
            return f"{name}(timeout={self.connect})"
        details = (
            f"connect={self.connect}, "
            f"read={self.read}, write={self.write}, pool={self.pool}"
        )
        return f"{name}({details})"
283class Limits:
284 """
285 Configuration for limits to various client behaviors.
287 **Parameters:**
289 * **max_connections** - The maximum number of concurrent connections that may be
290 established.
291 * **max_keepalive_connections** - Allow the connection pool to maintain
292 keep-alive connections below this point. Should be less than or equal
293 to `max_connections`.
294 * **keepalive_expiry** - Time limit on idle keep-alive connections in seconds.
295 """
297 def __init__(
298 self,
299 *,
300 max_connections: int | None = None,
301 max_keepalive_connections: int | None = None,
302 keepalive_expiry: float | None = 5.0,
303 ) -> None:
304 self.max_connections = max_connections
305 self.max_keepalive_connections = max_keepalive_connections
306 self.keepalive_expiry = keepalive_expiry
308 def __eq__(self, other: typing.Any) -> bool:
309 return (
310 isinstance(other, self.__class__)
311 and self.max_connections == other.max_connections
312 and self.max_keepalive_connections == other.max_keepalive_connections
313 and self.keepalive_expiry == other.keepalive_expiry
314 )
316 def __repr__(self) -> str:
317 class_name = self.__class__.__name__
318 return (
319 f"{class_name}(max_connections={self.max_connections}, "
320 f"max_keepalive_connections={self.max_keepalive_connections}, "
321 f"keepalive_expiry={self.keepalive_expiry})"
322 )
class Proxy:
    """
    Configuration for a single proxy.

    **Parameters:**

    * **url** - The proxy URL. Its scheme must be `http`, `https` or `socks5`.
    Credentials embedded in the URL replace `auth` and are removed from the
    stored URL.
    * **ssl_context** - Optional `ssl.SSLContext`, stored as given.
    * **auth** - Optional `(username, password)` pair.
    * **headers** - Optional headers, stored as a `Headers` instance.
    """

    def __init__(
        self,
        url: URL | str,
        *,
        ssl_context: ssl.SSLContext | None = None,
        auth: tuple[str, str] | None = None,
        headers: HeaderTypes | None = None,
    ) -> None:
        proxy_url = URL(url)
        proxy_headers = Headers(headers)

        if proxy_url.scheme not in ("http", "https", "socks5"):
            raise ValueError(f"Unknown scheme for proxy URL {proxy_url!r}")

        if proxy_url.username or proxy_url.password:
            # Remove any auth credentials from the URL.
            auth = (proxy_url.username, proxy_url.password)
            proxy_url = proxy_url.copy_with(username=None, password=None)

        self.url = proxy_url
        self.auth = auth
        self.headers = proxy_headers
        self.ssl_context = ssl_context

    @property
    def raw_auth(self) -> tuple[bytes, bytes] | None:
        """The proxy authentication as UTF-8 encoded bytes, or `None`."""
        credentials = self.auth
        if credentials is None:
            return None
        return (credentials[0].encode("utf-8"), credentials[1].encode("utf-8"))

    def __repr__(self) -> str:
        """Concise representation; the password component is always masked."""
        parts = [repr(str(self.url))]
        if self.auth:
            masked_auth = (self.auth[0], "********")
            parts.append(f"auth={masked_auth!r}")
        if self.headers:
            parts.append(f"headers={dict(self.headers)!r}")
        return "Proxy(" + ", ".join(parts) + ")"
# Module-level defaults (presumably consumed by client code elsewhere in the
# package - confirm against callers).
# Five seconds for each of connect, read, write and pool.
DEFAULT_TIMEOUT_CONFIG = Timeout(5.0)
# Up to 100 connections, 20 of them kept alive; `keepalive_expiry` is left at
# the `Limits` default.
DEFAULT_LIMITS = Limits(max_keepalive_connections=20, max_connections=100)
DEFAULT_MAX_REDIRECTS = 20