1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import annotations
6
7import collections
8import contextlib
9import itertools
10import typing
11
12from cryptography import utils, x509
13from cryptography.exceptions import UnsupportedAlgorithm
14from cryptography.hazmat.backends.openssl import aead
15from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
16from cryptography.hazmat.bindings._rust import openssl as rust_openssl
17from cryptography.hazmat.bindings.openssl import binding
18from cryptography.hazmat.primitives import hashes, serialization
19from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding
20from cryptography.hazmat.primitives.asymmetric import ec
21from cryptography.hazmat.primitives.asymmetric import utils as asym_utils
22from cryptography.hazmat.primitives.asymmetric.padding import (
23 MGF1,
24 OAEP,
25 PSS,
26 PKCS1v15,
27)
28from cryptography.hazmat.primitives.asymmetric.types import (
29 PrivateKeyTypes,
30)
31from cryptography.hazmat.primitives.ciphers import (
32 CipherAlgorithm,
33)
34from cryptography.hazmat.primitives.ciphers.algorithms import (
35 AES,
36 AES128,
37 AES256,
38 ARC4,
39 SM4,
40 Camellia,
41 ChaCha20,
42 TripleDES,
43 _BlowfishInternal,
44 _CAST5Internal,
45 _IDEAInternal,
46 _SEEDInternal,
47)
48from cryptography.hazmat.primitives.ciphers.modes import (
49 CBC,
50 CFB,
51 CFB8,
52 CTR,
53 ECB,
54 GCM,
55 OFB,
56 XTS,
57 Mode,
58)
59from cryptography.hazmat.primitives.serialization.pkcs12 import (
60 PBES,
61 PKCS12Certificate,
62 PKCS12KeyAndCertificates,
63 PKCS12PrivateKeyTypes,
64 _PKCS12CATypes,
65)
66
# Pairs a memory BIO with the buffer pointer that backs it, so that both can
# be kept referenced together for as long as the BIO is in use.
_MemoryBIO = collections.namedtuple("_MemoryBIO", ("bio", "char_ptr"))
68
69
class _RC2:
    """
    Marker type for RC2.

    RC2 is not actually supported; this class only exists so that some
    serialization tests have something to register a cipher adapter for.
    """
