Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/backend.py: 24%
1231 statements
« prev ^ index » next coverage.py v7.2.1, created at 2023-03-14 06:36 +0000
« prev ^ index » next coverage.py v7.2.1, created at 2023-03-14 06:36 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
6import collections
7import contextlib
8import itertools
9import typing
10import warnings
11from contextlib import contextmanager
13from cryptography import utils, x509
14from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
15from cryptography.hazmat.backends.openssl import aead
16from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
17from cryptography.hazmat.backends.openssl.cmac import _CMACContext
18from cryptography.hazmat.backends.openssl.dh import (
19 _dh_params_dup,
20 _DHParameters,
21 _DHPrivateKey,
22 _DHPublicKey,
23)
24from cryptography.hazmat.backends.openssl.dsa import (
25 _DSAParameters,
26 _DSAPrivateKey,
27 _DSAPublicKey,
28)
29from cryptography.hazmat.backends.openssl.ec import (
30 _EllipticCurvePrivateKey,
31 _EllipticCurvePublicKey,
32)
33from cryptography.hazmat.backends.openssl.ed448 import (
34 _ED448_KEY_SIZE,
35 _Ed448PrivateKey,
36 _Ed448PublicKey,
37)
38from cryptography.hazmat.backends.openssl.ed25519 import (
39 _Ed25519PrivateKey,
40 _Ed25519PublicKey,
41)
42from cryptography.hazmat.backends.openssl.hashes import _HashContext
43from cryptography.hazmat.backends.openssl.hmac import _HMACContext
44from cryptography.hazmat.backends.openssl.poly1305 import (
45 _POLY1305_KEY_SIZE,
46 _Poly1305Context,
47)
48from cryptography.hazmat.backends.openssl.rsa import (
49 _RSAPrivateKey,
50 _RSAPublicKey,
51)
52from cryptography.hazmat.backends.openssl.x448 import (
53 _X448PrivateKey,
54 _X448PublicKey,
55)
56from cryptography.hazmat.backends.openssl.x25519 import (
57 _X25519PrivateKey,
58 _X25519PublicKey,
59)
60from cryptography.hazmat.bindings.openssl import binding
61from cryptography.hazmat.primitives import hashes, serialization
62from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding
63from cryptography.hazmat.primitives.asymmetric import (
64 dh,
65 dsa,
66 ec,
67 ed448,
68 ed25519,
69 rsa,
70 x448,
71 x25519,
72)
73from cryptography.hazmat.primitives.asymmetric.padding import (
74 MGF1,
75 OAEP,
76 PSS,
77 PKCS1v15,
78)
79from cryptography.hazmat.primitives.asymmetric.types import (
80 PrivateKeyTypes,
81 PublicKeyTypes,
82)
83from cryptography.hazmat.primitives.ciphers import (
84 BlockCipherAlgorithm,
85 CipherAlgorithm,
86)
87from cryptography.hazmat.primitives.ciphers.algorithms import (
88 AES,
89 AES128,
90 AES256,
91 ARC4,
92 SM4,
93 Camellia,
94 ChaCha20,
95 TripleDES,
96 _BlowfishInternal,
97 _CAST5Internal,
98 _IDEAInternal,
99 _SEEDInternal,
100)
101from cryptography.hazmat.primitives.ciphers.modes import (
102 CBC,
103 CFB,
104 CFB8,
105 CTR,
106 ECB,
107 GCM,
108 OFB,
109 XTS,
110 Mode,
111)
112from cryptography.hazmat.primitives.kdf import scrypt
113from cryptography.hazmat.primitives.serialization import ssh
114from cryptography.hazmat.primitives.serialization.pkcs12 import (
115 PBES,
116 PKCS12Certificate,
117 PKCS12KeyAndCertificates,
118 PKCS12PrivateKeyTypes,
119 _PKCS12CATypes,
120)
122_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
125# Not actually supported, just used as a marker for some serialization tests.
126class _RC2:
127 pass
130class Backend:
131 """
132 OpenSSL API binding interfaces.
133 """
135 name = "openssl"
137 # FIPS has opinions about acceptable algorithms and key sizes, but the
138 # disallowed algorithms are still present in OpenSSL. They just error if
139 # you try to use them. To avoid that we allowlist the algorithms in
140 # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
141 _fips_aead = {
142 b"aes-128-ccm",
143 b"aes-192-ccm",
144 b"aes-256-ccm",
145 b"aes-128-gcm",
146 b"aes-192-gcm",
147 b"aes-256-gcm",
148 }
149 # TripleDES encryption is disallowed/deprecated throughout 2023 in
150 # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA).
151 _fips_ciphers = (AES,)
152 # Sometimes SHA1 is still permissible. That logic is contained
153 # within the various *_supported methods.
154 _fips_hashes = (
155 hashes.SHA224,
156 hashes.SHA256,
157 hashes.SHA384,
158 hashes.SHA512,
159 hashes.SHA512_224,
160 hashes.SHA512_256,
161 hashes.SHA3_224,
162 hashes.SHA3_256,
163 hashes.SHA3_384,
164 hashes.SHA3_512,
165 hashes.SHAKE128,
166 hashes.SHAKE256,
167 )
168 _fips_ecdh_curves = (
169 ec.SECP224R1,
170 ec.SECP256R1,
171 ec.SECP384R1,
172 ec.SECP521R1,
173 )
174 _fips_rsa_min_key_size = 2048
175 _fips_rsa_min_public_exponent = 65537
176 _fips_dsa_min_modulus = 1 << 2048
177 _fips_dh_min_key_size = 2048
178 _fips_dh_min_modulus = 1 << _fips_dh_min_key_size
180 def __init__(self) -> None:
181 self._binding = binding.Binding()
182 self._ffi = self._binding.ffi
183 self._lib = self._binding.lib
184 self._fips_enabled = self._is_fips_enabled()
186 self._cipher_registry: typing.Dict[
187 typing.Tuple[typing.Type[CipherAlgorithm], typing.Type[Mode]],
188 typing.Callable,
189 ] = {}
190 self._register_default_ciphers()
191 if self._fips_enabled and self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
192 warnings.warn(
193 "OpenSSL FIPS mode is enabled. Can't enable DRBG fork safety.",
194 UserWarning,
195 )
196 else:
197 self.activate_osrandom_engine()
198 self._dh_types = [self._lib.EVP_PKEY_DH]
199 if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
200 self._dh_types.append(self._lib.EVP_PKEY_DHX)
202 def __repr__(self) -> str:
203 return "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>".format(
204 self.openssl_version_text(),
205 self._fips_enabled,
206 self._binding._legacy_provider_loaded,
207 )
209 def openssl_assert(
210 self,
211 ok: bool,
212 errors: typing.Optional[typing.List[binding._OpenSSLError]] = None,
213 ) -> None:
214 return binding._openssl_assert(self._lib, ok, errors=errors)
216 def _is_fips_enabled(self) -> bool:
217 if self._lib.Cryptography_HAS_300_FIPS:
218 mode = self._lib.EVP_default_properties_is_fips_enabled(
219 self._ffi.NULL
220 )
221 else:
222 mode = self._lib.FIPS_mode()
224 if mode == 0:
225 # OpenSSL without FIPS pushes an error on the error stack
226 self._lib.ERR_clear_error()
227 return bool(mode)
229 def _enable_fips(self) -> None:
230 # This function enables FIPS mode for OpenSSL 3.0.0 on installs that
231 # have the FIPS provider installed properly.
232 self._binding._enable_fips()
233 assert self._is_fips_enabled()
234 self._fips_enabled = self._is_fips_enabled()
236 def activate_builtin_random(self) -> None:
237 if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
238 # Obtain a new structural reference.
239 e = self._lib.ENGINE_get_default_RAND()
240 if e != self._ffi.NULL:
241 self._lib.ENGINE_unregister_RAND(e)
242 # Reset the RNG to use the built-in.
243 res = self._lib.RAND_set_rand_method(self._ffi.NULL)
244 self.openssl_assert(res == 1)
245 # decrement the structural reference from get_default_RAND
246 res = self._lib.ENGINE_finish(e)
247 self.openssl_assert(res == 1)
249 @contextlib.contextmanager
250 def _get_osurandom_engine(self):
251 # Fetches an engine by id and returns it. This creates a structural
252 # reference.
253 e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id)
254 self.openssl_assert(e != self._ffi.NULL)
255 # Initialize the engine for use. This adds a functional reference.
256 res = self._lib.ENGINE_init(e)
257 self.openssl_assert(res == 1)
259 try:
260 yield e
261 finally:
262 # Decrement the structural ref incremented by ENGINE_by_id.
263 res = self._lib.ENGINE_free(e)
264 self.openssl_assert(res == 1)
265 # Decrement the functional ref incremented by ENGINE_init.
266 res = self._lib.ENGINE_finish(e)
267 self.openssl_assert(res == 1)
269 def activate_osrandom_engine(self) -> None:
270 if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
271 # Unregister and free the current engine.
272 self.activate_builtin_random()
273 with self._get_osurandom_engine() as e:
274 # Set the engine as the default RAND provider.
275 res = self._lib.ENGINE_set_default_RAND(e)
276 self.openssl_assert(res == 1)
277 # Reset the RNG to use the engine
278 res = self._lib.RAND_set_rand_method(self._ffi.NULL)
279 self.openssl_assert(res == 1)
281 def osrandom_engine_implementation(self) -> str:
282 buf = self._ffi.new("char[]", 64)
283 with self._get_osurandom_engine() as e:
284 res = self._lib.ENGINE_ctrl_cmd(
285 e, b"get_implementation", len(buf), buf, self._ffi.NULL, 0
286 )
287 self.openssl_assert(res > 0)
288 return self._ffi.string(buf).decode("ascii")
290 def openssl_version_text(self) -> str:
291 """
292 Friendly string name of the loaded OpenSSL library. This is not
293 necessarily the same version as it was compiled against.
295 Example: OpenSSL 1.1.1d 10 Sep 2019
296 """
297 return self._ffi.string(
298 self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
299 ).decode("ascii")
301 def openssl_version_number(self) -> int:
302 return self._lib.OpenSSL_version_num()
304 def create_hmac_ctx(
305 self, key: bytes, algorithm: hashes.HashAlgorithm
306 ) -> _HMACContext:
307 return _HMACContext(self, key, algorithm)
309 def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm):
310 if algorithm.name == "blake2b" or algorithm.name == "blake2s":
311 alg = "{}{}".format(
312 algorithm.name, algorithm.digest_size * 8
313 ).encode("ascii")
314 else:
315 alg = algorithm.name.encode("ascii")
317 evp_md = self._lib.EVP_get_digestbyname(alg)
318 return evp_md
320 def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm):
321 evp_md = self._evp_md_from_algorithm(algorithm)
322 self.openssl_assert(evp_md != self._ffi.NULL)
323 return evp_md
325 def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
326 if self._fips_enabled and not isinstance(algorithm, self._fips_hashes):
327 return False
329 evp_md = self._evp_md_from_algorithm(algorithm)
330 return evp_md != self._ffi.NULL
332 def signature_hash_supported(
333 self, algorithm: hashes.HashAlgorithm
334 ) -> bool:
335 # Dedicated check for hashing algorithm use in message digest for
336 # signatures, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption).
337 if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
338 return False
339 return self.hash_supported(algorithm)
341 def scrypt_supported(self) -> bool:
342 if self._fips_enabled:
343 return False
344 else:
345 return self._lib.Cryptography_HAS_SCRYPT == 1
347 def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
348 # FIPS mode still allows SHA1 for HMAC
349 if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
350 return True
352 return self.hash_supported(algorithm)
354 def create_hash_ctx(
355 self, algorithm: hashes.HashAlgorithm
356 ) -> hashes.HashContext:
357 return _HashContext(self, algorithm)
359 def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
360 if self._fips_enabled:
361 # FIPS mode requires AES. TripleDES is disallowed/deprecated in
362 # FIPS 140-3.
363 if not isinstance(cipher, self._fips_ciphers):
364 return False
366 try:
367 adapter = self._cipher_registry[type(cipher), type(mode)]
368 except KeyError:
369 return False
370 evp_cipher = adapter(self, cipher, mode)
371 return self._ffi.NULL != evp_cipher
373 def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
374 if (cipher_cls, mode_cls) in self._cipher_registry:
375 raise ValueError(
376 "Duplicate registration for: {} {}.".format(
377 cipher_cls, mode_cls
378 )
379 )
380 self._cipher_registry[cipher_cls, mode_cls] = adapter
382 def _register_default_ciphers(self) -> None:
383 for cipher_cls in [AES, AES128, AES256]:
384 for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
385 self.register_cipher_adapter(
386 cipher_cls,
387 mode_cls,
388 GetCipherByName(
389 "{cipher.name}-{cipher.key_size}-{mode.name}"
390 ),
391 )
392 for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
393 self.register_cipher_adapter(
394 Camellia,
395 mode_cls,
396 GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
397 )
398 for mode_cls in [CBC, CFB, CFB8, OFB]:
399 self.register_cipher_adapter(
400 TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
401 )
402 self.register_cipher_adapter(
403 TripleDES, ECB, GetCipherByName("des-ede3")
404 )
405 self.register_cipher_adapter(
406 ChaCha20, type(None), GetCipherByName("chacha20")
407 )
408 self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
409 for mode_cls in [ECB, CBC, OFB, CFB, CTR]:
410 self.register_cipher_adapter(
411 SM4, mode_cls, GetCipherByName("sm4-{mode.name}")
412 )
413 # Don't register legacy ciphers if they're unavailable. Hypothetically
414 # this wouldn't be necessary because we test availability by seeing if
415 # we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3
416 # will return a valid pointer even though the cipher is unavailable.
417 if (
418 self._binding._legacy_provider_loaded
419 or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
420 ):
421 for mode_cls in [CBC, CFB, OFB, ECB]:
422 self.register_cipher_adapter(
423 _BlowfishInternal,
424 mode_cls,
425 GetCipherByName("bf-{mode.name}"),
426 )
427 for mode_cls in [CBC, CFB, OFB, ECB]:
428 self.register_cipher_adapter(
429 _SEEDInternal,
430 mode_cls,
431 GetCipherByName("seed-{mode.name}"),
432 )
433 for cipher_cls, mode_cls in itertools.product(
434 [_CAST5Internal, _IDEAInternal],
435 [CBC, OFB, CFB, ECB],
436 ):
437 self.register_cipher_adapter(
438 cipher_cls,
439 mode_cls,
440 GetCipherByName("{cipher.name}-{mode.name}"),
441 )
442 self.register_cipher_adapter(
443 ARC4, type(None), GetCipherByName("rc4")
444 )
445 # We don't actually support RC2, this is just used by some tests.
446 self.register_cipher_adapter(
447 _RC2, type(None), GetCipherByName("rc2")
448 )
450 def create_symmetric_encryption_ctx(
451 self, cipher: CipherAlgorithm, mode: Mode
452 ) -> _CipherContext:
453 return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
455 def create_symmetric_decryption_ctx(
456 self, cipher: CipherAlgorithm, mode: Mode
457 ) -> _CipherContext:
458 return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
460 def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
461 return self.hmac_supported(algorithm)
463 def derive_pbkdf2_hmac(
464 self,
465 algorithm: hashes.HashAlgorithm,
466 length: int,
467 salt: bytes,
468 iterations: int,
469 key_material: bytes,
470 ) -> bytes:
471 buf = self._ffi.new("unsigned char[]", length)
472 evp_md = self._evp_md_non_null_from_algorithm(algorithm)
473 key_material_ptr = self._ffi.from_buffer(key_material)
474 res = self._lib.PKCS5_PBKDF2_HMAC(
475 key_material_ptr,
476 len(key_material),
477 salt,
478 len(salt),
479 iterations,
480 evp_md,
481 length,
482 buf,
483 )
484 self.openssl_assert(res == 1)
485 return self._ffi.buffer(buf)[:]
487 def _consume_errors(self) -> typing.List[binding._OpenSSLError]:
488 return binding._consume_errors(self._lib)
490 def _consume_errors_with_text(
491 self,
492 ) -> typing.List[binding._OpenSSLErrorWithText]:
493 return binding._consume_errors_with_text(self._lib)
495 def _bn_to_int(self, bn) -> int:
496 assert bn != self._ffi.NULL
497 self.openssl_assert(not self._lib.BN_is_negative(bn))
499 bn_num_bytes = self._lib.BN_num_bytes(bn)
500 bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
501 bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
502 # A zero length means the BN has value 0
503 self.openssl_assert(bin_len >= 0)
504 val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
505 return val
507 def _int_to_bn(self, num: int, bn=None):
508 """
509 Converts a python integer to a BIGNUM. The returned BIGNUM will not
510 be garbage collected (to support adding them to structs that take
511 ownership of the object). Be sure to register it for GC if it will
512 be discarded after use.
513 """
514 assert bn is None or bn != self._ffi.NULL
516 if bn is None:
517 bn = self._ffi.NULL
519 binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
520 bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn)
521 self.openssl_assert(bn_ptr != self._ffi.NULL)
522 return bn_ptr
524 def generate_rsa_private_key(
525 self, public_exponent: int, key_size: int
526 ) -> rsa.RSAPrivateKey:
527 rsa._verify_rsa_parameters(public_exponent, key_size)
529 rsa_cdata = self._lib.RSA_new()
530 self.openssl_assert(rsa_cdata != self._ffi.NULL)
531 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
533 bn = self._int_to_bn(public_exponent)
534 bn = self._ffi.gc(bn, self._lib.BN_free)
536 res = self._lib.RSA_generate_key_ex(
537 rsa_cdata, key_size, bn, self._ffi.NULL
538 )
539 self.openssl_assert(res == 1)
540 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
542 # We can skip RSA key validation here since we just generated the key
543 return _RSAPrivateKey(
544 self, rsa_cdata, evp_pkey, unsafe_skip_rsa_key_validation=True
545 )
547 def generate_rsa_parameters_supported(
548 self, public_exponent: int, key_size: int
549 ) -> bool:
550 return (
551 public_exponent >= 3
552 and public_exponent & 1 != 0
553 and key_size >= 512
554 )
556 def load_rsa_private_numbers(
557 self,
558 numbers: rsa.RSAPrivateNumbers,
559 unsafe_skip_rsa_key_validation: bool,
560 ) -> rsa.RSAPrivateKey:
561 rsa._check_private_key_components(
562 numbers.p,
563 numbers.q,
564 numbers.d,
565 numbers.dmp1,
566 numbers.dmq1,
567 numbers.iqmp,
568 numbers.public_numbers.e,
569 numbers.public_numbers.n,
570 )
571 rsa_cdata = self._lib.RSA_new()
572 self.openssl_assert(rsa_cdata != self._ffi.NULL)
573 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
574 p = self._int_to_bn(numbers.p)
575 q = self._int_to_bn(numbers.q)
576 d = self._int_to_bn(numbers.d)
577 dmp1 = self._int_to_bn(numbers.dmp1)
578 dmq1 = self._int_to_bn(numbers.dmq1)
579 iqmp = self._int_to_bn(numbers.iqmp)
580 e = self._int_to_bn(numbers.public_numbers.e)
581 n = self._int_to_bn(numbers.public_numbers.n)
582 res = self._lib.RSA_set0_factors(rsa_cdata, p, q)
583 self.openssl_assert(res == 1)
584 res = self._lib.RSA_set0_key(rsa_cdata, n, e, d)
585 self.openssl_assert(res == 1)
586 res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp)
587 self.openssl_assert(res == 1)
588 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
590 return _RSAPrivateKey(
591 self,
592 rsa_cdata,
593 evp_pkey,
594 unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
595 )
597 def load_rsa_public_numbers(
598 self, numbers: rsa.RSAPublicNumbers
599 ) -> rsa.RSAPublicKey:
600 rsa._check_public_key_components(numbers.e, numbers.n)
601 rsa_cdata = self._lib.RSA_new()
602 self.openssl_assert(rsa_cdata != self._ffi.NULL)
603 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
604 e = self._int_to_bn(numbers.e)
605 n = self._int_to_bn(numbers.n)
606 res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL)
607 self.openssl_assert(res == 1)
608 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
610 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
612 def _create_evp_pkey_gc(self):
613 evp_pkey = self._lib.EVP_PKEY_new()
614 self.openssl_assert(evp_pkey != self._ffi.NULL)
615 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
616 return evp_pkey
618 def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
619 evp_pkey = self._create_evp_pkey_gc()
620 res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata)
621 self.openssl_assert(res == 1)
622 return evp_pkey
624 def _bytes_to_bio(self, data: bytes) -> _MemoryBIO:
625 """
626 Return a _MemoryBIO namedtuple of (BIO, char*).
628 The char* is the storage for the BIO and it must stay alive until the
629 BIO is finished with.
630 """
631 data_ptr = self._ffi.from_buffer(data)
632 bio = self._lib.BIO_new_mem_buf(data_ptr, len(data))
633 self.openssl_assert(bio != self._ffi.NULL)
635 return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr)
637 def _create_mem_bio_gc(self):
638 """
639 Creates an empty memory BIO.
640 """
641 bio_method = self._lib.BIO_s_mem()
642 self.openssl_assert(bio_method != self._ffi.NULL)
643 bio = self._lib.BIO_new(bio_method)
644 self.openssl_assert(bio != self._ffi.NULL)
645 bio = self._ffi.gc(bio, self._lib.BIO_free)
646 return bio
648 def _read_mem_bio(self, bio) -> bytes:
649 """
650 Reads a memory BIO. This only works on memory BIOs.
651 """
652 buf = self._ffi.new("char **")
653 buf_len = self._lib.BIO_get_mem_data(bio, buf)
654 self.openssl_assert(buf_len > 0)
655 self.openssl_assert(buf[0] != self._ffi.NULL)
656 bio_data = self._ffi.buffer(buf[0], buf_len)[:]
657 return bio_data
659 def _evp_pkey_to_private_key(
660 self, evp_pkey, unsafe_skip_rsa_key_validation: bool
661 ) -> PrivateKeyTypes:
662 """
663 Return the appropriate type of PrivateKey given an evp_pkey cdata
664 pointer.
665 """
667 key_type = self._lib.EVP_PKEY_id(evp_pkey)
669 if key_type == self._lib.EVP_PKEY_RSA:
670 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
671 self.openssl_assert(rsa_cdata != self._ffi.NULL)
672 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
673 return _RSAPrivateKey(
674 self,
675 rsa_cdata,
676 evp_pkey,
677 unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
678 )
679 elif (
680 key_type == self._lib.EVP_PKEY_RSA_PSS
681 and not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
682 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
683 and not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
684 ):
685 # At the moment the way we handle RSA PSS keys is to strip the
686 # PSS constraints from them and treat them as normal RSA keys
687 # Unfortunately the RSA * itself tracks this data so we need to
688 # extract, serialize, and reload it without the constraints.
689 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
690 self.openssl_assert(rsa_cdata != self._ffi.NULL)
691 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
692 bio = self._create_mem_bio_gc()
693 res = self._lib.i2d_RSAPrivateKey_bio(bio, rsa_cdata)
694 self.openssl_assert(res == 1)
695 return self.load_der_private_key(
696 self._read_mem_bio(bio),
697 password=None,
698 unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
699 )
700 elif key_type == self._lib.EVP_PKEY_DSA:
701 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
702 self.openssl_assert(dsa_cdata != self._ffi.NULL)
703 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
704 return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
705 elif key_type == self._lib.EVP_PKEY_EC:
706 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
707 self.openssl_assert(ec_cdata != self._ffi.NULL)
708 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
709 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
710 elif key_type in self._dh_types:
711 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
712 self.openssl_assert(dh_cdata != self._ffi.NULL)
713 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
714 return _DHPrivateKey(self, dh_cdata, evp_pkey)
715 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
716 # EVP_PKEY_ED25519 is not present in CRYPTOGRAPHY_IS_LIBRESSL
717 return _Ed25519PrivateKey(self, evp_pkey)
718 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
719 # EVP_PKEY_X448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
720 return _X448PrivateKey(self, evp_pkey)
721 elif key_type == self._lib.EVP_PKEY_X25519:
722 return _X25519PrivateKey(self, evp_pkey)
723 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
724 # EVP_PKEY_ED448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
725 return _Ed448PrivateKey(self, evp_pkey)
726 else:
727 raise UnsupportedAlgorithm("Unsupported key type.")
729 def _evp_pkey_to_public_key(self, evp_pkey) -> PublicKeyTypes:
730 """
731 Return the appropriate type of PublicKey given an evp_pkey cdata
732 pointer.
733 """
735 key_type = self._lib.EVP_PKEY_id(evp_pkey)
737 if key_type == self._lib.EVP_PKEY_RSA:
738 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
739 self.openssl_assert(rsa_cdata != self._ffi.NULL)
740 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
741 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
742 elif (
743 key_type == self._lib.EVP_PKEY_RSA_PSS
744 and not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
745 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
746 and not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
747 ):
748 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
749 self.openssl_assert(rsa_cdata != self._ffi.NULL)
750 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
751 bio = self._create_mem_bio_gc()
752 res = self._lib.i2d_RSAPublicKey_bio(bio, rsa_cdata)
753 self.openssl_assert(res == 1)
754 return self.load_der_public_key(self._read_mem_bio(bio))
755 elif key_type == self._lib.EVP_PKEY_DSA:
756 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
757 self.openssl_assert(dsa_cdata != self._ffi.NULL)
758 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
759 return _DSAPublicKey(self, dsa_cdata, evp_pkey)
760 elif key_type == self._lib.EVP_PKEY_EC:
761 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
762 if ec_cdata == self._ffi.NULL:
763 errors = self._consume_errors_with_text()
764 raise ValueError("Unable to load EC key", errors)
765 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
766 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
767 elif key_type in self._dh_types:
768 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
769 self.openssl_assert(dh_cdata != self._ffi.NULL)
770 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
771 return _DHPublicKey(self, dh_cdata, evp_pkey)
772 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
773 # EVP_PKEY_ED25519 is not present in CRYPTOGRAPHY_IS_LIBRESSL
774 return _Ed25519PublicKey(self, evp_pkey)
775 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
776 # EVP_PKEY_X448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
777 return _X448PublicKey(self, evp_pkey)
778 elif key_type == self._lib.EVP_PKEY_X25519:
779 return _X25519PublicKey(self, evp_pkey)
780 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
781 # EVP_PKEY_ED448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
782 return _Ed448PublicKey(self, evp_pkey)
783 else:
784 raise UnsupportedAlgorithm("Unsupported key type.")
786 def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
787 if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
788 return False
790 return isinstance(
791 algorithm,
792 (
793 hashes.SHA1,
794 hashes.SHA224,
795 hashes.SHA256,
796 hashes.SHA384,
797 hashes.SHA512,
798 ),
799 )
801 def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool:
802 if isinstance(padding, PKCS1v15):
803 return True
804 elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
805 # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked
806 # as signature algorithm.
807 if self._fips_enabled and isinstance(
808 padding._mgf._algorithm, hashes.SHA1
809 ):
810 return True
811 else:
812 return self.hash_supported(padding._mgf._algorithm)
813 elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
814 return self._oaep_hash_supported(
815 padding._mgf._algorithm
816 ) and self._oaep_hash_supported(padding._algorithm)
817 else:
818 return False
820 def rsa_encryption_supported(self, padding: AsymmetricPadding) -> bool:
821 if self._fips_enabled and isinstance(padding, PKCS1v15):
822 return False
823 else:
824 return self.rsa_padding_supported(padding)
826 def generate_dsa_parameters(self, key_size: int) -> dsa.DSAParameters:
827 if key_size not in (1024, 2048, 3072, 4096):
828 raise ValueError(
829 "Key size must be 1024, 2048, 3072, or 4096 bits."
830 )
832 ctx = self._lib.DSA_new()
833 self.openssl_assert(ctx != self._ffi.NULL)
834 ctx = self._ffi.gc(ctx, self._lib.DSA_free)
836 res = self._lib.DSA_generate_parameters_ex(
837 ctx,
838 key_size,
839 self._ffi.NULL,
840 0,
841 self._ffi.NULL,
842 self._ffi.NULL,
843 self._ffi.NULL,
844 )
846 self.openssl_assert(res == 1)
848 return _DSAParameters(self, ctx)
850 def generate_dsa_private_key(
851 self, parameters: dsa.DSAParameters
852 ) -> dsa.DSAPrivateKey:
853 ctx = self._lib.DSAparams_dup(
854 parameters._dsa_cdata # type: ignore[attr-defined]
855 )
856 self.openssl_assert(ctx != self._ffi.NULL)
857 ctx = self._ffi.gc(ctx, self._lib.DSA_free)
858 self._lib.DSA_generate_key(ctx)
859 evp_pkey = self._dsa_cdata_to_evp_pkey(ctx)
861 return _DSAPrivateKey(self, ctx, evp_pkey)
863 def generate_dsa_private_key_and_parameters(
864 self, key_size: int
865 ) -> dsa.DSAPrivateKey:
866 parameters = self.generate_dsa_parameters(key_size)
867 return self.generate_dsa_private_key(parameters)
869 def _dsa_cdata_set_values(
870 self, dsa_cdata, p, q, g, pub_key, priv_key
871 ) -> None:
872 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
873 self.openssl_assert(res == 1)
874 res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key)
875 self.openssl_assert(res == 1)
877 def load_dsa_private_numbers(
878 self, numbers: dsa.DSAPrivateNumbers
879 ) -> dsa.DSAPrivateKey:
880 dsa._check_dsa_private_numbers(numbers)
881 parameter_numbers = numbers.public_numbers.parameter_numbers
883 dsa_cdata = self._lib.DSA_new()
884 self.openssl_assert(dsa_cdata != self._ffi.NULL)
885 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
887 p = self._int_to_bn(parameter_numbers.p)
888 q = self._int_to_bn(parameter_numbers.q)
889 g = self._int_to_bn(parameter_numbers.g)
890 pub_key = self._int_to_bn(numbers.public_numbers.y)
891 priv_key = self._int_to_bn(numbers.x)
892 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
894 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
896 return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
898 def load_dsa_public_numbers(
899 self, numbers: dsa.DSAPublicNumbers
900 ) -> dsa.DSAPublicKey:
901 dsa._check_dsa_parameters(numbers.parameter_numbers)
902 dsa_cdata = self._lib.DSA_new()
903 self.openssl_assert(dsa_cdata != self._ffi.NULL)
904 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
906 p = self._int_to_bn(numbers.parameter_numbers.p)
907 q = self._int_to_bn(numbers.parameter_numbers.q)
908 g = self._int_to_bn(numbers.parameter_numbers.g)
909 pub_key = self._int_to_bn(numbers.y)
910 priv_key = self._ffi.NULL
911 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
913 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
915 return _DSAPublicKey(self, dsa_cdata, evp_pkey)
917 def load_dsa_parameter_numbers(
918 self, numbers: dsa.DSAParameterNumbers
919 ) -> dsa.DSAParameters:
920 dsa._check_dsa_parameters(numbers)
921 dsa_cdata = self._lib.DSA_new()
922 self.openssl_assert(dsa_cdata != self._ffi.NULL)
923 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
925 p = self._int_to_bn(numbers.p)
926 q = self._int_to_bn(numbers.q)
927 g = self._int_to_bn(numbers.g)
928 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
929 self.openssl_assert(res == 1)
931 return _DSAParameters(self, dsa_cdata)
933 def _dsa_cdata_to_evp_pkey(self, dsa_cdata):
934 evp_pkey = self._create_evp_pkey_gc()
935 res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata)
936 self.openssl_assert(res == 1)
937 return evp_pkey
939 def dsa_supported(self) -> bool:
940 return not self._fips_enabled
942 def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
943 if not self.dsa_supported():
944 return False
945 return self.signature_hash_supported(algorithm)
947 def cmac_algorithm_supported(self, algorithm) -> bool:
948 return self.cipher_supported(
949 algorithm, CBC(b"\x00" * algorithm.block_size)
950 )
952 def create_cmac_ctx(self, algorithm: BlockCipherAlgorithm) -> _CMACContext:
953 return _CMACContext(self, algorithm)
955 def load_pem_private_key(
956 self,
957 data: bytes,
958 password: typing.Optional[bytes],
959 unsafe_skip_rsa_key_validation: bool,
960 ) -> PrivateKeyTypes:
961 return self._load_key(
962 self._lib.PEM_read_bio_PrivateKey,
963 data,
964 password,
965 unsafe_skip_rsa_key_validation,
966 )
968 def load_pem_public_key(self, data: bytes) -> PublicKeyTypes:
969 mem_bio = self._bytes_to_bio(data)
970 # In OpenSSL 3.0.x the PEM_read_bio_PUBKEY function will invoke
971 # the default password callback if you pass an encrypted private
972 # key. This is very, very, very bad as the default callback can
973 # trigger an interactive console prompt, which will hang the
974 # Python process. We therefore provide our own callback to
975 # catch this and error out properly.
976 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
977 evp_pkey = self._lib.PEM_read_bio_PUBKEY(
978 mem_bio.bio,
979 self._ffi.NULL,
980 self._ffi.addressof(
981 self._lib._original_lib, "Cryptography_pem_password_cb"
982 ),
983 userdata,
984 )
985 if evp_pkey != self._ffi.NULL:
986 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
987 return self._evp_pkey_to_public_key(evp_pkey)
988 else:
989 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
990 # need to check to see if it is a pure PKCS1 RSA public key (not
991 # embedded in a subjectPublicKeyInfo)
992 self._consume_errors()
993 res = self._lib.BIO_reset(mem_bio.bio)
994 self.openssl_assert(res == 1)
995 rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey(
996 mem_bio.bio,
997 self._ffi.NULL,
998 self._ffi.addressof(
999 self._lib._original_lib, "Cryptography_pem_password_cb"
1000 ),
1001 userdata,
1002 )
1003 if rsa_cdata != self._ffi.NULL:
1004 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
1005 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
1006 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
1007 else:
1008 self._handle_key_loading_error()
1010 def load_pem_parameters(self, data: bytes) -> dh.DHParameters:
1011 mem_bio = self._bytes_to_bio(data)
1012 # only DH is supported currently
1013 dh_cdata = self._lib.PEM_read_bio_DHparams(
1014 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1015 )
1016 if dh_cdata != self._ffi.NULL:
1017 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1018 return _DHParameters(self, dh_cdata)
1019 else:
1020 self._handle_key_loading_error()
1022 def load_der_private_key(
1023 self,
1024 data: bytes,
1025 password: typing.Optional[bytes],
1026 unsafe_skip_rsa_key_validation: bool,
1027 ) -> PrivateKeyTypes:
1028 # OpenSSL has a function called d2i_AutoPrivateKey that in theory
1029 # handles this automatically, however it doesn't handle encrypted
1030 # private keys. Instead we try to load the key two different ways.
1031 # First we'll try to load it as a traditional key.
1032 bio_data = self._bytes_to_bio(data)
1033 key = self._evp_pkey_from_der_traditional_key(bio_data, password)
1034 if key:
1035 return self._evp_pkey_to_private_key(
1036 key, unsafe_skip_rsa_key_validation
1037 )
1038 else:
1039 # Finally we try to load it with the method that handles encrypted
1040 # PKCS8 properly.
1041 return self._load_key(
1042 self._lib.d2i_PKCS8PrivateKey_bio,
1043 data,
1044 password,
1045 unsafe_skip_rsa_key_validation,
1046 )
1048 def _evp_pkey_from_der_traditional_key(self, bio_data, password):
1049 key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL)
1050 if key != self._ffi.NULL:
1051 key = self._ffi.gc(key, self._lib.EVP_PKEY_free)
1052 if password is not None:
1053 raise TypeError(
1054 "Password was given but private key is not encrypted."
1055 )
1057 return key
1058 else:
1059 self._consume_errors()
1060 return None
1062 def load_der_public_key(self, data: bytes) -> PublicKeyTypes:
1063 mem_bio = self._bytes_to_bio(data)
1064 evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL)
1065 if evp_pkey != self._ffi.NULL:
1066 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1067 return self._evp_pkey_to_public_key(evp_pkey)
1068 else:
1069 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
1070 # need to check to see if it is a pure PKCS1 RSA public key (not
1071 # embedded in a subjectPublicKeyInfo)
1072 self._consume_errors()
1073 res = self._lib.BIO_reset(mem_bio.bio)
1074 self.openssl_assert(res == 1)
1075 rsa_cdata = self._lib.d2i_RSAPublicKey_bio(
1076 mem_bio.bio, self._ffi.NULL
1077 )
1078 if rsa_cdata != self._ffi.NULL:
1079 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
1080 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
1081 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
1082 else:
1083 self._handle_key_loading_error()
1085 def load_der_parameters(self, data: bytes) -> dh.DHParameters:
1086 mem_bio = self._bytes_to_bio(data)
1087 dh_cdata = self._lib.d2i_DHparams_bio(mem_bio.bio, self._ffi.NULL)
1088 if dh_cdata != self._ffi.NULL:
1089 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1090 return _DHParameters(self, dh_cdata)
1091 elif self._lib.Cryptography_HAS_EVP_PKEY_DHX:
1092 # We check to see if the is dhx.
1093 self._consume_errors()
1094 res = self._lib.BIO_reset(mem_bio.bio)
1095 self.openssl_assert(res == 1)
1096 dh_cdata = self._lib.d2i_DHxparams_bio(mem_bio.bio, self._ffi.NULL)
1097 if dh_cdata != self._ffi.NULL:
1098 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1099 return _DHParameters(self, dh_cdata)
1101 self._handle_key_loading_error()
1103 def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
1104 data = cert.public_bytes(serialization.Encoding.DER)
1105 mem_bio = self._bytes_to_bio(data)
1106 x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
1107 self.openssl_assert(x509 != self._ffi.NULL)
1108 x509 = self._ffi.gc(x509, self._lib.X509_free)
1109 return x509
1111 def _ossl2cert(self, x509_ptr: typing.Any) -> x509.Certificate:
1112 bio = self._create_mem_bio_gc()
1113 res = self._lib.i2d_X509_bio(bio, x509_ptr)
1114 self.openssl_assert(res == 1)
1115 return x509.load_der_x509_certificate(self._read_mem_bio(bio))
1117 def _check_keys_correspond(self, key1, key2) -> None:
1118 if self._lib.EVP_PKEY_cmp(key1._evp_pkey, key2._evp_pkey) != 1:
1119 raise ValueError("Keys do not correspond")
1121 def _load_key(
1122 self, openssl_read_func, data, password, unsafe_skip_rsa_key_validation
1123 ) -> PrivateKeyTypes:
1124 mem_bio = self._bytes_to_bio(data)
1126 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
1127 if password is not None:
1128 utils._check_byteslike("password", password)
1129 password_ptr = self._ffi.from_buffer(password)
1130 userdata.password = password_ptr
1131 userdata.length = len(password)
1133 evp_pkey = openssl_read_func(
1134 mem_bio.bio,
1135 self._ffi.NULL,
1136 self._ffi.addressof(
1137 self._lib._original_lib, "Cryptography_pem_password_cb"
1138 ),
1139 userdata,
1140 )
1142 if evp_pkey == self._ffi.NULL:
1143 if userdata.error != 0:
1144 self._consume_errors()
1145 if userdata.error == -1:
1146 raise TypeError(
1147 "Password was not given but private key is encrypted"
1148 )
1149 else:
1150 assert userdata.error == -2
1151 raise ValueError(
1152 "Passwords longer than {} bytes are not supported "
1153 "by this backend.".format(userdata.maxsize - 1)
1154 )
1155 else:
1156 self._handle_key_loading_error()
1158 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1160 if password is not None and userdata.called == 0:
1161 raise TypeError(
1162 "Password was given but private key is not encrypted."
1163 )
1165 assert (
1166 password is not None and userdata.called == 1
1167 ) or password is None
1169 return self._evp_pkey_to_private_key(
1170 evp_pkey, unsafe_skip_rsa_key_validation
1171 )
1173 def _handle_key_loading_error(self) -> typing.NoReturn:
1174 errors = self._consume_errors()
1176 if not errors:
1177 raise ValueError(
1178 "Could not deserialize key data. The data may be in an "
1179 "incorrect format or it may be encrypted with an unsupported "
1180 "algorithm."
1181 )
1183 elif (
1184 errors[0]._lib_reason_match(
1185 self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
1186 )
1187 or errors[0]._lib_reason_match(
1188 self._lib.ERR_LIB_PKCS12,
1189 self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
1190 )
1191 or (
1192 self._lib.Cryptography_HAS_PROVIDERS
1193 and errors[0]._lib_reason_match(
1194 self._lib.ERR_LIB_PROV,
1195 self._lib.PROV_R_BAD_DECRYPT,
1196 )
1197 )
1198 ):
1199 raise ValueError("Bad decrypt. Incorrect password?")
1201 elif any(
1202 error._lib_reason_match(
1203 self._lib.ERR_LIB_EVP,
1204 self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
1205 )
1206 for error in errors
1207 ):
1208 raise ValueError("Unsupported public key algorithm.")
1210 else:
1211 errors_with_text = binding._errors_with_text(errors)
1212 raise ValueError(
1213 "Could not deserialize key data. The data may be in an "
1214 "incorrect format, it may be encrypted with an unsupported "
1215 "algorithm, or it may be an unsupported key type (e.g. EC "
1216 "curves with explicit parameters).",
1217 errors_with_text,
1218 )
1220 def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
1221 try:
1222 curve_nid = self._elliptic_curve_to_nid(curve)
1223 except UnsupportedAlgorithm:
1224 curve_nid = self._lib.NID_undef
1226 group = self._lib.EC_GROUP_new_by_curve_name(curve_nid)
1228 if group == self._ffi.NULL:
1229 self._consume_errors()
1230 return False
1231 else:
1232 self.openssl_assert(curve_nid != self._lib.NID_undef)
1233 self._lib.EC_GROUP_free(group)
1234 return True
1236 def elliptic_curve_signature_algorithm_supported(
1237 self,
1238 signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
1239 curve: ec.EllipticCurve,
1240 ) -> bool:
1241 # We only support ECDSA right now.
1242 if not isinstance(signature_algorithm, ec.ECDSA):
1243 return False
1245 return self.elliptic_curve_supported(curve)
1247 def generate_elliptic_curve_private_key(
1248 self, curve: ec.EllipticCurve
1249 ) -> ec.EllipticCurvePrivateKey:
1250 """
1251 Generate a new private key on the named curve.
1252 """
1254 if self.elliptic_curve_supported(curve):
1255 ec_cdata = self._ec_key_new_by_curve(curve)
1257 res = self._lib.EC_KEY_generate_key(ec_cdata)
1258 self.openssl_assert(res == 1)
1260 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1262 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1263 else:
1264 raise UnsupportedAlgorithm(
1265 f"Backend object does not support {curve.name}.",
1266 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
1267 )
1269 def load_elliptic_curve_private_numbers(
1270 self, numbers: ec.EllipticCurvePrivateNumbers
1271 ) -> ec.EllipticCurvePrivateKey:
1272 public = numbers.public_numbers
1274 ec_cdata = self._ec_key_new_by_curve(public.curve)
1276 private_value = self._ffi.gc(
1277 self._int_to_bn(numbers.private_value), self._lib.BN_clear_free
1278 )
1279 res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value)
1280 if res != 1:
1281 self._consume_errors()
1282 raise ValueError("Invalid EC key.")
1284 with self._tmp_bn_ctx() as bn_ctx:
1285 self._ec_key_set_public_key_affine_coordinates(
1286 ec_cdata, public.x, public.y, bn_ctx
1287 )
1288 # derive the expected public point and compare it to the one we
1289 # just set based on the values we were given. If they don't match
1290 # this isn't a valid key pair.
1291 group = self._lib.EC_KEY_get0_group(ec_cdata)
1292 self.openssl_assert(group != self._ffi.NULL)
1293 set_point = backend._lib.EC_KEY_get0_public_key(ec_cdata)
1294 self.openssl_assert(set_point != self._ffi.NULL)
1295 computed_point = self._lib.EC_POINT_new(group)
1296 self.openssl_assert(computed_point != self._ffi.NULL)
1297 computed_point = self._ffi.gc(
1298 computed_point, self._lib.EC_POINT_free
1299 )
1300 res = self._lib.EC_POINT_mul(
1301 group,
1302 computed_point,
1303 private_value,
1304 self._ffi.NULL,
1305 self._ffi.NULL,
1306 bn_ctx,
1307 )
1308 self.openssl_assert(res == 1)
1309 if (
1310 self._lib.EC_POINT_cmp(
1311 group, set_point, computed_point, bn_ctx
1312 )
1313 != 0
1314 ):
1315 raise ValueError("Invalid EC key.")
1317 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1319 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1321 def load_elliptic_curve_public_numbers(
1322 self, numbers: ec.EllipticCurvePublicNumbers
1323 ) -> ec.EllipticCurvePublicKey:
1324 ec_cdata = self._ec_key_new_by_curve(numbers.curve)
1325 with self._tmp_bn_ctx() as bn_ctx:
1326 self._ec_key_set_public_key_affine_coordinates(
1327 ec_cdata, numbers.x, numbers.y, bn_ctx
1328 )
1329 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1331 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
1333 def load_elliptic_curve_public_bytes(
1334 self, curve: ec.EllipticCurve, point_bytes: bytes
1335 ) -> ec.EllipticCurvePublicKey:
1336 ec_cdata = self._ec_key_new_by_curve(curve)
1337 group = self._lib.EC_KEY_get0_group(ec_cdata)
1338 self.openssl_assert(group != self._ffi.NULL)
1339 point = self._lib.EC_POINT_new(group)
1340 self.openssl_assert(point != self._ffi.NULL)
1341 point = self._ffi.gc(point, self._lib.EC_POINT_free)
1342 with self._tmp_bn_ctx() as bn_ctx:
1343 res = self._lib.EC_POINT_oct2point(
1344 group, point, point_bytes, len(point_bytes), bn_ctx
1345 )
1346 if res != 1:
1347 self._consume_errors()
1348 raise ValueError("Invalid public bytes for the given curve")
1350 res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1351 self.openssl_assert(res == 1)
1352 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1353 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
1355 def derive_elliptic_curve_private_key(
1356 self, private_value: int, curve: ec.EllipticCurve
1357 ) -> ec.EllipticCurvePrivateKey:
1358 ec_cdata = self._ec_key_new_by_curve(curve)
1360 group = self._lib.EC_KEY_get0_group(ec_cdata)
1361 self.openssl_assert(group != self._ffi.NULL)
1363 point = self._lib.EC_POINT_new(group)
1364 self.openssl_assert(point != self._ffi.NULL)
1365 point = self._ffi.gc(point, self._lib.EC_POINT_free)
1367 value = self._int_to_bn(private_value)
1368 value = self._ffi.gc(value, self._lib.BN_clear_free)
1370 with self._tmp_bn_ctx() as bn_ctx:
1371 res = self._lib.EC_POINT_mul(
1372 group, point, value, self._ffi.NULL, self._ffi.NULL, bn_ctx
1373 )
1374 self.openssl_assert(res == 1)
1376 bn_x = self._lib.BN_CTX_get(bn_ctx)
1377 bn_y = self._lib.BN_CTX_get(bn_ctx)
1379 res = self._lib.EC_POINT_get_affine_coordinates(
1380 group, point, bn_x, bn_y, bn_ctx
1381 )
1382 if res != 1:
1383 self._consume_errors()
1384 raise ValueError("Unable to derive key from private_value")
1386 res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1387 self.openssl_assert(res == 1)
1388 private = self._int_to_bn(private_value)
1389 private = self._ffi.gc(private, self._lib.BN_clear_free)
1390 res = self._lib.EC_KEY_set_private_key(ec_cdata, private)
1391 self.openssl_assert(res == 1)
1393 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1395 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1397 def _ec_key_new_by_curve(self, curve: ec.EllipticCurve):
1398 curve_nid = self._elliptic_curve_to_nid(curve)
1399 return self._ec_key_new_by_curve_nid(curve_nid)
1401 def _ec_key_new_by_curve_nid(self, curve_nid: int):
1402 ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
1403 self.openssl_assert(ec_cdata != self._ffi.NULL)
1404 return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
1406 def elliptic_curve_exchange_algorithm_supported(
1407 self, algorithm: ec.ECDH, curve: ec.EllipticCurve
1408 ) -> bool:
1409 if self._fips_enabled and not isinstance(
1410 curve, self._fips_ecdh_curves
1411 ):
1412 return False
1414 return self.elliptic_curve_supported(curve) and isinstance(
1415 algorithm, ec.ECDH
1416 )
1418 def _ec_cdata_to_evp_pkey(self, ec_cdata):
1419 evp_pkey = self._create_evp_pkey_gc()
1420 res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata)
1421 self.openssl_assert(res == 1)
1422 return evp_pkey
1424 def _elliptic_curve_to_nid(self, curve: ec.EllipticCurve) -> int:
1425 """
1426 Get the NID for a curve name.
1427 """
1429 curve_aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"}
1431 curve_name = curve_aliases.get(curve.name, curve.name)
1433 curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
1434 if curve_nid == self._lib.NID_undef:
1435 raise UnsupportedAlgorithm(
1436 f"{curve.name} is not a supported elliptic curve",
1437 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
1438 )
1439 return curve_nid
1441 @contextmanager
1442 def _tmp_bn_ctx(self):
1443 bn_ctx = self._lib.BN_CTX_new()
1444 self.openssl_assert(bn_ctx != self._ffi.NULL)
1445 bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free)
1446 self._lib.BN_CTX_start(bn_ctx)
1447 try:
1448 yield bn_ctx
1449 finally:
1450 self._lib.BN_CTX_end(bn_ctx)
1452 def _ec_key_set_public_key_affine_coordinates(
1453 self,
1454 ec_cdata,
1455 x: int,
1456 y: int,
1457 bn_ctx,
1458 ) -> None:
1459 """
1460 Sets the public key point in the EC_KEY context to the affine x and y
1461 values.
1462 """
1464 if x < 0 or y < 0:
1465 raise ValueError(
1466 "Invalid EC key. Both x and y must be non-negative."
1467 )
1469 x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free)
1470 y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free)
1471 group = self._lib.EC_KEY_get0_group(ec_cdata)
1472 self.openssl_assert(group != self._ffi.NULL)
1473 point = self._lib.EC_POINT_new(group)
1474 self.openssl_assert(point != self._ffi.NULL)
1475 point = self._ffi.gc(point, self._lib.EC_POINT_free)
1476 res = self._lib.EC_POINT_set_affine_coordinates(
1477 group, point, x, y, bn_ctx
1478 )
1479 if res != 1:
1480 self._consume_errors()
1481 raise ValueError("Invalid EC key.")
1482 res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1483 self.openssl_assert(res == 1)
1485 def _private_key_bytes(
1486 self,
1487 encoding: serialization.Encoding,
1488 format: serialization.PrivateFormat,
1489 encryption_algorithm: serialization.KeySerializationEncryption,
1490 key,
1491 evp_pkey,
1492 cdata,
1493 ) -> bytes:
1494 # validate argument types
1495 if not isinstance(encoding, serialization.Encoding):
1496 raise TypeError("encoding must be an item from the Encoding enum")
1497 if not isinstance(format, serialization.PrivateFormat):
1498 raise TypeError(
1499 "format must be an item from the PrivateFormat enum"
1500 )
1501 if not isinstance(
1502 encryption_algorithm, serialization.KeySerializationEncryption
1503 ):
1504 raise TypeError(
1505 "Encryption algorithm must be a KeySerializationEncryption "
1506 "instance"
1507 )
1509 # validate password
1510 if isinstance(encryption_algorithm, serialization.NoEncryption):
1511 password = b""
1512 elif isinstance(
1513 encryption_algorithm, serialization.BestAvailableEncryption
1514 ):
1515 password = encryption_algorithm.password
1516 if len(password) > 1023:
1517 raise ValueError(
1518 "Passwords longer than 1023 bytes are not supported by "
1519 "this backend"
1520 )
1521 elif (
1522 isinstance(
1523 encryption_algorithm, serialization._KeySerializationEncryption
1524 )
1525 and encryption_algorithm._format
1526 is format
1527 is serialization.PrivateFormat.OpenSSH
1528 ):
1529 password = encryption_algorithm.password
1530 else:
1531 raise ValueError("Unsupported encryption type")
1533 # PKCS8 + PEM/DER
1534 if format is serialization.PrivateFormat.PKCS8:
1535 if encoding is serialization.Encoding.PEM:
1536 write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey
1537 elif encoding is serialization.Encoding.DER:
1538 write_bio = self._lib.i2d_PKCS8PrivateKey_bio
1539 else:
1540 raise ValueError("Unsupported encoding for PKCS8")
1541 return self._private_key_bytes_via_bio(
1542 write_bio, evp_pkey, password
1543 )
1545 # TraditionalOpenSSL + PEM/DER
1546 if format is serialization.PrivateFormat.TraditionalOpenSSL:
1547 if self._fips_enabled and not isinstance(
1548 encryption_algorithm, serialization.NoEncryption
1549 ):
1550 raise ValueError(
1551 "Encrypted traditional OpenSSL format is not "
1552 "supported in FIPS mode."
1553 )
1554 key_type = self._lib.EVP_PKEY_id(evp_pkey)
1556 if encoding is serialization.Encoding.PEM:
1557 if key_type == self._lib.EVP_PKEY_RSA:
1558 write_bio = self._lib.PEM_write_bio_RSAPrivateKey
1559 elif key_type == self._lib.EVP_PKEY_DSA:
1560 write_bio = self._lib.PEM_write_bio_DSAPrivateKey
1561 elif key_type == self._lib.EVP_PKEY_EC:
1562 write_bio = self._lib.PEM_write_bio_ECPrivateKey
1563 else:
1564 raise ValueError(
1565 "Unsupported key type for TraditionalOpenSSL"
1566 )
1567 return self._private_key_bytes_via_bio(
1568 write_bio, cdata, password
1569 )
1571 if encoding is serialization.Encoding.DER:
1572 if password:
1573 raise ValueError(
1574 "Encryption is not supported for DER encoded "
1575 "traditional OpenSSL keys"
1576 )
1577 if key_type == self._lib.EVP_PKEY_RSA:
1578 write_bio = self._lib.i2d_RSAPrivateKey_bio
1579 elif key_type == self._lib.EVP_PKEY_EC:
1580 write_bio = self._lib.i2d_ECPrivateKey_bio
1581 elif key_type == self._lib.EVP_PKEY_DSA:
1582 write_bio = self._lib.i2d_DSAPrivateKey_bio
1583 else:
1584 raise ValueError(
1585 "Unsupported key type for TraditionalOpenSSL"
1586 )
1587 return self._bio_func_output(write_bio, cdata)
1589 raise ValueError("Unsupported encoding for TraditionalOpenSSL")
1591 # OpenSSH + PEM
1592 if format is serialization.PrivateFormat.OpenSSH:
1593 if encoding is serialization.Encoding.PEM:
1594 return ssh._serialize_ssh_private_key(
1595 key, password, encryption_algorithm
1596 )
1598 raise ValueError(
1599 "OpenSSH private key format can only be used"
1600 " with PEM encoding"
1601 )
1603 # Anything that key-specific code was supposed to handle earlier,
1604 # like Raw.
1605 raise ValueError("format is invalid with this key")
1607 def _private_key_bytes_via_bio(
1608 self, write_bio, evp_pkey, password
1609 ) -> bytes:
1610 if not password:
1611 evp_cipher = self._ffi.NULL
1612 else:
1613 # This is a curated value that we will update over time.
1614 evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
1616 return self._bio_func_output(
1617 write_bio,
1618 evp_pkey,
1619 evp_cipher,
1620 password,
1621 len(password),
1622 self._ffi.NULL,
1623 self._ffi.NULL,
1624 )
1626 def _bio_func_output(self, write_bio, *args) -> bytes:
1627 bio = self._create_mem_bio_gc()
1628 res = write_bio(bio, *args)
1629 self.openssl_assert(res == 1)
1630 return self._read_mem_bio(bio)
1632 def _public_key_bytes(
1633 self,
1634 encoding: serialization.Encoding,
1635 format: serialization.PublicFormat,
1636 key,
1637 evp_pkey,
1638 cdata,
1639 ) -> bytes:
1640 if not isinstance(encoding, serialization.Encoding):
1641 raise TypeError("encoding must be an item from the Encoding enum")
1642 if not isinstance(format, serialization.PublicFormat):
1643 raise TypeError(
1644 "format must be an item from the PublicFormat enum"
1645 )
1647 # SubjectPublicKeyInfo + PEM/DER
1648 if format is serialization.PublicFormat.SubjectPublicKeyInfo:
1649 if encoding is serialization.Encoding.PEM:
1650 write_bio = self._lib.PEM_write_bio_PUBKEY
1651 elif encoding is serialization.Encoding.DER:
1652 write_bio = self._lib.i2d_PUBKEY_bio
1653 else:
1654 raise ValueError(
1655 "SubjectPublicKeyInfo works only with PEM or DER encoding"
1656 )
1657 return self._bio_func_output(write_bio, evp_pkey)
1659 # PKCS1 + PEM/DER
1660 if format is serialization.PublicFormat.PKCS1:
1661 # Only RSA is supported here.
1662 key_type = self._lib.EVP_PKEY_id(evp_pkey)
1663 if key_type != self._lib.EVP_PKEY_RSA:
1664 raise ValueError("PKCS1 format is supported only for RSA keys")
1666 if encoding is serialization.Encoding.PEM:
1667 write_bio = self._lib.PEM_write_bio_RSAPublicKey
1668 elif encoding is serialization.Encoding.DER:
1669 write_bio = self._lib.i2d_RSAPublicKey_bio
1670 else:
1671 raise ValueError("PKCS1 works only with PEM or DER encoding")
1672 return self._bio_func_output(write_bio, cdata)
1674 # OpenSSH + OpenSSH
1675 if format is serialization.PublicFormat.OpenSSH:
1676 if encoding is serialization.Encoding.OpenSSH:
1677 return ssh.serialize_ssh_public_key(key)
1679 raise ValueError(
1680 "OpenSSH format must be used with OpenSSH encoding"
1681 )
1683 # Anything that key-specific code was supposed to handle earlier,
1684 # like Raw, CompressedPoint, UncompressedPoint
1685 raise ValueError("format is invalid with this key")
1687 def dh_supported(self) -> bool:
1688 return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1690 def generate_dh_parameters(
1691 self, generator: int, key_size: int
1692 ) -> dh.DHParameters:
1693 if key_size < dh._MIN_MODULUS_SIZE:
1694 raise ValueError(
1695 "DH key_size must be at least {} bits".format(
1696 dh._MIN_MODULUS_SIZE
1697 )
1698 )
1700 if generator not in (2, 5):
1701 raise ValueError("DH generator must be 2 or 5")
1703 dh_param_cdata = self._lib.DH_new()
1704 self.openssl_assert(dh_param_cdata != self._ffi.NULL)
1705 dh_param_cdata = self._ffi.gc(dh_param_cdata, self._lib.DH_free)
1707 res = self._lib.DH_generate_parameters_ex(
1708 dh_param_cdata, key_size, generator, self._ffi.NULL
1709 )
1710 if res != 1:
1711 errors = self._consume_errors_with_text()
1712 raise ValueError("Unable to generate DH parameters", errors)
1714 return _DHParameters(self, dh_param_cdata)
1716 def _dh_cdata_to_evp_pkey(self, dh_cdata):
1717 evp_pkey = self._create_evp_pkey_gc()
1718 res = self._lib.EVP_PKEY_set1_DH(evp_pkey, dh_cdata)
1719 self.openssl_assert(res == 1)
1720 return evp_pkey
1722 def generate_dh_private_key(
1723 self, parameters: dh.DHParameters
1724 ) -> dh.DHPrivateKey:
1725 dh_key_cdata = _dh_params_dup(
1726 parameters._dh_cdata, self # type: ignore[attr-defined]
1727 )
1729 res = self._lib.DH_generate_key(dh_key_cdata)
1730 self.openssl_assert(res == 1)
1732 evp_pkey = self._dh_cdata_to_evp_pkey(dh_key_cdata)
1734 return _DHPrivateKey(self, dh_key_cdata, evp_pkey)
1736 def generate_dh_private_key_and_parameters(
1737 self, generator: int, key_size: int
1738 ) -> dh.DHPrivateKey:
1739 return self.generate_dh_private_key(
1740 self.generate_dh_parameters(generator, key_size)
1741 )
1743 def load_dh_private_numbers(
1744 self, numbers: dh.DHPrivateNumbers
1745 ) -> dh.DHPrivateKey:
1746 parameter_numbers = numbers.public_numbers.parameter_numbers
1748 dh_cdata = self._lib.DH_new()
1749 self.openssl_assert(dh_cdata != self._ffi.NULL)
1750 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1752 p = self._int_to_bn(parameter_numbers.p)
1753 g = self._int_to_bn(parameter_numbers.g)
1755 if parameter_numbers.q is not None:
1756 q = self._int_to_bn(parameter_numbers.q)
1757 else:
1758 q = self._ffi.NULL
1760 pub_key = self._int_to_bn(numbers.public_numbers.y)
1761 priv_key = self._int_to_bn(numbers.x)
1763 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
1764 self.openssl_assert(res == 1)
1766 res = self._lib.DH_set0_key(dh_cdata, pub_key, priv_key)
1767 self.openssl_assert(res == 1)
1769 codes = self._ffi.new("int[]", 1)
1770 res = self._lib.DH_check(dh_cdata, codes)
1771 self.openssl_assert(res == 1)
1773 # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not
1774 # equal 11 when the generator is 2 (a quadratic nonresidue).
1775 # We want to ignore that error because p % 24 == 23 is also fine.
1776 # Specifically, g is then a quadratic residue. Within the context of
1777 # Diffie-Hellman this means it can only generate half the possible
1778 # values. That sounds bad, but quadratic nonresidues leak a bit of
1779 # the key to the attacker in exchange for having the full key space
1780 # available. See: https://crypto.stackexchange.com/questions/12961
1781 if codes[0] != 0 and not (
1782 parameter_numbers.g == 2
1783 and codes[0] ^ self._lib.DH_NOT_SUITABLE_GENERATOR == 0
1784 ):
1785 raise ValueError("DH private numbers did not pass safety checks.")
1787 evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
1789 return _DHPrivateKey(self, dh_cdata, evp_pkey)
1791 def load_dh_public_numbers(
1792 self, numbers: dh.DHPublicNumbers
1793 ) -> dh.DHPublicKey:
1794 dh_cdata = self._lib.DH_new()
1795 self.openssl_assert(dh_cdata != self._ffi.NULL)
1796 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1798 parameter_numbers = numbers.parameter_numbers
1800 p = self._int_to_bn(parameter_numbers.p)
1801 g = self._int_to_bn(parameter_numbers.g)
1803 if parameter_numbers.q is not None:
1804 q = self._int_to_bn(parameter_numbers.q)
1805 else:
1806 q = self._ffi.NULL
1808 pub_key = self._int_to_bn(numbers.y)
1810 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
1811 self.openssl_assert(res == 1)
1813 res = self._lib.DH_set0_key(dh_cdata, pub_key, self._ffi.NULL)
1814 self.openssl_assert(res == 1)
1816 evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
1818 return _DHPublicKey(self, dh_cdata, evp_pkey)
1820 def load_dh_parameter_numbers(
1821 self, numbers: dh.DHParameterNumbers
1822 ) -> dh.DHParameters:
1823 dh_cdata = self._lib.DH_new()
1824 self.openssl_assert(dh_cdata != self._ffi.NULL)
1825 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1827 p = self._int_to_bn(numbers.p)
1828 g = self._int_to_bn(numbers.g)
1830 if numbers.q is not None:
1831 q = self._int_to_bn(numbers.q)
1832 else:
1833 q = self._ffi.NULL
1835 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
1836 self.openssl_assert(res == 1)
1838 return _DHParameters(self, dh_cdata)
1840 def dh_parameters_supported(
1841 self, p: int, g: int, q: typing.Optional[int] = None
1842 ) -> bool:
1843 dh_cdata = self._lib.DH_new()
1844 self.openssl_assert(dh_cdata != self._ffi.NULL)
1845 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1847 p = self._int_to_bn(p)
1848 g = self._int_to_bn(g)
1850 if q is not None:
1851 q = self._int_to_bn(q)
1852 else:
1853 q = self._ffi.NULL
1855 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
1856 self.openssl_assert(res == 1)
1858 codes = self._ffi.new("int[]", 1)
1859 res = self._lib.DH_check(dh_cdata, codes)
1860 self.openssl_assert(res == 1)
1862 return codes[0] == 0
1864 def dh_x942_serialization_supported(self) -> bool:
1865 return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1
1867 def x25519_load_public_bytes(self, data: bytes) -> x25519.X25519PublicKey:
1868 if len(data) != 32:
1869 raise ValueError("An X25519 public key is 32 bytes long")
1871 data_ptr = self._ffi.from_buffer(data)
1872 evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
1873 self._lib.NID_X25519, self._ffi.NULL, data_ptr, len(data)
1874 )
1875 self.openssl_assert(evp_pkey != self._ffi.NULL)
1876 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1877 return _X25519PublicKey(self, evp_pkey)
1879 def x25519_load_private_bytes(
1880 self, data: bytes
1881 ) -> x25519.X25519PrivateKey:
1882 if len(data) != 32:
1883 raise ValueError("An X25519 private key is 32 bytes long")
1885 data_ptr = self._ffi.from_buffer(data)
1886 evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
1887 self._lib.NID_X25519, self._ffi.NULL, data_ptr, len(data)
1888 )
1889 self.openssl_assert(evp_pkey != self._ffi.NULL)
1890 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1891 return _X25519PrivateKey(self, evp_pkey)
1893 def _evp_pkey_keygen_gc(self, nid):
1894 evp_pkey_ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL)
1895 self.openssl_assert(evp_pkey_ctx != self._ffi.NULL)
1896 evp_pkey_ctx = self._ffi.gc(evp_pkey_ctx, self._lib.EVP_PKEY_CTX_free)
1897 res = self._lib.EVP_PKEY_keygen_init(evp_pkey_ctx)
1898 self.openssl_assert(res == 1)
1899 evp_ppkey = self._ffi.new("EVP_PKEY **")
1900 res = self._lib.EVP_PKEY_keygen(evp_pkey_ctx, evp_ppkey)
1901 self.openssl_assert(res == 1)
1902 self.openssl_assert(evp_ppkey[0] != self._ffi.NULL)
1903 evp_pkey = self._ffi.gc(evp_ppkey[0], self._lib.EVP_PKEY_free)
1904 return evp_pkey
1906 def x25519_generate_key(self) -> x25519.X25519PrivateKey:
1907 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X25519)
1908 return _X25519PrivateKey(self, evp_pkey)
1910 def x25519_supported(self) -> bool:
1911 if self._fips_enabled:
1912 return False
1913 return not self._lib.CRYPTOGRAPHY_LIBRESSL_LESS_THAN_370
1915 def x448_load_public_bytes(self, data: bytes) -> x448.X448PublicKey:
1916 if len(data) != 56:
1917 raise ValueError("An X448 public key is 56 bytes long")
1919 evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
1920 self._lib.NID_X448, self._ffi.NULL, data, len(data)
1921 )
1922 self.openssl_assert(evp_pkey != self._ffi.NULL)
1923 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1924 return _X448PublicKey(self, evp_pkey)
1926 def x448_load_private_bytes(self, data: bytes) -> x448.X448PrivateKey:
1927 if len(data) != 56:
1928 raise ValueError("An X448 private key is 56 bytes long")
1930 data_ptr = self._ffi.from_buffer(data)
1931 evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
1932 self._lib.NID_X448, self._ffi.NULL, data_ptr, len(data)
1933 )
1934 self.openssl_assert(evp_pkey != self._ffi.NULL)
1935 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1936 return _X448PrivateKey(self, evp_pkey)
1938 def x448_generate_key(self) -> x448.X448PrivateKey:
1939 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X448)
1940 return _X448PrivateKey(self, evp_pkey)
1942 def x448_supported(self) -> bool:
1943 if self._fips_enabled:
1944 return False
1945 return (
1946 not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
1947 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1948 )
1950 def ed25519_supported(self) -> bool:
1951 if self._fips_enabled:
1952 return False
1953 return self._lib.CRYPTOGRAPHY_HAS_WORKING_ED25519
1955 def ed25519_load_public_bytes(
1956 self, data: bytes
1957 ) -> ed25519.Ed25519PublicKey:
1958 utils._check_bytes("data", data)
1960 if len(data) != ed25519._ED25519_KEY_SIZE:
1961 raise ValueError("An Ed25519 public key is 32 bytes long")
1963 evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
1964 self._lib.NID_ED25519, self._ffi.NULL, data, len(data)
1965 )
1966 self.openssl_assert(evp_pkey != self._ffi.NULL)
1967 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1969 return _Ed25519PublicKey(self, evp_pkey)
1971 def ed25519_load_private_bytes(
1972 self, data: bytes
1973 ) -> ed25519.Ed25519PrivateKey:
1974 if len(data) != ed25519._ED25519_KEY_SIZE:
1975 raise ValueError("An Ed25519 private key is 32 bytes long")
1977 utils._check_byteslike("data", data)
1978 data_ptr = self._ffi.from_buffer(data)
1979 evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
1980 self._lib.NID_ED25519, self._ffi.NULL, data_ptr, len(data)
1981 )
1982 self.openssl_assert(evp_pkey != self._ffi.NULL)
1983 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1985 return _Ed25519PrivateKey(self, evp_pkey)
1987 def ed25519_generate_key(self) -> ed25519.Ed25519PrivateKey:
1988 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED25519)
1989 return _Ed25519PrivateKey(self, evp_pkey)
1991 def ed448_supported(self) -> bool:
1992 if self._fips_enabled:
1993 return False
1994 return (
1995 not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
1996 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1997 )
1999 def ed448_load_public_bytes(self, data: bytes) -> ed448.Ed448PublicKey:
2000 utils._check_bytes("data", data)
2001 if len(data) != _ED448_KEY_SIZE:
2002 raise ValueError("An Ed448 public key is 57 bytes long")
2004 evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
2005 self._lib.NID_ED448, self._ffi.NULL, data, len(data)
2006 )
2007 self.openssl_assert(evp_pkey != self._ffi.NULL)
2008 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2010 return _Ed448PublicKey(self, evp_pkey)
2012 def ed448_load_private_bytes(self, data: bytes) -> ed448.Ed448PrivateKey:
2013 utils._check_byteslike("data", data)
2014 if len(data) != _ED448_KEY_SIZE:
2015 raise ValueError("An Ed448 private key is 57 bytes long")
2017 data_ptr = self._ffi.from_buffer(data)
2018 evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
2019 self._lib.NID_ED448, self._ffi.NULL, data_ptr, len(data)
2020 )
2021 self.openssl_assert(evp_pkey != self._ffi.NULL)
2022 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2024 return _Ed448PrivateKey(self, evp_pkey)
2026 def ed448_generate_key(self) -> ed448.Ed448PrivateKey:
2027 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED448)
2028 return _Ed448PrivateKey(self, evp_pkey)
2030 def derive_scrypt(
2031 self,
2032 key_material: bytes,
2033 salt: bytes,
2034 length: int,
2035 n: int,
2036 r: int,
2037 p: int,
2038 ) -> bytes:
2039 buf = self._ffi.new("unsigned char[]", length)
2040 key_material_ptr = self._ffi.from_buffer(key_material)
2041 res = self._lib.EVP_PBE_scrypt(
2042 key_material_ptr,
2043 len(key_material),
2044 salt,
2045 len(salt),
2046 n,
2047 r,
2048 p,
2049 scrypt._MEM_LIMIT,
2050 buf,
2051 length,
2052 )
2053 if res != 1:
2054 errors = self._consume_errors_with_text()
2055 # memory required formula explained here:
2056 # https://blog.filippo.io/the-scrypt-parameters/
2057 min_memory = 128 * n * r // (1024**2)
2058 raise MemoryError(
2059 "Not enough memory to derive key. These parameters require"
2060 " {} MB of memory.".format(min_memory),
2061 errors,
2062 )
2063 return self._ffi.buffer(buf)[:]
2065 def aead_cipher_supported(self, cipher) -> bool:
2066 cipher_name = aead._aead_cipher_name(cipher)
2067 if self._fips_enabled and cipher_name not in self._fips_aead:
2068 return False
2069 # SIV isn't loaded through get_cipherbyname but instead a new fetch API
2070 # only available in 3.0+. But if we know we're on 3.0+ then we know
2071 # it's supported.
2072 if cipher_name.endswith(b"-siv"):
2073 return self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER == 1
2074 else:
2075 return (
2076 self._lib.EVP_get_cipherbyname(cipher_name) != self._ffi.NULL
2077 )
2079 def _zero_data(self, data, length: int) -> None:
2080 # We clear things this way because at the moment we're not
2081 # sure of a better way that can guarantee it overwrites the
2082 # memory of a bytearray and doesn't just replace the underlying char *.
2083 for i in range(length):
2084 data[i] = 0
2086 @contextlib.contextmanager
2087 def _zeroed_null_terminated_buf(self, data):
2088 """
2089 This method takes bytes, which can be a bytestring or a mutable
2090 buffer like a bytearray, and yields a null-terminated version of that
2091 data. This is required because PKCS12_parse doesn't take a length with
2092 its password char * and ffi.from_buffer doesn't provide null
2093 termination. So, to support zeroing the data via bytearray we
2094 need to build this ridiculous construct that copies the memory, but
2095 zeroes it after use.
2096 """
2097 if data is None:
2098 yield self._ffi.NULL
2099 else:
2100 data_len = len(data)
2101 buf = self._ffi.new("char[]", data_len + 1)
2102 self._ffi.memmove(buf, data, data_len)
2103 try:
2104 yield buf
2105 finally:
2106 # Cast to a uint8_t * so we can assign by integer
2107 self._zero_data(self._ffi.cast("uint8_t *", buf), data_len)
2109 def load_key_and_certificates_from_pkcs12(
2110 self, data: bytes, password: typing.Optional[bytes]
2111 ) -> typing.Tuple[
2112 typing.Optional[PrivateKeyTypes],
2113 typing.Optional[x509.Certificate],
2114 typing.List[x509.Certificate],
2115 ]:
2116 pkcs12 = self.load_pkcs12(data, password)
2117 return (
2118 pkcs12.key,
2119 pkcs12.cert.certificate if pkcs12.cert else None,
2120 [cert.certificate for cert in pkcs12.additional_certs],
2121 )
2123 def load_pkcs12(
2124 self, data: bytes, password: typing.Optional[bytes]
2125 ) -> PKCS12KeyAndCertificates:
2126 if password is not None:
2127 utils._check_byteslike("password", password)
2129 bio = self._bytes_to_bio(data)
2130 p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
2131 if p12 == self._ffi.NULL:
2132 self._consume_errors()
2133 raise ValueError("Could not deserialize PKCS12 data")
2135 p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
2136 evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
2137 x509_ptr = self._ffi.new("X509 **")
2138 sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
2139 with self._zeroed_null_terminated_buf(password) as password_buf:
2140 res = self._lib.PKCS12_parse(
2141 p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
2142 )
2143 if res == 0:
2144 self._consume_errors()
2145 raise ValueError("Invalid password or PKCS12 data")
2147 cert = None
2148 key = None
2149 additional_certificates = []
2151 if evp_pkey_ptr[0] != self._ffi.NULL:
2152 evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)
2153 # We don't support turning off RSA key validation when loading
2154 # PKCS12 keys
2155 key = self._evp_pkey_to_private_key(
2156 evp_pkey, unsafe_skip_rsa_key_validation=False
2157 )
2159 if x509_ptr[0] != self._ffi.NULL:
2160 x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
2161 cert_obj = self._ossl2cert(x509)
2162 name = None
2163 maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL)
2164 if maybe_name != self._ffi.NULL:
2165 name = self._ffi.string(maybe_name)
2166 cert = PKCS12Certificate(cert_obj, name)
2168 if sk_x509_ptr[0] != self._ffi.NULL:
2169 sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
2170 num = self._lib.sk_X509_num(sk_x509_ptr[0])
2172 # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
2173 # certificates.
2174 indices: typing.Iterable[int]
2175 if (
2176 self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
2177 or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
2178 ):
2179 indices = range(num)
2180 else:
2181 indices = reversed(range(num))
2183 for i in indices:
2184 x509 = self._lib.sk_X509_value(sk_x509, i)
2185 self.openssl_assert(x509 != self._ffi.NULL)
2186 x509 = self._ffi.gc(x509, self._lib.X509_free)
2187 addl_cert = self._ossl2cert(x509)
2188 addl_name = None
2189 maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL)
2190 if maybe_name != self._ffi.NULL:
2191 addl_name = self._ffi.string(maybe_name)
2192 additional_certificates.append(
2193 PKCS12Certificate(addl_cert, addl_name)
2194 )
2196 return PKCS12KeyAndCertificates(key, cert, additional_certificates)
2198 def serialize_key_and_certificates_to_pkcs12(
2199 self,
2200 name: typing.Optional[bytes],
2201 key: typing.Optional[PKCS12PrivateKeyTypes],
2202 cert: typing.Optional[x509.Certificate],
2203 cas: typing.Optional[typing.List[_PKCS12CATypes]],
2204 encryption_algorithm: serialization.KeySerializationEncryption,
2205 ) -> bytes:
2206 password = None
2207 if name is not None:
2208 utils._check_bytes("name", name)
2210 if isinstance(encryption_algorithm, serialization.NoEncryption):
2211 nid_cert = -1
2212 nid_key = -1
2213 pkcs12_iter = 0
2214 mac_iter = 0
2215 mac_alg = self._ffi.NULL
2216 elif isinstance(
2217 encryption_algorithm, serialization.BestAvailableEncryption
2218 ):
2219 # PKCS12 encryption is hopeless trash and can never be fixed.
2220 # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so
2221 # we use PBESv1 with 3DES on the older paths.
2222 if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
2223 nid_cert = self._lib.NID_aes_256_cbc
2224 nid_key = self._lib.NID_aes_256_cbc
2225 else:
2226 nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
2227 nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
2228 # At least we can set this higher than OpenSSL's default
2229 pkcs12_iter = 20000
2230 # mac_iter chosen for compatibility reasons, see:
2231 # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
2232 # Did we mention how lousy PKCS12 encryption is?
2233 mac_iter = 1
2234 # MAC algorithm can only be set on OpenSSL 3.0.0+
2235 mac_alg = self._ffi.NULL
2236 password = encryption_algorithm.password
2237 elif (
2238 isinstance(
2239 encryption_algorithm, serialization._KeySerializationEncryption
2240 )
2241 and encryption_algorithm._format
2242 is serialization.PrivateFormat.PKCS12
2243 ):
2244 # Default to OpenSSL's defaults. Behavior will vary based on the
2245 # version of OpenSSL cryptography is compiled against.
2246 nid_cert = 0
2247 nid_key = 0
2248 # Use the default iters we use in best available
2249 pkcs12_iter = 20000
2250 # See the Best Available comment for why this is 1
2251 mac_iter = 1
2252 password = encryption_algorithm.password
2253 keycertalg = encryption_algorithm._key_cert_algorithm
2254 if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC:
2255 nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
2256 nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
2257 elif keycertalg is PBES.PBESv2SHA256AndAES256CBC:
2258 if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
2259 raise UnsupportedAlgorithm(
2260 "PBESv2 is not supported by this version of OpenSSL"
2261 )
2262 nid_cert = self._lib.NID_aes_256_cbc
2263 nid_key = self._lib.NID_aes_256_cbc
2264 else:
2265 assert keycertalg is None
2266 # We use OpenSSL's defaults
2268 if encryption_algorithm._hmac_hash is not None:
2269 if not self._lib.Cryptography_HAS_PKCS12_SET_MAC:
2270 raise UnsupportedAlgorithm(
2271 "Setting MAC algorithm is not supported by this "
2272 "version of OpenSSL."
2273 )
2274 mac_alg = self._evp_md_non_null_from_algorithm(
2275 encryption_algorithm._hmac_hash
2276 )
2277 self.openssl_assert(mac_alg != self._ffi.NULL)
2278 else:
2279 mac_alg = self._ffi.NULL
2281 if encryption_algorithm._kdf_rounds is not None:
2282 pkcs12_iter = encryption_algorithm._kdf_rounds
2284 else:
2285 raise ValueError("Unsupported key encryption type")
2287 if cas is None or len(cas) == 0:
2288 sk_x509 = self._ffi.NULL
2289 else:
2290 sk_x509 = self._lib.sk_X509_new_null()
2291 sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)
2293 # This list is to keep the x509 values alive until end of function
2294 ossl_cas = []
2295 for ca in cas:
2296 if isinstance(ca, PKCS12Certificate):
2297 ca_alias = ca.friendly_name
2298 ossl_ca = self._cert2ossl(ca.certificate)
2299 if ca_alias is None:
2300 res = self._lib.X509_alias_set1(
2301 ossl_ca, self._ffi.NULL, -1
2302 )
2303 else:
2304 res = self._lib.X509_alias_set1(
2305 ossl_ca, ca_alias, len(ca_alias)
2306 )
2307 self.openssl_assert(res == 1)
2308 else:
2309 ossl_ca = self._cert2ossl(ca)
2310 ossl_cas.append(ossl_ca)
2311 res = self._lib.sk_X509_push(sk_x509, ossl_ca)
2312 backend.openssl_assert(res >= 1)
2314 with self._zeroed_null_terminated_buf(password) as password_buf:
2315 with self._zeroed_null_terminated_buf(name) as name_buf:
2316 ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
2317 if key is not None:
2318 evp_pkey = key._evp_pkey # type: ignore[union-attr]
2319 else:
2320 evp_pkey = self._ffi.NULL
2322 p12 = self._lib.PKCS12_create(
2323 password_buf,
2324 name_buf,
2325 evp_pkey,
2326 ossl_cert,
2327 sk_x509,
2328 nid_key,
2329 nid_cert,
2330 pkcs12_iter,
2331 mac_iter,
2332 0,
2333 )
2335 if (
2336 self._lib.Cryptography_HAS_PKCS12_SET_MAC
2337 and mac_alg != self._ffi.NULL
2338 ):
2339 self._lib.PKCS12_set_mac(
2340 p12,
2341 password_buf,
2342 -1,
2343 self._ffi.NULL,
2344 0,
2345 mac_iter,
2346 mac_alg,
2347 )
2349 self.openssl_assert(p12 != self._ffi.NULL)
2350 p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
2352 bio = self._create_mem_bio_gc()
2353 res = self._lib.i2d_PKCS12_bio(bio, p12)
2354 self.openssl_assert(res > 0)
2355 return self._read_mem_bio(bio)
2357 def poly1305_supported(self) -> bool:
2358 if self._fips_enabled:
2359 return False
2360 return self._lib.Cryptography_HAS_POLY1305 == 1
2362 def create_poly1305_ctx(self, key: bytes) -> _Poly1305Context:
2363 utils._check_byteslike("key", key)
2364 if len(key) != _POLY1305_KEY_SIZE:
2365 raise ValueError("A poly1305 key is 32 bytes long")
2367 return _Poly1305Context(self, key)
2369 def pkcs7_supported(self) -> bool:
2370 return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
2372 def load_pem_pkcs7_certificates(
2373 self, data: bytes
2374 ) -> typing.List[x509.Certificate]:
2375 utils._check_bytes("data", data)
2376 bio = self._bytes_to_bio(data)
2377 p7 = self._lib.PEM_read_bio_PKCS7(
2378 bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
2379 )
2380 if p7 == self._ffi.NULL:
2381 self._consume_errors()
2382 raise ValueError("Unable to parse PKCS7 data")
2384 p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
2385 return self._load_pkcs7_certificates(p7)
2387 def load_der_pkcs7_certificates(
2388 self, data: bytes
2389 ) -> typing.List[x509.Certificate]:
2390 utils._check_bytes("data", data)
2391 bio = self._bytes_to_bio(data)
2392 p7 = self._lib.d2i_PKCS7_bio(bio.bio, self._ffi.NULL)
2393 if p7 == self._ffi.NULL:
2394 self._consume_errors()
2395 raise ValueError("Unable to parse PKCS7 data")
2397 p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
2398 return self._load_pkcs7_certificates(p7)
2400 def _load_pkcs7_certificates(self, p7) -> typing.List[x509.Certificate]:
2401 nid = self._lib.OBJ_obj2nid(p7.type)
2402 self.openssl_assert(nid != self._lib.NID_undef)
2403 if nid != self._lib.NID_pkcs7_signed:
2404 raise UnsupportedAlgorithm(
2405 "Only basic signed structures are currently supported. NID"
2406 " for this data was {}".format(nid),
2407 _Reasons.UNSUPPORTED_SERIALIZATION,
2408 )
2410 sk_x509 = p7.d.sign.cert
2411 num = self._lib.sk_X509_num(sk_x509)
2412 certs = []
2413 for i in range(num):
2414 x509 = self._lib.sk_X509_value(sk_x509, i)
2415 self.openssl_assert(x509 != self._ffi.NULL)
2416 cert = self._ossl2cert(x509)
2417 certs.append(cert)
2419 return certs
2422class GetCipherByName:
2423 def __init__(self, fmt: str):
2424 self._fmt = fmt
2426 def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
2427 cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
2428 evp_cipher = backend._lib.EVP_get_cipherbyname(
2429 cipher_name.encode("ascii")
2430 )
2432 # try EVP_CIPHER_fetch if present
2433 if (
2434 evp_cipher == backend._ffi.NULL
2435 and backend._lib.Cryptography_HAS_300_EVP_CIPHER
2436 ):
2437 evp_cipher = backend._lib.EVP_CIPHER_fetch(
2438 backend._ffi.NULL,
2439 cipher_name.encode("ascii"),
2440 backend._ffi.NULL,
2441 )
2443 backend._consume_errors()
2444 return evp_cipher
2447def _get_xts_cipher(backend: Backend, cipher: AES, mode):
2448 cipher_name = f"aes-{cipher.key_size // 2}-xts"
2449 return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
2452backend = Backend()