1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import annotations
6
7import abc
8import datetime
9import os
10import typing
11import warnings
12
13from cryptography import utils
14from cryptography.hazmat.bindings._rust import x509 as rust_x509
15from cryptography.hazmat.primitives import hashes, serialization
16from cryptography.hazmat.primitives.asymmetric import (
17 dsa,
18 ec,
19 ed448,
20 ed25519,
21 padding,
22 rsa,
23 x448,
24 x25519,
25)
26from cryptography.hazmat.primitives.asymmetric.types import (
27 CertificateIssuerPrivateKeyTypes,
28 CertificateIssuerPublicKeyTypes,
29 CertificatePublicKeyTypes,
30)
31from cryptography.x509.extensions import (
32 Extension,
33 Extensions,
34 ExtensionType,
35 _make_sequence_methods,
36)
37from cryptography.x509.name import Name, _ASN1Type
38from cryptography.x509.oid import ObjectIdentifier
39
# Lower bound applied to every date handed to the builders in this module:
# a (naive, UTC-normalized) datetime earlier than this is rejected with a
# ValueError.
_EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1)

# Hash algorithm types accepted by the ``algorithm`` parameter of the
# builders' ``sign`` methods.
# This must be kept in sync with sign.rs's list of allowable types in
# identify_hash_type
_AllowedHashTypes = typing.Union[
    hashes.SHA224,
    hashes.SHA256,
    hashes.SHA384,
    hashes.SHA512,
    hashes.SHA3_224,
    hashes.SHA3_256,
    hashes.SHA3_384,
    hashes.SHA3_512,
]
54
55
class AttributeNotFound(Exception):
    """
    Raised when a lookup by OID finds no matching attribute.

    The OID that was searched for is available as the ``oid`` attribute.
    """

    def __init__(self, msg: str, oid: ObjectIdentifier) -> None:
        # Keep the OID on the exception so handlers can inspect it.
        self.oid = oid
        super().__init__(msg)
60
61
def _reject_duplicate_extension(
    extension: Extension[ExtensionType],
    extensions: list[Extension[ExtensionType]],
) -> None:
    """
    Raises ValueError if an extension with the same OID as ``extension`` is
    already present in ``extensions``; returns None otherwise.
    """
    # Each call is a linear scan, so adding n extensions one at a time is
    # quadratic overall.
    if any(existing.oid == extension.oid for existing in extensions):
        raise ValueError("This extension has already been set.")
70
71
def _reject_duplicate_attribute(
    oid: ObjectIdentifier,
    attributes: list[tuple[ObjectIdentifier, bytes, int | None]],
) -> None:
    """
    Raises ValueError if ``oid`` already appears as the first element of one
    of the ``(oid, value, tag)`` tuples in ``attributes``; returns None
    otherwise.
    """
    # Each call is a linear scan, so adding n attributes one at a time is
    # quadratic overall.
    if any(present == oid for present, _value, _tag in attributes):
        raise ValueError("This attribute has already been set.")
80
81
def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime:
    """Normalizes a datetime to a naive datetime in UTC.

    time -- datetime to normalize. A datetime without tzinfo is assumed to
    already be in UTC and is returned unchanged.
    """
    if time.tzinfo is None:
        return time

    # utcoffset() may return None even when tzinfo is set; treat that (and a
    # zero offset) as "no shift".
    shift = time.utcoffset() or datetime.timedelta()
    return time.replace(tzinfo=None) - shift
94
95
class Attribute:
    """
    A single attribute: an OID, its raw ``bytes`` value and an integer type
    tag (kept private as ``_type``).
    """

    def __init__(
        self,
        oid: ObjectIdentifier,
        value: bytes,
        _type: int = _ASN1Type.UTF8String.value,
    ) -> None:
        self._type = _type
        self._value = value
        self._oid = oid

    @property
    def oid(self) -> ObjectIdentifier:
        """
        Returns the attribute's object identifier.
        """
        return self._oid

    @property
    def value(self) -> bytes:
        """
        Returns the attribute's value bytes.
        """
        return self._value

    def __repr__(self) -> str:
        return f"<Attribute(oid={self.oid}, value={self.value!r})>"

    def __eq__(self, other: object) -> bool:
        # Two attributes are equal only when OID, value and type tag all
        # match; anything that is not an Attribute is left to the other
        # operand to compare.
        if isinstance(other, Attribute):
            return (
                self.oid == other.oid
                and self.value == other.value
                and self._type == other._type
            )

        return NotImplemented

    def __hash__(self) -> int:
        # Hash over the same three fields that __eq__ compares.
        identity = (self.oid, self.value, self._type)
        return hash(identity)
