Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/crypto.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

870 statements  

1from __future__ import annotations 

2 

3import calendar 

4import datetime 

5import functools 

6import sys 

7import typing 

8import warnings 

9from base64 import b16encode 

10from collections.abc import Iterable, Sequence 

11from functools import partial 

12from typing import ( 

13 Any, 

14 Callable, 

15 Union, 

16) 

17 

# Select an implementation of the ``deprecated`` decorator for the running
# interpreter.
if sys.version_info >= (3, 13):
    from warnings import deprecated
elif sys.version_info < (3, 8):
    # The name passed to TypeVar must match the variable it is bound to;
    # type checkers reject a mismatch such as ``_T = TypeVar("T")``.
    _T = typing.TypeVar("_T")

    def deprecated(msg: str, **kwargs: object) -> Callable[[_T], _T]:
        """
        Fallback decorator factory that leaves the decorated object untouched.

        :param msg: The deprecation message (ignored by this fallback).
        :param kwargs: Extra options accepted for signature compatibility
            (ignored by this fallback).
        :return: A decorator returning its argument unchanged.
        """
        return lambda f: f
else:
    from typing_extensions import deprecated

27 

28from cryptography import utils, x509 

29from cryptography.hazmat.primitives.asymmetric import ( 

30 dsa, 

31 ec, 

32 ed448, 

33 ed25519, 

34 rsa, 

35) 

36 

37from OpenSSL._util import StrOrBytesPath 

38from OpenSSL._util import ( 

39 byte_string as _byte_string, 

40) 

41from OpenSSL._util import ( 

42 exception_from_error_queue as _exception_from_error_queue, 

43) 

44from OpenSSL._util import ( 

45 ffi as _ffi, 

46) 

47from OpenSSL._util import ( 

48 lib as _lib, 

49) 

50from OpenSSL._util import ( 

51 make_assert as _make_assert, 

52) 

53from OpenSSL._util import ( 

54 path_bytes as _path_bytes, 

55) 

56 

# Names exported by ``from OpenSSL.crypto import *``.
__all__ = [
    "FILETYPE_ASN1",
    "FILETYPE_PEM",
    "FILETYPE_TEXT",
    "TYPE_DSA",
    "TYPE_RSA",
    "X509",
    "Error",
    "PKey",
    "X509Extension",
    "X509Name",
    "X509Req",
    "X509Store",
    "X509StoreContext",
    "X509StoreContextError",
    "X509StoreFlags",
    "dump_certificate",
    "dump_certificate_request",
    "dump_privatekey",
    "dump_publickey",
    "get_elliptic_curve",
    "get_elliptic_curves",
    "load_certificate",
    "load_certificate_request",
    "load_privatekey",
    "load_publickey",
]


# ``cryptography`` private key classes accepted by this module.
_PrivateKey = Union[
    dsa.DSAPrivateKey,
    ec.EllipticCurvePrivateKey,
    ed25519.Ed25519PrivateKey,
    ed448.Ed448PrivateKey,
    rsa.RSAPrivateKey,
]
# ``cryptography`` public key classes accepted by this module.
_PublicKey = Union[
    dsa.DSAPublicKey,
    ec.EllipticCurvePublicKey,
    ed25519.Ed25519PublicKey,
    ed448.Ed448PublicKey,
    rsa.RSAPublicKey,
]
# Any key, private or public.
_Key = Union[_PrivateKey, _PublicKey]
# A passphrase is either literal bytes or a callable producing bytes.
PassphraseCallableT = Union[bytes, Callable[..., bytes]]


# Serialization formats, taken from the OpenSSL binding.
FILETYPE_PEM: int = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1: int = _lib.SSL_FILETYPE_ASN1

# TODO This was an API mistake.  OpenSSL has no such constant.
FILETYPE_TEXT = 0xFFFF

# Key type identifiers, taken from the OpenSSL binding.
TYPE_RSA: int = _lib.EVP_PKEY_RSA
TYPE_DSA: int = _lib.EVP_PKEY_DSA
TYPE_DH: int = _lib.EVP_PKEY_DH
TYPE_EC: int = _lib.EVP_PKEY_EC

114 

115 

class Error(Exception):
    """
    Signals that an `OpenSSL.crypto` API reported an error.
    """

120 

121 

# Both helpers are bound to this module's ``Error`` class.
# NOTE(review): presumably the first raises ``Error`` from OpenSSL's error
# queue and the second raises ``Error`` when its condition is false --
# confirm against ``OpenSSL._util``.
_raise_current_error = functools.partial(_exception_from_error_queue, Error)
_openssl_assert = _make_assert(Error)

124 

125 

def _new_mem_buf(buffer: bytes | None = None) -> Any:
    """
    Create an OpenSSL memory BIO that is freed by the garbage collector.

    :param buffer: ``None`` for an empty memory BIO, or bytes that should be
        readable from the returned BIO.
    :return: The BIO handle, wrapped with ``_ffi.gc`` so that it is released
        through ``BIO_free``.
    """
    if buffer is not None:
        backing = _ffi.new("char[]", buffer)
        handle = _lib.BIO_new_mem_buf(backing, len(buffer))

        # The default argument holds a reference to ``backing`` so the
        # memory stays alive for as long as the BIO (and its finalizer) do.
        def release(bio: Any, ref: Any = backing) -> Any:
            return _lib.BIO_free(bio)

    else:
        handle = _lib.BIO_new(_lib.BIO_s_mem())
        release = _lib.BIO_free

    _openssl_assert(handle != _ffi.NULL)

    return _ffi.gc(handle, release)

150 

151 

def _bio_to_string(bio: Any) -> bytes:
    """
    Return a Python ``bytes`` copy of the data held by an OpenSSL BIO.

    :param bio: The BIO to read from.
    :return: The BIO's contents.
    """
    data_ptr = _ffi.new("char**")
    size = _lib.BIO_get_mem_data(bio, data_ptr)
    # Slicing the cffi buffer copies the bytes out of OpenSSL-owned memory.
    view = _ffi.buffer(data_ptr[0], size)
    return view[:]

159 

160 

def _set_asn1_time(boundary: Any, when: bytes) -> None:
    """
    Set the time value of an ASN1 time object.

    @param boundary: An ASN1_TIME pointer (or an object safely
        castable to that type) which will have its value set.
    @param when: A string representation of the desired time value.

    @raise TypeError: If C{when} is not a L{bytes} string.
    @raise ValueError: If C{when} does not represent a time in the required
        format.
    """
    if not isinstance(when, bytes):
        raise TypeError("when must be a byte string")

    # ASN1_TIME_set_string validates the string without writing anything
    # when the destination is NULL, so insist on a real destination.
    _openssl_assert(boundary != _ffi.NULL)

    if _lib.ASN1_TIME_set_string(boundary, when) == 0:
        raise ValueError("Invalid string")

184 

185 

def _new_asn1_time(when: bytes) -> Any:
    """
    Allocate a fresh ASN1_TIME object and set it via ``_set_asn1_time``.

    @param when: A string representation of the desired time value.

    @return: The new ASN1_TIME object, released through ``ASN1_TIME_free``
        by the garbage collector.

    @raise TypeError: If C{when} is not a L{bytes} string.
    @raise ValueError: If C{when} does not represent a time in the required
        format.
    """
    raw_time = _lib.ASN1_TIME_new()
    _openssl_assert(raw_time != _ffi.NULL)

    asn1_time = _ffi.gc(raw_time, _lib.ASN1_TIME_free)
    _set_asn1_time(asn1_time, when)
    return asn1_time

203 

204 

def _get_asn1_time(timestamp: Any) -> bytes | None:
    """
    Read the time value stored in an ASN1 time object.

    @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to
        that type) from which the time value will be retrieved.

    @return: The time value from C{timestamp} as a L{bytes} string in a certain
        format.  Or C{None} if the object contains no time value.
    """
    as_string = _ffi.cast("ASN1_STRING*", timestamp)

    if _lib.ASN1_STRING_length(as_string) == 0:
        return None

    if _lib.ASN1_STRING_type(as_string) == _lib.V_ASN1_GENERALIZEDTIME:
        # Already a generalized time: its data can be returned as is.
        return _ffi.string(_lib.ASN1_STRING_get0_data(as_string))

    # Any other time type is converted to a generalized time first.
    out_ptr = _ffi.new("ASN1_GENERALIZEDTIME**")
    _lib.ASN1_TIME_to_generalizedtime(timestamp, out_ptr)
    _openssl_assert(out_ptr[0] != _ffi.NULL)

    converted = _ffi.cast("ASN1_STRING*", out_ptr[0])
    text = _ffi.string(_lib.ASN1_STRING_get0_data(converted))
    # The converted object was allocated by the call above and is freed here.
    _lib.ASN1_GENERALIZEDTIME_free(out_ptr[0])
    return text

232 

233 

class _X509NameInvalidator:
    """
    Remember ``X509Name`` wrappers so that they can be invalidated together.

    ``clear`` removes the ``_name`` attribute from every registered wrapper
    and then forgets them, so it may be called any number of times.
    """

    def __init__(self) -> None:
        self._names: list[X509Name] = []

    def add(self, name: X509Name) -> None:
        """
        Register *name* for invalidation by the next ``clear`` call.
        """
        self._names.append(name)

    def clear(self) -> None:
        """
        Invalidate every registered name and empty the registry.
        """
        # Detach the list first: names invalidated here must not be visited
        # again by a later ``clear`` (their ``_name`` is already gone), and
        # the registry should not keep dead wrappers alive.
        pending, self._names = self._names, []
        for name in pending:
            try:
                # Breaks the object, but also prevents UAF!
                del name._name
            except AttributeError:
                # Already invalidated (e.g. registered more than once).
                pass

245 

246 

