Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/backend.py: 19%
1215 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
6import collections
7import contextlib
8import itertools
9import typing
10import warnings
11from contextlib import contextmanager
13from cryptography import utils, x509
14from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
15from cryptography.hazmat.backends.openssl import aead
16from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
17from cryptography.hazmat.backends.openssl.cmac import _CMACContext
18from cryptography.hazmat.backends.openssl.dh import (
19 _dh_params_dup,
20 _DHParameters,
21 _DHPrivateKey,
22 _DHPublicKey,
23)
24from cryptography.hazmat.backends.openssl.dsa import (
25 _DSAParameters,
26 _DSAPrivateKey,
27 _DSAPublicKey,
28)
29from cryptography.hazmat.backends.openssl.ec import (
30 _EllipticCurvePrivateKey,
31 _EllipticCurvePublicKey,
32)
33from cryptography.hazmat.backends.openssl.ed448 import (
34 _ED448_KEY_SIZE,
35 _Ed448PrivateKey,
36 _Ed448PublicKey,
37)
38from cryptography.hazmat.backends.openssl.ed25519 import (
39 _Ed25519PrivateKey,
40 _Ed25519PublicKey,
41)
42from cryptography.hazmat.backends.openssl.hashes import _HashContext
43from cryptography.hazmat.backends.openssl.hmac import _HMACContext
44from cryptography.hazmat.backends.openssl.poly1305 import (
45 _POLY1305_KEY_SIZE,
46 _Poly1305Context,
47)
48from cryptography.hazmat.backends.openssl.rsa import (
49 _RSAPrivateKey,
50 _RSAPublicKey,
51)
52from cryptography.hazmat.backends.openssl.x448 import (
53 _X448PrivateKey,
54 _X448PublicKey,
55)
56from cryptography.hazmat.bindings._rust import openssl as rust_openssl
57from cryptography.hazmat.bindings.openssl import binding
58from cryptography.hazmat.primitives import hashes, serialization
59from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding
60from cryptography.hazmat.primitives.asymmetric import (
61 dh,
62 dsa,
63 ec,
64 ed448,
65 ed25519,
66 rsa,
67 x448,
68 x25519,
69)
70from cryptography.hazmat.primitives.asymmetric.padding import (
71 MGF1,
72 OAEP,
73 PSS,
74 PKCS1v15,
75)
76from cryptography.hazmat.primitives.asymmetric.types import (
77 PrivateKeyTypes,
78 PublicKeyTypes,
79)
80from cryptography.hazmat.primitives.ciphers import (
81 BlockCipherAlgorithm,
82 CipherAlgorithm,
83)
84from cryptography.hazmat.primitives.ciphers.algorithms import (
85 AES,
86 AES128,
87 AES256,
88 ARC4,
89 SM4,
90 Camellia,
91 ChaCha20,
92 TripleDES,
93 _BlowfishInternal,
94 _CAST5Internal,
95 _IDEAInternal,
96 _SEEDInternal,
97)
98from cryptography.hazmat.primitives.ciphers.modes import (
99 CBC,
100 CFB,
101 CFB8,
102 CTR,
103 ECB,
104 GCM,
105 OFB,
106 XTS,
107 Mode,
108)
109from cryptography.hazmat.primitives.kdf import scrypt
110from cryptography.hazmat.primitives.serialization import ssh
111from cryptography.hazmat.primitives.serialization.pkcs12 import (
112 PBES,
113 PKCS12Certificate,
114 PKCS12KeyAndCertificates,
115 PKCS12PrivateKeyTypes,
116 _PKCS12CATypes,
117)
119_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
122# Not actually supported, just used as a marker for some serialization tests.
123class _RC2:
124 pass
127class Backend:
128 """
129 OpenSSL API binding interfaces.
130 """
132 name = "openssl"
134 # FIPS has opinions about acceptable algorithms and key sizes, but the
135 # disallowed algorithms are still present in OpenSSL. They just error if
136 # you try to use them. To avoid that we allowlist the algorithms in
137 # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
138 _fips_aead = {
139 b"aes-128-ccm",
140 b"aes-192-ccm",
141 b"aes-256-ccm",
142 b"aes-128-gcm",
143 b"aes-192-gcm",
144 b"aes-256-gcm",
145 }
146 # TripleDES encryption is disallowed/deprecated throughout 2023 in
147 # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA).
148 _fips_ciphers = (AES,)
149 # Sometimes SHA1 is still permissible. That logic is contained
150 # within the various *_supported methods.
151 _fips_hashes = (
152 hashes.SHA224,
153 hashes.SHA256,
154 hashes.SHA384,
155 hashes.SHA512,
156 hashes.SHA512_224,
157 hashes.SHA512_256,
158 hashes.SHA3_224,
159 hashes.SHA3_256,
160 hashes.SHA3_384,
161 hashes.SHA3_512,
162 hashes.SHAKE128,
163 hashes.SHAKE256,
164 )
165 _fips_ecdh_curves = (
166 ec.SECP224R1,
167 ec.SECP256R1,
168 ec.SECP384R1,
169 ec.SECP521R1,
170 )
171 _fips_rsa_min_key_size = 2048
172 _fips_rsa_min_public_exponent = 65537
173 _fips_dsa_min_modulus = 1 << 2048
174 _fips_dh_min_key_size = 2048
175 _fips_dh_min_modulus = 1 << _fips_dh_min_key_size
def __init__(self) -> None:
    """
    Load the OpenSSL binding and set up per-backend state.

    Caches the ffi/lib handles, records whether FIPS mode is on, fills
    the cipher registry, selects the random source, and collects the
    EVP_PKEY type ids that are treated as Diffie-Hellman keys.
    """
    self._binding = binding.Binding()
    self._ffi = self._binding.ffi
    self._lib = self._binding.lib
    self._fips_enabled = self._is_fips_enabled()

    # Maps (cipher class, mode class) to a callable producing the cipher.
    self._cipher_registry: typing.Dict[
        typing.Tuple[typing.Type[CipherAlgorithm], typing.Type[Mode]],
        typing.Callable,
    ] = {}
    self._register_default_ciphers()

    cannot_use_osrandom = (
        self._fips_enabled and self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE
    )
    if not cannot_use_osrandom:
        self.activate_osrandom_engine()
    else:
        warnings.warn(
            "OpenSSL FIPS mode is enabled. Can't enable DRBG fork safety.",
            UserWarning,
        )

    dh_types = [self._lib.EVP_PKEY_DH]
    if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
        dh_types.append(self._lib.EVP_PKEY_DHX)
    self._dh_types = dh_types
199 def __repr__(self) -> str:
200 return "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>".format(
201 self.openssl_version_text(),
202 self._fips_enabled,
203 self._binding._legacy_provider_loaded,
204 )
def openssl_assert(
    self,
    ok: bool,
    errors: typing.Optional[typing.List[rust_openssl.OpenSSLError]] = None,
) -> None:
    """
    Forward an OpenSSL success check to the binding-level assert helper.

    :param ok: Result of the condition that must hold.
    :param errors: Optional already-captured error list to pass along.
    """
    lib = self._lib
    return binding._openssl_assert(lib, ok, errors=errors)
213 def _is_fips_enabled(self) -> bool:
214 if self._lib.Cryptography_HAS_300_FIPS:
215 mode = self._lib.EVP_default_properties_is_fips_enabled(
216 self._ffi.NULL
217 )
218 else:
219 mode = self._lib.FIPS_mode()
221 if mode == 0:
222 # OpenSSL without FIPS pushes an error on the error stack
223 self._lib.ERR_clear_error()
224 return bool(mode)
226 def _enable_fips(self) -> None:
227 # This function enables FIPS mode for OpenSSL 3.0.0 on installs that
228 # have the FIPS provider installed properly.
229 self._binding._enable_fips()
230 assert self._is_fips_enabled()
231 self._fips_enabled = self._is_fips_enabled()
def activate_builtin_random(self) -> None:
    """
    Switch OpenSSL back to its built-in random source.

    Does nothing unless the library needs the osrandom engine and a
    default RAND engine is currently set.
    """
    if not self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
        return

    lib = self._lib
    engine = lib.ENGINE_get_default_RAND()
    if engine != self._ffi.NULL:
        lib.ENGINE_unregister_RAND(engine)
        # Reset the RNG to use the built-in.
        res = lib.RAND_set_rand_method(self._ffi.NULL)
        self.openssl_assert(res == 1)
        # Release the reference handed out by ENGINE_get_default_RAND.
        res = lib.ENGINE_finish(engine)
        self.openssl_assert(res == 1)
246 @contextlib.contextmanager
247 def _get_osurandom_engine(self):
248 # Fetches an engine by id and returns it. This creates a structural
249 # reference.
250 e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id)
251 self.openssl_assert(e != self._ffi.NULL)
252 # Initialize the engine for use. This adds a functional reference.
253 res = self._lib.ENGINE_init(e)
254 self.openssl_assert(res == 1)
256 try:
257 yield e
258 finally:
259 # Decrement the structural ref incremented by ENGINE_by_id.
260 res = self._lib.ENGINE_free(e)
261 self.openssl_assert(res == 1)
262 # Decrement the functional ref incremented by ENGINE_init.
263 res = self._lib.ENGINE_finish(e)
264 self.openssl_assert(res == 1)
def activate_osrandom_engine(self) -> None:
    """
    Make the osrandom engine the default RAND provider.

    Does nothing when the library does not need the osrandom engine.
    """
    if not self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
        return

    # Unregister and free whatever engine is currently the default.
    self.activate_builtin_random()
    with self._get_osurandom_engine() as engine:
        # Set the engine as the default RAND provider.
        res = self._lib.ENGINE_set_default_RAND(engine)
        self.openssl_assert(res == 1)
        # Reset the RNG so that it uses the engine.
        res = self._lib.RAND_set_rand_method(self._ffi.NULL)
        self.openssl_assert(res == 1)
def osrandom_engine_implementation(self) -> str:
    """
    Return the implementation name reported by the osrandom engine's
    ``get_implementation`` control command, decoded as ASCII.
    """
    out = self._ffi.new("char[]", 64)
    with self._get_osurandom_engine() as engine:
        res = self._lib.ENGINE_ctrl_cmd(
            engine, b"get_implementation", len(out), out, self._ffi.NULL, 0
        )
        self.openssl_assert(res > 0)
    raw_name = self._ffi.string(out)
    return raw_name.decode("ascii")
def openssl_version_text(self) -> str:
    """
    Friendly string name of the loaded OpenSSL library. This is not
    necessarily the same version as it was compiled against.

    Example: OpenSSL 1.1.1d 10 Sep 2019
    """
    version_ptr = self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
    version_bytes = self._ffi.string(version_ptr)
    return version_bytes.decode("ascii")
def openssl_version_number(self) -> int:
    """Return the value of ``OpenSSL_version_num()`` for the loaded library."""
    version_number = self._lib.OpenSSL_version_num()
    return version_number
def create_hmac_ctx(
    self, key: bytes, algorithm: hashes.HashAlgorithm
) -> _HMACContext:
    """Build an HMAC context for ``key`` and ``algorithm`` on this backend."""
    hmac_ctx = _HMACContext(self, key, algorithm)
    return hmac_ctx
def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm):
    """
    Look up the EVP_MD for ``algorithm`` by name.

    For blake2b/blake2s the digest size in bits is appended to the name.
    The lookup result is returned as-is and is not checked for NULL.
    """
    name = algorithm.name
    if name in ("blake2b", "blake2s"):
        digest_name = f"{name}{algorithm.digest_size * 8}"
    else:
        digest_name = name

    return self._lib.EVP_get_digestbyname(digest_name.encode("ascii"))
def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm):
    """
    Same as ``_evp_md_from_algorithm`` but asserts the digest was found.
    """
    digest = self._evp_md_from_algorithm(algorithm)
    found = digest != self._ffi.NULL
    self.openssl_assert(found)
    return digest
def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether ``algorithm`` can be used for hashing.

    In FIPS mode only the allowlisted hash types pass; otherwise the
    answer is whether OpenSSL knows a digest of that name.
    """
    fips_blocked = self._fips_enabled and not isinstance(
        algorithm, self._fips_hashes
    )
    if fips_blocked:
        return False

    return self._evp_md_from_algorithm(algorithm) != self._ffi.NULL
def signature_hash_supported(
    self, algorithm: hashes.HashAlgorithm
) -> bool:
    """
    Report whether ``algorithm`` may be used as the message digest of a
    signature, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption).

    SHA1 is rejected in FIPS mode; everything else defers to
    ``hash_supported``.
    """
    sha1_under_fips = self._fips_enabled and isinstance(
        algorithm, hashes.SHA1
    )
    if sha1_under_fips:
        return False
    return self.hash_supported(algorithm)
def scrypt_supported(self) -> bool:
    """
    Report whether scrypt is usable: never in FIPS mode, otherwise only
    when the library flag ``Cryptography_HAS_SCRYPT`` equals 1.
    """
    if not self._fips_enabled:
        return self._lib.Cryptography_HAS_SCRYPT == 1
    return False
def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether ``algorithm`` can be used for HMAC.

    FIPS mode still allows SHA1 for HMAC; every other case defers to
    ``hash_supported``.
    """
    sha1_under_fips = self._fips_enabled and isinstance(
        algorithm, hashes.SHA1
    )
    if sha1_under_fips:
        return True

    return self.hash_supported(algorithm)
def create_hash_ctx(
    self, algorithm: hashes.HashAlgorithm
) -> hashes.HashContext:
    """Build a hash context for ``algorithm`` on this backend."""
    hash_ctx = _HashContext(self, algorithm)
    return hash_ctx
