Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/crypto.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

869 statements  

1from __future__ import annotations 

2 

3import calendar 

4import datetime 

5import functools 

6import sys 

7import typing 

8import warnings 

9from base64 import b16encode 

10from collections.abc import Iterable, Sequence 

11from functools import partial 

12from typing import ( 

13 Any, 

14 Callable, 

15 Union, 

16) 

17 

18if sys.version_info >= (3, 13): 

19 from warnings import deprecated 

20elif sys.version_info < (3, 8): 

21 _T = typing.TypeVar("T") 

22 

23 def deprecated(msg: str, **kwargs: object) -> Callable[[_T], _T]: 

24 return lambda f: f 

25else: 

26 from typing_extensions import deprecated 

27 

28from cryptography import utils, x509 

29from cryptography.hazmat.primitives.asymmetric import ( 

30 dsa, 

31 ec, 

32 ed448, 

33 ed25519, 

34 rsa, 

35) 

36 

37from OpenSSL._util import StrOrBytesPath 

38from OpenSSL._util import ( 

39 byte_string as _byte_string, 

40) 

41from OpenSSL._util import ( 

42 exception_from_error_queue as _exception_from_error_queue, 

43) 

44from OpenSSL._util import ( 

45 ffi as _ffi, 

46) 

47from OpenSSL._util import ( 

48 lib as _lib, 

49) 

50from OpenSSL._util import ( 

51 make_assert as _make_assert, 

52) 

53from OpenSSL._util import ( 

54 path_bytes as _path_bytes, 

55) 

56 

57__all__ = [ 

58 "FILETYPE_ASN1", 

59 "FILETYPE_PEM", 

60 "FILETYPE_TEXT", 

61 "TYPE_DSA", 

62 "TYPE_RSA", 

63 "X509", 

64 "Error", 

65 "PKey", 

66 "X509Extension", 

67 "X509Name", 

68 "X509Req", 

69 "X509Store", 

70 "X509StoreContext", 

71 "X509StoreContextError", 

72 "X509StoreFlags", 

73 "dump_certificate", 

74 "dump_certificate_request", 

75 "dump_privatekey", 

76 "dump_publickey", 

77 "get_elliptic_curve", 

78 "get_elliptic_curves", 

79 "load_certificate", 

80 "load_certificate_request", 

81 "load_privatekey", 

82 "load_publickey", 

83] 

84 

85 

86_PrivateKey = Union[ 

87 dsa.DSAPrivateKey, 

88 ec.EllipticCurvePrivateKey, 

89 ed25519.Ed25519PrivateKey, 

90 ed448.Ed448PrivateKey, 

91 rsa.RSAPrivateKey, 

92] 

93_PublicKey = Union[ 

94 dsa.DSAPublicKey, 

95 ec.EllipticCurvePublicKey, 

96 ed25519.Ed25519PublicKey, 

97 ed448.Ed448PublicKey, 

98 rsa.RSAPublicKey, 

99] 

100_Key = Union[_PrivateKey, _PublicKey] 

101PassphraseCallableT = Union[bytes, Callable[..., bytes]] 

102 

103 

104FILETYPE_PEM: int = _lib.SSL_FILETYPE_PEM 

105FILETYPE_ASN1: int = _lib.SSL_FILETYPE_ASN1 

106 

107# TODO This was an API mistake. OpenSSL has no such constant. 

108FILETYPE_TEXT = 2**16 - 1 

109 

110TYPE_RSA: int = _lib.EVP_PKEY_RSA 

111TYPE_DSA: int = _lib.EVP_PKEY_DSA 

112TYPE_DH: int = _lib.EVP_PKEY_DH 

113TYPE_EC: int = _lib.EVP_PKEY_EC 

114 

115 

116class Error(Exception): 

117 """ 

118 An error occurred in an `OpenSSL.crypto` API. 

119 """ 

120 

121 

122_raise_current_error = partial(_exception_from_error_queue, Error) 

123_openssl_assert = _make_assert(Error) 

124 

125 

126def _new_mem_buf(buffer: bytes | None = None) -> Any: 

127 """ 

128 Allocate a new OpenSSL memory BIO. 

129 

130 Arrange for the garbage collector to clean it up automatically. 

131 

132 :param buffer: None or some bytes to use to put into the BIO so that they 

133 can be read out. 

134 """ 

135 if buffer is None: 

136 bio = _lib.BIO_new(_lib.BIO_s_mem()) 

137 free = _lib.BIO_free 

138 else: 

139 data = _ffi.new("char[]", buffer) 

140 bio = _lib.BIO_new_mem_buf(data, len(buffer)) 

141 

142 # Keep the memory alive as long as the bio is alive! 

143 def free(bio: Any, ref: Any = data) -> Any: 

144 return _lib.BIO_free(bio) 

145 

146 _openssl_assert(bio != _ffi.NULL) 

147 

148 bio = _ffi.gc(bio, free) 

149 return bio 

150 

151 

152def _bio_to_string(bio: Any) -> bytes: 

153 """ 

154 Copy the contents of an OpenSSL BIO object into a Python byte string. 

155 """ 

156 result_buffer = _ffi.new("char**") 

157 buffer_length = _lib.BIO_get_mem_data(bio, result_buffer) 

158 return _ffi.buffer(result_buffer[0], buffer_length)[:] 

159 

160 

161def _set_asn1_time(boundary: Any, when: bytes) -> None: 

162 """ 

163 The the time value of an ASN1 time object. 

164 

165 @param boundary: An ASN1_TIME pointer (or an object safely 

166 castable to that type) which will have its value set. 

167 @param when: A string representation of the desired time value. 

168 

169 @raise TypeError: If C{when} is not a L{bytes} string. 

170 @raise ValueError: If C{when} does not represent a time in the required 

171 format. 

172 @raise RuntimeError: If the time value cannot be set for some other 

173 (unspecified) reason. 

174 """ 

175 if not isinstance(when, bytes): 

176 raise TypeError("when must be a byte string") 

177 # ASN1_TIME_set_string validates the string without writing anything 

178 # when the destination is NULL. 

179 _openssl_assert(boundary != _ffi.NULL) 

180 

181 set_result = _lib.ASN1_TIME_set_string(boundary, when) 

182 if set_result == 0: 

183 raise ValueError("Invalid string") 

184 

185 

186def _new_asn1_time(when: bytes) -> Any: 

187 """ 

188 Behaves like _set_asn1_time but returns a new ASN1_TIME object. 

189 

190 @param when: A string representation of the desired time value. 

191 

192 @raise TypeError: If C{when} is not a L{bytes} string. 

193 @raise ValueError: If C{when} does not represent a time in the required 

194 format. 

195 @raise RuntimeError: If the time value cannot be set for some other 

196 (unspecified) reason. 

197 """ 

198 ret = _lib.ASN1_TIME_new() 

199 _openssl_assert(ret != _ffi.NULL) 

200 ret = _ffi.gc(ret, _lib.ASN1_TIME_free) 

201 _set_asn1_time(ret, when) 

202 return ret 

203 

204 

205def _get_asn1_time(timestamp: Any) -> bytes | None: 

206 """ 

207 Retrieve the time value of an ASN1 time object. 

208 

209 @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to 

210 that type) from which the time value will be retrieved. 

211 

212 @return: The time value from C{timestamp} as a L{bytes} string in a certain 

213 format. Or C{None} if the object contains no time value. 

214 """ 

215 string_timestamp = _ffi.cast("ASN1_STRING*", timestamp) 

216 if _lib.ASN1_STRING_length(string_timestamp) == 0: 

217 return None 

218 elif ( 

219 _lib.ASN1_STRING_type(string_timestamp) == _lib.V_ASN1_GENERALIZEDTIME 

220 ): 

221 return _ffi.string(_lib.ASN1_STRING_get0_data(string_timestamp)) 

222 else: 

223 generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**") 

224 _lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp) 

225 _openssl_assert(generalized_timestamp[0] != _ffi.NULL) 

226 

227 string_timestamp = _ffi.cast("ASN1_STRING*", generalized_timestamp[0]) 

228 string_data = _lib.ASN1_STRING_get0_data(string_timestamp) 

229 string_result = _ffi.string(string_data) 

230 _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0]) 

231 return string_result 

232 

233 

234class _X509NameInvalidator: 

235 def __init__(self) -> None: 

236 self._names: list[X509Name] = [] 

237 

238 def add(self, name: X509Name) -> None: 

239 self._names.append(name) 

240 

241 def clear(self) -> None: 

242 for name in self._names: 

243 # Breaks the object, but also prevents UAF! 

244 del name._name 

245 

246 

247class PKey: 

248 """ 

249 A class representing an DSA or RSA public key or key pair. 

250 """ 

251 

252 _only_public = False 

253 _initialized = True 

254 

255 def __init__(self) -> None: 

256 pkey = _lib.EVP_PKEY_new() 

257 self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) 

258 self._initialized = False 

259 

260 def to_cryptography_key(self) -> _Key: 

261 """ 

262 Export as a ``cryptography`` key. 

263 

264 :rtype: One of ``cryptography``'s `key interfaces`_. 

265 

266 .. _key interfaces: https://cryptography.io/en/latest/hazmat/\ 

267 primitives/asymmetric/rsa/#key-interfaces 

268 

269 .. versionadded:: 16.1.0 

270 """ 

271 from cryptography.hazmat.primitives.serialization import ( 

272 load_der_private_key, 

273 load_der_public_key, 

274 ) 

275 

276 if self._only_public: 

277 der = dump_publickey(FILETYPE_ASN1, self) 

278 return typing.cast(_Key, load_der_public_key(der)) 

279 else: 

280 der = dump_privatekey(FILETYPE_ASN1, self) 

281 return typing.cast(_Key, load_der_private_key(der, password=None)) 

282 

283 @classmethod 

284 def from_cryptography_key(cls, crypto_key: _Key) -> PKey: 

285 """ 

286 Construct based on a ``cryptography`` *crypto_key*. 

287 

288 :param crypto_key: A ``cryptography`` key. 

289 :type crypto_key: One of ``cryptography``'s `key interfaces`_. 

290 

291 :rtype: PKey 

292 

293 .. versionadded:: 16.1.0 

294 """ 

295 if not isinstance( 

296 crypto_key, 

297 ( 

298 dsa.DSAPrivateKey, 

299 dsa.DSAPublicKey, 

300 ec.EllipticCurvePrivateKey, 

301 ec.EllipticCurvePublicKey, 

302 ed25519.Ed25519PrivateKey, 

303 ed25519.Ed25519PublicKey, 

304 ed448.Ed448PrivateKey, 

305 ed448.Ed448PublicKey, 

306 rsa.RSAPrivateKey, 

307 rsa.RSAPublicKey, 

308 ), 

309 ): 

310 raise TypeError("Unsupported key type") 

311 

312 from cryptography.hazmat.primitives.serialization import ( 

313 Encoding, 

314 NoEncryption, 

315 PrivateFormat, 

316 PublicFormat, 

317 ) 

318 

319 if isinstance( 

320 crypto_key, 

321 ( 

322 dsa.DSAPublicKey, 

323 ec.EllipticCurvePublicKey, 

324 ed25519.Ed25519PublicKey, 

325 ed448.Ed448PublicKey, 

326 rsa.RSAPublicKey, 

327 ), 

328 ): 

