Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/models.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

350 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Common models shared between signing and verification. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import logging 

23from collections import defaultdict 

24from collections.abc import Iterable 

25from enum import Enum 

26from pathlib import Path 

27from textwrap import dedent 

28from typing import Any 

29 

30import rfc8785 

31from cryptography.hazmat.primitives.serialization import Encoding 

32from cryptography.x509 import ( 

33 Certificate, 

34 load_der_x509_certificate, 

35) 

36from pydantic import TypeAdapter 

37from rekor_types import Dsse, Hashedrekord, ProposedEntry 

38from rfc3161_client import TimeStampResponse, decode_timestamp_response 

39from sigstore_models.bundle import v1 as bundle_v1 

40from sigstore_models.bundle.v1 import Bundle as _Bundle 

41from sigstore_models.bundle.v1 import ( 

42 TimestampVerificationData as _TimestampVerificationData, 

43) 

44from sigstore_models.bundle.v1 import VerificationMaterial as _VerificationMaterial 

45from sigstore_models.common import v1 as common_v1 

46from sigstore_models.common.v1 import MessageSignature, RFC3161SignedTimestamp 

47from sigstore_models.rekor import v1 as rekor_v1 

48from sigstore_models.rekor.v1 import TransparencyLogEntry as _TransparencyLogEntry 

49from sigstore_models.trustroot import v1 as trustroot_v1 

50 

51from sigstore import dsse 

52from sigstore._internal.fulcio.client import FulcioClient 

53from sigstore._internal.merkle import verify_merkle_inclusion 

54from sigstore._internal.rekor import RekorLogSubmitter 

55from sigstore._internal.rekor.checkpoint import verify_checkpoint 

56from sigstore._internal.timestamp import TimestampAuthorityClient 

57from sigstore._internal.trust import ( 

58 CertificateAuthority, 

59 CTKeyring, 

60 Keyring, 

61 KeyringPurpose, 

62 RekorKeyring, 

63) 

64from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater 

65from sigstore._utils import KeyID, cert_is_leaf, cert_is_root_ca, is_timerange_valid 

66from sigstore.errors import Error, MetadataError, TUFError, VerificationError 

67 

68# Versions supported by this client 

69REKOR_VERSIONS = [1, 2] 

70TSA_VERSIONS = [1] 

71FULCIO_VERSIONS = [1] 

72OIDC_VERSIONS = [1] 

73 

74 

75_logger = logging.getLogger(__name__) 

76 

77 

78class TransparencyLogEntry: 

79 """ 

80 Represents a transparency log entry. 

81 """ 

82 

83 def __init__(self, inner: _TransparencyLogEntry) -> None: 

84 """ 

85 Creates a new `TransparencyLogEntry` from the given inner object. 

86 

87 @private 

88 """ 

89 self._inner = inner 

90 self._validate() 

91 

92 def _validate(self) -> None: 

93 """ 

94 Ensure this transparency log entry is well-formed and upholds our 

95 client invariants. 

96 """ 

97 

98 inclusion_proof: rekor_v1.InclusionProof | None = self._inner.inclusion_proof 

99 # This check is required by us as the client, not the 

100 # protobuf-specs themselves. 

101 if not inclusion_proof or not inclusion_proof.checkpoint: 

102 raise InvalidBundle("entry must contain inclusion proof, with checkpoint") 

103 

104 def __eq__(self, value: object) -> bool: 

105 """ 

106 Compares this `TransparencyLogEntry` with another object for equality. 

107 

108 Two `TransparencyLogEntry` instances are considered equal if their 

109 inner contents are equal. 

110 """ 

111 if not isinstance(value, TransparencyLogEntry): 

112 return NotImplemented 

113 return self._inner == value._inner 

114 

115 @classmethod 

116 def _from_v1_response(cls, dict_: dict[str, Any]) -> TransparencyLogEntry: 

117 """ 

118 Create a new `TransparencyLogEntry` from the given API response. 

119 """ 

120 

121 # Assumes we only get one entry back 

122 entries = list(dict_.items()) 

123 if len(entries) != 1: 

124 raise ValueError("Received multiple entries in response") 

125 _, entry = entries[0] 

126 

127 # Fill in the appropriate kind 

128 body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json( 

129 base64.b64decode(entry["body"]) 

130 ) 