def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
    """
    Report whether the ``cipher``/``mode`` combination is usable.

    In FIPS mode only the allowlisted cipher types pass (TripleDES is
    disallowed/deprecated in FIPS 140-3). Otherwise the pair must be in
    the registry and its adapter must yield a non-NULL cipher.
    """
    if self._fips_enabled and not isinstance(cipher, self._fips_ciphers):
        return False

    registry_key = (type(cipher), type(mode))
    try:
        adapter = self._cipher_registry[registry_key]
    except KeyError:
        return False

    evp_cipher = adapter(self, cipher, mode)
    return self._ffi.NULL != evp_cipher
def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
    """
    Record ``adapter`` as the handler for ``(cipher_cls, mode_cls)``.

    :raises ValueError: If that pair already has an adapter registered.
    """
    registry_key = (cipher_cls, mode_cls)
    if registry_key in self._cipher_registry:
        raise ValueError(
            f"Duplicate registration for: {cipher_cls} {mode_cls}."
        )
    self._cipher_registry[registry_key] = adapter
def _register_default_ciphers(self) -> None:
    """
    Populate the cipher registry with the built-in cipher/mode pairs.

    Legacy ciphers (Blowfish, SEED, CAST5, IDEA, ARC4 and the RC2 test
    marker) are registered only when the legacy provider is loaded or
    the library is older than OpenSSL 3.0.
    """
    register = self.register_cipher_adapter
    sized_name = "{cipher.name}-{cipher.key_size}-{mode.name}"

    for cipher_cls, mode_cls in itertools.product(
        (AES, AES128, AES256),
        (CBC, CTR, ECB, OFB, CFB, CFB8, GCM),
    ):
        register(cipher_cls, mode_cls, GetCipherByName(sized_name))
    for mode_cls in (CBC, CTR, ECB, OFB, CFB):
        register(Camellia, mode_cls, GetCipherByName(sized_name))
    for mode_cls in (CBC, CFB, CFB8, OFB):
        register(
            TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
        )
    register(TripleDES, ECB, GetCipherByName("des-ede3"))
    register(ChaCha20, type(None), GetCipherByName("chacha20"))
    register(AES, XTS, _get_xts_cipher)
    for mode_cls in (ECB, CBC, OFB, CFB, CTR):
        register(SM4, mode_cls, GetCipherByName("sm4-{mode.name}"))

    # Don't register legacy ciphers if they're unavailable. Hypothetically
    # this wouldn't be necessary because availability is tested by seeing
    # whether an EVP_CIPHER * comes back in the _CipherContext __init__,
    # but OpenSSL 3 will return a valid pointer even though the cipher is
    # unavailable.
    legacy_available = (
        self._binding._legacy_provider_loaded
        or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
    )
    if not legacy_available:
        return

    for mode_cls in (CBC, CFB, OFB, ECB):
        register(
            _BlowfishInternal, mode_cls, GetCipherByName("bf-{mode.name}")
        )
    for mode_cls in (CBC, CFB, OFB, ECB):
        register(
            _SEEDInternal, mode_cls, GetCipherByName("seed-{mode.name}")
        )
    for cipher_cls in (_CAST5Internal, _IDEAInternal):
        for mode_cls in (CBC, OFB, CFB, ECB):
            register(
                cipher_cls,
                mode_cls,
                GetCipherByName("{cipher.name}-{mode.name}"),
            )
    register(ARC4, type(None), GetCipherByName("rc4"))
    # We don't actually support RC2, this is just used by some tests.
    register(_RC2, type(None), GetCipherByName("rc2"))
def create_symmetric_encryption_ctx(
    self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
    """Build a cipher context for ``cipher``/``mode`` in encrypt direction."""
    direction = _CipherContext._ENCRYPT
    return _CipherContext(self, cipher, mode, direction)
def create_symmetric_decryption_ctx(
    self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
    """Build a cipher context for ``cipher``/``mode`` in decrypt direction."""
    direction = _CipherContext._DECRYPT
    return _CipherContext(self, cipher, mode, direction)
def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """PBKDF2-HMAC is supported exactly when HMAC with ``algorithm`` is."""
    supported = self.hmac_supported(algorithm)
    return supported
def derive_pbkdf2_hmac(
    self,
    algorithm: hashes.HashAlgorithm,
    length: int,
    salt: bytes,
    iterations: int,
    key_material: bytes,
) -> bytes:
    """
    Derive ``length`` bytes with PBKDF2-HMAC via ``PKCS5_PBKDF2_HMAC``.

    :param algorithm: Hash used for the HMAC; must be known to OpenSSL.
    :param length: Number of output bytes.
    :param salt: Salt bytes.
    :param iterations: Iteration count.
    :param key_material: Input key material (password).
    :returns: A copy of the derived bytes.
    """
    out = self._ffi.new("unsigned char[]", length)
    evp_md = self._evp_md_non_null_from_algorithm(algorithm)
    password_ptr = self._ffi.from_buffer(key_material)
    password_len = len(key_material)
    salt_len = len(salt)
    res = self._lib.PKCS5_PBKDF2_HMAC(
        password_ptr,
        password_len,
        salt,
        salt_len,
        iterations,
        evp_md,
        length,
        out,
    )
    self.openssl_assert(res == 1)
    return self._ffi.buffer(out)[:]
def _consume_errors(self) -> typing.List[rust_openssl.OpenSSLError]:
    """Return whatever ``rust_openssl.capture_error_stack()`` captures."""
    captured = rust_openssl.capture_error_stack()
    return captured
487 def _bn_to_int(self, bn) -> int:
488 assert bn != self._ffi.NULL
489 self.openssl_assert(not self._lib.BN_is_negative(bn))
491 bn_num_bytes = self._lib.BN_num_bytes(bn)
492 bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
493 bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
494 # A zero length means the BN has value 0
495 self.openssl_assert(bin_len >= 0)
496 val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
497 return val
499 def _int_to_bn(self, num: int, bn=None):
500 """
501 Converts a python integer to a BIGNUM. The returned BIGNUM will not
502 be garbage collected (to support adding them to structs that take
503 ownership of the object). Be sure to register it for GC if it will
504 be discarded after use.
505 """
506 assert bn is None or bn != self._ffi.NULL
508 if bn is None:
509 bn = self._ffi.NULL
511 binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
512 bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn)
513 self.openssl_assert(bn_ptr != self._ffi.NULL)
514 return bn_ptr
def generate_rsa_private_key(
    self, public_exponent: int, key_size: int
) -> rsa.RSAPrivateKey:
    """
    Generate a new RSA private key.

    :param public_exponent: Public exponent, validated by
        ``rsa._verify_rsa_parameters``.
    :param key_size: Modulus size in bits, validated the same way.
    """
    rsa._verify_rsa_parameters(public_exponent, key_size)

    raw_rsa = self._lib.RSA_new()
    self.openssl_assert(raw_rsa != self._ffi.NULL)
    rsa_cdata = self._ffi.gc(raw_rsa, self._lib.RSA_free)

    # The exponent BIGNUM is only needed for this call, so it is GC'd.
    exponent_bn = self._ffi.gc(
        self._int_to_bn(public_exponent), self._lib.BN_free
    )

    res = self._lib.RSA_generate_key_ex(
        rsa_cdata, key_size, exponent_bn, self._ffi.NULL
    )
    self.openssl_assert(res == 1)
    evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)

    # We can skip RSA key validation here since we just generated the key
    return _RSAPrivateKey(
        self, rsa_cdata, evp_pkey, unsafe_skip_rsa_key_validation=True
    )
def generate_rsa_parameters_supported(
    self, public_exponent: int, key_size: int
) -> bool:
    """
    Report whether RSA generation accepts these parameters: the exponent
    must be odd and at least 3, and the key size at least 512.
    """
    if public_exponent < 3:
        return False
    if public_exponent & 1 == 0:
        return False
    return key_size >= 512
def load_rsa_private_numbers(
    self,
    numbers: rsa.RSAPrivateNumbers,
    unsafe_skip_rsa_key_validation: bool,
) -> rsa.RSAPrivateKey:
    """
    Build an RSA private key from its numeric components.

    :param numbers: Private numbers, checked with
        ``rsa._check_private_key_components`` first.
    :param unsafe_skip_rsa_key_validation: Passed through unchanged to
        ``_RSAPrivateKey``.
    """
    public = numbers.public_numbers
    rsa._check_private_key_components(
        numbers.p,
        numbers.q,
        numbers.d,
        numbers.dmp1,
        numbers.dmq1,
        numbers.iqmp,
        public.e,
        public.n,
    )
    raw_rsa = self._lib.RSA_new()
    self.openssl_assert(raw_rsa != self._ffi.NULL)
    rsa_cdata = self._ffi.gc(raw_rsa, self._lib.RSA_free)

    # Not registered for GC: the RSA_set0_* calls below receive them.
    p, q, d, dmp1, dmq1, iqmp, e, n = [
        self._int_to_bn(value)
        for value in (
            numbers.p,
            numbers.q,
            numbers.d,
            numbers.dmp1,
            numbers.dmq1,
            numbers.iqmp,
            public.e,
            public.n,
        )
    ]

    res = self._lib.RSA_set0_factors(rsa_cdata, p, q)
    self.openssl_assert(res == 1)
    res = self._lib.RSA_set0_key(rsa_cdata, n, e, d)
    self.openssl_assert(res == 1)
    res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp)
    self.openssl_assert(res == 1)
    evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)

    return _RSAPrivateKey(
        self,
        rsa_cdata,
        evp_pkey,
        unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
    )
def load_rsa_public_numbers(
    self, numbers: rsa.RSAPublicNumbers
) -> rsa.RSAPublicKey:
    """
    Build an RSA public key from ``e`` and ``n`` after checking them with
    ``rsa._check_public_key_components``.
    """
    rsa._check_public_key_components(numbers.e, numbers.n)

    raw_rsa = self._lib.RSA_new()
    self.openssl_assert(raw_rsa != self._ffi.NULL)
    rsa_cdata = self._ffi.gc(raw_rsa, self._lib.RSA_free)

    exponent_bn = self._int_to_bn(numbers.e)
    modulus_bn = self._int_to_bn(numbers.n)
    # No private exponent for a public key, hence the NULL.
    res = self._lib.RSA_set0_key(
        rsa_cdata, modulus_bn, exponent_bn, self._ffi.NULL
    )
    self.openssl_assert(res == 1)

    evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
    return _RSAPublicKey(self, rsa_cdata, evp_pkey)
604 def _create_evp_pkey_gc(self):
605 evp_pkey = self._lib.EVP_PKEY_new()
606 self.openssl_assert(evp_pkey != self._ffi.NULL)
607 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
608 return evp_pkey
610 def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
611 evp_pkey = self._create_evp_pkey_gc()
612 res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata)
613 self.openssl_assert(res == 1)
614 return evp_pkey
def _bytes_to_bio(self, data: bytes) -> _MemoryBIO:
    """
    Return a _MemoryBIO namedtuple of (BIO, char*).

    The char* is the storage for the BIO and it must stay alive until the
    BIO is finished with.
    """
    backing_ptr = self._ffi.from_buffer(data)
    raw_bio = self._lib.BIO_new_mem_buf(backing_ptr, len(data))
    self.openssl_assert(raw_bio != self._ffi.NULL)

    managed_bio = self._ffi.gc(raw_bio, self._lib.BIO_free)
    return _MemoryBIO(managed_bio, backing_ptr)
629 def _create_mem_bio_gc(self):
630 """
631 Creates an empty memory BIO.
632 """
633 bio_method = self._lib.BIO_s_mem()
634 self.openssl_assert(bio_method != self._ffi.NULL)
635 bio = self._lib.BIO_new(bio_method)
636 self.openssl_assert(bio != self._ffi.NULL)
637 bio = self._ffi.gc(bio, self._lib.BIO_free)
638 return bio
640 def _read_mem_bio(self, bio) -> bytes:
641 """
642 Reads a memory BIO. This only works on memory BIOs.
643 """
644 buf = self._ffi.new("char **")
645 buf_len = self._lib.BIO_get_mem_data(bio, buf)
646 self.openssl_assert(buf_len > 0)
647 self.openssl_assert(buf[0] != self._ffi.NULL)
648 bio_data = self._ffi.buffer(buf[0], buf_len)[:]
649 return bio_data
def _evp_pkey_to_private_key(
    self, evp_pkey, unsafe_skip_rsa_key_validation: bool
) -> PrivateKeyTypes:
    """
    Return the appropriate type of PrivateKey given an evp_pkey cdata
    pointer.

    :param unsafe_skip_rsa_key_validation: Only used on the RSA and
        RSA-PSS paths, where it is passed through unchanged.
    :raises UnsupportedAlgorithm: If the key type is not handled here.
    """
    lib = self._lib
    ffi = self._ffi
    key_type = lib.EVP_PKEY_id(evp_pkey)

    if key_type == lib.EVP_PKEY_RSA:
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        return _RSAPrivateKey(
            self,
            rsa_cdata,
            evp_pkey,
            unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
        )

    if (
        key_type == lib.EVP_PKEY_RSA_PSS
        and not lib.CRYPTOGRAPHY_IS_LIBRESSL
        and not lib.CRYPTOGRAPHY_IS_BORINGSSL
        and not lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
    ):
        # At the moment the way we handle RSA PSS keys is to strip the
        # PSS constraints from them and treat them as normal RSA keys
        # Unfortunately the RSA * itself tracks this data so we need to
        # extract, serialize, and reload it without the constraints.
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        bio = self._create_mem_bio_gc()
        res = lib.i2d_RSAPrivateKey_bio(bio, rsa_cdata)
        self.openssl_assert(res == 1)
        return self.load_der_private_key(
            self._read_mem_bio(bio),
            password=None,
            unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
        )

    if key_type == lib.EVP_PKEY_DSA:
        dsa_cdata = lib.EVP_PKEY_get1_DSA(evp_pkey)
        self.openssl_assert(dsa_cdata != ffi.NULL)
        dsa_cdata = ffi.gc(dsa_cdata, lib.DSA_free)
        return _DSAPrivateKey(self, dsa_cdata, evp_pkey)

    if key_type == lib.EVP_PKEY_EC:
        ec_cdata = lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
        self.openssl_assert(ec_cdata != ffi.NULL)
        ec_cdata = ffi.gc(ec_cdata, lib.EC_KEY_free)
        return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)

    if key_type in self._dh_types:
        dh_cdata = lib.EVP_PKEY_get1_DH(evp_pkey)
        self.openssl_assert(dh_cdata != ffi.NULL)
        dh_cdata = ffi.gc(dh_cdata, lib.DH_free)
        return _DHPrivateKey(self, dh_cdata, evp_pkey)

    # The getattr lookups below tolerate constants that are not present
    # in CRYPTOGRAPHY_IS_LIBRESSL builds.
    if key_type == getattr(lib, "EVP_PKEY_ED25519", None):
        return _Ed25519PrivateKey(self, evp_pkey)

    if key_type == getattr(lib, "EVP_PKEY_X448", None):
        return _X448PrivateKey(self, evp_pkey)

    if key_type == lib.EVP_PKEY_X25519:
        return rust_openssl.x25519.private_key_from_ptr(
            int(ffi.cast("uintptr_t", evp_pkey))
        )

    if key_type == getattr(lib, "EVP_PKEY_ED448", None):
        return _Ed448PrivateKey(self, evp_pkey)

    raise UnsupportedAlgorithm("Unsupported key type.")
