Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/backend.py: 24%

1231 statements  

« prev     ^ index     » next       coverage.py v7.2.1, created at 2023-03-14 06:36 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5 

6import collections 

7import contextlib 

8import itertools 

9import typing 

10import warnings 

11from contextlib import contextmanager 

12 

13from cryptography import utils, x509 

14from cryptography.exceptions import UnsupportedAlgorithm, _Reasons 

15from cryptography.hazmat.backends.openssl import aead 

16from cryptography.hazmat.backends.openssl.ciphers import _CipherContext 

17from cryptography.hazmat.backends.openssl.cmac import _CMACContext 

18from cryptography.hazmat.backends.openssl.dh import ( 

19 _dh_params_dup, 

20 _DHParameters, 

21 _DHPrivateKey, 

22 _DHPublicKey, 

23) 

24from cryptography.hazmat.backends.openssl.dsa import ( 

25 _DSAParameters, 

26 _DSAPrivateKey, 

27 _DSAPublicKey, 

28) 

29from cryptography.hazmat.backends.openssl.ec import ( 

30 _EllipticCurvePrivateKey, 

31 _EllipticCurvePublicKey, 

32) 

33from cryptography.hazmat.backends.openssl.ed448 import ( 

34 _ED448_KEY_SIZE, 

35 _Ed448PrivateKey, 

36 _Ed448PublicKey, 

37) 

38from cryptography.hazmat.backends.openssl.ed25519 import ( 

39 _Ed25519PrivateKey, 

40 _Ed25519PublicKey, 

41) 

42from cryptography.hazmat.backends.openssl.hashes import _HashContext 

43from cryptography.hazmat.backends.openssl.hmac import _HMACContext 

44from cryptography.hazmat.backends.openssl.poly1305 import ( 

45 _POLY1305_KEY_SIZE, 

46 _Poly1305Context, 

47) 

48from cryptography.hazmat.backends.openssl.rsa import ( 

49 _RSAPrivateKey, 

50 _RSAPublicKey, 

51) 

52from cryptography.hazmat.backends.openssl.x448 import ( 

53 _X448PrivateKey, 

54 _X448PublicKey, 

55) 

56from cryptography.hazmat.backends.openssl.x25519 import ( 

57 _X25519PrivateKey, 

58 _X25519PublicKey, 

59) 

60from cryptography.hazmat.bindings.openssl import binding 

61from cryptography.hazmat.primitives import hashes, serialization 

62from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding 

63from cryptography.hazmat.primitives.asymmetric import ( 

64 dh, 

65 dsa, 

66 ec, 

67 ed448, 

68 ed25519, 

69 rsa, 

70 x448, 

71 x25519, 

72) 

73from cryptography.hazmat.primitives.asymmetric.padding import ( 

74 MGF1, 

75 OAEP, 

76 PSS, 

77 PKCS1v15, 

78) 

79from cryptography.hazmat.primitives.asymmetric.types import ( 

80 PrivateKeyTypes, 

81 PublicKeyTypes, 

82) 

83from cryptography.hazmat.primitives.ciphers import ( 

84 BlockCipherAlgorithm, 

85 CipherAlgorithm, 

86) 

87from cryptography.hazmat.primitives.ciphers.algorithms import ( 

88 AES, 

89 AES128, 

90 AES256, 

91 ARC4, 

92 SM4, 

93 Camellia, 

94 ChaCha20, 

95 TripleDES, 

96 _BlowfishInternal, 

97 _CAST5Internal, 

98 _IDEAInternal, 

99 _SEEDInternal, 

100) 

101from cryptography.hazmat.primitives.ciphers.modes import ( 

102 CBC, 

103 CFB, 

104 CFB8, 

105 CTR, 

106 ECB, 

107 GCM, 

108 OFB, 

109 XTS, 

110 Mode, 

111) 

112from cryptography.hazmat.primitives.kdf import scrypt 

113from cryptography.hazmat.primitives.serialization import ssh 

114from cryptography.hazmat.primitives.serialization.pkcs12 import ( 

115 PBES, 

116 PKCS12Certificate, 

117 PKCS12KeyAndCertificates, 

118 PKCS12PrivateKeyTypes, 

119 _PKCS12CATypes, 

120) 

121 

# Pairs a memory BIO with the pointer to the buffer that backs it.
_MemoryBIO = collections.namedtuple("_MemoryBIO", "bio char_ptr")


# RC2 itself is unsupported; this type only exists as a marker that some
# serialization tests rely on.
class _RC2:
    pass

128 

129 

130class Backend: 

131 """ 

132 OpenSSL API binding interfaces. 

133 """ 

134 

135 name = "openssl" 

136 

137 # FIPS has opinions about acceptable algorithms and key sizes, but the 

138 # disallowed algorithms are still present in OpenSSL. They just error if 

139 # you try to use them. To avoid that we allowlist the algorithms in 

140 # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are. 

141 _fips_aead = { 

142 b"aes-128-ccm", 

143 b"aes-192-ccm", 

144 b"aes-256-ccm", 

145 b"aes-128-gcm", 

146 b"aes-192-gcm", 

147 b"aes-256-gcm", 

148 } 

149 # TripleDES encryption is disallowed/deprecated throughout 2023 in 

150 # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA). 

151 _fips_ciphers = (AES,) 

152 # Sometimes SHA1 is still permissible. That logic is contained 

153 # within the various *_supported methods. 

154 _fips_hashes = ( 

155 hashes.SHA224, 

156 hashes.SHA256, 

157 hashes.SHA384, 

158 hashes.SHA512, 

159 hashes.SHA512_224, 

160 hashes.SHA512_256, 

161 hashes.SHA3_224, 

162 hashes.SHA3_256, 

163 hashes.SHA3_384, 

164 hashes.SHA3_512, 

165 hashes.SHAKE128, 

166 hashes.SHAKE256, 

167 ) 

168 _fips_ecdh_curves = ( 

169 ec.SECP224R1, 

170 ec.SECP256R1, 

171 ec.SECP384R1, 

172 ec.SECP521R1, 

173 ) 

174 _fips_rsa_min_key_size = 2048 

175 _fips_rsa_min_public_exponent = 65537 

176 _fips_dsa_min_modulus = 1 << 2048 

177 _fips_dh_min_key_size = 2048 

178 _fips_dh_min_modulus = 1 << _fips_dh_min_key_size 

179 

def __init__(self) -> None:
    """
    Bind to OpenSSL, fill the cipher registry, configure the RNG engine
    and record which EVP_PKEY ids denote DH keys.
    """
    ossl = binding.Binding()
    self._binding = ossl
    self._ffi = ossl.ffi
    self._lib = ossl.lib
    self._fips_enabled = self._is_fips_enabled()

    self._cipher_registry: typing.Dict[
        typing.Tuple[typing.Type[CipherAlgorithm], typing.Type[Mode]],
        typing.Callable,
    ] = {}
    self._register_default_ciphers()

    if not (
        self._fips_enabled and self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE
    ):
        self.activate_osrandom_engine()
    else:
        warnings.warn(
            "OpenSSL FIPS mode is enabled. Can't enable DRBG fork safety.",
            UserWarning,
        )

    dh_types = [self._lib.EVP_PKEY_DH]
    if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
        dh_types.append(self._lib.EVP_PKEY_DHX)
    self._dh_types = dh_types

201 

def __repr__(self) -> str:
    """
    Describe the backend: OpenSSL version text, FIPS flag and whether the
    legacy provider was loaded.
    """
    template = "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>"
    version = self.openssl_version_text()
    return template.format(
        version,
        self._fips_enabled,
        self._binding._legacy_provider_loaded,
    )

208 

def openssl_assert(
    self,
    ok: bool,
    errors: typing.Optional[typing.List[binding._OpenSSLError]] = None,
) -> None:
    """
    Forward ``ok`` and ``errors`` to the binding-level assertion helper,
    supplying this backend's lib handle.
    """
    lib = self._lib
    return binding._openssl_assert(lib, ok, errors=errors)

215 

def _is_fips_enabled(self) -> bool:
    """
    Ask OpenSSL whether FIPS mode is on, using whichever query function
    the linked library provides.
    """
    lib = self._lib
    mode = (
        lib.EVP_default_properties_is_fips_enabled(self._ffi.NULL)
        if lib.Cryptography_HAS_300_FIPS
        else lib.FIPS_mode()
    )

    if mode == 0:
        # An OpenSSL without FIPS leaves an entry on the error stack when
        # queried; discard it.
        lib.ERR_clear_error()
    return bool(mode)

228 

def _enable_fips(self) -> None:
    """
    Turn on FIPS mode through the binding and store the resulting state.

    Intended for OpenSSL 3.0.0 installs where the FIPS provider has been
    installed properly.
    """
    enable = self._binding._enable_fips
    enable()
    assert self._is_fips_enabled()
    self._fips_enabled = self._is_fips_enabled()

235 

def activate_builtin_random(self) -> None:
    """
    Unregister the current default RAND engine, if any, and reset OpenSSL
    to its built-in RNG. Does nothing when the osrandom engine is not
    needed by this build.
    """
    lib = self._lib
    if not lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
        return

    # The lookup hands back a new reference, released at the end.
    engine = lib.ENGINE_get_default_RAND()
    if engine == self._ffi.NULL:
        return

    lib.ENGINE_unregister_RAND(engine)
    # Passing NULL resets the RNG to the built-in implementation.
    self.openssl_assert(lib.RAND_set_rand_method(self._ffi.NULL) == 1)
    # Release the reference obtained from ENGINE_get_default_RAND.
    self.openssl_assert(lib.ENGINE_finish(engine) == 1)

248 

@contextlib.contextmanager
def _get_osurandom_engine(self):
    """
    Context manager that yields the osrandom ENGINE, initialised for use,
    and releases both of its references on exit.
    """
    lib = self._lib
    # Looking the engine up by id creates a structural reference.
    engine = lib.ENGINE_by_id(lib.Cryptography_osrandom_engine_id)
    self.openssl_assert(engine != self._ffi.NULL)
    # Initialising the engine adds a functional reference.
    init_ok = lib.ENGINE_init(engine)
    self.openssl_assert(init_ok == 1)

    try:
        yield engine
    finally:
        # Give back the structural reference taken by ENGINE_by_id ...
        free_ok = lib.ENGINE_free(engine)
        self.openssl_assert(free_ok == 1)
        # ... and then the functional reference taken by ENGINE_init.
        finish_ok = lib.ENGINE_finish(engine)
        self.openssl_assert(finish_ok == 1)

268 

def activate_osrandom_engine(self) -> None:
    """
    Make the osrandom engine the default RAND provider. Does nothing when
    this build does not need the engine.
    """
    if not self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
        return

    # Start from a clean slate: unregister whatever engine is current.
    self.activate_builtin_random()
    with self._get_osurandom_engine() as engine:
        # Install the engine as the default RAND provider.
        set_ok = self._lib.ENGINE_set_default_RAND(engine)
        self.openssl_assert(set_ok == 1)
        # Reset the RNG so that it picks the engine up.
        reset_ok = self._lib.RAND_set_rand_method(self._ffi.NULL)
        self.openssl_assert(reset_ok == 1)

280 

def osrandom_engine_implementation(self) -> str:
    """
    Return the string the osrandom engine reports for its
    ``get_implementation`` control command.
    """
    name_buf = self._ffi.new("char[]", 64)
    with self._get_osurandom_engine() as engine:
        rc = self._lib.ENGINE_ctrl_cmd(
            engine,
            b"get_implementation",
            len(name_buf),
            name_buf,
            self._ffi.NULL,
            0,
        )
        self.openssl_assert(rc > 0)
    raw_name = self._ffi.string(name_buf)
    return raw_name.decode("ascii")

289 

def openssl_version_text(self) -> str:
    """
    Human-readable name of the OpenSSL library that is loaded, which may
    differ from the version compiled against.

    Example: OpenSSL 1.1.1d 10 Sep 2019
    """
    lib = self._lib
    raw_text = self._ffi.string(lib.OpenSSL_version(lib.OPENSSL_VERSION))
    return raw_text.decode("ascii")

300 

def openssl_version_number(self) -> int:
    """
    Return whatever ``OpenSSL_version_num`` reports for the loaded library.
    """
    version_num = self._lib.OpenSSL_version_num()
    return version_num

303 

def create_hmac_ctx(
    self, key: bytes, algorithm: hashes.HashAlgorithm
) -> _HMACContext:
    """
    Build an HMAC context bound to this backend for ``key`` and
    ``algorithm``.
    """
    ctx = _HMACContext(self, key, algorithm)
    return ctx

308 

def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm):
    """
    Look up the digest for ``algorithm`` by name and return what
    ``EVP_get_digestbyname`` gives back (no NULL check is done here).
    """
    digest_name = algorithm.name
    if digest_name in ("blake2b", "blake2s"):
        # The BLAKE2 lookup name carries the digest size in bits as a suffix.
        digest_name = "{}{}".format(digest_name, algorithm.digest_size * 8)

    return self._lib.EVP_get_digestbyname(digest_name.encode("ascii"))

319 

def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm):
    """
    Same lookup as ``_evp_md_from_algorithm``, but asserts that the result
    is not NULL before returning it.
    """
    digest = self._evp_md_from_algorithm(algorithm)
    found = digest != self._ffi.NULL
    self.openssl_assert(found)
    return digest

324 

