1from __future__ import annotations
2
3import hashlib
4import hmac
5import os
6import socket
7import sys
8import typing
9import warnings
10from binascii import unhexlify
11
12from ..exceptions import ProxySchemeUnsupported, SSLError
13from .url import _BRACELESS_IPV6_ADDRZ_RE, _IPV4_RE
14
# Defaults used when the 'ssl' module cannot be imported. SSLContext,
# SSLTransport and HAS_NEVER_CHECK_COMMON_NAME are rebound further down in
# this module when the import succeeds.
SSLContext = None
SSLTransport = None
HAS_NEVER_CHECK_COMMON_NAME = False
IS_PYOPENSSL = False
ALPN_PROTOCOLS = ["http/1.1"]

# Shape of 'sys.version_info' / 'sys.pypy_version_info':
# (major, minor, micro, releaselevel, serial).
_TYPE_VERSION_INFO = tuple[int, int, int, str, int]

# Keyed by the number of hex digits in a fingerprint; the value is the hashlib
# constructor producing a digest of that length, or None when this Python
# build does not provide the algorithm.
HASHFUNC_MAP = {
    32: getattr(hashlib, "md5", None),
    40: getattr(hashlib, "sha1", None),
    64: getattr(hashlib, "sha256", None),
}
28
29
def _is_bpo_43522_fixed(
    implementation_name: str,
    version_info: _TYPE_VERSION_INFO,
    pypy_version_info: _TYPE_VERSION_INFO | None,
) -> bool:
    """Tell whether ``SSLContext.hostname_checks_common_name = False`` works.

    That is the case on CPython 3.9.3+ (within the 3.9 series), CPython 3.10+
    and PyPy 7.3.8+. For every other implementation we cannot know, so the
    answer is a conservative ``False`` and callers fall back to urllib3's own
    hostname matching, which works everywhere.

    https://github.com/urllib3/urllib3/issues/2192#issuecomment-821832963
    https://foss.heptapod.net/pypy/pypy/-/issues/3539

    :param implementation_name: Value of ``sys.implementation.name``.
    :param version_info: Python version tuple (``sys.version_info`` layout).
    :param pypy_version_info: PyPy version tuple; only read when
        ``implementation_name`` is ``"pypy"``.
    :return: ``True`` when the interpreter contains the fix.
    """
    if implementation_name == "cpython":
        release = version_info[:2]
        patch_level = version_info[2]
        if release == (3, 9) and patch_level >= 3:
            return True
        return release >= (3, 10)

    if implementation_name == "pypy":
        # https://foss.heptapod.net/pypy/pypy/-/issues/3129
        return pypy_version_info >= (7, 3, 8)  # type: ignore[operator]

    # Defensive: unknown implementation, assume the fix is absent.
    return False
54
55
def _is_has_never_check_common_name_reliable(
    openssl_version: str,
    openssl_version_number: int,
    implementation_name: str,
    version_info: _TYPE_VERSION_INFO,
    pypy_version_info: _TYPE_VERSION_INFO | None,
) -> bool:
    """Decide whether ``ssl.HAS_NEVER_CHECK_COMMON_NAME`` can be trusted.

    :param openssl_version: Version banner of the linked TLS library
        (``ssl.OPENSSL_VERSION``).
    :param openssl_version_number: Numeric version of the linked TLS library
        (``ssl.OPENSSL_VERSION_NUMBER``).
    :param implementation_name: Value of ``sys.implementation.name``.
    :param version_info: Python version tuple.
    :param pypy_version_info: PyPy version tuple, or ``None`` off PyPy.
    :return: ``True`` only for real OpenSSL where either the library or the
        interpreter contains the relevant fix.
    """
    # As of May 2023, all released versions of LibreSSL fail to reject
    # certificates with only common names, see
    # https://github.com/urllib3/urllib3/pull/3024
    linked_against_openssl = openssl_version.startswith("OpenSSL ")

    # Until OpenSSL issue #14579 was fixed, SSL_new() did not copy hostflags
    # such as X509_CHECK_FLAG_NEVER_CHECK_SUBJECT, which tripped up CPython.
    # https://github.com/openssl/openssl/issues/14579
    # The fix shipped in OpenSSL 1.1.1l+ (>=0x101010cf).
    library_has_fix = openssl_version_number >= 0x101010CF

    if not linked_against_openssl:
        return False
    if library_has_fix:
        return True
    # Older OpenSSL: reliable only if the interpreter works around the bug.
    return _is_bpo_43522_fixed(implementation_name, version_info, pypy_version_info)