def _evp_pkey_to_public_key(self, evp_pkey) -> PublicKeyTypes:
    """
    Return the appropriate type of PublicKey given an evp_pkey cdata
    pointer.

    :raises ValueError: If the EC key cannot be extracted from the pkey.
    :raises UnsupportedAlgorithm: If the key type is not handled here.
    """
    lib = self._lib
    ffi = self._ffi
    key_type = lib.EVP_PKEY_id(evp_pkey)

    if key_type == lib.EVP_PKEY_RSA:
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        return _RSAPublicKey(self, rsa_cdata, evp_pkey)

    if (
        key_type == lib.EVP_PKEY_RSA_PSS
        and not lib.CRYPTOGRAPHY_IS_LIBRESSL
        and not lib.CRYPTOGRAPHY_IS_BORINGSSL
        and not lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
    ):
        # Serialise the RSA key and reload it through the DER loader.
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        bio = self._create_mem_bio_gc()
        res = lib.i2d_RSAPublicKey_bio(bio, rsa_cdata)
        self.openssl_assert(res == 1)
        return self.load_der_public_key(self._read_mem_bio(bio))

    if key_type == lib.EVP_PKEY_DSA:
        dsa_cdata = lib.EVP_PKEY_get1_DSA(evp_pkey)
        self.openssl_assert(dsa_cdata != ffi.NULL)
        dsa_cdata = ffi.gc(dsa_cdata, lib.DSA_free)
        return _DSAPublicKey(self, dsa_cdata, evp_pkey)

    if key_type == lib.EVP_PKEY_EC:
        ec_cdata = lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
        if ec_cdata == ffi.NULL:
            # Reported as ValueError rather than an internal assert.
            errors = self._consume_errors()
            raise ValueError("Unable to load EC key", errors)
        ec_cdata = ffi.gc(ec_cdata, lib.EC_KEY_free)
        return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)

    if key_type in self._dh_types:
        dh_cdata = lib.EVP_PKEY_get1_DH(evp_pkey)
        self.openssl_assert(dh_cdata != ffi.NULL)
        dh_cdata = ffi.gc(dh_cdata, lib.DH_free)
        return _DHPublicKey(self, dh_cdata, evp_pkey)

    # The getattr lookups below tolerate constants that are not present
    # in CRYPTOGRAPHY_IS_LIBRESSL builds.
    if key_type == getattr(lib, "EVP_PKEY_ED25519", None):
        return _Ed25519PublicKey(self, evp_pkey)

    if key_type == getattr(lib, "EVP_PKEY_X448", None):
        return _X448PublicKey(self, evp_pkey)

    if key_type == lib.EVP_PKEY_X25519:
        return rust_openssl.x25519.public_key_from_ptr(
            int(ffi.cast("uintptr_t", evp_pkey))
        )

    if key_type == getattr(lib, "EVP_PKEY_ED448", None):
        return _Ed448PublicKey(self, evp_pkey)

    raise UnsupportedAlgorithm("Unsupported key type.")
def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether ``algorithm`` is accepted for OAEP: SHA1 and the SHA-2
    family listed below, with SHA1 excluded in FIPS mode.
    """
    if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
        return False

    accepted = (
        hashes.SHA1,
        hashes.SHA224,
        hashes.SHA256,
        hashes.SHA384,
        hashes.SHA512,
    )
    return isinstance(algorithm, accepted)
def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool:
    """
    Report whether ``padding`` is usable with RSA.

    PKCS1v15 is always accepted. PSS and OAEP are accepted only with
    MGF1 and a supported hash; any other padding is rejected.
    """
    if isinstance(padding, PKCS1v15):
        return True

    if isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
        mgf_hash = padding._mgf._algorithm
        # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked
        # as signature algorithm.
        if self._fips_enabled and isinstance(mgf_hash, hashes.SHA1):
            return True
        return self.hash_supported(mgf_hash)

    if isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
        return self._oaep_hash_supported(
            padding._mgf._algorithm
        ) and self._oaep_hash_supported(padding._algorithm)

    return False
def rsa_encryption_supported(self, padding: AsymmetricPadding) -> bool:
    """
    Report whether ``padding`` is usable for RSA encryption: PKCS1v15 is
    refused in FIPS mode, otherwise ``rsa_padding_supported`` decides.
    """
    pkcs1_under_fips = self._fips_enabled and isinstance(padding, PKCS1v15)
    if pkcs1_under_fips:
        return False
    return self.rsa_padding_supported(padding)
def generate_dsa_parameters(self, key_size: int) -> dsa.DSAParameters:
    """
    Generate fresh DSA domain parameters.

    :param key_size: One of 1024, 2048, 3072 or 4096.
    :raises ValueError: For any other key size.
    """
    allowed_sizes = (1024, 2048, 3072, 4096)
    if key_size not in allowed_sizes:
        raise ValueError(
            "Key size must be 1024, 2048, 3072, or 4096 bits."
        )

    raw_dsa = self._lib.DSA_new()
    self.openssl_assert(raw_dsa != self._ffi.NULL)
    dsa_cdata = self._ffi.gc(raw_dsa, self._lib.DSA_free)

    null = self._ffi.NULL
    res = self._lib.DSA_generate_parameters_ex(
        dsa_cdata,
        key_size,
        null,
        0,
        null,
        null,
        null,
    )
    self.openssl_assert(res == 1)

    return _DSAParameters(self, dsa_cdata)
def generate_dsa_private_key(
    self, parameters: dsa.DSAParameters
) -> dsa.DSAPrivateKey:
    """
    Generate a DSA private key from existing domain parameters.

    The parameters are duplicated first, so the caller's object is not
    modified.

    :param parameters: Backend DSA parameters; its ``_dsa_cdata`` is read.
    :returns: The newly generated private key.
    """
    ctx = self._lib.DSAparams_dup(
        parameters._dsa_cdata  # type: ignore[attr-defined]
    )
    self.openssl_assert(ctx != self._ffi.NULL)
    ctx = self._ffi.gc(ctx, self._lib.DSA_free)
    # The result must be checked: ignoring a failure here would hand
    # back a key object that holds no key material. This mirrors the
    # check made after RSA_generate_key_ex.
    res = self._lib.DSA_generate_key(ctx)
    self.openssl_assert(res == 1)
    evp_pkey = self._dsa_cdata_to_evp_pkey(ctx)

    return _DSAPrivateKey(self, ctx, evp_pkey)
def generate_dsa_private_key_and_parameters(
    self, key_size: int
) -> dsa.DSAPrivateKey:
    """
    Generate new DSA parameters of ``key_size`` bits and a private key
    from them in one step.
    """
    return self.generate_dsa_private_key(
        self.generate_dsa_parameters(key_size)
    )
865 def _dsa_cdata_set_values(
866 self, dsa_cdata, p, q, g, pub_key, priv_key
867 ) -> None:
868 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
869 self.openssl_assert(res == 1)
870 res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key)
871 self.openssl_assert(res == 1)
def load_dsa_private_numbers(
    self, numbers: dsa.DSAPrivateNumbers
) -> dsa.DSAPrivateKey:
    """
    Build a DSA private key from its numbers after checking them with
    ``dsa._check_dsa_private_numbers``.
    """
    dsa._check_dsa_private_numbers(numbers)
    public = numbers.public_numbers
    params = public.parameter_numbers

    raw_dsa = self._lib.DSA_new()
    self.openssl_assert(raw_dsa != self._ffi.NULL)
    dsa_cdata = self._ffi.gc(raw_dsa, self._lib.DSA_free)

    # Not registered for GC: the DSA_set0_* calls receive them.
    p, q, g, pub_key, priv_key = [
        self._int_to_bn(value)
        for value in (params.p, params.q, params.g, public.y, numbers.x)
    ]
    self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)

    evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
    return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
def load_dsa_public_numbers(
    self, numbers: dsa.DSAPublicNumbers
) -> dsa.DSAPublicKey:
    """
    Build a DSA public key from its numbers after checking the domain
    parameters with ``dsa._check_dsa_parameters``.
    """
    params = numbers.parameter_numbers
    dsa._check_dsa_parameters(params)

    raw_dsa = self._lib.DSA_new()
    self.openssl_assert(raw_dsa != self._ffi.NULL)
    dsa_cdata = self._ffi.gc(raw_dsa, self._lib.DSA_free)

    p, q, g, pub_key = [
        self._int_to_bn(value)
        for value in (params.p, params.q, params.g, numbers.y)
    ]
    # A public key carries no private component.
    self._dsa_cdata_set_values(
        dsa_cdata, p, q, g, pub_key, self._ffi.NULL
    )

    evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
    return _DSAPublicKey(self, dsa_cdata, evp_pkey)
def load_dsa_parameter_numbers(
    self, numbers: dsa.DSAParameterNumbers
) -> dsa.DSAParameters:
    """
    Build DSA domain parameters from ``p``, ``q`` and ``g`` after checking
    them with ``dsa._check_dsa_parameters``.
    """
    dsa._check_dsa_parameters(numbers)

    raw_dsa = self._lib.DSA_new()
    self.openssl_assert(raw_dsa != self._ffi.NULL)
    dsa_cdata = self._ffi.gc(raw_dsa, self._lib.DSA_free)

    p, q, g = [
        self._int_to_bn(value)
        for value in (numbers.p, numbers.q, numbers.g)
    ]
    res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
    self.openssl_assert(res == 1)

    return _DSAParameters(self, dsa_cdata)
929 def _dsa_cdata_to_evp_pkey(self, dsa_cdata):
930 evp_pkey = self._create_evp_pkey_gc()
931 res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata)
932 self.openssl_assert(res == 1)
933 return evp_pkey
def dsa_supported(self) -> bool:
    """DSA is available exactly when FIPS mode is off."""
    fips_on = self._fips_enabled
    return not fips_on
def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether ``algorithm`` can be used for DSA signatures: DSA
    itself must be supported and the hash must be a valid signature hash.
    """
    if self.dsa_supported():
        return self.signature_hash_supported(algorithm)
    return False
def cmac_algorithm_supported(self, algorithm) -> bool:
    """
    Report whether ``algorithm`` can be used for CMAC, answered by asking
    whether the cipher is supported in CBC mode.
    """
    # Placeholder all-zero IV, used only to build a CBC mode object.
    probe_mode = CBC(b"\x00" * algorithm.block_size)
    return self.cipher_supported(algorithm, probe_mode)
def create_cmac_ctx(self, algorithm: BlockCipherAlgorithm) -> _CMACContext:
    """Build a CMAC context for ``algorithm`` on this backend."""
    cmac_ctx = _CMACContext(self, algorithm)
    return cmac_ctx
def load_pem_private_key(
    self,
    data: bytes,
    password: typing.Optional[bytes],
    unsafe_skip_rsa_key_validation: bool,
) -> PrivateKeyTypes:
    """
    Load a PEM-encoded private key.

    Delegates to ``_load_key`` with ``PEM_read_bio_PrivateKey`` as the
    reader; all arguments are passed through unchanged.
    """
    pem_reader = self._lib.PEM_read_bio_PrivateKey
    return self._load_key(
        pem_reader,
        data,
        password,
        unsafe_skip_rsa_key_validation,
    )
