Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/x509/base.py: 51%
458 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:50 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:50 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5from __future__ import annotations
7import abc
8import datetime
9import os
10import typing
12from cryptography import utils
13from cryptography.hazmat.bindings._rust import x509 as rust_x509
14from cryptography.hazmat.primitives import hashes, serialization
15from cryptography.hazmat.primitives.asymmetric import (
16 dsa,
17 ec,
18 ed448,
19 ed25519,
20 padding,
21 rsa,
22 x448,
23 x25519,
24)
25from cryptography.hazmat.primitives.asymmetric.types import (
26 CertificateIssuerPrivateKeyTypes,
27 CertificateIssuerPublicKeyTypes,
28 CertificatePublicKeyTypes,
29)
30from cryptography.x509.extensions import (
31 Extension,
32 Extensions,
33 ExtensionType,
34 _make_sequence_methods,
35)
36from cryptography.x509.name import Name, _ASN1Type
37from cryptography.x509.oid import ObjectIdentifier
# Lower bound that the builders in this module compare every supplied
# date against.
_EARLIEST_UTC_TIME = datetime.datetime(year=1950, month=1, day=1)

# Hash algorithm types accepted by the ``algorithm`` parameter of the
# builders' ``sign`` methods.
# This must be kept in sync with sign.rs's list of allowable types in
# identify_hash_type
_AllowedHashTypes = typing.Union[
    hashes.SHA224,
    hashes.SHA256,
    hashes.SHA384,
    hashes.SHA512,
    hashes.SHA3_224,
    hashes.SHA3_256,
    hashes.SHA3_384,
    hashes.SHA3_512,
]
class AttributeNotFound(Exception):
    """Error that records the OID of an attribute which was not found.

    The OID is available afterwards as the ``oid`` attribute.
    """

    def __init__(self, msg: str, oid: ObjectIdentifier) -> None:
        self.oid = oid
        super().__init__(msg)
61def _reject_duplicate_extension(
62 extension: Extension[ExtensionType],
63 extensions: typing.List[Extension[ExtensionType]],
64) -> None:
65 # This is quadratic in the number of extensions
66 for e in extensions:
67 if e.oid == extension.oid:
68 raise ValueError("This extension has already been set.")
71def _reject_duplicate_attribute(
72 oid: ObjectIdentifier,
73 attributes: typing.List[
74 typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]]
75 ],
76) -> None:
77 # This is quadratic in the number of attributes
78 for attr_oid, _, _ in attributes:
79 if attr_oid == oid:
80 raise ValueError("This attribute has already been set.")
83def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime:
84 """Normalizes a datetime to a naive datetime in UTC.
86 time -- datetime to normalize. Assumed to be in UTC if not timezone
87 aware.
88 """
89 if time.tzinfo is not None:
90 offset = time.utcoffset()
91 offset = offset if offset else datetime.timedelta()
92 return time.replace(tzinfo=None) - offset
93 else:
94 return time
class Attribute:
    """An attribute made of an OID, a bytes value and an ASN.1 type tag.

    ``oid`` and ``value`` are exposed as read-only properties. The type tag
    is kept privately and takes part in equality and hashing.
    """

    def __init__(
        self,
        oid: ObjectIdentifier,
        value: bytes,
        _type: int = _ASN1Type.UTF8String.value,
    ) -> None:
        self._type = _type
        self._value = value
        self._oid = oid

    @property
    def oid(self) -> ObjectIdentifier:
        """The attribute's object identifier."""
        return self._oid

    @property
    def value(self) -> bytes:
        """The attribute's value bytes."""
        return self._value

    def __repr__(self) -> str:
        return f"<Attribute(oid={self.oid}, value={self.value!r})>"

    def __eq__(self, other: object) -> bool:
        # Equal only to another Attribute with the same OID, value and
        # type tag; anything else defers to the other operand.
        if isinstance(other, Attribute):
            return (
                self.oid == other.oid
                and self.value == other.value
                and self._type == other._type
            )
        return NotImplemented

    def __hash__(self) -> int:
        # Hash over the same three fields that __eq__ compares.
        key = (self.oid, self.value, self._type)
        return hash(key)
