Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/verify/verifier.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

239 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Verification API machinery. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import logging 

23from datetime import datetime, timezone 

24from typing import cast 

25 

26import rekor_types 

27from cryptography.exceptions import InvalidSignature 

28from cryptography.hazmat.primitives import serialization 

29from cryptography.hazmat.primitives.asymmetric import ec 

30from cryptography.x509 import Certificate, ExtendedKeyUsage, KeyUsage 

31from cryptography.x509.oid import ExtendedKeyUsageOID 

32from OpenSSL.crypto import ( 

33 X509, 

34 X509Store, 

35 X509StoreContext, 

36 X509StoreContextError, 

37 X509StoreFlags, 

38) 

39from pydantic import ValidationError 

40from rfc3161_client import TimeStampResponse, VerifierBuilder 

41from rfc3161_client import VerificationError as Rfc3161VerificationError 

42from sigstore_models.common import v1 

43from sigstore_models.rekor import v2 

44 

45from sigstore import dsse 

46from sigstore._internal.rekor import _hashedrekord_from_parts 

47from sigstore._internal.rekor.client import RekorClient 

48from sigstore._internal.sct import ( 

49 verify_sct, 

50) 

51from sigstore._internal.timestamp import TimestampSource, TimestampVerificationResult 

52from sigstore._internal.trust import KeyringPurpose 

53from sigstore._utils import base64_encode_pem_cert, sha256_digest 

54from sigstore.errors import CertValidationError, VerificationError 

55from sigstore.hashes import Hashed 

56from sigstore.models import Bundle, ClientTrustConfig, TrustedRoot 

57from sigstore.verify.policy import VerificationPolicy 

58 

59_logger = logging.getLogger(__name__) 

60 

61# Limit the number of timestamps to prevent DoS 

62# From https://github.com/sigstore/sigstore-go/blob/e92142f0734064ebf6001f188b7330a1212245fe/pkg/verify/tsa.go#L29 

63MAX_ALLOWED_TIMESTAMP: int = 32 

64 

65# When verifying an entry, this threshold represents the minimum number of required 

66# verified times to consider a signature valid. 

67VERIFIED_TIME_THRESHOLD: int = 1 

68 

69 

70class Verifier: 

71 """ 

72 The primary API for verification operations. 

73 """ 

74 

75 def __init__(self, *, trusted_root: TrustedRoot): 

76 """ 

77 Create a new `Verifier`. 

78 

79 `trusted_root` is the `TrustedRoot` object containing the root of trust 

80 for the verification process. 

81 """ 

82 self._fulcio_certificate_chain: list[X509] = [ 

83 X509.from_cryptography(parent_cert) 

84 for parent_cert in trusted_root.get_fulcio_certs() 

85 ] 

86 self._trusted_root = trusted_root 

87 

88 # this is an ugly hack needed for verifying "detached" materials 

89 # In reality we should be choosing the rekor instance based on the logid 

90 url = trusted_root._inner.tlogs[0].base_url 

91 self._rekor = RekorClient(url) 

92 

93 @classmethod 

94 def production(cls, *, offline: bool = False) -> Verifier: 

95 """ 

96 Return a `Verifier` instance configured against Sigstore's production-level services. 

97 

98 `offline` controls the Trusted Root refresh behavior: if `True`, 

99 the verifier uses the Trusted Root in the local TUF cache. If `False`, 

100 a TUF repository refresh is attempted. 

101 """ 

102 config = ClientTrustConfig.production(offline=offline) 

103 return cls( 

104 trusted_root=config.trusted_root, 

105 ) 

106 

107 @classmethod 

108 def staging(cls, *, offline: bool = False) -> Verifier: 

109 """ 

110 Return a `Verifier` instance configured against Sigstore's staging-level services. 

111 

112 `offline` controls the Trusted Root refresh behavior: if `True`, 

113 the verifier uses the Trusted Root in the local TUF cache. If `False`, 

114 a TUF repository refresh is attempted. 

115 """ 

