Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/models.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

355 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Common models shared between signing and verification. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import logging 

23from collections import defaultdict 

24from collections.abc import Iterable 

25from enum import Enum 

26from pathlib import Path 

27from textwrap import dedent 

28from typing import Any 

29 

30import rfc8785 

31from cryptography.hazmat.primitives.serialization import Encoding 

32from cryptography.x509 import ( 

33 Certificate, 

34 load_der_x509_certificate, 

35) 

36from pydantic import TypeAdapter 

37from rekor_types import Dsse, Hashedrekord, ProposedEntry 

38from rfc3161_client import TimeStampResponse, decode_timestamp_response 

39from sigstore_models.bundle import v1 as bundle_v1 

40from sigstore_models.bundle.v1 import Bundle as _Bundle 

41from sigstore_models.bundle.v1 import ( 

42 TimestampVerificationData as _TimestampVerificationData, 

43) 

44from sigstore_models.bundle.v1 import VerificationMaterial as _VerificationMaterial 

45from sigstore_models.common import v1 as common_v1 

46from sigstore_models.common.v1 import MessageSignature, RFC3161SignedTimestamp 

47from sigstore_models.rekor import v1 as rekor_v1 

48from sigstore_models.rekor.v1 import TransparencyLogEntry as _TransparencyLogEntry 

49from sigstore_models.trustroot import v1 as trustroot_v1 

50 

51from sigstore import dsse 

52from sigstore._internal.fulcio.client import FulcioClient 

53from sigstore._internal.merkle import verify_merkle_inclusion 

54from sigstore._internal.rekor import RekorLogSubmitter 

55from sigstore._internal.rekor.checkpoint import verify_checkpoint 

56from sigstore._internal.timestamp import TimestampAuthorityClient 

57from sigstore._internal.trust import ( 

58 CertificateAuthority, 

59 CTKeyring, 

60 Keyring, 

61 KeyringPurpose, 

62 RekorKeyring, 

63) 

64from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater 

65from sigstore._utils import KeyID, cert_is_leaf, cert_is_root_ca, is_timerange_valid 

66from sigstore.errors import Error, MetadataError, TUFError, VerificationError 

67 

# Major API versions of each Sigstore service that this client supports.
# `SigningConfig` compares these against each service's `major_api_version`.
REKOR_VERSIONS = [1, 2]
TSA_VERSIONS = [1]
FULCIO_VERSIONS = [1]
OIDC_VERSIONS = [1]


# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

76 

77 

