Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/crypto.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

769 statements  

1from __future__ import annotations 

2 

3import calendar 

4import datetime 

5import functools 

6import sys 

7import typing 

8from base64 import b16encode 

9from collections.abc import Sequence 

10from functools import partial 

11from typing import ( 

12 Any, 

13 Callable, 

14 Union, 

15) 

16 

# Pick an implementation of the ``deprecated`` decorator for this interpreter.
if sys.version_info >= (3, 13):
    from warnings import deprecated
elif sys.version_info < (3, 8):
    # The TypeVar's declared name must match the variable it is bound to,
    # otherwise static type checkers reject it.
    _T = typing.TypeVar("_T")

    def deprecated(msg: str, **kwargs: object) -> Callable[[_T], _T]:
        """
        Fallback for interpreters older than 3.8: return a decorator that
        hands the decorated object back unchanged.

        :param msg: Deprecation message; accepted but unused.
        :param kwargs: Extra keyword arguments; accepted but unused.
        """
        return lambda f: f
else:
    from typing_extensions import deprecated

26 

27from cryptography import utils, x509 

28from cryptography.hazmat.primitives.asymmetric import ( 

29 dsa, 

30 ec, 

31 ed448, 

32 ed25519, 

33 rsa, 

34) 

35 

36from OpenSSL._util import StrOrBytesPath 

37from OpenSSL._util import ( 

38 byte_string as _byte_string, 

39) 

40from OpenSSL._util import ( 

41 exception_from_error_queue as _exception_from_error_queue, 

42) 

43from OpenSSL._util import ( 

44 ffi as _ffi, 

45) 

46from OpenSSL._util import ( 

47 lib as _lib, 

48) 

49from OpenSSL._util import ( 

50 make_assert as _make_assert, 

51) 

52from OpenSSL._util import ( 

53 path_bytes as _path_bytes, 

54) 

55 

# Names exported by ``from OpenSSL.crypto import *``.
__all__ = [
    "FILETYPE_ASN1",
    "FILETYPE_PEM",
    "FILETYPE_TEXT",
    "TYPE_DSA",
    "TYPE_RSA",
    "X509",
    "Error",
    "PKey",
    "X509Name",
    "X509Req",
    "X509Store",
    "X509StoreContext",
    "X509StoreContextError",
    "X509StoreFlags",
    "dump_certificate",
    "dump_certificate_request",
    "dump_privatekey",
    "dump_publickey",
    "get_elliptic_curve",
    "get_elliptic_curves",
    "load_certificate",
    "load_certificate_request",
    "load_privatekey",
    "load_publickey",
]


# Type aliases over the ``cryptography`` asymmetric key classes.
_PrivateKey = Union[
    dsa.DSAPrivateKey,
    ec.EllipticCurvePrivateKey,
    ed25519.Ed25519PrivateKey,
    ed448.Ed448PrivateKey,
    rsa.RSAPrivateKey,
]
_PublicKey = Union[
    dsa.DSAPublicKey,
    ec.EllipticCurvePublicKey,
    ed25519.Ed25519PublicKey,
    ed448.Ed448PublicKey,
    rsa.RSAPublicKey,
]
_Key = Union[_PrivateKey, _PublicKey]

# A passphrase is either literal bytes or a callable producing bytes.
PassphraseCallableT = Union[bytes, Callable[..., bytes]]


# Serialization format selectors, read from the OpenSSL bindings.
FILETYPE_PEM: int = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1: int = _lib.SSL_FILETYPE_ASN1

# TODO This was an API mistake.  OpenSSL has no such constant.
FILETYPE_TEXT = 0xFFFF  # i.e. 2**16 - 1

# Key type identifiers, read from the OpenSSL bindings.
TYPE_RSA: int = _lib.EVP_PKEY_RSA
TYPE_DSA: int = _lib.EVP_PKEY_DSA
TYPE_DH: int = _lib.EVP_PKEY_DH
TYPE_EC: int = _lib.EVP_PKEY_EC

112 

113 

class Error(Exception):
    """Raised when an `OpenSSL.crypto` API call fails."""

118 

119 

# Assertion helper produced by ``_make_assert`` for this module's ``Error``.
_openssl_assert = _make_assert(Error)

# ``_exception_from_error_queue`` with ``Error`` pre-bound as its first
# argument, so call sites can invoke it with no arguments.
_raise_current_error = partial(_exception_from_error_queue, Error)

122 

123 

def _new_mem_buf(buffer: bytes | None = None) -> Any:
    """
    Create an OpenSSL memory BIO that is freed automatically on garbage
    collection.

    :param buffer: ``None`` for an empty BIO, or bytes to preload into the
        BIO so that they can be read back out.
    :return: The garbage-collected BIO handle.
    """
    if buffer is not None:
        backing = _ffi.new("char[]", buffer)
        raw_bio = _lib.BIO_new_mem_buf(backing, len(buffer))

        # The default argument holds a reference to ``backing`` so the
        # memory stays alive for as long as the BIO does.
        def release(handle: Any, keepalive: Any = backing) -> Any:
            return _lib.BIO_free(handle)

    else:
        raw_bio = _lib.BIO_new(_lib.BIO_s_mem())
        release = _lib.BIO_free

    _openssl_assert(raw_bio != _ffi.NULL)

    return _ffi.gc(raw_bio, release)

148 

149 

def _bio_to_string(bio: Any) -> bytes:
    """
    Return a Python byte string holding a copy of an OpenSSL BIO's contents.
    """
    data_ptr = _ffi.new("char**")
    size = _lib.BIO_get_mem_data(bio, data_ptr)
    view = _ffi.buffer(data_ptr[0], size)
    # Slicing the buffer copies the data into a ``bytes`` object.
    return view[:]

157 

158 

159def _set_asn1_time(boundary: Any, when: bytes) -> None: 

160 """ 

161 The the time value of an ASN1 time object. 

162 

163 @param boundary: An ASN1_TIME pointer (or an object safely 

164 castable to that type) which will have its value set. 

165 @param when: A string representation of the desired time value. 

166 

167 @raise TypeError: If C{when} is not a L{bytes} string. 

168 @raise ValueError: If C{when} does not represent a time in the required 

169 format. 

170 @raise RuntimeError: If the time value cannot be set for some other 

171 (unspecified) reason. 

172 """ 

173 if not isinstance(when, bytes): 

174 raise TypeError("when must be a byte string") 

175 # ASN1_TIME_set_string validates the string without writing anything 

176 # when the destination is NULL. 

177 _openssl_assert(boundary != _ffi.NULL) 

178 

179 set_result = _lib.ASN1_TIME_set_string(boundary, when) 

180 if set_result == 0: 

181 raise ValueError("Invalid string") 

182 

183 

def _new_asn1_time(when: bytes) -> Any:
    """
    Allocate a fresh ASN1_TIME object and set it via L{_set_asn1_time}.

    @param when: A string representation of the desired time value.

    @return: The new, garbage-collected ASN1_TIME object.

    @raise TypeError: If C{when} is not a L{bytes} string.
    @raise ValueError: If C{when} does not represent a time in the required
        format.
    """
    raw_time = _lib.ASN1_TIME_new()
    _openssl_assert(raw_time != _ffi.NULL)
    asn1_time = _ffi.gc(raw_time, _lib.ASN1_TIME_free)
    _set_asn1_time(asn1_time, when)
    return asn1_time

201 

202 

def _get_asn1_time(timestamp: Any) -> bytes | None:
    """
    Read the time value out of an ASN1 time object.

    @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to
        that type) from which the time value will be retrieved.

    @return: The time value from C{timestamp} as a L{bytes} string in
        generalized-time form, or C{None} if the object holds no time value.
    """
    as_string = _ffi.cast("ASN1_STRING*", timestamp)

    # A zero-length string means no time value is stored.
    if _lib.ASN1_STRING_length(as_string) == 0:
        return None

    # Already a generalized time: return its data directly.
    if _lib.ASN1_STRING_type(as_string) == _lib.V_ASN1_GENERALIZEDTIME:
        return _ffi.string(_lib.ASN1_STRING_get0_data(as_string))

    # Any other time type is converted to generalized time first.
    converted = _ffi.new("ASN1_GENERALIZEDTIME**")
    _lib.ASN1_TIME_to_generalizedtime(timestamp, converted)
    _openssl_assert(converted[0] != _ffi.NULL)

    converted_string = _ffi.cast("ASN1_STRING*", converted[0])
    raw_data = _lib.ASN1_STRING_get0_data(converted_string)
    # Copy the data out before the temporary object is released.
    time_bytes = _ffi.string(raw_data)
    _lib.ASN1_GENERALIZEDTIME_free(converted[0])
    return time_bytes

230 

231 

232class _X509NameInvalidator: 

233 def __init__(self) -> None: 

234 self._names: list[X509Name] = [] 

235 

236 def add(self, name: X509Name) -> None: 

237 self._names.append(name) 

238 

239 def clear(self) -> None: 

240 for name in self._names: 

241 # Breaks the object, but also prevents UAF! 

242 del name._name 

243 

244 

