Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/x509/base.py: 52%

486 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 07:26 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import abc 

8import datetime 

9import os 

10import typing 

11import warnings 

12 

13from cryptography import utils 

14from cryptography.hazmat.bindings._rust import x509 as rust_x509 

15from cryptography.hazmat.primitives import hashes, serialization 

16from cryptography.hazmat.primitives.asymmetric import ( 

17 dsa, 

18 ec, 

19 ed448, 

20 ed25519, 

21 padding, 

22 rsa, 

23 x448, 

24 x25519, 

25) 

26from cryptography.hazmat.primitives.asymmetric.types import ( 

27 CertificateIssuerPrivateKeyTypes, 

28 CertificateIssuerPublicKeyTypes, 

29 CertificatePublicKeyTypes, 

30) 

31from cryptography.x509.extensions import ( 

32 Extension, 

33 Extensions, 

34 ExtensionType, 

35 _make_sequence_methods, 

36) 

37from cryptography.x509.name import Name, _ASN1Type 

38from cryptography.x509.oid import ObjectIdentifier 

39 

40_EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1) 

41 

42# This must be kept in sync with sign.rs's list of allowable types in 

43# identify_hash_type 

44_AllowedHashTypes = typing.Union[ 

45 hashes.SHA224, 

46 hashes.SHA256, 

47 hashes.SHA384, 

48 hashes.SHA512, 

49 hashes.SHA3_224, 

50 hashes.SHA3_256, 

51 hashes.SHA3_384, 

52 hashes.SHA3_512, 

53] 

54 

55 

56class AttributeNotFound(Exception): 

57 def __init__(self, msg: str, oid: ObjectIdentifier) -> None: 

58 super().__init__(msg) 

59 self.oid = oid 

60 

61 

62def _reject_duplicate_extension( 

63 extension: Extension[ExtensionType], 

64 extensions: list[Extension[ExtensionType]], 

65) -> None: 

66 # This is quadratic in the number of extensions 

67 for e in extensions: 

68 if e.oid == extension.oid: 

69 raise ValueError("This extension has already been set.") 

70 

71 

72def _reject_duplicate_attribute( 

73 oid: ObjectIdentifier, 

74 attributes: list[tuple[ObjectIdentifier, bytes, int | None]], 

75) -> None: 

76 # This is quadratic in the number of attributes 

77 for attr_oid, _, _ in attributes: 

78 if attr_oid == oid: 

79 raise ValueError("This attribute has already been set.") 

80 

81 

82def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime: 

83 """Normalizes a datetime to a naive datetime in UTC. 

84 

85 time -- datetime to normalize. Assumed to be in UTC if not timezone 

86 aware. 

87 """ 

88 if time.tzinfo is not None: 

89 offset = time.utcoffset() 

90 offset = offset if offset else datetime.timedelta() 

91 return time.replace(tzinfo=None) - offset 

92 else: 

93 return time 

94 

95 

96class Attribute: 

97 def __init__( 

98 self, 

99 oid: ObjectIdentifier, 

100 value: bytes, 

101 _type: int = _ASN1Type.UTF8String.value, 

102 ) -> None: 

103 self._oid = oid 

104 self._value = value 

105 self._type = _type 

106 

107 @property 

108 def oid(self) -> ObjectIdentifier: 

109 return self._oid 

110 

111 @property 

112 def value(self) -> bytes: 

113 return self._value 

114 

115 def __repr__(self) -> str: 

116 return f"<Attribute(oid={self.oid}, value={self.value!r})>" 

117 

118 def __eq__(self, other: object) -> bool: 

119 if not isinstance(other, Attribute): 

120 return NotImplemented 

121 

122 return ( 

123 self.oid == other.oid 

124 and self.value == other.value 

125 and self._type == other._type 

126 ) 

127 

128 def __hash__(self) -> int: 

129 return hash((self.oid, self.value, self._type)) 

130 

131 

132class Attributes: 

133 def __init__( 

134 self, 

135 attributes: typing.Iterable[Attribute], 

136 ) -> None: 

137 self._attributes = list(attributes) 

138 

139 __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes") 

140 

141 def __repr__(self) -> str: 

142 return f"<Attributes({self._attributes})>" 

143 

144 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute: 

145 for attr in self: 

146 if attr.oid == oid: 

147 return attr 

148 

149 raise AttributeNotFound(f"No {oid} attribute was found", oid) 

150 

151 

152class Version(utils.Enum): 

153 v1 = 0 

154 v3 = 2 

155 

156 

157class InvalidVersion(Exception): 

158 def __init__(self, msg: str, parsed_version: int) -> None: 

159 super().__init__(msg) 

160 self.parsed_version = parsed_version 

161 

162 

163class Certificate(metaclass=abc.ABCMeta): 

164 @abc.abstractmethod 

165 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: 

166 """ 

167 Returns bytes using digest passed. 

168 """ 

169 

170 @property 

171 @abc.abstractmethod 

172 def serial_number(self) -> int: 

173 """ 

174 Returns certificate serial number 

175 """ 

