Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/verify/verifier.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

234 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Verification API machinery. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import logging 

23from datetime import datetime, timezone 

24from typing import cast 

25 

26import rekor_types 

27from cryptography.exceptions import InvalidSignature 

28from cryptography.hazmat.primitives import serialization 

29from cryptography.hazmat.primitives.asymmetric import ec 

30from cryptography.x509 import Certificate, ExtendedKeyUsage, KeyUsage 

31from cryptography.x509.oid import ExtendedKeyUsageOID 

32from OpenSSL.crypto import ( 

33 X509, 

34 X509Store, 

35 X509StoreContext, 

36 X509StoreContextError, 

37 X509StoreFlags, 

38) 

39from pydantic import ValidationError 

40from rfc3161_client import TimeStampResponse, VerifierBuilder 

41from rfc3161_client import VerificationError as Rfc3161VerificationError 

42from sigstore_models.common import v1 

43from sigstore_models.rekor import v2 

44 

45from sigstore import dsse 

46from sigstore._internal.rekor import _hashedrekord_from_parts 

47from sigstore._internal.rekor.client import RekorClient 

48from sigstore._internal.sct import ( 

49 verify_sct, 

50) 

51from sigstore._internal.timestamp import TimestampSource, TimestampVerificationResult 

52from sigstore._internal.trust import KeyringPurpose 

53from sigstore._utils import base64_encode_pem_cert, sha256_digest 

54from sigstore.errors import CertValidationError, VerificationError 

55from sigstore.hashes import Hashed 

56from sigstore.models import Bundle, ClientTrustConfig, TrustedRoot 

57from sigstore.verify.policy import VerificationPolicy 

58 

59_logger = logging.getLogger(__name__) 

60 

61# Limit the number of timestamps to prevent DoS 

62# From https://github.com/sigstore/sigstore-go/blob/e92142f0734064ebf6001f188b7330a1212245fe/pkg/verify/tsa.go#L29 

63MAX_ALLOWED_TIMESTAMP: int = 32 

64 

65# When verifying an entry, this threshold represents the minimum number of required 

66# verified times to consider a signature valid. 

67VERIFIED_TIME_THRESHOLD: int = 1 

68 

69 

70class Verifier: 

71 """ 

72 The primary API for verification operations. 

73 """ 

74 

75 def __init__(self, *, trusted_root: TrustedRoot): 

76 """ 

77 Create a new `Verifier`. 

78 

79 `trusted_root` is the `TrustedRoot` object containing the root of trust 

80 for the verification process. 

81 """ 

82 self._fulcio_certificate_chain: list[X509] = [ 

83 X509.from_cryptography(parent_cert) 

84 for parent_cert in trusted_root.get_fulcio_certs() 

85 ] 

86 self._trusted_root = trusted_root 

87 

88 # this is an ugly hack needed for verifying "detached" materials 

89 # In reality we should be choosing the rekor instance based on the logid 

90 url = trusted_root._inner.tlogs[0].base_url 

91 self._rekor = RekorClient(url) 

92 

93 @classmethod 

94 def production(cls, *, offline: bool = False) -> Verifier: 

95 """ 

96 Return a `Verifier` instance configured against Sigstore's production-level services. 

97 

98 `offline` controls the Trusted Root refresh behavior: if `True`, 

99 the verifier uses the Trusted Root in the local TUF cache. If `False`, 

100 a TUF repository refresh is attempted. 

101 """ 

102 config = ClientTrustConfig.production(offline=offline) 

103 return cls( 

104 trusted_root=config.trusted_root, 

105 ) 

106 

107 @classmethod 

108 def staging(cls, *, offline: bool = False) -> Verifier: 

109 """ 

110 Return a `Verifier` instance configured against Sigstore's staging-level services. 

111 

112 `offline` controls the Trusted Root refresh behavior: if `True`, 

113 the verifier uses the Trusted Root in the local TUF cache. If `False`, 

114 a TUF repository refresh is attempted. 

115 """ 

