Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/sign.py: 41%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16API for signing artifacts.
18Example:
20```python
21from pathlib import Path
23from sigstore.sign import SigningContext
24from sigstore.oidc import Issuer
26issuer = Issuer.production()
27identity = issuer.identity_token()
29# The artifact to sign
30artifact = Path("foo.txt").read_bytes()
32signing_ctx = SigningContext.production()
33with signing_ctx.signer(identity, cache=True) as signer:
34 result = signer.sign_artifact(artifact)
35 print(result)
36```
37"""
39from __future__ import annotations
41import base64
42import logging
43from collections.abc import Iterator
44from contextlib import contextmanager
45from datetime import datetime, timezone
47import cryptography.x509 as x509
48from cryptography.hazmat.primitives import hashes
49from cryptography.hazmat.primitives.asymmetric import ec
50from cryptography.x509.oid import NameOID
51from sigstore_models.common.v1 import HashOutput, MessageSignature
53from sigstore import dsse
54from sigstore import hashes as sigstore_hashes
55from sigstore._internal.fulcio import (
56 ExpiredCertificate,
57 FulcioClient,
58)
59from sigstore._internal.rekor import EntryRequestBody, RekorLogSubmitter
60from sigstore._internal.sct import verify_sct
61from sigstore._internal.timestamp import TimestampAuthorityClient, TimestampError
62from sigstore._internal.trust import KeyringPurpose
63from sigstore._utils import sha256_digest
64from sigstore.models import Bundle, ClientTrustConfig, TrustedRoot
65from sigstore.oidc import ExpiredIdentity, IdentityToken
67_logger = logging.getLogger(__name__)
70class Signer:
71 """
72 The primary API for signing operations.
73 """
75 def __init__(
76 self,
77 identity_token: IdentityToken,
78 signing_ctx: SigningContext,
79 cache: bool = True,
80 ) -> None:
81 """
82 Create a new `Signer`.
84 `identity_token` is the identity token used to request a signing certificate
85 from Fulcio.
87 `signing_ctx` is a `SigningContext` that keeps information about the signing
88 configuration.
90 `cache` determines whether the signing certificate and ephemeral private key
91 should be reused (until the certificate expires) to sign different artifacts.
92 Default is `True`.
93 """
94 self._identity_token = identity_token
95 self._signing_ctx: SigningContext = signing_ctx
96 self.__cached_private_key: ec.EllipticCurvePrivateKey | None = None
97 self.__cached_signing_certificate: x509.Certificate | None = None
98 if cache:
99 _logger.debug("Generating ephemeral keys...")
100 self.__cached_private_key = ec.generate_private_key(ec.SECP256R1())
101 _logger.debug("Requesting ephemeral certificate...")
102 self.__cached_signing_certificate = self._signing_cert()
104 @property
105 def _private_key(self) -> ec.EllipticCurvePrivateKey:
106 """Get or generate a signing key."""
107 if self.__cached_private_key is None:
108 _logger.debug("no cached key; generating ephemeral key")
109 return ec.generate_private_key(ec.SECP256R1())
110 return self.__cached_private_key
112 def _signing_cert(
113 self,
114 ) -> x509.Certificate:
115 """
116 Get or request a signing certificate from Fulcio.
118 Internally, this performs a CSR against Fulcio and verifies that
119 the returned certificate is present in Fulcio's CT log.
120 """
122 # Our CSR cannot possibly succeed if our underlying identity token
123 # is expired.
124 if not self._identity_token.in_validity_period():
125 raise ExpiredIdentity
127 # If it exists, verify if the current certificate is expired
128 if self.__cached_signing_certificate:
129 not_valid_after = self.__cached_signing_certificate.not_valid_after_utc
130 if datetime.now(timezone.utc) > not_valid_after:
131 raise ExpiredCertificate
132 return self.__cached_signing_certificate
134 else:
135 _logger.debug("Retrieving signed certificate...")
137 # Build an X.509 Certificate Signing Request
138 builder = (
139 x509.CertificateSigningRequestBuilder()
140 .subject_name(
141 x509.Name(
142 [
143 x509.NameAttribute(
144 NameOID.EMAIL_ADDRESS, self._identity_token._identity
145 ),
146 ]
147 )
148 )
149 .add_extension(
150 x509.BasicConstraints(ca=False, path_length=None),
151 critical=True,
152 )
153 )
154 certificate_request = builder.sign(self._private_key, hashes.SHA256())
156 certificate_response = self._signing_ctx._fulcio.signing_cert.post(
157 certificate_request, self._identity_token
158 )
160 verify_sct(
161 certificate_response.cert,
162 certificate_response.chain,
163 self._signing_ctx._trusted_root.ct_keyring(KeyringPurpose.SIGN),
164 )
166 _logger.debug("Successfully verified SCT...")
168 return certificate_response.cert
170 def _finalize_sign(
171 self,
172 cert: x509.Certificate,
173 content: MessageSignature | dsse.Envelope,
174 proposed_entry: EntryRequestBody,
175 ) -> Bundle:
176 """
177 Perform the common "finalizing" steps in a Sigstore signing flow.
178 """
179 # If the user provided TSA urls, timestamps the response
180 signed_timestamp = []
181 for tsa_client in self._signing_ctx._tsa_clients:
182 try:
183 signed_timestamp.append(tsa_client.request_timestamp(content.signature))
184 except TimestampError as e:
185 _logger.warning(
186 f"Unable to use {tsa_client.url} to timestamp the bundle. Failed with {e}"
187 )
189 # Submit the proposed entry to the transparency log
190 entry = self._signing_ctx._rekor.create_entry(proposed_entry)
191 _logger.debug(
192 f"Transparency log entry created with index: {entry._inner.log_index}"
193 )
195 return Bundle._from_parts(cert, content, entry, signed_timestamp)
197 def sign_dsse(
198 self,
199 input_: dsse.Statement,
200 ) -> Bundle:
201 """
202 Sign the given in-toto statement as a DSSE envelope, and return a
203 `Bundle` containing the signed result.
205 This API is **only** for in-toto statements; to sign arbitrary artifacts,
206 use `sign_artifact` instead.
207 """
208 cert = self._signing_cert()
210 # Sign the statement, producing a DSSE envelope
211 content = dsse._sign(self._private_key, input_)
213 # Create the proposed DSSE log entry
214 proposed_entry = self._signing_ctx._rekor._build_dsse_request(
215 envelope=content, certificate=cert
216 )
218 return self._finalize_sign(cert, content, proposed_entry)
220 def sign_artifact(
221 self,
222 input_: bytes | sigstore_hashes.Hashed,
223 ) -> Bundle:
224 """
225 Sign an artifact, and return a `Bundle` corresponding to the signed result.
227 The input can be one of two forms:
229 1. A `bytes` buffer;
230 2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs
231 that are too large to buffer into memory).
233 Regardless of the input format, the signing operation will produce a
234 `hashedrekord` entry within the bundle. No other entry types
235 are supported by this API.
236 """
238 cert = self._signing_cert()
240 # Sign artifact
241 hashed_input = sha256_digest(input_)
243 artifact_signature = self._private_key.sign(
244 hashed_input.digest, ec.ECDSA(hashed_input._as_prehashed())
245 )
247 content = MessageSignature(
248 message_digest=HashOutput(
249 algorithm=hashed_input.algorithm,
250 digest=base64.b64encode(hashed_input.digest),
251 ),
252 signature=base64.b64encode(artifact_signature),
253 )
255 # Create the proposed hashedrekord entry
256 proposed_entry = self._signing_ctx._rekor._build_hashed_rekord_request(
257 hashed_input=hashed_input, signature=artifact_signature, certificate=cert
258 )
260 return self._finalize_sign(cert, content, proposed_entry)
263class SigningContext:
264 """
265 Keep a context between signing operations.
266 """
268 def __init__(
269 self,
270 *,
271 fulcio: FulcioClient,
272 rekor: RekorLogSubmitter,
273 trusted_root: TrustedRoot,
274 tsa_clients: list[TimestampAuthorityClient] | None = None,
275 ):
276 """
277 Create a new `SigningContext`.
279 `fulcio` is a `FulcioClient` capable of connecting to a Fulcio instance
280 and returning signing certificates.
282 `rekor` is a `RekorClient` capable of connecting to a Rekor instance
283 and creating transparency log entries.
284 """
285 self._fulcio = fulcio
286 self._rekor = rekor
287 self._trusted_root = trusted_root
288 self._tsa_clients = tsa_clients or []
290 @classmethod
291 def from_trust_config(cls, trust_config: ClientTrustConfig) -> SigningContext:
292 """
293 Create a `SigningContext` from the given `ClientTrustConfig`.
295 @api private
296 """
297 signing_config = trust_config.signing_config
298 return cls(
299 fulcio=signing_config.get_fulcio(),
300 rekor=signing_config.get_tlogs()[0],
301 trusted_root=trust_config.trusted_root,
302 tsa_clients=signing_config.get_tsas(),
303 )
305 @contextmanager
306 def signer(
307 self, identity_token: IdentityToken, *, cache: bool = True
308 ) -> Iterator[Signer]:
309 """
310 A context manager for signing operations.
312 `identity_token` is the identity token passed to the `Signer` instance
313 and used to request a signing certificate from Fulcio.
315 `cache` determines whether the signing certificate and ephemeral private key
316 generated by the `Signer` instance should be reused (until the certificate expires)
317 to sign different artifacts.
318 Default is `True`.
319 """
320 yield Signer(identity_token, self, cache)