Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/x509/base.py: 51%

458 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:50 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import abc 

8import datetime 

9import os 

10import typing 

11 

12from cryptography import utils 

13from cryptography.hazmat.bindings._rust import x509 as rust_x509 

14from cryptography.hazmat.primitives import hashes, serialization 

15from cryptography.hazmat.primitives.asymmetric import ( 

16 dsa, 

17 ec, 

18 ed448, 

19 ed25519, 

20 padding, 

21 rsa, 

22 x448, 

23 x25519, 

24) 

25from cryptography.hazmat.primitives.asymmetric.types import ( 

26 CertificateIssuerPrivateKeyTypes, 

27 CertificateIssuerPublicKeyTypes, 

28 CertificatePublicKeyTypes, 

29) 

30from cryptography.x509.extensions import ( 

31 Extension, 

32 Extensions, 

33 ExtensionType, 

34 _make_sequence_methods, 

35) 

36from cryptography.x509.name import Name, _ASN1Type 

37from cryptography.x509.oid import ObjectIdentifier 

38 

39_EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1) 

40 

41# This must be kept in sync with sign.rs's list of allowable types in 

42# identify_hash_type 

43_AllowedHashTypes = typing.Union[ 

44 hashes.SHA224, 

45 hashes.SHA256, 

46 hashes.SHA384, 

47 hashes.SHA512, 

48 hashes.SHA3_224, 

49 hashes.SHA3_256, 

50 hashes.SHA3_384, 

51 hashes.SHA3_512, 

52] 

53 

54 

55class AttributeNotFound(Exception): 

56 def __init__(self, msg: str, oid: ObjectIdentifier) -> None: 

57 super().__init__(msg) 

58 self.oid = oid 

59 

60 

61def _reject_duplicate_extension( 

62 extension: Extension[ExtensionType], 

63 extensions: typing.List[Extension[ExtensionType]], 

64) -> None: 

65 # This is quadratic in the number of extensions 

66 for e in extensions: 

67 if e.oid == extension.oid: 

68 raise ValueError("This extension has already been set.") 

69 

70 

71def _reject_duplicate_attribute( 

72 oid: ObjectIdentifier, 

73 attributes: typing.List[ 

74 typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]] 

75 ], 

76) -> None: 

77 # This is quadratic in the number of attributes 

78 for attr_oid, _, _ in attributes: 

79 if attr_oid == oid: 

80 raise ValueError("This attribute has already been set.") 

81 

82 

83def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime: 

84 """Normalizes a datetime to a naive datetime in UTC. 

85 

86 time -- datetime to normalize. Assumed to be in UTC if not timezone 

87 aware. 

88 """ 

89 if time.tzinfo is not None: 

90 offset = time.utcoffset() 

91 offset = offset if offset else datetime.timedelta() 

92 return time.replace(tzinfo=None) - offset 

93 else: 

94 return time 

95 

96 

97class Attribute: 

98 def __init__( 

99 self, 

100 oid: ObjectIdentifier, 

101 value: bytes, 

102 _type: int = _ASN1Type.UTF8String.value, 

103 ) -> None: 

104 self._oid = oid 

105 self._value = value 

106 self._type = _type 

107 

108 @property 

109 def oid(self) -> ObjectIdentifier: 

110 return self._oid 

111 

112 @property 

113 def value(self) -> bytes: 

114 return self._value 

115 

116 def __repr__(self) -> str: 

117 return f"<Attribute(oid={self.oid}, value={self.value!r})>" 

118 

119 def __eq__(self, other: object) -> bool: 

120 if not isinstance(other, Attribute): 

121 return NotImplemented 

122 

123 return ( 

124 self.oid == other.oid 

125 and self.value == other.value 

126 and self._type == other._type 

127 ) 

128 

129 def __hash__(self) -> int: 

130 return hash((self.oid, self.value, self._type)) 

131 

132 

133class Attributes: 

134 def __init__( 

135 self, 

136 attributes: typing.Iterable[Attribute], 

137 ) -> None: 

138 self._attributes = list(attributes) 

139 

140 __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes") 

141 

142 def __repr__(self) -> str: 

143 return f"<Attributes({self._attributes})>" 

144 

145 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute: 

146 for attr in self: 

147 if attr.oid == oid: 

148 return attr 

149 

150 raise AttributeNotFound(f"No {oid} attribute was found", oid) 

151 

152 

153class Version(utils.Enum): 

154 v1 = 0 

155 v3 = 2 

156 

157 

158class InvalidVersion(Exception): 

159 def __init__(self, msg: str, parsed_version: int) -> None: 

160 super().__init__(msg) 

161 self.parsed_version = parsed_version 

162 

163 

164class Certificate(metaclass=abc.ABCMeta): 

165 @abc.abstractmethod 

166 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: 

