Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/x509/base.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

487 statements  

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import abc 

8import datetime 

9import os 

10import typing 

11import warnings 

12 

13from cryptography import utils 

14from cryptography.hazmat.bindings._rust import x509 as rust_x509 

15from cryptography.hazmat.primitives import hashes, serialization 

16from cryptography.hazmat.primitives.asymmetric import ( 

17 dsa, 

18 ec, 

19 ed448, 

20 ed25519, 

21 padding, 

22 rsa, 

23 x448, 

24 x25519, 

25) 

26from cryptography.hazmat.primitives.asymmetric.types import ( 

27 CertificateIssuerPrivateKeyTypes, 

28 CertificateIssuerPublicKeyTypes, 

29 CertificatePublicKeyTypes, 

30) 

31from cryptography.x509.extensions import ( 

32 Extension, 

33 Extensions, 

34 ExtensionType, 

35 _make_sequence_methods, 

36) 

37from cryptography.x509.name import Name, _ASN1Type 

38from cryptography.x509.oid import ObjectIdentifier 

39 

40_EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1) 

41 

42# This must be kept in sync with sign.rs's list of allowable types in 

43# identify_hash_type 

44_AllowedHashTypes = typing.Union[ 

45 hashes.SHA224, 

46 hashes.SHA256, 

47 hashes.SHA384, 

48 hashes.SHA512, 

49 hashes.SHA3_224, 

50 hashes.SHA3_256, 

51 hashes.SHA3_384, 

52 hashes.SHA3_512, 

53] 

54 

55 

56class AttributeNotFound(Exception): 

57 def __init__(self, msg: str, oid: ObjectIdentifier) -> None: 

58 super().__init__(msg) 

59 self.oid = oid 

60 

61 

62def _reject_duplicate_extension( 

63 extension: Extension[ExtensionType], 

64 extensions: list[Extension[ExtensionType]], 

65) -> None: 

66 # This is quadratic in the number of extensions 

67 for e in extensions: 

68 if e.oid == extension.oid: 

69 raise ValueError("This extension has already been set.") 

70 

71 

72def _reject_duplicate_attribute( 

73 oid: ObjectIdentifier, 

74 attributes: list[tuple[ObjectIdentifier, bytes, int | None]], 

75) -> None: 

76 # This is quadratic in the number of attributes 

77 for attr_oid, _, _ in attributes: 

78 if attr_oid == oid: 

79 raise ValueError("This attribute has already been set.") 

80 

81 

82def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime: 

83 """Normalizes a datetime to a naive datetime in UTC. 

84 

85 time -- datetime to normalize. Assumed to be in UTC if not timezone 

86 aware. 

87 """ 

88 if time.tzinfo is not None: 

89 offset = time.utcoffset() 

90 offset = offset if offset else datetime.timedelta() 

91 return time.replace(tzinfo=None) - offset 

92 else: 

93 return time 

94 

95 

96class Attribute: 

97 def __init__( 

98 self, 

99 oid: ObjectIdentifier, 

100 value: bytes, 

101 _type: int = _ASN1Type.UTF8String.value, 

102 ) -> None: 

103 self._oid = oid 

104 self._value = value 

105 self._type = _type 

106 

107 @property 

108 def oid(self) -> ObjectIdentifier: 

109 return self._oid 

110 

111 @property 

112 def value(self) -> bytes: 

113 return self._value 

114 

115 def __repr__(self) -> str: 

116 return f"<Attribute(oid={self.oid}, value={self.value!r})>" 

117 

118 def __eq__(self, other: object) -> bool: 

119 if not isinstance(other, Attribute): 

120 return NotImplemented 

121 

122 return ( 

123 self.oid == other.oid 

124 and self.value == other.value 

125 and self._type == other._type 

126 ) 

127 

128 def __hash__(self) -> int: 

129 return hash((self.oid, self.value, self._type)) 

130 

131 

132class Attributes: 

133 def __init__( 

134 self, 

135 attributes: typing.Iterable[Attribute], 

136 ) -> None: 

137 self._attributes = list(attributes) 

138 

139 __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes") 

140 

141 def __repr__(self) -> str: 

142 return f"<Attributes({self._attributes})>" 

143 

144 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute: 

145 for attr in self: 

146 if attr.oid == oid: 

147 return attr 

148 

149 raise AttributeNotFound(f"No {oid} attribute was found", oid) 

150 

151 

152class Version(utils.Enum): 

153 v1 = 0 

154 v3 = 2 

155 

156 

157class InvalidVersion(Exception): 

158 def __init__(self, msg: str, parsed_version: int) -> None: 

159 super().__init__(msg) 

160 self.parsed_version = parsed_version 

161 

162 

163class Certificate(metaclass=abc.ABCMeta): 

164 @abc.abstractmethod 

165 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: 

166 """ 

167 Returns bytes using digest passed. 

168 """ 

169 

170 @property 

171 @abc.abstractmethod 

172 def serial_number(self) -> int: 

