Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/models.py: 42%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16Common models shared between signing and verification.
17"""
19from __future__ import annotations
21import base64
22import logging
23import typing
24from enum import Enum
25from textwrap import dedent
26from typing import Any
28import rfc8785
29from cryptography.hazmat.primitives.serialization import Encoding
30from cryptography.x509 import (
31 Certificate,
32 load_der_x509_certificate,
33)
34from pydantic import TypeAdapter
35from rekor_types import Dsse, Hashedrekord, ProposedEntry
36from rfc3161_client import TimeStampResponse, decode_timestamp_response
37from sigstore_models.bundle import v1 as bundle_v1
38from sigstore_models.bundle.v1 import Bundle as _Bundle
39from sigstore_models.bundle.v1 import (
40 TimestampVerificationData as _TimestampVerificationData,
41)
42from sigstore_models.bundle.v1 import VerificationMaterial as _VerificationMaterial
43from sigstore_models.common import v1 as common_v1
44from sigstore_models.common.v1 import MessageSignature, RFC3161SignedTimestamp
45from sigstore_models.rekor import v1 as rekor_v1
46from sigstore_models.rekor.v1 import TransparencyLogEntry as _TransparencyLogEntry
48from sigstore import dsse
49from sigstore._internal.merkle import verify_merkle_inclusion
50from sigstore._internal.rekor.checkpoint import verify_checkpoint
51from sigstore._utils import (
52 KeyID,
53 cert_is_leaf,
54 cert_is_root_ca,
55)
56from sigstore.errors import Error, VerificationError
58if typing.TYPE_CHECKING:
59 from sigstore._internal.trust import RekorKeyring
# Module-level logger named after this module; used for the debug and
# warning messages emitted while validating and verifying bundles below.
_logger = logging.getLogger(__name__)
class TransparencyLogEntry:
    """
    Represents a transparency log entry.

    This is a thin wrapper around the `sigstore_models` transparency log
    entry model that additionally enforces, at construction time, that an
    inclusion proof with a checkpoint is present.
    """

    def __init__(self, inner: _TransparencyLogEntry) -> None:
        """
        Creates a new `TransparencyLogEntry` from the given inner object.

        Raises `InvalidBundle` if the entry has no inclusion proof, or if
        the inclusion proof has no checkpoint.

        @private
        """
        self._inner = inner
        self._validate()

    def _validate(self) -> None:
        """
        Ensure this transparency log entry is well-formed and upholds our
        client invariants.

        Raises `InvalidBundle` if the inclusion proof or its checkpoint is
        missing.
        """

        inclusion_proof: rekor_v1.InclusionProof | None = self._inner.inclusion_proof
        # This check is required by us as the client, not the
        # protobuf-specs themselves.
        if not inclusion_proof or not inclusion_proof.checkpoint:
            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")

    def __eq__(self, value: object) -> bool:
        """
        Compares this `TransparencyLogEntry` with another object for equality.

        Two `TransparencyLogEntry` instances are considered equal if their
        inner contents are equal. Comparison with any other type returns
        `NotImplemented`, deferring to the other operand.
        """
        if not isinstance(value, TransparencyLogEntry):
            return NotImplemented
        return self._inner == value._inner

    @classmethod
    def _from_v1_response(cls, dict_: dict[str, Any]) -> TransparencyLogEntry:
        """
        Create a new `TransparencyLogEntry` from the given API response.

        `dict_` must map exactly one key to an entry dictionary; the key
        itself is ignored. Hex-encoded fields of the entry (`logID`,
        `rootHash` and the proof `hashes`) are re-encoded as base64 for the
        inner model.

        Raises `ValueError` if the response does not contain exactly one
        entry, and `InvalidBundle` if the entry body is neither a
        `hashedrekord` nor a `dsse` entry.
        """

        # We only ever expect a single entry back; the key is not needed.
        entries = list(dict_.values())
        if not entries:
            raise ValueError("Received no entries in response")
        if len(entries) > 1:
            raise ValueError("Received multiple entries in response")
        entry = entries[0]

        # Fill in the appropriate kind
        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
            base64.b64decode(entry["body"])
        )
        if not isinstance(body_entry, (Hashedrekord, Dsse)):
            raise InvalidBundle("log entry is not of expected type")

        raw_inclusion_proof = entry["verification"]["inclusionProof"]

        # NOTE: The type ignores below are a consequence of our Pydantic
        # modeling: mypy and other typecheckers see `ProtoU64` as `int`,
        # but it gets coerced from a string due to Protobuf's JSON serialization.
        inner = _TransparencyLogEntry(
            log_index=str(entry["logIndex"]),  # type: ignore[arg-type]
            log_id=common_v1.LogId(
                key_id=base64.b64encode(bytes.fromhex(entry["logID"]))
            ),
            kind_version=rekor_v1.KindVersion(
                kind=body_entry.kind, version=body_entry.api_version
            ),
            integrated_time=str(entry["integratedTime"]),  # type: ignore[arg-type]
            inclusion_promise=rekor_v1.InclusionPromise(
                signed_entry_timestamp=entry["verification"]["signedEntryTimestamp"]
            ),
            inclusion_proof=rekor_v1.InclusionProof(
                log_index=str(raw_inclusion_proof["logIndex"]),  # type: ignore[arg-type]
                root_hash=base64.b64encode(
                    bytes.fromhex(raw_inclusion_proof["rootHash"])
                ),
                tree_size=str(raw_inclusion_proof["treeSize"]),  # type: ignore[arg-type]
                hashes=[
                    base64.b64encode(bytes.fromhex(h))
                    for h in raw_inclusion_proof["hashes"]
                ],
                checkpoint=rekor_v1.Checkpoint(
                    envelope=raw_inclusion_proof["checkpoint"]
                ),
            ),
            canonicalized_body=entry["body"],
        )

        return cls(inner)

    def _encode_canonical(self) -> bytes:
        """
        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.

        This encoded representation is suitable for verification against
        the Signed Entry Timestamp.

        Raises `ValueError` if the entry has no integrated time.
        """
        # We might not have an integrated time if our log entry is from rekor
        # v2, i.e. was integrated synchronously instead of via an
        # inclusion promise.
        if self._inner.integrated_time is None:
            raise ValueError(
                "can't encode canonical form for SET without integrated time"
            )

        payload: dict[str, int | str] = {
            "body": base64.b64encode(self._inner.canonicalized_body).decode(),
            "integratedTime": self._inner.integrated_time,
            "logID": self._inner.log_id.key_id.hex(),
            "logIndex": self._inner.log_index,
        }

        return rfc8785.dumps(payload)

    def _verify_set(self, keyring: RekorKeyring) -> None:
        """
        Verify the inclusion promise (Signed Entry Timestamp) for this
        transparency log entry using the given `keyring`.

        Raises `VerificationError` if the entry does not contain an
        inclusion promise, or if the keyring rejects the signature.
        """

        if self._inner.inclusion_promise is None:
            raise VerificationError("SET: invalid inclusion promise: missing")

        signed_entry_ts = self._inner.inclusion_promise.signed_entry_timestamp

        try:
            keyring.verify(
                key_id=KeyID(self._inner.log_id.key_id),
                signature=signed_entry_ts,
                data=self._encode_canonical(),
            )
        except VerificationError as exc:
            # Chain explicitly so the keyring's original failure stays
            # attached as `__cause__` of the re-raised error.
            raise VerificationError(f"SET: invalid inclusion promise: {exc}") from exc

    def _verify(self, keyring: RekorKeyring) -> None:
        """
        Verifies this log entry.

        This method:

        * Verifies the Merkle inclusion proof (`verify_merkle_inclusion`)
          and its signed checkpoint (`verify_checkpoint`);
        * Verifies the inclusion promise, but only when both an inclusion
          promise and an integrated time are present.
        """

        verify_merkle_inclusion(self)
        verify_checkpoint(keyring, self)

        _logger.debug(
            "successfully verified inclusion proof: index=%s",
            self._inner.log_index,
        )

        # The SET covers the integrated time, so without one there is
        # nothing to verify the promise against.
        if self._inner.inclusion_promise and self._inner.integrated_time:
            self._verify_set(keyring)
            _logger.debug(
                "successfully verified inclusion promise: index=%s",
                self._inner.log_index,
            )
class TimestampVerificationData:
    """
    Represents a TimestampVerificationData structure.

    The RFC 3161 timestamp responses carried by the wrapped object are
    decoded once, at construction time.

    @private
    """

    def __init__(self, inner: _TimestampVerificationData) -> None:
        """
        Wrap `inner` and decode its embedded timestamp responses.

        Raises `VerificationError` if any response cannot be decoded.
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Verifies the TimestampVerificationData.

        It verifies that TimeStamp Responses embedded in the bundle are correctly
        formed, and stores the decoded responses on the instance.

        Raises `VerificationError` if decoding any response raises
        `ValueError`.
        """
        # A missing (falsy) timestamp list is treated the same as an empty one.
        timestamps = self._inner.rfc3161_timestamps or []

        try:
            self._signed_ts = [
                decode_timestamp_response(ts.signed_timestamp) for ts in timestamps
            ]
        except ValueError as exc:
            # Keep the decoder's error attached as the explicit cause.
            raise VerificationError("Invalid Timestamp Response") from exc

    @property
    def rfc3161_timestamps(self) -> list[TimeStampResponse]:
        """Returns the list of decoded signed timestamps."""
        return self._signed_ts

    @classmethod
    def from_json(cls, raw: str | bytes) -> TimestampVerificationData:
        """
        Deserialize the given timestamp verification data.

        Raises `VerificationError` if an embedded timestamp response cannot
        be decoded.
        """
        inner = _TimestampVerificationData.from_json(raw)
        return cls(inner)
class VerificationMaterial:
    """
    Wrapper around a bundle's VerificationMaterial structure.
    """

    def __init__(self, inner: _VerificationMaterial) -> None:
        """Store the wrapped verification material."""
        self._inner = inner

    @property
    def timestamp_verification_data(self) -> TimestampVerificationData | None:
        """
        The Timestamp Verification Data, wrapped for use by the client.

        Yields `None` when the underlying material has no timestamp
        verification data, or when that data holds no RFC 3161 timestamps.
        """
        raw = self._inner.timestamp_verification_data
        # Guard clauses: nothing to wrap unless timestamps are actually there.
        if not raw:
            return None
        if not raw.rfc3161_timestamps:
            return None
        return TimestampVerificationData(raw)
class InvalidBundle(Error):
    """
    Signals that the associated `Bundle` was found to be invalid.
    """

    def diagnostics(self) -> str:
        """Build the diagnostic text shown for this error."""

        # The error's own string form is interpolated first; the common
        # leading indentation is stripped afterwards.
        report = f"""\
        An issue occurred while parsing the Sigstore bundle.

        The provided bundle is malformed and may have been modified maliciously.

        Additional context:

        {self}
        """
        return dedent(report)
class Bundle:
    """
    Represents a Sigstore bundle.

    A `Bundle` wraps the `sigstore_models` bundle model and validates it at
    construction time, extracting the signing certificate and the single
    transparency log entry.
    """

    class BundleType(str, Enum):
        """
        Known Sigstore bundle media types.
        """

        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: _Bundle) -> None:
        """
        Creates a new bundle. This is not a public API; use
        `from_json` instead.

        Raises `InvalidBundle` if the bundle fails validation.

        @private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Performs various feats of heroism to ensure the bundle is well-formed
        and upholds invariants, including:

        * The media type is one of the known `BundleType` values;
        * The "leaf" (signing) certificate is present;
        * There is exactly one transparency log entry;
        * There is an inclusion proof present, even if the Bundle's version
          predates a mandatory inclusion proof.

        On success, the signing certificate and log entry are stored on the
        instance. Raises `InvalidBundle` on any violation.
        """

        # The bundle must have a recognized media type.
        try:
            media_type = Bundle.BundleType(self._inner.media_type)
        except ValueError as exc:
            raise InvalidBundle(
                f"unsupported bundle format: {self._inner.media_type}"
            ) from exc

        # Extract the signing certificate.
        if media_type in (
            Bundle.BundleType.BUNDLE_0_3,
            Bundle.BundleType.BUNDLE_0_3_ALT,
        ):
            # For "v3" bundles, the signing certificate is the only one present.
            if not self._inner.verification_material.certificate:
                raise InvalidBundle("expected certificate in bundle")

            leaf_cert = load_der_x509_certificate(
                self._inner.verification_material.certificate.raw_bytes
            )
        else:
            # In older bundles, there is an entire pool (misleadingly called
            # a chain) of certificates, the first of which is the signing
            # certificate.
            if not self._inner.verification_material.x509_certificate_chain:
                raise InvalidBundle("expected certificate chain in bundle")

            chain = self._inner.verification_material.x509_certificate_chain
            if not chain.certificates:
                raise InvalidBundle("expected non-empty certificate chain in bundle")

            # Per client policy in protobuf-specs: the first entry in the chain
            # MUST be a leaf certificate, and the rest of the chain MUST NOT
            # include a root CA or any intermediate CAs that appear in an
            # independent root of trust.
            #
            # We expect some old bundles to violate the rules around root
            # and intermediate CAs, so we issue warnings and not hard errors
            # in those cases.
            leaf_cert, *chain_certs = (
                load_der_x509_certificate(cert.raw_bytes) for cert in chain.certificates
            )
            if not cert_is_leaf(leaf_cert):
                raise InvalidBundle(
                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
                )

            for chain_cert in chain_certs:
                # TODO: We should also retrieve the root of trust here and
                # cross-check against it.
                if cert_is_root_ca(chain_cert):
                    _logger.warning(
                        "this bundle contains a root CA, making it subject to misuse"
                    )

        self._signing_certificate = leaf_cert

        # Extract the log entry. For the time being, we expect
        # bundles to only contain a single log entry.
        tlog_entries = self._inner.verification_material.tlog_entries
        if len(tlog_entries) != 1:
            raise InvalidBundle("expected exactly one log entry in bundle")
        tlog_entry = tlog_entries[0]

        # Handling of inclusion promises and proofs varies between bundle
        # format versions:
        #
        # * For 0.1, an inclusion promise is required; the client
        #   MUST verify the inclusion promise.
        #   The inclusion proof is NOT required. If provided, it might NOT
        #   contain a checkpoint; in this case, we ignore it (since it's
        #   useless without one).
        #
        # * For 0.2+, an inclusion proof is required; the client MUST
        #   verify the inclusion proof. The inclusion proof MUST contain
        #   a checkpoint.
        #
        #   The inclusion promise is NOT required if another source of signed
        #   time (such as a signed timestamp) is present. If no other source
        #   of signed time is present, then the inclusion promise MUST be
        #   present.
        #
        # Before all of this, we require that the inclusion proof be present
        # (when constructing the LogEntry).
        log_entry = TransparencyLogEntry(tlog_entry)

        if media_type == Bundle.BundleType.BUNDLE_0_1:
            if not log_entry._inner.inclusion_promise:
                raise InvalidBundle("bundle must contain an inclusion promise")
            if not log_entry._inner.inclusion_proof.checkpoint:
                _logger.debug(
                    "0.1 bundle contains inclusion proof without checkpoint; ignoring"
                )
        else:
            if not log_entry._inner.inclusion_proof.checkpoint:
                raise InvalidBundle("expected checkpoint in inclusion proof")

            if (
                not log_entry._inner.inclusion_promise
                and not self.verification_material.timestamp_verification_data
            ):
                raise InvalidBundle(
                    "bundle must contain an inclusion promise or signed timestamp(s)"
                )

        self._log_entry = log_entry

    @property
    def signing_certificate(self) -> Certificate:
        """Returns the bundle's contained signing (i.e. leaf) certificate."""
        return self._signing_certificate

    @property
    def log_entry(self) -> TransparencyLogEntry:
        """
        Returns the bundle's log entry, containing an inclusion proof
        (with checkpoint) and an inclusion promise (if the latter is present).
        """
        return self._log_entry

    @property
    def _dsse_envelope(self) -> dsse.Envelope | None:
        """
        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`,
        or `None` if the bundle has no DSSE envelope.

        A new wrapper object is created on every access.

        @private
        """
        if self._inner.dsse_envelope is not None:
            return dsse.Envelope(self._inner.dsse_envelope)
        return None

    @property
    def signature(self) -> bytes:
        """
        Returns the signature bytes of this bundle.
        Either from the DSSE Envelope or from the message itself.
        """
        # Read the property once so the envelope wrapper is built only once.
        envelope = self._dsse_envelope
        if envelope:
            return envelope.signature
        return self._inner.message_signature.signature  # type: ignore[union-attr]

    @property
    def verification_material(self) -> VerificationMaterial:
        """
        Returns the bundle's verification material.
        """
        return VerificationMaterial(self._inner.verification_material)

    @classmethod
    def from_json(cls, raw: bytes | str) -> Bundle:
        """
        Deserialize the given Sigstore bundle.

        Raises `InvalidBundle` if the input cannot be loaded or if the
        loaded bundle fails validation.
        """
        try:
            inner = _Bundle.from_json(raw)
        except ValueError as exc:
            # Keep the parser's error attached as the explicit cause.
            raise InvalidBundle(f"failed to load bundle: {exc}") from exc
        return cls(inner)

    def to_json(self) -> str:
        """
        Return a JSON encoding of this bundle.
        """
        return self._inner.to_json()

    def _to_parts(
        self,
    ) -> tuple[Certificate, MessageSignature | dsse.Envelope, TransparencyLogEntry]:
        """
        Decompose the `Bundle` into its core constituent parts: the signing
        certificate, the signed content (DSSE envelope if present, otherwise
        the message signature), and the log entry.

        @private
        """

        content: MessageSignature | dsse.Envelope
        # Read the property once so the envelope wrapper is built only once.
        envelope = self._dsse_envelope
        if envelope:
            content = envelope
        else:
            content = self._inner.message_signature  # type: ignore[assignment]

        return (self.signing_certificate, content, self.log_entry)

    @classmethod
    def from_parts(
        cls, cert: Certificate, sig: bytes, log_entry: TransparencyLogEntry
    ) -> Bundle:
        """
        Construct a Sigstore bundle (of `hashedrekord` type) from its
        constituent parts.

        `sig` is the raw signature; it is base64-encoded before being
        placed in the bundle's message signature.
        """

        return cls._from_parts(
            cert, MessageSignature(signature=base64.b64encode(sig)), log_entry
        )

    @classmethod
    def _from_parts(
        cls,
        cert: Certificate,
        content: MessageSignature | dsse.Envelope,
        log_entry: TransparencyLogEntry,
        signed_timestamp: list[TimeStampResponse] | None = None,
    ) -> Bundle:
        """
        Construct a bundle from a signing certificate, the signed content
        (either a message signature or a DSSE envelope), a log entry and,
        optionally, a list of signed timestamp responses.

        The resulting bundle always uses the `BUNDLE_0_3` media type.

        @private
        """

        # `None` and an empty list both yield an empty timestamp list.
        timestamp_verification_data = bundle_v1.TimestampVerificationData(
            rfc3161_timestamps=[
                RFC3161SignedTimestamp(
                    signed_timestamp=base64.b64encode(response.as_bytes())
                )
                for response in signed_timestamp or []
            ]
        )

        # Fill in the appropriate variant.
        message_signature = None
        dsse_envelope = None
        if isinstance(content, MessageSignature):
            message_signature = content
        else:
            dsse_envelope = content._inner

        inner = _Bundle(
            media_type=Bundle.BundleType.BUNDLE_0_3.value,
            verification_material=bundle_v1.VerificationMaterial(
                certificate=common_v1.X509Certificate(
                    raw_bytes=base64.b64encode(cert.public_bytes(Encoding.DER))
                ),
                tlog_entries=[log_entry._inner],
                timestamp_verification_data=timestamp_verification_data,
            ),
            message_signature=message_signature,
            dsse_envelope=dsse_envelope,
        )

        return cls(inner)