Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/sign.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

100 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16API for signing artifacts. 

17 

18Example: 

19 

20```python 

21from pathlib import Path 

22 

23from sigstore.sign import SigningContext 

24from sigstore.oidc import Issuer 

25 

26issuer = Issuer.production() 

27identity = issuer.identity_token() 

28 

29# The artifact to sign 

30artifact = Path("foo.txt").read_bytes() 

31 

32signing_ctx = SigningContext.production() 

33with signing_ctx.signer(identity, cache=True) as signer: 

34 result = signer.sign_artifact(artifact) 

35 print(result) 

36``` 

37""" 

38 

39from __future__ import annotations 

40 

41import base64 

42import logging 

43from collections.abc import Iterator 

44from contextlib import contextmanager 

45from datetime import datetime, timezone 

46from typing import Optional 

47 

48import cryptography.x509 as x509 

49import rekor_types 

50from cryptography.hazmat.primitives import hashes, serialization 

51from cryptography.hazmat.primitives.asymmetric import ec 

52from cryptography.x509.oid import NameOID 

53from sigstore_protobuf_specs.dev.sigstore.common.v1 import ( 

54 HashOutput, 

55 MessageSignature, 

56) 

57 

58from sigstore import dsse 

59from sigstore import hashes as sigstore_hashes 

60from sigstore._internal.fulcio import ( 

61 ExpiredCertificate, 

62 FulcioClient, 

63) 

64from sigstore._internal.rekor.client import RekorClient 

65from sigstore._internal.sct import verify_sct 

66from sigstore._internal.timestamp import TimestampAuthorityClient, TimestampError 

67from sigstore._internal.trust import ClientTrustConfig, KeyringPurpose, TrustedRoot 

68from sigstore._utils import sha256_digest 

69from sigstore.models import Bundle 

70from sigstore.oidc import ExpiredIdentity, IdentityToken 

71 

72_logger = logging.getLogger(__name__) 

73 

74 

class Signer:
    """
    The primary API for signing operations.

    A `Signer` binds an identity token to a `SigningContext` and produces
    `Bundle`s via `sign_artifact` (arbitrary inputs) or `sign_dsse`
    (in-toto statements).
    """

    def __init__(
        self,
        identity_token: IdentityToken,
        signing_ctx: SigningContext,
        cache: bool = True,
    ) -> None:
        """
        Create a new `Signer`.

        `identity_token` is the identity token used to request a signing certificate
        from Fulcio.

        `signing_ctx` is a `SigningContext` that keeps information about the signing
        configuration.

        `cache` determines whether the signing certificate and ephemeral private key
        should be reused (until the certificate expires) to sign different artifacts.
        Default is `True`.

        When `cache` is `True`, the ephemeral key is generated and the signing
        certificate is requested immediately, so construction can raise
        `ExpiredIdentity` if the identity token is outside its validity period.
        """
        self._identity_token = identity_token
        self._signing_ctx: SigningContext = signing_ctx
        self.__cached_private_key: Optional[ec.EllipticCurvePrivateKey] = None
        self.__cached_signing_certificate: Optional[x509.Certificate] = None
        if cache:
            _logger.debug("Generating ephemeral keys...")
            self.__cached_private_key = ec.generate_private_key(ec.SECP256R1())
            _logger.debug("Requesting ephemeral certificate...")
            # The certificate cache is still empty here, so this performs a
            # real request using the key cached on the line above.
            self.__cached_signing_certificate = self._signing_cert()

    @property
    def _private_key(self) -> ec.EllipticCurvePrivateKey:
        """
        Get or generate a signing key.

        NOTE: when no key is cached, every access returns a *new* key. Callers
        that need the same key for more than one step (e.g. the CSR and the
        artifact signature) must read this property once and reuse the result;
        see `_signing_credentials`.
        """
        if self.__cached_private_key is None:
            _logger.debug("no cached key; generating ephemeral key")
            return ec.generate_private_key(ec.SECP256R1())
        return self.__cached_private_key

    def _signing_cert(
        self,
        private_key: Optional[ec.EllipticCurvePrivateKey] = None,
    ) -> x509.Certificate:
        """
        Get or request a signing certificate from Fulcio.

        Internally, this performs a CSR against Fulcio and verifies that
        the returned certificate is present in Fulcio's CT log.

        `private_key` is the key used to sign the CSR when a new certificate
        has to be requested. If omitted, `self._private_key` is used. It is
        ignored when a cached certificate is returned.

        Raises `ExpiredIdentity` if the identity token is outside its validity
        period, and `ExpiredCertificate` if the cached certificate has expired.
        """

        # Our CSR cannot possibly succeed if our underlying identity token
        # is expired.
        if not self._identity_token.in_validity_period():
            raise ExpiredIdentity

        # If it exists, verify if the current certificate is expired
        cached_cert = self.__cached_signing_certificate
        if cached_cert is not None:
            if datetime.now(timezone.utc) > cached_cert.not_valid_after_utc:
                raise ExpiredCertificate
            return cached_cert

        _logger.debug("Retrieving signed certificate...")

        if private_key is None:
            private_key = self._private_key

        # Build an X.509 Certificate Signing Request
        builder = (
            x509.CertificateSigningRequestBuilder()
            .subject_name(
                x509.Name(
                    [
                        x509.NameAttribute(
                            NameOID.EMAIL_ADDRESS, self._identity_token._identity
                        ),
                    ]
                )
            )
            .add_extension(
                x509.BasicConstraints(ca=False, path_length=None),
                critical=True,
            )
        )
        certificate_request = builder.sign(private_key, hashes.SHA256())

        certificate_response = self._signing_ctx._fulcio.signing_cert.post(
            certificate_request, self._identity_token
        )

        verify_sct(
            certificate_response.cert,
            certificate_response.chain,
            self._signing_ctx._trusted_root.ct_keyring(KeyringPurpose.SIGN),
        )

        _logger.debug("Successfully verified SCT...")

        return certificate_response.cert

    def _signing_credentials(
        self,
    ) -> tuple[ec.EllipticCurvePrivateKey, x509.Certificate]:
        """
        Return a `(private_key, certificate)` pair that belong together.

        The key is read exactly once and handed to `_signing_cert`, so that in
        the uncached case the CSR and the subsequent signature use the same
        ephemeral key rather than two independently generated ones.
        """
        private_key = self._private_key
        cert = self._signing_cert(private_key)
        return private_key, cert

    def _finalize_sign(
        self,
        cert: x509.Certificate,
        content: MessageSignature | dsse.Envelope,
        proposed_entry: rekor_types.Hashedrekord | rekor_types.Dsse,
    ) -> Bundle:
        """
        Perform the common "finalizing" steps in a Sigstore signing flow.

        The proposed entry is submitted to the transparency log, the signature
        is timestamped by each configured TSA client, and the parts are
        assembled into a `Bundle`.
        """
        # Submit the proposed entry to the transparency log
        entry = self._signing_ctx._rekor.log.entries.post(proposed_entry)

        _logger.debug(
            "Transparency log entry created with index: %s", entry.log_index
        )

        # If the user provided TSA urls, timestamps the response
        signed_timestamp = []
        for tsa_client in self._signing_ctx._tsa_clients:
            try:
                signed_timestamp.append(tsa_client.request_timestamp(content.signature))
            except TimestampError as e:
                # Timestamping is best-effort: a failing TSA is reported and
                # skipped rather than aborting the signing operation.
                _logger.warning(
                    "Unable to use %s to timestamp the bundle. Failed with %s",
                    tsa_client.url,
                    e,
                )

        return Bundle._from_parts(cert, content, entry, signed_timestamp)

    def sign_dsse(
        self,
        input_: dsse.Statement,
    ) -> Bundle:
        """
        Sign the given in-toto statement as a DSSE envelope, and return a
        `Bundle` containing the signed result.

        This API is **only** for in-toto statements; to sign arbitrary artifacts,
        use `sign_artifact` instead.
        """
        # Fetch the key and certificate together so that they always match,
        # even when nothing is cached.
        private_key, cert = self._signing_credentials()

        # Prepare inputs
        b64_cert = base64.b64encode(
            cert.public_bytes(encoding=serialization.Encoding.PEM)
        )

        # Sign the statement, producing a DSSE envelope
        content = dsse._sign(private_key, input_)

        # Create the proposed DSSE log entry
        proposed_entry = rekor_types.Dsse(
            spec=rekor_types.dsse.DsseSchema(
                # NOTE: mypy can't see that this kwarg is correct due to two interacting
                # behaviors/bugs (one pydantic, one datamodel-codegen):
                # See: <https://github.com/pydantic/pydantic/discussions/7418#discussioncomment-9024927>
                # See: <https://github.com/koxudaxi/datamodel-code-generator/issues/1903>
                proposed_content=rekor_types.dsse.ProposedContent(  # type: ignore[call-arg]
                    envelope=content.to_json(),
                    verifiers=[b64_cert.decode()],
                ),
            ),
        )

        return self._finalize_sign(cert, content, proposed_entry)

    def sign_artifact(
        self,
        input_: bytes | sigstore_hashes.Hashed,
    ) -> Bundle:
        """
        Sign an artifact, and return a `Bundle` corresponding to the signed result.

        The input can be one of two forms:

        1. A `bytes` buffer;
        2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs
           that are too large to buffer into memory).

        Regardless of the input format, the signing operation will produce a
        `hashedrekord` entry within the bundle. No other entry types
        are supported by this API.
        """

        # Fetch the key and certificate together so that they always match,
        # even when nothing is cached.
        private_key, cert = self._signing_credentials()

        # Prepare inputs
        b64_cert = base64.b64encode(
            cert.public_bytes(encoding=serialization.Encoding.PEM)
        )

        # Sign artifact
        hashed_input = sha256_digest(input_)

        artifact_signature = private_key.sign(
            hashed_input.digest, ec.ECDSA(hashed_input._as_prehashed())
        )

        content = MessageSignature(
            message_digest=HashOutput(
                algorithm=hashed_input.algorithm,
                digest=hashed_input.digest,
            ),
            signature=artifact_signature,
        )

        # Create the proposed hashedrekord entry
        proposed_entry = rekor_types.Hashedrekord(
            spec=rekor_types.hashedrekord.HashedrekordV001Schema(
                signature=rekor_types.hashedrekord.Signature(
                    content=base64.b64encode(artifact_signature).decode(),
                    public_key=rekor_types.hashedrekord.PublicKey(
                        content=b64_cert.decode()
                    ),
                ),
                data=rekor_types.hashedrekord.Data(
                    hash=rekor_types.hashedrekord.Hash(
                        algorithm=hashed_input._as_hashedrekord_algorithm(),
                        value=hashed_input.digest.hex(),
                    )
                ),
            ),
        )

        return self._finalize_sign(cert, content, proposed_entry)

297 

298 

class SigningContext:
    """
    Keep a context between signing operations.

    Holds the service clients and trust material that every `Signer`
    created from this context shares.
    """

    def __init__(
        self,
        *,
        fulcio: FulcioClient,
        rekor: RekorClient,
        trusted_root: TrustedRoot,
        tsa_clients: list[TimestampAuthorityClient] | None = None,
    ):
        """
        Create a new `SigningContext`.

        `fulcio` is a `FulcioClient` capable of connecting to a Fulcio instance
        and returning signing certificates.

        `rekor` is a `RekorClient` capable of connecting to a Rekor instance
        and creating transparency log entries.

        `trusted_root` is the `TrustedRoot` stored for use by signers created
        from this context.

        `tsa_clients` is an optional list of `TimestampAuthorityClient`s; when
        omitted (or empty), no timestamp authorities are configured.
        """
        self._fulcio = fulcio
        self._rekor = rekor
        self._trusted_root = trusted_root
        # A missing or empty list is normalized to a fresh empty list.
        self._tsa_clients = tsa_clients if tsa_clients else []

    @classmethod
    def production(cls) -> SigningContext:
        """
        Return a `SigningContext` instance configured against Sigstore's production-level services.
        """
        fulcio_client = FulcioClient.production()
        rekor_client = RekorClient.production()
        root = TrustedRoot.production()
        return cls(fulcio=fulcio_client, rekor=rekor_client, trusted_root=root)

    @classmethod
    def staging(cls) -> SigningContext:
        """
        Return a `SigningContext` instance configured against Sigstore's staging-level services.
        """
        fulcio_client = FulcioClient.staging()
        rekor_client = RekorClient.staging()
        root = TrustedRoot.staging()
        return cls(fulcio=fulcio_client, rekor=rekor_client, trusted_root=root)

    @classmethod
    def _from_trust_config(cls, trust_config: ClientTrustConfig) -> SigningContext:
        """
        Create a `SigningContext` from the given `ClientTrustConfig`.

        The Fulcio URL, the first transparency log URL and every TSA URL are
        read from the config's signing configuration.

        @api private
        """
        # Clients are built in the same order as the keyword arguments below.
        fulcio_client = FulcioClient(trust_config._inner.signing_config.ca_url)
        rekor_client = RekorClient(trust_config._inner.signing_config.tlog_urls[0])
        root = trust_config.trusted_root
        timestamp_clients = []
        for tsa_url in trust_config._inner.signing_config.tsa_urls:
            timestamp_clients.append(TimestampAuthorityClient(tsa_url))

        return cls(
            fulcio=fulcio_client,
            rekor=rekor_client,
            trusted_root=root,
            tsa_clients=timestamp_clients,
        )

    @contextmanager
    def signer(
        self, identity_token: IdentityToken, *, cache: bool = True
    ) -> Iterator[Signer]:
        """
        A context manager for signing operations.

        `identity_token` is the identity token passed to the `Signer` instance
        and used to request a signing certificate from Fulcio.

        `cache` determines whether the signing certificate and ephemeral private key
        generated by the `Signer` instance should be reused (until the certificate expires)
        to sign different artifacts.
        Default is `True`.
        """
        active_signer = Signer(identity_token, self, cache)
        yield active_signer