130
131
class Attributes:
    """
    A collection of Attribute objects with lookup by OID.

    ``len()``, iteration and indexing are provided by the methods generated
    from ``_make_sequence_methods`` over the private ``_attributes`` list.
    """

    def __init__(
        self,
        attributes: typing.Iterable[Attribute],
    ) -> None:
        # Materialize the iterable so it can be traversed more than once.
        self._attributes = [*attributes]

    __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes")

    def __repr__(self) -> str:
        return f"<Attributes({self._attributes})>"

    def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute:
        """
        Returns the first attribute whose OID equals ``oid``; raises
        AttributeNotFound if there is none.
        """
        found = next(
            (candidate for candidate in self if candidate.oid == oid), None
        )
        if found is None:
            raise AttributeNotFound(f"No {oid} attribute was found", oid)

        return found
150
151
class Version(utils.Enum):
    """
    Certificate versions known to this module, each mapped to an integer.
    """

    # NOTE(review): the integers presumably mirror the encoded X.509 version
    # field (v1 = 0, v3 = 2 in RFC 5280) -- confirm.
    v1 = 0
    v3 = 2
155
156
class InvalidVersion(Exception):
    """
    Exception that carries an integer version value alongside its message.

    The integer is available as the ``parsed_version`` attribute.
    """

    def __init__(self, msg: str, parsed_version: int) -> None:
        # Keep the offending value on the exception so handlers can read it.
        self.parsed_version = parsed_version
        super().__init__(msg)
161
162
class Certificate(metaclass=abc.ABCMeta):
    """
    Abstract interface of an X.509 certificate.

    Every member is abstract. ``rust_x509.Certificate`` is registered as a
    virtual subclass right after the class body so that ``isinstance`` checks
    against this class accept it.
    """

    @abc.abstractmethod
    def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
        """
        Returns the fingerprint bytes computed with the digest passed.
        """

    @property
    @abc.abstractmethod
    def serial_number(self) -> int:
        """
        Returns certificate serial number
        """

    @property
    @abc.abstractmethod
    def version(self) -> Version:
        """
        Returns the certificate version
        """

    @abc.abstractmethod
    def public_key(self) -> CertificatePublicKeyTypes:
        """
        Returns the public key
        """

    @property
    @abc.abstractmethod
    def not_valid_before(self) -> datetime.datetime:
        """
        Not before time (represented as UTC datetime). See
        not_valid_before_utc for the non-naive variant.
        """

    @property
    @abc.abstractmethod
    def not_valid_before_utc(self) -> datetime.datetime:
        """
        Not before time (represented as a non-naive UTC datetime)
        """

    @property
    @abc.abstractmethod
    def not_valid_after(self) -> datetime.datetime:
        """
        Not after time (represented as UTC datetime). See
        not_valid_after_utc for the non-naive variant.
        """

    @property
    @abc.abstractmethod
    def not_valid_after_utc(self) -> datetime.datetime:
        """
        Not after time (represented as a non-naive UTC datetime)
        """

    @property
    @abc.abstractmethod
    def issuer(self) -> Name:
        """
        Returns the issuer name object.
        """

    @property
    @abc.abstractmethod
    def subject(self) -> Name:
        """
        Returns the subject name object.
        """

    @property
    @abc.abstractmethod
    def signature_hash_algorithm(
        self,
    ) -> hashes.HashAlgorithm | None:
        """
        Returns a HashAlgorithm corresponding to the type of the digest signed
        in the certificate.

        The annotation also allows None. NOTE(review): presumably for
        signature algorithms without a separate digest -- confirm.
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_oid(self) -> ObjectIdentifier:
        """
        Returns the ObjectIdentifier of the signature algorithm.
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_parameters(
        self,
    ) -> None | padding.PSS | padding.PKCS1v15 | ec.ECDSA:
        """
        Returns the signature algorithm parameters: a PSS or PKCS1v15
        padding object, an ECDSA object, or None.
        """

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """
        Returns an Extensions object.
        """

    @property
    @abc.abstractmethod
    def signature(self) -> bytes:
        """
        Returns the signature bytes.
        """

    @property
    @abc.abstractmethod
    def tbs_certificate_bytes(self) -> bytes:
        """
        Returns the tbsCertificate payload bytes as defined in RFC 5280.
        """

    @property
    @abc.abstractmethod
    def tbs_precertificate_bytes(self) -> bytes:
        """
        Returns the tbsCertificate payload bytes with the SCT list extension
        stripped.
        """

    @abc.abstractmethod
    def __eq__(self, other: object) -> bool:
        """
        Checks equality.
        """

    @abc.abstractmethod
    def __hash__(self) -> int:
        """
        Computes a hash.
        """

    @abc.abstractmethod
    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """
        Serializes the certificate to PEM or DER format.
        """

    @abc.abstractmethod
    def verify_directly_issued_by(self, issuer: Certificate) -> None:
        """
        This method verifies that certificate issuer name matches the
        issuer subject name and that the certificate is signed by the
        issuer's private key. No other validation is performed.

        Returns None. NOTE(review): a failed check is presumably reported
        by raising -- confirm against the implementation.
        """


