1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import annotations
6
7import datetime
8import os
9import typing
10from collections.abc import Iterable
11
12from cryptography import utils
13from cryptography.hazmat.bindings._rust import x509 as rust_x509
14from cryptography.hazmat.primitives import hashes
15from cryptography.hazmat.primitives.asymmetric import (
16 dsa,
17 ec,
18 ed448,
19 ed25519,
20 padding,
21 rsa,
22 x448,
23 x25519,
24)
25from cryptography.hazmat.primitives.asymmetric.types import (
26 CertificateIssuerPrivateKeyTypes,
27 CertificatePublicKeyTypes,
28)
29from cryptography.x509.extensions import (
30 Extension,
31 ExtensionType,
32 _make_sequence_methods,
33)
34from cryptography.x509.name import Name, _ASN1Type
35from cryptography.x509.oid import ObjectIdentifier
36
# Lower bound applied by the builders in this module to every date they
# accept: midnight on 1 January 1950, expressed as a naive datetime.
_EARLIEST_UTC_TIME = datetime.datetime(year=1950, month=1, day=1)
38
# Hash algorithm types accepted as the ``algorithm`` argument of the
# builders' ``sign`` methods in this module.
#
# This must be kept in sync with sign.rs's list of allowable types in
# identify_hash_type
_AllowedHashTypes = typing.Union[
    hashes.SHA224,
    hashes.SHA256,
    hashes.SHA384,
    hashes.SHA512,
    hashes.SHA3_224,
    hashes.SHA3_256,
    hashes.SHA3_384,
    hashes.SHA3_512,
]
51
52
class AttributeNotFound(Exception):
    """Error raised when a requested attribute OID cannot be found.

    The identifier that was looked up is kept on the ``oid`` attribute so
    that callers can inspect it.
    """

    def __init__(self, msg: str, oid: ObjectIdentifier) -> None:
        # Remember which OID was requested, then let Exception record the
        # human-readable message.
        self.oid = oid
        super().__init__(msg)
57
58
def _reject_duplicate_extension(
    extension: Extension[ExtensionType],
    extensions: list[Extension[ExtensionType]],
) -> None:
    """Raise ``ValueError`` if ``extensions`` already contains this OID.

    ``extension`` is the candidate about to be added and ``extensions`` is
    the list collected so far. Nothing is returned when no existing entry
    shares the candidate's ``oid``.
    """
    # Each call scans the whole list, so adding extensions one at a time is
    # quadratic in the number of extensions.
    if any(existing.oid == extension.oid for existing in extensions):
        raise ValueError("This extension has already been set.")
67
68
def _reject_duplicate_attribute(
    oid: ObjectIdentifier,
    attributes: list[tuple[ObjectIdentifier, bytes, int | None]],
) -> None:
    """Raise ``ValueError`` if ``attributes`` already has an entry for ``oid``.

    ``attributes`` holds ``(oid, value, tag)`` triples; only the first
    element of each triple is compared.
    """
    # Each call scans the whole list, so adding attributes one at a time is
    # quadratic in the number of attributes.
    if any(known_oid == oid for known_oid, _value, _tag in attributes):
        raise ValueError("This attribute has already been set.")
77
78
def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime:
    """Return ``time`` as a naive datetime expressed in UTC.

    time -- datetime to normalize. A datetime without tzinfo is assumed to
            already be in UTC and is returned unchanged.
    """
    if time.tzinfo is None:
        return time

    # utcoffset() can yield None (or a zero delta) even when a tzinfo is
    # attached; both cases are treated as "no shift".
    shift = time.utcoffset() or datetime.timedelta()
    return time.replace(tzinfo=None) - shift
