Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/backend.py: 17%

1352 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5 

6import collections 

7import contextlib 

8import itertools 

9import typing 

10import warnings 

11from contextlib import contextmanager 

12 

13from cryptography import utils, x509 

14from cryptography.exceptions import UnsupportedAlgorithm, _Reasons 

15from cryptography.hazmat.backends.openssl import aead 

16from cryptography.hazmat.backends.openssl.ciphers import _CipherContext 

17from cryptography.hazmat.backends.openssl.cmac import _CMACContext 

18from cryptography.hazmat.backends.openssl.dh import ( 

19 _DHParameters, 

20 _DHPrivateKey, 

21 _DHPublicKey, 

22 _dh_params_dup, 

23) 

24from cryptography.hazmat.backends.openssl.dsa import ( 

25 _DSAParameters, 

26 _DSAPrivateKey, 

27 _DSAPublicKey, 

28) 

29from cryptography.hazmat.backends.openssl.ec import ( 

30 _EllipticCurvePrivateKey, 

31 _EllipticCurvePublicKey, 

32) 

33from cryptography.hazmat.backends.openssl.ed25519 import ( 

34 _Ed25519PrivateKey, 

35 _Ed25519PublicKey, 

36) 

37from cryptography.hazmat.backends.openssl.ed448 import ( 

38 _ED448_KEY_SIZE, 

39 _Ed448PrivateKey, 

40 _Ed448PublicKey, 

41) 

42from cryptography.hazmat.backends.openssl.hashes import _HashContext 

43from cryptography.hazmat.backends.openssl.hmac import _HMACContext 

44from cryptography.hazmat.backends.openssl.poly1305 import ( 

45 _POLY1305_KEY_SIZE, 

46 _Poly1305Context, 

47) 

48from cryptography.hazmat.backends.openssl.rsa import ( 

49 _RSAPrivateKey, 

50 _RSAPublicKey, 

51) 

52from cryptography.hazmat.backends.openssl.x25519 import ( 

53 _X25519PrivateKey, 

54 _X25519PublicKey, 

55) 

56from cryptography.hazmat.backends.openssl.x448 import ( 

57 _X448PrivateKey, 

58 _X448PublicKey, 

59) 

60from cryptography.hazmat.bindings._rust import ( 

61 x509 as rust_x509, 

62) 

63from cryptography.hazmat.bindings.openssl import binding 

64from cryptography.hazmat.primitives import hashes, serialization 

65from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding 

66from cryptography.hazmat.primitives.asymmetric import ( 

67 dh, 

68 dsa, 

69 ec, 

70 ed25519, 

71 ed448, 

72 rsa, 

73 x25519, 

74 x448, 

75) 

76from cryptography.hazmat.primitives.asymmetric.padding import ( 

77 MGF1, 

78 OAEP, 

79 PKCS1v15, 

80 PSS, 

81) 

82from cryptography.hazmat.primitives.asymmetric.types import ( 

83 CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES, 

84 PRIVATE_KEY_TYPES, 

85 PUBLIC_KEY_TYPES, 

86) 

87from cryptography.hazmat.primitives.ciphers import ( 

88 BlockCipherAlgorithm, 

89 CipherAlgorithm, 

90) 

91from cryptography.hazmat.primitives.ciphers.algorithms import ( 

92 AES, 

93 AES128, 

94 AES256, 

95 ARC4, 

96 Camellia, 

97 ChaCha20, 

98 SM4, 

99 TripleDES, 

100 _BlowfishInternal, 

101 _CAST5Internal, 

102 _IDEAInternal, 

103 _SEEDInternal, 

104) 

105from cryptography.hazmat.primitives.ciphers.modes import ( 

106 CBC, 

107 CFB, 

108 CFB8, 

109 CTR, 

110 ECB, 

111 GCM, 

112 Mode, 

113 OFB, 

114 XTS, 

115) 

116from cryptography.hazmat.primitives.kdf import scrypt 

117from cryptography.hazmat.primitives.serialization import pkcs7, ssh 

118from cryptography.hazmat.primitives.serialization.pkcs12 import ( 

119 PBES, 

120 PKCS12Certificate, 

121 PKCS12KeyAndCertificates, 

122 _ALLOWED_PKCS12_TYPES, 

123 _PKCS12_CAS_TYPES, 

124) 

125 

126 

127_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) 

128 

129 

# Not actually supported, just used as a marker for some serialization tests.
# The class carries no behaviour; it is only used as a key when registering
# the "rc2" cipher adapter in Backend._register_default_ciphers.
class _RC2:
    pass

133 

134 

class Backend:
    """
    OpenSSL API binding interfaces.
    """

    name = "openssl"

    # FIPS has opinions about acceptable algorithms and key sizes, but the
    # disallowed algorithms are still present in OpenSSL. They just error if
    # you try to use them. To avoid that we allowlist the algorithms in
    # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
    # AEAD cipher names (as bytes) that are allowlisted in FIPS mode.
    _fips_aead = {
        b"aes-128-ccm",
        b"aes-192-ccm",
        b"aes-256-ccm",
        b"aes-128-gcm",
        b"aes-192-gcm",
        b"aes-256-gcm",
    }
    # TripleDES encryption is disallowed/deprecated throughout 2023 in
    # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA).
    # Used with isinstance() in cipher_supported, hence a tuple of classes.
    _fips_ciphers = (AES,)
    # Sometimes SHA1 is still permissible. That logic is contained
    # within the various *_supported methods.
    # Used with isinstance() in hash_supported, hence a tuple of classes.
    _fips_hashes = (
        hashes.SHA224,
        hashes.SHA256,
        hashes.SHA384,
        hashes.SHA512,
        hashes.SHA512_224,
        hashes.SHA512_256,
        hashes.SHA3_224,
        hashes.SHA3_256,
        hashes.SHA3_384,
        hashes.SHA3_512,
        hashes.SHAKE128,
        hashes.SHAKE256,
    )
    # Curves allowlisted for ECDH in FIPS mode.
    _fips_ecdh_curves = (
        ec.SECP224R1,
        ec.SECP256R1,
        ec.SECP384R1,
        ec.SECP521R1,
    )
    # Minimum sizes applied in FIPS mode. The checks that consume these
    # constants are not visible in this part of the file.
    _fips_rsa_min_key_size = 2048
    _fips_rsa_min_public_exponent = 65537
    # 2**2048, i.e. the smallest integer that needs 2049 bits.
    _fips_dsa_min_modulus = 1 << 2048
    _fips_dh_min_key_size = 2048
    # Derived from the DH key size above so the two cannot drift apart.
    _fips_dh_min_modulus = 1 << _fips_dh_min_key_size

184 

185 def __init__(self): 

186 self._binding = binding.Binding() 

187 self._ffi = self._binding.ffi 

188 self._lib = self._binding.lib 

189 self._rsa_skip_check_key = False 

190 self._fips_enabled = self._is_fips_enabled() 

191 

192 self._cipher_registry = {} 

193 self._register_default_ciphers() 

194 if self._fips_enabled and self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE: 

195 warnings.warn( 

196 "OpenSSL FIPS mode is enabled. Can't enable DRBG fork safety.", 

197 UserWarning, 

198 ) 

199 else: 

200 self.activate_osrandom_engine() 

201 self._dh_types = [self._lib.EVP_PKEY_DH] 

202 if self._lib.Cryptography_HAS_EVP_PKEY_DHX: 

203 self._dh_types.append(self._lib.EVP_PKEY_DHX) 

204 

205 def __repr__(self) -> str: 

206 return "<OpenSSLBackend(version: {}, FIPS: {})>".format( 

207 self.openssl_version_text(), self._fips_enabled 

208 ) 

209 

210 def openssl_assert( 

211 self, 

212 ok: bool, 

213 errors: typing.Optional[typing.List[binding._OpenSSLError]] = None, 

214 ) -> None: 

215 return binding._openssl_assert(self._lib, ok, errors=errors) 

216 

217 def _is_fips_enabled(self) -> bool: 

218 if self._lib.Cryptography_HAS_300_FIPS: 

219 mode = self._lib.EVP_default_properties_is_fips_enabled( 

220 self._ffi.NULL 

221 ) 

222 else: 

223 mode = getattr(self._lib, "FIPS_mode", lambda: 0)() 

224 

225 if mode == 0: 

226 # OpenSSL without FIPS pushes an error on the error stack 

227 self._lib.ERR_clear_error() 

228 return bool(mode) 

229 

230 def _enable_fips(self) -> None: 

231 # This function enables FIPS mode for OpenSSL 3.0.0 on installs that 

232 # have the FIPS provider installed properly. 

233 self._binding._enable_fips() 

234 assert self._is_fips_enabled() 

235 self._fips_enabled = self._is_fips_enabled() 

236 

237 def activate_builtin_random(self) -> None: 

238 if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE: 

239 # Obtain a new structural reference. 

240 e = self._lib.ENGINE_get_default_RAND() 

241 if e != self._ffi.NULL: 

242 self._lib.ENGINE_unregister_RAND(e) 

243 # Reset the RNG to use the built-in. 

244 res = self._lib.RAND_set_rand_method(self._ffi.NULL) 

245 self.openssl_assert(res == 1) 

246 # decrement the structural reference from get_default_RAND 

247 res = self._lib.ENGINE_finish(e) 

248 self.openssl_assert(res == 1) 

249 

250 @contextlib.contextmanager 

251 def _get_osurandom_engine(self): 

252 # Fetches an engine by id and returns it. This creates a structural 

253 # reference. 

254 e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id) 

255 self.openssl_assert(e != self._ffi.NULL) 

256 # Initialize the engine for use. This adds a functional reference. 

257 res = self._lib.ENGINE_init(e) 

258 self.openssl_assert(res == 1) 

259 

260 try: 

261 yield e 

262 finally: 

263 # Decrement the structural ref incremented by ENGINE_by_id. 

264 res = self._lib.ENGINE_free(e) 

265 self.openssl_assert(res == 1) 

266 # Decrement the functional ref incremented by ENGINE_init. 

267 res = self._lib.ENGINE_finish(e) 

268 self.openssl_assert(res == 1) 

269 

270 def activate_osrandom_engine(self) -> None: 

271 if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE: 

272 # Unregister and free the current engine. 

273 self.activate_builtin_random() 

274 with self._get_osurandom_engine() as e: 

275 # Set the engine as the default RAND provider. 

276 res = self._lib.ENGINE_set_default_RAND(e) 

277 self.openssl_assert(res == 1) 

278 # Reset the RNG to use the engine 

279 res = self._lib.RAND_set_rand_method(self._ffi.NULL) 

280 self.openssl_assert(res == 1) 

281 

282 def osrandom_engine_implementation(self) -> str: 

283 buf = self._ffi.new("char[]", 64) 

284 with self._get_osurandom_engine() as e: 

285 res = self._lib.ENGINE_ctrl_cmd( 

286 e, b"get_implementation", len(buf), buf, self._ffi.NULL, 0 

287 ) 

288 self.openssl_assert(res > 0) 

289 return self._ffi.string(buf).decode("ascii") 

290 

291 def openssl_version_text(self) -> str: 

292 """ 

293 Friendly string name of the loaded OpenSSL library. This is not 

294 necessarily the same version as it was compiled against. 

295 

296 Example: OpenSSL 1.1.1d 10 Sep 2019 

297 """ 

298 return self._ffi.string( 

299 self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION) 

300 ).decode("ascii") 

301 

302 def openssl_version_number(self) -> int: 

303 return self._lib.OpenSSL_version_num() 

304 

305 def create_hmac_ctx( 

306 self, key: bytes, algorithm: hashes.HashAlgorithm 

307 ) -> _HMACContext: 

308 return _HMACContext(self, key, algorithm) 

309 

310 def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm): 

311 if algorithm.name == "blake2b" or algorithm.name == "blake2s": 

312 alg = "{}{}".format( 

313 algorithm.name, algorithm.digest_size * 8 

314 ).encode("ascii") 

315 else: 

316 alg = algorithm.name.encode("ascii") 

317 

318 evp_md = self._lib.EVP_get_digestbyname(alg) 

319 return evp_md 

320 

321 def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm): 

322 evp_md = self._evp_md_from_algorithm(algorithm) 

323 self.openssl_assert(evp_md != self._ffi.NULL) 

324 return evp_md 

325 

326 def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool: 

327 if self._fips_enabled and not isinstance(algorithm, self._fips_hashes): 

328 return False 

329 

330 evp_md = self._evp_md_from_algorithm(algorithm) 

331 return evp_md != self._ffi.NULL 

332 

333 def signature_hash_supported( 

334 self, algorithm: hashes.HashAlgorithm 

335 ) -> bool: 

336 # Dedicated check for hashing algorithm use in message digest for 

337 # signatures, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption). 

338 if self._fips_enabled and isinstance(algorithm, hashes.SHA1): 

339 return False 

340 return self.hash_supported(algorithm) 

341 

342 def scrypt_supported(self) -> bool: 

343 if self._fips_enabled: 

344 return False 

345 else: 

346 return self._lib.Cryptography_HAS_SCRYPT == 1 

347 

348 def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool: 

349 # FIPS mode still allows SHA1 for HMAC 

350 if self._fips_enabled and isinstance(algorithm, hashes.SHA1): 

351 return True 

352 

353 return self.hash_supported(algorithm) 

354 

355 def create_hash_ctx( 

356 self, algorithm: hashes.HashAlgorithm 

357 ) -> hashes.HashContext: 

358 return _HashContext(self, algorithm) 

359 

360 def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool: 

361 if self._fips_enabled: 

362 # FIPS mode requires AES. TripleDES is disallowed/deprecated in 

363 # FIPS 140-3. 

364 if not isinstance(cipher, self._fips_ciphers): 

365 return False 

366 

367 try: 

368 adapter = self._cipher_registry[type(cipher), type(mode)] 

369 except KeyError: 

370 return False 

371 evp_cipher = adapter(self, cipher, mode) 

372 return self._ffi.NULL != evp_cipher 

373 

374 def register_cipher_adapter(self, cipher_cls, mode_cls, adapter): 