131 if not isinstance(body_entry, (Hashedrekord, Dsse)): 

132 raise InvalidBundle("log entry is not of expected type") 

133 

134 raw_inclusion_proof = entry["verification"]["inclusionProof"] 

135 

136 # NOTE: The type ignores below are a consequence of our Pydantic 

137 # modeling: mypy and other typecheckers see `ProtoU64` as `int`, 

138 # but it gets coerced from a string due to Protobuf's JSON serialization. 

139 inner = _TransparencyLogEntry( 

140 log_index=str(entry["logIndex"]), # type: ignore[arg-type] 

141 log_id=common_v1.LogId( 

142 key_id=base64.b64encode(bytes.fromhex(entry["logID"])) 

143 ), 

144 kind_version=rekor_v1.KindVersion( 

145 kind=body_entry.kind, version=body_entry.api_version 

146 ), 

147 integrated_time=str(entry["integratedTime"]), # type: ignore[arg-type] 

148 inclusion_promise=rekor_v1.InclusionPromise( 

149 signed_entry_timestamp=entry["verification"]["signedEntryTimestamp"] 

150 ), 

151 inclusion_proof=rekor_v1.InclusionProof( 

152 log_index=str(raw_inclusion_proof["logIndex"]), # type: ignore[arg-type] 

153 root_hash=base64.b64encode( 

154 bytes.fromhex(raw_inclusion_proof["rootHash"]) 

155 ), 

156 tree_size=str(raw_inclusion_proof["treeSize"]), # type: ignore[arg-type] 

157 hashes=[ 

158 base64.b64encode(bytes.fromhex(h)) 

159 for h in raw_inclusion_proof["hashes"] 

160 ], 

161 checkpoint=rekor_v1.Checkpoint( 

162 envelope=raw_inclusion_proof["checkpoint"] 

163 ), 

164 ), 

165 canonicalized_body=entry["body"], 

166 ) 

167 

168 return cls(inner) 

169 

170 def _encode_canonical(self) -> bytes: 

171 """ 

172 Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry. 

173 

174 This encoded representation is suitable for verification against 

175 the Signed Entry Timestamp. 

176 """ 

177 # We might not have an integrated time if our log entry is from rekor 

178 # v2, i.e. was integrated synchronously instead of via an 

179 # inclusion promise. 

180 if self._inner.integrated_time is None: 

181 raise ValueError( 

182 "can't encode canonical form for SET without integrated time" 

183 ) 

184 

185 payload: dict[str, int | str] = { 

186 "body": base64.b64encode(self._inner.canonicalized_body).decode(), 

187 "integratedTime": self._inner.integrated_time, 

188 "logID": self._inner.log_id.key_id.hex(), 

189 "logIndex": self._inner.log_index, 

190 } 

191 

192 return rfc8785.dumps(payload) 

193 

194 def _verify_set(self, keyring: RekorKeyring) -> None: 

195 """ 

196 Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log 

197 `entry` using the given `keyring`. 

198 

199 Fails if the given log entry does not contain an inclusion promise. 

200 """ 

201 

202 if self._inner.inclusion_promise is None: 

203 raise VerificationError("SET: invalid inclusion promise: missing") 

204 

205 signed_entry_ts = self._inner.inclusion_promise.signed_entry_timestamp 

206 

207 try: 

208 keyring.verify( 

209 key_id=KeyID(self._inner.log_id.key_id), 

210 signature=signed_entry_ts, 

211 data=self._encode_canonical(), 

212 ) 

213 except VerificationError as exc: 

214 raise VerificationError(f"SET: invalid inclusion promise: {exc}") 

215 

216 def _verify(self, keyring: RekorKeyring) -> None: 

217 """ 

218 Verifies this log entry. 

219 

220 This method performs steps (5), (6), and optionally (7) in 

221 the top-level verify API: 

222 

223 * Verifies the consistency of the entry with the given bundle; 

224 * Verifies the Merkle inclusion proof and its signed checkpoint; 

225 * Verifies the inclusion promise, if present. 

226 """ 

227 

228 verify_merkle_inclusion(self) 

229 verify_checkpoint(keyring, self) 

230 

231 _logger.debug( 

232 f"successfully verified inclusion proof: index={self._inner.log_index}" 

233 ) 

234 

235 if self._inner.inclusion_promise and self._inner.integrated_time: 

