Coverage for /pythoncovmergedfiles/medio/medio/src/model-transparency/tests/fuzzing/utils.py: 87%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

85 statements  

1# Copyright 2025 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from contextlib import suppress 

16import os 

17from pathlib import Path 

18 

19# type: ignore 

20import atheris 

21 

22import model_signing 

23 

24 

25_SAFE_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-" 

26 

27 

28def _consume_segment( 

29 fdp: atheris.FuzzedDataProvider, min_len: int = 1, max_len: int = 16 

30) -> str: 

31 """Return a path-safe segment using only _SAFE_CHARS.""" 

32 seg_len = fdp.ConsumeIntInRange(min_len, max_len) 

33 bs = fdp.ConsumeBytes(seg_len) 

34 if not bs: 

35 return "x" 

36 out = [] 

37 for b in bs: 

38 out.append(_SAFE_CHARS[b % len(_SAFE_CHARS)]) 

39 s = "".join(out) 

40 if s in {".", "..", ""}: 

41 s = "x" + s + "x" 

42 return s 

43 

44 

45def random_relpath(fdp: atheris.FuzzedDataProvider) -> Path: 

46 """Generate a relative, nested path (no traversal, no absolute roots).""" 

47 depth = fdp.ConsumeIntInRange(1, 4) 

48 parts = [_consume_segment(fdp) for _ in range(depth)] 

49 if fdp.ConsumeBool(): 

50 ext = _consume_segment(fdp, 1, 6).lower() 

51 parts[-1] = f"{parts[-1]}.{ext}" 

52 rel = Path(parts[0]) 

53 for p in parts[1:]: 

54 rel = rel / p 

55 rel = Path(*[p for p in rel.parts if p not in ("", ".", "..")]) 

56 if rel == Path(): 

57 rel = Path("f") 

58 return rel 

59 

60 

61def is_under(child: Path, parent: Path) -> bool: 

62 """True iff 'child' is inside 'parent' after resolving symlinks.""" 

63 try: 

64 child.resolve().relative_to(parent.resolve()) 

65 return True 

66 except Exception: 

67 return False 

68 

69 

70def safe_write(root: Path, rel: Path, data: bytes) -> bool: 

71 """Write bytes to root/rel only if the resolved path stays under root. 

72 

73 Uses an O_NOFOLLOW open (when supported) to avoid following a final 

74 symlink. Returns True if a regular file exists at the target after the 

75 write attempt. 

76 """ 

77 dest_resolved = (root / rel).resolve() 

78 if not is_under(dest_resolved, root): 

79 return False 

80 

81 try: 

82 dest_resolved.parent.mkdir(parents=True, exist_ok=True) 

83 except Exception: 

84 return False 

85 

86 flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC 

87 nofollow = getattr(os, "O_NOFOLLOW", 0) 

88 

89 try: 

90 if nofollow: 

91 fd = os.open(dest_resolved, flags | nofollow, 0o666) 

92 try: 

93 with os.fdopen(fd, "wb") as f: 

94 f.write(data) 

95 except Exception: 

96 with suppress(Exception): 

97 os.close(fd) 

98 else: 

99 with open(dest_resolved, "wb") as f: 

100 f.write(data) 

101 except Exception: 

102 return False 

103 

104 return dest_resolved.is_file() 

105 

106 

107def create_fuzz_files(root: Path, fdp: atheris.FuzzedDataProvider) -> int: 

108 """Create 0..30 files under root with fuzzed relative paths and contents. 

109 

110 Returns the number of files successfully created. 

111 """ 

112 nfiles = fdp.ConsumeIntInRange(0, 30) 

113 seen: set[Path] = set() 

114 created = 0 

115 for _ in range(nfiles): 

116 rel = random_relpath(fdp) 

117 if rel in seen: 

118 continue 

119 seen.add(rel) 

120 

121 size = fdp.ConsumeIntInRange(0, 64 * 1024) 

122 data = fdp.ConsumeBytes(size) 

123 if safe_write(root, rel, data): 

124 created += 1 

125 return created 

126 

127 

128def any_files(root: Path) -> bool: 

129 """True if there is at least one regular file under root.""" 

130 return any(p.is_file() for p in root.rglob("*")) 

131 

132 

133def _build_hashing_config_from_fdp( 

134 fdp: atheris.FuzzedDataProvider, 

135) -> "model_signing.hashing.Config": 

136 """Randomize serialization strategy and hash algorithm.""" 

137 alg = ["sha256", "blake2", "blake3"][fdp.ConsumeIntInRange(0, 2)] 

138 

139 hcfg = model_signing.hashing.Config() 

140 # Choose serialization mode: file vs shard 

141 if fdp.ConsumeBool(): 

142 # File-based serialization. 

143 hcfg.use_file_serialization(hashing_algorithm=alg) 

144 else: 

145 # Sharded file serialization 

146 hcfg.use_shard_serialization(hashing_algorithm=alg) 

147 return hcfg