173 """ 

174 Returns certificate serial number 

175 """ 

176 

177 @property 

178 @abc.abstractmethod 

179 def version(self) -> Version: 

180 """ 

181 Returns the certificate version 

182 """ 

183 

184 @abc.abstractmethod 

185 def public_key(self) -> CertificatePublicKeyTypes: 

186 """ 

187 Returns the public key 

188 """ 

189 

190 @property 

191 @abc.abstractmethod 

192 def not_valid_before(self) -> datetime.datetime: 

193 """ 

194 Not before time (represented as UTC datetime) 

195 """ 

196 

197 @property 

198 @abc.abstractmethod 

199 def not_valid_before_utc(self) -> datetime.datetime: 

200 """ 

201 Not before time (represented as a non-naive UTC datetime) 

202 """ 

203 

204 @property 

205 @abc.abstractmethod 

206 def not_valid_after(self) -> datetime.datetime: 

207 """ 

208 Not after time (represented as UTC datetime) 

209 """ 

210 

211 @property 

212 @abc.abstractmethod 

213 def not_valid_after_utc(self) -> datetime.datetime: 

214 """ 

215 Not after time (represented as a non-naive UTC datetime) 

216 """ 

217 

218 @property 

219 @abc.abstractmethod 

220 def issuer(self) -> Name: 

221 """ 

222 Returns the issuer name object. 

223 """ 

224 

225 @property 

226 @abc.abstractmethod 

227 def subject(self) -> Name: 

228 """ 

229 Returns the subject name object. 

230 """ 

231 

232 @property 

233 @abc.abstractmethod 

234 def signature_hash_algorithm( 

235 self, 

236 ) -> hashes.HashAlgorithm | None: 

237 """ 

238 Returns a HashAlgorithm corresponding to the type of the digest signed 

239 in the certificate. 

240 """ 

241 

242 @property 

243 @abc.abstractmethod 

244 def signature_algorithm_oid(self) -> ObjectIdentifier: 

245 """ 

246 Returns the ObjectIdentifier of the signature algorithm. 

247 """ 

248 

249 @property 

250 @abc.abstractmethod 

251 def signature_algorithm_parameters( 

252 self, 

253 ) -> None | padding.PSS | padding.PKCS1v15 | ec.ECDSA: 

254 """ 

255 Returns the signature algorithm parameters. 

256 """ 

257 

258 @property 

259 @abc.abstractmethod 

260 def extensions(self) -> Extensions: 

261 """ 

262 Returns an Extensions object. 

263 """ 

264 

265 @property 

266 @abc.abstractmethod 

267 def signature(self) -> bytes: 

268 """ 

269 Returns the signature bytes. 

270 """ 

271 

272 @property 

273 @abc.abstractmethod 

274 def tbs_certificate_bytes(self) -> bytes: 

275 """ 

276 Returns the tbsCertificate payload bytes as defined in RFC 5280. 

277 """ 

278 

279 @property 

280 @abc.abstractmethod 

281 def tbs_precertificate_bytes(self) -> bytes: 

282 """ 

283 Returns the tbsCertificate payload bytes with the SCT list extension 

284 stripped. 

285 """ 

286 

287 @abc.abstractmethod 

288 def __eq__(self, other: object) -> bool: 

289 """ 

290 Checks equality. 

291 """ 

292 

293 @abc.abstractmethod 

294 def __hash__(self) -> int: 

295 """ 

296 Computes a hash. 

297 """ 

298 

299 @abc.abstractmethod 

300 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

301 """ 

302 Serializes the certificate to PEM or DER format. 

303 """ 

304 

305 @abc.abstractmethod 

306 def verify_directly_issued_by(self, issuer: Certificate) -> None: 

307 """ 

308 This method verifies that certificate issuer name matches the 

309 issuer subject name and that the certificate is signed by the 

310 issuer's private key. No other validation is performed. 

311 """ 

312 

313 

314# Runtime isinstance checks need this since the rust class is not a subclass. 

315Certificate.register(rust_x509.Certificate) 

316 

317 

318class RevokedCertificate(metaclass=abc.ABCMeta): 

319 @property 

320 @abc.abstractmethod 

321 def serial_number(self) -> int: 

322 """ 

323 Returns the serial number of the revoked certificate. 

324 """ 

325 

326 @property 

327 @abc.abstractmethod 

328 def revocation_date(self) -> datetime.datetime: 

329 """ 

330 Returns the date of when this certificate was revoked. 

331 """ 

332 

333 @property 

334 @abc.abstractmethod 

335 def revocation_date_utc(self) -> datetime.datetime: 

336 """ 

337 Returns the date of when this certificate was revoked as a non-naive 

338 UTC datetime. 

339 """ 

340 

341 @property 

342 @abc.abstractmethod 

343 def extensions(self) -> Extensions: 

344 """ 

345 Returns an Extensions object containing a list of Revoked extensions. 

346 """ 

347 

348 

349# Runtime isinstance checks need this since the rust class is not a subclass. 