91
92
class Attribute:
    """An attribute: an OID, a bytes value and the tag recorded for the value.

    ``oid`` and ``value`` are exposed as read-only properties. The tag is
    kept privately in ``_type`` and takes part in equality and hashing.
    """

    def __init__(
        self,
        oid: ObjectIdentifier,
        value: bytes,
        _type: int = _ASN1Type.UTF8String.value,
    ) -> None:
        self._oid, self._value, self._type = oid, value, _type

    @property
    def oid(self) -> ObjectIdentifier:
        """The object identifier of this attribute."""
        return self._oid

    @property
    def value(self) -> bytes:
        """The raw bytes stored for this attribute."""
        return self._value

    def __repr__(self) -> str:
        return "<Attribute(oid={}, value={!r})>".format(self.oid, self.value)

    def __eq__(self, other: object) -> bool:
        # Only attributes compare equal to attributes; anything else is
        # deferred to the other operand.
        if isinstance(other, Attribute):
            mine = (self.oid, self.value, self._type)
            theirs = (other.oid, other.value, other._type)
            return mine == theirs

        return NotImplemented

    def __hash__(self) -> int:
        # Hash over the same three fields that __eq__ compares.
        identity = (self.oid, self.value, self._type)
        return hash(identity)
127
128
class Attributes:
    """A collection of ``Attribute`` objects with lookup by OID."""

    def __init__(
        self,
        attributes: Iterable[Attribute],
    ) -> None:
        # Materialize the iterable so the collection owns its own list.
        self._attributes = [*attributes]

    # Sequence protocol methods generated by the shared helper for the
    # ``_attributes`` field (presumably they delegate to that list — see
    # _make_sequence_methods in cryptography.x509.extensions).
    __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes")

    def __repr__(self) -> str:
        return "<Attributes({})>".format(self._attributes)

    def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute:
        """Return the first attribute whose ``oid`` equals ``oid``.

        Raises ``AttributeNotFound`` when no attribute matches.
        """
        match = next(
            (candidate for candidate in self if candidate.oid == oid), None
        )
        if match is None:
            raise AttributeNotFound(f"No {oid} attribute was found", oid)

        return match
147
148
class Version(utils.Enum):
    """Certificate versions known to this module."""

    # NOTE(review): the values look like the integer encoding of the X.509
    # version field (v1 -> 0, v3 -> 2) — confirm against RFC 5280.
    v1 = 0
    v3 = 2
152
153
class InvalidVersion(Exception):
    """Error describing a version number that is not accepted.

    The offending integer is kept on the ``parsed_version`` attribute.
    """

    def __init__(self, msg: str, parsed_version: int) -> None:
        # Keep the rejected value for callers, then let Exception record
        # the human-readable message.
        self.parsed_version = parsed_version
        super().__init__(msg)
158
159
# Types implemented in the Rust extension module, re-exported here under
# the names used by the rest of this module.
Certificate = rust_x509.Certificate
RevokedCertificate = rust_x509.RevokedCertificate


CertificateRevocationList = rust_x509.CertificateRevocationList
CertificateSigningRequest = rust_x509.CertificateSigningRequest


# Loader functions, likewise re-exported unchanged from the Rust extension
# module.
# NOTE(review): the names suggest PEM/DER parsers for certificates, CSRs
# and CRLs; their signatures are defined on the Rust side, not here.
load_pem_x509_certificate = rust_x509.load_pem_x509_certificate
load_der_x509_certificate = rust_x509.load_der_x509_certificate

load_pem_x509_certificates = rust_x509.load_pem_x509_certificates

load_pem_x509_csr = rust_x509.load_pem_x509_csr
load_der_x509_csr = rust_x509.load_der_x509_csr

