Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/contrib/pyopenssl.py: 26%
255 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1"""
2Module for using pyOpenSSL as a TLS backend. This module was relevant before
3the standard library ``ssl`` module supported SNI, but now that we've dropped
4support for Python 2.7 all relevant Python versions support SNI so
5**this module is no longer recommended**.
7This needs the following packages installed:
9* `pyOpenSSL`_ (tested with 16.0.0)
10* `cryptography`_ (minimum 1.3.4, from pyopenssl)
11* `idna`_ (minimum 2.0)
13However, pyOpenSSL depends on cryptography, so while we use all three directly here we
14end up having relatively few packages required.
16You can install them with the following command:
18.. code-block:: bash
20 $ python -m pip install pyopenssl cryptography idna
22To activate certificate checking, call
23:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
24before you begin making HTTP requests. This can be done in a ``sitecustomize``
25module, or at any other time before your application begins using ``urllib3``,
26like this:
28.. code-block:: python
30 try:
31 import urllib3.contrib.pyopenssl
32 urllib3.contrib.pyopenssl.inject_into_urllib3()
33 except ImportError:
34 pass
36.. _pyopenssl: https://www.pyopenssl.org
37.. _cryptography: https://cryptography.io
38.. _idna: https://github.com/kjd/idna
39"""
41from __future__ import annotations
43import OpenSSL.SSL # type: ignore[import]
44from cryptography import x509
46try:
47 from cryptography.x509 import UnsupportedExtension # type: ignore[attr-defined]
48except ImportError:
49 # UnsupportedExtension is gone in cryptography >= 2.1.0
50 class UnsupportedExtension(Exception): # type: ignore[no-redef]
51 pass
54import logging
55import ssl
56import typing
57from io import BytesIO
58from socket import socket as socket_cls
59from socket import timeout
61from .. import util
63if typing.TYPE_CHECKING:
64 from OpenSSL.crypto import X509 # type: ignore[import]
__all__ = ["inject_into_urllib3", "extract_from_urllib3"]

# Translation table from the protocol constants urllib3 / the stdlib ``ssl``
# module use to the method constants understood by ``OpenSSL.SSL.Context``.
_openssl_versions = {
    util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
}

# The TLS 1.1 / 1.2 specific methods are only registered when both the
# stdlib and pyOpenSSL expose the corresponding constant.
if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"):
    _openssl_versions.update({ssl.PROTOCOL_TLSv1_1: OpenSSL.SSL.TLSv1_1_METHOD})

if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"):
    _openssl_versions.update({ssl.PROTOCOL_TLSv1_2: OpenSSL.SSL.TLSv1_2_METHOD})
# stdlib ``ssl.CERT_*`` verification modes mapped to pyOpenSSL verify flags.
_stdlib_to_openssl_verify = {
    ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
    ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
    ssl.CERT_REQUIRED: (
        OpenSSL.SSL.VERIFY_PEER + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT
    ),
}
# Reverse lookup: pyOpenSSL verify flags back to the stdlib mode.
_openssl_to_stdlib_verify = dict(
    zip(_stdlib_to_openssl_verify.values(), _stdlib_to_openssl_verify.keys())
)
# Option bits that disable individual protocol versions. Any constant that
# pyOpenSSL does not provide degrades to 0 (a no-op when OR-ed in). The
# SSLvX values are the most likely to be missing in the future, but every
# lookup is guarded just to be sure.
_OP_NO_SSLv2_OR_SSLv3: int = getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr(
    OpenSSL.SSL, "OP_NO_SSLv3", 0
)
_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0)
_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0)
_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0)
_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0)

# For a requested *minimum* version: the option bits that switch off every
# protocol older than it.
_openssl_to_ssl_minimum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1,
    ssl.TLSVersion.TLSv1_3: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
    ),
}
# MAXIMUM_SUPPORTED uses exactly the same mask as TLSv1_3.
_openssl_to_ssl_minimum_version[
    ssl.TLSVersion.MAXIMUM_SUPPORTED
] = _openssl_to_ssl_minimum_version[ssl.TLSVersion.TLSv1_3]

# For a requested *maximum* version: the option bits that switch off every
# protocol newer than it (SSLv2/SSLv3 are always disabled).
_openssl_to_ssl_maximum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3
        | _OP_NO_TLSv1
        | _OP_NO_TLSv1_1
        | _OP_NO_TLSv1_2
        | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3,
}
# MAXIMUM_SUPPORTED uses exactly the same mask as TLSv1_3.
_openssl_to_ssl_maximum_version[
    ssl.TLSVersion.MAXIMUM_SUPPORTED
] = _openssl_to_ssl_maximum_version[ssl.TLSVersion.TLSv1_3]
# OpenSSL will only write 16K at a time, so outgoing data is sliced into
# pieces of at most this many bytes.
SSL_WRITE_BLOCKSIZE = 16 * 1024

