Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/contrib/pyopenssl.py: 28%
257 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
"""
Module for using pyOpenSSL as a TLS backend. This module was relevant before
the standard library ``ssl`` module supported SNI, but now that we've dropped
support for Python 2.7 all relevant Python versions support SNI so
**this module is no longer recommended**.

This needs the following packages installed:

* `pyOpenSSL`_ (tested with 16.0.0)
* `cryptography`_ (minimum 1.3.4, from pyopenssl)
* `idna`_ (minimum 2.0, from cryptography)

However, pyOpenSSL depends on cryptography, which depends on idna, so while we
use all three directly here we end up having relatively few packages required.

You can install them with the following command:

.. code-block:: bash

    $ python -m pip install pyopenssl cryptography idna

To activate certificate checking, call
:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
before you begin making HTTP requests. This can be done in a ``sitecustomize``
module, or at any other time before your application begins using ``urllib3``,
like this:

.. code-block:: python

    try:
        import urllib3.contrib.pyopenssl
        urllib3.contrib.pyopenssl.inject_into_urllib3()
    except ImportError:
        pass

.. _pyopenssl: https://www.pyopenssl.org
.. _cryptography: https://cryptography.io
.. _idna: https://github.com/kjd/idna
"""
41from __future__ import annotations
43import OpenSSL.SSL # type: ignore[import]
44from cryptography import x509
try:
    from cryptography.x509 import UnsupportedExtension  # type: ignore[attr-defined]
except ImportError:
    # UnsupportedExtension is gone in cryptography >= 2.1.0
    class UnsupportedExtension(Exception):  # type: ignore[no-redef]
        """Stand-in so ``except UnsupportedExtension`` clauses stay valid
        when the installed cryptography no longer exports the name."""
54import logging
55import ssl
56import typing
57import warnings
58from io import BytesIO
59from socket import socket as socket_cls
60from socket import timeout
62from .. import util
# Emitted once, when this module is first imported. ``stacklevel=2`` points
# the warning at the frame that triggered the import rather than at this file.
warnings.warn(
    "'urllib3.contrib.pyopenssl' module is deprecated and will be removed "
    "in urllib3 v2.1.0. Read more in this issue: "
    "https://github.com/urllib3/urllib3/issues/2680",
    category=DeprecationWarning,
    stacklevel=2,
)
if typing.TYPE_CHECKING:
    # Needed only for annotations; skipped at runtime.
    from OpenSSL.crypto import X509  # type: ignore[import]


# Names exported by ``from urllib3.contrib.pyopenssl import *``.
__all__ = ["inject_into_urllib3", "extract_from_urllib3"]
# Map from urllib3 to PyOpenSSL compatible parameter-values.
# Keys are stdlib/urllib3 protocol constants; values are the pyOpenSSL method
# constants handed to ``OpenSSL.SSL.Context``.
_openssl_versions = {
    util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
}

# TLS 1.1 / 1.2 entries are added only when both libraries expose them.
if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"):
    _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD

if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"):
    _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD
# Translate stdlib ``ssl.CERT_*`` verify modes to pyOpenSSL ``VERIFY_*`` flags.
# The flags are bit masks, so they are combined with bitwise OR.
_stdlib_to_openssl_verify = {
    ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
    ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
    ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER
    | OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
}
# Reverse lookup used when reading the verify mode back from the context.
_openssl_to_stdlib_verify = {v: k for k, v in _stdlib_to_openssl_verify.items()}
# The SSLvX values are the most likely to be missing in the future
# but we check them all just to be sure.
# A missing flag degrades to 0, which is a no-op when OR-ed into an options
# mask.
_OP_NO_SSLv2_OR_SSLv3: int = getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr(
    OpenSSL.SSL, "OP_NO_SSLv3", 0
)
_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0)
_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0)
_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0)
_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0)
# For each ``ssl.TLSVersion`` used as a *minimum*, the OP_NO_* mask that
# disables every protocol older than it. SSLv2/SSLv3 are always disabled.
_openssl_to_ssl_minimum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1,
    ssl.TLSVersion.TLSv1_3: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
    ),
    # A minimum of MAXIMUM_SUPPORTED uses the same mask as TLSv1_3.
    ssl.TLSVersion.MAXIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
    ),
}
# For each ``ssl.TLSVersion`` used as a *maximum*, the OP_NO_* mask that
# disables every protocol newer than it. SSLv2/SSLv3 are always disabled.
_openssl_to_ssl_maximum_version: dict[int, int] = {
    # A maximum of MINIMUM_SUPPORTED sets every OP_NO_* flag defined above.
    ssl.TLSVersion.MINIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3
        | _OP_NO_TLSv1
        | _OP_NO_TLSv1_1
        | _OP_NO_TLSv1_2
        | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
}
# OpenSSL will only write 16K at a time
SSL_WRITE_BLOCKSIZE = 16384