167 """ 

168 Returns bytes using digest passed. 

169 """ 

170 

171 @property 

172 @abc.abstractmethod 

173 def serial_number(self) -> int: 

174 """ 

175 Returns certificate serial number 

176 """ 

177 

178 @property 

179 @abc.abstractmethod 

180 def version(self) -> Version: 

181 """ 

182 Returns the certificate version 

183 """ 

184 

185 @abc.abstractmethod 

186 def public_key(self) -> CertificatePublicKeyTypes: 

187 """ 

188 Returns the public key 

189 """ 

190 

191 @property 

192 @abc.abstractmethod 

193 def not_valid_before(self) -> datetime.datetime: 

194 """ 

195 Not before time (represented as UTC datetime) 

196 """ 

197 

198 @property 

199 @abc.abstractmethod 

200 def not_valid_after(self) -> datetime.datetime: 

201 """ 

202 Not after time (represented as UTC datetime) 

203 """ 

204 

205 @property 

206 @abc.abstractmethod 

207 def issuer(self) -> Name: 

208 """ 

209 Returns the issuer name object. 

210 """ 

211 

212 @property 

213 @abc.abstractmethod 

214 def subject(self) -> Name: 

215 """ 

216 Returns the subject name object. 

217 """ 

218 

219 @property 

220 @abc.abstractmethod 

221 def signature_hash_algorithm( 

222 self, 

223 ) -> typing.Optional[hashes.HashAlgorithm]: 

224 """ 

225 Returns a HashAlgorithm corresponding to the type of the digest signed 

226 in the certificate. 

227 """ 

228 

229 @property 

230 @abc.abstractmethod 

231 def signature_algorithm_oid(self) -> ObjectIdentifier: 

232 """ 

233 Returns the ObjectIdentifier of the signature algorithm. 

234 """ 

235 

236 @property 

237 @abc.abstractmethod 

238 def signature_algorithm_parameters( 

239 self, 

240 ) -> typing.Union[None, padding.PSS, padding.PKCS1v15, ec.ECDSA]: 

241 """ 

242 Returns the signature algorithm parameters. 

243 """ 

244 

245 @property 

246 @abc.abstractmethod 

247 def extensions(self) -> Extensions: 

248 """ 

249 Returns an Extensions object. 

250 """ 

251 

252 @property 

253 @abc.abstractmethod 

254 def signature(self) -> bytes: 

255 """ 

256 Returns the signature bytes. 

257 """ 

258 

259 @property 

260 @abc.abstractmethod 

261 def tbs_certificate_bytes(self) -> bytes: 

262 """ 

263 Returns the tbsCertificate payload bytes as defined in RFC 5280. 

264 """ 

265 

266 @property 

267 @abc.abstractmethod 

268 def tbs_precertificate_bytes(self) -> bytes: 

269 """ 

270 Returns the tbsCertificate payload bytes with the SCT list extension 

271 stripped. 

272 """ 

273 

274 @abc.abstractmethod 

275 def __eq__(self, other: object) -> bool: 

276 """ 

277 Checks equality. 

278 """ 

279 

280 @abc.abstractmethod 

281 def __hash__(self) -> int: 

282 """ 

283 Computes a hash. 

284 """ 

285 

286 @abc.abstractmethod 

287 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

288 """ 

289 Serializes the certificate to PEM or DER format. 

290 """ 

291 

292 @abc.abstractmethod 

293 def verify_directly_issued_by(self, issuer: Certificate) -> None: 

294 """ 

295 This method verifies that certificate issuer name matches the 

296 issuer subject name and that the certificate is signed by the 

297 issuer's private key. No other validation is performed. 

298 """ 

299 

300 

301# Runtime isinstance checks need this since the rust class is not a subclass. 

302Certificate.register(rust_x509.Certificate) 

303 

304 

305class RevokedCertificate(metaclass=abc.ABCMeta): 

306 @property 

307 @abc.abstractmethod 

308 def serial_number(self) -> int: 

309 """ 

310 Returns the serial number of the revoked certificate. 

311 """ 

312 

313 @property 

314 @abc.abstractmethod 

315 def revocation_date(self) -> datetime.datetime: 

316 """ 

317 Returns the date of when this certificate was revoked. 

318 """ 

319 

320 @property 

321 @abc.abstractmethod 

322 def extensions(self) -> Extensions: 

323 """ 

324 Returns an Extensions object containing a list of Revoked extensions. 

325 """ 

326 

327 

328# Runtime isinstance checks need this since the rust class is not a subclass. 

329RevokedCertificate.register(rust_x509.RevokedCertificate) 

330 

331 

332class _RawRevokedCertificate(RevokedCertificate): 

333 def __init__( 

334 self, 

335 serial_number: int, 

336 revocation_date: datetime.datetime, 

337 extensions: Extensions, 

338 ): 