375 if (cipher_cls, mode_cls) in self._cipher_registry: 

376 raise ValueError( 

377 "Duplicate registration for: {} {}.".format( 

378 cipher_cls, mode_cls 

379 ) 

380 ) 

381 self._cipher_registry[cipher_cls, mode_cls] = adapter 

382 

383 def _register_default_ciphers(self) -> None: 

384 for cipher_cls in [AES, AES128, AES256]: 

385 for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]: 

386 self.register_cipher_adapter( 

387 cipher_cls, 

388 mode_cls, 

389 GetCipherByName( 

390 "{cipher.name}-{cipher.key_size}-{mode.name}" 

391 ), 

392 ) 

393 for mode_cls in [CBC, CTR, ECB, OFB, CFB]: 

394 self.register_cipher_adapter( 

395 Camellia, 

396 mode_cls, 

397 GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"), 

398 ) 

399 for mode_cls in [CBC, CFB, CFB8, OFB]: 

400 self.register_cipher_adapter( 

401 TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}") 

402 ) 

403 self.register_cipher_adapter( 

404 TripleDES, ECB, GetCipherByName("des-ede3") 

405 ) 

406 for mode_cls in [CBC, CFB, OFB, ECB]: 

407 self.register_cipher_adapter( 

408 _BlowfishInternal, mode_cls, GetCipherByName("bf-{mode.name}") 

409 ) 

410 for mode_cls in [CBC, CFB, OFB, ECB]: 

411 self.register_cipher_adapter( 

412 _SEEDInternal, mode_cls, GetCipherByName("seed-{mode.name}") 

413 ) 

414 for cipher_cls, mode_cls in itertools.product( 

415 [_CAST5Internal, _IDEAInternal], 

416 [CBC, OFB, CFB, ECB], 

417 ): 

418 self.register_cipher_adapter( 

419 cipher_cls, 

420 mode_cls, 

421 GetCipherByName("{cipher.name}-{mode.name}"), 

422 ) 

423 self.register_cipher_adapter(ARC4, type(None), GetCipherByName("rc4")) 

424 # We don't actually support RC2, this is just used by some tests. 

425 self.register_cipher_adapter(_RC2, type(None), GetCipherByName("rc2")) 

426 self.register_cipher_adapter( 

427 ChaCha20, type(None), GetCipherByName("chacha20") 

428 ) 

429 self.register_cipher_adapter(AES, XTS, _get_xts_cipher) 

430 for mode_cls in [ECB, CBC, OFB, CFB, CTR]: 

431 self.register_cipher_adapter( 

432 SM4, mode_cls, GetCipherByName("sm4-{mode.name}") 

433 ) 

434 

435 def create_symmetric_encryption_ctx( 

436 self, cipher: CipherAlgorithm, mode: Mode 

437 ) -> _CipherContext: 

438 return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT) 

439 

440 def create_symmetric_decryption_ctx( 

441 self, cipher: CipherAlgorithm, mode: Mode 

442 ) -> _CipherContext: 

443 return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT) 

444 

445 def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool: 

446 return self.hmac_supported(algorithm) 

447 

448 def derive_pbkdf2_hmac( 

449 self, 

450 algorithm: hashes.HashAlgorithm, 

451 length: int, 

452 salt: bytes, 

453 iterations: int, 

454 key_material: bytes, 

455 ) -> bytes: 

456 buf = self._ffi.new("unsigned char[]", length) 

457 evp_md = self._evp_md_non_null_from_algorithm(algorithm) 

458 key_material_ptr = self._ffi.from_buffer(key_material) 

459 res = self._lib.PKCS5_PBKDF2_HMAC( 

460 key_material_ptr, 

461 len(key_material), 

462 salt, 

463 len(salt), 

464 iterations, 

465 evp_md, 

466 length, 

467 buf, 

468 ) 

469 self.openssl_assert(res == 1) 

470 return self._ffi.buffer(buf)[:] 

471 

472 def _consume_errors(self) -> typing.List[binding._OpenSSLError]: 

473 return binding._consume_errors(self._lib) 

474 

475 def _consume_errors_with_text( 

476 self, 

477 ) -> typing.List[binding._OpenSSLErrorWithText]: 

478 return binding._consume_errors_with_text(self._lib) 

479 

480 def _bn_to_int(self, bn) -> int: 

481 assert bn != self._ffi.NULL 

482 self.openssl_assert(not self._lib.BN_is_negative(bn)) 

483 

484 bn_num_bytes = self._lib.BN_num_bytes(bn) 

485 bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes) 

486 bin_len = self._lib.BN_bn2bin(bn, bin_ptr) 

487 # A zero length means the BN has value 0 

488 self.openssl_assert(bin_len >= 0) 

489 val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big") 

490 return val 

491 

492 def _int_to_bn(self, num: int, bn=None): 

493 """ 

494 Converts a python integer to a BIGNUM. The returned BIGNUM will not 

495 be garbage collected (to support adding them to structs that take 

496 ownership of the object). Be sure to register it for GC if it will 

497 be discarded after use. 

498 """ 

499 assert bn is None or bn != self._ffi.NULL 

500 

501 if bn is None: 

502 bn = self._ffi.NULL 

503 

504 binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big") 

505 bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn) 

506 self.openssl_assert(bn_ptr != self._ffi.NULL) 

507 return bn_ptr 

508 

509 def generate_rsa_private_key( 

510 self, public_exponent: int, key_size: int 

511 ) -> rsa.RSAPrivateKey: 

512 rsa._verify_rsa_parameters(public_exponent, key_size) 

513 

514 rsa_cdata = self._lib.RSA_new() 

515 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

516 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

517 

518 bn = self._int_to_bn(public_exponent) 

519 bn = self._ffi.gc(bn, self._lib.BN_free) 

520 

521 res = self._lib.RSA_generate_key_ex( 

522 rsa_cdata, key_size, bn, self._ffi.NULL 

523 ) 

524 self.openssl_assert(res == 1) 

525 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 

526 

527 return _RSAPrivateKey( 

528 self, rsa_cdata, evp_pkey, self._rsa_skip_check_key 

529 ) 

530 

531 def generate_rsa_parameters_supported( 

532 self, public_exponent: int, key_size: int 

533 ) -> bool: 

534 return ( 

535 public_exponent >= 3 

536 and public_exponent & 1 != 0 

537 and key_size >= 512 

538 ) 

539 

540 def load_rsa_private_numbers( 

541 self, numbers: rsa.RSAPrivateNumbers 

542 ) -> rsa.RSAPrivateKey: 

543 rsa._check_private_key_components( 

544 numbers.p, 

545 numbers.q, 

546 numbers.d, 

547 numbers.dmp1, 

548 numbers.dmq1, 

549 numbers.iqmp, 

550 numbers.public_numbers.e, 

551 numbers.public_numbers.n, 

552 ) 

553 rsa_cdata = self._lib.RSA_new() 

554 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

555 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

556 p = self._int_to_bn(numbers.p) 

557 q = self._int_to_bn(numbers.q) 

558 d = self._int_to_bn(numbers.d) 

559 dmp1 = self._int_to_bn(numbers.dmp1) 

560 dmq1 = self._int_to_bn(numbers.dmq1) 

561 iqmp = self._int_to_bn(numbers.iqmp) 

562 e = self._int_to_bn(numbers.public_numbers.e) 

563 n = self._int_to_bn(numbers.public_numbers.n) 

564 res = self._lib.RSA_set0_factors(rsa_cdata, p, q) 

565 self.openssl_assert(res == 1) 

566 res = self._lib.RSA_set0_key(rsa_cdata, n, e, d) 

567 self.openssl_assert(res == 1) 

568 res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp) 

569 self.openssl_assert(res == 1) 

570 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 

571 

572 return _RSAPrivateKey( 

573 self, rsa_cdata, evp_pkey, self._rsa_skip_check_key 

574 ) 

575 

576 def load_rsa_public_numbers( 

577 self, numbers: rsa.RSAPublicNumbers 

578 ) -> rsa.RSAPublicKey: 

579 rsa._check_public_key_components(numbers.e, numbers.n) 

580 rsa_cdata = self._lib.RSA_new() 

581 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

582 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

583 e = self._int_to_bn(numbers.e) 

584 n = self._int_to_bn(numbers.n) 

585 res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL) 

586 self.openssl_assert(res == 1) 

587 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 

588 

589 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 

590 

591 def _create_evp_pkey_gc(self): 

592 evp_pkey = self._lib.EVP_PKEY_new() 

593 self.openssl_assert(evp_pkey != self._ffi.NULL) 

594 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

595 return evp_pkey 

596 

597 def _rsa_cdata_to_evp_pkey(self, rsa_cdata): 

598 evp_pkey = self._create_evp_pkey_gc() 

599 res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata) 

600 self.openssl_assert(res == 1) 

601 return evp_pkey 

602 

603 def _bytes_to_bio(self, data: bytes): 

604 """ 

605 Return a _MemoryBIO namedtuple of (BIO, char*). 

606 

607 The char* is the storage for the BIO and it must stay alive until the 

608 BIO is finished with. 

609 """ 

610 data_ptr = self._ffi.from_buffer(data) 

611 bio = self._lib.BIO_new_mem_buf(data_ptr, len(data)) 

612 self.openssl_assert(bio != self._ffi.NULL) 

613 

614 return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr) 

615 

616 def _create_mem_bio_gc(self): 

617 """ 

618 Creates an empty memory BIO. 

619 """ 

620 bio_method = self._lib.BIO_s_mem() 

621 self.openssl_assert(bio_method != self._ffi.NULL) 

622 bio = self._lib.BIO_new(bio_method) 

623 self.openssl_assert(bio != self._ffi.NULL) 

624 bio = self._ffi.gc(bio, self._lib.BIO_free) 

625 return bio 

626 

627 def _read_mem_bio(self, bio) -> bytes: 

628 """ 

629 Reads a memory BIO. This only works on memory BIOs. 

630 """ 

631 buf = self._ffi.new("char **") 

632 buf_len = self._lib.BIO_get_mem_data(bio, buf) 

633 self.openssl_assert(buf_len > 0) 

634 self.openssl_assert(buf[0] != self._ffi.NULL) 

635 bio_data = self._ffi.buffer(buf[0], buf_len)[:] 

636 return bio_data 

637 

638 def _evp_pkey_to_private_key(self, evp_pkey) -> PRIVATE_KEY_TYPES: 

639 """ 

640 Return the appropriate type of PrivateKey given an evp_pkey cdata 

641 pointer. 

642 """ 

643 

644 key_type = self._lib.EVP_PKEY_id(evp_pkey) 

645 

646 if key_type == self._lib.EVP_PKEY_RSA: 

647 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 

648 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

649 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

650 return _RSAPrivateKey( 

651 self, rsa_cdata, evp_pkey, self._rsa_skip_check_key 

652 ) 

653 elif ( 

654 key_type == self._lib.EVP_PKEY_RSA_PSS 

655 and not self._lib.CRYPTOGRAPHY_IS_LIBRESSL 

656 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL 

657 and not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E 

658 ): 

659 # At the moment the way we handle RSA PSS keys is to strip the 

660 # PSS constraints from them and treat them as normal RSA keys 

661 # Unfortunately the RSA * itself tracks this data so we need to 

662 # extract, serialize, and reload it without the constraints. 

663 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 

664 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

665 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

666 bio = self._create_mem_bio_gc() 

667 res = self._lib.i2d_RSAPrivateKey_bio(bio, rsa_cdata) 

668 self.openssl_assert(res == 1) 

669 return self.load_der_private_key( 

670 self._read_mem_bio(bio), password=None 

671 ) 

672 elif key_type == self._lib.EVP_PKEY_DSA: 

673 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) 

674 self.openssl_assert(dsa_cdata != self._ffi.NULL) 

675 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 

676 return _DSAPrivateKey(self, dsa_cdata, evp_pkey) 

677 elif key_type == self._lib.EVP_PKEY_EC: 

678 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) 

679 self.openssl_assert(ec_cdata != self._ffi.NULL) 

680 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 

681 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 

682 elif key_type in self._dh_types: 

683 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey) 

684 self.openssl_assert(dh_cdata != self._ffi.NULL) 

685 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 

686 return _DHPrivateKey(self, dh_cdata, evp_pkey) 

687 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None): 

688 # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1 

689 return _Ed25519PrivateKey(self, evp_pkey) 

690 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None): 

691 # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1 

692 return _X448PrivateKey(self, evp_pkey) 

693 elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None): 

694 # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0 

695 return _X25519PrivateKey(self, evp_pkey) 

696 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None): 

697 # EVP_PKEY_ED448 is not present in OpenSSL < 1.1.1 

698 return _Ed448PrivateKey(self, evp_pkey) 

699 else: 

700 raise UnsupportedAlgorithm("Unsupported key type.") 

701 

702 def _evp_pkey_to_public_key(self, evp_pkey) -> PUBLIC_KEY_TYPES: 

703 """ 

704 Return the appropriate type of PublicKey given an evp_pkey cdata 

705 pointer. 

706 """ 

707 

708 key_type = self._lib.EVP_PKEY_id(evp_pkey) 

709 

710 if key_type == self._lib.EVP_PKEY_RSA: 

711 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 

712 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

713 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

714 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 

715 elif ( 

716 key_type == self._lib.EVP_PKEY_RSA_PSS 

717 and not self._lib.CRYPTOGRAPHY_IS_LIBRESSL 

718 and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL 

719 and not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E 

720 ): 

721 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 

722 self.openssl_assert(rsa_cdata != self._ffi.NULL) 

723 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 

724 bio = self._create_mem_bio_gc() 

725 res = self._lib.i2d_RSAPublicKey_bio(bio, rsa_cdata) 

726 self.openssl_assert(res == 1) 

727 return self.load_der_public_key(self._read_mem_bio(bio)) 

728 elif key_type == self._lib.EVP_PKEY_DSA: 

729 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) 

730 self.openssl_assert(dsa_cdata != self._ffi.NULL) 

731 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 

732 return _DSAPublicKey(self, dsa_cdata, evp_pkey) 

733 elif key_type == self._lib.EVP_PKEY_EC: 

