Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/models.py: 38%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16Common models shared between signing and verification.
17"""
19from __future__ import annotations
21import base64
22import logging
23from collections import defaultdict
24from collections.abc import Iterable
25from enum import Enum
26from pathlib import Path
27from textwrap import dedent
28from typing import Any
30import rfc8785
31from cryptography.hazmat.primitives.serialization import Encoding
32from cryptography.x509 import (
33 Certificate,
34 load_der_x509_certificate,
35)
36from pydantic import TypeAdapter
37from rekor_types import Dsse, Hashedrekord, ProposedEntry
38from rfc3161_client import TimeStampResponse, decode_timestamp_response
39from sigstore_models.bundle import v1 as bundle_v1
40from sigstore_models.bundle.v1 import Bundle as _Bundle
41from sigstore_models.bundle.v1 import (
42 TimestampVerificationData as _TimestampVerificationData,
43)
44from sigstore_models.bundle.v1 import VerificationMaterial as _VerificationMaterial
45from sigstore_models.common import v1 as common_v1
46from sigstore_models.common.v1 import MessageSignature, RFC3161SignedTimestamp
47from sigstore_models.rekor import v1 as rekor_v1
48from sigstore_models.rekor.v1 import TransparencyLogEntry as _TransparencyLogEntry
49from sigstore_models.trustroot import v1 as trustroot_v1
51from sigstore import dsse
52from sigstore._internal.fulcio.client import FulcioClient
53from sigstore._internal.merkle import verify_merkle_inclusion
54from sigstore._internal.rekor import RekorLogSubmitter
55from sigstore._internal.rekor.checkpoint import verify_checkpoint
56from sigstore._internal.timestamp import TimestampAuthorityClient
57from sigstore._internal.trust import (
58 CertificateAuthority,
59 CTKeyring,
60 Keyring,
61 KeyringPurpose,
62 RekorKeyring,
63)
64from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
65from sigstore._utils import KeyID, cert_is_leaf, cert_is_root_ca, is_timerange_valid
66from sigstore.errors import Error, MetadataError, TUFError, VerificationError
# Major API versions of each Sigstore service that this client supports.
# `SigningConfig` filters the services listed in a signing config against
# these lists, dropping any service whose major version is not listed here.
REKOR_VERSIONS = [1, 2]
TSA_VERSIONS = [1]
FULCIO_VERSIONS = [1]
OIDC_VERSIONS = [1]


# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
class TransparencyLogEntry:
    """
    Represents a transparency log entry.

    Wraps the underlying model object and enforces this client's invariant
    that an inclusion proof (with checkpoint) is always present.
    """

    def __init__(self, inner: _TransparencyLogEntry) -> None:
        """
        Creates a new `TransparencyLogEntry` from the given inner object.

        Raises `InvalidBundle` if the entry does not uphold the client
        invariants checked by `_validate`.

        @private
        """
        self._inner = inner
        self._validate()

    def _validate(self) -> None:
        """
        Ensure this transparency log entry is well-formed and upholds our
        client invariants.

        Raises `InvalidBundle` if the inclusion proof or its checkpoint
        is missing.
        """

        inclusion_proof: rekor_v1.InclusionProof | None = self._inner.inclusion_proof
        # This check is required by us as the client, not the
        # protobuf-specs themselves.
        if not inclusion_proof or not inclusion_proof.checkpoint:
            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")

    def __eq__(self, value: object) -> bool:
        """
        Compares this `TransparencyLogEntry` with another object for equality.

        Two `TransparencyLogEntry` instances are considered equal if their
        inner contents are equal. Comparison against any other type is
        delegated back to Python via `NotImplemented`.
        """
        if not isinstance(value, TransparencyLogEntry):
            return NotImplemented
        return self._inner == value._inner

    @classmethod
    def _from_v1_response(cls, dict_: dict[str, Any]) -> TransparencyLogEntry:
        """
        Create a new `TransparencyLogEntry` from the given API response.

        `dict_` must map exactly one key to one entry object.

        Raises `ValueError` if the response does not contain exactly one
        entry, and `InvalidBundle` if the entry body is not a
        `hashedrekord` or `dsse` entry.
        """

        # We only support responses carrying a single entry; both an empty
        # response and a multi-entry response are rejected.
        if len(dict_) != 1:
            raise ValueError(
                f"expected exactly one entry in response, got {len(dict_)}"
            )
        (entry,) = dict_.values()

        # Fill in the appropriate kind
        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
            base64.b64decode(entry["body"])
        )
        if not isinstance(body_entry, (Hashedrekord, Dsse)):
            raise InvalidBundle("log entry is not of expected type")

        raw_inclusion_proof = entry["verification"]["inclusionProof"]

        # NOTE: The type ignores below are a consequence of our Pydantic
        # modeling: mypy and other typecheckers see `ProtoU64` as `int`,
        # but it gets coerced from a string due to Protobuf's JSON serialization.
        inner = _TransparencyLogEntry(
            log_index=str(entry["logIndex"]),  # type: ignore[arg-type]
            # The response carries hex-encoded values; they are re-encoded
            # as base64 before being handed to the model.
            log_id=common_v1.LogId(
                key_id=base64.b64encode(bytes.fromhex(entry["logID"]))
            ),
            kind_version=rekor_v1.KindVersion(
                kind=body_entry.kind, version=body_entry.api_version
            ),
            integrated_time=str(entry["integratedTime"]),  # type: ignore[arg-type]
            inclusion_promise=rekor_v1.InclusionPromise(
                signed_entry_timestamp=entry["verification"]["signedEntryTimestamp"]
            ),
            inclusion_proof=rekor_v1.InclusionProof(
                log_index=str(raw_inclusion_proof["logIndex"]),  # type: ignore[arg-type]
                root_hash=base64.b64encode(
                    bytes.fromhex(raw_inclusion_proof["rootHash"])
                ),
                tree_size=str(raw_inclusion_proof["treeSize"]),  # type: ignore[arg-type]
                hashes=[
                    base64.b64encode(bytes.fromhex(h))
                    for h in raw_inclusion_proof["hashes"]
                ],
                checkpoint=rekor_v1.Checkpoint(
                    envelope=raw_inclusion_proof["checkpoint"]
                ),
            ),
            canonicalized_body=entry["body"],
        )

        return cls(inner)

    def _encode_canonical(self) -> bytes:
        """
        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.

        This encoded representation is suitable for verification against
        the Signed Entry Timestamp.

        Raises `ValueError` if the entry has no integrated time.
        """
        # We might not have an integrated time if our log entry is from rekor
        # v2, i.e. was integrated synchronously instead of via an
        # inclusion promise.
        if self._inner.integrated_time is None:
            raise ValueError(
                "can't encode canonical form for SET without integrated time"
            )

        payload: dict[str, int | str] = {
            "body": base64.b64encode(self._inner.canonicalized_body).decode(),
            "integratedTime": self._inner.integrated_time,
            "logID": self._inner.log_id.key_id.hex(),
            "logIndex": self._inner.log_index,
        }

        return rfc8785.dumps(payload)

    def _verify_set(self, keyring: RekorKeyring) -> None:
        """
        Verify the inclusion promise (Signed Entry Timestamp) for this
        transparency log entry using the given `keyring`.

        Raises `VerificationError` if the entry does not contain an inclusion
        promise, or if the keyring rejects the signature.
        """

        if self._inner.inclusion_promise is None:
            raise VerificationError("SET: invalid inclusion promise: missing")

        signed_entry_ts = self._inner.inclusion_promise.signed_entry_timestamp

        try:
            keyring.verify(
                key_id=KeyID(self._inner.log_id.key_id),
                signature=signed_entry_ts,
                data=self._encode_canonical(),
            )
        except VerificationError as exc:
            # Chain explicitly so the underlying keyring failure stays
            # attached as the cause of the SET-specific error.
            raise VerificationError(f"SET: invalid inclusion promise: {exc}") from exc

    def _verify(self, keyring: RekorKeyring) -> None:
        """
        Verifies this log entry against the given Rekor `keyring`.

        This method:

        * Verifies the Merkle inclusion proof;
        * Verifies the inclusion proof's signed checkpoint;
        * Verifies the inclusion promise, if both the promise and an
          integrated time are present.
        """

        verify_merkle_inclusion(self)
        verify_checkpoint(keyring, self)

        _logger.debug(
            "successfully verified inclusion proof: index=%s",
            self._inner.log_index,
        )

        # The SET covers the integrated time, so it can only be checked when
        # both the promise and the integrated time are available.
        if self._inner.inclusion_promise and self._inner.integrated_time:
            self._verify_set(keyring)
            _logger.debug(
                "successfully verified inclusion promise: index=%s",
                self._inner.log_index,
            )
class TimestampVerificationData:
    """
    Represents a TimestampVerificationData structure.

    The embedded RFC 3161 timestamp responses are decoded eagerly at
    construction time.

    @private
    """

    def __init__(self, inner: _TimestampVerificationData) -> None:
        """
        Wraps `inner` and decodes its timestamp responses.

        Raises `VerificationError` if any timestamp response is malformed.
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Verifies the TimestampVerificationData.

        It verifies that TimeStamp Responses embedded in the bundle are correctly
        formed, and stores the decoded responses for `rfc3161_timestamps`.

        Raises `VerificationError` if a response fails to decode.
        """
        # A missing or empty timestamp list is treated as "no timestamps".
        timestamps = self._inner.rfc3161_timestamps or []

        try:
            self._signed_ts = [
                decode_timestamp_response(ts.signed_timestamp) for ts in timestamps
            ]
        except ValueError as exc:
            # Keep the decoder's error attached as the explicit cause.
            raise VerificationError("Invalid Timestamp Response") from exc

    @property
    def rfc3161_timestamps(self) -> list[TimeStampResponse]:
        """Returns the list of decoded signed timestamps."""
        return self._signed_ts

    @classmethod
    def from_json(cls, raw: str | bytes) -> TimestampVerificationData:
        """
        Deserialize the given timestamp verification data.
        """
        inner = _TimestampVerificationData.from_json(raw)
        return cls(inner)
class VerificationMaterial:
    """
    Thin wrapper around a bundle's VerificationMaterial structure.
    """

    def __init__(self, inner: _VerificationMaterial) -> None:
        """Stores the wrapped verification material."""
        self._inner = inner

    @property
    def timestamp_verification_data(self) -> TimestampVerificationData | None:
        """
        Returns the wrapped Timestamp Verification Data, or `None` when it is
        absent or carries no RFC 3161 timestamps.
        """
        tvd = self._inner.timestamp_verification_data
        # Guard clause: nothing to wrap unless at least one timestamp exists.
        if not tvd or not tvd.rfc3161_timestamps:
            return None
        return TimestampVerificationData(tvd)
class InvalidBundle(Error):
    """
    Error type signalling that a `Bundle` is invalid in some way.
    """

    def diagnostics(self) -> str:
        """Builds the human-readable diagnostics text for this error."""

        # The message is interpolated first and the common indentation is
        # stripped afterwards.
        report = f"""\
        An issue occurred while parsing the Sigstore bundle.

        The provided bundle is malformed and may have been modified maliciously.

        Additional context:

        {self}
        """
        return dedent(report)
class Bundle:
    """
    Represents a Sigstore bundle.
    """

    class BundleType(str, Enum):
        """
        Known Sigstore bundle media types.
        """

        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: _Bundle) -> None:
        """
        Creates a new bundle. This is not a public API; use
        `from_json` instead.

        Raises `InvalidBundle` if the bundle fails the checks in `_verify`.

        @private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Performs various feats of heroism to ensure the bundle is well-formed
        and upholds invariants, including:

        * The media type is one of the known `BundleType` values;
        * The "leaf" (signing) certificate is present;
        * There is exactly one log entry, and it has an inclusion proof
          present, even if the Bundle's version predates a mandatory
          inclusion proof;
        * An inclusion promise and/or signed timestamp is present, as
          required by the bundle version.

        On success, populates `_signing_certificate` and `_log_entry`.

        Raises `InvalidBundle` on any violation.
        """

        # The bundle must have a recognized media type.
        try:
            media_type = Bundle.BundleType(self._inner.media_type)
        except ValueError as exc:
            raise InvalidBundle(
                f"unsupported bundle format: {self._inner.media_type}"
            ) from exc

        # Extract the signing certificate.
        if media_type in (
            Bundle.BundleType.BUNDLE_0_3,
            Bundle.BundleType.BUNDLE_0_3_ALT,
        ):
            # For "v3" bundles, the signing certificate is the only one present.
            if not self._inner.verification_material.certificate:
                raise InvalidBundle("expected certificate in bundle")

            leaf_cert = load_der_x509_certificate(
                self._inner.verification_material.certificate.raw_bytes
            )
        else:
            # In older bundles, there is an entire pool (misleadingly called
            # a chain) of certificates, the first of which is the signing
            # certificate.
            if not self._inner.verification_material.x509_certificate_chain:
                raise InvalidBundle("expected certificate chain in bundle")

            chain = self._inner.verification_material.x509_certificate_chain
            if not chain.certificates:
                raise InvalidBundle("expected non-empty certificate chain in bundle")

            # Per client policy in protobuf-specs: the first entry in the chain
            # MUST be a leaf certificate, and the rest of the chain MUST NOT
            # include a root CA or any intermediate CAs that appear in an
            # independent root of trust.
            #
            # We expect some old bundles to violate the rules around root
            # and intermediate CAs, so we issue warnings and not hard errors
            # in those cases.
            leaf_cert, *chain_certs = (
                load_der_x509_certificate(cert.raw_bytes) for cert in chain.certificates
            )
            if not cert_is_leaf(leaf_cert):
                raise InvalidBundle(
                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
                )

            for chain_cert in chain_certs:
                # TODO: We should also retrieve the root of trust here and
                # cross-check against it.
                if cert_is_root_ca(chain_cert):
                    _logger.warning(
                        "this bundle contains a root CA, making it subject to misuse"
                    )

        self._signing_certificate = leaf_cert

        # Extract the log entry. For the time being, we expect
        # bundles to only contain a single log entry.
        tlog_entries = self._inner.verification_material.tlog_entries
        if len(tlog_entries) != 1:
            raise InvalidBundle("expected exactly one log entry in bundle")
        tlog_entry = tlog_entries[0]

        # Handling of inclusion promises and proofs varies between bundle
        # format versions:
        #
        # * For 0.1, an inclusion promise is required; the client
        #   MUST verify the inclusion promise.
        #   The inclusion proof is NOT required. If provided, it might NOT
        #   contain a checkpoint; in this case, we ignore it (since it's
        #   useless without one).
        #
        # * For 0.2+, an inclusion proof is required; the client MUST
        #   verify the inclusion proof. The inclusion proof MUST contain
        #   a checkpoint.
        #
        #   The inclusion promise is NOT required if another source of signed
        #   time (such as a signed timestamp) is present. If no other source
        #   of signed time is present, then the inclusion promise MUST be
        #   present.
        #
        # Before all of this, we require that the inclusion proof be present
        # (when constructing the LogEntry).
        log_entry = TransparencyLogEntry(tlog_entry)

        if media_type == Bundle.BundleType.BUNDLE_0_1:
            if not log_entry._inner.inclusion_promise:
                raise InvalidBundle("bundle must contain an inclusion promise")
            # NOTE: constructing `TransparencyLogEntry` above already rejects
            # entries whose inclusion proof lacks a checkpoint, so this branch
            # is not expected to fire; it is kept as a defensive log.
            if not log_entry._inner.inclusion_proof.checkpoint:
                _logger.debug(
                    "0.1 bundle contains inclusion proof without checkpoint; ignoring"
                )
        else:
            if not log_entry._inner.inclusion_proof.checkpoint:
                raise InvalidBundle("expected checkpoint in inclusion proof")

            if (
                not log_entry._inner.inclusion_promise
                and not self.verification_material.timestamp_verification_data
            ):
                raise InvalidBundle(
                    "bundle must contain an inclusion promise or signed timestamp(s)"
                )

        self._log_entry = log_entry

    @property
    def signing_certificate(self) -> Certificate:
        """Returns the bundle's contained signing (i.e. leaf) certificate."""
        return self._signing_certificate

    @property
    def log_entry(self) -> TransparencyLogEntry:
        """
        Returns the bundle's log entry, containing an inclusion proof
        (with checkpoint) and an inclusion promise (if the latter is present).
        """
        return self._log_entry

    @property
    def _dsse_envelope(self) -> dsse.Envelope | None:
        """
        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`,
        or `None` if the bundle does not carry one.

        @private
        """
        if self._inner.dsse_envelope is not None:
            return dsse.Envelope(self._inner.dsse_envelope)
        return None

    @property
    def signature(self) -> bytes:
        """
        Returns the signature bytes of this bundle.
        Either from the DSSE Envelope or from the message itself.
        """
        # Bind once so the envelope wrapper is only constructed a single time.
        envelope = self._dsse_envelope
        if envelope:
            return envelope.signature
        return self._inner.message_signature.signature  # type: ignore[union-attr]

    @property
    def verification_material(self) -> VerificationMaterial:
        """
        Returns the bundle's verification material.
        """
        return VerificationMaterial(self._inner.verification_material)

    @classmethod
    def from_json(cls, raw: bytes | str) -> Bundle:
        """
        Deserialize the given Sigstore bundle.

        Raises `InvalidBundle` if the bundle cannot be parsed or fails
        validation.
        """
        try:
            inner = _Bundle.from_json(raw)
        except ValueError as exc:
            raise InvalidBundle(f"failed to load bundle: {exc}") from exc
        return cls(inner)

    def to_json(self) -> str:
        """
        Return a JSON encoding of this bundle.
        """
        return self._inner.to_json()

    def _to_parts(
        self,
    ) -> tuple[Certificate, MessageSignature | dsse.Envelope, TransparencyLogEntry]:
        """
        Decompose the `Bundle` into its core constituent parts: the signing
        certificate, the signed content (DSSE envelope if present, otherwise
        the message signature), and the log entry.

        @private
        """

        content: MessageSignature | dsse.Envelope
        envelope = self._dsse_envelope
        if envelope:
            content = envelope
        else:
            content = self._inner.message_signature  # type: ignore[assignment]

        return (self.signing_certificate, content, self.log_entry)

    @classmethod
    def from_parts(
        cls, cert: Certificate, sig: bytes, log_entry: TransparencyLogEntry
    ) -> Bundle:
        """
        Construct a Sigstore bundle (of `hashedrekord` type) from its
        constituent parts.
        """

        return cls._from_parts(
            cert, MessageSignature(signature=base64.b64encode(sig)), log_entry
        )

    @classmethod
    def _from_parts(
        cls,
        cert: Certificate,
        content: MessageSignature | dsse.Envelope,
        log_entry: TransparencyLogEntry,
        signed_timestamp: list[TimeStampResponse] | None = None,
    ) -> Bundle:
        """
        Construct a v0.3 bundle from a certificate, signed content, log
        entry, and optional RFC 3161 timestamp responses.

        @private
        """

        timestamp_verification_data = bundle_v1.TimestampVerificationData(
            rfc3161_timestamps=[]
        )
        if signed_timestamp is not None:
            timestamp_verification_data.rfc3161_timestamps.extend(
                RFC3161SignedTimestamp(
                    signed_timestamp=base64.b64encode(response.as_bytes())
                )
                for response in signed_timestamp
            )

        # Fill in the appropriate variant.
        message_signature = None
        dsse_envelope = None
        if isinstance(content, MessageSignature):
            message_signature = content
        else:
            dsse_envelope = content._inner

        inner = _Bundle(
            media_type=Bundle.BundleType.BUNDLE_0_3.value,
            verification_material=bundle_v1.VerificationMaterial(
                certificate=common_v1.X509Certificate(
                    raw_bytes=base64.b64encode(cert.public_bytes(Encoding.DER))
                ),
                tlog_entries=[log_entry._inner],
                timestamp_verification_data=timestamp_verification_data,
            ),
            message_signature=message_signature,
            dsse_envelope=dsse_envelope,
        )

        return cls(inner)