329 return load_publickey( 

330 FILETYPE_ASN1, 

331 crypto_key.public_bytes( 

332 Encoding.DER, PublicFormat.SubjectPublicKeyInfo 

333 ), 

334 ) 

335 else: 

336 der = crypto_key.private_bytes( 

337 Encoding.DER, PrivateFormat.PKCS8, NoEncryption() 

338 ) 

339 return load_privatekey(FILETYPE_ASN1, der) 

340 

341 def generate_key(self, type: int, bits: int) -> None: 

342 """ 

343 Generate a key pair of the given type, with the given number of bits. 

344 

345 This generates a key "into" the this object. 

346 

347 :param type: The key type. 

348 :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA` 

349 :param bits: The number of bits. 

350 :type bits: :py:data:`int` ``>= 0`` 

351 :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't 

352 of the appropriate type. 

353 :raises ValueError: If the number of bits isn't an integer of 

354 the appropriate size. 

355 :return: ``None`` 

356 """ 

357 if not isinstance(type, int): 

358 raise TypeError("type must be an integer") 

359 

360 if not isinstance(bits, int): 

361 raise TypeError("bits must be an integer") 

362 

363 if type == TYPE_RSA: 

364 if bits <= 0: 

365 raise ValueError("Invalid number of bits") 

366 

367 # TODO Check error return 

368 exponent = _lib.BN_new() 

369 exponent = _ffi.gc(exponent, _lib.BN_free) 

370 _lib.BN_set_word(exponent, _lib.RSA_F4) 

371 

372 rsa = _lib.RSA_new() 

373 

374 result = _lib.RSA_generate_key_ex(rsa, bits, exponent, _ffi.NULL) 

375 _openssl_assert(result == 1) 

376 

377 result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa) 

378 _openssl_assert(result == 1) 

379 

380 elif type == TYPE_DSA: 

381 dsa = _lib.DSA_new() 

382 _openssl_assert(dsa != _ffi.NULL) 

383 

384 dsa = _ffi.gc(dsa, _lib.DSA_free) 

385 res = _lib.DSA_generate_parameters_ex( 

386 dsa, bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL 

387 ) 

388 _openssl_assert(res == 1) 

389 

390 _openssl_assert(_lib.DSA_generate_key(dsa) == 1) 

391 _openssl_assert(_lib.EVP_PKEY_set1_DSA(self._pkey, dsa) == 1) 

392 else: 

393 raise Error("No such key type") 

394 

395 self._initialized = True 

396 

397 def check(self) -> bool: 

398 """ 

399 Check the consistency of an RSA private key. 

400 

401 This is the Python equivalent of OpenSSL's ``RSA_check_key``. 

402 

403 :return: ``True`` if key is consistent. 

404 

405 :raise OpenSSL.crypto.Error: if the key is inconsistent. 

406 

407 :raise TypeError: if the key is of a type which cannot be checked. 

408 Only RSA keys can currently be checked. 

409 """ 

410 if self._only_public: 

411 raise TypeError("public key only") 

412 

413 if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA: 

414 raise TypeError("Only RSA keys can currently be checked.") 

415 

416 rsa = _lib.EVP_PKEY_get1_RSA(self._pkey) 

417 rsa = _ffi.gc(rsa, _lib.RSA_free) 

418 result = _lib.RSA_check_key(rsa) 

419 if result == 1: 

420 return True 

421 _raise_current_error() 

422 

423 def type(self) -> int: 

424 """ 

425 Returns the type of the key 

426 

427 :return: The type of the key. 

428 """ 

429 return _lib.EVP_PKEY_id(self._pkey) 

430 

431 def bits(self) -> int: 

432 """ 

433 Returns the number of bits of the key 

434 

435 :return: The number of bits of the key. 

436 """ 

437 return _lib.EVP_PKEY_bits(self._pkey) 

438 

439 

440class _EllipticCurve: 

441 """ 

442 A representation of a supported elliptic curve. 

443 

444 @cvar _curves: :py:obj:`None` until an attempt is made to load the curves. 

445 Thereafter, a :py:type:`set` containing :py:type:`_EllipticCurve` 

446 instances each of which represents one curve supported by the system. 

447 @type _curves: :py:type:`NoneType` or :py:type:`set` 

448 """ 

449 

450 _curves = None 

451 

452 def __ne__(self, other: Any) -> bool: 

453 """ 

454 Implement cooperation with the right-hand side argument of ``!=``. 

455 

456 Python 3 seems to have dropped this cooperation in this very narrow 

457 circumstance. 

458 """ 

459 if isinstance(other, _EllipticCurve): 

460 return super().__ne__(other) 

461 return NotImplemented 

462 

463 @classmethod 

464 def _load_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]: 

465 """ 

466 Get the curves supported by OpenSSL. 

467 

468 :param lib: The OpenSSL library binding object. 

469 

470 :return: A :py:type:`set` of ``cls`` instances giving the names of the 

471 elliptic curves the underlying library supports. 

472 """ 

473 num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0) 

474 builtin_curves = _ffi.new("EC_builtin_curve[]", num_curves) 

475 # The return value on this call should be num_curves again. We 

476 # could check it to make sure but if it *isn't* then.. what could 

477 # we do? Abort the whole process, I suppose...? -exarkun 

478 lib.EC_get_builtin_curves(builtin_curves, num_curves) 

479 return set(cls.from_nid(lib, c.nid) for c in builtin_curves) 

480 

481 @classmethod 

482 def _get_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]: 

483 """ 

484 Get, cache, and return the curves supported by OpenSSL. 

485 

486 :param lib: The OpenSSL library binding object. 

487 

488 :return: A :py:type:`set` of ``cls`` instances giving the names of the 

489 elliptic curves the underlying library supports. 

490 """ 

491 if cls._curves is None: 

492 cls._curves = cls._load_elliptic_curves(lib) 

493 return cls._curves 

494 

495 @classmethod 

496 def from_nid(cls, lib: Any, nid: int) -> _EllipticCurve: 

497 """ 

498 Instantiate a new :py:class:`_EllipticCurve` associated with the given 

499 OpenSSL NID. 

500 

501 :param lib: The OpenSSL library binding object. 

502 

503 :param nid: The OpenSSL NID the resulting curve object will represent. 

504 This must be a curve NID (and not, for example, a hash NID) or 

505 subsequent operations will fail in unpredictable ways. 

506 :type nid: :py:class:`int` 

507 

508 :return: The curve object. 

509 """ 

510 return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii")) 

511 

512 def __init__(self, lib: Any, nid: int, name: str) -> None: 

513 """ 

514 :param _lib: The :py:mod:`cryptography` binding instance used to 

515 interface with OpenSSL. 

516 

517 :param _nid: The OpenSSL NID identifying the curve this object 

518 represents. 

519 :type _nid: :py:class:`int` 

520 

521 :param name: The OpenSSL short name identifying the curve this object 

522 represents. 

523 :type name: :py:class:`unicode` 

524 """ 

525 self._lib = lib 

526 self._nid = nid 

527 self.name = name 

528 

529 def __repr__(self) -> str: 

530 return f"<Curve {self.name!r}>" 

531 

532 def _to_EC_KEY(self) -> Any: 

533 """ 

534 Create a new OpenSSL EC_KEY structure initialized to use this curve. 

535 

536 The structure is automatically garbage collected when the Python object 

537 is garbage collected. 

538 """ 

539 key = self._lib.EC_KEY_new_by_curve_name(self._nid) 

540 return _ffi.gc(key, _lib.EC_KEY_free) 

541 

542 

543@deprecated( 

544 "get_elliptic_curves is deprecated. You should use the APIs in " 

545 "cryptography instead." 

546) 

547def get_elliptic_curves() -> set[_EllipticCurve]: 

548 """ 

549 Return a set of objects representing the elliptic curves supported in the 

550 OpenSSL build in use. 

551 

552 The curve objects have a :py:class:`unicode` ``name`` attribute by which 

553 they identify themselves. 

554 

555 The curve objects are useful as values for the argument accepted by 

556 :py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be 

557 used for ECDHE key exchange. 

558 """ 

559 return _EllipticCurve._get_elliptic_curves(_lib) 

560 

561 

562@deprecated( 

563 "get_elliptic_curve is deprecated. You should use the APIs in " 

564 "cryptography instead." 

565) 

566def get_elliptic_curve(name: str) -> _EllipticCurve: 

567 """ 

568 Return a single curve object selected by name. 

569 

570 See :py:func:`get_elliptic_curves` for information about curve objects. 

571 

572 :param name: The OpenSSL short name identifying the curve object to 

573 retrieve. 

574 :type name: :py:class:`unicode` 

575 

576 If the named curve is not supported then :py:class:`ValueError` is raised. 

577 """ 

578 for curve in get_elliptic_curves(): 

579 if curve.name == name: 

580 return curve 

581 raise ValueError("unknown curve name", name) 

582 

583 

584@functools.total_ordering 

585class X509Name: 

586 """ 

587 An X.509 Distinguished Name. 

588 

589 :ivar countryName: The country of the entity. 

590 :ivar C: Alias for :py:attr:`countryName`. 

591 

592 :ivar stateOrProvinceName: The state or province of the entity. 

593 :ivar ST: Alias for :py:attr:`stateOrProvinceName`. 

594 

595 :ivar localityName: The locality of the entity. 

596 :ivar L: Alias for :py:attr:`localityName`. 

597 

598 :ivar organizationName: The organization name of the entity. 

599 :ivar O: Alias for :py:attr:`organizationName`. 

600 

601 :ivar organizationalUnitName: The organizational unit of the entity. 

602 :ivar OU: Alias for :py:attr:`organizationalUnitName` 

603 

604 :ivar commonName: The common name of the entity. 

605 :ivar CN: Alias for :py:attr:`commonName`. 

606 

607 :ivar emailAddress: The e-mail address of the entity. 

608 """ 

609 

610 def __init__(self, name: X509Name) -> None: 

611 """ 

612 Create a new X509Name, copying the given X509Name instance. 

613 

614 :param name: The name to copy. 

615 :type name: :py:class:`X509Name` 

616 """ 

617 name = _lib.X509_NAME_dup(name._name) 

618 self._name: Any = _ffi.gc(name, _lib.X509_NAME_free) 

619 

620 def __setattr__(self, name: str, value: Any) -> None: 

621 if name.startswith("_"): 

622 return super().__setattr__(name, value) 

623 

624 # Note: we really do not want str subclasses here, so we do not use 

625 # isinstance. 

626 if type(name) is not str: 

627 raise TypeError( 

628 f"attribute name must be string, not " 

629 f"'{type(value).__name__:.200}'" 

630 ) 

631 

632 nid = _lib.OBJ_txt2nid(_byte_string(name)) 

633 if nid == _lib.NID_undef: 

634 try: 

635 _raise_current_error() 

636 except Error: 

637 pass 

638 raise AttributeError("No such attribute") 

639 

640 # If there's an old entry for this NID, remove it 

641 for i in range(_lib.X509_NAME_entry_count(self._name)): 

642 ent = _lib.X509_NAME_get_entry(self._name, i) 

643 ent_obj = _lib.X509_NAME_ENTRY_get_object(ent) 

644 ent_nid = _lib.OBJ_obj2nid(ent_obj) 