def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether ``algorithm`` can be used: under FIPS it must be one of
    the allowlisted hash types, and OpenSSL must know the digest.
    """
    blocked_by_fips = self._fips_enabled and not isinstance(
        algorithm, self._fips_hashes
    )
    if blocked_by_fips:
        return False

    digest = self._evp_md_from_algorithm(algorithm)
    return digest != self._ffi.NULL

331 

def signature_hash_supported(
    self, algorithm: hashes.HashAlgorithm
) -> bool:
    """
    Check a hash for use as the message digest of a signature, e.g.
    RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption). SHA1 is refused when
    FIPS is enabled; everything else defers to ``hash_supported``.
    """
    sha1_under_fips = self._fips_enabled and isinstance(
        algorithm, hashes.SHA1
    )
    return False if sha1_under_fips else self.hash_supported(algorithm)

340 

def scrypt_supported(self) -> bool:
    """
    Scrypt is reported as unavailable under FIPS; otherwise availability
    follows the binding's ``Cryptography_HAS_SCRYPT`` flag.
    """
    if self._fips_enabled:
        return False
    return self._lib.Cryptography_HAS_SCRYPT == 1

346 

def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether ``algorithm`` may be used for HMAC. SHA1 remains
    allowed for HMAC in FIPS mode; otherwise defer to ``hash_supported``.
    """
    sha1_under_fips = self._fips_enabled and isinstance(
        algorithm, hashes.SHA1
    )
    return True if sha1_under_fips else self.hash_supported(algorithm)

353 

def create_hash_ctx(
    self, algorithm: hashes.HashAlgorithm
) -> hashes.HashContext:
    """
    Build a hash context bound to this backend for ``algorithm``.
    """
    ctx = _HashContext(self, algorithm)
    return ctx

358 

def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
    """
    Report whether the cipher/mode pair is usable: it must pass the FIPS
    allowlist when FIPS is on, have a registered adapter, and that adapter
    must yield a non-NULL EVP cipher.
    """
    # FIPS mode requires AES; TripleDES is disallowed/deprecated in
    # FIPS 140-3.
    if self._fips_enabled and not isinstance(cipher, self._fips_ciphers):
        return False

    registry_key = (type(cipher), type(mode))
    if registry_key not in self._cipher_registry:
        return False

    lookup = self._cipher_registry[registry_key]
    return self._ffi.NULL != lookup(self, cipher, mode)

372 

def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
    """
    Record ``adapter`` for the (cipher class, mode class) pair.

    Raises ValueError if that pair already has an adapter.
    """
    registry_key = (cipher_cls, mode_cls)
    if registry_key in self._cipher_registry:
        message = "Duplicate registration for: {} {}.".format(
            cipher_cls, mode_cls
        )
        raise ValueError(message)
    self._cipher_registry[registry_key] = adapter

381 

def _register_default_ciphers(self) -> None:
    """
    Populate the cipher registry with the built-in cipher/mode adapters.
    Legacy ciphers are only added when they can actually be used.
    """
    register = self.register_cipher_adapter
    sized_name = "{cipher.name}-{cipher.key_size}-{mode.name}"

    for cipher_cls, mode_cls in itertools.product(
        [AES, AES128, AES256],
        [CBC, CTR, ECB, OFB, CFB, CFB8, GCM],
    ):
        register(cipher_cls, mode_cls, GetCipherByName(sized_name))
    for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
        register(Camellia, mode_cls, GetCipherByName(sized_name))
    for mode_cls in [CBC, CFB, CFB8, OFB]:
        register(
            TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
        )
    register(TripleDES, ECB, GetCipherByName("des-ede3"))
    register(ChaCha20, type(None), GetCipherByName("chacha20"))
    register(AES, XTS, _get_xts_cipher)
    for mode_cls in [ECB, CBC, OFB, CFB, CTR]:
        register(SM4, mode_cls, GetCipherByName("sm4-{mode.name}"))

    # Skip legacy ciphers when they are unavailable. Availability is
    # normally detected by whether an EVP_CIPHER * comes back in the
    # _CipherContext __init__, but OpenSSL 3 returns a valid pointer even
    # for a cipher that cannot be used.
    legacy_usable = (
        self._binding._legacy_provider_loaded
        or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
    )
    if not legacy_usable:
        return

    for mode_cls in [CBC, CFB, OFB, ECB]:
        register(
            _BlowfishInternal, mode_cls, GetCipherByName("bf-{mode.name}")
        )
    for mode_cls in [CBC, CFB, OFB, ECB]:
        register(
            _SEEDInternal, mode_cls, GetCipherByName("seed-{mode.name}")
        )
    for cipher_cls, mode_cls in itertools.product(
        [_CAST5Internal, _IDEAInternal],
        [CBC, OFB, CFB, ECB],
    ):
        register(
            cipher_cls,
            mode_cls,
            GetCipherByName("{cipher.name}-{mode.name}"),
        )
    register(ARC4, type(None), GetCipherByName("rc4"))
    # RC2 is not really supported; some tests need the registration.
    register(_RC2, type(None), GetCipherByName("rc2"))

449 

