Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/verify/verifier.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

225 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Verification API machinery. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import logging 

23from datetime import datetime, timezone 

24from typing import cast 

25 

26import rekor_types 

27from cryptography.exceptions import InvalidSignature 

28from cryptography.hazmat.primitives import serialization 

29from cryptography.hazmat.primitives.asymmetric import ec 

30from cryptography.x509 import Certificate, ExtendedKeyUsage, KeyUsage 

31from cryptography.x509.oid import ExtendedKeyUsageOID 

32from OpenSSL.crypto import ( 

33 X509, 

34 X509Store, 

35 X509StoreContext, 

36 X509StoreContextError, 

37 X509StoreFlags, 

38) 

39from pydantic import ValidationError 

40from rfc3161_client import TimeStampResponse, VerifierBuilder 

41from rfc3161_client import VerificationError as Rfc3161VerificationError 

42from sigstore_models.common import v1 

43from sigstore_models.rekor import v2 

44 

45from sigstore import dsse 

46from sigstore._internal.key_details import _get_key_details, _get_prehash 

47from sigstore._internal.rekor import _hashedrekord_from_parts 

48from sigstore._internal.rekor.client import RekorClient 

49from sigstore._internal.sct import ( 

50 verify_sct, 

51) 

52from sigstore._internal.timestamp import TimestampSource, TimestampVerificationResult 

53from sigstore._internal.trust import KeyringPurpose 

54from sigstore._utils import base64_encode_pem_cert, sha256_digest 

55from sigstore.errors import CertValidationError, VerificationError 

56from sigstore.hashes import Hashed 

57from sigstore.models import Bundle, ClientTrustConfig, TrustedRoot 

58from sigstore.verify.policy import VerificationPolicy 

59 

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Upper bound on the number of RFC 3161 signed timestamps accepted in a single
# bundle; bundles carrying more are rejected to prevent DoS.
# From https://github.com/sigstore/sigstore-go/blob/e92142f0734064ebf6001f188b7330a1212245fe/pkg/verify/tsa.go#L29
MAX_ALLOWED_TIMESTAMP: int = 32

# When verifying an entry, this threshold represents the minimum number of
# verified times (from any source) required to consider a signature valid.
VERIFIED_TIME_THRESHOLD: int = 1

69 

70 

71class Verifier: 

72 """ 

73 The primary API for verification operations. 

74 """ 

75 

76 def __init__(self, *, trusted_root: TrustedRoot): 

77 """ 

78 Create a new `Verifier`. 

79 

80 `trusted_root` is the `TrustedRoot` object containing the root of trust 

81 for the verification process. 

82 """ 

83 self._fulcio_certificate_chain: list[X509] = [ 

84 X509.from_cryptography(parent_cert) 

85 for parent_cert in trusted_root.get_fulcio_certs() 

86 ] 

87 self._trusted_root = trusted_root 

88 

89 # this is an ugly hack needed for verifying "detached" materials 

90 # In reality we should be choosing the rekor instance based on the logid 

91 url = trusted_root._inner.tlogs[0].base_url 

92 self._rekor = RekorClient(url) 

93 

94 @classmethod 

95 def production(cls, *, offline: bool = False) -> Verifier: 

96 """ 

97 Return a `Verifier` instance configured against Sigstore's production-level services. 

98 

99 `offline` controls the Trusted Root refresh behavior: if `True`, 

100 the verifier uses the Trusted Root in the local TUF cache. If `False`, 

101 a TUF repository refresh is attempted. 

102 """ 

103 config = ClientTrustConfig.production(offline=offline) 

104 return cls( 

105 trusted_root=config.trusted_root, 

106 ) 

107 

108 @classmethod 

109 def staging(cls, *, offline: bool = False) -> Verifier: 

110 """ 

111 Return a `Verifier` instance configured against Sigstore's staging-level services. 

112 

113 `offline` controls the Trusted Root refresh behavior: if `True`, 

114 the verifier uses the Trusted Root in the local TUF cache. If `False`, 

115 a TUF repository refresh is attempted. 

116 """ 

117 config = ClientTrustConfig.staging(offline=offline) 