76
77
# Everything in this branch exists for static type checkers only:
# 'typing.TYPE_CHECKING' is False at runtime, so none of these imports run
# and the names are only usable inside annotations / string forward references.
if typing.TYPE_CHECKING:
    from ssl import VerifyMode
    from typing import TypedDict

    # Aliased so it does not clash with the module-level 'SSLTransport' name.
    from .ssltransport import SSLTransport as SSLTransportType

    # Dictionary member of the '_TYPE_PEER_CERT_RET' union defined later in
    # this module; 'total=False' makes every key optional.
    # NOTE(review): presumably mirrors the dict returned by
    # ssl.SSLSocket.getpeercert() -- confirm against callers.
    class _TYPE_PEER_CERT_RET_DICT(TypedDict, total=False):
        subjectAltName: tuple[tuple[str, str], ...]
        subject: tuple[tuple[tuple[str, str], ...], ...]
        serialNumber: str
88
89
# Mapping from 'ssl.PROTOCOL_TLSX' to 'TLSVersion.X'.
# Filled in below only when the 'ssl' module is importable; create_urllib3_context()
# uses it to translate a pinned 'ssl_version' into minimum/maximum TLS versions.
_SSL_VERSION_TO_TLS_VERSION: dict[int, int] = {}

# Import-time feature detection. On success the real 'ssl' names replace the
# module-level defaults (SSLContext, HAS_NEVER_CHECK_COMMON_NAME, SSLTransport);
# on ImportError plain integer stand-ins are defined instead.
try:  # Do we have ssl at all?
    import ssl
    from ssl import (  # type: ignore[assignment]
        CERT_REQUIRED,
        HAS_NEVER_CHECK_COMMON_NAME,
        OP_NO_COMPRESSION,
        OP_NO_TICKET,
        OPENSSL_VERSION,
        OPENSSL_VERSION_NUMBER,
        PROTOCOL_TLS,
        PROTOCOL_TLS_CLIENT,
        VERIFY_X509_STRICT,
        OP_NO_SSLv2,
        OP_NO_SSLv3,
        SSLContext,
        TLSVersion,
    )

    # Legacy alias kept under the old name.
    PROTOCOL_SSLv23 = PROTOCOL_TLS

    # Needed for Python 3.9 which does not define this
    VERIFY_X509_PARTIAL_CHAIN = getattr(ssl, "VERIFY_X509_PARTIAL_CHAIN", 0x80000)

    # Setting SSLContext.hostname_checks_common_name = False didn't work before CPython
    # 3.9.3, and 3.10 (but OK on PyPy) or OpenSSL 1.1.1l+
    # When the flag is advertised but not reliable, override it to False.
    if HAS_NEVER_CHECK_COMMON_NAME and not _is_has_never_check_common_name_reliable(
        OPENSSL_VERSION,
        OPENSSL_VERSION_NUMBER,
        sys.implementation.name,
        sys.version_info,
        sys.pypy_version_info if sys.implementation.name == "pypy" else None,  # type: ignore[attr-defined]
    ):  # Defensive: for Python < 3.9.3
        HAS_NEVER_CHECK_COMMON_NAME = False

    # Need to be careful here in case old TLS versions get
    # removed in future 'ssl' module implementations.
    for attr in ("TLSv1", "TLSv1_1", "TLSv1_2"):
        try:
            _SSL_VERSION_TO_TLS_VERSION[getattr(ssl, f"PROTOCOL_{attr}")] = getattr(
                TLSVersion, attr
            )
        except AttributeError:  # Defensive:
            continue

    # Imported last, inside the 'try': an ImportError raised by this import is
    # handled by the same 'except' clause below.
    from .ssltransport import SSLTransport  # type: ignore[assignment]
except ImportError:
    # Integer stand-ins so these module-level names still exist without 'ssl'.
    # SSLContext is not assigned here and keeps its None default, which makes
    # create_urllib3_context() raise TypeError.
    OP_NO_COMPRESSION = 0x20000  # type: ignore[assignment]
    OP_NO_TICKET = 0x4000  # type: ignore[assignment]
    OP_NO_SSLv2 = 0x1000000  # type: ignore[assignment]
    OP_NO_SSLv3 = 0x2000000  # type: ignore[assignment]
    PROTOCOL_SSLv23 = PROTOCOL_TLS = 2  # type: ignore[assignment]
    PROTOCOL_TLS_CLIENT = 16  # type: ignore[assignment]
    VERIFY_X509_PARTIAL_CHAIN = 0x80000
    VERIFY_X509_STRICT = 0x20  # type: ignore[assignment]


