Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/backend.py: 24%

917 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:50 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import collections 

8import contextlib 

9import itertools 

10import typing 

11from contextlib import contextmanager 

12 

13from cryptography import utils, x509 

14from cryptography.exceptions import UnsupportedAlgorithm, _Reasons 

15from cryptography.hazmat.backends.openssl import aead 

16from cryptography.hazmat.backends.openssl.ciphers import _CipherContext 

17from cryptography.hazmat.backends.openssl.cmac import _CMACContext 

18from cryptography.hazmat.backends.openssl.ec import ( 

19 _EllipticCurvePrivateKey, 

20 _EllipticCurvePublicKey, 

21) 

22from cryptography.hazmat.backends.openssl.rsa import ( 

23 _RSAPrivateKey, 

24 _RSAPublicKey, 

25) 

26from cryptography.hazmat.bindings._rust import openssl as rust_openssl 

27from cryptography.hazmat.bindings.openssl import binding 

28from cryptography.hazmat.primitives import hashes, serialization 

29from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding 

30from cryptography.hazmat.primitives.asymmetric import ( 

31 dh, 

32 dsa, 

33 ec, 

34 ed448, 

35 ed25519, 

36 rsa, 

37 x448, 

38 x25519, 

39) 

40from cryptography.hazmat.primitives.asymmetric.padding import ( 

41 MGF1, 

42 OAEP, 

43 PSS, 

44 PKCS1v15, 

45) 

46from cryptography.hazmat.primitives.asymmetric.types import ( 

47 PrivateKeyTypes, 

48 PublicKeyTypes, 

49) 

50from cryptography.hazmat.primitives.ciphers import ( 

51 BlockCipherAlgorithm, 

52 CipherAlgorithm, 

53) 

54from cryptography.hazmat.primitives.ciphers.algorithms import ( 

55 AES, 

56 AES128, 

57 AES256, 

58 ARC4, 

59 SM4, 

60 Camellia, 

61 ChaCha20, 

62 TripleDES, 

63 _BlowfishInternal, 

64 _CAST5Internal, 

65 _IDEAInternal, 

66 _SEEDInternal, 

67) 

68from cryptography.hazmat.primitives.ciphers.modes import ( 

69 CBC, 

70 CFB, 

71 CFB8, 

72 CTR, 

73 ECB, 

74 GCM, 

75 OFB, 

76 XTS, 

77 Mode, 

78) 

79from cryptography.hazmat.primitives.serialization import ssh 

80from cryptography.hazmat.primitives.serialization.pkcs12 import ( 

81 PBES, 

82 PKCS12Certificate, 

83 PKCS12KeyAndCertificates, 

84 PKCS12PrivateKeyTypes, 

85 _PKCS12CATypes, 

86) 

87 

88_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) 

89 

90 

91# Not actually supported, just used as a marker for some serialization tests. 

92class _RC2: 

93 pass 

94 

95 

96class Backend: 

97 """ 

98 OpenSSL API binding interfaces. 

99 """ 

100 

101 name = "openssl" 

102 

103 # FIPS has opinions about acceptable algorithms and key sizes, but the 

104 # disallowed algorithms are still present in OpenSSL. They just error if 

105 # you try to use them. To avoid that we allowlist the algorithms in 

106 # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are. 

107 _fips_aead = { 

108 b"aes-128-ccm", 

109 b"aes-192-ccm", 

110 b"aes-256-ccm", 

111 b"aes-128-gcm", 

112 b"aes-192-gcm", 

113 b"aes-256-gcm", 

114 } 

115 # TripleDES encryption is disallowed/deprecated throughout 2023 in 

116 # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA). 

117 _fips_ciphers = (AES,) 

118 # Sometimes SHA1 is still permissible. That logic is contained 

119 # within the various *_supported methods. 

120 _fips_hashes = ( 

121 hashes.SHA224, 

122 hashes.SHA256, 

123 hashes.SHA384, 

124 hashes.SHA512, 

125 hashes.SHA512_224, 

126 hashes.SHA512_256, 

127 hashes.SHA3_224, 

128 hashes.SHA3_256, 

129 hashes.SHA3_384, 

130 hashes.SHA3_512, 

131 hashes.SHAKE128, 

132 hashes.SHAKE256, 

133 ) 

134 _fips_ecdh_curves = ( 

135 ec.SECP224R1, 

136 ec.SECP256R1, 

137 ec.SECP384R1, 

138 ec.SECP521R1, 

139 ) 

140 _fips_rsa_min_key_size = 2048 

141 _fips_rsa_min_public_exponent = 65537 

142 _fips_dsa_min_modulus = 1 << 2048 

143 _fips_dh_min_key_size = 2048 

144 _fips_dh_min_modulus = 1 << _fips_dh_min_key_size 

145 

def __init__(self) -> None:
    """
    Create the OpenSSL binding, cache its ffi/lib handles and the current
    FIPS state, then fill the cipher registry and the list of DH key types.
    """
    ossl = binding.Binding()
    self._binding = ossl
    self._ffi = ossl.ffi
    self._lib = ossl.lib
    self._fips_enabled = rust_openssl.is_fips_enabled()

    # Keyed by (cipher class, mode class); values are adapter callables.
    self._cipher_registry: typing.Dict[
        typing.Tuple[typing.Type[CipherAlgorithm], typing.Type[Mode]],
        typing.Callable,
    ] = {}
    self._register_default_ciphers()

    # EVP_PKEY_DHX is only listed when the library reports having it.
    dh_types = [self._lib.EVP_PKEY_DH]
    if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
        dh_types.append(self._lib.EVP_PKEY_DHX)
    self._dh_types = dh_types

160 

161 def __repr__(self) -> str: 

162 return "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>".format( 

163 self.openssl_version_text(), 

164 self._fips_enabled, 

165 self._binding._legacy_provider_loaded, 

166 ) 

167 

def openssl_assert(
    self,
    ok: bool,
    errors: typing.Optional[typing.List[rust_openssl.OpenSSLError]] = None,
) -> None:
    """
    Forward a success check to ``binding._openssl_assert`` together with
    this backend's lib handle.

    ``errors`` is passed through unchanged as a keyword argument.
    """
    lib = self._lib
    return binding._openssl_assert(lib, ok, errors=errors)

174 

def _enable_fips(self) -> None:
    """
    Switch the binding into FIPS mode and refresh the cached FIPS flag.

    Intended for OpenSSL 3.0.0 installs that have the FIPS provider
    installed properly.
    """
    fips_active = rust_openssl.is_fips_enabled
    self._binding._enable_fips()
    assert fips_active()
    self._fips_enabled = fips_active()

181 

def openssl_version_text(self) -> str:
    """
    Return the human-readable name of the OpenSSL library that is loaded,
    which may differ from the version it was compiled against.

    Example: OpenSSL 1.1.1d 10 Sep 2019
    """
    raw_version = self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
    version_bytes = self._ffi.string(raw_version)
    return version_bytes.decode("ascii")

192 

def openssl_version_number(self) -> int:
    """
    Return whatever ``OpenSSL_version_num`` reports for the loaded library.
    """
    version_num = self._lib.OpenSSL_version_num()
    return version_num

195 

196 def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm): 

197 if algorithm.name == "blake2b" or algorithm.name == "blake2s": 

198 alg = "{}{}".format( 

199 algorithm.name, algorithm.digest_size * 8 

200 ).encode("ascii") 

201 else: 

202 alg = algorithm.name.encode("ascii") 

203 

204 evp_md = self._lib.EVP_get_digestbyname(alg) 

205 return evp_md 

206 

207 def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm): 

208 evp_md = self._evp_md_from_algorithm(algorithm) 

209 self.openssl_assert(evp_md != self._ffi.NULL) 

210 return evp_md 

211 

def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether ``algorithm`` can be used as a hash.

    With FIPS enabled only instances of the ``_fips_hashes`` types pass;
    otherwise the answer is whether the digest lookup returned non-NULL.
    """
    fips_rejects = self._fips_enabled and not isinstance(
        algorithm, self._fips_hashes
    )
    if fips_rejects:
        return False

    return self._evp_md_from_algorithm(algorithm) != self._ffi.NULL

218 

def signature_hash_supported(
    self, algorithm: hashes.HashAlgorithm
) -> bool:
    """
    Report whether ``algorithm`` may be used as the message digest of a
    signature, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption).

    SHA1 is refused while FIPS is enabled; everything else defers to
    ``hash_supported``.
    """
    if not self._fips_enabled or not isinstance(algorithm, hashes.SHA1):
        return self.hash_supported(algorithm)
    return False

227 

def scrypt_supported(self) -> bool:
    """
    Report scrypt availability: never under FIPS, otherwise whenever the
    library's ``Cryptography_HAS_SCRYPT`` flag equals 1.
    """
    return not self._fips_enabled and (
        self._lib.Cryptography_HAS_SCRYPT == 1
    )

233 

def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether ``algorithm`` can be used for HMAC.

    Follows ``hash_supported`` except that SHA1 is still accepted when
    FIPS is enabled.
    """
    sha1_under_fips = self._fips_enabled and isinstance(
        algorithm, hashes.SHA1
    )
    return True if sha1_under_fips else self.hash_supported(algorithm)

240 

def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
    """
    Report whether the (cipher, mode) pair is usable.

    Under FIPS only instances of the ``_fips_ciphers`` types pass (AES;
    TripleDES is disallowed/deprecated in FIPS 140-3). The pair must also
    be registered, and its adapter must return a non-NULL result.
    """
    if self._fips_enabled and not isinstance(cipher, self._fips_ciphers):
        return False

    registry_key = (type(cipher), type(mode))
    try:
        adapter = self._cipher_registry[registry_key]
    except KeyError:
        # Nothing registered for this combination.
        return False

    resolved = adapter(self, cipher, mode)
    return self._ffi.NULL != resolved

254 

def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
    """
    Store ``adapter`` in the cipher registry under (cipher_cls, mode_cls).

    Raises ValueError if that pair already has an adapter registered.
    """
    registry_key = (cipher_cls, mode_cls)
    if registry_key in self._cipher_registry:
        raise ValueError(
            f"Duplicate registration for: {cipher_cls} {mode_cls}."
        )
    self._cipher_registry[registry_key] = adapter

263 