def create_symmetric_encryption_ctx(
    self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
    """
    Build a cipher context for ``cipher``/``mode`` in encrypt direction.
    """
    operation = _CipherContext._ENCRYPT
    return _CipherContext(self, cipher, mode, operation)

454 

def create_symmetric_decryption_ctx(
    self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
    """
    Build a cipher context for ``cipher``/``mode`` in decrypt direction.
    """
    operation = _CipherContext._DECRYPT
    return _CipherContext(self, cipher, mode, operation)

459 

def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    PBKDF2-HMAC support is exactly HMAC support for ``algorithm``.
    """
    supported = self.hmac_supported(algorithm)
    return supported

462 

def derive_pbkdf2_hmac(
    self,
    algorithm: hashes.HashAlgorithm,
    length: int,
    salt: bytes,
    iterations: int,
    key_material: bytes,
) -> bytes:
    """
    Run ``PKCS5_PBKDF2_HMAC`` over ``key_material`` and ``salt`` and
    return ``length`` derived bytes.
    """
    ffi = self._ffi
    derived = ffi.new("unsigned char[]", length)
    digest = self._evp_md_non_null_from_algorithm(algorithm)
    secret_ptr = ffi.from_buffer(key_material)
    rc = self._lib.PKCS5_PBKDF2_HMAC(
        secret_ptr,
        len(key_material),
        salt,
        len(salt),
        iterations,
        digest,
        length,
        derived,
    )
    self.openssl_assert(rc == 1)
    return ffi.buffer(derived)[:]

486 

def _consume_errors(self) -> typing.List[binding._OpenSSLError]:
    """
    Delegate to the binding-level ``_consume_errors`` for this lib handle.
    """
    lib = self._lib
    return binding._consume_errors(lib)

489 

def _consume_errors_with_text(
    self,
) -> typing.List[binding._OpenSSLErrorWithText]:
    """
    Delegate to the binding-level ``_consume_errors_with_text`` for this
    lib handle.
    """
    lib = self._lib
    return binding._consume_errors_with_text(lib)

494 

def _bn_to_int(self, bn) -> int:
    """
    Convert a non-NULL, non-negative BIGNUM into a Python integer.
    """
    assert bn != self._ffi.NULL
    self.openssl_assert(not self._lib.BN_is_negative(bn))

    size = self._lib.BN_num_bytes(bn)
    scratch = self._ffi.new("unsigned char[]", size)
    written = self._lib.BN_bn2bin(bn, scratch)
    # Zero bytes written simply means the BIGNUM holds the value 0.
    self.openssl_assert(written >= 0)
    big_endian = self._ffi.buffer(scratch)[:written]
    return int.from_bytes(big_endian, "big")

506 

def _int_to_bn(self, num: int, bn=None):
    """
    Turn a Python integer into a BIGNUM, writing into ``bn`` when one is
    supplied. The result is deliberately not registered for garbage
    collection so that it can be handed to structs that take ownership;
    callers that discard it must register it for GC themselves.
    """
    assert bn is None or bn != self._ffi.NULL

    target = self._ffi.NULL if bn is None else bn

    byte_count = int(num.bit_length() / 8.0 + 1)
    encoded = num.to_bytes(byte_count, "big")
    result = self._lib.BN_bin2bn(encoded, len(encoded), target)
    self.openssl_assert(result != self._ffi.NULL)
    return result

523 

def generate_rsa_private_key(
    self, public_exponent: int, key_size: int
) -> rsa.RSAPrivateKey:
    """
    Generate a new RSA private key after verifying the requested
    parameters.
    """
    rsa._verify_rsa_parameters(public_exponent, key_size)

    lib, ffi = self._lib, self._ffi

    key = lib.RSA_new()
    self.openssl_assert(key != ffi.NULL)
    key = ffi.gc(key, lib.RSA_free)

    exponent = ffi.gc(self._int_to_bn(public_exponent), lib.BN_free)

    generated = lib.RSA_generate_key_ex(key, key_size, exponent, ffi.NULL)
    self.openssl_assert(generated == 1)
    pkey = self._rsa_cdata_to_evp_pkey(key)

    # The key was produced just now, so RSA key validation is skipped.
    return _RSAPrivateKey(
        self, key, pkey, unsafe_skip_rsa_key_validation=True
    )

546 

def generate_rsa_parameters_supported(
    self, public_exponent: int, key_size: int
) -> bool:
    """
    Accept an odd public exponent of at least 3 together with a key size
    of at least 512 bits.
    """
    if public_exponent < 3:
        return False
    if public_exponent & 1 == 0:
        return False
    return key_size >= 512

555 

def load_rsa_private_numbers(
    self,
    numbers: rsa.RSAPrivateNumbers,
    unsafe_skip_rsa_key_validation: bool,
) -> rsa.RSAPrivateKey:
    """
    Build an RSA private key from its numeric components, after checking
    the components with ``rsa._check_private_key_components``.
    """
    public = numbers.public_numbers
    rsa._check_private_key_components(
        numbers.p,
        numbers.q,
        numbers.d,
        numbers.dmp1,
        numbers.dmq1,
        numbers.iqmp,
        public.e,
        public.n,
    )

    lib = self._lib
    key = lib.RSA_new()
    self.openssl_assert(key != self._ffi.NULL)
    key = self._ffi.gc(key, lib.RSA_free)

    to_bn = self._int_to_bn
    p = to_bn(numbers.p)
    q = to_bn(numbers.q)
    d = to_bn(numbers.d)
    dmp1 = to_bn(numbers.dmp1)
    dmq1 = to_bn(numbers.dmq1)
    iqmp = to_bn(numbers.iqmp)
    e = to_bn(public.e)
    n = to_bn(public.n)

    self.openssl_assert(lib.RSA_set0_factors(key, p, q) == 1)
    self.openssl_assert(lib.RSA_set0_key(key, n, e, d) == 1)
    self.openssl_assert(lib.RSA_set0_crt_params(key, dmp1, dmq1, iqmp) == 1)
    pkey = self._rsa_cdata_to_evp_pkey(key)

    return _RSAPrivateKey(
        self,
        key,
        pkey,
        unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
    )

596 

def load_rsa_public_numbers(
    self, numbers: rsa.RSAPublicNumbers
) -> rsa.RSAPublicKey:
    """
    Build an RSA public key from ``e`` and ``n`` after checking them with
    ``rsa._check_public_key_components``.
    """
    rsa._check_public_key_components(numbers.e, numbers.n)

    lib = self._lib
    key = lib.RSA_new()
    self.openssl_assert(key != self._ffi.NULL)
    key = self._ffi.gc(key, lib.RSA_free)

    exponent = self._int_to_bn(numbers.e)
    modulus = self._int_to_bn(numbers.n)
    # No private exponent for a public key, hence the NULL.
    stored = lib.RSA_set0_key(key, modulus, exponent, self._ffi.NULL)
    self.openssl_assert(stored == 1)
    pkey = self._rsa_cdata_to_evp_pkey(key)

    return _RSAPublicKey(self, key, pkey)

611 

def _create_evp_pkey_gc(self):
    """
    Allocate a new EVP_PKEY and register ``EVP_PKEY_free`` as its
    garbage-collection destructor.
    """
    raw_pkey = self._lib.EVP_PKEY_new()
    self.openssl_assert(raw_pkey != self._ffi.NULL)
    return self._ffi.gc(raw_pkey, self._lib.EVP_PKEY_free)

617 

def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
    """
    Create a garbage-collected EVP_PKEY and attach ``rsa_cdata`` to it via
    ``EVP_PKEY_set1_RSA``.
    """
    pkey = self._create_evp_pkey_gc()
    attached = self._lib.EVP_PKEY_set1_RSA(pkey, rsa_cdata)
    self.openssl_assert(attached == 1)
    return pkey

623 

def _bytes_to_bio(self, data: bytes) -> _MemoryBIO:
    """
    Wrap ``data`` in a memory BIO and return a _MemoryBIO namedtuple of
    (BIO, char*).

    The char* is the BIO's backing storage, so it has to outlive every
    use of the BIO.
    """
    backing = self._ffi.from_buffer(data)
    raw_bio = self._lib.BIO_new_mem_buf(backing, len(data))
    self.openssl_assert(raw_bio != self._ffi.NULL)

    managed_bio = self._ffi.gc(raw_bio, self._lib.BIO_free)
    return _MemoryBIO(managed_bio, backing)

636 

def _create_mem_bio_gc(self):
    """
    Create an empty memory BIO with ``BIO_free`` registered as its
    garbage-collection destructor.
    """
    lib = self._lib
    method = lib.BIO_s_mem()
    self.openssl_assert(method != self._ffi.NULL)
    raw_bio = lib.BIO_new(method)
    self.openssl_assert(raw_bio != self._ffi.NULL)
    return self._ffi.gc(raw_bio, lib.BIO_free)

647 

def _read_mem_bio(self, bio) -> bytes:
    """
    Copy out the contents of a memory BIO. Only valid for memory BIOs;
    asserts that the BIO holds at least one byte.
    """
    data_ptr = self._ffi.new("char **")
    size = self._lib.BIO_get_mem_data(bio, data_ptr)
    self.openssl_assert(size > 0)
    self.openssl_assert(data_ptr[0] != self._ffi.NULL)
    return self._ffi.buffer(data_ptr[0], size)[:]

658 

def _evp_pkey_to_private_key(
    self, evp_pkey, unsafe_skip_rsa_key_validation: bool
) -> PrivateKeyTypes:
    """
    Wrap an evp_pkey cdata pointer in the private key class that matches
    its key type.

    Raises UnsupportedAlgorithm for key types not handled here.
    """
    lib = self._lib
    ffi = self._ffi
    key_type = lib.EVP_PKEY_id(evp_pkey)

    if key_type == lib.EVP_PKEY_RSA:
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        return _RSAPrivateKey(
            self,
            rsa_cdata,
            evp_pkey,
            unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
        )

    if key_type == lib.EVP_PKEY_RSA_PSS and not (
        lib.CRYPTOGRAPHY_IS_LIBRESSL
        or lib.CRYPTOGRAPHY_IS_BORINGSSL
        or lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
    ):
        # RSA PSS keys are currently handled by dropping their PSS
        # constraints and treating them as ordinary RSA keys. The RSA *
        # itself carries that data, so the key is extracted, serialized
        # and loaded again without the constraints.
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        bio = self._create_mem_bio_gc()
        written = lib.i2d_RSAPrivateKey_bio(bio, rsa_cdata)
        self.openssl_assert(written == 1)
        return self.load_der_private_key(
            self._read_mem_bio(bio),
            password=None,
            unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
        )

    if key_type == lib.EVP_PKEY_DSA:
        dsa_cdata = lib.EVP_PKEY_get1_DSA(evp_pkey)
        self.openssl_assert(dsa_cdata != ffi.NULL)
        dsa_cdata = ffi.gc(dsa_cdata, lib.DSA_free)
        return _DSAPrivateKey(self, dsa_cdata, evp_pkey)

    if key_type == lib.EVP_PKEY_EC:
        ec_cdata = lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
        self.openssl_assert(ec_cdata != ffi.NULL)
        ec_cdata = ffi.gc(ec_cdata, lib.EC_KEY_free)
        return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)

    if key_type in self._dh_types:
        dh_cdata = lib.EVP_PKEY_get1_DH(evp_pkey)
        self.openssl_assert(dh_cdata != ffi.NULL)
        dh_cdata = ffi.gc(dh_cdata, lib.DH_free)
        return _DHPrivateKey(self, dh_cdata, evp_pkey)

    # The ED25519, X448 and ED448 constants are fetched with getattr
    # because they are not present on LibreSSL.
    if key_type == getattr(lib, "EVP_PKEY_ED25519", None):
        return _Ed25519PrivateKey(self, evp_pkey)
    if key_type == getattr(lib, "EVP_PKEY_X448", None):
        return _X448PrivateKey(self, evp_pkey)
    if key_type == lib.EVP_PKEY_X25519:
        return _X25519PrivateKey(self, evp_pkey)
    if key_type == getattr(lib, "EVP_PKEY_ED448", None):
        return _Ed448PrivateKey(self, evp_pkey)

    raise UnsupportedAlgorithm("Unsupported key type.")

728 

def _evp_pkey_to_public_key(self, evp_pkey) -> PublicKeyTypes:
    """
    Wrap an evp_pkey cdata pointer in the public key class that matches
    its key type.

    Raises ValueError when an EC key cannot be extracted, and
    UnsupportedAlgorithm for key types not handled here.
    """
    lib = self._lib
    ffi = self._ffi
    key_type = lib.EVP_PKEY_id(evp_pkey)

    if key_type == lib.EVP_PKEY_RSA:
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        return _RSAPublicKey(self, rsa_cdata, evp_pkey)

    if key_type == lib.EVP_PKEY_RSA_PSS and not (
        lib.CRYPTOGRAPHY_IS_LIBRESSL
        or lib.CRYPTOGRAPHY_IS_BORINGSSL
        or lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
    ):
        # Serialize the RSA key and load it again through the DER loader.
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        bio = self._create_mem_bio_gc()
        written = lib.i2d_RSAPublicKey_bio(bio, rsa_cdata)
        self.openssl_assert(written == 1)
        return self.load_der_public_key(self._read_mem_bio(bio))

    if key_type == lib.EVP_PKEY_DSA:
        dsa_cdata = lib.EVP_PKEY_get1_DSA(evp_pkey)
        self.openssl_assert(dsa_cdata != ffi.NULL)
        dsa_cdata = ffi.gc(dsa_cdata, lib.DSA_free)
        return _DSAPublicKey(self, dsa_cdata, evp_pkey)

    if key_type == lib.EVP_PKEY_EC:
        ec_cdata = lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
        if ec_cdata == ffi.NULL:
            # Reported as ValueError with the queued OpenSSL errors
            # attached, rather than as an internal assertion failure.
            errors = self._consume_errors_with_text()
            raise ValueError("Unable to load EC key", errors)
        ec_cdata = ffi.gc(ec_cdata, lib.EC_KEY_free)
        return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)

    if key_type in self._dh_types:
        dh_cdata = lib.EVP_PKEY_get1_DH(evp_pkey)
        self.openssl_assert(dh_cdata != ffi.NULL)
        dh_cdata = ffi.gc(dh_cdata, lib.DH_free)
        return _DHPublicKey(self, dh_cdata, evp_pkey)

    # The ED25519, X448 and ED448 constants are fetched with getattr
    # because they are not present on LibreSSL.
    if key_type == getattr(lib, "EVP_PKEY_ED25519", None):
        return _Ed25519PublicKey(self, evp_pkey)
    if key_type == getattr(lib, "EVP_PKEY_X448", None):
        return _X448PublicKey(self, evp_pkey)
    if key_type == lib.EVP_PKEY_X25519:
        return _X25519PublicKey(self, evp_pkey)
    if key_type == getattr(lib, "EVP_PKEY_ED448", None):
        return _Ed448PublicKey(self, evp_pkey)

    raise UnsupportedAlgorithm("Unsupported key type.")

785 

def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether ``algorithm`` is accepted for OAEP: SHA1 (unless FIPS
    is enabled) or one of SHA224/256/384/512.
    """
    if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
        return False

    accepted = (
        hashes.SHA1,
        hashes.SHA224,
        hashes.SHA256,
        hashes.SHA384,
        hashes.SHA512,
    )
    return isinstance(algorithm, accepted)

800 

def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool:
    """
    Report whether ``padding`` can be used with RSA: PKCS1v15 always,
    PSS and OAEP only with MGF1 and supported hashes, nothing else.
    """
    if isinstance(padding, PKCS1v15):
        return True

    if isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
        mgf_hash = padding._mgf._algorithm
        # In FIPS mode SHA1 stays permissible inside MGF1 even though it
        # is blocked as a signature algorithm.
        if self._fips_enabled and isinstance(mgf_hash, hashes.SHA1):
            return True
        return self.hash_supported(mgf_hash)

    if isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
        return self._oaep_hash_supported(
            padding._mgf._algorithm
        ) and self._oaep_hash_supported(padding._algorithm)

    return False

819 

def rsa_encryption_supported(self, padding: AsymmetricPadding) -> bool:
    """
    Same answer as ``rsa_padding_supported``, except that PKCS1v15 is
    refused when FIPS is enabled.
    """
    pkcs1_under_fips = self._fips_enabled and isinstance(padding, PKCS1v15)
    if pkcs1_under_fips:
        return False
    return self.rsa_padding_supported(padding)

825 

def generate_dsa_parameters(self, key_size: int) -> dsa.DSAParameters:
    """
    Generate DSA parameters for ``key_size`` bits.

    Raises ValueError unless ``key_size`` is 1024, 2048, 3072 or 4096.
    """
    allowed_sizes = (1024, 2048, 3072, 4096)
    if key_size not in allowed_sizes:
        raise ValueError(
            "Key size must be 1024, 2048, 3072, or 4096 bits."
        )

    lib, null = self._lib, self._ffi.NULL

    params = lib.DSA_new()
    self.openssl_assert(params != null)
    params = self._ffi.gc(params, lib.DSA_free)

    generated = lib.DSA_generate_parameters_ex(
        params, key_size, null, 0, null, null, null
    )
    self.openssl_assert(generated == 1)

    return _DSAParameters(self, params)

849 

def generate_dsa_private_key(
    self, parameters: dsa.DSAParameters
) -> dsa.DSAPrivateKey:
    """
    Generate a DSA private key from a copy of ``parameters``.

    The parameters are duplicated so the caller's object is left
    untouched. A failure reported by ``DSA_generate_key`` now trips
    ``openssl_assert`` instead of being ignored.
    """
    ctx = self._lib.DSAparams_dup(
        parameters._dsa_cdata  # type: ignore[attr-defined]
    )
    self.openssl_assert(ctx != self._ffi.NULL)
    ctx = self._ffi.gc(ctx, self._lib.DSA_free)
    # Check the result like every other generator in this class does;
    # otherwise a DSA struct without key material would be wrapped below.
    res = self._lib.DSA_generate_key(ctx)
    self.openssl_assert(res == 1)
    evp_pkey = self._dsa_cdata_to_evp_pkey(ctx)

    return _DSAPrivateKey(self, ctx, evp_pkey)

862 

def generate_dsa_private_key_and_parameters(
    self, key_size: int
) -> dsa.DSAPrivateKey:
    """
    Generate fresh DSA parameters for ``key_size`` and then a private key
    from them.
    """
    fresh_parameters = self.generate_dsa_parameters(key_size)
    private_key = self.generate_dsa_private_key(fresh_parameters)
    return private_key

868 

def _dsa_cdata_set_values(
    self, dsa_cdata, p, q, g, pub_key, priv_key
) -> None:
    """
    Store the domain parameters and then the key pair on ``dsa_cdata``,
    asserting that each OpenSSL call succeeds.
    """
    lib = self._lib
    self.openssl_assert(lib.DSA_set0_pqg(dsa_cdata, p, q, g) == 1)
    self.openssl_assert(lib.DSA_set0_key(dsa_cdata, pub_key, priv_key) == 1)

876 

def load_dsa_private_numbers(
    self, numbers: dsa.DSAPrivateNumbers
) -> dsa.DSAPrivateKey:
    """
    Build a DSA private key from its numbers after checking them with
    ``dsa._check_dsa_private_numbers``.
    """
    dsa._check_dsa_private_numbers(numbers)
    public = numbers.public_numbers
    domain = public.parameter_numbers

    key = self._lib.DSA_new()
    self.openssl_assert(key != self._ffi.NULL)
    key = self._ffi.gc(key, self._lib.DSA_free)

    to_bn = self._int_to_bn
    p = to_bn(domain.p)
    q = to_bn(domain.q)
    g = to_bn(domain.g)
    y = to_bn(public.y)
    x = to_bn(numbers.x)
    self._dsa_cdata_set_values(key, p, q, g, y, x)

    pkey = self._dsa_cdata_to_evp_pkey(key)

    return _DSAPrivateKey(self, key, pkey)

897 

def load_dsa_public_numbers(
    self, numbers: dsa.DSAPublicNumbers
) -> dsa.DSAPublicKey:
    """
    Build a DSA public key from its numbers after checking the domain
    parameters with ``dsa._check_dsa_parameters``.
    """
    domain = numbers.parameter_numbers
    dsa._check_dsa_parameters(domain)

    key = self._lib.DSA_new()
    self.openssl_assert(key != self._ffi.NULL)
    key = self._ffi.gc(key, self._lib.DSA_free)

    to_bn = self._int_to_bn
    p = to_bn(domain.p)
    q = to_bn(domain.q)
    g = to_bn(domain.g)
    y = to_bn(numbers.y)
    # A public key carries no private component, hence the NULL.
    self._dsa_cdata_set_values(key, p, q, g, y, self._ffi.NULL)

    pkey = self._dsa_cdata_to_evp_pkey(key)

    return _DSAPublicKey(self, key, pkey)

916 

def load_dsa_parameter_numbers(
    self, numbers: dsa.DSAParameterNumbers
) -> dsa.DSAParameters:
    """Build a DSA parameters object from the integers p, q and g."""
    dsa._check_dsa_parameters(numbers)

    raw_dsa = self._lib.DSA_new()
    self.openssl_assert(raw_dsa != self._ffi.NULL)
    dsa_params = self._ffi.gc(raw_dsa, self._lib.DSA_free)

    bn_p = self._int_to_bn(numbers.p)
    bn_q = self._int_to_bn(numbers.q)
    bn_g = self._int_to_bn(numbers.g)
    status = self._lib.DSA_set0_pqg(dsa_params, bn_p, bn_q, bn_g)
    self.openssl_assert(status == 1)

    return _DSAParameters(self, dsa_params)

932 

def _dsa_cdata_to_evp_pkey(self, dsa_cdata):
    """Attach ``dsa_cdata`` to an EVP_PKEY from ``_create_evp_pkey_gc``."""
    pkey = self._create_evp_pkey_gc()
    status = self._lib.EVP_PKEY_set1_DSA(pkey, dsa_cdata)
    self.openssl_assert(status == 1)
    return pkey

938 

def dsa_supported(self) -> bool:
    """Return True unless the backend has FIPS mode enabled."""
    if self._fips_enabled:
        return False
    return True

941 

def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """Return whether ``algorithm`` can be used as a DSA signature hash.

    Always False when DSA itself is unsupported; otherwise the answer is
    delegated to ``signature_hash_supported``.
    """
    return (
        self.signature_hash_supported(algorithm)
        if self.dsa_supported()
        else False
    )

946 

def cmac_algorithm_supported(self, algorithm) -> bool:
    """Return whether ``algorithm`` is supported for CMAC.

    Support is decided by asking ``cipher_supported`` about the algorithm
    in CBC mode with an all-zero IV of ``algorithm.block_size`` bytes.
    """
    zero_iv = b"\x00" * algorithm.block_size
    return self.cipher_supported(algorithm, CBC(zero_iv))

951 

def create_cmac_ctx(self, algorithm: BlockCipherAlgorithm) -> _CMACContext:
    """Create a CMAC context bound to this backend for ``algorithm``."""
    cmac_ctx = _CMACContext(self, algorithm)
    return cmac_ctx

954 

def load_pem_private_key(
    self,
    data: bytes,
    password: typing.Optional[bytes],
    unsafe_skip_rsa_key_validation: bool,
) -> PrivateKeyTypes:
    """Load a PEM private key by delegating to ``_load_key``.

    ``PEM_read_bio_PrivateKey`` is used as the OpenSSL read function.
    """
    pem_reader = self._lib.PEM_read_bio_PrivateKey
    return self._load_key(
        pem_reader, data, password, unsafe_skip_rsa_key_validation
    )

967 

def load_pem_public_key(self, data: bytes) -> PublicKeyTypes:
    """Deserialize a PEM encoded public key.

    The data is first read as a subjectPublicKeyInfo. If that fails it is
    read again as a bare PKCS1 RSA public key. When neither read yields a
    key, ``_handle_key_loading_error`` raises.
    """
    null = self._ffi.NULL
    mem_bio = self._bytes_to_bio(data)
    # In OpenSSL 3.0.x PEM_read_bio_PUBKEY will invoke the default password
    # callback if it is handed an encrypted private key. The default
    # callback can trigger an interactive console prompt, which would hang
    # the Python process, so our own callback is supplied to catch this
    # and error out properly.
    userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
    password_cb = self._ffi.addressof(
        self._lib._original_lib, "Cryptography_pem_password_cb"
    )

    evp_pkey = self._lib.PEM_read_bio_PUBKEY(
        mem_bio.bio, null, password_cb, userdata
    )
    if evp_pkey != null:
        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
        return self._evp_pkey_to_public_key(evp_pkey)

    # Not a (RSA/DSA/ECDSA) subjectPublicKeyInfo; it may still be a pure
    # PKCS1 RSA public key (not embedded in a subjectPublicKeyInfo), so
    # rewind the BIO and try that reader.
    self._consume_errors()
    reset_status = self._lib.BIO_reset(mem_bio.bio)
    self.openssl_assert(reset_status == 1)
    rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey(
        mem_bio.bio, null, password_cb, userdata
    )
    if rsa_cdata == null:
        self._handle_key_loading_error()

    rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
    evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
    return _RSAPublicKey(self, rsa_cdata, evp_pkey)

1009 

def load_pem_parameters(self, data: bytes) -> dh.DHParameters:
    """Deserialize PEM encoded DH parameters."""
    null = self._ffi.NULL
    mem_bio = self._bytes_to_bio(data)
    # only DH is supported currently
    dh_cdata = self._lib.PEM_read_bio_DHparams(mem_bio.bio, null, null, null)
    if dh_cdata == null:
        self._handle_key_loading_error()

    dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
    return _DHParameters(self, dh_cdata)

1021 

def load_der_private_key(
    self,
    data: bytes,
    password: typing.Optional[bytes],
    unsafe_skip_rsa_key_validation: bool,
) -> PrivateKeyTypes:
    """Deserialize a DER encoded private key.

    OpenSSL has a function called d2i_AutoPrivateKey that in theory handles
    this automatically, however it doesn't handle encrypted private keys.
    Instead the key is loaded two different ways: first as a traditional
    key, and if that yields nothing, through the reader that handles
    encrypted PKCS8 properly.
    """
    bio_data = self._bytes_to_bio(data)
    traditional = self._evp_pkey_from_der_traditional_key(bio_data, password)
    if not traditional:
        return self._load_key(
            self._lib.d2i_PKCS8PrivateKey_bio,
            data,
            password,
            unsafe_skip_rsa_key_validation,
        )

    return self._evp_pkey_to_private_key(
        traditional, unsafe_skip_rsa_key_validation
    )

1047 

def _evp_pkey_from_der_traditional_key(self, bio_data, password):
    """Try to read ``bio_data`` with ``d2i_PrivateKey_bio``.

    Returns the EVP_PKEY on success, or None after clearing the OpenSSL
    errors when the read fails. Raises TypeError if the read succeeds but
    a password was supplied.
    """
    pkey = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL)
    if pkey == self._ffi.NULL:
        self._consume_errors()
        return None

    pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    if password is not None:
        raise TypeError(
            "Password was given but private key is not encrypted."
        )
    return pkey

1061 

def load_der_public_key(self, data: bytes) -> PublicKeyTypes:
    """Deserialize a DER encoded public key.

    The data is first read as a subjectPublicKeyInfo. If that fails it is
    read again as a bare PKCS1 RSA public key. When neither read yields a
    key, ``_handle_key_loading_error`` raises.
    """
    null = self._ffi.NULL
    mem_bio = self._bytes_to_bio(data)
    evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, null)
    if evp_pkey != null:
        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
        return self._evp_pkey_to_public_key(evp_pkey)

    # Not a (RSA/DSA/ECDSA) subjectPublicKeyInfo; it may still be a pure
    # PKCS1 RSA public key (not embedded in a subjectPublicKeyInfo), so
    # rewind the BIO and try that reader.
    self._consume_errors()
    reset_status = self._lib.BIO_reset(mem_bio.bio)
    self.openssl_assert(reset_status == 1)
    rsa_cdata = self._lib.d2i_RSAPublicKey_bio(mem_bio.bio, null)
    if rsa_cdata == null:
        self._handle_key_loading_error()

    rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
    evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
    return _RSAPublicKey(self, rsa_cdata, evp_pkey)