# Union of a peer-certificate dict, raw certificate bytes, or None. The dict
# type is written as a string because it is only defined under TYPE_CHECKING.
_TYPE_PEER_CERT_RET = typing.Union["_TYPE_PEER_CERT_RET_DICT", bytes, None]
150
151
def assert_fingerprint(cert: bytes | None, fingerprint: str) -> None:
    """
    Verify that a certificate hashes to the expected (pinned) fingerprint.

    The hash algorithm is selected from ``HASHFUNC_MAP`` by the number of hex
    digits in the fingerprint once colons are removed. The final comparison
    uses :func:`hmac.compare_digest`.

    :param cert:
        Certificate as bytes object, or ``None`` when the peer supplied none.
    :param fingerprint:
        Fingerprint as string of hexdigits, can be interspersed by colons.
    :raises SSLError:
        If there is no certificate, the fingerprint length matches no known
        digest, the matching hash function is unavailable, or the digests
        differ.
    """
    if cert is None:
        raise SSLError("No certificate for the peer.")

    # Normalise "AA:BB:..." to "aabb...".
    fingerprint = fingerprint.replace(":", "").lower()
    digest_length = len(fingerprint)

    if digest_length not in HASHFUNC_MAP:
        raise SSLError(f"Fingerprint of invalid length: {fingerprint}")

    # A None entry means hashlib lacks this algorithm on the running Python.
    hashfunc = HASHFUNC_MAP[digest_length]
    if hashfunc is None:
        raise SSLError(
            f"Hash function implementation unavailable for fingerprint length: {digest_length}"
        )

    # Decode the hex digits into the raw digest bytes we expect to see.
    expected_digest = unhexlify(fingerprint.encode())
    cert_digest = hashfunc(cert).digest()

    if hmac.compare_digest(cert_digest, expected_digest):
        return

    raise SSLError(
        f'Fingerprints did not match. Expected "{fingerprint}", got "{cert_digest.hex()}"'
    )
184
185
def resolve_cert_reqs(candidate: None | int | str) -> VerifyMode:
    """
    Turn ``candidate`` into a certificate-requirement constant of the
    :mod:`ssl` module.

    * ``None`` resolves to :data:`ssl.CERT_REQUIRED`.
    * A string is looked up as an attribute of :mod:`ssl`, first verbatim and
      then with a ``CERT_`` prefix (so ``REQUIRED`` works as well as
      ``CERT_REQUIRED``).
    * Anything else is assumed to already be the numeric constant and is
      returned unchanged.

    :raises AttributeError: If a string names no attribute of :mod:`ssl`,
        with or without the ``CERT_`` prefix.
    """
    if candidate is None:
        return CERT_REQUIRED

    if not isinstance(candidate, str):
        return candidate  # type: ignore[return-value]

    resolved = getattr(ssl, candidate, None)
    if resolved is None:
        # Retry with the abbreviated spelling expanded.
        resolved = getattr(ssl, "CERT_" + candidate)
    return resolved  # type: ignore[no-any-return]
207
208
def resolve_ssl_version(candidate: None | int | str) -> int:
    """
    Turn ``candidate`` into a protocol constant of the :mod:`ssl` module.

    Works like :func:`resolve_cert_reqs`: ``None`` resolves to
    ``PROTOCOL_TLS``; a string is looked up on :mod:`ssl` verbatim and then
    with a ``PROTOCOL_`` prefix; any other value is returned unchanged.

    :raises AttributeError: If a string names no attribute of :mod:`ssl`,
        with or without the ``PROTOCOL_`` prefix.
    """
    if candidate is None:
        return PROTOCOL_TLS

    if not isinstance(candidate, str):
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is None:
        # Retry with the abbreviated spelling expanded.
        resolved = getattr(ssl, "PROTOCOL_" + candidate)
    return typing.cast(int, resolved)
