Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/models.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16Common models shared between signing and verification.
17"""
19from __future__ import annotations
21import base64
22import logging
23from collections import defaultdict
24from collections.abc import Iterable
25from enum import Enum
26from pathlib import Path
27from textwrap import dedent
28from typing import Any
30import rfc8785
31from cryptography.hazmat.primitives.serialization import Encoding
32from cryptography.x509 import (
33 Certificate,
34 load_der_x509_certificate,
35)
36from pydantic import TypeAdapter
37from rekor_types import Dsse, Hashedrekord, ProposedEntry
38from rfc3161_client import TimeStampResponse, decode_timestamp_response
39from sigstore_models.bundle import v1 as bundle_v1
40from sigstore_models.bundle.v1 import Bundle as _Bundle
41from sigstore_models.bundle.v1 import (
42 TimestampVerificationData as _TimestampVerificationData,
43)
44from sigstore_models.bundle.v1 import VerificationMaterial as _VerificationMaterial
45from sigstore_models.common import v1 as common_v1
46from sigstore_models.common.v1 import MessageSignature, RFC3161SignedTimestamp
47from sigstore_models.rekor import v1 as rekor_v1
48from sigstore_models.rekor.v1 import TransparencyLogEntry as _TransparencyLogEntry
49from sigstore_models.trustroot import v1 as trustroot_v1
51from sigstore import dsse
52from sigstore._internal.fulcio.client import FulcioClient
53from sigstore._internal.merkle import verify_merkle_inclusion
54from sigstore._internal.rekor import RekorLogSubmitter
55from sigstore._internal.rekor.checkpoint import verify_checkpoint
56from sigstore._internal.timestamp import TimestampAuthorityClient
57from sigstore._internal.trust import (
58 CertificateAuthority,
59 CTKeyring,
60 Keyring,
61 KeyringPurpose,
62 RekorKeyring,
63)
64from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
65from sigstore._utils import KeyID, cert_is_leaf, cert_is_root_ca, is_timerange_valid
66from sigstore.errors import Error, MetadataError, TUFError, VerificationError
# Major API versions of each Sigstore service that this client supports.
REKOR_VERSIONS: list[int] = [1, 2]
TSA_VERSIONS: list[int] = [1]
FULCIO_VERSIONS: list[int] = [1]
OIDC_VERSIONS: list[int] = [1]


# Module-level logger shared by the models defined in this file.
_logger = logging.getLogger(__name__)
class TransparencyLogEntry:
    """
    Represents a transparency log entry.
    """

    def __init__(self, inner: _TransparencyLogEntry) -> None:
        """
        Creates a new `TransparencyLogEntry` from the given inner object.

        The entry is validated immediately; see `_validate` for the
        invariants that are enforced.

        @private
        """
        self._inner = inner
        self._validate()

    def _validate(self) -> None:
        """
        Ensure this transparency log entry is well-formed and upholds our
        client invariants.

        Raises `InvalidBundle` if the entry has no inclusion proof, or if
        the inclusion proof has no checkpoint.
        """

        inclusion_proof: rekor_v1.InclusionProof | None = self._inner.inclusion_proof
        # This check is required by us as the client, not the
        # protobuf-specs themselves.
        if not inclusion_proof or not inclusion_proof.checkpoint:
            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")

    def __eq__(self, value: object) -> bool:
        """
        Compares this `TransparencyLogEntry` with another object for equality.

        Two `TransparencyLogEntry` instances are considered equal if their
        inner contents are equal. Comparison against any other type returns
        `NotImplemented`.
        """
        if not isinstance(value, TransparencyLogEntry):
            return NotImplemented
        return self._inner == value._inner

    @classmethod
    def _from_v1_response(cls, dict_: dict[str, Any]) -> TransparencyLogEntry:
        """
        Create a new `TransparencyLogEntry` from the given API response.

        `dict_` must map exactly one key to the entry's contents.

        Raises `ValueError` if `dict_` does not contain exactly one entry,
        and `InvalidBundle` if the entry body is neither a `Hashedrekord`
        nor a `Dsse` entry.
        """

        # We only know how to handle a response with a single entry; report
        # the actual count so that an empty response is not misdescribed as
        # containing "multiple" entries.
        entries = list(dict_.items())
        if len(entries) != 1:
            raise ValueError(
                f"Expected exactly one entry in response, got {len(entries)}"
            )
        _, entry = entries[0]

        # Fill in the appropriate kind
        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
            base64.b64decode(entry["body"])
        )
        if not isinstance(body_entry, (Hashedrekord, Dsse)):
            raise InvalidBundle("log entry is not of expected type")

        raw_inclusion_proof = entry["verification"]["inclusionProof"]

        # NOTE: The type ignores below are a consequence of our Pydantic
        # modeling: mypy and other typecheckers see `ProtoU64` as `int`,
        # but it gets coerced from a string due to Protobuf's JSON serialization.
        inner = _TransparencyLogEntry(
            log_index=str(entry["logIndex"]),  # type: ignore[arg-type]
            log_id=common_v1.LogId(
                key_id=base64.b64encode(bytes.fromhex(entry["logID"]))
            ),
            kind_version=rekor_v1.KindVersion(
                kind=body_entry.kind, version=body_entry.api_version
            ),
            integrated_time=str(entry["integratedTime"]),  # type: ignore[arg-type]
            inclusion_promise=rekor_v1.InclusionPromise(
                signed_entry_timestamp=entry["verification"]["signedEntryTimestamp"]
            ),
            inclusion_proof=rekor_v1.InclusionProof(
                log_index=str(raw_inclusion_proof["logIndex"]),  # type: ignore[arg-type]
                root_hash=base64.b64encode(
                    bytes.fromhex(raw_inclusion_proof["rootHash"])
                ),
                tree_size=str(raw_inclusion_proof["treeSize"]),  # type: ignore[arg-type]
                hashes=[
                    base64.b64encode(bytes.fromhex(h))
                    for h in raw_inclusion_proof["hashes"]
                ],
                checkpoint=rekor_v1.Checkpoint(
                    envelope=raw_inclusion_proof["checkpoint"]
                ),
            ),
            canonicalized_body=entry["body"],
        )

        return cls(inner)

    def _encode_canonical(self) -> bytes:
        """
        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.

        This encoded representation is suitable for verification against
        the Signed Entry Timestamp.

        Raises `ValueError` if the entry has no integrated time.
        """
        # We might not have an integrated time if our log entry is from rekor
        # v2, i.e. was integrated synchronously instead of via an
        # inclusion promise.
        if self._inner.integrated_time is None:
            raise ValueError(
                "can't encode canonical form for SET without integrated time"
            )

        payload: dict[str, int | str] = {
            "body": base64.b64encode(self._inner.canonicalized_body).decode(),
            "integratedTime": self._inner.integrated_time,
            "logID": self._inner.log_id.key_id.hex(),
            "logIndex": self._inner.log_index,
        }

        return rfc8785.dumps(payload)

    def _verify_set(self, keyring: RekorKeyring) -> None:
        """
        Verify the inclusion promise (Signed Entry Timestamp) for this
        transparency log entry using the given `keyring`.

        Raises `VerificationError` if the entry does not contain an
        inclusion promise, or if the keyring rejects the signature.
        """

        if self._inner.inclusion_promise is None:
            raise VerificationError("SET: invalid inclusion promise: missing")

        signed_entry_ts = self._inner.inclusion_promise.signed_entry_timestamp

        try:
            keyring.verify(
                key_id=KeyID(self._inner.log_id.key_id),
                signature=signed_entry_ts,
                data=self._encode_canonical(),
            )
        except VerificationError as exc:
            # Chain explicitly so the keyring's error is recorded as the
            # direct cause of the SET failure.
            raise VerificationError(f"SET: invalid inclusion promise: {exc}") from exc

    def _verify(self, keyring: RekorKeyring) -> None:
        """
        Verifies this log entry.

        This method performs steps (5), (6), and optionally (7) in
        the top-level verify API:

        * Verifies the consistency of the entry with the given bundle;
        * Verifies the Merkle inclusion proof and its signed checkpoint;
        * Verifies the inclusion promise, if present.
        """

        verify_merkle_inclusion(self)
        verify_checkpoint(keyring, self)

        _logger.debug(
            f"successfully verified inclusion proof: index={self._inner.log_index}"
        )

        # The SET can only be checked when both the promise and the
        # integrated time (needed for the canonical encoding) are present.
        if self._inner.inclusion_promise and self._inner.integrated_time:
            self._verify_set(keyring)
            _logger.debug(
                f"successfully verified inclusion promise: index={self._inner.log_index}"
            )
class TimestampVerificationData:
    """
    Represents a TimestampVerificationData structure.

    @private
    """

    def __init__(self, inner: _TimestampVerificationData) -> None:
        """
        Wraps `inner` and immediately decodes its embedded timestamps.

        Raises `VerificationError` if any timestamp cannot be decoded.
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Verifies the TimestampVerificationData.

        It verifies that TimeStamp Responses embedded in the bundle are correctly
        formed, and stores the decoded responses for `rfc3161_timestamps`.

        Raises `VerificationError` if decoding a response fails with `ValueError`.
        """
        # A missing or empty timestamp list is treated as "no timestamps".
        timestamps = self._inner.rfc3161_timestamps or []

        try:
            self._signed_ts = [
                decode_timestamp_response(ts.signed_timestamp) for ts in timestamps
            ]
        except ValueError as exc:
            # Chain explicitly so the decoding failure is kept as the cause.
            raise VerificationError("Invalid Timestamp Response") from exc

    @property
    def rfc3161_timestamps(self) -> list[TimeStampResponse]:
        """Returns the list of decoded signed timestamps."""
        return self._signed_ts

    @classmethod
    def from_json(cls, raw: str | bytes) -> TimestampVerificationData:
        """
        Deserialize the given timestamp verification data.
        """
        inner = _TimestampVerificationData.from_json(raw)
        return cls(inner)
class VerificationMaterial:
    """
    Wrapper around a bundle's VerificationMaterial structure.
    """

    def __init__(self, inner: _VerificationMaterial) -> None:
        """Stores the wrapped inner structure."""
        self._inner = inner

    @property
    def timestamp_verification_data(self) -> TimestampVerificationData | None:
        """
        Returns the Timestamp Verification Data, or `None` when the inner
        structure has none or its list of RFC 3161 timestamps is empty.
        """
        raw_data = self._inner.timestamp_verification_data
        if not raw_data or not raw_data.rfc3161_timestamps:
            return None
        return TimestampVerificationData(raw_data)
class InvalidBundle(Error):
    """
    Raised when the associated `Bundle` is invalid in some way.
    """

    def diagnostics(self) -> str:
        """Returns diagnostics for the error."""

        # Dedent the static template on its own and append the message
        # afterwards. Interpolating the message before `dedent` lets a
        # multi-line message (whose continuation lines carry no indentation)
        # reduce the common margin to zero, leaving the template indented.
        preamble = dedent(
            """\
            An issue occurred while parsing the Sigstore bundle.

            The provided bundle is malformed and may have been modified maliciously.

            Additional context:

            """
        )
        return f"{preamble}{self}\n"
class IncompatibleEntry(InvalidBundle):
    """
    Raised when the log entry within the `Bundle` has an incompatible KindVersion.
    """

    def diagnostics(self) -> str:
        """Returns diagnostics for the error."""

        # Dedent the static template on its own and append the message
        # afterwards. Interpolating the message before `dedent` lets a
        # multi-line message (whose continuation lines carry no indentation)
        # reduce the common margin to zero, leaving the template indented.
        preamble = dedent(
            """\
            The provided bundle contains a transparency log entry that is incompatible with this version of sigstore-python. Please upgrade your verifying client.

            Additional context:

            """
        )
        return f"{preamble}{self}\n"
class Bundle:
    """
    Represents a Sigstore bundle.
    """

    class BundleType(str, Enum):
        """
        Known Sigstore bundle media types.
        """

        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: _Bundle) -> None:
        """
        Creates a new bundle. This is not a public API; use
        `from_json` instead.

        The bundle is checked by `_verify` on construction.

        @private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Performs various feats of heroism to ensure the bundle is well-formed
        and upholds invariants, including:

        * The "leaf" (signing) certificate is present;
        * There is an inclusion proof present, even if the Bundle's version
           predates a mandatory inclusion proof.

        Raises `InvalidBundle` (or its subclass `IncompatibleEntry`) when an
        invariant is violated.
        """

        # The bundle must have a recognized media type.
        try:
            media_type = Bundle.BundleType(self._inner.media_type)
        except ValueError as exc:
            raise InvalidBundle(
                f"unsupported bundle format: {self._inner.media_type}"
            ) from exc

        # Extract the signing certificate.
        if media_type in (
            Bundle.BundleType.BUNDLE_0_3,
            Bundle.BundleType.BUNDLE_0_3_ALT,
        ):
            # For "v3" bundles, the signing certificate is the only one present.
            if not self._inner.verification_material.certificate:
                raise InvalidBundle("expected certificate in bundle")

            leaf_cert = load_der_x509_certificate(
                self._inner.verification_material.certificate.raw_bytes
            )
        else:
            # In older bundles, there is an entire pool (misleadingly called
            # a chain) of certificates, the first of which is the signing
            # certificate.
            if not self._inner.verification_material.x509_certificate_chain:
                raise InvalidBundle("expected certificate chain in bundle")

            chain = self._inner.verification_material.x509_certificate_chain
            if not chain.certificates:
                raise InvalidBundle("expected non-empty certificate chain in bundle")

            # Per client policy in protobuf-specs: the first entry in the chain
            # MUST be a leaf certificate, and the rest of the chain MUST NOT
            # include a root CA or any intermediate CAs that appear in an
            # independent root of trust.
            #
            # We expect some old bundles to violate the rules around root
            # and intermediate CAs, so we issue warnings and not hard errors
            # in those cases.
            leaf_cert, *chain_certs = (
                load_der_x509_certificate(cert.raw_bytes) for cert in chain.certificates
            )
            if not cert_is_leaf(leaf_cert):
                raise InvalidBundle(
                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
                )

            for chain_cert in chain_certs:
                # TODO: We should also retrieve the root of trust here and
                # cross-check against it.
                if cert_is_root_ca(chain_cert):
                    _logger.warning(
                        "this bundle contains a root CA, making it subject to misuse"
                    )

        self._signing_certificate = leaf_cert

        # Extract the log entry. For the time being, we expect
        # bundles to only contain a single log entry.
        tlog_entries = self._inner.verification_material.tlog_entries
        if len(tlog_entries) != 1:
            raise InvalidBundle("expected exactly one log entry in bundle")
        tlog_entry = tlog_entries[0]

        if tlog_entry.kind_version.version not in ["0.0.1", "0.0.2"]:
            raise IncompatibleEntry(
                f"Expected log entry version 0.0.1 - 0.0.2, got {tlog_entry.kind_version.version}"
            )

        # Handling of inclusion promises and proofs varies between bundle
        # format versions:
        #
        # * For 0.1, an inclusion promise is required; the client
        #   MUST verify the inclusion promise.
        #   The inclusion proof is NOT required. If provided, it might NOT
        #   contain a checkpoint; in this case, we ignore it (since it's
        #   useless without one).
        #
        # * For 0.2+, an inclusion proof is required; the client MUST
        #   verify the inclusion proof. The inclusion proof MUST contain
        #   a checkpoint.
        #
        #   The inclusion promise is NOT required if another source of signed
        #   time (such as a signed timestamp) is present. If no other source
        #   of signed time is present, then the inclusion promise MUST be
        #   present.
        #
        # Before all of this, we require that the inclusion proof be present
        # (when constructing the LogEntry).
        log_entry = TransparencyLogEntry(tlog_entry)

        if media_type == Bundle.BundleType.BUNDLE_0_1:
            if not log_entry._inner.inclusion_promise:
                raise InvalidBundle("bundle must contain an inclusion promise")
            if not log_entry._inner.inclusion_proof.checkpoint:
                _logger.debug(
                    "0.1 bundle contains inclusion proof without checkpoint; ignoring"
                )
        else:
            if not log_entry._inner.inclusion_proof.checkpoint:
                raise InvalidBundle("expected checkpoint in inclusion proof")

            if (
                not log_entry._inner.inclusion_promise
                and not self.verification_material.timestamp_verification_data
            ):
                raise InvalidBundle(
                    "bundle must contain an inclusion promise or signed timestamp(s)"
                )

        self._log_entry = log_entry

    @property
    def signing_certificate(self) -> Certificate:
        """Returns the bundle's contained signing (i.e. leaf) certificate."""
        return self._signing_certificate

    @property
    def log_entry(self) -> TransparencyLogEntry:
        """
        Returns the bundle's log entry, containing an inclusion proof
        (with checkpoint) and an inclusion promise (if the latter is present).
        """
        return self._log_entry

    @property
    def _dsse_envelope(self) -> dsse.Envelope | None:
        """
        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`,
        or `None` if the bundle does not carry one.

        @private
        """
        if self._inner.dsse_envelope is not None:
            return dsse.Envelope(self._inner.dsse_envelope)
        return None

    @property
    def signature(self) -> bytes:
        """
        Returns the signature bytes of this bundle.
        Either from the DSSE Envelope or from the message itself.
        """
        # Build the envelope wrapper once rather than once per access.
        envelope = self._dsse_envelope
        if envelope:
            return envelope.signature
        return self._inner.message_signature.signature  # type: ignore[union-attr]

    @property
    def verification_material(self) -> VerificationMaterial:
        """
        Returns the bundle's verification material.
        """
        return VerificationMaterial(self._inner.verification_material)

    @classmethod
    def from_json(cls, raw: bytes | str) -> Bundle:
        """
        Deserialize the given Sigstore bundle.

        Raises `InvalidBundle` if the underlying parser raises `ValueError`,
        or if the parsed bundle fails the checks in `_verify`.
        """
        try:
            inner = _Bundle.from_json(raw)
        except ValueError as exc:
            raise InvalidBundle(f"failed to load bundle: {exc}") from exc
        return cls(inner)

    def to_json(self) -> str:
        """
        Return a JSON encoding of this bundle.
        """
        return self._inner.to_json()

    def _to_parts(
        self,
    ) -> tuple[Certificate, MessageSignature | dsse.Envelope, TransparencyLogEntry]:
        """
        Decompose the `Bundle` into its core constituent parts: the signing
        certificate, the signed content (DSSE envelope if present, otherwise
        the message signature), and the log entry.

        @private
        """

        content: MessageSignature | dsse.Envelope
        envelope = self._dsse_envelope
        if envelope:
            content = envelope
        else:
            content = self._inner.message_signature  # type: ignore[assignment]

        return (self.signing_certificate, content, self.log_entry)

    @classmethod
    def from_parts(
        cls, cert: Certificate, sig: bytes, log_entry: TransparencyLogEntry
    ) -> Bundle:
        """
        Construct a Sigstore bundle (of `hashedrekord` type) from its
        constituent parts.
        """

        return cls._from_parts(
            cert, MessageSignature(signature=base64.b64encode(sig)), log_entry
        )

    @classmethod
    def _from_parts(
        cls,
        cert: Certificate,
        content: MessageSignature | dsse.Envelope,
        log_entry: TransparencyLogEntry,
        signed_timestamp: list[TimeStampResponse] | None = None,
    ) -> Bundle:
        """
        Construct a bundle with the `BUNDLE_0_3` media type from a
        certificate, its signed content, a log entry and, optionally,
        signed timestamps.

        @private
        """

        timestamp_verification_data = bundle_v1.TimestampVerificationData(
            rfc3161_timestamps=[]
        )
        if signed_timestamp is not None:
            timestamp_verification_data.rfc3161_timestamps.extend(
                [
                    RFC3161SignedTimestamp(
                        signed_timestamp=base64.b64encode(response.as_bytes())
                    )
                    for response in signed_timestamp
                ]
            )

        # Fill in the appropriate variant.
        message_signature = None
        dsse_envelope = None
        if isinstance(content, MessageSignature):
            message_signature = content
        else:
            dsse_envelope = content._inner

        inner = _Bundle(
            media_type=Bundle.BundleType.BUNDLE_0_3.value,
            verification_material=bundle_v1.VerificationMaterial(
                certificate=common_v1.X509Certificate(
                    raw_bytes=base64.b64encode(cert.public_bytes(Encoding.DER))
                ),
                tlog_entries=[log_entry._inner],
                timestamp_verification_data=timestamp_verification_data,
            ),
            message_signature=message_signature,
            dsse_envelope=dsse_envelope,
        )

        return cls(inner)