def _register_default_ciphers(self) -> None:
    """
    Register the built-in (cipher, mode) adapters.

    The legacy ciphers at the end are only registered when the legacy
    provider is loaded or the library is older than OpenSSL 3.0.0.
    """
    register = self.register_cipher_adapter
    sized_name = "{cipher.name}-{cipher.key_size}-{mode.name}"

    for cipher_cls, mode_cls in itertools.product(
        [AES, AES128, AES256],
        [CBC, CTR, ECB, OFB, CFB, CFB8, GCM],
    ):
        register(cipher_cls, mode_cls, GetCipherByName(sized_name))

    for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
        register(Camellia, mode_cls, GetCipherByName(sized_name))

    for mode_cls in [CBC, CFB, CFB8, OFB]:
        register(TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}"))
    register(TripleDES, ECB, GetCipherByName("des-ede3"))

    register(ChaCha20, type(None), GetCipherByName("chacha20"))
    register(AES, XTS, _get_xts_cipher)

    for mode_cls in [ECB, CBC, OFB, CFB, CTR]:
        register(SM4, mode_cls, GetCipherByName("sm4-{mode.name}"))

    # Don't register legacy ciphers if they're unavailable. Hypothetically
    # this wouldn't be necessary because availability is tested by seeing
    # if we get an EVP_CIPHER * in the _CipherContext __init__, but
    # OpenSSL 3 will return a valid pointer even though the cipher is
    # unavailable.
    legacy_available = (
        self._binding._legacy_provider_loaded
        or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
    )
    if not legacy_available:
        return

    for mode_cls in [CBC, CFB, OFB, ECB]:
        register(_BlowfishInternal, mode_cls, GetCipherByName("bf-{mode.name}"))

    for mode_cls in [CBC, CFB, OFB, ECB]:
        register(_SEEDInternal, mode_cls, GetCipherByName("seed-{mode.name}"))

    for cipher_cls in [_CAST5Internal, _IDEAInternal]:
        for mode_cls in [CBC, OFB, CFB, ECB]:
            register(
                cipher_cls,
                mode_cls,
                GetCipherByName("{cipher.name}-{mode.name}"),
            )

    register(ARC4, type(None), GetCipherByName("rc4"))
    # We don't actually support RC2, this is just used by some tests.
    register(_RC2, type(None), GetCipherByName("rc2"))

331 

