Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/crypto.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import calendar
4import datetime
5import functools
6import sys
7import typing
8import warnings
9from base64 import b16encode
10from collections.abc import Iterable, Sequence
11from functools import partial
12from typing import (
13 Any,
14 Callable,
15 Union,
16)
# Pick an implementation of the ``deprecated`` decorator for the running
# interpreter.
if sys.version_info >= (3, 13):
    from warnings import deprecated
elif sys.version_info < (3, 8):
    # The name passed to TypeVar must match the variable it is bound to.
    _T = typing.TypeVar("_T")

    def deprecated(msg: str, **kwargs: object) -> Callable[[_T], _T]:
        """
        Fallback decorator factory: ignore *msg* and *kwargs* and return the
        decorated object unchanged.
        """
        return lambda f: f
else:
    from typing_extensions import deprecated
28from cryptography import utils, x509
29from cryptography.hazmat.primitives.asymmetric import (
30 dsa,
31 ec,
32 ed448,
33 ed25519,
34 rsa,
35)
37from OpenSSL._util import StrOrBytesPath
38from OpenSSL._util import (
39 byte_string as _byte_string,
40)
41from OpenSSL._util import (
42 exception_from_error_queue as _exception_from_error_queue,
43)
44from OpenSSL._util import (
45 ffi as _ffi,
46)
47from OpenSSL._util import (
48 lib as _lib,
49)
50from OpenSSL._util import (
51 make_assert as _make_assert,
52)
53from OpenSSL._util import (
54 path_bytes as _path_bytes,
55)
# Public names exported by a star-import of this module.
__all__ = [
    "FILETYPE_ASN1",
    "FILETYPE_PEM",
    "FILETYPE_TEXT",
    "TYPE_DSA",
    "TYPE_RSA",
    "X509",
    "Error",
    "PKey",
    "X509Extension",
    "X509Name",
    "X509Req",
    "X509Store",
    "X509StoreContext",
    "X509StoreContextError",
    "X509StoreFlags",
    "dump_certificate",
    "dump_certificate_request",
    "dump_privatekey",
    "dump_publickey",
    "get_elliptic_curve",
    "get_elliptic_curves",
    "load_certificate",
    "load_certificate_request",
    "load_privatekey",
    "load_publickey",
]
# ``cryptography`` private-key types handled by this module.
_PrivateKey = Union[
    dsa.DSAPrivateKey,
    ec.EllipticCurvePrivateKey,
    ed25519.Ed25519PrivateKey,
    ed448.Ed448PrivateKey,
    rsa.RSAPrivateKey,
]
# ``cryptography`` public-key types handled by this module.
_PublicKey = Union[
    dsa.DSAPublicKey,
    ec.EllipticCurvePublicKey,
    ed25519.Ed25519PublicKey,
    ed448.Ed448PublicKey,
    rsa.RSAPublicKey,
]
# Any supported ``cryptography`` key, private or public.
_Key = Union[_PrivateKey, _PublicKey]
# A passphrase given either directly as bytes or as a callable returning bytes.
PassphraseCallableT = Union[bytes, Callable[..., bytes]]


# Serialization format selectors, taken from the OpenSSL binding.
FILETYPE_PEM: int = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1: int = _lib.SSL_FILETYPE_ASN1

# TODO This was an API mistake.  OpenSSL has no such constant.
FILETYPE_TEXT = 2**16 - 1

# Key type identifiers, taken from the OpenSSL binding's EVP_PKEY_* values.
TYPE_RSA: int = _lib.EVP_PKEY_RSA
TYPE_DSA: int = _lib.EVP_PKEY_DSA
TYPE_DH: int = _lib.EVP_PKEY_DH
TYPE_EC: int = _lib.EVP_PKEY_EC
class Error(Exception):
    """
    An error occurred in an `OpenSSL.crypto` API.
    """


# ``exception_from_error_queue`` pre-bound to this module's ``Error`` type;
# called with no arguments throughout this module.
_raise_current_error = partial(_exception_from_error_queue, Error)
# Assertion helper built by ``make_assert`` for ``Error``; it is called with a
# single boolean. NOTE(review): presumably raises ``Error`` when the boolean
# is false -- confirm in ``OpenSSL._util``.
_openssl_assert = _make_assert(Error)
def _new_mem_buf(buffer: bytes | None = None) -> Any:
    """
    Create an OpenSSL memory BIO whose lifetime is managed by the garbage
    collector.

    :param buffer: Optional bytes to load into the BIO so that they can be
        read back out. With ``None`` a fresh memory BIO is created instead.
    :return: The BIO pointer, wrapped so it is freed automatically.
    """
    if buffer is not None:
        backing = _ffi.new("char[]", buffer)
        bio = _lib.BIO_new_mem_buf(backing, len(buffer))

        # The default argument pins ``backing`` so the memory stays alive
        # for as long as the BIO does.
        def release(bio: Any, ref: Any = backing) -> Any:
            return _lib.BIO_free(bio)

    else:
        bio = _lib.BIO_new(_lib.BIO_s_mem())
        release = _lib.BIO_free

    _openssl_assert(bio != _ffi.NULL)

    return _ffi.gc(bio, release)
def _bio_to_string(bio: Any) -> bytes:
    """
    Return a copy of the data held by an OpenSSL memory BIO as ``bytes``.
    """
    data_ptr = _ffi.new("char**")
    size = _lib.BIO_get_mem_data(bio, data_ptr)
    contents = _ffi.buffer(data_ptr[0], size)
    # Slicing the cffi buffer copies the data into a Python bytes object.
    return contents[:]
161def _set_asn1_time(boundary: Any, when: bytes) -> None:
162 """
163 The the time value of an ASN1 time object.
165 @param boundary: An ASN1_TIME pointer (or an object safely
166 castable to that type) which will have its value set.
167 @param when: A string representation of the desired time value.
169 @raise TypeError: If C{when} is not a L{bytes} string.
170 @raise ValueError: If C{when} does not represent a time in the required
171 format.
172 @raise RuntimeError: If the time value cannot be set for some other
173 (unspecified) reason.
174 """
175 if not isinstance(when, bytes):
176 raise TypeError("when must be a byte string")
177 # ASN1_TIME_set_string validates the string without writing anything
178 # when the destination is NULL.
179 _openssl_assert(boundary != _ffi.NULL)
181 set_result = _lib.ASN1_TIME_set_string(boundary, when)
182 if set_result == 0:
183 raise ValueError("Invalid string")
def _new_asn1_time(when: bytes) -> Any:
    """
    Allocate a new ASN1_TIME object and set it to C{when}.

    The value is set by delegating to C{_set_asn1_time}, so the same
    validation applies.

    @param when: A string representation of the desired time value.

    @raise TypeError: If C{when} is not a L{bytes} string.
    @raise ValueError: If C{when} does not represent a time in the required
        format.

    @return: The new ASN1_TIME pointer, freed automatically by the garbage
        collector.
    """
    raw_time = _lib.ASN1_TIME_new()
    _openssl_assert(raw_time != _ffi.NULL)
    managed = _ffi.gc(raw_time, _lib.ASN1_TIME_free)
    _set_asn1_time(managed, when)
    return managed
def _get_asn1_time(timestamp: Any) -> bytes | None:
    """
    Read the time value out of an ASN1 time object.

    @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to
        that type) from which the time value will be retrieved.

    @return: The time value from C{timestamp} as a L{bytes} string in
        GENERALIZEDTIME form, or C{None} if the object has zero length.
    """
    as_string = _ffi.cast("ASN1_STRING*", timestamp)

    if _lib.ASN1_STRING_length(as_string) == 0:
        return None

    if _lib.ASN1_STRING_type(as_string) == _lib.V_ASN1_GENERALIZEDTIME:
        # Already in the target representation; copy it out directly.
        return _ffi.string(_lib.ASN1_STRING_get0_data(as_string))

    # Any other time type is converted to GENERALIZEDTIME first.
    converted = _ffi.new("ASN1_GENERALIZEDTIME**")
    _lib.ASN1_TIME_to_generalizedtime(timestamp, converted)
    _openssl_assert(converted[0] != _ffi.NULL)

    converted_string = _ffi.cast("ASN1_STRING*", converted[0])
    text = _ffi.string(_lib.ASN1_STRING_get0_data(converted_string))
    # The contents have been copied; release the temporary object.
    _lib.ASN1_GENERALIZEDTIME_free(converted[0])
    return text
