1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import functools
12import os
13import re
14import subprocess
15import sys
16import sysconfig
17import types
18from collections import defaultdict
19from collections.abc import Iterable
20from enum import IntEnum
21from functools import lru_cache, reduce
22from os import sep
23from pathlib import Path
24from typing import TYPE_CHECKING, TypeAlias
25
26from hypothesis._settings import Phase, Verbosity
27from hypothesis.internal.compat import PYPY
28from hypothesis.internal.escalation import is_hypothesis_file
29
30if TYPE_CHECKING:
31 from typing_extensions import Self
32
# A traced source position: (filename, line number).
Location: TypeAlias = tuple[str, int]
# An arc between two positions: (previous location, current location).  The
# previous location is None when no earlier line has been recorded yet.
Branch: TypeAlias = tuple[Location | None, Location]
# The immutable set of arcs recorded by a tracer (see Tracer.branches).
Trace: TypeAlias = frozenset[Branch]
36
37
@functools.cache
def should_trace_file(fname: str) -> bool:
    """Return True if lines executed in the file ``fname`` should be recorded.

    Files which ``is_hypothesis_file`` reports as Hypothesis' own are skipped,
    as are pseudo-filenames starting with "<".  Results are memoised per
    filename by ``functools.cache``.
    """
    if is_hypothesis_file(fname):
        return False
    # A leading "<" indicates runtime code-generation via compile, e.g.
    # compile("def ...", "<string>", "exec") as used for attrs methods.
    return not fname.startswith("<")
43
44
# Where possible, we use the sys.monitoring module (new in Python 3.12) for
# low-overhead coverage instrumentation; on older Python versions we fall back
# to sys.settrace.
# tool_id = 1 is designated for coverage, but we intentionally choose a
# non-reserved tool id so we can co-exist with coverage tools.
MONITORING_TOOL_ID = 3
if hasattr(sys, "monitoring"):
    # Maps each sys.monitoring event we subscribe to onto the name of the
    # Tracer method that is registered as the callback for that event.
    MONITORING_EVENTS = {sys.monitoring.events.LINE: "trace_line"}
52
53
class Tracer:
    """A super-simple branch coverage tracer.

    Used as a context manager.  While active (and if tracing is possible), it
    records every ``(previous location, current location)`` arc between
    executed lines of files accepted by ``should_trace_file``.  The recorded
    arcs are available from the ``branches`` property.
    """

    __slots__ = (
        "_branches",
        "_previous_location",
        "_should_trace",
        "_tried_and_failed_to_trace",
    )

    def __init__(self, *, should_trace: bool) -> None:
        # Arcs recorded so far, and the last location we saw (None at start).
        self._branches: set[Branch] = set()
        self._previous_location: Location | None = None
        # Set by __enter__ when the monitoring tool id could not be claimed.
        self._tried_and_failed_to_trace = False
        # Only trace if the caller asked for it *and* nothing else is tracing.
        self._should_trace = should_trace and self.can_trace()

    @staticmethod
    def can_trace() -> bool:
        """Return True if no other tracer occupies the hook we would use."""
        if PYPY:
            return False
        if not hasattr(sys, "monitoring"):
            # sys.settrace fallback: only one trace function can be installed.
            return sys.gettrace() is None
        return sys.monitoring.get_tool(MONITORING_TOOL_ID) is None

    @property
    def branches(self) -> Trace:
        """An immutable snapshot of the arcs recorded so far."""
        return frozenset(self._branches)

    def trace(self, frame, event, arg):
        """Trace function installed with ``sys.settrace`` (no sys.monitoring).

        Returns itself for "call" events so that the new frame is traced too;
        records an arc for "line" events in files we want to trace.
        """
        try:
            if event == "call":
                return self.trace
            if event != "line":
                return None
            filename = frame.f_code.co_filename
            if not should_trace_file(filename):
                return None
            here = (filename, frame.f_lineno)
            self._branches.add((self._previous_location, here))
            self._previous_location = here
        except RecursionError:
            # Deliberately ignored; the traced code is left to deal with it.
            pass
        return None

    def trace_line(self, code: types.CodeType, line_number: int) -> None:
        """Callback registered for the sys.monitoring LINE event."""
        filename = code.co_filename
        if should_trace_file(filename):
            here = (filename, line_number)
            self._branches.add((self._previous_location, here))
            self._previous_location = here
            return None
        # this function is only called on 3.12+, but we want to avoid an
        # assertion to that effect for performance.
        return sys.monitoring.DISABLE  # type: ignore

    def __enter__(self) -> "Self":
        self._tried_and_failed_to_trace = False

        if not self._should_trace:
            return self

        if hasattr(sys, "monitoring"):
            try:
                sys.monitoring.use_tool_id(MONITORING_TOOL_ID, "scrutineer")
            except ValueError:
                # another thread may have registered a tool for
                # MONITORING_TOOL_ID since we checked in can_trace.
                self._tried_and_failed_to_trace = True
            else:
                for event, callback_name in MONITORING_EVENTS.items():
                    sys.monitoring.set_events(MONITORING_TOOL_ID, event)
                    sys.monitoring.register_callback(
                        MONITORING_TOOL_ID, event, getattr(self, callback_name)
                    )
        else:
            sys.settrace(self.trace)

        return self

    def __exit__(self, *args, **kwargs):
        if not self._should_trace:
            return

        if hasattr(sys, "monitoring"):
            # If we never managed to claim the tool id, there is nothing to undo.
            if not self._tried_and_failed_to_trace:
                sys.monitoring.free_tool_id(MONITORING_TOOL_ID)
                for event in MONITORING_EVENTS:
                    sys.monitoring.register_callback(MONITORING_TOOL_ID, event, None)
        else:
            sys.settrace(None)