734 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) 

735 if ec_cdata == self._ffi.NULL: 

736 errors = self._consume_errors_with_text() 

737 raise ValueError("Unable to load EC key", errors) 

738 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 

739 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey) 

740 elif key_type in self._dh_types: 

741 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey) 

742 self.openssl_assert(dh_cdata != self._ffi.NULL) 

743 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 

744 return _DHPublicKey(self, dh_cdata, evp_pkey) 

745 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None): 

746 # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1 

747 return _Ed25519PublicKey(self, evp_pkey) 

748 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None): 

749 # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1 

750 return _X448PublicKey(self, evp_pkey) 

751 elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None): 

752 # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0 

753 return _X25519PublicKey(self, evp_pkey) 

754 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None): 

755 # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.1 

756 return _Ed448PublicKey(self, evp_pkey) 

757 else: 

758 raise UnsupportedAlgorithm("Unsupported key type.") 

759 

760 def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool: 

761 return isinstance( 

762 algorithm, 

763 ( 

764 hashes.SHA1, 

765 hashes.SHA224, 

766 hashes.SHA256, 

767 hashes.SHA384, 

768 hashes.SHA512, 

769 ), 

770 ) 

771 

772 def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool: 

773 if isinstance(padding, PKCS1v15): 

774 return True 

775 elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1): 

776 # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked 

777 # as signature algorithm. 

778 if self._fips_enabled and isinstance( 

779 padding._mgf._algorithm, hashes.SHA1 

780 ): 

781 return True 

782 else: 

783 return self.hash_supported(padding._mgf._algorithm) 

784 elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1): 

785 return self._oaep_hash_supported( 

786 padding._mgf._algorithm 

787 ) and self._oaep_hash_supported(padding._algorithm) 

788 else: 

789 return False 

790 

791 def generate_dsa_parameters(self, key_size: int) -> dsa.DSAParameters: 

792 if key_size not in (1024, 2048, 3072, 4096): 

793 raise ValueError( 

794 "Key size must be 1024, 2048, 3072, or 4096 bits." 

795 ) 

796 

797 ctx = self._lib.DSA_new() 

798 self.openssl_assert(ctx != self._ffi.NULL) 

799 ctx = self._ffi.gc(ctx, self._lib.DSA_free) 

800 

801 res = self._lib.DSA_generate_parameters_ex( 

802 ctx, 

803 key_size, 

804 self._ffi.NULL, 

805 0, 

806 self._ffi.NULL, 

807 self._ffi.NULL, 

808 self._ffi.NULL, 

809 ) 

810 

811 self.openssl_assert(res == 1) 

812 

813 return _DSAParameters(self, ctx) 

814 

815 def generate_dsa_private_key( 

816 self, parameters: dsa.DSAParameters 

817 ) -> dsa.DSAPrivateKey: 

818 ctx = self._lib.DSAparams_dup( 

819 parameters._dsa_cdata # type: ignore[attr-defined] 

820 ) 

821 self.openssl_assert(ctx != self._ffi.NULL) 

822 ctx = self._ffi.gc(ctx, self._lib.DSA_free) 

823 self._lib.DSA_generate_key(ctx) 

824 evp_pkey = self._dsa_cdata_to_evp_pkey(ctx) 

825 

826 return _DSAPrivateKey(self, ctx, evp_pkey) 

827 

828 def generate_dsa_private_key_and_parameters( 

829 self, key_size: int 

830 ) -> dsa.DSAPrivateKey: 

831 parameters = self.generate_dsa_parameters(key_size) 

832 return self.generate_dsa_private_key(parameters) 

833 

834 def _dsa_cdata_set_values(self, dsa_cdata, p, q, g, pub_key, priv_key): 

835 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g) 

836 self.openssl_assert(res == 1) 

837 res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key) 

838 self.openssl_assert(res == 1) 

839 

840 def load_dsa_private_numbers( 

841 self, numbers: dsa.DSAPrivateNumbers 

842 ) -> dsa.DSAPrivateKey: 

843 dsa._check_dsa_private_numbers(numbers) 

844 parameter_numbers = numbers.public_numbers.parameter_numbers 

845 

846 dsa_cdata = self._lib.DSA_new() 

847 self.openssl_assert(dsa_cdata != self._ffi.NULL) 

848 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 

849 

850 p = self._int_to_bn(parameter_numbers.p) 

851 q = self._int_to_bn(parameter_numbers.q) 

852 g = self._int_to_bn(parameter_numbers.g) 

853 pub_key = self._int_to_bn(numbers.public_numbers.y) 

854 priv_key = self._int_to_bn(numbers.x) 

855 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key) 

856 

857 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata) 

858 

859 return _DSAPrivateKey(self, dsa_cdata, evp_pkey) 

860 

861 def load_dsa_public_numbers( 

862 self, numbers: dsa.DSAPublicNumbers 

863 ) -> dsa.DSAPublicKey: 

864 dsa._check_dsa_parameters(numbers.parameter_numbers) 

865 dsa_cdata = self._lib.DSA_new() 

866 self.openssl_assert(dsa_cdata != self._ffi.NULL) 

867 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 

868 

869 p = self._int_to_bn(numbers.parameter_numbers.p) 

870 q = self._int_to_bn(numbers.parameter_numbers.q) 

871 g = self._int_to_bn(numbers.parameter_numbers.g) 

872 pub_key = self._int_to_bn(numbers.y) 

873 priv_key = self._ffi.NULL 

874 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key) 

875 

876 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata) 

877 

878 return _DSAPublicKey(self, dsa_cdata, evp_pkey) 

879 

def load_dsa_parameter_numbers(
    self, numbers: dsa.DSAParameterNumbers
) -> dsa.DSAParameters:
    """Build a DSA parameters object from the integers p, q and g."""
    dsa._check_dsa_parameters(numbers)

    params_cdata = self._lib.DSA_new()
    self.openssl_assert(params_cdata != self._ffi.NULL)
    params_cdata = self._ffi.gc(params_cdata, self._lib.DSA_free)

    bn_p = self._int_to_bn(numbers.p)
    bn_q = self._int_to_bn(numbers.q)
    bn_g = self._int_to_bn(numbers.g)
    status = self._lib.DSA_set0_pqg(params_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(status == 1)

    return _DSAParameters(self, params_cdata)

895 

896 def _dsa_cdata_to_evp_pkey(self, dsa_cdata): 

897 evp_pkey = self._create_evp_pkey_gc() 

898 res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata) 

899 self.openssl_assert(res == 1) 

900 return evp_pkey 

901 

def dsa_supported(self) -> bool:
    """Return True unless FIPS mode is enabled on this backend."""
    fips_active = self._fips_enabled
    return not fips_active

904 

def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """Report whether ``algorithm`` may be used as a DSA signature hash.

    Always False when DSA itself is unsupported; otherwise the answer
    comes from ``signature_hash_supported``.
    """
    if self.dsa_supported():
        return self.signature_hash_supported(algorithm)
    return False

909 

def cmac_algorithm_supported(self, algorithm) -> bool:
    """Report whether ``algorithm`` can be used for CMAC.

    Support is probed by asking ``cipher_supported`` about the algorithm
    in CBC mode with an all-zero IV of ``algorithm.block_size`` bytes.
    """
    zero_iv = b"\x00" * algorithm.block_size
    probe_mode = CBC(zero_iv)
    return self.cipher_supported(algorithm, probe_mode)

914 

def create_cmac_ctx(self, algorithm: BlockCipherAlgorithm) -> _CMACContext:
    """Create a CMAC context for ``algorithm`` bound to this backend."""
    context = _CMACContext(self, algorithm)
    return context

917 

def load_pem_private_key(
    self, data: bytes, password: typing.Optional[bytes]
) -> PRIVATE_KEY_TYPES:
    """Load a PEM private key, decrypting it with ``password`` if given.

    Delegates to ``_load_key`` with ``PEM_read_bio_PrivateKey`` as the
    reader and ``_evp_pkey_to_private_key`` as the converter.
    """
    reader = self._lib.PEM_read_bio_PrivateKey
    converter = self._evp_pkey_to_private_key
    return self._load_key(reader, converter, data, password)

927 

def load_pem_public_key(self, data: bytes) -> PUBLIC_KEY_TYPES:
    """Load a PEM public key.

    A subjectPublicKeyInfo structure is tried first; if that fails the
    data is re-read as a bare PKCS1 RSA public key. When neither parse
    succeeds ``_handle_key_loading_error`` raises.
    """
    bio_holder = self._bytes_to_bio(data)
    # In OpenSSL 3.0.x PEM_read_bio_PUBKEY invokes the default password
    # callback when handed an encrypted private key. That default can
    # open an interactive console prompt and hang the Python process,
    # so our own callback is supplied to catch this and fail properly.
    userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
    password_cb = self._ffi.addressof(
        self._lib._original_lib, "Cryptography_pem_password_cb"
    )

    pkey = self._lib.PEM_read_bio_PUBKEY(
        bio_holder.bio, self._ffi.NULL, password_cb, userdata
    )
    if pkey != self._ffi.NULL:
        pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
        return self._evp_pkey_to_public_key(pkey)

    # Not a (RSA/DSA/ECDSA) subjectPublicKeyInfo. It may still be a pure
    # PKCS1 RSA public key (not embedded in a subjectPublicKeyInfo), so
    # clear the error queue, rewind the BIO and try that format.
    self._consume_errors()
    status = self._lib.BIO_reset(bio_holder.bio)
    self.openssl_assert(status == 1)
    rsa_key = self._lib.PEM_read_bio_RSAPublicKey(
        bio_holder.bio, self._ffi.NULL, password_cb, userdata
    )
    if rsa_key == self._ffi.NULL:
        self._handle_key_loading_error()

    rsa_key = self._ffi.gc(rsa_key, self._lib.RSA_free)
    pkey = self._rsa_cdata_to_evp_pkey(rsa_key)
    return _RSAPublicKey(self, rsa_key, pkey)

969 

def load_pem_parameters(self, data: bytes) -> dh.DHParameters:
    """Load PEM-encoded parameters; only DH is currently supported."""
    bio_holder = self._bytes_to_bio(data)
    null = self._ffi.NULL
    params_cdata = self._lib.PEM_read_bio_DHparams(
        bio_holder.bio, null, null, null
    )
    if params_cdata == self._ffi.NULL:
        self._handle_key_loading_error()

    params_cdata = self._ffi.gc(params_cdata, self._lib.DH_free)
    return _DHParameters(self, params_cdata)

981 

def load_der_private_key(
    self, data: bytes, password: typing.Optional[bytes]
) -> PRIVATE_KEY_TYPES:
    """Load a DER private key, trying two decoding strategies in turn.

    OpenSSL's d2i_AutoPrivateKey would in theory do this automatically,
    but it does not handle encrypted private keys, so the traditional
    form is attempted first and encrypted-capable PKCS8 second.
    """
    bio_holder = self._bytes_to_bio(data)
    traditional = self._evp_pkey_from_der_traditional_key(
        bio_holder, password
    )
    if traditional:
        return self._evp_pkey_to_private_key(traditional)

    # Fall back to the reader that handles encrypted PKCS8 properly.
    return self._load_key(
        self._lib.d2i_PKCS8PrivateKey_bio,
        self._evp_pkey_to_private_key,
        data,
        password,
    )

1002 

1003 def _evp_pkey_from_der_traditional_key(self, bio_data, password): 

1004 key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL) 

1005 if key != self._ffi.NULL: 

1006 # In OpenSSL 3.0.0-alpha15 there exist scenarios where the key will 

1007 # successfully load but errors are still put on the stack. Tracked 

1008 # as https://github.com/openssl/openssl/issues/14996 

1009 self._consume_errors() 

1010 

1011 key = self._ffi.gc(key, self._lib.EVP_PKEY_free) 

1012 if password is not None: 

1013 raise TypeError( 

1014 "Password was given but private key is not encrypted." 

1015 ) 

1016 

1017 return key 

1018 else: 

1019 self._consume_errors() 

1020 return None 

1021 

def load_der_public_key(self, data: bytes) -> PUBLIC_KEY_TYPES:
    """Load a DER public key.

    A subjectPublicKeyInfo structure is tried first; if that fails the
    data is re-read as a bare PKCS1 RSA public key. When neither parse
    succeeds ``_handle_key_loading_error`` raises.
    """
    bio_holder = self._bytes_to_bio(data)
    pkey = self._lib.d2i_PUBKEY_bio(bio_holder.bio, self._ffi.NULL)
    if pkey != self._ffi.NULL:
        pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
        return self._evp_pkey_to_public_key(pkey)

    # Not a (RSA/DSA/ECDSA) subjectPublicKeyInfo. It may still be a pure
    # PKCS1 RSA public key (not embedded in a subjectPublicKeyInfo), so
    # clear the error queue, rewind the BIO and try that format.
    self._consume_errors()
    status = self._lib.BIO_reset(bio_holder.bio)
    self.openssl_assert(status == 1)
    rsa_key = self._lib.d2i_RSAPublicKey_bio(
        bio_holder.bio, self._ffi.NULL
    )
    if rsa_key == self._ffi.NULL:
        self._handle_key_loading_error()

    rsa_key = self._ffi.gc(rsa_key, self._lib.RSA_free)
    pkey = self._rsa_cdata_to_evp_pkey(rsa_key)
    return _RSAPublicKey(self, rsa_key, pkey)

1044 

def load_der_parameters(self, data: bytes) -> dh.DHParameters:
    """Load DER-encoded DH parameters.

    Plain DH parameters are tried first. When the binding reports
    ``Cryptography_HAS_EVP_PKEY_DHX`` the data is re-read as DHX
    parameters. If nothing parses, ``_handle_key_loading_error`` raises.
    """
    bio_holder = self._bytes_to_bio(data)
    params_cdata = self._lib.d2i_DHparams_bio(
        bio_holder.bio, self._ffi.NULL
    )
    if params_cdata != self._ffi.NULL:
        params_cdata = self._ffi.gc(params_cdata, self._lib.DH_free)
        return _DHParameters(self, params_cdata)

    if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
        # Check whether the data is in DHX form instead.
        self._consume_errors()
        status = self._lib.BIO_reset(bio_holder.bio)
        self.openssl_assert(status == 1)
        params_cdata = self._lib.Cryptography_d2i_DHxparams_bio(
            bio_holder.bio, self._ffi.NULL
        )
        if params_cdata != self._ffi.NULL:
            params_cdata = self._ffi.gc(params_cdata, self._lib.DH_free)
            return _DHParameters(self, params_cdata)

    self._handle_key_loading_error()

