1###### Coverage stub
2import atexit
3import coverage
# Begin measuring as early as possible so everything imported after this stub
# is included in the coverage data.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops measurement and saves the coverage data.
# atexit.register returns the function it is given, so it can be applied as a
# decorator and `exit_handler` stays bound to the handler itself.
@atexit.register
def exit_handler():
    """Stop coverage measurement and save the collected data."""
    cov.stop()
    cov.save()
11####### End of coverage stub
12# Copyright 2025 The Sigstore Authors
13#
14# Licensed under the Apache License, Version 2.0 (the "License");
15# you may not use this file except in compliance with the License.
16# You may obtain a copy of the License at
17#
18# http://www.apache.org/licenses/LICENSE-2.0
19#
20# Unless required by applicable law or agreed to in writing, software
21# distributed under the License is distributed on an "AS IS" BASIS,
22# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23# See the License for the specific language governing permissions and
24# limitations under the License.
25
26import datetime as dt
27from pathlib import Path
28import shutil
29import sys
30import tempfile
31
32import atheris # type: ignore
33from cryptography import x509
34from cryptography.exceptions import UnsupportedAlgorithm
35from cryptography.hazmat.primitives import hashes
36from cryptography.hazmat.primitives import serialization
37from cryptography.hazmat.primitives.asymmetric import ec
38from cryptography.hazmat.primitives.asymmetric import rsa
39from cryptography.x509.oid import ExtendedKeyUsageOID
40from cryptography.x509.oid import NameOID
41from utils import create_fuzz_files
42
43from model_signing import hashing
44from model_signing import signing
45from model_signing import verifying
46
47
def _rand_utf8(
    fdp: atheris.FuzzedDataProvider, min_len: int = 1, max_len: int = 32
) -> str:
    """Return a short, non-empty printable-ASCII string taken from fuzz input.

    A length in ``[min_len, max_len]`` is drawn from ``fdp`` and that many
    bytes are requested. Each byte is folded into the printable range
    0x20..0x7E and surrounding whitespace is stripped. Because of the strip
    (and possible short reads) the result may be shorter than ``min_len``;
    if nothing is left, the placeholder ``"x"`` is returned.
    """
    length = fdp.ConsumeIntInRange(min_len, max_len)
    raw = fdp.ConsumeBytes(length)
    # 95 printable characters starting at the space character (code 32).
    text = bytes(32 + (byte % 95) for byte in raw).decode("ascii").strip()
    return text if text else "x"
57
58
def gen_private_key(fdp: atheris.FuzzedDataProvider):
    """Generate an RSA or EC private key for a CA certificate.

    One fuzzed boolean selects the key family. For EC, the curve is picked
    from P-256/P-384/P-521; for RSA, the modulus size is picked from
    1024/2048 bits with the fixed public exponent 65537.
    """
    wants_ec = fdp.ConsumeBool()
    if not wants_ec:
        modulus_bits = fdp.PickValueInList([1024, 2048])
        return rsa.generate_private_key(
            public_exponent=65537, key_size=modulus_bits
        )

    candidate_curves = [ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()]
    return ec.generate_private_key(fdp.PickValueInList(candidate_curves))
68
69
def gen_ec_key(fdp: atheris.FuzzedDataProvider):
    """Generate an EC private key for the leaf certificate.

    The curve is picked by ``fdp`` from P-256, P-384 and P-521.
    """
    candidate_curves = [ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()]
    chosen_curve = fdp.PickValueInList(candidate_curves)
    return ec.generate_private_key(chosen_curve)