339 self._serial_number = serial_number 

340 self._revocation_date = revocation_date 

341 self._extensions = extensions 

342 

343 @property 

344 def serial_number(self) -> int: 

345 return self._serial_number 

346 

347 @property 

348 def revocation_date(self) -> datetime.datetime: 

349 return self._revocation_date 

350 

351 @property 

352 def extensions(self) -> Extensions: 

353 return self._extensions 

354 

355 

356class CertificateRevocationList(metaclass=abc.ABCMeta): 

357 @abc.abstractmethod 

358 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

359 """ 

360 Serializes the CRL to PEM or DER format. 

361 """ 

362 

363 @abc.abstractmethod 

364 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: 

365 """ 

366 Returns bytes using digest passed. 

367 """ 

368 

369 @abc.abstractmethod 

370 def get_revoked_certificate_by_serial_number( 

371 self, serial_number: int 

372 ) -> typing.Optional[RevokedCertificate]: 

373 """ 

374 Returns an instance of RevokedCertificate or None if the serial_number 

375 is not in the CRL. 

376 """ 

377 

378 @property 

379 @abc.abstractmethod 

380 def signature_hash_algorithm( 

381 self, 

382 ) -> typing.Optional[hashes.HashAlgorithm]: 

383 """ 

384 Returns a HashAlgorithm corresponding to the type of the digest signed 

385 in the certificate. 

386 """ 

387 

388 @property 

389 @abc.abstractmethod 

390 def signature_algorithm_oid(self) -> ObjectIdentifier: 

391 """ 

392 Returns the ObjectIdentifier of the signature algorithm. 

393 """ 

394 

395 @property 

396 @abc.abstractmethod 

397 def issuer(self) -> Name: 

398 """ 

399 Returns the X509Name with the issuer of this CRL. 

400 """ 

401 

402 @property 

403 @abc.abstractmethod 

404 def next_update(self) -> typing.Optional[datetime.datetime]: 

405 """ 

406 Returns the date of next update for this CRL. 

407 """ 

408 

409 @property 

410 @abc.abstractmethod 

411 def last_update(self) -> datetime.datetime: 

412 """ 

413 Returns the date of last update for this CRL. 

414 """ 

415 

416 @property 

417 @abc.abstractmethod 

418 def extensions(self) -> Extensions: 

419 """ 

420 Returns an Extensions object containing a list of CRL extensions. 

421 """ 

422 

423 @property 

424 @abc.abstractmethod 

425 def signature(self) -> bytes: 

426 """ 

427 Returns the signature bytes. 

428 """ 

429 

430 @property 

431 @abc.abstractmethod 

432 def tbs_certlist_bytes(self) -> bytes: 

433 """ 

434 Returns the tbsCertList payload bytes as defined in RFC 5280. 

435 """ 

436 

437 @abc.abstractmethod 

438 def __eq__(self, other: object) -> bool: 

439 """ 

440 Checks equality. 

441 """ 

442 

443 @abc.abstractmethod 

444 def __len__(self) -> int: 

445 """ 

446 Number of revoked certificates in the CRL. 

447 """ 

448 

449 @typing.overload 

450 def __getitem__(self, idx: int) -> RevokedCertificate: 

451 ... 

452 

453 @typing.overload 

454 def __getitem__(self, idx: slice) -> typing.List[RevokedCertificate]: 

455 ... 

456 

457 @abc.abstractmethod 

458 def __getitem__( 

459 self, idx: typing.Union[int, slice] 

460 ) -> typing.Union[RevokedCertificate, typing.List[RevokedCertificate]]: 

461 """ 

462 Returns a revoked certificate (or slice of revoked certificates). 

463 """ 

464 

465 @abc.abstractmethod 

466 def __iter__(self) -> typing.Iterator[RevokedCertificate]: 

467 """ 

468 Iterator over the revoked certificates 

469 """ 

470 

471 @abc.abstractmethod 

472 def is_signature_valid( 

473 self, public_key: CertificateIssuerPublicKeyTypes 

474 ) -> bool: 

475 """ 

476 Verifies signature of revocation list against given public key. 

477 """ 

478 

479 

480CertificateRevocationList.register(rust_x509.CertificateRevocationList) 

481 

482 

483class CertificateSigningRequest(metaclass=abc.ABCMeta): 

484 @abc.abstractmethod 

485 def __eq__(self, other: object) -> bool: 

486 """ 

487 Checks equality. 

488 """ 

489 

490 @abc.abstractmethod 

491 def __hash__(self) -> int: 

492 """ 

493 Computes a hash. 

494 """ 

495 

496 @abc.abstractmethod 

497 def public_key(self) -> CertificatePublicKeyTypes: 

498 """ 

499 Returns the public key 

500 """ 