176 

177 @property 

178 @abc.abstractmethod 

179 def version(self) -> Version: 

180 """ 

181 Returns the certificate version 

182 """ 

183 

184 @abc.abstractmethod 

185 def public_key(self) -> CertificatePublicKeyTypes: 

186 """ 

187 Returns the public key 

188 """ 

189 

190 @property 

191 @abc.abstractmethod 

192 def not_valid_before(self) -> datetime.datetime: 

193 """ 

194 Not before time (represented as UTC datetime) 

195 """ 

196 

197 @property 

198 @abc.abstractmethod 

199 def not_valid_before_utc(self) -> datetime.datetime: 

200 """ 

201 Not before time (represented as a non-naive UTC datetime) 

202 """ 

203 

204 @property 

205 @abc.abstractmethod 

206 def not_valid_after(self) -> datetime.datetime: 

207 """ 

208 Not after time (represented as UTC datetime) 

209 """ 

210 

211 @property 

212 @abc.abstractmethod 

213 def not_valid_after_utc(self) -> datetime.datetime: 

214 """ 

215 Not after time (represented as a non-naive UTC datetime) 

216 """ 

217 

218 @property 

219 @abc.abstractmethod 

220 def issuer(self) -> Name: 

221 """ 

222 Returns the issuer name object. 

223 """ 

224 

225 @property 

226 @abc.abstractmethod 

227 def subject(self) -> Name: 

228 """ 

229 Returns the subject name object. 

230 """ 

231 

232 @property 

233 @abc.abstractmethod 

234 def signature_hash_algorithm( 

235 self, 

236 ) -> hashes.HashAlgorithm | None: 

237 """ 

238 Returns a HashAlgorithm corresponding to the type of the digest signed 

239 in the certificate. 

240 """ 

241 

242 @property 

243 @abc.abstractmethod 

244 def signature_algorithm_oid(self) -> ObjectIdentifier: 

245 """ 

246 Returns the ObjectIdentifier of the signature algorithm. 

247 """ 

248 

249 @property 

250 @abc.abstractmethod 

251 def signature_algorithm_parameters( 

252 self, 

253 ) -> None | padding.PSS | padding.PKCS1v15 | ec.ECDSA: 

254 """ 

255 Returns the signature algorithm parameters. 

256 """ 

257 

258 @property 

259 @abc.abstractmethod 

260 def extensions(self) -> Extensions: 

261 """ 

262 Returns an Extensions object. 

263 """ 

264 

265 @property 

266 @abc.abstractmethod 

267 def signature(self) -> bytes: 

268 """ 

269 Returns the signature bytes. 

270 """ 

271 

272 @property 

273 @abc.abstractmethod 

274 def tbs_certificate_bytes(self) -> bytes: 

275 """ 

276 Returns the tbsCertificate payload bytes as defined in RFC 5280. 

277 """ 

278 

279 @property 

280 @abc.abstractmethod 

281 def tbs_precertificate_bytes(self) -> bytes: 

282 """ 

283 Returns the tbsCertificate payload bytes with the SCT list extension 

284 stripped. 

285 """ 

286 

287 @abc.abstractmethod 

288 def __eq__(self, other: object) -> bool: 

289 """ 

290 Checks equality. 

291 """ 

292 

293 @abc.abstractmethod 

294 def __hash__(self) -> int: 

295 """ 

296 Computes a hash. 

297 """ 

298 

299 @abc.abstractmethod 

300 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

301 """ 

302 Serializes the certificate to PEM or DER format. 

303 """ 

304 

305 @abc.abstractmethod 

306 def verify_directly_issued_by(self, issuer: Certificate) -> None: 

307 """ 

308 This method verifies that certificate issuer name matches the 

309 issuer subject name and that the certificate is signed by the 

310 issuer's private key. No other validation is performed. 

311 """ 

312 

313 

314# Runtime isinstance checks need this since the rust class is not a subclass. 

315Certificate.register(rust_x509.Certificate) 

316 

317 

318class RevokedCertificate(metaclass=abc.ABCMeta): 

319 @property 

320 @abc.abstractmethod 

321 def serial_number(self) -> int: 

322 """ 

323 Returns the serial number of the revoked certificate. 

324 """ 

325 

326 @property 

327 @abc.abstractmethod 

328 def revocation_date(self) -> datetime.datetime: 

329 """ 

330 Returns the date of when this certificate was revoked. 

331 """ 

332 

333 @property 

334 @abc.abstractmethod 

335 def revocation_date_utc(self) -> datetime.datetime: 

336 """ 

337 Returns the date of when this certificate was revoked as a non-naive 

338 UTC datetime. 

339 """ 

340 

341 @property 

342 @abc.abstractmethod 

343 def extensions(self) -> Extensions: 

344 """ 

345 Returns an Extensions object containing a list of Revoked extensions. 

346 """ 

347 

348 

349# Runtime isinstance checks need this since the rust class is not a subclass. 

350RevokedCertificate.register(rust_x509.RevokedCertificate) 

351 

352 