118 return cls( 

119 trusted_root=config.trusted_root, 

120 ) 

121 

122 def _verify_signed_timestamp( 

123 self, timestamp_response: TimeStampResponse, message: bytes 

124 ) -> TimestampVerificationResult | None: 

125 """ 

126 Verify a Signed Timestamp using the TSA provided by the Trusted Root. 

127 """ 

128 cert_authorities = self._trusted_root.get_timestamp_authorities() 

129 for certificate_authority in cert_authorities: 

130 certificates = certificate_authority.certificates(allow_expired=True) 

131 

132 # We expect at least a signing cert and a root cert but there may be intermediates 

133 if len(certificates) < 2: 

134 _logger.debug("Unable to verify Timestamp: cert chain is incomplete") 

135 continue 

136 

137 builder = ( 

138 VerifierBuilder() 

139 .tsa_certificate(certificates[0]) 

140 .add_root_certificate(certificates[-1]) 

141 ) 

142 for certificate in certificates[1:-1]: 

143 builder = builder.add_intermediate_certificate(certificate) 

144 

145 verifier = builder.build() 

146 try: 

147 verifier.verify_message(timestamp_response, message) 

148 except Rfc3161VerificationError: 

149 _logger.debug("Unable to verify Timestamp with CA.", exc_info=True) 

150 continue 

151 

152 if ( 

153 certificate_authority.validity_period_start 

154 <= timestamp_response.tst_info.gen_time 

155 ) and ( 

156 not certificate_authority.validity_period_end 

157 or timestamp_response.tst_info.gen_time 

158 < certificate_authority.validity_period_end 

159 ): 

160 return TimestampVerificationResult( 

161 source=TimestampSource.TIMESTAMP_AUTHORITY, 

162 time=timestamp_response.tst_info.gen_time, 

163 ) 

164 

165 _logger.debug("Unable to verify Timestamp because not in CA time range.") 

166 

167 return None 

168 

169 def _verify_timestamp_authority( 

170 self, bundle: Bundle 

171 ) -> list[TimestampVerificationResult]: 

172 """ 

173 Verify that the given bundle has been timestamped by a trusted timestamp authority 

174 and that the timestamp is valid. 

175 

176 Returns the number of valid signed timestamp in the bundle. 

177 """ 

178 timestamp_responses = [] 

179 if ( 

180 timestamp_verification_data 

181 := bundle.verification_material.timestamp_verification_data 

182 ): 

183 timestamp_responses = timestamp_verification_data.rfc3161_timestamps 

184 

185 if len(timestamp_responses) > MAX_ALLOWED_TIMESTAMP: 

186 msg = f"too many signed timestamp: {len(timestamp_responses)} > {MAX_ALLOWED_TIMESTAMP}" 

187 raise VerificationError(msg) 

188 

189 if len(set(timestamp_responses)) != len(timestamp_responses): 

190 msg = "duplicate timestamp found" 

191 raise VerificationError(msg) 

192 

193 verified_timestamps = [ 

194 result 

195 for tsr in timestamp_responses 

196 if (result := self._verify_signed_timestamp(tsr, bundle.signature)) 

197 ] 

198 

199 return verified_timestamps 

200 

201 def _establish_time(self, bundle: Bundle) -> list[TimestampVerificationResult]: 

202 """ 

203 Establish the time for bundle verification. 

204 

205 This method uses timestamps from two possible sources: 

206 1. RFC3161 signed timestamps from a Timestamping Authority (TSA) 

207 2. Transparency Log timestamps 

208 """ 

209 verified_timestamps = [] 

210 

211 # If a timestamp from the timestamping service is available, the Verifier MUST 

212 # perform path validation using the timestamp from the Timestamping Service. 

213 if bundle.verification_material.timestamp_verification_data: 

214 if not self._trusted_root.get_timestamp_authorities(): 

215 msg = ( 

216 "no Timestamp Authorities have been provided to validate this " 

217 "bundle but it contains a signed timestamp" 

218 ) 

219 raise VerificationError(msg) 

220 

221 timestamp_from_tsa = self._verify_timestamp_authority(bundle) 

222 verified_timestamps.extend(timestamp_from_tsa) 

223 