class PKey:
    """
    A class representing an DSA or RSA public key or key pair.
    """

    # True for wrappers that hold only a public key: those are exported with
    # ``dump_publickey`` and are rejected by ``check``.
    _only_public = False
    # ``__init__`` sets this to False; ``generate_key`` sets it back to True.
    _initialized = True

    def __init__(self) -> None:
        pkey = _lib.EVP_PKEY_new()
        _openssl_assert(pkey != _ffi.NULL)
        self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
        self._initialized = False

    def to_cryptography_key(self) -> _Key:
        """
        Export as a ``cryptography`` key.

        :rtype: One of ``cryptography``'s `key interfaces`_.

        .. _key interfaces: https://cryptography.io/en/latest/hazmat/\
            primitives/asymmetric/rsa/#key-interfaces

        .. versionadded:: 16.1.0
        """
        from cryptography.hazmat.primitives.serialization import (
            load_der_private_key,
            load_der_public_key,
        )

        if self._only_public:
            der = dump_publickey(FILETYPE_ASN1, self)
            return typing.cast(_Key, load_der_public_key(der))
        else:
            der = dump_privatekey(FILETYPE_ASN1, self)
            return typing.cast(_Key, load_der_private_key(der, password=None))

    @classmethod
    def from_cryptography_key(cls, crypto_key: _Key) -> PKey:
        """
        Construct based on a ``cryptography`` *crypto_key*.

        :param crypto_key: A ``cryptography`` key.
        :type crypto_key: One of ``cryptography``'s `key interfaces`_.

        :rtype: PKey

        :raises TypeError: If *crypto_key* is not one of the supported DSA,
            EC, Ed25519, Ed448 or RSA key classes.

        .. versionadded:: 16.1.0
        """
        if not isinstance(
            crypto_key,
            (
                dsa.DSAPrivateKey,
                dsa.DSAPublicKey,
                ec.EllipticCurvePrivateKey,
                ec.EllipticCurvePublicKey,
                ed25519.Ed25519PrivateKey,
                ed25519.Ed25519PublicKey,
                ed448.Ed448PrivateKey,
                ed448.Ed448PublicKey,
                rsa.RSAPrivateKey,
                rsa.RSAPublicKey,
            ),
        ):
            raise TypeError("Unsupported key type")

        from cryptography.hazmat.primitives.serialization import (
            Encoding,
            NoEncryption,
            PrivateFormat,
            PublicFormat,
        )

        if isinstance(
            crypto_key,
            (
                dsa.DSAPublicKey,
                ec.EllipticCurvePublicKey,
                ed25519.Ed25519PublicKey,
                ed448.Ed448PublicKey,
                rsa.RSAPublicKey,
            ),
        ):
            return load_publickey(
                FILETYPE_ASN1,
                crypto_key.public_bytes(
                    Encoding.DER, PublicFormat.SubjectPublicKeyInfo
                ),
            )
        else:
            der = crypto_key.private_bytes(
                Encoding.DER, PrivateFormat.PKCS8, NoEncryption()
            )
            return load_privatekey(FILETYPE_ASN1, der)

    def generate_key(self, type: int, bits: int) -> None:
        """
        Generate a key pair of the given type, with the given number of bits.

        This generates a key "into" this object.

        :param type: The key type.
        :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA`
        :param bits: The number of bits.
        :type bits: :py:data:`int` ``>= 0``
        :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't
            of the appropriate type.
        :raises ValueError: If the number of bits isn't an integer of
            the appropriate size.
        :raises Error: If :py:data:`type` is neither :py:data:`TYPE_RSA` nor
            :py:data:`TYPE_DSA`.
        :return: ``None``
        """
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if not isinstance(bits, int):
            raise TypeError("bits must be an integer")

        if type == TYPE_RSA:
            if bits <= 0:
                raise ValueError("Invalid number of bits")

            exponent = _lib.BN_new()
            _openssl_assert(exponent != _ffi.NULL)
            exponent = _ffi.gc(exponent, _lib.BN_free)
            _openssl_assert(_lib.BN_set_word(exponent, _lib.RSA_F4) == 1)

            rsa_key = _lib.RSA_new()
            _openssl_assert(rsa_key != _ffi.NULL)
            try:
                result = _lib.RSA_generate_key_ex(
                    rsa_key, bits, exponent, _ffi.NULL
                )
                _openssl_assert(result == 1)

                # On success the EVP_PKEY owns ``rsa_key``, which is why it
                # is not registered with ``_ffi.gc``.
                result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa_key)
                _openssl_assert(result == 1)
            except Exception:
                # Ownership was never transferred, so release the key here
                # instead of leaking it.
                _lib.RSA_free(rsa_key)
                raise

        elif type == TYPE_DSA:
            dsa_key = _lib.DSA_new()
            _openssl_assert(dsa_key != _ffi.NULL)

            dsa_key = _ffi.gc(dsa_key, _lib.DSA_free)
            res = _lib.DSA_generate_parameters_ex(
                dsa_key, bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL
            )
            _openssl_assert(res == 1)

            _openssl_assert(_lib.DSA_generate_key(dsa_key) == 1)
            _openssl_assert(_lib.EVP_PKEY_set1_DSA(self._pkey, dsa_key) == 1)
        else:
            raise Error("No such key type")

        self._initialized = True

    def check(self) -> bool:
        """
        Check the consistency of an RSA private key.

        This is the Python equivalent of OpenSSL's ``RSA_check_key``.

        :return: ``True`` if key is consistent.

        :raise OpenSSL.crypto.Error: if the key is inconsistent.

        :raise TypeError: if the key is of a type which cannot be checked.
            Only RSA keys can currently be checked.
        """
        if self._only_public:
            raise TypeError("public key only")

        if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA:
            raise TypeError("Only RSA keys can currently be checked.")

        rsa_key = _lib.EVP_PKEY_get1_RSA(self._pkey)
        rsa_key = _ffi.gc(rsa_key, _lib.RSA_free)
        result = _lib.RSA_check_key(rsa_key)
        if result == 1:
            return True
        _raise_current_error()

    def type(self) -> int:
        """
        Returns the type of the key

        :return: The type of the key.
        """
        return _lib.EVP_PKEY_id(self._pkey)

    def bits(self) -> int:
        """
        Returns the number of bits of the key

        :return: The number of bits of the key.
        """
        return _lib.EVP_PKEY_bits(self._pkey)

438 

439 

class _EllipticCurve:
    """
    A representation of a supported elliptic curve.

    @cvar _curves: :py:obj:`None` until an attempt is made to load the curves.
        Thereafter, a :py:type:`set` containing :py:type:`_EllipticCurve`
        instances each of which represents one curve supported by the system.
    @type _curves: :py:type:`NoneType` or :py:type:`set`
    """

    _curves = None

    def __ne__(self, other: Any) -> bool:
        """
        Implement cooperation with the right-hand side argument of ``!=``.

        Only other curves are compared here; for anything else the decision
        is handed back to Python by returning ``NotImplemented``.
        """
        if not isinstance(other, _EllipticCurve):
            return NotImplemented
        return super().__ne__(other)

    @classmethod
    def _load_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]:
        """
        Ask OpenSSL for its built-in curves.

        :param lib: The OpenSSL library binding object.

        :return: A :py:type:`set` of ``cls`` instances giving the names of the
            elliptic curves the underlying library supports.
        """
        # A first call with a NULL buffer yields the number of curves; the
        # second call fills an array of that size.  The second call's return
        # value is not checked.
        count = lib.EC_get_builtin_curves(_ffi.NULL, 0)
        curve_array = _ffi.new("EC_builtin_curve[]", count)
        lib.EC_get_builtin_curves(curve_array, count)
        return {cls.from_nid(lib, entry.nid) for entry in curve_array}

    @classmethod
    def _get_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]:
        """
        Return the supported curves, loading and caching them on first use.

        :param lib: The OpenSSL library binding object.

        :return: A :py:type:`set` of ``cls`` instances giving the names of the
            elliptic curves the underlying library supports.
        """
        cached = cls._curves
        if cached is None:
            cached = cls._curves = cls._load_elliptic_curves(lib)
        return cached

    @classmethod
    def from_nid(cls, lib: Any, nid: int) -> _EllipticCurve:
        """
        Build a :py:class:`_EllipticCurve` for the given OpenSSL NID.

        :param lib: The OpenSSL library binding object.

        :param nid: The OpenSSL NID the resulting curve object will represent.
            This must be a curve NID (and not, for example, a hash NID) or
            subsequent operations will fail in unpredictable ways.
        :type nid: :py:class:`int`

        :return: The curve object.
        """
        short_name = _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii")
        return cls(lib, nid, short_name)

    def __init__(self, lib: Any, nid: int, name: str) -> None:
        """
        :param lib: The :py:mod:`cryptography` binding instance used to
            interface with OpenSSL.

        :param nid: The OpenSSL NID identifying the curve this object
            represents.
        :type nid: :py:class:`int`

        :param name: The OpenSSL short name identifying the curve this object
            represents.
        :type name: :py:class:`unicode`
        """
        self._lib = lib
        self._nid = nid
        self.name = name

    def __repr__(self) -> str:
        return "<Curve {!r}>".format(self.name)

    def _to_EC_KEY(self) -> Any:
        """
        Create a new OpenSSL EC_KEY structure initialized to use this curve.

        The returned handle is registered with ``_ffi.gc`` so that
        ``EC_KEY_free`` runs when it is garbage collected.
        """
        ec_key = self._lib.EC_KEY_new_by_curve_name(self._nid)
        return _ffi.gc(ec_key, _lib.EC_KEY_free)

541 

542 

@deprecated(
    "get_elliptic_curves is deprecated. You should use the APIs in "
    "cryptography instead."
)
def get_elliptic_curves() -> set[_EllipticCurve]:
    """
    Return the set of curve objects for the elliptic curves supported by the
    OpenSSL build in use.

    Each curve object identifies itself through a :py:class:`unicode`
    ``name`` attribute.

    The curve objects are useful as values for the argument accepted by
    :py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be
    used for ECDHE key exchange.
    """
    supported = _EllipticCurve._get_elliptic_curves(_lib)
    return supported

560 

561 

@deprecated(
    "get_elliptic_curve is deprecated. You should use the APIs in "
    "cryptography instead."
)
def get_elliptic_curve(name: str) -> _EllipticCurve:
    """
    Look up a single curve object by its name.

    See :py:func:`get_elliptic_curves` for information about curve objects.

    :param name: The OpenSSL short name identifying the curve object to
        retrieve.
    :type name: :py:class:`unicode`

    If the named curve is not supported then :py:class:`ValueError` is raised.
    """
    match = next(
        (curve for curve in get_elliptic_curves() if curve.name == name),
        None,
    )
    if match is None:
        raise ValueError("unknown curve name", name)
    return match

582 

583 