class SigningConfig:
    """
    Signing configuration for a Sigstore instance.
    """

    class SigningConfigType(str, Enum):
        """
        Known Sigstore signing config media types.
        """

        SIGNING_CONFIG_0_2 = "application/vnd.dev.sigstore.signingconfig.v0.2+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(
        self, inner: trustroot_v1.SigningConfig, tlog_version: int | None = None
    ):
        """
        Construct a new `SigningConfig`.

        `tlog_version`, when given, restricts the selected transparency logs
        to that single Rekor major version; otherwise every Rekor version
        supported by this client is eligible.

        Raises `Error` if the media type is unknown, or if no valid Rekor
        log or Fulcio CA can be selected.

        @api private
        """
        self._inner = inner

        # Reject signing configs whose media type we do not recognize.
        try:
            SigningConfig.SigningConfigType(self._inner.media_type)
        except ValueError:
            raise Error(f"unsupported signing config format: {self._inner.media_type}")

        # Select, per service kind, the services that are valid, permitted by
        # the service configuration, and supported by this client.
        tlog_versions = REKOR_VERSIONS if tlog_version is None else [tlog_version]

        self._tlogs = self._get_valid_services(
            self._inner.rekor_tlog_urls, tlog_versions, self._inner.rekor_tlog_config
        )
        if not self._tlogs:
            raise Error("No valid Rekor transparency log found in signing config")

        self._tsas = self._get_valid_services(
            self._inner.tsa_urls, TSA_VERSIONS, self._inner.tsa_config
        )

        self._fulcios = self._get_valid_services(
            self._inner.ca_urls, FULCIO_VERSIONS, None
        )
        if not self._fulcios:
            raise Error("No valid Fulcio CA found in signing config")

        self._oidcs = self._get_valid_services(
            self._inner.oidc_urls, OIDC_VERSIONS, None
        )

    @classmethod
    def from_file(
        cls,
        path: str,
    ) -> SigningConfig:
        """Load a signing config from the JSON file at `path`."""
        raw = Path(path).read_bytes()
        return cls(trustroot_v1.SigningConfig.from_json(raw))

    @staticmethod
    def _get_valid_services(
        services: list[trustroot_v1.Service],
        supported_versions: list[int],
        config: trustroot_v1.ServiceConfiguration | None,
    ) -> list[trustroot_v1.Service]:
        """
        Return the supported services, honoring the signing config's
        selection rules.

        Raises `ValueError` when an EXACT selector asks for more services
        than are available.
        """

        # Group the usable services (supported version, currently valid) by
        # their operator.
        grouped: dict[str, list[trustroot_v1.Service]] = defaultdict(list)
        for svc in services:
            if svc.major_api_version in supported_versions and is_timerange_valid(
                svc.valid_for, allow_expired=False
            ):
                grouped[svc.operator].append(svc)

        # Keep a single service per operator: the one with the highest
        # major API version.
        selected: list[trustroot_v1.Service] = [
            sorted(candidates, key=lambda s: s.major_api_version)[-1]
            for candidates in grouped.values()
        ]

        # No configuration, or an ALL selector: nothing to prune.
        if not config or config.selector == trustroot_v1.ServiceSelector.ALL:
            return selected

        # EXACT uses the configured count (when set); every other selector
        # falls back to a single service.
        wants_exact = config.selector == trustroot_v1.ServiceSelector.EXACT
        count = config.count if wants_exact and config.count else 1

        if wants_exact and len(selected) < count:
            raise ValueError(
                f"Expected {count} services in signing config, found {len(selected)}"
            )

        return selected[:count]

    def get_tlogs(self) -> list[RekorLogSubmitter]:
        """
        Returns the rekor transparency log clients to sign with.
        """
        clients: list[RekorLogSubmitter] = []
        for tlog in self._tlogs:
            version = tlog.major_api_version
            if version == 1:
                from sigstore._internal.rekor.client import RekorClient

                clients.append(RekorClient(tlog.url))
            elif version == 2:
                from sigstore._internal.rekor.client_v2 import RekorV2Client

                clients.append(RekorV2Client(tlog.url))
            else:
                raise AssertionError(f"Unexpected Rekor v{tlog.major_api_version}")
        return clients

    def get_fulcio(self) -> FulcioClient:
        """
        Returns a Fulcio client to get a signing certificate from
        """
        first_ca = self._fulcios[0]
        return FulcioClient(first_ca.url)

    def get_oidc_url(self) -> str:
        """
        Returns url for the OIDC provider that client should use to interactively
        authenticate.

        Raises `Error` if the signing config has no valid OIDC provider.
        """
        if self._oidcs:
            return self._oidcs[0].url
        raise Error("No valid OIDC provider found in signing config")

    def get_tsas(self) -> list[TimestampAuthorityClient]:
        """
        Returns timestamp authority clients for urls configured in signing config.
        """
        return [TimestampAuthorityClient(tsa.url) for tsa in self._tsas]
