Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/util/ssl_.py: 30%
174 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1from __future__ import annotations
3import hmac
4import os
5import socket
6import sys
7import typing
8import warnings
9from binascii import unhexlify
10from hashlib import md5, sha1, sha256
12from ..exceptions import ProxySchemeUnsupported, SSLError
13from .url import _BRACELESS_IPV6_ADDRZ_RE, _IPV4_RE
15SSLContext = None
16SSLTransport = None
17HAS_NEVER_CHECK_COMMON_NAME = False
18IS_PYOPENSSL = False
19IS_SECURETRANSPORT = False
20ALPN_PROTOCOLS = ["http/1.1"]
22_TYPE_VERSION_INFO = typing.Tuple[int, int, int, str, int]
24# Maps the length of a digest to a possible hash function producing this digest
25HASHFUNC_MAP = {32: md5, 40: sha1, 64: sha256}
28def _is_bpo_43522_fixed(
29 implementation_name: str, version_info: _TYPE_VERSION_INFO
30) -> bool:
31 """Return True for CPython 3.8.9+, 3.9.3+ or 3.10+ where setting
32 SSLContext.hostname_checks_common_name to False works.
34 PyPy 7.3.7 doesn't work as it doesn't ship with OpenSSL 1.1.1l+
35 so we're waiting for a version of PyPy that works before
36 allowing this function to return 'True'.
38 Outside of CPython and PyPy we don't know which implementations work
39 or not so we conservatively use our hostname matching as we know that works
40 on all implementations.
42 https://github.com/urllib3/urllib3/issues/2192#issuecomment-821832963
43 https://foss.heptapod.net/pypy/pypy/-/issues/3539#
44 """
45 if implementation_name != "cpython":
46 return False
48 major_minor = version_info[:2]
49 micro = version_info[2]
50 return (
51 (major_minor == (3, 8) and micro >= 9)
52 or (major_minor == (3, 9) and micro >= 3)
53 or major_minor >= (3, 10)
54 )
57def _is_has_never_check_common_name_reliable(
58 openssl_version_number: int,
59 implementation_name: str,
60 version_info: _TYPE_VERSION_INFO,
61) -> bool:
62 # Before fixing OpenSSL issue #14579, the SSL_new() API was not copying hostflags
63 # like X509_CHECK_FLAG_NEVER_CHECK_SUBJECT, which tripped up CPython.
64 # https://github.com/openssl/openssl/issues/14579
65 # This was released in OpenSSL 1.1.1l+ (>=0x101010cf)
66 is_openssl_issue_14579_fixed = openssl_version_number >= 0x101010CF
68 return is_openssl_issue_14579_fixed or _is_bpo_43522_fixed(
69 implementation_name, version_info
70 )
73if typing.TYPE_CHECKING:
74 from ssl import VerifyMode
76 from typing_extensions import Literal, TypedDict
78 from .ssltransport import SSLTransport as SSLTransportType
80 class _TYPE_PEER_CERT_RET_DICT(TypedDict, total=False):
81 subjectAltName: tuple[tuple[str, str], ...]
82 subject: tuple[tuple[tuple[str, str], ...], ...]
83 serialNumber: str
86# Mapping from 'ssl.PROTOCOL_TLSX' to 'TLSVersion.X'
87_SSL_VERSION_TO_TLS_VERSION: dict[int, int] = {}
89try: # Do we have ssl at all?
90 import ssl
91 from ssl import ( # type: ignore[assignment]
92 CERT_REQUIRED,
93 HAS_NEVER_CHECK_COMMON_NAME,
94 OP_NO_COMPRESSION,
95 OP_NO_TICKET,
96 OPENSSL_VERSION_NUMBER,
97 PROTOCOL_TLS,
98 PROTOCOL_TLS_CLIENT,
99 OP_NO_SSLv2,
100 OP_NO_SSLv3,
101 SSLContext,
102 TLSVersion,
103 )
105 PROTOCOL_SSLv23 = PROTOCOL_TLS
107 # Setting SSLContext.hostname_checks_common_name = False didn't work before CPython
108 # 3.8.9, 3.9.3, and 3.10 (but OK on PyPy) or OpenSSL 1.1.1l+
109 if HAS_NEVER_CHECK_COMMON_NAME and not _is_has_never_check_common_name_reliable(
110 OPENSSL_VERSION_NUMBER,
111 sys.implementation.name,
112 sys.version_info,
113 ):
114 HAS_NEVER_CHECK_COMMON_NAME = False
116 # Need to be careful here in case old TLS versions get
117 # removed in future 'ssl' module implementations.
118 for attr in ("TLSv1", "TLSv1_1", "TLSv1_2"):
119 try:
120 _SSL_VERSION_TO_TLS_VERSION[getattr(ssl, f"PROTOCOL_{attr}")] = getattr(
121 TLSVersion, attr
122 )
123 except AttributeError: # Defensive:
124 continue
126 from .ssltransport import SSLTransport # type: ignore[assignment]
127except ImportError:
128 OP_NO_COMPRESSION = 0x20000 # type: ignore[assignment]
129 OP_NO_TICKET = 0x4000 # type: ignore[assignment]
130 OP_NO_SSLv2 = 0x1000000 # type: ignore[assignment]
131 OP_NO_SSLv3 = 0x2000000 # type: ignore[assignment]
132 PROTOCOL_SSLv23 = PROTOCOL_TLS = 2 # type: ignore[assignment]
133 PROTOCOL_TLS_CLIENT = 16 # type: ignore[assignment]
136_TYPE_PEER_CERT_RET = typing.Union["_TYPE_PEER_CERT_RET_DICT", bytes, None]
139def assert_fingerprint(cert: bytes | None, fingerprint: str) -> None:
140 """
141 Checks if given fingerprint matches the supplied certificate.
143 :param cert:
144 Certificate as bytes object.
145 :param fingerprint:
146 Fingerprint as string of hexdigits, can be interspersed by colons.
147 """
149 if cert is None:
150 raise SSLError("No certificate for the peer.")
152 fingerprint = fingerprint.replace(":", "").lower()
153 digest_length = len(fingerprint)
154 hashfunc = HASHFUNC_MAP.get(digest_length)
155 if not hashfunc:
156 raise SSLError(f"Fingerprint of invalid length: {fingerprint}")
158 # We need encode() here for py32; works on py2 and p33.
159 fingerprint_bytes = unhexlify(fingerprint.encode())
161 cert_digest = hashfunc(cert).digest()
163 if not hmac.compare_digest(cert_digest, fingerprint_bytes):
164 raise SSLError(
165 f'Fingerprints did not match. Expected "{fingerprint}", got "{cert_digest.hex()}"'
166 )
169def resolve_cert_reqs(candidate: None | int | str) -> VerifyMode:
170 """
171 Resolves the argument to a numeric constant, which can be passed to
172 the wrap_socket function/method from the ssl module.
173 Defaults to :data:`ssl.CERT_REQUIRED`.
174 If given a string it is assumed to be the name of the constant in the
175 :mod:`ssl` module or its abbreviation.
176 (So you can specify `REQUIRED` instead of `CERT_REQUIRED`.
177 If it's neither `None` nor a string we assume it is already the numeric
178 constant which can directly be passed to wrap_socket.
179 """
180 if candidate is None:
181 return CERT_REQUIRED
183 if isinstance(candidate, str):
184 res = getattr(ssl, candidate, None)
185 if res is None:
186 res = getattr(ssl, "CERT_" + candidate)
187 return res # type: ignore[no-any-return]
189 return candidate # type: ignore[return-value]
192def resolve_ssl_version(candidate: None | int | str) -> int:
193 """
194 like resolve_cert_reqs
195 """
196 if candidate is None:
197 return PROTOCOL_TLS
199 if isinstance(candidate, str):
200 res = getattr(ssl, candidate, None)
201 if res is None:
202 res = getattr(ssl, "PROTOCOL_" + candidate)
203 return typing.cast(int, res)
205 return candidate
208def create_urllib3_context(
209 ssl_version: int | None = None,
210 cert_reqs: int | None = None,
211 options: int | None = None,
212 ciphers: str | None = None,
213 ssl_minimum_version: int | None = None,
214 ssl_maximum_version: int | None = None,
215) -> ssl.SSLContext:
216 """Creates and configures an :class:`ssl.SSLContext` instance for use with urllib3.
218 :param ssl_version:
219 The desired protocol version to use. This will default to
220 PROTOCOL_SSLv23 which will negotiate the highest protocol that both
221 the server and your installation of OpenSSL support.
223 This parameter is deprecated instead use 'ssl_minimum_version'.
224 :param ssl_minimum_version:
225 The minimum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value.
226 :param ssl_maximum_version:
227 The maximum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value.
228 Not recommended to set to anything other than 'ssl.TLSVersion.MAXIMUM_SUPPORTED' which is the
229 default value.
230 :param cert_reqs:
231 Whether to require the certificate verification. This defaults to
232 ``ssl.CERT_REQUIRED``.
233 :param options:
234 Specific OpenSSL options. These default to ``ssl.OP_NO_SSLv2``,
235 ``ssl.OP_NO_SSLv3``, ``ssl.OP_NO_COMPRESSION``, and ``ssl.OP_NO_TICKET``.
236 :param ciphers:
237 Which cipher suites to allow the server to select. Defaults to either system configured
238 ciphers if OpenSSL 1.1.1+, otherwise uses a secure default set of ciphers.
239 :returns:
240 Constructed SSLContext object with specified options
241 :rtype: SSLContext
242 """
243 if SSLContext is None:
244 raise TypeError("Can't create an SSLContext object without an ssl module")
246 # This means 'ssl_version' was specified as an exact value.
247 if ssl_version not in (None, PROTOCOL_TLS, PROTOCOL_TLS_CLIENT):
248 # Disallow setting 'ssl_version' and 'ssl_minimum|maximum_version'
249 # to avoid conflicts.
250 if ssl_minimum_version is not None or ssl_maximum_version is not None:
251 raise ValueError(
252 "Can't specify both 'ssl_version' and either "
253 "'ssl_minimum_version' or 'ssl_maximum_version'"
254 )
256 # 'ssl_version' is deprecated and will be removed in the future.
257 else:
258 # Use 'ssl_minimum_version' and 'ssl_maximum_version' instead.
259 ssl_minimum_version = _SSL_VERSION_TO_TLS_VERSION.get(
260 ssl_version, TLSVersion.MINIMUM_SUPPORTED
261 )
262 ssl_maximum_version = _SSL_VERSION_TO_TLS_VERSION.get(
263 ssl_version, TLSVersion.MAXIMUM_SUPPORTED
264 )
266 # This warning message is pushing users to use 'ssl_minimum_version'
267 # instead of both min/max. Best practice is to only set the minimum version and
268 # keep the maximum version to be it's default value: 'TLSVersion.MAXIMUM_SUPPORTED'
269 warnings.warn(
270 "'ssl_version' option is deprecated and will be "
271 "removed in urllib3 v2.1.0. Instead use 'ssl_minimum_version'",
272 category=DeprecationWarning,
273 stacklevel=2,
274 )
276 # PROTOCOL_TLS is deprecated in Python 3.10 so we always use PROTOCOL_TLS_CLIENT
277 context = SSLContext(PROTOCOL_TLS_CLIENT)
279 if ssl_minimum_version is not None:
280 context.minimum_version = ssl_minimum_version
281 else: # Python <3.10 defaults to 'MINIMUM_SUPPORTED' so explicitly set TLSv1.2 here
282 context.minimum_version = TLSVersion.TLSv1_2
284 if ssl_maximum_version is not None:
285 context.maximum_version = ssl_maximum_version
287 # Unless we're given ciphers defer to either system ciphers in
288 # the case of OpenSSL 1.1.1+ or use our own secure default ciphers.
289 if ciphers:
290 context.set_ciphers(ciphers)
292 # Setting the default here, as we may have no ssl module on import
293 cert_reqs = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs
295 if options is None:
296 options = 0
297 # SSLv2 is easily broken and is considered harmful and dangerous
298 options |= OP_NO_SSLv2
299 # SSLv3 has several problems and is now dangerous
300 options |= OP_NO_SSLv3
301 # Disable compression to prevent CRIME attacks for OpenSSL 1.0+
302 # (issue #309)
303 options |= OP_NO_COMPRESSION
304 # TLSv1.2 only. Unless set explicitly, do not request tickets.
305 # This may save some bandwidth on wire, and although the ticket is encrypted,
306 # there is a risk associated with it being on wire,
307 # if the server is not rotating its ticketing keys properly.
308 options |= OP_NO_TICKET
310 context.options |= options
312 # Enable post-handshake authentication for TLS 1.3, see GH #1634. PHA is
313 # necessary for conditional client cert authentication with TLS 1.3.
314 # The attribute is None for OpenSSL <= 1.1.0 or does not exist in older
315 # versions of Python. We only enable on Python 3.7.4+ or if certificate
316 # verification is enabled to work around Python issue #37428
317 # See: https://bugs.python.org/issue37428
318 if (cert_reqs == ssl.CERT_REQUIRED or sys.version_info >= (3, 7, 4)) and getattr(
319 context, "post_handshake_auth", None
320 ) is not None:
321 context.post_handshake_auth = True
323 # The order of the below lines setting verify_mode and check_hostname
324 # matter due to safe-guards SSLContext has to prevent an SSLContext with
325 # check_hostname=True, verify_mode=NONE/OPTIONAL.
326 # We always set 'check_hostname=False' for pyOpenSSL so we rely on our own
327 # 'ssl.match_hostname()' implementation.
328 if cert_reqs == ssl.CERT_REQUIRED and not IS_PYOPENSSL:
329 context.verify_mode = cert_reqs
330 context.check_hostname = True
331 else:
332 context.check_hostname = False
333 context.verify_mode = cert_reqs
335 try:
336 context.hostname_checks_common_name = False
337 except AttributeError:
338 pass
340 # Enable logging of TLS session keys via defacto standard environment variable
341 # 'SSLKEYLOGFILE', if the feature is available (Python 3.8+). Skip empty values.
342 if hasattr(context, "keylog_filename"):
343 sslkeylogfile = os.environ.get("SSLKEYLOGFILE")
344 if sslkeylogfile:
345 context.keylog_filename = sslkeylogfile
347 return context
350@typing.overload
351def ssl_wrap_socket(
352 sock: socket.socket,
353 keyfile: str | None = ...,
354 certfile: str | None = ...,
355 cert_reqs: int | None = ...,
356 ca_certs: str | None = ...,
357 server_hostname: str | None = ...,
358 ssl_version: int | None = ...,
359 ciphers: str | None = ...,
360 ssl_context: ssl.SSLContext | None = ...,
361 ca_cert_dir: str | None = ...,
362 key_password: str | None = ...,
363 ca_cert_data: None | str | bytes = ...,
364 tls_in_tls: Literal[False] = ...,
365) -> ssl.SSLSocket:
366 ...
369@typing.overload
370def ssl_wrap_socket(
371 sock: socket.socket,
372 keyfile: str | None = ...,
373 certfile: str | None = ...,
374 cert_reqs: int | None = ...,
375 ca_certs: str | None = ...,
376 server_hostname: str | None = ...,
377 ssl_version: int | None = ...,
378 ciphers: str | None = ...,
379 ssl_context: ssl.SSLContext | None = ...,
380 ca_cert_dir: str | None = ...,
381 key_password: str | None = ...,
382 ca_cert_data: None | str | bytes = ...,
383 tls_in_tls: bool = ...,
384) -> ssl.SSLSocket | SSLTransportType:
385 ...
388def ssl_wrap_socket(
389 sock: socket.socket,
390 keyfile: str | None = None,
391 certfile: str | None = None,
392 cert_reqs: int | None = None,
393 ca_certs: str | None = None,
394 server_hostname: str | None = None,
395 ssl_version: int | None = None,
396 ciphers: str | None = None,
397 ssl_context: ssl.SSLContext | None = None,
398 ca_cert_dir: str | None = None,
399 key_password: str | None = None,
400 ca_cert_data: None | str | bytes = None,
401 tls_in_tls: bool = False,
402) -> ssl.SSLSocket | SSLTransportType:
403 """
404 All arguments except for server_hostname, ssl_context, and ca_cert_dir have
405 the same meaning as they do when using :func:`ssl.wrap_socket`.
407 :param server_hostname:
408 When SNI is supported, the expected hostname of the certificate
409 :param ssl_context:
410 A pre-made :class:`SSLContext` object. If none is provided, one will
411 be created using :func:`create_urllib3_context`.
412 :param ciphers:
413 A string of ciphers we wish the client to support.
414 :param ca_cert_dir:
415 A directory containing CA certificates in multiple separate files, as
416 supported by OpenSSL's -CApath flag or the capath argument to
417 SSLContext.load_verify_locations().
418 :param key_password:
419 Optional password if the keyfile is encrypted.
420 :param ca_cert_data:
421 Optional string containing CA certificates in PEM format suitable for
422 passing as the cadata parameter to SSLContext.load_verify_locations()
423 :param tls_in_tls:
424 Use SSLTransport to wrap the existing socket.
425 """
426 context = ssl_context
427 if context is None:
428 # Note: This branch of code and all the variables in it are only used in tests.
429 # We should consider deprecating and removing this code.
430 context = create_urllib3_context(ssl_version, cert_reqs, ciphers=ciphers)
432 if ca_certs or ca_cert_dir or ca_cert_data:
433 try:
434 context.load_verify_locations(ca_certs, ca_cert_dir, ca_cert_data)
435 except OSError as e:
436 raise SSLError(e) from e
438 elif ssl_context is None and hasattr(context, "load_default_certs"):
439 # try to load OS default certs; works well on Windows.
440 context.load_default_certs()
442 # Attempt to detect if we get the goofy behavior of the
443 # keyfile being encrypted and OpenSSL asking for the
444 # passphrase via the terminal and instead error out.
445 if keyfile and key_password is None and _is_key_file_encrypted(keyfile):
446 raise SSLError("Client private key is encrypted, password is required")
448 if certfile:
449 if key_password is None:
450 context.load_cert_chain(certfile, keyfile)
451 else:
452 context.load_cert_chain(certfile, keyfile, key_password)
454 try:
455 context.set_alpn_protocols(ALPN_PROTOCOLS)
456 except NotImplementedError: # Defensive: in CI, we always have set_alpn_protocols
457 pass
459 ssl_sock = _ssl_wrap_socket_impl(sock, context, tls_in_tls, server_hostname)
460 return ssl_sock
463def is_ipaddress(hostname: str | bytes) -> bool:
464 """Detects whether the hostname given is an IPv4 or IPv6 address.
465 Also detects IPv6 addresses with Zone IDs.
467 :param str hostname: Hostname to examine.
468 :return: True if the hostname is an IP address, False otherwise.
469 """
470 if isinstance(hostname, bytes):
471 # IDN A-label bytes are ASCII compatible.
472 hostname = hostname.decode("ascii")
473 return bool(_IPV4_RE.match(hostname) or _BRACELESS_IPV6_ADDRZ_RE.match(hostname))
476def _is_key_file_encrypted(key_file: str) -> bool:
477 """Detects if a key file is encrypted or not."""
478 with open(key_file) as f:
479 for line in f:
480 # Look for Proc-Type: 4,ENCRYPTED
481 if "ENCRYPTED" in line:
482 return True
484 return False
487def _ssl_wrap_socket_impl(
488 sock: socket.socket,
489 ssl_context: ssl.SSLContext,
490 tls_in_tls: bool,
491 server_hostname: str | None = None,
492) -> ssl.SSLSocket | SSLTransportType:
493 if tls_in_tls:
494 if not SSLTransport:
495 # Import error, ssl is not available.
496 raise ProxySchemeUnsupported(
497 "TLS in TLS requires support for the 'ssl' module"
498 )
500 SSLTransport._validate_ssl_context_for_tls_in_tls(ssl_context)
501 return SSLTransport(sock, ssl_context, server_hostname)
503 return ssl_context.wrap_socket(sock, server_hostname=server_hostname)