# Runtime isinstance checks need this since the rust class is not a subclass.
Certificate.register(rust_x509.Certificate)
316
317
class RevokedCertificate(metaclass=abc.ABCMeta):
    """
    Abstract interface of a single revoked-certificate entry.

    ``rust_x509.RevokedCertificate`` is registered as a virtual subclass
    right after the class body so that ``isinstance`` checks accept it.
    """

    @property
    @abc.abstractmethod
    def serial_number(self) -> int:
        """
        Returns the serial number of the revoked certificate.
        """

    @property
    @abc.abstractmethod
    def revocation_date(self) -> datetime.datetime:
        """
        Returns the date of when this certificate was revoked. See
        revocation_date_utc for the non-naive variant.
        """

    @property
    @abc.abstractmethod
    def revocation_date_utc(self) -> datetime.datetime:
        """
        Returns the date of when this certificate was revoked as a non-naive
        UTC datetime.
        """

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """
        Returns an Extensions object containing a list of Revoked extensions.
        """


# Runtime isinstance checks need this since the rust class is not a subclass.
RevokedCertificate.register(rust_x509.RevokedCertificate)
351
352
class _RawRevokedCertificate(RevokedCertificate):
    """
    RevokedCertificate implementation that simply stores the serial number,
    revocation date and extensions it is constructed with.
    """

    def __init__(
        self,
        serial_number: int,
        revocation_date: datetime.datetime,
        extensions: Extensions,
    ):
        self._extensions = extensions
        self._revocation_date = revocation_date
        self._serial_number = serial_number

    @property
    def serial_number(self) -> int:
        """
        Returns the stored serial number.
        """
        return self._serial_number

    @property
    def revocation_date(self) -> datetime.datetime:
        """
        Returns the stored revocation date unchanged, after emitting a
        deprecation warning that points to revocation_date_utc.
        """
        message = (
            "Properties that return a naïve datetime object have been "
            "deprecated. Please switch to revocation_date_utc."
        )
        # stacklevel=2 attributes the warning to the code reading the
        # property rather than to this method.
        warnings.warn(message, utils.DeprecatedIn42, stacklevel=2)
        return self._revocation_date

    @property
    def revocation_date_utc(self) -> datetime.datetime:
        """
        Returns the stored revocation date with tzinfo set to UTC.
        """
        utc = datetime.timezone.utc
        return self._revocation_date.replace(tzinfo=utc)

    @property
    def extensions(self) -> Extensions:
        """
        Returns the stored Extensions object.
        """
        return self._extensions
385
386
class CertificateRevocationList(metaclass=abc.ABCMeta):
    """
    Abstract interface of an X.509 certificate revocation list (CRL).

    Besides the CRL's own fields it declares the sequence protocol
    (``len()``, indexing/slicing, iteration) over its revoked certificates.
    ``rust_x509.CertificateRevocationList`` is registered as a virtual
    subclass right after the class body.
    """

    @abc.abstractmethod
    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """
        Serializes the CRL to PEM or DER format.
        """

    @abc.abstractmethod
    def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
        """
        Returns the fingerprint bytes computed with the digest passed.
        """

    @abc.abstractmethod
    def get_revoked_certificate_by_serial_number(
        self, serial_number: int
    ) -> RevokedCertificate | None:
        """
        Returns an instance of RevokedCertificate or None if the serial_number
        is not in the CRL.
        """

    @property
    @abc.abstractmethod
    def signature_hash_algorithm(
        self,
    ) -> hashes.HashAlgorithm | None:
        """
        Returns a HashAlgorithm corresponding to the type of the digest signed
        in the CRL.

        The annotation also allows None. NOTE(review): presumably for
        signature algorithms without a separate digest -- confirm.
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_oid(self) -> ObjectIdentifier:
        """
        Returns the ObjectIdentifier of the signature algorithm.
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_parameters(
        self,
    ) -> None | padding.PSS | padding.PKCS1v15 | ec.ECDSA:
        """
        Returns the signature algorithm parameters: a PSS or PKCS1v15
        padding object, an ECDSA object, or None.
        """

    @property
    @abc.abstractmethod
    def issuer(self) -> Name:
        """
        Returns the X509Name with the issuer of this CRL.
        """

    @property
    @abc.abstractmethod
    def next_update(self) -> datetime.datetime | None:
        """
        Returns the date of next update for this CRL, or None. See
        next_update_utc for the non-naive variant.
        """

    @property
    @abc.abstractmethod
    def next_update_utc(self) -> datetime.datetime | None:
        """
        Returns the date of next update for this CRL as a non-naive UTC
        datetime, or None.
        """

    @property
    @abc.abstractmethod
    def last_update(self) -> datetime.datetime:
        """
        Returns the date of last update for this CRL. See last_update_utc
        for the non-naive variant.
        """

    @property
    @abc.abstractmethod
    def last_update_utc(self) -> datetime.datetime:
        """
        Returns the date of last update for this CRL as a non-naive UTC
        datetime.
        """

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """
        Returns an Extensions object containing a list of CRL extensions.
        """

    @property
    @abc.abstractmethod
    def signature(self) -> bytes:
        """
        Returns the signature bytes.
        """

    @property
    @abc.abstractmethod
    def tbs_certlist_bytes(self) -> bytes:
        """
        Returns the tbsCertList payload bytes as defined in RFC 5280.
        """

    @abc.abstractmethod
    def __eq__(self, other: object) -> bool:
        """
        Checks equality.
        """

    @abc.abstractmethod
    def __len__(self) -> int:
        """
        Number of revoked certificates in the CRL.
        """

    # The two overloads only refine the return type for type checkers: an
    # int index yields one entry, a slice yields a list of entries.
    @typing.overload
    def __getitem__(self, idx: int) -> RevokedCertificate:
        ...

    @typing.overload
    def __getitem__(self, idx: slice) -> list[RevokedCertificate]:
        ...

    @abc.abstractmethod
    def __getitem__(
        self, idx: int | slice
    ) -> RevokedCertificate | list[RevokedCertificate]:
        """
        Returns a revoked certificate (or slice of revoked certificates).
        """

    @abc.abstractmethod
    def __iter__(self) -> typing.Iterator[RevokedCertificate]:
        """
        Iterator over the revoked certificates
        """

    @abc.abstractmethod
    def is_signature_valid(
        self, public_key: CertificateIssuerPublicKeyTypes
    ) -> bool:
        """
        Verifies signature of revocation list against given public key.
        """


