Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/backend.py: 24%
917 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:50 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:50 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5from __future__ import annotations
7import collections
8import contextlib
9import itertools
10import typing
11from contextlib import contextmanager
13from cryptography import utils, x509
14from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
15from cryptography.hazmat.backends.openssl import aead
16from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
17from cryptography.hazmat.backends.openssl.cmac import _CMACContext
18from cryptography.hazmat.backends.openssl.ec import (
19 _EllipticCurvePrivateKey,
20 _EllipticCurvePublicKey,
21)
22from cryptography.hazmat.backends.openssl.rsa import (
23 _RSAPrivateKey,
24 _RSAPublicKey,
25)
26from cryptography.hazmat.bindings._rust import openssl as rust_openssl
27from cryptography.hazmat.bindings.openssl import binding
28from cryptography.hazmat.primitives import hashes, serialization
29from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding
30from cryptography.hazmat.primitives.asymmetric import (
31 dh,
32 dsa,
33 ec,
34 ed448,
35 ed25519,
36 rsa,
37 x448,
38 x25519,
39)
40from cryptography.hazmat.primitives.asymmetric.padding import (
41 MGF1,
42 OAEP,
43 PSS,
44 PKCS1v15,
45)
46from cryptography.hazmat.primitives.asymmetric.types import (
47 PrivateKeyTypes,
48 PublicKeyTypes,
49)
50from cryptography.hazmat.primitives.ciphers import (
51 BlockCipherAlgorithm,
52 CipherAlgorithm,
53)
54from cryptography.hazmat.primitives.ciphers.algorithms import (
55 AES,
56 AES128,
57 AES256,
58 ARC4,
59 SM4,
60 Camellia,
61 ChaCha20,
62 TripleDES,
63 _BlowfishInternal,
64 _CAST5Internal,
65 _IDEAInternal,
66 _SEEDInternal,
67)
68from cryptography.hazmat.primitives.ciphers.modes import (
69 CBC,
70 CFB,
71 CFB8,
72 CTR,
73 ECB,
74 GCM,
75 OFB,
76 XTS,
77 Mode,
78)
79from cryptography.hazmat.primitives.serialization import ssh
80from cryptography.hazmat.primitives.serialization.pkcs12 import (
81 PBES,
82 PKCS12Certificate,
83 PKCS12KeyAndCertificates,
84 PKCS12PrivateKeyTypes,
85 _PKCS12CATypes,
86)
88_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
91# Not actually supported, just used as a marker for some serialization tests.
92class _RC2:
93 pass
96class Backend:
97 """
98 OpenSSL API binding interfaces.
99 """
101 name = "openssl"
103 # FIPS has opinions about acceptable algorithms and key sizes, but the
104 # disallowed algorithms are still present in OpenSSL. They just error if
105 # you try to use them. To avoid that we allowlist the algorithms in
106 # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
107 _fips_aead = {
108 b"aes-128-ccm",
109 b"aes-192-ccm",
110 b"aes-256-ccm",
111 b"aes-128-gcm",
112 b"aes-192-gcm",
113 b"aes-256-gcm",
114 }
115 # TripleDES encryption is disallowed/deprecated throughout 2023 in
116 # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA).
117 _fips_ciphers = (AES,)
118 # Sometimes SHA1 is still permissible. That logic is contained
119 # within the various *_supported methods.
120 _fips_hashes = (
121 hashes.SHA224,
122 hashes.SHA256,
123 hashes.SHA384,
124 hashes.SHA512,
125 hashes.SHA512_224,
126 hashes.SHA512_256,
127 hashes.SHA3_224,
128 hashes.SHA3_256,
129 hashes.SHA3_384,
130 hashes.SHA3_512,
131 hashes.SHAKE128,
132 hashes.SHAKE256,
133 )
134 _fips_ecdh_curves = (
135 ec.SECP224R1,
136 ec.SECP256R1,
137 ec.SECP384R1,
138 ec.SECP521R1,
139 )
140 _fips_rsa_min_key_size = 2048
141 _fips_rsa_min_public_exponent = 65537
142 _fips_dsa_min_modulus = 1 << 2048
143 _fips_dh_min_key_size = 2048
144 _fips_dh_min_modulus = 1 << _fips_dh_min_key_size
146 def __init__(self) -> None:
147 self._binding = binding.Binding()
148 self._ffi = self._binding.ffi
149 self._lib = self._binding.lib
150 self._fips_enabled = rust_openssl.is_fips_enabled()
152 self._cipher_registry: typing.Dict[
153 typing.Tuple[typing.Type[CipherAlgorithm], typing.Type[Mode]],
154 typing.Callable,
155 ] = {}
156 self._register_default_ciphers()
157 self._dh_types = [self._lib.EVP_PKEY_DH]
158 if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
159 self._dh_types.append(self._lib.EVP_PKEY_DHX)
161 def __repr__(self) -> str:
162 return "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>".format(
163 self.openssl_version_text(),
164 self._fips_enabled,
165 self._binding._legacy_provider_loaded,
166 )
168 def openssl_assert(
169 self,
170 ok: bool,
171 errors: typing.Optional[typing.List[rust_openssl.OpenSSLError]] = None,
172 ) -> None:
173 return binding._openssl_assert(self._lib, ok, errors=errors)
175 def _enable_fips(self) -> None:
176 # This function enables FIPS mode for OpenSSL 3.0.0 on installs that
177 # have the FIPS provider installed properly.
178 self._binding._enable_fips()
179 assert rust_openssl.is_fips_enabled()
180 self._fips_enabled = rust_openssl.is_fips_enabled()
182 def openssl_version_text(self) -> str:
183 """
184 Friendly string name of the loaded OpenSSL library. This is not
185 necessarily the same version as it was compiled against.
187 Example: OpenSSL 1.1.1d 10 Sep 2019
188 """
189 return self._ffi.string(
190 self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
191 ).decode("ascii")
193 def openssl_version_number(self) -> int:
194 return self._lib.OpenSSL_version_num()
196 def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm):
197 if algorithm.name == "blake2b" or algorithm.name == "blake2s":
198 alg = "{}{}".format(
199 algorithm.name, algorithm.digest_size * 8
200 ).encode("ascii")
201 else:
202 alg = algorithm.name.encode("ascii")
204 evp_md = self._lib.EVP_get_digestbyname(alg)
205 return evp_md
207 def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm):
208 evp_md = self._evp_md_from_algorithm(algorithm)
209 self.openssl_assert(evp_md != self._ffi.NULL)
210 return evp_md
212 def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
213 if self._fips_enabled and not isinstance(algorithm, self._fips_hashes):
214 return False
216 evp_md = self._evp_md_from_algorithm(algorithm)
217 return evp_md != self._ffi.NULL
219 def signature_hash_supported(
220 self, algorithm: hashes.HashAlgorithm
221 ) -> bool:
222 # Dedicated check for hashing algorithm use in message digest for
223 # signatures, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption).
224 if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
225 return False
226 return self.hash_supported(algorithm)
228 def scrypt_supported(self) -> bool:
229 if self._fips_enabled:
230 return False
231 else:
232 return self._lib.Cryptography_HAS_SCRYPT == 1
234 def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
235 # FIPS mode still allows SHA1 for HMAC
236 if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
237 return True
239 return self.hash_supported(algorithm)
241 def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
242 if self._fips_enabled:
243 # FIPS mode requires AES. TripleDES is disallowed/deprecated in
244 # FIPS 140-3.
245 if not isinstance(cipher, self._fips_ciphers):
246 return False
248 try:
249 adapter = self._cipher_registry[type(cipher), type(mode)]
250 except KeyError:
251 return False
252 evp_cipher = adapter(self, cipher, mode)
253 return self._ffi.NULL != evp_cipher
255 def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
256 if (cipher_cls, mode_cls) in self._cipher_registry:
257 raise ValueError(
258 "Duplicate registration for: {} {}.".format(
259 cipher_cls, mode_cls
260 )
261 )
262 self._cipher_registry[cipher_cls, mode_cls] = adapter
264 def _register_default_ciphers(self) -> None:
265 for cipher_cls in [AES, AES128, AES256]:
266 for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
267 self.register_cipher_adapter(
268 cipher_cls,
269 mode_cls,
270 GetCipherByName(
271 "{cipher.name}-{cipher.key_size}-{mode.name}"
272 ),
273 )
274 for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
275 self.register_cipher_adapter(
276 Camellia,
277 mode_cls,
278 GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
279 )
280 for mode_cls in [CBC, CFB, CFB8, OFB]:
281 self.register_cipher_adapter(
282 TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
283 )
284 self.register_cipher_adapter(
285 TripleDES, ECB, GetCipherByName("des-ede3")
286 )
287 self.register_cipher_adapter(
288 ChaCha20, type(None), GetCipherByName("chacha20")
289 )
290 self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
291 for mode_cls in [ECB, CBC, OFB, CFB, CTR]:
292 self.register_cipher_adapter(
293 SM4, mode_cls, GetCipherByName("sm4-{mode.name}")
294 )
295 # Don't register legacy ciphers if they're unavailable. Hypothetically
296 # this wouldn't be necessary because we test availability by seeing if
297 # we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3
298 # will return a valid pointer even though the cipher is unavailable.
299 if (
300 self._binding._legacy_provider_loaded
301 or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
302 ):
303 for mode_cls in [CBC, CFB, OFB, ECB]:
304 self.register_cipher_adapter(
305 _BlowfishInternal,
306 mode_cls,
307 GetCipherByName("bf-{mode.name}"),
308 )
309 for mode_cls in [CBC, CFB, OFB, ECB]:
310 self.register_cipher_adapter(
311 _SEEDInternal,
312 mode_cls,
313 GetCipherByName("seed-{mode.name}"),
314 )
315 for cipher_cls, mode_cls in itertools.product(
316 [_CAST5Internal, _IDEAInternal],
317 [CBC, OFB, CFB, ECB],
318 ):
319 self.register_cipher_adapter(
320 cipher_cls,
321 mode_cls,
322 GetCipherByName("{cipher.name}-{mode.name}"),
323 )
324 self.register_cipher_adapter(
325 ARC4, type(None), GetCipherByName("rc4")
326 )
327 # We don't actually support RC2, this is just used by some tests.
328 self.register_cipher_adapter(
329 _RC2, type(None), GetCipherByName("rc2")
330 )
332 def create_symmetric_encryption_ctx(
333 self, cipher: CipherAlgorithm, mode: Mode
334 ) -> _CipherContext:
335 return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
337 def create_symmetric_decryption_ctx(
338 self, cipher: CipherAlgorithm, mode: Mode
339 ) -> _CipherContext:
340 return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
342 def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
343 return self.hmac_supported(algorithm)
345 def _consume_errors(self) -> typing.List[rust_openssl.OpenSSLError]:
346 return rust_openssl.capture_error_stack()
348 def _bn_to_int(self, bn) -> int:
349 assert bn != self._ffi.NULL
350 self.openssl_assert(not self._lib.BN_is_negative(bn))
352 bn_num_bytes = self._lib.BN_num_bytes(bn)
353 bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
354 bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
355 # A zero length means the BN has value 0
356 self.openssl_assert(bin_len >= 0)
357 val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
358 return val
360 def _int_to_bn(self, num: int):
361 """
362 Converts a python integer to a BIGNUM. The returned BIGNUM will not
363 be garbage collected (to support adding them to structs that take
364 ownership of the object). Be sure to register it for GC if it will
365 be discarded after use.
366 """
367 binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
368 bn_ptr = self._lib.BN_bin2bn(binary, len(binary), self._ffi.NULL)
369 self.openssl_assert(bn_ptr != self._ffi.NULL)
370 return bn_ptr
372 def generate_rsa_private_key(
373 self, public_exponent: int, key_size: int
374 ) -> rsa.RSAPrivateKey:
375 rsa._verify_rsa_parameters(public_exponent, key_size)
377 rsa_cdata = self._lib.RSA_new()
378 self.openssl_assert(rsa_cdata != self._ffi.NULL)
379 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
381 bn = self._int_to_bn(public_exponent)
382 bn = self._ffi.gc(bn, self._lib.BN_free)
384 res = self._lib.RSA_generate_key_ex(
385 rsa_cdata, key_size, bn, self._ffi.NULL
386 )
387 self.openssl_assert(res == 1)
388 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
390 # We can skip RSA key validation here since we just generated the key
391 return _RSAPrivateKey(
392 self, rsa_cdata, evp_pkey, unsafe_skip_rsa_key_validation=True
393 )
395 def generate_rsa_parameters_supported(
396 self, public_exponent: int, key_size: int
397 ) -> bool:
398 return (
399 public_exponent >= 3
400 and public_exponent & 1 != 0
401 and key_size >= 512
402 )
404 def load_rsa_private_numbers(
405 self,
406 numbers: rsa.RSAPrivateNumbers,
407 unsafe_skip_rsa_key_validation: bool,
408 ) -> rsa.RSAPrivateKey:
409 rsa._check_private_key_components(
410 numbers.p,
411 numbers.q,
412 numbers.d,
413 numbers.dmp1,
414 numbers.dmq1,
415 numbers.iqmp,
416 numbers.public_numbers.e,
417 numbers.public_numbers.n,
418 )
419 rsa_cdata = self._lib.RSA_new()
420 self.openssl_assert(rsa_cdata != self._ffi.NULL)
421 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
422 p = self._int_to_bn(numbers.p)
423 q = self._int_to_bn(numbers.q)
424 d = self._int_to_bn(numbers.d)
425 dmp1 = self._int_to_bn(numbers.dmp1)
426 dmq1 = self._int_to_bn(numbers.dmq1)
427 iqmp = self._int_to_bn(numbers.iqmp)
428 e = self._int_to_bn(numbers.public_numbers.e)
429 n = self._int_to_bn(numbers.public_numbers.n)
430 res = self._lib.RSA_set0_factors(rsa_cdata, p, q)
431 self.openssl_assert(res == 1)
432 res = self._lib.RSA_set0_key(rsa_cdata, n, e, d)
433 self.openssl_assert(res == 1)
434 res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp)
435 self.openssl_assert(res == 1)
436 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
438 return _RSAPrivateKey(
439 self,
440 rsa_cdata,
441 evp_pkey,
442 unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
443 )
445 def load_rsa_public_numbers(
446 self, numbers: rsa.RSAPublicNumbers
447 ) -> rsa.RSAPublicKey:
448 rsa._check_public_key_components(numbers.e, numbers.n)
449 rsa_cdata = self._lib.RSA_new()
450 self.openssl_assert(rsa_cdata != self._ffi.NULL)
451 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
452 e = self._int_to_bn(numbers.e)
453 n = self._int_to_bn(numbers.n)
454 res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL)
455 self.openssl_assert(res == 1)
456 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
458 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
460 def _create_evp_pkey_gc(self):
461 evp_pkey = self._lib.EVP_PKEY_new()
462 self.openssl_assert(evp_pkey != self._ffi.NULL)
463 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
464 return evp_pkey
466 def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
467 evp_pkey = self._create_evp_pkey_gc()
468 res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata)
469 self.openssl_assert(res == 1)
470 return evp_pkey
472 def _bytes_to_bio(self, data: bytes) -> _MemoryBIO:
473 """
474 Return a _MemoryBIO namedtuple of (BIO, char*).
476 The char* is the storage for the BIO and it must stay alive until the
477 BIO is finished with.
478 """
479 data_ptr = self._ffi.from_buffer(data)
480 bio = self._lib.BIO_new_mem_buf(data_ptr, len(data))
481 self.openssl_assert(bio != self._ffi.NULL)
483 return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr)
485 def _create_mem_bio_gc(self):
486 """
487 Creates an empty memory BIO.
488 """
489 bio_method = self._lib.BIO_s_mem()
490 self.openssl_assert(bio_method != self._ffi.NULL)
491 bio = self._lib.BIO_new(bio_method)
492 self.openssl_assert(bio != self._ffi.NULL)
493 bio = self._ffi.gc(bio, self._lib.BIO_free)
494 return bio
496 def _read_mem_bio(self, bio) -> bytes:
497 """
498 Reads a memory BIO. This only works on memory BIOs.
499 """
500 buf = self._ffi.new("char **")
501 buf_len = self._lib.BIO_get_mem_data(bio, buf)
502 self.openssl_assert(buf_len > 0)
503 self.openssl_assert(buf[0] != self._ffi.NULL)
504 bio_data = self._ffi.buffer(buf[0], buf_len)[:]
505 return bio_data
507 def _evp_pkey_to_private_key(
508 self, evp_pkey, unsafe_skip_rsa_key_validation: bool
509 ) -> PrivateKeyTypes:
510 """
511 Return the appropriate type of PrivateKey given an evp_pkey cdata
512 pointer.
513 """
515 key_type = self._lib.EVP_PKEY_id(evp_pkey)
517 if key_type == self._lib.EVP_PKEY_RSA:
518 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
519 self.openssl_assert(rsa_cdata != self._ffi.NULL)
520 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
521 return _RSAPrivateKey(
522 self,
523 rsa_cdata,
524 evp_pkey,
525 unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
526 )
527 elif (
528 key_type == self._lib.EVP_PKEY_RSA_PSS
529 and not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
530 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
531 and not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
532 ):
533 # At the moment the way we handle RSA PSS keys is to strip the
534 # PSS constraints from them and treat them as normal RSA keys
535 # Unfortunately the RSA * itself tracks this data so we need to
536 # extract, serialize, and reload it without the constraints.
537 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
538 self.openssl_assert(rsa_cdata != self._ffi.NULL)
539 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
540 bio = self._create_mem_bio_gc()
541 res = self._lib.i2d_RSAPrivateKey_bio(bio, rsa_cdata)
542 self.openssl_assert(res == 1)
543 return self.load_der_private_key(
544 self._read_mem_bio(bio),
545 password=None,
546 unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
547 )
548 elif key_type == self._lib.EVP_PKEY_DSA:
549 return rust_openssl.dsa.private_key_from_ptr(
550 int(self._ffi.cast("uintptr_t", evp_pkey))
551 )
552 elif key_type == self._lib.EVP_PKEY_EC:
553 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
554 self.openssl_assert(ec_cdata != self._ffi.NULL)
555 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
556 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
557 elif key_type in self._dh_types:
558 return rust_openssl.dh.private_key_from_ptr(
559 int(self._ffi.cast("uintptr_t", evp_pkey))
560 )
561 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
562 # EVP_PKEY_ED25519 is not present in CRYPTOGRAPHY_IS_LIBRESSL
563 return rust_openssl.ed25519.private_key_from_ptr(
564 int(self._ffi.cast("uintptr_t", evp_pkey))
565 )
566 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
567 # EVP_PKEY_X448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
568 return rust_openssl.x448.private_key_from_ptr(
569 int(self._ffi.cast("uintptr_t", evp_pkey))
570 )
571 elif key_type == self._lib.EVP_PKEY_X25519:
572 return rust_openssl.x25519.private_key_from_ptr(
573 int(self._ffi.cast("uintptr_t", evp_pkey))
574 )
575 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
576 # EVP_PKEY_ED448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
577 return rust_openssl.ed448.private_key_from_ptr(
578 int(self._ffi.cast("uintptr_t", evp_pkey))
579 )
580 else:
581 raise UnsupportedAlgorithm("Unsupported key type.")
583 def _evp_pkey_to_public_key(self, evp_pkey) -> PublicKeyTypes:
584 """
585 Return the appropriate type of PublicKey given an evp_pkey cdata
586 pointer.
587 """
589 key_type = self._lib.EVP_PKEY_id(evp_pkey)
591 if key_type == self._lib.EVP_PKEY_RSA:
592 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
593 self.openssl_assert(rsa_cdata != self._ffi.NULL)
594 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
595 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
596 elif (
597 key_type == self._lib.EVP_PKEY_RSA_PSS
598 and not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
599 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
600 and not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
601 ):
602 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
603 self.openssl_assert(rsa_cdata != self._ffi.NULL)
604 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
605 bio = self._create_mem_bio_gc()
606 res = self._lib.i2d_RSAPublicKey_bio(bio, rsa_cdata)
607 self.openssl_assert(res == 1)
608 return self.load_der_public_key(self._read_mem_bio(bio))
609 elif key_type == self._lib.EVP_PKEY_DSA:
610 return rust_openssl.dsa.public_key_from_ptr(
611 int(self._ffi.cast("uintptr_t", evp_pkey))
612 )
613 elif key_type == self._lib.EVP_PKEY_EC:
614 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
615 if ec_cdata == self._ffi.NULL:
616 errors = self._consume_errors()
617 raise ValueError("Unable to load EC key", errors)
618 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
619 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
620 elif key_type in self._dh_types:
621 return rust_openssl.dh.public_key_from_ptr(
622 int(self._ffi.cast("uintptr_t", evp_pkey))
623 )
624 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
625 # EVP_PKEY_ED25519 is not present in CRYPTOGRAPHY_IS_LIBRESSL
626 return rust_openssl.ed25519.public_key_from_ptr(
627 int(self._ffi.cast("uintptr_t", evp_pkey))
628 )
629 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
630 # EVP_PKEY_X448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
631 return rust_openssl.x448.public_key_from_ptr(
632 int(self._ffi.cast("uintptr_t", evp_pkey))
633 )
634 elif key_type == self._lib.EVP_PKEY_X25519:
635 return rust_openssl.x25519.public_key_from_ptr(
636 int(self._ffi.cast("uintptr_t", evp_pkey))
637 )
638 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
639 # EVP_PKEY_ED448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
640 return rust_openssl.ed448.public_key_from_ptr(
641 int(self._ffi.cast("uintptr_t", evp_pkey))
642 )
643 else:
644 raise UnsupportedAlgorithm("Unsupported key type.")
646 def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
647 if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
648 return False
650 return isinstance(
651 algorithm,
652 (
653 hashes.SHA1,
654 hashes.SHA224,
655 hashes.SHA256,
656 hashes.SHA384,
657 hashes.SHA512,
658 ),
659 )
661 def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool:
662 if isinstance(padding, PKCS1v15):
663 return True
664 elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
665 # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked
666 # as signature algorithm.
667 if self._fips_enabled and isinstance(
668 padding._mgf._algorithm, hashes.SHA1
669 ):
670 return True
671 else:
672 return self.hash_supported(padding._mgf._algorithm)
673 elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
674 return self._oaep_hash_supported(
675 padding._mgf._algorithm
676 ) and self._oaep_hash_supported(padding._algorithm)
677 else:
678 return False
680 def rsa_encryption_supported(self, padding: AsymmetricPadding) -> bool:
681 if self._fips_enabled and isinstance(padding, PKCS1v15):
682 return False
683 else:
684 return self.rsa_padding_supported(padding)
686 def generate_dsa_parameters(self, key_size: int) -> dsa.DSAParameters:
687 if key_size not in (1024, 2048, 3072, 4096):
688 raise ValueError(
689 "Key size must be 1024, 2048, 3072, or 4096 bits."
690 )
692 return rust_openssl.dsa.generate_parameters(key_size)
694 def generate_dsa_private_key(
695 self, parameters: dsa.DSAParameters
696 ) -> dsa.DSAPrivateKey:
697 return parameters.generate_private_key()
699 def generate_dsa_private_key_and_parameters(
700 self, key_size: int
701 ) -> dsa.DSAPrivateKey:
702 parameters = self.generate_dsa_parameters(key_size)
703 return self.generate_dsa_private_key(parameters)
705 def load_dsa_private_numbers(
706 self, numbers: dsa.DSAPrivateNumbers
707 ) -> dsa.DSAPrivateKey:
708 dsa._check_dsa_private_numbers(numbers)
709 return rust_openssl.dsa.from_private_numbers(numbers)
711 def load_dsa_public_numbers(
712 self, numbers: dsa.DSAPublicNumbers
713 ) -> dsa.DSAPublicKey:
714 dsa._check_dsa_parameters(numbers.parameter_numbers)
715 return rust_openssl.dsa.from_public_numbers(numbers)
717 def load_dsa_parameter_numbers(
718 self, numbers: dsa.DSAParameterNumbers
719 ) -> dsa.DSAParameters:
720 dsa._check_dsa_parameters(numbers)
721 return rust_openssl.dsa.from_parameter_numbers(numbers)
723 def dsa_supported(self) -> bool:
724 return (
725 not self._lib.CRYPTOGRAPHY_IS_BORINGSSL and not self._fips_enabled
726 )
728 def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
729 if not self.dsa_supported():
730 return False
731 return self.signature_hash_supported(algorithm)
733 def cmac_algorithm_supported(self, algorithm) -> bool:
734 return self.cipher_supported(
735 algorithm, CBC(b"\x00" * algorithm.block_size)
736 )
738 def create_cmac_ctx(self, algorithm: BlockCipherAlgorithm) -> _CMACContext:
739 return _CMACContext(self, algorithm)
741 def load_pem_private_key(
742 self,
743 data: bytes,
744 password: typing.Optional[bytes],
745 unsafe_skip_rsa_key_validation: bool,
746 ) -> PrivateKeyTypes:
747 return self._load_key(
748 self._lib.PEM_read_bio_PrivateKey,
749 data,
750 password,
751 unsafe_skip_rsa_key_validation,
752 )
754 def load_pem_public_key(self, data: bytes) -> PublicKeyTypes:
755 mem_bio = self._bytes_to_bio(data)
756 # In OpenSSL 3.0.x the PEM_read_bio_PUBKEY function will invoke
757 # the default password callback if you pass an encrypted private
758 # key. This is very, very, very bad as the default callback can
759 # trigger an interactive console prompt, which will hang the
760 # Python process. We therefore provide our own callback to
761 # catch this and error out properly.
762 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
763 evp_pkey = self._lib.PEM_read_bio_PUBKEY(
764 mem_bio.bio,
765 self._ffi.NULL,
766 self._ffi.addressof(
767 self._lib._original_lib, "Cryptography_pem_password_cb"
768 ),
769 userdata,
770 )
771 if evp_pkey != self._ffi.NULL:
772 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
773 return self._evp_pkey_to_public_key(evp_pkey)
774 else:
775 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
776 # need to check to see if it is a pure PKCS1 RSA public key (not
777 # embedded in a subjectPublicKeyInfo)
778 self._consume_errors()
779 res = self._lib.BIO_reset(mem_bio.bio)
780 self.openssl_assert(res == 1)
781 rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey(
782 mem_bio.bio,
783 self._ffi.NULL,
784 self._ffi.addressof(
785 self._lib._original_lib, "Cryptography_pem_password_cb"
786 ),
787 userdata,
788 )
789 if rsa_cdata != self._ffi.NULL:
790 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
791 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
792 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
793 else:
794 self._handle_key_loading_error()
796 def load_pem_parameters(self, data: bytes) -> dh.DHParameters:
797 return rust_openssl.dh.from_pem_parameters(data)
799 def load_der_private_key(
800 self,
801 data: bytes,
802 password: typing.Optional[bytes],
803 unsafe_skip_rsa_key_validation: bool,
804 ) -> PrivateKeyTypes:
805 # OpenSSL has a function called d2i_AutoPrivateKey that in theory
806 # handles this automatically, however it doesn't handle encrypted
807 # private keys. Instead we try to load the key two different ways.
808 # First we'll try to load it as a traditional key.
809 bio_data = self._bytes_to_bio(data)
810 key = self._evp_pkey_from_der_traditional_key(bio_data, password)
811 if key:
812 return self._evp_pkey_to_private_key(
813 key, unsafe_skip_rsa_key_validation
814 )
815 else:
816 # Finally we try to load it with the method that handles encrypted
817 # PKCS8 properly.
818 return self._load_key(
819 self._lib.d2i_PKCS8PrivateKey_bio,
820 data,
821 password,
822 unsafe_skip_rsa_key_validation,
823 )
825 def _evp_pkey_from_der_traditional_key(self, bio_data, password):
826 key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL)
827 if key != self._ffi.NULL:
828 key = self._ffi.gc(key, self._lib.EVP_PKEY_free)
829 if password is not None:
830 raise TypeError(
831 "Password was given but private key is not encrypted."
832 )
834 return key
835 else:
836 self._consume_errors()
837 return None
839 def load_der_public_key(self, data: bytes) -> PublicKeyTypes:
840 mem_bio = self._bytes_to_bio(data)
841 evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL)
842 if evp_pkey != self._ffi.NULL:
843 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
844 return self._evp_pkey_to_public_key(evp_pkey)
845 else:
846 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
847 # need to check to see if it is a pure PKCS1 RSA public key (not
848 # embedded in a subjectPublicKeyInfo)
849 self._consume_errors()
850 res = self._lib.BIO_reset(mem_bio.bio)
851 self.openssl_assert(res == 1)
852 rsa_cdata = self._lib.d2i_RSAPublicKey_bio(
853 mem_bio.bio, self._ffi.NULL
854 )
855 if rsa_cdata != self._ffi.NULL:
856 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
857 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
858 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
859 else:
860 self._handle_key_loading_error()
862 def load_der_parameters(self, data: bytes) -> dh.DHParameters:
863 return rust_openssl.dh.from_der_parameters(data)
865 def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
866 data = cert.public_bytes(serialization.Encoding.DER)
867 mem_bio = self._bytes_to_bio(data)
868 x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
869 self.openssl_assert(x509 != self._ffi.NULL)
870 x509 = self._ffi.gc(x509, self._lib.X509_free)
871 return x509
873 def _ossl2cert(self, x509_ptr: typing.Any) -> x509.Certificate:
874 bio = self._create_mem_bio_gc()
875 res = self._lib.i2d_X509_bio(bio, x509_ptr)
876 self.openssl_assert(res == 1)
877 return x509.load_der_x509_certificate(self._read_mem_bio(bio))
879 def _key2ossl(self, key: PKCS12PrivateKeyTypes) -> typing.Any:
880 data = key.private_bytes(
881 serialization.Encoding.DER,
882 serialization.PrivateFormat.PKCS8,
883 serialization.NoEncryption(),
884 )
885 mem_bio = self._bytes_to_bio(data)
887 evp_pkey = self._lib.d2i_PrivateKey_bio(
888 mem_bio.bio,
889 self._ffi.NULL,
890 )
891 self.openssl_assert(evp_pkey != self._ffi.NULL)
892 return self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
894 def _load_key(
895 self, openssl_read_func, data, password, unsafe_skip_rsa_key_validation
896 ) -> PrivateKeyTypes:
897 mem_bio = self._bytes_to_bio(data)
899 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
900 if password is not None:
901 utils._check_byteslike("password", password)
902 password_ptr = self._ffi.from_buffer(password)
903 userdata.password = password_ptr
904 userdata.length = len(password)
906 evp_pkey = openssl_read_func(
907 mem_bio.bio,
908 self._ffi.NULL,
909 self._ffi.addressof(
910 self._lib._original_lib, "Cryptography_pem_password_cb"
911 ),
912 userdata,
913 )
915 if evp_pkey == self._ffi.NULL:
916 if userdata.error != 0:
917 self._consume_errors()
918 if userdata.error == -1:
919 raise TypeError(
920 "Password was not given but private key is encrypted"
921 )
922 else:
923 assert userdata.error == -2
924 raise ValueError(
925 "Passwords longer than {} bytes are not supported "
926 "by this backend.".format(userdata.maxsize - 1)
927 )
928 else:
929 self._handle_key_loading_error()
931 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
933 if password is not None and userdata.called == 0:
934 raise TypeError(
935 "Password was given but private key is not encrypted."
936 )
938 assert (
939 password is not None and userdata.called == 1
940 ) or password is None
942 return self._evp_pkey_to_private_key(
943 evp_pkey, unsafe_skip_rsa_key_validation
944 )
946 def _handle_key_loading_error(self) -> typing.NoReturn:
947 errors = self._consume_errors()
949 if not errors:
950 raise ValueError(
951 "Could not deserialize key data. The data may be in an "
952 "incorrect format or it may be encrypted with an unsupported "
953 "algorithm."
954 )
956 elif (
957 errors[0]._lib_reason_match(
958 self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
959 )
960 or errors[0]._lib_reason_match(
961 self._lib.ERR_LIB_PKCS12,
962 self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
963 )
964 or (
965 self._lib.Cryptography_HAS_PROVIDERS
966 and errors[0]._lib_reason_match(
967 self._lib.ERR_LIB_PROV,
968 self._lib.PROV_R_BAD_DECRYPT,
969 )
970 )
971 ):
972 raise ValueError("Bad decrypt. Incorrect password?")
974 elif any(
975 error._lib_reason_match(
976 self._lib.ERR_LIB_EVP,
977 self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
978 )
979 for error in errors
980 ):
981 raise ValueError("Unsupported public key algorithm.")
983 else:
984 raise ValueError(
985 "Could not deserialize key data. The data may be in an "
986 "incorrect format, it may be encrypted with an unsupported "
987 "algorithm, or it may be an unsupported key type (e.g. EC "
988 "curves with explicit parameters).",
989 errors,
990 )
992 def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
993 try:
994 curve_nid = self._elliptic_curve_to_nid(curve)
995 except UnsupportedAlgorithm:
996 curve_nid = self._lib.NID_undef
998 group = self._lib.EC_GROUP_new_by_curve_name(curve_nid)
1000 if group == self._ffi.NULL:
1001 self._consume_errors()
1002 return False
1003 else:
1004 self.openssl_assert(curve_nid != self._lib.NID_undef)
1005 self._lib.EC_GROUP_free(group)
1006 return True
1008 def elliptic_curve_signature_algorithm_supported(
1009 self,
1010 signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
1011 curve: ec.EllipticCurve,
1012 ) -> bool:
1013 # We only support ECDSA right now.
1014 if not isinstance(signature_algorithm, ec.ECDSA):
1015 return False
1017 return self.elliptic_curve_supported(curve)
1019 def generate_elliptic_curve_private_key(
1020 self, curve: ec.EllipticCurve
1021 ) -> ec.EllipticCurvePrivateKey:
1022 """
1023 Generate a new private key on the named curve.
1024 """
1026 if self.elliptic_curve_supported(curve):
1027 ec_cdata = self._ec_key_new_by_curve(curve)
1029 res = self._lib.EC_KEY_generate_key(ec_cdata)
1030 self.openssl_assert(res == 1)
1032 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1034 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1035 else:
1036 raise UnsupportedAlgorithm(
1037 f"Backend object does not support {curve.name}.",
1038 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
1039 )
1041 def load_elliptic_curve_private_numbers(
1042 self, numbers: ec.EllipticCurvePrivateNumbers
1043 ) -> ec.EllipticCurvePrivateKey:
1044 public = numbers.public_numbers
1046 ec_cdata = self._ec_key_new_by_curve(public.curve)
1048 private_value = self._ffi.gc(
1049 self._int_to_bn(numbers.private_value), self._lib.BN_clear_free
1050 )
1051 res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value)
1052 if res != 1:
1053 self._consume_errors()
1054 raise ValueError("Invalid EC key.")
1056 with self._tmp_bn_ctx() as bn_ctx:
1057 self._ec_key_set_public_key_affine_coordinates(
1058 ec_cdata, public.x, public.y, bn_ctx
1059 )
1060 # derive the expected public point and compare it to the one we
1061 # just set based on the values we were given. If they don't match
1062 # this isn't a valid key pair.
1063 group = self._lib.EC_KEY_get0_group(ec_cdata)
1064 self.openssl_assert(group != self._ffi.NULL)
1065 set_point = backend._lib.EC_KEY_get0_public_key(ec_cdata)
1066 self.openssl_assert(set_point != self._ffi.NULL)
1067 computed_point = self._lib.EC_POINT_new(group)
1068 self.openssl_assert(computed_point != self._ffi.NULL)
1069 computed_point = self._ffi.gc(
1070 computed_point, self._lib.EC_POINT_free
1071 )
1072 res = self._lib.EC_POINT_mul(
1073 group,
1074 computed_point,
1075 private_value,
1076 self._ffi.NULL,
1077 self._ffi.NULL,
1078 bn_ctx,
1079 )
1080 self.openssl_assert(res == 1)
1081 if (
1082 self._lib.EC_POINT_cmp(
1083 group, set_point, computed_point, bn_ctx
1084 )
1085 != 0
1086 ):
1087 raise ValueError("Invalid EC key.")
1089 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1091 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1093 def load_elliptic_curve_public_numbers(
1094 self, numbers: ec.EllipticCurvePublicNumbers
1095 ) -> ec.EllipticCurvePublicKey:
1096 ec_cdata = self._ec_key_new_by_curve(numbers.curve)
1097 with self._tmp_bn_ctx() as bn_ctx:
1098 self._ec_key_set_public_key_affine_coordinates(
1099 ec_cdata, numbers.x, numbers.y, bn_ctx
1100 )
1101 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1103 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
1105 def load_elliptic_curve_public_bytes(
1106 self, curve: ec.EllipticCurve, point_bytes: bytes
1107 ) -> ec.EllipticCurvePublicKey:
1108 ec_cdata = self._ec_key_new_by_curve(curve)
1109 group = self._lib.EC_KEY_get0_group(ec_cdata)
1110 self.openssl_assert(group != self._ffi.NULL)
1111 point = self._lib.EC_POINT_new(group)
1112 self.openssl_assert(point != self._ffi.NULL)
1113 point = self._ffi.gc(point, self._lib.EC_POINT_free)
1114 with self._tmp_bn_ctx() as bn_ctx:
1115 res = self._lib.EC_POINT_oct2point(
1116 group, point, point_bytes, len(point_bytes), bn_ctx
1117 )
1118 if res != 1:
1119 self._consume_errors()
1120 raise ValueError("Invalid public bytes for the given curve")
1122 res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1123 self.openssl_assert(res == 1)
1124 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1125 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
1127 def derive_elliptic_curve_private_key(
1128 self, private_value: int, curve: ec.EllipticCurve
1129 ) -> ec.EllipticCurvePrivateKey:
1130 ec_cdata = self._ec_key_new_by_curve(curve)
1132 group = self._lib.EC_KEY_get0_group(ec_cdata)
1133 self.openssl_assert(group != self._ffi.NULL)
1135 point = self._lib.EC_POINT_new(group)
1136 self.openssl_assert(point != self._ffi.NULL)
1137 point = self._ffi.gc(point, self._lib.EC_POINT_free)
1139 value = self._int_to_bn(private_value)
1140 value = self._ffi.gc(value, self._lib.BN_clear_free)
1142 with self._tmp_bn_ctx() as bn_ctx:
1143 res = self._lib.EC_POINT_mul(
1144 group, point, value, self._ffi.NULL, self._ffi.NULL, bn_ctx
1145 )
1146 self.openssl_assert(res == 1)
1148 bn_x = self._lib.BN_CTX_get(bn_ctx)
1149 bn_y = self._lib.BN_CTX_get(bn_ctx)
1151 res = self._lib.EC_POINT_get_affine_coordinates(
1152 group, point, bn_x, bn_y, bn_ctx
1153 )
1154 if res != 1:
1155 self._consume_errors()
1156 raise ValueError("Unable to derive key from private_value")
1158 res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1159 self.openssl_assert(res == 1)
1160 private = self._int_to_bn(private_value)
1161 private = self._ffi.gc(private, self._lib.BN_clear_free)
1162 res = self._lib.EC_KEY_set_private_key(ec_cdata, private)
1163 self.openssl_assert(res == 1)
1165 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1167 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1169 def _ec_key_new_by_curve(self, curve: ec.EllipticCurve):
1170 curve_nid = self._elliptic_curve_to_nid(curve)
1171 return self._ec_key_new_by_curve_nid(curve_nid)
1173 def _ec_key_new_by_curve_nid(self, curve_nid: int):
1174 ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
1175 self.openssl_assert(ec_cdata != self._ffi.NULL)
1176 return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
1178 def elliptic_curve_exchange_algorithm_supported(
1179 self, algorithm: ec.ECDH, curve: ec.EllipticCurve
1180 ) -> bool:
1181 if self._fips_enabled and not isinstance(
1182 curve, self._fips_ecdh_curves
1183 ):
1184 return False
1186 return self.elliptic_curve_supported(curve) and isinstance(
1187 algorithm, ec.ECDH
1188 )
1190 def _ec_cdata_to_evp_pkey(self, ec_cdata):
1191 evp_pkey = self._create_evp_pkey_gc()
1192 res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata)
1193 self.openssl_assert(res == 1)
1194 return evp_pkey
1196 def _elliptic_curve_to_nid(self, curve: ec.EllipticCurve) -> int:
1197 """
1198 Get the NID for a curve name.
1199 """
1201 curve_aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"}
1203 curve_name = curve_aliases.get(curve.name, curve.name)
1205 curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
1206 if curve_nid == self._lib.NID_undef:
1207 raise UnsupportedAlgorithm(
1208 f"{curve.name} is not a supported elliptic curve",
1209 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
1210 )
1211 return curve_nid
1213 @contextmanager
1214 def _tmp_bn_ctx(self):
1215 bn_ctx = self._lib.BN_CTX_new()
1216 self.openssl_assert(bn_ctx != self._ffi.NULL)
1217 bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free)
1218 self._lib.BN_CTX_start(bn_ctx)
1219 try:
1220 yield bn_ctx
1221 finally:
1222 self._lib.BN_CTX_end(bn_ctx)
1224 def _ec_key_set_public_key_affine_coordinates(
1225 self,
1226 ec_cdata,
1227 x: int,
1228 y: int,
1229 bn_ctx,
1230 ) -> None:
1231 """
1232 Sets the public key point in the EC_KEY context to the affine x and y
1233 values.
1234 """
1236 if x < 0 or y < 0:
1237 raise ValueError(
1238 "Invalid EC key. Both x and y must be non-negative."
1239 )
1241 x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free)
1242 y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free)
1243 group = self._lib.EC_KEY_get0_group(ec_cdata)
1244 self.openssl_assert(group != self._ffi.NULL)
1245 point = self._lib.EC_POINT_new(group)
1246 self.openssl_assert(point != self._ffi.NULL)
1247 point = self._ffi.gc(point, self._lib.EC_POINT_free)
1248 res = self._lib.EC_POINT_set_affine_coordinates(
1249 group, point, x, y, bn_ctx
1250 )
1251 if res != 1:
1252 self._consume_errors()
1253 raise ValueError("Invalid EC key.")
1254 res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1255 self.openssl_assert(res == 1)
1257 def _private_key_bytes(
1258 self,
1259 encoding: serialization.Encoding,
1260 format: serialization.PrivateFormat,
1261 encryption_algorithm: serialization.KeySerializationEncryption,
1262 key,
1263 evp_pkey,
1264 cdata,
1265 ) -> bytes:
1266 # validate argument types
1267 if not isinstance(encoding, serialization.Encoding):
1268 raise TypeError("encoding must be an item from the Encoding enum")
1269 if not isinstance(format, serialization.PrivateFormat):
1270 raise TypeError(
1271 "format must be an item from the PrivateFormat enum"
1272 )
1273 if not isinstance(
1274 encryption_algorithm, serialization.KeySerializationEncryption
1275 ):
1276 raise TypeError(
1277 "Encryption algorithm must be a KeySerializationEncryption "
1278 "instance"
1279 )
1281 # validate password
1282 if isinstance(encryption_algorithm, serialization.NoEncryption):
1283 password = b""
1284 elif isinstance(
1285 encryption_algorithm, serialization.BestAvailableEncryption
1286 ):
1287 password = encryption_algorithm.password
1288 if len(password) > 1023:
1289 raise ValueError(
1290 "Passwords longer than 1023 bytes are not supported by "
1291 "this backend"
1292 )
1293 elif (
1294 isinstance(
1295 encryption_algorithm, serialization._KeySerializationEncryption
1296 )
1297 and encryption_algorithm._format
1298 is format
1299 is serialization.PrivateFormat.OpenSSH
1300 ):
1301 password = encryption_algorithm.password
1302 else:
1303 raise ValueError("Unsupported encryption type")
1305 # PKCS8 + PEM/DER
1306 if format is serialization.PrivateFormat.PKCS8:
1307 if encoding is serialization.Encoding.PEM:
1308 write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey
1309 elif encoding is serialization.Encoding.DER:
1310 write_bio = self._lib.i2d_PKCS8PrivateKey_bio
1311 else:
1312 raise ValueError("Unsupported encoding for PKCS8")
1313 return self._private_key_bytes_via_bio(
1314 write_bio, evp_pkey, password
1315 )
1317 # TraditionalOpenSSL + PEM/DER
1318 if format is serialization.PrivateFormat.TraditionalOpenSSL:
1319 if self._fips_enabled and not isinstance(
1320 encryption_algorithm, serialization.NoEncryption
1321 ):
1322 raise ValueError(
1323 "Encrypted traditional OpenSSL format is not "
1324 "supported in FIPS mode."
1325 )
1326 key_type = self._lib.EVP_PKEY_id(evp_pkey)
1328 if encoding is serialization.Encoding.PEM:
1329 if key_type == self._lib.EVP_PKEY_RSA:
1330 write_bio = self._lib.PEM_write_bio_RSAPrivateKey
1331 else:
1332 assert key_type == self._lib.EVP_PKEY_EC
1333 write_bio = self._lib.PEM_write_bio_ECPrivateKey
1334 return self._private_key_bytes_via_bio(
1335 write_bio, cdata, password
1336 )
1338 if encoding is serialization.Encoding.DER:
1339 if password:
1340 raise ValueError(
1341 "Encryption is not supported for DER encoded "
1342 "traditional OpenSSL keys"
1343 )
1344 if key_type == self._lib.EVP_PKEY_RSA:
1345 write_bio = self._lib.i2d_RSAPrivateKey_bio
1346 else:
1347 assert key_type == self._lib.EVP_PKEY_EC
1348 write_bio = self._lib.i2d_ECPrivateKey_bio
1349 return self._bio_func_output(write_bio, cdata)
1351 raise ValueError("Unsupported encoding for TraditionalOpenSSL")
1353 # OpenSSH + PEM
1354 if format is serialization.PrivateFormat.OpenSSH:
1355 if encoding is serialization.Encoding.PEM:
1356 return ssh._serialize_ssh_private_key(
1357 key, password, encryption_algorithm
1358 )
1360 raise ValueError(
1361 "OpenSSH private key format can only be used"
1362 " with PEM encoding"
1363 )
1365 # Anything that key-specific code was supposed to handle earlier,
1366 # like Raw.
1367 raise ValueError("format is invalid with this key")
1369 def _private_key_bytes_via_bio(
1370 self, write_bio, evp_pkey, password
1371 ) -> bytes:
1372 if not password:
1373 evp_cipher = self._ffi.NULL
1374 else:
1375 # This is a curated value that we will update over time.
1376 evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
1378 return self._bio_func_output(
1379 write_bio,
1380 evp_pkey,
1381 evp_cipher,
1382 password,
1383 len(password),
1384 self._ffi.NULL,
1385 self._ffi.NULL,
1386 )
1388 def _bio_func_output(self, write_bio, *args) -> bytes:
1389 bio = self._create_mem_bio_gc()
1390 res = write_bio(bio, *args)
1391 self.openssl_assert(res == 1)
1392 return self._read_mem_bio(bio)
1394 def _public_key_bytes(
1395 self,
1396 encoding: serialization.Encoding,
1397 format: serialization.PublicFormat,
1398 key,
1399 evp_pkey,
1400 cdata,
1401 ) -> bytes:
1402 if not isinstance(encoding, serialization.Encoding):
1403 raise TypeError("encoding must be an item from the Encoding enum")
1404 if not isinstance(format, serialization.PublicFormat):
1405 raise TypeError(
1406 "format must be an item from the PublicFormat enum"
1407 )
1409 # SubjectPublicKeyInfo + PEM/DER
1410 if format is serialization.PublicFormat.SubjectPublicKeyInfo:
1411 if encoding is serialization.Encoding.PEM:
1412 write_bio = self._lib.PEM_write_bio_PUBKEY
1413 elif encoding is serialization.Encoding.DER:
1414 write_bio = self._lib.i2d_PUBKEY_bio
1415 else:
1416 raise ValueError(
1417 "SubjectPublicKeyInfo works only with PEM or DER encoding"
1418 )
1419 return self._bio_func_output(write_bio, evp_pkey)
1421 # PKCS1 + PEM/DER
1422 if format is serialization.PublicFormat.PKCS1:
1423 # Only RSA is supported here.
1424 key_type = self._lib.EVP_PKEY_id(evp_pkey)
1425 if key_type != self._lib.EVP_PKEY_RSA:
1426 raise ValueError("PKCS1 format is supported only for RSA keys")
1428 if encoding is serialization.Encoding.PEM:
1429 write_bio = self._lib.PEM_write_bio_RSAPublicKey
1430 elif encoding is serialization.Encoding.DER:
1431 write_bio = self._lib.i2d_RSAPublicKey_bio
1432 else:
1433 raise ValueError("PKCS1 works only with PEM or DER encoding")
1434 return self._bio_func_output(write_bio, cdata)
1436 # OpenSSH + OpenSSH
1437 if format is serialization.PublicFormat.OpenSSH:
1438 if encoding is serialization.Encoding.OpenSSH:
1439 return ssh.serialize_ssh_public_key(key)
1441 raise ValueError(
1442 "OpenSSH format must be used with OpenSSH encoding"
1443 )
1445 # Anything that key-specific code was supposed to handle earlier,
1446 # like Raw, CompressedPoint, UncompressedPoint
1447 raise ValueError("format is invalid with this key")
1449 def dh_supported(self) -> bool:
1450 return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1452 def generate_dh_parameters(
1453 self, generator: int, key_size: int
1454 ) -> dh.DHParameters:
1455 return rust_openssl.dh.generate_parameters(generator, key_size)
1457 def generate_dh_private_key(
1458 self, parameters: dh.DHParameters
1459 ) -> dh.DHPrivateKey:
1460 return parameters.generate_private_key()
1462 def generate_dh_private_key_and_parameters(
1463 self, generator: int, key_size: int
1464 ) -> dh.DHPrivateKey:
1465 return self.generate_dh_private_key(
1466 self.generate_dh_parameters(generator, key_size)
1467 )
1469 def load_dh_private_numbers(
1470 self, numbers: dh.DHPrivateNumbers
1471 ) -> dh.DHPrivateKey:
1472 return rust_openssl.dh.from_private_numbers(numbers)
1474 def load_dh_public_numbers(
1475 self, numbers: dh.DHPublicNumbers
1476 ) -> dh.DHPublicKey:
1477 return rust_openssl.dh.from_public_numbers(numbers)
1479 def load_dh_parameter_numbers(
1480 self, numbers: dh.DHParameterNumbers
1481 ) -> dh.DHParameters:
1482 return rust_openssl.dh.from_parameter_numbers(numbers)
1484 def dh_parameters_supported(
1485 self, p: int, g: int, q: typing.Optional[int] = None
1486 ) -> bool:
1487 try:
1488 rust_openssl.dh.from_parameter_numbers(
1489 dh.DHParameterNumbers(p=p, g=g, q=q)
1490 )
1491 except ValueError:
1492 return False
1493 else:
1494 return True
1496 def dh_x942_serialization_supported(self) -> bool:
1497 return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1
1499 def x25519_load_public_bytes(self, data: bytes) -> x25519.X25519PublicKey:
1500 return rust_openssl.x25519.from_public_bytes(data)
1502 def x25519_load_private_bytes(
1503 self, data: bytes
1504 ) -> x25519.X25519PrivateKey:
1505 return rust_openssl.x25519.from_private_bytes(data)
1507 def x25519_generate_key(self) -> x25519.X25519PrivateKey:
1508 return rust_openssl.x25519.generate_key()
1510 def x25519_supported(self) -> bool:
1511 if self._fips_enabled:
1512 return False
1513 return not self._lib.CRYPTOGRAPHY_LIBRESSL_LESS_THAN_370
1515 def x448_load_public_bytes(self, data: bytes) -> x448.X448PublicKey:
1516 return rust_openssl.x448.from_public_bytes(data)
1518 def x448_load_private_bytes(self, data: bytes) -> x448.X448PrivateKey:
1519 return rust_openssl.x448.from_private_bytes(data)
1521 def x448_generate_key(self) -> x448.X448PrivateKey:
1522 return rust_openssl.x448.generate_key()
1524 def x448_supported(self) -> bool:
1525 if self._fips_enabled:
1526 return False
1527 return (
1528 not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
1529 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1530 )
1532 def ed25519_supported(self) -> bool:
1533 if self._fips_enabled:
1534 return False
1535 return self._lib.CRYPTOGRAPHY_HAS_WORKING_ED25519
1537 def ed25519_load_public_bytes(
1538 self, data: bytes
1539 ) -> ed25519.Ed25519PublicKey:
1540 return rust_openssl.ed25519.from_public_bytes(data)
1542 def ed25519_load_private_bytes(
1543 self, data: bytes
1544 ) -> ed25519.Ed25519PrivateKey:
1545 return rust_openssl.ed25519.from_private_bytes(data)
1547 def ed25519_generate_key(self) -> ed25519.Ed25519PrivateKey:
1548 return rust_openssl.ed25519.generate_key()
1550 def ed448_supported(self) -> bool:
1551 if self._fips_enabled:
1552 return False
1553 return (
1554 not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
1555 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1556 )
1558 def ed448_load_public_bytes(self, data: bytes) -> ed448.Ed448PublicKey:
1559 return rust_openssl.ed448.from_public_bytes(data)
1561 def ed448_load_private_bytes(self, data: bytes) -> ed448.Ed448PrivateKey:
1562 return rust_openssl.ed448.from_private_bytes(data)
1564 def ed448_generate_key(self) -> ed448.Ed448PrivateKey:
1565 return rust_openssl.ed448.generate_key()
1567 def aead_cipher_supported(self, cipher) -> bool:
1568 return aead._aead_cipher_supported(self, cipher)
1570 def _zero_data(self, data, length: int) -> None:
1571 # We clear things this way because at the moment we're not
1572 # sure of a better way that can guarantee it overwrites the
1573 # memory of a bytearray and doesn't just replace the underlying char *.
1574 for i in range(length):
1575 data[i] = 0
1577 @contextlib.contextmanager
1578 def _zeroed_null_terminated_buf(self, data):
1579 """
1580 This method takes bytes, which can be a bytestring or a mutable
1581 buffer like a bytearray, and yields a null-terminated version of that
1582 data. This is required because PKCS12_parse doesn't take a length with
1583 its password char * and ffi.from_buffer doesn't provide null
1584 termination. So, to support zeroing the data via bytearray we
1585 need to build this ridiculous construct that copies the memory, but
1586 zeroes it after use.
1587 """
1588 if data is None:
1589 yield self._ffi.NULL
1590 else:
1591 data_len = len(data)
1592 buf = self._ffi.new("char[]", data_len + 1)
1593 self._ffi.memmove(buf, data, data_len)
1594 try:
1595 yield buf
1596 finally:
1597 # Cast to a uint8_t * so we can assign by integer
1598 self._zero_data(self._ffi.cast("uint8_t *", buf), data_len)
1600 def load_key_and_certificates_from_pkcs12(
1601 self, data: bytes, password: typing.Optional[bytes]
1602 ) -> typing.Tuple[
1603 typing.Optional[PrivateKeyTypes],
1604 typing.Optional[x509.Certificate],
1605 typing.List[x509.Certificate],
1606 ]:
1607 pkcs12 = self.load_pkcs12(data, password)
1608 return (
1609 pkcs12.key,
1610 pkcs12.cert.certificate if pkcs12.cert else None,
1611 [cert.certificate for cert in pkcs12.additional_certs],
1612 )
1614 def load_pkcs12(
1615 self, data: bytes, password: typing.Optional[bytes]
1616 ) -> PKCS12KeyAndCertificates:
1617 if password is not None:
1618 utils._check_byteslike("password", password)
1620 bio = self._bytes_to_bio(data)
1621 p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
1622 if p12 == self._ffi.NULL:
1623 self._consume_errors()
1624 raise ValueError("Could not deserialize PKCS12 data")
1626 p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
1627 evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
1628 x509_ptr = self._ffi.new("X509 **")
1629 sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
1630 with self._zeroed_null_terminated_buf(password) as password_buf:
1631 res = self._lib.PKCS12_parse(
1632 p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
1633 )
1634 if res == 0:
1635 self._consume_errors()
1636 raise ValueError("Invalid password or PKCS12 data")
1638 cert = None
1639 key = None
1640 additional_certificates = []
1642 if evp_pkey_ptr[0] != self._ffi.NULL:
1643 evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)
1644 # We don't support turning off RSA key validation when loading
1645 # PKCS12 keys
1646 key = self._evp_pkey_to_private_key(
1647 evp_pkey, unsafe_skip_rsa_key_validation=False
1648 )
1650 if x509_ptr[0] != self._ffi.NULL:
1651 x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
1652 cert_obj = self._ossl2cert(x509)
1653 name = None
1654 maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL)
1655 if maybe_name != self._ffi.NULL:
1656 name = self._ffi.string(maybe_name)
1657 cert = PKCS12Certificate(cert_obj, name)
1659 if sk_x509_ptr[0] != self._ffi.NULL:
1660 sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
1661 num = self._lib.sk_X509_num(sk_x509_ptr[0])
1663 # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
1664 # certificates.
1665 indices: typing.Iterable[int]
1666 if (
1667 self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
1668 or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1669 ):
1670 indices = range(num)
1671 else:
1672 indices = reversed(range(num))
1674 for i in indices:
1675 x509 = self._lib.sk_X509_value(sk_x509, i)
1676 self.openssl_assert(x509 != self._ffi.NULL)
1677 x509 = self._ffi.gc(x509, self._lib.X509_free)
1678 addl_cert = self._ossl2cert(x509)
1679 addl_name = None
1680 maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL)
1681 if maybe_name != self._ffi.NULL:
1682 addl_name = self._ffi.string(maybe_name)
1683 additional_certificates.append(
1684 PKCS12Certificate(addl_cert, addl_name)
1685 )
1687 return PKCS12KeyAndCertificates(key, cert, additional_certificates)
1689 def serialize_key_and_certificates_to_pkcs12(
1690 self,
1691 name: typing.Optional[bytes],
1692 key: typing.Optional[PKCS12PrivateKeyTypes],
1693 cert: typing.Optional[x509.Certificate],
1694 cas: typing.Optional[typing.List[_PKCS12CATypes]],
1695 encryption_algorithm: serialization.KeySerializationEncryption,
1696 ) -> bytes:
1697 password = None
1698 if name is not None:
1699 utils._check_bytes("name", name)
1701 if isinstance(encryption_algorithm, serialization.NoEncryption):
1702 nid_cert = -1
1703 nid_key = -1
1704 pkcs12_iter = 0
1705 mac_iter = 0
1706 mac_alg = self._ffi.NULL
1707 elif isinstance(
1708 encryption_algorithm, serialization.BestAvailableEncryption
1709 ):
1710 # PKCS12 encryption is hopeless trash and can never be fixed.
1711 # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so
1712 # we use PBESv1 with 3DES on the older paths.
1713 if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
1714 nid_cert = self._lib.NID_aes_256_cbc
1715 nid_key = self._lib.NID_aes_256_cbc
1716 else:
1717 nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
1718 nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
1719 # At least we can set this higher than OpenSSL's default
1720 pkcs12_iter = 20000
1721 # mac_iter chosen for compatibility reasons, see:
1722 # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
1723 # Did we mention how lousy PKCS12 encryption is?
1724 mac_iter = 1
1725 # MAC algorithm can only be set on OpenSSL 3.0.0+
1726 mac_alg = self._ffi.NULL
1727 password = encryption_algorithm.password
1728 elif (
1729 isinstance(
1730 encryption_algorithm, serialization._KeySerializationEncryption
1731 )
1732 and encryption_algorithm._format
1733 is serialization.PrivateFormat.PKCS12
1734 ):
1735 # Default to OpenSSL's defaults. Behavior will vary based on the
1736 # version of OpenSSL cryptography is compiled against.
1737 nid_cert = 0
1738 nid_key = 0
1739 # Use the default iters we use in best available
1740 pkcs12_iter = 20000
1741 # See the Best Available comment for why this is 1
1742 mac_iter = 1
1743 password = encryption_algorithm.password
1744 keycertalg = encryption_algorithm._key_cert_algorithm
1745 if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC:
1746 nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
1747 nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
1748 elif keycertalg is PBES.PBESv2SHA256AndAES256CBC:
1749 if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
1750 raise UnsupportedAlgorithm(
1751 "PBESv2 is not supported by this version of OpenSSL"
1752 )
1753 nid_cert = self._lib.NID_aes_256_cbc
1754 nid_key = self._lib.NID_aes_256_cbc
1755 else:
1756 assert keycertalg is None
1757 # We use OpenSSL's defaults
1759 if encryption_algorithm._hmac_hash is not None:
1760 if not self._lib.Cryptography_HAS_PKCS12_SET_MAC:
1761 raise UnsupportedAlgorithm(
1762 "Setting MAC algorithm is not supported by this "
1763 "version of OpenSSL."
1764 )
1765 mac_alg = self._evp_md_non_null_from_algorithm(
1766 encryption_algorithm._hmac_hash
1767 )
1768 self.openssl_assert(mac_alg != self._ffi.NULL)
1769 else:
1770 mac_alg = self._ffi.NULL
1772 if encryption_algorithm._kdf_rounds is not None:
1773 pkcs12_iter = encryption_algorithm._kdf_rounds
1775 else:
1776 raise ValueError("Unsupported key encryption type")
1778 if cas is None or len(cas) == 0:
1779 sk_x509 = self._ffi.NULL
1780 else:
1781 sk_x509 = self._lib.sk_X509_new_null()
1782 sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)
1784 # This list is to keep the x509 values alive until end of function
1785 ossl_cas = []
1786 for ca in cas:
1787 if isinstance(ca, PKCS12Certificate):
1788 ca_alias = ca.friendly_name
1789 ossl_ca = self._cert2ossl(ca.certificate)
1790 if ca_alias is None:
1791 res = self._lib.X509_alias_set1(
1792 ossl_ca, self._ffi.NULL, -1
1793 )
1794 else:
1795 res = self._lib.X509_alias_set1(
1796 ossl_ca, ca_alias, len(ca_alias)
1797 )
1798 self.openssl_assert(res == 1)
1799 else:
1800 ossl_ca = self._cert2ossl(ca)
1801 ossl_cas.append(ossl_ca)
1802 res = self._lib.sk_X509_push(sk_x509, ossl_ca)
1803 backend.openssl_assert(res >= 1)
1805 with self._zeroed_null_terminated_buf(password) as password_buf:
1806 with self._zeroed_null_terminated_buf(name) as name_buf:
1807 ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
1808 ossl_pkey = (
1809 self._key2ossl(key) if key is not None else self._ffi.NULL
1810 )
1812 p12 = self._lib.PKCS12_create(
1813 password_buf,
1814 name_buf,
1815 ossl_pkey,
1816 ossl_cert,
1817 sk_x509,
1818 nid_key,
1819 nid_cert,
1820 pkcs12_iter,
1821 mac_iter,
1822 0,
1823 )
1825 if (
1826 self._lib.Cryptography_HAS_PKCS12_SET_MAC
1827 and mac_alg != self._ffi.NULL
1828 ):
1829 self._lib.PKCS12_set_mac(
1830 p12,
1831 password_buf,
1832 -1,
1833 self._ffi.NULL,
1834 0,
1835 mac_iter,
1836 mac_alg,
1837 )
1839 self.openssl_assert(p12 != self._ffi.NULL)
1840 p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
1842 bio = self._create_mem_bio_gc()
1843 res = self._lib.i2d_PKCS12_bio(bio, p12)
1844 self.openssl_assert(res > 0)
1845 return self._read_mem_bio(bio)
1847 def poly1305_supported(self) -> bool:
1848 if self._fips_enabled:
1849 return False
1850 return self._lib.Cryptography_HAS_POLY1305 == 1
1852 def pkcs7_supported(self) -> bool:
1853 return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1855 def load_pem_pkcs7_certificates(
1856 self, data: bytes
1857 ) -> typing.List[x509.Certificate]:
1858 utils._check_bytes("data", data)
1859 bio = self._bytes_to_bio(data)
1860 p7 = self._lib.PEM_read_bio_PKCS7(
1861 bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1862 )
1863 if p7 == self._ffi.NULL:
1864 self._consume_errors()
1865 raise ValueError("Unable to parse PKCS7 data")
1867 p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
1868 return self._load_pkcs7_certificates(p7)
1870 def load_der_pkcs7_certificates(
1871 self, data: bytes
1872 ) -> typing.List[x509.Certificate]:
1873 utils._check_bytes("data", data)
1874 bio = self._bytes_to_bio(data)
1875 p7 = self._lib.d2i_PKCS7_bio(bio.bio, self._ffi.NULL)
1876 if p7 == self._ffi.NULL:
1877 self._consume_errors()
1878 raise ValueError("Unable to parse PKCS7 data")
1880 p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
1881 return self._load_pkcs7_certificates(p7)
1883 def _load_pkcs7_certificates(self, p7) -> typing.List[x509.Certificate]:
1884 nid = self._lib.OBJ_obj2nid(p7.type)
1885 self.openssl_assert(nid != self._lib.NID_undef)
1886 if nid != self._lib.NID_pkcs7_signed:
1887 raise UnsupportedAlgorithm(
1888 "Only basic signed structures are currently supported. NID"
1889 " for this data was {}".format(nid),
1890 _Reasons.UNSUPPORTED_SERIALIZATION,
1891 )
1893 sk_x509 = p7.d.sign.cert
1894 num = self._lib.sk_X509_num(sk_x509)
1895 certs = []
1896 for i in range(num):
1897 x509 = self._lib.sk_X509_value(sk_x509, i)
1898 self.openssl_assert(x509 != self._ffi.NULL)
1899 cert = self._ossl2cert(x509)
1900 certs.append(cert)
1902 return certs
1905class GetCipherByName:
1906 def __init__(self, fmt: str):
1907 self._fmt = fmt
1909 def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
1910 cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
1911 evp_cipher = backend._lib.EVP_get_cipherbyname(
1912 cipher_name.encode("ascii")
1913 )
1915 # try EVP_CIPHER_fetch if present
1916 if (
1917 evp_cipher == backend._ffi.NULL
1918 and backend._lib.Cryptography_HAS_300_EVP_CIPHER
1919 ):
1920 evp_cipher = backend._lib.EVP_CIPHER_fetch(
1921 backend._ffi.NULL,
1922 cipher_name.encode("ascii"),
1923 backend._ffi.NULL,
1924 )
1926 backend._consume_errors()
1927 return evp_cipher
1930def _get_xts_cipher(backend: Backend, cipher: AES, mode):
1931 cipher_name = f"aes-{cipher.key_size // 2}-xts"
1932 return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
1935backend = Backend()