501 

502 @property 

503 @abc.abstractmethod 

504 def subject(self) -> Name: 

505 """ 

506 Returns the subject name object. 

507 """ 

508 

509 @property 

510 @abc.abstractmethod 

511 def signature_hash_algorithm( 

512 self, 

513 ) -> typing.Optional[hashes.HashAlgorithm]: 

514 """ 

515 Returns a HashAlgorithm corresponding to the type of the digest signed 

516 in the certificate. 

517 """ 

518 

519 @property 

520 @abc.abstractmethod 

521 def signature_algorithm_oid(self) -> ObjectIdentifier: 

522 """ 

523 Returns the ObjectIdentifier of the signature algorithm. 

524 """ 

525 

526 @property 

527 @abc.abstractmethod 

528 def extensions(self) -> Extensions: 

529 """ 

530 Returns the extensions in the signing request. 

531 """ 

532 

533 @property 

534 @abc.abstractmethod 

535 def attributes(self) -> Attributes: 

536 """ 

537 Returns an Attributes object. 

538 """ 

539 

540 @abc.abstractmethod 

541 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

542 """ 

543 Encodes the request to PEM or DER format. 

544 """ 

545 

546 @property 

547 @abc.abstractmethod 

548 def signature(self) -> bytes: 

549 """ 

550 Returns the signature bytes. 

551 """ 

552 

553 @property 

554 @abc.abstractmethod 

555 def tbs_certrequest_bytes(self) -> bytes: 

556 """ 

557 Returns the PKCS#10 CertificationRequestInfo bytes as defined in RFC 

558 2986. 

559 """ 

560 

561 @property 

562 @abc.abstractmethod 

563 def is_signature_valid(self) -> bool: 

564 """ 

565 Verifies signature of signing request. 

566 """ 

567 

568 @abc.abstractmethod 

569 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes: 

570 """ 

571 Get the attribute value for a given OID. 

572 """ 

573 

574 

575# Runtime isinstance checks need this since the rust class is not a subclass. 

576CertificateSigningRequest.register(rust_x509.CertificateSigningRequest) 

577 

578 

579# Backend argument preserved for API compatibility, but ignored. 

580def load_pem_x509_certificate( 

581 data: bytes, backend: typing.Any = None 

582) -> Certificate: 

583 return rust_x509.load_pem_x509_certificate(data) 

584 

585 

586def load_pem_x509_certificates(data: bytes) -> typing.List[Certificate]: 

587 return rust_x509.load_pem_x509_certificates(data) 

588 

589 

590# Backend argument preserved for API compatibility, but ignored. 

591def load_der_x509_certificate( 

592 data: bytes, backend: typing.Any = None 

593) -> Certificate: 

594 return rust_x509.load_der_x509_certificate(data) 

595 

596 

597# Backend argument preserved for API compatibility, but ignored. 

598def load_pem_x509_csr( 

599 data: bytes, backend: typing.Any = None 

600) -> CertificateSigningRequest: 

601 return rust_x509.load_pem_x509_csr(data) 

602 

603 

604# Backend argument preserved for API compatibility, but ignored. 

605def load_der_x509_csr( 

606 data: bytes, backend: typing.Any = None 

607) -> CertificateSigningRequest: 

608 return rust_x509.load_der_x509_csr(data) 

609 

610 

611# Backend argument preserved for API compatibility, but ignored. 

612def load_pem_x509_crl( 

613 data: bytes, backend: typing.Any = None 

614) -> CertificateRevocationList: 

615 return rust_x509.load_pem_x509_crl(data) 

616 

617 

618# Backend argument preserved for API compatibility, but ignored. 

619def load_der_x509_crl( 

620 data: bytes, backend: typing.Any = None 

621) -> CertificateRevocationList: 

622 return rust_x509.load_der_x509_crl(data) 

623 

624 

625class CertificateSigningRequestBuilder: 

626 def __init__( 

627 self, 

628 subject_name: typing.Optional[Name] = None, 

629 extensions: typing.List[Extension[ExtensionType]] = [], 

630 attributes: typing.List[ 

631 typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]] 

632 ] = [], 

633 ): 

634 """ 

635 Creates an empty X.509 certificate request (v1). 

636 """ 

637 self._subject_name = subject_name 

638 self._extensions = extensions 

639 self._attributes = attributes 

640 

641 def subject_name(self, name: Name) -> CertificateSigningRequestBuilder: 

642 """ 

643 Sets the certificate requestor's distinguished name. 

644 """ 

645 if not isinstance(name, Name): 

646 raise TypeError("Expecting x509.Name object.") 

647 if self._subject_name is not None: 

648 raise ValueError("The subject name may only be set once.") 

649 return CertificateSigningRequestBuilder( 

650 name, self._extensions, self._attributes 

651 ) 

