Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/backend.py: 17%
1352 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
6import collections
7import contextlib
8import itertools
9import typing
10import warnings
11from contextlib import contextmanager
13from cryptography import utils, x509
14from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
15from cryptography.hazmat.backends.openssl import aead
16from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
17from cryptography.hazmat.backends.openssl.cmac import _CMACContext
18from cryptography.hazmat.backends.openssl.dh import (
19 _DHParameters,
20 _DHPrivateKey,
21 _DHPublicKey,
22 _dh_params_dup,
23)
24from cryptography.hazmat.backends.openssl.dsa import (
25 _DSAParameters,
26 _DSAPrivateKey,
27 _DSAPublicKey,
28)
29from cryptography.hazmat.backends.openssl.ec import (
30 _EllipticCurvePrivateKey,
31 _EllipticCurvePublicKey,
32)
33from cryptography.hazmat.backends.openssl.ed25519 import (
34 _Ed25519PrivateKey,
35 _Ed25519PublicKey,
36)
37from cryptography.hazmat.backends.openssl.ed448 import (
38 _ED448_KEY_SIZE,
39 _Ed448PrivateKey,
40 _Ed448PublicKey,
41)
42from cryptography.hazmat.backends.openssl.hashes import _HashContext
43from cryptography.hazmat.backends.openssl.hmac import _HMACContext
44from cryptography.hazmat.backends.openssl.poly1305 import (
45 _POLY1305_KEY_SIZE,
46 _Poly1305Context,
47)
48from cryptography.hazmat.backends.openssl.rsa import (
49 _RSAPrivateKey,
50 _RSAPublicKey,
51)
52from cryptography.hazmat.backends.openssl.x25519 import (
53 _X25519PrivateKey,
54 _X25519PublicKey,
55)
56from cryptography.hazmat.backends.openssl.x448 import (
57 _X448PrivateKey,
58 _X448PublicKey,
59)
60from cryptography.hazmat.bindings._rust import (
61 x509 as rust_x509,
62)
63from cryptography.hazmat.bindings.openssl import binding
64from cryptography.hazmat.primitives import hashes, serialization
65from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding
66from cryptography.hazmat.primitives.asymmetric import (
67 dh,
68 dsa,
69 ec,
70 ed25519,
71 ed448,
72 rsa,
73 x25519,
74 x448,
75)
76from cryptography.hazmat.primitives.asymmetric.padding import (
77 MGF1,
78 OAEP,
79 PKCS1v15,
80 PSS,
81)
82from cryptography.hazmat.primitives.asymmetric.types import (
83 CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES,
84 PRIVATE_KEY_TYPES,
85 PUBLIC_KEY_TYPES,
86)
87from cryptography.hazmat.primitives.ciphers import (
88 BlockCipherAlgorithm,
89 CipherAlgorithm,
90)
91from cryptography.hazmat.primitives.ciphers.algorithms import (
92 AES,
93 AES128,
94 AES256,
95 ARC4,
96 Camellia,
97 ChaCha20,
98 SM4,
99 TripleDES,
100 _BlowfishInternal,
101 _CAST5Internal,
102 _IDEAInternal,
103 _SEEDInternal,
104)
105from cryptography.hazmat.primitives.ciphers.modes import (
106 CBC,
107 CFB,
108 CFB8,
109 CTR,
110 ECB,
111 GCM,
112 Mode,
113 OFB,
114 XTS,
115)
116from cryptography.hazmat.primitives.kdf import scrypt
117from cryptography.hazmat.primitives.serialization import pkcs7, ssh
118from cryptography.hazmat.primitives.serialization.pkcs12 import (
119 PBES,
120 PKCS12Certificate,
121 PKCS12KeyAndCertificates,
122 _ALLOWED_PKCS12_TYPES,
123 _PKCS12_CAS_TYPES,
124)
127_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
130# Not actually supported, just used as a marker for some serialization tests.
131class _RC2:
132 pass
135class Backend:
136 """
137 OpenSSL API binding interfaces.
138 """
140 name = "openssl"
142 # FIPS has opinions about acceptable algorithms and key sizes, but the
143 # disallowed algorithms are still present in OpenSSL. They just error if
144 # you try to use them. To avoid that we allowlist the algorithms in
145 # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
146 _fips_aead = {
147 b"aes-128-ccm",
148 b"aes-192-ccm",
149 b"aes-256-ccm",
150 b"aes-128-gcm",
151 b"aes-192-gcm",
152 b"aes-256-gcm",
153 }
154 # TripleDES encryption is disallowed/deprecated throughout 2023 in
155 # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA).
156 _fips_ciphers = (AES,)
157 # Sometimes SHA1 is still permissible. That logic is contained
158 # within the various *_supported methods.
159 _fips_hashes = (
160 hashes.SHA224,
161 hashes.SHA256,
162 hashes.SHA384,
163 hashes.SHA512,
164 hashes.SHA512_224,
165 hashes.SHA512_256,
166 hashes.SHA3_224,
167 hashes.SHA3_256,
168 hashes.SHA3_384,
169 hashes.SHA3_512,
170 hashes.SHAKE128,
171 hashes.SHAKE256,
172 )
173 _fips_ecdh_curves = (
174 ec.SECP224R1,
175 ec.SECP256R1,
176 ec.SECP384R1,
177 ec.SECP521R1,
178 )
179 _fips_rsa_min_key_size = 2048
180 _fips_rsa_min_public_exponent = 65537
181 _fips_dsa_min_modulus = 1 << 2048
182 _fips_dh_min_key_size = 2048
183 _fips_dh_min_modulus = 1 << _fips_dh_min_key_size
185 def __init__(self):
186 self._binding = binding.Binding()
187 self._ffi = self._binding.ffi
188 self._lib = self._binding.lib
189 self._rsa_skip_check_key = False
190 self._fips_enabled = self._is_fips_enabled()
192 self._cipher_registry = {}
193 self._register_default_ciphers()
194 if self._fips_enabled and self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
195 warnings.warn(
196 "OpenSSL FIPS mode is enabled. Can't enable DRBG fork safety.",
197 UserWarning,
198 )
199 else:
200 self.activate_osrandom_engine()
201 self._dh_types = [self._lib.EVP_PKEY_DH]
202 if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
203 self._dh_types.append(self._lib.EVP_PKEY_DHX)
205 def __repr__(self) -> str:
206 return "<OpenSSLBackend(version: {}, FIPS: {})>".format(
207 self.openssl_version_text(), self._fips_enabled
208 )
210 def openssl_assert(
211 self,
212 ok: bool,
213 errors: typing.Optional[typing.List[binding._OpenSSLError]] = None,
214 ) -> None:
215 return binding._openssl_assert(self._lib, ok, errors=errors)
217 def _is_fips_enabled(self) -> bool:
218 if self._lib.Cryptography_HAS_300_FIPS:
219 mode = self._lib.EVP_default_properties_is_fips_enabled(
220 self._ffi.NULL
221 )
222 else:
223 mode = getattr(self._lib, "FIPS_mode", lambda: 0)()
225 if mode == 0:
226 # OpenSSL without FIPS pushes an error on the error stack
227 self._lib.ERR_clear_error()
228 return bool(mode)
230 def _enable_fips(self) -> None:
231 # This function enables FIPS mode for OpenSSL 3.0.0 on installs that
232 # have the FIPS provider installed properly.
233 self._binding._enable_fips()
234 assert self._is_fips_enabled()
235 self._fips_enabled = self._is_fips_enabled()
237 def activate_builtin_random(self) -> None:
238 if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
239 # Obtain a new structural reference.
240 e = self._lib.ENGINE_get_default_RAND()
241 if e != self._ffi.NULL:
242 self._lib.ENGINE_unregister_RAND(e)
243 # Reset the RNG to use the built-in.
244 res = self._lib.RAND_set_rand_method(self._ffi.NULL)
245 self.openssl_assert(res == 1)
246 # decrement the structural reference from get_default_RAND
247 res = self._lib.ENGINE_finish(e)
248 self.openssl_assert(res == 1)
250 @contextlib.contextmanager
251 def _get_osurandom_engine(self):
252 # Fetches an engine by id and returns it. This creates a structural
253 # reference.
254 e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id)
255 self.openssl_assert(e != self._ffi.NULL)
256 # Initialize the engine for use. This adds a functional reference.
257 res = self._lib.ENGINE_init(e)
258 self.openssl_assert(res == 1)
260 try:
261 yield e
262 finally:
263 # Decrement the structural ref incremented by ENGINE_by_id.
264 res = self._lib.ENGINE_free(e)
265 self.openssl_assert(res == 1)
266 # Decrement the functional ref incremented by ENGINE_init.
267 res = self._lib.ENGINE_finish(e)
268 self.openssl_assert(res == 1)
270 def activate_osrandom_engine(self) -> None:
271 if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
272 # Unregister and free the current engine.
273 self.activate_builtin_random()
274 with self._get_osurandom_engine() as e:
275 # Set the engine as the default RAND provider.
276 res = self._lib.ENGINE_set_default_RAND(e)
277 self.openssl_assert(res == 1)
278 # Reset the RNG to use the engine
279 res = self._lib.RAND_set_rand_method(self._ffi.NULL)
280 self.openssl_assert(res == 1)
282 def osrandom_engine_implementation(self) -> str:
283 buf = self._ffi.new("char[]", 64)
284 with self._get_osurandom_engine() as e:
285 res = self._lib.ENGINE_ctrl_cmd(
286 e, b"get_implementation", len(buf), buf, self._ffi.NULL, 0
287 )
288 self.openssl_assert(res > 0)
289 return self._ffi.string(buf).decode("ascii")
291 def openssl_version_text(self) -> str:
292 """
293 Friendly string name of the loaded OpenSSL library. This is not
294 necessarily the same version as it was compiled against.
296 Example: OpenSSL 1.1.1d 10 Sep 2019
297 """
298 return self._ffi.string(
299 self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
300 ).decode("ascii")
302 def openssl_version_number(self) -> int:
303 return self._lib.OpenSSL_version_num()
305 def create_hmac_ctx(
306 self, key: bytes, algorithm: hashes.HashAlgorithm
307 ) -> _HMACContext:
308 return _HMACContext(self, key, algorithm)
310 def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm):
311 if algorithm.name == "blake2b" or algorithm.name == "blake2s":
312 alg = "{}{}".format(
313 algorithm.name, algorithm.digest_size * 8
314 ).encode("ascii")
315 else:
316 alg = algorithm.name.encode("ascii")
318 evp_md = self._lib.EVP_get_digestbyname(alg)
319 return evp_md
321 def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm):
322 evp_md = self._evp_md_from_algorithm(algorithm)
323 self.openssl_assert(evp_md != self._ffi.NULL)
324 return evp_md
326 def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
327 if self._fips_enabled and not isinstance(algorithm, self._fips_hashes):
328 return False
330 evp_md = self._evp_md_from_algorithm(algorithm)
331 return evp_md != self._ffi.NULL
333 def signature_hash_supported(
334 self, algorithm: hashes.HashAlgorithm
335 ) -> bool:
336 # Dedicated check for hashing algorithm use in message digest for
337 # signatures, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption).
338 if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
339 return False
340 return self.hash_supported(algorithm)
342 def scrypt_supported(self) -> bool:
343 if self._fips_enabled:
344 return False
345 else:
346 return self._lib.Cryptography_HAS_SCRYPT == 1
348 def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
349 # FIPS mode still allows SHA1 for HMAC
350 if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
351 return True
353 return self.hash_supported(algorithm)
355 def create_hash_ctx(
356 self, algorithm: hashes.HashAlgorithm
357 ) -> hashes.HashContext:
358 return _HashContext(self, algorithm)
360 def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
361 if self._fips_enabled:
362 # FIPS mode requires AES. TripleDES is disallowed/deprecated in
363 # FIPS 140-3.
364 if not isinstance(cipher, self._fips_ciphers):
365 return False
367 try:
368 adapter = self._cipher_registry[type(cipher), type(mode)]
369 except KeyError:
370 return False
371 evp_cipher = adapter(self, cipher, mode)
372 return self._ffi.NULL != evp_cipher
374 def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
375 if (cipher_cls, mode_cls) in self._cipher_registry:
376 raise ValueError(
377 "Duplicate registration for: {} {}.".format(
378 cipher_cls, mode_cls
379 )
380 )
381 self._cipher_registry[cipher_cls, mode_cls] = adapter
383 def _register_default_ciphers(self) -> None:
384 for cipher_cls in [AES, AES128, AES256]:
385 for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
386 self.register_cipher_adapter(
387 cipher_cls,
388 mode_cls,
389 GetCipherByName(
390 "{cipher.name}-{cipher.key_size}-{mode.name}"
391 ),
392 )
393 for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
394 self.register_cipher_adapter(
395 Camellia,
396 mode_cls,
397 GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
398 )
399 for mode_cls in [CBC, CFB, CFB8, OFB]:
400 self.register_cipher_adapter(
401 TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
402 )
403 self.register_cipher_adapter(
404 TripleDES, ECB, GetCipherByName("des-ede3")
405 )
406 for mode_cls in [CBC, CFB, OFB, ECB]:
407 self.register_cipher_adapter(
408 _BlowfishInternal, mode_cls, GetCipherByName("bf-{mode.name}")
409 )
410 for mode_cls in [CBC, CFB, OFB, ECB]:
411 self.register_cipher_adapter(
412 _SEEDInternal, mode_cls, GetCipherByName("seed-{mode.name}")
413 )
414 for cipher_cls, mode_cls in itertools.product(
415 [_CAST5Internal, _IDEAInternal],
416 [CBC, OFB, CFB, ECB],
417 ):
418 self.register_cipher_adapter(
419 cipher_cls,
420 mode_cls,
421 GetCipherByName("{cipher.name}-{mode.name}"),
422 )
423 self.register_cipher_adapter(ARC4, type(None), GetCipherByName("rc4"))
424 # We don't actually support RC2, this is just used by some tests.
425 self.register_cipher_adapter(_RC2, type(None), GetCipherByName("rc2"))
426 self.register_cipher_adapter(
427 ChaCha20, type(None), GetCipherByName("chacha20")
428 )
429 self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
430 for mode_cls in [ECB, CBC, OFB, CFB, CTR]:
431 self.register_cipher_adapter(
432 SM4, mode_cls, GetCipherByName("sm4-{mode.name}")
433 )
435 def create_symmetric_encryption_ctx(
436 self, cipher: CipherAlgorithm, mode: Mode
437 ) -> _CipherContext:
438 return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
440 def create_symmetric_decryption_ctx(
441 self, cipher: CipherAlgorithm, mode: Mode
442 ) -> _CipherContext:
443 return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
445 def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
446 return self.hmac_supported(algorithm)
448 def derive_pbkdf2_hmac(
449 self,
450 algorithm: hashes.HashAlgorithm,
451 length: int,
452 salt: bytes,
453 iterations: int,
454 key_material: bytes,
455 ) -> bytes:
456 buf = self._ffi.new("unsigned char[]", length)
457 evp_md = self._evp_md_non_null_from_algorithm(algorithm)
458 key_material_ptr = self._ffi.from_buffer(key_material)
459 res = self._lib.PKCS5_PBKDF2_HMAC(
460 key_material_ptr,
461 len(key_material),
462 salt,
463 len(salt),
464 iterations,
465 evp_md,
466 length,
467 buf,
468 )
469 self.openssl_assert(res == 1)
470 return self._ffi.buffer(buf)[:]
472 def _consume_errors(self) -> typing.List[binding._OpenSSLError]:
473 return binding._consume_errors(self._lib)
475 def _consume_errors_with_text(
476 self,
477 ) -> typing.List[binding._OpenSSLErrorWithText]:
478 return binding._consume_errors_with_text(self._lib)
480 def _bn_to_int(self, bn) -> int:
481 assert bn != self._ffi.NULL
482 self.openssl_assert(not self._lib.BN_is_negative(bn))
484 bn_num_bytes = self._lib.BN_num_bytes(bn)
485 bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
486 bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
487 # A zero length means the BN has value 0
488 self.openssl_assert(bin_len >= 0)
489 val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
490 return val
492 def _int_to_bn(self, num: int, bn=None):
493 """
494 Converts a python integer to a BIGNUM. The returned BIGNUM will not
495 be garbage collected (to support adding them to structs that take
496 ownership of the object). Be sure to register it for GC if it will
497 be discarded after use.
498 """
499 assert bn is None or bn != self._ffi.NULL
501 if bn is None:
502 bn = self._ffi.NULL
504 binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
505 bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn)
506 self.openssl_assert(bn_ptr != self._ffi.NULL)
507 return bn_ptr
509 def generate_rsa_private_key(
510 self, public_exponent: int, key_size: int
511 ) -> rsa.RSAPrivateKey:
512 rsa._verify_rsa_parameters(public_exponent, key_size)
514 rsa_cdata = self._lib.RSA_new()
515 self.openssl_assert(rsa_cdata != self._ffi.NULL)
516 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
518 bn = self._int_to_bn(public_exponent)
519 bn = self._ffi.gc(bn, self._lib.BN_free)
521 res = self._lib.RSA_generate_key_ex(
522 rsa_cdata, key_size, bn, self._ffi.NULL
523 )
524 self.openssl_assert(res == 1)
525 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
527 return _RSAPrivateKey(
528 self, rsa_cdata, evp_pkey, self._rsa_skip_check_key
529 )
531 def generate_rsa_parameters_supported(
532 self, public_exponent: int, key_size: int
533 ) -> bool:
534 return (
535 public_exponent >= 3
536 and public_exponent & 1 != 0
537 and key_size >= 512
538 )
540 def load_rsa_private_numbers(
541 self, numbers: rsa.RSAPrivateNumbers
542 ) -> rsa.RSAPrivateKey:
543 rsa._check_private_key_components(
544 numbers.p,
545 numbers.q,
546 numbers.d,
547 numbers.dmp1,
548 numbers.dmq1,
549 numbers.iqmp,
550 numbers.public_numbers.e,
551 numbers.public_numbers.n,
552 )
553 rsa_cdata = self._lib.RSA_new()
554 self.openssl_assert(rsa_cdata != self._ffi.NULL)
555 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
556 p = self._int_to_bn(numbers.p)
557 q = self._int_to_bn(numbers.q)
558 d = self._int_to_bn(numbers.d)
559 dmp1 = self._int_to_bn(numbers.dmp1)
560 dmq1 = self._int_to_bn(numbers.dmq1)
561 iqmp = self._int_to_bn(numbers.iqmp)
562 e = self._int_to_bn(numbers.public_numbers.e)
563 n = self._int_to_bn(numbers.public_numbers.n)
564 res = self._lib.RSA_set0_factors(rsa_cdata, p, q)
565 self.openssl_assert(res == 1)
566 res = self._lib.RSA_set0_key(rsa_cdata, n, e, d)
567 self.openssl_assert(res == 1)
568 res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp)
569 self.openssl_assert(res == 1)
570 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
572 return _RSAPrivateKey(
573 self, rsa_cdata, evp_pkey, self._rsa_skip_check_key
574 )
576 def load_rsa_public_numbers(
577 self, numbers: rsa.RSAPublicNumbers
578 ) -> rsa.RSAPublicKey:
579 rsa._check_public_key_components(numbers.e, numbers.n)
580 rsa_cdata = self._lib.RSA_new()
581 self.openssl_assert(rsa_cdata != self._ffi.NULL)
582 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
583 e = self._int_to_bn(numbers.e)
584 n = self._int_to_bn(numbers.n)
585 res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL)
586 self.openssl_assert(res == 1)
587 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
589 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
591 def _create_evp_pkey_gc(self):
592 evp_pkey = self._lib.EVP_PKEY_new()
593 self.openssl_assert(evp_pkey != self._ffi.NULL)
594 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
595 return evp_pkey
597 def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
598 evp_pkey = self._create_evp_pkey_gc()
599 res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata)
600 self.openssl_assert(res == 1)
601 return evp_pkey
603 def _bytes_to_bio(self, data: bytes):
604 """
605 Return a _MemoryBIO namedtuple of (BIO, char*).
607 The char* is the storage for the BIO and it must stay alive until the
608 BIO is finished with.
609 """
610 data_ptr = self._ffi.from_buffer(data)
611 bio = self._lib.BIO_new_mem_buf(data_ptr, len(data))
612 self.openssl_assert(bio != self._ffi.NULL)
614 return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr)
616 def _create_mem_bio_gc(self):
617 """
618 Creates an empty memory BIO.
619 """
620 bio_method = self._lib.BIO_s_mem()
621 self.openssl_assert(bio_method != self._ffi.NULL)
622 bio = self._lib.BIO_new(bio_method)
623 self.openssl_assert(bio != self._ffi.NULL)
624 bio = self._ffi.gc(bio, self._lib.BIO_free)
625 return bio
627 def _read_mem_bio(self, bio) -> bytes:
628 """
629 Reads a memory BIO. This only works on memory BIOs.
630 """
631 buf = self._ffi.new("char **")
632 buf_len = self._lib.BIO_get_mem_data(bio, buf)
633 self.openssl_assert(buf_len > 0)
634 self.openssl_assert(buf[0] != self._ffi.NULL)
635 bio_data = self._ffi.buffer(buf[0], buf_len)[:]
636 return bio_data
638 def _evp_pkey_to_private_key(self, evp_pkey) -> PRIVATE_KEY_TYPES:
639 """
640 Return the appropriate type of PrivateKey given an evp_pkey cdata
641 pointer.
642 """
644 key_type = self._lib.EVP_PKEY_id(evp_pkey)
646 if key_type == self._lib.EVP_PKEY_RSA:
647 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
648 self.openssl_assert(rsa_cdata != self._ffi.NULL)
649 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
650 return _RSAPrivateKey(
651 self, rsa_cdata, evp_pkey, self._rsa_skip_check_key
652 )
653 elif (
654 key_type == self._lib.EVP_PKEY_RSA_PSS
655 and not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
656 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
657 and not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
658 ):
659 # At the moment the way we handle RSA PSS keys is to strip the
660 # PSS constraints from them and treat them as normal RSA keys
661 # Unfortunately the RSA * itself tracks this data so we need to
662 # extract, serialize, and reload it without the constraints.
663 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
664 self.openssl_assert(rsa_cdata != self._ffi.NULL)
665 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
666 bio = self._create_mem_bio_gc()
667 res = self._lib.i2d_RSAPrivateKey_bio(bio, rsa_cdata)
668 self.openssl_assert(res == 1)
669 return self.load_der_private_key(
670 self._read_mem_bio(bio), password=None
671 )
672 elif key_type == self._lib.EVP_PKEY_DSA:
673 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
674 self.openssl_assert(dsa_cdata != self._ffi.NULL)
675 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
676 return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
677 elif key_type == self._lib.EVP_PKEY_EC:
678 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
679 self.openssl_assert(ec_cdata != self._ffi.NULL)
680 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
681 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
682 elif key_type in self._dh_types:
683 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
684 self.openssl_assert(dh_cdata != self._ffi.NULL)
685 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
686 return _DHPrivateKey(self, dh_cdata, evp_pkey)
687 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
688 # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1
689 return _Ed25519PrivateKey(self, evp_pkey)
690 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
691 # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
692 return _X448PrivateKey(self, evp_pkey)
693 elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
694 # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
695 return _X25519PrivateKey(self, evp_pkey)
696 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
697 # EVP_PKEY_ED448 is not present in OpenSSL < 1.1.1
698 return _Ed448PrivateKey(self, evp_pkey)
699 else:
700 raise UnsupportedAlgorithm("Unsupported key type.")
702 def _evp_pkey_to_public_key(self, evp_pkey) -> PUBLIC_KEY_TYPES:
703 """
704 Return the appropriate type of PublicKey given an evp_pkey cdata
705 pointer.
706 """
708 key_type = self._lib.EVP_PKEY_id(evp_pkey)
710 if key_type == self._lib.EVP_PKEY_RSA:
711 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
712 self.openssl_assert(rsa_cdata != self._ffi.NULL)
713 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
714 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
715 elif (
716 key_type == self._lib.EVP_PKEY_RSA_PSS
717 and not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
718 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
719 and not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
720 ):
721 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
722 self.openssl_assert(rsa_cdata != self._ffi.NULL)
723 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
724 bio = self._create_mem_bio_gc()
725 res = self._lib.i2d_RSAPublicKey_bio(bio, rsa_cdata)
726 self.openssl_assert(res == 1)
727 return self.load_der_public_key(self._read_mem_bio(bio))
728 elif key_type == self._lib.EVP_PKEY_DSA:
729 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
730 self.openssl_assert(dsa_cdata != self._ffi.NULL)
731 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
732 return _DSAPublicKey(self, dsa_cdata, evp_pkey)
733 elif key_type == self._lib.EVP_PKEY_EC:
734 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
735 if ec_cdata == self._ffi.NULL:
736 errors = self._consume_errors_with_text()
737 raise ValueError("Unable to load EC key", errors)
738 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
739 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
740 elif key_type in self._dh_types:
741 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
742 self.openssl_assert(dh_cdata != self._ffi.NULL)
743 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
744 return _DHPublicKey(self, dh_cdata, evp_pkey)
745 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
746 # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1
747 return _Ed25519PublicKey(self, evp_pkey)
748 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
749 # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
750 return _X448PublicKey(self, evp_pkey)
751 elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
752 # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
753 return _X25519PublicKey(self, evp_pkey)
754 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
755 # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.1
756 return _Ed448PublicKey(self, evp_pkey)
757 else:
758 raise UnsupportedAlgorithm("Unsupported key type.")
760 def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
761 return isinstance(
762 algorithm,
763 (
764 hashes.SHA1,
765 hashes.SHA224,
766 hashes.SHA256,
767 hashes.SHA384,
768 hashes.SHA512,
769 ),
770 )
772 def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool:
773 if isinstance(padding, PKCS1v15):
774 return True
775 elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
776 # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked
777 # as signature algorithm.
778 if self._fips_enabled and isinstance(
779 padding._mgf._algorithm, hashes.SHA1
780 ):
781 return True
782 else:
783 return self.hash_supported(padding._mgf._algorithm)
784 elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
785 return self._oaep_hash_supported(
786 padding._mgf._algorithm
787 ) and self._oaep_hash_supported(padding._algorithm)
788 else:
789 return False
791 def generate_dsa_parameters(self, key_size: int) -> dsa.DSAParameters:
792 if key_size not in (1024, 2048, 3072, 4096):
793 raise ValueError(
794 "Key size must be 1024, 2048, 3072, or 4096 bits."
795 )
797 ctx = self._lib.DSA_new()
798 self.openssl_assert(ctx != self._ffi.NULL)
799 ctx = self._ffi.gc(ctx, self._lib.DSA_free)
801 res = self._lib.DSA_generate_parameters_ex(
802 ctx,
803 key_size,
804 self._ffi.NULL,
805 0,
806 self._ffi.NULL,
807 self._ffi.NULL,
808 self._ffi.NULL,
809 )
811 self.openssl_assert(res == 1)
813 return _DSAParameters(self, ctx)
815 def generate_dsa_private_key(
816 self, parameters: dsa.DSAParameters
817 ) -> dsa.DSAPrivateKey:
818 ctx = self._lib.DSAparams_dup(
819 parameters._dsa_cdata # type: ignore[attr-defined]
820 )
821 self.openssl_assert(ctx != self._ffi.NULL)
822 ctx = self._ffi.gc(ctx, self._lib.DSA_free)
823 self._lib.DSA_generate_key(ctx)
824 evp_pkey = self._dsa_cdata_to_evp_pkey(ctx)
826 return _DSAPrivateKey(self, ctx, evp_pkey)
828 def generate_dsa_private_key_and_parameters(
829 self, key_size: int
830 ) -> dsa.DSAPrivateKey:
831 parameters = self.generate_dsa_parameters(key_size)
832 return self.generate_dsa_private_key(parameters)
834 def _dsa_cdata_set_values(self, dsa_cdata, p, q, g, pub_key, priv_key):
835 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
836 self.openssl_assert(res == 1)
837 res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key)
838 self.openssl_assert(res == 1)
840 def load_dsa_private_numbers(
841 self, numbers: dsa.DSAPrivateNumbers
842 ) -> dsa.DSAPrivateKey:
843 dsa._check_dsa_private_numbers(numbers)
844 parameter_numbers = numbers.public_numbers.parameter_numbers
846 dsa_cdata = self._lib.DSA_new()
847 self.openssl_assert(dsa_cdata != self._ffi.NULL)
848 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
850 p = self._int_to_bn(parameter_numbers.p)
851 q = self._int_to_bn(parameter_numbers.q)
852 g = self._int_to_bn(parameter_numbers.g)
853 pub_key = self._int_to_bn(numbers.public_numbers.y)
854 priv_key = self._int_to_bn(numbers.x)
855 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
857 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
859 return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
861 def load_dsa_public_numbers(
862 self, numbers: dsa.DSAPublicNumbers
863 ) -> dsa.DSAPublicKey:
864 dsa._check_dsa_parameters(numbers.parameter_numbers)
865 dsa_cdata = self._lib.DSA_new()
866 self.openssl_assert(dsa_cdata != self._ffi.NULL)
867 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
869 p = self._int_to_bn(numbers.parameter_numbers.p)
870 q = self._int_to_bn(numbers.parameter_numbers.q)
871 g = self._int_to_bn(numbers.parameter_numbers.g)
872 pub_key = self._int_to_bn(numbers.y)
873 priv_key = self._ffi.NULL
874 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
876 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
878 return _DSAPublicKey(self, dsa_cdata, evp_pkey)
880 def load_dsa_parameter_numbers(
881 self, numbers: dsa.DSAParameterNumbers
882 ) -> dsa.DSAParameters:
883 dsa._check_dsa_parameters(numbers)
884 dsa_cdata = self._lib.DSA_new()
885 self.openssl_assert(dsa_cdata != self._ffi.NULL)
886 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
888 p = self._int_to_bn(numbers.p)
889 q = self._int_to_bn(numbers.q)
890 g = self._int_to_bn(numbers.g)
891 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
892 self.openssl_assert(res == 1)
894 return _DSAParameters(self, dsa_cdata)
896 def _dsa_cdata_to_evp_pkey(self, dsa_cdata):
897 evp_pkey = self._create_evp_pkey_gc()
898 res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata)
899 self.openssl_assert(res == 1)
900 return evp_pkey
902 def dsa_supported(self) -> bool:
903 return not self._fips_enabled
905 def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
906 if not self.dsa_supported():
907 return False
908 return self.signature_hash_supported(algorithm)
910 def cmac_algorithm_supported(self, algorithm) -> bool:
911 return self.cipher_supported(
912 algorithm, CBC(b"\x00" * algorithm.block_size)
913 )
915 def create_cmac_ctx(self, algorithm: BlockCipherAlgorithm) -> _CMACContext:
916 return _CMACContext(self, algorithm)
918 def load_pem_private_key(
919 self, data: bytes, password: typing.Optional[bytes]
920 ) -> PRIVATE_KEY_TYPES:
921 return self._load_key(
922 self._lib.PEM_read_bio_PrivateKey,
923 self._evp_pkey_to_private_key,
924 data,
925 password,
926 )
928 def load_pem_public_key(self, data: bytes) -> PUBLIC_KEY_TYPES:
929 mem_bio = self._bytes_to_bio(data)
930 # In OpenSSL 3.0.x the PEM_read_bio_PUBKEY function will invoke
931 # the default password callback if you pass an encrypted private
932 # key. This is very, very, very bad as the default callback can
933 # trigger an interactive console prompt, which will hang the
934 # Python process. We therefore provide our own callback to
935 # catch this and error out properly.
936 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
937 evp_pkey = self._lib.PEM_read_bio_PUBKEY(
938 mem_bio.bio,
939 self._ffi.NULL,
940 self._ffi.addressof(
941 self._lib._original_lib, "Cryptography_pem_password_cb"
942 ),
943 userdata,
944 )
945 if evp_pkey != self._ffi.NULL:
946 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
947 return self._evp_pkey_to_public_key(evp_pkey)
948 else:
949 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
950 # need to check to see if it is a pure PKCS1 RSA public key (not
951 # embedded in a subjectPublicKeyInfo)
952 self._consume_errors()
953 res = self._lib.BIO_reset(mem_bio.bio)
954 self.openssl_assert(res == 1)
955 rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey(
956 mem_bio.bio,
957 self._ffi.NULL,
958 self._ffi.addressof(
959 self._lib._original_lib, "Cryptography_pem_password_cb"
960 ),
961 userdata,
962 )
963 if rsa_cdata != self._ffi.NULL:
964 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
965 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
966 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
967 else:
968 self._handle_key_loading_error()
970 def load_pem_parameters(self, data: bytes) -> dh.DHParameters:
971 mem_bio = self._bytes_to_bio(data)
972 # only DH is supported currently
973 dh_cdata = self._lib.PEM_read_bio_DHparams(
974 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
975 )
976 if dh_cdata != self._ffi.NULL:
977 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
978 return _DHParameters(self, dh_cdata)
979 else:
980 self._handle_key_loading_error()
982 def load_der_private_key(
983 self, data: bytes, password: typing.Optional[bytes]
984 ) -> PRIVATE_KEY_TYPES:
985 # OpenSSL has a function called d2i_AutoPrivateKey that in theory
986 # handles this automatically, however it doesn't handle encrypted
987 # private keys. Instead we try to load the key two different ways.
988 # First we'll try to load it as a traditional key.
989 bio_data = self._bytes_to_bio(data)
990 key = self._evp_pkey_from_der_traditional_key(bio_data, password)
991 if key:
992 return self._evp_pkey_to_private_key(key)
993 else:
994 # Finally we try to load it with the method that handles encrypted
995 # PKCS8 properly.
996 return self._load_key(
997 self._lib.d2i_PKCS8PrivateKey_bio,
998 self._evp_pkey_to_private_key,
999 data,
1000 password,
1001 )
1003 def _evp_pkey_from_der_traditional_key(self, bio_data, password):
1004 key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL)
1005 if key != self._ffi.NULL:
1006 # In OpenSSL 3.0.0-alpha15 there exist scenarios where the key will
1007 # successfully load but errors are still put on the stack. Tracked
1008 # as https://github.com/openssl/openssl/issues/14996
1009 self._consume_errors()
1011 key = self._ffi.gc(key, self._lib.EVP_PKEY_free)
1012 if password is not None:
1013 raise TypeError(
1014 "Password was given but private key is not encrypted."
1015 )
1017 return key
1018 else:
1019 self._consume_errors()
1020 return None
1022 def load_der_public_key(self, data: bytes) -> PUBLIC_KEY_TYPES:
1023 mem_bio = self._bytes_to_bio(data)
1024 evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL)
1025 if evp_pkey != self._ffi.NULL:
1026 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1027 return self._evp_pkey_to_public_key(evp_pkey)
1028 else:
1029 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
1030 # need to check to see if it is a pure PKCS1 RSA public key (not
1031 # embedded in a subjectPublicKeyInfo)
1032 self._consume_errors()
1033 res = self._lib.BIO_reset(mem_bio.bio)
1034 self.openssl_assert(res == 1)
1035 rsa_cdata = self._lib.d2i_RSAPublicKey_bio(
1036 mem_bio.bio, self._ffi.NULL
1037 )
1038 if rsa_cdata != self._ffi.NULL:
1039 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
1040 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
1041 return _RSAPublicKey(self, rsa_cdata, evp_pkey)
1042 else:
1043 self._handle_key_loading_error()
1045 def load_der_parameters(self, data: bytes) -> dh.DHParameters:
1046 mem_bio = self._bytes_to_bio(data)
1047 dh_cdata = self._lib.d2i_DHparams_bio(mem_bio.bio, self._ffi.NULL)
1048 if dh_cdata != self._ffi.NULL:
1049 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1050 return _DHParameters(self, dh_cdata)
1051 elif self._lib.Cryptography_HAS_EVP_PKEY_DHX:
1052 # We check to see if the is dhx.
1053 self._consume_errors()
1054 res = self._lib.BIO_reset(mem_bio.bio)
1055 self.openssl_assert(res == 1)
1056 dh_cdata = self._lib.Cryptography_d2i_DHxparams_bio(
1057 mem_bio.bio, self._ffi.NULL
1058 )
1059 if dh_cdata != self._ffi.NULL:
1060 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1061 return _DHParameters(self, dh_cdata)
1063 self._handle_key_loading_error()
1065 def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
1066 data = cert.public_bytes(serialization.Encoding.DER)
1067 mem_bio = self._bytes_to_bio(data)
1068 x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
1069 self.openssl_assert(x509 != self._ffi.NULL)
1070 x509 = self._ffi.gc(x509, self._lib.X509_free)
1071 return x509
1073 def _ossl2cert(self, x509: typing.Any) -> x509.Certificate:
1074 bio = self._create_mem_bio_gc()
1075 res = self._lib.i2d_X509_bio(bio, x509)
1076 self.openssl_assert(res == 1)
1077 return rust_x509.load_der_x509_certificate(self._read_mem_bio(bio))
1079 def _csr2ossl(self, csr: x509.CertificateSigningRequest) -> typing.Any:
1080 data = csr.public_bytes(serialization.Encoding.DER)
1081 mem_bio = self._bytes_to_bio(data)
1082 x509_req = self._lib.d2i_X509_REQ_bio(mem_bio.bio, self._ffi.NULL)
1083 self.openssl_assert(x509_req != self._ffi.NULL)
1084 x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
1085 return x509_req
1087 def _ossl2csr(
1088 self, x509_req: typing.Any
1089 ) -> x509.CertificateSigningRequest:
1090 bio = self._create_mem_bio_gc()
1091 res = self._lib.i2d_X509_REQ_bio(bio, x509_req)
1092 self.openssl_assert(res == 1)
1093 return rust_x509.load_der_x509_csr(self._read_mem_bio(bio))
1095 def _crl2ossl(self, crl: x509.CertificateRevocationList) -> typing.Any:
1096 data = crl.public_bytes(serialization.Encoding.DER)
1097 mem_bio = self._bytes_to_bio(data)
1098 x509_crl = self._lib.d2i_X509_CRL_bio(mem_bio.bio, self._ffi.NULL)
1099 self.openssl_assert(x509_crl != self._ffi.NULL)
1100 x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
1101 return x509_crl
1103 def _ossl2crl(
1104 self, x509_crl: typing.Any
1105 ) -> x509.CertificateRevocationList:
1106 bio = self._create_mem_bio_gc()
1107 res = self._lib.i2d_X509_CRL_bio(bio, x509_crl)
1108 self.openssl_assert(res == 1)
1109 return rust_x509.load_der_x509_crl(self._read_mem_bio(bio))
1111 def _crl_is_signature_valid(
1112 self,
1113 crl: x509.CertificateRevocationList,
1114 public_key: CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES,
1115 ) -> bool:
1116 if not isinstance(
1117 public_key,
1118 (
1119 _DSAPublicKey,
1120 _RSAPublicKey,
1121 _EllipticCurvePublicKey,
1122 ),
1123 ):
1124 raise TypeError(
1125 "Expecting one of DSAPublicKey, RSAPublicKey,"
1126 " or EllipticCurvePublicKey."
1127 )
1128 x509_crl = self._crl2ossl(crl)
1129 res = self._lib.X509_CRL_verify(x509_crl, public_key._evp_pkey)
1131 if res != 1:
1132 self._consume_errors()
1133 return False
1135 return True
1137 def _csr_is_signature_valid(
1138 self, csr: x509.CertificateSigningRequest
1139 ) -> bool:
1140 x509_req = self._csr2ossl(csr)
1141 pkey = self._lib.X509_REQ_get_pubkey(x509_req)
1142 self.openssl_assert(pkey != self._ffi.NULL)
1143 pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
1144 res = self._lib.X509_REQ_verify(x509_req, pkey)
1146 if res != 1:
1147 self._consume_errors()
1148 return False
1150 return True
1152 def _check_keys_correspond(self, key1, key2):
1153 if self._lib.EVP_PKEY_cmp(key1._evp_pkey, key2._evp_pkey) != 1:
1154 raise ValueError("Keys do not correspond")
1156 def _load_key(self, openssl_read_func, convert_func, data, password):
1157 mem_bio = self._bytes_to_bio(data)
1159 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
1160 if password is not None:
1161 utils._check_byteslike("password", password)
1162 password_ptr = self._ffi.from_buffer(password)
1163 userdata.password = password_ptr
1164 userdata.length = len(password)
1166 evp_pkey = openssl_read_func(
1167 mem_bio.bio,
1168 self._ffi.NULL,
1169 self._ffi.addressof(
1170 self._lib._original_lib, "Cryptography_pem_password_cb"
1171 ),
1172 userdata,
1173 )
1175 if evp_pkey == self._ffi.NULL:
1176 if userdata.error != 0:
1177 self._consume_errors()
1178 if userdata.error == -1:
1179 raise TypeError(
1180 "Password was not given but private key is encrypted"
1181 )
1182 else:
1183 assert userdata.error == -2
1184 raise ValueError(
1185 "Passwords longer than {} bytes are not supported "
1186 "by this backend.".format(userdata.maxsize - 1)
1187 )
1188 else:
1189 self._handle_key_loading_error()
1191 # In OpenSSL 3.0.0-alpha15 there exist scenarios where the key will
1192 # successfully load but errors are still put on the stack. Tracked
1193 # as https://github.com/openssl/openssl/issues/14996
1194 self._consume_errors()
1196 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1198 if password is not None and userdata.called == 0:
1199 raise TypeError(
1200 "Password was given but private key is not encrypted."
1201 )
1203 assert (
1204 password is not None and userdata.called == 1
1205 ) or password is None
1207 return convert_func(evp_pkey)
1209 def _handle_key_loading_error(self) -> typing.NoReturn:
1210 errors = self._consume_errors()
1212 if not errors:
1213 raise ValueError(
1214 "Could not deserialize key data. The data may be in an "
1215 "incorrect format or it may be encrypted with an unsupported "
1216 "algorithm."
1217 )
1219 elif (
1220 errors[0]._lib_reason_match(
1221 self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
1222 )
1223 or errors[0]._lib_reason_match(
1224 self._lib.ERR_LIB_PKCS12,
1225 self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
1226 )
1227 or (
1228 self._lib.Cryptography_HAS_PROVIDERS
1229 and errors[0]._lib_reason_match(
1230 self._lib.ERR_LIB_PROV,
1231 self._lib.PROV_R_BAD_DECRYPT,
1232 )
1233 )
1234 ):
1235 raise ValueError("Bad decrypt. Incorrect password?")
1237 elif any(
1238 error._lib_reason_match(
1239 self._lib.ERR_LIB_EVP,
1240 self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
1241 )
1242 for error in errors
1243 ):
1244 raise ValueError("Unsupported public key algorithm.")
1246 else:
1247 errors_with_text = binding._errors_with_text(errors)
1248 raise ValueError(
1249 "Could not deserialize key data. The data may be in an "
1250 "incorrect format, it may be encrypted with an unsupported "
1251 "algorithm, or it may be an unsupported key type (e.g. EC "
1252 "curves with explicit parameters).",
1253 errors_with_text,
1254 )
1256 def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
1257 try:
1258 curve_nid = self._elliptic_curve_to_nid(curve)
1259 except UnsupportedAlgorithm:
1260 curve_nid = self._lib.NID_undef
1262 group = self._lib.EC_GROUP_new_by_curve_name(curve_nid)
1264 if group == self._ffi.NULL:
1265 self._consume_errors()
1266 return False
1267 else:
1268 self.openssl_assert(curve_nid != self._lib.NID_undef)
1269 self._lib.EC_GROUP_free(group)
1270 return True
1272 def elliptic_curve_signature_algorithm_supported(
1273 self,
1274 signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
1275 curve: ec.EllipticCurve,
1276 ) -> bool:
1277 # We only support ECDSA right now.
1278 if not isinstance(signature_algorithm, ec.ECDSA):
1279 return False
1281 return self.elliptic_curve_supported(curve)
1283 def generate_elliptic_curve_private_key(
1284 self, curve: ec.EllipticCurve
1285 ) -> ec.EllipticCurvePrivateKey:
1286 """
1287 Generate a new private key on the named curve.
1288 """
1290 if self.elliptic_curve_supported(curve):
1291 ec_cdata = self._ec_key_new_by_curve(curve)
1293 res = self._lib.EC_KEY_generate_key(ec_cdata)
1294 self.openssl_assert(res == 1)
1296 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1298 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1299 else:
1300 raise UnsupportedAlgorithm(
1301 "Backend object does not support {}.".format(curve.name),
1302 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
1303 )
1305 def load_elliptic_curve_private_numbers(
1306 self, numbers: ec.EllipticCurvePrivateNumbers
1307 ) -> ec.EllipticCurvePrivateKey:
1308 public = numbers.public_numbers
1310 ec_cdata = self._ec_key_new_by_curve(public.curve)
1312 private_value = self._ffi.gc(
1313 self._int_to_bn(numbers.private_value), self._lib.BN_clear_free
1314 )
1315 res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value)
1316 if res != 1:
1317 self._consume_errors()
1318 raise ValueError("Invalid EC key.")
1320 self._ec_key_set_public_key_affine_coordinates(
1321 ec_cdata, public.x, public.y
1322 )
1324 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1326 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1328 def load_elliptic_curve_public_numbers(
1329 self, numbers: ec.EllipticCurvePublicNumbers
1330 ) -> ec.EllipticCurvePublicKey:
1331 ec_cdata = self._ec_key_new_by_curve(numbers.curve)
1332 self._ec_key_set_public_key_affine_coordinates(
1333 ec_cdata, numbers.x, numbers.y
1334 )
1335 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1337 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
1339 def load_elliptic_curve_public_bytes(
1340 self, curve: ec.EllipticCurve, point_bytes: bytes
1341 ) -> ec.EllipticCurvePublicKey:
1342 ec_cdata = self._ec_key_new_by_curve(curve)
1343 group = self._lib.EC_KEY_get0_group(ec_cdata)
1344 self.openssl_assert(group != self._ffi.NULL)
1345 point = self._lib.EC_POINT_new(group)
1346 self.openssl_assert(point != self._ffi.NULL)
1347 point = self._ffi.gc(point, self._lib.EC_POINT_free)
1348 with self._tmp_bn_ctx() as bn_ctx:
1349 res = self._lib.EC_POINT_oct2point(
1350 group, point, point_bytes, len(point_bytes), bn_ctx
1351 )
1352 if res != 1:
1353 self._consume_errors()
1354 raise ValueError("Invalid public bytes for the given curve")
1356 res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1357 self.openssl_assert(res == 1)
1358 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1359 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
1361 def derive_elliptic_curve_private_key(
1362 self, private_value: int, curve: ec.EllipticCurve
1363 ) -> ec.EllipticCurvePrivateKey:
1364 ec_cdata = self._ec_key_new_by_curve(curve)
1366 get_func, group = self._ec_key_determine_group_get_func(ec_cdata)
1368 point = self._lib.EC_POINT_new(group)
1369 self.openssl_assert(point != self._ffi.NULL)
1370 point = self._ffi.gc(point, self._lib.EC_POINT_free)
1372 value = self._int_to_bn(private_value)
1373 value = self._ffi.gc(value, self._lib.BN_clear_free)
1375 with self._tmp_bn_ctx() as bn_ctx:
1376 res = self._lib.EC_POINT_mul(
1377 group, point, value, self._ffi.NULL, self._ffi.NULL, bn_ctx
1378 )
1379 self.openssl_assert(res == 1)
1381 bn_x = self._lib.BN_CTX_get(bn_ctx)
1382 bn_y = self._lib.BN_CTX_get(bn_ctx)
1384 res = get_func(group, point, bn_x, bn_y, bn_ctx)
1385 if res != 1:
1386 self._consume_errors()
1387 raise ValueError("Unable to derive key from private_value")
1389 res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1390 self.openssl_assert(res == 1)
1391 private = self._int_to_bn(private_value)
1392 private = self._ffi.gc(private, self._lib.BN_clear_free)
1393 res = self._lib.EC_KEY_set_private_key(ec_cdata, private)
1394 self.openssl_assert(res == 1)
1396 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1398 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1400 def _ec_key_new_by_curve(self, curve: ec.EllipticCurve):
1401 curve_nid = self._elliptic_curve_to_nid(curve)
1402 return self._ec_key_new_by_curve_nid(curve_nid)
1404 def _ec_key_new_by_curve_nid(self, curve_nid: int):
1405 ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
1406 self.openssl_assert(ec_cdata != self._ffi.NULL)
1407 return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
1409 def elliptic_curve_exchange_algorithm_supported(
1410 self, algorithm: ec.ECDH, curve: ec.EllipticCurve
1411 ) -> bool:
1412 if self._fips_enabled and not isinstance(
1413 curve, self._fips_ecdh_curves
1414 ):
1415 return False
1417 return self.elliptic_curve_supported(curve) and isinstance(
1418 algorithm, ec.ECDH
1419 )
1421 def _ec_cdata_to_evp_pkey(self, ec_cdata):
1422 evp_pkey = self._create_evp_pkey_gc()
1423 res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata)
1424 self.openssl_assert(res == 1)
1425 return evp_pkey
1427 def _elliptic_curve_to_nid(self, curve: ec.EllipticCurve) -> int:
1428 """
1429 Get the NID for a curve name.
1430 """
1432 curve_aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"}
1434 curve_name = curve_aliases.get(curve.name, curve.name)
1436 curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
1437 if curve_nid == self._lib.NID_undef:
1438 raise UnsupportedAlgorithm(
1439 "{} is not a supported elliptic curve".format(curve.name),
1440 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
1441 )
1442 return curve_nid
1444 @contextmanager
1445 def _tmp_bn_ctx(self):
1446 bn_ctx = self._lib.BN_CTX_new()
1447 self.openssl_assert(bn_ctx != self._ffi.NULL)
1448 bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free)
1449 self._lib.BN_CTX_start(bn_ctx)
1450 try:
1451 yield bn_ctx
1452 finally:
1453 self._lib.BN_CTX_end(bn_ctx)
1455 def _ec_key_determine_group_get_func(self, ctx):
1456 """
1457 Given an EC_KEY determine the group and what function is required to
1458 get point coordinates.
1459 """
1460 self.openssl_assert(ctx != self._ffi.NULL)
1462 nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field")
1463 self.openssl_assert(nid_two_field != self._lib.NID_undef)
1465 group = self._lib.EC_KEY_get0_group(ctx)
1466 self.openssl_assert(group != self._ffi.NULL)
1468 method = self._lib.EC_GROUP_method_of(group)
1469 self.openssl_assert(method != self._ffi.NULL)
1471 nid = self._lib.EC_METHOD_get_field_type(method)
1472 self.openssl_assert(nid != self._lib.NID_undef)
1474 if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M:
1475 get_func = self._lib.EC_POINT_get_affine_coordinates_GF2m
1476 else:
1477 get_func = self._lib.EC_POINT_get_affine_coordinates_GFp
1479 assert get_func
1481 return get_func, group
1483 def _ec_key_set_public_key_affine_coordinates(self, ctx, x: int, y: int):
1484 """
1485 Sets the public key point in the EC_KEY context to the affine x and y
1486 values.
1487 """
1489 if x < 0 or y < 0:
1490 raise ValueError(
1491 "Invalid EC key. Both x and y must be non-negative."
1492 )
1494 x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free)
1495 y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free)
1496 res = self._lib.EC_KEY_set_public_key_affine_coordinates(ctx, x, y)
1497 if res != 1:
1498 self._consume_errors()
1499 raise ValueError("Invalid EC key.")
1501 def _private_key_bytes(
1502 self,
1503 encoding: serialization.Encoding,
1504 format: serialization.PrivateFormat,
1505 encryption_algorithm: serialization.KeySerializationEncryption,
1506 key,
1507 evp_pkey,
1508 cdata,
1509 ) -> bytes:
1510 # validate argument types
1511 if not isinstance(encoding, serialization.Encoding):
1512 raise TypeError("encoding must be an item from the Encoding enum")
1513 if not isinstance(format, serialization.PrivateFormat):
1514 raise TypeError(
1515 "format must be an item from the PrivateFormat enum"
1516 )
1517 if not isinstance(
1518 encryption_algorithm, serialization.KeySerializationEncryption
1519 ):
1520 raise TypeError(
1521 "Encryption algorithm must be a KeySerializationEncryption "
1522 "instance"
1523 )
1525 # validate password
1526 if isinstance(encryption_algorithm, serialization.NoEncryption):
1527 password = b""
1528 elif isinstance(
1529 encryption_algorithm, serialization.BestAvailableEncryption
1530 ):
1531 password = encryption_algorithm.password
1532 if len(password) > 1023:
1533 raise ValueError(
1534 "Passwords longer than 1023 bytes are not supported by "
1535 "this backend"
1536 )
1537 elif (
1538 isinstance(
1539 encryption_algorithm, serialization._KeySerializationEncryption
1540 )
1541 and encryption_algorithm._format
1542 is format
1543 is serialization.PrivateFormat.OpenSSH
1544 ):
1545 password = encryption_algorithm.password
1546 else:
1547 raise ValueError("Unsupported encryption type")
1549 # PKCS8 + PEM/DER
1550 if format is serialization.PrivateFormat.PKCS8:
1551 if encoding is serialization.Encoding.PEM:
1552 write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey
1553 elif encoding is serialization.Encoding.DER:
1554 write_bio = self._lib.i2d_PKCS8PrivateKey_bio
1555 else:
1556 raise ValueError("Unsupported encoding for PKCS8")
1557 return self._private_key_bytes_via_bio(
1558 write_bio, evp_pkey, password
1559 )
1561 # TraditionalOpenSSL + PEM/DER
1562 if format is serialization.PrivateFormat.TraditionalOpenSSL:
1563 if self._fips_enabled and not isinstance(
1564 encryption_algorithm, serialization.NoEncryption
1565 ):
1566 raise ValueError(
1567 "Encrypted traditional OpenSSL format is not "
1568 "supported in FIPS mode."
1569 )
1570 key_type = self._lib.EVP_PKEY_id(evp_pkey)
1572 if encoding is serialization.Encoding.PEM:
1573 if key_type == self._lib.EVP_PKEY_RSA:
1574 write_bio = self._lib.PEM_write_bio_RSAPrivateKey
1575 elif key_type == self._lib.EVP_PKEY_DSA:
1576 write_bio = self._lib.PEM_write_bio_DSAPrivateKey
1577 elif key_type == self._lib.EVP_PKEY_EC:
1578 write_bio = self._lib.PEM_write_bio_ECPrivateKey
1579 else:
1580 raise ValueError(
1581 "Unsupported key type for TraditionalOpenSSL"
1582 )
1583 return self._private_key_bytes_via_bio(
1584 write_bio, cdata, password
1585 )
1587 if encoding is serialization.Encoding.DER:
1588 if password:
1589 raise ValueError(
1590 "Encryption is not supported for DER encoded "
1591 "traditional OpenSSL keys"
1592 )
1593 if key_type == self._lib.EVP_PKEY_RSA:
1594 write_bio = self._lib.i2d_RSAPrivateKey_bio
1595 elif key_type == self._lib.EVP_PKEY_EC:
1596 write_bio = self._lib.i2d_ECPrivateKey_bio
1597 elif key_type == self._lib.EVP_PKEY_DSA:
1598 write_bio = self._lib.i2d_DSAPrivateKey_bio
1599 else:
1600 raise ValueError(
1601 "Unsupported key type for TraditionalOpenSSL"
1602 )
1603 return self._bio_func_output(write_bio, cdata)
1605 raise ValueError("Unsupported encoding for TraditionalOpenSSL")
1607 # OpenSSH + PEM
1608 if format is serialization.PrivateFormat.OpenSSH:
1609 if encoding is serialization.Encoding.PEM:
1610 return ssh._serialize_ssh_private_key(
1611 key, password, encryption_algorithm
1612 )
1614 raise ValueError(
1615 "OpenSSH private key format can only be used"
1616 " with PEM encoding"
1617 )
1619 # Anything that key-specific code was supposed to handle earlier,
1620 # like Raw.
1621 raise ValueError("format is invalid with this key")
1623 def _private_key_bytes_via_bio(self, write_bio, evp_pkey, password):
1624 if not password:
1625 evp_cipher = self._ffi.NULL
1626 else:
1627 # This is a curated value that we will update over time.
1628 evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
1630 return self._bio_func_output(
1631 write_bio,
1632 evp_pkey,
1633 evp_cipher,
1634 password,
1635 len(password),
1636 self._ffi.NULL,
1637 self._ffi.NULL,
1638 )
1640 def _bio_func_output(self, write_bio, *args):
1641 bio = self._create_mem_bio_gc()
1642 res = write_bio(bio, *args)
1643 self.openssl_assert(res == 1)
1644 return self._read_mem_bio(bio)
1646 def _public_key_bytes(
1647 self,
1648 encoding: serialization.Encoding,
1649 format: serialization.PublicFormat,
1650 key,
1651 evp_pkey,
1652 cdata,
1653 ) -> bytes:
1654 if not isinstance(encoding, serialization.Encoding):
1655 raise TypeError("encoding must be an item from the Encoding enum")
1656 if not isinstance(format, serialization.PublicFormat):
1657 raise TypeError(
1658 "format must be an item from the PublicFormat enum"
1659 )
1661 # SubjectPublicKeyInfo + PEM/DER
1662 if format is serialization.PublicFormat.SubjectPublicKeyInfo:
1663 if encoding is serialization.Encoding.PEM:
1664 write_bio = self._lib.PEM_write_bio_PUBKEY
1665 elif encoding is serialization.Encoding.DER:
1666 write_bio = self._lib.i2d_PUBKEY_bio
1667 else:
1668 raise ValueError(
1669 "SubjectPublicKeyInfo works only with PEM or DER encoding"
1670 )
1671 return self._bio_func_output(write_bio, evp_pkey)
1673 # PKCS1 + PEM/DER
1674 if format is serialization.PublicFormat.PKCS1:
1675 # Only RSA is supported here.
1676 key_type = self._lib.EVP_PKEY_id(evp_pkey)
1677 if key_type != self._lib.EVP_PKEY_RSA:
1678 raise ValueError("PKCS1 format is supported only for RSA keys")
1680 if encoding is serialization.Encoding.PEM:
1681 write_bio = self._lib.PEM_write_bio_RSAPublicKey
1682 elif encoding is serialization.Encoding.DER:
1683 write_bio = self._lib.i2d_RSAPublicKey_bio
1684 else:
1685 raise ValueError("PKCS1 works only with PEM or DER encoding")
1686 return self._bio_func_output(write_bio, cdata)
1688 # OpenSSH + OpenSSH
1689 if format is serialization.PublicFormat.OpenSSH:
1690 if encoding is serialization.Encoding.OpenSSH:
1691 return ssh.serialize_ssh_public_key(key)
1693 raise ValueError(
1694 "OpenSSH format must be used with OpenSSH encoding"
1695 )
1697 # Anything that key-specific code was supposed to handle earlier,
1698 # like Raw, CompressedPoint, UncompressedPoint
1699 raise ValueError("format is invalid with this key")
1701 def dh_supported(self) -> bool:
1702 return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1704 def generate_dh_parameters(
1705 self, generator: int, key_size: int
1706 ) -> dh.DHParameters:
1707 if key_size < dh._MIN_MODULUS_SIZE:
1708 raise ValueError(
1709 "DH key_size must be at least {} bits".format(
1710 dh._MIN_MODULUS_SIZE
1711 )
1712 )
1714 if generator not in (2, 5):
1715 raise ValueError("DH generator must be 2 or 5")
1717 dh_param_cdata = self._lib.DH_new()
1718 self.openssl_assert(dh_param_cdata != self._ffi.NULL)
1719 dh_param_cdata = self._ffi.gc(dh_param_cdata, self._lib.DH_free)
1721 res = self._lib.DH_generate_parameters_ex(
1722 dh_param_cdata, key_size, generator, self._ffi.NULL
1723 )
1724 self.openssl_assert(res == 1)
1726 return _DHParameters(self, dh_param_cdata)
1728 def _dh_cdata_to_evp_pkey(self, dh_cdata):
1729 evp_pkey = self._create_evp_pkey_gc()
1730 res = self._lib.EVP_PKEY_set1_DH(evp_pkey, dh_cdata)
1731 self.openssl_assert(res == 1)
1732 return evp_pkey
1734 def generate_dh_private_key(
1735 self, parameters: dh.DHParameters
1736 ) -> dh.DHPrivateKey:
1737 dh_key_cdata = _dh_params_dup(
1738 parameters._dh_cdata, self # type: ignore[attr-defined]
1739 )
1741 res = self._lib.DH_generate_key(dh_key_cdata)
1742 self.openssl_assert(res == 1)
1744 evp_pkey = self._dh_cdata_to_evp_pkey(dh_key_cdata)
1746 return _DHPrivateKey(self, dh_key_cdata, evp_pkey)
1748 def generate_dh_private_key_and_parameters(
1749 self, generator: int, key_size: int
1750 ) -> dh.DHPrivateKey:
1751 return self.generate_dh_private_key(
1752 self.generate_dh_parameters(generator, key_size)
1753 )
1755 def load_dh_private_numbers(
1756 self, numbers: dh.DHPrivateNumbers
1757 ) -> dh.DHPrivateKey:
1758 parameter_numbers = numbers.public_numbers.parameter_numbers
1760 dh_cdata = self._lib.DH_new()
1761 self.openssl_assert(dh_cdata != self._ffi.NULL)
1762 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1764 p = self._int_to_bn(parameter_numbers.p)
1765 g = self._int_to_bn(parameter_numbers.g)
1767 if parameter_numbers.q is not None:
1768 q = self._int_to_bn(parameter_numbers.q)
1769 else:
1770 q = self._ffi.NULL
1772 pub_key = self._int_to_bn(numbers.public_numbers.y)
1773 priv_key = self._int_to_bn(numbers.x)
1775 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
1776 self.openssl_assert(res == 1)
1778 res = self._lib.DH_set0_key(dh_cdata, pub_key, priv_key)
1779 self.openssl_assert(res == 1)
1781 codes = self._ffi.new("int[]", 1)
1782 res = self._lib.Cryptography_DH_check(dh_cdata, codes)
1783 self.openssl_assert(res == 1)
1785 # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not
1786 # equal 11 when the generator is 2 (a quadratic nonresidue).
1787 # We want to ignore that error because p % 24 == 23 is also fine.
1788 # Specifically, g is then a quadratic residue. Within the context of
1789 # Diffie-Hellman this means it can only generate half the possible
1790 # values. That sounds bad, but quadratic nonresidues leak a bit of
1791 # the key to the attacker in exchange for having the full key space
1792 # available. See: https://crypto.stackexchange.com/questions/12961
1793 if codes[0] != 0 and not (
1794 parameter_numbers.g == 2
1795 and codes[0] ^ self._lib.DH_NOT_SUITABLE_GENERATOR == 0
1796 ):
1797 raise ValueError("DH private numbers did not pass safety checks.")
1799 evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
1801 return _DHPrivateKey(self, dh_cdata, evp_pkey)
1803 def load_dh_public_numbers(
1804 self, numbers: dh.DHPublicNumbers
1805 ) -> dh.DHPublicKey:
1806 dh_cdata = self._lib.DH_new()
1807 self.openssl_assert(dh_cdata != self._ffi.NULL)
1808 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1810 parameter_numbers = numbers.parameter_numbers
1812 p = self._int_to_bn(parameter_numbers.p)
1813 g = self._int_to_bn(parameter_numbers.g)
1815 if parameter_numbers.q is not None:
1816 q = self._int_to_bn(parameter_numbers.q)
1817 else:
1818 q = self._ffi.NULL
1820 pub_key = self._int_to_bn(numbers.y)
1822 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
1823 self.openssl_assert(res == 1)
1825 res = self._lib.DH_set0_key(dh_cdata, pub_key, self._ffi.NULL)
1826 self.openssl_assert(res == 1)
1828 evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
1830 return _DHPublicKey(self, dh_cdata, evp_pkey)
1832 def load_dh_parameter_numbers(
1833 self, numbers: dh.DHParameterNumbers
1834 ) -> dh.DHParameters:
1835 dh_cdata = self._lib.DH_new()
1836 self.openssl_assert(dh_cdata != self._ffi.NULL)
1837 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1839 p = self._int_to_bn(numbers.p)
1840 g = self._int_to_bn(numbers.g)
1842 if numbers.q is not None:
1843 q = self._int_to_bn(numbers.q)
1844 else:
1845 q = self._ffi.NULL
1847 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
1848 self.openssl_assert(res == 1)
1850 return _DHParameters(self, dh_cdata)
1852 def dh_parameters_supported(
1853 self, p: int, g: int, q: typing.Optional[int] = None
1854 ) -> bool:
1855 dh_cdata = self._lib.DH_new()
1856 self.openssl_assert(dh_cdata != self._ffi.NULL)
1857 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1859 p = self._int_to_bn(p)
1860 g = self._int_to_bn(g)
1862 if q is not None:
1863 q = self._int_to_bn(q)
1864 else:
1865 q = self._ffi.NULL
1867 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
1868 self.openssl_assert(res == 1)
1870 codes = self._ffi.new("int[]", 1)
1871 res = self._lib.Cryptography_DH_check(dh_cdata, codes)
1872 self.openssl_assert(res == 1)
1874 return codes[0] == 0
1876 def dh_x942_serialization_supported(self) -> bool:
1877 return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1
1879 def x25519_load_public_bytes(self, data: bytes) -> x25519.X25519PublicKey:
1880 # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
1881 # switch this to EVP_PKEY_new_raw_public_key
1882 if len(data) != 32:
1883 raise ValueError("An X25519 public key is 32 bytes long")
1885 evp_pkey = self._create_evp_pkey_gc()
1886 res = self._lib.EVP_PKEY_set_type(evp_pkey, self._lib.NID_X25519)
1887 self.openssl_assert(res == 1)
1888 res = self._lib.EVP_PKEY_set1_tls_encodedpoint(
1889 evp_pkey, data, len(data)
1890 )
1891 self.openssl_assert(res == 1)
1892 return _X25519PublicKey(self, evp_pkey)
1894 def x25519_load_private_bytes(
1895 self, data: bytes
1896 ) -> x25519.X25519PrivateKey:
1897 # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
1898 # switch this to EVP_PKEY_new_raw_private_key and drop the
1899 # zeroed_bytearray garbage.
1900 # OpenSSL only has facilities for loading PKCS8 formatted private
1901 # keys using the algorithm identifiers specified in
1902 # https://tools.ietf.org/html/draft-ietf-curdle-pkix-09.
1903 # This is the standard PKCS8 prefix for a 32 byte X25519 key.
1904 # The form is:
1905 # 0:d=0 hl=2 l= 46 cons: SEQUENCE
1906 # 2:d=1 hl=2 l= 1 prim: INTEGER :00
1907 # 5:d=1 hl=2 l= 5 cons: SEQUENCE
1908 # 7:d=2 hl=2 l= 3 prim: OBJECT :1.3.101.110
1909 # 12:d=1 hl=2 l= 34 prim: OCTET STRING (the key)
1910 # Of course there's a bit more complexity. In reality OCTET STRING
1911 # contains an OCTET STRING of length 32! So the last two bytes here
1912 # are \x04\x20, which is an OCTET STRING of length 32.
1913 if len(data) != 32:
1914 raise ValueError("An X25519 private key is 32 bytes long")
1916 pkcs8_prefix = b'0.\x02\x01\x000\x05\x06\x03+en\x04"\x04 '
1917 with self._zeroed_bytearray(48) as ba:
1918 ba[0:16] = pkcs8_prefix
1919 ba[16:] = data
1920 bio = self._bytes_to_bio(ba)
1921 evp_pkey = self._lib.d2i_PrivateKey_bio(bio.bio, self._ffi.NULL)
1923 self.openssl_assert(evp_pkey != self._ffi.NULL)
1924 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1925 self.openssl_assert(
1926 self._lib.EVP_PKEY_id(evp_pkey) == self._lib.EVP_PKEY_X25519
1927 )
1928 return _X25519PrivateKey(self, evp_pkey)
1930 def _evp_pkey_keygen_gc(self, nid):
1931 evp_pkey_ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL)
1932 self.openssl_assert(evp_pkey_ctx != self._ffi.NULL)
1933 evp_pkey_ctx = self._ffi.gc(evp_pkey_ctx, self._lib.EVP_PKEY_CTX_free)
1934 res = self._lib.EVP_PKEY_keygen_init(evp_pkey_ctx)
1935 self.openssl_assert(res == 1)
1936 evp_ppkey = self._ffi.new("EVP_PKEY **")
1937 res = self._lib.EVP_PKEY_keygen(evp_pkey_ctx, evp_ppkey)
1938 self.openssl_assert(res == 1)
1939 self.openssl_assert(evp_ppkey[0] != self._ffi.NULL)
1940 evp_pkey = self._ffi.gc(evp_ppkey[0], self._lib.EVP_PKEY_free)
1941 return evp_pkey
1943 def x25519_generate_key(self) -> x25519.X25519PrivateKey:
1944 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X25519)
1945 return _X25519PrivateKey(self, evp_pkey)
1947 def x25519_supported(self) -> bool:
1948 if self._fips_enabled:
1949 return False
1950 return not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
1952 def x448_load_public_bytes(self, data: bytes) -> x448.X448PublicKey:
1953 if len(data) != 56:
1954 raise ValueError("An X448 public key is 56 bytes long")
1956 evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
1957 self._lib.NID_X448, self._ffi.NULL, data, len(data)
1958 )
1959 self.openssl_assert(evp_pkey != self._ffi.NULL)
1960 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1961 return _X448PublicKey(self, evp_pkey)
1963 def x448_load_private_bytes(self, data: bytes) -> x448.X448PrivateKey:
1964 if len(data) != 56:
1965 raise ValueError("An X448 private key is 56 bytes long")
1967 data_ptr = self._ffi.from_buffer(data)
1968 evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
1969 self._lib.NID_X448, self._ffi.NULL, data_ptr, len(data)
1970 )
1971 self.openssl_assert(evp_pkey != self._ffi.NULL)
1972 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1973 return _X448PrivateKey(self, evp_pkey)
1975 def x448_generate_key(self) -> x448.X448PrivateKey:
1976 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X448)
1977 return _X448PrivateKey(self, evp_pkey)
1979 def x448_supported(self) -> bool:
1980 if self._fips_enabled:
1981 return False
1982 return (
1983 not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111
1984 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
1985 )
1987 def ed25519_supported(self) -> bool:
1988 if self._fips_enabled:
1989 return False
1990 return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
1992 def ed25519_load_public_bytes(
1993 self, data: bytes
1994 ) -> ed25519.Ed25519PublicKey:
1995 utils._check_bytes("data", data)
1997 if len(data) != ed25519._ED25519_KEY_SIZE:
1998 raise ValueError("An Ed25519 public key is 32 bytes long")
2000 evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
2001 self._lib.NID_ED25519, self._ffi.NULL, data, len(data)
2002 )
2003 self.openssl_assert(evp_pkey != self._ffi.NULL)
2004 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2006 return _Ed25519PublicKey(self, evp_pkey)
2008 def ed25519_load_private_bytes(
2009 self, data: bytes
2010 ) -> ed25519.Ed25519PrivateKey:
2011 if len(data) != ed25519._ED25519_KEY_SIZE:
2012 raise ValueError("An Ed25519 private key is 32 bytes long")
2014 utils._check_byteslike("data", data)
2015 data_ptr = self._ffi.from_buffer(data)
2016 evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
2017 self._lib.NID_ED25519, self._ffi.NULL, data_ptr, len(data)
2018 )
2019 self.openssl_assert(evp_pkey != self._ffi.NULL)
2020 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2022 return _Ed25519PrivateKey(self, evp_pkey)
2024 def ed25519_generate_key(self) -> ed25519.Ed25519PrivateKey:
2025 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED25519)
2026 return _Ed25519PrivateKey(self, evp_pkey)
2028 def ed448_supported(self) -> bool:
2029 if self._fips_enabled:
2030 return False
2031 return (
2032 not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
2033 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
2034 )
2036 def ed448_load_public_bytes(self, data: bytes) -> ed448.Ed448PublicKey:
2037 utils._check_bytes("data", data)
2038 if len(data) != _ED448_KEY_SIZE:
2039 raise ValueError("An Ed448 public key is 57 bytes long")
2041 evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
2042 self._lib.NID_ED448, self._ffi.NULL, data, len(data)
2043 )
2044 self.openssl_assert(evp_pkey != self._ffi.NULL)
2045 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2047 return _Ed448PublicKey(self, evp_pkey)
2049 def ed448_load_private_bytes(self, data: bytes) -> ed448.Ed448PrivateKey:
2050 utils._check_byteslike("data", data)
2051 if len(data) != _ED448_KEY_SIZE:
2052 raise ValueError("An Ed448 private key is 57 bytes long")
2054 data_ptr = self._ffi.from_buffer(data)
2055 evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
2056 self._lib.NID_ED448, self._ffi.NULL, data_ptr, len(data)
2057 )
2058 self.openssl_assert(evp_pkey != self._ffi.NULL)
2059 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2061 return _Ed448PrivateKey(self, evp_pkey)
2063 def ed448_generate_key(self) -> ed448.Ed448PrivateKey:
2064 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED448)
2065 return _Ed448PrivateKey(self, evp_pkey)
2067 def derive_scrypt(
2068 self,
2069 key_material: bytes,
2070 salt: bytes,
2071 length: int,
2072 n: int,
2073 r: int,
2074 p: int,
2075 ) -> bytes:
2076 buf = self._ffi.new("unsigned char[]", length)
2077 key_material_ptr = self._ffi.from_buffer(key_material)
2078 res = self._lib.EVP_PBE_scrypt(
2079 key_material_ptr,
2080 len(key_material),
2081 salt,
2082 len(salt),
2083 n,
2084 r,
2085 p,
2086 scrypt._MEM_LIMIT,
2087 buf,
2088 length,
2089 )
2090 if res != 1:
2091 errors = self._consume_errors_with_text()
2092 # memory required formula explained here:
2093 # https://blog.filippo.io/the-scrypt-parameters/
2094 min_memory = 128 * n * r // (1024**2)
2095 raise MemoryError(
2096 "Not enough memory to derive key. These parameters require"
2097 " {} MB of memory.".format(min_memory),
2098 errors,
2099 )
2100 return self._ffi.buffer(buf)[:]
2102 def aead_cipher_supported(self, cipher) -> bool:
2103 cipher_name = aead._aead_cipher_name(cipher)
2104 if self._fips_enabled and cipher_name not in self._fips_aead:
2105 return False
2106 # SIV isn't loaded through get_cipherbyname but instead a new fetch API
2107 # only available in 3.0+. But if we know we're on 3.0+ then we know
2108 # it's supported.
2109 if cipher_name.endswith(b"-siv"):
2110 return self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER == 1
2111 else:
2112 return (
2113 self._lib.EVP_get_cipherbyname(cipher_name) != self._ffi.NULL
2114 )
2116 @contextlib.contextmanager
2117 def _zeroed_bytearray(self, length: int) -> typing.Iterator[bytearray]:
2118 """
2119 This method creates a bytearray, which we copy data into (hopefully
2120 also from a mutable buffer that can be dynamically erased!), and then
2121 zero when we're done.
2122 """
2123 ba = bytearray(length)
2124 try:
2125 yield ba
2126 finally:
2127 self._zero_data(ba, length)
2129 def _zero_data(self, data, length: int) -> None:
2130 # We clear things this way because at the moment we're not
2131 # sure of a better way that can guarantee it overwrites the
2132 # memory of a bytearray and doesn't just replace the underlying char *.
2133 for i in range(length):
2134 data[i] = 0
2136 @contextlib.contextmanager
2137 def _zeroed_null_terminated_buf(self, data):
2138 """
2139 This method takes bytes, which can be a bytestring or a mutable
2140 buffer like a bytearray, and yields a null-terminated version of that
2141 data. This is required because PKCS12_parse doesn't take a length with
2142 its password char * and ffi.from_buffer doesn't provide null
2143 termination. So, to support zeroing the data via bytearray we
2144 need to build this ridiculous construct that copies the memory, but
2145 zeroes it after use.
2146 """
2147 if data is None:
2148 yield self._ffi.NULL
2149 else:
2150 data_len = len(data)
2151 buf = self._ffi.new("char[]", data_len + 1)
2152 self._ffi.memmove(buf, data, data_len)
2153 try:
2154 yield buf
2155 finally:
2156 # Cast to a uint8_t * so we can assign by integer
2157 self._zero_data(self._ffi.cast("uint8_t *", buf), data_len)
2159 def load_key_and_certificates_from_pkcs12(
2160 self, data: bytes, password: typing.Optional[bytes]
2161 ) -> typing.Tuple[
2162 typing.Optional[PRIVATE_KEY_TYPES],
2163 typing.Optional[x509.Certificate],
2164 typing.List[x509.Certificate],
2165 ]:
2166 pkcs12 = self.load_pkcs12(data, password)
2167 return (
2168 pkcs12.key,
2169 pkcs12.cert.certificate if pkcs12.cert else None,
2170 [cert.certificate for cert in pkcs12.additional_certs],
2171 )
2173 def load_pkcs12(
2174 self, data: bytes, password: typing.Optional[bytes]
2175 ) -> PKCS12KeyAndCertificates:
2176 if password is not None:
2177 utils._check_byteslike("password", password)
2179 bio = self._bytes_to_bio(data)
2180 p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
2181 if p12 == self._ffi.NULL:
2182 self._consume_errors()
2183 raise ValueError("Could not deserialize PKCS12 data")
2185 p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
2186 evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
2187 x509_ptr = self._ffi.new("X509 **")
2188 sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
2189 with self._zeroed_null_terminated_buf(password) as password_buf:
2190 res = self._lib.PKCS12_parse(
2191 p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
2192 )
2193 # OpenSSL 3.0.6 leaves errors on the stack even in success, so
2194 # we consume all errors unconditionally.
2195 # https://github.com/openssl/openssl/issues/19389
2196 self._consume_errors()
2197 if res == 0:
2198 raise ValueError("Invalid password or PKCS12 data")
2200 cert = None
2201 key = None
2202 additional_certificates = []
2204 if evp_pkey_ptr[0] != self._ffi.NULL:
2205 evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)
2206 key = self._evp_pkey_to_private_key(evp_pkey)
2208 if x509_ptr[0] != self._ffi.NULL:
2209 x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
2210 cert_obj = self._ossl2cert(x509)
2211 name = None
2212 maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL)
2213 if maybe_name != self._ffi.NULL:
2214 name = self._ffi.string(maybe_name)
2215 cert = PKCS12Certificate(cert_obj, name)
2217 if sk_x509_ptr[0] != self._ffi.NULL:
2218 sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
2219 num = self._lib.sk_X509_num(sk_x509_ptr[0])
2221 # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
2222 # certificates.
2223 indices: typing.Iterable[int]
2224 if (
2225 self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
2226 or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
2227 ):
2228 indices = range(num)
2229 else:
2230 indices = reversed(range(num))
2232 for i in indices:
2233 x509 = self._lib.sk_X509_value(sk_x509, i)
2234 self.openssl_assert(x509 != self._ffi.NULL)
2235 x509 = self._ffi.gc(x509, self._lib.X509_free)
2236 addl_cert = self._ossl2cert(x509)
2237 addl_name = None
2238 maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL)
2239 if maybe_name != self._ffi.NULL:
2240 addl_name = self._ffi.string(maybe_name)
2241 additional_certificates.append(
2242 PKCS12Certificate(addl_cert, addl_name)
2243 )
2245 return PKCS12KeyAndCertificates(key, cert, additional_certificates)
2247 def serialize_key_and_certificates_to_pkcs12(
2248 self,
2249 name: typing.Optional[bytes],
2250 key: typing.Optional[_ALLOWED_PKCS12_TYPES],
2251 cert: typing.Optional[x509.Certificate],
2252 cas: typing.Optional[typing.List[_PKCS12_CAS_TYPES]],
2253 encryption_algorithm: serialization.KeySerializationEncryption,
2254 ) -> bytes:
2255 password = None
2256 if name is not None:
2257 utils._check_bytes("name", name)
2259 if isinstance(encryption_algorithm, serialization.NoEncryption):
2260 nid_cert = -1
2261 nid_key = -1
2262 pkcs12_iter = 0
2263 mac_iter = 0
2264 mac_alg = self._ffi.NULL
2265 elif isinstance(
2266 encryption_algorithm, serialization.BestAvailableEncryption
2267 ):
2268 # PKCS12 encryption is hopeless trash and can never be fixed.
2269 # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so
2270 # we use PBESv1 with 3DES on the older paths.
2271 if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
2272 nid_cert = self._lib.NID_aes_256_cbc
2273 nid_key = self._lib.NID_aes_256_cbc
2274 else:
2275 nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
2276 nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
2277 # At least we can set this higher than OpenSSL's default
2278 pkcs12_iter = 20000
2279 # mac_iter chosen for compatibility reasons, see:
2280 # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
2281 # Did we mention how lousy PKCS12 encryption is?
2282 mac_iter = 1
2283 # MAC algorithm can only be set on OpenSSL 3.0.0+
2284 mac_alg = self._ffi.NULL
2285 password = encryption_algorithm.password
2286 elif (
2287 isinstance(
2288 encryption_algorithm, serialization._KeySerializationEncryption
2289 )
2290 and encryption_algorithm._format
2291 is serialization.PrivateFormat.PKCS12
2292 ):
2293 # Default to OpenSSL's defaults. Behavior will vary based on the
2294 # version of OpenSSL cryptography is compiled against.
2295 nid_cert = 0
2296 nid_key = 0
2297 # Use the default iters we use in best available
2298 pkcs12_iter = 20000
2299 # See the Best Available comment for why this is 1
2300 mac_iter = 1
2301 password = encryption_algorithm.password
2302 keycertalg = encryption_algorithm._key_cert_algorithm
2303 if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC:
2304 nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
2305 nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
2306 elif keycertalg is PBES.PBESv2SHA256AndAES256CBC:
2307 if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
2308 raise UnsupportedAlgorithm(
2309 "PBESv2 is not supported by this version of OpenSSL"
2310 )
2311 nid_cert = self._lib.NID_aes_256_cbc
2312 nid_key = self._lib.NID_aes_256_cbc
2313 else:
2314 assert keycertalg is None
2315 # We use OpenSSL's defaults
2317 if encryption_algorithm._hmac_hash is not None:
2318 if not self._lib.Cryptography_HAS_PKCS12_SET_MAC:
2319 raise UnsupportedAlgorithm(
2320 "Setting MAC algorithm is not supported by this "
2321 "version of OpenSSL."
2322 )
2323 mac_alg = self._evp_md_non_null_from_algorithm(
2324 encryption_algorithm._hmac_hash
2325 )
2326 self.openssl_assert(mac_alg != self._ffi.NULL)
2327 else:
2328 mac_alg = self._ffi.NULL
2330 if encryption_algorithm._kdf_rounds is not None:
2331 pkcs12_iter = encryption_algorithm._kdf_rounds
2333 else:
2334 raise ValueError("Unsupported key encryption type")
2336 if cas is None or len(cas) == 0:
2337 sk_x509 = self._ffi.NULL
2338 else:
2339 sk_x509 = self._lib.sk_X509_new_null()
2340 sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)
2342 # This list is to keep the x509 values alive until end of function
2343 ossl_cas = []
2344 for ca in cas:
2345 if isinstance(ca, PKCS12Certificate):
2346 ca_alias = ca.friendly_name
2347 ossl_ca = self._cert2ossl(ca.certificate)
2348 with self._zeroed_null_terminated_buf(
2349 ca_alias
2350 ) as ca_name_buf:
2351 res = self._lib.X509_alias_set1(
2352 ossl_ca, ca_name_buf, -1
2353 )
2354 self.openssl_assert(res == 1)
2355 else:
2356 ossl_ca = self._cert2ossl(ca)
2357 ossl_cas.append(ossl_ca)
2358 res = self._lib.sk_X509_push(sk_x509, ossl_ca)
2359 backend.openssl_assert(res >= 1)
2361 with self._zeroed_null_terminated_buf(password) as password_buf:
2362 with self._zeroed_null_terminated_buf(name) as name_buf:
2363 ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
2364 if key is not None:
2365 evp_pkey = key._evp_pkey # type: ignore[union-attr]
2366 else:
2367 evp_pkey = self._ffi.NULL
2369 p12 = self._lib.PKCS12_create(
2370 password_buf,
2371 name_buf,
2372 evp_pkey,
2373 ossl_cert,
2374 sk_x509,
2375 nid_key,
2376 nid_cert,
2377 pkcs12_iter,
2378 mac_iter,
2379 0,
2380 )
2382 if (
2383 self._lib.Cryptography_HAS_PKCS12_SET_MAC
2384 and mac_alg != self._ffi.NULL
2385 ):
2386 self._lib.PKCS12_set_mac(
2387 p12,
2388 password_buf,
2389 -1,
2390 self._ffi.NULL,
2391 0,
2392 mac_iter,
2393 mac_alg,
2394 )
2396 self.openssl_assert(p12 != self._ffi.NULL)
2397 p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
2399 bio = self._create_mem_bio_gc()
2400 res = self._lib.i2d_PKCS12_bio(bio, p12)
2401 self.openssl_assert(res > 0)
2402 return self._read_mem_bio(bio)
2404 def poly1305_supported(self) -> bool:
2405 if self._fips_enabled:
2406 return False
2407 return self._lib.Cryptography_HAS_POLY1305 == 1
2409 def create_poly1305_ctx(self, key: bytes) -> _Poly1305Context:
2410 utils._check_byteslike("key", key)
2411 if len(key) != _POLY1305_KEY_SIZE:
2412 raise ValueError("A poly1305 key is 32 bytes long")
2414 return _Poly1305Context(self, key)
2416 def pkcs7_supported(self) -> bool:
2417 return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
2419 def load_pem_pkcs7_certificates(
2420 self, data: bytes
2421 ) -> typing.List[x509.Certificate]:
2422 utils._check_bytes("data", data)
2423 bio = self._bytes_to_bio(data)
2424 p7 = self._lib.PEM_read_bio_PKCS7(
2425 bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
2426 )
2427 if p7 == self._ffi.NULL:
2428 self._consume_errors()
2429 raise ValueError("Unable to parse PKCS7 data")
2431 p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
2432 return self._load_pkcs7_certificates(p7)
2434 def load_der_pkcs7_certificates(
2435 self, data: bytes
2436 ) -> typing.List[x509.Certificate]:
2437 utils._check_bytes("data", data)
2438 bio = self._bytes_to_bio(data)
2439 p7 = self._lib.d2i_PKCS7_bio(bio.bio, self._ffi.NULL)
2440 if p7 == self._ffi.NULL:
2441 self._consume_errors()
2442 raise ValueError("Unable to parse PKCS7 data")
2444 p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
2445 return self._load_pkcs7_certificates(p7)
2447 def _load_pkcs7_certificates(self, p7):
2448 nid = self._lib.OBJ_obj2nid(p7.type)
2449 self.openssl_assert(nid != self._lib.NID_undef)
2450 if nid != self._lib.NID_pkcs7_signed:
2451 raise UnsupportedAlgorithm(
2452 "Only basic signed structures are currently supported. NID"
2453 " for this data was {}".format(nid),
2454 _Reasons.UNSUPPORTED_SERIALIZATION,
2455 )
2457 sk_x509 = p7.d.sign.cert
2458 num = self._lib.sk_X509_num(sk_x509)
2459 certs = []
2460 for i in range(num):
2461 x509 = self._lib.sk_X509_value(sk_x509, i)
2462 self.openssl_assert(x509 != self._ffi.NULL)
2463 res = self._lib.X509_up_ref(x509)
2464 # When OpenSSL is less than 1.1.0 up_ref returns the current
2465 # refcount. On 1.1.0+ it returns 1 for success.
2466 self.openssl_assert(res >= 1)
2467 x509 = self._ffi.gc(x509, self._lib.X509_free)
2468 cert = self._ossl2cert(x509)
2469 certs.append(cert)
2471 return certs
2473 def pkcs7_serialize_certificates(
2474 self,
2475 certs: typing.List[x509.Certificate],
2476 encoding: serialization.Encoding,
2477 ):
2478 certs = list(certs)
2479 if not certs or not all(
2480 isinstance(cert, x509.Certificate) for cert in certs
2481 ):
2482 raise TypeError("certs must be a list of certs with length >= 1")
2484 if encoding not in (
2485 serialization.Encoding.PEM,
2486 serialization.Encoding.DER,
2487 ):
2488 raise TypeError("encoding must DER or PEM from the Encoding enum")
2490 certs_sk = self._lib.sk_X509_new_null()
2491 certs_sk = self._ffi.gc(certs_sk, self._lib.sk_X509_free)
2492 # This list is to keep the x509 values alive until end of function
2493 ossl_certs = []
2494 for cert in certs:
2495 ossl_cert = self._cert2ossl(cert)
2496 ossl_certs.append(ossl_cert)
2497 res = self._lib.sk_X509_push(certs_sk, ossl_cert)
2498 self.openssl_assert(res >= 1)
2499 # We use PKCS7_sign here because it creates the PKCS7 and PKCS7_SIGNED
2500 # structures for us rather than requiring manual assignment.
2501 p7 = self._lib.PKCS7_sign(
2502 self._ffi.NULL,
2503 self._ffi.NULL,
2504 certs_sk,
2505 self._ffi.NULL,
2506 self._lib.PKCS7_PARTIAL,
2507 )
2508 bio_out = self._create_mem_bio_gc()
2509 if encoding is serialization.Encoding.PEM:
2510 res = self._lib.PEM_write_bio_PKCS7_stream(
2511 bio_out, p7, self._ffi.NULL, 0
2512 )
2513 else:
2514 assert encoding is serialization.Encoding.DER
2515 res = self._lib.i2d_PKCS7_bio(bio_out, p7)
2517 self.openssl_assert(res == 1)
2518 return self._read_mem_bio(bio_out)
2520 def pkcs7_sign(
2521 self,
2522 builder: pkcs7.PKCS7SignatureBuilder,
2523 encoding: serialization.Encoding,
2524 options: typing.List[pkcs7.PKCS7Options],
2525 ) -> bytes:
2526 assert builder._data is not None
2527 bio = self._bytes_to_bio(builder._data)
2528 init_flags = self._lib.PKCS7_PARTIAL
2529 final_flags = 0
2531 if len(builder._additional_certs) == 0:
2532 certs = self._ffi.NULL
2533 else:
2534 certs = self._lib.sk_X509_new_null()
2535 certs = self._ffi.gc(certs, self._lib.sk_X509_free)
2536 # This list is to keep the x509 values alive until end of function
2537 ossl_certs = []
2538 for cert in builder._additional_certs:
2539 ossl_cert = self._cert2ossl(cert)
2540 ossl_certs.append(ossl_cert)
2541 res = self._lib.sk_X509_push(certs, ossl_cert)
2542 self.openssl_assert(res >= 1)
2544 if pkcs7.PKCS7Options.DetachedSignature in options:
2545 # Don't embed the data in the PKCS7 structure
2546 init_flags |= self._lib.PKCS7_DETACHED
2547 final_flags |= self._lib.PKCS7_DETACHED
2549 # This just inits a structure for us. However, there
2550 # are flags we need to set, joy.
2551 p7 = self._lib.PKCS7_sign(
2552 self._ffi.NULL,
2553 self._ffi.NULL,
2554 certs,
2555 self._ffi.NULL,
2556 init_flags,
2557 )
2558 self.openssl_assert(p7 != self._ffi.NULL)
2559 p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
2560 signer_flags = 0
2561 # These flags are configurable on a per-signature basis
2562 # but we've deliberately chosen to make the API only allow
2563 # setting it across all signatures for now.
2564 if pkcs7.PKCS7Options.NoCapabilities in options:
2565 signer_flags |= self._lib.PKCS7_NOSMIMECAP
2566 elif pkcs7.PKCS7Options.NoAttributes in options:
2567 signer_flags |= self._lib.PKCS7_NOATTR
2569 if pkcs7.PKCS7Options.NoCerts in options:
2570 signer_flags |= self._lib.PKCS7_NOCERTS
2572 for certificate, private_key, hash_algorithm in builder._signers:
2573 ossl_cert = self._cert2ossl(certificate)
2574 md = self._evp_md_non_null_from_algorithm(hash_algorithm)
2575 p7signerinfo = self._lib.PKCS7_sign_add_signer(
2576 p7,
2577 ossl_cert,
2578 private_key._evp_pkey, # type: ignore[union-attr]
2579 md,
2580 signer_flags,
2581 )
2582 self.openssl_assert(p7signerinfo != self._ffi.NULL)
2584 for option in options:
2585 # DetachedSignature, NoCapabilities, and NoAttributes are already
2586 # handled so we just need to check these last two options.
2587 if option is pkcs7.PKCS7Options.Text:
2588 final_flags |= self._lib.PKCS7_TEXT
2589 elif option is pkcs7.PKCS7Options.Binary:
2590 final_flags |= self._lib.PKCS7_BINARY
2592 bio_out = self._create_mem_bio_gc()
2593 if encoding is serialization.Encoding.SMIME:
2594 # This finalizes the structure
2595 res = self._lib.SMIME_write_PKCS7(
2596 bio_out, p7, bio.bio, final_flags
2597 )
2598 elif encoding is serialization.Encoding.PEM:
2599 res = self._lib.PKCS7_final(p7, bio.bio, final_flags)
2600 self.openssl_assert(res == 1)
2601 res = self._lib.PEM_write_bio_PKCS7_stream(
2602 bio_out, p7, bio.bio, final_flags
2603 )
2604 else:
2605 assert encoding is serialization.Encoding.DER
2606 # We need to call finalize here becauase i2d_PKCS7_bio does not
2607 # finalize.
2608 res = self._lib.PKCS7_final(p7, bio.bio, final_flags)
2609 self.openssl_assert(res == 1)
2610 # OpenSSL 3.0 leaves a random bio error on the stack:
2611 # https://github.com/openssl/openssl/issues/16681
2612 if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
2613 self._consume_errors()
2614 res = self._lib.i2d_PKCS7_bio(bio_out, p7)
2615 self.openssl_assert(res == 1)
2616 return self._read_mem_bio(bio_out)
2619class GetCipherByName:
2620 def __init__(self, fmt: str):
2621 self._fmt = fmt
2623 def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
2624 cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
2625 evp_cipher = backend._lib.EVP_get_cipherbyname(
2626 cipher_name.encode("ascii")
2627 )
2629 # try EVP_CIPHER_fetch if present
2630 if (
2631 evp_cipher == backend._ffi.NULL
2632 and backend._lib.Cryptography_HAS_300_EVP_CIPHER
2633 ):
2634 evp_cipher = backend._lib.EVP_CIPHER_fetch(
2635 backend._ffi.NULL,
2636 cipher_name.encode("ascii"),
2637 backend._ffi.NULL,
2638 )
2640 backend._consume_errors()
2641 return evp_cipher
2644def _get_xts_cipher(backend: Backend, cipher: AES, mode):
2645 cipher_name = "aes-{}-xts".format(cipher.key_size // 2)
2646 return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
2649backend = Backend()