1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Verification API machinery.
17"""
18
19from __future__ import annotations
20
21import base64
22import logging
23from datetime import datetime, timezone
24from typing import cast
25
26import rekor_types
27from cryptography.exceptions import InvalidSignature
28from cryptography.hazmat.primitives import serialization
29from cryptography.hazmat.primitives.asymmetric import ec
30from cryptography.x509 import Certificate, ExtendedKeyUsage, KeyUsage
31from cryptography.x509.oid import ExtendedKeyUsageOID
32from OpenSSL.crypto import (
33 X509,
34 X509Store,
35 X509StoreContext,
36 X509StoreContextError,
37 X509StoreFlags,
38)
39from pydantic import ValidationError
40from rfc3161_client import TimeStampResponse, VerifierBuilder
41from rfc3161_client import VerificationError as Rfc3161VerificationError
42from sigstore_models.common import v1
43from sigstore_models.rekor import v2
44
45from sigstore import dsse
46from sigstore._internal.key_details import _get_key_details, _get_prehash
47from sigstore._internal.rekor import _hashedrekord_from_parts
48from sigstore._internal.rekor.client import RekorClient
49from sigstore._internal.sct import (
50 verify_sct,
51)
52from sigstore._internal.timestamp import TimestampSource, TimestampVerificationResult
53from sigstore._internal.trust import KeyringPurpose
54from sigstore._utils import base64_encode_pem_cert, sha256_digest
55from sigstore.errors import CertValidationError, VerificationError
56from sigstore.hashes import Hashed
57from sigstore.models import Bundle, ClientTrustConfig, TrustedRoot
58from sigstore.verify.policy import VerificationPolicy
59
60_logger = logging.getLogger(__name__)
61
62# Limit the number of timestamps to prevent DoS
63# From https://github.com/sigstore/sigstore-go/blob/e92142f0734064ebf6001f188b7330a1212245fe/pkg/verify/tsa.go#L29
64MAX_ALLOWED_TIMESTAMP: int = 32
65
66# When verifying an entry, this threshold represents the minimum number of required
67# verified times to consider a signature valid.
68VERIFIED_TIME_THRESHOLD: int = 1
69
70
71class Verifier:
72 """
73 The primary API for verification operations.
74 """
75
76 def __init__(self, *, trusted_root: TrustedRoot):
77 """
78 Create a new `Verifier`.
79
80 `trusted_root` is the `TrustedRoot` object containing the root of trust
81 for the verification process.
82 """
83 self._fulcio_certificate_chain: list[X509] = [
84 X509.from_cryptography(parent_cert)
85 for parent_cert in trusted_root.get_fulcio_certs()
86 ]
87 self._trusted_root = trusted_root
88
89 # this is an ugly hack needed for verifying "detached" materials
90 # In reality we should be choosing the rekor instance based on the logid
91 url = trusted_root._inner.tlogs[0].base_url
92 self._rekor = RekorClient(url)
93
94 @classmethod
95 def production(cls, *, offline: bool = False) -> Verifier:
96 """
97 Return a `Verifier` instance configured against Sigstore's production-level services.
98
99 `offline` controls the Trusted Root refresh behavior: if `True`,
100 the verifier uses the Trusted Root in the local TUF cache. If `False`,
101 a TUF repository refresh is attempted.
102 """
103 config = ClientTrustConfig.production(offline=offline)
104 return cls(
105 trusted_root=config.trusted_root,
106 )
107
108 @classmethod
109 def staging(cls, *, offline: bool = False) -> Verifier:
110 """
111 Return a `Verifier` instance configured against Sigstore's staging-level services.
112
113 `offline` controls the Trusted Root refresh behavior: if `True`,
114 the verifier uses the Trusted Root in the local TUF cache. If `False`,
115 a TUF repository refresh is attempted.
116 """
117 config = ClientTrustConfig.staging(offline=offline)
118 return cls(
119 trusted_root=config.trusted_root,
120 )
121
122 def _verify_signed_timestamp(
123 self, timestamp_response: TimeStampResponse, message: bytes
124 ) -> TimestampVerificationResult | None:
125 """
126 Verify a Signed Timestamp using the TSA provided by the Trusted Root.
127 """
128 cert_authorities = self._trusted_root.get_timestamp_authorities()
129 for certificate_authority in cert_authorities:
130 certificates = certificate_authority.certificates(allow_expired=True)
131
132 # We expect at least a signing cert and a root cert but there may be intermediates
133 if len(certificates) < 2:
134 _logger.debug("Unable to verify Timestamp: cert chain is incomplete")
135 continue
136
137 builder = (
138 VerifierBuilder()
139 .tsa_certificate(certificates[0])
140 .add_root_certificate(certificates[-1])
141 )
142 for certificate in certificates[1:-1]:
143 builder = builder.add_intermediate_certificate(certificate)
144
145 verifier = builder.build()
146 try:
147 verifier.verify_message(timestamp_response, message)
148 except Rfc3161VerificationError:
149 _logger.debug("Unable to verify Timestamp with CA.", exc_info=True)
150 continue
151
152 if (
153 certificate_authority.validity_period_start
154 <= timestamp_response.tst_info.gen_time
155 ) and (
156 not certificate_authority.validity_period_end
157 or timestamp_response.tst_info.gen_time
158 < certificate_authority.validity_period_end
159 ):
160 return TimestampVerificationResult(
161 source=TimestampSource.TIMESTAMP_AUTHORITY,
162 time=timestamp_response.tst_info.gen_time,
163 )
164
165 _logger.debug("Unable to verify Timestamp because not in CA time range.")
166
167 return None
168
169 def _verify_timestamp_authority(
170 self, bundle: Bundle
171 ) -> list[TimestampVerificationResult]:
172 """
173 Verify that the given bundle has been timestamped by a trusted timestamp authority
174 and that the timestamp is valid.
175
176 Returns the number of valid signed timestamp in the bundle.
177 """
178 timestamp_responses = []
179 if (
180 timestamp_verification_data
181 := bundle.verification_material.timestamp_verification_data
182 ):
183 timestamp_responses = timestamp_verification_data.rfc3161_timestamps
184
185 if len(timestamp_responses) > MAX_ALLOWED_TIMESTAMP:
186 msg = f"too many signed timestamp: {len(timestamp_responses)} > {MAX_ALLOWED_TIMESTAMP}"
187 raise VerificationError(msg)
188
189 if len(set(timestamp_responses)) != len(timestamp_responses):
190 msg = "duplicate timestamp found"
191 raise VerificationError(msg)
192
193 verified_timestamps = [
194 result
195 for tsr in timestamp_responses
196 if (result := self._verify_signed_timestamp(tsr, bundle.signature))
197 ]
198
199 return verified_timestamps
200
201 def _establish_time(self, bundle: Bundle) -> list[TimestampVerificationResult]:
202 """
203 Establish the time for bundle verification.
204
205 This method uses timestamps from two possible sources:
206 1. RFC3161 signed timestamps from a Timestamping Authority (TSA)
207 2. Transparency Log timestamps
208 """
209 verified_timestamps = []
210
211 # If a timestamp from the timestamping service is available, the Verifier MUST
212 # perform path validation using the timestamp from the Timestamping Service.
213 if bundle.verification_material.timestamp_verification_data:
214 if not self._trusted_root.get_timestamp_authorities():
215 msg = (
216 "no Timestamp Authorities have been provided to validate this "
217 "bundle but it contains a signed timestamp"
218 )
219 raise VerificationError(msg)
220
221 timestamp_from_tsa = self._verify_timestamp_authority(bundle)
222 verified_timestamps.extend(timestamp_from_tsa)
223
224 # If a timestamp from the Transparency Service is available, the Verifier MUST
225 # perform path validation using the timestamp from the Transparency Service.
226 # NOTE: We only include this timestamp if it's accompanied by an inclusion
227 # promise that cryptographically binds it. We verify the inclusion promise
228 # itself later, as part of log entry verification.
229 if (
230 timestamp := bundle.log_entry._inner.integrated_time
231 ) and bundle.log_entry._inner.inclusion_promise:
232 kv = bundle.log_entry._inner.kind_version
233 if not (kv.kind in ["dsse", "hashedrekord"] and kv.version == "0.0.1"):
234 raise VerificationError(
235 "Integrated time only supported for dsse/hashedrekord 0.0.1 types"
236 )
237
238 verified_timestamps.append(
239 TimestampVerificationResult(
240 source=TimestampSource.TRANSPARENCY_SERVICE,
241 time=datetime.fromtimestamp(timestamp, tz=timezone.utc),
242 )
243 )
244 return verified_timestamps
245
246 def _verify_chain_at_time(
247 self, certificate: X509, timestamp_result: TimestampVerificationResult
248 ) -> list[X509]:
249 """
250 Verify the validity of the certificate chain at the given time.
251
252 Raises a VerificationError if the chain can't be built or be verified.
253 """
254 # NOTE: The `X509Store` object cannot have its time reset once the `set_time`
255 # method been called on it. To get around this, we construct a new one in each
256 # call.
257 store = X509Store()
258 # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
259 # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
260 # would be strictly more conformant of OpenSSL, but we currently
261 # *want* the "long" chain behavior of performing path validation
262 # down to a self-signed root.
263 store.set_flags(X509StoreFlags.X509_STRICT)
264 for parent_cert_ossl in self._fulcio_certificate_chain:
265 store.add_cert(parent_cert_ossl)
266
267 store.set_time(timestamp_result.time)
268
269 store_ctx = X509StoreContext(store, certificate)
270
271 try:
272 # get_verified_chain returns the full chain including the end-entity certificate
273 # and chain should contain only CA certificates
274 return store_ctx.get_verified_chain()[1:]
275 except X509StoreContextError as e:
276 raise CertValidationError(
277 f"failed to build timestamp certificate chain: {e}"
278 )
279
280 def _verify_common_signing_cert(
281 self, bundle: Bundle, policy: VerificationPolicy
282 ) -> None:
283 """
284 Performs the signing certificate verification steps that are shared between
285 `verify_dsse` and `verify_artifact`.
286
287 Raises `VerificationError` on all failures.
288 """
289
290 # In order to verify an artifact, we need to achieve the following:
291 #
292 # 0. Establish a time for the signature.
293 # 1. Verify that the signing certificate chains to the root of trust
294 # and is valid at the time of signing.
295 # 2. Verify the signing certificate's SCT.
296 # 3. Verify that the signing certificate conforms to the Sigstore
297 # X.509 profile as well as the passed-in `VerificationPolicy`.
298 # 4. Verify the inclusion proof and signed checkpoint for the log
299 # entry.
300 # 5. Verify the inclusion promise for the log entry, if present.
301 # 6. Verify the timely insertion of the log entry against the validity
302 # period for the signing certificate.
303 # 7. Verify the signature and input against the signing certificate's
304 # public key.
305 # 8. Verify the transparency log entry's consistency against the other
306 # materials, to prevent variants of CVE-2022-36056.
307 #
308 # This method performs steps (0) through (6) above. Its caller
309 # MUST perform steps (7) and (8) separately, since they vary based on
310 # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.)
311
312 cert = bundle.signing_certificate
313
314 # NOTE: The `X509Store` object currently cannot have its time reset once the `set_time`
315 # method been called on it. To get around this, we construct a new one for every `verify`
316 # call.
317 store = X509Store()
318 # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
319 # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
320 # would be strictly more conformant of OpenSSL, but we currently
321 # *want* the "long" chain behavior of performing path validation
322 # down to a self-signed root.
323 store.set_flags(X509StoreFlags.X509_STRICT)
324 for parent_cert_ossl in self._fulcio_certificate_chain:
325 store.add_cert(parent_cert_ossl)
326
327 # (0): Establishing a Time for the Signature
328 # First, establish verified times for the signature. This is required to
329 # validate the certificate chain, so this step comes first.
330 # These include TSA timestamps and (in the case of rekor v1 entries)
331 # rekor log integrated time.
332 verified_timestamps = self._establish_time(bundle)
333 if len(verified_timestamps) < VERIFIED_TIME_THRESHOLD:
334 raise VerificationError("not enough sources of verified time")
335
336 # (1): verify that the signing certificate is signed by the root
337 # certificate and that the signing certificate was valid at the
338 # time of signing.
339 cert_ossl = X509.from_cryptography(cert)
340 chain: list[X509] = []
341 for vts in verified_timestamps:
342 chain = self._verify_chain_at_time(cert_ossl, vts)
343
344 # (2): verify the signing certificate's SCT.
345 try:
346 verify_sct(
347 cert,
348 [parent_cert.to_cryptography() for parent_cert in chain],
349 self._trusted_root.ct_keyring(KeyringPurpose.VERIFY),
350 )
351 except VerificationError as e:
352 raise VerificationError(f"failed to verify SCT on signing certificate: {e}")
353
354 # (3): verify the signing certificate against the Sigstore
355 # X.509 profile and verify against the given `VerificationPolicy`.
356 usage_ext = cert.extensions.get_extension_for_class(KeyUsage)
357 if not usage_ext.value.digital_signature:
358 raise VerificationError("Key usage is not of type `digital signature`")
359
360 extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage)
361 if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value:
362 raise VerificationError("Extended usage does not contain `code signing`")
363
364 policy.verify(cert)
365
366 _logger.debug("Successfully verified signing certificate validity...")
367
368 # (4): verify the inclusion proof and signed checkpoint for the
369 # log entry.
370 # (5): verify the inclusion promise for the log entry, if present.
371 entry = bundle.log_entry
372 try:
373 entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY))
374 except VerificationError as exc:
375 raise VerificationError(f"invalid log entry: {exc}")
376
377 # (6): verify our established times (timestamps or the log integration time) are
378 # within signing certificate validity period.
379 for vts in verified_timestamps:
380 if not (
381 bundle.signing_certificate.not_valid_before_utc
382 <= vts.time
383 <= bundle.signing_certificate.not_valid_after_utc
384 ):
385 raise VerificationError(
386 f"invalid signing cert: expired at time of signing, time via {vts}"
387 )
388
389 def verify_dsse(
390 self, bundle: Bundle, policy: VerificationPolicy
391 ) -> tuple[str, bytes]:
392 """
393 Verifies an bundle's DSSE envelope, returning the encapsulated payload
394 and its content type.
395
396 This method is only for DSSE-enveloped payloads. To verify
397 an arbitrary input against a bundle, use the `verify_artifact`
398 method.
399
400 `bundle` is the Sigstore `Bundle` to both verify and verify against.
401
402 `policy` is the `VerificationPolicy` to verify against.
403
404 Returns a tuple of `(type, payload)`, where `type` is the payload's
405 type as encoded in the DSSE envelope and `payload` is the raw `bytes`
406 of the payload. No validation of either `type` or `payload` is
407 performed; users of this API **must** assert that `type` is known
408 to them before proceeding to handle `payload` in an application-dependent
409 manner.
410 """
411
412 # (1) through (6) are performed by `_verify_common_signing_cert`.
413 self._verify_common_signing_cert(bundle, policy)
414
415 # (7): verify the bundle's signature and DSSE envelope against the
416 # signing certificate's public key.
417 envelope = bundle._dsse_envelope
418 if envelope is None:
419 raise VerificationError(
420 "cannot perform DSSE verification on a bundle without a DSSE envelope"
421 )
422
423 signing_key = bundle.signing_certificate.public_key()
424 signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
425 dsse._verify(signing_key, envelope)
426
427 # (8): verify the consistency of the log entry's body against
428 # the other bundle materials.
429 # Rekor v2 records DSSE envelopes as hashedrekord/0.0.2 entries whose
430 # digest covers PAE(payloadType, payload) and whose signature.content
431 # equals envelope.signatures[0].sig (rekor-v2-spec §6.1.4). Rekor v1
432 # used a dsse/0.0.1 entry, which is slightly weaker than the
433 # hashedrekord consistency check: dsse entries record an envelope
434 # hash that we *cannot* verify (the envelope is uncanonicalized JSON),
435 # so we manually pick apart the entry body and verify the parts we
436 # can (payload hash and signature list).
437 entry = bundle.log_entry
438 kind = entry._inner.kind_version.kind
439 version = entry._inner.kind_version.version
440 if kind == "hashedrekord" and version == "0.0.2":
441 _validate_hashedrekord_v002_dsse_entry_body(bundle)
442 elif kind == "dsse" and version == "0.0.1":
443 _validate_dsse_v001_entry_body(bundle)
444 else:
445 raise VerificationError(
446 f"Unsupported DSSE log entry type: {kind}/{version}"
447 )
448
449 return (envelope._inner.payload_type, envelope._inner.payload)
450
451 def verify_artifact(
452 self,
453 input_: bytes | Hashed,
454 bundle: Bundle,
455 policy: VerificationPolicy,
456 ) -> None:
457 """
458 Public API for verifying.
459
460 `input_` is the input to verify, either as a buffer of contents or as
461 a prehashed `Hashed` object.
462
463 `bundle` is the Sigstore `Bundle` to verify against.
464
465 `policy` is the `VerificationPolicy` to verify against.
466
467 On failure, this method raises `VerificationError`.
468 """
469
470 # (1) through (6) are performed by `_verify_common_signing_cert`.
471 self._verify_common_signing_cert(bundle, policy)
472
473 hashed_input = sha256_digest(input_)
474 bundle_signature = bundle._inner.message_signature
475 if bundle_signature is None:
476 raise VerificationError("Missing bundle message signature")
477
478 # signature is verified over input digest, but if the bundle documents the digest we still
479 # want to ensure it matches the input digest:
480 if (
481 bundle_signature.message_digest is not None
482 and hashed_input.digest != bundle_signature.message_digest.digest
483 ):
484 raise VerificationError("Bundle message digest mismatch")
485
486 # (7): verify that the signature was signed by the public key in the signing certificate.
487 try:
488 signing_key = bundle.signing_certificate.public_key()
489 signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
490 signing_key.verify(
491 bundle_signature.signature,
492 hashed_input.digest,
493 ec.ECDSA(hashed_input._as_prehashed()),
494 )
495 except InvalidSignature:
496 raise VerificationError("Signature is invalid for input")
497
498 _logger.debug("Successfully verified signature...")
499
500 # (8): verify the consistency of the log entry's body against
501 # the other bundle materials (and input being verified).
502 entry = bundle.log_entry
503 if entry._inner.kind_version.kind != "hashedrekord":
504 raise VerificationError(
505 f"Expected entry type hashedrekord, got {entry._inner.kind_version.kind}"
506 )
507
508 if entry._inner.kind_version.version == "0.0.2":
509 _validate_hashedrekord_v002_entry_body(bundle, hashed_input)
510 elif entry._inner.kind_version.version == "0.0.1":
511 _validate_hashedrekord_v001_entry_body(bundle, hashed_input)
512 else:
513 raise VerificationError(
514 f"Unsupported hashedrekord version {entry._inner.kind_version.version}"
515 )
516
517
518def _validate_dsse_v001_entry_body(bundle: Bundle) -> None:
519 """
520 Validate the Entry body for dsse v001.
521 """
522 entry = bundle.log_entry
523 envelope = bundle._dsse_envelope
524 if envelope is None:
525 raise VerificationError(
526 "cannot perform DSSE verification on a bundle without a DSSE envelope"
527 )
528 try:
529 entry_body = rekor_types.Dsse.model_validate_json(
530 entry._inner.canonicalized_body
531 )
532 except ValidationError as exc:
533 raise VerificationError(f"invalid DSSE log entry: {exc}")
534
535 payload_hash = sha256_digest(envelope._inner.payload).digest.hex()
536 if (
537 entry_body.spec.root.payload_hash.algorithm # type: ignore[union-attr]
538 != rekor_types.dsse.Algorithm.SHA256
539 ):
540 raise VerificationError("expected SHA256 payload hash in DSSE log entry")
541 if payload_hash != entry_body.spec.root.payload_hash.value: # type: ignore[union-attr]
542 raise VerificationError("log entry payload hash does not match bundle")
543
544 # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
545 # but we handle them just in case the signer has somehow produced multiple
546 # signatures for their envelope with the same signing key.
547 signatures = [
548 rekor_types.dsse.Signature(
549 signature=base64.b64encode(signature.sig).decode(),
550 verifier=base64_encode_pem_cert(bundle.signing_certificate),
551 )
552 for signature in envelope._inner.signatures
553 ]
554 if signatures != entry_body.spec.root.signatures:
555 raise VerificationError("log entry signatures do not match bundle")
556
557
558def _validate_hashedrekord_v001_entry_body(
559 bundle: Bundle, hashed_input: Hashed
560) -> None:
561 """
562 Validate the Entry body for hashedrekord v001.
563 """
564 entry = bundle.log_entry
565 expected_body = _hashedrekord_from_parts(
566 bundle.signing_certificate,
567 bundle._inner.message_signature.signature, # type: ignore[union-attr]
568 hashed_input,
569 )
570 actual_body = rekor_types.Hashedrekord.model_validate_json(
571 entry._inner.canonicalized_body
572 )
573 if expected_body != actual_body:
574 raise VerificationError(
575 "transparency log entry is inconsistent with other materials"
576 )
577
578
579def _validate_hashedrekord_v002_dsse_entry_body(bundle: Bundle) -> None:
580 """
581 Validate Entry body for a Rekor v2 DSSE envelope encoded as a
582 hashedrekord/0.0.2 entry (rekor-v2-spec §6.1.4).
583
584 The expected entry body has:
585 - data.digest = Hash(PAE(payloadType, payload)), where Hash is the
586 externalized hash function of the entry's signing algorithm.
587 - data.algorithm = the matching HashAlgorithm.
588 - signature.content = envelope.signatures[0].sig.
589 - signature.verifier = the bundle's signing certificate.
590 """
591 entry = bundle.log_entry
592 envelope = bundle._dsse_envelope
593 if envelope is None:
594 raise VerificationError(
595 "cannot perform DSSE verification on a bundle without a DSSE envelope"
596 )
597 if len(envelope._inner.signatures) != 1:
598 raise VerificationError(
599 "DSSE envelope must have exactly one signature for hashedrekord encoding"
600 )
601
602 expected_verifier = _v2_verifier_from_certificate(bundle.signing_certificate)
603 algorithm, hash_func = _get_prehash(expected_verifier.key_details)
604 pae_digest = hash_func(envelope.pae()).digest()
605
606 expected_body = v2.entry.Entry(
607 kind=entry._inner.kind_version.kind,
608 api_version=entry._inner.kind_version.version,
609 spec=v2.entry.Spec(
610 hashed_rekord_v002=v2.hashedrekord.HashedRekordLogEntryV002(
611 data=v1.HashOutput(
612 algorithm=algorithm,
613 digest=base64.b64encode(pae_digest),
614 ),
615 signature=v2.verifier.Signature(
616 content=base64.b64encode(envelope.signature),
617 verifier=expected_verifier,
618 ),
619 )
620 ),
621 )
622 actual_body = v2.entry.Entry.from_json(entry._inner.canonicalized_body)
623 if expected_body != actual_body:
624 raise VerificationError(
625 "transparency log entry is inconsistent with other materials"
626 )
627
628
629def _validate_hashedrekord_v002_entry_body(
630 bundle: Bundle, hashed_input: Hashed
631) -> None:
632 """
633 Validate Entry body for hashedrekord v002.
634 """
635 entry = bundle.log_entry
636 if bundle._inner.message_signature is None:
637 raise VerificationError(
638 "invalid hashedrekord log entry: missing message signature"
639 )
640 v2_expected_body = v2.entry.Entry(
641 kind=entry._inner.kind_version.kind,
642 api_version=entry._inner.kind_version.version,
643 spec=v2.entry.Spec(
644 hashed_rekord_v002=v2.hashedrekord.HashedRekordLogEntryV002(
645 data=v1.HashOutput(
646 algorithm=hashed_input.algorithm,
647 digest=base64.b64encode(hashed_input.digest),
648 ),
649 signature=v2.verifier.Signature(
650 content=base64.b64encode(bundle._inner.message_signature.signature),
651 verifier=_v2_verifier_from_certificate(bundle.signing_certificate),
652 ),
653 )
654 ),
655 )
656 v2_actual_body = v2.entry.Entry.from_json(entry._inner.canonicalized_body)
657 if v2_expected_body != v2_actual_body:
658 raise VerificationError(
659 "transparency log entry is inconsistent with other materials"
660 )
661
662
663def _v2_verifier_from_certificate(certificate: Certificate) -> v2.verifier.Verifier:
664 """
665 Return a Rekor v2 Verifier for the signing certificate.
666
667 Key-to-algorithm mapping is handled by the algorithm registry via
668 `_get_key_details`.
669 """
670 return v2.verifier.Verifier(
671 x509_certificate=v1.X509Certificate(
672 raw_bytes=base64.b64encode(
673 certificate.public_bytes(encoding=serialization.Encoding.DER)
674 )
675 ),
676 key_details=_get_key_details(certificate),
677 )