116 config = ClientTrustConfig.staging(offline=offline) 

117 return cls( 

118 trusted_root=config.trusted_root, 

119 ) 

120 

121 def _verify_signed_timestamp( 

122 self, timestamp_response: TimeStampResponse, message: bytes 

123 ) -> TimestampVerificationResult | None: 

124 """ 

125 Verify a Signed Timestamp using the TSA provided by the Trusted Root. 

126 """ 

127 cert_authorities = self._trusted_root.get_timestamp_authorities() 

128 for certificate_authority in cert_authorities: 

129 certificates = certificate_authority.certificates(allow_expired=True) 

130 

131 # We expect at least a signing cert and a root cert but there may be intermediates 

132 if len(certificates) < 2: 

133 _logger.debug("Unable to verify Timestamp: cert chain is incomplete") 

134 continue 

135 

136 builder = ( 

137 VerifierBuilder() 

138 .tsa_certificate(certificates[0]) 

139 .add_root_certificate(certificates[-1]) 

140 ) 

141 for certificate in certificates[1:-1]: 

142 builder = builder.add_intermediate_certificate(certificate) 

143 

144 verifier = builder.build() 

145 try: 

146 verifier.verify_message(timestamp_response, message) 

147 except Rfc3161VerificationError: 

148 _logger.debug("Unable to verify Timestamp with CA.", exc_info=True) 

149 continue 

150 

151 if ( 

152 certificate_authority.validity_period_start 

153 <= timestamp_response.tst_info.gen_time 

154 ) and ( 

155 not certificate_authority.validity_period_end 

156 or timestamp_response.tst_info.gen_time 

157 < certificate_authority.validity_period_end 

158 ): 

159 return TimestampVerificationResult( 

160 source=TimestampSource.TIMESTAMP_AUTHORITY, 

161 time=timestamp_response.tst_info.gen_time, 

162 ) 

163 

164 _logger.debug("Unable to verify Timestamp because not in CA time range.") 

165 

166 return None 

167 

168 def _verify_timestamp_authority( 

169 self, bundle: Bundle 

170 ) -> list[TimestampVerificationResult]: 

171 """ 

172 Verify that the given bundle has been timestamped by a trusted timestamp authority 

173 and that the timestamp is valid. 

174 

175 Returns the number of valid signed timestamp in the bundle. 

176 """ 

177 timestamp_responses = [] 

178 if ( 

179 timestamp_verification_data 

180 := bundle.verification_material.timestamp_verification_data 

181 ): 

182 timestamp_responses = timestamp_verification_data.rfc3161_timestamps 

183 

184 if len(timestamp_responses) > MAX_ALLOWED_TIMESTAMP: 

185 msg = f"too many signed timestamp: {len(timestamp_responses)} > {MAX_ALLOWED_TIMESTAMP}" 

186 raise VerificationError(msg) 

187 

188 if len(set(timestamp_responses)) != len(timestamp_responses): 

189 msg = "duplicate timestamp found" 

190 raise VerificationError(msg) 

191 

192 verified_timestamps = [ 

193 result 

194 for tsr in timestamp_responses 

195 if (result := self._verify_signed_timestamp(tsr, bundle.signature)) 

196 ] 

197 

198 return verified_timestamps 

199 

200 def _establish_time(self, bundle: Bundle) -> list[TimestampVerificationResult]: 

201 """ 

202 Establish the time for bundle verification. 

203 

204 This method uses timestamps from two possible sources: 

205 1. RFC3161 signed timestamps from a Timestamping Authority (TSA) 

206 2. Transparency Log timestamps 

207 """ 

208 verified_timestamps = [] 

209 

210 # If a timestamp from the timestamping service is available, the Verifier MUST 

211 # perform path validation using the timestamp from the Timestamping Service. 

212 if bundle.verification_material.timestamp_verification_data: 

