Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/models.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

197 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Common models shared between signing and verification. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import logging 

23import typing 

24from enum import Enum 

25from textwrap import dedent 

26from typing import Any 

27 

28import rfc8785 

29from cryptography.hazmat.primitives.serialization import Encoding 

30from cryptography.x509 import ( 

31 Certificate, 

32 load_der_x509_certificate, 

33) 

34from pydantic import TypeAdapter 

35from rekor_types import Dsse, Hashedrekord, ProposedEntry 

36from rfc3161_client import TimeStampResponse, decode_timestamp_response 

37from sigstore_models.bundle import v1 as bundle_v1 

38from sigstore_models.bundle.v1 import Bundle as _Bundle 

39from sigstore_models.bundle.v1 import ( 

40 TimestampVerificationData as _TimestampVerificationData, 

41) 

42from sigstore_models.bundle.v1 import VerificationMaterial as _VerificationMaterial 

43from sigstore_models.common import v1 as common_v1 

44from sigstore_models.common.v1 import MessageSignature, RFC3161SignedTimestamp 

45from sigstore_models.rekor import v1 as rekor_v1 

46from sigstore_models.rekor.v1 import TransparencyLogEntry as _TransparencyLogEntry 

47 

48from sigstore import dsse 

49from sigstore._internal.merkle import verify_merkle_inclusion 

50from sigstore._internal.rekor.checkpoint import verify_checkpoint 

51from sigstore._utils import ( 

52 KeyID, 

53 cert_is_leaf, 

54 cert_is_root_ca, 

55) 

56from sigstore.errors import Error, VerificationError 

57 

58if typing.TYPE_CHECKING: 

59 from sigstore._internal.trust import RekorKeyring 

60 

61 

# Module-level logger, named after this module via `__name__`.
_logger = logging.getLogger(__name__)

63 

64 

