Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/models.py: 51%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16Common models shared between signing and verification.
17"""
19from __future__ import annotations
21import base64
22import json
23import logging
24import typing
25from enum import Enum
26from textwrap import dedent
27from typing import Any, Optional
29import rfc8785
30from cryptography.hazmat.primitives.serialization import Encoding
31from cryptography.x509 import (
32 Certificate,
33 load_der_x509_certificate,
34)
35from pydantic import (
36 BaseModel,
37 ConfigDict,
38 Field,
39 StrictInt,
40 StrictStr,
41 TypeAdapter,
42 ValidationInfo,
43 field_validator,
44)
45from pydantic.dataclasses import dataclass
46from rekor_types import Dsse, Hashedrekord, ProposedEntry
47from rfc3161_client import TimeStampResponse, decode_timestamp_response
48from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_v1
49from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
50 Bundle as _Bundle,
51)
52from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
53 TimestampVerificationData as _TimestampVerificationData,
54)
55from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
56 VerificationMaterial as _VerificationMaterial,
57)
58from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1
59from sigstore_protobuf_specs.dev.sigstore.common.v1 import Rfc3161SignedTimestamp
60from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1
61from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import InclusionProof, KindVersion
63from sigstore import dsse
64from sigstore._internal.merkle import verify_merkle_inclusion
65from sigstore._internal.rekor.checkpoint import verify_checkpoint
66from sigstore._utils import (
67 B64Str,
68 KeyID,
69 cert_is_leaf,
70 cert_is_root_ca,
71)
72from sigstore.errors import Error, VerificationError
74if typing.TYPE_CHECKING:
75 from sigstore._internal.trust import RekorKeyring
78_logger = logging.getLogger(__name__)
81class LogInclusionProof(BaseModel):
82 """
83 Represents an inclusion proof for a transparency log entry.
84 """
86 model_config = ConfigDict(populate_by_name=True)
88 checkpoint: StrictStr = Field(..., alias="checkpoint")
89 hashes: list[StrictStr] = Field(..., alias="hashes")
90 log_index: StrictInt = Field(..., alias="logIndex")
91 root_hash: StrictStr = Field(..., alias="rootHash")
92 tree_size: StrictInt = Field(..., alias="treeSize")
94 @field_validator("log_index")
95 def _log_index_positive(cls, v: int) -> int:
96 if v < 0:
97 raise ValueError(f"Inclusion proof has invalid log index: {v} < 0")
98 return v
100 @field_validator("tree_size")
101 def _tree_size_positive(cls, v: int) -> int:
102 if v < 0:
103 raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0")
104 return v
106 @field_validator("tree_size")
107 def _log_index_within_tree_size(
108 cls, v: int, info: ValidationInfo, **kwargs: Any
109 ) -> int:
110 if "log_index" in info.data and v <= info.data["log_index"]:
111 raise ValueError(
112 "Inclusion proof has log index greater than or equal to tree size: "
113 f"{v} <= {info.data['log_index']}"
114 )
115 return v
118@dataclass(frozen=True)
119class LogEntry:
120 """
121 Represents a transparency log entry.
123 Log entries are retrieved from the transparency log after signing or verification events,
124 or loaded from "Sigstore" bundles provided by the user.
126 This representation allows for either a missing inclusion promise or a missing
127 inclusion proof, but not both: attempting to construct a `LogEntry` without
128 at least one will fail.
129 """
131 uuid: Optional[str] # noqa: UP045
132 """
133 This entry's unique ID in the log instance it was retrieved from.
135 For sharded log deployments, IDs are unique per-shard.
137 Not present for `LogEntry` instances loaded from Sigstore bundles.
138 """
140 body: B64Str
141 """
142 The base64-encoded body of the transparency log entry.
143 """
145 integrated_time: int
146 """
147 The UNIX time at which this entry was integrated into the transparency log.
148 """
150 log_id: str
151 """
152 The log's ID (as the SHA256 hash of the DER-encoded public key for the log
153 at the time of entry inclusion).
154 """
156 log_index: int
157 """
158 The index of this entry within the log.
159 """
161 inclusion_proof: LogInclusionProof
162 """
163 An inclusion proof for this log entry.
164 """
166 inclusion_promise: Optional[B64Str] # noqa: UP045
167 """
168 An inclusion promise for this log entry, if present.
170 Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this
171 log entry.
172 """
174 _kind_version: KindVersion
175 """
176 The kind and version of the log entry.
177 """
179 @classmethod
180 def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
181 """
182 Create a new `LogEntry` from the given API response.
183 """
185 # Assumes we only get one entry back
186 entries = list(dict_.items())
187 if len(entries) != 1:
188 raise ValueError("Received multiple entries in response")
189 uuid, entry = entries[0]
191 # Fill in the appropriate kind
192 body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
193 base64.b64decode(entry["body"])
194 )
195 if not isinstance(body_entry, (Hashedrekord, Dsse)):
196 raise InvalidBundle("log entry is not of expected type")
198 return LogEntry(
199 uuid=uuid,
200 body=entry["body"],
201 integrated_time=entry["integratedTime"],
202 log_id=entry["logID"],
203 log_index=entry["logIndex"],
204 inclusion_proof=LogInclusionProof.model_validate(
205 entry["verification"]["inclusionProof"]
206 ),
207 inclusion_promise=entry["verification"]["signedEntryTimestamp"],
208 _kind_version=KindVersion(
209 kind=body_entry.kind, version=body_entry.api_version
210 ),
211 )
213 @classmethod
214 def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry:
215 """
216 Create a new `LogEntry` from the given Rekor TransparencyLogEntry.
217 """
218 tlog_entry = rekor_v1.TransparencyLogEntry()
219 tlog_entry.from_dict(dict_)
221 inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
222 # This check is required by us as the client, not the
223 # protobuf-specs themselves.
224 if not inclusion_proof or not inclusion_proof.checkpoint.envelope:
225 raise InvalidBundle("entry must contain inclusion proof, with checkpoint")
227 parsed_inclusion_proof = LogInclusionProof(
228 checkpoint=inclusion_proof.checkpoint.envelope,
229 hashes=[h.hex() for h in inclusion_proof.hashes],
230 log_index=inclusion_proof.log_index,
231 root_hash=inclusion_proof.root_hash.hex(),
232 tree_size=inclusion_proof.tree_size,
233 )
235 inclusion_promise: B64Str | None = None
236 if tlog_entry.inclusion_promise:
237 inclusion_promise = B64Str(
238 base64.b64encode(
239 tlog_entry.inclusion_promise.signed_entry_timestamp
240 ).decode()
241 )
243 return LogEntry(
244 uuid=None,
245 body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
246 integrated_time=tlog_entry.integrated_time,
247 log_id=tlog_entry.log_id.key_id.hex(),
248 log_index=tlog_entry.log_index,
249 inclusion_proof=parsed_inclusion_proof,
250 _kind_version=tlog_entry.kind_version,
251 inclusion_promise=inclusion_promise,
252 )
254 def _to_rekor(self) -> rekor_v1.TransparencyLogEntry:
255 """
256 Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`.
258 @private
259 """
260 inclusion_proof = rekor_v1.InclusionProof(
261 log_index=self.inclusion_proof.log_index,
262 root_hash=bytes.fromhex(self.inclusion_proof.root_hash),
263 tree_size=self.inclusion_proof.tree_size,
264 hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes],
265 checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint),
266 )
268 tlog_entry = rekor_v1.TransparencyLogEntry(
269 log_index=self.log_index,
270 log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)),
271 integrated_time=self.integrated_time,
272 inclusion_proof=inclusion_proof,
273 kind_version=self._kind_version,
274 canonicalized_body=base64.b64decode(self.body),
275 )
276 if self.inclusion_promise:
277 inclusion_promise = rekor_v1.InclusionPromise(
278 signed_entry_timestamp=base64.b64decode(self.inclusion_promise)
279 )
280 tlog_entry.inclusion_promise = inclusion_promise
282 return tlog_entry
284 def encode_canonical(self) -> bytes:
285 """
286 Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
288 This encoded representation is suitable for verification against
289 the Signed Entry Timestamp.
290 """
291 payload: dict[str, int | str] = {
292 "body": self.body,
293 "integratedTime": self.integrated_time,
294 "logID": self.log_id,
295 "logIndex": self.log_index,
296 }
298 return rfc8785.dumps(payload)
300 def _verify_set(self, keyring: RekorKeyring) -> None:
301 """
302 Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log
303 `entry` using the given `keyring`.
305 Fails if the given log entry does not contain an inclusion promise.
306 """
308 if self.inclusion_promise is None:
309 raise VerificationError("SET: invalid inclusion promise: missing")
311 signed_entry_ts = base64.b64decode(self.inclusion_promise)
313 try:
314 keyring.verify(
315 key_id=KeyID(bytes.fromhex(self.log_id)),
316 signature=signed_entry_ts,
317 data=self.encode_canonical(),
318 )
319 except VerificationError as exc:
320 raise VerificationError(f"SET: invalid inclusion promise: {exc}")
322 def _verify(self, keyring: RekorKeyring) -> None:
323 """
324 Verifies this log entry.
326 This method performs steps (5), (6), and optionally (7) in
327 the top-level verify API:
329 * Verifies the consistency of the entry with the given bundle;
330 * Verifies the Merkle inclusion proof and its signed checkpoint;
331 * Verifies the inclusion promise, if present.
332 """
334 verify_merkle_inclusion(self)
335 verify_checkpoint(keyring, self)
337 _logger.debug(f"successfully verified inclusion proof: index={self.log_index}")
339 if self.inclusion_promise:
340 self._verify_set(keyring)
341 _logger.debug(
342 f"successfully verified inclusion promise: index={self.log_index}"
343 )
346class TimestampVerificationData:
347 """
348 Represents a TimestampVerificationData structure.
350 @private
351 """
353 def __init__(self, inner: _TimestampVerificationData) -> None:
354 """Init method."""
355 self._inner = inner
356 self._verify()
358 def _verify(self) -> None:
359 """
360 Verifies the TimestampVerificationData.
362 It verifies that TimeStamp Responses embedded in the bundle are correctly
363 formed.
364 """
365 try:
366 self._signed_ts = [
367 decode_timestamp_response(ts.signed_timestamp)
368 for ts in self._inner.rfc3161_timestamps
369 ]
370 except ValueError:
371 raise VerificationError("Invalid Timestamp Response")
373 @property
374 def rfc3161_timestamps(self) -> list[TimeStampResponse]:
375 """Returns a list of signed timestamp."""
376 return self._signed_ts
378 @classmethod
379 def from_json(cls, raw: str | bytes) -> TimestampVerificationData:
380 """
381 Deserialize the given timestamp verification data.
382 """
383 inner = _TimestampVerificationData().from_json(raw)
384 return cls(inner)
387class VerificationMaterial:
388 """
389 Represents a VerificationMaterial structure.
390 """
392 def __init__(self, inner: _VerificationMaterial) -> None:
393 """Init method."""
394 self._inner = inner
396 @property
397 def timestamp_verification_data(self) -> TimestampVerificationData:
398 """
399 Returns the Timestamp Verification Data.
400 """
401 return TimestampVerificationData(self._inner.timestamp_verification_data)
404class InvalidBundle(Error):
405 """
406 Raised when the associated `Bundle` is invalid in some way.
407 """
409 def diagnostics(self) -> str:
410 """Returns diagnostics for the error."""
412 return dedent(
413 f"""\
414 An issue occurred while parsing the Sigstore bundle.
416 The provided bundle is malformed and may have been modified maliciously.
418 Additional context:
420 {self}
421 """
422 )
425class Bundle:
426 """
427 Represents a Sigstore bundle.
428 """
430 class BundleType(str, Enum):
431 """
432 Known Sigstore bundle media types.
433 """
435 BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
436 BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
437 BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
438 BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"
440 def __str__(self) -> str:
441 """Returns the variant's string value."""
442 return self.value
444 def __init__(self, inner: _Bundle) -> None:
445 """
446 Creates a new bundle. This is not a public API; use
447 `from_json` instead.
449 @private
450 """
451 self._inner = inner
452 self._verify()
454 def _verify(self) -> None:
455 """
456 Performs various feats of heroism to ensure the bundle is well-formed
457 and upholds invariants, including:
459 * The "leaf" (signing) certificate is present;
460 * There is a inclusion proof present, even if the Bundle's version
461 predates a mandatory inclusion proof.
462 """
464 # The bundle must have a recognized media type.
465 try:
466 media_type = Bundle.BundleType(self._inner.media_type)
467 except ValueError:
468 raise InvalidBundle(f"unsupported bundle format: {self._inner.media_type}")
470 # Extract the signing certificate.
471 if media_type in (
472 Bundle.BundleType.BUNDLE_0_3,
473 Bundle.BundleType.BUNDLE_0_3_ALT,
474 ):
475 # For "v3" bundles, the signing certificate is the only one present.
476 if not self._inner.verification_material.certificate:
477 raise InvalidBundle("expected certificate in bundle")
479 leaf_cert = load_der_x509_certificate(
480 self._inner.verification_material.certificate.raw_bytes
481 )
482 else:
483 # In older bundles, there is an entire pool (misleadingly called
484 # a chain) of certificates, the first of which is the signing
485 # certificate.
486 chain = self._inner.verification_material.x509_certificate_chain
487 if not chain or not chain.certificates:
488 raise InvalidBundle("expected non-empty certificate chain in bundle")
490 # Per client policy in protobuf-specs: the first entry in the chain
491 # MUST be a leaf certificate, and the rest of the chain MUST NOT
492 # include a root CA or any intermediate CAs that appear in an
493 # independent root of trust.
494 #
495 # We expect some old bundles to violate the rules around root
496 # and intermediate CAs, so we issue warnings and not hard errors
497 # in those cases.
498 leaf_cert, *chain_certs = (
499 load_der_x509_certificate(cert.raw_bytes) for cert in chain.certificates
500 )
501 if not cert_is_leaf(leaf_cert):
502 raise InvalidBundle(
503 "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
504 )
506 for chain_cert in chain_certs:
507 # TODO: We should also retrieve the root of trust here and
508 # cross-check against it.
509 if cert_is_root_ca(chain_cert):
510 _logger.warning(
511 "this bundle contains a root CA, making it subject to misuse"
512 )
514 self._signing_certificate = leaf_cert
516 # Extract the log entry. For the time being, we expect
517 # bundles to only contain a single log entry.
518 tlog_entries = self._inner.verification_material.tlog_entries
519 if len(tlog_entries) != 1:
520 raise InvalidBundle("expected exactly one log entry in bundle")
521 tlog_entry = tlog_entries[0]
523 # Handling of inclusion promises and proofs varies between bundle
524 # format versions:
525 #
526 # * For 0.1, an inclusion promise is required; the client
527 # MUST verify the inclusion promise.
528 # The inclusion proof is NOT required. If provided, it might NOT
529 # contain a checkpoint; in this case, we ignore it (since it's
530 # useless without one).
531 #
532 # * For 0.2+, an inclusion proof is required; the client MUST
533 # verify the inclusion proof. The inclusion prof MUST contain
534 # a checkpoint.
535 #
536 # The inclusion promise is NOT required if another source of signed
537 # time (such as a signed timestamp) is present. If no other source
538 # of signed time is present, then the inclusion promise MUST be
539 # present.
540 #
541 # Before all of this, we require that the inclusion proof be present
542 # (when constructing the LogEntry).
543 log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict())
545 if media_type == Bundle.BundleType.BUNDLE_0_1:
546 if not log_entry.inclusion_promise:
547 raise InvalidBundle("bundle must contain an inclusion promise")
548 if not log_entry.inclusion_proof.checkpoint:
549 _logger.debug(
550 "0.1 bundle contains inclusion proof without checkpoint; ignoring"
551 )
552 else:
553 if not log_entry.inclusion_proof.checkpoint:
554 raise InvalidBundle("expected checkpoint in inclusion proof")
556 if (
557 not log_entry.inclusion_promise
558 and not self._inner.verification_material.timestamp_verification_data.rfc3161_timestamps
559 ):
560 raise InvalidBundle(
561 "bundle must contain an inclusion promise or signed timestamp(s)"
562 )
564 self._log_entry = log_entry
566 @property
567 def signing_certificate(self) -> Certificate:
568 """Returns the bundle's contained signing (i.e. leaf) certificate."""
569 return self._signing_certificate
571 @property
572 def log_entry(self) -> LogEntry:
573 """
574 Returns the bundle's log entry, containing an inclusion proof
575 (with checkpoint) and an inclusion promise (if the latter is present).
576 """
577 return self._log_entry
579 @property
580 def _dsse_envelope(self) -> dsse.Envelope | None:
581 """
582 Returns the DSSE envelope within this Bundle as a `dsse.Envelope`.
584 @private
585 """
586 if self._inner.is_set("dsse_envelope"):
587 return dsse.Envelope(self._inner.dsse_envelope) # type: ignore[arg-type]
588 return None
590 @property
591 def signature(self) -> bytes:
592 """
593 Returns the signature bytes of this bundle.
594 Either from the DSSE Envelope or from the message itself.
595 """
596 return (
597 self._dsse_envelope.signature
598 if self._dsse_envelope
599 else self._inner.message_signature.signature # type: ignore[union-attr]
600 )
602 @property
603 def verification_material(self) -> VerificationMaterial:
604 """
605 Returns the bundle's verification material.
606 """
607 return VerificationMaterial(self._inner.verification_material)
609 @classmethod
610 def from_json(cls, raw: bytes | str) -> Bundle:
611 """
612 Deserialize the given Sigstore bundle.
613 """
614 inner = _Bundle.from_dict(json.loads(raw))
615 return cls(inner)
617 def to_json(self) -> str:
618 """
619 Return a JSON encoding of this bundle.
620 """
621 return self._inner.to_json()
623 def _to_parts(
624 self,
625 ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]:
626 """
627 Decompose the `Bundle` into its core constituent parts.
629 @private
630 """
632 content: common_v1.MessageSignature | dsse.Envelope
633 if self._dsse_envelope:
634 content = self._dsse_envelope
635 else:
636 content = self._inner.message_signature # type: ignore[assignment]
638 return (self.signing_certificate, content, self.log_entry)
640 @classmethod
641 def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
642 """
643 Construct a Sigstore bundle (of `hashedrekord` type) from its
644 constituent parts.
645 """
647 return cls._from_parts(
648 cert, common_v1.MessageSignature(signature=sig), log_entry
649 )
651 @classmethod
652 def _from_parts(
653 cls,
654 cert: Certificate,
655 content: common_v1.MessageSignature | dsse.Envelope,
656 log_entry: LogEntry,
657 signed_timestamp: list[TimeStampResponse] | None = None,
658 ) -> Bundle:
659 """
660 @private
661 """
663 timestamp_verifcation_data = bundle_v1.TimestampVerificationData(
664 rfc3161_timestamps=[]
665 )
666 if signed_timestamp is not None:
667 timestamp_verifcation_data.rfc3161_timestamps.extend(
668 [
669 Rfc3161SignedTimestamp(signed_timestamp=response.as_bytes())
670 for response in signed_timestamp
671 ]
672 )
674 # Fill in the appropriate variants.
675 if isinstance(content, common_v1.MessageSignature):
676 # mypy will be mystified if types are specified here
677 content_dict: dict[str, Any] = {"message_signature": content}
678 else:
679 content_dict = {"dsse_envelope": content._inner}
681 inner = _Bundle(
682 media_type=Bundle.BundleType.BUNDLE_0_3.value,
683 verification_material=bundle_v1.VerificationMaterial(
684 certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)),
685 tlog_entries=[log_entry._to_rekor()],
686 timestamp_verification_data=timestamp_verifcation_data,
687 ),
688 **content_dict,
689 )
691 return cls(inner)