224 # If a timestamp from the Transparency Service is available, the Verifier MUST 

225 # perform path validation using the timestamp from the Transparency Service. 

226 # NOTE: We only include this timestamp if it's accompanied by an inclusion 

227 # promise that cryptographically binds it. We verify the inclusion promise 

228 # itself later, as part of log entry verification. 

229 if ( 

230 timestamp := bundle.log_entry._inner.integrated_time 

231 ) and bundle.log_entry._inner.inclusion_promise: 

232 kv = bundle.log_entry._inner.kind_version 

233 if not (kv.kind in ["dsse", "hashedrekord"] and kv.version == "0.0.1"): 

234 raise VerificationError( 

235 "Integrated time only supported for dsse/hashedrekord 0.0.1 types" 

236 ) 

237 

238 verified_timestamps.append( 

239 TimestampVerificationResult( 

240 source=TimestampSource.TRANSPARENCY_SERVICE, 

241 time=datetime.fromtimestamp(timestamp, tz=timezone.utc), 

242 ) 

243 ) 

244 return verified_timestamps 

245 

246 def _verify_chain_at_time( 

247 self, certificate: X509, timestamp_result: TimestampVerificationResult 

248 ) -> list[X509]: 

249 """ 

250 Verify the validity of the certificate chain at the given time. 

251 

252 Raises a VerificationError if the chain can't be built or be verified. 

253 """ 

254 # NOTE: The `X509Store` object cannot have its time reset once the `set_time` 

255 # method been called on it. To get around this, we construct a new one in each 

256 # call. 

257 store = X509Store() 

258 # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's 

259 # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN 

260 # would be strictly more conformant of OpenSSL, but we currently 

261 # *want* the "long" chain behavior of performing path validation 

262 # down to a self-signed root. 

263 store.set_flags(X509StoreFlags.X509_STRICT) 

264 for parent_cert_ossl in self._fulcio_certificate_chain: 

265 store.add_cert(parent_cert_ossl) 

266 

267 store.set_time(timestamp_result.time) 

268 

269 store_ctx = X509StoreContext(store, certificate) 

270 

271 try: 

272 # get_verified_chain returns the full chain including the end-entity certificate 

273 # and chain should contain only CA certificates 

274 return store_ctx.get_verified_chain()[1:] 

275 except X509StoreContextError as e: 

276 raise CertValidationError( 

277 f"failed to build timestamp certificate chain: {e}" 

278 ) 

279 

280 def _verify_common_signing_cert( 

281 self, bundle: Bundle, policy: VerificationPolicy 

282 ) -> None: 

283 """ 

284 Performs the signing certificate verification steps that are shared between 

285 `verify_dsse` and `verify_artifact`. 

286 

287 Raises `VerificationError` on all failures. 

288 """ 

289 

290 # In order to verify an artifact, we need to achieve the following: 

291 # 

292 # 0. Establish a time for the signature. 

293 # 1. Verify that the signing certificate chains to the root of trust 

294 # and is valid at the time of signing. 

295 # 2. Verify the signing certificate's SCT. 

296 # 3. Verify that the signing certificate conforms to the Sigstore 

297 # X.509 profile as well as the passed-in `VerificationPolicy`. 

298 # 4. Verify the inclusion proof and signed checkpoint for the log 

299 # entry. 

300 # 5. Verify the inclusion promise for the log entry, if present. 

301 # 6. Verify the timely insertion of the log entry against the validity 

302 # period for the signing certificate. 

303 # 7. Verify the signature and input against the signing certificate's 

304 # public key. 

305 # 8. Verify the transparency log entry's consistency against the other 

306 # materials, to prevent variants of CVE-2022-36056. 

307 # 

308 # This method performs steps (0) through (6) above. Its caller 

309 # MUST perform steps (7) and (8) separately, since they vary based on 

310 # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.) 

311 

312 cert = bundle.signing_certificate 

313 

314 # NOTE: The `X509Store` object currently cannot have its time reset once the `set_time` 

315 # method been called on it. To get around this, we construct a new one for every `verify` 

316 # call. 

317 store = X509Store() 

318 # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's 

319 # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN 

320 # would be strictly more conformant of OpenSSL, but we currently 