1064 

def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
    """Convert ``cert`` to an OpenSSL X509 object via a DER round trip."""
    der_bytes = cert.public_bytes(serialization.Encoding.DER)
    bio_holder = self._bytes_to_bio(der_bytes)
    cert_cdata = self._lib.d2i_X509_bio(bio_holder.bio, self._ffi.NULL)
    self.openssl_assert(cert_cdata != self._ffi.NULL)
    return self._ffi.gc(cert_cdata, self._lib.X509_free)

1072 

def _ossl2cert(self, x509: typing.Any) -> x509.Certificate:
    """Convert an OpenSSL X509 object to a certificate via DER."""
    out_bio = self._create_mem_bio_gc()
    status = self._lib.i2d_X509_bio(out_bio, x509)
    self.openssl_assert(status == 1)
    der_bytes = self._read_mem_bio(out_bio)
    return rust_x509.load_der_x509_certificate(der_bytes)

1078 

def _csr2ossl(self, csr: x509.CertificateSigningRequest) -> typing.Any:
    """Convert ``csr`` to an OpenSSL X509_REQ object via a DER round trip."""
    der_bytes = csr.public_bytes(serialization.Encoding.DER)
    bio_holder = self._bytes_to_bio(der_bytes)
    req_cdata = self._lib.d2i_X509_REQ_bio(bio_holder.bio, self._ffi.NULL)
    self.openssl_assert(req_cdata != self._ffi.NULL)
    return self._ffi.gc(req_cdata, self._lib.X509_REQ_free)

1086 

def _ossl2csr(
    self, x509_req: typing.Any
) -> x509.CertificateSigningRequest:
    """Convert an OpenSSL X509_REQ object to a CSR via DER."""
    out_bio = self._create_mem_bio_gc()
    status = self._lib.i2d_X509_REQ_bio(out_bio, x509_req)
    self.openssl_assert(status == 1)
    der_bytes = self._read_mem_bio(out_bio)
    return rust_x509.load_der_x509_csr(der_bytes)

1094 

def _crl2ossl(self, crl: x509.CertificateRevocationList) -> typing.Any:
    """Convert ``crl`` to an OpenSSL X509_CRL object via a DER round trip."""
    der_bytes = crl.public_bytes(serialization.Encoding.DER)
    bio_holder = self._bytes_to_bio(der_bytes)
    crl_cdata = self._lib.d2i_X509_CRL_bio(bio_holder.bio, self._ffi.NULL)
    self.openssl_assert(crl_cdata != self._ffi.NULL)
    return self._ffi.gc(crl_cdata, self._lib.X509_CRL_free)

1102 

def _ossl2crl(
    self, x509_crl: typing.Any
) -> x509.CertificateRevocationList:
    """Convert an OpenSSL X509_CRL object to a CRL via DER."""
    out_bio = self._create_mem_bio_gc()
    status = self._lib.i2d_X509_CRL_bio(out_bio, x509_crl)
    self.openssl_assert(status == 1)
    der_bytes = self._read_mem_bio(out_bio)
    return rust_x509.load_der_x509_crl(der_bytes)

1110 

def _crl_is_signature_valid(
    self,
    crl: x509.CertificateRevocationList,
    public_key: CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES,
) -> bool:
    """Verify the signature on ``crl`` against ``public_key``.

    Raises TypeError unless ``public_key`` is one of this backend's DSA,
    RSA or elliptic curve public key classes. Returns False (after
    draining the OpenSSL error queue) when verification does not succeed.
    """
    accepted_key_classes = (
        _DSAPublicKey,
        _RSAPublicKey,
        _EllipticCurvePublicKey,
    )
    if not isinstance(public_key, accepted_key_classes):
        raise TypeError(
            "Expecting one of DSAPublicKey, RSAPublicKey,"
            " or EllipticCurvePublicKey."
        )

    crl_cdata = self._crl2ossl(crl)
    status = self._lib.X509_CRL_verify(crl_cdata, public_key._evp_pkey)
    if status == 1:
        return True

    self._consume_errors()
    return False

1136 

def _csr_is_signature_valid(
    self, csr: x509.CertificateSigningRequest
) -> bool:
    """Verify the signature on ``csr`` using the public key it embeds.

    Returns False (after draining the OpenSSL error queue) when
    verification does not succeed.
    """
    req_cdata = self._csr2ossl(csr)
    embedded_key = self._lib.X509_REQ_get_pubkey(req_cdata)
    self.openssl_assert(embedded_key != self._ffi.NULL)
    embedded_key = self._ffi.gc(embedded_key, self._lib.EVP_PKEY_free)

    status = self._lib.X509_REQ_verify(req_cdata, embedded_key)
    if status == 1:
        return True

    self._consume_errors()
    return False

1151 

1152 def _check_keys_correspond(self, key1, key2): 

1153 if self._lib.EVP_PKEY_cmp(key1._evp_pkey, key2._evp_pkey) != 1: 

1154 raise ValueError("Keys do not correspond") 

1155 

1156 def _load_key(self, openssl_read_func, convert_func, data, password): 

1157 mem_bio = self._bytes_to_bio(data) 

1158 

1159 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *") 

1160 if password is not None: 

1161 utils._check_byteslike("password", password) 

1162 password_ptr = self._ffi.from_buffer(password) 

1163 userdata.password = password_ptr 

1164 userdata.length = len(password) 

1165 

1166 evp_pkey = openssl_read_func( 

1167 mem_bio.bio, 

1168 self._ffi.NULL, 

1169 self._ffi.addressof( 

1170 self._lib._original_lib, "Cryptography_pem_password_cb" 

1171 ), 

1172 userdata, 

1173 ) 

1174 

1175 if evp_pkey == self._ffi.NULL: 

1176 if userdata.error != 0: 

1177 self._consume_errors() 

1178 if userdata.error == -1: 

1179 raise TypeError( 

1180 "Password was not given but private key is encrypted" 

1181 ) 

1182 else: 

1183 assert userdata.error == -2 

1184 raise ValueError( 

1185 "Passwords longer than {} bytes are not supported " 

1186 "by this backend.".format(userdata.maxsize - 1) 

1187 ) 

1188 else: 

1189 self._handle_key_loading_error() 

1190 

1191 # In OpenSSL 3.0.0-alpha15 there exist scenarios where the key will 

1192 # successfully load but errors are still put on the stack. Tracked 

1193 # as https://github.com/openssl/openssl/issues/14996 

1194 self._consume_errors() 

1195 

1196 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

1197 

1198 if password is not None and userdata.called == 0: 

1199 raise TypeError( 

1200 "Password was given but private key is not encrypted." 

1201 ) 

1202 

1203 assert ( 

1204 password is not None and userdata.called == 1 

1205 ) or password is None 

1206 

1207 return convert_func(evp_pkey) 

1208 

1209 def _handle_key_loading_error(self) -> typing.NoReturn: 

1210 errors = self._consume_errors() 

1211 

1212 if not errors: 

1213 raise ValueError( 

1214 "Could not deserialize key data. The data may be in an " 

1215 "incorrect format or it may be encrypted with an unsupported " 

1216 "algorithm." 

1217 ) 

1218 

1219 elif ( 

1220 errors[0]._lib_reason_match( 

1221 self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT 

1222 ) 

1223 or errors[0]._lib_reason_match( 

1224 self._lib.ERR_LIB_PKCS12, 

1225 self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR, 

1226 ) 

1227 or ( 

1228 self._lib.Cryptography_HAS_PROVIDERS 

1229 and errors[0]._lib_reason_match( 

1230 self._lib.ERR_LIB_PROV, 

1231 self._lib.PROV_R_BAD_DECRYPT, 

1232 ) 

1233 ) 

1234 ): 

1235 raise ValueError("Bad decrypt. Incorrect password?") 

1236 

1237 elif any( 

1238 error._lib_reason_match( 

1239 self._lib.ERR_LIB_EVP, 

1240 self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM, 

1241 ) 

1242 for error in errors 

1243 ): 

1244 raise ValueError("Unsupported public key algorithm.") 

1245 

1246 else: 

1247 errors_with_text = binding._errors_with_text(errors) 

1248 raise ValueError( 

1249 "Could not deserialize key data. The data may be in an " 

1250 "incorrect format, it may be encrypted with an unsupported " 

1251 "algorithm, or it may be an unsupported key type (e.g. EC " 

1252 "curves with explicit parameters).", 

1253 errors_with_text, 

1254 ) 

1255 

def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
    """Report whether OpenSSL can instantiate a group for ``curve``.

    A curve whose name has no NID is probed with ``NID_undef``.
    """
    try:
        nid = self._elliptic_curve_to_nid(curve)
    except UnsupportedAlgorithm:
        nid = self._lib.NID_undef

    group = self._lib.EC_GROUP_new_by_curve_name(nid)
    if group == self._ffi.NULL:
        self._consume_errors()
        return False

    # A group was created, so the NID must have been a real one.
    self.openssl_assert(nid != self._lib.NID_undef)
    self._lib.EC_GROUP_free(group)
    return True

1271 

def elliptic_curve_signature_algorithm_supported(
    self,
    signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
    curve: ec.EllipticCurve,
) -> bool:
    """Report whether ``signature_algorithm`` is usable with ``curve``.

    Only ECDSA is supported right now; any other algorithm gives False.
    """
    if isinstance(signature_algorithm, ec.ECDSA):
        return self.elliptic_curve_supported(curve)
    return False

1282 

def generate_elliptic_curve_private_key(
    self, curve: ec.EllipticCurve
) -> ec.EllipticCurvePrivateKey:
    """
    Generate a new private key on the named curve.

    Raises UnsupportedAlgorithm when ``elliptic_curve_supported``
    rejects the curve.
    """
    if not self.elliptic_curve_supported(curve):
        raise UnsupportedAlgorithm(
            "Backend object does not support {}.".format(curve.name),
            _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
        )

    key_cdata = self._ec_key_new_by_curve(curve)
    status = self._lib.EC_KEY_generate_key(key_cdata)
    self.openssl_assert(status == 1)

    pkey = self._ec_cdata_to_evp_pkey(key_cdata)
    return _EllipticCurvePrivateKey(self, key_cdata, pkey)

1304 

def load_elliptic_curve_private_numbers(
    self, numbers: ec.EllipticCurvePrivateNumbers
) -> ec.EllipticCurvePrivateKey:
    """Build an EC private key from its private value and public point.

    Raises ValueError("Invalid EC key.") when OpenSSL rejects the
    private value.
    """
    public_numbers = numbers.public_numbers
    key_cdata = self._ec_key_new_by_curve(public_numbers.curve)

    # BN_clear_free is registered as the finalizer for the secret.
    secret_bn = self._ffi.gc(
        self._int_to_bn(numbers.private_value), self._lib.BN_clear_free
    )
    status = self._lib.EC_KEY_set_private_key(key_cdata, secret_bn)
    if status != 1:
        self._consume_errors()
        raise ValueError("Invalid EC key.")

    self._ec_key_set_public_key_affine_coordinates(
        key_cdata, public_numbers.x, public_numbers.y
    )

    pkey = self._ec_cdata_to_evp_pkey(key_cdata)
    return _EllipticCurvePrivateKey(self, key_cdata, pkey)

1327 

def load_elliptic_curve_public_numbers(
    self, numbers: ec.EllipticCurvePublicNumbers
) -> ec.EllipticCurvePublicKey:
    """Build an EC public key from a curve and affine x/y coordinates."""
    key_cdata = self._ec_key_new_by_curve(numbers.curve)
    self._ec_key_set_public_key_affine_coordinates(
        key_cdata, numbers.x, numbers.y
    )
    pkey = self._ec_cdata_to_evp_pkey(key_cdata)
    return _EllipticCurvePublicKey(self, key_cdata, pkey)

1338 

def load_elliptic_curve_public_bytes(
    self, curve: ec.EllipticCurve, point_bytes: bytes
) -> ec.EllipticCurvePublicKey:
    """Build an EC public key from an encoded point on ``curve``.

    Raises ValueError when ``EC_POINT_oct2point`` rejects the bytes.
    """
    key_cdata = self._ec_key_new_by_curve(curve)
    group = self._lib.EC_KEY_get0_group(key_cdata)
    self.openssl_assert(group != self._ffi.NULL)

    decoded = self._lib.EC_POINT_new(group)
    self.openssl_assert(decoded != self._ffi.NULL)
    decoded = self._ffi.gc(decoded, self._lib.EC_POINT_free)

    with self._tmp_bn_ctx() as bn_ctx:
        status = self._lib.EC_POINT_oct2point(
            group, decoded, point_bytes, len(point_bytes), bn_ctx
        )
        if status != 1:
            self._consume_errors()
            raise ValueError("Invalid public bytes for the given curve")

    status = self._lib.EC_KEY_set_public_key(key_cdata, decoded)
    self.openssl_assert(status == 1)
    pkey = self._ec_cdata_to_evp_pkey(key_cdata)
    return _EllipticCurvePublicKey(self, key_cdata, pkey)

1360 