load_pem_x509_crl = rust_x509.load_pem_x509_crl
load_der_x509_crl = rust_x509.load_der_x509_crl
178
179
class CertificateSigningRequestBuilder:
    """Builder for X.509 certificate signing requests.

    Every setter validates its argument and returns a *new* builder that
    carries the updated state; the builder it was called on is left
    untouched. ``sign`` passes the finished builder to the Rust
    implementation.
    """

    def __init__(
        self,
        subject_name: Name | None = None,
        extensions: list[Extension[ExtensionType]] | None = None,
        attributes: (
            list[tuple[ObjectIdentifier, bytes, int | None]] | None
        ) = None,
    ):
        """
        Creates an empty X.509 certificate request (v1).
        """
        self._subject_name = subject_name
        # A fresh list is created per instance when none is supplied, so
        # builders never share (and cannot contaminate) a default list.
        self._extensions = [] if extensions is None else extensions
        self._attributes = [] if attributes is None else attributes

    def subject_name(self, name: Name) -> CertificateSigningRequestBuilder:
        """
        Sets the certificate requestor's distinguished name.

        Raises TypeError if ``name`` is not a ``Name`` and ValueError if a
        subject name has already been set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._subject_name is not None:
            raise ValueError("The subject name may only be set once.")
        return CertificateSigningRequestBuilder(
            name, self._extensions, self._attributes
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> CertificateSigningRequestBuilder:
        """
        Adds an X.509 extension to the certificate request.

        Raises TypeError if ``extval`` is not an ``ExtensionType`` and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)

        # Build a new list rather than appending, so this builder's own
        # list is never modified.
        return CertificateSigningRequestBuilder(
            self._subject_name,
            [*self._extensions, extension],
            self._attributes,
        )

    def add_attribute(
        self,
        oid: ObjectIdentifier,
        value: bytes,
        *,
        _tag: _ASN1Type | None = None,
    ) -> CertificateSigningRequestBuilder:
        """
        Adds an X.509 attribute with an OID and associated value.

        Raises TypeError if ``oid``, ``value`` or ``_tag`` has the wrong
        type and ValueError if an attribute with the same OID was already
        added.
        """
        if not isinstance(oid, ObjectIdentifier):
            raise TypeError("oid must be an ObjectIdentifier")

        if not isinstance(value, bytes):
            raise TypeError("value must be bytes")

        if _tag is not None and not isinstance(_tag, _ASN1Type):
            raise TypeError("tag must be _ASN1Type")

        _reject_duplicate_attribute(oid, self._attributes)

        # The attribute is stored with the enum's raw value, or None when
        # no tag was requested.
        tag = _tag.value if _tag is not None else None

        return CertificateSigningRequestBuilder(
            self._subject_name,
            self._extensions,
            [*self._attributes, (oid, value, tag)],
        )

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: _AllowedHashTypes | None,
        backend: typing.Any = None,
        *,
        rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
        ecdsa_deterministic: bool | None = None,
    ) -> CertificateSigningRequest:
        """
        Signs the request using the requestor's private key.

        ``backend`` is accepted but not used by this method.

        Raises ValueError if no subject name has been set. Raises
        TypeError if ``rsa_padding`` is given but is not PSS/PKCS1v15 or
        the key is not an RSA key, or if ``ecdsa_deterministic`` is given
        but the key is not an EC key.
        """
        if self._subject_name is None:
            raise ValueError("A CertificateSigningRequest must have a subject")

        if rsa_padding is not None:
            if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
                raise TypeError("Padding must be PSS or PKCS1v15")
            if not isinstance(private_key, rsa.RSAPrivateKey):
                raise TypeError("Padding is only supported for RSA keys")

        if ecdsa_deterministic is not None:
            if not isinstance(private_key, ec.EllipticCurvePrivateKey):
                raise TypeError(
                    "Deterministic ECDSA is only supported for EC keys"
                )

        return rust_x509.create_x509_csr(
            self,
            private_key,
            algorithm,
            rsa_padding,
            ecdsa_deterministic,
        )