76
77
def gen_name(fdp: atheris.FuzzedDataProvider) -> x509.Name:
    """Build an X.509 distinguished name from fuzz input.

    The name always contains a Common Name. Organization, Organizational
    Unit, Country and Locality are each appended, in that order, only when
    the next fuzzed boolean is true.

    Args:
        fdp: Fuzzed data provider supplying the booleans and attribute bytes.

    Returns:
        The assembled ``x509.Name``.
    """
    attrs = [x509.NameAttribute(NameOID.COMMON_NAME, _rand_utf8(fdp, 3, 40))]
    if fdp.ConsumeBool():
        attrs.append(
            x509.NameAttribute(
                NameOID.ORGANIZATION_NAME, _rand_utf8(fdp, 2, 20)
            )
        )
    if fdp.ConsumeBool():
        attrs.append(
            x509.NameAttribute(
                NameOID.ORGANIZATIONAL_UNIT_NAME, _rand_utf8(fdp, 2, 20)
            )
        )
    if fdp.ConsumeBool():
        cb = fdp.ConsumeBytes(2)
        # A country code must be exactly two characters. When the input is
        # nearly exhausted the provider can hand back fewer than two bytes,
        # so fall back on any short read, not only on an empty one.
        if len(cb) != 2:
            cb = b"US"
        # Map each byte onto an uppercase ASCII letter A..Z.
        country = "".join(chr(ord("A") + (b % 26)) for b in cb)
        attrs.append(x509.NameAttribute(NameOID.COUNTRY_NAME, country))
    if fdp.ConsumeBool():
        attrs.append(
            x509.NameAttribute(NameOID.LOCALITY_NAME, _rand_utf8(fdp, 2, 20))
        )
    return x509.Name(attrs)
101
102
def _ski(public_key) -> x509.SubjectKeyIdentifier:
    """Return the SubjectKeyIdentifier extension value for ``public_key``."""
    make_identifier = x509.SubjectKeyIdentifier.from_public_key
    return make_identifier(public_key)
105
106
def deterministic_serial(fdp: atheris.FuzzedDataProvider) -> int:
    """Return a positive certificate serial derived only from fuzz input.

    Between 1 and 20 bytes are read and interpreted as a big-endian integer,
    which is then masked to at most 159 bits. A zero result (including the
    case where no bytes were available) is replaced by 1 so the serial is
    never zero.
    """
    low_159_bits = (1 << 159) - 1
    raw = fdp.ConsumeBytes(fdp.ConsumeIntInRange(1, 20))
    serial = int.from_bytes(raw, "big") & low_159_bits
    return serial if serial else 1
115
116
def deterministic_validity(
    fdp: atheris.FuzzedDataProvider,
) -> tuple[dt.datetime, dt.datetime]:
    """Return a ``(not_before, not_after)`` window derived from fuzz input.

    The start is 2000-01-01 UTC plus 0..9000 fuzzed days, and the window
    lasts 30..3650 fuzzed days. The wall clock is never consulted, so the
    same input always yields the same window.
    """
    epoch = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc)
    # Draw the start offset first, then the lifetime: the order in which the
    # provider is consumed determines how an input maps to a window.
    start_offset = dt.timedelta(days=fdp.ConsumeIntInRange(0, 9000))
    lifetime = dt.timedelta(days=fdp.ConsumeIntInRange(30, 3650))
    not_before = epoch + start_offset
    return not_before, not_before + lifetime
127
128
def _pick_sig_hash(fdp: atheris.FuzzedDataProvider):
    """Pick the certificate signature hash (SHA-256/384/512) from fuzz input."""
    candidates = [hashes.SHA256(), hashes.SHA384(), hashes.SHA512()]
    return fdp.PickValueInList(candidates)
133
134
def _ca_key_usage() -> x509.KeyUsage:
    """Return the KeyUsage shared by every CA cert: cert and CRL signing only."""
    return x509.KeyUsage(
        digital_signature=False,
        content_commitment=False,
        key_encipherment=False,
        data_encipherment=False,
        key_agreement=False,
        key_cert_sign=True,
        crl_sign=True,
        encipher_only=False,
        decipher_only=False,
    )


def _aki_for_issuer(issuer_key) -> x509.AuthorityKeyIdentifier:
    """Return the AuthorityKeyIdentifier derived from the issuer's public key."""
    return x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
        x509.SubjectKeyIdentifier.from_public_key(issuer_key.public_key())
    )


def _start_cert_builder(
    fdp: atheris.FuzzedDataProvider,
    subject: x509.Name,
    issuer: x509.Name,
    public_key,
    not_before: dt.datetime,
    not_after: dt.datetime,
) -> x509.CertificateBuilder:
    """Return a builder with names, key, fuzzed serial and validity set."""
    return (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(public_key)
        .serial_number(deterministic_serial(fdp))
        .not_valid_before(not_before)
        .not_valid_after(not_after)
    )


