1# This file is part of Hypothesis, which may be found at 
    2# https://github.com/HypothesisWorks/hypothesis/ 
    3# 
    4# Copyright the Hypothesis Authors. 
    5# Individual contributors are listed in AUTHORS.rst and the git log. 
    6# 
    7# This Source Code Form is subject to the terms of the Mozilla Public License, 
    8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 
    9# obtain one at https://mozilla.org/MPL/2.0/. 
    10 
    11import functools 
    12import os 
    13import re 
    14import subprocess 
    15import sys 
    16import types 
    17from collections import defaultdict 
    18from functools import lru_cache, reduce 
    19from os import sep 
    20from pathlib import Path 
    21from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple 
    22 
    23from hypothesis._settings import Phase, Verbosity 
    24from hypothesis.internal.escalation import is_hypothesis_file 
    25 
    26if TYPE_CHECKING: 
    27    from typing import TypeAlias 
    28else: 
    29    TypeAlias = object 
    30 
    31Location: TypeAlias = Tuple[str, int] 
    32Branch: TypeAlias = Tuple[Optional[Location], Location] 
    33Trace: TypeAlias = Set[Branch] 
    34 
    35 
    36@lru_cache(maxsize=None) 
    37def should_trace_file(fname): 
    38    # fname.startswith("<") indicates runtime code-generation via compile, 
    39    # e.g. compile("def ...", "<string>", "exec") in e.g. attrs methods. 
    40    return not (is_hypothesis_file(fname) or fname.startswith("<")) 
    41 
    42 
    43# where possible, we'll use 3.12's new sys.monitoring module for low-overhead 
    44# coverage instrumentation; on older python versions we'll use sys.settrace. 
    45# tool_id = 1 is designated for coverage, but we intentionally choose a 
    46# non-reserved tool id so we can co-exist with coverage tools. 
    47MONITORING_TOOL_ID = 3 
    48if sys.version_info[:2] >= (3, 12): 
    49    MONITORING_EVENTS = {sys.monitoring.events.LINE: "trace_line"} 
    50 
    51 
    52class Tracer: 
    53    """A super-simple branch coverage tracer.""" 
    54 
    55    __slots__ = ("branches", "_previous_location") 
    56 
    57    def __init__(self): 
    58        self.branches: Trace = set() 
    59        self._previous_location = None 
    60 
    61    def trace(self, frame, event, arg): 
    62        try: 
    63            if event == "call": 
    64                return self.trace 
    65            elif event == "line": 
    66                # manual inlining of self.trace_line for performance. 
    67                fname = frame.f_code.co_filename 
    68                if should_trace_file(fname): 
    69                    current_location = (fname, frame.f_lineno) 
    70                    self.branches.add((self._previous_location, current_location)) 
    71                    self._previous_location = current_location 
    72        except RecursionError: 
    73            pass 
    74 
    75    def trace_line(self, code: types.CodeType, line_number: int) -> None: 
    76        fname = code.co_filename 
    77        if should_trace_file(fname): 
    78            current_location = (fname, line_number) 
    79            self.branches.add((self._previous_location, current_location)) 
    80            self._previous_location = current_location 
    81 
    82    def __enter__(self): 
    83        if sys.version_info[:2] < (3, 12): 
    84            assert sys.gettrace() is None  # caller checks in core.py 
    85            sys.settrace(self.trace) 
    86            return self 
    87 
    88        sys.monitoring.use_tool_id(MONITORING_TOOL_ID, "scrutineer") 
    89        for event, callback_name in MONITORING_EVENTS.items(): 
    90            sys.monitoring.set_events(MONITORING_TOOL_ID, event) 
    91            callback = getattr(self, callback_name) 
    92            sys.monitoring.register_callback(MONITORING_TOOL_ID, event, callback) 
    93 
    94        return self 
    95 
    96    def __exit__(self, *args, **kwargs): 
    97        if sys.version_info[:2] < (3, 12): 
    98            sys.settrace(None) 
    99            return 
    100 
    101        sys.monitoring.free_tool_id(MONITORING_TOOL_ID) 
    102        for event in MONITORING_EVENTS: 
    103            sys.monitoring.register_callback(MONITORING_TOOL_ID, event, None) 
    104 
    105 
    106UNHELPFUL_LOCATIONS = ( 
    107    # There's a branch which is only taken when an exception is active while exiting 
    108    # a contextmanager; this is probably after the fault has been triggered. 
    109    # Similar reasoning applies to a few other standard-library modules: even 
    110    # if the fault was later, these still aren't useful locations to report! 
    111    # Note: The list is post-processed, so use plain "/" for separator here. 
    112    "/contextlib.py", 
    113    "/inspect.py", 
    114    "/re.py", 
    115    "/re/__init__.py",  # refactored in Python 3.11 
    116    "/warnings.py", 
    117    # Quite rarely, the first AFNP line is in Pytest's internals. 
    118    "/_pytest/_io/saferepr.py", 
    119    "/_pytest/assertion/*.py", 
    120    "/_pytest/config/__init__.py", 
    121    "/_pytest/pytester.py", 
    122    "/pluggy/_*.py", 
    123    "/reprlib.py", 
    124    "/typing.py", 
    125    "/conftest.py", 
    126) 
    127 
    128 
    129def _glob_to_re(locs): 
    130    """Translate a list of glob patterns to a combined regular expression. 
    131    Only the * wildcard is supported, and patterns including special 
    132    characters will only work by chance.""" 
    133    # fnmatch.translate is not an option since its "*" consumes path sep 
    134    return "|".join( 
    135        loc.replace("*", r"[^/]+") 
    136        .replace(".", re.escape(".")) 
    137        .replace("/", re.escape(sep)) 
    138        + r"\Z"  # right anchored 
    139        for loc in locs 
    140    ) 
    141 
    142 
    143def get_explaining_locations(traces): 
    144    # Traces is a dict[interesting_origin | None, set[frozenset[tuple[str, int]]]] 
    145    # Each trace in the set might later become a Counter instead of frozenset. 
    146    if not traces: 
    147        return {} 
    148 
    149    unions = {origin: set().union(*values) for origin, values in traces.items()} 
    150    seen_passing = {None}.union(*unions.pop(None, set())) 
    151 
    152    always_failing_never_passing = { 
    153        origin: reduce(set.intersection, [set().union(*v) for v in values]) 
    154        - seen_passing 
    155        for origin, values in traces.items() 
    156        if origin is not None 
    157    } 
    158 
    159    # Build the observed parts of the control-flow graph for each origin 
    160    cf_graphs = {origin: defaultdict(set) for origin in unions} 
    161    for origin, seen_arcs in unions.items(): 
    162        for src, dst in seen_arcs: 
    163            cf_graphs[origin][src].add(dst) 
    164        assert cf_graphs[origin][None], "Expected start node with >=1 successor" 
    165 
    166    # For each origin, our explanation is the always_failing_never_passing lines 
    167    # which are reachable from the start node (None) without passing through another 
    168    # AFNP line.  So here's a whatever-first search with early stopping: 
    169    explanations = defaultdict(set) 
    170    for origin in unions: 
    171        queue = {None} 
    172        seen = set() 
    173        while queue: 
    174            assert queue.isdisjoint(seen), f"Intersection: {queue & seen}" 
    175            src = queue.pop() 
    176            seen.add(src) 
    177            if src in always_failing_never_passing[origin]: 
    178                explanations[origin].add(src) 
    179            else: 
    180                queue.update(cf_graphs[origin][src] - seen) 
    181 
    182    # The last step is to filter out explanations that we know would be uninformative. 
    183    # When this is the first AFNP location, we conclude that Scrutineer missed the 
    184    # real divergence (earlier in the trace) and drop that unhelpful explanation. 
    185    filter_regex = re.compile(_glob_to_re(UNHELPFUL_LOCATIONS)) 
    186    return { 
    187        origin: {loc for loc in afnp_locs if not filter_regex.search(loc[0])} 
    188        for origin, afnp_locs in explanations.items() 
    189    } 
    190 
    191 
    192LIB_DIR = str(Path(sys.executable).parent / "lib") 
    193EXPLANATION_STUB = ( 
    194    "Explanation:", 
    195    "    These lines were always and only run by failing examples:", 
    196) 
    197 
    198 
    199def make_report(explanations, cap_lines_at=5): 
    200    report = defaultdict(list) 
    201    for origin, locations in explanations.items(): 
    202        report_lines = [f"        {fname}:{lineno}" for fname, lineno in locations] 
    203        report_lines.sort(key=lambda line: (line.startswith(LIB_DIR), line)) 
    204        if len(report_lines) > cap_lines_at + 1: 
    205            msg = "        (and {} more with settings.verbosity >= verbose)" 
    206            report_lines[cap_lines_at:] = [msg.format(len(report_lines[cap_lines_at:]))] 
    207        if report_lines:  # We might have filtered out every location as uninformative. 
    208            report[origin] = list(EXPLANATION_STUB) + report_lines 
    209    return report 
    210 
    211 
    212def explanatory_lines(traces, settings): 
    213    if Phase.explain in settings.phases and sys.gettrace() and not traces: 
    214        return defaultdict(list) 
    215    # Return human-readable report lines summarising the traces 
    216    explanations = get_explaining_locations(traces) 
    217    max_lines = 5 if settings.verbosity <= Verbosity.normal else float("inf") 
    218    return make_report(explanations, cap_lines_at=max_lines) 
    219 
    220 
    221# beware the code below; we're using some heuristics to make a nicer report... 
    222 
    223 
    224@functools.lru_cache 
    225def _get_git_repo_root() -> Path: 
    226    try: 
    227        where = subprocess.run( 
    228            ["git", "rev-parse", "--show-toplevel"], 
    229            check=True, 
    230            timeout=10, 
    231            capture_output=True, 
    232            text=True, 
    233            encoding="utf-8", 
    234        ).stdout.strip() 
    235    except Exception:  # pragma: no cover 
    236        return Path().absolute().parents[-1] 
    237    else: 
    238        return Path(where) 
    239 
    240 
    241if sys.version_info[:2] <= (3, 8): 
    242 
    243    def is_relative_to(self, other): 
    244        return other == self or other in self.parents 
    245 
    246else: 
    247    is_relative_to = Path.is_relative_to 
    248 
    249 
    250def tractable_coverage_report(trace: Trace) -> Dict[str, List[int]]: 
    251    """Report a simple coverage map which is (probably most) of the user's code.""" 
    252    coverage: dict = {} 
    253    t = dict(trace) 
    254    for file, line in set(t.keys()).union(t.values()) - {None}:  # type: ignore 
    255        # On Python <= 3.11, we can use coverage.py xor Hypothesis' tracer, 
    256        # so the trace will be empty and this line never run under coverage. 
    257        coverage.setdefault(file, set()).add(line)  # pragma: no cover 
    258    stdlib_fragment = f"{os.sep}lib{os.sep}python3.{sys.version_info.minor}{os.sep}" 
    259    return { 
    260        k: sorted(v) 
    261        for k, v in coverage.items() 
    262        if stdlib_fragment not in k 
    263        and is_relative_to(p := Path(k), _get_git_repo_root()) 
    264        and "site-packages" not in p.parts 
    265    }