def derive_elliptic_curve_private_key(
    self, private_value: int, curve: ec.EllipticCurve
) -> ec.EllipticCurvePrivateKey:
    """Derive an EC private key on ``curve`` from ``private_value``.

    The public point is computed by ``EC_POINT_mul`` from the private
    scalar. Raises ValueError when the affine coordinates of that point
    cannot be read.
    """
    ec_cdata = self._ec_key_new_by_curve(curve)

    get_func, group = self._ec_key_determine_group_get_func(ec_cdata)

    point = self._lib.EC_POINT_new(group)
    self.openssl_assert(point != self._ffi.NULL)
    point = self._ffi.gc(point, self._lib.EC_POINT_free)

    # One BIGNUM holds the secret scalar for both the point
    # multiplication and the private-key assignment below, so the secret
    # is not converted (and held in memory) twice. BN_clear_free is the
    # registered finalizer.
    scalar = self._int_to_bn(private_value)
    scalar = self._ffi.gc(scalar, self._lib.BN_clear_free)

    with self._tmp_bn_ctx() as bn_ctx:
        res = self._lib.EC_POINT_mul(
            group, point, scalar, self._ffi.NULL, self._ffi.NULL, bn_ctx
        )
        self.openssl_assert(res == 1)

        bn_x = self._lib.BN_CTX_get(bn_ctx)
        bn_y = self._lib.BN_CTX_get(bn_ctx)

        # Reading the coordinates back is the check that the derived
        # point is usable; the values themselves are discarded.
        res = get_func(group, point, bn_x, bn_y, bn_ctx)
        if res != 1:
            self._consume_errors()
            raise ValueError("Unable to derive key from private_value")

    res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
    self.openssl_assert(res == 1)
    # NOTE(review): relies on EC_KEY_set_private_key copying the BIGNUM,
    # as load_elliptic_curve_private_numbers already does when it passes
    # a BN owned by a BN_clear_free finalizer.
    res = self._lib.EC_KEY_set_private_key(ec_cdata, scalar)
    self.openssl_assert(res == 1)

    evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)

    return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)

1399 

def _ec_key_new_by_curve(self, curve: ec.EllipticCurve):
    """Create a new EC_KEY for ``curve`` by first resolving its NID."""
    nid = self._elliptic_curve_to_nid(curve)
    key_cdata = self._ec_key_new_by_curve_nid(nid)
    return key_cdata

1403 

1404 def _ec_key_new_by_curve_nid(self, curve_nid: int): 

1405 ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid) 

1406 self.openssl_assert(ec_cdata != self._ffi.NULL) 

1407 return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 

1408 

def elliptic_curve_exchange_algorithm_supported(
    self, algorithm: ec.ECDH, curve: ec.EllipticCurve
) -> bool:
    """Report whether ``algorithm`` can do key exchange on ``curve``.

    In FIPS mode the curve must be an instance of one of
    ``self._fips_ecdh_curves``. Otherwise the curve must be supported
    and the algorithm must be ECDH.
    """
    fips_rejects_curve = self._fips_enabled and not isinstance(
        curve, self._fips_ecdh_curves
    )
    if fips_rejects_curve:
        return False

    return self.elliptic_curve_supported(curve) and isinstance(
        algorithm, ec.ECDH
    )

1420 

1421 def _ec_cdata_to_evp_pkey(self, ec_cdata): 

1422 evp_pkey = self._create_evp_pkey_gc() 

1423 res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata) 

1424 self.openssl_assert(res == 1) 

1425 return evp_pkey 

1426 

def _elliptic_curve_to_nid(self, curve: ec.EllipticCurve) -> int:
    """
    Get the NID for a curve name.

    Raises UnsupportedAlgorithm when ``OBJ_sn2nid`` does not know the
    (possibly aliased) name.
    """
    # Two curve names are looked up under a different short name.
    aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"}
    openssl_name = aliases.get(curve.name, curve.name)

    nid = self._lib.OBJ_sn2nid(openssl_name.encode())
    if nid != self._lib.NID_undef:
        return nid

    raise UnsupportedAlgorithm(
        "{} is not a supported elliptic curve".format(curve.name),
        _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
    )

1443 

1444 @contextmanager 

1445 def _tmp_bn_ctx(self): 

1446 bn_ctx = self._lib.BN_CTX_new() 

1447 self.openssl_assert(bn_ctx != self._ffi.NULL) 

1448 bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free) 

1449 self._lib.BN_CTX_start(bn_ctx) 

1450 try: 

1451 yield bn_ctx 

1452 finally: 

1453 self._lib.BN_CTX_end(bn_ctx) 

1454 

1455 def _ec_key_determine_group_get_func(self, ctx): 

1456 """ 

1457 Given an EC_KEY determine the group and what function is required to 

1458 get point coordinates. 

1459 """ 

1460 self.openssl_assert(ctx != self._ffi.NULL) 

1461 

1462 nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field") 

1463 self.openssl_assert(nid_two_field != self._lib.NID_undef) 

1464 

1465 group = self._lib.EC_KEY_get0_group(ctx) 

1466 self.openssl_assert(group != self._ffi.NULL) 

1467 

1468 method = self._lib.EC_GROUP_method_of(group) 

1469 self.openssl_assert(method != self._ffi.NULL) 

1470 

1471 nid = self._lib.EC_METHOD_get_field_type(method) 

1472 self.openssl_assert(nid != self._lib.NID_undef) 

1473 

1474 if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M: 

1475 get_func = self._lib.EC_POINT_get_affine_coordinates_GF2m 

1476 else: 

1477 get_func = self._lib.EC_POINT_get_affine_coordinates_GFp 

1478 

1479 assert get_func 

1480 

1481 return get_func, group 

1482 

1483 def _ec_key_set_public_key_affine_coordinates(self, ctx, x: int, y: int): 

1484 """ 

1485 Sets the public key point in the EC_KEY context to the affine x and y 

1486 values. 

1487 """ 

1488 

1489 if x < 0 or y < 0: 

1490 raise ValueError( 

1491 "Invalid EC key. Both x and y must be non-negative." 

1492 ) 

1493 

1494 x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free) 

1495 y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free) 

1496 res = self._lib.EC_KEY_set_public_key_affine_coordinates(ctx, x, y) 

1497 if res != 1: 

1498 self._consume_errors() 

1499 raise ValueError("Invalid EC key.") 

1500 

def _private_key_bytes(
    self,
    encoding: serialization.Encoding,
    format: serialization.PrivateFormat,
    encryption_algorithm: serialization.KeySerializationEncryption,
    key,
    evp_pkey,
    cdata,
) -> bytes:
    """Serialize a private key in the requested format and encoding.

    ``evp_pkey`` is written for PKCS8, ``cdata`` for TraditionalOpenSSL
    and ``key`` for OpenSSH. Raises TypeError for arguments of the wrong
    type and ValueError for unsupported combinations of format,
    encoding, key type or encryption.
    """
    # Argument type validation.
    if not isinstance(encoding, serialization.Encoding):
        raise TypeError("encoding must be an item from the Encoding enum")
    if not isinstance(format, serialization.PrivateFormat):
        raise TypeError(
            "format must be an item from the PrivateFormat enum"
        )
    if not isinstance(
        encryption_algorithm, serialization.KeySerializationEncryption
    ):
        raise TypeError(
            "Encryption algorithm must be a KeySerializationEncryption "
            "instance"
        )

    # Work out the password implied by the encryption algorithm.
    if isinstance(encryption_algorithm, serialization.NoEncryption):
        password = b""
    elif isinstance(
        encryption_algorithm, serialization.BestAvailableEncryption
    ):
        password = encryption_algorithm.password
        if len(password) > 1023:
            raise ValueError(
                "Passwords longer than 1023 bytes are not supported by "
                "this backend"
            )
    elif (
        isinstance(
            encryption_algorithm, serialization._KeySerializationEncryption
        )
        and encryption_algorithm._format
        is format
        is serialization.PrivateFormat.OpenSSH
    ):
        password = encryption_algorithm.password
    else:
        raise ValueError("Unsupported encryption type")

    pem = serialization.Encoding.PEM
    der = serialization.Encoding.DER

    # PKCS8, PEM or DER: written from the EVP_PKEY.
    if format is serialization.PrivateFormat.PKCS8:
        if encoding is pem:
            writer = self._lib.PEM_write_bio_PKCS8PrivateKey
        elif encoding is der:
            writer = self._lib.i2d_PKCS8PrivateKey_bio
        else:
            raise ValueError("Unsupported encoding for PKCS8")
        return self._private_key_bytes_via_bio(writer, evp_pkey, password)

    # TraditionalOpenSSL, PEM or DER: written from the key-specific cdata.
    if format is serialization.PrivateFormat.TraditionalOpenSSL:
        if self._fips_enabled and not isinstance(
            encryption_algorithm, serialization.NoEncryption
        ):
            raise ValueError(
                "Encrypted traditional OpenSSL format is not "
                "supported in FIPS mode."
            )
        pkey_id = self._lib.EVP_PKEY_id(evp_pkey)

        if encoding is pem:
            if pkey_id == self._lib.EVP_PKEY_RSA:
                writer = self._lib.PEM_write_bio_RSAPrivateKey
            elif pkey_id == self._lib.EVP_PKEY_DSA:
                writer = self._lib.PEM_write_bio_DSAPrivateKey
            elif pkey_id == self._lib.EVP_PKEY_EC:
                writer = self._lib.PEM_write_bio_ECPrivateKey
            else:
                raise ValueError(
                    "Unsupported key type for TraditionalOpenSSL"
                )
            return self._private_key_bytes_via_bio(writer, cdata, password)

        if encoding is der:
            # The DER writers take no cipher or password arguments.
            if password:
                raise ValueError(
                    "Encryption is not supported for DER encoded "
                    "traditional OpenSSL keys"
                )
            if pkey_id == self._lib.EVP_PKEY_RSA:
                writer = self._lib.i2d_RSAPrivateKey_bio
            elif pkey_id == self._lib.EVP_PKEY_EC:
                writer = self._lib.i2d_ECPrivateKey_bio
            elif pkey_id == self._lib.EVP_PKEY_DSA:
                writer = self._lib.i2d_DSAPrivateKey_bio
            else:
                raise ValueError(
                    "Unsupported key type for TraditionalOpenSSL"
                )
            return self._bio_func_output(writer, cdata)

        raise ValueError("Unsupported encoding for TraditionalOpenSSL")

    # OpenSSH, PEM only: handled by the ssh serialization module.
    if format is serialization.PrivateFormat.OpenSSH:
        if encoding is not pem:
            raise ValueError(
                "OpenSSH private key format can only be used"
                " with PEM encoding"
            )
        return ssh._serialize_ssh_private_key(
            key, password, encryption_algorithm
        )

    # Anything that key-specific code was supposed to handle earlier,
    # like Raw.
    raise ValueError("format is invalid with this key")

1622 

1623 def _private_key_bytes_via_bio(self, write_bio, evp_pkey, password): 

1624 if not password: 

1625 evp_cipher = self._ffi.NULL 

1626 else: 

1627 # This is a curated value that we will update over time. 

1628 evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc") 

1629 

1630 return self._bio_func_output( 

1631 write_bio, 

1632 evp_pkey, 

1633 evp_cipher, 

1634 password, 

1635 len(password), 

1636 self._ffi.NULL, 

1637 self._ffi.NULL, 

1638 ) 

1639 

1640 def _bio_func_output(self, write_bio, *args): 

1641 bio = self._create_mem_bio_gc() 

1642 res = write_bio(bio, *args) 

1643 self.openssl_assert(res == 1) 

1644 return self._read_mem_bio(bio) 

1645 

def _public_key_bytes(
    self,
    encoding: serialization.Encoding,
    format: serialization.PublicFormat,
    key,
    evp_pkey,
    cdata,
) -> bytes:
    """Serialize a public key in the requested format and encoding.

    ``evp_pkey`` is written for SubjectPublicKeyInfo, ``cdata`` for
    PKCS1 (RSA only) and ``key`` for OpenSSH. Raises TypeError for
    arguments of the wrong type and ValueError for unsupported
    combinations.
    """
    if not isinstance(encoding, serialization.Encoding):
        raise TypeError("encoding must be an item from the Encoding enum")
    if not isinstance(format, serialization.PublicFormat):
        raise TypeError(
            "format must be an item from the PublicFormat enum"
        )

    pem = serialization.Encoding.PEM
    der = serialization.Encoding.DER

    # SubjectPublicKeyInfo, PEM or DER.
    if format is serialization.PublicFormat.SubjectPublicKeyInfo:
        if encoding is pem:
            writer = self._lib.PEM_write_bio_PUBKEY
        elif encoding is der:
            writer = self._lib.i2d_PUBKEY_bio
        else:
            raise ValueError(
                "SubjectPublicKeyInfo works only with PEM or DER encoding"
            )
        return self._bio_func_output(writer, evp_pkey)

    # PKCS1, PEM or DER. Only RSA is supported here.
    if format is serialization.PublicFormat.PKCS1:
        if self._lib.EVP_PKEY_id(evp_pkey) != self._lib.EVP_PKEY_RSA:
            raise ValueError("PKCS1 format is supported only for RSA keys")

        if encoding is pem:
            writer = self._lib.PEM_write_bio_RSAPublicKey
        elif encoding is der:
            writer = self._lib.i2d_RSAPublicKey_bio
        else:
            raise ValueError("PKCS1 works only with PEM or DER encoding")
        return self._bio_func_output(writer, cdata)

    # OpenSSH format requires the OpenSSH encoding.
    if format is serialization.PublicFormat.OpenSSH:
        if encoding is not serialization.Encoding.OpenSSH:
            raise ValueError(
                "OpenSSH format must be used with OpenSSH encoding"
            )
        return ssh.serialize_ssh_public_key(key)

    # Anything that key-specific code was supposed to handle earlier,
    # like Raw, CompressedPoint, UncompressedPoint
    raise ValueError("format is invalid with this key")

1700 

def dh_supported(self) -> bool:
    """Return True unless the binding reports it is built on BoringSSL."""
    on_boringssl = self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    return not on_boringssl

1703 

def generate_dh_parameters(
    self, generator: int, key_size: int
) -> dh.DHParameters:
    """Generate new DH parameters.

    Raises ValueError when ``key_size`` is below
    ``dh._MIN_MODULUS_SIZE`` or ``generator`` is neither 2 nor 5.
    """
    minimum_bits = dh._MIN_MODULUS_SIZE
    if key_size < minimum_bits:
        raise ValueError(
            "DH key_size must be at least {} bits".format(minimum_bits)
        )

    if generator not in (2, 5):
        raise ValueError("DH generator must be 2 or 5")

    params_cdata = self._lib.DH_new()
    self.openssl_assert(params_cdata != self._ffi.NULL)
    params_cdata = self._ffi.gc(params_cdata, self._lib.DH_free)

    status = self._lib.DH_generate_parameters_ex(
        params_cdata, key_size, generator, self._ffi.NULL
    )
    self.openssl_assert(status == 1)

    return _DHParameters(self, params_cdata)

