1# Copyright 2022 The Sigstore Authors 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#      http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15""" 
    16Functionality for building and manipulating in-toto Statements and DSSE envelopes. 
    17""" 
    18 
    19from __future__ import annotations 
    20 
    21import base64 
    22import logging 
    23from typing import Any, Literal, Optional 
    24 
    25from cryptography.exceptions import InvalidSignature 
    26from cryptography.hazmat.primitives import hashes 
    27from cryptography.hazmat.primitives.asymmetric import ec 
    28from pydantic import BaseModel, ConfigDict, Field, RootModel, StrictStr, ValidationError 
    29from sigstore_models.common.v1 import HashAlgorithm 
    30from sigstore_models.intoto import Envelope as _Envelope 
    31from sigstore_models.intoto import Signature as _Signature 
    32 
    33from sigstore.errors import Error, VerificationError 
    34from sigstore.hashes import Hashed 
    35 
    36_logger = logging.getLogger(__name__) 
    37 
# Closed set of digest-algorithm keys accepted inside a subject's digest set.
Digest = Literal["sha256", "sha384", "sha512", "sha3_256", "sha3_384", "sha3_512"]
"""
NOTE: in-toto's DigestSet contains all kinds of hash algorithms that
we intentionally do not support. This model is limited to common members of the
SHA-2 and SHA-3 family that are at least as strong as SHA-256.