350RevokedCertificate.register(rust_x509.RevokedCertificate) 

351 

352 

353class _RawRevokedCertificate(RevokedCertificate): 

354 def __init__( 

355 self, 

356 serial_number: int, 

357 revocation_date: datetime.datetime, 

358 extensions: Extensions, 

359 ): 

360 self._serial_number = serial_number 

361 self._revocation_date = revocation_date 

362 self._extensions = extensions 

363 

364 @property 

365 def serial_number(self) -> int: 

366 return self._serial_number 

367 

368 @property 

369 def revocation_date(self) -> datetime.datetime: 

370 warnings.warn( 

371 "Properties that return a naïve datetime object have been " 

372 "deprecated. Please switch to revocation_date_utc.", 

373 utils.DeprecatedIn42, 

374 stacklevel=2, 

375 ) 

376 return self._revocation_date 

377 

378 @property 

379 def revocation_date_utc(self) -> datetime.datetime: 

380 return self._revocation_date.replace(tzinfo=datetime.timezone.utc) 

381 

382 @property 

383 def extensions(self) -> Extensions: 

384 return self._extensions 

385 

386 

387class CertificateRevocationList(metaclass=abc.ABCMeta): 

388 @abc.abstractmethod 

389 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

390 """ 

391 Serializes the CRL to PEM or DER format. 

392 """ 

393 

394 @abc.abstractmethod 

395 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: 

396 """ 

397 Returns bytes using digest passed. 

398 """ 

399 

400 @abc.abstractmethod 

401 def get_revoked_certificate_by_serial_number( 

402 self, serial_number: int 

403 ) -> RevokedCertificate | None: 

404 """ 

405 Returns an instance of RevokedCertificate or None if the serial_number 

406 is not in the CRL. 

407 """ 

408 

409 @property 

410 @abc.abstractmethod 

411 def signature_hash_algorithm( 

412 self, 

413 ) -> hashes.HashAlgorithm | None: 

414 """ 

415 Returns a HashAlgorithm corresponding to the type of the digest signed 

416 in the certificate. 

417 """ 

418 

419 @property 

420 @abc.abstractmethod 

421 def signature_algorithm_oid(self) -> ObjectIdentifier: 

422 """ 

423 Returns the ObjectIdentifier of the signature algorithm. 

424 """ 

425 

426 @property 

427 @abc.abstractmethod 

428 def signature_algorithm_parameters( 

429 self, 

430 ) -> None | padding.PSS | padding.PKCS1v15 | ec.ECDSA: 

431 """ 

432 Returns the signature algorithm parameters. 

433 """ 

434 

435 @property 

436 @abc.abstractmethod 

437 def issuer(self) -> Name: 

438 """ 

439 Returns the X509Name with the issuer of this CRL. 

440 """ 

441 

442 @property 

443 @abc.abstractmethod 

444 def next_update(self) -> datetime.datetime | None: 

445 """ 

446 Returns the date of next update for this CRL. 

447 """ 

448 

449 @property 

450 @abc.abstractmethod 

451 def next_update_utc(self) -> datetime.datetime | None: 

452 """ 

453 Returns the date of next update for this CRL as a non-naive UTC 

454 datetime. 

455 """ 

456 

457 @property 

458 @abc.abstractmethod 

459 def last_update(self) -> datetime.datetime: 

460 """ 

461 Returns the date of last update for this CRL. 

462 """ 

463 

464 @property 

465 @abc.abstractmethod 

466 def last_update_utc(self) -> datetime.datetime: 

467 """ 

468 Returns the date of last update for this CRL as a non-naive UTC 

469 datetime. 

470 """ 

471 

472 @property 

473 @abc.abstractmethod 

474 def extensions(self) -> Extensions: 

475 """ 

476 Returns an Extensions object containing a list of CRL extensions. 

477 """ 

478 

479 @property 

480 @abc.abstractmethod 

481 def signature(self) -> bytes: 

482 """ 

483 Returns the signature bytes. 

484 """ 

485 

486 @property 

487 @abc.abstractmethod 

488 def tbs_certlist_bytes(self) -> bytes: 

489 """ 

490 Returns the tbsCertList payload bytes as defined in RFC 5280. 

491 """ 

492 

493 @abc.abstractmethod 

494 def __eq__(self, other: object) -> bool: 

495 """ 

496 Checks equality. 

497 """ 

498 

499 @abc.abstractmethod 

500 def __len__(self) -> int: 

501 """ 

502 Number of revoked certificates in the CRL. 

503 """ 

504 

505 @typing.overload 

506 def __getitem__(self, idx: int) -> RevokedCertificate: 

507 ... 

508 

509 @typing.overload 

510 def __getitem__(self, idx: slice) -> list[RevokedCertificate]: 

511 ... 

512 

513 @abc.abstractmethod 

514 def __getitem__( 

515 self, idx: int | slice 

516 ) -> RevokedCertificate | list[RevokedCertificate]: 

