Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/backend.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

394 statements  

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import collections 

8import contextlib 

9import itertools 

10import typing 

11 

12from cryptography import utils, x509 

13from cryptography.exceptions import UnsupportedAlgorithm 

14from cryptography.hazmat.backends.openssl import aead 

15from cryptography.hazmat.backends.openssl.ciphers import _CipherContext 

16from cryptography.hazmat.bindings._rust import openssl as rust_openssl 

17from cryptography.hazmat.bindings.openssl import binding 

18from cryptography.hazmat.primitives import hashes, serialization 

19from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding 

20from cryptography.hazmat.primitives.asymmetric import ec 

21from cryptography.hazmat.primitives.asymmetric import utils as asym_utils 

22from cryptography.hazmat.primitives.asymmetric.padding import ( 

23 MGF1, 

24 OAEP, 

25 PSS, 

26 PKCS1v15, 

27) 

28from cryptography.hazmat.primitives.asymmetric.types import ( 

29 PrivateKeyTypes, 

30) 

31from cryptography.hazmat.primitives.ciphers import ( 

32 CipherAlgorithm, 

33) 

34from cryptography.hazmat.primitives.ciphers.algorithms import ( 

35 AES, 

36 AES128, 

37 AES256, 

38 ARC4, 

39 SM4, 

40 Camellia, 

41 ChaCha20, 

42 TripleDES, 

43 _BlowfishInternal, 

44 _CAST5Internal, 

45 _IDEAInternal, 

46 _SEEDInternal, 

47) 

48from cryptography.hazmat.primitives.ciphers.modes import ( 

49 CBC, 

50 CFB, 

51 CFB8, 

52 CTR, 

53 ECB, 

54 GCM, 

55 OFB, 

56 XTS, 

57 Mode, 

58) 

59from cryptography.hazmat.primitives.serialization.pkcs12 import ( 

60 PBES, 

61 PKCS12Certificate, 

62 PKCS12KeyAndCertificates, 

63 PKCS12PrivateKeyTypes, 

64 _PKCS12CATypes, 

65) 

66 

67_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) 

68 

69 

70# Not actually supported, just used as a marker for some serialization tests. 

71class _RC2: 

72 pass 

73 

74 

