Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/backend.py: 19%

1215 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5 

6import collections 

7import contextlib 

8import itertools 

9import typing 

10import warnings 

11from contextlib import contextmanager 

12 

13from cryptography import utils, x509 

14from cryptography.exceptions import UnsupportedAlgorithm, _Reasons 

15from cryptography.hazmat.backends.openssl import aead 

16from cryptography.hazmat.backends.openssl.ciphers import _CipherContext 

17from cryptography.hazmat.backends.openssl.cmac import _CMACContext 

18from cryptography.hazmat.backends.openssl.dh import ( 

19 _dh_params_dup, 

20 _DHParameters, 

21 _DHPrivateKey, 

22 _DHPublicKey, 

23) 

24from cryptography.hazmat.backends.openssl.dsa import ( 

25 _DSAParameters, 

26 _DSAPrivateKey, 

27 _DSAPublicKey, 

28) 

29from cryptography.hazmat.backends.openssl.ec import ( 

30 _EllipticCurvePrivateKey, 

31 _EllipticCurvePublicKey, 

32) 

33from cryptography.hazmat.backends.openssl.ed448 import ( 

34 _ED448_KEY_SIZE, 

35 _Ed448PrivateKey, 

36 _Ed448PublicKey, 

37) 

38from cryptography.hazmat.backends.openssl.ed25519 import ( 

39 _Ed25519PrivateKey, 

40 _Ed25519PublicKey, 

41) 

42from cryptography.hazmat.backends.openssl.hashes import _HashContext 

43from cryptography.hazmat.backends.openssl.hmac import _HMACContext 

44from cryptography.hazmat.backends.openssl.poly1305 import ( 

45 _POLY1305_KEY_SIZE, 

46 _Poly1305Context, 

47) 

48from cryptography.hazmat.backends.openssl.rsa import ( 

49 _RSAPrivateKey, 

50 _RSAPublicKey, 

51) 

52from cryptography.hazmat.backends.openssl.x448 import ( 

53 _X448PrivateKey, 

54 _X448PublicKey, 

55) 

56from cryptography.hazmat.bindings._rust import openssl as rust_openssl 

57from cryptography.hazmat.bindings.openssl import binding 

58from cryptography.hazmat.primitives import hashes, serialization 

59from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding 

60from cryptography.hazmat.primitives.asymmetric import ( 

61 dh, 

62 dsa, 

63 ec, 

64 ed448, 

65 ed25519, 

66 rsa, 

67 x448, 

68 x25519, 

69) 

70from cryptography.hazmat.primitives.asymmetric.padding import ( 

71 MGF1, 

72 OAEP, 

73 PSS, 

74 PKCS1v15, 

75) 

76from cryptography.hazmat.primitives.asymmetric.types import ( 

77 PrivateKeyTypes, 

78 PublicKeyTypes, 

79) 

80from cryptography.hazmat.primitives.ciphers import ( 

81 BlockCipherAlgorithm, 

82 CipherAlgorithm, 

83) 

84from cryptography.hazmat.primitives.ciphers.algorithms import ( 

85 AES, 

86 AES128, 

87 AES256, 

88 ARC4, 

89 SM4, 

90 Camellia, 

91 ChaCha20, 

92 TripleDES, 

93 _BlowfishInternal, 

94 _CAST5Internal, 

95 _IDEAInternal, 

96 _SEEDInternal, 

97) 

98from cryptography.hazmat.primitives.ciphers.modes import ( 

99 CBC, 

100 CFB, 

101 CFB8, 

102 CTR, 

103 ECB, 

104 GCM, 

105 OFB, 

106 XTS, 

107 Mode, 

108) 

109from cryptography.hazmat.primitives.kdf import scrypt 

110from cryptography.hazmat.primitives.serialization import ssh 

111from cryptography.hazmat.primitives.serialization.pkcs12 import ( 

112 PBES, 

113 PKCS12Certificate, 

114 PKCS12KeyAndCertificates, 

115 PKCS12PrivateKeyTypes, 

116 _PKCS12CATypes, 

117) 

118 

119_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) 

120 

121 

122# Not actually supported, just used as a marker for some serialization tests. 

123class _RC2: 

124 pass 

125 

126 

127class Backend: 

128 """ 

129 OpenSSL API binding interfaces. 

130 """ 

131 

132 name = "openssl" 

133 

134 # FIPS has opinions about acceptable algorithms and key sizes, but the 

135 # disallowed algorithms are still present in OpenSSL. They just error if 

136 # you try to use them. To avoid that we allowlist the algorithms in 

137 # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are. 

138 _fips_aead = { 

139 b"aes-128-ccm", 

140 b"aes-192-ccm", 

141 b"aes-256-ccm", 

142 b"aes-128-gcm", 

143 b"aes-192-gcm", 

144 b"aes-256-gcm", 

145 } 

146 # TripleDES encryption is disallowed/deprecated throughout 2023 in 

147 # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA). 

148 _fips_ciphers = (AES,) 

149 # Sometimes SHA1 is still permissible. That logic is contained 

150 # within the various *_supported methods. 

151 _fips_hashes = ( 

152 hashes.SHA224, 

153 hashes.SHA256, 

154 hashes.SHA384, 

155 hashes.SHA512, 

156 hashes.SHA512_224, 

157 hashes.SHA512_256, 

158 hashes.SHA3_224, 

159 hashes.SHA3_256, 

160 hashes.SHA3_384, 

161 hashes.SHA3_512, 

162 hashes.SHAKE128, 

163 hashes.SHAKE256, 

164 ) 

165 _fips_ecdh_curves = ( 

166 ec.SECP224R1, 

167 ec.SECP256R1, 

168 ec.SECP384R1, 

169 ec.SECP521R1, 

170 ) 

171 _fips_rsa_min_key_size = 2048 

172 _fips_rsa_min_public_exponent = 65537 

173 _fips_dsa_min_modulus = 1 << 2048 

174 _fips_dh_min_key_size = 2048 

175 _fips_dh_min_modulus = 1 << _fips_dh_min_key_size 

176 

177 def __init__(self) -> None: 

178 self._binding = binding.Binding() 

179 self._ffi = self._binding.ffi 

180 self._lib = self._binding.lib 

181 self._fips_enabled = self._is_fips_enabled() 

182 

183 self._cipher_registry: typing.Dict[ 

184 typing.Tuple[typing.Type[CipherAlgorithm], typing.Type[Mode]], 

185 typing.Callable, 

186 ] = {} 

187 self._register_default_ciphers() 

188 if self._fips_enabled and self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE: 

189 warnings.warn( 

190 "OpenSSL FIPS mode is enabled. Can't enable DRBG fork safety.", 

191 UserWarning, 

192 ) 

193 else: 

194 self.activate_osrandom_engine() 

195 self._dh_types = [self._lib.EVP_PKEY_DH] 

196 if self._lib.Cryptography_HAS_EVP_PKEY_DHX: 

197 self._dh_types.append(self._lib.EVP_PKEY_DHX) 

198 

199 def __repr__(self) -> str: 

200 return "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>".format( 

201 self.openssl_version_text(), 

202 self._fips_enabled, 

203 self._binding._legacy_provider_loaded, 

204 ) 

205 

206 def openssl_assert( 

207 self, 

208 ok: bool, 

209 errors: typing.Optional[typing.List[rust_openssl.OpenSSLError]] = None, 

210 ) -> None: 

211 return binding._openssl_assert(self._lib, ok, errors=errors) 

212 

213 def _is_fips_enabled(self) -> bool: 

214 if self._lib.Cryptography_HAS_300_FIPS: 

215 mode = self._lib.EVP_default_properties_is_fips_enabled( 

216 self._ffi.NULL 

217 ) 

218 else: 

219 mode = self._lib.FIPS_mode() 

220 

221 if mode == 0: 

222 # OpenSSL without FIPS pushes an error on the error stack 

223 self._lib.ERR_clear_error() 

224 return bool(mode) 

225 

226 def _enable_fips(self) -> None: 

227 # This function enables FIPS mode for OpenSSL 3.0.0 on installs that 

228 # have the FIPS provider installed properly. 

229 self._binding._enable_fips() 

230 assert self._is_fips_enabled() 

231 self._fips_enabled = self._is_fips_enabled() 

232 

233 def activate_builtin_random(self) -> None: 

234 if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE: 

235 # Obtain a new structural reference. 

236 e = self._lib.ENGINE_get_default_RAND() 

237 if e != self._ffi.NULL: 

238 self._lib.ENGINE_unregister_RAND(e) 

239 # Reset the RNG to use the built-in. 

240 res = self._lib.RAND_set_rand_method(self._ffi.NULL) 

241 self.openssl_assert(res == 1) 

242 # decrement the structural reference from get_default_RAND 

243 res = self._lib.ENGINE_finish(e) 

244 self.openssl_assert(res == 1) 

245 

246 @contextlib.contextmanager 

247 def _get_osurandom_engine(self): 

248 # Fetches an engine by id and returns it. This creates a structural 

249 # reference. 

250 e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id) 

251 self.openssl_assert(e != self._ffi.NULL) 

252 # Initialize the engine for use. This adds a functional reference. 

253 res = self._lib.ENGINE_init(e) 

254 self.openssl_assert(res == 1) 

255 

256 try: 

257 yield e 

258 finally: 

259 # Decrement the structural ref incremented by ENGINE_by_id. 

260 res = self._lib.ENGINE_free(e) 

261 self.openssl_assert(res == 1) 

262 # Decrement the functional ref incremented by ENGINE_init. 

263 res = self._lib.ENGINE_finish(e) 

264 self.openssl_assert(res == 1) 

265 

266 def activate_osrandom_engine(self) -> None: 

267 if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE: 

268 # Unregister and free the current engine. 

269 self.activate_builtin_random() 

270 with self._get_osurandom_engine() as e: 

271 # Set the engine as the default RAND provider. 

272 res = self._lib.ENGINE_set_default_RAND(e) 

273 self.openssl_assert(res == 1) 

274 # Reset the RNG to use the engine 

275 res = self._lib.RAND_set_rand_method(self._ffi.NULL) 

276 self.openssl_assert(res == 1) 

277 

278 def osrandom_engine_implementation(self) -> str: 

279 buf = self._ffi.new("char[]", 64) 

280 with self._get_osurandom_engine() as e: 

281 res = self._lib.ENGINE_ctrl_cmd( 

282 e, b"get_implementation", len(buf), buf, self._ffi.NULL, 0 

283 ) 

284 self.openssl_assert(res > 0) 

285 return self._ffi.string(buf).decode("ascii") 

286 

287 def openssl_version_text(self) -> str: 

288 """ 

289 Friendly string name of the loaded OpenSSL library. This is not 

290 necessarily the same version as it was compiled against. 

291 

292 Example: OpenSSL 1.1.1d 10 Sep 2019 

293 """ 

294 return self._ffi.string( 

295 self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION) 

296 ).decode("ascii") 

297 

298 def openssl_version_number(self) -> int: 

299 return self._lib.OpenSSL_version_num() 

300 

301 def create_hmac_ctx( 

302 self, key: bytes, algorithm: hashes.HashAlgorithm 

303 ) -> _HMACContext: 

304 return _HMACContext(self, key, algorithm) 

305 

306 def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm): 

307 if algorithm.name == "blake2b" or algorithm.name == "blake2s": 

308 alg = "{}{}".format( 

309 algorithm.name, algorithm.digest_size * 8 

310 ).encode("ascii") 

311 else: 

312 alg = algorithm.name.encode("ascii") 

313 

314 evp_md = self._lib.EVP_get_digestbyname(alg) 

315 return evp_md 

316 

317 def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm): 

318 evp_md = self._evp_md_from_algorithm(algorithm) 

319 self.openssl_assert(evp_md != self._ffi.NULL) 

320 return evp_md 

321 

322 def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool: 

323 if self._fips_enabled and not isinstance(algorithm, self._fips_hashes): 

324 return False 

325 

326 evp_md = self._evp_md_from_algorithm(algorithm) 

327 return evp_md != self._ffi.NULL 

328 

329 def signature_hash_supported( 

330 self, algorithm: hashes.HashAlgorithm 

331 ) -> bool: 