223
224
def create_urllib3_context(
    ssl_version: int | None = None,
    cert_reqs: int | None = None,
    options: int | None = None,
    ciphers: str | None = None,
    ssl_minimum_version: int | None = None,
    ssl_maximum_version: int | None = None,
    verify_flags: int | None = None,
) -> ssl.SSLContext:
    """Build an :class:`ssl.SSLContext` configured the way urllib3 wants it.

    :param ssl_version:
        Deprecated, use 'ssl_minimum_version' instead. Any value other than
        ``None``, ``PROTOCOL_TLS`` or ``PROTOCOL_TLS_CLIENT`` pins the
        connection to one protocol version, emits a ``DeprecationWarning``
        and cannot be combined with the minimum/maximum parameters.
    :param ssl_minimum_version:
        The minimum version of TLS to be used. Use the 'ssl.TLSVersion' enum
        for specifying the value. Defaults to TLS 1.2.
    :param ssl_maximum_version:
        The maximum version of TLS to be used. Use the 'ssl.TLSVersion' enum
        for specifying the value. Not recommended to set to anything other
        than 'ssl.TLSVersion.MAXIMUM_SUPPORTED'; when ``None`` the context's
        own default is left in place.
    :param cert_reqs:
        Whether to require the certificate verification. This defaults to
        ``ssl.CERT_REQUIRED``.
    :param options:
        Specific OpenSSL options, OR-ed into the options the new context
        already has. These default to ``ssl.OP_NO_SSLv2``,
        ``ssl.OP_NO_SSLv3``, ``ssl.OP_NO_COMPRESSION``, and ``ssl.OP_NO_TICKET``.
    :param ciphers:
        Cipher string passed to ``SSLContext.set_ciphers()``. When empty or
        ``None`` the context's default cipher configuration is left untouched.
    :param verify_flags:
        The flags for certificate verification operations, OR-ed into the
        context's existing flags. These default to
        ``ssl.VERIFY_X509_PARTIAL_CHAIN`` and ``ssl.VERIFY_X509_STRICT`` for
        Python 3.13+ and to no extra flags otherwise.
    :returns:
        Constructed SSLContext object with specified options
    :rtype: SSLContext
    :raises TypeError: If the ``ssl`` module is unavailable.
    :raises ValueError: If an exact 'ssl_version' is combined with
        'ssl_minimum_version' or 'ssl_maximum_version'.
    """
    if SSLContext is None:
        raise TypeError("Can't create an SSLContext object without an ssl module")

    # Anything besides the generic protocol constants means the caller asked
    # for one exact protocol version through the deprecated 'ssl_version'.
    if ssl_version not in (None, PROTOCOL_TLS, PROTOCOL_TLS_CLIENT):
        # Mixing 'ssl_version' with 'ssl_minimum|maximum_version' would be
        # ambiguous, so refuse it.
        if not (ssl_minimum_version is None and ssl_maximum_version is None):
            raise ValueError(
                "Can't specify both 'ssl_version' and either "
                "'ssl_minimum_version' or 'ssl_maximum_version'"
            )

        # Translate the pinned protocol into an equal minimum and maximum;
        # protocols missing from the mapping fall back to the widest range.
        ssl_minimum_version = _SSL_VERSION_TO_TLS_VERSION.get(
            ssl_version, TLSVersion.MINIMUM_SUPPORTED
        )
        ssl_maximum_version = _SSL_VERSION_TO_TLS_VERSION.get(
            ssl_version, TLSVersion.MAXIMUM_SUPPORTED
        )

        # The message deliberately mentions only 'ssl_minimum_version': best
        # practice is to set just the minimum and leave the maximum at its
        # default value, 'TLSVersion.MAXIMUM_SUPPORTED'.
        warnings.warn(
            "'ssl_version' option is deprecated and will be "
            "removed in urllib3 v2.1.0. Instead use 'ssl_minimum_version'",
            category=DeprecationWarning,
            stacklevel=2,
        )

    # PROTOCOL_TLS is deprecated in Python 3.10 so we always use PROTOCOL_TLS_CLIENT
    context = SSLContext(PROTOCOL_TLS_CLIENT)

    # Python <3.10 defaults to 'MINIMUM_SUPPORTED', so TLSv1.2 is set
    # explicitly when the caller did not choose a minimum.
    context.minimum_version = (
        TLSVersion.TLSv1_2 if ssl_minimum_version is None else ssl_minimum_version
    )
    if ssl_maximum_version is not None:
        context.maximum_version = ssl_maximum_version

    # Only override the cipher list when the caller supplied one.
    if ciphers:
        context.set_ciphers(ciphers)

    # The default is resolved here rather than in the signature because the
    # ssl module may have been missing at import time.
    if cert_reqs is None:
        cert_reqs = ssl.CERT_REQUIRED

    if options is None:
        options = (
            # SSLv2 is easily broken and is considered harmful and dangerous
            OP_NO_SSLv2
            # SSLv3 has several problems and is now dangerous
            | OP_NO_SSLv3
            # Disable compression to prevent CRIME attacks for OpenSSL 1.0+
            # (issue #309)
            | OP_NO_COMPRESSION
            # TLSv1.2 only. Unless set explicitly, do not request tickets.
            # This may save some bandwidth on wire, and although the ticket
            # is encrypted, there is a risk associated with it being on wire,
            # if the server is not rotating its ticketing keys properly.
            | OP_NO_TICKET
        )
    context.options |= options

    if verify_flags is None:
        # In Python 3.13+ ssl.create_default_context() sets
        # VERIFY_X509_PARTIAL_CHAIN and VERIFY_X509_STRICT so we do the same.
        if sys.version_info >= (3, 13):
            verify_flags = VERIFY_X509_PARTIAL_CHAIN | VERIFY_X509_STRICT
        else:
            verify_flags = 0
    context.verify_flags |= verify_flags

    # Enable post-handshake authentication for TLS 1.3, see GH #1634. PHA is
    # necessary for conditional client cert authentication with TLS 1.3.
    # The attribute is None for OpenSSL <= 1.1.0 or does not exist when using
    # an SSLContext created by pyOpenSSL.
    if getattr(context, "post_handshake_auth", None) is not None:
        context.post_handshake_auth = True

    # The order of the two assignments in each branch matters: SSLContext
    # refuses check_hostname=True together with verify_mode=NONE/OPTIONAL.
    # For pyOpenSSL 'check_hostname' is always False and we rely on our own
    # 'ssl.match_hostname()' implementation.
    if cert_reqs == ssl.CERT_REQUIRED and not IS_PYOPENSSL:
        context.verify_mode = cert_reqs
        context.check_hostname = True
    else:
        context.check_hostname = False
        context.verify_mode = cert_reqs

    try:
        context.hostname_checks_common_name = False
    except AttributeError:  # Defensive: for CPython < 3.9.3; for PyPy < 7.3.8
        pass

    # Honour the SSLKEYLOGFILE environment variable when it is set and non-empty.
    keylog_path = os.environ.get("SSLKEYLOGFILE")
    if keylog_path:
        context.keylog_filename = keylog_path

    return context