645 if nid == ent_nid: 

646 ent = _lib.X509_NAME_delete_entry(self._name, i) 

647 _lib.X509_NAME_ENTRY_free(ent) 

648 break 

649 

650 if isinstance(value, str): 

651 value = value.encode("utf-8") 

652 

653 add_result = _lib.X509_NAME_add_entry_by_NID( 

654 self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0 

655 ) 

656 if not add_result: 

657 _raise_current_error() 

658 

659 def __getattr__(self, name: str) -> str | None: 

660 """ 

661 Find attribute. An X509Name object has the following attributes: 

662 countryName (alias C), stateOrProvince (alias ST), locality (alias L), 

663 organization (alias O), organizationalUnit (alias OU), commonName 

664 (alias CN) and more... 

665 """ 

666 nid = _lib.OBJ_txt2nid(_byte_string(name)) 

667 if nid == _lib.NID_undef: 

668 # This is a bit weird. OBJ_txt2nid indicated failure, but it seems 

669 # a lower level function, a2d_ASN1_OBJECT, also feels the need to 

670 # push something onto the error queue. If we don't clean that up 

671 # now, someone else will bump into it later and be quite confused. 

672 # See lp#314814. 

673 try: 

674 _raise_current_error() 

675 except Error: 

676 pass 

677 raise AttributeError("No such attribute") 

678 

679 entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1) 

680 if entry_index == -1: 

681 return None 

682 

683 entry = _lib.X509_NAME_get_entry(self._name, entry_index) 

684 data = _lib.X509_NAME_ENTRY_get_data(entry) 

685 

686 result_buffer = _ffi.new("unsigned char**") 

687 data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data) 

688 _openssl_assert(data_length >= 0) 

689 

690 try: 

691 result = _ffi.buffer(result_buffer[0], data_length)[:].decode( 

692 "utf-8" 

693 ) 

694 finally: 

695 # XXX untested 

696 _lib.OPENSSL_free(result_buffer[0]) 

697 return result 

698 

699 def __eq__(self, other: Any) -> bool: 

700 if not isinstance(other, X509Name): 

701 return NotImplemented 

702 

703 return _lib.X509_NAME_cmp(self._name, other._name) == 0 

704 

705 def __lt__(self, other: Any) -> bool: 

706 if not isinstance(other, X509Name): 

707 return NotImplemented 

708 

709 return _lib.X509_NAME_cmp(self._name, other._name) < 0 

710 

711 def __repr__(self) -> str: 

712 """ 

713 String representation of an X509Name 

714 """ 

715 result_buffer = _ffi.new("char[]", 512) 

716 format_result = _lib.X509_NAME_oneline( 

717 self._name, result_buffer, len(result_buffer) 

718 ) 

719 _openssl_assert(format_result != _ffi.NULL) 

720 

721 return "<X509Name object '{}'>".format( 

722 _ffi.string(result_buffer).decode("utf-8"), 

723 ) 

724 

725 def hash(self) -> int: 

726 """ 

727 Return an integer representation of the first four bytes of the 

728 MD5 digest of the DER representation of the name. 

729 

730 This is the Python equivalent of OpenSSL's ``X509_NAME_hash``. 

731 

732 :return: The (integer) hash of this name. 

733 :rtype: :py:class:`int` 

734 """ 

735 return _lib.X509_NAME_hash(self._name) 

736 

737 def der(self) -> bytes: 

738 """ 

739 Return the DER encoding of this name. 

740 

741 :return: The DER encoded form of this name. 

742 :rtype: :py:class:`bytes` 

743 """ 

744 result_buffer = _ffi.new("unsigned char**") 

745 encode_result = _lib.i2d_X509_NAME(self._name, result_buffer) 

746 _openssl_assert(encode_result >= 0) 

747 

748 string_result = _ffi.buffer(result_buffer[0], encode_result)[:] 

749 _lib.OPENSSL_free(result_buffer[0]) 

750 return string_result 

751 

752 def get_components(self) -> list[tuple[bytes, bytes]]: 

753 """ 

754 Returns the components of this name, as a sequence of 2-tuples. 

755 

756 :return: The components of this name. 

757 :rtype: :py:class:`list` of ``name, value`` tuples. 

758 """ 

759 result = [] 

760 for i in range(_lib.X509_NAME_entry_count(self._name)): 

761 ent = _lib.X509_NAME_get_entry(self._name, i) 

762 

763 fname = _lib.X509_NAME_ENTRY_get_object(ent) 

764 fval = _lib.X509_NAME_ENTRY_get_data(ent) 

765 

766 nid = _lib.OBJ_obj2nid(fname) 

767 name = _lib.OBJ_nid2sn(nid) 

768 

769 # ffi.string does not handle strings containing NULL bytes 

770 # (which may have been generated by old, broken software) 

771 value = _ffi.buffer( 

772 _lib.ASN1_STRING_get0_data(fval), _lib.ASN1_STRING_length(fval) 

773 )[:] 

774 result.append((_ffi.string(name), value)) 

775 

776 return result 

777 

778 

779@deprecated( 

780 "X509Extension support in pyOpenSSL is deprecated. You should use the " 

781 "APIs in cryptography." 

782) 

783class X509Extension: 

784 """ 

785 An X.509 v3 certificate extension. 

786 

787 .. deprecated:: 23.3.0 

788 Use cryptography's X509 APIs instead. 

789 """ 

790 

791 def __init__( 

792 self, 

793 type_name: bytes, 

794 critical: bool, 

795 value: bytes, 

796 subject: X509 | None = None, 

797 issuer: X509 | None = None, 

798 ) -> None: 

799 """ 

800 Initializes an X509 extension. 

801 

802 :param type_name: The name of the type of extension_ to create. 

803 :type type_name: :py:data:`bytes` 

804 

805 :param bool critical: A flag indicating whether this is a critical 

806 extension. 

807 

808 :param value: The OpenSSL textual representation of the extension's 

809 value. 

810 :type value: :py:data:`bytes` 

811 

812 :param subject: Optional X509 certificate to use as subject. 

813 :type subject: :py:class:`X509` 

814 

815 :param issuer: Optional X509 certificate to use as issuer. 

816 :type issuer: :py:class:`X509` 

817 

818 .. _extension: https://www.openssl.org/docs/manmaster/man5/ 

819 x509v3_config.html#STANDARD-EXTENSIONS 

820 """ 

821 ctx = _ffi.new("X509V3_CTX*") 

822 

823 # A context is necessary for any extension which uses the r2i 

824 # conversion method. That is, X509V3_EXT_nconf may segfault if passed 

825 # a NULL ctx. Start off by initializing most of the fields to NULL. 

826 _lib.X509V3_set_ctx(ctx, _ffi.NULL, _ffi.NULL, _ffi.NULL, _ffi.NULL, 0) 

827 

828 # We have no configuration database - but perhaps we should (some 

829 # extensions may require it). 

830 _lib.X509V3_set_ctx_nodb(ctx) 

831 

832 # Initialize the subject and issuer, if appropriate. ctx is a local, 

833 # and as far as I can tell none of the X509V3_* APIs invoked here steal 

834 # any references, so no need to mess with reference counts or 

835 # duplicates. 

836 if issuer is not None: 

837 if not isinstance(issuer, X509): 

838 raise TypeError("issuer must be an X509 instance") 

839 ctx.issuer_cert = issuer._x509 

840 if subject is not None: 

841 if not isinstance(subject, X509): 

842 raise TypeError("subject must be an X509 instance") 

843 ctx.subject_cert = subject._x509 

844 

845 if critical: 

846 # There are other OpenSSL APIs which would let us pass in critical 

847 # separately, but they're harder to use, and since value is already 

848 # a pile of crappy junk smuggling a ton of utterly important 

849 # structured data, what's the point of trying to avoid nasty stuff 

850 # with strings? (However, X509V3_EXT_i2d in particular seems like 

851 # it would be a better API to invoke. I do not know where to get 

852 # the ext_struc it desires for its last parameter, though.) 

853 value = b"critical," + value 

854 

855 extension = _lib.X509V3_EXT_nconf(_ffi.NULL, ctx, type_name, value) 

856 if extension == _ffi.NULL: 

857 _raise_current_error() 

858 self._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) 

859 

860 @property 

861 def _nid(self) -> Any: 

862 return _lib.OBJ_obj2nid( 

863 _lib.X509_EXTENSION_get_object(self._extension) 

864 ) 

865 

866 _prefixes: typing.ClassVar[dict[int, str]] = { 

867 _lib.GEN_EMAIL: "email", 

868 _lib.GEN_DNS: "DNS", 

869 _lib.GEN_URI: "URI", 

870 } 

871 

872 def _subjectAltNameString(self) -> str: 

873 names = _ffi.cast( 

874 "GENERAL_NAMES*", _lib.X509V3_EXT_d2i(self._extension) 

875 ) 

876 

877 names = _ffi.gc(names, _lib.GENERAL_NAMES_free) 

878 parts = [] 

879 for i in range(_lib.sk_GENERAL_NAME_num(names)): 

880 name = _lib.sk_GENERAL_NAME_value(names, i) 

881 try: 

882 label = self._prefixes[name.type] 

883 except KeyError: 

884 bio = _new_mem_buf() 

885 _lib.GENERAL_NAME_print(bio, name) 

886 parts.append(_bio_to_string(bio).decode("utf-8")) 

887 else: 

888 value = _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[ 

889 : 

890 ].decode("utf-8") 

891 parts.append(label + ":" + value) 

892 return ", ".join(parts) 

893 

894 def __str__(self) -> str: 

895 """ 

896 :return: a nice text representation of the extension 

897 """ 

898 if _lib.NID_subject_alt_name == self._nid: 

899 return self._subjectAltNameString() 

900 

901 bio = _new_mem_buf() 

902 print_result = _lib.X509V3_EXT_print(bio, self._extension, 0, 0) 

903 _openssl_assert(print_result != 0) 

904 

905 return _bio_to_string(bio).decode("utf-8") 

906 

907 def get_critical(self) -> bool: 

908 """ 

909 Returns the critical field of this X.509 extension. 

910 

911 :return: The critical field. 

912 """ 

913 return _lib.X509_EXTENSION_get_critical(self._extension) 

914 

915 def get_short_name(self) -> bytes: 

916 """ 

917 Returns the short type name of this X.509 extension. 

918 

919 The result is a byte string such as :py:const:`b"basicConstraints"`. 

920 

921 :return: The short type name. 

922 :rtype: :py:data:`bytes` 

923 

924 .. versionadded:: 0.12 

925 """ 

926 obj = _lib.X509_EXTENSION_get_object(self._extension) 

927 nid = _lib.OBJ_obj2nid(obj) 

928 # OpenSSL 3.1.0 has a bug where nid2sn returns NULL for NIDs that 

929 # previously returned UNDEF. This is a workaround for that issue. 

930 # https://github.com/openssl/openssl/commit/908ba3ed9adbb3df90f76 

931 buf = _lib.OBJ_nid2sn(nid) 

932 if buf != _ffi.NULL: 

933 return _ffi.string(buf) 

934 else: 

935 return b"UNDEF" 

936 

937 def get_data(self) -> bytes: 

