Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/verify/verifier.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

167 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Verification API machinery. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import logging 

23from datetime import datetime, timezone 

24from typing import cast 

25 

26import rekor_types 

27from cryptography.exceptions import InvalidSignature 

28from cryptography.hazmat.primitives.asymmetric import ec 

29from cryptography.x509 import ExtendedKeyUsage, KeyUsage 

30from cryptography.x509.oid import ExtendedKeyUsageOID 

31from OpenSSL.crypto import ( 

32 X509, 

33 X509Store, 

34 X509StoreContext, 

35 X509StoreContextError, 

36 X509StoreFlags, 

37) 

38from pydantic import ValidationError 

39from rfc3161_client import TimeStampResponse, VerifierBuilder 

40from rfc3161_client import VerificationError as Rfc3161VerificationError 

41 

42from sigstore import dsse 

43from sigstore._internal.rekor import _hashedrekord_from_parts 

44from sigstore._internal.rekor.client import RekorClient 

45from sigstore._internal.sct import ( 

46 verify_sct, 

47) 

48from sigstore._internal.timestamp import TimestampSource, TimestampVerificationResult 

49from sigstore._internal.trust import ClientTrustConfig, KeyringPurpose, TrustedRoot 

50from sigstore._utils import base64_encode_pem_cert, sha256_digest 

51from sigstore.errors import VerificationError 

52from sigstore.hashes import Hashed 

53from sigstore.models import Bundle 

54from sigstore.verify.policy import VerificationPolicy 

55 

_logger = logging.getLogger(__name__)

# Upper bound on the number of signed timestamps accepted from a single
# bundle; larger bundles are rejected outright as a DoS mitigation.
# Mirrors https://github.com/sigstore/sigstore-go/blob/e92142f0734064ebf6001f188b7330a1212245fe/pkg/verify/tsa.go#L29
MAX_ALLOWED_TIMESTAMP: int = 32

# Minimum number of verified times that must be established for an entry
# before its signature can be considered valid.
VERIFIED_TIME_THRESHOLD: int = 1

65 

66 

67class Verifier: 

68 """ 

69 The primary API for verification operations. 

70 """ 

71 

def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot):
    """
    Build a `Verifier` from its two collaborators.

    `rekor` is the `RekorClient` for the Rekor instance whose logs cover
    the file(s) being verified.

    `trusted_root` is the `TrustedRoot` that supplies the root of trust
    for the verification process.
    """
    self._rekor = rekor

    # Convert every Fulcio certificate to its pyOpenSSL form once, up front,
    # so the chain can be loaded into an `X509Store` during verification.
    fulcio_chain: list[X509] = []
    for fulcio_cert in trusted_root.get_fulcio_certs():
        fulcio_chain.append(X509.from_cryptography(fulcio_cert))
    self._fulcio_certificate_chain: list[X509] = fulcio_chain

    self._trusted_root = trusted_root

88 

@classmethod
def production(cls, *, offline: bool = False) -> Verifier:
    """
    Construct a `Verifier` wired to Sigstore's production-level services.

    `offline` selects how the Trusted Root is obtained: when `True`, the
    copy in the local TUF cache is used; when `False`, a refresh of the
    TUF repository is attempted.
    """
    rekor_client = RekorClient.production()
    root = TrustedRoot.production(offline=offline)
    return cls(rekor=rekor_client, trusted_root=root)

102 

@classmethod
def staging(cls, *, offline: bool = False) -> Verifier:
    """
    Construct a `Verifier` wired to Sigstore's staging-level services.

    `offline` selects how the Trusted Root is obtained: when `True`, the
    copy in the local TUF cache is used; when `False`, a refresh of the
    TUF repository is attempted.
    """
    rekor_client = RekorClient.staging()
    root = TrustedRoot.staging(offline=offline)
    return cls(rekor=rekor_client, trusted_root=root)

116 

@classmethod
def _from_trust_config(cls, trust_config: ClientTrustConfig) -> Verifier:
    """
    Build a `Verifier` out of the given `ClientTrustConfig`.

    The Rekor client targets the first transparency log URL listed in the
    config's signing config; the trusted root is taken from the config
    as-is.

    @api private
    """
    first_tlog_url = trust_config._inner.signing_config.tlog_urls[0]
    rekor_client = RekorClient(first_tlog_url)
    return cls(rekor=rekor_client, trusted_root=trust_config.trusted_root)