332 # Dedicated check for hashing algorithm use in message digest for 

333 # signatures, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption). 

334 if self._fips_enabled and isinstance(algorithm, hashes.SHA1): 

335 return False 

336 return self.hash_supported(algorithm) 

337 

338 def scrypt_supported(self) -> bool: 

339 if self._fips_enabled: 

340 return False 

341 else: 

342 return self._lib.Cryptography_HAS_SCRYPT == 1 

343 

344 def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool: 

345 # FIPS mode still allows SHA1 for HMAC 

346 if self._fips_enabled and isinstance(algorithm, hashes.SHA1): 

347 return True 

348 

349 return self.hash_supported(algorithm) 

350 

351 def create_hash_ctx( 

352 self, algorithm: hashes.HashAlgorithm 

353 ) -> hashes.HashContext: 

354 return _HashContext(self, algorithm) 

355 

356 def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool: 

357 if self._fips_enabled: 

358 # FIPS mode requires AES. TripleDES is disallowed/deprecated in 

359 # FIPS 140-3. 

360 if not isinstance(cipher, self._fips_ciphers): 

361 return False 

362 

363 try: 

364 adapter = self._cipher_registry[type(cipher), type(mode)] 

365 except KeyError: 

366 return False 

367 evp_cipher = adapter(self, cipher, mode) 

368 return self._ffi.NULL != evp_cipher 

369 

370 def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None: 

371 if (cipher_cls, mode_cls) in self._cipher_registry: 

372 raise ValueError( 

373 "Duplicate registration for: {} {}.".format( 

374 cipher_cls, mode_cls 

375 ) 

376 ) 

377 self._cipher_registry[cipher_cls, mode_cls] = adapter 

378 

379 def _register_default_ciphers(self) -> None: 

380 for cipher_cls in [AES, AES128, AES256]: 

381 for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]: 

382 self.register_cipher_adapter( 

383 cipher_cls, 

384 mode_cls, 

385 GetCipherByName( 

386 "{cipher.name}-{cipher.key_size}-{mode.name}" 

387 ), 

388 ) 

389 for mode_cls in [CBC, CTR, ECB, OFB, CFB]: 

390 self.register_cipher_adapter( 

391 Camellia, 

392 mode_cls, 

393 GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"), 

394 ) 

395 for mode_cls in [CBC, CFB, CFB8, OFB]: 

396 self.register_cipher_adapter( 

397 TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}") 

398 ) 

399 self.register_cipher_adapter( 

400 TripleDES, ECB, GetCipherByName("des-ede3") 

401 ) 

402 self.register_cipher_adapter( 

403 ChaCha20, type(None), GetCipherByName("chacha20") 

404 ) 

405 self.register_cipher_adapter(AES, XTS, _get_xts_cipher) 

406 for mode_cls in [ECB, CBC, OFB, CFB, CTR]: 

407 self.register_cipher_adapter( 

408 SM4, mode_cls, GetCipherByName("sm4-{mode.name}") 

409 ) 

410 # Don't register legacy ciphers if they're unavailable. Hypothetically 

411 # this wouldn't be necessary because we test availability by seeing if 

412 # we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3 

413 # will return a valid pointer even though the cipher is unavailable. 

414 if ( 

415 self._binding._legacy_provider_loaded 

416 or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER 

417 ): 

418 for mode_cls in [CBC, CFB, OFB, ECB]: 

419 self.register_cipher_adapter( 

420 _BlowfishInternal, 

421 mode_cls, 

422 GetCipherByName("bf-{mode.name}"), 

423 ) 

424 for mode_cls in [CBC, CFB, OFB, ECB]: 

425 self.register_cipher_adapter( 

426 _SEEDInternal, 

427 mode_cls, 

428 GetCipherByName("seed-{mode.name}"), 

429 ) 

430 for cipher_cls, mode_cls in itertools.product( 

431 [_CAST5Internal, _IDEAInternal], 

432 [CBC, OFB, CFB, ECB], 

433 ): 

434 self.register_cipher_adapter( 

435 cipher_cls, 

436 mode_cls, 

437 GetCipherByName("{cipher.name}-{mode.name}"), 

438 ) 

439 self.register_cipher_adapter( 

440 ARC4, type(None), GetCipherByName("rc4") 

441 ) 

442 # We don't actually support RC2, this is just used by some tests. 

443 self.register_cipher_adapter( 

444 _RC2, type(None), GetCipherByName("rc2") 

445 ) 

446 

447 def create_symmetric_encryption_ctx( 

448 self, cipher: CipherAlgorithm, mode: Mode 

449 ) -> _CipherContext: 

450 return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT) 

451 

452 def create_symmetric_decryption_ctx( 

453 self, cipher: CipherAlgorithm, mode: Mode 

454 ) -> _CipherContext: 

455 return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT) 

456 

457 def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool: 

458 return self.hmac_supported(algorithm) 

459 

460 def derive_pbkdf2_hmac( 

461 self, 

462 algorithm: hashes.HashAlgorithm, 

463 length: int, 

464 salt: bytes, 

465 iterations: int, 

466 key_material: bytes, 

467 ) -> bytes: 

468 buf = self._ffi.new("unsigned char[]", length) 

469 evp_md = self._evp_md_non_null_from_algorithm(algorithm) 

470 key_material_ptr = self._ffi.from_buffer(key_material) 

471 res = self._lib.PKCS5_PBKDF2_HMAC( 

472 key_material_ptr, 

473 len(key_material), 

474 salt, 

475 len(salt), 

476 iterations, 

477 evp_md, 

478 length, 

479 buf, 

480 ) 

481 self.openssl_assert(res == 1) 

482 return self._ffi.buffer(buf)[:] 

483 

484 def _consume_errors(self) -> typing.List[rust_openssl.OpenSSLError]: 

485 return rust_openssl.capture_error_stack() 

486 

487 def _bn_to_int(self, bn) -> int: 

488 assert bn != self._ffi.NULL 

489 self.openssl_assert(not self._lib.BN_is_negative(bn)) 

490 

491 bn_num_bytes = self._lib.BN_num_bytes(bn) 

492 bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes) 

493 bin_len = self._lib.BN_bn2bin(bn, bin_ptr) 

494 # A zero length means the BN has value 0 

495 self.openssl_assert(bin_len >= 0) 

496 val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big") 

497 return val 

498 

499 def _int_to_bn(self, num: int, bn=None): 

500 """ 

501 Converts a python integer to a BIGNUM. The returned BIGNUM will not 

502 be garbage collected (to support adding them to structs that take 

503 ownership of the object). Be sure to register it for GC if it will 

504 be discarded after use. 

505 """ 

506 assert bn is None or bn != self._ffi.NULL 

507 

508 if bn is None: 

509 bn = self._ffi.NULL 

510 

511 binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big") 

512 bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn) 

513 self.openssl_assert(bn_ptr != self._ffi.NULL) 

514 return bn_ptr 

515 

516 def generate_rsa_private_key( 

517 self, public_exponent: int, key_size: int 

518 ) -> rsa.RSAPrivateKey: 

519 rsa._verify_rsa_parameters(public_exponent, key_size) 

520 

521 rsa_cdata = self._lib.RSA_new() 

522 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

523 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

524 

525 bn = self._int_to_bn(public_exponent) 

526 bn = self._ffi.gc(bn, self._lib.BN_free) 

527 

528 res = self._lib.RSA_generate_key_ex( 

529 rsa_cdata, key_size, bn, self._ffi.NULL 

530 ) 

531 self.openssl_assert(res == 1) 

532 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 

533 

534 # We can skip RSA key validation here since we just generated the key 

535 return _RSAPrivateKey( 

536 self, rsa_cdata, evp_pkey, unsafe_skip_rsa_key_validation=True 

537 ) 

538 

539 def generate_rsa_parameters_supported( 

540 self, public_exponent: int, key_size: int 

541 ) -> bool: 

542 return ( 

543 public_exponent >= 3 

544 and public_exponent & 1 != 0 

545 and key_size >= 512 

546 ) 

547 

548 def load_rsa_private_numbers( 

549 self, 

550 numbers: rsa.RSAPrivateNumbers, 

551 unsafe_skip_rsa_key_validation: bool, 

552 ) -> rsa.RSAPrivateKey: 

553 rsa._check_private_key_components( 

554 numbers.p, 

555 numbers.q, 

556 numbers.d, 

557 numbers.dmp1, 

558 numbers.dmq1, 

559 numbers.iqmp, 

560 numbers.public_numbers.e, 

561 numbers.public_numbers.n, 

562 ) 

563 rsa_cdata = self._lib.RSA_new() 

564 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

565 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

566 p = self._int_to_bn(numbers.p) 

567 q = self._int_to_bn(numbers.q) 

568 d = self._int_to_bn(numbers.d) 

569 dmp1 = self._int_to_bn(numbers.dmp1) 

570 dmq1 = self._int_to_bn(numbers.dmq1) 

571 iqmp = self._int_to_bn(numbers.iqmp) 

572 e = self._int_to_bn(numbers.public_numbers.e) 

573 n = self._int_to_bn(numbers.public_numbers.n) 

574 res = self._lib.RSA_set0_factors(rsa_cdata, p, q) 

575 self.openssl_assert(res == 1) 

576 res = self._lib.RSA_set0_key(rsa_cdata, n, e, d) 

577 self.openssl_assert(res == 1) 

578 res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp) 

579 self.openssl_assert(res == 1) 

580 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 

581 

582 return _RSAPrivateKey( 

583 self, 

584 rsa_cdata, 

585 evp_pkey, 

586 unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation, 

587 ) 

588 

589 def load_rsa_public_numbers( 

590 self, numbers: rsa.RSAPublicNumbers 

591 ) -> rsa.RSAPublicKey: 

592 rsa._check_public_key_components(numbers.e, numbers.n) 

593 rsa_cdata = self._lib.RSA_new() 

594 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

595 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

596 e = self._int_to_bn(numbers.e) 

597 n = self._int_to_bn(numbers.n) 

598 res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL) 

599 self.openssl_assert(res == 1) 

600 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 

601 

602 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 

603 

604 def _create_evp_pkey_gc(self): 

605 evp_pkey = self._lib.EVP_PKEY_new() 

606 self.openssl_assert(evp_pkey != self._ffi.NULL) 

607 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

608 return evp_pkey 

609 

610 def _rsa_cdata_to_evp_pkey(self, rsa_cdata): 

611 evp_pkey = self._create_evp_pkey_gc() 

612 res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata) 

613 self.openssl_assert(res == 1) 

614 return evp_pkey 

615 

616 def _bytes_to_bio(self, data: bytes) -> _MemoryBIO: 

617 """ 

618 Return a _MemoryBIO namedtuple of (BIO, char*). 

619 

620 The char* is the storage for the BIO and it must stay alive until the 

621 BIO is finished with. 

622 """ 

623 data_ptr = self._ffi.from_buffer(data) 

624 bio = self._lib.BIO_new_mem_buf(data_ptr, len(data)) 

625 self.openssl_assert(bio != self._ffi.NULL) 

626 

627 return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr) 

628 

629 def _create_mem_bio_gc(self): 

630 """ 

631 Creates an empty memory BIO. 

632 """ 

633 bio_method = self._lib.BIO_s_mem() 

634 self.openssl_assert(bio_method != self._ffi.NULL) 

635 bio = self._lib.BIO_new(bio_method) 

636 self.openssl_assert(bio != self._ffi.NULL) 

637 bio = self._ffi.gc(bio, self._lib.BIO_free) 

638 return bio 

639 

640 def _read_mem_bio(self, bio) -> bytes: 

641 """ 

642 Reads a memory BIO. This only works on memory BIOs. 

643 """ 

644 buf = self._ffi.new("char **") 

645 buf_len = self._lib.BIO_get_mem_data(bio, buf) 

646 self.openssl_assert(buf_len > 0) 

647 self.openssl_assert(buf[0] != self._ffi.NULL) 

648 bio_data = self._ffi.buffer(buf[0], buf_len)[:] 

649 return bio_data 

650 

651 def _evp_pkey_to_private_key( 

652 self, evp_pkey, unsafe_skip_rsa_key_validation: bool 

653 ) -> PrivateKeyTypes: 

