1"""
2Module for using pyOpenSSL as a TLS backend. This module was relevant before
3the standard library ``ssl`` module supported SNI, but now that we've dropped
4support for Python 2.7 all relevant Python versions support SNI so
5**this module is no longer recommended**.
6
7This needs the following packages installed:
8
9* `pyOpenSSL`_ (tested with 16.0.0)
10* `cryptography`_ (minimum 1.3.4, from pyopenssl)
11* `idna`_ (minimum 2.0)
12
13However, pyOpenSSL depends on cryptography, so while we use all three directly here we
14end up having relatively few packages required.
15
16You can install them with the following command:
17
18.. code-block:: bash
19
20 $ python -m pip install pyopenssl cryptography idna
21
22To activate certificate checking, call
23:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
24before you begin making HTTP requests. This can be done in a ``sitecustomize``
25module, or at any other time before your application begins using ``urllib3``,
26like this:
27
28.. code-block:: python
29
30 try:
31 import urllib3.contrib.pyopenssl
32 urllib3.contrib.pyopenssl.inject_into_urllib3()
33 except ImportError:
34 pass
35
36.. _pyopenssl: https://www.pyopenssl.org
37.. _cryptography: https://cryptography.io
38.. _idna: https://github.com/kjd/idna
39"""
40
41from __future__ import annotations
42
43import OpenSSL.SSL # type: ignore[import-untyped]
44from cryptography import x509
45
46try:
47 from cryptography.x509 import UnsupportedExtension # type: ignore[attr-defined]
48except ImportError:
49 # UnsupportedExtension is gone in cryptography >= 2.1.0
50 class UnsupportedExtension(Exception): # type: ignore[no-redef]
51 pass
52
53
54import logging
55import ssl
56import typing
57from io import BytesIO
58from socket import socket as socket_cls
59from socket import timeout
60
61from .. import util
62
63if typing.TYPE_CHECKING:
64 from OpenSSL.crypto import X509 # type: ignore[import-untyped]
65
66
# Public API of this module: the two monkey-patching entry points.
__all__ = ["inject_into_urllib3", "extract_from_urllib3"]

# Map from urllib3 to PyOpenSSL compatible parameter-values.
# Keys are the protocol constants accepted by ``PyOpenSSLContext(protocol)``;
# values are the pyOpenSSL method constants passed to ``OpenSSL.SSL.Context``.
_openssl_versions: dict[int, int] = {
    util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
}

# The TLS 1.1 / 1.2 specific methods are registered only when both the stdlib
# ``ssl`` module and pyOpenSSL expose the corresponding constants.
if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"):
    _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD

if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"):
    _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD
81
82
# Translate stdlib ``ssl`` verification modes into pyOpenSSL verify flags.
# CERT_REQUIRED combines VERIFY_PEER with VERIFY_FAIL_IF_NO_PEER_CERT.
_stdlib_to_openssl_verify = {
    ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
    ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
    ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER
    + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
}
# Inverse table (pyOpenSSL flags -> stdlib mode), used by the
# ``PyOpenSSLContext.verify_mode`` getter.
_openssl_to_stdlib_verify = {v: k for k, v in _stdlib_to_openssl_verify.items()}
90
# The SSLvX values are the most likely to be missing in the future
# but we check them all just to be sure.
# Every ``OP_NO_*`` flag is looked up with a default of 0, so a flag that the
# installed pyOpenSSL does not define contributes nothing to the bitmasks.
_OP_NO_SSLv2_OR_SSLv3: int = getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr(
    OpenSSL.SSL, "OP_NO_SSLv3", 0
)
_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0)
_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0)
_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0)
_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0)