def load_pem_public_key(self, data: bytes) -> PublicKeyTypes:
    """
    Deserialize a PEM public key.

    The data is first read with ``PEM_read_bio_PUBKEY``; if that fails the
    BIO is rewound and the data is retried with
    ``PEM_read_bio_RSAPublicKey``. When both attempts fail,
    ``_handle_key_loading_error`` raises.
    """
    ffi, lib = self._ffi, self._lib
    mem_bio = self._bytes_to_bio(data)

    # On OpenSSL 3.0.x PEM_read_bio_PUBKEY invokes the default password
    # callback when handed an encrypted private key. That default can open
    # an interactive console prompt and hang the Python process, so our own
    # callback is always supplied to catch this and error out properly.
    userdata = ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
    password_cb = ffi.addressof(
        lib._original_lib, "Cryptography_pem_password_cb"
    )

    spki_key = lib.PEM_read_bio_PUBKEY(
        mem_bio.bio, ffi.NULL, password_cb, userdata
    )
    if spki_key != ffi.NULL:
        spki_key = ffi.gc(spki_key, lib.EVP_PKEY_free)
        return self._evp_pkey_to_public_key(spki_key)

    # Not a (RSA/DSA/ECDSA) subjectPublicKeyInfo. It may still be a bare
    # PKCS1 RSA public key that is not wrapped in a subjectPublicKeyInfo.
    self._consume_errors()
    rewound = lib.BIO_reset(mem_bio.bio)
    self.openssl_assert(rewound == 1)
    rsa_cdata = lib.PEM_read_bio_RSAPublicKey(
        mem_bio.bio, ffi.NULL, password_cb, userdata
    )
    if rsa_cdata == ffi.NULL:
        self._handle_key_loading_error()

    rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
    evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
    return _RSAPublicKey(self, rsa_cdata, evp_pkey)
def load_pem_parameters(self, data: bytes) -> dh.DHParameters:
    """
    Deserialize PEM encoded DH parameters.

    ``_handle_key_loading_error`` raises when OpenSSL cannot parse
    ``data``.
    """
    ffi, lib = self._ffi, self._lib
    mem_bio = self._bytes_to_bio(data)
    # only DH is supported currently
    dh_cdata = lib.PEM_read_bio_DHparams(
        mem_bio.bio, ffi.NULL, ffi.NULL, ffi.NULL
    )
    if dh_cdata == ffi.NULL:
        self._handle_key_loading_error()

    dh_cdata = ffi.gc(dh_cdata, lib.DH_free)
    return _DHParameters(self, dh_cdata)
def load_der_private_key(
    self,
    data: bytes,
    password: typing.Optional[bytes],
    unsafe_skip_rsa_key_validation: bool,
) -> PrivateKeyTypes:
    """
    Deserialize a DER private key, trying two loaders in turn.

    The traditional loader is tried first; if it yields nothing the data is
    handed to ``_load_key`` with ``d2i_PKCS8PrivateKey_bio``.
    """
    # OpenSSL's d2i_AutoPrivateKey is meant to pick the format for us, but
    # it does not cope with encrypted private keys, hence the two attempts.
    bio_data = self._bytes_to_bio(data)
    traditional_key = self._evp_pkey_from_der_traditional_key(
        bio_data, password
    )
    if not traditional_key:
        # Fall back to the reader that handles encrypted PKCS8 properly.
        return self._load_key(
            self._lib.d2i_PKCS8PrivateKey_bio,
            data,
            password,
            unsafe_skip_rsa_key_validation,
        )

    return self._evp_pkey_to_private_key(
        traditional_key, unsafe_skip_rsa_key_validation
    )
def _evp_pkey_from_der_traditional_key(self, bio_data, password):
    """
    Try to read an unencrypted DER private key with ``d2i_PrivateKey_bio``.

    Returns the EVP_PKEY on success, or ``None`` (after consuming the
    OpenSSL errors) when the data could not be parsed. Raises ``TypeError``
    if the key parsed but a password was supplied.
    """
    ffi = self._ffi
    parsed = self._lib.d2i_PrivateKey_bio(bio_data.bio, ffi.NULL)
    if parsed == ffi.NULL:
        self._consume_errors()
        return None

    parsed = ffi.gc(parsed, self._lib.EVP_PKEY_free)
    if password is not None:
        raise TypeError(
            "Password was given but private key is not encrypted."
        )
    return parsed
def load_der_public_key(self, data: bytes) -> PublicKeyTypes:
    """
    Deserialize a DER public key.

    The data is first parsed with ``d2i_PUBKEY_bio``; if that fails the BIO
    is rewound and the data is retried with ``d2i_RSAPublicKey_bio``. When
    both attempts fail, ``_handle_key_loading_error`` raises.
    """
    ffi, lib = self._ffi, self._lib
    mem_bio = self._bytes_to_bio(data)

    spki_key = lib.d2i_PUBKEY_bio(mem_bio.bio, ffi.NULL)
    if spki_key != ffi.NULL:
        spki_key = ffi.gc(spki_key, lib.EVP_PKEY_free)
        return self._evp_pkey_to_public_key(spki_key)

    # Not a (RSA/DSA/ECDSA) subjectPublicKeyInfo. It may still be a bare
    # PKCS1 RSA public key that is not wrapped in a subjectPublicKeyInfo.
    self._consume_errors()
    rewound = lib.BIO_reset(mem_bio.bio)
    self.openssl_assert(rewound == 1)
    rsa_cdata = lib.d2i_RSAPublicKey_bio(mem_bio.bio, ffi.NULL)
    if rsa_cdata == ffi.NULL:
        self._handle_key_loading_error()

    rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
    evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
    return _RSAPublicKey(self, rsa_cdata, evp_pkey)
def load_der_parameters(self, data: bytes) -> dh.DHParameters:
    """
    Deserialize DER encoded DH parameters.

    ``d2i_DHparams_bio`` is tried first. If it fails and the bindings
    report ``Cryptography_HAS_EVP_PKEY_DHX``, the BIO is rewound and
    ``d2i_DHxparams_bio`` is tried. ``_handle_key_loading_error`` raises
    when nothing could be parsed.
    """
    ffi, lib = self._ffi, self._lib
    mem_bio = self._bytes_to_bio(data)

    plain_dh = lib.d2i_DHparams_bio(mem_bio.bio, ffi.NULL)
    if plain_dh != ffi.NULL:
        plain_dh = ffi.gc(plain_dh, lib.DH_free)
        return _DHParameters(self, plain_dh)

    if lib.Cryptography_HAS_EVP_PKEY_DHX:
        # Check whether the data is in the DHX form instead.
        self._consume_errors()
        rewound = lib.BIO_reset(mem_bio.bio)
        self.openssl_assert(rewound == 1)
        dhx = lib.d2i_DHxparams_bio(mem_bio.bio, ffi.NULL)
        if dhx != ffi.NULL:
            dhx = ffi.gc(dhx, lib.DH_free)
            return _DHParameters(self, dhx)

    self._handle_key_loading_error()
def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
    """
    Convert ``cert`` to an OpenSSL X509 pointer by round-tripping through
    its DER encoding.
    """
    der_bytes = cert.public_bytes(serialization.Encoding.DER)
    mem_bio = self._bytes_to_bio(der_bytes)
    cert_ptr = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
    self.openssl_assert(cert_ptr != self._ffi.NULL)
    return self._ffi.gc(cert_ptr, self._lib.X509_free)
def _ossl2cert(self, x509_ptr: typing.Any) -> x509.Certificate:
    """
    Convert an OpenSSL X509 pointer to an ``x509.Certificate`` by writing
    it out as DER and loading the result.
    """
    out_bio = self._create_mem_bio_gc()
    status = self._lib.i2d_X509_bio(out_bio, x509_ptr)
    self.openssl_assert(status == 1)
    der_bytes = self._read_mem_bio(out_bio)
    return x509.load_der_x509_certificate(der_bytes)
def _check_keys_correspond(self, key1, key2) -> None:
    """
    Raise ``ValueError`` unless ``EVP_PKEY_cmp`` returns 1 for the two
    keys' ``_evp_pkey`` values.
    """
    comparison = self._lib.EVP_PKEY_cmp(key1._evp_pkey, key2._evp_pkey)
    keys_match = comparison == 1
    if not keys_match:
        raise ValueError("Keys do not correspond")
def _load_key(
    self, openssl_read_func, data, password, unsafe_skip_rsa_key_validation
) -> PrivateKeyTypes:
    """
    Read a private key from ``data`` with ``openssl_read_func``.

    The reader is called with the ``Cryptography_pem_password_cb`` callback
    and a ``CRYPTOGRAPHY_PASSWORD_DATA`` struct carrying ``password``.

    Raises ``TypeError`` when the key is encrypted but no password was
    given (callback error -1), or when a password was given but the
    callback was never invoked. Raises ``ValueError`` when the password is
    too long for the callback (error -2). Any other read failure is handed
    to ``_handle_key_loading_error``.
    """
    ffi = self._ffi
    mem_bio = self._bytes_to_bio(data)

    userdata = ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
    if password is not None:
        utils._check_byteslike("password", password)
        # NOTE(review): the buffer object is held in a local, as in the
        # code this replaces; presumably that keeps it referenced while
        # OpenSSL reads the key - confirm before inlining it.
        password_buf = ffi.from_buffer(password)
        userdata.password = password_buf
        userdata.length = len(password)

    password_cb = ffi.addressof(
        self._lib._original_lib, "Cryptography_pem_password_cb"
    )
    evp_pkey = openssl_read_func(
        mem_bio.bio, ffi.NULL, password_cb, userdata
    )

    if evp_pkey == ffi.NULL:
        if userdata.error == 0:
            # The callback saw nothing wrong, so the data itself is bad.
            self._handle_key_loading_error()

        self._consume_errors()
        if userdata.error == -1:
            raise TypeError(
                "Password was not given but private key is encrypted"
            )
        assert userdata.error == -2
        longest_allowed = userdata.maxsize - 1
        raise ValueError(
            "Passwords longer than {} bytes are not supported "
            "by this backend.".format(longest_allowed)
        )

    evp_pkey = ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)

    if password is not None and userdata.called == 0:
        raise TypeError(
            "Password was given but private key is not encrypted."
        )

    assert password is None or userdata.called == 1

    return self._evp_pkey_to_private_key(
        evp_pkey, unsafe_skip_rsa_key_validation
    )
def _handle_key_loading_error(self) -> typing.NoReturn:
    """
    Turn the pending OpenSSL errors into a ``ValueError``; always raises.

    The message depends on the consumed errors: none at all, a bad-decrypt
    reason on the first error, an unsupported-private-key-algorithm reason
    on any error, or anything else (in which case the errors are attached
    to the exception).
    """
    lib = self._lib
    errors = self._consume_errors()

    if not errors:
        raise ValueError(
            "Could not deserialize key data. The data may be in an "
            "incorrect format or it may be encrypted with an unsupported "
            "algorithm."
        )

    # Only the first error is inspected for a bad decrypt. Each check runs
    # only if the previous ones did not match, so the provider constants
    # are touched only when Cryptography_HAS_PROVIDERS is set.
    first_error = errors[0]
    bad_decrypt = first_error._lib_reason_match(
        lib.ERR_LIB_EVP, lib.EVP_R_BAD_DECRYPT
    )
    if not bad_decrypt:
        bad_decrypt = first_error._lib_reason_match(
            lib.ERR_LIB_PKCS12,
            lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
        )
    if not bad_decrypt and lib.Cryptography_HAS_PROVIDERS:
        bad_decrypt = first_error._lib_reason_match(
            lib.ERR_LIB_PROV,
            lib.PROV_R_BAD_DECRYPT,
        )
    if bad_decrypt:
        raise ValueError("Bad decrypt. Incorrect password?")

    for error in errors:
        if error._lib_reason_match(
            lib.ERR_LIB_EVP,
            lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
        ):
            raise ValueError("Unsupported public key algorithm.")

    raise ValueError(
        "Could not deserialize key data. The data may be in an "
        "incorrect format, it may be encrypted with an unsupported "
        "algorithm, or it may be an unsupported key type (e.g. EC "
        "curves with explicit parameters).",
        errors,
    )
def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
    """
    Report whether OpenSSL can create a group for ``curve``.

    A curve whose name has no NID is probed with ``NID_undef``.
    """
    lib = self._lib
    try:
        curve_nid = self._elliptic_curve_to_nid(curve)
    except UnsupportedAlgorithm:
        curve_nid = lib.NID_undef

    group = lib.EC_GROUP_new_by_curve_name(curve_nid)
    if group != self._ffi.NULL:
        self.openssl_assert(curve_nid != lib.NID_undef)
        lib.EC_GROUP_free(group)
        return True

    self._consume_errors()
    return False
def elliptic_curve_signature_algorithm_supported(
    self,
    signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
    curve: ec.EllipticCurve,
) -> bool:
    """
    Report whether ``signature_algorithm`` can be used on ``curve``.

    Only ECDSA is supported right now; for ECDSA the answer is whether the
    curve itself is supported.
    """
    if isinstance(signature_algorithm, ec.ECDSA):
        return self.elliptic_curve_supported(curve)
    return False
def generate_elliptic_curve_private_key(
    self, curve: ec.EllipticCurve
) -> ec.EllipticCurvePrivateKey:
    """
    Generate a new private key on the named curve.

    Raises ``UnsupportedAlgorithm`` when the curve is not supported.
    """
    if not self.elliptic_curve_supported(curve):
        raise UnsupportedAlgorithm(
            f"Backend object does not support {curve.name}.",
            _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
        )

    ec_cdata = self._ec_key_new_by_curve(curve)
    status = self._lib.EC_KEY_generate_key(ec_cdata)
    self.openssl_assert(status == 1)

    evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
    return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