class Backend:
    """
    OpenSSL API binding interfaces.

    Holds the cffi binding (``_ffi`` / ``_lib``), the FIPS state reported by
    the Rust bindings, and a registry mapping ``(cipher type, mode type)``
    pairs to adapter callables that look up the matching ``EVP_CIPHER``.
    """

    name = "openssl"

    # FIPS has opinions about acceptable algorithms and key sizes, but the
    # disallowed algorithms are still present in OpenSSL. They just error if
    # you try to use them. To avoid that we allowlist the algorithms in
    # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
    _fips_aead: typing.ClassVar[set[bytes]] = {
        b"aes-128-ccm",
        b"aes-192-ccm",
        b"aes-256-ccm",
        b"aes-128-gcm",
        b"aes-192-gcm",
        b"aes-256-gcm",
    }
    # TripleDES encryption is disallowed/deprecated throughout 2023 in
    # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA).
    _fips_ciphers = (AES,)
    # Sometimes SHA1 is still permissible. That logic is contained
    # within the various *_supported methods.
    _fips_hashes = (
        hashes.SHA224,
        hashes.SHA256,
        hashes.SHA384,
        hashes.SHA512,
        hashes.SHA512_224,
        hashes.SHA512_256,
        hashes.SHA3_224,
        hashes.SHA3_256,
        hashes.SHA3_384,
        hashes.SHA3_512,
        hashes.SHAKE128,
        hashes.SHAKE256,
    )
    _fips_ecdh_curves = (
        ec.SECP224R1,
        ec.SECP256R1,
        ec.SECP384R1,
        ec.SECP521R1,
    )
    _fips_rsa_min_key_size = 2048
    _fips_rsa_min_public_exponent = 65537
    _fips_dsa_min_modulus = 1 << 2048
    _fips_dh_min_key_size = 2048
    _fips_dh_min_modulus = 1 << _fips_dh_min_key_size

    def __init__(self) -> None:
        """Create the binding, record FIPS state, register default ciphers."""
        self._binding = binding.Binding()
        self._ffi = self._binding.ffi
        self._lib = self._binding.lib
        self._fips_enabled = rust_openssl.is_fips_enabled()

        self._cipher_registry: dict[
            tuple[type[CipherAlgorithm], type[Mode]],
            typing.Callable,
        ] = {}
        self._register_default_ciphers()

    def __repr__(self) -> str:
        return "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>".format(
            self.openssl_version_text(),
            self._fips_enabled,
            rust_openssl._legacy_provider_loaded,
        )

    def openssl_assert(
        self,
        ok: bool,
        errors: list[rust_openssl.OpenSSLError] | None = None,
    ) -> None:
        """Forward ``ok`` (and optional ``errors``) to the binding's assert."""
        return binding._openssl_assert(ok, errors=errors)

    def _enable_fips(self) -> None:
        # This function enables FIPS mode for OpenSSL 3.0.0 on installs that
        # have the FIPS provider installed properly.
        self._binding._enable_fips()
        assert rust_openssl.is_fips_enabled()
        self._fips_enabled = rust_openssl.is_fips_enabled()

    def openssl_version_text(self) -> str:
        """
        Friendly string name of the loaded OpenSSL library. This is not
        necessarily the same version as it was compiled against.

        Example: OpenSSL 1.1.1d 10 Sep 2019
        """
        return self._ffi.string(
            self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
        ).decode("ascii")

    def openssl_version_number(self) -> int:
        """Return the value of ``OpenSSL_version_num()``."""
        return self._lib.OpenSSL_version_num()

    def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm):
        """
        Look up the digest for ``algorithm`` via ``EVP_get_digestbyname``.

        The result is returned unchecked; callers compare it against
        ``self._ffi.NULL`` themselves.
        """
        if algorithm.name in ("blake2b", "blake2s"):
            # For BLAKE2 the lookup name carries the digest size in bits.
            alg = f"{algorithm.name}{algorithm.digest_size * 8}".encode(
                "ascii"
            )
        else:
            alg = algorithm.name.encode("ascii")

        return self._lib.EVP_get_digestbyname(alg)

    def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm):
        """Like ``_evp_md_from_algorithm`` but asserts the result is set."""
        evp_md = self._evp_md_from_algorithm(algorithm)
        self.openssl_assert(evp_md != self._ffi.NULL)
        return evp_md

    def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
        """Return whether ``algorithm`` is allowed and known to OpenSSL."""
        if self._fips_enabled and not isinstance(algorithm, self._fips_hashes):
            return False

        evp_md = self._evp_md_from_algorithm(algorithm)
        return evp_md != self._ffi.NULL

    def signature_hash_supported(
        self, algorithm: hashes.HashAlgorithm
    ) -> bool:
        # Dedicated check for hashing algorithm use in message digest for
        # signatures, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption).
        if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
            return False
        return self.hash_supported(algorithm)

    def scrypt_supported(self) -> bool:
        """Return whether scrypt is available (never under FIPS)."""
        if self._fips_enabled:
            return False
        return self._lib.Cryptography_HAS_SCRYPT == 1

    def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
        """Return whether ``algorithm`` may be used for HMAC."""
        # FIPS mode still allows SHA1 for HMAC
        if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
            return True

        return self.hash_supported(algorithm)

    def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
        """
        Return whether the ``cipher``/``mode`` pair has a registered adapter
        and that adapter yields a non-NULL cipher.
        """
        if self._fips_enabled:
            # FIPS mode requires AES. TripleDES is disallowed/deprecated in
            # FIPS 140-3.
            if not isinstance(cipher, self._fips_ciphers):
                return False

        try:
            adapter = self._cipher_registry[type(cipher), type(mode)]
        except KeyError:
            return False
        evp_cipher = adapter(self, cipher, mode)
        return self._ffi.NULL != evp_cipher

    def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
        """
        Register ``adapter`` for the ``(cipher_cls, mode_cls)`` pair.

        Raises ValueError if that pair already has an adapter.
        """
        if (cipher_cls, mode_cls) in self._cipher_registry:
            raise ValueError(
                f"Duplicate registration for: {cipher_cls} {mode_cls}."
            )
        self._cipher_registry[cipher_cls, mode_cls] = adapter

    def _register_default_ciphers(self) -> None:
        """Populate the cipher registry with the built-in adapters."""
        for cipher_cls in [AES, AES128, AES256]:
            for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
                self.register_cipher_adapter(
                    cipher_cls,
                    mode_cls,
                    GetCipherByName(
                        "{cipher.name}-{cipher.key_size}-{mode.name}"
                    ),
                )
        for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
            self.register_cipher_adapter(
                Camellia,
                mode_cls,
                GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
            )
        for mode_cls in [CBC, CFB, CFB8, OFB]:
            self.register_cipher_adapter(
                TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
            )
        self.register_cipher_adapter(
            TripleDES, ECB, GetCipherByName("des-ede3")
        )
        # ChaCha20 uses the Long Name "chacha20" in OpenSSL, but in LibreSSL
        # it uses "chacha"
        self.register_cipher_adapter(
            ChaCha20,
            type(None),
            GetCipherByName(
                "chacha" if self._lib.CRYPTOGRAPHY_IS_LIBRESSL else "chacha20"
            ),
        )
        self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
        for mode_cls in [ECB, CBC, OFB, CFB, CTR, GCM]:
            self.register_cipher_adapter(
                SM4, mode_cls, GetCipherByName("sm4-{mode.name}")
            )
        # Don't register legacy ciphers if they're unavailable. Hypothetically
        # this wouldn't be necessary because we test availability by seeing if
        # we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3
        # will return a valid pointer even though the cipher is unavailable.
        if (
            rust_openssl._legacy_provider_loaded
            or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
        ):
            for mode_cls in [CBC, CFB, OFB, ECB]:
                self.register_cipher_adapter(
                    _BlowfishInternal,
                    mode_cls,
                    GetCipherByName("bf-{mode.name}"),
                )
            for mode_cls in [CBC, CFB, OFB, ECB]:
                self.register_cipher_adapter(
                    _SEEDInternal,
                    mode_cls,
                    GetCipherByName("seed-{mode.name}"),
                )
            for cipher_cls, mode_cls in itertools.product(
                [_CAST5Internal, _IDEAInternal],
                [CBC, OFB, CFB, ECB],
            ):
                self.register_cipher_adapter(
                    cipher_cls,
                    mode_cls,
                    GetCipherByName("{cipher.name}-{mode.name}"),
                )
            self.register_cipher_adapter(
                ARC4, type(None), GetCipherByName("rc4")
            )
            # We don't actually support RC2, this is just used by some tests.
            self.register_cipher_adapter(
                _RC2, type(None), GetCipherByName("rc2")
            )

    def create_symmetric_encryption_ctx(
        self, cipher: CipherAlgorithm, mode: Mode
    ) -> _CipherContext:
        """Build a ``_CipherContext`` in encrypt mode."""
        return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)

    def create_symmetric_decryption_ctx(
        self, cipher: CipherAlgorithm, mode: Mode
    ) -> _CipherContext:
        """Build a ``_CipherContext`` in decrypt mode."""
        return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)

    def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
        """PBKDF2-HMAC support follows HMAC support for ``algorithm``."""
        return self.hmac_supported(algorithm)

    def _consume_errors(self) -> list[rust_openssl.OpenSSLError]:
        """Return whatever ``rust_openssl.capture_error_stack()`` yields."""
        return rust_openssl.capture_error_stack()

    def generate_rsa_parameters_supported(
        self, public_exponent: int, key_size: int
    ) -> bool:
        """
        Accept odd public exponents of at least 3 together with key sizes
        of at least 512 bits.
        """
        return (
            public_exponent >= 3
            and public_exponent & 1 != 0
            and key_size >= 512
        )

    def _bytes_to_bio(self, data: bytes) -> _MemoryBIO:
        """
        Return a _MemoryBIO namedtuple of (BIO, char*).

        The char* is the storage for the BIO and it must stay alive until the
        BIO is finished with.
        """
        data_ptr = self._ffi.from_buffer(data)
        bio = self._lib.BIO_new_mem_buf(data_ptr, len(data))
        self.openssl_assert(bio != self._ffi.NULL)

        return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr)

    def _create_mem_bio_gc(self):
        """
        Creates an empty memory BIO.

        The returned pointer is registered with ``ffi.gc`` using
        ``BIO_free`` as its destructor.
        """
        bio_method = self._lib.BIO_s_mem()
        self.openssl_assert(bio_method != self._ffi.NULL)
        bio = self._lib.BIO_new(bio_method)
        self.openssl_assert(bio != self._ffi.NULL)
        return self._ffi.gc(bio, self._lib.BIO_free)

    def _read_mem_bio(self, bio) -> bytes:
        """
        Reads a memory BIO. This only works on memory BIOs.

        Asserts that the BIO holds at least one byte and returns a copy of
        its contents.
        """
        buf = self._ffi.new("char **")
        buf_len = self._lib.BIO_get_mem_data(bio, buf)
        self.openssl_assert(buf_len > 0)
        self.openssl_assert(buf[0] != self._ffi.NULL)
        # Slicing the cffi buffer copies the bytes out of the BIO's storage.
        return self._ffi.buffer(buf[0], buf_len)[:]

    def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
        """Return whether ``algorithm`` is accepted for OAEP padding."""
        if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
            return False

        return isinstance(
            algorithm,
            (
                hashes.SHA1,
                hashes.SHA224,
                hashes.SHA256,
                hashes.SHA384,
                hashes.SHA512,
            ),
        )

    def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool:
        """
        Return whether ``padding`` is usable with RSA: PKCS1v15 always, PSS
        and OAEP only with MGF1 and a supported hash.
        """
        if isinstance(padding, PKCS1v15):
            return True
        elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
            # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked
            # as signature algorithm.
            if self._fips_enabled and isinstance(
                padding._mgf._algorithm, hashes.SHA1
            ):
                return True
            else:
                return self.hash_supported(padding._mgf._algorithm)
        elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
            return self._oaep_hash_supported(
                padding._mgf._algorithm
            ) and self._oaep_hash_supported(padding._algorithm)
        else:
            return False

    def rsa_encryption_supported(self, padding: AsymmetricPadding) -> bool:
        """As ``rsa_padding_supported``, but PKCS1v15 is refused in FIPS."""
        if self._fips_enabled and isinstance(padding, PKCS1v15):
            return False
        return self.rsa_padding_supported(padding)

    def dsa_supported(self) -> bool:
        """DSA is reported unavailable on BoringSSL and under FIPS."""
        return (
            not self._lib.CRYPTOGRAPHY_IS_BORINGSSL and not self._fips_enabled
        )

    def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
        """Return whether ``algorithm`` can be used for DSA signatures."""
        if not self.dsa_supported():
            return False
        return self.signature_hash_supported(algorithm)

    def cmac_algorithm_supported(self, algorithm) -> bool:
        """CMAC support is probed as CBC support with an all-zero IV."""
        return self.cipher_supported(
            algorithm, CBC(b"\x00" * algorithm.block_size)
        )

    def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
        """Convert ``cert`` to an OpenSSL ``X509 *`` by round-tripping DER."""
        data = cert.public_bytes(serialization.Encoding.DER)
        mem_bio = self._bytes_to_bio(data)
        ossl_x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
        self.openssl_assert(ossl_x509 != self._ffi.NULL)
        return self._ffi.gc(ossl_x509, self._lib.X509_free)

    def _ossl2cert(self, x509_ptr: typing.Any) -> x509.Certificate:
        """Convert an OpenSSL ``X509 *`` to a certificate object via DER."""
        bio = self._create_mem_bio_gc()
        res = self._lib.i2d_X509_bio(bio, x509_ptr)
        self.openssl_assert(res == 1)
        return x509.load_der_x509_certificate(self._read_mem_bio(bio))

    def _key2ossl(self, key: PKCS12PrivateKeyTypes) -> typing.Any:
        """
        Convert ``key`` to an OpenSSL ``EVP_PKEY *`` by serialising it as
        unencrypted PKCS8 DER and parsing that with OpenSSL.
        """
        data = key.private_bytes(
            serialization.Encoding.DER,
            serialization.PrivateFormat.PKCS8,
            serialization.NoEncryption(),
        )
        mem_bio = self._bytes_to_bio(data)

        evp_pkey = self._lib.d2i_PrivateKey_bio(
            mem_bio.bio,
            self._ffi.NULL,
        )
        self.openssl_assert(evp_pkey != self._ffi.NULL)
        return self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)

    def _handle_key_loading_error(
        self, errors: list[rust_openssl.OpenSSLError]
    ) -> typing.NoReturn:
        """
        Translate an OpenSSL error stack from a failed key load into a
        ValueError. Always raises.
        """
        if not errors:
            raise ValueError(
                "Could not deserialize key data. The data may be in an "
                "incorrect format or it may be encrypted with an unsupported "
                "algorithm."
            )

        elif (
            errors[0]._lib_reason_match(
                self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
            )
            or errors[0]._lib_reason_match(
                self._lib.ERR_LIB_PKCS12,
                self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
            )
            or (
                self._lib.Cryptography_HAS_PROVIDERS
                and errors[0]._lib_reason_match(
                    self._lib.ERR_LIB_PROV,
                    self._lib.PROV_R_BAD_DECRYPT,
                )
            )
        ):
            raise ValueError("Bad decrypt. Incorrect password?")

        elif any(
            error._lib_reason_match(
                self._lib.ERR_LIB_EVP,
                self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
            )
            for error in errors
        ):
            raise ValueError("Unsupported public key algorithm.")

        else:
            raise ValueError(
                "Could not deserialize key data. The data may be in an "
                "incorrect format, it may be encrypted with an unsupported "
                "algorithm, or it may be an unsupported key type (e.g. EC "
                "curves with explicit parameters).",
                errors,
            )

    def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
        """Return whether ``curve`` is allowed and supported."""
        if self._fips_enabled and not isinstance(
            curve, self._fips_ecdh_curves
        ):
            return False

        return rust_openssl.ec.curve_supported(curve)

    def elliptic_curve_signature_algorithm_supported(
        self,
        signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
        curve: ec.EllipticCurve,
    ) -> bool:
        """
        Return whether ``signature_algorithm`` can be used on ``curve``.
        """
        # We only support ECDSA right now.
        if not isinstance(signature_algorithm, ec.ECDSA):
            return False

        return self.elliptic_curve_supported(curve) and (
            isinstance(signature_algorithm.algorithm, asym_utils.Prehashed)
            or self.hash_supported(signature_algorithm.algorithm)
        )

    def elliptic_curve_exchange_algorithm_supported(
        self, algorithm: ec.ECDH, curve: ec.EllipticCurve
    ) -> bool:
        """Return whether ECDH on ``curve`` is supported."""
        return self.elliptic_curve_supported(curve) and isinstance(
            algorithm, ec.ECDH
        )

    def dh_supported(self) -> bool:
        """DH is reported unavailable on BoringSSL."""
        return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL

    def dh_x942_serialization_supported(self) -> bool:
        """Mirror the ``Cryptography_HAS_EVP_PKEY_DHX`` build flag."""
        return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1

    def x25519_supported(self) -> bool:
        """X25519 is reported available unless FIPS is enabled."""
        return not self._fips_enabled

    def x448_supported(self) -> bool:
        """X448 requires no FIPS, no LibreSSL and no BoringSSL."""
        if self._fips_enabled:
            return False
        return (
            not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
            and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
        )

    def ed25519_supported(self) -> bool:
        """Ed25519 is reported available unless FIPS is enabled."""
        return not self._fips_enabled

    def ed448_supported(self) -> bool:
        """Ed448 requires no FIPS, no LibreSSL and no BoringSSL."""
        if self._fips_enabled:
            return False
        return (
            not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
            and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
        )

    def aead_cipher_supported(self, cipher) -> bool:
        """Delegate to ``aead._aead_cipher_supported``."""
        return aead._aead_cipher_supported(self, cipher)

    def _zero_data(self, data, length: int) -> None:
        # We clear things this way because at the moment we're not
        # sure of a better way that can guarantee it overwrites the
        # memory of a bytearray and doesn't just replace the underlying char *.
        for i in range(length):
            data[i] = 0

    @contextlib.contextmanager
    def _zeroed_null_terminated_buf(self, data):
        """
        This method takes bytes, which can be a bytestring or a mutable
        buffer like a bytearray, and yields a null-terminated version of that
        data. This is required because PKCS12_parse doesn't take a length with
        its password char * and ffi.from_buffer doesn't provide null
        termination. So, to support zeroing the data via bytearray we
        need to build this ridiculous construct that copies the memory, but
        zeroes it after use.

        ``None`` is passed through as ``ffi.NULL``.
        """
        if data is None:
            yield self._ffi.NULL
        else:
            data_len = len(data)
            # One extra element beyond the copied data for the terminator.
            buf = self._ffi.new("char[]", data_len + 1)
            self._ffi.memmove(buf, data, data_len)
            try:
                yield buf
            finally:
                # Cast to a uint8_t * so we can assign by integer
                self._zero_data(self._ffi.cast("uint8_t *", buf), data_len)

    def load_key_and_certificates_from_pkcs12(
        self, data: bytes, password: bytes | None
    ) -> tuple[
        PrivateKeyTypes | None,
        x509.Certificate | None,
        list[x509.Certificate],
    ]:
        """
        Parse PKCS12 ``data`` and return ``(key, cert, additional_certs)``
        with the friendly-name wrappers stripped from the certificates.
        """
        pkcs12 = self.load_pkcs12(data, password)
        return (
            pkcs12.key,
            pkcs12.cert.certificate if pkcs12.cert else None,
            [cert.certificate for cert in pkcs12.additional_certs],
        )

    def _pkcs12_cert_from_ossl(self, x509_ptr: typing.Any) -> PKCS12Certificate:
        """
        Wrap an OpenSSL ``X509 *`` as a PKCS12Certificate, using the alias
        stored on the X509 (if any) as the friendly name.
        """
        cert_obj = self._ossl2cert(x509_ptr)
        friendly_name = None
        maybe_name = self._lib.X509_alias_get0(x509_ptr, self._ffi.NULL)
        if maybe_name != self._ffi.NULL:
            friendly_name = self._ffi.string(maybe_name)
        return PKCS12Certificate(cert_obj, friendly_name)

    def load_pkcs12(
        self, data: bytes, password: bytes | None
    ) -> PKCS12KeyAndCertificates:
        """
        Parse PKCS12 ``data`` with the optional ``password``.

        Raises ValueError if the data cannot be deserialized or if
        ``PKCS12_parse`` fails (bad password or bad data).
        """
        if password is not None:
            utils._check_byteslike("password", password)

        bio = self._bytes_to_bio(data)
        p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
        if p12 == self._ffi.NULL:
            self._consume_errors()
            raise ValueError("Could not deserialize PKCS12 data")

        p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
        evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
        x509_ptr = self._ffi.new("X509 **")
        sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
        with self._zeroed_null_terminated_buf(password) as password_buf:
            res = self._lib.PKCS12_parse(
                p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
            )
        if res == 0:
            self._consume_errors()
            raise ValueError("Invalid password or PKCS12 data")

        cert = None
        key = None
        additional_certificates = []

        if evp_pkey_ptr[0] != self._ffi.NULL:
            evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)
            # We don't support turning off RSA key validation when loading
            # PKCS12 keys
            key = rust_openssl.keys.private_key_from_ptr(
                int(self._ffi.cast("uintptr_t", evp_pkey)),
                unsafe_skip_rsa_key_validation=False,
            )

        if x509_ptr[0] != self._ffi.NULL:
            ossl_cert = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
            cert = self._pkcs12_cert_from_ossl(ossl_cert)

        if sk_x509_ptr[0] != self._ffi.NULL:
            sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
            num = self._lib.sk_X509_num(sk_x509_ptr[0])

            # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
            # certificates.
            indices: typing.Iterable[int]
            if (
                self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
                or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
            ):
                indices = range(num)
            else:
                indices = reversed(range(num))

            for i in indices:
                ossl_ca = self._lib.sk_X509_value(sk_x509, i)
                self.openssl_assert(ossl_ca != self._ffi.NULL)
                ossl_ca = self._ffi.gc(ossl_ca, self._lib.X509_free)
                additional_certificates.append(
                    self._pkcs12_cert_from_ossl(ossl_ca)
                )

        return PKCS12KeyAndCertificates(key, cert, additional_certificates)

    def serialize_key_and_certificates_to_pkcs12(
        self,
        name: bytes | None,
        key: PKCS12PrivateKeyTypes | None,
        cert: x509.Certificate | None,
        cas: list[_PKCS12CATypes] | None,
        encryption_algorithm: serialization.KeySerializationEncryption,
    ) -> bytes:
        """
        Build a PKCS12 structure from the given key/cert/CAs and return its
        DER encoding.

        Raises ValueError for an unsupported ``encryption_algorithm`` type
        or when ``PKCS12_create`` fails, and UnsupportedAlgorithm when the
        requested PBES or MAC settings need a newer OpenSSL.
        """
        password = None
        if name is not None:
            utils._check_bytes("name", name)

        if isinstance(encryption_algorithm, serialization.NoEncryption):
            nid_cert = -1
            nid_key = -1
            pkcs12_iter = 0
            mac_iter = 0
            mac_alg = self._ffi.NULL
        elif isinstance(
            encryption_algorithm, serialization.BestAvailableEncryption
        ):
            # PKCS12 encryption is hopeless trash and can never be fixed.
            # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so
            # we use PBESv1 with 3DES on the older paths.
            if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
                nid_cert = self._lib.NID_aes_256_cbc
                nid_key = self._lib.NID_aes_256_cbc
            else:
                nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
                nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            # At least we can set this higher than OpenSSL's default
            pkcs12_iter = 20000
            # mac_iter chosen for compatibility reasons, see:
            # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
            # Did we mention how lousy PKCS12 encryption is?
            mac_iter = 1
            # MAC algorithm can only be set on OpenSSL 3.0.0+
            mac_alg = self._ffi.NULL
            password = encryption_algorithm.password
        elif (
            isinstance(
                encryption_algorithm, serialization._KeySerializationEncryption
            )
            and encryption_algorithm._format
            is serialization.PrivateFormat.PKCS12
        ):
            # Default to OpenSSL's defaults. Behavior will vary based on the
            # version of OpenSSL cryptography is compiled against.
            nid_cert = 0
            nid_key = 0
            # Use the default iters we use in best available
            pkcs12_iter = 20000
            # See the Best Available comment for why this is 1
            mac_iter = 1
            password = encryption_algorithm.password
            keycertalg = encryption_algorithm._key_cert_algorithm
            if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC:
                nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
                nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            elif keycertalg is PBES.PBESv2SHA256AndAES256CBC:
                if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
                    raise UnsupportedAlgorithm(
                        "PBESv2 is not supported by this version of OpenSSL"
                    )
                nid_cert = self._lib.NID_aes_256_cbc
                nid_key = self._lib.NID_aes_256_cbc
            else:
                assert keycertalg is None
                # We use OpenSSL's defaults

            if encryption_algorithm._hmac_hash is not None:
                if not self._lib.Cryptography_HAS_PKCS12_SET_MAC:
                    raise UnsupportedAlgorithm(
                        "Setting MAC algorithm is not supported by this "
                        "version of OpenSSL."
                    )
                # The helper already asserts the digest is non-NULL.
                mac_alg = self._evp_md_non_null_from_algorithm(
                    encryption_algorithm._hmac_hash
                )
            else:
                mac_alg = self._ffi.NULL

            if encryption_algorithm._kdf_rounds is not None:
                pkcs12_iter = encryption_algorithm._kdf_rounds

        else:
            raise ValueError("Unsupported key encryption type")

        if not cas:
            sk_x509 = self._ffi.NULL
        else:
            sk_x509 = self._lib.sk_X509_new_null()
            sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)

            # This list is to keep the x509 values alive until end of function
            ossl_cas = []
            for ca in cas:
                if isinstance(ca, PKCS12Certificate):
                    ca_alias = ca.friendly_name
                    ossl_ca = self._cert2ossl(ca.certificate)
                    if ca_alias is None:
                        res = self._lib.X509_alias_set1(
                            ossl_ca, self._ffi.NULL, -1
                        )
                    else:
                        res = self._lib.X509_alias_set1(
                            ossl_ca, ca_alias, len(ca_alias)
                        )
                    self.openssl_assert(res == 1)
                else:
                    ossl_ca = self._cert2ossl(ca)
                ossl_cas.append(ossl_ca)
                res = self._lib.sk_X509_push(sk_x509, ossl_ca)
                # Use this instance rather than the module-level singleton.
                self.openssl_assert(res >= 1)

        with self._zeroed_null_terminated_buf(password) as password_buf:
            with self._zeroed_null_terminated_buf(name) as name_buf:
                ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
                ossl_pkey = (
                    self._key2ossl(key) if key is not None else self._ffi.NULL
                )

                p12 = self._lib.PKCS12_create(
                    password_buf,
                    name_buf,
                    ossl_pkey,
                    ossl_cert,
                    sk_x509,
                    nid_key,
                    nid_cert,
                    pkcs12_iter,
                    mac_iter,
                    0,
                )
                if p12 == self._ffi.NULL:
                    errors = self._consume_errors()
                    raise ValueError(
                        (
                            "Failed to create PKCS12 (does the key match the "
                            "certificate?)"
                        ),
                        errors,
                    )
                # Take ownership immediately so the structure is released
                # even if the MAC step below fails.
                p12 = self._ffi.gc(p12, self._lib.PKCS12_free)

                if (
                    self._lib.Cryptography_HAS_PKCS12_SET_MAC
                    and mac_alg != self._ffi.NULL
                ):
                    res = self._lib.PKCS12_set_mac(
                        p12,
                        password_buf,
                        -1,
                        self._ffi.NULL,
                        0,
                        mac_iter,
                        mac_alg,
                    )
                    # Don't silently emit a file whose MAC was not replaced
                    # with the requested algorithm.
                    self.openssl_assert(res == 1)

        bio = self._create_mem_bio_gc()
        res = self._lib.i2d_PKCS12_bio(bio, p12)
        self.openssl_assert(res > 0)
        return self._read_mem_bio(bio)

    def poly1305_supported(self) -> bool:
        """
        Poly1305 is never available under FIPS, always reported available
        on BoringSSL/LibreSSL, and otherwise follows the build flag.
        """
        if self._fips_enabled:
            return False
        elif (
            self._lib.CRYPTOGRAPHY_IS_BORINGSSL
            or self._lib.CRYPTOGRAPHY_IS_LIBRESSL
        ):
            return True
        else:
            return self._lib.Cryptography_HAS_POLY1305 == 1

    def pkcs7_supported(self) -> bool:
        """PKCS7 is reported unavailable on BoringSSL."""
        return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL

866 

867 

class GetCipherByName:
    """
    Cipher adapter that resolves an ``EVP_CIPHER`` from a name template.

    The template is expanded with ``str.format`` using the ``cipher`` and
    ``mode`` objects, lower-cased, and looked up in OpenSSL.
    """

    def __init__(self, fmt: str):
        self._fmt = fmt

    def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
        """
        Return the cipher pointer for ``cipher``/``mode``; the result is
        whatever the lookup produced and may equal ``backend._ffi.NULL``.
        """
        lib = backend._lib
        null = backend._ffi.NULL
        name_bytes = (
            self._fmt.format(cipher=cipher, mode=mode).lower().encode("ascii")
        )

        found = lib.EVP_get_cipherbyname(name_bytes)

        # try EVP_CIPHER_fetch if present
        if found == null and lib.Cryptography_HAS_300_EVP_CIPHER:
            found = lib.EVP_CIPHER_fetch(null, name_bytes, null)

        # Drain any errors the lookups queued; the result is discarded.
        backend._consume_errors()
        return found

891 

892 

893def _get_xts_cipher(backend: Backend, cipher: AES, mode): 

894 cipher_name = f"aes-{cipher.key_size // 2}-xts" 

895 return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) 

896 

897 

# Module-level Backend instance, created when this module is imported.
backend = Backend()