Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/sign.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16API for signing artifacts. 

17 

18Example: 

19 

20```python 

21from pathlib import Path 

22from sigstore.models import ClientTrustConfig 

23from sigstore.oidc import Issuer 

24from sigstore.sign import SigningContext 

25 

26artifact_path = Path("README.md") 

27 

28# Construct OIDC Issuer and SigningContext for the 

29# Sigstore Public Good instance 

30trust_config = ClientTrustConfig.production() 

31issuer = Issuer(trust_config.signing_config.get_oidc_url()) 

32context = SigningContext.from_trust_config(trust_config) 

33 

34# Get an identity token from OIDC provider using interactive auth 

35token = issuer.identity_token() 

36 

37# Sign artifact with the identity 

38with context.signer(token, cache=True) as signer: 

39 bundle = signer.sign_artifact(artifact_path.read_bytes()) 

40 

41with Path("README.md.sigstore.json").open("w") as f: 

42 f.write(bundle.to_json()) 

43``` 

44""" 

45 

46from __future__ import annotations 

47 

48import base64 

49import logging 

50from collections.abc import Iterator 

51from contextlib import contextmanager 

52from datetime import datetime, timezone 

53 

54import cryptography.x509 as x509 

55from cryptography.hazmat.primitives import hashes 

56from cryptography.hazmat.primitives.asymmetric import ec 

57from cryptography.x509.oid import NameOID 

58from sigstore_models.common.v1 import HashOutput, MessageSignature 

59 

60from sigstore import dsse 

61from sigstore import hashes as sigstore_hashes 

62from sigstore._internal.fulcio import ( 

63 ExpiredCertificate, 

64 FulcioClient, 

65) 

66from sigstore._internal.rekor import EntryRequestBody, RekorLogSubmitter 

67from sigstore._internal.sct import verify_sct 

68from sigstore._internal.timestamp import TimestampAuthorityClient, TimestampError 

69from sigstore._internal.trust import KeyringPurpose 

70from sigstore._utils import sha256_digest 

71from sigstore.models import Bundle, ClientTrustConfig, TrustedRoot 

72from sigstore.oidc import ExpiredIdentity, IdentityToken 

73 

74_logger = logging.getLogger(__name__) 

75 

76 

class Signer:
    """
    The primary API for signing operations.

    A `Signer` ties an identity token to a `SigningContext` and produces
    `Bundle`s through `sign_artifact` and `sign_dsse`.
    """

    def __init__(
        self,
        identity_token: IdentityToken,
        signing_ctx: SigningContext,
        cache: bool = True,
    ) -> None:
        """
        Create a new `Signer`.

        `identity_token` is the identity token used to request a signing certificate
        from Fulcio.

        `signing_ctx` is a `SigningContext` that keeps information about the signing
        configuration.

        `cache` determines whether the signing certificate and ephemeral private key
        should be reused (until the certificate expires) to sign different artifacts.
        Default is `True`. When enabled, the key is generated and the certificate
        is requested eagerly, during construction.

        Raises `ExpiredIdentity` if `cache` is enabled and the identity token is
        outside its validity period.
        """
        self._identity_token = identity_token
        self._signing_ctx: SigningContext = signing_ctx
        self.__cached_private_key: ec.EllipticCurvePrivateKey | None = None
        self.__cached_signing_certificate: x509.Certificate | None = None
        if cache:
            _logger.debug("Generating ephemeral keys...")
            self.__cached_private_key = ec.generate_private_key(ec.SECP256R1())
            _logger.debug("Requesting ephemeral certificate...")
            self.__cached_signing_certificate = self._signing_cert(
                self.__cached_private_key
            )

    @property
    def _private_key(self) -> ec.EllipticCurvePrivateKey:
        """
        Get or generate a signing key.

        NOTE: without a cached key, *every* access returns a brand new key.
        Callers that need the same key for several steps (CSR and signature)
        must read this property once and reuse the returned object.
        """
        if self.__cached_private_key is None:
            _logger.debug("no cached key; generating ephemeral key")
            return ec.generate_private_key(ec.SECP256R1())
        return self.__cached_private_key

    def _signing_cert(
        self,
        private_key: ec.EllipticCurvePrivateKey | None = None,
    ) -> x509.Certificate:
        """
        Get or request a signing certificate from Fulcio.

        `private_key` is the key used to sign the certificate signing request;
        the issued certificate is bound to it, so the same key must be used to
        sign the artifact afterwards. If omitted, `self._private_key` is used.
        It is ignored when a cached certificate is returned.

        Internally, this performs a CSR against Fulcio and verifies the
        returned certificate's SCT against the trusted root's CT keyring.

        Raises `ExpiredIdentity` if the identity token is outside its validity
        period, and `ExpiredCertificate` if the cached certificate has expired.
        """

        # Our CSR cannot possibly succeed if our underlying identity token
        # is expired.
        if not self._identity_token.in_validity_period():
            raise ExpiredIdentity

        # If a certificate is cached, reuse it unless it has expired.
        if self.__cached_signing_certificate is not None:
            not_valid_after = self.__cached_signing_certificate.not_valid_after_utc
            if datetime.now(timezone.utc) > not_valid_after:
                raise ExpiredCertificate
            return self.__cached_signing_certificate

        if private_key is None:
            private_key = self._private_key

        _logger.debug("Retrieving signed certificate...")

        # Build an X.509 Certificate Signing Request
        builder = (
            x509.CertificateSigningRequestBuilder()
            .subject_name(
                x509.Name(
                    [
                        x509.NameAttribute(
                            NameOID.EMAIL_ADDRESS, self._identity_token._identity
                        ),
                    ]
                )
            )
            .add_extension(
                x509.BasicConstraints(ca=False, path_length=None),
                critical=True,
            )
        )
        certificate_request = builder.sign(private_key, hashes.SHA256())

        certificate_response = self._signing_ctx._fulcio.signing_cert.post(
            certificate_request, self._identity_token
        )

        verify_sct(
            certificate_response.cert,
            certificate_response.chain,
            self._signing_ctx._trusted_root.ct_keyring(KeyringPurpose.SIGN),
        )

        _logger.debug("Successfully verified SCT...")

        return certificate_response.cert

    def _finalize_sign(
        self,
        cert: x509.Certificate,
        content: MessageSignature | dsse.Envelope,
        proposed_entry: EntryRequestBody,
    ) -> Bundle:
        """
        Perform the common "finalizing" steps in a Sigstore signing flow.

        Requests a timestamp over `content.signature` from each configured
        timestamp authority, submits `proposed_entry` to the transparency log,
        and assembles the resulting `Bundle`.
        """
        # Timestamping is best-effort: a `TimestampError` from one authority
        # is logged and that authority is skipped.
        signed_timestamp = []
        for tsa_client in self._signing_ctx._tsa_clients:
            try:
                signed_timestamp.append(tsa_client.request_timestamp(content.signature))
            except TimestampError as e:
                _logger.warning(
                    f"Unable to use {tsa_client.url} to timestamp the bundle. Failed with {e}"
                )

        # Submit the proposed entry to the transparency log
        entry = self._signing_ctx._rekor.create_entry(proposed_entry)
        _logger.debug(
            f"Transparency log entry created with index: {entry._inner.log_index}"
        )

        return Bundle._from_parts(cert, content, entry, signed_timestamp)

    def sign_dsse(
        self,
        input_: dsse.Statement,
    ) -> Bundle:
        """
        Sign the given in-toto statement as a DSSE envelope, and return a
        `Bundle` containing the signed result.

        This API is **only** for in-toto statements; to sign arbitrary artifacts,
        use `sign_artifact` instead.
        """
        # Resolve the key exactly once: without caching, each access to
        # `_private_key` yields a different key, and the certificate must be
        # issued for the same key that produces the signature.
        private_key = self._private_key
        cert = self._signing_cert(private_key)

        # Sign the statement, producing a DSSE envelope
        content = dsse._sign(private_key, input_)

        # Create the proposed DSSE log entry
        proposed_entry = self._signing_ctx._rekor._build_dsse_request(
            envelope=content, certificate=cert
        )

        return self._finalize_sign(cert, content, proposed_entry)

    def sign_artifact(
        self,
        input_: bytes | sigstore_hashes.Hashed,
    ) -> Bundle:
        """
        Sign an artifact, and return a `Bundle` corresponding to the signed result.

        The input can be one of two forms:

        1. A `bytes` buffer;
        2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs
           that are too large to buffer into memory).

        Regardless of the input format, the signing operation will produce a
        `hashedrekord` entry within the bundle. No other entry types
        are supported by this API.
        """
        # Resolve the key exactly once: without caching, each access to
        # `_private_key` yields a different key, and the certificate must be
        # issued for the same key that produces the signature.
        private_key = self._private_key
        cert = self._signing_cert(private_key)

        # Reduce the input to a digest and sign that digest directly.
        hashed_input = sha256_digest(input_)

        artifact_signature = private_key.sign(
            hashed_input.digest, ec.ECDSA(hashed_input._as_prehashed())
        )

        content = MessageSignature(
            message_digest=HashOutput(
                algorithm=hashed_input.algorithm,
                digest=base64.b64encode(hashed_input.digest),
            ),
            signature=base64.b64encode(artifact_signature),
        )

        # Create the proposed hashedrekord entry
        proposed_entry = self._signing_ctx._rekor._build_hashed_rekord_request(
            hashed_input=hashed_input, signature=artifact_signature, certificate=cert
        )

        return self._finalize_sign(cert, content, proposed_entry)

268 

269 

class SigningContext:
    """
    Shared state for signing operations: the service clients and trust
    material that every `Signer` created from this context uses.
    """

    def __init__(
        self,
        *,
        fulcio: FulcioClient,
        rekor: RekorLogSubmitter,
        trusted_root: TrustedRoot,
        tsa_clients: list[TimestampAuthorityClient] | None = None,
    ):
        """
        Build a `SigningContext` from its parts (all keyword-only).

        `fulcio` is the `FulcioClient` used to obtain signing certificates.

        `rekor` is the `RekorLogSubmitter` used to create transparency log
        entries.

        `trusted_root` is the `TrustedRoot` stored for use by signers.

        `tsa_clients` is an optional list of `TimestampAuthorityClient`s;
        `None` (or an empty list) results in no timestamp authorities.
        """
        self._fulcio = fulcio
        self._rekor = rekor
        self._trusted_root = trusted_root
        self._tsa_clients = tsa_clients if tsa_clients else []

    @classmethod
    def from_trust_config(cls, trust_config: ClientTrustConfig) -> SigningContext:
        """
        Build a `SigningContext` out of a `ClientTrustConfig`.

        The Fulcio client, the timestamp authority clients and the
        transparency log come from the config's signing configuration; only
        the first log returned by `get_tlogs()` is used.

        @api private
        """
        config = trust_config.signing_config
        fulcio_client = config.get_fulcio()
        first_tlog = config.get_tlogs()[0]
        return cls(
            fulcio=fulcio_client,
            rekor=first_tlog,
            trusted_root=trust_config.trusted_root,
            tsa_clients=config.get_tsas(),
        )

    @contextmanager
    def signer(
        self, identity_token: IdentityToken, *, cache: bool = True
    ) -> Iterator[Signer]:
        """
        Context manager that yields a `Signer` bound to this context.

        `identity_token` is handed to the `Signer` and used to request a
        signing certificate from Fulcio.

        `cache` controls whether the `Signer` reuses its signing certificate
        and ephemeral private key (until the certificate expires) across
        several signing operations. Default is `True`.
        """
        bound_signer = Signer(identity_token, self, cache)
        yield bound_signer