Coverage for /pythoncovmergedfiles/medio/medio/src/model-transparency/tests/fuzzing/utils.py: 86%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2025 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from contextlib import suppress
16import os
17from pathlib import Path
19# type: ignore
20import atheris
23_SAFE_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"
26def _consume_segment(
27 fdp: atheris.FuzzedDataProvider, min_len: int = 1, max_len: int = 16
28) -> str:
29 """Return a path-safe segment using only _SAFE_CHARS."""
30 seg_len = fdp.ConsumeIntInRange(min_len, max_len)
31 bs = fdp.ConsumeBytes(seg_len)
32 if not bs:
33 return "x"
34 out = []
35 for b in bs:
36 out.append(_SAFE_CHARS[b % len(_SAFE_CHARS)])
37 s = "".join(out)
38 if s in {".", "..", ""}:
39 s = "x" + s + "x"
40 return s
43def random_relpath(fdp: atheris.FuzzedDataProvider) -> Path:
44 """Generate a relative, nested path (no traversal, no absolute roots)."""
45 depth = fdp.ConsumeIntInRange(1, 4)
46 parts = [_consume_segment(fdp) for _ in range(depth)]
47 if fdp.ConsumeBool():
48 ext = _consume_segment(fdp, 1, 6).lower()
49 parts[-1] = f"{parts[-1]}.{ext}"
50 rel = Path(parts[0])
51 for p in parts[1:]:
52 rel = rel / p
53 rel = Path(*[p for p in rel.parts if p not in ("", ".", "..")])
54 if rel == Path():
55 rel = Path("f")
56 return rel
59def is_under(child: Path, parent: Path) -> bool:
60 """True iff 'child' is inside 'parent' after resolving symlinks."""
61 try:
62 child.resolve().relative_to(parent.resolve())
63 return True
64 except Exception:
65 return False
68def safe_write(root: Path, rel: Path, data: bytes) -> bool:
69 """Write bytes to root/rel only if the resolved path stays under root.
71 Uses an O_NOFOLLOW open (when supported) to avoid following a final
72 symlink. Returns True if a regular file exists at the target after the
73 write attempt.
74 """
75 dest_resolved = (root / rel).resolve()
76 if not is_under(dest_resolved, root):
77 return False
79 try:
80 dest_resolved.parent.mkdir(parents=True, exist_ok=True)
81 except Exception:
82 return False
84 flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
85 nofollow = getattr(os, "O_NOFOLLOW", 0)
87 try:
88 if nofollow:
89 fd = os.open(dest_resolved, flags | nofollow, 0o666)
90 try:
91 with os.fdopen(fd, "wb") as f:
92 f.write(data)
93 except Exception:
94 with suppress(Exception):
95 os.close(fd)
96 else:
97 with open(dest_resolved, "wb") as f:
98 f.write(data)
99 except Exception:
100 return False
102 return dest_resolved.is_file()
105def create_fuzz_files(root: Path, fdp: atheris.FuzzedDataProvider) -> int:
106 """Create 0..30 files under root with fuzzed relative paths and contents.
108 Returns the number of files successfully created.
109 """
110 nfiles = fdp.ConsumeIntInRange(0, 30)
111 seen: set[Path] = set()
112 created = 0
113 for _ in range(nfiles):
114 rel = random_relpath(fdp)
115 if rel in seen:
116 continue
117 seen.add(rel)
119 size = fdp.ConsumeIntInRange(0, 64 * 1024)
120 data = fdp.ConsumeBytes(size)
121 if safe_write(root, rel, data):
122 created += 1
123 return created
126def any_files(root: Path) -> bool:
127 """True if there is at least one regular file under root."""
128 return any(p.is_file() for p in root.rglob("*"))