236 self._verify_set(keyring) 

237 _logger.debug( 

238 f"successfully verified inclusion promise: index={self._inner.log_index}" 

239 ) 

240 

241 

242class TimestampVerificationData: 

243 """ 

244 Represents a TimestampVerificationData structure. 

245 

246 @private 

247 """ 

248 

249 def __init__(self, inner: _TimestampVerificationData) -> None: 

250 """Init method.""" 

251 self._inner = inner 

252 self._verify() 

253 

254 def _verify(self) -> None: 

255 """ 

256 Verifies the TimestampVerificationData. 

257 

258 It verifies that TimeStamp Responses embedded in the bundle are correctly 

259 formed. 

260 """ 

261 if not (timestamps := self._inner.rfc3161_timestamps): 

262 timestamps = [] 

263 

264 try: 

265 self._signed_ts = [ 

266 decode_timestamp_response(ts.signed_timestamp) for ts in timestamps 

267 ] 

268 except ValueError: 

269 raise VerificationError("Invalid Timestamp Response") 

270 

271 @property 

272 def rfc3161_timestamps(self) -> list[TimeStampResponse]: 

273 """Returns a list of signed timestamp.""" 

274 return self._signed_ts 

275 

276 @classmethod 

277 def from_json(cls, raw: str | bytes) -> TimestampVerificationData: 

278 """ 

279 Deserialize the given timestamp verification data. 

280 """ 

281 inner = _TimestampVerificationData.from_json(raw) 

282 return cls(inner) 

283 

284 

285class VerificationMaterial: 

286 """ 

287 Represents a VerificationMaterial structure. 

288 """ 

289 

290 def __init__(self, inner: _VerificationMaterial) -> None: 

291 """Init method.""" 

292 self._inner = inner 

293 

294 @property 

295 def timestamp_verification_data(self) -> TimestampVerificationData | None: 

296 """ 

297 Returns the Timestamp Verification Data, if present. 

298 """ 

299 if ( 

300 self._inner.timestamp_verification_data 

301 and self._inner.timestamp_verification_data.rfc3161_timestamps 

302 ): 

303 return TimestampVerificationData(self._inner.timestamp_verification_data) 

304 return None 

305 

306 

307class InvalidBundle(Error): 

308 """ 

309 Raised when the associated `Bundle` is invalid in some way. 

310 """ 

311 

312 def diagnostics(self) -> str: 

313 """Returns diagnostics for the error.""" 

314 

315 return dedent( 

316 f"""\ 

317 An issue occurred while parsing the Sigstore bundle. 

318 

319 The provided bundle is malformed and may have been modified maliciously. 

320 

321 Additional context: 

322 

323 {self} 

324 """ 

325 ) 

326 

327 

328class Bundle: 

329 """ 

330 Represents a Sigstore bundle. 

331 """ 

332 

333 class BundleType(str, Enum): 

334 """ 

335 Known Sigstore bundle media types. 

336 """ 

337 

338 BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1" 

339 BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2" 

340 BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3" 

341 BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json" 

342 

343 def __str__(self) -> str: 

344 """Returns the variant's string value.""" 

345 return self.value 

346 

347 def __init__(self, inner: _Bundle) -> None: 

348 """ 

349 Creates a new bundle. This is not a public API; use 

350 `from_json` instead. 

351 

352 @private 

353 """ 

354 self._inner = inner 

355 self._verify() 

356 

357 def _verify(self) -> None: 

358 """ 

359 Performs various feats of heroism to ensure the bundle is well-formed 

360 and upholds invariants, including: 

361 

362 * The "leaf" (signing) certificate is present; 

363 * There is a inclusion proof present, even if the Bundle's version 

364 predates a mandatory inclusion proof. 

365 """ 

366 

367 # The bundle must have a recognized media type. 

368 try: 

369 media_type = Bundle.BundleType(self._inner.media_type) 

370 except ValueError: 

371 raise InvalidBundle(f"unsupported bundle format: {self._inner.media_type}") 

372 

373 # Extract the signing certificate. 

374 if media_type in ( 

375 Bundle.BundleType.BUNDLE_0_3, 

376 Bundle.BundleType.BUNDLE_0_3_ALT, 

377 ): 

378 # For "v3" bundles, the signing certificate is the only one present. 

379 if not self._inner.verification_material.certificate: 