class Attributes:
    """Collection of ``Attribute`` objects.

    ``__len__``, ``__iter__`` and ``__getitem__`` are produced by
    ``_make_sequence_methods`` over the stored list.
    """

    def __init__(
        self,
        attributes: typing.Iterable[Attribute],
    ) -> None:
        # Copy into a list so the collection owns its own storage.
        self._attributes = list(attributes)

    __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes")

    def __repr__(self) -> str:
        return f"<Attributes({self._attributes})>"

    def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute:
        """Return the first attribute whose ``oid`` equals *oid*.

        Raises:
            AttributeNotFound: no stored attribute has that OID.
        """
        for candidate in self:
            if candidate.oid != oid:
                continue
            return candidate

        raise AttributeNotFound(f"No {oid} attribute was found", oid)
class Version(utils.Enum):
    """Certificate version identifiers and their integer values."""

    v1 = 0
    v3 = 2
class InvalidVersion(Exception):
    """Error that carries the version number that was parsed.

    The number is available afterwards as ``parsed_version``.
    """

    def __init__(self, msg: str, parsed_version: int) -> None:
        self.parsed_version = parsed_version
        super().__init__(msg)
class Certificate(metaclass=abc.ABCMeta):
    """Abstract interface of an X.509 certificate.

    Every member is abstract; concrete implementations supply them all.
    """

    @abc.abstractmethod
    def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
        """Return the fingerprint bytes computed with the given digest."""

    @property
    @abc.abstractmethod
    def serial_number(self) -> int:
        """The certificate serial number."""

    @property
    @abc.abstractmethod
    def version(self) -> Version:
        """The certificate version."""

    @abc.abstractmethod
    def public_key(self) -> CertificatePublicKeyTypes:
        """Return the public key."""

    @property
    @abc.abstractmethod
    def not_valid_before(self) -> datetime.datetime:
        """The "not before" time, represented as a UTC datetime."""

    @property
    @abc.abstractmethod
    def not_valid_after(self) -> datetime.datetime:
        """The "not after" time, represented as a UTC datetime."""

    @property
    @abc.abstractmethod
    def issuer(self) -> Name:
        """The issuer name object."""

    @property
    @abc.abstractmethod
    def subject(self) -> Name:
        """The subject name object."""

    @property
    @abc.abstractmethod
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        """A HashAlgorithm corresponding to the type of the digest signed
        in the certificate (the annotation allows ``None``).
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_oid(self) -> ObjectIdentifier:
        """The ObjectIdentifier of the signature algorithm."""

    @property
    @abc.abstractmethod
    def signature_algorithm_parameters(
        self,
    ) -> typing.Union[None, padding.PSS, padding.PKCS1v15, ec.ECDSA]:
        """The signature algorithm parameters."""

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """An Extensions object."""

    @property
    @abc.abstractmethod
    def signature(self) -> bytes:
        """The signature bytes."""

    @property
    @abc.abstractmethod
    def tbs_certificate_bytes(self) -> bytes:
        """The tbsCertificate payload bytes as defined in RFC 5280."""

    @property
    @abc.abstractmethod
    def tbs_precertificate_bytes(self) -> bytes:
        """The tbsCertificate payload bytes with the SCT list extension
        stripped.
        """

    @abc.abstractmethod
    def __eq__(self, other: object) -> bool:
        """Check equality."""

    @abc.abstractmethod
    def __hash__(self) -> int:
        """Compute a hash."""

    @abc.abstractmethod
    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """Serialize the certificate to PEM or DER format."""

    @abc.abstractmethod
    def verify_directly_issued_by(self, issuer: Certificate) -> None:
        """Verify that this certificate was directly issued by *issuer*.

        Checks that the certificate issuer name matches the issuer subject
        name and that the certificate is signed by the issuer's private
        key. No other validation is performed.
        """
# The rust class is not a subclass of Certificate, so register it with the
# ABC; runtime isinstance checks need this.
Certificate.register(rust_x509.Certificate)
class RevokedCertificate(metaclass=abc.ABCMeta):
    """Abstract interface of one revoked-certificate entry."""

    @property
    @abc.abstractmethod
    def serial_number(self) -> int:
        """The serial number of the revoked certificate."""

    @property
    @abc.abstractmethod
    def revocation_date(self) -> datetime.datetime:
        """The date on which this certificate was revoked."""

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """An Extensions object containing a list of Revoked extensions."""
# The rust class is not a subclass of RevokedCertificate, so register it with
# the ABC; runtime isinstance checks need this.
RevokedCertificate.register(rust_x509.RevokedCertificate)
class _RawRevokedCertificate(RevokedCertificate):
    """RevokedCertificate that simply stores the three values it is given."""

    def __init__(
        self,
        serial_number: int,
        revocation_date: datetime.datetime,
        extensions: Extensions,
    ):
        self._extensions = extensions
        self._revocation_date = revocation_date
        self._serial_number = serial_number

    @property
    def serial_number(self) -> int:
        """The serial number passed to the constructor."""
        return self._serial_number

    @property
    def revocation_date(self) -> datetime.datetime:
        """The revocation date passed to the constructor."""
        return self._revocation_date

    @property
    def extensions(self) -> Extensions:
        """The extensions object passed to the constructor."""
        return self._extensions
class CertificateRevocationList(metaclass=abc.ABCMeta):
    """Abstract interface of a certificate revocation list (CRL).

    Besides the CRL fields, the interface covers ``len()``, indexing,
    slicing and iteration over the revoked certificates.
    """

    @abc.abstractmethod
    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """Serialize the CRL to PEM or DER format."""

    @abc.abstractmethod
    def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
        """Return the fingerprint bytes computed with the given digest."""

    @abc.abstractmethod
    def get_revoked_certificate_by_serial_number(
        self, serial_number: int
    ) -> typing.Optional[RevokedCertificate]:
        """Return the RevokedCertificate with *serial_number*, or None if
        the serial number is not in the CRL.
        """

    @property
    @abc.abstractmethod
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        """A HashAlgorithm corresponding to the type of the digest signed
        in the CRL (the annotation allows ``None``).
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_oid(self) -> ObjectIdentifier:
        """The ObjectIdentifier of the signature algorithm."""

    @property
    @abc.abstractmethod
    def issuer(self) -> Name:
        """The Name of the issuer of this CRL."""

    @property
    @abc.abstractmethod
    def next_update(self) -> typing.Optional[datetime.datetime]:
        """The date of next update for this CRL."""

    @property
    @abc.abstractmethod
    def last_update(self) -> datetime.datetime:
        """The date of last update for this CRL."""

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """An Extensions object containing a list of CRL extensions."""

    @property
    @abc.abstractmethod
    def signature(self) -> bytes:
        """The signature bytes."""

    @property
    @abc.abstractmethod
    def tbs_certlist_bytes(self) -> bytes:
        """The tbsCertList payload bytes as defined in RFC 5280."""

    @abc.abstractmethod
    def __eq__(self, other: object) -> bool:
        """Check equality."""

    @abc.abstractmethod
    def __len__(self) -> int:
        """Number of revoked certificates in the CRL."""

    @typing.overload
    def __getitem__(self, idx: int) -> RevokedCertificate:
        ...

    @typing.overload
    def __getitem__(self, idx: slice) -> typing.List[RevokedCertificate]:
        ...

    @abc.abstractmethod
    def __getitem__(
        self, idx: typing.Union[int, slice]
    ) -> typing.Union[RevokedCertificate, typing.List[RevokedCertificate]]:
        """Return a revoked certificate (or slice of revoked certificates)."""

    @abc.abstractmethod
    def __iter__(self) -> typing.Iterator[RevokedCertificate]:
        """Iterate over the revoked certificates."""

    @abc.abstractmethod
    def is_signature_valid(
        self, public_key: CertificateIssuerPublicKeyTypes
    ) -> bool:
        """Verify the signature of the revocation list against the given
        public key.
        """