class TransparencyLogEntry:
    """
    Represents a transparency log entry.

    Wraps the underlying model object and enforces, at construction time,
    that an inclusion proof with a checkpoint is present.
    """

    def __init__(self, inner: _TransparencyLogEntry) -> None:
        """
        Creates a new `TransparencyLogEntry` from the given inner object.

        Raises `InvalidBundle` if the entry has no inclusion proof or the
        proof has no checkpoint.

        @private
        """
        self._inner = inner
        self._validate()

    def _validate(self) -> None:
        """
        Ensure this transparency log entry is well-formed and upholds our
        client invariants.
        """
        proof: rekor_v1.InclusionProof | None = self._inner.inclusion_proof
        # This requirement is imposed by us as the client, not by the
        # protobuf-specs themselves.
        if not proof or not proof.checkpoint:
            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")

    def __eq__(self, value: object) -> bool:
        """
        Compares this `TransparencyLogEntry` with another object for equality.

        Two `TransparencyLogEntry` instances are equal when their inner
        contents are equal; any other type yields `NotImplemented`.
        """
        if not isinstance(value, TransparencyLogEntry):
            return NotImplemented
        return self._inner == value._inner

    @classmethod
    def _from_v1_response(cls, dict_: dict[str, Any]) -> TransparencyLogEntry:
        """
        Create a new `TransparencyLogEntry` from the given API response.

        The response must map exactly one key to one entry; the key itself
        is ignored.

        Raises `ValueError` if the response does not contain exactly one
        entry, and `InvalidBundle` if the entry body is neither a
        `hashedrekord` nor a `dsse` entry.
        """
        # Report the empty case accurately instead of calling it "multiple".
        if not dict_:
            raise ValueError("Received no entries in response")
        if len(dict_) != 1:
            raise ValueError("Received multiple entries in response")
        entry = next(iter(dict_.values()))

        # Decode the body so we can fill in the appropriate kind/version.
        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
            base64.b64decode(entry["body"])
        )
        if not isinstance(body_entry, (Hashedrekord, Dsse)):
            raise InvalidBundle("log entry is not of expected type")

        raw_inclusion_proof = entry["verification"]["inclusionProof"]

        # NOTE: The type ignores below are a consequence of our Pydantic
        # modeling: mypy and other typecheckers see `ProtoU64` as `int`,
        # but it gets coerced from a string due to Protobuf's JSON serialization.
        #
        # Hex-encoded fields in the response (log ID, root hash, proof
        # hashes) are re-encoded as base64 for the model.
        inner = _TransparencyLogEntry(
            log_index=str(entry["logIndex"]),  # type: ignore[arg-type]
            log_id=common_v1.LogId(
                key_id=base64.b64encode(bytes.fromhex(entry["logID"]))
            ),
            kind_version=rekor_v1.KindVersion(
                kind=body_entry.kind, version=body_entry.api_version
            ),
            integrated_time=str(entry["integratedTime"]),  # type: ignore[arg-type]
            inclusion_promise=rekor_v1.InclusionPromise(
                signed_entry_timestamp=entry["verification"]["signedEntryTimestamp"]
            ),
            inclusion_proof=rekor_v1.InclusionProof(
                log_index=str(raw_inclusion_proof["logIndex"]),  # type: ignore[arg-type]
                root_hash=base64.b64encode(
                    bytes.fromhex(raw_inclusion_proof["rootHash"])
                ),
                tree_size=str(raw_inclusion_proof["treeSize"]),  # type: ignore[arg-type]
                hashes=[
                    base64.b64encode(bytes.fromhex(h))
                    for h in raw_inclusion_proof["hashes"]
                ],
                checkpoint=rekor_v1.Checkpoint(
                    envelope=raw_inclusion_proof["checkpoint"]
                ),
            ),
            canonicalized_body=entry["body"],
        )

        return cls(inner)

    def _encode_canonical(self) -> bytes:
        """
        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.

        This encoded representation is suitable for verification against
        the Signed Entry Timestamp.

        Raises `ValueError` if the entry has no integrated time.
        """
        # We might not have an integrated time if our log entry is from rekor
        # v2, i.e. was integrated synchronously instead of via an
        # inclusion promise.
        if self._inner.integrated_time is None:
            raise ValueError(
                "can't encode canonical form for SET without integrated time"
            )

        payload: dict[str, int | str] = {
            "body": base64.b64encode(self._inner.canonicalized_body).decode(),
            "integratedTime": self._inner.integrated_time,
            "logID": self._inner.log_id.key_id.hex(),
            "logIndex": self._inner.log_index,
        }

        return rfc8785.dumps(payload)

    def _verify_set(self, keyring: RekorKeyring) -> None:
        """
        Verify the inclusion promise (Signed Entry Timestamp) of this log
        entry using the given `keyring`.

        Raises `VerificationError` if the entry does not contain an inclusion
        promise, or if the keyring rejects the signature.
        """
        if self._inner.inclusion_promise is None:
            raise VerificationError("SET: invalid inclusion promise: missing")

        signed_entry_ts = self._inner.inclusion_promise.signed_entry_timestamp

        try:
            keyring.verify(
                key_id=KeyID(self._inner.log_id.key_id),
                signature=signed_entry_ts,
                data=self._encode_canonical(),
            )
        except VerificationError as exc:
            # Chain explicitly so the keyring failure is recorded as the cause.
            raise VerificationError(f"SET: invalid inclusion promise: {exc}") from exc

    def _verify(self, keyring: RekorKeyring) -> None:
        """
        Verifies this log entry against the given `keyring`.

        This method:

        * Verifies the Merkle inclusion proof and its signed checkpoint;
        * Verifies the inclusion promise (SET), but only when both an
          inclusion promise and an integrated time are present.
        """
        verify_merkle_inclusion(self)
        verify_checkpoint(keyring, self)

        _logger.debug(
            "successfully verified inclusion proof: index=%s", self._inner.log_index
        )

        if self._inner.inclusion_promise and self._inner.integrated_time:
            self._verify_set(keyring)
            _logger.debug(
                "successfully verified inclusion promise: index=%s",
                self._inner.log_index,
            )