380 raise InvalidBundle("expected certificate in bundle") 

381 

382 leaf_cert = load_der_x509_certificate( 

383 self._inner.verification_material.certificate.raw_bytes 

384 ) 

385 else: 

386 # In older bundles, there is an entire pool (misleadingly called 

387 # a chain) of certificates, the first of which is the signing 

388 # certificate. 

389 if not self._inner.verification_material.x509_certificate_chain: 

390 raise InvalidBundle("expected certificate chain in bundle") 

391 

392 chain = self._inner.verification_material.x509_certificate_chain 

393 if not chain.certificates: 

394 raise InvalidBundle("expected non-empty certificate chain in bundle") 

395 

396 # Per client policy in protobuf-specs: the first entry in the chain 

397 # MUST be a leaf certificate, and the rest of the chain MUST NOT 

398 # include a root CA or any intermediate CAs that appear in an 

399 # independent root of trust. 

400 # 

401 # We expect some old bundles to violate the rules around root 

402 # and intermediate CAs, so we issue warnings and not hard errors 

403 # in those cases. 

404 leaf_cert, *chain_certs = ( 

405 load_der_x509_certificate(cert.raw_bytes) for cert in chain.certificates 

406 ) 

407 if not cert_is_leaf(leaf_cert): 

408 raise InvalidBundle( 

409 "bundle contains an invalid leaf or non-leaf certificate in the leaf position" 

410 ) 

411 

412 for chain_cert in chain_certs: 

413 # TODO: We should also retrieve the root of trust here and 

414 # cross-check against it. 

415 if cert_is_root_ca(chain_cert): 

416 _logger.warning( 

417 "this bundle contains a root CA, making it subject to misuse" 

418 ) 

419 

420 self._signing_certificate = leaf_cert 

421 

422 # Extract the log entry. For the time being, we expect 

423 # bundles to only contain a single log entry. 

424 tlog_entries = self._inner.verification_material.tlog_entries 

425 if len(tlog_entries) != 1: 

426 raise InvalidBundle("expected exactly one log entry in bundle") 

427 tlog_entry = tlog_entries[0] 

428 

429 # Handling of inclusion promises and proofs varies between bundle 

430 # format versions: 

431 # 

432 # * For 0.1, an inclusion promise is required; the client 

433 # MUST verify the inclusion promise. 

434 # The inclusion proof is NOT required. If provided, it might NOT 

435 # contain a checkpoint; in this case, we ignore it (since it's 

436 # useless without one). 

437 # 

438 # * For 0.2+, an inclusion proof is required; the client MUST 

439 # verify the inclusion proof. The inclusion prof MUST contain 

440 # a checkpoint. 

441 # 

442 # The inclusion promise is NOT required if another source of signed 

443 # time (such as a signed timestamp) is present. If no other source 

444 # of signed time is present, then the inclusion promise MUST be 

445 # present. 

446 # 

447 # Before all of this, we require that the inclusion proof be present 

448 # (when constructing the LogEntry). 

449 log_entry = TransparencyLogEntry(tlog_entry) 

450 

451 if media_type == Bundle.BundleType.BUNDLE_0_1: 

452 if not log_entry._inner.inclusion_promise: 

453 raise InvalidBundle("bundle must contain an inclusion promise") 

454 if not log_entry._inner.inclusion_proof.checkpoint: 

455 _logger.debug( 

456 "0.1 bundle contains inclusion proof without checkpoint; ignoring" 

457 ) 

458 else: 

459 if not log_entry._inner.inclusion_proof.checkpoint: 

460 raise InvalidBundle("expected checkpoint in inclusion proof") 

461 

462 if ( 

463 not log_entry._inner.inclusion_promise 

464 and not self.verification_material.timestamp_verification_data 

465 ): 

466 raise InvalidBundle( 

467 "bundle must contain an inclusion promise or signed timestamp(s)" 

468 ) 

469 

470 self._log_entry = log_entry 

471 

472 @property 

473 def signing_certificate(self) -> Certificate: 

474 """Returns the bundle's contained signing (i.e. leaf) certificate.""" 

475 return self._signing_certificate 

476 

477 @property 

478 def log_entry(self) -> TransparencyLogEntry: 

479 """ 

480 Returns the bundle's log entry, containing an inclusion proof 

481 (with checkpoint) and an inclusion promise (if the latter is present). 

482 """ 