353class _RawRevokedCertificate(RevokedCertificate): 

354 def __init__( 

355 self, 

356 serial_number: int, 

357 revocation_date: datetime.datetime, 

358 extensions: Extensions, 

359 ): 

360 self._serial_number = serial_number 

361 self._revocation_date = revocation_date 

362 self._extensions = extensions 

363 

364 @property 

365 def serial_number(self) -> int: 

366 return self._serial_number 

367 

368 @property 

369 def revocation_date(self) -> datetime.datetime: 

370 warnings.warn( 

371 "Properties that return a naïve datetime object have been " 

372 "deprecated. Please switch to revocation_date_utc.", 

373 utils.DeprecatedIn42, 

374 stacklevel=2, 

375 ) 

376 return self._revocation_date 

377 

378 @property 

379 def revocation_date_utc(self) -> datetime.datetime: 

380 return self._revocation_date.replace(tzinfo=datetime.timezone.utc) 

381 

382 @property 

383 def extensions(self) -> Extensions: 

384 return self._extensions 

385 

386 

387class CertificateRevocationList(metaclass=abc.ABCMeta): 

388 @abc.abstractmethod 

389 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

390 """ 

391 Serializes the CRL to PEM or DER format. 

392 """ 

393 

394 @abc.abstractmethod 

395 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: 

396 """ 

397 Returns bytes using digest passed. 

398 """ 

399 

400 @abc.abstractmethod 

401 def get_revoked_certificate_by_serial_number( 

402 self, serial_number: int 

403 ) -> RevokedCertificate | None: 

404 """ 

405 Returns an instance of RevokedCertificate or None if the serial_number 

406 is not in the CRL. 

407 """ 

408 

409 @property 

410 @abc.abstractmethod 

411 def signature_hash_algorithm( 

412 self, 

413 ) -> hashes.HashAlgorithm | None: 

414 """ 

415 Returns a HashAlgorithm corresponding to the type of the digest signed 

416 in the certificate. 

417 """ 

418 

419 @property 

420 @abc.abstractmethod 

421 def signature_algorithm_oid(self) -> ObjectIdentifier: 

422 """ 

423 Returns the ObjectIdentifier of the signature algorithm. 

424 """ 

425 

426 @property 

427 @abc.abstractmethod 

428 def issuer(self) -> Name: 

429 """ 

430 Returns the X509Name with the issuer of this CRL. 

431 """ 

432 

433 @property 

434 @abc.abstractmethod 

435 def next_update(self) -> datetime.datetime | None: 

436 """ 

437 Returns the date of next update for this CRL. 

438 """ 

439 

440 @property 

441 @abc.abstractmethod 

442 def next_update_utc(self) -> datetime.datetime | None: 

443 """ 

444 Returns the date of next update for this CRL as a non-naive UTC 

445 datetime. 

446 """ 

447 

448 @property 

449 @abc.abstractmethod 

450 def last_update(self) -> datetime.datetime: 

451 """ 

452 Returns the date of last update for this CRL. 

453 """ 

454 

455 @property 

456 @abc.abstractmethod 

457 def last_update_utc(self) -> datetime.datetime: 

458 """ 

459 Returns the date of last update for this CRL as a non-naive UTC 

460 datetime. 

461 """ 

462 

463 @property 

464 @abc.abstractmethod 

465 def extensions(self) -> Extensions: 

466 """ 

467 Returns an Extensions object containing a list of CRL extensions. 

468 """ 

469 

470 @property 

471 @abc.abstractmethod 

472 def signature(self) -> bytes: 

473 """ 

474 Returns the signature bytes. 

475 """ 

476 

477 @property 

478 @abc.abstractmethod 

479 def tbs_certlist_bytes(self) -> bytes: 

480 """ 

481 Returns the tbsCertList payload bytes as defined in RFC 5280. 

482 """ 

483 

484 @abc.abstractmethod 

485 def __eq__(self, other: object) -> bool: 

486 """ 

487 Checks equality. 

488 """ 

489 

490 @abc.abstractmethod 

491 def __len__(self) -> int: 

492 """ 

493 Number of revoked certificates in the CRL. 

494 """ 

495 

496 @typing.overload 

497 def __getitem__(self, idx: int) -> RevokedCertificate: 

498 ... 

499 

500 @typing.overload 

501 def __getitem__(self, idx: slice) -> list[RevokedCertificate]: 

502 ... 

503 

504 @abc.abstractmethod 

505 def __getitem__( 

506 self, idx: int | slice 

507 ) -> RevokedCertificate | list[RevokedCertificate]: 

508 """ 

509 Returns a revoked certificate (or slice of revoked certificates). 

510 """ 

511 

512 @abc.abstractmethod 

513 def __iter__(self) -> typing.Iterator[RevokedCertificate]: 

514 """ 

515 Iterator over the revoked certificates 

516 """ 

517 

518 @abc.abstractmethod 

519 def is_signature_valid( 

520 self, public_key: CertificateIssuerPublicKeyTypes 

521 ) -> bool: 