@functools.total_ordering
class X509Name:
    """
    An X.509 Distinguished Name.

    :ivar countryName: The country of the entity.
    :ivar C: Alias for  :py:attr:`countryName`.

    :ivar stateOrProvinceName: The state or province of the entity.
    :ivar ST: Alias for :py:attr:`stateOrProvinceName`.

    :ivar localityName: The locality of the entity.
    :ivar L: Alias for :py:attr:`localityName`.

    :ivar organizationName: The organization name of the entity.
    :ivar O: Alias for :py:attr:`organizationName`.

    :ivar organizationalUnitName: The organizational unit of the entity.
    :ivar OU: Alias for :py:attr:`organizationalUnitName`

    :ivar commonName: The common name of the entity.
    :ivar CN: Alias for :py:attr:`commonName`.

    :ivar emailAddress: The e-mail address of the entity.
    """

    def __init__(self, name: X509Name) -> None:
        """
        Create a new X509Name, copying the given X509Name instance.

        :param name: The name to copy.
        :type name: :py:class:`X509Name`
        """
        copied = _lib.X509_NAME_dup(name._name)
        self._name: Any = _ffi.gc(copied, _lib.X509_NAME_free)

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Set the name entry identified by *name* to *value*.

        Attribute names starting with ``_`` are stored as ordinary Python
        attributes.  Any other name is resolved to an OpenSSL NID; the first
        existing entry with that NID is removed and a new UTF-8 entry is
        added.

        :raises TypeError: If *name* is not exactly a :py:class:`str`.
        :raises AttributeError: If *name* is not known to OpenSSL.
        """
        if name.startswith("_"):
            return super().__setattr__(name, value)

        # Note: we really do not want str subclasses here, so we do not use
        # isinstance.
        if type(name) is not str:
            # Report the type of the offending *name*, not of the value.
            raise TypeError(
                f"attribute name must be string, not "
                f"'{type(name).__name__:.200}'"
            )

        nid = _lib.OBJ_txt2nid(_byte_string(name))
        if nid == _lib.NID_undef:
            # Drain the error queue so the failure does not surface later.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        # If there's an old entry for this NID, remove it
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)
            ent_obj = _lib.X509_NAME_ENTRY_get_object(ent)
            ent_nid = _lib.OBJ_obj2nid(ent_obj)
            if nid == ent_nid:
                ent = _lib.X509_NAME_delete_entry(self._name, i)
                _lib.X509_NAME_ENTRY_free(ent)
                break

        if isinstance(value, str):
            value = value.encode("utf-8")

        add_result = _lib.X509_NAME_add_entry_by_NID(
            self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0
        )
        if not add_result:
            _raise_current_error()

    def __getattr__(self, name: str) -> str | None:
        """
        Find attribute. An X509Name object has the following attributes:
        countryName (alias C), stateOrProvince (alias ST), locality (alias L),
        organization (alias O), organizationalUnit (alias OU), commonName
        (alias CN) and more...

        :return: The entry's value decoded as UTF-8, or ``None`` when the
            name has no entry for the attribute.
        :raises AttributeError: If *name* is not known to OpenSSL.
        """
        nid = _lib.OBJ_txt2nid(_byte_string(name))
        if nid == _lib.NID_undef:
            # This is a bit weird.  OBJ_txt2nid indicated failure, but it seems
            # a lower level function, a2d_ASN1_OBJECT, also feels the need to
            # push something onto the error queue.  If we don't clean that up
            # now, someone else will bump into it later and be quite confused.
            # See lp#314814.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1)
        if entry_index == -1:
            return None

        entry = _lib.X509_NAME_get_entry(self._name, entry_index)
        data = _lib.X509_NAME_ENTRY_get_data(entry)

        result_buffer = _ffi.new("unsigned char**")
        data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data)
        _openssl_assert(data_length >= 0)

        try:
            result = _ffi.buffer(result_buffer[0], data_length)[:].decode(
                "utf-8"
            )
        finally:
            # XXX untested
            _lib.OPENSSL_free(result_buffer[0])
        return result

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, X509Name):
            return NotImplemented

        return _lib.X509_NAME_cmp(self._name, other._name) == 0

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, X509Name):
            return NotImplemented

        return _lib.X509_NAME_cmp(self._name, other._name) < 0

    def __repr__(self) -> str:
        """
        String representation of an X509Name
        """
        result_buffer = _ffi.new("char[]", 512)
        format_result = _lib.X509_NAME_oneline(
            self._name, result_buffer, len(result_buffer)
        )
        _openssl_assert(format_result != _ffi.NULL)

        return "<X509Name object '{}'>".format(
            _ffi.string(result_buffer).decode("utf-8"),
        )

    def hash(self) -> int:
        """
        Return an integer representation of the first four bytes of the
        MD5 digest of the DER representation of the name.

        This is the Python equivalent of OpenSSL's ``X509_NAME_hash``.

        :return: The (integer) hash of this name.
        :rtype: :py:class:`int`
        """
        return _lib.X509_NAME_hash(self._name)

    def der(self) -> bytes:
        """
        Return the DER encoding of this name.

        :return: The DER encoded form of this name.
        :rtype: :py:class:`bytes`
        """
        result_buffer = _ffi.new("unsigned char**")
        encode_result = _lib.i2d_X509_NAME(self._name, result_buffer)
        _openssl_assert(encode_result >= 0)

        try:
            string_result = _ffi.buffer(result_buffer[0], encode_result)[:]
        finally:
            # Always release the encoder's buffer, as __getattr__ does.
            _lib.OPENSSL_free(result_buffer[0])
        return string_result

    def get_components(self) -> list[tuple[bytes, bytes]]:
        """
        Returns the components of this name, as a sequence of 2-tuples.

        :return: The components of this name.
        :rtype: :py:class:`list` of ``name, value`` tuples.
        """
        result: list[tuple[bytes, bytes]] = []
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)

            fname = _lib.X509_NAME_ENTRY_get_object(ent)
            fval = _lib.X509_NAME_ENTRY_get_data(ent)

            nid = _lib.OBJ_obj2nid(fname)
            name = _lib.OBJ_nid2sn(nid)

            # ffi.string does not handle strings containing NULL bytes
            # (which may have been generated by old, broken software)
            value = _ffi.buffer(
                _lib.ASN1_STRING_get0_data(fval), _lib.ASN1_STRING_length(fval)
            )[:]
            result.append((_ffi.string(name), value))

        return result

777 

778 

@deprecated(
    "X509Extension support in pyOpenSSL is deprecated. You should use the "
    "APIs in cryptography."
)
class X509Extension:
    """
    An X.509 v3 certificate extension.

    .. deprecated:: 23.3.0
       Use cryptography's X509 APIs instead.
    """

    def __init__(
        self,
        type_name: bytes,
        critical: bool,
        value: bytes,
        subject: X509 | None = None,
        issuer: X509 | None = None,
    ) -> None:
        """
        Initializes an X509 extension.

        :param type_name: The name of the type of extension_ to create.
        :type type_name: :py:data:`bytes`

        :param bool critical: A flag indicating whether this is a critical
            extension.

        :param value: The OpenSSL textual representation of the extension's
            value.
        :type value: :py:data:`bytes`

        :param subject: Optional X509 certificate to use as subject.
        :type subject: :py:class:`X509`

        :param issuer: Optional X509 certificate to use as issuer.
        :type issuer: :py:class:`X509`

        :raises TypeError: If *issuer* or *subject* is given but is not an
            :py:class:`X509` instance.

        .. _extension: https://www.openssl.org/docs/manmaster/man5/
            x509v3_config.html#STANDARD-EXTENSIONS
        """
        null = _ffi.NULL
        ctx = _ffi.new("X509V3_CTX*")

        # X509V3_EXT_nconf may segfault when handed a NULL ctx (needed by
        # extensions using the r2i conversion method), so always build one,
        # starting with most fields set to NULL.
        _lib.X509V3_set_ctx(ctx, null, null, null, null, 0)

        # No configuration database is attached, although some extensions
        # may require one.
        _lib.X509V3_set_ctx_nodb(ctx)

        # ctx is local and, as far as can be told, none of the X509V3_* APIs
        # used here steal references, so the certificates are attached
        # without touching reference counts or duplicating them.
        if issuer is not None:
            if not isinstance(issuer, X509):
                raise TypeError("issuer must be an X509 instance")
            ctx.issuer_cert = issuer._x509
        if subject is not None:
            if not isinstance(subject, X509):
                raise TypeError("subject must be an X509 instance")
            ctx.subject_cert = subject._x509

        # Criticality is smuggled in through the textual value.  Other
        # OpenSSL APIs accept it separately (X509V3_EXT_i2d looks like the
        # better fit) but they are harder to use.
        config_value = b"critical," + value if critical else value

        raw_extension = _lib.X509V3_EXT_nconf(
            _ffi.NULL, ctx, type_name, config_value
        )
        if raw_extension == _ffi.NULL:
            _raise_current_error()
        self._extension = _ffi.gc(raw_extension, _lib.X509_EXTENSION_free)

    @property
    def _nid(self) -> Any:
        extension_object = _lib.X509_EXTENSION_get_object(self._extension)
        return _lib.OBJ_obj2nid(extension_object)

    # Label used when rendering each of these general-name types.
    _prefixes: typing.ClassVar[dict[int, str]] = {
        _lib.GEN_EMAIL: "email",
        _lib.GEN_DNS: "DNS",
        _lib.GEN_URI: "URI",
    }

    def _subjectAltNameString(self) -> str:
        decoded = _lib.X509V3_EXT_d2i(self._extension)
        names = _ffi.gc(
            _ffi.cast("GENERAL_NAMES*", decoded), _lib.GENERAL_NAMES_free
        )

        rendered = []
        for index in range(_lib.sk_GENERAL_NAME_num(names)):
            entry = _lib.sk_GENERAL_NAME_value(names, index)
            label = self._prefixes.get(entry.type)
            if label is None:
                # No known prefix: let OpenSSL print the entry.
                bio = _new_mem_buf()
                _lib.GENERAL_NAME_print(bio, entry)
                rendered.append(_bio_to_string(bio).decode("utf-8"))
                continue

            asn1_string = _ffi.cast("ASN1_STRING*", entry.d.ia5)
            text = _ffi.buffer(
                _lib.ASN1_STRING_get0_data(asn1_string),
                _lib.ASN1_STRING_length(asn1_string),
            )[:].decode("utf-8")
            rendered.append(label + ":" + text)
        return ", ".join(rendered)

    def __str__(self) -> str:
        """
        :return: a nice text representation of the extension
        """
        if self._nid == _lib.NID_subject_alt_name:
            return self._subjectAltNameString()

        bio = _new_mem_buf()
        printed = _lib.X509V3_EXT_print(bio, self._extension, 0, 0)
        _openssl_assert(printed != 0)

        return _bio_to_string(bio).decode("utf-8")

    def get_critical(self) -> bool:
        """
        Returns the critical field of this X.509 extension.

        :return: The critical field.
        """
        critical = _lib.X509_EXTENSION_get_critical(self._extension)
        return critical

    def get_short_name(self) -> bytes:
        """
        Returns the short type name of this X.509 extension.

        The result is a byte string such as :py:const:`b"basicConstraints"`.

        :return: The short type name.
        :rtype: :py:data:`bytes`

        .. versionadded:: 0.12
        """
        nid = _lib.OBJ_obj2nid(_lib.X509_EXTENSION_get_object(self._extension))
        # OpenSSL 3.1.0 has a bug where nid2sn returns NULL for NIDs that
        # previously returned UNDEF. This is a workaround for that issue.
        # https://github.com/openssl/openssl/commit/908ba3ed9adbb3df90f76
        short_name = _lib.OBJ_nid2sn(nid)
        if short_name == _ffi.NULL:
            return b"UNDEF"
        return _ffi.string(short_name)

    def get_data(self) -> bytes:
        """
        Returns the data of the X509 extension, encoded as ASN.1.

        :return: The ASN.1 encoded data of this X509 extension.
        :rtype: :py:data:`bytes`

        .. versionadded:: 0.12
        """
        octets = _ffi.cast(
            "ASN1_STRING*", _lib.X509_EXTENSION_get_data(self._extension)
        )
        payload = _lib.ASN1_STRING_get0_data(octets)
        size = _lib.ASN1_STRING_length(octets)
        return _ffi.buffer(payload, size)[:]

953 

954 

@deprecated(
    "CSR support in pyOpenSSL is deprecated. You should use the APIs "
    "in cryptography."
)
class X509Req:
    """
    An X.509 certificate signing request.

    .. deprecated:: 24.2.0
        Use `cryptography.x509.CertificateSigningRequest` instead.
    """

    def __init__(self) -> None:
        req = _lib.X509_REQ_new()
        # Fail loudly on allocation failure rather than wrapping NULL.
        _openssl_assert(req != _ffi.NULL)
        self._req = _ffi.gc(req, _lib.X509_REQ_free)
        # Default to version 0.
        self.set_version(0)

    def to_cryptography(self) -> x509.CertificateSigningRequest:
        """
        Export as a ``cryptography`` certificate signing request.

        :rtype: ``cryptography.x509.CertificateSigningRequest``

        .. versionadded:: 17.1.0
        """
        from cryptography.x509 import load_der_x509_csr

        # Round-trip through DER: dump this request and let cryptography
        # parse the resulting bytes.
        der = _dump_certificate_request_internal(FILETYPE_ASN1, self)

        return load_der_x509_csr(der)

    @classmethod
    def from_cryptography(
        cls, crypto_req: x509.CertificateSigningRequest
    ) -> X509Req:
        """
        Construct based on a ``cryptography`` *crypto_req*.

        :param crypto_req: A ``cryptography`` X.509 certificate signing request
        :type crypto_req: ``cryptography.x509.CertificateSigningRequest``

        :rtype: X509Req

        :raises TypeError: If *crypto_req* is not a
            ``cryptography.x509.CertificateSigningRequest``.

        .. versionadded:: 17.1.0
        """
        if not isinstance(crypto_req, x509.CertificateSigningRequest):
            raise TypeError("Must be a certificate signing request")

        from cryptography.hazmat.primitives.serialization import Encoding

        der = crypto_req.public_bytes(Encoding.DER)
        return _load_certificate_request_internal(FILETYPE_ASN1, der)

    def set_pubkey(self, pkey: PKey) -> None:
        """
        Set the public key of the certificate signing request.

        :param pkey: The public key to use.
        :type pkey: :py:class:`PKey`

        :return: ``None``
        """
        set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey)
        _openssl_assert(set_result == 1)

    def get_pubkey(self) -> PKey:
        """
        Get the public key of the certificate signing request.

        :return: The public key.
        :rtype: :py:class:`PKey`
        """
        # Bypass PKey.__init__ and attach the key obtained from the request.
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.X509_REQ_get_pubkey(self._req)
        _openssl_assert(pkey._pkey != _ffi.NULL)
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_version(self, version: int) -> None:
        """
        Set the version subfield (RFC 2986, section 4.1) of the certificate
        request.

        :param int version: The version number.
        :return: ``None``
        :raises TypeError: If *version* is not an ``int``.
        :raises ValueError: If *version* is not 0.
        """
        if not isinstance(version, int):
            raise TypeError("version must be an int")
        if version != 0:
            raise ValueError(
                "Invalid version. The only valid version for X509Req is 0."
            )
        set_result = _lib.X509_REQ_set_version(self._req, version)
        _openssl_assert(set_result == 1)

    def get_version(self) -> int:
        """
        Get the version subfield (RFC 2986, section 4.1) of the certificate
        request.

        :return: The value of the version subfield.
        :rtype: :py:class:`int`
        """
        return _lib.X509_REQ_get_version(self._req)

    def get_subject(self) -> X509Name:
        """
        Return the subject of this certificate signing request.

        This creates a new :class:`X509Name` that wraps the underlying subject
        name field on the certificate signing request. Modifying it will modify
        the underlying signing request, and will have the effect of modifying
        any other :class:`X509Name` that refers to this subject.

        :return: The subject of this certificate signing request.
        :rtype: :class:`X509Name`
        """
        name = X509Name.__new__(X509Name)
        name._name = _lib.X509_REQ_get_subject_name(self._req)
        _openssl_assert(name._name != _ffi.NULL)

        # The name is owned by the X509Req structure.  As long as the X509Name
        # Python object is alive, keep the X509Req Python object alive.
        name._owner = self

        return name

    def add_extensions(self, extensions: Iterable[X509Extension]) -> None:
        """
        Add extensions to the certificate signing request.

        :param extensions: The X.509 extensions to add.
        :type extensions: iterable of :py:class:`X509Extension`
        :return: ``None``
        :raises ValueError: If an element of *extensions* is not an
            :py:class:`X509Extension`.
        """
        warnings.warn(
            (
                "This API is deprecated and will be removed in a future "
                "version of pyOpenSSL. You should use pyca/cryptography's "
                "X.509 APIs instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )

        stack = _lib.sk_X509_EXTENSION_new_null()
        _openssl_assert(stack != _ffi.NULL)

        # Only the stack container is released on collection; the pushed
        # extensions stay owned by their X509Extension wrappers.
        stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free)

        for ext in extensions:
            if not isinstance(ext, X509Extension):
                raise ValueError("One of the elements is not an X509Extension")

            # A push reports failure by returning 0; surface that instead of
            # silently dropping the extension from the request.
            push_result = _lib.sk_X509_EXTENSION_push(stack, ext._extension)
            _openssl_assert(push_result > 0)

        add_result = _lib.X509_REQ_add_extensions(self._req, stack)
        _openssl_assert(add_result == 1)

    def get_extensions(self) -> list[X509Extension]:
        """
        Get X.509 extensions in the certificate signing request.

        :return: The X.509 extensions in this request.
        :rtype: :py:class:`list` of :py:class:`X509Extension` objects.

        .. versionadded:: 0.15
        """
        warnings.warn(
            (
                "This API is deprecated and will be removed in a future "
                "version of pyOpenSSL. You should use pyca/cryptography's "
                "X.509 APIs instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )

        exts = []
        native_exts_obj = _lib.X509_REQ_get_extensions(self._req)
        # The returned stack is released (elements included) once the
        # Python reference goes away.
        native_exts_obj = _ffi.gc(
            native_exts_obj,
            lambda x: _lib.sk_X509_EXTENSION_pop_free(
                x,
                _ffi.addressof(_lib._original_lib, "X509_EXTENSION_free"),
            ),
        )

        for i in range(_lib.sk_X509_EXTENSION_num(native_exts_obj)):
            ext = X509Extension.__new__(X509Extension)
            # Duplicate each element so the wrapper owns its own copy and
            # does not depend on the lifetime of the temporary stack.
            extension = _lib.X509_EXTENSION_dup(
                _lib.sk_X509_EXTENSION_value(native_exts_obj, i)
            )
            ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
            exts.append(ext)
        return exts

    def sign(self, pkey: PKey, digest: str) -> None:
        """
        Sign the certificate signing request with this key and digest type.

        :param pkey: The key pair to sign with.
        :type pkey: :py:class:`PKey`
        :param digest: The name of the message digest to use for the signature,
            e.g. :py:data:`"sha256"`.
        :type digest: :py:class:`str`
        :return: ``None``
        :raises ValueError: If the key has only a public part, is
            uninitialized, or the digest name is unknown.
        """
        if pkey._only_public:
            raise ValueError("Key has only public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
        if digest_obj == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
        _openssl_assert(sign_result > 0)

    def verify(self, pkey: PKey) -> bool:
        """
        Verifies the signature on this certificate signing request.

        :param PKey pkey: A public key.

        :return: ``True`` if the signature is correct.
        :rtype: bool

        :raises TypeError: If *pkey* is not a :py:class:`PKey`.
        :raises OpenSSL.crypto.Error: If the signature is invalid or there is a
            problem verifying the signature.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        result = _lib.X509_REQ_verify(self._req, pkey._pkey)
        if result <= 0:
            _raise_current_error()

        # Every non-positive result raised above, so report success as the
        # documented bool rather than leaking the raw C return code.
        return True

