Coverage for /pythoncovmergedfiles/medio/medio/src/model-transparency/tests/fuzzing/fuzz_simple_sigstore.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

80 statements  

###### Coverage stub
import atexit
import coverage

# Begin collecting coverage data as soon as this module is loaded.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops collection and saves the coverage data.
@atexit.register
def exit_handler():
    """Stop the module-level coverage collector and save its data."""
    cov.stop()
    cov.save()
####### End of coverage stub

12# Copyright 2025 The Sigstore Authors 

13# 

14# Licensed under the Apache License, Version 2.0 (the "License"); 

15# you may not use this file except in compliance with the License. 

16# You may obtain a copy of the License at 

17# 

18# http://www.apache.org/licenses/LICENSE-2.0 

19# 

20# Unless required by applicable law or agreed to in writing, software 

21# distributed under the License is distributed on an "AS IS" BASIS, 

22# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

23# See the License for the specific language governing permissions and 

24# limitations under the License. 

25 

26import importlib 

27import os 

28from pathlib import Path 

29import sys 

30import tempfile 

31 

32# type: ignore 

33import atheris 

34from sigstore.models import TrustedRoot 

35from utils import _build_hashing_config_from_fdp 

36from utils import any_files 

37from utils import create_fuzz_files 

38 

39from model_signing import signing 

40from model_signing import verifying 

41 

42 

def _patch_sigstore_get_dirs(metadata_dir: Path, artifacts_dir: Path) -> None:
    """Replace sigstore._internal.tuf._get_dirs with a fixed-result stub.

    After this call, `_get_dirs(url)` ignores its argument and returns the
    two directories given here, so the fuzzer decides where they point.

    Args:
        metadata_dir: Directory returned as the first element.
        artifacts_dir: Directory returned as the second element.
    """
    fuzz_dirs = (metadata_dir, artifacts_dir)

    def _stub_get_dirs(url: str):
        # The URL is deliberately ignored: every lookup gets the fuzz dirs.
        return fuzz_dirs

    setattr(
        importlib.import_module("sigstore._internal.tuf"),
        "_get_dirs",
        _stub_get_dirs,
    )

54 

55 

def _patch_trust_updater_offline_default_true() -> None:
    """Make TrustUpdater.__init__ always run in offline mode.

    This avoids network calls at runtime which is important
    for when the fuzzer runs on OSS-Fuzz.

    The patch is idempotent. It is invoked from every fuzz iteration, and
    re-wrapping an already patched ``__init__`` each time would build an
    ever-deeper chain of wrappers that eventually raises RecursionError when
    a TrustUpdater is constructed.
    """
    tuf_mod = importlib.import_module("sigstore._internal.tuf")
    trust_updater = tuf_mod.TrustUpdater
    _orig_init = trust_updater.__init__

    # Already patched by an earlier call: nothing left to do.
    if getattr(_orig_init, "_fuzz_offline_patched", False):
        return

    def _patched_init(self, url: str, offline: bool = True) -> None:
        # The caller's `offline` value is ignored on purpose; the original
        # initializer is always invoked with offline=True.
        _orig_init(self, url, offline=True)

    # Marker checked above so that repeated calls do not stack wrappers.
    _patched_init._fuzz_offline_patched = True
    trust_updater.__init__ = _patched_init

70 

71 

