1###### Coverage stub
2import atexit
3import coverage
4cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
5cov.start()
6# Register an exist handler that will print coverage
7def exit_handler():
8 cov.stop()
9 cov.save()
10atexit.register(exit_handler)
11####### End of coverage stub
12# Copyright 2025 The Sigstore Authors
13#
14# Licensed under the Apache License, Version 2.0 (the "License");
15# you may not use this file except in compliance with the License.
16# You may obtain a copy of the License at
17#
18# http://www.apache.org/licenses/LICENSE-2.0
19#
20# Unless required by applicable law or agreed to in writing, software
21# distributed under the License is distributed on an "AS IS" BASIS,
22# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23# See the License for the specific language governing permissions and
24# limitations under the License.
25
26import importlib
27import os
28from pathlib import Path
29import sys
30import tempfile
31
32# type: ignore
33import atheris
34from sigstore.models import TrustedRoot
35from utils import any_files
36from utils import create_fuzz_files
37
38from model_signing import signing
39from model_signing import verifying
40
41
42def _patch_sigstore_get_dirs(metadata_dir: Path, artifacts_dir: Path) -> None:
43 """Overwrite sigstore._internal.tuf._get_dirs(url: str).
44
45 This allows us to return directories that the fuzzer controls.
46 """
47 tuf_mod = importlib.import_module("sigstore._internal.tuf")
48
49 def _stub_get_dirs(url: str):
50 return metadata_dir, artifacts_dir
51
52 tuf_mod._get_dirs = _stub_get_dirs
53
54
55def _patch_trust_updater_offline_default_true() -> None:
56 """Make TrustUpdater.__init__ offline by default.
57
58 This avoids network calls at runtime which is important
59 for when the fuzzer runs on OSS-Fuzz.
60 """
61 tuf_mod = importlib.import_module("sigstore._internal.tuf")
62 trust_updater = tuf_mod.TrustUpdater
63 _orig_init = trust_updater.__init__
64
65 def _patched_init(self, url: str, offline: bool = True) -> None:
66 _orig_init(self, url, offline=True)
67
68 trust_updater.__init__ = _patched_init
69
70
71def TestOneInput(data: bytes) -> None:
72 fdp = atheris.FuzzedDataProvider(data)
73
74 # When the fuzzer creates a signer further down,
75 # Sigstore will use a trusted root that the fuzzer
76 # has created. It is possible for the fuzzer to create
77 # an invalid trusted root, so it creates and tests it
78 # here - very early in the whole iteration - to return
79 # if it is invalid. If it is valid, it will use it laeter.
80 root_sz = fdp.ConsumeIntInRange(0, 16 * 1024) # up to 16KB
81 trusted_root_bytes = fdp.ConsumeBytes(root_sz)
82
83 tmp_tr_path: Path
84 with tempfile.NamedTemporaryFile(
85 prefix="trusted_root_", suffix=".json", delete=False
86 ) as tmp_tr:
87 tmp_tr_path = Path(tmp_tr.name)
88 tmp_tr.write(trusted_root_bytes)
89
90 try:
91 # Early validation to catch bad JSON
92 TrustedRoot.from_file(str(tmp_tr_path))
93 except Exception:
94 # Bad or unsupported JSON: return and retry
95 os.unlink(tmp_tr_path)
96 return
97
98 # Temp dirs for sigstore TUF (metadata/artifacts) + model + signature
99 with (
100 tempfile.TemporaryDirectory(prefix="tuf-metadata-") as md_tmp,
101 tempfile.TemporaryDirectory(prefix="tuf-artifacts-") as art_tmp,
102 tempfile.TemporaryDirectory(prefix="mt_file_fuzz_") as tmpdir,
103 tempfile.TemporaryDirectory(prefix="mt_sig_fuzz_") as sigdir,
104 ):
105 # Create the model root
106 root = Path(tmpdir)
107
108 # 1) Populate model dir with randomizd files and exit early if empty
109 create_fuzz_files(root, fdp)
110 if not any_files(root):
111 return
112
113 metadata_dir = Path(md_tmp)
114 artifacts_dir = Path(art_tmp)
115
116 # 2) Create the hooks into sigstore python
117 _patch_sigstore_get_dirs(metadata_dir, artifacts_dir)
118 _patch_trust_updater_offline_default_true()
119
120 # 3) Write the (already validated) trusted_root.json into artifacts dir
121 trusted_root_path = artifacts_dir / "trusted_root.json"
122 trusted_root_path.write_bytes(trusted_root_bytes)
123
124 # 4) Fuzz/write signing_config.v0.2.json
125 signing_config_path = artifacts_dir / "signing_config.v0.2.json"
126 cfg_sz = fdp.ConsumeIntInRange(0, 16 * 1024) # up to 16KB
127 signing_config_path.write_bytes(fdp.ConsumeBytes(cfg_sz))
128
129 # 5) Prepare signature path
130 signature_path = Path(sigdir) / "model.signature"
131
132 # 6) Sign
133 expected_identity = (
134 fdp.ConsumeBytes(32).decode("utf-8", errors="ignore")
135 or "default-identity"
136 )
137 expected_oidc_issuer = (
138 fdp.ConsumeBytes(32).decode("utf-8", errors="ignore")
139 or "https://example.com/"
140 )
141 sigstore_oidc_beacon_token = (
142 fdp.ConsumeBytes(64).decode("utf-8", errors="ignore") or "token"
143 )
144
145 sc = signing.Config()
146 sc.use_sigstore_signer(
147 use_staging=True, identity_token=sigstore_oidc_beacon_token
148 )
149 sc.sign(root, signature_path)
150
151 if not signature_path.exists():
152 return
153
154 # 7) Verify
155 vc = verifying.Config()
156 vc.use_sigstore_verifier(
157 identity=expected_identity,
158 oidc_issuer=expected_oidc_issuer,
159 use_staging=True,
160 )
161 vc.verify(root, signature_path)
162
163
164def main():
165 atheris.instrument_all()
166 atheris.Setup(sys.argv, TestOneInput)
167 atheris.Fuzz()
168
169
170if __name__ == "__main__":
171 main()