145
146
# Glob patterns for files whose lines are dropped from the reported
# explanations.  They are translated by _glob_to_re and matched against the
# end of each filename in get_explaining_locations.
UNHELPFUL_LOCATIONS = (
    # There's a branch which is only taken when an exception is active while exiting
    # a contextmanager; this is probably after the fault has been triggered.
    # Similar reasoning applies to a few other standard-library modules: even
    # if the fault was later, these still aren't useful locations to report!
    # Note: The list is post-processed, so use plain "/" for separator here.
    "/contextlib.py",
    "/inspect.py",
    "/re.py",
    "/re/__init__.py",  # refactored in Python 3.11
    "/warnings.py",
    # Quite rarely, the first always-failing-never-passing (AFNP) line is in
    # Pytest's internals.
    "/_pytest/**",
    "/pluggy/_*.py",
    # used by pytest for failure formatting in the terminal.
    # seen: pygments/lexer.py, pygments/formatters/, pygments/filter.py.
    "/pygments/*",
    # used by pytest for failure formatting
    "/difflib.py",
    "/reprlib.py",
    "/typing.py",
    "/conftest.py",
    "/pprint.py",
    # syrupy registers a pytest_assertrepr_compare hook, which only runs when
    # assertions fail, making it appear as always-failing-never-passing.
    "/syrupy/__init__.py",
)
174
175
def _glob_to_re(locs: Iterable[str]) -> str:
    """Combine glob patterns into a single regular-expression source string.

    Each pattern is translated separately, anchored at its right-hand end, and
    the results are joined as alternatives.  Only the * and ** wildcards are
    supported, and patterns including special characters will only work by
    chance.
    """
    # fnmatch.translate is not an option since its "*" consumes path sep
    literal_dot = re.escape(".")
    literal_sep = re.escape(sep)
    alternatives = []
    for loc in locs:
        pattern = loc.replace(".", literal_dot)
        pattern = pattern.replace("**", r".+")
        pattern = pattern.replace("*", r"[^/]+")
        # Done last, so the "/" inside the character class emitted for "*"
        # is converted to the platform separator as well.
        pattern = pattern.replace("/", literal_sep)
        alternatives.append(pattern + r"\Z")  # right anchored
    return "|".join(alternatives)