240 

241 

class TimestampVerificationData:
    """
    Represents a TimestampVerificationData structure.

    The embedded RFC 3161 timestamp responses are decoded once, at
    construction time, and exposed via `rfc3161_timestamps`.

    @private
    """

    def __init__(self, inner: _TimestampVerificationData) -> None:
        """
        Wrap `inner` and decode its timestamp responses.

        Raises `VerificationError` if any response fails to decode.
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Verifies the TimestampVerificationData.

        It verifies that TimeStamp Responses embedded in the bundle are correctly
        formed, storing the decoded responses on the instance.
        """
        # A missing or empty list is treated as "no timestamps".
        timestamps = self._inner.rfc3161_timestamps or []

        try:
            self._signed_ts = [
                decode_timestamp_response(ts.signed_timestamp) for ts in timestamps
            ]
        except ValueError as exc:
            # Chain explicitly so the decoding failure is kept as the cause.
            raise VerificationError("Invalid Timestamp Response") from exc

    @property
    def rfc3161_timestamps(self) -> list[TimeStampResponse]:
        """Returns the list of decoded signed timestamps."""
        return self._signed_ts

    @classmethod
    def from_json(cls, raw: str | bytes) -> TimestampVerificationData:
        """
        Deserialize the given timestamp verification data.
        """
        return cls(_TimestampVerificationData.from_json(raw))

283 

284 

class VerificationMaterial:
    """
    Represents a VerificationMaterial structure.
    """

    def __init__(self, inner: _VerificationMaterial) -> None:
        """Wrap the given inner verification material."""
        self._inner = inner

    @property
    def timestamp_verification_data(self) -> TimestampVerificationData | None:
        """
        Returns the Timestamp Verification Data, or `None` when it is absent
        or carries no RFC 3161 timestamps.
        """
        has_timestamps = (
            self._inner.timestamp_verification_data
            and self._inner.timestamp_verification_data.rfc3161_timestamps
        )
        if not has_timestamps:
            return None
        return TimestampVerificationData(self._inner.timestamp_verification_data)

305 

306 

class InvalidBundle(Error):
    """
    Raised when the associated `Bundle` is invalid in some way.
    """

    def diagnostics(self) -> str:
        """Returns a human-readable explanation that embeds this error's message."""

        report = dedent(
            f"""\
        An issue occurred while parsing the Sigstore bundle.

        The provided bundle is malformed and may have been modified maliciously.

        Additional context:

        {self}
        """
        )
        return report

326 

327 

class IncompatibleEntry(InvalidBundle):
    """
    Raised when the log entry within the `Bundle` has an incompatible KindVersion.
    """

    def diagnostics(self) -> str:
        """Returns a human-readable explanation that embeds this error's message."""

        report = dedent(
            f"""\
        The provided bundle contains a transparency log entry that is incompatible with this version of sigstore-python. Please upgrade your verifying client.

        Additional context:

        {self}
        """
        )
        return report

345 

346 