372
373
# With tls_in_tls statically False the result is always a plain SSLSocket.
@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: typing.Literal[False] = ...,
) -> ssl.SSLSocket: ...


# With an arbitrary bool the result may also be an SSLTransport.
@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: bool = ...,
) -> ssl.SSLSocket | SSLTransportType: ...


def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = None,
    certfile: str | None = None,
    cert_reqs: int | None = None,
    ca_certs: str | None = None,
    server_hostname: str | None = None,
    ssl_version: int | None = None,
    ciphers: str | None = None,
    ssl_context: ssl.SSLContext | None = None,
    ca_cert_dir: str | None = None,
    key_password: str | None = None,
    ca_cert_data: None | str | bytes = None,
    tls_in_tls: bool = False,
) -> ssl.SSLSocket | SSLTransportType:
    """
    Wrap ``sock`` in TLS, loading trust anchors and client credentials first.

    All arguments except for server_hostname, ssl_context, tls_in_tls,
    ca_cert_data and ca_cert_dir have the same meaning as they do when using
    :func:`ssl.create_default_context`, :meth:`ssl.SSLContext.load_cert_chain`,
    :meth:`ssl.SSLContext.set_ciphers` and :meth:`ssl.SSLContext.wrap_socket`.

    :param server_hostname:
        When SNI is supported, the expected hostname of the certificate
    :param ssl_context:
        A pre-made :class:`SSLContext` object. If none is provided, one will
        be created using :func:`create_urllib3_context` from ``ssl_version``,
        ``cert_reqs`` and ``ciphers``.
    :param ciphers:
        A string of ciphers we wish the client to support.
    :param ca_cert_dir:
        A directory containing CA certificates in multiple separate files, as
        supported by OpenSSL's -CApath flag or the capath argument to
        SSLContext.load_verify_locations().
    :param key_password:
        Optional password if the keyfile is encrypted.
    :param ca_cert_data:
        Optional string containing CA certificates in PEM format suitable for
        passing as the cadata parameter to SSLContext.load_verify_locations()
    :param tls_in_tls:
        Use SSLTransport to wrap the existing socket.
    :raises SSLError:
        If loading the CA locations fails with ``OSError``, or if the key
        file is encrypted and no ``key_password`` was given.
    """
    context_was_supplied = ssl_context is not None
    if context_was_supplied:
        context = ssl_context
    else:
        # Note: This branch of code and all the variables in it are only used in tests.
        # We should consider deprecating and removing this code.
        context = create_urllib3_context(ssl_version, cert_reqs, ciphers=ciphers)

    if ca_certs or ca_cert_dir or ca_cert_data:
        try:
            context.load_verify_locations(ca_certs, ca_cert_dir, ca_cert_data)
        except OSError as load_error:
            raise SSLError(load_error) from load_error
    elif not context_was_supplied and hasattr(context, "load_default_certs"):
        # try to load OS default certs; works well on Windows.
        context.load_default_certs()

    # OpenSSL would otherwise prompt for the passphrase of an encrypted key on
    # the terminal; detect that situation up front and fail instead.
    if keyfile and key_password is None and _is_key_file_encrypted(keyfile):
        raise SSLError("Client private key is encrypted, password is required")

    if certfile:
        # Pass the password positionally only when one was actually given.
        if key_password is None:
            chain_args = (certfile, keyfile)
        else:
            chain_args = (certfile, keyfile, key_password)
        context.load_cert_chain(*chain_args)

    context.set_alpn_protocols(ALPN_PROTOCOLS)

    return _ssl_wrap_socket_impl(sock, context, tls_in_tls, server_hostname)