1727 

1728 def _dh_cdata_to_evp_pkey(self, dh_cdata): 

1729 evp_pkey = self._create_evp_pkey_gc() 

1730 res = self._lib.EVP_PKEY_set1_DH(evp_pkey, dh_cdata) 

1731 self.openssl_assert(res == 1) 

1732 return evp_pkey 

1733 

def generate_dh_private_key(
    self, parameters: dh.DHParameters
) -> dh.DHPrivateKey:
    """Generate a DH private key on a duplicate of ``parameters``."""
    # The duplicate from _dh_params_dup is what receives the key, not
    # ``parameters._dh_cdata`` itself.
    key_cdata = _dh_params_dup(
        parameters._dh_cdata, self  # type: ignore[attr-defined]
    )

    status = self._lib.DH_generate_key(key_cdata)
    self.openssl_assert(status == 1)

    pkey = self._dh_cdata_to_evp_pkey(key_cdata)
    return _DHPrivateKey(self, key_cdata, pkey)

1747 

def generate_dh_private_key_and_parameters(
    self, generator: int, key_size: int
) -> dh.DHPrivateKey:
    """Generate fresh DH parameters, then a private key on them."""
    fresh_parameters = self.generate_dh_parameters(generator, key_size)
    return self.generate_dh_private_key(fresh_parameters)

1754 

def load_dh_private_numbers(
    self, numbers: dh.DHPrivateNumbers
) -> dh.DHPrivateKey:
    """Build a DH private key from its integer components.

    Raises ValueError when ``Cryptography_DH_check`` reports any problem
    other than the tolerated generator warning described below.
    """
    params = numbers.public_numbers.parameter_numbers

    key_cdata = self._lib.DH_new()
    self.openssl_assert(key_cdata != self._ffi.NULL)
    key_cdata = self._ffi.gc(key_cdata, self._lib.DH_free)

    bn_p = self._int_to_bn(params.p)
    bn_g = self._int_to_bn(params.g)
    # q is optional; NULL is passed when it is absent.
    bn_q = (
        self._int_to_bn(params.q)
        if params.q is not None
        else self._ffi.NULL
    )

    bn_pub = self._int_to_bn(numbers.public_numbers.y)
    bn_priv = self._int_to_bn(numbers.x)

    status = self._lib.DH_set0_pqg(key_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(status == 1)

    status = self._lib.DH_set0_key(key_cdata, bn_pub, bn_priv)
    self.openssl_assert(status == 1)

    codes = self._ffi.new("int[]", 1)
    status = self._lib.Cryptography_DH_check(key_cdata, codes)
    self.openssl_assert(status == 1)

    # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not
    # equal 11 when the generator is 2 (a quadratic nonresidue).
    # We want to ignore that error because p % 24 == 23 is also fine.
    # Specifically, g is then a quadratic residue. Within the context of
    # Diffie-Hellman this means it can only generate half the possible
    # values. That sounds bad, but quadratic nonresidues leak a bit of
    # the key to the attacker in exchange for having the full key space
    # available. See: https://crypto.stackexchange.com/questions/12961
    if codes[0] != 0 and not (
        params.g == 2
        and codes[0] == self._lib.DH_NOT_SUITABLE_GENERATOR
    ):
        raise ValueError("DH private numbers did not pass safety checks.")

    pkey = self._dh_cdata_to_evp_pkey(key_cdata)
    return _DHPrivateKey(self, key_cdata, pkey)

1802 

def load_dh_public_numbers(
    self, numbers: dh.DHPublicNumbers
) -> dh.DHPublicKey:
    """Build a DH public key from ``numbers``.

    A new DH structure receives p, q (optional), g and the public value
    y; the private key slot is set to NULL.
    """
    dh_cdata = self._lib.DH_new()
    self.openssl_assert(dh_cdata != self._ffi.NULL)
    dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)

    params = numbers.parameter_numbers

    bn_p = self._int_to_bn(params.p)
    bn_g = self._int_to_bn(params.g)
    # q is optional; OpenSSL receives NULL when it is absent.
    if params.q is None:
        bn_q = self._ffi.NULL
    else:
        bn_q = self._int_to_bn(params.q)

    bn_pub = self._int_to_bn(numbers.y)

    status = self._lib.DH_set0_pqg(dh_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(status == 1)

    status = self._lib.DH_set0_key(dh_cdata, bn_pub, self._ffi.NULL)
    self.openssl_assert(status == 1)

    return _DHPublicKey(
        self, dh_cdata, self._dh_cdata_to_evp_pkey(dh_cdata)
    )

1831 

def load_dh_parameter_numbers(
    self, numbers: dh.DHParameterNumbers
) -> dh.DHParameters:
    """Build DH parameters from ``numbers`` (p, g and optional q)."""
    dh_cdata = self._lib.DH_new()
    self.openssl_assert(dh_cdata != self._ffi.NULL)
    dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)

    bn_p = self._int_to_bn(numbers.p)
    bn_g = self._int_to_bn(numbers.g)
    # q is optional; OpenSSL receives NULL when it is absent.
    if numbers.q is None:
        bn_q = self._ffi.NULL
    else:
        bn_q = self._int_to_bn(numbers.q)

    status = self._lib.DH_set0_pqg(dh_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(status == 1)

    return _DHParameters(self, dh_cdata)

1851 

def dh_parameters_supported(
    self, p: int, g: int, q: typing.Optional[int] = None
) -> bool:
    """Report whether ``Cryptography_DH_check`` accepts these parameters.

    Returns True only when the check sets no error code at all.
    """
    dh_cdata = self._lib.DH_new()
    self.openssl_assert(dh_cdata != self._ffi.NULL)
    dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)

    bn_p = self._int_to_bn(p)
    bn_g = self._int_to_bn(g)
    # q is optional; OpenSSL receives NULL when it is absent.
    if q is None:
        bn_q = self._ffi.NULL
    else:
        bn_q = self._int_to_bn(q)

    status = self._lib.DH_set0_pqg(dh_cdata, bn_p, bn_q, bn_g)
    self.openssl_assert(status == 1)

    codes = self._ffi.new("int[]", 1)
    status = self._lib.Cryptography_DH_check(dh_cdata, codes)
    self.openssl_assert(status == 1)

    return codes[0] == 0

1875 

def dh_x942_serialization_supported(self) -> bool:
    """Return True when the bindings report ``Cryptography_HAS_EVP_PKEY_DHX``."""
    has_dhx = self._lib.Cryptography_HAS_EVP_PKEY_DHX
    return has_dhx == 1

1878 

def x25519_load_public_bytes(self, data: bytes) -> x25519.X25519PublicKey:
    """Load an X25519 public key from its 32 raw bytes.

    Raises:
        ValueError: if ``data`` is not exactly 32 bytes long.
    """
    # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
    # switch this to EVP_PKEY_new_raw_public_key
    if len(data) != 32:
        raise ValueError("An X25519 public key is 32 bytes long")

    pkey = self._create_evp_pkey_gc()
    status = self._lib.EVP_PKEY_set_type(pkey, self._lib.NID_X25519)
    self.openssl_assert(status == 1)
    status = self._lib.EVP_PKEY_set1_tls_encodedpoint(
        pkey, data, len(data)
    )
    self.openssl_assert(status == 1)
    return _X25519PublicKey(self, pkey)

1893 

def x25519_load_private_bytes(
    self, data: bytes
) -> x25519.X25519PrivateKey:
    """Load an X25519 private key from its 32 raw bytes.

    The raw key is appended to a fixed PKCS8 prefix inside a scratch
    bytearray that is zeroed afterwards, and the result is parsed with
    ``d2i_PrivateKey_bio``.

    Raises:
        ValueError: if ``data`` is not exactly 32 bytes long.
    """
    # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
    # switch this to EVP_PKEY_new_raw_private_key and drop the
    # zeroed_bytearray garbage.
    # OpenSSL only has facilities for loading PKCS8 formatted private
    # keys using the algorithm identifiers specified in
    # https://tools.ietf.org/html/draft-ietf-curdle-pkix-09.
    # This is the standard PKCS8 prefix for a 32 byte X25519 key.
    # The form is:
    #    0:d=0  hl=2 l=  46 cons: SEQUENCE
    #    2:d=1  hl=2 l=   1 prim: INTEGER           :00
    #    5:d=1  hl=2 l=   5 cons: SEQUENCE
    #    7:d=2  hl=2 l=   3 prim: OBJECT            :1.3.101.110
    #    12:d=1  hl=2 l=  34 prim: OCTET STRING      (the key)
    # Of course there's a bit more complexity. In reality OCTET STRING
    # contains an OCTET STRING of length 32! So the last two bytes here
    # are \x04\x20, which is an OCTET STRING of length 32.
    if len(data) != 32:
        raise ValueError("An X25519 private key is 32 bytes long")

    pkcs8_prefix = b'0.\x02\x01\x000\x05\x06\x03+en\x04"\x04 '
    # 16 prefix bytes + 32 key bytes = 48 byte PKCS8 blob.
    with self._zeroed_bytearray(48) as scratch:
        scratch[0:16] = pkcs8_prefix
        scratch[16:] = data
        pkcs8_bio = self._bytes_to_bio(scratch)
        pkey = self._lib.d2i_PrivateKey_bio(pkcs8_bio.bio, self._ffi.NULL)

    self.openssl_assert(pkey != self._ffi.NULL)
    pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    key_type = self._lib.EVP_PKEY_id(pkey)
    self.openssl_assert(key_type == self._lib.EVP_PKEY_X25519)
    return _X25519PrivateKey(self, pkey)

1929 

def _evp_pkey_keygen_gc(self, nid):
    """Generate a key of type ``nid`` and return it as an EVP_PKEY.

    Both the keygen context and the resulting key are registered with
    ``ffi.gc`` (``EVP_PKEY_CTX_free`` / ``EVP_PKEY_free`` respectively).
    """
    ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL)
    self.openssl_assert(ctx != self._ffi.NULL)
    ctx = self._ffi.gc(ctx, self._lib.EVP_PKEY_CTX_free)

    status = self._lib.EVP_PKEY_keygen_init(ctx)
    self.openssl_assert(status == 1)

    # Out-parameter that EVP_PKEY_keygen fills with the new key.
    pkey_out = self._ffi.new("EVP_PKEY **")
    status = self._lib.EVP_PKEY_keygen(ctx, pkey_out)
    self.openssl_assert(status == 1)
    self.openssl_assert(pkey_out[0] != self._ffi.NULL)
    return self._ffi.gc(pkey_out[0], self._lib.EVP_PKEY_free)

1942 

def x25519_generate_key(self) -> x25519.X25519PrivateKey:
    """Generate a new X25519 private key via ``_evp_pkey_keygen_gc``."""
    return _X25519PrivateKey(
        self, self._evp_pkey_keygen_gc(self._lib.NID_X25519)
    )

1946 

def x25519_supported(self) -> bool:
    """Return False under FIPS mode or LibreSSL, True otherwise."""
    return not (
        self._fips_enabled or self._lib.CRYPTOGRAPHY_IS_LIBRESSL
    )

1951 

def x448_load_public_bytes(self, data: bytes) -> x448.X448PublicKey:
    """Load an X448 public key from its 56 raw bytes.

    Raises:
        ValueError: if ``data`` is not exactly 56 bytes long.
    """
    if len(data) != 56:
        raise ValueError("An X448 public key is 56 bytes long")

    pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_X448, self._ffi.NULL, data, len(data)
    )
    self.openssl_assert(pkey != self._ffi.NULL)
    return _X448PublicKey(
        self, self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    )

1962 

def x448_load_private_bytes(self, data: bytes) -> x448.X448PrivateKey:
    """Load an X448 private key from its 56 raw bytes.

    Raises:
        ValueError: if ``data`` is not exactly 56 bytes long.
    """
    if len(data) != 56:
        raise ValueError("An X448 private key is 56 bytes long")

    # from_buffer hands OpenSSL a pointer to the caller's buffer.
    raw_key = self._ffi.from_buffer(data)
    pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_X448, self._ffi.NULL, raw_key, len(data)
    )
    self.openssl_assert(pkey != self._ffi.NULL)
    return _X448PrivateKey(
        self, self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    )

1974 

def x448_generate_key(self) -> x448.X448PrivateKey:
    """Generate a new X448 private key via ``_evp_pkey_keygen_gc``."""
    return _X448PrivateKey(
        self, self._evp_pkey_keygen_gc(self._lib.NID_X448)
    )

1978 

def x448_supported(self) -> bool:
    """Return True unless FIPS is on, OpenSSL is < 1.1.1, or BoringSSL is used."""
    return not (
        self._fips_enabled
        or self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111
        or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    )

1986 

def ed25519_supported(self) -> bool:
    """Return True unless FIPS is on or OpenSSL is older than 1.1.1b."""
    return not (
        self._fips_enabled
        or self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
    )

1991 

def ed25519_load_public_bytes(
    self, data: bytes
) -> ed25519.Ed25519PublicKey:
    """Load an Ed25519 public key from its raw bytes.

    ``data`` is type-checked with ``utils._check_bytes`` first.

    Raises:
        ValueError: if ``data`` is not ``ed25519._ED25519_KEY_SIZE``
            bytes long.
    """
    utils._check_bytes("data", data)

    if len(data) != ed25519._ED25519_KEY_SIZE:
        raise ValueError("An Ed25519 public key is 32 bytes long")

    pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_ED25519, self._ffi.NULL, data, len(data)
    )
    self.openssl_assert(pkey != self._ffi.NULL)
    return _Ed25519PublicKey(
        self, self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    )

2007 

