Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_utils.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16Shared utilities.
17"""
19from __future__ import annotations
21import base64
22import hashlib
23import sys
24from typing import IO, NewType, Union
25from urllib import parse
27from cryptography.hazmat.primitives import serialization
28from cryptography.hazmat.primitives.asymmetric import ec, ed25519, rsa
29from cryptography.x509 import (
30 Certificate,
31 ExtensionNotFound,
32 Version,
33 load_der_x509_certificate,
34)
35from cryptography.x509.oid import ExtendedKeyUsageOID, ExtensionOID
36from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm
38from sigstore import hashes as sigstore_hashes
39from sigstore.errors import VerificationError
41if sys.version_info < (3, 11):
42 import importlib_resources as resources
43else:
44 from importlib import resources
# Union of the public key classes this module handles: RSA, elliptic-curve,
# and Ed25519 public keys (instances).
PublicKey = Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey, ed25519.Ed25519PublicKey]

# Union of the *class objects* for the key types above (not instances).
# Used as the element type of the `types` filter tuple accepted by
# `load_pem_public_key` and `load_der_public_key`.
PublicKeyTypes = Union[
    type[rsa.RSAPublicKey],
    type[ec.EllipticCurvePublicKey],
    type[ed25519.Ed25519PublicKey],
]

HexStr = NewType("HexStr", str)
"""
A newtype for `str` objects that contain hexadecimal strings (e.g. `ffabcd00ff`).
"""
B64Str = NewType("B64Str", str)
"""
A newtype for `str` objects that contain base64 encoded strings.
"""
KeyID = NewType("KeyID", bytes)
"""
A newtype for `bytes` objects that contain a key id.
"""
def load_pem_public_key(
    key_pem: bytes,
    *,
    types: tuple[PublicKeyTypes, ...] = (
        rsa.RSAPublicKey,
        ec.EllipticCurvePublicKey,
        ed25519.Ed25519PublicKey,
    ),
) -> PublicKey:
    """
    Load a PEM-encoded public key, accepting only the key classes in `types`.

    This wraps `cryptography`'s `serialization.load_pem_public_key` so that
    every failure is reported as a `VerificationError`: both a key that
    cannot be parsed at all and a key whose class is not listed in `types`.
    """

    try:
        loaded = serialization.load_pem_public_key(key_pem)
    except Exception as exc:
        raise VerificationError("could not load PEM-formatted public key") from exc

    if isinstance(loaded, types):
        return loaded  # type: ignore[return-value]

    # Parsed fine, but it is a key class the caller did not ask for.
    raise VerificationError(f"invalid key format: not one of {types}")
def load_der_public_key(
    key_der: bytes,
    *,
    types: tuple[PublicKeyTypes, ...] = (
        rsa.RSAPublicKey,
        ec.EllipticCurvePublicKey,
        ed25519.Ed25519PublicKey,
    ),
) -> PublicKey:
    """
    Load a DER-encoded public key, accepting only the key classes in `types`.

    The DER counterpart of `load_pem_public_key`: parse failures and
    unaccepted key classes are both reported as `VerificationError`.
    """

    try:
        loaded = serialization.load_der_public_key(key_der)
    except Exception as exc:
        raise VerificationError("could not load DER-formatted public key") from exc

    if isinstance(loaded, types):
        return loaded  # type: ignore[return-value]

    # Parsed fine, but it is a key class the caller did not ask for.
    raise VerificationError(f"invalid key format: not one of {types}")
def base64_encode_pem_cert(cert: Certificate) -> B64Str:
    """
    Serialize the given X.509 certificate to PEM, then base64-encode the
    PEM bytes and return the result as a string.
    """

    pem_bytes = cert.public_bytes(serialization.Encoding.PEM)
    encoded = base64.b64encode(pem_bytes)
    return B64Str(encoded.decode())
def cert_der_to_pem(der: bytes) -> str:
    """
    Re-encode a DER-encoded X.509 certificate as PEM.

    Returns the PEM encoding of the certificate as a string.
    """

    # NOTE: Parsing the certificate is not strictly required, since DER to
    # PEM is a purely mechanical transformation; we round-trip through a
    # `Certificate` object anyway.
    parsed = load_der_x509_certificate(der)
    pem_bytes = parsed.public_bytes(serialization.Encoding.PEM)
    return pem_bytes.decode()
def key_id(key: PublicKey) -> KeyID:
    """
    Compute the RFC 6962-style "key ID" of a public key: the SHA256 digest
    of the key's DER-encoded SubjectPublicKeyInfo.

    See: <https://www.rfc-editor.org/rfc/rfc6962#section-3.2>
    """
    spki_der = key.public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    digest = hashlib.sha256(spki_der).digest()

    return KeyID(digest)
def sha256_digest(
    input_: bytes | IO[bytes] | sigstore_hashes.Hashed,
) -> sigstore_hashes.Hashed:
    """
    Return the SHA256 digest of `input_` as a `Hashed`.

    `input_` may be an in-memory buffer, a binary stream, or an existing
    `Hashed`; in the last case it is returned unchanged.
    """
    # Already hashed: hand it straight back.
    if isinstance(input_, sigstore_hashes.Hashed):
        return input_

    if isinstance(input_, bytes):
        # The data is already in memory, so hash it directly rather than
        # going through the streaming path.
        raw_digest = hashlib.sha256(input_).digest()
    else:
        raw_digest = _sha256_streaming(input_)

    return sigstore_hashes.Hashed(digest=raw_digest, algorithm=HashAlgorithm.SHA2_256)
def _sha256_streaming(io: IO[bytes]) -> bytes:
    """
    Return the raw SHA256 digest of everything remaining in a binary stream.

    The stream is consumed through `readinto` into a buffer owned by this
    function, so supplying an unbuffered stream gives the best performance.
    """

    # NOTE: The stream's size is never checked, so the stream's source is
    # implicitly trusted. An attacker who can cut the source short could
    # cause a digest of only part of the intended input to be produced,
    # which could in turn lead to a valid signature over a truncated input.
    #
    # sigstore-python's threat model currently excludes this: the stream
    # is assumed to be trusted.
    #
    # See: https://github.com/sigstore/sigstore-python/pull/329#discussion_r1041215972

    hasher = hashlib.sha256()
    # 128KB chunks: per coreutils' ioblksize.h, this size minimizes syscall
    # overhead across a range of systems.
    buffer = memoryview(bytearray(128 * 1024))

    # `readinto` reports how many bytes it wrote; a falsy count ends the loop.
    while count := io.readinto(buffer):  # type: ignore[attr-defined]
        hasher.update(buffer[:count])

    return hasher.digest()
def read_embedded(name: str, url: str) -> bytes:
    """
    Return the bytes of the resource `name` embedded in this distribution
    of sigstore-python for the TUF repository identified by `url`.
    """
    # Embedded resources are looked up under a directory named after the
    # repository URL with every reserved character percent-encoded.
    quoted_url = parse.quote(url, safe="")
    resource = resources.files("sigstore._store").joinpath(quoted_url, name)
    contents: bytes = resource.read_bytes()
    return contents
def cert_is_ca(cert: Certificate) -> bool:
    """
    Returns `True` if and only if the given `Certificate` is a CA
    certificate.

    Only the certificate's interior state is inspected; nothing here says
    whether the certificate should be trusted.

    A `False` result does **not** mean the certificate is a valid leaf:
    callers **must** use the dedicated `cert_is_leaf` utility function to
    check whether a certificate upholds Sigstore's leaf invariants.

    Raises `VerificationError` if the certificate is not X.509 v3, has a
    non-critical BasicConstraints extension, has BasicConstraints but no
    KeyUsage, or has inconsistent CA-related states.
    """

    # Sigstore only deals in v3 certificates: earlier X.509 versions have
    # no extensions and ambiguous CA behavior.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # A valid CA certificate has both of the following set:
    #
    # * `KeyUsage.keyCertSign`
    # * `BasicConstraints.ca`
    #
    # Having exactly one of them set is inconsistent and invalid, so such a
    # certificate is rejected rather than treated as a non-CA leaf.

    try:
        constraints_ext = cert.extensions.get_extension_for_oid(
            ExtensionOID.BASIC_CONSTRAINTS
        )
    except ExtensionNotFound:
        # Without BasicConstraints this can't possibly be a CA.
        return False

    # BasicConstraints must be marked as critical, per RFC 5280 4.2.1.9.
    if not constraints_ext.critical:
        raise VerificationError(
            "invalid X.509 certificate: non-critical BasicConstraints in CA"
        )
    is_ca = constraints_ext.value.ca  # type: ignore[attr-defined]

    try:
        usage_ext = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
    except ExtensionNotFound:
        raise VerificationError("invalid X.509 certificate: missing KeyUsage")
    may_sign_certs = usage_ext.value.key_cert_sign  # type: ignore[attr-defined]

    # Both set: a CA. Neither set: not a CA.
    if is_ca and may_sign_certs:
        return True
    if not is_ca and not may_sign_certs:
        return False

    # Exactly one is set: an invalid state that should never occur.
    raise VerificationError(
        f"invalid X.509 certificate states: KeyUsage.keyCertSign={may_sign_certs}"
        f", BasicConstraints.ca={is_ca}"
    )
def cert_is_root_ca(cert: Certificate) -> bool:
    """
    Returns `True` if and only if the given `Certificate` presents itself
    as a root CA, i.e. it is a CA certificate that is directly issued by
    itself.

    This is **not** a verification function: it says nothing about whether
    the certificate is trustworthy.

    Raises `VerificationError` if the certificate is not X.509 v3, or if
    `cert_is_ca` rejects it as invalid.
    """

    # NOTE(ww): Each state is deliberately spelled out as its own step.

    # Sigstore only deals in v3 certificates: earlier X.509 versions have
    # no extensions and ambiguous CA behavior.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # A root CA is first of all a CA.
    if not cert_is_ca(cert):
        return False

    # A root CA is its own issuer and signer; any failure of that check
    # means this is not a root.
    try:
        cert.verify_directly_issued_by(cert)
    except Exception:
        return False
    else:
        return True
def cert_is_leaf(cert: Certificate) -> bool:
    """
    Returns `True` if and only if the given `Certificate` is a valid
    leaf certificate for Sigstore purposes. This means that:

    * It is not a root or intermediate CA;
    * It has `KeyUsage.digitalSignature`;
    * It has `CODE_SIGNING` as an `ExtendedKeyUsage`.

    Returns `False` if the certificate is a CA, or if its `ExtendedKeyUsage`
    extension does not include `CODE_SIGNING`.

    Raises `VerificationError` if the certificate is not X.509 v3, if
    `cert_is_ca` rejects it as invalid, if the `KeyUsage` or
    `ExtendedKeyUsage` extension is missing, or if `KeyUsage` lacks
    `digitalSignature`.

    This is **not** a verification function, and it does not establish
    the trustworthiness of the given certificate.
    """

    # Only v3 certificates should appear in the context of Sigstore;
    # earlier versions of X.509 lack extensions and have ambiguous CA
    # behavior.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # CAs are not leaves.
    if cert_is_ca(cert):
        return False

    # `cert_is_ca` returns `False` without looking at KeyUsage when the
    # certificate has no BasicConstraints, so KeyUsage may still be absent
    # here. Report that as a `VerificationError`, like every other malformed
    # certificate, instead of leaking `ExtensionNotFound` to the caller.
    try:
        key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
    except ExtensionNotFound as exc:
        raise VerificationError("invalid X.509 certificate: missing KeyUsage") from exc

    digital_signature = key_usage.value.digital_signature  # type: ignore[attr-defined]
    if not digital_signature:
        raise VerificationError(
            "invalid certificate for Sigstore purposes: missing digital signature usage"
        )

    # Finally, we check to make sure the leaf has an `ExtendedKeyUsage`
    # extension that includes a codesigning entitlement. Sigstore should
    # never issue a leaf that doesn't have this extended usage.
    try:
        extended_key_usage = cert.extensions.get_extension_for_oid(
            ExtensionOID.EXTENDED_KEY_USAGE
        )
    except ExtensionNotFound as exc:
        raise VerificationError(
            "invalid X.509 certificate: missing ExtendedKeyUsage"
        ) from exc

    return ExtendedKeyUsageOID.CODE_SIGNING in extended_key_usage.value  # type: ignore[operator]