128 

def _verify_signed_timestamp(
    self, timestamp_response: TimeStampResponse, signature: bytes
) -> TimestampVerificationResult | None:
    """
    Verify a Signed Timestamp using the TSAs provided by the Trusted Root.

    `timestamp_response` is the RFC 3161 response to check.

    `signature` is the message the response is verified against; the
    caller in this module passes the SHA-256 digest of the bundle's
    signature.

    Each timestamp authority from the Trusted Root is tried in turn.
    Returns a `TimestampVerificationResult` for the first authority that
    both verifies the response and whose validity period contains the
    response's generation time, or `None` if no authority qualifies.
    """
    cert_authorities = self._trusted_root.get_timestamp_authorities()
    for certificate_authority in cert_authorities:
        certificates = certificate_authority.certificates(allow_expired=True)

        builder = VerifierBuilder()
        for certificate in certificates:
            builder.add_root_certificate(certificate)

        verifier = builder.build()
        try:
            verifier.verify(timestamp_response, signature)
        except Rfc3161VerificationError:
            # A mismatch against one CA is an expected outcome while we
            # search for the right one, so record it (with traceback) at
            # DEBUG level rather than emitting an ERROR record.
            _logger.debug("Unable to verify Timestamp with CA.", exc_info=True)
            continue

        validity_start = certificate_authority.validity_period_start
        validity_end = certificate_authority.validity_period_end
        if not (validity_start and validity_end):
            # Without both bounds the timestamp cannot be placed inside
            # the authority's validity window; try the next authority.
            _logger.debug(
                "Unable to verify Timestamp because no validity provided."
            )
            continue

        gen_time = timestamp_response.tst_info.gen_time
        # The window is half-open: start inclusive, end exclusive.
        if validity_start <= gen_time < validity_end:
            return TimestampVerificationResult(
                source=TimestampSource.TIMESTAMP_AUTHORITY,
                time=gen_time,
            )

        _logger.debug("Unable to verify Timestamp because not in CA time range.")

    return None

174 

def _verify_timestamp_authority(
    self, bundle: Bundle
) -> list[TimestampVerificationResult]:
    """
    Check the bundle's RFC 3161 signed timestamps against the trusted
    timestamp authorities.

    Returns one `TimestampVerificationResult` for each signed timestamp
    in the bundle that verifies; timestamps that do not verify are
    left out of the result.

    Raises `VerificationError` if the bundle carries more than
    `MAX_ALLOWED_TIMESTAMP` timestamps or contains a duplicate timestamp.
    """
    tsa_data = bundle.verification_material.timestamp_verification_data
    timestamp_responses = tsa_data.rfc3161_timestamps

    count = len(timestamp_responses)
    if count > MAX_ALLOWED_TIMESTAMP:
        raise VerificationError(
            f"too many signed timestamp: {count} > {MAX_ALLOWED_TIMESTAMP}"
        )

    if len(set(timestamp_responses)) != count:
        raise VerificationError("duplicate timestamp found")

    # The signer submits a hash of the signature as the messageImprint of
    # its TimeStampReq, so that hash is what every response is checked
    # against.
    signature_hash = sha256_digest(bundle.signature).digest

    verified_timestamps = []
    for tsr in timestamp_responses:
        result = self._verify_signed_timestamp(tsr, signature_hash)
        if result:
            verified_timestamps.append(result)

    return verified_timestamps

207 

