Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/x509/base.py: 51%

449 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5 

6import abc 

7import datetime 

8import os 

9import typing 

10 

11from cryptography import utils 

12from cryptography.hazmat.bindings._rust import x509 as rust_x509 

13from cryptography.hazmat.primitives import hashes, serialization 

14from cryptography.hazmat.primitives.asymmetric import ( 

15 dsa, 

16 ec, 

17 ed448, 

18 ed25519, 

19 rsa, 

20 x448, 

21 x25519, 

22) 

23from cryptography.hazmat.primitives.asymmetric.types import ( 

24 CertificateIssuerPrivateKeyTypes, 

25 CertificateIssuerPublicKeyTypes, 

26 CertificatePublicKeyTypes, 

27) 

28from cryptography.x509.extensions import ( 

29 Extension, 

30 Extensions, 

31 ExtensionType, 

32 _make_sequence_methods, 

33) 

34from cryptography.x509.name import Name, _ASN1Type 

35from cryptography.x509.oid import ObjectIdentifier 

36 

37_EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1) 

38 

39# This must be kept in sync with sign.rs's list of allowable types in 

40# identify_hash_type 

41_AllowedHashTypes = typing.Union[ 

42 hashes.SHA224, 

43 hashes.SHA256, 

44 hashes.SHA384, 

45 hashes.SHA512, 

46 hashes.SHA3_224, 

47 hashes.SHA3_256, 

48 hashes.SHA3_384, 

49 hashes.SHA3_512, 

50] 

51 

52 

53class AttributeNotFound(Exception): 

54 def __init__(self, msg: str, oid: ObjectIdentifier) -> None: 

55 super().__init__(msg) 

56 self.oid = oid 

57 

58 

59def _reject_duplicate_extension( 

60 extension: Extension[ExtensionType], 

61 extensions: typing.List[Extension[ExtensionType]], 

62) -> None: 

63 # This is quadratic in the number of extensions 

64 for e in extensions: 

65 if e.oid == extension.oid: 

66 raise ValueError("This extension has already been set.") 

67 

68 

69def _reject_duplicate_attribute( 

70 oid: ObjectIdentifier, 

71 attributes: typing.List[ 

72 typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]] 

73 ], 

74) -> None: 

75 # This is quadratic in the number of attributes 

76 for attr_oid, _, _ in attributes: 

77 if attr_oid == oid: 

78 raise ValueError("This attribute has already been set.") 

79 

80 

81def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime: 

82 """Normalizes a datetime to a naive datetime in UTC. 

83 

84 time -- datetime to normalize. Assumed to be in UTC if not timezone 

85 aware. 

86 """ 

87 if time.tzinfo is not None: 

88 offset = time.utcoffset() 

89 offset = offset if offset else datetime.timedelta() 

90 return time.replace(tzinfo=None) - offset 

91 else: 

92 return time 

93 

94 

95class Attribute: 

96 def __init__( 

97 self, 

98 oid: ObjectIdentifier, 

99 value: bytes, 

100 _type: int = _ASN1Type.UTF8String.value, 

101 ) -> None: 

102 self._oid = oid 

103 self._value = value 

104 self._type = _type 

105 

106 @property 

107 def oid(self) -> ObjectIdentifier: 

108 return self._oid 

109 

110 @property 

111 def value(self) -> bytes: 

112 return self._value 

113 

114 def __repr__(self) -> str: 

115 return f"<Attribute(oid={self.oid}, value={self.value!r})>" 

116 

117 def __eq__(self, other: object) -> bool: 

118 if not isinstance(other, Attribute): 

119 return NotImplemented 

120 

121 return ( 

122 self.oid == other.oid 

123 and self.value == other.value 

124 and self._type == other._type 

125 ) 

126 

127 def __hash__(self) -> int: 

128 return hash((self.oid, self.value, self._type)) 

129 

130 

131class Attributes: 

132 def __init__( 

133 self, 

134 attributes: typing.Iterable[Attribute], 

135 ) -> None: 

136 self._attributes = list(attributes) 

137 

138 __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes") 

139 

140 def __repr__(self) -> str: 

141 return f"<Attributes({self._attributes})>" 

142 

143 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute: 

144 for attr in self: 

145 if attr.oid == oid: 

146 return attr 

147 

148 raise AttributeNotFound(f"No {oid} attribute was found", oid) 

149 

150 

151class Version(utils.Enum): 

152 v1 = 0 

153 v3 = 2 

154 

155 

156class InvalidVersion(Exception): 

157 def __init__(self, msg: str, parsed_version: int) -> None: 

158 super().__init__(msg) 

159 self.parsed_version = parsed_version 

160 

161 