def load_elliptic_curve_private_numbers(
    self, numbers: ec.EllipticCurvePrivateNumbers
) -> ec.EllipticCurvePrivateKey:
    """
    Build an EC private key from ``numbers``.

    The private scalar and the public affine coordinates are both loaded
    into a new EC_KEY. The public point is then recomputed from the private
    scalar and compared with the supplied one.

    Raises ``ValueError`` ("Invalid EC key.") when OpenSSL rejects the
    private value, when the public coordinates are rejected, or when the
    supplied public point does not match the recomputed one.
    """
    public = numbers.public_numbers

    ec_cdata = self._ec_key_new_by_curve(public.curve)

    private_value = self._ffi.gc(
        self._int_to_bn(numbers.private_value), self._lib.BN_clear_free
    )
    res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value)
    if res != 1:
        self._consume_errors()
        raise ValueError("Invalid EC key.")

    with self._tmp_bn_ctx() as bn_ctx:
        self._ec_key_set_public_key_affine_coordinates(
            ec_cdata, public.x, public.y, bn_ctx
        )
        # derive the expected public point and compare it to the one we
        # just set based on the values we were given. If they don't match
        # this isn't a valid key pair.
        group = self._lib.EC_KEY_get0_group(ec_cdata)
        self.openssl_assert(group != self._ffi.NULL)
        # Go through self._lib like every other call here, rather than
        # through the module-level ``backend`` object.
        set_point = self._lib.EC_KEY_get0_public_key(ec_cdata)
        self.openssl_assert(set_point != self._ffi.NULL)
        computed_point = self._lib.EC_POINT_new(group)
        self.openssl_assert(computed_point != self._ffi.NULL)
        computed_point = self._ffi.gc(
            computed_point, self._lib.EC_POINT_free
        )
        res = self._lib.EC_POINT_mul(
            group,
            computed_point,
            private_value,
            self._ffi.NULL,
            self._ffi.NULL,
            bn_ctx,
        )
        self.openssl_assert(res == 1)
        if (
            self._lib.EC_POINT_cmp(
                group, set_point, computed_point, bn_ctx
            )
            != 0
        ):
            raise ValueError("Invalid EC key.")

    evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)

    return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
def load_elliptic_curve_public_numbers(
    self, numbers: ec.EllipticCurvePublicNumbers
) -> ec.EllipticCurvePublicKey:
    """
    Build an EC public key from the curve and affine coordinates in
    ``numbers``.
    """
    new_key = self._ec_key_new_by_curve(numbers.curve)
    with self._tmp_bn_ctx() as scratch_ctx:
        self._ec_key_set_public_key_affine_coordinates(
            new_key, numbers.x, numbers.y, scratch_ctx
        )
    pkey = self._ec_cdata_to_evp_pkey(new_key)

    return _EllipticCurvePublicKey(self, new_key, pkey)
def load_elliptic_curve_public_bytes(
    self, curve: ec.EllipticCurve, point_bytes: bytes
) -> ec.EllipticCurvePublicKey:
    """
    Build an EC public key on ``curve`` from an encoded point.

    Raises ``ValueError`` when ``EC_POINT_oct2point`` rejects
    ``point_bytes``.
    """
    ffi, lib = self._ffi, self._lib
    ec_cdata = self._ec_key_new_by_curve(curve)

    group = lib.EC_KEY_get0_group(ec_cdata)
    self.openssl_assert(group != ffi.NULL)

    decoded_point = lib.EC_POINT_new(group)
    self.openssl_assert(decoded_point != ffi.NULL)
    decoded_point = ffi.gc(decoded_point, lib.EC_POINT_free)

    with self._tmp_bn_ctx() as bn_ctx:
        status = lib.EC_POINT_oct2point(
            group, decoded_point, point_bytes, len(point_bytes), bn_ctx
        )
        if status != 1:
            self._consume_errors()
            raise ValueError("Invalid public bytes for the given curve")

    status = lib.EC_KEY_set_public_key(ec_cdata, decoded_point)
    self.openssl_assert(status == 1)
    evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
    return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
def derive_elliptic_curve_private_key(
    self, private_value: int, curve: ec.EllipticCurve
) -> ec.EllipticCurvePrivateKey:
    """
    Build an EC private key on ``curve`` from the integer
    ``private_value``.

    The public point is computed with ``EC_POINT_mul``. Raises
    ``ValueError`` when the affine coordinates of that point cannot be
    obtained.
    """
    ffi, lib = self._ffi, self._lib
    ec_cdata = self._ec_key_new_by_curve(curve)

    group = lib.EC_KEY_get0_group(ec_cdata)
    self.openssl_assert(group != ffi.NULL)

    public_point = lib.EC_POINT_new(group)
    self.openssl_assert(public_point != ffi.NULL)
    public_point = ffi.gc(public_point, lib.EC_POINT_free)

    scalar = ffi.gc(self._int_to_bn(private_value), lib.BN_clear_free)

    with self._tmp_bn_ctx() as bn_ctx:
        status = lib.EC_POINT_mul(
            group, public_point, scalar, ffi.NULL, ffi.NULL, bn_ctx
        )
        self.openssl_assert(status == 1)

        # Only the return code below is inspected; the coordinates
        # themselves are not read back.
        affine_x = lib.BN_CTX_get(bn_ctx)
        affine_y = lib.BN_CTX_get(bn_ctx)
        status = lib.EC_POINT_get_affine_coordinates(
            group, public_point, affine_x, affine_y, bn_ctx
        )
        if status != 1:
            self._consume_errors()
            raise ValueError("Unable to derive key from private_value")

    status = lib.EC_KEY_set_public_key(ec_cdata, public_point)
    self.openssl_assert(status == 1)

    private_bn = ffi.gc(self._int_to_bn(private_value), lib.BN_clear_free)
    status = lib.EC_KEY_set_private_key(ec_cdata, private_bn)
    self.openssl_assert(status == 1)

    evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)

    return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
def _ec_key_new_by_curve(self, curve: ec.EllipticCurve):
    """
    Create a new EC_KEY for ``curve`` by resolving its NID first.
    """
    nid = self._elliptic_curve_to_nid(curve)
    new_key = self._ec_key_new_by_curve_nid(nid)
    return new_key
def _ec_key_new_by_curve_nid(self, curve_nid: int):
    """
    Create a new EC_KEY for ``curve_nid``, registered with ``ffi.gc`` so
    that ``EC_KEY_free`` is its destructor.
    """
    ffi, lib = self._ffi, self._lib
    raw_key = lib.EC_KEY_new_by_curve_name(curve_nid)
    self.openssl_assert(raw_key != ffi.NULL)
    return ffi.gc(raw_key, lib.EC_KEY_free)
def elliptic_curve_exchange_algorithm_supported(
    self, algorithm: ec.ECDH, curve: ec.EllipticCurve
) -> bool:
    """
    Report whether ``algorithm`` can be used for key exchange on ``curve``.

    In FIPS mode the curve must be an instance of one of
    ``_fips_ecdh_curves``. Otherwise the curve must be supported and the
    algorithm must be ECDH.
    """
    if self._fips_enabled:
        if not isinstance(curve, self._fips_ecdh_curves):
            return False

    curve_ok = self.elliptic_curve_supported(curve)
    return curve_ok and isinstance(algorithm, ec.ECDH)
def _ec_cdata_to_evp_pkey(self, ec_cdata):
    """
    Return a new EVP_PKEY (from ``_create_evp_pkey_gc``) holding
    ``ec_cdata``.
    """
    pkey = self._create_evp_pkey_gc()
    status = self._lib.EVP_PKEY_set1_EC_KEY(pkey, ec_cdata)
    self.openssl_assert(status == 1)
    return pkey
def _elliptic_curve_to_nid(self, curve: ec.EllipticCurve) -> int:
    """
    Get the NID for a curve name.

    ``secp192r1`` and ``secp256r1`` are looked up under the names
    ``prime192v1`` and ``prime256v1``. Raises ``UnsupportedAlgorithm`` when
    the name has no NID.
    """
    aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"}

    requested = curve.name
    lookup_name = aliases[requested] if requested in aliases else requested

    nid = self._lib.OBJ_sn2nid(lookup_name.encode())
    if nid == self._lib.NID_undef:
        raise UnsupportedAlgorithm(
            f"{curve.name} is not a supported elliptic curve",
            _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
        )
    return nid
@contextmanager
def _tmp_bn_ctx(self):
    """
    Context manager yielding a fresh BN_CTX.

    ``BN_CTX_start`` is called before the body runs and ``BN_CTX_end``
    after it, even if the body raises.
    """
    ffi, lib = self._ffi, self._lib
    scratch_ctx = lib.BN_CTX_new()
    self.openssl_assert(scratch_ctx != ffi.NULL)
    scratch_ctx = ffi.gc(scratch_ctx, lib.BN_CTX_free)
    lib.BN_CTX_start(scratch_ctx)
    try:
        yield scratch_ctx
    finally:
        lib.BN_CTX_end(scratch_ctx)
def _ec_key_set_public_key_affine_coordinates(
    self,
    ec_cdata,
    x: int,
    y: int,
    bn_ctx,
) -> None:
    """
    Sets the public key point in the EC_KEY context to the affine x and y
    values.

    Raises ``ValueError`` when either coordinate is negative or when
    ``EC_POINT_set_affine_coordinates`` rejects the pair.
    """
    if x < 0 or y < 0:
        raise ValueError(
            "Invalid EC key. Both x and y must be non-negative."
        )

    ffi, lib = self._ffi, self._lib
    bn_x = ffi.gc(self._int_to_bn(x), lib.BN_free)
    bn_y = ffi.gc(self._int_to_bn(y), lib.BN_free)

    group = lib.EC_KEY_get0_group(ec_cdata)
    self.openssl_assert(group != ffi.NULL)

    public_point = lib.EC_POINT_new(group)
    self.openssl_assert(public_point != ffi.NULL)
    public_point = ffi.gc(public_point, lib.EC_POINT_free)

    status = lib.EC_POINT_set_affine_coordinates(
        group, public_point, bn_x, bn_y, bn_ctx
    )
    if status != 1:
        self._consume_errors()
        raise ValueError("Invalid EC key.")

    status = lib.EC_KEY_set_public_key(ec_cdata, public_point)
    self.openssl_assert(status == 1)
def _private_key_bytes(
    self,
    encoding: serialization.Encoding,
    format: serialization.PrivateFormat,
    encryption_algorithm: serialization.KeySerializationEncryption,
    key,
    evp_pkey,
    cdata,
) -> bytes:
    """
    Serialize a private key.

    Handles the PKCS8, TraditionalOpenSSL and OpenSSH formats. Raises
    ``TypeError`` for arguments of the wrong type and ``ValueError`` for
    unsupported combinations of format, encoding, key type and encryption.
    """
    encoding_enum = serialization.Encoding
    format_enum = serialization.PrivateFormat
    lib = self._lib

    # Reject arguments of the wrong type before looking at their values.
    if not isinstance(encoding, encoding_enum):
        raise TypeError("encoding must be an item from the Encoding enum")
    if not isinstance(format, format_enum):
        raise TypeError(
            "format must be an item from the PrivateFormat enum"
        )
    if not isinstance(
        encryption_algorithm, serialization.KeySerializationEncryption
    ):
        raise TypeError(
            "Encryption algorithm must be a KeySerializationEncryption "
            "instance"
        )

    # Work out the password implied by the encryption algorithm.
    unencrypted = isinstance(
        encryption_algorithm, serialization.NoEncryption
    )
    if unencrypted:
        password = b""
    elif isinstance(
        encryption_algorithm, serialization.BestAvailableEncryption
    ):
        password = encryption_algorithm.password
        if len(password) > 1023:
            raise ValueError(
                "Passwords longer than 1023 bytes are not supported by "
                "this backend"
            )
    elif (
        isinstance(
            encryption_algorithm, serialization._KeySerializationEncryption
        )
        and encryption_algorithm._format is format
        and format is format_enum.OpenSSH
    ):
        password = encryption_algorithm.password
    else:
        raise ValueError("Unsupported encryption type")

    # PKCS8 + PEM/DER
    if format is format_enum.PKCS8:
        if encoding is encoding_enum.PEM:
            writer = lib.PEM_write_bio_PKCS8PrivateKey
        elif encoding is encoding_enum.DER:
            writer = lib.i2d_PKCS8PrivateKey_bio
        else:
            raise ValueError("Unsupported encoding for PKCS8")
        return self._private_key_bytes_via_bio(writer, evp_pkey, password)

    # TraditionalOpenSSL + PEM/DER
    if format is format_enum.TraditionalOpenSSL:
        if self._fips_enabled and not unencrypted:
            raise ValueError(
                "Encrypted traditional OpenSSL format is not "
                "supported in FIPS mode."
            )
        key_type = lib.EVP_PKEY_id(evp_pkey)

        if encoding is encoding_enum.PEM:
            if key_type == lib.EVP_PKEY_RSA:
                writer = lib.PEM_write_bio_RSAPrivateKey
            elif key_type == lib.EVP_PKEY_DSA:
                writer = lib.PEM_write_bio_DSAPrivateKey
            elif key_type == lib.EVP_PKEY_EC:
                writer = lib.PEM_write_bio_ECPrivateKey
            else:
                raise ValueError(
                    "Unsupported key type for TraditionalOpenSSL"
                )
            return self._private_key_bytes_via_bio(writer, cdata, password)

        if encoding is not encoding_enum.DER:
            raise ValueError("Unsupported encoding for TraditionalOpenSSL")

        if password:
            raise ValueError(
                "Encryption is not supported for DER encoded "
                "traditional OpenSSL keys"
            )
        if key_type == lib.EVP_PKEY_RSA:
            writer = lib.i2d_RSAPrivateKey_bio
        elif key_type == lib.EVP_PKEY_EC:
            writer = lib.i2d_ECPrivateKey_bio
        elif key_type == lib.EVP_PKEY_DSA:
            writer = lib.i2d_DSAPrivateKey_bio
        else:
            raise ValueError(
                "Unsupported key type for TraditionalOpenSSL"
            )
        return self._bio_func_output(writer, cdata)

    # OpenSSH + PEM
    if format is format_enum.OpenSSH:
        if encoding is not encoding_enum.PEM:
            raise ValueError(
                "OpenSSH private key format can only be used"
                " with PEM encoding"
            )
        return ssh._serialize_ssh_private_key(
            key, password, encryption_algorithm
        )

    # Anything that key-specific code was supposed to handle earlier,
    # like Raw.
    raise ValueError("format is invalid with this key")
