Coverage for /pythoncovmergedfiles/medio/medio/src/model-transparency/tests/fuzzing/fuzz_simple_sigstore.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

76 statements  

###### Coverage stub
import atexit
import coverage

# Begin measuring as soon as this module is loaded. cover_pylib=True asks
# coverage.py to measure standard-library code as well.
cov = coverage.coverage(cover_pylib=True, data_file=".coverage")
cov.start()


def exit_handler():
    """Stop coverage measurement and save the collected data."""
    cov.stop()
    cov.save()


# Register an exit handler that stops and saves coverage at interpreter exit
atexit.register(exit_handler)
####### End of coverage stub

12# Copyright 2025 The Sigstore Authors 

13# 

14# Licensed under the Apache License, Version 2.0 (the "License"); 

15# you may not use this file except in compliance with the License. 

16# You may obtain a copy of the License at 

17# 

18# http://www.apache.org/licenses/LICENSE-2.0 

19# 

20# Unless required by applicable law or agreed to in writing, software 

21# distributed under the License is distributed on an "AS IS" BASIS, 

22# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

23# See the License for the specific language governing permissions and 

24# limitations under the License. 

25 

26import importlib 

27import os 

28from pathlib import Path 

29import sys 

30import tempfile 

31 

32# type: ignore 

33import atheris 

34from sigstore.models import TrustedRoot 

35from utils import any_files 

36from utils import create_fuzz_files 

37 

38from model_signing import signing 

39from model_signing import verifying 

40 

41 

42def _patch_sigstore_get_dirs(metadata_dir: Path, artifacts_dir: Path) -> None: 

43 """Overwrite sigstore._internal.tuf._get_dirs(url: str). 

44 

45 This allows us to return directories that the fuzzer controls. 

46 """ 

47 tuf_mod = importlib.import_module("sigstore._internal.tuf") 

48 

49 def _stub_get_dirs(url: str): 

50 return metadata_dir, artifacts_dir 

51 

52 tuf_mod._get_dirs = _stub_get_dirs 

53 

54 

55def _patch_trust_updater_offline_default_true() -> None: 

56 """Make TrustUpdater.__init__ offline by default. 

57 

58 This avoids network calls at runtime which is important 

59 for when the fuzzer runs on OSS-Fuzz. 

60 """ 

61 tuf_mod = importlib.import_module("sigstore._internal.tuf") 

62 trust_updater = tuf_mod.TrustUpdater 

63 _orig_init = trust_updater.__init__ 

64 

65 def _patched_init(self, url: str, offline: bool = True) -> None: 

66 _orig_init(self, url, offline=True) 

67 

68 trust_updater.__init__ = _patched_init 

69 

70 

def TestOneInput(data: bytes) -> None:
    """Run one fuzz iteration: sign a fuzzed model directory, then verify it.

    The input bytes are consumed, in order, as: a trusted root document, the
    model files, a signing config, the expected identity, the expected OIDC
    issuer and an identity token. The iteration returns early when the
    trusted root cannot be loaded, when no model files were created, or when
    signing produced no signature file. Exceptions raised by signing or
    verification are not caught here.

    Args:
        data: Raw bytes supplied by the fuzzing engine.
    """
    fdp = atheris.FuzzedDataProvider(data)

    # When the fuzzer creates a signer further down,
    # Sigstore will use a trusted root that the fuzzer
    # has created. It is possible for the fuzzer to create
    # an invalid trusted root, so it creates and tests it
    # here - very early in the whole iteration - to return
    # if it is invalid. If it is valid, it will use it later.
    root_sz = fdp.ConsumeIntInRange(0, 16 * 1024)  # up to 16KB
    trusted_root_bytes = fdp.ConsumeBytes(root_sz)

    tmp_tr_path: Path
    with tempfile.NamedTemporaryFile(
        prefix="trusted_root_", suffix=".json", delete=False
    ) as tmp_tr:
        tmp_tr_path = Path(tmp_tr.name)
        tmp_tr.write(trusted_root_bytes)

    try:
        # Early validation to catch bad JSON
        TrustedRoot.from_file(str(tmp_tr_path))
    except Exception:
        # Bad or unsupported JSON: return and retry
        return
    finally:
        # The file exists only for this validation (the bytes are written
        # again into the artifacts dir below), so remove it on every path.
        # It was created with delete=False and would otherwise be left
        # behind after each iteration that validates successfully.
        os.unlink(tmp_tr_path)

    # Temp dirs for sigstore TUF (metadata/artifacts) + model + signature
    with (
        tempfile.TemporaryDirectory(prefix="tuf-metadata-") as md_tmp,
        tempfile.TemporaryDirectory(prefix="tuf-artifacts-") as art_tmp,
        tempfile.TemporaryDirectory(prefix="mt_file_fuzz_") as tmpdir,
        tempfile.TemporaryDirectory(prefix="mt_sig_fuzz_") as sigdir,
    ):
        # Create the model root
        root = Path(tmpdir)

        # 1) Populate model dir with randomized files and exit early if empty
        create_fuzz_files(root, fdp)
        if not any_files(root):
            return

        metadata_dir = Path(md_tmp)
        artifacts_dir = Path(art_tmp)

        # 2) Create the hooks into sigstore python
        _patch_sigstore_get_dirs(metadata_dir, artifacts_dir)
        _patch_trust_updater_offline_default_true()

        # 3) Write the (already validated) trusted_root.json into artifacts dir
        trusted_root_path = artifacts_dir / "trusted_root.json"
        trusted_root_path.write_bytes(trusted_root_bytes)

        # 4) Fuzz/write signing_config.v0.2.json
        signing_config_path = artifacts_dir / "signing_config.v0.2.json"
        cfg_sz = fdp.ConsumeIntInRange(0, 16 * 1024)  # up to 16KB
        signing_config_path.write_bytes(fdp.ConsumeBytes(cfg_sz))

        # 5) Prepare signature path
        signature_path = Path(sigdir) / "model.signature"

        # 6) Sign
        # Each fuzzed string falls back to a fixed default when the decoded
        # bytes are empty.
        expected_identity = (
            fdp.ConsumeBytes(32).decode("utf-8", errors="ignore")
            or "default-identity"
        )
        expected_oidc_issuer = (
            fdp.ConsumeBytes(32).decode("utf-8", errors="ignore")
            or "https://example.com/"
        )
        sigstore_oidc_beacon_token = (
            fdp.ConsumeBytes(64).decode("utf-8", errors="ignore") or "token"
        )

        sc = signing.Config()
        sc.use_sigstore_signer(
            use_staging=True, identity_token=sigstore_oidc_beacon_token
        )
        sc.sign(root, signature_path)

        if not signature_path.exists():
            return

        # 7) Verify
        vc = verifying.Config()
        vc.use_sigstore_verifier(
            identity=expected_identity,
            oidc_issuer=expected_oidc_issuer,
            use_staging=True,
        )
        vc.verify(root, signature_path)

162 

163 

def main():
    """Instrument loaded code and start fuzzing TestOneInput with Atheris."""
    fuzz_target = TestOneInput
    cli_args = sys.argv

    atheris.instrument_all()
    atheris.Setup(cli_args, fuzz_target)
    atheris.Fuzz()


# Only start fuzzing when executed as a script, not when imported.
if __name__ == "__main__":
    main()