162class Certificate(metaclass=abc.ABCMeta): 

163 @abc.abstractmethod 

164 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: 

165 """ 

166 Returns bytes using digest passed. 

167 """ 

168 

169 @property 

170 @abc.abstractmethod 

171 def serial_number(self) -> int: 

172 """ 

173 Returns certificate serial number 

174 """ 

175 

176 @property 

177 @abc.abstractmethod 

178 def version(self) -> Version: 

179 """ 

180 Returns the certificate version 

181 """ 

182 

183 @abc.abstractmethod 

184 def public_key(self) -> CertificatePublicKeyTypes: 

185 """ 

186 Returns the public key 

187 """ 

188 

189 @property 

190 @abc.abstractmethod 

191 def not_valid_before(self) -> datetime.datetime: 

192 """ 

193 Not before time (represented as UTC datetime) 

194 """ 

195 

196 @property 

197 @abc.abstractmethod 

198 def not_valid_after(self) -> datetime.datetime: 

199 """ 

200 Not after time (represented as UTC datetime) 

201 """ 

202 

203 @property 

204 @abc.abstractmethod 

205 def issuer(self) -> Name: 

206 """ 

207 Returns the issuer name object. 

208 """ 

209 

210 @property 

211 @abc.abstractmethod 

212 def subject(self) -> Name: 

213 """ 

214 Returns the subject name object. 

215 """ 

216 

217 @property 

218 @abc.abstractmethod 

219 def signature_hash_algorithm( 

220 self, 

221 ) -> typing.Optional[hashes.HashAlgorithm]: 

222 """ 

223 Returns a HashAlgorithm corresponding to the type of the digest signed 

224 in the certificate. 

225 """ 

226 

227 @property 

228 @abc.abstractmethod 

229 def signature_algorithm_oid(self) -> ObjectIdentifier: 

230 """ 

231 Returns the ObjectIdentifier of the signature algorithm. 

232 """ 

233 

234 @property 

235 @abc.abstractmethod 

236 def extensions(self) -> Extensions: 

237 """ 

238 Returns an Extensions object. 

239 """ 

240 

241 @property 

242 @abc.abstractmethod 

243 def signature(self) -> bytes: 

244 """ 

245 Returns the signature bytes. 

246 """ 

247 

248 @property 

249 @abc.abstractmethod 

250 def tbs_certificate_bytes(self) -> bytes: 

251 """ 

252 Returns the tbsCertificate payload bytes as defined in RFC 5280. 

253 """ 

254 

255 @property 

256 @abc.abstractmethod 

257 def tbs_precertificate_bytes(self) -> bytes: 

258 """ 

259 Returns the tbsCertificate payload bytes with the SCT list extension 

260 stripped. 

261 """ 

262 

263 @abc.abstractmethod 

264 def __eq__(self, other: object) -> bool: 

265 """ 

266 Checks equality. 

267 """ 

268 

269 @abc.abstractmethod 

270 def __hash__(self) -> int: 

271 """ 

272 Computes a hash. 

273 """ 

274 

275 @abc.abstractmethod 

276 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

277 """ 

278 Serializes the certificate to PEM or DER format. 

279 """ 

280 

281 @abc.abstractmethod 

282 def verify_directly_issued_by(self, issuer: "Certificate") -> None: 

283 """ 

284 This method verifies that certificate issuer name matches the 

285 issuer subject name and that the certificate is signed by the 

286 issuer's private key. No other validation is performed. 

287 """ 

288 

289 

290# Runtime isinstance checks need this since the rust class is not a subclass. 

291Certificate.register(rust_x509.Certificate) 

292 

293 

294class RevokedCertificate(metaclass=abc.ABCMeta): 

295 @property 

296 @abc.abstractmethod 

297 def serial_number(self) -> int: 

298 """ 

299 Returns the serial number of the revoked certificate. 

300 """ 

301 

302 @property 

303 @abc.abstractmethod 

304 def revocation_date(self) -> datetime.datetime: 

305 """ 

306 Returns the date of when this certificate was revoked. 

307 """ 

308 

309 @property 

310 @abc.abstractmethod 

311 def extensions(self) -> Extensions: 

312 """ 

313 Returns an Extensions object containing a list of Revoked extensions. 

314 """ 

315 

316 

317# Runtime isinstance checks need this since the rust class is not a subclass. 

318RevokedCertificate.register(rust_x509.RevokedCertificate) 

319 

320 

321class _RawRevokedCertificate(RevokedCertificate): 

322 def __init__( 

323 self, 

324 serial_number: int, 

325 revocation_date: datetime.datetime, 

326 extensions: Extensions, 

327 ): 

328 self._serial_number = serial_number 

329 self._revocation_date = revocation_date 