# Runtime isinstance checks need this since the rust class is not a subclass.
CertificateRevocationList.register(rust_x509.CertificateRevocationList)
537
538
class CertificateSigningRequest(metaclass=abc.ABCMeta):
    """
    Abstract interface of a certificate signing request (CSR).

    ``rust_x509.CertificateSigningRequest`` is registered as a virtual
    subclass right after the class body so that ``isinstance`` checks accept
    it.
    """

    @abc.abstractmethod
    def __eq__(self, other: object) -> bool:
        """
        Checks equality.
        """

    @abc.abstractmethod
    def __hash__(self) -> int:
        """
        Computes a hash.
        """

    @abc.abstractmethod
    def public_key(self) -> CertificatePublicKeyTypes:
        """
        Returns the public key
        """

    @property
    @abc.abstractmethod
    def subject(self) -> Name:
        """
        Returns the subject name object.
        """

    @property
    @abc.abstractmethod
    def signature_hash_algorithm(
        self,
    ) -> hashes.HashAlgorithm | None:
        """
        Returns a HashAlgorithm corresponding to the type of the digest signed
        in the signing request.

        The annotation also allows None. NOTE(review): presumably for
        signature algorithms without a separate digest -- confirm.
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_oid(self) -> ObjectIdentifier:
        """
        Returns the ObjectIdentifier of the signature algorithm.
        """

    @property
    @abc.abstractmethod
    def signature_algorithm_parameters(
        self,
    ) -> None | padding.PSS | padding.PKCS1v15 | ec.ECDSA:
        """
        Returns the signature algorithm parameters: a PSS or PKCS1v15
        padding object, an ECDSA object, or None.
        """

    @property
    @abc.abstractmethod
    def extensions(self) -> Extensions:
        """
        Returns the extensions in the signing request.
        """

    @property
    @abc.abstractmethod
    def attributes(self) -> Attributes:
        """
        Returns an Attributes object.
        """

    @abc.abstractmethod
    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """
        Encodes the request to PEM or DER format.
        """

    @property
    @abc.abstractmethod
    def signature(self) -> bytes:
        """
        Returns the signature bytes.
        """

    @property
    @abc.abstractmethod
    def tbs_certrequest_bytes(self) -> bytes:
        """
        Returns the PKCS#10 CertificationRequestInfo bytes as defined in RFC
        2986.
        """

    # Unlike CertificateRevocationList.is_signature_valid, this is declared
    # as a property and takes no public key argument.
    @property
    @abc.abstractmethod
    def is_signature_valid(self) -> bool:
        """
        Verifies signature of signing request.
        """

    @abc.abstractmethod
    def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes:
        """
        Get the attribute value for a given OID.
        """


# Runtime isinstance checks need this since the rust class is not a subclass.
CertificateSigningRequest.register(rust_x509.CertificateSigningRequest)
642
643
# Module-level aliases for the loader functions provided by the rust_x509
# bindings, grouped by the kind of object they load.

# Certificates (PEM / DER).
load_pem_x509_certificate = rust_x509.load_pem_x509_certificate
load_der_x509_certificate = rust_x509.load_der_x509_certificate

# NOTE(review): plural name -- presumably loads several certificates from
# one PEM input; confirm against the binding.
load_pem_x509_certificates = rust_x509.load_pem_x509_certificates

# Certificate signing requests (PEM / DER).
load_pem_x509_csr = rust_x509.load_pem_x509_csr
load_der_x509_csr = rust_x509.load_der_x509_csr

# Certificate revocation lists (PEM / DER).
load_pem_x509_crl = rust_x509.load_pem_x509_crl
load_der_x509_crl = rust_x509.load_der_x509_crl
654
655
class CertificateSigningRequestBuilder:
    """
    Builder for X.509 certificate signing requests.

    Every setter validates its argument and returns a *new* builder; the
    builder it is called on is left untouched. ``sign`` produces the final
    CertificateSigningRequest.
    """

    def __init__(
        self,
        subject_name: Name | None = None,
        extensions: list[Extension[ExtensionType]] | None = None,
        attributes: (
            list[tuple[ObjectIdentifier, bytes, int | None]] | None
        ) = None,
    ):
        """
        Creates an empty X.509 certificate request (v1).

        ``extensions`` and ``attributes`` default to a fresh empty list for
        each builder.
        """
        self._subject_name = subject_name
        # A literal ``[]`` default would be a single list object shared by
        # every builder created without these arguments.
        self._extensions = [] if extensions is None else extensions
        self._attributes = [] if attributes is None else attributes

    def subject_name(self, name: Name) -> CertificateSigningRequestBuilder:
        """
        Sets the certificate requestor's distinguished name.

        Raises TypeError if ``name`` is not a Name and ValueError if a
        subject name has already been set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._subject_name is not None:
            raise ValueError("The subject name may only be set once.")
        return CertificateSigningRequestBuilder(
            name, self._extensions, self._attributes
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> CertificateSigningRequestBuilder:
        """
        Adds an X.509 extension to the certificate request.

        Raises TypeError if ``extval`` is not an ExtensionType and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)

        # Build a new list so the current builder's list is not modified.
        return CertificateSigningRequestBuilder(
            self._subject_name,
            [*self._extensions, extension],
            self._attributes,
        )

    def add_attribute(
        self,
        oid: ObjectIdentifier,
        value: bytes,
        *,
        _tag: _ASN1Type | None = None,
    ) -> CertificateSigningRequestBuilder:
        """
        Adds an X.509 attribute with an OID and associated value.

        Raises TypeError for an argument of the wrong type and ValueError
        if an attribute with the same OID was already added.
        """
        if not isinstance(oid, ObjectIdentifier):
            raise TypeError("oid must be an ObjectIdentifier")

        if not isinstance(value, bytes):
            raise TypeError("value must be bytes")

        if _tag is not None and not isinstance(_tag, _ASN1Type):
            raise TypeError("tag must be _ASN1Type")

        _reject_duplicate_attribute(oid, self._attributes)

        # Attributes are stored as (oid, value, tag) with the tag reduced to
        # its enum value, or None when no tag was given.
        tag = _tag.value if _tag is not None else None

        return CertificateSigningRequestBuilder(
            self._subject_name,
            self._extensions,
            [*self._attributes, (oid, value, tag)],
        )

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: _AllowedHashTypes | None,
        backend: typing.Any = None,
        *,
        rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
    ) -> CertificateSigningRequest:
        """
        Signs the request using the requestor's private key.

        ``backend`` is accepted but not used by this method.

        Raises ValueError if no subject name has been set, and TypeError if
        ``rsa_padding`` is not PSS/PKCS1v15 or is given with a non-RSA key.
        """
        if self._subject_name is None:
            raise ValueError("A CertificateSigningRequest must have a subject")

        if rsa_padding is not None:
            if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
                raise TypeError("Padding must be PSS or PKCS1v15")
            if not isinstance(private_key, rsa.RSAPrivateKey):
                raise TypeError("Padding is only supported for RSA keys")

        return rust_x509.create_x509_csr(
            self, private_key, algorithm, rsa_padding
        )
755
756
class CertificateBuilder:
    """
    Builder for X.509 certificates.

    Every setter validates its argument and returns a *new* builder; the
    builder it is called on is left untouched. Each field may be set only
    once. ``sign`` produces the final Certificate.
    """

    _extensions: list[Extension[ExtensionType]]

    def __init__(
        self,
        issuer_name: Name | None = None,
        subject_name: Name | None = None,
        public_key: CertificatePublicKeyTypes | None = None,
        serial_number: int | None = None,
        not_valid_before: datetime.datetime | None = None,
        not_valid_after: datetime.datetime | None = None,
        extensions: list[Extension[ExtensionType]] | None = None,
    ) -> None:
        """
        Creates a builder; every field is optional here and ``extensions``
        defaults to a fresh empty list for each builder.
        """
        # The builder always uses Version.v3.
        self._version = Version.v3
        self._issuer_name = issuer_name
        self._subject_name = subject_name
        self._public_key = public_key
        self._serial_number = serial_number
        self._not_valid_before = not_valid_before
        self._not_valid_after = not_valid_after
        # A literal ``[]`` default would be a single list object shared by
        # every builder created without this argument.
        self._extensions = [] if extensions is None else extensions

    def issuer_name(self, name: Name) -> CertificateBuilder:
        """
        Sets the CA's distinguished name.

        Raises TypeError if ``name`` is not a Name and ValueError if the
        issuer name has already been set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._issuer_name is not None:
            raise ValueError("The issuer name may only be set once.")
        return CertificateBuilder(
            name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def subject_name(self, name: Name) -> CertificateBuilder:
        """
        Sets the requestor's distinguished name.

        Raises TypeError if ``name`` is not a Name and ValueError if the
        subject name has already been set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._subject_name is not None:
            raise ValueError("The subject name may only be set once.")
        return CertificateBuilder(
            self._issuer_name,
            name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def public_key(
        self,
        key: CertificatePublicKeyTypes,
    ) -> CertificateBuilder:
        """
        Sets the requestor's public key (as found in the signing request).

        Raises TypeError if ``key`` is not one of the supported public key
        types and ValueError if the public key has already been set.
        """
        if not isinstance(
            key,
            (
                dsa.DSAPublicKey,
                rsa.RSAPublicKey,
                ec.EllipticCurvePublicKey,
                ed25519.Ed25519PublicKey,
                ed448.Ed448PublicKey,
                x25519.X25519PublicKey,
                x448.X448PublicKey,
            ),
        ):
            raise TypeError(
                "Expecting one of DSAPublicKey, RSAPublicKey,"
                " EllipticCurvePublicKey, Ed25519PublicKey,"
                " Ed448PublicKey, X25519PublicKey, or "
                "X448PublicKey."
            )
        if self._public_key is not None:
            raise ValueError("The public key may only be set once.")
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def serial_number(self, number: int) -> CertificateBuilder:
        """
        Sets the certificate serial number.

        Raises TypeError if ``number`` is not an int, and ValueError if the
        serial number was already set, is not positive, or needs more than
        159 bits.
        """
        if not isinstance(number, int):
            raise TypeError("Serial number must be of integral type.")
        if self._serial_number is not None:
            raise ValueError("The serial number may only be set once.")
        if number <= 0:
            raise ValueError("The serial number should be positive.")

        # ASN.1 integers are always signed, so most significant bit must be
        # zero.
        if number.bit_length() >= 160:  # As defined in RFC 5280
            raise ValueError(
                "The serial number should not be more than 159 bits."
            )
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def not_valid_before(self, time: datetime.datetime) -> CertificateBuilder:
        """
        Sets the certificate activation time.

        The value is normalized to a naive UTC datetime before it is
        checked and stored. Raises TypeError if ``time`` is not a datetime,
        and ValueError if the time was already set, is earlier than 1950
        January 1, or is later than an already-set not valid after time.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._not_valid_before is not None:
            raise ValueError("The not valid before may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The not valid before date must be on or after"
                " 1950 January 1."
            )
        if self._not_valid_after is not None and time > self._not_valid_after:
            raise ValueError(
                "The not valid before date must be before the not valid after "
                "date."
            )
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            time,
            self._not_valid_after,
            self._extensions,
        )

    def not_valid_after(self, time: datetime.datetime) -> CertificateBuilder:
        """
        Sets the certificate expiration time.

        The value is normalized to a naive UTC datetime before it is
        checked and stored. Raises TypeError if ``time`` is not a datetime,
        and ValueError if the time was already set, is earlier than 1950
        January 1, or is earlier than an already-set not valid before time.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._not_valid_after is not None:
            raise ValueError("The not valid after may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The not valid after date must be on or after"
                " 1950 January 1."
            )
        if (
            self._not_valid_before is not None
            and time < self._not_valid_before
        ):
            raise ValueError(
                "The not valid after date must be after the not valid before "
                "date."
            )
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            time,
            self._extensions,
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> CertificateBuilder:
        """
        Adds an X.509 extension to the certificate.

        Raises TypeError if ``extval`` is not an ExtensionType and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)

        # Build a new list so the current builder's list is not modified.
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            [*self._extensions, extension],
        )

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: _AllowedHashTypes | None,
        backend: typing.Any = None,
        *,
        rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
    ) -> Certificate:
        """
        Signs the certificate using the CA's private key.

        ``backend`` is accepted but not used by this method.

        Raises ValueError if the subject name, issuer name, serial number,
        not valid before time, not valid after time or public key is
        missing, and TypeError if ``rsa_padding`` is not PSS/PKCS1v15 or is
        given with a non-RSA key.
        """
        if self._subject_name is None:
            raise ValueError("A certificate must have a subject name")

        if self._issuer_name is None:
            raise ValueError("A certificate must have an issuer name")

        if self._serial_number is None:
            raise ValueError("A certificate must have a serial number")

        if self._not_valid_before is None:
            raise ValueError("A certificate must have a not valid before time")

        if self._not_valid_after is None:
            raise ValueError("A certificate must have a not valid after time")

        if self._public_key is None:
            raise ValueError("A certificate must have a public key")

        if rsa_padding is not None:
            if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
                raise TypeError("Padding must be PSS or PKCS1v15")
            if not isinstance(private_key, rsa.RSAPrivateKey):
                raise TypeError("Padding is only supported for RSA keys")

        return rust_x509.create_x509_certificate(
            self, private_key, algorithm, rsa_padding
        )