938 """ 

939 Returns the data of the X509 extension, encoded as ASN.1. 

940 

941 :return: The ASN.1 encoded data of this X509 extension. 

942 :rtype: :py:data:`bytes` 

943 

944 .. versionadded:: 0.12 

945 """ 

946 octet_result = _lib.X509_EXTENSION_get_data(self._extension) 

947 string_result = _ffi.cast("ASN1_STRING*", octet_result) 

948 char_result = _lib.ASN1_STRING_get0_data(string_result) 

949 result_length = _lib.ASN1_STRING_length(string_result) 

950 return _ffi.buffer(char_result, result_length)[:] 

951 

952 

953@deprecated( 

954 "CSR support in pyOpenSSL is deprecated. You should use the APIs " 

955 "in cryptography." 

956) 

957class X509Req: 

958 """ 

959 An X.509 certificate signing requests. 

960 

961 .. deprecated:: 24.2.0 

962 Use `cryptography.x509.CertificateSigningRequest` instead. 

963 """ 

964 

965 def __init__(self) -> None: 

966 req = _lib.X509_REQ_new() 

967 self._req = _ffi.gc(req, _lib.X509_REQ_free) 

968 # Default to version 0. 

969 self.set_version(0) 

970 

971 def to_cryptography(self) -> x509.CertificateSigningRequest: 

972 """ 

973 Export as a ``cryptography`` certificate signing request. 

974 

975 :rtype: ``cryptography.x509.CertificateSigningRequest`` 

976 

977 .. versionadded:: 17.1.0 

978 """ 

979 from cryptography.x509 import load_der_x509_csr 

980 

981 der = _dump_certificate_request_internal(FILETYPE_ASN1, self) 

982 

983 return load_der_x509_csr(der) 

984 

985 @classmethod 

986 def from_cryptography( 

987 cls, crypto_req: x509.CertificateSigningRequest 

988 ) -> X509Req: 

989 """ 

990 Construct based on a ``cryptography`` *crypto_req*. 

991 

992 :param crypto_req: A ``cryptography`` X.509 certificate signing request 

993 :type crypto_req: ``cryptography.x509.CertificateSigningRequest`` 

994 

995 :rtype: X509Req 

996 

997 .. versionadded:: 17.1.0 

998 """ 

999 if not isinstance(crypto_req, x509.CertificateSigningRequest): 

1000 raise TypeError("Must be a certificate signing request") 

1001 

1002 from cryptography.hazmat.primitives.serialization import Encoding 

1003 

1004 der = crypto_req.public_bytes(Encoding.DER) 

1005 return _load_certificate_request_internal(FILETYPE_ASN1, der) 

1006 

1007 def set_pubkey(self, pkey: PKey) -> None: 

1008 """ 

1009 Set the public key of the certificate signing request. 

1010 

1011 :param pkey: The public key to use. 

1012 :type pkey: :py:class:`PKey` 

1013 

1014 :return: ``None`` 

1015 """ 

1016 set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey) 

1017 _openssl_assert(set_result == 1) 

1018 

1019 def get_pubkey(self) -> PKey: 

1020 """ 

1021 Get the public key of the certificate signing request. 

1022 

1023 :return: The public key. 

1024 :rtype: :py:class:`PKey` 

1025 """ 

1026 pkey = PKey.__new__(PKey) 

1027 pkey._pkey = _lib.X509_REQ_get_pubkey(self._req) 

1028 _openssl_assert(pkey._pkey != _ffi.NULL) 

1029 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) 

1030 pkey._only_public = True 

1031 return pkey 

1032 

1033 def set_version(self, version: int) -> None: 

1034 """ 

1035 Set the version subfield (RFC 2986, section 4.1) of the certificate 

1036 request. 

1037 

1038 :param int version: The version number. 

1039 :return: ``None`` 

1040 """ 

1041 if not isinstance(version, int): 

1042 raise TypeError("version must be an int") 

1043 if version != 0: 

1044 raise ValueError( 

1045 "Invalid version. The only valid version for X509Req is 0." 

1046 ) 

1047 set_result = _lib.X509_REQ_set_version(self._req, version) 

1048 _openssl_assert(set_result == 1) 

1049 

1050 def get_version(self) -> int: 

1051 """ 

1052 Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate 

1053 request. 

1054 

1055 :return: The value of the version subfield. 

1056 :rtype: :py:class:`int` 

1057 """ 

1058 return _lib.X509_REQ_get_version(self._req) 

1059 

1060 def get_subject(self) -> X509Name: 

1061 """ 

1062 Return the subject of this certificate signing request. 

1063 

1064 This creates a new :class:`X509Name` that wraps the underlying subject 

1065 name field on the certificate signing request. Modifying it will modify 

1066 the underlying signing request, and will have the effect of modifying 

1067 any other :class:`X509Name` that refers to this subject. 

1068 

1069 :return: The subject of this certificate signing request. 

1070 :rtype: :class:`X509Name` 

1071 """ 

1072 name = X509Name.__new__(X509Name) 

1073 name._name = _lib.X509_REQ_get_subject_name(self._req) 

1074 _openssl_assert(name._name != _ffi.NULL) 

1075 

1076 # The name is owned by the X509Req structure. As long as the X509Name 

1077 # Python object is alive, keep the X509Req Python object alive. 

1078 name._owner = self 

1079 

1080 return name 

1081 

1082 def add_extensions(self, extensions: Iterable[X509Extension]) -> None: 

1083 """ 

1084 Add extensions to the certificate signing request. 

1085 

1086 :param extensions: The X.509 extensions to add. 

1087 :type extensions: iterable of :py:class:`X509Extension` 

1088 :return: ``None`` 

1089 """ 

1090 warnings.warn( 

1091 ( 

1092 "This API is deprecated and will be removed in a future " 

1093 "version of pyOpenSSL. You should use pyca/cryptography's " 

1094 "X.509 APIs instead." 

1095 ), 

1096 DeprecationWarning, 

1097 stacklevel=2, 

1098 ) 

1099 

1100 stack = _lib.sk_X509_EXTENSION_new_null() 

1101 _openssl_assert(stack != _ffi.NULL) 

1102 

1103 stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free) 

1104 

1105 for ext in extensions: 

1106 if not isinstance(ext, X509Extension): 

1107 raise ValueError("One of the elements is not an X509Extension") 

1108 

1109 # TODO push can fail (here and elsewhere) 

1110 _lib.sk_X509_EXTENSION_push(stack, ext._extension) 

1111 

1112 add_result = _lib.X509_REQ_add_extensions(self._req, stack) 

1113 _openssl_assert(add_result == 1) 

1114 

1115 def get_extensions(self) -> list[X509Extension]: 

1116 """ 

1117 Get X.509 extensions in the certificate signing request. 

1118 

1119 :return: The X.509 extensions in this request. 

1120 :rtype: :py:class:`list` of :py:class:`X509Extension` objects. 

1121 

1122 .. versionadded:: 0.15 

1123 """ 

1124 warnings.warn( 

1125 ( 

1126 "This API is deprecated and will be removed in a future " 

1127 "version of pyOpenSSL. You should use pyca/cryptography's " 

1128 "X.509 APIs instead." 

1129 ), 

1130 DeprecationWarning, 

1131 stacklevel=2, 

1132 ) 

1133 

1134 exts = [] 

1135 native_exts_obj = _lib.X509_REQ_get_extensions(self._req) 

1136 native_exts_obj = _ffi.gc( 

1137 native_exts_obj, 

1138 lambda x: _lib.sk_X509_EXTENSION_pop_free( 

1139 x, 

1140 _ffi.addressof(_lib._original_lib, "X509_EXTENSION_free"), 

1141 ), 

1142 ) 

1143 

1144 for i in range(_lib.sk_X509_EXTENSION_num(native_exts_obj)): 

1145 ext = X509Extension.__new__(X509Extension) 

1146 extension = _lib.X509_EXTENSION_dup( 

1147 _lib.sk_X509_EXTENSION_value(native_exts_obj, i) 

1148 ) 

1149 ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) 

1150 exts.append(ext) 

1151 return exts 

1152 

1153 def sign(self, pkey: PKey, digest: str) -> None: 

1154 """ 

1155 Sign the certificate signing request with this key and digest type. 

1156 

1157 :param pkey: The key pair to sign with. 

1158 :type pkey: :py:class:`PKey` 

1159 :param digest: The name of the message digest to use for the signature, 

1160 e.g. :py:data:`"sha256"`. 

1161 :type digest: :py:class:`str` 

1162 :return: ``None`` 

1163 """ 

1164 if pkey._only_public: 

1165 raise ValueError("Key has only public part") 

1166 

1167 if not pkey._initialized: 

1168 raise ValueError("Key is uninitialized") 

1169 

1170 digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) 

1171 if digest_obj == _ffi.NULL: 

1172 raise ValueError("No such digest method") 

1173 

1174 sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj) 

1175 _openssl_assert(sign_result > 0) 

1176 

1177 def verify(self, pkey: PKey) -> bool: 

1178 """ 

1179 Verifies the signature on this certificate signing request. 

1180 

1181 :param PKey key: A public key. 

1182 

1183 :return: ``True`` if the signature is correct. 

1184 :rtype: bool 

1185 

1186 :raises OpenSSL.crypto.Error: If the signature is invalid or there is a 

1187 problem verifying the signature. 

1188 """ 

1189 if not isinstance(pkey, PKey): 

1190 raise TypeError("pkey must be a PKey instance") 

1191 

1192 result = _lib.X509_REQ_verify(self._req, pkey._pkey) 

1193 if result <= 0: 

1194 _raise_current_error() 

1195 

1196 return result 

1197 

1198 

1199class X509: 

1200 """ 

1201 An X.509 certificate. 

1202 """ 

1203 

1204 def __init__(self) -> None: 

1205 x509 = _lib.X509_new() 

1206 _openssl_assert(x509 != _ffi.NULL) 

1207 self._x509 = _ffi.gc(x509, _lib.X509_free) 

1208 

1209 self._issuer_invalidator = _X509NameInvalidator() 

1210 self._subject_invalidator = _X509NameInvalidator() 

1211 

1212 @classmethod 

1213 def _from_raw_x509_ptr(cls, x509: Any) -> X509: 

1214 cert = cls.__new__(cls) 

1215 cert._x509 = _ffi.gc(x509, _lib.X509_free) 

1216 cert._issuer_invalidator = _X509NameInvalidator() 

1217 cert._subject_invalidator = _X509NameInvalidator() 

1218 return cert 

1219 

1220 def to_cryptography(self) -> x509.Certificate: 

1221 """ 

1222 Export as a ``cryptography`` certificate. 

1223 

1224 :rtype: ``cryptography.x509.Certificate`` 

1225 

1226 .. versionadded:: 17.1.0 

1227 """ 

1228 from cryptography.x509 import load_der_x509_certificate 

1229 

1230 der = dump_certificate(FILETYPE_ASN1, self) 

1231 return load_der_x509_certificate(der) 

1232 

1233 @classmethod 

1234 def from_cryptography(cls, crypto_cert: x509.Certificate) -> X509: 

1235 """ 

1236 Construct based on a ``cryptography`` *crypto_cert*. 

1237 

1238 :param crypto_key: A ``cryptography`` X.509 certificate. 

1239 :type crypto_key: ``cryptography.x509.Certificate`` 

1240 

1241 :rtype: X509 

1242 

1243 .. versionadded:: 17.1.0 

1244 """ 