330 self._extensions = extensions 

331 

332 @property 

333 def serial_number(self) -> int: 

334 return self._serial_number 

335 

336 @property 

337 def revocation_date(self) -> datetime.datetime: 

338 return self._revocation_date 

339 

340 @property 

341 def extensions(self) -> Extensions: 

342 return self._extensions 

343 

344 

345class CertificateRevocationList(metaclass=abc.ABCMeta): 

346 @abc.abstractmethod 

347 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

348 """ 

349 Serializes the CRL to PEM or DER format. 

350 """ 

351 

352 @abc.abstractmethod 

353 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: 

354 """ 

355 Returns bytes using digest passed. 

356 """ 

357 

358 @abc.abstractmethod 

359 def get_revoked_certificate_by_serial_number( 

360 self, serial_number: int 

361 ) -> typing.Optional[RevokedCertificate]: 

362 """ 

363 Returns an instance of RevokedCertificate or None if the serial_number 

364 is not in the CRL. 

365 """ 

366 

367 @property 

368 @abc.abstractmethod 

369 def signature_hash_algorithm( 

370 self, 

371 ) -> typing.Optional[hashes.HashAlgorithm]: 

372 """ 

373 Returns a HashAlgorithm corresponding to the type of the digest signed 

374 in the certificate. 

375 """ 

376 

377 @property 

378 @abc.abstractmethod 

379 def signature_algorithm_oid(self) -> ObjectIdentifier: 

380 """ 

381 Returns the ObjectIdentifier of the signature algorithm. 

382 """ 

383 

384 @property 

385 @abc.abstractmethod 

386 def issuer(self) -> Name: 

387 """ 

388 Returns the X509Name with the issuer of this CRL. 

389 """ 

390 

391 @property 

392 @abc.abstractmethod 

393 def next_update(self) -> typing.Optional[datetime.datetime]: 

394 """ 

395 Returns the date of next update for this CRL. 

396 """ 

397 

398 @property 

399 @abc.abstractmethod 

400 def last_update(self) -> datetime.datetime: 

401 """ 

402 Returns the date of last update for this CRL. 

403 """ 

404 

405 @property 

406 @abc.abstractmethod 

407 def extensions(self) -> Extensions: 

408 """ 

409 Returns an Extensions object containing a list of CRL extensions. 

410 """ 

411 

412 @property 

413 @abc.abstractmethod 

414 def signature(self) -> bytes: 

415 """ 

416 Returns the signature bytes. 

417 """ 

418 

419 @property 

420 @abc.abstractmethod 

421 def tbs_certlist_bytes(self) -> bytes: 

422 """ 

423 Returns the tbsCertList payload bytes as defined in RFC 5280. 

424 """ 

425 

426 @abc.abstractmethod 

427 def __eq__(self, other: object) -> bool: 

428 """ 

429 Checks equality. 

430 """ 

431 

432 @abc.abstractmethod 

433 def __len__(self) -> int: 

434 """ 

435 Number of revoked certificates in the CRL. 

436 """ 

437 

438 @typing.overload 

439 def __getitem__(self, idx: int) -> RevokedCertificate: 

440 ... 

441 

442 @typing.overload 

443 def __getitem__(self, idx: slice) -> typing.List[RevokedCertificate]: 

444 ... 

445 

446 @abc.abstractmethod 

447 def __getitem__( 

448 self, idx: typing.Union[int, slice] 

449 ) -> typing.Union[RevokedCertificate, typing.List[RevokedCertificate]]: 

450 """ 

451 Returns a revoked certificate (or slice of revoked certificates). 

452 """ 

453 

454 @abc.abstractmethod 

455 def __iter__(self) -> typing.Iterator[RevokedCertificate]: 

456 """ 

457 Iterator over the revoked certificates 

458 """ 

459 

460 @abc.abstractmethod 

461 def is_signature_valid( 

462 self, public_key: CertificateIssuerPublicKeyTypes 

463 ) -> bool: 

464 """ 

465 Verifies signature of revocation list against given public key. 

466 """ 

467 

468 

469CertificateRevocationList.register(rust_x509.CertificateRevocationList) 

470 

471 

472class CertificateSigningRequest(metaclass=abc.ABCMeta): 

473 @abc.abstractmethod 

474 def __eq__(self, other: object) -> bool: 

475 """ 

476 Checks equality. 

477 """ 

478 

479 @abc.abstractmethod 

480 def __hash__(self) -> int: 

481 """ 

482 Computes a hash. 

483 """ 

484 

485 @abc.abstractmethod 

486 def public_key(self) -> CertificatePublicKeyTypes: 

487 """ 

488 Returns the public key 

489 """ 

490 

491 @property 