# The SSLContext implementation that was installed in urllib3 when this
# module was imported; kept so the monkey-patch can be undone.
orig_util_SSLContext = util.ssl_.SSLContext

log = logging.getLogger(__name__)
def inject_into_urllib3() -> None:
    """Monkey-patch urllib3 with PyOpenSSL-backed SSL-support.

    Raises ``ImportError`` (from the dependency check) before anything is
    patched if the installed pyOpenSSL / cryptography are unusable.
    """
    _validate_dependencies_met()

    # Both the ``util`` package and its ``ssl_`` submodule expose these names,
    # so each of them has to be patched.
    for namespace in (util, util.ssl_):
        namespace.SSLContext = PyOpenSSLContext  # type: ignore[assignment]
        namespace.IS_PYOPENSSL = True
def extract_from_urllib3() -> None:
    """Undo monkey-patching by :func:`inject_into_urllib3`."""
    # Put back the SSLContext captured at import time on both patched
    # namespaces and clear the pyOpenSSL marker flag.
    for namespace in (util, util.ssl_):
        namespace.SSLContext = orig_util_SSLContext
        namespace.IS_PYOPENSSL = False
def _validate_dependencies_met() -> None:
    """
    Check that the installed ``cryptography`` and ``pyOpenSSL`` packages
    provide the functionality this module relies on.

    :raises ImportError: if either package is too old.
    """
    # Method added in `cryptography==1.1`; not available in older versions
    from cryptography.x509.extensions import Extensions

    extension_lookup = getattr(Extensions, "get_extension_for_class", None)
    if extension_lookup is None:
        raise ImportError(
            "'cryptography' module missing required functionality.  "
            "Try upgrading to v1.3.4 or newer."
        )

    # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
    # attribute is only present on those versions.
    from OpenSSL.crypto import X509

    probe_cert = X509()
    if getattr(probe_cert, "_x509", None) is None:
        raise ImportError(
            "'pyOpenSSL' module missing required functionality. "
            "Try upgrading to v0.14 or newer."
        )
185def _dnsname_to_stdlib(name: str) -> str | None:
186 """
187 Converts a dNSName SubjectAlternativeName field to the form used by the
188 standard library on the given Python version.
190 Cryptography produces a dNSName as a unicode string that was idna-decoded
191 from ASCII bytes. We need to idna-encode that string to get it back, and
192 then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib
193 uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8).
195 If the name cannot be idna-encoded then we return None signalling that
196 the name given should be skipped.
197 """
199 def idna_encode(name: str) -> bytes | None:
200 """
201 Borrowed wholesale from the Python Cryptography Project. It turns out
202 that we can't just safely call `idna.encode`: it can explode for
203 wildcard names. This avoids that problem.
204 """
205 import idna
207 try:
208 for prefix in ["*.", "."]:
209 if name.startswith(prefix):
210 name = name[len(prefix) :]
211 return prefix.encode("ascii") + idna.encode(name)
212 return idna.encode(name)
213 except idna.core.IDNAError:
214 return None
216 # Don't send IPv6 addresses through the IDNA encoder.
217 if ":" in name:
218 return name
220 encoded_name = idna_encode(name)
221 if encoded_name is None:
222 return None
223 return encoded_name.decode("utf-8")
def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]:
    """
    Given a PyOpenSSL certificate, provides all the subject alternative names.

    :param peer_cert: the pyOpenSSL certificate to inspect.
    :return: ``("DNS", name)`` entries followed by ``("IP Address", addr)``
        entries; an empty list when the certificate has no usable
        SubjectAlternativeName extension.
    """
    cert = peer_cert.to_cryptography()

    # Let Cryptography locate the SAN extension (it's faster than looping
    # in Python).
    try:
        san_extension = cert.extensions.get_extension_for_class(
            x509.SubjectAlternativeName
        )
        san = san_extension.value
    except x509.ExtensionNotFound:
        # No such extension, return the empty list.
        return []
    except (
        x509.DuplicateExtension,
        UnsupportedExtension,
        x509.UnsupportedGeneralNameType,
        UnicodeError,
    ) as e:
        # A problem has been found with the quality of the certificate. Assume
        # no SAN field is present.
        log.warning(
            "A problem was encountered with the certificate that prevented "
            "urllib3 from finding the SubjectAlternativeName field. This can "
            "affect certificate validation. The error was %s",
            e,
        )
        return []

    # Only dNSName and iPAddress entries are reported. DNS names go through
    # the idna round-trip so they look the way the standard library would
    # present them; names that cannot be idna-encoded are skipped. IP
    # addresses are converted back to strings because match_hostname wants
    # them as strings.
    names = []
    for dns_name in san.get_values_for_type(x509.DNSName):
        converted = _dnsname_to_stdlib(dns_name)
        if converted is not None:
            names.append(("DNS", converted))

    for address in san.get_values_for_type(x509.IPAddress):
        names.append(("IP Address", str(address)))

    return names