234class _X509NameInvalidator:
235 def __init__(self) -> None:
236 self._names: list[X509Name] = []
238 def add(self, name: X509Name) -> None:
239 self._names.append(name)
241 def clear(self) -> None:
242 for name in self._names:
243 # Breaks the object, but also prevents UAF!
244 del name._name
class PKey:
    """
    A class representing a DSA or RSA public key or key pair.
    """

    # True when this object wraps only a public key; checked by
    # ``to_cryptography_key`` and ``check``.
    _only_public = False
    # Class-level default is True; ``__init__`` resets it to False until
    # ``generate_key`` has produced a key.
    _initialized = True

    def __init__(self) -> None:
        """Allocate an empty ``EVP_PKEY`` owned by this object."""
        pkey = _lib.EVP_PKEY_new()
        _openssl_assert(pkey != _ffi.NULL)
        self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
        self._initialized = False

    def to_cryptography_key(self) -> _Key:
        """
        Export as a ``cryptography`` key.

        :rtype: One of ``cryptography``'s `key interfaces`_.

        .. _key interfaces: https://cryptography.io/en/latest/hazmat/\
            primitives/asymmetric/rsa/#key-interfaces

        .. versionadded:: 16.1.0
        """
        from cryptography.hazmat.primitives.serialization import (
            load_der_private_key,
            load_der_public_key,
        )

        # Round-trip through DER: public-only keys use the public-key
        # serialization, everything else the private-key one.
        if self._only_public:
            der = dump_publickey(FILETYPE_ASN1, self)
            return typing.cast(_Key, load_der_public_key(der))
        else:
            der = dump_privatekey(FILETYPE_ASN1, self)
            return typing.cast(_Key, load_der_private_key(der, password=None))

    @classmethod
    def from_cryptography_key(cls, crypto_key: _Key) -> PKey:
        """
        Construct based on a ``cryptography`` *crypto_key*.

        :param crypto_key: A ``cryptography`` key.
        :type crypto_key: One of ``cryptography``'s `key interfaces`_.

        :rtype: PKey

        :raises TypeError: If *crypto_key* is not one of the supported key
            types.

        .. versionadded:: 16.1.0
        """
        if not isinstance(
            crypto_key,
            (
                dsa.DSAPrivateKey,
                dsa.DSAPublicKey,
                ec.EllipticCurvePrivateKey,
                ec.EllipticCurvePublicKey,
                ed25519.Ed25519PrivateKey,
                ed25519.Ed25519PublicKey,
                ed448.Ed448PrivateKey,
                ed448.Ed448PublicKey,
                rsa.RSAPrivateKey,
                rsa.RSAPublicKey,
            ),
        ):
            raise TypeError("Unsupported key type")

        from cryptography.hazmat.primitives.serialization import (
            Encoding,
            NoEncryption,
            PrivateFormat,
            PublicFormat,
        )

        if isinstance(
            crypto_key,
            (
                dsa.DSAPublicKey,
                ec.EllipticCurvePublicKey,
                ed25519.Ed25519PublicKey,
                ed448.Ed448PublicKey,
                rsa.RSAPublicKey,
            ),
        ):
            return load_publickey(
                FILETYPE_ASN1,
                crypto_key.public_bytes(
                    Encoding.DER, PublicFormat.SubjectPublicKeyInfo
                ),
            )
        else:
            der = crypto_key.private_bytes(
                Encoding.DER, PrivateFormat.PKCS8, NoEncryption()
            )
            return load_privatekey(FILETYPE_ASN1, der)

    def generate_key(self, type: int, bits: int) -> None:
        """
        Generate a key pair of the given type, with the given number of bits.

        This generates a key "into" this object.

        :param type: The key type.
        :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA`
        :param bits: The number of bits.
        :type bits: :py:data:`int` ``>= 0``
        :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't
            of the appropriate type.
        :raises ValueError: If the number of bits isn't an integer of
            the appropriate size.
        :raises Error: If :py:data:`type` is neither :py:data:`TYPE_RSA`
            nor :py:data:`TYPE_DSA`.
        :return: ``None``
        """
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if not isinstance(bits, int):
            raise TypeError("bits must be an integer")

        if type == TYPE_RSA:
            if bits <= 0:
                raise ValueError("Invalid number of bits")

            exponent = _lib.BN_new()
            _openssl_assert(exponent != _ffi.NULL)
            exponent = _ffi.gc(exponent, _lib.BN_free)
            _openssl_assert(_lib.BN_set_word(exponent, _lib.RSA_F4) == 1)

            rsa_key = _lib.RSA_new()
            _openssl_assert(rsa_key != _ffi.NULL)
            try:
                result = _lib.RSA_generate_key_ex(
                    rsa_key, bits, exponent, _ffi.NULL
                )
                _openssl_assert(result == 1)

                # On success EVP_PKEY_assign_RSA hands ownership of rsa_key
                # to self._pkey, so it must not be freed separately.
                result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa_key)
                _openssl_assert(result == 1)
            except Exception:
                # Ownership was never transferred; do not leak the RSA struct.
                _lib.RSA_free(rsa_key)
                raise

        elif type == TYPE_DSA:
            dsa_key = _lib.DSA_new()
            _openssl_assert(dsa_key != _ffi.NULL)

            dsa_key = _ffi.gc(dsa_key, _lib.DSA_free)
            res = _lib.DSA_generate_parameters_ex(
                dsa_key, bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL
            )
            _openssl_assert(res == 1)

            _openssl_assert(_lib.DSA_generate_key(dsa_key) == 1)
            _openssl_assert(_lib.EVP_PKEY_set1_DSA(self._pkey, dsa_key) == 1)
        else:
            raise Error("No such key type")

        self._initialized = True

    def check(self) -> bool:
        """
        Check the consistency of an RSA private key.

        This is the Python equivalent of OpenSSL's ``RSA_check_key``.

        :return: ``True`` if key is consistent.

        :raise OpenSSL.crypto.Error: if the key is inconsistent.

        :raise TypeError: if the key is of a type which cannot be checked.
            Only RSA keys can currently be checked.
        """
        if self._only_public:
            raise TypeError("public key only")

        if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA:
            raise TypeError("Only RSA keys can currently be checked.")

        rsa_key = _lib.EVP_PKEY_get1_RSA(self._pkey)
        _openssl_assert(rsa_key != _ffi.NULL)
        rsa_key = _ffi.gc(rsa_key, _lib.RSA_free)
        result = _lib.RSA_check_key(rsa_key)
        if result == 1:
            return True
        _raise_current_error()

    def type(self) -> int:
        """
        Returns the type of the key

        :return: The type of the key.
        """
        return _lib.EVP_PKEY_id(self._pkey)

    def bits(self) -> int:
        """
        Returns the number of bits of the key

        :return: The number of bits of the key.
        """
        return _lib.EVP_PKEY_bits(self._pkey)
440class _EllipticCurve:
441 """
442 A representation of a supported elliptic curve.
444 @cvar _curves: :py:obj:`None` until an attempt is made to load the curves.
445 Thereafter, a :py:type:`set` containing :py:type:`_EllipticCurve`
446 instances each of which represents one curve supported by the system.
447 @type _curves: :py:type:`NoneType` or :py:type:`set`
448 """
450 _curves = None
452 def __ne__(self, other: Any) -> bool:
453 """
454 Implement cooperation with the right-hand side argument of ``!=``.
456 Python 3 seems to have dropped this cooperation in this very narrow
457 circumstance.
458 """
459 if isinstance(other, _EllipticCurve):
460 return super().__ne__(other)
461 return NotImplemented
463 @classmethod
464 def _load_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]:
465 """
466 Get the curves supported by OpenSSL.
468 :param lib: The OpenSSL library binding object.
470 :return: A :py:type:`set` of ``cls`` instances giving the names of the
471 elliptic curves the underlying library supports.
472 """
473 num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0)
474 builtin_curves = _ffi.new("EC_builtin_curve[]", num_curves)
475 # The return value on this call should be num_curves again. We
476 # could check it to make sure but if it *isn't* then.. what could
477 # we do? Abort the whole process, I suppose...? -exarkun
478 lib.EC_get_builtin_curves(builtin_curves, num_curves)
479 return set(cls.from_nid(lib, c.nid) for c in builtin_curves)
481 @classmethod
482 def _get_elliptic_curves(cls, lib: Any) -> set[_EllipticCurve]:
483 """
484 Get, cache, and return the curves supported by OpenSSL.
486 :param lib: The OpenSSL library binding object.
488 :return: A :py:type:`set` of ``cls`` instances giving the names of the
489 elliptic curves the underlying library supports.
490 """
491 if cls._curves is None:
492 cls._curves = cls._load_elliptic_curves(lib)
493 return cls._curves
495 @classmethod
496 def from_nid(cls, lib: Any, nid: int) -> _EllipticCurve:
497 """
498 Instantiate a new :py:class:`_EllipticCurve` associated with the given
499 OpenSSL NID.
501 :param lib: The OpenSSL library binding object.
503 :param nid: The OpenSSL NID the resulting curve object will represent.
504 This must be a curve NID (and not, for example, a hash NID) or
505 subsequent operations will fail in unpredictable ways.
506 :type nid: :py:class:`int`
508 :return: The curve object.
509 """
510 return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii"))
512 def __init__(self, lib: Any, nid: int, name: str) -> None:
513 """
514 :param _lib: The :py:mod:`cryptography` binding instance used to
515 interface with OpenSSL.
517 :param _nid: The OpenSSL NID identifying the curve this object
518 represents.
519 :type _nid: :py:class:`int`
521 :param name: The OpenSSL short name identifying the curve this object
522 represents.
523 :type name: :py:class:`unicode`
524 """
525 self._lib = lib
526 self._nid = nid
527 self.name = name
529 def __repr__(self) -> str:
530 return f"<Curve {self.name!r}>"
532 def _to_EC_KEY(self) -> Any:
533 """
534 Create a new OpenSSL EC_KEY structure initialized to use this curve.
536 The structure is automatically garbage collected when the Python object
537 is garbage collected.
538 """
539 key = self._lib.EC_KEY_new_by_curve_name(self._nid)
540 return _ffi.gc(key, _lib.EC_KEY_free)
@deprecated(
    "get_elliptic_curves is deprecated. You should use the APIs in "
    "cryptography instead."
)
def get_elliptic_curves() -> set[_EllipticCurve]:
    """
    Return the set of elliptic curve objects supported by the OpenSSL build
    in use.

    Each curve object identifies itself through a :py:class:`str` ``name``
    attribute.

    The curve objects are useful as values for the argument accepted by
    :py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be
    used for ECDHE key exchange.
    """
    supported = _EllipticCurve._get_elliptic_curves(_lib)
    return supported
@deprecated(
    "get_elliptic_curve is deprecated. You should use the APIs in "
    "cryptography instead."
)
def get_elliptic_curve(name: str) -> _EllipticCurve:
    """
    Look up a single curve object by its short name.

    See :py:func:`get_elliptic_curves` for information about curve objects.

    :param name: The OpenSSL short name identifying the curve object to
        retrieve.
    :type name: :py:class:`str`

    :raises ValueError: If the named curve is not supported.
    """
    match = next(
        (
            candidate
            for candidate in get_elliptic_curves()
            if candidate.name == name
        ),
        None,
    )
    if match is None:
        raise ValueError("unknown curve name", name)
    return match
