Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/core.py: 35%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
11"""This module provides the core primitives of Hypothesis, such as given."""
12import base64
13import contextlib
14import dataclasses
15import datetime
16import inspect
17import io
18import math
19import os
20import sys
21import time
22import traceback
23import types
24import unittest
25import warnings
26import zlib
27from collections import defaultdict
28from collections.abc import Coroutine, Generator, Hashable, Iterable, Sequence
29from dataclasses import dataclass, field
30from functools import partial
31from inspect import Parameter
32from random import Random
33from typing import (
34 Any,
35 BinaryIO,
36 Callable,
37 Optional,
38 TypeVar,
39 Union,
40 overload,
41)
42from unittest import TestCase
44from hypothesis import strategies as st
45from hypothesis._settings import (
46 HealthCheck,
47 Phase,
48 Verbosity,
49 all_settings,
50 local_settings,
51 settings as Settings,
52)
53from hypothesis.control import BuildContext, currently_in_test_context
54from hypothesis.database import choices_from_bytes, choices_to_bytes
55from hypothesis.errors import (
56 BackendCannotProceed,
57 DeadlineExceeded,
58 DidNotReproduce,
59 FailedHealthCheck,
60 FlakyFailure,
61 FlakyReplay,
62 Found,
63 Frozen,
64 HypothesisException,
65 HypothesisWarning,
66 InvalidArgument,
67 NoSuchExample,
68 StopTest,
69 Unsatisfiable,
70 UnsatisfiedAssumption,
71)
72from hypothesis.internal import observability
73from hypothesis.internal.compat import (
74 PYPY,
75 BaseExceptionGroup,
76 EllipsisType,
77 add_note,
78 bad_django_TestCase,
79 get_type_hints,
80 int_from_bytes,
81)
82from hypothesis.internal.conjecture.choice import ChoiceT
83from hypothesis.internal.conjecture.data import ConjectureData, Status
84from hypothesis.internal.conjecture.engine import BUFFER_SIZE, ConjectureRunner
85from hypothesis.internal.conjecture.junkdrawer import (
86 ensure_free_stackframes,
87 gc_cumulative_time,
88)
89from hypothesis.internal.conjecture.providers import (
90 BytestringProvider,
91 PrimitiveProvider,
92)
93from hypothesis.internal.conjecture.shrinker import sort_key
94from hypothesis.internal.entropy import deterministic_PRNG
95from hypothesis.internal.escalation import (
96 InterestingOrigin,
97 current_pytest_item,
98 format_exception,
99 get_trimmed_traceback,
100 is_hypothesis_file,
101)
102from hypothesis.internal.healthcheck import fail_health_check
103from hypothesis.internal.observability import (
104 TESTCASE_CALLBACKS,
105 InfoObservation,
106 InfoObservationType,
107 deliver_observation,
108 make_testcase,
109)
110from hypothesis.internal.reflection import (
111 convert_positional_arguments,
112 define_function_signature,
113 function_digest,
114 get_pretty_function_description,
115 get_signature,
116 impersonate,
117 is_mock,
118 nicerepr,
119 proxies,
120 repr_call,
121)
122from hypothesis.internal.scrutineer import (
123 MONITORING_TOOL_ID,
124 Trace,
125 Tracer,
126 explanatory_lines,
127 tractable_coverage_report,
128)
129from hypothesis.internal.validation import check_type
130from hypothesis.reporting import (
131 current_verbosity,
132 report,
133 verbose_report,
134 with_reporter,
135)
136from hypothesis.statistics import describe_statistics, describe_targets, note_statistics
137from hypothesis.strategies._internal.misc import NOTHING
138from hypothesis.strategies._internal.strategies import (
139 Ex,
140 SearchStrategy,
141 check_strategy,
142)
143from hypothesis.utils.threading import ThreadLocal
144from hypothesis.vendor.pretty import RepresentationPrinter
145from hypothesis.version import __version__
# Type variable used by the decorators in this module that take a test function
# and hand the same function back (`example.__call__`, `seed`, `reproduce_failure`).
TestFunc = TypeVar("TestFunc", bound=Callable)


# NOTE(review): nothing in this part of the file reassigns this flag; presumably
# it is flipped by integration code elsewhere - confirm.
running_under_pytest = False
# Consulted by execute_explicit_examples when deciding whether to carry on with
# further explicit examples after one of them has failed.
pytest_shows_exceptiongroups = True
# When not None, get_random_for_wrapped_test uses this as the seed for tests that
# have neither an explicit seed nor settings.derandomize enabled.
global_force_seed = None
# `threadlocal` stores "engine-global" constants, which are global relative to a
# ConjectureRunner instance (roughly speaking). Since only one conjecture runner
# instance can be active per thread, making engine constants thread-local prevents
# the ConjectureRunner instances of concurrent threads from treading on each other.
threadlocal = ThreadLocal(_hypothesis_global_random=lambda: None)
@dataclass
class Example:
    """One explicit input recorded by the ``example`` decorator."""

    # Positional and keyword values exactly as they were passed to @example().
    args: Any
    kwargs: Any
    # The two fields below are only filled in by .xfail(); None means the
    # example is not expected to fail.
    raises: Any = None
    reason: Any = None
