Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/x509/base.py: 51%
449 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
6import abc
7import datetime
8import os
9import typing
11from cryptography import utils
12from cryptography.hazmat.bindings._rust import x509 as rust_x509
13from cryptography.hazmat.primitives import hashes, serialization
14from cryptography.hazmat.primitives.asymmetric import (
15 dsa,
16 ec,
17 ed448,
18 ed25519,
19 rsa,
20 x448,
21 x25519,
22)
23from cryptography.hazmat.primitives.asymmetric.types import (
24 CertificateIssuerPrivateKeyTypes,
25 CertificateIssuerPublicKeyTypes,
26 CertificatePublicKeyTypes,
27)
28from cryptography.x509.extensions import (
29 Extension,
30 Extensions,
31 ExtensionType,
32 _make_sequence_methods,
33)
34from cryptography.x509.name import Name, _ASN1Type
35from cryptography.x509.oid import ObjectIdentifier
# Lower bound that the builders in this module compare naive UTC timestamps
# against before accepting them.
_EARLIEST_UTC_TIME = datetime.datetime(year=1950, month=1, day=1)
# This must be kept in sync with sign.rs's list of allowable types in
# identify_hash_type
_AllowedHashTypes = typing.Union[
    # SHA-2 family
    hashes.SHA224,
    hashes.SHA256,
    hashes.SHA384,
    hashes.SHA512,
    # SHA-3 family
    hashes.SHA3_224,
    hashes.SHA3_256,
    hashes.SHA3_384,
    hashes.SHA3_512,
]
class AttributeNotFound(Exception):
    """Error carrying the OID for which no attribute could be found.

    The requested OID is exposed on the ``oid`` attribute; ``msg`` becomes
    the exception message.
    """

    def __init__(self, msg: str, oid: ObjectIdentifier) -> None:
        self.oid = oid
        super().__init__(msg)
def _reject_duplicate_extension(
    extension: Extension[ExtensionType],
    extensions: typing.List[Extension[ExtensionType]],
) -> None:
    """Raise ValueError when ``extensions`` already contains an entry whose
    OID equals that of ``extension``.
    """
    # This is quadratic in the number of extensions
    already_present = any(
        existing.oid == extension.oid for existing in extensions
    )
    if already_present:
        raise ValueError("This extension has already been set.")
def _reject_duplicate_attribute(
    oid: ObjectIdentifier,
    attributes: typing.List[
        typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]]
    ],
) -> None:
    """Raise ValueError when ``attributes`` already contains a tuple whose
    first element (the OID) equals ``oid``.
    """
    # This is quadratic in the number of attributes
    already_present = any(
        existing_oid == oid for existing_oid, _, _ in attributes
    )
    if already_present:
        raise ValueError("This attribute has already been set.")
def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime:
    """Return ``time`` as a naive datetime expressed in UTC.

    A datetime without tzinfo is assumed to already be in UTC and is
    returned unchanged. An aware datetime has its UTC offset subtracted and
    its tzinfo removed.
    """
    if time.tzinfo is None:
        return time

    # utcoffset() may be None (or zero); both mean "nothing to subtract".
    delta = time.utcoffset() or datetime.timedelta()
    return time.replace(tzinfo=None) - delta
class Attribute:
    """Value object bundling an OID, raw value bytes and a type tag.

    Equality and hashing take all three fields into account; ``repr`` shows
    only the OID and the value.
    """

    def __init__(
        self,
        oid: ObjectIdentifier,
        value: bytes,
        _type: int = _ASN1Type.UTF8String.value,
    ) -> None:
        self._oid = oid
        self._value = value
        self._type = _type

    @property
    def oid(self) -> ObjectIdentifier:
        """The attribute's object identifier."""
        return self._oid

    @property
    def value(self) -> bytes:
        """The attribute's value bytes."""
        return self._value

    def __repr__(self) -> str:
        return "<Attribute(oid={}, value={!r})>".format(self.oid, self.value)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, Attribute):
            return (
                self.oid == other.oid
                and self.value == other.value
                and self._type == other._type
            )

        # Let Python try the reflected comparison on foreign types.
        return NotImplemented

    def __hash__(self) -> int:
        identity = (self.oid, self.value, self._type)
        return hash(identity)