# The SSLContext in place at import time, kept so extract_from_urllib3()
# can restore it.
orig_util_SSLContext = util.ssl_.SSLContext


log = logging.getLogger(__name__)
def inject_into_urllib3() -> None:
    """Monkey-patch urllib3 with PyOpenSSL-backed SSL-support.

    Checks the optional dependencies first, then swaps ``SSLContext`` on both
    ``util`` and ``util.ssl_`` for :class:`PyOpenSSLContext` and sets the
    ``IS_PYOPENSSL`` flags.

    :raises ImportError: propagated from ``_validate_dependencies_met()``.
    """
    _validate_dependencies_met()

    util.SSLContext = PyOpenSSLContext  # type: ignore[assignment]
    util.ssl_.SSLContext = PyOpenSSLContext  # type: ignore[assignment]
    util.IS_PYOPENSSL = True
    util.ssl_.IS_PYOPENSSL = True
def extract_from_urllib3() -> None:
    """Undo monkey-patching by :func:`inject_into_urllib3`.

    Restores the ``SSLContext`` captured in ``orig_util_SSLContext`` and
    clears the ``IS_PYOPENSSL`` flags.
    """
    util.SSLContext = orig_util_SSLContext
    util.ssl_.SSLContext = orig_util_SSLContext
    util.IS_PYOPENSSL = False
    util.ssl_.IS_PYOPENSSL = False
def _validate_dependencies_met() -> None:
    """
    Verifies that PyOpenSSL's package-level dependencies have been met.
    Throws `ImportError` if they are not met.
    """
    # Method added in `cryptography==1.1`; not available in older versions
    from cryptography.x509.extensions import Extensions

    if getattr(Extensions, "get_extension_for_class", None) is None:
        raise ImportError(
            "'cryptography' module missing required functionality.  "
            "Try upgrading to v1.3.4 or newer."
        )

    # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
    # attribute is only present on those versions.
    from OpenSSL.crypto import X509

    # Named ``probe_cert`` so the module-level ``x509`` import is not shadowed.
    probe_cert = X509()
    if getattr(probe_cert, "_x509", None) is None:
        raise ImportError(
            "'pyOpenSSL' module missing required functionality. "
            "Try upgrading to v0.14 or newer."
        )
def _dnsname_to_stdlib(name: str) -> str | None:
    """
    Converts a dNSName SubjectAlternativeName field to the form used by the
    standard library on the given Python version.

    Cryptography produces a dNSName as a unicode string that was idna-decoded
    from ASCII bytes. We need to idna-encode that string to get it back, and
    then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib
    uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8).

    If the name cannot be idna-encoded then we return None signalling that
    the name given should be skipped.

    :param name: the dNSName value to convert.
    :returns: the converted name, ``name`` unchanged when it contains ``":"``,
        or ``None`` when idna-encoding fails.
    """

    def idna_encode(name: str) -> bytes | None:
        """
        Borrowed wholesale from the Python Cryptography Project. It turns out
        that we can't just safely call `idna.encode`: it can explode for
        wildcard names. This avoids that problem.
        """
        import idna

        try:
            # Strip a leading wildcard or dot, encode the remainder, then put
            # the prefix back.
            for prefix in ["*.", "."]:
                if name.startswith(prefix):
                    name = name[len(prefix) :]
                    return prefix.encode("ascii") + idna.encode(name)
            return idna.encode(name)
        except idna.core.IDNAError:
            return None

    # Don't send IPv6 addresses through the IDNA encoder.
    if ":" in name:
        return name

    encoded_name = idna_encode(name)
    if encoded_name is None:
        return None
    return encoded_name.decode("utf-8")