213 if not self._trusted_root.get_timestamp_authorities(): 

214 msg = ( 

215 "no Timestamp Authorities have been provided to validate this " 

216 "bundle but it contains a signed timestamp" 

217 ) 

218 raise VerificationError(msg) 

219 

220 timestamp_from_tsa = self._verify_timestamp_authority(bundle) 

221 verified_timestamps.extend(timestamp_from_tsa) 

222 

223 # If a timestamp from the Transparency Service is available, the Verifier MUST 

224 # perform path validation using the timestamp from the Transparency Service. 

225 # NOTE: We only include this timestamp if it's accompanied by an inclusion 

226 # promise that cryptographically binds it. We verify the inclusion promise 

227 # itself later, as part of log entry verification. 

228 if ( 

229 timestamp := bundle.log_entry._inner.integrated_time 

230 ) and bundle.log_entry._inner.inclusion_promise: 

231 kv = bundle.log_entry._inner.kind_version 

232 if not (kv.kind in ["dsse", "hashedrekord"] and kv.version == "0.0.1"): 

233 raise VerificationError( 

234 "Integrated time only supported for dsse/hashedrekord 0.0.1 types" 

235 ) 

236 

237 verified_timestamps.append( 

238 TimestampVerificationResult( 

239 source=TimestampSource.TRANSPARENCY_SERVICE, 

240 time=datetime.fromtimestamp(timestamp, tz=timezone.utc), 

241 ) 

242 ) 

243 return verified_timestamps 

244 

245 def _verify_chain_at_time( 

246 self, certificate: X509, timestamp_result: TimestampVerificationResult 

247 ) -> list[X509]: 

248 """ 

249 Verify the validity of the certificate chain at the given time. 

250 

251 Raises a VerificationError if the chain can't be built or be verified. 

252 """ 

253 # NOTE: The `X509Store` object cannot have its time reset once the `set_time` 

254 # method been called on it. To get around this, we construct a new one in each 

255 # call. 

256 store = X509Store() 

257 # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's 

258 # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN 

259 # would be strictly more conformant of OpenSSL, but we currently 

260 # *want* the "long" chain behavior of performing path validation 

261 # down to a self-signed root. 

262 store.set_flags(X509StoreFlags.X509_STRICT) 

263 for parent_cert_ossl in self._fulcio_certificate_chain: 

264 store.add_cert(parent_cert_ossl) 

265 

266 store.set_time(timestamp_result.time) 

267 

268 store_ctx = X509StoreContext(store, certificate) 

269 

270 try: 

271 # get_verified_chain returns the full chain including the end-entity certificate 

272 # and chain should contain only CA certificates 

273 return store_ctx.get_verified_chain()[1:] 

274 except X509StoreContextError as e: 

275 raise CertValidationError( 

276 f"failed to build timestamp certificate chain: {e}" 

277 ) 

278 

279 def _verify_common_signing_cert( 

280 self, bundle: Bundle, policy: VerificationPolicy 

281 ) -> None: 

282 """ 

283 Performs the signing certificate verification steps that are shared between 

284 `verify_dsse` and `verify_artifact`. 

285 

286 Raises `VerificationError` on all failures. 

287 """ 

288 

289 # In order to verify an artifact, we need to achieve the following: 

290 # 

291 # 0. Establish a time for the signature. 

292 # 1. Verify that the signing certificate chains to the root of trust 

293 # and is valid at the time of signing. 

294 # 2. Verify the signing certificate's SCT. 

295 # 3. Verify that the signing certificate conforms to the Sigstore 

296 # X.509 profile as well as the passed-in `VerificationPolicy`. 

297 # 4. Verify the inclusion proof and signed checkpoint for the log 

298 # entry. 

299 # 5. Verify the inclusion promise for the log entry, if present. 

300 # 6. Verify the timely insertion of the log entry against the validity 

301 # period for the signing certificate. 

302 # 7. Verify the signature and input against the signing certificate's 