class Bundle:
    """
    Represents a Sigstore bundle.

    Construction validates the bundle (see `_verify`) and extracts the
    signing certificate and the single transparency log entry.
    """

    class BundleType(str, Enum):
        """
        Known Sigstore bundle media types.
        """

        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: _Bundle) -> None:
        """
        Creates a new bundle. This is not a public API; use
        `from_json` instead.

        Raises `InvalidBundle` (or its subclass `IncompatibleEntry`) if the
        bundle fails validation.

        @private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Performs various feats of heroism to ensure the bundle is well-formed
        and upholds invariants, including:

        * The media type is one of the known `BundleType` values;
        * The "leaf" (signing) certificate is present;
        * There is exactly one log entry, with a supported entry version;
        * There is an inclusion proof present, even if the Bundle's version
          predates a mandatory inclusion proof;
        * A source of signed time (inclusion promise or signed timestamps)
          is present.
        """

        # The bundle must have a recognized media type.
        try:
            media_type = Bundle.BundleType(self._inner.media_type)
        except ValueError as exc:
            raise InvalidBundle(
                f"unsupported bundle format: {self._inner.media_type}"
            ) from exc

        material = self._inner.verification_material

        # Extract the signing certificate.
        if media_type in (
            Bundle.BundleType.BUNDLE_0_3,
            Bundle.BundleType.BUNDLE_0_3_ALT,
        ):
            # For "v3" bundles, the signing certificate is the only one present.
            if not material.certificate:
                raise InvalidBundle("expected certificate in bundle")

            leaf_cert = load_der_x509_certificate(material.certificate.raw_bytes)
        else:
            # In older bundles, there is an entire pool (misleadingly called
            # a chain) of certificates, the first of which is the signing
            # certificate.
            if not material.x509_certificate_chain:
                raise InvalidBundle("expected certificate chain in bundle")

            chain = material.x509_certificate_chain
            if not chain.certificates:
                raise InvalidBundle("expected non-empty certificate chain in bundle")

            # Per client policy in protobuf-specs: the first entry in the chain
            # MUST be a leaf certificate, and the rest of the chain MUST NOT
            # include a root CA or any intermediate CAs that appear in an
            # independent root of trust.
            #
            # We expect some old bundles to violate the rules around root
            # and intermediate CAs, so we issue warnings and not hard errors
            # in those cases.
            leaf_cert, *chain_certs = (
                load_der_x509_certificate(cert.raw_bytes) for cert in chain.certificates
            )
            if not cert_is_leaf(leaf_cert):
                raise InvalidBundle(
                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
                )

            for chain_cert in chain_certs:
                # TODO: We should also retrieve the root of trust here and
                # cross-check against it.
                if cert_is_root_ca(chain_cert):
                    _logger.warning(
                        "this bundle contains a root CA, making it subject to misuse"
                    )

        self._signing_certificate = leaf_cert

        # Extract the log entry. For the time being, we expect
        # bundles to only contain a single log entry.
        tlog_entries = material.tlog_entries
        if len(tlog_entries) != 1:
            raise InvalidBundle("expected exactly one log entry in bundle")
        tlog_entry = tlog_entries[0]

        if tlog_entry.kind_version.version not in ["0.0.1", "0.0.2"]:
            raise IncompatibleEntry(
                f"Expected log entry version 0.0.1 - 0.0.2, got {tlog_entry.kind_version.version}"
            )

        # Handling of inclusion promises and proofs varies between bundle
        # format versions:
        #
        # * For 0.1, an inclusion promise is required; the client
        #   MUST verify the inclusion promise.
        #   The format itself does not require an inclusion proof.
        #
        # * For 0.2+, an inclusion proof is required; the client MUST
        #   verify the inclusion proof. The inclusion proof MUST contain
        #   a checkpoint.
        #
        #   The inclusion promise is NOT required if another source of signed
        #   time (such as a signed timestamp) is present. If no other source
        #   of signed time is present, then the inclusion promise MUST be
        #   present.
        #
        # Before all of this, we require that the inclusion proof (with
        # checkpoint) be present: constructing the `TransparencyLogEntry`
        # raises `InvalidBundle` otherwise, for every bundle version. The
        # per-version checkpoint checks below are therefore purely defensive.
        log_entry = TransparencyLogEntry(tlog_entry)

        if media_type == Bundle.BundleType.BUNDLE_0_1:
            if not log_entry._inner.inclusion_promise:
                raise InvalidBundle("bundle must contain an inclusion promise")
            if not log_entry._inner.inclusion_proof.checkpoint:
                _logger.debug(
                    "0.1 bundle contains inclusion proof without checkpoint; ignoring"
                )
        else:
            if not log_entry._inner.inclusion_proof.checkpoint:
                raise InvalidBundle("expected checkpoint in inclusion proof")

            if (
                not log_entry._inner.inclusion_promise
                and not self.verification_material.timestamp_verification_data
            ):
                raise InvalidBundle(
                    "bundle must contain an inclusion promise or signed timestamp(s)"
                )

        self._log_entry = log_entry

    @property
    def signing_certificate(self) -> Certificate:
        """Returns the bundle's contained signing (i.e. leaf) certificate."""
        return self._signing_certificate

    @property
    def log_entry(self) -> TransparencyLogEntry:
        """
        Returns the bundle's log entry, containing an inclusion proof
        (with checkpoint) and an inclusion promise (if the latter is present).
        """
        return self._log_entry

    @property
    def _dsse_envelope(self) -> dsse.Envelope | None:
        """
        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`,
        or `None` if the bundle has no DSSE envelope.

        @private
        """
        if self._inner.dsse_envelope is not None:
            return dsse.Envelope(self._inner.dsse_envelope)
        return None

    @property
    def signature(self) -> bytes:
        """
        Returns the signature bytes of this bundle.
        Either from the DSSE Envelope or from the message itself.
        """
        # Bind once: each access to `_dsse_envelope` builds a new wrapper.
        envelope = self._dsse_envelope
        if envelope:
            return envelope.signature
        return self._inner.message_signature.signature  # type: ignore[union-attr]

    @property
    def verification_material(self) -> VerificationMaterial:
        """
        Returns the bundle's verification material.
        """
        return VerificationMaterial(self._inner.verification_material)

    @classmethod
    def from_json(cls, raw: bytes | str) -> Bundle:
        """
        Deserialize the given Sigstore bundle.

        Raises `InvalidBundle` if the input cannot be loaded or fails
        validation.
        """
        try:
            inner = _Bundle.from_json(raw)
        except ValueError as exc:
            raise InvalidBundle(f"failed to load bundle: {exc}") from exc
        return cls(inner)

    def to_json(self) -> str:
        """
        Return a JSON encoding of this bundle.
        """
        return self._inner.to_json()

    def _to_parts(
        self,
    ) -> tuple[Certificate, MessageSignature | dsse.Envelope, TransparencyLogEntry]:
        """
        Decompose the `Bundle` into its core constituent parts: the signing
        certificate, the signed content (DSSE envelope if present, otherwise
        the message signature), and the log entry.

        @private
        """

        content: MessageSignature | dsse.Envelope
        envelope = self._dsse_envelope
        if envelope:
            content = envelope
        else:
            content = self._inner.message_signature  # type: ignore[assignment]

        return (self.signing_certificate, content, self.log_entry)

    @classmethod
    def from_parts(
        cls, cert: Certificate, sig: bytes, log_entry: TransparencyLogEntry
    ) -> Bundle:
        """
        Construct a Sigstore bundle (of `hashedrekord` type) from its
        constituent parts.
        """

        return cls._from_parts(
            cert, MessageSignature(signature=base64.b64encode(sig)), log_entry
        )

    @classmethod
    def _from_parts(
        cls,
        cert: Certificate,
        content: MessageSignature | dsse.Envelope,
        log_entry: TransparencyLogEntry,
        signed_timestamp: list[TimeStampResponse] | None = None,
    ) -> Bundle:
        """
        Construct a 0.3 bundle from a certificate, signed content, a log
        entry and, optionally, RFC 3161 timestamp responses.

        @private
        """

        timestamp_verification_data = bundle_v1.TimestampVerificationData(
            rfc3161_timestamps=[]
        )
        if signed_timestamp is not None:
            timestamp_verification_data.rfc3161_timestamps.extend(
                [
                    RFC3161SignedTimestamp(
                        signed_timestamp=base64.b64encode(response.as_bytes())
                    )
                    for response in signed_timestamp
                ]
            )

        # Fill in the appropriate variant: exactly one of the two is set.
        message_signature = None
        dsse_envelope = None
        if isinstance(content, MessageSignature):
            message_signature = content
        else:
            dsse_envelope = content._inner

        inner = _Bundle(
            media_type=Bundle.BundleType.BUNDLE_0_3.value,
            verification_material=bundle_v1.VerificationMaterial(
                certificate=common_v1.X509Certificate(
                    raw_bytes=base64.b64encode(cert.public_bytes(Encoding.DER))
                ),
                tlog_entries=[log_entry._inner],
                timestamp_verification_data=timestamp_verification_data,
            ),
            message_signature=message_signature,
            dsse_envelope=dsse_envelope,
        )

        return cls(inner)

