Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/backend.py: 30%

519 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 07:26 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import collections 

8import contextlib 

9import itertools 

10import typing 

11 

12from cryptography import utils, x509 

13from cryptography.exceptions import UnsupportedAlgorithm, _Reasons 

14from cryptography.hazmat.backends.openssl import aead 

15from cryptography.hazmat.backends.openssl.ciphers import _CipherContext 

16from cryptography.hazmat.bindings._rust import openssl as rust_openssl 

17from cryptography.hazmat.bindings.openssl import binding 

18from cryptography.hazmat.primitives import hashes, serialization 

19from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding 

20from cryptography.hazmat.primitives.asymmetric import dh, ec 

21from cryptography.hazmat.primitives.asymmetric import utils as asym_utils 

22from cryptography.hazmat.primitives.asymmetric.padding import ( 

23 MGF1, 

24 OAEP, 

25 PSS, 

26 PKCS1v15, 

27) 

28from cryptography.hazmat.primitives.asymmetric.types import ( 

29 PrivateKeyTypes, 

30 PublicKeyTypes, 

31) 

32from cryptography.hazmat.primitives.ciphers import ( 

33 CipherAlgorithm, 

34) 

35from cryptography.hazmat.primitives.ciphers.algorithms import ( 

36 AES, 

37 AES128, 

38 AES256, 

39 ARC4, 

40 SM4, 

41 Camellia, 

42 ChaCha20, 

43 TripleDES, 

44 _BlowfishInternal, 

45 _CAST5Internal, 

46 _IDEAInternal, 

47 _SEEDInternal, 

48) 

49from cryptography.hazmat.primitives.ciphers.modes import ( 

50 CBC, 

51 CFB, 

52 CFB8, 

53 CTR, 

54 ECB, 

55 GCM, 

56 OFB, 

57 XTS, 

58 Mode, 

59) 

60from cryptography.hazmat.primitives.serialization.pkcs12 import ( 

61 PBES, 

62 PKCS12Certificate, 

63 PKCS12KeyAndCertificates, 

64 PKCS12PrivateKeyTypes, 

65 _PKCS12CATypes, 

66) 

67 

68_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) 

69 

70 

71# Not actually supported, just used as a marker for some serialization tests. 

72class _RC2: 

73 pass 

74 

75 

76class Backend: 

77 """ 

78 OpenSSL API binding interfaces. 

79 """ 

80 

81 name = "openssl" 

82 

83 # FIPS has opinions about acceptable algorithms and key sizes, but the 

84 # disallowed algorithms are still present in OpenSSL. They just error if 

85 # you try to use them. To avoid that we allowlist the algorithms in 

86 # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are. 

87 _fips_aead: typing.ClassVar[set[bytes]] = { 

88 b"aes-128-ccm", 

89 b"aes-192-ccm", 

90 b"aes-256-ccm", 

91 b"aes-128-gcm", 

92 b"aes-192-gcm", 

93 b"aes-256-gcm", 

94 } 

95 # TripleDES encryption is disallowed/deprecated throughout 2023 in 

96 # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA). 

97 _fips_ciphers = (AES,) 

98 # Sometimes SHA1 is still permissible. That logic is contained 

99 # within the various *_supported methods. 

100 _fips_hashes = ( 

101 hashes.SHA224, 

102 hashes.SHA256, 

103 hashes.SHA384, 

104 hashes.SHA512, 

105 hashes.SHA512_224, 

106 hashes.SHA512_256, 

107 hashes.SHA3_224, 

108 hashes.SHA3_256, 

109 hashes.SHA3_384, 

110 hashes.SHA3_512, 

111 hashes.SHAKE128, 

112 hashes.SHAKE256, 

113 ) 

114 _fips_ecdh_curves = ( 

115 ec.SECP224R1, 

116 ec.SECP256R1, 

117 ec.SECP384R1, 

118 ec.SECP521R1, 

119 ) 

120 _fips_rsa_min_key_size = 2048 

121 _fips_rsa_min_public_exponent = 65537 

122 _fips_dsa_min_modulus = 1 << 2048 

123 _fips_dh_min_key_size = 2048 

124 _fips_dh_min_modulus = 1 << _fips_dh_min_key_size 

125 

def __init__(self) -> None:
    """
    Create the OpenSSL binding handles, cache the FIPS status, register
    the default cipher adapters, and record the DH key types.
    """
    openssl_binding = binding.Binding()
    self._binding = openssl_binding
    self._ffi = self._binding.ffi
    self._lib = self._binding.lib
    self._fips_enabled = rust_openssl.is_fips_enabled()

    # Keyed by (cipher type, mode type); each value is the adapter
    # callable stored by register_cipher_adapter.
    self._cipher_registry: dict[
        tuple[type[CipherAlgorithm], type[Mode]],
        typing.Callable,
    ] = {}
    self._register_default_ciphers()

    # DHX is only added when the binding reports support for it.
    supported_dh_types = [self._lib.EVP_PKEY_DH]
    if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
        supported_dh_types.append(self._lib.EVP_PKEY_DHX)
    self._dh_types = supported_dh_types

140 

141 def __repr__(self) -> str: 

142 return "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>".format( 

143 self.openssl_version_text(), 

144 self._fips_enabled, 

145 self._binding._legacy_provider_loaded, 

146 ) 

147 

def openssl_assert(
    self,
    ok: bool,
    errors: list[rust_openssl.OpenSSLError] | None = None,
) -> None:
    """
    Forward the check to binding._openssl_assert using this backend's lib
    handle, passing along any pre-collected errors.
    """
    lib_handle = self._lib
    return binding._openssl_assert(lib_handle, ok, errors=errors)

154 

def _enable_fips(self) -> None:
    """
    Enable FIPS mode through the binding and refresh the cached flag.

    Intended for OpenSSL 3.0.0 installs that have the FIPS provider
    installed properly.
    """
    openssl_binding = self._binding
    openssl_binding._enable_fips()
    assert rust_openssl.is_fips_enabled()
    self._fips_enabled = rust_openssl.is_fips_enabled()

161 