654 """ 

655 Return the appropriate type of PrivateKey given an evp_pkey cdata 

656 pointer. 

657 """ 

658 

659 key_type = self._lib.EVP_PKEY_id(evp_pkey) 

660 

661 if key_type == self._lib.EVP_PKEY_RSA: 

662 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 

663 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

664 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

665 return _RSAPrivateKey( 

666 self, 

667 rsa_cdata, 

668 evp_pkey, 

669 unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation, 

670 ) 

671 elif ( 

672 key_type == self._lib.EVP_PKEY_RSA_PSS 

673 and not self._lib.CRYPTOGRAPHY_IS_LIBRESSL 

674 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL 

675 and not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E 

676 ): 

677 # At the moment the way we handle RSA PSS keys is to strip the 

678 # PSS constraints from them and treat them as normal RSA keys 

679 # Unfortunately the RSA * itself tracks this data so we need to 

680 # extract, serialize, and reload it without the constraints. 

681 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 

682 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

683 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

684 bio = self._create_mem_bio_gc() 

685 res = self._lib.i2d_RSAPrivateKey_bio(bio, rsa_cdata) 

686 self.openssl_assert(res == 1) 

687 return self.load_der_private_key( 

688 self._read_mem_bio(bio), 

689 password=None, 

690 unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation, 

691 ) 

692 elif key_type == self._lib.EVP_PKEY_DSA: 

693 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) 

694 self.openssl_assert(dsa_cdata != self._ffi.NULL) 

695 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 

696 return _DSAPrivateKey(self, dsa_cdata, evp_pkey) 

697 elif key_type == self._lib.EVP_PKEY_EC: 

698 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) 

699 self.openssl_assert(ec_cdata != self._ffi.NULL) 

700 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 

701 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 

702 elif key_type in self._dh_types: 

703 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey) 

704 self.openssl_assert(dh_cdata != self._ffi.NULL) 

705 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 

706 return _DHPrivateKey(self, dh_cdata, evp_pkey) 

707 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None): 

708 # EVP_PKEY_ED25519 is not present in CRYPTOGRAPHY_IS_LIBRESSL 

709 return _Ed25519PrivateKey(self, evp_pkey) 

710 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None): 

711 # EVP_PKEY_X448 is not present in CRYPTOGRAPHY_IS_LIBRESSL 

712 return _X448PrivateKey(self, evp_pkey) 

713 elif key_type == self._lib.EVP_PKEY_X25519: 

714 return rust_openssl.x25519.private_key_from_ptr( 

715 int(self._ffi.cast("uintptr_t", evp_pkey)) 

716 ) 

717 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None): 

718 # EVP_PKEY_ED448 is not present in CRYPTOGRAPHY_IS_LIBRESSL 

719 return _Ed448PrivateKey(self, evp_pkey) 

720 else: 

721 raise UnsupportedAlgorithm("Unsupported key type.") 

722 

723 def _evp_pkey_to_public_key(self, evp_pkey) -> PublicKeyTypes: 

724 """ 

725 Return the appropriate type of PublicKey given an evp_pkey cdata 

726 pointer. 

727 """ 

728 

729 key_type = self._lib.EVP_PKEY_id(evp_pkey) 

730 

731 if key_type == self._lib.EVP_PKEY_RSA: 

732 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 

733 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

734 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

735 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 

736 elif ( 

737 key_type == self._lib.EVP_PKEY_RSA_PSS 

738 and not self._lib.CRYPTOGRAPHY_IS_LIBRESSL 

739 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL 

740 and not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E 

741 ): 

742 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 

743 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

744 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

745 bio = self._create_mem_bio_gc() 

746 res = self._lib.i2d_RSAPublicKey_bio(bio, rsa_cdata) 

747 self.openssl_assert(res == 1) 

748 return self.load_der_public_key(self._read_mem_bio(bio)) 

749 elif key_type == self._lib.EVP_PKEY_DSA: 

750 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) 

751 self.openssl_assert(dsa_cdata != self._ffi.NULL) 

752 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 

753 return _DSAPublicKey(self, dsa_cdata, evp_pkey) 

754 elif key_type == self._lib.EVP_PKEY_EC: 

755 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) 

756 if ec_cdata == self._ffi.NULL: 

757 errors = self._consume_errors() 

758 raise ValueError("Unable to load EC key", errors) 

759 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 

760 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey) 

761 elif key_type in self._dh_types: 

762 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey) 

763 self.openssl_assert(dh_cdata != self._ffi.NULL) 

764 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 

765 return _DHPublicKey(self, dh_cdata, evp_pkey) 

766 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None): 

767 # EVP_PKEY_ED25519 is not present in CRYPTOGRAPHY_IS_LIBRESSL 

768 return _Ed25519PublicKey(self, evp_pkey) 

769 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None): 

770 # EVP_PKEY_X448 is not present in CRYPTOGRAPHY_IS_LIBRESSL 

771 return _X448PublicKey(self, evp_pkey) 

772 elif key_type == self._lib.EVP_PKEY_X25519: 

773 return rust_openssl.x25519.public_key_from_ptr( 

774 int(self._ffi.cast("uintptr_t", evp_pkey)) 

775 ) 

776 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None): 

777 # EVP_PKEY_ED448 is not present in CRYPTOGRAPHY_IS_LIBRESSL 

778 return _Ed448PublicKey(self, evp_pkey) 

779 else: 

780 raise UnsupportedAlgorithm("Unsupported key type.") 

781 

782 def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool: 

783 if self._fips_enabled and isinstance(algorithm, hashes.SHA1): 

784 return False 

785 

786 return isinstance( 

787 algorithm, 

788 ( 

789 hashes.SHA1, 

790 hashes.SHA224, 

791 hashes.SHA256, 

792 hashes.SHA384, 

793 hashes.SHA512, 

794 ), 

795 ) 

796 

797 def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool: 

798 if isinstance(padding, PKCS1v15): 

799 return True 

800 elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1): 

801 # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked 

802 # as signature algorithm. 

803 if self._fips_enabled and isinstance( 

804 padding._mgf._algorithm, hashes.SHA1 

805 ): 

806 return True 

807 else: 

808 return self.hash_supported(padding._mgf._algorithm) 

809 elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1): 

810 return self._oaep_hash_supported( 

811 padding._mgf._algorithm 

812 ) and self._oaep_hash_supported(padding._algorithm) 

813 else: 

814 return False 

815 

816 def rsa_encryption_supported(self, padding: AsymmetricPadding) -> bool: 

817 if self._fips_enabled and isinstance(padding, PKCS1v15): 

818 return False 

819 else: 

820 return self.rsa_padding_supported(padding) 

821 

822 def generate_dsa_parameters(self, key_size: int) -> dsa.DSAParameters: 

823 if key_size not in (1024, 2048, 3072, 4096): 

824 raise ValueError( 

825 "Key size must be 1024, 2048, 3072, or 4096 bits." 

826 ) 

827 

828 ctx = self._lib.DSA_new() 

829 self.openssl_assert(ctx != self._ffi.NULL) 

830 ctx = self._ffi.gc(ctx, self._lib.DSA_free) 

831 

832 res = self._lib.DSA_generate_parameters_ex( 

833 ctx, 

834 key_size, 

835 self._ffi.NULL, 

836 0, 

837 self._ffi.NULL, 

838 self._ffi.NULL, 

839 self._ffi.NULL, 

840 ) 

841 

842 self.openssl_assert(res == 1) 

843 

844 return _DSAParameters(self, ctx) 

845 

846 def generate_dsa_private_key( 

847 self, parameters: dsa.DSAParameters 

848 ) -> dsa.DSAPrivateKey: 

849 ctx = self._lib.DSAparams_dup( 

850 parameters._dsa_cdata # type: ignore[attr-defined] 

851 ) 

852 self.openssl_assert(ctx != self._ffi.NULL) 

853 ctx = self._ffi.gc(ctx, self._lib.DSA_free) 

854 self._lib.DSA_generate_key(ctx) 

855 evp_pkey = self._dsa_cdata_to_evp_pkey(ctx) 

856 

857 return _DSAPrivateKey(self, ctx, evp_pkey) 

858 

859 def generate_dsa_private_key_and_parameters( 

860 self, key_size: int 

861 ) -> dsa.DSAPrivateKey: 

862 parameters = self.generate_dsa_parameters(key_size) 

863 return self.generate_dsa_private_key(parameters) 

864 

865 def _dsa_cdata_set_values( 

866 self, dsa_cdata, p, q, g, pub_key, priv_key 

867 ) -> None: 

868 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g) 

869 self.openssl_assert(res == 1) 

870 res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key) 

871 self.openssl_assert(res == 1) 

872 

873 def load_dsa_private_numbers( 

874 self, numbers: dsa.DSAPrivateNumbers 

875 ) -> dsa.DSAPrivateKey: 

876 dsa._check_dsa_private_numbers(numbers) 

877 parameter_numbers = numbers.public_numbers.parameter_numbers 

878 

879 dsa_cdata = self._lib.DSA_new() 

880 self.openssl_assert(dsa_cdata != self._ffi.NULL) 

881 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 

882 

883 p = self._int_to_bn(parameter_numbers.p) 

884 q = self._int_to_bn(parameter_numbers.q) 

885 g = self._int_to_bn(parameter_numbers.g) 

886 pub_key = self._int_to_bn(numbers.public_numbers.y) 

887 priv_key = self._int_to_bn(numbers.x) 

888 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key) 

889 

890 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata) 

891 

892 return _DSAPrivateKey(self, dsa_cdata, evp_pkey) 

893 

894 def load_dsa_public_numbers( 

895 self, numbers: dsa.DSAPublicNumbers 

896 ) -> dsa.DSAPublicKey: 

897 dsa._check_dsa_parameters(numbers.parameter_numbers) 

898 dsa_cdata = self._lib.DSA_new() 

899 self.openssl_assert(dsa_cdata != self._ffi.NULL) 

900 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 

901 

902 p = self._int_to_bn(numbers.parameter_numbers.p) 

903 q = self._int_to_bn(numbers.parameter_numbers.q) 

904 g = self._int_to_bn(numbers.parameter_numbers.g) 

905 pub_key = self._int_to_bn(numbers.y) 

906 priv_key = self._ffi.NULL 

907 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key) 

908 

909 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata) 

910 

911 return _DSAPublicKey(self, dsa_cdata, evp_pkey) 

912 

913 def load_dsa_parameter_numbers( 

914 self, numbers: dsa.DSAParameterNumbers 

915 ) -> dsa.DSAParameters: 

916 dsa._check_dsa_parameters(numbers) 

917 dsa_cdata = self._lib.DSA_new() 

918 self.openssl_assert(dsa_cdata != self._ffi.NULL) 

919 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 

920 

921 p = self._int_to_bn(numbers.p) 

922 q = self._int_to_bn(numbers.q) 

923 g = self._int_to_bn(numbers.g) 

924 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g) 

925 self.openssl_assert(res == 1) 

926 

927 return _DSAParameters(self, dsa_cdata) 

928 

929 def _dsa_cdata_to_evp_pkey(self, dsa_cdata): 

930 evp_pkey = self._create_evp_pkey_gc() 

931 res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata) 

932 self.openssl_assert(res == 1) 

933 return evp_pkey 

934 

935 def dsa_supported(self) -> bool: 

936 return not self._fips_enabled 

937 

938 def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool: 

939 if not self.dsa_supported(): 

940 return False 

941 return self.signature_hash_supported(algorithm) 

942 

943 def cmac_algorithm_supported(self, algorithm) -> bool: 

944 return self.cipher_supported( 

945 algorithm, CBC(b"\x00" * algorithm.block_size) 

946 ) 

947 

948 def create_cmac_ctx(self, algorithm: BlockCipherAlgorithm) -> _CMACContext: 

949 return _CMACContext(self, algorithm) 

950 

951 def load_pem_private_key( 

952 self, 

953 data: bytes, 

954 password: typing.Optional[bytes], 

955 unsafe_skip_rsa_key_validation: bool, 

956 ) -> PrivateKeyTypes: 

957 return self._load_key( 

958 self._lib.PEM_read_bio_PrivateKey, 

959 data, 

960 password, 

961 unsafe_skip_rsa_key_validation, 

962 ) 