1245 if not isinstance(crypto_cert, x509.Certificate): 

1246 raise TypeError("Must be a certificate") 

1247 

1248 from cryptography.hazmat.primitives.serialization import Encoding 

1249 

1250 der = crypto_cert.public_bytes(Encoding.DER) 

1251 return load_certificate(FILETYPE_ASN1, der) 

1252 

1253 def set_version(self, version: int) -> None: 

1254 """ 

1255 Set the version number of the certificate. Note that the 

1256 version value is zero-based, eg. a value of 0 is V1. 

1257 

1258 :param version: The version number of the certificate. 

1259 :type version: :py:class:`int` 

1260 

1261 :return: ``None`` 

1262 """ 

1263 if not isinstance(version, int): 

1264 raise TypeError("version must be an integer") 

1265 

1266 _openssl_assert(_lib.X509_set_version(self._x509, version) == 1) 

1267 

1268 def get_version(self) -> int: 

1269 """ 

1270 Return the version number of the certificate. 

1271 

1272 :return: The version number of the certificate. 

1273 :rtype: :py:class:`int` 

1274 """ 

1275 return _lib.X509_get_version(self._x509) 

1276 

1277 def get_pubkey(self) -> PKey: 

1278 """ 

1279 Get the public key of the certificate. 

1280 

1281 :return: The public key. 

1282 :rtype: :py:class:`PKey` 

1283 """ 

1284 pkey = PKey.__new__(PKey) 

1285 pkey._pkey = _lib.X509_get_pubkey(self._x509) 

1286 if pkey._pkey == _ffi.NULL: 

1287 _raise_current_error() 

1288 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) 

1289 pkey._only_public = True 

1290 return pkey 

1291 

1292 def set_pubkey(self, pkey: PKey) -> None: 

1293 """ 

1294 Set the public key of the certificate. 

1295 

1296 :param pkey: The public key. 

1297 :type pkey: :py:class:`PKey` 

1298 

1299 :return: :py:data:`None` 

1300 """ 

1301 if not isinstance(pkey, PKey): 

1302 raise TypeError("pkey must be a PKey instance") 

1303 

1304 set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey) 

1305 _openssl_assert(set_result == 1) 

1306 

1307 def sign(self, pkey: PKey, digest: str) -> None: 

1308 """ 

1309 Sign the certificate with this key and digest type. 

1310 

1311 :param pkey: The key to sign with. 

1312 :type pkey: :py:class:`PKey` 

1313 

1314 :param digest: The name of the message digest to use. 

1315 :type digest: :py:class:`str` 

1316 

1317 :return: :py:data:`None` 

1318 """ 

1319 if not isinstance(pkey, PKey): 

1320 raise TypeError("pkey must be a PKey instance") 

1321 

1322 if pkey._only_public: 

1323 raise ValueError("Key only has public part") 

1324 

1325 if not pkey._initialized: 

1326 raise ValueError("Key is uninitialized") 

1327 

1328 evp_md = _lib.EVP_get_digestbyname(_byte_string(digest)) 

1329 if evp_md == _ffi.NULL: 

1330 raise ValueError("No such digest method") 

1331 

1332 sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md) 

1333 _openssl_assert(sign_result > 0) 

1334 

1335 def get_signature_algorithm(self) -> bytes: 

1336 """ 

1337 Return the signature algorithm used in the certificate. 

1338 

1339 :return: The name of the algorithm. 

1340 :rtype: :py:class:`bytes` 

1341 

1342 :raises ValueError: If the signature algorithm is undefined. 

1343 

1344 .. versionadded:: 0.13 

1345 """ 

1346 sig_alg = _lib.X509_get0_tbs_sigalg(self._x509) 

1347 alg = _ffi.new("ASN1_OBJECT **") 

1348 _lib.X509_ALGOR_get0(alg, _ffi.NULL, _ffi.NULL, sig_alg) 

1349 nid = _lib.OBJ_obj2nid(alg[0]) 

1350 if nid == _lib.NID_undef: 

1351 raise ValueError("Undefined signature algorithm") 

1352 return _ffi.string(_lib.OBJ_nid2ln(nid)) 

1353 

1354 def digest(self, digest_name: str) -> bytes: 

1355 """ 

1356 Return the digest of the X509 object. 

1357 

1358 :param digest_name: The name of the digest algorithm to use. 

1359 :type digest_name: :py:class:`str` 

1360 

1361 :return: The digest of the object, formatted as 

1362 :py:const:`b":"`-delimited hex pairs. 

1363 :rtype: :py:class:`bytes` 

1364 """ 

1365 digest = _lib.EVP_get_digestbyname(_byte_string(digest_name)) 

1366 if digest == _ffi.NULL: 

1367 raise ValueError("No such digest method") 

1368 

1369 result_buffer = _ffi.new("unsigned char[]", _lib.EVP_MAX_MD_SIZE) 

1370 result_length = _ffi.new("unsigned int[]", 1) 

1371 result_length[0] = len(result_buffer) 

1372 

1373 digest_result = _lib.X509_digest( 

1374 self._x509, digest, result_buffer, result_length 

1375 ) 

1376 _openssl_assert(digest_result == 1) 

1377 

1378 return b":".join( 

1379 [ 

1380 b16encode(ch).upper() 

1381 for ch in _ffi.buffer(result_buffer, result_length[0]) 

1382 ] 

1383 ) 

1384 

1385 def subject_name_hash(self) -> int: 

1386 """ 

1387 Return the hash of the X509 subject. 

1388 

1389 :return: The hash of the subject. 

1390 :rtype: :py:class:`int` 

1391 """ 

1392 return _lib.X509_subject_name_hash(self._x509) 

1393 

1394 def set_serial_number(self, serial: int) -> None: 

1395 """ 

1396 Set the serial number of the certificate. 

1397 

1398 :param serial: The new serial number. 

1399 :type serial: :py:class:`int` 

1400 

1401 :return: :py:data`None` 

1402 """ 

1403 if not isinstance(serial, int): 

1404 raise TypeError("serial must be an integer") 

1405 

1406 hex_serial = hex(serial)[2:] 

1407 hex_serial_bytes = hex_serial.encode("ascii") 

1408 

1409 bignum_serial = _ffi.new("BIGNUM**") 

1410 

1411 # BN_hex2bn stores the result in &bignum. 

1412 result = _lib.BN_hex2bn(bignum_serial, hex_serial_bytes) 

1413 _openssl_assert(result != _ffi.NULL) 

1414 

1415 asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL) 

1416 _lib.BN_free(bignum_serial[0]) 

1417 _openssl_assert(asn1_serial != _ffi.NULL) 

1418 asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free) 

1419 set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial) 

1420 _openssl_assert(set_result == 1) 

1421 

1422 def get_serial_number(self) -> int: 

1423 """ 

1424 Return the serial number of this certificate. 

1425 

1426 :return: The serial number. 

1427 :rtype: int 

1428 """ 

1429 asn1_serial = _lib.X509_get_serialNumber(self._x509) 

1430 bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL) 

1431 try: 

1432 hex_serial = _lib.BN_bn2hex(bignum_serial) 

1433 try: 

1434 hexstring_serial = _ffi.string(hex_serial) 

1435 serial = int(hexstring_serial, 16) 

1436 return serial 

1437 finally: 

1438 _lib.OPENSSL_free(hex_serial) 

1439 finally: 

1440 _lib.BN_free(bignum_serial) 

1441 

1442 def gmtime_adj_notAfter(self, amount: int) -> None: 

1443 """ 

1444 Adjust the time stamp on which the certificate stops being valid. 

1445 

1446 :param int amount: The number of seconds by which to adjust the 

1447 timestamp. 

1448 :return: ``None`` 

1449 """ 

1450 if not isinstance(amount, int): 

1451 raise TypeError("amount must be an integer") 

1452 

1453 notAfter = _lib.X509_getm_notAfter(self._x509) 

1454 _lib.X509_gmtime_adj(notAfter, amount) 

1455 

1456 def gmtime_adj_notBefore(self, amount: int) -> None: 

1457 """ 

1458 Adjust the timestamp on which the certificate starts being valid. 

1459 

1460 :param amount: The number of seconds by which to adjust the timestamp. 

1461 :return: ``None`` 

1462 """ 

1463 if not isinstance(amount, int): 

1464 raise TypeError("amount must be an integer") 

1465 

1466 notBefore = _lib.X509_getm_notBefore(self._x509) 

1467 _lib.X509_gmtime_adj(notBefore, amount) 

1468 

1469 def has_expired(self) -> bool: 

1470 """ 

1471 Check whether the certificate has expired. 

1472 

1473 :return: ``True`` if the certificate has expired, ``False`` otherwise. 

1474 :rtype: bool 

1475 """ 

1476 time_bytes = self.get_notAfter() 

1477 if time_bytes is None: 

1478 raise ValueError("Unable to determine notAfter") 

1479 time_string = time_bytes.decode("utf-8") 

1480 not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") 

1481 

1482 UTC = datetime.timezone.utc 

1483 utcnow = datetime.datetime.now(UTC).replace(tzinfo=None) 

1484 return not_after < utcnow 

1485 

1486 def _get_boundary_time(self, which: Any) -> bytes | None: 

1487 return _get_asn1_time(which(self._x509)) 

1488 

1489 def get_notBefore(self) -> bytes | None: 

1490 """ 

1491 Get the timestamp at which the certificate starts being valid. 

1492 

1493 The timestamp is formatted as an ASN.1 TIME:: 

1494 

1495 YYYYMMDDhhmmssZ 

1496 

1497 :return: A timestamp string, or ``None`` if there is none. 

1498 :rtype: bytes or NoneType 

1499 """ 

1500 return self._get_boundary_time(_lib.X509_getm_notBefore) 

1501 

1502 def _set_boundary_time( 

1503 self, which: Callable[..., Any], when: bytes 

1504 ) -> None: 

1505 return _set_asn1_time(which(self._x509), when) 

1506 

1507 def set_notBefore(self, when: bytes) -> None: 

1508 """ 

1509 Set the timestamp at which the certificate starts being valid. 

1510 

1511 The timestamp is formatted as an ASN.1 TIME:: 

1512 

1513 YYYYMMDDhhmmssZ 

1514 

1515 :param bytes when: A timestamp string. 

1516 :return: ``None`` 

1517 """ 

1518 return self._set_boundary_time(_lib.X509_getm_notBefore, when) 

1519 

1520 def get_notAfter(self) -> bytes | None: 

1521 """ 

1522 Get the timestamp at which the certificate stops being valid. 

1523 

1524 The timestamp is formatted as an ASN.1 TIME:: 

1525 

1526 YYYYMMDDhhmmssZ 

1527 

1528 :return: A timestamp string, or ``None`` if there is none. 

1529 :rtype: bytes or NoneType 

1530 """ 

1531 return self._get_boundary_time(_lib.X509_getm_notAfter) 

1532 

1533 def set_notAfter(self, when: bytes) -> None: 

1534 """ 

1535 Set the timestamp at which the certificate stops being valid. 

1536 

1537 The timestamp is formatted as an ASN.1 TIME:: 

1538 

1539 YYYYMMDDhhmmssZ 

1540 

1541 :param bytes when: A timestamp string. 

1542 :return: ``None`` 

1543 """ 

