1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import inspect
12import math
13import random
14from collections import defaultdict
15from contextlib import contextmanager
16from typing import Any, NoReturn, Union
17from weakref import WeakKeyDictionary
18
19from hypothesis import Verbosity, settings
20from hypothesis._settings import note_deprecation
21from hypothesis.errors import InvalidArgument, UnsatisfiedAssumption
22from hypothesis.internal.compat import BaseExceptionGroup
23from hypothesis.internal.conjecture.data import ConjectureData
24from hypothesis.internal.observability import TESTCASE_CALLBACKS
25from hypothesis.internal.reflection import get_pretty_function_description
26from hypothesis.internal.validation import check_type
27from hypothesis.reporting import report, verbose_report
28from hypothesis.utils.dynamicvariables import DynamicVariable
29from hypothesis.vendor.pretty import IDKey, pretty
30
31
32def _calling_function_location(what: str, frame: Any) -> str:
33 where = frame.f_back
34 return f"{what}() in {where.f_code.co_name} (line {where.f_lineno})"
35
36
37def reject() -> NoReturn:
38 if _current_build_context.value is None:
39 note_deprecation(
40 "Using `reject` outside a property-based test is deprecated",
41 since="2023-09-25",
42 has_codemod=False,
43 )
44 where = _calling_function_location("reject", inspect.currentframe())
45 if currently_in_test_context():
46 count = current_build_context().data._observability_predicates[where]
47 count["unsatisfied"] += 1
48 raise UnsatisfiedAssumption(where)
49
50
51def assume(condition: object) -> bool:
52 """Calling ``assume`` is like an :ref:`assert <python:assert>` that marks
53 the example as bad, rather than failing the test.
54
55 This allows you to specify properties that you *assume* will be
56 true, and let Hypothesis try to avoid similar examples in future.
57 """
58 if _current_build_context.value is None:
59 note_deprecation(
60 "Using `assume` outside a property-based test is deprecated",
61 since="2023-09-25",
62 has_codemod=False,
63 )
64 if TESTCASE_CALLBACKS or not condition:
65 where = _calling_function_location("assume", inspect.currentframe())
66 if TESTCASE_CALLBACKS and currently_in_test_context():
67 predicates = current_build_context().data._observability_predicates
68 predicates[where]["satisfied" if condition else "unsatisfied"] += 1
69 if not condition:
70 raise UnsatisfiedAssumption(f"failed to satisfy {where}")
71 return True
72
73
74_current_build_context = DynamicVariable(None)
75
76
77def currently_in_test_context() -> bool:
78 """Return ``True`` if the calling code is currently running inside an
79 :func:`@given <hypothesis.given>` or :doc:`stateful <stateful>` test,
80 ``False`` otherwise.
81
82 This is useful for third-party integrations and assertion helpers which
83 may be called from traditional or property-based tests, but can only use
84 :func:`~hypothesis.assume` or :func:`~hypothesis.target` in the latter case.
85 """
86 return _current_build_context.value is not None
87
88
89def current_build_context() -> "BuildContext":
90 context = _current_build_context.value
91 if context is None:
92 raise InvalidArgument("No build context registered")
93 return context
94
95
96class RandomSeeder:
97 def __init__(self, seed):
98 self.seed = seed
99
100 def __repr__(self):
101 return f"RandomSeeder({self.seed!r})"
102
103
104class _Checker:
105 def __init__(self) -> None:
106 self.saw_global_random = False
107
108 def __call__(self, x):
109 self.saw_global_random |= isinstance(x, RandomSeeder)
110 return x
111
112
113@contextmanager
114def deprecate_random_in_strategy(fmt, *args):
115 _global_rand_state = random.getstate()
116 yield (checker := _Checker())
117 if _global_rand_state != random.getstate() and not checker.saw_global_random:
118 # raise InvalidDefinition
119 note_deprecation(
120 "Do not use the `random` module inside strategies; instead "
121 "consider `st.randoms()`, `st.sampled_from()`, etc. " + fmt.format(*args),
122 since="2024-02-05",
123 has_codemod=False,
124 stacklevel=1,
125 )
126
127
128class BuildContext:
129 def __init__(self, data, *, is_final=False, close_on_capture=True):
130 assert isinstance(data, ConjectureData)
131 self.data = data
132 self.tasks = []
133 self.is_final = is_final
134 self.close_on_capture = close_on_capture
135 self.close_on_del = False
136 # Use defaultdict(list) here to handle the possibility of having multiple
137 # functions registered for the same object (due to caching, small ints, etc).
138 # The printer will discard duplicates which return different representations.
139 self.known_object_printers = defaultdict(list)
140
141 def record_call(self, obj, func, args, kwargs, arg_slices=None):
142 name = get_pretty_function_description(func)
143 self.known_object_printers[IDKey(obj)].append(
144 lambda obj, p, cycle: (
145 p.text("<...>")
146 if cycle
147 else p.repr_call(name, args, kwargs, arg_slices=arg_slices)
148 )
149 )
150
151 def prep_args_kwargs_from_strategies(self, kwarg_strategies):
152 arg_labels = {}
153 kwargs = {}
154 for k, s in kwarg_strategies.items():
155 start_idx = self.data.index
156 with deprecate_random_in_strategy("from {}={!r}", k, s) as check:
157 obj = check(self.data.draw(s, observe_as=f"generate:{k}"))
158 end_idx = self.data.index
159 kwargs[k] = obj
160
161 # This high up the stack, we can't see or really do much with the conjecture
162 # Example objects - not least because they're only materialized after the
163 # test case is completed. Instead, we'll stash the (start_idx, end_idx)
164 # pair on our data object for the ConjectureRunner engine to deal with, and
165 # pass a dict of such out so that the pretty-printer knows where to place
166 # the which-parts-matter comments later.
167 if start_idx != end_idx:
168 arg_labels[k] = (start_idx, end_idx)
169 self.data.arg_slices.add((start_idx, end_idx))
170
171 return kwargs, arg_labels
172
173 def __enter__(self):
174 self.assign_variable = _current_build_context.with_value(self)
175 self.assign_variable.__enter__()
176 return self
177
178 def __exit__(self, exc_type, exc_value, tb):
179 self.assign_variable.__exit__(exc_type, exc_value, tb)
180 errors = []
181 for task in self.tasks:
182 try:
183 task()
184 except BaseException as err:
185 errors.append(err)
186 if errors:
187 if len(errors) == 1:
188 raise errors[0] from exc_value
189 raise BaseExceptionGroup("Cleanup failed", errors) from exc_value
190
191
192def cleanup(teardown):
193 """Register a function to be called when the current test has finished
194 executing. Any exceptions thrown in teardown will be printed but not
195 rethrown.
196
197 Inside a test this isn't very interesting, because you can just use
198 a finally block, but note that you can use this inside map, flatmap,
199 etc. in order to e.g. insist that a value is closed at the end.
200 """
201 context = _current_build_context.value
202 if context is None:
203 raise InvalidArgument("Cannot register cleanup outside of build context")
204 context.tasks.append(teardown)
205
206
207def should_note():
208 context = _current_build_context.value
209 if context is None:
210 raise InvalidArgument("Cannot make notes outside of a test")
211 return context.is_final or settings.default.verbosity >= Verbosity.verbose
212
213
214def note(value: object) -> None:
215 """Report this value for the minimal failing example."""
216 if should_note():
217 if not isinstance(value, str):
218 value = pretty(value)
219 report(value)
220
221
222def event(value: str, payload: Union[str, int, float] = "") -> None:
223 """Record an event that occurred during this test. Statistics on the number of test
224 runs with each event will be reported at the end if you run Hypothesis in
225 statistics reporting mode.
226
227 Event values should be strings or convertible to them. If an optional
228 payload is given, it will be included in the string for :ref:`statistics`.
229 """
230 context = _current_build_context.value
231 if context is None:
232 raise InvalidArgument("Cannot make record events outside of a test")
233
234 payload = _event_to_string(payload, (str, int, float))
235 context.data.events[_event_to_string(value)] = payload
236
237
238_events_to_strings: WeakKeyDictionary = WeakKeyDictionary()
239
240
241def _event_to_string(event, allowed_types=str):
242 if isinstance(event, allowed_types):
243 return event
244 try:
245 return _events_to_strings[event]
246 except (KeyError, TypeError):
247 pass
248 result = str(event)
249 try:
250 _events_to_strings[event] = result
251 except TypeError:
252 pass
253 return result
254
255
256def target(observation: Union[int, float], *, label: str = "") -> Union[int, float]:
257 """Calling this function with an ``int`` or ``float`` observation gives it feedback
258 with which to guide our search for inputs that will cause an error, in
259 addition to all the usual heuristics. Observations must always be finite.
260
261 Hypothesis will try to maximize the observed value over several examples;
262 almost any metric will work so long as it makes sense to increase it.
263 For example, ``-abs(error)`` is a metric that increases as ``error``
264 approaches zero.
265
266 Example metrics:
267
268 - Number of elements in a collection, or tasks in a queue
269 - Mean or maximum runtime of a task (or both, if you use ``label``)
270 - Compression ratio for data (perhaps per-algorithm or per-level)
271 - Number of steps taken by a state machine
272
273 The optional ``label`` argument can be used to distinguish between
274 and therefore separately optimise distinct observations, such as the
275 mean and standard deviation of a dataset. It is an error to call
276 ``target()`` with any label more than once per test case.
277
278 .. note::
279 **The more examples you run, the better this technique works.**
280
281 As a rule of thumb, the targeting effect is noticeable above
282 :obj:`max_examples=1000 <hypothesis.settings.max_examples>`,
283 and immediately obvious by around ten thousand examples
284 *per label* used by your test.
285
286 :ref:`statistics` include the best score seen for each label,
287 which can help avoid `the threshold problem
288 <https://hypothesis.works/articles/threshold-problem/>`__ when the minimal
289 example shrinks right down to the threshold of failure (:issue:`2180`).
290 """
291 check_type((int, float), observation, "observation")
292 if not math.isfinite(observation):
293 raise InvalidArgument(f"{observation=} must be a finite float.")
294 check_type(str, label, "label")
295
296 context = _current_build_context.value
297 if context is None:
298 raise InvalidArgument(
299 "Calling target() outside of a test is invalid. "
300 "Consider guarding this call with `if currently_in_test_context(): ...`"
301 )
302 verbose_report(f"Saw target({observation!r}, {label=})")
303
304 if label in context.data.target_observations:
305 raise InvalidArgument(
306 f"Calling target({observation!r}, {label=}) would overwrite "
307 f"target({context.data.target_observations[label]!r}, {label=})"
308 )
309 else:
310 context.data.target_observations[label] = observation
311
312 return observation