1"""
2Module for using pyOpenSSL as a TLS backend. This module was relevant before
3the standard library ``ssl`` module supported SNI, but now that we've dropped
4support for Python 2.7 all relevant Python versions support SNI so
5**this module is no longer recommended**.
6
7This needs the following packages installed:
8
9* `pyOpenSSL`_ (tested with 16.0.0)
10* `cryptography`_ (minimum 1.3.4, from pyopenssl)
11* `idna`_ (minimum 2.0)
12
13However, pyOpenSSL depends on cryptography, so while we use all three directly here we
14end up having relatively few packages required.
15
16You can install them with the following command:
17
18.. code-block:: bash
19
20 $ python -m pip install pyopenssl cryptography idna
21
22To activate certificate checking, call
23:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
24before you begin making HTTP requests. This can be done in a ``sitecustomize``
25module, or at any other time before your application begins using ``urllib3``,
26like this:
27
28.. code-block:: python
29
30 try:
31 import urllib3.contrib.pyopenssl
32 urllib3.contrib.pyopenssl.inject_into_urllib3()
33 except ImportError:
34 pass
35
36.. _pyopenssl: https://www.pyopenssl.org
37.. _cryptography: https://cryptography.io
38.. _idna: https://github.com/kjd/idna
39"""
40
41from __future__ import annotations
42
43import OpenSSL.SSL # type: ignore[import-untyped]
44from cryptography import x509
45
46try:
47 from cryptography.x509 import UnsupportedExtension # type: ignore[attr-defined]
48except ImportError:
49 # UnsupportedExtension is gone in cryptography >= 2.1.0
50 class UnsupportedExtension(Exception): # type: ignore[no-redef]
51 pass
52
53
54import logging
55import ssl
56import typing
57from io import BytesIO
58from socket import socket as socket_cls
59from socket import timeout
60
61from .. import util
62
63if typing.TYPE_CHECKING:
64 from OpenSSL.crypto import X509 # type: ignore[import-untyped]
65
66
67__all__ = ["inject_into_urllib3", "extract_from_urllib3"]
68
69# Map from urllib3 to PyOpenSSL compatible parameter-values.
70_openssl_versions: dict[int, int] = {
71 util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined]
72 util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined]
73 ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
74}
75
76if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"):
77 _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD
78
79if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"):
80 _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD
81
82
83_stdlib_to_openssl_verify = {
84 ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
85 ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
86 ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER
87 + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
88}
89_openssl_to_stdlib_verify = {v: k for k, v in _stdlib_to_openssl_verify.items()}
90
91# The SSLvX values are the most likely to be missing in the future
92# but we check them all just to be sure.
93_OP_NO_SSLv2_OR_SSLv3: int = getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr(
94 OpenSSL.SSL, "OP_NO_SSLv3", 0
95)
96_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0)
97_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0)
98_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0)
99_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0)
100
101_openssl_to_ssl_minimum_version: dict[int, int] = {
102 ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
103 ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3,
104 ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1,
105 ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1,
106 ssl.TLSVersion.TLSv1_3: (
107 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
108 ),
109 ssl.TLSVersion.MAXIMUM_SUPPORTED: (
110 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
111 ),
112}
113_openssl_to_ssl_maximum_version: dict[int, int] = {
114 ssl.TLSVersion.MINIMUM_SUPPORTED: (
115 _OP_NO_SSLv2_OR_SSLv3
116 | _OP_NO_TLSv1
117 | _OP_NO_TLSv1_1
118 | _OP_NO_TLSv1_2
119 | _OP_NO_TLSv1_3
120 ),
121 ssl.TLSVersion.TLSv1: (
122 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3
123 ),
124 ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3,
125 ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3,
126 ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3,
127 ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
128}
129
130# OpenSSL will only write 16K at a time
131SSL_WRITE_BLOCKSIZE = 16384
132
133orig_util_SSLContext = util.ssl_.SSLContext
134
135
136log = logging.getLogger(__name__)
137
138
139def inject_into_urllib3() -> None:
140 "Monkey-patch urllib3 with PyOpenSSL-backed SSL-support."
141
142 _validate_dependencies_met()
143
144 util.SSLContext = PyOpenSSLContext # type: ignore[assignment]
145 util.ssl_.SSLContext = PyOpenSSLContext # type: ignore[assignment]
146 util.IS_PYOPENSSL = True
147 util.ssl_.IS_PYOPENSSL = True
148
149
150def extract_from_urllib3() -> None:
151 "Undo monkey-patching by :func:`inject_into_urllib3`."
152
153 util.SSLContext = orig_util_SSLContext
154 util.ssl_.SSLContext = orig_util_SSLContext
155 util.IS_PYOPENSSL = False
156 util.ssl_.IS_PYOPENSSL = False
157
158
159def _validate_dependencies_met() -> None:
160 """
161 Verifies that PyOpenSSL's package-level dependencies have been met.
162 Throws `ImportError` if they are not met.
163 """
164 # Method added in `cryptography==1.1`; not available in older versions
165 from cryptography.x509.extensions import Extensions
166
167 if getattr(Extensions, "get_extension_for_class", None) is None:
168 raise ImportError(
169 "'cryptography' module missing required functionality. "
170 "Try upgrading to v1.3.4 or newer."
171 )
172
173 # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
174 # attribute is only present on those versions.
175 from OpenSSL.crypto import X509
176
177 x509 = X509()
178 if getattr(x509, "_x509", None) is None:
179 raise ImportError(
180 "'pyOpenSSL' module missing required functionality. "
181 "Try upgrading to v0.14 or newer."
182 )
183
184
185def _dnsname_to_stdlib(name: str) -> str | None:
186 """
187 Converts a dNSName SubjectAlternativeName field to the form used by the
188 standard library on the given Python version.
189
190 Cryptography produces a dNSName as a unicode string that was idna-decoded
191 from ASCII bytes. We need to idna-encode that string to get it back, and
192 then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib
193 uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8).
194
195 If the name cannot be idna-encoded then we return None signalling that
196 the name given should be skipped.
197 """
198
199 def idna_encode(name: str) -> bytes | None:
200 """
201 Borrowed wholesale from the Python Cryptography Project. It turns out
202 that we can't just safely call `idna.encode`: it can explode for
203 wildcard names. This avoids that problem.
204 """
205 import idna
206
207 try:
208 for prefix in ["*.", "."]:
209 if name.startswith(prefix):
210 name = name[len(prefix) :]
211 return prefix.encode("ascii") + idna.encode(name)
212 return idna.encode(name)
213 except idna.core.IDNAError:
214 return None
215
216 # Don't send IPv6 addresses through the IDNA encoder.
217 if ":" in name:
218 return name
219
220 encoded_name = idna_encode(name)
221 if encoded_name is None:
222 return None
223 return encoded_name.decode("utf-8")
224
225
226def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]:
227 """
228 Given an PyOpenSSL certificate, provides all the subject alternative names.
229 """
230 cert = peer_cert.to_cryptography()
231
232 # We want to find the SAN extension. Ask Cryptography to locate it (it's
233 # faster than looping in Python)
234 try:
235 ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
236 except x509.ExtensionNotFound:
237 # No such extension, return the empty list.
238 return []
239 except (
240 x509.DuplicateExtension,
241 UnsupportedExtension,
242 x509.UnsupportedGeneralNameType,
243 UnicodeError,
244 ) as e:
245 # A problem has been found with the quality of the certificate. Assume
246 # no SAN field is present.
247 log.warning(
248 "A problem was encountered with the certificate that prevented "
249 "urllib3 from finding the SubjectAlternativeName field. This can "
250 "affect certificate validation. The error was %s",
251 e,
252 )
253 return []
254
255 # We want to return dNSName and iPAddress fields. We need to cast the IPs
256 # back to strings because the match_hostname function wants them as
257 # strings.
258 # Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8
259 # decoded. This is pretty frustrating, but that's what the standard library
260 # does with certificates, and so we need to attempt to do the same.
261 # We also want to skip over names which cannot be idna encoded.
262 names = [
263 ("DNS", name)
264 for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName))
265 if name is not None
266 ]
267 names.extend(
268 ("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress)
269 )
270
271 return names
272
273
274class WrappedSocket:
275 """API-compatibility wrapper for Python OpenSSL's Connection-class."""
276
277 def __init__(
278 self,
279 connection: OpenSSL.SSL.Connection,
280 socket: socket_cls,
281 suppress_ragged_eofs: bool = True,
282 ) -> None:
283 self.connection = connection
284 self.socket = socket
285 self.suppress_ragged_eofs = suppress_ragged_eofs
286 self._io_refs = 0
287 self._closed = False
288
289 def fileno(self) -> int:
290 return self.socket.fileno()
291
292 # Copy-pasted from Python 3.5 source code
293 def _decref_socketios(self) -> None:
294 if self._io_refs > 0:
295 self._io_refs -= 1
296 if self._closed:
297 self.close()
298
299 def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes:
300 try:
301 data = self.connection.recv(*args, **kwargs)
302 except OpenSSL.SSL.SysCallError as e:
303 if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
304 return b""
305 else:
306 raise OSError(e.args[0], str(e)) from e
307 except OpenSSL.SSL.ZeroReturnError:
308 if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
309 return b""
310 else:
311 raise
312 except OpenSSL.SSL.WantReadError as e:
313 if not util.wait_for_read(self.socket, self.socket.gettimeout()):
314 raise timeout("The read operation timed out") from e
315 else:
316 return self.recv(*args, **kwargs)
317
318 # TLS 1.3 post-handshake authentication
319 except OpenSSL.SSL.Error as e:
320 raise ssl.SSLError(f"read error: {e!r}") from e
321 else:
322 return data # type: ignore[no-any-return]
323
324 def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int:
325 try:
326 return self.connection.recv_into(*args, **kwargs) # type: ignore[no-any-return]
327 except OpenSSL.SSL.SysCallError as e:
328 if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
329 return 0
330 else:
331 raise OSError(e.args[0], str(e)) from e
332 except OpenSSL.SSL.ZeroReturnError:
333 if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
334 return 0
335 else:
336 raise
337 except OpenSSL.SSL.WantReadError as e:
338 if not util.wait_for_read(self.socket, self.socket.gettimeout()):
339 raise timeout("The read operation timed out") from e
340 else:
341 return self.recv_into(*args, **kwargs)
342
343 # TLS 1.3 post-handshake authentication
344 except OpenSSL.SSL.Error as e:
345 raise ssl.SSLError(f"read error: {e!r}") from e
346
347 def settimeout(self, timeout: float) -> None:
348 return self.socket.settimeout(timeout)
349
350 def _send_until_done(self, data: bytes) -> int:
351 while True:
352 try:
353 return self.connection.send(data) # type: ignore[no-any-return]
354 except OpenSSL.SSL.WantWriteError as e:
355 if not util.wait_for_write(self.socket, self.socket.gettimeout()):
356 raise timeout() from e
357 continue
358 except OpenSSL.SSL.SysCallError as e:
359 raise OSError(e.args[0], str(e)) from e
360
361 def sendall(self, data: bytes) -> None:
362 total_sent = 0
363 while total_sent < len(data):
364 sent = self._send_until_done(
365 data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
366 )
367 total_sent += sent
368
369 def shutdown(self, how: int) -> None:
370 try:
371 self.connection.shutdown()
372 except OpenSSL.SSL.Error as e:
373 raise ssl.SSLError(f"shutdown error: {e!r}") from e
374
375 def close(self) -> None:
376 self._closed = True
377 if self._io_refs <= 0:
378 self._real_close()
379
380 def _real_close(self) -> None:
381 try:
382 return self.connection.close() # type: ignore[no-any-return]
383 except OpenSSL.SSL.Error:
384 return
385
386 def getpeercert(
387 self, binary_form: bool = False
388 ) -> dict[str, list[typing.Any]] | None:
389 x509 = self.connection.get_peer_certificate()
390
391 if not x509:
392 return x509 # type: ignore[no-any-return]
393
394 if binary_form:
395 return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, x509) # type: ignore[no-any-return]
396
397 return {
398 "subject": ((("commonName", x509.get_subject().CN),),), # type: ignore[dict-item]
399 "subjectAltName": get_subj_alt_name(x509),
400 }
401
402 def version(self) -> str:
403 return self.connection.get_protocol_version_name() # type: ignore[no-any-return]
404
405 def selected_alpn_protocol(self) -> str | None:
406 alpn_proto = self.connection.get_alpn_proto_negotiated()
407 return alpn_proto.decode() if alpn_proto else None
408
409
410WrappedSocket.makefile = socket_cls.makefile # type: ignore[attr-defined]
411
412
413class PyOpenSSLContext:
414 """
415 I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
416 for translating the interface of the standard library ``SSLContext`` object
417 to calls into PyOpenSSL.
418 """
419
420 def __init__(self, protocol: int) -> None:
421 self.protocol = _openssl_versions[protocol]
422 self._ctx = OpenSSL.SSL.Context(self.protocol)
423 self._options = 0
424 self.check_hostname = False
425 self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
426 self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED
427 self._verify_flags: int = ssl.VERIFY_X509_TRUSTED_FIRST
428
429 @property
430 def options(self) -> int:
431 return self._options
432
433 @options.setter
434 def options(self, value: int) -> None:
435 self._options = value
436 self._set_ctx_options()
437
438 @property
439 def verify_flags(self) -> int:
440 return self._verify_flags
441
442 @verify_flags.setter
443 def verify_flags(self, value: int) -> None:
444 self._verify_flags = value
445 self._ctx.get_cert_store().set_flags(self._verify_flags)
446
447 @property
448 def verify_mode(self) -> int:
449 return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]
450
451 @verify_mode.setter
452 def verify_mode(self, value: ssl.VerifyMode) -> None:
453 self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)
454
455 def set_default_verify_paths(self) -> None:
456 self._ctx.set_default_verify_paths()
457
458 def set_ciphers(self, ciphers: bytes | str) -> None:
459 if isinstance(ciphers, str):
460 ciphers = ciphers.encode("utf-8")
461 self._ctx.set_cipher_list(ciphers)
462
463 def load_verify_locations(
464 self,
465 cafile: str | None = None,
466 capath: str | None = None,
467 cadata: bytes | None = None,
468 ) -> None:
469 if cafile is not None:
470 cafile = cafile.encode("utf-8") # type: ignore[assignment]
471 if capath is not None:
472 capath = capath.encode("utf-8") # type: ignore[assignment]
473 try:
474 self._ctx.load_verify_locations(cafile, capath)
475 if cadata is not None:
476 self._ctx.load_verify_locations(BytesIO(cadata))
477 except OpenSSL.SSL.Error as e:
478 raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e
479
480 def load_cert_chain(
481 self,
482 certfile: str,
483 keyfile: str | None = None,
484 password: str | None = None,
485 ) -> None:
486 try:
487 self._ctx.use_certificate_chain_file(certfile)
488 if password is not None:
489 if not isinstance(password, bytes):
490 password = password.encode("utf-8") # type: ignore[assignment]
491 self._ctx.set_passwd_cb(lambda *_: password)
492 self._ctx.use_privatekey_file(keyfile or certfile)
493 except OpenSSL.SSL.Error as e:
494 raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e
495
496 def set_alpn_protocols(self, protocols: list[bytes | str]) -> None:
497 protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
498 return self._ctx.set_alpn_protos(protocols) # type: ignore[no-any-return]
499
500 def wrap_socket(
501 self,
502 sock: socket_cls,
503 server_side: bool = False,
504 do_handshake_on_connect: bool = True,
505 suppress_ragged_eofs: bool = True,
506 server_hostname: bytes | str | None = None,
507 ) -> WrappedSocket:
508 cnx = OpenSSL.SSL.Connection(self._ctx, sock)
509
510 # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3
511 if server_hostname and not util.ssl_.is_ipaddress(server_hostname):
512 if isinstance(server_hostname, str):
513 server_hostname = server_hostname.encode("utf-8")
514 cnx.set_tlsext_host_name(server_hostname)
515
516 cnx.set_connect_state()
517
518 while True:
519 try:
520 cnx.do_handshake()
521 except OpenSSL.SSL.WantReadError as e:
522 if not util.wait_for_read(sock, sock.gettimeout()):
523 raise timeout("select timed out") from e
524 continue
525 except OpenSSL.SSL.Error as e:
526 raise ssl.SSLError(f"bad handshake: {e!r}") from e
527 break
528
529 return WrappedSocket(cnx, sock)
530
531 def _set_ctx_options(self) -> None:
532 self._ctx.set_options(
533 self._options
534 | _openssl_to_ssl_minimum_version[self._minimum_version]
535 | _openssl_to_ssl_maximum_version[self._maximum_version]
536 )
537
538 @property
539 def minimum_version(self) -> int:
540 return self._minimum_version
541
542 @minimum_version.setter
543 def minimum_version(self, minimum_version: int) -> None:
544 self._minimum_version = minimum_version
545 self._set_ctx_options()
546
547 @property
548 def maximum_version(self) -> int:
549 return self._maximum_version
550
551 @maximum_version.setter
552 def maximum_version(self, maximum_version: int) -> None:
553 self._maximum_version = maximum_version
554 self._set_ctx_options()
555
556
557def _verify_callback(
558 cnx: OpenSSL.SSL.Connection,
559 x509: X509,
560 err_no: int,
561 err_depth: int,
562 return_code: int,
563) -> bool:
564 return err_no == 0