522 """ 

523 Verifies signature of revocation list against given public key. 

524 """ 

525 

526 

527CertificateRevocationList.register(rust_x509.CertificateRevocationList) 

528 

529 

530class CertificateSigningRequest(metaclass=abc.ABCMeta): 

531 @abc.abstractmethod 

532 def __eq__(self, other: object) -> bool: 

533 """ 

534 Checks equality. 

535 """ 

536 

537 @abc.abstractmethod 

538 def __hash__(self) -> int: 

539 """ 

540 Computes a hash. 

541 """ 

542 

543 @abc.abstractmethod 

544 def public_key(self) -> CertificatePublicKeyTypes: 

545 """ 

546 Returns the public key 

547 """ 

548 

549 @property 

550 @abc.abstractmethod 

551 def subject(self) -> Name: 

552 """ 

553 Returns the subject name object. 

554 """ 

555 

556 @property 

557 @abc.abstractmethod 

558 def signature_hash_algorithm( 

559 self, 

560 ) -> hashes.HashAlgorithm | None: 

561 """ 

562 Returns a HashAlgorithm corresponding to the type of the digest signed 

563 in the certificate. 

564 """ 

565 

566 @property 

567 @abc.abstractmethod 

568 def signature_algorithm_oid(self) -> ObjectIdentifier: 

569 """ 

570 Returns the ObjectIdentifier of the signature algorithm. 

571 """ 

572 

573 @property 

574 @abc.abstractmethod 

575 def signature_algorithm_parameters( 

576 self, 

577 ) -> None | padding.PSS | padding.PKCS1v15 | ec.ECDSA: 

578 """ 

579 Returns the signature algorithm parameters. 

580 """ 

581 

582 @property 

583 @abc.abstractmethod 

584 def extensions(self) -> Extensions: 

585 """ 

586 Returns the extensions in the signing request. 

587 """ 

588 

589 @property 

590 @abc.abstractmethod 

591 def attributes(self) -> Attributes: 

592 """ 

593 Returns an Attributes object. 

594 """ 

595 

596 @abc.abstractmethod 

597 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

598 """ 

599 Encodes the request to PEM or DER format. 

600 """ 

601 

602 @property 

603 @abc.abstractmethod 

604 def signature(self) -> bytes: 

605 """ 

606 Returns the signature bytes. 

607 """ 

608 

609 @property 

610 @abc.abstractmethod 

611 def tbs_certrequest_bytes(self) -> bytes: 

612 """ 

613 Returns the PKCS#10 CertificationRequestInfo bytes as defined in RFC 

614 2986. 

615 """ 

616 

617 @property 

618 @abc.abstractmethod 

619 def is_signature_valid(self) -> bool: 

620 """ 

621 Verifies signature of signing request. 

622 """ 

623 

624 @abc.abstractmethod 

625 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes: 

626 """ 

627 Get the attribute value for a given OID. 

628 """ 

629 

630 

631# Runtime isinstance checks need this since the rust class is not a subclass. 

632CertificateSigningRequest.register(rust_x509.CertificateSigningRequest) 

633 

634 

635# Backend argument preserved for API compatibility, but ignored. 

636def load_pem_x509_certificate( 

637 data: bytes, backend: typing.Any = None 

638) -> Certificate: 

639 return rust_x509.load_pem_x509_certificate(data) 

640 

641 

642def load_pem_x509_certificates(data: bytes) -> list[Certificate]: 

643 return rust_x509.load_pem_x509_certificates(data) 

644 

645 

646# Backend argument preserved for API compatibility, but ignored. 

647def load_der_x509_certificate( 

648 data: bytes, backend: typing.Any = None 

649) -> Certificate: 

650 return rust_x509.load_der_x509_certificate(data) 

651 

652 

653# Backend argument preserved for API compatibility, but ignored. 

654def load_pem_x509_csr( 

655 data: bytes, backend: typing.Any = None 

656) -> CertificateSigningRequest: 

657 return rust_x509.load_pem_x509_csr(data) 

658 

659 

660# Backend argument preserved for API compatibility, but ignored. 

661def load_der_x509_csr( 

662 data: bytes, backend: typing.Any = None 

663) -> CertificateSigningRequest: 

664 return rust_x509.load_der_x509_csr(data) 

665 

666 

667# Backend argument preserved for API compatibility, but ignored. 

668def load_pem_x509_crl( 

669 data: bytes, backend: typing.Any = None 

670) -> CertificateRevocationList: 

671 return rust_x509.load_pem_x509_crl(data) 

672 

673 

674# Backend argument preserved for API compatibility, but ignored. 

675def load_der_x509_crl( 

676 data: bytes, backend: typing.Any = None 

677) -> CertificateRevocationList: 

678 return rust_x509.load_der_x509_crl(data) 

679 

680 

681class CertificateSigningRequestBuilder: 

682 def __init__( 

683 self, 

684 subject_name: Name | None = None, 

685 extensions: list[Extension[ExtensionType]] = [], 

686 attributes: list[tuple[ObjectIdentifier, bytes, int | None]] = [], 

687 ): 