303 # public key. 

304 # 8. Verify the transparency log entry's consistency against the other 

305 # materials, to prevent variants of CVE-2022-36056. 

306 # 

307 # This method performs steps (0) through (6) above. Its caller 

308 # MUST perform steps (7) and (8) separately, since they vary based on 

309 # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.) 

310 

311 cert = bundle.signing_certificate 

312 

313 # NOTE: The `X509Store` object currently cannot have its time reset once the `set_time` 

314 # method been called on it. To get around this, we construct a new one for every `verify` 

315 # call. 

316 store = X509Store() 

317 # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's 

318 # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN 

319 # would be strictly more conformant of OpenSSL, but we currently 

320 # *want* the "long" chain behavior of performing path validation 

321 # down to a self-signed root. 

322 store.set_flags(X509StoreFlags.X509_STRICT) 

323 for parent_cert_ossl in self._fulcio_certificate_chain: 

324 store.add_cert(parent_cert_ossl) 

325 

326 # (0): Establishing a Time for the Signature 

327 # First, establish verified times for the signature. This is required to 

328 # validate the certificate chain, so this step comes first. 

329 # These include TSA timestamps and (in the case of rekor v1 entries) 

330 # rekor log integrated time. 

331 verified_timestamps = self._establish_time(bundle) 

332 if len(verified_timestamps) < VERIFIED_TIME_THRESHOLD: 

333 raise VerificationError("not enough sources of verified time") 

334 

335 # (1): verify that the signing certificate is signed by the root 

336 # certificate and that the signing certificate was valid at the 

337 # time of signing. 

338 cert_ossl = X509.from_cryptography(cert) 

339 chain: list[X509] = [] 

340 for vts in verified_timestamps: 

341 chain = self._verify_chain_at_time(cert_ossl, vts) 

342 

343 # (2): verify the signing certificate's SCT. 

344 try: 

345 verify_sct( 

346 cert, 

347 [parent_cert.to_cryptography() for parent_cert in chain], 

348 self._trusted_root.ct_keyring(KeyringPurpose.VERIFY), 

349 ) 

350 except VerificationError as e: 

351 raise VerificationError(f"failed to verify SCT on signing certificate: {e}") 

352 

353 # (3): verify the signing certificate against the Sigstore 

354 # X.509 profile and verify against the given `VerificationPolicy`. 

355 usage_ext = cert.extensions.get_extension_for_class(KeyUsage) 

356 if not usage_ext.value.digital_signature: 

357 raise VerificationError("Key usage is not of type `digital signature`") 

358 

359 extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage) 

360 if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value: 

361 raise VerificationError("Extended usage does not contain `code signing`") 

362 

363 policy.verify(cert) 

364 

365 _logger.debug("Successfully verified signing certificate validity...") 

366 

367 # (4): verify the inclusion proof and signed checkpoint for the 

368 # log entry. 

369 # (5): verify the inclusion promise for the log entry, if present. 

370 entry = bundle.log_entry 

371 try: 

372 entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY)) 

373 except VerificationError as exc: 

374 raise VerificationError(f"invalid log entry: {exc}") 

375 

376 # (6): verify our established times (timestamps or the log integration time) are 

377 # within signing certificate validity period. 

378 for vts in verified_timestamps: 

379 if not ( 

380 bundle.signing_certificate.not_valid_before_utc 

381 <= vts.time 

382 <= bundle.signing_certificate.not_valid_after_utc 

383 ): 

384 raise VerificationError( 

385 f"invalid signing cert: expired at time of signing, time via {vts}" 

386 ) 

387 

388 def verify_dsse( 

389 self, bundle: Bundle, policy: VerificationPolicy 

390 ) -> tuple[str, bytes]: 