73
74
class Backend:
    """
    OpenSSL API binding interfaces.

    Wraps the cffi ``ffi``/``lib`` pair exposed by the OpenSSL binding and
    answers "is X supported?" questions (taking FIPS mode into account),
    resolves cipher adapters, and implements PKCS12 parsing/serialization.
    """

    name = "openssl"

    # FIPS has opinions about acceptable algorithms and key sizes, but the
    # disallowed algorithms are still present in OpenSSL. They just error if
    # you try to use them. To avoid that we allowlist the algorithms in
    # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
    _fips_aead: typing.ClassVar[set[bytes]] = {
        b"aes-128-ccm",
        b"aes-192-ccm",
        b"aes-256-ccm",
        b"aes-128-gcm",
        b"aes-192-gcm",
        b"aes-256-gcm",
    }
    # TripleDES encryption is disallowed/deprecated throughout 2023 in
    # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA).
    _fips_ciphers = (AES,)
    # Sometimes SHA1 is still permissible. That logic is contained
    # within the various *_supported methods.
    _fips_hashes = (
        hashes.SHA224,
        hashes.SHA256,
        hashes.SHA384,
        hashes.SHA512,
        hashes.SHA512_224,
        hashes.SHA512_256,
        hashes.SHA3_224,
        hashes.SHA3_256,
        hashes.SHA3_384,
        hashes.SHA3_512,
        hashes.SHAKE128,
        hashes.SHAKE256,
    )
    _fips_ecdh_curves = (
        ec.SECP224R1,
        ec.SECP256R1,
        ec.SECP384R1,
        ec.SECP521R1,
    )
    _fips_rsa_min_key_size = 2048
    _fips_rsa_min_public_exponent = 65537
    _fips_dsa_min_modulus = 1 << 2048
    _fips_dh_min_key_size = 2048
    _fips_dh_min_modulus = 1 << _fips_dh_min_key_size

    def __init__(self) -> None:
        """
        Load the binding, record whether FIPS is enabled and register the
        default cipher adapters.
        """
        self._binding = binding.Binding()
        self._ffi = self._binding.ffi
        self._lib = self._binding.lib
        self._fips_enabled = rust_openssl.is_fips_enabled()

        # Maps (cipher class, mode class) to an adapter that is called as
        # ``adapter(backend, cipher, mode)`` (see ``cipher_supported``).
        self._cipher_registry: dict[
            tuple[type[CipherAlgorithm], type[Mode]],
            typing.Callable,
        ] = {}
        self._register_default_ciphers()

    def __repr__(self) -> str:
        return (
            f"<OpenSSLBackend(version: {self.openssl_version_text()}, "
            f"FIPS: {self._fips_enabled}, "
            f"Legacy: {rust_openssl._legacy_provider_loaded})>"
        )

    def openssl_assert(
        self,
        ok: bool,
        errors: list[rust_openssl.OpenSSLError] | None = None,
    ) -> None:
        """Delegate to ``binding._openssl_assert(ok, errors=errors)``."""
        return binding._openssl_assert(ok, errors=errors)

    def _enable_fips(self) -> None:
        # This function enables FIPS mode for OpenSSL 3.0.0 on installs that
        # have the FIPS provider installed properly.
        self._binding._enable_fips()
        assert rust_openssl.is_fips_enabled()
        self._fips_enabled = rust_openssl.is_fips_enabled()

    def openssl_version_text(self) -> str:
        """
        Friendly string name of the loaded OpenSSL library. This is not
        necessarily the same version as it was compiled against.

        Example: OpenSSL 1.1.1d  10 Sep 2019
        """
        return self._ffi.string(
            self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
        ).decode("ascii")

    def openssl_version_number(self) -> int:
        """Return the value of ``OpenSSL_version_num()``."""
        return self._lib.OpenSSL_version_num()

    def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm):
        """
        Look up the digest for ``algorithm`` by name.

        For blake2b/blake2s the lookup name is the algorithm name followed
        by the digest size in bits. Returns whatever
        ``EVP_get_digestbyname`` returns, which may be ``ffi.NULL``.
        """
        if algorithm.name in ("blake2b", "blake2s"):
            alg = f"{algorithm.name}{algorithm.digest_size * 8}".encode(
                "ascii"
            )
        else:
            alg = algorithm.name.encode("ascii")

        return self._lib.EVP_get_digestbyname(alg)

    def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm):
        """
        Like ``_evp_md_from_algorithm`` but asserts (via ``openssl_assert``)
        that the lookup did not return ``ffi.NULL``.
        """
        evp_md = self._evp_md_from_algorithm(algorithm)
        self.openssl_assert(evp_md != self._ffi.NULL)
        return evp_md

    def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
        """
        True when the digest can be looked up, and, in FIPS mode, only if
        ``algorithm`` is an instance of one of ``_fips_hashes``.
        """
        if self._fips_enabled and not isinstance(algorithm, self._fips_hashes):
            return False

        evp_md = self._evp_md_from_algorithm(algorithm)
        return evp_md != self._ffi.NULL

    def signature_hash_supported(
        self, algorithm: hashes.HashAlgorithm
    ) -> bool:
        # Dedicated check for hashing algorithm use in message digest for
        # signatures, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption).
        if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
            return False
        return self.hash_supported(algorithm)

    def scrypt_supported(self) -> bool:
        """False in FIPS mode; otherwise reflects ``Cryptography_HAS_SCRYPT``."""
        if self._fips_enabled:
            return False
        else:
            return self._lib.Cryptography_HAS_SCRYPT == 1

    def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
        # FIPS mode still allows SHA1 for HMAC
        if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
            return True

        return self.hash_supported(algorithm)

    def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
        """
        True when an adapter is registered for ``(type(cipher), type(mode))``
        and that adapter returns a non-NULL result. In FIPS mode only
        instances of ``_fips_ciphers`` are accepted.
        """
        if self._fips_enabled:
            # FIPS mode requires AES. TripleDES is disallowed/deprecated in
            # FIPS 140-3.
            if not isinstance(cipher, self._fips_ciphers):
                return False

        try:
            adapter = self._cipher_registry[type(cipher), type(mode)]
        except KeyError:
            return False
        evp_cipher = adapter(self, cipher, mode)
        return self._ffi.NULL != evp_cipher

    def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
        """
        Register ``adapter`` for the ``(cipher_cls, mode_cls)`` pair.

        Raises ValueError if the pair is already registered.
        """
        if (cipher_cls, mode_cls) in self._cipher_registry:
            raise ValueError(
                f"Duplicate registration for: {cipher_cls} {mode_cls}."
            )
        self._cipher_registry[cipher_cls, mode_cls] = adapter

    def _register_default_ciphers(self) -> None:
        """Populate the cipher registry with the built-in adapters."""
        for cipher_cls in [AES, AES128, AES256]:
            for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
                self.register_cipher_adapter(
                    cipher_cls,
                    mode_cls,
                    GetCipherByName(
                        "{cipher.name}-{cipher.key_size}-{mode.name}"
                    ),
                )
        for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
            self.register_cipher_adapter(
                Camellia,
                mode_cls,
                GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
            )
        for mode_cls in [CBC, CFB, CFB8, OFB]:
            self.register_cipher_adapter(
                TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
            )
        self.register_cipher_adapter(
            TripleDES, ECB, GetCipherByName("des-ede3")
        )
        # ChaCha20 uses the Long Name "chacha20" in OpenSSL, but in LibreSSL
        # it uses "chacha"
        self.register_cipher_adapter(
            ChaCha20,
            type(None),
            GetCipherByName(
                "chacha" if self._lib.CRYPTOGRAPHY_IS_LIBRESSL else "chacha20"
            ),
        )
        self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
        for mode_cls in [ECB, CBC, OFB, CFB, CTR, GCM]:
            self.register_cipher_adapter(
                SM4, mode_cls, GetCipherByName("sm4-{mode.name}")
            )
        # Don't register legacy ciphers if they're unavailable. Hypothetically
        # this wouldn't be necessary because we test availability by seeing if
        # we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3
        # will return a valid pointer even though the cipher is unavailable.
        if (
            rust_openssl._legacy_provider_loaded
            or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
        ):
            for mode_cls in [CBC, CFB, OFB, ECB]:
                self.register_cipher_adapter(
                    _BlowfishInternal,
                    mode_cls,
                    GetCipherByName("bf-{mode.name}"),
                )
            for mode_cls in [CBC, CFB, OFB, ECB]:
                self.register_cipher_adapter(
                    _SEEDInternal,
                    mode_cls,
                    GetCipherByName("seed-{mode.name}"),
                )
            for cipher_cls, mode_cls in itertools.product(
                [_CAST5Internal, _IDEAInternal],
                [CBC, OFB, CFB, ECB],
            ):
                self.register_cipher_adapter(
                    cipher_cls,
                    mode_cls,
                    GetCipherByName("{cipher.name}-{mode.name}"),
                )
            self.register_cipher_adapter(
                ARC4, type(None), GetCipherByName("rc4")
            )
            # We don't actually support RC2, this is just used by some tests.
            self.register_cipher_adapter(
                _RC2, type(None), GetCipherByName("rc2")
            )

    def create_symmetric_encryption_ctx(
        self, cipher: CipherAlgorithm, mode: Mode
    ) -> _CipherContext:
        """Build a ``_CipherContext`` in encrypt mode."""
        return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)

    def create_symmetric_decryption_ctx(
        self, cipher: CipherAlgorithm, mode: Mode
    ) -> _CipherContext:
        """Build a ``_CipherContext`` in decrypt mode."""
        return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)

    def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
        """Same answer as ``hmac_supported(algorithm)``."""
        return self.hmac_supported(algorithm)

    def _consume_errors(self) -> list[rust_openssl.OpenSSLError]:
        """Return the result of ``rust_openssl.capture_error_stack()``."""
        return rust_openssl.capture_error_stack()

    def generate_rsa_parameters_supported(
        self, public_exponent: int, key_size: int
    ) -> bool:
        """
        True when ``public_exponent`` is odd and at least 3, and ``key_size``
        is at least 512.
        """
        return (
            public_exponent >= 3
            and public_exponent & 1 != 0
            and key_size >= 512
        )

    def _bytes_to_bio(self, data: bytes) -> _MemoryBIO:
        """
        Return a _MemoryBIO namedtuple of (BIO, char*).

        The char* is the storage for the BIO and it must stay alive until the
        BIO is finished with.
        """
        data_ptr = self._ffi.from_buffer(data)
        bio = self._lib.BIO_new_mem_buf(data_ptr, len(data))
        self.openssl_assert(bio != self._ffi.NULL)

        return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr)

    def _create_mem_bio_gc(self):
        """
        Creates an empty memory BIO.

        The returned pointer is wrapped with ``ffi.gc`` using ``BIO_free``.
        """
        bio_method = self._lib.BIO_s_mem()
        self.openssl_assert(bio_method != self._ffi.NULL)
        bio = self._lib.BIO_new(bio_method)
        self.openssl_assert(bio != self._ffi.NULL)
        return self._ffi.gc(bio, self._lib.BIO_free)

    def _read_mem_bio(self, bio) -> bytes:
        """
        Reads a memory BIO. This only works on memory BIOs.

        Asserts that the BIO holds a positive number of bytes and returns a
        copy of them.
        """
        buf = self._ffi.new("char **")
        buf_len = self._lib.BIO_get_mem_data(bio, buf)
        self.openssl_assert(buf_len > 0)
        self.openssl_assert(buf[0] != self._ffi.NULL)
        # Slicing the cffi buffer copies the data out into a bytes object.
        return self._ffi.buffer(buf[0], buf_len)[:]

    def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
        """
        True for SHA1/SHA224/SHA256/SHA384/SHA512, except that SHA1 is
        rejected in FIPS mode.
        """
        if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
            return False

        return isinstance(
            algorithm,
            (
                hashes.SHA1,
                hashes.SHA224,
                hashes.SHA256,
                hashes.SHA384,
                hashes.SHA512,
            ),
        )

    def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool:
        """
        True for PKCS1v15, for PSS with an MGF1 whose hash is supported, and
        for OAEP with an MGF1 when both the MGF1 hash and the OAEP hash pass
        ``_oaep_hash_supported``. False for anything else.
        """
        if isinstance(padding, PKCS1v15):
            return True
        elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
            # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked
            # as signature algorithm.
            if self._fips_enabled and isinstance(
                padding._mgf._algorithm, hashes.SHA1
            ):
                return True
            else:
                return self.hash_supported(padding._mgf._algorithm)
        elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
            return self._oaep_hash_supported(
                padding._mgf._algorithm
            ) and self._oaep_hash_supported(padding._algorithm)
        else:
            return False

    def rsa_encryption_supported(self, padding: AsymmetricPadding) -> bool:
        """
        Same as ``rsa_padding_supported`` except that PKCS1v15 is rejected
        in FIPS mode.
        """
        if self._fips_enabled and isinstance(padding, PKCS1v15):
            return False
        else:
            return self.rsa_padding_supported(padding)

    def dsa_supported(self) -> bool:
        """False on BoringSSL or in FIPS mode, True otherwise."""
        return (
            not self._lib.CRYPTOGRAPHY_IS_BORINGSSL and not self._fips_enabled
        )

    def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
        """
        True when DSA is supported and ``algorithm`` passes
        ``signature_hash_supported``.
        """
        if not self.dsa_supported():
            return False
        return self.signature_hash_supported(algorithm)

    def cmac_algorithm_supported(self, algorithm) -> bool:
        """
        Check ``cipher_supported`` for ``algorithm`` in CBC mode, using a
        zero-filled IV of ``algorithm.block_size`` bytes.
        """
        return self.cipher_supported(
            algorithm, CBC(b"\x00" * algorithm.block_size)
        )

    def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
        """
        Convert a certificate to an ``X509 *`` by round-tripping through
        DER. The result is wrapped with ``ffi.gc`` using ``X509_free``.
        """
        data = cert.public_bytes(serialization.Encoding.DER)
        mem_bio = self._bytes_to_bio(data)
        ossl_cert = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
        self.openssl_assert(ossl_cert != self._ffi.NULL)
        return self._ffi.gc(ossl_cert, self._lib.X509_free)

    def _ossl2cert(self, x509_ptr: typing.Any) -> x509.Certificate:
        """
        Convert an ``X509 *`` to a certificate object by round-tripping
        through DER.
        """
        bio = self._create_mem_bio_gc()
        res = self._lib.i2d_X509_bio(bio, x509_ptr)
        self.openssl_assert(res == 1)
        return x509.load_der_x509_certificate(self._read_mem_bio(bio))

    def _ossl2pkcs12cert(self, ossl_cert: typing.Any) -> PKCS12Certificate:
        """
        Wrap an ``X509 *`` as a PKCS12Certificate. The friendly name is the
        alias returned by ``X509_alias_get0``, or None when that is NULL.
        """
        cert = self._ossl2cert(ossl_cert)
        name = None
        maybe_name = self._lib.X509_alias_get0(ossl_cert, self._ffi.NULL)
        if maybe_name != self._ffi.NULL:
            name = self._ffi.string(maybe_name)
        return PKCS12Certificate(cert, name)

    def _key2ossl(self, key: PKCS12PrivateKeyTypes) -> typing.Any:
        """
        Convert a private key to an ``EVP_PKEY *`` by round-tripping through
        unencrypted PKCS8 DER. The result is wrapped with ``ffi.gc`` using
        ``EVP_PKEY_free``.
        """
        data = key.private_bytes(
            serialization.Encoding.DER,
            serialization.PrivateFormat.PKCS8,
            serialization.NoEncryption(),
        )
        mem_bio = self._bytes_to_bio(data)

        evp_pkey = self._lib.d2i_PrivateKey_bio(
            mem_bio.bio,
            self._ffi.NULL,
        )
        self.openssl_assert(evp_pkey != self._ffi.NULL)
        return self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)

    def _handle_key_loading_error(
        self, errors: list[rust_openssl.OpenSSLError]
    ) -> typing.NoReturn:
        """
        Translate a captured OpenSSL error stack into a ValueError.

        Always raises. The message depends on whether the stack is empty,
        whether the first error is a bad-decrypt style error, or whether any
        error reports an unsupported private key algorithm.
        """
        if not errors:
            raise ValueError(
                "Could not deserialize key data. The data may be in an "
                "incorrect format or it may be encrypted with an unsupported "
                "algorithm."
            )

        elif (
            errors[0]._lib_reason_match(
                self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
            )
            or errors[0]._lib_reason_match(
                self._lib.ERR_LIB_PKCS12,
                self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
            )
            or (
                self._lib.Cryptography_HAS_PROVIDERS
                and errors[0]._lib_reason_match(
                    self._lib.ERR_LIB_PROV,
                    self._lib.PROV_R_BAD_DECRYPT,
                )
            )
        ):
            raise ValueError("Bad decrypt. Incorrect password?")

        elif any(
            error._lib_reason_match(
                self._lib.ERR_LIB_EVP,
                self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
            )
            for error in errors
        ):
            raise ValueError("Unsupported public key algorithm.")

        else:
            raise ValueError(
                "Could not deserialize key data. The data may be in an "
                "incorrect format, it may be encrypted with an unsupported "
                "algorithm, or it may be an unsupported key type (e.g. EC "
                "curves with explicit parameters).",
                errors,
            )

    def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
        """
        True when ``rust_openssl.ec.curve_supported`` accepts the curve; in
        FIPS mode the curve must also be one of ``_fips_ecdh_curves``.
        """
        if self._fips_enabled and not isinstance(
            curve, self._fips_ecdh_curves
        ):
            return False

        return rust_openssl.ec.curve_supported(curve)

    def elliptic_curve_signature_algorithm_supported(
        self,
        signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
        curve: ec.EllipticCurve,
    ) -> bool:
        """
        True for ECDSA on a supported curve when the hash is either
        ``Prehashed`` or passes ``hash_supported``.
        """
        # We only support ECDSA right now.
        if not isinstance(signature_algorithm, ec.ECDSA):
            return False

        return self.elliptic_curve_supported(curve) and (
            isinstance(signature_algorithm.algorithm, asym_utils.Prehashed)
            or self.hash_supported(signature_algorithm.algorithm)
        )

    def elliptic_curve_exchange_algorithm_supported(
        self, algorithm: ec.ECDH, curve: ec.EllipticCurve
    ) -> bool:
        """True when the curve is supported and ``algorithm`` is ECDH."""
        return self.elliptic_curve_supported(curve) and isinstance(
            algorithm, ec.ECDH
        )

    def dh_supported(self) -> bool:
        """False on BoringSSL, True otherwise."""
        return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL

    def dh_x942_serialization_supported(self) -> bool:
        """Reflects ``Cryptography_HAS_EVP_PKEY_DHX``."""
        return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1

    def x25519_supported(self) -> bool:
        """True unless FIPS mode is enabled."""
        return not self._fips_enabled

    def x448_supported(self) -> bool:
        """False in FIPS mode, on LibreSSL and on BoringSSL."""
        if self._fips_enabled:
            return False
        return (
            not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
            and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
        )

    def ed25519_supported(self) -> bool:
        """True unless FIPS mode is enabled."""
        return not self._fips_enabled

    def ed448_supported(self) -> bool:
        """False in FIPS mode, on LibreSSL and on BoringSSL."""
        if self._fips_enabled:
            return False
        return (
            not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
            and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
        )

    def aead_cipher_supported(self, cipher) -> bool:
        """Delegate to ``aead._aead_cipher_supported(self, cipher)``."""
        return aead._aead_cipher_supported(self, cipher)

    def _zero_data(self, data, length: int) -> None:
        # We clear things this way because at the moment we're not
        # sure of a better way that can guarantee it overwrites the
        # memory of a bytearray and doesn't just replace the underlying char *.
        for i in range(length):
            data[i] = 0

    @contextlib.contextmanager
    def _zeroed_null_terminated_buf(self, data):
        """
        This method takes bytes, which can be a bytestring or a mutable
        buffer like a bytearray, and yields a null-terminated version of that
        data. This is required because PKCS12_parse doesn't take a length with
        its password char * and ffi.from_buffer doesn't provide null
        termination. So, to support zeroing the data via bytearray we
        need to build this ridiculous construct that copies the memory, but
        zeroes it after use.

        Yields ``ffi.NULL`` when ``data`` is None.
        """
        if data is None:
            yield self._ffi.NULL
        else:
            data_len = len(data)
            # One extra byte for the terminator; ffi.new zero-initializes.
            buf = self._ffi.new("char[]", data_len + 1)
            self._ffi.memmove(buf, data, data_len)
            try:
                yield buf
            finally:
                # Cast to a uint8_t * so we can assign by integer
                self._zero_data(self._ffi.cast("uint8_t *", buf), data_len)

    def load_key_and_certificates_from_pkcs12(
        self, data: bytes, password: bytes | None
    ) -> tuple[
        PrivateKeyTypes | None,
        x509.Certificate | None,
        list[x509.Certificate],
    ]:
        """
        Parse PKCS12 ``data`` via ``load_pkcs12`` and return a
        ``(key, certificate, additional_certificates)`` tuple of plain
        objects, without friendly names.
        """
        pkcs12 = self.load_pkcs12(data, password)
        return (
            pkcs12.key,
            pkcs12.cert.certificate if pkcs12.cert else None,
            [cert.certificate for cert in pkcs12.additional_certs],
        )

    def load_pkcs12(
        self, data: bytes, password: bytes | None
    ) -> PKCS12KeyAndCertificates:
        """
        Parse PKCS12 ``data``, optionally protected by ``password``.

        Raises ValueError when the data cannot be deserialized or when
        ``PKCS12_parse`` fails (invalid password or data).
        """
        if password is not None:
            utils._check_byteslike("password", password)

        bio = self._bytes_to_bio(data)
        p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
        if p12 == self._ffi.NULL:
            self._consume_errors()
            raise ValueError("Could not deserialize PKCS12 data")

        p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
        evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
        x509_ptr = self._ffi.new("X509 **")
        sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
        with self._zeroed_null_terminated_buf(password) as password_buf:
            res = self._lib.PKCS12_parse(
                p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
            )
        if res == 0:
            self._consume_errors()
            raise ValueError("Invalid password or PKCS12 data")

        cert = None
        key = None
        additional_certificates = []

        if evp_pkey_ptr[0] != self._ffi.NULL:
            evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)
            # We don't support turning off RSA key validation when loading
            # PKCS12 keys
            key = rust_openssl.keys.private_key_from_ptr(
                int(self._ffi.cast("uintptr_t", evp_pkey)),
                unsafe_skip_rsa_key_validation=False,
            )

        if x509_ptr[0] != self._ffi.NULL:
            ossl_cert = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
            cert = self._ossl2pkcs12cert(ossl_cert)

        if sk_x509_ptr[0] != self._ffi.NULL:
            sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
            num = self._lib.sk_X509_num(sk_x509_ptr[0])

            # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
            # certificates.
            indices: typing.Iterable[int]
            if (
                self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
                or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
            ):
                indices = range(num)
            else:
                indices = reversed(range(num))

            for i in indices:
                ossl_addl = self._lib.sk_X509_value(sk_x509, i)
                self.openssl_assert(ossl_addl != self._ffi.NULL)
                ossl_addl = self._ffi.gc(ossl_addl, self._lib.X509_free)
                additional_certificates.append(
                    self._ossl2pkcs12cert(ossl_addl)
                )

        return PKCS12KeyAndCertificates(key, cert, additional_certificates)

    def serialize_key_and_certificates_to_pkcs12(
        self,
        name: bytes | None,
        key: PKCS12PrivateKeyTypes | None,
        cert: x509.Certificate | None,
        cas: list[_PKCS12CATypes] | None,
        encryption_algorithm: serialization.KeySerializationEncryption,
    ) -> bytes:
        """
        Serialize a key, a certificate and additional CA certificates into
        PKCS12 and return the DER bytes.

        ``name`` is passed to ``PKCS12_create`` as the friendly name.
        ``encryption_algorithm`` selects the key/cert encryption NIDs, the
        iteration counts, the password and (optionally) the MAC digest.

        Raises ValueError for an unsupported ``encryption_algorithm`` type or
        when ``PKCS12_create`` fails, and UnsupportedAlgorithm when PBESv2 or
        a custom MAC digest is requested but not available.
        """
        password = None
        if name is not None:
            utils._check_bytes("name", name)

        if isinstance(encryption_algorithm, serialization.NoEncryption):
            nid_cert = -1
            nid_key = -1
            pkcs12_iter = 0
            mac_iter = 0
            mac_alg = self._ffi.NULL
        elif isinstance(
            encryption_algorithm, serialization.BestAvailableEncryption
        ):
            # PKCS12 encryption is hopeless trash and can never be fixed.
            # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so
            # we use PBESv1 with 3DES on the older paths.
            if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
                nid_cert = self._lib.NID_aes_256_cbc
                nid_key = self._lib.NID_aes_256_cbc
            else:
                nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
                nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            # At least we can set this higher than OpenSSL's default
            pkcs12_iter = 20000
            # mac_iter chosen for compatibility reasons, see:
            # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
            # Did we mention how lousy PKCS12 encryption is?
            mac_iter = 1
            # MAC algorithm can only be set on OpenSSL 3.0.0+
            mac_alg = self._ffi.NULL
            password = encryption_algorithm.password
        elif (
            isinstance(
                encryption_algorithm, serialization._KeySerializationEncryption
            )
            and encryption_algorithm._format
            is serialization.PrivateFormat.PKCS12
        ):
            # Default to OpenSSL's defaults. Behavior will vary based on the
            # version of OpenSSL cryptography is compiled against.
            nid_cert = 0
            nid_key = 0
            # Use the default iters we use in best available
            pkcs12_iter = 20000
            # See the Best Available comment for why this is 1
            mac_iter = 1
            password = encryption_algorithm.password
            keycertalg = encryption_algorithm._key_cert_algorithm
            if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC:
                nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
                nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            elif keycertalg is PBES.PBESv2SHA256AndAES256CBC:
                if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
                    raise UnsupportedAlgorithm(
                        "PBESv2 is not supported by this version of OpenSSL"
                    )
                nid_cert = self._lib.NID_aes_256_cbc
                nid_key = self._lib.NID_aes_256_cbc
            else:
                assert keycertalg is None
                # We use OpenSSL's defaults

            if encryption_algorithm._hmac_hash is not None:
                if not self._lib.Cryptography_HAS_PKCS12_SET_MAC:
                    raise UnsupportedAlgorithm(
                        "Setting MAC algorithm is not supported by this "
                        "version of OpenSSL."
                    )
                # The helper already asserts the digest is non-NULL.
                mac_alg = self._evp_md_non_null_from_algorithm(
                    encryption_algorithm._hmac_hash
                )
            else:
                mac_alg = self._ffi.NULL

            if encryption_algorithm._kdf_rounds is not None:
                pkcs12_iter = encryption_algorithm._kdf_rounds

        else:
            raise ValueError("Unsupported key encryption type")

        if not cas:
            sk_x509 = self._ffi.NULL
        else:
            sk_x509 = self._lib.sk_X509_new_null()
            sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)

            # This list is to keep the x509 values alive until end of function
            ossl_cas = []
            for ca in cas:
                if isinstance(ca, PKCS12Certificate):
                    ca_alias = ca.friendly_name
                    ossl_ca = self._cert2ossl(ca.certificate)
                    if ca_alias is None:
                        res = self._lib.X509_alias_set1(
                            ossl_ca, self._ffi.NULL, -1
                        )
                    else:
                        res = self._lib.X509_alias_set1(
                            ossl_ca, ca_alias, len(ca_alias)
                        )
                    self.openssl_assert(res == 1)
                else:
                    ossl_ca = self._cert2ossl(ca)
                ossl_cas.append(ossl_ca)
                res = self._lib.sk_X509_push(sk_x509, ossl_ca)
                # Use this instance rather than the module-level singleton.
                self.openssl_assert(res >= 1)

        with self._zeroed_null_terminated_buf(password) as password_buf:
            with self._zeroed_null_terminated_buf(name) as name_buf:
                ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
                ossl_pkey = (
                    self._key2ossl(key) if key is not None else self._ffi.NULL
                )

                p12 = self._lib.PKCS12_create(
                    password_buf,
                    name_buf,
                    ossl_pkey,
                    ossl_cert,
                    sk_x509,
                    nid_key,
                    nid_cert,
                    pkcs12_iter,
                    mac_iter,
                    0,
                )
                if p12 == self._ffi.NULL:
                    errors = self._consume_errors()
                    raise ValueError(
                        (
                            "Failed to create PKCS12 (does the key match the "
                            "certificate?)"
                        ),
                        errors,
                    )
                # Attach the destructor right away so the structure is
                # released even if a later step raises.
                p12 = self._ffi.gc(p12, self._lib.PKCS12_free)

            if (
                self._lib.Cryptography_HAS_PKCS12_SET_MAC
                and mac_alg != self._ffi.NULL
            ):
                res = self._lib.PKCS12_set_mac(
                    p12,
                    password_buf,
                    -1,
                    self._ffi.NULL,
                    0,
                    mac_iter,
                    mac_alg,
                )
                # Don't silently emit a file without the requested MAC
                # digest if setting it failed.
                self.openssl_assert(res == 1)

        bio = self._create_mem_bio_gc()
        res = self._lib.i2d_PKCS12_bio(bio, p12)
        self.openssl_assert(res > 0)
        return self._read_mem_bio(bio)

    def poly1305_supported(self) -> bool:
        """
        False in FIPS mode, True on BoringSSL and LibreSSL, otherwise
        reflects ``Cryptography_HAS_POLY1305``.
        """
        if self._fips_enabled:
            return False
        elif (
            self._lib.CRYPTOGRAPHY_IS_BORINGSSL
            or self._lib.CRYPTOGRAPHY_IS_LIBRESSL
        ):
            return True
        else:
            return self._lib.Cryptography_HAS_POLY1305 == 1

    def pkcs7_supported(self) -> bool:
        """False on BoringSSL, True otherwise."""
        return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