688 """ 

689 Creates an empty X.509 certificate request (v1). 

690 """ 

691 self._subject_name = subject_name 

692 self._extensions = extensions 

693 self._attributes = attributes 

694 

695 def subject_name(self, name: Name) -> CertificateSigningRequestBuilder: 

696 """ 

697 Sets the certificate requestor's distinguished name. 

698 """ 

699 if not isinstance(name, Name): 

700 raise TypeError("Expecting x509.Name object.") 

701 if self._subject_name is not None: 

702 raise ValueError("The subject name may only be set once.") 

703 return CertificateSigningRequestBuilder( 

704 name, self._extensions, self._attributes 

705 ) 

706 

707 def add_extension( 

708 self, extval: ExtensionType, critical: bool 

709 ) -> CertificateSigningRequestBuilder: 

710 """ 

711 Adds an X.509 extension to the certificate request. 

712 """ 

713 if not isinstance(extval, ExtensionType): 

714 raise TypeError("extension must be an ExtensionType") 

715 

716 extension = Extension(extval.oid, critical, extval) 

717 _reject_duplicate_extension(extension, self._extensions) 

718 

719 return CertificateSigningRequestBuilder( 

720 self._subject_name, 

721 [*self._extensions, extension], 

722 self._attributes, 

723 ) 

724 

725 def add_attribute( 

726 self, 

727 oid: ObjectIdentifier, 

728 value: bytes, 

729 *, 

730 _tag: _ASN1Type | None = None, 

731 ) -> CertificateSigningRequestBuilder: 

732 """ 

733 Adds an X.509 attribute with an OID and associated value. 

734 """ 

735 if not isinstance(oid, ObjectIdentifier): 

736 raise TypeError("oid must be an ObjectIdentifier") 

737 

738 if not isinstance(value, bytes): 

739 raise TypeError("value must be bytes") 

740 

741 if _tag is not None and not isinstance(_tag, _ASN1Type): 

742 raise TypeError("tag must be _ASN1Type") 

743 

744 _reject_duplicate_attribute(oid, self._attributes) 

745 

746 if _tag is not None: 

747 tag = _tag.value 

748 else: 

749 tag = None 

750 

751 return CertificateSigningRequestBuilder( 

752 self._subject_name, 

753 self._extensions, 

754 [*self._attributes, (oid, value, tag)], 

755 ) 

756 

757 def sign( 

758 self, 

759 private_key: CertificateIssuerPrivateKeyTypes, 

760 algorithm: _AllowedHashTypes | None, 

761 backend: typing.Any = None, 

762 *, 

763 rsa_padding: padding.PSS | padding.PKCS1v15 | None = None, 

764 ) -> CertificateSigningRequest: 

765 """ 

766 Signs the request using the requestor's private key. 

767 """ 

768 if self._subject_name is None: 

769 raise ValueError("A CertificateSigningRequest must have a subject") 

770 

771 if rsa_padding is not None: 

772 if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)): 

773 raise TypeError("Padding must be PSS or PKCS1v15") 

774 if not isinstance(private_key, rsa.RSAPrivateKey): 

775 raise TypeError("Padding is only supported for RSA keys") 

776 

777 return rust_x509.create_x509_csr( 

778 self, private_key, algorithm, rsa_padding 

779 ) 

780 

781 

782class CertificateBuilder: 

783 _extensions: list[Extension[ExtensionType]] 

784 

785 def __init__( 

786 self, 

787 issuer_name: Name | None = None, 

788 subject_name: Name | None = None, 

789 public_key: CertificatePublicKeyTypes | None = None, 

790 serial_number: int | None = None, 

791 not_valid_before: datetime.datetime | None = None, 

792 not_valid_after: datetime.datetime | None = None, 

793 extensions: list[Extension[ExtensionType]] = [], 

794 ) -> None: 

795 self._version = Version.v3 

796 self._issuer_name = issuer_name 

797 self._subject_name = subject_name 

798 self._public_key = public_key 

799 self._serial_number = serial_number 

800 self._not_valid_before = not_valid_before 

801 self._not_valid_after = not_valid_after 

802 self._extensions = extensions 

803 

804 def issuer_name(self, name: Name) -> CertificateBuilder: 

805 """ 

806 Sets the CA's distinguished name. 

807 """ 

808 if not isinstance(name, Name): 

809 raise TypeError("Expecting x509.Name object.") 

810 if self._issuer_name is not None: 

811 raise ValueError("The issuer name may only be set once.") 

812 return CertificateBuilder( 

813 name, 

814 self._subject_name, 

815 self._public_key, 

816 self._serial_number, 

817 self._not_valid_before, 

818 self._not_valid_after, 

819 self._extensions, 

820 ) 

821 

822 def subject_name(self, name: Name) -> CertificateBuilder: 

823 """ 

824 Sets the requestor's distinguished name. 

825 """ 

826 if not isinstance(name, Name): 

827 raise TypeError("Expecting x509.Name object.") 

828 if self._subject_name is not None: 

