1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Verification API machinery.
17"""
18
from __future__ import annotations

import base64
import logging
from datetime import datetime, timezone
from typing import cast

import rekor_types
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509 import ExtendedKeyUsage, ExtensionNotFound, KeyUsage
from cryptography.x509.oid import ExtendedKeyUsageOID
from OpenSSL.crypto import (
    X509,
    X509Store,
    X509StoreContext,
    X509StoreContextError,
    X509StoreFlags,
)
from pydantic import ValidationError
from rfc3161_client import TimeStampResponse, VerifierBuilder
from rfc3161_client import VerificationError as Rfc3161VerificationError

from sigstore import dsse
from sigstore._internal.rekor import _hashedrekord_from_parts
from sigstore._internal.rekor.client import RekorClient
from sigstore._internal.sct import (
    verify_sct,
)
from sigstore._internal.timestamp import TimestampSource, TimestampVerificationResult
from sigstore._internal.trust import ClientTrustConfig, KeyringPurpose, TrustedRoot
from sigstore._utils import base64_encode_pem_cert, sha256_digest
from sigstore.errors import VerificationError
from sigstore.hashes import Hashed
from sigstore.models import Bundle
from sigstore.verify.policy import VerificationPolicy
55
# Module-scoped logger, named after this module.
_logger = logging.getLogger(__name__)

# Upper bound on the number of signed timestamps accepted in a single bundle;
# anything above this is rejected to prevent DoS.
# Value taken from sigstore-go:
# https://github.com/sigstore/sigstore-go/blob/e92142f0734064ebf6001f188b7330a1212245fe/pkg/verify/tsa.go#L29
MAX_ALLOWED_TIMESTAMP: int = 32

# Minimum number of verified times that must be established for an entry
# before its signature can be considered valid.
VERIFIED_TIME_THRESHOLD: int = 1
65
66
class Verifier:
    """
    The primary API for verification operations.
    """

    def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot):
        """
        Create a new `Verifier`.

        `rekor` is a `RekorClient` capable of connecting to a Rekor instance
        containing logs for the file(s) being verified.

        `trusted_root` is the `TrustedRoot` object containing the root of trust
        for the verification process.
        """
        self._rekor = rekor
        # Pre-convert the Fulcio certificates to pyOpenSSL objects once, since
        # they are added to a fresh `X509Store` on every chain verification.
        self._fulcio_certificate_chain: list[X509] = [
            X509.from_cryptography(parent_cert)
            for parent_cert in trusted_root.get_fulcio_certs()
        ]
        self._trusted_root = trusted_root

    @classmethod
    def production(cls, *, offline: bool = False) -> Verifier:
        """
        Return a `Verifier` instance configured against Sigstore's production-level services.

        `offline` controls the Trusted Root refresh behavior: if `True`,
        the verifier uses the Trusted Root in the local TUF cache. If `False`,
        a TUF repository refresh is attempted.
        """
        return cls(
            rekor=RekorClient.production(),
            trusted_root=TrustedRoot.production(offline=offline),
        )

    @classmethod
    def staging(cls, *, offline: bool = False) -> Verifier:
        """
        Return a `Verifier` instance configured against Sigstore's staging-level services.

        `offline` controls the Trusted Root refresh behavior: if `True`,
        the verifier uses the Trusted Root in the local TUF cache. If `False`,
        a TUF repository refresh is attempted.
        """
        return cls(
            rekor=RekorClient.staging(),
            trusted_root=TrustedRoot.staging(offline=offline),
        )

    @classmethod
    def _from_trust_config(cls, trust_config: ClientTrustConfig) -> Verifier:
        """
        Create a `Verifier` from the given `ClientTrustConfig`.

        The Rekor client is built from the first transparency log URL in the
        trust config's signing config.

        @api private
        """
        return cls(
            rekor=RekorClient(trust_config._inner.signing_config.tlog_urls[0]),
            trusted_root=trust_config.trusted_root,
        )

    def _verify_signed_timestamp(
        self, timestamp_response: TimeStampResponse, signature: bytes
    ) -> TimestampVerificationResult | None:
        """
        Verify a Signed Timestamp using the TSA provided by the Trusted Root.

        Each timestamp authority in the Trusted Root is tried in turn. The
        first authority that both verifies `timestamp_response` against
        `signature` and whose validity period contains the response's
        generation time yields a `TimestampVerificationResult`.

        Returns `None` if no authority produces a verified timestamp.
        """
        cert_authorities = self._trusted_root.get_timestamp_authorities()
        for certificate_authority in cert_authorities:
            certificates = certificate_authority.certificates(allow_expired=True)

            builder = VerifierBuilder()
            for certificate in certificates:
                builder.add_root_certificate(certificate)

            verifier = builder.build()
            try:
                verifier.verify(timestamp_response, signature)
            except Rfc3161VerificationError as e:
                # This authority could not verify the response; try the next one.
                _logger.debug("Unable to verify Timestamp with CA.")
                _logger.exception(e)
                continue

            if (
                certificate_authority.validity_period_start
                and certificate_authority.validity_period_end
            ):
                # The validity window is half-open: the start is inclusive,
                # the end is exclusive.
                if (
                    certificate_authority.validity_period_start
                    <= timestamp_response.tst_info.gen_time
                    < certificate_authority.validity_period_end
                ):
                    return TimestampVerificationResult(
                        source=TimestampSource.TIMESTAMP_AUTHORITY,
                        time=timestamp_response.tst_info.gen_time,
                    )

                _logger.debug(
                    "Unable to verify Timestamp because not in CA time range."
                )
            else:
                _logger.debug(
                    "Unable to verify Timestamp because no validity provided."
                )

        return None

    def _verify_timestamp_authority(
        self, bundle: Bundle
    ) -> list[TimestampVerificationResult]:
        """
        Verify that the given bundle has been timestamped by a trusted timestamp authority
        and that the timestamp is valid.

        Returns the list of successfully verified signed timestamps in the
        bundle; timestamps that fail verification are omitted.

        Raises `VerificationError` if the bundle contains more than
        `MAX_ALLOWED_TIMESTAMP` timestamps or contains duplicate timestamps.
        """
        timestamp_responses = (
            bundle.verification_material.timestamp_verification_data.rfc3161_timestamps
        )
        if len(timestamp_responses) > MAX_ALLOWED_TIMESTAMP:
            msg = f"too many signed timestamp: {len(timestamp_responses)} > {MAX_ALLOWED_TIMESTAMP}"
            raise VerificationError(msg)

        if len(set(timestamp_responses)) != len(timestamp_responses):
            msg = "duplicate timestamp found"
            raise VerificationError(msg)

        # The Signer sends a hash of the signature as the messageImprint in a TimeStampReq
        # to the Timestamping Service
        signature_hash = sha256_digest(bundle.signature).digest
        verified_timestamps = [
            verified_timestamp
            for tsr in timestamp_responses
            if (
                verified_timestamp := self._verify_signed_timestamp(tsr, signature_hash)
            )
        ]

        return verified_timestamps

    def _establish_time(self, bundle: Bundle) -> list[TimestampVerificationResult]:
        """
        Establish the time for bundle verification.

        This method uses timestamps from two possible sources:
        1. RFC3161 signed timestamps from a Timestamping Authority (TSA)
        2. Transparency Log timestamps

        Returns the list of established times, which may be empty.

        Raises `VerificationError` if the bundle carries signed timestamps but
        the Trusted Root provides no timestamp authorities to check them.
        """
        verified_timestamps: list[TimestampVerificationResult] = []

        # If a timestamp from the timestamping service is available, the Verifier MUST
        # perform path validation using the timestamp from the Timestamping Service.
        if bundle.verification_material.timestamp_verification_data.rfc3161_timestamps:
            if not self._trusted_root.get_timestamp_authorities():
                msg = (
                    "no Timestamp Authorities have been provided to validate this "
                    "bundle but it contains a signed timestamp"
                )
                raise VerificationError(msg)

            timestamp_from_tsa = self._verify_timestamp_authority(bundle)
            verified_timestamps.extend(timestamp_from_tsa)

        # If a timestamp from the Transparency Service is available, the Verifier MUST
        # perform path validation using the timestamp from the Transparency Service.
        # NOTE: We only include this timestamp if it's accompanied by an inclusion
        # promise that cryptographically binds it. We verify the inclusion promise
        # itself later, as part of log entry verification.
        if (
            timestamp := bundle.log_entry.integrated_time
        ) and bundle.log_entry.inclusion_promise:
            verified_timestamps.append(
                TimestampVerificationResult(
                    source=TimestampSource.TRANSPARENCY_SERVICE,
                    time=datetime.fromtimestamp(timestamp, tz=timezone.utc),
                )
            )
        return verified_timestamps

    def _verify_chain_at_time(
        self, certificate: X509, timestamp_result: TimestampVerificationResult
    ) -> list[X509]:
        """
        Verify the validity of the certificate chain at the given time.

        Returns the verified chain without the end-entity certificate.

        Raises a VerificationError if the chain can't be built or be verified.
        """
        # NOTE: The `X509Store` object cannot have its time reset once the `set_time`
        # method been called on it. To get around this, we construct a new one in each
        # call.
        store = X509Store()
        # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
        # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
        # would be strictly more conformant of OpenSSL, but we currently
        # *want* the "long" chain behavior of performing path validation
        # down to a self-signed root.
        store.set_flags(X509StoreFlags.X509_STRICT)
        for parent_cert_ossl in self._fulcio_certificate_chain:
            store.add_cert(parent_cert_ossl)

        store.set_time(timestamp_result.time)

        store_ctx = X509StoreContext(store, certificate)

        try:
            # get_verified_chain returns the full chain including the end-entity certificate
            # and chain should contain only CA certificates
            return store_ctx.get_verified_chain()[1:]
        except X509StoreContextError as e:
            raise VerificationError(f"failed to build chain: {e}") from e

    def _verify_common_signing_cert(
        self, bundle: Bundle, policy: VerificationPolicy
    ) -> None:
        """
        Performs the signing certificate verification steps that are shared between
        `verify_dsse` and `verify_artifact`.

        Raises `VerificationError` on all failures.
        """

        # In order to verify an artifact, we need to achieve the following:
        #
        # 0. Establish a time for the signature.
        # 1. Verify that the signing certificate chains to the root of trust
        #    and is valid at the time of signing.
        # 2. Verify the signing certificate's SCT.
        # 3. Verify that the signing certificate conforms to the Sigstore
        #    X.509 profile as well as the passed-in `VerificationPolicy`.
        # 4. Verify the inclusion proof and signed checkpoint for the log
        #    entry.
        # 5. Verify the inclusion promise for the log entry, if present.
        # 6. Verify the timely insertion of the log entry against the validity
        #    period for the signing certificate.
        # 7. Verify the signature and input against the signing certificate's
        #    public key.
        # 8. Verify the transparency log entry's consistency against the other
        #    materials, to prevent variants of CVE-2022-36056.
        #
        # This method performs steps (0) through (6) above. Its caller
        # MUST perform steps (7) and (8) separately, since they vary based on
        # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.)

        cert = bundle.signing_certificate

        # (0): Establishing a Time for the Signature
        # First, establish verified times for the signature. This is required to
        # validate the certificate chain, so this step comes first.
        # These include TSA timestamps and (in the case of rekor v1 entries)
        # rekor log integrated time.
        verified_timestamps = self._establish_time(bundle)
        if len(verified_timestamps) < VERIFIED_TIME_THRESHOLD:
            raise VerificationError("not enough sources of verified time")

        # (1): verify that the signing certificate is signed by the root
        #      certificate and that the signing certificate was valid at the
        #      time of signing.
        # The chain must verify at *every* established time; each call builds
        # its own `X509Store`, and the chain from the last call is kept for
        # SCT verification below.
        cert_ossl = X509.from_cryptography(cert)
        chain: list[X509] = []
        for vts in verified_timestamps:
            chain = self._verify_chain_at_time(cert_ossl, vts)

        # (2): verify the signing certificate's SCT.
        try:
            verify_sct(
                cert,
                [parent_cert.to_cryptography() for parent_cert in chain],
                self._trusted_root.ct_keyring(KeyringPurpose.VERIFY),
            )
        except VerificationError as e:
            raise VerificationError(
                f"failed to verify SCT on signing certificate: {e}"
            ) from e

        # (3): verify the signing certificate against the Sigstore
        #      X.509 profile and verify against the given `VerificationPolicy`.
        # A missing extension is reported as a `VerificationError` rather than
        # leaking `ExtensionNotFound` to the caller.
        try:
            usage_ext = cert.extensions.get_extension_for_class(KeyUsage)
        except ExtensionNotFound as e:
            raise VerificationError(
                f"signing certificate is missing the key usage extension: {e}"
            ) from e
        if not usage_ext.value.digital_signature:
            raise VerificationError("Key usage is not of type `digital signature`")

        try:
            extended_usage_ext = cert.extensions.get_extension_for_class(
                ExtendedKeyUsage
            )
        except ExtensionNotFound as e:
            raise VerificationError(
                f"signing certificate is missing the extended key usage extension: {e}"
            ) from e
        if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value:
            raise VerificationError("Extended usage does not contain `code signing`")

        policy.verify(cert)

        _logger.debug("Successfully verified signing certificate validity...")

        # (4): verify the inclusion proof and signed checkpoint for the
        #      log entry.
        # (5): verify the inclusion promise for the log entry, if present.
        entry = bundle.log_entry
        try:
            entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY))
        except VerificationError as exc:
            raise VerificationError(f"invalid log entry: {exc}") from exc

        # (6): verify that log entry was integrated circa the signing certificate's
        #      validity period.
        integrated_time = datetime.fromtimestamp(entry.integrated_time, tz=timezone.utc)
        if not (
            bundle.signing_certificate.not_valid_before_utc
            <= integrated_time
            <= bundle.signing_certificate.not_valid_after_utc
        ):
            raise VerificationError(
                "invalid signing cert: expired at time of Rekor entry"
            )

    def verify_dsse(
        self, bundle: Bundle, policy: VerificationPolicy
    ) -> tuple[str, bytes]:
        """
        Verifies a bundle's DSSE envelope, returning the encapsulated payload
        and its content type.

        This method is only for DSSE-enveloped payloads. To verify
        an arbitrary input against a bundle, use the `verify_artifact`
        method.

        `bundle` is the Sigstore `Bundle` to both verify and verify against.

        `policy` is the `VerificationPolicy` to verify against.

        Returns a tuple of `(type, payload)`, where `type` is the payload's
        type as encoded in the DSSE envelope and `payload` is the raw `bytes`
        of the payload. No validation of either `type` or `payload` is
        performed; users of this API **must** assert that `type` is known
        to them before proceeding to handle `payload` in an application-dependent
        manner.

        On failure, this method raises `VerificationError`.
        """

        # (0) through (6) are performed by `_verify_common_signing_cert`.
        self._verify_common_signing_cert(bundle, policy)

        # (7): verify the bundle's signature and DSSE envelope against the
        #      signing certificate's public key.
        envelope = bundle._dsse_envelope
        if envelope is None:
            raise VerificationError(
                "cannot perform DSSE verification on a bundle without a DSSE envelope"
            )

        signing_key = bundle.signing_certificate.public_key()
        signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
        dsse._verify(signing_key, envelope)

        # (8): verify the consistency of the log entry's body against
        #      the other bundle materials.
        # NOTE: This is very slightly weaker than the consistency check
        # for hashedrekord entries, due to how inclusion is recorded for DSSE:
        # the included entry for DSSE includes an envelope hash that we
        # *cannot* verify, since the envelope is uncanonicalized JSON.
        # Instead, we manually pick apart the entry body below and verify
        # the parts we can (namely the payload hash and signature list).
        entry = bundle.log_entry
        try:
            entry_body = rekor_types.Dsse.model_validate_json(
                base64.b64decode(entry.body)
            )
        except ValidationError as exc:
            raise VerificationError(f"invalid DSSE log entry: {exc}") from exc

        payload_hash = sha256_digest(envelope._inner.payload).digest.hex()
        if (
            entry_body.spec.root.payload_hash.algorithm  # type: ignore[union-attr]
            != rekor_types.dsse.Algorithm.SHA256
        ):
            raise VerificationError("expected SHA256 payload hash in DSSE log entry")
        if payload_hash != entry_body.spec.root.payload_hash.value:  # type: ignore[union-attr]
            raise VerificationError("log entry payload hash does not match bundle")

        # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
        # but we handle them just in case the signer has somehow produced multiple
        # signatures for their envelope with the same signing key.
        signatures = [
            rekor_types.dsse.Signature(
                signature=base64.b64encode(signature.sig).decode(),
                verifier=base64_encode_pem_cert(bundle.signing_certificate),
            )
            for signature in envelope._inner.signatures
        ]
        if signatures != entry_body.spec.root.signatures:
            raise VerificationError("log entry signatures do not match bundle")

        return (envelope._inner.payload_type, envelope._inner.payload)

    def verify_artifact(
        self,
        input_: bytes | Hashed,
        bundle: Bundle,
        policy: VerificationPolicy,
    ) -> None:
        """
        Public API for verifying.

        `input_` is the input to verify, either as a buffer of contents or as
        a prehashed `Hashed` object.

        `bundle` is the Sigstore `Bundle` to verify against.

        `policy` is the `VerificationPolicy` to verify against.

        On failure, this method raises `VerificationError`.
        """

        # (0) through (6) are performed by `_verify_common_signing_cert`.
        self._verify_common_signing_cert(bundle, policy)

        hashed_input = sha256_digest(input_)

        # (7): verify that the signature was signed by the public key in the signing certificate.
        signing_key = bundle.signing_certificate.public_key()
        signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
        try:
            signing_key.verify(
                bundle._inner.message_signature.signature,
                hashed_input.digest,
                ec.ECDSA(hashed_input._as_prehashed()),
            )
        except InvalidSignature as exc:
            raise VerificationError("Signature is invalid for input") from exc

        _logger.debug("Successfully verified signature...")

        # (8): verify the consistency of the log entry's body against
        #      the other bundle materials (and input being verified).
        entry = bundle.log_entry

        expected_body = _hashedrekord_from_parts(
            bundle.signing_certificate,
            bundle._inner.message_signature.signature,
            hashed_input,
        )
        # A body that does not parse as a hashedrekord entry is a verification
        # failure, mirroring the DSSE handling in `verify_dsse`.
        try:
            actual_body = rekor_types.Hashedrekord.model_validate_json(
                base64.b64decode(entry.body)
            )
        except ValidationError as exc:
            raise VerificationError(f"invalid hashedrekord log entry: {exc}") from exc
        if expected_body != actual_body:
            raise VerificationError(
                "transparency log entry is inconsistent with other materials"
            )