482
483
def is_ipaddress(hostname: str | bytes) -> bool:
    """Report whether ``hostname`` is a literal IPv4 or IPv6 address.

    IPv6 addresses carrying a Zone ID are recognised too.

    :param hostname: Hostname to examine; bytes are decoded as ASCII first.
    :return: True if the hostname is an IP address, False otherwise.
    """
    # IDN A-label bytes are ASCII compatible.
    text = hostname.decode("ascii") if isinstance(hostname, bytes) else hostname
    return any(
        pattern.match(text) for pattern in (_IPV4_RE, _BRACELESS_IPV6_ADDRZ_RE)
    )
495
496
def _is_key_file_encrypted(key_file: str) -> bool:
    """Detect whether a private key file is encrypted.

    The file is scanned line by line for the marker ``ENCRYPTED``, which
    appears in headers such as ``Proc-Type: 4,ENCRYPTED``.

    The file is read in binary mode so the check does not depend on the
    locale's default text encoding: stray non-decodable bytes (for example
    in a comment) no longer raise ``UnicodeDecodeError``.

    :param key_file: Path of the key file to inspect.
    :return: ``True`` if any line contains ``ENCRYPTED``, ``False`` otherwise.
    :raises OSError: If the file cannot be opened or read.
    """
    with open(key_file, "rb") as f:
        # Look for Proc-Type: 4,ENCRYPTED
        return any(b"ENCRYPTED" in line for line in f)
506
507
def _ssl_wrap_socket_impl(
    sock: socket.socket,
    ssl_context: ssl.SSLContext,
    tls_in_tls: bool,
    server_hostname: str | None = None,
) -> ssl.SSLSocket | SSLTransportType:
    """Wrap ``sock`` with ``ssl_context``, optionally as TLS-in-TLS.

    :param sock: The socket to wrap.
    :param ssl_context: Context used for the wrapping.
    :param tls_in_tls: When true, wrap with ``SSLTransport`` instead of
        ``SSLContext.wrap_socket``.
    :param server_hostname: Hostname forwarded to the wrapper.
    :return: The wrapped socket, or an ``SSLTransport`` for TLS-in-TLS.
    :raises ProxySchemeUnsupported: If TLS-in-TLS is requested but
        ``SSLTransport`` is unavailable.
    """
    # Common case first: ordinary TLS directly on the socket.
    if not tls_in_tls:
        return ssl_context.wrap_socket(sock, server_hostname=server_hostname)

    if not SSLTransport:
        # Import error, ssl is not available.
        raise ProxySchemeUnsupported(
            "TLS in TLS requires support for the 'ssl' module"
        )

    SSLTransport._validate_ssl_context_for_tls_in_tls(ssl_context)
    return SSLTransport(sock, ssl_context, server_hostname)