Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_config.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import logging
4import os
5import ssl
6import typing
7from pathlib import Path
9import certifi
11from ._compat import set_minimum_tls_version_1_2
12from ._models import Headers
13from ._types import CertTypes, HeaderTypes, TimeoutTypes, URLTypes, VerifyTypes
14from ._urls import URL
15from ._utils import get_ca_bundle_from_env
17DEFAULT_CIPHERS = ":".join(
18 [
19 "ECDHE+AESGCM",
20 "ECDHE+CHACHA20",
21 "DHE+AESGCM",
22 "DHE+CHACHA20",
23 "ECDH+AESGCM",
24 "DH+AESGCM",
25 "ECDH+AES",
26 "DH+AES",
27 "RSA+AESGCM",
28 "RSA+AES",
29 "!aNULL",
30 "!eNULL",
31 "!MD5",
32 "!DSS",
33 ]
34)
37logger = logging.getLogger("httpx")
40class UnsetType:
41 pass # pragma: no cover
44UNSET = UnsetType()
47def create_ssl_context(
48 cert: CertTypes | None = None,
49 verify: VerifyTypes = True,
50 trust_env: bool = True,
51 http2: bool = False,
52) -> ssl.SSLContext:
53 return SSLConfig(
54 cert=cert, verify=verify, trust_env=trust_env, http2=http2
55 ).ssl_context
58class SSLConfig:
59 """
60 SSL Configuration.
61 """
63 DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())
65 def __init__(
66 self,
67 *,
68 cert: CertTypes | None = None,
69 verify: VerifyTypes = True,
70 trust_env: bool = True,
71 http2: bool = False,
72 ) -> None:
73 self.cert = cert
74 self.verify = verify
75 self.trust_env = trust_env
76 self.http2 = http2
77 self.ssl_context = self.load_ssl_context()
79 def load_ssl_context(self) -> ssl.SSLContext:
80 logger.debug(
81 "load_ssl_context verify=%r cert=%r trust_env=%r http2=%r",
82 self.verify,
83 self.cert,
84 self.trust_env,
85 self.http2,
86 )
88 if self.verify:
89 return self.load_ssl_context_verify()
90 return self.load_ssl_context_no_verify()
92 def load_ssl_context_no_verify(self) -> ssl.SSLContext:
93 """
94 Return an SSL context for unverified connections.
95 """
96 context = self._create_default_ssl_context()
97 context.check_hostname = False
98 context.verify_mode = ssl.CERT_NONE
99 self._load_client_certs(context)
100 return context
102 def load_ssl_context_verify(self) -> ssl.SSLContext:
103 """
104 Return an SSL context for verified connections.
105 """
106 if self.trust_env and self.verify is True:
107 ca_bundle = get_ca_bundle_from_env()
108 if ca_bundle is not None:
109 self.verify = ca_bundle
111 if isinstance(self.verify, ssl.SSLContext):
112 # Allow passing in our own SSLContext object that's pre-configured.
113 context = self.verify
114 self._load_client_certs(context)
115 return context
116 elif isinstance(self.verify, bool):
117 ca_bundle_path = self.DEFAULT_CA_BUNDLE_PATH
118 elif Path(self.verify).exists():
119 ca_bundle_path = Path(self.verify)
120 else:
121 raise IOError(
122 "Could not find a suitable TLS CA certificate bundle, "
123 "invalid path: {}".format(self.verify)
124 )
126 context = self._create_default_ssl_context()
127 context.verify_mode = ssl.CERT_REQUIRED
128 context.check_hostname = True
130 # Signal to server support for PHA in TLS 1.3. Raises an
131 # AttributeError if only read-only access is implemented.
132 try:
133 context.post_handshake_auth = True
134 except AttributeError: # pragma: no cover
135 pass
137 # Disable using 'commonName' for SSLContext.check_hostname
138 # when the 'subjectAltName' extension isn't available.
139 try:
140 context.hostname_checks_common_name = False
141 except AttributeError: # pragma: no cover
142 pass
144 if ca_bundle_path.is_file():
145 cafile = str(ca_bundle_path)
146 logger.debug("load_verify_locations cafile=%r", cafile)
147 context.load_verify_locations(cafile=cafile)
148 elif ca_bundle_path.is_dir():
149 capath = str(ca_bundle_path)
150 logger.debug("load_verify_locations capath=%r", capath)
151 context.load_verify_locations(capath=capath)
153 self._load_client_certs(context)
155 return context
157 def _create_default_ssl_context(self) -> ssl.SSLContext:
158 """
159 Creates the default SSLContext object that's used for both verified
160 and unverified connections.
161 """
162 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
163 set_minimum_tls_version_1_2(context)
164 context.options |= ssl.OP_NO_COMPRESSION
165 context.set_ciphers(DEFAULT_CIPHERS)
167 if ssl.HAS_ALPN:
168 alpn_idents = ["http/1.1", "h2"] if self.http2 else ["http/1.1"]
169 context.set_alpn_protocols(alpn_idents)
171 keylogfile = os.environ.get("SSLKEYLOGFILE")
172 if keylogfile and self.trust_env:
173 context.keylog_filename = keylogfile
175 return context
177 def _load_client_certs(self, ssl_context: ssl.SSLContext) -> None:
178 """
179 Loads client certificates into our SSLContext object
180 """
181 if self.cert is not None:
182 if isinstance(self.cert, str):
183 ssl_context.load_cert_chain(certfile=self.cert)
184 elif isinstance(self.cert, tuple) and len(self.cert) == 2:
185 ssl_context.load_cert_chain(certfile=self.cert[0], keyfile=self.cert[1])
186 elif isinstance(self.cert, tuple) and len(self.cert) == 3:
187 ssl_context.load_cert_chain(
188 certfile=self.cert[0],
189 keyfile=self.cert[1],
190 password=self.cert[2],
191 )
194class Timeout:
195 """
196 Timeout configuration.
198 **Usage**:
200 Timeout(None) # No timeouts.
201 Timeout(5.0) # 5s timeout on all operations.
202 Timeout(None, connect=5.0) # 5s timeout on connect, no other timeouts.
203 Timeout(5.0, connect=10.0) # 10s timeout on connect. 5s timeout elsewhere.
204 Timeout(5.0, pool=None) # No timeout on acquiring connection from pool.
205 # 5s timeout elsewhere.
206 """
208 def __init__(
209 self,
210 timeout: TimeoutTypes | UnsetType = UNSET,
211 *,
212 connect: None | float | UnsetType = UNSET,
213 read: None | float | UnsetType = UNSET,
214 write: None | float | UnsetType = UNSET,
215 pool: None | float | UnsetType = UNSET,
216 ) -> None:
217 if isinstance(timeout, Timeout):
218 # Passed as a single explicit Timeout.
219 assert connect is UNSET
220 assert read is UNSET
221 assert write is UNSET
222 assert pool is UNSET
223 self.connect = timeout.connect # type: typing.Optional[float]
224 self.read = timeout.read # type: typing.Optional[float]
225 self.write = timeout.write # type: typing.Optional[float]
226 self.pool = timeout.pool # type: typing.Optional[float]
227 elif isinstance(timeout, tuple):
228 # Passed as a tuple.
229 self.connect = timeout[0]
230 self.read = timeout[1]
231 self.write = None if len(timeout) < 3 else timeout[2]
232 self.pool = None if len(timeout) < 4 else timeout[3]
233 elif not (
234 isinstance(connect, UnsetType)
235 or isinstance(read, UnsetType)
236 or isinstance(write, UnsetType)
237 or isinstance(pool, UnsetType)
238 ):
239 self.connect = connect
240 self.read = read
241 self.write = write
242 self.pool = pool
243 else:
244 if isinstance(timeout, UnsetType):
245 raise ValueError(
246 "httpx.Timeout must either include a default, or set all "
247 "four parameters explicitly."
248 )
249 self.connect = timeout if isinstance(connect, UnsetType) else connect
250 self.read = timeout if isinstance(read, UnsetType) else read
251 self.write = timeout if isinstance(write, UnsetType) else write
252 self.pool = timeout if isinstance(pool, UnsetType) else pool
254 def as_dict(self) -> dict[str, float | None]:
255 return {
256 "connect": self.connect,
257 "read": self.read,
258 "write": self.write,
259 "pool": self.pool,
260 }
262 def __eq__(self, other: typing.Any) -> bool:
263 return (
264 isinstance(other, self.__class__)
265 and self.connect == other.connect
266 and self.read == other.read
267 and self.write == other.write
268 and self.pool == other.pool
269 )
271 def __repr__(self) -> str:
272 class_name = self.__class__.__name__
273 if len({self.connect, self.read, self.write, self.pool}) == 1:
274 return f"{class_name}(timeout={self.connect})"
275 return (
276 f"{class_name}(connect={self.connect}, "
277 f"read={self.read}, write={self.write}, pool={self.pool})"
278 )
281class Limits:
282 """
283 Configuration for limits to various client behaviors.
285 **Parameters:**
287 * **max_connections** - The maximum number of concurrent connections that may be
288 established.
289 * **max_keepalive_connections** - Allow the connection pool to maintain
290 keep-alive connections below this point. Should be less than or equal
291 to `max_connections`.
292 * **keepalive_expiry** - Time limit on idle keep-alive connections in seconds.
293 """
295 def __init__(
296 self,
297 *,
298 max_connections: int | None = None,
299 max_keepalive_connections: int | None = None,
300 keepalive_expiry: float | None = 5.0,
301 ) -> None:
302 self.max_connections = max_connections
303 self.max_keepalive_connections = max_keepalive_connections
304 self.keepalive_expiry = keepalive_expiry
306 def __eq__(self, other: typing.Any) -> bool:
307 return (
308 isinstance(other, self.__class__)
309 and self.max_connections == other.max_connections
310 and self.max_keepalive_connections == other.max_keepalive_connections
311 and self.keepalive_expiry == other.keepalive_expiry
312 )
314 def __repr__(self) -> str:
315 class_name = self.__class__.__name__
316 return (
317 f"{class_name}(max_connections={self.max_connections}, "
318 f"max_keepalive_connections={self.max_keepalive_connections}, "
319 f"keepalive_expiry={self.keepalive_expiry})"
320 )
323class Proxy:
324 def __init__(
325 self,
326 url: URLTypes,
327 *,
328 ssl_context: ssl.SSLContext | None = None,
329 auth: tuple[str, str] | None = None,
330 headers: HeaderTypes | None = None,
331 ) -> None:
332 url = URL(url)
333 headers = Headers(headers)
335 if url.scheme not in ("http", "https", "socks5"):
336 raise ValueError(f"Unknown scheme for proxy URL {url!r}")
338 if url.username or url.password:
339 # Remove any auth credentials from the URL.
340 auth = (url.username, url.password)
341 url = url.copy_with(username=None, password=None)
343 self.url = url
344 self.auth = auth
345 self.headers = headers
346 self.ssl_context = ssl_context
348 @property
349 def raw_auth(self) -> tuple[bytes, bytes] | None:
350 # The proxy authentication as raw bytes.
351 return (
352 None
353 if self.auth is None
354 else (self.auth[0].encode("utf-8"), self.auth[1].encode("utf-8"))
355 )
357 def __repr__(self) -> str:
358 # The authentication is represented with the password component masked.
359 auth = (self.auth[0], "********") if self.auth else None
361 # Build a nice concise representation.
362 url_str = f"{str(self.url)!r}"
363 auth_str = f", auth={auth!r}" if auth else ""
364 headers_str = f", headers={dict(self.headers)!r}" if self.headers else ""
365 return f"Proxy({url_str}{auth_str}{headers_str})"
368DEFAULT_TIMEOUT_CONFIG = Timeout(timeout=5.0)
369DEFAULT_LIMITS = Limits(max_connections=100, max_keepalive_connections=20)
370DEFAULT_MAX_REDIRECTS = 20