Coverage for /pythoncovmergedfiles/medio/medio/src/model-transparency/tests/fuzzing/fuzz_with_cert_chain.py: 71%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1###### Coverage stub
2import atexit
3import coverage
4cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
5cov.start()
6# Register an exist handler that will print coverage
7def exit_handler():
8 cov.stop()
9 cov.save()
10atexit.register(exit_handler)
11####### End of coverage stub
12# Copyright 2025 The Sigstore Authors
13#
14# Licensed under the Apache License, Version 2.0 (the "License");
15# you may not use this file except in compliance with the License.
16# You may obtain a copy of the License at
17#
18# http://www.apache.org/licenses/LICENSE-2.0
19#
20# Unless required by applicable law or agreed to in writing, software
21# distributed under the License is distributed on an "AS IS" BASIS,
22# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23# See the License for the specific language governing permissions and
24# limitations under the License.
26import datetime as dt
27from pathlib import Path
28import shutil
29import sys
30import tempfile
32import atheris # type: ignore
33from cryptography import x509
34from cryptography.exceptions import UnsupportedAlgorithm
35from cryptography.hazmat.primitives import hashes
36from cryptography.hazmat.primitives import serialization
37from cryptography.hazmat.primitives.asymmetric import ec
38from cryptography.hazmat.primitives.asymmetric import rsa
39from cryptography.x509.oid import ExtendedKeyUsageOID
40from cryptography.x509.oid import NameOID
41from utils import create_fuzz_files
43from model_signing import hashing
44from model_signing import signing
45from model_signing import verifying
48def _rand_utf8(
49 fdp: atheris.FuzzedDataProvider, min_len: int = 1, max_len: int = 32
50) -> str:
51 n = fdp.ConsumeIntInRange(min_len, max_len)
52 data = fdp.ConsumeBytes(n)
53 if not data:
54 return "x"
55 s = "".join(chr(32 + (c % 95)) for c in data).strip()
56 return s or "x"
59def gen_private_key(fdp: atheris.FuzzedDataProvider):
60 """Generate RSA or EC private key using fuzz data (for CAs)."""
61 if fdp.ConsumeBool():
62 curve = fdp.PickValueInList(
63 [ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()]
64 )
65 return ec.generate_private_key(curve)
66 key_size = fdp.PickValueInList([1024, 2048])
67 return rsa.generate_private_key(public_exponent=65537, key_size=key_size)
70def gen_ec_key(fdp: atheris.FuzzedDataProvider):
71 """Generate an EC private key (for the leaf)."""
72 curve = fdp.PickValueInList(
73 [ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()]
74 )
75 return ec.generate_private_key(curve)
78def gen_name(fdp: atheris.FuzzedDataProvider) -> x509.Name:
79 attrs = [x509.NameAttribute(NameOID.COMMON_NAME, _rand_utf8(fdp, 3, 40))]
80 if fdp.ConsumeBool():
81 attrs.append(
82 x509.NameAttribute(
83 NameOID.ORGANIZATION_NAME, _rand_utf8(fdp, 2, 20)
84 )
85 )
86 if fdp.ConsumeBool():
87 attrs.append(
88 x509.NameAttribute(
89 NameOID.ORGANIZATIONAL_UNIT_NAME, _rand_utf8(fdp, 2, 20)
90 )
91 )
92 if fdp.ConsumeBool():
93 cb = fdp.ConsumeBytes(2) or b"US"
94 country = "".join(chr(ord("A") + (b % 26)) for b in cb)
95 attrs.append(x509.NameAttribute(NameOID.COUNTRY_NAME, country))
96 if fdp.ConsumeBool():
97 attrs.append(
98 x509.NameAttribute(NameOID.LOCALITY_NAME, _rand_utf8(fdp, 2, 20))
99 )
100 return x509.Name(attrs)
103def _ski(public_key) -> x509.SubjectKeyIdentifier:
104 return x509.SubjectKeyIdentifier.from_public_key(public_key)
107def deterministic_serial(fdp: atheris.FuzzedDataProvider) -> int:
108 """Deterministic, positive serial (≤159 bits, non-zero) from input."""
109 length = fdp.ConsumeIntInRange(1, 20)
110 b = fdp.ConsumeBytes(length)
111 if not b:
112 b = b"\x01"
113 val = int.from_bytes(b, "big") & ((1 << 159) - 1)
114 return val or 1
117def deterministic_validity(
118 fdp: atheris.FuzzedDataProvider,
119) -> tuple[dt.datetime, dt.datetime]:
120 """Validity window derived solely from fuzz input (no wall clock)."""
121 base = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc)
122 start_days = fdp.ConsumeIntInRange(0, 9000)
123 not_before = base + dt.timedelta(days=start_days)
124 lifetime_days = fdp.ConsumeIntInRange(30, 3650)
125 not_after = not_before + dt.timedelta(days=lifetime_days)
126 return not_before, not_after
129def _pick_sig_hash(fdp: atheris.FuzzedDataProvider):
130 return fdp.PickValueInList(
131 [hashes.SHA256(), hashes.SHA384(), hashes.SHA512()]
132 )
135def build_valid_chain(
136 fdp: atheris.FuzzedDataProvider,
137) -> tuple[x509.Certificate, object, list[x509.Certificate]]:
138 """Build a valid chain: root -> 0..3 intermediates -> leaf (depth 1..5).
140 Returns (leaf_cert, leaf_key, issuers_chain) where issuers_chain is
141 [nearest_intermediate, ..., root] and does NOT include the leaf.
142 """
143 depth = fdp.ConsumeIntInRange(1, 5)
144 not_before, not_after = deterministic_validity(fdp)
146 # Root CA
147 root_key = gen_private_key(fdp)
148 root_name = gen_name(fdp)
149 root_builder = (
150 x509.CertificateBuilder()
151 .subject_name(root_name)
152 .issuer_name(root_name)
153 .public_key(root_key.public_key())
154 .serial_number(deterministic_serial(fdp))
155 .not_valid_before(not_before)
156 .not_valid_after(not_after)
157 .add_extension(
158 x509.BasicConstraints(ca=True, path_length=depth - 1), critical=True
159 )
160 .add_extension(
161 x509.KeyUsage(
162 digital_signature=False,
163 content_commitment=False,
164 key_encipherment=False,
165 data_encipherment=False,
166 key_agreement=False,
167 key_cert_sign=True,
168 crl_sign=True,
169 encipher_only=False,
170 decipher_only=False,
171 ),
172 critical=True,
173 )
174 .add_extension(_ski(root_key.public_key()), critical=False)
175 )
176 root_cert = root_builder.sign(
177 private_key=root_key, algorithm=_pick_sig_hash(fdp)
178 )
180 issuer_key = root_key
181 issuer_cert = root_cert
182 issuers: list[x509.Certificate] = [root_cert]
184 # Intermediates
185 for i in range(depth - 1):
186 key = gen_private_key(fdp)
187 name = gen_name(fdp)
188 inter_builder = (
189 x509.CertificateBuilder()
190 .subject_name(name)
191 .issuer_name(issuer_cert.subject)
192 .public_key(key.public_key())
193 .serial_number(deterministic_serial(fdp))
194 .not_valid_before(not_before)
195 .not_valid_after(not_after)
196 .add_extension(
197 x509.BasicConstraints(ca=True, path_length=(depth - 2 - i)),
198 critical=True,
199 )
200 .add_extension(
201 x509.KeyUsage(
202 digital_signature=False,
203 content_commitment=False,
204 key_encipherment=False,
205 data_encipherment=False,
206 key_agreement=False,
207 key_cert_sign=True,
208 crl_sign=True,
209 encipher_only=False,
210 decipher_only=False,
211 ),
212 critical=True,
213 )
214 .add_extension(_ski(key.public_key()), critical=False)
215 .add_extension(
216 x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
217 x509.SubjectKeyIdentifier.from_public_key(
218 issuer_key.public_key()
219 )
220 ),
221 critical=False,
222 )
223 )
224 inter_cert = inter_builder.sign(
225 private_key=issuer_key, algorithm=_pick_sig_hash(fdp)
226 )
227 issuer_key = key
228 issuer_cert = inter_cert
229 issuers.insert(0, inter_cert) # nearest first
231 # Leaf (code signing) — ALWAYS EC to satisfy signer expectations
232 leaf_key = gen_ec_key(fdp)
233 leaf_name = gen_name(fdp)
234 leaf_builder = (
235 x509.CertificateBuilder()
236 .subject_name(leaf_name)
237 .issuer_name(issuer_cert.subject)
238 .public_key(leaf_key.public_key())
239 .serial_number(deterministic_serial(fdp))
240 .not_valid_before(not_before)
241 .not_valid_after(not_after)
242 .add_extension(
243 x509.BasicConstraints(ca=False, path_length=None), critical=True
244 )
245 .add_extension(
246 x509.KeyUsage(
247 digital_signature=True,
248 content_commitment=True,
249 key_encipherment=False,
250 data_encipherment=False,
251 key_agreement=False,
252 key_cert_sign=False,
253 crl_sign=False,
254 encipher_only=False,
255 decipher_only=False,
256 ),
257 critical=True,
258 )
259 .add_extension(
260 x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CODE_SIGNING]),
261 critical=False,
262 )
263 .add_extension(_ski(leaf_key.public_key()), critical=False)
264 .add_extension(
265 x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
266 x509.SubjectKeyIdentifier.from_public_key(
267 issuer_key.public_key()
268 )
269 ),
270 critical=False,
271 )
272 )
273 leaf_cert = leaf_builder.sign(
274 private_key=issuer_key, algorithm=_pick_sig_hash(fdp)
275 )
277 return leaf_cert, leaf_key, issuers
280def to_pem_cert(cert: x509.Certificate) -> bytes:
281 return cert.public_bytes(encoding=serialization.Encoding.PEM)
284def key_to_pem(priv: rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey) -> bytes:
285 return priv.private_bytes(
286 encoding=serialization.Encoding.PEM,
287 format=serialization.PrivateFormat.PKCS8,
288 encryption_algorithm=serialization.NoEncryption(),
289 )
292def _build_hashing_config_from_fdp(
293 fdp: atheris.FuzzedDataProvider,
294 extra_ignores: list[Path],
295 signature_path: Path,
296) -> hashing.Config:
297 alg = ["sha256", "blake2", "blake3"][fdp.ConsumeIntInRange(0, 2)]
298 hcfg = hashing.Config().set_ignored_paths(
299 paths=[*list(extra_ignores), signature_path],
300 ignore_git_paths=fdp.ConsumeBool(),
301 )
302 if fdp.ConsumeBool():
303 hcfg.use_file_serialization(hashing_algorithm=alg)
304 else:
305 hcfg.use_shard_serialization(hashing_algorithm=alg)
306 return hcfg
309def TestOneInput(data: bytes):
310 fdp = atheris.FuzzedDataProvider(data)
312 # 1) Build certs & keys (catch x509 construction errors)
313 try:
314 leaf_cert, leaf_key, issuers = build_valid_chain(fdp)
315 except (
316 ValueError,
317 TypeError,
318 x509.DuplicateExtension,
319 x509.UnsupportedGeneralNameType,
320 UnsupportedAlgorithm,
321 ):
322 return # skip this testcase; invalid X.509
324 # 2) Convert to PEM and write to disk (catch serialization errors)
325 workdir = tempfile.mkdtemp(prefix="fuzz_cert_")
326 try:
327 leaf_key_path = Path(workdir) / "leaf-key.pem"
328 leaf_cert_path = Path(workdir) / "leaf-cert.pem"
329 chain_paths: list[Path] = []
331 with open(leaf_key_path, "wb") as f:
332 f.write(key_to_pem(leaf_key))
333 with open(leaf_cert_path, "wb") as f:
334 f.write(to_pem_cert(leaf_cert))
336 for idx, cert in enumerate(issuers):
337 p = Path(workdir) / f"chain-{idx}.pem"
338 with open(p, "wb") as f:
339 f.write(to_pem_cert(cert))
340 chain_paths.append(p)
342 # Check chain length before signing and verifying
343 if len(chain_paths) <= 1:
344 shutil.rmtree(workdir, ignore_errors=True)
345 return
347 except (ValueError, TypeError, UnsupportedAlgorithm):
348 shutil.rmtree(workdir, ignore_errors=True)
349 return
351 # 3) Create model files
352 model_path_dir = tempfile.mkdtemp(prefix="fuzz_model_")
353 model_path_p = Path(model_path_dir)
354 created_files = create_fuzz_files(model_path_p, fdp)
355 if created_files == 0:
356 return
358 # Signature output path (we ignore this when signing and verifying)
359 fname = f"signature-{_rand_utf8(fdp, 3, 12).replace('/', '_')}.sig"
360 signature_path = model_path_p / fname
362 # Ignores (collected for hashing config)
363 extra_ignores: list[Path] = []
365 # Build hashing config (serialization + algorithm + ignores)
366 hcfg = _build_hashing_config_from_fdp(fdp, extra_ignores, signature_path)
368 # 4) Sign and 5) Verify
369 try:
370 signing.Config().use_certificate_signer(
371 private_key=leaf_key_path,
372 signing_certificate=leaf_cert_path,
373 certificate_chain=chain_paths,
374 ).set_hashing_config(hcfg).sign(model_path_p, signature_path)
376 verifying.Config().use_certificate_verifier(
377 certificate_chain=chain_paths, log_fingerprints=False
378 ).set_hashing_config(hcfg).verify(model_path_p, signature_path)
380 finally:
381 # Always clean up temp dirs
382 shutil.rmtree(model_path_dir, ignore_errors=True)
383 shutil.rmtree(workdir, ignore_errors=True)
386def main() -> None:
387 atheris.instrument_all()
388 atheris.Setup(sys.argv, TestOneInput)
389 atheris.Fuzz()
392if __name__ == "__main__":
393 main()