492 @abc.abstractmethod 

493 def subject(self) -> Name: 

494 """ 

495 Returns the subject name object. 

496 """ 

497 

498 @property 

499 @abc.abstractmethod 

500 def signature_hash_algorithm( 

501 self, 

502 ) -> typing.Optional[hashes.HashAlgorithm]: 

503 """ 

504 Returns a HashAlgorithm corresponding to the type of the digest signed 

505 in the certificate. 

506 """ 

507 

508 @property 

509 @abc.abstractmethod 

510 def signature_algorithm_oid(self) -> ObjectIdentifier: 

511 """ 

512 Returns the ObjectIdentifier of the signature algorithm. 

513 """ 

514 

515 @property 

516 @abc.abstractmethod 

517 def extensions(self) -> Extensions: 

518 """ 

519 Returns the extensions in the signing request. 

520 """ 

521 

522 @property 

523 @abc.abstractmethod 

524 def attributes(self) -> Attributes: 

525 """ 

526 Returns an Attributes object. 

527 """ 

528 

529 @abc.abstractmethod 

530 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

531 """ 

532 Encodes the request to PEM or DER format. 

533 """ 

534 

535 @property 

536 @abc.abstractmethod 

537 def signature(self) -> bytes: 

538 """ 

539 Returns the signature bytes. 

540 """ 

541 

542 @property 

543 @abc.abstractmethod 

544 def tbs_certrequest_bytes(self) -> bytes: 

545 """ 

546 Returns the PKCS#10 CertificationRequestInfo bytes as defined in RFC 

547 2986. 

548 """ 

549 

550 @property 

551 @abc.abstractmethod 

552 def is_signature_valid(self) -> bool: 

553 """ 

554 Verifies signature of signing request. 

555 """ 

556 

557 @abc.abstractmethod 

558 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes: 

559 """ 

560 Get the attribute value for a given OID. 

561 """ 

562 

563 

564# Runtime isinstance checks need this since the rust class is not a subclass. 

565CertificateSigningRequest.register(rust_x509.CertificateSigningRequest) 

566 

567 

568# Backend argument preserved for API compatibility, but ignored. 

569def load_pem_x509_certificate( 

570 data: bytes, backend: typing.Any = None 

571) -> Certificate: 

572 return rust_x509.load_pem_x509_certificate(data) 

573 

574 

575def load_pem_x509_certificates(data: bytes) -> typing.List[Certificate]: 

576 return rust_x509.load_pem_x509_certificates(data) 

577 

578 

579# Backend argument preserved for API compatibility, but ignored. 

580def load_der_x509_certificate( 

581 data: bytes, backend: typing.Any = None 

582) -> Certificate: 

583 return rust_x509.load_der_x509_certificate(data) 

584 

585 

586# Backend argument preserved for API compatibility, but ignored. 

587def load_pem_x509_csr( 

588 data: bytes, backend: typing.Any = None 

589) -> CertificateSigningRequest: 

590 return rust_x509.load_pem_x509_csr(data) 

591 

592 

593# Backend argument preserved for API compatibility, but ignored. 

594def load_der_x509_csr( 

595 data: bytes, backend: typing.Any = None 

596) -> CertificateSigningRequest: 

597 return rust_x509.load_der_x509_csr(data) 

598 

599 

600# Backend argument preserved for API compatibility, but ignored. 

601def load_pem_x509_crl( 

602 data: bytes, backend: typing.Any = None 

603) -> CertificateRevocationList: 

604 return rust_x509.load_pem_x509_crl(data) 

605 

606 

607# Backend argument preserved for API compatibility, but ignored. 

608def load_der_x509_crl( 

609 data: bytes, backend: typing.Any = None 

610) -> CertificateRevocationList: 

611 return rust_x509.load_der_x509_crl(data) 

612 

613 

614class CertificateSigningRequestBuilder: 

615 def __init__( 

616 self, 

617 subject_name: typing.Optional[Name] = None, 

618 extensions: typing.List[Extension[ExtensionType]] = [], 

619 attributes: typing.List[ 

620 typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]] 

621 ] = [], 

622 ): 

623 """ 

624 Creates an empty X.509 certificate request (v1). 

625 """ 

626 self._subject_name = subject_name 

627 self._extensions = extensions 

628 self._attributes = attributes 

629 

630 def subject_name(self, name: Name) -> "CertificateSigningRequestBuilder": 

631 """ 

632 Sets the certificate requestor's distinguished name. 

633 """ 

634 if not isinstance(name, Name): 

635 raise TypeError("Expecting x509.Name object.") 

636 if self._subject_name is not None: 

637 raise ValueError("The subject name may only be set once.") 

