Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/x509/base.py: 47%

413 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5 

6import abc 

7import datetime 

8import os 

9import typing 

10 

11from cryptography import utils 

12from cryptography.hazmat.bindings._rust import x509 as rust_x509 

13from cryptography.hazmat.primitives import hashes, serialization 

14from cryptography.hazmat.primitives.asymmetric import ( 

15 dsa, 

16 ec, 

17 ed25519, 

18 ed448, 

19 rsa, 

20 x25519, 

21 x448, 

22) 

23from cryptography.hazmat.primitives.asymmetric.types import ( 

24 CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES, 

25 CERTIFICATE_PRIVATE_KEY_TYPES, 

26 CERTIFICATE_PUBLIC_KEY_TYPES, 

27) 

28from cryptography.x509.extensions import ( 

29 Extension, 

30 ExtensionType, 

31 Extensions, 

32 _make_sequence_methods, 

33) 

34from cryptography.x509.name import Name, _ASN1Type 

35from cryptography.x509.oid import ObjectIdentifier 

36 

37 

38_EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1) 

39 

40 

41class AttributeNotFound(Exception): 

42 def __init__(self, msg: str, oid: ObjectIdentifier) -> None: 

43 super(AttributeNotFound, self).__init__(msg) 

44 self.oid = oid 

45 

46 

47def _reject_duplicate_extension( 

48 extension: Extension[ExtensionType], 

49 extensions: typing.List[Extension[ExtensionType]], 

50) -> None: 

51 # This is quadratic in the number of extensions 

52 for e in extensions: 

53 if e.oid == extension.oid: 

54 raise ValueError("This extension has already been set.") 

55 

56 

57def _reject_duplicate_attribute( 

58 oid: ObjectIdentifier, 

59 attributes: typing.List[ 

60 typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]] 

61 ], 

62) -> None: 

63 # This is quadratic in the number of attributes 

64 for attr_oid, _, _ in attributes: 

65 if attr_oid == oid: 

66 raise ValueError("This attribute has already been set.") 

67 

68 

69def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime: 

70 """Normalizes a datetime to a naive datetime in UTC. 

71 

72 time -- datetime to normalize. Assumed to be in UTC if not timezone 

73 aware. 

74 """ 

75 if time.tzinfo is not None: 

76 offset = time.utcoffset() 

77 offset = offset if offset else datetime.timedelta() 

78 return time.replace(tzinfo=None) - offset 

79 else: 

80 return time 

81 

82 

83class Attribute: 

84 def __init__( 

85 self, 

86 oid: ObjectIdentifier, 

87 value: bytes, 

88 _type: int = _ASN1Type.UTF8String.value, 

89 ) -> None: 

90 self._oid = oid 

91 self._value = value 

92 self._type = _type 

93 

94 @property 

95 def oid(self) -> ObjectIdentifier: 

96 return self._oid 

97 

98 @property 

99 def value(self) -> bytes: 

100 return self._value 

101 

102 def __repr__(self) -> str: 

103 return "<Attribute(oid={}, value={!r})>".format(self.oid, self.value) 

104 

105 def __eq__(self, other: object) -> bool: 

106 if not isinstance(other, Attribute): 

107 return NotImplemented 

108 

109 return ( 

110 self.oid == other.oid 

111 and self.value == other.value 

112 and self._type == other._type 

113 ) 

114 

115 def __hash__(self) -> int: 

116 return hash((self.oid, self.value, self._type)) 

117 

118 

119class Attributes: 

120 def __init__( 

121 self, 

122 attributes: typing.Iterable[Attribute], 

123 ) -> None: 

124 self._attributes = list(attributes) 

125 

126 __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes") 

127 

128 def __repr__(self) -> str: 

129 return "<Attributes({})>".format(self._attributes) 

130 

131 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute: 

132 for attr in self: 

133 if attr.oid == oid: 

134 return attr 

135 

136 raise AttributeNotFound("No {} attribute was found".format(oid), oid) 

137 

138 

139class Version(utils.Enum): 

140 v1 = 0 

141 v3 = 2 

142 

143 

144class InvalidVersion(Exception): 

145 def __init__(self, msg: str, parsed_version: int) -> None: 

146 super(InvalidVersion, self).__init__(msg) 

147 self.parsed_version = parsed_version 

148 

149 

150class Certificate(metaclass=abc.ABCMeta): 

151 @abc.abstractmethod 

152 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: 

153 """ 

154 Returns bytes using digest passed. 

155 """ 

156 

157 @abc.abstractproperty 