class TrustedRoot:
    """
    The cryptographic root(s) of trust for a Sigstore instance.
    """

    class TrustedRootType(str, Enum):
        """
        Known Sigstore trusted root media types.
        """

        TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: trustroot_v1.TrustedRoot):
        """
        Construct a new `TrustedRoot`.

        Raises `Error` if the trusted root's media type is not recognized.

        @api private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Checks that the trusted root is well-formed; currently this means
        that its media type is one of the known `TrustedRootType` values.
        """

        try:
            TrustedRoot.TrustedRootType(self._inner.media_type)
        except ValueError:
            raise Error(f"unsupported trusted root format: {self._inner.media_type}")

    @classmethod
    def from_file(
        cls,
        path: str,
    ) -> TrustedRoot:
        """Load a trusted root from the JSON file at `path`."""
        raw = Path(path).read_bytes()
        return cls(trustroot_v1.TrustedRoot.from_json(raw))

    def _get_tlog_keys(
        self, tlogs: list[trustroot_v1.TransparencyLogInstance], purpose: KeyringPurpose
    ) -> Iterable[common_v1.PublicKey]:
        """
        Lazily yields the public keys of the transparency log instances
        that are suitable for `purpose`.
        """
        # Expired keys are tolerated only when the keyring is for verifying.
        allow_expired = purpose is KeyringPurpose.VERIFY
        yield from (
            tlog.public_key
            for tlog in tlogs
            if is_timerange_valid(tlog.public_key.valid_for, allow_expired=allow_expired)
        )

    def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
        """
        Return keyring with keys for Rekor.

        Raises `MetadataError` if no suitable key is found.
        """

        rekor_keys: list[common_v1.PublicKey] = list(
            self._get_tlog_keys(self._inner.tlogs, purpose)
        )
        if not rekor_keys:
            raise MetadataError("Did not find any Rekor keys in trusted root")
        return RekorKeyring(Keyring(rekor_keys))

    def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring:
        """
        Return keyring with key for CTFE.

        Raises `MetadataError` if no suitable key is found.
        """
        ct_keys: list[common_v1.PublicKey] = list(
            self._get_tlog_keys(self._inner.ctlogs, purpose)
        )
        if not ct_keys:
            raise MetadataError("CTFE keys not found in trusted root")
        return CTKeyring(Keyring(ct_keys))

    def get_fulcio_certs(self) -> list[Certificate]:
        """
        Return the Fulcio certificates.

        Raises `MetadataError` if the trusted root contains none.
        """

        # Expired certificates are included too: they are expired now but may
        # have been active when the certificate was used to sign.
        certs: list[Certificate] = [
            cert
            for authority in self._inner.certificate_authorities
            for cert in CertificateAuthority(authority).certificates(allow_expired=True)
        ]

        if not certs:
            raise MetadataError("Fulcio certificates not found in trusted root")
        return certs

    def get_timestamp_authorities(self) -> list[CertificateAuthority]:
        """
        Return the TSA present in the trusted root.

        This list may be empty and in this case, no timestamp verification can be
        performed.
        """
        return [
            CertificateAuthority(cert_chain)
            for cert_chain in self._inner.timestamp_authorities
        ]