963 

964 def load_pem_public_key(self, data: bytes) -> PublicKeyTypes: 

965 mem_bio = self._bytes_to_bio(data) 

966 # In OpenSSL 3.0.x the PEM_read_bio_PUBKEY function will invoke 

967 # the default password callback if you pass an encrypted private 

968 # key. This is very, very, very bad as the default callback can 

969 # trigger an interactive console prompt, which will hang the 

970 # Python process. We therefore provide our own callback to 

971 # catch this and error out properly. 

972 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *") 

973 evp_pkey = self._lib.PEM_read_bio_PUBKEY( 

974 mem_bio.bio, 

975 self._ffi.NULL, 

976 self._ffi.addressof( 

977 self._lib._original_lib, "Cryptography_pem_password_cb" 

978 ), 

979 userdata, 

980 ) 

981 if evp_pkey != self._ffi.NULL: 

982 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

983 return self._evp_pkey_to_public_key(evp_pkey) 

984 else: 

985 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still 

986 # need to check to see if it is a pure PKCS1 RSA public key (not 

987 # embedded in a subjectPublicKeyInfo) 

988 self._consume_errors() 

989 res = self._lib.BIO_reset(mem_bio.bio) 

990 self.openssl_assert(res == 1) 

991 rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey( 

992 mem_bio.bio, 

993 self._ffi.NULL, 

994 self._ffi.addressof( 

995 self._lib._original_lib, "Cryptography_pem_password_cb" 

996 ), 

997 userdata, 

998 ) 

999 if rsa_cdata != self._ffi.NULL: 

1000 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

1001 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 

1002 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 

1003 else: 

1004 self._handle_key_loading_error() 

1005 

1006 def load_pem_parameters(self, data: bytes) -> dh.DHParameters: 

1007 mem_bio = self._bytes_to_bio(data) 

1008 # only DH is supported currently 

1009 dh_cdata = self._lib.PEM_read_bio_DHparams( 

1010 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 

1011 ) 

1012 if dh_cdata != self._ffi.NULL: 

1013 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 

1014 return _DHParameters(self, dh_cdata) 

1015 else: 

1016 self._handle_key_loading_error() 

1017 

1018 def load_der_private_key( 

1019 self, 

1020 data: bytes, 

1021 password: typing.Optional[bytes], 

1022 unsafe_skip_rsa_key_validation: bool, 

1023 ) -> PrivateKeyTypes: 

1024 # OpenSSL has a function called d2i_AutoPrivateKey that in theory 

1025 # handles this automatically, however it doesn't handle encrypted 

1026 # private keys. Instead we try to load the key two different ways. 

1027 # First we'll try to load it as a traditional key. 

1028 bio_data = self._bytes_to_bio(data) 

1029 key = self._evp_pkey_from_der_traditional_key(bio_data, password) 

1030 if key: 

1031 return self._evp_pkey_to_private_key( 

1032 key, unsafe_skip_rsa_key_validation 

1033 ) 

1034 else: 

1035 # Finally we try to load it with the method that handles encrypted 

1036 # PKCS8 properly. 

1037 return self._load_key( 

1038 self._lib.d2i_PKCS8PrivateKey_bio, 

1039 data, 

1040 password, 

1041 unsafe_skip_rsa_key_validation, 

1042 ) 

1043 

1044 def _evp_pkey_from_der_traditional_key(self, bio_data, password): 

1045 key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL) 

1046 if key != self._ffi.NULL: 

1047 key = self._ffi.gc(key, self._lib.EVP_PKEY_free) 

1048 if password is not None: 

1049 raise TypeError( 

1050 "Password was given but private key is not encrypted." 

1051 ) 

1052 

1053 return key 

1054 else: 

1055 self._consume_errors() 

1056 return None 

1057 

1058 def load_der_public_key(self, data: bytes) -> PublicKeyTypes: 

1059 mem_bio = self._bytes_to_bio(data) 

1060 evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL) 

1061 if evp_pkey != self._ffi.NULL: 

1062 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

1063 return self._evp_pkey_to_public_key(evp_pkey) 

1064 else: 

1065 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still 

1066 # need to check to see if it is a pure PKCS1 RSA public key (not 

1067 # embedded in a subjectPublicKeyInfo) 

1068 self._consume_errors() 

1069 res = self._lib.BIO_reset(mem_bio.bio) 

1070 self.openssl_assert(res == 1) 

1071 rsa_cdata = self._lib.d2i_RSAPublicKey_bio( 

1072 mem_bio.bio, self._ffi.NULL 

1073 ) 

1074 if rsa_cdata != self._ffi.NULL: 

1075 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

1076 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 

1077 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 

1078 else: 

1079 self._handle_key_loading_error() 

1080 

1081 def load_der_parameters(self, data: bytes) -> dh.DHParameters: 

1082 mem_bio = self._bytes_to_bio(data) 

1083 dh_cdata = self._lib.d2i_DHparams_bio(mem_bio.bio, self._ffi.NULL) 

1084 if dh_cdata != self._ffi.NULL: 

1085 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 

1086 return _DHParameters(self, dh_cdata) 

1087 elif self._lib.Cryptography_HAS_EVP_PKEY_DHX: 

1088 # We check to see if the is dhx. 

1089 self._consume_errors() 

1090 res = self._lib.BIO_reset(mem_bio.bio) 

1091 self.openssl_assert(res == 1) 

1092 dh_cdata = self._lib.d2i_DHxparams_bio(mem_bio.bio, self._ffi.NULL) 

1093 if dh_cdata != self._ffi.NULL: 

1094 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 

1095 return _DHParameters(self, dh_cdata) 

1096 

1097 self._handle_key_loading_error() 

1098 

1099 def _cert2ossl(self, cert: x509.Certificate) -> typing.Any: 

1100 data = cert.public_bytes(serialization.Encoding.DER) 

1101 mem_bio = self._bytes_to_bio(data) 

1102 x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL) 

1103 self.openssl_assert(x509 != self._ffi.NULL) 

1104 x509 = self._ffi.gc(x509, self._lib.X509_free) 

1105 return x509 

1106 

1107 def _ossl2cert(self, x509_ptr: typing.Any) -> x509.Certificate: 

1108 bio = self._create_mem_bio_gc() 

1109 res = self._lib.i2d_X509_bio(bio, x509_ptr) 

1110 self.openssl_assert(res == 1) 

1111 return x509.load_der_x509_certificate(self._read_mem_bio(bio)) 

1112 

1113 def _check_keys_correspond(self, key1, key2) -> None: 

1114 if self._lib.EVP_PKEY_cmp(key1._evp_pkey, key2._evp_pkey) != 1: 

1115 raise ValueError("Keys do not correspond") 

1116 

1117 def _load_key( 

1118 self, openssl_read_func, data, password, unsafe_skip_rsa_key_validation 

1119 ) -> PrivateKeyTypes: 

1120 mem_bio = self._bytes_to_bio(data) 

1121 

1122 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *") 

1123 if password is not None: 

1124 utils._check_byteslike("password", password) 

1125 password_ptr = self._ffi.from_buffer(password) 

1126 userdata.password = password_ptr 

1127 userdata.length = len(password) 

1128 

1129 evp_pkey = openssl_read_func( 

1130 mem_bio.bio, 

1131 self._ffi.NULL, 

1132 self._ffi.addressof( 

1133 self._lib._original_lib, "Cryptography_pem_password_cb" 

1134 ), 

1135 userdata, 

1136 ) 

1137 

1138 if evp_pkey == self._ffi.NULL: 

1139 if userdata.error != 0: 

1140 self._consume_errors() 

1141 if userdata.error == -1: 

1142 raise TypeError( 

1143 "Password was not given but private key is encrypted" 

1144 ) 

1145 else: 

1146 assert userdata.error == -2 

1147 raise ValueError( 

1148 "Passwords longer than {} bytes are not supported " 

1149 "by this backend.".format(userdata.maxsize - 1) 

1150 ) 

1151 else: 

1152 self._handle_key_loading_error() 

1153 

1154 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

1155 

1156 if password is not None and userdata.called == 0: 

1157 raise TypeError( 

1158 "Password was given but private key is not encrypted." 

1159 ) 

1160 

1161 assert ( 

1162 password is not None and userdata.called == 1 

1163 ) or password is None 

1164 

1165 return self._evp_pkey_to_private_key( 

1166 evp_pkey, unsafe_skip_rsa_key_validation 

1167 ) 

1168 

1169 def _handle_key_loading_error(self) -> typing.NoReturn: 

1170 errors = self._consume_errors() 

1171 

1172 if not errors: 

1173 raise ValueError( 

1174 "Could not deserialize key data. The data may be in an " 

1175 "incorrect format or it may be encrypted with an unsupported " 

1176 "algorithm." 

1177 ) 

1178 

1179 elif ( 

1180 errors[0]._lib_reason_match( 

1181 self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT 

1182 ) 

1183 or errors[0]._lib_reason_match( 

1184 self._lib.ERR_LIB_PKCS12, 

1185 self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR, 

1186 ) 

1187 or ( 

1188 self._lib.Cryptography_HAS_PROVIDERS 

1189 and errors[0]._lib_reason_match( 

1190 self._lib.ERR_LIB_PROV, 

1191 self._lib.PROV_R_BAD_DECRYPT, 

1192 ) 

1193 ) 

1194 ): 

1195 raise ValueError("Bad decrypt. Incorrect password?") 

1196 

1197 elif any( 

1198 error._lib_reason_match( 

1199 self._lib.ERR_LIB_EVP, 

1200 self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM, 

1201 ) 

1202 for error in errors 

1203 ): 

1204 raise ValueError("Unsupported public key algorithm.") 

1205 

1206 else: 

1207 raise ValueError( 

1208 "Could not deserialize key data. The data may be in an " 

1209 "incorrect format, it may be encrypted with an unsupported " 

1210 "algorithm, or it may be an unsupported key type (e.g. EC " 

1211 "curves with explicit parameters).", 

1212 errors, 

1213 ) 

1214 

1215 def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool: 

1216 try: 

1217 curve_nid = self._elliptic_curve_to_nid(curve) 

1218 except UnsupportedAlgorithm: 

1219 curve_nid = self._lib.NID_undef 

1220 

1221 group = self._lib.EC_GROUP_new_by_curve_name(curve_nid) 

1222 

1223 if group == self._ffi.NULL: 

1224 self._consume_errors() 

1225 return False 

1226 else: 

1227 self.openssl_assert(curve_nid != self._lib.NID_undef) 

1228 self._lib.EC_GROUP_free(group) 

1229 return True 

1230 

1231 def elliptic_curve_signature_algorithm_supported( 

1232 self, 

1233 signature_algorithm: ec.EllipticCurveSignatureAlgorithm, 

1234 curve: ec.EllipticCurve, 

1235 ) -> bool: 

1236 # We only support ECDSA right now. 

1237 if not isinstance(signature_algorithm, ec.ECDSA): 

1238 return False 

1239 

1240 return self.elliptic_curve_supported(curve) 

1241 

1242 def generate_elliptic_curve_private_key( 

1243 self, curve: ec.EllipticCurve 

1244 ) -> ec.EllipticCurvePrivateKey: 

1245 """ 

1246 Generate a new private key on the named curve. 

1247 """ 

1248 

1249 if self.elliptic_curve_supported(curve): 

1250 ec_cdata = self._ec_key_new_by_curve(curve) 

1251 

1252 res = self._lib.EC_KEY_generate_key(ec_cdata) 

1253 self.openssl_assert(res == 1) 

1254 

1255 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 

1256 

1257 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 

1258 else: 

1259 raise UnsupportedAlgorithm( 

1260 f"Backend object does not support {curve.name}.", 

1261 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE, 

1262 ) 

1263 

1264 def load_elliptic_curve_private_numbers( 

1265 self, numbers: ec.EllipticCurvePrivateNumbers 

1266 ) -> ec.EllipticCurvePrivateKey: 

1267 public = numbers.public_numbers 

1268 

1269 ec_cdata = self._ec_key_new_by_curve(public.curve) 

1270 