class SigningConfig:
    """
    Signing configuration for a Sigstore instance.
    """

    class SigningConfigType(str, Enum):
        """
        Known Sigstore signing config media types.
        """

        SIGNING_CONFIG_0_2 = "application/vnd.dev.sigstore.signingconfig.v0.2+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(
        self, inner: trustroot_v1.SigningConfig, tlog_version: int | None = None
    ) -> None:
        """
        Construct a new `SigningConfig`.

        tlog_version is an optional argument that enforces that only specified
        versions of rekor are included in the transparency logs.

        Raises `Error` if the media type is not recognized, or if no valid
        Rekor transparency log or Fulcio CA remains after filtering.

        @api private
        """
        self._inner = inner

        # must have a recognized media type.
        try:
            SigningConfig.SigningConfigType(self._inner.media_type)
        except ValueError as exc:
            raise Error(
                f"unsupported signing config format: {self._inner.media_type}"
            ) from exc

        # Create lists of service protos that are valid, selected by the service
        # configuration & supported by this client
        if tlog_version is None:
            tlog_versions = REKOR_VERSIONS
        else:
            tlog_versions = [tlog_version]

        self._tlogs = self._get_valid_services(
            self._inner.rekor_tlog_urls, tlog_versions, self._inner.rekor_tlog_config
        )
        if not self._tlogs:
            raise Error("No valid Rekor transparency log found in signing config")

        self._tsas = self._get_valid_services(
            self._inner.tsa_urls, TSA_VERSIONS, self._inner.tsa_config
        )

        self._fulcios = self._get_valid_services(
            self._inner.ca_urls, FULCIO_VERSIONS, None
        )
        if not self._fulcios:
            raise Error("No valid Fulcio CA found in signing config")

        self._oidcs = self._get_valid_services(
            self._inner.oidc_urls, OIDC_VERSIONS, None
        )

    @classmethod
    def from_file(
        cls,
        path: str,
    ) -> SigningConfig:
        """Create a new signing config from file"""
        inner = trustroot_v1.SigningConfig.from_json(Path(path).read_bytes())
        return cls(inner)

    @staticmethod
    def _get_valid_services(
        services: list[trustroot_v1.Service],
        supported_versions: list[int],
        config: trustroot_v1.ServiceConfiguration | None,
    ) -> list[trustroot_v1.Service]:
        """
        Return supported services, taking SigningConfig restrictions into account.

        Services are kept only if their major API version is in
        `supported_versions` and their validity period passes
        `is_timerange_valid` with `allow_expired=False`. At most one service
        per operator is returned, and `config` (if given) may prune the list
        further.

        Raises `ValueError` if `config` uses the EXACT selector and fewer
        services than requested are available.
        """

        # split services by operator, only include valid services
        services_by_operator: dict[str, list[trustroot_v1.Service]] = defaultdict(list)
        for service in services:
            if service.major_api_version not in supported_versions:
                continue

            if not is_timerange_valid(service.valid_for, allow_expired=False):
                continue

            services_by_operator[service.operator].append(service)

        # build a list of services but make sure we only include one service per operator
        # and use the highest version available for that operator
        result: list[trustroot_v1.Service] = []
        for op_services in services_by_operator.values():
            op_services.sort(key=lambda s: s.major_api_version)
            result.append(op_services[-1])

        # Depending on ServiceSelector, prune the result list
        if not config or config.selector == trustroot_v1.ServiceSelector.ALL:
            return result

        # handle EXACT and ANY selectors: EXACT uses the configured count
        # (falling back to 1 when the count is unset or zero), anything else
        # selects a single service.
        count = (
            config.count
            if config.selector == trustroot_v1.ServiceSelector.EXACT and config.count
            else 1
        )

        if (
            config.selector == trustroot_v1.ServiceSelector.EXACT
            and len(result) < count
        ):
            raise ValueError(
                f"Expected {count} services in signing config, found {len(result)}"
            )

        return result[:count]

    def get_tlogs(self) -> list[RekorLogSubmitter]:
        """
        Returns the rekor transparency log clients to sign with.
        """
        result: list[RekorLogSubmitter] = []
        for tlog in self._tlogs:
            if tlog.major_api_version == 1:
                from sigstore._internal.rekor.client import RekorClient

                result.append(RekorClient(tlog.url))
            elif tlog.major_api_version == 2:
                from sigstore._internal.rekor.client_v2 import RekorV2Client

                result.append(RekorV2Client(tlog.url))
            else:
                raise AssertionError(f"Unexpected Rekor v{tlog.major_api_version}")
        return result

    def get_fulcio(self) -> FulcioClient:
        """
        Returns a Fulcio client to get a signing certificate from
        """
        # `__init__` guarantees that at least one Fulcio service is present.
        return FulcioClient(self._fulcios[0].url)

    def get_oidc_url(self) -> str:
        """
        Returns url for the OIDC provider that client should use to interactively
        authenticate.

        Raises `Error` if the signing config has no valid OIDC provider.
        """
        if not self._oidcs:
            raise Error("No valid OIDC provider found in signing config")
        return self._oidcs[0].url

    def get_tsas(self) -> list[TimestampAuthorityClient]:
        """
        Returns timestamp authority clients for urls configured in signing config.
        """
        return [TimestampAuthorityClient(s.url) for s in self._tsas]
