Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/models.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

223 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Common models shared between signing and verification. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import logging 

23import typing 

24from enum import Enum 

25from textwrap import dedent 

26from typing import Any, Optional 

27 

28import rfc8785 

29from cryptography.hazmat.primitives.serialization import Encoding 

30from cryptography.x509 import ( 

31 Certificate, 

32 load_der_x509_certificate, 

33) 

34from pydantic import ( 

35 BaseModel, 

36 ConfigDict, 

37 Field, 

38 StrictInt, 

39 StrictStr, 

40 TypeAdapter, 

41 ValidationInfo, 

42 field_validator, 

43) 

44from pydantic.dataclasses import dataclass 

45from rekor_types import Dsse, Hashedrekord, ProposedEntry 

46from rfc3161_client import TimeStampResponse, decode_timestamp_response 

47from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_v1 

48from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( 

49 Bundle as _Bundle, 

50) 

51from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( 

52 TimestampVerificationData as _TimestampVerificationData, 

53) 

54from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( 

55 VerificationMaterial as _VerificationMaterial, 

56) 

57from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1 

58from sigstore_protobuf_specs.dev.sigstore.common.v1 import Rfc3161SignedTimestamp 

59from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1 

60from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import ( 

61 InclusionProof, 

62) 

63 

64from sigstore import dsse 

65from sigstore._internal.merkle import verify_merkle_inclusion 

66from sigstore._internal.rekor.checkpoint import verify_checkpoint 

67from sigstore._utils import ( 

68 B64Str, 

69 KeyID, 

70 cert_is_leaf, 

71 cert_is_root_ca, 

72) 

73from sigstore.errors import Error, VerificationError 

74 

75if typing.TYPE_CHECKING: 

76 from sigstore._internal.trust import RekorKeyring 

77 

78 

79_logger = logging.getLogger(__name__) 

80 

81 

82class LogInclusionProof(BaseModel): 

83 """ 

84 Represents an inclusion proof for a transparency log entry. 

85 """ 

86 

87 model_config = ConfigDict(populate_by_name=True) 

88 

89 checkpoint: StrictStr = Field(..., alias="checkpoint") 

90 hashes: list[StrictStr] = Field(..., alias="hashes") 

91 log_index: StrictInt = Field(..., alias="logIndex") 

92 root_hash: StrictStr = Field(..., alias="rootHash") 

93 tree_size: StrictInt = Field(..., alias="treeSize") 

94 

95 @field_validator("log_index") 

96 def _log_index_positive(cls, v: int) -> int: 

97 if v < 0: 

98 raise ValueError(f"Inclusion proof has invalid log index: {v} < 0") 

99 return v 

100 

101 @field_validator("tree_size") 

102 def _tree_size_positive(cls, v: int) -> int: 

103 if v < 0: 

104 raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0") 

105 return v 

106 

107 @field_validator("tree_size") 

108 def _log_index_within_tree_size( 

109 cls, v: int, info: ValidationInfo, **kwargs: Any 

110 ) -> int: 

111 if "log_index" in info.data and v <= info.data["log_index"]: 

112 raise ValueError( 

113 "Inclusion proof has log index greater than or equal to tree size: " 

114 f"{v} <= {info.data['log_index']}" 

115 ) 

116 return v 

117 

118 

119@dataclass(frozen=True) 

120class LogEntry: 

121 """ 

122 Represents a transparency log entry. 

123 

124 Log entries are retrieved from the transparency log after signing or verification events, 

125 or loaded from "Sigstore" bundles provided by the user. 

126 

127 This representation allows for either a missing inclusion promise or a missing 

128 inclusion proof, but not both: attempting to construct a `LogEntry` without 

129 at least one will fail. 

130 """ 

131 

132 uuid: Optional[str] 

133 """ 

134 This entry's unique ID in the log instance it was retrieved from. 

135 

136 For sharded log deployments, IDs are unique per-shard. 

137 

138 Not present for `LogEntry` instances loaded from Sigstore bundles. 

139 """ 

140 

141 body: B64Str 

142 """ 

143 The base64-encoded body of the transparency log entry. 

144 """ 

145 

146 integrated_time: int 

147 """ 

148 The UNIX time at which this entry was integrated into the transparency log. 

149 """ 

150 

151 log_id: str 

152 """ 

153 The log's ID (as the SHA256 hash of the DER-encoded public key for the log 

154 at the time of entry inclusion). 

155 """ 

156 

157 log_index: int 

