Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/trust.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

185 statements  

1# Copyright 2023 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Client trust configuration and trust root management for sigstore-python. 

17""" 

18 

19from __future__ import annotations 

20 

21import logging 

22from collections.abc import Iterable 

23from dataclasses import dataclass 

24from datetime import datetime, timezone 

25from enum import Enum 

26from pathlib import Path 

27from typing import ClassVar, NewType 

28 

29import cryptography.hazmat.primitives.asymmetric.padding as padding 

30from cryptography.exceptions import InvalidSignature 

31from cryptography.hazmat.primitives import hashes 

32from cryptography.hazmat.primitives.asymmetric import ec, rsa 

33from cryptography.x509 import ( 

34 Certificate, 

35 load_der_x509_certificate, 

36) 

37from sigstore_protobuf_specs.dev.sigstore.common.v1 import PublicKey as _PublicKey 

38from sigstore_protobuf_specs.dev.sigstore.common.v1 import ( 

39 PublicKeyDetails as _PublicKeyDetails, 

40) 

41from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange 

42from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( 

43 CertificateAuthority as _CertificateAuthority, 

44) 

45from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( 

46 ClientTrustConfig as _ClientTrustConfig, 

47) 

48from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( 

49 TransparencyLogInstance, 

50) 

51from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( 

52 TrustedRoot as _TrustedRoot, 

53) 

54 

55from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater 

56from sigstore._utils import ( 

57 KeyID, 

58 PublicKey, 

59 key_id, 

60 load_der_public_key, 

61) 

62from sigstore.errors import Error, MetadataError, VerificationError 

63 

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

65 

66 

def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
    """
    Report whether the current UTC time is acceptable for `period`.

    A missing (falsy) `period` is always considered valid. Otherwise the
    current time must not be before `period.start`. Unless `allow_expired`
    is `True`, the current time must also not be after `period.end`; a
    `period.end` of `None` means the range never expires.
    """
    # No validity period was specified at all: nothing to check against.
    if not period:
        return True

    current = datetime.now(timezone.utc)

    # The range has not started yet.
    if current < period.start:
        return False

    # The range has started; expiry only matters if the caller cares.
    if allow_expired:
        return True
    if period.end is None:
        return True
    return current <= period.end

87 

88 

@dataclass(init=False)
class Key:
    """
    A single verification key, as stored in a `Keyring`.
    """

    hash_algorithm: hashes.HashAlgorithm
    key: PublicKey
    key_id: KeyID

    # RSA key details that are all paired with SHA-256.
    _RSA_SHA_256_DETAILS: ClassVar[set[_PublicKeyDetails]] = {
        _PublicKeyDetails.PKCS1_RSA_PKCS1V5,
        _PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256,
        _PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256,
        _PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256,
    }

    # ECDSA key details, each mapped to the hash algorithm it is used with.
    _EC_DETAILS_TO_HASH: ClassVar[dict[_PublicKeyDetails, hashes.HashAlgorithm]] = {
        _PublicKeyDetails.PKIX_ECDSA_P256_SHA_256: hashes.SHA256(),
        _PublicKeyDetails.PKIX_ECDSA_P384_SHA_384: hashes.SHA384(),
        _PublicKeyDetails.PKIX_ECDSA_P521_SHA_512: hashes.SHA512(),
    }

    def __init__(self, public_key: _PublicKey) -> None:
        """
        Build a key from a Sigstore `PublicKey` message.

        Raises `VerificationError` if the message carries no key bytes or
        if its `key_details` is not one of the supported RSA/ECDSA kinds.
        """
        # NOTE: `raw_bytes` is marked as `optional` in the `PublicKey`
        # message, so it has to be checked before use.
        raw = public_key.raw_bytes
        if not raw:
            raise VerificationError("public key is empty")

        details = public_key.key_details
        digest: hashes.HashAlgorithm
        if details in self._RSA_SHA_256_DETAILS:
            digest = hashes.SHA256()
            loaded = load_der_public_key(raw, types=(rsa.RSAPublicKey,))
        elif details in self._EC_DETAILS_TO_HASH:
            digest = self._EC_DETAILS_TO_HASH[details]
            loaded = load_der_public_key(raw, types=(ec.EllipticCurvePublicKey,))
        else:
            raise VerificationError(f"unsupported key type: {public_key.key_details}")

        self.hash_algorithm = digest
        self.key = loaded
        self.key_id = key_id(loaded)

    def verify(self, signature: bytes, data: bytes) -> None:
        """
        Check `signature` over `data` with this key.

        Returns `None` on success. A failed check propagates whatever the
        underlying key's `verify` raises; a key that is neither RSA nor EC
        raises `VerificationError`.
        """
        verifier = self.key

        if isinstance(verifier, rsa.RSAPublicKey):
            verifier.verify(
                signature=signature,
                data=data,
                # TODO: Parametrize this as well, for PSS.
                padding=padding.PKCS1v15(),
                algorithm=self.hash_algorithm,
            )
            return

        if isinstance(verifier, ec.EllipticCurvePublicKey):
            verifier.verify(
                signature=signature,
                data=data,
                signature_algorithm=ec.ECDSA(self.hash_algorithm),
            )
            return

        # Unreachable without API misuse.
        raise VerificationError(f"keyring: unsupported key: {self.key}")

159 

160 

class Keyring:
    """
    A set of keys, each of which is a potentially valid verifier.

    Keys are indexed by their key ID.
    """

    def __init__(self, public_keys: list[_PublicKey] | None = None):
        """
        Create a new `Keyring` from the given `PublicKey` messages.

        `public_keys` may be omitted (or `None`) for an empty keyring. A
        message that cannot be turned into a `Key` is logged as a warning
        and skipped rather than failing the whole keyring.
        """
        self._keyring: dict[KeyID, Key] = {}

        # `None` instead of a shared mutable `[]` default.
        for public_key in public_keys or ():
            try:
                key = Key(public_key)
            except VerificationError as e:
                _logger.warning(f"Failed to load a trusted root key: {e}")
            else:
                self._keyring[key.key_id] = key

    def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None:
        """
        Verify that `signature` is a valid signature for `data`.

        `key_id` is an unauthenticated hint: if a key with that ID is in
        the keyring, only that key is tried; otherwise every key in the
        keyring is tried in turn.

        Returns `None` as soon as one key accepts the signature. Raises
        `VerificationError` if no candidate key accepts it (including when
        the keyring is empty).
        """
        hinted = self._keyring.get(key_id)
        candidates = [hinted] if hinted is not None else list(self._keyring.values())

        # In the happy case there is exactly one candidate.
        for candidate in candidates:
            try:
                candidate.verify(signature, data)
            except InvalidSignature:
                # Not this key; fall through to the next candidate.
                continue
            return

        raise VerificationError("keyring: invalid signature")

207 

208 

# Distinct `Keyring` subtypes for static type checking, so that a Rekor
# keyring and a CT keyring are not interchangeable in annotated signatures.
RekorKeyring = NewType("RekorKeyring", Keyring)
CTKeyring = NewType("CTKeyring", Keyring)

211 

212 

class KeyringPurpose(str, Enum):
    """
    The operation a keyring is being assembled for.
    """

    SIGN = "sign"
    VERIFY = "verify"

    def __str__(self) -> str:
        """Render the member as its underlying string value."""
        return str(self.value)

224 

225 

class CertificateAuthority:
    """
    A certificate authority entry from a Trusted Root configuration.
    """

    def __init__(self, inner: _CertificateAuthority):
        """
        Wrap `inner` and load its certificate chain.

        @api private
        """
        self._inner = inner
        self._certificates: list[Certificate] = []
        self._verify()

    @classmethod
    def from_json(cls, path: str) -> CertificateAuthority:
        """
        Build a `CertificateAuthority` from the JSON file at `path`.
        """
        raw = Path(path).read_bytes()
        return cls(_CertificateAuthority().from_json(raw))

    def _verify(self) -> None:
        """
        Load the DER certificates of the chain and require at least one.

        Raises `Error` if the chain contains no certificates.
        """
        loaded: list[Certificate] = []
        for cert in self._inner.cert_chain.certificates:
            loaded.append(load_der_x509_certificate(cert.raw_bytes))
        self._certificates = loaded

        if not self._certificates:
            raise Error("missing a certificate in Certificate Authority")

    @property
    def validity_period_start(self) -> datetime | None:
        """
        Start of the authority's validity period.
        """
        validity = self._inner.valid_for
        return validity.start

    @property
    def validity_period_end(self) -> datetime | None:
        """
        End of the authority's validity period.
        """
        validity = self._inner.valid_for
        return validity.end

    def certificates(self, *, allow_expired: bool) -> list[Certificate]:
        """
        Return the certificates in the authority chain.

        The certificates are returned in order from leaf to root, with any
        intermediate certificates in between. An empty list is returned if
        the authority's validity period does not cover the current time
        (expiry being ignored when `allow_expired` is `True`).
        """
        if _is_timerange_valid(self._inner.valid_for, allow_expired=allow_expired):
            return self._certificates
        return []

285 

286 

class TrustedRoot:
    """
    The cryptographic root(s) of trust for one Sigstore instance.
    """

    class TrustedRootType(str, Enum):
        """
        Trusted root media types that are recognized.
        """

        TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1"

        def __str__(self) -> str:
            """Render the variant as its underlying string value."""
            return str(self.value)

    def __init__(self, inner: _TrustedRoot):
        """
        Wrap `inner` and check that it is well-formed.

        @api private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Check that the trusted root is well-formed.

        Raises `Error` if its media type is not a known `TrustedRootType`.
        """
        declared = self._inner.media_type

        # The enum lookup is used purely as a membership check.
        try:
            TrustedRoot.TrustedRootType(declared)
        except ValueError:
            raise Error(f"unsupported trusted root format: {self._inner.media_type}")

    @classmethod
    def from_file(
        cls,
        path: str,
    ) -> TrustedRoot:
        """Load a trusted root from the JSON file at `path`."""
        raw = Path(path).read_bytes()
        return cls(_TrustedRoot().from_json(raw))

    @classmethod
    def from_tuf(
        cls,
        url: str,
        offline: bool = False,
    ) -> TrustedRoot:
        """Load a trusted root from the TUF repository at `url`.

        If `offline`, the trust root in the local TUF cache is used.
        Otherwise the trust root is updated from the remote TUF repository.
        """
        updater = TrustUpdater(url, offline)
        return cls.from_file(updater.get_trusted_root_path())

    @classmethod
    def production(
        cls,
        offline: bool = False,
    ) -> TrustedRoot:
        """Load the trusted root of the Sigstore production TUF repository.

        If `offline`, the trust root in the local TUF cache is used.
        Otherwise the trust root is updated from the remote TUF repository.
        """
        return cls.from_tuf(url=DEFAULT_TUF_URL, offline=offline)

    @classmethod
    def staging(
        cls,
        offline: bool = False,
    ) -> TrustedRoot:
        """Load the trusted root of the Sigstore staging TUF repository.

        If `offline`, the trust root in the local TUF cache is used.
        Otherwise the trust root is updated from the remote TUF repository.
        """
        return cls.from_tuf(url=STAGING_TUF_URL, offline=offline)

    def _get_tlog_keys(
        self, tlogs: list[TransparencyLogInstance], purpose: KeyringPurpose
    ) -> Iterable[_PublicKey]:
        """
        Yield the public keys of the transparency log instances in `tlogs`
        that are suitable for `purpose`.

        Expired keys are still yielded when verifying, but not when signing.
        """
        expired_ok = purpose is KeyringPurpose.VERIFY
        for tlog in tlogs:
            candidate = tlog.public_key
            if _is_timerange_valid(candidate.valid_for, allow_expired=expired_ok):
                yield candidate

    def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
        """
        Return a keyring holding the Rekor keys suitable for `purpose`.

        Raises `MetadataError` if no such key is found.
        """
        rekor_keys: list[_PublicKey] = list(
            self._get_tlog_keys(self._inner.tlogs, purpose)
        )
        if not rekor_keys:
            raise MetadataError("Did not find any Rekor keys in trusted root")
        return RekorKeyring(Keyring(rekor_keys))

    def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring:
        """
        Return a keyring holding the CTFE keys suitable for `purpose`.

        Raises `MetadataError` if no such key is found.
        """
        ctfe_keys: list[_PublicKey] = list(
            self._get_tlog_keys(self._inner.ctlogs, purpose)
        )
        if not ctfe_keys:
            raise MetadataError("CTFE keys not found in trusted root")
        return CTKeyring(Keyring(ctfe_keys))

    def get_fulcio_certs(self) -> list[Certificate]:
        """
        Return the Fulcio certificates of every certificate authority.

        Raises `MetadataError` if no certificate is found.
        """
        collected: list[Certificate] = []

        # Expired certificates are included on purpose: they are expired
        # now but may have been active when they were used to sign.
        for entry in self._inner.certificate_authorities:
            authority = CertificateAuthority(entry)
            collected.extend(authority.certificates(allow_expired=True))

        if not collected:
            raise MetadataError("Fulcio certificates not found in trusted root")
        return collected

    def get_timestamp_authorities(self) -> list[CertificateAuthority]:
        """
        Return the timestamp authorities present in the trusted root.

        The list may be empty, in which case no timestamp verification can
        be performed.
        """
        return [
            CertificateAuthority(entry)
            for entry in self._inner.timestamp_authorities
        ]

429 

430 

class ClientTrustConfig:
    """
    A Sigstore client's trust configuration, including its root of trust.
    """

    class ClientTrustConfigType(str, Enum):
        """
        Client trust config media types that are recognized.
        """

        CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json"

        def __str__(self) -> str:
            """Render the variant as its underlying string value."""
            return str(self.value)

    def __init__(self, inner: _ClientTrustConfig) -> None:
        """
        Wrap `inner` and check that it is well-formed.

        @api private
        """
        self._inner = inner
        self._verify()

    @classmethod
    def from_json(cls, raw: str) -> ClientTrustConfig:
        """
        Deserialize a client trust config from its JSON form `raw`.
        """
        return cls(_ClientTrustConfig().from_json(raw))

    def _verify(self) -> None:
        """
        Check that the client trust config is well-formed.

        Raises `Error` if its media type is not a known
        `ClientTrustConfigType`.
        """
        declared = self._inner.media_type

        # The enum lookup is used purely as a membership check.
        try:
            ClientTrustConfig.ClientTrustConfigType(declared)
        except ValueError:
            raise Error(
                f"unsupported client trust config format: {self._inner.media_type}"
            )

    @property
    def trusted_root(self) -> TrustedRoot:
        """
        The interior root of trust, wrapped as a `TrustedRoot`.
        """
        interior = self._inner.trusted_root
        return TrustedRoot(interior)