1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Functionality for building and manipulating in-toto Statements and DSSE envelopes.
17"""
18
19from __future__ import annotations
20
21import base64
22import logging
23from typing import Any, Literal, Optional
24
25from cryptography.exceptions import InvalidSignature
26from cryptography.hazmat.primitives import hashes
27from cryptography.hazmat.primitives.asymmetric import ec
28from pydantic import BaseModel, ConfigDict, Field, RootModel, StrictStr, ValidationError
29from sigstore_models.common.v1 import HashAlgorithm
30from sigstore_models.intoto import Envelope as _Envelope
31from sigstore_models.intoto import Signature as _Signature
32
33from sigstore.errors import Error, VerificationError
34from sigstore.hashes import Hashed
35
36_logger = logging.getLogger(__name__)
37
# Closed set of digest algorithm names accepted as keys of a subject's
# digest set; anything outside this list fails validation.
Digest = Literal[
    "sha256",
    "sha384",
    "sha512",
    "sha3_256",
    "sha3_384",
    "sha3_512",
]
"""
NOTE: in-toto's DigestSet contains all kinds of hash algorithms that
we intentionally do not support. This model is limited to common members of the
SHA-2 and SHA-3 family that are at least as strong as SHA-256.

See: <https://github.com/in-toto/attestation/blob/main/spec/v1/digest_set.md>
"""
46
# Root model wrapping a mapping from a supported `Digest` algorithm name to
# that algorithm's digest value, carried as a string.
DigestSet = RootModel[dict[Digest, str]]
"""
An internal validation model for in-toto subject digest sets.
"""
51
52
class Subject(BaseModel):
    """
    A single in-toto statement subject.

    A subject pairs a name with the set of digests recorded for it.
    """

    # May be `None`; when present it must be a strict `str`. No default is
    # declared for this field.
    name: Optional[StrictStr]  # noqa: UP045
    # Required digest set, keyed by one of the supported `Digest` algorithm names.
    digest: DigestSet = Field(...)
60
61
class _Statement(BaseModel):
    """
    An internal validation model for in-toto statements.

    Fields carry Python-style names and are mapped onto the statement's JSON
    keys (`_type`, `subject`, `predicateType`, `predicate`) through aliases.
    """

    # Lets callers populate fields by their Python names as well as by alias;
    # `StatementBuilder.build` relies on this.
    model_config = ConfigDict(populate_by_name=True)

    # Only the v1 statement type URI is accepted.
    type_: Literal["https://in-toto.io/Statement/v1"] = Field(..., alias="_type")
    # Required, and must contain at least one subject (`min_length=1`).
    subjects: list[Subject] = Field(..., min_length=1, alias="subject")
    # Required strict string naming the predicate's type.
    predicate_type: StrictStr = Field(..., alias="predicateType")
    # Optional free-form JSON object; defaults to `None` when absent.
    predicate: Optional[dict[str, Any]] = Field(None, alias="predicate")  # noqa: UP045
73
74
class Statement:
    """
    Represents an in-toto statement.

    This type deals with opaque bytes to ensure that the encoding does not
    change, but Statements are internally checked for conformance against
    the JSON object layout defined in the in-toto attestation spec.

    See: <https://github.com/in-toto/attestation/blob/main/spec/v1/statement.md>
    """

    def __init__(self, contents: bytes | _Statement) -> None:
        """
        Construct a new Statement.

        `contents` is either the serialized statement as opaque `bytes`, which
        are kept verbatim and validated against the statement model, or an
        already-validated statement model, which is serialized to JSON (using
        field aliases) exactly once. Use `StatementBuilder` to manually
        construct an in-toto statement from constituent pieces.

        Raises `Error` if `contents` is `bytes` that do not validate as an
        in-toto statement; the underlying validation error is attached as the
        exception's cause.
        """
        if isinstance(contents, bytes):
            # Keep the caller's exact bytes: re-serializing could change the
            # encoding of the statement.
            self._contents = contents
            try:
                self._inner = _Statement.model_validate_json(contents)
            except ValidationError as exc:
                raise Error("malformed in-toto statement") from exc
        else:
            self._contents = contents.model_dump_json(by_alias=True).encode()
            self._inner = contents

    def _matches_digest(self, digest: Hashed) -> bool:
        """
        Returns a boolean indicating whether this in-toto Statement contains a subject
        matching the given digest. The subject's name is **not** checked.

        No digests other than SHA256 are currently supported; any other
        algorithm raises `VerificationError`.
        """
        if digest.algorithm != HashAlgorithm.SHA2_256:
            raise VerificationError(f"unexpected digest algorithm: {digest.algorithm}")

        # Computed once rather than per subject. Subjects without a "sha256"
        # entry yield `None`, which never equals the hex string.
        expected = digest.digest.hex()
        return any(
            subject.digest.root.get("sha256") == expected
            for subject in self._inner.subjects
        )

    def _pae(self) -> bytes:
        """
        Construct the PAE encoding for this statement.

        The encoding covers the envelope payload type and this statement's
        exact serialized bytes.
        """

        return _pae(Envelope._TYPE, self._contents)
129
130
class StatementBuilder:
    """
    A builder-style API for constructing in-toto Statements.

    Each setter returns the builder itself so calls can be chained; nothing
    is validated until `build` is called.
    """

    def __init__(
        self,
        subjects: list[Subject] | None = None,
        predicate_type: str | None = None,
        predicate: dict[str, Any] | None = None,
    ) -> None:
        """
        Create a new `StatementBuilder`.

        All arguments are optional and may instead be supplied later through
        the corresponding setter. A missing (or empty) `subjects` starts the
        builder with an empty subject list.
        """
        self._subjects = subjects or []
        self._predicate_type = predicate_type
        self._predicate = predicate

    def subjects(self, subjects: list[Subject]) -> StatementBuilder:
        """
        Configure the subjects for this builder.

        Replaces any previously configured subjects.
        """
        self._subjects = subjects
        return self

    def predicate_type(self, predicate_type: str) -> StatementBuilder:
        """
        Configure the predicate type for this builder.
        """
        self._predicate_type = predicate_type
        return self

    def predicate(self, predicate: dict[str, Any]) -> StatementBuilder:
        """
        Configure the predicate for this builder.
        """
        self._predicate = predicate
        return self

    def build(self) -> Statement:
        """
        Build a `Statement` from the builder's state.

        Raises `Error` if the accumulated state does not form a valid
        statement; the validation error is attached as the exception's cause.
        """
        try:
            stmt = _Statement(
                type_="https://in-toto.io/Statement/v1",
                subjects=self._subjects,
                predicate_type=self._predicate_type,
                predicate=self._predicate,
            )
        except ValidationError as exc:
            # Chain explicitly so the structured validation error stays
            # reachable via `__cause__`, not only through the message text.
            raise Error(f"invalid statement: {exc}") from exc

        return Statement(stmt)
185
186
class InvalidEnvelope(Error):
    """
    Raised when the associated `Envelope` is invalid in some way.

    `Envelope` raises this while being constructed if the wrapped envelope
    does not carry exactly one signature, or if that signature is empty.
    """
191
192
class Envelope:
    """
    Represents a DSSE envelope.

    Instances are not meant to be constructed directly; they are produced by
    signing a statement or by loading an envelope's JSON representation.

    See: <https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md>
    """

    _TYPE = "application/vnd.in-toto+json"

    def __init__(self, inner: _Envelope) -> None:
        """
        @private
        """

        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Check the envelope's signature invariants and cache the signature.

        Raises `InvalidEnvelope` unless the envelope holds exactly one
        signature and that signature is non-empty.
        """
        signatures = self._inner.signatures
        if len(signatures) != 1:
            raise InvalidEnvelope("envelope must contain exactly one signature")

        only_signature = signatures[0].sig
        if not only_signature:
            raise InvalidEnvelope("envelope signature must be non-empty")

        self._signature_bytes = only_signature

    @classmethod
    def _from_json(cls, contents: bytes | str) -> Envelope:
        """Load a DSSE envelope from its JSON representation."""
        return cls(_Envelope.from_json(contents))

    def to_json(self) -> str:
        """
        Serialize this DSSE envelope's contents to a JSON string.
        """
        return self._inner.to_json()

    def __eq__(self, other: object) -> bool:
        """Two envelopes are equal when their underlying contents are equal."""

        if isinstance(other, Envelope):
            return self._inner == other._inner

        # Defer to the other operand for foreign types.
        return NotImplemented

    @property
    def signature(self) -> bytes:
        """Return the decoded bytes of the Envelope signature."""
        return self._signature_bytes