158 def serial_number(self) -> int: 

159 """ 

160 Returns certificate serial number 

161 """ 

162 

163 @abc.abstractproperty 

164 def version(self) -> Version: 

165 """ 

166 Returns the certificate version 

167 """ 

168 

169 @abc.abstractmethod 

170 def public_key(self) -> CERTIFICATE_PUBLIC_KEY_TYPES: 

171 """ 

172 Returns the public key 

173 """ 

174 

175 @abc.abstractproperty 

176 def not_valid_before(self) -> datetime.datetime: 

177 """ 

178 Not before time (represented as UTC datetime) 

179 """ 

180 

181 @abc.abstractproperty 

182 def not_valid_after(self) -> datetime.datetime: 

183 """ 

184 Not after time (represented as UTC datetime) 

185 """ 

186 

187 @abc.abstractproperty 

188 def issuer(self) -> Name: 

189 """ 

190 Returns the issuer name object. 

191 """ 

192 

193 @abc.abstractproperty 

194 def subject(self) -> Name: 

195 """ 

196 Returns the subject name object. 

197 """ 

198 

199 @abc.abstractproperty 

200 def signature_hash_algorithm( 

201 self, 

202 ) -> typing.Optional[hashes.HashAlgorithm]: 

203 """ 

204 Returns a HashAlgorithm corresponding to the type of the digest signed 

205 in the certificate. 

206 """ 

207 

208 @abc.abstractproperty 

209 def signature_algorithm_oid(self) -> ObjectIdentifier: 

210 """ 

211 Returns the ObjectIdentifier of the signature algorithm. 

212 """ 

213 

214 @abc.abstractproperty 

215 def extensions(self) -> Extensions: 

216 """ 

217 Returns an Extensions object. 

218 """ 

219 

220 @abc.abstractproperty 

221 def signature(self) -> bytes: 

222 """ 

223 Returns the signature bytes. 

224 """ 

225 

226 @abc.abstractproperty 

227 def tbs_certificate_bytes(self) -> bytes: 

228 """ 

229 Returns the tbsCertificate payload bytes as defined in RFC 5280. 

230 """ 

231 

232 @abc.abstractproperty 

233 def tbs_precertificate_bytes(self) -> bytes: 

234 """ 

235 Returns the tbsCertificate payload bytes with the SCT list extension 

236 stripped. 

237 """ 

238 

239 @abc.abstractmethod 

240 def __eq__(self, other: object) -> bool: 

241 """ 

242 Checks equality. 

243 """ 

244 

245 @abc.abstractmethod 

246 def __hash__(self) -> int: 

247 """ 

248 Computes a hash. 

249 """ 

250 

251 @abc.abstractmethod 

252 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

253 """ 

254 Serializes the certificate to PEM or DER format. 

255 """ 

256 

257 

258# Runtime isinstance checks need this since the rust class is not a subclass. 

259Certificate.register(rust_x509.Certificate) 

260 

261 

262class RevokedCertificate(metaclass=abc.ABCMeta): 

263 @abc.abstractproperty 

264 def serial_number(self) -> int: 

265 """ 

266 Returns the serial number of the revoked certificate. 

267 """ 

268 

269 @abc.abstractproperty 

270 def revocation_date(self) -> datetime.datetime: 

271 """ 

272 Returns the date of when this certificate was revoked. 

273 """ 

274 

275 @abc.abstractproperty 

276 def extensions(self) -> Extensions: 

277 """ 

278 Returns an Extensions object containing a list of Revoked extensions. 

279 """ 

280 

281 

282# Runtime isinstance checks need this since the rust class is not a subclass. 

283RevokedCertificate.register(rust_x509.RevokedCertificate) 

284 

285 

286class _RawRevokedCertificate(RevokedCertificate): 

287 def __init__( 

288 self, 

289 serial_number: int, 

290 revocation_date: datetime.datetime, 

291 extensions: Extensions, 

292 ): 

293 self._serial_number = serial_number 

294 self._revocation_date = revocation_date 

295 self._extensions = extensions 

296 

297 @property 

298 def serial_number(self) -> int: 

299 return self._serial_number 

300 

301 @property 

302 def revocation_date(self) -> datetime.datetime: 

303 return self._revocation_date 

304 

305 @property 

306 def extensions(self) -> Extensions: 

307 return self._extensions 

308 

309 

310class CertificateRevocationList(metaclass=abc.ABCMeta): 

311 @abc.abstractmethod 

312 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

313 """ 

314 Serializes the CRL to PEM or DER format. 

315 """ 

316 

