Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/crypto.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import calendar
4import datetime
5import functools
6import sys
7import typing
8import warnings
9from base64 import b16encode
10from collections.abc import Iterable, Sequence
11from functools import partial
12from typing import (
13 Any,
14 Callable,
15 Union,
16)
18if sys.version_info >= (3, 13):
19 from warnings import deprecated
20elif sys.version_info < (3, 8):
21 _T = typing.TypeVar("T")
23 def deprecated(msg: str, **kwargs: object) -> Callable[[_T], _T]:
24 return lambda f: f
25else:
26 from typing_extensions import deprecated
28from cryptography import utils, x509
29from cryptography.hazmat.primitives.asymmetric import (
30 dsa,
31 ec,
32 ed448,
33 ed25519,
34 rsa,
35)
37from OpenSSL._util import StrOrBytesPath
38from OpenSSL._util import (
39 byte_string as _byte_string,
40)
41from OpenSSL._util import (
42 exception_from_error_queue as _exception_from_error_queue,
43)
44from OpenSSL._util import (
45 ffi as _ffi,
46)
47from OpenSSL._util import (
48 lib as _lib,
49)
50from OpenSSL._util import (
51 make_assert as _make_assert,
52)
53from OpenSSL._util import (
54 path_bytes as _path_bytes,
55)
57__all__ = [
58 "FILETYPE_ASN1",
59 "FILETYPE_PEM",
60 "FILETYPE_TEXT",
61 "TYPE_DSA",
62 "TYPE_RSA",
63 "X509",
64 "Error",
65 "PKey",
66 "X509Extension",
67 "X509Name",
68 "X509Req",
69 "X509Store",
70 "X509StoreContext",
71 "X509StoreContextError",
72 "X509StoreFlags",
73 "dump_certificate",
74 "dump_certificate_request",
75 "dump_privatekey",
76 "dump_publickey",
77 "get_elliptic_curve",
78 "get_elliptic_curves",
79 "load_certificate",
80 "load_certificate_request",
81 "load_privatekey",
82 "load_publickey",
83]
86_PrivateKey = Union[
87 dsa.DSAPrivateKey,
88 ec.EllipticCurvePrivateKey,
89 ed25519.Ed25519PrivateKey,
90 ed448.Ed448PrivateKey,
91 rsa.RSAPrivateKey,
92]
93_PublicKey = Union[
94 dsa.DSAPublicKey,
95 ec.EllipticCurvePublicKey,
96 ed25519.Ed25519PublicKey,
97 ed448.Ed448PublicKey,
98 rsa.RSAPublicKey,
99]
100_Key = Union[_PrivateKey, _PublicKey]
101PassphraseCallableT = Union[bytes, Callable[..., bytes]]
104FILETYPE_PEM: int = _lib.SSL_FILETYPE_PEM
105FILETYPE_ASN1: int = _lib.SSL_FILETYPE_ASN1
107# TODO This was an API mistake. OpenSSL has no such constant.
108FILETYPE_TEXT = 2**16 - 1
110TYPE_RSA: int = _lib.EVP_PKEY_RSA
111TYPE_DSA: int = _lib.EVP_PKEY_DSA
112TYPE_DH: int = _lib.EVP_PKEY_DH
113TYPE_EC: int = _lib.EVP_PKEY_EC
116class Error(Exception):
117 """
118 An error occurred in an `OpenSSL.crypto` API.
119 """
122_raise_current_error = partial(_exception_from_error_queue, Error)
123_openssl_assert = _make_assert(Error)
126def _new_mem_buf(buffer: bytes | None = None) -> Any:
127 """
128 Allocate a new OpenSSL memory BIO.
130 Arrange for the garbage collector to clean it up automatically.
132 :param buffer: None or some bytes to use to put into the BIO so that they
133 can be read out.
134 """
135 if buffer is None:
136 bio = _lib.BIO_new(_lib.BIO_s_mem())
137 free = _lib.BIO_free
138 else:
139 data = _ffi.new("char[]", buffer)
140 bio = _lib.BIO_new_mem_buf(data, len(buffer))
142 # Keep the memory alive as long as the bio is alive!
143 def free(bio: Any, ref: Any = data) -> Any:
144 return _lib.BIO_free(bio)
146 _openssl_assert(bio != _ffi.NULL)
148 bio = _ffi.gc(bio, free)
149 return bio
152def _bio_to_string(bio: Any) -> bytes:
153 """
154 Copy the contents of an OpenSSL BIO object into a Python byte string.
155 """
156 result_buffer = _ffi.new("char**")
157 buffer_length = _lib.BIO_get_mem_data(bio, result_buffer)
158 return _ffi.buffer(result_buffer[0], buffer_length)[:]
161def _set_asn1_time(boundary: Any, when: bytes) -> None:
162 """
163 The the time value of an ASN1 time object.
165 @param boundary: An ASN1_TIME pointer (or an object safely
166 castable to that type) which will have its value set.
167 @param when: A string representation of the desired time value.
169 @raise TypeError: If C{when} is not a L{bytes} string.
170 @raise ValueError: If C{when} does not represent a time in the required
171 format.
172 @raise RuntimeError: If the time value cannot be set for some other
173 (unspecified) reason.
174 """
175 if not isinstance(when, bytes):
176 raise TypeError("when must be a byte string")
177 # ASN1_TIME_set_string validates the string without writing anything
178 # when the destination is NULL.
179 _openssl_assert(boundary != _ffi.NULL)
181 set_result = _lib.ASN1_TIME_set_string(boundary, when)
182 if set_result == 0:
183 raise ValueError("Invalid string")
186def _new_asn1_time(when: bytes) -> Any:
187 """
188 Behaves like _set_asn1_time but returns a new ASN1_TIME object.
190 @param when: A string representation of the desired time value.
192 @raise TypeError: If C{when} is not a L{bytes} string.
193 @raise ValueError: If C{when} does not represent a time in the required
194 format.
195 @raise RuntimeError: If the time value cannot be set for some other
196 (unspecified) reason.
197 """
198 ret = _lib.ASN1_TIME_new()
199 _openssl_assert(ret != _ffi.NULL)
200 ret = _ffi.gc(ret, _lib.ASN1_TIME_free)
201 _set_asn1_time(ret, when)
202 return ret
205def _get_asn1_time(timestamp: Any) -> bytes | None:
206 """
207 Retrieve the time value of an ASN1 time object.
209 @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to
210 that type) from which the time value will be retrieved.
212 @return: The time value from C{timestamp} as a L{bytes} string in a certain
213 format. Or C{None} if the object contains no time value.
214 """
215 string_timestamp = _ffi.cast("ASN1_STRING*", timestamp)
216 if _lib.ASN1_STRING_length(string_timestamp) == 0:
217 return None
218 elif (
219 _lib.ASN1_STRING_type(string_timestamp) == _lib.V_ASN1_GENERALIZEDTIME
220 ):
221 return _ffi.string(_lib.ASN1_STRING_get0_data(string_timestamp))
222 else:
223 generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**")
224 _lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp)
225 _openssl_assert(generalized_timestamp[0] != _ffi.NULL)
227 string_timestamp = _ffi.cast("ASN1_STRING*", generalized_timestamp[0])
228 string_data = _lib.ASN1_STRING_get0_data(string_timestamp)
229 string_result = _ffi.string(string_data)
230 _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])
231 return string_result
234class _X509NameInvalidator:
235 def __init__(self) -> None:
236 self._names: list[X509Name] = []
238 def add(self, name: X509Name) -> None:
239 self._names.append(name)
241 def clear(self) -> None:
242 for name in self._names:
243 # Breaks the object, but also prevents UAF!
244 del name._name
247class PKey:
248 """
249 A class representing an DSA or RSA public key or key pair.
250 """
252 _only_public = False
253 _initialized = True
255 def __init__(self) -> None:
256 pkey = _lib.EVP_PKEY_new()
257 self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
258 self._initialized = False
260 def to_cryptography_key(self) -> _Key:
261 """
262 Export as a ``cryptography`` key.
264 :rtype: One of ``cryptography``'s `key interfaces`_.
266 .. _key interfaces: https://cryptography.io/en/latest/hazmat/\
267 primitives/asymmetric/rsa/#key-interfaces
269 .. versionadded:: 16.1.0
270 """
271 from cryptography.hazmat.primitives.serialization import (
272 load_der_private_key,
273 load_der_public_key,
274 )
276 if self._only_public:
277 der = dump_publickey(FILETYPE_ASN1, self)
278 return typing.cast(_Key, load_der_public_key(der))
279 else:
280 der = dump_privatekey(FILETYPE_ASN1, self)
281 return typing.cast(_Key, load_der_private_key(der, password=None))
283 @classmethod
284 def from_cryptography_key(cls, crypto_key: _Key) -> PKey:
285 """
286 Construct based on a ``cryptography`` *crypto_key*.
288 :param crypto_key: A ``cryptography`` key.
289 :type crypto_key: One of ``cryptography``'s `key interfaces`_.
291 :rtype: PKey
293 .. versionadded:: 16.1.0
294 """
295 if not isinstance(
296 crypto_key,
297 (
298 dsa.DSAPrivateKey,
299 dsa.DSAPublicKey,
300 ec.EllipticCurvePrivateKey,
301 ec.EllipticCurvePublicKey,
302 ed25519.Ed25519PrivateKey,
303 ed25519.Ed25519PublicKey,
304 ed448.Ed448PrivateKey,
305 ed448.Ed448PublicKey,
306 rsa.RSAPrivateKey,
307 rsa.RSAPublicKey,
308 ),
309 ):
310 raise TypeError("Unsupported key type")
312 from cryptography.hazmat.primitives.serialization import (
313 Encoding,
314 NoEncryption,
315 PrivateFormat,
316 PublicFormat,
317 )
319 if isinstance(
320 crypto_key,
321 (
322 dsa.DSAPublicKey,
323 ec.EllipticCurvePublicKey,
324 ed25519.Ed25519PublicKey,
325 ed448.Ed448PublicKey,
326 rsa.RSAPublicKey,
327 ),
328 ):
329 return load_publickey(
330 FILETYPE_ASN1,
331 crypto_key.public_bytes(
332 Encoding.DER, PublicFormat.SubjectPublicKeyInfo
333 ),
334 )
335 else:
336 der = crypto_key.private_bytes(
337 Encoding.DER, PrivateFormat.PKCS8, NoEncryption()
338 )
339 return load_privatekey(FILETYPE_ASN1, der)
341 def generate_key(self, type: int, bits: int) -> None:
342 """
343 Generate a key pair of the given type, with the given number of bits.
345 This generates a key "into" the this object.
347 :param type: The key type.
348 :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA`
349 :param bits: The number of bits.
350 :type bits: :py:data:`int` ``>= 0``
351 :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't
352 of the appropriate type.
353 :raises ValueError: If the number of bits isn't an integer of
354 the appropriate size.
355 :return: ``None``
356 """
357 if not isinstance(type, int):
358 raise TypeError("type must be an integer")
360 if not isinstance(bits, int):
361 raise TypeError("bits must be an integer")
363 if type == TYPE_RSA:
364 if bits <= 0:
365 raise ValueError("Invalid number of bits")
367 # TODO Check error return
368 exponent = _lib.BN_new()
369 exponent = _ffi.gc(exponent, _lib.BN_free)
370 _lib.BN_set_word(exponent, _lib.RSA_F4)
372 rsa = _lib.RSA_new()
374 result = _lib.RSA_generate_key_ex(rsa, bits, exponent, _ffi.NULL)
375 _openssl_assert(result == 1)
377 result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa)
378 _openssl_assert(result == 1)
380 elif type == TYPE_DSA:
381 dsa = _lib.DSA_new()
382 _openssl_assert(dsa != _ffi.NULL)
384 dsa = _ffi.gc(dsa, _lib.DSA_free)
385 res = _lib.DSA_generate_parameters_ex(
386 dsa, bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL
387 )
388 _openssl_assert(res == 1)
390 _openssl_assert(_lib.DSA_generate_key(dsa) == 1)
391 _openssl_assert(_lib.EVP_PKEY_set1_DSA(self._pkey, dsa) == 1)
392 else:
393 raise Error("No such key type")
395 self._initialized = True
397 def check(self) -> bool:
398 """
399 Check the consistency of an RSA private key.
401 This is the Python equivalent of OpenSSL's ``RSA_check_key``.
403 :return: ``True`` if key is consistent.
405 :raise OpenSSL.crypto.Error: if the key is inconsistent.
407 :raise TypeError: if the key is of a type which cannot be checked.
408 Only RSA keys can currently be checked.
409 """
410 if self._only_public:
411 raise TypeError("public key only")
413 if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA:
414 raise TypeError("Only RSA keys can currently be checked.")
416 rsa = _lib.EVP_PKEY_get1_RSA(self._pkey)
417 rsa = _ffi.gc(rsa, _lib.RSA_free)
418 result = _lib.RSA_check_key(rsa)
419 if result == 1:
420 return True
421 _raise_current_error()
423 def type(self) -> int:
424 """
425 Returns the type of the key
427 :return: The type of the key.
428 """
429 return _lib.EVP_PKEY_id(self._pkey)
431 def bits(self) -> int:
432 """
433 Returns the number of bits of the key
435 :return: The number of bits of the key.
436 """
437 return _lib.EVP_PKEY_bits(self._pkey)
440class _EllipticCurve:
441 """
442 A representation of a supported elliptic curve.
444 @cvar _curves: :py:obj:`None` until an attempt is made to load the curves.
445 Thereafter, a :py:type:`set` containing :py:type:`_EllipticCurve`
446 instances each of which represents one curve supported by the system.
447 @type _curves: :py:type:`NoneType` or :py:type:`set`
448 """
450 _curves = None
452 def __ne__(self, other: Any) -> bool:
453 """
454 Implement cooperation with the right-hand side argument of ``!=``.
456 Python 3 seems to have dropped this cooperation in this very narrow
457 circumstance.
458 """
459 if isinstance(other, _EllipticCurve):
460 return super().__ne__(other)
461 return NotImplemented
463 @classmethod
464 def _load_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]:
465 """
466 Get the curves supported by OpenSSL.
468 :param lib: The OpenSSL library binding object.
470 :return: A :py:type:`set` of ``cls`` instances giving the names of the
471 elliptic curves the underlying library supports.
472 """
473 num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0)
474 builtin_curves = _ffi.new("EC_builtin_curve[]", num_curves)
475 # The return value on this call should be num_curves again. We
476 # could check it to make sure but if it *isn't* then.. what could
477 # we do? Abort the whole process, I suppose...? -exarkun
478 lib.EC_get_builtin_curves(builtin_curves, num_curves)
479 return set(cls.from_nid(lib, c.nid) for c in builtin_curves)
481 @classmethod
482 def _get_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]:
483 """
484 Get, cache, and return the curves supported by OpenSSL.
486 :param lib: The OpenSSL library binding object.
488 :return: A :py:type:`set` of ``cls`` instances giving the names of the
489 elliptic curves the underlying library supports.
490 """
491 if cls._curves is None:
492 cls._curves = cls._load_elliptic_curves(lib)
493 return cls._curves
495 @classmethod
496 def from_nid(cls, lib: Any, nid: int) -> _EllipticCurve:
497 """
498 Instantiate a new :py:class:`_EllipticCurve` associated with the given
499 OpenSSL NID.
501 :param lib: The OpenSSL library binding object.
503 :param nid: The OpenSSL NID the resulting curve object will represent.
504 This must be a curve NID (and not, for example, a hash NID) or
505 subsequent operations will fail in unpredictable ways.
506 :type nid: :py:class:`int`
508 :return: The curve object.
509 """
510 return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii"))
512 def __init__(self, lib: Any, nid: int, name: str) -> None:
513 """
514 :param _lib: The :py:mod:`cryptography` binding instance used to
515 interface with OpenSSL.
517 :param _nid: The OpenSSL NID identifying the curve this object
518 represents.
519 :type _nid: :py:class:`int`
521 :param name: The OpenSSL short name identifying the curve this object
522 represents.
523 :type name: :py:class:`unicode`
524 """
525 self._lib = lib
526 self._nid = nid
527 self.name = name
529 def __repr__(self) -> str:
530 return f"<Curve {self.name!r}>"
532 def _to_EC_KEY(self) -> Any:
533 """
534 Create a new OpenSSL EC_KEY structure initialized to use this curve.
536 The structure is automatically garbage collected when the Python object
537 is garbage collected.
538 """
539 key = self._lib.EC_KEY_new_by_curve_name(self._nid)
540 return _ffi.gc(key, _lib.EC_KEY_free)
543@deprecated(
544 "get_elliptic_curves is deprecated. You should use the APIs in "
545 "cryptography instead."
546)
547def get_elliptic_curves() -> set[_EllipticCurve]:
548 """
549 Return a set of objects representing the elliptic curves supported in the
550 OpenSSL build in use.
552 The curve objects have a :py:class:`unicode` ``name`` attribute by which
553 they identify themselves.
555 The curve objects are useful as values for the argument accepted by
556 :py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be
557 used for ECDHE key exchange.
558 """
559 return _EllipticCurve._get_elliptic_curves(_lib)
562@deprecated(
563 "get_elliptic_curve is deprecated. You should use the APIs in "
564 "cryptography instead."
565)
566def get_elliptic_curve(name: str) -> _EllipticCurve:
567 """
568 Return a single curve object selected by name.
570 See :py:func:`get_elliptic_curves` for information about curve objects.
572 :param name: The OpenSSL short name identifying the curve object to
573 retrieve.
574 :type name: :py:class:`unicode`
576 If the named curve is not supported then :py:class:`ValueError` is raised.
577 """
578 for curve in get_elliptic_curves():
579 if curve.name == name:
580 return curve
581 raise ValueError("unknown curve name", name)
584@functools.total_ordering
585class X509Name:
586 """
587 An X.509 Distinguished Name.
589 :ivar countryName: The country of the entity.
590 :ivar C: Alias for :py:attr:`countryName`.
592 :ivar stateOrProvinceName: The state or province of the entity.
593 :ivar ST: Alias for :py:attr:`stateOrProvinceName`.
595 :ivar localityName: The locality of the entity.
596 :ivar L: Alias for :py:attr:`localityName`.
598 :ivar organizationName: The organization name of the entity.
599 :ivar O: Alias for :py:attr:`organizationName`.
601 :ivar organizationalUnitName: The organizational unit of the entity.
602 :ivar OU: Alias for :py:attr:`organizationalUnitName`
604 :ivar commonName: The common name of the entity.
605 :ivar CN: Alias for :py:attr:`commonName`.
607 :ivar emailAddress: The e-mail address of the entity.
608 """
610 def __init__(self, name: X509Name) -> None:
611 """
612 Create a new X509Name, copying the given X509Name instance.
614 :param name: The name to copy.
615 :type name: :py:class:`X509Name`
616 """
617 name = _lib.X509_NAME_dup(name._name)
618 self._name: Any = _ffi.gc(name, _lib.X509_NAME_free)
620 def __setattr__(self, name: str, value: Any) -> None:
621 if name.startswith("_"):
622 return super().__setattr__(name, value)
624 # Note: we really do not want str subclasses here, so we do not use
625 # isinstance.
626 if type(name) is not str:
627 raise TypeError(
628 f"attribute name must be string, not "
629 f"'{type(value).__name__:.200}'"
630 )
632 nid = _lib.OBJ_txt2nid(_byte_string(name))
633 if nid == _lib.NID_undef:
634 try:
635 _raise_current_error()
636 except Error:
637 pass
638 raise AttributeError("No such attribute")
640 # If there's an old entry for this NID, remove it
641 for i in range(_lib.X509_NAME_entry_count(self._name)):
642 ent = _lib.X509_NAME_get_entry(self._name, i)
643 ent_obj = _lib.X509_NAME_ENTRY_get_object(ent)
644 ent_nid = _lib.OBJ_obj2nid(ent_obj)
645 if nid == ent_nid:
646 ent = _lib.X509_NAME_delete_entry(self._name, i)
647 _lib.X509_NAME_ENTRY_free(ent)
648 break
650 if isinstance(value, str):
651 value = value.encode("utf-8")
653 add_result = _lib.X509_NAME_add_entry_by_NID(
654 self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0
655 )
656 if not add_result:
657 _raise_current_error()
659 def __getattr__(self, name: str) -> str | None:
660 """
661 Find attribute. An X509Name object has the following attributes:
662 countryName (alias C), stateOrProvince (alias ST), locality (alias L),
663 organization (alias O), organizationalUnit (alias OU), commonName
664 (alias CN) and more...
665 """
666 nid = _lib.OBJ_txt2nid(_byte_string(name))
667 if nid == _lib.NID_undef:
668 # This is a bit weird. OBJ_txt2nid indicated failure, but it seems
669 # a lower level function, a2d_ASN1_OBJECT, also feels the need to
670 # push something onto the error queue. If we don't clean that up
671 # now, someone else will bump into it later and be quite confused.
672 # See lp#314814.
673 try:
674 _raise_current_error()
675 except Error:
676 pass
677 raise AttributeError("No such attribute")
679 entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1)
680 if entry_index == -1:
681 return None
683 entry = _lib.X509_NAME_get_entry(self._name, entry_index)
684 data = _lib.X509_NAME_ENTRY_get_data(entry)
686 result_buffer = _ffi.new("unsigned char**")
687 data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data)
688 _openssl_assert(data_length >= 0)
690 try:
691 result = _ffi.buffer(result_buffer[0], data_length)[:].decode(
692 "utf-8"
693 )
694 finally:
695 # XXX untested
696 _lib.OPENSSL_free(result_buffer[0])
697 return result
699 def __eq__(self, other: Any) -> bool:
700 if not isinstance(other, X509Name):
701 return NotImplemented
703 return _lib.X509_NAME_cmp(self._name, other._name) == 0
705 def __lt__(self, other: Any) -> bool:
706 if not isinstance(other, X509Name):
707 return NotImplemented
709 return _lib.X509_NAME_cmp(self._name, other._name) < 0
711 def __repr__(self) -> str:
712 """
713 String representation of an X509Name
714 """
715 result_buffer = _ffi.new("char[]", 512)
716 format_result = _lib.X509_NAME_oneline(
717 self._name, result_buffer, len(result_buffer)
718 )
719 _openssl_assert(format_result != _ffi.NULL)
721 return "<X509Name object '{}'>".format(
722 _ffi.string(result_buffer).decode("utf-8"),
723 )
725 def hash(self) -> int:
726 """
727 Return an integer representation of the first four bytes of the
728 MD5 digest of the DER representation of the name.
730 This is the Python equivalent of OpenSSL's ``X509_NAME_hash``.
732 :return: The (integer) hash of this name.
733 :rtype: :py:class:`int`
734 """
735 return _lib.X509_NAME_hash(self._name)
737 def der(self) -> bytes:
738 """
739 Return the DER encoding of this name.
741 :return: The DER encoded form of this name.
742 :rtype: :py:class:`bytes`
743 """
744 result_buffer = _ffi.new("unsigned char**")
745 encode_result = _lib.i2d_X509_NAME(self._name, result_buffer)
746 _openssl_assert(encode_result >= 0)
748 string_result = _ffi.buffer(result_buffer[0], encode_result)[:]
749 _lib.OPENSSL_free(result_buffer[0])
750 return string_result
752 def get_components(self) -> list[tuple[bytes, bytes]]:
753 """
754 Returns the components of this name, as a sequence of 2-tuples.
756 :return: The components of this name.
757 :rtype: :py:class:`list` of ``name, value`` tuples.
758 """
759 result = []
760 for i in range(_lib.X509_NAME_entry_count(self._name)):
761 ent = _lib.X509_NAME_get_entry(self._name, i)
763 fname = _lib.X509_NAME_ENTRY_get_object(ent)
764 fval = _lib.X509_NAME_ENTRY_get_data(ent)
766 nid = _lib.OBJ_obj2nid(fname)
767 name = _lib.OBJ_nid2sn(nid)
769 # ffi.string does not handle strings containing NULL bytes
770 # (which may have been generated by old, broken software)
771 value = _ffi.buffer(
772 _lib.ASN1_STRING_get0_data(fval), _lib.ASN1_STRING_length(fval)
773 )[:]
774 result.append((_ffi.string(name), value))
776 return result
779@deprecated(
780 "X509Extension support in pyOpenSSL is deprecated. You should use the "
781 "APIs in cryptography."
782)
783class X509Extension:
784 """
785 An X.509 v3 certificate extension.
787 .. deprecated:: 23.3.0
788 Use cryptography's X509 APIs instead.
789 """
791 def __init__(
792 self,
793 type_name: bytes,
794 critical: bool,
795 value: bytes,
796 subject: X509 | None = None,
797 issuer: X509 | None = None,
798 ) -> None:
799 """
800 Initializes an X509 extension.
802 :param type_name: The name of the type of extension_ to create.
803 :type type_name: :py:data:`bytes`
805 :param bool critical: A flag indicating whether this is a critical
806 extension.
808 :param value: The OpenSSL textual representation of the extension's
809 value.
810 :type value: :py:data:`bytes`
812 :param subject: Optional X509 certificate to use as subject.
813 :type subject: :py:class:`X509`
815 :param issuer: Optional X509 certificate to use as issuer.
816 :type issuer: :py:class:`X509`
818 .. _extension: https://www.openssl.org/docs/manmaster/man5/
819 x509v3_config.html#STANDARD-EXTENSIONS
820 """
821 ctx = _ffi.new("X509V3_CTX*")
823 # A context is necessary for any extension which uses the r2i
824 # conversion method. That is, X509V3_EXT_nconf may segfault if passed
825 # a NULL ctx. Start off by initializing most of the fields to NULL.
826 _lib.X509V3_set_ctx(ctx, _ffi.NULL, _ffi.NULL, _ffi.NULL, _ffi.NULL, 0)
828 # We have no configuration database - but perhaps we should (some
829 # extensions may require it).
830 _lib.X509V3_set_ctx_nodb(ctx)
832 # Initialize the subject and issuer, if appropriate. ctx is a local,
833 # and as far as I can tell none of the X509V3_* APIs invoked here steal
834 # any references, so no need to mess with reference counts or
835 # duplicates.
836 if issuer is not None:
837 if not isinstance(issuer, X509):
838 raise TypeError("issuer must be an X509 instance")
839 ctx.issuer_cert = issuer._x509
840 if subject is not None:
841 if not isinstance(subject, X509):
842 raise TypeError("subject must be an X509 instance")
843 ctx.subject_cert = subject._x509
845 if critical:
846 # There are other OpenSSL APIs which would let us pass in critical
847 # separately, but they're harder to use, and since value is already
848 # a pile of crappy junk smuggling a ton of utterly important
849 # structured data, what's the point of trying to avoid nasty stuff
850 # with strings? (However, X509V3_EXT_i2d in particular seems like
851 # it would be a better API to invoke. I do not know where to get
852 # the ext_struc it desires for its last parameter, though.)
853 value = b"critical," + value
855 extension = _lib.X509V3_EXT_nconf(_ffi.NULL, ctx, type_name, value)
856 if extension == _ffi.NULL:
857 _raise_current_error()
858 self._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
860 @property
861 def _nid(self) -> Any:
862 return _lib.OBJ_obj2nid(
863 _lib.X509_EXTENSION_get_object(self._extension)
864 )
866 _prefixes: typing.ClassVar[dict[int, str]] = {
867 _lib.GEN_EMAIL: "email",
868 _lib.GEN_DNS: "DNS",
869 _lib.GEN_URI: "URI",
870 }
872 def _subjectAltNameString(self) -> str:
873 names = _ffi.cast(
874 "GENERAL_NAMES*", _lib.X509V3_EXT_d2i(self._extension)
875 )
877 names = _ffi.gc(names, _lib.GENERAL_NAMES_free)
878 parts = []
879 for i in range(_lib.sk_GENERAL_NAME_num(names)):
880 name = _lib.sk_GENERAL_NAME_value(names, i)
881 try:
882 label = self._prefixes[name.type]
883 except KeyError:
884 bio = _new_mem_buf()
885 _lib.GENERAL_NAME_print(bio, name)
886 parts.append(_bio_to_string(bio).decode("utf-8"))
887 else:
888 value = _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[
889 :
890 ].decode("utf-8")
891 parts.append(label + ":" + value)
892 return ", ".join(parts)
894 def __str__(self) -> str:
895 """
896 :return: a nice text representation of the extension
897 """
898 if _lib.NID_subject_alt_name == self._nid:
899 return self._subjectAltNameString()
901 bio = _new_mem_buf()
902 print_result = _lib.X509V3_EXT_print(bio, self._extension, 0, 0)
903 _openssl_assert(print_result != 0)
905 return _bio_to_string(bio).decode("utf-8")
907 def get_critical(self) -> bool:
908 """
909 Returns the critical field of this X.509 extension.
911 :return: The critical field.
912 """
913 return _lib.X509_EXTENSION_get_critical(self._extension)
915 def get_short_name(self) -> bytes:
916 """
917 Returns the short type name of this X.509 extension.
919 The result is a byte string such as :py:const:`b"basicConstraints"`.
921 :return: The short type name.
922 :rtype: :py:data:`bytes`
924 .. versionadded:: 0.12
925 """
926 obj = _lib.X509_EXTENSION_get_object(self._extension)
927 nid = _lib.OBJ_obj2nid(obj)
928 # OpenSSL 3.1.0 has a bug where nid2sn returns NULL for NIDs that
929 # previously returned UNDEF. This is a workaround for that issue.
930 # https://github.com/openssl/openssl/commit/908ba3ed9adbb3df90f76
931 buf = _lib.OBJ_nid2sn(nid)
932 if buf != _ffi.NULL:
933 return _ffi.string(buf)
934 else:
935 return b"UNDEF"
937 def get_data(self) -> bytes:
938 """
939 Returns the data of the X509 extension, encoded as ASN.1.
941 :return: The ASN.1 encoded data of this X509 extension.
942 :rtype: :py:data:`bytes`
944 .. versionadded:: 0.12
945 """
946 octet_result = _lib.X509_EXTENSION_get_data(self._extension)
947 string_result = _ffi.cast("ASN1_STRING*", octet_result)
948 char_result = _lib.ASN1_STRING_get0_data(string_result)
949 result_length = _lib.ASN1_STRING_length(string_result)
950 return _ffi.buffer(char_result, result_length)[:]
953@deprecated(
954 "CSR support in pyOpenSSL is deprecated. You should use the APIs "
955 "in cryptography."
956)
957class X509Req:
958 """
959 An X.509 certificate signing requests.
961 .. deprecated:: 24.2.0
962 Use `cryptography.x509.CertificateSigningRequest` instead.
963 """
965 def __init__(self) -> None:
966 req = _lib.X509_REQ_new()
967 self._req = _ffi.gc(req, _lib.X509_REQ_free)
968 # Default to version 0.
969 self.set_version(0)
971 def to_cryptography(self) -> x509.CertificateSigningRequest:
972 """
973 Export as a ``cryptography`` certificate signing request.
975 :rtype: ``cryptography.x509.CertificateSigningRequest``
977 .. versionadded:: 17.1.0
978 """
979 from cryptography.x509 import load_der_x509_csr
981 der = _dump_certificate_request_internal(FILETYPE_ASN1, self)
983 return load_der_x509_csr(der)
985 @classmethod
986 def from_cryptography(
987 cls, crypto_req: x509.CertificateSigningRequest
988 ) -> X509Req:
989 """
990 Construct based on a ``cryptography`` *crypto_req*.
992 :param crypto_req: A ``cryptography`` X.509 certificate signing request
993 :type crypto_req: ``cryptography.x509.CertificateSigningRequest``
995 :rtype: X509Req
997 .. versionadded:: 17.1.0
998 """
999 if not isinstance(crypto_req, x509.CertificateSigningRequest):
1000 raise TypeError("Must be a certificate signing request")
1002 from cryptography.hazmat.primitives.serialization import Encoding
1004 der = crypto_req.public_bytes(Encoding.DER)
1005 return _load_certificate_request_internal(FILETYPE_ASN1, der)
1007 def set_pubkey(self, pkey: PKey) -> None:
1008 """
1009 Set the public key of the certificate signing request.
1011 :param pkey: The public key to use.
1012 :type pkey: :py:class:`PKey`
1014 :return: ``None``
1015 """
1016 set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey)
1017 _openssl_assert(set_result == 1)
1019 def get_pubkey(self) -> PKey:
1020 """
1021 Get the public key of the certificate signing request.
1023 :return: The public key.
1024 :rtype: :py:class:`PKey`
1025 """
1026 pkey = PKey.__new__(PKey)
1027 pkey._pkey = _lib.X509_REQ_get_pubkey(self._req)
1028 _openssl_assert(pkey._pkey != _ffi.NULL)
1029 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
1030 pkey._only_public = True
1031 return pkey
1033 def set_version(self, version: int) -> None:
1034 """
1035 Set the version subfield (RFC 2986, section 4.1) of the certificate
1036 request.
1038 :param int version: The version number.
1039 :return: ``None``
1040 """
1041 if not isinstance(version, int):
1042 raise TypeError("version must be an int")
1043 if version != 0:
1044 raise ValueError(
1045 "Invalid version. The only valid version for X509Req is 0."
1046 )
1047 set_result = _lib.X509_REQ_set_version(self._req, version)
1048 _openssl_assert(set_result == 1)
1050 def get_version(self) -> int:
1051 """
1052 Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate
1053 request.
1055 :return: The value of the version subfield.
1056 :rtype: :py:class:`int`
1057 """
1058 return _lib.X509_REQ_get_version(self._req)
1060 def get_subject(self) -> X509Name:
1061 """
1062 Return the subject of this certificate signing request.
1064 This creates a new :class:`X509Name` that wraps the underlying subject
1065 name field on the certificate signing request. Modifying it will modify
1066 the underlying signing request, and will have the effect of modifying
1067 any other :class:`X509Name` that refers to this subject.
1069 :return: The subject of this certificate signing request.
1070 :rtype: :class:`X509Name`
1071 """
1072 name = X509Name.__new__(X509Name)
1073 name._name = _lib.X509_REQ_get_subject_name(self._req)
1074 _openssl_assert(name._name != _ffi.NULL)
1076 # The name is owned by the X509Req structure. As long as the X509Name
1077 # Python object is alive, keep the X509Req Python object alive.
1078 name._owner = self
1080 return name
1082 def add_extensions(self, extensions: Iterable[X509Extension]) -> None:
1083 """
1084 Add extensions to the certificate signing request.
1086 :param extensions: The X.509 extensions to add.
1087 :type extensions: iterable of :py:class:`X509Extension`
1088 :return: ``None``
1089 """
1090 warnings.warn(
1091 (
1092 "This API is deprecated and will be removed in a future "
1093 "version of pyOpenSSL. You should use pyca/cryptography's "
1094 "X.509 APIs instead."
1095 ),
1096 DeprecationWarning,
1097 stacklevel=2,
1098 )
1100 stack = _lib.sk_X509_EXTENSION_new_null()
1101 _openssl_assert(stack != _ffi.NULL)
1103 stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free)
1105 for ext in extensions:
1106 if not isinstance(ext, X509Extension):
1107 raise ValueError("One of the elements is not an X509Extension")
1109 # TODO push can fail (here and elsewhere)
1110 _lib.sk_X509_EXTENSION_push(stack, ext._extension)
1112 add_result = _lib.X509_REQ_add_extensions(self._req, stack)
1113 _openssl_assert(add_result == 1)
1115 def get_extensions(self) -> list[X509Extension]:
1116 """
1117 Get X.509 extensions in the certificate signing request.
1119 :return: The X.509 extensions in this request.
1120 :rtype: :py:class:`list` of :py:class:`X509Extension` objects.
1122 .. versionadded:: 0.15
1123 """
1124 warnings.warn(
1125 (
1126 "This API is deprecated and will be removed in a future "
1127 "version of pyOpenSSL. You should use pyca/cryptography's "
1128 "X.509 APIs instead."
1129 ),
1130 DeprecationWarning,
1131 stacklevel=2,
1132 )
1134 exts = []
1135 native_exts_obj = _lib.X509_REQ_get_extensions(self._req)
1136 native_exts_obj = _ffi.gc(
1137 native_exts_obj,
1138 lambda x: _lib.sk_X509_EXTENSION_pop_free(
1139 x,
1140 _ffi.addressof(_lib._original_lib, "X509_EXTENSION_free"),
1141 ),
1142 )
1144 for i in range(_lib.sk_X509_EXTENSION_num(native_exts_obj)):
1145 ext = X509Extension.__new__(X509Extension)
1146 extension = _lib.X509_EXTENSION_dup(
1147 _lib.sk_X509_EXTENSION_value(native_exts_obj, i)
1148 )
1149 ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
1150 exts.append(ext)
1151 return exts
1153 def sign(self, pkey: PKey, digest: str) -> None:
1154 """
1155 Sign the certificate signing request with this key and digest type.
1157 :param pkey: The key pair to sign with.
1158 :type pkey: :py:class:`PKey`
1159 :param digest: The name of the message digest to use for the signature,
1160 e.g. :py:data:`"sha256"`.
1161 :type digest: :py:class:`str`
1162 :return: ``None``
1163 """
1164 if pkey._only_public:
1165 raise ValueError("Key has only public part")
1167 if not pkey._initialized:
1168 raise ValueError("Key is uninitialized")
1170 digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
1171 if digest_obj == _ffi.NULL:
1172 raise ValueError("No such digest method")
1174 sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
1175 _openssl_assert(sign_result > 0)
1177 def verify(self, pkey: PKey) -> bool:
1178 """
1179 Verifies the signature on this certificate signing request.
1181 :param PKey key: A public key.
1183 :return: ``True`` if the signature is correct.
1184 :rtype: bool
1186 :raises OpenSSL.crypto.Error: If the signature is invalid or there is a
1187 problem verifying the signature.
1188 """
1189 if not isinstance(pkey, PKey):
1190 raise TypeError("pkey must be a PKey instance")
1192 result = _lib.X509_REQ_verify(self._req, pkey._pkey)
1193 if result <= 0:
1194 _raise_current_error()
1196 return result
1199class X509:
1200 """
1201 An X.509 certificate.
1202 """
1204 def __init__(self) -> None:
1205 x509 = _lib.X509_new()
1206 _openssl_assert(x509 != _ffi.NULL)
1207 self._x509 = _ffi.gc(x509, _lib.X509_free)
1209 self._issuer_invalidator = _X509NameInvalidator()
1210 self._subject_invalidator = _X509NameInvalidator()
1212 @classmethod
1213 def _from_raw_x509_ptr(cls, x509: Any) -> X509:
1214 cert = cls.__new__(cls)
1215 cert._x509 = _ffi.gc(x509, _lib.X509_free)
1216 cert._issuer_invalidator = _X509NameInvalidator()
1217 cert._subject_invalidator = _X509NameInvalidator()
1218 return cert
1220 def to_cryptography(self) -> x509.Certificate:
1221 """
1222 Export as a ``cryptography`` certificate.
1224 :rtype: ``cryptography.x509.Certificate``
1226 .. versionadded:: 17.1.0
1227 """
1228 from cryptography.x509 import load_der_x509_certificate
1230 der = dump_certificate(FILETYPE_ASN1, self)
1231 return load_der_x509_certificate(der)
1233 @classmethod
1234 def from_cryptography(cls, crypto_cert: x509.Certificate) -> X509:
1235 """
1236 Construct based on a ``cryptography`` *crypto_cert*.
1238 :param crypto_key: A ``cryptography`` X.509 certificate.
1239 :type crypto_key: ``cryptography.x509.Certificate``
1241 :rtype: X509
1243 .. versionadded:: 17.1.0
1244 """
1245 if not isinstance(crypto_cert, x509.Certificate):
1246 raise TypeError("Must be a certificate")
1248 from cryptography.hazmat.primitives.serialization import Encoding
1250 der = crypto_cert.public_bytes(Encoding.DER)
1251 return load_certificate(FILETYPE_ASN1, der)
1253 def set_version(self, version: int) -> None:
1254 """
1255 Set the version number of the certificate. Note that the
1256 version value is zero-based, eg. a value of 0 is V1.
1258 :param version: The version number of the certificate.
1259 :type version: :py:class:`int`
1261 :return: ``None``
1262 """
1263 if not isinstance(version, int):
1264 raise TypeError("version must be an integer")
1266 _openssl_assert(_lib.X509_set_version(self._x509, version) == 1)
1268 def get_version(self) -> int:
1269 """
1270 Return the version number of the certificate.
1272 :return: The version number of the certificate.
1273 :rtype: :py:class:`int`
1274 """
1275 return _lib.X509_get_version(self._x509)
1277 def get_pubkey(self) -> PKey:
1278 """
1279 Get the public key of the certificate.
1281 :return: The public key.
1282 :rtype: :py:class:`PKey`
1283 """
1284 pkey = PKey.__new__(PKey)
1285 pkey._pkey = _lib.X509_get_pubkey(self._x509)
1286 if pkey._pkey == _ffi.NULL:
1287 _raise_current_error()
1288 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
1289 pkey._only_public = True
1290 return pkey
1292 def set_pubkey(self, pkey: PKey) -> None:
1293 """
1294 Set the public key of the certificate.
1296 :param pkey: The public key.
1297 :type pkey: :py:class:`PKey`
1299 :return: :py:data:`None`
1300 """
1301 if not isinstance(pkey, PKey):
1302 raise TypeError("pkey must be a PKey instance")
1304 set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey)
1305 _openssl_assert(set_result == 1)
1307 def sign(self, pkey: PKey, digest: str) -> None:
1308 """
1309 Sign the certificate with this key and digest type.
1311 :param pkey: The key to sign with.
1312 :type pkey: :py:class:`PKey`
1314 :param digest: The name of the message digest to use.
1315 :type digest: :py:class:`str`
1317 :return: :py:data:`None`
1318 """
1319 if not isinstance(pkey, PKey):
1320 raise TypeError("pkey must be a PKey instance")
1322 if pkey._only_public:
1323 raise ValueError("Key only has public part")
1325 if not pkey._initialized:
1326 raise ValueError("Key is uninitialized")
1328 evp_md = _lib.EVP_get_digestbyname(_byte_string(digest))
1329 if evp_md == _ffi.NULL:
1330 raise ValueError("No such digest method")
1332 sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md)
1333 _openssl_assert(sign_result > 0)
1335 def get_signature_algorithm(self) -> bytes:
1336 """
1337 Return the signature algorithm used in the certificate.
1339 :return: The name of the algorithm.
1340 :rtype: :py:class:`bytes`
1342 :raises ValueError: If the signature algorithm is undefined.
1344 .. versionadded:: 0.13
1345 """
1346 sig_alg = _lib.X509_get0_tbs_sigalg(self._x509)
1347 alg = _ffi.new("ASN1_OBJECT **")
1348 _lib.X509_ALGOR_get0(alg, _ffi.NULL, _ffi.NULL, sig_alg)
1349 nid = _lib.OBJ_obj2nid(alg[0])
1350 if nid == _lib.NID_undef:
1351 raise ValueError("Undefined signature algorithm")
1352 return _ffi.string(_lib.OBJ_nid2ln(nid))
1354 def digest(self, digest_name: str) -> bytes:
1355 """
1356 Return the digest of the X509 object.
1358 :param digest_name: The name of the digest algorithm to use.
1359 :type digest_name: :py:class:`str`
1361 :return: The digest of the object, formatted as
1362 :py:const:`b":"`-delimited hex pairs.
1363 :rtype: :py:class:`bytes`
1364 """
1365 digest = _lib.EVP_get_digestbyname(_byte_string(digest_name))
1366 if digest == _ffi.NULL:
1367 raise ValueError("No such digest method")
1369 result_buffer = _ffi.new("unsigned char[]", _lib.EVP_MAX_MD_SIZE)
1370 result_length = _ffi.new("unsigned int[]", 1)
1371 result_length[0] = len(result_buffer)
1373 digest_result = _lib.X509_digest(
1374 self._x509, digest, result_buffer, result_length
1375 )
1376 _openssl_assert(digest_result == 1)
1378 return b":".join(
1379 [
1380 b16encode(ch).upper()
1381 for ch in _ffi.buffer(result_buffer, result_length[0])
1382 ]
1383 )
1385 def subject_name_hash(self) -> int:
1386 """
1387 Return the hash of the X509 subject.
1389 :return: The hash of the subject.
1390 :rtype: :py:class:`int`
1391 """
1392 return _lib.X509_subject_name_hash(self._x509)
1394 def set_serial_number(self, serial: int) -> None:
1395 """
1396 Set the serial number of the certificate.
1398 :param serial: The new serial number.
1399 :type serial: :py:class:`int`
1401 :return: :py:data`None`
1402 """
1403 if not isinstance(serial, int):
1404 raise TypeError("serial must be an integer")
1406 hex_serial = hex(serial)[2:]
1407 hex_serial_bytes = hex_serial.encode("ascii")
1409 bignum_serial = _ffi.new("BIGNUM**")
1411 # BN_hex2bn stores the result in &bignum.
1412 result = _lib.BN_hex2bn(bignum_serial, hex_serial_bytes)
1413 _openssl_assert(result != _ffi.NULL)
1415 asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL)
1416 _lib.BN_free(bignum_serial[0])
1417 _openssl_assert(asn1_serial != _ffi.NULL)
1418 asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free)
1419 set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial)
1420 _openssl_assert(set_result == 1)
1422 def get_serial_number(self) -> int:
1423 """
1424 Return the serial number of this certificate.
1426 :return: The serial number.
1427 :rtype: int
1428 """
1429 asn1_serial = _lib.X509_get_serialNumber(self._x509)
1430 bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL)
1431 try:
1432 hex_serial = _lib.BN_bn2hex(bignum_serial)
1433 try:
1434 hexstring_serial = _ffi.string(hex_serial)
1435 serial = int(hexstring_serial, 16)
1436 return serial
1437 finally:
1438 _lib.OPENSSL_free(hex_serial)
1439 finally:
1440 _lib.BN_free(bignum_serial)
1442 def gmtime_adj_notAfter(self, amount: int) -> None:
1443 """
1444 Adjust the time stamp on which the certificate stops being valid.
1446 :param int amount: The number of seconds by which to adjust the
1447 timestamp.
1448 :return: ``None``
1449 """
1450 if not isinstance(amount, int):
1451 raise TypeError("amount must be an integer")
1453 notAfter = _lib.X509_getm_notAfter(self._x509)
1454 _lib.X509_gmtime_adj(notAfter, amount)
1456 def gmtime_adj_notBefore(self, amount: int) -> None:
1457 """
1458 Adjust the timestamp on which the certificate starts being valid.
1460 :param amount: The number of seconds by which to adjust the timestamp.
1461 :return: ``None``
1462 """
1463 if not isinstance(amount, int):
1464 raise TypeError("amount must be an integer")
1466 notBefore = _lib.X509_getm_notBefore(self._x509)
1467 _lib.X509_gmtime_adj(notBefore, amount)
1469 def has_expired(self) -> bool:
1470 """
1471 Check whether the certificate has expired.
1473 :return: ``True`` if the certificate has expired, ``False`` otherwise.
1474 :rtype: bool
1475 """
1476 time_bytes = self.get_notAfter()
1477 if time_bytes is None:
1478 raise ValueError("Unable to determine notAfter")
1479 time_string = time_bytes.decode("utf-8")
1480 not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
1482 UTC = datetime.timezone.utc
1483 utcnow = datetime.datetime.now(UTC).replace(tzinfo=None)
1484 return not_after < utcnow
1486 def _get_boundary_time(self, which: Any) -> bytes | None:
1487 return _get_asn1_time(which(self._x509))
1489 def get_notBefore(self) -> bytes | None:
1490 """
1491 Get the timestamp at which the certificate starts being valid.
1493 The timestamp is formatted as an ASN.1 TIME::
1495 YYYYMMDDhhmmssZ
1497 :return: A timestamp string, or ``None`` if there is none.
1498 :rtype: bytes or NoneType
1499 """
1500 return self._get_boundary_time(_lib.X509_getm_notBefore)
1502 def _set_boundary_time(
1503 self, which: Callable[..., Any], when: bytes
1504 ) -> None:
1505 return _set_asn1_time(which(self._x509), when)
1507 def set_notBefore(self, when: bytes) -> None:
1508 """
1509 Set the timestamp at which the certificate starts being valid.
1511 The timestamp is formatted as an ASN.1 TIME::
1513 YYYYMMDDhhmmssZ
1515 :param bytes when: A timestamp string.
1516 :return: ``None``
1517 """
1518 return self._set_boundary_time(_lib.X509_getm_notBefore, when)
1520 def get_notAfter(self) -> bytes | None:
1521 """
1522 Get the timestamp at which the certificate stops being valid.
1524 The timestamp is formatted as an ASN.1 TIME::
1526 YYYYMMDDhhmmssZ
1528 :return: A timestamp string, or ``None`` if there is none.
1529 :rtype: bytes or NoneType
1530 """
1531 return self._get_boundary_time(_lib.X509_getm_notAfter)
1533 def set_notAfter(self, when: bytes) -> None:
1534 """
1535 Set the timestamp at which the certificate stops being valid.
1537 The timestamp is formatted as an ASN.1 TIME::
1539 YYYYMMDDhhmmssZ
1541 :param bytes when: A timestamp string.
1542 :return: ``None``
1543 """
1544 return self._set_boundary_time(_lib.X509_getm_notAfter, when)
1546 def _get_name(self, which: Any) -> X509Name:
1547 name = X509Name.__new__(X509Name)
1548 name._name = which(self._x509)
1549 _openssl_assert(name._name != _ffi.NULL)
1551 # The name is owned by the X509 structure. As long as the X509Name
1552 # Python object is alive, keep the X509 Python object alive.
1553 name._owner = self
1555 return name
1557 def _set_name(self, which: Any, name: X509Name) -> None:
1558 if not isinstance(name, X509Name):
1559 raise TypeError("name must be an X509Name")
1560 set_result = which(self._x509, name._name)
1561 _openssl_assert(set_result == 1)
1563 def get_issuer(self) -> X509Name:
1564 """
1565 Return the issuer of this certificate.
1567 This creates a new :class:`X509Name` that wraps the underlying issuer
1568 name field on the certificate. Modifying it will modify the underlying
1569 certificate, and will have the effect of modifying any other
1570 :class:`X509Name` that refers to this issuer.
1572 :return: The issuer of this certificate.
1573 :rtype: :class:`X509Name`
1574 """
1575 name = self._get_name(_lib.X509_get_issuer_name)
1576 self._issuer_invalidator.add(name)
1577 return name
1579 def set_issuer(self, issuer: X509Name) -> None:
1580 """
1581 Set the issuer of this certificate.
1583 :param issuer: The issuer.
1584 :type issuer: :py:class:`X509Name`
1586 :return: ``None``
1587 """
1588 self._set_name(_lib.X509_set_issuer_name, issuer)
1589 self._issuer_invalidator.clear()
1591 def get_subject(self) -> X509Name:
1592 """
1593 Return the subject of this certificate.
1595 This creates a new :class:`X509Name` that wraps the underlying subject
1596 name field on the certificate. Modifying it will modify the underlying
1597 certificate, and will have the effect of modifying any other
1598 :class:`X509Name` that refers to this subject.
1600 :return: The subject of this certificate.
1601 :rtype: :class:`X509Name`
1602 """
1603 name = self._get_name(_lib.X509_get_subject_name)
1604 self._subject_invalidator.add(name)
1605 return name
1607 def set_subject(self, subject: X509Name) -> None:
1608 """
1609 Set the subject of this certificate.
1611 :param subject: The subject.
1612 :type subject: :py:class:`X509Name`
1614 :return: ``None``
1615 """
1616 self._set_name(_lib.X509_set_subject_name, subject)
1617 self._subject_invalidator.clear()
1619 def get_extension_count(self) -> int:
1620 """
1621 Get the number of extensions on this certificate.
1623 :return: The number of extensions.
1624 :rtype: :py:class:`int`
1626 .. versionadded:: 0.12
1627 """
1628 return _lib.X509_get_ext_count(self._x509)
1630 def add_extensions(self, extensions: Iterable[X509Extension]) -> None:
1631 """
1632 Add extensions to the certificate.
1634 :param extensions: The extensions to add.
1635 :type extensions: An iterable of :py:class:`X509Extension` objects.
1636 :return: ``None``
1637 """
1638 warnings.warn(
1639 (
1640 "This API is deprecated and will be removed in a future "
1641 "version of pyOpenSSL. You should use pyca/cryptography's "
1642 "X.509 APIs instead."
1643 ),
1644 DeprecationWarning,
1645 stacklevel=2,
1646 )
1648 for ext in extensions:
1649 if not isinstance(ext, X509Extension):
1650 raise ValueError("One of the elements is not an X509Extension")
1652 add_result = _lib.X509_add_ext(self._x509, ext._extension, -1)
1653 _openssl_assert(add_result == 1)
1655 def get_extension(self, index: int) -> X509Extension:
1656 """
1657 Get a specific extension of the certificate by index.
1659 Extensions on a certificate are kept in order. The index
1660 parameter selects which extension will be returned.
1662 :param int index: The index of the extension to retrieve.
1663 :return: The extension at the specified index.
1664 :rtype: :py:class:`X509Extension`
1665 :raises IndexError: If the extension index was out of bounds.
1667 .. versionadded:: 0.12
1668 """
1669 warnings.warn(
1670 (
1671 "This API is deprecated and will be removed in a future "
1672 "version of pyOpenSSL. You should use pyca/cryptography's "
1673 "X.509 APIs instead."
1674 ),
1675 DeprecationWarning,
1676 stacklevel=2,
1677 )
1679 ext = X509Extension.__new__(X509Extension)
1680 ext._extension = _lib.X509_get_ext(self._x509, index)
1681 if ext._extension == _ffi.NULL:
1682 raise IndexError("extension index out of bounds")
1684 extension = _lib.X509_EXTENSION_dup(ext._extension)
1685 ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
1686 return ext
1689class X509StoreFlags:
1690 """
1691 Flags for X509 verification, used to change the behavior of
1692 :class:`X509Store`.
1694 See `OpenSSL Verification Flags`_ for details.
1696 .. _OpenSSL Verification Flags:
1697 https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html
1698 """
1700 CRL_CHECK: int = _lib.X509_V_FLAG_CRL_CHECK
1701 CRL_CHECK_ALL: int = _lib.X509_V_FLAG_CRL_CHECK_ALL
1702 IGNORE_CRITICAL: int = _lib.X509_V_FLAG_IGNORE_CRITICAL
1703 X509_STRICT: int = _lib.X509_V_FLAG_X509_STRICT
1704 ALLOW_PROXY_CERTS: int = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS
1705 POLICY_CHECK: int = _lib.X509_V_FLAG_POLICY_CHECK
1706 EXPLICIT_POLICY: int = _lib.X509_V_FLAG_EXPLICIT_POLICY
1707 INHIBIT_MAP: int = _lib.X509_V_FLAG_INHIBIT_MAP
1708 CHECK_SS_SIGNATURE: int = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE
1709 PARTIAL_CHAIN: int = _lib.X509_V_FLAG_PARTIAL_CHAIN
1712class X509Store:
1713 """
1714 An X.509 store.
1716 An X.509 store is used to describe a context in which to verify a
1717 certificate. A description of a context may include a set of certificates
1718 to trust, a set of certificate revocation lists, verification flags and
1719 more.
1721 An X.509 store, being only a description, cannot be used by itself to
1722 verify a certificate. To carry out the actual verification process, see
1723 :class:`X509StoreContext`.
1724 """
1726 def __init__(self) -> None:
1727 store = _lib.X509_STORE_new()
1728 self._store = _ffi.gc(store, _lib.X509_STORE_free)
1730 def add_cert(self, cert: X509) -> None:
1731 """
1732 Adds a trusted certificate to this store.
1734 Adding a certificate with this method adds this certificate as a
1735 *trusted* certificate.
1737 :param X509 cert: The certificate to add to this store.
1739 :raises TypeError: If the certificate is not an :class:`X509`.
1741 :raises OpenSSL.crypto.Error: If OpenSSL was unhappy with your
1742 certificate.
1744 :return: ``None`` if the certificate was added successfully.
1745 """
1746 if not isinstance(cert, X509):
1747 raise TypeError()
1749 res = _lib.X509_STORE_add_cert(self._store, cert._x509)
1750 _openssl_assert(res == 1)
1752 def add_crl(self, crl: x509.CertificateRevocationList) -> None:
1753 """
1754 Add a certificate revocation list to this store.
1756 The certificate revocation lists added to a store will only be used if
1757 the associated flags are configured to check certificate revocation
1758 lists.
1760 .. versionadded:: 16.1.0
1762 :param crl: The certificate revocation list to add to this store.
1763 :type crl: ``cryptography.x509.CertificateRevocationList``
1764 :return: ``None`` if the certificate revocation list was added
1765 successfully.
1766 """
1767 if isinstance(crl, x509.CertificateRevocationList):
1768 from cryptography.hazmat.primitives.serialization import Encoding
1770 bio = _new_mem_buf(crl.public_bytes(Encoding.DER))
1771 openssl_crl = _lib.d2i_X509_CRL_bio(bio, _ffi.NULL)
1772 _openssl_assert(openssl_crl != _ffi.NULL)
1773 crl = _ffi.gc(openssl_crl, _lib.X509_CRL_free)
1774 else:
1775 raise TypeError(
1776 "CRL must be of type "
1777 "cryptography.x509.CertificateRevocationList"
1778 )
1780 _openssl_assert(_lib.X509_STORE_add_crl(self._store, crl) != 0)
1782 def set_flags(self, flags: int) -> None:
1783 """
1784 Set verification flags to this store.
1786 Verification flags can be combined by oring them together.
1788 .. note::
1790 Setting a verification flag sometimes requires clients to add
1791 additional information to the store, otherwise a suitable error will
1792 be raised.
1794 For example, in setting flags to enable CRL checking a
1795 suitable CRL must be added to the store otherwise an error will be
1796 raised.
1798 .. versionadded:: 16.1.0
1800 :param int flags: The verification flags to set on this store.
1801 See :class:`X509StoreFlags` for available constants.
1802 :return: ``None`` if the verification flags were successfully set.
1803 """
1804 _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0)
1806 def set_time(self, vfy_time: datetime.datetime) -> None:
1807 """
1808 Set the time against which the certificates are verified.
1810 Normally the current time is used.
1812 .. note::
1814 For example, you can determine if a certificate was valid at a given
1815 time.
1817 .. versionadded:: 17.0.0
1819 :param datetime vfy_time: The verification time to set on this store.
1820 :return: ``None`` if the verification time was successfully set.
1821 """
1822 param = _lib.X509_VERIFY_PARAM_new()
1823 param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free)
1825 _lib.X509_VERIFY_PARAM_set_time(
1826 param, calendar.timegm(vfy_time.timetuple())
1827 )
1828 _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)
1830 def load_locations(
1831 self,
1832 cafile: StrOrBytesPath | None,
1833 capath: StrOrBytesPath | None = None,
1834 ) -> None:
1835 """
1836 Let X509Store know where we can find trusted certificates for the
1837 certificate chain. Note that the certificates have to be in PEM
1838 format.
1840 If *capath* is passed, it must be a directory prepared using the
1841 ``c_rehash`` tool included with OpenSSL. Either, but not both, of
1842 *cafile* or *capath* may be ``None``.
1844 .. note::
1846 Both *cafile* and *capath* may be set simultaneously.
1848 Call this method multiple times to add more than one location.
1849 For example, CA certificates, and certificate revocation list bundles
1850 may be passed in *cafile* in subsequent calls to this method.
1852 .. versionadded:: 20.0
1854 :param cafile: In which file we can find the certificates (``bytes`` or
1855 ``unicode``).
1856 :param capath: In which directory we can find the certificates
1857 (``bytes`` or ``unicode``).
1859 :return: ``None`` if the locations were set successfully.
1861 :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None``
1862 or the locations could not be set for any reason.
1864 """
1865 if cafile is None:
1866 cafile = _ffi.NULL
1867 else:
1868 cafile = _path_bytes(cafile)
1870 if capath is None:
1871 capath = _ffi.NULL
1872 else:
1873 capath = _path_bytes(capath)
1875 load_result = _lib.X509_STORE_load_locations(
1876 self._store, cafile, capath
1877 )
1878 if not load_result:
1879 _raise_current_error()
1882class X509StoreContextError(Exception):
1883 """
1884 An exception raised when an error occurred while verifying a certificate
1885 using `OpenSSL.X509StoreContext.verify_certificate`.
1887 :ivar certificate: The certificate which caused verificate failure.
1888 :type certificate: :class:`X509`
1889 """
1891 def __init__(
1892 self, message: str, errors: list[Any], certificate: X509
1893 ) -> None:
1894 super().__init__(message)
1895 self.errors = errors
1896 self.certificate = certificate
1899class X509StoreContext:
1900 """
1901 An X.509 store context.
1903 An X.509 store context is used to carry out the actual verification process
1904 of a certificate in a described context. For describing such a context, see
1905 :class:`X509Store`.
1907 :param X509Store store: The certificates which will be trusted for the
1908 purposes of any verifications.
1909 :param X509 certificate: The certificate to be verified.
1910 :param chain: List of untrusted certificates that may be used for building
1911 the certificate chain. May be ``None``.
1912 :type chain: :class:`list` of :class:`X509`
1913 """
1915 def __init__(
1916 self,
1917 store: X509Store,
1918 certificate: X509,
1919 chain: Sequence[X509] | None = None,
1920 ) -> None:
1921 self._store = store
1922 self._cert = certificate
1923 self._chain = self._build_certificate_stack(chain)
1925 @staticmethod
1926 def _build_certificate_stack(
1927 certificates: Sequence[X509] | None,
1928 ) -> None:
1929 def cleanup(s: Any) -> None:
1930 # Equivalent to sk_X509_pop_free, but we don't
1931 # currently have a CFFI binding for that available
1932 for i in range(_lib.sk_X509_num(s)):
1933 x = _lib.sk_X509_value(s, i)
1934 _lib.X509_free(x)
1935 _lib.sk_X509_free(s)
1937 if certificates is None or len(certificates) == 0:
1938 return _ffi.NULL
1940 stack = _lib.sk_X509_new_null()
1941 _openssl_assert(stack != _ffi.NULL)
1942 stack = _ffi.gc(stack, cleanup)
1944 for cert in certificates:
1945 if not isinstance(cert, X509):
1946 raise TypeError("One of the elements is not an X509 instance")
1948 _openssl_assert(_lib.X509_up_ref(cert._x509) > 0)
1949 if _lib.sk_X509_push(stack, cert._x509) <= 0:
1950 _lib.X509_free(cert._x509)
1951 _raise_current_error()
1953 return stack
1955 @staticmethod
1956 def _exception_from_context(store_ctx: Any) -> X509StoreContextError:
1957 """
1958 Convert an OpenSSL native context error failure into a Python
1959 exception.
1961 When a call to native OpenSSL X509_verify_cert fails, additional
1962 information about the failure can be obtained from the store context.
1963 """
1964 message = _ffi.string(
1965 _lib.X509_verify_cert_error_string(
1966 _lib.X509_STORE_CTX_get_error(store_ctx)
1967 )
1968 ).decode("utf-8")
1969 errors = [
1970 _lib.X509_STORE_CTX_get_error(store_ctx),
1971 _lib.X509_STORE_CTX_get_error_depth(store_ctx),
1972 message,
1973 ]
1974 # A context error should always be associated with a certificate, so we
1975 # expect this call to never return :class:`None`.
1976 _x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
1977 _cert = _lib.X509_dup(_x509)
1978 pycert = X509._from_raw_x509_ptr(_cert)
1979 return X509StoreContextError(message, errors, pycert)
1981 def _verify_certificate(self) -> Any:
1982 """
1983 Verifies the certificate and runs an X509_STORE_CTX containing the
1984 results.
1986 :raises X509StoreContextError: If an error occurred when validating a
1987 certificate in the context. Sets ``certificate`` attribute to
1988 indicate which certificate caused the error.
1989 """
1990 store_ctx = _lib.X509_STORE_CTX_new()
1991 _openssl_assert(store_ctx != _ffi.NULL)
1992 store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)
1994 ret = _lib.X509_STORE_CTX_init(
1995 store_ctx, self._store._store, self._cert._x509, self._chain
1996 )
1997 _openssl_assert(ret == 1)
1999 ret = _lib.X509_verify_cert(store_ctx)
2000 if ret <= 0:
2001 raise self._exception_from_context(store_ctx)
2003 return store_ctx
2005 def set_store(self, store: X509Store) -> None:
2006 """
2007 Set the context's X.509 store.
2009 .. versionadded:: 0.15
2011 :param X509Store store: The store description which will be used for
2012 the purposes of any *future* verifications.
2013 """
2014 self._store = store
2016 def verify_certificate(self) -> None:
2017 """
2018 Verify a certificate in a context.
2020 .. versionadded:: 0.15
2022 :raises X509StoreContextError: If an error occurred when validating a
2023 certificate in the context. Sets ``certificate`` attribute to
2024 indicate which certificate caused the error.
2025 """
2026 self._verify_certificate()
2028 def get_verified_chain(self) -> list[X509]:
2029 """
2030 Verify a certificate in a context and return the complete validated
2031 chain.
2033 :raises X509StoreContextError: If an error occurred when validating a
2034 certificate in the context. Sets ``certificate`` attribute to
2035 indicate which certificate caused the error.
2037 .. versionadded:: 20.0
2038 """
2039 store_ctx = self._verify_certificate()
2041 # Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain.
2042 cert_stack = _lib.X509_STORE_CTX_get1_chain(store_ctx)
2043 _openssl_assert(cert_stack != _ffi.NULL)
2045 result = []
2046 for i in range(_lib.sk_X509_num(cert_stack)):
2047 cert = _lib.sk_X509_value(cert_stack, i)
2048 _openssl_assert(cert != _ffi.NULL)
2049 pycert = X509._from_raw_x509_ptr(cert)
2050 result.append(pycert)
2052 # Free the stack but not the members which are freed by the X509 class.
2053 _lib.sk_X509_free(cert_stack)
2054 return result
2057def load_certificate(type: int, buffer: bytes) -> X509:
2058 """
2059 Load a certificate (X509) from the string *buffer* encoded with the
2060 type *type*.
2062 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
2064 :param bytes buffer: The buffer the certificate is stored in
2066 :return: The X509 object
2067 """
2068 if isinstance(buffer, str):
2069 buffer = buffer.encode("ascii")
2071 bio = _new_mem_buf(buffer)
2073 if type == FILETYPE_PEM:
2074 x509 = _lib.PEM_read_bio_X509(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
2075 elif type == FILETYPE_ASN1:
2076 x509 = _lib.d2i_X509_bio(bio, _ffi.NULL)
2077 else:
2078 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
2080 if x509 == _ffi.NULL:
2081 _raise_current_error()
2083 return X509._from_raw_x509_ptr(x509)
2086def dump_certificate(type: int, cert: X509) -> bytes:
2087 """
2088 Dump the certificate *cert* into a buffer string encoded with the type
2089 *type*.
2091 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
2092 FILETYPE_TEXT)
2093 :param cert: The certificate to dump
2094 :return: The buffer with the dumped certificate in
2095 """
2096 bio = _new_mem_buf()
2098 if type == FILETYPE_PEM:
2099 result_code = _lib.PEM_write_bio_X509(bio, cert._x509)
2100 elif type == FILETYPE_ASN1:
2101 result_code = _lib.i2d_X509_bio(bio, cert._x509)
2102 elif type == FILETYPE_TEXT:
2103 result_code = _lib.X509_print_ex(bio, cert._x509, 0, 0)
2104 else:
2105 raise ValueError(
2106 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
2107 "FILETYPE_TEXT"
2108 )
2110 _openssl_assert(result_code == 1)
2111 return _bio_to_string(bio)
2114def dump_publickey(type: int, pkey: PKey) -> bytes:
2115 """
2116 Dump a public key to a buffer.
2118 :param type: The file type (one of :data:`FILETYPE_PEM` or
2119 :data:`FILETYPE_ASN1`).
2120 :param PKey pkey: The public key to dump
2121 :return: The buffer with the dumped key in it.
2122 :rtype: bytes
2123 """
2124 bio = _new_mem_buf()
2125 if type == FILETYPE_PEM:
2126 write_bio = _lib.PEM_write_bio_PUBKEY
2127 elif type == FILETYPE_ASN1:
2128 write_bio = _lib.i2d_PUBKEY_bio
2129 else:
2130 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
2132 result_code = write_bio(bio, pkey._pkey)
2133 if result_code != 1: # pragma: no cover
2134 _raise_current_error()
2136 return _bio_to_string(bio)
2139def dump_privatekey(
2140 type: int,
2141 pkey: PKey,
2142 cipher: str | None = None,
2143 passphrase: PassphraseCallableT | None = None,
2144) -> bytes:
2145 """
2146 Dump the private key *pkey* into a buffer string encoded with the type
2147 *type*. Optionally (if *type* is :const:`FILETYPE_PEM`) encrypting it
2148 using *cipher* and *passphrase*.
2150 :param type: The file type (one of :const:`FILETYPE_PEM`,
2151 :const:`FILETYPE_ASN1`, or :const:`FILETYPE_TEXT`)
2152 :param PKey pkey: The PKey to dump
2153 :param cipher: (optional) if encrypted PEM format, the cipher to use
2154 :param passphrase: (optional) if encrypted PEM format, this can be either
2155 the passphrase to use, or a callback for providing the passphrase.
2157 :return: The buffer with the dumped key in
2158 :rtype: bytes
2159 """
2160 bio = _new_mem_buf()
2162 if not isinstance(pkey, PKey):
2163 raise TypeError("pkey must be a PKey")
2165 if cipher is not None:
2166 if passphrase is None:
2167 raise TypeError(
2168 "if a value is given for cipher "
2169 "one must also be given for passphrase"
2170 )
2171 cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher))
2172 if cipher_obj == _ffi.NULL:
2173 raise ValueError("Invalid cipher name")
2174 else:
2175 cipher_obj = _ffi.NULL
2177 helper = _PassphraseHelper(type, passphrase)
2178 if type == FILETYPE_PEM:
2179 result_code = _lib.PEM_write_bio_PrivateKey(
2180 bio,
2181 pkey._pkey,
2182 cipher_obj,
2183 _ffi.NULL,
2184 0,
2185 helper.callback,
2186 helper.callback_args,
2187 )
2188 helper.raise_if_problem()
2189 elif type == FILETYPE_ASN1:
2190 result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey)
2191 elif type == FILETYPE_TEXT:
2192 if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA:
2193 raise TypeError("Only RSA keys are supported for FILETYPE_TEXT")
2195 rsa = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free)
2196 result_code = _lib.RSA_print(bio, rsa, 0)
2197 else:
2198 raise ValueError(
2199 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
2200 "FILETYPE_TEXT"
2201 )
2203 _openssl_assert(result_code != 0)
2205 return _bio_to_string(bio)
2208class _PassphraseHelper:
2209 def __init__(
2210 self,
2211 type: int,
2212 passphrase: PassphraseCallableT | None,
2213 more_args: bool = False,
2214 truncate: bool = False,
2215 ) -> None:
2216 if type != FILETYPE_PEM and passphrase is not None:
2217 raise ValueError(
2218 "only FILETYPE_PEM key format supports encryption"
2219 )
2220 self._passphrase = passphrase
2221 self._more_args = more_args
2222 self._truncate = truncate
2223 self._problems: list[Exception] = []
2225 @property
2226 def callback(self) -> Any:
2227 if self._passphrase is None:
2228 return _ffi.NULL
2229 elif isinstance(self._passphrase, bytes) or callable(self._passphrase):
2230 return _ffi.callback("pem_password_cb", self._read_passphrase)
2231 else:
2232 raise TypeError(
2233 "Last argument must be a byte string or a callable."
2234 )
2236 @property
2237 def callback_args(self) -> Any:
2238 if self._passphrase is None:
2239 return _ffi.NULL
2240 elif isinstance(self._passphrase, bytes) or callable(self._passphrase):
2241 return _ffi.NULL
2242 else:
2243 raise TypeError(
2244 "Last argument must be a byte string or a callable."
2245 )
2247 def raise_if_problem(self, exceptionType: type[Exception] = Error) -> None:
2248 if self._problems:
2249 # Flush the OpenSSL error queue
2250 try:
2251 _exception_from_error_queue(exceptionType)
2252 except exceptionType:
2253 pass
2255 raise self._problems.pop(0)
2257 def _read_passphrase(
2258 self, buf: Any, size: int, rwflag: Any, userdata: Any
2259 ) -> int:
2260 try:
2261 if callable(self._passphrase):
2262 if self._more_args:
2263 result = self._passphrase(size, rwflag, userdata)
2264 else:
2265 result = self._passphrase(rwflag)
2266 else:
2267 assert self._passphrase is not None
2268 result = self._passphrase
2269 if not isinstance(result, bytes):
2270 raise ValueError("Bytes expected")
2271 if len(result) > size:
2272 if self._truncate:
2273 result = result[:size]
2274 else:
2275 raise ValueError(
2276 "passphrase returned by callback is too long"
2277 )
2278 for i in range(len(result)):
2279 buf[i] = result[i : i + 1]
2280 return len(result)
2281 except Exception as e:
2282 self._problems.append(e)
2283 return 0
2286def load_publickey(type: int, buffer: str | bytes) -> PKey:
2287 """
2288 Load a public key from a buffer.
2290 :param type: The file type (one of :data:`FILETYPE_PEM`,
2291 :data:`FILETYPE_ASN1`).
2292 :param buffer: The buffer the key is stored in.
2293 :type buffer: A Python string object, either unicode or bytestring.
2294 :return: The PKey object.
2295 :rtype: :class:`PKey`
2296 """
2297 if isinstance(buffer, str):
2298 buffer = buffer.encode("ascii")
2300 bio = _new_mem_buf(buffer)
2302 if type == FILETYPE_PEM:
2303 evp_pkey = _lib.PEM_read_bio_PUBKEY(
2304 bio, _ffi.NULL, _ffi.NULL, _ffi.NULL
2305 )
2306 elif type == FILETYPE_ASN1:
2307 evp_pkey = _lib.d2i_PUBKEY_bio(bio, _ffi.NULL)
2308 else:
2309 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
2311 if evp_pkey == _ffi.NULL:
2312 _raise_current_error()
2314 pkey = PKey.__new__(PKey)
2315 pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free)
2316 pkey._only_public = True
2317 return pkey
2320def load_privatekey(
2321 type: int,
2322 buffer: str | bytes,
2323 passphrase: PassphraseCallableT | None = None,
2324) -> PKey:
2325 """
2326 Load a private key (PKey) from the string *buffer* encoded with the type
2327 *type*.
2329 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
2330 :param buffer: The buffer the key is stored in
2331 :param passphrase: (optional) if encrypted PEM format, this can be
2332 either the passphrase to use, or a callback for
2333 providing the passphrase.
2335 :return: The PKey object
2336 """
2337 if isinstance(buffer, str):
2338 buffer = buffer.encode("ascii")
2340 bio = _new_mem_buf(buffer)
2342 helper = _PassphraseHelper(type, passphrase)
2343 if type == FILETYPE_PEM:
2344 evp_pkey = _lib.PEM_read_bio_PrivateKey(
2345 bio, _ffi.NULL, helper.callback, helper.callback_args
2346 )
2347 helper.raise_if_problem()
2348 elif type == FILETYPE_ASN1:
2349 evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL)
2350 else:
2351 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
2353 if evp_pkey == _ffi.NULL:
2354 _raise_current_error()
2356 pkey = PKey.__new__(PKey)
2357 pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free)
2358 return pkey
2361def dump_certificate_request(type: int, req: X509Req) -> bytes:
2362 """
2363 Dump the certificate request *req* into a buffer string encoded with the
2364 type *type*.
2366 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
2367 :param req: The certificate request to dump
2368 :return: The buffer with the dumped certificate request in
2371 .. deprecated:: 24.2.0
2372 Use `cryptography.x509.CertificateSigningRequest` instead.
2373 """
2374 bio = _new_mem_buf()
2376 if type == FILETYPE_PEM:
2377 result_code = _lib.PEM_write_bio_X509_REQ(bio, req._req)
2378 elif type == FILETYPE_ASN1:
2379 result_code = _lib.i2d_X509_REQ_bio(bio, req._req)
2380 elif type == FILETYPE_TEXT:
2381 result_code = _lib.X509_REQ_print_ex(bio, req._req, 0, 0)
2382 else:
2383 raise ValueError(
2384 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
2385 "FILETYPE_TEXT"
2386 )
2388 _openssl_assert(result_code != 0)
2390 return _bio_to_string(bio)
2393_dump_certificate_request_internal = dump_certificate_request
2395utils.deprecated(
2396 dump_certificate_request,
2397 __name__,
2398 (
2399 "CSR support in pyOpenSSL is deprecated. You should use the APIs "
2400 "in cryptography."
2401 ),
2402 DeprecationWarning,
2403 name="dump_certificate_request",
2404)
2407def load_certificate_request(type: int, buffer: bytes) -> X509Req:
2408 """
2409 Load a certificate request (X509Req) from the string *buffer* encoded with
2410 the type *type*.
2412 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
2413 :param buffer: The buffer the certificate request is stored in
2414 :return: The X509Req object
2416 .. deprecated:: 24.2.0
2417 Use `cryptography.x509.load_der_x509_csr` or
2418 `cryptography.x509.load_pem_x509_csr` instead.
2419 """
2420 if isinstance(buffer, str):
2421 buffer = buffer.encode("ascii")
2423 bio = _new_mem_buf(buffer)
2425 if type == FILETYPE_PEM:
2426 req = _lib.PEM_read_bio_X509_REQ(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
2427 elif type == FILETYPE_ASN1:
2428 req = _lib.d2i_X509_REQ_bio(bio, _ffi.NULL)
2429 else:
2430 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
2432 _openssl_assert(req != _ffi.NULL)
2434 x509req = X509Req.__new__(X509Req)
2435 x509req._req = _ffi.gc(req, _lib.X509_REQ_free)
2436 return x509req
2439_load_certificate_request_internal = load_certificate_request
2441utils.deprecated(
2442 load_certificate_request,
2443 __name__,
2444 (
2445 "CSR support in pyOpenSSL is deprecated. You should use the APIs "
2446 "in cryptography."
2447 ),
2448 DeprecationWarning,
2449 name="load_certificate_request",
2450)