class PKey:
    """
    A class representing a DSA or RSA public key or key pair.
    """

    # Class-level defaults, so that instances created without running
    # ``__init__`` (via ``PKey.__new__``) still carry both flags.
    _only_public = False
    _initialized = True

    def __init__(self) -> None:
        """
        Allocate an empty ``EVP_PKEY`` and mark this object uninitialized
        until :py:meth:`generate_key` succeeds.
        """
        pkey = _lib.EVP_PKEY_new()
        _openssl_assert(pkey != _ffi.NULL)
        self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
        self._initialized = False

    def to_cryptography_key(self) -> _Key:
        """
        Export as a ``cryptography`` key.

        :rtype: One of ``cryptography``'s `key interfaces`_.

        .. _key interfaces: https://cryptography.io/en/latest/hazmat/\
            primitives/asymmetric/rsa/#key-interfaces

        .. versionadded:: 16.1.0
        """
        from cryptography.hazmat.primitives.serialization import (
            load_der_private_key,
            load_der_public_key,
        )

        # Round-trip through DER; which dump is used depends on whether
        # this object holds only the public half.
        if self._only_public:
            der = dump_publickey(FILETYPE_ASN1, self)
            return typing.cast(_Key, load_der_public_key(der))
        else:
            der = dump_privatekey(FILETYPE_ASN1, self)
            return typing.cast(_Key, load_der_private_key(der, password=None))

    @classmethod
    def from_cryptography_key(cls, crypto_key: _Key) -> PKey:
        """
        Construct based on a ``cryptography`` *crypto_key*.

        :param crypto_key: A ``cryptography`` key.
        :type crypto_key: One of ``cryptography``'s `key interfaces`_.

        :rtype: PKey

        :raises TypeError: If *crypto_key* is not one of the supported
            DSA, EC, Ed25519, Ed448 or RSA key types.

        .. versionadded:: 16.1.0
        """
        public_types = (
            dsa.DSAPublicKey,
            ec.EllipticCurvePublicKey,
            ed25519.Ed25519PublicKey,
            ed448.Ed448PublicKey,
            rsa.RSAPublicKey,
        )
        private_types = (
            dsa.DSAPrivateKey,
            ec.EllipticCurvePrivateKey,
            ed25519.Ed25519PrivateKey,
            ed448.Ed448PrivateKey,
            rsa.RSAPrivateKey,
        )
        if not isinstance(crypto_key, public_types + private_types):
            raise TypeError("Unsupported key type")

        from cryptography.hazmat.primitives.serialization import (
            Encoding,
            NoEncryption,
            PrivateFormat,
            PublicFormat,
        )

        if isinstance(crypto_key, public_types):
            return load_publickey(
                FILETYPE_ASN1,
                crypto_key.public_bytes(
                    Encoding.DER, PublicFormat.SubjectPublicKeyInfo
                ),
            )
        else:
            der = crypto_key.private_bytes(
                Encoding.DER, PrivateFormat.PKCS8, NoEncryption()
            )
            return load_privatekey(FILETYPE_ASN1, der)

    def generate_key(self, type: int, bits: int) -> None:
        """
        Generate a key pair of the given type, with the given number of bits.

        This generates a key "into" this object.

        :param type: The key type.
        :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA`
        :param bits: The number of bits.
        :type bits: :py:data:`int` ``>= 0``
        :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't
            of the appropriate type.
        :raises ValueError: If the number of bits isn't an integer of
            the appropriate size.
        :raises Error: If :py:data:`type` is neither :py:data:`TYPE_RSA`
            nor :py:data:`TYPE_DSA`.
        :return: ``None``
        """
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if not isinstance(bits, int):
            raise TypeError("bits must be an integer")

        if type == TYPE_RSA:
            if bits <= 0:
                raise ValueError("Invalid number of bits")

            exponent = _lib.BN_new()
            _openssl_assert(exponent != _ffi.NULL)
            exponent = _ffi.gc(exponent, _lib.BN_free)
            _openssl_assert(_lib.BN_set_word(exponent, _lib.RSA_F4) == 1)

            rsa_key = _lib.RSA_new()
            _openssl_assert(rsa_key != _ffi.NULL)

            # The EVP_PKEY only takes ownership of the RSA structure once
            # the assignment succeeds; on any earlier failure it must be
            # released here or it would leak.
            owned_by_pkey = False
            try:
                result = _lib.RSA_generate_key_ex(
                    rsa_key, bits, exponent, _ffi.NULL
                )
                _openssl_assert(result == 1)

                result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa_key)
                _openssl_assert(result == 1)
                owned_by_pkey = True
            finally:
                if not owned_by_pkey:
                    _lib.RSA_free(rsa_key)

        elif type == TYPE_DSA:
            dsa_key = _lib.DSA_new()
            _openssl_assert(dsa_key != _ffi.NULL)

            # ``set1`` below does not take over this reference, so the
            # local one is released by the garbage collector.
            dsa_key = _ffi.gc(dsa_key, _lib.DSA_free)
            res = _lib.DSA_generate_parameters_ex(
                dsa_key, bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL
            )
            _openssl_assert(res == 1)

            _openssl_assert(_lib.DSA_generate_key(dsa_key) == 1)
            _openssl_assert(_lib.EVP_PKEY_set1_DSA(self._pkey, dsa_key) == 1)
        else:
            raise Error("No such key type")

        self._initialized = True

    def check(self) -> bool:
        """
        Check the consistency of an RSA private key.

        This is the Python equivalent of OpenSSL's ``RSA_check_key``.

        :return: ``True`` if key is consistent.

        :raise OpenSSL.crypto.Error: if the key is inconsistent.

        :raise TypeError: if the key is of a type which cannot be checked.
            Only RSA keys can currently be checked.
        """
        if self._only_public:
            raise TypeError("public key only")

        if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA:
            raise TypeError("Only RSA keys can currently be checked.")

        rsa_key = _lib.EVP_PKEY_get1_RSA(self._pkey)
        _openssl_assert(rsa_key != _ffi.NULL)
        rsa_key = _ffi.gc(rsa_key, _lib.RSA_free)
        result = _lib.RSA_check_key(rsa_key)
        if result == 1:
            return True
        _raise_current_error()

    def type(self) -> int:
        """
        Returns the type of the key

        :return: The type of the key.
        """
        return _lib.EVP_PKEY_id(self._pkey)

    def bits(self) -> int:
        """
        Returns the number of bits of the key

        :return: The number of bits of the key.
        """
        return _lib.EVP_PKEY_bits(self._pkey)

436 

437 

438class _EllipticCurve: 

439 """ 

440 A representation of a supported elliptic curve. 

441 

442 @cvar _curves: :py:obj:`None` until an attempt is made to load the curves. 

443 Thereafter, a :py:type:`set` containing :py:type:`_EllipticCurve` 

444 instances each of which represents one curve supported by the system. 

445 @type _curves: :py:type:`NoneType` or :py:type:`set` 

446 """ 

447 

448 _curves = None 

449 

450 def __ne__(self, other: Any) -> bool: 

451 """ 

452 Implement cooperation with the right-hand side argument of ``!=``. 

453 

454 Python 3 seems to have dropped this cooperation in this very narrow 

455 circumstance. 

456 """ 

457 if isinstance(other, _EllipticCurve): 

458 return super().__ne__(other) 

459 return NotImplemented 

460 

461 @classmethod 

462 def _load_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]: 

463 """ 

464 Get the curves supported by OpenSSL. 

465 

466 :param lib: The OpenSSL library binding object. 

467 

468 :return: A :py:type:`set` of ``cls`` instances giving the names of the 

469 elliptic curves the underlying library supports. 

470 """ 

471 num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0) 

472 builtin_curves = _ffi.new("EC_builtin_curve[]", num_curves) 

473 # The return value on this call should be num_curves again. We 

474 # could check it to make sure but if it *isn't* then.. what could 

475 # we do? Abort the whole process, I suppose...? -exarkun 

476 lib.EC_get_builtin_curves(builtin_curves, num_curves) 

477 return set(cls.from_nid(lib, c.nid) for c in builtin_curves) 

478 

479 @classmethod 

480 def _get_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]: 

481 """ 

482 Get, cache, and return the curves supported by OpenSSL. 

483 

484 :param lib: The OpenSSL library binding object. 

485 

486 :return: A :py:type:`set` of ``cls`` instances giving the names of the 

487 elliptic curves the underlying library supports. 

488 """ 

489 if cls._curves is None: 

490 cls._curves = cls._load_elliptic_curves(lib) 

491 return cls._curves 

492 

493 @classmethod 

494 def from_nid(cls, lib: Any, nid: int) -> _EllipticCurve: 

495 """ 

496 Instantiate a new :py:class:`_EllipticCurve` associated with the given 

497 OpenSSL NID. 

498 

499 :param lib: The OpenSSL library binding object. 

500 

501 :param nid: The OpenSSL NID the resulting curve object will represent. 

502 This must be a curve NID (and not, for example, a hash NID) or 

503 subsequent operations will fail in unpredictable ways. 

504 :type nid: :py:class:`int` 

505 

506 :return: The curve object. 

507 """ 

508 return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii")) 

509 

510 def __init__(self, lib: Any, nid: int, name: str) -> None: 

511 """ 

512 :param _lib: The :py:mod:`cryptography` binding instance used to 

513 interface with OpenSSL. 

514 

515 :param _nid: The OpenSSL NID identifying the curve this object 

516 represents. 

517 :type _nid: :py:class:`int` 

518 

519 :param name: The OpenSSL short name identifying the curve this object 

520 represents. 

521 :type name: :py:class:`unicode` 

522 """ 