158 """ 

159 The index of this entry within the log. 

160 """ 

161 

162 inclusion_proof: LogInclusionProof 

163 """ 

164 An inclusion proof for this log entry. 

165 """ 

166 

167 inclusion_promise: Optional[B64Str] 

168 """ 

169 An inclusion promise for this log entry, if present. 

170 

171 Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this 

172 log entry. 

173 """ 

174 

175 @classmethod 

176 def _from_response(cls, dict_: dict[str, Any]) -> LogEntry: 

177 """ 

178 Create a new `LogEntry` from the given API response. 

179 """ 

180 

181 # Assumes we only get one entry back 

182 entries = list(dict_.items()) 

183 if len(entries) != 1: 

184 raise ValueError("Received multiple entries in response") 

185 

186 uuid, entry = entries[0] 

187 return LogEntry( 

188 uuid=uuid, 

189 body=entry["body"], 

190 integrated_time=entry["integratedTime"], 

191 log_id=entry["logID"], 

192 log_index=entry["logIndex"], 

193 inclusion_proof=LogInclusionProof.model_validate( 

194 entry["verification"]["inclusionProof"] 

195 ), 

196 inclusion_promise=entry["verification"]["signedEntryTimestamp"], 

197 ) 

198 

199 @classmethod 

200 def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry: 

201 """ 

202 Create a new `LogEntry` from the given Rekor TransparencyLogEntry. 

203 """ 

204 tlog_entry = rekor_v1.TransparencyLogEntry() 

205 tlog_entry.from_dict(dict_) 

206 

207 inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof 

208 # This check is required by us as the client, not the 

209 # protobuf-specs themselves. 

210 if not inclusion_proof or not inclusion_proof.checkpoint.envelope: 

211 raise InvalidBundle("entry must contain inclusion proof, with checkpoint") 

212 

213 parsed_inclusion_proof = LogInclusionProof( 

214 checkpoint=inclusion_proof.checkpoint.envelope, 

215 hashes=[h.hex() for h in inclusion_proof.hashes], 

216 log_index=inclusion_proof.log_index, 

217 root_hash=inclusion_proof.root_hash.hex(), 

218 tree_size=inclusion_proof.tree_size, 

219 ) 

220 

221 return LogEntry( 

222 uuid=None, 

223 body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()), 

224 integrated_time=tlog_entry.integrated_time, 

225 log_id=tlog_entry.log_id.key_id.hex(), 

226 log_index=tlog_entry.log_index, 

227 inclusion_proof=parsed_inclusion_proof, 

228 inclusion_promise=B64Str( 

229 base64.b64encode( 

230 tlog_entry.inclusion_promise.signed_entry_timestamp 

231 ).decode() 

232 ), 

233 ) 

234 

235 def _to_rekor(self) -> rekor_v1.TransparencyLogEntry: 

236 """ 

237 Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`. 

238 

239 @private 

240 """ 

241 inclusion_promise: rekor_v1.InclusionPromise | None = None 

242 if self.inclusion_promise: 

243 inclusion_promise = rekor_v1.InclusionPromise( 

244 signed_entry_timestamp=base64.b64decode(self.inclusion_promise) 

245 ) 

246 

247 inclusion_proof = rekor_v1.InclusionProof( 

248 log_index=self.inclusion_proof.log_index, 

249 root_hash=bytes.fromhex(self.inclusion_proof.root_hash), 

250 tree_size=self.inclusion_proof.tree_size, 

251 hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes], 

252 checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint), 

253 ) 

254 

255 tlog_entry = rekor_v1.TransparencyLogEntry( 

256 log_index=self.log_index, 

257 log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)), 

258 integrated_time=self.integrated_time, 

259 inclusion_promise=inclusion_promise, # type: ignore[arg-type] 

260 inclusion_proof=inclusion_proof, 

261 canonicalized_body=base64.b64decode(self.body), 

262 ) 

263 

264 # Fill in the appropriate kind 

265 body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json( 

266 tlog_entry.canonicalized_body 

267 ) 

268 if not isinstance(body_entry, (Hashedrekord, Dsse)): 

269 raise InvalidBundle("log entry is not of expected type") 

270 

271 tlog_entry.kind_version = rekor_v1.KindVersion( 

272 kind=body_entry.kind, version=body_entry.api_version 

273 ) 

274 

275 return tlog_entry 

276 

277 def encode_canonical(self) -> bytes: 