class ClientTrustConfig:
    """
    Represents a Sigstore client's trust configuration, including a root of trust.
    """

    class ClientTrustConfigType(str, Enum):
        """
        Known Sigstore client trust config media types.
        """

        CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    @classmethod
    def from_json(cls, raw: str) -> ClientTrustConfig:
        """
        Deserialize the given client trust config.
        """
        inner = trustroot_v1.ClientTrustConfig.from_json(raw)
        return cls(inner)

    @classmethod
    def production(
        cls,
        offline: bool = False,
    ) -> ClientTrustConfig:
        """Create new trust config from Sigstore production TUF repository.

        If `offline`, will use data in local TUF cache. Otherwise will
        update the data from remote TUF repository.
        """
        return cls.from_tuf(DEFAULT_TUF_URL, offline)

    @classmethod
    def staging(
        cls,
        offline: bool = False,
    ) -> ClientTrustConfig:
        """Create new trust config from Sigstore staging TUF repository.

        If `offline`, will use data in local TUF cache. Otherwise will
        update the data from remote TUF repository.
        """
        return cls.from_tuf(STAGING_TUF_URL, offline)

    @classmethod
    def from_tuf(
        cls,
        url: str,
        offline: bool = False,
    ) -> ClientTrustConfig:
        """Create a new trust config from a TUF repository.

        If `offline`, will use data in local TUF cache. Otherwise will
        update the trust config from remote TUF repository.

        Errors raised while fetching the trusted root or signing config
        (including `TUFError`) propagate to the caller unchanged.
        """
        updater = TrustUpdater(url, offline)

        tr_path = updater.get_trusted_root_path()
        inner_tr = trustroot_v1.TrustedRoot.from_json(Path(tr_path).read_bytes())

        sc_path = updater.get_signing_config_path()
        inner_sc = trustroot_v1.SigningConfig.from_json(Path(sc_path).read_bytes())

        return cls(
            trustroot_v1.ClientTrustConfig(
                media_type=ClientTrustConfig.ClientTrustConfigType.CONFIG_0_1.value,
                trusted_root=inner_tr,
                signing_config=inner_sc,
            )
        )

    def __init__(self, inner: trustroot_v1.ClientTrustConfig) -> None:
        """
        Wraps the given client trust config.

        @api private
        """
        self._inner = inner

        # This can be used to enforce a specific rekor major version in signingconfig
        self.force_tlog_version: int | None = None

    @property
    def trusted_root(self) -> TrustedRoot:
        """
        Return the interior root of trust, as a `TrustedRoot`.
        """
        return TrustedRoot(self._inner.trusted_root)

    @property
    def signing_config(self) -> SigningConfig:
        """
        Return the interior signing configuration, as a `SigningConfig`.

        If `force_tlog_version` is set, only transparency logs of that Rekor
        major version are selected.
        """
        return SigningConfig(
            self._inner.signing_config, tlog_version=self.force_tlog_version
        )