1199 

1200 

class X509:
    """
    An X.509 certificate.
    """

    def __init__(self) -> None:
        x509 = _lib.X509_new()
        _openssl_assert(x509 != _ffi.NULL)
        self._x509 = _ffi.gc(x509, _lib.X509_free)

        # Track the X509Name wrappers handed out for issuer/subject so they
        # can be cleared when the corresponding name is replaced.
        self._issuer_invalidator = _X509NameInvalidator()
        self._subject_invalidator = _X509NameInvalidator()

    @classmethod
    def _from_raw_x509_ptr(cls, x509: Any) -> X509:
        """
        Wrap an existing native ``X509 *`` without calling ``__init__``.

        The wrapper frees the pointer with ``X509_free`` when it is
        garbage collected.
        """
        cert = cls.__new__(cls)
        cert._x509 = _ffi.gc(x509, _lib.X509_free)
        cert._issuer_invalidator = _X509NameInvalidator()
        cert._subject_invalidator = _X509NameInvalidator()
        return cert

    def to_cryptography(self) -> x509.Certificate:
        """
        Export as a ``cryptography`` certificate.

        :rtype: ``cryptography.x509.Certificate``

        .. versionadded:: 17.1.0
        """
        from cryptography.x509 import load_der_x509_certificate

        der = dump_certificate(FILETYPE_ASN1, self)
        return load_der_x509_certificate(der)

    @classmethod
    def from_cryptography(cls, crypto_cert: x509.Certificate) -> X509:
        """
        Construct based on a ``cryptography`` *crypto_cert*.

        :param crypto_cert: A ``cryptography`` X.509 certificate.
        :type crypto_cert: ``cryptography.x509.Certificate``

        :rtype: X509

        :raises TypeError: If *crypto_cert* is not a
            ``cryptography.x509.Certificate``.

        .. versionadded:: 17.1.0
        """
        if not isinstance(crypto_cert, x509.Certificate):
            raise TypeError("Must be a certificate")

        from cryptography.hazmat.primitives.serialization import Encoding

        der = crypto_cert.public_bytes(Encoding.DER)
        return load_certificate(FILETYPE_ASN1, der)

    def set_version(self, version: int) -> None:
        """
        Set the version number of the certificate. Note that the
        version value is zero-based, eg. a value of 0 is V1.

        :param version: The version number of the certificate.
        :type version: :py:class:`int`

        :return: ``None``
        """
        if not isinstance(version, int):
            raise TypeError("version must be an integer")

        _openssl_assert(_lib.X509_set_version(self._x509, version) == 1)

    def get_version(self) -> int:
        """
        Return the version number of the certificate.

        :return: The version number of the certificate.
        :rtype: :py:class:`int`
        """
        return _lib.X509_get_version(self._x509)

    def get_pubkey(self) -> PKey:
        """
        Get the public key of the certificate.

        :return: The public key.
        :rtype: :py:class:`PKey`
        """
        # Bypass PKey.__init__ and attach the key taken from the certificate.
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.X509_get_pubkey(self._x509)
        if pkey._pkey == _ffi.NULL:
            _raise_current_error()
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_pubkey(self, pkey: PKey) -> None:
        """
        Set the public key of the certificate.

        :param pkey: The public key.
        :type pkey: :py:class:`PKey`

        :return: :py:data:`None`
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey)
        _openssl_assert(set_result == 1)

    def sign(self, pkey: PKey, digest: str) -> None:
        """
        Sign the certificate with this key and digest type.

        :param pkey: The key to sign with.
        :type pkey: :py:class:`PKey`

        :param digest: The name of the message digest to use.
        :type digest: :py:class:`str`

        :return: :py:data:`None`
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        if pkey._only_public:
            raise ValueError("Key only has public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        evp_md = _lib.EVP_get_digestbyname(_byte_string(digest))
        if evp_md == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md)
        _openssl_assert(sign_result > 0)

    def get_signature_algorithm(self) -> bytes:
        """
        Return the signature algorithm used in the certificate.

        :return: The name of the algorithm.
        :rtype: :py:class:`bytes`

        :raises ValueError: If the signature algorithm is undefined.

        .. versionadded:: 0.13
        """
        sig_alg = _lib.X509_get0_tbs_sigalg(self._x509)
        # Out-parameter that receives the algorithm's ASN1_OBJECT pointer.
        alg = _ffi.new("ASN1_OBJECT **")
        _lib.X509_ALGOR_get0(alg, _ffi.NULL, _ffi.NULL, sig_alg)
        nid = _lib.OBJ_obj2nid(alg[0])
        if nid == _lib.NID_undef:
            raise ValueError("Undefined signature algorithm")
        return _ffi.string(_lib.OBJ_nid2ln(nid))

    def digest(self, digest_name: str) -> bytes:
        """
        Return the digest of the X509 object.

        :param digest_name: The name of the digest algorithm to use.
        :type digest_name: :py:class:`str`

        :return: The digest of the object, formatted as
            :py:const:`b":"`-delimited hex pairs.
        :rtype: :py:class:`bytes`

        :raises ValueError: If *digest_name* is not a known digest.
        """
        digest = _lib.EVP_get_digestbyname(_byte_string(digest_name))
        if digest == _ffi.NULL:
            raise ValueError("No such digest method")

        result_buffer = _ffi.new("unsigned char[]", _lib.EVP_MAX_MD_SIZE)
        result_length = _ffi.new("unsigned int[]", 1)
        result_length[0] = len(result_buffer)

        digest_result = _lib.X509_digest(
            self._x509, digest, result_buffer, result_length
        )
        _openssl_assert(digest_result == 1)

        # Only the first result_length[0] bytes of the buffer are used.
        return b":".join(
            [
                b16encode(ch).upper()
                for ch in _ffi.buffer(result_buffer, result_length[0])
            ]
        )

    def subject_name_hash(self) -> int:
        """
        Return the hash of the X509 subject.

        :return: The hash of the subject.
        :rtype: :py:class:`int`
        """
        return _lib.X509_subject_name_hash(self._x509)

    def set_serial_number(self, serial: int) -> None:
        """
        Set the serial number of the certificate.

        :param serial: The new serial number.
        :type serial: :py:class:`int`

        :return: :py:data:`None`

        :raises TypeError: If *serial* is not an ``int``.
        """
        if not isinstance(serial, int):
            raise TypeError("serial must be an integer")

        # format() keeps a leading "-" directly in front of the hex digits,
        # which BN_hex2bn understands.  Slicing the output of hex() would
        # leave a stray "x" for negative values and make the parse fail.
        hex_serial_bytes = format(serial, "x").encode("ascii")

        bignum_serial = _ffi.new("BIGNUM**")

        # BN_hex2bn stores the result in &bignum and returns 0 on failure.
        # The return value is an int, so it is compared against 0 rather
        # than against the NULL pointer.
        result = _lib.BN_hex2bn(bignum_serial, hex_serial_bytes)
        _openssl_assert(result != 0)

        asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL)
        # The BIGNUM is only an intermediate; free it before any assertion
        # can raise so it is never leaked.
        _lib.BN_free(bignum_serial[0])
        _openssl_assert(asn1_serial != _ffi.NULL)
        asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free)
        set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial)
        _openssl_assert(set_result == 1)

    def get_serial_number(self) -> int:
        """
        Return the serial number of this certificate.

        :return: The serial number.
        :rtype: int
        """
        asn1_serial = _lib.X509_get_serialNumber(self._x509)
        bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL)
        # Both the BIGNUM and the hex string are allocated by OpenSSL and
        # are released explicitly, even if the conversion raises.
        try:
            hex_serial = _lib.BN_bn2hex(bignum_serial)
            try:
                hexstring_serial = _ffi.string(hex_serial)
                serial = int(hexstring_serial, 16)
                return serial
            finally:
                _lib.OPENSSL_free(hex_serial)
        finally:
            _lib.BN_free(bignum_serial)

    def gmtime_adj_notAfter(self, amount: int) -> None:
        """
        Adjust the time stamp on which the certificate stops being valid.

        :param int amount: The number of seconds by which to adjust the
            timestamp.
        :return: ``None``
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notAfter = _lib.X509_getm_notAfter(self._x509)
        _lib.X509_gmtime_adj(notAfter, amount)

    def gmtime_adj_notBefore(self, amount: int) -> None:
        """
        Adjust the timestamp on which the certificate starts being valid.

        :param amount: The number of seconds by which to adjust the timestamp.
        :return: ``None``
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notBefore = _lib.X509_getm_notBefore(self._x509)
        _lib.X509_gmtime_adj(notBefore, amount)

    def has_expired(self) -> bool:
        """
        Check whether the certificate has expired.

        :return: ``True`` if the certificate has expired, ``False`` otherwise.
        :rtype: bool

        :raises ValueError: If the certificate has no notAfter timestamp.
        """
        time_bytes = self.get_notAfter()
        if time_bytes is None:
            raise ValueError("Unable to determine notAfter")
        time_string = time_bytes.decode("utf-8")
        not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")

        # strptime() yields a naive datetime, so the current UTC time is
        # made naive as well before comparing.
        UTC = datetime.timezone.utc
        utcnow = datetime.datetime.now(UTC).replace(tzinfo=None)
        return not_after < utcnow

    def _get_boundary_time(self, which: Any) -> bytes | None:
        """Read the validity timestamp selected by the accessor *which*."""
        return _get_asn1_time(which(self._x509))

    def get_notBefore(self) -> bytes | None:
        """
        Get the timestamp at which the certificate starts being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :return: A timestamp string, or ``None`` if there is none.
        :rtype: bytes or NoneType
        """
        return self._get_boundary_time(_lib.X509_getm_notBefore)

    def _set_boundary_time(
        self, which: Callable[..., Any], when: bytes
    ) -> None:
        """Write *when* to the validity timestamp selected by *which*."""
        return _set_asn1_time(which(self._x509), when)

    def set_notBefore(self, when: bytes) -> None:
        """
        Set the timestamp at which the certificate starts being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_getm_notBefore, when)

    def get_notAfter(self) -> bytes | None:
        """
        Get the timestamp at which the certificate stops being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :return: A timestamp string, or ``None`` if there is none.
        :rtype: bytes or NoneType
        """
        return self._get_boundary_time(_lib.X509_getm_notAfter)

    def set_notAfter(self, when: bytes) -> None:
        """
        Set the timestamp at which the certificate stops being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_getm_notAfter, when)

    def _get_name(self, which: Any) -> X509Name:
        """Wrap the name returned by the accessor *which* in an X509Name."""
        name = X509Name.__new__(X509Name)
        name._name = which(self._x509)
        _openssl_assert(name._name != _ffi.NULL)

        # The name is owned by the X509 structure.  As long as the X509Name
        # Python object is alive, keep the X509 Python object alive.
        name._owner = self

        return name

    def _set_name(self, which: Any, name: X509Name) -> None:
        """Store *name* on the certificate using the setter *which*."""
        if not isinstance(name, X509Name):
            raise TypeError("name must be an X509Name")
        set_result = which(self._x509, name._name)
        _openssl_assert(set_result == 1)

    def get_issuer(self) -> X509Name:
        """
        Return the issuer of this certificate.

        This creates a new :class:`X509Name` that wraps the underlying issuer
        name field on the certificate. Modifying it will modify the underlying
        certificate, and will have the effect of modifying any other
        :class:`X509Name` that refers to this issuer.

        :return: The issuer of this certificate.
        :rtype: :class:`X509Name`
        """
        name = self._get_name(_lib.X509_get_issuer_name)
        self._issuer_invalidator.add(name)
        return name

    def set_issuer(self, issuer: X509Name) -> None:
        """
        Set the issuer of this certificate.

        :param issuer: The issuer.
        :type issuer: :py:class:`X509Name`

        :return: ``None``
        """
        self._set_name(_lib.X509_set_issuer_name, issuer)
        self._issuer_invalidator.clear()

    def get_subject(self) -> X509Name:
        """
        Return the subject of this certificate.

        This creates a new :class:`X509Name` that wraps the underlying subject
        name field on the certificate. Modifying it will modify the underlying
        certificate, and will have the effect of modifying any other
        :class:`X509Name` that refers to this subject.

        :return: The subject of this certificate.
        :rtype: :class:`X509Name`
        """
        name = self._get_name(_lib.X509_get_subject_name)
        self._subject_invalidator.add(name)
        return name

    def set_subject(self, subject: X509Name) -> None:
        """
        Set the subject of this certificate.

        :param subject: The subject.
        :type subject: :py:class:`X509Name`

        :return: ``None``
        """
        self._set_name(_lib.X509_set_subject_name, subject)
        self._subject_invalidator.clear()

    def get_extension_count(self) -> int:
        """
        Get the number of extensions on this certificate.

        :return: The number of extensions.
        :rtype: :py:class:`int`

        .. versionadded:: 0.12
        """
        return _lib.X509_get_ext_count(self._x509)

    def add_extensions(self, extensions: Iterable[X509Extension]) -> None:
        """
        Add extensions to the certificate.

        :param extensions: The extensions to add.
        :type extensions: An iterable of :py:class:`X509Extension` objects.
        :return: ``None``
        :raises ValueError: If an element of *extensions* is not an
            :py:class:`X509Extension`.
        """
        warnings.warn(
            (
                "This API is deprecated and will be removed in a future "
                "version of pyOpenSSL. You should use pyca/cryptography's "
                "X.509 APIs instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )

        for ext in extensions:
            if not isinstance(ext, X509Extension):
                raise ValueError("One of the elements is not an X509Extension")

            # A location of -1 is passed for every extension, as in the
            # original call.
            add_result = _lib.X509_add_ext(self._x509, ext._extension, -1)
            _openssl_assert(add_result == 1)

    def get_extension(self, index: int) -> X509Extension:
        """
        Get a specific extension of the certificate by index.

        Extensions on a certificate are kept in order. The index
        parameter selects which extension will be returned.

        :param int index: The index of the extension to retrieve.
        :return: The extension at the specified index.
        :rtype: :py:class:`X509Extension`
        :raises IndexError: If the extension index was out of bounds.

        .. versionadded:: 0.12
        """
        warnings.warn(
            (
                "This API is deprecated and will be removed in a future "
                "version of pyOpenSSL. You should use pyca/cryptography's "
                "X.509 APIs instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )

        ext = X509Extension.__new__(X509Extension)
        ext._extension = _lib.X509_get_ext(self._x509, index)
        if ext._extension == _ffi.NULL:
            raise IndexError("extension index out of bounds")

        # Duplicate the extension so the wrapper owns an independent copy
        # that it can free on its own.
        extension = _lib.X509_EXTENSION_dup(ext._extension)
        ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
        return ext

1689 

1690 

class X509StoreFlags:
    """
    Verification flags that alter the behavior of :class:`X509Store`.

    Every attribute re-exports the ``X509_V_FLAG_*`` constant of the same
    name from the OpenSSL binding.

    See `OpenSSL Verification Flags`_ for details.

    .. _OpenSSL Verification Flags:
        https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html
    """

    CRL_CHECK: int = _lib.X509_V_FLAG_CRL_CHECK
    CRL_CHECK_ALL: int = _lib.X509_V_FLAG_CRL_CHECK_ALL
    IGNORE_CRITICAL: int = _lib.X509_V_FLAG_IGNORE_CRITICAL
    X509_STRICT: int = _lib.X509_V_FLAG_X509_STRICT
    ALLOW_PROXY_CERTS: int = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS
    POLICY_CHECK: int = _lib.X509_V_FLAG_POLICY_CHECK
    EXPLICIT_POLICY: int = _lib.X509_V_FLAG_EXPLICIT_POLICY
    INHIBIT_MAP: int = _lib.X509_V_FLAG_INHIBIT_MAP
    CHECK_SS_SIGNATURE: int = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE
    PARTIAL_CHAIN: int = _lib.X509_V_FLAG_PARTIAL_CHAIN

1712 

1713 

class X509Store:
    """
    An X.509 store.

    An X.509 store is used to describe a context in which to verify a
    certificate. A description of a context may include a set of certificates
    to trust, a set of certificate revocation lists, verification flags and
    more.

    An X.509 store, being only a description, cannot be used by itself to
    verify a certificate. To carry out the actual verification process, see
    :class:`X509StoreContext`.
    """

    def __init__(self) -> None:
        store = _lib.X509_STORE_new()
        # Fail loudly on allocation failure rather than wrapping NULL.
        _openssl_assert(store != _ffi.NULL)
        self._store = _ffi.gc(store, _lib.X509_STORE_free)

    def add_cert(self, cert: X509) -> None:
        """
        Adds a trusted certificate to this store.

        Adding a certificate with this method adds this certificate as a
        *trusted* certificate.

        :param X509 cert: The certificate to add to this store.

        :raises TypeError: If the certificate is not an :class:`X509`.

        :raises OpenSSL.crypto.Error: If OpenSSL was unhappy with your
            certificate.

        :return: ``None`` if the certificate was added successfully.
        """
        if not isinstance(cert, X509):
            raise TypeError("cert must be an X509 instance")

        res = _lib.X509_STORE_add_cert(self._store, cert._x509)
        _openssl_assert(res == 1)

    def add_crl(self, crl: x509.CertificateRevocationList) -> None:
        """
        Add a certificate revocation list to this store.

        The certificate revocation lists added to a store will only be used if
        the associated flags are configured to check certificate revocation
        lists.

        .. versionadded:: 16.1.0

        :param crl: The certificate revocation list to add to this store.
        :type crl: ``cryptography.x509.CertificateRevocationList``
        :return: ``None`` if the certificate revocation list was added
            successfully.
        :raises TypeError: If *crl* is not a
            ``cryptography.x509.CertificateRevocationList``.
        """
        if not isinstance(crl, x509.CertificateRevocationList):
            raise TypeError(
                "CRL must be of type "
                "cryptography.x509.CertificateRevocationList"
            )

        from cryptography.hazmat.primitives.serialization import Encoding

        # Hand the CRL over to OpenSSL by serialising it to DER and parsing
        # it again through a memory BIO.
        bio = _new_mem_buf(crl.public_bytes(Encoding.DER))
        openssl_crl = _lib.d2i_X509_CRL_bio(bio, _ffi.NULL)
        _openssl_assert(openssl_crl != _ffi.NULL)
        openssl_crl = _ffi.gc(openssl_crl, _lib.X509_CRL_free)

        _openssl_assert(
            _lib.X509_STORE_add_crl(self._store, openssl_crl) != 0
        )

    def set_flags(self, flags: int) -> None:
        """
        Set verification flags to this store.

        Verification flags can be combined by oring them together.

        .. note::

          Setting a verification flag sometimes requires clients to add
          additional information to the store, otherwise a suitable error will
          be raised.

          For example, in setting flags to enable CRL checking a
          suitable CRL must be added to the store otherwise an error will be
          raised.

        .. versionadded:: 16.1.0

        :param int flags: The verification flags to set on this store.
            See :class:`X509StoreFlags` for available constants.
        :return: ``None`` if the verification flags were successfully set.
        """
        _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0)

    def set_time(self, vfy_time: datetime.datetime) -> None:
        """
        Set the time against which the certificates are verified.

        Normally the current time is used.

        .. note::

          For example, you can determine if a certificate was valid at a given
          time.

        .. versionadded:: 17.0.0

        :param datetime vfy_time: The verification time to set on this store.
        :return: ``None`` if the verification time was successfully set.
        """
        param = _lib.X509_VERIFY_PARAM_new()
        # Fail loudly on allocation failure rather than using NULL below.
        _openssl_assert(param != _ffi.NULL)
        param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free)

        # calendar.timegm() reads the time tuple as UTC; timetuple() does
        # not apply any tzinfo offset of *vfy_time*.
        _lib.X509_VERIFY_PARAM_set_time(
            param, calendar.timegm(vfy_time.timetuple())
        )
        _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)

    def load_locations(
        self,
        cafile: StrOrBytesPath | None,
        capath: StrOrBytesPath | None = None,
    ) -> None:
        """
        Let X509Store know where we can find trusted certificates for the
        certificate chain.  Note that the certificates have to be in PEM
        format.

        If *capath* is passed, it must be a directory prepared using the
        ``c_rehash`` tool included with OpenSSL.  Either, but not both, of
        *cafile* or *capath* may be ``None``.

        .. note::

          Both *cafile* and *capath* may be set simultaneously.

          Call this method multiple times to add more than one location.
          For example, CA certificates, and certificate revocation list bundles
          may be passed in *cafile* in subsequent calls to this method.

        .. versionadded:: 20.0

        :param cafile: In which file we can find the certificates (``bytes`` or
                       ``unicode``).
        :param capath: In which directory we can find the certificates
                       (``bytes`` or ``unicode``).

        :return: ``None`` if the locations were set successfully.

        :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None``
            or the locations could not be set for any reason.

        """
        # A missing location is passed to OpenSSL as NULL; anything else is
        # converted to a bytes path first.
        cafile_arg = _ffi.NULL if cafile is None else _path_bytes(cafile)
        capath_arg = _ffi.NULL if capath is None else _path_bytes(capath)

        load_result = _lib.X509_STORE_load_locations(
            self._store, cafile_arg, capath_arg
        )
        if not load_result:
            _raise_current_error()

1882 

1883 

class X509StoreContextError(Exception):
    """
    An exception raised when an error occurred while verifying a certificate
    using `OpenSSL.X509StoreContext.verify_certificate`.

    :ivar errors: The error details supplied by the caller.
    :type errors: :class:`list`

    :ivar certificate: The certificate which caused verification failure.
    :type certificate: :class:`X509`
    """

    def __init__(
        self, message: str, errors: list[Any], certificate: X509
    ) -> None:
        # Only the message is passed to Exception, so str(exc) is the bare
        # message; the remaining details live on dedicated attributes.
        Exception.__init__(self, message)
        self.certificate = certificate
        self.errors = errors

1899 

1900 

1901class X509StoreContext: 

1902 """ 

1903 An X.509 store context. 

1904 

1905 An X.509 store context is used to carry out the actual verification process 

1906 of a certificate in a described context. For describing such a context, see 

1907 :class:`X509Store`. 

1908 

1909 :param X509Store store: The certificates which will be trusted for the 

1910 purposes of any verifications. 

1911 :param X509 certificate: The certificate to be verified. 

1912 :param chain: List of untrusted certificates that may be used for building 

1913 the certificate chain. May be ``None``. 

1914 :type chain: :class:`list` of :class:`X509` 

1915 """ 

1916 

def __init__(
    self,
    store: X509Store,
    certificate: X509,
    chain: Sequence[X509] | None = None,
) -> None:
    # Keep the Python wrappers themselves so they are available when the
    # verification is carried out later.
    self._store = store
    self._cert = certificate
    # The untrusted chain is converted into a native stack up front, so a
    # bad element is reported at construction time.
    untrusted = self._build_certificate_stack(chain)
    self._chain = untrusted

1926 

@staticmethod
def _build_certificate_stack(
    certificates: Sequence[X509] | None,
) -> Any:
    """
    Build a native ``STACK_OF(X509)`` from *certificates*.

    :param certificates: The certificates to put on the stack, or ``None``.
    :return: ``_ffi.NULL`` if *certificates* is ``None`` or empty,
        otherwise a garbage-collected cdata stack holding one extra
        reference to every certificate.
    :raises TypeError: If an element is not an :class:`X509`.
    """

    def cleanup(s: Any) -> None:
        # Equivalent to sk_X509_pop_free, but we don't
        # currently have a CFFI binding for that available
        for i in range(_lib.sk_X509_num(s)):
            x = _lib.sk_X509_value(s, i)
            _lib.X509_free(x)
        _lib.sk_X509_free(s)

    if certificates is None or len(certificates) == 0:
        return _ffi.NULL

    stack = _lib.sk_X509_new_null()
    _openssl_assert(stack != _ffi.NULL)
    stack = _ffi.gc(stack, cleanup)

    for cert in certificates:
        if not isinstance(cert, X509):
            raise TypeError("One of the elements is not an X509 instance")

        # The stack gets its own reference; cleanup() drops it again.
        _openssl_assert(_lib.X509_up_ref(cert._x509) > 0)
        if _lib.sk_X509_push(stack, cert._x509) <= 0:
            # The push failed, so the stack never took over the extra
            # reference acquired above; release it before raising.
            _lib.X509_free(cert._x509)
            _raise_current_error()

    return stack

1956 

1957 @staticmethod 

1958 def _exception_from_context(store_ctx: Any) -> X509StoreContextError: 

1959 """ 

1960 Convert an OpenSSL native context error failure into a Python 

1961 exception. 

1962 

1963 When a call to native OpenSSL X509_verify_cert fails, additional 

1964 information about the failure can be obtained from the store context. 

1965 """ 

1966 message = _ffi.string( 

1967 _lib.X509_verify_cert_error_string( 

1968 _lib.X509_STORE_CTX_get_error(store_ctx) 

1969 ) 

1970 ).decode("utf-8") 

1971 errors = [ 

1972 _lib.X509_STORE_CTX_get_error(store_ctx), 

1973 _lib.X509_STORE_CTX_get_error_depth(store_ctx), 

1974 message, 

1975 ] 

1976 # A context error should always be associated with a certificate, so we 

1977 # expect this call to never return :class:`None`. 

1978 _x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx) 