class TransparencyLogEntry:
    """
    Represents a transparency log entry.

    Wraps an inner `_TransparencyLogEntry` model and checks, at construction
    time, that it carries an inclusion proof with a checkpoint.
    """

    def __init__(self, inner: _TransparencyLogEntry) -> None:
        """
        Creates a new `TransparencyLogEntry` from the given inner object.

        Raises `InvalidBundle` if the inner entry has no inclusion proof or
        the inclusion proof has no checkpoint (see `_validate`).

        @private
        """
        self._inner = inner
        self._validate()

    def _validate(self) -> None:
        """
        Ensure this transparency log entry is well-formed and upholds our
        client invariants.

        Raises `InvalidBundle` if the inclusion proof or its checkpoint is
        missing.
        """

        inclusion_proof: rekor_v1.InclusionProof | None = self._inner.inclusion_proof
        # This check is required by us as the client, not the
        # protobuf-specs themselves.
        if not inclusion_proof or not inclusion_proof.checkpoint:
            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")

    def __eq__(self, value: object) -> bool:
        """
        Compares this `TransparencyLogEntry` with another object for equality.

        Two `TransparencyLogEntry` instances are considered equal if their
        inner contents are equal. Comparison against any other type returns
        `NotImplemented`, deferring to the other operand.
        """
        if not isinstance(value, TransparencyLogEntry):
            return NotImplemented
        return self._inner == value._inner

    @classmethod
    def _from_v1_response(cls, dict_: dict[str, Any]) -> TransparencyLogEntry:
        """
        Create a new `TransparencyLogEntry` from the given API response.

        `dict_` must map exactly one key to an entry object. The entry is
        read through the keys `body`, `logIndex`, `logID`, `integratedTime`
        and `verification` (which in turn provides `signedEntryTimestamp`
        and `inclusionProof`).

        Raises `ValueError` if the response does not contain exactly one
        entry, and `InvalidBundle` if the entry body is neither a
        `Hashedrekord` nor a `Dsse` entry.
        """

        # We only support responses carrying exactly one entry; the key it
        # is stored under is not used. An empty response is rejected with
        # the same error as a response with several entries, so report the
        # actual count rather than claiming there were "multiple".
        entries = list(dict_.values())
        if len(entries) != 1:
            raise ValueError(
                f"expected exactly one entry in response, got {len(entries)}"
            )
        entry = entries[0]

        # Fill in the appropriate kind
        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
            base64.b64decode(entry["body"])
        )
        if not isinstance(body_entry, (Hashedrekord, Dsse)):
            raise InvalidBundle("log entry is not of expected type")

        raw_inclusion_proof = entry["verification"]["inclusionProof"]

        # The response carries the log ID, root hash and proof hashes as hex
        # strings; they are re-encoded as base64 before being handed to the
        # inner models.
        #
        # NOTE: The type ignores below are a consequence of our Pydantic
        # modeling: mypy and other typecheckers see `ProtoU64` as `int`,
        # but it gets coerced from a string due to Protobuf's JSON serialization.
        inner = _TransparencyLogEntry(
            log_index=str(entry["logIndex"]),  # type: ignore[arg-type]
            log_id=common_v1.LogId(
                key_id=base64.b64encode(bytes.fromhex(entry["logID"]))
            ),
            kind_version=rekor_v1.KindVersion(
                kind=body_entry.kind, version=body_entry.api_version
            ),
            integrated_time=str(entry["integratedTime"]),  # type: ignore[arg-type]
            inclusion_promise=rekor_v1.InclusionPromise(
                signed_entry_timestamp=entry["verification"]["signedEntryTimestamp"]
            ),
            inclusion_proof=rekor_v1.InclusionProof(
                log_index=str(raw_inclusion_proof["logIndex"]),  # type: ignore[arg-type]
                root_hash=base64.b64encode(
                    bytes.fromhex(raw_inclusion_proof["rootHash"])
                ),
                tree_size=str(raw_inclusion_proof["treeSize"]),  # type: ignore[arg-type]
                hashes=[
                    base64.b64encode(bytes.fromhex(h))
                    for h in raw_inclusion_proof["hashes"]
                ],
                checkpoint=rekor_v1.Checkpoint(
                    envelope=raw_inclusion_proof["checkpoint"]
                ),
            ),
            canonicalized_body=entry["body"],
        )

        return cls(inner)

    def _encode_canonical(self) -> bytes:
        """
        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.

        This encoded representation is suitable for verification against
        the Signed Entry Timestamp.

        Raises `ValueError` if the entry has no integrated time.
        """
        # We might not have an integrated time if our log entry is from rekor
        # v2, i.e. was integrated synchronously instead of via an
        # inclusion promise.
        if self._inner.integrated_time is None:
            raise ValueError(
                "can't encode canonical form for SET without integrated time"
            )

        payload: dict[str, int | str] = {
            "body": base64.b64encode(self._inner.canonicalized_body).decode(),
            "integratedTime": self._inner.integrated_time,
            "logID": self._inner.log_id.key_id.hex(),
            "logIndex": self._inner.log_index,
        }

        return rfc8785.dumps(payload)

    def _verify_set(self, keyring: RekorKeyring) -> None:
        """
        Verify the inclusion promise (Signed Entry Timestamp) for this
        transparency log entry using the given `keyring`.

        Raises `VerificationError` if the entry does not contain an
        inclusion promise, or if the keyring rejects the signature.
        """

        if self._inner.inclusion_promise is None:
            raise VerificationError("SET: invalid inclusion promise: missing")

        signed_entry_ts = self._inner.inclusion_promise.signed_entry_timestamp

        try:
            keyring.verify(
                key_id=KeyID(self._inner.log_id.key_id),
                signature=signed_entry_ts,
                data=self._encode_canonical(),
            )
        except VerificationError as exc:
            # Chain explicitly so the keyring failure is reported as the
            # cause of this error rather than as an error in the handler.
            raise VerificationError(f"SET: invalid inclusion promise: {exc}") from exc

    def _verify(self, keyring: RekorKeyring) -> None:
        """
        Verifies this log entry.

        This method:

        * Verifies the Merkle inclusion proof (`verify_merkle_inclusion`);
        * Verifies the signed checkpoint against `keyring`
          (`verify_checkpoint`);
        * Verifies the inclusion promise (SET), but only when the entry has
          both an inclusion promise and an integrated time.
        """

        verify_merkle_inclusion(self)
        verify_checkpoint(keyring, self)

        _logger.debug(
            f"successfully verified inclusion proof: index={self._inner.log_index}"
        )

        # The SET covers the integrated time (see `_encode_canonical`), so it
        # can only be checked when both pieces are present.
        if self._inner.inclusion_promise and self._inner.integrated_time:
            self._verify_set(keyring)
            _logger.debug(
                f"successfully verified inclusion promise: index={self._inner.log_index}"
            )

