1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Verification API machinery.
17"""
18
19from __future__ import annotations
20
21import base64
22import logging
23from datetime import datetime, timezone
24from typing import cast
25
26import rekor_types
27from cryptography.exceptions import InvalidSignature
28from cryptography.hazmat.primitives import serialization
29from cryptography.hazmat.primitives.asymmetric import ec
30from cryptography.x509 import Certificate, ExtendedKeyUsage, KeyUsage
31from cryptography.x509.oid import ExtendedKeyUsageOID
32from OpenSSL.crypto import (
33 X509,
34 X509Store,
35 X509StoreContext,
36 X509StoreContextError,
37 X509StoreFlags,
38)
39from pydantic import ValidationError
40from rfc3161_client import TimeStampResponse, VerifierBuilder
41from rfc3161_client import VerificationError as Rfc3161VerificationError
42from sigstore_models.common import v1
43from sigstore_models.rekor import v2
44
45from sigstore import dsse
46from sigstore._internal.rekor import _hashedrekord_from_parts
47from sigstore._internal.rekor.client import RekorClient
48from sigstore._internal.sct import (
49 verify_sct,
50)
51from sigstore._internal.timestamp import TimestampSource, TimestampVerificationResult
52from sigstore._internal.trust import KeyringPurpose
53from sigstore._utils import base64_encode_pem_cert, sha256_digest
54from sigstore.errors import CertValidationError, VerificationError
55from sigstore.hashes import Hashed
56from sigstore.models import Bundle, ClientTrustConfig, TrustedRoot
57from sigstore.verify.policy import VerificationPolicy
58
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Upper bound on the number of signed timestamps accepted in a single bundle;
# a bundle carrying more than this is rejected, to prevent DoS.
# From https://github.com/sigstore/sigstore-go/blob/e92142f0734064ebf6001f188b7330a1212245fe/pkg/verify/tsa.go#L29
MAX_ALLOWED_TIMESTAMP: int = 32

# When verifying an entry, this threshold represents the minimum number of
# verified times required to consider a signature valid.
VERIFIED_TIME_THRESHOLD: int = 1
68
69
class Verifier:
    """
    The primary API for verification operations.

    A `Verifier` checks Sigstore bundles against the certificate authorities,
    timestamp authorities and log keyrings supplied by a `TrustedRoot`.
    """

    def __init__(self, *, trusted_root: TrustedRoot):
        """
        Create a new `Verifier`.

        `trusted_root` is the `TrustedRoot` object containing the root of trust
        for the verification process.
        """
        # Pre-convert the Fulcio certificates to pyOpenSSL objects once, since
        # a fresh `X509Store` is populated with them on every chain validation.
        self._fulcio_certificate_chain: list[X509] = [
            X509.from_cryptography(parent_cert)
            for parent_cert in trusted_root.get_fulcio_certs()
        ]
        self._trusted_root = trusted_root

        # this is an ugly hack needed for verifying "detached" materials
        # In reality we should be choosing the rekor instance based on the logid
        url = trusted_root._inner.tlogs[0].base_url
        self._rekor = RekorClient(url)

    @classmethod
    def production(cls, *, offline: bool = False) -> Verifier:
        """
        Return a `Verifier` instance configured against Sigstore's production-level services.

        `offline` controls the Trusted Root refresh behavior: if `True`,
        the verifier uses the Trusted Root in the local TUF cache. If `False`,
        a TUF repository refresh is attempted.
        """
        config = ClientTrustConfig.production(offline=offline)
        return cls(
            trusted_root=config.trusted_root,
        )

    @classmethod
    def staging(cls, *, offline: bool = False) -> Verifier:
        """
        Return a `Verifier` instance configured against Sigstore's staging-level services.

        `offline` controls the Trusted Root refresh behavior: if `True`,
        the verifier uses the Trusted Root in the local TUF cache. If `False`,
        a TUF repository refresh is attempted.
        """
        config = ClientTrustConfig.staging(offline=offline)
        return cls(
            trusted_root=config.trusted_root,
        )

    def _verify_signed_timestamp(
        self, timestamp_response: TimeStampResponse, message: bytes
    ) -> TimestampVerificationResult | None:
        """
        Verify a Signed Timestamp using the TSA provided by the Trusted Root.

        Each timestamp authority of the Trusted Root is tried in turn. Returns
        a `TimestampVerificationResult` for the first authority that verifies
        `timestamp_response` over `message` and whose validity period contains
        the timestamp's generation time, or `None` if no authority qualifies.
        """
        cert_authorities = self._trusted_root.get_timestamp_authorities()
        for certificate_authority in cert_authorities:
            certificates = certificate_authority.certificates(allow_expired=True)

            # We expect at least a signing cert and a root cert but there may be intermediates
            if len(certificates) < 2:
                _logger.debug("Unable to verify Timestamp: cert chain is incomplete")
                continue

            # The chain is ordered leaf first, root last; anything in between
            # is treated as an intermediate.
            builder = (
                VerifierBuilder()
                .tsa_certificate(certificates[0])
                .add_root_certificate(certificates[-1])
            )
            for certificate in certificates[1:-1]:
                builder = builder.add_intermediate_certificate(certificate)

            verifier = builder.build()
            try:
                verifier.verify_message(timestamp_response, message)
            except Rfc3161VerificationError:
                _logger.debug("Unable to verify Timestamp with CA.", exc_info=True)
                continue

            # The timestamp must also fall inside the authority's validity
            # period: start inclusive, end exclusive, and a missing end means
            # the period is open-ended.
            gen_time = timestamp_response.tst_info.gen_time
            period_start = certificate_authority.validity_period_start
            period_end = certificate_authority.validity_period_end
            if period_start <= gen_time and (not period_end or gen_time < period_end):
                return TimestampVerificationResult(
                    source=TimestampSource.TIMESTAMP_AUTHORITY,
                    time=gen_time,
                )

            _logger.debug("Unable to verify Timestamp because not in CA time range.")

        return None

    def _verify_timestamp_authority(
        self, bundle: Bundle
    ) -> list[TimestampVerificationResult]:
        """
        Verify that the given bundle has been timestamped by a trusted timestamp authority
        and that the timestamp is valid.

        Returns one `TimestampVerificationResult` per signed timestamp in the
        bundle that verified successfully; timestamps that fail verification
        are skipped.

        Raises `VerificationError` if the bundle contains more than
        `MAX_ALLOWED_TIMESTAMP` signed timestamps or contains duplicates.
        """
        timestamp_responses = []
        if (
            timestamp_verification_data
            := bundle.verification_material.timestamp_verification_data
        ):
            timestamp_responses = timestamp_verification_data.rfc3161_timestamps

        if len(timestamp_responses) > MAX_ALLOWED_TIMESTAMP:
            msg = f"too many signed timestamp: {len(timestamp_responses)} > {MAX_ALLOWED_TIMESTAMP}"
            raise VerificationError(msg)

        if len(set(timestamp_responses)) != len(timestamp_responses):
            msg = "duplicate timestamp found"
            raise VerificationError(msg)

        # Each timestamp is verified over the bundle's signature bytes.
        verified_timestamps = [
            result
            for tsr in timestamp_responses
            if (result := self._verify_signed_timestamp(tsr, bundle.signature))
        ]

        return verified_timestamps

    def _establish_time(self, bundle: Bundle) -> list[TimestampVerificationResult]:
        """
        Establish the time for bundle verification.

        This method uses timestamps from two possible sources:
        1. RFC3161 signed timestamps from a Timestamping Authority (TSA)
        2. Transparency Log timestamps

        Returns every verified time that could be established (possibly none).

        Raises `VerificationError` if the bundle carries signed timestamps but
        the Trusted Root has no timestamp authorities, or if an integrated time
        is present on an entry type other than dsse/hashedrekord 0.0.1.
        """
        verified_timestamps: list[TimestampVerificationResult] = []

        # If a timestamp from the timestamping service is available, the Verifier MUST
        # perform path validation using the timestamp from the Timestamping Service.
        if bundle.verification_material.timestamp_verification_data:
            if not self._trusted_root.get_timestamp_authorities():
                msg = (
                    "no Timestamp Authorities have been provided to validate this "
                    "bundle but it contains a signed timestamp"
                )
                raise VerificationError(msg)

            timestamp_from_tsa = self._verify_timestamp_authority(bundle)
            verified_timestamps.extend(timestamp_from_tsa)

        # If a timestamp from the Transparency Service is available, the Verifier MUST
        # perform path validation using the timestamp from the Transparency Service.
        # NOTE: We only include this timestamp if it's accompanied by an inclusion
        # promise that cryptographically binds it. We verify the inclusion promise
        # itself later, as part of log entry verification.
        if (
            timestamp := bundle.log_entry._inner.integrated_time
        ) and bundle.log_entry._inner.inclusion_promise:
            kv = bundle.log_entry._inner.kind_version
            if not (kv.kind in ["dsse", "hashedrekord"] and kv.version == "0.0.1"):
                raise VerificationError(
                    "Integrated time only supported for dsse/hashedrekord 0.0.1 types"
                )

            verified_timestamps.append(
                TimestampVerificationResult(
                    source=TimestampSource.TRANSPARENCY_SERVICE,
                    time=datetime.fromtimestamp(timestamp, tz=timezone.utc),
                )
            )
        return verified_timestamps

    def _verify_chain_at_time(
        self, certificate: X509, timestamp_result: TimestampVerificationResult
    ) -> list[X509]:
        """
        Verify the validity of the certificate chain at the given time.

        Returns the verified chain without the end-entity certificate, i.e.
        only the CA certificates.

        Raises a `CertValidationError` if the chain can't be built or be verified.
        """
        # NOTE: The `X509Store` object cannot have its time reset once the `set_time`
        # method been called on it. To get around this, we construct a new one in each
        # call.
        store = X509Store()
        # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
        # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
        # would be strictly more conformant of OpenSSL, but we currently
        # *want* the "long" chain behavior of performing path validation
        # down to a self-signed root.
        store.set_flags(X509StoreFlags.X509_STRICT)
        for parent_cert_ossl in self._fulcio_certificate_chain:
            store.add_cert(parent_cert_ossl)

        store.set_time(timestamp_result.time)

        store_ctx = X509StoreContext(store, certificate)

        try:
            # get_verified_chain returns the full chain including the end-entity certificate
            # and chain should contain only CA certificates
            return store_ctx.get_verified_chain()[1:]
        except X509StoreContextError as e:
            # Chain the original error so the OpenSSL failure stays visible
            # in tracebacks.
            raise CertValidationError(
                f"failed to build timestamp certificate chain: {e}"
            ) from e

    def _verify_common_signing_cert(
        self, bundle: Bundle, policy: VerificationPolicy
    ) -> None:
        """
        Performs the signing certificate verification steps that are shared between
        `verify_dsse` and `verify_artifact`.

        Raises `VerificationError` on all failures.
        """
        # Imported locally: only needed to translate a missing certificate
        # extension into a `VerificationError` below.
        from cryptography.x509 import ExtensionNotFound

        # In order to verify an artifact, we need to achieve the following:
        #
        # 0. Establish a time for the signature.
        # 1. Verify that the signing certificate chains to the root of trust
        #    and is valid at the time of signing.
        # 2. Verify the signing certificate's SCT.
        # 3. Verify that the signing certificate conforms to the Sigstore
        #    X.509 profile as well as the passed-in `VerificationPolicy`.
        # 4. Verify the inclusion proof and signed checkpoint for the log
        #    entry.
        # 5. Verify the inclusion promise for the log entry, if present.
        # 6. Verify the timely insertion of the log entry against the validity
        #    period for the signing certificate.
        # 7. Verify the signature and input against the signing certificate's
        #    public key.
        # 8. Verify the transparency log entry's consistency against the other
        #    materials, to prevent variants of CVE-2022-36056.
        #
        # This method performs steps (0) through (6) above. Its caller
        # MUST perform steps (7) and (8) separately, since they vary based on
        # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.)

        cert = bundle.signing_certificate

        # (0): Establishing a Time for the Signature
        # First, establish verified times for the signature. This is required to
        # validate the certificate chain, so this step comes first.
        # These include TSA timestamps and (in the case of rekor v1 entries)
        # rekor log integrated time.
        verified_timestamps = self._establish_time(bundle)
        if len(verified_timestamps) < VERIFIED_TIME_THRESHOLD:
            raise VerificationError("not enough sources of verified time")

        # (1): verify that the signing certificate is signed by the root
        #      certificate and that the signing certificate was valid at the
        #      time of signing.
        # The chain must validate at *every* verified time; the chain kept for
        # the SCT check is the one built for the last of them.
        cert_ossl = X509.from_cryptography(cert)
        chain: list[X509] = []
        for vts in verified_timestamps:
            chain = self._verify_chain_at_time(cert_ossl, vts)

        # (2): verify the signing certificate's SCT.
        try:
            verify_sct(
                cert,
                [parent_cert.to_cryptography() for parent_cert in chain],
                self._trusted_root.ct_keyring(KeyringPurpose.VERIFY),
            )
        except VerificationError as e:
            raise VerificationError(
                f"failed to verify SCT on signing certificate: {e}"
            ) from e

        # (3): verify the signing certificate against the Sigstore
        #      X.509 profile and verify against the given `VerificationPolicy`.
        # A certificate that lacks either extension fails verification rather
        # than leaking `ExtensionNotFound` to the caller.
        try:
            usage_ext = cert.extensions.get_extension_for_class(KeyUsage)
        except ExtensionNotFound as e:
            raise VerificationError(
                "signing certificate is missing the Key Usage extension"
            ) from e
        if not usage_ext.value.digital_signature:
            raise VerificationError("Key usage is not of type `digital signature`")

        try:
            extended_usage_ext = cert.extensions.get_extension_for_class(
                ExtendedKeyUsage
            )
        except ExtensionNotFound as e:
            raise VerificationError(
                "signing certificate is missing the Extended Key Usage extension"
            ) from e
        if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value:
            raise VerificationError("Extended usage does not contain `code signing`")

        policy.verify(cert)

        _logger.debug("Successfully verified signing certificate validity...")

        # (4): verify the inclusion proof and signed checkpoint for the
        #      log entry.
        # (5): verify the inclusion promise for the log entry, if present.
        entry = bundle.log_entry
        try:
            entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY))
        except VerificationError as exc:
            raise VerificationError(f"invalid log entry: {exc}") from exc

        # (6): verify our established times (timestamps or the log integration time) are
        #      within signing certificate validity period.
        for vts in verified_timestamps:
            if not (
                cert.not_valid_before_utc <= vts.time <= cert.not_valid_after_utc
            ):
                raise VerificationError(
                    f"invalid signing cert: expired at time of signing, time via {vts}"
                )

    def verify_dsse(
        self, bundle: Bundle, policy: VerificationPolicy
    ) -> tuple[str, bytes]:
        """
        Verifies an bundle's DSSE envelope, returning the encapsulated payload
        and its content type.

        This method is only for DSSE-enveloped payloads. To verify
        an arbitrary input against a bundle, use the `verify_artifact`
        method.

        `bundle` is the Sigstore `Bundle` to both verify and verify against.

        `policy` is the `VerificationPolicy` to verify against.

        Returns a tuple of `(type, payload)`, where `type` is the payload's
        type as encoded in the DSSE envelope and `payload` is the raw `bytes`
        of the payload. No validation of either `type` or `payload` is
        performed; users of this API **must** assert that `type` is known
        to them before proceeding to handle `payload` in an application-dependent
        manner.

        On failure, this method raises `VerificationError`.
        """

        # (0) through (6) are performed by `_verify_common_signing_cert`.
        self._verify_common_signing_cert(bundle, policy)

        # (7): verify the bundle's signature and DSSE envelope against the
        #      signing certificate's public key.
        envelope = bundle._dsse_envelope
        if envelope is None:
            raise VerificationError(
                "cannot perform DSSE verification on a bundle without a DSSE envelope"
            )

        signing_key = bundle.signing_certificate.public_key()
        signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
        dsse._verify(signing_key, envelope)

        # (8): verify the consistency of the log entry's body against
        #      the other bundle materials.
        # NOTE: This is very slightly weaker than the consistency check
        # for hashedrekord entries, due to how inclusion is recorded for DSSE:
        # the included entry for DSSE includes an envelope hash that we
        # *cannot* verify, since the envelope is uncanonicalized JSON.
        # Instead, we manually pick apart the entry body below and verify
        # the parts we can (namely the payload hash and signature list).
        entry = bundle.log_entry
        if entry._inner.kind_version.kind != "dsse":
            raise VerificationError(
                f"Expected entry type dsse, got {entry._inner.kind_version.kind}"
            )
        if entry._inner.kind_version.version == "0.0.2":
            _validate_dsse_v002_entry_body(bundle)
        elif entry._inner.kind_version.version == "0.0.1":
            _validate_dsse_v001_entry_body(bundle)
        else:
            raise VerificationError(
                f"Unsupported dsse version {entry._inner.kind_version.version}"
            )

        return (envelope._inner.payload_type, envelope._inner.payload)

    def verify_artifact(
        self,
        input_: bytes | Hashed,
        bundle: Bundle,
        policy: VerificationPolicy,
    ) -> None:
        """
        Public API for verifying.

        `input_` is the input to verify, either as a buffer of contents or as
        a prehashed `Hashed` object.

        `bundle` is the Sigstore `Bundle` to verify against.

        `policy` is the `VerificationPolicy` to verify against.

        On failure, this method raises `VerificationError`.
        """

        # (0) through (6) are performed by `_verify_common_signing_cert`.
        self._verify_common_signing_cert(bundle, policy)

        hashed_input = sha256_digest(input_)

        # A bundle without a message signature (e.g. one that only carries a
        # DSSE envelope) cannot be verified against an artifact; fail with a
        # `VerificationError` instead of an `AttributeError` on `None`.
        message_signature = bundle._inner.message_signature
        if message_signature is None:
            raise VerificationError(
                "cannot perform artifact verification on a bundle without a "
                "message signature"
            )

        # (7): verify that the signature was signed by the public key in the signing certificate.
        try:
            signing_key = bundle.signing_certificate.public_key()
            signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
            signing_key.verify(
                message_signature.signature,
                hashed_input.digest,
                ec.ECDSA(hashed_input._as_prehashed()),
            )
        except InvalidSignature as exc:
            raise VerificationError("Signature is invalid for input") from exc

        _logger.debug("Successfully verified signature...")

        # (8): verify the consistency of the log entry's body against
        #      the other bundle materials (and input being verified).
        entry = bundle.log_entry
        if entry._inner.kind_version.kind != "hashedrekord":
            raise VerificationError(
                f"Expected entry type hashedrekord, got {entry._inner.kind_version.kind}"
            )

        if entry._inner.kind_version.version == "0.0.2":
            _validate_hashedrekord_v002_entry_body(bundle, hashed_input)
        elif entry._inner.kind_version.version == "0.0.1":
            _validate_hashedrekord_v001_entry_body(bundle, hashed_input)
        else:
            raise VerificationError(
                f"Unsupported hashedrekord version {entry._inner.kind_version.version}"
            )
504
505
def _validate_dsse_v001_entry_body(bundle: Bundle) -> None:
    """
    Check a dsse v001 log entry body for consistency with the bundle.

    The logged payload hash must be a SHA256 hash equal to the hash of the
    envelope's payload, and the logged signature list must equal the
    envelope's signatures paired with the bundle's signing certificate.

    Raises `VerificationError` if the bundle has no DSSE envelope, if the
    entry body cannot be parsed, or on any mismatch.
    """
    log_entry = bundle.log_entry
    dsse_envelope = bundle._dsse_envelope
    if dsse_envelope is None:
        raise VerificationError(
            "cannot perform DSSE verification on a bundle without a DSSE envelope"
        )

    raw_body = log_entry._inner.canonicalized_body
    try:
        logged = rekor_types.Dsse.model_validate_json(raw_body)
    except ValidationError as exc:
        raise VerificationError(f"invalid DSSE log entry: {exc}")

    # Hex digest of the payload actually carried by the bundle.
    expected_hex = sha256_digest(dsse_envelope._inner.payload).digest.hex()

    logged_hash = logged.spec.root.payload_hash  # type: ignore[union-attr]
    if logged_hash.algorithm != rekor_types.dsse.Algorithm.SHA256:
        raise VerificationError("expected SHA256 payload hash in DSSE log entry")
    if expected_hex != logged_hash.value:
        raise VerificationError("log entry payload hash does not match bundle")

    # NOTE: As in `dsse._verify`, more than one signature would be frivolous,
    # but every envelope signature is rebuilt here in case the signer somehow
    # produced several with the same signing key.
    expected_signatures = [
        rekor_types.dsse.Signature(
            signature=base64.b64encode(envelope_sig.sig).decode(),
            verifier=base64_encode_pem_cert(bundle.signing_certificate),
        )
        for envelope_sig in dsse_envelope._inner.signatures
    ]
    if expected_signatures != logged.spec.root.signatures:
        raise VerificationError("log entry signatures do not match bundle")
544
545
def _validate_dsse_v002_entry_body(bundle: Bundle) -> None:
    """
    Check a dsse v002 log entry body for consistency with the bundle.

    The logged payload hash must be a SHA2-256 hash equal to the digest of the
    envelope's payload, and the logged signature list must equal the
    envelope's signatures paired with a verifier derived from the bundle's
    signing certificate.

    Raises `VerificationError` if the bundle has no DSSE envelope, if the
    entry body cannot be parsed or lacks a `dsse_v002` spec, or on any mismatch.
    """
    log_entry = bundle.log_entry
    dsse_envelope = bundle._dsse_envelope
    if dsse_envelope is None:
        raise VerificationError(
            "cannot perform DSSE verification on a bundle without a DSSE envelope"
        )

    raw_body = log_entry._inner.canonicalized_body
    try:
        parsed_body = v2.entry.Entry.from_json(raw_body)
    except ValidationError as exc:
        raise VerificationError(f"invalid DSSE log entry: {exc}")

    logged = parsed_body.spec.dsse_v002
    if logged is None:
        raise VerificationError("invalid DSSE log entry: missing dsse_v002 field")

    if logged.payload_hash.algorithm != v1.HashAlgorithm.SHA2_256:
        raise VerificationError("expected SHA256 hash in DSSE entry")

    # Raw digest of the payload actually carried by the bundle.
    payload_digest = sha256_digest(dsse_envelope._inner.payload).digest
    if logged.payload_hash.digest != payload_digest:
        raise VerificationError("DSSE entry payload hash does not match bundle")

    # Rebuild the signature list the log is expected to hold: each envelope
    # signature, base64-encoded, alongside the signing certificate's verifier.
    expected_signatures = [
        v2.verifier.Signature(
            content=base64.b64encode(envelope_sig.sig),
            verifier=_v2_verifier_from_certificate(bundle.signing_certificate),
        )
        for envelope_sig in dsse_envelope._inner.signatures
    ]
    if expected_signatures != logged.signatures:
        raise VerificationError("log entry signatures do not match bundle")
580
581
def _validate_hashedrekord_v001_entry_body(
    bundle: Bundle, hashed_input: Hashed
) -> None:
    """
    Validate the Entry body for hashedrekord v001.

    The expected body is rebuilt from the bundle's signing certificate, its
    message signature and `hashed_input`, then compared against the body
    recorded in the log entry.

    Raises `VerificationError` if the bundle has no message signature, if the
    recorded body cannot be parsed, or if the two bodies differ.
    """
    entry = bundle.log_entry

    # Fail with a `VerificationError` rather than an `AttributeError` when the
    # bundle carries no message signature.
    message_signature = bundle._inner.message_signature
    if message_signature is None:
        raise VerificationError(
            "invalid hashedrekord log entry: missing message signature"
        )

    expected_body = _hashedrekord_from_parts(
        bundle.signing_certificate,
        message_signature.signature,
        hashed_input,
    )

    # A malformed entry body is a verification failure, not a parsing crash.
    try:
        actual_body = rekor_types.Hashedrekord.model_validate_json(
            entry._inner.canonicalized_body
        )
    except ValidationError as exc:
        raise VerificationError(f"invalid hashedrekord log entry: {exc}") from exc

    if expected_body != actual_body:
        raise VerificationError(
            "transparency log entry is inconsistent with other materials"
        )
601
602
def _validate_hashedrekord_v002_entry_body(
    bundle: Bundle, hashed_input: Hashed
) -> None:
    """
    Validate Entry body for hashedrekord v002.

    The expected body is rebuilt from the log entry's kind and version, the
    base64-encoded digest of `hashed_input`, the base64-encoded message
    signature and a verifier derived from the signing certificate, then
    compared against the body recorded in the log entry.

    Raises `VerificationError` if the bundle has no message signature, if the
    recorded body cannot be parsed, or if the two bodies differ.
    """
    entry = bundle.log_entry
    message_signature = bundle._inner.message_signature
    if message_signature is None:
        raise VerificationError(
            "invalid hashedrekord log entry: missing message signature"
        )

    v2_expected_body = v2.entry.Entry(
        kind=entry._inner.kind_version.kind,
        api_version=entry._inner.kind_version.version,
        spec=v2.entry.Spec(
            hashed_rekord_v002=v2.hashedrekord.HashedRekordLogEntryV002(
                data=v1.HashOutput(
                    algorithm=hashed_input.algorithm,
                    digest=base64.b64encode(hashed_input.digest),
                ),
                signature=v2.verifier.Signature(
                    content=base64.b64encode(message_signature.signature),
                    verifier=_v2_verifier_from_certificate(bundle.signing_certificate),
                ),
            )
        ),
    )

    # A malformed entry body is a verification failure, not a parsing crash.
    try:
        v2_actual_body = v2.entry.Entry.from_json(entry._inner.canonicalized_body)
    except ValidationError as exc:
        raise VerificationError(f"invalid hashedrekord log entry: {exc}") from exc

    if v2_expected_body != v2_actual_body:
        raise VerificationError(
            "transparency log entry is inconsistent with other materials"
        )
635
636
def _v2_verifier_from_certificate(certificate: Certificate) -> v2.verifier.Verifier:
    """
    Build the Rekor v2 Verifier describing the signing certificate.

    This function is where the set of signature algorithms accepted for
    verification (in a rekor v2 entry) is decided, see
    https://github.com/sigstore/architecture-docs/blob/main/algorithm-registry.md.
    The signature check itself is done by verify_artifact() and verify_dsse(),
    so a new key type has to be added both here and in those methods.

    Raises `ValueError` if the certificate's key is not an EC key or uses an
    EC curve other than P-256, P-384 or P-521.
    """
    public_key = certificate.public_key()
    if not isinstance(public_key, ec.EllipticCurvePublicKey):
        raise ValueError(f"Unsupported public key type: {type(public_key)}")

    # Supported curves, each paired with its key-details identifier.
    curve = public_key.curve
    supported_curves = (
        (ec.SECP256R1, v1.PublicKeyDetails.PKIX_ECDSA_P256_SHA_256),
        (ec.SECP384R1, v1.PublicKeyDetails.PKIX_ECDSA_P384_SHA_384),
        (ec.SECP521R1, v1.PublicKeyDetails.PKIX_ECDSA_P521_SHA_512),
    )
    key_details = next(
        (
            details
            for curve_type, details in supported_curves
            if isinstance(curve, curve_type)
        ),
        None,
    )
    if key_details is None:
        raise ValueError(f"Unsupported EC curve: {curve.name}")

    # The certificate is embedded as base64-encoded DER.
    der_bytes = certificate.public_bytes(encoding=serialization.Encoding.DER)
    embedded_certificate = v1.X509Certificate(raw_bytes=base64.b64encode(der_bytes))
    return v2.verifier.Verifier(
        x509_certificate=embedded_certificate,
        key_details=key_details,
    )