523 self._lib = lib 

524 self._nid = nid 

525 self.name = name 

526 

527 def __repr__(self) -> str: 

528 return f"<Curve {self.name!r}>" 

529 

530 def _to_EC_KEY(self) -> Any: 

531 """ 

532 Create a new OpenSSL EC_KEY structure initialized to use this curve. 

533 

534 The structure is automatically garbage collected when the Python object 

535 is garbage collected. 

536 """ 

537 key = self._lib.EC_KEY_new_by_curve_name(self._nid) 

538 return _ffi.gc(key, _lib.EC_KEY_free) 

539 

540 

@deprecated(
    "get_elliptic_curves is deprecated. You should use the APIs in "
    "cryptography instead."
)
def get_elliptic_curves() -> set[_EllipticCurve]:
    """
    Return the set of curve objects for the elliptic curves supported by the
    OpenSSL build in use.

    Each curve object identifies itself through its :py:class:`str`
    ``name`` attribute.

    The curve objects are useful as values for the argument accepted by
    :py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be
    used for ECDHE key exchange.
    """
    supported = _EllipticCurve._get_elliptic_curves(_lib)
    return supported

558 

559 

@deprecated(
    "get_elliptic_curve is deprecated. You should use the APIs in "
    "cryptography instead."
)
def get_elliptic_curve(name: str) -> _EllipticCurve:
    """
    Look up a single curve object by its OpenSSL short name.

    See :py:func:`get_elliptic_curves` for information about curve objects.

    :param name: The OpenSSL short name identifying the curve object to
        retrieve.
    :type name: :py:class:`str`

    If the named curve is not supported then :py:class:`ValueError` is raised.
    """
    candidates = (c for c in get_elliptic_curves() if c.name == name)
    found = next(candidates, None)
    if found is None:
        raise ValueError("unknown curve name", name)
    return found

580 

581 

@functools.total_ordering
class X509Name:
    """
    An X.509 Distinguished Name.

    :ivar countryName: The country of the entity.
    :ivar C: Alias for  :py:attr:`countryName`.

    :ivar stateOrProvinceName: The state or province of the entity.
    :ivar ST: Alias for :py:attr:`stateOrProvinceName`.

    :ivar localityName: The locality of the entity.
    :ivar L: Alias for :py:attr:`localityName`.

    :ivar organizationName: The organization name of the entity.
    :ivar O: Alias for :py:attr:`organizationName`.

    :ivar organizationalUnitName: The organizational unit of the entity.
    :ivar OU: Alias for :py:attr:`organizationalUnitName`

    :ivar commonName: The common name of the entity.
    :ivar CN: Alias for :py:attr:`commonName`.

    :ivar emailAddress: The e-mail address of the entity.
    """

    def __init__(self, name: X509Name) -> None:
        """
        Create a new X509Name, copying the given X509Name instance.

        :param name: The name to copy.
        :type name: :py:class:`X509Name`
        """
        copied = _lib.X509_NAME_dup(name._name)
        _openssl_assert(copied != _ffi.NULL)
        self._name: Any = _ffi.gc(copied, _lib.X509_NAME_free)

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Set a name component, replacing any existing entry of the same kind.

        Attribute names starting with an underscore are stored as ordinary
        Python attributes. Any other name is resolved to an OpenSSL NID.

        :raises TypeError: If *name* is a ``str`` subclass rather than
            exactly ``str``.
        :raises AttributeError: If *name* is not known to OpenSSL.
        """
        if name.startswith("_"):
            return super().__setattr__(name, value)

        # Note: we really do not want str subclasses here, so we do not use
        # isinstance.
        if type(name) is not str:
            # Report the type of the offending *name*, not of the value.
            raise TypeError(
                f"attribute name must be string, not "
                f"'{type(name).__name__:.200}'"
            )

        nid = _lib.OBJ_txt2nid(_byte_string(name))
        if nid == _lib.NID_undef:
            # The failed lookup leaves an entry on the error queue; drain
            # it so it does not surface in an unrelated later call.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        # If there's an old entry for this NID, remove it
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)
            ent_obj = _lib.X509_NAME_ENTRY_get_object(ent)
            ent_nid = _lib.OBJ_obj2nid(ent_obj)
            if nid == ent_nid:
                ent = _lib.X509_NAME_delete_entry(self._name, i)
                _lib.X509_NAME_ENTRY_free(ent)
                break

        if isinstance(value, str):
            value = value.encode("utf-8")

        add_result = _lib.X509_NAME_add_entry_by_NID(
            self._name, nid, _lib.MBSTRING_UTF8, value, len(value), -1, 0
        )
        if not add_result:
            _raise_current_error()

    def __getattr__(self, name: str) -> str | None:
        """
        Find attribute. An X509Name object has the following attributes:
        countryName (alias C), stateOrProvinceName (alias ST), localityName
        (alias L), organizationName (alias O), organizationalUnitName
        (alias OU), commonName (alias CN) and more...

        :return: The decoded value, or ``None`` if the name has no entry
            of that kind.
        :raises AttributeError: If *name* is not known to OpenSSL.
        """
        nid = _lib.OBJ_txt2nid(_byte_string(name))
        if nid == _lib.NID_undef:
            # This is a bit weird.  OBJ_txt2nid indicated failure, but it seems
            # a lower level function, a2d_ASN1_OBJECT, also feels the need to
            # push something onto the error queue.  If we don't clean that up
            # now, someone else will bump into it later and be quite confused.
            # See lp#314814.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1)
        if entry_index == -1:
            return None

        entry = _lib.X509_NAME_get_entry(self._name, entry_index)
        data = _lib.X509_NAME_ENTRY_get_data(entry)

        result_buffer = _ffi.new("unsigned char**")
        data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data)
        _openssl_assert(data_length >= 0)

        try:
            result = _ffi.buffer(result_buffer[0], data_length)[:].decode(
                "utf-8"
            )
        finally:
            # XXX untested
            _lib.OPENSSL_free(result_buffer[0])
        return result

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, X509Name):
            return NotImplemented

        return _lib.X509_NAME_cmp(self._name, other._name) == 0

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, X509Name):
            return NotImplemented

        return _lib.X509_NAME_cmp(self._name, other._name) < 0

    def __repr__(self) -> str:
        """
        String representation of an X509Name
        """
        result_buffer = _ffi.new("char[]", 512)
        format_result = _lib.X509_NAME_oneline(
            self._name, result_buffer, len(result_buffer)
        )
        _openssl_assert(format_result != _ffi.NULL)

        oneline = _ffi.string(result_buffer).decode("utf-8")
        return f"<X509Name object '{oneline}'>"

    def hash(self) -> int:
        """
        Return an integer representation of the first four bytes of the
        MD5 digest of the DER representation of the name.

        This is the Python equivalent of OpenSSL's ``X509_NAME_hash``.

        :return: The (integer) hash of this name.
        :rtype: :py:class:`int`
        """
        return _lib.X509_NAME_hash(self._name)

    def der(self) -> bytes:
        """
        Return the DER encoding of this name.

        :return: The DER encoded form of this name.
        :rtype: :py:class:`bytes`
        """
        result_buffer = _ffi.new("unsigned char**")
        encode_result = _lib.i2d_X509_NAME(self._name, result_buffer)
        _openssl_assert(encode_result >= 0)

        # Copy the encoding out before releasing the OpenSSL buffer.
        string_result = _ffi.buffer(result_buffer[0], encode_result)[:]
        _lib.OPENSSL_free(result_buffer[0])
        return string_result

    def get_components(self) -> list[tuple[bytes, bytes]]:
        """
        Returns the components of this name, as a sequence of 2-tuples.

        :return: The components of this name.
        :rtype: :py:class:`list` of ``name, value`` tuples.
        """
        result = []
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)

            fname = _lib.X509_NAME_ENTRY_get_object(ent)
            fval = _lib.X509_NAME_ENTRY_get_data(ent)

            nid = _lib.OBJ_obj2nid(fname)
            name = _lib.OBJ_nid2sn(nid)

            # ffi.string does not handle strings containing NULL bytes
            # (which may have been generated by old, broken software)
            value = _ffi.buffer(
                _lib.ASN1_STRING_get0_data(fval), _lib.ASN1_STRING_length(fval)
            )[:]
            result.append((_ffi.string(name), value))

        return result

775 

776 

@deprecated(
    "CSR support in pyOpenSSL is deprecated. You should use the APIs "
    "in cryptography."
)
class X509Req:
    """
    An X.509 certificate signing request.

    .. deprecated:: 24.2.0
        Use `cryptography.x509.CertificateSigningRequest` instead.
    """

    def __init__(self) -> None:
        """
        Allocate an empty request with its version set to 0.
        """
        req = _lib.X509_REQ_new()
        _openssl_assert(req != _ffi.NULL)
        self._req = _ffi.gc(req, _lib.X509_REQ_free)
        # Default to version 0.
        self.set_version(0)

    def to_cryptography(self) -> x509.CertificateSigningRequest:
        """
        Export as a ``cryptography`` certificate signing request.

        :rtype: ``cryptography.x509.CertificateSigningRequest``

        .. versionadded:: 17.1.0
        """
        from cryptography.x509 import load_der_x509_csr

        der = _dump_certificate_request_internal(FILETYPE_ASN1, self)

        return load_der_x509_csr(der)

    @classmethod
    def from_cryptography(
        cls, crypto_req: x509.CertificateSigningRequest
    ) -> X509Req:
        """
        Construct based on a ``cryptography`` *crypto_req*.

        :param crypto_req: A ``cryptography`` X.509 certificate signing request
        :type crypto_req: ``cryptography.x509.CertificateSigningRequest``

        :rtype: X509Req

        :raises TypeError: If *crypto_req* is not a
            ``cryptography.x509.CertificateSigningRequest``.

        .. versionadded:: 17.1.0
        """
        if not isinstance(crypto_req, x509.CertificateSigningRequest):
            raise TypeError("Must be a certificate signing request")

        from cryptography.hazmat.primitives.serialization import Encoding

        der = crypto_req.public_bytes(Encoding.DER)
        return _load_certificate_request_internal(FILETYPE_ASN1, der)

    def set_pubkey(self, pkey: PKey) -> None:
        """
        Set the public key of the certificate signing request.

        :param pkey: The public key to use.
        :type pkey: :py:class:`PKey`

        :return: ``None``
        """
        set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey)
        _openssl_assert(set_result == 1)

    def get_pubkey(self) -> PKey:
        """
        Get the public key of the certificate signing request.

        :return: The public key.
        :rtype: :py:class:`PKey`
        """
        # Bypass ``PKey.__init__`` so no fresh EVP_PKEY is allocated; the
        # one returned by OpenSSL is wrapped instead.
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.X509_REQ_get_pubkey(self._req)
        _openssl_assert(pkey._pkey != _ffi.NULL)
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_version(self, version: int) -> None:
        """
        Set the version subfield (RFC 2986, section 4.1) of the certificate
        request.

        :param int version: The version number.
        :raises TypeError: If *version* is not an ``int``.
        :raises ValueError: If *version* is anything other than 0.
        :return: ``None``
        """
        if not isinstance(version, int):
            raise TypeError("version must be an int")
        if version != 0:
            raise ValueError(
                "Invalid version. The only valid version for X509Req is 0."
            )
        set_result = _lib.X509_REQ_set_version(self._req, version)
        _openssl_assert(set_result == 1)

    def get_version(self) -> int:
        """
        Get the version subfield (RFC 2986, section 4.1) of the certificate
        request.

        :return: The value of the version subfield.
        :rtype: :py:class:`int`
        """
        return _lib.X509_REQ_get_version(self._req)

    def get_subject(self) -> X509Name:
        """
        Return the subject of this certificate signing request.

        This creates a new :class:`X509Name` that wraps the underlying subject
        name field on the certificate signing request. Modifying it will modify
        the underlying signing request, and will have the effect of modifying
        any other :class:`X509Name` that refers to this subject.

        :return: The subject of this certificate signing request.
        :rtype: :class:`X509Name`
        """
        name = X509Name.__new__(X509Name)
        name._name = _lib.X509_REQ_get_subject_name(self._req)
        _openssl_assert(name._name != _ffi.NULL)

        # The name is owned by the X509Req structure.  As long as the X509Name
        # Python object is alive, keep the X509Req Python object alive.
        name._owner = self

        return name

    def sign(self, pkey: PKey, digest: str) -> None:
        """
        Sign the certificate signing request with this key and digest type.

        :param pkey: The key pair to sign with.
        :type pkey: :py:class:`PKey`
        :param digest: The name of the message digest to use for the signature,
            e.g. :py:data:`"sha256"`.
        :type digest: :py:class:`str`
        :raises ValueError: If *pkey* holds only a public key, is
            uninitialized, or *digest* names no known digest.
        :return: ``None``
        """
        if pkey._only_public:
            raise ValueError("Key has only public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
        if digest_obj == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
        _openssl_assert(sign_result > 0)

    def verify(self, pkey: PKey) -> bool:
        """
        Verifies the signature on this certificate signing request.

        :param PKey pkey: A public key.

        :return: ``True`` if the signature is correct.
        :rtype: bool

        :raises TypeError: If *pkey* is not a :py:class:`PKey`.
        :raises OpenSSL.crypto.Error: If the signature is invalid or there is a
            problem verifying the signature.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        result = _lib.X509_REQ_verify(self._req, pkey._pkey)
        if result <= 0:
            _raise_current_error()

        # Return a real bool, as annotated, rather than OpenSSL's raw int.
        return result > 0