829 raise ValueError("The subject name may only be set once.") 

830 return CertificateBuilder( 

831 self._issuer_name, 

832 name, 

833 self._public_key, 

834 self._serial_number, 

835 self._not_valid_before, 

836 self._not_valid_after, 

837 self._extensions, 

838 ) 

839 

840 def public_key( 

841 self, 

842 key: CertificatePublicKeyTypes, 

843 ) -> CertificateBuilder: 

844 """ 

845 Sets the requestor's public key (as found in the signing request). 

846 """ 

847 if not isinstance( 

848 key, 

849 ( 

850 dsa.DSAPublicKey, 

851 rsa.RSAPublicKey, 

852 ec.EllipticCurvePublicKey, 

853 ed25519.Ed25519PublicKey, 

854 ed448.Ed448PublicKey, 

855 x25519.X25519PublicKey, 

856 x448.X448PublicKey, 

857 ), 

858 ): 

859 raise TypeError( 

860 "Expecting one of DSAPublicKey, RSAPublicKey," 

861 " EllipticCurvePublicKey, Ed25519PublicKey," 

862 " Ed448PublicKey, X25519PublicKey, or " 

863 "X448PublicKey." 

864 ) 

865 if self._public_key is not None: 

866 raise ValueError("The public key may only be set once.") 

867 return CertificateBuilder( 

868 self._issuer_name, 

869 self._subject_name, 

870 key, 

871 self._serial_number, 

872 self._not_valid_before, 

873 self._not_valid_after, 

874 self._extensions, 

875 ) 

876 

877 def serial_number(self, number: int) -> CertificateBuilder: 

878 """ 

879 Sets the certificate serial number. 

880 """ 

881 if not isinstance(number, int): 

882 raise TypeError("Serial number must be of integral type.") 

883 if self._serial_number is not None: 

884 raise ValueError("The serial number may only be set once.") 

885 if number <= 0: 

886 raise ValueError("The serial number should be positive.") 

887 

888 # ASN.1 integers are always signed, so most significant bit must be 

889 # zero. 

890 if number.bit_length() >= 160: # As defined in RFC 5280 

891 raise ValueError( 

892 "The serial number should not be more than 159 " "bits." 

893 ) 

894 return CertificateBuilder( 

895 self._issuer_name, 

896 self._subject_name, 

897 self._public_key, 

898 number, 

899 self._not_valid_before, 

900 self._not_valid_after, 

901 self._extensions, 

902 ) 

903 

904 def not_valid_before(self, time: datetime.datetime) -> CertificateBuilder: 

905 """ 

906 Sets the certificate activation time. 

907 """ 

908 if not isinstance(time, datetime.datetime): 

909 raise TypeError("Expecting datetime object.") 

910 if self._not_valid_before is not None: 

911 raise ValueError("The not valid before may only be set once.") 

912 time = _convert_to_naive_utc_time(time) 

913 if time < _EARLIEST_UTC_TIME: 

914 raise ValueError( 

915 "The not valid before date must be on or after" 

916 " 1950 January 1)." 

917 ) 

918 if self._not_valid_after is not None and time > self._not_valid_after: 

919 raise ValueError( 

920 "The not valid before date must be before the not valid after " 

921 "date." 

922 ) 

923 return CertificateBuilder( 

924 self._issuer_name, 

925 self._subject_name, 

926 self._public_key, 

927 self._serial_number, 

928 time, 

929 self._not_valid_after, 

930 self._extensions, 

931 ) 

932 

933 def not_valid_after(self, time: datetime.datetime) -> CertificateBuilder: 

934 """ 

935 Sets the certificate expiration time. 

936 """ 

937 if not isinstance(time, datetime.datetime): 

938 raise TypeError("Expecting datetime object.") 

939 if self._not_valid_after is not None: 

940 raise ValueError("The not valid after may only be set once.") 

941 time = _convert_to_naive_utc_time(time) 

942 if time < _EARLIEST_UTC_TIME: 

943 raise ValueError( 

944 "The not valid after date must be on or after" 

945 " 1950 January 1." 

946 ) 

947 if ( 

948 self._not_valid_before is not None 

949 and time < self._not_valid_before 

950 ): 

951 raise ValueError( 

952 "The not valid after date must be after the not valid before " 

953 "date." 

954 ) 

955 return CertificateBuilder( 

956 self._issuer_name, 

957 self._subject_name, 

958 self._public_key, 

959 self._serial_number, 

960 self._not_valid_before, 

961 time, 

962 self._extensions, 

963 ) 

964 

965 def add_extension( 

966 self, extval: ExtensionType, critical: bool 

967 ) -> CertificateBuilder: 

968 """ 

969 Adds an X.509 extension to the certificate. 

970 """ 

971 if not isinstance(extval, ExtensionType): 

972 raise TypeError("extension must be an ExtensionType") 

973 

974 extension = Extension(extval.oid, critical, extval) 

975 _reject_duplicate_extension(extension, self._extensions) 

976 