class TrustedRoot:
    """
    The cryptographic root(s) of trust for a Sigstore instance.
    """

    class TrustedRootType(str, Enum):
        """
        Known Sigstore trusted root media types.
        """

        TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: trustroot_v1.TrustedRoot) -> None:
        """
        Construct a new `TrustedRoot`.

        Raises `Error` if the trusted root's media type is not recognized.

        @api private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Performs various feats of heroism to ensure that the trusted root
        is well-formed.
        """

        # The trusted root must have a recognized media type.
        try:
            TrustedRoot.TrustedRootType(self._inner.media_type)
        except ValueError as exc:
            raise Error(
                f"unsupported trusted root format: {self._inner.media_type}"
            ) from exc

    @classmethod
    def from_file(
        cls,
        path: str,
    ) -> TrustedRoot:
        """Create a new trust root from file"""
        inner = trustroot_v1.TrustedRoot.from_json(Path(path).read_bytes())
        return cls(inner)

    def _get_tlog_keys(
        self, tlogs: list[trustroot_v1.TransparencyLogInstance], purpose: KeyringPurpose
    ) -> Iterable[common_v1.PublicKey]:
        """
        Yields an iterator of public keys for transparency log instances that
        are suitable for `purpose`.

        Expired keys are only allowed when `purpose` is
        `KeyringPurpose.VERIFY`.
        """
        allow_expired = purpose is KeyringPurpose.VERIFY
        for tlog in tlogs:
            if not is_timerange_valid(
                tlog.public_key.valid_for, allow_expired=allow_expired
            ):
                continue

            yield tlog.public_key

    def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
        """
        Return keyring with keys for Rekor.

        Raises `MetadataError` if no suitable Rekor key is found.
        """

        keys: list[common_v1.PublicKey] = list(
            self._get_tlog_keys(self._inner.tlogs, purpose)
        )
        if not keys:
            raise MetadataError("Did not find any Rekor keys in trusted root")
        return RekorKeyring(Keyring(keys))

    def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring:
        """
        Return keyring with key for CTFE.

        Raises `MetadataError` if no suitable CTFE key is found.
        """
        ctfes: list[common_v1.PublicKey] = list(
            self._get_tlog_keys(self._inner.ctlogs, purpose)
        )
        if not ctfes:
            raise MetadataError("CTFE keys not found in trusted root")
        return CTKeyring(Keyring(ctfes))

    def get_fulcio_certs(self) -> list[Certificate]:
        """
        Return the Fulcio certificates.

        Raises `MetadataError` if the trusted root yields no certificates.
        """

        certs: list[Certificate] = []

        # Return expired certificates too: they are expired now but may have
        # been active when the certificate was used to sign.
        for authority in self._inner.certificate_authorities:
            certificate_authority = CertificateAuthority(authority)
            certs.extend(certificate_authority.certificates(allow_expired=True))

        if not certs:
            raise MetadataError("Fulcio certificates not found in trusted root")
        return certs

    def get_timestamp_authorities(self) -> list[CertificateAuthority]:
        """
        Return the TSA present in the trusted root.

        This list may be empty and in this case, no timestamp verification can be
        performed.
        """
        certificate_authorities: list[CertificateAuthority] = [
            CertificateAuthority(cert_chain)
            for cert_chain in self._inner.timestamp_authorities
        ]
        return certificate_authorities