950 

951 

952class X509: 

953 """ 

954 An X.509 certificate. 

955 """ 

956 

def __init__(self) -> None:
    """
    Allocate an empty, garbage-collected ``X509`` structure together with
    one name invalidator for the issuer and one for the subject.
    """
    raw_cert = _lib.X509_new()
    _openssl_assert(raw_cert != _ffi.NULL)
    self._x509 = _ffi.gc(raw_cert, _lib.X509_free)

    self._issuer_invalidator = _X509NameInvalidator()
    self._subject_invalidator = _X509NameInvalidator()

964 

@classmethod
def _from_raw_x509_ptr(cls, x509: Any) -> X509:
    """
    Wrap an existing raw ``X509`` pointer without running ``__init__``.

    The pointer is registered for release with ``X509_free`` when the
    wrapper is garbage collected.
    """
    wrapper = cls.__new__(cls)
    wrapper._x509 = _ffi.gc(x509, _lib.X509_free)
    wrapper._issuer_invalidator = _X509NameInvalidator()
    wrapper._subject_invalidator = _X509NameInvalidator()
    return wrapper

972 

def to_cryptography(self) -> x509.Certificate:
    """
    Convert this certificate into a ``cryptography`` certificate by
    round-tripping it through its DER encoding.

    :rtype: ``cryptography.x509.Certificate``

    .. versionadded:: 17.1.0
    """
    from cryptography.x509 import load_der_x509_certificate

    return load_der_x509_certificate(dump_certificate(FILETYPE_ASN1, self))

985 

@classmethod
def from_cryptography(cls, crypto_cert: x509.Certificate) -> X509:
    """
    Build an :py:class:`X509` from a ``cryptography`` *crypto_cert* by
    round-tripping it through its DER encoding.

    :param crypto_cert: A ``cryptography`` X.509 certificate.
    :type crypto_cert: ``cryptography.x509.Certificate``

    :rtype: X509

    :raises TypeError: If *crypto_cert* is not a
        ``cryptography.x509.Certificate``.

    .. versionadded:: 17.1.0
    """
    if not isinstance(crypto_cert, x509.Certificate):
        raise TypeError("Must be a certificate")

    from cryptography.hazmat.primitives.serialization import Encoding

    encoded = crypto_cert.public_bytes(Encoding.DER)
    return load_certificate(FILETYPE_ASN1, encoded)

1005 

def set_version(self, version: int) -> None:
    """
    Set the certificate's version number.

    The value is zero-based, e.g. a value of 0 is V1.

    :param version: The version number of the certificate.
    :type version: :py:class:`int`

    :raises TypeError: If *version* is not an ``int``.

    :return: ``None``
    """
    if not isinstance(version, int):
        raise TypeError("version must be an integer")

    set_result = _lib.X509_set_version(self._x509, version)
    _openssl_assert(set_result == 1)

1020 

1021 def get_version(self) -> int: 

1022 """ 

1023 Return the version number of the certificate. 

1024 

1025 :return: The version number of the certificate. 

1026 :rtype: :py:class:`int` 

1027 """ 

1028 return _lib.X509_get_version(self._x509) 

1029 

1030 def get_pubkey(self) -> PKey: 

1031 """ 

1032 Get the public key of the certificate. 

1033 

1034 :return: The public key. 

1035 :rtype: :py:class:`PKey` 

1036 """ 

1037 pkey = PKey.__new__(PKey) 

1038 pkey._pkey = _lib.X509_get_pubkey(self._x509) 

1039 if pkey._pkey == _ffi.NULL: 

1040 _raise_current_error() 

1041 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) 

1042 pkey._only_public = True 

1043 return pkey 

1044 

1045 def set_pubkey(self, pkey: PKey) -> None: 

1046 """ 

1047 Set the public key of the certificate. 

1048 

1049 :param pkey: The public key. 

1050 :type pkey: :py:class:`PKey` 

1051 

1052 :return: :py:data:`None` 

1053 """ 

1054 if not isinstance(pkey, PKey): 

1055 raise TypeError("pkey must be a PKey instance") 

1056 

1057 set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey) 

1058 _openssl_assert(set_result == 1) 

1059 

1060 def sign(self, pkey: PKey, digest: str) -> None: 

1061 """ 

1062 Sign the certificate with this key and digest type. 

1063 

1064 :param pkey: The key to sign with. 

1065 :type pkey: :py:class:`PKey` 

1066 

1067 :param digest: The name of the message digest to use. 

1068 :type digest: :py:class:`str` 

1069 

1070 :return: :py:data:`None` 

1071 """ 

1072 if not isinstance(pkey, PKey): 

1073 raise TypeError("pkey must be a PKey instance") 

1074 

1075 if pkey._only_public: 

1076 raise ValueError("Key only has public part") 

1077 