517 """ 

518 Returns a revoked certificate (or slice of revoked certificates). 

519 """ 

520 

521 @abc.abstractmethod 

522 def __iter__(self) -> typing.Iterator[RevokedCertificate]: 

523 """ 

524 Iterator over the revoked certificates 

525 """ 

526 

527 @abc.abstractmethod 

528 def is_signature_valid( 

529 self, public_key: CertificateIssuerPublicKeyTypes 

530 ) -> bool: 

531 """ 

532 Verifies signature of revocation list against given public key. 

533 """ 

534 

535 

536CertificateRevocationList.register(rust_x509.CertificateRevocationList) 

537 

538 

539class CertificateSigningRequest(metaclass=abc.ABCMeta): 

540 @abc.abstractmethod 

541 def __eq__(self, other: object) -> bool: 

542 """ 

543 Checks equality. 

544 """ 

545 

546 @abc.abstractmethod 

547 def __hash__(self) -> int: 

548 """ 

549 Computes a hash. 

550 """ 

551 

552 @abc.abstractmethod 

553 def public_key(self) -> CertificatePublicKeyTypes: 

554 """ 

555 Returns the public key 

556 """ 

557 

558 @property 

559 @abc.abstractmethod 

560 def subject(self) -> Name: 

561 """ 

562 Returns the subject name object. 

563 """ 

564 

565 @property 

566 @abc.abstractmethod 

567 def signature_hash_algorithm( 

568 self, 

569 ) -> hashes.HashAlgorithm | None: 

570 """ 

571 Returns a HashAlgorithm corresponding to the type of the digest signed 

572 in the certificate. 

573 """ 

574 

575 @property 

576 @abc.abstractmethod 

577 def signature_algorithm_oid(self) -> ObjectIdentifier: 

578 """ 

579 Returns the ObjectIdentifier of the signature algorithm. 

580 """ 

581 

582 @property 

583 @abc.abstractmethod 

584 def signature_algorithm_parameters( 

585 self, 

586 ) -> None | padding.PSS | padding.PKCS1v15 | ec.ECDSA: 

587 """ 

588 Returns the signature algorithm parameters. 

589 """ 

590 

591 @property 

592 @abc.abstractmethod 

593 def extensions(self) -> Extensions: 

594 """ 

595 Returns the extensions in the signing request. 

596 """ 

597 

598 @property 

599 @abc.abstractmethod 

600 def attributes(self) -> Attributes: 

601 """ 

602 Returns an Attributes object. 

603 """ 

604 

605 @abc.abstractmethod 

606 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

607 """ 

608 Encodes the request to PEM or DER format. 

609 """ 

610 

611 @property 

612 @abc.abstractmethod 

613 def signature(self) -> bytes: 

614 """ 

615 Returns the signature bytes. 

616 """ 

617 

618 @property 

619 @abc.abstractmethod 

620 def tbs_certrequest_bytes(self) -> bytes: 

621 """ 

622 Returns the PKCS#10 CertificationRequestInfo bytes as defined in RFC 

623 2986. 

624 """ 

625 

626 @property 

627 @abc.abstractmethod 

628 def is_signature_valid(self) -> bool: 

629 """ 

630 Verifies signature of signing request. 

631 """ 

632 

633 @abc.abstractmethod 

634 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes: 

635 """ 

636 Get the attribute value for a given OID. 

637 """ 

638 

639 

640# Runtime isinstance checks need this since the rust class is not a subclass. 

641CertificateSigningRequest.register(rust_x509.CertificateSigningRequest) 

642 

643 

644load_pem_x509_certificate = rust_x509.load_pem_x509_certificate 

645load_der_x509_certificate = rust_x509.load_der_x509_certificate 

646 

647load_pem_x509_certificates = rust_x509.load_pem_x509_certificates 

648 

649load_pem_x509_csr = rust_x509.load_pem_x509_csr 

650load_der_x509_csr = rust_x509.load_der_x509_csr 

651 

652load_pem_x509_crl = rust_x509.load_pem_x509_crl 

653load_der_x509_crl = rust_x509.load_der_x509_crl 

654 

655 

656class CertificateSigningRequestBuilder: 

657 def __init__( 

658 self, 

659 subject_name: Name | None = None, 

660 extensions: list[Extension[ExtensionType]] = [], 

661 attributes: list[tuple[ObjectIdentifier, bytes, int | None]] = [], 

662 ): 

663 """ 

664 Creates an empty X.509 certificate request (v1). 

665 """ 

666 self._subject_name = subject_name 

667 self._extensions = extensions 

668 self._attributes = attributes 

669 

670 def subject_name(self, name: Name) -> CertificateSigningRequestBuilder: 

671 """ 

672 Sets the certificate requestor's distinguished name. 

673 """ 

674 if not isinstance(name, Name): 

675 raise TypeError("Expecting x509.Name object.") 

676 if self._subject_name is not None: 

677 raise ValueError("The subject name may only be set once.") 