def _establish_time(self, bundle: Bundle) -> list[TimestampVerificationResult]:
    """
    Gather the verified times available for verifying `bundle`.

    Two sources are consulted:
    1. RFC3161 signed timestamps from a Timestamping Authority (TSA)
    2. The Transparency Log's integrated time

    Raises `VerificationError` if the bundle contains signed timestamps
    but the trusted root provides no timestamp authorities.
    """
    times: list[TimestampVerificationResult] = []

    # A timestamp from the timestamping service, when present, MUST be used
    # for path validation, so the absence of any authority to check it
    # against is an error.
    tsa_data = bundle.verification_material.timestamp_verification_data
    if tsa_data.rfc3161_timestamps:
        if not self._trusted_root.get_timestamp_authorities():
            raise VerificationError(
                "no Timestamp Authorities have been provided to validate this "
                "bundle but it contains a signed timestamp"
            )
        times.extend(self._verify_timestamp_authority(bundle))

    # A timestamp from the Transparency Service, when present, MUST also be
    # used for path validation.
    # NOTE: It only counts when an inclusion promise accompanies it, since
    # that is what cryptographically binds it. The promise itself is checked
    # later, as part of log entry verification.
    integrated_time = bundle.log_entry.integrated_time
    if integrated_time and bundle.log_entry.inclusion_promise:
        log_time = datetime.fromtimestamp(integrated_time, tz=timezone.utc)
        times.append(
            TimestampVerificationResult(
                source=TimestampSource.TRANSPARENCY_SERVICE,
                time=log_time,
            )
        )

    return times

246 

def _verify_chain_at_time(
    self, certificate: X509, timestamp_result: TimestampVerificationResult
) -> list[X509]:
    """
    Validate `certificate` against the Fulcio chain as of the time carried
    by `timestamp_result`.

    Returns the verified chain without the end-entity certificate, i.e.
    only the CA certificates.

    Raises a VerificationError if the chain can't be built or be verified.
    """
    # NOTE: An `X509Store` cannot have its time reset after `set_time` has
    # been called, so a fresh store is assembled on every call.
    trust_store = X509Store()
    # NOTE: Setting the flags explicitly shields us from any change to
    # OpenSSL's PARTIAL_CHAIN default. Enabling PARTIAL_CHAIN would be
    # strictly more conformant of OpenSSL, but we currently *want* the
    # "long" chain behavior of performing path validation down to a
    # self-signed root.
    trust_store.set_flags(X509StoreFlags.X509_STRICT)
    for ca_cert in self._fulcio_certificate_chain:
        trust_store.add_cert(ca_cert)
    trust_store.set_time(timestamp_result.time)

    context = X509StoreContext(trust_store, certificate)
    try:
        full_chain = context.get_verified_chain()
    except X509StoreContextError as e:
        raise VerificationError(f"failed to build chain: {e}")

    # `get_verified_chain` includes the end-entity certificate first; the
    # caller only wants the CA certificates that follow it.
    return full_chain[1:]

278 