def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]:
    """
    Given an PyOpenSSL certificate, provides all the subject alternative names.

    :param peer_cert: certificate object exposing ``to_cryptography()``.
    :returns: ``("DNS", name)`` and ``("IP Address", addr)`` tuples; an empty
        list when the SAN extension is missing or could not be read.
    """
    cert = peer_cert.to_cryptography()

    # We want to find the SAN extension. Ask Cryptography to locate it (it's
    # faster than looping in Python)
    try:
        ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
    except x509.ExtensionNotFound:
        # No such extension, return the empty list.
        return []
    except (
        x509.DuplicateExtension,
        UnsupportedExtension,
        x509.UnsupportedGeneralNameType,
        UnicodeError,
    ) as e:
        # A problem has been found with the quality of the certificate. Assume
        # no SAN field is present.
        log.warning(
            "A problem was encountered with the certificate that prevented "
            "urllib3 from finding the SubjectAlternativeName field. This can "
            "affect certificate validation. The error was %s",
            e,
        )
        return []

    # We want to return dNSName and iPAddress fields. We need to cast the IPs
    # back to strings because the match_hostname function wants them as
    # strings.
    # Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8
    # decoded. This is pretty frustrating, but that's what the standard library
    # does with certificates, and so we need to attempt to do the same.
    # We also want to skip over names which cannot be idna encoded.
    names = [
        ("DNS", name)
        for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName))
        if name is not None
    ]
    names.extend(
        ("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress)
    )

    return names
class WrappedSocket:
    """API-compatibility wrapper for Python OpenSSL's Connection-class."""

    def __init__(
        self,
        connection: OpenSSL.SSL.Connection,
        socket: socket_cls,
        suppress_ragged_eofs: bool = True,
    ) -> None:
        """
        :param connection: the pyOpenSSL connection that carries the TLS data.
        :param socket: the underlying socket, used for ``fileno``, timeouts
            and readiness waits.
        :param suppress_ragged_eofs: when true, an "Unexpected EOF" syscall
            error on read is reported as end-of-stream instead of raising.
        """
        self.connection = connection
        self.socket = socket
        self.suppress_ragged_eofs = suppress_ragged_eofs
        # Reference count consulted by close()/_decref_socketios(); presumably
        # incremented by the borrowed ``socket.makefile`` -- TODO confirm.
        self._io_refs = 0
        self._closed = False

    def fileno(self) -> int:
        """Return the file descriptor of the underlying socket."""
        return self.socket.fileno()

    # Copy-pasted from Python 3.5 source code
    def _decref_socketios(self) -> None:
        """Drop one I/O reference and finish a pending close, if any."""
        if self._io_refs > 0:
            self._io_refs -= 1
        if self._closed:
            self.close()

    def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes:
        """Receive data from the connection, translating pyOpenSSL errors.

        :returns: the bytes read; ``b""`` on a suppressed ragged EOF or after
            the peer's shutdown was received.
        :raises OSError: for other ``SysCallError`` failures.
        :raises socket.timeout: if waiting for readability times out.
        :raises ssl.SSLError: for any other ``OpenSSL.SSL.Error``.
        """
        try:
            data = self.connection.recv(*args, **kwargs)
        except OpenSSL.SSL.SysCallError as e:
            if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                return b""
            else:
                raise OSError(e.args[0], str(e)) from e
        except OpenSSL.SSL.ZeroReturnError:
            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                return b""
            else:
                raise
        except OpenSSL.SSL.WantReadError as e:
            # No data available yet: wait for the socket, then retry.
            if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                raise timeout("The read operation timed out") from e
            else:
                return self.recv(*args, **kwargs)

        # TLS 1.3 post-handshake authentication
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"read error: {e!r}") from e
        else:
            return data  # type: ignore[no-any-return]

    def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int:
        """Like :meth:`recv`, but delegates to ``connection.recv_into``.

        :returns: the connection's return value; ``0`` on a suppressed ragged
            EOF or after the peer's shutdown was received.
        :raises OSError: for other ``SysCallError`` failures.
        :raises socket.timeout: if waiting for readability times out.
        :raises ssl.SSLError: for any other ``OpenSSL.SSL.Error``.
        """
        try:
            return self.connection.recv_into(*args, **kwargs)  # type: ignore[no-any-return]
        except OpenSSL.SSL.SysCallError as e:
            if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                return 0
            else:
                raise OSError(e.args[0], str(e)) from e
        except OpenSSL.SSL.ZeroReturnError:
            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                return 0
            else:
                raise
        except OpenSSL.SSL.WantReadError as e:
            # No data available yet: wait for the socket, then retry.
            if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                raise timeout("The read operation timed out") from e
            else:
                return self.recv_into(*args, **kwargs)

        # TLS 1.3 post-handshake authentication
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"read error: {e!r}") from e

    def settimeout(self, timeout: float) -> None:
        """Set the timeout on the underlying socket."""
        return self.socket.settimeout(timeout)

    def _send_until_done(self, data: bytes) -> int:
        """Call ``connection.send`` until it stops raising ``WantWriteError``.

        :returns: the value returned by ``connection.send``.
        :raises socket.timeout: if waiting for writability times out.
        :raises OSError: on ``SysCallError``.
        """
        while True:
            try:
                return self.connection.send(data)  # type: ignore[no-any-return]
            except OpenSSL.SSL.WantWriteError as e:
                if not util.wait_for_write(self.socket, self.socket.gettimeout()):
                    raise timeout() from e
                continue
            except OpenSSL.SSL.SysCallError as e:
                raise OSError(e.args[0], str(e)) from e

    def sendall(self, data: bytes) -> None:
        """Send all of ``data`` in slices of at most ``SSL_WRITE_BLOCKSIZE``."""
        total_sent = 0
        while total_sent < len(data):
            sent = self._send_until_done(
                data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
            )
            total_sent += sent

    def shutdown(self) -> None:
        """Shut down the pyOpenSSL connection."""
        # FIXME rethrow compatible exceptions should we ever use this
        self.connection.shutdown()

    def close(self) -> None:
        """Mark the socket closed; really close once no I/O refs remain."""
        self._closed = True
        if self._io_refs <= 0:
            self._real_close()

    def _real_close(self) -> None:
        """Close the connection, ignoring ``OpenSSL.SSL.Error``."""
        try:
            return self.connection.close()  # type: ignore[no-any-return]
        except OpenSSL.SSL.Error:
            return

    def getpeercert(
        self, binary_form: bool = False
    ) -> dict[str, list[typing.Any]] | None:
        """Return the peer certificate.

        :param binary_form: when true, return the ASN.1 dump of the
            certificate instead of a dict.
        :returns: the falsy value from ``get_peer_certificate()`` when there
            is no certificate; otherwise the dump or a dict with ``subject``
            (commonName only) and ``subjectAltName``.
        """
        # Named ``peer_cert`` so the module-level ``x509`` import is not shadowed.
        peer_cert = self.connection.get_peer_certificate()

        if not peer_cert:
            return peer_cert  # type: ignore[no-any-return]

        if binary_form:
            return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, peer_cert)  # type: ignore[no-any-return]

        return {
            "subject": ((("commonName", peer_cert.get_subject().CN),),),  # type: ignore[dict-item]
            "subjectAltName": get_subj_alt_name(peer_cert),
        }

    def version(self) -> str:
        """Return the protocol version name reported by the connection."""
        return self.connection.get_protocol_version_name()  # type: ignore[no-any-return]