977 return CertificateBuilder( 

978 self._issuer_name, 

979 self._subject_name, 

980 self._public_key, 

981 self._serial_number, 

982 self._not_valid_before, 

983 self._not_valid_after, 

984 [*self._extensions, extension], 

985 ) 

986 

987 def sign( 

988 self, 

989 private_key: CertificateIssuerPrivateKeyTypes, 

990 algorithm: _AllowedHashTypes | None, 

991 backend: typing.Any = None, 

992 *, 

993 rsa_padding: padding.PSS | padding.PKCS1v15 | None = None, 

994 ) -> Certificate: 

995 """ 

996 Signs the certificate using the CA's private key. 

997 """ 

998 if self._subject_name is None: 

999 raise ValueError("A certificate must have a subject name") 

1000 

1001 if self._issuer_name is None: 

1002 raise ValueError("A certificate must have an issuer name") 

1003 

1004 if self._serial_number is None: 

1005 raise ValueError("A certificate must have a serial number") 

1006 

1007 if self._not_valid_before is None: 

1008 raise ValueError("A certificate must have a not valid before time") 

1009 

1010 if self._not_valid_after is None: 

1011 raise ValueError("A certificate must have a not valid after time") 

1012 

1013 if self._public_key is None: 

1014 raise ValueError("A certificate must have a public key") 

1015 

1016 if rsa_padding is not None: 

1017 if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)): 

1018 raise TypeError("Padding must be PSS or PKCS1v15") 

1019 if not isinstance(private_key, rsa.RSAPrivateKey): 

1020 raise TypeError("Padding is only supported for RSA keys") 

1021 

1022 return rust_x509.create_x509_certificate( 

1023 self, private_key, algorithm, rsa_padding 

1024 ) 

1025 

1026 

1027class CertificateRevocationListBuilder: 

1028 _extensions: list[Extension[ExtensionType]] 

1029 _revoked_certificates: list[RevokedCertificate] 

1030 

1031 def __init__( 

1032 self, 

1033 issuer_name: Name | None = None, 

1034 last_update: datetime.datetime | None = None, 

1035 next_update: datetime.datetime | None = None, 

1036 extensions: list[Extension[ExtensionType]] = [], 

1037 revoked_certificates: list[RevokedCertificate] = [], 

1038 ): 

1039 self._issuer_name = issuer_name 

1040 self._last_update = last_update 

1041 self._next_update = next_update 

1042 self._extensions = extensions 

1043 self._revoked_certificates = revoked_certificates 

1044 

1045 def issuer_name( 

1046 self, issuer_name: Name 

1047 ) -> CertificateRevocationListBuilder: 

1048 if not isinstance(issuer_name, Name): 

1049 raise TypeError("Expecting x509.Name object.") 

1050 if self._issuer_name is not None: 

1051 raise ValueError("The issuer name may only be set once.") 

1052 return CertificateRevocationListBuilder( 

1053 issuer_name, 

1054 self._last_update, 

1055 self._next_update, 

1056 self._extensions, 

1057 self._revoked_certificates, 

1058 ) 

1059 

1060 def last_update( 

1061 self, last_update: datetime.datetime 

1062 ) -> CertificateRevocationListBuilder: 

1063 if not isinstance(last_update, datetime.datetime): 

1064 raise TypeError("Expecting datetime object.") 

1065 if self._last_update is not None: 

1066 raise ValueError("Last update may only be set once.") 

1067 last_update = _convert_to_naive_utc_time(last_update) 

1068 if last_update < _EARLIEST_UTC_TIME: 

1069 raise ValueError( 

1070 "The last update date must be on or after" " 1950 January 1." 

1071 ) 

1072 if self._next_update is not None and last_update > self._next_update: 

1073 raise ValueError( 

1074 "The last update date must be before the next update date." 

1075 ) 

1076 return CertificateRevocationListBuilder( 

1077 self._issuer_name, 

1078 last_update, 

1079 self._next_update, 

1080 self._extensions, 

1081 self._revoked_certificates, 

1082 ) 

1083 

1084 def next_update( 

1085 self, next_update: datetime.datetime 

1086 ) -> CertificateRevocationListBuilder: 

1087 if not isinstance(next_update, datetime.datetime): 

1088 raise TypeError("Expecting datetime object.") 

1089 if self._next_update is not None: 

1090 raise ValueError("Last update may only be set once.") 

1091 next_update = _convert_to_naive_utc_time(next_update) 

1092 if next_update < _EARLIEST_UTC_TIME: 

1093 raise ValueError( 

1094 "The last update date must be on or after" " 1950 January 1." 

1095 ) 

1096 if self._last_update is not None and next_update < self._last_update: 

1097 raise ValueError( 

1098 "The next update date must be after the last update date." 

1099 ) 

1100 return CertificateRevocationListBuilder( 

1101 self._issuer_name, 

1102 self._last_update, 

1103 next_update, 

1104 self._extensions, 

1105 self._revoked_certificates, 

1106 ) 

1107 

1108 def add_extension( 

1109 self, extval: ExtensionType, critical: bool 

1110 ) -> CertificateRevocationListBuilder: 