1084 

def load_der_parameters(self, data: bytes) -> dh.DHParameters:
    """Deserialize DER encoded DH parameters.

    Plain DH parameters are tried first. When the library reports
    ``Cryptography_HAS_EVP_PKEY_DHX``, the DHX encoding is tried as well.
    """
    null = self._ffi.NULL
    mem_bio = self._bytes_to_bio(data)
    dh_cdata = self._lib.d2i_DHparams_bio(mem_bio.bio, null)
    if dh_cdata != null:
        dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
        return _DHParameters(self, dh_cdata)

    if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
        # Check whether the data is in the DHX format instead.
        self._consume_errors()
        reset_status = self._lib.BIO_reset(mem_bio.bio)
        self.openssl_assert(reset_status == 1)
        dhx_cdata = self._lib.d2i_DHxparams_bio(mem_bio.bio, null)
        if dhx_cdata != null:
            dhx_cdata = self._ffi.gc(dhx_cdata, self._lib.DH_free)
            return _DHParameters(self, dhx_cdata)

    self._handle_key_loading_error()

1102 

def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
    """Convert ``cert`` to an OpenSSL X509 pointer via its DER encoding."""
    der_bytes = cert.public_bytes(serialization.Encoding.DER)
    mem_bio = self._bytes_to_bio(der_bytes)
    ossl_cert = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
    self.openssl_assert(ossl_cert != self._ffi.NULL)
    return self._ffi.gc(ossl_cert, self._lib.X509_free)

1110 

def _ossl2cert(self, x509_ptr: typing.Any) -> x509.Certificate:
    """Convert an OpenSSL X509 pointer to a certificate via DER."""
    out_bio = self._create_mem_bio_gc()
    status = self._lib.i2d_X509_bio(out_bio, x509_ptr)
    self.openssl_assert(status == 1)
    der_bytes = self._read_mem_bio(out_bio)
    return x509.load_der_x509_certificate(der_bytes)

1116 

def _check_keys_correspond(self, key1, key2) -> None:
    """Raise ValueError unless ``EVP_PKEY_cmp`` reports the keys as equal."""
    cmp_result = self._lib.EVP_PKEY_cmp(key1._evp_pkey, key2._evp_pkey)
    if cmp_result == 1:
        return
    raise ValueError("Keys do not correspond")

1120 

def _load_key(
    self, openssl_read_func, data, password, unsafe_skip_rsa_key_validation
) -> PrivateKeyTypes:
    """Read a private key from ``data`` with ``openssl_read_func``.

    The read function is given the backend's password callback together
    with a ``CRYPTOGRAPHY_PASSWORD_DATA`` structure. The ``error`` and
    ``called`` fields of that structure are inspected afterwards to tell
    apart a missing password, an over-long password and a password that
    was supplied but never requested.

    Raises TypeError when a password is required but missing, or given but
    unused. Raises ValueError when the password is too long or the key
    cannot be deserialized.
    """
    null = self._ffi.NULL
    mem_bio = self._bytes_to_bio(data)

    userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
    if password is not None:
        utils._check_byteslike("password", password)
        # NOTE(review): the buffer is kept in a named local, presumably so
        # it stays alive while userdata points at it - confirm against the
        # cffi from_buffer documentation.
        password_buf = self._ffi.from_buffer(password)
        userdata.password = password_buf
        userdata.length = len(password)

    password_cb = self._ffi.addressof(
        self._lib._original_lib, "Cryptography_pem_password_cb"
    )
    evp_pkey = openssl_read_func(mem_bio.bio, null, password_cb, userdata)

    if evp_pkey == null:
        if userdata.error == 0:
            # The callback did not flag a problem, so the failure comes
            # from the key data itself. This call always raises.
            self._handle_key_loading_error()

        self._consume_errors()
        if userdata.error == -1:
            raise TypeError(
                "Password was not given but private key is encrypted"
            )
        assert userdata.error == -2
        raise ValueError(
            "Passwords longer than {} bytes are not supported "
            "by this backend.".format(userdata.maxsize - 1)
        )

    evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)

    if password is not None and userdata.called == 0:
        raise TypeError(
            "Password was given but private key is not encrypted."
        )

    assert password is None or userdata.called == 1

    return self._evp_pkey_to_private_key(
        evp_pkey, unsafe_skip_rsa_key_validation
    )

1172 

def _handle_key_loading_error(self) -> typing.NoReturn:
    """Turn the pending OpenSSL errors into a ValueError; always raises.

    The message depends on the consumed error queue: a generic message for
    an empty queue, a bad-decrypt message when the first error is a decrypt
    failure, an unsupported-algorithm message when any error reports one,
    and otherwise a generic message carrying the errors with their text.
    """
    errors = self._consume_errors()

    if not errors:
        raise ValueError(
            "Could not deserialize key data. The data may be in an "
            "incorrect format or it may be encrypted with an unsupported "
            "algorithm."
        )

    first = errors[0]
    is_bad_decrypt = (
        first._lib_reason_match(
            self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
        )
        or first._lib_reason_match(
            self._lib.ERR_LIB_PKCS12,
            self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
        )
        or (
            # The provider constants are only touched when the library
            # reports provider support.
            self._lib.Cryptography_HAS_PROVIDERS
            and first._lib_reason_match(
                self._lib.ERR_LIB_PROV,
                self._lib.PROV_R_BAD_DECRYPT,
            )
        )
    )
    if is_bad_decrypt:
        raise ValueError("Bad decrypt. Incorrect password?")

    for error in errors:
        if error._lib_reason_match(
            self._lib.ERR_LIB_EVP,
            self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
        ):
            raise ValueError("Unsupported public key algorithm.")

    errors_with_text = binding._errors_with_text(errors)
    raise ValueError(
        "Could not deserialize key data. The data may be in an "
        "incorrect format, it may be encrypted with an unsupported "
        "algorithm, or it may be an unsupported key type (e.g. EC "
        "curves with explicit parameters).",
        errors_with_text,
    )

1219 

def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
    """Return whether OpenSSL can create a group for ``curve``.

    A curve whose name has no NID is looked up as ``NID_undef``, for which
    group creation is expected to fail.
    """
    try:
        nid = self._elliptic_curve_to_nid(curve)
    except UnsupportedAlgorithm:
        nid = self._lib.NID_undef

    group = self._lib.EC_GROUP_new_by_curve_name(nid)
    if group == self._ffi.NULL:
        self._consume_errors()
        return False

    self.openssl_assert(nid != self._lib.NID_undef)
    self._lib.EC_GROUP_free(group)
    return True

1235 

def elliptic_curve_signature_algorithm_supported(
    self,
    signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
    curve: ec.EllipticCurve,
) -> bool:
    """Return whether the signature algorithm can be used with ``curve``."""
    # We only support ECDSA right now.
    is_ecdsa = isinstance(signature_algorithm, ec.ECDSA)
    return self.elliptic_curve_supported(curve) if is_ecdsa else False

1246 