638 return CertificateSigningRequestBuilder( 

639 name, self._extensions, self._attributes 

640 ) 

641 

642 def add_extension( 

643 self, extval: ExtensionType, critical: bool 

644 ) -> "CertificateSigningRequestBuilder": 

645 """ 

646 Adds an X.509 extension to the certificate request. 

647 """ 

648 if not isinstance(extval, ExtensionType): 

649 raise TypeError("extension must be an ExtensionType") 

650 

651 extension = Extension(extval.oid, critical, extval) 

652 _reject_duplicate_extension(extension, self._extensions) 

653 

654 return CertificateSigningRequestBuilder( 

655 self._subject_name, 

656 self._extensions + [extension], 

657 self._attributes, 

658 ) 

659 

660 def add_attribute( 

661 self, 

662 oid: ObjectIdentifier, 

663 value: bytes, 

664 *, 

665 _tag: typing.Optional[_ASN1Type] = None, 

666 ) -> "CertificateSigningRequestBuilder": 

667 """ 

668 Adds an X.509 attribute with an OID and associated value. 

669 """ 

670 if not isinstance(oid, ObjectIdentifier): 

671 raise TypeError("oid must be an ObjectIdentifier") 

672 

673 if not isinstance(value, bytes): 

674 raise TypeError("value must be bytes") 

675 

676 if _tag is not None and not isinstance(_tag, _ASN1Type): 

677 raise TypeError("tag must be _ASN1Type") 

678 

679 _reject_duplicate_attribute(oid, self._attributes) 

680 

681 if _tag is not None: 

682 tag = _tag.value 

683 else: 

684 tag = None 

685 

686 return CertificateSigningRequestBuilder( 

687 self._subject_name, 

688 self._extensions, 

689 self._attributes + [(oid, value, tag)], 

690 ) 

691 

692 def sign( 

693 self, 

694 private_key: CertificateIssuerPrivateKeyTypes, 

695 algorithm: typing.Optional[_AllowedHashTypes], 

696 backend: typing.Any = None, 

697 ) -> CertificateSigningRequest: 

698 """ 

699 Signs the request using the requestor's private key. 

700 """ 

701 if self._subject_name is None: 

702 raise ValueError("A CertificateSigningRequest must have a subject") 

703 return rust_x509.create_x509_csr(self, private_key, algorithm) 

704 

705 

706class CertificateBuilder: 

707 _extensions: typing.List[Extension[ExtensionType]] 

708 

709 def __init__( 

710 self, 

711 issuer_name: typing.Optional[Name] = None, 

712 subject_name: typing.Optional[Name] = None, 

713 public_key: typing.Optional[CertificatePublicKeyTypes] = None, 

714 serial_number: typing.Optional[int] = None, 

715 not_valid_before: typing.Optional[datetime.datetime] = None, 

716 not_valid_after: typing.Optional[datetime.datetime] = None, 

717 extensions: typing.List[Extension[ExtensionType]] = [], 

718 ) -> None: 

719 self._version = Version.v3 

720 self._issuer_name = issuer_name 

721 self._subject_name = subject_name 

722 self._public_key = public_key 

723 self._serial_number = serial_number 

724 self._not_valid_before = not_valid_before 

725 self._not_valid_after = not_valid_after 

726 self._extensions = extensions 

727 

728 def issuer_name(self, name: Name) -> "CertificateBuilder": 

729 """ 

730 Sets the CA's distinguished name. 

731 """ 

732 if not isinstance(name, Name): 

733 raise TypeError("Expecting x509.Name object.") 

734 if self._issuer_name is not None: 

735 raise ValueError("The issuer name may only be set once.") 

736 return CertificateBuilder( 

737 name, 

738 self._subject_name, 

739 self._public_key, 

740 self._serial_number, 

741 self._not_valid_before, 

742 self._not_valid_after, 

743 self._extensions, 

744 ) 

745 

746 def subject_name(self, name: Name) -> "CertificateBuilder": 

747 """ 

748 Sets the requestor's distinguished name. 

749 """ 

750 if not isinstance(name, Name): 

751 raise TypeError("Expecting x509.Name object.") 

752 if self._subject_name is not None: 

753 raise ValueError("The subject name may only be set once.") 

754 return CertificateBuilder( 

755 self._issuer_name, 

756 name, 

757 self._public_key, 

758 self._serial_number, 

759 self._not_valid_before, 

760 self._not_valid_after, 

761 self._extensions, 

762 ) 

763 

764 def public_key( 

765 self, 

766 key: CertificatePublicKeyTypes, 

767 ) -> "CertificateBuilder": 

768 """ 

769 Sets the requestor's public key (as found in the signing request). 

770 """ 