116 config = ClientTrustConfig.staging(offline=offline) 

117 return cls( 

118 trusted_root=config.trusted_root, 

119 ) 

120 

121 def _verify_signed_timestamp( 

122 self, timestamp_response: TimeStampResponse, message: bytes 

123 ) -> TimestampVerificationResult | None: 

124 """ 

125 Verify a Signed Timestamp using the TSA provided by the Trusted Root. 

126 """ 

127 cert_authorities = self._trusted_root.get_timestamp_authorities() 

128 for certificate_authority in cert_authorities: 

129 certificates = certificate_authority.certificates(allow_expired=True) 

130 

131 # We expect at least a signing cert and a root cert but there may be intermediates 

132 if len(certificates) < 2: 

133 _logger.debug("Unable to verify Timestamp: cert chain is incomplete") 

134 continue 

135 

136 builder = ( 

137 VerifierBuilder() 

138 .tsa_certificate(certificates[0]) 

139 .add_root_certificate(certificates[-1]) 

140 ) 

141 for certificate in certificates[1:-1]: 

142 builder = builder.add_intermediate_certificate(certificate) 

143 

144 verifier = builder.build() 

145 try: 

146 verifier.verify_message(timestamp_response, message) 

147 except Rfc3161VerificationError: 

148 _logger.debug("Unable to verify Timestamp with CA.", exc_info=True) 

149 continue 

150 

151 if ( 

152 certificate_authority.validity_period_start 

153 <= timestamp_response.tst_info.gen_time 

154 ) and ( 

155 not certificate_authority.validity_period_end 

156 or timestamp_response.tst_info.gen_time 

157 < certificate_authority.validity_period_end 

158 ): 

159 return TimestampVerificationResult( 

160 source=TimestampSource.TIMESTAMP_AUTHORITY, 

161 time=timestamp_response.tst_info.gen_time, 

162 ) 

163 

164 _logger.debug("Unable to verify Timestamp because not in CA time range.") 

165 

166 return None 

167 

168 def _verify_timestamp_authority( 

169 self, bundle: Bundle 

170 ) -> list[TimestampVerificationResult]: 

171 """ 

172 Verify that the given bundle has been timestamped by a trusted timestamp authority 

173 and that the timestamp is valid. 

174 

175 Returns the number of valid signed timestamp in the bundle. 

176 """ 

177 timestamp_responses = [] 

178 if ( 

179 timestamp_verification_data 

180 := bundle.verification_material.timestamp_verification_data 

181 ): 

182 timestamp_responses = timestamp_verification_data.rfc3161_timestamps 

183 

184 if len(timestamp_responses) > MAX_ALLOWED_TIMESTAMP: 

185 msg = f"too many signed timestamp: {len(timestamp_responses)} > {MAX_ALLOWED_TIMESTAMP}" 

186 raise VerificationError(msg) 

187 

188 if len(set(timestamp_responses)) != len(timestamp_responses): 

189 msg = "duplicate timestamp found" 

190 raise VerificationError(msg) 

191 

192 verified_timestamps = [ 

193 result 

194 for tsr in timestamp_responses 

195 if (result := self._verify_signed_timestamp(tsr, bundle.signature)) 

196 ] 

197 

198 return verified_timestamps 

199 

200 def _establish_time(self, bundle: Bundle) -> list[TimestampVerificationResult]: 

201 """ 

202 Establish the time for bundle verification. 

203 

204 This method uses timestamps from two possible sources: 

205 1. RFC3161 signed timestamps from a Timestamping Authority (TSA) 

206 2. Transparency Log timestamps 

207 """ 

208 verified_timestamps = [] 

209 

210 # If a timestamp from the timestamping service is available, the Verifier MUST 

211 # perform path validation using the timestamp from the Timestamping Service. 

212 if bundle.verification_material.timestamp_verification_data: 

213 if not self._trusted_root.get_timestamp_authorities(): 