391 """ 

392 Verifies an bundle's DSSE envelope, returning the encapsulated payload 

393 and its content type. 

394 

395 This method is only for DSSE-enveloped payloads. To verify 

396 an arbitrary input against a bundle, use the `verify_artifact` 

397 method. 

398 

399 `bundle` is the Sigstore `Bundle` to both verify and verify against. 

400 

401 `policy` is the `VerificationPolicy` to verify against. 

402 

403 Returns a tuple of `(type, payload)`, where `type` is the payload's 

404 type as encoded in the DSSE envelope and `payload` is the raw `bytes` 

405 of the payload. No validation of either `type` or `payload` is 

406 performed; users of this API **must** assert that `type` is known 

407 to them before proceeding to handle `payload` in an application-dependent 

408 manner. 

409 """ 

410 

411 # (1) through (6) are performed by `_verify_common_signing_cert`. 

412 self._verify_common_signing_cert(bundle, policy) 

413 

414 # (7): verify the bundle's signature and DSSE envelope against the 

415 # signing certificate's public key. 

416 envelope = bundle._dsse_envelope 

417 if envelope is None: 

418 raise VerificationError( 

419 "cannot perform DSSE verification on a bundle without a DSSE envelope" 

420 ) 

421 

422 signing_key = bundle.signing_certificate.public_key() 

423 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 

424 dsse._verify(signing_key, envelope) 

425 

426 # (8): verify the consistency of the log entry's body against 

427 # the other bundle materials. 

428 # NOTE: This is very slightly weaker than the consistency check 

429 # for hashedrekord entries, due to how inclusion is recorded for DSSE: 

430 # the included entry for DSSE includes an envelope hash that we 

431 # *cannot* verify, since the envelope is uncanonicalized JSON. 

432 # Instead, we manually pick apart the entry body below and verify 

433 # the parts we can (namely the payload hash and signature list). 

434 entry = bundle.log_entry 

435 if entry._inner.kind_version.kind != "dsse": 

436 raise VerificationError( 

437 f"Expected entry type dsse, got {entry._inner.kind_version.kind}" 

438 ) 

439 if entry._inner.kind_version.version == "0.0.2": 

440 _validate_dsse_v002_entry_body(bundle) 

441 elif entry._inner.kind_version.version == "0.0.1": 

442 _validate_dsse_v001_entry_body(bundle) 

443 else: 

444 raise VerificationError( 

445 f"Unsupported dsse version {entry._inner.kind_version.version}" 

446 ) 

447 

448 return (envelope._inner.payload_type, envelope._inner.payload) 

449 

450 def verify_artifact( 

451 self, 

452 input_: bytes | Hashed, 

453 bundle: Bundle, 

454 policy: VerificationPolicy, 

455 ) -> None: 

456 """ 

457 Public API for verifying. 

458 

459 `input_` is the input to verify, either as a buffer of contents or as 

460 a prehashed `Hashed` object. 

461 

462 `bundle` is the Sigstore `Bundle` to verify against. 

463 

464 `policy` is the `VerificationPolicy` to verify against. 

465 

466 On failure, this method raises `VerificationError`. 

467 """ 

468 

469 # (1) through (6) are performed by `_verify_common_signing_cert`. 

470 self._verify_common_signing_cert(bundle, policy) 

471 

472 hashed_input = sha256_digest(input_) 

473 

474 # (7): verify that the signature was signed by the public key in the signing certificate. 

475 try: 

476 signing_key = bundle.signing_certificate.public_key() 

477 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 

478 signing_key.verify( 

479 bundle._inner.message_signature.signature, # type: ignore[union-attr] 

480 hashed_input.digest, 

481 ec.ECDSA(hashed_input._as_prehashed()), 

482 ) 

483 except InvalidSignature: 

484 raise VerificationError("Signature is invalid for input") 

485 

486 _logger.debug("Successfully verified signature...") 

487 

488 # (8): verify the consistency of the log entry's body against 

489 # the other bundle materials (and input being verified). 

490 entry = bundle.log_entry 

491 if entry._inner.kind_version.kind != "hashedrekord": 