317 @abc.abstractmethod 

318 def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: 

319 """ 

320 Returns bytes using digest passed. 

321 """ 

322 

323 @abc.abstractmethod 

324 def get_revoked_certificate_by_serial_number( 

325 self, serial_number: int 

326 ) -> typing.Optional[RevokedCertificate]: 

327 """ 

328 Returns an instance of RevokedCertificate or None if the serial_number 

329 is not in the CRL. 

330 """ 

331 

332 @abc.abstractproperty 

333 def signature_hash_algorithm( 

334 self, 

335 ) -> typing.Optional[hashes.HashAlgorithm]: 

336 """ 

337 Returns a HashAlgorithm corresponding to the type of the digest signed 

338 in the certificate. 

339 """ 

340 

341 @abc.abstractproperty 

342 def signature_algorithm_oid(self) -> ObjectIdentifier: 

343 """ 

344 Returns the ObjectIdentifier of the signature algorithm. 

345 """ 

346 

347 @abc.abstractproperty 

348 def issuer(self) -> Name: 

349 """ 

350 Returns the X509Name with the issuer of this CRL. 

351 """ 

352 

353 @abc.abstractproperty 

354 def next_update(self) -> typing.Optional[datetime.datetime]: 

355 """ 

356 Returns the date of next update for this CRL. 

357 """ 

358 

359 @abc.abstractproperty 

360 def last_update(self) -> datetime.datetime: 

361 """ 

362 Returns the date of last update for this CRL. 

363 """ 

364 

365 @abc.abstractproperty 

366 def extensions(self) -> Extensions: 

367 """ 

368 Returns an Extensions object containing a list of CRL extensions. 

369 """ 

370 

371 @abc.abstractproperty 

372 def signature(self) -> bytes: 

373 """ 

374 Returns the signature bytes. 

375 """ 

376 

377 @abc.abstractproperty 

378 def tbs_certlist_bytes(self) -> bytes: 

379 """ 

380 Returns the tbsCertList payload bytes as defined in RFC 5280. 

381 """ 

382 

383 @abc.abstractmethod 

384 def __eq__(self, other: object) -> bool: 

385 """ 

386 Checks equality. 

387 """ 

388 

389 @abc.abstractmethod 

390 def __len__(self) -> int: 

391 """ 

392 Number of revoked certificates in the CRL. 

393 """ 

394 

395 @typing.overload 

396 def __getitem__(self, idx: int) -> RevokedCertificate: 

397 ... 

398 

399 @typing.overload 

400 def __getitem__(self, idx: slice) -> typing.List[RevokedCertificate]: 

401 ... 

402 

403 @abc.abstractmethod 

404 def __getitem__( 

405 self, idx: typing.Union[int, slice] 

406 ) -> typing.Union[RevokedCertificate, typing.List[RevokedCertificate]]: 

407 """ 

408 Returns a revoked certificate (or slice of revoked certificates). 

409 """ 

410 

411 @abc.abstractmethod 

412 def __iter__(self) -> typing.Iterator[RevokedCertificate]: 

413 """ 

414 Iterator over the revoked certificates 

415 """ 

416 

417 @abc.abstractmethod 

418 def is_signature_valid( 

419 self, public_key: CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES 

420 ) -> bool: 

421 """ 

422 Verifies signature of revocation list against given public key. 

423 """ 

424 

425 

426CertificateRevocationList.register(rust_x509.CertificateRevocationList) 

427 

428 

429class CertificateSigningRequest(metaclass=abc.ABCMeta): 

430 @abc.abstractmethod 

431 def __eq__(self, other: object) -> bool: 

432 """ 

433 Checks equality. 

434 """ 

435 

436 @abc.abstractmethod 

437 def __hash__(self) -> int: 

438 """ 

439 Computes a hash. 

440 """ 

441 

442 @abc.abstractmethod 

443 def public_key(self) -> CERTIFICATE_PUBLIC_KEY_TYPES: 

444 """ 

445 Returns the public key 

446 """ 

447 

448 @abc.abstractproperty 

449 def subject(self) -> Name: 

450 """ 

451 Returns the subject name object. 

452 """ 

453 

454 @abc.abstractproperty 

455 def signature_hash_algorithm( 

456 self, 

457 ) -> typing.Optional[hashes.HashAlgorithm]: 

458 """ 

459 Returns a HashAlgorithm corresponding to the type of the digest signed 

460 in the certificate. 

461 """ 

462 

463 @abc.abstractproperty 

464 def signature_algorithm_oid(self) -> ObjectIdentifier: 