def create_symmetric_encryption_ctx(
    self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
    """
    Build a ``_CipherContext`` for ``cipher``/``mode`` in encrypt mode.
    """
    operation = _CipherContext._ENCRYPT
    return _CipherContext(self, cipher, mode, operation)

336 

def create_symmetric_decryption_ctx(
    self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
    """
    Build a ``_CipherContext`` for ``cipher``/``mode`` in decrypt mode.
    """
    operation = _CipherContext._DECRYPT
    return _CipherContext(self, cipher, mode, operation)

341 

def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    PBKDF2-HMAC accepts exactly the algorithms ``hmac_supported`` accepts.
    """
    supported = self.hmac_supported(algorithm)
    return supported

344 

def _consume_errors(self) -> typing.List[rust_openssl.OpenSSLError]:
    """
    Return the list produced by ``rust_openssl.capture_error_stack()``.
    """
    captured = rust_openssl.capture_error_stack()
    return captured

347 

348 def _bn_to_int(self, bn) -> int: 

349 assert bn != self._ffi.NULL 

350 self.openssl_assert(not self._lib.BN_is_negative(bn)) 

351 

352 bn_num_bytes = self._lib.BN_num_bytes(bn) 

353 bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes) 

354 bin_len = self._lib.BN_bn2bin(bn, bin_ptr) 

355 # A zero length means the BN has value 0 

356 self.openssl_assert(bin_len >= 0) 

357 val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big") 

358 return val 

359 

360 def _int_to_bn(self, num: int): 

361 """ 

362 Converts a python integer to a BIGNUM. The returned BIGNUM will not 

363 be garbage collected (to support adding them to structs that take 

364 ownership of the object). Be sure to register it for GC if it will 

365 be discarded after use. 

366 """ 

367 binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big") 

368 bn_ptr = self._lib.BN_bin2bn(binary, len(binary), self._ffi.NULL) 

369 self.openssl_assert(bn_ptr != self._ffi.NULL) 

370 return bn_ptr 

371 

def generate_rsa_private_key(
    self, public_exponent: int, key_size: int
) -> rsa.RSAPrivateKey:
    """
    Generate an RSA private key with ``RSA_generate_key_ex`` after the
    parameters pass ``rsa._verify_rsa_parameters``.
    """
    rsa._verify_rsa_parameters(public_exponent, key_size)
    ffi, lib = self._ffi, self._lib

    rsa_cdata = lib.RSA_new()
    self.openssl_assert(rsa_cdata != ffi.NULL)
    rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)

    # _int_to_bn returns an unmanaged BIGNUM, so BN_free is attached here.
    exponent_bn = ffi.gc(self._int_to_bn(public_exponent), lib.BN_free)

    status = lib.RSA_generate_key_ex(
        rsa_cdata, key_size, exponent_bn, ffi.NULL
    )
    self.openssl_assert(status == 1)
    evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)

    # We can skip RSA key validation here since we just generated the key
    return _RSAPrivateKey(
        self, rsa_cdata, evp_pkey, unsafe_skip_rsa_key_validation=True
    )

394 

def generate_rsa_parameters_supported(
    self, public_exponent: int, key_size: int
) -> bool:
    """
    Accept an odd public exponent of at least 3 together with a key size
    of at least 512 bits; reject anything else.
    """
    exponent_is_even = public_exponent & 1 == 0
    if public_exponent < 3 or exponent_is_even:
        return False
    return key_size >= 512

403 

def load_rsa_private_numbers(
    self,
    numbers: rsa.RSAPrivateNumbers,
    unsafe_skip_rsa_key_validation: bool,
) -> rsa.RSAPrivateKey:
    """
    Build an RSA private key from its numeric components.

    The components are first checked with
    ``rsa._check_private_key_components`` and then installed on a new RSA
    structure; ``unsafe_skip_rsa_key_validation`` is forwarded to
    ``_RSAPrivateKey`` unchanged.
    """
    public = numbers.public_numbers
    rsa._check_private_key_components(
        numbers.p,
        numbers.q,
        numbers.d,
        numbers.dmp1,
        numbers.dmq1,
        numbers.iqmp,
        public.e,
        public.n,
    )
    ffi, lib = self._ffi, self._lib
    to_bn = self._int_to_bn

    rsa_cdata = lib.RSA_new()
    self.openssl_assert(rsa_cdata != ffi.NULL)
    rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)

    # These BIGNUMs are deliberately not registered for GC; they are
    # handed straight to the RSA_set0_* calls below.
    p = to_bn(numbers.p)
    q = to_bn(numbers.q)
    d = to_bn(numbers.d)
    dmp1 = to_bn(numbers.dmp1)
    dmq1 = to_bn(numbers.dmq1)
    iqmp = to_bn(numbers.iqmp)
    e = to_bn(public.e)
    n = to_bn(public.n)

    self.openssl_assert(lib.RSA_set0_factors(rsa_cdata, p, q) == 1)
    self.openssl_assert(lib.RSA_set0_key(rsa_cdata, n, e, d) == 1)
    self.openssl_assert(
        lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp) == 1
    )
    evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)

    return _RSAPrivateKey(
        self,
        rsa_cdata,
        evp_pkey,
        unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
    )

444 

def load_rsa_public_numbers(
    self, numbers: rsa.RSAPublicNumbers
) -> rsa.RSAPublicKey:
    """
    Build an RSA public key from ``numbers.e`` and ``numbers.n`` after
    they pass ``rsa._check_public_key_components``.
    """
    rsa._check_public_key_components(numbers.e, numbers.n)
    ffi, lib = self._ffi, self._lib

    rsa_cdata = lib.RSA_new()
    self.openssl_assert(rsa_cdata != ffi.NULL)
    rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)

    exponent = self._int_to_bn(numbers.e)
    modulus = self._int_to_bn(numbers.n)
    # No private exponent for a public key, hence the NULL third value.
    status = lib.RSA_set0_key(rsa_cdata, modulus, exponent, ffi.NULL)
    self.openssl_assert(status == 1)

    evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
    return _RSAPublicKey(self, rsa_cdata, evp_pkey)

459 

460 def _create_evp_pkey_gc(self): 

461 evp_pkey = self._lib.EVP_PKEY_new() 

462 self.openssl_assert(evp_pkey != self._ffi.NULL) 

463 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

464 return evp_pkey 

465 

466 def _rsa_cdata_to_evp_pkey(self, rsa_cdata): 

467 evp_pkey = self._create_evp_pkey_gc() 

468 res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata) 

469 self.openssl_assert(res == 1) 

470 return evp_pkey 

471 

def _bytes_to_bio(self, data: bytes) -> _MemoryBIO:
    """
    Wrap ``data`` in a memory BIO and return a _MemoryBIO namedtuple of
    (BIO, char*).

    The char* is the BIO's storage, so it has to stay alive until the BIO
    is finished with.
    """
    storage = self._ffi.from_buffer(data)
    raw_bio = self._lib.BIO_new_mem_buf(storage, len(data))
    self.openssl_assert(raw_bio != self._ffi.NULL)

    managed_bio = self._ffi.gc(raw_bio, self._lib.BIO_free)
    return _MemoryBIO(managed_bio, storage)

484 

485 def _create_mem_bio_gc(self): 

486 """ 

487 Creates an empty memory BIO. 

488 """ 

489 bio_method = self._lib.BIO_s_mem() 

490 self.openssl_assert(bio_method != self._ffi.NULL) 

491 bio = self._lib.BIO_new(bio_method) 

492 self.openssl_assert(bio != self._ffi.NULL) 

493 bio = self._ffi.gc(bio, self._lib.BIO_free) 

494 return bio 

495 

496 def _read_mem_bio(self, bio) -> bytes: 

497 """ 

498 Reads a memory BIO. This only works on memory BIOs. 

499 """ 

500 buf = self._ffi.new("char **") 

501 buf_len = self._lib.BIO_get_mem_data(bio, buf) 

502 self.openssl_assert(buf_len > 0) 

503 self.openssl_assert(buf[0] != self._ffi.NULL) 

504 bio_data = self._ffi.buffer(buf[0], buf_len)[:] 

505 return bio_data 

506 

def _evp_pkey_to_private_key(
    self, evp_pkey, unsafe_skip_rsa_key_validation: bool
) -> PrivateKeyTypes:
    """
    Wrap an evp_pkey cdata pointer in the private key object that matches
    its key type.

    Raises UnsupportedAlgorithm when the key type matches none of the
    handled types.
    """
    ffi, lib = self._ffi, self._lib

    def pkey_address() -> int:
        # rust_openssl loaders are given the pointer value as an int.
        return int(ffi.cast("uintptr_t", evp_pkey))

    key_type = lib.EVP_PKEY_id(evp_pkey)

    if key_type == lib.EVP_PKEY_RSA:
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        return _RSAPrivateKey(
            self,
            rsa_cdata,
            evp_pkey,
            unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
        )

    if (
        key_type == lib.EVP_PKEY_RSA_PSS
        and not lib.CRYPTOGRAPHY_IS_LIBRESSL
        and not lib.CRYPTOGRAPHY_IS_BORINGSSL
        and not lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
    ):
        # At the moment the way we handle RSA PSS keys is to strip the
        # PSS constraints from them and treat them as normal RSA keys.
        # Unfortunately the RSA * itself tracks this data so we need to
        # extract, serialize, and reload it without the constraints.
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        bio = self._create_mem_bio_gc()
        status = lib.i2d_RSAPrivateKey_bio(bio, rsa_cdata)
        self.openssl_assert(status == 1)
        return self.load_der_private_key(
            self._read_mem_bio(bio),
            password=None,
            unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
        )

    if key_type == lib.EVP_PKEY_DSA:
        return rust_openssl.dsa.private_key_from_ptr(pkey_address())

    if key_type == lib.EVP_PKEY_EC:
        ec_cdata = lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
        self.openssl_assert(ec_cdata != ffi.NULL)
        ec_cdata = ffi.gc(ec_cdata, lib.EC_KEY_free)
        return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)

    if key_type in self._dh_types:
        return rust_openssl.dh.private_key_from_ptr(pkey_address())

    # EVP_PKEY_ED25519 is not present in CRYPTOGRAPHY_IS_LIBRESSL
    if key_type == getattr(lib, "EVP_PKEY_ED25519", None):
        return rust_openssl.ed25519.private_key_from_ptr(pkey_address())

    # EVP_PKEY_X448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
    if key_type == getattr(lib, "EVP_PKEY_X448", None):
        return rust_openssl.x448.private_key_from_ptr(pkey_address())

    if key_type == lib.EVP_PKEY_X25519:
        return rust_openssl.x25519.private_key_from_ptr(pkey_address())

    # EVP_PKEY_ED448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
    if key_type == getattr(lib, "EVP_PKEY_ED448", None):
        return rust_openssl.ed448.private_key_from_ptr(pkey_address())

    raise UnsupportedAlgorithm("Unsupported key type.")

582 

def _evp_pkey_to_public_key(self, evp_pkey) -> PublicKeyTypes:
    """
    Wrap an evp_pkey cdata pointer in the public key object that matches
    its key type.

    Raises ValueError when an EC key cannot be extracted, and
    UnsupportedAlgorithm when the key type matches none of the handled
    types.
    """
    ffi, lib = self._ffi, self._lib

    def pkey_address() -> int:
        # rust_openssl loaders are given the pointer value as an int.
        return int(ffi.cast("uintptr_t", evp_pkey))

    key_type = lib.EVP_PKEY_id(evp_pkey)

    if key_type == lib.EVP_PKEY_RSA:
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        return _RSAPublicKey(self, rsa_cdata, evp_pkey)

    if (
        key_type == lib.EVP_PKEY_RSA_PSS
        and not lib.CRYPTOGRAPHY_IS_LIBRESSL
        and not lib.CRYPTOGRAPHY_IS_BORINGSSL
        and not lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
    ):
        # Serialize the RSA key and reload it from the DER bytes.
        rsa_cdata = lib.EVP_PKEY_get1_RSA(evp_pkey)
        self.openssl_assert(rsa_cdata != ffi.NULL)
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        bio = self._create_mem_bio_gc()
        status = lib.i2d_RSAPublicKey_bio(bio, rsa_cdata)
        self.openssl_assert(status == 1)
        return self.load_der_public_key(self._read_mem_bio(bio))

    if key_type == lib.EVP_PKEY_DSA:
        return rust_openssl.dsa.public_key_from_ptr(pkey_address())

    if key_type == lib.EVP_PKEY_EC:
        ec_cdata = lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
        if ec_cdata == ffi.NULL:
            errors = self._consume_errors()
            raise ValueError("Unable to load EC key", errors)
        ec_cdata = ffi.gc(ec_cdata, lib.EC_KEY_free)
        return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)

    if key_type in self._dh_types:
        return rust_openssl.dh.public_key_from_ptr(pkey_address())

    # EVP_PKEY_ED25519 is not present in CRYPTOGRAPHY_IS_LIBRESSL
    if key_type == getattr(lib, "EVP_PKEY_ED25519", None):
        return rust_openssl.ed25519.public_key_from_ptr(pkey_address())

    # EVP_PKEY_X448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
    if key_type == getattr(lib, "EVP_PKEY_X448", None):
        return rust_openssl.x448.public_key_from_ptr(pkey_address())

    if key_type == lib.EVP_PKEY_X25519:
        return rust_openssl.x25519.public_key_from_ptr(pkey_address())

    # EVP_PKEY_ED448 is not present in CRYPTOGRAPHY_IS_LIBRESSL
    if key_type == getattr(lib, "EVP_PKEY_ED448", None):
        return rust_openssl.ed448.public_key_from_ptr(pkey_address())

    raise UnsupportedAlgorithm("Unsupported key type.")

645 

def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether ``algorithm`` is one of the hashes accepted for OAEP:
    SHA1 (unless FIPS is enabled), SHA224, SHA256, SHA384 or SHA512.
    """
    if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
        return False

    accepted = (
        hashes.SHA1,
        hashes.SHA224,
        hashes.SHA256,
        hashes.SHA384,
        hashes.SHA512,
    )
    return isinstance(algorithm, accepted)

660 

def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool:
    """
    Report whether ``padding`` is a supported RSA padding.

    PKCS1v15 is always accepted. PSS and OAEP are only considered with an
    MGF1 mask generation function and then depend on their hashes; any
    other padding is rejected.
    """
    if isinstance(padding, PKCS1v15):
        return True

    if isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
        mgf_hash = padding._mgf._algorithm
        # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked
        # as signature algorithm.
        if self._fips_enabled and isinstance(mgf_hash, hashes.SHA1):
            return True
        return self.hash_supported(mgf_hash)

    if isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
        # Both the MGF1 hash and the OAEP hash have to be acceptable.
        return self._oaep_hash_supported(
            padding._mgf._algorithm
        ) and self._oaep_hash_supported(padding._algorithm)

    return False

679 

def rsa_encryption_supported(self, padding: AsymmetricPadding) -> bool:
    """
    Report whether ``padding`` can be used for RSA encryption: PKCS1v15
    is refused under FIPS, everything else follows
    ``rsa_padding_supported``.
    """
    if self._fips_enabled and isinstance(padding, PKCS1v15):
        return False
    return self.rsa_padding_supported(padding)

685 

def generate_dsa_parameters(self, key_size: int) -> dsa.DSAParameters:
    """
    Generate DSA parameters for ``key_size``.

    Raises ValueError unless the size is 1024, 2048, 3072 or 4096 bits.
    """
    allowed_sizes = (1024, 2048, 3072, 4096)
    if key_size in allowed_sizes:
        return rust_openssl.dsa.generate_parameters(key_size)

    raise ValueError("Key size must be 1024, 2048, 3072, or 4096 bits.")

693 

def generate_dsa_private_key(
    self, parameters: dsa.DSAParameters
) -> dsa.DSAPrivateKey:
    """
    Return the key produced by ``parameters.generate_private_key()``.
    """
    private_key = parameters.generate_private_key()
    return private_key

698 

def generate_dsa_private_key_and_parameters(
    self, key_size: int
) -> dsa.DSAPrivateKey:
    """
    Generate DSA parameters for ``key_size`` and then a private key from
    those parameters.
    """
    return self.generate_dsa_private_key(
        self.generate_dsa_parameters(key_size)
    )

704 

def load_dsa_private_numbers(
    self, numbers: dsa.DSAPrivateNumbers
) -> dsa.DSAPrivateKey:
    """
    Check ``numbers`` with ``dsa._check_dsa_private_numbers`` and build
    the private key through ``rust_openssl.dsa.from_private_numbers``.
    """
    dsa._check_dsa_private_numbers(numbers)
    private_key = rust_openssl.dsa.from_private_numbers(numbers)
    return private_key

710 

def load_dsa_public_numbers(
    self, numbers: dsa.DSAPublicNumbers
) -> dsa.DSAPublicKey:
    """
    Check ``numbers.parameter_numbers`` with ``dsa._check_dsa_parameters``
    and build the public key through
    ``rust_openssl.dsa.from_public_numbers``.
    """
    dsa._check_dsa_parameters(numbers.parameter_numbers)
    public_key = rust_openssl.dsa.from_public_numbers(numbers)
    return public_key

716 

def load_dsa_parameter_numbers(
    self, numbers: dsa.DSAParameterNumbers
) -> dsa.DSAParameters:
    """
    Check ``numbers`` with ``dsa._check_dsa_parameters`` and build the
    parameters through ``rust_openssl.dsa.from_parameter_numbers``.
    """
    dsa._check_dsa_parameters(numbers)
    parameters = rust_openssl.dsa.from_parameter_numbers(numbers)
    return parameters

722 

def dsa_supported(self) -> bool:
    """
    DSA is available unless the library is BoringSSL or FIPS is enabled.
    """
    return not (
        self._lib.CRYPTOGRAPHY_IS_BORINGSSL or self._fips_enabled
    )

727 

def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether ``algorithm`` can be used with DSA: DSA itself must be
    supported and the hash must pass ``signature_hash_supported``.
    """
    if self.dsa_supported():
        return self.signature_hash_supported(algorithm)
    return False

732 

def cmac_algorithm_supported(self, algorithm) -> bool:
    """
    Report CMAC support for ``algorithm`` by checking whether the cipher
    is supported in CBC mode with an all-zero IV of
    ``algorithm.block_size`` bytes.
    """
    zero_iv = b"\x00" * algorithm.block_size
    return self.cipher_supported(algorithm, CBC(zero_iv))

737 

def create_cmac_ctx(self, algorithm: BlockCipherAlgorithm) -> _CMACContext:
    """
    Build a ``_CMACContext`` bound to this backend for ``algorithm``.
    """
    context = _CMACContext(self, algorithm)
    return context

740 

def load_pem_private_key(
    self,
    data: bytes,
    password: typing.Optional[bytes],
    unsafe_skip_rsa_key_validation: bool,
) -> PrivateKeyTypes:
    """
    Load a PEM private key by running ``_load_key`` with
    ``PEM_read_bio_PrivateKey`` as the reader.
    """
    pem_reader = self._lib.PEM_read_bio_PrivateKey
    return self._load_key(
        pem_reader, data, password, unsafe_skip_rsa_key_validation
    )

753 

def load_pem_public_key(self, data: bytes) -> PublicKeyTypes:
    """
    Load a PEM public key.

    The data is first read as a subjectPublicKeyInfo; if that fails it is
    re-read as a pure PKCS1 RSA public key. When both attempts fail the
    key loading error handler is invoked.
    """
    ffi, lib = self._ffi, self._lib
    source = self._bytes_to_bio(data)
    # In OpenSSL 3.0.x the PEM_read_bio_PUBKEY function will invoke
    # the default password callback if you pass an encrypted private
    # key. This is very, very, very bad as the default callback can
    # trigger an interactive console prompt, which will hang the
    # Python process. We therefore provide our own callback to
    # catch this and error out properly.
    userdata = ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
    evp_pkey = lib.PEM_read_bio_PUBKEY(
        source.bio,
        ffi.NULL,
        ffi.addressof(lib._original_lib, "Cryptography_pem_password_cb"),
        userdata,
    )
    if evp_pkey != ffi.NULL:
        evp_pkey = ffi.gc(evp_pkey, lib.EVP_PKEY_free)
        return self._evp_pkey_to_public_key(evp_pkey)

    # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
    # need to check to see if it is a pure PKCS1 RSA public key (not
    # embedded in a subjectPublicKeyInfo)
    self._consume_errors()
    rewound = lib.BIO_reset(source.bio)
    self.openssl_assert(rewound == 1)
    rsa_cdata = lib.PEM_read_bio_RSAPublicKey(
        source.bio,
        ffi.NULL,
        ffi.addressof(lib._original_lib, "Cryptography_pem_password_cb"),
        userdata,
    )
    if rsa_cdata != ffi.NULL:
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
        return _RSAPublicKey(self, rsa_cdata, evp_pkey)

    self._handle_key_loading_error()

795 

def load_pem_parameters(self, data: bytes) -> dh.DHParameters:
    """
    Load DH parameters from PEM ``data`` via
    ``rust_openssl.dh.from_pem_parameters``.
    """
    parameters = rust_openssl.dh.from_pem_parameters(data)
    return parameters

798 

def load_der_private_key(
    self,
    data: bytes,
    password: typing.Optional[bytes],
    unsafe_skip_rsa_key_validation: bool,
) -> PrivateKeyTypes:
    """
    Load a DER private key, trying the traditional encoding first and
    the (possibly encrypted) PKCS8 encoding second.
    """
    # OpenSSL has a function called d2i_AutoPrivateKey that in theory
    # handles this automatically, however it doesn't handle encrypted
    # private keys. Instead we try to load the key two different ways.
    # First we'll try to load it as a traditional key.
    source = self._bytes_to_bio(data)
    traditional = self._evp_pkey_from_der_traditional_key(source, password)
    if not traditional:
        # Finally we try to load it with the method that handles
        # encrypted PKCS8 properly.
        return self._load_key(
            self._lib.d2i_PKCS8PrivateKey_bio,
            data,
            password,
            unsafe_skip_rsa_key_validation,
        )

    return self._evp_pkey_to_private_key(
        traditional, unsafe_skip_rsa_key_validation
    )

824 

825 def _evp_pkey_from_der_traditional_key(self, bio_data, password): 

826 key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL) 

827 if key != self._ffi.NULL: 

828 key = self._ffi.gc(key, self._lib.EVP_PKEY_free) 

829 if password is not None: 

830 raise TypeError( 

831 "Password was given but private key is not encrypted." 

832 ) 

833 

834 return key 

835 else: 

836 self._consume_errors() 

837 return None 

838 

def load_der_public_key(self, data: bytes) -> PublicKeyTypes:
    """
    Load a DER public key.

    The data is first read as a subjectPublicKeyInfo; if that fails it is
    re-read as a pure PKCS1 RSA public key. When both attempts fail the
    key loading error handler is invoked.
    """
    ffi, lib = self._ffi, self._lib
    source = self._bytes_to_bio(data)
    evp_pkey = lib.d2i_PUBKEY_bio(source.bio, ffi.NULL)
    if evp_pkey != ffi.NULL:
        evp_pkey = ffi.gc(evp_pkey, lib.EVP_PKEY_free)
        return self._evp_pkey_to_public_key(evp_pkey)

    # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
    # need to check to see if it is a pure PKCS1 RSA public key (not
    # embedded in a subjectPublicKeyInfo)
    self._consume_errors()
    rewound = lib.BIO_reset(source.bio)
    self.openssl_assert(rewound == 1)
    rsa_cdata = lib.d2i_RSAPublicKey_bio(source.bio, ffi.NULL)
    if rsa_cdata != ffi.NULL:
        rsa_cdata = ffi.gc(rsa_cdata, lib.RSA_free)
        evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
        return _RSAPublicKey(self, rsa_cdata, evp_pkey)

    self._handle_key_loading_error()

861 

def load_der_parameters(self, data: bytes) -> dh.DHParameters:
    """
    Load DH parameters from DER ``data`` via
    ``rust_openssl.dh.from_der_parameters``.
    """
    parameters = rust_openssl.dh.from_der_parameters(data)
    return parameters

864 

def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
    """
    Convert ``cert`` into an OpenSSL X509 pointer by serializing it to
    DER and parsing the bytes with ``d2i_X509_bio``. The result is freed
    with ``X509_free`` on garbage collection.
    """
    der_bytes = cert.public_bytes(serialization.Encoding.DER)
    source = self._bytes_to_bio(der_bytes)
    ossl_cert = self._lib.d2i_X509_bio(source.bio, self._ffi.NULL)
    self.openssl_assert(ossl_cert != self._ffi.NULL)
    return self._ffi.gc(ossl_cert, self._lib.X509_free)

872 

def _ossl2cert(self, x509_ptr: typing.Any) -> x509.Certificate:
    """
    Convert an OpenSSL X509 pointer into an ``x509.Certificate`` by
    writing it as DER into a memory BIO and loading those bytes.
    """
    sink = self._create_mem_bio_gc()
    status = self._lib.i2d_X509_bio(sink, x509_ptr)
    self.openssl_assert(status == 1)
    der_bytes = self._read_mem_bio(sink)
    return x509.load_der_x509_certificate(der_bytes)

878 

def _key2ossl(self, key: PKCS12PrivateKeyTypes) -> typing.Any:
    """
    Convert ``key`` into an OpenSSL EVP_PKEY pointer by serializing it as
    unencrypted PKCS8 DER and parsing the bytes with
    ``d2i_PrivateKey_bio``. The result is freed with ``EVP_PKEY_free`` on
    garbage collection.
    """
    der_bytes = key.private_bytes(
        serialization.Encoding.DER,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption(),
    )
    source = self._bytes_to_bio(der_bytes)

    raw_pkey = self._lib.d2i_PrivateKey_bio(source.bio, self._ffi.NULL)
    self.openssl_assert(raw_pkey != self._ffi.NULL)
    return self._ffi.gc(raw_pkey, self._lib.EVP_PKEY_free)

893 

def _load_key(
    self, openssl_read_func, data, password, unsafe_skip_rsa_key_validation
) -> PrivateKeyTypes:
    """Read a private key with ``openssl_read_func`` and wrap it.

    ``openssl_read_func`` is called with a memory BIO over ``data``, the
    ``Cryptography_pem_password_cb`` callback, and a
    ``CRYPTOGRAPHY_PASSWORD_DATA`` struct carrying the optional password.

    Raises:
        TypeError: the key is encrypted but no password was given, or a
            password was given but the callback was never invoked.
        ValueError: the password is too long for the callback, or the
            key could not be loaded (see ``_handle_key_loading_error``).
    """
    key_bio = self._bytes_to_bio(data)
    have_password = password is not None

    cb_state = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
    if have_password:
        utils._check_byteslike("password", password)
        # Bound to a local so the buffer stays referenced for the whole
        # call into OpenSSL.
        password_ptr = self._ffi.from_buffer(password)
        cb_state.password = password_ptr
        cb_state.length = len(password)

    password_cb = self._ffi.addressof(
        self._lib._original_lib, "Cryptography_pem_password_cb"
    )
    evp_pkey = openssl_read_func(
        key_bio.bio, self._ffi.NULL, password_cb, cb_state
    )

    if evp_pkey == self._ffi.NULL:
        if cb_state.error == 0:
            # The callback reported nothing, so OpenSSL itself failed.
            self._handle_key_loading_error()

        self._consume_errors()
        if cb_state.error == -1:
            raise TypeError(
                "Password was not given but private key is encrypted"
            )
        assert cb_state.error == -2
        raise ValueError(
            "Passwords longer than {} bytes are not supported "
            "by this backend.".format(cb_state.maxsize - 1)
        )

    evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)

    if have_password and cb_state.called == 0:
        raise TypeError(
            "Password was given but private key is not encrypted."
        )

    assert (not have_password) or cb_state.called == 1

    return self._evp_pkey_to_private_key(
        evp_pkey, unsafe_skip_rsa_key_validation
    )

945 

946 def _handle_key_loading_error(self) -> typing.NoReturn: 

947 errors = self._consume_errors() 

948 

949 if not errors: 

950 raise ValueError( 

951 "Could not deserialize key data. The data may be in an " 

952 "incorrect format or it may be encrypted with an unsupported " 

953 "algorithm." 

954 ) 

955 

956 elif ( 

957 errors[0]._lib_reason_match( 

958 self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT 

959 ) 

960 or errors[0]._lib_reason_match( 

961 self._lib.ERR_LIB_PKCS12, 

962 self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR, 

963 ) 

964 or ( 

965 self._lib.Cryptography_HAS_PROVIDERS 

966 and errors[0]._lib_reason_match( 

967 self._lib.ERR_LIB_PROV, 

968 self._lib.PROV_R_BAD_DECRYPT, 

969 ) 

970 ) 

971 ): 

972 raise ValueError("Bad decrypt. Incorrect password?") 

973 

974 elif any( 

975 error._lib_reason_match( 

976 self._lib.ERR_LIB_EVP, 

977 self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM, 

978 ) 

979 for error in errors 

980 ): 

981 raise ValueError("Unsupported public key algorithm.") 

982 

983 else: 

984 raise ValueError( 

985 "Could not deserialize key data. The data may be in an " 

986 "incorrect format, it may be encrypted with an unsupported " 

987 "algorithm, or it may be an unsupported key type (e.g. EC " 

988 "curves with explicit parameters).", 

989 errors, 

990 ) 

991 

def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
    """Report whether OpenSSL can build a group for ``curve``.

    A curve with no known NID is probed as ``NID_undef``, for which
    group creation is expected to fail.
    """
    try:
        nid = self._elliptic_curve_to_nid(curve)
    except UnsupportedAlgorithm:
        nid = self._lib.NID_undef

    group = self._lib.EC_GROUP_new_by_curve_name(nid)
    if group != self._ffi.NULL:
        self.openssl_assert(nid != self._lib.NID_undef)
        self._lib.EC_GROUP_free(group)
        return True

    # Group creation failed; drop the errors it queued.
    self._consume_errors()
    return False

1007 

def elliptic_curve_signature_algorithm_supported(
    self,
    signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
    curve: ec.EllipticCurve,
) -> bool:
    """Return True for an ECDSA algorithm on a supported curve."""
    # We only support ECDSA right now.
    is_ecdsa = isinstance(signature_algorithm, ec.ECDSA)
    return self.elliptic_curve_supported(curve) if is_ecdsa else False

1018 

def generate_elliptic_curve_private_key(
    self, curve: ec.EllipticCurve
) -> ec.EllipticCurvePrivateKey:
    """
    Generate a new private key on the named curve.

    Raises UnsupportedAlgorithm when ``elliptic_curve_supported`` rejects
    the curve.
    """
    if not self.elliptic_curve_supported(curve):
        raise UnsupportedAlgorithm(
            f"Backend object does not support {curve.name}.",
            _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
        )

    new_key = self._ec_key_new_by_curve(curve)
    generated = self._lib.EC_KEY_generate_key(new_key)
    self.openssl_assert(generated == 1)

    pkey = self._ec_cdata_to_evp_pkey(new_key)
    return _EllipticCurvePrivateKey(self, new_key, pkey)

1040 

1041 def load_elliptic_curve_private_numbers( 

1042 self, numbers: ec.EllipticCurvePrivateNumbers 

1043 ) -> ec.EllipticCurvePrivateKey: 

1044 public = numbers.public_numbers 

1045 

1046 ec_cdata = self._ec_key_new_by_curve(public.curve) 

1047 

1048 private_value = self._ffi.gc( 

1049 self._int_to_bn(numbers.private_value), self._lib.BN_clear_free 

1050 ) 

1051 res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value) 

1052 if res != 1: 

1053 self._consume_errors() 

1054 raise ValueError("Invalid EC key.") 

1055 

1056 with self._tmp_bn_ctx() as bn_ctx: 

1057 self._ec_key_set_public_key_affine_coordinates( 

1058 ec_cdata, public.x, public.y, bn_ctx 

1059 ) 

1060 # derive the expected public point and compare it to the one we 

1061 # just set based on the values we were given. If they don't match 

1062 # this isn't a valid key pair. 

1063 group = self._lib.EC_KEY_get0_group(ec_cdata) 

1064 self.openssl_assert(group != self._ffi.NULL) 

1065 set_point = backend._lib.EC_KEY_get0_public_key(ec_cdata) 

1066 self.openssl_assert(set_point != self._ffi.NULL) 

1067 computed_point = self._lib.EC_POINT_new(group) 

1068 self.openssl_assert(computed_point != self._ffi.NULL) 

1069 computed_point = self._ffi.gc( 

1070 computed_point, self._lib.EC_POINT_free 

1071 ) 

1072 res = self._lib.EC_POINT_mul( 

1073 group, 

1074 computed_point, 

1075 private_value, 

1076 self._ffi.NULL, 

1077 self._ffi.NULL, 

1078 bn_ctx, 

1079 ) 

1080 self.openssl_assert(res == 1) 

1081 if ( 

1082 self._lib.EC_POINT_cmp( 

1083 group, set_point, computed_point, bn_ctx 

1084 ) 

1085 != 0 

1086 ): 

1087 raise ValueError("Invalid EC key.") 

1088 

1089 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 

1090 

1091 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 

1092 

def load_elliptic_curve_public_numbers(
    self, numbers: ec.EllipticCurvePublicNumbers
) -> ec.EllipticCurvePublicKey:
    """Build an EC public key from affine coordinates on ``numbers.curve``."""
    key_cdata = self._ec_key_new_by_curve(numbers.curve)
    with self._tmp_bn_ctx() as scratch_ctx:
        self._ec_key_set_public_key_affine_coordinates(
            key_cdata, numbers.x, numbers.y, scratch_ctx
        )
    pkey = self._ec_cdata_to_evp_pkey(key_cdata)
    return _EllipticCurvePublicKey(self, key_cdata, pkey)

1104 

def load_elliptic_curve_public_bytes(
    self, curve: ec.EllipticCurve, point_bytes: bytes
) -> ec.EllipticCurvePublicKey:
    """Build an EC public key from an encoded point on ``curve``.

    Raises ValueError when ``EC_POINT_oct2point`` rejects the bytes.
    """
    null = self._ffi.NULL
    key_cdata = self._ec_key_new_by_curve(curve)

    group = self._lib.EC_KEY_get0_group(key_cdata)
    self.openssl_assert(group != null)

    decoded = self._lib.EC_POINT_new(group)
    self.openssl_assert(decoded != null)
    decoded = self._ffi.gc(decoded, self._lib.EC_POINT_free)

    with self._tmp_bn_ctx() as scratch_ctx:
        status = self._lib.EC_POINT_oct2point(
            group, decoded, point_bytes, len(point_bytes), scratch_ctx
        )
        if status != 1:
            self._consume_errors()
            raise ValueError("Invalid public bytes for the given curve")

    status = self._lib.EC_KEY_set_public_key(key_cdata, decoded)
    self.openssl_assert(status == 1)

    pkey = self._ec_cdata_to_evp_pkey(key_cdata)
    return _EllipticCurvePublicKey(self, key_cdata, pkey)

1126 

def derive_elliptic_curve_private_key(
    self, private_value: int, curve: ec.EllipticCurve
) -> ec.EllipticCurvePrivateKey:
    """Build an EC private key on ``curve`` from a private scalar.

    The public point is computed from the scalar with ``EC_POINT_mul``.

    Raises:
        ValueError: the affine coordinates of the derived point cannot be
            obtained.
    """
    ec_cdata = self._ec_key_new_by_curve(curve)

    group = self._lib.EC_KEY_get0_group(ec_cdata)
    self.openssl_assert(group != self._ffi.NULL)

    point = self._lib.EC_POINT_new(group)
    self.openssl_assert(point != self._ffi.NULL)
    point = self._ffi.gc(point, self._lib.EC_POINT_free)

    # One BIGNUM serves both the multiplication and
    # EC_KEY_set_private_key, the same way
    # load_elliptic_curve_private_numbers shares its scalar. This avoids
    # a second copy of the secret in memory.
    value = self._ffi.gc(
        self._int_to_bn(private_value), self._lib.BN_clear_free
    )

    with self._tmp_bn_ctx() as bn_ctx:
        res = self._lib.EC_POINT_mul(
            group, point, value, self._ffi.NULL, self._ffi.NULL, bn_ctx
        )
        self.openssl_assert(res == 1)

        bn_x = self._lib.BN_CTX_get(bn_ctx)
        bn_y = self._lib.BN_CTX_get(bn_ctx)

        # The coordinates themselves are unused; the call only serves as
        # a check that the derived point has affine coordinates.
        res = self._lib.EC_POINT_get_affine_coordinates(
            group, point, bn_x, bn_y, bn_ctx
        )
        if res != 1:
            self._consume_errors()
            raise ValueError("Unable to derive key from private_value")

    res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
    self.openssl_assert(res == 1)
    res = self._lib.EC_KEY_set_private_key(ec_cdata, value)
    self.openssl_assert(res == 1)

    evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)

    return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)

1168 

1169 def _ec_key_new_by_curve(self, curve: ec.EllipticCurve): 

1170 curve_nid = self._elliptic_curve_to_nid(curve) 

1171 return self._ec_key_new_by_curve_nid(curve_nid) 

1172 

1173 def _ec_key_new_by_curve_nid(self, curve_nid: int): 

1174 ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid) 

1175 self.openssl_assert(ec_cdata != self._ffi.NULL) 

1176 return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 

1177 

def elliptic_curve_exchange_algorithm_supported(
    self, algorithm: ec.ECDH, curve: ec.EllipticCurve
) -> bool:
    """Return whether ``algorithm`` is ECDH on a usable curve.

    With FIPS enabled, the curve must also be an instance of one of
    ``self._fips_ecdh_curves``.
    """
    if self._fips_enabled:
        if not isinstance(curve, self._fips_ecdh_curves):
            return False

    curve_ok = self.elliptic_curve_supported(curve)
    return curve_ok and isinstance(algorithm, ec.ECDH)

1189 

1190 def _ec_cdata_to_evp_pkey(self, ec_cdata): 

1191 evp_pkey = self._create_evp_pkey_gc() 

1192 res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata) 

1193 self.openssl_assert(res == 1) 

1194 return evp_pkey 

1195 

1196 def _elliptic_curve_to_nid(self, curve: ec.EllipticCurve) -> int: 

1197 """ 

1198 Get the NID for a curve name. 

1199 """ 

1200 

1201 curve_aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"} 

1202 

1203 curve_name = curve_aliases.get(curve.name, curve.name) 

1204 

1205 curve_nid = self._lib.OBJ_sn2nid(curve_name.encode()) 

1206 if curve_nid == self._lib.NID_undef: 

1207 raise UnsupportedAlgorithm( 

1208 f"{curve.name} is not a supported elliptic curve", 

1209 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE, 

1210 ) 

1211 return curve_nid 

1212 

1213 @contextmanager 

1214 def _tmp_bn_ctx(self): 

1215 bn_ctx = self._lib.BN_CTX_new() 

1216 self.openssl_assert(bn_ctx != self._ffi.NULL) 

1217 bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free) 

1218 self._lib.BN_CTX_start(bn_ctx) 

1219 try: 

1220 yield bn_ctx 

1221 finally: 

1222 self._lib.BN_CTX_end(bn_ctx) 

1223 

1224 def _ec_key_set_public_key_affine_coordinates( 

1225 self, 

1226 ec_cdata, 

1227 x: int, 

1228 y: int, 

1229 bn_ctx, 

1230 ) -> None: 

1231 """ 

1232 Sets the public key point in the EC_KEY context to the affine x and y 

1233 values. 

1234 """ 

1235 

1236 if x < 0 or y < 0: 

1237 raise ValueError( 

1238 "Invalid EC key. Both x and y must be non-negative." 

1239 ) 

1240 

1241 x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free) 

1242 y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free) 

1243 group = self._lib.EC_KEY_get0_group(ec_cdata) 

1244 self.openssl_assert(group != self._ffi.NULL) 

1245 point = self._lib.EC_POINT_new(group) 

1246 self.openssl_assert(point != self._ffi.NULL) 

1247 point = self._ffi.gc(point, self._lib.EC_POINT_free) 

1248 res = self._lib.EC_POINT_set_affine_coordinates( 

1249 group, point, x, y, bn_ctx 

1250 ) 

1251 if res != 1: 

1252 self._consume_errors() 

1253 raise ValueError("Invalid EC key.") 

1254 res = self._lib.EC_KEY_set_public_key(ec_cdata, point) 

1255 self.openssl_assert(res == 1) 

1256 

def _private_key_bytes(
    self,
    encoding: serialization.Encoding,
    format: serialization.PrivateFormat,
    encryption_algorithm: serialization.KeySerializationEncryption,
    key,
    evp_pkey,
    cdata,
) -> bytes:
    """Serialize a private key in the requested format and encoding.

    Handles the PKCS8, TraditionalOpenSSL and OpenSSH formats; any other
    format is rejected as invalid for this key.

    Raises:
        TypeError: an argument is not an instance of the expected type.
        ValueError: the encryption type, encoding or format is not
            supported for this combination, or the password is too long.
    """
    Encoding = serialization.Encoding
    PrivateFormat = serialization.PrivateFormat

    # Argument type validation.
    if not isinstance(encoding, Encoding):
        raise TypeError("encoding must be an item from the Encoding enum")
    if not isinstance(format, PrivateFormat):
        raise TypeError(
            "format must be an item from the PrivateFormat enum"
        )
    if not isinstance(
        encryption_algorithm, serialization.KeySerializationEncryption
    ):
        raise TypeError(
            "Encryption algorithm must be a KeySerializationEncryption "
            "instance"
        )

    # Work out the password implied by the encryption algorithm.
    unencrypted = isinstance(
        encryption_algorithm, serialization.NoEncryption
    )
    if unencrypted:
        password = b""
    elif isinstance(
        encryption_algorithm, serialization.BestAvailableEncryption
    ):
        password = encryption_algorithm.password
        if len(password) > 1023:
            raise ValueError(
                "Passwords longer than 1023 bytes are not supported by "
                "this backend"
            )
    elif (
        isinstance(
            encryption_algorithm, serialization._KeySerializationEncryption
        )
        and encryption_algorithm._format
        is format
        is PrivateFormat.OpenSSH
    ):
        password = encryption_algorithm.password
    else:
        raise ValueError("Unsupported encryption type")

    # PKCS8 with PEM or DER encoding.
    if format is PrivateFormat.PKCS8:
        if encoding is Encoding.PEM:
            writer = self._lib.PEM_write_bio_PKCS8PrivateKey
        elif encoding is Encoding.DER:
            writer = self._lib.i2d_PKCS8PrivateKey_bio
        else:
            raise ValueError("Unsupported encoding for PKCS8")
        return self._private_key_bytes_via_bio(writer, evp_pkey, password)

    # TraditionalOpenSSL with PEM or DER encoding.
    if format is PrivateFormat.TraditionalOpenSSL:
        if self._fips_enabled and not unencrypted:
            raise ValueError(
                "Encrypted traditional OpenSSL format is not "
                "supported in FIPS mode."
            )
        key_type = self._lib.EVP_PKEY_id(evp_pkey)

        if encoding is Encoding.PEM:
            if key_type == self._lib.EVP_PKEY_RSA:
                writer = self._lib.PEM_write_bio_RSAPrivateKey
            else:
                assert key_type == self._lib.EVP_PKEY_EC
                writer = self._lib.PEM_write_bio_ECPrivateKey
            return self._private_key_bytes_via_bio(writer, cdata, password)

        if encoding is Encoding.DER:
            if password:
                raise ValueError(
                    "Encryption is not supported for DER encoded "
                    "traditional OpenSSL keys"
                )
            if key_type == self._lib.EVP_PKEY_RSA:
                writer = self._lib.i2d_RSAPrivateKey_bio
            else:
                assert key_type == self._lib.EVP_PKEY_EC
                writer = self._lib.i2d_ECPrivateKey_bio
            return self._bio_func_output(writer, cdata)

        raise ValueError("Unsupported encoding for TraditionalOpenSSL")

    # OpenSSH is only available with PEM encoding.
    if format is PrivateFormat.OpenSSH:
        if encoding is not Encoding.PEM:
            raise ValueError(
                "OpenSSH private key format can only be used"
                " with PEM encoding"
            )
        return ssh._serialize_ssh_private_key(
            key, password, encryption_algorithm
        )

    # Anything that key-specific code was supposed to handle earlier,
    # like Raw.
    raise ValueError("format is invalid with this key")

1368 

1369 def _private_key_bytes_via_bio( 

1370 self, write_bio, evp_pkey, password 

1371 ) -> bytes: 

1372 if not password: 

1373 evp_cipher = self._ffi.NULL 

1374 else: 

1375 # This is a curated value that we will update over time. 

1376 evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc") 

1377 

1378 return self._bio_func_output( 

1379 write_bio, 

1380 evp_pkey, 

1381 evp_cipher, 

1382 password, 

1383 len(password), 

1384 self._ffi.NULL, 

1385 self._ffi.NULL, 

1386 ) 

1387 

1388 def _bio_func_output(self, write_bio, *args) -> bytes: 

1389 bio = self._create_mem_bio_gc() 

1390 res = write_bio(bio, *args) 

1391 self.openssl_assert(res == 1) 

1392 return self._read_mem_bio(bio) 

1393 

def _public_key_bytes(
    self,
    encoding: serialization.Encoding,
    format: serialization.PublicFormat,
    key,
    evp_pkey,
    cdata,
) -> bytes:
    """Serialize a public key in the requested format and encoding.

    Handles the SubjectPublicKeyInfo, PKCS1 (RSA only) and OpenSSH
    formats; any other format is rejected as invalid for this key.

    Raises:
        TypeError: ``encoding`` or ``format`` is not an instance of the
            expected enum.
        ValueError: the format/encoding combination is not supported.
    """
    Encoding = serialization.Encoding
    PublicFormat = serialization.PublicFormat

    if not isinstance(encoding, Encoding):
        raise TypeError("encoding must be an item from the Encoding enum")
    if not isinstance(format, PublicFormat):
        raise TypeError(
            "format must be an item from the PublicFormat enum"
        )

    # SubjectPublicKeyInfo with PEM or DER encoding.
    if format is PublicFormat.SubjectPublicKeyInfo:
        if encoding is Encoding.PEM:
            writer = self._lib.PEM_write_bio_PUBKEY
        elif encoding is Encoding.DER:
            writer = self._lib.i2d_PUBKEY_bio
        else:
            raise ValueError(
                "SubjectPublicKeyInfo works only with PEM or DER encoding"
            )
        return self._bio_func_output(writer, evp_pkey)

    # PKCS1 with PEM or DER encoding.
    if format is PublicFormat.PKCS1:
        # Only RSA is supported here.
        if self._lib.EVP_PKEY_id(evp_pkey) != self._lib.EVP_PKEY_RSA:
            raise ValueError("PKCS1 format is supported only for RSA keys")

        if encoding is Encoding.PEM:
            writer = self._lib.PEM_write_bio_RSAPublicKey
        elif encoding is Encoding.DER:
            writer = self._lib.i2d_RSAPublicKey_bio
        else:
            raise ValueError("PKCS1 works only with PEM or DER encoding")
        return self._bio_func_output(writer, cdata)

    # The OpenSSH format requires the OpenSSH encoding.
    if format is PublicFormat.OpenSSH:
        if encoding is not Encoding.OpenSSH:
            raise ValueError(
                "OpenSSH format must be used with OpenSSH encoding"
            )
        return ssh.serialize_ssh_public_key(key)

    # Anything that key-specific code was supposed to handle earlier,
    # like Raw, CompressedPoint, UncompressedPoint
    raise ValueError("format is invalid with this key")

1448 

def dh_supported(self) -> bool:
    """Return False when the bindings were built against BoringSSL."""
    is_boringssl = self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    return not is_boringssl

1451 

def generate_dh_parameters(
    self, generator: int, key_size: int
) -> dh.DHParameters:
    """Generate DH parameters via ``rust_openssl.dh.generate_parameters``."""
    parameters = rust_openssl.dh.generate_parameters(generator, key_size)
    return parameters

1456 

def generate_dh_private_key(
    self, parameters: dh.DHParameters
) -> dh.DHPrivateKey:
    """Return ``parameters.generate_private_key()``."""
    private_key = parameters.generate_private_key()
    return private_key

1461 

def generate_dh_private_key_and_parameters(
    self, generator: int, key_size: int
) -> dh.DHPrivateKey:
    """Generate DH parameters, then a private key from them."""
    parameters = self.generate_dh_parameters(generator, key_size)
    return self.generate_dh_private_key(parameters)

1468 

def load_dh_private_numbers(
    self, numbers: dh.DHPrivateNumbers
) -> dh.DHPrivateKey:
    """Build a DH private key via ``rust_openssl.dh.from_private_numbers``."""
    private_key = rust_openssl.dh.from_private_numbers(numbers)
    return private_key

1473 

def load_dh_public_numbers(
    self, numbers: dh.DHPublicNumbers
) -> dh.DHPublicKey:
    """Build a DH public key via ``rust_openssl.dh.from_public_numbers``."""
    public_key = rust_openssl.dh.from_public_numbers(numbers)
    return public_key

1478 

def load_dh_parameter_numbers(
    self, numbers: dh.DHParameterNumbers
) -> dh.DHParameters:
    """Build DH parameters via ``rust_openssl.dh.from_parameter_numbers``."""
    parameters = rust_openssl.dh.from_parameter_numbers(numbers)
    return parameters

1483 

def dh_parameters_supported(
    self, p: int, g: int, q: typing.Optional[int] = None
) -> bool:
    """Return whether DH parameters can be built from ``p``, ``g``, ``q``.

    A ValueError from constructing or loading the parameter numbers is
    reported as False.
    """
    try:
        candidate = dh.DHParameterNumbers(p=p, g=g, q=q)
        rust_openssl.dh.from_parameter_numbers(candidate)
    except ValueError:
        return False
    return True

1495 

def dh_x942_serialization_supported(self) -> bool:
    """Return whether the bindings report ``Cryptography_HAS_EVP_PKEY_DHX``."""
    has_dhx = self._lib.Cryptography_HAS_EVP_PKEY_DHX
    return has_dhx == 1

1498 

def x25519_load_public_bytes(self, data: bytes) -> x25519.X25519PublicKey:
    """Load a key via ``rust_openssl.x25519.from_public_bytes``."""
    public_key = rust_openssl.x25519.from_public_bytes(data)
    return public_key

1501 

def x25519_load_private_bytes(
    self, data: bytes
) -> x25519.X25519PrivateKey:
    """Load a key via ``rust_openssl.x25519.from_private_bytes``."""
    private_key = rust_openssl.x25519.from_private_bytes(data)
    return private_key

1506 

def x25519_generate_key(self) -> x25519.X25519PrivateKey:
    """Generate a key via ``rust_openssl.x25519.generate_key``."""
    private_key = rust_openssl.x25519.generate_key()
    return private_key

1509 

def x25519_supported(self) -> bool:
    """Return False in FIPS mode or on LibreSSL older than 3.7.0."""
    return not (
        self._fips_enabled
        or self._lib.CRYPTOGRAPHY_LIBRESSL_LESS_THAN_370
    )

1514 

def x448_load_public_bytes(self, data: bytes) -> x448.X448PublicKey:
    """Load a key via ``rust_openssl.x448.from_public_bytes``."""
    public_key = rust_openssl.x448.from_public_bytes(data)
    return public_key

1517 

def x448_load_private_bytes(self, data: bytes) -> x448.X448PrivateKey:
    """Load a key via ``rust_openssl.x448.from_private_bytes``."""
    private_key = rust_openssl.x448.from_private_bytes(data)
    return private_key

1520 

def x448_generate_key(self) -> x448.X448PrivateKey:
    """Generate a key via ``rust_openssl.x448.generate_key``."""
    private_key = rust_openssl.x448.generate_key()
    return private_key

1523 

def x448_supported(self) -> bool:
    """Return False in FIPS mode and on LibreSSL or BoringSSL builds."""
    return not (
        self._fips_enabled
        or self._lib.CRYPTOGRAPHY_IS_LIBRESSL
        or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    )

1531 

def ed25519_supported(self) -> bool:
    """Return False in FIPS mode, otherwise the bindings'
    ``CRYPTOGRAPHY_HAS_WORKING_ED25519`` value."""
    return (
        False
        if self._fips_enabled
        else self._lib.CRYPTOGRAPHY_HAS_WORKING_ED25519
    )

1536 

def ed25519_load_public_bytes(
    self, data: bytes
) -> ed25519.Ed25519PublicKey:
    """Load a key via ``rust_openssl.ed25519.from_public_bytes``."""
    public_key = rust_openssl.ed25519.from_public_bytes(data)
    return public_key

1541 

def ed25519_load_private_bytes(
    self, data: bytes
) -> ed25519.Ed25519PrivateKey:
    """Load a key via ``rust_openssl.ed25519.from_private_bytes``."""
    private_key = rust_openssl.ed25519.from_private_bytes(data)
    return private_key

1546 

def ed25519_generate_key(self) -> ed25519.Ed25519PrivateKey:
    """Generate a key via ``rust_openssl.ed25519.generate_key``."""
    private_key = rust_openssl.ed25519.generate_key()
    return private_key

1549 

def ed448_supported(self) -> bool:
    """Return False in FIPS mode and on LibreSSL or BoringSSL builds."""
    return not (
        self._fips_enabled
        or self._lib.CRYPTOGRAPHY_IS_LIBRESSL
        or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    )

1557 

def ed448_load_public_bytes(self, data: bytes) -> ed448.Ed448PublicKey:
    """Load a key via ``rust_openssl.ed448.from_public_bytes``."""
    public_key = rust_openssl.ed448.from_public_bytes(data)
    return public_key

1560 

def ed448_load_private_bytes(self, data: bytes) -> ed448.Ed448PrivateKey:
    """Load a key via ``rust_openssl.ed448.from_private_bytes``."""
    private_key = rust_openssl.ed448.from_private_bytes(data)
    return private_key

1563 

def ed448_generate_key(self) -> ed448.Ed448PrivateKey:
    """Generate a key via ``rust_openssl.ed448.generate_key``."""
    private_key = rust_openssl.ed448.generate_key()
    return private_key

1566 

def aead_cipher_supported(self, cipher) -> bool:
    """Return the answer of ``aead._aead_cipher_supported`` for ``cipher``."""
    supported = aead._aead_cipher_supported(self, cipher)
    return supported

1569 

1570 def _zero_data(self, data, length: int) -> None: 

1571 # We clear things this way because at the moment we're not 

1572 # sure of a better way that can guarantee it overwrites the 

1573 # memory of a bytearray and doesn't just replace the underlying char *. 

1574 for i in range(length): 

1575 data[i] = 0 

1576 

1577 @contextlib.contextmanager 

1578 def _zeroed_null_terminated_buf(self, data): 

1579 """ 

1580 This method takes bytes, which can be a bytestring or a mutable 

1581 buffer like a bytearray, and yields a null-terminated version of that 

1582 data. This is required because PKCS12_parse doesn't take a length with 

1583 its password char * and ffi.from_buffer doesn't provide null 

1584 termination. So, to support zeroing the data via bytearray we 

1585 need to build this ridiculous construct that copies the memory, but 

1586 zeroes it after use. 

1587 """ 

1588 if data is None: 

1589 yield self._ffi.NULL 

1590 else: 

1591 data_len = len(data) 

1592 buf = self._ffi.new("char[]", data_len + 1) 

1593 self._ffi.memmove(buf, data, data_len) 

1594 try: 

1595 yield buf 

1596 finally: 

1597 # Cast to a uint8_t * so we can assign by integer 

1598 self._zero_data(self._ffi.cast("uint8_t *", buf), data_len) 

1599 

def load_key_and_certificates_from_pkcs12(
    self, data: bytes, password: typing.Optional[bytes]
) -> typing.Tuple[
    typing.Optional[PrivateKeyTypes],
    typing.Optional[x509.Certificate],
    typing.List[x509.Certificate],
]:
    """Load a PKCS12 blob and return ``(key, certificate, extra_certs)``.

    This unwraps the result of ``load_pkcs12`` into plain certificate
    objects; the certificate slot is None when the bundle has none.
    """
    bundle = self.load_pkcs12(data, password)
    private_key = bundle.key
    main_cert = bundle.cert.certificate if bundle.cert else None
    extra_certs = [entry.certificate for entry in bundle.additional_certs]
    return (private_key, main_cert, extra_certs)

1613 

def load_pkcs12(
    self, data: bytes, password: typing.Optional[bytes]
) -> PKCS12KeyAndCertificates:
    """Parse a PKCS12 blob into its key, certificate and extra certificates.

    Raises:
        ValueError: the data cannot be deserialized, or ``PKCS12_parse``
            fails (wrong password or invalid data).
    """
    if password is not None:
        utils._check_byteslike("password", password)

    null = self._ffi.NULL
    p12_bio = self._bytes_to_bio(data)
    raw_p12 = self._lib.d2i_PKCS12_bio(p12_bio.bio, null)
    if raw_p12 == null:
        self._consume_errors()
        raise ValueError("Could not deserialize PKCS12 data")
    p12 = self._ffi.gc(raw_p12, self._lib.PKCS12_free)

    # Output slots filled in by PKCS12_parse.
    pkey_out = self._ffi.new("EVP_PKEY **")
    cert_out = self._ffi.new("X509 **")
    chain_out = self._ffi.new("Cryptography_STACK_OF_X509 **")
    with self._zeroed_null_terminated_buf(password) as password_buf:
        parsed = self._lib.PKCS12_parse(
            p12, password_buf, pkey_out, cert_out, chain_out
        )
    if parsed == 0:
        self._consume_errors()
        raise ValueError("Invalid password or PKCS12 data")

    key = None
    if pkey_out[0] != null:
        owned_pkey = self._ffi.gc(pkey_out[0], self._lib.EVP_PKEY_free)
        # We don't support turning off RSA key validation when loading
        # PKCS12 keys
        key = self._evp_pkey_to_private_key(
            owned_pkey, unsafe_skip_rsa_key_validation=False
        )

    cert = None
    if cert_out[0] != null:
        leaf = self._ffi.gc(cert_out[0], self._lib.X509_free)
        leaf_cert = self._ossl2cert(leaf)
        alias_ptr = self._lib.X509_alias_get0(leaf, null)
        leaf_name = None
        if alias_ptr != null:
            leaf_name = self._ffi.string(alias_ptr)
        cert = PKCS12Certificate(leaf_cert, leaf_name)

    additional_certificates = []
    if chain_out[0] != null:
        stack = self._ffi.gc(chain_out[0], self._lib.sk_X509_free)
        count = self._lib.sk_X509_num(chain_out[0])

        # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
        # certificates.
        positions: typing.Iterable[int]
        if (
            self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
            or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
        ):
            positions = range(count)
        else:
            positions = reversed(range(count))

        for position in positions:
            entry = self._lib.sk_X509_value(stack, position)
            self.openssl_assert(entry != null)
            entry = self._ffi.gc(entry, self._lib.X509_free)
            entry_cert = self._ossl2cert(entry)
            alias_ptr = self._lib.X509_alias_get0(entry, null)
            entry_name = None
            if alias_ptr != null:
                entry_name = self._ffi.string(alias_ptr)
            additional_certificates.append(
                PKCS12Certificate(entry_cert, entry_name)
            )

    return PKCS12KeyAndCertificates(key, cert, additional_certificates)

1688 

def serialize_key_and_certificates_to_pkcs12(
    self,
    name: typing.Optional[bytes],
    key: typing.Optional[PKCS12PrivateKeyTypes],
    cert: typing.Optional[x509.Certificate],
    cas: typing.Optional[typing.List[_PKCS12CATypes]],
    encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
    """Serialize a key, certificate and CA certificates into PKCS12 bytes.

    ``encryption_algorithm`` selects the key/cert cipher NIDs, the KDF
    and MAC iteration counts, the optional MAC digest and the password.

    Raises:
        ValueError: the encryption type is not supported.
        UnsupportedAlgorithm: PBESv2 or a custom MAC algorithm was
            requested but the linked OpenSSL does not provide it.
    """
    password = None
    if name is not None:
        utils._check_bytes("name", name)

    if isinstance(encryption_algorithm, serialization.NoEncryption):
        nid_cert = -1
        nid_key = -1
        pkcs12_iter = 0
        mac_iter = 0
        mac_alg = self._ffi.NULL
    elif isinstance(
        encryption_algorithm, serialization.BestAvailableEncryption
    ):
        # PKCS12 encryption is hopeless trash and can never be fixed.
        # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so
        # we use PBESv1 with 3DES on the older paths.
        if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
            nid_cert = self._lib.NID_aes_256_cbc
            nid_key = self._lib.NID_aes_256_cbc
        else:
            nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        # At least we can set this higher than OpenSSL's default
        pkcs12_iter = 20000
        # mac_iter chosen for compatibility reasons, see:
        # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
        # Did we mention how lousy PKCS12 encryption is?
        mac_iter = 1
        # MAC algorithm can only be set on OpenSSL 3.0.0+
        mac_alg = self._ffi.NULL
        password = encryption_algorithm.password
    elif (
        isinstance(
            encryption_algorithm, serialization._KeySerializationEncryption
        )
        and encryption_algorithm._format
        is serialization.PrivateFormat.PKCS12
    ):
        # Default to OpenSSL's defaults. Behavior will vary based on the
        # version of OpenSSL cryptography is compiled against.
        nid_cert = 0
        nid_key = 0
        # Use the default iters we use in best available
        pkcs12_iter = 20000
        # See the Best Available comment for why this is 1
        mac_iter = 1
        password = encryption_algorithm.password
        keycertalg = encryption_algorithm._key_cert_algorithm
        if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC:
            nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        elif keycertalg is PBES.PBESv2SHA256AndAES256CBC:
            if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
                raise UnsupportedAlgorithm(
                    "PBESv2 is not supported by this version of OpenSSL"
                )
            nid_cert = self._lib.NID_aes_256_cbc
            nid_key = self._lib.NID_aes_256_cbc
        else:
            assert keycertalg is None
            # We use OpenSSL's defaults

        if encryption_algorithm._hmac_hash is not None:
            if not self._lib.Cryptography_HAS_PKCS12_SET_MAC:
                raise UnsupportedAlgorithm(
                    "Setting MAC algorithm is not supported by this "
                    "version of OpenSSL."
                )
            mac_alg = self._evp_md_non_null_from_algorithm(
                encryption_algorithm._hmac_hash
            )
            self.openssl_assert(mac_alg != self._ffi.NULL)
        else:
            mac_alg = self._ffi.NULL

        if encryption_algorithm._kdf_rounds is not None:
            pkcs12_iter = encryption_algorithm._kdf_rounds

    else:
        raise ValueError("Unsupported key encryption type")

    if cas is None or len(cas) == 0:
        sk_x509 = self._ffi.NULL
    else:
        sk_x509 = self._lib.sk_X509_new_null()
        sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)

        # This list is to keep the x509 values alive until end of function
        ossl_cas = []
        for ca in cas:
            if isinstance(ca, PKCS12Certificate):
                ca_alias = ca.friendly_name
                ossl_ca = self._cert2ossl(ca.certificate)
                if ca_alias is None:
                    res = self._lib.X509_alias_set1(
                        ossl_ca, self._ffi.NULL, -1
                    )
                else:
                    res = self._lib.X509_alias_set1(
                        ossl_ca, ca_alias, len(ca_alias)
                    )
                self.openssl_assert(res == 1)
            else:
                ossl_ca = self._cert2ossl(ca)
            ossl_cas.append(ossl_ca)
            res = self._lib.sk_X509_push(sk_x509, ossl_ca)
            # Go through self like the rest of the method, not the
            # module-level ``backend`` global.
            self.openssl_assert(res >= 1)

    with self._zeroed_null_terminated_buf(password) as password_buf:
        with self._zeroed_null_terminated_buf(name) as name_buf:
            ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
            ossl_pkey = (
                self._key2ossl(key) if key is not None else self._ffi.NULL
            )

            p12 = self._lib.PKCS12_create(
                password_buf,
                name_buf,
                ossl_pkey,
                ossl_cert,
                sk_x509,
                nid_key,
                nid_cert,
                pkcs12_iter,
                mac_iter,
                0,
            )

        # PKCS12_create returns NULL on failure (for example when the key
        # does not match the certificate). Check it and take ownership
        # before PKCS12_set_mac gets a chance to dereference it.
        self.openssl_assert(p12 != self._ffi.NULL)
        p12 = self._ffi.gc(p12, self._lib.PKCS12_free)

        if (
            self._lib.Cryptography_HAS_PKCS12_SET_MAC
            and mac_alg != self._ffi.NULL
        ):
            res = self._lib.PKCS12_set_mac(
                p12,
                password_buf,
                -1,
                self._ffi.NULL,
                0,
                mac_iter,
                mac_alg,
            )
            # Without this check a failed call would silently leave the
            # MAC that PKCS12_create produced instead of the requested one.
            self.openssl_assert(res == 1)

    bio = self._create_mem_bio_gc()
    res = self._lib.i2d_PKCS12_bio(bio, p12)
    self.openssl_assert(res > 0)
    return self._read_mem_bio(bio)

1846 

def poly1305_supported(self) -> bool:
    """Report whether Poly1305 may be used through this backend.

    Always ``False`` when ``self._fips_enabled`` is truthy; otherwise
    ``True`` exactly when the binding's ``Cryptography_HAS_POLY1305``
    flag equals 1.
    """
    # Short-circuit keeps the library flag untouched while FIPS is enabled.
    return (
        not self._fips_enabled
        and self._lib.Cryptography_HAS_POLY1305 == 1
    )

1851 

def pkcs7_supported(self) -> bool:
    """Report whether PKCS7 operations are available.

    The result is the logical negation of the binding's
    ``CRYPTOGRAPHY_IS_BORINGSSL`` flag.
    """
    built_against_boringssl = self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    return not built_against_boringssl

1854 

def load_pem_pkcs7_certificates(
    self, data: bytes
) -> typing.List[x509.Certificate]:
    """Load the certificates contained in PEM-encoded PKCS7 ``data``.

    ``data`` is first validated with ``utils._check_bytes``. If
    ``PEM_read_bio_PKCS7`` returns NULL, ``self._consume_errors()`` is
    called and ``ValueError`` is raised. Otherwise the parsed structure is
    registered with ``PKCS7_free`` as its finalizer and handed to
    ``self._load_pkcs7_certificates``.
    """
    utils._check_bytes("data", data)
    # Keep the bio wrapper bound to a local for the duration of the read.
    mem_bio = self._bytes_to_bio(data)
    null = self._ffi.NULL
    raw_p7 = self._lib.PEM_read_bio_PKCS7(mem_bio.bio, null, null, null)
    if raw_p7 == null:
        self._consume_errors()
        raise ValueError("Unable to parse PKCS7 data")

    owned_p7 = self._ffi.gc(raw_p7, self._lib.PKCS7_free)
    return self._load_pkcs7_certificates(owned_p7)

1869 

def load_der_pkcs7_certificates(
    self, data: bytes
) -> typing.List[x509.Certificate]:
    """Load the certificates contained in DER-encoded PKCS7 ``data``.

    ``data`` is first validated with ``utils._check_bytes``. If
    ``d2i_PKCS7_bio`` returns NULL, ``self._consume_errors()`` is called
    and ``ValueError`` is raised. Otherwise the parsed structure is
    registered with ``PKCS7_free`` as its finalizer and handed to
    ``self._load_pkcs7_certificates``.
    """
    utils._check_bytes("data", data)
    # Keep the bio wrapper bound to a local for the duration of the read.
    mem_bio = self._bytes_to_bio(data)
    null = self._ffi.NULL
    raw_p7 = self._lib.d2i_PKCS7_bio(mem_bio.bio, null)
    if raw_p7 == null:
        self._consume_errors()
        raise ValueError("Unable to parse PKCS7 data")

    owned_p7 = self._ffi.gc(raw_p7, self._lib.PKCS7_free)
    return self._load_pkcs7_certificates(owned_p7)

1882 

def _load_pkcs7_certificates(self, p7) -> typing.List[x509.Certificate]:
    """Return the certificates embedded in a signed PKCS7 structure.

    :param p7: PKCS7 structure handle, as passed in by the PEM/DER
        loaders.
    :returns: one certificate object (built with ``self._ossl2cert``) per
        entry of the structure's certificate stack, in stack order.
    :raises UnsupportedAlgorithm: if the structure's type NID is not
        ``NID_pkcs7_signed``.
    :raises ValueError: if the structure claims to be signed data but its
        signed-data pointer is NULL.
    """
    nid = self._lib.OBJ_obj2nid(p7.type)
    self.openssl_assert(nid != self._lib.NID_undef)
    if nid != self._lib.NID_pkcs7_signed:
        raise UnsupportedAlgorithm(
            "Only basic signed structures are currently supported. NID"
            " for this data was {}".format(nid),
            _Reasons.UNSUPPORTED_SERIALIZATION,
        )

    # A structure can declare the signed-data type and still carry no
    # content; reading ``.cert`` through a NULL pointer would crash the
    # interpreter instead of raising.
    if p7.d.sign == self._ffi.NULL:
        raise ValueError(
            "The provided PKCS7 has no certificate data, but a cert "
            "loading method was called."
        )

    sk_x509 = p7.d.sign.cert
    num = self._lib.sk_X509_num(sk_x509)
    certs = []
    for i in range(num):
        # Named ``ossl_cert`` so the imported ``x509`` module is not shadowed.
        ossl_cert = self._lib.sk_X509_value(sk_x509, i)
        self.openssl_assert(ossl_cert != self._ffi.NULL)
        certs.append(self._ossl2cert(ossl_cert))

    return certs

1903 

1904 

class GetCipherByName:
    """Callable that looks up an EVP cipher from a name template.

    The template given at construction is formatted with ``cipher`` and
    ``mode`` keyword arguments, lower-cased, and ASCII-encoded to obtain
    the name passed to the library.
    """

    def __init__(self, fmt: str):
        self._fmt = fmt

    def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
        """Return the cipher handle for ``cipher``/``mode``.

        ``EVP_get_cipherbyname`` is tried first. If it yields NULL and the
        binding reports ``Cryptography_HAS_300_EVP_CIPHER``, the result of
        ``EVP_CIPHER_fetch`` is used instead. ``backend._consume_errors()``
        is always called before returning; the returned value may still be
        NULL.
        """
        lib = backend._lib
        formatted = self._fmt.format(cipher=cipher, mode=mode)
        name = formatted.lower().encode("ascii")

        handle = lib.EVP_get_cipherbyname(name)
        if handle == backend._ffi.NULL:
            # try EVP_CIPHER_fetch if present
            if lib.Cryptography_HAS_300_EVP_CIPHER:
                handle = lib.EVP_CIPHER_fetch(
                    backend._ffi.NULL, name, backend._ffi.NULL
                )

        backend._consume_errors()
        return handle

1928 

1929 

def _get_xts_cipher(backend: Backend, cipher: AES, mode):
    """Look up the AES-XTS cipher matching ``cipher``'s key size.

    The name is built from half of ``cipher.key_size`` (integer division),
    giving ``aes-<half>-xts``. ``mode`` is accepted but not used. Returns
    whatever ``EVP_get_cipherbyname`` returns.
    """
    half_key_bits = cipher.key_size // 2
    name = "aes-{}-xts".format(half_key_bits).encode("ascii")
    return backend._lib.EVP_get_cipherbyname(name)

1933 

1934 

# Module-level Backend instance, created when this module is imported.
backend = Backend()