Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_config.py: 35%
150 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import os
2import ssl
3import sys
4import typing
5from pathlib import Path
7import certifi
9from ._compat import set_minimum_tls_version_1_2
10from ._models import Headers
11from ._types import CertTypes, HeaderTypes, TimeoutTypes, URLTypes, VerifyTypes
12from ._urls import URL
13from ._utils import get_ca_bundle_from_env, get_logger
# OpenSSL cipher-list string handed to `SSLContext.set_ciphers()`.
# Entries are joined with ":"; the trailing "!"-prefixed entries exclude
# the named algorithms (anonymous, null-encryption, MD5 and DSS suites).
DEFAULT_CIPHERS = ":".join(
    [
        "ECDHE+AESGCM",
        "ECDHE+CHACHA20",
        "DHE+AESGCM",
        "DHE+CHACHA20",
        "ECDH+AESGCM",
        "DH+AESGCM",
        "ECDH+AES",
        "DH+AES",
        "RSA+AESGCM",
        "RSA+AES",
        "!aNULL",
        "!eNULL",
        "!MD5",
        "!DSS",
    ]
)
# Module-level logger, keyed on this module's name.
logger = get_logger(__name__)
class UnsetType:
    """Type of the `UNSET` sentinel, used to tell "not passed" apart from `None`."""

    pass  # pragma: no cover


# Sentinel default for parameters where `None` is itself a meaningful value.
UNSET = UnsetType()
def create_ssl_context(
    cert: typing.Optional[CertTypes] = None,
    verify: VerifyTypes = True,
    trust_env: bool = True,
    http2: bool = False,
) -> ssl.SSLContext:
    """
    Build an `ssl.SSLContext` from the given settings.

    Thin convenience wrapper: constructs an `SSLConfig` with the same four
    arguments and returns its `ssl_context` attribute.
    """
    return SSLConfig(
        cert=cert, verify=verify, trust_env=trust_env, http2=http2
    ).ssl_context