1544 return self._set_boundary_time(_lib.X509_getm_notAfter, when) 

1545 

1546 def _get_name(self, which: Any) -> X509Name: 

1547 name = X509Name.__new__(X509Name) 

1548 name._name = which(self._x509) 

1549 _openssl_assert(name._name != _ffi.NULL) 

1550 

1551 # The name is owned by the X509 structure. As long as the X509Name 

1552 # Python object is alive, keep the X509 Python object alive. 

1553 name._owner = self 

1554 

1555 return name 

1556 

1557 def _set_name(self, which: Any, name: X509Name) -> None: 

1558 if not isinstance(name, X509Name): 

1559 raise TypeError("name must be an X509Name") 

1560 set_result = which(self._x509, name._name) 

1561 _openssl_assert(set_result == 1) 

1562 

1563 def get_issuer(self) -> X509Name: 

1564 """ 

1565 Return the issuer of this certificate. 

1566 

1567 This creates a new :class:`X509Name` that wraps the underlying issuer 

1568 name field on the certificate. Modifying it will modify the underlying 

1569 certificate, and will have the effect of modifying any other 

1570 :class:`X509Name` that refers to this issuer. 

1571 

1572 :return: The issuer of this certificate. 

1573 :rtype: :class:`X509Name` 

1574 """ 

1575 name = self._get_name(_lib.X509_get_issuer_name) 

1576 self._issuer_invalidator.add(name) 

1577 return name 

1578 

1579 def set_issuer(self, issuer: X509Name) -> None: 

1580 """ 

1581 Set the issuer of this certificate. 

1582 

1583 :param issuer: The issuer. 

1584 :type issuer: :py:class:`X509Name` 

1585 

1586 :return: ``None`` 

1587 """ 

1588 self._set_name(_lib.X509_set_issuer_name, issuer) 

1589 self._issuer_invalidator.clear() 

1590 

1591 def get_subject(self) -> X509Name: 

1592 """ 

1593 Return the subject of this certificate. 

1594 

1595 This creates a new :class:`X509Name` that wraps the underlying subject 

1596 name field on the certificate. Modifying it will modify the underlying 

1597 certificate, and will have the effect of modifying any other 

1598 :class:`X509Name` that refers to this subject. 

1599 

1600 :return: The subject of this certificate. 

1601 :rtype: :class:`X509Name` 

1602 """ 

1603 name = self._get_name(_lib.X509_get_subject_name) 

1604 self._subject_invalidator.add(name) 

1605 return name 

1606 

1607 def set_subject(self, subject: X509Name) -> None: 

1608 """ 

1609 Set the subject of this certificate. 

1610 

1611 :param subject: The subject. 

1612 :type subject: :py:class:`X509Name` 

1613 

1614 :return: ``None`` 

1615 """ 

1616 self._set_name(_lib.X509_set_subject_name, subject) 

1617 self._subject_invalidator.clear() 

1618 

1619 def get_extension_count(self) -> int: 

1620 """ 

1621 Get the number of extensions on this certificate. 

1622 

1623 :return: The number of extensions. 

1624 :rtype: :py:class:`int` 

1625 

1626 .. versionadded:: 0.12 

1627 """ 

1628 return _lib.X509_get_ext_count(self._x509) 

1629 

1630 def add_extensions(self, extensions: Iterable[X509Extension]) -> None: 

1631 """ 

1632 Add extensions to the certificate. 

1633 

1634 :param extensions: The extensions to add. 

1635 :type extensions: An iterable of :py:class:`X509Extension` objects. 

1636 :return: ``None`` 

1637 """ 

1638 warnings.warn( 

1639 ( 

1640 "This API is deprecated and will be removed in a future " 

1641 "version of pyOpenSSL. You should use pyca/cryptography's " 

1642 "X.509 APIs instead." 

1643 ), 

1644 DeprecationWarning, 

1645 stacklevel=2, 

1646 ) 

1647 

1648 for ext in extensions: 

1649 if not isinstance(ext, X509Extension): 

1650 raise ValueError("One of the elements is not an X509Extension") 

1651 

1652 add_result = _lib.X509_add_ext(self._x509, ext._extension, -1) 

1653 _openssl_assert(add_result == 1) 

1654 

1655 def get_extension(self, index: int) -> X509Extension: 

1656 """ 

1657 Get a specific extension of the certificate by index. 

1658 

1659 Extensions on a certificate are kept in order. The index 

1660 parameter selects which extension will be returned. 

1661 

1662 :param int index: The index of the extension to retrieve. 

1663 :return: The extension at the specified index. 

1664 :rtype: :py:class:`X509Extension` 

1665 :raises IndexError: If the extension index was out of bounds. 

1666 

1667 .. versionadded:: 0.12 

1668 """ 

1669 warnings.warn( 

1670 ( 

1671 "This API is deprecated and will be removed in a future " 

1672 "version of pyOpenSSL. You should use pyca/cryptography's " 

1673 "X.509 APIs instead." 

1674 ), 

1675 DeprecationWarning, 

1676 stacklevel=2, 

1677 ) 

1678 

1679 ext = X509Extension.__new__(X509Extension) 

1680 ext._extension = _lib.X509_get_ext(self._x509, index) 

1681 if ext._extension == _ffi.NULL: 

1682 raise IndexError("extension index out of bounds") 

1683 

1684 extension = _lib.X509_EXTENSION_dup(ext._extension) 

1685 ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) 

1686 return ext 

1687 

1688 

1689class X509StoreFlags: 

1690 """ 

1691 Flags for X509 verification, used to change the behavior of 

1692 :class:`X509Store`. 

1693 

1694 See `OpenSSL Verification Flags`_ for details. 

1695 

1696 .. _OpenSSL Verification Flags: 

1697 https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html 

1698 """ 

1699 

1700 CRL_CHECK: int = _lib.X509_V_FLAG_CRL_CHECK 

1701 CRL_CHECK_ALL: int = _lib.X509_V_FLAG_CRL_CHECK_ALL 

1702 IGNORE_CRITICAL: int = _lib.X509_V_FLAG_IGNORE_CRITICAL 

1703 X509_STRICT: int = _lib.X509_V_FLAG_X509_STRICT 

1704 ALLOW_PROXY_CERTS: int = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS 

1705 POLICY_CHECK: int = _lib.X509_V_FLAG_POLICY_CHECK 

1706 EXPLICIT_POLICY: int = _lib.X509_V_FLAG_EXPLICIT_POLICY 

1707 INHIBIT_MAP: int = _lib.X509_V_FLAG_INHIBIT_MAP 

1708 CHECK_SS_SIGNATURE: int = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE 

1709 PARTIAL_CHAIN: int = _lib.X509_V_FLAG_PARTIAL_CHAIN 

1710 

1711 

1712class X509Store: 

1713 """ 

1714 An X.509 store. 

1715 

1716 An X.509 store is used to describe a context in which to verify a 

1717 certificate. A description of a context may include a set of certificates 

1718 to trust, a set of certificate revocation lists, verification flags and 

1719 more. 

1720 

1721 An X.509 store, being only a description, cannot be used by itself to 

1722 verify a certificate. To carry out the actual verification process, see 

1723 :class:`X509StoreContext`. 

1724 """ 

1725 

1726 def __init__(self) -> None: 

1727 store = _lib.X509_STORE_new() 

1728 self._store = _ffi.gc(store, _lib.X509_STORE_free) 

1729 

1730 def add_cert(self, cert: X509) -> None: 

1731 """ 

1732 Adds a trusted certificate to this store. 

1733 

1734 Adding a certificate with this method adds this certificate as a 

1735 *trusted* certificate. 

1736 

1737 :param X509 cert: The certificate to add to this store. 

1738 

1739 :raises TypeError: If the certificate is not an :class:`X509`. 

1740 

1741 :raises OpenSSL.crypto.Error: If OpenSSL was unhappy with your 

1742 certificate. 

1743 

1744 :return: ``None`` if the certificate was added successfully. 

1745 """ 

1746 if not isinstance(cert, X509): 

1747 raise TypeError() 

1748 

1749 res = _lib.X509_STORE_add_cert(self._store, cert._x509) 

1750 _openssl_assert(res == 1) 

1751 

1752 def add_crl(self, crl: x509.CertificateRevocationList) -> None: 

1753 """ 

1754 Add a certificate revocation list to this store. 

1755 

1756 The certificate revocation lists added to a store will only be used if 

1757 the associated flags are configured to check certificate revocation 

1758 lists. 

1759 

1760 .. versionadded:: 16.1.0 

1761 

1762 :param crl: The certificate revocation list to add to this store. 

1763 :type crl: ``cryptography.x509.CertificateRevocationList`` 

1764 :return: ``None`` if the certificate revocation list was added 

1765 successfully. 

1766 """ 

1767 if isinstance(crl, x509.CertificateRevocationList): 

1768 from cryptography.hazmat.primitives.serialization import Encoding 

1769 

1770 bio = _new_mem_buf(crl.public_bytes(Encoding.DER)) 

1771 openssl_crl = _lib.d2i_X509_CRL_bio(bio, _ffi.NULL) 

1772 _openssl_assert(openssl_crl != _ffi.NULL) 

1773 crl = _ffi.gc(openssl_crl, _lib.X509_CRL_free) 

1774 else: 

1775 raise TypeError( 

1776 "CRL must be of type " 

1777 "cryptography.x509.CertificateRevocationList" 

1778 ) 

1779 

1780 _openssl_assert(_lib.X509_STORE_add_crl(self._store, crl) != 0) 

1781 

1782 def set_flags(self, flags: int) -> None: 

1783 """ 

1784 Set verification flags to this store. 

1785 

1786 Verification flags can be combined by oring them together. 

1787 

1788 .. note:: 

1789 

1790 Setting a verification flag sometimes requires clients to add 

1791 additional information to the store, otherwise a suitable error will 

1792 be raised. 

1793 

1794 For example, in setting flags to enable CRL checking a 

1795 suitable CRL must be added to the store otherwise an error will be 

1796 raised. 

1797 

1798 .. versionadded:: 16.1.0 

1799 

1800 :param int flags: The verification flags to set on this store. 

1801 See :class:`X509StoreFlags` for available constants. 

1802 :return: ``None`` if the verification flags were successfully set. 

1803 """ 

1804 _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0) 

1805 

1806 def set_time(self, vfy_time: datetime.datetime) -> None: 

1807 """ 

1808 Set the time against which the certificates are verified. 

1809 

1810 Normally the current time is used. 

1811 

1812 .. note:: 

1813 

1814 For example, you can determine if a certificate was valid at a given 

1815 time. 

1816 

1817 .. versionadded:: 17.0.0 

1818 

1819 :param datetime vfy_time: The verification time to set on this store. 

1820 :return: ``None`` if the verification time was successfully set. 

1821 """ 

1822 param = _lib.X509_VERIFY_PARAM_new() 

1823 param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free) 

1824 

1825 _lib.X509_VERIFY_PARAM_set_time( 

1826 param, calendar.timegm(vfy_time.timetuple()) 

1827 ) 

1828 _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0) 

1829 