652 

653 def add_extension( 

654 self, extval: ExtensionType, critical: bool 

655 ) -> CertificateSigningRequestBuilder: 

656 """ 

657 Adds an X.509 extension to the certificate request. 

658 """ 

659 if not isinstance(extval, ExtensionType): 

660 raise TypeError("extension must be an ExtensionType") 

661 

662 extension = Extension(extval.oid, critical, extval) 

663 _reject_duplicate_extension(extension, self._extensions) 

664 

665 return CertificateSigningRequestBuilder( 

666 self._subject_name, 

667 self._extensions + [extension], 

668 self._attributes, 

669 ) 

670 

671 def add_attribute( 

672 self, 

673 oid: ObjectIdentifier, 

674 value: bytes, 

675 *, 

676 _tag: typing.Optional[_ASN1Type] = None, 

677 ) -> CertificateSigningRequestBuilder: 

678 """ 

679 Adds an X.509 attribute with an OID and associated value. 

680 """ 

681 if not isinstance(oid, ObjectIdentifier): 

682 raise TypeError("oid must be an ObjectIdentifier") 

683 

684 if not isinstance(value, bytes): 

685 raise TypeError("value must be bytes") 

686 

687 if _tag is not None and not isinstance(_tag, _ASN1Type): 

688 raise TypeError("tag must be _ASN1Type") 

689 

690 _reject_duplicate_attribute(oid, self._attributes) 

691 

692 if _tag is not None: 

693 tag = _tag.value 

694 else: 

695 tag = None 

696 

697 return CertificateSigningRequestBuilder( 

698 self._subject_name, 

699 self._extensions, 

700 self._attributes + [(oid, value, tag)], 

701 ) 

702 

703 def sign( 

704 self, 

705 private_key: CertificateIssuerPrivateKeyTypes, 

706 algorithm: typing.Optional[_AllowedHashTypes], 

707 backend: typing.Any = None, 

708 ) -> CertificateSigningRequest: 

709 """ 

710 Signs the request using the requestor's private key. 

711 """ 

712 if self._subject_name is None: 

713 raise ValueError("A CertificateSigningRequest must have a subject") 

714 return rust_x509.create_x509_csr(self, private_key, algorithm) 

715 

716 

717class CertificateBuilder: 

718 _extensions: typing.List[Extension[ExtensionType]] 

719 

720 def __init__( 

721 self, 

722 issuer_name: typing.Optional[Name] = None, 

723 subject_name: typing.Optional[Name] = None, 

724 public_key: typing.Optional[CertificatePublicKeyTypes] = None, 

725 serial_number: typing.Optional[int] = None, 

726 not_valid_before: typing.Optional[datetime.datetime] = None, 

727 not_valid_after: typing.Optional[datetime.datetime] = None, 

728 extensions: typing.List[Extension[ExtensionType]] = [], 

729 ) -> None: 

730 self._version = Version.v3 

731 self._issuer_name = issuer_name 

732 self._subject_name = subject_name 

733 self._public_key = public_key 

734 self._serial_number = serial_number 

735 self._not_valid_before = not_valid_before 

736 self._not_valid_after = not_valid_after 

737 self._extensions = extensions 

738 

739 def issuer_name(self, name: Name) -> CertificateBuilder: 

740 """ 

741 Sets the CA's distinguished name. 

742 """ 

743 if not isinstance(name, Name): 

744 raise TypeError("Expecting x509.Name object.") 

745 if self._issuer_name is not None: 

746 raise ValueError("The issuer name may only be set once.") 

747 return CertificateBuilder( 

748 name, 

749 self._subject_name, 

750 self._public_key, 

751 self._serial_number, 

752 self._not_valid_before, 

753 self._not_valid_after, 

754 self._extensions, 

755 ) 

756 

757 def subject_name(self, name: Name) -> CertificateBuilder: 

758 """ 

759 Sets the requestor's distinguished name. 

760 """ 

761 if not isinstance(name, Name): 

762 raise TypeError("Expecting x509.Name object.") 

763 if self._subject_name is not None: 

764 raise ValueError("The subject name may only be set once.") 

765 return CertificateBuilder( 

766 self._issuer_name, 

767 name, 

768 self._public_key, 

769 self._serial_number, 

770 self._not_valid_before, 

771 self._not_valid_after, 

772 self._extensions, 

773 ) 

774 

775 def public_key( 

776 self, 

777 key: CertificatePublicKeyTypes, 

778 ) -> CertificateBuilder: 

779 """ 

780 Sets the requestor's public key (as found in the signing request). 

781 """ 

782 if not isinstance( 

783 key, 

784 ( 

785 dsa.DSAPublicKey, 

786 rsa.RSAPublicKey, 

787 ec.EllipticCurvePublicKey, 

788 ed25519.Ed25519PublicKey, 

789 ed448.Ed448PublicKey, 

790 x25519.X25519PublicKey, 

791 x448.X448PublicKey, 

792 ), 

793 ): 