214 msg = ( 

215 "no Timestamp Authorities have been provided to validate this " 

216 "bundle but it contains a signed timestamp" 

217 ) 

218 raise VerificationError(msg) 

219 

220 timestamp_from_tsa = self._verify_timestamp_authority(bundle) 

221 verified_timestamps.extend(timestamp_from_tsa) 

222 

223 # If a timestamp from the Transparency Service is available, the Verifier MUST 

224 # perform path validation using the timestamp from the Transparency Service. 

225 # NOTE: We only include this timestamp if it's accompanied by an inclusion 

226 # promise that cryptographically binds it. We verify the inclusion promise 

227 # itself later, as part of log entry verification. 

228 if ( 

229 timestamp := bundle.log_entry._inner.integrated_time 

230 ) and bundle.log_entry._inner.inclusion_promise: 

231 kv = bundle.log_entry._inner.kind_version 

232 if not (kv.kind in ["dsse", "hashedrekord"] and kv.version == "0.0.1"): 

233 raise VerificationError( 

234 "Integrated time only supported for dsse/hashedrekord 0.0.1 types" 

235 ) 

236 

237 verified_timestamps.append( 

238 TimestampVerificationResult( 

239 source=TimestampSource.TRANSPARENCY_SERVICE, 

240 time=datetime.fromtimestamp(timestamp, tz=timezone.utc), 

241 ) 

242 ) 

243 return verified_timestamps 

244 

245 def _verify_chain_at_time( 

246 self, certificate: X509, timestamp_result: TimestampVerificationResult 

247 ) -> list[X509]: 

248 """ 

249 Verify the validity of the certificate chain at the given time. 

250 

251 Raises a VerificationError if the chain can't be built or be verified. 

252 """ 

253 # NOTE: The `X509Store` object cannot have its time reset once the `set_time` 

254 # method been called on it. To get around this, we construct a new one in each 

255 # call. 

256 store = X509Store() 

257 # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's 

258 # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN 

259 # would be strictly more conformant of OpenSSL, but we currently 

260 # *want* the "long" chain behavior of performing path validation 

261 # down to a self-signed root. 

262 store.set_flags(X509StoreFlags.X509_STRICT) 

263 for parent_cert_ossl in self._fulcio_certificate_chain: 

264 store.add_cert(parent_cert_ossl) 

265 

266 store.set_time(timestamp_result.time) 

267 

268 store_ctx = X509StoreContext(store, certificate) 

269 

270 try: 

271 # get_verified_chain returns the full chain including the end-entity certificate 

272 # and chain should contain only CA certificates 

273 return store_ctx.get_verified_chain()[1:] 

274 except X509StoreContextError as e: 

275 raise CertValidationError( 

276 f"failed to build timestamp certificate chain: {e}" 

277 ) 

278 

279 def _verify_common_signing_cert( 

280 self, bundle: Bundle, policy: VerificationPolicy 

281 ) -> None: 

282 """ 

283 Performs the signing certificate verification steps that are shared between 

284 `verify_dsse` and `verify_artifact`. 

285 

286 Raises `VerificationError` on all failures. 

287 """ 

288 

289 # In order to verify an artifact, we need to achieve the following: 

290 # 

291 # 0. Establish a time for the signature. 

292 # 1. Verify that the signing certificate chains to the root of trust 

293 # and is valid at the time of signing. 

294 # 2. Verify the signing certificate's SCT. 

295 # 3. Verify that the signing certificate conforms to the Sigstore 

296 # X.509 profile as well as the passed-in `VerificationPolicy`. 

297 # 4. Verify the inclusion proof and signed checkpoint for the log 

298 # entry. 

299 # 5. Verify the inclusion promise for the log entry, if present. 

300 # 6. Verify the timely insertion of the log entry against the validity 

301 # period for the signing certificate. 

302 # 7. Verify the signature and input against the signing certificate's 

303 # public key. 

304 # 8. Verify the transparency log entry's consistency against the other 

305 # materials, to prevent variants of CVE-2022-36056. 

306 # 