492 raise VerificationError( 

493 f"Expected entry type hashedrekord, got {entry._inner.kind_version.kind}" 

494 ) 

495 

496 if entry._inner.kind_version.version == "0.0.2": 

497 _validate_hashedrekord_v002_entry_body(bundle, hashed_input) 

498 elif entry._inner.kind_version.version == "0.0.1": 

499 _validate_hashedrekord_v001_entry_body(bundle, hashed_input) 

500 else: 

501 raise VerificationError( 

502 f"Unsupported hashedrekord version {entry._inner.kind_version.version}" 

503 ) 

504 

505 

506def _validate_dsse_v001_entry_body(bundle: Bundle) -> None: 

507 """ 

508 Validate the Entry body for dsse v001. 

509 """ 

510 entry = bundle.log_entry 

511 envelope = bundle._dsse_envelope 

512 if envelope is None: 

513 raise VerificationError( 

514 "cannot perform DSSE verification on a bundle without a DSSE envelope" 

515 ) 

516 try: 

517 entry_body = rekor_types.Dsse.model_validate_json( 

518 entry._inner.canonicalized_body 

519 ) 

520 except ValidationError as exc: 

521 raise VerificationError(f"invalid DSSE log entry: {exc}") 

522 

523 payload_hash = sha256_digest(envelope._inner.payload).digest.hex() 

524 if ( 

525 entry_body.spec.root.payload_hash.algorithm # type: ignore[union-attr] 

526 != rekor_types.dsse.Algorithm.SHA256 

527 ): 

528 raise VerificationError("expected SHA256 payload hash in DSSE log entry") 

529 if payload_hash != entry_body.spec.root.payload_hash.value: # type: ignore[union-attr] 

530 raise VerificationError("log entry payload hash does not match bundle") 

531 

532 # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here, 

533 # but we handle them just in case the signer has somehow produced multiple 

534 # signatures for their envelope with the same signing key. 

535 signatures = [ 

536 rekor_types.dsse.Signature( 

537 signature=base64.b64encode(signature.sig).decode(), 

538 verifier=base64_encode_pem_cert(bundle.signing_certificate), 

539 ) 

540 for signature in envelope._inner.signatures 

541 ] 

542 if signatures != entry_body.spec.root.signatures: 

543 raise VerificationError("log entry signatures do not match bundle") 

544 

545 

546def _validate_dsse_v002_entry_body(bundle: Bundle) -> None: 

547 """ 

548 Validate Entry body for dsse v002. 

549 """ 

550 entry = bundle.log_entry 

551 envelope = bundle._dsse_envelope 

552 if envelope is None: 

553 raise VerificationError( 

554 "cannot perform DSSE verification on a bundle without a DSSE envelope" 

555 ) 

556 try: 

557 v2_body = v2.entry.Entry.from_json(entry._inner.canonicalized_body) 

558 except ValidationError as exc: 

559 raise VerificationError(f"invalid DSSE log entry: {exc}") 

560 

561 if v2_body.spec.dsse_v002 is None: 

562 raise VerificationError("invalid DSSE log entry: missing dsse_v002 field") 

563 

564 if v2_body.spec.dsse_v002.payload_hash.algorithm != v1.HashAlgorithm.SHA2_256: 

565 raise VerificationError("expected SHA256 hash in DSSE entry") 

566 

567 digest = sha256_digest(envelope._inner.payload).digest 

568 if v2_body.spec.dsse_v002.payload_hash.digest != digest: 

569 raise VerificationError("DSSE entry payload hash does not match bundle") 

570 

571 v2_signatures = [ 

572 v2.verifier.Signature( 

573 content=base64.b64encode(signature.sig), 

574 verifier=_v2_verifier_from_certificate(bundle.signing_certificate), 

575 ) 

576 for signature in envelope._inner.signatures 

577 ] 

578 if v2_signatures != v2_body.spec.dsse_v002.signatures: 