483 return self._log_entry 

484 

485 @property 

486 def _dsse_envelope(self) -> dsse.Envelope | None: 

487 """ 

488 Returns the DSSE envelope within this Bundle as a `dsse.Envelope`. 

489 

490 @private 

491 """ 

492 if self._inner.dsse_envelope is not None: 

493 return dsse.Envelope(self._inner.dsse_envelope) 

494 return None 

495 

496 @property 

497 def signature(self) -> bytes: 

498 """ 

499 Returns the signature bytes of this bundle. 

500 Either from the DSSE Envelope or from the message itself. 

501 """ 

502 return ( 

503 self._dsse_envelope.signature 

504 if self._dsse_envelope 

505 else self._inner.message_signature.signature # type: ignore[union-attr] 

506 ) 

507 

508 @property 

509 def verification_material(self) -> VerificationMaterial: 

510 """ 

511 Returns the bundle's verification material. 

512 """ 

513 return VerificationMaterial(self._inner.verification_material) 

514 

515 @classmethod 

516 def from_json(cls, raw: bytes | str) -> Bundle: 

517 """ 

518 Deserialize the given Sigstore bundle. 

519 """ 

520 try: 

521 inner = _Bundle.from_json(raw) 

522 except ValueError as exc: 

523 raise InvalidBundle(f"failed to load bundle: {exc}") 

524 return cls(inner) 

525 

526 def to_json(self) -> str: 

527 """ 

528 Return a JSON encoding of this bundle. 

529 """ 

530 return self._inner.to_json() 

531 

532 def _to_parts( 

533 self, 

534 ) -> tuple[Certificate, MessageSignature | dsse.Envelope, TransparencyLogEntry]: 

535 """ 

536 Decompose the `Bundle` into its core constituent parts. 

537 

538 @private 

539 """ 

540 

541 content: MessageSignature | dsse.Envelope 

542 if self._dsse_envelope: 

543 content = self._dsse_envelope 

544 else: 

545 content = self._inner.message_signature # type: ignore[assignment] 

546 

547 return (self.signing_certificate, content, self.log_entry) 

548 

549 @classmethod 

550 def from_parts( 

551 cls, cert: Certificate, sig: bytes, log_entry: TransparencyLogEntry 

552 ) -> Bundle: 

553 """ 

554 Construct a Sigstore bundle (of `hashedrekord` type) from its 

555 constituent parts. 

556 """ 

557 

558 return cls._from_parts( 

559 cert, MessageSignature(signature=base64.b64encode(sig)), log_entry 

560 ) 

561 

562 @classmethod 

563 def _from_parts( 

564 cls, 

565 cert: Certificate, 

566 content: MessageSignature | dsse.Envelope, 

567 log_entry: TransparencyLogEntry, 

568 signed_timestamp: list[TimeStampResponse] | None = None, 

569 ) -> Bundle: 

570 """ 

571 @private 

572 """ 

573 

574 timestamp_verifcation_data = bundle_v1.TimestampVerificationData( 

575 rfc3161_timestamps=[] 

576 ) 

577 if signed_timestamp is not None: 

578 timestamp_verifcation_data.rfc3161_timestamps.extend( 

579 [ 

580 RFC3161SignedTimestamp( 

581 signed_timestamp=base64.b64encode(response.as_bytes()) 

582 ) 

583 for response in signed_timestamp 

584 ] 

585 ) 

586 

587 # Fill in the appropriate variant. 

588 message_signature = None 

589 dsse_envelope = None 

590 if isinstance(content, MessageSignature): 

591 message_signature = content 

592 else: 

593 dsse_envelope = content._inner 

594 

595 inner = _Bundle( 

596 media_type=Bundle.BundleType.BUNDLE_0_3.value, 

597 verification_material=bundle_v1.VerificationMaterial( 

598 certificate=common_v1.X509Certificate( 

599 raw_bytes=base64.b64encode(cert.public_bytes(Encoding.DER)) 

600 ), 

601 tlog_entries=[log_entry._inner], 

602 timestamp_verification_data=timestamp_verifcation_data, 

603 ), 

604 message_signature=message_signature, 

605 dsse_envelope=dsse_envelope, 

606 ) 

607 

608 return cls(inner) 

609 

610 