def openssl_version_text(self) -> str:
    """
    Return the friendly name of the OpenSSL library that is loaded, which
    is not necessarily the version that was compiled against.

    Example: OpenSSL 1.1.1d 10 Sep 2019
    """
    version_cdata = self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
    version_bytes = self._ffi.string(version_cdata)
    return version_bytes.decode("ascii")

172 

def openssl_version_number(self) -> int:
    """
    Return the value reported by OpenSSL_version_num().
    """
    version_number = self._lib.OpenSSL_version_num()
    return version_number

175 

176 def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm): 

177 if algorithm.name in ("blake2b", "blake2s"): 

178 alg = f"{algorithm.name}{algorithm.digest_size * 8}".encode( 

179 "ascii" 

180 ) 

181 else: 

182 alg = algorithm.name.encode("ascii") 

183 

184 evp_md = self._lib.EVP_get_digestbyname(alg) 

185 return evp_md 

186 

187 def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm): 

188 evp_md = self._evp_md_from_algorithm(algorithm) 

189 self.openssl_assert(evp_md != self._ffi.NULL) 

190 return evp_md 

191 

def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether a hash algorithm is usable: under FIPS it must be one
    of the allowlisted hash types, and OpenSSL must know the digest.
    """
    if self._fips_enabled:
        if not isinstance(algorithm, self._fips_hashes):
            return False

    return self._evp_md_from_algorithm(algorithm) != self._ffi.NULL

198 

def signature_hash_supported(
    self, algorithm: hashes.HashAlgorithm
) -> bool:
    """
    Report whether a hash may be used as the message digest of a
    signature, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption).

    SHA1 is rejected when FIPS is enabled; everything else defers to
    hash_supported.
    """
    sha1_under_fips = self._fips_enabled and isinstance(
        algorithm, hashes.SHA1
    )
    if sha1_under_fips:
        return False
    return self.hash_supported(algorithm)

207 

def scrypt_supported(self) -> bool:
    """
    Report whether scrypt is available: never under FIPS, otherwise only
    when the binding's Cryptography_HAS_SCRYPT flag equals 1.
    """
    if not self._fips_enabled:
        return self._lib.Cryptography_HAS_SCRYPT == 1
    return False

213 

def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether a hash may be used with HMAC.

    FIPS mode still allows SHA1 for HMAC, so that case is accepted
    directly; all other cases defer to hash_supported.
    """
    sha1_under_fips = self._fips_enabled and isinstance(
        algorithm, hashes.SHA1
    )
    if sha1_under_fips:
        return True

    return self.hash_supported(algorithm)

220 

def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
    """
    Report whether a cipher/mode pair is usable with this backend.

    Under FIPS only the allowlisted cipher types pass (TripleDES is
    disallowed/deprecated in FIPS 140-3). The pair must also have a
    registered adapter, and that adapter must yield a non-NULL cipher.
    """
    if self._fips_enabled and not isinstance(cipher, self._fips_ciphers):
        return False

    registry_key = (type(cipher), type(mode))
    if registry_key not in self._cipher_registry:
        return False

    lookup = self._cipher_registry[registry_key]
    resolved_cipher = lookup(self, cipher, mode)
    return self._ffi.NULL != resolved_cipher

234 

def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
    """
    Store an adapter for a (cipher class, mode class) pair.

    Raises ValueError if the pair has already been registered.
    """
    registry_key = (cipher_cls, mode_cls)
    if registry_key in self._cipher_registry:
        raise ValueError(
            f"Duplicate registration for: {cipher_cls} {mode_cls}."
        )
    self._cipher_registry[registry_key] = adapter

241 

def _register_default_ciphers(self) -> None:
    """
    Register the built-in cipher/mode adapters in the cipher registry.

    Legacy ciphers (Blowfish, SEED, CAST5, IDEA, RC4, RC2) are only
    registered when the legacy provider is loaded or OpenSSL is older
    than 3.0.0.
    """
    register = self.register_cipher_adapter
    sized_template = "{cipher.name}-{cipher.key_size}-{mode.name}"

    for cipher_cls, mode_cls in itertools.product(
        [AES, AES128, AES256],
        [CBC, CTR, ECB, OFB, CFB, CFB8, GCM],
    ):
        register(cipher_cls, mode_cls, GetCipherByName(sized_template))
    for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
        register(Camellia, mode_cls, GetCipherByName(sized_template))
    for mode_cls in [CBC, CFB, CFB8, OFB]:
        register(
            TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
        )
    register(TripleDES, ECB, GetCipherByName("des-ede3"))

    # ChaCha20 uses the Long Name "chacha20" in OpenSSL, but in LibreSSL
    # it uses "chacha"
    if self._lib.CRYPTOGRAPHY_IS_LIBRESSL:
        chacha_name = "chacha"
    else:
        chacha_name = "chacha20"
    register(ChaCha20, type(None), GetCipherByName(chacha_name))

    register(AES, XTS, _get_xts_cipher)
    for mode_cls in [ECB, CBC, OFB, CFB, CTR]:
        register(SM4, mode_cls, GetCipherByName("sm4-{mode.name}"))

    # Don't register legacy ciphers if they're unavailable. Hypothetically
    # this wouldn't be necessary because we test availability by seeing if
    # we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3
    # will return a valid pointer even though the cipher is unavailable.
    legacy_available = (
        self._binding._legacy_provider_loaded
        or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
    )
    if not legacy_available:
        return

    for cipher_cls, name_template in (
        (_BlowfishInternal, "bf-{mode.name}"),
        (_SEEDInternal, "seed-{mode.name}"),
    ):
        for mode_cls in [CBC, CFB, OFB, ECB]:
            register(cipher_cls, mode_cls, GetCipherByName(name_template))
    for cipher_cls, mode_cls in itertools.product(
        [_CAST5Internal, _IDEAInternal],
        [CBC, OFB, CFB, ECB],
    ):
        register(
            cipher_cls,
            mode_cls,
            GetCipherByName("{cipher.name}-{mode.name}"),
        )
    register(ARC4, type(None), GetCipherByName("rc4"))
    # We don't actually support RC2, this is just used by some tests.
    register(_RC2, type(None), GetCipherByName("rc2"))

315 