1271 private_value = self._ffi.gc( 

1272 self._int_to_bn(numbers.private_value), self._lib.BN_clear_free 

1273 ) 

1274 res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value) 

1275 if res != 1: 

1276 self._consume_errors() 

1277 raise ValueError("Invalid EC key.") 

1278 

1279 with self._tmp_bn_ctx() as bn_ctx: 

1280 self._ec_key_set_public_key_affine_coordinates( 

1281 ec_cdata, public.x, public.y, bn_ctx 

1282 ) 

1283 # derive the expected public point and compare it to the one we 

1284 # just set based on the values we were given. If they don't match 

1285 # this isn't a valid key pair. 

1286 group = self._lib.EC_KEY_get0_group(ec_cdata) 

1287 self.openssl_assert(group != self._ffi.NULL) 

1288 set_point = backend._lib.EC_KEY_get0_public_key(ec_cdata) 

1289 self.openssl_assert(set_point != self._ffi.NULL) 

1290 computed_point = self._lib.EC_POINT_new(group) 

1291 self.openssl_assert(computed_point != self._ffi.NULL) 

1292 computed_point = self._ffi.gc( 

1293 computed_point, self._lib.EC_POINT_free 

1294 ) 

1295 res = self._lib.EC_POINT_mul( 

1296 group, 

1297 computed_point, 

1298 private_value, 

1299 self._ffi.NULL, 

1300 self._ffi.NULL, 

1301 bn_ctx, 

1302 ) 

1303 self.openssl_assert(res == 1) 

1304 if ( 

1305 self._lib.EC_POINT_cmp( 

1306 group, set_point, computed_point, bn_ctx 

1307 ) 

1308 != 0 

1309 ): 

1310 raise ValueError("Invalid EC key.") 

1311 

1312 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 

1313 

1314 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 

1315 

1316 def load_elliptic_curve_public_numbers( 

1317 self, numbers: ec.EllipticCurvePublicNumbers 

1318 ) -> ec.EllipticCurvePublicKey: 

1319 ec_cdata = self._ec_key_new_by_curve(numbers.curve) 

1320 with self._tmp_bn_ctx() as bn_ctx: 

1321 self._ec_key_set_public_key_affine_coordinates( 

1322 ec_cdata, numbers.x, numbers.y, bn_ctx 

1323 ) 

1324 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 

1325 

1326 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey) 

1327 

1328 def load_elliptic_curve_public_bytes( 

1329 self, curve: ec.EllipticCurve, point_bytes: bytes 

1330 ) -> ec.EllipticCurvePublicKey: 

1331 ec_cdata = self._ec_key_new_by_curve(curve) 

1332 group = self._lib.EC_KEY_get0_group(ec_cdata) 

1333 self.openssl_assert(group != self._ffi.NULL) 

1334 point = self._lib.EC_POINT_new(group) 

1335 self.openssl_assert(point != self._ffi.NULL) 

1336 point = self._ffi.gc(point, self._lib.EC_POINT_free) 

1337 with self._tmp_bn_ctx() as bn_ctx: 

1338 res = self._lib.EC_POINT_oct2point( 

1339 group, point, point_bytes, len(point_bytes), bn_ctx 

1340 ) 

1341 if res != 1: 

1342 self._consume_errors() 

1343 raise ValueError("Invalid public bytes for the given curve") 

1344 

1345 res = self._lib.EC_KEY_set_public_key(ec_cdata, point) 

1346 self.openssl_assert(res == 1) 

1347 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 

1348 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey) 

1349 

1350 def derive_elliptic_curve_private_key( 

1351 self, private_value: int, curve: ec.EllipticCurve 

1352 ) -> ec.EllipticCurvePrivateKey: 

1353 ec_cdata = self._ec_key_new_by_curve(curve) 

1354 

1355 group = self._lib.EC_KEY_get0_group(ec_cdata) 

1356 self.openssl_assert(group != self._ffi.NULL) 

1357 

1358 point = self._lib.EC_POINT_new(group) 

1359 self.openssl_assert(point != self._ffi.NULL) 

1360 point = self._ffi.gc(point, self._lib.EC_POINT_free) 

1361 

1362 value = self._int_to_bn(private_value) 

1363 value = self._ffi.gc(value, self._lib.BN_clear_free) 

1364 

1365 with self._tmp_bn_ctx() as bn_ctx: 

1366 res = self._lib.EC_POINT_mul( 

1367 group, point, value, self._ffi.NULL, self._ffi.NULL, bn_ctx 

1368 ) 

1369 self.openssl_assert(res == 1) 

1370 

1371 bn_x = self._lib.BN_CTX_get(bn_ctx) 

1372 bn_y = self._lib.BN_CTX_get(bn_ctx) 

1373 

1374 res = self._lib.EC_POINT_get_affine_coordinates( 

1375 group, point, bn_x, bn_y, bn_ctx 

1376 ) 

1377 if res != 1: 

1378 self._consume_errors() 

1379 raise ValueError("Unable to derive key from private_value") 

1380 

1381 res = self._lib.EC_KEY_set_public_key(ec_cdata, point) 

1382 self.openssl_assert(res == 1) 

1383 private = self._int_to_bn(private_value) 

1384 private = self._ffi.gc(private, self._lib.BN_clear_free) 

1385 res = self._lib.EC_KEY_set_private_key(ec_cdata, private) 

1386 self.openssl_assert(res == 1) 

1387 

1388 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 

1389 

1390 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 

1391 

1392 def _ec_key_new_by_curve(self, curve: ec.EllipticCurve): 

1393 curve_nid = self._elliptic_curve_to_nid(curve) 

1394 return self._ec_key_new_by_curve_nid(curve_nid) 

1395 

1396 def _ec_key_new_by_curve_nid(self, curve_nid: int): 

1397 ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid) 

1398 self.openssl_assert(ec_cdata != self._ffi.NULL) 

1399 return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 

1400 

1401 def elliptic_curve_exchange_algorithm_supported( 

1402 self, algorithm: ec.ECDH, curve: ec.EllipticCurve 

1403 ) -> bool: 

1404 if self._fips_enabled and not isinstance( 

1405 curve, self._fips_ecdh_curves 

1406 ): 

1407 return False 

1408 

1409 return self.elliptic_curve_supported(curve) and isinstance( 

1410 algorithm, ec.ECDH 

1411 ) 

1412 

1413 def _ec_cdata_to_evp_pkey(self, ec_cdata): 

1414 evp_pkey = self._create_evp_pkey_gc() 

1415 res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata) 

1416 self.openssl_assert(res == 1) 

1417 return evp_pkey 

1418 

1419 def _elliptic_curve_to_nid(self, curve: ec.EllipticCurve) -> int: 

1420 """ 

1421 Get the NID for a curve name. 

1422 """ 

1423 

1424 curve_aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"} 

1425 

1426 curve_name = curve_aliases.get(curve.name, curve.name) 

1427 

1428 curve_nid = self._lib.OBJ_sn2nid(curve_name.encode()) 

1429 if curve_nid == self._lib.NID_undef: 

1430 raise UnsupportedAlgorithm( 

1431 f"{curve.name} is not a supported elliptic curve", 

1432 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE, 

1433 ) 

1434 return curve_nid 

1435 

1436 @contextmanager 

1437 def _tmp_bn_ctx(self): 

1438 bn_ctx = self._lib.BN_CTX_new() 

1439 self.openssl_assert(bn_ctx != self._ffi.NULL) 

1440 bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free) 

1441 self._lib.BN_CTX_start(bn_ctx) 

1442 try: 

1443 yield bn_ctx 

1444 finally: 

1445 self._lib.BN_CTX_end(bn_ctx) 

1446 

1447 def _ec_key_set_public_key_affine_coordinates( 

1448 self, 

1449 ec_cdata, 

1450 x: int, 

1451 y: int, 

1452 bn_ctx, 

1453 ) -> None: 

1454 """ 

1455 Sets the public key point in the EC_KEY context to the affine x and y 

1456 values. 

1457 """ 

1458 

1459 if x < 0 or y < 0: 

1460 raise ValueError( 

1461 "Invalid EC key. Both x and y must be non-negative." 

1462 ) 

1463 

1464 x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free) 

1465 y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free) 

1466 group = self._lib.EC_KEY_get0_group(ec_cdata) 

1467 self.openssl_assert(group != self._ffi.NULL) 

1468 point = self._lib.EC_POINT_new(group) 

1469 self.openssl_assert(point != self._ffi.NULL) 

1470 point = self._ffi.gc(point, self._lib.EC_POINT_free) 

1471 res = self._lib.EC_POINT_set_affine_coordinates( 

1472 group, point, x, y, bn_ctx 

1473 ) 

1474 if res != 1: 

1475 self._consume_errors() 

1476 raise ValueError("Invalid EC key.") 

1477 res = self._lib.EC_KEY_set_public_key(ec_cdata, point) 

1478 self.openssl_assert(res == 1) 

1479 

1480 def _private_key_bytes( 

1481 self, 

1482 encoding: serialization.Encoding, 

1483 format: serialization.PrivateFormat, 

1484 encryption_algorithm: serialization.KeySerializationEncryption, 

1485 key, 

1486 evp_pkey, 

1487 cdata, 

1488 ) -> bytes: 

1489 # validate argument types 

1490 if not isinstance(encoding, serialization.Encoding): 

1491 raise TypeError("encoding must be an item from the Encoding enum") 

1492 if not isinstance(format, serialization.PrivateFormat): 

1493 raise TypeError( 

1494 "format must be an item from the PrivateFormat enum" 

1495 ) 

1496 if not isinstance( 

1497 encryption_algorithm, serialization.KeySerializationEncryption 

1498 ): 

1499 raise TypeError( 

1500 "Encryption algorithm must be a KeySerializationEncryption " 

1501 "instance" 

1502 ) 

1503 

1504 # validate password 

1505 if isinstance(encryption_algorithm, serialization.NoEncryption): 

1506 password = b"" 

1507 elif isinstance( 

1508 encryption_algorithm, serialization.BestAvailableEncryption 

1509 ): 

1510 password = encryption_algorithm.password 

1511 if len(password) > 1023: 

1512 raise ValueError( 

1513 "Passwords longer than 1023 bytes are not supported by " 

1514 "this backend" 

1515 ) 

1516 elif ( 

1517 isinstance( 

1518 encryption_algorithm, serialization._KeySerializationEncryption 

1519 ) 

1520 and encryption_algorithm._format 

1521 is format 

1522 is serialization.PrivateFormat.OpenSSH 

1523 ): 

1524 password = encryption_algorithm.password 

1525 else: 

1526 raise ValueError("Unsupported encryption type") 

1527 

1528 # PKCS8 + PEM/DER 

1529 if format is serialization.PrivateFormat.PKCS8: 

1530 if encoding is serialization.Encoding.PEM: 

1531 write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey 

1532 elif encoding is serialization.Encoding.DER: 

1533 write_bio = self._lib.i2d_PKCS8PrivateKey_bio 

1534 else: 

1535 raise ValueError("Unsupported encoding for PKCS8") 

1536 return self._private_key_bytes_via_bio( 

1537 write_bio, evp_pkey, password 

1538 ) 

1539 

1540 # TraditionalOpenSSL + PEM/DER 

1541 if format is serialization.PrivateFormat.TraditionalOpenSSL: 

1542 if self._fips_enabled and not isinstance( 

1543 encryption_algorithm, serialization.NoEncryption 

1544 ): 

1545 raise ValueError( 

1546 "Encrypted traditional OpenSSL format is not " 

1547 "supported in FIPS mode." 

1548 ) 

1549 key_type = self._lib.EVP_PKEY_id(evp_pkey) 

1550 

1551 if encoding is serialization.Encoding.PEM: 

1552 if key_type == self._lib.EVP_PKEY_RSA: 

1553 write_bio = self._lib.PEM_write_bio_RSAPrivateKey 

1554 elif key_type == self._lib.EVP_PKEY_DSA: 

1555 write_bio = self._lib.PEM_write_bio_DSAPrivateKey 

1556 elif key_type == self._lib.EVP_PKEY_EC: 

1557 write_bio = self._lib.PEM_write_bio_ECPrivateKey 

1558 else: 