# Keyed by ``ssl.TLSVersion`` (despite the variable name); each value is the
# combination of ``OP_NO_*`` flags that disables the protocols older than the
# requested minimum. SSLv2/SSLv3 are disabled in every entry.
_openssl_to_ssl_minimum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1,
    ssl.TLSVersion.TLSv1_3: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
    ),
    ssl.TLSVersion.MAXIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
    ),
}
# Keyed by ``ssl.TLSVersion``; each value is the combination of ``OP_NO_*``
# flags that disables the protocols newer than the requested maximum.
# NOTE(review): the MINIMUM_SUPPORTED entry also includes _OP_NO_TLSv1, so it
# disables every TLS version listed here (unlike the TLSv1 entry) -- confirm
# that this is intended.
_openssl_to_ssl_maximum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3
        | _OP_NO_TLSv1
        | _OP_NO_TLSv1_1
        | _OP_NO_TLSv1_2
        | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
}
129
# OpenSSL will only write 16K at a time
# (``WrappedSocket.sendall`` slices its payload into chunks of this size).
SSL_WRITE_BLOCKSIZE = 16384

# The SSLContext class urllib3 exposed when this module was imported;
# ``extract_from_urllib3`` puts it back.
orig_util_SSLContext = util.ssl_.SSLContext


# Module-level logger, used to warn about unusable certificate SAN data.
log = logging.getLogger(__name__)
137
138
def inject_into_urllib3() -> None:
    """
    Monkey-patch urllib3 with PyOpenSSL-backed SSL-support.

    The dependency check runs first, so an ``ImportError`` leaves urllib3
    untouched. Afterwards both ``util`` and ``util.ssl_`` expose
    ``PyOpenSSLContext`` as ``SSLContext`` and report ``IS_PYOPENSSL = True``.
    """
    _validate_dependencies_met()

    # The same two attributes are patched on both namespaces.
    for namespace in (util, util.ssl_):
        namespace.SSLContext = PyOpenSSLContext  # type: ignore[assignment]
        namespace.IS_PYOPENSSL = True
148
149
def extract_from_urllib3() -> None:
    """
    Undo monkey-patching by :func:`inject_into_urllib3`.

    Restores the ``SSLContext`` captured in ``orig_util_SSLContext`` on both
    ``util`` and ``util.ssl_`` and resets their ``IS_PYOPENSSL`` flag.
    """
    # The same two attributes are restored on both namespaces.
    for namespace in (util, util.ssl_):
        namespace.SSLContext = orig_util_SSLContext
        namespace.IS_PYOPENSSL = False
157
158
def _validate_dependencies_met() -> None:
    """
    Check that the installed ``cryptography`` and ``pyOpenSSL`` packages
    provide the functionality this module relies on.

    :raises ImportError: if either package lacks the probed attribute.
    """
    # ``get_extension_for_class`` was added in `cryptography==1.1`; older
    # releases do not have it.
    from cryptography.x509.extensions import Extensions

    san_lookup = getattr(Extensions, "get_extension_for_class", None)
    if san_lookup is None:
        raise ImportError(
            "'cryptography' module missing required functionality. "
            "Try upgrading to v1.3.4 or newer."
        )

    # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. Only
    # those versions carry the ``_x509`` attribute on certificate objects.
    from OpenSSL.crypto import X509

    probe_certificate = X509()
    backing_handle = getattr(probe_certificate, "_x509", None)
    if backing_handle is None:
        raise ImportError(
            "'pyOpenSSL' module missing required functionality. "
            "Try upgrading to v0.14 or newer."
        )
