Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/sign.py: 41%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16API for signing artifacts.
18Example:
20```python
21from pathlib import Path
22from sigstore.models import ClientTrustConfig
23from sigstore.oidc import Issuer
24from sigstore.sign import SigningContext
26artifact_path = Path("README.md")
28# Construct OIDC Issuer and SigningContext for the
29# Sigstore Public Good instance
30trust_config = ClientTrustConfig.production()
31issuer = Issuer(trust_config.signing_config.get_oidc_url())
32context = SigningContext.from_trust_config(trust_config)
34# Get an identity token from OIDC provider using interactive auth
35token = issuer.identity_token()
37# Sign artifact with the identity
38with context.signer(token, cache = True) as signer:
39 bundle = signer.sign_artifact(artifact_path.read_bytes())
41with Path("README.md.sigstore.json").open("w") as f:
42 f.write(bundle.to_json())
43```
44"""
46from __future__ import annotations
48import base64
49import logging
50from collections.abc import Iterator
51from contextlib import contextmanager
52from datetime import datetime, timezone
54import cryptography.x509 as x509
55from cryptography.hazmat.primitives import hashes
56from cryptography.hazmat.primitives.asymmetric import ec
57from cryptography.x509.oid import NameOID
58from sigstore_models.common.v1 import HashOutput, MessageSignature
60from sigstore import dsse
61from sigstore import hashes as sigstore_hashes
62from sigstore._internal.fulcio import (
63 ExpiredCertificate,
64 FulcioClient,
65)
66from sigstore._internal.rekor import EntryRequestBody, RekorLogSubmitter
67from sigstore._internal.sct import verify_sct
68from sigstore._internal.timestamp import TimestampAuthorityClient, TimestampError
69from sigstore._internal.trust import KeyringPurpose
70from sigstore._utils import sha256_digest
71from sigstore.models import Bundle, ClientTrustConfig, TrustedRoot
72from sigstore.oidc import ExpiredIdentity, IdentityToken
74_logger = logging.getLogger(__name__)
77class Signer:
78 """
79 The primary API for signing operations.
80 """
82 def __init__(
83 self,
84 identity_token: IdentityToken,
85 signing_ctx: SigningContext,
86 cache: bool = True,
87 ) -> None:
88 """
89 Create a new `Signer`.
91 `identity_token` is the identity token used to request a signing certificate
92 from Fulcio.
94 `signing_ctx` is a `SigningContext` that keeps information about the signing
95 configuration.
97 `cache` determines whether the signing certificate and ephemeral private key
98 should be reused (until the certificate expires) to sign different artifacts.
99 Default is `True`.
100 """
101 self._identity_token = identity_token
102 self._signing_ctx: SigningContext = signing_ctx
103 self.__cached_private_key: ec.EllipticCurvePrivateKey | None = None
104 self.__cached_signing_certificate: x509.Certificate | None = None
105 if cache:
106 _logger.debug("Generating ephemeral keys...")
107 self.__cached_private_key = ec.generate_private_key(ec.SECP256R1())
108 _logger.debug("Requesting ephemeral certificate...")
109 self.__cached_signing_certificate = self._signing_cert()
111 @property
112 def _private_key(self) -> ec.EllipticCurvePrivateKey:
113 """Get or generate a signing key."""
114 if self.__cached_private_key is None:
115 _logger.debug("no cached key; generating ephemeral key")
116 return ec.generate_private_key(ec.SECP256R1())
117 return self.__cached_private_key
119 def _signing_cert(
120 self,
121 ) -> x509.Certificate:
122 """
123 Get or request a signing certificate from Fulcio.
125 Internally, this performs a CSR against Fulcio and verifies that
126 the returned certificate is present in Fulcio's CT log.
127 """
129 # Our CSR cannot possibly succeed if our underlying identity token
130 # is expired.
131 if not self._identity_token.in_validity_period():
132 raise ExpiredIdentity
134 # If it exists, verify if the current certificate is expired
135 if self.__cached_signing_certificate:
136 not_valid_after = self.__cached_signing_certificate.not_valid_after_utc
137 if datetime.now(timezone.utc) > not_valid_after:
138 raise ExpiredCertificate
139 return self.__cached_signing_certificate
141 else:
142 _logger.debug("Retrieving signed certificate...")
144 # Build an X.509 Certificate Signing Request
145 builder = (
146 x509.CertificateSigningRequestBuilder()
147 .subject_name(
148 x509.Name(
149 [
150 x509.NameAttribute(
151 NameOID.EMAIL_ADDRESS, self._identity_token._identity
152 ),
153 ]
154 )
155 )
156 .add_extension(
157 x509.BasicConstraints(ca=False, path_length=None),
158 critical=True,
159 )
160 )
161 certificate_request = builder.sign(self._private_key, hashes.SHA256())
163 certificate_response = self._signing_ctx._fulcio.signing_cert.post(
164 certificate_request, self._identity_token
165 )
167 verify_sct(
168 certificate_response.cert,
169 certificate_response.chain,
170 self._signing_ctx._trusted_root.ct_keyring(KeyringPurpose.SIGN),
171 )
173 _logger.debug("Successfully verified SCT...")
175 return certificate_response.cert
177 def _finalize_sign(
178 self,
179 cert: x509.Certificate,
180 content: MessageSignature | dsse.Envelope,
181 proposed_entry: EntryRequestBody,
182 ) -> Bundle:
183 """
184 Perform the common "finalizing" steps in a Sigstore signing flow.
185 """
186 # If the user provided TSA urls, timestamps the response
187 signed_timestamp = []
188 for tsa_client in self._signing_ctx._tsa_clients:
189 try:
190 signed_timestamp.append(tsa_client.request_timestamp(content.signature))
191 except TimestampError as e:
192 _logger.warning(
193 f"Unable to use {tsa_client.url} to timestamp the bundle. Failed with {e}"
194 )
196 # Submit the proposed entry to the transparency log
197 entry = self._signing_ctx._rekor.create_entry(proposed_entry)
198 _logger.debug(
199 f"Transparency log entry created with index: {entry._inner.log_index}"
200 )
202 return Bundle._from_parts(cert, content, entry, signed_timestamp)
204 def sign_dsse(
205 self,
206 input_: dsse.Statement,
207 ) -> Bundle:
208 """
209 Sign the given in-toto statement as a DSSE envelope, and return a
210 `Bundle` containing the signed result.
212 This API is **only** for in-toto statements; to sign arbitrary artifacts,
213 use `sign_artifact` instead.
214 """
215 cert = self._signing_cert()
217 # Sign the statement, producing a DSSE envelope
218 content = dsse._sign(self._private_key, input_)
220 # Create the proposed DSSE log entry
221 proposed_entry = self._signing_ctx._rekor._build_dsse_request(
222 envelope=content, certificate=cert
223 )
225 return self._finalize_sign(cert, content, proposed_entry)
227 def sign_artifact(
228 self,
229 input_: bytes | sigstore_hashes.Hashed,
230 ) -> Bundle:
231 """
232 Sign an artifact, and return a `Bundle` corresponding to the signed result.
234 The input can be one of two forms:
236 1. A `bytes` buffer;
237 2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs
238 that are too large to buffer into memory).
240 Regardless of the input format, the signing operation will produce a
241 `hashedrekord` entry within the bundle. No other entry types
242 are supported by this API.
243 """
245 cert = self._signing_cert()
247 # Sign artifact
248 hashed_input = sha256_digest(input_)
250 artifact_signature = self._private_key.sign(
251 hashed_input.digest, ec.ECDSA(hashed_input._as_prehashed())
252 )
254 content = MessageSignature(
255 message_digest=HashOutput(
256 algorithm=hashed_input.algorithm,
257 digest=base64.b64encode(hashed_input.digest),
258 ),
259 signature=base64.b64encode(artifact_signature),
260 )
262 # Create the proposed hashedrekord entry
263 proposed_entry = self._signing_ctx._rekor._build_hashed_rekord_request(
264 hashed_input=hashed_input, signature=artifact_signature, certificate=cert
265 )
267 return self._finalize_sign(cert, content, proposed_entry)
270class SigningContext:
271 """
272 Keep a context between signing operations.
273 """
275 def __init__(
276 self,
277 *,
278 fulcio: FulcioClient,
279 rekor: RekorLogSubmitter,
280 trusted_root: TrustedRoot,
281 tsa_clients: list[TimestampAuthorityClient] | None = None,
282 ):
283 """
284 Create a new `SigningContext`.
286 `fulcio` is a `FulcioClient` capable of connecting to a Fulcio instance
287 and returning signing certificates.
289 `rekor` is a `RekorClient` capable of connecting to a Rekor instance
290 and creating transparency log entries.
291 """
292 self._fulcio = fulcio
293 self._rekor = rekor
294 self._trusted_root = trusted_root
295 self._tsa_clients = tsa_clients or []
297 @classmethod
298 def from_trust_config(cls, trust_config: ClientTrustConfig) -> SigningContext:
299 """
300 Create a `SigningContext` from the given `ClientTrustConfig`.
302 @api private
303 """
304 signing_config = trust_config.signing_config
305 return cls(
306 fulcio=signing_config.get_fulcio(),
307 rekor=signing_config.get_tlogs()[0],
308 trusted_root=trust_config.trusted_root,
309 tsa_clients=signing_config.get_tsas(),
310 )
312 @contextmanager
313 def signer(
314 self, identity_token: IdentityToken, *, cache: bool = True
315 ) -> Iterator[Signer]:
316 """
317 A context manager for signing operations.
319 `identity_token` is the identity token passed to the `Signer` instance
320 and used to request a signing certificate from Fulcio.
322 `cache` determines whether the signing certificate and ephemeral private key
323 generated by the `Signer` instance should be reused (until the certificate expires)
324 to sign different artifacts.
325 Default is `True`.
326 """
327 yield Signer(identity_token, self, cache)