def generate_elliptic_curve_private_key(
    self, curve: ec.EllipticCurve
) -> ec.EllipticCurvePrivateKey:
    """
    Generate a new private key on the named curve.

    Raises UnsupportedAlgorithm when the curve is not supported.
    """
    if not self.elliptic_curve_supported(curve):
        raise UnsupportedAlgorithm(
            f"Backend object does not support {curve.name}.",
            _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
        )

    ec_key = self._ec_key_new_by_curve(curve)

    status = self._lib.EC_KEY_generate_key(ec_key)
    self.openssl_assert(status == 1)

    pkey = self._ec_cdata_to_evp_pkey(ec_key)
    return _EllipticCurvePrivateKey(self, ec_key, pkey)

1268 

def load_elliptic_curve_private_numbers(
    self, numbers: ec.EllipticCurvePrivateNumbers
) -> ec.EllipticCurvePrivateKey:
    """Build an EC private key from its private value and public point.

    The public point supplied in ``numbers`` is compared with the point
    derived from the private value.

    Raises ValueError when the private value or the coordinates are
    rejected, or when the supplied and derived public points differ.
    """
    public = numbers.public_numbers

    ec_cdata = self._ec_key_new_by_curve(public.curve)

    private_value = self._ffi.gc(
        self._int_to_bn(numbers.private_value), self._lib.BN_clear_free
    )
    res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value)
    if res != 1:
        self._consume_errors()
        raise ValueError("Invalid EC key.")

    with self._tmp_bn_ctx() as bn_ctx:
        self._ec_key_set_public_key_affine_coordinates(
            ec_cdata, public.x, public.y, bn_ctx
        )
        # derive the expected public point and compare it to the one we
        # just set based on the values we were given. If they don't match
        # this isn't a valid key pair.
        group = self._lib.EC_KEY_get0_group(ec_cdata)
        self.openssl_assert(group != self._ffi.NULL)
        # Use this instance's library handle, like every other call here,
        # rather than reaching through the module-level ``backend`` name.
        set_point = self._lib.EC_KEY_get0_public_key(ec_cdata)
        self.openssl_assert(set_point != self._ffi.NULL)
        computed_point = self._lib.EC_POINT_new(group)
        self.openssl_assert(computed_point != self._ffi.NULL)
        computed_point = self._ffi.gc(
            computed_point, self._lib.EC_POINT_free
        )
        res = self._lib.EC_POINT_mul(
            group,
            computed_point,
            private_value,
            self._ffi.NULL,
            self._ffi.NULL,
            bn_ctx,
        )
        self.openssl_assert(res == 1)
        if (
            self._lib.EC_POINT_cmp(
                group, set_point, computed_point, bn_ctx
            )
            != 0
        ):
            raise ValueError("Invalid EC key.")

    evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)

    return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)

1320 

def load_elliptic_curve_public_numbers(
    self, numbers: ec.EllipticCurvePublicNumbers
) -> ec.EllipticCurvePublicKey:
    """Build an EC public key from affine coordinates on ``numbers.curve``."""
    ec_key = self._ec_key_new_by_curve(numbers.curve)
    with self._tmp_bn_ctx() as bn_ctx:
        self._ec_key_set_public_key_affine_coordinates(
            ec_key, numbers.x, numbers.y, bn_ctx
        )
    pkey = self._ec_cdata_to_evp_pkey(ec_key)
    return _EllipticCurvePublicKey(self, ec_key, pkey)

1332 

def load_elliptic_curve_public_bytes(
    self, curve: ec.EllipticCurve, point_bytes: bytes
) -> ec.EllipticCurvePublicKey:
    """Build an EC public key from an encoded point on ``curve``.

    Raises ValueError when ``EC_POINT_oct2point`` rejects the bytes.
    """
    null = self._ffi.NULL
    ec_key = self._ec_key_new_by_curve(curve)
    group = self._lib.EC_KEY_get0_group(ec_key)
    self.openssl_assert(group != null)

    raw_point = self._lib.EC_POINT_new(group)
    self.openssl_assert(raw_point != null)
    public_point = self._ffi.gc(raw_point, self._lib.EC_POINT_free)

    with self._tmp_bn_ctx() as bn_ctx:
        decode_status = self._lib.EC_POINT_oct2point(
            group, public_point, point_bytes, len(point_bytes), bn_ctx
        )
        if decode_status != 1:
            self._consume_errors()
            raise ValueError("Invalid public bytes for the given curve")

    set_status = self._lib.EC_KEY_set_public_key(ec_key, public_point)
    self.openssl_assert(set_status == 1)
    pkey = self._ec_cdata_to_evp_pkey(ec_key)
    return _EllipticCurvePublicKey(self, ec_key, pkey)

1354 

def derive_elliptic_curve_private_key(
    self, private_value: int, curve: ec.EllipticCurve
) -> ec.EllipticCurvePrivateKey:
    """Build an EC private key on ``curve`` from a private integer.

    The public point is computed with ``EC_POINT_mul`` from the private
    value and set on the key alongside the private value itself.

    Raises ValueError when affine coordinates cannot be obtained for the
    computed point.
    """
    null = self._ffi.NULL
    ec_key = self._ec_key_new_by_curve(curve)

    group = self._lib.EC_KEY_get0_group(ec_key)
    self.openssl_assert(group != null)

    raw_point = self._lib.EC_POINT_new(group)
    self.openssl_assert(raw_point != null)
    public_point = self._ffi.gc(raw_point, self._lib.EC_POINT_free)

    scalar = self._ffi.gc(
        self._int_to_bn(private_value), self._lib.BN_clear_free
    )

    with self._tmp_bn_ctx() as bn_ctx:
        mul_status = self._lib.EC_POINT_mul(
            group, public_point, scalar, null, null, bn_ctx
        )
        self.openssl_assert(mul_status == 1)

        bn_x = self._lib.BN_CTX_get(bn_ctx)
        bn_y = self._lib.BN_CTX_get(bn_ctx)

        coords_status = self._lib.EC_POINT_get_affine_coordinates(
            group, public_point, bn_x, bn_y, bn_ctx
        )
        if coords_status != 1:
            self._consume_errors()
            raise ValueError("Unable to derive key from private_value")

    pub_status = self._lib.EC_KEY_set_public_key(ec_key, public_point)
    self.openssl_assert(pub_status == 1)

    private_bn = self._ffi.gc(
        self._int_to_bn(private_value), self._lib.BN_clear_free
    )
    priv_status = self._lib.EC_KEY_set_private_key(ec_key, private_bn)
    self.openssl_assert(priv_status == 1)

    pkey = self._ec_cdata_to_evp_pkey(ec_key)
    return _EllipticCurvePrivateKey(self, ec_key, pkey)

1396 

def _ec_key_new_by_curve(self, curve: ec.EllipticCurve):
    """Create an EC_KEY for ``curve`` by resolving its NID first."""
    return self._ec_key_new_by_curve_nid(self._elliptic_curve_to_nid(curve))

1400 

def _ec_key_new_by_curve_nid(self, curve_nid: int):
    """Create an EC_KEY for ``curve_nid``, freed with ``EC_KEY_free``."""
    raw_key = self._lib.EC_KEY_new_by_curve_name(curve_nid)
    self.openssl_assert(raw_key != self._ffi.NULL)
    ec_key = self._ffi.gc(raw_key, self._lib.EC_KEY_free)
    return ec_key

1405 

def elliptic_curve_exchange_algorithm_supported(
    self, algorithm: ec.ECDH, curve: ec.EllipticCurve
) -> bool:
    """Return whether ``algorithm`` can be used for exchange on ``curve``.

    In FIPS mode the curve must also be one of ``_fips_ecdh_curves``.
    """
    if self._fips_enabled:
        if not isinstance(curve, self._fips_ecdh_curves):
            return False

    curve_ok = self.elliptic_curve_supported(curve)
    return curve_ok and isinstance(algorithm, ec.ECDH)

1417 

def _ec_cdata_to_evp_pkey(self, ec_cdata):
    """Attach ``ec_cdata`` to an EVP_PKEY from ``_create_evp_pkey_gc``."""
    pkey = self._create_evp_pkey_gc()
    status = self._lib.EVP_PKEY_set1_EC_KEY(pkey, ec_cdata)
    self.openssl_assert(status == 1)
    return pkey

1423 

def _elliptic_curve_to_nid(self, curve: ec.EllipticCurve) -> int:
    """
    Get the NID for a curve name.

    Two curve names are translated to an alias before the lookup. Raises
    UnsupportedAlgorithm when the lookup yields ``NID_undef``.
    """
    aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"}
    lookup_name = aliases.get(curve.name, curve.name)

    nid = self._lib.OBJ_sn2nid(lookup_name.encode())
    if nid != self._lib.NID_undef:
        return nid

    raise UnsupportedAlgorithm(
        f"{curve.name} is not a supported elliptic curve",
        _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
    )

1440 

@contextmanager
def _tmp_bn_ctx(self):
    """Yield a new BN_CTX, bracketed by BN_CTX_start and BN_CTX_end."""
    raw_ctx = self._lib.BN_CTX_new()
    self.openssl_assert(raw_ctx != self._ffi.NULL)
    ctx = self._ffi.gc(raw_ctx, self._lib.BN_CTX_free)
    self._lib.BN_CTX_start(ctx)
    try:
        yield ctx
    finally:
        # Runs even when the with-body raises.
        self._lib.BN_CTX_end(ctx)

1451 

def _ec_key_set_public_key_affine_coordinates(
    self,
    ec_cdata,
    x: int,
    y: int,
    bn_ctx,
) -> None:
    """
    Sets the public key point in the EC_KEY context to the affine x and y
    values.

    Raises ValueError when a coordinate is negative or when OpenSSL
    rejects the coordinates.
    """
    if x < 0 or y < 0:
        raise ValueError(
            "Invalid EC key. Both x and y must be non-negative."
        )

    bn_x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free)
    bn_y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free)

    group = self._lib.EC_KEY_get0_group(ec_cdata)
    self.openssl_assert(group != self._ffi.NULL)

    raw_point = self._lib.EC_POINT_new(group)
    self.openssl_assert(raw_point != self._ffi.NULL)
    public_point = self._ffi.gc(raw_point, self._lib.EC_POINT_free)

    coords_status = self._lib.EC_POINT_set_affine_coordinates(
        group, public_point, bn_x, bn_y, bn_ctx
    )
    if coords_status != 1:
        self._consume_errors()
        raise ValueError("Invalid EC key.")

    set_status = self._lib.EC_KEY_set_public_key(ec_cdata, public_point)
    self.openssl_assert(set_status == 1)

1484 

def _private_key_bytes(
    self,
    encoding: serialization.Encoding,
    format: serialization.PrivateFormat,
    encryption_algorithm: serialization.KeySerializationEncryption,
    key,
    evp_pkey,
    cdata,
) -> bytes:
    """Serialize a private key for the given encoding, format and encryption.

    Handles PKCS8 (PEM/DER), TraditionalOpenSSL (PEM/DER) and OpenSSH
    (PEM). Any other format is rejected here.

    Raises TypeError for arguments of the wrong type and ValueError for
    unsupported combinations.
    """
    # validate argument types
    if not isinstance(encoding, serialization.Encoding):
        raise TypeError("encoding must be an item from the Encoding enum")
    if not isinstance(format, serialization.PrivateFormat):
        raise TypeError(
            "format must be an item from the PrivateFormat enum"
        )
    if not isinstance(
        encryption_algorithm, serialization.KeySerializationEncryption
    ):
        raise TypeError(
            "Encryption algorithm must be a KeySerializationEncryption "
            "instance"
        )

    # validate password
    unencrypted = isinstance(encryption_algorithm, serialization.NoEncryption)
    if unencrypted:
        password = b""
    elif isinstance(
        encryption_algorithm, serialization.BestAvailableEncryption
    ):
        password = encryption_algorithm.password
        if len(password) > 1023:
            raise ValueError(
                "Passwords longer than 1023 bytes are not supported by "
                "this backend"
            )
    elif (
        isinstance(
            encryption_algorithm, serialization._KeySerializationEncryption
        )
        and encryption_algorithm._format is format
        and format is serialization.PrivateFormat.OpenSSH
    ):
        password = encryption_algorithm.password
    else:
        raise ValueError("Unsupported encryption type")

    # PKCS8 + PEM/DER
    if format is serialization.PrivateFormat.PKCS8:
        if encoding is serialization.Encoding.PEM:
            writer = self._lib.PEM_write_bio_PKCS8PrivateKey
        elif encoding is serialization.Encoding.DER:
            writer = self._lib.i2d_PKCS8PrivateKey_bio
        else:
            raise ValueError("Unsupported encoding for PKCS8")
        return self._private_key_bytes_via_bio(writer, evp_pkey, password)

    # TraditionalOpenSSL + PEM/DER
    if format is serialization.PrivateFormat.TraditionalOpenSSL:
        if self._fips_enabled and not unencrypted:
            raise ValueError(
                "Encrypted traditional OpenSSL format is not "
                "supported in FIPS mode."
            )
        key_type = self._lib.EVP_PKEY_id(evp_pkey)
        bad_key_type = "Unsupported key type for TraditionalOpenSSL"

        if encoding is serialization.Encoding.PEM:
            if key_type == self._lib.EVP_PKEY_RSA:
                writer = self._lib.PEM_write_bio_RSAPrivateKey
            elif key_type == self._lib.EVP_PKEY_DSA:
                writer = self._lib.PEM_write_bio_DSAPrivateKey
            elif key_type == self._lib.EVP_PKEY_EC:
                writer = self._lib.PEM_write_bio_ECPrivateKey
            else:
                raise ValueError(bad_key_type)
            return self._private_key_bytes_via_bio(writer, cdata, password)

        if encoding is serialization.Encoding.DER:
            if password:
                raise ValueError(
                    "Encryption is not supported for DER encoded "
                    "traditional OpenSSL keys"
                )
            if key_type == self._lib.EVP_PKEY_RSA:
                writer = self._lib.i2d_RSAPrivateKey_bio
            elif key_type == self._lib.EVP_PKEY_EC:
                writer = self._lib.i2d_ECPrivateKey_bio
            elif key_type == self._lib.EVP_PKEY_DSA:
                writer = self._lib.i2d_DSAPrivateKey_bio
            else:
                raise ValueError(bad_key_type)
            return self._bio_func_output(writer, cdata)

        raise ValueError("Unsupported encoding for TraditionalOpenSSL")

    # OpenSSH + PEM
    if format is serialization.PrivateFormat.OpenSSH:
        if encoding is not serialization.Encoding.PEM:
            raise ValueError(
                "OpenSSH private key format can only be used"
                " with PEM encoding"
            )
        return ssh._serialize_ssh_private_key(
            key, password, encryption_algorithm
        )

    # Anything that key-specific code was supposed to handle earlier,
    # like Raw.
    raise ValueError("format is invalid with this key")