# The rust class is not a subclass of CertificateRevocationList, so register
# it with the ABC; runtime isinstance checks need this.
CertificateRevocationList.register(rust_x509.CertificateRevocationList)
class CertificateSigningRequest(metaclass=abc.ABCMeta):
    """Abstract interface of a certificate signing request (CSR)."""

    @abc.abstractmethod
    def __eq__(self, other: object) -> bool:
        """Check equality."""

    @abc.abstractmethod
    def __hash__(self) -> int:
        """Compute a hash."""

    @abc.abstractmethod
    def public_key(self) -> CertificatePublicKeyTypes:
        """Return the public key."""

    @property
    @abc.abstractmethod
    def subject(self) -> Name:
        """The subject name object."""

    @property
    @abc.abstractmethod
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        """A HashAlgorithm corresponding to the type of the digest signed
        in the signing request (the annotation allows ``None``).
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_oid(self) -> ObjectIdentifier:
        """The ObjectIdentifier of the signature algorithm."""

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """The extensions in the signing request."""

    @property
    @abc.abstractmethod
    def attributes(self) -> Attributes:
        """An Attributes object."""

    @abc.abstractmethod
    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """Encode the request to PEM or DER format."""

    @property
    @abc.abstractmethod
    def signature(self) -> bytes:
        """The signature bytes."""

    @property
    @abc.abstractmethod
    def tbs_certrequest_bytes(self) -> bytes:
        """The PKCS#10 CertificationRequestInfo bytes as defined in RFC
        2986.
        """

    @property
    @abc.abstractmethod
    def is_signature_valid(self) -> bool:
        """Whether the signature of the signing request verifies."""

    @abc.abstractmethod
    def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes:
        """Get the attribute value for a given OID."""
# The rust class is not a subclass of CertificateSigningRequest, so register
# it with the ABC; runtime isinstance checks need this.
CertificateSigningRequest.register(rust_x509.CertificateSigningRequest)
def load_pem_x509_certificate(
    data: bytes, backend: typing.Any = None
) -> Certificate:
    """Return ``rust_x509.load_pem_x509_certificate(data)``.

    The ``backend`` argument is preserved for API compatibility, but
    ignored.
    """
    certificate = rust_x509.load_pem_x509_certificate(data)
    return certificate
def load_pem_x509_certificates(data: bytes) -> typing.List[Certificate]:
    """Return ``rust_x509.load_pem_x509_certificates(data)``."""
    certificates = rust_x509.load_pem_x509_certificates(data)
    return certificates
def load_der_x509_certificate(
    data: bytes, backend: typing.Any = None
) -> Certificate:
    """Return ``rust_x509.load_der_x509_certificate(data)``.

    The ``backend`` argument is preserved for API compatibility, but
    ignored.
    """
    certificate = rust_x509.load_der_x509_certificate(data)
    return certificate
def load_pem_x509_csr(
    data: bytes, backend: typing.Any = None
) -> CertificateSigningRequest:
    """Return ``rust_x509.load_pem_x509_csr(data)``.

    The ``backend`` argument is preserved for API compatibility, but
    ignored.
    """
    request = rust_x509.load_pem_x509_csr(data)
    return request
def load_der_x509_csr(
    data: bytes, backend: typing.Any = None
) -> CertificateSigningRequest:
    """Return ``rust_x509.load_der_x509_csr(data)``.

    The ``backend`` argument is preserved for API compatibility, but
    ignored.
    """
    request = rust_x509.load_der_x509_csr(data)
    return request
def load_pem_x509_crl(
    data: bytes, backend: typing.Any = None
) -> CertificateRevocationList:
    """Return ``rust_x509.load_pem_x509_crl(data)``.

    The ``backend`` argument is preserved for API compatibility, but
    ignored.
    """
    crl = rust_x509.load_pem_x509_crl(data)
    return crl
def load_der_x509_crl(
    data: bytes, backend: typing.Any = None
) -> CertificateRevocationList:
    """Return ``rust_x509.load_der_x509_crl(data)``.

    The ``backend`` argument is preserved for API compatibility, but
    ignored.
    """
    crl = rust_x509.load_der_x509_crl(data)
    return crl
class CertificateSigningRequestBuilder:
    """Builder for X.509 certificate signing requests.

    Each setter validates its argument and returns a new builder; the
    builder it is called on is left unchanged.
    """

    def __init__(
        self,
        subject_name: typing.Optional[Name] = None,
        extensions: typing.Optional[
            typing.List[Extension[ExtensionType]]
        ] = None,
        attributes: typing.Optional[
            typing.List[
                typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]]
            ]
        ] = None,
    ):
        """
        Creates an empty X.509 certificate request (v1).

        ``extensions`` and ``attributes`` default to fresh empty lists.
        """
        self._subject_name = subject_name
        # A literal ``[]`` default would be a single list object shared by
        # every builder created without arguments; build one per instance.
        self._extensions = [] if extensions is None else extensions
        self._attributes = [] if attributes is None else attributes

    def subject_name(self, name: Name) -> CertificateSigningRequestBuilder:
        """
        Sets the certificate requestor's distinguished name.

        Raises TypeError if ``name`` is not a ``Name`` and ValueError if a
        subject name has already been set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._subject_name is not None:
            raise ValueError("The subject name may only be set once.")
        return CertificateSigningRequestBuilder(
            name, self._extensions, self._attributes
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> CertificateSigningRequestBuilder:
        """
        Adds an X.509 extension to the certificate request.

        Raises TypeError if ``extval`` is not an ``ExtensionType`` and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)

        # Concatenate rather than append so this builder's list is untouched.
        return CertificateSigningRequestBuilder(
            self._subject_name,
            self._extensions + [extension],
            self._attributes,
        )

    def add_attribute(
        self,
        oid: ObjectIdentifier,
        value: bytes,
        *,
        _tag: typing.Optional[_ASN1Type] = None,
    ) -> CertificateSigningRequestBuilder:
        """
        Adds an X.509 attribute with an OID and associated value.

        Raises TypeError if ``oid``, ``value`` or ``_tag`` has the wrong
        type and ValueError if an attribute with the same OID was already
        added.
        """
        if not isinstance(oid, ObjectIdentifier):
            raise TypeError("oid must be an ObjectIdentifier")

        if not isinstance(value, bytes):
            raise TypeError("value must be bytes")

        if _tag is not None and not isinstance(_tag, _ASN1Type):
            raise TypeError("tag must be _ASN1Type")

        _reject_duplicate_attribute(oid, self._attributes)

        # Store the enum's raw value, or None when no tag was requested.
        tag = _tag.value if _tag is not None else None

        return CertificateSigningRequestBuilder(
            self._subject_name,
            self._extensions,
            self._attributes + [(oid, value, tag)],
        )

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: typing.Optional[_AllowedHashTypes],
        backend: typing.Any = None,
    ) -> CertificateSigningRequest:
        """
        Signs the request using the requestor's private key.

        ``backend`` is accepted but not used. Raises ValueError if no
        subject name has been set.
        """
        if self._subject_name is None:
            raise ValueError("A CertificateSigningRequest must have a subject")
        return rust_x509.create_x509_csr(self, private_key, algorithm)
class CertificateBuilder:
    """Builder for X.509 certificates.

    Each setter validates its argument and returns a new
    ``CertificateBuilder``; the builder it is called on is left unchanged.
    Names, public key, serial number and validity dates may each be set
    only once.
    """

    _extensions: typing.List[Extension[ExtensionType]]

    def __init__(
        self,
        issuer_name: typing.Optional[Name] = None,
        subject_name: typing.Optional[Name] = None,
        public_key: typing.Optional[CertificatePublicKeyTypes] = None,
        serial_number: typing.Optional[int] = None,
        not_valid_before: typing.Optional[datetime.datetime] = None,
        not_valid_after: typing.Optional[datetime.datetime] = None,
        extensions: typing.Optional[
            typing.List[Extension[ExtensionType]]
        ] = None,
    ) -> None:
        """Store the given fields; ``extensions`` defaults to a new list."""
        self._version = Version.v3
        self._issuer_name = issuer_name
        self._subject_name = subject_name
        self._public_key = public_key
        self._serial_number = serial_number
        self._not_valid_before = not_valid_before
        self._not_valid_after = not_valid_after
        # A literal ``[]`` default would be a single list object shared by
        # every builder created without arguments; build one per instance.
        self._extensions = [] if extensions is None else extensions

    def _copy_with(self, **changes: typing.Any) -> CertificateBuilder:
        """Return a new builder with this one's fields, updated by *changes*."""
        fields: typing.Dict[str, typing.Any] = {
            "issuer_name": self._issuer_name,
            "subject_name": self._subject_name,
            "public_key": self._public_key,
            "serial_number": self._serial_number,
            "not_valid_before": self._not_valid_before,
            "not_valid_after": self._not_valid_after,
            "extensions": self._extensions,
        }
        fields.update(changes)
        return CertificateBuilder(**fields)

    def issuer_name(self, name: Name) -> CertificateBuilder:
        """
        Sets the CA's distinguished name.

        Raises TypeError if ``name`` is not a ``Name`` and ValueError if an
        issuer name has already been set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._issuer_name is not None:
            raise ValueError("The issuer name may only be set once.")
        return self._copy_with(issuer_name=name)

    def subject_name(self, name: Name) -> CertificateBuilder:
        """
        Sets the requestor's distinguished name.

        Raises TypeError if ``name`` is not a ``Name`` and ValueError if a
        subject name has already been set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._subject_name is not None:
            raise ValueError("The subject name may only be set once.")
        return self._copy_with(subject_name=name)

    def public_key(
        self,
        key: CertificatePublicKeyTypes,
    ) -> CertificateBuilder:
        """
        Sets the requestor's public key (as found in the signing request).

        Raises TypeError if ``key`` is not one of the supported public key
        types and ValueError if a public key has already been set.
        """
        if not isinstance(
            key,
            (
                dsa.DSAPublicKey,
                rsa.RSAPublicKey,
                ec.EllipticCurvePublicKey,
                ed25519.Ed25519PublicKey,
                ed448.Ed448PublicKey,
                x25519.X25519PublicKey,
                x448.X448PublicKey,
            ),
        ):
            raise TypeError(
                "Expecting one of DSAPublicKey, RSAPublicKey,"
                " EllipticCurvePublicKey, Ed25519PublicKey,"
                " Ed448PublicKey, X25519PublicKey, or "
                "X448PublicKey."
            )
        if self._public_key is not None:
            raise ValueError("The public key may only be set once.")
        return self._copy_with(public_key=key)

    def serial_number(self, number: int) -> CertificateBuilder:
        """
        Sets the certificate serial number.

        Raises TypeError if ``number`` is not an int and ValueError if a
        serial number is already set, ``number`` is not positive, or it
        needs 160 bits or more.
        """
        if not isinstance(number, int):
            raise TypeError("Serial number must be of integral type.")
        if self._serial_number is not None:
            raise ValueError("The serial number may only be set once.")
        if number <= 0:
            raise ValueError("The serial number should be positive.")

        # ASN.1 integers are always signed, so most significant bit must be
        # zero.
        if number.bit_length() >= 160:  # As defined in RFC 5280
            raise ValueError(
                "The serial number should not be more than 159 bits."
            )
        return self._copy_with(serial_number=number)

    def not_valid_before(self, time: datetime.datetime) -> CertificateBuilder:
        """
        Sets the certificate activation time.

        The time is normalized to a naive UTC datetime before it is checked
        and stored. Raises TypeError if ``time`` is not a datetime and
        ValueError if the value is already set, is earlier than 1950-01-01,
        or is later than an already-set not-valid-after time.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._not_valid_before is not None:
            raise ValueError("The not valid before may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The not valid before date must be on or after"
                " 1950 January 1."
            )
        if self._not_valid_after is not None and time > self._not_valid_after:
            raise ValueError(
                "The not valid before date must be before the not valid after "
                "date."
            )
        return self._copy_with(not_valid_before=time)

    def not_valid_after(self, time: datetime.datetime) -> CertificateBuilder:
        """
        Sets the certificate expiration time.

        The time is normalized to a naive UTC datetime before it is checked
        and stored. Raises TypeError if ``time`` is not a datetime and
        ValueError if the value is already set, is earlier than 1950-01-01,
        or is earlier than an already-set not-valid-before time.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._not_valid_after is not None:
            raise ValueError("The not valid after may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The not valid after date must be on or after"
                " 1950 January 1."
            )
        if (
            self._not_valid_before is not None
            and time < self._not_valid_before
        ):
            raise ValueError(
                "The not valid after date must be after the not valid before "
                "date."
            )
        return self._copy_with(not_valid_after=time)

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> CertificateBuilder:
        """
        Adds an X.509 extension to the certificate.

        Raises TypeError if ``extval`` is not an ``ExtensionType`` and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)

        # Concatenate rather than append so this builder's list is untouched.
        return self._copy_with(extensions=self._extensions + [extension])

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: typing.Optional[_AllowedHashTypes],
        backend: typing.Any = None,
        *,
        rsa_padding: typing.Optional[
            typing.Union[padding.PSS, padding.PKCS1v15]
        ] = None,
    ) -> Certificate:
        """
        Signs the certificate using the CA's private key.

        ``backend`` is accepted but not used. Raises ValueError if any of
        subject name, issuer name, serial number, validity dates or public
        key is unset, and TypeError if ``rsa_padding`` is not PSS/PKCS1v15
        or is given together with a non-RSA private key.
        """
        if self._subject_name is None:
            raise ValueError("A certificate must have a subject name")

        if self._issuer_name is None:
            raise ValueError("A certificate must have an issuer name")

        if self._serial_number is None:
            raise ValueError("A certificate must have a serial number")

        if self._not_valid_before is None:
            raise ValueError("A certificate must have a not valid before time")

        if self._not_valid_after is None:
            raise ValueError("A certificate must have a not valid after time")

        if self._public_key is None:
            raise ValueError("A certificate must have a public key")

        if rsa_padding is not None:
            if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
                raise TypeError("Padding must be PSS or PKCS1v15")
            if not isinstance(private_key, rsa.RSAPrivateKey):
                raise TypeError("Padding is only supported for RSA keys")

        return rust_x509.create_x509_certificate(
            self, private_key, algorithm, rsa_padding
        )
class CertificateRevocationListBuilder:
    """Builder for certificate revocation lists (CRLs).

    Each setter validates its argument and returns a new builder; the
    builder it is called on is left unchanged.
    """

    _extensions: typing.List[Extension[ExtensionType]]
    _revoked_certificates: typing.List[RevokedCertificate]

    def __init__(
        self,
        issuer_name: typing.Optional[Name] = None,
        last_update: typing.Optional[datetime.datetime] = None,
        next_update: typing.Optional[datetime.datetime] = None,
        extensions: typing.Optional[
            typing.List[Extension[ExtensionType]]
        ] = None,
        revoked_certificates: typing.Optional[
            typing.List[RevokedCertificate]
        ] = None,
    ):
        """Store the given fields; both lists default to new empty lists."""
        self._issuer_name = issuer_name
        self._last_update = last_update
        self._next_update = next_update
        # A literal ``[]`` default would be a single list object shared by
        # every builder created without arguments; build one per instance.
        self._extensions = [] if extensions is None else extensions
        self._revoked_certificates = (
            [] if revoked_certificates is None else revoked_certificates
        )

    def _copy_with(
        self, **changes: typing.Any
    ) -> CertificateRevocationListBuilder:
        """Return a new builder with this one's fields, updated by *changes*."""
        fields: typing.Dict[str, typing.Any] = {
            "issuer_name": self._issuer_name,
            "last_update": self._last_update,
            "next_update": self._next_update,
            "extensions": self._extensions,
            "revoked_certificates": self._revoked_certificates,
        }
        fields.update(changes)
        return CertificateRevocationListBuilder(**fields)

    def issuer_name(
        self, issuer_name: Name
    ) -> CertificateRevocationListBuilder:
        """
        Sets the CRL issuer's name.

        Raises TypeError if ``issuer_name`` is not a ``Name`` and
        ValueError if an issuer name has already been set.
        """
        if not isinstance(issuer_name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._issuer_name is not None:
            raise ValueError("The issuer name may only be set once.")
        return self._copy_with(issuer_name=issuer_name)

    def last_update(
        self, last_update: datetime.datetime
    ) -> CertificateRevocationListBuilder:
        """
        Sets the CRL's last update time.

        The time is normalized to a naive UTC datetime before it is checked
        and stored. Raises TypeError if ``last_update`` is not a datetime
        and ValueError if the value is already set, is earlier than
        1950-01-01, or is later than an already-set next update.
        """
        if not isinstance(last_update, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._last_update is not None:
            raise ValueError("Last update may only be set once.")
        last_update = _convert_to_naive_utc_time(last_update)
        if last_update < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The last update date must be on or after 1950 January 1."
            )
        if self._next_update is not None and last_update > self._next_update:
            raise ValueError(
                "The last update date must be before the next update date."
            )
        return self._copy_with(last_update=last_update)

    def next_update(
        self, next_update: datetime.datetime
    ) -> CertificateRevocationListBuilder:
        """
        Sets the CRL's next update time.

        The time is normalized to a naive UTC datetime before it is checked
        and stored. Raises TypeError if ``next_update`` is not a datetime
        and ValueError if the value is already set, is earlier than
        1950-01-01, or is earlier than an already-set last update.
        """
        if not isinstance(next_update, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._next_update is not None:
            # These messages previously said "Last update", copied from
            # last_update(); they now name the field actually being set.
            raise ValueError("Next update may only be set once.")
        next_update = _convert_to_naive_utc_time(next_update)
        if next_update < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The next update date must be on or after 1950 January 1."
            )
        if self._last_update is not None and next_update < self._last_update:
            raise ValueError(
                "The next update date must be after the last update date."
            )
        return self._copy_with(next_update=next_update)

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> CertificateRevocationListBuilder:
        """
        Adds an X.509 extension to the certificate revocation list.

        Raises TypeError if ``extval`` is not an ``ExtensionType`` and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)
        # Concatenate rather than append so this builder's list is untouched.
        return self._copy_with(extensions=self._extensions + [extension])

    def add_revoked_certificate(
        self, revoked_certificate: RevokedCertificate
    ) -> CertificateRevocationListBuilder:
        """
        Adds a revoked certificate to the CRL.

        Raises TypeError if ``revoked_certificate`` is not a
        ``RevokedCertificate``.
        """
        if not isinstance(revoked_certificate, RevokedCertificate):
            raise TypeError("Must be an instance of RevokedCertificate")

        return self._copy_with(
            revoked_certificates=(
                self._revoked_certificates + [revoked_certificate]
            )
        )

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: typing.Optional[_AllowedHashTypes],
        backend: typing.Any = None,
    ) -> CertificateRevocationList:
        """
        Signs the CRL with the given private key.

        ``backend`` is accepted but not used. Raises ValueError if the
        issuer name, last update or next update has not been set.
        """
        if self._issuer_name is None:
            raise ValueError("A CRL must have an issuer name")

        if self._last_update is None:
            raise ValueError("A CRL must have a last update time")

        if self._next_update is None:
            raise ValueError("A CRL must have a next update time")

        return rust_x509.create_x509_crl(self, private_key, algorithm)
class RevokedCertificateBuilder:
    """Builder for a single revoked-certificate entry.

    Each setter validates its argument and returns a new builder; the
    builder it is called on is left unchanged.
    """

    def __init__(
        self,
        serial_number: typing.Optional[int] = None,
        revocation_date: typing.Optional[datetime.datetime] = None,
        extensions: typing.Optional[
            typing.List[Extension[ExtensionType]]
        ] = None,
    ):
        """Store the given fields; ``extensions`` defaults to a new list."""
        self._serial_number = serial_number
        self._revocation_date = revocation_date
        # A literal ``[]`` default would be a single list object shared by
        # every builder created without arguments; build one per instance.
        self._extensions = [] if extensions is None else extensions

    def serial_number(self, number: int) -> RevokedCertificateBuilder:
        """
        Sets the serial number of the revoked certificate.

        Raises TypeError if ``number`` is not an int and ValueError if a
        serial number is already set, ``number`` is not positive, or it
        needs 160 bits or more.
        """
        if not isinstance(number, int):
            raise TypeError("Serial number must be of integral type.")
        if self._serial_number is not None:
            raise ValueError("The serial number may only be set once.")
        if number <= 0:
            raise ValueError("The serial number should be positive")

        # ASN.1 integers are always signed, so most significant bit must be
        # zero.
        if number.bit_length() >= 160:  # As defined in RFC 5280
            raise ValueError(
                "The serial number should not be more than 159 bits."
            )
        return RevokedCertificateBuilder(
            number, self._revocation_date, self._extensions
        )

    def revocation_date(
        self, time: datetime.datetime
    ) -> RevokedCertificateBuilder:
        """
        Sets the revocation date.

        The time is normalized to a naive UTC datetime before it is checked
        and stored. Raises TypeError if ``time`` is not a datetime and
        ValueError if the date is already set or is earlier than
        1950-01-01.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._revocation_date is not None:
            raise ValueError("The revocation date may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The revocation date must be on or after 1950 January 1."
            )
        return RevokedCertificateBuilder(
            self._serial_number, time, self._extensions
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> RevokedCertificateBuilder:
        """
        Adds an extension to the revoked-certificate entry.

        Raises TypeError if ``extval`` is not an ``ExtensionType`` and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)
        # Concatenate rather than append so this builder's list is untouched.
        return RevokedCertificateBuilder(
            self._serial_number,
            self._revocation_date,
            self._extensions + [extension],
        )

    def build(self, backend: typing.Any = None) -> RevokedCertificate:
        """
        Returns a ``RevokedCertificate`` holding the configured values.

        ``backend`` is accepted but not used. Raises ValueError if the
        serial number or the revocation date has not been set.
        """
        if self._serial_number is None:
            raise ValueError("A revoked certificate must have a serial number")
        if self._revocation_date is None:
            raise ValueError(
                "A revoked certificate must have a revocation date"
            )
        return _RawRevokedCertificate(
            self._serial_number,
            self._revocation_date,
            Extensions(self._extensions),
        )
def random_serial_number() -> int:
    """Return a random non-negative integer of at most 159 bits.

    Twenty bytes (160 bits) are read from ``os.urandom`` and the value is
    shifted right by one bit, which keeps ``bit_length()`` below the
    160-bit limit that the builders' ``serial_number`` setters enforce.
    """
    entropy = os.urandom(20)
    return int.from_bytes(entropy, byteorder="big") >> 1