1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Functionality for building and manipulating in-toto Statements and DSSE envelopes.
17"""
18
19from __future__ import annotations
20
21import logging
22from typing import Any, Literal, Optional
23
24from cryptography.exceptions import InvalidSignature
25from cryptography.hazmat.primitives import hashes
26from cryptography.hazmat.primitives.asymmetric import ec
27from pydantic import BaseModel, ConfigDict, Field, RootModel, StrictStr, ValidationError
28from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm
29from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope
30from sigstore_protobuf_specs.io.intoto import Signature
31
32from sigstore.errors import Error, VerificationError
33from sigstore.hashes import Hashed
34
35_logger = logging.getLogger(__name__)
36
37Digest = Literal["sha256", "sha384", "sha512", "sha3_256", "sha3_384", "sha3_512"]
38"""
39NOTE: in-toto's DigestSet contains all kinds of hash algorithms that
40we intentionally do not support. This model is limited to common members of the
41SHA-2 and SHA-3 family that are at least as strong as SHA-256.
42
43See: <https://github.com/in-toto/attestation/blob/main/spec/v1/digest_set.md>
44"""
45
46DigestSet = RootModel[dict[Digest, str]]
47"""
48An internal validation model for in-toto subject digest sets.
49"""
50
51
52class Subject(BaseModel):
53 """
54 A single in-toto statement subject.
55 """
56
57 name: Optional[StrictStr] # noqa: UP045
58 digest: DigestSet = Field(...)
59
60
61class _Statement(BaseModel):
62 """
63 An internal validation model for in-toto statements.
64 """
65
66 model_config = ConfigDict(populate_by_name=True)
67
68 type_: Literal["https://in-toto.io/Statement/v1"] = Field(..., alias="_type")
69 subjects: list[Subject] = Field(..., min_length=1, alias="subject")
70 predicate_type: StrictStr = Field(..., alias="predicateType")
71 predicate: Optional[dict[str, Any]] = Field(None, alias="predicate") # noqa: UP045
72
73
74class Statement:
75 """
76 Represents an in-toto statement.
77
78 This type deals with opaque bytes to ensure that the encoding does not
79 change, but Statements are internally checked for conformance against
80 the JSON object layout defined in the in-toto attestation spec.
81
82 See: <https://github.com/in-toto/attestation/blob/main/spec/v1/statement.md>
83 """
84
85 def __init__(self, contents: bytes | _Statement) -> None:
86 """
87 Construct a new Statement.
88
89 This takes an opaque `bytes` containing the statement; use
90 `StatementBuilder` to manually construct an in-toto statement
91 from constituent pieces.
92 """
93 if isinstance(contents, bytes):
94 self._contents = contents
95 try:
96 self._inner = _Statement.model_validate_json(contents)
97 except ValidationError:
98 raise Error("malformed in-toto statement")
99 else:
100 self._contents = contents.model_dump_json(by_alias=True).encode()
101 self._inner = contents
102
103 def _matches_digest(self, digest: Hashed) -> bool:
104 """
105 Returns a boolean indicating whether this in-toto Statement contains a subject
106 matching the given digest. The subject's name is **not** checked.
107
108 No digests other than SHA256 are currently supported.
109 """
110 if digest.algorithm != HashAlgorithm.SHA2_256:
111 raise VerificationError(f"unexpected digest algorithm: {digest.algorithm}")
112
113 for sub in self._inner.subjects:
114 sub_digest = sub.digest.root.get("sha256")
115 if sub_digest is None:
116 continue
117 if sub_digest == digest.digest.hex():
118 return True
119
120 return False
121
122 def _pae(self) -> bytes:
123 """
124 Construct the PAE encoding for this statement.
125 """
126
127 return _pae(Envelope._TYPE, self._contents)
128
129
130class StatementBuilder:
131 """
132 A builder-style API for constructing in-toto Statements.
133 """
134
135 def __init__(
136 self,
137 subjects: list[Subject] | None = None,
138 predicate_type: str | None = None,
139 predicate: dict[str, Any] | None = None,
140 ):
141 """
142 Create a new `StatementBuilder`.
143 """
144 self._subjects = subjects or []
145 self._predicate_type = predicate_type
146 self._predicate = predicate
147
148 def subjects(self, subjects: list[Subject]) -> StatementBuilder:
149 """
150 Configure the subjects for this builder.
151 """
152 self._subjects = subjects
153 return self
154
155 def predicate_type(self, predicate_type: str) -> StatementBuilder:
156 """
157 Configure the predicate type for this builder.
158 """
159 self._predicate_type = predicate_type
160 return self
161
162 def predicate(self, predicate: dict[str, Any]) -> StatementBuilder:
163 """
164 Configure the predicate for this builder.
165 """
166 self._predicate = predicate
167 return self
168
169 def build(self) -> Statement:
170 """
171 Build a `Statement` from the builder's state.
172 """
173 try:
174 stmt = _Statement(
175 type_="https://in-toto.io/Statement/v1",
176 subjects=self._subjects,
177 predicate_type=self._predicate_type,
178 predicate=self._predicate,
179 )
180 except ValidationError as e:
181 raise Error(f"invalid statement: {e}")
182
183 return Statement(stmt)
184
185
186class InvalidEnvelope(Error):
187 """
188 Raised when the associated `Envelope` is invalid in some way.
189 """
190
191
192class Envelope:
193 """
194 Represents a DSSE envelope.
195
196 This class cannot be constructed directly; you must use `sign` or `from_json`.
197
198 See: <https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md>
199 """
200
201 _TYPE = "application/vnd.in-toto+json"
202
203 def __init__(self, inner: _Envelope) -> None:
204 """
205 @private
206 """
207
208 self._inner = inner
209 self._verify()
210
211 def _verify(self) -> None:
212 """
213 Verify and load the Envelope.
214 """
215 if len(self._inner.signatures) != 1:
216 raise InvalidEnvelope("envelope must contain exactly one signature")
217
218 if not self._inner.signatures[0].sig:
219 raise InvalidEnvelope("envelope signature must be non-empty")
220
221 self._signature_bytes = self._inner.signatures[0].sig
222
223 @classmethod
224 def _from_json(cls, contents: bytes | str) -> Envelope:
225 """Return a DSSE envelope from the given JSON representation."""
226 inner = _Envelope().from_json(contents)
227 return cls(inner)
228
229 def to_json(self) -> str:
230 """
231 Return a JSON string with this DSSE envelope's contents.
232 """
233 return self._inner.to_json()
234
235 def __eq__(self, other: object) -> bool:
236 """Equality for DSSE envelopes."""
237
238 if not isinstance(other, Envelope):
239 return NotImplemented
240
241 return self._inner == other._inner
242
243 @property
244 def signature(self) -> bytes:
245 """Return the decoded bytes of the Envelope signature."""
246 return self._signature_bytes
247
248
249def _pae(type_: str, body: bytes) -> bytes:
250 """
251 Compute the PAE encoding for the given `type_` and `body`.
252 """
253
254 # See:
255 # https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md
256 # https://github.com/in-toto/attestation/blob/v1.0/spec/v1.0/envelope.md
257 pae = f"DSSEv1 {len(type_)} {type_} ".encode()
258 pae += b" ".join([str(len(body)).encode(), body])
259 return pae
260
261
262def _sign(key: ec.EllipticCurvePrivateKey, stmt: Statement) -> Envelope:
263 """
264 Sign for the given in-toto `Statement`, and encapsulate the resulting
265 signature in a DSSE `Envelope`.
266 """
267 pae = stmt._pae()
268 _logger.debug(f"DSSE PAE: {pae!r}")
269
270 signature = key.sign(pae, ec.ECDSA(hashes.SHA256()))
271 return Envelope(
272 _Envelope(
273 payload=stmt._contents,
274 payload_type=Envelope._TYPE,
275 signatures=[Signature(sig=signature)],
276 )
277 )
278
279
280def _verify(key: ec.EllipticCurvePublicKey, evp: Envelope) -> bytes:
281 """
282 Verify the given in-toto `Envelope`, returning the verified inner payload.
283
284 This function does **not** check the envelope's payload type. The caller
285 is responsible for performing this check.
286 """
287
288 pae = _pae(evp._inner.payload_type, evp._inner.payload)
289
290 nsigs = len(evp._inner.signatures)
291 if nsigs != 1:
292 raise VerificationError(f"DSSE: exactly 1 signature allowed, got {nsigs}")
293
294 signature = evp._inner.signatures[0].sig
295
296 try:
297 key.verify(signature, pae, ec.ECDSA(hashes.SHA256()))
298 except InvalidSignature:
299 raise VerificationError("DSSE: invalid signature")
300
301 return evp._inner.payload