1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import inspect
12import math
13import random
14from collections import defaultdict
15from collections.abc import Sequence
16from contextlib import contextmanager
17from typing import Any, Callable, NoReturn, Optional, Union
18from weakref import WeakKeyDictionary
19
20from hypothesis import Verbosity, settings
21from hypothesis._settings import note_deprecation
22from hypothesis.errors import InvalidArgument, UnsatisfiedAssumption
23from hypothesis.internal.compat import BaseExceptionGroup
24from hypothesis.internal.conjecture.data import ConjectureData
25from hypothesis.internal.observability import TESTCASE_CALLBACKS
26from hypothesis.internal.reflection import get_pretty_function_description
27from hypothesis.internal.validation import check_type
28from hypothesis.reporting import report, verbose_report
29from hypothesis.utils.dynamicvariables import DynamicVariable
30from hypothesis.vendor.pretty import IDKey, PrettyPrintFunction, pretty
31
32
def _calling_function_location(what: str, frame: Any) -> str:
    """Describe where ``what`` was called from, for use in messages.

    ``frame`` is expected to be the frame of the function named by ``what``;
    the frame one level up (``frame.f_back``) is therefore its caller, and it
    is that caller's function name and current line number which end up in
    the returned description.
    """
    caller = frame.f_back
    function_name = caller.f_code.co_name
    line_number = caller.f_lineno
    return "{}() in {} (line {})".format(what, function_name, line_number)
36
37
def reject() -> NoReturn:
    """Abandon the current test case by raising ``UnsatisfiedAssumption``.

    The exception carries a description of the call site.  When a build
    context is active, a failed predicate is also recorded against that call
    site in the context data's ``_observability_predicates``.  Calling this
    with no active build context emits a deprecation note first.
    """
    outside_test = _current_build_context.value is None
    if outside_test:
        note_deprecation(
            "Using `reject` outside a property-based test is deprecated",
            since="2023-09-25",
            has_codemod=False,
        )
    # The frame must be captured right here, in reject() itself, so that the
    # helper's ``f_back`` lookup lands on whoever called reject().
    where = _calling_function_location("reject", inspect.currentframe())
    if currently_in_test_context():
        predicates = current_build_context().data._observability_predicates
        predicates[where].update_count(condition=False)
    raise UnsatisfiedAssumption(where)
50
51
def assume(condition: object) -> bool:
    """Mark the current example as bad unless ``condition`` is truthy.

    Calling ``assume`` is like an :ref:`assert <python:assert>`, except that a
    falsy condition marks the example as bad rather than failing the test.

    This lets you state properties that you *assume* will be true, and lets
    Hypothesis try to avoid similar examples in future.

    Returns ``True`` when the condition holds; otherwise raises
    ``UnsatisfiedAssumption``.
    """
    if _current_build_context.value is None:
        note_deprecation(
            "Using `assume` outside a property-based test is deprecated",
            since="2023-09-25",
            has_codemod=False,
        )

    # Fast path: nothing to record and nothing to reject.
    if not TESTCASE_CALLBACKS and condition:
        return True

    # The frame must be captured directly in assume() so that the helper's
    # ``f_back`` lookup resolves to the caller of assume().
    where = _calling_function_location("assume", inspect.currentframe())
    if TESTCASE_CALLBACKS and currently_in_test_context():
        predicates = current_build_context().data._observability_predicates
        predicates[where].update_count(condition=bool(condition))

    if condition:
        return True
    raise UnsatisfiedAssumption(f"failed to satisfy {where}")
73
74
# The BuildContext that is currently active, or None (the initial value) when
# no context has been entered.  BuildContext.__enter__ installs itself here via
# with_value(), and BuildContext.__exit__ leaves that scope again.
_current_build_context = DynamicVariable[Optional["BuildContext"]](None)
76
77
def currently_in_test_context() -> bool:
    """Report whether the caller is running inside a property-based test.

    Returns ``True`` if the calling code is currently running inside an
    |@given| or :ref:`stateful <stateful>` test, and ``False`` otherwise.

    Third-party integrations and assertion helpers may be called from either
    traditional or property-based tests, but can only use e.g. |assume| or
    |target| in the latter case; this function lets them tell the two apart.
    """
    active_context = _current_build_context.value
    return active_context is not None