1559 raise ValueError( 

1560 "Unsupported key type for TraditionalOpenSSL" 

1561 ) 

1562 return self._private_key_bytes_via_bio( 

1563 write_bio, cdata, password 

1564 ) 

1565 

1566 if encoding is serialization.Encoding.DER: 

1567 if password: 

1568 raise ValueError( 

1569 "Encryption is not supported for DER encoded " 

1570 "traditional OpenSSL keys" 

1571 ) 

1572 if key_type == self._lib.EVP_PKEY_RSA: 

1573 write_bio = self._lib.i2d_RSAPrivateKey_bio 

1574 elif key_type == self._lib.EVP_PKEY_EC: 

1575 write_bio = self._lib.i2d_ECPrivateKey_bio 

1576 elif key_type == self._lib.EVP_PKEY_DSA: 

1577 write_bio = self._lib.i2d_DSAPrivateKey_bio 

1578 else: 

1579 raise ValueError( 

1580 "Unsupported key type for TraditionalOpenSSL" 

1581 ) 

1582 return self._bio_func_output(write_bio, cdata) 

1583 

1584 raise ValueError("Unsupported encoding for TraditionalOpenSSL") 

1585 

1586 # OpenSSH + PEM 

1587 if format is serialization.PrivateFormat.OpenSSH: 

1588 if encoding is serialization.Encoding.PEM: 

1589 return ssh._serialize_ssh_private_key( 

1590 key, password, encryption_algorithm 

1591 ) 

1592 

1593 raise ValueError( 

1594 "OpenSSH private key format can only be used" 

1595 " with PEM encoding" 

1596 ) 

1597 

1598 # Anything that key-specific code was supposed to handle earlier, 

1599 # like Raw. 

1600 raise ValueError("format is invalid with this key") 

1601 

1602 def _private_key_bytes_via_bio( 

1603 self, write_bio, evp_pkey, password 

1604 ) -> bytes: 

1605 if not password: 

1606 evp_cipher = self._ffi.NULL 

1607 else: 

1608 # This is a curated value that we will update over time. 

1609 evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc") 

1610 

1611 return self._bio_func_output( 

1612 write_bio, 

1613 evp_pkey, 

1614 evp_cipher, 

1615 password, 

1616 len(password), 

1617 self._ffi.NULL, 

1618 self._ffi.NULL, 

1619 ) 

1620 

1621 def _bio_func_output(self, write_bio, *args) -> bytes: 

1622 bio = self._create_mem_bio_gc() 

1623 res = write_bio(bio, *args) 

1624 self.openssl_assert(res == 1) 

1625 return self._read_mem_bio(bio) 

1626 

1627 def _public_key_bytes( 

1628 self, 

1629 encoding: serialization.Encoding, 

1630 format: serialization.PublicFormat, 

1631 key, 

1632 evp_pkey, 

1633 cdata, 

1634 ) -> bytes: 

1635 if not isinstance(encoding, serialization.Encoding): 

1636 raise TypeError("encoding must be an item from the Encoding enum") 

1637 if not isinstance(format, serialization.PublicFormat): 

1638 raise TypeError( 

1639 "format must be an item from the PublicFormat enum" 

1640 ) 

1641 

1642 # SubjectPublicKeyInfo + PEM/DER 

1643 if format is serialization.PublicFormat.SubjectPublicKeyInfo: 

1644 if encoding is serialization.Encoding.PEM: 

1645 write_bio = self._lib.PEM_write_bio_PUBKEY 

1646 elif encoding is serialization.Encoding.DER: 

1647 write_bio = self._lib.i2d_PUBKEY_bio 

1648 else: 

1649 raise ValueError( 

1650 "SubjectPublicKeyInfo works only with PEM or DER encoding" 

1651 ) 

1652 return self._bio_func_output(write_bio, evp_pkey) 

1653 

1654 # PKCS1 + PEM/DER 

1655 if format is serialization.PublicFormat.PKCS1: 

1656 # Only RSA is supported here. 

1657 key_type = self._lib.EVP_PKEY_id(evp_pkey) 

1658 if key_type != self._lib.EVP_PKEY_RSA: 

1659 raise ValueError("PKCS1 format is supported only for RSA keys") 

1660 

1661 if encoding is serialization.Encoding.PEM: 

1662 write_bio = self._lib.PEM_write_bio_RSAPublicKey 

1663 elif encoding is serialization.Encoding.DER: 

1664 write_bio = self._lib.i2d_RSAPublicKey_bio 

1665 else: 

1666 raise ValueError("PKCS1 works only with PEM or DER encoding") 

1667 return self._bio_func_output(write_bio, cdata) 

1668 

1669 # OpenSSH + OpenSSH 

1670 if format is serialization.PublicFormat.OpenSSH: 

1671 if encoding is serialization.Encoding.OpenSSH: 

1672 return ssh.serialize_ssh_public_key(key) 

1673 

1674 raise ValueError( 

1675 "OpenSSH format must be used with OpenSSH encoding" 

1676 ) 

1677 

1678 # Anything that key-specific code was supposed to handle earlier, 

1679 # like Raw, CompressedPoint, UncompressedPoint 

1680 raise ValueError("format is invalid with this key") 

1681 

1682 def dh_supported(self) -> bool: 

1683 return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL 

1684 

1685 def generate_dh_parameters( 

1686 self, generator: int, key_size: int 

1687 ) -> dh.DHParameters: 

1688 if key_size < dh._MIN_MODULUS_SIZE: 

1689 raise ValueError( 

1690 "DH key_size must be at least {} bits".format( 

1691 dh._MIN_MODULUS_SIZE 

1692 ) 

1693 ) 

1694 

1695 if generator not in (2, 5): 

1696 raise ValueError("DH generator must be 2 or 5") 

1697 

1698 dh_param_cdata = self._lib.DH_new() 

1699 self.openssl_assert(dh_param_cdata != self._ffi.NULL) 

1700 dh_param_cdata = self._ffi.gc(dh_param_cdata, self._lib.DH_free) 

1701 

1702 res = self._lib.DH_generate_parameters_ex( 

1703 dh_param_cdata, key_size, generator, self._ffi.NULL 

1704 ) 

1705 if res != 1: 

1706 errors = self._consume_errors() 

1707 raise ValueError("Unable to generate DH parameters", errors) 

1708 

1709 return _DHParameters(self, dh_param_cdata) 

1710 

1711 def _dh_cdata_to_evp_pkey(self, dh_cdata): 

1712 evp_pkey = self._create_evp_pkey_gc() 

1713 res = self._lib.EVP_PKEY_set1_DH(evp_pkey, dh_cdata) 

1714 self.openssl_assert(res == 1) 

1715 return evp_pkey 

1716 

1717 def generate_dh_private_key( 

1718 self, parameters: dh.DHParameters 

1719 ) -> dh.DHPrivateKey: 

1720 dh_key_cdata = _dh_params_dup( 

1721 parameters._dh_cdata, self # type: ignore[attr-defined] 

1722 ) 

1723 

1724 res = self._lib.DH_generate_key(dh_key_cdata) 

1725 self.openssl_assert(res == 1) 

1726 

1727 evp_pkey = self._dh_cdata_to_evp_pkey(dh_key_cdata) 

1728 

1729 return _DHPrivateKey(self, dh_key_cdata, evp_pkey) 

1730 

1731 def generate_dh_private_key_and_parameters( 

1732 self, generator: int, key_size: int 

1733 ) -> dh.DHPrivateKey: 

1734 return self.generate_dh_private_key( 

1735 self.generate_dh_parameters(generator, key_size) 

1736 ) 

1737 

1738 def load_dh_private_numbers( 

1739 self, numbers: dh.DHPrivateNumbers 

1740 ) -> dh.DHPrivateKey: 

1741 parameter_numbers = numbers.public_numbers.parameter_numbers 

1742 

1743 dh_cdata = self._lib.DH_new() 

1744 self.openssl_assert(dh_cdata != self._ffi.NULL) 

1745 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 

1746 

1747 p = self._int_to_bn(parameter_numbers.p) 

1748 g = self._int_to_bn(parameter_numbers.g) 

1749 

1750 if parameter_numbers.q is not None: 

1751 q = self._int_to_bn(parameter_numbers.q) 

1752 else: 

1753 q = self._ffi.NULL 

1754 

1755 pub_key = self._int_to_bn(numbers.public_numbers.y) 

1756 priv_key = self._int_to_bn(numbers.x) 

1757 

1758 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g) 

1759 self.openssl_assert(res == 1) 

1760 

1761 res = self._lib.DH_set0_key(dh_cdata, pub_key, priv_key) 

1762 self.openssl_assert(res == 1) 

1763 

1764 codes = self._ffi.new("int[]", 1) 

1765 res = self._lib.DH_check(dh_cdata, codes) 

1766 self.openssl_assert(res == 1) 

1767 

1768 # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not 

1769 # equal 11 when the generator is 2 (a quadratic nonresidue). 

1770 # We want to ignore that error because p % 24 == 23 is also fine. 

1771 # Specifically, g is then a quadratic residue. Within the context of 

1772 # Diffie-Hellman this means it can only generate half the possible 

1773 # values. That sounds bad, but quadratic nonresidues leak a bit of 

1774 # the key to the attacker in exchange for having the full key space 

1775 # available. See: https://crypto.stackexchange.com/questions/12961 

1776 if codes[0] != 0 and not ( 

1777 parameter_numbers.g == 2 

1778 and codes[0] ^ self._lib.DH_NOT_SUITABLE_GENERATOR == 0 

1779 ): 

1780 raise ValueError("DH private numbers did not pass safety checks.") 

1781 

1782 evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata) 

1783 

1784 return _DHPrivateKey(self, dh_cdata, evp_pkey) 

1785 

1786 def load_dh_public_numbers( 

1787 self, numbers: dh.DHPublicNumbers 

1788 ) -> dh.DHPublicKey: 

1789 dh_cdata = self._lib.DH_new() 

1790 self.openssl_assert(dh_cdata != self._ffi.NULL) 

1791 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 

1792 

1793 parameter_numbers = numbers.parameter_numbers 

1794 

1795 p = self._int_to_bn(parameter_numbers.p) 

1796 g = self._int_to_bn(parameter_numbers.g) 

1797 

1798 if parameter_numbers.q is not None: 

1799 q = self._int_to_bn(parameter_numbers.q) 

1800 else: 

1801 q = self._ffi.NULL 

1802 

1803 pub_key = self._int_to_bn(numbers.y) 

1804 

1805 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g) 

1806 self.openssl_assert(res == 1) 

1807 

1808 res = self._lib.DH_set0_key(dh_cdata, pub_key, self._ffi.NULL) 

1809 self.openssl_assert(res == 1) 

1810 

1811 evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata) 

1812 

1813 return _DHPublicKey(self, dh_cdata, evp_pkey) 

1814 

1815 def load_dh_parameter_numbers( 

1816 self, numbers: dh.DHParameterNumbers 

1817 ) -> dh.DHParameters: 

1818 dh_cdata = self._lib.DH_new() 

1819 self.openssl_assert(dh_cdata != self._ffi.NULL) 

1820 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 

1821 

1822 p = self._int_to_bn(numbers.p) 

1823 g = self._int_to_bn(numbers.g) 

1824 

1825 if numbers.q is not None: 

1826 q = self._int_to_bn(numbers.q) 

1827 else: 

1828 q = self._ffi.NULL 

1829 

1830 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g) 

1831 self.openssl_assert(res == 1) 

1832 

1833 return _DHParameters(self, dh_cdata) 

1834 

1835 def dh_parameters_supported( 

1836 self, p: int, g: int, q: typing.Optional[int] = None 

1837 ) -> bool: 

1838 dh_cdata = self._lib.DH_new() 

1839 self.openssl_assert(dh_cdata != self._ffi.NULL) 

1840 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 

1841 

1842 p = self._int_to_bn(p) 

1843 g = self._int_to_bn(g) 

1844 

1845 if q is not None: 

1846 q = self._int_to_bn(q) 

1847 else: 

1848 q = self._ffi.NULL 

1849 

1850 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g) 

1851 self.openssl_assert(res == 1) 

1852 

1853 codes = self._ffi.new("int[]", 1) 