169# TODO_DOCS link to not-yet-existent patch-dumping docs
class example:
    """
    Register an explicit input for a Hypothesis test. Explicit inputs are always
    tried before any randomly generated ones, which lets you combine the
    randomized nature of Hypothesis with a traditional parametrized test.

    For example:

    .. code-block:: python

        @example("Hello world")
        @example("some string with special significance")
        @given(st.text())
        def test_strings(s):
            pass

    will call ``test_strings("Hello world")`` and
    ``test_strings("some string with special significance")`` before generating
    any random inputs. |@example| may be placed in any order relative to |@given|
    and |@settings|.

    Explicit inputs from |@example| are run in the |Phase.explicit| phase.
    They do not count towards |settings.max_examples|, and they are never
    shrunk. If an explicit input fails, Hypothesis will stop and report the
    failure without generating any random inputs.

    |@example| is also a convenient way to reproduce a failure. For instance, if
    Hypothesis reports that ``f(n=[0, math.nan])`` fails, you can add
    ``@example(n=[0, math.nan])`` to your test to quickly reproduce that failure.

    Arguments to ``@example``
    -------------------------

    Arguments to |@example| have the same behavior and restrictions as arguments
    to |@given|. This means they may be either positional or keyword arguments
    (but not both in the same |@example|):

    .. code-block:: python

        @example(1, 2)
        @example(x=1, y=2)
        @given(st.integers(), st.integers())
        def test(x, y):
            pass

    Note that while arguments to |@given| are strategies (like |st.integers|),
    arguments to |@example| are values instead (like ``1``).

    See the :ref:`given-arguments` section for full details.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        if args and kwargs:
            raise InvalidArgument(
                "Cannot mix positional and keyword arguments for examples"
            )
        if not args and not kwargs:
            raise InvalidArgument("An example must provide at least one argument")

        # This list is handed to the first test we decorate that does not
        # already carry a list of explicit examples (see __call__).
        self.hypothesis_explicit_examples: list[Example] = []
        self._this_example = Example(tuple(args), kwargs)

    def __call__(self, test: TestFunc) -> TestFunc:
        try:
            registered = test.hypothesis_explicit_examples  # type: ignore
        except AttributeError:
            registered = self.hypothesis_explicit_examples
            test.hypothesis_explicit_examples = registered  # type: ignore
        registered.append(self._this_example)
        return test

    def xfail(
        self,
        condition: bool = True,  # noqa: FBT002
        *,
        reason: str = "",
        raises: Union[
            type[BaseException], tuple[type[BaseException], ...]
        ] = BaseException,
    ) -> "example":
        """Mark this example as an expected failure, similarly to
        :obj:`pytest.mark.xfail(strict=True) <pytest.mark.xfail>`.

        Expected-failing examples let you check that your test really does fail
        on some inputs, and therefore build confidence that *passing* tests pass
        because your code is working, not because the test is missing something.

        .. code-block:: python

            @example(...).xfail()
            @example(...).xfail(reason="Prices must be non-negative")
            @example(...).xfail(raises=(KeyError, ValueError))
            @example(...).xfail(sys.version_info[:2] >= (3, 12), reason="needs py 3.12")
            @example(...).xfail(condition=sys.platform != "linux", raises=OSError)
            def test(x):
                pass

        .. note::

            Expected-failing examples are handled separately from those generated
            by strategies, so you should usually ensure that there is no overlap.

            .. code-block:: python

                @example(x=1, y=0).xfail(raises=ZeroDivisionError)
                @given(x=st.just(1), y=st.integers())  # Missing `.filter(bool)`!
                def test_fraction(x, y):
                    # This test will try the explicit example and see it fail as
                    # expected, then go on to generate more examples from the
                    # strategy. If we happen to generate y=0, the test will fail
                    # because only the explicit example is treated as xfailing.
                    x / y
        """
        check_type(bool, condition, "condition")
        check_type(str, reason, "reason")

        def is_exception_type(candidate):
            return isinstance(candidate, type) and issubclass(candidate, BaseException)

        # An empty tuple would mean "expected to fail with no error", which is
        # impossible, so it is rejected along with anything that isn't a type.
        acceptable = is_exception_type(raises) or (
            isinstance(raises, tuple)
            and len(raises) > 0
            and all(is_exception_type(r) for r in raises)
        )
        if not acceptable:
            raise InvalidArgument(
                f"{raises=} must be an exception type or tuple of exception types"
            )

        # With a false condition the arguments are validated but otherwise ignored.
        if condition:
            self._this_example = dataclasses.replace(
                self._this_example, raises=raises, reason=reason
            )
        return self

    def via(self, whence: str, /) -> "example":
        """Attach a machine-readable label noting what the origin of this example
        was. |example.via| is completely optional and does not change runtime
        behavior.

        |example.via| is intended to support self-documenting behavior, as well as
        tooling which might add (or remove) |@example| decorators automatically.
        For example:

        .. code-block:: python

            # Annotating examples is optional and does not change runtime behavior
            @example(...)
            @example(...).via("regression test for issue #42")
            @example(...).via("discovered failure")
            def test(x):
                pass

        .. note::

            `HypoFuzz <https://hypofuzz.com/>`_ uses |example.via| to tag examples
            in the patch of its high-coverage set of explicit inputs, on
            `the patches page <https://hypofuzz.com/example-dashboard/#/patches>`_.
        """
        if isinstance(whence, str):
            # Deliberately a no-op at runtime; the tools operate on source code.
            return self
        raise InvalidArgument(".via() must be passed a string")
def seed(seed: Hashable) -> Callable[[TestFunc], TestFunc]:
    """
    Seed the randomness for this test.

    ``seed`` may be any hashable object. No exact meaning for ``seed`` is provided
    other than that for a fixed seed value Hypothesis will produce the same
    examples (assuming that there are no other sources of nondeterminism, such
    as timing, hash randomization, or external state).

    For example, the following test function and |RuleBasedStateMachine| will
    each generate the same series of examples each time they are executed:

    .. code-block:: python

        @seed(1234)
        @given(st.integers())
        def test(n): ...

        @seed(6789)
        class MyMachine(RuleBasedStateMachine): ...

    If using pytest, you can alternatively pass ``--hypothesis-seed`` on the
    command line.

    Setting a seed overrides |settings.derandomize|, which is designed to enable
    deterministic CI tests rather than reproducing observed failures.

    Hypothesis will only print the seed which would reproduce a failure if a test
    fails in an unexpected way, for instance inside Hypothesis internals.
    """

    def accept(test):
        # Record the seed on the test object, and rebuild its settings from
        # whatever it already had (or None) with the database turned off.
        previous = getattr(test, "_hypothesis_internal_use_settings", None)
        test._hypothesis_internal_use_seed = seed
        test._hypothesis_internal_use_settings = Settings(previous, database=None)
        return test

    return accept
375# TODO_DOCS: link to /explanation/choice-sequence
def reproduce_failure(version: str, blob: bytes) -> Callable[[TestFunc], TestFunc]:
    """
    Run the example corresponding to the binary ``blob`` in order to reproduce a
    failure. ``blob`` is a serialized version of the internal input representation
    of Hypothesis.

    A test decorated with |@reproduce_failure| always runs exactly one example,
    which is expected to cause a failure. If the provided ``blob`` does not
    cause a failure, Hypothesis will raise |DidNotReproduce|.

    Hypothesis will print an |@reproduce_failure| decorator if
    |settings.print_blob| is ``True`` (which is the default in CI).

    |@reproduce_failure| is intended to be temporarily added to your test suite in
    order to reproduce a failure. It is not intended to be a permanent addition to
    your test suite. Because of this, no compatibility guarantees are made across
    Hypothesis versions, and |@reproduce_failure| will error if used on a different
    Hypothesis version than it was created for.

    .. seealso::

        See also the :doc:`/tutorial/replaying-failures` tutorial.
    """
    recorded = (version, blob)

    def accept(test):
        # Nothing is decoded or checked here; the pair is only stored on the test.
        setattr(test, "_hypothesis_internal_use_reproduce_failure", recorded)
        return test

    return accept
def reproduction_decorator(choices: Iterable[ChoiceT]) -> str:
    """Return the source text of a ``@reproduce_failure`` decorator for ``choices``.

    The text embeds the running Hypothesis version and the blob produced by
    ``encode_failure``.
    """
    encoded = encode_failure(choices)
    return f"@reproduce_failure({__version__!r}, {encoded!r})"
def encode_failure(choices: Iterable[ChoiceT]) -> bytes:
    """Serialize ``choices`` into a base64 blob that ``decode_failure`` can read.

    The payload starts with a one-byte marker: ``\\1`` when the rest is
    zlib-compressed, ``\\0`` when it is stored as-is. Compression is only used
    when it actually makes the payload shorter.
    """
    raw = choices_to_bytes(choices)
    squeezed = zlib.compress(raw)
    payload = b"\1" + squeezed if len(squeezed) < len(raw) else b"\0" + raw
    return base64.b64encode(payload)
def decode_failure(blob: bytes) -> Sequence[ChoiceT]:
    """Turn a blob produced by ``encode_failure`` back into a choice sequence.

    Raises InvalidArgument if the blob is not valid base64, has an unknown
    marker byte, cannot be decompressed, or does not deserialize to choices.
    """
    try:
        raw = base64.b64decode(blob)
    except Exception:
        raise InvalidArgument(f"Invalid base64 encoded string: {blob!r}") from None

    # The first byte says how the remainder was stored: \0 plain, \1 zlib.
    marker, body = raw[:1], raw[1:]
    if marker == b"\1":
        try:
            body = zlib.decompress(body)
        except zlib.error as err:
            raise InvalidArgument(
                f"Invalid zlib compression for blob {blob!r}"
            ) from err
    elif marker != b"\0":
        raise InvalidArgument(
            f"Could not decode blob {blob!r}: Invalid start byte {marker!r}"
        )

    choices = choices_from_bytes(body)
    if choices is None:
        raise InvalidArgument(f"Invalid serialized choice sequence for blob {blob!r}")

    return choices
def _invalid(message, *, exc=InvalidArgument, test, given_kwargs):
    """Build a stand-in for ``test`` that raises ``exc(message)`` when called.

    The stand-in is marked as a Hypothesis test and given a ``hypothesis``
    handle that points back at the original ``test``.
    """

    @impersonate(test)
    def wrapped_test(*arguments, **kwargs):  # pragma: no cover # coverage limitation
        raise exc(message)

    handle = HypothesisHandle(
        inner_test=test,
        _get_fuzz_target=wrapped_test,
        _given_kwargs=given_kwargs,
    )
    wrapped_test.is_hypothesis_test = True
    wrapped_test.hypothesis = handle
    return wrapped_test
def is_invalid_test(test, original_sig, given_arguments, given_kwargs):
    """Check the arguments to ``@given`` for basic usage constraints.

    Most errors are not raised immediately; instead we return a dummy test
    function that will raise the appropriate error if it is actually called.
    When the user runs a subset of tests (e.g via ``pytest -k``), errors will
    only be reported for tests that actually ran.

    Returns the dummy test function if a problem was found, and ``None`` if
    the arguments passed every check.
    """
    invalid = partial(_invalid, test=test, given_kwargs=given_kwargs)

    if not (given_arguments or given_kwargs):
        return invalid("given must be called with at least one argument")

    params = list(original_sig.parameters.values())
    pos_params = [p for p in params if p.kind is p.POSITIONAL_OR_KEYWORD]
    kwonly_params = [p for p in params if p.kind is p.KEYWORD_ONLY]
    if given_arguments and params != pos_params:
        return invalid(
            "positional arguments to @given are not supported with varargs, "
            "varkeywords, positional-only, or keyword-only arguments"
        )

    if len(given_arguments) > len(pos_params):
        return invalid(
            f"Too many positional arguments for {test.__name__}() were passed to "
            f"@given - expected at most {len(pos_params)} "
            f"arguments, but got {len(given_arguments)} {given_arguments!r}"
        )

    if ... in given_arguments:
        return invalid(
            "... was passed as a positional argument to @given, but may only be "
            "passed as a keyword argument or as the sole argument of @given"
        )

    if given_arguments and given_kwargs:
        return invalid("cannot mix positional and keyword arguments to @given")

    # Build the set of names that may be passed by keyword once, rather than
    # once per keyword being checked.
    accepted_names = {p.name for p in pos_params + kwonly_params}
    extra_kwargs = [k for k in given_kwargs if k not in accepted_names]
    # Unknown keywords are only an error when there is no **kwargs to absorb them.
    if extra_kwargs and (params == [] or params[-1].kind is not params[-1].VAR_KEYWORD):
        arg = extra_kwargs[0]
        extra = ""
        if arg in all_settings:
            extra = f". Did you mean @settings({arg}={given_kwargs[arg]!r})?"
        return invalid(
            f"{test.__name__}() got an unexpected keyword argument {arg!r}, "
            f"from `{arg}={given_kwargs[arg]!r}` in @given{extra}"
        )
    if any(p.default is not p.empty for p in params):
        return invalid("Cannot apply @given to a function with defaults.")

    # This case would raise Unsatisfiable *anyway*, but by detecting it here we can
    # provide a much more helpful error message for people e.g. using the Ghostwriter.
    empty = [
        f"{s!r} (arg {idx})" for idx, s in enumerate(given_arguments) if s is NOTHING
    ] + [f"{name}={s!r}" for name, s in given_kwargs.items() if s is NOTHING]
    if empty:
        strats = "strategies" if len(empty) > 1 else "strategy"
        return invalid(
            f"Cannot generate examples from empty {strats}: " + ", ".join(empty),
            exc=Unsatisfiable,
        )

    return None
def execute_explicit_examples(state, wrapped_test, arguments, kwargs, original_sig):
    """Run every ``@example`` attached to ``wrapped_test``, yielding failures.

    This is a generator. For each explicit example that fails it yields a
    ``(fragments_reported, err)`` pair, where ``fragments_reported`` is the list
    of report strings captured while the example ran and ``err`` is the
    exception to report. After a failure it only moves on to the next example
    when multiple-bug reporting applies to that error; otherwise it stops.

    Raises InvalidArgument directly (not via a yielded pair) when an example's
    arguments do not line up with the parameters that ``@given`` provides.
    """
    assert isinstance(state, StateForActualGivenExecution)
    posargs = [
        p.name
        for p in original_sig.parameters.values()
        if p.kind is p.POSITIONAL_OR_KEYWORD
    ]

    for example in reversed(getattr(wrapped_test, "hypothesis_explicit_examples", ())):
        assert isinstance(example, Example)
        # All of this validation is to check that @example() got "the same" arguments
        # as @given, i.e. corresponding to the same parameters, even though they might
        # be any mixture of positional and keyword arguments.
        if example.args:
            assert not example.kwargs
            if any(
                p.kind is p.POSITIONAL_ONLY for p in original_sig.parameters.values()
            ):
                raise InvalidArgument(
                    "Cannot pass positional arguments to @example() when decorating "
                    "a test function which has positional-only parameters."
                )
            if len(example.args) > len(posargs):
                raise InvalidArgument(
                    "example has too many arguments for test. Expected at most "
                    f"{len(posargs)} but got {len(example.args)}"
                )
            # Positional example values fill the *last* positional parameters.
            example_kwargs = dict(zip(posargs[-len(example.args) :], example.args))
        else:
            example_kwargs = dict(example.kwargs)
        given_kws = ", ".join(
            repr(k) for k in sorted(wrapped_test.hypothesis._given_kwargs)
        )
        example_kws = ", ".join(repr(k) for k in sorted(example_kwargs))
        if given_kws != example_kws:
            raise InvalidArgument(
                f"Inconsistent args: @given() got strategies for {given_kws}, "
                f"but @example() got arguments for {example_kws}"
            ) from None

        # This is certainly true because the example_kwargs exactly match the params
        # reserved by @given(), which are then removed from the function signature.
        assert set(example_kwargs).isdisjoint(kwargs)
        example_kwargs.update(kwargs)

        # Validation above always runs; execution is skipped if the phase is off.
        if Phase.explicit not in state.settings.phases:
            continue

        with local_settings(state.settings):
            fragments_reported = []
            empty_data = ConjectureData.for_choices([])
            try:
                execute_example = partial(
                    state.execute_once,
                    empty_data,
                    is_final=True,
                    print_example=True,
                    example_kwargs=example_kwargs,
                )
                with with_reporter(fragments_reported.append):
                    if example.raises is None:
                        execute_example()
                    else:
                        # @example(...).xfail(...)
                        # Join positional and keyword parts as a single list so
                        # that a separator also appears between the two groups.
                        bits = ", ".join(
                            [nicerepr(x) for x in arguments]
                            + [f"{k}={nicerepr(v)}" for k, v in example_kwargs.items()]
                        )
                        try:
                            execute_example()
                        except failure_exceptions_to_catch() as err:
                            if not isinstance(err, example.raises):
                                raise
                            # Save a string form of this example; we'll warn if it's
                            # ever generated by the strategy (which can't be xfailed)
                            state.xfail_example_reprs.add(
                                repr_call(state.test, arguments, example_kwargs)
                            )
                        except example.raises as err:
                            # We'd usually check this as early as possible, but it's
                            # possible for failure_exceptions_to_catch() to grow when
                            # e.g. pytest is imported between import- and test-time.
                            raise InvalidArgument(
                                f"@example({bits}) raised an expected {err!r}, "
                                "but Hypothesis does not treat this as a test failure"
                            ) from err
                        else:
                            # Unexpectedly passing; always raise an error in this case.
                            reason = f" because {example.reason}" * bool(example.reason)
                            if example.raises is BaseException:
                                name = "exception"  # special-case no raises= arg
                            elif not isinstance(example.raises, tuple):
                                name = example.raises.__name__
                            elif len(example.raises) == 1:
                                name = example.raises[0].__name__
                            else:
                                name = (
                                    ", ".join(ex.__name__ for ex in example.raises[:-1])
                                    + f", or {example.raises[-1].__name__}"
                                )
                            vowel = name.upper()[0] in "AEIOU"
                            raise AssertionError(
                                f"Expected a{'n' * vowel} {name} from @example({bits})"
                                f"{reason}, but no exception was raised."
                            )
            except UnsatisfiedAssumption:
                # Odd though it seems, we deliberately support explicit examples that
                # are then rejected by a call to `assume()`. As well as iterative
                # development, this is rather useful to replay Hypothesis' part of
                # a saved failure when other arguments are supplied by e.g. pytest.
                # See https://github.com/HypothesisWorks/hypothesis/issues/2125
                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
            except BaseException as err:
                # In order to support reporting of multiple failing examples, we yield
                # each of the (report text, error) pairs we find back to the top-level
                # runner. This also ensures that user-facing stack traces have as few
                # frames of Hypothesis internals as possible.
                err = err.with_traceback(get_trimmed_traceback())

                # One user error - whether misunderstanding or typo - we've seen a few
                # times is to pass strategies to @example() where values are expected.
                # Checking is easy, and false-positives not much of a problem, so:
                if isinstance(err, failure_exceptions_to_catch()) and any(
                    isinstance(arg, SearchStrategy)
                    for arg in example.args + tuple(example.kwargs.values())
                ):
                    new = HypothesisWarning(
                        "The @example() decorator expects to be passed values, but "
                        "you passed strategies instead. See https://hypothesis."
                        "readthedocs.io/en/latest/reference/api.html#hypothesis"
                        ".example for details."
                    )
                    new.__cause__ = err
                    err = new

                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
                yield (fragments_reported, err)
                if (
                    state.settings.report_multiple_bugs
                    and pytest_shows_exceptiongroups
                    and isinstance(err, failure_exceptions_to_catch())
                    and not isinstance(err, skip_exceptions_to_reraise())
                ):
                    continue
                break
            finally:
                if fragments_reported:
                    assert fragments_reported[0].startswith("Falsifying example")
                    fragments_reported[0] = fragments_reported[0].replace(
                        "Falsifying example", "Falsifying explicit example", 1
                    )

                # Always emit an observation for the example, pass or fail.
                empty_data.freeze()
                tc = make_testcase(
                    run_start=state._start_timestamp,
                    property=state.test_identifier,
                    data=empty_data,
                    how_generated="explicit example",
                    representation=state._string_repr,
                    timing=state._timing_features,
                )
                deliver_observation(tc)

            # Only reached when the example did not fail (the failure branch
            # above leaves the loop body via continue/break).
            if fragments_reported:
                verbose_report(fragments_reported[0].replace("Falsifying", "Trying", 1))
                for f in fragments_reported[1:]:
                    verbose_report(f)
def get_random_for_wrapped_test(test, wrapped_test):
    """Pick the ``Random`` instance that will drive ``wrapped_test``.

    Sources are tried in order: the seed stored on the wrapped test, a digest
    of ``test`` when settings.derandomize is on, the module-level
    ``global_force_seed``, and finally a fresh 128-bit seed drawn from the
    thread-local global random. Only in that last case is the seed recorded on
    ``wrapped_test._hypothesis_internal_use_generated_seed``; otherwise that
    attribute is reset to None.
    """
    settings = wrapped_test._hypothesis_internal_use_settings
    wrapped_test._hypothesis_internal_use_generated_seed = None

    explicit_seed = wrapped_test._hypothesis_internal_use_seed
    if explicit_seed is not None:
        return Random(explicit_seed)

    if settings.derandomize:
        return Random(int_from_bytes(function_digest(test)))

    if global_force_seed is not None:
        return Random(global_force_seed)

    if threadlocal._hypothesis_global_random is None:  # pragma: no cover
        threadlocal._hypothesis_global_random = Random()
    fresh_seed = threadlocal._hypothesis_global_random.getrandbits(128)
    wrapped_test._hypothesis_internal_use_generated_seed = fresh_seed
    return Random(fresh_seed)
@dataclass
class Stuff:
    """Bundle of call-time values assembled by ``process_arguments_to_given``."""

    # The object the test looks to be bound to (first argument), or None.
    selfy: Any
    # Positional and keyword arguments the wrapped test was called with,
    # after conversion by process_arguments_to_given.
    args: tuple
    kwargs: dict
    # Mapping of parameter name to the strategy passed to @given.
    given_kwargs: dict
def process_arguments_to_given(
    wrapped_test: Any,
    arguments: Sequence[object],
    kwargs: dict[str, object],
    given_kwargs: dict[str, SearchStrategy],
    params: dict[str, Parameter],
) -> tuple[Sequence[object], dict[str, object], Stuff]:
    """Normalise the call arguments and validate the strategies given to ``@given``.

    Returns the converted positional arguments (as a tuple), the converted
    keyword arguments, and a ``Stuff`` bundling those together with the bound
    object (if any) and ``given_kwargs``.
    """
    arguments, kwargs = convert_positional_arguments(wrapped_test, arguments, kwargs)

    # If the test function is a method of some kind, the bound object
    # will be the first named argument if there are any, otherwise the
    # first vararg (if any).
    first_named = next(
        (p.name for p in params.values() if p.kind is p.POSITIONAL_OR_KEYWORD),
        None,
    )
    if first_named is not None:
        bound = kwargs.get(first_named)
    elif arguments:
        bound = arguments[0]
    else:
        bound = None

    # Ensure that we don't mistake mocks for self here.
    # This can cause the mock to be used as the test runner.
    if is_mock(bound):
        bound = None

    arguments = tuple(arguments)

    with ensure_free_stackframes():
        for name, strategy in given_kwargs.items():
            check_strategy(strategy, name=name)
            strategy.validate()

    bundle = Stuff(
        selfy=bound, args=arguments, kwargs=kwargs, given_kwargs=given_kwargs
    )
    return arguments, kwargs, bundle
def skip_exceptions_to_reraise():
    """Return a tuple of exceptions meaning 'skip this test', to re-raise.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request adding
    it to this function and to tests/cover/test_lazy_import.py
    """
    # (module name, attribute) pairs for the skip exception of each runner.
    known_skips = (
        ("unittest", "SkipTest"),
        ("unittest2", "SkipTest"),
        ("nose", "SkipTest"),
        ("_pytest.outcomes", "Skipped"),
    )
    # We only look in sys.modules and never import anything ourselves - you
    # can't be an instance of a type from an unimported module! It is cheap
    # enough not to need caching, and it avoids import side-effects.
    # A set is used because nose may simply re-export unittest.SkipTest.
    found = {
        getattr(sys.modules[module_name], attribute)
        for module_name, attribute in known_skips
        if module_name in sys.modules
    }
    return tuple(sorted(found, key=str))
def failure_exceptions_to_catch() -> tuple[type[BaseException], ...]:
    """Return a tuple of exceptions meaning 'this test has failed', to catch.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request.
    """
    # While SystemExit and GeneratorExit are instances of BaseException, we also
    # expect them to be deterministic - unlike KeyboardInterrupt - and so we treat
    # them as standard exceptions, check for flakiness, etc.
    # See https://github.com/HypothesisWorks/hypothesis/issues/2223 for details.
    always_caught = (Exception, SystemExit, GeneratorExit)
    if "_pytest.outcomes" not in sys.modules:
        return always_caught
    return (*always_caught, sys.modules["_pytest.outcomes"].Failed)
def new_given_signature(original_sig, given_kwargs):
    """Make an updated signature for the wrapped test.

    Parameters that ``given_kwargs`` supplies are dropped, provided they can be
    passed by keyword; every other parameter is kept in its original order. The
    return annotation of the result is set to ``None``.
    """
    remaining = []
    for param in original_sig.parameters.values():
        by_keyword = param.kind in (param.POSITIONAL_OR_KEYWORD, param.KEYWORD_ONLY)
        if by_keyword and param.name in given_kwargs:
            continue
        remaining.append(param)
    return original_sig.replace(parameters=remaining, return_annotation=None)
def default_executor(data, function):
    """Call ``function`` with ``data`` and return whatever it returns."""
    outcome = function(data)
    return outcome
def get_executor(runner):
    """Return a ``(data, function)`` callable that runs one example for ``runner``.

    If ``runner`` has an ``execute_example`` attribute, the call is routed
    through it. Otherwise, if it has ``setup_example`` and/or
    ``teardown_example``, those are called around ``function(data)``. With none
    of these, ``default_executor`` is returned.
    """
    missing = object()
    custom_execute = getattr(runner, "execute_example", missing)
    if custom_execute is not missing:

        def delegate(data, function):
            return custom_execute(partial(function, data))

        return delegate

    has_hooks = any(
        hasattr(runner, hook) for hook in ("setup_example", "teardown_example")
    )
    if not has_hooks:
        return default_executor

    # A missing (or falsy) hook is replaced by a do-nothing stand-in.
    before = getattr(runner, "setup_example", None) or (lambda: None)
    after = getattr(runner, "teardown_example", None) or (lambda ex: None)

    def execute(data, function):
        token = None
        try:
            token = before()
            return function(data)
        finally:
            # Teardown receives whatever setup returned, or None if setup
            # itself raised.
            after(token)

    return execute
@contextlib.contextmanager
def unwrap_markers_from_group() -> Generator[None, None, None]:
    """Context manager that unwraps certain exception groups raised in its body.

    Exceptions that are not groups pass through untouched. For a group:

    * if it contains only ``Frozen`` exceptions, or contains anything that is
      neither ``StopTest`` nor a ``HypothesisException``, the group is re-raised
      unchanged;
    * otherwise ``Frozen`` members are dropped and a single member of what
      remains is raised in place of the group (see the comments below for
      which one).
    """
    # This function is a crude solution, a better way of resolving it would probably
    # be to rewrite a bunch of exception handlers to use except*.
    T = TypeVar("T", bound=BaseException)

    def _flatten_group(excgroup: BaseExceptionGroup[T]) -> list[T]:
        # Recursively collect the leaf (non-group) exceptions, in order.
        found_exceptions: list[T] = []
        for exc in excgroup.exceptions:
            if isinstance(exc, BaseExceptionGroup):
                found_exceptions.extend(_flatten_group(exc))
            else:
                found_exceptions.append(exc)
        return found_exceptions

    try:
        yield
    except BaseExceptionGroup as excgroup:
        frozen_exceptions, non_frozen_exceptions = excgroup.split(Frozen)

        # group only contains Frozen, reraise the group
        # it doesn't matter what we raise, since any exceptions get disregarded
        # and reraised as StopTest if data got frozen.
        if non_frozen_exceptions is None:
            raise
        # in all other cases they are discarded

        # Can RewindRecursive end up in this group?
        _, user_exceptions = non_frozen_exceptions.split(
            lambda e: isinstance(e, (StopTest, HypothesisException))
        )

        # this might contain marker exceptions, or internal errors, but not frozen.
        if user_exceptions is not None:
            raise

        # single marker exception - reraise it
        flattened_non_frozen_exceptions: list[BaseException] = _flatten_group(
            non_frozen_exceptions
        )
        if len(flattened_non_frozen_exceptions) == 1:
            e = flattened_non_frozen_exceptions[0]
            # preserve the cause of the original exception to not hinder debugging
            # note that __context__ is still lost though
            raise e from e.__cause__

        # multiple marker exceptions. If we re-raise the whole group we break
        # a bunch of logic so ....?
        stoptests, non_stoptests = non_frozen_exceptions.split(StopTest)

        # TODO: stoptest+hypothesisexception ...? Is it possible? If so, what do?

        if non_stoptests:
            # TODO: multiple marker exceptions is easy to produce, but the logic in the
            # engine does not handle it... so we just reraise the first one for now.
            e = _flatten_group(non_stoptests)[0]
            raise e from e.__cause__
        assert stoptests is not None

        # multiple stoptests: raising the one with the lowest testcounter
        raise min(_flatten_group(stoptests), key=lambda s_e: s_e.testcounter)
class StateForActualGivenExecution:
    """State and driver logic for executing a single ``@given`` test.

    An instance holds the test function, its settings and random source,
    plus bookkeeping shared between individual executions
    (``execute_once``) and the full engine run (``run_engine``): failure
    flags, explain-mode traces, the observability representation string and
    per-case timing features.
    """

    def __init__(self, stuff, test, settings, random, wrapped_test):
        """Record the test, its configuration, and reset all bookkeeping."""
        # Executor hook through which every test case is invoked
        # (see the ``self.test_runner(data, run)`` call in ``execute_once``).
        self.test_runner = get_executor(stuff.selfy)
        self.stuff = stuff
        self.settings = settings
        self.last_exception = None
        self.falsifying_examples = ()
        self.random = random
        self.ever_executed = False

        self.is_find = getattr(wrapped_test, "_hypothesis_internal_is_find", False)
        self.wrapped_test = wrapped_test
        self.xfail_example_reprs = set()

        self.test = test

        self.print_given_args = getattr(
            wrapped_test, "_hypothesis_internal_print_given_args", True
        )

        self.files_to_propagate = set()
        # Set by ``_execute_once_for_engine`` when a test case fails; consulted
        # by ``_should_trace`` to decide whether to collect explain traces.
        self.failed_normally = False
        self.failed_due_to_deadline = False

        # Maps an interesting origin (or None for valid runs) to the set of
        # branch traces observed for it.
        self.explain_traces = defaultdict(set)
        self._start_timestamp = time.time()
        # Representation of the current test case, used for observability.
        self._string_repr = ""
        # Timing breakdown of the most recent execution; reset after each
        # engine-driven test case.
        self._timing_features = {}

    @property
    def test_identifier(self):
        """Return the pytest node id if there is one, else a pretty
        description of the wrapped test function."""
        return getattr(
            current_pytest_item.value, "nodeid", None
        ) or get_pretty_function_description(self.wrapped_test)

    def _should_trace(self):
        """Decide whether branch tracing should be enabled for the next run.

        Truthy either when observability callbacks are registered and
        coverage collection is switched on, or when the test has already
        failed (not because of a deadline) and both the shrink and explain
        phases are enabled.
        """
        # NOTE: we explicitly support monkeypatching this. Keep the namespace
        # access intact.
        _trace_obs = TESTCASE_CALLBACKS and observability.OBSERVABILITY_COLLECT_COVERAGE
        _trace_failure = (
            self.failed_normally
            and not self.failed_due_to_deadline
            and {Phase.shrink, Phase.explain}.issubset(self.settings.phases)
        )
        return _trace_obs or _trace_failure

    def execute_once(
        self,
        data,
        *,
        print_example=False,
        is_final=False,
        expected_failure=None,
        example_kwargs=None,
    ):
        """Run the test function once, using ``data`` as input.

        If the test raises an exception, it will propagate through to the
        caller of this method. Depending on its type, this could represent
        an ordinary test failure, or a fatal error, or a control exception.

        If this method returns normally, the test might have passed, or
        it might have placed ``data`` in an unsuccessful state and then
        swallowed the corresponding control exception.

        Keyword arguments:

        * ``print_example``: report the arguments with a "Falsifying example:"
          prefix; otherwise they are only reported (as "Trying example:")
          at verbose verbosity or above.
        * ``is_final``: forwarded to ``BuildContext``; when false, the
          deadline check uses a relaxed deadline.
        * ``expected_failure``: an ``(exception, traceback)`` pair. If given
          and the run completes without raising, ``FlakyFailure`` is raised.
        * ``example_kwargs``: if given, used as the test's keyword arguments
          instead of drawing them from the ``@given`` strategies.
        """

        self.ever_executed = True
        data.is_find = self.is_find

        self._string_repr = ""
        text_repr = None
        # Fast path: with no deadline and no observability callbacks there is
        # nothing to time, so the wrapper only unwraps marker exceptions.
        if self.settings.deadline is None and not TESTCASE_CALLBACKS:

            @proxies(self.test)
            def test(*args, **kwargs):
                with unwrap_markers_from_group():
                    # NOTE: For compatibility with Python 3.9's LL(1)
                    # parser, this is written as a nested with-statement,
                    # instead of a compound one.
                    with ensure_free_stackframes():
                        return self.test(*args, **kwargs)

        else:

            @proxies(self.test)
            def test(*args, **kwargs):
                # Snapshot the cumulative draw / stateful / GC times so that
                # only time spent during this call is subtracted below.
                arg_drawtime = math.fsum(data.draw_times.values())
                arg_stateful = math.fsum(data._stateful_run_times.values())
                arg_gctime = gc_cumulative_time()
                start = time.perf_counter()
                try:
                    with unwrap_markers_from_group():
                        # NOTE: For compatibility with Python 3.9's LL(1)
                        # parser, this is written as a nested with-statement,
                        # instead of a compound one.
                        with ensure_free_stackframes():
                            result = self.test(*args, **kwargs)
                finally:
                    # Timing features are recorded even when the test raised.
                    finish = time.perf_counter()
                    in_drawtime = math.fsum(data.draw_times.values()) - arg_drawtime
                    in_stateful = (
                        math.fsum(data._stateful_run_times.values()) - arg_stateful
                    )
                    in_gctime = gc_cumulative_time() - arg_gctime
                    runtime = finish - start - in_drawtime - in_stateful - in_gctime
                    self._timing_features = {
                        "execute:test": runtime,
                        "overall:gc": in_gctime,
                        **data.draw_times,
                        **data._stateful_run_times,
                    }

                if (current_deadline := self.settings.deadline) is not None:
                    if not is_final:
                        # Non-final runs get five quarters of the deadline.
                        current_deadline = (current_deadline // 4) * 5
                    if runtime >= current_deadline.total_seconds():
                        raise DeadlineExceeded(
                            datetime.timedelta(seconds=runtime), self.settings.deadline
                        )
                return result

        def run(data: ConjectureData) -> None:
            # Set up dynamic context needed by a single test run.
            if self.stuff.selfy is not None:
                data.hypothesis_runner = self.stuff.selfy
            # Generate all arguments to the test function.
            args = self.stuff.args
            kwargs = dict(self.stuff.kwargs)
            if example_kwargs is None:
                kw, argslices = context.prep_args_kwargs_from_strategies(
                    self.stuff.given_kwargs
                )
            else:
                kw = example_kwargs
                argslices = {}
            kwargs.update(kw)
            if expected_failure is not None:
                # Only needed for the FlakyFailure message raised at the end
                # of ``execute_once``.
                nonlocal text_repr
                text_repr = repr_call(test, args, kwargs)

            if print_example or current_verbosity() >= Verbosity.verbose:
                printer = RepresentationPrinter(context=context)
                if print_example:
                    printer.text("Falsifying example:")
                else:
                    printer.text("Trying example:")

                if self.print_given_args:
                    printer.text(" ")
                    printer.repr_call(
                        test.__name__,
                        args,
                        kwargs,
                        force_split=True,
                        arg_slices=argslices,
                        leading_comment=(
                            "# " + context.data.slice_comments[(0, 0)]
                            if (0, 0) in context.data.slice_comments
                            else None
                        ),
                        avoid_realization=data.provider.avoid_realization,
                    )
                report(printer.getvalue())

            if TESTCASE_CALLBACKS:
                # Build the same call representation again, this time stored
                # for the observability payload rather than reported.
                printer = RepresentationPrinter(context=context)
                printer.repr_call(
                    test.__name__,
                    args,
                    kwargs,
                    force_split=True,
                    arg_slices=argslices,
                    leading_comment=(
                        "# " + context.data.slice_comments[(0, 0)]
                        if (0, 0) in context.data.slice_comments
                        else None
                    ),
                    avoid_realization=data.provider.avoid_realization,
                )
                self._string_repr = printer.getvalue()

            try:
                return test(*args, **kwargs)
            except TypeError as e:
                # If we sampled from a sequence of strategies, AND failed with a
                # TypeError, *AND that exception mentions SearchStrategy*, add a note:
                if (
                    "SearchStrategy" in str(e)
                    and data._sampled_from_all_strategies_elements_message is not None
                ):
                    msg, format_arg = data._sampled_from_all_strategies_elements_message
                    add_note(e, msg.format(format_arg))
                raise
            finally:
                # If stateful repr parts were recorded on ``data``, they
                # replace the call representation built above.
                if parts := getattr(data, "_stateful_repr_parts", None):
                    self._string_repr = "\n".join(parts)

                if TESTCASE_CALLBACKS:
                    # Append every "generate:Draw ..." observability argument
                    # to the stored representation.
                    printer = RepresentationPrinter(context=context)
                    for name, value in data._observability_args.items():
                        if name.startswith("generate:Draw "):
                            try:
                                value = data.provider.realize(value)
                            except BackendCannotProceed:  # pragma: no cover
                                value = "<backend failed to realize symbolic>"
                            printer.text(f"\n{name.removeprefix('generate:')}: ")
                            printer.pretty(value)

                    self._string_repr += printer.getvalue()

        # self.test_runner can include the execute_example method, or setup/teardown
        # _example, so it's important to get the PRNG and build context in place first.
        #
        # NOTE: For compatibility with Python 3.9's LL(1) parser, this is written as
        # three nested with-statements, instead of one compound statement.
        with local_settings(self.settings):
            with deterministic_PRNG():
                with BuildContext(
                    data, is_final=is_final, wrapped_test=self.wrapped_test
                ) as context:
                    # providers may throw in per_case_context_fn, and we'd like
                    # `result` to still be set in these cases.
                    result = None
                    with data.provider.per_test_case_context_manager():
                        # Run the test function once, via the executor hook.
                        # In most cases this will delegate straight to `run(data)`.
                        result = self.test_runner(data, run)

        # If a failure was expected, it should have been raised already, so
        # instead raise an appropriate diagnostic error.
        if expected_failure is not None:
            # NOTE: ``traceback`` here is a local (the formatted traceback
            # text), shadowing the ``traceback`` module within this method.
            exception, traceback = expected_failure
            if isinstance(exception, DeadlineExceeded) and (
                runtime_secs := math.fsum(
                    v
                    for k, v in self._timing_features.items()
                    if k.startswith("execute:")
                )
            ):
                report(
                    "Unreliable test timings! On an initial run, this "
                    f"test took {exception.runtime.total_seconds() * 1000:.2f}ms, "
                    "which exceeded the deadline of "
                    f"{self.settings.deadline.total_seconds() * 1000:.2f}ms, but "
                    f"on a subsequent run it took {runtime_secs * 1000:.2f} ms, "
                    "which did not. If you expect this sort of "
                    "variability in your test timings, consider turning "
                    "deadlines off for this test by setting deadline=None."
                )
            else:
                report("Failed to reproduce exception. Expected: \n" + traceback)
            raise FlakyFailure(
                f"Hypothesis {text_repr} produces unreliable results: "
                "Falsified on the first call but did not on a subsequent one",
                [exception],
            )
        return result

    def _flaky_replay_to_failure(
        self, err: FlakyReplay, context: BaseException
    ) -> FlakyFailure:
        """Convert a ``FlakyReplay`` into a ``FlakyFailure``.

        The failure carries ``err.reason`` and the expected exceptions of
        every interesting example the runner knows for the origins listed in
        ``err``, followed by ``context`` (the immediate exception).
        """
        # Note that in the mark_interesting case, _context_ itself
        # is part of err._interesting_examples - but it's not in
        # _runner.interesting_examples - this is fine, as the context
        # (i.e., immediate exception) is appended.
        interesting_examples = [
            self._runner.interesting_examples[origin]
            for origin in err._interesting_origins
            if origin in self._runner.interesting_examples
        ]
        exceptions = [result.expected_exception for result in interesting_examples]
        exceptions.append(context)  # the immediate exception
        return FlakyFailure(err.reason, exceptions)

    def _execute_once_for_engine(self, data: ConjectureData) -> None:
        """Wrapper around ``execute_once`` that intercepts test failure
        exceptions and single-test control exceptions, and turns them into
        appropriate method calls to `data` instead.

        This allows the engine to assume that any exception other than
        ``StopTest`` must be a fatal error, and should stop the entire engine.
        """
        trace: Trace = set()
        try:
            with Tracer(should_trace=self._should_trace()) as tracer:
                try:
                    result = self.execute_once(data)
                    if (
                        data.status == Status.VALID and tracer.branches
                    ):  # pragma: no cover
                        # This is in fact covered by our *non-coverage* tests, but due
                        # to the settrace() contention *not* by our coverage tests.
                        self.explain_traces[None].add(frozenset(tracer.branches))
                finally:
                    # Keep the branches even if the test raised, so the
                    # handlers below can attribute them to the failure.
                    trace = tracer.branches
            if result is not None:
                fail_health_check(
                    self.settings,
                    "Tests run under @given should return None, but "
                    f"{self.test.__name__} returned {result!r} instead.",
                    HealthCheck.return_value,
                )
        except UnsatisfiedAssumption as e:
            # An "assume" check failed, so instead we inform the engine that
            # this test run was invalid.
            try:
                data.mark_invalid(e.reason)
            except FlakyReplay as err:
                # This was unexpected, meaning that the assume was flaky.
                # Report it as such.
                raise self._flaky_replay_to_failure(err, e) from None
        except (StopTest, BackendCannotProceed):
            # The engine knows how to handle this control exception, so it's
            # OK to re-raise it.
            raise
        except (
            FailedHealthCheck,
            *skip_exceptions_to_reraise(),
        ):
            # These are fatal errors or control exceptions that should stop the
            # engine, so we re-raise them.
            raise
        except failure_exceptions_to_catch() as e:
            # If an unhandled (i.e., non-Hypothesis) error was raised by
            # Hypothesis-internal code, re-raise it as a fatal error instead
            # of treating it as a test failure.
            if isinstance(e, BaseExceptionGroup) and len(e.exceptions) == 1:
                # When a naked exception is implicitly wrapped in an ExceptionGroup
                # due to a re-raising "except*", the ExceptionGroup is constructed in
                # the caller's stack frame (see #4183). This workaround is specifically
                # for implicit wrapping of naked exceptions by "except*", since explicit
                # raising of ExceptionGroup gets the proper traceback in the first place
                # - there's no need to handle hierarchical groups here, at least if no
                # such implicit wrapping happens inside hypothesis code (we only care
                # about the hypothesis-or-not distinction).
                #
                # 01-25-2025: this was patched to give the correct
                # stacktrace in cpython https://github.com/python/cpython/issues/128799.
                # can remove once python3.11 is EOL.
                tb = e.exceptions[0].__traceback__ or e.__traceback__
            else:
                tb = e.__traceback__
            # File of the innermost frame, i.e. where the exception was raised.
            filepath = traceback.extract_tb(tb)[-1][0]
            if (
                is_hypothesis_file(filepath)
                and not isinstance(e, HypothesisException)
                # We expect backend authors to use the provider_conformance test
                # to test their backends. If an error occurs there, it is probably
                # from their backend, and we would like to treat it as a standard
                # error, not a hypothesis-internal error.
                and not filepath.endswith(
                    f"internal{os.sep}conjecture{os.sep}provider_conformance.py"
                )
            ):
                raise

            if data.frozen:
                # This can happen if an error occurred in a finally
                # block somewhere, suppressing our original StopTest.
                # We raise a new one here to resume normal operation.
                raise StopTest(data.testcounter) from e
            else:
                # The test failed by raising an exception, so we inform the
                # engine that this test run was interesting. This is the normal
                # path for test runs that fail.
                tb = get_trimmed_traceback()
                data.expected_traceback = format_exception(e, tb)
                data.expected_exception = e
                assert data.expected_traceback is not None  # for mypy
                verbose_report(data.expected_traceback)

                self.failed_normally = True

                interesting_origin = InterestingOrigin.from_exception(e)
                if trace:  # pragma: no cover
                    # Trace collection is explicitly disabled under coverage.
                    self.explain_traces[interesting_origin].add(frozenset(trace))
                if interesting_origin[0] == DeadlineExceeded:
                    # Deadline failures discard all collected explain traces
                    # and (via ``_should_trace``) disable failure tracing.
                    self.failed_due_to_deadline = True
                    self.explain_traces.clear()
                try:
                    data.mark_interesting(interesting_origin)
                except FlakyReplay as err:
                    raise self._flaky_replay_to_failure(err, e) from None

        finally:
            # Conditional here so we can save some time constructing the payload; in
            # other cases (without coverage) it's cheap enough to do that regardless.
            if TESTCASE_CALLBACKS:
                if runner := getattr(self, "_runner", None):
                    phase = runner._current_phase
                else:  # pragma: no cover # in case of messing with internals
                    if self.failed_normally or self.failed_due_to_deadline:
                        phase = "shrink"
                    else:
                        phase = "unknown"
                # str * bool: the description is kept only when the condition
                # holds, and collapses to "" otherwise.
                backend_desc = f", using backend={self.settings.backend!r}" * (
                    self.settings.backend != "hypothesis"
                    and not getattr(runner, "_switch_to_hypothesis_provider", False)
                )
                try:
                    data._observability_args = data.provider.realize(
                        data._observability_args
                    )
                    self._string_repr = data.provider.realize(self._string_repr)
                except BackendCannotProceed:
                    data._observability_args = {}
                    self._string_repr = "<backend failed to realize symbolic arguments>"

                data.freeze()
                tc = make_testcase(
                    run_start=self._start_timestamp,
                    property=self.test_identifier,
                    data=data,
                    how_generated=f"during {phase} phase{backend_desc}",
                    representation=self._string_repr,
                    arguments=data._observability_args,
                    timing=self._timing_features,
                    coverage=tractable_coverage_report(trace) or None,
                    phase=phase,
                    backend_metadata=data.provider.observe_test_case(),
                )
                deliver_observation(tc)
                for msg in data.provider.observe_information_messages(
                    lifetime="test_case"
                ):
                    self._deliver_information_message(**msg)
            # Timing features describe a single test case; reset for the next.
            self._timing_features = {}

    def _deliver_information_message(
        self, *, type: InfoObservationType, title: str, content: Union[str, dict]
    ) -> None:
        """Deliver an ``InfoObservation`` for this test, stamped with this
        run's start timestamp and test identifier."""
        deliver_observation(
            InfoObservation(
                type=type,
                run_start=self._start_timestamp,
                property=self.test_identifier,
                title=title,
                content=content,
            )
        )

    def run_engine(self):
        """Run the test function many times, on database input and generated
        input, using the Conjecture engine.
        """
        # Tell pytest to omit the body of this function from tracebacks
        __tracebackhide__ = True
        try:
            database_key = self.wrapped_test._hypothesis_internal_database_key
        except AttributeError:
            # No explicit key on the wrapped test: derive one from the test
            # function, unless a global seed is being forced.
            if global_force_seed is None:
                database_key = function_digest(self.test)
            else:
                database_key = None

        runner = self._runner = ConjectureRunner(
            self._execute_once_for_engine,
            settings=self.settings,
            random=self.random,
            database_key=database_key,
        )
        # Use the Conjecture engine to run the test function many times
        # on different inputs.
        runner.run()
        note_statistics(runner.statistics)
        if TESTCASE_CALLBACKS:
            self._deliver_information_message(
                type="info",
                title="Hypothesis Statistics",
                content=describe_statistics(runner.statistics),
            )
            # ``runner.provider`` is either a provider instance or something
            # callable that produces one; call it with None in the latter case.
            for msg in (
                p if isinstance(p := runner.provider, PrimitiveProvider) else p(None)
            ).observe_information_messages(lifetime="test_function"):
                self._deliver_information_message(**msg)

        if runner.call_count == 0:
            return
        if runner.interesting_examples:
            self.falsifying_examples = sorted(
                runner.interesting_examples.values(),
                key=lambda d: sort_key(d.nodes),
                reverse=True,
            )
        else:
            if runner.valid_examples == 0:
                explanations = []
                # use a somewhat arbitrary cutoff to avoid recommending spurious
                # fixes.
                # eg, a few invalid examples from internal filters when the
                # problem is the user generating large inputs, or a
                # few overruns during internal mutation when the problem is
                # impossible user filters/assumes.
                if runner.invalid_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.invalid_examples} of {runner.call_count} "
                        "examples failed a .filter() or assume() condition. Try "
                        "making your filters or assumes less strict, or rewrite "
                        "using strategy parameters: "
                        "st.integers().filter(lambda x: x > 0) fails less often "
                        "(that is, never) when rewritten as st.integers(min_value=1)."
                    )
                if runner.overrun_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.overrun_examples} of {runner.call_count} "
                        "examples were too large to finish generating; try "
                        "reducing the typical size of your inputs?"
                    )
                rep = get_pretty_function_description(self.test)
                raise Unsatisfiable(
                    f"Unable to satisfy assumptions of {rep}. "
                    f"{' Also, '.join(explanations)}"
                )

        # If we have not traced executions, warn about that now (but only when
        # we'd expect to do so reliably, i.e. on CPython>=3.12)
        if (
            sys.version_info[:2] >= (3, 12)
            and not PYPY
            and self._should_trace()
            and not Tracer.can_trace()
        ):  # pragma: no cover
            # actually covered by our tests, but only on >= 3.12
            warnings.warn(
                "avoiding tracing test function because tool id "
                f"{MONITORING_TOOL_ID} is already taken by tool "
                f"{sys.monitoring.get_tool(MONITORING_TOOL_ID)}.",
                HypothesisWarning,
                stacklevel=3,
            )

        if not self.falsifying_examples:
            return
        elif not (self.settings.report_multiple_bugs and pytest_shows_exceptiongroups):
            # Pretend that we only found one failure, by discarding the others.
            # (Only the last element of the sorted list is kept.)
            del self.falsifying_examples[:-1]

        # The engine found one or more failures, so we need to reproduce and
        # report them.

        # List of (fragments, exception) pairs, consumed by ``_raise_to_user``.
        errors_to_report = []

        report_lines = describe_targets(runner.best_observed_targets)
        if report_lines:
            report_lines.append("")

        explanations = explanatory_lines(self.explain_traces, self.settings)
        for falsifying_example in self.falsifying_examples:
            # Text reported while replaying this example; later attached to
            # the corresponding error as notes.
            fragments = []

            ran_example = runner.new_conjecture_data(
                falsifying_example.choices, max_choices=len(falsifying_example.choices)
            )
            ran_example.slice_comments = falsifying_example.slice_comments
            tb = None
            origin = None
            assert falsifying_example.expected_exception is not None
            assert falsifying_example.expected_traceback is not None
            try:
                with with_reporter(fragments.append):
                    self.execute_once(
                        ran_example,
                        print_example=not self.is_find,
                        is_final=True,
                        expected_failure=(
                            falsifying_example.expected_exception,
                            falsifying_example.expected_traceback,
                        ),
                    )
            except StopTest as e:
                # Link the expected exception from the first run. Not sure
                # how to access the current exception, if it failed
                # differently on this run. In fact, in the only known
                # reproducer, the StopTest is caused by OVERRUN before the
                # test is even executed. Possibly because all initial examples
                # failed until the final non-traced replay, and something was
                # exhausted? Possibly a FIXME, but sufficiently weird to
                # ignore for now.
                err = FlakyFailure(
                    "Inconsistent results: An example failed on the "
                    "first run but now succeeds (or fails with another "
                    "error, or is for some reason not runnable).",
                    # (note: e is a BaseException)
                    [falsifying_example.expected_exception or e],
                )
                errors_to_report.append((fragments, err))
            except UnsatisfiedAssumption as e:  # pragma: no cover # ironically flaky
                err = FlakyFailure(
                    "Unreliable assumption: An example which satisfied "
                    "assumptions on the first run now fails it.",
                    [e],
                )
                errors_to_report.append((fragments, err))
            except BaseException as e:
                # If we have anything for explain-mode, this is the time to report.
                fragments.extend(explanations[falsifying_example.interesting_origin])
                errors_to_report.append(
                    (fragments, e.with_traceback(get_trimmed_traceback()))
                )
                tb = format_exception(e, get_trimmed_traceback(e))
                origin = InterestingOrigin.from_exception(e)
            else:
                # execute_once() will always raise either the expected error, or Flaky.
                raise NotImplementedError("This should be unreachable")
            finally:
                ran_example.freeze()
                # log our observability line for the final failing example
                tc = make_testcase(
                    run_start=self._start_timestamp,
                    property=self.test_identifier,
                    data=ran_example,
                    how_generated="minimal failing example",
                    representation=self._string_repr,
                    arguments=ran_example._observability_args,
                    timing=self._timing_features,
                    coverage=None,  # Not recorded when we're replaying the MFE
                    status="passed" if sys.exc_info()[0] else "failed",
                    status_reason=str(origin or "unexpected/flaky pass"),
                    metadata={"traceback": tb},
                )
                deliver_observation(tc)
                # Whether or not replay actually raised the exception again, we want
                # to print the reproduce_failure decorator for the failing example.
                if self.settings.print_blob:
                    fragments.append(
                        "\nYou can reproduce this example by temporarily adding "
                        f"{reproduction_decorator(falsifying_example.choices)} "
                        "as a decorator on your test case"
                    )

        _raise_to_user(
            errors_to_report,
            self.settings,
            report_lines,
            # A backend might report a failure and then report verified afterwards,
            # which is to be interpreted as "there are no more failures *other
            # than what we already reported*". Do not report this as unsound.
            unsound_backend=(
                runner._verified_by
                if runner._verified_by and not runner._backend_found_failure
                else None
            ),
        )