1979 _cert = _lib.X509_dup(_x509) 

1980 pycert = X509._from_raw_x509_ptr(_cert) 

1981 return X509StoreContextError(message, errors, pycert) 

1982 

1983 def _verify_certificate(self) -> Any: 

1984 """ 

1985 Verifies the certificate and runs an X509_STORE_CTX containing the 

1986 results. 

1987 

1988 :raises X509StoreContextError: If an error occurred when validating a 

1989 certificate in the context. Sets ``certificate`` attribute to 

1990 indicate which certificate caused the error. 

1991 """ 

1992 store_ctx = _lib.X509_STORE_CTX_new() 

1993 _openssl_assert(store_ctx != _ffi.NULL) 

1994 store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free) 

1995 

1996 ret = _lib.X509_STORE_CTX_init( 

1997 store_ctx, self._store._store, self._cert._x509, self._chain 

1998 ) 

1999 _openssl_assert(ret == 1) 

2000 

2001 ret = _lib.X509_verify_cert(store_ctx) 

2002 if ret <= 0: 

2003 raise self._exception_from_context(store_ctx) 

2004 

2005 return store_ctx 

2006 

2007 def set_store(self, store: X509Store) -> None: 

2008 """ 

2009 Set the context's X.509 store. 

2010 

2011 .. versionadded:: 0.15 

2012 

2013 :param X509Store store: The store description which will be used for 

2014 the purposes of any *future* verifications. 

2015 """ 