189
190
def get_explaining_locations(traces):
    """Return, per failure origin, the locations that best explain the failure.

    Traces is a dict[interesting_origin | None, set[frozenset[Branch]]], where
    the ``None`` key holds the traces of passing examples.  The result maps each
    origin to a set of ``(filename, lineno)`` locations; origins for which no
    candidate location was found are absent.
    """
    # Each trace in the set might later become a Counter instead of frozenset.
    if not traces:
        return {}

    # All arcs seen for each origin, across every trace of that origin.
    arcs_by_origin = {}
    for origin, values in traces.items():
        arcs_by_origin[origin] = set().union(*values)
    # Flatten the passing arcs into the set of locations seen while passing.
    passing_arcs = arcs_by_origin.pop(None, set())
    seen_passing = {None}.union(*passing_arcs)

    # Locations present in *every* failing trace of an origin, but in no
    # passing trace at all.
    always_failing_never_passing = {}
    for origin, values in traces.items():
        if origin is None:
            continue
        locations_per_trace = [set().union(*trace) for trace in values]
        in_every_trace = reduce(set.intersection, locations_per_trace)
        always_failing_never_passing[origin] = in_every_trace - seen_passing

    # Build the observed parts of the control-flow graph for each origin
    cf_graphs = {}
    for origin, seen_arcs in arcs_by_origin.items():
        graph = defaultdict(set)
        for src, dst in seen_arcs:
            graph[src].add(dst)
        assert graph[None], "Expected start node with >=1 successor"
        cf_graphs[origin] = graph

    # For each origin, our explanation is the always_failing_never_passing lines
    # which are reachable from the start node (None) without passing through another
    # AFNP line. So here's a whatever-first search with early stopping:
    explanations = defaultdict(set)
    for origin, graph in cf_graphs.items():
        afnp = always_failing_never_passing[origin]
        pending = {None}
        visited = set()
        while pending:
            assert pending.isdisjoint(visited), f"Intersection: {pending & visited}"
            node = pending.pop()
            visited.add(node)
            if node in afnp:
                # Early stop: do not search beyond an AFNP location.
                explanations[origin].add(node)
            else:
                pending.update(graph[node] - visited)

    # The last step is to filter out explanations that we know would be uninformative.
    # When this is the first AFNP location, we conclude that Scrutineer missed the
    # real divergence (earlier in the trace) and drop that unhelpful explanation.
    filter_regex = re.compile(_glob_to_re(UNHELPFUL_LOCATIONS))
    filtered = {}
    for origin, afnp_locs in explanations.items():
        filtered[origin] = {
            loc for loc in afnp_locs if not filter_regex.search(loc[0])
        }
    return filtered
238
239
# see e.g. https://docs.python.org/3/library/sysconfig.html#posix-user
# for examples of these path schemes
STDLIB_DIRS = {
    Path(sysconfig.get_path(scheme_key)).resolve()
    for scheme_key in ("platstdlib", "stdlib")
}
SITE_PACKAGES_DIRS = {
    Path(sysconfig.get_path(scheme_key)).resolve()
    for scheme_key in ("purelib", "platlib")
}

# Header lines placed before the reported locations in each explanation.
EXPLANATION_STUB = (
    "Explanation:",
    " These lines were always and only run by failing examples:",
)
255
256
class ModuleLocation(IntEnum):
    """Where a source file lives; lower values are sorted first in reports."""

    LOCAL = 0
    SITE_PACKAGES = 1
    STDLIB = 2

    @classmethod
    @lru_cache(1024)
    def from_path(cls, path: str) -> "ModuleLocation":
        """Classify ``path`` as site-packages, standard library, or local code."""
        resolved = Path(path).resolve()
        # site-packages may be a subdir of stdlib or platlib, so it's important to
        # check is_relative_to for this before the stdlib.
        for base in SITE_PACKAGES_DIRS:
            if resolved.is_relative_to(base):
                return cls.SITE_PACKAGES
        for base in STDLIB_DIRS:
            if resolved.is_relative_to(base):
                return cls.STDLIB
        return cls.LOCAL
273
274
# show local files first, then site-packages, then stdlib
def _sort_key(path: str, lineno: int) -> tuple[int, str, int]:
    """Sort key for a location: module kind, then path, then line number."""
    location_rank = ModuleLocation.from_path(path)
    return (location_rank, path, lineno)