771 if not isinstance( 

772 key, 

773 ( 

774 dsa.DSAPublicKey, 

775 rsa.RSAPublicKey, 

776 ec.EllipticCurvePublicKey, 

777 ed25519.Ed25519PublicKey, 

778 ed448.Ed448PublicKey, 

779 x25519.X25519PublicKey, 

780 x448.X448PublicKey, 

781 ), 

782 ): 

783 raise TypeError( 

784 "Expecting one of DSAPublicKey, RSAPublicKey," 

785 " EllipticCurvePublicKey, Ed25519PublicKey," 

786 " Ed448PublicKey, X25519PublicKey, or " 

787 "X448PublicKey." 

788 ) 

789 if self._public_key is not None: 

790 raise ValueError("The public key may only be set once.") 

791 return CertificateBuilder( 

792 self._issuer_name, 

793 self._subject_name, 

794 key, 

795 self._serial_number, 

796 self._not_valid_before, 

797 self._not_valid_after, 

798 self._extensions, 

799 ) 

800 

801 def serial_number(self, number: int) -> "CertificateBuilder": 

802 """ 

803 Sets the certificate serial number. 

804 """ 

805 if not isinstance(number, int): 

806 raise TypeError("Serial number must be of integral type.") 

807 if self._serial_number is not None: 

808 raise ValueError("The serial number may only be set once.") 

809 if number <= 0: 

810 raise ValueError("The serial number should be positive.") 

811 

812 # ASN.1 integers are always signed, so most significant bit must be 

813 # zero. 

814 if number.bit_length() >= 160: # As defined in RFC 5280 

815 raise ValueError( 

816 "The serial number should not be more than 159 " "bits." 

817 ) 

818 return CertificateBuilder( 

819 self._issuer_name, 

820 self._subject_name, 

821 self._public_key, 

822 number, 

823 self._not_valid_before, 

824 self._not_valid_after, 

825 self._extensions, 

826 ) 

827 

828 def not_valid_before( 

829 self, time: datetime.datetime 

830 ) -> "CertificateBuilder": 

831 """ 

832 Sets the certificate activation time. 

833 """ 

834 if not isinstance(time, datetime.datetime): 

835 raise TypeError("Expecting datetime object.") 

836 if self._not_valid_before is not None: 

837 raise ValueError("The not valid before may only be set once.") 

838 time = _convert_to_naive_utc_time(time) 

839 if time < _EARLIEST_UTC_TIME: 

840 raise ValueError( 

841 "The not valid before date must be on or after" 

842 " 1950 January 1)." 

843 ) 

844 if self._not_valid_after is not None and time > self._not_valid_after: 

845 raise ValueError( 

846 "The not valid before date must be before the not valid after " 

847 "date." 

848 ) 

849 return CertificateBuilder( 

850 self._issuer_name, 

851 self._subject_name, 

852 self._public_key, 

853 self._serial_number, 

854 time, 

855 self._not_valid_after, 

856 self._extensions, 

857 ) 

858 

859 def not_valid_after(self, time: datetime.datetime) -> "CertificateBuilder": 

860 """ 

861 Sets the certificate expiration time. 

862 """ 

863 if not isinstance(time, datetime.datetime): 

864 raise TypeError("Expecting datetime object.") 

865 if self._not_valid_after is not None: 

866 raise ValueError("The not valid after may only be set once.") 

867 time = _convert_to_naive_utc_time(time) 

868 if time < _EARLIEST_UTC_TIME: 

869 raise ValueError( 

870 "The not valid after date must be on or after" 

871 " 1950 January 1." 

872 ) 

873 if ( 

874 self._not_valid_before is not None 

875 and time < self._not_valid_before 

876 ): 

877 raise ValueError( 

878 "The not valid after date must be after the not valid before " 

879 "date." 

880 ) 

881 return CertificateBuilder( 

882 self._issuer_name, 

883 self._subject_name, 

884 self._public_key, 

885 self._serial_number, 

886 self._not_valid_before, 

887 time, 

888 self._extensions, 

889 ) 

890 

891 def add_extension( 

892 self, extval: ExtensionType, critical: bool 

893 ) -> "CertificateBuilder": 

894 """ 

895 Adds an X.509 extension to the certificate. 

896 """ 

897 if not isinstance(extval, ExtensionType): 

898 raise TypeError("extension must be an ExtensionType") 

899 

900 extension = Extension(extval.oid, critical, extval) 

901 _reject_duplicate_extension(extension, self._extensions) 

902 

903 return CertificateBuilder( 

904 self._issuer_name, 

905 self._subject_name, 

906 self._public_key, 

907 self._serial_number, 

908 self._not_valid_before, 

909 self._not_valid_after, 

910 self._extensions + [extension], 

911 ) 