2016 self._store = store 

2017 

2018 def verify_certificate(self) -> None: 

2019 """ 

2020 Verify a certificate in a context. 

2021 

2022 .. versionadded:: 0.15 

2023 

2024 :raises X509StoreContextError: If an error occurred when validating a 

2025 certificate in the context. Sets ``certificate`` attribute to 

2026 indicate which certificate caused the error. 

2027 """ 

2028 self._verify_certificate() 

2029 

2030 def get_verified_chain(self) -> list[X509]: 

2031 """ 

2032 Verify a certificate in a context and return the complete validated 

2033 chain. 

2034 

2035 :raises X509StoreContextError: If an error occurred when validating a 

2036 certificate in the context. Sets ``certificate`` attribute to 

2037 indicate which certificate caused the error. 

2038 

2039 .. versionadded:: 20.0 

2040 """ 

2041 store_ctx = self._verify_certificate() 

2042 

2043 # Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain. 

2044 cert_stack = _lib.X509_STORE_CTX_get1_chain(store_ctx) 

2045 _openssl_assert(cert_stack != _ffi.NULL) 

2046 

2047 result = [] 

2048 for i in range(_lib.sk_X509_num(cert_stack)): 

2049 cert = _lib.sk_X509_value(cert_stack, i) 

2050 _openssl_assert(cert != _ffi.NULL) 