class WrappedSocket:
    """API-compatibility wrapper for Python OpenSSL's Connection-class.

    Implements a socket-like interface on top of a pyOpenSSL connection and
    the plain socket it was created from. pyOpenSSL exceptions raised while
    reading or writing are translated into ``OSError``, ``socket.timeout``
    and ``ssl.SSLError``.
    """

    def __init__(
        self,
        connection: OpenSSL.SSL.Connection,
        socket: socket_cls,
        suppress_ragged_eofs: bool = True,
    ) -> None:
        """
        :param connection: the pyOpenSSL connection all TLS I/O goes through.
        :param socket: the underlying socket; used for ``fileno()``, timeouts
            and readiness waits.
        :param suppress_ragged_eofs: when true, an "Unexpected EOF" syscall
            error during a read is reported as end-of-stream instead of
            raising.
        """
        self.connection = connection
        self.socket = socket
        self.suppress_ragged_eofs = suppress_ragged_eofs
        # Reference counter decremented by _decref_socketios(); close() only
        # really closes the connection once it is no longer positive.
        self._io_refs = 0
        self._closed = False

    def fileno(self) -> int:
        """Return the file descriptor of the underlying socket."""
        return self.socket.fileno()

    # Copy-pasted from Python 3.5 source code
    def _decref_socketios(self) -> None:
        """Drop one I/O reference and finish a pending close()."""
        if self._io_refs > 0:
            self._io_refs -= 1
        if self._closed:
            self.close()

    def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes:
        """Receive data; arguments are passed through to ``connection.recv``.

        Returns ``b""`` for a suppressed ragged EOF or once the peer's
        shutdown has been received.

        :raises OSError: for other syscall-level errors.
        :raises socket.timeout: if waiting for readability times out.
        :raises ssl.SSLError: for any other pyOpenSSL error.
        """
        try:
            return self.connection.recv(*args, **kwargs)  # type: ignore[no-any-return]
        except OpenSSL.SSL.SysCallError as e:
            if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                return b""
            raise OSError(e.args[0], str(e)) from e
        except OpenSSL.SSL.ZeroReturnError:
            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                return b""
            raise
        except OpenSSL.SSL.WantReadError as e:
            # No data available yet: wait for the socket, then retry.
            if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                raise timeout("The read operation timed out") from e
            return self.recv(*args, **kwargs)
        # TLS 1.3 post-handshake authentication
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"read error: {e!r}") from e

    def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int:
        """Receive data into a buffer; arguments go to ``connection.recv_into``.

        Returns ``0`` for a suppressed ragged EOF or once the peer's shutdown
        has been received. Raises the same exceptions as :meth:`recv`.
        """
        try:
            return self.connection.recv_into(*args, **kwargs)  # type: ignore[no-any-return]
        except OpenSSL.SSL.SysCallError as e:
            if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                return 0
            raise OSError(e.args[0], str(e)) from e
        except OpenSSL.SSL.ZeroReturnError:
            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                return 0
            raise
        except OpenSSL.SSL.WantReadError as e:
            # No data available yet: wait for the socket, then retry.
            if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                raise timeout("The read operation timed out") from e
            return self.recv_into(*args, **kwargs)
        # TLS 1.3 post-handshake authentication
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"read error: {e!r}") from e

    def settimeout(self, timeout: float) -> None:
        """Set the timeout on the underlying socket."""
        return self.socket.settimeout(timeout)

    def _send_until_done(self, data: bytes) -> int:
        """Send ``data`` with a single successful ``connection.send`` call.

        Retries while pyOpenSSL reports that the socket is not writable yet.

        :return: the number of bytes ``connection.send`` reports as sent.
        :raises socket.timeout: if waiting for writability times out.
        :raises OSError: for syscall-level errors.
        """
        while True:
            try:
                return self.connection.send(data)  # type: ignore[no-any-return]
            except OpenSSL.SSL.WantWriteError as e:
                if not util.wait_for_write(self.socket, self.socket.gettimeout()):
                    raise timeout() from e
                continue
            except OpenSSL.SSL.SysCallError as e:
                raise OSError(e.args[0], str(e)) from e

    def sendall(self, data: bytes) -> None:
        """Send all of ``data``, in slices of at most SSL_WRITE_BLOCKSIZE bytes."""
        total_sent = 0
        while total_sent < len(data):
            sent = self._send_until_done(
                data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
            )
            total_sent += sent

    def shutdown(self, how: int | None = None) -> None:
        """Send the TLS shutdown message on the connection.

        :param how: accepted only so that socket-style calls such as
            ``sock.shutdown(socket.SHUT_RDWR)`` do not fail with a
            ``TypeError``; the value is ignored because
            ``connection.shutdown()`` is called without arguments.
        """
        # FIXME rethrow compatible exceptions should we ever use this
        self.connection.shutdown()

    def close(self) -> None:
        """Mark the socket closed; really close once no I/O references remain."""
        self._closed = True
        if self._io_refs <= 0:
            self._real_close()

    def _real_close(self) -> None:
        """Close the pyOpenSSL connection, ignoring pyOpenSSL errors."""
        try:
            return self.connection.close()  # type: ignore[no-any-return]
        except OpenSSL.SSL.Error:
            return

    def getpeercert(
        self, binary_form: bool = False
    ) -> dict[str, list[typing.Any]] | None:
        """Return the peer certificate.

        :param binary_form: if true, return the ASN.1 dump of the certificate
            instead of a dict.
        :return: the falsy value from ``get_peer_certificate()`` when there is
            no certificate; otherwise the dump, or a dict with ``subject``
            (commonName only) and ``subjectAltName`` entries.
        """
        cert = self.connection.get_peer_certificate()

        if not cert:
            return cert  # type: ignore[no-any-return]

        if binary_form:
            return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)  # type: ignore[no-any-return]

        return {
            "subject": ((("commonName", cert.get_subject().CN),),),  # type: ignore[dict-item]
            "subjectAltName": get_subj_alt_name(cert),
        }

    def version(self) -> str:
        """Return the negotiated protocol version name reported by pyOpenSSL."""
        return self.connection.get_protocol_version_name()  # type: ignore[no-any-return]