678 return CertificateSigningRequestBuilder( 

679 name, self._extensions, self._attributes 

680 ) 

681 

682 def add_extension( 

683 self, extval: ExtensionType, critical: bool 

684 ) -> CertificateSigningRequestBuilder: 

685 """ 

686 Adds an X.509 extension to the certificate request. 

687 """ 

688 if not isinstance(extval, ExtensionType): 

689 raise TypeError("extension must be an ExtensionType") 

690 

691 extension = Extension(extval.oid, critical, extval) 

692 _reject_duplicate_extension(extension, self._extensions) 

693 

694 return CertificateSigningRequestBuilder( 

695 self._subject_name, 

696 [*self._extensions, extension], 

697 self._attributes, 

698 ) 

699 

700 def add_attribute( 

701 self, 

702 oid: ObjectIdentifier, 

703 value: bytes, 

704 *, 

705 _tag: _ASN1Type | None = None, 

706 ) -> CertificateSigningRequestBuilder: 

707 """ 

708 Adds an X.509 attribute with an OID and associated value. 

709 """ 

710 if not isinstance(oid, ObjectIdentifier): 

711 raise TypeError("oid must be an ObjectIdentifier") 

712 

713 if not isinstance(value, bytes): 

714 raise TypeError("value must be bytes") 

715 

716 if _tag is not None and not isinstance(_tag, _ASN1Type): 

717 raise TypeError("tag must be _ASN1Type") 

718 

719 _reject_duplicate_attribute(oid, self._attributes) 

720 

721 if _tag is not None: 

722 tag = _tag.value 

723 else: 

724 tag = None 

725 

726 return CertificateSigningRequestBuilder( 

727 self._subject_name, 

728 self._extensions, 

729 [*self._attributes, (oid, value, tag)], 

730 ) 

731 

732 def sign( 

733 self, 

734 private_key: CertificateIssuerPrivateKeyTypes, 

735 algorithm: _AllowedHashTypes | None, 

736 backend: typing.Any = None, 

737 *, 

738 rsa_padding: padding.PSS | padding.PKCS1v15 | None = None, 

739 ) -> CertificateSigningRequest: 

740 """ 

741 Signs the request using the requestor's private key. 

742 """ 

743 if self._subject_name is None: 

744 raise ValueError("A CertificateSigningRequest must have a subject") 

745 

746 if rsa_padding is not None: 

747 if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)): 

748 raise TypeError("Padding must be PSS or PKCS1v15") 

749 if not isinstance(private_key, rsa.RSAPrivateKey): 

750 raise TypeError("Padding is only supported for RSA keys") 

751 

752 return rust_x509.create_x509_csr( 

753 self, private_key, algorithm, rsa_padding 

754 ) 

755 

756 

757class CertificateBuilder: 

758 _extensions: list[Extension[ExtensionType]] 

759 

760 def __init__( 

761 self, 

762 issuer_name: Name | None = None, 

763 subject_name: Name | None = None, 

764 public_key: CertificatePublicKeyTypes | None = None, 

765 serial_number: int | None = None, 

766 not_valid_before: datetime.datetime | None = None, 

767 not_valid_after: datetime.datetime | None = None, 

768 extensions: list[Extension[ExtensionType]] = [], 

769 ) -> None: 

770 self._version = Version.v3 

771 self._issuer_name = issuer_name 

772 self._subject_name = subject_name 

773 self._public_key = public_key 

774 self._serial_number = serial_number 

775 self._not_valid_before = not_valid_before 

776 self._not_valid_after = not_valid_after 

777 self._extensions = extensions 

778 

779 def issuer_name(self, name: Name) -> CertificateBuilder: 

780 """ 

781 Sets the CA's distinguished name. 

782 """ 

783 if not isinstance(name, Name): 

784 raise TypeError("Expecting x509.Name object.") 

785 if self._issuer_name is not None: 

786 raise ValueError("The issuer name may only be set once.") 

787 return CertificateBuilder( 

788 name, 

789 self._subject_name, 

790 self._public_key, 

791 self._serial_number, 

792 self._not_valid_before, 

793 self._not_valid_after, 

794 self._extensions, 

795 ) 

796 

797 def subject_name(self, name: Name) -> CertificateBuilder: 

798 """ 

799 Sets the requestor's distinguished name. 

800 """ 

801 if not isinstance(name, Name): 

802 raise TypeError("Expecting x509.Name object.") 

803 if self._subject_name is not None: 

804 raise ValueError("The subject name may only be set once.") 

805 return CertificateBuilder( 

806 self._issuer_name, 

807 name, 

808 self._public_key, 

809 self._serial_number, 

810 self._not_valid_before, 

811 self._not_valid_after, 

812 self._extensions, 

813 ) 

814 

815 def public_key( 

816 self, 

817 key: CertificatePublicKeyTypes, 

818 ) -> CertificateBuilder: 

819 """ 

820 Sets the requestor's public key (as found in the signing request). 

821 """ 