1854 res = self._lib.DH_check(dh_cdata, codes) 

1855 self.openssl_assert(res == 1) 

1856 

1857 return codes[0] == 0 

1858 

1859 def dh_x942_serialization_supported(self) -> bool: 

1860 return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1 

1861 

1862 def x25519_load_public_bytes(self, data: bytes) -> x25519.X25519PublicKey: 

1863 return rust_openssl.x25519.from_public_bytes(data) 

1864 

1865 def x25519_load_private_bytes( 

1866 self, data: bytes 

1867 ) -> x25519.X25519PrivateKey: 

1868 return rust_openssl.x25519.from_private_bytes(data) 

1869 

1870 def _evp_pkey_keygen_gc(self, nid): 

1871 evp_pkey_ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL) 

1872 self.openssl_assert(evp_pkey_ctx != self._ffi.NULL) 

1873 evp_pkey_ctx = self._ffi.gc(evp_pkey_ctx, self._lib.EVP_PKEY_CTX_free) 

1874 res = self._lib.EVP_PKEY_keygen_init(evp_pkey_ctx) 

1875 self.openssl_assert(res == 1) 

1876 evp_ppkey = self._ffi.new("EVP_PKEY **") 

1877 res = self._lib.EVP_PKEY_keygen(evp_pkey_ctx, evp_ppkey) 

1878 self.openssl_assert(res == 1) 

1879 self.openssl_assert(evp_ppkey[0] != self._ffi.NULL) 

1880 evp_pkey = self._ffi.gc(evp_ppkey[0], self._lib.EVP_PKEY_free) 

1881 return evp_pkey 

1882 

1883 def x25519_generate_key(self) -> x25519.X25519PrivateKey: 

1884 return rust_openssl.x25519.generate_key() 

1885 

1886 def x25519_supported(self) -> bool: 

1887 if self._fips_enabled: 

1888 return False 

1889 return not self._lib.CRYPTOGRAPHY_LIBRESSL_LESS_THAN_370 

1890 

1891 def x448_load_public_bytes(self, data: bytes) -> x448.X448PublicKey: 

1892 if len(data) != 56: 

1893 raise ValueError("An X448 public key is 56 bytes long") 

1894 

1895 evp_pkey = self._lib.EVP_PKEY_new_raw_public_key( 

1896 self._lib.NID_X448, self._ffi.NULL, data, len(data) 

1897 ) 

1898 self.openssl_assert(evp_pkey != self._ffi.NULL) 

1899 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

1900 return _X448PublicKey(self, evp_pkey) 

1901 

1902 def x448_load_private_bytes(self, data: bytes) -> x448.X448PrivateKey: 

1903 if len(data) != 56: 

1904 raise ValueError("An X448 private key is 56 bytes long") 

1905 

1906 data_ptr = self._ffi.from_buffer(data) 

1907 evp_pkey = self._lib.EVP_PKEY_new_raw_private_key( 

1908 self._lib.NID_X448, self._ffi.NULL, data_ptr, len(data) 

1909 ) 

1910 self.openssl_assert(evp_pkey != self._ffi.NULL) 

1911 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

1912 return _X448PrivateKey(self, evp_pkey) 

1913 

1914 def x448_generate_key(self) -> x448.X448PrivateKey: 

1915 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X448) 

1916 return _X448PrivateKey(self, evp_pkey) 

1917 

1918 def x448_supported(self) -> bool: 

1919 if self._fips_enabled: 

1920 return False 

1921 return ( 

1922 not self._lib.CRYPTOGRAPHY_IS_LIBRESSL 

1923 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL 

1924 ) 

1925 

1926 def ed25519_supported(self) -> bool: 

1927 if self._fips_enabled: 

1928 return False 

1929 return self._lib.CRYPTOGRAPHY_HAS_WORKING_ED25519 

1930 

1931 def ed25519_load_public_bytes( 

1932 self, data: bytes 

1933 ) -> ed25519.Ed25519PublicKey: 

1934 utils._check_bytes("data", data) 

1935 

1936 if len(data) != ed25519._ED25519_KEY_SIZE: 

1937 raise ValueError("An Ed25519 public key is 32 bytes long") 

1938 

1939 evp_pkey = self._lib.EVP_PKEY_new_raw_public_key( 

1940 self._lib.NID_ED25519, self._ffi.NULL, data, len(data) 

1941 ) 

1942 self.openssl_assert(evp_pkey != self._ffi.NULL) 

1943 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

1944 

1945 return _Ed25519PublicKey(self, evp_pkey) 

1946 

1947 def ed25519_load_private_bytes( 

1948 self, data: bytes 

1949 ) -> ed25519.Ed25519PrivateKey: 

1950 if len(data) != ed25519._ED25519_KEY_SIZE: 

1951 raise ValueError("An Ed25519 private key is 32 bytes long") 

1952 

1953 utils._check_byteslike("data", data) 

1954 data_ptr = self._ffi.from_buffer(data) 

1955 evp_pkey = self._lib.EVP_PKEY_new_raw_private_key( 

1956 self._lib.NID_ED25519, self._ffi.NULL, data_ptr, len(data) 

1957 ) 

1958 self.openssl_assert(evp_pkey != self._ffi.NULL) 

1959 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

1960 

1961 return _Ed25519PrivateKey(self, evp_pkey) 

1962 

1963 def ed25519_generate_key(self) -> ed25519.Ed25519PrivateKey: 

1964 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED25519) 

1965 return _Ed25519PrivateKey(self, evp_pkey) 

1966 

1967 def ed448_supported(self) -> bool: 

1968 if self._fips_enabled: 

1969 return False 

1970 return ( 

1971 not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B 

1972 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL 

1973 ) 

1974 

1975 def ed448_load_public_bytes(self, data: bytes) -> ed448.Ed448PublicKey: 

1976 utils._check_bytes("data", data) 

1977 if len(data) != _ED448_KEY_SIZE: 

1978 raise ValueError("An Ed448 public key is 57 bytes long") 

1979 

1980 evp_pkey = self._lib.EVP_PKEY_new_raw_public_key( 

1981 self._lib.NID_ED448, self._ffi.NULL, data, len(data) 

1982 ) 

1983 self.openssl_assert(evp_pkey != self._ffi.NULL) 

1984 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

1985 

1986 return _Ed448PublicKey(self, evp_pkey) 

1987 

1988 def ed448_load_private_bytes(self, data: bytes) -> ed448.Ed448PrivateKey: 

1989 utils._check_byteslike("data", data) 

1990 if len(data) != _ED448_KEY_SIZE: 

1991 raise ValueError("An Ed448 private key is 57 bytes long") 

1992 

1993 data_ptr = self._ffi.from_buffer(data) 

1994 evp_pkey = self._lib.EVP_PKEY_new_raw_private_key( 

1995 self._lib.NID_ED448, self._ffi.NULL, data_ptr, len(data) 

1996 ) 

1997 self.openssl_assert(evp_pkey != self._ffi.NULL) 

1998 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

1999 

2000 return _Ed448PrivateKey(self, evp_pkey) 

2001 

2002 def ed448_generate_key(self) -> ed448.Ed448PrivateKey: 

2003 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED448) 

2004 return _Ed448PrivateKey(self, evp_pkey) 

2005 

2006 def derive_scrypt( 

2007 self, 

2008 key_material: bytes, 

2009 salt: bytes, 

2010 length: int, 

2011 n: int, 

2012 r: int, 

2013 p: int, 

2014 ) -> bytes: 

2015 buf = self._ffi.new("unsigned char[]", length) 

2016 key_material_ptr = self._ffi.from_buffer(key_material) 

2017 res = self._lib.EVP_PBE_scrypt( 

2018 key_material_ptr, 

2019 len(key_material), 

2020 salt, 

2021 len(salt), 

2022 n, 

2023 r, 

2024 p, 

2025 scrypt._MEM_LIMIT, 

2026 buf, 

2027 length, 

2028 ) 

2029 if res != 1: 

2030 errors = self._consume_errors() 

2031 # memory required formula explained here: 

2032 # https://blog.filippo.io/the-scrypt-parameters/ 

2033 min_memory = 128 * n * r // (1024**2) 

2034 raise MemoryError( 

2035 "Not enough memory to derive key. These parameters require" 

2036 " {} MB of memory.".format(min_memory), 

2037 errors, 

2038 ) 

2039 return self._ffi.buffer(buf)[:] 

2040 

2041 def aead_cipher_supported(self, cipher) -> bool: 

2042 cipher_name = aead._aead_cipher_name(cipher) 

2043 if self._fips_enabled and cipher_name not in self._fips_aead: 

2044 return False 

2045 # SIV isn't loaded through get_cipherbyname but instead a new fetch API 

2046 # only available in 3.0+. But if we know we're on 3.0+ then we know 

2047 # it's supported. 

2048 if cipher_name.endswith(b"-siv"): 

2049 return self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER == 1 

2050 else: 

2051 return ( 

2052 self._lib.EVP_get_cipherbyname(cipher_name) != self._ffi.NULL 

2053 ) 

2054 

2055 def _zero_data(self, data, length: int) -> None: 

2056 # We clear things this way because at the moment we're not 

2057 # sure of a better way that can guarantee it overwrites the 

2058 # memory of a bytearray and doesn't just replace the underlying char *. 

2059 for i in range(length): 

2060 data[i] = 0 

2061 

2062 @contextlib.contextmanager 

2063 def _zeroed_null_terminated_buf(self, data): 

2064 """ 

2065 This method takes bytes, which can be a bytestring or a mutable 

2066 buffer like a bytearray, and yields a null-terminated version of that 

2067 data. This is required because PKCS12_parse doesn't take a length with 

2068 its password char * and ffi.from_buffer doesn't provide null 

2069 termination. So, to support zeroing the data via bytearray we 

2070 need to build this ridiculous construct that copies the memory, but 

2071 zeroes it after use. 

2072 """ 

2073 if data is None: 

2074 yield self._ffi.NULL 

2075 else: 

2076 data_len = len(data) 

2077 buf = self._ffi.new("char[]", data_len + 1) 

2078 self._ffi.memmove(buf, data, data_len) 

2079 try: 

2080 yield buf 

2081 finally: 

2082 # Cast to a uint8_t * so we can assign by integer 

2083 self._zero_data(self._ffi.cast("uint8_t *", buf), data_len) 

2084 

2085 def load_key_and_certificates_from_pkcs12( 

2086 self, data: bytes, password: typing.Optional[bytes] 

2087 ) -> typing.Tuple[ 

2088 typing.Optional[PrivateKeyTypes], 

2089 typing.Optional[x509.Certificate], 

2090 typing.List[x509.Certificate], 

2091 ]: 

2092 pkcs12 = self.load_pkcs12(data, password) 

2093 return ( 

2094 pkcs12.key, 

2095 pkcs12.cert.certificate if pkcs12.cert else None, 

2096 [cert.certificate for cert in pkcs12.additional_certs], 

2097 ) 

2098 

2099 def load_pkcs12( 

2100 self, data: bytes, password: typing.Optional[bytes] 

2101 ) -> PKCS12KeyAndCertificates: 

2102 if password is not None: 

2103 utils._check_byteslike("password", password) 

2104 

2105 bio = self._bytes_to_bio(data) 

2106 p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL) 

2107 if p12 == self._ffi.NULL: 

2108 self._consume_errors() 

2109 raise ValueError("Could not deserialize PKCS12 data") 

2110 

2111 p12 = self._ffi.gc(p12, self._lib.PKCS12_free) 

2112 evp_pkey_ptr = self._ffi.new("EVP_PKEY **") 

2113 x509_ptr = self._ffi.new("X509 **") 

2114 sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **") 

2115 with self._zeroed_null_terminated_buf(password) as password_buf: 

2116 res = self._lib.PKCS12_parse( 

2117 p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr 

2118 ) 

2119 if res == 0: 

2120 self._consume_errors() 

2121 raise ValueError("Invalid password or PKCS12 data") 

2122 

2123 cert = None 

2124 key = None 

2125 additional_certificates = [] 

2126 

2127 if evp_pkey_ptr[0] != self._ffi.NULL: 

2128 evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free) 

2129 # We don't support turning off RSA key validation when loading 

2130 # PKCS12 keys 