867
class GetCipherByName:
    """
    Cipher adapter that resolves a cipher by a formatted name.

    The template given at construction time is filled in with
    ``str.format(cipher=..., mode=...)``, lowercased and ASCII-encoded, and
    then looked up through the backend's ``lib``.
    """

    def __init__(self, fmt: str):
        self._fmt = fmt

    def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
        """
        Return the result of the name lookup, which may be ``ffi.NULL``.

        ``backend._consume_errors()`` is always called before returning.
        """
        lib = backend._lib
        name_bytes = (
            self._fmt.format(cipher=cipher, mode=mode).lower().encode("ascii")
        )
        found = lib.EVP_get_cipherbyname(name_bytes)

        # try EVP_CIPHER_fetch if present
        if found == backend._ffi.NULL and lib.Cryptography_HAS_300_EVP_CIPHER:
            null = backend._ffi.NULL
            found = lib.EVP_CIPHER_fetch(null, name_bytes, null)

        backend._consume_errors()
        return found
891
892
def _get_xts_cipher(backend: Backend, cipher: AES, mode):
    """
    Cipher adapter for AES in XTS mode.

    The lookup name is ``aes-<bits>-xts`` where ``<bits>`` is half of
    ``cipher.key_size``. ``mode`` is accepted for the adapter signature but
    is not used. Returns whatever ``EVP_get_cipherbyname`` returns.
    """
    half_key_bits = cipher.key_size // 2
    name_bytes = "aes-{}-xts".format(half_key_bits).encode("ascii")
    return backend._lib.EVP_get_cipherbyname(name_bytes)
896
897
# Module-level Backend instance, constructed when this module is imported.
backend = Backend()