def _verify_common_signing_cert(
    self, bundle: Bundle, policy: VerificationPolicy
) -> None:
    """
    Performs the signing certificate verification steps that are shared between
    `verify_dsse` and `verify_artifact`.

    `bundle` is the Sigstore `Bundle` whose signing certificate and log
    entry are checked.

    `policy` is the `VerificationPolicy` the signing certificate must
    satisfy.

    Raises `VerificationError` on all failures.
    """
    # Imported locally: only needed here, to translate a missing-extension
    # lookup into a `VerificationError`.
    from cryptography.x509 import ExtensionNotFound

    # In order to verify an artifact, we need to achieve the following:
    #
    # 0. Establish a time for the signature.
    # 1. Verify that the signing certificate chains to the root of trust
    #    and is valid at the time of signing.
    # 2. Verify the signing certificate's SCT.
    # 3. Verify that the signing certificate conforms to the Sigstore
    #    X.509 profile as well as the passed-in `VerificationPolicy`.
    # 4. Verify the inclusion proof and signed checkpoint for the log
    #    entry.
    # 5. Verify the inclusion promise for the log entry, if present.
    # 6. Verify the timely insertion of the log entry against the validity
    #    period for the signing certificate.
    # 7. Verify the signature and input against the signing certificate's
    #    public key.
    # 8. Verify the transparency log entry's consistency against the other
    #    materials, to prevent variants of CVE-2022-36056.
    #
    # This method performs steps (0) through (6) above. Its caller
    # MUST perform steps (7) and (8) separately, since they vary based on
    # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.)

    cert = bundle.signing_certificate

    # (0): Establishing a Time for the Signature
    # First, establish verified times for the signature. This is required to
    # validate the certificate chain, so this step comes first.
    # These include TSA timestamps and (in the case of rekor v1 entries)
    # rekor log integrated time.
    verified_timestamps = self._establish_time(bundle)
    if len(verified_timestamps) < VERIFIED_TIME_THRESHOLD:
        raise VerificationError("not enough sources of verified time")

    # (1): verify that the signing certificate is signed by the root
    #      certificate and that the signing certificate was valid at the
    #      time of signing. The chain must validate at *every* verified
    #      time; `_verify_chain_at_time` builds its own certificate store
    #      for each call, so none is needed here.
    cert_ossl = X509.from_cryptography(cert)
    chain: list[X509] = []
    for vts in verified_timestamps:
        chain = self._verify_chain_at_time(cert_ossl, vts)

    # (2): verify the signing certificate's SCT.
    try:
        verify_sct(
            cert,
            [parent_cert.to_cryptography() for parent_cert in chain],
            self._trusted_root.ct_keyring(KeyringPurpose.VERIFY),
        )
    except VerificationError as e:
        raise VerificationError(f"failed to verify SCT on signing certificate: {e}")

    # (3): verify the signing certificate against the Sigstore
    #      X.509 profile and verify against the given `VerificationPolicy`.
    #      A certificate lacking either extension fails the profile; report
    #      that as a verification failure rather than leaking the lookup
    #      error to the caller.
    try:
        usage_ext = cert.extensions.get_extension_for_class(KeyUsage)
    except ExtensionNotFound as exc:
        raise VerificationError(
            "signing certificate has no Key Usage extension"
        ) from exc
    if not usage_ext.value.digital_signature:
        raise VerificationError("Key usage is not of type `digital signature`")

    try:
        extended_usage_ext = cert.extensions.get_extension_for_class(
            ExtendedKeyUsage
        )
    except ExtensionNotFound as exc:
        raise VerificationError(
            "signing certificate has no Extended Key Usage extension"
        ) from exc
    if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value:
        raise VerificationError("Extended usage does not contain `code signing`")

    policy.verify(cert)

    _logger.debug("Successfully verified signing certificate validity...")

    # (4): verify the inclusion proof and signed checkpoint for the
    #      log entry.
    # (5): verify the inclusion promise for the log entry, if present.
    entry = bundle.log_entry
    try:
        entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY))
    except VerificationError as exc:
        raise VerificationError(f"invalid log entry: {exc}")

    # (6): verify that log entry was integrated circa the signing certificate's
    #      validity period (both bounds inclusive).
    integrated_time = datetime.fromtimestamp(entry.integrated_time, tz=timezone.utc)
    if not (
        bundle.signing_certificate.not_valid_before_utc
        <= integrated_time
        <= bundle.signing_certificate.not_valid_after_utc
    ):
        raise VerificationError(
            "invalid signing cert: expired at time of Rekor entry"
        )

387 

