Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/control.py: 54%
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import inspect
import math
import random
from collections import defaultdict
from collections.abc import Callable, Generator, Sequence
from contextlib import contextmanager
from typing import Any, Literal, NoReturn, Optional, overload
from weakref import WeakKeyDictionary

from hypothesis import Verbosity, settings
from hypothesis.errors import InvalidArgument, UnsatisfiedAssumption
from hypothesis.internal.compat import BaseExceptionGroup
from hypothesis.internal.conjecture.data import ConjectureData
from hypothesis.internal.observability import observability_enabled
from hypothesis.internal.reflection import get_pretty_function_description
from hypothesis.internal.validation import check_type
from hypothesis.reporting import report, verbose_report
from hypothesis.utils.deprecation import note_deprecation
from hypothesis.utils.dynamicvariables import DynamicVariable
from hypothesis.vendor.pretty import ArgLabelsT, IDKey, PrettyPrintFunction, pretty


def _calling_function_location(what: str, frame: Any) -> str:
    where = frame.f_back
    return f"{what}() in {where.f_code.co_name} (line {where.f_lineno})"


def reject() -> NoReturn:
    if _current_build_context.value is None:
        note_deprecation(
            "Using `reject` outside a property-based test is deprecated",
            since="2023-09-25",
            has_codemod=False,
        )
    where = _calling_function_location("reject", inspect.currentframe())
    if currently_in_test_context():
        counts = current_build_context().data._observability_predicates[where]
        counts.update_count(condition=False)
    raise UnsatisfiedAssumption(where)


@overload
def assume(condition: Literal[False] | None) -> NoReturn: ...
@overload
def assume(condition: object) -> Literal[True]: ...


def assume(condition: object) -> Literal[True]:
    """Calling ``assume`` is like an :ref:`assert <python:assert>` that marks
    the example as bad, rather than failing the test.

    This allows you to specify properties that you *assume* will be
    true, and let Hypothesis try to avoid similar examples in future.
    """
    if _current_build_context.value is None:
        note_deprecation(
            "Using `assume` outside a property-based test is deprecated",
            since="2023-09-25",
            has_codemod=False,
        )
    if observability_enabled() or not condition:
        where = _calling_function_location("assume", inspect.currentframe())
        if observability_enabled() and currently_in_test_context():
            counts = current_build_context().data._observability_predicates[where]
            counts.update_count(condition=bool(condition))
        if not condition:
            raise UnsatisfiedAssumption(f"failed to satisfy {where}")
    return True
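

# Illustrative sketch (editor's addition, not part of this module): the usual
# way `assume` is called from an @given test, discarding examples where the
# property is undefined instead of failing them. The test name below is
# hypothetical.
def _example_assume_usage():
    from hypothesis import given, strategies as st

    @given(st.integers(), st.integers())
    def test_division_roundtrip(a, b):
        assume(b != 0)  # b == 0 raises UnsatisfiedAssumption, so the example is discarded
        assert (a // b) * b + (a % b) == a

    return test_division_roundtrip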


_current_build_context = DynamicVariable[Optional["BuildContext"]](None)


def currently_in_test_context() -> bool:
    """Return ``True`` if the calling code is currently running inside an
    |@given| or :ref:`stateful <stateful>` test, and ``False`` otherwise.

    This is useful for third-party integrations and assertion helpers which
    may be called from either traditional or property-based tests, and can only
    use e.g. |assume| or |target| in the latter case.
    """
    return _current_build_context.value is not None
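

# Illustrative sketch (editor's addition, not part of this module): an
# assertion helper that adapts its behaviour depending on whether it is
# running under @given, as described in the docstring above. The helper name
# is hypothetical.
def _example_check_positive(x: float) -> None:
    if currently_in_test_context():
        assume(x > 0)  # inside a property-based test: discard bad inputs
    else:
        assert x > 0, "x must be positive"  # ordinary test: fail loudly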


def current_build_context() -> "BuildContext":
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("No build context registered")
    return context


@contextmanager
def deprecate_random_in_strategy(fmt, *args):
    from hypothesis.internal import entropy

    state_before = random.getstate()
    yield
    state_after = random.getstate()
    if (
        # there is a threading race condition here with deterministic_PRNG. Say
        # we have two threads 1 and 2. We start in global random state A, and
        # deterministic_PRNG sets to global random state B (which is constant across
        # threads since we seed to 0 unconditionally). Then we might have state
        # transitions:
        #
        #   [1]                   [2]
        #   A -> B                            deterministic_PRNG().__enter__
        #                         B -> B      deterministic_PRNG().__enter__
        #   state_before = B                  deprecate_random_in_strategy.__enter__
        #                         B -> A      deterministic_PRNG().__exit__
        #   state_after = A                   deprecate_random_in_strategy.__exit__
        #
        # where state_before != state_after because a different thread has reset
        # the global random state.
        #
        # To fix this, we track the known random states set by deterministic_PRNG,
        # and will not note a deprecation if it matches one of those.
        state_after != state_before
        and hash(state_after) not in entropy._known_random_state_hashes
    ):
        note_deprecation(
            "Do not use the `random` module inside strategies; instead "
            "consider `st.randoms()`, `st.sampled_from()`, etc. " + fmt.format(*args),
            since="2024-02-05",
            has_codemod=False,
            stacklevel=1,
        )


class BuildContext:
    def __init__(
        self,
        data: ConjectureData,
        *,
        is_final: bool = False,
        wrapped_test: Callable,
    ) -> None:
        self.data = data
        self.tasks: list[Callable[[], Any]] = []
        self.is_final = is_final
        self.wrapped_test = wrapped_test

        # Use defaultdict(list) here to handle the possibility of having multiple
        # functions registered for the same object (due to caching, small ints, etc).
        # The printer will discard duplicates which return different representations.
        self.known_object_printers: dict[IDKey, list[PrettyPrintFunction]] = (
            defaultdict(list)
        )

        # Track nested strategy calls for explain-phase label paths
        self._label_path: list[str] = []

    @contextmanager
    def track_arg_label(self, label: str) -> Generator[ArgLabelsT, None, None]:
        start = len(self.data.nodes)
        self._label_path.append(label)
        arg_labels: ArgLabelsT = {}
        try:
            yield arg_labels
        finally:
            self._label_path.pop()

            # This high up the stack, we can't see or really do much with
            # Span / SpanRecord - not least because they're only materialized
            # after the test case is completed.
            #
            # Instead, we'll stash the (start_idx, end_idx) pair on our data object
            # for the ConjectureRunner engine to deal with, and mutate the arg_labels
            # dict so that the pretty-printer knows where to place the
            # which-parts-matter comments later.
            end = len(self.data.nodes)
            assert start <= end
            if start != end:
                arg_labels[label] = (start, end)
                self.data.arg_slices.add((start, end))

    def record_call(
        self,
        obj: object,
        func: object,
        *,
        args: Sequence[object],
        kwargs: dict[str, object],
        arg_labels: ArgLabelsT | None = None,
    ) -> None:
        self.known_object_printers[IDKey(obj)].append(
            lambda obj, p, cycle, *, _func=func, _arg_labels=arg_labels: p.maybe_repr_known_object_as_call(  # type: ignore
                obj,
                cycle,
                get_pretty_function_description(_func),
                args,
                kwargs,
                arg_labels=_arg_labels,
            )
        )

    def prep_args_kwargs_from_strategies(
        self,
        kwarg_strategies: dict[str, Any],
    ) -> tuple[dict[str, Any], ArgLabelsT]:
        arg_labels: ArgLabelsT = {}
        kwargs: dict[str, Any] = {}

        for k, s in kwarg_strategies.items():
            with (
                self.track_arg_label(k) as arg_label,
                deprecate_random_in_strategy("from {}={!r}", k, s),
            ):
                kwargs[k] = self.data.draw(s, observe_as=f"generate:{k}")
            arg_labels |= arg_label

        return kwargs, arg_labels

    def __enter__(self):
        self.assign_variable = _current_build_context.with_value(self)
        self.assign_variable.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.assign_variable.__exit__(exc_type, exc_value, tb)
        errors = []
        for task in self.tasks:
            try:
                task()
            except BaseException as err:
                errors.append(err)
        if errors:
            if len(errors) == 1:
                raise errors[0] from exc_value
            raise BaseExceptionGroup("Cleanup failed", errors) from exc_value


def cleanup(teardown):
    """Register a function to be called when the current test has finished
    executing. Any exceptions thrown in teardown will be printed but not
    rethrown.

    Inside a test this isn't very interesting, because you can just use
    a finally block, but note that you can use this inside map, flatmap,
    etc. in order to e.g. insist that a value is closed at the end.
    """
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot register cleanup outside of build context")
    context.tasks.append(teardown)
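

# Illustrative sketch (editor's addition, not part of this module): using
# `cleanup` from inside a strategy's .map() so that a file drawn for the test
# is closed once the example finishes, per the docstring above. The strategy
# builder below is hypothetical.
def _example_tempfile_strategy():
    import tempfile

    from hypothesis import strategies as st

    def _to_open_file(data: bytes):
        f = tempfile.NamedTemporaryFile()
        f.write(data)
        cleanup(f.close)  # runs after the current example has finished executing
        return f

    return st.binary().map(_to_open_file)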


def should_note():
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot make notes outside of a test")
    return context.is_final or settings.default.verbosity >= Verbosity.verbose


def note(value: object) -> None:
    """Report this value for the minimal failing example."""
    if should_note():
        if not isinstance(value, str):
            value = pretty(value)
        report(value)
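

# Illustrative sketch (editor's addition, not part of this module): `note`
# attaches extra output to the report for the minimal failing example. The
# test below is hypothetical.
def _example_note_usage():
    from hypothesis import given, strategies as st

    @given(st.lists(st.integers()))
    def test_sorting_is_idempotent(xs):
        once = sorted(xs)
        note(f"sorted once: {once!r}")  # reported alongside the failing example
        assert sorted(once) == once

    return test_sorting_is_idempotent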


def event(value: str, payload: str | int | float = "") -> None:
    """Record an event that occurred during this test. Statistics on the number of test
    runs with each event will be reported at the end if you run Hypothesis in
    statistics reporting mode.

    Event values should be strings or convertible to them. If an optional
    payload is given, it will be included in the string for :ref:`statistics`.
    """
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot record events outside of a test")

    avoid_realization = context.data.provider.avoid_realization
    payload = _event_to_string(
        payload, allowed_types=(str, int, float), avoid_realization=avoid_realization
    )
    value = _event_to_string(value, avoid_realization=avoid_realization)
    context.data.events[value] = payload
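

# Illustrative sketch (editor's addition, not part of this module): labelling
# test runs with `event` so that Hypothesis' statistics report shows how often
# each case occurred. The test below is hypothetical.
def _example_event_usage():
    from hypothesis import given, strategies as st

    @given(st.integers())
    def test_events_by_sign(n):
        event("negative" if n < 0 else "non-negative")
        event("digits", payload=len(str(abs(n))))  # payload shows up in statistics
        assert isinstance(n, int)

    return test_events_by_sign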


_events_to_strings: WeakKeyDictionary = WeakKeyDictionary()


def _event_to_string(event, *, allowed_types=str, avoid_realization):
    if isinstance(event, allowed_types):
        return event

    # _events_to_strings is a cache which persists across iterations, causing
    # problems for symbolic backends. see
    # https://github.com/pschanely/hypothesis-crosshair/issues/41
    if avoid_realization:
        return str(event)

    try:
        return _events_to_strings[event]
    except (KeyError, TypeError):
        pass

    result = str(event)
    try:
        _events_to_strings[event] = result
    except TypeError:
        pass
    return result


def target(observation: int | float, *, label: str = "") -> int | float:
    """Calling this function with an ``int`` or ``float`` observation gives it feedback
    with which to guide our search for inputs that will cause an error, in
    addition to all the usual heuristics. Observations must always be finite.

    Hypothesis will try to maximize the observed value over several examples;
    almost any metric will work so long as it makes sense to increase it.
    For example, ``-abs(error)`` is a metric that increases as ``error``
    approaches zero.

    Example metrics:

    - Number of elements in a collection, or tasks in a queue
    - Mean or maximum runtime of a task (or both, if you use ``label``)
    - Compression ratio for data (perhaps per-algorithm or per-level)
    - Number of steps taken by a state machine

    The optional ``label`` argument can be used to distinguish between
    and therefore separately optimise distinct observations, such as the
    mean and standard deviation of a dataset. It is an error to call
    ``target()`` with any label more than once per test case.

    .. note::
        The more examples you run, the better this technique works.

        As a rule of thumb, the targeting effect is noticeable above
        :obj:`max_examples=1000 <hypothesis.settings.max_examples>`,
        and immediately obvious by around ten thousand examples
        *per label* used by your test.

    :ref:`statistics` include the best score seen for each label,
    which can help avoid `the threshold problem
    <https://hypothesis.works/articles/threshold-problem/>`__ when the minimal
    example shrinks right down to the threshold of failure (:issue:`2180`).
    """
    check_type((int, float), observation, "observation")
    if not math.isfinite(observation):
        raise InvalidArgument(f"{observation=} must be a finite float.")
    check_type(str, label, "label")

    context = _current_build_context.value
    if context is None:
        raise InvalidArgument(
            "Calling target() outside of a test is invalid. "
            "Consider guarding this call with `if currently_in_test_context(): ...`"
        )
    elif context.data.provider.avoid_realization:
        # We could in principle realize this in the engine, but it seems more
        # efficient to have our alternative backend optimize it for us.
        # See e.g. https://github.com/pschanely/hypothesis-crosshair/issues/3
        return observation  # pragma: no cover
    verbose_report(f"Saw target({observation!r}, {label=})")

    if label in context.data.target_observations:
        raise InvalidArgument(
            f"Calling target({observation!r}, {label=}) would overwrite "
            f"target({context.data.target_observations[label]!r}, {label=})"
        )
    else:
        context.data.target_observations[label] = observation

    return observation
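

# Illustrative sketch (editor's addition, not part of this module): guiding
# input generation with `target`, one label per metric as the docstring above
# requires. The test below is hypothetical.
def _example_target_usage():
    from hypothesis import given, strategies as st

    @given(st.lists(st.integers()))
    def test_runs_stay_small(xs):
        target(len(xs), label="length")  # steer the search towards longer lists
        target(float(sum(xs)), label="sum")  # a second metric needs its own label
        assert len(xs) < 10_000

    return test_runs_stay_small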