Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/sign.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16API for signing artifacts. 

17 

18Example: 

19 

20```python 

21from pathlib import Path 

22 

23from sigstore.sign import SigningContext 

24from sigstore.oidc import Issuer 

25 

26issuer = Issuer.production() 

27identity = issuer.identity_token() 

28 

29# The artifact to sign 

30artifact = Path("foo.txt").read_bytes() 

31 

32signing_ctx = SigningContext.production() 

33with signing_ctx.signer(identity, cache=True) as signer: 

34 result = signer.sign_artifact(artifact) 

35 print(result) 

36``` 

37""" 

38 

39from __future__ import annotations 

40 

41import base64 

42import logging 

43from collections.abc import Iterator 

44from contextlib import contextmanager 

45from datetime import datetime, timezone 

46 

47import cryptography.x509 as x509 

48from cryptography.hazmat.primitives import hashes 

49from cryptography.hazmat.primitives.asymmetric import ec 

50from cryptography.x509.oid import NameOID 

51from sigstore_models.common.v1 import HashOutput, MessageSignature 

52 

53from sigstore import dsse 

54from sigstore import hashes as sigstore_hashes 

55from sigstore._internal.fulcio import ( 

56 ExpiredCertificate, 

57 FulcioClient, 

58) 

59from sigstore._internal.rekor import EntryRequestBody, RekorLogSubmitter 

60from sigstore._internal.sct import verify_sct 

61from sigstore._internal.timestamp import TimestampAuthorityClient, TimestampError 

62from sigstore._internal.trust import KeyringPurpose 

63from sigstore._utils import sha256_digest 

64from sigstore.models import Bundle, ClientTrustConfig, TrustedRoot 

65from sigstore.oidc import ExpiredIdentity, IdentityToken 

66 

67_logger = logging.getLogger(__name__) 

68 

69 

class Signer:
    """
    The primary API for signing operations.

    A `Signer` binds an identity token to a `SigningContext`. With caching
    enabled, one ephemeral key and one signing certificate are created up
    front and reused for every signing call; with caching disabled, each
    signing call generates its own ephemeral key and requests its own
    certificate for that key.
    """

    def __init__(
        self,
        identity_token: IdentityToken,
        signing_ctx: SigningContext,
        cache: bool = True,
    ) -> None:
        """
        Create a new `Signer`.

        `identity_token` is the identity token used to request a signing certificate
        from Fulcio.

        `signing_ctx` is a `SigningContext` that keeps information about the signing
        configuration.

        `cache` determines whether the signing certificate and ephemeral private key
        should be reused (until the certificate expires) to sign different artifacts.
        Default is `True`.
        """
        self._identity_token = identity_token
        self._signing_ctx: SigningContext = signing_ctx
        self.__cached_private_key: ec.EllipticCurvePrivateKey | None = None
        self.__cached_signing_certificate: x509.Certificate | None = None
        if cache:
            _logger.debug("Generating ephemeral keys...")
            self.__cached_private_key = ec.generate_private_key(ec.SECP256R1())
            _logger.debug("Requesting ephemeral certificate...")
            # Bind the certificate to the key that was just cached.
            self.__cached_signing_certificate = self._signing_cert(
                self.__cached_private_key
            )

    @property
    def _private_key(self) -> ec.EllipticCurvePrivateKey:
        """
        Get or generate a signing key.

        NOTE: without a cached key, every access returns a *different*
        freshly generated key. Callers that need the same key for both the
        certificate request and the signature must read this property once
        and reuse the value.
        """
        if self.__cached_private_key is None:
            _logger.debug("no cached key; generating ephemeral key")
            return ec.generate_private_key(ec.SECP256R1())
        return self.__cached_private_key

    def _signing_cert(
        self,
        private_key: ec.EllipticCurvePrivateKey | None = None,
    ) -> x509.Certificate:
        """
        Get or request a signing certificate from Fulcio.

        `private_key` is the key used to sign the certificate signing request
        when a new certificate has to be requested. If omitted, the value of
        `self._private_key` is used; without caching that key is ephemeral and
        not retained, so callers that intend to sign with the certificate
        must pass the key they will sign with.

        Internally, this performs a CSR against Fulcio and verifies the
        returned certificate's SCT against the trusted root's CT keyring.

        Raises `ExpiredIdentity` if the identity token is outside its validity
        period, and `ExpiredCertificate` if the cached certificate has expired.
        """

        # Our CSR cannot possibly succeed if our underlying identity token
        # is expired.
        if not self._identity_token.in_validity_period():
            raise ExpiredIdentity

        # A cached certificate is reused as-is, unless it has expired.
        cached_cert = self.__cached_signing_certificate
        if cached_cert is not None:
            if datetime.now(timezone.utc) > cached_cert.not_valid_after_utc:
                raise ExpiredCertificate
            return cached_cert

        _logger.debug("Retrieving signed certificate...")

        if private_key is None:
            private_key = self._private_key

        # Build an X.509 Certificate Signing Request
        builder = (
            x509.CertificateSigningRequestBuilder()
            .subject_name(
                x509.Name(
                    [
                        x509.NameAttribute(
                            NameOID.EMAIL_ADDRESS, self._identity_token._identity
                        ),
                    ]
                )
            )
            .add_extension(
                x509.BasicConstraints(ca=False, path_length=None),
                critical=True,
            )
        )
        certificate_request = builder.sign(private_key, hashes.SHA256())

        certificate_response = self._signing_ctx._fulcio.signing_cert.post(
            certificate_request, self._identity_token
        )

        verify_sct(
            certificate_response.cert,
            certificate_response.chain,
            self._signing_ctx._trusted_root.ct_keyring(KeyringPurpose.SIGN),
        )

        _logger.debug("Successfully verified SCT...")

        return certificate_response.cert

    def _finalize_sign(
        self,
        cert: x509.Certificate,
        content: MessageSignature | dsse.Envelope,
        proposed_entry: EntryRequestBody,
    ) -> Bundle:
        """
        Perform the common "finalizing" steps in a Sigstore signing flow.

        Requests a timestamp over `content.signature` from each configured
        TSA client, submits `proposed_entry` to the transparency log, and
        assembles the resulting `Bundle`.
        """
        # Timestamping is best-effort: a TSA that fails is logged and
        # skipped rather than aborting the signing operation.
        signed_timestamp = []
        for tsa_client in self._signing_ctx._tsa_clients:
            try:
                signed_timestamp.append(tsa_client.request_timestamp(content.signature))
            except TimestampError as e:
                _logger.warning(
                    f"Unable to use {tsa_client.url} to timestamp the bundle. Failed with {e}"
                )

        # Submit the proposed entry to the transparency log
        entry = self._signing_ctx._rekor.create_entry(proposed_entry)
        _logger.debug(
            f"Transparency log entry created with index: {entry._inner.log_index}"
        )

        return Bundle._from_parts(cert, content, entry, signed_timestamp)

    def sign_dsse(
        self,
        input_: dsse.Statement,
    ) -> Bundle:
        """
        Sign the given in-toto statement as a DSSE envelope, and return a
        `Bundle` containing the signed result.

        This API is **only** for in-toto statements; to sign arbitrary artifacts,
        use `sign_artifact` instead.
        """
        # Read the key exactly once: without caching, each access to
        # `_private_key` yields a new key, and the certificate must be issued
        # for the same key that produces the signature.
        private_key = self._private_key
        cert = self._signing_cert(private_key)

        # Sign the statement, producing a DSSE envelope
        content = dsse._sign(private_key, input_)

        # Create the proposed DSSE log entry
        proposed_entry = self._signing_ctx._rekor._build_dsse_request(
            envelope=content, certificate=cert
        )

        return self._finalize_sign(cert, content, proposed_entry)

    def sign_artifact(
        self,
        input_: bytes | sigstore_hashes.Hashed,
    ) -> Bundle:
        """
        Sign an artifact, and return a `Bundle` corresponding to the signed result.

        The input can be one of two forms:

        1. A `bytes` buffer;
        2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs
           that are too large to buffer into memory).

        Regardless of the input format, the signing operation will produce a
        `hashedrekord` entry within the bundle. No other entry types
        are supported by this API.
        """

        # Read the key exactly once: without caching, each access to
        # `_private_key` yields a new key, and the certificate must be issued
        # for the same key that produces the signature.
        private_key = self._private_key
        cert = self._signing_cert(private_key)

        # Sign artifact
        hashed_input = sha256_digest(input_)

        artifact_signature = private_key.sign(
            hashed_input.digest, ec.ECDSA(hashed_input._as_prehashed())
        )

        content = MessageSignature(
            message_digest=HashOutput(
                algorithm=hashed_input.algorithm,
                digest=base64.b64encode(hashed_input.digest),
            ),
            signature=base64.b64encode(artifact_signature),
        )

        # Create the proposed hashedrekord entry
        proposed_entry = self._signing_ctx._rekor._build_hashed_rekord_request(
            hashed_input=hashed_input, signature=artifact_signature, certificate=cert
        )

        return self._finalize_sign(cert, content, proposed_entry)

261 

262 

class SigningContext:
    """
    Shared state for signing operations: the Fulcio client, the transparency
    log submitter, the trusted root, and any timestamp authority clients.
    """

    def __init__(
        self,
        *,
        fulcio: FulcioClient,
        rekor: RekorLogSubmitter,
        trusted_root: TrustedRoot,
        tsa_clients: list[TimestampAuthorityClient] | None = None,
    ):
        """
        Create a new `SigningContext`.

        `fulcio` is the `FulcioClient` used to obtain signing certificates.

        `rekor` is the `RekorLogSubmitter` used to create transparency log
        entries.

        `trusted_root` is the `TrustedRoot` stored alongside the clients.

        `tsa_clients` is an optional list of `TimestampAuthorityClient`
        instances; when omitted (or empty), an empty list is stored.
        """
        self._fulcio = fulcio
        self._rekor = rekor
        self._trusted_root = trusted_root
        self._tsa_clients = tsa_clients if tsa_clients else []

    @classmethod
    def from_trust_config(cls, trust_config: ClientTrustConfig) -> SigningContext:
        """
        Build a `SigningContext` out of a `ClientTrustConfig`.

        The Fulcio client, the first transparency log, and the timestamp
        authority clients come from the config's signing configuration; the
        trusted root is taken from the trust config itself.

        @api private
        """
        config = trust_config.signing_config
        fulcio_client = config.get_fulcio()
        # Only the first transparency log returned by the config is used.
        first_tlog = config.get_tlogs()[0]
        root = trust_config.trusted_root
        timestamp_clients = config.get_tsas()
        return cls(
            fulcio=fulcio_client,
            rekor=first_tlog,
            trusted_root=root,
            tsa_clients=timestamp_clients,
        )

    @contextmanager
    def signer(
        self, identity_token: IdentityToken, *, cache: bool = True
    ) -> Iterator[Signer]:
        """
        Context manager yielding a `Signer` bound to this context.

        `identity_token` is handed to the `Signer`, which uses it to request
        a signing certificate from Fulcio.

        `cache` is forwarded to the `Signer`; it controls whether the signing
        certificate and ephemeral private key are reused (until the
        certificate expires) across multiple signing calls.
        Default is `True`.
        """
        bound_signer = Signer(identity_token, self, cache)
        yield bound_signer