183
184
185def _dnsname_to_stdlib(name: str) -> str | None:
186 """
187 Converts a dNSName SubjectAlternativeName field to the form used by the
188 standard library on the given Python version.
189
190 Cryptography produces a dNSName as a unicode string that was idna-decoded
191 from ASCII bytes. We need to idna-encode that string to get it back, and
192 then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib
193 uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8).
194
195 If the name cannot be idna-encoded then we return None signalling that
196 the name given should be skipped.
197 """
198
199 def idna_encode(name: str) -> bytes | None:
200 """
201 Borrowed wholesale from the Python Cryptography Project. It turns out
202 that we can't just safely call `idna.encode`: it can explode for
203 wildcard names. This avoids that problem.
204 """
205 import idna
206
207 try:
208 for prefix in ["*.", "."]:
209 if name.startswith(prefix):
210 name = name[len(prefix) :]
211 return prefix.encode("ascii") + idna.encode(name)
212 return idna.encode(name)
213 except idna.core.IDNAError:
214 return None
215
216 # Don't send IPv6 addresses through the IDNA encoder.
217 if ":" in name:
218 return name
219
220 encoded_name = idna_encode(name)
221 if encoded_name is None:
222 return None
223 return encoded_name.decode("utf-8")
224
225
def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]:
    """
    Collect the subject alternative names of a PyOpenSSL certificate.

    :param peer_cert: certificate to inspect; it is converted with
        ``to_cryptography()`` before the extension lookup.
    :returns: ``("DNS", name)`` entries followed by ``("IP Address", text)``
        entries. The list is empty when the certificate has no SAN extension
        or the extension could not be read.
    """
    crypto_cert = peer_cert.to_cryptography()

    # Let Cryptography locate the SAN extension (faster than looping over the
    # extensions in Python).
    try:
        san_extension = crypto_cert.extensions.get_extension_for_class(
            x509.SubjectAlternativeName
        )
        san = san_extension.value
    except x509.ExtensionNotFound:
        # The certificate simply carries no SAN extension.
        return []
    except (
        x509.DuplicateExtension,
        UnsupportedExtension,
        x509.UnsupportedGeneralNameType,
        UnicodeError,
    ) as exc:
        # The certificate is of questionable quality; behave as if it had no
        # SAN field at all, but tell the user about it.
        log.warning(
            "A problem was encountered with the certificate that prevented "
            "urllib3 from finding the SubjectAlternativeName field. This can "
            "affect certificate validation. The error was %s",
            exc,
        )
        return []

    # Only dNSName and iPAddress entries are reported. DNS names go through
    # the idna round-trip of _dnsname_to_stdlib to mirror what the standard
    # library does; names that cannot be encoded are dropped. IP addresses
    # are turned back into strings because match_hostname wants strings.
    collected: list[tuple[str, str]] = []
    for raw_dns_name in san.get_values_for_type(x509.DNSName):
        stdlib_name = _dnsname_to_stdlib(raw_dns_name)
        if stdlib_name is not None:
            collected.append(("DNS", stdlib_name))

    for address in san.get_values_for_type(x509.IPAddress):
        collected.append(("IP Address", str(address)))

    return collected