794 raise TypeError( 

795 "Expecting one of DSAPublicKey, RSAPublicKey," 

796 " EllipticCurvePublicKey, Ed25519PublicKey," 

797 " Ed448PublicKey, X25519PublicKey, or " 

798 "X448PublicKey." 

799 ) 

800 if self._public_key is not None: 

801 raise ValueError("The public key may only be set once.") 

802 return CertificateBuilder( 

803 self._issuer_name, 

804 self._subject_name, 

805 key, 

806 self._serial_number, 

807 self._not_valid_before, 

808 self._not_valid_after, 

809 self._extensions, 

810 ) 

811 

812 def serial_number(self, number: int) -> CertificateBuilder: 

813 """ 

814 Sets the certificate serial number. 

815 """ 

816 if not isinstance(number, int): 

817 raise TypeError("Serial number must be of integral type.") 

818 if self._serial_number is not None: 

819 raise ValueError("The serial number may only be set once.") 

820 if number <= 0: 

821 raise ValueError("The serial number should be positive.") 

822 

823 # ASN.1 integers are always signed, so most significant bit must be 

824 # zero. 

825 if number.bit_length() >= 160: # As defined in RFC 5280 

826 raise ValueError( 

827 "The serial number should not be more than 159 " "bits." 

828 ) 

829 return CertificateBuilder( 

830 self._issuer_name, 

831 self._subject_name, 

832 self._public_key, 

833 number, 

834 self._not_valid_before, 

835 self._not_valid_after, 

836 self._extensions, 

837 ) 

838 

839 def not_valid_before(self, time: datetime.datetime) -> CertificateBuilder: 

840 """ 

841 Sets the certificate activation time. 

842 """ 

843 if not isinstance(time, datetime.datetime): 

844 raise TypeError("Expecting datetime object.") 

845 if self._not_valid_before is not None: 

846 raise ValueError("The not valid before may only be set once.") 

847 time = _convert_to_naive_utc_time(time) 

848 if time < _EARLIEST_UTC_TIME: 

849 raise ValueError( 

850 "The not valid before date must be on or after" 

851 " 1950 January 1)." 

852 ) 

853 if self._not_valid_after is not None and time > self._not_valid_after: 

854 raise ValueError( 

855 "The not valid before date must be before the not valid after " 

856 "date." 

857 ) 

858 return CertificateBuilder( 

859 self._issuer_name, 

860 self._subject_name, 

861 self._public_key, 

862 self._serial_number, 

863 time, 

864 self._not_valid_after, 

865 self._extensions, 

866 ) 

867 

868 def not_valid_after(self, time: datetime.datetime) -> CertificateBuilder: 

869 """ 

870 Sets the certificate expiration time. 

871 """ 

872 if not isinstance(time, datetime.datetime): 

873 raise TypeError("Expecting datetime object.") 

874 if self._not_valid_after is not None: 

875 raise ValueError("The not valid after may only be set once.") 

876 time = _convert_to_naive_utc_time(time) 

877 if time < _EARLIEST_UTC_TIME: 

878 raise ValueError( 

879 "The not valid after date must be on or after" 

880 " 1950 January 1." 

881 ) 

882 if ( 

883 self._not_valid_before is not None 

884 and time < self._not_valid_before 

885 ): 

886 raise ValueError( 

887 "The not valid after date must be after the not valid before " 

888 "date." 

889 ) 

890 return CertificateBuilder( 

891 self._issuer_name, 

892 self._subject_name, 

893 self._public_key, 

894 self._serial_number, 

895 self._not_valid_before, 

896 time, 

897 self._extensions, 

898 ) 

899 

900 def add_extension( 

901 self, extval: ExtensionType, critical: bool 

902 ) -> CertificateBuilder: 

903 """ 

904 Adds an X.509 extension to the certificate. 

905 """ 

906 if not isinstance(extval, ExtensionType): 

907 raise TypeError("extension must be an ExtensionType") 

908 

909 extension = Extension(extval.oid, critical, extval) 

910 _reject_duplicate_extension(extension, self._extensions) 

911 

912 return CertificateBuilder( 

913 self._issuer_name, 

914 self._subject_name, 

915 self._public_key, 

916 self._serial_number, 

917 self._not_valid_before, 

918 self._not_valid_after, 

919 self._extensions + [extension], 

920 ) 

921 

922 def sign( 

923 self, 

924 private_key: CertificateIssuerPrivateKeyTypes, 

925 algorithm: typing.Optional[_AllowedHashTypes], 

926 backend: typing.Any = None, 

927 *, 

928 rsa_padding: typing.Optional[ 

929 typing.Union[padding.PSS, padding.PKCS1v15] 

930 ] = None, 

931 ) -> Certificate: 

