Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/trust.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

106 statements  

1# Copyright 2023 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Client trust configuration and trust root management for sigstore-python. 

17""" 

18 

19from __future__ import annotations 

20 

21import logging 

22from dataclasses import dataclass 

23from datetime import datetime 

24from enum import Enum 

25from pathlib import Path 

26from typing import ClassVar, NewType 

27 

28import cryptography.hazmat.primitives.asymmetric.padding as padding 

29from cryptography.exceptions import InvalidSignature 

30from cryptography.hazmat.primitives import hashes 

31from cryptography.hazmat.primitives.asymmetric import ec, ed25519, rsa 

32from cryptography.x509 import ( 

33 Certificate, 

34 load_der_x509_certificate, 

35) 

36from sigstore_models.common import v1 as common_v1 

37from sigstore_models.trustroot import v1 as trustroot_v1 

38 

39from sigstore._utils import ( 

40 KeyID, 

41 PublicKey, 

42 is_timerange_valid, 

43 key_id, 

44 load_der_public_key, 

45) 

46from sigstore.errors import Error, VerificationError 

47 

48# Versions supported by this client 

49REKOR_VERSIONS = [1, 2] 

50TSA_VERSIONS = [1] 

51FULCIO_VERSIONS = [1] 

52OIDC_VERSIONS = [1] 

53 

54_logger = logging.getLogger(__name__) 

55 

56 

57@dataclass(init=False) 

58class Key: 

59 """ 

60 Represents a key in a `Keyring`. 

61 """ 

62 

63 hash_algorithm: hashes.HashAlgorithm | None 

64 key: PublicKey 

65 key_id: KeyID 

66 

67 _RSA_SHA_256_DETAILS: ClassVar = { 

68 common_v1.PublicKeyDetails.PKCS1_RSA_PKCS1V5, 

69 common_v1.PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256, 

70 common_v1.PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256, 

71 common_v1.PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256, 

72 } 

73 

74 _EC_DETAILS_TO_HASH: ClassVar = { 

75 common_v1.PublicKeyDetails.PKIX_ECDSA_P256_SHA_256: hashes.SHA256(), 

76 common_v1.PublicKeyDetails.PKIX_ECDSA_P384_SHA_384: hashes.SHA384(), 

77 common_v1.PublicKeyDetails.PKIX_ECDSA_P521_SHA_512: hashes.SHA512(), 

78 } 

79 

80 def __init__(self, public_key: common_v1.PublicKey) -> None: 

81 """ 

82 Construct a key from the given Sigstore PublicKey message. 

83 """ 

84 

85 # NOTE: `raw_bytes` is marked as `optional` in the `PublicKey` message, 

86 # for unclear reasons. 

87 if not public_key.raw_bytes: 

88 raise VerificationError("public key is empty") 

89 

90 hash_algorithm: hashes.HashAlgorithm | None 

91 if public_key.key_details in self._RSA_SHA_256_DETAILS: 

92 hash_algorithm = hashes.SHA256() 

93 key = load_der_public_key(public_key.raw_bytes, types=(rsa.RSAPublicKey,)) 

94 elif public_key.key_details in self._EC_DETAILS_TO_HASH: 

95 hash_algorithm = self._EC_DETAILS_TO_HASH[public_key.key_details] 

96 key = load_der_public_key( 

97 public_key.raw_bytes, types=(ec.EllipticCurvePublicKey,) 

98 ) 

99 elif public_key.key_details == common_v1.PublicKeyDetails.PKIX_ED25519: 

100 hash_algorithm = None 

101 key = load_der_public_key( 

102 public_key.raw_bytes, types=(ed25519.Ed25519PublicKey,) 

103 ) 

104 else: 

105 raise VerificationError(f"unsupported key type: {public_key.key_details}") 

106 

107 self.hash_algorithm = hash_algorithm 

108 self.key = key 

109 self.key_id = key_id(key) 

110 

111 def verify(self, signature: bytes, data: bytes) -> None: 

112 """ 

113 Verifies the given `data` against `signature` using the current key. 