def ed25519_load_private_bytes(
    self, data: bytes
) -> ed25519.Ed25519PrivateKey:
    """Load an Ed25519 private key from its raw bytes.

    The length check runs before the bytes-like check.

    Raises:
        ValueError: if ``data`` is not ``ed25519._ED25519_KEY_SIZE``
            bytes long.
    """
    if len(data) != ed25519._ED25519_KEY_SIZE:
        raise ValueError("An Ed25519 private key is 32 bytes long")

    utils._check_byteslike("data", data)
    # from_buffer hands OpenSSL a pointer to the caller's buffer.
    raw_key = self._ffi.from_buffer(data)
    pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_ED25519, self._ffi.NULL, raw_key, len(data)
    )
    self.openssl_assert(pkey != self._ffi.NULL)
    return _Ed25519PrivateKey(
        self, self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    )

2023 

def ed25519_generate_key(self) -> ed25519.Ed25519PrivateKey:
    """Generate a new Ed25519 private key via ``_evp_pkey_keygen_gc``."""
    return _Ed25519PrivateKey(
        self, self._evp_pkey_keygen_gc(self._lib.NID_ED25519)
    )

2027 

def ed448_supported(self) -> bool:
    """Return True unless FIPS is on, OpenSSL is < 1.1.1b, or BoringSSL is used."""
    return not (
        self._fips_enabled
        or self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
        or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    )

2035 

def ed448_load_public_bytes(self, data: bytes) -> ed448.Ed448PublicKey:
    """Load an Ed448 public key from its raw bytes.

    ``data`` is type-checked with ``utils._check_bytes`` first.

    Raises:
        ValueError: if ``data`` is not ``_ED448_KEY_SIZE`` bytes long.
    """
    utils._check_bytes("data", data)
    if len(data) != _ED448_KEY_SIZE:
        raise ValueError("An Ed448 public key is 57 bytes long")

    pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_ED448, self._ffi.NULL, data, len(data)
    )
    self.openssl_assert(pkey != self._ffi.NULL)
    return _Ed448PublicKey(
        self, self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    )

2048 

def ed448_load_private_bytes(self, data: bytes) -> ed448.Ed448PrivateKey:
    """Load an Ed448 private key from its raw bytes.

    ``data`` is checked with ``utils._check_byteslike`` first.

    Raises:
        ValueError: if ``data`` is not ``_ED448_KEY_SIZE`` bytes long.
    """
    utils._check_byteslike("data", data)
    if len(data) != _ED448_KEY_SIZE:
        raise ValueError("An Ed448 private key is 57 bytes long")

    # from_buffer hands OpenSSL a pointer to the caller's buffer.
    raw_key = self._ffi.from_buffer(data)
    pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_ED448, self._ffi.NULL, raw_key, len(data)
    )
    self.openssl_assert(pkey != self._ffi.NULL)
    return _Ed448PrivateKey(
        self, self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    )

2062 

def ed448_generate_key(self) -> ed448.Ed448PrivateKey:
    """Generate a new Ed448 private key via ``_evp_pkey_keygen_gc``."""
    return _Ed448PrivateKey(
        self, self._evp_pkey_keygen_gc(self._lib.NID_ED448)
    )

2066 

def derive_scrypt(
    self,
    key_material: bytes,
    salt: bytes,
    length: int,
    n: int,
    r: int,
    p: int,
) -> bytes:
    """Derive ``length`` bytes from ``key_material`` with ``EVP_PBE_scrypt``.

    Raises:
        MemoryError: whenever ``EVP_PBE_scrypt`` does not return 1. The
            exception carries an estimate of the required memory and
            the consumed OpenSSL error text.
    """
    out = self._ffi.new("unsigned char[]", length)
    key_material_ptr = self._ffi.from_buffer(key_material)
    status = self._lib.EVP_PBE_scrypt(
        key_material_ptr,
        len(key_material),
        salt,
        len(salt),
        n,
        r,
        p,
        scrypt._MEM_LIMIT,
        out,
        length,
    )
    if status == 1:
        return self._ffi.buffer(out)[:]

    errors = self._consume_errors_with_text()
    # memory required formula explained here:
    # https://blog.filippo.io/the-scrypt-parameters/
    min_memory = 128 * n * r // (1024**2)
    raise MemoryError(
        "Not enough memory to derive key. These parameters require"
        " {} MB of memory.".format(min_memory),
        errors,
    )

2101 

def aead_cipher_supported(self, cipher) -> bool:
    """Report whether the AEAD construction ``cipher`` is usable.

    Under FIPS mode only names listed in ``self._fips_aead`` qualify.
    """
    name = aead._aead_cipher_name(cipher)
    if self._fips_enabled and name not in self._fips_aead:
        return False

    # SIV isn't loaded through get_cipherbyname but instead a new fetch API
    # only available in 3.0+. But if we know we're on 3.0+ then we know
    # it's supported.
    if name.endswith(b"-siv"):
        return self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER == 1

    found = self._lib.EVP_get_cipherbyname(name)
    return found != self._ffi.NULL

2115 

2116 @contextlib.contextmanager 

2117 def _zeroed_bytearray(self, length: int) -> typing.Iterator[bytearray]: 

2118 """ 

2119 This method creates a bytearray, which we copy data into (hopefully 

2120 also from a mutable buffer that can be dynamically erased!), and then 

2121 zero when we're done. 

2122 """ 

2123 ba = bytearray(length) 

2124 try: 

2125 yield ba 

2126 finally: 

2127 self._zero_data(ba, length) 

2128 

2129 def _zero_data(self, data, length: int) -> None: 

2130 # We clear things this way because at the moment we're not 

2131 # sure of a better way that can guarantee it overwrites the 

2132 # memory of a bytearray and doesn't just replace the underlying char *. 

2133 for i in range(length): 

2134 data[i] = 0 

2135 

2136 @contextlib.contextmanager 

2137 def _zeroed_null_terminated_buf(self, data): 

2138 """ 

2139 This method takes bytes, which can be a bytestring or a mutable 

2140 buffer like a bytearray, and yields a null-terminated version of that 

2141 data. This is required because PKCS12_parse doesn't take a length with 

2142 its password char * and ffi.from_buffer doesn't provide null 

2143 termination. So, to support zeroing the data via bytearray we 

2144 need to build this ridiculous construct that copies the memory, but 

2145 zeroes it after use. 

2146 """ 

2147 if data is None: 

2148 yield self._ffi.NULL 

2149 else: 

2150 data_len = len(data) 

2151 buf = self._ffi.new("char[]", data_len + 1) 

2152 self._ffi.memmove(buf, data, data_len) 

2153 try: 

2154 yield buf 

2155 finally: 

2156 # Cast to a uint8_t * so we can assign by integer 

2157 self._zero_data(self._ffi.cast("uint8_t *", buf), data_len) 

2158 

def load_key_and_certificates_from_pkcs12(
    self, data: bytes, password: typing.Optional[bytes]
) -> typing.Tuple[
    typing.Optional[PRIVATE_KEY_TYPES],
    typing.Optional[x509.Certificate],
    typing.List[x509.Certificate],
]:
    """Parse PKCS12 ``data`` and return ``(key, cert, additional_certs)``.

    Delegates to ``load_pkcs12`` and unwraps each entry to its plain
    ``.certificate``; the main certificate is ``None`` when absent.
    """
    bundle = self.load_pkcs12(data, password)
    main_cert = bundle.cert.certificate if bundle.cert else None
    extra_certs = [entry.certificate for entry in bundle.additional_certs]
    return (bundle.key, main_cert, extra_certs)

2172 

def load_pkcs12(
    self, data: bytes, password: typing.Optional[bytes]
) -> PKCS12KeyAndCertificates:
    """Parse PKCS12 ``data`` into key, certificate and extra certificates.

    Raises:
        ValueError: if the data cannot be deserialized, or if
            ``PKCS12_parse`` fails (bad password or bad data).
    """
    if password is not None:
        utils._check_byteslike("password", password)

    def wrap_certificate(x509_cdata):
        # Pair the certificate with its alias (friendly name), if any.
        certificate = self._ossl2cert(x509_cdata)
        alias = self._lib.X509_alias_get0(x509_cdata, self._ffi.NULL)
        friendly_name = None
        if alias != self._ffi.NULL:
            friendly_name = self._ffi.string(alias)
        return PKCS12Certificate(certificate, friendly_name)

    bio = self._bytes_to_bio(data)
    p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
    if p12 == self._ffi.NULL:
        self._consume_errors()
        raise ValueError("Could not deserialize PKCS12 data")
    p12 = self._ffi.gc(p12, self._lib.PKCS12_free)

    # Out-parameters filled in by PKCS12_parse.
    pkey_out = self._ffi.new("EVP_PKEY **")
    cert_out = self._ffi.new("X509 **")
    chain_out = self._ffi.new("Cryptography_STACK_OF_X509 **")
    with self._zeroed_null_terminated_buf(password) as password_buf:
        parsed = self._lib.PKCS12_parse(
            p12, password_buf, pkey_out, cert_out, chain_out
        )
    # OpenSSL 3.0.6 leaves errors on the stack even in success, so
    # we consume all errors unconditionally.
    # https://github.com/openssl/openssl/issues/19389
    self._consume_errors()
    if parsed == 0:
        raise ValueError("Invalid password or PKCS12 data")

    key = None
    cert = None
    additional_certificates = []

    if pkey_out[0] != self._ffi.NULL:
        pkey = self._ffi.gc(pkey_out[0], self._lib.EVP_PKEY_free)
        key = self._evp_pkey_to_private_key(pkey)

    if cert_out[0] != self._ffi.NULL:
        main_x509 = self._ffi.gc(cert_out[0], self._lib.X509_free)
        cert = wrap_certificate(main_x509)

    if chain_out[0] != self._ffi.NULL:
        stack = self._ffi.gc(chain_out[0], self._lib.sk_X509_free)
        count = self._lib.sk_X509_num(chain_out[0])

        # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
        # certificates.
        keeps_order = (
            self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
            or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
        )
        indices: typing.Iterable[int] = (
            range(count) if keeps_order else reversed(range(count))
        )

        for index in indices:
            entry = self._lib.sk_X509_value(stack, index)
            self.openssl_assert(entry != self._ffi.NULL)
            entry = self._ffi.gc(entry, self._lib.X509_free)
            additional_certificates.append(wrap_certificate(entry))

    return PKCS12KeyAndCertificates(key, cert, additional_certificates)

2246 

def serialize_key_and_certificates_to_pkcs12(
    self,
    name: typing.Optional[bytes],
    key: typing.Optional[_ALLOWED_PKCS12_TYPES],
    cert: typing.Optional[x509.Certificate],
    cas: typing.Optional[typing.List[_PKCS12_CAS_TYPES]],
    encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
    """Serialize a key and certificates into DER-encoded PKCS12 bytes.

    Args:
        name: optional friendly name passed to ``PKCS12_create``.
        key: optional private key; its ``_evp_pkey`` is passed through.
        cert: optional certificate matching ``key``.
        cas: optional additional certificates; entries may be plain
            certificates or ``PKCS12Certificate`` objects, in which case
            their ``friendly_name`` is set as the X509 alias.
        encryption_algorithm: ``NoEncryption``,
            ``BestAvailableEncryption`` or a PKCS12-format
            ``_KeySerializationEncryption`` built by the caller.

    Raises:
        ValueError: for any other ``encryption_algorithm``.
        UnsupportedAlgorithm: if PBESv2 or a custom MAC hash is requested
            but the linked library does not support it.
    """
    password = None
    if name is not None:
        utils._check_bytes("name", name)

    if isinstance(encryption_algorithm, serialization.NoEncryption):
        nid_cert = -1
        nid_key = -1
        pkcs12_iter = 0
        mac_iter = 0
        mac_alg = self._ffi.NULL
    elif isinstance(
        encryption_algorithm, serialization.BestAvailableEncryption
    ):
        # PKCS12 encryption is hopeless trash and can never be fixed.
        # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so
        # we use PBESv1 with 3DES on the older paths.
        if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
            nid_cert = self._lib.NID_aes_256_cbc
            nid_key = self._lib.NID_aes_256_cbc
        else:
            nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        # At least we can set this higher than OpenSSL's default
        pkcs12_iter = 20000
        # mac_iter chosen for compatibility reasons, see:
        # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
        # Did we mention how lousy PKCS12 encryption is?
        mac_iter = 1
        # MAC algorithm can only be set on OpenSSL 3.0.0+
        mac_alg = self._ffi.NULL
        password = encryption_algorithm.password
    elif (
        isinstance(
            encryption_algorithm, serialization._KeySerializationEncryption
        )
        and encryption_algorithm._format
        is serialization.PrivateFormat.PKCS12
    ):
        # Default to OpenSSL's defaults. Behavior will vary based on the
        # version of OpenSSL cryptography is compiled against.
        nid_cert = 0
        nid_key = 0
        # Use the default iters we use in best available
        pkcs12_iter = 20000
        # See the Best Available comment for why this is 1
        mac_iter = 1
        password = encryption_algorithm.password
        keycertalg = encryption_algorithm._key_cert_algorithm
        if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC:
            nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        elif keycertalg is PBES.PBESv2SHA256AndAES256CBC:
            if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
                raise UnsupportedAlgorithm(
                    "PBESv2 is not supported by this version of OpenSSL"
                )
            nid_cert = self._lib.NID_aes_256_cbc
            nid_key = self._lib.NID_aes_256_cbc
        else:
            assert keycertalg is None
            # We use OpenSSL's defaults

        if encryption_algorithm._hmac_hash is not None:
            if not self._lib.Cryptography_HAS_PKCS12_SET_MAC:
                raise UnsupportedAlgorithm(
                    "Setting MAC algorithm is not supported by this "
                    "version of OpenSSL."
                )
            mac_alg = self._evp_md_non_null_from_algorithm(
                encryption_algorithm._hmac_hash
            )
            self.openssl_assert(mac_alg != self._ffi.NULL)
        else:
            mac_alg = self._ffi.NULL

        if encryption_algorithm._kdf_rounds is not None:
            pkcs12_iter = encryption_algorithm._kdf_rounds

    else:
        raise ValueError("Unsupported key encryption type")

    if cas is None or len(cas) == 0:
        sk_x509 = self._ffi.NULL
    else:
        sk_x509 = self._lib.sk_X509_new_null()
        sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)

        # This list is to keep the x509 values alive until end of function
        ossl_cas = []
        for ca in cas:
            if isinstance(ca, PKCS12Certificate):
                ca_alias = ca.friendly_name
                ossl_ca = self._cert2ossl(ca.certificate)
                with self._zeroed_null_terminated_buf(
                    ca_alias
                ) as ca_name_buf:
                    res = self._lib.X509_alias_set1(
                        ossl_ca, ca_name_buf, -1
                    )
                    self.openssl_assert(res == 1)
            else:
                ossl_ca = self._cert2ossl(ca)
            ossl_cas.append(ossl_ca)
            res = self._lib.sk_X509_push(sk_x509, ossl_ca)
            # Assert on this instance, not the module-level ``backend``
            # global, to match every other check in this method.
            self.openssl_assert(res >= 1)

    with self._zeroed_null_terminated_buf(password) as password_buf:
        with self._zeroed_null_terminated_buf(name) as name_buf:
            ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
            if key is not None:
                evp_pkey = key._evp_pkey  # type: ignore[union-attr]
            else:
                evp_pkey = self._ffi.NULL

            p12 = self._lib.PKCS12_create(
                password_buf,
                name_buf,
                evp_pkey,
                ossl_cert,
                sk_x509,
                nid_key,
                nid_cert,
                pkcs12_iter,
                mac_iter,
                0,
            )

        # Validate and take ownership of p12 before using it again, so a
        # failed PKCS12_create can never hand NULL to PKCS12_set_mac.
        self.openssl_assert(p12 != self._ffi.NULL)
        p12 = self._ffi.gc(p12, self._lib.PKCS12_free)

        if (
            self._lib.Cryptography_HAS_PKCS12_SET_MAC
            and mac_alg != self._ffi.NULL
        ):
            res = self._lib.PKCS12_set_mac(
                p12,
                password_buf,
                -1,
                self._ffi.NULL,
                0,
                mac_iter,
                mac_alg,
            )
            # A failed MAC update must not be silently serialized.
            self.openssl_assert(res == 1)

    bio = self._create_mem_bio_gc()
    res = self._lib.i2d_PKCS12_bio(bio, p12)
    self.openssl_assert(res > 0)
    return self._read_mem_bio(bio)