611class SigningConfig: 

612 """ 

613 Signing configuration for a Sigstore instance. 

614 """ 

615 

616 class SigningConfigType(str, Enum): 

617 """ 

618 Known Sigstore signing config media types. 

619 """ 

620 

621 SIGNING_CONFIG_0_2 = "application/vnd.dev.sigstore.signingconfig.v0.2+json" 

622 

623 def __str__(self) -> str: 

624 """Returns the variant's string value.""" 

625 return self.value 

626 

627 def __init__( 

628 self, inner: trustroot_v1.SigningConfig, tlog_version: int | None = None 

629 ): 

630 """ 

631 Construct a new `SigningConfig`. 

632 

633 tlog_version is an optional argument that enforces that only specified 

634 versions of rekor are included in the transparency logs. 

635 

636 @api private 

637 """ 

638 self._inner = inner 

639 

640 # must have a recognized media type. 

641 try: 

642 SigningConfig.SigningConfigType(self._inner.media_type) 

643 except ValueError: 

644 raise Error(f"unsupported signing config format: {self._inner.media_type}") 

645 

646 # Create lists of service protos that are valid, selected by the service 

647 # configuration & supported by this client 

648 if tlog_version is None: 

649 tlog_versions = REKOR_VERSIONS 

650 else: 

651 tlog_versions = [tlog_version] 

652 

653 self._tlogs = self._get_valid_services( 

654 self._inner.rekor_tlog_urls, tlog_versions, self._inner.rekor_tlog_config 

655 ) 

656 if not self._tlogs: 

657 raise Error("No valid Rekor transparency log found in signing config") 

658 

659 self._tsas = self._get_valid_services( 

660 self._inner.tsa_urls, TSA_VERSIONS, self._inner.tsa_config 

661 ) 

662 

663 self._fulcios = self._get_valid_services( 

664 self._inner.ca_urls, FULCIO_VERSIONS, None 

665 ) 

666 if not self._fulcios: 

667 raise Error("No valid Fulcio CA found in signing config") 

668 

669 self._oidcs = self._get_valid_services( 

670 self._inner.oidc_urls, OIDC_VERSIONS, None 

671 ) 

672 

673 @classmethod 

674 def from_file( 

675 cls, 

676 path: str, 

677 ) -> SigningConfig: 

678 """Create a new signing config from file""" 

679 inner = trustroot_v1.SigningConfig.from_json(Path(path).read_bytes()) 

680 return cls(inner) 

681 

682 @staticmethod 

683 def _get_valid_services( 

684 services: list[trustroot_v1.Service], 

685 supported_versions: list[int], 

686 config: trustroot_v1.ServiceConfiguration | None, 

687 ) -> list[trustroot_v1.Service]: 

688 """Return supported services, taking SigningConfig restrictions into account""" 

689 

690 # split services by operator, only include valid services 

691 services_by_operator: dict[str, list[trustroot_v1.Service]] = defaultdict(list) 

692 for service in services: 

693 if service.major_api_version not in supported_versions: 

694 continue 

695 

696 if not is_timerange_valid(service.valid_for, allow_expired=False): 

697 continue 

698 

699 services_by_operator[service.operator].append(service) 

700 

701 # build a list of services but make sure we only include one service per operator 

702 # and use the highest version available for that operator 

703 result: list[trustroot_v1.Service] = [] 

704 for op_services in services_by_operator.values(): 

705 op_services.sort(key=lambda s: s.major_api_version) 

706 result.append(op_services[-1]) 

707 

708 # Depending on ServiceSelector, prune the result list 

709 if not config or config.selector == trustroot_v1.ServiceSelector.ALL: 

710 return result 

711 

712 # handle EXACT and ANY selectors 

713 count = ( 

714 config.count 

715 if config.selector == trustroot_v1.ServiceSelector.EXACT and config.count 

716 else 1 

717 ) 

718 

719 if ( 

720 config.selector == trustroot_v1.ServiceSelector.EXACT 

721 and len(result) < count 

722 ): 

723 raise ValueError( 

724 f"Expected {count} services in signing config, found {len(result)}" 

725 ) 

726 

727 return result[:count] 

728 

729 def get_tlogs(self) -> list[RekorLogSubmitter]: 

730 """ 

731 Returns the rekor transparency log clients to sign with. 

732 """ 