307 # This method performs steps (0) through (6) above. Its caller 

308 # MUST perform steps (7) and (8) separately, since they vary based on 

309 # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.) 

310 

311 cert = bundle.signing_certificate 

312 

313 # NOTE: The `X509Store` object currently cannot have its time reset once the `set_time` 

314 # method been called on it. To get around this, we construct a new one for every `verify` 

315 # call. 

316 store = X509Store() 

317 # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's 

318 # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN 

319 # would be strictly more conformant of OpenSSL, but we currently 

320 # *want* the "long" chain behavior of performing path validation 

321 # down to a self-signed root. 

322 store.set_flags(X509StoreFlags.X509_STRICT) 

323 for parent_cert_ossl in self._fulcio_certificate_chain: 

324 store.add_cert(parent_cert_ossl) 

325 

326 # (0): Establishing a Time for the Signature 

327 # First, establish verified times for the signature. This is required to 

328 # validate the certificate chain, so this step comes first. 

329 # These include TSA timestamps and (in the case of rekor v1 entries) 

330 # rekor log integrated time. 

331 verified_timestamps = self._establish_time(bundle) 

332 if len(verified_timestamps) < VERIFIED_TIME_THRESHOLD: 

333 raise VerificationError("not enough sources of verified time") 

334 

335 # (1): verify that the signing certificate is signed by the root 

336 # certificate and that the signing certificate was valid at the 

337 # time of signing. 

338 cert_ossl = X509.from_cryptography(cert) 

339 chain: list[X509] = [] 

340 for vts in verified_timestamps: 

341 chain = self._verify_chain_at_time(cert_ossl, vts) 

342 

343 # (2): verify the signing certificate's SCT. 

344 try: 

345 verify_sct( 

346 cert, 

347 [parent_cert.to_cryptography() for parent_cert in chain], 

348 self._trusted_root.ct_keyring(KeyringPurpose.VERIFY), 

349 ) 

350 except VerificationError as e: 

351 raise VerificationError(f"failed to verify SCT on signing certificate: {e}") 

352 

353 # (3): verify the signing certificate against the Sigstore 

354 # X.509 profile and verify against the given `VerificationPolicy`. 

355 usage_ext = cert.extensions.get_extension_for_class(KeyUsage) 

356 if not usage_ext.value.digital_signature: 

357 raise VerificationError("Key usage is not of type `digital signature`") 

358 

359 extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage) 

360 if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value: 

361 raise VerificationError("Extended usage does not contain `code signing`") 

362 

363 policy.verify(cert) 

364 

365 _logger.debug("Successfully verified signing certificate validity...") 

366 

367 # (4): verify the inclusion proof and signed checkpoint for the 

368 # log entry. 

369 # (5): verify the inclusion promise for the log entry, if present. 

370 entry = bundle.log_entry 

371 try: 

372 entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY)) 

373 except VerificationError as exc: 

374 raise VerificationError(f"invalid log entry: {exc}") 

375 

376 # (6): verify our established times (timestamps or the log integration time) are 

377 # within signing certificate validity period. 

378 for vts in verified_timestamps: 

379 if not ( 

380 bundle.signing_certificate.not_valid_before_utc 

381 <= vts.time 

382 <= bundle.signing_certificate.not_valid_after_utc 

383 ): 

384 raise VerificationError( 

385 f"invalid signing cert: expired at time of signing, time via {vts}" 

386 ) 

387 

388 def verify_dsse( 

389 self, bundle: Bundle, policy: VerificationPolicy 

390 ) -> tuple[str, bytes]: 

391 """ 

392 Verifies an bundle's DSSE envelope, returning the encapsulated payload 

393 and its content type. 

394 

395 This method is only for DSSE-enveloped payloads. To verify 

396 an arbitrary input against a bundle, use the `verify_artifact` 

397 method. 

398 

399 `bundle` is the Sigstore `Bundle` to both verify and verify against. 

400 

401 `policy` is the `VerificationPolicy` to verify against. 

402 

403 Returns a tuple of `(type, payload)`, where `type` is the payload's 

404 type as encoded in the DSSE envelope and `payload` is the raw `bytes` 

405 of the payload. No validation of either `type` or `payload` is 

406 performed; users of this API **must** assert that `type` is known 

407 to them before proceeding to handle `payload` in an application-dependent 

408 manner. 

409 """ 