1000
1001
class CertificateRevocationListBuilder:
    """
    Builder for X.509 certificate revocation lists (CRLs).

    Every setter validates its argument and returns a *new* builder; the
    builder it is called on is left untouched. ``sign`` produces the final
    CertificateRevocationList.
    """

    _extensions: list[Extension[ExtensionType]]
    _revoked_certificates: list[RevokedCertificate]

    def __init__(
        self,
        issuer_name: Name | None = None,
        last_update: datetime.datetime | None = None,
        next_update: datetime.datetime | None = None,
        extensions: list[Extension[ExtensionType]] | None = None,
        revoked_certificates: list[RevokedCertificate] | None = None,
    ):
        """
        Creates a builder; every field is optional here. ``extensions`` and
        ``revoked_certificates`` default to a fresh empty list for each
        builder.
        """
        self._issuer_name = issuer_name
        self._last_update = last_update
        self._next_update = next_update
        # A literal ``[]`` default would be a single list object shared by
        # every builder created without these arguments.
        self._extensions = [] if extensions is None else extensions
        self._revoked_certificates = (
            [] if revoked_certificates is None else revoked_certificates
        )

    def issuer_name(
        self, issuer_name: Name
    ) -> CertificateRevocationListBuilder:
        """
        Sets the issuer's distinguished name.

        Raises TypeError if ``issuer_name`` is not a Name and ValueError if
        the issuer name has already been set.
        """
        if not isinstance(issuer_name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._issuer_name is not None:
            raise ValueError("The issuer name may only be set once.")
        return CertificateRevocationListBuilder(
            issuer_name,
            self._last_update,
            self._next_update,
            self._extensions,
            self._revoked_certificates,
        )

    def last_update(
        self, last_update: datetime.datetime
    ) -> CertificateRevocationListBuilder:
        """
        Sets the CRL's last update time.

        The value is normalized to a naive UTC datetime before it is
        checked and stored. Raises TypeError if ``last_update`` is not a
        datetime, and ValueError if it was already set, is earlier than
        1950 January 1, or is later than an already-set next update time.
        """
        if not isinstance(last_update, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._last_update is not None:
            raise ValueError("Last update may only be set once.")
        last_update = _convert_to_naive_utc_time(last_update)
        if last_update < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The last update date must be on or after 1950 January 1."
            )
        if self._next_update is not None and last_update > self._next_update:
            raise ValueError(
                "The last update date must be before the next update date."
            )
        return CertificateRevocationListBuilder(
            self._issuer_name,
            last_update,
            self._next_update,
            self._extensions,
            self._revoked_certificates,
        )

    def next_update(
        self, next_update: datetime.datetime
    ) -> CertificateRevocationListBuilder:
        """
        Sets the CRL's next update time.

        The value is normalized to a naive UTC datetime before it is
        checked and stored. Raises TypeError if ``next_update`` is not a
        datetime, and ValueError if it was already set, is earlier than
        1950 January 1, or is earlier than an already-set last update time.
        """
        if not isinstance(next_update, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._next_update is not None:
            # The message names the field actually being set.
            raise ValueError("Next update may only be set once.")
        next_update = _convert_to_naive_utc_time(next_update)
        if next_update < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The next update date must be on or after 1950 January 1."
            )
        if self._last_update is not None and next_update < self._last_update:
            raise ValueError(
                "The next update date must be after the last update date."
            )
        return CertificateRevocationListBuilder(
            self._issuer_name,
            self._last_update,
            next_update,
            self._extensions,
            self._revoked_certificates,
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> CertificateRevocationListBuilder:
        """
        Adds an X.509 extension to the certificate revocation list.

        Raises TypeError if ``extval`` is not an ExtensionType and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)
        # Build a new list so the current builder's list is not modified.
        return CertificateRevocationListBuilder(
            self._issuer_name,
            self._last_update,
            self._next_update,
            [*self._extensions, extension],
            self._revoked_certificates,
        )

    def add_revoked_certificate(
        self, revoked_certificate: RevokedCertificate
    ) -> CertificateRevocationListBuilder:
        """
        Adds a revoked certificate to the CRL.

        Raises TypeError if ``revoked_certificate`` is not a
        RevokedCertificate. No duplicate check is performed here.
        """
        if not isinstance(revoked_certificate, RevokedCertificate):
            raise TypeError("Must be an instance of RevokedCertificate")

        return CertificateRevocationListBuilder(
            self._issuer_name,
            self._last_update,
            self._next_update,
            self._extensions,
            [*self._revoked_certificates, revoked_certificate],
        )

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: _AllowedHashTypes | None,
        backend: typing.Any = None,
        *,
        rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
    ) -> CertificateRevocationList:
        """
        Signs the CRL using the given private key.

        ``backend`` is accepted but not used by this method.

        Raises ValueError if the issuer name, last update time or next
        update time is missing, and TypeError if ``rsa_padding`` is not
        PSS/PKCS1v15 or is given with a non-RSA key.
        """
        if self._issuer_name is None:
            raise ValueError("A CRL must have an issuer name")

        if self._last_update is None:
            raise ValueError("A CRL must have a last update time")

        if self._next_update is None:
            raise ValueError("A CRL must have a next update time")

        if rsa_padding is not None:
            if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
                raise TypeError("Padding must be PSS or PKCS1v15")
            if not isinstance(private_key, rsa.RSAPrivateKey):
                raise TypeError("Padding is only supported for RSA keys")

        return rust_x509.create_x509_crl(
            self, private_key, algorithm, rsa_padding
        )
1145
1146
class RevokedCertificateBuilder:
    """
    Builder for a single revoked-certificate entry.

    Every setter validates its argument and returns a *new* builder; the
    builder it is called on is left untouched. ``build`` produces the final
    RevokedCertificate.
    """

    def __init__(
        self,
        serial_number: int | None = None,
        revocation_date: datetime.datetime | None = None,
        extensions: list[Extension[ExtensionType]] | None = None,
    ):
        """
        Creates a builder; every field is optional here and ``extensions``
        defaults to a fresh empty list for each builder.
        """
        self._serial_number = serial_number
        self._revocation_date = revocation_date
        # A literal ``[]`` default would be a single list object shared by
        # every builder created without this argument.
        self._extensions = [] if extensions is None else extensions

    def serial_number(self, number: int) -> RevokedCertificateBuilder:
        """
        Sets the serial number of the revoked certificate.

        Raises TypeError if ``number`` is not an int, and ValueError if the
        serial number was already set, is not positive, or needs more than
        159 bits.
        """
        if not isinstance(number, int):
            raise TypeError("Serial number must be of integral type.")
        if self._serial_number is not None:
            raise ValueError("The serial number may only be set once.")
        if number <= 0:
            raise ValueError("The serial number should be positive")

        # ASN.1 integers are always signed, so most significant bit must be
        # zero.
        if number.bit_length() >= 160:  # As defined in RFC 5280
            raise ValueError(
                "The serial number should not be more than 159 bits."
            )
        return RevokedCertificateBuilder(
            number, self._revocation_date, self._extensions
        )

    def revocation_date(
        self, time: datetime.datetime
    ) -> RevokedCertificateBuilder:
        """
        Sets the revocation date.

        The value is normalized to a naive UTC datetime before it is
        checked and stored. Raises TypeError if ``time`` is not a datetime,
        and ValueError if the date was already set or is earlier than 1950
        January 1.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._revocation_date is not None:
            raise ValueError("The revocation date may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The revocation date must be on or after 1950 January 1."
            )
        return RevokedCertificateBuilder(
            self._serial_number, time, self._extensions
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> RevokedCertificateBuilder:
        """
        Adds an extension to the revoked-certificate entry.

        Raises TypeError if ``extval`` is not an ExtensionType and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)
        # Build a new list so the current builder's list is not modified.
        return RevokedCertificateBuilder(
            self._serial_number,
            self._revocation_date,
            [*self._extensions, extension],
        )

    def build(self, backend: typing.Any = None) -> RevokedCertificate:
        """
        Returns the RevokedCertificate described by this builder.

        ``backend`` is accepted but not used by this method.

        Raises ValueError if the serial number or the revocation date has
        not been set.
        """
        if self._serial_number is None:
            raise ValueError("A revoked certificate must have a serial number")
        if self._revocation_date is None:
            raise ValueError(
                "A revoked certificate must have a revocation date"
            )
        return _RawRevokedCertificate(
            self._serial_number,
            self._revocation_date,
            Extensions(self._extensions),
        )
1218
1219
def random_serial_number() -> int:
    """
    Returns a random non-negative integer of at most 159 bits.
    """
    entropy = os.urandom(20)
    candidate = int.from_bytes(entropy, "big")
    # Dropping the lowest of the 160 random bits keeps the value within the
    # 159-bit limit enforced by the builders' serial_number setters.
    return candidate >> 1