321 # *want* the "long" chain behavior of performing path validation 

322 # down to a self-signed root. 

323 store.set_flags(X509StoreFlags.X509_STRICT) 

324 for parent_cert_ossl in self._fulcio_certificate_chain: 

325 store.add_cert(parent_cert_ossl) 

326 

327 # (0): Establishing a Time for the Signature 

328 # First, establish verified times for the signature. This is required to 

329 # validate the certificate chain, so this step comes first. 

330 # These include TSA timestamps and (in the case of rekor v1 entries) 

331 # rekor log integrated time. 

332 verified_timestamps = self._establish_time(bundle) 

333 if len(verified_timestamps) < VERIFIED_TIME_THRESHOLD: 

334 raise VerificationError("not enough sources of verified time") 

335 

336 # (1): verify that the signing certificate is signed by the root 

337 # certificate and that the signing certificate was valid at the 

338 # time of signing. 

339 cert_ossl = X509.from_cryptography(cert) 

340 chain: list[X509] = [] 

341 for vts in verified_timestamps: 

342 chain = self._verify_chain_at_time(cert_ossl, vts) 

343 

344 # (2): verify the signing certificate's SCT. 

345 try: 

346 verify_sct( 

347 cert, 

348 [parent_cert.to_cryptography() for parent_cert in chain], 

349 self._trusted_root.ct_keyring(KeyringPurpose.VERIFY), 

350 ) 

351 except VerificationError as e: 

352 raise VerificationError(f"failed to verify SCT on signing certificate: {e}") 

353 

354 # (3): verify the signing certificate against the Sigstore 

355 # X.509 profile and verify against the given `VerificationPolicy`. 

356 usage_ext = cert.extensions.get_extension_for_class(KeyUsage) 

357 if not usage_ext.value.digital_signature: 

358 raise VerificationError("Key usage is not of type `digital signature`") 

359 

360 extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage) 

361 if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value: 

362 raise VerificationError("Extended usage does not contain `code signing`") 

363 

364 policy.verify(cert) 

365 

366 _logger.debug("Successfully verified signing certificate validity...") 

367 

368 # (4): verify the inclusion proof and signed checkpoint for the 

369 # log entry. 

370 # (5): verify the inclusion promise for the log entry, if present. 

371 entry = bundle.log_entry 

372 try: 

373 entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY)) 

374 except VerificationError as exc: 

375 raise VerificationError(f"invalid log entry: {exc}") 

376 

377 # (6): verify our established times (timestamps or the log integration time) are 

378 # within signing certificate validity period. 

379 for vts in verified_timestamps: 

380 if not ( 

381 bundle.signing_certificate.not_valid_before_utc 

382 <= vts.time 

383 <= bundle.signing_certificate.not_valid_after_utc 

384 ): 

385 raise VerificationError( 

386 f"invalid signing cert: expired at time of signing, time via {vts}" 

387 ) 

388 

389 def verify_dsse( 

390 self, bundle: Bundle, policy: VerificationPolicy 

391 ) -> tuple[str, bytes]: 

392 """ 

393 Verifies an bundle's DSSE envelope, returning the encapsulated payload 

394 and its content type. 

395 

396 This method is only for DSSE-enveloped payloads. To verify 

397 an arbitrary input against a bundle, use the `verify_artifact` 

398 method. 

399 

400 `bundle` is the Sigstore `Bundle` to both verify and verify against. 

401 

402 `policy` is the `VerificationPolicy` to verify against. 

403 

404 Returns a tuple of `(type, payload)`, where `type` is the payload's 

405 type as encoded in the DSSE envelope and `payload` is the raw `bytes` 

406 of the payload. No validation of either `type` or `payload` is 

407 performed; users of this API **must** assert that `type` is known 

408 to them before proceeding to handle `payload` in an application-dependent 

409 manner. 

410 """ 

411 

412 # (1) through (6) are performed by `_verify_common_signing_cert`. 

413 self._verify_common_signing_cert(bundle, policy) 

414 

415 # (7): verify the bundle's signature and DSSE envelope against the 

416 # signing certificate's public key. 

417 envelope = bundle._dsse_envelope 