822 if not isinstance( 

823 key, 

824 ( 

825 dsa.DSAPublicKey, 

826 rsa.RSAPublicKey, 

827 ec.EllipticCurvePublicKey, 

828 ed25519.Ed25519PublicKey, 

829 ed448.Ed448PublicKey, 

830 x25519.X25519PublicKey, 

831 x448.X448PublicKey, 

832 ), 

833 ): 

834 raise TypeError( 

835 "Expecting one of DSAPublicKey, RSAPublicKey," 

836 " EllipticCurvePublicKey, Ed25519PublicKey," 

837 " Ed448PublicKey, X25519PublicKey, or " 

838 "X448PublicKey." 

839 ) 

840 if self._public_key is not None: 

841 raise ValueError("The public key may only be set once.") 

842 return CertificateBuilder( 

843 self._issuer_name, 

844 self._subject_name, 

845 key, 

846 self._serial_number, 

847 self._not_valid_before, 

848 self._not_valid_after, 

849 self._extensions, 

850 ) 

851 

852 def serial_number(self, number: int) -> CertificateBuilder: 

853 """ 

854 Sets the certificate serial number. 

855 """ 

856 if not isinstance(number, int): 

857 raise TypeError("Serial number must be of integral type.") 

858 if self._serial_number is not None: 

859 raise ValueError("The serial number may only be set once.") 

860 if number <= 0: 

861 raise ValueError("The serial number should be positive.") 

862 

863 # ASN.1 integers are always signed, so most significant bit must be 

864 # zero. 

865 if number.bit_length() >= 160: # As defined in RFC 5280 

866 raise ValueError( 

867 "The serial number should not be more than 159 " "bits." 

868 ) 

869 return CertificateBuilder( 

870 self._issuer_name, 

871 self._subject_name, 

872 self._public_key, 

873 number, 

874 self._not_valid_before, 

875 self._not_valid_after, 

876 self._extensions, 

877 ) 

878 

879 def not_valid_before(self, time: datetime.datetime) -> CertificateBuilder: 

880 """ 

881 Sets the certificate activation time. 

882 """ 

883 if not isinstance(time, datetime.datetime): 

884 raise TypeError("Expecting datetime object.") 

885 if self._not_valid_before is not None: 

886 raise ValueError("The not valid before may only be set once.") 

887 time = _convert_to_naive_utc_time(time) 

888 if time < _EARLIEST_UTC_TIME: 

889 raise ValueError( 

890 "The not valid before date must be on or after" 

891 " 1950 January 1)." 

892 ) 

893 if self._not_valid_after is not None and time > self._not_valid_after: 

894 raise ValueError( 

895 "The not valid before date must be before the not valid after " 

896 "date." 

897 ) 

898 return CertificateBuilder( 

899 self._issuer_name, 

900 self._subject_name, 

901 self._public_key, 

902 self._serial_number, 

903 time, 

904 self._not_valid_after, 

905 self._extensions, 

906 ) 

907 

908 def not_valid_after(self, time: datetime.datetime) -> CertificateBuilder: 

909 """ 

910 Sets the certificate expiration time. 

911 """ 

912 if not isinstance(time, datetime.datetime): 

913 raise TypeError("Expecting datetime object.") 

914 if self._not_valid_after is not None: 

915 raise ValueError("The not valid after may only be set once.") 

916 time = _convert_to_naive_utc_time(time) 

917 if time < _EARLIEST_UTC_TIME: 

918 raise ValueError( 

919 "The not valid after date must be on or after" 

920 " 1950 January 1." 

921 ) 

922 if ( 

923 self._not_valid_before is not None 

924 and time < self._not_valid_before 

925 ): 

926 raise ValueError( 

927 "The not valid after date must be after the not valid before " 

928 "date." 

929 ) 

930 return CertificateBuilder( 

931 self._issuer_name, 

932 self._subject_name, 

933 self._public_key, 

934 self._serial_number, 

935 self._not_valid_before, 

936 time, 

937 self._extensions, 

938 ) 

939 

940 def add_extension( 

941 self, extval: ExtensionType, critical: bool 

942 ) -> CertificateBuilder: 

943 """ 

944 Adds an X.509 extension to the certificate. 

945 """ 

946 if not isinstance(extval, ExtensionType): 

947 raise TypeError("extension must be an ExtensionType") 

948 

949 extension = Extension(extval.oid, critical, extval) 

950 _reject_duplicate_extension(extension, self._extensions) 

951 

952 return CertificateBuilder( 

953 self._issuer_name, 

954 self._subject_name, 

955 self._public_key, 

956 self._serial_number, 

957 self._not_valid_before, 

958 self._not_valid_after, 

959 [*self._extensions, extension], 

960 ) 

961 

962 def sign( 

963 self, 

964 private_key: CertificateIssuerPrivateKeyTypes, 

965 algorithm: _AllowedHashTypes | None, 

966 backend: typing.Any = None, 

967 *, 

968 rsa_padding: padding.PSS | padding.PKCS1v15 | None = None, 

969 ) -> Certificate: 