1830 def load_locations( 

1831 self, 

1832 cafile: StrOrBytesPath | None, 

1833 capath: StrOrBytesPath | None = None, 

1834 ) -> None: 

1835 """ 

1836 Let X509Store know where we can find trusted certificates for the 

1837 certificate chain. Note that the certificates have to be in PEM 

1838 format. 

1839 

1840 If *capath* is passed, it must be a directory prepared using the 

1841 ``c_rehash`` tool included with OpenSSL. Either, but not both, of 

1842 *cafile* or *capath* may be ``None``. 

1843 

1844 .. note:: 

1845 

1846 Both *cafile* and *capath* may be set simultaneously. 

1847 

1848 Call this method multiple times to add more than one location. 

1849 For example, CA certificates, and certificate revocation list bundles 

1850 may be passed in *cafile* in subsequent calls to this method. 

1851 

1852 .. versionadded:: 20.0 

1853 

1854 :param cafile: In which file we can find the certificates (``bytes`` or 

1855 ``unicode``). 

1856 :param capath: In which directory we can find the certificates 

1857 (``bytes`` or ``unicode``). 

1858 

1859 :return: ``None`` if the locations were set successfully. 

1860 

1861 :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None`` 

1862 or the locations could not be set for any reason. 

1863 

1864 """ 

1865 if cafile is None: 

1866 cafile = _ffi.NULL 

1867 else: 

1868 cafile = _path_bytes(cafile) 

1869 

1870 if capath is None: 

1871 capath = _ffi.NULL 

1872 else: 

1873 capath = _path_bytes(capath) 

1874 

1875 load_result = _lib.X509_STORE_load_locations( 

1876 self._store, cafile, capath 

1877 ) 

1878 if not load_result: 

1879 _raise_current_error() 

1880 

1881 

1882class X509StoreContextError(Exception): 

1883 """ 

1884 An exception raised when an error occurred while verifying a certificate 

1885 using `OpenSSL.X509StoreContext.verify_certificate`. 

1886 

1887 :ivar certificate: The certificate which caused verificate failure. 

1888 :type certificate: :class:`X509` 

1889 """ 

1890 

1891 def __init__( 

1892 self, message: str, errors: list[Any], certificate: X509 

1893 ) -> None: 

1894 super().__init__(message) 

1895 self.errors = errors 

1896 self.certificate = certificate 

1897 

1898 

1899class X509StoreContext: 

1900 """ 

1901 An X.509 store context. 

1902 

1903 An X.509 store context is used to carry out the actual verification process 

1904 of a certificate in a described context. For describing such a context, see 

1905 :class:`X509Store`. 

1906 

1907 :param X509Store store: The certificates which will be trusted for the 

1908 purposes of any verifications. 

1909 :param X509 certificate: The certificate to be verified. 

1910 :param chain: List of untrusted certificates that may be used for building 

1911 the certificate chain. May be ``None``. 

1912 :type chain: :class:`list` of :class:`X509` 

1913 """ 

1914 

1915 def __init__( 

1916 self, 

1917 store: X509Store, 

1918 certificate: X509, 

1919 chain: Sequence[X509] | None = None, 

1920 ) -> None: 

1921 self._store = store 

1922 self._cert = certificate 

1923 self._chain = self._build_certificate_stack(chain) 

1924 

1925 @staticmethod 

1926 def _build_certificate_stack( 

1927 certificates: Sequence[X509] | None, 

1928 ) -> None: 

1929 def cleanup(s: Any) -> None: 

1930 # Equivalent to sk_X509_pop_free, but we don't 

1931 # currently have a CFFI binding for that available 

1932 for i in range(_lib.sk_X509_num(s)): 

1933 x = _lib.sk_X509_value(s, i) 

1934 _lib.X509_free(x) 

1935 _lib.sk_X509_free(s) 

1936 

1937 if certificates is None or len(certificates) == 0: 

1938 return _ffi.NULL 

1939 

1940 stack = _lib.sk_X509_new_null() 

1941 _openssl_assert(stack != _ffi.NULL) 

1942 stack = _ffi.gc(stack, cleanup) 

1943 

1944 for cert in certificates: 

1945 if not isinstance(cert, X509): 

1946 raise TypeError("One of the elements is not an X509 instance") 

1947 

1948 _openssl_assert(_lib.X509_up_ref(cert._x509) > 0) 

1949 if _lib.sk_X509_push(stack, cert._x509) <= 0: 

1950 _lib.X509_free(cert._x509) 

1951 _raise_current_error() 

1952 

1953 return stack 

1954 

1955 @staticmethod 

1956 def _exception_from_context(store_ctx: Any) -> X509StoreContextError: 

1957 """ 

1958 Convert an OpenSSL native context error failure into a Python 

1959 exception. 

1960 

1961 When a call to native OpenSSL X509_verify_cert fails, additional 

1962 information about the failure can be obtained from the store context. 

1963 """ 

1964 message = _ffi.string( 

1965 _lib.X509_verify_cert_error_string( 

1966 _lib.X509_STORE_CTX_get_error(store_ctx) 

1967 ) 

1968 ).decode("utf-8") 

1969 errors = [ 

1970 _lib.X509_STORE_CTX_get_error(store_ctx), 

1971 _lib.X509_STORE_CTX_get_error_depth(store_ctx), 

1972 message, 

1973 ] 

1974 # A context error should always be associated with a certificate, so we 

1975 # expect this call to never return :class:`None`. 

1976 _x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx) 

1977 _cert = _lib.X509_dup(_x509) 

1978 pycert = X509._from_raw_x509_ptr(_cert) 

1979 return X509StoreContextError(message, errors, pycert) 

1980 

1981 def _verify_certificate(self) -> Any: 

1982 """ 

1983 Verifies the certificate and runs an X509_STORE_CTX containing the 

1984 results. 

1985 

1986 :raises X509StoreContextError: If an error occurred when validating a 

1987 certificate in the context. Sets ``certificate`` attribute to 

1988 indicate which certificate caused the error. 

1989 """ 

1990 store_ctx = _lib.X509_STORE_CTX_new() 

1991 _openssl_assert(store_ctx != _ffi.NULL) 

1992 store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free) 

1993 

1994 ret = _lib.X509_STORE_CTX_init( 

1995 store_ctx, self._store._store, self._cert._x509, self._chain 

1996 ) 

1997 _openssl_assert(ret == 1) 

1998 

1999 ret = _lib.X509_verify_cert(store_ctx) 

2000 if ret <= 0: 

2001 raise self._exception_from_context(store_ctx) 

2002 

2003 return store_ctx 

2004 

2005 def set_store(self, store: X509Store) -> None: 

2006 """ 

2007 Set the context's X.509 store. 

2008 

2009 .. versionadded:: 0.15 

2010 

2011 :param X509Store store: The store description which will be used for 

2012 the purposes of any *future* verifications. 

2013 """ 

2014 self._store = store 

2015 

2016 def verify_certificate(self) -> None: 

2017 """ 

2018 Verify a certificate in a context. 

2019 

2020 .. versionadded:: 0.15 

2021 

2022 :raises X509StoreContextError: If an error occurred when validating a 

2023 certificate in the context. Sets ``certificate`` attribute to 

2024 indicate which certificate caused the error. 

2025 """ 

2026 self._verify_certificate() 

2027 

2028 def get_verified_chain(self) -> list[X509]: 

2029 """ 

2030 Verify a certificate in a context and return the complete validated 

2031 chain. 

2032 

2033 :raises X509StoreContextError: If an error occurred when validating a 

2034 certificate in the context. Sets ``certificate`` attribute to 

2035 indicate which certificate caused the error. 

2036 

2037 .. versionadded:: 20.0 

2038 """ 

2039 store_ctx = self._verify_certificate() 

2040 

2041 # Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain. 

2042 cert_stack = _lib.X509_STORE_CTX_get1_chain(store_ctx) 

2043 _openssl_assert(cert_stack != _ffi.NULL) 

2044 

2045 result = [] 

2046 for i in range(_lib.sk_X509_num(cert_stack)): 

2047 cert = _lib.sk_X509_value(cert_stack, i) 

2048 _openssl_assert(cert != _ffi.NULL) 

2049 pycert = X509._from_raw_x509_ptr(cert) 

2050 result.append(pycert) 

2051 

2052 # Free the stack but not the members which are freed by the X509 class. 

2053 _lib.sk_X509_free(cert_stack) 

2054 return result 

2055 

2056 

2057def load_certificate(type: int, buffer: bytes) -> X509: 

2058 """ 

2059 Load a certificate (X509) from the string *buffer* encoded with the 

2060 type *type*. 

2061 

2062 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 

2063 

2064 :param bytes buffer: The buffer the certificate is stored in 

2065 

2066 :return: The X509 object 

2067 """ 

2068 if isinstance(buffer, str): 

2069 buffer = buffer.encode("ascii") 

2070 

2071 bio = _new_mem_buf(buffer) 

2072 

2073 if type == FILETYPE_PEM: 

2074 x509 = _lib.PEM_read_bio_X509(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 

2075 elif type == FILETYPE_ASN1: 

2076 x509 = _lib.d2i_X509_bio(bio, _ffi.NULL) 

2077 else: 

2078 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 

2079 

2080 if x509 == _ffi.NULL: 

2081 _raise_current_error() 

2082 

2083 return X509._from_raw_x509_ptr(x509) 

2084 

2085 

2086def dump_certificate(type: int, cert: X509) -> bytes: 

2087 """ 

2088 Dump the certificate *cert* into a buffer string encoded with the type 

2089 *type*. 

2090 

2091 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or 

2092 FILETYPE_TEXT) 

2093 :param cert: The certificate to dump 

2094 :return: The buffer with the dumped certificate in 

2095 """ 

2096 bio = _new_mem_buf() 

2097 

2098 if type == FILETYPE_PEM: 

2099 result_code = _lib.PEM_write_bio_X509(bio, cert._x509) 

2100 elif type == FILETYPE_ASN1: 

2101 result_code = _lib.i2d_X509_bio(bio, cert._x509) 

2102 elif type == FILETYPE_TEXT: 

2103 result_code = _lib.X509_print_ex(bio, cert._x509, 0, 0) 

2104 else: 

2105 raise ValueError( 

2106 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " 

2107 "FILETYPE_TEXT" 

2108 ) 

2109 

2110 _openssl_assert(result_code == 1) 

2111 return _bio_to_string(bio) 

2112 

2113 

2114def dump_publickey(type: int, pkey: PKey) -> bytes: 

2115 """ 

2116 Dump a public key to a buffer. 

2117 

2118 :param type: The file type (one of :data:`FILETYPE_PEM` or 

2119 :data:`FILETYPE_ASN1`). 

2120 :param PKey pkey: The public key to dump 

2121 :return: The buffer with the dumped key in it. 

2122 :rtype: bytes 

2123 """ 

2124 bio = _new_mem_buf() 

2125 if type == FILETYPE_PEM: 

2126 write_bio = _lib.PEM_write_bio_PUBKEY 

2127 elif type == FILETYPE_ASN1: 

2128 write_bio = _lib.i2d_PUBKEY_bio 

2129 else: 

2130 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 

2131 

2132 result_code = write_bio(bio, pkey._pkey) 

2133 if result_code != 1: # pragma: no cover 

2134 _raise_current_error() 

2135 

2136 return _bio_to_string(bio) 

2137 

2138 

2139def dump_privatekey( 

2140 type: int, 

2141 pkey: PKey, 

2142 cipher: str | None = None, 

2143 passphrase: PassphraseCallableT | None = None, 

2144) -> bytes: 

2145 """ 

2146 Dump the private key *pkey* into a buffer string encoded with the type 

2147 *type*. Optionally (if *type* is :const:`FILETYPE_PEM`) encrypting it 

2148 using *cipher* and *passphrase*. 

2149 

2150 :param type: The file type (one of :const:`FILETYPE_PEM`, 

2151 :const:`FILETYPE_ASN1`, or :const:`FILETYPE_TEXT`) 

2152 :param PKey pkey: The PKey to dump 

2153 :param cipher: (optional) if encrypted PEM format, the cipher to use 

2154 :param passphrase: (optional) if encrypted PEM format, this can be either 

2155 the passphrase to use, or a callback for providing the passphrase. 

2156 

2157 :return: The buffer with the dumped key in 

2158 :rtype: bytes 

2159 """ 

2160 bio = _new_mem_buf() 

2161 

2162 if not isinstance(pkey, PKey): 

2163 raise TypeError("pkey must be a PKey") 

2164 

2165 if cipher is not None: 

2166 if passphrase is None: 

2167 raise TypeError( 

2168 "if a value is given for cipher " 

2169 "one must also be given for passphrase" 

2170 ) 

2171 cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher)) 