227 

228 

class TimestampVerificationData:
    """
    Represents a TimestampVerificationData structure.

    Decodes the RFC 3161 timestamp responses of the wrapped model once, at
    construction time, and exposes them through `rfc3161_timestamps`.

    @private
    """

    def __init__(self, inner: _TimestampVerificationData) -> None:
        """
        Wraps `inner` and immediately decodes its timestamps.

        Raises `VerificationError` if any timestamp fails to decode.
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Verifies the TimestampVerificationData.

        It verifies that TimeStamp Responses embedded in the bundle are correctly
        formed, by decoding each of them. The decoded responses are stored
        for later retrieval; a missing or empty list yields an empty result.

        Raises `VerificationError` if decoding raises `ValueError`.
        """
        timestamps = self._inner.rfc3161_timestamps or []

        try:
            self._signed_ts = [
                decode_timestamp_response(ts.signed_timestamp) for ts in timestamps
            ]
        except ValueError as exc:
            # Chain explicitly so the decoding failure is preserved as the
            # cause of the verification error.
            raise VerificationError("Invalid Timestamp Response") from exc

    @property
    def rfc3161_timestamps(self) -> list[TimeStampResponse]:
        """Returns the list of decoded signed timestamps."""
        return self._signed_ts

    @classmethod
    def from_json(cls, raw: str | bytes) -> TimestampVerificationData:
        """
        Deserialize the given timestamp verification data.
        """
        inner = _TimestampVerificationData.from_json(raw)
        return cls(inner)

270 

271 

class VerificationMaterial:
    """
    Wrapper around an inner `_VerificationMaterial` model.
    """

    def __init__(self, inner: _VerificationMaterial) -> None:
        """Stores the wrapped verification material."""
        self._inner = inner

    @property
    def timestamp_verification_data(self) -> TimestampVerificationData | None:
        """
        The wrapped Timestamp Verification Data, or `None` when the inner
        material has none or its list of RFC 3161 timestamps is empty.
        """
        raw_data = self._inner.timestamp_verification_data
        if not raw_data or not raw_data.rfc3161_timestamps:
            return None
        return TimestampVerificationData(raw_data)

292 

293 

class InvalidBundle(Error):
    """
    Signals that the associated `Bundle` was found to be invalid.
    """

    def diagnostics(self) -> str:
        """Builds the diagnostic text for this error, embedding `str(self)`."""

        report = f"""\
        An issue occurred while parsing the Sigstore bundle.

        The provided bundle is malformed and may have been modified maliciously.

        Additional context:

        {self}
        """
        # Strip the indentation the literal inherits from the source layout.
        return dedent(report)

313 

314 