87
88
def current_build_context() -> "BuildContext":
    """Return the active ``BuildContext``.

    Raises ``InvalidArgument`` if no build context is currently registered.
    """
    if (context := _current_build_context.value) is not None:
        return context
    raise InvalidArgument("No build context registered")
94
95
class RandomSeeder:
    """Minimal holder for a seed value.

    Within this file the class is only used as a marker type: ``_Checker``
    flags any value passed through it that is an instance of this class.
    """

    def __init__(self, seed):
        # Stored as-is; no validation or conversion is applied.
        self.seed = seed

    def __repr__(self):
        return "RandomSeeder({!r})".format(self.seed)
102
103
class _Checker:
    """Pass-through callable that remembers whether it saw a ``RandomSeeder``.

    Calling an instance returns its argument unchanged.  As a side effect,
    ``saw_global_random`` becomes (and stays) ``True`` once any argument has
    been a ``RandomSeeder`` instance.
    """

    def __init__(self) -> None:
        self.saw_global_random = False

    def __call__(self, x):
        if isinstance(x, RandomSeeder):
            self.saw_global_random = True
        return x
111
112
@contextmanager
def deprecate_random_in_strategy(fmt, *args):
    """Note a deprecation if the wrapped block changes the global random state.

    The state of the ``random`` module is snapshotted on entry, and a fresh
    ``_Checker`` is yielded so the caller can pass drawn values through it.
    On normal exit a deprecation is noted when the global state has changed
    *and* the checker never saw a ``RandomSeeder``.  ``fmt.format(*args)`` is
    appended to the message to say where the offending use came from.

    If the wrapped block raises, the check is skipped: the exception
    propagates from the ``yield`` and nothing after it runs.
    """
    state_before = random.getstate()
    checker = _Checker()
    yield checker

    if state_before == random.getstate():
        # Global random state untouched: nothing to report.
        return
    if checker.saw_global_random:
        # A RandomSeeder was drawn, so a state change is expected.
        return
    # The original carried a "raise InvalidDefinition" marker comment here,
    # presumably the intended end state once the deprecation period is over.
    note_deprecation(
        "Do not use the `random` module inside strategies; instead "
        "consider `st.randoms()`, `st.sampled_from()`, etc. " + fmt.format(*args),
        since="2024-02-05",
        has_codemod=False,
        stacklevel=1,
    )