418 if envelope is None: 

419 raise VerificationError( 

420 "cannot perform DSSE verification on a bundle without a DSSE envelope" 

421 ) 

422 

423 signing_key = bundle.signing_certificate.public_key() 

424 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 

425 dsse._verify(signing_key, envelope) 

426 

427 # (8): verify the consistency of the log entry's body against 

428 # the other bundle materials. 

429 # Rekor v2 records DSSE envelopes as hashedrekord/0.0.2 entries whose 

430 # digest covers PAE(payloadType, payload) and whose signature.content 

431 # equals envelope.signatures[0].sig (rekor-v2-spec §6.1.4). Rekor v1 

432 # used a dsse/0.0.1 entry, which is slightly weaker than the 

433 # hashedrekord consistency check: dsse entries record an envelope 

434 # hash that we *cannot* verify (the envelope is uncanonicalized JSON), 

435 # so we manually pick apart the entry body and verify the parts we 

436 # can (payload hash and signature list). 

437 entry = bundle.log_entry 

438 kind = entry._inner.kind_version.kind 

439 version = entry._inner.kind_version.version 

440 if kind == "hashedrekord" and version == "0.0.2": 

441 _validate_hashedrekord_v002_dsse_entry_body(bundle) 

442 elif kind == "dsse" and version == "0.0.1": 

443 _validate_dsse_v001_entry_body(bundle) 

444 else: 

445 raise VerificationError( 

446 f"Unsupported DSSE log entry type: {kind}/{version}" 

447 ) 

448 

449 return (envelope._inner.payload_type, envelope._inner.payload) 

450 

451 def verify_artifact( 

452 self, 

453 input_: bytes | Hashed, 

454 bundle: Bundle, 

455 policy: VerificationPolicy, 

456 ) -> None: 

457 """ 

458 Public API for verifying. 

459 

460 `input_` is the input to verify, either as a buffer of contents or as 

461 a prehashed `Hashed` object. 

462 

463 `bundle` is the Sigstore `Bundle` to verify against. 

464 

465 `policy` is the `VerificationPolicy` to verify against. 

466 

467 On failure, this method raises `VerificationError`. 

468 """ 

469 

470 # (1) through (6) are performed by `_verify_common_signing_cert`. 

471 self._verify_common_signing_cert(bundle, policy) 

472 

473 hashed_input = sha256_digest(input_) 

474 bundle_signature = bundle._inner.message_signature 

475 if bundle_signature is None: 

476 raise VerificationError("Missing bundle message signature") 

477 

478 # signature is verified over input digest, but if the bundle documents the digest we still 

479 # want to ensure it matches the input digest: 

480 if ( 

481 bundle_signature.message_digest is not None 

482 and hashed_input.digest != bundle_signature.message_digest.digest 

483 ): 

484 raise VerificationError("Bundle message digest mismatch") 

485 

486 # (7): verify that the signature was signed by the public key in the signing certificate. 

487 try: 

488 signing_key = bundle.signing_certificate.public_key() 

489 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 

490 signing_key.verify( 

491 bundle_signature.signature, 

492 hashed_input.digest, 

493 ec.ECDSA(hashed_input._as_prehashed()), 

494 ) 

495 except InvalidSignature: 

496 raise VerificationError("Signature is invalid for input") 

497 

498 _logger.debug("Successfully verified signature...") 

499 

500 # (8): verify the consistency of the log entry's body against 

501 # the other bundle materials (and input being verified). 

502 entry = bundle.log_entry 

503 if entry._inner.kind_version.kind != "hashedrekord": 

504 raise VerificationError( 

505 f"Expected entry type hashedrekord, got {entry._inner.kind_version.kind}" 

506 ) 

507 

508 if entry._inner.kind_version.version == "0.0.2": 

509 _validate_hashedrekord_v002_entry_body(bundle, hashed_input) 

510 elif entry._inner.kind_version.version == "0.0.1": 

511 _validate_hashedrekord_v001_entry_body(bundle, hashed_input) 

512 else: 

513 raise VerificationError( 

514 f"Unsupported hashedrekord version {entry._inner.kind_version.version}" 

515 ) 

516 