114 """ 

115 if isinstance(self.key, rsa.RSAPublicKey) and self.hash_algorithm is not None: 

116 self.key.verify( 

117 signature=signature, 

118 data=data, 

119 # TODO: Parametrize this as well, for PSS. 

120 padding=padding.PKCS1v15(), 

121 algorithm=self.hash_algorithm, 

122 ) 

123 elif ( 

124 isinstance(self.key, ec.EllipticCurvePublicKey) 

125 and self.hash_algorithm is not None 

126 ): 

127 self.key.verify( 

128 signature=signature, 

129 data=data, 

130 signature_algorithm=ec.ECDSA(self.hash_algorithm), 

131 ) 

132 elif ( 

133 isinstance(self.key, ed25519.Ed25519PublicKey) 

134 and self.hash_algorithm is None 

135 ): 

136 self.key.verify( 

137 signature=signature, 

138 data=data, 

139 ) 

140 else: 

141 # Unreachable without API misuse. 

142 raise VerificationError(f"keyring: unsupported key: {self.key}") 

143 

144 

145class Keyring: 

146 """ 

147 Represents a set of keys, each of which is a potentially valid verifier. 

148 """ 

149 

150 def __init__(self, public_keys: list[common_v1.PublicKey] = []): 

151 """ 

152 Create a new `Keyring`, with `keys` as the initial set of verifying keys. 

153 """ 

154 self._keyring: dict[KeyID, Key] = {} 

155 

156 for public_key in public_keys: 

157 try: 

158 key = Key(public_key) 

159 self._keyring[key.key_id] = key 

160 except VerificationError as e: 

161 _logger.warning(f"Failed to load a trusted root key: {e}") 

162 

163 def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None: 

164 """ 

165 Verify that `signature` is a valid signature for `data`, using the 

166 key identified by `key_id`. 

167 

168 `key_id` is an unauthenticated hint; if no key matches the given key ID, 

169 all keys in the keyring are tried. 

170 

171 Raises if the signature is invalid, i.e. is not valid for any of the 

172 keys in the keyring. 

173 """ 

174 

175 key = self._keyring.get(key_id) 

176 candidates = [key] if key is not None else list(self._keyring.values()) 

177 

178 # Try to verify each candidate key. In the happy case, this will 

179 # be exactly one candidate. 

180 valid = False 

181 for candidate in candidates: 

182 try: 

183 candidate.verify(signature, data) 

184 valid = True 

185 break 

186 except InvalidSignature: 

187 pass 

188 

189 if not valid: 

190 raise VerificationError("keyring: invalid signature") 

191 

192 

193RekorKeyring = NewType("RekorKeyring", Keyring) 

194CTKeyring = NewType("CTKeyring", Keyring) 

195 

196 

197class KeyringPurpose(str, Enum): 

198 """ 

199 Keyring purpose typing 

200 """ 

201 

202 SIGN = "sign" 

203 VERIFY = "verify" 

204 

205 def __str__(self) -> str: 

206 """Returns the purpose string value.""" 

207 return self.value 

208 

209 

210class CertificateAuthority: 

211 """ 

212 Certificate Authority used in a Trusted Root configuration. 

213 """ 

214 

215 def __init__(self, inner: trustroot_v1.CertificateAuthority): 

216 """ 

217 Construct a new `CertificateAuthority`. 

218 

219 @api private 

220 """ 

221 self._inner = inner 

222 self._certificates: list[Certificate] = [] 

223 self._verify() 

224 

225 @classmethod 

226 def from_json(cls, path: str) -> CertificateAuthority: 

227 """ 

228 Create a CertificateAuthority directly from JSON. 

229 """ 

230 inner = trustroot_v1.CertificateAuthority.from_json(Path(path).read_bytes()) 

231 return cls(inner) 

232 

233 def _verify(self) -> None: 

234 """ 

235 Verify and load the certificate authority. 

236 """ 

237 self._certificates = [ 

238 load_der_x509_certificate(cert.raw_bytes) 

239 for cert in self._inner.cert_chain.certificates 

240 ] 

241 

242 if not self._certificates: 

243 raise Error("missing a certificate in Certificate Authority") 

244 

245 @property 

246 def validity_period_start(self) -> datetime: 

247 """ 

248 Validity period start. 

249 """ 

250 return self._inner.valid_for.start 

251 

252 @property 

253 def validity_period_end(self) -> datetime | None: 

254 """ 

255 Validity period end. 

256 """ 

257 return self._inner.valid_for.end 

258 

259 def certificates(self, *, allow_expired: bool) -> list[Certificate]: 

260 """ 

261 Return a list of certificates in the authority chain. 

262 

263 The certificates are returned in order from leaf to root, with any 

264 intermediate certificates in between. 

265 """ 

266 if not is_timerange_valid(self._inner.valid_for, allow_expired=allow_expired): 

267 return [] 

268 return self._certificates