Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_utils.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16Shared utilities.
17"""
19from __future__ import annotations
21import base64
22import hashlib
23import sys
24from typing import IO, NewType, Union
26from cryptography.hazmat.primitives import serialization
27from cryptography.hazmat.primitives.asymmetric import ec, rsa
28from cryptography.x509 import (
29 Certificate,
30 ExtensionNotFound,
31 Version,
32 load_der_x509_certificate,
33)
34from cryptography.x509.oid import ExtendedKeyUsageOID, ExtensionOID
35from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm
37from sigstore import hashes as sigstore_hashes
38from sigstore.errors import VerificationError
40if sys.version_info < (3, 11):
41 import importlib_resources as resources
42else:
43 from importlib import resources
46PublicKey = Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey]
48PublicKeyTypes = Union[type[rsa.RSAPublicKey], type[ec.EllipticCurvePublicKey]]
50HexStr = NewType("HexStr", str)
51"""
52A newtype for `str` objects that contain hexadecimal strings (e.g. `ffabcd00ff`).
53"""
54B64Str = NewType("B64Str", str)
55"""
56A newtype for `str` objects that contain base64 encoded strings.
57"""
58KeyID = NewType("KeyID", bytes)
59"""
60A newtype for `bytes` objects that contain a key id.
61"""
64def load_pem_public_key(
65 key_pem: bytes,
66 *,
67 types: tuple[PublicKeyTypes, ...] = (rsa.RSAPublicKey, ec.EllipticCurvePublicKey),
68) -> PublicKey:
69 """
70 A specialization of `cryptography`'s `serialization.load_pem_public_key`
71 with a uniform exception type (`VerificationError`) and filtering on valid key types
72 for Sigstore purposes.
73 """
75 try:
76 key = serialization.load_pem_public_key(key_pem)
77 except Exception as exc:
78 raise VerificationError("could not load PEM-formatted public key") from exc
80 if not isinstance(key, types):
81 raise VerificationError(f"invalid key format: not one of {types}")
83 return key # type: ignore[return-value]
86def load_der_public_key(
87 key_der: bytes,
88 *,
89 types: tuple[PublicKeyTypes, ...] = (rsa.RSAPublicKey, ec.EllipticCurvePublicKey),
90) -> PublicKey:
91 """
92 The `load_pem_public_key` specialization, but DER.
93 """
95 try:
96 key = serialization.load_der_public_key(key_der)
97 except Exception as exc:
98 raise VerificationError("could not load DER-formatted public key") from exc
100 if not isinstance(key, types):
101 raise VerificationError(f"invalid key format: not one of {types}")
103 return key # type: ignore[return-value]
106def base64_encode_pem_cert(cert: Certificate) -> B64Str:
107 """
108 Returns a string containing a base64-encoded PEM-encoded X.509 certificate.
109 """
111 return B64Str(
112 base64.b64encode(cert.public_bytes(serialization.Encoding.PEM)).decode()
113 )
116def cert_der_to_pem(der: bytes) -> str:
117 """
118 Converts a DER-encoded X.509 certificate into its PEM encoding.
120 Returns a string containing a PEM-encoded X.509 certificate.
121 """
123 # NOTE: Technically we don't have to round-trip like this, since
124 # the DER-to-PEM transformation is entirely mechanical.
125 cert = load_der_x509_certificate(der)
126 return cert.public_bytes(serialization.Encoding.PEM).decode()
129def key_id(key: PublicKey) -> KeyID:
130 """
131 Returns an RFC 6962-style "key ID" for the given public key.
133 See: <https://www.rfc-editor.org/rfc/rfc6962#section-3.2>
134 """
135 public_bytes = key.public_bytes(
136 encoding=serialization.Encoding.DER,
137 format=serialization.PublicFormat.SubjectPublicKeyInfo,
138 )
140 return KeyID(hashlib.sha256(public_bytes).digest())
143def sha256_digest(
144 input_: bytes | IO[bytes] | sigstore_hashes.Hashed,
145) -> sigstore_hashes.Hashed:
146 """
147 Compute the SHA256 digest of an input stream or buffer or,
148 if given a `Hashed`, return it directly.
149 """
150 if isinstance(input_, sigstore_hashes.Hashed):
151 return input_
153 # If the input is already buffered into memory, there's no point in
154 # going back through an I/O abstraction.
155 if isinstance(input_, bytes):
156 return sigstore_hashes.Hashed(
157 digest=hashlib.sha256(input_).digest(), algorithm=HashAlgorithm.SHA2_256
158 )
160 return sigstore_hashes.Hashed(
161 digest=_sha256_streaming(input_), algorithm=HashAlgorithm.SHA2_256
162 )
165def _sha256_streaming(io: IO[bytes]) -> bytes:
166 """
167 Compute the SHA256 of a stream.
169 This function does its own internal buffering, so an unbuffered stream
170 should be supplied for optimal performance.
171 """
173 # NOTE: This function performs a SHA256 digest over a stream.
174 # The stream's size is not checked, meaning that the stream's source
175 # is implicitly trusted: if an attacker is able to truncate the stream's
176 # source prematurely, then they could conceivably produce a digest
177 # for a partial stream. This in turn could conceivably result
178 # in a valid signature for an unintended (truncated) input.
179 #
180 # This is currently outside of sigstore-python's threat model: we
181 # assume that the stream is trusted.
182 #
183 # See: https://github.com/sigstore/sigstore-python/pull/329#discussion_r1041215972
185 sha256 = hashlib.sha256()
186 # Per coreutils' ioblksize.h: 128KB performs optimally across a range
187 # of systems in terms of minimizing syscall overhead.
188 view = memoryview(bytearray(128 * 1024))
190 nbytes = io.readinto(view) # type: ignore[attr-defined]
191 while nbytes:
192 sha256.update(view[:nbytes])
193 nbytes = io.readinto(view) # type: ignore[attr-defined]
195 return sha256.digest()
198def read_embedded(name: str, prefix: str) -> bytes:
199 """
200 Read a resource embedded in this distribution of sigstore-python,
201 returning its contents as bytes.
202 """
203 b: bytes = resources.files("sigstore._store").joinpath(prefix, name).read_bytes()
204 return b
207def cert_is_ca(cert: Certificate) -> bool:
208 """
209 Returns `True` if and only if the given `Certificate`
210 is a CA certificate.
212 This function doesn't indicate the trustworthiness of the given
213 `Certificate`, only whether it has the appropriate interior state.
215 This function is **not** naively invertible: users **must** use the
216 dedicated `cert_is_leaf` utility function to determine whether a particular
217 leaf upholds Sigstore's invariants.
218 """
220 # Only v3 certificates should appear in the context of Sigstore;
221 # earlier versions of X.509 lack extensions and have ambiguous CA
222 # behavior.
223 if cert.version != Version.v3:
224 raise VerificationError(f"invalid X.509 version: {cert.version}")
226 # Valid CA certificates must have the following set:
227 #
228 # * `BasicKeyUsage.keyCertSign`
229 # * `BasicConstraints.ca`
230 #
231 # Any other combination of states is inconsistent and invalid, meaning
232 # that we won't consider the certificate a valid non-CA leaf.
234 try:
235 basic_constraints = cert.extensions.get_extension_for_oid(
236 ExtensionOID.BASIC_CONSTRAINTS
237 )
239 # BasicConstraints must be marked as critical, per RFC 5280 4.2.1.9.
240 if not basic_constraints.critical:
241 raise VerificationError(
242 "invalid X.509 certificate: non-critical BasicConstraints in CA"
243 )
245 ca = basic_constraints.value.ca # type: ignore[attr-defined]
246 except ExtensionNotFound:
247 # No BasicConstrains means that this can't possibly be a CA.
248 return False
250 key_cert_sign = False
251 try:
252 key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
253 key_cert_sign = key_usage.value.key_cert_sign # type: ignore[attr-defined]
254 except ExtensionNotFound:
255 raise VerificationError("invalid X.509 certificate: missing KeyUsage")
257 # If both states are set, this is a CA.
258 if ca and key_cert_sign:
259 return True
261 if not (ca or key_cert_sign):
262 return False
264 # Anything else is an invalid state that should never occur.
265 raise VerificationError(
266 f"invalid X.509 certificate states: KeyUsage.keyCertSign={key_cert_sign}"
267 f", BasicConstraints.ca={ca}"
268 )
271def cert_is_root_ca(cert: Certificate) -> bool:
272 """
273 Returns `True` if and only if the given `Certificate` indicates
274 that it's a root CA.
276 This is **not** a verification function, and it does not establish
277 the trustworthiness of the given certificate.
278 """
280 # NOTE(ww): This function is obnoxiously long to make the different
281 # states explicit.
283 # Only v3 certificates should appear in the context of Sigstore;
284 # earlier versions of X.509 lack extensions and have ambiguous CA
285 # behavior.
286 if cert.version != Version.v3:
287 raise VerificationError(f"invalid X.509 version: {cert.version}")
289 # Non-CAs can't possibly be root CAs.
290 if not cert_is_ca(cert):
291 return False
293 # A certificate that is its own issuer and signer is considered a root CA.
294 try:
295 cert.verify_directly_issued_by(cert)
296 return True
297 except Exception:
298 return False
301def cert_is_leaf(cert: Certificate) -> bool:
302 """
303 Returns `True` if and only if the given `Certificate` is a valid
304 leaf certificate for Sigstore purposes. This means that:
306 * It is not a root or intermediate CA;
307 * It has `KeyUsage.digitalSignature`;
308 * It has `CODE_SIGNING` as an `ExtendedKeyUsage`.
310 This is **not** a verification function, and it does not establish
311 the trustworthiness of the given certificate.
312 """
314 # Only v3 certificates should appear in the context of Sigstore;
315 # earlier versions of X.509 lack extensions and have ambiguous CA
316 # behavior.
317 if cert.version != Version.v3:
318 raise VerificationError(f"invalid X.509 version: {cert.version}")
320 # CAs are not leaves.
321 if cert_is_ca(cert):
322 return False
324 key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
325 digital_signature = key_usage.value.digital_signature # type: ignore[attr-defined]
327 if not digital_signature:
328 raise VerificationError(
329 "invalid certificate for Sigstore purposes: missing digital signature usage"
330 )
332 # Finally, we check to make sure the leaf has an `ExtendedKeyUsages`
333 # extension that includes a codesigning entitlement. Sigstore should
334 # never issue a leaf that doesn't have this extended usage.
335 try:
336 extended_key_usage = cert.extensions.get_extension_for_oid(
337 ExtensionOID.EXTENDED_KEY_USAGE
338 )
340 return ExtendedKeyUsageOID.CODE_SIGNING in extended_key_usage.value # type: ignore[operator]
341 except ExtensionNotFound:
342 raise VerificationError("invalid X.509 certificate: missing ExtendedKeyUsage")