Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/dsse/__init__.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

116 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Functionality for building and manipulating in-toto Statements and DSSE envelopes. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import logging 

23from typing import Any, Literal, Optional 

24 

25from cryptography.exceptions import InvalidSignature 

26from cryptography.hazmat.primitives import hashes 

27from cryptography.hazmat.primitives.asymmetric import ec 

28from pydantic import BaseModel, ConfigDict, Field, RootModel, StrictStr, ValidationError 

29from sigstore_models.common.v1 import HashAlgorithm 

30from sigstore_models.intoto import Envelope as _Envelope 

31from sigstore_models.intoto import Signature as _Signature 

32 

33from sigstore.errors import Error, VerificationError 

34from sigstore.hashes import Hashed 

35 

36_logger = logging.getLogger(__name__) 

37 

38Digest = Literal["sha256", "sha384", "sha512", "sha3_256", "sha3_384", "sha3_512"] 

39""" 

40NOTE: in-toto's DigestSet contains all kinds of hash algorithms that 

41we intentionally do not support. This model is limited to common members of the 

42SHA-2 and SHA-3 family that are at least as strong as SHA-256. 

43 

44See: <https://github.com/in-toto/attestation/blob/main/spec/v1/digest_set.md> 

45""" 

46 

47DigestSet = RootModel[dict[Digest, str]] 

48""" 

49An internal validation model for in-toto subject digest sets. 

50""" 

51 

52 

53class Subject(BaseModel): 

54 """ 

55 A single in-toto statement subject. 

56 """ 

57 

58 name: Optional[StrictStr] # noqa: UP045 

59 digest: DigestSet = Field(...) 

60 

61 

62class _Statement(BaseModel): 

63 """ 

64 An internal validation model for in-toto statements. 

65 """ 

66 

67 model_config = ConfigDict(populate_by_name=True) 

68 

69 type_: Literal["https://in-toto.io/Statement/v1"] = Field(..., alias="_type") 

70 subjects: list[Subject] = Field(..., min_length=1, alias="subject") 

71 predicate_type: StrictStr = Field(..., alias="predicateType") 

72 predicate: Optional[dict[str, Any]] = Field(None, alias="predicate") # noqa: UP045 

73 

74 

75class Statement: 

76 """ 

77 Represents an in-toto statement. 

78 

79 This type deals with opaque bytes to ensure that the encoding does not 

80 change, but Statements are internally checked for conformance against 

81 the JSON object layout defined in the in-toto attestation spec. 

82 

83 See: <https://github.com/in-toto/attestation/blob/main/spec/v1/statement.md> 

84 """ 

85 

86 def __init__(self, contents: bytes | _Statement) -> None: 

87 """ 

88 Construct a new Statement. 

89 

90 This takes an opaque `bytes` containing the statement; use 

91 `StatementBuilder` to manually construct an in-toto statement 

92 from constituent pieces. 

93 """ 

94 if isinstance(contents, bytes): 

95 self._contents = contents 

96 try: 

97 self._inner = _Statement.model_validate_json(contents) 

98 except ValidationError: 

99 raise Error("malformed in-toto statement") 

100 else: 

101 self._contents = contents.model_dump_json(by_alias=True).encode() 

102 self._inner = contents 

103 

104 def _matches_digest(self, digest: Hashed) -> bool: 

105 """ 

106 Returns a boolean indicating whether this in-toto Statement contains a subject 

107 matching the given digest. The subject's name is **not** checked. 

108 

109 No digests other than SHA256 are currently supported. 

110 """ 

111 if digest.algorithm != HashAlgorithm.SHA2_256: 

112 raise VerificationError(f"unexpected digest algorithm: {digest.algorithm}") 

113 

114 for sub in self._inner.subjects: 

115 sub_digest = sub.digest.root.get("sha256") 

116 if sub_digest is None: 

117 continue 

118 if sub_digest == digest.digest.hex(): 

119 return True 

120 

121 return False 

122 

123 def _pae(self) -> bytes: 

124 """ 

125 Construct the PAE encoding for this statement. 

126 """ 

127 

128 return _pae(Envelope._TYPE, self._contents) 

129 

130 

131class StatementBuilder: 

132 """ 

133 A builder-style API for constructing in-toto Statements. 

134 """ 

135 

136 def __init__( 

137 self, 

138 subjects: list[Subject] | None = None, 

139 predicate_type: str | None = None, 

140 predicate: dict[str, Any] | None = None, 

141 ): 

142 """ 

143 Create a new `StatementBuilder`. 

144 """ 

145 self._subjects = subjects or [] 

146 self._predicate_type = predicate_type 

147 self._predicate = predicate 

148 

149 def subjects(self, subjects: list[Subject]) -> StatementBuilder: 

150 """ 

151 Configure the subjects for this builder. 

152 """ 

153 self._subjects = subjects 

154 return self 

155 