248
249
def _pae(type_: str, body: bytes) -> bytes:
    """
    Compute the PAE encoding for the given `type_` and `body`.

    The result is the space-separated sequence
    `DSSEv1 <len(type)> <type> <len(body)> <body>`, where `type_` is encoded
    as UTF-8 and each length is the ASCII decimal count of *bytes* in the
    field that follows it.
    """

    # See:
    # https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md
    # https://github.com/in-toto/attestation/blob/v1.0/spec/v1.0/envelope.md
    #
    # Encode first and measure the encoded form: the length prefix counts
    # bytes, which differs from `len(type_)` for non-ASCII payload types.
    type_bytes = type_.encode()
    return b" ".join(
        [
            b"DSSEv1",
            str(len(type_bytes)).encode(),
            type_bytes,
            str(len(body)).encode(),
            body,
        ]
    )
261
262
def _sign(key: ec.EllipticCurvePrivateKey, stmt: Statement) -> Envelope:
    """
    Sign for the given in-toto `Statement`, and encapsulate the resulting
    signature in a DSSE `Envelope`.

    The signature is ECDSA with SHA-256 over the statement's PAE encoding.
    The statement's bytes and the signature are both base64-encoded before
    being handed to the envelope model.
    """
    pae = stmt._pae()
    # Lazy %-style arguments: the (potentially large) PAE is only repr()'d
    # when DEBUG logging is actually enabled.
    _logger.debug("DSSE PAE: %r", pae)

    signature = key.sign(pae, ec.ECDSA(hashes.SHA256()))
    return Envelope(
        _Envelope(
            payload=base64.b64encode(stmt._contents),
            payload_type=Envelope._TYPE,
            signatures=[_Signature(sig=base64.b64encode(signature))],
        )
    )
279
280
def _verify(key: ec.EllipticCurvePublicKey, evp: Envelope) -> bytes:
    """
    Check the signature on the given in-toto `Envelope` and hand back its
    payload once the signature has been verified.

    The envelope's payload type is **not** inspected here; callers must
    perform that check themselves.

    Raises `VerificationError` if the envelope does not carry exactly one
    signature, or if the signature does not verify under `key`.
    """

    inner = evp._inner
    encoded = _pae(inner.payload_type, inner.payload)

    sig_count = len(inner.signatures)
    if sig_count != 1:
        raise VerificationError(f"DSSE: exactly 1 signature allowed, got {sig_count}")

    raw_signature = inner.signatures[0].sig

    try:
        key.verify(raw_signature, encoded, ec.ECDSA(hashes.SHA256()))
    except InvalidSignature:
        raise VerificationError("DSSE: invalid signature")

    return inner.payload