def _raise_to_user(
    errors_to_report, settings, target_lines, trailer="", *, unsound_backend=None
):
    """Helper function for attaching notes and grouping multiple errors.

    ``errors_to_report`` is a non-empty list of ``(fragments, error)`` pairs.
    Every fragment is attached to its error as a note, and fragments that
    start with "Falsifying example: " are additionally recorded (without the
    prefix) on the current pytest item, if there is one.

    A single error is raised as-is; several errors are raised together in a
    ``BaseExceptionGroup`` whose message ends with ``trailer``. At normal
    verbosity or above, ``target_lines`` are added as notes to the raised
    exception. If ``unsound_backend`` is truthy, a note naming that backend
    is added to the raised exception as well.

    This function always raises.
    """
    failing_prefix = "Falsifying example: "
    ls = []
    for fragments, err in errors_to_report:
        for note in fragments:
            add_note(err, note)
            if note.startswith(failing_prefix):
                ls.append(note.removeprefix(failing_prefix))
    if current_pytest_item.value:
        current_pytest_item.value._hypothesis_failing_examples = ls

    if len(errors_to_report) == 1:
        _, the_error_hypothesis_found = errors_to_report[0]
    else:
        assert errors_to_report
        the_error_hypothesis_found = BaseExceptionGroup(
            f"Hypothesis found {len(errors_to_report)} distinct failures{trailer}.",
            [e for _, e in errors_to_report],
        )

    if settings.verbosity >= Verbosity.normal:
        for line in target_lines:
            add_note(the_error_hypothesis_found, line)

    if unsound_backend:
        msg = (
            f"backend={unsound_backend!r} claimed to verify this test passes - "
            "please send them a bug report!"
        )
        # Attach to the exception that is actually raised. The note describes
        # the test as a whole, so with several failures it belongs on the
        # group rather than on whichever error the loop above visited last.
        add_note(the_error_hypothesis_found, msg)

    raise the_error_hypothesis_found
