Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/sign.py: 43%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16API for signing artifacts.
18Example:
20```python
21from pathlib import Path
23from sigstore.sign import SigningContext
24from sigstore.oidc import Issuer
26issuer = Issuer.production()
27identity = issuer.identity_token()
29# The artifact to sign
30artifact = Path("foo.txt").read_bytes()
32signing_ctx = SigningContext.production()
33with signing_ctx.signer(identity, cache=True) as signer:
34 result = signer.sign_artifact(artifact)
35 print(result)
36```
37"""
39from __future__ import annotations
41import base64
42import logging
43from collections.abc import Iterator
44from contextlib import contextmanager
45from datetime import datetime, timezone
46from typing import Optional
48import cryptography.x509 as x509
49import rekor_types
50from cryptography.hazmat.primitives import hashes, serialization
51from cryptography.hazmat.primitives.asymmetric import ec
52from cryptography.x509.oid import NameOID
53from sigstore_protobuf_specs.dev.sigstore.common.v1 import (
54 HashOutput,
55 MessageSignature,
56)
58from sigstore import dsse
59from sigstore import hashes as sigstore_hashes
60from sigstore._internal.fulcio import (
61 ExpiredCertificate,
62 FulcioClient,
63)
64from sigstore._internal.rekor.client import RekorClient
65from sigstore._internal.sct import verify_sct
66from sigstore._internal.timestamp import TimestampAuthorityClient, TimestampError
67from sigstore._internal.trust import ClientTrustConfig, KeyringPurpose, TrustedRoot
68from sigstore._utils import sha256_digest
69from sigstore.models import Bundle
70from sigstore.oidc import ExpiredIdentity, IdentityToken
72_logger = logging.getLogger(__name__)
75class Signer:
76 """
77 The primary API for signing operations.
78 """
80 def __init__(
81 self,
82 identity_token: IdentityToken,
83 signing_ctx: SigningContext,
84 cache: bool = True,
85 ) -> None:
86 """
87 Create a new `Signer`.
89 `identity_token` is the identity token used to request a signing certificate
90 from Fulcio.
92 `signing_ctx` is a `SigningContext` that keeps information about the signing
93 configuration.
95 `cache` determines whether the signing certificate and ephemeral private key
96 should be reused (until the certificate expires) to sign different artifacts.
97 Default is `True`.
98 """
99 self._identity_token = identity_token
100 self._signing_ctx: SigningContext = signing_ctx
101 self.__cached_private_key: Optional[ec.EllipticCurvePrivateKey] = None
102 self.__cached_signing_certificate: Optional[x509.Certificate] = None
103 if cache:
104 _logger.debug("Generating ephemeral keys...")
105 self.__cached_private_key = ec.generate_private_key(ec.SECP256R1())
106 _logger.debug("Requesting ephemeral certificate...")
107 self.__cached_signing_certificate = self._signing_cert()
109 @property
110 def _private_key(self) -> ec.EllipticCurvePrivateKey:
111 """Get or generate a signing key."""
112 if self.__cached_private_key is None:
113 _logger.debug("no cached key; generating ephemeral key")
114 return ec.generate_private_key(ec.SECP256R1())
115 return self.__cached_private_key
117 def _signing_cert(
118 self,
119 ) -> x509.Certificate:
120 """
121 Get or request a signing certificate from Fulcio.
123 Internally, this performs a CSR against Fulcio and verifies that
124 the returned certificate is present in Fulcio's CT log.
125 """
127 # Our CSR cannot possibly succeed if our underlying identity token
128 # is expired.
129 if not self._identity_token.in_validity_period():
130 raise ExpiredIdentity
132 # If it exists, verify if the current certificate is expired
133 if self.__cached_signing_certificate:
134 not_valid_after = self.__cached_signing_certificate.not_valid_after_utc
135 if datetime.now(timezone.utc) > not_valid_after:
136 raise ExpiredCertificate
137 return self.__cached_signing_certificate
139 else:
140 _logger.debug("Retrieving signed certificate...")
142 # Build an X.509 Certificate Signing Request
143 builder = (
144 x509.CertificateSigningRequestBuilder()
145 .subject_name(
146 x509.Name(
147 [
148 x509.NameAttribute(
149 NameOID.EMAIL_ADDRESS, self._identity_token._identity
150 ),
151 ]
152 )
153 )
154 .add_extension(
155 x509.BasicConstraints(ca=False, path_length=None),
156 critical=True,
157 )
158 )
159 certificate_request = builder.sign(self._private_key, hashes.SHA256())
161 certificate_response = self._signing_ctx._fulcio.signing_cert.post(
162 certificate_request, self._identity_token
163 )
165 verify_sct(
166 certificate_response.cert,
167 certificate_response.chain,
168 self._signing_ctx._trusted_root.ct_keyring(KeyringPurpose.SIGN),
169 )
171 _logger.debug("Successfully verified SCT...")
173 return certificate_response.cert
175 def _finalize_sign(
176 self,
177 cert: x509.Certificate,
178 content: MessageSignature | dsse.Envelope,
179 proposed_entry: rekor_types.Hashedrekord | rekor_types.Dsse,
180 ) -> Bundle:
181 """
182 Perform the common "finalizing" steps in a Sigstore signing flow.
183 """
184 # Submit the proposed entry to the transparency log
185 entry = self._signing_ctx._rekor.log.entries.post(proposed_entry)
187 _logger.debug(f"Transparency log entry created with index: {entry.log_index}")
189 # If the user provided TSA urls, timestamps the response
190 signed_timestamp = []
191 for tsa_client in self._signing_ctx._tsa_clients:
192 try:
193 signed_timestamp.append(tsa_client.request_timestamp(content.signature))
194 except TimestampError as e:
195 _logger.warning(
196 f"Unable to use {tsa_client.url} to timestamp the bundle. Failed with {e}"
197 )
199 return Bundle._from_parts(cert, content, entry, signed_timestamp)
201 def sign_dsse(
202 self,
203 input_: dsse.Statement,
204 ) -> Bundle:
205 """
206 Sign the given in-toto statement as a DSSE envelope, and return a
207 `Bundle` containing the signed result.
209 This API is **only** for in-toto statements; to sign arbitrary artifacts,
210 use `sign_artifact` instead.
211 """
212 cert = self._signing_cert()
214 # Prepare inputs
215 b64_cert = base64.b64encode(
216 cert.public_bytes(encoding=serialization.Encoding.PEM)
217 )
219 # Sign the statement, producing a DSSE envelope
220 content = dsse._sign(self._private_key, input_)
222 # Create the proposed DSSE log entry
223 proposed_entry = rekor_types.Dsse(
224 spec=rekor_types.dsse.DsseSchema(
225 # NOTE: mypy can't see that this kwarg is correct due to two interacting
226 # behaviors/bugs (one pydantic, one datamodel-codegen):
227 # See: <https://github.com/pydantic/pydantic/discussions/7418#discussioncomment-9024927>
228 # See: <https://github.com/koxudaxi/datamodel-code-generator/issues/1903>
229 proposed_content=rekor_types.dsse.ProposedContent( # type: ignore[call-arg]
230 envelope=content.to_json(),
231 verifiers=[b64_cert.decode()],
232 ),
233 ),
234 )
236 return self._finalize_sign(cert, content, proposed_entry)
238 def sign_artifact(
239 self,
240 input_: bytes | sigstore_hashes.Hashed,
241 ) -> Bundle:
242 """
243 Sign an artifact, and return a `Bundle` corresponding to the signed result.
245 The input can be one of two forms:
247 1. A `bytes` buffer;
248 2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs
249 that are too large to buffer into memory).
251 Regardless of the input format, the signing operation will produce a
252 `hashedrekord` entry within the bundle. No other entry types
253 are supported by this API.
254 """
256 cert = self._signing_cert()
258 # Prepare inputs
259 b64_cert = base64.b64encode(
260 cert.public_bytes(encoding=serialization.Encoding.PEM)
261 )
263 # Sign artifact
264 hashed_input = sha256_digest(input_)
266 artifact_signature = self._private_key.sign(
267 hashed_input.digest, ec.ECDSA(hashed_input._as_prehashed())
268 )
270 content = MessageSignature(
271 message_digest=HashOutput(
272 algorithm=hashed_input.algorithm,
273 digest=hashed_input.digest,
274 ),
275 signature=artifact_signature,
276 )
278 # Create the proposed hashedrekord entry
279 proposed_entry = rekor_types.Hashedrekord(
280 spec=rekor_types.hashedrekord.HashedrekordV001Schema(
281 signature=rekor_types.hashedrekord.Signature(
282 content=base64.b64encode(artifact_signature).decode(),
283 public_key=rekor_types.hashedrekord.PublicKey(
284 content=b64_cert.decode()
285 ),
286 ),
287 data=rekor_types.hashedrekord.Data(
288 hash=rekor_types.hashedrekord.Hash(
289 algorithm=hashed_input._as_hashedrekord_algorithm(),
290 value=hashed_input.digest.hex(),
291 )
292 ),
293 ),
294 )
296 return self._finalize_sign(cert, content, proposed_entry)
299class SigningContext:
300 """
301 Keep a context between signing operations.
302 """
304 def __init__(
305 self,
306 *,
307 fulcio: FulcioClient,
308 rekor: RekorClient,
309 trusted_root: TrustedRoot,
310 tsa_clients: list[TimestampAuthorityClient] | None = None,
311 ):
312 """
313 Create a new `SigningContext`.
315 `fulcio` is a `FulcioClient` capable of connecting to a Fulcio instance
316 and returning signing certificates.
318 `rekor` is a `RekorClient` capable of connecting to a Rekor instance
319 and creating transparency log entries.
320 """
321 self._fulcio = fulcio
322 self._rekor = rekor
323 self._trusted_root = trusted_root
324 self._tsa_clients = tsa_clients or []
326 @classmethod
327 def production(cls) -> SigningContext:
328 """
329 Return a `SigningContext` instance configured against Sigstore's production-level services.
330 """
331 return cls(
332 fulcio=FulcioClient.production(),
333 rekor=RekorClient.production(),
334 trusted_root=TrustedRoot.production(),
335 )
337 @classmethod
338 def staging(cls) -> SigningContext:
339 """
340 Return a `SignerContext` instance configured against Sigstore's staging-level services.
341 """
342 return cls(
343 fulcio=FulcioClient.staging(),
344 rekor=RekorClient.staging(),
345 trusted_root=TrustedRoot.staging(),
346 )
348 @classmethod
349 def _from_trust_config(cls, trust_config: ClientTrustConfig) -> SigningContext:
350 """
351 Create a `SigningContext` from the given `ClientTrustConfig`.
353 @api private
354 """
355 return cls(
356 fulcio=FulcioClient(trust_config._inner.signing_config.ca_url),
357 rekor=RekorClient(trust_config._inner.signing_config.tlog_urls[0]),
358 trusted_root=trust_config.trusted_root,
359 tsa_clients=[
360 TimestampAuthorityClient(tsa_url)
361 for tsa_url in trust_config._inner.signing_config.tsa_urls
362 ],
363 )
365 @contextmanager
366 def signer(
367 self, identity_token: IdentityToken, *, cache: bool = True
368 ) -> Iterator[Signer]:
369 """
370 A context manager for signing operations.
372 `identity_token` is the identity token passed to the `Signer` instance
373 and used to request a signing certificate from Fulcio.
375 `cache` determines whether the signing certificate and ephemeral private key
376 generated by the `Signer` instance should be reused (until the certificate expires)
377 to sign different artifacts.
378 Default is `True`.
379 """
380 yield Signer(identity_token, self, cache)