# Reuse the stdlib socket's makefile implementation on the wrapper.
WrappedSocket.makefile = socket_cls.makefile  # type: ignore[attr-defined]
class PyOpenSSLContext:
    """
    I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
    for translating the interface of the standard library ``SSLContext`` object
    to calls into PyOpenSSL.
    """

    def __init__(self, protocol: int) -> None:
        """
        :param protocol: a protocol constant that is a key of
            ``_openssl_versions``.
        :raises KeyError: if ``protocol`` has no pyOpenSSL equivalent.
        """
        self.protocol = _openssl_versions[protocol]
        self._ctx = OpenSSL.SSL.Context(self.protocol)
        self._options = 0
        self.check_hostname = False
        self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
        self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED

    @property
    def options(self) -> int:
        """The option bits last assigned by the caller."""
        return self._options

    @options.setter
    def options(self, value: int) -> None:
        self._options = value
        self._set_ctx_options()

    @property
    def verify_mode(self) -> int:
        """The context's verify mode as a stdlib ``ssl.CERT_*`` value."""
        return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]

    @verify_mode.setter
    def verify_mode(self, value: ssl.VerifyMode) -> None:
        self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)

    def set_default_verify_paths(self) -> None:
        """Delegate to the pyOpenSSL context's ``set_default_verify_paths``."""
        self._ctx.set_default_verify_paths()

    def set_ciphers(self, ciphers: bytes | str) -> None:
        """Set the cipher list; ``str`` input is UTF-8 encoded first."""
        if isinstance(ciphers, str):
            ciphers = ciphers.encode("utf-8")
        self._ctx.set_cipher_list(ciphers)

    def load_verify_locations(
        self,
        cafile: str | None = None,
        capath: str | None = None,
        cadata: bytes | None = None,
    ) -> None:
        """Load trusted CA certificates into the context.

        :param cafile: path to a CA bundle file, UTF-8 encoded before use.
        :param capath: path to a CA directory, UTF-8 encoded before use.
        :param cadata: CA data, passed on wrapped in ``BytesIO``.
        :raises ssl.SSLError: if pyOpenSSL raises ``OpenSSL.SSL.Error``.
        """
        if cafile is not None:
            cafile = cafile.encode("utf-8")  # type: ignore[assignment]
        if capath is not None:
            capath = capath.encode("utf-8")  # type: ignore[assignment]
        try:
            self._ctx.load_verify_locations(cafile, capath)
            if cadata is not None:
                self._ctx.load_verify_locations(BytesIO(cadata))
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e

    def load_cert_chain(
        self,
        certfile: str,
        keyfile: str | None = None,
        password: str | None = None,
    ) -> None:
        """Load a client certificate chain and its private key.

        :param certfile: path to the certificate chain file.
        :param keyfile: path to the private key; ``certfile`` is used when
            this is falsy.
        :param password: private-key password; non-bytes values are UTF-8
            encoded before being handed to the password callback.
        :raises ssl.SSLError: if pyOpenSSL raises ``OpenSSL.SSL.Error``.
        """
        try:
            self._ctx.use_certificate_chain_file(certfile)
            if password is not None:
                if not isinstance(password, bytes):
                    password = password.encode("utf-8")  # type: ignore[assignment]
                self._ctx.set_passwd_cb(lambda *_: password)
            self._ctx.use_privatekey_file(keyfile or certfile)
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e

    def set_alpn_protocols(self, protocols: list[bytes | str]) -> None:
        """Set the ALPN protocols, converting each entry to ASCII bytes."""
        protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
        return self._ctx.set_alpn_protos(protocols)  # type: ignore[no-any-return]

    def wrap_socket(
        self,
        sock: socket_cls,
        server_side: bool = False,
        do_handshake_on_connect: bool = True,
        suppress_ragged_eofs: bool = True,
        server_hostname: bytes | str | None = None,
    ) -> WrappedSocket:
        """Run a client-side TLS handshake over ``sock`` and wrap the result.

        ``server_side`` and ``do_handshake_on_connect`` are accepted for
        signature compatibility but not consulted: the connection is always
        put in connect state and the handshake always runs here.

        :param sock: connected socket to run TLS over.
        :param suppress_ragged_eofs: forwarded to :class:`WrappedSocket`.
        :param server_hostname: sent as SNI unless it is an IP address.
        :raises socket.timeout: if waiting for the handshake read times out.
        :raises ssl.SSLError: if the handshake fails.
        """
        cnx = OpenSSL.SSL.Connection(self._ctx, sock)

        # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3
        if server_hostname and not util.ssl_.is_ipaddress(server_hostname):
            if isinstance(server_hostname, str):
                server_hostname = server_hostname.encode("utf-8")
            cnx.set_tlsext_host_name(server_hostname)

        cnx.set_connect_state()

        while True:
            try:
                cnx.do_handshake()
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(sock, sock.gettimeout()):
                    raise timeout("select timed out") from e
                continue
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"bad handshake: {e!r}") from e
            break

        # Forward the caller's choice instead of silently using the default.
        return WrappedSocket(cnx, sock, suppress_ragged_eofs)

    def _set_ctx_options(self) -> None:
        """Push caller options plus the min/max version masks to the context."""
        self._ctx.set_options(
            self._options
            | _openssl_to_ssl_minimum_version[self._minimum_version]
            | _openssl_to_ssl_maximum_version[self._maximum_version]
        )

    @property
    def minimum_version(self) -> int:
        """Lowest TLS version allowed, as an ``ssl.TLSVersion`` value."""
        return self._minimum_version

    @minimum_version.setter
    def minimum_version(self, minimum_version: int) -> None:
        self._minimum_version = minimum_version
        self._set_ctx_options()

    @property
    def maximum_version(self) -> int:
        """Highest TLS version allowed, as an ``ssl.TLSVersion`` value."""
        return self._maximum_version

    @maximum_version.setter
    def maximum_version(self, maximum_version: int) -> None:
        self._maximum_version = maximum_version
        self._set_ctx_options()
def _verify_callback(
    cnx: OpenSSL.SSL.Connection,
    x509: X509,
    err_no: int,
    err_depth: int,
    return_code: int,
) -> bool:
    """Verification callback registered via ``Context.set_verify``.

    Only ``err_no`` is consulted; the other arguments are accepted to match
    the callback signature.

    :returns: ``True`` when ``err_no`` is 0, ``False`` otherwise.
    """
    return err_no == 0