@contextlib.contextmanager
def fake_subTest(self, msg=None, **__):
    """Stand-in for `unittest.TestCase.subTest` while a `@given` test runs.

    The real ``subTest`` would make the unittest runner report each failing
    example as its own failing test, which is wrong for Hypothesis. This
    replacement accepts the same arguments, ignores them, emits a warning
    explaining why, and then simply runs the body of the ``with`` block.
    """
    explanation = (
        "subTest per-example reporting interacts badly with Hypothesis "
        "trying hundreds of examples, so we disable it for the duration of "
        "any test that uses `@given`."
    )
    warnings.warn(explanation, HypothesisWarning, stacklevel=2)
    yield
@dataclass
class HypothesisHandle:
    """Handle exposed as the ``.hypothesis`` attribute of ``@given`` tests.

    Its attributes may be reassigned by downstream code to hook custom logic
    into the execution of each test case - for example, wrapping an async
    test so that it runs synchronously.

    The indirection (an attribute of an attribute) is deliberate: rebinding a
    first-level attribute on an already-decorated function would not be seen
    by Hypothesis.

    See https://github.com/HypothesisWorks/hypothesis/issues/1257 for more
    information.
    """

    inner_test: Any
    _get_fuzz_target: Any
    _given_kwargs: Any

    @property
    def fuzz_one_input(
        self,
    ) -> Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]]:
        """Run the test as a fuzz target, driven with the `buffer` of bytes.

        Returns None if buffer invalid for the strategy, canonical pruned
        bytes if the buffer was valid, and leaves raised exceptions alone.
        """
        # Callers who care about fuzzing throughput usually read this property
        # once and keep the result in a local. The target is memoised on the
        # instance regardless, so repeated property access stays cheap too.
        try:
            target = self.__cached_target  # type: ignore
        except AttributeError:
            target = self.__cached_target = self._get_fuzz_target()
        return target