2051 pycert = X509._from_raw_x509_ptr(cert) 

2052 result.append(pycert) 

2053 

2054 # Free the stack but not the members which are freed by the X509 class. 

2055 _lib.sk_X509_free(cert_stack) 

2056 return result 

2057 

2058 

def load_certificate(type: int, buffer: bytes) -> X509:
    """
    Parse an encoded certificate from *buffer* into an :class:`X509`.

    :param type: How *buffer* is encoded (FILETYPE_PEM or FILETYPE_ASN1)

    :param bytes buffer: The encoded certificate. A ``str`` is accepted as
        well and is encoded as ASCII before parsing.

    :return: The X509 object
    :raises ValueError: If *type* is neither FILETYPE_PEM nor FILETYPE_ASN1.
    """
    data = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    source = _new_mem_buf(data)

    if type == FILETYPE_PEM:
        cert_ptr = _lib.PEM_read_bio_X509(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    elif type == FILETYPE_ASN1:
        cert_ptr = _lib.d2i_X509_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means OpenSSL could not parse the data.
    if cert_ptr == _ffi.NULL:
        _raise_current_error()

    return X509._from_raw_x509_ptr(cert_ptr)

2086 

2087 

def dump_certificate(type: int, cert: X509) -> bytes:
    """
    Serialize the certificate *cert* in the format selected by *type*.

    :param type: The output format (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param cert: The certificate to serialize
    :return: The serialized certificate as bytes
    :raises ValueError: If *type* is not one of the supported formats.
    """
    out = _new_mem_buf()

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_X509(out, cert._x509)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_X509_bio(out, cert._x509)
    elif type == FILETYPE_TEXT:
        status = _lib.X509_print_ex(out, cert._x509, 0, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    # Each writer above is expected to report success as exactly 1.
    _openssl_assert(status == 1)
    return _bio_to_string(out)

2114 

2115 

def dump_publickey(type: int, pkey: PKey) -> bytes:
    """
    Serialize the public part of *pkey* to bytes.

    :param type: The output format (one of :data:`FILETYPE_PEM` or
        :data:`FILETYPE_ASN1`).
    :param PKey pkey: The key whose public part is written
    :return: The serialized public key.
    :rtype: bytes
    :raises ValueError: If *type* is neither format listed above.
    """
    out = _new_mem_buf()

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_PUBKEY(out, pkey._pkey)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_PUBKEY_bio(out, pkey._pkey)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if status != 1:  # pragma: no cover
        _raise_current_error()

    return _bio_to_string(out)

2139 

2140 

def dump_privatekey(
    type: int,
    pkey: PKey,
    cipher: str | None = None,
    passphrase: PassphraseCallableT | None = None,
) -> bytes:
    """
    Serialize the private key *pkey* in the format selected by *type*,
    optionally encrypting it (only for :const:`FILETYPE_PEM`) with *cipher*
    and *passphrase*.

    :param type: The output format (one of :const:`FILETYPE_PEM`,
        :const:`FILETYPE_ASN1`, or :const:`FILETYPE_TEXT`)
    :param PKey pkey: The PKey to serialize
    :param cipher: (optional) name of the cipher for encrypted PEM output;
        requires *passphrase*
    :param passphrase: (optional) for encrypted PEM output, either the
        passphrase itself or a callback that provides it.

    :return: The serialized key
    :rtype: bytes
    :raises TypeError: If *pkey* is not a :class:`PKey`, if *cipher* is given
        without *passphrase*, or if FILETYPE_TEXT is requested for a non-RSA
        key.
    :raises ValueError: If *cipher* is not a known cipher name or *type* is
        not a supported format.
    """
    out = _new_mem_buf()

    if not isinstance(pkey, PKey):
        raise TypeError("pkey must be a PKey")

    if cipher is None:
        cipher_obj = _ffi.NULL
    else:
        if passphrase is None:
            raise TypeError(
                "if a value is given for cipher "
                "one must also be given for passphrase"
            )
        cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher))
        if cipher_obj == _ffi.NULL:
            raise ValueError("Invalid cipher name")

    # Constructed before dispatching on *type*: the helper itself rejects a
    # passphrase combined with a non-PEM format.
    helper = _PassphraseHelper(type, passphrase)

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_PrivateKey(
            out,
            pkey._pkey,
            cipher_obj,
            _ffi.NULL,
            0,
            helper.callback,
            helper.callback_args,
        )
        # Surface any exception captured inside the passphrase callback.
        helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_PrivateKey_bio(out, pkey._pkey)
    elif type == FILETYPE_TEXT:
        if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA:
            raise TypeError("Only RSA keys are supported for FILETYPE_TEXT")

        rsa_key = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free)
        status = _lib.RSA_print(out, rsa_key, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    _openssl_assert(status != 0)

    return _bio_to_string(out)

2208 

2209 

class _PassphraseHelper:
    """
    Adapt a passphrase, given either as bytes or as a callable, to the
    ``pem_password_cb`` callback passed to OpenSSL's PEM routines.

    Exceptions raised while producing the passphrase are collected rather
    than propagated out of the callback; call :meth:`raise_if_problem`
    afterwards to re-raise the first one.
    """

    def __init__(
        self,
        type: int,
        passphrase: PassphraseCallableT | None,
        more_args: bool = False,
        truncate: bool = False,
    ) -> None:
        """
        :param type: The file type the passphrase will be used with; a
            passphrase is only accepted together with FILETYPE_PEM.
        :param passphrase: The passphrase bytes, a callable returning them,
            or ``None``.
        :param more_args: If true, a callable passphrase is invoked with
            ``(size, rwflag, userdata)`` instead of just ``(rwflag)``.
        :param truncate: If true, an over-long passphrase is cut down to the
            buffer size instead of being rejected.
        :raises ValueError: If a passphrase is given for a non-PEM type.
        """
        if passphrase is not None and type != FILETYPE_PEM:
            raise ValueError(
                "only FILETYPE_PEM key format supports encryption"
            )
        self._passphrase = passphrase
        self._more_args = more_args
        self._truncate = truncate
        # Exceptions captured inside the callback, oldest first.
        self._problems: list[Exception] = []

    @property
    def callback(self) -> Any:
        """
        The callback to hand to OpenSSL: ``_ffi.NULL`` when there is no
        passphrase, otherwise a new ``pem_password_cb`` cffi callback.

        :raises TypeError: If the passphrase is neither bytes nor callable.
        """
        secret = self._passphrase
        if secret is None:
            return _ffi.NULL
        if not (isinstance(secret, bytes) or callable(secret)):
            raise TypeError(
                "Last argument must be a byte string or a callable."
            )
        return _ffi.callback("pem_password_cb", self._read_passphrase)

    @property
    def callback_args(self) -> Any:
        """
        The user-data argument accompanying :attr:`callback`; always
        ``_ffi.NULL`` for a valid passphrase.

        :raises TypeError: If the passphrase is neither bytes nor callable.
        """
        secret = self._passphrase
        if secret is not None and not (
            isinstance(secret, bytes) or callable(secret)
        ):
            raise TypeError(
                "Last argument must be a byte string or a callable."
            )
        return _ffi.NULL

    def raise_if_problem(self, exceptionType: type[Exception] = Error) -> None:
        """
        Re-raise the oldest exception captured by the callback, if any.

        :param exceptionType: The exception type used while draining the
            OpenSSL error queue before re-raising.
        """
        if not self._problems:
            return

        # Flush the OpenSSL error queue
        try:
            _exception_from_error_queue(exceptionType)
        except exceptionType:
            pass

        raise self._problems.pop(0)

    def _read_passphrase(
        self, buf: Any, size: int, rwflag: Any, userdata: Any
    ) -> int:
        """
        Body of the ``pem_password_cb`` callback: copy the passphrase into
        *buf* and return its length.

        Any exception is recorded in ``self._problems`` and 0 is returned
        instead of letting it escape the callback.
        """
        try:
            secret = self._passphrase
            if callable(secret):
                if self._more_args:
                    secret = secret(size, rwflag, userdata)
                else:
                    secret = secret(rwflag)
            else:
                assert secret is not None

            if not isinstance(secret, bytes):
                raise ValueError("Bytes expected")

            if len(secret) > size:
                if not self._truncate:
                    raise ValueError(
                        "passphrase returned by callback is too long"
                    )
                secret = secret[:size]

            length = len(secret)
            # Copy one byte at a time; each item is assigned as a length-1
            # bytes object.
            for index in range(length):
                buf[index] = secret[index : index + 1]
            return length
        except Exception as exc:
            self._problems.append(exc)
            return 0

2286 

2287 

def load_publickey(type: int, buffer: str | bytes) -> PKey:
    """
    Parse an encoded public key from *buffer*.

    :param type: How *buffer* is encoded (one of :data:`FILETYPE_PEM`,
        :data:`FILETYPE_ASN1`).
    :param buffer: The encoded key; a ``str`` is encoded as ASCII first.
    :type buffer: A Python string object, either unicode or bytestring.
    :return: The PKey object, marked as holding only a public key.
    :rtype: :class:`PKey`
    :raises ValueError: If *type* is neither format listed above.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    source = _new_mem_buf(raw)

    if type == FILETYPE_PEM:
        key_ptr = _lib.PEM_read_bio_PUBKEY(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    elif type == FILETYPE_ASN1:
        key_ptr = _lib.d2i_PUBKEY_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if key_ptr == _ffi.NULL:
        _raise_current_error()

    # Bypass PKey.__init__ and wrap the already-parsed native key directly.
    loaded = PKey.__new__(PKey)
    loaded._pkey = _ffi.gc(key_ptr, _lib.EVP_PKEY_free)
    loaded._only_public = True
    return loaded

2320 

2321 

def load_privatekey(
    type: int,
    buffer: str | bytes,
    passphrase: PassphraseCallableT | None = None,
) -> PKey:
    """
    Parse an encoded private key from *buffer* into a :class:`PKey`.

    :param type: How *buffer* is encoded (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The encoded key; a ``str`` is encoded as ASCII first
    :param passphrase: (optional) for encrypted PEM input, either the
                       passphrase itself or a callback that provides it.

    :return: The PKey object
    :raises ValueError: If *type* is neither FILETYPE_PEM nor FILETYPE_ASN1,
        or if a passphrase is given for a non-PEM *type*.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    source = _new_mem_buf(raw)

    helper = _PassphraseHelper(type, passphrase)

    if type == FILETYPE_PEM:
        key_ptr = _lib.PEM_read_bio_PrivateKey(
            source, _ffi.NULL, helper.callback, helper.callback_args
        )
        # Surface any exception captured inside the passphrase callback.
        helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        key_ptr = _lib.d2i_PrivateKey_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if key_ptr == _ffi.NULL:
        _raise_current_error()

    # Bypass PKey.__init__ and wrap the already-parsed native key directly.
    loaded = PKey.__new__(PKey)
    loaded._pkey = _ffi.gc(key_ptr, _lib.EVP_PKEY_free)
    return loaded

2361 

2362 

def dump_certificate_request(type: int, req: X509Req) -> bytes:
    """
    Serialize the certificate request *req* in the format selected by
    *type*.

    :param type: The output format (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param req: The certificate request to serialize
    :return: The serialized certificate request as bytes
    :raises ValueError: If *type* is not one of the supported formats.


    .. deprecated:: 24.2.0
       Use `cryptography.x509.CertificateSigningRequest` instead.
    """
    out = _new_mem_buf()

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_X509_REQ(out, req._req)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_X509_REQ_bio(out, req._req)
    elif type == FILETYPE_TEXT:
        status = _lib.X509_REQ_print_ex(out, req._req, 0, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    # Any non-zero status from the writer is treated as success.
    _openssl_assert(status != 0)

    return _bio_to_string(out)

2393 

2394 

# Module-private alias bound to the plain function object before the public
# name is registered as deprecated below. Presumably this lets code inside
# the package call it without triggering the deprecation warning -- confirm
# against the call sites.
_dump_certificate_request_internal = dump_certificate_request

# Register the public name ``dump_certificate_request`` of this module as
# deprecated, supplying the warning message and warning class to use. The
# return value is not used here. NOTE(review): this relies on
# ``cryptography.utils.deprecated`` having a side effect on this module when
# ``name`` is given -- confirm against cryptography's implementation.
utils.deprecated(
    dump_certificate_request,
    __name__,
    (
        "CSR support in pyOpenSSL is deprecated. You should use the APIs "
        "in cryptography."
    ),
    DeprecationWarning,
    name="dump_certificate_request",
)

2407 

2408 

def load_certificate_request(type: int, buffer: bytes) -> X509Req:
    """
    Parse an encoded certificate request from *buffer* into an
    :class:`X509Req`.

    :param type: How *buffer* is encoded (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The encoded certificate request. A ``str`` is accepted as
        well and is encoded as ASCII before parsing.
    :return: The X509Req object
    :raises ValueError: If *type* is neither FILETYPE_PEM nor FILETYPE_ASN1.

    .. deprecated:: 24.2.0
       Use `cryptography.x509.load_der_x509_csr` or
       `cryptography.x509.load_pem_x509_csr` instead.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    source = _new_mem_buf(raw)

    if type == FILETYPE_PEM:
        req_ptr = _lib.PEM_read_bio_X509_REQ(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    elif type == FILETYPE_ASN1:
        req_ptr = _lib.d2i_X509_REQ_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means OpenSSL could not parse the data.
    _openssl_assert(req_ptr != _ffi.NULL)

    # Bypass X509Req.__init__ and wrap the already-parsed native request.
    loaded = X509Req.__new__(X509Req)
    loaded._req = _ffi.gc(req_ptr, _lib.X509_REQ_free)
    return loaded

2439 

2440 

# Module-private alias bound to the plain function object before the public
# name is registered as deprecated below. Presumably this lets code inside
# the package call it without triggering the deprecation warning -- confirm
# against the call sites.
_load_certificate_request_internal = load_certificate_request

# Register the public name ``load_certificate_request`` of this module as
# deprecated, supplying the warning message and warning class to use. The
# return value is not used here. NOTE(review): this relies on
# ``cryptography.utils.deprecated`` having a side effect on this module when
# ``name`` is given -- confirm against cryptography's implementation.
utils.deprecated(
    load_certificate_request,
    __name__,
    (
        "CSR support in pyOpenSSL is deprecated. You should use the APIs "
        "in cryptography."
    ),
    DeprecationWarning,
    name="load_certificate_request",
)