1606 

def _private_key_bytes_via_bio(
    self, write_bio, evp_pkey, password
) -> bytes:
    """Write a key through ``write_bio``, encrypting it if a password is set.

    With an empty password the cipher argument is NULL; otherwise
    aes-256-cbc is looked up by name.
    """
    # The cipher name is a curated value that we will update over time.
    cipher = (
        self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
        if password
        else self._ffi.NULL
    )
    null = self._ffi.NULL
    return self._bio_func_output(
        write_bio, evp_pkey, cipher, password, len(password), null, null
    )

1625 

def _bio_func_output(self, write_bio, *args) -> bytes:
    """Call ``write_bio`` on a new memory BIO and return the bytes written."""
    out_bio = self._create_mem_bio_gc()
    status = write_bio(out_bio, *args)
    self.openssl_assert(status == 1)
    return self._read_mem_bio(out_bio)

1631 

def _public_key_bytes(
    self,
    encoding: serialization.Encoding,
    format: serialization.PublicFormat,
    key,
    evp_pkey,
    cdata,
) -> bytes:
    """Serialize a public key for the given encoding and format.

    Handles SubjectPublicKeyInfo (PEM/DER), PKCS1 (PEM/DER, RSA keys only)
    and OpenSSH. Any other format is rejected here.

    Raises TypeError for arguments of the wrong type and ValueError for
    unsupported combinations.
    """
    if not isinstance(encoding, serialization.Encoding):
        raise TypeError("encoding must be an item from the Encoding enum")
    if not isinstance(format, serialization.PublicFormat):
        raise TypeError(
            "format must be an item from the PublicFormat enum"
        )

    # SubjectPublicKeyInfo + PEM/DER
    if format is serialization.PublicFormat.SubjectPublicKeyInfo:
        if encoding is serialization.Encoding.PEM:
            writer = self._lib.PEM_write_bio_PUBKEY
        elif encoding is serialization.Encoding.DER:
            writer = self._lib.i2d_PUBKEY_bio
        else:
            raise ValueError(
                "SubjectPublicKeyInfo works only with PEM or DER encoding"
            )
        return self._bio_func_output(writer, evp_pkey)

    # PKCS1 + PEM/DER
    if format is serialization.PublicFormat.PKCS1:
        # Only RSA is supported here.
        if self._lib.EVP_PKEY_id(evp_pkey) != self._lib.EVP_PKEY_RSA:
            raise ValueError("PKCS1 format is supported only for RSA keys")

        if encoding is serialization.Encoding.PEM:
            writer = self._lib.PEM_write_bio_RSAPublicKey
        elif encoding is serialization.Encoding.DER:
            writer = self._lib.i2d_RSAPublicKey_bio
        else:
            raise ValueError("PKCS1 works only with PEM or DER encoding")
        return self._bio_func_output(writer, cdata)

    # OpenSSH + OpenSSH
    if format is serialization.PublicFormat.OpenSSH:
        if encoding is not serialization.Encoding.OpenSSH:
            raise ValueError(
                "OpenSSH format must be used with OpenSSH encoding"
            )
        return ssh.serialize_ssh_public_key(key)

    # Anything that key-specific code was supposed to handle earlier,
    # like Raw, CompressedPoint, UncompressedPoint
    raise ValueError("format is invalid with this key")

1686 

def dh_supported(self) -> bool:
    """Return False when the library reports it is BoringSSL, else True."""
    if self._lib.CRYPTOGRAPHY_IS_BORINGSSL:
        return False
    return True

1689 

def generate_dh_parameters(
    self, generator: int, key_size: int
) -> dh.DHParameters:
    """Generate DH parameters with the given generator and modulus size.

    Raises ValueError when ``key_size`` is below ``dh._MIN_MODULUS_SIZE``,
    when ``generator`` is not 2 or 5, or when OpenSSL fails to generate
    the parameters.
    """
    if key_size < dh._MIN_MODULUS_SIZE:
        message = "DH key_size must be at least {} bits".format(
            dh._MIN_MODULUS_SIZE
        )
        raise ValueError(message)

    if generator not in (2, 5):
        raise ValueError("DH generator must be 2 or 5")

    raw_dh = self._lib.DH_new()
    self.openssl_assert(raw_dh != self._ffi.NULL)
    dh_params = self._ffi.gc(raw_dh, self._lib.DH_free)

    status = self._lib.DH_generate_parameters_ex(
        dh_params, key_size, generator, self._ffi.NULL
    )
    if status != 1:
        errors = self._consume_errors_with_text()
        raise ValueError("Unable to generate DH parameters", errors)

    return _DHParameters(self, dh_params)

1715 

def _dh_cdata_to_evp_pkey(self, dh_cdata):
    """Attach ``dh_cdata`` to an EVP_PKEY from ``_create_evp_pkey_gc``."""
    pkey = self._create_evp_pkey_gc()
    status = self._lib.EVP_PKEY_set1_DH(pkey, dh_cdata)
    self.openssl_assert(status == 1)
    return pkey

1721 

def generate_dh_private_key(
    self, parameters: dh.DHParameters
) -> dh.DHPrivateKey:
    """Generate a DH private key on a duplicate of ``parameters``."""
    dh_key = _dh_params_dup(
        parameters._dh_cdata, self  # type: ignore[attr-defined]
    )

    status = self._lib.DH_generate_key(dh_key)
    self.openssl_assert(status == 1)

    pkey = self._dh_cdata_to_evp_pkey(dh_key)
    return _DHPrivateKey(self, dh_key, pkey)

1735 

def generate_dh_private_key_and_parameters(
    self, generator: int, key_size: int
) -> dh.DHPrivateKey:
    """Generate new DH parameters, then a private key from them."""
    parameters = self.generate_dh_parameters(generator, key_size)
    return self.generate_dh_private_key(parameters)

1742 

def load_dh_private_numbers(
    self, numbers: dh.DHPrivateNumbers
) -> dh.DHPrivateKey:
    """Build a DH private key from its integer components.

    The assembled key is run through ``DH_check``.

    Raises ValueError when the check reports a problem other than the
    tolerated generator warning described in the body.
    """
    params = numbers.public_numbers.parameter_numbers

    raw_dh = self._lib.DH_new()
    self.openssl_assert(raw_dh != self._ffi.NULL)
    dh_key = self._ffi.gc(raw_dh, self._lib.DH_free)

    bn_p = self._int_to_bn(params.p)
    bn_g = self._int_to_bn(params.g)
    # q is optional; NULL is passed when it is absent.
    bn_q = (
        self._int_to_bn(params.q)
        if params.q is not None
        else self._ffi.NULL
    )
    bn_pub = self._int_to_bn(numbers.public_numbers.y)
    bn_priv = self._int_to_bn(numbers.x)

    pqg_status = self._lib.DH_set0_pqg(dh_key, bn_p, bn_q, bn_g)
    self.openssl_assert(pqg_status == 1)

    key_status = self._lib.DH_set0_key(dh_key, bn_pub, bn_priv)
    self.openssl_assert(key_status == 1)

    codes = self._ffi.new("int[]", 1)
    check_status = self._lib.DH_check(dh_key, codes)
    self.openssl_assert(check_status == 1)

    # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not
    # equal 11 when the generator is 2 (a quadratic nonresidue).
    # We want to ignore that error because p % 24 == 23 is also fine.
    # Specifically, g is then a quadratic residue. Within the context of
    # Diffie-Hellman this means it can only generate half the possible
    # values. That sounds bad, but quadratic nonresidues leak a bit of
    # the key to the attacker in exchange for having the full key space
    # available. See: https://crypto.stackexchange.com/questions/12961
    if codes[0] != 0:
        only_generator_warning = (
            params.g == 2
            and (codes[0] ^ self._lib.DH_NOT_SUITABLE_GENERATOR) == 0
        )
        if not only_generator_warning:
            raise ValueError(
                "DH private numbers did not pass safety checks."
            )

    pkey = self._dh_cdata_to_evp_pkey(dh_key)
    return _DHPrivateKey(self, dh_key, pkey)

1790 

def load_dh_public_numbers(
    self, numbers: dh.DHPublicNumbers
) -> dh.DHPublicKey:
    """
    Build a DH public key object from ``numbers``.

    A fresh ``DH`` structure receives p, q (optional), g and the public
    value y; no private component is set.
    """
    raw_dh = self._lib.DH_new()
    self.openssl_assert(raw_dh != self._ffi.NULL)
    dh_cdata = self._ffi.gc(raw_dh, self._lib.DH_free)

    params = numbers.parameter_numbers

    bn_p = self._int_to_bn(params.p)
    bn_g = self._int_to_bn(params.g)
    # q is optional; NULL is handed to DH_set0_pqg when it is absent.
    if params.q is None:
        bn_q = self._ffi.NULL
    else:
        bn_q = self._int_to_bn(params.q)

    bn_pub = self._int_to_bn(numbers.y)

    set_params_ok = self._lib.DH_set0_pqg(dh_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(set_params_ok == 1)

    # NULL private key: only the public half is loaded.
    set_key_ok = self._lib.DH_set0_key(dh_cdata, bn_pub, self._ffi.NULL)
    self.openssl_assert(set_key_ok == 1)

    evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
    return _DHPublicKey(self, dh_cdata, evp_pkey)

1819 

def load_dh_parameter_numbers(
    self, numbers: dh.DHParameterNumbers
) -> dh.DHParameters:
    """
    Build a DH parameters object from the p, g and optional q held in
    ``numbers``.
    """
    raw_dh = self._lib.DH_new()
    self.openssl_assert(raw_dh != self._ffi.NULL)
    dh_cdata = self._ffi.gc(raw_dh, self._lib.DH_free)

    bn_p = self._int_to_bn(numbers.p)
    bn_g = self._int_to_bn(numbers.g)
    # q is optional; NULL is handed to DH_set0_pqg when it is absent.
    if numbers.q is None:
        bn_q = self._ffi.NULL
    else:
        bn_q = self._int_to_bn(numbers.q)

    set_params_ok = self._lib.DH_set0_pqg(dh_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(set_params_ok == 1)

    return _DHParameters(self, dh_cdata)

1839 

def dh_parameters_supported(
    self, p: int, g: int, q: typing.Optional[int] = None
) -> bool:
    """
    Return True when ``DH_check`` reports no problems for the given
    p, g and optional q.
    """
    raw_dh = self._lib.DH_new()
    self.openssl_assert(raw_dh != self._ffi.NULL)
    dh_cdata = self._ffi.gc(raw_dh, self._lib.DH_free)

    bn_p = self._int_to_bn(p)
    bn_g = self._int_to_bn(g)
    # q is optional; NULL is handed to DH_set0_pqg when it is absent.
    if q is None:
        bn_q = self._ffi.NULL
    else:
        bn_q = self._int_to_bn(q)

    set_params_ok = self._lib.DH_set0_pqg(dh_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(set_params_ok == 1)

    codes = self._ffi.new("int[]", 1)
    check_ok = self._lib.DH_check(dh_cdata, codes)
    self.openssl_assert(check_ok == 1)

    # Any non-zero flag word means the parameters were rejected.
    return codes[0] == 0

1863 

def dh_x942_serialization_supported(self) -> bool:
    """
    Return True when the bindings report ``Cryptography_HAS_EVP_PKEY_DHX``
    as 1.
    """
    has_dhx = self._lib.Cryptography_HAS_EVP_PKEY_DHX
    return has_dhx == 1

1866 

def x25519_load_public_bytes(self, data: bytes) -> x25519.X25519PublicKey:
    """
    Load an X25519 public key from its raw byte encoding.

    Raises ``ValueError`` unless ``data`` is exactly 32 bytes long.
    """
    key_len = len(data)
    if key_len != 32:
        raise ValueError("An X25519 public key is 32 bytes long")

    key_buf = self._ffi.from_buffer(data)
    raw_pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_X25519, self._ffi.NULL, key_buf, key_len
    )
    self.openssl_assert(raw_pkey != self._ffi.NULL)
    owned_pkey = self._ffi.gc(raw_pkey, self._lib.EVP_PKEY_free)
    return _X25519PublicKey(self, owned_pkey)

1878 

def x25519_load_private_bytes(
    self, data: bytes
) -> x25519.X25519PrivateKey:
    """
    Load an X25519 private key from its raw byte encoding.

    Raises ``ValueError`` unless ``data`` is exactly 32 bytes long.
    """
    key_len = len(data)
    if key_len != 32:
        raise ValueError("An X25519 private key is 32 bytes long")

    key_buf = self._ffi.from_buffer(data)
    raw_pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_X25519, self._ffi.NULL, key_buf, key_len
    )
    self.openssl_assert(raw_pkey != self._ffi.NULL)
    owned_pkey = self._ffi.gc(raw_pkey, self._lib.EVP_PKEY_free)
    return _X25519PrivateKey(self, owned_pkey)

1892 

1893 def _evp_pkey_keygen_gc(self, nid): 

1894 evp_pkey_ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL) 

