Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/dsse/__init__.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

115 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Functionality for building and manipulating in-toto Statements and DSSE envelopes. 

17""" 

18 

19from __future__ import annotations 

20 

21import logging 

22from typing import Any, Literal, Optional 

23 

24from cryptography.exceptions import InvalidSignature 

25from cryptography.hazmat.primitives import hashes 

26from cryptography.hazmat.primitives.asymmetric import ec 

27from pydantic import BaseModel, ConfigDict, Field, RootModel, StrictStr, ValidationError 

28from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm 

29from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope 

30from sigstore_protobuf_specs.io.intoto import Signature 

31 

32from sigstore.errors import Error, VerificationError 

33from sigstore.hashes import Hashed 

34 

35_logger = logging.getLogger(__name__) 

36 

37Digest = Literal["sha256", "sha384", "sha512", "sha3_256", "sha3_384", "sha3_512"] 

38""" 

39NOTE: in-toto's DigestSet contains all kinds of hash algorithms that 

40we intentionally do not support. This model is limited to common members of the 

41SHA-2 and SHA-3 family that are at least as strong as SHA-256. 

42 

43See: <https://github.com/in-toto/attestation/blob/main/spec/v1/digest_set.md> 

44""" 

45 

46DigestSet = RootModel[dict[Digest, str]] 

47""" 

48An internal validation model for in-toto subject digest sets. 

49""" 

50 

51 

52class Subject(BaseModel): 

53 """ 

54 A single in-toto statement subject. 

55 """ 

56 

57 name: Optional[StrictStr] # noqa: UP045 

58 digest: DigestSet = Field(...) 

59 

60 

61class _Statement(BaseModel): 

62 """ 

63 An internal validation model for in-toto statements. 

64 """ 

65 

66 model_config = ConfigDict(populate_by_name=True) 

67 

68 type_: Literal["https://in-toto.io/Statement/v1"] = Field(..., alias="_type") 

69 subjects: list[Subject] = Field(..., min_length=1, alias="subject") 

70 predicate_type: StrictStr = Field(..., alias="predicateType") 

71 predicate: Optional[dict[str, Any]] = Field(None, alias="predicate") # noqa: UP045 

72 

73 

74class Statement: 

75 """ 

76 Represents an in-toto statement. 

77 

78 This type deals with opaque bytes to ensure that the encoding does not 

79 change, but Statements are internally checked for conformance against 

80 the JSON object layout defined in the in-toto attestation spec. 

81 

82 See: <https://github.com/in-toto/attestation/blob/main/spec/v1/statement.md> 

83 """ 

84 

85 def __init__(self, contents: bytes | _Statement) -> None: 

86 """ 

87 Construct a new Statement. 

88 

89 This takes an opaque `bytes` containing the statement; use 

90 `StatementBuilder` to manually construct an in-toto statement 

91 from constituent pieces. 

92 """ 

93 if isinstance(contents, bytes): 

94 self._contents = contents 

95 try: 

96 self._inner = _Statement.model_validate_json(contents) 

97 except ValidationError: 

98 raise Error("malformed in-toto statement") 

99 else: 

100 self._contents = contents.model_dump_json(by_alias=True).encode() 

101 self._inner = contents 

102 

103 def _matches_digest(self, digest: Hashed) -> bool: 

104 """ 

105 Returns a boolean indicating whether this in-toto Statement contains a subject 

106 matching the given digest. The subject's name is **not** checked. 

107 

108 No digests other than SHA256 are currently supported. 

109 """ 

110 if digest.algorithm != HashAlgorithm.SHA2_256: 

111 raise VerificationError(f"unexpected digest algorithm: {digest.algorithm}") 

112 

113 for sub in self._inner.subjects: 

114 sub_digest = sub.digest.root.get("sha256") 

115 if sub_digest is None: 

116 continue 

117 if sub_digest == digest.digest.hex(): 

118 return True 

119 

120 return False 

121 

122 def _pae(self) -> bytes: 

123 """ 

124 Construct the PAE encoding for this statement. 

125 """ 

126 

127 return _pae(Envelope._TYPE, self._contents) 

128 

129 

130class StatementBuilder: 

131 """ 

132 A builder-style API for constructing in-toto Statements. 

133 """ 

134 

135 def __init__( 

136 self, 

137 subjects: list[Subject] | None = None, 

138 predicate_type: str | None = None, 

139 predicate: dict[str, Any] | None = None, 

140 ): 

141 """ 

142 Create a new `StatementBuilder`. 

143 """ 

144 self._subjects = subjects or [] 

145 self._predicate_type = predicate_type 

146 self._predicate = predicate 

147 

148 def subjects(self, subjects: list[Subject]) -> StatementBuilder: 

149 """ 

150 Configure the subjects for this builder. 