def _private_key_bytes_via_bio(
    self, write_bio, evp_pkey, password
) -> bytes:
    """
    Serialize ``evp_pkey`` with ``write_bio``.

    A non-empty ``password`` selects the ``aes-256-cbc`` cipher; an empty
    one passes NULL as the cipher.
    """
    null = self._ffi.NULL
    if password:
        # This is a curated value that we will update over time.
        cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
    else:
        cipher = null

    return self._bio_func_output(
        write_bio,
        evp_pkey,
        cipher,
        password,
        len(password),
        null,
        null,
    )
def _bio_func_output(self, write_bio, *args) -> bytes:
    """
    Call ``write_bio(bio, *args)`` on a fresh memory BIO and return the
    bytes that were written to it.
    """
    out_bio = self._create_mem_bio_gc()
    status = write_bio(out_bio, *args)
    self.openssl_assert(status == 1)
    written = self._read_mem_bio(out_bio)
    return written
def _public_key_bytes(
    self,
    encoding: serialization.Encoding,
    format: serialization.PublicFormat,
    key,
    evp_pkey,
    cdata,
) -> bytes:
    """
    Serialize a public key.

    Handles the SubjectPublicKeyInfo, PKCS1 (RSA only) and OpenSSH formats.
    Raises ``TypeError`` for arguments of the wrong type and ``ValueError``
    for unsupported combinations of format, encoding and key type.
    """
    encoding_enum = serialization.Encoding
    format_enum = serialization.PublicFormat
    lib = self._lib

    if not isinstance(encoding, encoding_enum):
        raise TypeError("encoding must be an item from the Encoding enum")
    if not isinstance(format, format_enum):
        raise TypeError(
            "format must be an item from the PublicFormat enum"
        )

    # SubjectPublicKeyInfo + PEM/DER
    if format is format_enum.SubjectPublicKeyInfo:
        if encoding is encoding_enum.PEM:
            writer = lib.PEM_write_bio_PUBKEY
        elif encoding is encoding_enum.DER:
            writer = lib.i2d_PUBKEY_bio
        else:
            raise ValueError(
                "SubjectPublicKeyInfo works only with PEM or DER encoding"
            )
        return self._bio_func_output(writer, evp_pkey)

    # PKCS1 + PEM/DER
    if format is format_enum.PKCS1:
        # Only RSA is supported here.
        if lib.EVP_PKEY_id(evp_pkey) != lib.EVP_PKEY_RSA:
            raise ValueError("PKCS1 format is supported only for RSA keys")

        if encoding is encoding_enum.PEM:
            writer = lib.PEM_write_bio_RSAPublicKey
        elif encoding is encoding_enum.DER:
            writer = lib.i2d_RSAPublicKey_bio
        else:
            raise ValueError("PKCS1 works only with PEM or DER encoding")
        return self._bio_func_output(writer, cdata)

    # OpenSSH + OpenSSH
    if format is format_enum.OpenSSH:
        if encoding is not encoding_enum.OpenSSH:
            raise ValueError(
                "OpenSSH format must be used with OpenSSH encoding"
            )
        return ssh.serialize_ssh_public_key(key)

    # Anything that key-specific code was supposed to handle earlier,
    # like Raw, CompressedPoint, UncompressedPoint
    raise ValueError("format is invalid with this key")
def dh_supported(self) -> bool:
    """
    Report whether DH is available: it is unless the bindings report
    BoringSSL.
    """
    on_boringssl = self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    return not on_boringssl
def generate_dh_parameters(
    self, generator: int, key_size: int
) -> dh.DHParameters:
    """
    Generate new DH parameters.

    Raises ``ValueError`` when ``key_size`` is below
    ``dh._MIN_MODULUS_SIZE``, when ``generator`` is neither 2 nor 5, or
    when OpenSSL fails to generate the parameters.
    """
    minimum_bits = dh._MIN_MODULUS_SIZE
    if key_size < minimum_bits:
        raise ValueError(
            "DH key_size must be at least {} bits".format(minimum_bits)
        )

    if generator not in (2, 5):
        raise ValueError("DH generator must be 2 or 5")

    ffi, lib = self._ffi, self._lib
    new_dh = lib.DH_new()
    self.openssl_assert(new_dh != ffi.NULL)
    new_dh = ffi.gc(new_dh, lib.DH_free)

    status = lib.DH_generate_parameters_ex(
        new_dh, key_size, generator, ffi.NULL
    )
    if status != 1:
        errors = self._consume_errors()
        raise ValueError("Unable to generate DH parameters", errors)

    return _DHParameters(self, new_dh)
def _dh_cdata_to_evp_pkey(self, dh_cdata):
    """
    Return a new EVP_PKEY (from ``_create_evp_pkey_gc``) holding
    ``dh_cdata``.
    """
    pkey = self._create_evp_pkey_gc()
    status = self._lib.EVP_PKEY_set1_DH(pkey, dh_cdata)
    self.openssl_assert(status == 1)
    return pkey
def generate_dh_private_key(
    self, parameters: dh.DHParameters
) -> dh.DHPrivateKey:
    """
    Generate a DH private key on a duplicate of ``parameters``.
    """
    key_cdata = _dh_params_dup(
        parameters._dh_cdata, self  # type: ignore[attr-defined]
    )

    status = self._lib.DH_generate_key(key_cdata)
    self.openssl_assert(status == 1)

    pkey = self._dh_cdata_to_evp_pkey(key_cdata)
    return _DHPrivateKey(self, key_cdata, pkey)
def generate_dh_private_key_and_parameters(
    self, generator: int, key_size: int
) -> dh.DHPrivateKey:
    """
    Generate fresh DH parameters and then a private key on them.
    """
    fresh_parameters = self.generate_dh_parameters(generator, key_size)
    return self.generate_dh_private_key(fresh_parameters)
def load_dh_private_numbers(
    self, numbers: dh.DHPrivateNumbers
) -> dh.DHPrivateKey:
    """
    Build a DH private key from ``numbers``.

    Raises ``ValueError`` when ``DH_check`` reports a problem, except for
    the single case described in the comment below.
    """
    ffi, lib = self._ffi, self._lib
    parameter_numbers = numbers.public_numbers.parameter_numbers

    dh_cdata = lib.DH_new()
    self.openssl_assert(dh_cdata != ffi.NULL)
    dh_cdata = ffi.gc(dh_cdata, lib.DH_free)

    bn_p = self._int_to_bn(parameter_numbers.p)
    bn_g = self._int_to_bn(parameter_numbers.g)
    if parameter_numbers.q is None:
        bn_q = ffi.NULL
    else:
        bn_q = self._int_to_bn(parameter_numbers.q)

    bn_public = self._int_to_bn(numbers.public_numbers.y)
    bn_private = self._int_to_bn(numbers.x)

    status = lib.DH_set0_pqg(dh_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(status == 1)

    status = lib.DH_set0_key(dh_cdata, bn_public, bn_private)
    self.openssl_assert(status == 1)

    codes = ffi.new("int[]", 1)
    status = lib.DH_check(dh_cdata, codes)
    self.openssl_assert(status == 1)

    # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not
    # equal 11 when the generator is 2 (a quadratic nonresidue).
    # We want to ignore that error because p % 24 == 23 is also fine.
    # Specifically, g is then a quadratic residue. Within the context of
    # Diffie-Hellman this means it can only generate half the possible
    # values. That sounds bad, but quadratic nonresidues leak a bit of
    # the key to the attacker in exchange for having the full key space
    # available. See: https://crypto.stackexchange.com/questions/12961
    check_flags = codes[0]
    only_generator_flag = (
        parameter_numbers.g == 2
        and check_flags == lib.DH_NOT_SUITABLE_GENERATOR
    )
    if check_flags != 0 and not only_generator_flag:
        raise ValueError("DH private numbers did not pass safety checks.")

    evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)

    return _DHPrivateKey(self, dh_cdata, evp_pkey)
def load_dh_public_numbers(
    self, numbers: dh.DHPublicNumbers
) -> dh.DHPublicKey:
    """
    Build a DH public key from ``numbers``.

    ``q`` is passed to OpenSSL as NULL when the parameter numbers carry no
    ``q``.
    """
    ffi, lib = self._ffi, self._lib
    dh_cdata = lib.DH_new()
    self.openssl_assert(dh_cdata != ffi.NULL)
    dh_cdata = ffi.gc(dh_cdata, lib.DH_free)

    parameter_numbers = numbers.parameter_numbers

    bn_p = self._int_to_bn(parameter_numbers.p)
    bn_g = self._int_to_bn(parameter_numbers.g)
    if parameter_numbers.q is None:
        bn_q = ffi.NULL
    else:
        bn_q = self._int_to_bn(parameter_numbers.q)

    bn_public = self._int_to_bn(numbers.y)

    status = lib.DH_set0_pqg(dh_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(status == 1)

    status = lib.DH_set0_key(dh_cdata, bn_public, ffi.NULL)
    self.openssl_assert(status == 1)

    evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)

    return _DHPublicKey(self, dh_cdata, evp_pkey)