517 

def _validate_dsse_v001_entry_body(bundle: Bundle) -> None:
    """
    Check that a `dsse` v0.0.1 log entry body agrees with the bundle.

    The entry body is parsed from the log entry's canonicalized body, then
    compared against the bundle's DSSE envelope: the payload hash must be
    SHA256 and equal the digest of the envelope payload, and the recorded
    signature list must equal the envelope's signatures paired with the
    bundle's signing certificate.

    Raises `VerificationError` if the bundle has no DSSE envelope, if the
    entry body cannot be parsed, or on any mismatch.
    """
    log_entry = bundle.log_entry
    dsse_envelope = bundle._dsse_envelope
    if dsse_envelope is None:
        raise VerificationError(
            "cannot perform DSSE verification on a bundle without a DSSE envelope"
        )

    try:
        parsed_body = rekor_types.Dsse.model_validate_json(
            log_entry._inner.canonicalized_body
        )
    except ValidationError as exc:
        raise VerificationError(f"invalid DSSE log entry: {exc}")

    # Hex digest of the envelope payload, to compare with the logged value.
    computed_hash = sha256_digest(dsse_envelope._inner.payload).digest.hex()

    recorded_hash = parsed_body.spec.root.payload_hash  # type: ignore[union-attr]
    if recorded_hash.algorithm != rekor_types.dsse.Algorithm.SHA256:
        raise VerificationError("expected SHA256 payload hash in DSSE log entry")
    if computed_hash != recorded_hash.value:
        raise VerificationError("log entry payload hash does not match bundle")

    # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
    # but we handle them just in case the signer has somehow produced multiple
    # signatures for their envelope with the same signing key.
    expected_signatures = []
    for envelope_sig in dsse_envelope._inner.signatures:
        expected_signatures.append(
            rekor_types.dsse.Signature(
                signature=base64.b64encode(envelope_sig.sig).decode(),
                verifier=base64_encode_pem_cert(bundle.signing_certificate),
            )
        )

    if expected_signatures != parsed_body.spec.root.signatures:  # type: ignore[union-attr]
        raise VerificationError("log entry signatures do not match bundle")

556 

557 

def _validate_hashedrekord_v001_entry_body(
    bundle: Bundle, hashed_input: Hashed
) -> None:
    """
    Validate the Entry body for hashedrekord v001.

    The expected body is rebuilt from the bundle's signing certificate, the
    bundle's message signature and `hashed_input`, and compared against the
    body parsed from the log entry's canonicalized body.

    Raises `VerificationError` if the bundle has no message signature, if the
    entry body cannot be parsed, or if the two bodies differ.
    """
    entry = bundle.log_entry

    # Fail with a verification error rather than an AttributeError when the
    # bundle carries no message signature.
    message_signature = bundle._inner.message_signature
    if message_signature is None:
        raise VerificationError(
            "invalid hashedrekord log entry: missing message signature"
        )

    expected_body = _hashedrekord_from_parts(
        bundle.signing_certificate,
        message_signature.signature,
        hashed_input,
    )

    # A malformed entry body is a verification failure; don't let the
    # parser's own exception type escape to callers.
    try:
        actual_body = rekor_types.Hashedrekord.model_validate_json(
            entry._inner.canonicalized_body
        )
    except ValidationError as exc:
        raise VerificationError(f"invalid hashedrekord log entry: {exc}") from exc

    if expected_body != actual_body:
        raise VerificationError(
            "transparency log entry is inconsistent with other materials"
        )

577 

578 