278 """ 

279 Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry. 

280 

281 This encoded representation is suitable for verification against 

282 the Signed Entry Timestamp. 

283 """ 

284 payload: dict[str, int | str] = { 

285 "body": self.body, 

286 "integratedTime": self.integrated_time, 

287 "logID": self.log_id, 

288 "logIndex": self.log_index, 

289 } 

290 

291 return rfc8785.dumps(payload) 

292 

293 def _verify_set(self, keyring: RekorKeyring) -> None: 

294 """ 

295 Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log 

296 `entry` using the given `keyring`. 

297 

298 Fails if the given log entry does not contain an inclusion promise. 

299 """ 

300 

301 if self.inclusion_promise is None: 

302 raise VerificationError("SET: invalid inclusion promise: missing") 

303 

304 signed_entry_ts = base64.b64decode(self.inclusion_promise) 

305 

306 try: 

307 keyring.verify( 

308 key_id=KeyID(bytes.fromhex(self.log_id)), 

309 signature=signed_entry_ts, 

310 data=self.encode_canonical(), 

311 ) 

312 except VerificationError as exc: 

313 raise VerificationError(f"SET: invalid inclusion promise: {exc}") 

314 

315 def _verify(self, keyring: RekorKeyring) -> None: 

316 """ 

317 Verifies this log entry. 

318 

319 This method performs steps (5), (6), and optionally (7) in 

320 the top-level verify API: 

321 

322 * Verifies the consistency of the entry with the given bundle; 

323 * Verifies the Merkle inclusion proof and its signed checkpoint; 

324 * Verifies the inclusion promise, if present. 

325 """ 

326 

327 verify_merkle_inclusion(self) 

328 verify_checkpoint(keyring, self) 

329 

330 _logger.debug(f"successfully verified inclusion proof: index={self.log_index}") 

331 

332 if self.inclusion_promise: 

333 self._verify_set(keyring) 

334 _logger.debug( 

335 f"successfully verified inclusion promise: index={self.log_index}" 

336 ) 

337 

338 

339class TimestampVerificationData: 

340 """ 

341 Represents a TimestampVerificationData structure. 

342 

343 @private 

344 """ 

345 

346 def __init__(self, inner: _TimestampVerificationData) -> None: 

347 """Init method.""" 

348 self._inner = inner 

349 self._verify() 

350 

351 def _verify(self) -> None: 

352 """ 

353 Verifies the TimestampVerificationData. 

354 

355 It verifies that TimeStamp Responses embedded in the bundle are correctly 

356 formed. 

357 """ 

358 try: 

359 self._signed_ts = [ 

360 decode_timestamp_response(ts.signed_timestamp) 

361 for ts in self._inner.rfc3161_timestamps 

362 ] 

363 except ValueError: 

364 raise VerificationError("Invalid Timestamp Response") 

365 

366 @property 

367 def rfc3161_timestamps(self) -> list[TimeStampResponse]: 

368 """Returns a list of signed timestamp.""" 

369 return self._signed_ts 

370 

371 @classmethod 

372 def from_json(cls, raw: str | bytes) -> TimestampVerificationData: 

373 """ 

374 Deserialize the given timestamp verification data. 

375 """ 

376 inner = _TimestampVerificationData().from_json(raw) 

377 return cls(inner) 

378 

379 

380class VerificationMaterial: 

381 """ 

382 Represents a VerificationMaterial structure. 

383 """ 

384 

385 def __init__(self, inner: _VerificationMaterial) -> None: 

386 """Init method.""" 

387 self._inner = inner 

388 

389 @property 

390 def timestamp_verification_data(self) -> TimestampVerificationData: 

391 """ 

392 Returns the Timestamp Verification Data. 

393 """ 

394 return TimestampVerificationData(self._inner.timestamp_verification_data) 

395 

396 

397class InvalidBundle(Error): 

398 """ 

399 Raised when the associated `Bundle` is invalid in some way. 

400 """ 

401 

402 def diagnostics(self) -> str: 

403 """Returns diagnostics for the error.""" 

404 

405 return dedent( 

406 f"""\ 

407 An issue occurred while parsing the Sigstore bundle. 

408 

409 The provided bundle is malformed and may have been modified maliciously. 

410 

411 Additional context: 

412 

413 {self} 

414 """ 

415 ) 

416 

417 

418class Bundle: 

419 """ 

420 Represents a Sigstore bundle. 

421 """ 

422 

423 class BundleType(str, Enum): 

424 """ 

425 Known Sigstore bundle media types. 

426 """ 