290
291
class CertificateBuilder:
    """Builder for X.509 certificates.

    Every setter validates its argument and returns a *new* builder that
    carries the updated state; the builder it was called on is left
    untouched. Each scalar field may be set only once. ``sign`` passes
    the finished builder to the Rust implementation.
    """

    _extensions: list[Extension[ExtensionType]]

    def __init__(
        self,
        issuer_name: Name | None = None,
        subject_name: Name | None = None,
        public_key: CertificatePublicKeyTypes | None = None,
        serial_number: int | None = None,
        not_valid_before: datetime.datetime | None = None,
        not_valid_after: datetime.datetime | None = None,
        extensions: list[Extension[ExtensionType]] | None = None,
    ) -> None:
        """
        Creates a builder; every field is optional and defaults to unset.
        """
        self._version = Version.v3
        self._issuer_name = issuer_name
        self._subject_name = subject_name
        self._public_key = public_key
        self._serial_number = serial_number
        self._not_valid_before = not_valid_before
        self._not_valid_after = not_valid_after
        # A fresh list is created per instance when none is supplied, so
        # builders never share (and cannot contaminate) a default list.
        self._extensions = [] if extensions is None else extensions

    def issuer_name(self, name: Name) -> CertificateBuilder:
        """
        Sets the CA's distinguished name.

        Raises TypeError if ``name`` is not a ``Name`` and ValueError if
        an issuer name has already been set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._issuer_name is not None:
            raise ValueError("The issuer name may only be set once.")
        return CertificateBuilder(
            name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def subject_name(self, name: Name) -> CertificateBuilder:
        """
        Sets the requestor's distinguished name.

        Raises TypeError if ``name`` is not a ``Name`` and ValueError if a
        subject name has already been set.
        """
        if not isinstance(name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._subject_name is not None:
            raise ValueError("The subject name may only be set once.")
        return CertificateBuilder(
            self._issuer_name,
            name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def public_key(
        self,
        key: CertificatePublicKeyTypes,
    ) -> CertificateBuilder:
        """
        Sets the requestor's public key (as found in the signing request).

        Raises TypeError if ``key`` is not one of the supported public key
        types and ValueError if a public key has already been set.
        """
        if not isinstance(
            key,
            (
                dsa.DSAPublicKey,
                rsa.RSAPublicKey,
                ec.EllipticCurvePublicKey,
                ed25519.Ed25519PublicKey,
                ed448.Ed448PublicKey,
                x25519.X25519PublicKey,
                x448.X448PublicKey,
            ),
        ):
            raise TypeError(
                "Expecting one of DSAPublicKey, RSAPublicKey,"
                " EllipticCurvePublicKey, Ed25519PublicKey,"
                " Ed448PublicKey, X25519PublicKey, or "
                "X448PublicKey."
            )
        if self._public_key is not None:
            raise ValueError("The public key may only be set once.")
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def serial_number(self, number: int) -> CertificateBuilder:
        """
        Sets the certificate serial number.

        Raises TypeError if ``number`` is not an int. Raises ValueError if
        a serial number has already been set, if ``number`` is not
        positive, or if it needs 160 bits or more.
        """
        if not isinstance(number, int):
            raise TypeError("Serial number must be of integral type.")
        if self._serial_number is not None:
            raise ValueError("The serial number may only be set once.")
        if number <= 0:
            raise ValueError("The serial number should be positive.")

        # ASN.1 integers are always signed, so most significant bit must be
        # zero.
        if number.bit_length() >= 160:  # As defined in RFC 5280
            raise ValueError(
                "The serial number should not be more than 159 bits."
            )
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )

    def not_valid_before(self, time: datetime.datetime) -> CertificateBuilder:
        """
        Sets the certificate activation time.

        The value is stored as a naive UTC datetime. Raises TypeError if
        ``time`` is not a datetime. Raises ValueError if the activation
        time has already been set, is earlier than 1 January 1950, or is
        later than an already-set expiration time.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._not_valid_before is not None:
            raise ValueError("The not valid before may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The not valid before date must be on or after"
                " 1950 January 1."
            )
        if self._not_valid_after is not None and time > self._not_valid_after:
            raise ValueError(
                "The not valid before date must be before the not valid after "
                "date."
            )
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            time,
            self._not_valid_after,
            self._extensions,
        )

    def not_valid_after(self, time: datetime.datetime) -> CertificateBuilder:
        """
        Sets the certificate expiration time.

        The value is stored as a naive UTC datetime. Raises TypeError if
        ``time`` is not a datetime. Raises ValueError if the expiration
        time has already been set, is earlier than 1 January 1950, or is
        earlier than an already-set activation time.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._not_valid_after is not None:
            raise ValueError("The not valid after may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The not valid after date must be on or after 1950 January 1."
            )
        if (
            self._not_valid_before is not None
            and time < self._not_valid_before
        ):
            raise ValueError(
                "The not valid after date must be after the not valid before "
                "date."
            )
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            time,
            self._extensions,
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> CertificateBuilder:
        """
        Adds an X.509 extension to the certificate.

        Raises TypeError if ``extval`` is not an ``ExtensionType`` and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)

        # Build a new list rather than appending, so this builder's own
        # list is never modified.
        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            self._serial_number,
            self._not_valid_before,
            self._not_valid_after,
            [*self._extensions, extension],
        )

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: _AllowedHashTypes | None,
        backend: typing.Any = None,
        *,
        rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
        ecdsa_deterministic: bool | None = None,
    ) -> Certificate:
        """
        Signs the certificate using the CA's private key.

        ``backend`` is accepted but not used by this method.

        Raises ValueError if the subject name, issuer name, serial number,
        validity times or public key have not been set. Raises TypeError
        if ``rsa_padding`` is given but is not PSS/PKCS1v15 or the key is
        not an RSA key, or if ``ecdsa_deterministic`` is given but the key
        is not an EC key.
        """
        if self._subject_name is None:
            raise ValueError("A certificate must have a subject name")

        if self._issuer_name is None:
            raise ValueError("A certificate must have an issuer name")

        if self._serial_number is None:
            raise ValueError("A certificate must have a serial number")

        if self._not_valid_before is None:
            raise ValueError("A certificate must have a not valid before time")

        if self._not_valid_after is None:
            raise ValueError("A certificate must have a not valid after time")

        if self._public_key is None:
            raise ValueError("A certificate must have a public key")

        if rsa_padding is not None:
            if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
                raise TypeError("Padding must be PSS or PKCS1v15")
            if not isinstance(private_key, rsa.RSAPrivateKey):
                raise TypeError("Padding is only supported for RSA keys")

        if ecdsa_deterministic is not None:
            if not isinstance(private_key, ec.EllipticCurvePrivateKey):
                raise TypeError(
                    "Deterministic ECDSA is only supported for EC keys"
                )

        return rust_x509.create_x509_certificate(
            self,
            private_key,
            algorithm,
            rsa_padding,
            ecdsa_deterministic,
        )