970 """ 

971 Signs the certificate using the CA's private key. 

972 """ 

973 if self._subject_name is None: 

974 raise ValueError("A certificate must have a subject name") 

975 

976 if self._issuer_name is None: 

977 raise ValueError("A certificate must have an issuer name") 

978 

979 if self._serial_number is None: 

980 raise ValueError("A certificate must have a serial number") 

981 

982 if self._not_valid_before is None: 

983 raise ValueError("A certificate must have a not valid before time") 

984 

985 if self._not_valid_after is None: 

986 raise ValueError("A certificate must have a not valid after time") 

987 

988 if self._public_key is None: 

989 raise ValueError("A certificate must have a public key") 

990 

991 if rsa_padding is not None: 

992 if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)): 

993 raise TypeError("Padding must be PSS or PKCS1v15") 

994 if not isinstance(private_key, rsa.RSAPrivateKey): 

995 raise TypeError("Padding is only supported for RSA keys") 

996 

997 return rust_x509.create_x509_certificate( 

998 self, private_key, algorithm, rsa_padding 

999 ) 

1000 

1001 

1002class CertificateRevocationListBuilder: 

1003 _extensions: list[Extension[ExtensionType]] 

1004 _revoked_certificates: list[RevokedCertificate] 

1005 

1006 def __init__( 

1007 self, 

1008 issuer_name: Name | None = None, 

1009 last_update: datetime.datetime | None = None, 

1010 next_update: datetime.datetime | None = None, 

1011 extensions: list[Extension[ExtensionType]] = [], 

1012 revoked_certificates: list[RevokedCertificate] = [], 

1013 ): 

1014 self._issuer_name = issuer_name 

1015 self._last_update = last_update 

1016 self._next_update = next_update 

1017 self._extensions = extensions 

1018 self._revoked_certificates = revoked_certificates 

1019 

1020 def issuer_name( 

1021 self, issuer_name: Name 

1022 ) -> CertificateRevocationListBuilder: 

1023 if not isinstance(issuer_name, Name): 

1024 raise TypeError("Expecting x509.Name object.") 

1025 if self._issuer_name is not None: 

1026 raise ValueError("The issuer name may only be set once.") 

1027 return CertificateRevocationListBuilder( 

1028 issuer_name, 

1029 self._last_update, 

1030 self._next_update, 

1031 self._extensions, 

1032 self._revoked_certificates, 

1033 ) 

1034 

1035 def last_update( 

1036 self, last_update: datetime.datetime 

1037 ) -> CertificateRevocationListBuilder: 

1038 if not isinstance(last_update, datetime.datetime): 

1039 raise TypeError("Expecting datetime object.") 

1040 if self._last_update is not None: 

1041 raise ValueError("Last update may only be set once.") 

1042 last_update = _convert_to_naive_utc_time(last_update) 

1043 if last_update < _EARLIEST_UTC_TIME: 

1044 raise ValueError( 

1045 "The last update date must be on or after" " 1950 January 1." 

1046 ) 

1047 if self._next_update is not None and last_update > self._next_update: 

1048 raise ValueError( 

1049 "The last update date must be before the next update date." 

1050 ) 

1051 return CertificateRevocationListBuilder( 

1052 self._issuer_name, 

1053 last_update, 

1054 self._next_update, 

1055 self._extensions, 

1056 self._revoked_certificates, 

1057 ) 

1058 

1059 def next_update( 

1060 self, next_update: datetime.datetime 

1061 ) -> CertificateRevocationListBuilder: 

1062 if not isinstance(next_update, datetime.datetime): 

1063 raise TypeError("Expecting datetime object.") 

1064 if self._next_update is not None: 

1065 raise ValueError("Last update may only be set once.") 

1066 next_update = _convert_to_naive_utc_time(next_update) 

1067 if next_update < _EARLIEST_UTC_TIME: 

1068 raise ValueError( 

1069 "The last update date must be on or after" " 1950 January 1." 

1070 ) 

1071 if self._last_update is not None and next_update < self._last_update: 

1072 raise ValueError( 

1073 "The next update date must be after the last update date." 

1074 ) 

1075 return CertificateRevocationListBuilder( 

1076 self._issuer_name, 

1077 self._last_update, 

1078 next_update, 

1079 self._extensions, 

1080 self._revoked_certificates, 

1081 ) 

1082 

1083 def add_extension( 

1084 self, extval: ExtensionType, critical: bool 

1085 ) -> CertificateRevocationListBuilder: 

1086 """ 

1087 Adds an X.509 extension to the certificate revocation list. 

1088 """ 

1089 if not isinstance(extval, ExtensionType): 

1090 raise TypeError("extension must be an ExtensionType") 

1091 

1092 extension = Extension(extval.oid, critical, extval) 

1093 _reject_duplicate_extension(extension, self._extensions) 

