1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import inspect
12import math
13import random
14from collections import defaultdict
15from collections.abc import Sequence
16from contextlib import contextmanager
17from typing import Any, Callable, NoReturn, Optional, Union
18from weakref import WeakKeyDictionary
19
20from hypothesis import Verbosity, settings
21from hypothesis._settings import note_deprecation
22from hypothesis.errors import InvalidArgument, UnsatisfiedAssumption
23from hypothesis.internal.compat import BaseExceptionGroup
24from hypothesis.internal.conjecture.data import ConjectureData
25from hypothesis.internal.observability import TESTCASE_CALLBACKS
26from hypothesis.internal.reflection import get_pretty_function_description
27from hypothesis.internal.validation import check_type
28from hypothesis.reporting import report, verbose_report
29from hypothesis.utils.dynamicvariables import DynamicVariable
30from hypothesis.vendor.pretty import IDKey, PrettyPrintFunction, pretty
31
32
33def _calling_function_location(what: str, frame: Any) -> str:
34 where = frame.f_back
35 return f"{what}() in {where.f_code.co_name} (line {where.f_lineno})"
36
37
38def reject() -> NoReturn:
39 if _current_build_context.value is None:
40 note_deprecation(
41 "Using `reject` outside a property-based test is deprecated",
42 since="2023-09-25",
43 has_codemod=False,
44 )
45 where = _calling_function_location("reject", inspect.currentframe())
46 if currently_in_test_context():
47 counts = current_build_context().data._observability_predicates[where]
48 counts.update_count(condition=False)
49 raise UnsatisfiedAssumption(where)
50
51
52def assume(condition: object) -> bool:
53 """Calling ``assume`` is like an :ref:`assert <python:assert>` that marks
54 the example as bad, rather than failing the test.
55
56 This allows you to specify properties that you *assume* will be
57 true, and let Hypothesis try to avoid similar examples in future.
58 """
59 if _current_build_context.value is None:
60 note_deprecation(
61 "Using `assume` outside a property-based test is deprecated",
62 since="2023-09-25",
63 has_codemod=False,
64 )
65 if TESTCASE_CALLBACKS or not condition:
66 where = _calling_function_location("assume", inspect.currentframe())
67 if TESTCASE_CALLBACKS and currently_in_test_context():
68 counts = current_build_context().data._observability_predicates[where]
69 counts.update_count(condition=bool(condition))
70 if not condition:
71 raise UnsatisfiedAssumption(f"failed to satisfy {where}")
72 return True
73
74
75_current_build_context = DynamicVariable[Optional["BuildContext"]](None)
76
77
78def currently_in_test_context() -> bool:
79 """Return ``True`` if the calling code is currently running inside an
80 |@given| or :ref:`stateful <stateful>` test, and ``False`` otherwise.
81
82 This is useful for third-party integrations and assertion helpers which
83 may be called from either traditional or property-based tests, and can only
84 use e.g. |assume| or |target| in the latter case.
85 """
86 return _current_build_context.value is not None
87
88
89def current_build_context() -> "BuildContext":
90 context = _current_build_context.value
91 if context is None:
92 raise InvalidArgument("No build context registered")
93 return context
94
95
96@contextmanager
97def deprecate_random_in_strategy(fmt, *args):
98 from hypothesis.internal import entropy
99
100 state_before = random.getstate()
101 yield
102 state_after = random.getstate()
103 if (
104 # there is a threading race condition here with deterministic_PRNG. Say
105 # we have two threads 1 and 2. We start in global random state A, and
106 # deterministic_PRNG sets to global random state B (which is constant across
107 # threads since we seed to 0 unconditionally). Then we might have state
108 # transitions:
109 #
110 # [1] [2]
111 # A -> B deterministic_PRNG().__enter__
112 # B ->B deterministic_PRNG().__enter__
113 # state_before = B deprecate_random_in_strategy.__enter__
114 # B -> A deterministic_PRNG().__exit__
115 # state_after = A deprecate_random_in_strategy.__exit__
116 #
117 # where state_before != state_after because a different thread has reset
118 # the global random state.
119 #
120 # To fix this, we track the known random states set by deterministic_PRNG,
121 # and will not note a deprecation if it matches one of those.
122 state_after != state_before
123 and hash(state_after) not in entropy._known_random_state_hashes
124 ):
125 note_deprecation(
126 "Do not use the `random` module inside strategies; instead "
127 "consider `st.randoms()`, `st.sampled_from()`, etc. " + fmt.format(*args),
128 since="2024-02-05",
129 has_codemod=False,
130 stacklevel=1,
131 )
132
133
134class BuildContext:
135 def __init__(
136 self,
137 data: ConjectureData,
138 *,
139 is_final: bool = False,
140 wrapped_test: Callable,
141 ) -> None:
142 self.data = data
143 self.tasks: list[Callable[[], Any]] = []
144 self.is_final = is_final
145 self.wrapped_test = wrapped_test
146
147 # Use defaultdict(list) here to handle the possibility of having multiple
148 # functions registered for the same object (due to caching, small ints, etc).
149 # The printer will discard duplicates which return different representations.
150 self.known_object_printers: dict[IDKey, list[PrettyPrintFunction]] = (
151 defaultdict(list)
152 )
153
154 def record_call(
155 self,
156 obj: object,
157 func: object,
158 args: Sequence[object],
159 kwargs: dict[str, object],
160 ) -> None:
161 self.known_object_printers[IDKey(obj)].append(
162 # _func=func prevents mypy from inferring lambda type. Would need
163 # paramspec I think - not worth it.
164 lambda obj, p, cycle, *, _func=func: p.maybe_repr_known_object_as_call( # type: ignore
165 obj, cycle, get_pretty_function_description(_func), args, kwargs
166 )
167 )
168
169 def prep_args_kwargs_from_strategies(self, kwarg_strategies):
170 arg_labels = {}
171 kwargs = {}
172 for k, s in kwarg_strategies.items():
173 start_idx = len(self.data.nodes)
174 with deprecate_random_in_strategy("from {}={!r}", k, s):
175 obj = self.data.draw(s, observe_as=f"generate:{k}")
176 end_idx = len(self.data.nodes)
177 kwargs[k] = obj
178
179 # This high up the stack, we can't see or really do much with the conjecture
180 # Example objects - not least because they're only materialized after the
181 # test case is completed. Instead, we'll stash the (start_idx, end_idx)
182 # pair on our data object for the ConjectureRunner engine to deal with, and
183 # pass a dict of such out so that the pretty-printer knows where to place
184 # the which-parts-matter comments later.
185 if start_idx != end_idx:
186 arg_labels[k] = (start_idx, end_idx)
187 self.data.arg_slices.add((start_idx, end_idx))
188
189 return kwargs, arg_labels
190
191 def __enter__(self):
192 self.assign_variable = _current_build_context.with_value(self)
193 self.assign_variable.__enter__()
194 return self
195
196 def __exit__(self, exc_type, exc_value, tb):
197 self.assign_variable.__exit__(exc_type, exc_value, tb)
198 errors = []
199 for task in self.tasks:
200 try:
201 task()
202 except BaseException as err:
203 errors.append(err)
204 if errors:
205 if len(errors) == 1:
206 raise errors[0] from exc_value
207 raise BaseExceptionGroup("Cleanup failed", errors) from exc_value
208
209
210def cleanup(teardown):
211 """Register a function to be called when the current test has finished
212 executing. Any exceptions thrown in teardown will be printed but not
213 rethrown.
214
215 Inside a test this isn't very interesting, because you can just use
216 a finally block, but note that you can use this inside map, flatmap,
217 etc. in order to e.g. insist that a value is closed at the end.
218 """
219 context = _current_build_context.value
220 if context is None:
221 raise InvalidArgument("Cannot register cleanup outside of build context")
222 context.tasks.append(teardown)
223
224
225def should_note():
226 context = _current_build_context.value
227 if context is None:
228 raise InvalidArgument("Cannot make notes outside of a test")
229 return context.is_final or settings.default.verbosity >= Verbosity.verbose
230
231
232def note(value: object) -> None:
233 """Report this value for the minimal failing example."""
234 if should_note():
235 if not isinstance(value, str):
236 value = pretty(value)
237 report(value)
238
239
240def event(value: str, payload: Union[str, int, float] = "") -> None:
241 """Record an event that occurred during this test. Statistics on the number of test
242 runs with each event will be reported at the end if you run Hypothesis in
243 statistics reporting mode.
244
245 Event values should be strings or convertible to them. If an optional
246 payload is given, it will be included in the string for :ref:`statistics`.
247 """
248 context = _current_build_context.value
249 if context is None:
250 raise InvalidArgument("Cannot make record events outside of a test")
251
252 payload = _event_to_string(payload, (str, int, float))
253 context.data.events[_event_to_string(value)] = payload
254
255
256_events_to_strings: WeakKeyDictionary = WeakKeyDictionary()
257
258
259def _event_to_string(event, allowed_types=str):
260 if isinstance(event, allowed_types):
261 return event
262 try:
263 return _events_to_strings[event]
264 except (KeyError, TypeError):
265 pass
266 result = str(event)
267 try:
268 _events_to_strings[event] = result
269 except TypeError:
270 pass
271 return result
272
273
274def target(observation: Union[int, float], *, label: str = "") -> Union[int, float]:
275 """Calling this function with an ``int`` or ``float`` observation gives it feedback
276 with which to guide our search for inputs that will cause an error, in
277 addition to all the usual heuristics. Observations must always be finite.
278
279 Hypothesis will try to maximize the observed value over several examples;
280 almost any metric will work so long as it makes sense to increase it.
281 For example, ``-abs(error)`` is a metric that increases as ``error``
282 approaches zero.
283
284 Example metrics:
285
286 - Number of elements in a collection, or tasks in a queue
287 - Mean or maximum runtime of a task (or both, if you use ``label``)
288 - Compression ratio for data (perhaps per-algorithm or per-level)
289 - Number of steps taken by a state machine
290
291 The optional ``label`` argument can be used to distinguish between
292 and therefore separately optimise distinct observations, such as the
293 mean and standard deviation of a dataset. It is an error to call
294 ``target()`` with any label more than once per test case.
295
296 .. note::
297 The more examples you run, the better this technique works.
298
299 As a rule of thumb, the targeting effect is noticeable above
300 :obj:`max_examples=1000 <hypothesis.settings.max_examples>`,
301 and immediately obvious by around ten thousand examples
302 *per label* used by your test.
303
304 :ref:`statistics` include the best score seen for each label,
305 which can help avoid `the threshold problem
306 <https://hypothesis.works/articles/threshold-problem/>`__ when the minimal
307 example shrinks right down to the threshold of failure (:issue:`2180`).
308 """
309 check_type((int, float), observation, "observation")
310 if not math.isfinite(observation):
311 raise InvalidArgument(f"{observation=} must be a finite float.")
312 check_type(str, label, "label")
313
314 context = _current_build_context.value
315 if context is None:
316 raise InvalidArgument(
317 "Calling target() outside of a test is invalid. "
318 "Consider guarding this call with `if currently_in_test_context(): ...`"
319 )
320 elif context.data.provider.avoid_realization:
321 # We could in principle realize this in the engine, but it seems more
322 # efficient to have our alternative backend optimize it for us.
323 # See e.g. https://github.com/pschanely/hypothesis-crosshair/issues/3
324 return observation # pragma: no cover
325 verbose_report(f"Saw target({observation!r}, {label=})")
326
327 if label in context.data.target_observations:
328 raise InvalidArgument(
329 f"Calling target({observation!r}, {label=}) would overwrite "
330 f"target({context.data.target_observations[label]!r}, {label=})"
331 )
332 else:
333 context.data.target_observations[label] = observation
334
335 return observation