def build_valid_chain(
    fdp: atheris.FuzzedDataProvider,
) -> tuple[x509.Certificate, object, list[x509.Certificate]]:
    """Build a chain: root -> 0..4 intermediates -> leaf.

    ``depth`` (1..5) is the number of CA certificates: one self-signed root
    plus ``depth - 1`` intermediates. Every certificate shares one fuzzed
    validity window. Each CA's ``path_length`` equals the number of
    intermediate CAs below it.

    Fuzz data is consumed in a fixed order: depth, validity, then for each
    certificate its key, name, serial and signature hash.

    Args:
        fdp: Fuzzed data provider driving every random choice.

    Returns:
        ``(leaf_cert, leaf_key, issuers_chain)`` where ``issuers_chain`` is
        ``[nearest_intermediate, ..., root]`` and does NOT include the leaf.
    """
    depth = fdp.ConsumeIntInRange(1, 5)
    not_before, not_after = deterministic_validity(fdp)

    # Root CA (self-signed: subject == issuer, signed with its own key)
    root_key = gen_private_key(fdp)
    root_name = gen_name(fdp)
    root_cert = (
        _start_cert_builder(
            fdp,
            root_name,
            root_name,
            root_key.public_key(),
            not_before,
            not_after,
        )
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=depth - 1), critical=True
        )
        .add_extension(_ca_key_usage(), critical=True)
        .add_extension(_ski(root_key.public_key()), critical=False)
        # The hash is picked last, after the serial drawn by the builder.
        .sign(private_key=root_key, algorithm=_pick_sig_hash(fdp))
    )

    issuer_key = root_key
    issuer_cert = root_cert
    issuers: list[x509.Certificate] = [root_cert]

    # Intermediates, each signed by the previously created CA
    for i in range(depth - 1):
        key = gen_private_key(fdp)
        name = gen_name(fdp)
        inter_cert = (
            _start_cert_builder(
                fdp,
                name,
                issuer_cert.subject,
                key.public_key(),
                not_before,
                not_after,
            )
            .add_extension(
                # Intermediate i still has (depth - 2 - i) CAs below it.
                x509.BasicConstraints(ca=True, path_length=(depth - 2 - i)),
                critical=True,
            )
            .add_extension(_ca_key_usage(), critical=True)
            .add_extension(_ski(key.public_key()), critical=False)
            .add_extension(_aki_for_issuer(issuer_key), critical=False)
            .sign(private_key=issuer_key, algorithm=_pick_sig_hash(fdp))
        )
        issuer_key = key
        issuer_cert = inter_cert
        issuers.insert(0, inter_cert)  # nearest first

    # Leaf (code signing) — ALWAYS EC to satisfy signer expectations
    leaf_key = gen_ec_key(fdp)
    leaf_name = gen_name(fdp)
    leaf_cert = (
        _start_cert_builder(
            fdp,
            leaf_name,
            issuer_cert.subject,
            leaf_key.public_key(),
            not_before,
            not_after,
        )
        .add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True
        )
        .add_extension(
            x509.KeyUsage(
                digital_signature=True,
                content_commitment=True,
                key_encipherment=False,
                data_encipherment=False,
                key_agreement=False,
                key_cert_sign=False,
                crl_sign=False,
                encipher_only=False,
                decipher_only=False,
            ),
            critical=True,
        )
        .add_extension(
            x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CODE_SIGNING]),
            critical=False,
        )
        .add_extension(_ski(leaf_key.public_key()), critical=False)
        .add_extension(_aki_for_issuer(issuer_key), critical=False)
        .sign(private_key=issuer_key, algorithm=_pick_sig_hash(fdp))
    )

    return leaf_cert, leaf_key, issuers
278
279
def to_pem_cert(cert: x509.Certificate) -> bytes:
    """Serialize ``cert`` to PEM-encoded bytes."""
    pem = serialization.Encoding.PEM
    return cert.public_bytes(encoding=pem)
282
283
def key_to_pem(priv: rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey) -> bytes:
    """Serialize ``priv`` as an unencrypted PKCS#8 private key in PEM form."""
    pem = serialization.Encoding.PEM
    pkcs8 = serialization.PrivateFormat.PKCS8
    unencrypted = serialization.NoEncryption()
    return priv.private_bytes(
        encoding=pem, format=pkcs8, encryption_algorithm=unencrypted
    )