1895 self.openssl_assert(evp_pkey_ctx != self._ffi.NULL) 

1896 evp_pkey_ctx = self._ffi.gc(evp_pkey_ctx, self._lib.EVP_PKEY_CTX_free) 

1897 res = self._lib.EVP_PKEY_keygen_init(evp_pkey_ctx) 

1898 self.openssl_assert(res == 1) 

1899 evp_ppkey = self._ffi.new("EVP_PKEY **") 

1900 res = self._lib.EVP_PKEY_keygen(evp_pkey_ctx, evp_ppkey) 

1901 self.openssl_assert(res == 1) 

1902 self.openssl_assert(evp_ppkey[0] != self._ffi.NULL) 

1903 evp_pkey = self._ffi.gc(evp_ppkey[0], self._lib.EVP_PKEY_free) 

1904 return evp_pkey 

1905 

def x25519_generate_key(self) -> x25519.X25519PrivateKey:
    """Generate a new X25519 private key via ``_evp_pkey_keygen_gc``."""
    new_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X25519)
    private_key = _X25519PrivateKey(self, new_pkey)
    return private_key

1909 

def x25519_supported(self) -> bool:
    """
    Report X25519 availability: always False when ``_fips_enabled`` is
    set, otherwise the negation of ``CRYPTOGRAPHY_LIBRESSL_LESS_THAN_370``.
    """
    if not self._fips_enabled:
        return not self._lib.CRYPTOGRAPHY_LIBRESSL_LESS_THAN_370
    return False

1914 

def x448_load_public_bytes(self, data: bytes) -> x448.X448PublicKey:
    """
    Load an X448 public key from its raw byte encoding.

    Raises ``ValueError`` unless ``data`` is exactly 56 bytes long.
    """
    key_len = len(data)
    if key_len != 56:
        raise ValueError("An X448 public key is 56 bytes long")

    raw_pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_X448, self._ffi.NULL, data, key_len
    )
    self.openssl_assert(raw_pkey != self._ffi.NULL)
    owned_pkey = self._ffi.gc(raw_pkey, self._lib.EVP_PKEY_free)
    return _X448PublicKey(self, owned_pkey)

1925 

def x448_load_private_bytes(self, data: bytes) -> x448.X448PrivateKey:
    """
    Load an X448 private key from its raw byte encoding.

    Raises ``ValueError`` unless ``data`` is exactly 56 bytes long.
    """
    key_len = len(data)
    if key_len != 56:
        raise ValueError("An X448 private key is 56 bytes long")

    key_buf = self._ffi.from_buffer(data)
    raw_pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_X448, self._ffi.NULL, key_buf, key_len
    )
    self.openssl_assert(raw_pkey != self._ffi.NULL)
    owned_pkey = self._ffi.gc(raw_pkey, self._lib.EVP_PKEY_free)
    return _X448PrivateKey(self, owned_pkey)

1937 

def x448_generate_key(self) -> x448.X448PrivateKey:
    """Generate a new X448 private key via ``_evp_pkey_keygen_gc``."""
    new_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X448)
    private_key = _X448PrivateKey(self, new_pkey)
    return private_key

1941 

def x448_supported(self) -> bool:
    """
    Report X448 availability: False when ``_fips_enabled`` is set or when
    the bindings identify the library as LibreSSL or BoringSSL.
    """
    if self._fips_enabled:
        return False
    lib = self._lib
    return not (
        lib.CRYPTOGRAPHY_IS_LIBRESSL or lib.CRYPTOGRAPHY_IS_BORINGSSL
    )

1949 

def ed25519_supported(self) -> bool:
    """
    Report Ed25519 availability.

    Always False when ``_fips_enabled`` is set; otherwise reflects the
    ``CRYPTOGRAPHY_HAS_WORKING_ED25519`` flag exposed by the bindings.
    """
    if self._fips_enabled:
        return False
    # Coerce the raw binding flag so the declared ``bool`` return type
    # holds, matching the other ``*_supported`` methods.
    return bool(self._lib.CRYPTOGRAPHY_HAS_WORKING_ED25519)

1954 

def ed25519_load_public_bytes(
    self, data: bytes
) -> ed25519.Ed25519PublicKey:
    """
    Load an Ed25519 public key from its raw byte encoding.

    ``data`` is first validated with ``utils._check_bytes``; a
    ``ValueError`` is raised when its length differs from
    ``ed25519._ED25519_KEY_SIZE``.
    """
    utils._check_bytes("data", data)

    key_len = len(data)
    if key_len != ed25519._ED25519_KEY_SIZE:
        raise ValueError("An Ed25519 public key is 32 bytes long")

    raw_pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_ED25519, self._ffi.NULL, data, key_len
    )
    self.openssl_assert(raw_pkey != self._ffi.NULL)
    owned_pkey = self._ffi.gc(raw_pkey, self._lib.EVP_PKEY_free)

    return _Ed25519PublicKey(self, owned_pkey)

1970 

def ed25519_load_private_bytes(
    self, data: bytes
) -> ed25519.Ed25519PrivateKey:
    """
    Load an Ed25519 private key from its raw byte encoding.

    Raises ``ValueError`` when the length of ``data`` differs from
    ``ed25519._ED25519_KEY_SIZE``. The length is checked before
    ``utils._check_byteslike`` validates the type.
    """
    key_len = len(data)
    if key_len != ed25519._ED25519_KEY_SIZE:
        raise ValueError("An Ed25519 private key is 32 bytes long")

    utils._check_byteslike("data", data)
    key_buf = self._ffi.from_buffer(data)
    raw_pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_ED25519, self._ffi.NULL, key_buf, key_len
    )
    self.openssl_assert(raw_pkey != self._ffi.NULL)
    owned_pkey = self._ffi.gc(raw_pkey, self._lib.EVP_PKEY_free)

    return _Ed25519PrivateKey(self, owned_pkey)

1986 

def ed25519_generate_key(self) -> ed25519.Ed25519PrivateKey:
    """Generate a new Ed25519 private key via ``_evp_pkey_keygen_gc``."""
    new_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED25519)
    private_key = _Ed25519PrivateKey(self, new_pkey)
    return private_key

1990 

def ed448_supported(self) -> bool:
    """
    Report Ed448 availability: False when ``_fips_enabled`` is set, when
    ``CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B`` is truthy, or when the
    bindings identify the library as BoringSSL.
    """
    if self._fips_enabled:
        return False
    lib = self._lib
    return not (
        lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
        or lib.CRYPTOGRAPHY_IS_BORINGSSL
    )

1998 

def ed448_load_public_bytes(self, data: bytes) -> ed448.Ed448PublicKey:
    """
    Load an Ed448 public key from its raw byte encoding.

    ``data`` is first validated with ``utils._check_bytes``; a
    ``ValueError`` is raised when its length differs from
    ``_ED448_KEY_SIZE``.
    """
    utils._check_bytes("data", data)
    key_len = len(data)
    if key_len != _ED448_KEY_SIZE:
        raise ValueError("An Ed448 public key is 57 bytes long")

    raw_pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_ED448, self._ffi.NULL, data, key_len
    )
    self.openssl_assert(raw_pkey != self._ffi.NULL)
    owned_pkey = self._ffi.gc(raw_pkey, self._lib.EVP_PKEY_free)

    return _Ed448PublicKey(self, owned_pkey)

2011 

def ed448_load_private_bytes(self, data: bytes) -> ed448.Ed448PrivateKey:
    """
    Load an Ed448 private key from its raw byte encoding.

    ``data`` is first validated with ``utils._check_byteslike``; a
    ``ValueError`` is raised when its length differs from
    ``_ED448_KEY_SIZE``.
    """
    utils._check_byteslike("data", data)
    key_len = len(data)
    if key_len != _ED448_KEY_SIZE:
        raise ValueError("An Ed448 private key is 57 bytes long")

    key_buf = self._ffi.from_buffer(data)
    raw_pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_ED448, self._ffi.NULL, key_buf, key_len
    )
    self.openssl_assert(raw_pkey != self._ffi.NULL)
    owned_pkey = self._ffi.gc(raw_pkey, self._lib.EVP_PKEY_free)

    return _Ed448PrivateKey(self, owned_pkey)

2025 

def ed448_generate_key(self) -> ed448.Ed448PrivateKey:
    """Generate a new Ed448 private key via ``_evp_pkey_keygen_gc``."""
    new_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED448)
    private_key = _Ed448PrivateKey(self, new_pkey)
    return private_key

2029 

def derive_scrypt(
    self,
    key_material: bytes,
    salt: bytes,
    length: int,
    n: int,
    r: int,
    p: int,
) -> bytes:
    """
    Derive ``length`` bytes from ``key_material`` and ``salt`` using
    ``EVP_PBE_scrypt`` with cost parameters ``n``, ``r`` and ``p``.

    Raises ``MemoryError`` (carrying the consumed OpenSSL errors as a
    second argument) whenever ``EVP_PBE_scrypt`` does not return 1.
    """
    out_buf = self._ffi.new("unsigned char[]", length)
    key_material_ptr = self._ffi.from_buffer(key_material)
    status = self._lib.EVP_PBE_scrypt(
        key_material_ptr,
        len(key_material),
        salt,
        len(salt),
        n,
        r,
        p,
        scrypt._MEM_LIMIT,
        out_buf,
        length,
    )
    if status == 1:
        return self._ffi.buffer(out_buf)[:]

    errors = self._consume_errors_with_text()
    # memory required formula explained here:
    # https://blog.filippo.io/the-scrypt-parameters/
    min_memory = 128 * n * r // (1024**2)
    raise MemoryError(
        "Not enough memory to derive key. These parameters require"
        " {} MB of memory.".format(min_memory),
        errors,
    )

2064 

def aead_cipher_supported(self, cipher) -> bool:
    """
    Report whether the AEAD ``cipher`` can be used with this backend.

    When ``_fips_enabled`` is set, only names listed in ``_fips_aead``
    are accepted.
    """
    cipher_name = aead._aead_cipher_name(cipher)
    if self._fips_enabled and cipher_name not in self._fips_aead:
        return False

    # SIV isn't loaded through get_cipherbyname but instead a new fetch API
    # only available in 3.0+. But if we know we're on 3.0+ then we know
    # it's supported.
    if cipher_name.endswith(b"-siv"):
        return self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER == 1

    evp_cipher = self._lib.EVP_get_cipherbyname(cipher_name)
    return evp_cipher != self._ffi.NULL

2078 

2079 def _zero_data(self, data, length: int) -> None: 

2080 # We clear things this way because at the moment we're not 

2081 # sure of a better way that can guarantee it overwrites the 

2082 # memory of a bytearray and doesn't just replace the underlying char *. 

2083 for i in range(length): 

2084 data[i] = 0 

2085 

2086 @contextlib.contextmanager 

2087 def _zeroed_null_terminated_buf(self, data): 

2088 """ 

2089 This method takes bytes, which can be a bytestring or a mutable 

2090 buffer like a bytearray, and yields a null-terminated version of that 

2091 data. This is required because PKCS12_parse doesn't take a length with 

2092 its password char * and ffi.from_buffer doesn't provide null 

2093 termination. So, to support zeroing the data via bytearray we 

2094 need to build this ridiculous construct that copies the memory, but 

2095 zeroes it after use. 

2096 """ 

2097 if data is None: 

2098 yield self._ffi.NULL 

2099 else: 

2100 data_len = len(data) 

2101 buf = self._ffi.new("char[]", data_len + 1) 

2102 self._ffi.memmove(buf, data, data_len) 

2103 try: 

2104 yield buf 

2105 finally: 

2106 # Cast to a uint8_t * so we can assign by integer 

2107 self._zero_data(self._ffi.cast("uint8_t *", buf), data_len) 

2108 

def load_key_and_certificates_from_pkcs12(
    self, data: bytes, password: typing.Optional[bytes]
) -> typing.Tuple[
    typing.Optional[PrivateKeyTypes],
    typing.Optional[x509.Certificate],
    typing.List[x509.Certificate],
]:
    """
    Parse PKCS12 ``data`` with ``load_pkcs12`` and return a
    ``(key, certificate, additional_certificates)`` tuple in which the
    certificates are unwrapped from their PKCS12 wrapper objects.
    """
    bundle = self.load_pkcs12(data, password)
    key = bundle.key
    main_cert = bundle.cert.certificate if bundle.cert else None
    extra_certs = [entry.certificate for entry in bundle.additional_certs]
    return key, main_cert, extra_certs

2122 

