1# Copyright 2023 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Client trust configuration and trust root management for sigstore-python.
17"""
18
19from __future__ import annotations
20
21import logging
22from collections.abc import Iterable
23from dataclasses import dataclass
24from datetime import datetime, timezone
25from enum import Enum
26from pathlib import Path
27from typing import ClassVar, NewType
28
29import cryptography.hazmat.primitives.asymmetric.padding as padding
30from cryptography.exceptions import InvalidSignature
31from cryptography.hazmat.primitives import hashes
32from cryptography.hazmat.primitives.asymmetric import ec, rsa
33from cryptography.x509 import (
34 Certificate,
35 load_der_x509_certificate,
36)
37from sigstore_protobuf_specs.dev.sigstore.common.v1 import PublicKey as _PublicKey
38from sigstore_protobuf_specs.dev.sigstore.common.v1 import (
39 PublicKeyDetails as _PublicKeyDetails,
40)
41from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange
42from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
43 CertificateAuthority as _CertificateAuthority,
44)
45from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
46 ClientTrustConfig as _ClientTrustConfig,
47)
48from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
49 TransparencyLogInstance,
50)
51from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
52 TrustedRoot as _TrustedRoot,
53)
54
55from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
56from sigstore._utils import (
57 KeyID,
58 PublicKey,
59 key_id,
60 load_der_public_key,
61)
62from sigstore.errors import Error, MetadataError, VerificationError
63
64_logger = logging.getLogger(__name__)
65
66
67def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
68 """
69 Given a `period`, checks that the the current time is not before `start`. If
70 `allow_expired` is `False`, also checks that the current time is not after
71 `end`.
72 """
73 now = datetime.now(timezone.utc)
74
75 # If there was no validity period specified, the key is always valid.
76 if not period:
77 return True
78
79 # Active: if the current time is before the starting period, we are not yet
80 # valid.
81 if now < period.start:
82 return False
83
84 # If we want Expired keys, the key is valid at this point. Otherwise, check
85 # that we are within range.
86 return allow_expired or (period.end is None or now <= period.end)
87
88
89@dataclass(init=False)
90class Key:
91 """
92 Represents a key in a `Keyring`.
93 """
94
95 hash_algorithm: hashes.HashAlgorithm
96 key: PublicKey
97 key_id: KeyID
98
99 _RSA_SHA_256_DETAILS: ClassVar[set[_PublicKeyDetails]] = {
100 _PublicKeyDetails.PKCS1_RSA_PKCS1V5,
101 _PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256,
102 _PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256,
103 _PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256,
104 }
105
106 _EC_DETAILS_TO_HASH: ClassVar[dict[_PublicKeyDetails, hashes.HashAlgorithm]] = {
107 _PublicKeyDetails.PKIX_ECDSA_P256_SHA_256: hashes.SHA256(),
108 _PublicKeyDetails.PKIX_ECDSA_P384_SHA_384: hashes.SHA384(),
109 _PublicKeyDetails.PKIX_ECDSA_P521_SHA_512: hashes.SHA512(),
110 }
111
112 def __init__(self, public_key: _PublicKey) -> None:
113 """
114 Construct a key from the given Sigstore PublicKey message.
115 """
116
117 # NOTE: `raw_bytes` is marked as `optional` in the `PublicKey` message,
118 # for unclear reasons.
119 if not public_key.raw_bytes:
120 raise VerificationError("public key is empty")
121
122 hash_algorithm: hashes.HashAlgorithm
123 if public_key.key_details in self._RSA_SHA_256_DETAILS:
124 hash_algorithm = hashes.SHA256()
125 key = load_der_public_key(public_key.raw_bytes, types=(rsa.RSAPublicKey,))
126 elif public_key.key_details in self._EC_DETAILS_TO_HASH:
127 hash_algorithm = self._EC_DETAILS_TO_HASH[public_key.key_details]
128 key = load_der_public_key(
129 public_key.raw_bytes, types=(ec.EllipticCurvePublicKey,)
130 )
131 else:
132 raise VerificationError(f"unsupported key type: {public_key.key_details}")
133
134 self.hash_algorithm = hash_algorithm
135 self.key = key
136 self.key_id = key_id(key)
137
138 def verify(self, signature: bytes, data: bytes) -> None:
139 """
140 Verifies the given `data` against `signature` using the current key.
141 """
142 if isinstance(self.key, rsa.RSAPublicKey):
143 self.key.verify(
144 signature=signature,
145 data=data,
146 # TODO: Parametrize this as well, for PSS.
147 padding=padding.PKCS1v15(),
148 algorithm=self.hash_algorithm,
149 )
150 elif isinstance(self.key, ec.EllipticCurvePublicKey):
151 self.key.verify(
152 signature=signature,
153 data=data,
154 signature_algorithm=ec.ECDSA(self.hash_algorithm),
155 )
156 else:
157 # Unreachable without API misuse.
158 raise VerificationError(f"keyring: unsupported key: {self.key}")
159
160
161class Keyring:
162 """
163 Represents a set of keys, each of which is a potentially valid verifier.
164 """
165
166 def __init__(self, public_keys: list[_PublicKey] = []):
167 """
168 Create a new `Keyring`, with `keys` as the initial set of verifying keys.
169 """
170 self._keyring: dict[KeyID, Key] = {}
171
172 for public_key in public_keys:
173 try:
174 key = Key(public_key)
175 self._keyring[key.key_id] = key
176 except VerificationError as e:
177 _logger.warning(f"Failed to load a trusted root key: {e}")
178
179 def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None:
180 """
181 Verify that `signature` is a valid signature for `data`, using the
182 key identified by `key_id`.
183
184 `key_id` is an unauthenticated hint; if no key matches the given key ID,
185 all keys in the keyring are tried.
186
187 Raises if the signature is invalid, i.e. is not valid for any of the
188 keys in the keyring.
189 """
190
191 key = self._keyring.get(key_id)
192 candidates = [key] if key is not None else list(self._keyring.values())
193
194 # Try to verify each candidate key. In the happy case, this will
195 # be exactly one candidate.
196 valid = False
197 for candidate in candidates:
198 try:
199 candidate.verify(signature, data)
200 valid = True
201 break
202 except InvalidSignature:
203 pass
204
205 if not valid:
206 raise VerificationError("keyring: invalid signature")
207
208
209RekorKeyring = NewType("RekorKeyring", Keyring)
210CTKeyring = NewType("CTKeyring", Keyring)
211
212
213class KeyringPurpose(str, Enum):
214 """
215 Keyring purpose typing
216 """
217
218 SIGN = "sign"
219 VERIFY = "verify"
220
221 def __str__(self) -> str:
222 """Returns the purpose string value."""
223 return self.value
224
225
226class CertificateAuthority:
227 """
228 Certificate Authority used in a Trusted Root configuration.
229 """
230
231 def __init__(self, inner: _CertificateAuthority):
232 """
233 Construct a new `CertificateAuthority`.
234
235 @api private
236 """
237 self._inner = inner
238 self._certificates: list[Certificate] = []
239 self._verify()
240
241 @classmethod
242 def from_json(cls, path: str) -> CertificateAuthority:
243 """
244 Create a CertificateAuthority directly from JSON.
245 """
246 inner = _CertificateAuthority().from_json(Path(path).read_bytes())
247 return cls(inner)
248
249 def _verify(self) -> None:
250 """
251 Verify and load the certificate authority.
252 """
253 self._certificates = [
254 load_der_x509_certificate(cert.raw_bytes)
255 for cert in self._inner.cert_chain.certificates
256 ]
257
258 if not self._certificates:
259 raise Error("missing a certificate in Certificate Authority")
260
261 @property
262 def validity_period_start(self) -> datetime | None:
263 """
264 Validity period start.
265 """
266 return self._inner.valid_for.start
267
268 @property
269 def validity_period_end(self) -> datetime | None:
270 """
271 Validity period end.
272 """
273 return self._inner.valid_for.end
274
275 def certificates(self, *, allow_expired: bool) -> list[Certificate]:
276 """
277 Return a list of certificates in the authority chain.
278
279 The certificates are returned in order from leaf to root, with any
280 intermediate certificates in between.
281 """
282 if not _is_timerange_valid(self._inner.valid_for, allow_expired=allow_expired):
283 return []
284 return self._certificates
285
286
287class TrustedRoot:
288 """
289 The cryptographic root(s) of trust for a Sigstore instance.
290 """
291
292 class TrustedRootType(str, Enum):
293 """
294 Known Sigstore trusted root media types.
295 """
296
297 TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1"
298
299 def __str__(self) -> str:
300 """Returns the variant's string value."""
301 return self.value
302
303 def __init__(self, inner: _TrustedRoot):
304 """
305 Construct a new `TrustedRoot`.
306
307 @api private
308 """
309 self._inner = inner
310 self._verify()
311
312 def _verify(self) -> None:
313 """
314 Performs various feats of heroism to ensure that the trusted root
315 is well-formed.
316 """
317
318 # The trusted root must have a recognized media type.
319 try:
320 TrustedRoot.TrustedRootType(self._inner.media_type)
321 except ValueError:
322 raise Error(f"unsupported trusted root format: {self._inner.media_type}")
323
324 @classmethod
325 def from_file(
326 cls,
327 path: str,
328 ) -> TrustedRoot:
329 """Create a new trust root from file"""
330 inner = _TrustedRoot().from_json(Path(path).read_bytes())
331 return cls(inner)
332
333 @classmethod
334 def from_tuf(
335 cls,
336 url: str,
337 offline: bool = False,
338 ) -> TrustedRoot:
339 """Create a new trust root from a TUF repository.
340
341 If `offline`, will use trust root in local TUF cache. Otherwise will
342 update the trust root from remote TUF repository.
343 """
344 path = TrustUpdater(url, offline).get_trusted_root_path()
345 return cls.from_file(path)
346
347 @classmethod
348 def production(
349 cls,
350 offline: bool = False,
351 ) -> TrustedRoot:
352 """Create new trust root from Sigstore production TUF repository.
353
354 If `offline`, will use trust root in local TUF cache. Otherwise will
355 update the trust root from remote TUF repository.
356 """
357 return cls.from_tuf(DEFAULT_TUF_URL, offline)
358
359 @classmethod
360 def staging(
361 cls,
362 offline: bool = False,
363 ) -> TrustedRoot:
364 """Create new trust root from Sigstore staging TUF repository.
365
366 If `offline`, will use trust root in local TUF cache. Otherwise will
367 update the trust root from remote TUF repository.
368 """
369 return cls.from_tuf(STAGING_TUF_URL, offline)
370
371 def _get_tlog_keys(
372 self, tlogs: list[TransparencyLogInstance], purpose: KeyringPurpose
373 ) -> Iterable[_PublicKey]:
374 """
375 Yields an iterator of public keys for transparency log instances that
376 are suitable for `purpose`.
377 """
378 allow_expired = purpose is KeyringPurpose.VERIFY
379 for tlog in tlogs:
380 if not _is_timerange_valid(
381 tlog.public_key.valid_for, allow_expired=allow_expired
382 ):
383 continue
384
385 yield tlog.public_key
386
387 def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
388 """Return keyring with keys for Rekor."""
389
390 keys: list[_PublicKey] = list(self._get_tlog_keys(self._inner.tlogs, purpose))
391 if len(keys) == 0:
392 raise MetadataError("Did not find any Rekor keys in trusted root")
393 return RekorKeyring(Keyring(keys))
394
395 def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring:
396 """Return keyring with key for CTFE."""
397 ctfes: list[_PublicKey] = list(self._get_tlog_keys(self._inner.ctlogs, purpose))
398 if not ctfes:
399 raise MetadataError("CTFE keys not found in trusted root")
400 return CTKeyring(Keyring(ctfes))
401
402 def get_fulcio_certs(self) -> list[Certificate]:
403 """Return the Fulcio certificates."""
404
405 certs: list[Certificate] = []
406
407 # Return expired certificates too: they are expired now but may have
408 # been active when the certificate was used to sign.
409 for authority in self._inner.certificate_authorities:
410 certificate_authority = CertificateAuthority(authority)
411 certs.extend(certificate_authority.certificates(allow_expired=True))
412
413 if not certs:
414 raise MetadataError("Fulcio certificates not found in trusted root")
415 return certs
416
417 def get_timestamp_authorities(self) -> list[CertificateAuthority]:
418 """
419 Return the TSA present in the trusted root.
420
421 This list may be empty and in this case, no timestamp verification can be
422 performed.
423 """
424 certificate_authorities: list[CertificateAuthority] = [
425 CertificateAuthority(cert_chain)
426 for cert_chain in self._inner.timestamp_authorities
427 ]
428 return certificate_authorities
429
430
431class ClientTrustConfig:
432 """
433 Represents a Sigstore client's trust configuration, including a root of trust.
434 """
435
436 class ClientTrustConfigType(str, Enum):
437 """
438 Known Sigstore client trust config media types.
439 """
440
441 CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json"
442
443 def __str__(self) -> str:
444 """Returns the variant's string value."""
445 return self.value
446
447 @classmethod
448 def from_json(cls, raw: str) -> ClientTrustConfig:
449 """
450 Deserialize the given client trust config.
451 """
452 inner = _ClientTrustConfig().from_json(raw)
453 return cls(inner)
454
455 def __init__(self, inner: _ClientTrustConfig) -> None:
456 """
457 @api private
458 """
459 self._inner = inner
460 self._verify()
461
462 def _verify(self) -> None:
463 """
464 Performs various feats of heroism to ensure that the client trust config
465 is well-formed.
466 """
467
468 # The client trust config must have a recognized media type.
469 try:
470 ClientTrustConfig.ClientTrustConfigType(self._inner.media_type)
471 except ValueError:
472 raise Error(
473 f"unsupported client trust config format: {self._inner.media_type}"
474 )
475
476 @property
477 def trusted_root(self) -> TrustedRoot:
478 """
479 Return the interior root of trust, as a `TrustedRoot`.
480 """
481 return TrustedRoot(self._inner.trusted_root)