def create_symmetric_encryption_ctx(
    self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
    """
    Build a _CipherContext configured for encryption with the given
    cipher and mode.
    """
    operation = _CipherContext._ENCRYPT
    return _CipherContext(self, cipher, mode, operation)

320 

def create_symmetric_decryption_ctx(
    self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
    """
    Build a _CipherContext configured for decryption with the given
    cipher and mode.
    """
    operation = _CipherContext._DECRYPT
    return _CipherContext(self, cipher, mode, operation)

325 

def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether a hash may be used with PBKDF2-HMAC; this is exactly
    the answer given by hmac_supported.
    """
    supported = self.hmac_supported(algorithm)
    return supported

328 

def _consume_errors(self) -> list[rust_openssl.OpenSSLError]:
    """
    Return the list produced by rust_openssl.capture_error_stack().
    """
    captured = rust_openssl.capture_error_stack()
    return captured

331 

def generate_rsa_parameters_supported(
    self, public_exponent: int, key_size: int
) -> bool:
    """
    Report whether RSA generation parameters are acceptable: the public
    exponent must be odd and at least 3, and the key size at least 512.
    """
    if not public_exponent >= 3:
        return False
    if not public_exponent & 1 != 0:
        return False
    return key_size >= 512

340 

341 def _create_evp_pkey_gc(self): 

342 evp_pkey = self._lib.EVP_PKEY_new() 

343 self.openssl_assert(evp_pkey != self._ffi.NULL) 

344 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

345 return evp_pkey 

346 

347 def _rsa_cdata_to_evp_pkey(self, rsa_cdata): 

348 evp_pkey = self._create_evp_pkey_gc() 

349 res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata) 

350 self.openssl_assert(res == 1) 

351 return evp_pkey 

352 

def _bytes_to_bio(self, data: bytes) -> _MemoryBIO:
    """
    Return a _MemoryBIO namedtuple of (BIO, char*) backed by ``data``.

    The char* is the storage for the BIO, so it must be kept alive for as
    long as the BIO is in use.
    """
    storage_ptr = self._ffi.from_buffer(data)
    raw_bio = self._lib.BIO_new_mem_buf(storage_ptr, len(data))
    self.openssl_assert(raw_bio != self._ffi.NULL)

    managed_bio = self._ffi.gc(raw_bio, self._lib.BIO_free)
    return _MemoryBIO(managed_bio, storage_ptr)

365 

366 def _create_mem_bio_gc(self): 

367 """ 

368 Creates an empty memory BIO. 

369 """ 

370 bio_method = self._lib.BIO_s_mem() 

371 self.openssl_assert(bio_method != self._ffi.NULL) 

372 bio = self._lib.BIO_new(bio_method) 

373 self.openssl_assert(bio != self._ffi.NULL) 

374 bio = self._ffi.gc(bio, self._lib.BIO_free) 

375 return bio 

376 

377 def _read_mem_bio(self, bio) -> bytes: 

378 """ 

379 Reads a memory BIO. This only works on memory BIOs. 

380 """ 

381 buf = self._ffi.new("char **") 

382 buf_len = self._lib.BIO_get_mem_data(bio, buf) 

383 self.openssl_assert(buf_len > 0) 

384 self.openssl_assert(buf[0] != self._ffi.NULL) 

385 bio_data = self._ffi.buffer(buf[0], buf_len)[:] 

386 return bio_data 

387 

def _evp_pkey_to_private_key(
    self, evp_pkey, unsafe_skip_rsa_key_validation: bool
) -> PrivateKeyTypes:
    """
    Return the appropriate type of PrivateKey for an evp_pkey cdata
    pointer, which is handed to the Rust layer as an integer address.
    """
    pkey_address = int(self._ffi.cast("uintptr_t", evp_pkey))
    return rust_openssl.keys.private_key_from_ptr(
        pkey_address,
        unsafe_skip_rsa_key_validation=unsafe_skip_rsa_key_validation,
    )

399 

def _evp_pkey_to_public_key(self, evp_pkey) -> PublicKeyTypes:
    """
    Return the appropriate type of PublicKey for an evp_pkey cdata
    pointer, which is handed to the Rust layer as an integer address.
    """
    pkey_address = int(self._ffi.cast("uintptr_t", evp_pkey))
    return rust_openssl.keys.public_key_from_ptr(pkey_address)

408 

def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether a hash may be used with OAEP: SHA1 is rejected under
    FIPS, otherwise only SHA1 and the SHA-2 224/256/384/512 types pass.
    """
    if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
        return False

    allowed_hashes = (
        hashes.SHA1,
        hashes.SHA224,
        hashes.SHA256,
        hashes.SHA384,
        hashes.SHA512,
    )
    return isinstance(algorithm, allowed_hashes)

423 

def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool:
    """
    Report whether an RSA padding scheme is supported.

    PKCS1v15 is always accepted. PSS and OAEP are accepted only with
    MGF1 and a supported hash; any other padding is rejected.
    """
    if isinstance(padding, PKCS1v15):
        return True

    if isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
        mgf_hash = padding._mgf._algorithm
        # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked
        # as signature algorithm.
        if self._fips_enabled and isinstance(mgf_hash, hashes.SHA1):
            return True
        return self.hash_supported(mgf_hash)

    if isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
        return self._oaep_hash_supported(
            padding._mgf._algorithm
        ) and self._oaep_hash_supported(padding._algorithm)

    return False

442 

def rsa_encryption_supported(self, padding: AsymmetricPadding) -> bool:
    """
    Report whether a padding may be used for RSA encryption: PKCS1v15 is
    rejected under FIPS, everything else defers to rsa_padding_supported.
    """
    if self._fips_enabled and isinstance(padding, PKCS1v15):
        return False
    return self.rsa_padding_supported(padding)

448 

def dsa_supported(self) -> bool:
    """
    Report whether DSA is available: it is unavailable on BoringSSL and
    whenever FIPS is enabled.
    """
    return not (self._lib.CRYPTOGRAPHY_IS_BORINGSSL or self._fips_enabled)

453 

def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
    """
    Report whether a hash may be used for DSA signatures: DSA itself must
    be supported and the hash must pass signature_hash_supported.
    """
    if self.dsa_supported():
        return self.signature_hash_supported(algorithm)
    return False

458 

def cmac_algorithm_supported(self, algorithm) -> bool:
    """
    Report whether an algorithm may be used with CMAC, decided by whether
    the algorithm is supported in CBC mode (probed with an all-zero IV of
    block_size length).
    """
    probe_mode = CBC(b"\x00" * algorithm.block_size)
    return self.cipher_supported(algorithm, probe_mode)

463 

def load_pem_private_key(
    self,
    data: bytes,
    password: bytes | None,
    unsafe_skip_rsa_key_validation: bool,
) -> PrivateKeyTypes:
    """
    Load a PEM-encoded private key by running _load_key with OpenSSL's
    PEM_read_bio_PrivateKey reader.
    """
    pem_reader = self._lib.PEM_read_bio_PrivateKey
    return self._load_key(
        pem_reader, data, password, unsafe_skip_rsa_key_validation
    )

476 

def load_pem_public_key(self, data: bytes) -> PublicKeyTypes:
    """
    Load a PEM-encoded public key.

    The data is first read as a subjectPublicKeyInfo; if that fails it is
    re-read as a bare PKCS1 RSA public key. If both attempts fail,
    _handle_key_loading_error raises.
    """
    pem_bio = self._bytes_to_bio(data)
    # In OpenSSL 3.0.x the PEM_read_bio_PUBKEY function will invoke
    # the default password callback if you pass an encrypted private
    # key. This is very, very, very bad as the default callback can
    # trigger an interactive console prompt, which will hang the
    # Python process. We therefore provide our own callback to
    # catch this and error out properly.
    cb_userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
    password_cb = self._ffi.addressof(
        self._lib._original_lib, "Cryptography_pem_password_cb"
    )

    pkey = self._lib.PEM_read_bio_PUBKEY(
        pem_bio.bio, self._ffi.NULL, password_cb, cb_userdata
    )
    if pkey != self._ffi.NULL:
        pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
        return self._evp_pkey_to_public_key(pkey)

    # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
    # need to check to see if it is a pure PKCS1 RSA public key (not
    # embedded in a subjectPublicKeyInfo)
    self._consume_errors()
    reset_status = self._lib.BIO_reset(pem_bio.bio)
    self.openssl_assert(reset_status == 1)
    rsa_key = self._lib.PEM_read_bio_RSAPublicKey(
        pem_bio.bio, self._ffi.NULL, password_cb, cb_userdata
    )
    if rsa_key != self._ffi.NULL:
        rsa_key = self._ffi.gc(rsa_key, self._lib.RSA_free)
        pkey = self._rsa_cdata_to_evp_pkey(rsa_key)
        return self._evp_pkey_to_public_key(pkey)

    self._handle_key_loading_error()

518 

def load_der_private_key(
    self,
    data: bytes,
    password: bytes | None,
    unsafe_skip_rsa_key_validation: bool,
) -> PrivateKeyTypes:
    """
    Load a DER-encoded private key.

    OpenSSL has a function called d2i_AutoPrivateKey that in theory
    handles this automatically, however it doesn't handle encrypted
    private keys. Instead the key is tried as a traditional key first and
    then with the reader that handles encrypted PKCS8 properly.
    """
    traditional_bio = self._bytes_to_bio(data)
    traditional_key = self._evp_pkey_from_der_traditional_key(
        traditional_bio, password
    )
    if traditional_key:
        return self._evp_pkey_to_private_key(
            traditional_key, unsafe_skip_rsa_key_validation
        )

    pkcs8_reader = self._lib.d2i_PKCS8PrivateKey_bio
    return self._load_key(
        pkcs8_reader, data, password, unsafe_skip_rsa_key_validation
    )

544 

545 def _evp_pkey_from_der_traditional_key(self, bio_data, password): 

546 key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL) 

547 if key != self._ffi.NULL: 

548 key = self._ffi.gc(key, self._lib.EVP_PKEY_free) 

549 if password is not None: 

550 raise TypeError( 

551 "Password was given but private key is not encrypted." 

552 ) 

553 

554 return key 

555 else: 

556 self._consume_errors() 

557 return None 

558 

def load_der_public_key(self, data: bytes) -> PublicKeyTypes:
    """
    Load a DER-encoded public key.

    The data is first read as a subjectPublicKeyInfo; if that fails it is
    re-read as a bare PKCS1 RSA public key. If both attempts fail,
    _handle_key_loading_error raises.
    """
    der_bio = self._bytes_to_bio(data)
    pkey = self._lib.d2i_PUBKEY_bio(der_bio.bio, self._ffi.NULL)
    if pkey != self._ffi.NULL:
        pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
        return self._evp_pkey_to_public_key(pkey)

    # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
    # need to check to see if it is a pure PKCS1 RSA public key (not
    # embedded in a subjectPublicKeyInfo)
    self._consume_errors()
    reset_status = self._lib.BIO_reset(der_bio.bio)
    self.openssl_assert(reset_status == 1)
    rsa_key = self._lib.d2i_RSAPublicKey_bio(der_bio.bio, self._ffi.NULL)
    if rsa_key != self._ffi.NULL:
        rsa_key = self._ffi.gc(rsa_key, self._lib.RSA_free)
        pkey = self._rsa_cdata_to_evp_pkey(rsa_key)
        return self._evp_pkey_to_public_key(pkey)

    self._handle_key_loading_error()

581 

def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
    """
    Convert a certificate object into an OpenSSL X509 cdata by
    serialising it to DER and parsing that with d2i_X509_bio.
    """
    der_bytes = cert.public_bytes(serialization.Encoding.DER)
    der_bio = self._bytes_to_bio(der_bytes)
    x509_cdata = self._lib.d2i_X509_bio(der_bio.bio, self._ffi.NULL)
    self.openssl_assert(x509_cdata != self._ffi.NULL)
    return self._ffi.gc(x509_cdata, self._lib.X509_free)

589 

def _ossl2cert(self, x509_ptr: typing.Any) -> x509.Certificate:
    """
    Convert an OpenSSL X509 cdata into a certificate object by writing it
    as DER into a memory BIO and loading the resulting bytes.
    """
    out_bio = self._create_mem_bio_gc()
    write_status = self._lib.i2d_X509_bio(out_bio, x509_ptr)
    self.openssl_assert(write_status == 1)
    der_bytes = self._read_mem_bio(out_bio)
    return x509.load_der_x509_certificate(der_bytes)

595 

def _key2ossl(self, key: PKCS12PrivateKeyTypes) -> typing.Any:
    """
    Convert a private key object into an OpenSSL EVP_PKEY cdata by
    serialising it as unencrypted PKCS8 DER and parsing that with
    d2i_PrivateKey_bio.
    """
    der_bytes = key.private_bytes(
        serialization.Encoding.DER,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption(),
    )
    der_bio = self._bytes_to_bio(der_bytes)

    pkey = self._lib.d2i_PrivateKey_bio(der_bio.bio, self._ffi.NULL)
    self.openssl_assert(pkey != self._ffi.NULL)
    return self._ffi.gc(pkey, self._lib.EVP_PKEY_free)

610 

611 def _load_key( 

612 self, openssl_read_func, data, password, unsafe_skip_rsa_key_validation 

613 ) -> PrivateKeyTypes: 

614 mem_bio = self._bytes_to_bio(data) 

615 

616 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *") 

617 if password is not None: 

618 utils._check_byteslike("password", password) 

619 password_ptr = self._ffi.from_buffer(password) 

620 userdata.password = password_ptr 

621 userdata.length = len(password) 

622 

623 evp_pkey = openssl_read_func( 

624 mem_bio.bio, 

625 self._ffi.NULL, 

626 self._ffi.addressof( 

627 self._lib._original_lib, "Cryptography_pem_password_cb" 

628 ), 

629 userdata, 

630 ) 

631 

632 if evp_pkey == self._ffi.NULL: 

633 if userdata.error != 0: 

634 self._consume_errors() 

635 if userdata.error == -1: 

636 raise TypeError( 

637 "Password was not given but private key is encrypted" 

638 ) 

639 else: 

640 assert userdata.error == -2 

641 raise ValueError( 

642 "Passwords longer than {} bytes are not supported " 

643 "by this backend.".format(userdata.maxsize - 1) 

644 ) 

645 else: 

646 self._handle_key_loading_error() 

647 

648 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 

649 

650 if password is not None and userdata.called == 0: 

651 raise TypeError( 

652 "Password was given but private key is not encrypted." 

653 ) 

654 

655 assert ( 

656 password is not None and userdata.called == 1 

657 ) or password is None 

658 

659 return self._evp_pkey_to_private_key( 

660 evp_pkey, unsafe_skip_rsa_key_validation 

661 ) 

662 

663 def _handle_key_loading_error(self) -> typing.NoReturn: 

664 errors = self._consume_errors() 

665 

666 if not errors: 

667 raise ValueError( 

668 "Could not deserialize key data. The data may be in an " 

669 "incorrect format or it may be encrypted with an unsupported " 

670 "algorithm." 

671 ) 

672 

673 elif ( 

674 errors[0]._lib_reason_match( 

675 self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT 

676 ) 

677 or errors[0]._lib_reason_match( 

678 self._lib.ERR_LIB_PKCS12, 

679 self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR, 

680 ) 

681 or ( 

682 self._lib.Cryptography_HAS_PROVIDERS 

683 and errors[0]._lib_reason_match( 

684 self._lib.ERR_LIB_PROV, 

685 self._lib.PROV_R_BAD_DECRYPT, 

686 ) 

687 ) 

688 ): 

689 raise ValueError("Bad decrypt. Incorrect password?") 

690 

691 elif any( 

692 error._lib_reason_match( 

693 self._lib.ERR_LIB_EVP, 

694 self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM, 

695 ) 

696 for error in errors 

697 ): 

698 raise ValueError("Unsupported public key algorithm.") 

699 

700 else: 

701 raise ValueError( 

702 "Could not deserialize key data. The data may be in an " 

703 "incorrect format, it may be encrypted with an unsupported " 

704 "algorithm, or it may be an unsupported key type (e.g. EC " 

705 "curves with explicit parameters).", 

706 errors, 

707 ) 

708 

def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
    """
    Report whether an elliptic curve is usable: under FIPS it must be one
    of the allowlisted curve types, and the Rust layer must support it.
    """
    if self._fips_enabled:
        if not isinstance(curve, self._fips_ecdh_curves):
            return False

    return rust_openssl.ec.curve_supported(curve)

716 

def elliptic_curve_signature_algorithm_supported(
    self,
    signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
    curve: ec.EllipticCurve,
) -> bool:
    """
    Report whether a signature algorithm is supported on a curve.

    Only ECDSA is supported. The curve must be supported, and the digest
    must be either Prehashed or a supported hash.
    """
    # We only support ECDSA right now.
    if not isinstance(signature_algorithm, ec.ECDSA):
        return False

    curve_ok = self.elliptic_curve_supported(curve)
    if not curve_ok:
        return curve_ok

    digest = signature_algorithm.algorithm
    return isinstance(
        digest, asym_utils.Prehashed
    ) or self.hash_supported(digest)

730 

def elliptic_curve_exchange_algorithm_supported(
    self, algorithm: ec.ECDH, curve: ec.EllipticCurve
) -> bool:
    """
    Report whether a key exchange algorithm is supported on a curve: the
    curve must be supported and the algorithm must be an ECDH instance.
    """
    curve_ok = self.elliptic_curve_supported(curve)
    return curve_ok and isinstance(algorithm, ec.ECDH)

737 

def dh_supported(self) -> bool:
    """
    Report whether DH is available, which is the case on everything
    except BoringSSL.
    """
    is_boringssl = self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    return not is_boringssl

740 

def dh_parameters_supported(
    self, p: int, g: int, q: int | None = None
) -> bool:
    """
    Report whether DH parameters are accepted, i.e. whether building them
    and handing them to the Rust layer completes without a ValueError.
    """
    try:
        parameter_numbers = dh.DHParameterNumbers(p=p, g=g, q=q)
        rust_openssl.dh.from_parameter_numbers(parameter_numbers)
    except ValueError:
        return False
    return True

752 

def dh_x942_serialization_supported(self) -> bool:
    """
    Report whether the binding's Cryptography_HAS_EVP_PKEY_DHX flag
    equals 1.
    """
    has_dhx = self._lib.Cryptography_HAS_EVP_PKEY_DHX
    return has_dhx == 1

755 

def x25519_supported(self) -> bool:
    """
    Report whether X25519 is available, which is the case whenever FIPS
    is not enabled.
    """
    return not self._fips_enabled

760 

def x448_supported(self) -> bool:
    """
    Report whether X448 is available: never under FIPS, and otherwise
    only when the library is neither LibreSSL nor BoringSSL.
    """
    if self._fips_enabled:
        return False
    return not (
        self._lib.CRYPTOGRAPHY_IS_LIBRESSL
        or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    )

768 

def ed25519_supported(self) -> bool:
    """
    Report whether Ed25519 is available, which is the case whenever FIPS
    is not enabled.
    """
    return not self._fips_enabled

773 

def ed448_supported(self) -> bool:
    """
    Report whether Ed448 is available: never under FIPS, and otherwise
    only when the library is neither LibreSSL nor BoringSSL.
    """
    if self._fips_enabled:
        return False
    return not (
        self._lib.CRYPTOGRAPHY_IS_LIBRESSL
        or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    )

781 

def aead_cipher_supported(self, cipher) -> bool:
    """
    Report whether an AEAD cipher is supported, as decided by
    aead._aead_cipher_supported for this backend.
    """
    supported = aead._aead_cipher_supported(self, cipher)
    return supported

784 

785 def _zero_data(self, data, length: int) -> None: 

786 # We clear things this way because at the moment we're not 

787 # sure of a better way that can guarantee it overwrites the 

788 # memory of a bytearray and doesn't just replace the underlying char *. 

789 for i in range(length): 

790 data[i] = 0 

791 

792 @contextlib.contextmanager 

793 def _zeroed_null_terminated_buf(self, data): 

794 """ 

795 This method takes bytes, which can be a bytestring or a mutable 

796 buffer like a bytearray, and yields a null-terminated version of that 

797 data. This is required because PKCS12_parse doesn't take a length with 

798 its password char * and ffi.from_buffer doesn't provide null 

799 termination. So, to support zeroing the data via bytearray we 

800 need to build this ridiculous construct that copies the memory, but 

801 zeroes it after use. 

802 """ 

803 if data is None: 

804 yield self._ffi.NULL 

805 else: 

806 data_len = len(data) 

807 buf = self._ffi.new("char[]", data_len + 1) 

808 self._ffi.memmove(buf, data, data_len) 

809 try: 

810 yield buf 

811 finally: 

812 # Cast to a uint8_t * so we can assign by integer 

813 self._zero_data(self._ffi.cast("uint8_t *", buf), data_len) 

814 

def load_key_and_certificates_from_pkcs12(
    self, data: bytes, password: bytes | None
) -> tuple[
    PrivateKeyTypes | None,
    x509.Certificate | None,
    list[x509.Certificate],
]:
    """
    Load PKCS12 data and return a (key, certificate, additional
    certificates) tuple of plain certificate objects, built from the
    result of load_pkcs12.
    """
    parsed = self.load_pkcs12(data, password)
    private_key = parsed.key
    main_cert = parsed.cert.certificate if parsed.cert else None
    extra_certs = [entry.certificate for entry in parsed.additional_certs]
    return (private_key, main_cert, extra_certs)

828 

def load_pkcs12(
    self, data: bytes, password: bytes | None
) -> PKCS12KeyAndCertificates:
    """
    Parse PKCS12 data into its private key, certificate, and additional
    certificates.

    Raises ValueError if the data cannot be deserialized or if
    PKCS12_parse reports failure.
    """
    if password is not None:
        utils._check_byteslike("password", password)

    p12_bio = self._bytes_to_bio(data)
    p12 = self._lib.d2i_PKCS12_bio(p12_bio.bio, self._ffi.NULL)
    if p12 == self._ffi.NULL:
        self._consume_errors()
        raise ValueError("Could not deserialize PKCS12 data")
    p12 = self._ffi.gc(p12, self._lib.PKCS12_free)

    # Output slots filled in by PKCS12_parse.
    pkey_out = self._ffi.new("EVP_PKEY **")
    cert_out = self._ffi.new("X509 **")
    chain_out = self._ffi.new("Cryptography_STACK_OF_X509 **")
    with self._zeroed_null_terminated_buf(password) as password_buf:
        parse_status = self._lib.PKCS12_parse(
            p12, password_buf, pkey_out, cert_out, chain_out
        )
    if parse_status == 0:
        self._consume_errors()
        raise ValueError("Invalid password or PKCS12 data")

    key = None
    cert = None
    additional_certificates = []

    if pkey_out[0] != self._ffi.NULL:
        pkey = self._ffi.gc(pkey_out[0], self._lib.EVP_PKEY_free)
        # We don't support turning off RSA key validation when loading
        # PKCS12 keys
        key = self._evp_pkey_to_private_key(
            pkey, unsafe_skip_rsa_key_validation=False
        )

    if cert_out[0] != self._ffi.NULL:
        cert_cdata = self._ffi.gc(cert_out[0], self._lib.X509_free)
        cert_obj = self._ossl2cert(cert_cdata)
        alias = None
        alias_ptr = self._lib.X509_alias_get0(cert_cdata, self._ffi.NULL)
        if alias_ptr != self._ffi.NULL:
            alias = self._ffi.string(alias_ptr)
        cert = PKCS12Certificate(cert_obj, alias)

    if chain_out[0] != self._ffi.NULL:
        chain = self._ffi.gc(chain_out[0], self._lib.sk_X509_free)
        chain_len = self._lib.sk_X509_num(chain_out[0])

        # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
        # certificates.
        keeps_order = (
            self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
            or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
        )
        if keeps_order:
            positions = range(chain_len)
        else:
            positions = reversed(range(chain_len))

        for position in positions:
            entry = self._lib.sk_X509_value(chain, position)
            self.openssl_assert(entry != self._ffi.NULL)
            entry = self._ffi.gc(entry, self._lib.X509_free)
            entry_cert = self._ossl2cert(entry)
            entry_alias = None
            alias_ptr = self._lib.X509_alias_get0(entry, self._ffi.NULL)
            if alias_ptr != self._ffi.NULL:
                entry_alias = self._ffi.string(alias_ptr)
            additional_certificates.append(
                PKCS12Certificate(entry_cert, entry_alias)
            )

    return PKCS12KeyAndCertificates(key, cert, additional_certificates)

903 

def serialize_key_and_certificates_to_pkcs12(
    self,
    name: bytes | None,
    key: PKCS12PrivateKeyTypes | None,
    cert: x509.Certificate | None,
    cas: list[_PKCS12CATypes] | None,
    encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
    """Build a PKCS12 structure with OpenSSL and return its DER bytes.

    Parameters:
        name: Optional name handed to ``PKCS12_create``; must be bytes
            when given.
        key: Optional private key, converted with ``_key2ossl``.
        cert: Optional certificate, converted with ``_cert2ossl``.
        cas: Optional additional certificates. ``PKCS12Certificate``
            entries also have their ``friendly_name`` applied through
            ``X509_alias_set1``.
        encryption_algorithm: ``NoEncryption``,
            ``BestAvailableEncryption`` or a PKCS12-format encryption
            builder result; selects the NIDs, iteration counts, MAC
            algorithm and password.

    Raises:
        ValueError: If ``encryption_algorithm`` is of an unsupported type.
        UnsupportedAlgorithm: If PBESv2 or a custom MAC algorithm is
            requested but the linked OpenSSL does not offer it.

    Failures reported by OpenSSL itself surface through
    ``openssl_assert``.
    """
    password = None
    if name is not None:
        utils._check_bytes("name", name)

    if isinstance(encryption_algorithm, serialization.NoEncryption):
        nid_cert = -1
        nid_key = -1
        pkcs12_iter = 0
        mac_iter = 0
        mac_alg = self._ffi.NULL
    elif isinstance(
        encryption_algorithm, serialization.BestAvailableEncryption
    ):
        # PKCS12 encryption is hopeless trash and can never be fixed.
        # OpenSSL 3 supports PBESv2, but Libre and Boring do not, so
        # we use PBESv1 with 3DES on the older paths.
        if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
            nid_cert = self._lib.NID_aes_256_cbc
            nid_key = self._lib.NID_aes_256_cbc
        else:
            nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        # At least we can set this higher than OpenSSL's default
        pkcs12_iter = 20000
        # mac_iter chosen for compatibility reasons, see:
        # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
        # Did we mention how lousy PKCS12 encryption is?
        mac_iter = 1
        # MAC algorithm can only be set on OpenSSL 3.0.0+
        mac_alg = self._ffi.NULL
        password = encryption_algorithm.password
    elif (
        isinstance(
            encryption_algorithm, serialization._KeySerializationEncryption
        )
        and encryption_algorithm._format
        is serialization.PrivateFormat.PKCS12
    ):
        # Default to OpenSSL's defaults. Behavior will vary based on the
        # version of OpenSSL cryptography is compiled against.
        nid_cert = 0
        nid_key = 0
        # Use the default iters we use in best available
        pkcs12_iter = 20000
        # See the Best Available comment for why this is 1
        mac_iter = 1
        password = encryption_algorithm.password
        keycertalg = encryption_algorithm._key_cert_algorithm
        if keycertalg is PBES.PBESv1SHA1And3KeyTripleDESCBC:
            nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
            nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        elif keycertalg is PBES.PBESv2SHA256AndAES256CBC:
            if not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
                raise UnsupportedAlgorithm(
                    "PBESv2 is not supported by this version of OpenSSL"
                )
            nid_cert = self._lib.NID_aes_256_cbc
            nid_key = self._lib.NID_aes_256_cbc
        else:
            assert keycertalg is None
            # We use OpenSSL's defaults

        if encryption_algorithm._hmac_hash is not None:
            if not self._lib.Cryptography_HAS_PKCS12_SET_MAC:
                raise UnsupportedAlgorithm(
                    "Setting MAC algorithm is not supported by this "
                    "version of OpenSSL."
                )
            mac_alg = self._evp_md_non_null_from_algorithm(
                encryption_algorithm._hmac_hash
            )
            self.openssl_assert(mac_alg != self._ffi.NULL)
        else:
            mac_alg = self._ffi.NULL

        if encryption_algorithm._kdf_rounds is not None:
            pkcs12_iter = encryption_algorithm._kdf_rounds

    else:
        raise ValueError("Unsupported key encryption type")

    if cas is None or len(cas) == 0:
        sk_x509 = self._ffi.NULL
    else:
        sk_x509 = self._lib.sk_X509_new_null()
        sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)

        # This list is to keep the x509 values alive until end of function
        ossl_cas = []
        for ca in cas:
            if isinstance(ca, PKCS12Certificate):
                ca_alias = ca.friendly_name
                ossl_ca = self._cert2ossl(ca.certificate)
                if ca_alias is None:
                    res = self._lib.X509_alias_set1(
                        ossl_ca, self._ffi.NULL, -1
                    )
                else:
                    res = self._lib.X509_alias_set1(
                        ossl_ca, ca_alias, len(ca_alias)
                    )
                self.openssl_assert(res == 1)
            else:
                ossl_ca = self._cert2ossl(ca)
            ossl_cas.append(ossl_ca)
            res = self._lib.sk_X509_push(sk_x509, ossl_ca)
            # Use this instance rather than the module-level global so the
            # method behaves the same for any Backend object.
            self.openssl_assert(res >= 1)

    with self._zeroed_null_terminated_buf(password) as password_buf:
        with self._zeroed_null_terminated_buf(name) as name_buf:
            ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
            ossl_pkey = (
                self._key2ossl(key) if key is not None else self._ffi.NULL
            )

            p12 = self._lib.PKCS12_create(
                password_buf,
                name_buf,
                ossl_pkey,
                ossl_cert,
                sk_x509,
                nid_key,
                nid_cert,
                pkcs12_iter,
                mac_iter,
                0,
            )

        # Validate the result *before* touching it again: handing a NULL
        # PKCS12 pointer to PKCS12_set_mac would dereference NULL inside
        # OpenSSL. Registering the destructor right away also ensures the
        # structure is freed if the MAC step below fails.
        self.openssl_assert(p12 != self._ffi.NULL)
        p12 = self._ffi.gc(p12, self._lib.PKCS12_free)

        # password_buf is still needed here, so this must stay inside the
        # outer ``with`` (the buffer is zeroed when that block exits).
        if (
            self._lib.Cryptography_HAS_PKCS12_SET_MAC
            and mac_alg != self._ffi.NULL
        ):
            res = self._lib.PKCS12_set_mac(
                p12,
                password_buf,
                -1,
                self._ffi.NULL,
                0,
                mac_iter,
                mac_alg,
            )
            # Do not silently emit a file whose MAC was not replaced with
            # the algorithm the caller asked for.
            self.openssl_assert(res == 1)

    bio = self._create_mem_bio_gc()
    res = self._lib.i2d_PKCS12_bio(bio, p12)
    self.openssl_assert(res > 0)
    return self._read_mem_bio(bio)

1061 

def poly1305_supported(self) -> bool:
    """Report whether Poly1305 can be used with this backend.

    Always ``False`` when ``_fips_enabled`` is set. Otherwise ``True``
    for BoringSSL and LibreSSL builds, and for any other build only when
    the binding's ``Cryptography_HAS_POLY1305`` flag equals 1.
    """
    if self._fips_enabled:
        return False

    lib = self._lib
    if lib.CRYPTOGRAPHY_IS_BORINGSSL or lib.CRYPTOGRAPHY_IS_LIBRESSL:
        return True

    return lib.Cryptography_HAS_POLY1305 == 1

1072 

def pkcs7_supported(self) -> bool:
    """Return ``True`` unless the binding reports a BoringSSL build."""
    is_boringssl = self._lib.CRYPTOGRAPHY_IS_BORINGSSL
    return not is_boringssl

1075 

def load_pem_pkcs7_certificates(
    self, data: bytes
) -> list[x509.Certificate]:
    """Read a PKCS7 structure from PEM ``data`` and return its certificates.

    ``data`` is checked with ``utils._check_bytes`` first. A ``ValueError``
    is raised (after clearing the OpenSSL error queue) when
    ``PEM_read_bio_PKCS7`` yields NULL.
    """
    utils._check_bytes("data", data)
    mem_bio = self._bytes_to_bio(data)
    null = self._ffi.NULL
    pkcs7 = self._lib.PEM_read_bio_PKCS7(mem_bio.bio, null, null, null)

    if pkcs7 != null:
        # Tie the structure's lifetime to the Python object so PKCS7_free
        # runs when it is garbage collected.
        owned = self._ffi.gc(pkcs7, self._lib.PKCS7_free)
        return self._load_pkcs7_certificates(owned)

    self._consume_errors()
    raise ValueError("Unable to parse PKCS7 data")

1090 

def load_der_pkcs7_certificates(
    self, data: bytes
) -> list[x509.Certificate]:
    """Read a PKCS7 structure from DER ``data`` and return its certificates.

    ``data`` is checked with ``utils._check_bytes`` first. A ``ValueError``
    is raised (after clearing the OpenSSL error queue) when
    ``d2i_PKCS7_bio`` yields NULL.
    """
    utils._check_bytes("data", data)
    mem_bio = self._bytes_to_bio(data)
    null = self._ffi.NULL
    pkcs7 = self._lib.d2i_PKCS7_bio(mem_bio.bio, null)

    if pkcs7 != null:
        # Tie the structure's lifetime to the Python object so PKCS7_free
        # runs when it is garbage collected.
        owned = self._ffi.gc(pkcs7, self._lib.PKCS7_free)
        return self._load_pkcs7_certificates(owned)

    self._consume_errors()
    raise ValueError("Unable to parse PKCS7 data")

1103 

1104 def _load_pkcs7_certificates(self, p7) -> list[x509.Certificate]: 

1105 nid = self._lib.OBJ_obj2nid(p7.type) 

1106 self.openssl_assert(nid != self._lib.NID_undef) 

1107 if nid != self._lib.NID_pkcs7_signed: 

1108 raise UnsupportedAlgorithm( 

1109 "Only basic signed structures are currently supported. NID" 

1110 f" for this data was {nid}", 

1111 _Reasons.UNSUPPORTED_SERIALIZATION, 

1112 ) 

1113 

1114 if p7.d.sign == self._ffi.NULL: 

1115 raise ValueError( 

1116 "The provided PKCS7 has no certificate data, but a cert " 

1117 "loading method was called." 

1118 ) 

1119 

1120 sk_x509 = p7.d.sign.cert 

1121 num = self._lib.sk_X509_num(sk_x509) 

1122 certs: list[x509.Certificate] = [] 

1123 for i in range(num): 

1124 x509 = self._lib.sk_X509_value(sk_x509, i) 

1125 self.openssl_assert(x509 != self._ffi.NULL) 

1126 cert = self._ossl2cert(x509) 

1127 certs.append(cert) 

1128 

1129 return certs 

1130 

1131 

class GetCipherByName:
    """Callable that looks up an OpenSSL EVP cipher from a name template.

    The template is expanded with ``str.format`` using the ``cipher`` and
    ``mode`` objects, lower-cased and ASCII-encoded before being handed to
    OpenSSL.
    """

    def __init__(self, fmt: str):
        self._fmt = fmt

    def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
        lib = backend._lib
        null = backend._ffi.NULL

        name_text = self._fmt.format(cipher=cipher, mode=mode).lower()
        name_bytes = name_text.encode("ascii")

        found = lib.EVP_get_cipherbyname(name_bytes)

        # try EVP_CIPHER_fetch if present
        if found == null and lib.Cryptography_HAS_300_EVP_CIPHER:
            found = lib.EVP_CIPHER_fetch(null, name_bytes, null)

        # Clear the OpenSSL error queue before handing the result
        # (possibly still NULL) back to the caller.
        backend._consume_errors()
        return found

1155 

1156 

1157def _get_xts_cipher(backend: Backend, cipher: AES, mode): 

1158 cipher_name = f"aes-{cipher.key_size // 2}-xts" 

1159 return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) 

1160 

1161 

# Module-level Backend instance, constructed once when this module is
# imported.
backend = Backend()