1078 if not pkey._initialized: 

1079 raise ValueError("Key is uninitialized") 

1080 

1081 evp_md = _lib.EVP_get_digestbyname(_byte_string(digest)) 

1082 if evp_md == _ffi.NULL: 

1083 raise ValueError("No such digest method") 

1084 

1085 sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md) 

1086 _openssl_assert(sign_result > 0) 

1087 

1088 def get_signature_algorithm(self) -> bytes: 

1089 """ 

1090 Return the signature algorithm used in the certificate. 

1091 

1092 :return: The name of the algorithm. 

1093 :rtype: :py:class:`bytes` 

1094 

1095 :raises ValueError: If the signature algorithm is undefined. 

1096 

1097 .. versionadded:: 0.13 

1098 """ 

1099 sig_alg = _lib.X509_get0_tbs_sigalg(self._x509) 

1100 alg = _ffi.new("ASN1_OBJECT **") 

1101 _lib.X509_ALGOR_get0(alg, _ffi.NULL, _ffi.NULL, sig_alg) 

1102 nid = _lib.OBJ_obj2nid(alg[0]) 

1103 if nid == _lib.NID_undef: 

1104 raise ValueError("Undefined signature algorithm") 

1105 return _ffi.string(_lib.OBJ_nid2ln(nid)) 

1106 

1107 def digest(self, digest_name: str) -> bytes: 

1108 """ 

1109 Return the digest of the X509 object. 

1110 

1111 :param digest_name: The name of the digest algorithm to use. 

1112 :type digest_name: :py:class:`str` 

1113 

1114 :return: The digest of the object, formatted as 

1115 :py:const:`b":"`-delimited hex pairs. 

1116 :rtype: :py:class:`bytes` 

1117 """ 

1118 digest = _lib.EVP_get_digestbyname(_byte_string(digest_name)) 

1119 if digest == _ffi.NULL: 

1120 raise ValueError("No such digest method") 

1121 

1122 result_buffer = _ffi.new("unsigned char[]", _lib.EVP_MAX_MD_SIZE) 

1123 result_length = _ffi.new("unsigned int[]", 1) 

1124 result_length[0] = len(result_buffer) 

1125 

1126 digest_result = _lib.X509_digest( 

1127 self._x509, digest, result_buffer, result_length 

1128 ) 

1129 _openssl_assert(digest_result == 1) 

1130 

1131 return b":".join( 

1132 [ 

1133 b16encode(ch).upper() 

1134 for ch in _ffi.buffer(result_buffer, result_length[0]) 

1135 ] 

1136 ) 

1137 

1138 def subject_name_hash(self) -> int: 

1139 """ 

1140 Return the hash of the X509 subject. 

1141 

1142 :return: The hash of the subject. 

1143 :rtype: :py:class:`int` 

1144 """ 

1145 return _lib.X509_subject_name_hash(self._x509) 

1146 

1147 def set_serial_number(self, serial: int) -> None: 

1148 """ 

1149 Set the serial number of the certificate. 

1150 

1151 :param serial: The new serial number. 

1152 :type serial: :py:class:`int` 

1153 

1154 :return: :py:data`None` 

1155 """ 

1156 if not isinstance(serial, int): 

1157 raise TypeError("serial must be an integer") 

1158 

1159 hex_serial = hex(serial)[2:] 

1160 hex_serial_bytes = hex_serial.encode("ascii") 

1161 

1162 bignum_serial = _ffi.new("BIGNUM**") 

1163 

1164 # BN_hex2bn stores the result in &bignum. 

1165 result = _lib.BN_hex2bn(bignum_serial, hex_serial_bytes) 

1166 _openssl_assert(result != _ffi.NULL) 

1167 

1168 asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL) 

1169 _lib.BN_free(bignum_serial[0]) 

1170 _openssl_assert(asn1_serial != _ffi.NULL) 

1171 asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free) 

1172 set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial) 

1173 _openssl_assert(set_result == 1) 

1174 

1175 def get_serial_number(self) -> int: 

1176 """ 

1177 Return the serial number of this certificate. 

1178 

1179 :return: The serial number. 

1180 :rtype: int 

1181 """ 

1182 asn1_serial = _lib.X509_get_serialNumber(self._x509) 

1183 bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL) 

1184 try: 

1185 hex_serial = _lib.BN_bn2hex(bignum_serial) 

1186 try: 

1187 hexstring_serial = _ffi.string(hex_serial) 

1188 serial = int(hexstring_serial, 16) 

1189 return serial 

1190 finally: 

1191 _lib.OPENSSL_free(hex_serial) 

1192 finally: 

1193 _lib.BN_free(bignum_serial) 

1194 

1195 def gmtime_adj_notAfter(self, amount: int) -> None: 

1196 """ 

1197 Adjust the time stamp on which the certificate stops being valid. 

1198 

1199 :param int amount: The number of seconds by which to adjust the 

1200 timestamp. 

1201 :return: ``None`` 

1202 """ 

1203 if not isinstance(amount, int): 

1204 raise TypeError("amount must be an integer") 

1205 

1206 notAfter = _lib.X509_getm_notAfter(self._x509) 

1207 _lib.X509_gmtime_adj(notAfter, amount) 

1208 

1209 def gmtime_adj_notBefore(self, amount: int) -> None: 

1210 """ 

1211 Adjust the timestamp on which the certificate starts being valid. 

1212 

1213 :param amount: The number of seconds by which to adjust the timestamp. 

1214 :return: ``None`` 

1215 """ 

1216 if not isinstance(amount, int): 

1217 raise TypeError("amount must be an integer") 

1218 

1219 notBefore = _lib.X509_getm_notBefore(self._x509) 

1220 _lib.X509_gmtime_adj(notBefore, amount) 

1221 

1222 def has_expired(self) -> bool: 

1223 """ 

1224 Check whether the certificate has expired. 

1225 

1226 :return: ``True`` if the certificate has expired, ``False`` otherwise. 

1227 :rtype: bool 

1228 """ 

1229 time_bytes = self.get_notAfter() 

1230 if time_bytes is None: 

1231 raise ValueError("Unable to determine notAfter") 

1232 time_string = time_bytes.decode("utf-8") 

1233 not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") 

1234 

1235 UTC = datetime.timezone.utc 

1236 utcnow = datetime.datetime.now(UTC).replace(tzinfo=None) 

1237 return not_after < utcnow 

1238 

1239 def _get_boundary_time(self, which: Any) -> bytes | None: 

1240 return _get_asn1_time(which(self._x509)) 

1241 

1242 def get_notBefore(self) -> bytes | None: 

1243 """ 

1244 Get the timestamp at which the certificate starts being valid. 

1245 

1246 The timestamp is formatted as an ASN.1 TIME:: 

1247 

1248 YYYYMMDDhhmmssZ 

1249 

1250 :return: A timestamp string, or ``None`` if there is none. 

1251 :rtype: bytes or NoneType 

1252 """ 

1253 return self._get_boundary_time(_lib.X509_getm_notBefore) 

1254 

1255 def _set_boundary_time( 

1256 self, which: Callable[..., Any], when: bytes 

1257 ) -> None: 

1258 return _set_asn1_time(which(self._x509), when) 

1259 

1260 def set_notBefore(self, when: bytes) -> None: 

1261 """ 

1262 Set the timestamp at which the certificate starts being valid. 

1263 

1264 The timestamp is formatted as an ASN.1 TIME:: 

1265 

1266 YYYYMMDDhhmmssZ 

1267 

1268 :param bytes when: A timestamp string. 

1269 :return: ``None`` 

1270 """ 

1271 return self._set_boundary_time(_lib.X509_getm_notBefore, when) 

1272 

1273 def get_notAfter(self) -> bytes | None: 

1274 """ 

1275 Get the timestamp at which the certificate stops being valid. 

1276 

1277 The timestamp is formatted as an ASN.1 TIME:: 

1278 

1279 YYYYMMDDhhmmssZ 

1280 

1281 :return: A timestamp string, or ``None`` if there is none. 

1282 :rtype: bytes or NoneType 

1283 """ 

1284 return self._get_boundary_time(_lib.X509_getm_notAfter) 

1285 

1286 def set_notAfter(self, when: bytes) -> None: 

1287 """ 

1288 Set the timestamp at which the certificate stops being valid. 

1289 

1290 The timestamp is formatted as an ASN.1 TIME:: 

1291 

1292 YYYYMMDDhhmmssZ 

1293 

1294 :param bytes when: A timestamp string. 

1295 :return: ``None`` 

1296 """ 

1297 return self._set_boundary_time(_lib.X509_getm_notAfter, when) 

1298 

1299 def _get_name(self, which: Any) -> X509Name: 

1300 name = X509Name.__new__(X509Name) 

1301 name._name = which(self._x509) 

1302 _openssl_assert(name._name != _ffi.NULL) 

1303 

1304 # The name is owned by the X509 structure. As long as the X509Name 

1305 # Python object is alive, keep the X509 Python object alive. 

1306 name._owner = self 

1307 

1308 return name 

1309 

1310 def _set_name(self, which: Any, name: X509Name) -> None: 

1311 if not isinstance(name, X509Name): 

1312 raise TypeError("name must be an X509Name") 

1313 set_result = which(self._x509, name._name) 