272
273
class WrappedSocket:
    """API-compatibility wrapper for Python OpenSSL's Connection-class.

    Pairs a pyOpenSSL ``Connection`` with the underlying socket and exposes
    the socket-like methods defined below. pyOpenSSL exceptions raised by the
    connection are translated into ``OSError``, ``socket.timeout`` or
    ``ssl.SSLError``.
    """

    def __init__(
        self,
        connection: OpenSSL.SSL.Connection,
        socket: socket_cls,
        suppress_ragged_eofs: bool = True,
    ) -> None:
        """
        :param connection: pyOpenSSL connection that performs the TLS I/O.
        :param socket: underlying socket; used for ``fileno``, timeouts and
            readiness waits.
        :param suppress_ragged_eofs: when true, an ``Unexpected EOF`` syscall
            error during a read is reported as end-of-stream instead of being
            raised.
        """
        self.connection = connection
        self.socket = socket
        self.suppress_ragged_eofs = suppress_ragged_eofs
        # Reference count consulted by close() and _decref_socketios(); the
        # connection is really closed only once it is no longer positive.
        # Presumably incremented by file objects created through ``makefile``
        # (borrowed from ``socket.socket`` below) -- verify against CPython.
        self._io_refs = 0
        self._closed = False

    def fileno(self) -> int:
        """Return the file descriptor of the underlying socket."""
        return self.socket.fileno()

    # Copy-pasted from Python 3.5 source code
    def _decref_socketios(self) -> None:
        """Drop one I/O reference and finish a pending ``close()``."""
        if self._io_refs > 0:
            self._io_refs -= 1
        if self._closed:
            self.close()

    def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes:
        """
        Receive data from the TLS connection.

        Arguments are forwarded unchanged to ``Connection.recv``.

        :returns: the received bytes, or ``b""`` on a suppressed ragged EOF
            or when the peer has shut the connection down.
        :raises OSError: for syscall errors that are not suppressed.
        :raises socket.timeout: if the socket does not become readable
            within its timeout.
        :raises ssl.SSLError: for any other pyOpenSSL error.
        """
        # Retry in a loop (instead of recursing) so that a long series of
        # WantReadError results cannot exhaust the interpreter stack.
        while True:
            try:
                data = self.connection.recv(*args, **kwargs)
            except OpenSSL.SSL.SysCallError as e:
                if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                    return b""
                raise OSError(e.args[0], str(e)) from e
            except OpenSSL.SSL.ZeroReturnError:
                if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                    return b""
                raise
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                    raise timeout("The read operation timed out") from e
                continue

            # TLS 1.3 post-handshake authentication
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"read error: {e!r}") from e
            else:
                return data  # type: ignore[no-any-return]

    def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int:
        """
        Receive data from the TLS connection into a caller-supplied buffer.

        Arguments are forwarded unchanged to ``Connection.recv_into``.

        :returns: the value returned by ``Connection.recv_into``, or ``0`` on
            a suppressed ragged EOF or when the peer has shut down.
        :raises OSError: for syscall errors that are not suppressed.
        :raises socket.timeout: if the socket does not become readable
            within its timeout.
        :raises ssl.SSLError: for any other pyOpenSSL error.
        """
        # Same retry-loop structure as recv(); see the comment there.
        while True:
            try:
                return self.connection.recv_into(*args, **kwargs)  # type: ignore[no-any-return]
            except OpenSSL.SSL.SysCallError as e:
                if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                    return 0
                raise OSError(e.args[0], str(e)) from e
            except OpenSSL.SSL.ZeroReturnError:
                if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                    return 0
                raise
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                    raise timeout("The read operation timed out") from e
                continue

            # TLS 1.3 post-handshake authentication
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"read error: {e!r}") from e

    def settimeout(self, timeout: float) -> None:
        """Set the timeout on the underlying socket."""
        return self.socket.settimeout(timeout)

    def _send_until_done(self, data: bytes) -> int:
        """
        Send ``data`` with a single successful ``Connection.send`` call,
        waiting for writability whenever pyOpenSSL reports ``WantWriteError``.

        :returns: the number of bytes reported as sent by ``Connection.send``.
        :raises socket.timeout: if the socket does not become writable within
            its timeout.
        :raises OSError: for syscall errors.
        """
        while True:
            try:
                return self.connection.send(data)  # type: ignore[no-any-return]
            except OpenSSL.SSL.WantWriteError as e:
                if not util.wait_for_write(self.socket, self.socket.gettimeout()):
                    # Carry a message, mirroring the read-side timeouts.
                    raise timeout("The write operation timed out") from e
                continue
            except OpenSSL.SSL.SysCallError as e:
                raise OSError(e.args[0], str(e)) from e

    def sendall(self, data: bytes) -> None:
        """Send all of ``data``, at most ``SSL_WRITE_BLOCKSIZE`` bytes per call."""
        total_sent = 0
        while total_sent < len(data):
            sent = self._send_until_done(
                data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
            )
            total_sent += sent

    def shutdown(self, how: int) -> None:
        """
        Shut down the TLS connection.

        ``how`` is accepted for socket API compatibility but is not used; the
        pyOpenSSL connection's ``shutdown()`` is called unconditionally.

        :raises ssl.SSLError: if pyOpenSSL reports an error.
        """
        try:
            self.connection.shutdown()
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"shutdown error: {e!r}") from e

    def close(self) -> None:
        """Mark the socket closed; really close it once no I/O refs remain."""
        self._closed = True
        if self._io_refs <= 0:
            self._real_close()

    def _real_close(self) -> None:
        """Close the pyOpenSSL connection, ignoring pyOpenSSL errors."""
        try:
            return self.connection.close()  # type: ignore[no-any-return]
        except OpenSSL.SSL.Error:
            # Best-effort close: errors while tearing down are deliberately
            # swallowed.
            return

    def getpeercert(
        self, binary_form: bool = False
    ) -> dict[str, list[typing.Any]] | None:
        """
        Return information about the peer certificate.

        :param binary_form: when true, return the ASN.1 dump of the
            certificate (the value of ``dump_certificate``) instead of a
            dictionary.
        :returns: the falsy value from ``get_peer_certificate()`` when there
            is no certificate; otherwise the ASN.1 dump or a dictionary with
            ``subject`` (common name only) and ``subjectAltName`` entries.
        """
        peer_certificate = self.connection.get_peer_certificate()

        if not peer_certificate:
            return peer_certificate  # type: ignore[no-any-return]

        if binary_form:
            return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, peer_certificate)  # type: ignore[no-any-return]

        return {
            "subject": ((("commonName", peer_certificate.get_subject().CN),),),  # type: ignore[dict-item]
            "subjectAltName": get_subj_alt_name(peer_certificate),
        }

    def version(self) -> str:
        """Return the protocol version name reported by the connection."""
        return self.connection.get_protocol_version_name()  # type: ignore[no-any-return]

    def selected_alpn_protocol(self) -> str | None:
        """Return the negotiated ALPN protocol as text, or ``None`` if none."""
        alpn_proto = self.connection.get_alpn_proto_negotiated()
        return alpn_proto.decode() if alpn_proto else None