@functools.total_ordering
class X509Name:
    """
    An X.509 Distinguished Name.

    :ivar countryName: The country of the entity.
    :ivar C: Alias for  :py:attr:`countryName`.

    :ivar stateOrProvinceName: The state or province of the entity.
    :ivar ST: Alias for :py:attr:`stateOrProvinceName`.

    :ivar localityName: The locality of the entity.
    :ivar L: Alias for :py:attr:`localityName`.

    :ivar organizationName: The organization name of the entity.
    :ivar O: Alias for :py:attr:`organizationName`.

    :ivar organizationalUnitName: The organizational unit of the entity.
    :ivar OU: Alias for :py:attr:`organizationalUnitName`

    :ivar commonName: The common name of the entity.
    :ivar CN: Alias for :py:attr:`commonName`.

    :ivar emailAddress: The e-mail address of the entity.
    """

    def __init__(self, name: X509Name) -> None:
        """
        Create a new X509Name, copying the given X509Name instance.

        :param name: The name to copy.
        :type name: :py:class:`X509Name`
        """
        copied = _lib.X509_NAME_dup(name._name)
        _openssl_assert(copied != _ffi.NULL)
        self._name: Any = _ffi.gc(copied, _lib.X509_NAME_free)

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Set a name component, replacing any existing entry with the same NID.

        Attributes whose name starts with ``_`` are stored as ordinary Python
        attributes. Any other name is resolved with ``OBJ_txt2nid``.

        :raises TypeError: If *name* is not exactly a :py:class:`str`.
        :raises AttributeError: If *name* is not a known object name.
        """
        if name.startswith("_"):
            return super().__setattr__(name, value)

        # Note: we really do not want str subclasses here, so we do not use
        # isinstance.
        if type(name) is not str:
            # Report the type of the offending attribute *name*.
            raise TypeError(
                f"attribute name must be string, not "
                f"'{type(name).__name__:.200}'"
            )

        nid = _lib.OBJ_txt2nid(_byte_string(name))
        if nid == _lib.NID_undef:
            # Drain whatever the failed lookup left on the error queue so it
            # does not surface later, then report the failure ourselves.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        # If there's an old entry for this NID, remove it
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)
            ent_obj = _lib.X509_NAME_ENTRY_get_object(ent)
            ent_nid = _lib.OBJ_obj2nid(ent_obj)
            if nid == ent_nid:
                ent = _lib.X509_NAME_delete_entry(self._name, i)
                _lib.X509_NAME_ENTRY_free(ent)
                break

        if isinstance(value, str):
            value = value.encode("utf-8")

        add_result = _lib.X509_NAME_add_entry_by_NID(
            self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0
        )
        if not add_result:
            _raise_current_error()

    def __getattr__(self, name: str) -> str | None:
        """
        Find attribute. An X509Name object has the following attributes:
        countryName (alias C), stateOrProvince (alias ST), locality (alias L),
        organization (alias O), organizationalUnit (alias OU), commonName
        (alias CN) and more...

        :return: The UTF-8 decoded value, or ``None`` if the name has no
            entry for that attribute.
        :raises AttributeError: If *name* is not a known object name.
        """
        nid = _lib.OBJ_txt2nid(_byte_string(name))
        if nid == _lib.NID_undef:
            # This is a bit weird.  OBJ_txt2nid indicated failure, but it seems
            # a lower level function, a2d_ASN1_OBJECT, also feels the need to
            # push something onto the error queue.  If we don't clean that up
            # now, someone else will bump into it later and be quite confused.
            # See lp#314814.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1)
        if entry_index == -1:
            return None

        entry = _lib.X509_NAME_get_entry(self._name, entry_index)
        data = _lib.X509_NAME_ENTRY_get_data(entry)

        result_buffer = _ffi.new("unsigned char**")
        data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data)
        _openssl_assert(data_length >= 0)

        try:
            result = _ffi.buffer(result_buffer[0], data_length)[:].decode(
                "utf-8"
            )
        finally:
            # XXX untested
            _lib.OPENSSL_free(result_buffer[0])
        return result

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, X509Name):
            return NotImplemented

        return _lib.X509_NAME_cmp(self._name, other._name) == 0

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, X509Name):
            return NotImplemented

        return _lib.X509_NAME_cmp(self._name, other._name) < 0

    def __repr__(self) -> str:
        """
        String representation of an X509Name
        """
        # X509_NAME_oneline is given a fixed 512-byte output buffer.
        result_buffer = _ffi.new("char[]", 512)
        format_result = _lib.X509_NAME_oneline(
            self._name, result_buffer, len(result_buffer)
        )
        _openssl_assert(format_result != _ffi.NULL)

        return "<X509Name object '{}'>".format(
            _ffi.string(result_buffer).decode("utf-8"),
        )

    def hash(self) -> int:
        """
        Return an integer representation of the first four bytes of the
        MD5 digest of the DER representation of the name.

        This is the Python equivalent of OpenSSL's ``X509_NAME_hash``.

        :return: The (integer) hash of this name.
        :rtype: :py:class:`int`
        """
        return _lib.X509_NAME_hash(self._name)

    def der(self) -> bytes:
        """
        Return the DER encoding of this name.

        :return: The DER encoded form of this name.
        :rtype: :py:class:`bytes`
        """
        result_buffer = _ffi.new("unsigned char**")
        encode_result = _lib.i2d_X509_NAME(self._name, result_buffer)
        _openssl_assert(encode_result >= 0)

        try:
            string_result = _ffi.buffer(result_buffer[0], encode_result)[:]
        finally:
            # Free the encoder's output buffer even if the copy fails.
            _lib.OPENSSL_free(result_buffer[0])
        return string_result

    def get_components(self) -> list[tuple[bytes, bytes]]:
        """
        Returns the components of this name, as a sequence of 2-tuples.

        :return: The components of this name.
        :rtype: :py:class:`list` of ``name, value`` tuples.
        """
        result = []
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)

            fname = _lib.X509_NAME_ENTRY_get_object(ent)
            fval = _lib.X509_NAME_ENTRY_get_data(ent)

            nid = _lib.OBJ_obj2nid(fname)
            name = _lib.OBJ_nid2sn(nid)

            # ffi.string does not handle strings containing NULL bytes
            # (which may have been generated by old, broken software)
            value = _ffi.buffer(
                _lib.ASN1_STRING_get0_data(fval), _lib.ASN1_STRING_length(fval)
            )[:]
            result.append((_ffi.string(name), value))

        return result
@deprecated(
    "X509Extension support in pyOpenSSL is deprecated. You should use the "
    "APIs in cryptography."
)
class X509Extension:
    """
    An X.509 v3 certificate extension.

    .. deprecated:: 23.3.0
       Use cryptography's X509 APIs instead.
    """

    def __init__(
        self,
        type_name: bytes,
        critical: bool,
        value: bytes,
        subject: X509 | None = None,
        issuer: X509 | None = None,
    ) -> None:
        """
        Initializes an X509 extension.

        :param type_name: The name of the type of extension_ to create.
        :type type_name: :py:data:`bytes`

        :param bool critical: A flag indicating whether this is a critical
            extension.

        :param value: The OpenSSL textual representation of the extension's
            value.
        :type value: :py:data:`bytes`

        :param subject: Optional X509 certificate to use as subject.
        :type subject: :py:class:`X509`

        :param issuer: Optional X509 certificate to use as issuer.
        :type issuer: :py:class:`X509`

        :raises TypeError: If *issuer* or *subject* is given but is not an
            :py:class:`X509` instance.

        .. _extension: https://www.openssl.org/docs/manmaster/man5/
            x509v3_config.html#STANDARD-EXTENSIONS
        """
        v3_ctx = _ffi.new("X509V3_CTX*")

        # A context is necessary for any extension which uses the r2i
        # conversion method.  That is, X509V3_EXT_nconf may segfault if passed
        # a NULL ctx. Start off by initializing most of the fields to NULL.
        _lib.X509V3_set_ctx(
            v3_ctx, _ffi.NULL, _ffi.NULL, _ffi.NULL, _ffi.NULL, 0
        )

        # We have no configuration database - but perhaps we should (some
        # extensions may require it).
        _lib.X509V3_set_ctx_nodb(v3_ctx)

        # Initialize the subject and issuer, if appropriate.  The context is
        # a local, and as far as I can tell none of the X509V3_* APIs invoked
        # here steal any references, so no need to mess with reference counts
        # or duplicates.
        if issuer is not None:
            if not isinstance(issuer, X509):
                raise TypeError("issuer must be an X509 instance")
            v3_ctx.issuer_cert = issuer._x509
        if subject is not None:
            if not isinstance(subject, X509):
                raise TypeError("subject must be an X509 instance")
            v3_ctx.subject_cert = subject._x509

        # There are other OpenSSL APIs which would let us pass in critical
        # separately, but they're harder to use, and since value is already
        # a pile of crappy junk smuggling a ton of utterly important
        # structured data, what's the point of trying to avoid nasty stuff
        # with strings? (However, X509V3_EXT_i2d in particular seems like
        # it would be a better API to invoke.  I do not know where to get
        # the ext_struc it desires for its last parameter, though.)
        conf_value = b"critical," + value if critical else value

        raw_extension = _lib.X509V3_EXT_nconf(
            _ffi.NULL, v3_ctx, type_name, conf_value
        )
        if raw_extension == _ffi.NULL:
            _raise_current_error()
        self._extension = _ffi.gc(raw_extension, _lib.X509_EXTENSION_free)

    @property
    def _nid(self) -> Any:
        # NID of the extension's object identifier.
        ext_object = _lib.X509_EXTENSION_get_object(self._extension)
        return _lib.OBJ_obj2nid(ext_object)

    # Text labels for the general-name types that are rendered directly from
    # their string payload in ``_subjectAltNameString``.
    _prefixes: typing.ClassVar[dict[int, str]] = {
        _lib.GEN_EMAIL: "email",
        _lib.GEN_DNS: "DNS",
        _lib.GEN_URI: "URI",
    }

    def _subjectAltNameString(self) -> str:
        """
        Render a subjectAltName extension as a comma-separated string.
        """
        general_names = _ffi.cast(
            "GENERAL_NAMES*", _lib.X509V3_EXT_d2i(self._extension)
        )
        general_names = _ffi.gc(general_names, _lib.GENERAL_NAMES_free)

        rendered = []
        for index in range(_lib.sk_GENERAL_NAME_num(general_names)):
            entry = _lib.sk_GENERAL_NAME_value(general_names, index)
            label = self._prefixes.get(entry.type)
            if label is None:
                # No label for this type: use OpenSSL's own printer.
                bio = _new_mem_buf()
                _lib.GENERAL_NAME_print(bio, entry)
                rendered.append(_bio_to_string(bio).decode("utf-8"))
            else:
                asn1_string = _ffi.cast("ASN1_STRING*", entry.d.ia5)
                raw_value = _ffi.buffer(
                    _lib.ASN1_STRING_get0_data(asn1_string),
                    _lib.ASN1_STRING_length(asn1_string),
                )[:]
                rendered.append(label + ":" + raw_value.decode("utf-8"))
        return ", ".join(rendered)

    def __str__(self) -> str:
        """
        :return: a nice text representation of the extension
        """
        if self._nid == _lib.NID_subject_alt_name:
            return self._subjectAltNameString()

        bio = _new_mem_buf()
        printed = _lib.X509V3_EXT_print(bio, self._extension, 0, 0)
        _openssl_assert(printed != 0)

        return _bio_to_string(bio).decode("utf-8")

    def get_critical(self) -> bool:
        """
        Returns the critical field of this X.509 extension.

        :return: The critical field.
        """
        return _lib.X509_EXTENSION_get_critical(self._extension)

    def get_short_name(self) -> bytes:
        """
        Returns the short type name of this X.509 extension.

        The result is a byte string such as :py:const:`b"basicConstraints"`.

        :return: The short type name.
        :rtype: :py:data:`bytes`

        .. versionadded:: 0.12
        """
        ext_object = _lib.X509_EXTENSION_get_object(self._extension)
        short_name = _lib.OBJ_nid2sn(_lib.OBJ_obj2nid(ext_object))
        # OpenSSL 3.1.0 has a bug where nid2sn returns NULL for NIDs that
        # previously returned UNDEF. This is a workaround for that issue.
        # https://github.com/openssl/openssl/commit/908ba3ed9adbb3df90f76
        if short_name == _ffi.NULL:
            return b"UNDEF"
        return _ffi.string(short_name)

    def get_data(self) -> bytes:
        """
        Returns the data of the X509 extension, encoded as ASN.1.

        :return: The ASN.1 encoded data of this X509 extension.
        :rtype: :py:data:`bytes`

        .. versionadded:: 0.12
        """
        octets = _ffi.cast(
            "ASN1_STRING*", _lib.X509_EXTENSION_get_data(self._extension)
        )
        payload = _lib.ASN1_STRING_get0_data(octets)
        payload_length = _lib.ASN1_STRING_length(octets)
        return _ffi.buffer(payload, payload_length)[:]
955@deprecated(
956 "CSR support in pyOpenSSL is deprecated. You should use the APIs "
957 "in cryptography."
958)
959class X509Req:
960 """
961 An X.509 certificate signing requests.
963 .. deprecated:: 24.2.0
964 Use `cryptography.x509.CertificateSigningRequest` instead.
965 """
def __init__(self) -> None:
    """Allocate an empty request and set its version to 0."""
    raw_req = _lib.X509_REQ_new()
    self._req = _ffi.gc(raw_req, _lib.X509_REQ_free)
    # Default to version 0.
    self.set_version(0)
def to_cryptography(self) -> x509.CertificateSigningRequest:
    """
    Export as a ``cryptography`` certificate signing request.

    The request is serialized to DER and re-parsed by ``cryptography``.

    :rtype: ``cryptography.x509.CertificateSigningRequest``

    .. versionadded:: 17.1.0
    """
    from cryptography.x509 import load_der_x509_csr

    return load_der_x509_csr(
        _dump_certificate_request_internal(FILETYPE_ASN1, self)
    )
@classmethod
def from_cryptography(
    cls, crypto_req: x509.CertificateSigningRequest
) -> X509Req:
    """
    Construct based on a ``cryptography`` *crypto_req*.

    :param crypto_req: A ``cryptography`` X.509 certificate signing request
    :type crypto_req: ``cryptography.x509.CertificateSigningRequest``

    :rtype: X509Req

    :raises TypeError: If *crypto_req* is not a
        ``cryptography.x509.CertificateSigningRequest``.

    .. versionadded:: 17.1.0
    """
    if not isinstance(crypto_req, x509.CertificateSigningRequest):
        raise TypeError("Must be a certificate signing request")

    from cryptography.hazmat.primitives.serialization import Encoding

    # Serialize to DER and load the result with this module's own loader.
    encoded = crypto_req.public_bytes(Encoding.DER)
    return _load_certificate_request_internal(FILETYPE_ASN1, encoded)
def set_pubkey(self, pkey: PKey) -> None:
    """
    Set the public key of the certificate signing request.

    :param pkey: The public key to use.
    :type pkey: :py:class:`PKey`

    :return: ``None``
    """
    _openssl_assert(_lib.X509_REQ_set_pubkey(self._req, pkey._pkey) == 1)
def get_pubkey(self) -> PKey:
    """
    Get the public key of the certificate signing request.

    :return: The public key, marked as public-only.
    :rtype: :py:class:`PKey`
    """
    raw_key = _lib.X509_REQ_get_pubkey(self._req)
    _openssl_assert(raw_key != _ffi.NULL)

    # Build the wrapper without running PKey.__init__, which would allocate
    # a fresh EVP_PKEY of its own.
    public_key = PKey.__new__(PKey)
    public_key._pkey = _ffi.gc(raw_key, _lib.EVP_PKEY_free)
    public_key._only_public = True
    return public_key
def set_version(self, version: int) -> None:
    """
    Set the version subfield (RFC 2986, section 4.1) of the certificate
    request.

    :param int version: The version number. Only ``0`` is accepted.
    :raises TypeError: If *version* is not an :py:class:`int`.
    :raises ValueError: If *version* is not ``0``.
    :return: ``None``
    """
    if not isinstance(version, int):
        raise TypeError("version must be an int")
    if version != 0:
        raise ValueError(
            "Invalid version. The only valid version for X509Req is 0."
        )
    _openssl_assert(_lib.X509_REQ_set_version(self._req, version) == 1)
def get_version(self) -> int:
    """
    Get the version subfield (RFC 2986, section 4.1) of the certificate
    request.

    :return: The value of the version subfield.
    :rtype: :py:class:`int`
    """
    version = _lib.X509_REQ_get_version(self._req)
    return version
def get_subject(self) -> X509Name:
    """
    Return the subject of this certificate signing request.

    This creates a new :class:`X509Name` that wraps the underlying subject
    name field on the certificate signing request. Modifying it will modify
    the underlying signing request, and will have the effect of modifying
    any other :class:`X509Name` that refers to this subject.

    :return: The subject of this certificate signing request.
    :rtype: :class:`X509Name`
    """
    subject_ptr = _lib.X509_REQ_get_subject_name(self._req)
    _openssl_assert(subject_ptr != _ffi.NULL)

    # Bypass X509Name.__init__, which would duplicate the name instead of
    # wrapping the request's own field.
    subject = X509Name.__new__(X509Name)
    subject._name = subject_ptr

    # The name is owned by the X509Req structure.  As long as the X509Name
    # Python object is alive, keep the X509Req Python object alive.
    subject._owner = self

    return subject
def add_extensions(self, extensions: Iterable[X509Extension]) -> None:
    """
    Add extensions to the certificate signing request.

    Deprecated: emits a :py:class:`DeprecationWarning` on every call.

    :param extensions: The X.509 extensions to add.
    :type extensions: iterable of :py:class:`X509Extension`
    :return: ``None``
    :raises ValueError: If an element of *extensions* is not an
        :py:class:`X509Extension`.
    """
    warnings.warn(
        (
            "This API is deprecated and will be removed in a future "
            "version of pyOpenSSL. You should use pyca/cryptography's "
            "X.509 APIs instead."
        ),
        DeprecationWarning,
        stacklevel=2,
    )

    stack = _lib.sk_X509_EXTENSION_new_null()
    _openssl_assert(stack != _ffi.NULL)

    # Only the stack container is freed here; the extensions pushed onto
    # it stay owned by their X509Extension wrappers.
    stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free)

    for ext in extensions:
        if not isinstance(ext, X509Extension):
            raise ValueError("One of the elements is not an X509Extension")

        # The push returns the new element count, or 0 on failure; a
        # failed push must not silently drop an extension.
        push_result = _lib.sk_X509_EXTENSION_push(stack, ext._extension)
        _openssl_assert(push_result > 0)

    add_result = _lib.X509_REQ_add_extensions(self._req, stack)
    _openssl_assert(add_result == 1)
def get_extensions(self) -> list[X509Extension]:
    """
    Collect the X.509 extensions stored in the certificate signing request.

    Deprecated: emits a :py:class:`DeprecationWarning` on every call.

    :return: The X.509 extensions in this request.
    :rtype: :py:class:`list` of :py:class:`X509Extension` objects.

    .. versionadded:: 0.15
    """
    warnings.warn(
        (
            "This API is deprecated and will be removed in a future "
            "version of pyOpenSSL. You should use pyca/cryptography's "
            "X.509 APIs instead."
        ),
        DeprecationWarning,
        stacklevel=2,
    )

    def release_stack(stack: Any) -> None:
        # Frees the stack together with every extension it holds.
        _lib.sk_X509_EXTENSION_pop_free(
            stack,
            _ffi.addressof(_lib._original_lib, "X509_EXTENSION_free"),
        )

    native_stack = _ffi.gc(
        _lib.X509_REQ_get_extensions(self._req), release_stack
    )

    collected = []
    count = _lib.sk_X509_EXTENSION_num(native_stack)
    for position in range(count):
        # Each wrapper gets its own duplicate so it does not depend on
        # the lifetime of the stack above.
        duplicate = _lib.X509_EXTENSION_dup(
            _lib.sk_X509_EXTENSION_value(native_stack, position)
        )
        wrapper = X509Extension.__new__(X509Extension)
        wrapper._extension = _ffi.gc(duplicate, _lib.X509_EXTENSION_free)
        collected.append(wrapper)
    return collected
def sign(self, pkey: PKey, digest: str) -> None:
    """
    Sign the certificate signing request with this key and digest type.

    :param pkey: The key pair to sign with.
    :type pkey: :py:class:`PKey`
    :param digest: The name of the message digest to use for the signature,
        e.g. :py:data:`"sha256"`.
    :type digest: :py:class:`str`
    :return: ``None``
    :raises TypeError: If *pkey* is not a :py:class:`PKey`.
    :raises ValueError: If *pkey* has only a public part or is
        uninitialized, or if *digest* names no known digest.
    """
    # Same guard as X509.sign, so a wrong argument fails with a clear
    # TypeError instead of an AttributeError on a private attribute.
    if not isinstance(pkey, PKey):
        raise TypeError("pkey must be a PKey instance")

    if pkey._only_public:
        raise ValueError("Key has only public part")

    if not pkey._initialized:
        raise ValueError("Key is uninitialized")

    digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
    if digest_obj == _ffi.NULL:
        raise ValueError("No such digest method")

    sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
    _openssl_assert(sign_result > 0)
def verify(self, pkey: PKey) -> bool:
    """
    Verifies the signature on this certificate signing request.

    :param PKey pkey: A public key.

    :return: ``True`` if the signature is correct.
    :rtype: bool

    :raises TypeError: If *pkey* is not a :py:class:`PKey`.
    :raises OpenSSL.crypto.Error: If the signature is invalid or there is a
        problem verifying the signature.
    """
    if not isinstance(pkey, PKey):
        raise TypeError("pkey must be a PKey instance")

    result = _lib.X509_REQ_verify(self._req, pkey._pkey)
    if result <= 0:
        _raise_current_error()

    # Return a real bool, as annotated and documented, rather than the
    # raw C int; True == 1, so existing comparisons keep working.
    return True
class X509:
    """
    An X.509 certificate.
    """

    def __init__(self) -> None:
        x509 = _lib.X509_new()
        _openssl_assert(x509 != _ffi.NULL)
        self._x509 = _ffi.gc(x509, _lib.X509_free)

        # X509Name wrappers returned by get_issuer()/get_subject() are
        # registered here; set_issuer()/set_subject() call clear() on them.
        self._issuer_invalidator = _X509NameInvalidator()
        self._subject_invalidator = _X509NameInvalidator()

    @classmethod
    def _from_raw_x509_ptr(cls, x509: Any) -> X509:
        """
        Wrap an existing native ``X509 *`` without running ``__init__``.

        The pointer is registered for release with ``X509_free`` when the
        returned object is collected.
        """
        cert = cls.__new__(cls)
        cert._x509 = _ffi.gc(x509, _lib.X509_free)
        cert._issuer_invalidator = _X509NameInvalidator()
        cert._subject_invalidator = _X509NameInvalidator()
        return cert

    def to_cryptography(self) -> x509.Certificate:
        """
        Export as a ``cryptography`` certificate.

        The certificate is serialized to DER and re-parsed.

        :rtype: ``cryptography.x509.Certificate``

        .. versionadded:: 17.1.0
        """
        from cryptography.x509 import load_der_x509_certificate

        der = dump_certificate(FILETYPE_ASN1, self)
        return load_der_x509_certificate(der)

    @classmethod
    def from_cryptography(cls, crypto_cert: x509.Certificate) -> X509:
        """
        Construct based on a ``cryptography`` *crypto_cert*.

        :param crypto_cert: A ``cryptography`` X.509 certificate.
        :type crypto_cert: ``cryptography.x509.Certificate``

        :rtype: X509
        :raises TypeError: If *crypto_cert* is not a
            ``cryptography.x509.Certificate``.

        .. versionadded:: 17.1.0
        """
        if not isinstance(crypto_cert, x509.Certificate):
            raise TypeError("Must be a certificate")

        from cryptography.hazmat.primitives.serialization import Encoding

        der = crypto_cert.public_bytes(Encoding.DER)
        return load_certificate(FILETYPE_ASN1, der)

    def set_version(self, version: int) -> None:
        """
        Set the version number of the certificate. Note that the
        version value is zero-based, eg. a value of 0 is V1.

        :param version: The version number of the certificate.
        :type version: :py:class:`int`

        :return: ``None``
        :raises TypeError: If *version* is not an :py:class:`int`.
        """
        if not isinstance(version, int):
            raise TypeError("version must be an integer")

        _openssl_assert(_lib.X509_set_version(self._x509, version) == 1)

    def get_version(self) -> int:
        """
        Return the version number of the certificate.

        :return: The version number of the certificate.
        :rtype: :py:class:`int`
        """
        return _lib.X509_get_version(self._x509)

    def get_pubkey(self) -> PKey:
        """
        Get the public key of the certificate.

        :return: The public key.
        :rtype: :py:class:`PKey`
        """
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.X509_get_pubkey(self._x509)
        if pkey._pkey == _ffi.NULL:
            _raise_current_error()
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_pubkey(self, pkey: PKey) -> None:
        """
        Set the public key of the certificate.

        :param pkey: The public key.
        :type pkey: :py:class:`PKey`

        :return: :py:data:`None`
        :raises TypeError: If *pkey* is not a :py:class:`PKey`.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey)
        _openssl_assert(set_result == 1)

    def sign(self, pkey: PKey, digest: str) -> None:
        """
        Sign the certificate with this key and digest type.

        :param pkey: The key to sign with.
        :type pkey: :py:class:`PKey`

        :param digest: The name of the message digest to use.
        :type digest: :py:class:`str`

        :return: :py:data:`None`
        :raises TypeError: If *pkey* is not a :py:class:`PKey`.
        :raises ValueError: If *pkey* has only a public part or is
            uninitialized, or if *digest* names no known digest.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        if pkey._only_public:
            raise ValueError("Key only has public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        evp_md = _lib.EVP_get_digestbyname(_byte_string(digest))
        if evp_md == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md)
        _openssl_assert(sign_result > 0)

    def get_signature_algorithm(self) -> bytes:
        """
        Return the signature algorithm used in the certificate.

        :return: The name of the algorithm.
        :rtype: :py:class:`bytes`

        :raises ValueError: If the signature algorithm is undefined.

        .. versionadded:: 0.13
        """
        sig_alg = _lib.X509_get0_tbs_sigalg(self._x509)
        alg = _ffi.new("ASN1_OBJECT **")
        # Only the algorithm object is requested; the two parameter
        # out-arguments are passed as NULL.
        _lib.X509_ALGOR_get0(alg, _ffi.NULL, _ffi.NULL, sig_alg)
        nid = _lib.OBJ_obj2nid(alg[0])
        if nid == _lib.NID_undef:
            raise ValueError("Undefined signature algorithm")
        return _ffi.string(_lib.OBJ_nid2ln(nid))

    def digest(self, digest_name: str) -> bytes:
        """
        Return the digest of the X509 object.

        :param digest_name: The name of the digest algorithm to use.
        :type digest_name: :py:class:`str`

        :return: The digest of the object, formatted as
            :py:const:`b":"`-delimited hex pairs.
        :rtype: :py:class:`bytes`
        :raises ValueError: If *digest_name* names no known digest.
        """
        digest = _lib.EVP_get_digestbyname(_byte_string(digest_name))
        if digest == _ffi.NULL:
            raise ValueError("No such digest method")

        # The buffer is sized for the largest digest; result_length is
        # then read back to slice out the bytes actually produced.
        result_buffer = _ffi.new("unsigned char[]", _lib.EVP_MAX_MD_SIZE)
        result_length = _ffi.new("unsigned int[]", 1)
        result_length[0] = len(result_buffer)

        digest_result = _lib.X509_digest(
            self._x509, digest, result_buffer, result_length
        )
        _openssl_assert(digest_result == 1)

        return b":".join(
            [
                b16encode(ch).upper()
                for ch in _ffi.buffer(result_buffer, result_length[0])
            ]
        )

    def subject_name_hash(self) -> int:
        """
        Return the hash of the X509 subject.

        :return: The hash of the subject.
        :rtype: :py:class:`int`
        """
        return _lib.X509_subject_name_hash(self._x509)

    def set_serial_number(self, serial: int) -> None:
        """
        Set the serial number of the certificate.

        :param serial: The new serial number.
        :type serial: :py:class:`int`

        :return: :py:data:`None`
        :raises TypeError: If *serial* is not an :py:class:`int`.
        """
        if not isinstance(serial, int):
            raise TypeError("serial must be an integer")

        # format(..., "x") renders a negative value as "-ff", which
        # BN_hex2bn accepts. Slicing hex() instead would leave "xff",
        # which parses as nothing and leaves the BIGNUM pointer unset.
        hex_serial_bytes = format(serial, "x").encode("ascii")

        bignum_serial = _ffi.new("BIGNUM**")

        # BN_hex2bn stores the result in &bignum and returns the number of
        # characters it consumed, or 0 on error.
        result = _lib.BN_hex2bn(bignum_serial, hex_serial_bytes)
        _openssl_assert(result != 0)

        asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL)
        # The BIGNUM is only an intermediate; free it before asserting so
        # it is released on the failure path too.
        _lib.BN_free(bignum_serial[0])
        _openssl_assert(asn1_serial != _ffi.NULL)
        asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free)
        set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial)
        _openssl_assert(set_result == 1)

    def get_serial_number(self) -> int:
        """
        Return the serial number of this certificate.

        :return: The serial number.
        :rtype: int
        """
        asn1_serial = _lib.X509_get_serialNumber(self._x509)
        bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL)
        # Fail cleanly rather than hand a NULL BIGNUM to BN_bn2hex.
        _openssl_assert(bignum_serial != _ffi.NULL)
        try:
            hex_serial = _lib.BN_bn2hex(bignum_serial)
            try:
                hexstring_serial = _ffi.string(hex_serial)
                serial = int(hexstring_serial, 16)
                return serial
            finally:
                _lib.OPENSSL_free(hex_serial)
        finally:
            _lib.BN_free(bignum_serial)

    def gmtime_adj_notAfter(self, amount: int) -> None:
        """
        Adjust the time stamp on which the certificate stops being valid.

        :param int amount: The number of seconds by which to adjust the
            timestamp.
        :return: ``None``
        :raises TypeError: If *amount* is not an :py:class:`int`.
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notAfter = _lib.X509_getm_notAfter(self._x509)
        _lib.X509_gmtime_adj(notAfter, amount)

    def gmtime_adj_notBefore(self, amount: int) -> None:
        """
        Adjust the timestamp on which the certificate starts being valid.

        :param amount: The number of seconds by which to adjust the timestamp.
        :return: ``None``
        :raises TypeError: If *amount* is not an :py:class:`int`.
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notBefore = _lib.X509_getm_notBefore(self._x509)
        _lib.X509_gmtime_adj(notBefore, amount)

    def has_expired(self) -> bool:
        """
        Check whether the certificate has expired.

        :return: ``True`` if the certificate has expired, ``False`` otherwise.
        :rtype: bool
        :raises ValueError: If the certificate has no notAfter timestamp.
        """
        time_bytes = self.get_notAfter()
        if time_bytes is None:
            raise ValueError("Unable to determine notAfter")
        time_string = time_bytes.decode("utf-8")
        not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")

        # strptime yields a naive datetime, so the current UTC time is
        # made naive as well before the two are compared.
        UTC = datetime.timezone.utc
        utcnow = datetime.datetime.now(UTC).replace(tzinfo=None)
        return not_after < utcnow

    def _get_boundary_time(self, which: Any) -> bytes | None:
        """
        Read a validity timestamp; *which* is the ``_lib`` accessor that
        returns the time field for this certificate.
        """
        return _get_asn1_time(which(self._x509))

    def get_notBefore(self) -> bytes | None:
        """
        Get the timestamp at which the certificate starts being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :return: A timestamp string, or ``None`` if there is none.
        :rtype: bytes or NoneType
        """
        return self._get_boundary_time(_lib.X509_getm_notBefore)

    def _set_boundary_time(
        self, which: Callable[..., Any], when: bytes
    ) -> None:
        """
        Write a validity timestamp; *which* is the ``_lib`` accessor that
        returns the time field to overwrite with *when*.
        """
        return _set_asn1_time(which(self._x509), when)

    def set_notBefore(self, when: bytes) -> None:
        """
        Set the timestamp at which the certificate starts being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_getm_notBefore, when)

    def get_notAfter(self) -> bytes | None:
        """
        Get the timestamp at which the certificate stops being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :return: A timestamp string, or ``None`` if there is none.
        :rtype: bytes or NoneType
        """
        return self._get_boundary_time(_lib.X509_getm_notAfter)

    def set_notAfter(self, when: bytes) -> None:
        """
        Set the timestamp at which the certificate stops being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_getm_notAfter, when)

    def _get_name(self, which: Any) -> X509Name:
        """
        Wrap the name returned by the ``_lib`` accessor *which* in an
        :class:`X509Name` that keeps this certificate alive.
        """
        name = X509Name.__new__(X509Name)
        name._name = which(self._x509)
        _openssl_assert(name._name != _ffi.NULL)

        # The name is owned by the X509 structure.  As long as the X509Name
        # Python object is alive, keep the X509 Python object alive.
        name._owner = self

        return name

    def _set_name(self, which: Any, name: X509Name) -> None:
        """
        Apply *name* to this certificate through the ``_lib`` setter
        *which*.

        :raises TypeError: If *name* is not an :class:`X509Name`.
        """
        if not isinstance(name, X509Name):
            raise TypeError("name must be an X509Name")
        set_result = which(self._x509, name._name)
        _openssl_assert(set_result == 1)

    def get_issuer(self) -> X509Name:
        """
        Return the issuer of this certificate.

        This creates a new :class:`X509Name` that wraps the underlying issuer
        name field on the certificate. Modifying it will modify the underlying
        certificate, and will have the effect of modifying any other
        :class:`X509Name` that refers to this issuer.

        :return: The issuer of this certificate.
        :rtype: :class:`X509Name`
        """
        name = self._get_name(_lib.X509_get_issuer_name)
        self._issuer_invalidator.add(name)
        return name

    def set_issuer(self, issuer: X509Name) -> None:
        """
        Set the issuer of this certificate.

        :param issuer: The issuer.
        :type issuer: :py:class:`X509Name`

        :return: ``None``
        """
        self._set_name(_lib.X509_set_issuer_name, issuer)
        self._issuer_invalidator.clear()

    def get_subject(self) -> X509Name:
        """
        Return the subject of this certificate.

        This creates a new :class:`X509Name` that wraps the underlying subject
        name field on the certificate. Modifying it will modify the underlying
        certificate, and will have the effect of modifying any other
        :class:`X509Name` that refers to this subject.

        :return: The subject of this certificate.
        :rtype: :class:`X509Name`
        """
        name = self._get_name(_lib.X509_get_subject_name)
        self._subject_invalidator.add(name)
        return name

    def set_subject(self, subject: X509Name) -> None:
        """
        Set the subject of this certificate.

        :param subject: The subject.
        :type subject: :py:class:`X509Name`

        :return: ``None``
        """
        self._set_name(_lib.X509_set_subject_name, subject)
        self._subject_invalidator.clear()

    def get_extension_count(self) -> int:
        """
        Get the number of extensions on this certificate.

        :return: The number of extensions.
        :rtype: :py:class:`int`

        .. versionadded:: 0.12
        """
        return _lib.X509_get_ext_count(self._x509)

    def add_extensions(self, extensions: Iterable[X509Extension]) -> None:
        """
        Add extensions to the certificate.

        Deprecated: emits a :py:class:`DeprecationWarning` on every call.

        :param extensions: The extensions to add.
        :type extensions: An iterable of :py:class:`X509Extension` objects.
        :return: ``None``
        :raises ValueError: If an element of *extensions* is not an
            :py:class:`X509Extension`.
        """
        warnings.warn(
            (
                "This API is deprecated and will be removed in a future "
                "version of pyOpenSSL. You should use pyca/cryptography's "
                "X.509 APIs instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )

        for ext in extensions:
            if not isinstance(ext, X509Extension):
                raise ValueError("One of the elements is not an X509Extension")

            # A location of -1 appends the extension.
            add_result = _lib.X509_add_ext(self._x509, ext._extension, -1)
            _openssl_assert(add_result == 1)

    def get_extension(self, index: int) -> X509Extension:
        """
        Get a specific extension of the certificate by index.

        Extensions on a certificate are kept in order. The index
        parameter selects which extension will be returned.

        Deprecated: emits a :py:class:`DeprecationWarning` on every call.

        :param int index: The index of the extension to retrieve.
        :return: The extension at the specified index.
        :rtype: :py:class:`X509Extension`
        :raises IndexError: If the extension index was out of bounds.

        .. versionadded:: 0.12
        """
        warnings.warn(
            (
                "This API is deprecated and will be removed in a future "
                "version of pyOpenSSL. You should use pyca/cryptography's "
                "X.509 APIs instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )

        ext = X509Extension.__new__(X509Extension)
        ext._extension = _lib.X509_get_ext(self._x509, index)
        if ext._extension == _ffi.NULL:
            raise IndexError("extension index out of bounds")

        # Hand out a duplicate so the wrapper owns its extension
        # independently of this certificate.
        extension = _lib.X509_EXTENSION_dup(ext._extension)
        ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
        return ext
class X509StoreFlags:
    """
    Named verification flags accepted by :meth:`X509Store.set_flags`.

    Each attribute mirrors the OpenSSL ``X509_V_FLAG_*`` constant of the
    same name, and the values can be combined with bitwise OR.

    See `OpenSSL Verification Flags`_ for details.

    .. _OpenSSL Verification Flags:
        https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html
    """

    # Certificate revocation list checking: leaf only, or the whole chain.
    CRL_CHECK: int = _lib.X509_V_FLAG_CRL_CHECK
    CRL_CHECK_ALL: int = _lib.X509_V_FLAG_CRL_CHECK_ALL

    # Extension handling and strictness.
    IGNORE_CRITICAL: int = _lib.X509_V_FLAG_IGNORE_CRITICAL
    X509_STRICT: int = _lib.X509_V_FLAG_X509_STRICT
    ALLOW_PROXY_CERTS: int = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS

    # Certificate policy processing.
    POLICY_CHECK: int = _lib.X509_V_FLAG_POLICY_CHECK
    EXPLICIT_POLICY: int = _lib.X509_V_FLAG_EXPLICIT_POLICY
    INHIBIT_MAP: int = _lib.X509_V_FLAG_INHIBIT_MAP

    # Chain-building behaviour.
    CHECK_SS_SIGNATURE: int = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE
    PARTIAL_CHAIN: int = _lib.X509_V_FLAG_PARTIAL_CHAIN
class X509Store:
    """
    An X.509 store.

    An X.509 store is used to describe a context in which to verify a
    certificate. A description of a context may include a set of certificates
    to trust, a set of certificate revocation lists, verification flags and
    more.

    An X.509 store, being only a description, cannot be used by itself to
    verify a certificate. To carry out the actual verification process, see
    :class:`X509StoreContext`.
    """

    def __init__(self) -> None:
        store = _lib.X509_STORE_new()
        # Check the allocation like the other constructors in this module
        # do, instead of wrapping a possible NULL.
        _openssl_assert(store != _ffi.NULL)
        self._store = _ffi.gc(store, _lib.X509_STORE_free)

    def add_cert(self, cert: X509) -> None:
        """
        Adds a trusted certificate to this store.

        Adding a certificate with this method adds this certificate as a
        *trusted* certificate.

        :param X509 cert: The certificate to add to this store.

        :raises TypeError: If the certificate is not an :class:`X509`.

        :raises OpenSSL.crypto.Error: If OpenSSL was unhappy with your
            certificate.

        :return: ``None`` if the certificate was added successfully.
        """
        if not isinstance(cert, X509):
            raise TypeError("cert must be an X509 instance")

        res = _lib.X509_STORE_add_cert(self._store, cert._x509)
        _openssl_assert(res == 1)

    def add_crl(self, crl: x509.CertificateRevocationList) -> None:
        """
        Add a certificate revocation list to this store.

        The certificate revocation lists added to a store will only be used if
        the associated flags are configured to check certificate revocation
        lists.

        .. versionadded:: 16.1.0

        :param crl: The certificate revocation list to add to this store.
        :type crl: ``cryptography.x509.CertificateRevocationList``
        :return: ``None`` if the certificate revocation list was added
            successfully.
        :raises TypeError: If *crl* is not a
            ``cryptography.x509.CertificateRevocationList``.
        """
        if not isinstance(crl, x509.CertificateRevocationList):
            raise TypeError(
                "CRL must be of type "
                "cryptography.x509.CertificateRevocationList"
            )

        from cryptography.hazmat.primitives.serialization import Encoding

        # Round-trip through DER to obtain a native X509_CRL; it gets its
        # own name so the typed parameter is not rebound to a pointer.
        bio = _new_mem_buf(crl.public_bytes(Encoding.DER))
        openssl_crl = _lib.d2i_X509_CRL_bio(bio, _ffi.NULL)
        _openssl_assert(openssl_crl != _ffi.NULL)
        openssl_crl = _ffi.gc(openssl_crl, _lib.X509_CRL_free)

        _openssl_assert(
            _lib.X509_STORE_add_crl(self._store, openssl_crl) != 0
        )

    def set_flags(self, flags: int) -> None:
        """
        Set verification flags to this store.

        Verification flags can be combined by oring them together.

        .. note::

          Setting a verification flag sometimes requires clients to add
          additional information to the store, otherwise a suitable error will
          be raised.

          For example, in setting flags to enable CRL checking a
          suitable CRL must be added to the store otherwise an error will be
          raised.

        .. versionadded:: 16.1.0

        :param int flags: The verification flags to set on this store.
            See :class:`X509StoreFlags` for available constants.
        :return: ``None`` if the verification flags were successfully set.
        """
        _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0)

    def set_time(self, vfy_time: datetime.datetime) -> None:
        """
        Set the time against which the certificates are verified.

        Normally the current time is used.

        .. note::

          For example, you can determine if a certificate was valid at a given
          time.

        .. versionadded:: 17.0.0

        :param datetime vfy_time: The verification time to set on this store.
        :return: ``None`` if the verification time was successfully set.
        """
        param = _lib.X509_VERIFY_PARAM_new()
        _openssl_assert(param != _ffi.NULL)
        param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free)

        # NOTE: timetuple() carries no timezone offset, so calendar.timegm
        # reads the date and time fields of *vfy_time* as UTC.
        _lib.X509_VERIFY_PARAM_set_time(
            param, calendar.timegm(vfy_time.timetuple())
        )
        _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)

    def load_locations(
        self,
        cafile: StrOrBytesPath | None,
        capath: StrOrBytesPath | None = None,
    ) -> None:
        """
        Let X509Store know where we can find trusted certificates for the
        certificate chain.  Note that the certificates have to be in PEM
        format.

        If *capath* is passed, it must be a directory prepared using the
        ``c_rehash`` tool included with OpenSSL.  Either, but not both, of
        *cafile* or *capath* may be ``None``.

        .. note::

          Both *cafile* and *capath* may be set simultaneously.

          Call this method multiple times to add more than one location.
          For example, CA certificates, and certificate revocation list bundles
          may be passed in *cafile* in subsequent calls to this method.

        .. versionadded:: 20.0

        :param cafile: In which file we can find the certificates (``bytes`` or
                       ``unicode``).
        :param capath: In which directory we can find the certificates
                       (``bytes`` or ``unicode``).

        :return: ``None`` if the locations were set successfully.

        :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None``
            or the locations could not be set for any reason.

        """
        # A missing location is passed to OpenSSL as NULL.
        if cafile is None:
            cafile = _ffi.NULL
        else:
            cafile = _path_bytes(cafile)

        if capath is None:
            capath = _ffi.NULL
        else:
            capath = _path_bytes(capath)

        load_result = _lib.X509_STORE_load_locations(
            self._store, cafile, capath
        )
        if not load_result:
            _raise_current_error()
class X509StoreContextError(Exception):
    """
    Raised when verifying a certificate with
    :meth:`OpenSSL.crypto.X509StoreContext.verify_certificate` fails.

    :ivar errors: The error details supplied when the exception was built.
    :type errors: :class:`list`
    :ivar certificate: The certificate which caused the verification
        failure.
    :type certificate: :class:`X509`
    """

    def __init__(
        self, message: str, errors: list[Any], certificate: X509
    ) -> None:
        # Only the message is forwarded to Exception; the details are
        # kept as plain attributes.
        super().__init__(message)
        self.errors, self.certificate = errors, certificate
class X509StoreContext:
    """
    An X.509 store context.

    An X.509 store context is used to carry out the actual verification process
    of a certificate in a described context. For describing such a context, see
    :class:`X509Store`.

    :param X509Store store: The certificates which will be trusted for the
        purposes of any verifications.
    :param X509 certificate: The certificate to be verified.
    :param chain: List of untrusted certificates that may be used for building
        the certificate chain. May be ``None``.
    :type chain: :class:`list` of :class:`X509`
    """

    def __init__(
        self,
        store: X509Store,
        certificate: X509,
        chain: Sequence[X509] | None = None,
    ) -> None:
        self._store = store
        self._cert = certificate
        self._chain = self._build_certificate_stack(chain)

    @staticmethod
    def _build_certificate_stack(
        certificates: Sequence[X509] | None,
    ) -> Any:
        """
        Build a native stack holding *certificates*.

        :return: A garbage-collected stack, or ``_ffi.NULL`` when
            *certificates* is ``None`` or empty.
        :raises TypeError: If an element is not an :class:`X509`.
        """

        def cleanup(s: Any) -> None:
            # Equivalent to sk_X509_pop_free, but we don't
            # currently have a CFFI binding for that available
            for i in range(_lib.sk_X509_num(s)):
                x = _lib.sk_X509_value(s, i)
                _lib.X509_free(x)
            _lib.sk_X509_free(s)

        if certificates is None or len(certificates) == 0:
            return _ffi.NULL

        stack = _lib.sk_X509_new_null()
        _openssl_assert(stack != _ffi.NULL)
        stack = _ffi.gc(stack, cleanup)

        for cert in certificates:
            if not isinstance(cert, X509):
                raise TypeError("One of the elements is not an X509 instance")

            # The stack takes its own reference to each certificate;
            # cleanup() drops it again. If the push fails, the reference
            # is released here because cleanup() will never see it.
            _openssl_assert(_lib.X509_up_ref(cert._x509) > 0)
            if _lib.sk_X509_push(stack, cert._x509) <= 0:
                _lib.X509_free(cert._x509)
                _raise_current_error()

        return stack

    @staticmethod
    def _exception_from_context(store_ctx: Any) -> X509StoreContextError:
        """
        Convert an OpenSSL native context error failure into a Python
        exception.

        When a call to native OpenSSL X509_verify_cert fails, additional
        information about the failure can be obtained from the store context.
        """
        message = _ffi.string(
            _lib.X509_verify_cert_error_string(
                _lib.X509_STORE_CTX_get_error(store_ctx)
            )
        ).decode("utf-8")
        # errors is [error code, depth at which it occurred, message].
        errors = [
            _lib.X509_STORE_CTX_get_error(store_ctx),
            _lib.X509_STORE_CTX_get_error_depth(store_ctx),
            message,
        ]
        # A context error should always be associated with a certificate, so we
        # expect this call to never return :class:`None`.
        _x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
        # Duplicate it so the exception's certificate does not depend on
        # the store context's lifetime.
        _cert = _lib.X509_dup(_x509)
        pycert = X509._from_raw_x509_ptr(_cert)
        return X509StoreContextError(message, errors, pycert)

    def _verify_certificate(self) -> Any:
        """
        Verifies the certificate and returns an X509_STORE_CTX containing the
        results.

        :raises X509StoreContextError: If an error occurred when validating a
          certificate in the context. Sets ``certificate`` attribute to
          indicate which certificate caused the error.
        """
        store_ctx = _lib.X509_STORE_CTX_new()
        _openssl_assert(store_ctx != _ffi.NULL)
        store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)

        ret = _lib.X509_STORE_CTX_init(
            store_ctx, self._store._store, self._cert._x509, self._chain
        )
        _openssl_assert(ret == 1)

        ret = _lib.X509_verify_cert(store_ctx)
        if ret <= 0:
            raise self._exception_from_context(store_ctx)

        return store_ctx

    def set_store(self, store: X509Store) -> None:
        """
        Set the context's X.509 store.

        .. versionadded:: 0.15

        :param X509Store store: The store description which will be used for
            the purposes of any *future* verifications.
        """
        self._store = store

    def verify_certificate(self) -> None:
        """
        Verify a certificate in a context.

        .. versionadded:: 0.15

        :raises X509StoreContextError: If an error occurred when validating a
          certificate in the context. Sets ``certificate`` attribute to
          indicate which certificate caused the error.
        """
        self._verify_certificate()

    def get_verified_chain(self) -> list[X509]:
        """
        Verify a certificate in a context and return the complete validated
        chain.

        :return: The certificates of the validated chain.
        :rtype: :class:`list` of :class:`X509`
        :raises X509StoreContextError: If an error occurred when validating a
          certificate in the context. Sets ``certificate`` attribute to
          indicate which certificate caused the error.

        .. versionadded:: 20.0
        """
        store_ctx = self._verify_certificate()

        # Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain.
        cert_stack = _lib.X509_STORE_CTX_get1_chain(store_ctx)
        _openssl_assert(cert_stack != _ffi.NULL)

        result = []
        for i in range(_lib.sk_X509_num(cert_stack)):
            cert = _lib.sk_X509_value(cert_stack, i)
            _openssl_assert(cert != _ffi.NULL)
            pycert = X509._from_raw_x509_ptr(cert)
            result.append(pycert)

        # Free the stack but not the members which are freed by the X509 class.
        _lib.sk_X509_free(cert_stack)
        return result
def load_certificate(type: int, buffer: bytes) -> X509:
    """
    Parse a certificate (X509) out of *buffer*, which is encoded in the
    format selected by *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)

    :param bytes buffer: The buffer the certificate is stored in

    :return: The X509 object
    :raises ValueError: If *type* is not one of the supported file types.
    """
    # Text input is accepted too and converted to bytes first.
    encoded = buffer.encode("ascii") if isinstance(buffer, str) else buffer

    source = _new_mem_buf(encoded)

    if type == FILETYPE_PEM:
        raw_cert = _lib.PEM_read_bio_X509(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    elif type == FILETYPE_ASN1:
        raw_cert = _lib.d2i_X509_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if raw_cert == _ffi.NULL:
        _raise_current_error()

    return X509._from_raw_x509_ptr(raw_cert)
def dump_certificate(type: int, cert: X509) -> bytes:
    """
    Serialize the certificate *cert* into a byte string in the format
    selected by *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param cert: The certificate to dump
    :return: The buffer with the dumped certificate in
    :raises ValueError: If *type* is not one of the supported file types.
    """
    sink = _new_mem_buf()
    native_cert = cert._x509

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_X509(sink, native_cert)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_X509_bio(sink, native_cert)
    elif type == FILETYPE_TEXT:
        status = _lib.X509_print_ex(sink, native_cert, 0, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    _openssl_assert(status == 1)
    return _bio_to_string(sink)
def dump_publickey(type: int, pkey: PKey) -> bytes:
    """
    Serialize a public key into a byte string.

    :param type: The file type (one of :data:`FILETYPE_PEM` or
        :data:`FILETYPE_ASN1`).
    :param PKey pkey: The public key to dump
    :return: The buffer with the dumped key in it.
    :rtype: bytes
    :raises ValueError: If *type* is not one of the supported file types.
    """
    sink = _new_mem_buf()

    # Choose the writer for the requested format, then call it once below.
    if type == FILETYPE_PEM:
        writer = _lib.PEM_write_bio_PUBKEY
    elif type == FILETYPE_ASN1:
        writer = _lib.i2d_PUBKEY_bio
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    status = writer(sink, pkey._pkey)
    if status != 1:  # pragma: no cover
        _raise_current_error()

    return _bio_to_string(sink)
def dump_privatekey(
    type: int,
    pkey: PKey,
    cipher: str | None = None,
    passphrase: PassphraseCallableT | None = None,
) -> bytes:
    """
    Serialize the private key *pkey* into a byte string using the format
    selected by *type*. For :const:`FILETYPE_PEM` the output may optionally
    be encrypted with *cipher* and *passphrase*.

    :param type: The output format (one of :const:`FILETYPE_PEM`,
        :const:`FILETYPE_ASN1`, or :const:`FILETYPE_TEXT`)
    :param PKey pkey: The PKey to serialize
    :param cipher: (optional) name of the cipher used to encrypt PEM output
    :param passphrase: (optional) for encrypted PEM output, either the
        passphrase itself or a callback that provides it.

    :return: The bytes written to the memory BIO for the key
    :rtype: bytes
    :raises TypeError: If *pkey* is not a PKey, if *cipher* is given without
        *passphrase*, or if a non-RSA key is dumped as FILETYPE_TEXT
    :raises ValueError: If *cipher* is not a known cipher name or *type* is
        not a supported file type
    """
    out = _new_mem_buf()

    if not isinstance(pkey, PKey):
        raise TypeError("pkey must be a PKey")

    if cipher is None:
        # No encryption requested: OpenSSL takes a NULL cipher for that.
        cipher_obj = _ffi.NULL
    else:
        if passphrase is None:
            raise TypeError(
                "if a value is given for cipher "
                "one must also be given for passphrase"
            )
        cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher))
        if cipher_obj == _ffi.NULL:
            raise ValueError("Invalid cipher name")

    # Constructing the helper also rejects a passphrase for non-PEM types.
    helper = _PassphraseHelper(type, passphrase)

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_PrivateKey(
            out,
            pkey._pkey,
            cipher_obj,
            _ffi.NULL,
            0,
            helper.callback,
            helper.callback_args,
        )
        # Surface any exception captured inside the passphrase callback
        # before looking at the OpenSSL status code.
        helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_PrivateKey_bio(out, pkey._pkey)
    elif type == FILETYPE_TEXT:
        if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA:
            raise TypeError("Only RSA keys are supported for FILETYPE_TEXT")

        # get1 hands back an owned reference, so tie RSA_free to its lifetime.
        rsa_key = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free)
        status = _lib.RSA_print(out, rsa_key, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    _openssl_assert(status != 0)

    return _bio_to_string(out)
class _PassphraseHelper:
    """
    Adapter that exposes a passphrase (a byte string or a callable) as a
    ``pem_password_cb`` callback.

    Exceptions raised while producing the passphrase are recorded instead of
    propagating out of the callback; call :meth:`raise_if_problem` after the
    OpenSSL call to re-raise the first one.
    """

    def __init__(
        self,
        type: int,
        passphrase: PassphraseCallableT | None,
        more_args: bool = False,
        truncate: bool = False,
    ) -> None:
        """
        :param type: The file type the passphrase will be used with
        :param passphrase: Byte string, callable, or None for no passphrase
        :param more_args: If true, a callable passphrase is invoked with
            ``(size, rwflag, userdata)`` instead of just ``(rwflag)``
        :param truncate: If true, an over-long passphrase is cut down to the
            buffer size instead of being reported as an error
        :raises ValueError: If a passphrase is given for a non-PEM type
        """
        if passphrase is not None and type != FILETYPE_PEM:
            raise ValueError(
                "only FILETYPE_PEM key format supports encryption"
            )
        self._passphrase = passphrase
        self._more_args = more_args
        self._truncate = truncate
        # Exceptions captured inside the callback, oldest first.
        self._problems: list[Exception] = []

    def _require_supported_passphrase(self) -> None:
        """Raise TypeError unless the passphrase is bytes or a callable."""
        supported = isinstance(self._passphrase, bytes) or callable(
            self._passphrase
        )
        if not supported:
            raise TypeError(
                "Last argument must be a byte string or a callable."
            )

    @property
    def callback(self) -> Any:
        """
        The callback to hand to OpenSSL: NULL when no passphrase was given,
        otherwise a new ``pem_password_cb`` wrapping ``_read_passphrase``.
        """
        if self._passphrase is None:
            return _ffi.NULL
        self._require_supported_passphrase()
        return _ffi.callback("pem_password_cb", self._read_passphrase)

    @property
    def callback_args(self) -> Any:
        """
        The userdata argument to hand to OpenSSL. Always NULL, but a
        passphrase of an unsupported type is still rejected with TypeError.
        """
        if self._passphrase is not None:
            self._require_supported_passphrase()
        return _ffi.NULL

    def raise_if_problem(self, exceptionType: type[Exception] = Error) -> None:
        """
        Re-raise the oldest exception recorded by the callback, if any.

        :param exceptionType: Exception type used (and swallowed) while
            draining the OpenSSL error queue before re-raising
        """
        if not self._problems:
            return

        # Flush the OpenSSL error queue
        try:
            _exception_from_error_queue(exceptionType)
        except exceptionType:
            pass

        raise self._problems.pop(0)

    def _read_passphrase(
        self, buf: Any, size: int, rwflag: Any, userdata: Any
    ) -> int:
        """
        Body of the ``pem_password_cb`` callback: copy the passphrase into
        *buf* and return its length, or record the failure and return 0.
        """
        try:
            if not callable(self._passphrase):
                assert self._passphrase is not None
                result = self._passphrase
            elif self._more_args:
                result = self._passphrase(size, rwflag, userdata)
            else:
                result = self._passphrase(rwflag)

            if not isinstance(result, bytes):
                raise ValueError("Bytes expected")

            if len(result) > size:
                if not self._truncate:
                    raise ValueError(
                        "passphrase returned by callback is too long"
                    )
                result = result[:size]

            length = len(result)
            # Copy one byte at a time: each slot is assigned a length-1
            # bytes object, which is why this slices rather than indexes.
            for offset in range(length):
                buf[offset] = result[offset : offset + 1]
            return length
        except Exception as e:
            # Record the failure for raise_if_problem() rather than letting
            # it escape from the callback.
            self._problems.append(e)
            return 0
def load_publickey(type: int, buffer: str | bytes) -> PKey:
    """
    Parse a public key out of *buffer*.

    :param type: The input format (one of :data:`FILETYPE_PEM`,
        :data:`FILETYPE_ASN1`).
    :param buffer: The serialized key; a ``str`` is encoded as ASCII first.
    :type buffer: A Python string object, either unicode or bytestring.
    :return: A PKey flagged as holding only a public key.
    :rtype: :class:`PKey`
    :raises ValueError: If *type* is neither supported file type.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    source = _new_mem_buf(raw)

    if type == FILETYPE_PEM:
        key_ptr = _lib.PEM_read_bio_PUBKEY(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    elif type == FILETYPE_ASN1:
        key_ptr = _lib.d2i_PUBKEY_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means parsing failed; report the queued OpenSSL error.
    if key_ptr == _ffi.NULL:
        _raise_current_error()

    # Bypass PKey.__init__ and wrap the parsed key directly, freeing it
    # when the wrapper is garbage collected.
    loaded = PKey.__new__(PKey)
    loaded._pkey = _ffi.gc(key_ptr, _lib.EVP_PKEY_free)
    loaded._only_public = True
    return loaded
def load_privatekey(
    type: int,
    buffer: str | bytes,
    passphrase: PassphraseCallableT | None = None,
) -> PKey:
    """
    Parse a private key (PKey) out of *buffer*, which is encoded in the
    format selected by *type*.

    :param type: The input format (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The serialized key; a ``str`` is encoded as ASCII first
    :param passphrase: (optional) for encrypted PEM input, either the
                       passphrase itself or a callback that provides it.

    :return: The PKey object
    :raises ValueError: If *type* is not a supported file type, or a
        passphrase is given for a non-PEM type
    """
    raw = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    source = _new_mem_buf(raw)

    helper = _PassphraseHelper(type, passphrase)

    if type == FILETYPE_PEM:
        key_ptr = _lib.PEM_read_bio_PrivateKey(
            source, _ffi.NULL, helper.callback, helper.callback_args
        )
        # An exception captured inside the passphrase callback takes
        # precedence over whatever OpenSSL reports for the failed read.
        helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        key_ptr = _lib.d2i_PrivateKey_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if key_ptr == _ffi.NULL:
        _raise_current_error()

    # Bypass PKey.__init__ and wrap the parsed key directly, freeing it
    # when the wrapper is garbage collected.
    loaded = PKey.__new__(PKey)
    loaded._pkey = _ffi.gc(key_ptr, _lib.EVP_PKEY_free)
    return loaded
def dump_certificate_request(type: int, req: X509Req) -> bytes:
    """
    Serialize the certificate request *req* into a byte string using the
    format selected by *type*.

    :param type: The output format (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param req: The certificate request to serialize
    :return: The bytes written to the memory BIO for the request
    :raises ValueError: If *type* is not one of the supported file types


    .. deprecated:: 24.2.0
       Use `cryptography.x509.CertificateSigningRequest` instead.
    """
    out = _new_mem_buf()

    # Reject unknown formats up front so the dispatch below only has to
    # distinguish between the three supported writers.
    if type not in (FILETYPE_PEM, FILETYPE_ASN1, FILETYPE_TEXT):
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    raw_req = req._req
    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_X509_REQ(out, raw_req)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_X509_REQ_bio(out, raw_req)
    else:
        # Human-readable dump; the two zeros are the name and request flags.
        status = _lib.X509_REQ_print_ex(out, raw_req, 0, 0)

    # Only a zero status is treated as failure here.
    _openssl_assert(status != 0)

    return _bio_to_string(out)
# Capture a private alias to the plain function *before* the public name is
# registered as deprecated.
# NOTE(review): presumably in-module callers use this alias so they do not
# trigger the DeprecationWarning themselves -- confirm against call sites.
_dump_certificate_request_internal = dump_certificate_request

utils.deprecated(
    dump_certificate_request,
    __name__,
    "CSR support in pyOpenSSL is deprecated. You should use the APIs in "
    "cryptography.",
    DeprecationWarning,
    name="dump_certificate_request",
)
def load_certificate_request(type: int, buffer: bytes) -> X509Req:
    """
    Parse a certificate request (X509Req) out of *buffer*, which is encoded
    in the format selected by *type*.

    :param type: The input format (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The serialized request; a ``str`` passed at runtime is
        encoded as ASCII first
    :return: The X509Req object
    :raises ValueError: If *type* is neither supported file type

    .. deprecated:: 24.2.0
       Use `cryptography.x509.load_der_x509_csr` or
       `cryptography.x509.load_pem_x509_csr` instead.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    source = _new_mem_buf(raw)

    if type == FILETYPE_PEM:
        req_ptr = _lib.PEM_read_bio_X509_REQ(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    elif type == FILETYPE_ASN1:
        req_ptr = _lib.d2i_X509_REQ_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means parsing failed.
    _openssl_assert(req_ptr != _ffi.NULL)

    # Bypass X509Req.__init__ and wrap the parsed request directly, freeing
    # it when the wrapper is garbage collected.
    loaded = X509Req.__new__(X509Req)
    loaded._req = _ffi.gc(req_ptr, _lib.X509_REQ_free)
    return loaded
# Capture a private alias to the plain function *before* the public name is
# registered as deprecated.
# NOTE(review): presumably in-module callers use this alias so they do not
# trigger the DeprecationWarning themselves -- confirm against call sites.
_load_certificate_request_internal = load_certificate_request

utils.deprecated(
    load_certificate_request,
    __name__,
    "CSR support in pyOpenSSL is deprecated. You should use the APIs in "
    "cryptography.",
    DeprecationWarning,
    name="load_certificate_request",
)