633 

634 

class SigningConfig:
    """
    Signing configuration for a Sigstore instance.

    On construction, the configured services are filtered down to those
    that are currently valid and supported by this client.
    """

    class SigningConfigType(str, Enum):
        """
        Known Sigstore signing config media types.
        """

        SIGNING_CONFIG_0_2 = "application/vnd.dev.sigstore.signingconfig.v0.2+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(
        self, inner: trustroot_v1.SigningConfig, tlog_version: int | None = None
    ):
        """
        Construct a new `SigningConfig`.

        tlog_version is an optional argument that enforces that only specified
        versions of rekor are included in the transparency logs.

        Raises `Error` if the media type is not recognized, or if no valid
        Rekor transparency log or Fulcio CA is found.

        @api private
        """
        self._inner = inner

        # must have a recognized media type.
        try:
            SigningConfig.SigningConfigType(self._inner.media_type)
        except ValueError as exc:
            raise Error(
                f"unsupported signing config format: {self._inner.media_type}"
            ) from exc

        # Create lists of service protos that are valid, selected by the service
        # configuration & supported by this client
        tlog_versions = REKOR_VERSIONS if tlog_version is None else [tlog_version]

        self._tlogs = self._get_valid_services(
            self._inner.rekor_tlog_urls, tlog_versions, self._inner.rekor_tlog_config
        )
        if not self._tlogs:
            raise Error("No valid Rekor transparency log found in signing config")

        # TSAs are optional: an empty list is accepted here.
        self._tsas = self._get_valid_services(
            self._inner.tsa_urls, TSA_VERSIONS, self._inner.tsa_config
        )

        self._fulcios = self._get_valid_services(
            self._inner.ca_urls, FULCIO_VERSIONS, None
        )
        if not self._fulcios:
            raise Error("No valid Fulcio CA found in signing config")

        # A missing OIDC provider is only reported by `get_oidc_url`.
        self._oidcs = self._get_valid_services(
            self._inner.oidc_urls, OIDC_VERSIONS, None
        )

    @classmethod
    def from_file(
        cls,
        path: str,
    ) -> SigningConfig:
        """Create a new signing config from file"""
        inner = trustroot_v1.SigningConfig.from_json(Path(path).read_bytes())
        return cls(inner)

    @staticmethod
    def _get_valid_services(
        services: list[trustroot_v1.Service],
        supported_versions: list[int],
        config: trustroot_v1.ServiceConfiguration | None,
    ) -> list[trustroot_v1.Service]:
        """
        Return supported services, taking SigningConfig restrictions into account.

        Services with an unsupported major API version, or whose validity
        period does not cover the present, are dropped. At most one service
        per operator is kept (the one with the highest major API version).
        The result is then pruned according to `config`'s selector:

        * no config, or `ALL`: every remaining service is returned;
        * `EXACT`: the first `config.count` services are returned (one if
          the count is unset or zero);
        * otherwise: the first service only.

        Raises `ValueError` if the selector is `EXACT` and fewer services
        than requested remain.
        """

        # split services by operator, only include valid services
        services_by_operator: dict[str, list[trustroot_v1.Service]] = defaultdict(list)
        for service in services:
            if service.major_api_version not in supported_versions:
                continue

            if not is_timerange_valid(service.valid_for, allow_expired=False):
                continue

            services_by_operator[service.operator].append(service)

        # Keep one service per operator: the highest version available. A
        # stable sort followed by taking the last element means that, among
        # equal versions, the one listed last wins.
        result: list[trustroot_v1.Service] = [
            sorted(op_services, key=lambda s: s.major_api_version)[-1]
            for op_services in services_by_operator.values()
        ]

        # Depending on ServiceSelector, prune the result list
        if not config or config.selector == trustroot_v1.ServiceSelector.ALL:
            return result

        # handle EXACT and ANY selectors
        is_exact = config.selector == trustroot_v1.ServiceSelector.EXACT
        count = config.count if is_exact and config.count else 1

        if is_exact and len(result) < count:
            raise ValueError(
                f"Expected {count} services in signing config, found {len(result)}"
            )

        return result[:count]

    def get_tlogs(self) -> list[RekorLogSubmitter]:
        """
        Returns the rekor transparency log clients to sign with.
        """
        result: list[RekorLogSubmitter] = []
        for tlog in self._tlogs:
            # The client modules are imported lazily, per API version.
            if tlog.major_api_version == 1:
                from sigstore._internal.rekor.client import RekorClient

                result.append(RekorClient(tlog.url))
            elif tlog.major_api_version == 2:
                from sigstore._internal.rekor.client_v2 import RekorV2Client

                result.append(RekorV2Client(tlog.url))
            else:
                raise AssertionError(f"Unexpected Rekor v{tlog.major_api_version}")
        return result

    def get_fulcio(self) -> FulcioClient:
        """
        Returns a Fulcio client to get a signing certificate from
        """
        return FulcioClient(self._fulcios[0].url)

    def get_oidc_url(self) -> str:
        """
        Returns url for the OIDC provider that client should use to interactively
        authenticate.

        Raises `Error` if the signing config has no valid OIDC provider.
        """
        if not self._oidcs:
            raise Error("No valid OIDC provider found in signing config")
        return self._oidcs[0].url

    def get_tsas(self) -> list[TimestampAuthorityClient]:
        """
        Returns timestamp authority clients for urls configured in signing config.
        """
        return [TimestampAuthorityClient(s.url) for s in self._tsas]