290
291
def TestOneInput(data: bytes):
    """Fuzz entry point: sign a fuzzed model with a fuzzed cert chain, then verify.

    Steps: build a certificate chain from ``data``, write the leaf key, leaf
    certificate and issuer certificates as PEM files into a temp directory,
    create model files in a second temp directory, sign the model with the
    certificate signer and verify the signature against the same chain.

    Inputs whose chain cannot be built or serialized, that have at most one
    issuer certificate, or that produce no model files are skipped. Both
    temporary directories are removed on every exit path, including early
    returns and exceptions raised by the code under test.

    Args:
        data: Raw fuzz input supplied by the fuzzing engine.
    """
    fdp = atheris.FuzzedDataProvider(data)

    # 1) Build certs & keys (catch x509 construction errors)
    try:
        leaf_cert, leaf_key, issuers = build_valid_chain(fdp)
    except (
        ValueError,
        TypeError,
        x509.DuplicateExtension,
        x509.UnsupportedGeneralNameType,
        UnsupportedAlgorithm,
    ):
        return  # skip this testcase; invalid X.509

    workdir = tempfile.mkdtemp(prefix="fuzz_cert_")
    # Created later; tracked here so the single `finally` below can clean up
    # whichever directories exist when we leave the function.
    model_path_dir: str | None = None
    try:
        # 2) Convert to PEM and write to disk (catch serialization errors)
        workdir_p = Path(workdir)
        leaf_key_path = workdir_p / "leaf-key.pem"
        leaf_cert_path = workdir_p / "leaf-cert.pem"
        chain_paths: list[Path] = []
        try:
            with open(leaf_key_path, "wb") as f:
                f.write(key_to_pem(leaf_key))
            with open(leaf_cert_path, "wb") as f:
                f.write(to_pem_cert(leaf_cert))

            for idx, cert in enumerate(issuers):
                p = workdir_p / f"chain-{idx}.pem"
                with open(p, "wb") as f:
                    f.write(to_pem_cert(cert))
                chain_paths.append(p)
        except (ValueError, TypeError, UnsupportedAlgorithm):
            return

        # Check chain length before signing and verifying
        if len(chain_paths) <= 1:
            return

        # 3) Create model files
        model_path_dir = tempfile.mkdtemp(prefix="fuzz_model_")
        model_path_p = Path(model_path_dir)
        created_files = create_fuzz_files(model_path_p, fdp)
        if created_files == 0:
            return

        # Signature output path; it lives inside the model directory, so it is
        # added to the ignored paths for both signing and verifying.
        fname = f"signature-{_rand_utf8(fdp, 3, 12).replace('/', '_')}.sig"
        signature_path = model_path_p / fname

        # Ignores
        ignore_git = fdp.ConsumeBool()
        extra_ignores: list[Path] = []

        # 4) Sign
        signing.Config().use_certificate_signer(
            private_key=leaf_key_path,
            signing_certificate=leaf_cert_path,
            certificate_chain=chain_paths,
        ).set_hashing_config(
            hashing.Config().set_ignored_paths(
                paths=[*extra_ignores, signature_path],
                ignore_git_paths=ignore_git,
            )
        ).sign(model_path_p, signature_path)

        # 5) Verify
        verifying.Config().use_certificate_verifier(
            certificate_chain=chain_paths, log_fingerprints=False
        ).set_hashing_config(
            hashing.Config().set_ignored_paths(
                paths=[*extra_ignores, signature_path],
                ignore_git_paths=ignore_git,
            )
        ).verify(model_path_p, signature_path)

    finally:
        # Always clean up temp dirs
        if model_path_dir is not None:
            shutil.rmtree(model_path_dir, ignore_errors=True)
        shutil.rmtree(workdir, ignore_errors=True)
375
376
def main() -> None:
    """Instrument the loaded code and run the Atheris fuzz loop.

    ``TestOneInput`` is registered as the per-input callback and
    ``sys.argv`` is passed through to Atheris.
    """
    atheris.instrument_all()
    atheris.Setup(sys.argv, TestOneInput)
    atheris.Fuzz()


# Run the fuzzer only when this file is executed as a script.
if __name__ == "__main__":
    main()