733 result: list[RekorLogSubmitter] = [] 

734 for tlog in self._tlogs: 

735 if tlog.major_api_version == 1: 

736 from sigstore._internal.rekor.client import RekorClient 

737 

738 result.append(RekorClient(tlog.url)) 

739 elif tlog.major_api_version == 2: 

740 from sigstore._internal.rekor.client_v2 import RekorV2Client 

741 

742 result.append(RekorV2Client(tlog.url)) 

743 else: 

744 raise AssertionError(f"Unexpected Rekor v{tlog.major_api_version}") 

745 return result 

746 

747 def get_fulcio(self) -> FulcioClient: 

748 """ 

749 Returns a Fulcio client to get a signing certificate from 

750 """ 

751 return FulcioClient(self._fulcios[0].url) 

752 

753 def get_oidc_url(self) -> str: 

754 """ 

755 Returns url for the OIDC provider that client should use to interactively 

756 authenticate. 

757 """ 

758 if not self._oidcs: 

759 raise Error("No valid OIDC provider found in signing config") 

760 return self._oidcs[0].url 

761 

762 def get_tsas(self) -> list[TimestampAuthorityClient]: 

763 """ 

764 Returns timestamp authority clients for urls configured in signing config. 

765 """ 

766 return [TimestampAuthorityClient(s.url) for s in self._tsas] 

767 

768 

769class TrustedRoot: 

770 """ 

771 The cryptographic root(s) of trust for a Sigstore instance. 

772 """ 

773 

774 class TrustedRootType(str, Enum): 

775 """ 

776 Known Sigstore trusted root media types. 

777 """ 

778 

779 TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1" 

780 

781 def __str__(self) -> str: 

782 """Returns the variant's string value.""" 

783 return self.value 

784 

785 def __init__(self, inner: trustroot_v1.TrustedRoot): 

786 """ 

787 Construct a new `TrustedRoot`. 

788 

789 @api private 

790 """ 

791 self._inner = inner 

792 self._verify() 

793 

794 def _verify(self) -> None: 

795 """ 

796 Performs various feats of heroism to ensure that the trusted root 

797 is well-formed. 

798 """ 

799 

800 # The trusted root must have a recognized media type. 

801 try: 

802 TrustedRoot.TrustedRootType(self._inner.media_type) 

803 except ValueError: 

804 raise Error(f"unsupported trusted root format: {self._inner.media_type}") 

805 

806 @classmethod 

807 def from_file( 

808 cls, 

809 path: str, 

810 ) -> TrustedRoot: 

811 """Create a new trust root from file""" 

812 inner = trustroot_v1.TrustedRoot.from_json(Path(path).read_bytes()) 

813 return cls(inner) 

814 

815 def _get_tlog_keys( 

816 self, tlogs: list[trustroot_v1.TransparencyLogInstance], purpose: KeyringPurpose 

817 ) -> Iterable[common_v1.PublicKey]: 

818 """ 

819 Yields an iterator of public keys for transparency log instances that 

820 are suitable for `purpose`. 

821 """ 

822 allow_expired = purpose is KeyringPurpose.VERIFY 

823 for tlog in tlogs: 

824 if not is_timerange_valid( 

825 tlog.public_key.valid_for, allow_expired=allow_expired 

826 ): 

827 continue 

828 

829 yield tlog.public_key 

830 

831 def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring: 

832 """Return keyring with keys for Rekor.""" 

833 

834 keys: list[common_v1.PublicKey] = list( 

835 self._get_tlog_keys(self._inner.tlogs, purpose) 

836 ) 

837 if len(keys) == 0: 

838 raise MetadataError("Did not find any Rekor keys in trusted root") 

839 return RekorKeyring(Keyring(keys)) 

840 

841 def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring: 

842 """Return keyring with key for CTFE.""" 

843 ctfes: list[common_v1.PublicKey] = list( 

844 self._get_tlog_keys(self._inner.ctlogs, purpose) 

845 ) 

846 if not ctfes: 

847 raise MetadataError("CTFE keys not found in trusted root") 

848 return CTKeyring(Keyring(ctfes)) 

849 

850 def get_fulcio_certs(self) -> list[Certificate]: 

851 """Return the Fulcio certificates.""" 

852 

853 certs: list[Certificate] = [] 

854 

