Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_utils.py: 41%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16Shared utilities.
17"""
19from __future__ import annotations
21import base64
22import hashlib
23import sys
24from datetime import datetime, timezone
25from typing import IO, NewType, Union
26from urllib import parse
28from cryptography.hazmat.primitives import serialization
29from cryptography.hazmat.primitives.asymmetric import ec, ed25519, rsa
30from cryptography.x509 import (
31 Certificate,
32 ExtensionNotFound,
33 Version,
34 load_der_x509_certificate,
35)
36from cryptography.x509.oid import ExtendedKeyUsageOID, ExtensionOID
37from sigstore_models.common.v1 import HashAlgorithm, TimeRange
39from sigstore import hashes as sigstore_hashes
40from sigstore.errors import VerificationError
42if sys.version_info < (3, 11):
43 import importlib_resources as resources
44else:
45 from importlib import resources
48PublicKey = Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey, ed25519.Ed25519PublicKey]
50PublicKeyTypes = Union[
51 type[rsa.RSAPublicKey],
52 type[ec.EllipticCurvePublicKey],
53 type[ed25519.Ed25519PublicKey],
54]
56HexStr = NewType("HexStr", str)
57"""
58A newtype for `str` objects that contain hexadecimal strings (e.g. `ffabcd00ff`).
59"""
60B64Str = NewType("B64Str", str)
61"""
62A newtype for `str` objects that contain base64 encoded strings.
63"""
64KeyID = NewType("KeyID", bytes)
65"""
66A newtype for `bytes` objects that contain a key id.
67"""
70def load_pem_public_key(
71 key_pem: bytes,
72 *,
73 types: tuple[PublicKeyTypes, ...] = (
74 rsa.RSAPublicKey,
75 ec.EllipticCurvePublicKey,
76 ed25519.Ed25519PublicKey,
77 ),
78) -> PublicKey:
79 """
80 A specialization of `cryptography`'s `serialization.load_pem_public_key`
81 with a uniform exception type (`VerificationError`) and filtering on valid key types
82 for Sigstore purposes.
83 """
85 try:
86 key = serialization.load_pem_public_key(key_pem)
87 except Exception as exc:
88 raise VerificationError("could not load PEM-formatted public key") from exc
90 if not isinstance(key, types):
91 raise VerificationError(f"invalid key format: not one of {types}")
93 return key # type: ignore[return-value]
96def load_der_public_key(
97 key_der: bytes,
98 *,
99 types: tuple[PublicKeyTypes, ...] = (
100 rsa.RSAPublicKey,
101 ec.EllipticCurvePublicKey,
102 ed25519.Ed25519PublicKey,
103 ),
104) -> PublicKey:
105 """
106 The `load_pem_public_key` specialization, but DER.
107 """
109 try:
110 key = serialization.load_der_public_key(key_der)
111 except Exception as exc:
112 raise VerificationError("could not load DER-formatted public key") from exc
114 if not isinstance(key, types):
115 raise VerificationError(f"invalid key format: not one of {types}")
117 return key # type: ignore[return-value]
120def base64_encode_pem_cert(cert: Certificate) -> B64Str:
121 """
122 Returns a string containing a base64-encoded PEM-encoded X.509 certificate.
123 """
125 return B64Str(
126 base64.b64encode(cert.public_bytes(serialization.Encoding.PEM)).decode()
127 )
130def cert_der_to_pem(der: bytes) -> str:
131 """
132 Converts a DER-encoded X.509 certificate into its PEM encoding.
134 Returns a string containing a PEM-encoded X.509 certificate.
135 """
137 # NOTE: Technically we don't have to round-trip like this, since
138 # the DER-to-PEM transformation is entirely mechanical.
139 cert = load_der_x509_certificate(der)
140 return cert.public_bytes(serialization.Encoding.PEM).decode()
143def key_id(key: PublicKey) -> KeyID:
144 """
145 Returns an RFC 6962-style "key ID" for the given public key.
147 See: <https://www.rfc-editor.org/rfc/rfc6962#section-3.2>
148 """
149 public_bytes = key.public_bytes(
150 encoding=serialization.Encoding.DER,
151 format=serialization.PublicFormat.SubjectPublicKeyInfo,
152 )
154 return KeyID(hashlib.sha256(public_bytes).digest())
157def sha256_digest(
158 input_: bytes | IO[bytes] | sigstore_hashes.Hashed,
159) -> sigstore_hashes.Hashed:
160 """
161 Compute the SHA256 digest of an input stream or buffer or,
162 if given a `Hashed`, return it directly.
163 """
164 if isinstance(input_, sigstore_hashes.Hashed):
165 return input_
167 # If the input is already buffered into memory, there's no point in
168 # going back through an I/O abstraction.
169 if isinstance(input_, bytes):
170 return sigstore_hashes.Hashed(
171 digest=hashlib.sha256(input_).digest(), algorithm=HashAlgorithm.SHA2_256
172 )
174 return sigstore_hashes.Hashed(
175 digest=_sha256_streaming(input_), algorithm=HashAlgorithm.SHA2_256
176 )
179def _sha256_streaming(io: IO[bytes]) -> bytes:
180 """
181 Compute the SHA256 of a stream.
183 This function does its own internal buffering, so an unbuffered stream
184 should be supplied for optimal performance.
185 """
187 # NOTE: This function performs a SHA256 digest over a stream.
188 # The stream's size is not checked, meaning that the stream's source
189 # is implicitly trusted: if an attacker is able to truncate the stream's
190 # source prematurely, then they could conceivably produce a digest
191 # for a partial stream. This in turn could conceivably result
192 # in a valid signature for an unintended (truncated) input.
193 #
194 # This is currently outside of sigstore-python's threat model: we
195 # assume that the stream is trusted.
196 #
197 # See: https://github.com/sigstore/sigstore-python/pull/329#discussion_r1041215972
199 sha256 = hashlib.sha256()
200 # Per coreutils' ioblksize.h: 128KB performs optimally across a range
201 # of systems in terms of minimizing syscall overhead.
202 view = memoryview(bytearray(128 * 1024))
204 nbytes = io.readinto(view) # type: ignore[attr-defined]
205 while nbytes:
206 sha256.update(view[:nbytes])
207 nbytes = io.readinto(view) # type: ignore[attr-defined]
209 return sha256.digest()
212def read_embedded(name: str, url: str) -> bytes:
213 """
214 Read a resource for a given TUF repository embedded in this distribution
215 of sigstore-python, returning its contents as bytes.
216 """
217 embed_dir = parse.quote(url, safe="")
218 b: bytes = resources.files("sigstore._store").joinpath(embed_dir, name).read_bytes()
219 return b
222def cert_is_ca(cert: Certificate) -> bool:
223 """
224 Returns `True` if and only if the given `Certificate`
225 is a CA certificate.
227 This function doesn't indicate the trustworthiness of the given
228 `Certificate`, only whether it has the appropriate interior state.
230 This function is **not** naively invertible: users **must** use the
231 dedicated `cert_is_leaf` utility function to determine whether a particular
232 leaf upholds Sigstore's invariants.
233 """
235 # Only v3 certificates should appear in the context of Sigstore;
236 # earlier versions of X.509 lack extensions and have ambiguous CA
237 # behavior.
238 if cert.version != Version.v3:
239 raise VerificationError(f"invalid X.509 version: {cert.version}")
241 # Valid CA certificates must have the following set:
242 #
243 # * `BasicKeyUsage.keyCertSign`
244 # * `BasicConstraints.ca`
245 #
246 # Any other combination of states is inconsistent and invalid, meaning
247 # that we won't consider the certificate a valid non-CA leaf.
249 try:
250 basic_constraints = cert.extensions.get_extension_for_oid(
251 ExtensionOID.BASIC_CONSTRAINTS
252 )
254 # BasicConstraints must be marked as critical, per RFC 5280 4.2.1.9.
255 if not basic_constraints.critical:
256 raise VerificationError(
257 "invalid X.509 certificate: non-critical BasicConstraints in CA"
258 )
260 ca = basic_constraints.value.ca # type: ignore[attr-defined]
261 except ExtensionNotFound:
262 # No BasicConstrains means that this can't possibly be a CA.
263 return False
265 key_cert_sign = False
266 try:
267 key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
268 key_cert_sign = key_usage.value.key_cert_sign # type: ignore[attr-defined]
269 except ExtensionNotFound:
270 raise VerificationError("invalid X.509 certificate: missing KeyUsage")
272 # If both states are set, this is a CA.
273 if ca and key_cert_sign:
274 return True
276 if not (ca or key_cert_sign):
277 return False
279 # Anything else is an invalid state that should never occur.
280 raise VerificationError(
281 f"invalid X.509 certificate states: KeyUsage.keyCertSign={key_cert_sign}"
282 f", BasicConstraints.ca={ca}"
283 )
286def cert_is_root_ca(cert: Certificate) -> bool:
287 """
288 Returns `True` if and only if the given `Certificate` indicates
289 that it's a root CA.
291 This is **not** a verification function, and it does not establish
292 the trustworthiness of the given certificate.
293 """
295 # NOTE(ww): This function is obnoxiously long to make the different
296 # states explicit.
298 # Only v3 certificates should appear in the context of Sigstore;
299 # earlier versions of X.509 lack extensions and have ambiguous CA
300 # behavior.
301 if cert.version != Version.v3:
302 raise VerificationError(f"invalid X.509 version: {cert.version}")
304 # Non-CAs can't possibly be root CAs.
305 if not cert_is_ca(cert):
306 return False
308 # A certificate that is its own issuer and signer is considered a root CA.
309 try:
310 cert.verify_directly_issued_by(cert)
311 return True
312 except Exception:
313 return False
316def cert_is_leaf(cert: Certificate) -> bool:
317 """
318 Returns `True` if and only if the given `Certificate` is a valid
319 leaf certificate for Sigstore purposes. This means that:
321 * It is not a root or intermediate CA;
322 * It has `KeyUsage.digitalSignature`;
323 * It has `CODE_SIGNING` as an `ExtendedKeyUsage`.
325 This is **not** a verification function, and it does not establish
326 the trustworthiness of the given certificate.
327 """
329 # Only v3 certificates should appear in the context of Sigstore;
330 # earlier versions of X.509 lack extensions and have ambiguous CA
331 # behavior.
332 if cert.version != Version.v3:
333 raise VerificationError(f"invalid X.509 version: {cert.version}")
335 # CAs are not leaves.
336 if cert_is_ca(cert):
337 return False
339 key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
340 digital_signature = key_usage.value.digital_signature # type: ignore[attr-defined]
342 if not digital_signature:
343 raise VerificationError(
344 "invalid certificate for Sigstore purposes: missing digital signature usage"
345 )
347 # Finally, we check to make sure the leaf has an `ExtendedKeyUsages`
348 # extension that includes a codesigning entitlement. Sigstore should
349 # never issue a leaf that doesn't have this extended usage.
350 try:
351 extended_key_usage = cert.extensions.get_extension_for_oid(
352 ExtensionOID.EXTENDED_KEY_USAGE
353 )
355 return ExtendedKeyUsageOID.CODE_SIGNING in extended_key_usage.value # type: ignore[operator]
356 except ExtensionNotFound:
357 raise VerificationError("invalid X.509 certificate: missing ExtendedKeyUsage")
360def is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
361 """
362 Given a `period`, checks that the the current time is not before `start`. If
363 `allow_expired` is `False`, also checks that the current time is not after
364 `end`.
365 """
366 now = datetime.now(timezone.utc)
368 # If there was no validity period specified, the key is always valid.
369 if not period:
370 return True
372 # Active: if the current time is before the starting period, we are not yet
373 # valid.
374 if now < period.start:
375 return False
377 # If we want Expired keys, the key is valid at this point. Otherwise, check
378 # that we are within range.
379 return allow_expired or (period.end is None or now <= period.end)