class Attributes:
    """Container of Attribute objects supporting ``len()``, iteration and
    indexing, plus lookup by OID.
    """

    def __init__(
        self,
        attributes: typing.Iterable[Attribute],
    ) -> None:
        # Materialise the iterable so it can be traversed repeatedly.
        self._attributes = list(attributes)

    __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes")

    def __repr__(self) -> str:
        return "<Attributes({})>".format(self._attributes)

    def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute:
        """Return the first attribute whose OID equals ``oid``.

        Raises AttributeNotFound when no attribute matches.
        """
        found = next(
            (candidate for candidate in self if candidate.oid == oid), None
        )
        if found is None:
            raise AttributeNotFound(f"No {oid} attribute was found", oid)
        return found
class Version(utils.Enum):
    """Certificate version identifiers."""

    # Each member's value is one less than the version number in its name.
    v1 = 0
    v3 = 2
class InvalidVersion(Exception):
    """Error that records the offending version number.

    ``msg`` becomes the exception message and ``parsed_version`` is stored
    on the attribute of the same name.
    """

    def __init__(self, msg: str, parsed_version: int) -> None:
        self.parsed_version = parsed_version
        super().__init__(msg)
class Certificate(metaclass=abc.ABCMeta):
    """Abstract interface of an X.509 certificate."""

    @abc.abstractmethod
    def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
        """Return the fingerprint bytes computed with ``algorithm``."""

    @property
    @abc.abstractmethod
    def serial_number(self) -> int:
        """The certificate serial number."""

    @property
    @abc.abstractmethod
    def version(self) -> Version:
        """The certificate version."""

    @abc.abstractmethod
    def public_key(self) -> CertificatePublicKeyTypes:
        """Return the public key."""

    @property
    @abc.abstractmethod
    def not_valid_before(self) -> datetime.datetime:
        """The "not before" time, represented as a UTC datetime."""

    @property
    @abc.abstractmethod
    def not_valid_after(self) -> datetime.datetime:
        """The "not after" time, represented as a UTC datetime."""

    @property
    @abc.abstractmethod
    def issuer(self) -> Name:
        """The issuer name object."""

    @property
    @abc.abstractmethod
    def subject(self) -> Name:
        """The subject name object."""

    @property
    @abc.abstractmethod
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        """The HashAlgorithm matching the type of the digest signed in the
        certificate (may be None).
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_oid(self) -> ObjectIdentifier:
        """The ObjectIdentifier of the signature algorithm."""

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """An Extensions object."""

    @property
    @abc.abstractmethod
    def signature(self) -> bytes:
        """The signature bytes."""

    @property
    @abc.abstractmethod
    def tbs_certificate_bytes(self) -> bytes:
        """The tbsCertificate payload bytes as defined in RFC 5280."""

    @property
    @abc.abstractmethod
    def tbs_precertificate_bytes(self) -> bytes:
        """The tbsCertificate payload bytes with the SCT list extension
        stripped.
        """

    @abc.abstractmethod
    def __eq__(self, other: object) -> bool:
        """Check equality."""

    @abc.abstractmethod
    def __hash__(self) -> int:
        """Compute a hash."""

    @abc.abstractmethod
    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """Serialize the certificate to PEM or DER format."""

    @abc.abstractmethod
    def verify_directly_issued_by(self, issuer: "Certificate") -> None:
        """Verify that this certificate was directly issued by ``issuer``.

        Checks that the certificate's issuer name matches the issuer's
        subject name and that the certificate is signed by the issuer's
        private key. No other validation is performed.
        """


# Runtime isinstance checks need this since the rust class is not a subclass.
Certificate.register(rust_x509.Certificate)
class RevokedCertificate(metaclass=abc.ABCMeta):
    """Abstract interface of a single revoked-certificate entry."""

    @property
    @abc.abstractmethod
    def serial_number(self) -> int:
        """The serial number of the revoked certificate."""

    @property
    @abc.abstractmethod
    def revocation_date(self) -> datetime.datetime:
        """The date on which this certificate was revoked."""

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """An Extensions object containing a list of Revoked extensions."""


# Runtime isinstance checks need this since the rust class is not a subclass.
RevokedCertificate.register(rust_x509.RevokedCertificate)
class _RawRevokedCertificate(RevokedCertificate):
    """Plain in-memory RevokedCertificate that just stores the three values
    it is constructed with and hands them back through read-only properties.
    """

    def __init__(
        self,
        serial_number: int,
        revocation_date: datetime.datetime,
        extensions: Extensions,
    ):
        self._extensions = extensions
        self._revocation_date = revocation_date
        self._serial_number = serial_number

    @property
    def serial_number(self) -> int:
        """The serial number given at construction."""
        return self._serial_number

    @property
    def revocation_date(self) -> datetime.datetime:
        """The revocation date given at construction."""
        return self._revocation_date

    @property
    def extensions(self) -> Extensions:
        """The Extensions object given at construction."""
        return self._extensions
class CertificateRevocationList(metaclass=abc.ABCMeta):
    """Abstract interface of an X.509 certificate revocation list (CRL)."""

    @abc.abstractmethod
    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """Serialize the CRL to PEM or DER format."""

    @abc.abstractmethod
    def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
        """Return the fingerprint bytes computed with ``algorithm``."""

    @abc.abstractmethod
    def get_revoked_certificate_by_serial_number(
        self, serial_number: int
    ) -> typing.Optional[RevokedCertificate]:
        """Return the RevokedCertificate with ``serial_number``, or None if
        that serial number is not in the CRL.
        """

    @property
    @abc.abstractmethod
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        """The HashAlgorithm matching the type of the digest that was signed
        (may be None).
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_oid(self) -> ObjectIdentifier:
        """The ObjectIdentifier of the signature algorithm."""

    @property
    @abc.abstractmethod
    def issuer(self) -> Name:
        """The X509Name with the issuer of this CRL."""

    @property
    @abc.abstractmethod
    def next_update(self) -> typing.Optional[datetime.datetime]:
        """The date of next update for this CRL (may be None)."""

    @property
    @abc.abstractmethod
    def last_update(self) -> datetime.datetime:
        """The date of last update for this CRL."""

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """An Extensions object containing a list of CRL extensions."""

    @property
    @abc.abstractmethod
    def signature(self) -> bytes:
        """The signature bytes."""

    @property
    @abc.abstractmethod
    def tbs_certlist_bytes(self) -> bytes:
        """The tbsCertList payload bytes as defined in RFC 5280."""

    @abc.abstractmethod
    def __eq__(self, other: object) -> bool:
        """Check equality."""

    @abc.abstractmethod
    def __len__(self) -> int:
        """Return the number of revoked certificates in the CRL."""

    @typing.overload
    def __getitem__(self, idx: int) -> RevokedCertificate:
        ...

    @typing.overload
    def __getitem__(self, idx: slice) -> typing.List[RevokedCertificate]:
        ...

    @abc.abstractmethod
    def __getitem__(
        self, idx: typing.Union[int, slice]
    ) -> typing.Union[RevokedCertificate, typing.List[RevokedCertificate]]:
        """Return one revoked certificate (int index) or a list of them
        (slice).
        """

    @abc.abstractmethod
    def __iter__(self) -> typing.Iterator[RevokedCertificate]:
        """Iterate over the revoked certificates."""

    @abc.abstractmethod
    def is_signature_valid(
        self, public_key: CertificateIssuerPublicKeyTypes
    ) -> bool:
        """Verify the signature of the revocation list against
        ``public_key``.
        """