465 """ 

466 Returns the ObjectIdentifier of the signature algorithm. 

467 """ 

468 

469 @abc.abstractproperty 

470 def extensions(self) -> Extensions: 

471 """ 

472 Returns the extensions in the signing request. 

473 """ 

474 

475 @abc.abstractproperty 

476 def attributes(self) -> Attributes: 

477 """ 

478 Returns an Attributes object. 

479 """ 

480 

481 @abc.abstractmethod 

482 def public_bytes(self, encoding: serialization.Encoding) -> bytes: 

483 """ 

484 Encodes the request to PEM or DER format. 

485 """ 

486 

487 @abc.abstractproperty 

488 def signature(self) -> bytes: 

489 """ 

490 Returns the signature bytes. 

491 """ 

492 

493 @abc.abstractproperty 

494 def tbs_certrequest_bytes(self) -> bytes: 

495 """ 

496 Returns the PKCS#10 CertificationRequestInfo bytes as defined in RFC 

497 2986. 

498 """ 

499 

500 @abc.abstractproperty 

501 def is_signature_valid(self) -> bool: 

502 """ 

503 Verifies signature of signing request. 

504 """ 

505 

506 @abc.abstractmethod 

507 def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes: 

508 """ 

509 Get the attribute value for a given OID. 

510 """ 

511 

512 

513# Runtime isinstance checks need this since the rust class is not a subclass. 

514CertificateSigningRequest.register(rust_x509.CertificateSigningRequest) 

515 

516 

517# Backend argument preserved for API compatibility, but ignored. 

518def load_pem_x509_certificate( 

519 data: bytes, backend: typing.Any = None 

520) -> Certificate: 

521 return rust_x509.load_pem_x509_certificate(data) 

522 

523 

524# Backend argument preserved for API compatibility, but ignored. 

525def load_der_x509_certificate( 

526 data: bytes, backend: typing.Any = None 

527) -> Certificate: 

528 return rust_x509.load_der_x509_certificate(data) 

529 

530 

531# Backend argument preserved for API compatibility, but ignored. 

532def load_pem_x509_csr( 

533 data: bytes, backend: typing.Any = None 

534) -> CertificateSigningRequest: 

535 return rust_x509.load_pem_x509_csr(data) 

536 

537 

538# Backend argument preserved for API compatibility, but ignored. 

539def load_der_x509_csr( 

540 data: bytes, backend: typing.Any = None 

541) -> CertificateSigningRequest: 

542 return rust_x509.load_der_x509_csr(data) 

543 

544 

545# Backend argument preserved for API compatibility, but ignored. 

546def load_pem_x509_crl( 

547 data: bytes, backend: typing.Any = None 

548) -> CertificateRevocationList: 

549 return rust_x509.load_pem_x509_crl(data) 

550 

551 

552# Backend argument preserved for API compatibility, but ignored. 

553def load_der_x509_crl( 

554 data: bytes, backend: typing.Any = None 

555) -> CertificateRevocationList: 

556 return rust_x509.load_der_x509_crl(data) 

557 

558 

559class CertificateSigningRequestBuilder: 

560 def __init__( 

561 self, 

562 subject_name: typing.Optional[Name] = None, 

563 extensions: typing.List[Extension[ExtensionType]] = [], 

564 attributes: typing.List[ 

565 typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]] 

566 ] = [], 

567 ): 

568 """ 

569 Creates an empty X.509 certificate request (v1). 

570 """ 

571 self._subject_name = subject_name 

572 self._extensions = extensions 

573 self._attributes = attributes 

574 

575 def subject_name(self, name: Name) -> "CertificateSigningRequestBuilder": 

576 """ 

577 Sets the certificate requestor's distinguished name. 

578 """ 

579 if not isinstance(name, Name): 

580 raise TypeError("Expecting x509.Name object.") 

581 if self._subject_name is not None: 

582 raise ValueError("The subject name may only be set once.") 

583 return CertificateSigningRequestBuilder( 

584 name, self._extensions, self._attributes 

585 ) 

586 

587 def add_extension( 

588 self, extval: ExtensionType, critical: bool 

589 ) -> "CertificateSigningRequestBuilder": 

590 """ 

591 Adds an X.509 extension to the certificate request. 

592 """ 

593 if not isinstance(extval, ExtensionType): 

594 raise TypeError("extension must be an ExtensionType") 

595 

596 extension = Extension(extval.oid, critical, extval) 

597 _reject_duplicate_extension(extension, self._extensions) 

598 