156 def predicate_type(self, predicate_type: str) -> StatementBuilder: 

157 """ 

158 Configure the predicate type for this builder. 

159 """ 

160 self._predicate_type = predicate_type 

161 return self 

162 

163 def predicate(self, predicate: dict[str, Any]) -> StatementBuilder: 

164 """ 

165 Configure the predicate for this builder. 

166 """ 

167 self._predicate = predicate 

168 return self 

169 

170 def build(self) -> Statement: 

171 """ 

172 Build a `Statement` from the builder's state. 

173 """ 

174 try: 

175 stmt = _Statement( 

176 type_="https://in-toto.io/Statement/v1", 

177 subjects=self._subjects, 

178 predicate_type=self._predicate_type, 

179 predicate=self._predicate, 

180 ) 

181 except ValidationError as e: 

182 raise Error(f"invalid statement: {e}") 

183 

184 return Statement(stmt) 

185 

186 

187class InvalidEnvelope(Error): 

188 """ 

189 Raised when the associated `Envelope` is invalid in some way. 

190 """ 

191 

192 

193class Envelope: 

194 """ 

195 Represents a DSSE envelope. 

196 

197 This class cannot be constructed directly; you must use `sign` or `from_json`. 

198 

199 See: <https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md> 

200 """ 

201 

202 _TYPE = "application/vnd.in-toto+json" 

203 

204 def __init__(self, inner: _Envelope) -> None: 

205 """ 

206 @private 

207 """ 

208 

209 self._inner = inner 

210 self._verify() 

211 

212 def _verify(self) -> None: 

213 """ 

214 Verify and load the Envelope. 

215 """ 

216 if len(self._inner.signatures) != 1: 

217 raise InvalidEnvelope("envelope must contain exactly one signature") 

218 

219 if not self._inner.signatures[0].sig: 

220 raise InvalidEnvelope("envelope signature must be non-empty") 

221 

222 self._signature_bytes = self._inner.signatures[0].sig 

223 

224 @classmethod 

225 def _from_json(cls, contents: bytes | str) -> Envelope: 

226 """Return a DSSE envelope from the given JSON representation.""" 

227 inner = _Envelope.from_json(contents) 

228 return cls(inner) 

229 

230 def to_json(self) -> str: 

231 """ 

232 Return a JSON string with this DSSE envelope's contents. 

233 """ 

234 return self._inner.to_json() 

235 

236 def __eq__(self, other: object) -> bool: 

237 """Equality for DSSE envelopes.""" 

238 

239 if not isinstance(other, Envelope): 

240 return NotImplemented 

241 

242 return self._inner == other._inner 

243 

244 @property 

245 def signature(self) -> bytes: 

246 """Return the decoded bytes of the Envelope signature.""" 

247 return self._signature_bytes 

248 

249 

250def _pae(type_: str, body: bytes) -> bytes: 

251 """ 

252 Compute the PAE encoding for the given `type_` and `body`. 

253 """ 

254 

255 # See: 

256 # https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md 

257 # https://github.com/in-toto/attestation/blob/v1.0/spec/v1.0/envelope.md 

258 pae = f"DSSEv1 {len(type_)} {type_} ".encode() 

259 pae += b" ".join([str(len(body)).encode(), body]) 

260 return pae 

261 

262 

263def _sign(key: ec.EllipticCurvePrivateKey, stmt: Statement) -> Envelope: 

264 """ 

265 Sign for the given in-toto `Statement`, and encapsulate the resulting 

266 signature in a DSSE `Envelope`. 

267 """ 

268 pae = stmt._pae() 

269 _logger.debug(f"DSSE PAE: {pae!r}") 

270 

271 signature = key.sign(pae, ec.ECDSA(hashes.SHA256())) 

272 return Envelope( 

273 _Envelope( 

274 payload=base64.b64encode(stmt._contents), 

275 payload_type=Envelope._TYPE, 

276 signatures=[_Signature(sig=base64.b64encode(signature))], 

277 ) 

278 ) 

279 

280 

281def _verify(key: ec.EllipticCurvePublicKey, evp: Envelope) -> bytes: 

282 """ 

283 Verify the given in-toto `Envelope`, returning the verified inner payload. 

284 

285 This function does **not** check the envelope's payload type. The caller 

286 is responsible for performing this check. 

287 """ 

288 

289 pae = _pae(evp._inner.payload_type, evp._inner.payload) 

290 

291 nsigs = len(evp._inner.signatures) 

292 if nsigs != 1: 

293 raise VerificationError(f"DSSE: exactly 1 signature allowed, got {nsigs}") 

294 

295 signature = evp._inner.signatures[0].sig 

296 

297 try: 

298 key.verify(signature, pae, ec.ECDSA(hashes.SHA256())) 

299 except InvalidSignature: 

300 raise VerificationError("DSSE: invalid signature") 

301 

302 return evp._inner.payload