Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/models.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16Common models shared between signing and verification.
17"""
19from __future__ import annotations
21import base64
22import logging
23import typing
24from enum import Enum
25from textwrap import dedent
26from typing import Any, Optional
28import rfc8785
29from cryptography.hazmat.primitives.serialization import Encoding
30from cryptography.x509 import (
31 Certificate,
32 load_der_x509_certificate,
33)
34from pydantic import (
35 BaseModel,
36 ConfigDict,
37 Field,
38 StrictInt,
39 StrictStr,
40 TypeAdapter,
41 ValidationInfo,
42 field_validator,
43)
44from pydantic.dataclasses import dataclass
45from rekor_types import Dsse, Hashedrekord, ProposedEntry
46from rfc3161_client import TimeStampResponse, decode_timestamp_response
47from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_v1
48from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
49 Bundle as _Bundle,
50)
51from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
52 TimestampVerificationData as _TimestampVerificationData,
53)
54from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
55 VerificationMaterial as _VerificationMaterial,
56)
57from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1
58from sigstore_protobuf_specs.dev.sigstore.common.v1 import Rfc3161SignedTimestamp
59from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1
60from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import (
61 InclusionProof,
62)
64from sigstore import dsse
65from sigstore._internal.merkle import verify_merkle_inclusion
66from sigstore._internal.rekor.checkpoint import verify_checkpoint
67from sigstore._utils import (
68 B64Str,
69 KeyID,
70 cert_is_leaf,
71 cert_is_root_ca,
72)
73from sigstore.errors import Error, VerificationError
75if typing.TYPE_CHECKING:
76 from sigstore._internal.trust import RekorKeyring
79_logger = logging.getLogger(__name__)
82class LogInclusionProof(BaseModel):
83 """
84 Represents an inclusion proof for a transparency log entry.
85 """
87 model_config = ConfigDict(populate_by_name=True)
89 checkpoint: StrictStr = Field(..., alias="checkpoint")
90 hashes: list[StrictStr] = Field(..., alias="hashes")
91 log_index: StrictInt = Field(..., alias="logIndex")
92 root_hash: StrictStr = Field(..., alias="rootHash")
93 tree_size: StrictInt = Field(..., alias="treeSize")
95 @field_validator("log_index")
96 def _log_index_positive(cls, v: int) -> int:
97 if v < 0:
98 raise ValueError(f"Inclusion proof has invalid log index: {v} < 0")
99 return v
101 @field_validator("tree_size")
102 def _tree_size_positive(cls, v: int) -> int:
103 if v < 0:
104 raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0")
105 return v
107 @field_validator("tree_size")
108 def _log_index_within_tree_size(
109 cls, v: int, info: ValidationInfo, **kwargs: Any
110 ) -> int:
111 if "log_index" in info.data and v <= info.data["log_index"]:
112 raise ValueError(
113 "Inclusion proof has log index greater than or equal to tree size: "
114 f"{v} <= {info.data['log_index']}"
115 )
116 return v
119@dataclass(frozen=True)
120class LogEntry:
121 """
122 Represents a transparency log entry.
124 Log entries are retrieved from the transparency log after signing or verification events,
125 or loaded from "Sigstore" bundles provided by the user.
127 This representation allows for either a missing inclusion promise or a missing
128 inclusion proof, but not both: attempting to construct a `LogEntry` without
129 at least one will fail.
130 """
132 uuid: Optional[str]
133 """
134 This entry's unique ID in the log instance it was retrieved from.
136 For sharded log deployments, IDs are unique per-shard.
138 Not present for `LogEntry` instances loaded from Sigstore bundles.
139 """
141 body: B64Str
142 """
143 The base64-encoded body of the transparency log entry.
144 """
146 integrated_time: int
147 """
148 The UNIX time at which this entry was integrated into the transparency log.
149 """
151 log_id: str
152 """
153 The log's ID (as the SHA256 hash of the DER-encoded public key for the log
154 at the time of entry inclusion).
155 """
157 log_index: int
158 """
159 The index of this entry within the log.
160 """
162 inclusion_proof: LogInclusionProof
163 """
164 An inclusion proof for this log entry.
165 """
167 inclusion_promise: Optional[B64Str]
168 """
169 An inclusion promise for this log entry, if present.
171 Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this
172 log entry.
173 """
175 @classmethod
176 def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
177 """
178 Create a new `LogEntry` from the given API response.
179 """
181 # Assumes we only get one entry back
182 entries = list(dict_.items())
183 if len(entries) != 1:
184 raise ValueError("Received multiple entries in response")
186 uuid, entry = entries[0]
187 return LogEntry(
188 uuid=uuid,
189 body=entry["body"],
190 integrated_time=entry["integratedTime"],
191 log_id=entry["logID"],
192 log_index=entry["logIndex"],
193 inclusion_proof=LogInclusionProof.model_validate(
194 entry["verification"]["inclusionProof"]
195 ),
196 inclusion_promise=entry["verification"]["signedEntryTimestamp"],
197 )
199 @classmethod
200 def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry:
201 """
202 Create a new `LogEntry` from the given Rekor TransparencyLogEntry.
203 """
204 tlog_entry = rekor_v1.TransparencyLogEntry()
205 tlog_entry.from_dict(dict_)
207 inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
208 # This check is required by us as the client, not the
209 # protobuf-specs themselves.
210 if not inclusion_proof or not inclusion_proof.checkpoint.envelope:
211 raise InvalidBundle("entry must contain inclusion proof, with checkpoint")
213 parsed_inclusion_proof = LogInclusionProof(
214 checkpoint=inclusion_proof.checkpoint.envelope,
215 hashes=[h.hex() for h in inclusion_proof.hashes],
216 log_index=inclusion_proof.log_index,
217 root_hash=inclusion_proof.root_hash.hex(),
218 tree_size=inclusion_proof.tree_size,
219 )
221 return LogEntry(
222 uuid=None,
223 body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
224 integrated_time=tlog_entry.integrated_time,
225 log_id=tlog_entry.log_id.key_id.hex(),
226 log_index=tlog_entry.log_index,
227 inclusion_proof=parsed_inclusion_proof,
228 inclusion_promise=B64Str(
229 base64.b64encode(
230 tlog_entry.inclusion_promise.signed_entry_timestamp
231 ).decode()
232 ),
233 )
235 def _to_rekor(self) -> rekor_v1.TransparencyLogEntry:
236 """
237 Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`.
239 @private
240 """
241 inclusion_promise: rekor_v1.InclusionPromise | None = None
242 if self.inclusion_promise:
243 inclusion_promise = rekor_v1.InclusionPromise(
244 signed_entry_timestamp=base64.b64decode(self.inclusion_promise)
245 )
247 inclusion_proof = rekor_v1.InclusionProof(
248 log_index=self.inclusion_proof.log_index,
249 root_hash=bytes.fromhex(self.inclusion_proof.root_hash),
250 tree_size=self.inclusion_proof.tree_size,
251 hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes],
252 checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint),
253 )
255 tlog_entry = rekor_v1.TransparencyLogEntry(
256 log_index=self.log_index,
257 log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)),
258 integrated_time=self.integrated_time,
259 inclusion_promise=inclusion_promise, # type: ignore[arg-type]
260 inclusion_proof=inclusion_proof,
261 canonicalized_body=base64.b64decode(self.body),
262 )
264 # Fill in the appropriate kind
265 body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
266 tlog_entry.canonicalized_body
267 )
268 if not isinstance(body_entry, (Hashedrekord, Dsse)):
269 raise InvalidBundle("log entry is not of expected type")
271 tlog_entry.kind_version = rekor_v1.KindVersion(
272 kind=body_entry.kind, version=body_entry.api_version
273 )
275 return tlog_entry
277 def encode_canonical(self) -> bytes:
278 """
279 Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
281 This encoded representation is suitable for verification against
282 the Signed Entry Timestamp.
283 """
284 payload: dict[str, int | str] = {
285 "body": self.body,
286 "integratedTime": self.integrated_time,
287 "logID": self.log_id,
288 "logIndex": self.log_index,
289 }
291 return rfc8785.dumps(payload)
293 def _verify_set(self, keyring: RekorKeyring) -> None:
294 """
295 Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log
296 `entry` using the given `keyring`.
298 Fails if the given log entry does not contain an inclusion promise.
299 """
301 if self.inclusion_promise is None:
302 raise VerificationError("SET: invalid inclusion promise: missing")
304 signed_entry_ts = base64.b64decode(self.inclusion_promise)
306 try:
307 keyring.verify(
308 key_id=KeyID(bytes.fromhex(self.log_id)),
309 signature=signed_entry_ts,
310 data=self.encode_canonical(),
311 )
312 except VerificationError as exc:
313 raise VerificationError(f"SET: invalid inclusion promise: {exc}")
315 def _verify(self, keyring: RekorKeyring) -> None:
316 """
317 Verifies this log entry.
319 This method performs steps (5), (6), and optionally (7) in
320 the top-level verify API:
322 * Verifies the consistency of the entry with the given bundle;
323 * Verifies the Merkle inclusion proof and its signed checkpoint;
324 * Verifies the inclusion promise, if present.
325 """
327 verify_merkle_inclusion(self)
328 verify_checkpoint(keyring, self)
330 _logger.debug(f"successfully verified inclusion proof: index={self.log_index}")
332 if self.inclusion_promise:
333 self._verify_set(keyring)
334 _logger.debug(
335 f"successfully verified inclusion promise: index={self.log_index}"
336 )
339class TimestampVerificationData:
340 """
341 Represents a TimestampVerificationData structure.
343 @private
344 """
346 def __init__(self, inner: _TimestampVerificationData) -> None:
347 """Init method."""
348 self._inner = inner
349 self._verify()
351 def _verify(self) -> None:
352 """
353 Verifies the TimestampVerificationData.
355 It verifies that TimeStamp Responses embedded in the bundle are correctly
356 formed.
357 """
358 try:
359 self._signed_ts = [
360 decode_timestamp_response(ts.signed_timestamp)
361 for ts in self._inner.rfc3161_timestamps
362 ]
363 except ValueError:
364 raise VerificationError("Invalid Timestamp Response")
366 @property
367 def rfc3161_timestamps(self) -> list[TimeStampResponse]:
368 """Returns a list of signed timestamp."""
369 return self._signed_ts
371 @classmethod
372 def from_json(cls, raw: str | bytes) -> TimestampVerificationData:
373 """
374 Deserialize the given timestamp verification data.
375 """
376 inner = _TimestampVerificationData().from_json(raw)
377 return cls(inner)
380class VerificationMaterial:
381 """
382 Represents a VerificationMaterial structure.
383 """
385 def __init__(self, inner: _VerificationMaterial) -> None:
386 """Init method."""
387 self._inner = inner
389 @property
390 def timestamp_verification_data(self) -> TimestampVerificationData:
391 """
392 Returns the Timestamp Verification Data.
393 """
394 return TimestampVerificationData(self._inner.timestamp_verification_data)
397class InvalidBundle(Error):
398 """
399 Raised when the associated `Bundle` is invalid in some way.
400 """
402 def diagnostics(self) -> str:
403 """Returns diagnostics for the error."""
405 return dedent(
406 f"""\
407 An issue occurred while parsing the Sigstore bundle.
409 The provided bundle is malformed and may have been modified maliciously.
411 Additional context:
413 {self}
414 """
415 )
418class Bundle:
419 """
420 Represents a Sigstore bundle.
421 """
423 class BundleType(str, Enum):
424 """
425 Known Sigstore bundle media types.
426 """
428 BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
429 BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
430 BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
431 BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"
433 def __str__(self) -> str:
434 """Returns the variant's string value."""
435 return self.value
437 def __init__(self, inner: _Bundle) -> None:
438 """
439 Creates a new bundle. This is not a public API; use
440 `from_json` instead.
442 @private
443 """
444 self._inner = inner
445 self._verify()
447 def _verify(self) -> None:
448 """
449 Performs various feats of heroism to ensure the bundle is well-formed
450 and upholds invariants, including:
452 * The "leaf" (signing) certificate is present;
453 * There is a inclusion proof present, even if the Bundle's version
454 predates a mandatory inclusion proof.
455 """
457 # The bundle must have a recognized media type.
458 try:
459 media_type = Bundle.BundleType(self._inner.media_type)
460 except ValueError:
461 raise InvalidBundle(f"unsupported bundle format: {self._inner.media_type}")
463 # Extract the signing certificate.
464 if media_type in (
465 Bundle.BundleType.BUNDLE_0_3,
466 Bundle.BundleType.BUNDLE_0_3_ALT,
467 ):
468 # For "v3" bundles, the signing certificate is the only one present.
469 leaf_cert = load_der_x509_certificate(
470 self._inner.verification_material.certificate.raw_bytes
471 )
472 else:
473 # In older bundles, there is an entire pool (misleadingly called
474 # a chain) of certificates, the first of which is the signing
475 # certificate.
476 certs = (
477 self._inner.verification_material.x509_certificate_chain.certificates
478 )
480 if len(certs) == 0:
481 raise InvalidBundle("expected non-empty certificate chain in bundle")
483 # Per client policy in protobuf-specs: the first entry in the chain
484 # MUST be a leaf certificate, and the rest of the chain MUST NOT
485 # include a root CA or any intermediate CAs that appear in an
486 # independent root of trust.
487 #
488 # We expect some old bundles to violate the rules around root
489 # and intermediate CAs, so we issue warnings and not hard errors
490 # in those cases.
491 leaf_cert, *chain_certs = [
492 load_der_x509_certificate(cert.raw_bytes) for cert in certs
493 ]
494 if not cert_is_leaf(leaf_cert):
495 raise InvalidBundle(
496 "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
497 )
499 for chain_cert in chain_certs:
500 # TODO: We should also retrieve the root of trust here and
501 # cross-check against it.
502 if cert_is_root_ca(chain_cert):
503 _logger.warning(
504 "this bundle contains a root CA, making it subject to misuse"
505 )
507 self._signing_certificate = leaf_cert
509 # Extract the log entry. For the time being, we expect
510 # bundles to only contain a single log entry.
511 tlog_entries = self._inner.verification_material.tlog_entries
512 if len(tlog_entries) != 1:
513 raise InvalidBundle("expected exactly one log entry in bundle")
514 tlog_entry = tlog_entries[0]
516 # Handling of inclusion promises and proofs varies between bundle
517 # format versions:
518 #
519 # * For 0.1, an inclusion promise is required; the client
520 # MUST verify the inclusion promise.
521 # The inclusion proof is NOT required. If provided, it might NOT
522 # contain a checkpoint; in this case, we ignore it (since it's
523 # useless without one).
524 #
525 # * For 0.2+, an inclusion proof is required; the client MUST
526 # verify the inclusion proof. The inclusion prof MUST contain
527 # a checkpoint.
528 #
529 # The inclusion promise is NOT required if another source of signed
530 # time (such as a signed timestamp) is present. If no other source
531 # of signed time is present, then the inclusion promise MUST be
532 # present.
533 #
534 # Before all of this, we require that the inclusion proof be present
535 # (when constructing the LogEntry).
536 log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict())
538 if media_type == Bundle.BundleType.BUNDLE_0_1:
539 if not log_entry.inclusion_promise:
540 raise InvalidBundle("bundle must contain an inclusion promise")
541 if not log_entry.inclusion_proof.checkpoint:
542 _logger.debug(
543 "0.1 bundle contains inclusion proof without checkpoint; ignoring"
544 )
545 else:
546 if not log_entry.inclusion_proof.checkpoint:
547 raise InvalidBundle("expected checkpoint in inclusion proof")
549 if (
550 not log_entry.inclusion_promise
551 and not self._inner.verification_material.timestamp_verification_data.rfc3161_timestamps
552 ):
553 raise InvalidBundle(
554 "bundle must contain an inclusion promise or signed timestamp(s)"
555 )
557 self._log_entry = log_entry
559 @property
560 def signing_certificate(self) -> Certificate:
561 """Returns the bundle's contained signing (i.e. leaf) certificate."""
562 return self._signing_certificate
564 @property
565 def log_entry(self) -> LogEntry:
566 """
567 Returns the bundle's log entry, containing an inclusion proof
568 (with checkpoint) and an inclusion promise (if the latter is present).
569 """
570 return self._log_entry
572 @property
573 def _dsse_envelope(self) -> dsse.Envelope | None:
574 """
575 Returns the DSSE envelope within this Bundle as a `dsse.Envelope`.
577 @private
578 """
579 if self._inner.dsse_envelope:
580 return dsse.Envelope(self._inner.dsse_envelope)
581 return None
583 @property
584 def signature(self) -> bytes:
585 """
586 Returns the signature bytes of this bundle.
587 Either from the DSSE Envelope or from the message itself.
588 """
589 return (
590 self._dsse_envelope.signature
591 if self._dsse_envelope
592 else self._inner.message_signature.signature
593 )
595 @property
596 def verification_material(self) -> VerificationMaterial:
597 """
598 Returns the bundle's verification material.
599 """
600 return VerificationMaterial(self._inner.verification_material)
602 @classmethod
603 def from_json(cls, raw: bytes | str) -> Bundle:
604 """
605 Deserialize the given Sigstore bundle.
606 """
607 inner = _Bundle().from_json(raw)
608 return cls(inner)
610 def to_json(self) -> str:
611 """
612 Return a JSON encoding of this bundle.
613 """
614 return self._inner.to_json()
616 def _to_parts(
617 self,
618 ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]:
619 """
620 Decompose the `Bundle` into its core constituent parts.
622 @private
623 """
625 content: common_v1.MessageSignature | dsse.Envelope
626 content = self._dsse_envelope or self._inner.message_signature
628 return (self.signing_certificate, content, self.log_entry)
630 @classmethod
631 def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
632 """
633 Construct a Sigstore bundle (of `hashedrekord` type) from its
634 constituent parts.
635 """
637 return cls._from_parts(
638 cert, common_v1.MessageSignature(signature=sig), log_entry
639 )
641 @classmethod
642 def _from_parts(
643 cls,
644 cert: Certificate,
645 content: common_v1.MessageSignature | dsse.Envelope,
646 log_entry: LogEntry,
647 signed_timestamp: Optional[list[TimeStampResponse]] = None,
648 ) -> Bundle:
649 """
650 @private
651 """
653 inner = _Bundle(
654 media_type=Bundle.BundleType.BUNDLE_0_3.value,
655 verification_material=bundle_v1.VerificationMaterial(
656 certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)),
657 ),
658 )
660 # Fill in the appropriate variants.
661 if isinstance(content, common_v1.MessageSignature):
662 inner.message_signature = content
663 else:
664 inner.dsse_envelope = content._inner
666 tlog_entry = log_entry._to_rekor()
667 inner.verification_material.tlog_entries = [tlog_entry]
669 if signed_timestamp is not None:
670 inner.verification_material.timestamp_verification_data = (
671 bundle_v1.TimestampVerificationData(
672 rfc3161_timestamps=[
673 Rfc3161SignedTimestamp(signed_timestamp=response.as_bytes())
674 for response in signed_timestamp
675 ]
676 )
677 )
679 return cls(inner)