See: <https://github.com/in-toto/attestation/blob/main/spec/v1/digest_set.md>
"""

# A root model wrapping a plain `dict` keyed by `Digest`; the mapping itself is
# reached through `.root` (as `Statement._matches_digest` does).
DigestSet = RootModel[dict[Digest, str]]
"""
An internal validation model for in-toto subject digest sets.
"""
    51 
    52 
class Subject(BaseModel):
    """
    A single in-toto statement subject.

    Pairs a (possibly null) name with the digest set identifying the subject.
    """

    # NOTE(review): declared without a default, so pydantic presumably treats
    # this field as required-but-nullable (the key must be supplied, although
    # its value may be `None`) -- confirm that rejecting subjects which omit
    # `name` entirely is intended.
    name: Optional[StrictStr]  # noqa: UP045
    # Required mapping of digest algorithm name to digest string.
    digest: DigestSet = Field(...)
    60 
    61 
class _Statement(BaseModel):
    """
    An internal validation model for in-toto statements.

    Attribute names are Pythonic; each field's `alias` is the key used in the
    statement's JSON form.
    """

    # Accept the Python field names as well as the aliases when constructing a
    # model; `StatementBuilder.build` relies on this by passing `type_=`,
    # `subjects=` and `predicate_type=`.
    model_config = ConfigDict(populate_by_name=True)

    # Pinned to the v1 statement type URI via `Literal`.
    type_: Literal["https://in-toto.io/Statement/v1"] = Field(..., alias="_type")
    # Required, and must contain at least one subject (`min_length=1`).
    subjects: list[Subject] = Field(..., min_length=1, alias="subject")
    # Required predicate type identifier.
    predicate_type: StrictStr = Field(..., alias="predicateType")
    # Optional free-form predicate body; defaults to `None`.
    predicate: Optional[dict[str, Any]] = Field(None, alias="predicate")  # noqa: UP045
    73 
    74 
    75class Statement: 
    76    """ 
    77    Represents an in-toto statement. 
    78 
    79    This type deals with opaque bytes to ensure that the encoding does not 
    80    change, but Statements are internally checked for conformance against 
    81    the JSON object layout defined in the in-toto attestation spec. 
    82 
    83    See: <https://github.com/in-toto/attestation/blob/main/spec/v1/statement.md> 
    84    """ 
    85 
    86    def __init__(self, contents: bytes | _Statement) -> None: 
    87        """ 
    88        Construct a new Statement. 
    89 
    90        This takes an opaque `bytes` containing the statement; use 
    91        `StatementBuilder` to manually construct an in-toto statement 
    92        from constituent pieces. 
    93        """ 
    94        if isinstance(contents, bytes): 
    95            self._contents = contents 
    96            try: 
    97                self._inner = _Statement.model_validate_json(contents) 
    98            except ValidationError: 
    99                raise Error("malformed in-toto statement") 
    100        else: 
    101            self._contents = contents.model_dump_json(by_alias=True).encode() 
    102            self._inner = contents 
    103 
    104    def _matches_digest(self, digest: Hashed) -> bool: 
    105        """ 
    106        Returns a boolean indicating whether this in-toto Statement contains a subject 
    107        matching the given digest. The subject's name is **not** checked. 
    108 
    109        No digests other than SHA256 are currently supported. 
    110        """ 
    111        if digest.algorithm != HashAlgorithm.SHA2_256: 
    112            raise VerificationError(f"unexpected digest algorithm: {digest.algorithm}") 
    113 
    114        for sub in self._inner.subjects: 
    115            sub_digest = sub.digest.root.get("sha256") 
    116            if sub_digest is None: 
    117                continue 
    118            if sub_digest == digest.digest.hex(): 
    119                return True 
    120 
    121        return False 
    122 
    123    def _pae(self) -> bytes: 
    124        """ 
    125        Construct the PAE encoding for this statement. 
    126        """ 
    127 
    128        return _pae(Envelope._TYPE, self._contents) 
    129 
    130 
    131class StatementBuilder: 
    132    """ 
    133    A builder-style API for constructing in-toto Statements. 
    134    """ 
    135 
    136    def __init__( 
    137        self, 
    138        subjects: list[Subject] | None = None, 
    139        predicate_type: str | None = None, 
    140        predicate: dict[str, Any] | None = None, 
    141    ): 
    142        """ 
    143        Create a new `StatementBuilder`. 
    144        """ 
    145        self._subjects = subjects or [] 
    146        self._predicate_type = predicate_type 
    147        self._predicate = predicate 
    148 
    149    def subjects(self, subjects: list[Subject]) -> StatementBuilder: 
    150        """ 
    151        Configure the subjects for this builder. 
    152        """ 
    153        self._subjects = subjects 
    154        return self 
    155 
    156    def predicate_type(self, predicate_type: str) -> StatementBuilder: 
    157        """ 
    158        Configure the predicate type for this builder. 
    159        """ 
    160        self._predicate_type = predicate_type 
    161        return self 
    162 
    163    def predicate(self, predicate: dict[str, Any]) -> StatementBuilder: 
    164        """ 
    165        Configure the predicate for this builder. 
    166        """ 
    167        self._predicate = predicate 
    168        return self 
    169 
    170    def build(self) -> Statement: 
    171        """ 
    172        Build a `Statement` from the builder's state. 
    173        """ 
    174        try: 
    175            stmt = _Statement( 
    176                type_="https://in-toto.io/Statement/v1", 
    177                subjects=self._subjects, 
    178                predicate_type=self._predicate_type, 
    179                predicate=self._predicate, 
    180            ) 
    181        except ValidationError as e: 
    182            raise Error(f"invalid statement: {e}") 
    183 
    184        return Statement(stmt) 
    185 
    186 
class InvalidEnvelope(Error):
    """
    Raised when the associated `Envelope` is invalid in some way.

    `Envelope._verify` raises this when the envelope does not carry exactly
    one signature, or when that signature is empty.
    """
    191 
    192 
    193class Envelope: 
    194    """ 
    195    Represents a DSSE envelope. 
    196 
    197    This class cannot be constructed directly; you must use `sign` or `from_json`. 
    198 
    199    See: <https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md> 
    200    """ 
    201 
    202    _TYPE = "application/vnd.in-toto+json" 
    203 
    204    def __init__(self, inner: _Envelope) -> None: 
    205        """ 
    206        @private 
    207        """ 
    208 
    209        self._inner = inner 
    210        self._verify() 
    211 
    212    def _verify(self) -> None: 
    213        """ 
    214        Verify and load the Envelope. 
    215        """ 
    216        if len(self._inner.signatures) != 1: 
    217            raise InvalidEnvelope("envelope must contain exactly one signature") 
    218 
    219        if not self._inner.signatures[0].sig: 
    220            raise InvalidEnvelope("envelope signature must be non-empty") 
    221 
    222        self._signature_bytes = self._inner.signatures[0].sig 
    223 
    224    @classmethod 
    225    def _from_json(cls, contents: bytes | str) -> Envelope: 
    226        """Return a DSSE envelope from the given JSON representation.""" 
    227        inner = _Envelope.from_json(contents) 
    228        return cls(inner) 
    229 
    230    def to_json(self) -> str: 
    231        """ 
    232        Return a JSON string with this DSSE envelope's contents. 
    233        """ 
    234        return self._inner.to_json() 
    235 
    236    def __eq__(self, other: object) -> bool: 
    237        """Equality for DSSE envelopes.""" 
    238 
    239        if not isinstance(other, Envelope): 
    240            return NotImplemented 
    241 
    242        return self._inner == other._inner 
    243 
    244    @property 
    245    def signature(self) -> bytes: 
    246        """Return the decoded bytes of the Envelope signature.""" 
    247        return self._signature_bytes 
    248 
    249 
    250def _pae(type_: str, body: bytes) -> bytes: 
    251    """ 
    252    Compute the PAE encoding for the given `type_` and `body`. 
    253    """ 
    254 
    255    # See: 
    256    # https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md 
    257    # https://github.com/in-toto/attestation/blob/v1.0/spec/v1.0/envelope.md 
    258    pae = f"DSSEv1 {len(type_)} {type_} ".encode() 
    259    pae += b" ".join([str(len(body)).encode(), body]) 
    260    return pae 
    261 
    262 
    263def _sign(key: ec.EllipticCurvePrivateKey, stmt: Statement) -> Envelope: 
    264    """ 
    265    Sign for the given in-toto `Statement`, and encapsulate the resulting 
    266    signature in a DSSE `Envelope`. 
    267    """ 
    268    pae = stmt._pae() 
    269    _logger.debug(f"DSSE PAE: {pae!r}") 
    270 
    271    signature = key.sign(pae, ec.ECDSA(hashes.SHA256())) 
    272    return Envelope( 
    273        _Envelope( 
    274            payload=base64.b64encode(stmt._contents), 
    275            payload_type=Envelope._TYPE, 
    276            signatures=[_Signature(sig=base64.b64encode(signature))], 
    277        ) 
    278    ) 
    279 
    280 
    281def _verify(key: ec.EllipticCurvePublicKey, evp: Envelope) -> bytes: 
    282    """ 
    283    Verify the given in-toto `Envelope`, returning the verified inner payload. 
    284 
    285    This function does **not** check the envelope's payload type. The caller 
    286    is responsible for performing this check. 
    287    """ 
    288 
    289    pae = _pae(evp._inner.payload_type, evp._inner.payload) 
    290 
    291    nsigs = len(evp._inner.signatures) 
    292    if nsigs != 1: 
    293        raise VerificationError(f"DSSE: exactly 1 signature allowed, got {nsigs}") 
    294 
    295    signature = evp._inner.signatures[0].sig 
    296 
    297    try: 
    298        key.verify(signature, pae, ec.ECDSA(hashes.SHA256())) 
    299    except InvalidSignature: 
    300        raise VerificationError("DSSE: invalid signature") 
    301 
    302    return evp._inner.payload