427 

428 BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1" 

429 BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2" 

430 BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3" 

431 BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json" 

432 

433 def __str__(self) -> str: 

434 """Returns the variant's string value.""" 

435 return self.value 

436 

437 def __init__(self, inner: _Bundle) -> None: 

438 """ 

439 Creates a new bundle. This is not a public API; use 

440 `from_json` instead. 

441 

442 @private 

443 """ 

444 self._inner = inner 

445 self._verify() 

446 

447 def _verify(self) -> None: 

448 """ 

449 Performs various feats of heroism to ensure the bundle is well-formed 

450 and upholds invariants, including: 

451 

452 * The "leaf" (signing) certificate is present; 

453 * There is a inclusion proof present, even if the Bundle's version 

454 predates a mandatory inclusion proof. 

455 """ 

456 

457 # The bundle must have a recognized media type. 

458 try: 

459 media_type = Bundle.BundleType(self._inner.media_type) 

460 except ValueError: 

461 raise InvalidBundle(f"unsupported bundle format: {self._inner.media_type}") 

462 

463 # Extract the signing certificate. 

464 if media_type in ( 

465 Bundle.BundleType.BUNDLE_0_3, 

466 Bundle.BundleType.BUNDLE_0_3_ALT, 

467 ): 

468 # For "v3" bundles, the signing certificate is the only one present. 

469 leaf_cert = load_der_x509_certificate( 

470 self._inner.verification_material.certificate.raw_bytes 

471 ) 

472 else: 

473 # In older bundles, there is an entire pool (misleadingly called 

474 # a chain) of certificates, the first of which is the signing 

475 # certificate. 

476 certs = ( 

477 self._inner.verification_material.x509_certificate_chain.certificates 

478 ) 

479 

480 if len(certs) == 0: 

481 raise InvalidBundle("expected non-empty certificate chain in bundle") 

482 

483 # Per client policy in protobuf-specs: the first entry in the chain 

484 # MUST be a leaf certificate, and the rest of the chain MUST NOT 

485 # include a root CA or any intermediate CAs that appear in an 

486 # independent root of trust. 

487 # 

488 # We expect some old bundles to violate the rules around root 

489 # and intermediate CAs, so we issue warnings and not hard errors 

490 # in those cases. 

491 leaf_cert, *chain_certs = [ 

492 load_der_x509_certificate(cert.raw_bytes) for cert in certs 

493 ] 

494 if not cert_is_leaf(leaf_cert): 

495 raise InvalidBundle( 

496 "bundle contains an invalid leaf or non-leaf certificate in the leaf position" 

497 ) 

498 

499 for chain_cert in chain_certs: 

500 # TODO: We should also retrieve the root of trust here and 

501 # cross-check against it. 

502 if cert_is_root_ca(chain_cert): 

503 _logger.warning( 

504 "this bundle contains a root CA, making it subject to misuse" 

505 ) 

506 

507 self._signing_certificate = leaf_cert 

508 

509 # Extract the log entry. For the time being, we expect 

510 # bundles to only contain a single log entry. 

511 tlog_entries = self._inner.verification_material.tlog_entries 

512 if len(tlog_entries) != 1: 

513 raise InvalidBundle("expected exactly one log entry in bundle") 

514 tlog_entry = tlog_entries[0] 

515 

516 # Handling of inclusion promises and proofs varies between bundle 

517 # format versions: 

518 # 

519 # * For 0.1, an inclusion promise is required; the client 

520 # MUST verify the inclusion promise. 

521 # The inclusion proof is NOT required. If provided, it might NOT 

522 # contain a checkpoint; in this case, we ignore it (since it's 

523 # useless without one). 

524 # 

525 # * For 0.2+, an inclusion proof is required; the client MUST 

526 # verify the inclusion proof. The inclusion prof MUST contain 

527 # a checkpoint. 

528 # 

529 # The inclusion promise is NOT required if another source of signed 

530 # time (such as a signed timestamp) is present. If no other source 

531 # of signed time is present, then the inclusion promise MUST be 

532 # present. 

533 # 

534 # Before all of this, we require that the inclusion proof be present 

535 # (when constructing the LogEntry). 

536 log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict()) 

537 

538 if media_type == Bundle.BundleType.BUNDLE_0_1: 

539 if not log_entry.inclusion_promise: 

540 raise InvalidBundle("bundle must contain an inclusion promise") 

541 if not log_entry.inclusion_proof.checkpoint: 