278
279
def make_report(explanations, *, cap_lines_at=5):
    """Turn per-origin explanation locations into lists of report lines.

    ``explanations`` maps each origin to an iterable of ``(filename, lineno)``
    pairs.  Locations are ordered with ``_sort_key``; when there are more than
    ``cap_lines_at + 1`` of them, everything from index ``cap_lines_at`` onward
    is replaced by a single summary line.  Origins with no locations are left
    out of the returned ``defaultdict(list)``.
    """
    report = defaultdict(list)
    for origin, locations in explanations.items():
        ordered = sorted(locations, key=lambda v: _sort_key(v[0], v[1]))
        report_lines = [f" {fname}:{lineno}" for fname, lineno in ordered]
        if len(report_lines) > cap_lines_at + 1:
            msg = " (and {} more with settings.verbosity >= verbose)"
            hidden = report_lines[cap_lines_at:]
            report_lines = report_lines[:cap_lines_at] + [msg.format(len(hidden))]
        if not report_lines:
            # We might have filtered out every location as uninformative.
            continue
        report[origin] = [*EXPLANATION_STUB, *report_lines]
    return report
292
293
def explanatory_lines(traces, settings):
    """Return human-readable report lines summarising the traces.

    An empty report is returned straight away when the explain phase is
    enabled, a trace function is currently installed, and no traces were
    collected.  Otherwise each origin's report is capped at five locations
    unless ``settings.verbosity`` is above ``Verbosity.normal``.
    """
    explain_enabled = Phase.explain in settings.phases
    if explain_enabled and sys.gettrace() and not traces:
        return defaultdict(list)

    explanations = get_explaining_locations(traces)
    if settings.verbosity <= Verbosity.normal:
        max_lines = 5
    else:
        # No cap: nothing is ever longer than infinity.
        max_lines = float("inf")
    return make_report(explanations, cap_lines_at=max_lines)
301
302
303# beware the code below; we're using some heuristics to make a nicer report...
304
305
@functools.lru_cache
def _get_git_repo_root() -> Path:
    """Return the top-level directory of the enclosing git repository.

    The answer comes from ``git rev-parse --show-toplevel`` run in the current
    working directory, and is cached for later calls.  If git cannot be run,
    fails, or times out, fall back to the filesystem root (the anchor) of the
    current working directory.
    """
    try:
        where = subprocess.run(
            ["git", "rev-parse", "--show-toplevel"],
            check=True,
            timeout=10,
            capture_output=True,
            text=True,
            encoding="utf-8",
        ).stdout.strip()
    except Exception:  # pragma: no cover
        # Deliberately best-effort: any failure just means "no repo found".
        # Use the anchor rather than ``parents[-1]``: ``parents`` is empty when
        # the working directory is itself the filesystem root, so indexing it
        # would raise IndexError from inside this handler.
        return Path(Path().absolute().anchor)
    else:
        return Path(where)
321
322
def tractable_coverage_report(trace: Trace) -> dict[str, list[int]]:
    """Report a simple coverage map which is (probably most) of the user's code.

    ``trace`` is a set of ``(source, destination)`` arcs whose endpoints are
    ``(filename, lineno)`` locations; the source may be ``None``.  Returns a
    mapping from filename to the sorted line numbers seen in that file.  Only
    files inside the directory given by ``_get_git_repo_root()`` are kept, and
    files that look like standard-library or site-packages code are excluded.
    """
    coverage: dict[str, set[int]] = {}
    # Visit both endpoints of every arc.  Converting the arcs with dict() would
    # keep only one destination per source, so a line seen only as a
    # destination (such as the last line executed) could be dropped depending
    # on set iteration order.
    for arc in trace:
        for location in arc:
            if location is None:
                continue
            # On Python <= 3.11, we can use coverage.py xor Hypothesis' tracer,
            # so the trace will be empty and these lines never run under coverage.
            file, line = location  # pragma: no cover
            coverage.setdefault(file, set()).add(line)  # pragma: no cover
    stdlib_fragment = f"{os.sep}lib{os.sep}python3.{sys.version_info.minor}{os.sep}"
    return {
        k: sorted(v)
        for k, v in coverage.items()
        if stdlib_fragment not in k
        and (p := Path(k)).is_relative_to(_get_git_repo_root())
        and "site-packages" not in p.parts
    }