599 return CertificateSigningRequestBuilder( 

600 self._subject_name, 

601 self._extensions + [extension], 

602 self._attributes, 

603 ) 

604 

605 def add_attribute( 

606 self, 

607 oid: ObjectIdentifier, 

608 value: bytes, 

609 *, 

610 _tag: typing.Optional[_ASN1Type] = None, 

611 ) -> "CertificateSigningRequestBuilder": 

612 """ 

613 Adds an X.509 attribute with an OID and associated value. 

614 """ 

615 if not isinstance(oid, ObjectIdentifier): 

616 raise TypeError("oid must be an ObjectIdentifier") 

617 

618 if not isinstance(value, bytes): 

619 raise TypeError("value must be bytes") 

620 

621 if _tag is not None and not isinstance(_tag, _ASN1Type): 

622 raise TypeError("tag must be _ASN1Type") 

623 

624 _reject_duplicate_attribute(oid, self._attributes) 

625 

626 if _tag is not None: 

627 tag = _tag.value 

628 else: 

629 tag = None 

630 

631 return CertificateSigningRequestBuilder( 

632 self._subject_name, 

633 self._extensions, 

634 self._attributes + [(oid, value, tag)], 

635 ) 

636 

637 def sign( 

638 self, 

639 private_key: CERTIFICATE_PRIVATE_KEY_TYPES, 

640 algorithm: typing.Optional[hashes.HashAlgorithm], 

641 backend: typing.Any = None, 

642 ) -> CertificateSigningRequest: 

643 """ 

644 Signs the request using the requestor's private key. 

645 """ 

646 if self._subject_name is None: 

647 raise ValueError("A CertificateSigningRequest must have a subject") 

648 return rust_x509.create_x509_csr(self, private_key, algorithm) 

649 

650 

651class CertificateBuilder: 

652 _extensions: typing.List[Extension[ExtensionType]] 

653 

654 def __init__( 

655 self, 

656 issuer_name: typing.Optional[Name] = None, 

657 subject_name: typing.Optional[Name] = None, 

658 public_key: typing.Optional[CERTIFICATE_PUBLIC_KEY_TYPES] = None, 

659 serial_number: typing.Optional[int] = None, 

660 not_valid_before: typing.Optional[datetime.datetime] = None, 

661 not_valid_after: typing.Optional[datetime.datetime] = None, 

662 extensions: typing.List[Extension[ExtensionType]] = [], 

663 ) -> None: 

664 self._version = Version.v3 

665 self._issuer_name = issuer_name 

666 self._subject_name = subject_name 

667 self._public_key = public_key 

668 self._serial_number = serial_number 

669 self._not_valid_before = not_valid_before 

670 self._not_valid_after = not_valid_after 

671 self._extensions = extensions 

672 

673 def issuer_name(self, name: Name) -> "CertificateBuilder": 

674 """ 

675 Sets the CA's distinguished name. 

676 """ 

677 if not isinstance(name, Name): 

678 raise TypeError("Expecting x509.Name object.") 

679 if self._issuer_name is not None: 

680 raise ValueError("The issuer name may only be set once.") 

681 return CertificateBuilder( 

682 name, 

683 self._subject_name, 

684 self._public_key, 

685 self._serial_number, 

686 self._not_valid_before, 

687 self._not_valid_after, 

688 self._extensions, 

689 ) 

690 

691 def subject_name(self, name: Name) -> "CertificateBuilder": 

692 """ 

693 Sets the requestor's distinguished name. 

694 """ 

695 if not isinstance(name, Name): 

696 raise TypeError("Expecting x509.Name object.") 

697 if self._subject_name is not None: 

698 raise ValueError("The subject name may only be set once.") 

699 return CertificateBuilder( 

700 self._issuer_name, 

701 name, 

702 self._public_key, 

703 self._serial_number, 

704 self._not_valid_before, 

705 self._not_valid_after, 

706 self._extensions, 

707 ) 

708 

709 def public_key( 

710 self, 

711 key: CERTIFICATE_PUBLIC_KEY_TYPES, 

712 ) -> "CertificateBuilder": 

713 """ 

714 Sets the requestor's public key (as found in the signing request). 

715 """ 

716 if not isinstance( 

717 key, 

718 ( 

719 dsa.DSAPublicKey, 

720 rsa.RSAPublicKey, 

721 ec.EllipticCurvePublicKey, 

722 ed25519.Ed25519PublicKey, 

723 ed448.Ed448PublicKey, 

724 x25519.X25519PublicKey, 

725 x448.X448PublicKey, 

726 ), 

727 ): 