410 

411 # (1) through (6) are performed by `_verify_common_signing_cert`. 

412 self._verify_common_signing_cert(bundle, policy) 

413 

414 # (7): verify the bundle's signature and DSSE envelope against the 

415 # signing certificate's public key. 

416 envelope = bundle._dsse_envelope 

417 if envelope is None: 

418 raise VerificationError( 

419 "cannot perform DSSE verification on a bundle without a DSSE envelope" 

420 ) 

421 

422 signing_key = bundle.signing_certificate.public_key() 

423 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 

424 dsse._verify(signing_key, envelope) 

425 

426 # (8): verify the consistency of the log entry's body against 

427 # the other bundle materials. 

428 # NOTE: This is very slightly weaker than the consistency check 

429 # for hashedrekord entries, due to how inclusion is recorded for DSSE: 

430 # the included entry for DSSE includes an envelope hash that we 

431 # *cannot* verify, since the envelope is uncanonicalized JSON. 

432 # Instead, we manually pick apart the entry body below and verify 

433 # the parts we can (namely the payload hash and signature list). 

434 entry = bundle.log_entry 

435 if entry._inner.kind_version.kind != "dsse": 

436 raise VerificationError( 

437 f"Expected entry type dsse, got {entry._inner.kind_version.kind}" 

438 ) 

439 if entry._inner.kind_version.version == "0.0.2": 

440 _validate_dsse_v002_entry_body(bundle) 

441 elif entry._inner.kind_version.version == "0.0.1": 

442 _validate_dsse_v001_entry_body(bundle) 

443 else: 

444 raise VerificationError( 

445 f"Unsupported dsse version {entry._inner.kind_version.version}" 

446 ) 

447 

448 return (envelope._inner.payload_type, envelope._inner.payload) 

449 

450 def verify_artifact( 

451 self, 

452 input_: bytes | Hashed, 

453 bundle: Bundle, 

454 policy: VerificationPolicy, 

455 ) -> None: 

456 """ 

457 Public API for verifying. 

458 

459 `input_` is the input to verify, either as a buffer of contents or as 

460 a prehashed `Hashed` object. 

461 

462 `bundle` is the Sigstore `Bundle` to verify against. 

463 

464 `policy` is the `VerificationPolicy` to verify against. 

465 

466 On failure, this method raises `VerificationError`. 

467 """ 

468 

469 # (1) through (6) are performed by `_verify_common_signing_cert`. 

470 self._verify_common_signing_cert(bundle, policy) 

471 

472 hashed_input = sha256_digest(input_) 

473 bundle_signature = bundle._inner.message_signature 

474 if bundle_signature is None: 

475 raise VerificationError("Missing bundle message signature") 

476 

477 # signature is verified over input digest, but if the bundle documents the digest we still 

478 # want to ensure it matches the input digest: 

479 if ( 

480 bundle_signature.message_digest is not None 

481 and hashed_input.digest != bundle_signature.message_digest.digest 

482 ): 

483 raise VerificationError("Bundle message digest mismatch") 

484 

485 # (7): verify that the signature was signed by the public key in the signing certificate. 

486 try: 

487 signing_key = bundle.signing_certificate.public_key() 

488 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 

489 signing_key.verify( 

490 bundle_signature.signature, 

491 hashed_input.digest, 

492 ec.ECDSA(hashed_input._as_prehashed()), 

493 ) 

494 except InvalidSignature: 

495 raise VerificationError("Signature is invalid for input") 

496 

497 _logger.debug("Successfully verified signature...") 

498 

499 # (8): verify the consistency of the log entry's body against 