1111 """ 

1112 Adds an X.509 extension to the certificate revocation list. 

1113 """ 

1114 if not isinstance(extval, ExtensionType): 

1115 raise TypeError("extension must be an ExtensionType") 

1116 

1117 extension = Extension(extval.oid, critical, extval) 

1118 _reject_duplicate_extension(extension, self._extensions) 

1119 return CertificateRevocationListBuilder( 

1120 self._issuer_name, 

1121 self._last_update, 

1122 self._next_update, 

1123 [*self._extensions, extension], 

1124 self._revoked_certificates, 

1125 ) 

1126 

1127 def add_revoked_certificate( 

1128 self, revoked_certificate: RevokedCertificate 

1129 ) -> CertificateRevocationListBuilder: 

1130 """ 

1131 Adds a revoked certificate to the CRL. 

1132 """ 

1133 if not isinstance(revoked_certificate, RevokedCertificate): 

1134 raise TypeError("Must be an instance of RevokedCertificate") 

1135 

1136 return CertificateRevocationListBuilder( 

1137 self._issuer_name, 

1138 self._last_update, 

1139 self._next_update, 

1140 self._extensions, 

1141 [*self._revoked_certificates, revoked_certificate], 

1142 ) 

1143 

1144 def sign( 

1145 self, 

1146 private_key: CertificateIssuerPrivateKeyTypes, 

1147 algorithm: _AllowedHashTypes | None, 

1148 backend: typing.Any = None, 

1149 ) -> CertificateRevocationList: 

1150 if self._issuer_name is None: 

1151 raise ValueError("A CRL must have an issuer name") 

1152 

1153 if self._last_update is None: 

1154 raise ValueError("A CRL must have a last update time") 

1155 

1156 if self._next_update is None: 

1157 raise ValueError("A CRL must have a next update time") 

1158 

1159 return rust_x509.create_x509_crl(self, private_key, algorithm) 

1160 

1161 

1162class RevokedCertificateBuilder: 

1163 def __init__( 

1164 self, 

1165 serial_number: int | None = None, 

1166 revocation_date: datetime.datetime | None = None, 

1167 extensions: list[Extension[ExtensionType]] = [], 

1168 ): 

1169 self._serial_number = serial_number 

1170 self._revocation_date = revocation_date 

1171 self._extensions = extensions 

1172 

1173 def serial_number(self, number: int) -> RevokedCertificateBuilder: 

1174 if not isinstance(number, int): 

1175 raise TypeError("Serial number must be of integral type.") 

1176 if self._serial_number is not None: 

1177 raise ValueError("The serial number may only be set once.") 

1178 if number <= 0: 

1179 raise ValueError("The serial number should be positive") 

1180 

1181 # ASN.1 integers are always signed, so most significant bit must be 

1182 # zero. 

1183 if number.bit_length() >= 160: # As defined in RFC 5280 

1184 raise ValueError( 

1185 "The serial number should not be more than 159 " "bits." 

1186 ) 

1187 return RevokedCertificateBuilder( 

1188 number, self._revocation_date, self._extensions 

1189 ) 

1190 

1191 def revocation_date( 

1192 self, time: datetime.datetime 

1193 ) -> RevokedCertificateBuilder: 

1194 if not isinstance(time, datetime.datetime): 

1195 raise TypeError("Expecting datetime object.") 

1196 if self._revocation_date is not None: 

1197 raise ValueError("The revocation date may only be set once.") 

1198 time = _convert_to_naive_utc_time(time) 

1199 if time < _EARLIEST_UTC_TIME: 

1200 raise ValueError( 

1201 "The revocation date must be on or after" " 1950 January 1." 

1202 ) 

1203 return RevokedCertificateBuilder( 

1204 self._serial_number, time, self._extensions 

1205 ) 

1206 

1207 def add_extension( 

1208 self, extval: ExtensionType, critical: bool 

1209 ) -> RevokedCertificateBuilder: 

1210 if not isinstance(extval, ExtensionType): 

1211 raise TypeError("extension must be an ExtensionType") 

1212 

1213 extension = Extension(extval.oid, critical, extval) 

1214 _reject_duplicate_extension(extension, self._extensions) 

1215 return RevokedCertificateBuilder( 

1216 self._serial_number, 

1217 self._revocation_date, 

1218 [*self._extensions, extension], 

1219 ) 

1220 

1221 def build(self, backend: typing.Any = None) -> RevokedCertificate: 

1222 if self._serial_number is None: 

1223 raise ValueError("A revoked certificate must have a serial number") 

1224 if self._revocation_date is None: 

1225 raise ValueError( 

1226 "A revoked certificate must have a revocation date" 

1227 ) 

1228 return _RawRevokedCertificate( 

1229 self._serial_number, 

1230 self._revocation_date, 

1231 Extensions(self._extensions), 

1232 ) 

1233 

1234 

1235def random_serial_number() -> int: 

1236 return int.from_bytes(os.urandom(20), "big") >> 1