Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/backend.py: 30%
519 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 07:26 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 07:26 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5from __future__ import annotations
7import collections
8import contextlib
9import itertools
10import typing
12from cryptography import utils, x509
13from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
14from cryptography.hazmat.backends.openssl import aead
15from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
16from cryptography.hazmat.bindings._rust import openssl as rust_openssl
17from cryptography.hazmat.bindings.openssl import binding
18from cryptography.hazmat.primitives import hashes, serialization
19from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding
20from cryptography.hazmat.primitives.asymmetric import dh, ec
21from cryptography.hazmat.primitives.asymmetric import utils as asym_utils
22from cryptography.hazmat.primitives.asymmetric.padding import (
23 MGF1,
24 OAEP,
25 PSS,
26 PKCS1v15,
27)
28from cryptography.hazmat.primitives.asymmetric.types import (
29 PrivateKeyTypes,
30 PublicKeyTypes,
31)
32from cryptography.hazmat.primitives.ciphers import (
33 CipherAlgorithm,
34)
35from cryptography.hazmat.primitives.ciphers.algorithms import (
36 AES,
37 AES128,
38 AES256,
39 ARC4,
40 SM4,
41 Camellia,
42 ChaCha20,
43 TripleDES,
44 _BlowfishInternal,
45 _CAST5Internal,
46 _IDEAInternal,
47 _SEEDInternal,
48)
49from cryptography.hazmat.primitives.ciphers.modes import (
50 CBC,
51 CFB,
52 CFB8,
53 CTR,
54 ECB,
55 GCM,
56 OFB,
57 XTS,
58 Mode,
59)
60from cryptography.hazmat.primitives.serialization.pkcs12 import (
61 PBES,
62 PKCS12Certificate,
63 PKCS12KeyAndCertificates,
64 PKCS12PrivateKeyTypes,
65 _PKCS12CATypes,
66)
68_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
71# Not actually supported, just used as a marker for some serialization tests.
72class _RC2:
73 pass
76class Backend:
77 """
78 OpenSSL API binding interfaces.
79 """
81 name = "openssl"
83 # FIPS has opinions about acceptable algorithms and key sizes, but the
84 # disallowed algorithms are still present in OpenSSL. They just error if
85 # you try to use them. To avoid that we allowlist the algorithms in
86 # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
87 _fips_aead: typing.ClassVar[set[bytes]] = {
88 b"aes-128-ccm",
89 b"aes-192-ccm",
90 b"aes-256-ccm",
91 b"aes-128-gcm",
92 b"aes-192-gcm",
93 b"aes-256-gcm",
94 }
95 # TripleDES encryption is disallowed/deprecated throughout 2023 in
96 # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA).
97 _fips_ciphers = (AES,)
98 # Sometimes SHA1 is still permissible. That logic is contained
99 # within the various *_supported methods.
100 _fips_hashes = (
101 hashes.SHA224,
102 hashes.SHA256,
103 hashes.SHA384,
104 hashes.SHA512,
105 hashes.SHA512_224,
106 hashes.SHA512_256,
107 hashes.SHA3_224,
108 hashes.SHA3_256,
109 hashes.SHA3_384,
110 hashes.SHA3_512,
111 hashes.SHAKE128,
112 hashes.SHAKE256,
113 )
114 _fips_ecdh_curves = (
115 ec.SECP224R1,
116 ec.SECP256R1,
117 ec.SECP384R1,
118 ec.SECP521R1,
119 )
120 _fips_rsa_min_key_size = 2048
121 _fips_rsa_min_public_exponent = 65537
122 _fips_dsa_min_modulus = 1 << 2048
123 _fips_dh_min_key_size = 2048
124 _fips_dh_min_modulus = 1 << _fips_dh_min_key_size
126 def __init__(self) -> None:
127 self._binding = binding.Binding()
128 self._ffi = self._binding.ffi
129 self._lib = self._binding.lib
130 self._fips_enabled = rust_openssl.is_fips_enabled()
132 self._cipher_registry: dict[
133 tuple[type[CipherAlgorithm], type[Mode]],
134 typing.Callable,
135 ] = {}
136 self._register_default_ciphers()
137 self._dh_types = [self._lib.EVP_PKEY_DH]
138 if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
139 self._dh_types.append(self._lib.EVP_PKEY_DHX)
141 def __repr__(self) -> str:
142 return "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>".format(
143 self.openssl_version_text(),
144 self._fips_enabled,
145 self._binding._legacy_provider_loaded,
146 )
148 def openssl_assert(
149 self,
150 ok: bool,
151 errors: list[rust_openssl.OpenSSLError] | None = None,
152 ) -> None:
153 return binding._openssl_assert(self._lib, ok, errors=errors)
155 def _enable_fips(self) -> None:
156 # This function enables FIPS mode for OpenSSL 3.0.0 on installs that
157 # have the FIPS provider installed properly.
158 self._binding._enable_fips()
159 assert rust_openssl.is_fips_enabled()
160 self._fips_enabled = rust_openssl.is_fips_enabled()
162 def openssl_version_text(self) -> str:
163 """
164 Friendly string name of the loaded OpenSSL library. This is not
165 necessarily the same version as it was compiled against.
167 Example: OpenSSL 1.1.1d 10 Sep 2019
168 """
169 return self._ffi.string(
170 self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
171 ).decode("ascii")
173 def openssl_version_number(self) -> int:
174 return self._lib.OpenSSL_version_num()
176 def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm):
177 if algorithm.name in ("blake2b", "blake2s"):
178 alg = f"{algorithm.name}{algorithm.digest_size * 8}".encode(
179 "ascii"
180 )
181 else:
182 alg = algorithm.name.encode("ascii")
184 evp_md = self._lib.EVP_get_digestbyname(alg)
185 return evp_md
187 def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm):
188 evp_md = self._evp_md_from_algorithm(algorithm)
189 self.openssl_assert(evp_md != self._ffi.NULL)
190 return evp_md
192 def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
193 if self._fips_enabled and not isinstance(algorithm, self._fips_hashes):
194 return False
196 evp_md = self._evp_md_from_algorithm(algorithm)
197 return evp_md != self._ffi.NULL
199 def signature_hash_supported(
200 self, algorithm: hashes.HashAlgorithm
201 ) -> bool:
202 # Dedicated check for hashing algorithm use in message digest for
203 # signatures, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption).
204 if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
205 return False
206 return self.hash_supported(algorithm)
208 def scrypt_supported(self) -> bool:
209 if self._fips_enabled:
210 return False
211 else:
212 return self._lib.Cryptography_HAS_SCRYPT == 1
214 def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
215 # FIPS mode still allows SHA1 for HMAC
216 if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
217 return True
219 return self.hash_supported(algorithm)
221 def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
222 if self._fips_enabled:
223 # FIPS mode requires AES. TripleDES is disallowed/deprecated in
224 # FIPS 140-3.
225 if not isinstance(cipher, self._fips_ciphers):
226 return False
228 try:
229 adapter = self._cipher_registry[type(cipher), type(mode)]
230 except KeyError:
231 return False
232 evp_cipher = adapter(self, cipher, mode)
233 return self._ffi.NULL != evp_cipher
235 def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
236 if (cipher_cls, mode_cls) in self._cipher_registry:
237 raise ValueError(
238 f"Duplicate registration for: {cipher_cls} {mode_cls}."
239 )
240 self._cipher_registry[cipher_cls, mode_cls] = adapter
242 def _register_default_ciphers(self) -> None:
243 for cipher_cls in [AES, AES128, AES256]:
244 for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
245 self.register_cipher_adapter(
246 cipher_cls,
247 mode_cls,
248 GetCipherByName(
249 "{cipher.name}-{cipher.key_size}-{mode.name}"
250 ),
251 )
252 for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
253 self.register_cipher_adapter(
254 Camellia,
255 mode_cls,
256 GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
257 )
258 for mode_cls in [CBC, CFB, CFB8, OFB]:
259 self.register_cipher_adapter(
260 TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
261 )
262 self.register_cipher_adapter(
263 TripleDES, ECB, GetCipherByName("des-ede3")
264 )
265 # ChaCha20 uses the Long Name "chacha20" in OpenSSL, but in LibreSSL
266 # it uses "chacha"
267 self.register_cipher_adapter(
268 ChaCha20,
269 type(None),
270 GetCipherByName(
271 "chacha" if self._lib.CRYPTOGRAPHY_IS_LIBRESSL else "chacha20"
272 ),
273 )
274 self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
275 for mode_cls in [ECB, CBC, OFB, CFB, CTR]:
276 self.register_cipher_adapter(
277 SM4, mode_cls, GetCipherByName("sm4-{mode.name}")
278 )
279 # Don't register legacy ciphers if they're unavailable. Hypothetically
280 # this wouldn't be necessary because we test availability by seeing if
281 # we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3
282 # will return a valid pointer even though the cipher is unavailable.
283 if (
284 self._binding._legacy_provider_loaded
285 or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
286 ):
287 for mode_cls in [CBC, CFB, OFB, ECB]:
288 self.register_cipher_adapter(
289 _BlowfishInternal,
290 mode_cls,
291 GetCipherByName("bf-{mode.name}"),
292 )
293 for mode_cls in [CBC, CFB, OFB, ECB]:
294 self.register_cipher_adapter(
295 _SEEDInternal,
296 mode_cls,
297 GetCipherByName("seed-{mode.name}"),
298 )
299 for cipher_cls, mode_cls in itertools.product(
300 [_CAST5Internal, _IDEAInternal],
301 [CBC, OFB, CFB, ECB],
302 ):
303 self.register_cipher_adapter(
304 cipher_cls,
305 mode_cls,
306 GetCipherByName("{cipher.name}-{mode.name}"),
307 )
308 self.register_cipher_adapter(
309 ARC4, type(None), GetCipherByName("rc4")
310 )
311 # We don't actually support RC2, this is just used by some tests.
312 self.register_cipher_adapter(
313 _RC2, type(None), GetCipherByName("rc2")
314 )
316 def create_symmetric_encryption_ctx(
317 self, cipher: CipherAlgorithm, mode: Mode
318 ) -> _CipherContext:
319 return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
321 def create_symmetric_decryption_ctx(
322 self, cipher: CipherAlgorithm, mode: Mode
323 ) -> _CipherContext:
324 return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
326 def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
327 return self.hmac_supported(algorithm)
329 def _consume_errors(self) -> list[rust_openssl.OpenSSLError]:
330 return rust_openssl.capture_error_stack()
332 def generate_rsa_parameters_supported(
333 self, public_exponent: int, key_size: int
334 ) -> bool:
335 return (
336 public_exponent >= 3
337 and public_exponent & 1 != 0
338 and key_size >= 512
339 )
341 def _create_evp_pkey_gc(self):
342 evp_pkey = self._lib.EVP_PKEY_new()
343 self.openssl_assert(evp_pkey != self._ffi.NULL)
344 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
345 return evp_pkey
347 def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
348 evp_pkey = self._create_evp_pkey_gc()
349 res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata)
350 self.openssl_assert(res == 1)
351 return evp_pkey
353 def _bytes_to_bio(self, data: bytes) -> _MemoryBIO:
354 """
355 Return a _MemoryBIO namedtuple of (BIO, char*).
357 The char* is the storage for the BIO and it must stay alive until the
358 BIO is finished with.
359 """
360 data_ptr = self._ffi.from_buffer(data)
361 bio = self._lib.BIO_new_mem_buf(data_ptr, len(data))
362 self.openssl_assert(bio != self._ffi.NULL)
364 return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr)
366 def _create_mem_bio_gc(self):
367 """
368 Creates an empty memory BIO.
369 """
370 bio_method = self._lib.BIO_s_mem()
371 self.openssl_assert(bio_method != self._ffi.NULL)
372 bio = self._lib.BIO_new(bio_method)
373 self.openssl_assert(bio != self._ffi.NULL)
374 bio = self._ffi.gc(bio, self._lib.BIO_free)
375 return bio
377 def _read_mem_bio(self, bio) -> bytes:
378 """
379 Reads a memory BIO. This only works on memory BIOs.
380 """
381 buf = self._ffi.new("char **")
382 buf_len = self._lib.BIO_get_mem_data(bio, buf)
383 self.openssl_assert(buf_len > 0)
384 self.openssl_assert(buf[0] != self._ffi.NULL)
385 bio_data = self._ffi.buffer(buf[0], buf_len)[:]
386 return bio_data
388 def _evp_pkey_to_private_key(
389 self, evp_pkey, unsafe_skip_rsa_key_validation: bool
390 ) -> PrivateKeyTypes:
391 """
392 Return the appropriate type of PrivateKey given an evp_pkey cdata
393 pointer.
394 """
395 return rust_openssl.keys.private_key_from_ptr(
396 int(self._ffi.cast("uintptr_t", evp_pkey)),
397 unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
398 )
400 def _evp_pkey_to_public_key(self, evp_pkey) -> PublicKeyTypes:
401 """
402 Return the appropriate type of PublicKey given an evp_pkey cdata
403 pointer.
404 """
405 return rust_openssl.keys.public_key_from_ptr(
406 int(self._ffi.cast("uintptr_t", evp_pkey))
407 )
409 def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
410 if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
411 return False
413 return isinstance(
414 algorithm,
415 (
416 hashes.SHA1,
417 hashes.SHA224,
418 hashes.SHA256,
419 hashes.SHA384,
420 hashes.SHA512,
421 ),
422 )
424 def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool:
425 if isinstance(padding, PKCS1v15):
426 return True
427 elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
428 # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked
429 # as signature algorithm.
430 if self._fips_enabled and isinstance(
431 padding._mgf._algorithm, hashes.SHA1
432 ):
433 return True
434 else:
435 return self.hash_supported(padding._mgf._algorithm)
436 elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
437 return self._oaep_hash_supported(
438 padding._mgf._algorithm
439 ) and self._oaep_hash_supported(padding._algorithm)
440 else:
441 return False
443 def rsa_encryption_supported(self, padding: AsymmetricPadding) -> bool:
444 if self._fips_enabled and isinstance(padding, PKCS1v15):
445 return False
446 else:
447 return self.rsa_padding_supported(padding)
449 def dsa_supported(self) -> bool:
450 return (
451 not self._lib.CRYPTOGRAPHY_IS_BORINGSSL and not self._fips_enabled
452 )
454 def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
455 if not self.dsa_supported():
456 return False
457 return self.signature_hash_supported(algorithm)
459 def cmac_algorithm_supported(self, algorithm) -> bool:
460 return self.cipher_supported(
461 algorithm, CBC(b"\x00" * algorithm.block_size)
462 )
464 def load_pem_private_key(
465 self,
466 data: bytes,
467 password: bytes | None,
468 unsafe_skip_rsa_key_validation: bool,
469 ) -> PrivateKeyTypes:
470 return self._load_key(
471 self._lib.PEM_read_bio_PrivateKey,
472 data,
473 password,
474 unsafe_skip_rsa_key_validation,
475 )
477 def load_pem_public_key(self, data: bytes) -> PublicKeyTypes:
478 mem_bio = self._bytes_to_bio(data)
479 # In OpenSSL 3.0.x the PEM_read_bio_PUBKEY function will invoke
480 # the default password callback if you pass an encrypted private
481 # key. This is very, very, very bad as the default callback can
482 # trigger an interactive console prompt, which will hang the
483 # Python process. We therefore provide our own callback to
484 # catch this and error out properly.
485 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
486 evp_pkey = self._lib.PEM_read_bio_PUBKEY(
487 mem_bio.bio,
488 self._ffi.NULL,
489 self._ffi.addressof(
490 self._lib._original_lib, "Cryptography_pem_password_cb"
491 ),
492 userdata,
493 )
494 if evp_pkey != self._ffi.NULL:
495 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
496 return self._evp_pkey_to_public_key(evp_pkey)
497 else:
498 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
499 # need to check to see if it is a pure PKCS1 RSA public key (not
500 # embedded in a subjectPublicKeyInfo)
501 self._consume_errors()
502 res = self._lib.BIO_reset(mem_bio.bio)
503 self.openssl_assert(res == 1)
504 rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey(
505 mem_bio.bio,
506 self._ffi.NULL,
507 self._ffi.addressof(
508 self._lib._original_lib, "Cryptography_pem_password_cb"
509 ),
510 userdata,
511 )
512 if rsa_cdata != self._ffi.NULL:
513 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
514 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
515 return self._evp_pkey_to_public_key(evp_pkey)
516 else:
517 self._handle_key_loading_error()
519 def load_der_private_key(
520 self,
521 data: bytes,
522 password: bytes | None,
523 unsafe_skip_rsa_key_validation: bool,
524 ) -> PrivateKeyTypes:
525 # OpenSSL has a function called d2i_AutoPrivateKey that in theory
526 # handles this automatically, however it doesn't handle encrypted
527 # private keys. Instead we try to load the key two different ways.
528 # First we'll try to load it as a traditional key.
529 bio_data = self._bytes_to_bio(data)
530 key = self._evp_pkey_from_der_traditional_key(bio_data, password)
531 if key:
532 return self._evp_pkey_to_private_key(
533 key, unsafe_skip_rsa_key_validation
534 )
535 else:
536 # Finally we try to load it with the method that handles encrypted
537 # PKCS8 properly.
538 return self._load_key(
539 self._lib.d2i_PKCS8PrivateKey_bio,
540 data,
541 password,
542 unsafe_skip_rsa_key_validation,
543 )
545 def _evp_pkey_from_der_traditional_key(self, bio_data, password):
546 key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL)
547 if key != self._ffi.NULL:
548 key = self._ffi.gc(key, self._lib.EVP_PKEY_free)
549 if password is not None:
550 raise TypeError(
551 "Password was given but private key is not encrypted."
552 )
554 return key
555 else:
556 self._consume_errors()
557 return None
559 def load_der_public_key(self, data: bytes) -> PublicKeyTypes:
560 mem_bio = self._bytes_to_bio(data)
561 evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL)
562 if evp_pkey != self._ffi.NULL:
563 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
564 return self._evp_pkey_to_public_key(evp_pkey)
565 else:
566 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
567 # need to check to see if it is a pure PKCS1 RSA public key (not
568 # embedded in a subjectPublicKeyInfo)
569 self._consume_errors()
570 res = self._lib.BIO_reset(mem_bio.bio)
571 self.openssl_assert(res == 1)
572 rsa_cdata = self._lib.d2i_RSAPublicKey_bio(
573 mem_bio.bio, self._ffi.NULL
574 )
575 if rsa_cdata != self._ffi.NULL:
576 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
577 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
578 return self._evp_pkey_to_public_key(evp_pkey)
579 else:
580 self._handle_key_loading_error()
582 def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
583 data = cert.public_bytes(serialization.Encoding.DER)
584 mem_bio = self._bytes_to_bio(data)
585 x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
586 self.openssl_assert(x509 != self._ffi.NULL)
587 x509 = self._ffi.gc(x509, self._lib.X509_free)
588 return x509
590 def _ossl2cert(self, x509_ptr: typing.Any) -> x509.Certificate:
591 bio = self._create_mem_bio_gc()
592 res = self._lib.i2d_X509_bio(bio, x509_ptr)
593 self.openssl_assert(res == 1)
594 return x509.load_der_x509_certificate(self._read_mem_bio(bio))
596 def _key2ossl(self, key: PKCS12PrivateKeyTypes) -> typing.Any:
597 data = key.private_bytes(
598 serialization.Encoding.DER,
599 serialization.PrivateFormat.PKCS8,
600 serialization.NoEncryption(),
601 )
602 mem_bio = self._bytes_to_bio(data)
604 evp_pkey = self._lib.d2i_PrivateKey_bio(
605 mem_bio.bio,
606 self._ffi.NULL,
607 )
608 self.openssl_assert(evp_pkey != self._ffi.NULL)
609 return self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
611 def _load_key(
612 self, openssl_read_func, data, password, unsafe_skip_rsa_key_validation
613 ) -> PrivateKeyTypes:
614 mem_bio = self._bytes_to_bio(data)
616 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
617 if password is not None:
618 utils._check_byteslike("password", password)
619 password_ptr = self._ffi.from_buffer(password)
620 userdata.password = password_ptr
621 userdata.length = len(password)
623 evp_pkey = openssl_read_func(
624 mem_bio.bio,
625 self._ffi.NULL,
626 self._ffi.addressof(
627 self._lib._original_lib, "Cryptography_pem_password_cb"
628 ),
629 userdata,
630 )
632 if evp_pkey == self._ffi.NULL:
633 if userdata.error != 0:
634 self._consume_errors()
635 if userdata.error == -1:
636 raise TypeError(
637 "Password was not given but private key is encrypted"
638 )
639 else:
640 assert userdata.error == -2
641 raise ValueError(
642 "Passwords longer than {} bytes are not supported "
643 "by this backend.".format(userdata.maxsize - 1)
644 )
645 else:
646 self._handle_key_loading_error()
648 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
650 if password is not None and userdata.called == 0:
651 raise TypeError(
652 "Password was given but private key is not encrypted."
653 )
655 assert (
656 password is not None and userdata.called == 1
657 ) or password is None
659 return self._evp_pkey_to_private_key(
660 evp_pkey, unsafe_skip_rsa_key_validation
661 )
663 def _handle_key_loading_error(self) -> typing.NoReturn:
664 errors = self._consume_errors()
666 if not errors:
667 raise ValueError(
668 "Could not deserialize key data. The data may be in an "
669 "incorrect format or it may be encrypted with an unsupported "
670 "algorithm."
671 )
673 elif (
674 errors[0]._lib_reason_match(
675 self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
676 )
677 or errors[0]._lib_reason_match(
678 self._lib.ERR_LIB_PKCS12,
679 self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
680 )
681 or (
682 self._lib.Cryptography_HAS_PROVIDERS
683 and errors[0]._lib_reason_match(
684 self._lib.ERR_LIB_PROV,
685 self._lib.PROV_R_BAD_DECRYPT,
686 )
687 )
688 ):
689 raise ValueError("Bad decrypt. Incorrect password?")
691 elif any(
692 error._lib_reason_match(
693 self._lib.ERR_LIB_EVP,
694 self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
695 )
696 for error in errors
697 ):
698 raise ValueError("Unsupported public key algorithm.")
700 else:
701 raise ValueError(
702 "Could not deserialize key data. The data may be in an "
703 "incorrect format, it may be encrypted with an unsupported "
704 "algorithm, or it may be an unsupported key type (e.g. EC "
705 "curves with explicit parameters).",
706 errors,
707 )
709 def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
710 if self._fips_enabled and not isinstance(
711 curve, self._fips_ecdh_curves
712 ):
713 return False
715 return rust_openssl.ec.curve_supported(curve)
717 def elliptic_curve_signature_algorithm_supported(
718 self,
719 signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
720 curve: ec.EllipticCurve,
721 ) -> bool:
722 # We only support ECDSA right now.
723 if not isinstance(signature_algorithm, ec.ECDSA):
724 return False
726 return self.elliptic_curve_supported(curve) and (
727 isinstance(signature_algorithm.algorithm, asym_utils.Prehashed)
728 or self.hash_supported(signature_algorithm.algorithm)
729 )
731 def elliptic_curve_exchange_algorithm_supported(
732 self, algorithm: ec.ECDH, curve: ec.EllipticCurve
733 ) -> bool:
734 return self.elliptic_curve_supported(curve) and isinstance(
735 algorithm, ec.ECDH
736 )
738 def dh_supported(self) -> bool:
739 return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
741 def dh_parameters_supported(
742 self, p: int, g: int, q: int | None = None
743 ) -> bool:
744 try:
745 rust_openssl.dh.from_parameter_numbers(
746 dh.DHParameterNumbers(p=p, g=g, q=q)
747 )
748 except ValueError:
749 return False
750 else:
751 return True
753 def dh_x942_serialization_supported(self) -> bool:
754 return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1
756 def x25519_supported(self) -> bool:
757 if self._fips_enabled:
758 return False
759 return True
761 def x448_supported(self) -> bool:
762 if self._fips_enabled:
763 return False
764 return (
765 not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
766 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
767 )
769 def ed25519_supported(self) -> bool:
770 if self._fips_enabled:
771 return False
772 return True
774 def ed448_supported(self) -> bool:
775 if self._fips_enabled:
776 return False
777 return (
778 not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
779 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
780 )
782 def aead_cipher_supported(self, cipher) -> bool:
783 return aead._aead_cipher_supported(self, cipher)
785 def _zero_data(self, data, length: int) -> None:
786 # We clear things this way because at the moment we're not
787 # sure of a better way that can guarantee it overwrites the
788 # memory of a bytearray and doesn't just replace the underlying char *.
789 for i in range(length):
790 data[i] = 0
792 @contextlib.contextmanager
793 def _zeroed_null_terminated_buf(self, data):
794 """
795 This method takes bytes, which can be a bytestring or a mutable
796 buffer like a bytearray, and yields a null-terminated version of that
797 data. This is required because PKCS12_parse doesn't take a length with
798 its password char * and ffi.from_buffer doesn't provide null
799 termination. So, to support zeroing the data via bytearray we
800 need to build this ridiculous construct that copies the memory, but
801 zeroes it after use.
802 """
803 if data is None:
804 yield self._ffi.NULL
805 else:
806 data_len = len(data)
807 buf = self._ffi.new("char[]", data_len + 1)
808 self._ffi.memmove(buf, data, data_len)
809 try:
810 yield buf
811 finally:
812 # Cast to a uint8_t * so we can assign by integer
813 self._zero_data(self._ffi.cast("uint8_t *", buf), data_len)
815 def load_key_and_certificates_from_pkcs12(
816 self, data: bytes, password: bytes | None
817 ) -> tuple[
818 PrivateKeyTypes | None,
819 x509.Certificate | None,
820 list[x509.Certificate],
821 ]:
822 pkcs12 = self.load_pkcs12(data, password)
823 return (
824 pkcs12.key,
825 pkcs12.cert.certificate if pkcs12.cert else None,
826 [cert.certificate for cert in pkcs12.additional_certs],
827 )
829 def load_pkcs12(
830 self, data: bytes, password: bytes | None
831 ) -> PKCS12KeyAndCertificates:
832 if password is not None:
833 utils._check_byteslike("password", password)
835 bio = self._bytes_to_bio(data)
836 p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
837 if p12 == self._ffi.NULL:
838 self._consume_errors()
839 raise ValueError("Could not deserialize PKCS12 data")
841 p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
842 evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
843 x509_ptr = self._ffi.new("X509 **")
844 sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
845 with self._zeroed_null_terminated_buf(password) as password_buf:
846 res = self._lib.PKCS12_parse(
847 p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
848 )
849 if res == 0:
850 self._consume_errors()
851 raise ValueError("Invalid password or PKCS12 data")
853 cert = None
854 key = None
855 additional_certificates = []
857 if evp_pkey_ptr[0] != self._ffi.NULL:
858 evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)
859 # We don't support turning off RSA key validation when loading
860 # PKCS12 keys
861 key = self._evp_pkey_to_private_key(
862 evp_pkey, unsafe_skip_rsa_key_validation=False
863 )
865 if x509_ptr[0] != self._ffi.NULL:
866 x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
867 cert_obj = self._ossl2cert(x509)
868 name = None
869 maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL)
870 if maybe_name != self._ffi.NULL:
871 name = self._ffi.string(maybe_name)
872 cert = PKCS12Certificate(cert_obj, name)
874 if sk_x509_ptr[0] != self._ffi.NULL:
875 sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
876 num = self._lib.sk_X509_num(sk_x509_ptr[0])
878 # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
879 # certificates.
880 indices: typing.Iterable[int]
881 if (
882 self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
883 or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
884 ):
885 indices = range(num)
886 else:
887 indices = reversed(range(num))
889 for i in indices:
890 x509 = self._lib.sk_X509_value(sk_x509, i)
891 self.openssl_assert(x509 != self._ffi.NULL)
892 x509 = self._ffi.gc(x509, self._lib.X509_free)
893 addl_cert = self._ossl2cert(x509)
894 addl_name = None
895 maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL)
896 if maybe_name != self._ffi.NULL:
897 addl_name = self._ffi.string(maybe_name)
898 additional_certificates.append(
899 PKCS12Certificate(addl_cert, addl_name)
900 )
902 return PKCS12KeyAndCertificates(key, cert, additional_certificates)
904 def serialize_key_and_certificates_to_pkcs12(
905 self,
906 name: bytes | None,
907 key: PKCS12PrivateKeyTypes | None,
908 cert: x509.Certificate | None,
909 cas: list[_PKCS12CATypes] | None,
910 encryption_algorithm: serialization.KeySerializationEncryption,
911 ) -> bytes:
912 password = None
913 if name is not None:
914 utils._check_bytes("name", name)
916 if isinstance(encryption_algorithm, serialization.NoEncryption):
917 nid_cert = -1
918 nid_key = -1
919 pkcs12_iter = 0
920 mac_iter = 0
921 mac_alg = self._ffi.NULL
922 elif isinstance(
923 encryption_algorithm, serialization.BestAvailableEncryption
924 ):
925 # PKCS12 encryption is hopeless trash and can never be fixed.
926 # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so
927 # we use PBESv1 with 3DES on the older paths.
928 if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
929 nid_cert = self._lib.NID_aes_256_cbc
930 nid_key = self._lib.NID_aes_256_cbc
931 else:
932 nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
933 nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
934 # At least we can set this higher than OpenSSL's default
935 pkcs12_iter = 20000
936 # mac_iter chosen for compatibility reasons, see:
937 # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
938 # Did we mention how lousy PKCS12 encryption is?
939 mac_iter = 1
940 # MAC algorithm can only be set on OpenSSL 3.0.0+
941 mac_alg = self._ffi.NULL
942 password = encryption_algorithm.password
943 elif (
944 isinstance(
945 encryption_algorithm, serialization._KeySerializationEncryption
946 )
947 and encryption_algorithm._format
948 is serialization.PrivateFormat.PKCS12
949 ):
950 # Default to OpenSSL's defaults. Behavior will vary based on the
951 # version of OpenSSL cryptography is compiled against.
952 nid_cert = 0
953 nid_key = 0
954 # Use the default iters we use in best available
955 pkcs12_iter = 20000
956 # See the Best Available comment for why this is 1
957 mac_iter = 1
958 password = encryption_algorithm.password
959 keycertalg = encryption_algorithm._key_cert_algorithm
960 if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC:
961 nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
962 nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
963 elif keycertalg is PBES.PBESv2SHA256AndAES256CBC:
964 if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
965 raise UnsupportedAlgorithm(
966 "PBESv2 is not supported by this version of OpenSSL"
967 )
968 nid_cert = self._lib.NID_aes_256_cbc
969 nid_key = self._lib.NID_aes_256_cbc
970 else:
971 assert keycertalg is None
972 # We use OpenSSL's defaults
974 if encryption_algorithm._hmac_hash is not None:
975 if not self._lib.Cryptography_HAS_PKCS12_SET_MAC:
976 raise UnsupportedAlgorithm(
977 "Setting MAC algorithm is not supported by this "
978 "version of OpenSSL."
979 )
980 mac_alg = self._evp_md_non_null_from_algorithm(
981 encryption_algorithm._hmac_hash
982 )
983 self.openssl_assert(mac_alg != self._ffi.NULL)
984 else:
985 mac_alg = self._ffi.NULL
987 if encryption_algorithm._kdf_rounds is not None:
988 pkcs12_iter = encryption_algorithm._kdf_rounds
990 else:
991 raise ValueError("Unsupported key encryption type")
993 if cas is None or len(cas) == 0:
994 sk_x509 = self._ffi.NULL
995 else:
996 sk_x509 = self._lib.sk_X509_new_null()
997 sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)
999 # This list is to keep the x509 values alive until end of function
1000 ossl_cas = []
1001 for ca in cas:
1002 if isinstance(ca, PKCS12Certificate):
1003 ca_alias = ca.friendly_name
1004 ossl_ca = self._cert2ossl(ca.certificate)
1005 if ca_alias is None:
1006 res = self._lib.X509_alias_set1(
1007 ossl_ca, self._ffi.NULL, -1
1008 )
1009 else:
1010 res = self._lib.X509_alias_set1(
1011 ossl_ca, ca_alias, len(ca_alias)
1012 )
1013 self.openssl_assert(res == 1)
1014 else:
1015 ossl_ca = self._cert2ossl(ca)
1016 ossl_cas.append(ossl_ca)
1017 res = self._lib.sk_X509_push(sk_x509, ossl_ca)
1018 backend.openssl_assert(res >= 1)
1020 with self._zeroed_null_terminated_buf(password) as password_buf:
1021 with self._zeroed_null_terminated_buf(name) as name_buf:
1022 ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
1023 ossl_pkey = (
1024 self._key2ossl(key) if key is not None else self._ffi.NULL
1025 )
1027 p12 = self._lib.PKCS12_create(
1028 password_buf,
1029 name_buf,
1030 ossl_pkey,
1031 ossl_cert,
1032 sk_x509,
1033 nid_key,
1034 nid_cert,
1035 pkcs12_iter,
1036 mac_iter,
1037 0,
1038 )
1040 if (
1041 self._lib.Cryptography_HAS_PKCS12_SET_MAC
1042 and mac_alg != self._ffi.NULL
1043 ):
1044 self._lib.PKCS12_set_mac(
1045 p12,
1046 password_buf,
1047 -1,
1048 self._ffi.NULL,
1049 0,
1050 mac_iter,
1051 mac_alg,
1052 )
1054 self.openssl_assert(p12 != self._ffi.NULL)
1055 p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
1057 bio = self._create_mem_bio_gc()
1058 res = self._lib.i2d_PKCS12_bio(bio, p12)
1059 self.openssl_assert(res > 0)
1060 return self._read_mem_bio(bio)
1062 def poly1305_supported(self) -> bool:
1063 if self._fips_enabled:
1064 return False
1065 elif (
1066 self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1067 or self._lib.CRYPTOGRAPHY_IS_LIBRESSL
1068 ):
1069 return True
1070 else:
1071 return self._lib.Cryptography_HAS_POLY1305 == 1
1073 def pkcs7_supported(self) -> bool:
1074 return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1076 def load_pem_pkcs7_certificates(
1077 self, data: bytes
1078 ) -> list[x509.Certificate]:
1079 utils._check_bytes("data", data)
1080 bio = self._bytes_to_bio(data)
1081 p7 = self._lib.PEM_read_bio_PKCS7(
1082 bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1083 )
1084 if p7 == self._ffi.NULL:
1085 self._consume_errors()
1086 raise ValueError("Unable to parse PKCS7 data")
1088 p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
1089 return self._load_pkcs7_certificates(p7)
1091 def load_der_pkcs7_certificates(
1092 self, data: bytes
1093 ) -> list[x509.Certificate]:
1094 utils._check_bytes("data", data)
1095 bio = self._bytes_to_bio(data)
1096 p7 = self._lib.d2i_PKCS7_bio(bio.bio, self._ffi.NULL)
1097 if p7 == self._ffi.NULL:
1098 self._consume_errors()
1099 raise ValueError("Unable to parse PKCS7 data")
1101 p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
1102 return self._load_pkcs7_certificates(p7)
1104 def _load_pkcs7_certificates(self, p7) -> list[x509.Certificate]:
1105 nid = self._lib.OBJ_obj2nid(p7.type)
1106 self.openssl_assert(nid != self._lib.NID_undef)
1107 if nid != self._lib.NID_pkcs7_signed:
1108 raise UnsupportedAlgorithm(
1109 "Only basic signed structures are currently supported. NID"
1110 f" for this data was {nid}",
1111 _Reasons.UNSUPPORTED_SERIALIZATION,
1112 )
1114 if p7.d.sign == self._ffi.NULL:
1115 raise ValueError(
1116 "The provided PKCS7 has no certificate data, but a cert "
1117 "loading method was called."
1118 )
1120 sk_x509 = p7.d.sign.cert
1121 num = self._lib.sk_X509_num(sk_x509)
1122 certs: list[x509.Certificate] = []
1123 for i in range(num):
1124 x509 = self._lib.sk_X509_value(sk_x509, i)
1125 self.openssl_assert(x509 != self._ffi.NULL)
1126 cert = self._ossl2cert(x509)
1127 certs.append(cert)
1129 return certs
1132class GetCipherByName:
1133 def __init__(self, fmt: str):
1134 self._fmt = fmt
1136 def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
1137 cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
1138 evp_cipher = backend._lib.EVP_get_cipherbyname(
1139 cipher_name.encode("ascii")
1140 )
1142 # try EVP_CIPHER_fetch if present
1143 if (
1144 evp_cipher == backend._ffi.NULL
1145 and backend._lib.Cryptography_HAS_300_EVP_CIPHER
1146 ):
1147 evp_cipher = backend._lib.EVP_CIPHER_fetch(
1148 backend._ffi.NULL,
1149 cipher_name.encode("ascii"),
1150 backend._ffi.NULL,
1151 )
1153 backend._consume_errors()
1154 return evp_cipher
1157def _get_xts_cipher(backend: Backend, cipher: AES, mode):
1158 cipher_name = f"aes-{cipher.key_size // 2}-xts"
1159 return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
1162backend = Backend()