def load_pkcs12(
    self, data: bytes, password: typing.Optional[bytes]
) -> PKCS12KeyAndCertificates:
    """
    Parse PKCS12 ``data``, decrypting it with ``password`` (may be
    ``None``), and return the private key, certificate and additional
    certificates it contains; each part may be absent.

    Raises ``ValueError`` when the data cannot be deserialized or when
    ``PKCS12_parse`` fails (invalid password or data).
    """
    if password is not None:
        utils._check_byteslike("password", password)

    null = self._ffi.NULL

    bio = self._bytes_to_bio(data)
    p12 = self._lib.d2i_PKCS12_bio(bio.bio, null)
    if p12 == null:
        self._consume_errors()
        raise ValueError("Could not deserialize PKCS12 data")

    p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
    # Out-parameters filled in by PKCS12_parse.
    key_out = self._ffi.new("EVP_PKEY **")
    cert_out = self._ffi.new("X509 **")
    chain_out = self._ffi.new("Cryptography_STACK_OF_X509 **")
    with self._zeroed_null_terminated_buf(password) as password_buf:
        parse_ok = self._lib.PKCS12_parse(
            p12, password_buf, key_out, cert_out, chain_out
        )
    if parse_ok == 0:
        self._consume_errors()
        raise ValueError("Invalid password or PKCS12 data")

    def wrap_certificate(ossl_x509):
        # Convert to a certificate object first, then read the optional
        # alias (friendly name) attached to the OpenSSL certificate.
        cert_obj = self._ossl2cert(ossl_x509)
        friendly_name = None
        maybe_name = self._lib.X509_alias_get0(ossl_x509, null)
        if maybe_name != null:
            friendly_name = self._ffi.string(maybe_name)
        return PKCS12Certificate(cert_obj, friendly_name)

    key = None
    if key_out[0] != null:
        evp_pkey = self._ffi.gc(key_out[0], self._lib.EVP_PKEY_free)
        # We don't support turning off RSA key validation when loading
        # PKCS12 keys
        key = self._evp_pkey_to_private_key(
            evp_pkey, unsafe_skip_rsa_key_validation=False
        )

    cert = None
    if cert_out[0] != null:
        main_x509 = self._ffi.gc(cert_out[0], self._lib.X509_free)
        cert = wrap_certificate(main_x509)

    additional_certificates = []
    if chain_out[0] != null:
        chain = self._ffi.gc(chain_out[0], self._lib.sk_X509_free)
        count = self._lib.sk_X509_num(chain_out[0])

        # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
        # certificates.
        keeps_order = (
            self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
            or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
        )
        positions: typing.Iterable[int]
        if keeps_order:
            positions = range(count)
        else:
            positions = reversed(range(count))

        for position in positions:
            extra_x509 = self._lib.sk_X509_value(chain, position)
            self.openssl_assert(extra_x509 != null)
            extra_x509 = self._ffi.gc(extra_x509, self._lib.X509_free)
            additional_certificates.append(wrap_certificate(extra_x509))

    return PKCS12KeyAndCertificates(key, cert, additional_certificates)

2197 

def serialize_key_and_certificates_to_pkcs12(
    self,
    name: typing.Optional[bytes],
    key: typing.Optional[PKCS12PrivateKeyTypes],
    cert: typing.Optional[x509.Certificate],
    cas: typing.Optional[typing.List[_PKCS12CATypes]],
    encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
    """
    Serialize an optional key, certificate and CA list into a PKCS12
    structure and return its DER bytes.

    ``name`` is the optional friendly name passed to ``PKCS12_create``.
    ``encryption_algorithm`` selects the key/certificate encryption NIDs,
    iteration counts, optional MAC digest and password.

    Raises ``ValueError`` for an unsupported ``encryption_algorithm``
    type and ``UnsupportedAlgorithm`` when PBESv2 or a custom MAC
    algorithm is requested but the linked library lacks support.
    """
    password = None
    if name is not None:
        utils._check_bytes("name", name)

    if isinstance(encryption_algorithm, serialization.NoEncryption):
        nid_cert = -1
        nid_key = -1
        pkcs12_iter = 0
        mac_iter = 0
        mac_alg = self._ffi.NULL
    elif isinstance(
        encryption_algorithm, serialization.BestAvailableEncryption
    ):
        # PKCS12 encryption is hopeless trash and can never be fixed.
        # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so
        # we use PBESv1 with 3DES on the older paths.
        if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
            nid_cert = self._lib.NID_aes_256_cbc
            nid_key = self._lib.NID_aes_256_cbc
        else:
            nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        # At least we can set this higher than OpenSSL's default
        pkcs12_iter = 20000
        # mac_iter chosen for compatibility reasons, see:
        # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
        # Did we mention how lousy PKCS12 encryption is?
        mac_iter = 1
        # MAC algorithm can only be set on OpenSSL 3.0.0+
        mac_alg = self._ffi.NULL
        password = encryption_algorithm.password
    elif (
        isinstance(
            encryption_algorithm, serialization._KeySerializationEncryption
        )
        and encryption_algorithm._format
        is serialization.PrivateFormat.PKCS12
    ):
        # Default to OpenSSL's defaults. Behavior will vary based on the
        # version of OpenSSL cryptography is compiled against.
        nid_cert = 0
        nid_key = 0
        # Use the default iters we use in best available
        pkcs12_iter = 20000
        # See the Best Available comment for why this is 1
        mac_iter = 1
        password = encryption_algorithm.password
        keycertalg = encryption_algorithm._key_cert_algorithm
        if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC:
            nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        elif keycertalg is PBES.PBESv2SHA256AndAES256CBC:
            if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
                raise UnsupportedAlgorithm(
                    "PBESv2 is not supported by this version of OpenSSL"
                )
            nid_cert = self._lib.NID_aes_256_cbc
            nid_key = self._lib.NID_aes_256_cbc
        else:
            assert keycertalg is None
            # We use OpenSSL's defaults

        if encryption_algorithm._hmac_hash is not None:
            if not self._lib.Cryptography_HAS_PKCS12_SET_MAC:
                raise UnsupportedAlgorithm(
                    "Setting MAC algorithm is not supported by this "
                    "version of OpenSSL."
                )
            mac_alg = self._evp_md_non_null_from_algorithm(
                encryption_algorithm._hmac_hash
            )
            self.openssl_assert(mac_alg != self._ffi.NULL)
        else:
            mac_alg = self._ffi.NULL

        if encryption_algorithm._kdf_rounds is not None:
            pkcs12_iter = encryption_algorithm._kdf_rounds

    else:
        raise ValueError("Unsupported key encryption type")

    if not cas:
        sk_x509 = self._ffi.NULL
    else:
        sk_x509 = self._lib.sk_X509_new_null()
        sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)

        # This list is to keep the x509 values alive until end of function
        ossl_cas = []
        for ca in cas:
            if isinstance(ca, PKCS12Certificate):
                ca_alias = ca.friendly_name
                ossl_ca = self._cert2ossl(ca.certificate)
                if ca_alias is None:
                    res = self._lib.X509_alias_set1(
                        ossl_ca, self._ffi.NULL, -1
                    )
                else:
                    res = self._lib.X509_alias_set1(
                        ossl_ca, ca_alias, len(ca_alias)
                    )
                self.openssl_assert(res == 1)
            else:
                ossl_ca = self._cert2ossl(ca)
            ossl_cas.append(ossl_ca)
            res = self._lib.sk_X509_push(sk_x509, ossl_ca)
            # Assert through ``self`` like every other check in this
            # method, not through the module-level ``backend`` global.
            self.openssl_assert(res >= 1)

    with self._zeroed_null_terminated_buf(password) as password_buf:
        with self._zeroed_null_terminated_buf(name) as name_buf:
            ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
            if key is not None:
                evp_pkey = key._evp_pkey  # type: ignore[union-attr]
            else:
                evp_pkey = self._ffi.NULL

            p12 = self._lib.PKCS12_create(
                password_buf,
                name_buf,
                evp_pkey,
                ossl_cert,
                sk_x509,
                nid_key,
                nid_cert,
                pkcs12_iter,
                mac_iter,
                0,
            )
            # Validate and take ownership before touching the structure
            # again: PKCS12_set_mac below must never see a NULL pointer.
            self.openssl_assert(p12 != self._ffi.NULL)
            p12 = self._ffi.gc(p12, self._lib.PKCS12_free)

            if (
                self._lib.Cryptography_HAS_PKCS12_SET_MAC
                and mac_alg != self._ffi.NULL
            ):
                res = self._lib.PKCS12_set_mac(
                    p12,
                    password_buf,
                    -1,
                    self._ffi.NULL,
                    0,
                    mac_iter,
                    mac_alg,
                )
                # Do not silently emit a file whose MAC was not applied.
                self.openssl_assert(res == 1)

    bio = self._create_mem_bio_gc()
    res = self._lib.i2d_PKCS12_bio(bio, p12)
    self.openssl_assert(res > 0)
    return self._read_mem_bio(bio)

2356 

def poly1305_supported(self) -> bool:
    """
    Report Poly1305 availability: always False when ``_fips_enabled`` is
    set, otherwise True when ``Cryptography_HAS_POLY1305`` equals 1.
    """
    if not self._fips_enabled:
        return self._lib.Cryptography_HAS_POLY1305 == 1
    return False

2361 

def create_poly1305_ctx(self, key: bytes) -> _Poly1305Context:
    """
    Create a Poly1305 context for ``key``.

    ``key`` is validated with ``utils._check_byteslike``; a ``ValueError``
    is raised when its length differs from ``_POLY1305_KEY_SIZE``.
    """
    utils._check_byteslike("key", key)
    key_len = len(key)
    if key_len != _POLY1305_KEY_SIZE:
        raise ValueError("A poly1305 key is 32 bytes long")

    context = _Poly1305Context(self, key)
    return context

2368 

def pkcs7_supported(self) -> bool:
    """
    Report PKCS7 availability: False when the bindings identify the
    library as BoringSSL.
    """
    is_boringssl = self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    return not is_boringssl

2371 

def load_pem_pkcs7_certificates(
    self, data: bytes
) -> typing.List[x509.Certificate]:
    """
    Parse PEM-encoded PKCS7 ``data`` and return the certificates found by
    ``_load_pkcs7_certificates``.

    Raises ``ValueError`` when the data cannot be parsed.
    """
    utils._check_bytes("data", data)
    null = self._ffi.NULL

    bio = self._bytes_to_bio(data)
    p7 = self._lib.PEM_read_bio_PKCS7(bio.bio, null, null, null)
    if p7 == null:
        self._consume_errors()
        raise ValueError("Unable to parse PKCS7 data")

    owned_p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
    return self._load_pkcs7_certificates(owned_p7)

2386 

def load_der_pkcs7_certificates(
    self, data: bytes
) -> typing.List[x509.Certificate]:
    """
    Parse DER-encoded PKCS7 ``data`` and return the certificates found by
    ``_load_pkcs7_certificates``.

    Raises ``ValueError`` when the data cannot be parsed.
    """
    utils._check_bytes("data", data)
    null = self._ffi.NULL

    bio = self._bytes_to_bio(data)
    p7 = self._lib.d2i_PKCS7_bio(bio.bio, null)
    if p7 == null:
        self._consume_errors()
        raise ValueError("Unable to parse PKCS7 data")

    owned_p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
    return self._load_pkcs7_certificates(owned_p7)

2399 

2400 def _load_pkcs7_certificates(self, p7) -> typing.List[x509.Certificate]: 

2401 nid = self._lib.OBJ_obj2nid(p7.type) 

2402 self.openssl_assert(nid != self._lib.NID_undef) 

2403 if nid != self._lib.NID_pkcs7_signed: 

2404 raise UnsupportedAlgorithm( 

2405 "Only basic signed structures are currently supported. NID" 

2406 " for this data was {}".format(nid), 

2407 _Reasons.UNSUPPORTED_SERIALIZATION, 

2408 ) 

2409 

2410 sk_x509 = p7.d.sign.cert 

2411 num = self._lib.sk_X509_num(sk_x509) 

2412 certs = [] 

2413 for i in range(num): 

2414 x509 = self._lib.sk_X509_value(sk_x509, i) 

2415 self.openssl_assert(x509 != self._ffi.NULL) 

2416 cert = self._ossl2cert(x509) 

2417 certs.append(cert) 

2418 

2419 return certs 

2420 

2421 

class GetCipherByName:
    """
    Callable that resolves an ``EVP_CIPHER`` from a name built with a
    format string receiving ``cipher`` and ``mode``.
    """

    def __init__(self, fmt: str):
        self._fmt = fmt

    def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
        """
        Look up the lower-cased, ASCII-encoded cipher name, returning
        whatever the lookup yields (possibly ``ffi.NULL``).
        """
        formatted = self._fmt.format(cipher=cipher, mode=mode)
        name_bytes = formatted.lower().encode("ascii")
        lib = backend._lib
        null = backend._ffi.NULL

        evp_cipher = lib.EVP_get_cipherbyname(name_bytes)

        # try EVP_CIPHER_fetch if present
        if evp_cipher == null and lib.Cryptography_HAS_300_EVP_CIPHER:
            evp_cipher = lib.EVP_CIPHER_fetch(null, name_bytes, null)

        # Clear the error queue regardless of which lookup ran.
        backend._consume_errors()
        return evp_cipher

2445 

2446 

2447def _get_xts_cipher(backend: Backend, cipher: AES, mode): 

2448 cipher_name = f"aes-{cipher.key_size // 2}-xts" 

2449 return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) 

2450 

2451 

# Module-level Backend instance, created once when this module is imported.
backend = Backend()