932 """ 

933 Signs the certificate using the CA's private key. 

934 """ 

935 if self._subject_name is None: 

936 raise ValueError("A certificate must have a subject name") 

937 

938 if self._issuer_name is None: 

939 raise ValueError("A certificate must have an issuer name") 

940 

941 if self._serial_number is None: 

942 raise ValueError("A certificate must have a serial number") 

943 

944 if self._not_valid_before is None: 

945 raise ValueError("A certificate must have a not valid before time") 

946 

947 if self._not_valid_after is None: 

948 raise ValueError("A certificate must have a not valid after time") 

949 

950 if self._public_key is None: 

951 raise ValueError("A certificate must have a public key") 

952 

953 if rsa_padding is not None: 

954 if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)): 

955 raise TypeError("Padding must be PSS or PKCS1v15") 

956 if not isinstance(private_key, rsa.RSAPrivateKey): 

957 raise TypeError("Padding is only supported for RSA keys") 

958 

959 return rust_x509.create_x509_certificate( 

960 self, private_key, algorithm, rsa_padding 

961 ) 

962 

963 

964class CertificateRevocationListBuilder: 

965 _extensions: typing.List[Extension[ExtensionType]] 

966 _revoked_certificates: typing.List[RevokedCertificate] 

967 

968 def __init__( 

969 self, 

970 issuer_name: typing.Optional[Name] = None, 

971 last_update: typing.Optional[datetime.datetime] = None, 

972 next_update: typing.Optional[datetime.datetime] = None, 

973 extensions: typing.List[Extension[ExtensionType]] = [], 

974 revoked_certificates: typing.List[RevokedCertificate] = [], 

975 ): 

976 self._issuer_name = issuer_name 

977 self._last_update = last_update 

978 self._next_update = next_update 

979 self._extensions = extensions 

980 self._revoked_certificates = revoked_certificates 

981 

982 def issuer_name( 

983 self, issuer_name: Name 

984 ) -> CertificateRevocationListBuilder: 

985 if not isinstance(issuer_name, Name): 

986 raise TypeError("Expecting x509.Name object.") 

987 if self._issuer_name is not None: 

988 raise ValueError("The issuer name may only be set once.") 

989 return CertificateRevocationListBuilder( 

990 issuer_name, 

991 self._last_update, 

992 self._next_update, 

993 self._extensions, 

994 self._revoked_certificates, 

995 ) 

996 

997 def last_update( 

998 self, last_update: datetime.datetime 

999 ) -> CertificateRevocationListBuilder: 

1000 if not isinstance(last_update, datetime.datetime): 

1001 raise TypeError("Expecting datetime object.") 

1002 if self._last_update is not None: 

1003 raise ValueError("Last update may only be set once.") 

1004 last_update = _convert_to_naive_utc_time(last_update) 

1005 if last_update < _EARLIEST_UTC_TIME: 

1006 raise ValueError( 

1007 "The last update date must be on or after" " 1950 January 1." 

1008 ) 

1009 if self._next_update is not None and last_update > self._next_update: 

1010 raise ValueError( 

1011 "The last update date must be before the next update date." 

1012 ) 

1013 return CertificateRevocationListBuilder( 

1014 self._issuer_name, 

1015 last_update, 

1016 self._next_update, 

1017 self._extensions, 

1018 self._revoked_certificates, 

1019 ) 

1020 

1021 def next_update( 

1022 self, next_update: datetime.datetime 

1023 ) -> CertificateRevocationListBuilder: 

1024 if not isinstance(next_update, datetime.datetime): 

1025 raise TypeError("Expecting datetime object.") 

1026 if self._next_update is not None: 

1027 raise ValueError("Last update may only be set once.") 

1028 next_update = _convert_to_naive_utc_time(next_update) 

1029 if next_update < _EARLIEST_UTC_TIME: 

1030 raise ValueError( 

1031 "The last update date must be on or after" " 1950 January 1." 

1032 ) 

1033 if self._last_update is not None and next_update < self._last_update: 

1034 raise ValueError( 

1035 "The next update date must be after the last update date." 

1036 ) 

1037 return CertificateRevocationListBuilder( 

1038 self._issuer_name, 

1039 self._last_update, 

1040 next_update, 

1041 self._extensions, 

1042 self._revoked_certificates, 

1043 ) 

1044 

1045 def add_extension( 

1046 self, extval: ExtensionType, critical: bool 

1047 ) -> CertificateRevocationListBuilder: 

1048 """ 

1049 Adds an X.509 extension to the certificate revocation list. 

1050 """ 

1051 if not isinstance(extval, ExtensionType): 

1052 raise TypeError("extension must be an ExtensionType") 

1053 