1314 _openssl_assert(set_result == 1) 

1315 

1316 def get_issuer(self) -> X509Name: 

1317 """ 

1318 Return the issuer of this certificate. 

1319 

1320 This creates a new :class:`X509Name` that wraps the underlying issuer 

1321 name field on the certificate. Modifying it will modify the underlying 

1322 certificate, and will have the effect of modifying any other 

1323 :class:`X509Name` that refers to this issuer. 

1324 

1325 :return: The issuer of this certificate. 

1326 :rtype: :class:`X509Name` 

1327 """ 

1328 name = self._get_name(_lib.X509_get_issuer_name) 

1329 self._issuer_invalidator.add(name) 

1330 return name 

1331 

1332 def set_issuer(self, issuer: X509Name) -> None: 

1333 """ 

1334 Set the issuer of this certificate. 

1335 

1336 :param issuer: The issuer. 

1337 :type issuer: :py:class:`X509Name` 

1338 

1339 :return: ``None`` 

1340 """ 

1341 self._set_name(_lib.X509_set_issuer_name, issuer) 

1342 self._issuer_invalidator.clear() 

1343 

1344 def get_subject(self) -> X509Name: 

1345 """ 

1346 Return the subject of this certificate. 

1347 

1348 This creates a new :class:`X509Name` that wraps the underlying subject 

1349 name field on the certificate. Modifying it will modify the underlying 

1350 certificate, and will have the effect of modifying any other 

1351 :class:`X509Name` that refers to this subject. 

1352 

1353 :return: The subject of this certificate. 

1354 :rtype: :class:`X509Name` 

1355 """ 

1356 name = self._get_name(_lib.X509_get_subject_name) 

1357 self._subject_invalidator.add(name) 

1358 return name 

1359 

1360 def set_subject(self, subject: X509Name) -> None: 

1361 """ 

1362 Set the subject of this certificate. 

1363 

1364 :param subject: The subject. 

1365 :type subject: :py:class:`X509Name` 

1366 

1367 :return: ``None`` 

1368 """ 

1369 self._set_name(_lib.X509_set_subject_name, subject) 

1370 self._subject_invalidator.clear() 

1371 

1372 def get_extension_count(self) -> int: 

1373 """ 

1374 Get the number of extensions on this certificate. 

1375 

1376 :return: The number of extensions. 

1377 :rtype: :py:class:`int` 

1378 

1379 .. versionadded:: 0.12 

1380 """ 

1381 return _lib.X509_get_ext_count(self._x509) 

1382 

1383 

class X509StoreFlags:
    """
    Flags for X509 verification, used to change the behavior of
    :class:`X509Store`.

    Each attribute mirrors the ``X509_V_FLAG_*`` constant of the same name
    exposed by the OpenSSL binding. Flags can be combined by oring them
    together before being passed to :meth:`X509Store.set_flags`.

    See `OpenSSL Verification Flags`_ for details.

    .. _OpenSSL Verification Flags:
        https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html
    """

    CRL_CHECK: int = _lib.X509_V_FLAG_CRL_CHECK
    CRL_CHECK_ALL: int = _lib.X509_V_FLAG_CRL_CHECK_ALL
    IGNORE_CRITICAL: int = _lib.X509_V_FLAG_IGNORE_CRITICAL
    X509_STRICT: int = _lib.X509_V_FLAG_X509_STRICT
    ALLOW_PROXY_CERTS: int = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS
    POLICY_CHECK: int = _lib.X509_V_FLAG_POLICY_CHECK
    EXPLICIT_POLICY: int = _lib.X509_V_FLAG_EXPLICIT_POLICY
    INHIBIT_MAP: int = _lib.X509_V_FLAG_INHIBIT_MAP
    CHECK_SS_SIGNATURE: int = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE
    PARTIAL_CHAIN: int = _lib.X509_V_FLAG_PARTIAL_CHAIN

1405 

1406 

class X509Store:
    """
    An X.509 store.

    An X.509 store is used to describe a context in which to verify a
    certificate. A description of a context may include a set of certificates
    to trust, a set of certificate revocation lists, verification flags and
    more.

    An X.509 store, being only a description, cannot be used by itself to
    verify a certificate. To carry out the actual verification process, see
    :class:`X509StoreContext`.
    """

    def __init__(self) -> None:
        store = _lib.X509_STORE_new()
        # Fail loudly on allocation failure instead of wrapping NULL.
        _openssl_assert(store != _ffi.NULL)
        self._store = _ffi.gc(store, _lib.X509_STORE_free)

    def add_cert(self, cert: X509) -> None:
        """
        Adds a trusted certificate to this store.

        Adding a certificate with this method adds this certificate as a
        *trusted* certificate.

        :param X509 cert: The certificate to add to this store.

        :raises TypeError: If the certificate is not an :class:`X509`.

        :raises OpenSSL.crypto.Error: If OpenSSL was unhappy with your
            certificate.

        :return: ``None`` if the certificate was added successfully.
        """
        if not isinstance(cert, X509):
            raise TypeError("cert must be an X509 instance")

        res = _lib.X509_STORE_add_cert(self._store, cert._x509)
        _openssl_assert(res == 1)

    def add_crl(self, crl: x509.CertificateRevocationList) -> None:
        """
        Add a certificate revocation list to this store.

        The certificate revocation lists added to a store will only be used if
        the associated flags are configured to check certificate revocation
        lists.

        .. versionadded:: 16.1.0

        :param crl: The certificate revocation list to add to this store.
        :type crl: ``cryptography.x509.CertificateRevocationList``

        :raises TypeError: If *crl* is not a
            ``cryptography.x509.CertificateRevocationList``.

        :return: ``None`` if the certificate revocation list was added
            successfully.
        """
        if not isinstance(crl, x509.CertificateRevocationList):
            raise TypeError(
                "CRL must be of type "
                "cryptography.x509.CertificateRevocationList"
            )

        from cryptography.hazmat.primitives.serialization import Encoding

        # Round-trip through DER to obtain a native X509_CRL structure.
        bio = _new_mem_buf(crl.public_bytes(Encoding.DER))
        openssl_crl = _lib.d2i_X509_CRL_bio(bio, _ffi.NULL)
        _openssl_assert(openssl_crl != _ffi.NULL)
        openssl_crl = _ffi.gc(openssl_crl, _lib.X509_CRL_free)

        _openssl_assert(
            _lib.X509_STORE_add_crl(self._store, openssl_crl) != 0
        )

    def set_flags(self, flags: int) -> None:
        """
        Set verification flags to this store.

        Verification flags can be combined by oring them together.

        .. note::

          Setting a verification flag sometimes requires clients to add
          additional information to the store, otherwise a suitable error will
          be raised.

          For example, in setting flags to enable CRL checking a
          suitable CRL must be added to the store otherwise an error will be
          raised.

        .. versionadded:: 16.1.0

        :param int flags: The verification flags to set on this store.
            See :class:`X509StoreFlags` for available constants.
        :return: ``None`` if the verification flags were successfully set.
        """
        _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0)

    def set_time(self, vfy_time: datetime.datetime) -> None:
        """
        Set the time against which the certificates are verified.

        Normally the current time is used.

        .. note::

          For example, you can determine if a certificate was valid at a given
          time.

        .. versionadded:: 17.0.0

        :param datetime vfy_time: The verification time to set on this store.
        :return: ``None`` if the verification time was successfully set.
        """
        param = _lib.X509_VERIFY_PARAM_new()
        _openssl_assert(param != _ffi.NULL)
        param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free)

        # timegm() interprets the time tuple's fields as UTC.
        _lib.X509_VERIFY_PARAM_set_time(
            param, calendar.timegm(vfy_time.timetuple())
        )
        _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)

    def load_locations(
        self,
        cafile: StrOrBytesPath | None,
        capath: StrOrBytesPath | None = None,
    ) -> None:
        """
        Let X509Store know where we can find trusted certificates for the
        certificate chain.  Note that the certificates have to be in PEM
        format.

        If *capath* is passed, it must be a directory prepared using the
        ``c_rehash`` tool included with OpenSSL.  Either, but not both, of
        *cafile* or *capath* may be ``None``.

        .. note::

          Both *cafile* and *capath* may be set simultaneously.

          Call this method multiple times to add more than one location.
          For example, CA certificates, and certificate revocation list bundles
          may be passed in *cafile* in subsequent calls to this method.

        .. versionadded:: 20.0

        :param cafile: In which file we can find the certificates (``bytes`` or
                       ``unicode``).
        :param capath: In which directory we can find the certificates
                       (``bytes`` or ``unicode``).

        :return: ``None`` if the locations were set successfully.

        :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None``
            or the locations could not be set for any reason.

        """
        # Absent locations are passed to OpenSSL as NULL pointers.
        cafile_arg = _ffi.NULL if cafile is None else _path_bytes(cafile)
        capath_arg = _ffi.NULL if capath is None else _path_bytes(capath)

        load_result = _lib.X509_STORE_load_locations(
            self._store, cafile_arg, capath_arg
        )
        if not load_result:
            _raise_current_error()

1575 

1576 