# Typing overload: a single positional-only Ellipsis argument. The decorated
# test is typed as taking no arguments.
@overload
def given(
    _: EllipsisType, /
) -> Callable[
    [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[[], None]
]:  # pragma: no cover
    ...
# Typing overload: strategies passed positionally. The decorated test is typed
# as a callable with unspecified arguments returning None.
@overload
def given(
    *_given_arguments: SearchStrategy[Any],
) -> Callable[
    [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
]:  # pragma: no cover
    ...
# Typing overload: strategies (or Ellipsis) passed by keyword. The decorated
# test is typed as a callable with unspecified arguments returning None.
@overload
def given(
    **_given_kwargs: Union[SearchStrategy[Any], EllipsisType],
) -> Callable[
    [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
]:  # pragma: no cover
    ...
1675def given(
1676 *_given_arguments: Union[SearchStrategy[Any], EllipsisType],
1677 **_given_kwargs: Union[SearchStrategy[Any], EllipsisType],
1678) -> Callable[
1679 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
1680]:
1681 """
1682 The |@given| decorator turns a function into a Hypothesis test. This is the
1683 main entry point to Hypothesis.
1685 .. seealso::
1687 See also the :doc:`/tutorial/introduction` tutorial, which introduces
1688 defining Hypothesis tests with |@given|.
1690 .. _given-arguments:
1692 Arguments to ``@given``
1693 -----------------------
1695 Arguments to |@given| may be either positional or keyword arguments:
1697 .. code-block:: python
1699 @given(st.integers(), st.floats())
1700 def test_one(x, y):
1701 pass
1703 @given(x=st.integers(), y=st.floats())
1704 def test_two(x, y):
1705 pass
1707 If using keyword arguments, the arguments may appear in any order, as with
1708 standard Python functions:
1710 .. code-block:: python
1712 # different order, but still equivalent to before
1713 @given(y=st.floats(), x=st.integers())
1714 def test(x, y):
1715 assert isinstance(x, int)
1716 assert isinstance(y, float)
1718 If |@given| is provided fewer positional arguments than the decorated test,
1719 the test arguments are filled in on the right side, leaving the leftmost
1720 positional arguments unfilled:
1722 .. code-block:: python
1724 @given(st.integers(), st.floats())
1725 def test(manual_string, y, z):
1726 assert manual_string == "x"
1727 assert isinstance(y, int)
1728 assert isinstance(z, float)
1730 # `test` is now a callable which takes one argument `manual_string`
1732 test("x")
1733 # or equivalently:
1734 test(manual_string="x")
1736 The reason for this "from the right" behavior is to support using |@given|
1737 with instance methods, by passing through ``self``:
1739 .. code-block:: python
1741 class MyTest(TestCase):
1742 @given(st.integers())
1743 def test(self, x):
1744 assert isinstance(self, MyTest)
1745 assert isinstance(x, int)
1747 If (and only if) using keyword arguments, |@given| may be combined with
1748 ``**kwargs`` or ``*args``:
1750 .. code-block:: python
1752 @given(x=integers(), y=integers())
1753 def test(x, **kwargs):
1754 assert "y" in kwargs
1756 @given(x=integers(), y=integers())
1757 def test(x, *args, **kwargs):
1758 assert args == ()
1759 assert "x" not in kwargs
1760 assert "y" in kwargs
1762 It is an error to:
1764 * Mix positional and keyword arguments to |@given|.
1765 * Use |@given| with a function that has a default value for an argument.
1766 * Use |@given| with positional arguments with a function that uses ``*args``,
1767 ``**kwargs``, or keyword-only arguments.
1769 The function returned by given has all the same arguments as the original
1770 test, minus those that are filled in by |@given|. See the :ref:`notes on
1771 framework compatibility <framework-compatibility>` for how this interacts
1772 with features of other testing libraries, such as :pypi:`pytest` fixtures.
1773 """
1775 if currently_in_test_context():
1776 fail_health_check(
1777 Settings(),
1778 "Nesting @given tests results in quadratic generation and shrinking "
1779 "behavior and can usually be more cleanly expressed by replacing the "
1780 "inner function with an st.data() parameter on the outer @given.",
1781 HealthCheck.nested_given,
1782 )
def run_test_as_given(test):
    """Decorate ``test`` so that calling it runs it against generated inputs.

    This is the decorator that the enclosing ``given`` call returns.  It
    validates ``test`` together with the strategies captured from that call
    (``_given_arguments`` / ``_given_kwargs``) and then builds
    ``wrapped_test``, whose signature is computed by ``new_given_signature``.

    Returns the configured ``wrapped_test``.  If the argument check fails,
    the value produced by ``is_invalid_test`` or ``_invalid`` is returned
    instead (a dummy test that raises when called, per the comment below).

    Raises ``InvalidArgument`` if ``test`` is a class, or if pytest >= 8.4 is
    already imported and ``test`` is a ``FixtureFunctionDefinition``.
    """
    if inspect.isclass(test):
        # Provide a meaningful error to users, instead of exceptions from
        # internals that assume we're dealing with a function.
        raise InvalidArgument("@given cannot be applied to a class")

    # Only look at pytest if it has already been imported (this never imports
    # it), and only for versions >= 8.4, compared on the (major, minor) pair.
    if (
        "_pytest" in sys.modules
        and "_pytest.fixtures" in sys.modules
        and (
            tuple(map(int, sys.modules["_pytest"].__version__.split(".")[:2]))
            >= (8, 4)
        )
        and isinstance(
            test, sys.modules["_pytest.fixtures"].FixtureFunctionDefinition
        )
    ):  # pragma: no cover # covered by pytest/test_fixtures, but not by cover/
        raise InvalidArgument("@given cannot be applied to a pytest fixture")

    # Take private copies so that the normalisation below never mutates the
    # objects captured from the enclosing given() call.
    given_arguments = tuple(_given_arguments)
    given_kwargs = dict(_given_kwargs)

    original_sig = get_signature(test)
    if given_arguments == (Ellipsis,) and not given_kwargs:
        # user indicated that they want to infer all arguments
        given_kwargs = {
            p.name: Ellipsis
            for p in original_sig.parameters.values()
            if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
        }
        given_arguments = ()

    check_invalid = is_invalid_test(
        test, original_sig, given_arguments, given_kwargs
    )

    # If the argument check found problems, return a dummy test function
    # that will raise an error if it is actually called.
    if check_invalid is not None:
        return check_invalid

    # Because the argument check succeeded, we can convert @given's
    # positional arguments into keyword arguments for simplicity.
    if given_arguments:
        assert not given_kwargs
        posargs = [
            p.name
            for p in original_sig.parameters.values()
            if p.kind is p.POSITIONAL_OR_KEYWORD
        ]
        # Strategies are matched to the *rightmost* positional parameters:
        # both sequences are reversed before zipping, and the resulting pairs
        # are reversed again so the dict keeps declaration order.
        given_kwargs = dict(list(zip(posargs[::-1], given_arguments[::-1]))[::-1])
    # These have been converted, so delete them to prevent accidental use.
    del given_arguments

    new_signature = new_given_signature(original_sig, given_kwargs)

    # Use type information to convert "infer" arguments into appropriate strategies.
    if ... in given_kwargs.values():
        hints = get_type_hints(test)
    # `hints` is only bound when at least one value is `...`; in every other
    # case the list below is empty, so the loop body never reads it.
    for name in [name for name, value in given_kwargs.items() if value is ...]:
        if name not in hints:
            return _invalid(
                f"passed {name}=... for {test.__name__}, but {name} has "
                "no type annotation",
                test=test,
                given_kwargs=given_kwargs,
            )
        given_kwargs[name] = st.from_type(hints[name])

    # `Unset` is a unique sentinel meaning "wrapped_test has not been called
    # yet"; `prev_self` is rebound (via nonlocal) on the first call and then
    # compared against on later calls.
    prev_self = Unset = object()

    @impersonate(test)
    @define_function_signature(test.__name__, test.__doc__, new_signature)
    def wrapped_test(*arguments, **kwargs):
        # The callable that the test runner actually invokes.  In order it:
        # validates the call, replays a @reproduce_failure blob if present,
        # runs explicit examples, and finally runs the engine.  It raises
        # unittest.SkipTest when nothing was executed.

        # Tell pytest to omit the body of this function from tracebacks
        __tracebackhide__ = True

        # Read the test from the handle on every call rather than using the
        # closed-over argument; this local name shadows the outer `test`.
        test = wrapped_test.hypothesis.inner_test

        if getattr(test, "is_hypothesis_test", False):
            raise InvalidArgument(
                f"You have applied @given to the test {test.__name__} more than "
                "once, which wraps the test several times and is extremely slow. "
                "A similar effect can be gained by combining the arguments "
                "of the two calls to given. For example, instead of "
                "@given(booleans()) @given(integers()), you could write "
                "@given(booleans(), integers())"
            )

        # Settings are looked up at call time from the attribute assigned to
        # wrapped_test after its definition (see the end of this decorator).
        settings = wrapped_test._hypothesis_internal_use_settings

        random = get_random_for_wrapped_test(test, wrapped_test)

        arguments, kwargs, stuff = process_arguments_to_given(
            wrapped_test, arguments, kwargs, given_kwargs, new_signature.parameters
        )

        if (
            inspect.iscoroutinefunction(test)
            and get_executor(stuff.selfy) is default_executor
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/3054
            # If our custom executor doesn't handle coroutines, or we return an
            # awaitable from a non-async-def function, we just rely on the
            # return_value health check. This catches most user errors though.
            raise InvalidArgument(
                "Hypothesis doesn't know how to run async test functions like "
                f"{test.__name__}. You'll need to write a custom executor, "
                "or use a library like pytest-asyncio or pytest-trio which can "
                "handle the translation for you.\n See https://hypothesis."
                "readthedocs.io/en/latest/details.html#custom-function-execution"
            )

        runner = stuff.selfy
        # A @given-decorated method whose name also exists on
        # unittest.TestCase is reported through the not_a_test_method check.
        if isinstance(stuff.selfy, TestCase) and test.__name__ in dir(TestCase):
            msg = (
                f"You have applied @given to the method {test.__name__}, which is "
                "used by the unittest runner but is not itself a test."
                " This is not useful in any way."
            )
            fail_health_check(settings, msg, HealthCheck.not_a_test_method)
        if bad_django_TestCase(runner):  # pragma: no cover
            # Covered by the Django tests, but not the pytest coverage task
            raise InvalidArgument(
                "You have applied @given to a method on "
                f"{type(runner).__qualname__}, but this "
                "class does not inherit from the supported versions in "
                "`hypothesis.extra.django`. Use the Hypothesis variants "
                "to ensure that each example is run in a separate "
                "database transaction."
            )

        nonlocal prev_self
        # Check selfy really is self (not e.g. a mock) before we health-check
        cur_self = (
            stuff.selfy
            if getattr(type(stuff.selfy), test.__name__, None) is wrapped_test
            else None
        )
        # First call: remember the instance.  Later calls: a different
        # instance (compared by identity) triggers differing_executors.
        if prev_self is Unset:
            prev_self = cur_self
        elif cur_self is not prev_self:
            msg = (
                f"The method {test.__qualname__} was called from multiple "
                "different executors. This may lead to flaky tests and "
                "nonreproducible errors when replaying from database."
            )
            fail_health_check(settings, msg, HealthCheck.differing_executors)

        state = StateForActualGivenExecution(
            stuff, test, settings, random, wrapped_test
        )

        reproduce_failure = wrapped_test._hypothesis_internal_use_reproduce_failure

        # If there was a @reproduce_failure decorator, use it to reproduce
        # the error (or complain that we couldn't). Either way, this will
        # always raise some kind of error.
        if reproduce_failure is not None:
            expected_version, failure = reproduce_failure
            if expected_version != __version__:
                raise InvalidArgument(
                    "Attempting to reproduce a failure from a different "
                    f"version of Hypothesis. This failure is from {expected_version}, but "
                    f"you are currently running {__version__!r}. Please change your "
                    "Hypothesis version to a matching one."
                )
            try:
                state.execute_once(
                    ConjectureData.for_choices(decode_failure(failure)),
                    print_example=True,
                    is_final=True,
                )
                # Reaching this line means the replay did not raise at all.
                raise DidNotReproduce(
                    "Expected the test to raise an error, but it "
                    "completed successfully."
                )
            except StopTest:
                raise DidNotReproduce(
                    "The shape of the test data has changed in some way "
                    "from where this blob was defined. Are you sure "
                    "you're running the same test?"
                ) from None
            except UnsatisfiedAssumption:
                raise DidNotReproduce(
                    "The test data failed to satisfy an assumption in the "
                    "test. Have you added it since this blob was generated?"
                ) from None

        # There was no @reproduce_failure, so start by running any explicit
        # examples from @example decorators.
        errors = list(
            execute_explicit_examples(
                state, wrapped_test, arguments, kwargs, original_sig
            )
        )
        if errors:
            # If we're not going to report multiple bugs, we would have
            # stopped running explicit examples at the first failure.
            assert len(errors) == 1 or state.settings.report_multiple_bugs

            # If an explicit example raised a 'skip' exception, ensure it's never
            # wrapped up in an exception group. Because we break out of the loop
            # immediately on finding a skip, if present it's always the last error.
            if isinstance(errors[-1][1], skip_exceptions_to_reraise()):
                # Covered by `test_issue_3453_regression`, just in a subprocess.
                del errors[:-1]  # pragma: no cover

            _raise_to_user(errors, state.settings, [], " in explicit examples")

        # If there were any explicit examples, they all ran successfully.
        # The next step is to use the Conjecture engine to run the test on
        # many different inputs.

        # Used only for its truthiness: False when the explicit phase is
        # disabled, otherwise the (possibly empty) explicit-examples value.
        ran_explicit_examples = Phase.explicit in state.settings.phases and getattr(
            wrapped_test, "hypothesis_explicit_examples", ()
        )
        SKIP_BECAUSE_NO_EXAMPLES = unittest.SkipTest(
            "Hypothesis has been told to run no examples for this test."
        )
        # With neither the reuse nor the generate phase enabled the engine is
        # not run: return normally if explicit examples ran, otherwise skip.
        if not (
            Phase.reuse in settings.phases or Phase.generate in settings.phases
        ):
            if not ran_explicit_examples:
                raise SKIP_BECAUSE_NO_EXAMPLES
            return

        try:
            if isinstance(runner, TestCase) and hasattr(runner, "subTest"):
                # Swap in fake_subTest (bound to the runner) for the duration
                # of the engine run; the original is restored in `finally`.
                subTest = runner.subTest
                try:
                    runner.subTest = types.MethodType(fake_subTest, runner)
                    state.run_engine()
                finally:
                    runner.subTest = subTest
            else:
                state.run_engine()
        except BaseException as e:
            # The exception caught here should either be an actual test
            # failure (or BaseExceptionGroup), or some kind of fatal error
            # that caused the engine to stop.
            generated_seed = wrapped_test._hypothesis_internal_use_generated_seed
            with local_settings(settings):
                # The seed hint is reported only when a seed was generated
                # and the run did not fail normally.
                if not (state.failed_normally or generated_seed is None):
                    if running_under_pytest:
                        report(
                            f"You can add @seed({generated_seed}) to this test or "
                            f"run pytest with --hypothesis-seed={generated_seed} "
                            "to reproduce this failure."
                        )
                    else:
                        report(
                            f"You can add @seed({generated_seed}) to this test to "
                            "reproduce this failure."
                        )
                # The dance here is to avoid showing users long tracebacks
                # full of Hypothesis internals they don't care about.
                # We have to do this inline, to avoid adding another
                # internal stack frame just when we've removed the rest.
                #
                # Using a variable for our trimmed error ensures that the line
                # which will actually appear in tracebacks is as clear as
                # possible - "raise the_error_hypothesis_found".
                the_error_hypothesis_found = e.with_traceback(
                    None
                    if isinstance(e, BaseExceptionGroup)
                    else get_trimmed_traceback()
                )
                raise the_error_hypothesis_found

        # The engine finished without raising; skip if nothing ran at all.
        if not (ran_explicit_examples or state.ever_executed):
            raise SKIP_BECAUSE_NO_EXAMPLES

    def _get_fuzz_target() -> (
        Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]]
    ):
        # Build and return the `fuzz_one_input` callable for this test; it is
        # handed to HypothesisHandle at the end of this decorator.
        #
        # Because fuzzing interfaces are very performance-sensitive, we use a
        # somewhat more complicated structure here. `_get_fuzz_target()` is
        # called by the `HypothesisHandle.fuzz_one_input` property, allowing
        # us to defer our collection of the settings, random instance, and
        # reassignable `inner_test` (etc) until `fuzz_one_input` is accessed.
        #
        # We then share the performance cost of setting up `state` between
        # many invocations of the target. We explicitly force `deadline=None`
        # for performance reasons, saving ~40% the runtime of an empty test.
        test = wrapped_test.hypothesis.inner_test
        settings = Settings(
            parent=wrapped_test._hypothesis_internal_use_settings, deadline=None
        )
        random = get_random_for_wrapped_test(test, wrapped_test)
        # No call-time arguments are supplied here, so nothing may be left
        # over after processing (checked by the two asserts).
        _args, _kwargs, stuff = process_arguments_to_given(
            wrapped_test, (), {}, given_kwargs, new_signature.parameters
        )
        assert not _args
        assert not _kwargs
        state = StateForActualGivenExecution(
            stuff, test, settings, random, wrapped_test
        )
        database_key = function_digest(test) + b".secondary"
        # We track the minimal-so-far example for each distinct origin, so
        # that we track log-n instead of n examples for long runs. In particular
        # it means that we saturate for common errors in long runs instead of
        # storing huge volumes of low-value data.
        minimal_failures: dict = {}

        def fuzz_one_input(
            buffer: Union[bytes, bytearray, memoryview, BinaryIO],
        ) -> Optional[bytes]:
            # Run the test once on data derived from `buffer`.  Returns None
            # when the run ends in StopTest or UnsatisfiedAssumption,
            # re-raises any other exception from the test, and otherwise
            # returns the bytes recorded as drawn by the provider.
            #
            # This inner part is all that the fuzzer will actually run,
            # so we keep it as small and as fast as possible.
            if isinstance(buffer, io.IOBase):
                buffer = buffer.read(BUFFER_SIZE)
            assert isinstance(buffer, (bytes, bytearray, memoryview))
            data = ConjectureData(
                random=None,
                provider=BytestringProvider,
                provider_kw={"bytestring": buffer},
            )
            try:
                state.execute_once(data)
                status = Status.VALID
            except StopTest:
                status = data.status
                return None
            except UnsatisfiedAssumption:
                status = Status.INVALID
                return None
            except BaseException:
                # Save this failure only if a database is configured and it
                # is the first, or sorts no later than the best-known one,
                # for its interesting_origin.
                known = minimal_failures.get(data.interesting_origin)
                if settings.database is not None and (
                    known is None or sort_key(data.nodes) <= sort_key(known)
                ):
                    settings.database.save(
                        database_key, choices_to_bytes(data.choices)
                    )
                    minimal_failures[data.interesting_origin] = data.nodes
                status = Status.INTERESTING
                raise
            finally:
                # Runs on every exit path above; `status` was assigned by
                # whichever branch was taken.
                if TESTCASE_CALLBACKS:
                    data.freeze()
                    tc = make_testcase(
                        run_start=state._start_timestamp,
                        property=state.test_identifier,
                        data=data,
                        how_generated="fuzz_one_input",
                        representation=state._string_repr,
                        arguments=data._observability_args,
                        timing=state._timing_features,
                        coverage=None,
                        status=status,
                        backend_metadata=data.provider.observe_test_case(),
                    )
                    deliver_observation(tc)
                    state._timing_features = {}

            assert isinstance(data.provider, BytestringProvider)
            return bytes(data.provider.drawn)

        # Reuse the documentation written on the HypothesisHandle property.
        fuzz_one_input.__doc__ = HypothesisHandle.fuzz_one_input.__doc__
        return fuzz_one_input

    # After having created the decorated test function, we need to copy
    # over some attributes to make the switch as seamless as possible.

    # Only public (non-underscore) attributes that wrapped_test does not
    # already have are copied across.
    for attrib in dir(test):
        if not (attrib.startswith("_") or hasattr(wrapped_test, attrib)):
            setattr(wrapped_test, attrib, getattr(test, attrib))
    wrapped_test.is_hypothesis_test = True
    if hasattr(test, "_hypothesis_internal_settings_applied"):
        # Used to check if @settings is applied twice.
        wrapped_test._hypothesis_internal_settings_applied = True
    wrapped_test._hypothesis_internal_use_seed = getattr(
        test, "_hypothesis_internal_use_seed", None
    )
    # Fall back to Settings.default when the test carries no (or a falsy)
    # settings attribute.
    wrapped_test._hypothesis_internal_use_settings = (
        getattr(test, "_hypothesis_internal_use_settings", None) or Settings.default
    )
    wrapped_test._hypothesis_internal_use_reproduce_failure = getattr(
        test, "_hypothesis_internal_use_reproduce_failure", None
    )
    # The handle is what wrapped_test reads `inner_test` from at call time.
    wrapped_test.hypothesis = HypothesisHandle(test, _get_fuzz_target, given_kwargs)
    return wrapped_test
2168 return run_test_as_given
def find(
    specifier: SearchStrategy[Ex],
    condition: Callable[[Any], bool],
    *,
    settings: Optional[Settings] = None,
    random: Optional[Random] = None,
    database_key: Optional[bytes] = None,
) -> Ex:
    """Return the minimal example drawn from ``specifier`` for which
    ``condition`` returns a truthy value.

    :param specifier: the strategy to search; anything that is not a
        ``SearchStrategy`` is rejected with ``InvalidArgument``.
    :param condition: predicate called with each candidate value.
    :param settings: base settings; when omitted, ``max_examples=2000`` is
        used.  In either case every health check is suppressed and
        ``report_multiple_bugs`` is turned off for the search.
    :param random: if given, 64 bits are drawn from it and used as the seed.
    :param database_key: key stored on the internal test; when omitted and
        the settings have a database, ``function_digest(condition)`` is used.
    :raises InvalidArgument: if ``specifier`` is not a ``SearchStrategy``.
    :raises NoSuchExample: if the search ends without a matching value.
    """
    base_settings = Settings(max_examples=2000) if settings is None else settings
    settings = Settings(
        base_settings,
        suppress_health_check=list(HealthCheck),
        report_multiple_bugs=False,
    )

    if database_key is None:
        if settings.database is not None:
            # Note: The database key is not guaranteed to be unique. If not,
            # replaying of database examples may fail to reproduce due to
            # being replayed on the wrong condition.
            database_key = function_digest(condition)

    is_strategy = isinstance(specifier, SearchStrategy)
    if not is_strategy:
        raise InvalidArgument(
            f"Expected SearchStrategy but got {specifier!r} of "
            f"type {type(specifier).__name__}"
        )
    specifier.validate()

    # Single-slot holder that the inner test overwrites with the most recent
    # match just before it raises Found.
    last: list[Ex] = []

    # NOTE(review): this inner function is kept exactly as written; its name
    # and source text may feed into digest-based seeding elsewhere in the
    # module - confirm before renaming anything in it.
    @settings
    @given(specifier)
    def test(v):
        if condition(v):
            last[:] = [v]
            raise Found

    if random is not None:
        apply_seed = seed(random.getrandbits(64))
        test = apply_seed(test)

    # Going through an Any-typed alias keeps mypy quiet (attr-defined) about
    # the custom attributes attached to the decorated function.
    tagged_test: Any = test
    tagged_test._hypothesis_internal_is_find = True
    tagged_test._hypothesis_internal_database_key = database_key

    try:
        test()
    except Found:
        return last[0]
    else:
        # The run finished without the predicate ever matching.
        raise NoSuchExample(get_pretty_function_description(condition))