791 

792 

class TrustedRoot:
    """
    The cryptographic root(s) of trust for a Sigstore instance.
    """

    class TrustedRootType(str, Enum):
        """
        Known Sigstore trusted root media types.
        """

        TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: trustroot_v1.TrustedRoot):
        """
        Construct a new `TrustedRoot`.

        Raises `Error` if the media type is not recognized.

        @api private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Performs various feats of heroism to ensure that the trusted root
        is well-formed.
        """

        # The trusted root must have a recognized media type.
        try:
            TrustedRoot.TrustedRootType(self._inner.media_type)
        except ValueError as exc:
            raise Error(
                f"unsupported trusted root format: {self._inner.media_type}"
            ) from exc

    @classmethod
    def from_file(
        cls,
        path: str,
    ) -> TrustedRoot:
        """Create a new trust root from file"""
        inner = trustroot_v1.TrustedRoot.from_json(Path(path).read_bytes())
        return cls(inner)

    def _get_tlog_keys(
        self, tlogs: list[trustroot_v1.TransparencyLogInstance], purpose: KeyringPurpose
    ) -> Iterable[common_v1.PublicKey]:
        """
        Yields an iterator of public keys for transparency log instances that
        are suitable for `purpose`.

        Expired keys are allowed only when `purpose` is
        `KeyringPurpose.VERIFY`.
        """
        allow_expired = purpose is KeyringPurpose.VERIFY
        for tlog in tlogs:
            if is_timerange_valid(
                tlog.public_key.valid_for, allow_expired=allow_expired
            ):
                yield tlog.public_key

    def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
        """
        Return keyring with keys for Rekor.

        Raises `MetadataError` if no suitable key is found.
        """

        keys: list[common_v1.PublicKey] = list(
            self._get_tlog_keys(self._inner.tlogs, purpose)
        )
        if not keys:
            raise MetadataError("Did not find any Rekor keys in trusted root")
        return RekorKeyring(Keyring(keys))

    def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring:
        """
        Return keyring with key for CTFE.

        Raises `MetadataError` if no suitable key is found.
        """
        ctfes: list[common_v1.PublicKey] = list(
            self._get_tlog_keys(self._inner.ctlogs, purpose)
        )
        if not ctfes:
            raise MetadataError("CTFE keys not found in trusted root")
        return CTKeyring(Keyring(ctfes))

    def get_fulcio_certs(self) -> list[Certificate]:
        """
        Return the Fulcio certificates.

        Raises `MetadataError` if the trusted root yields no certificates.
        """

        certs: list[Certificate] = []

        # Return expired certificates too: they are expired now but may have
        # been active when the certificate was used to sign.
        for authority in self._inner.certificate_authorities:
            certificate_authority = CertificateAuthority(authority)
            certs.extend(certificate_authority.certificates(allow_expired=True))

        if not certs:
            raise MetadataError("Fulcio certificates not found in trusted root")
        return certs

    def get_timestamp_authorities(self) -> list[CertificateAuthority]:
        """
        Return the TSA present in the trusted root.

        This list may be empty and in this case, no timestamp verification can be
        performed.
        """
        certificate_authorities: list[CertificateAuthority] = [
            CertificateAuthority(cert_chain)
            for cert_chain in self._inner.timestamp_authorities
        ]
        return certificate_authorities