1054 extension = Extension(extval.oid, critical, extval) 

1055 _reject_duplicate_extension(extension, self._extensions) 

1056 return CertificateRevocationListBuilder( 

1057 self._issuer_name, 

1058 self._last_update, 

1059 self._next_update, 

1060 self._extensions + [extension], 

1061 self._revoked_certificates, 

1062 ) 

1063 

1064 def add_revoked_certificate( 

1065 self, revoked_certificate: RevokedCertificate 

1066 ) -> CertificateRevocationListBuilder: 

1067 """ 

1068 Adds a revoked certificate to the CRL. 

1069 """ 

1070 if not isinstance(revoked_certificate, RevokedCertificate): 

1071 raise TypeError("Must be an instance of RevokedCertificate") 

1072 

1073 return CertificateRevocationListBuilder( 

1074 self._issuer_name, 

1075 self._last_update, 

1076 self._next_update, 

1077 self._extensions, 

1078 self._revoked_certificates + [revoked_certificate], 

1079 ) 

1080 

1081 def sign( 

1082 self, 

1083 private_key: CertificateIssuerPrivateKeyTypes, 

1084 algorithm: typing.Optional[_AllowedHashTypes], 

1085 backend: typing.Any = None, 

1086 ) -> CertificateRevocationList: 

1087 if self._issuer_name is None: 

1088 raise ValueError("A CRL must have an issuer name") 

1089 

1090 if self._last_update is None: 

1091 raise ValueError("A CRL must have a last update time") 

1092 

1093 if self._next_update is None: 

1094 raise ValueError("A CRL must have a next update time") 

1095 

1096 return rust_x509.create_x509_crl(self, private_key, algorithm) 

1097 

1098 

1099class RevokedCertificateBuilder: 

1100 def __init__( 

1101 self, 

1102 serial_number: typing.Optional[int] = None, 

1103 revocation_date: typing.Optional[datetime.datetime] = None, 

1104 extensions: typing.List[Extension[ExtensionType]] = [], 

1105 ): 

1106 self._serial_number = serial_number 

1107 self._revocation_date = revocation_date 

1108 self._extensions = extensions 

1109 

1110 def serial_number(self, number: int) -> RevokedCertificateBuilder: 

1111 if not isinstance(number, int): 

1112 raise TypeError("Serial number must be of integral type.") 

1113 if self._serial_number is not None: 

1114 raise ValueError("The serial number may only be set once.") 

1115 if number <= 0: 

1116 raise ValueError("The serial number should be positive") 

1117 

1118 # ASN.1 integers are always signed, so most significant bit must be 

1119 # zero. 

1120 if number.bit_length() >= 160: # As defined in RFC 5280 

1121 raise ValueError( 

1122 "The serial number should not be more than 159 " "bits." 

1123 ) 

1124 return RevokedCertificateBuilder( 

1125 number, self._revocation_date, self._extensions 

1126 ) 

1127 

1128 def revocation_date( 

1129 self, time: datetime.datetime 

1130 ) -> RevokedCertificateBuilder: 

1131 if not isinstance(time, datetime.datetime): 

1132 raise TypeError("Expecting datetime object.") 

1133 if self._revocation_date is not None: 

1134 raise ValueError("The revocation date may only be set once.") 

1135 time = _convert_to_naive_utc_time(time) 

1136 if time < _EARLIEST_UTC_TIME: 

1137 raise ValueError( 

1138 "The revocation date must be on or after" " 1950 January 1." 

1139 ) 

1140 return RevokedCertificateBuilder( 

1141 self._serial_number, time, self._extensions 

1142 ) 

1143 

1144 def add_extension( 

1145 self, extval: ExtensionType, critical: bool 

1146 ) -> RevokedCertificateBuilder: 

1147 if not isinstance(extval, ExtensionType): 

1148 raise TypeError("extension must be an ExtensionType") 

1149 

1150 extension = Extension(extval.oid, critical, extval) 

1151 _reject_duplicate_extension(extension, self._extensions) 

1152 return RevokedCertificateBuilder( 

1153 self._serial_number, 

1154 self._revocation_date, 

1155 self._extensions + [extension], 

1156 ) 

1157 

1158 def build(self, backend: typing.Any = None) -> RevokedCertificate: 

1159 if self._serial_number is None: 

1160 raise ValueError("A revoked certificate must have a serial number") 

1161 if self._revocation_date is None: 

1162 raise ValueError( 

1163 "A revoked certificate must have a revocation date" 

1164 ) 

1165 return _RawRevokedCertificate( 

1166 self._serial_number, 

1167 self._revocation_date, 

1168 Extensions(self._extensions), 

1169 ) 

1170 

1171 

1172def random_serial_number() -> int: 

1173 return int.from_bytes(os.urandom(20), "big") >> 1