728 raise TypeError( 

729 "Expecting one of DSAPublicKey, RSAPublicKey," 

730 " EllipticCurvePublicKey, Ed25519PublicKey," 

731 " Ed448PublicKey, X25519PublicKey, or " 

732 "X448PublicKey." 

733 ) 

734 if self._public_key is not None: 

735 raise ValueError("The public key may only be set once.") 

736 return CertificateBuilder( 

737 self._issuer_name, 

738 self._subject_name, 

739 key, 

740 self._serial_number, 

741 self._not_valid_before, 

742 self._not_valid_after, 

743 self._extensions, 

744 ) 

745 

746 def serial_number(self, number: int) -> "CertificateBuilder": 

747 """ 

748 Sets the certificate serial number. 

749 """ 

750 if not isinstance(number, int): 

751 raise TypeError("Serial number must be of integral type.") 

752 if self._serial_number is not None: 

753 raise ValueError("The serial number may only be set once.") 

754 if number <= 0: 

755 raise ValueError("The serial number should be positive.") 

756 

757 # ASN.1 integers are always signed, so most significant bit must be 

758 # zero. 

759 if number.bit_length() >= 160: # As defined in RFC 5280 

760 raise ValueError( 

761 "The serial number should not be more than 159 " "bits." 

762 ) 

763 return CertificateBuilder( 

764 self._issuer_name, 

765 self._subject_name, 

766 self._public_key, 

767 number, 

768 self._not_valid_before, 

769 self._not_valid_after, 

770 self._extensions, 

771 ) 

772 

773 def not_valid_before( 

774 self, time: datetime.datetime 

775 ) -> "CertificateBuilder": 

776 """ 

777 Sets the certificate activation time. 

778 """ 

779 if not isinstance(time, datetime.datetime): 

780 raise TypeError("Expecting datetime object.") 

781 if self._not_valid_before is not None: 

782 raise ValueError("The not valid before may only be set once.") 

783 time = _convert_to_naive_utc_time(time) 

784 if time < _EARLIEST_UTC_TIME: 

785 raise ValueError( 

786 "The not valid before date must be on or after" 

787 " 1950 January 1)." 

788 ) 

789 if self._not_valid_after is not None and time > self._not_valid_after: 

790 raise ValueError( 

791 "The not valid before date must be before the not valid after " 

792 "date." 

793 ) 

794 return CertificateBuilder( 

795 self._issuer_name, 

796 self._subject_name, 

797 self._public_key, 

798 self._serial_number, 

799 time, 

800 self._not_valid_after, 

801 self._extensions, 

802 ) 

803 

804 def not_valid_after(self, time: datetime.datetime) -> "CertificateBuilder": 

805 """ 

806 Sets the certificate expiration time. 

807 """ 

808 if not isinstance(time, datetime.datetime): 

809 raise TypeError("Expecting datetime object.") 

810 if self._not_valid_after is not None: 

811 raise ValueError("The not valid after may only be set once.") 

812 time = _convert_to_naive_utc_time(time) 

813 if time < _EARLIEST_UTC_TIME: 

814 raise ValueError( 

815 "The not valid after date must be on or after" 

816 " 1950 January 1." 

817 ) 

818 if ( 

819 self._not_valid_before is not None 

820 and time < self._not_valid_before 

821 ): 

822 raise ValueError( 

823 "The not valid after date must be after the not valid before " 

824 "date." 

825 ) 

826 return CertificateBuilder( 

827 self._issuer_name, 

828 self._subject_name, 

829 self._public_key, 

830 self._serial_number, 

831 self._not_valid_before, 

832 time, 

833 self._extensions, 

834 ) 

835 

836 def add_extension( 

837 self, extval: ExtensionType, critical: bool 

838 ) -> "CertificateBuilder": 

839 """ 

840 Adds an X.509 extension to the certificate. 

841 """ 

842 if not isinstance(extval, ExtensionType): 

843 raise TypeError("extension must be an ExtensionType") 

844 

845 extension = Extension(extval.oid, critical, extval) 

846 _reject_duplicate_extension(extension, self._extensions) 

847 

848 return CertificateBuilder( 

849 self._issuer_name, 

850 self._subject_name, 

851 self._public_key, 

852 self._serial_number, 

853 self._not_valid_before, 

854 self._not_valid_after, 

855 self._extensions + [extension], 

856 ) 

857 

858 def sign( 

859 self, 

860 private_key: CERTIFICATE_PRIVATE_KEY_TYPES, 

861 algorithm: typing.Optional[hashes.HashAlgorithm], 

862 backend: typing.Any = None, 

863 ) -> Certificate: 