def verify_dsse(
    self, bundle: Bundle, policy: VerificationPolicy
) -> tuple[str, bytes]:
    """
    Verifies a bundle's DSSE envelope, returning the encapsulated payload
    and its content type.

    This method is only for DSSE-enveloped payloads. To verify
    an arbitrary input against a bundle, use the `verify_artifact`
    method.

    `bundle` is the Sigstore `Bundle` to both verify and verify against.

    `policy` is the `VerificationPolicy` to verify against.

    Returns a tuple of `(type, payload)`, where `type` is the payload's
    type as encoded in the DSSE envelope and `payload` is the raw `bytes`
    of the payload. No validation of either `type` or `payload` is
    performed; users of this API **must** assert that `type` is known
    to them before proceeding to handle `payload` in an application-dependent
    manner.

    Raises `VerificationError` if the shared signing certificate checks
    fail, if the bundle has no DSSE envelope, or if the log entry body is
    malformed or inconsistent with the bundle.
    """

    # (0) through (6) are performed by `_verify_common_signing_cert`.
    self._verify_common_signing_cert(bundle, policy)

    # (7): verify the bundle's signature and DSSE envelope against the
    #      signing certificate's public key.
    envelope = bundle._dsse_envelope
    if envelope is None:
        raise VerificationError(
            "cannot perform DSSE verification on a bundle without a DSSE envelope"
        )

    signing_key = bundle.signing_certificate.public_key()
    signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
    dsse._verify(signing_key, envelope)

    # (8): verify the consistency of the log entry's body against
    #      the other bundle materials.
    # NOTE: This is very slightly weaker than the consistency check
    # for hashedrekord entries, due to how inclusion is recorded for DSSE:
    # the included entry for DSSE includes an envelope hash that we
    # *cannot* verify, since the envelope is uncanonicalized JSON.
    # Instead, we manually pick apart the entry body below and verify
    # the parts we can (namely the payload hash and signature list).
    entry = bundle.log_entry
    try:
        entry_body = rekor_types.Dsse.model_validate_json(
            base64.b64decode(entry.body)
        )
    except (ValidationError, ValueError) as exc:
        # `ValueError` covers `binascii.Error`, which `b64decode` raises for
        # incorrectly padded input; a body that cannot even be decoded is
        # just as invalid as one that fails schema validation.
        raise VerificationError(f"invalid DSSE log entry: {exc}")

    payload_hash = sha256_digest(envelope._inner.payload).digest.hex()
    if (
        entry_body.spec.root.payload_hash.algorithm  # type: ignore[union-attr]
        != rekor_types.dsse.Algorithm.SHA256
    ):
        raise VerificationError("expected SHA256 payload hash in DSSE log entry")
    if payload_hash != entry_body.spec.root.payload_hash.value:  # type: ignore[union-attr]
        raise VerificationError("log entry payload hash does not match bundle")

    # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
    # but we handle them just in case the signer has somehow produced multiple
    # signatures for their envelope with the same signing key.
    signatures = [
        rekor_types.dsse.Signature(
            signature=base64.b64encode(signature.sig).decode(),
            verifier=base64_encode_pem_cert(bundle.signing_certificate),
        )
        for signature in envelope._inner.signatures
    ]
    if signatures != entry_body.spec.root.signatures:
        raise VerificationError("log entry signatures do not match bundle")

    return (envelope._inner.payload_type, envelope._inner.payload)

465 

def verify_artifact(
    self,
    input_: bytes | Hashed,
    bundle: Bundle,
    policy: VerificationPolicy,
) -> None:
    """
    Public API for verifying an artifact against a bundle.

    `input_` is the input to verify, either as a buffer of contents or as
    a prehashed `Hashed` object.

    `bundle` is the Sigstore `Bundle` to verify against.

    `policy` is the `VerificationPolicy` to verify against.

    On failure, this method raises `VerificationError`. This includes a
    log entry body that cannot be decoded or parsed as a hashedrekord
    entry.
    """

    # (0) through (6) are performed by `_verify_common_signing_cert`.
    self._verify_common_signing_cert(bundle, policy)

    hashed_input = sha256_digest(input_)

    # (7): verify that the signature was signed by the public key in the signing certificate.
    signing_key = bundle.signing_certificate.public_key()
    signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
    try:
        # The input is supplied as a digest, so the key is told to treat
        # it as already hashed.
        signing_key.verify(
            bundle._inner.message_signature.signature,
            hashed_input.digest,
            ec.ECDSA(hashed_input._as_prehashed()),
        )
    except InvalidSignature:
        raise VerificationError("Signature is invalid for input")

    _logger.debug("Successfully verified signature...")

    # (8): verify the consistency of the log entry's body against
    #      the other bundle materials (and input being verified).
    entry = bundle.log_entry

    expected_body = _hashedrekord_from_parts(
        bundle.signing_certificate,
        bundle._inner.message_signature.signature,
        hashed_input,
    )
    try:
        actual_body = rekor_types.Hashedrekord.model_validate_json(
            base64.b64decode(entry.body)
        )
    except (ValidationError, ValueError) as exc:
        # A body that is not valid base64 (`binascii.Error` is a
        # `ValueError`) or does not parse as a hashedrekord entry must
        # surface as a verification failure, as it does in `verify_dsse`.
        raise VerificationError(f"invalid hashedrekord log entry: {exc}")

    if expected_body != actual_body:
        raise VerificationError(
            "transparency log entry is inconsistent with other materials"
        )