1"""
2Module for using pyOpenSSL as a TLS backend. This module was relevant before
3the standard library ``ssl`` module supported SNI, but now that we've dropped
4support for Python 2.7 all relevant Python versions support SNI so
5**this module is no longer recommended**.
6
7This needs the following packages installed:
8
9* `pyOpenSSL`_ (tested with 19.0.0)
10* `cryptography`_ (minimum 2.3, from pyopenssl)
11* `idna`_ (minimum 2.1, from cryptography)
12
13However, pyOpenSSL depends on cryptography, so while we use all three directly here we
14end up having relatively few packages required.
15
16You can install them with the following command:
17
18.. code-block:: bash
19
20 $ python -m pip install pyopenssl cryptography idna
21
22To activate certificate checking, call
23:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
24before you begin making HTTP requests. This can be done in a ``sitecustomize``
25module, or at any other time before your application begins using ``urllib3``,
26like this:
27
28.. code-block:: python
29
30 try:
31 import urllib3.contrib.pyopenssl
32 urllib3.contrib.pyopenssl.inject_into_urllib3()
33 except ImportError:
34 pass
35
36.. _pyopenssl: https://www.pyopenssl.org
37.. _cryptography: https://cryptography.io
38.. _idna: https://github.com/kjd/idna
39"""
40
41from __future__ import annotations
42
43import OpenSSL.SSL # type: ignore[import-not-found]
44from cryptography import x509
45
46try:
47 from cryptography.x509 import UnsupportedExtension # type: ignore[attr-defined]
48except ImportError:
49 # UnsupportedExtension is gone in cryptography >= 2.1.0
50 class UnsupportedExtension(Exception): # type: ignore[no-redef]
51 pass
52
53
54import logging
55import ssl
56import typing
57from io import BytesIO
58from socket import socket as socket_cls
59
60from .. import util
61
62if typing.TYPE_CHECKING:
63 from OpenSSL.crypto import X509 # type: ignore[import-not-found]
64
65
66__all__ = ["inject_into_urllib3", "extract_from_urllib3"]
67
68# Map from urllib3 to PyOpenSSL compatible parameter-values.
69_openssl_versions: dict[int, int] = {
70 util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined]
71 util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined]
72 ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
73}
74
75if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"):
76 _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD
77
78if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"):
79 _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD
80
81
82_stdlib_to_openssl_verify = {
83 ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
84 ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
85 ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER
86 + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
87}
88_openssl_to_stdlib_verify = {v: k for k, v in _stdlib_to_openssl_verify.items()}
89
90# The SSLvX values are the most likely to be missing in the future
91# but we check them all just to be sure.
92_OP_NO_SSLv2_OR_SSLv3: int = getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr(
93 OpenSSL.SSL, "OP_NO_SSLv3", 0
94)
95_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0)
96_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0)
97_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0)
98_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0)
99
100_openssl_to_ssl_minimum_version: dict[int, int] = {
101 ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
102 ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3,
103 ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1,
104 ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1,
105 ssl.TLSVersion.TLSv1_3: (
106 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
107 ),
108 ssl.TLSVersion.MAXIMUM_SUPPORTED: (
109 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
110 ),
111}
112_openssl_to_ssl_maximum_version: dict[int, int] = {
113 ssl.TLSVersion.MINIMUM_SUPPORTED: (
114 _OP_NO_SSLv2_OR_SSLv3
115 | _OP_NO_TLSv1
116 | _OP_NO_TLSv1_1
117 | _OP_NO_TLSv1_2
118 | _OP_NO_TLSv1_3
119 ),
120 ssl.TLSVersion.TLSv1: (
121 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3
122 ),
123 ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3,
124 ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3,
125 ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3,
126 ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
127}
128
129# OpenSSL will only write 16K at a time
130SSL_WRITE_BLOCKSIZE = 16384
131
132orig_util_SSLContext = util.ssl_.SSLContext
133
134
135log = logging.getLogger(__name__)
136
137
138def inject_into_urllib3() -> None:
139 "Monkey-patch urllib3 with PyOpenSSL-backed SSL-support."
140
141 _validate_dependencies_met()
142
143 util.SSLContext = PyOpenSSLContext # type: ignore[assignment]
144 util.ssl_.SSLContext = PyOpenSSLContext # type: ignore[assignment]
145 util.IS_PYOPENSSL = True
146 util.ssl_.IS_PYOPENSSL = True
147
148
149def extract_from_urllib3() -> None:
150 "Undo monkey-patching by :func:`inject_into_urllib3`."
151
152 util.SSLContext = orig_util_SSLContext
153 util.ssl_.SSLContext = orig_util_SSLContext
154 util.IS_PYOPENSSL = False
155 util.ssl_.IS_PYOPENSSL = False
156
157
158def _validate_dependencies_met() -> None:
159 """
160 Verifies that PyOpenSSL's package-level dependencies have been met.
161 Throws `ImportError` if they are not met.
162 """
163 # Method added in `cryptography==1.1`; not available in older versions
164 from cryptography.x509.extensions import Extensions
165
166 if getattr(Extensions, "get_extension_for_class", None) is None:
167 raise ImportError(
168 "'cryptography' module missing required functionality. "
169 "Try upgrading to v1.3.4 or newer."
170 )
171
172 # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
173 # attribute is only present on those versions.
174 from OpenSSL.crypto import X509
175
176 x509 = X509()
177 if getattr(x509, "_x509", None) is None:
178 raise ImportError(
179 "'pyOpenSSL' module missing required functionality. "
180 "Try upgrading to v0.14 or newer."
181 )
182
183
184def _dnsname_to_stdlib(name: str) -> str | None:
185 """
186 Converts a dNSName SubjectAlternativeName field to the form used by the
187 standard library on the given Python version.
188
189 Cryptography produces a dNSName as a unicode string that was idna-decoded
190 from ASCII bytes. We need to idna-encode that string to get it back, and
191 then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib
192 uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8).
193
194 If the name cannot be idna-encoded then we return None signalling that
195 the name given should be skipped.
196 """
197
198 def idna_encode(name: str) -> bytes | None:
199 """
200 Borrowed wholesale from the Python Cryptography Project. It turns out
201 that we can't just safely call `idna.encode`: it can explode for
202 wildcard names. This avoids that problem.
203 """
204 import idna
205
206 try:
207 for prefix in ["*.", "."]:
208 if name.startswith(prefix):
209 name = name[len(prefix) :]
210 return prefix.encode("ascii") + idna.encode(name)
211 return idna.encode(name)
212 except idna.core.IDNAError:
213 return None
214
215 # Don't send IPv6 addresses through the IDNA encoder.
216 if ":" in name:
217 return name
218
219 encoded_name = idna_encode(name)
220 if encoded_name is None:
221 return None
222 return encoded_name.decode("utf-8")
223
224
225def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]:
226 """
227 Given an PyOpenSSL certificate, provides all the subject alternative names.
228 """
229 cert = peer_cert.to_cryptography()
230
231 # We want to find the SAN extension. Ask Cryptography to locate it (it's
232 # faster than looping in Python)
233 try:
234 ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
235 except x509.ExtensionNotFound:
236 # No such extension, return the empty list.
237 return []
238 except (
239 x509.DuplicateExtension,
240 UnsupportedExtension,
241 x509.UnsupportedGeneralNameType,
242 UnicodeError,
243 ) as e:
244 # A problem has been found with the quality of the certificate. Assume
245 # no SAN field is present.
246 log.warning(
247 "A problem was encountered with the certificate that prevented "
248 "urllib3 from finding the SubjectAlternativeName field. This can "
249 "affect certificate validation. The error was %s",
250 e,
251 )
252 return []
253
254 # We want to return dNSName and iPAddress fields. We need to cast the IPs
255 # back to strings because the match_hostname function wants them as
256 # strings.
257 # Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8
258 # decoded. This is pretty frustrating, but that's what the standard library
259 # does with certificates, and so we need to attempt to do the same.
260 # We also want to skip over names which cannot be idna encoded.
261 names = [
262 ("DNS", name)
263 for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName))
264 if name is not None
265 ]
266 names.extend(
267 ("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress)
268 )
269
270 return names
271
272
273class WrappedSocket:
274 """API-compatibility wrapper for Python OpenSSL's Connection-class."""
275
276 def __init__(
277 self,
278 connection: OpenSSL.SSL.Connection,
279 socket: socket_cls,
280 suppress_ragged_eofs: bool = True,
281 ) -> None:
282 self.connection = connection
283 self.socket = socket
284 self.suppress_ragged_eofs = suppress_ragged_eofs
285 self._io_refs = 0
286 self._closed = False
287
288 def fileno(self) -> int:
289 return self.socket.fileno()
290
291 # Copy-pasted from Python 3.5 source code
292 def _decref_socketios(self) -> None:
293 if self._io_refs > 0:
294 self._io_refs -= 1
295 if self._closed:
296 self.close()
297
298 def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes:
299 try:
300 data = self.connection.recv(*args, **kwargs)
301 except OpenSSL.SSL.SysCallError as e:
302 if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
303 return b""
304 else:
305 raise OSError(e.args[0], str(e)) from e
306 except OpenSSL.SSL.ZeroReturnError:
307 if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
308 return b""
309 else:
310 raise
311 except OpenSSL.SSL.WantReadError as e:
312 if not util.wait_for_read(self.socket, self.socket.gettimeout()):
313 raise TimeoutError("The read operation timed out") from e
314 else:
315 return self.recv(*args, **kwargs)
316
317 # TLS 1.3 post-handshake authentication
318 except OpenSSL.SSL.Error as e:
319 raise ssl.SSLError(f"read error: {e!r}") from e
320 else:
321 return data # type: ignore[no-any-return]
322
323 def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int:
324 try:
325 return self.connection.recv_into(*args, **kwargs) # type: ignore[no-any-return]
326 except OpenSSL.SSL.SysCallError as e:
327 if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
328 return 0
329 else:
330 raise OSError(e.args[0], str(e)) from e
331 except OpenSSL.SSL.ZeroReturnError:
332 if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
333 return 0
334 else:
335 raise
336 except OpenSSL.SSL.WantReadError as e:
337 if not util.wait_for_read(self.socket, self.socket.gettimeout()):
338 raise TimeoutError("The read operation timed out") from e
339 else:
340 return self.recv_into(*args, **kwargs)
341
342 # TLS 1.3 post-handshake authentication
343 except OpenSSL.SSL.Error as e:
344 raise ssl.SSLError(f"read error: {e!r}") from e
345
346 def settimeout(self, timeout: float) -> None:
347 return self.socket.settimeout(timeout)
348
349 def _send_until_done(self, data: bytes) -> int:
350 while True:
351 try:
352 return self.connection.send(data) # type: ignore[no-any-return]
353 except OpenSSL.SSL.WantWriteError as e:
354 if not util.wait_for_write(self.socket, self.socket.gettimeout()):
355 raise TimeoutError() from e
356 continue
357 except OpenSSL.SSL.SysCallError as e:
358 raise OSError(e.args[0], str(e)) from e
359
360 def sendall(self, data: bytes) -> None:
361 total_sent = 0
362 while total_sent < len(data):
363 sent = self._send_until_done(
364 data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
365 )
366 total_sent += sent
367
368 def shutdown(self, how: int) -> None:
369 try:
370 self.connection.shutdown()
371 except OpenSSL.SSL.Error as e:
372 raise ssl.SSLError(f"shutdown error: {e!r}") from e
373
374 def close(self) -> None:
375 self._closed = True
376 if self._io_refs <= 0:
377 self._real_close()
378
379 def _real_close(self) -> None:
380 try:
381 return self.connection.close() # type: ignore[no-any-return]
382 except OpenSSL.SSL.Error:
383 return
384
385 def getpeercert(
386 self, binary_form: bool = False
387 ) -> dict[str, list[typing.Any]] | None:
388 x509 = self.connection.get_peer_certificate()
389
390 if not x509:
391 return x509 # type: ignore[no-any-return]
392
393 if binary_form:
394 return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, x509) # type: ignore[no-any-return]
395
396 return {
397 "subject": ((("commonName", x509.get_subject().CN),),), # type: ignore[dict-item]
398 "subjectAltName": get_subj_alt_name(x509),
399 }
400
401 def version(self) -> str:
402 return self.connection.get_protocol_version_name() # type: ignore[no-any-return]
403
404 def selected_alpn_protocol(self) -> str | None:
405 alpn_proto = self.connection.get_alpn_proto_negotiated()
406 return alpn_proto.decode() if alpn_proto else None
407
408
409WrappedSocket.makefile = socket_cls.makefile # type: ignore[attr-defined]
410
411
412class PyOpenSSLContext:
413 """
414 I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
415 for translating the interface of the standard library ``SSLContext`` object
416 to calls into PyOpenSSL.
417 """
418
419 def __init__(self, protocol: int) -> None:
420 self.protocol = _openssl_versions[protocol]
421 self._ctx = OpenSSL.SSL.Context(self.protocol)
422 self._options = 0
423 self.check_hostname = False
424 self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
425 self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED
426 self._verify_flags: int = ssl.VERIFY_X509_TRUSTED_FIRST
427
428 @property
429 def options(self) -> int:
430 return self._options
431
432 @options.setter
433 def options(self, value: int) -> None:
434 self._options = value
435 self._set_ctx_options()
436
437 @property
438 def verify_flags(self) -> int:
439 return self._verify_flags
440
441 @verify_flags.setter
442 def verify_flags(self, value: int) -> None:
443 self._verify_flags = value
444 self._ctx.get_cert_store().set_flags(self._verify_flags)
445
446 @property
447 def verify_mode(self) -> int:
448 return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]
449
450 @verify_mode.setter
451 def verify_mode(self, value: ssl.VerifyMode) -> None:
452 self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)
453
454 def set_default_verify_paths(self) -> None:
455 self._ctx.set_default_verify_paths()
456
457 def set_ciphers(self, ciphers: bytes | str) -> None:
458 if isinstance(ciphers, str):
459 ciphers = ciphers.encode("utf-8")
460 self._ctx.set_cipher_list(ciphers)
461
462 def load_verify_locations(
463 self,
464 cafile: str | None = None,
465 capath: str | None = None,
466 cadata: bytes | None = None,
467 ) -> None:
468 if cafile is not None:
469 cafile = cafile.encode("utf-8") # type: ignore[assignment]
470 if capath is not None:
471 capath = capath.encode("utf-8") # type: ignore[assignment]
472 try:
473 self._ctx.load_verify_locations(cafile, capath)
474 if cadata is not None:
475 self._ctx.load_verify_locations(BytesIO(cadata))
476 except OpenSSL.SSL.Error as e:
477 raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e
478
479 def load_cert_chain(
480 self,
481 certfile: str,
482 keyfile: str | None = None,
483 password: str | None = None,
484 ) -> None:
485 try:
486 self._ctx.use_certificate_chain_file(certfile)
487 if password is not None:
488 if not isinstance(password, bytes):
489 password = password.encode("utf-8") # type: ignore[assignment]
490 self._ctx.set_passwd_cb(lambda *_: password)
491 self._ctx.use_privatekey_file(keyfile or certfile)
492 except OpenSSL.SSL.Error as e:
493 raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e
494
495 def set_alpn_protocols(self, protocols: list[bytes | str]) -> None:
496 protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
497 return self._ctx.set_alpn_protos(protocols) # type: ignore[no-any-return]
498
499 def wrap_socket(
500 self,
501 sock: socket_cls,
502 server_side: bool = False,
503 do_handshake_on_connect: bool = True,
504 suppress_ragged_eofs: bool = True,
505 server_hostname: bytes | str | None = None,
506 ) -> WrappedSocket:
507 cnx = OpenSSL.SSL.Connection(self._ctx, sock)
508
509 # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3
510 if server_hostname and not util.ssl_.is_ipaddress(server_hostname):
511 if isinstance(server_hostname, str):
512 server_hostname = server_hostname.encode("utf-8")
513 cnx.set_tlsext_host_name(server_hostname)
514
515 cnx.set_connect_state()
516
517 while True:
518 try:
519 cnx.do_handshake()
520 except OpenSSL.SSL.WantReadError as e:
521 if not util.wait_for_read(sock, sock.gettimeout()):
522 raise TimeoutError("select timed out") from e
523 continue
524 except OpenSSL.SSL.Error as e:
525 raise ssl.SSLError(f"bad handshake: {e!r}") from e
526 break
527
528 return WrappedSocket(cnx, sock)
529
530 def _set_ctx_options(self) -> None:
531 self._ctx.set_options(
532 self._options
533 | _openssl_to_ssl_minimum_version[self._minimum_version]
534 | _openssl_to_ssl_maximum_version[self._maximum_version]
535 )
536
537 @property
538 def minimum_version(self) -> int:
539 return self._minimum_version
540
541 @minimum_version.setter
542 def minimum_version(self, minimum_version: int) -> None:
543 self._minimum_version = minimum_version
544 self._set_ctx_options()
545
546 @property
547 def maximum_version(self) -> int:
548 return self._maximum_version
549
550 @maximum_version.setter
551 def maximum_version(self, maximum_version: int) -> None:
552 self._maximum_version = maximum_version
553 self._set_ctx_options()
554
555
556def _verify_callback(
557 cnx: OpenSSL.SSL.Connection,
558 x509: X509,
559 err_no: int,
560 err_depth: int,
561 return_code: int,
562) -> bool:
563 return err_no == 0