126
127
class BuildContext:
    """State for one test case, installed as the current build context.

    Used as a context manager: entering makes this instance the value of
    ``_current_build_context``; exiting leaves that scope and then runs every
    callable registered in ``tasks``.
    """

    def __init__(
        self,
        data: ConjectureData,
        *,
        is_final: bool = False,
        wrapped_test: Callable,
    ) -> None:
        self.data = data
        self.is_final = is_final
        self.wrapped_test = wrapped_test
        # Zero-argument callables that __exit__ runs, in registration order.
        self.tasks: list[Callable[[], Any]] = []

        # A defaultdict(list) lets several printer functions be registered for
        # the same object (which can happen due to caching, small ints, etc).
        # The printer will discard duplicates which return different
        # representations.
        self.known_object_printers: dict[IDKey, list[PrettyPrintFunction]] = (
            defaultdict(list)
        )

    def record_call(
        self,
        obj: object,
        func: object,
        args: Sequence[object],
        kwargs: dict[str, object],
    ) -> None:
        """Register a printer that can show ``obj`` as a call to ``func``.

        The printer is stored under ``IDKey(obj)`` and, when invoked, defers
        to ``p.maybe_repr_known_object_as_call`` with a description of
        ``func`` plus the recorded ``args`` and ``kwargs``.
        """

        def print_as_call(obj, p, cycle):
            return p.maybe_repr_known_object_as_call(
                obj, cycle, get_pretty_function_description(func), args, kwargs
            )

        self.known_object_printers[IDKey(obj)].append(print_as_call)

    def prep_args_kwargs_from_strategies(self, kwarg_strategies):
        """Draw one value from each strategy in ``kwarg_strategies``.

        Returns a ``(kwargs, arg_labels)`` pair: ``kwargs`` maps each name to
        its drawn value, and ``arg_labels`` maps each name whose draw added
        nodes to the ``(start_idx, end_idx)`` node range it occupies.
        """
        drawn = {}
        labels = {}
        for name, strategy in kwarg_strategies.items():
            first_node = len(self.data.nodes)
            with deprecate_random_in_strategy("from {}={!r}", name, strategy) as check:
                value = check(self.data.draw(strategy, observe_as=f"generate:{name}"))
            node_span = (first_node, len(self.data.nodes))
            drawn[name] = value

            # This high up the stack, we can't see or really do much with the
            # conjecture Example objects - not least because they're only
            # materialized after the test case is completed.  Instead, stash
            # the (start_idx, end_idx) pair on the data object for the
            # ConjectureRunner engine to deal with, and pass a dict of such
            # out so that the pretty-printer knows where to place the
            # which-parts-matter comments later.
            if node_span[0] != node_span[1]:
                labels[name] = node_span
                self.data.arg_slices.add(node_span)

        return drawn, labels

    def __enter__(self):
        # Keep the scope object around so __exit__ can leave the same one.
        self.assign_variable = _current_build_context.with_value(self)
        self.assign_variable.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.assign_variable.__exit__(exc_type, exc_value, tb)

        # Run every registered task even if earlier ones fail, collecting the
        # failures so they can be reported together afterwards.
        failures = []
        for task in self.tasks:
            try:
                task()
            except BaseException as err:
                failures.append(err)

        if not failures:
            return
        if len(failures) == 1:
            raise failures[0] from exc_value
        raise BaseExceptionGroup("Cleanup failed", failures) from exc_value
202
203
def cleanup(teardown):
    """Register a function to be called when the current test has finished
    executing.

    ``teardown`` is called with no arguments when the current build context
    exits.  Exceptions raised by teardown functions are *not* swallowed: every
    registered function is still run, and afterwards a single failure is
    re-raised as-is, while several failures are re-raised together as an
    exception group.

    Inside a test this isn't very interesting, because you can just use
    a finally block, but note that you can use this inside map, flatmap,
    etc. in order to e.g. insist that a value is closed at the end.

    Raises ``InvalidArgument`` if called outside of a build context.
    """
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot register cleanup outside of build context")
    context.tasks.append(teardown)
217
218
def should_note():
    """Decide whether ``note()`` output should be reported right now.

    Truthy when the active build context is marked ``is_final``, or when the
    default settings' verbosity is at least ``Verbosity.verbose``.

    Raises ``InvalidArgument`` if there is no active build context.
    """
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot make notes outside of a test")
    final = context.is_final
    if final:
        return final
    return settings.default.verbosity >= Verbosity.verbose
224
225
def note(value: object) -> None:
    """Report this value for the minimal failing example.

    Strings are reported verbatim; any other value is pretty-printed first.
    Nothing is reported unless ``should_note()`` says so.
    """
    if not should_note():
        return
    text = value if isinstance(value, str) else pretty(value)
    report(text)
