Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/crypto.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import calendar
4import datetime
5import functools
6import sys
7import typing
8from base64 import b16encode
9from collections.abc import Sequence
10from functools import partial
11from typing import (
12 Any,
13 Callable,
14 Union,
15)
17if sys.version_info >= (3, 13):
18 from warnings import deprecated
19elif sys.version_info < (3, 8):
20 _T = typing.TypeVar("T")
22 def deprecated(msg: str, **kwargs: object) -> Callable[[_T], _T]:
23 return lambda f: f
24else:
25 from typing_extensions import deprecated
27from cryptography import utils, x509
28from cryptography.hazmat.primitives.asymmetric import (
29 dsa,
30 ec,
31 ed448,
32 ed25519,
33 rsa,
34)
36from OpenSSL._util import StrOrBytesPath
37from OpenSSL._util import (
38 byte_string as _byte_string,
39)
40from OpenSSL._util import (
41 exception_from_error_queue as _exception_from_error_queue,
42)
43from OpenSSL._util import (
44 ffi as _ffi,
45)
46from OpenSSL._util import (
47 lib as _lib,
48)
49from OpenSSL._util import (
50 make_assert as _make_assert,
51)
52from OpenSSL._util import (
53 path_bytes as _path_bytes,
54)
56__all__ = [
57 "FILETYPE_ASN1",
58 "FILETYPE_PEM",
59 "FILETYPE_TEXT",
60 "TYPE_DSA",
61 "TYPE_RSA",
62 "X509",
63 "Error",
64 "PKey",
65 "X509Name",
66 "X509Req",
67 "X509Store",
68 "X509StoreContext",
69 "X509StoreContextError",
70 "X509StoreFlags",
71 "dump_certificate",
72 "dump_certificate_request",
73 "dump_privatekey",
74 "dump_publickey",
75 "get_elliptic_curve",
76 "get_elliptic_curves",
77 "load_certificate",
78 "load_certificate_request",
79 "load_privatekey",
80 "load_publickey",
81]
84_PrivateKey = Union[
85 dsa.DSAPrivateKey,
86 ec.EllipticCurvePrivateKey,
87 ed25519.Ed25519PrivateKey,
88 ed448.Ed448PrivateKey,
89 rsa.RSAPrivateKey,
90]
91_PublicKey = Union[
92 dsa.DSAPublicKey,
93 ec.EllipticCurvePublicKey,
94 ed25519.Ed25519PublicKey,
95 ed448.Ed448PublicKey,
96 rsa.RSAPublicKey,
97]
98_Key = Union[_PrivateKey, _PublicKey]
99PassphraseCallableT = Union[bytes, Callable[..., bytes]]
102FILETYPE_PEM: int = _lib.SSL_FILETYPE_PEM
103FILETYPE_ASN1: int = _lib.SSL_FILETYPE_ASN1
105# TODO This was an API mistake. OpenSSL has no such constant.
106FILETYPE_TEXT = 2**16 - 1
108TYPE_RSA: int = _lib.EVP_PKEY_RSA
109TYPE_DSA: int = _lib.EVP_PKEY_DSA
110TYPE_DH: int = _lib.EVP_PKEY_DH
111TYPE_EC: int = _lib.EVP_PKEY_EC
114class Error(Exception):
115 """
116 An error occurred in an `OpenSSL.crypto` API.
117 """
120_raise_current_error = partial(_exception_from_error_queue, Error)
121_openssl_assert = _make_assert(Error)
124def _new_mem_buf(buffer: bytes | None = None) -> Any:
125 """
126 Allocate a new OpenSSL memory BIO.
128 Arrange for the garbage collector to clean it up automatically.
130 :param buffer: None or some bytes to use to put into the BIO so that they
131 can be read out.
132 """
133 if buffer is None:
134 bio = _lib.BIO_new(_lib.BIO_s_mem())
135 free = _lib.BIO_free
136 else:
137 data = _ffi.new("char[]", buffer)
138 bio = _lib.BIO_new_mem_buf(data, len(buffer))
140 # Keep the memory alive as long as the bio is alive!
141 def free(bio: Any, ref: Any = data) -> Any:
142 return _lib.BIO_free(bio)
144 _openssl_assert(bio != _ffi.NULL)
146 bio = _ffi.gc(bio, free)
147 return bio
150def _bio_to_string(bio: Any) -> bytes:
151 """
152 Copy the contents of an OpenSSL BIO object into a Python byte string.
153 """
154 result_buffer = _ffi.new("char**")
155 buffer_length = _lib.BIO_get_mem_data(bio, result_buffer)
156 return _ffi.buffer(result_buffer[0], buffer_length)[:]
159def _set_asn1_time(boundary: Any, when: bytes) -> None:
160 """
161 The the time value of an ASN1 time object.
163 @param boundary: An ASN1_TIME pointer (or an object safely
164 castable to that type) which will have its value set.
165 @param when: A string representation of the desired time value.
167 @raise TypeError: If C{when} is not a L{bytes} string.
168 @raise ValueError: If C{when} does not represent a time in the required
169 format.
170 @raise RuntimeError: If the time value cannot be set for some other
171 (unspecified) reason.
172 """
173 if not isinstance(when, bytes):
174 raise TypeError("when must be a byte string")
175 # ASN1_TIME_set_string validates the string without writing anything
176 # when the destination is NULL.
177 _openssl_assert(boundary != _ffi.NULL)
179 set_result = _lib.ASN1_TIME_set_string(boundary, when)
180 if set_result == 0:
181 raise ValueError("Invalid string")
184def _new_asn1_time(when: bytes) -> Any:
185 """
186 Behaves like _set_asn1_time but returns a new ASN1_TIME object.
188 @param when: A string representation of the desired time value.
190 @raise TypeError: If C{when} is not a L{bytes} string.
191 @raise ValueError: If C{when} does not represent a time in the required
192 format.
193 @raise RuntimeError: If the time value cannot be set for some other
194 (unspecified) reason.
195 """
196 ret = _lib.ASN1_TIME_new()
197 _openssl_assert(ret != _ffi.NULL)
198 ret = _ffi.gc(ret, _lib.ASN1_TIME_free)
199 _set_asn1_time(ret, when)
200 return ret
203def _get_asn1_time(timestamp: Any) -> bytes | None:
204 """
205 Retrieve the time value of an ASN1 time object.
207 @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to
208 that type) from which the time value will be retrieved.
210 @return: The time value from C{timestamp} as a L{bytes} string in a certain
211 format. Or C{None} if the object contains no time value.
212 """
213 string_timestamp = _ffi.cast("ASN1_STRING*", timestamp)
214 if _lib.ASN1_STRING_length(string_timestamp) == 0:
215 return None
216 elif (
217 _lib.ASN1_STRING_type(string_timestamp) == _lib.V_ASN1_GENERALIZEDTIME
218 ):
219 return _ffi.string(_lib.ASN1_STRING_get0_data(string_timestamp))
220 else:
221 generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**")
222 _lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp)
223 _openssl_assert(generalized_timestamp[0] != _ffi.NULL)
225 string_timestamp = _ffi.cast("ASN1_STRING*", generalized_timestamp[0])
226 string_data = _lib.ASN1_STRING_get0_data(string_timestamp)
227 string_result = _ffi.string(string_data)
228 _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])
229 return string_result
232class _X509NameInvalidator:
233 def __init__(self) -> None:
234 self._names: list[X509Name] = []
236 def add(self, name: X509Name) -> None:
237 self._names.append(name)
239 def clear(self) -> None:
240 for name in self._names:
241 # Breaks the object, but also prevents UAF!
242 del name._name
245class PKey:
246 """
247 A class representing an DSA or RSA public key or key pair.
248 """
250 _only_public = False
251 _initialized = True
253 def __init__(self) -> None:
254 pkey = _lib.EVP_PKEY_new()
255 self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
256 self._initialized = False
258 def to_cryptography_key(self) -> _Key:
259 """
260 Export as a ``cryptography`` key.
262 :rtype: One of ``cryptography``'s `key interfaces`_.
264 .. _key interfaces: https://cryptography.io/en/latest/hazmat/\
265 primitives/asymmetric/rsa/#key-interfaces
267 .. versionadded:: 16.1.0
268 """
269 from cryptography.hazmat.primitives.serialization import (
270 load_der_private_key,
271 load_der_public_key,
272 )
274 if self._only_public:
275 der = dump_publickey(FILETYPE_ASN1, self)
276 return typing.cast(_Key, load_der_public_key(der))
277 else:
278 der = dump_privatekey(FILETYPE_ASN1, self)
279 return typing.cast(_Key, load_der_private_key(der, password=None))
281 @classmethod
282 def from_cryptography_key(cls, crypto_key: _Key) -> PKey:
283 """
284 Construct based on a ``cryptography`` *crypto_key*.
286 :param crypto_key: A ``cryptography`` key.
287 :type crypto_key: One of ``cryptography``'s `key interfaces`_.
289 :rtype: PKey
291 .. versionadded:: 16.1.0
292 """
293 if not isinstance(
294 crypto_key,
295 (
296 dsa.DSAPrivateKey,
297 dsa.DSAPublicKey,
298 ec.EllipticCurvePrivateKey,
299 ec.EllipticCurvePublicKey,
300 ed25519.Ed25519PrivateKey,
301 ed25519.Ed25519PublicKey,
302 ed448.Ed448PrivateKey,
303 ed448.Ed448PublicKey,
304 rsa.RSAPrivateKey,
305 rsa.RSAPublicKey,
306 ),
307 ):
308 raise TypeError("Unsupported key type")
310 from cryptography.hazmat.primitives.serialization import (
311 Encoding,
312 NoEncryption,
313 PrivateFormat,
314 PublicFormat,
315 )
317 if isinstance(
318 crypto_key,
319 (
320 dsa.DSAPublicKey,
321 ec.EllipticCurvePublicKey,
322 ed25519.Ed25519PublicKey,
323 ed448.Ed448PublicKey,
324 rsa.RSAPublicKey,
325 ),
326 ):
327 return load_publickey(
328 FILETYPE_ASN1,
329 crypto_key.public_bytes(
330 Encoding.DER, PublicFormat.SubjectPublicKeyInfo
331 ),
332 )
333 else:
334 der = crypto_key.private_bytes(
335 Encoding.DER, PrivateFormat.PKCS8, NoEncryption()
336 )
337 return load_privatekey(FILETYPE_ASN1, der)
339 def generate_key(self, type: int, bits: int) -> None:
340 """
341 Generate a key pair of the given type, with the given number of bits.
343 This generates a key "into" the this object.
345 :param type: The key type.
346 :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA`
347 :param bits: The number of bits.
348 :type bits: :py:data:`int` ``>= 0``
349 :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't
350 of the appropriate type.
351 :raises ValueError: If the number of bits isn't an integer of
352 the appropriate size.
353 :return: ``None``
354 """
355 if not isinstance(type, int):
356 raise TypeError("type must be an integer")
358 if not isinstance(bits, int):
359 raise TypeError("bits must be an integer")
361 if type == TYPE_RSA:
362 if bits <= 0:
363 raise ValueError("Invalid number of bits")
365 # TODO Check error return
366 exponent = _lib.BN_new()
367 exponent = _ffi.gc(exponent, _lib.BN_free)
368 _lib.BN_set_word(exponent, _lib.RSA_F4)
370 rsa = _lib.RSA_new()
372 result = _lib.RSA_generate_key_ex(rsa, bits, exponent, _ffi.NULL)
373 _openssl_assert(result == 1)
375 result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa)
376 _openssl_assert(result == 1)
378 elif type == TYPE_DSA:
379 dsa = _lib.DSA_new()
380 _openssl_assert(dsa != _ffi.NULL)
382 dsa = _ffi.gc(dsa, _lib.DSA_free)
383 res = _lib.DSA_generate_parameters_ex(
384 dsa, bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL
385 )
386 _openssl_assert(res == 1)
388 _openssl_assert(_lib.DSA_generate_key(dsa) == 1)
389 _openssl_assert(_lib.EVP_PKEY_set1_DSA(self._pkey, dsa) == 1)
390 else:
391 raise Error("No such key type")
393 self._initialized = True
395 def check(self) -> bool:
396 """
397 Check the consistency of an RSA private key.
399 This is the Python equivalent of OpenSSL's ``RSA_check_key``.
401 :return: ``True`` if key is consistent.
403 :raise OpenSSL.crypto.Error: if the key is inconsistent.
405 :raise TypeError: if the key is of a type which cannot be checked.
406 Only RSA keys can currently be checked.
407 """
408 if self._only_public:
409 raise TypeError("public key only")
411 if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA:
412 raise TypeError("Only RSA keys can currently be checked.")
414 rsa = _lib.EVP_PKEY_get1_RSA(self._pkey)
415 rsa = _ffi.gc(rsa, _lib.RSA_free)
416 result = _lib.RSA_check_key(rsa)
417 if result == 1:
418 return True
419 _raise_current_error()
421 def type(self) -> int:
422 """
423 Returns the type of the key
425 :return: The type of the key.
426 """
427 return _lib.EVP_PKEY_id(self._pkey)
429 def bits(self) -> int:
430 """
431 Returns the number of bits of the key
433 :return: The number of bits of the key.
434 """
435 return _lib.EVP_PKEY_bits(self._pkey)
438class _EllipticCurve:
439 """
440 A representation of a supported elliptic curve.
442 @cvar _curves: :py:obj:`None` until an attempt is made to load the curves.
443 Thereafter, a :py:type:`set` containing :py:type:`_EllipticCurve`
444 instances each of which represents one curve supported by the system.
445 @type _curves: :py:type:`NoneType` or :py:type:`set`
446 """
448 _curves = None
450 def __ne__(self, other: Any) -> bool:
451 """
452 Implement cooperation with the right-hand side argument of ``!=``.
454 Python 3 seems to have dropped this cooperation in this very narrow
455 circumstance.
456 """
457 if isinstance(other, _EllipticCurve):
458 return super().__ne__(other)
459 return NotImplemented
461 @classmethod
462 def _load_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]:
463 """
464 Get the curves supported by OpenSSL.
466 :param lib: The OpenSSL library binding object.
468 :return: A :py:type:`set` of ``cls`` instances giving the names of the
469 elliptic curves the underlying library supports.
470 """
471 num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0)
472 builtin_curves = _ffi.new("EC_builtin_curve[]", num_curves)
473 # The return value on this call should be num_curves again. We
474 # could check it to make sure but if it *isn't* then.. what could
475 # we do? Abort the whole process, I suppose...? -exarkun
476 lib.EC_get_builtin_curves(builtin_curves, num_curves)
477 return set(cls.from_nid(lib, c.nid) for c in builtin_curves)
479 @classmethod
480 def _get_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]:
481 """
482 Get, cache, and return the curves supported by OpenSSL.
484 :param lib: The OpenSSL library binding object.
486 :return: A :py:type:`set` of ``cls`` instances giving the names of the
487 elliptic curves the underlying library supports.
488 """
489 if cls._curves is None:
490 cls._curves = cls._load_elliptic_curves(lib)
491 return cls._curves
493 @classmethod
494 def from_nid(cls, lib: Any, nid: int) -> _EllipticCurve:
495 """
496 Instantiate a new :py:class:`_EllipticCurve` associated with the given
497 OpenSSL NID.
499 :param lib: The OpenSSL library binding object.
501 :param nid: The OpenSSL NID the resulting curve object will represent.
502 This must be a curve NID (and not, for example, a hash NID) or
503 subsequent operations will fail in unpredictable ways.
504 :type nid: :py:class:`int`
506 :return: The curve object.
507 """
508 return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii"))
510 def __init__(self, lib: Any, nid: int, name: str) -> None:
511 """
512 :param _lib: The :py:mod:`cryptography` binding instance used to
513 interface with OpenSSL.
515 :param _nid: The OpenSSL NID identifying the curve this object
516 represents.
517 :type _nid: :py:class:`int`
519 :param name: The OpenSSL short name identifying the curve this object
520 represents.
521 :type name: :py:class:`unicode`
522 """
523 self._lib = lib
524 self._nid = nid
525 self.name = name
527 def __repr__(self) -> str:
528 return f"<Curve {self.name!r}>"
530 def _to_EC_KEY(self) -> Any:
531 """
532 Create a new OpenSSL EC_KEY structure initialized to use this curve.
534 The structure is automatically garbage collected when the Python object
535 is garbage collected.
536 """
537 key = self._lib.EC_KEY_new_by_curve_name(self._nid)
538 return _ffi.gc(key, _lib.EC_KEY_free)
541@deprecated(
542 "get_elliptic_curves is deprecated. You should use the APIs in "
543 "cryptography instead."
544)
545def get_elliptic_curves() -> set[_EllipticCurve]:
546 """
547 Return a set of objects representing the elliptic curves supported in the
548 OpenSSL build in use.
550 The curve objects have a :py:class:`unicode` ``name`` attribute by which
551 they identify themselves.
553 The curve objects are useful as values for the argument accepted by
554 :py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be
555 used for ECDHE key exchange.
556 """
557 return _EllipticCurve._get_elliptic_curves(_lib)
560@deprecated(
561 "get_elliptic_curve is deprecated. You should use the APIs in "
562 "cryptography instead."
563)
564def get_elliptic_curve(name: str) -> _EllipticCurve:
565 """
566 Return a single curve object selected by name.
568 See :py:func:`get_elliptic_curves` for information about curve objects.
570 :param name: The OpenSSL short name identifying the curve object to
571 retrieve.
572 :type name: :py:class:`unicode`
574 If the named curve is not supported then :py:class:`ValueError` is raised.
575 """
576 for curve in get_elliptic_curves():
577 if curve.name == name:
578 return curve
579 raise ValueError("unknown curve name", name)
582@functools.total_ordering
583class X509Name:
584 """
585 An X.509 Distinguished Name.
587 :ivar countryName: The country of the entity.
588 :ivar C: Alias for :py:attr:`countryName`.
590 :ivar stateOrProvinceName: The state or province of the entity.
591 :ivar ST: Alias for :py:attr:`stateOrProvinceName`.
593 :ivar localityName: The locality of the entity.
594 :ivar L: Alias for :py:attr:`localityName`.
596 :ivar organizationName: The organization name of the entity.
597 :ivar O: Alias for :py:attr:`organizationName`.
599 :ivar organizationalUnitName: The organizational unit of the entity.
600 :ivar OU: Alias for :py:attr:`organizationalUnitName`
602 :ivar commonName: The common name of the entity.
603 :ivar CN: Alias for :py:attr:`commonName`.
605 :ivar emailAddress: The e-mail address of the entity.
606 """
608 def __init__(self, name: X509Name) -> None:
609 """
610 Create a new X509Name, copying the given X509Name instance.
612 :param name: The name to copy.
613 :type name: :py:class:`X509Name`
614 """
615 name = _lib.X509_NAME_dup(name._name)
616 self._name: Any = _ffi.gc(name, _lib.X509_NAME_free)
618 def __setattr__(self, name: str, value: Any) -> None:
619 if name.startswith("_"):
620 return super().__setattr__(name, value)
622 # Note: we really do not want str subclasses here, so we do not use
623 # isinstance.
624 if type(name) is not str:
625 raise TypeError(
626 f"attribute name must be string, not "
627 f"'{type(value).__name__:.200}'"
628 )
630 nid = _lib.OBJ_txt2nid(_byte_string(name))
631 if nid == _lib.NID_undef:
632 try:
633 _raise_current_error()
634 except Error:
635 pass
636 raise AttributeError("No such attribute")
638 # If there's an old entry for this NID, remove it
639 for i in range(_lib.X509_NAME_entry_count(self._name)):
640 ent = _lib.X509_NAME_get_entry(self._name, i)
641 ent_obj = _lib.X509_NAME_ENTRY_get_object(ent)
642 ent_nid = _lib.OBJ_obj2nid(ent_obj)
643 if nid == ent_nid:
644 ent = _lib.X509_NAME_delete_entry(self._name, i)
645 _lib.X509_NAME_ENTRY_free(ent)
646 break
648 if isinstance(value, str):
649 value = value.encode("utf-8")
651 add_result = _lib.X509_NAME_add_entry_by_NID(
652 self._name, nid, _lib.MBSTRING_UTF8, value, len(value), -1, 0
653 )
654 if not add_result:
655 _raise_current_error()
657 def __getattr__(self, name: str) -> str | None:
658 """
659 Find attribute. An X509Name object has the following attributes:
660 countryName (alias C), stateOrProvince (alias ST), locality (alias L),
661 organization (alias O), organizationalUnit (alias OU), commonName
662 (alias CN) and more...
663 """
664 nid = _lib.OBJ_txt2nid(_byte_string(name))
665 if nid == _lib.NID_undef:
666 # This is a bit weird. OBJ_txt2nid indicated failure, but it seems
667 # a lower level function, a2d_ASN1_OBJECT, also feels the need to
668 # push something onto the error queue. If we don't clean that up
669 # now, someone else will bump into it later and be quite confused.
670 # See lp#314814.
671 try:
672 _raise_current_error()
673 except Error:
674 pass
675 raise AttributeError("No such attribute")
677 entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1)
678 if entry_index == -1:
679 return None
681 entry = _lib.X509_NAME_get_entry(self._name, entry_index)
682 data = _lib.X509_NAME_ENTRY_get_data(entry)
684 result_buffer = _ffi.new("unsigned char**")
685 data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data)
686 _openssl_assert(data_length >= 0)
688 try:
689 result = _ffi.buffer(result_buffer[0], data_length)[:].decode(
690 "utf-8"
691 )
692 finally:
693 # XXX untested
694 _lib.OPENSSL_free(result_buffer[0])
695 return result
697 def __eq__(self, other: Any) -> bool:
698 if not isinstance(other, X509Name):
699 return NotImplemented
701 return _lib.X509_NAME_cmp(self._name, other._name) == 0
703 def __lt__(self, other: Any) -> bool:
704 if not isinstance(other, X509Name):
705 return NotImplemented
707 return _lib.X509_NAME_cmp(self._name, other._name) < 0
709 def __repr__(self) -> str:
710 """
711 String representation of an X509Name
712 """
713 result_buffer = _ffi.new("char[]", 512)
714 format_result = _lib.X509_NAME_oneline(
715 self._name, result_buffer, len(result_buffer)
716 )
717 _openssl_assert(format_result != _ffi.NULL)
719 return "<X509Name object '{}'>".format(
720 _ffi.string(result_buffer).decode("utf-8"),
721 )
723 def hash(self) -> int:
724 """
725 Return an integer representation of the first four bytes of the
726 MD5 digest of the DER representation of the name.
728 This is the Python equivalent of OpenSSL's ``X509_NAME_hash``.
730 :return: The (integer) hash of this name.
731 :rtype: :py:class:`int`
732 """
733 return _lib.X509_NAME_hash(self._name)
735 def der(self) -> bytes:
736 """
737 Return the DER encoding of this name.
739 :return: The DER encoded form of this name.
740 :rtype: :py:class:`bytes`
741 """
742 result_buffer = _ffi.new("unsigned char**")
743 encode_result = _lib.i2d_X509_NAME(self._name, result_buffer)
744 _openssl_assert(encode_result >= 0)
746 string_result = _ffi.buffer(result_buffer[0], encode_result)[:]
747 _lib.OPENSSL_free(result_buffer[0])
748 return string_result
750 def get_components(self) -> list[tuple[bytes, bytes]]:
751 """
752 Returns the components of this name, as a sequence of 2-tuples.
754 :return: The components of this name.
755 :rtype: :py:class:`list` of ``name, value`` tuples.
756 """
757 result = []
758 for i in range(_lib.X509_NAME_entry_count(self._name)):
759 ent = _lib.X509_NAME_get_entry(self._name, i)
761 fname = _lib.X509_NAME_ENTRY_get_object(ent)
762 fval = _lib.X509_NAME_ENTRY_get_data(ent)
764 nid = _lib.OBJ_obj2nid(fname)
765 name = _lib.OBJ_nid2sn(nid)
767 # ffi.string does not handle strings containing NULL bytes
768 # (which may have been generated by old, broken software)
769 value = _ffi.buffer(
770 _lib.ASN1_STRING_get0_data(fval), _lib.ASN1_STRING_length(fval)
771 )[:]
772 result.append((_ffi.string(name), value))
774 return result
777@deprecated(
778 "CSR support in pyOpenSSL is deprecated. You should use the APIs "
779 "in cryptography."
780)
781class X509Req:
782 """
783 An X.509 certificate signing requests.
785 .. deprecated:: 24.2.0
786 Use `cryptography.x509.CertificateSigningRequest` instead.
787 """
789 def __init__(self) -> None:
790 req = _lib.X509_REQ_new()
791 self._req = _ffi.gc(req, _lib.X509_REQ_free)
792 # Default to version 0.
793 self.set_version(0)
795 def to_cryptography(self) -> x509.CertificateSigningRequest:
796 """
797 Export as a ``cryptography`` certificate signing request.
799 :rtype: ``cryptography.x509.CertificateSigningRequest``
801 .. versionadded:: 17.1.0
802 """
803 from cryptography.x509 import load_der_x509_csr
805 der = _dump_certificate_request_internal(FILETYPE_ASN1, self)
807 return load_der_x509_csr(der)
809 @classmethod
810 def from_cryptography(
811 cls, crypto_req: x509.CertificateSigningRequest
812 ) -> X509Req:
813 """
814 Construct based on a ``cryptography`` *crypto_req*.
816 :param crypto_req: A ``cryptography`` X.509 certificate signing request
817 :type crypto_req: ``cryptography.x509.CertificateSigningRequest``
819 :rtype: X509Req
821 .. versionadded:: 17.1.0
822 """
823 if not isinstance(crypto_req, x509.CertificateSigningRequest):
824 raise TypeError("Must be a certificate signing request")
826 from cryptography.hazmat.primitives.serialization import Encoding
828 der = crypto_req.public_bytes(Encoding.DER)
829 return _load_certificate_request_internal(FILETYPE_ASN1, der)
831 def set_pubkey(self, pkey: PKey) -> None:
832 """
833 Set the public key of the certificate signing request.
835 :param pkey: The public key to use.
836 :type pkey: :py:class:`PKey`
838 :return: ``None``
839 """
840 set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey)
841 _openssl_assert(set_result == 1)
843 def get_pubkey(self) -> PKey:
844 """
845 Get the public key of the certificate signing request.
847 :return: The public key.
848 :rtype: :py:class:`PKey`
849 """
850 pkey = PKey.__new__(PKey)
851 pkey._pkey = _lib.X509_REQ_get_pubkey(self._req)
852 _openssl_assert(pkey._pkey != _ffi.NULL)
853 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
854 pkey._only_public = True
855 return pkey
857 def set_version(self, version: int) -> None:
858 """
859 Set the version subfield (RFC 2986, section 4.1) of the certificate
860 request.
862 :param int version: The version number.
863 :return: ``None``
864 """
865 if not isinstance(version, int):
866 raise TypeError("version must be an int")
867 if version != 0:
868 raise ValueError(
869 "Invalid version. The only valid version for X509Req is 0."
870 )
871 set_result = _lib.X509_REQ_set_version(self._req, version)
872 _openssl_assert(set_result == 1)
874 def get_version(self) -> int:
875 """
876 Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate
877 request.
879 :return: The value of the version subfield.
880 :rtype: :py:class:`int`
881 """
882 return _lib.X509_REQ_get_version(self._req)
884 def get_subject(self) -> X509Name:
885 """
886 Return the subject of this certificate signing request.
888 This creates a new :class:`X509Name` that wraps the underlying subject
889 name field on the certificate signing request. Modifying it will modify
890 the underlying signing request, and will have the effect of modifying
891 any other :class:`X509Name` that refers to this subject.
893 :return: The subject of this certificate signing request.
894 :rtype: :class:`X509Name`
895 """
896 name = X509Name.__new__(X509Name)
897 name._name = _lib.X509_REQ_get_subject_name(self._req)
898 _openssl_assert(name._name != _ffi.NULL)
900 # The name is owned by the X509Req structure. As long as the X509Name
901 # Python object is alive, keep the X509Req Python object alive.
902 name._owner = self
904 return name
906 def sign(self, pkey: PKey, digest: str) -> None:
907 """
908 Sign the certificate signing request with this key and digest type.
910 :param pkey: The key pair to sign with.
911 :type pkey: :py:class:`PKey`
912 :param digest: The name of the message digest to use for the signature,
913 e.g. :py:data:`"sha256"`.
914 :type digest: :py:class:`str`
915 :return: ``None``
916 """
917 if pkey._only_public:
918 raise ValueError("Key has only public part")
920 if not pkey._initialized:
921 raise ValueError("Key is uninitialized")
923 digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
924 if digest_obj == _ffi.NULL:
925 raise ValueError("No such digest method")
927 sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
928 _openssl_assert(sign_result > 0)
930 def verify(self, pkey: PKey) -> bool:
931 """
932 Verifies the signature on this certificate signing request.
934 :param PKey key: A public key.
936 :return: ``True`` if the signature is correct.
937 :rtype: bool
939 :raises OpenSSL.crypto.Error: If the signature is invalid or there is a
940 problem verifying the signature.
941 """
942 if not isinstance(pkey, PKey):
943 raise TypeError("pkey must be a PKey instance")
945 result = _lib.X509_REQ_verify(self._req, pkey._pkey)
946 if result <= 0:
947 _raise_current_error()
949 return result
952class X509:
953 """
954 An X.509 certificate.
955 """
957 def __init__(self) -> None:
958 x509 = _lib.X509_new()
959 _openssl_assert(x509 != _ffi.NULL)
960 self._x509 = _ffi.gc(x509, _lib.X509_free)
962 self._issuer_invalidator = _X509NameInvalidator()
963 self._subject_invalidator = _X509NameInvalidator()
965 @classmethod
966 def _from_raw_x509_ptr(cls, x509: Any) -> X509:
967 cert = cls.__new__(cls)
968 cert._x509 = _ffi.gc(x509, _lib.X509_free)
969 cert._issuer_invalidator = _X509NameInvalidator()
970 cert._subject_invalidator = _X509NameInvalidator()
971 return cert
973 def to_cryptography(self) -> x509.Certificate:
974 """
975 Export as a ``cryptography`` certificate.
977 :rtype: ``cryptography.x509.Certificate``
979 .. versionadded:: 17.1.0
980 """
981 from cryptography.x509 import load_der_x509_certificate
983 der = dump_certificate(FILETYPE_ASN1, self)
984 return load_der_x509_certificate(der)
986 @classmethod
987 def from_cryptography(cls, crypto_cert: x509.Certificate) -> X509:
988 """
989 Construct based on a ``cryptography`` *crypto_cert*.
991 :param crypto_key: A ``cryptography`` X.509 certificate.
992 :type crypto_key: ``cryptography.x509.Certificate``
994 :rtype: X509
996 .. versionadded:: 17.1.0
997 """
998 if not isinstance(crypto_cert, x509.Certificate):
999 raise TypeError("Must be a certificate")
1001 from cryptography.hazmat.primitives.serialization import Encoding
1003 der = crypto_cert.public_bytes(Encoding.DER)
1004 return load_certificate(FILETYPE_ASN1, der)
1006 def set_version(self, version: int) -> None:
1007 """
1008 Set the version number of the certificate. Note that the
1009 version value is zero-based, eg. a value of 0 is V1.
1011 :param version: The version number of the certificate.
1012 :type version: :py:class:`int`
1014 :return: ``None``
1015 """
1016 if not isinstance(version, int):
1017 raise TypeError("version must be an integer")
1019 _openssl_assert(_lib.X509_set_version(self._x509, version) == 1)
1021 def get_version(self) -> int:
1022 """
1023 Return the version number of the certificate.
1025 :return: The version number of the certificate.
1026 :rtype: :py:class:`int`
1027 """
1028 return _lib.X509_get_version(self._x509)
1030 def get_pubkey(self) -> PKey:
1031 """
1032 Get the public key of the certificate.
1034 :return: The public key.
1035 :rtype: :py:class:`PKey`
1036 """
1037 pkey = PKey.__new__(PKey)
1038 pkey._pkey = _lib.X509_get_pubkey(self._x509)
1039 if pkey._pkey == _ffi.NULL:
1040 _raise_current_error()
1041 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
1042 pkey._only_public = True
1043 return pkey
1045 def set_pubkey(self, pkey: PKey) -> None:
1046 """
1047 Set the public key of the certificate.
1049 :param pkey: The public key.
1050 :type pkey: :py:class:`PKey`
1052 :return: :py:data:`None`
1053 """
1054 if not isinstance(pkey, PKey):
1055 raise TypeError("pkey must be a PKey instance")
1057 set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey)
1058 _openssl_assert(set_result == 1)
1060 def sign(self, pkey: PKey, digest: str) -> None:
1061 """
1062 Sign the certificate with this key and digest type.
1064 :param pkey: The key to sign with.
1065 :type pkey: :py:class:`PKey`
1067 :param digest: The name of the message digest to use.
1068 :type digest: :py:class:`str`
1070 :return: :py:data:`None`
1071 """
1072 if not isinstance(pkey, PKey):
1073 raise TypeError("pkey must be a PKey instance")
1075 if pkey._only_public:
1076 raise ValueError("Key only has public part")
1078 if not pkey._initialized:
1079 raise ValueError("Key is uninitialized")
1081 evp_md = _lib.EVP_get_digestbyname(_byte_string(digest))
1082 if evp_md == _ffi.NULL:
1083 raise ValueError("No such digest method")
1085 sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md)
1086 _openssl_assert(sign_result > 0)
1088 def get_signature_algorithm(self) -> bytes:
1089 """
1090 Return the signature algorithm used in the certificate.
1092 :return: The name of the algorithm.
1093 :rtype: :py:class:`bytes`
1095 :raises ValueError: If the signature algorithm is undefined.
1097 .. versionadded:: 0.13
1098 """
1099 sig_alg = _lib.X509_get0_tbs_sigalg(self._x509)
1100 alg = _ffi.new("ASN1_OBJECT **")
1101 _lib.X509_ALGOR_get0(alg, _ffi.NULL, _ffi.NULL, sig_alg)
1102 nid = _lib.OBJ_obj2nid(alg[0])
1103 if nid == _lib.NID_undef:
1104 raise ValueError("Undefined signature algorithm")
1105 return _ffi.string(_lib.OBJ_nid2ln(nid))
1107 def digest(self, digest_name: str) -> bytes:
1108 """
1109 Return the digest of the X509 object.
1111 :param digest_name: The name of the digest algorithm to use.
1112 :type digest_name: :py:class:`str`
1114 :return: The digest of the object, formatted as
1115 :py:const:`b":"`-delimited hex pairs.
1116 :rtype: :py:class:`bytes`
1117 """
1118 digest = _lib.EVP_get_digestbyname(_byte_string(digest_name))
1119 if digest == _ffi.NULL:
1120 raise ValueError("No such digest method")
1122 result_buffer = _ffi.new("unsigned char[]", _lib.EVP_MAX_MD_SIZE)
1123 result_length = _ffi.new("unsigned int[]", 1)
1124 result_length[0] = len(result_buffer)
1126 digest_result = _lib.X509_digest(
1127 self._x509, digest, result_buffer, result_length
1128 )
1129 _openssl_assert(digest_result == 1)
1131 return b":".join(
1132 [
1133 b16encode(ch).upper()
1134 for ch in _ffi.buffer(result_buffer, result_length[0])
1135 ]
1136 )
1138 def subject_name_hash(self) -> int:
1139 """
1140 Return the hash of the X509 subject.
1142 :return: The hash of the subject.
1143 :rtype: :py:class:`int`
1144 """
1145 return _lib.X509_subject_name_hash(self._x509)
1147 def set_serial_number(self, serial: int) -> None:
1148 """
1149 Set the serial number of the certificate.
1151 :param serial: The new serial number.
1152 :type serial: :py:class:`int`
1154 :return: :py:data`None`
1155 """
1156 if not isinstance(serial, int):
1157 raise TypeError("serial must be an integer")
1159 hex_serial = hex(serial)[2:]
1160 hex_serial_bytes = hex_serial.encode("ascii")
1162 bignum_serial = _ffi.new("BIGNUM**")
1164 # BN_hex2bn stores the result in &bignum.
1165 result = _lib.BN_hex2bn(bignum_serial, hex_serial_bytes)
1166 _openssl_assert(result != _ffi.NULL)
1168 asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL)
1169 _lib.BN_free(bignum_serial[0])
1170 _openssl_assert(asn1_serial != _ffi.NULL)
1171 asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free)
1172 set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial)
1173 _openssl_assert(set_result == 1)
1175 def get_serial_number(self) -> int:
1176 """
1177 Return the serial number of this certificate.
1179 :return: The serial number.
1180 :rtype: int
1181 """
1182 asn1_serial = _lib.X509_get_serialNumber(self._x509)
1183 bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL)
1184 try:
1185 hex_serial = _lib.BN_bn2hex(bignum_serial)
1186 try:
1187 hexstring_serial = _ffi.string(hex_serial)
1188 serial = int(hexstring_serial, 16)
1189 return serial
1190 finally:
1191 _lib.OPENSSL_free(hex_serial)
1192 finally:
1193 _lib.BN_free(bignum_serial)
1195 def gmtime_adj_notAfter(self, amount: int) -> None:
1196 """
1197 Adjust the time stamp on which the certificate stops being valid.
1199 :param int amount: The number of seconds by which to adjust the
1200 timestamp.
1201 :return: ``None``
1202 """
1203 if not isinstance(amount, int):
1204 raise TypeError("amount must be an integer")
1206 notAfter = _lib.X509_getm_notAfter(self._x509)
1207 _lib.X509_gmtime_adj(notAfter, amount)
1209 def gmtime_adj_notBefore(self, amount: int) -> None:
1210 """
1211 Adjust the timestamp on which the certificate starts being valid.
1213 :param amount: The number of seconds by which to adjust the timestamp.
1214 :return: ``None``
1215 """
1216 if not isinstance(amount, int):
1217 raise TypeError("amount must be an integer")
1219 notBefore = _lib.X509_getm_notBefore(self._x509)
1220 _lib.X509_gmtime_adj(notBefore, amount)
1222 def has_expired(self) -> bool:
1223 """
1224 Check whether the certificate has expired.
1226 :return: ``True`` if the certificate has expired, ``False`` otherwise.
1227 :rtype: bool
1228 """
1229 time_bytes = self.get_notAfter()
1230 if time_bytes is None:
1231 raise ValueError("Unable to determine notAfter")
1232 time_string = time_bytes.decode("utf-8")
1233 not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
1235 UTC = datetime.timezone.utc
1236 utcnow = datetime.datetime.now(UTC).replace(tzinfo=None)
1237 return not_after < utcnow
1239 def _get_boundary_time(self, which: Any) -> bytes | None:
1240 return _get_asn1_time(which(self._x509))
1242 def get_notBefore(self) -> bytes | None:
1243 """
1244 Get the timestamp at which the certificate starts being valid.
1246 The timestamp is formatted as an ASN.1 TIME::
1248 YYYYMMDDhhmmssZ
1250 :return: A timestamp string, or ``None`` if there is none.
1251 :rtype: bytes or NoneType
1252 """
1253 return self._get_boundary_time(_lib.X509_getm_notBefore)
1255 def _set_boundary_time(
1256 self, which: Callable[..., Any], when: bytes
1257 ) -> None:
1258 return _set_asn1_time(which(self._x509), when)
1260 def set_notBefore(self, when: bytes) -> None:
1261 """
1262 Set the timestamp at which the certificate starts being valid.
1264 The timestamp is formatted as an ASN.1 TIME::
1266 YYYYMMDDhhmmssZ
1268 :param bytes when: A timestamp string.
1269 :return: ``None``
1270 """
1271 return self._set_boundary_time(_lib.X509_getm_notBefore, when)
1273 def get_notAfter(self) -> bytes | None:
1274 """
1275 Get the timestamp at which the certificate stops being valid.
1277 The timestamp is formatted as an ASN.1 TIME::
1279 YYYYMMDDhhmmssZ
1281 :return: A timestamp string, or ``None`` if there is none.
1282 :rtype: bytes or NoneType
1283 """
1284 return self._get_boundary_time(_lib.X509_getm_notAfter)
1286 def set_notAfter(self, when: bytes) -> None:
1287 """
1288 Set the timestamp at which the certificate stops being valid.
1290 The timestamp is formatted as an ASN.1 TIME::
1292 YYYYMMDDhhmmssZ
1294 :param bytes when: A timestamp string.
1295 :return: ``None``
1296 """
1297 return self._set_boundary_time(_lib.X509_getm_notAfter, when)
1299 def _get_name(self, which: Any) -> X509Name:
1300 name = X509Name.__new__(X509Name)
1301 name._name = which(self._x509)
1302 _openssl_assert(name._name != _ffi.NULL)
1304 # The name is owned by the X509 structure. As long as the X509Name
1305 # Python object is alive, keep the X509 Python object alive.
1306 name._owner = self
1308 return name
1310 def _set_name(self, which: Any, name: X509Name) -> None:
1311 if not isinstance(name, X509Name):
1312 raise TypeError("name must be an X509Name")
1313 set_result = which(self._x509, name._name)
1314 _openssl_assert(set_result == 1)
1316 def get_issuer(self) -> X509Name:
1317 """
1318 Return the issuer of this certificate.
1320 This creates a new :class:`X509Name` that wraps the underlying issuer
1321 name field on the certificate. Modifying it will modify the underlying
1322 certificate, and will have the effect of modifying any other
1323 :class:`X509Name` that refers to this issuer.
1325 :return: The issuer of this certificate.
1326 :rtype: :class:`X509Name`
1327 """
1328 name = self._get_name(_lib.X509_get_issuer_name)
1329 self._issuer_invalidator.add(name)
1330 return name
1332 def set_issuer(self, issuer: X509Name) -> None:
1333 """
1334 Set the issuer of this certificate.
1336 :param issuer: The issuer.
1337 :type issuer: :py:class:`X509Name`
1339 :return: ``None``
1340 """
1341 self._set_name(_lib.X509_set_issuer_name, issuer)
1342 self._issuer_invalidator.clear()
1344 def get_subject(self) -> X509Name:
1345 """
1346 Return the subject of this certificate.
1348 This creates a new :class:`X509Name` that wraps the underlying subject
1349 name field on the certificate. Modifying it will modify the underlying
1350 certificate, and will have the effect of modifying any other
1351 :class:`X509Name` that refers to this subject.
1353 :return: The subject of this certificate.
1354 :rtype: :class:`X509Name`
1355 """
1356 name = self._get_name(_lib.X509_get_subject_name)
1357 self._subject_invalidator.add(name)
1358 return name
1360 def set_subject(self, subject: X509Name) -> None:
1361 """
1362 Set the subject of this certificate.
1364 :param subject: The subject.
1365 :type subject: :py:class:`X509Name`
1367 :return: ``None``
1368 """
1369 self._set_name(_lib.X509_set_subject_name, subject)
1370 self._subject_invalidator.clear()
1372 def get_extension_count(self) -> int:
1373 """
1374 Get the number of extensions on this certificate.
1376 :return: The number of extensions.
1377 :rtype: :py:class:`int`
1379 .. versionadded:: 0.12
1380 """
1381 return _lib.X509_get_ext_count(self._x509)
1384class X509StoreFlags:
1385 """
1386 Flags for X509 verification, used to change the behavior of
1387 :class:`X509Store`.
1389 See `OpenSSL Verification Flags`_ for details.
1391 .. _OpenSSL Verification Flags:
1392 https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html
1393 """
1395 CRL_CHECK: int = _lib.X509_V_FLAG_CRL_CHECK
1396 CRL_CHECK_ALL: int = _lib.X509_V_FLAG_CRL_CHECK_ALL
1397 IGNORE_CRITICAL: int = _lib.X509_V_FLAG_IGNORE_CRITICAL
1398 X509_STRICT: int = _lib.X509_V_FLAG_X509_STRICT
1399 ALLOW_PROXY_CERTS: int = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS
1400 POLICY_CHECK: int = _lib.X509_V_FLAG_POLICY_CHECK
1401 EXPLICIT_POLICY: int = _lib.X509_V_FLAG_EXPLICIT_POLICY
1402 INHIBIT_MAP: int = _lib.X509_V_FLAG_INHIBIT_MAP
1403 CHECK_SS_SIGNATURE: int = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE
1404 PARTIAL_CHAIN: int = _lib.X509_V_FLAG_PARTIAL_CHAIN
1407class X509Store:
1408 """
1409 An X.509 store.
1411 An X.509 store is used to describe a context in which to verify a
1412 certificate. A description of a context may include a set of certificates
1413 to trust, a set of certificate revocation lists, verification flags and
1414 more.
1416 An X.509 store, being only a description, cannot be used by itself to
1417 verify a certificate. To carry out the actual verification process, see
1418 :class:`X509StoreContext`.
1419 """
1421 def __init__(self) -> None:
1422 store = _lib.X509_STORE_new()
1423 self._store = _ffi.gc(store, _lib.X509_STORE_free)
1425 def add_cert(self, cert: X509) -> None:
1426 """
1427 Adds a trusted certificate to this store.
1429 Adding a certificate with this method adds this certificate as a
1430 *trusted* certificate.
1432 :param X509 cert: The certificate to add to this store.
1434 :raises TypeError: If the certificate is not an :class:`X509`.
1436 :raises OpenSSL.crypto.Error: If OpenSSL was unhappy with your
1437 certificate.
1439 :return: ``None`` if the certificate was added successfully.
1440 """
1441 if not isinstance(cert, X509):
1442 raise TypeError()
1444 res = _lib.X509_STORE_add_cert(self._store, cert._x509)
1445 _openssl_assert(res == 1)
1447 def add_crl(self, crl: x509.CertificateRevocationList) -> None:
1448 """
1449 Add a certificate revocation list to this store.
1451 The certificate revocation lists added to a store will only be used if
1452 the associated flags are configured to check certificate revocation
1453 lists.
1455 .. versionadded:: 16.1.0
1457 :param crl: The certificate revocation list to add to this store.
1458 :type crl: ``cryptography.x509.CertificateRevocationList``
1459 :return: ``None`` if the certificate revocation list was added
1460 successfully.
1461 """
1462 if isinstance(crl, x509.CertificateRevocationList):
1463 from cryptography.hazmat.primitives.serialization import Encoding
1465 bio = _new_mem_buf(crl.public_bytes(Encoding.DER))
1466 openssl_crl = _lib.d2i_X509_CRL_bio(bio, _ffi.NULL)
1467 _openssl_assert(openssl_crl != _ffi.NULL)
1468 crl = _ffi.gc(openssl_crl, _lib.X509_CRL_free)
1469 else:
1470 raise TypeError(
1471 "CRL must be of type "
1472 "cryptography.x509.CertificateRevocationList"
1473 )
1475 _openssl_assert(_lib.X509_STORE_add_crl(self._store, crl) != 0)
1477 def set_flags(self, flags: int) -> None:
1478 """
1479 Set verification flags to this store.
1481 Verification flags can be combined by oring them together.
1483 .. note::
1485 Setting a verification flag sometimes requires clients to add
1486 additional information to the store, otherwise a suitable error will
1487 be raised.
1489 For example, in setting flags to enable CRL checking a
1490 suitable CRL must be added to the store otherwise an error will be
1491 raised.
1493 .. versionadded:: 16.1.0
1495 :param int flags: The verification flags to set on this store.
1496 See :class:`X509StoreFlags` for available constants.
1497 :return: ``None`` if the verification flags were successfully set.
1498 """
1499 _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0)
1501 def set_time(self, vfy_time: datetime.datetime) -> None:
1502 """
1503 Set the time against which the certificates are verified.
1505 Normally the current time is used.
1507 .. note::
1509 For example, you can determine if a certificate was valid at a given
1510 time.
1512 .. versionadded:: 17.0.0
1514 :param datetime vfy_time: The verification time to set on this store.
1515 :return: ``None`` if the verification time was successfully set.
1516 """
1517 param = _lib.X509_VERIFY_PARAM_new()
1518 param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free)
1520 _lib.X509_VERIFY_PARAM_set_time(
1521 param, calendar.timegm(vfy_time.timetuple())
1522 )
1523 _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)
1525 def load_locations(
1526 self,
1527 cafile: StrOrBytesPath | None,
1528 capath: StrOrBytesPath | None = None,
1529 ) -> None:
1530 """
1531 Let X509Store know where we can find trusted certificates for the
1532 certificate chain. Note that the certificates have to be in PEM
1533 format.
1535 If *capath* is passed, it must be a directory prepared using the
1536 ``c_rehash`` tool included with OpenSSL. Either, but not both, of
1537 *cafile* or *capath* may be ``None``.
1539 .. note::
1541 Both *cafile* and *capath* may be set simultaneously.
1543 Call this method multiple times to add more than one location.
1544 For example, CA certificates, and certificate revocation list bundles
1545 may be passed in *cafile* in subsequent calls to this method.
1547 .. versionadded:: 20.0
1549 :param cafile: In which file we can find the certificates (``bytes`` or
1550 ``unicode``).
1551 :param capath: In which directory we can find the certificates
1552 (``bytes`` or ``unicode``).
1554 :return: ``None`` if the locations were set successfully.
1556 :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None``
1557 or the locations could not be set for any reason.
1559 """
1560 if cafile is None:
1561 cafile = _ffi.NULL
1562 else:
1563 cafile = _path_bytes(cafile)
1565 if capath is None:
1566 capath = _ffi.NULL
1567 else:
1568 capath = _path_bytes(capath)
1570 load_result = _lib.X509_STORE_load_locations(
1571 self._store, cafile, capath
1572 )
1573 if not load_result:
1574 _raise_current_error()
1577class X509StoreContextError(Exception):
1578 """
1579 An exception raised when an error occurred while verifying a certificate
1580 using `OpenSSL.X509StoreContext.verify_certificate`.
1582 :ivar certificate: The certificate which caused verificate failure.
1583 :type certificate: :class:`X509`
1584 """
1586 def __init__(
1587 self, message: str, errors: list[Any], certificate: X509
1588 ) -> None:
1589 super().__init__(message)
1590 self.errors = errors
1591 self.certificate = certificate
1594class X509StoreContext:
1595 """
1596 An X.509 store context.
1598 An X.509 store context is used to carry out the actual verification process
1599 of a certificate in a described context. For describing such a context, see
1600 :class:`X509Store`.
1602 :param X509Store store: The certificates which will be trusted for the
1603 purposes of any verifications.
1604 :param X509 certificate: The certificate to be verified.
1605 :param chain: List of untrusted certificates that may be used for building
1606 the certificate chain. May be ``None``.
1607 :type chain: :class:`list` of :class:`X509`
1608 """
1610 def __init__(
1611 self,
1612 store: X509Store,
1613 certificate: X509,
1614 chain: Sequence[X509] | None = None,
1615 ) -> None:
1616 self._store = store
1617 self._cert = certificate
1618 self._chain = self._build_certificate_stack(chain)
1620 @staticmethod
1621 def _build_certificate_stack(
1622 certificates: Sequence[X509] | None,
1623 ) -> None:
1624 def cleanup(s: Any) -> None:
1625 # Equivalent to sk_X509_pop_free, but we don't
1626 # currently have a CFFI binding for that available
1627 for i in range(_lib.sk_X509_num(s)):
1628 x = _lib.sk_X509_value(s, i)
1629 _lib.X509_free(x)
1630 _lib.sk_X509_free(s)
1632 if certificates is None or len(certificates) == 0:
1633 return _ffi.NULL
1635 stack = _lib.sk_X509_new_null()
1636 _openssl_assert(stack != _ffi.NULL)
1637 stack = _ffi.gc(stack, cleanup)
1639 for cert in certificates:
1640 if not isinstance(cert, X509):
1641 raise TypeError("One of the elements is not an X509 instance")
1643 _openssl_assert(_lib.X509_up_ref(cert._x509) > 0)
1644 if _lib.sk_X509_push(stack, cert._x509) <= 0:
1645 _lib.X509_free(cert._x509)
1646 _raise_current_error()
1648 return stack
1650 @staticmethod
1651 def _exception_from_context(store_ctx: Any) -> X509StoreContextError:
1652 """
1653 Convert an OpenSSL native context error failure into a Python
1654 exception.
1656 When a call to native OpenSSL X509_verify_cert fails, additional
1657 information about the failure can be obtained from the store context.
1658 """
1659 message = _ffi.string(
1660 _lib.X509_verify_cert_error_string(
1661 _lib.X509_STORE_CTX_get_error(store_ctx)
1662 )
1663 ).decode("utf-8")
1664 errors = [
1665 _lib.X509_STORE_CTX_get_error(store_ctx),
1666 _lib.X509_STORE_CTX_get_error_depth(store_ctx),
1667 message,
1668 ]
1669 # A context error should always be associated with a certificate, so we
1670 # expect this call to never return :class:`None`.
1671 _x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
1672 _cert = _lib.X509_dup(_x509)
1673 pycert = X509._from_raw_x509_ptr(_cert)
1674 return X509StoreContextError(message, errors, pycert)
1676 def _verify_certificate(self) -> Any:
1677 """
1678 Verifies the certificate and runs an X509_STORE_CTX containing the
1679 results.
1681 :raises X509StoreContextError: If an error occurred when validating a
1682 certificate in the context. Sets ``certificate`` attribute to
1683 indicate which certificate caused the error.
1684 """
1685 store_ctx = _lib.X509_STORE_CTX_new()
1686 _openssl_assert(store_ctx != _ffi.NULL)
1687 store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)
1689 ret = _lib.X509_STORE_CTX_init(
1690 store_ctx, self._store._store, self._cert._x509, self._chain
1691 )
1692 _openssl_assert(ret == 1)
1694 ret = _lib.X509_verify_cert(store_ctx)
1695 if ret <= 0:
1696 raise self._exception_from_context(store_ctx)
1698 return store_ctx
1700 def set_store(self, store: X509Store) -> None:
1701 """
1702 Set the context's X.509 store.
1704 .. versionadded:: 0.15
1706 :param X509Store store: The store description which will be used for
1707 the purposes of any *future* verifications.
1708 """
1709 self._store = store
1711 def verify_certificate(self) -> None:
1712 """
1713 Verify a certificate in a context.
1715 .. versionadded:: 0.15
1717 :raises X509StoreContextError: If an error occurred when validating a
1718 certificate in the context. Sets ``certificate`` attribute to
1719 indicate which certificate caused the error.
1720 """
1721 self._verify_certificate()
1723 def get_verified_chain(self) -> list[X509]:
1724 """
1725 Verify a certificate in a context and return the complete validated
1726 chain.
1728 :raises X509StoreContextError: If an error occurred when validating a
1729 certificate in the context. Sets ``certificate`` attribute to
1730 indicate which certificate caused the error.
1732 .. versionadded:: 20.0
1733 """
1734 store_ctx = self._verify_certificate()
1736 # Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain.
1737 cert_stack = _lib.X509_STORE_CTX_get1_chain(store_ctx)
1738 _openssl_assert(cert_stack != _ffi.NULL)
1740 result = []
1741 for i in range(_lib.sk_X509_num(cert_stack)):
1742 cert = _lib.sk_X509_value(cert_stack, i)
1743 _openssl_assert(cert != _ffi.NULL)
1744 pycert = X509._from_raw_x509_ptr(cert)
1745 result.append(pycert)
1747 # Free the stack but not the members which are freed by the X509 class.
1748 _lib.sk_X509_free(cert_stack)
1749 return result
1752def load_certificate(type: int, buffer: bytes) -> X509:
1753 """
1754 Load a certificate (X509) from the string *buffer* encoded with the
1755 type *type*.
1757 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1759 :param bytes buffer: The buffer the certificate is stored in
1761 :return: The X509 object
1762 """
1763 if isinstance(buffer, str):
1764 buffer = buffer.encode("ascii")
1766 bio = _new_mem_buf(buffer)
1768 if type == FILETYPE_PEM:
1769 x509 = _lib.PEM_read_bio_X509(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
1770 elif type == FILETYPE_ASN1:
1771 x509 = _lib.d2i_X509_bio(bio, _ffi.NULL)
1772 else:
1773 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
1775 if x509 == _ffi.NULL:
1776 _raise_current_error()
1778 return X509._from_raw_x509_ptr(x509)
1781def dump_certificate(type: int, cert: X509) -> bytes:
1782 """
1783 Dump the certificate *cert* into a buffer string encoded with the type
1784 *type*.
1786 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
1787 FILETYPE_TEXT)
1788 :param cert: The certificate to dump
1789 :return: The buffer with the dumped certificate in
1790 """
1791 bio = _new_mem_buf()
1793 if type == FILETYPE_PEM:
1794 result_code = _lib.PEM_write_bio_X509(bio, cert._x509)
1795 elif type == FILETYPE_ASN1:
1796 result_code = _lib.i2d_X509_bio(bio, cert._x509)
1797 elif type == FILETYPE_TEXT:
1798 result_code = _lib.X509_print_ex(bio, cert._x509, 0, 0)
1799 else:
1800 raise ValueError(
1801 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
1802 "FILETYPE_TEXT"
1803 )
1805 _openssl_assert(result_code == 1)
1806 return _bio_to_string(bio)
1809def dump_publickey(type: int, pkey: PKey) -> bytes:
1810 """
1811 Dump a public key to a buffer.
1813 :param type: The file type (one of :data:`FILETYPE_PEM` or
1814 :data:`FILETYPE_ASN1`).
1815 :param PKey pkey: The public key to dump
1816 :return: The buffer with the dumped key in it.
1817 :rtype: bytes
1818 """
1819 bio = _new_mem_buf()
1820 if type == FILETYPE_PEM:
1821 write_bio = _lib.PEM_write_bio_PUBKEY
1822 elif type == FILETYPE_ASN1:
1823 write_bio = _lib.i2d_PUBKEY_bio
1824 else:
1825 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
1827 result_code = write_bio(bio, pkey._pkey)
1828 if result_code != 1: # pragma: no cover
1829 _raise_current_error()
1831 return _bio_to_string(bio)
1834def dump_privatekey(
1835 type: int,
1836 pkey: PKey,
1837 cipher: str | None = None,
1838 passphrase: PassphraseCallableT | None = None,
1839) -> bytes:
1840 """
1841 Dump the private key *pkey* into a buffer string encoded with the type
1842 *type*. Optionally (if *type* is :const:`FILETYPE_PEM`) encrypting it
1843 using *cipher* and *passphrase*.
1845 :param type: The file type (one of :const:`FILETYPE_PEM`,
1846 :const:`FILETYPE_ASN1`, or :const:`FILETYPE_TEXT`)
1847 :param PKey pkey: The PKey to dump
1848 :param cipher: (optional) if encrypted PEM format, the cipher to use
1849 :param passphrase: (optional) if encrypted PEM format, this can be either
1850 the passphrase to use, or a callback for providing the passphrase.
1852 :return: The buffer with the dumped key in
1853 :rtype: bytes
1854 """
1855 bio = _new_mem_buf()
1857 if not isinstance(pkey, PKey):
1858 raise TypeError("pkey must be a PKey")
1860 if cipher is not None:
1861 if passphrase is None:
1862 raise TypeError(
1863 "if a value is given for cipher "
1864 "one must also be given for passphrase"
1865 )
1866 cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher))
1867 if cipher_obj == _ffi.NULL:
1868 raise ValueError("Invalid cipher name")
1869 else:
1870 cipher_obj = _ffi.NULL
1872 helper = _PassphraseHelper(type, passphrase)
1873 if type == FILETYPE_PEM:
1874 result_code = _lib.PEM_write_bio_PrivateKey(
1875 bio,
1876 pkey._pkey,
1877 cipher_obj,
1878 _ffi.NULL,
1879 0,
1880 helper.callback,
1881 helper.callback_args,
1882 )
1883 helper.raise_if_problem()
1884 elif type == FILETYPE_ASN1:
1885 result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey)
1886 elif type == FILETYPE_TEXT:
1887 if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA:
1888 raise TypeError("Only RSA keys are supported for FILETYPE_TEXT")
1890 rsa = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free)
1891 result_code = _lib.RSA_print(bio, rsa, 0)
1892 else:
1893 raise ValueError(
1894 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
1895 "FILETYPE_TEXT"
1896 )
1898 _openssl_assert(result_code != 0)
1900 return _bio_to_string(bio)
1903class _PassphraseHelper:
1904 def __init__(
1905 self,
1906 type: int,
1907 passphrase: PassphraseCallableT | None,
1908 more_args: bool = False,
1909 truncate: bool = False,
1910 ) -> None:
1911 if type != FILETYPE_PEM and passphrase is not None:
1912 raise ValueError(
1913 "only FILETYPE_PEM key format supports encryption"
1914 )
1915 self._passphrase = passphrase
1916 self._more_args = more_args
1917 self._truncate = truncate
1918 self._problems: list[Exception] = []
1920 @property
1921 def callback(self) -> Any:
1922 if self._passphrase is None:
1923 return _ffi.NULL
1924 elif isinstance(self._passphrase, bytes) or callable(self._passphrase):
1925 return _ffi.callback("pem_password_cb", self._read_passphrase)
1926 else:
1927 raise TypeError(
1928 "Last argument must be a byte string or a callable."
1929 )
1931 @property
1932 def callback_args(self) -> Any:
1933 if self._passphrase is None:
1934 return _ffi.NULL
1935 elif isinstance(self._passphrase, bytes) or callable(self._passphrase):
1936 return _ffi.NULL
1937 else:
1938 raise TypeError(
1939 "Last argument must be a byte string or a callable."
1940 )
1942 def raise_if_problem(self, exceptionType: type[Exception] = Error) -> None:
1943 if self._problems:
1944 # Flush the OpenSSL error queue
1945 try:
1946 _exception_from_error_queue(exceptionType)
1947 except exceptionType:
1948 pass
1950 raise self._problems.pop(0)
1952 def _read_passphrase(
1953 self, buf: Any, size: int, rwflag: Any, userdata: Any
1954 ) -> int:
1955 try:
1956 if callable(self._passphrase):
1957 if self._more_args:
1958 result = self._passphrase(size, rwflag, userdata)
1959 else:
1960 result = self._passphrase(rwflag)
1961 else:
1962 assert self._passphrase is not None
1963 result = self._passphrase
1964 if not isinstance(result, bytes):
1965 raise ValueError("Bytes expected")
1966 if len(result) > size:
1967 if self._truncate:
1968 result = result[:size]
1969 else:
1970 raise ValueError(
1971 "passphrase returned by callback is too long"
1972 )
1973 for i in range(len(result)):
1974 buf[i] = result[i : i + 1]
1975 return len(result)
1976 except Exception as e:
1977 self._problems.append(e)
1978 return 0
1981def load_publickey(type: int, buffer: str | bytes) -> PKey:
1982 """
1983 Load a public key from a buffer.
1985 :param type: The file type (one of :data:`FILETYPE_PEM`,
1986 :data:`FILETYPE_ASN1`).
1987 :param buffer: The buffer the key is stored in.
1988 :type buffer: A Python string object, either unicode or bytestring.
1989 :return: The PKey object.
1990 :rtype: :class:`PKey`
1991 """
1992 if isinstance(buffer, str):
1993 buffer = buffer.encode("ascii")
1995 bio = _new_mem_buf(buffer)
1997 if type == FILETYPE_PEM:
1998 evp_pkey = _lib.PEM_read_bio_PUBKEY(
1999 bio, _ffi.NULL, _ffi.NULL, _ffi.NULL
2000 )
2001 elif type == FILETYPE_ASN1:
2002 evp_pkey = _lib.d2i_PUBKEY_bio(bio, _ffi.NULL)
2003 else:
2004 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
2006 if evp_pkey == _ffi.NULL:
2007 _raise_current_error()
2009 pkey = PKey.__new__(PKey)
2010 pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free)
2011 pkey._only_public = True
2012 return pkey
2015def load_privatekey(
2016 type: int,
2017 buffer: str | bytes,
2018 passphrase: PassphraseCallableT | None = None,
2019) -> PKey:
2020 """
2021 Load a private key (PKey) from the string *buffer* encoded with the type
2022 *type*.
2024 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
2025 :param buffer: The buffer the key is stored in
2026 :param passphrase: (optional) if encrypted PEM format, this can be
2027 either the passphrase to use, or a callback for
2028 providing the passphrase.
2030 :return: The PKey object
2031 """
2032 if isinstance(buffer, str):
2033 buffer = buffer.encode("ascii")
2035 bio = _new_mem_buf(buffer)
2037 helper = _PassphraseHelper(type, passphrase)
2038 if type == FILETYPE_PEM:
2039 evp_pkey = _lib.PEM_read_bio_PrivateKey(
2040 bio, _ffi.NULL, helper.callback, helper.callback_args
2041 )
2042 helper.raise_if_problem()
2043 elif type == FILETYPE_ASN1:
2044 evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL)
2045 else:
2046 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
2048 if evp_pkey == _ffi.NULL:
2049 _raise_current_error()
2051 pkey = PKey.__new__(PKey)
2052 pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free)
2053 return pkey
2056def dump_certificate_request(type: int, req: X509Req) -> bytes:
2057 """
2058 Dump the certificate request *req* into a buffer string encoded with the
2059 type *type*.
2061 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
2062 :param req: The certificate request to dump
2063 :return: The buffer with the dumped certificate request in
2066 .. deprecated:: 24.2.0
2067 Use `cryptography.x509.CertificateSigningRequest` instead.
2068 """
2069 bio = _new_mem_buf()
2071 if type == FILETYPE_PEM:
2072 result_code = _lib.PEM_write_bio_X509_REQ(bio, req._req)
2073 elif type == FILETYPE_ASN1:
2074 result_code = _lib.i2d_X509_REQ_bio(bio, req._req)
2075 elif type == FILETYPE_TEXT:
2076 result_code = _lib.X509_REQ_print_ex(bio, req._req, 0, 0)
2077 else:
2078 raise ValueError(
2079 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
2080 "FILETYPE_TEXT"
2081 )
2083 _openssl_assert(result_code != 0)
2085 return _bio_to_string(bio)
2088_dump_certificate_request_internal = dump_certificate_request
2090utils.deprecated(
2091 dump_certificate_request,
2092 __name__,
2093 (
2094 "CSR support in pyOpenSSL is deprecated. You should use the APIs "
2095 "in cryptography."
2096 ),
2097 DeprecationWarning,
2098 name="dump_certificate_request",
2099)
2102def load_certificate_request(type: int, buffer: bytes) -> X509Req:
2103 """
2104 Load a certificate request (X509Req) from the string *buffer* encoded with
2105 the type *type*.
2107 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
2108 :param buffer: The buffer the certificate request is stored in
2109 :return: The X509Req object
2111 .. deprecated:: 24.2.0
2112 Use `cryptography.x509.load_der_x509_csr` or
2113 `cryptography.x509.load_pem_x509_csr` instead.
2114 """
2115 if isinstance(buffer, str):
2116 buffer = buffer.encode("ascii")
2118 bio = _new_mem_buf(buffer)
2120 if type == FILETYPE_PEM:
2121 req = _lib.PEM_read_bio_X509_REQ(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
2122 elif type == FILETYPE_ASN1:
2123 req = _lib.d2i_X509_REQ_bio(bio, _ffi.NULL)
2124 else:
2125 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
2127 _openssl_assert(req != _ffi.NULL)
2129 x509req = X509Req.__new__(X509Req)
2130 x509req._req = _ffi.gc(req, _lib.X509_REQ_free)
2131 return x509req
2134_load_certificate_request_internal = load_certificate_request
2136utils.deprecated(
2137 load_certificate_request,
2138 __name__,
2139 (
2140 "CSR support in pyOpenSSL is deprecated. You should use the APIs "
2141 "in cryptography."
2142 ),
2143 DeprecationWarning,
2144 name="load_certificate_request",
2145)