Coverage for /pythoncovmergedfiles/medio/medio/src/model-transparency/tests/fuzzing/utils.py: 87%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2025 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from contextlib import suppress
16import os
17from pathlib import Path
19# type: ignore
20import atheris
22import model_signing
# Alphabet used for generated path segments: ASCII letters, digits, "_" and
# "-". It deliberately contains no path separators and no ".", so a segment
# built from it can never be ".", ".." or span directories. The alphabet has
# 64 entries.
_SAFE_CHARS = (
    "abcdefghijklmnopqrstuvwxyz"  # lowercase
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"  # uppercase
    "0123456789"  # digits
    "_-"  # punctuation that is safe in file names
)
def _consume_segment(
    fdp: atheris.FuzzedDataProvider, min_len: int = 1, max_len: int = 16
) -> str:
    """Build a single path component from fuzzer-provided bytes.

    A length is drawn from ``[min_len, max_len]``, that many bytes are
    requested from ``fdp``, and each byte is mapped onto ``_SAFE_CHARS`` by
    taking it modulo the alphabet size.

    Args:
        fdp: Source of fuzzed data.
        min_len: Smallest segment length to request.
        max_len: Largest segment length to request.

    Returns:
        A non-empty string made only of ``_SAFE_CHARS`` characters, or the
        literal ``"x"`` when no bytes were obtained.
    """
    wanted = fdp.ConsumeIntInRange(min_len, max_len)
    raw = fdp.ConsumeBytes(wanted)
    if not raw:
        # Nothing to map (zero length requested, or presumably the provider
        # has run out of data): fall back to a fixed placeholder.
        return "x"

    alphabet_size = len(_SAFE_CHARS)
    segment = "".join(_SAFE_CHARS[byte % alphabet_size] for byte in raw)

    # Defensive guard: never hand back an empty or dot-only component.
    if segment in ("", ".", ".."):
        return f"x{segment}x"
    return segment
def random_relpath(fdp: atheris.FuzzedDataProvider) -> Path:
    """Produce a fuzzed relative path of one to four components.

    Each component comes from ``_consume_segment``. Depending on a fuzzed
    boolean, a lower-cased extension of 1..6 characters is appended to the
    last component. Empty, ``.`` and ``..`` components are filtered out, and
    ``Path("f")`` is returned if nothing is left.

    Args:
        fdp: Source of fuzzed data.

    Returns:
        A relative ``Path`` with no ``.``/``..`` components.
    """
    depth = fdp.ConsumeIntInRange(1, 4)
    segments = []
    for _ in range(depth):
        segments.append(_consume_segment(fdp))

    if fdp.ConsumeBool():
        suffix = _consume_segment(fdp, 1, 6).lower()
        segments[-1] = f"{segments[-1]}.{suffix}"

    joined = Path(*segments)
    # Belt and braces: strip anything that could act as a traversal component.
    kept = [part for part in joined.parts if part not in ("", ".", "..")]
    result = Path(*kept)
    return Path("f") if result == Path() else result
def is_under(child: Path, parent: Path) -> bool:
    """Report whether ``child`` lies within ``parent`` once both are resolved.

    Both paths are passed through ``Path.resolve()`` (which follows symlinks)
    before the containment check. A path counts as being under itself.

    Args:
        child: Path to test.
        parent: Directory that should contain ``child``.

    Returns:
        True if the resolved ``child`` is relative to the resolved ``parent``;
        False otherwise, including when resolving or comparing raises.
    """
    try:
        real_child = child.resolve()
        real_parent = parent.resolve()
        # relative_to() raises when real_child is not below real_parent.
        real_child.relative_to(real_parent)
    except Exception:
        return False
    return True