500 # the other bundle materials (and input being verified). 

501 entry = bundle.log_entry 

502 if entry._inner.kind_version.kind != "hashedrekord": 

503 raise VerificationError( 

504 f"Expected entry type hashedrekord, got {entry._inner.kind_version.kind}" 

505 ) 

506 

507 if entry._inner.kind_version.version == "0.0.2": 

508 _validate_hashedrekord_v002_entry_body(bundle, hashed_input) 

509 elif entry._inner.kind_version.version == "0.0.1": 

510 _validate_hashedrekord_v001_entry_body(bundle, hashed_input) 

511 else: 

512 raise VerificationError( 

513 f"Unsupported hashedrekord version {entry._inner.kind_version.version}" 

514 ) 

515 

516 

517def _validate_dsse_v001_entry_body(bundle: Bundle) -> None: 

518 """ 

519 Validate the Entry body for dsse v001. 

520 """ 

521 entry = bundle.log_entry 

522 envelope = bundle._dsse_envelope 

523 if envelope is None: 

524 raise VerificationError( 

525 "cannot perform DSSE verification on a bundle without a DSSE envelope" 

526 ) 

527 try: 

528 entry_body = rekor_types.Dsse.model_validate_json( 

529 entry._inner.canonicalized_body 

530 ) 

531 except ValidationError as exc: 

532 raise VerificationError(f"invalid DSSE log entry: {exc}") 

533 

534 payload_hash = sha256_digest(envelope._inner.payload).digest.hex() 

535 if ( 

536 entry_body.spec.root.payload_hash.algorithm # type: ignore[union-attr] 

537 != rekor_types.dsse.Algorithm.SHA256 

538 ): 

539 raise VerificationError("expected SHA256 payload hash in DSSE log entry") 

540 if payload_hash != entry_body.spec.root.payload_hash.value: # type: ignore[union-attr] 

541 raise VerificationError("log entry payload hash does not match bundle") 

542 

543 # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here, 

544 # but we handle them just in case the signer has somehow produced multiple 

545 # signatures for their envelope with the same signing key. 

546 signatures = [ 

547 rekor_types.dsse.Signature( 

548 signature=base64.b64encode(signature.sig).decode(), 

549 verifier=base64_encode_pem_cert(bundle.signing_certificate), 

550 ) 

551 for signature in envelope._inner.signatures 

552 ] 

553 if signatures != entry_body.spec.root.signatures: 

554 raise VerificationError("log entry signatures do not match bundle") 

555 

556 

557def _validate_dsse_v002_entry_body(bundle: Bundle) -> None: 

558 """ 

559 Validate Entry body for dsse v002. 

560 """ 

561 entry = bundle.log_entry 

562 envelope = bundle._dsse_envelope 

563 if envelope is None: 

564 raise VerificationError( 

565 "cannot perform DSSE verification on a bundle without a DSSE envelope" 

566 ) 

567 try: 

568 v2_body = v2.entry.Entry.from_json(entry._inner.canonicalized_body) 

569 except ValidationError as exc: 

570 raise VerificationError(f"invalid DSSE log entry: {exc}") 

571 

572 if v2_body.spec.dsse_v002 is None: 

573 raise VerificationError("invalid DSSE log entry: missing dsse_v002 field") 

574 

575 if v2_body.spec.dsse_v002.payload_hash.algorithm != v1.HashAlgorithm.SHA2_256: 

576 raise VerificationError("expected SHA256 hash in DSSE entry") 

577 

578 digest = sha256_digest(envelope._inner.payload).digest 

579 if v2_body.spec.dsse_v002.payload_hash.digest != digest: 

580 raise VerificationError("DSSE entry payload hash does not match bundle") 

581 

582 v2_signatures = [ 

583 v2.verifier.Signature( 

584 content=base64.b64encode(signature.sig), 

585 verifier=_v2_verifier_from_certificate(bundle.signing_certificate), 

586 ) 

587 for signature in envelope._inner.signatures 

588 ] 