def TestOneInput(data: bytes) -> None:
    """Run one fuzz iteration: sign a fuzzer-built model, then verify it.

    The input bytes are consumed, in order, for: a candidate trusted root,
    the model files, the signing config, the expected identity, the expected
    OIDC issuer, the identity token and the hashing configuration.

    The iteration returns early when the trusted root fails validation, when
    no model files were created, or when signing produced no signature file.

    Args:
        data: Raw input supplied by the fuzzing engine.
    """
    fdp = atheris.FuzzedDataProvider(data)

    # When the fuzzer creates a signer further down,
    # Sigstore will use a trusted root that the fuzzer
    # has created. It is possible for the fuzzer to create
    # an invalid trusted root, so it creates and tests it
    # here - very early in the whole iteration - to return
    # if it is invalid. If it is valid, it will use it later.
    root_sz = fdp.ConsumeIntInRange(0, 16 * 1024)  # up to 16KB
    trusted_root_bytes = fdp.ConsumeBytes(root_sz)

    # delete=False so the file can be reopened by path once it is closed.
    with tempfile.NamedTemporaryFile(
        prefix="trusted_root_", suffix=".json", delete=False
    ) as tmp_tr:
        tmp_tr_path = Path(tmp_tr.name)
        tmp_tr.write(trusted_root_bytes)

    try:
        # Early validation to catch bad JSON
        TrustedRoot.from_file(str(tmp_tr_path))
    except Exception:
        # Bad or unsupported JSON: return and retry
        return
    finally:
        # The scratch file is only needed for validation; the same bytes are
        # written into the artifacts dir below. Remove it on every path so
        # valid inputs do not leave one file behind per iteration.
        os.unlink(tmp_tr_path)

    # Temp dirs for sigstore TUF (metadata/artifacts) + model + signature
    with (
        tempfile.TemporaryDirectory(prefix="tuf-metadata-") as md_tmp,
        tempfile.TemporaryDirectory(prefix="tuf-artifacts-") as art_tmp,
        tempfile.TemporaryDirectory(prefix="mt_file_fuzz_") as tmpdir,
        tempfile.TemporaryDirectory(prefix="mt_sig_fuzz_") as sigdir,
    ):
        # Create the model root
        root = Path(tmpdir)

        # 1) Populate model dir with randomized files and exit early if empty
        create_fuzz_files(root, fdp)
        if not any_files(root):
            return

        metadata_dir = Path(md_tmp)
        artifacts_dir = Path(art_tmp)

        # 2) Create the hooks into sigstore python
        _patch_sigstore_get_dirs(metadata_dir, artifacts_dir)
        _patch_trust_updater_offline_default_true()

        # 3) Write the (already validated) trusted_root.json into artifacts dir
        trusted_root_path = artifacts_dir / "trusted_root.json"
        trusted_root_path.write_bytes(trusted_root_bytes)

        # 4) Fuzz/write signing_config.v0.2.json
        signing_config_path = artifacts_dir / "signing_config.v0.2.json"
        cfg_sz = fdp.ConsumeIntInRange(0, 16 * 1024)  # up to 16KB
        signing_config_path.write_bytes(fdp.ConsumeBytes(cfg_sz))

        # 5) Prepare signature path
        signature_path = Path(sigdir) / "model.signature"

        # 6) Sign
        # Undecodable bytes are dropped; an empty result falls back to a
        # fixed default so the values are never empty strings.
        expected_identity = (
            fdp.ConsumeBytes(32).decode("utf-8", errors="ignore")
            or "default-identity"
        )
        expected_oidc_issuer = (
            fdp.ConsumeBytes(32).decode("utf-8", errors="ignore")
            or "https://example.com/"
        )
        sigstore_oidc_beacon_token = (
            fdp.ConsumeBytes(64).decode("utf-8", errors="ignore") or "token"
        )

        hcfg = _build_hashing_config_from_fdp(fdp)

        sc = signing.Config()
        sc.set_hashing_config(hcfg)
        sc.use_sigstore_signer(
            use_staging=True, identity_token=sigstore_oidc_beacon_token
        )
        sc.sign(root, signature_path)

        # Nothing to verify if signing did not produce a signature file.
        if not signature_path.exists():
            return

        # 7) Verify
        vc = verifying.Config()
        vc.set_hashing_config(hcfg)
        vc.use_sigstore_verifier(
            identity=expected_identity,
            oidc_issuer=expected_oidc_issuer,
            use_staging=True,
        )
        vc.verify(root, signature_path)

167 

168 

def main():
    """Instrument all code, register TestOneInput and start fuzzing."""
    # Instrumentation comes first, before the harness is registered.
    atheris.instrument_all()

    # Hand the command-line arguments and the per-input callback to atheris.
    harness_args = sys.argv
    atheris.Setup(harness_args, TestOneInput)

    atheris.Fuzz()


# Only start fuzzing when executed as a script, not when imported.
if __name__ == "__main__":
    main()