864 """ 

865 Signs the certificate using the CA's private key. 

866 """ 

867 if self._subject_name is None: 

868 raise ValueError("A certificate must have a subject name") 

869 

870 if self._issuer_name is None: 

871 raise ValueError("A certificate must have an issuer name") 

872 

873 if self._serial_number is None: 

874 raise ValueError("A certificate must have a serial number") 

875 

876 if self._not_valid_before is None: 

877 raise ValueError("A certificate must have a not valid before time") 

878 

879 if self._not_valid_after is None: 

880 raise ValueError("A certificate must have a not valid after time") 

881 

882 if self._public_key is None: 

883 raise ValueError("A certificate must have a public key") 

884 

885 return rust_x509.create_x509_certificate(self, private_key, algorithm) 

886 

887 

888class CertificateRevocationListBuilder: 

889 _extensions: typing.List[Extension[ExtensionType]] 

890 _revoked_certificates: typing.List[RevokedCertificate] 

891 

892 def __init__( 

893 self, 

894 issuer_name: typing.Optional[Name] = None, 

895 last_update: typing.Optional[datetime.datetime] = None, 

896 next_update: typing.Optional[datetime.datetime] = None, 

897 extensions: typing.List[Extension[ExtensionType]] = [], 

898 revoked_certificates: typing.List[RevokedCertificate] = [], 

899 ): 

900 self._issuer_name = issuer_name 

901 self._last_update = last_update 

902 self._next_update = next_update 

903 self._extensions = extensions 

904 self._revoked_certificates = revoked_certificates 

905 

906 def issuer_name( 

907 self, issuer_name: Name 

908 ) -> "CertificateRevocationListBuilder": 

909 if not isinstance(issuer_name, Name): 

910 raise TypeError("Expecting x509.Name object.") 

911 if self._issuer_name is not None: 

912 raise ValueError("The issuer name may only be set once.") 

913 return CertificateRevocationListBuilder( 

914 issuer_name, 

915 self._last_update, 

916 self._next_update, 

917 self._extensions, 

918 self._revoked_certificates, 

919 ) 

920 

921 def last_update( 

922 self, last_update: datetime.datetime 

923 ) -> "CertificateRevocationListBuilder": 

924 if not isinstance(last_update, datetime.datetime): 

925 raise TypeError("Expecting datetime object.") 

926 if self._last_update is not None: 

927 raise ValueError("Last update may only be set once.") 

928 last_update = _convert_to_naive_utc_time(last_update) 

929 if last_update < _EARLIEST_UTC_TIME: 

930 raise ValueError( 

931 "The last update date must be on or after" " 1950 January 1." 

932 ) 

933 if self._next_update is not None and last_update > self._next_update: 

934 raise ValueError( 

935 "The last update date must be before the next update date." 

936 ) 

937 return CertificateRevocationListBuilder( 

938 self._issuer_name, 

939 last_update, 

940 self._next_update, 

941 self._extensions, 

942 self._revoked_certificates, 

943 ) 

944 

945 def next_update( 

946 self, next_update: datetime.datetime 

947 ) -> "CertificateRevocationListBuilder": 

948 if not isinstance(next_update, datetime.datetime): 

949 raise TypeError("Expecting datetime object.") 

950 if self._next_update is not None: 

951 raise ValueError("Last update may only be set once.") 

952 next_update = _convert_to_naive_utc_time(next_update) 

953 if next_update < _EARLIEST_UTC_TIME: 

954 raise ValueError( 

955 "The last update date must be on or after" " 1950 January 1." 

956 ) 

957 if self._last_update is not None and next_update < self._last_update: 

958 raise ValueError( 

959 "The next update date must be after the last update date." 

960 ) 

961 return CertificateRevocationListBuilder( 

962 self._issuer_name, 

963 self._last_update, 

964 next_update, 

965 self._extensions, 

966 self._revoked_certificates, 

967 ) 

968 

969 def add_extension( 

970 self, extval: ExtensionType, critical: bool 

971 ) -> "CertificateRevocationListBuilder": 

972 """ 

973 Adds an X.509 extension to the certificate revocation list. 

974 """ 

975 if not isinstance(extval, ExtensionType): 

976 raise TypeError("extension must be an ExtensionType") 

977 

978 extension = Extension(extval.oid, critical, extval) 

979 _reject_duplicate_extension(extension, self._extensions) 