589 if v2_signatures != v2_body.spec.dsse_v002.signatures: 

590 raise VerificationError("log entry signatures do not match bundle") 

591 

592 

593def _validate_hashedrekord_v001_entry_body( 

594 bundle: Bundle, hashed_input: Hashed 

595) -> None: 

596 """ 

597 Validate the Entry body for hashedrekord v001. 

598 """ 

599 entry = bundle.log_entry 

600 expected_body = _hashedrekord_from_parts( 

601 bundle.signing_certificate, 

602 bundle._inner.message_signature.signature, # type: ignore[union-attr] 

603 hashed_input, 

604 ) 

605 actual_body = rekor_types.Hashedrekord.model_validate_json( 

606 entry._inner.canonicalized_body 

607 ) 

608 if expected_body != actual_body: 

609 raise VerificationError( 

610 "transparency log entry is inconsistent with other materials" 

611 ) 

612 

613 

614def _validate_hashedrekord_v002_entry_body( 

615 bundle: Bundle, hashed_input: Hashed 

616) -> None: 

617 """ 

618 Validate Entry body for hashedrekord v002. 

619 """ 

620 entry = bundle.log_entry 

621 if bundle._inner.message_signature is None: 

622 raise VerificationError( 

623 "invalid hashedrekord log entry: missing message signature" 

624 ) 

625 v2_expected_body = v2.entry.Entry( 

626 kind=entry._inner.kind_version.kind, 

627 api_version=entry._inner.kind_version.version, 

628 spec=v2.entry.Spec( 

629 hashed_rekord_v002=v2.hashedrekord.HashedRekordLogEntryV002( 

630 data=v1.HashOutput( 

631 algorithm=hashed_input.algorithm, 

632 digest=base64.b64encode(hashed_input.digest), 

633 ), 

634 signature=v2.verifier.Signature( 

635 content=base64.b64encode(bundle._inner.message_signature.signature), 

636 verifier=_v2_verifier_from_certificate(bundle.signing_certificate), 

637 ), 

638 ) 

639 ), 

640 ) 

641 v2_actual_body = v2.entry.Entry.from_json(entry._inner.canonicalized_body) 

642 if v2_expected_body != v2_actual_body: 

643 raise VerificationError( 

644 "transparency log entry is inconsistent with other materials" 

645 ) 

646 

647 

648def _v2_verifier_from_certificate(certificate: Certificate) -> v2.verifier.Verifier: 

649 """ 

650 Return a Rekor v2 Verifier for the signing certificate. 

651 

652 This method decides which signature algorithms are supported for verification 

653 (in a rekor v2 entry), see 

654 https://github.com/sigstore/architecture-docs/blob/main/algorithm-registry.md. 

655 Note that actual signature verification happens in verify_artifact() and 

656 verify_dsse(): New keytypes need to be added here and in those methods. 

657 """ 

658 public_key = certificate.public_key() 

659 

660 if isinstance(public_key, ec.EllipticCurvePublicKey): 

661 if isinstance(public_key.curve, ec.SECP256R1): 

662 key_details = v1.PublicKeyDetails.PKIX_ECDSA_P256_SHA_256 

663 elif isinstance(public_key.curve, ec.SECP384R1): 

664 key_details = v1.PublicKeyDetails.PKIX_ECDSA_P384_SHA_384 

665 elif isinstance(public_key.curve, ec.SECP521R1): 

666 key_details = v1.PublicKeyDetails.PKIX_ECDSA_P521_SHA_512 

667 else: 

668 raise ValueError(f"Unsupported EC curve: {public_key.curve.name}") 

669 else: 

670 raise ValueError(f"Unsupported public key type: {type(public_key)}") 

671 

672 return v2.verifier.Verifier( 

673 x509_certificate=v1.X509Certificate( 

674 raw_bytes=base64.b64encode( 

675 certificate.public_bytes(encoding=serialization.Encoding.DER) 

676 ) 

677 ), 

678 key_details=key_details, 

679 )