545
546
class CertificateRevocationListBuilder:
    """Builder for X.509 certificate revocation lists.

    Every setter validates its argument and returns a *new* builder that
    carries the updated state; the builder it was called on is left
    untouched. ``sign`` passes the finished builder to the Rust
    implementation.
    """

    _extensions: list[Extension[ExtensionType]]
    _revoked_certificates: list[RevokedCertificate]

    def __init__(
        self,
        issuer_name: Name | None = None,
        last_update: datetime.datetime | None = None,
        next_update: datetime.datetime | None = None,
        extensions: list[Extension[ExtensionType]] | None = None,
        revoked_certificates: list[RevokedCertificate] | None = None,
    ):
        """
        Creates a builder; every field is optional and defaults to unset.
        """
        self._issuer_name = issuer_name
        self._last_update = last_update
        self._next_update = next_update
        # Fresh lists are created per instance when none are supplied, so
        # builders never share (and cannot contaminate) a default list.
        self._extensions = [] if extensions is None else extensions
        self._revoked_certificates = (
            [] if revoked_certificates is None else revoked_certificates
        )

    def issuer_name(
        self, issuer_name: Name
    ) -> CertificateRevocationListBuilder:
        """
        Sets the issuer's distinguished name.

        Raises TypeError if ``issuer_name`` is not a ``Name`` and
        ValueError if an issuer name has already been set.
        """
        if not isinstance(issuer_name, Name):
            raise TypeError("Expecting x509.Name object.")
        if self._issuer_name is not None:
            raise ValueError("The issuer name may only be set once.")
        return CertificateRevocationListBuilder(
            issuer_name,
            self._last_update,
            self._next_update,
            self._extensions,
            self._revoked_certificates,
        )

    def last_update(
        self, last_update: datetime.datetime
    ) -> CertificateRevocationListBuilder:
        """
        Sets the last update time of the CRL.

        The value is stored as a naive UTC datetime. Raises TypeError if
        ``last_update`` is not a datetime. Raises ValueError if it has
        already been set, is earlier than 1 January 1950, or is later than
        an already-set next update time.
        """
        if not isinstance(last_update, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._last_update is not None:
            raise ValueError("Last update may only be set once.")
        last_update = _convert_to_naive_utc_time(last_update)
        if last_update < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The last update date must be on or after 1950 January 1."
            )
        if self._next_update is not None and last_update > self._next_update:
            raise ValueError(
                "The last update date must be before the next update date."
            )
        return CertificateRevocationListBuilder(
            self._issuer_name,
            last_update,
            self._next_update,
            self._extensions,
            self._revoked_certificates,
        )

    def next_update(
        self, next_update: datetime.datetime
    ) -> CertificateRevocationListBuilder:
        """
        Sets the next update time of the CRL.

        The value is stored as a naive UTC datetime. Raises TypeError if
        ``next_update`` is not a datetime. Raises ValueError if it has
        already been set, is earlier than 1 January 1950, or is earlier
        than an already-set last update time.
        """
        if not isinstance(next_update, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._next_update is not None:
            # The messages in this method name the field actually being
            # set (they previously said "last update").
            raise ValueError("Next update may only be set once.")
        next_update = _convert_to_naive_utc_time(next_update)
        if next_update < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The next update date must be on or after 1950 January 1."
            )
        if self._last_update is not None and next_update < self._last_update:
            raise ValueError(
                "The next update date must be after the last update date."
            )
        return CertificateRevocationListBuilder(
            self._issuer_name,
            self._last_update,
            next_update,
            self._extensions,
            self._revoked_certificates,
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> CertificateRevocationListBuilder:
        """
        Adds an X.509 extension to the certificate revocation list.

        Raises TypeError if ``extval`` is not an ``ExtensionType`` and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)
        # Build a new list rather than appending, so this builder's own
        # list is never modified.
        return CertificateRevocationListBuilder(
            self._issuer_name,
            self._last_update,
            self._next_update,
            [*self._extensions, extension],
            self._revoked_certificates,
        )

    def add_revoked_certificate(
        self, revoked_certificate: RevokedCertificate
    ) -> CertificateRevocationListBuilder:
        """
        Adds a revoked certificate to the CRL.

        Raises TypeError if ``revoked_certificate`` is not a
        ``RevokedCertificate``.
        """
        if not isinstance(revoked_certificate, RevokedCertificate):
            raise TypeError("Must be an instance of RevokedCertificate")

        return CertificateRevocationListBuilder(
            self._issuer_name,
            self._last_update,
            self._next_update,
            self._extensions,
            [*self._revoked_certificates, revoked_certificate],
        )

    def sign(
        self,
        private_key: CertificateIssuerPrivateKeyTypes,
        algorithm: _AllowedHashTypes | None,
        backend: typing.Any = None,
        *,
        rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
        ecdsa_deterministic: bool | None = None,
    ) -> CertificateRevocationList:
        """
        Signs the CRL using the given private key.

        ``backend`` is accepted but not used by this method.

        Raises ValueError if the issuer name, last update time or next
        update time has not been set. Raises TypeError if ``rsa_padding``
        is given but is not PSS/PKCS1v15 or the key is not an RSA key, or
        if ``ecdsa_deterministic`` is given but the key is not an EC key.
        """
        if self._issuer_name is None:
            raise ValueError("A CRL must have an issuer name")

        if self._last_update is None:
            raise ValueError("A CRL must have a last update time")

        if self._next_update is None:
            raise ValueError("A CRL must have a next update time")

        if rsa_padding is not None:
            if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
                raise TypeError("Padding must be PSS or PKCS1v15")
            if not isinstance(private_key, rsa.RSAPrivateKey):
                raise TypeError("Padding is only supported for RSA keys")

        if ecdsa_deterministic is not None:
            if not isinstance(private_key, ec.EllipticCurvePrivateKey):
                raise TypeError(
                    "Deterministic ECDSA is only supported for EC keys"
                )

        return rust_x509.create_x509_crl(
            self,
            private_key,
            algorithm,
            rsa_padding,
            ecdsa_deterministic,
        )
701
702
class RevokedCertificateBuilder:
    """Builder for a single revoked-certificate entry.

    Every setter validates its argument and returns a *new* builder that
    carries the updated state; the builder it was called on is left
    untouched. ``build`` passes the finished builder to the Rust
    implementation.
    """

    def __init__(
        self,
        serial_number: int | None = None,
        revocation_date: datetime.datetime | None = None,
        extensions: list[Extension[ExtensionType]] | None = None,
    ):
        """
        Creates a builder; every field is optional and defaults to unset.
        """
        self._serial_number = serial_number
        self._revocation_date = revocation_date
        # A fresh list is created per instance when none is supplied, so
        # builders never share (and cannot contaminate) a default list.
        self._extensions = [] if extensions is None else extensions

    def serial_number(self, number: int) -> RevokedCertificateBuilder:
        """
        Sets the serial number of the revoked certificate.

        Raises TypeError if ``number`` is not an int. Raises ValueError if
        a serial number has already been set, if ``number`` is not
        positive, or if it needs 160 bits or more.
        """
        if not isinstance(number, int):
            raise TypeError("Serial number must be of integral type.")
        if self._serial_number is not None:
            raise ValueError("The serial number may only be set once.")
        if number <= 0:
            raise ValueError("The serial number should be positive")

        # ASN.1 integers are always signed, so most significant bit must be
        # zero.
        if number.bit_length() >= 160:  # As defined in RFC 5280
            raise ValueError(
                "The serial number should not be more than 159 bits."
            )
        return RevokedCertificateBuilder(
            number, self._revocation_date, self._extensions
        )

    def revocation_date(
        self, time: datetime.datetime
    ) -> RevokedCertificateBuilder:
        """
        Sets the revocation date.

        The value is stored as a naive UTC datetime. Raises TypeError if
        ``time`` is not a datetime. Raises ValueError if the date has
        already been set or is earlier than 1 January 1950.
        """
        if not isinstance(time, datetime.datetime):
            raise TypeError("Expecting datetime object.")
        if self._revocation_date is not None:
            raise ValueError("The revocation date may only be set once.")
        time = _convert_to_naive_utc_time(time)
        if time < _EARLIEST_UTC_TIME:
            raise ValueError(
                "The revocation date must be on or after 1950 January 1."
            )
        return RevokedCertificateBuilder(
            self._serial_number, time, self._extensions
        )

    def add_extension(
        self, extval: ExtensionType, critical: bool
    ) -> RevokedCertificateBuilder:
        """
        Adds an extension to the revoked-certificate entry.

        Raises TypeError if ``extval`` is not an ``ExtensionType`` and
        ValueError if an extension with the same OID was already added.
        """
        if not isinstance(extval, ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = Extension(extval.oid, critical, extval)
        _reject_duplicate_extension(extension, self._extensions)
        # Build a new list rather than appending, so this builder's own
        # list is never modified.
        return RevokedCertificateBuilder(
            self._serial_number,
            self._revocation_date,
            [*self._extensions, extension],
        )

    def build(self, backend: typing.Any = None) -> RevokedCertificate:
        """
        Creates the revoked-certificate entry from the collected state.

        ``backend`` is accepted but not used by this method. Raises
        ValueError if the serial number or the revocation date has not
        been set.
        """
        if self._serial_number is None:
            raise ValueError("A revoked certificate must have a serial number")
        if self._revocation_date is None:
            raise ValueError(
                "A revoked certificate must have a revocation date"
            )
        return rust_x509.create_revoked_certificate(self)
770
771
def random_serial_number() -> int:
    """Return a random non-negative integer of at most 159 bits.

    Twenty bytes (160 bits) are read from ``os.urandom`` and interpreted
    as a big-endian integer; dropping the lowest bit keeps the result
    within 159 bits.
    """
    entropy = os.urandom(20)
    return int.from_bytes(entropy, byteorder="big") >> 1