1# Copyright 2023 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Client trust configuration and trust root management for sigstore-python.
17"""
18
19from __future__ import annotations
20
21import logging
22from dataclasses import dataclass
23from datetime import datetime
24from enum import Enum
25from pathlib import Path
26from typing import ClassVar, NewType
27
28import cryptography.hazmat.primitives.asymmetric.padding as padding
29from cryptography.exceptions import InvalidSignature
30from cryptography.hazmat.primitives import hashes
31from cryptography.hazmat.primitives.asymmetric import ec, ed25519, rsa
32from cryptography.x509 import (
33 Certificate,
34 load_der_x509_certificate,
35)
36from sigstore_models.common import v1 as common_v1
37from sigstore_models.trustroot import v1 as trustroot_v1
38
39from sigstore._utils import (
40 KeyID,
41 PublicKey,
42 is_timerange_valid,
43 key_id,
44 load_der_public_key,
45)
46from sigstore.errors import Error, VerificationError
47
48# Versions supported by this client
49REKOR_VERSIONS = [1, 2]
50TSA_VERSIONS = [1]
51FULCIO_VERSIONS = [1]
52OIDC_VERSIONS = [1]
53
54_logger = logging.getLogger(__name__)
55
56
57@dataclass(init=False)
58class Key:
59 """
60 Represents a key in a `Keyring`.
61 """
62
63 hash_algorithm: hashes.HashAlgorithm | None
64 key: PublicKey
65 key_id: KeyID
66
67 _RSA_SHA_256_DETAILS: ClassVar = {
68 common_v1.PublicKeyDetails.PKCS1_RSA_PKCS1V5,
69 common_v1.PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256,
70 common_v1.PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256,
71 common_v1.PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256,
72 }
73
74 _EC_DETAILS_TO_HASH: ClassVar = {
75 common_v1.PublicKeyDetails.PKIX_ECDSA_P256_SHA_256: hashes.SHA256(),
76 common_v1.PublicKeyDetails.PKIX_ECDSA_P384_SHA_384: hashes.SHA384(),
77 common_v1.PublicKeyDetails.PKIX_ECDSA_P521_SHA_512: hashes.SHA512(),
78 }
79
80 def __init__(self, public_key: common_v1.PublicKey) -> None:
81 """
82 Construct a key from the given Sigstore PublicKey message.
83 """
84
85 # NOTE: `raw_bytes` is marked as `optional` in the `PublicKey` message,
86 # for unclear reasons.
87 if not public_key.raw_bytes:
88 raise VerificationError("public key is empty")
89
90 hash_algorithm: hashes.HashAlgorithm | None
91 if public_key.key_details in self._RSA_SHA_256_DETAILS:
92 hash_algorithm = hashes.SHA256()
93 key = load_der_public_key(public_key.raw_bytes, types=(rsa.RSAPublicKey,))
94 elif public_key.key_details in self._EC_DETAILS_TO_HASH:
95 hash_algorithm = self._EC_DETAILS_TO_HASH[public_key.key_details]
96 key = load_der_public_key(
97 public_key.raw_bytes, types=(ec.EllipticCurvePublicKey,)
98 )
99 elif public_key.key_details == common_v1.PublicKeyDetails.PKIX_ED25519:
100 hash_algorithm = None
101 key = load_der_public_key(
102 public_key.raw_bytes, types=(ed25519.Ed25519PublicKey,)
103 )
104 else:
105 raise VerificationError(f"unsupported key type: {public_key.key_details}")
106
107 self.hash_algorithm = hash_algorithm
108 self.key = key
109 self.key_id = key_id(key)
110
111 def verify(self, signature: bytes, data: bytes) -> None:
112 """
113 Verifies the given `data` against `signature` using the current key.
114 """
115 if isinstance(self.key, rsa.RSAPublicKey) and self.hash_algorithm is not None:
116 self.key.verify(
117 signature=signature,
118 data=data,
119 # TODO: Parametrize this as well, for PSS.
120 padding=padding.PKCS1v15(),
121 algorithm=self.hash_algorithm,
122 )
123 elif (
124 isinstance(self.key, ec.EllipticCurvePublicKey)
125 and self.hash_algorithm is not None
126 ):
127 self.key.verify(
128 signature=signature,
129 data=data,
130 signature_algorithm=ec.ECDSA(self.hash_algorithm),
131 )
132 elif (
133 isinstance(self.key, ed25519.Ed25519PublicKey)
134 and self.hash_algorithm is None
135 ):
136 self.key.verify(
137 signature=signature,
138 data=data,
139 )
140 else:
141 # Unreachable without API misuse.
142 raise VerificationError(f"keyring: unsupported key: {self.key}")
143
144
145class Keyring:
146 """
147 Represents a set of keys, each of which is a potentially valid verifier.
148 """
149
150 def __init__(self, public_keys: list[common_v1.PublicKey] = []):
151 """
152 Create a new `Keyring`, with `keys` as the initial set of verifying keys.
153 """
154 self._keyring: dict[KeyID, Key] = {}
155
156 for public_key in public_keys:
157 try:
158 key = Key(public_key)
159 self._keyring[key.key_id] = key
160 except VerificationError as e:
161 _logger.warning(f"Failed to load a trusted root key: {e}")
162
163 def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None:
164 """
165 Verify that `signature` is a valid signature for `data`, using the
166 key identified by `key_id`.
167
168 `key_id` is an unauthenticated hint; if no key matches the given key ID,
169 all keys in the keyring are tried.
170
171 Raises if the signature is invalid, i.e. is not valid for any of the
172 keys in the keyring.
173 """
174
175 key = self._keyring.get(key_id)
176 candidates = [key] if key is not None else list(self._keyring.values())
177
178 # Try to verify each candidate key. In the happy case, this will
179 # be exactly one candidate.
180 valid = False
181 for candidate in candidates:
182 try:
183 candidate.verify(signature, data)
184 valid = True
185 break
186 except InvalidSignature:
187 pass
188
189 if not valid:
190 raise VerificationError("keyring: invalid signature")
191
192
193RekorKeyring = NewType("RekorKeyring", Keyring)
194CTKeyring = NewType("CTKeyring", Keyring)
195
196
197class KeyringPurpose(str, Enum):
198 """
199 Keyring purpose typing
200 """
201
202 SIGN = "sign"
203 VERIFY = "verify"
204
205 def __str__(self) -> str:
206 """Returns the purpose string value."""
207 return self.value
208
209
210class CertificateAuthority:
211 """
212 Certificate Authority used in a Trusted Root configuration.
213 """
214
215 def __init__(self, inner: trustroot_v1.CertificateAuthority):
216 """
217 Construct a new `CertificateAuthority`.
218
219 @api private
220 """
221 self._inner = inner
222 self._certificates: list[Certificate] = []
223 self._verify()
224
225 @classmethod
226 def from_json(cls, path: str) -> CertificateAuthority:
227 """
228 Create a CertificateAuthority directly from JSON.
229 """
230 inner = trustroot_v1.CertificateAuthority.from_json(Path(path).read_bytes())
231 return cls(inner)
232
233 def _verify(self) -> None:
234 """
235 Verify and load the certificate authority.
236 """
237 self._certificates = [
238 load_der_x509_certificate(cert.raw_bytes)
239 for cert in self._inner.cert_chain.certificates
240 ]
241
242 if not self._certificates:
243 raise Error("missing a certificate in Certificate Authority")
244
245 @property
246 def validity_period_start(self) -> datetime:
247 """
248 Validity period start.
249 """
250 return self._inner.valid_for.start
251
252 @property
253 def validity_period_end(self) -> datetime | None:
254 """
255 Validity period end.
256 """
257 return self._inner.valid_for.end
258
259 def certificates(self, *, allow_expired: bool) -> list[Certificate]:
260 """
261 Return a list of certificates in the authority chain.
262
263 The certificates are returned in order from leaf to root, with any
264 intermediate certificates in between.
265 """
266 if not is_timerange_valid(self._inner.valid_for, allow_expired=allow_expired):
267 return []
268 return self._certificates