# Lets isinstance() recognise the rust-implemented CRL class.
CertificateRevocationList.register(rust_x509.CertificateRevocationList)
class CertificateSigningRequest(metaclass=abc.ABCMeta):
    """Abstract interface of an X.509 certificate signing request (CSR)."""

    @abc.abstractmethod
    def __eq__(self, other: object) -> bool:
        """Check equality."""

    @abc.abstractmethod
    def __hash__(self) -> int:
        """Compute a hash."""

    @abc.abstractmethod
    def public_key(self) -> CertificatePublicKeyTypes:
        """Return the public key."""

    @property
    @abc.abstractmethod
    def subject(self) -> Name:
        """The subject name object."""

    @property
    @abc.abstractmethod
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        """The HashAlgorithm matching the type of the digest that was signed
        (may be None).
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_oid(self) -> ObjectIdentifier:
        """The ObjectIdentifier of the signature algorithm."""

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """The extensions in the signing request."""

    @property
    @abc.abstractmethod
    def attributes(self) -> Attributes:
        """An Attributes object."""

    @abc.abstractmethod
    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """Encode the request to PEM or DER format."""

    @property
    @abc.abstractmethod
    def signature(self) -> bytes:
        """The signature bytes."""

    @property
    @abc.abstractmethod
    def tbs_certrequest_bytes(self) -> bytes:
        """The PKCS#10 CertificationRequestInfo bytes as defined in RFC
        2986.
        """

    @property
    @abc.abstractmethod
    def is_signature_valid(self) -> bool:
        """Whether the signature of the signing request verifies."""

    @abc.abstractmethod
    def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes:
        """Return the attribute value for the given OID."""


# Runtime isinstance checks need this since the rust class is not a subclass.
CertificateSigningRequest.register(rust_x509.CertificateSigningRequest)
# Backend argument preserved for API compatibility, but ignored.
def load_pem_x509_certificate(
    data: bytes, backend: typing.Any = None
) -> Certificate:
    """Hand ``data`` to the rust ``load_pem_x509_certificate`` binding and
    return the certificate it produces.
    """
    certificate = rust_x509.load_pem_x509_certificate(data)
    return certificate
def load_pem_x509_certificates(data: bytes) -> typing.List[Certificate]:
    """Hand ``data`` to the rust ``load_pem_x509_certificates`` binding and
    return the list of certificates it produces.
    """
    certificates = rust_x509.load_pem_x509_certificates(data)
    return certificates
# Backend argument preserved for API compatibility, but ignored.
def load_der_x509_certificate(
    data: bytes, backend: typing.Any = None
) -> Certificate:
    """Hand ``data`` to the rust ``load_der_x509_certificate`` binding and
    return the certificate it produces.
    """
    certificate = rust_x509.load_der_x509_certificate(data)
    return certificate
# Backend argument preserved for API compatibility, but ignored.
def load_pem_x509_csr(
    data: bytes, backend: typing.Any = None
) -> CertificateSigningRequest:
    """Hand ``data`` to the rust ``load_pem_x509_csr`` binding and return
    the signing request it produces.
    """
    request = rust_x509.load_pem_x509_csr(data)
    return request
# Backend argument preserved for API compatibility, but ignored.
def load_der_x509_csr(
    data: bytes, backend: typing.Any = None
) -> CertificateSigningRequest:
    """Hand ``data`` to the rust ``load_der_x509_csr`` binding and return
    the signing request it produces.
    """
    request = rust_x509.load_der_x509_csr(data)
    return request
# Backend argument preserved for API compatibility, but ignored.
def load_pem_x509_crl(
    data: bytes, backend: typing.Any = None
) -> CertificateRevocationList:
    """Hand ``data`` to the rust ``load_pem_x509_crl`` binding and return
    the revocation list it produces.
    """
    crl = rust_x509.load_pem_x509_crl(data)
    return crl
# Backend argument preserved for API compatibility, but ignored.
def load_der_x509_crl(
    data: bytes, backend: typing.Any = None
) -> CertificateRevocationList:
    """Hand ``data`` to the rust ``load_der_x509_crl`` binding and return
    the revocation list it produces.
    """
    crl = rust_x509.load_der_x509_crl(data)
    return crl
class CertificateSigningRequestBuilder:
    """Builder for X.509 certificate signing requests.

    Each setter validates its argument and returns a *new* builder carrying
    the updated state; the builder it was called on is not modified.
    """

    def __init__(
        self,
        subject_name: typing.Optional[Name] = None,
        extensions: typing.Optional[
            typing.List[Extension[ExtensionType]]
        ] = None,
        attributes: typing.Optional[
            typing.List[
                typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]]
            ]
        ] = None,
    ):
        """
        Creates an empty X.509 certificate request (v1).

        ``extensions`` and ``attributes`` default to new empty lists.
        """
        self._subject_name = subject_name
        # Use None sentinels rather than ``[]`` defaults so that builders
        # created without arguments never share one module-wide list object.
        self._extensions = [] if extensions is None else extensions
        self._attributes = [] if attributes is None else attributes

    def subject_name(self, name: Name) -> "CertificateSigningRequestBuilder":
        """
        Sets the certificate requestor's distinguished name.

        Raises TypeError if ``name`` is not a Name and ValueError if a
        subject name was already set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._subject_name is not None:
            raise ValueError("The subject name may only be set once.")
        return CertificateSigningRequestBuilder(
            name, self._extensions, self._attributes
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> "CertificateSigningRequestBuilder":
        """
        Adds an X.509 extension to the certificate request.

        Raises TypeError if ``extval`` is not an ExtensionType and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)

        # Concatenation builds a new list, leaving this builder's untouched.
        return CertificateSigningRequestBuilder(
            self._subject_name,
            self._extensions + [extension],
            self._attributes,
        )

    def add_attribute(
        self,
        oid: ObjectIdentifier,
        value: bytes,
        *,
        _tag: typing.Optional[_ASN1Type] = None,
    ) -> "CertificateSigningRequestBuilder":
        """
        Adds an X.509 attribute with an OID and associated value.

        Raises TypeError on wrongly typed arguments and ValueError if an
        attribute with the same OID was already added.
        """
        if not isinstance(oid, ObjectIdentifier):
            raise TypeError("oid must be an ObjectIdentifier")

        if not isinstance(value, bytes):
            raise TypeError("value must be bytes")

        if _tag is not None and not isinstance(_tag, _ASN1Type):
            raise TypeError("tag must be _ASN1Type")

        _reject_duplicate_attribute(oid, self._attributes)

        # Store the enum's underlying value, or None when no tag was given.
        tag = _tag.value if _tag is not None else None

        return CertificateSigningRequestBuilder(
            self._subject_name,
            self._extensions,
            self._attributes + [(oid, value, tag)],
        )

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: typing.Optional[_AllowedHashTypes],
        backend: typing.Any = None,
    ) -> CertificateSigningRequest:
        """
        Signs the request using the requestor's private key.

        ``backend`` is accepted but not used. Raises ValueError if no
        subject name has been set.
        """
        if self._subject_name is None:
            raise ValueError("A CertificateSigningRequest must have a subject")
        return rust_x509.create_x509_csr(self, private_key, algorithm)
class CertificateBuilder:
    """Builder for X.509 certificates.

    Each setter validates its argument and returns a *new* builder carrying
    the updated state; the builder it was called on is not modified. Every
    single-valued field may be set only once.
    """

    _extensions: typing.List[Extension[ExtensionType]]

    def __init__(
        self,
        issuer_name: typing.Optional[Name] = None,
        subject_name: typing.Optional[Name] = None,
        public_key: typing.Optional[CertificatePublicKeyTypes] = None,
        serial_number: typing.Optional[int] = None,
        not_valid_before: typing.Optional[datetime.datetime] = None,
        not_valid_after: typing.Optional[datetime.datetime] = None,
        extensions: typing.Optional[
            typing.List[Extension[ExtensionType]]
        ] = None,
    ) -> None:
        """
        Creates a builder; ``extensions`` defaults to a new empty list.
        """
        self._version = Version.v3
        self._issuer_name = issuer_name
        self._subject_name = subject_name
        self._public_key = public_key
        self._serial_number = serial_number
        self._not_valid_before = not_valid_before
        self._not_valid_after = not_valid_after
        # Use a None sentinel rather than a ``[]`` default so that builders
        # created without arguments never share one module-wide list object.
        self._extensions = [] if extensions is None else extensions

    def issuer_name(self, name: Name) -> "CertificateBuilder":
        """
        Sets the CA's distinguished name.

        Raises TypeError if ``name`` is not a Name and ValueError if the
        issuer name was already set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._issuer_name is not None:
            raise ValueError("The issuer name may only be set once.")
        return CertificateBuilder(
            name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def subject_name(self, name: Name) -> "CertificateBuilder":
        """
        Sets the requestor's distinguished name.

        Raises TypeError if ``name`` is not a Name and ValueError if the
        subject name was already set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._subject_name is not None:
            raise ValueError("The subject name may only be set once.")
        return CertificateBuilder(
            self._issuer_name,
            name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def public_key(
        self,
        key: CertificatePublicKeyTypes,
    ) -> "CertificateBuilder":
        """
        Sets the requestor's public key (as found in the signing request).

        Raises TypeError if ``key`` is not one of the supported public key
        types and ValueError if the public key was already set.
        """
        if not isinstance(
            key,
            (
                dsa.DSAPublicKey,
                rsa.RSAPublicKey,
                ec.EllipticCurvePublicKey,
                ed25519.Ed25519PublicKey,
                ed448.Ed448PublicKey,
                x25519.X25519PublicKey,
                x448.X448PublicKey,
            ),
        ):
            raise TypeError(
                "Expecting one of DSAPublicKey, RSAPublicKey,"
                " EllipticCurvePublicKey, Ed25519PublicKey,"
                " Ed448PublicKey, X25519PublicKey, or "
                "X448PublicKey."
            )
        if self._public_key is not None:
            raise ValueError("The public key may only be set once.")
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def serial_number(self, number: int) -> "CertificateBuilder":
        """
        Sets the certificate serial number.

        Raises TypeError if ``number`` is not an int and ValueError if the
        serial number was already set, is not positive, or needs more than
        159 bits.
        """
        if not isinstance(number, int):
            raise TypeError("Serial number must be of integral type.")
        if self._serial_number is not None:
            raise ValueError("The serial number may only be set once.")
        if number <= 0:
            raise ValueError("The serial number should be positive.")

        # ASN.1 integers are always signed, so most significant bit must be
        # zero.
        if number.bit_length() >= 160:  # As defined in RFC 5280
            raise ValueError(
                "The serial number should not be more than 159 bits."
            )
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def not_valid_before(
        self, time: datetime.datetime
    ) -> "CertificateBuilder":
        """
        Sets the certificate activation time.

        The time is normalized to a naive UTC datetime before it is checked
        and stored. Raises TypeError if ``time`` is not a datetime and
        ValueError if the value was already set, is earlier than 1950-01-01,
        or is later than an already configured not-valid-after time.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._not_valid_before is not None:
            raise ValueError("The not valid before may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The not valid before date must be on or after"
                " 1950 January 1."
            )
        if self._not_valid_after is not None and time > self._not_valid_after:
            raise ValueError(
                "The not valid before date must be before the not valid after "
                "date."
            )
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            time,
            self._not_valid_after,
            self._extensions,
        )

    def not_valid_after(self, time: datetime.datetime) -> "CertificateBuilder":
        """
        Sets the certificate expiration time.

        The time is normalized to a naive UTC datetime before it is checked
        and stored. Raises TypeError if ``time`` is not a datetime and
        ValueError if the value was already set, is earlier than 1950-01-01,
        or is earlier than an already configured not-valid-before time.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._not_valid_after is not None:
            raise ValueError("The not valid after may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The not valid after date must be on or after"
                " 1950 January 1."
            )
        if (
            self._not_valid_before is not None
            and time < self._not_valid_before
        ):
            raise ValueError(
                "The not valid after date must be after the not valid before "
                "date."
            )
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            time,
            self._extensions,
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> "CertificateBuilder":
        """
        Adds an X.509 extension to the certificate.

        Raises TypeError if ``extval`` is not an ExtensionType and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)

        # Concatenation builds a new list, leaving this builder's untouched.
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions + [extension],
        )

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: typing.Optional[_AllowedHashTypes],
        backend: typing.Any = None,
    ) -> Certificate:
        """
        Signs the certificate using the CA's private key.

        ``backend`` is accepted but not used. Raises ValueError if the
        subject name, issuer name, serial number, validity times or public
        key have not all been set.
        """
        if self._subject_name is None:
            raise ValueError("A certificate must have a subject name")

        if self._issuer_name is None:
            raise ValueError("A certificate must have an issuer name")

        if self._serial_number is None:
            raise ValueError("A certificate must have a serial number")

        if self._not_valid_before is None:
            raise ValueError("A certificate must have a not valid before time")

        if self._not_valid_after is None:
            raise ValueError("A certificate must have a not valid after time")

        if self._public_key is None:
            raise ValueError("A certificate must have a public key")

        return rust_x509.create_x509_certificate(self, private_key, algorithm)
