Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/x509/base.py: 47%
413 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
6import abc
7import datetime
8import os
9import typing
11from cryptography import utils
12from cryptography.hazmat.bindings._rust import x509 as rust_x509
13from cryptography.hazmat.primitives import hashes, serialization
14from cryptography.hazmat.primitives.asymmetric import (
15 dsa,
16 ec,
17 ed25519,
18 ed448,
19 rsa,
20 x25519,
21 x448,
22)
23from cryptography.hazmat.primitives.asymmetric.types import (
24 CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES,
25 CERTIFICATE_PRIVATE_KEY_TYPES,
26 CERTIFICATE_PUBLIC_KEY_TYPES,
27)
28from cryptography.x509.extensions import (
29 Extension,
30 ExtensionType,
31 Extensions,
32 _make_sequence_methods,
33)
34from cryptography.x509.name import Name, _ASN1Type
35from cryptography.x509.oid import ObjectIdentifier
38_EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1)
41class AttributeNotFound(Exception):
42 def __init__(self, msg: str, oid: ObjectIdentifier) -> None:
43 super(AttributeNotFound, self).__init__(msg)
44 self.oid = oid
47def _reject_duplicate_extension(
48 extension: Extension[ExtensionType],
49 extensions: typing.List[Extension[ExtensionType]],
50) -> None:
51 # This is quadratic in the number of extensions
52 for e in extensions:
53 if e.oid == extension.oid:
54 raise ValueError("This extension has already been set.")
57def _reject_duplicate_attribute(
58 oid: ObjectIdentifier,
59 attributes: typing.List[
60 typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]]
61 ],
62) -> None:
63 # This is quadratic in the number of attributes
64 for attr_oid, _, _ in attributes:
65 if attr_oid == oid:
66 raise ValueError("This attribute has already been set.")
69def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime:
70 """Normalizes a datetime to a naive datetime in UTC.
72 time -- datetime to normalize. Assumed to be in UTC if not timezone
73 aware.
74 """
75 if time.tzinfo is not None:
76 offset = time.utcoffset()
77 offset = offset if offset else datetime.timedelta()
78 return time.replace(tzinfo=None) - offset
79 else:
80 return time
83class Attribute:
84 def __init__(
85 self,
86 oid: ObjectIdentifier,
87 value: bytes,
88 _type: int = _ASN1Type.UTF8String.value,
89 ) -> None:
90 self._oid = oid
91 self._value = value
92 self._type = _type
94 @property
95 def oid(self) -> ObjectIdentifier:
96 return self._oid
98 @property
99 def value(self) -> bytes:
100 return self._value
102 def __repr__(self) -> str:
103 return "<Attribute(oid={}, value={!r})>".format(self.oid, self.value)
105 def __eq__(self, other: object) -> bool:
106 if not isinstance(other, Attribute):
107 return NotImplemented
109 return (
110 self.oid == other.oid
111 and self.value == other.value
112 and self._type == other._type
113 )
115 def __hash__(self) -> int:
116 return hash((self.oid, self.value, self._type))
119class Attributes:
120 def __init__(
121 self,
122 attributes: typing.Iterable[Attribute],
123 ) -> None:
124 self._attributes = list(attributes)
126 __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes")
128 def __repr__(self) -> str:
129 return "<Attributes({})>".format(self._attributes)
131 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute:
132 for attr in self:
133 if attr.oid == oid:
134 return attr
136 raise AttributeNotFound("No {} attribute was found".format(oid), oid)
139class Version(utils.Enum):
140 v1 = 0
141 v3 = 2
144class InvalidVersion(Exception):
145 def __init__(self, msg: str, parsed_version: int) -> None:
146 super(InvalidVersion, self).__init__(msg)
147 self.parsed_version = parsed_version
150class Certificate(metaclass=abc.ABCMeta):
151 @abc.abstractmethod
152 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
153 """
154 Returns bytes using digest passed.
155 """
157 @abc.abstractproperty
158 def serial_number(self) -> int:
159 """
160 Returns certificate serial number
161 """
163 @abc.abstractproperty
164 def version(self) -> Version:
165 """
166 Returns the certificate version
167 """
169 @abc.abstractmethod
170 def public_key(self) -> CERTIFICATE_PUBLIC_KEY_TYPES:
171 """
172 Returns the public key
173 """
175 @abc.abstractproperty
176 def not_valid_before(self) -> datetime.datetime:
177 """
178 Not before time (represented as UTC datetime)
179 """
181 @abc.abstractproperty
182 def not_valid_after(self) -> datetime.datetime:
183 """
184 Not after time (represented as UTC datetime)
185 """
187 @abc.abstractproperty
188 def issuer(self) -> Name:
189 """
190 Returns the issuer name object.
191 """
193 @abc.abstractproperty
194 def subject(self) -> Name:
195 """
196 Returns the subject name object.
197 """
199 @abc.abstractproperty
200 def signature_hash_algorithm(
201 self,
202 ) -> typing.Optional[hashes.HashAlgorithm]:
203 """
204 Returns a HashAlgorithm corresponding to the type of the digest signed
205 in the certificate.
206 """
208 @abc.abstractproperty
209 def signature_algorithm_oid(self) -> ObjectIdentifier:
210 """
211 Returns the ObjectIdentifier of the signature algorithm.
212 """
214 @abc.abstractproperty
215 def extensions(self) -> Extensions:
216 """
217 Returns an Extensions object.
218 """
220 @abc.abstractproperty
221 def signature(self) -> bytes:
222 """
223 Returns the signature bytes.
224 """
226 @abc.abstractproperty
227 def tbs_certificate_bytes(self) -> bytes:
228 """
229 Returns the tbsCertificate payload bytes as defined in RFC 5280.
230 """
232 @abc.abstractproperty
233 def tbs_precertificate_bytes(self) -> bytes:
234 """
235 Returns the tbsCertificate payload bytes with the SCT list extension
236 stripped.
237 """
239 @abc.abstractmethod
240 def __eq__(self, other: object) -> bool:
241 """
242 Checks equality.
243 """
245 @abc.abstractmethod
246 def __hash__(self) -> int:
247 """
248 Computes a hash.
249 """
251 @abc.abstractmethod
252 def public_bytes(self, encoding: serialization.Encoding) -> bytes:
253 """
254 Serializes the certificate to PEM or DER format.
255 """
258# Runtime isinstance checks need this since the rust class is not a subclass.
259Certificate.register(rust_x509.Certificate)
262class RevokedCertificate(metaclass=abc.ABCMeta):
263 @abc.abstractproperty
264 def serial_number(self) -> int:
265 """
266 Returns the serial number of the revoked certificate.
267 """
269 @abc.abstractproperty
270 def revocation_date(self) -> datetime.datetime:
271 """
272 Returns the date of when this certificate was revoked.
273 """
275 @abc.abstractproperty
276 def extensions(self) -> Extensions:
277 """
278 Returns an Extensions object containing a list of Revoked extensions.
279 """
282# Runtime isinstance checks need this since the rust class is not a subclass.
283RevokedCertificate.register(rust_x509.RevokedCertificate)
286class _RawRevokedCertificate(RevokedCertificate):
287 def __init__(
288 self,
289 serial_number: int,
290 revocation_date: datetime.datetime,
291 extensions: Extensions,
292 ):
293 self._serial_number = serial_number
294 self._revocation_date = revocation_date
295 self._extensions = extensions
297 @property
298 def serial_number(self) -> int:
299 return self._serial_number
301 @property
302 def revocation_date(self) -> datetime.datetime:
303 return self._revocation_date
305 @property
306 def extensions(self) -> Extensions:
307 return self._extensions
310class CertificateRevocationList(metaclass=abc.ABCMeta):
311 @abc.abstractmethod
312 def public_bytes(self, encoding: serialization.Encoding) -> bytes:
313 """
314 Serializes the CRL to PEM or DER format.
315 """
317 @abc.abstractmethod
318 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
319 """
320 Returns bytes using digest passed.
321 """
323 @abc.abstractmethod
324 def get_revoked_certificate_by_serial_number(
325 self, serial_number: int
326 ) -> typing.Optional[RevokedCertificate]:
327 """
328 Returns an instance of RevokedCertificate or None if the serial_number
329 is not in the CRL.
330 """
332 @abc.abstractproperty
333 def signature_hash_algorithm(
334 self,
335 ) -> typing.Optional[hashes.HashAlgorithm]:
336 """
337 Returns a HashAlgorithm corresponding to the type of the digest signed
338 in the certificate.
339 """
341 @abc.abstractproperty
342 def signature_algorithm_oid(self) -> ObjectIdentifier:
343 """
344 Returns the ObjectIdentifier of the signature algorithm.
345 """
347 @abc.abstractproperty
348 def issuer(self) -> Name:
349 """
350 Returns the X509Name with the issuer of this CRL.
351 """
353 @abc.abstractproperty
354 def next_update(self) -> typing.Optional[datetime.datetime]:
355 """
356 Returns the date of next update for this CRL.
357 """
359 @abc.abstractproperty
360 def last_update(self) -> datetime.datetime:
361 """
362 Returns the date of last update for this CRL.
363 """
365 @abc.abstractproperty
366 def extensions(self) -> Extensions:
367 """
368 Returns an Extensions object containing a list of CRL extensions.
369 """
371 @abc.abstractproperty
372 def signature(self) -> bytes:
373 """
374 Returns the signature bytes.
375 """
377 @abc.abstractproperty
378 def tbs_certlist_bytes(self) -> bytes:
379 """
380 Returns the tbsCertList payload bytes as defined in RFC 5280.
381 """
383 @abc.abstractmethod
384 def __eq__(self, other: object) -> bool:
385 """
386 Checks equality.
387 """
389 @abc.abstractmethod
390 def __len__(self) -> int:
391 """
392 Number of revoked certificates in the CRL.
393 """
395 @typing.overload
396 def __getitem__(self, idx: int) -> RevokedCertificate:
397 ...
399 @typing.overload
400 def __getitem__(self, idx: slice) -> typing.List[RevokedCertificate]:
401 ...
403 @abc.abstractmethod
404 def __getitem__(
405 self, idx: typing.Union[int, slice]
406 ) -> typing.Union[RevokedCertificate, typing.List[RevokedCertificate]]:
407 """
408 Returns a revoked certificate (or slice of revoked certificates).
409 """
411 @abc.abstractmethod
412 def __iter__(self) -> typing.Iterator[RevokedCertificate]:
413 """
414 Iterator over the revoked certificates
415 """
417 @abc.abstractmethod
418 def is_signature_valid(
419 self, public_key: CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES
420 ) -> bool:
421 """
422 Verifies signature of revocation list against given public key.
423 """
426CertificateRevocationList.register(rust_x509.CertificateRevocationList)
429class CertificateSigningRequest(metaclass=abc.ABCMeta):
430 @abc.abstractmethod
431 def __eq__(self, other: object) -> bool:
432 """
433 Checks equality.
434 """
436 @abc.abstractmethod
437 def __hash__(self) -> int:
438 """
439 Computes a hash.
440 """
442 @abc.abstractmethod
443 def public_key(self) -> CERTIFICATE_PUBLIC_KEY_TYPES:
444 """
445 Returns the public key
446 """
448 @abc.abstractproperty
449 def subject(self) -> Name:
450 """
451 Returns the subject name object.
452 """
454 @abc.abstractproperty
455 def signature_hash_algorithm(
456 self,
457 ) -> typing.Optional[hashes.HashAlgorithm]:
458 """
459 Returns a HashAlgorithm corresponding to the type of the digest signed
460 in the certificate.
461 """
463 @abc.abstractproperty
464 def signature_algorithm_oid(self) -> ObjectIdentifier:
465 """
466 Returns the ObjectIdentifier of the signature algorithm.
467 """
469 @abc.abstractproperty
470 def extensions(self) -> Extensions:
471 """
472 Returns the extensions in the signing request.
473 """
475 @abc.abstractproperty
476 def attributes(self) -> Attributes:
477 """
478 Returns an Attributes object.
479 """
481 @abc.abstractmethod
482 def public_bytes(self, encoding: serialization.Encoding) -> bytes:
483 """
484 Encodes the request to PEM or DER format.
485 """
487 @abc.abstractproperty
488 def signature(self) -> bytes:
489 """
490 Returns the signature bytes.
491 """
493 @abc.abstractproperty
494 def tbs_certrequest_bytes(self) -> bytes:
495 """
496 Returns the PKCS#10 CertificationRequestInfo bytes as defined in RFC
497 2986.
498 """
500 @abc.abstractproperty
501 def is_signature_valid(self) -> bool:
502 """
503 Verifies signature of signing request.
504 """
506 @abc.abstractmethod
507 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes:
508 """
509 Get the attribute value for a given OID.
510 """
513# Runtime isinstance checks need this since the rust class is not a subclass.
514CertificateSigningRequest.register(rust_x509.CertificateSigningRequest)
517# Backend argument preserved for API compatibility, but ignored.
518def load_pem_x509_certificate(
519 data: bytes, backend: typing.Any = None
520) -> Certificate:
521 return rust_x509.load_pem_x509_certificate(data)
524# Backend argument preserved for API compatibility, but ignored.
525def load_der_x509_certificate(
526 data: bytes, backend: typing.Any = None
527) -> Certificate:
528 return rust_x509.load_der_x509_certificate(data)
531# Backend argument preserved for API compatibility, but ignored.
532def load_pem_x509_csr(
533 data: bytes, backend: typing.Any = None
534) -> CertificateSigningRequest:
535 return rust_x509.load_pem_x509_csr(data)
538# Backend argument preserved for API compatibility, but ignored.
539def load_der_x509_csr(
540 data: bytes, backend: typing.Any = None
541) -> CertificateSigningRequest:
542 return rust_x509.load_der_x509_csr(data)
545# Backend argument preserved for API compatibility, but ignored.
546def load_pem_x509_crl(
547 data: bytes, backend: typing.Any = None
548) -> CertificateRevocationList:
549 return rust_x509.load_pem_x509_crl(data)
552# Backend argument preserved for API compatibility, but ignored.
553def load_der_x509_crl(
554 data: bytes, backend: typing.Any = None
555) -> CertificateRevocationList:
556 return rust_x509.load_der_x509_crl(data)
559class CertificateSigningRequestBuilder:
560 def __init__(
561 self,
562 subject_name: typing.Optional[Name] = None,
563 extensions: typing.List[Extension[ExtensionType]] = [],
564 attributes: typing.List[
565 typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]]
566 ] = [],
567 ):
568 """
569 Creates an empty X.509 certificate request (v1).
570 """
571 self._subject_name = subject_name
572 self._extensions = extensions
573 self._attributes = attributes
575 def subject_name(self, name: Name) -> "CertificateSigningRequestBuilder":
576 """
577 Sets the certificate requestor's distinguished name.
578 """
579 if not isinstance(name, Name):
580 raise TypeError("Expecting x509.Name object.")
581 if self._subject_name is not None:
582 raise ValueError("The subject name may only be set once.")
583 return CertificateSigningRequestBuilder(
584 name, self._extensions, self._attributes
585 )
587 def add_extension(
588 self, extval: ExtensionType, critical: bool
589 ) -> "CertificateSigningRequestBuilder":
590 """
591 Adds an X.509 extension to the certificate request.
592 """
593 if not isinstance(extval, ExtensionType):
594 raise TypeError("extension must be an ExtensionType")
596 extension = Extension(extval.oid, critical, extval)
597 _reject_duplicate_extension(extension, self._extensions)
599 return CertificateSigningRequestBuilder(
600 self._subject_name,
601 self._extensions + [extension],
602 self._attributes,
603 )
605 def add_attribute(
606 self,
607 oid: ObjectIdentifier,
608 value: bytes,
609 *,
610 _tag: typing.Optional[_ASN1Type] = None,
611 ) -> "CertificateSigningRequestBuilder":
612 """
613 Adds an X.509 attribute with an OID and associated value.
614 """
615 if not isinstance(oid, ObjectIdentifier):
616 raise TypeError("oid must be an ObjectIdentifier")
618 if not isinstance(value, bytes):
619 raise TypeError("value must be bytes")
621 if _tag is not None and not isinstance(_tag, _ASN1Type):
622 raise TypeError("tag must be _ASN1Type")
624 _reject_duplicate_attribute(oid, self._attributes)
626 if _tag is not None:
627 tag = _tag.value
628 else:
629 tag = None
631 return CertificateSigningRequestBuilder(
632 self._subject_name,
633 self._extensions,
634 self._attributes + [(oid, value, tag)],
635 )
637 def sign(
638 self,
639 private_key: CERTIFICATE_PRIVATE_KEY_TYPES,
640 algorithm: typing.Optional[hashes.HashAlgorithm],
641 backend: typing.Any = None,
642 ) -> CertificateSigningRequest:
643 """
644 Signs the request using the requestor's private key.
645 """
646 if self._subject_name is None:
647 raise ValueError("A CertificateSigningRequest must have a subject")
648 return rust_x509.create_x509_csr(self, private_key, algorithm)
651class CertificateBuilder:
652 _extensions: typing.List[Extension[ExtensionType]]
654 def __init__(
655 self,
656 issuer_name: typing.Optional[Name] = None,
657 subject_name: typing.Optional[Name] = None,
658 public_key: typing.Optional[CERTIFICATE_PUBLIC_KEY_TYPES] = None,
659 serial_number: typing.Optional[int] = None,
660 not_valid_before: typing.Optional[datetime.datetime] = None,
661 not_valid_after: typing.Optional[datetime.datetime] = None,
662 extensions: typing.List[Extension[ExtensionType]] = [],
663 ) -> None:
664 self._version = Version.v3
665 self._issuer_name = issuer_name
666 self._subject_name = subject_name
667 self._public_key = public_key
668 self._serial_number = serial_number
669 self._not_valid_before = not_valid_before
670 self._not_valid_after = not_valid_after
671 self._extensions = extensions
673 def issuer_name(self, name: Name) -> "CertificateBuilder":
674 """
675 Sets the CA's distinguished name.
676 """
677 if not isinstance(name, Name):
678 raise TypeError("Expecting x509.Name object.")
679 if self._issuer_name is not None:
680 raise ValueError("The issuer name may only be set once.")
681 return CertificateBuilder(
682 name,
683 self._subject_name,
684 self._public_key,
685 self._serial_number,
686 self._not_valid_before,
687 self._not_valid_after,
688 self._extensions,
689 )
691 def subject_name(self, name: Name) -> "CertificateBuilder":
692 """
693 Sets the requestor's distinguished name.
694 """
695 if not isinstance(name, Name):
696 raise TypeError("Expecting x509.Name object.")
697 if self._subject_name is not None:
698 raise ValueError("The subject name may only be set once.")
699 return CertificateBuilder(
700 self._issuer_name,
701 name,
702 self._public_key,
703 self._serial_number,
704 self._not_valid_before,
705 self._not_valid_after,
706 self._extensions,
707 )
709 def public_key(
710 self,
711 key: CERTIFICATE_PUBLIC_KEY_TYPES,
712 ) -> "CertificateBuilder":
713 """
714 Sets the requestor's public key (as found in the signing request).
715 """
716 if not isinstance(
717 key,
718 (
719 dsa.DSAPublicKey,
720 rsa.RSAPublicKey,
721 ec.EllipticCurvePublicKey,
722 ed25519.Ed25519PublicKey,
723 ed448.Ed448PublicKey,
724 x25519.X25519PublicKey,
725 x448.X448PublicKey,
726 ),
727 ):
728 raise TypeError(
729 "Expecting one of DSAPublicKey, RSAPublicKey,"
730 " EllipticCurvePublicKey, Ed25519PublicKey,"
731 " Ed448PublicKey, X25519PublicKey, or "
732 "X448PublicKey."
733 )
734 if self._public_key is not None:
735 raise ValueError("The public key may only be set once.")
736 return CertificateBuilder(
737 self._issuer_name,
738 self._subject_name,
739 key,
740 self._serial_number,
741 self._not_valid_before,
742 self._not_valid_after,
743 self._extensions,
744 )
746 def serial_number(self, number: int) -> "CertificateBuilder":
747 """
748 Sets the certificate serial number.
749 """
750 if not isinstance(number, int):
751 raise TypeError("Serial number must be of integral type.")
752 if self._serial_number is not None:
753 raise ValueError("The serial number may only be set once.")
754 if number <= 0:
755 raise ValueError("The serial number should be positive.")
757 # ASN.1 integers are always signed, so most significant bit must be
758 # zero.
759 if number.bit_length() >= 160: # As defined in RFC 5280
760 raise ValueError(
761 "The serial number should not be more than 159 " "bits."
762 )
763 return CertificateBuilder(
764 self._issuer_name,
765 self._subject_name,
766 self._public_key,
767 number,
768 self._not_valid_before,
769 self._not_valid_after,
770 self._extensions,
771 )
773 def not_valid_before(
774 self, time: datetime.datetime
775 ) -> "CertificateBuilder":
776 """
777 Sets the certificate activation time.
778 """
779 if not isinstance(time, datetime.datetime):
780 raise TypeError("Expecting datetime object.")
781 if self._not_valid_before is not None:
782 raise ValueError("The not valid before may only be set once.")
783 time = _convert_to_naive_utc_time(time)
784 if time < _EARLIEST_UTC_TIME:
785 raise ValueError(
786 "The not valid before date must be on or after"
787 " 1950 January 1)."
788 )
789 if self._not_valid_after is not None and time > self._not_valid_after:
790 raise ValueError(
791 "The not valid before date must be before the not valid after "
792 "date."
793 )
794 return CertificateBuilder(
795 self._issuer_name,
796 self._subject_name,
797 self._public_key,
798 self._serial_number,
799 time,
800 self._not_valid_after,
801 self._extensions,
802 )
804 def not_valid_after(self, time: datetime.datetime) -> "CertificateBuilder":
805 """
806 Sets the certificate expiration time.
807 """
808 if not isinstance(time, datetime.datetime):
809 raise TypeError("Expecting datetime object.")
810 if self._not_valid_after is not None:
811 raise ValueError("The not valid after may only be set once.")
812 time = _convert_to_naive_utc_time(time)
813 if time < _EARLIEST_UTC_TIME:
814 raise ValueError(
815 "The not valid after date must be on or after"
816 " 1950 January 1."
817 )
818 if (
819 self._not_valid_before is not None
820 and time < self._not_valid_before
821 ):
822 raise ValueError(
823 "The not valid after date must be after the not valid before "
824 "date."
825 )
826 return CertificateBuilder(
827 self._issuer_name,
828 self._subject_name,
829 self._public_key,
830 self._serial_number,
831 self._not_valid_before,
832 time,
833 self._extensions,
834 )
836 def add_extension(
837 self, extval: ExtensionType, critical: bool
838 ) -> "CertificateBuilder":
839 """
840 Adds an X.509 extension to the certificate.
841 """
842 if not isinstance(extval, ExtensionType):
843 raise TypeError("extension must be an ExtensionType")
845 extension = Extension(extval.oid, critical, extval)
846 _reject_duplicate_extension(extension, self._extensions)
848 return CertificateBuilder(
849 self._issuer_name,
850 self._subject_name,
851 self._public_key,
852 self._serial_number,
853 self._not_valid_before,
854 self._not_valid_after,
855 self._extensions + [extension],
856 )
858 def sign(
859 self,
860 private_key: CERTIFICATE_PRIVATE_KEY_TYPES,
861 algorithm: typing.Optional[hashes.HashAlgorithm],
862 backend: typing.Any = None,
863 ) -> Certificate:
864 """
865 Signs the certificate using the CA's private key.
866 """
867 if self._subject_name is None:
868 raise ValueError("A certificate must have a subject name")
870 if self._issuer_name is None:
871 raise ValueError("A certificate must have an issuer name")
873 if self._serial_number is None:
874 raise ValueError("A certificate must have a serial number")
876 if self._not_valid_before is None:
877 raise ValueError("A certificate must have a not valid before time")
879 if self._not_valid_after is None:
880 raise ValueError("A certificate must have a not valid after time")
882 if self._public_key is None:
883 raise ValueError("A certificate must have a public key")
885 return rust_x509.create_x509_certificate(self, private_key, algorithm)
888class CertificateRevocationListBuilder:
889 _extensions: typing.List[Extension[ExtensionType]]
890 _revoked_certificates: typing.List[RevokedCertificate]
892 def __init__(
893 self,
894 issuer_name: typing.Optional[Name] = None,
895 last_update: typing.Optional[datetime.datetime] = None,
896 next_update: typing.Optional[datetime.datetime] = None,
897 extensions: typing.List[Extension[ExtensionType]] = [],
898 revoked_certificates: typing.List[RevokedCertificate] = [],
899 ):
900 self._issuer_name = issuer_name
901 self._last_update = last_update
902 self._next_update = next_update
903 self._extensions = extensions
904 self._revoked_certificates = revoked_certificates
906 def issuer_name(
907 self, issuer_name: Name
908 ) -> "CertificateRevocationListBuilder":
909 if not isinstance(issuer_name, Name):
910 raise TypeError("Expecting x509.Name object.")
911 if self._issuer_name is not None:
912 raise ValueError("The issuer name may only be set once.")
913 return CertificateRevocationListBuilder(
914 issuer_name,
915 self._last_update,
916 self._next_update,
917 self._extensions,
918 self._revoked_certificates,
919 )
921 def last_update(
922 self, last_update: datetime.datetime
923 ) -> "CertificateRevocationListBuilder":
924 if not isinstance(last_update, datetime.datetime):
925 raise TypeError("Expecting datetime object.")
926 if self._last_update is not None:
927 raise ValueError("Last update may only be set once.")
928 last_update = _convert_to_naive_utc_time(last_update)
929 if last_update < _EARLIEST_UTC_TIME:
930 raise ValueError(
931 "The last update date must be on or after" " 1950 January 1."
932 )
933 if self._next_update is not None and last_update > self._next_update:
934 raise ValueError(
935 "The last update date must be before the next update date."
936 )
937 return CertificateRevocationListBuilder(
938 self._issuer_name,
939 last_update,
940 self._next_update,
941 self._extensions,
942 self._revoked_certificates,
943 )
945 def next_update(
946 self, next_update: datetime.datetime
947 ) -> "CertificateRevocationListBuilder":
948 if not isinstance(next_update, datetime.datetime):
949 raise TypeError("Expecting datetime object.")
950 if self._next_update is not None:
951 raise ValueError("Last update may only be set once.")
952 next_update = _convert_to_naive_utc_time(next_update)
953 if next_update < _EARLIEST_UTC_TIME:
954 raise ValueError(
955 "The last update date must be on or after" " 1950 January 1."
956 )
957 if self._last_update is not None and next_update < self._last_update:
958 raise ValueError(
959 "The next update date must be after the last update date."
960 )
961 return CertificateRevocationListBuilder(
962 self._issuer_name,
963 self._last_update,
964 next_update,
965 self._extensions,
966 self._revoked_certificates,
967 )
969 def add_extension(
970 self, extval: ExtensionType, critical: bool
971 ) -> "CertificateRevocationListBuilder":
972 """
973 Adds an X.509 extension to the certificate revocation list.
974 """
975 if not isinstance(extval, ExtensionType):
976 raise TypeError("extension must be an ExtensionType")
978 extension = Extension(extval.oid, critical, extval)
979 _reject_duplicate_extension(extension, self._extensions)
980 return CertificateRevocationListBuilder(
981 self._issuer_name,
982 self._last_update,
983 self._next_update,
984 self._extensions + [extension],
985 self._revoked_certificates,
986 )
988 def add_revoked_certificate(
989 self, revoked_certificate: RevokedCertificate
990 ) -> "CertificateRevocationListBuilder":
991 """
992 Adds a revoked certificate to the CRL.
993 """
994 if not isinstance(revoked_certificate, RevokedCertificate):
995 raise TypeError("Must be an instance of RevokedCertificate")
997 return CertificateRevocationListBuilder(
998 self._issuer_name,
999 self._last_update,
1000 self._next_update,
1001 self._extensions,
1002 self._revoked_certificates + [revoked_certificate],
1003 )
1005 def sign(
1006 self,
1007 private_key: CERTIFICATE_PRIVATE_KEY_TYPES,
1008 algorithm: typing.Optional[hashes.HashAlgorithm],
1009 backend: typing.Any = None,
1010 ) -> CertificateRevocationList:
1011 if self._issuer_name is None:
1012 raise ValueError("A CRL must have an issuer name")
1014 if self._last_update is None:
1015 raise ValueError("A CRL must have a last update time")
1017 if self._next_update is None:
1018 raise ValueError("A CRL must have a next update time")
1020 return rust_x509.create_x509_crl(self, private_key, algorithm)
1023class RevokedCertificateBuilder:
1024 def __init__(
1025 self,
1026 serial_number: typing.Optional[int] = None,
1027 revocation_date: typing.Optional[datetime.datetime] = None,
1028 extensions: typing.List[Extension[ExtensionType]] = [],
1029 ):
1030 self._serial_number = serial_number
1031 self._revocation_date = revocation_date
1032 self._extensions = extensions
1034 def serial_number(self, number: int) -> "RevokedCertificateBuilder":
1035 if not isinstance(number, int):
1036 raise TypeError("Serial number must be of integral type.")
1037 if self._serial_number is not None:
1038 raise ValueError("The serial number may only be set once.")
1039 if number <= 0:
1040 raise ValueError("The serial number should be positive")
1042 # ASN.1 integers are always signed, so most significant bit must be
1043 # zero.
1044 if number.bit_length() >= 160: # As defined in RFC 5280
1045 raise ValueError(
1046 "The serial number should not be more than 159 " "bits."
1047 )
1048 return RevokedCertificateBuilder(
1049 number, self._revocation_date, self._extensions
1050 )
1052 def revocation_date(
1053 self, time: datetime.datetime
1054 ) -> "RevokedCertificateBuilder":
1055 if not isinstance(time, datetime.datetime):
1056 raise TypeError("Expecting datetime object.")
1057 if self._revocation_date is not None:
1058 raise ValueError("The revocation date may only be set once.")
1059 time = _convert_to_naive_utc_time(time)
1060 if time < _EARLIEST_UTC_TIME:
1061 raise ValueError(
1062 "The revocation date must be on or after" " 1950 January 1."
1063 )
1064 return RevokedCertificateBuilder(
1065 self._serial_number, time, self._extensions
1066 )
1068 def add_extension(
1069 self, extval: ExtensionType, critical: bool
1070 ) -> "RevokedCertificateBuilder":
1071 if not isinstance(extval, ExtensionType):
1072 raise TypeError("extension must be an ExtensionType")
1074 extension = Extension(extval.oid, critical, extval)
1075 _reject_duplicate_extension(extension, self._extensions)
1076 return RevokedCertificateBuilder(
1077 self._serial_number,
1078 self._revocation_date,
1079 self._extensions + [extension],
1080 )
1082 def build(self, backend: typing.Any = None) -> RevokedCertificate:
1083 if self._serial_number is None:
1084 raise ValueError("A revoked certificate must have a serial number")
1085 if self._revocation_date is None:
1086 raise ValueError(
1087 "A revoked certificate must have a revocation date"
1088 )
1089 return _RawRevokedCertificate(
1090 self._serial_number,
1091 self._revocation_date,
1092 Extensions(self._extensions),
1093 )
1096def random_serial_number() -> int:
1097 return int.from_bytes(os.urandom(20), "big") >> 1