# Reuse the standard socket ``makefile`` implementation for the wrapper.
WrappedSocket.makefile = socket_cls.makefile  # type: ignore[attr-defined]
411
412
class PyOpenSSLContext:
    """
    I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
    for translating the interface of the standard library ``SSLContext`` object
    to calls into PyOpenSSL.

    Only the attributes and methods defined below are provided.
    """

    def __init__(self, protocol: int) -> None:
        """
        :param protocol: a key of ``_openssl_versions``; it is translated to
            the matching pyOpenSSL method constant.
        :raises KeyError: if ``protocol`` has no entry in ``_openssl_versions``.
        """
        self.protocol = _openssl_versions[protocol]
        self._ctx = OpenSSL.SSL.Context(self.protocol)
        self._options = 0
        # Stored as a plain attribute; nothing in this class reads it.
        self.check_hostname = False
        self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
        self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED

    @property
    def options(self) -> int:
        """Option bits last assigned through this property."""
        return self._options

    @options.setter
    def options(self, value: int) -> None:
        self._options = value
        self._set_ctx_options()

    @property
    def verify_mode(self) -> int:
        """The context's verify mode, expressed as a stdlib ``ssl.CERT_*`` value."""
        return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]

    @verify_mode.setter
    def verify_mode(self, value: ssl.VerifyMode) -> None:
        self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)

    def set_default_verify_paths(self) -> None:
        """Delegate to the pyOpenSSL context's ``set_default_verify_paths``."""
        self._ctx.set_default_verify_paths()

    def set_ciphers(self, ciphers: bytes | str) -> None:
        """Set the cipher list; text input is UTF-8 encoded first."""
        if isinstance(ciphers, str):
            ciphers = ciphers.encode("utf-8")
        self._ctx.set_cipher_list(ciphers)

    def load_verify_locations(
        self,
        cafile: str | None = None,
        capath: str | None = None,
        cadata: bytes | None = None,
    ) -> None:
        """
        Load trusted CA certificates into the context.

        :param cafile: path of a CA file; UTF-8 encoded before use.
        :param capath: path of a CA directory; UTF-8 encoded before use.
        :param cadata: CA data, handed to pyOpenSSL wrapped in a ``BytesIO``.
        :raises ssl.SSLError: if pyOpenSSL reports an error.
        """
        if cafile is not None:
            cafile = cafile.encode("utf-8")  # type: ignore[assignment]
        if capath is not None:
            capath = capath.encode("utf-8")  # type: ignore[assignment]
        try:
            self._ctx.load_verify_locations(cafile, capath)
            if cadata is not None:
                self._ctx.load_verify_locations(BytesIO(cadata))
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e

    def load_cert_chain(
        self,
        certfile: str,
        keyfile: str | None = None,
        password: str | None = None,
    ) -> None:
        """
        Load a certificate chain and its private key.

        :param certfile: certificate chain file.
        :param keyfile: private key file; ``certfile`` is used when omitted.
        :param password: private key passphrase; non-bytes values are UTF-8
            encoded before being handed to the passphrase callback.
        :raises ssl.SSLError: if pyOpenSSL reports an error.
        """
        try:
            self._ctx.use_certificate_chain_file(certfile)
            if password is not None:
                if not isinstance(password, bytes):
                    password = password.encode("utf-8")  # type: ignore[assignment]
                self._ctx.set_passwd_cb(lambda *_: password)
            self._ctx.use_privatekey_file(keyfile or certfile)
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e

    def set_alpn_protocols(self, protocols: list[bytes | str]) -> None:
        """Set the ALPN protocols, converting each entry to ASCII bytes."""
        protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
        return self._ctx.set_alpn_protos(protocols)  # type: ignore[no-any-return]

    def wrap_socket(
        self,
        sock: socket_cls,
        server_side: bool = False,
        do_handshake_on_connect: bool = True,
        suppress_ragged_eofs: bool = True,
        server_hostname: bytes | str | None = None,
    ) -> WrappedSocket:
        """
        Perform a client-side TLS handshake on ``sock`` and wrap the result.

        ``server_side`` and ``do_handshake_on_connect`` are accepted for
        signature compatibility but are not used: the connection is always put
        into connect (client) state and the handshake always runs here.

        :param sock: connected socket to wrap.
        :param suppress_ragged_eofs: forwarded to :class:`WrappedSocket`.
        :param server_hostname: sent as SNI unless it is empty or an IP
            address.
        :raises socket.timeout: if the socket does not become readable during
            the handshake within its timeout.
        :raises ssl.SSLError: if the handshake fails.
        """
        cnx = OpenSSL.SSL.Connection(self._ctx, sock)

        # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3
        if server_hostname and not util.ssl_.is_ipaddress(server_hostname):
            if isinstance(server_hostname, str):
                server_hostname = server_hostname.encode("utf-8")
            cnx.set_tlsext_host_name(server_hostname)

        cnx.set_connect_state()

        while True:
            try:
                cnx.do_handshake()
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(sock, sock.gettimeout()):
                    raise timeout("select timed out") from e
                continue
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"bad handshake: {e!r}") from e
            break

        # Honour the caller's ragged-EOF preference instead of silently
        # falling back to WrappedSocket's default.
        return WrappedSocket(cnx, sock, suppress_ragged_eofs=suppress_ragged_eofs)

    def _set_ctx_options(self) -> None:
        """
        Push the combined option bits to the pyOpenSSL context: the explicit
        ``options`` plus the flags implied by the minimum and maximum versions.

        NOTE(review): pyOpenSSL documents ``Context.set_options`` as adding
        options without clearing earlier ones, so presumably a later, more
        permissive setting cannot re-enable a protocol -- verify.
        """
        self._ctx.set_options(
            self._options
            | _openssl_to_ssl_minimum_version[self._minimum_version]
            | _openssl_to_ssl_maximum_version[self._maximum_version]
        )

    @property
    def minimum_version(self) -> int:
        """Lowest TLS version to allow (an ``ssl.TLSVersion`` value)."""
        return self._minimum_version

    @minimum_version.setter
    def minimum_version(self, minimum_version: int) -> None:
        self._minimum_version = minimum_version
        self._set_ctx_options()

    @property
    def maximum_version(self) -> int:
        """Highest TLS version to allow (an ``ssl.TLSVersion`` value)."""
        return self._maximum_version

    @maximum_version.setter
    def maximum_version(self, maximum_version: int) -> None:
        self._maximum_version = maximum_version
        self._set_ctx_options()
545
546
547def _verify_callback(
548 cnx: OpenSSL.SSL.Connection,
549 x509: X509,
550 err_no: int,
551 err_depth: int,
552 return_code: int,
553) -> bool:
554 return err_no == 0