579 raise VerificationError("log entry signatures do not match bundle") 

580 

581 

582def _validate_hashedrekord_v001_entry_body( 

583 bundle: Bundle, hashed_input: Hashed 

584) -> None: 

585 """ 

586 Validate the Entry body for hashedrekord v001. 

587 """ 

588 entry = bundle.log_entry 

589 expected_body = _hashedrekord_from_parts( 

590 bundle.signing_certificate, 

591 bundle._inner.message_signature.signature, # type: ignore[union-attr] 

592 hashed_input, 

593 ) 

594 actual_body = rekor_types.Hashedrekord.model_validate_json( 

595 entry._inner.canonicalized_body 

596 ) 

597 if expected_body != actual_body: 

598 raise VerificationError( 

599 "transparency log entry is inconsistent with other materials" 

600 ) 

601 

602 

603def _validate_hashedrekord_v002_entry_body( 

604 bundle: Bundle, hashed_input: Hashed 

605) -> None: 

606 """ 

607 Validate Entry body for hashedrekord v002. 

608 """ 

609 entry = bundle.log_entry 

610 if bundle._inner.message_signature is None: 

611 raise VerificationError( 

612 "invalid hashedrekord log entry: missing message signature" 

613 ) 

614 v2_expected_body = v2.entry.Entry( 

615 kind=entry._inner.kind_version.kind, 

616 api_version=entry._inner.kind_version.version, 

617 spec=v2.entry.Spec( 

618 hashed_rekord_v002=v2.hashedrekord.HashedRekordLogEntryV002( 

619 data=v1.HashOutput( 

620 algorithm=hashed_input.algorithm, 

621 digest=base64.b64encode(hashed_input.digest), 

622 ), 

623 signature=v2.verifier.Signature( 

624 content=base64.b64encode(bundle._inner.message_signature.signature), 

625 verifier=_v2_verifier_from_certificate(bundle.signing_certificate), 

626 ), 

627 ) 

628 ), 

629 ) 

630 v2_actual_body = v2.entry.Entry.from_json(entry._inner.canonicalized_body) 

631 if v2_expected_body != v2_actual_body: 

632 raise VerificationError( 

633 "transparency log entry is inconsistent with other materials" 

634 ) 

635 

636 

637def _v2_verifier_from_certificate(certificate: Certificate) -> v2.verifier.Verifier: 

638 """ 

639 Return a Rekor v2 Verifier for the signing certificate. 

640 

641 This method decides which signature algorithms are supported for verification 

642 (in a rekor v2 entry), see 

643 https://github.com/sigstore/architecture-docs/blob/main/algorithm-registry.md. 

644 Note that actual signature verification happens in verify_artifact() and 

645 verify_dsse(): New keytypes need to be added here and in those methods. 

646 """ 

647 public_key = certificate.public_key() 

648 

649 if isinstance(public_key, ec.EllipticCurvePublicKey): 

650 if isinstance(public_key.curve, ec.SECP256R1): 

651 key_details = v1.PublicKeyDetails.PKIX_ECDSA_P256_SHA_256 

652 elif isinstance(public_key.curve, ec.SECP384R1): 

653 key_details = v1.PublicKeyDetails.PKIX_ECDSA_P384_SHA_384 

654 elif isinstance(public_key.curve, ec.SECP521R1): 

655 key_details = v1.PublicKeyDetails.PKIX_ECDSA_P521_SHA_512 

656 else: 

657 raise ValueError(f"Unsupported EC curve: {public_key.curve.name}") 

658 else: 

659 raise ValueError(f"Unsupported public key type: {type(public_key)}") 

660 

661 return v2.verifier.Verifier( 

662 x509_certificate=v1.X509Certificate( 

663 raw_bytes=base64.b64encode( 

664 certificate.public_bytes(encoding=serialization.Encoding.DER) 

665 ) 

666 ), 

667 key_details=key_details, 

668 )