1094 return CertificateRevocationListBuilder( 

1095 self._issuer_name, 

1096 self._last_update, 

1097 self._next_update, 

1098 [*self._extensions, extension], 

1099 self._revoked_certificates, 

1100 ) 

1101 

1102 def add_revoked_certificate( 

1103 self, revoked_certificate: RevokedCertificate 

1104 ) -> CertificateRevocationListBuilder: 

1105 """ 

1106 Adds a revoked certificate to the CRL. 

1107 """ 

1108 if not isinstance(revoked_certificate, RevokedCertificate): 

1109 raise TypeError("Must be an instance of RevokedCertificate") 

1110 

1111 return CertificateRevocationListBuilder( 

1112 self._issuer_name, 

1113 self._last_update, 

1114 self._next_update, 

1115 self._extensions, 

1116 [*self._revoked_certificates, revoked_certificate], 

1117 ) 

1118 

1119 def sign( 

1120 self, 

1121 private_key: CertificateIssuerPrivateKeyTypes, 

1122 algorithm: _AllowedHashTypes | None, 

1123 backend: typing.Any = None, 

1124 *, 

1125 rsa_padding: padding.PSS | padding.PKCS1v15 | None = None, 

1126 ) -> CertificateRevocationList: 

1127 if self._issuer_name is None: 

1128 raise ValueError("A CRL must have an issuer name") 

1129 

1130 if self._last_update is None: 

1131 raise ValueError("A CRL must have a last update time") 

1132 

1133 if self._next_update is None: 

1134 raise ValueError("A CRL must have a next update time") 

1135 

1136 if rsa_padding is not None: 

1137 if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)): 

1138 raise TypeError("Padding must be PSS or PKCS1v15") 

1139 if not isinstance(private_key, rsa.RSAPrivateKey): 

1140 raise TypeError("Padding is only supported for RSA keys") 

1141 

1142 return rust_x509.create_x509_crl( 

1143 self, private_key, algorithm, rsa_padding 

1144 ) 

1145 

1146 

1147class RevokedCertificateBuilder: 

1148 def __init__( 

1149 self, 

1150 serial_number: int | None = None, 

1151 revocation_date: datetime.datetime | None = None, 

1152 extensions: list[Extension[ExtensionType]] = [], 

1153 ): 

1154 self._serial_number = serial_number 

1155 self._revocation_date = revocation_date 

1156 self._extensions = extensions 

1157 

1158 def serial_number(self, number: int) -> RevokedCertificateBuilder: 

1159 if not isinstance(number, int): 

1160 raise TypeError("Serial number must be of integral type.") 

1161 if self._serial_number is not None: 

1162 raise ValueError("The serial number may only be set once.") 

1163 if number <= 0: 

1164 raise ValueError("The serial number should be positive") 

1165 

1166 # ASN.1 integers are always signed, so most significant bit must be 

1167 # zero. 

1168 if number.bit_length() >= 160: # As defined in RFC 5280 

1169 raise ValueError( 

1170 "The serial number should not be more than 159 " "bits." 

1171 ) 

1172 return RevokedCertificateBuilder( 

1173 number, self._revocation_date, self._extensions 

1174 ) 

1175 

1176 def revocation_date( 

1177 self, time: datetime.datetime 

1178 ) -> RevokedCertificateBuilder: 

1179 if not isinstance(time, datetime.datetime): 

1180 raise TypeError("Expecting datetime object.") 

1181 if self._revocation_date is not None: 

1182 raise ValueError("The revocation date may only be set once.") 

1183 time = _convert_to_naive_utc_time(time) 

1184 if time < _EARLIEST_UTC_TIME: 

1185 raise ValueError( 

1186 "The revocation date must be on or after" " 1950 January 1." 

1187 ) 

1188 return RevokedCertificateBuilder( 

1189 self._serial_number, time, self._extensions 

1190 ) 

1191 

1192 def add_extension( 

1193 self, extval: ExtensionType, critical: bool 

1194 ) -> RevokedCertificateBuilder: 

1195 if not isinstance(extval, ExtensionType): 

1196 raise TypeError("extension must be an ExtensionType") 

1197 

1198 extension = Extension(extval.oid, critical, extval) 

1199 _reject_duplicate_extension(extension, self._extensions) 

1200 return RevokedCertificateBuilder( 

1201 self._serial_number, 

1202 self._revocation_date, 

1203 [*self._extensions, extension], 

1204 ) 

1205 

1206 def build(self, backend: typing.Any = None) -> RevokedCertificate: 

1207 if self._serial_number is None: 

1208 raise ValueError("A revoked certificate must have a serial number") 

1209 if self._revocation_date is None: 

1210 raise ValueError( 

1211 "A revoked certificate must have a revocation date" 

1212 ) 

1213 return _RawRevokedCertificate( 

1214 self._serial_number, 

1215 self._revocation_date, 

1216 Extensions(self._extensions), 

1217 ) 

1218 

1219 

1220def random_serial_number() -> int: 

1221 return int.from_bytes(os.urandom(20), "big") >> 1