class SSLConfig:
    """
    SSL Configuration.

    Stores the `cert`, `verify`, `trust_env` and `http2` settings and eagerly
    builds the matching `ssl.SSLContext`, exposed as `self.ssl_context`.
    """

    # CA bundle used when `verify` is a plain boolean.
    DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())

    def __init__(
        self,
        *,
        cert: typing.Optional[CertTypes] = None,
        verify: VerifyTypes = True,
        trust_env: bool = True,
        http2: bool = False,
    ):
        self.cert = cert
        self.verify = verify
        self.trust_env = trust_env
        self.http2 = http2
        # Must come last: building the context reads the attributes above.
        self.ssl_context = self.load_ssl_context()

    def load_ssl_context(self) -> ssl.SSLContext:
        """
        Return the SSL context for the current settings.

        Dispatches on the truthiness of `self.verify`: a truthy value yields a
        verifying context, a falsy one an unverified context.
        """
        logger.trace(
            f"load_ssl_context "
            f"verify={self.verify!r} "
            f"cert={self.cert!r} "
            f"trust_env={self.trust_env!r} "
            f"http2={self.http2!r}"
        )

        if self.verify:
            return self.load_ssl_context_verify()
        return self.load_ssl_context_no_verify()

    def load_ssl_context_no_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context for unverified connections.
        """
        context = self._create_default_ssl_context()
        # `check_hostname` is switched off before `verify_mode` is relaxed.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        self._load_client_certs(context)
        return context

    def load_ssl_context_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context for verified connections.

        `self.verify` may be `True`, a pre-configured `ssl.SSLContext`, or a
        path to a CA bundle file or directory. Raises `IOError` when a path
        is given that does not exist.
        """
        if self.trust_env and self.verify is True:
            # Only an explicit `True` is overridden by the environment;
            # note that this rewrites `self.verify` in place.
            ca_bundle = get_ca_bundle_from_env()
            if ca_bundle is not None:
                self.verify = ca_bundle

        if isinstance(self.verify, ssl.SSLContext):
            # Allow passing in our own SSLContext object that's pre-configured.
            context = self.verify
            self._load_client_certs(context)
            return context
        elif isinstance(self.verify, bool):
            ca_bundle_path = self.DEFAULT_CA_BUNDLE_PATH
        elif Path(self.verify).exists():
            ca_bundle_path = Path(self.verify)
        else:
            raise IOError(
                "Could not find a suitable TLS CA certificate bundle, "
                "invalid path: {}".format(self.verify)
            )

        context = self._create_default_ssl_context()
        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = True

        # Signal to server support for PHA in TLS 1.3. Raises an
        # AttributeError if only read-only access is implemented.
        if sys.version_info >= (3, 8):  # pragma: no cover
            try:
                context.post_handshake_auth = True
            except AttributeError:  # pragma: no cover
                pass

        # Disable using 'commonName' for SSLContext.check_hostname
        # when the 'subjectAltName' extension isn't available.
        try:
            context.hostname_checks_common_name = False
        except AttributeError:  # pragma: no cover
            pass

        # A file is loaded as `cafile`, a directory as `capath`.
        if ca_bundle_path.is_file():
            logger.trace(f"load_verify_locations cafile={ca_bundle_path!s}")
            context.load_verify_locations(cafile=str(ca_bundle_path))
        elif ca_bundle_path.is_dir():
            logger.trace(f"load_verify_locations capath={ca_bundle_path!s}")
            context.load_verify_locations(capath=str(ca_bundle_path))

        self._load_client_certs(context)

        return context

    def _create_default_ssl_context(self) -> ssl.SSLContext:
        """
        Creates the default SSLContext object that's used for both verified
        and unverified connections.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        set_minimum_tls_version_1_2(context)
        context.options |= ssl.OP_NO_COMPRESSION
        context.set_ciphers(DEFAULT_CIPHERS)

        if ssl.HAS_ALPN:
            # "h2" is only advertised when HTTP/2 was requested.
            alpn_idents = ["http/1.1", "h2"] if self.http2 else ["http/1.1"]
            context.set_alpn_protocols(alpn_idents)

        if sys.version_info >= (3, 8):  # pragma: no cover
            # SSLKEYLOGFILE is honoured only when `trust_env` is enabled.
            keylogfile = os.environ.get("SSLKEYLOGFILE")
            if keylogfile and self.trust_env:
                context.keylog_filename = keylogfile

        return context

    def _load_client_certs(self, ssl_context: ssl.SSLContext) -> None:
        """
        Loads client certificates into our SSLContext object

        `self.cert` may be a certificate file path, a `(certfile, keyfile)`
        pair, or a `(certfile, keyfile, password)` triple. `None` and any
        other shape load nothing.
        """
        if self.cert is not None:
            if isinstance(self.cert, str):
                ssl_context.load_cert_chain(certfile=self.cert)
            elif isinstance(self.cert, tuple) and len(self.cert) == 2:
                ssl_context.load_cert_chain(certfile=self.cert[0], keyfile=self.cert[1])
            elif isinstance(self.cert, tuple) and len(self.cert) == 3:
                ssl_context.load_cert_chain(
                    certfile=self.cert[0],
                    keyfile=self.cert[1],
                    password=self.cert[2],  # type: ignore
                )
class Timeout:
    """
    Timeout configuration.

    Holds four independent timeouts in seconds (`connect`, `read`, `write`
    and `pool`); a value of `None` means no timeout for that operation.

    **Usage**:

    Timeout(None)               # No timeouts.
    Timeout(5.0)                # 5s timeout on all operations.
    Timeout(None, connect=5.0)  # 5s timeout on connect, no other timeouts.
    Timeout(5.0, connect=10.0)  # 10s timeout on connect. 5s timeout elsewhere.
    Timeout(5.0, pool=None)     # No timeout on acquiring connection from pool.
                                # 5s timeout elsewhere.
    """

    def __init__(
        self,
        timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
        *,
        connect: typing.Union[None, float, UnsetType] = UNSET,
        read: typing.Union[None, float, UnsetType] = UNSET,
        write: typing.Union[None, float, UnsetType] = UNSET,
        pool: typing.Union[None, float, UnsetType] = UNSET,
    ):
        """
        Resolve the four timeouts from `timeout` and the keyword overrides.

        Raises `ValueError` when neither a default `timeout` nor all four
        keyword arguments are supplied.
        """
        if isinstance(timeout, Timeout):
            # Passed as a single explicit Timeout.
            assert connect is UNSET
            assert read is UNSET
            assert write is UNSET
            assert pool is UNSET
            self.connect = timeout.connect  # type: typing.Optional[float]
            self.read = timeout.read  # type: typing.Optional[float]
            self.write = timeout.write  # type: typing.Optional[float]
            self.pool = timeout.pool  # type: typing.Optional[float]
        elif isinstance(timeout, tuple):
            # Passed as a tuple.
            # Order is (connect, read[, write[, pool]]); missing trailing
            # entries default to None. Keyword overrides are not consulted.
            self.connect = timeout[0]
            self.read = timeout[1]
            self.write = None if len(timeout) < 3 else timeout[2]
            self.pool = None if len(timeout) < 4 else timeout[3]
        elif not (
            isinstance(connect, UnsetType)
            or isinstance(read, UnsetType)
            or isinstance(write, UnsetType)
            or isinstance(pool, UnsetType)
        ):
            # All four were given explicitly, so `timeout` is not consulted.
            self.connect = connect
            self.read = read
            self.write = write
            self.pool = pool
        else:
            if isinstance(timeout, UnsetType):
                raise ValueError(
                    "httpx.Timeout must either include a default, or set all "
                    "four parameters explicitly."
                )
            # `timeout` is the fallback for every value left unset.
            self.connect = timeout if isinstance(connect, UnsetType) else connect
            self.read = timeout if isinstance(read, UnsetType) else read
            self.write = timeout if isinstance(write, UnsetType) else write
            self.pool = timeout if isinstance(pool, UnsetType) else pool

    def as_dict(self) -> typing.Dict[str, typing.Optional[float]]:
        """Return the four timeouts as a dict keyed by operation name."""
        return {
            "connect": self.connect,
            "read": self.read,
            "write": self.write,
            "pool": self.pool,
        }

    def __eq__(self, other: typing.Any) -> bool:
        """Equal when `other` is the same class and all four timeouts match."""
        return (
            isinstance(other, self.__class__)
            and self.connect == other.connect
            and self.read == other.read
            and self.write == other.write
            and self.pool == other.pool
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        # Collapse to the short form when all four values are identical.
        if len({self.connect, self.read, self.write, self.pool}) == 1:
            return f"{class_name}(timeout={self.connect})"
        return (
            f"{class_name}(connect={self.connect}, "
            f"read={self.read}, write={self.write}, pool={self.pool})"
        )
class Limits:
    """
    Configuration for limits to various client behaviors.

    **Parameters:**

    * **max_connections** - The maximum number of concurrent connections that may be
            established.
    * **max_keepalive_connections** - Allow the connection pool to maintain
            keep-alive connections below this point. Should be less than or equal
            to `max_connections`.
    * **keepalive_expiry** - Time limit on idle keep-alive connections in seconds.
            Defaults to `5.0`.
    """

    def __init__(
        self,
        *,
        max_connections: typing.Optional[int] = None,
        max_keepalive_connections: typing.Optional[int] = None,
        keepalive_expiry: typing.Optional[float] = 5.0,
    ):
        # Values are stored as given; no validation is performed here.
        self.max_connections = max_connections
        self.max_keepalive_connections = max_keepalive_connections
        self.keepalive_expiry = keepalive_expiry

    def __eq__(self, other: typing.Any) -> bool:
        """Equal when `other` is the same class and all three limits match."""
        return (
            isinstance(other, self.__class__)
            and self.max_connections == other.max_connections
            and self.max_keepalive_connections == other.max_keepalive_connections
            and self.keepalive_expiry == other.keepalive_expiry
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return (
            f"{class_name}(max_connections={self.max_connections}, "
            f"max_keepalive_connections={self.max_keepalive_connections}, "
            f"keepalive_expiry={self.keepalive_expiry})"
        )
class Proxy:
    """
    Proxy configuration: the proxy URL plus optional credentials and headers.
    """

    def __init__(
        self,
        url: URLTypes,
        *,
        auth: typing.Optional[typing.Tuple[str, str]] = None,
        headers: typing.Optional[HeaderTypes] = None,
    ):
        """
        Normalise `url` and `headers` and store the proxy settings.

        Credentials embedded in `url` replace any `auth` argument and are
        stripped from the stored URL. Raises `ValueError` when the URL scheme
        is not one of "http", "https" or "socks5".
        """
        url = URL(url)
        headers = Headers(headers)

        if url.scheme not in ("http", "https", "socks5"):
            raise ValueError(f"Unknown scheme for proxy URL {url!r}")

        if url.username or url.password:
            # Remove any auth credentials from the URL.
            auth = (url.username, url.password)
            url = url.copy_with(username=None, password=None)

        self.url = url
        self.auth = auth
        self.headers = headers

    @property
    def raw_auth(self) -> typing.Optional[typing.Tuple[bytes, bytes]]:
        # The proxy authentication as raw bytes.
        return (
            None
            if self.auth is None
            else (self.auth[0].encode("utf-8"), self.auth[1].encode("utf-8"))
        )

    def __repr__(self) -> str:
        # The authentication is represented with the password component masked.
        auth = (self.auth[0], "********") if self.auth else None

        # Build a nice concise representation.
        url_str = f"{str(self.url)!r}"
        auth_str = f", auth={auth!r}" if auth else ""
        headers_str = f", headers={dict(self.headers)!r}" if self.headers else ""
        return f"Proxy({url_str}{auth_str}{headers_str})"
# Module-level default configuration values.
DEFAULT_TIMEOUT_CONFIG = Timeout(timeout=5.0)  # 5s for connect, read, write and pool.
DEFAULT_LIMITS = Limits(max_connections=100, max_keepalive_connections=20)
DEFAULT_MAX_REDIRECTS = 20