def _validate_hashedrekord_v002_dsse_entry_body(bundle: Bundle) -> None:
    """
    Check the Entry body of a Rekor v2 DSSE envelope that was logged as a
    hashedrekord/0.0.2 entry (rekor-v2-spec §6.1.4).

    The body reconstructed from the bundle must equal the logged body:
    - data.digest = Hash(PAE(payloadType, payload)), where Hash is the
      externalized hash function of the entry's signing algorithm.
    - data.algorithm = the matching HashAlgorithm.
    - signature.content = envelope.signatures[0].sig.
    - signature.verifier = the bundle's signing certificate.

    Raises `VerificationError` if the bundle has no DSSE envelope, if the
    envelope does not carry exactly one signature, or if the bodies differ.
    """
    log_entry = bundle.log_entry
    dsse_envelope = bundle._dsse_envelope
    if dsse_envelope is None:
        raise VerificationError(
            "cannot perform DSSE verification on a bundle without a DSSE envelope"
        )
    if len(dsse_envelope._inner.signatures) != 1:
        raise VerificationError(
            "DSSE envelope must have exactly one signature for hashedrekord encoding"
        )

    # The verifier's key details select the hash applied to the PAE encoding.
    verifier = _v2_verifier_from_certificate(bundle.signing_certificate)
    hash_algorithm, make_hash = _get_prehash(verifier.key_details)
    digest_over_pae = make_hash(dsse_envelope.pae()).digest()

    # Assemble the expected body piece by piece, innermost parts first.
    kind_version = log_entry._inner.kind_version
    entry_kind = kind_version.kind
    entry_version = kind_version.version
    hashed_data = v1.HashOutput(
        algorithm=hash_algorithm,
        digest=base64.b64encode(digest_over_pae),
    )
    logged_signature = v2.verifier.Signature(
        content=base64.b64encode(dsse_envelope.signature),
        verifier=verifier,
    )
    rekord = v2.hashedrekord.HashedRekordLogEntryV002(
        data=hashed_data,
        signature=logged_signature,
    )
    reconstructed = v2.entry.Entry(
        kind=entry_kind,
        api_version=entry_version,
        spec=v2.entry.Spec(hashed_rekord_v002=rekord),
    )

    logged = v2.entry.Entry.from_json(log_entry._inner.canonicalized_body)
    if reconstructed != logged:
        raise VerificationError(
            "transparency log entry is inconsistent with other materials"
        )

627 

628 

def _validate_hashedrekord_v002_entry_body(
    bundle: Bundle, hashed_input: Hashed
) -> None:
    """
    Check the Entry body of a hashedrekord v002 log entry.

    An expected body is built from `hashed_input`, the bundle's message
    signature and the bundle's signing certificate, then compared with the
    body parsed from the log entry's canonicalized body.

    Raises `VerificationError` if the bundle has no message signature or if
    the bodies differ.
    """
    log_entry = bundle.log_entry
    message_signature = bundle._inner.message_signature
    if message_signature is None:
        raise VerificationError(
            "invalid hashedrekord log entry: missing message signature"
        )

    # Assemble the expected body piece by piece, innermost parts first.
    kind_version = log_entry._inner.kind_version
    entry_kind = kind_version.kind
    entry_version = kind_version.version
    hashed_data = v1.HashOutput(
        algorithm=hashed_input.algorithm,
        digest=base64.b64encode(hashed_input.digest),
    )
    signature_content = base64.b64encode(message_signature.signature)
    logged_signature = v2.verifier.Signature(
        content=signature_content,
        verifier=_v2_verifier_from_certificate(bundle.signing_certificate),
    )
    rekord = v2.hashedrekord.HashedRekordLogEntryV002(
        data=hashed_data,
        signature=logged_signature,
    )
    reconstructed = v2.entry.Entry(
        kind=entry_kind,
        api_version=entry_version,
        spec=v2.entry.Spec(hashed_rekord_v002=rekord),
    )

    logged = v2.entry.Entry.from_json(log_entry._inner.canonicalized_body)
    if reconstructed != logged:
        raise VerificationError(
            "transparency log entry is inconsistent with other materials"
        )

661 

662 

def _v2_verifier_from_certificate(certificate: Certificate) -> v2.verifier.Verifier:
    """
    Build the Rekor v2 Verifier that describes `certificate`.

    The verifier carries the base64 encoding of the certificate's DER bytes
    together with the key details that `_get_key_details` (the algorithm
    registry) reports for the certificate.
    """
    der_bytes = certificate.public_bytes(encoding=serialization.Encoding.DER)
    encoded_certificate = v1.X509Certificate(raw_bytes=base64.b64encode(der_bytes))
    details = _get_key_details(certificate)
    return v2.verifier.Verifier(
        x509_certificate=encoded_certificate,
        key_details=details,
    )