# Reuse the stdlib socket's makefile() implementation for file-like access.
WrappedSocket.makefile = socket_cls.makefile  # type: ignore[attr-defined]
class PyOpenSSLContext:
    """
    I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
    for translating the interface of the standard library ``SSLContext`` object
    to calls into PyOpenSSL.
    """

    def __init__(self, protocol: int) -> None:
        """
        :param protocol: a stdlib/urllib3 protocol constant; it must be a key
            of ``_openssl_versions`` (``KeyError`` otherwise).
        """
        self.protocol = _openssl_versions[protocol]
        self._ctx = OpenSSL.SSL.Context(self.protocol)
        self._options = 0
        self.check_hostname = False
        self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
        self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED

    @property
    def options(self) -> int:
        """The option bits last assigned through this property."""
        return self._options

    @options.setter
    def options(self, value: int) -> None:
        self._options = value
        self._set_ctx_options()

    @property
    def verify_mode(self) -> int:
        """The stdlib ``ssl.CERT_*`` mode matching the context's verify flags."""
        return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]

    @verify_mode.setter
    def verify_mode(self, value: ssl.VerifyMode) -> None:
        self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)

    def set_default_verify_paths(self) -> None:
        """Delegate to the pyOpenSSL context's ``set_default_verify_paths``."""
        self._ctx.set_default_verify_paths()

    def set_ciphers(self, ciphers: bytes | str) -> None:
        """Set the cipher list; a ``str`` is UTF-8 encoded first."""
        if isinstance(ciphers, str):
            ciphers = ciphers.encode("utf-8")
        self._ctx.set_cipher_list(ciphers)

    def load_verify_locations(
        self,
        cafile: str | None = None,
        capath: str | None = None,
        cadata: bytes | None = None,
    ) -> None:
        """Load trusted CA certificates into the context.

        :param cafile: path of a CA bundle file (UTF-8 encoded before use).
        :param capath: path of a CA directory (UTF-8 encoded before use).
        :param cadata: CA data, handed to pyOpenSSL wrapped in a ``BytesIO``.
        :raises ssl.SSLError: if pyOpenSSL reports an error.
        """
        if cafile is not None:
            cafile = cafile.encode("utf-8")  # type: ignore[assignment]
        if capath is not None:
            capath = capath.encode("utf-8")  # type: ignore[assignment]
        try:
            self._ctx.load_verify_locations(cafile, capath)
            if cadata is not None:
                self._ctx.load_verify_locations(BytesIO(cadata))
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e

    def load_cert_chain(
        self,
        certfile: str,
        keyfile: str | None = None,
        password: str | None = None,
    ) -> None:
        """Load a certificate chain and its private key.

        :param certfile: path of the certificate chain file.
        :param keyfile: path of the private key file; ``certfile`` is used
            when this is not given.
        :param password: private key password; anything that is not already
            ``bytes`` is UTF-8 encoded.
        :raises ssl.SSLError: if pyOpenSSL reports an error.
        """
        try:
            self._ctx.use_certificate_chain_file(certfile)
            if password is not None:
                if not isinstance(password, bytes):
                    password = password.encode("utf-8")  # type: ignore[assignment]
                self._ctx.set_passwd_cb(lambda *_: password)
            self._ctx.use_privatekey_file(keyfile or certfile)
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e

    def set_alpn_protocols(self, protocols: list[bytes | str]) -> None:
        """Set the ALPN protocols, converting each entry to ASCII bytes."""
        protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
        return self._ctx.set_alpn_protos(protocols)  # type: ignore[no-any-return]

    def wrap_socket(
        self,
        sock: socket_cls,
        server_side: bool = False,
        do_handshake_on_connect: bool = True,
        suppress_ragged_eofs: bool = True,
        server_hostname: bytes | str | None = None,
    ) -> WrappedSocket:
        """Wrap ``sock`` in a client-side TLS connection and do the handshake.

        :param sock: the connected socket to wrap.
        :param server_side: accepted for interface compatibility; not used,
            the connection is always put into connect (client) state.
        :param do_handshake_on_connect: accepted for interface compatibility;
            not used, the handshake is always performed here.
        :param suppress_ragged_eofs: forwarded to the returned
            :class:`WrappedSocket`.
        :param server_hostname: hostname sent via SNI; skipped when empty or
            an IP address.
        :raises socket.timeout: if waiting for the handshake times out.
        :raises ssl.SSLError: if the handshake fails.
        """
        cnx = OpenSSL.SSL.Connection(self._ctx, sock)

        # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3
        if server_hostname and not util.ssl_.is_ipaddress(server_hostname):
            if isinstance(server_hostname, str):
                server_hostname = server_hostname.encode("utf-8")
            cnx.set_tlsext_host_name(server_hostname)

        cnx.set_connect_state()

        while True:
            try:
                cnx.do_handshake()
            except OpenSSL.SSL.WantReadError as e:
                # Handshake needs more data: wait for the socket, then retry.
                if not util.wait_for_read(sock, sock.gettimeout()):
                    raise timeout("select timed out") from e
                continue
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"bad handshake: {e!r}") from e
            break

        # Honour the caller's ragged-EOF preference instead of silently
        # falling back to the WrappedSocket default.
        return WrappedSocket(cnx, sock, suppress_ragged_eofs)

    def _set_ctx_options(self) -> None:
        """Push the user options plus the min/max version masks to pyOpenSSL."""
        self._ctx.set_options(
            self._options
            | _openssl_to_ssl_minimum_version[self._minimum_version]
            | _openssl_to_ssl_maximum_version[self._maximum_version]
        )

    @property
    def minimum_version(self) -> int:
        """The minimum TLS version last assigned (an ``ssl.TLSVersion`` value)."""
        return self._minimum_version

    @minimum_version.setter
    def minimum_version(self, minimum_version: int) -> None:
        self._minimum_version = minimum_version
        self._set_ctx_options()

    @property
    def maximum_version(self) -> int:
        """The maximum TLS version last assigned (an ``ssl.TLSVersion`` value)."""
        return self._maximum_version

    @maximum_version.setter
    def maximum_version(self, maximum_version: int) -> None:
        self._maximum_version = maximum_version
        self._set_ctx_options()
541def _verify_callback(
542 cnx: OpenSSL.SSL.Connection,
543 x509: X509,
544 err_no: int,
545 err_depth: int,
546 return_code: int,
547) -> bool:
548 return err_no == 0