151 """ 

152 self._subjects = subjects 

153 return self 

154 

155 def predicate_type(self, predicate_type: str) -> StatementBuilder: 

156 """ 

157 Configure the predicate type for this builder. 

158 """ 

159 self._predicate_type = predicate_type 

160 return self 

161 

162 def predicate(self, predicate: dict[str, Any]) -> StatementBuilder: 

163 """ 

164 Configure the predicate for this builder. 

165 """ 

166 self._predicate = predicate 

167 return self 

168 

169 def build(self) -> Statement: 

170 """ 

171 Build a `Statement` from the builder's state. 

172 """ 

173 try: 

174 stmt = _Statement( 

175 type_="https://in-toto.io/Statement/v1", 

176 subjects=self._subjects, 

177 predicate_type=self._predicate_type, 

178 predicate=self._predicate, 

179 ) 

180 except ValidationError as e: 

181 raise Error(f"invalid statement: {e}") 

182 

183 return Statement(stmt) 

184 

185 

186class InvalidEnvelope(Error): 

187 """ 

188 Raised when the associated `Envelope` is invalid in some way. 

189 """ 

190 

191 

192class Envelope: 

193 """ 

194 Represents a DSSE envelope. 

195 

196 This class cannot be constructed directly; you must use `sign` or `from_json`. 

197 

198 See: <https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md> 

199 """ 

200 

201 _TYPE = "application/vnd.in-toto+json" 

202 

203 def __init__(self, inner: _Envelope) -> None: 

204 """ 

205 @private 

206 """ 

207 

208 self._inner = inner 

209 self._verify() 

210 

211 def _verify(self) -> None: 

212 """ 

213 Verify and load the Envelope. 

214 """ 

215 if len(self._inner.signatures) != 1: 

216 raise InvalidEnvelope("envelope must contain exactly one signature") 

217 

218 if not self._inner.signatures[0].sig: 

219 raise InvalidEnvelope("envelope signature must be non-empty") 

220 

221 self._signature_bytes = self._inner.signatures[0].sig 

222 

223 @classmethod 

224 def _from_json(cls, contents: bytes | str) -> Envelope: 

225 """Return a DSSE envelope from the given JSON representation.""" 

226 inner = _Envelope().from_json(contents) 

227 return cls(inner) 

228 

229 def to_json(self) -> str: 

230 """ 

231 Return a JSON string with this DSSE envelope's contents. 

232 """ 

233 return self._inner.to_json() 

234 

235 def __eq__(self, other: object) -> bool: 

236 """Equality for DSSE envelopes.""" 

237 

238 if not isinstance(other, Envelope): 

239 return NotImplemented 

240 

241 return self._inner == other._inner 

242 

243 @property 

244 def signature(self) -> bytes: 

245 """Return the decoded bytes of the Envelope signature.""" 

246 return self._signature_bytes 

247 

248 

249def _pae(type_: str, body: bytes) -> bytes: 

250 """ 

251 Compute the PAE encoding for the given `type_` and `body`. 

252 """ 

253 

254 # See: 

255 # https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md 

256 # https://github.com/in-toto/attestation/blob/v1.0/spec/v1.0/envelope.md 

257 pae = f"DSSEv1 {len(type_)} {type_} ".encode() 

258 pae += b" ".join([str(len(body)).encode(), body]) 

259 return pae 

260 

261 

262def _sign(key: ec.EllipticCurvePrivateKey, stmt: Statement) -> Envelope: 

263 """ 

264 Sign for the given in-toto `Statement`, and encapsulate the resulting 

265 signature in a DSSE `Envelope`. 

266 """ 

267 pae = stmt._pae() 

268 _logger.debug(f"DSSE PAE: {pae!r}") 

269 

270 signature = key.sign(pae, ec.ECDSA(hashes.SHA256())) 

271 return Envelope( 

272 _Envelope( 

273 payload=stmt._contents, 

274 payload_type=Envelope._TYPE, 

275 signatures=[Signature(sig=signature)], 

276 ) 

277 ) 

278 

279 

280def _verify(key: ec.EllipticCurvePublicKey, evp: Envelope) -> bytes: 

281 """ 

282 Verify the given in-toto `Envelope`, returning the verified inner payload. 

283 

284 This function does **not** check the envelope's payload type. The caller 

285 is responsible for performing this check. 

286 """ 

287 

288 pae = _pae(evp._inner.payload_type, evp._inner.payload) 

289 

290 nsigs = len(evp._inner.signatures) 

291 if nsigs != 1: 

292 raise VerificationError(f"DSSE: exactly 1 signature allowed, got {nsigs}") 

293 

294 signature = evp._inner.signatures[0].sig 

295 

296 try: 

297 key.verify(signature, pae, ec.ECDSA(hashes.SHA256())) 

298 except InvalidSignature: 

299 raise VerificationError("DSSE: invalid signature") 

300 

301 return evp._inner.payload