2172 if cipher_obj == _ffi.NULL: 

2173 raise ValueError("Invalid cipher name") 

2174 else: 

2175 cipher_obj = _ffi.NULL 

2176 

2177 helper = _PassphraseHelper(type, passphrase) 

2178 if type == FILETYPE_PEM: 

2179 result_code = _lib.PEM_write_bio_PrivateKey( 

2180 bio, 

2181 pkey._pkey, 

2182 cipher_obj, 

2183 _ffi.NULL, 

2184 0, 

2185 helper.callback, 

2186 helper.callback_args, 

2187 ) 

2188 helper.raise_if_problem() 

2189 elif type == FILETYPE_ASN1: 

2190 result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey) 

2191 elif type == FILETYPE_TEXT: 

2192 if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA: 

2193 raise TypeError("Only RSA keys are supported for FILETYPE_TEXT") 

2194 

2195 rsa = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free) 

2196 result_code = _lib.RSA_print(bio, rsa, 0) 

2197 else: 

2198 raise ValueError( 

2199 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " 

2200 "FILETYPE_TEXT" 

2201 ) 

2202 

2203 _openssl_assert(result_code != 0) 

2204 

2205 return _bio_to_string(bio) 

2206 

2207 

2208class _PassphraseHelper: 

2209 def __init__( 

2210 self, 

2211 type: int, 

2212 passphrase: PassphraseCallableT | None, 

2213 more_args: bool = False, 

2214 truncate: bool = False, 

2215 ) -> None: 

2216 if type != FILETYPE_PEM and passphrase is not None: 

2217 raise ValueError( 

2218 "only FILETYPE_PEM key format supports encryption" 

2219 ) 

2220 self._passphrase = passphrase 

2221 self._more_args = more_args 

2222 self._truncate = truncate 

2223 self._problems: list[Exception] = [] 

2224 

2225 @property 

2226 def callback(self) -> Any: 

2227 if self._passphrase is None: 

2228 return _ffi.NULL 

2229 elif isinstance(self._passphrase, bytes) or callable(self._passphrase): 

2230 return _ffi.callback("pem_password_cb", self._read_passphrase) 

2231 else: 

2232 raise TypeError( 

2233 "Last argument must be a byte string or a callable." 

2234 ) 

2235 

2236 @property 

2237 def callback_args(self) -> Any: 

2238 if self._passphrase is None: 

2239 return _ffi.NULL 

2240 elif isinstance(self._passphrase, bytes) or callable(self._passphrase): 

2241 return _ffi.NULL 

2242 else: 

2243 raise TypeError( 

2244 "Last argument must be a byte string or a callable." 

2245 ) 

2246 

2247 def raise_if_problem(self, exceptionType: type[Exception] = Error) -> None: 

2248 if self._problems: 

2249 # Flush the OpenSSL error queue 

2250 try: 

2251 _exception_from_error_queue(exceptionType) 

2252 except exceptionType: 

2253 pass 

2254 

2255 raise self._problems.pop(0) 

2256 

2257 def _read_passphrase( 

2258 self, buf: Any, size: int, rwflag: Any, userdata: Any 

2259 ) -> int: 

2260 try: 

2261 if callable(self._passphrase): 

2262 if self._more_args: 

2263 result = self._passphrase(size, rwflag, userdata) 

2264 else: 

2265 result = self._passphrase(rwflag) 

2266 else: 

2267 assert self._passphrase is not None 

2268 result = self._passphrase 

2269 if not isinstance(result, bytes): 

2270 raise ValueError("Bytes expected") 

2271 if len(result) > size: 

2272 if self._truncate: 

2273 result = result[:size] 

2274 else: 

2275 raise ValueError( 

2276 "passphrase returned by callback is too long" 

2277 ) 

2278 for i in range(len(result)): 

2279 buf[i] = result[i : i + 1] 

2280 return len(result) 

2281 except Exception as e: 

2282 self._problems.append(e) 

2283 return 0 

2284 

2285 

2286def load_publickey(type: int, buffer: str | bytes) -> PKey: 

2287 """ 

2288 Load a public key from a buffer. 

2289 

2290 :param type: The file type (one of :data:`FILETYPE_PEM`, 

2291 :data:`FILETYPE_ASN1`). 

2292 :param buffer: The buffer the key is stored in. 

2293 :type buffer: A Python string object, either unicode or bytestring. 

2294 :return: The PKey object. 

2295 :rtype: :class:`PKey` 

2296 """ 

2297 if isinstance(buffer, str): 

2298 buffer = buffer.encode("ascii") 

2299 

2300 bio = _new_mem_buf(buffer) 

2301 

2302 if type == FILETYPE_PEM: 

2303 evp_pkey = _lib.PEM_read_bio_PUBKEY( 

2304 bio, _ffi.NULL, _ffi.NULL, _ffi.NULL 

2305 ) 

2306 elif type == FILETYPE_ASN1: 

2307 evp_pkey = _lib.d2i_PUBKEY_bio(bio, _ffi.NULL) 

2308 else: 

2309 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 

2310 

2311 if evp_pkey == _ffi.NULL: 

2312 _raise_current_error() 

2313 

2314 pkey = PKey.__new__(PKey) 

2315 pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free) 

2316 pkey._only_public = True 

2317 return pkey 

2318 

2319 

2320def load_privatekey( 

2321 type: int, 

2322 buffer: str | bytes, 

2323 passphrase: PassphraseCallableT | None = None, 

2324) -> PKey: 

2325 """ 

2326 Load a private key (PKey) from the string *buffer* encoded with the type 

2327 *type*. 

2328 

2329 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 

2330 :param buffer: The buffer the key is stored in 

2331 :param passphrase: (optional) if encrypted PEM format, this can be 

2332 either the passphrase to use, or a callback for 

2333 providing the passphrase. 

2334 

2335 :return: The PKey object 

2336 """ 

2337 if isinstance(buffer, str): 

2338 buffer = buffer.encode("ascii") 

2339 

2340 bio = _new_mem_buf(buffer) 

2341 

2342 helper = _PassphraseHelper(type, passphrase) 

2343 if type == FILETYPE_PEM: 

2344 evp_pkey = _lib.PEM_read_bio_PrivateKey( 

2345 bio, _ffi.NULL, helper.callback, helper.callback_args 

2346 ) 

2347 helper.raise_if_problem() 

2348 elif type == FILETYPE_ASN1: 

2349 evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL) 

2350 else: 

2351 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 

2352 

2353 if evp_pkey == _ffi.NULL: 

2354 _raise_current_error() 

2355 

2356 pkey = PKey.__new__(PKey) 

2357 pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free) 

2358 return pkey 

2359 

2360 

2361def dump_certificate_request(type: int, req: X509Req) -> bytes: 

2362 """ 

2363 Dump the certificate request *req* into a buffer string encoded with the 

2364 type *type*. 

2365 

2366 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 

2367 :param req: The certificate request to dump 

2368 :return: The buffer with the dumped certificate request in 

2369 

2370 

2371 .. deprecated:: 24.2.0 

2372 Use `cryptography.x509.CertificateSigningRequest` instead. 

2373 """ 

2374 bio = _new_mem_buf() 

2375 

2376 if type == FILETYPE_PEM: 

2377 result_code = _lib.PEM_write_bio_X509_REQ(bio, req._req) 

2378 elif type == FILETYPE_ASN1: 

2379 result_code = _lib.i2d_X509_REQ_bio(bio, req._req) 

2380 elif type == FILETYPE_TEXT: 

2381 result_code = _lib.X509_REQ_print_ex(bio, req._req, 0, 0) 

2382 else: 

2383 raise ValueError( 

2384 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " 

2385 "FILETYPE_TEXT" 

2386 ) 

2387 

2388 _openssl_assert(result_code != 0) 

2389 

2390 return _bio_to_string(bio) 

2391 

2392 

2393_dump_certificate_request_internal = dump_certificate_request 

2394 

2395utils.deprecated( 

2396 dump_certificate_request, 

2397 __name__, 

2398 ( 

2399 "CSR support in pyOpenSSL is deprecated. You should use the APIs " 

2400 "in cryptography." 

2401 ), 

2402 DeprecationWarning, 

2403 name="dump_certificate_request", 

2404) 

2405 

2406 

2407def load_certificate_request(type: int, buffer: bytes) -> X509Req: 

2408 """ 

2409 Load a certificate request (X509Req) from the string *buffer* encoded with 

2410 the type *type*. 

2411 

2412 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 

2413 :param buffer: The buffer the certificate request is stored in 

2414 :return: The X509Req object 

2415 

2416 .. deprecated:: 24.2.0 

2417 Use `cryptography.x509.load_der_x509_csr` or 

2418 `cryptography.x509.load_pem_x509_csr` instead. 

2419 """ 

2420 if isinstance(buffer, str): 

2421 buffer = buffer.encode("ascii") 

2422 

2423 bio = _new_mem_buf(buffer) 

2424 

2425 if type == FILETYPE_PEM: 

2426 req = _lib.PEM_read_bio_X509_REQ(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 

2427 elif type == FILETYPE_ASN1: 

2428 req = _lib.d2i_X509_REQ_bio(bio, _ffi.NULL) 

2429 else: 

2430 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 

2431 

2432 _openssl_assert(req != _ffi.NULL) 

2433 

2434 x509req = X509Req.__new__(X509Req) 

2435 x509req._req = _ffi.gc(req, _lib.X509_REQ_free) 

2436 return x509req 

2437 

2438 

2439_load_certificate_request_internal = load_certificate_request 

2440 

2441utils.deprecated( 

2442 load_certificate_request, 

2443 __name__, 

2444 ( 

2445 "CSR support in pyOpenSSL is deprecated. You should use the APIs " 

2446 "in cryptography." 

2447 ), 

2448 DeprecationWarning, 

2449 name="load_certificate_request", 

2450)