class X509StoreContextError(Exception):
    """
    Raised when verifying a certificate with
    :meth:`X509StoreContext.verify_certificate` fails.

    :ivar errors: The error details supplied by the code raising the
        exception.
    :type errors: :class:`list`

    :ivar certificate: The certificate which caused verification failure.
    :type certificate: :class:`X509`
    """

    def __init__(
        self, message: str, errors: list[Any], certificate: X509
    ) -> None:
        # Only the message is forwarded to Exception; the remaining details
        # are exposed as attributes.
        self.certificate = certificate
        self.errors = errors
        super().__init__(message)

1592 

1593 

class X509StoreContext:
    """
    An X.509 store context.

    An X.509 store context is used to carry out the actual verification process
    of a certificate in a described context. For describing such a context, see
    :class:`X509Store`.

    :param X509Store store: The certificates which will be trusted for the
        purposes of any verifications.
    :param X509 certificate: The certificate to be verified.
    :param chain: List of untrusted certificates that may be used for building
        the certificate chain. May be ``None``.
    :type chain: :class:`list` of :class:`X509`
    """

    def __init__(
        self,
        store: X509Store,
        certificate: X509,
        chain: Sequence[X509] | None = None,
    ) -> None:
        self._store = store
        self._cert = certificate
        self._chain = self._build_certificate_stack(chain)

    @staticmethod
    def _build_certificate_stack(
        certificates: Sequence[X509] | None,
    ) -> Any:
        """
        Build a native ``STACK_OF(X509)`` from *certificates*.

        :return: ``_ffi.NULL`` when *certificates* is ``None`` or empty,
            otherwise a garbage-collected stack holding its own reference
            to every certificate.
        :raises TypeError: If an element is not an :class:`X509`.
        """

        def cleanup(s: Any) -> None:
            # Equivalent to sk_X509_pop_free, but we don't
            # currently have a CFFI binding for that available
            for i in range(_lib.sk_X509_num(s)):
                x = _lib.sk_X509_value(s, i)
                _lib.X509_free(x)
            _lib.sk_X509_free(s)

        if certificates is None or len(certificates) == 0:
            return _ffi.NULL

        stack = _lib.sk_X509_new_null()
        _openssl_assert(stack != _ffi.NULL)
        stack = _ffi.gc(stack, cleanup)

        for cert in certificates:
            if not isinstance(cert, X509):
                raise TypeError("One of the elements is not an X509 instance")

            # The stack owns a reference to each member; cleanup() drops it.
            _openssl_assert(_lib.X509_up_ref(cert._x509) > 0)
            if _lib.sk_X509_push(stack, cert._x509) <= 0:
                # The push failed, so give back the reference taken above.
                _lib.X509_free(cert._x509)
                _raise_current_error()

        return stack

    @staticmethod
    def _exception_from_context(store_ctx: Any) -> X509StoreContextError:
        """
        Convert an OpenSSL native context error failure into a Python
        exception.

        When a call to native OpenSSL X509_verify_cert fails, additional
        information about the failure can be obtained from the store context.
        """
        error_code = _lib.X509_STORE_CTX_get_error(store_ctx)
        message = _ffi.string(
            _lib.X509_verify_cert_error_string(error_code)
        ).decode("utf-8")
        errors = [
            error_code,
            _lib.X509_STORE_CTX_get_error_depth(store_ctx),
            message,
        ]
        # A context error should always be associated with a certificate, so we
        # expect this call to never return :class:`None`.
        _x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
        # Duplicate so the exception's certificate outlives the context.
        _cert = _lib.X509_dup(_x509)
        pycert = X509._from_raw_x509_ptr(_cert)
        return X509StoreContextError(message, errors, pycert)

    def _verify_certificate(self) -> Any:
        """
        Verifies the certificate and returns an X509_STORE_CTX containing the
        results.

        :raises X509StoreContextError: If an error occurred when validating a
          certificate in the context. Sets ``certificate`` attribute to
          indicate which certificate caused the error.
        """
        store_ctx = _lib.X509_STORE_CTX_new()
        _openssl_assert(store_ctx != _ffi.NULL)
        store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)

        ret = _lib.X509_STORE_CTX_init(
            store_ctx, self._store._store, self._cert._x509, self._chain
        )
        _openssl_assert(ret == 1)

        ret = _lib.X509_verify_cert(store_ctx)
        if ret <= 0:
            raise self._exception_from_context(store_ctx)

        return store_ctx

    def set_store(self, store: X509Store) -> None:
        """
        Set the context's X.509 store.

        .. versionadded:: 0.15

        :param X509Store store: The store description which will be used for
            the purposes of any *future* verifications.
        """
        self._store = store

    def verify_certificate(self) -> None:
        """
        Verify a certificate in a context.

        .. versionadded:: 0.15

        :raises X509StoreContextError: If an error occurred when validating a
          certificate in the context. Sets ``certificate`` attribute to
          indicate which certificate caused the error.
        """
        self._verify_certificate()

    def get_verified_chain(self) -> list[X509]:
        """
        Verify a certificate in a context and return the complete validated
        chain.

        :raises X509StoreContextError: If an error occurred when validating a
          certificate in the context. Sets ``certificate`` attribute to
          indicate which certificate caused the error.

        .. versionadded:: 20.0
        """
        store_ctx = self._verify_certificate()

        # Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain.
        cert_stack = _lib.X509_STORE_CTX_get1_chain(store_ctx)
        _openssl_assert(cert_stack != _ffi.NULL)

        result = []
        for i in range(_lib.sk_X509_num(cert_stack)):
            cert = _lib.sk_X509_value(cert_stack, i)
            _openssl_assert(cert != _ffi.NULL)
            pycert = X509._from_raw_x509_ptr(cert)
            result.append(pycert)

        # Free the stack but not the members which are freed by the X509 class.
        _lib.sk_X509_free(cert_stack)
        return result

1750 

1751 