2403 

def poly1305_supported(self) -> bool:
    """Return True when FIPS is off and ``Cryptography_HAS_POLY1305`` is 1."""
    return (
        not self._fips_enabled
        and self._lib.Cryptography_HAS_POLY1305 == 1
    )

2408 

def create_poly1305_ctx(self, key: bytes) -> _Poly1305Context:
    """Create a Poly1305 context for ``key``.

    Raises:
        ValueError: if ``key`` is not ``_POLY1305_KEY_SIZE`` bytes long.
    """
    utils._check_byteslike("key", key)
    key_length = len(key)
    if key_length != _POLY1305_KEY_SIZE:
        raise ValueError("A poly1305 key is 32 bytes long")

    return _Poly1305Context(self, key)

2415 

def pkcs7_supported(self) -> bool:
    """Return False when linked against BoringSSL, True otherwise."""
    is_boringssl = self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    return not is_boringssl

2418 

def load_pem_pkcs7_certificates(
    self, data: bytes
) -> typing.List[x509.Certificate]:
    """Return the certificates contained in PEM-encoded PKCS7 ``data``.

    Raises:
        ValueError: if ``PEM_read_bio_PKCS7`` cannot parse the data.
    """
    utils._check_bytes("data", data)
    pem_bio = self._bytes_to_bio(data)
    null = self._ffi.NULL
    p7 = self._lib.PEM_read_bio_PKCS7(pem_bio.bio, null, null, null)
    if p7 == self._ffi.NULL:
        self._consume_errors()
        raise ValueError("Unable to parse PKCS7 data")

    return self._load_pkcs7_certificates(
        self._ffi.gc(p7, self._lib.PKCS7_free)
    )

2433 

def load_der_pkcs7_certificates(
    self, data: bytes
) -> typing.List[x509.Certificate]:
    """Return the certificates contained in DER-encoded PKCS7 ``data``.

    Raises:
        ValueError: if ``d2i_PKCS7_bio`` cannot parse the data.
    """
    utils._check_bytes("data", data)
    der_bio = self._bytes_to_bio(data)
    p7 = self._lib.d2i_PKCS7_bio(der_bio.bio, self._ffi.NULL)
    if p7 == self._ffi.NULL:
        self._consume_errors()
        raise ValueError("Unable to parse PKCS7 data")

    return self._load_pkcs7_certificates(
        self._ffi.gc(p7, self._lib.PKCS7_free)
    )

2446 

def _load_pkcs7_certificates(self, p7):
    """Extract the certificates of a signed PKCS7 structure as a list.

    Raises:
        UnsupportedAlgorithm: if the structure's type is not
            ``NID_pkcs7_signed``.
    """
    type_nid = self._lib.OBJ_obj2nid(p7.type)
    self.openssl_assert(type_nid != self._lib.NID_undef)
    if type_nid != self._lib.NID_pkcs7_signed:
        raise UnsupportedAlgorithm(
            "Only basic signed structures are currently supported. NID"
            " for this data was {}".format(type_nid),
            _Reasons.UNSUPPORTED_SERIALIZATION,
        )

    cert_stack = p7.d.sign.cert
    count = self._lib.sk_X509_num(cert_stack)
    loaded = []
    for index in range(count):
        entry = self._lib.sk_X509_value(cert_stack, index)
        self.openssl_assert(entry != self._ffi.NULL)
        # The stack still owns the certificate, so take an extra
        # reference before registering X509_free for our handle.
        ref_result = self._lib.X509_up_ref(entry)
        # When OpenSSL is less than 1.1.0 up_ref returns the current
        # refcount. On 1.1.0+ it returns 1 for success.
        self.openssl_assert(ref_result >= 1)
        entry = self._ffi.gc(entry, self._lib.X509_free)
        loaded.append(self._ossl2cert(entry))

    return loaded

2472 

def pkcs7_serialize_certificates(
    self,
    certs: typing.List[x509.Certificate],
    encoding: serialization.Encoding,
) -> bytes:
    """Serialize ``certs`` into a certificates-only PKCS7 structure.

    Args:
        certs: non-empty iterable of ``x509.Certificate`` objects.
        encoding: ``Encoding.PEM`` or ``Encoding.DER``.

    Returns:
        The encoded PKCS7 bytes read back from the output BIO.

    Raises:
        TypeError: if ``certs`` is empty or holds a non-certificate, or
            if ``encoding`` is neither PEM nor DER.
    """
    certs = list(certs)
    if not certs or not all(
        isinstance(cert, x509.Certificate) for cert in certs
    ):
        raise TypeError("certs must be a list of certs with length >= 1")

    if encoding not in (
        serialization.Encoding.PEM,
        serialization.Encoding.DER,
    ):
        raise TypeError("encoding must DER or PEM from the Encoding enum")

    certs_sk = self._lib.sk_X509_new_null()
    certs_sk = self._ffi.gc(certs_sk, self._lib.sk_X509_free)
    # This list is to keep the x509 values alive until end of function
    ossl_certs = []
    for cert in certs:
        ossl_cert = self._cert2ossl(cert)
        ossl_certs.append(ossl_cert)
        res = self._lib.sk_X509_push(certs_sk, ossl_cert)
        self.openssl_assert(res >= 1)
    # We use PKCS7_sign here because it creates the PKCS7 and PKCS7_SIGNED
    # structures for us rather than requiring manual assignment.
    p7 = self._lib.PKCS7_sign(
        self._ffi.NULL,
        self._ffi.NULL,
        certs_sk,
        self._ffi.NULL,
        self._lib.PKCS7_PARTIAL,
    )
    # Check the allocation and register PKCS7_free, as pkcs7_sign does;
    # otherwise the structure is leaked on every call.
    self.openssl_assert(p7 != self._ffi.NULL)
    p7 = self._ffi.gc(p7, self._lib.PKCS7_free)

    bio_out = self._create_mem_bio_gc()
    if encoding is serialization.Encoding.PEM:
        res = self._lib.PEM_write_bio_PKCS7_stream(
            bio_out, p7, self._ffi.NULL, 0
        )
    else:
        assert encoding is serialization.Encoding.DER
        res = self._lib.i2d_PKCS7_bio(bio_out, p7)

    self.openssl_assert(res == 1)
    return self._read_mem_bio(bio_out)

2519 

def pkcs7_sign(
    self,
    builder: pkcs7.PKCS7SignatureBuilder,
    encoding: serialization.Encoding,
    options: typing.List[pkcs7.PKCS7Options],
) -> bytes:
    """Produce a PKCS7 signature over ``builder._data``.

    Every signer in ``builder._signers`` is added, and the structure is
    written as S/MIME, PEM or DER according to ``encoding``.
    """
    assert builder._data is not None
    data_bio = self._bytes_to_bio(builder._data)
    init_flags = self._lib.PKCS7_PARTIAL
    final_flags = 0

    if len(builder._additional_certs) != 0:
        extra_certs = self._lib.sk_X509_new_null()
        extra_certs = self._ffi.gc(extra_certs, self._lib.sk_X509_free)
        # This list is to keep the x509 values alive until end of function
        kept_alive = []
        for extra in builder._additional_certs:
            ossl_extra = self._cert2ossl(extra)
            kept_alive.append(ossl_extra)
            pushed = self._lib.sk_X509_push(extra_certs, ossl_extra)
            self.openssl_assert(pushed >= 1)
    else:
        extra_certs = self._ffi.NULL

    if pkcs7.PKCS7Options.DetachedSignature in options:
        # Don't embed the data in the PKCS7 structure
        init_flags |= self._lib.PKCS7_DETACHED
        final_flags |= self._lib.PKCS7_DETACHED

    # This just inits a structure for us. However, there
    # are flags we need to set, joy.
    p7 = self._lib.PKCS7_sign(
        self._ffi.NULL,
        self._ffi.NULL,
        extra_certs,
        self._ffi.NULL,
        init_flags,
    )
    self.openssl_assert(p7 != self._ffi.NULL)
    p7 = self._ffi.gc(p7, self._lib.PKCS7_free)

    # These flags are configurable on a per-signature basis
    # but we've deliberately chosen to make the API only allow
    # setting it across all signatures for now.
    signer_flags = 0
    if pkcs7.PKCS7Options.NoCapabilities in options:
        signer_flags |= self._lib.PKCS7_NOSMIMECAP
    elif pkcs7.PKCS7Options.NoAttributes in options:
        signer_flags |= self._lib.PKCS7_NOATTR

    if pkcs7.PKCS7Options.NoCerts in options:
        signer_flags |= self._lib.PKCS7_NOCERTS

    for certificate, private_key, hash_algorithm in builder._signers:
        signer_cert = self._cert2ossl(certificate)
        digest = self._evp_md_non_null_from_algorithm(hash_algorithm)
        signer_info = self._lib.PKCS7_sign_add_signer(
            p7,
            signer_cert,
            private_key._evp_pkey,  # type: ignore[union-attr]
            digest,
            signer_flags,
        )
        self.openssl_assert(signer_info != self._ffi.NULL)

    for option in options:
        # DetachedSignature, NoCapabilities, and NoAttributes are already
        # handled so we just need to check these last two options.
        if option is pkcs7.PKCS7Options.Text:
            final_flags |= self._lib.PKCS7_TEXT
        elif option is pkcs7.PKCS7Options.Binary:
            final_flags |= self._lib.PKCS7_BINARY

    out_bio = self._create_mem_bio_gc()
    if encoding is serialization.Encoding.SMIME:
        # This finalizes the structure
        written = self._lib.SMIME_write_PKCS7(
            out_bio, p7, data_bio.bio, final_flags
        )
    else:
        is_pem = encoding is serialization.Encoding.PEM
        if not is_pem:
            assert encoding is serialization.Encoding.DER
        # We need to call finalize here because the PEM and DER writers
        # used below do not finalize.
        finalized = self._lib.PKCS7_final(p7, data_bio.bio, final_flags)
        self.openssl_assert(finalized == 1)
        if is_pem:
            written = self._lib.PEM_write_bio_PKCS7_stream(
                out_bio, p7, data_bio.bio, final_flags
            )
        else:
            # OpenSSL 3.0 leaves a random bio error on the stack:
            # https://github.com/openssl/openssl/issues/16681
            if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
                self._consume_errors()
            written = self._lib.i2d_PKCS7_bio(out_bio, p7)
    self.openssl_assert(written == 1)
    return self._read_mem_bio(out_bio)

2617 

2618 

class GetCipherByName:
    """Callable that resolves an EVP cipher from a name format string.

    The format string is filled with ``cipher`` and ``mode`` and
    lower-cased to form the name that is looked up.
    """

    def __init__(self, fmt: str):
        self._fmt = fmt

    def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
        """Return the EVP_CIPHER for ``cipher``/``mode`` (NULL if unknown)."""
        name_bytes = (
            self._fmt.format(cipher=cipher, mode=mode)
            .lower()
            .encode("ascii")
        )
        found = backend._lib.EVP_get_cipherbyname(name_bytes)

        # try EVP_CIPHER_fetch if present
        lookup_failed = found == backend._ffi.NULL
        if lookup_failed and backend._lib.Cryptography_HAS_300_EVP_CIPHER:
            found = backend._lib.EVP_CIPHER_fetch(
                backend._ffi.NULL,
                name_bytes,
                backend._ffi.NULL,
            )

        # Clear the OpenSSL error queue regardless of the outcome.
        backend._consume_errors()
        return found

2642 

2643 

def _get_xts_cipher(backend: Backend, cipher: AES, mode):
    """Look up the AES-XTS EVP cipher for ``cipher``.

    The name uses half of ``cipher.key_size``; ``mode`` is unused.
    """
    half_key_bits = cipher.key_size // 2
    xts_name = "aes-{}-xts".format(half_key_bits)
    return backend._lib.EVP_get_cipherbyname(xts_name.encode("ascii"))

2647 

2648 

2649backend = Backend()