def safe_write(root: Path, rel: Path, data: bytes) -> bool:
    """Write bytes to root/rel only if the resolved path stays under root.

    Uses an O_NOFOLLOW open (when supported) to avoid following a final
    symlink. Returns True if a regular file exists at the target after the
    write attempt.

    Args:
        root: Directory that must contain the written file.
        rel: Path of the file relative to ``root``.
        data: Bytes to write; an existing file is truncated first.

    Returns:
        False if the resolved target escapes ``root``, if the parent
        directories cannot be created, or if opening the target fails (and,
        on platforms without ``O_NOFOLLOW``, if writing fails). Otherwise
        the result of ``is_file()`` on the resolved target. With
        ``O_NOFOLLOW`` a failed write is ignored, so the result then only
        reflects whether the file exists.
    """
    dest_resolved = (root / rel).resolve()
    if not is_under(dest_resolved, root):
        return False

    try:
        dest_resolved.parent.mkdir(parents=True, exist_ok=True)
    except Exception:
        return False

    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    # O_NOFOLLOW is not available on every platform; 0 means "unsupported".
    nofollow = getattr(os, "O_NOFOLLOW", 0)

    try:
        if nofollow:
            fd = os.open(dest_resolved, flags | nofollow, 0o666)
            try:
                stream = os.fdopen(fd, "wb")
            except Exception:
                # No file object was handed back, so release the descriptor
                # here. Best effort: any error from close is ignored.
                with suppress(Exception):
                    os.close(fd)
            else:
                # The file object owns the descriptor and closes it when the
                # `with` block exits, so fd must NOT be closed a second time
                # here (the number could already belong to another file).
                # Write/flush errors stay best-effort, as before.
                with suppress(Exception), stream:
                    stream.write(data)
        else:
            with open(dest_resolved, "wb") as f:
                f.write(data)
    except Exception:
        return False

    return dest_resolved.is_file()
def create_fuzz_files(root: Path, fdp: atheris.FuzzedDataProvider) -> int:
    """Populate ``root`` with up to 30 fuzzed files.

    For each planned file a relative path is drawn with ``random_relpath``.
    Paths already drawn in this call are skipped without consuming any
    further fuzz data. Otherwise a size in ``[0, 64 KiB]`` and that many
    bytes are consumed and handed to ``safe_write``.

    Args:
        root: Directory under which the files are created.
        fdp: Source of fuzzed data.

    Returns:
        The number of files for which ``safe_write`` reported success.
    """
    planned = fdp.ConsumeIntInRange(0, 30)
    used_paths: set[Path] = set()
    written = 0

    for _ in range(planned):
        candidate = random_relpath(fdp)
        if candidate not in used_paths:
            used_paths.add(candidate)

            max_size = 64 * 1024
            payload = fdp.ConsumeBytes(fdp.ConsumeIntInRange(0, max_size))
            if safe_write(root, candidate, payload):
                written += 1

    return written
def any_files(root: Path) -> bool:
    """Return whether a regular file exists anywhere below ``root``.

    Walks ``root`` recursively and stops at the first entry for which
    ``is_file()`` is true.

    Args:
        root: Directory to search.

    Returns:
        True if at least one regular file was found, False otherwise.
    """
    for entry in root.rglob("*"):
        if entry.is_file():
            return True
    return False
def _build_hashing_config_from_fdp(
    fdp: atheris.FuzzedDataProvider,
) -> "model_signing.hashing.Config":
    """Create a hashing config with a fuzzed algorithm and serialization mode.

    The hash algorithm is picked from ``sha256``, ``blake2`` and ``blake3``;
    a fuzzed boolean then selects file-based or sharded serialization.

    Args:
        fdp: Source of fuzzed data.

    Returns:
        The configured ``model_signing.hashing.Config`` instance.
    """
    algorithms = ("sha256", "blake2", "blake3")
    chosen = algorithms[fdp.ConsumeIntInRange(0, 2)]

    config = model_signing.hashing.Config()

    # True -> file-based serialization, False -> sharded file serialization.
    if fdp.ConsumeBool():
        configure = config.use_file_serialization
    else:
        configure = config.use_shard_serialization
    configure(hashing_algorithm=chosen)
    return config