912 

913 def sign( 

914 self, 

915 private_key: CertificateIssuerPrivateKeyTypes, 

916 algorithm: typing.Optional[_AllowedHashTypes], 

917 backend: typing.Any = None, 

918 ) -> Certificate: 

919 """ 

920 Signs the certificate using the CA's private key. 

921 """ 

922 if self._subject_name is None: 

923 raise ValueError("A certificate must have a subject name") 

924 

925 if self._issuer_name is None: 

926 raise ValueError("A certificate must have an issuer name") 

927 

928 if self._serial_number is None: 

929 raise ValueError("A certificate must have a serial number") 

930 

931 if self._not_valid_before is None: 

932 raise ValueError("A certificate must have a not valid before time") 

933 

934 if self._not_valid_after is None: 

935 raise ValueError("A certificate must have a not valid after time") 

936 

937 if self._public_key is None: 

938 raise ValueError("A certificate must have a public key") 

939 

940 return rust_x509.create_x509_certificate(self, private_key, algorithm) 

941 

942 

943class CertificateRevocationListBuilder: 

944 _extensions: typing.List[Extension[ExtensionType]] 

945 _revoked_certificates: typing.List[RevokedCertificate] 

946 

947 def __init__( 

948 self, 

949 issuer_name: typing.Optional[Name] = None, 

950 last_update: typing.Optional[datetime.datetime] = None, 

951 next_update: typing.Optional[datetime.datetime] = None, 

952 extensions: typing.List[Extension[ExtensionType]] = [], 

953 revoked_certificates: typing.List[RevokedCertificate] = [], 

954 ): 

955 self._issuer_name = issuer_name 

956 self._last_update = last_update 

957 self._next_update = next_update 

958 self._extensions = extensions 

959 self._revoked_certificates = revoked_certificates 

960 

961 def issuer_name( 

962 self, issuer_name: Name 

963 ) -> "CertificateRevocationListBuilder": 

964 if not isinstance(issuer_name, Name): 

965 raise TypeError("Expecting x509.Name object.") 

966 if self._issuer_name is not None: 

967 raise ValueError("The issuer name may only be set once.") 

968 return CertificateRevocationListBuilder( 

969 issuer_name, 

970 self._last_update, 

971 self._next_update, 

972 self._extensions, 

973 self._revoked_certificates, 

974 ) 

975 

976 def last_update( 

977 self, last_update: datetime.datetime 

978 ) -> "CertificateRevocationListBuilder": 

979 if not isinstance(last_update, datetime.datetime): 

980 raise TypeError("Expecting datetime object.") 

981 if self._last_update is not None: 

982 raise ValueError("Last update may only be set once.") 

983 last_update = _convert_to_naive_utc_time(last_update) 

984 if last_update < _EARLIEST_UTC_TIME: 

985 raise ValueError( 

986 "The last update date must be on or after" " 1950 January 1." 

987 ) 

988 if self._next_update is not None and last_update > self._next_update: 

989 raise ValueError( 

990 "The last update date must be before the next update date." 

991 ) 

992 return CertificateRevocationListBuilder( 

993 self._issuer_name, 

994 last_update, 

995 self._next_update, 

996 self._extensions, 

997 self._revoked_certificates, 

998 ) 

999 

1000 def next_update( 

1001 self, next_update: datetime.datetime 

1002 ) -> "CertificateRevocationListBuilder": 

1003 if not isinstance(next_update, datetime.datetime): 

1004 raise TypeError("Expecting datetime object.") 

1005 if self._next_update is not None: 

1006 raise ValueError("Last update may only be set once.") 

1007 next_update = _convert_to_naive_utc_time(next_update) 

1008 if next_update < _EARLIEST_UTC_TIME: 

1009 raise ValueError( 

1010 "The last update date must be on or after" " 1950 January 1." 

1011 ) 

1012 if self._last_update is not None and next_update < self._last_update: 

1013 raise ValueError( 

1014 "The next update date must be after the last update date." 

1015 ) 

1016 return CertificateRevocationListBuilder( 

1017 self._issuer_name, 

1018 self._last_update, 

1019 next_update, 

1020 self._extensions, 

1021 self._revoked_certificates, 

1022 ) 

1023 

1024 def add_extension( 

1025 self, extval: ExtensionType, critical: bool 

1026 ) -> "CertificateRevocationListBuilder": 

1027 """ 

1028 Adds an X.509 extension to the certificate revocation list. 

1029 """ 

1030 if not isinstance(extval, ExtensionType): 

1031 raise TypeError("extension must be an ExtensionType") 

1032 

1033 extension = Extension(extval.oid, critical, extval) 

1034 _reject_duplicate_extension(extension, self._extensions) 