def load_dh_parameter_numbers(
    self, numbers: dh.DHParameterNumbers
) -> dh.DHParameters:
    """
    Build DH parameters from the p, g and optional q in ``numbers``.
    """
    ffi, lib = self._ffi, self._lib
    dh_cdata = lib.DH_new()
    self.openssl_assert(dh_cdata != ffi.NULL)
    dh_cdata = ffi.gc(dh_cdata, lib.DH_free)

    bn_p = self._int_to_bn(numbers.p)
    bn_g = self._int_to_bn(numbers.g)
    if numbers.q is None:
        bn_q = ffi.NULL
    else:
        bn_q = self._int_to_bn(numbers.q)

    status = lib.DH_set0_pqg(dh_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(status == 1)

    return _DHParameters(self, dh_cdata)
def dh_parameters_supported(
    self, p: int, g: int, q: typing.Optional[int] = None
) -> bool:
    """
    Report whether ``DH_check`` accepts the given parameters.

    Returns ``True`` only when ``DH_check`` leaves its result code at 0.
    """
    ffi, lib = self._ffi, self._lib
    dh_cdata = lib.DH_new()
    self.openssl_assert(dh_cdata != ffi.NULL)
    dh_cdata = ffi.gc(dh_cdata, lib.DH_free)

    bn_p = self._int_to_bn(p)
    bn_g = self._int_to_bn(g)
    if q is None:
        bn_q = ffi.NULL
    else:
        bn_q = self._int_to_bn(q)

    status = lib.DH_set0_pqg(dh_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(status == 1)

    check_codes = ffi.new("int[]", 1)
    status = lib.DH_check(dh_cdata, check_codes)
    self.openssl_assert(status == 1)

    return check_codes[0] == 0
def dh_x942_serialization_supported(self) -> bool:
    """
    Report whether the bindings set ``Cryptography_HAS_EVP_PKEY_DHX`` to 1.
    """
    has_dhx = self._lib.Cryptography_HAS_EVP_PKEY_DHX
    return has_dhx == 1
def x25519_load_public_bytes(self, data: bytes) -> x25519.X25519PublicKey:
    """
    Load an X25519 public key from ``data`` via the Rust bindings.
    """
    public_key = rust_openssl.x25519.from_public_bytes(data)
    return public_key
def x25519_load_private_bytes(
    self, data: bytes
) -> x25519.X25519PrivateKey:
    """
    Load an X25519 private key from ``data`` via the Rust bindings.
    """
    private_key = rust_openssl.x25519.from_private_bytes(data)
    return private_key
def _evp_pkey_keygen_gc(self, nid):
    """
    Generate a key of the type identified by ``nid`` and return its
    EVP_PKEY, registered with ``ffi.gc`` so that ``EVP_PKEY_free`` is its
    destructor.
    """
    ffi, lib = self._ffi, self._lib

    keygen_ctx = lib.EVP_PKEY_CTX_new_id(nid, ffi.NULL)
    self.openssl_assert(keygen_ctx != ffi.NULL)
    keygen_ctx = ffi.gc(keygen_ctx, lib.EVP_PKEY_CTX_free)

    status = lib.EVP_PKEY_keygen_init(keygen_ctx)
    self.openssl_assert(status == 1)

    # EVP_PKEY_keygen writes the new key through this out-pointer.
    pkey_out = ffi.new("EVP_PKEY **")
    status = lib.EVP_PKEY_keygen(keygen_ctx, pkey_out)
    self.openssl_assert(status == 1)
    self.openssl_assert(pkey_out[0] != ffi.NULL)

    return ffi.gc(pkey_out[0], lib.EVP_PKEY_free)
def x25519_generate_key(self) -> x25519.X25519PrivateKey:
    """
    Generate a new X25519 private key via the Rust bindings.
    """
    private_key = rust_openssl.x25519.generate_key()
    return private_key
def x25519_supported(self) -> bool:
    """
    Report whether X25519 is available.

    Never available in FIPS mode; otherwise available unless the bindings
    set ``CRYPTOGRAPHY_LIBRESSL_LESS_THAN_370``.
    """
    if not self._fips_enabled:
        return not self._lib.CRYPTOGRAPHY_LIBRESSL_LESS_THAN_370
    return False
def x448_load_public_bytes(self, data: bytes) -> x448.X448PublicKey:
    """
    Load an X448 public key from its raw bytes.

    Raises ``ValueError`` unless ``data`` is exactly 56 bytes long.
    """
    key_length = len(data)
    if key_length != 56:
        raise ValueError("An X448 public key is 56 bytes long")

    ffi, lib = self._ffi, self._lib
    pkey = lib.EVP_PKEY_new_raw_public_key(
        lib.NID_X448, ffi.NULL, data, key_length
    )
    self.openssl_assert(pkey != ffi.NULL)
    pkey = ffi.gc(pkey, lib.EVP_PKEY_free)
    return _X448PublicKey(self, pkey)
def x448_load_private_bytes(self, data: bytes) -> x448.X448PrivateKey:
    """
    Load an X448 private key from its raw bytes.

    Raises ``ValueError`` unless ``data`` is exactly 56 bytes long.
    """
    key_length = len(data)
    if key_length != 56:
        raise ValueError("An X448 private key is 56 bytes long")

    ffi, lib = self._ffi, self._lib
    raw_buffer = ffi.from_buffer(data)
    pkey = lib.EVP_PKEY_new_raw_private_key(
        lib.NID_X448, ffi.NULL, raw_buffer, key_length
    )
    self.openssl_assert(pkey != ffi.NULL)
    pkey = ffi.gc(pkey, lib.EVP_PKEY_free)
    return _X448PrivateKey(self, pkey)
def x448_generate_key(self) -> x448.X448PrivateKey:
    """
    Generate a new X448 private key.
    """
    x448_nid = self._lib.NID_X448
    pkey = self._evp_pkey_keygen_gc(x448_nid)
    return _X448PrivateKey(self, pkey)
def x448_supported(self) -> bool:
    """
    Report whether X448 is available.

    Never available in FIPS mode; otherwise available unless the bindings
    report LibreSSL or BoringSSL.
    """
    if not self._fips_enabled:
        lib = self._lib
        return not (
            lib.CRYPTOGRAPHY_IS_LIBRESSL or lib.CRYPTOGRAPHY_IS_BORINGSSL
        )
    return False
def ed25519_supported(self) -> bool:
    """Report whether Ed25519 can be used with this backend.

    Returns ``False`` in FIPS mode; otherwise returns the library's
    ``CRYPTOGRAPHY_HAS_WORKING_ED25519`` flag unchanged.
    """
    return (
        False
        if self._fips_enabled
        else self._lib.CRYPTOGRAPHY_HAS_WORKING_ED25519
    )
def ed25519_load_public_bytes(
    self, data: bytes
) -> ed25519.Ed25519PublicKey:
    """Build an Ed25519 public key object from its raw encoding.

    ``data`` is first validated with ``utils._check_bytes``. A
    ``ValueError`` is raised unless its length equals
    ``ed25519._ED25519_KEY_SIZE``. The resulting ``EVP_PKEY`` is released
    with ``EVP_PKEY_free`` once it is garbage collected.
    """
    utils._check_bytes("data", data)

    if len(data) != ed25519._ED25519_KEY_SIZE:
        raise ValueError("An Ed25519 public key is 32 bytes long")

    null = self._ffi.NULL
    pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_ED25519, null, data, len(data)
    )
    self.openssl_assert(pkey != null)
    owned_pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    return _Ed25519PublicKey(self, owned_pkey)
def ed25519_load_private_bytes(
    self, data: bytes
) -> ed25519.Ed25519PrivateKey:
    """Build an Ed25519 private key object from its raw encoding.

    A ``ValueError`` is raised unless ``len(data)`` equals
    ``ed25519._ED25519_KEY_SIZE``. Only after that is ``data`` validated
    with ``utils._check_byteslike``. The key material is handed to
    OpenSSL through ``ffi.from_buffer``.
    """
    # NOTE: the length check intentionally stays ahead of the type check
    # to preserve which exception callers see for malformed input.
    if len(data) != ed25519._ED25519_KEY_SIZE:
        raise ValueError("An Ed25519 private key is 32 bytes long")

    utils._check_byteslike("data", data)
    null = self._ffi.NULL
    key_buf = self._ffi.from_buffer(data)
    pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_ED25519, null, key_buf, len(data)
    )
    self.openssl_assert(pkey != null)
    owned_pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    return _Ed25519PrivateKey(self, owned_pkey)
def ed25519_generate_key(self) -> ed25519.Ed25519PrivateKey:
    """Generate a new Ed25519 private key.

    The key is produced by ``_evp_pkey_keygen_gc`` for ``NID_ED25519`` and
    wrapped in ``_Ed25519PrivateKey``.
    """
    return _Ed25519PrivateKey(
        self, self._evp_pkey_keygen_gc(self._lib.NID_ED25519)
    )
def ed448_supported(self) -> bool:
    """Report whether Ed448 can be used with this backend.

    Ed448 is reported as unavailable in FIPS mode, when the library sets
    ``CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B``, and on BoringSSL.
    """
    # Each operand is only evaluated if all previous ones were falsy.
    return not (
        self._fips_enabled
        or self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
        or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    )
def ed448_load_public_bytes(self, data: bytes) -> ed448.Ed448PublicKey:
    """Build an Ed448 public key object from its raw encoding.

    ``data`` is validated with ``utils._check_bytes``. A ``ValueError``
    is raised unless its length equals ``_ED448_KEY_SIZE``. The resulting
    ``EVP_PKEY`` is released with ``EVP_PKEY_free`` once it is garbage
    collected.
    """
    utils._check_bytes("data", data)
    if len(data) != _ED448_KEY_SIZE:
        raise ValueError("An Ed448 public key is 57 bytes long")

    null = self._ffi.NULL
    pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_ED448, null, data, len(data)
    )
    self.openssl_assert(pkey != null)
    owned_pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    return _Ed448PublicKey(self, owned_pkey)
def ed448_load_private_bytes(self, data: bytes) -> ed448.Ed448PrivateKey:
    """Build an Ed448 private key object from its raw encoding.

    ``data`` is validated with ``utils._check_byteslike``. A
    ``ValueError`` is raised unless its length equals ``_ED448_KEY_SIZE``.
    The key material is handed to OpenSSL through ``ffi.from_buffer``.
    """
    utils._check_byteslike("data", data)
    if len(data) != _ED448_KEY_SIZE:
        raise ValueError("An Ed448 private key is 57 bytes long")

    null = self._ffi.NULL
    key_buf = self._ffi.from_buffer(data)
    pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_ED448, null, key_buf, len(data)
    )
    self.openssl_assert(pkey != null)
    owned_pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    return _Ed448PrivateKey(self, owned_pkey)
def ed448_generate_key(self) -> ed448.Ed448PrivateKey:
    """Generate a new Ed448 private key.

    The key is produced by ``_evp_pkey_keygen_gc`` for ``NID_ED448`` and
    wrapped in ``_Ed448PrivateKey``.
    """
    return _Ed448PrivateKey(
        self, self._evp_pkey_keygen_gc(self._lib.NID_ED448)
    )
def derive_scrypt(
    self,
    key_material: bytes,
    salt: bytes,
    length: int,
    n: int,
    r: int,
    p: int,
) -> bytes:
    """Derive ``length`` bytes from ``key_material`` with scrypt.

    The work is done by ``EVP_PBE_scrypt`` using the cost parameters
    ``n``, ``r`` and ``p`` and the memory ceiling ``scrypt._MEM_LIMIT``.

    Raises ``MemoryError`` (carrying the consumed OpenSSL errors as a
    second argument) whenever ``EVP_PBE_scrypt`` does not return 1.
    """
    out = self._ffi.new("unsigned char[]", length)
    km_ptr = self._ffi.from_buffer(key_material)
    status = self._lib.EVP_PBE_scrypt(
        km_ptr,
        len(key_material),
        salt,
        len(salt),
        n,
        r,
        p,
        scrypt._MEM_LIMIT,
        out,
        length,
    )
    if status == 1:
        return self._ffi.buffer(out)[:]

    errors = self._consume_errors()
    # memory required formula explained here:
    # https://blog.filippo.io/the-scrypt-parameters/
    min_memory = 128 * n * r // (1024**2)
    raise MemoryError(
        "Not enough memory to derive key. These parameters require"
        " {} MB of memory.".format(min_memory),
        errors,
    )
def aead_cipher_supported(self, cipher) -> bool:
    """Report whether the given AEAD cipher can be used.

    The cipher is mapped to its OpenSSL name via
    ``aead._aead_cipher_name``. In FIPS mode only names listed in
    ``self._fips_aead`` are accepted.
    """
    name = aead._aead_cipher_name(cipher)
    if self._fips_enabled and name not in self._fips_aead:
        return False

    # SIV isn't loaded through get_cipherbyname but instead a new fetch API
    # only available in 3.0+. But if we know we're on 3.0+ then we know
    # it's supported.
    if name.endswith(b"-siv"):
        return self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER == 1

    found = self._lib.EVP_get_cipherbyname(name)
    return found != self._ffi.NULL
def _zero_data(self, data, length: int) -> None:
    """Overwrite the first ``length`` items of ``data`` with zero, in place."""
    # We clear things this way because at the moment we're not
    # sure of a better way that can guarantee it overwrites the
    # memory of a bytearray and doesn't just replace the underlying char *.
    offset = 0
    while offset < length:
        data[offset] = 0
        offset += 1
@contextlib.contextmanager
def _zeroed_null_terminated_buf(self, data):
    """
    Context manager yielding a null-terminated copy of ``data``.

    ``data`` may be a bytestring or a mutable buffer such as a bytearray.
    PKCS12_parse takes its password as a bare char * without a length, and
    ffi.from_buffer gives no null termination, so the bytes are copied
    into a fresh buffer one byte longer than the input. The copied bytes
    are zeroed again when the context exits. ``None`` yields ``ffi.NULL``
    and nothing is allocated.
    """
    if data is None:
        yield self._ffi.NULL
        return

    size = len(data)
    # ffi.new zero-initialises, so the extra byte is the terminator.
    scratch = self._ffi.new("char[]", size + 1)
    self._ffi.memmove(scratch, data, size)
    try:
        yield scratch
    finally:
        # Cast to a uint8_t * so we can assign by integer
        self._zero_data(self._ffi.cast("uint8_t *", scratch), size)
def load_key_and_certificates_from_pkcs12(
    self, data: bytes, password: typing.Optional[bytes]
) -> typing.Tuple[
    typing.Optional[PrivateKeyTypes],
    typing.Optional[x509.Certificate],
    typing.List[x509.Certificate],
]:
    """Parse PKCS12 data and return ``(key, certificate, extra_certs)``.

    This is a thin adapter over ``load_pkcs12``. It unwraps the
    ``.certificate`` attribute of the main certificate (``None`` when
    there is no main certificate) and of every additional certificate.
    """
    parsed = self.load_pkcs12(data, password)
    private_key = parsed.key
    main_cert = parsed.cert.certificate if parsed.cert else None
    extra_certs = [entry.certificate for entry in parsed.additional_certs]
    return (private_key, main_cert, extra_certs)
def load_pkcs12(
    self, data: bytes, password: typing.Optional[bytes]
) -> PKCS12KeyAndCertificates:
    """Parse serialized PKCS12 data into a ``PKCS12KeyAndCertificates``.

    ``data`` is read with ``d2i_PKCS12_bio`` and then unpacked with
    ``PKCS12_parse``. ``password`` may be ``None``; otherwise it is
    validated with ``utils._check_byteslike`` and passed to OpenSSL
    through a temporary null-terminated copy that is zeroed afterwards.

    The result holds the private key (or ``None``), the main certificate
    as a ``PKCS12Certificate`` (or ``None``) and a list of
    ``PKCS12Certificate`` objects for the additional certificates. Each
    ``PKCS12Certificate`` carries the alias reported by
    ``X509_alias_get0`` as its name, or ``None`` when there is no alias.

    Raises ``ValueError`` when the data cannot be deserialized or when
    ``PKCS12_parse`` fails (invalid password or PKCS12 data).
    """
    if password is not None:
        utils._check_byteslike("password", password)

    bio = self._bytes_to_bio(data)
    p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
    if p12 == self._ffi.NULL:
        self._consume_errors()
        raise ValueError("Could not deserialize PKCS12 data")

    p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
    evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
    x509_ptr = self._ffi.new("X509 **")
    sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
    with self._zeroed_null_terminated_buf(password) as password_buf:
        res = self._lib.PKCS12_parse(
            p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
        )
    if res == 0:
        self._consume_errors()
        raise ValueError("Invalid password or PKCS12 data")

    # Attach destructors to *everything* PKCS12_parse handed back before
    # converting any of it. The conversions below can raise, and any
    # pointer not yet wrapped at that point would never be freed.
    evp_pkey = None
    if evp_pkey_ptr[0] != self._ffi.NULL:
        evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)

    ossl_cert = None
    if x509_ptr[0] != self._ffi.NULL:
        ossl_cert = self._ffi.gc(x509_ptr[0], self._lib.X509_free)

    ossl_addl_certs = []
    if sk_x509_ptr[0] != self._ffi.NULL:
        # sk_X509_free releases only the stack itself; each element gets
        # its own X509_free destructor below.
        sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
        num = self._lib.sk_X509_num(sk_x509)
        for i in range(num):
            ossl_addl = self._lib.sk_X509_value(sk_x509, i)
            self.openssl_assert(ossl_addl != self._ffi.NULL)
            ossl_addl_certs.append(
                self._ffi.gc(ossl_addl, self._lib.X509_free)
            )

        # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
        # certificates.
        if not (
            self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
            or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
        ):
            ossl_addl_certs.reverse()

    def to_pkcs12_certificate(ossl_x509):
        # Convert one OpenSSL X509 into a PKCS12Certificate, carrying
        # over its alias (if any) as the name.
        cert_obj = self._ossl2cert(ossl_x509)
        alias = None
        maybe_alias = self._lib.X509_alias_get0(ossl_x509, self._ffi.NULL)
        if maybe_alias != self._ffi.NULL:
            alias = self._ffi.string(maybe_alias)
        return PKCS12Certificate(cert_obj, alias)

    key = None
    if evp_pkey is not None:
        # We don't support turning off RSA key validation when loading
        # PKCS12 keys
        key = self._evp_pkey_to_private_key(
            evp_pkey, unsafe_skip_rsa_key_validation=False
        )

    cert = None
    if ossl_cert is not None:
        cert = to_pkcs12_certificate(ossl_cert)

    additional_certificates = [
        to_pkcs12_certificate(ossl_addl) for ossl_addl in ossl_addl_certs
    ]

    return PKCS12KeyAndCertificates(key, cert, additional_certificates)