232
233
def event(value: str, payload: Union[str, int, float] = "") -> None:
    """Record an event that occurred during this test. Statistics on the number of test
    runs with each event will be reported at the end if you run Hypothesis in
    statistics reporting mode.

    Event values should be strings or convertible to them. If an optional
    payload is given, it will be included in the string for :ref:`statistics`.

    Raises ``InvalidArgument`` if called with no active build context.
    """
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot make record events outside of a test")

    # str/int/float payloads are stored unchanged; anything else is stringified.
    stored_payload = _event_to_string(payload, (str, int, float))
    event_key = _event_to_string(value)
    context.data.events[event_key] = stored_payload
248
249
# Cache of ``str(event)`` results, weakly keyed by the event object itself.
_events_to_strings: WeakKeyDictionary = WeakKeyDictionary()


def _event_to_string(event, allowed_types=str):
    """Return ``event`` unchanged if it is of an allowed type, else ``str(event)``.

    String conversions are memoised in ``_events_to_strings``.  Objects which
    cannot be used as keys there (the lookup or store raises ``TypeError``)
    are simply converted on every call.
    """
    if isinstance(event, allowed_types):
        return event

    try:
        cached = _events_to_strings[event]
    except (KeyError, TypeError):
        # KeyError: not cached yet.  TypeError: not usable as a key at all.
        pass
    else:
        return cached

    text = str(event)
    try:
        _events_to_strings[event] = text
    except TypeError:
        # Best-effort cache: skip objects that cannot be stored as keys.
        pass
    return text
266
267
def target(observation: Union[int, float], *, label: str = "") -> Union[int, float]:
    """Report a score for Hypothesis to maximise while searching for failures.

    Calling this function with an ``int`` or ``float`` observation gives it
    feedback with which to guide our search for inputs that will cause an
    error, in addition to all the usual heuristics. Observations must always
    be finite. The observation is returned unchanged.

    Hypothesis will try to maximize the observed value over several examples;
    almost any metric will work so long as it makes sense to increase it.
    For example, ``-abs(error)`` is a metric that increases as ``error``
    approaches zero.

    Example metrics:

    - Number of elements in a collection, or tasks in a queue
    - Mean or maximum runtime of a task (or both, if you use ``label``)
    - Compression ratio for data (perhaps per-algorithm or per-level)
    - Number of steps taken by a state machine

    The optional ``label`` argument can be used to distinguish between
    and therefore separately optimise distinct observations, such as the
    mean and standard deviation of a dataset. It is an error to call
    ``target()`` with any label more than once per test case.

    ``InvalidArgument`` is raised for a non-finite observation, for a call
    made outside of a test, and for a label which was already used in the
    current test case.

    .. note::
        The more examples you run, the better this technique works.

        As a rule of thumb, the targeting effect is noticeable above
        :obj:`max_examples=1000 <hypothesis.settings.max_examples>`,
        and immediately obvious by around ten thousand examples
        *per label* used by your test.

    :ref:`statistics` include the best score seen for each label,
    which can help avoid `the threshold problem
    <https://hypothesis.works/articles/threshold-problem/>`__ when the minimal
    example shrinks right down to the threshold of failure (:issue:`2180`).
    """
    # Validate the arguments before looking at the context.
    check_type((int, float), observation, "observation")
    if not math.isfinite(observation):
        raise InvalidArgument(f"{observation=} must be a finite float.")
    check_type(str, label, "label")

    context = _current_build_context.value
    if context is None:
        raise InvalidArgument(
            "Calling target() outside of a test is invalid. "
            "Consider guarding this call with `if currently_in_test_context(): ...`"
        )
    if context.data.provider.avoid_realization:
        # We could in principle realize this in the engine, but it seems more
        # efficient to have our alternative backend optimize it for us.
        # See e.g. https://github.com/pschanely/hypothesis-crosshair/issues/3
        return observation  # pragma: no cover
    verbose_report(f"Saw target({observation!r}, {label=})")

    recorded = context.data.target_observations
    if label in recorded:
        raise InvalidArgument(
            f"Calling target({observation!r}, {label=}) would overwrite "
            f"target({recorded[label]!r}, {label=})"
        )
    recorded[label] = observation

    return observation