855 # Return expired certificates too: they are expired now but may have 

856 # been active when the certificate was used to sign. 

857 for authority in self._inner.certificate_authorities: 

858 certificate_authority = CertificateAuthority(authority) 

859 certs.extend(certificate_authority.certificates(allow_expired=True)) 

860 

861 if not certs: 

862 raise MetadataError("Fulcio certificates not found in trusted root") 

863 return certs 

864 

865 def get_timestamp_authorities(self) -> list[CertificateAuthority]: 

866 """ 

867 Return the TSA present in the trusted root. 

868 

869 This list may be empty and in this case, no timestamp verification can be 

870 performed. 

871 """ 

872 certificate_authorities: list[CertificateAuthority] = [ 

873 CertificateAuthority(cert_chain) 

874 for cert_chain in self._inner.timestamp_authorities 

875 ] 

876 return certificate_authorities 

877 

878 

879class ClientTrustConfig: 

880 """ 

881 Represents a Sigstore client's trust configuration, including a root of trust. 

882 """ 

883 

884 class ClientTrustConfigType(str, Enum): 

885 """ 

886 Known Sigstore client trust config media types. 

887 """ 

888 

889 CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json" 

890 

891 def __str__(self) -> str: 

892 """Returns the variant's string value.""" 

893 return self.value 

894 

895 @classmethod 

896 def from_json(cls, raw: str) -> ClientTrustConfig: 

897 """ 

898 Deserialize the given client trust config. 

899 """ 

900 inner = trustroot_v1.ClientTrustConfig.from_json(raw) 

901 return cls(inner) 

902 

903 @classmethod 

904 def production( 

905 cls, 

906 offline: bool = False, 

907 ) -> ClientTrustConfig: 

908 """Create new trust config from Sigstore production TUF repository. 

909 

910 If `offline`, will use data in local TUF cache. Otherwise will 

911 update the data from remote TUF repository. 

912 """ 

913 return cls.from_tuf(DEFAULT_TUF_URL, offline) 

914 

915 @classmethod 

916 def staging( 

917 cls, 

918 offline: bool = False, 

919 ) -> ClientTrustConfig: 

920 """Create new trust config from Sigstore staging TUF repository. 

921 

922 If `offline`, will use data in local TUF cache. Otherwise will 

923 update the data from remote TUF repository. 

924 """ 

925 return cls.from_tuf(STAGING_TUF_URL, offline) 

926 

927 @classmethod 

928 def from_tuf( 

929 cls, 

930 url: str, 

931 offline: bool = False, 

932 ) -> ClientTrustConfig: 

933 """Create a new trust config from a TUF repository. 

934 

935 If `offline`, will use data in local TUF cache. Otherwise will 

936 update the trust config from remote TUF repository. 

937 """ 

938 updater = TrustUpdater(url, offline) 

939 

940 tr_path = updater.get_trusted_root_path() 

941 inner_tr = trustroot_v1.TrustedRoot.from_json(Path(tr_path).read_bytes()) 

942 

943 try: 

944 sc_path = updater.get_signing_config_path() 

945 inner_sc = trustroot_v1.SigningConfig.from_json(Path(sc_path).read_bytes()) 

946 except TUFError as e: 

947 raise e 

948 

949 return cls( 

950 trustroot_v1.ClientTrustConfig( 

951 media_type=ClientTrustConfig.ClientTrustConfigType.CONFIG_0_1.value, 

952 trusted_root=inner_tr, 

953 signing_config=inner_sc, 

954 ) 

955 ) 

956 

957 def __init__(self, inner: trustroot_v1.ClientTrustConfig) -> None: 

958 """ 

959 @api private 

960 """ 

961 self._inner = inner 

962 

963 # This can be used to enforce a specific rekor major version in signingconfig 

964 self.force_tlog_version: int | None = None 

965 

966 @property 

967 def trusted_root(self) -> TrustedRoot: 

968 """ 

969 Return the interior root of trust, as a `TrustedRoot`. 

970 """ 

971 return TrustedRoot(self._inner.trusted_root) 

972 

973 @property 

974 def signing_config(self) -> SigningConfig: 

975 """ 

976 Return the interior root of trust, as a `SigningConfig`. 

977 """ 

978 return SigningConfig( 

979 self._inner.signing_config, tlog_version=self.force_tlog_version 

980 )