1035 return CertificateRevocationListBuilder( 

1036 self._issuer_name, 

1037 self._last_update, 

1038 self._next_update, 

1039 self._extensions + [extension], 

1040 self._revoked_certificates, 

1041 ) 

1042 

1043 def add_revoked_certificate( 

1044 self, revoked_certificate: RevokedCertificate 

1045 ) -> "CertificateRevocationListBuilder": 

1046 """ 

1047 Adds a revoked certificate to the CRL. 

1048 """ 

1049 if not isinstance(revoked_certificate, RevokedCertificate): 

1050 raise TypeError("Must be an instance of RevokedCertificate") 

1051 

1052 return CertificateRevocationListBuilder( 

1053 self._issuer_name, 

1054 self._last_update, 

1055 self._next_update, 

1056 self._extensions, 

1057 self._revoked_certificates + [revoked_certificate], 

1058 ) 

1059 

1060 def sign( 

1061 self, 

1062 private_key: CertificateIssuerPrivateKeyTypes, 

1063 algorithm: typing.Optional[_AllowedHashTypes], 

1064 backend: typing.Any = None, 

1065 ) -> CertificateRevocationList: 

1066 if self._issuer_name is None: 

1067 raise ValueError("A CRL must have an issuer name") 

1068 

1069 if self._last_update is None: 

1070 raise ValueError("A CRL must have a last update time") 

1071 

1072 if self._next_update is None: 

1073 raise ValueError("A CRL must have a next update time") 

1074 

1075 return rust_x509.create_x509_crl(self, private_key, algorithm) 

1076 

1077 

1078class RevokedCertificateBuilder: 

1079 def __init__( 

1080 self, 

1081 serial_number: typing.Optional[int] = None, 

1082 revocation_date: typing.Optional[datetime.datetime] = None, 

1083 extensions: typing.List[Extension[ExtensionType]] = [], 

1084 ): 

1085 self._serial_number = serial_number 

1086 self._revocation_date = revocation_date 

1087 self._extensions = extensions 

1088 

1089 def serial_number(self, number: int) -> "RevokedCertificateBuilder": 

1090 if not isinstance(number, int): 

1091 raise TypeError("Serial number must be of integral type.") 

1092 if self._serial_number is not None: 

1093 raise ValueError("The serial number may only be set once.") 

1094 if number <= 0: 

1095 raise ValueError("The serial number should be positive") 

1096 

1097 # ASN.1 integers are always signed, so most significant bit must be 

1098 # zero. 

1099 if number.bit_length() >= 160: # As defined in RFC 5280 

1100 raise ValueError( 

1101 "The serial number should not be more than 159 " "bits." 

1102 ) 

1103 return RevokedCertificateBuilder( 

1104 number, self._revocation_date, self._extensions 

1105 ) 

1106 

1107 def revocation_date( 

1108 self, time: datetime.datetime 

1109 ) -> "RevokedCertificateBuilder": 

1110 if not isinstance(time, datetime.datetime): 

1111 raise TypeError("Expecting datetime object.") 

1112 if self._revocation_date is not None: 

1113 raise ValueError("The revocation date may only be set once.") 

1114 time = _convert_to_naive_utc_time(time) 

1115 if time < _EARLIEST_UTC_TIME: 

1116 raise ValueError( 

1117 "The revocation date must be on or after" " 1950 January 1." 

1118 ) 

1119 return RevokedCertificateBuilder( 

1120 self._serial_number, time, self._extensions 

1121 ) 

1122 

1123 def add_extension( 

1124 self, extval: ExtensionType, critical: bool 

1125 ) -> "RevokedCertificateBuilder": 

1126 if not isinstance(extval, ExtensionType): 

1127 raise TypeError("extension must be an ExtensionType") 

1128 

1129 extension = Extension(extval.oid, critical, extval) 

1130 _reject_duplicate_extension(extension, self._extensions) 

1131 return RevokedCertificateBuilder( 

1132 self._serial_number, 

1133 self._revocation_date, 

1134 self._extensions + [extension], 

1135 ) 

1136 

1137 def build(self, backend: typing.Any = None) -> RevokedCertificate: 

1138 if self._serial_number is None: 

1139 raise ValueError("A revoked certificate must have a serial number") 

1140 if self._revocation_date is None: 

1141 raise ValueError( 

1142 "A revoked certificate must have a revocation date" 

1143 ) 

1144 return _RawRevokedCertificate( 

1145 self._serial_number, 

1146 self._revocation_date, 

1147 Extensions(self._extensions), 

1148 ) 

1149 

1150 

1151def random_serial_number() -> int: 

1152 return int.from_bytes(os.urandom(20), "big") >> 1