def load_certificate(type: int, buffer: bytes) -> X509:
    """
    Parse a certificate out of *buffer*, which is encoded as *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)

    :param bytes buffer: The buffer the certificate is stored in; text
        input is encoded as ASCII first.

    :return: The X509 object

    :raises ValueError: If *type* is not a supported file type.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    source_bio = _new_mem_buf(raw)

    if type == FILETYPE_PEM:
        cert_ptr = _lib.PEM_read_bio_X509(
            source_bio, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    elif type == FILETYPE_ASN1:
        cert_ptr = _lib.d2i_X509_bio(source_bio, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means OpenSSL could not parse the input.
    if cert_ptr == _ffi.NULL:
        _raise_current_error()

    return X509._from_raw_x509_ptr(cert_ptr)

1779 

1780 

def dump_certificate(type: int, cert: X509) -> bytes:
    """
    Serialize the certificate *cert* using the encoding named by *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param cert: The certificate to dump
    :return: The buffer with the dumped certificate in

    :raises ValueError: If *type* is not a supported file type.
    """
    out_bio = _new_mem_buf()

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_X509(out_bio, cert._x509)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_X509_bio(out_bio, cert._x509)
    elif type == FILETYPE_TEXT:
        status = _lib.X509_print_ex(out_bio, cert._x509, 0, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    # Every writer used above signals success with a return value of 1.
    _openssl_assert(status == 1)
    return _bio_to_string(out_bio)

1807 

1808 

def dump_publickey(type: int, pkey: PKey) -> bytes:
    """
    Serialize a public key into a buffer.

    :param type: The file type (one of :data:`FILETYPE_PEM` or
        :data:`FILETYPE_ASN1`).
    :param PKey pkey: The public key to dump
    :return: The buffer with the dumped key in it.
    :rtype: bytes

    :raises ValueError: If *type* is not a supported file type.
    """
    out_bio = _new_mem_buf()

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_PUBKEY(out_bio, pkey._pkey)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_PUBKEY_bio(out_bio, pkey._pkey)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if status != 1:  # pragma: no cover
        _raise_current_error()

    return _bio_to_string(out_bio)

1832 

1833 

def dump_privatekey(
    type: int,
    pkey: PKey,
    cipher: str | None = None,
    passphrase: PassphraseCallableT | None = None,
) -> bytes:
    """
    Serialize the private key *pkey* using the encoding named by *type*.

    When *type* is :const:`FILETYPE_PEM` the output can optionally be
    encrypted using *cipher* and *passphrase*.

    :param type: The file type (one of :const:`FILETYPE_PEM`,
        :const:`FILETYPE_ASN1`, or :const:`FILETYPE_TEXT`)
    :param PKey pkey: The PKey to dump
    :param cipher: (optional) if encrypted PEM format, the cipher to use
    :param passphrase: (optional) if encrypted PEM format, this can be either
        the passphrase to use, or a callback for providing the passphrase.

    :return: The buffer with the dumped key in
    :rtype: bytes

    :raises TypeError: If *pkey* is not a :class:`PKey`, if *cipher* is
        given without *passphrase*, or if a non-RSA key is dumped as
        :const:`FILETYPE_TEXT`.
    :raises ValueError: If *cipher* is unknown or *type* is unsupported.
    """
    out_bio = _new_mem_buf()

    if not isinstance(pkey, PKey):
        raise TypeError("pkey must be a PKey")

    # NULL means "no encryption" to the PEM writer.
    cipher_obj = _ffi.NULL
    if cipher is not None:
        if passphrase is None:
            raise TypeError(
                "if a value is given for cipher "
                "one must also be given for passphrase"
            )
        cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher))
        if cipher_obj == _ffi.NULL:
            raise ValueError("Invalid cipher name")

    helper = _PassphraseHelper(type, passphrase)

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_PrivateKey(
            out_bio,
            pkey._pkey,
            cipher_obj,
            _ffi.NULL,
            0,
            helper.callback,
            helper.callback_args,
        )
        # Surface any exception captured inside the passphrase callback.
        helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_PrivateKey_bio(out_bio, pkey._pkey)
    elif type == FILETYPE_TEXT:
        if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA:
            raise TypeError("Only RSA keys are supported for FILETYPE_TEXT")

        rsa_key = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free)
        status = _lib.RSA_print(out_bio, rsa_key, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    _openssl_assert(status != 0)

    return _bio_to_string(out_bio)

1901 

1902 

class _PassphraseHelper:
    """
    Adapt a passphrase (bytes or callable) to a ``pem_password_cb`` callback.

    Exceptions raised while producing the passphrase are collected rather
    than propagated out of the callback; call :meth:`raise_if_problem`
    afterwards to re-raise the first one.
    """

    def __init__(
        self,
        type: int,
        passphrase: PassphraseCallableT | None,
        more_args: bool = False,
        truncate: bool = False,
    ) -> None:
        if passphrase is not None and type != FILETYPE_PEM:
            raise ValueError(
                "only FILETYPE_PEM key format supports encryption"
            )
        self._passphrase = passphrase
        self._more_args = more_args
        self._truncate = truncate
        self._problems: list[Exception] = []

    @property
    def callback(self) -> Any:
        """The C callback to hand to OpenSSL, or NULL without a passphrase."""
        supplied = self._passphrase
        if supplied is None:
            return _ffi.NULL
        if not (isinstance(supplied, bytes) or callable(supplied)):
            raise TypeError(
                "Last argument must be a byte string or a callable."
            )
        return _ffi.callback("pem_password_cb", self._read_passphrase)

    @property
    def callback_args(self) -> Any:
        """The userdata argument for the callback; always NULL when valid."""
        supplied = self._passphrase
        if supplied is not None and not (
            isinstance(supplied, bytes) or callable(supplied)
        ):
            raise TypeError(
                "Last argument must be a byte string or a callable."
            )
        return _ffi.NULL

    def raise_if_problem(self, exceptionType: type[Exception] = Error) -> None:
        """Re-raise the oldest exception captured by the callback, if any."""
        if not self._problems:
            return

        # Flush the OpenSSL error queue
        try:
            _exception_from_error_queue(exceptionType)
        except exceptionType:
            pass

        raise self._problems.pop(0)

    def _read_passphrase(
        self, buf: Any, size: int, rwflag: Any, userdata: Any
    ) -> int:
        """
        Copy the passphrase into *buf* and return its length.

        Any exception is recorded in ``self._problems`` and reported to the
        caller as a length of 0.
        """
        try:
            if callable(self._passphrase):
                if self._more_args:
                    secret = self._passphrase(size, rwflag, userdata)
                else:
                    secret = self._passphrase(rwflag)
            else:
                assert self._passphrase is not None
                secret = self._passphrase

            if not isinstance(secret, bytes):
                raise ValueError("Bytes expected")

            if len(secret) > size:
                if not self._truncate:
                    raise ValueError(
                        "passphrase returned by callback is too long"
                    )
                secret = secret[:size]

            # Copy one byte at a time; each slice is a length-1 bytes object.
            length = len(secret)
            for offset in range(length):
                buf[offset] = secret[offset : offset + 1]
            return length
        except Exception as exc:
            self._problems.append(exc)
            return 0

1979 

1980 

def load_publickey(type: int, buffer: str | bytes) -> PKey:
    """
    Load a public key from a buffer.

    :param type: The file type (one of :data:`FILETYPE_PEM`,
        :data:`FILETYPE_ASN1`).
    :param buffer: The buffer the key is stored in. A ``str`` is encoded
        as ASCII before it is parsed.
    :type buffer: A Python string object, either unicode or bytestring.
    :return: The PKey object, marked as holding only a public key.
    :rtype: :class:`PKey`
    :raises ValueError: If *type* is neither FILETYPE_PEM nor
        FILETYPE_ASN1.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    bio = _new_mem_buf(raw)

    null = _ffi.NULL
    if type == FILETYPE_PEM:
        evp_pkey = _lib.PEM_read_bio_PUBKEY(bio, null, null, null)
    elif type == FILETYPE_ASN1:
        evp_pkey = _lib.d2i_PUBKEY_bio(bio, null)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means the parse failed.
    if evp_pkey == null:
        _raise_current_error()

    # Build the wrapper without running PKey.__init__ and attach the
    # parsed key with EVP_PKEY_free as its finalizer.
    key = PKey.__new__(PKey)
    key._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free)
    key._only_public = True
    return key

2013 

2014 

def load_privatekey(
    type: int,
    buffer: str | bytes,
    passphrase: PassphraseCallableT | None = None,
) -> PKey:
    """
    Load a private key (PKey) from the string *buffer* encoded with the type
    *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the key is stored in. A ``str`` is encoded
                   as ASCII before it is parsed.
    :param passphrase: (optional) if encrypted PEM format, this can be
                       either the passphrase to use, or a callback for
                       providing the passphrase.

    :return: The PKey object
    :raises ValueError: If *type* is neither FILETYPE_PEM nor
                        FILETYPE_ASN1, or if a passphrase is supplied for
                        a type other than FILETYPE_PEM.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    bio = _new_mem_buf(raw)

    # Constructed before the type dispatch: the helper itself rejects a
    # passphrase combined with a non-PEM type.
    helper = _PassphraseHelper(type, passphrase)

    if type == FILETYPE_PEM:
        password_cb = helper.callback
        password_cb_args = helper.callback_args
        evp_pkey = _lib.PEM_read_bio_PrivateKey(
            bio, _ffi.NULL, password_cb, password_cb_args
        )
        # Surface any exception recorded inside the passphrase callback.
        helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means the parse failed.
    if evp_pkey == _ffi.NULL:
        _raise_current_error()

    # Build the wrapper without running PKey.__init__ and attach the
    # parsed key with EVP_PKEY_free as its finalizer.
    key = PKey.__new__(PKey)
    key._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free)
    return key

2054 

2055 

def dump_certificate_request(type: int, req: X509Req) -> bytes:
    """
    Dump the certificate request *req* into a buffer string encoded with the
    type *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1,
        FILETYPE_TEXT)
    :param req: The certificate request to dump
    :return: The buffer with the dumped certificate request in
    :raises ValueError: If *type* is not one of the supported file types.

    .. deprecated:: 24.2.0
       Use `cryptography.x509.CertificateSigningRequest` instead.
    """
    out = _new_mem_buf()

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_X509_REQ(out, req._req)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_X509_REQ_bio(out, req._req)
    elif type == FILETYPE_TEXT:
        status = _lib.X509_REQ_print_ex(out, req._req, 0, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    # A zero status from the writer is treated as failure.
    _openssl_assert(status != 0)

    return _bio_to_string(out)

2086 

2087 

# Private alias bound to the same function object as the public name.
# NOTE(review): presumably lets internal callers avoid the deprecation
# warning attached to the public name below -- confirm against callers.
_dump_certificate_request_internal = dump_certificate_request

# Mark the public name as deprecated through cryptography's helper.
utils.deprecated(
    dump_certificate_request,
    __name__,
    "CSR support in pyOpenSSL is deprecated. You should use the APIs "
    "in cryptography.",
    DeprecationWarning,
    name="dump_certificate_request",
)

2100 

2101 

def load_certificate_request(type: int, buffer: bytes) -> X509Req:
    """
    Load a certificate request (X509Req) from the string *buffer* encoded with
    the type *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the certificate request is stored in. A
        ``str`` passed at runtime is encoded as ASCII before it is parsed.
    :return: The X509Req object
    :raises ValueError: If *type* is neither FILETYPE_PEM nor
        FILETYPE_ASN1.

    .. deprecated:: 24.2.0
       Use `cryptography.x509.load_der_x509_csr` or
       `cryptography.x509.load_pem_x509_csr` instead.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    bio = _new_mem_buf(raw)

    null = _ffi.NULL
    if type == FILETYPE_PEM:
        parsed = _lib.PEM_read_bio_X509_REQ(bio, null, null, null)
    elif type == FILETYPE_ASN1:
        parsed = _lib.d2i_X509_REQ_bio(bio, null)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means the parse failed.
    _openssl_assert(parsed != null)

    # Build the wrapper without running X509Req.__init__ and attach the
    # parsed request with X509_REQ_free as its finalizer.
    wrapper = X509Req.__new__(X509Req)
    wrapper._req = _ffi.gc(parsed, _lib.X509_REQ_free)
    return wrapper

2132 

2133 

# Private alias bound to the same function object as the public name.
# NOTE(review): presumably lets internal callers avoid the deprecation
# warning attached to the public name below -- confirm against callers.
_load_certificate_request_internal = load_certificate_request

# Mark the public name as deprecated through cryptography's helper.
utils.deprecated(
    load_certificate_request,
    __name__,
    "CSR support in pyOpenSSL is deprecated. You should use the APIs "
    "in cryptography.",
    DeprecationWarning,
    name="load_certificate_request",
)