2131 key = self._evp_pkey_to_private_key( 

2132 evp_pkey, unsafe_skip_rsa_key_validation=False 

2133 ) 

2134 

2135 if x509_ptr[0] != self._ffi.NULL: 

2136 x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free) 

2137 cert_obj = self._ossl2cert(x509) 

2138 name = None 

2139 maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL) 

2140 if maybe_name != self._ffi.NULL: 

2141 name = self._ffi.string(maybe_name) 

2142 cert = PKCS12Certificate(cert_obj, name) 

2143 

2144 if sk_x509_ptr[0] != self._ffi.NULL: 

2145 sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free) 

2146 num = self._lib.sk_X509_num(sk_x509_ptr[0]) 

2147 

2148 # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the 

2149 # certificates. 

2150 indices: typing.Iterable[int] 

2151 if ( 

2152 self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER 

2153 or self._lib.CRYPTOGRAPHY_IS_BORINGSSL 

2154 ): 

2155 indices = range(num) 

2156 else: 

2157 indices = reversed(range(num)) 

2158 

2159 for i in indices: 

2160 x509 = self._lib.sk_X509_value(sk_x509, i) 

2161 self.openssl_assert(x509 != self._ffi.NULL) 

2162 x509 = self._ffi.gc(x509, self._lib.X509_free) 

2163 addl_cert = self._ossl2cert(x509) 

2164 addl_name = None 

2165 maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL) 

2166 if maybe_name != self._ffi.NULL: 

2167 addl_name = self._ffi.string(maybe_name) 

2168 additional_certificates.append( 

2169 PKCS12Certificate(addl_cert, addl_name) 

2170 ) 

2171 

2172 return PKCS12KeyAndCertificates(key, cert, additional_certificates) 

2173 

2174 def serialize_key_and_certificates_to_pkcs12( 

2175 self, 

2176 name: typing.Optional[bytes], 

2177 key: typing.Optional[PKCS12PrivateKeyTypes], 

2178 cert: typing.Optional[x509.Certificate], 

2179 cas: typing.Optional[typing.List[_PKCS12CATypes]], 

2180 encryption_algorithm: serialization.KeySerializationEncryption, 

2181 ) -> bytes: 

2182 password = None 

2183 if name is not None: 

2184 utils._check_bytes("name", name) 

2185 

2186 if isinstance(encryption_algorithm, serialization.NoEncryption): 

2187 nid_cert = -1 

2188 nid_key = -1 

2189 pkcs12_iter = 0 

2190 mac_iter = 0 

2191 mac_alg = self._ffi.NULL 

2192 elif isinstance( 

2193 encryption_algorithm, serialization.BestAvailableEncryption 

2194 ): 

2195 # PKCS12 encryption is hopeless trash and can never be fixed. 

2196 # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so 

2197 # we use PBESv1 with 3DES on the older paths. 

2198 if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER: 

2199 nid_cert = self._lib.NID_aes_256_cbc 

2200 nid_key = self._lib.NID_aes_256_cbc 

2201 else: 

2202 nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC 

2203 nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC 

2204 # At least we can set this higher than OpenSSL's default 

2205 pkcs12_iter = 20000 

2206 # mac_iter chosen for compatibility reasons, see: 

2207 # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html 

2208 # Did we mention how lousy PKCS12 encryption is? 

2209 mac_iter = 1 

2210 # MAC algorithm can only be set on OpenSSL 3.0.0+ 

2211 mac_alg = self._ffi.NULL 

2212 password = encryption_algorithm.password 

2213 elif ( 

2214 isinstance( 

2215 encryption_algorithm, serialization._KeySerializationEncryption 

2216 ) 

2217 and encryption_algorithm._format 

2218 is serialization.PrivateFormat.PKCS12 

2219 ): 

2220 # Default to OpenSSL's defaults. Behavior will vary based on the 

2221 # version of OpenSSL cryptography is compiled against. 

2222 nid_cert = 0 

2223 nid_key = 0 

2224 # Use the default iters we use in best available 

2225 pkcs12_iter = 20000 

2226 # See the Best Available comment for why this is 1 

2227 mac_iter = 1 

2228 password = encryption_algorithm.password 

2229 keycertalg = encryption_algorithm._key_cert_algorithm 

2230 if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC: 

2231 nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC 

2232 nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC 

2233 elif keycertalg is PBES.PBESv2SHA256AndAES256CBC: 

2234 if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER: 

2235 raise UnsupportedAlgorithm( 

2236 "PBESv2 is not supported by this version of OpenSSL" 

2237 ) 

2238 nid_cert = self._lib.NID_aes_256_cbc 

2239 nid_key = self._lib.NID_aes_256_cbc 

2240 else: 

2241 assert keycertalg is None 

2242 # We use OpenSSL's defaults 

2243 

2244 if encryption_algorithm._hmac_hash is not None: 

2245 if not self._lib.Cryptography_HAS_PKCS12_SET_MAC: 

2246 raise UnsupportedAlgorithm( 

2247 "Setting MAC algorithm is not supported by this " 

2248 "version of OpenSSL." 

2249 ) 

2250 mac_alg = self._evp_md_non_null_from_algorithm( 

2251 encryption_algorithm._hmac_hash 

2252 ) 

2253 self.openssl_assert(mac_alg != self._ffi.NULL) 

2254 else: 

2255 mac_alg = self._ffi.NULL 

2256 

2257 if encryption_algorithm._kdf_rounds is not None: 

2258 pkcs12_iter = encryption_algorithm._kdf_rounds 

2259 

2260 else: 

2261 raise ValueError("Unsupported key encryption type") 

2262 

2263 if cas is None or len(cas) == 0: 

2264 sk_x509 = self._ffi.NULL 

2265 else: 

2266 sk_x509 = self._lib.sk_X509_new_null() 

2267 sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free) 

2268 

2269 # This list is to keep the x509 values alive until end of function 

2270 ossl_cas = [] 

2271 for ca in cas: 

2272 if isinstance(ca, PKCS12Certificate): 

2273 ca_alias = ca.friendly_name 

2274 ossl_ca = self._cert2ossl(ca.certificate) 

2275 if ca_alias is None: 

2276 res = self._lib.X509_alias_set1( 

2277 ossl_ca, self._ffi.NULL, -1 

2278 ) 

2279 else: 

2280 res = self._lib.X509_alias_set1( 

2281 ossl_ca, ca_alias, len(ca_alias) 

2282 ) 

2283 self.openssl_assert(res == 1) 

2284 else: 

2285 ossl_ca = self._cert2ossl(ca) 

2286 ossl_cas.append(ossl_ca) 

2287 res = self._lib.sk_X509_push(sk_x509, ossl_ca) 

2288 backend.openssl_assert(res >= 1) 

2289 

2290 with self._zeroed_null_terminated_buf(password) as password_buf: 

2291 with self._zeroed_null_terminated_buf(name) as name_buf: 

2292 ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL 

2293 if key is not None: 

2294 evp_pkey = key._evp_pkey # type: ignore[union-attr] 

2295 else: 

2296 evp_pkey = self._ffi.NULL 

2297 

2298 p12 = self._lib.PKCS12_create( 

2299 password_buf, 

2300 name_buf, 

2301 evp_pkey, 

2302 ossl_cert, 

2303 sk_x509, 

2304 nid_key, 

2305 nid_cert, 

2306 pkcs12_iter, 

2307 mac_iter, 

2308 0, 

2309 ) 

2310 

2311 if ( 

2312 self._lib.Cryptography_HAS_PKCS12_SET_MAC 

2313 and mac_alg != self._ffi.NULL 

2314 ): 

2315 self._lib.PKCS12_set_mac( 

2316 p12, 

2317 password_buf, 

2318 -1, 

2319 self._ffi.NULL, 

2320 0, 

2321 mac_iter, 

2322 mac_alg, 

2323 ) 

2324 

2325 self.openssl_assert(p12 != self._ffi.NULL) 

2326 p12 = self._ffi.gc(p12, self._lib.PKCS12_free) 

2327 

2328 bio = self._create_mem_bio_gc() 

2329 res = self._lib.i2d_PKCS12_bio(bio, p12) 

2330 self.openssl_assert(res > 0) 

2331 return self._read_mem_bio(bio) 

2332 

2333 def poly1305_supported(self) -> bool: 

2334 if self._fips_enabled: 

2335 return False 

2336 return self._lib.Cryptography_HAS_POLY1305 == 1 

2337 

2338 def create_poly1305_ctx(self, key: bytes) -> _Poly1305Context: 

2339 utils._check_byteslike("key", key) 

2340 if len(key) != _POLY1305_KEY_SIZE: 

2341 raise ValueError("A poly1305 key is 32 bytes long") 

2342 

2343 return _Poly1305Context(self, key) 

2344 

2345 def pkcs7_supported(self) -> bool: 

2346 return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL 

2347 

2348 def load_pem_pkcs7_certificates( 

2349 self, data: bytes 

2350 ) -> typing.List[x509.Certificate]: 

2351 utils._check_bytes("data", data) 

2352 bio = self._bytes_to_bio(data) 

2353 p7 = self._lib.PEM_read_bio_PKCS7( 

2354 bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 

2355 ) 

2356 if p7 == self._ffi.NULL: 

2357 self._consume_errors() 

2358 raise ValueError("Unable to parse PKCS7 data") 

2359 

2360 p7 = self._ffi.gc(p7, self._lib.PKCS7_free) 

2361 return self._load_pkcs7_certificates(p7) 

2362 

2363 def load_der_pkcs7_certificates( 

2364 self, data: bytes 

2365 ) -> typing.List[x509.Certificate]: 

2366 utils._check_bytes("data", data) 

2367 bio = self._bytes_to_bio(data) 

2368 p7 = self._lib.d2i_PKCS7_bio(bio.bio, self._ffi.NULL) 

2369 if p7 == self._ffi.NULL: 

2370 self._consume_errors() 

2371 raise ValueError("Unable to parse PKCS7 data") 

2372 

2373 p7 = self._ffi.gc(p7, self._lib.PKCS7_free) 

2374 return self._load_pkcs7_certificates(p7) 

2375 

2376 def _load_pkcs7_certificates(self, p7) -> typing.List[x509.Certificate]: 

2377 nid = self._lib.OBJ_obj2nid(p7.type) 

2378 self.openssl_assert(nid != self._lib.NID_undef) 

2379 if nid != self._lib.NID_pkcs7_signed: 

2380 raise UnsupportedAlgorithm( 

2381 "Only basic signed structures are currently supported. NID" 

2382 " for this data was {}".format(nid), 

2383 _Reasons.UNSUPPORTED_SERIALIZATION, 

2384 ) 

2385 

2386 sk_x509 = p7.d.sign.cert 

2387 num = self._lib.sk_X509_num(sk_x509) 

2388 certs = [] 

2389 for i in range(num): 

2390 x509 = self._lib.sk_X509_value(sk_x509, i) 

2391 self.openssl_assert(x509 != self._ffi.NULL) 

2392 cert = self._ossl2cert(x509) 

2393 certs.append(cert) 

2394 

2395 return certs 

2396 

2397 

2398class GetCipherByName: 

2399 def __init__(self, fmt: str): 

2400 self._fmt = fmt 

2401 

2402 def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode): 

2403 cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower() 

2404 evp_cipher = backend._lib.EVP_get_cipherbyname( 

2405 cipher_name.encode("ascii") 

2406 ) 

2407 

2408 # try EVP_CIPHER_fetch if present 

2409 if ( 

2410 evp_cipher == backend._ffi.NULL 

2411 and backend._lib.Cryptography_HAS_300_EVP_CIPHER 

2412 ): 

2413 evp_cipher = backend._lib.EVP_CIPHER_fetch( 

2414 backend._ffi.NULL, 

2415 cipher_name.encode("ascii"), 

2416 backend._ffi.NULL, 

2417 ) 

2418 

2419 backend._consume_errors() 

2420 return evp_cipher 

2421 

2422 

2423def _get_xts_cipher(backend: Backend, cipher: AES, mode): 

2424 cipher_name = f"aes-{cipher.key_size // 2}-xts" 

2425 return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) 

2426 

2427 

2428backend = Backend()