542 _logger.debug( 

543 "0.1 bundle contains inclusion proof without checkpoint; ignoring" 

544 ) 

545 else: 

546 if not log_entry.inclusion_proof.checkpoint: 

547 raise InvalidBundle("expected checkpoint in inclusion proof") 

548 

549 if ( 

550 not log_entry.inclusion_promise 

551 and not self._inner.verification_material.timestamp_verification_data.rfc3161_timestamps 

552 ): 

553 raise InvalidBundle( 

554 "bundle must contain an inclusion promise or signed timestamp(s)" 

555 ) 

556 

557 self._log_entry = log_entry 

558 

559 @property 

560 def signing_certificate(self) -> Certificate: 

561 """Returns the bundle's contained signing (i.e. leaf) certificate.""" 

562 return self._signing_certificate 

563 

564 @property 

565 def log_entry(self) -> LogEntry: 

566 """ 

567 Returns the bundle's log entry, containing an inclusion proof 

568 (with checkpoint) and an inclusion promise (if the latter is present). 

569 """ 

570 return self._log_entry 

571 

572 @property 

573 def _dsse_envelope(self) -> dsse.Envelope | None: 

574 """ 

575 Returns the DSSE envelope within this Bundle as a `dsse.Envelope`. 

576 

577 @private 

578 """ 

579 if self._inner.dsse_envelope: 

580 return dsse.Envelope(self._inner.dsse_envelope) 

581 return None 

582 

583 @property 

584 def signature(self) -> bytes: 

585 """ 

586 Returns the signature bytes of this bundle. 

587 Either from the DSSE Envelope or from the message itself. 

588 """ 

589 return ( 

590 self._dsse_envelope.signature 

591 if self._dsse_envelope 

592 else self._inner.message_signature.signature 

593 ) 

594 

595 @property 

596 def verification_material(self) -> VerificationMaterial: 

597 """ 

598 Returns the bundle's verification material. 

599 """ 

600 return VerificationMaterial(self._inner.verification_material) 

601 

602 @classmethod 

603 def from_json(cls, raw: bytes | str) -> Bundle: 

604 """ 

605 Deserialize the given Sigstore bundle. 

606 """ 

607 inner = _Bundle().from_json(raw) 

608 return cls(inner) 

609 

610 def to_json(self) -> str: 

611 """ 

612 Return a JSON encoding of this bundle. 

613 """ 

614 return self._inner.to_json() 

615 

616 def _to_parts( 

617 self, 

618 ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]: 

619 """ 

620 Decompose the `Bundle` into its core constituent parts. 

621 

622 @private 

623 """ 

624 

625 content: common_v1.MessageSignature | dsse.Envelope 

626 content = self._dsse_envelope or self._inner.message_signature 

627 

628 return (self.signing_certificate, content, self.log_entry) 

629 

630 @classmethod 

631 def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle: 

632 """ 

633 Construct a Sigstore bundle (of `hashedrekord` type) from its 

634 constituent parts. 

635 """ 

636 

637 return cls._from_parts( 

638 cert, common_v1.MessageSignature(signature=sig), log_entry 

639 ) 

640 

641 @classmethod 

642 def _from_parts( 

643 cls, 

644 cert: Certificate, 

645 content: common_v1.MessageSignature | dsse.Envelope, 

646 log_entry: LogEntry, 

647 signed_timestamp: Optional[list[TimeStampResponse]] = None, 

648 ) -> Bundle: 

649 """ 

650 @private 

651 """ 

652 

653 inner = _Bundle( 

654 media_type=Bundle.BundleType.BUNDLE_0_3.value, 

655 verification_material=bundle_v1.VerificationMaterial( 

656 certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)), 

657 ), 

658 ) 

659 

660 # Fill in the appropriate variants. 

661 if isinstance(content, common_v1.MessageSignature): 

662 inner.message_signature = content 

663 else: 

664 inner.dsse_envelope = content._inner 

665 

666 tlog_entry = log_entry._to_rekor() 

667 inner.verification_material.tlog_entries = [tlog_entry] 

668 

669 if signed_timestamp is not None: 

670 inner.verification_material.timestamp_verification_data = ( 

671 bundle_v1.TimestampVerificationData( 

672 rfc3161_timestamps=[ 

673 Rfc3161SignedTimestamp(signed_timestamp=response.as_bytes()) 

674 for response in signed_timestamp 

675 ] 

676 ) 

677 ) 

678 

679 return cls(inner)