class CertificateRevocationListBuilder:
    """Builder for X.509 certificate revocation lists (CRLs).

    Each setter validates its argument and returns a *new* builder carrying
    the updated state; the builder it was called on is not modified.
    """

    _extensions: typing.List[Extension[ExtensionType]]
    _revoked_certificates: typing.List[RevokedCertificate]

    def __init__(
        self,
        issuer_name: typing.Optional[Name] = None,
        last_update: typing.Optional[datetime.datetime] = None,
        next_update: typing.Optional[datetime.datetime] = None,
        extensions: typing.Optional[
            typing.List[Extension[ExtensionType]]
        ] = None,
        revoked_certificates: typing.Optional[
            typing.List[RevokedCertificate]
        ] = None,
    ):
        """
        Creates a builder; ``extensions`` and ``revoked_certificates``
        default to new empty lists.
        """
        self._issuer_name = issuer_name
        self._last_update = last_update
        self._next_update = next_update
        # Use None sentinels rather than ``[]`` defaults so that builders
        # created without arguments never share one module-wide list object.
        self._extensions = [] if extensions is None else extensions
        self._revoked_certificates = (
            [] if revoked_certificates is None else revoked_certificates
        )

    def issuer_name(
        self, issuer_name: Name
    ) -> "CertificateRevocationListBuilder":
        """
        Sets the CRL issuer's distinguished name.

        Raises TypeError if ``issuer_name`` is not a Name and ValueError if
        the issuer name was already set.
        """
        if not isinstance(issuer_name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._issuer_name is not None:
            raise ValueError("The issuer name may only be set once.")
        return CertificateRevocationListBuilder(
            issuer_name,
            self._last_update,
            self._next_update,
            self._extensions,
            self._revoked_certificates,
        )

    def last_update(
        self, last_update: datetime.datetime
    ) -> "CertificateRevocationListBuilder":
        """
        Sets the CRL's last update time.

        The time is normalized to a naive UTC datetime before it is checked
        and stored. Raises TypeError if ``last_update`` is not a datetime and
        ValueError if the value was already set, is earlier than 1950-01-01,
        or is later than an already configured next update time.
        """
        if not isinstance(last_update, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._last_update is not None:
            raise ValueError("Last update may only be set once.")
        last_update = _convert_to_naive_utc_time(last_update)
        if last_update < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The last update date must be on or after 1950 January 1."
            )
        if self._next_update is not None and last_update > self._next_update:
            raise ValueError(
                "The last update date must be before the next update date."
            )
        return CertificateRevocationListBuilder(
            self._issuer_name,
            last_update,
            self._next_update,
            self._extensions,
            self._revoked_certificates,
        )

    def next_update(
        self, next_update: datetime.datetime
    ) -> "CertificateRevocationListBuilder":
        """
        Sets the CRL's next update time.

        The time is normalized to a naive UTC datetime before it is checked
        and stored. Raises TypeError if ``next_update`` is not a datetime and
        ValueError if the value was already set, is earlier than 1950-01-01,
        or is earlier than an already configured last update time.
        """
        if not isinstance(next_update, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._next_update is not None:
            # Name the field that is actually being set.
            raise ValueError("Next update may only be set once.")
        next_update = _convert_to_naive_utc_time(next_update)
        if next_update < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The next update date must be on or after 1950 January 1."
            )
        if self._last_update is not None and next_update < self._last_update:
            raise ValueError(
                "The next update date must be after the last update date."
            )
        return CertificateRevocationListBuilder(
            self._issuer_name,
            self._last_update,
            next_update,
            self._extensions,
            self._revoked_certificates,
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> "CertificateRevocationListBuilder":
        """
        Adds an X.509 extension to the certificate revocation list.

        Raises TypeError if ``extval`` is not an ExtensionType and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)
        # Concatenation builds a new list, leaving this builder's untouched.
        return CertificateRevocationListBuilder(
            self._issuer_name,
            self._last_update,
            self._next_update,
            self._extensions + [extension],
            self._revoked_certificates,
        )

    def add_revoked_certificate(
        self, revoked_certificate: RevokedCertificate
    ) -> "CertificateRevocationListBuilder":
        """
        Adds a revoked certificate to the CRL.

        Raises TypeError if ``revoked_certificate`` is not a
        RevokedCertificate.
        """
        if not isinstance(revoked_certificate, RevokedCertificate):
            raise TypeError("Must be an instance of RevokedCertificate")

        return CertificateRevocationListBuilder(
            self._issuer_name,
            self._last_update,
            self._next_update,
            self._extensions,
            self._revoked_certificates + [revoked_certificate],
        )

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: typing.Optional[_AllowedHashTypes],
        backend: typing.Any = None,
    ) -> CertificateRevocationList:
        """
        Signs the CRL using the given private key.

        ``backend`` is accepted but not used. Raises ValueError if the
        issuer name, last update time or next update time has not been set.
        """
        if self._issuer_name is None:
            raise ValueError("A CRL must have an issuer name")

        if self._last_update is None:
            raise ValueError("A CRL must have a last update time")

        if self._next_update is None:
            raise ValueError("A CRL must have a next update time")

        return rust_x509.create_x509_crl(self, private_key, algorithm)
class RevokedCertificateBuilder:
    """Builder for revoked-certificate entries.

    Each setter validates its argument and returns a *new* builder carrying
    the updated state; the builder it was called on is not modified.
    """

    def __init__(
        self,
        serial_number: typing.Optional[int] = None,
        revocation_date: typing.Optional[datetime.datetime] = None,
        extensions: typing.Optional[
            typing.List[Extension[ExtensionType]]
        ] = None,
    ):
        """
        Creates a builder; ``extensions`` defaults to a new empty list.
        """
        self._serial_number = serial_number
        self._revocation_date = revocation_date
        # Use a None sentinel rather than a ``[]`` default so that builders
        # created without arguments never share one module-wide list object.
        self._extensions = [] if extensions is None else extensions

    def serial_number(self, number: int) -> "RevokedCertificateBuilder":
        """
        Sets the serial number of the revoked certificate.

        Raises TypeError if ``number`` is not an int and ValueError if the
        serial number was already set, is not positive, or needs more than
        159 bits.
        """
        if not isinstance(number, int):
            raise TypeError("Serial number must be of integral type.")
        if self._serial_number is not None:
            raise ValueError("The serial number may only be set once.")
        if number <= 0:
            raise ValueError("The serial number should be positive")

        # ASN.1 integers are always signed, so most significant bit must be
        # zero.
        if number.bit_length() >= 160:  # As defined in RFC 5280
            raise ValueError(
                "The serial number should not be more than 159 bits."
            )
        return RevokedCertificateBuilder(
            number, self._revocation_date, self._extensions
        )

    def revocation_date(
        self, time: datetime.datetime
    ) -> "RevokedCertificateBuilder":
        """
        Sets the revocation date.

        The time is normalized to a naive UTC datetime before it is checked
        and stored. Raises TypeError if ``time`` is not a datetime and
        ValueError if the date was already set or is earlier than
        1950-01-01.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._revocation_date is not None:
            raise ValueError("The revocation date may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The revocation date must be on or after 1950 January 1."
            )
        return RevokedCertificateBuilder(
            self._serial_number, time, self._extensions
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> "RevokedCertificateBuilder":
        """
        Adds an extension to the revoked-certificate entry.

        Raises TypeError if ``extval`` is not an ExtensionType and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)
        # Concatenation builds a new list, leaving this builder's untouched.
        return RevokedCertificateBuilder(
            self._serial_number,
            self._revocation_date,
            self._extensions + [extension],
        )

    def build(self, backend: typing.Any = None) -> RevokedCertificate:
        """
        Creates the RevokedCertificate from the collected values.

        ``backend`` is accepted but not used. Raises ValueError if the
        serial number or the revocation date has not been set.
        """
        if self._serial_number is None:
            raise ValueError("A revoked certificate must have a serial number")
        if self._revocation_date is None:
            raise ValueError(
                "A revoked certificate must have a revocation date"
            )
        return _RawRevokedCertificate(
            self._serial_number,
            self._revocation_date,
            Extensions(self._extensions),
        )
def random_serial_number() -> int:
    """Return a random non-negative integer of at most 159 bits.

    Twenty bytes (160 bits) are read from ``os.urandom`` and interpreted as
    a big-endian integer; shifting right by one drops a bit so the result
    fits in 159 bits.
    """
    entropy = os.urandom(20)
    candidate = int.from_bytes(entropy, byteorder="big")
    return candidate >> 1