class ClientTrustConfig:
    """
    Represents a Sigstore client's trust configuration, including a root of trust.
    """

    class ClientTrustConfigType(str, Enum):
        """
        Known Sigstore client trust config media types.
        """

        CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    @classmethod
    def from_json(cls, raw: str) -> ClientTrustConfig:
        """
        Deserialize the given client trust config.
        """
        inner = trustroot_v1.ClientTrustConfig.from_json(raw)
        return cls(inner)

    @classmethod
    def production(
        cls,
        offline: bool = False,
    ) -> ClientTrustConfig:
        """Create new trust config from Sigstore production TUF repository.

        If `offline`, will use data in local TUF cache. Otherwise will
        update the data from remote TUF repository.
        """
        return cls.from_tuf(DEFAULT_TUF_URL, offline)

    @classmethod
    def staging(
        cls,
        offline: bool = False,
    ) -> ClientTrustConfig:
        """Create new trust config from Sigstore staging TUF repository.

        If `offline`, will use data in local TUF cache. Otherwise will
        update the data from remote TUF repository.
        """
        return cls.from_tuf(STAGING_TUF_URL, offline)

    @classmethod
    def from_tuf(
        cls,
        url: str,
        offline: bool = False,
        bootstrap_root: Path | None = None,
    ) -> ClientTrustConfig:
        """Create a new trust config from a TUF repository.

        If `offline`, will use data in local TUF cache. Otherwise will
        update the trust config from remote TUF repository.

        Any error raised while fetching the trusted root or the signing
        config (e.g. `TUFError`) propagates to the caller unchanged.
        """
        updater = TrustUpdater(url, offline, bootstrap_root)

        tr_path = updater.get_trusted_root_path()
        inner_tr = trustroot_v1.TrustedRoot.from_json(Path(tr_path).read_bytes())

        # No error handling here on purpose: a missing signing config is
        # reported to the caller as-is.
        sc_path = updater.get_signing_config_path()
        inner_sc = trustroot_v1.SigningConfig.from_json(Path(sc_path).read_bytes())

        return cls(
            trustroot_v1.ClientTrustConfig(
                media_type=ClientTrustConfig.ClientTrustConfigType.CONFIG_0_1.value,
                trusted_root=inner_tr,
                signing_config=inner_sc,
            )
        )

    def __init__(self, inner: trustroot_v1.ClientTrustConfig) -> None:
        """
        Wraps the given inner client trust config.

        @api private
        """
        self._inner = inner

        # This can be used to enforce a specific rekor major version in signingconfig
        self.force_tlog_version: int | None = None

    @property
    def trusted_root(self) -> TrustedRoot:
        """
        Return the interior root of trust, as a `TrustedRoot`.
        """
        return TrustedRoot(self._inner.trusted_root)

    @property
    def signing_config(self) -> SigningConfig:
        """
        Return the interior signing config, as a `SigningConfig`.

        If `force_tlog_version` is set, it is passed through as the
        `tlog_version` restriction.
        """
        return SigningConfig(
            self._inner.signing_config, tlog_version=self.force_tlog_version
        )