901 

902 

class ClientTrustConfig:
    """
    Represents a Sigstore client's trust configuration, including a root of trust.
    """

    class ClientTrustConfigType(str, Enum):
        """
        Known Sigstore client trust config media types.
        """

        CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    @classmethod
    def from_json(cls, raw: str) -> ClientTrustConfig:
        """
        Deserialize the given client trust config.
        """
        inner = trustroot_v1.ClientTrustConfig.from_json(raw)
        return cls(inner)

    @classmethod
    def production(
        cls,
        offline: bool = False,
    ) -> ClientTrustConfig:
        """Create new trust config from Sigstore production TUF repository.

        If `offline`, will use data in local TUF cache. Otherwise will
        update the data from remote TUF repository.
        """
        return cls.from_tuf(DEFAULT_TUF_URL, offline)

    @classmethod
    def staging(
        cls,
        offline: bool = False,
    ) -> ClientTrustConfig:
        """Create new trust config from Sigstore staging TUF repository.

        If `offline`, will use data in local TUF cache. Otherwise will
        update the data from remote TUF repository.
        """
        return cls.from_tuf(STAGING_TUF_URL, offline)

    @classmethod
    def from_tuf(
        cls,
        url: str,
        offline: bool = False,
        bootstrap_root: Path | None = None,
    ) -> ClientTrustConfig:
        """Create a new trust config from a TUF repository.

        If `offline`, will use data in local TUF cache. Otherwise will
        update the trust config from remote TUF repository.

        Both the trusted root and the signing config are loaded; any
        `TUFError` raised while locating the signing config propagates to
        the caller unchanged.
        """
        updater = TrustUpdater(url, offline, bootstrap_root)

        tr_path = updater.get_trusted_root_path()
        inner_tr = trustroot_v1.TrustedRoot.from_json(Path(tr_path).read_bytes())

        sc_path = updater.get_signing_config_path()
        inner_sc = trustroot_v1.SigningConfig.from_json(Path(sc_path).read_bytes())

        return cls(
            trustroot_v1.ClientTrustConfig(
                media_type=ClientTrustConfig.ClientTrustConfigType.CONFIG_0_1.value,
                trusted_root=inner_tr,
                signing_config=inner_sc,
            )
        )

    def __init__(self, inner: trustroot_v1.ClientTrustConfig) -> None:
        """
        Wrap the given inner client trust config.

        @api private
        """
        self._inner = inner

        # This can be used to enforce a specific rekor major version in signingconfig
        self.force_tlog_version: int | None = None

    @property
    def trusted_root(self) -> TrustedRoot:
        """
        Return the interior root of trust, as a `TrustedRoot`.
        """
        return TrustedRoot(self._inner.trusted_root)

    @property
    def signing_config(self) -> SigningConfig:
        """
        Return the interior signing configuration, as a `SigningConfig`.

        If `force_tlog_version` is set, only Rekor instances of that major
        version are considered.
        """
        return SigningConfig(
            self._inner.signing_config, tlog_version=self.force_tlog_version
        )