class Bundle:
    """
    Represents a Sigstore bundle.

    Wraps an inner `_Bundle` model. Construction validates the bundle (see
    `_verify`) and extracts its signing certificate and its single
    transparency log entry.
    """

    class BundleType(str, Enum):
        """
        Known Sigstore bundle media types.
        """

        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: _Bundle) -> None:
        """
        Creates a new bundle. This is not a public API; use
        `from_json` instead.

        Raises `InvalidBundle` if the bundle fails validation.

        @private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Performs various feats of heroism to ensure the bundle is well-formed
        and upholds invariants, including:

        * The media type is one of the known `BundleType` values;
        * The "leaf" (signing) certificate is present;
        * There is exactly one transparency log entry;
        * There is an inclusion proof (with checkpoint) present, even if the
          Bundle's version predates a mandatory inclusion proof;
        * A 0.1 bundle has an inclusion promise, and a newer bundle has an
          inclusion promise or signed timestamp(s).

        On success, stores the signing certificate and the log entry on the
        instance. Raises `InvalidBundle` when any invariant is violated.
        """

        # The bundle must have a recognized media type.
        try:
            media_type = Bundle.BundleType(self._inner.media_type)
        except ValueError as exc:
            raise InvalidBundle(
                f"unsupported bundle format: {self._inner.media_type}"
            ) from exc

        # Extract the signing certificate.
        #
        # NOTE(review): failures raised by `load_der_x509_certificate` are
        # not converted to `InvalidBundle` here; presumably malformed DER
        # surfaces as a `ValueError` to the caller -- confirm whether that
        # is intended.
        if media_type in (
            Bundle.BundleType.BUNDLE_0_3,
            Bundle.BundleType.BUNDLE_0_3_ALT,
        ):
            # For "v3" bundles, the signing certificate is the only one present.
            if not self._inner.verification_material.certificate:
                raise InvalidBundle("expected certificate in bundle")

            leaf_cert = load_der_x509_certificate(
                self._inner.verification_material.certificate.raw_bytes
            )
        else:
            # In older bundles, there is an entire pool (misleadingly called
            # a chain) of certificates, the first of which is the signing
            # certificate.
            if not self._inner.verification_material.x509_certificate_chain:
                raise InvalidBundle("expected certificate chain in bundle")

            chain = self._inner.verification_material.x509_certificate_chain
            if not chain.certificates:
                raise InvalidBundle("expected non-empty certificate chain in bundle")

            # Per client policy in protobuf-specs: the first entry in the chain
            # MUST be a leaf certificate, and the rest of the chain MUST NOT
            # include a root CA or any intermediate CAs that appear in an
            # independent root of trust.
            #
            # We expect some old bundles to violate the rules around root
            # and intermediate CAs, so we issue warnings and not hard errors
            # in those cases.
            leaf_cert, *chain_certs = (
                load_der_x509_certificate(cert.raw_bytes) for cert in chain.certificates
            )
            if not cert_is_leaf(leaf_cert):
                raise InvalidBundle(
                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
                )

            for chain_cert in chain_certs:
                # TODO: We should also retrieve the root of trust here and
                # cross-check against it.
                if cert_is_root_ca(chain_cert):
                    _logger.warning(
                        "this bundle contains a root CA, making it subject to misuse"
                    )

        self._signing_certificate = leaf_cert

        # Extract the log entry. For the time being, we expect
        # bundles to only contain a single log entry.
        tlog_entries = self._inner.verification_material.tlog_entries
        if len(tlog_entries) != 1:
            raise InvalidBundle("expected exactly one log entry in bundle")
        tlog_entry = tlog_entries[0]

        # Constructing the `TransparencyLogEntry` already rejects any entry
        # that lacks an inclusion proof or whose inclusion proof lacks a
        # checkpoint, regardless of the bundle's format version. Beyond
        # that, handling of inclusion promises varies between versions:
        #
        # * For 0.1, an inclusion promise is required.
        #
        # * For 0.2+, the inclusion promise is NOT required if another
        #   source of signed time (such as a signed timestamp) is present.
        #   If no other source of signed time is present, then the
        #   inclusion promise MUST be present.
        log_entry = TransparencyLogEntry(tlog_entry)

        if media_type == Bundle.BundleType.BUNDLE_0_1:
            if not log_entry._inner.inclusion_promise:
                raise InvalidBundle("bundle must contain an inclusion promise")
        else:
            # Redundant with the constructor check above; kept as an
            # explicit statement of the 0.2+ requirement.
            if not log_entry._inner.inclusion_proof.checkpoint:
                raise InvalidBundle("expected checkpoint in inclusion proof")

            # Evaluating `timestamp_verification_data` decodes the embedded
            # timestamps, so an undecodable timestamp surfaces here as a
            # `VerificationError`.
            if (
                not log_entry._inner.inclusion_promise
                and not self.verification_material.timestamp_verification_data
            ):
                raise InvalidBundle(
                    "bundle must contain an inclusion promise or signed timestamp(s)"
                )

        self._log_entry = log_entry

    @property
    def signing_certificate(self) -> Certificate:
        """Returns the bundle's contained signing (i.e. leaf) certificate."""
        return self._signing_certificate

    @property
    def log_entry(self) -> TransparencyLogEntry:
        """
        Returns the bundle's log entry, containing an inclusion proof
        (with checkpoint) and an inclusion promise (if the latter is present).
        """
        return self._log_entry

    @property
    def _dsse_envelope(self) -> dsse.Envelope | None:
        """
        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`,
        or `None` if the bundle has no DSSE envelope.

        A new wrapper is constructed on every access.

        @private
        """
        if self._inner.dsse_envelope is not None:
            return dsse.Envelope(self._inner.dsse_envelope)
        return None

    @property
    def signature(self) -> bytes:
        """
        Returns the signature bytes of this bundle.
        Either from the DSSE Envelope or from the message itself.
        """
        # Bind once: each access to `_dsse_envelope` builds a new wrapper.
        envelope = self._dsse_envelope
        if envelope:
            return envelope.signature
        return self._inner.message_signature.signature  # type: ignore[union-attr]

    @property
    def verification_material(self) -> VerificationMaterial:
        """
        Returns the bundle's verification material.
        """
        return VerificationMaterial(self._inner.verification_material)

    @classmethod
    def from_json(cls, raw: bytes | str) -> Bundle:
        """
        Deserialize the given Sigstore bundle.

        Raises `InvalidBundle` if the inner model cannot be loaded from
        `raw`, or if the loaded bundle fails validation.
        """
        try:
            inner = _Bundle.from_json(raw)
        except ValueError as exc:
            raise InvalidBundle(f"failed to load bundle: {exc}") from exc
        return cls(inner)

    def to_json(self) -> str:
        """
        Return a JSON encoding of this bundle.
        """
        return self._inner.to_json()

    def _to_parts(
        self,
    ) -> tuple[Certificate, MessageSignature | dsse.Envelope, TransparencyLogEntry]:
        """
        Decompose the `Bundle` into its core constituent parts: the signing
        certificate, the signed content (DSSE envelope if present, otherwise
        the message signature), and the log entry.

        @private
        """

        content: MessageSignature | dsse.Envelope
        # Bind once: each access to `_dsse_envelope` builds a new wrapper.
        envelope = self._dsse_envelope
        if envelope:
            content = envelope
        else:
            content = self._inner.message_signature  # type: ignore[assignment]

        return (self.signing_certificate, content, self.log_entry)

    @classmethod
    def from_parts(
        cls, cert: Certificate, sig: bytes, log_entry: TransparencyLogEntry
    ) -> Bundle:
        """
        Construct a Sigstore bundle (of `hashedrekord` type) from its
        constituent parts.

        `sig` is the raw signature; it is base64-encoded before being
        wrapped in a `MessageSignature`.
        """

        return cls._from_parts(
            cert, MessageSignature(signature=base64.b64encode(sig)), log_entry
        )

    @classmethod
    def _from_parts(
        cls,
        cert: Certificate,
        content: MessageSignature | dsse.Envelope,
        log_entry: TransparencyLogEntry,
        signed_timestamp: list[TimeStampResponse] | None = None,
    ) -> Bundle:
        """
        Construct a bundle with the `BUNDLE_0_3` media type from a
        certificate, its signed content (message signature or DSSE
        envelope), a log entry and, optionally, signed timestamp responses.

        @private
        """

        timestamp_verification_data = bundle_v1.TimestampVerificationData(
            rfc3161_timestamps=[]
        )
        if signed_timestamp is not None:
            timestamp_verification_data.rfc3161_timestamps.extend(
                [
                    RFC3161SignedTimestamp(
                        signed_timestamp=base64.b64encode(response.as_bytes())
                    )
                    for response in signed_timestamp
                ]
            )

        # Fill in the appropriate variant.
        message_signature = None
        dsse_envelope = None
        if isinstance(content, MessageSignature):
            message_signature = content
        else:
            dsse_envelope = content._inner

        inner = _Bundle(
            media_type=Bundle.BundleType.BUNDLE_0_3.value,
            verification_material=bundle_v1.VerificationMaterial(
                certificate=common_v1.X509Certificate(
                    raw_bytes=base64.b64encode(cert.public_bytes(Encoding.DER))
                ),
                tlog_entries=[log_entry._inner],
                timestamp_verification_data=timestamp_verification_data,
            ),
            message_signature=message_signature,
            dsse_envelope=dsse_envelope,
        )

        return cls(inner)