980 return CertificateRevocationListBuilder( 

981 self._issuer_name, 

982 self._last_update, 

983 self._next_update, 

984 self._extensions + [extension], 

985 self._revoked_certificates, 

986 ) 

987 

988 def add_revoked_certificate( 

989 self, revoked_certificate: RevokedCertificate 

990 ) -> "CertificateRevocationListBuilder": 

991 """ 

992 Adds a revoked certificate to the CRL. 

993 """ 

994 if not isinstance(revoked_certificate, RevokedCertificate): 

995 raise TypeError("Must be an instance of RevokedCertificate") 

996 

997 return CertificateRevocationListBuilder( 

998 self._issuer_name, 

999 self._last_update, 

1000 self._next_update, 

1001 self._extensions, 

1002 self._revoked_certificates + [revoked_certificate], 

1003 ) 

1004 

1005 def sign( 

1006 self, 

1007 private_key: CERTIFICATE_PRIVATE_KEY_TYPES, 

1008 algorithm: typing.Optional[hashes.HashAlgorithm], 

1009 backend: typing.Any = None, 

1010 ) -> CertificateRevocationList: 

1011 if self._issuer_name is None: 

1012 raise ValueError("A CRL must have an issuer name") 

1013 

1014 if self._last_update is None: 

1015 raise ValueError("A CRL must have a last update time") 

1016 

1017 if self._next_update is None: 

1018 raise ValueError("A CRL must have a next update time") 

1019 

1020 return rust_x509.create_x509_crl(self, private_key, algorithm) 

1021 

1022 

1023class RevokedCertificateBuilder: 

1024 def __init__( 

1025 self, 

1026 serial_number: typing.Optional[int] = None, 

1027 revocation_date: typing.Optional[datetime.datetime] = None, 

1028 extensions: typing.List[Extension[ExtensionType]] = [], 

1029 ): 

1030 self._serial_number = serial_number 

1031 self._revocation_date = revocation_date 

1032 self._extensions = extensions 

1033 

1034 def serial_number(self, number: int) -> "RevokedCertificateBuilder": 

1035 if not isinstance(number, int): 

1036 raise TypeError("Serial number must be of integral type.") 

1037 if self._serial_number is not None: 

1038 raise ValueError("The serial number may only be set once.") 

1039 if number <= 0: 

1040 raise ValueError("The serial number should be positive") 

1041 

1042 # ASN.1 integers are always signed, so most significant bit must be 

1043 # zero. 

1044 if number.bit_length() >= 160: # As defined in RFC 5280 

1045 raise ValueError( 

1046 "The serial number should not be more than 159 " "bits." 

1047 ) 

1048 return RevokedCertificateBuilder( 

1049 number, self._revocation_date, self._extensions 

1050 ) 

1051 

1052 def revocation_date( 

1053 self, time: datetime.datetime 

1054 ) -> "RevokedCertificateBuilder": 

1055 if not isinstance(time, datetime.datetime): 

1056 raise TypeError("Expecting datetime object.") 

1057 if self._revocation_date is not None: 

1058 raise ValueError("The revocation date may only be set once.") 

1059 time = _convert_to_naive_utc_time(time) 

1060 if time < _EARLIEST_UTC_TIME: 

1061 raise ValueError( 

1062 "The revocation date must be on or after" " 1950 January 1." 

1063 ) 

1064 return RevokedCertificateBuilder( 

1065 self._serial_number, time, self._extensions 

1066 ) 

1067 

1068 def add_extension( 

1069 self, extval: ExtensionType, critical: bool 

1070 ) -> "RevokedCertificateBuilder": 

1071 if not isinstance(extval, ExtensionType): 

1072 raise TypeError("extension must be an ExtensionType") 

1073 

1074 extension = Extension(extval.oid, critical, extval) 

1075 _reject_duplicate_extension(extension, self._extensions) 

1076 return RevokedCertificateBuilder( 

1077 self._serial_number, 

1078 self._revocation_date, 

1079 self._extensions + [extension], 

1080 ) 

1081 

1082 def build(self, backend: typing.Any = None) -> RevokedCertificate: 

1083 if self._serial_number is None: 

1084 raise ValueError("A revoked certificate must have a serial number") 

1085 if self._revocation_date is None: 

1086 raise ValueError( 

1087 "A revoked certificate must have a revocation date" 

1088 ) 

1089 return _RawRevokedCertificate( 

1090 self._serial_number, 

1091 self._revocation_date, 

1092 Extensions(self._extensions), 

1093 ) 

1094 

1095 

1096def random_serial_number() -> int: 

1097 return int.from_bytes(os.urandom(20), "big") >> 1