def serialize_key_and_certificates_to_pkcs12(
    self,
    name: typing.Optional[bytes],
    key: typing.Optional[PKCS12PrivateKeyTypes],
    cert: typing.Optional[x509.Certificate],
    cas: typing.Optional[typing.List[_PKCS12CATypes]],
    encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
    """Serialize a key and certificates into a PKCS12 structure.

    ``name`` is passed to ``PKCS12_create`` as its name argument and may
    be ``None``. ``key`` and ``cert`` may each be ``None``. ``cas`` is an
    optional list of extra certificates; entries that are
    ``PKCS12Certificate`` instances have their ``friendly_name`` applied
    as the X509 alias before being added.

    ``encryption_algorithm`` selects the protection parameters:
    ``NoEncryption``, ``BestAvailableEncryption``, or a
    ``_KeySerializationEncryption`` built for ``PrivateFormat.PKCS12``
    (which may override the key/cert algorithm, the MAC hash and the KDF
    rounds). Any other value raises ``ValueError``.

    Raises ``UnsupportedAlgorithm`` when PBESv2 or a custom MAC algorithm
    is requested but the linked OpenSSL lacks support for it. Returns the
    bytes written by ``i2d_PKCS12_bio``.
    """
    password = None
    if name is not None:
        utils._check_bytes("name", name)

    if isinstance(encryption_algorithm, serialization.NoEncryption):
        nid_cert = -1
        nid_key = -1
        pkcs12_iter = 0
        mac_iter = 0
        mac_alg = self._ffi.NULL
    elif isinstance(
        encryption_algorithm, serialization.BestAvailableEncryption
    ):
        # PKCS12 encryption is hopeless trash and can never be fixed.
        # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so
        # we use PBESv1 with 3DES on the older paths.
        if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
            nid_cert = self._lib.NID_aes_256_cbc
            nid_key = self._lib.NID_aes_256_cbc
        else:
            nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        # At least we can set this higher than OpenSSL's default
        pkcs12_iter = 20000
        # mac_iter chosen for compatibility reasons, see:
        # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
        # Did we mention how lousy PKCS12 encryption is?
        mac_iter = 1
        # MAC algorithm can only be set on OpenSSL 3.0.0+
        mac_alg = self._ffi.NULL
        password = encryption_algorithm.password
    elif (
        isinstance(
            encryption_algorithm, serialization._KeySerializationEncryption
        )
        and encryption_algorithm._format
        is serialization.PrivateFormat.PKCS12
    ):
        # Default to OpenSSL's defaults. Behavior will vary based on the
        # version of OpenSSL cryptography is compiled against.
        nid_cert = 0
        nid_key = 0
        # Use the default iters we use in best available
        pkcs12_iter = 20000
        # See the Best Available comment for why this is 1
        mac_iter = 1
        password = encryption_algorithm.password
        keycertalg = encryption_algorithm._key_cert_algorithm
        if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC:
            nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        elif keycertalg is PBES.PBESv2SHA256AndAES256CBC:
            if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
                raise UnsupportedAlgorithm(
                    "PBESv2 is not supported by this version of OpenSSL"
                )
            nid_cert = self._lib.NID_aes_256_cbc
            nid_key = self._lib.NID_aes_256_cbc
        else:
            assert keycertalg is None
            # We use OpenSSL's defaults

        if encryption_algorithm._hmac_hash is not None:
            if not self._lib.Cryptography_HAS_PKCS12_SET_MAC:
                raise UnsupportedAlgorithm(
                    "Setting MAC algorithm is not supported by this "
                    "version of OpenSSL."
                )
            mac_alg = self._evp_md_non_null_from_algorithm(
                encryption_algorithm._hmac_hash
            )
            self.openssl_assert(mac_alg != self._ffi.NULL)
        else:
            mac_alg = self._ffi.NULL

        if encryption_algorithm._kdf_rounds is not None:
            pkcs12_iter = encryption_algorithm._kdf_rounds

    else:
        raise ValueError("Unsupported key encryption type")

    if cas is None or len(cas) == 0:
        sk_x509 = self._ffi.NULL
    else:
        sk_x509 = self._lib.sk_X509_new_null()
        sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)

        # This list is to keep the x509 values alive until end of function
        ossl_cas = []
        for ca in cas:
            if isinstance(ca, PKCS12Certificate):
                ca_alias = ca.friendly_name
                ossl_ca = self._cert2ossl(ca.certificate)
                if ca_alias is None:
                    res = self._lib.X509_alias_set1(
                        ossl_ca, self._ffi.NULL, -1
                    )
                else:
                    res = self._lib.X509_alias_set1(
                        ossl_ca, ca_alias, len(ca_alias)
                    )
                self.openssl_assert(res == 1)
            else:
                ossl_ca = self._cert2ossl(ca)
            ossl_cas.append(ossl_ca)
            res = self._lib.sk_X509_push(sk_x509, ossl_ca)
            # Assert through this instance rather than the module-level
            # ``backend`` singleton.
            self.openssl_assert(res >= 1)

    with self._zeroed_null_terminated_buf(password) as password_buf:
        with self._zeroed_null_terminated_buf(name) as name_buf:
            ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
            if key is not None:
                evp_pkey = key._evp_pkey  # type: ignore[union-attr]
            else:
                evp_pkey = self._ffi.NULL

            p12 = self._lib.PKCS12_create(
                password_buf,
                name_buf,
                evp_pkey,
                ossl_cert,
                sk_x509,
                nid_key,
                nid_cert,
                pkcs12_iter,
                mac_iter,
                0,
            )
            # PKCS12_create signals failure by returning NULL. This must
            # be checked *before* the pointer is handed to
            # PKCS12_set_mac, otherwise a failed create leads to a NULL
            # pointer dereference inside OpenSSL.
            self.openssl_assert(p12 != self._ffi.NULL)
            p12 = self._ffi.gc(p12, self._lib.PKCS12_free)

            if (
                self._lib.Cryptography_HAS_PKCS12_SET_MAC
                and mac_alg != self._ffi.NULL
            ):
                res = self._lib.PKCS12_set_mac(
                    p12,
                    password_buf,
                    -1,
                    self._ffi.NULL,
                    0,
                    mac_iter,
                    mac_alg,
                )
                # Don't silently emit a structure protected by a MAC
                # other than the one the caller asked for.
                self.openssl_assert(res == 1)

    bio = self._create_mem_bio_gc()
    res = self._lib.i2d_PKCS12_bio(bio, p12)
    self.openssl_assert(res > 0)
    return self._read_mem_bio(bio)
def poly1305_supported(self) -> bool:
    """Report whether Poly1305 can be used with this backend.

    Always ``False`` in FIPS mode; otherwise true exactly when the
    library's ``Cryptography_HAS_POLY1305`` flag equals 1.
    """
    # The library flag is only consulted when FIPS mode is off.
    return (
        not self._fips_enabled
        and self._lib.Cryptography_HAS_POLY1305 == 1
    )
def create_poly1305_ctx(self, key: bytes) -> _Poly1305Context:
    """Create a Poly1305 context for ``key``.

    ``key`` is validated with ``utils._check_byteslike``. A ``ValueError``
    is raised unless its length equals ``_POLY1305_KEY_SIZE``.
    """
    utils._check_byteslike("key", key)
    if len(key) == _POLY1305_KEY_SIZE:
        return _Poly1305Context(self, key)

    raise ValueError("A poly1305 key is 32 bytes long")
def pkcs7_supported(self) -> bool:
    """Report whether PKCS7 is available: true unless running on BoringSSL."""
    on_boringssl = self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    return not on_boringssl
def load_pem_pkcs7_certificates(
    self, data: bytes
) -> typing.List[x509.Certificate]:
    """Extract the certificates from a PEM-encoded PKCS7 structure.

    ``data`` is validated with ``utils._check_bytes`` and parsed with
    ``PEM_read_bio_PKCS7``. Raises ``ValueError`` when parsing fails.
    Certificate extraction itself is done by
    ``_load_pkcs7_certificates``.
    """
    utils._check_bytes("data", data)
    null = self._ffi.NULL
    mem_bio = self._bytes_to_bio(data)
    parsed = self._lib.PEM_read_bio_PKCS7(mem_bio.bio, null, null, null)
    if parsed == null:
        self._consume_errors()
        raise ValueError("Unable to parse PKCS7 data")

    owned_p7 = self._ffi.gc(parsed, self._lib.PKCS7_free)
    return self._load_pkcs7_certificates(owned_p7)
def load_der_pkcs7_certificates(
    self, data: bytes
) -> typing.List[x509.Certificate]:
    """Extract the certificates from a DER-encoded PKCS7 structure.

    ``data`` is validated with ``utils._check_bytes`` and parsed with
    ``d2i_PKCS7_bio``. Raises ``ValueError`` when parsing fails.
    Certificate extraction itself is done by
    ``_load_pkcs7_certificates``.
    """
    utils._check_bytes("data", data)
    null = self._ffi.NULL
    mem_bio = self._bytes_to_bio(data)
    parsed = self._lib.d2i_PKCS7_bio(mem_bio.bio, null)
    if parsed == null:
        self._consume_errors()
        raise ValueError("Unable to parse PKCS7 data")

    owned_p7 = self._ffi.gc(parsed, self._lib.PKCS7_free)
    return self._load_pkcs7_certificates(owned_p7)
def _load_pkcs7_certificates(self, p7) -> typing.List[x509.Certificate]:
    """Convert every certificate in a parsed PKCS7 structure.

    Only structures whose type NID is ``NID_pkcs7_signed`` are handled;
    anything else raises ``UnsupportedAlgorithm`` with
    ``_Reasons.UNSUPPORTED_SERIALIZATION``. Certificates are returned in
    the order they appear in ``p7.d.sign.cert``.
    """
    nid = self._lib.OBJ_obj2nid(p7.type)
    self.openssl_assert(nid != self._lib.NID_undef)
    if nid != self._lib.NID_pkcs7_signed:
        raise UnsupportedAlgorithm(
            "Only basic signed structures are currently supported. NID"
            " for this data was {}".format(nid),
            _Reasons.UNSUPPORTED_SERIALIZATION,
        )

    cert_stack = p7.d.sign.cert
    total = self._lib.sk_X509_num(cert_stack)
    loaded = []
    for position in range(total):
        entry = self._lib.sk_X509_value(cert_stack, position)
        self.openssl_assert(entry != self._ffi.NULL)
        loaded.append(self._ossl2cert(entry))

    return loaded
class GetCipherByName:
    """Callable that resolves an ``EVP_CIPHER`` from a name template.

    The template given at construction is formatted with ``cipher`` and
    ``mode`` keyword arguments and lower-cased to obtain the OpenSSL
    cipher name.
    """

    def __init__(self, fmt: str):
        self._fmt = fmt

    def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
        """Look up the cipher for ``cipher``/``mode`` on ``backend``.

        ``EVP_get_cipherbyname`` is tried first. If it yields NULL and the
        library has ``Cryptography_HAS_300_EVP_CIPHER`` set,
        ``EVP_CIPHER_fetch`` is tried as well. Pending OpenSSL errors are
        consumed before returning; the result may still be NULL.
        """
        lib = backend._lib
        null = backend._ffi.NULL
        encoded_name = (
            self._fmt.format(cipher=cipher, mode=mode).lower().encode("ascii")
        )

        found = lib.EVP_get_cipherbyname(encoded_name)

        # try EVP_CIPHER_fetch if present
        if found == null and lib.Cryptography_HAS_300_EVP_CIPHER:
            found = lib.EVP_CIPHER_fetch(null, encoded_name, null)

        backend._consume_errors()
        return found
def _get_xts_cipher(backend: Backend, cipher: AES, mode):
    """Look up the AES-XTS ``EVP_CIPHER`` for ``cipher``.

    The name uses half of ``cipher.key_size`` as the AES variant;
    ``mode`` is accepted but not used for the lookup.
    """
    cipher_name = f"aes-{cipher.key_size // 2}-xts"
    encoded_name = cipher_name.encode("ascii")
    return backend._lib.EVP_get_cipherbyname(encoded_name)
# Module-level Backend instance, created once when this module is imported.
backend = Backend()