Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/core.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
11"""This module provides the core primitives of Hypothesis, such as given."""
12import base64
13import contextlib
14import dataclasses
15import datetime
16import inspect
17import io
18import math
19import os
20import sys
21import time
22import traceback
23import types
24import unittest
25import warnings
26import zlib
27from collections import defaultdict
28from collections.abc import Coroutine, Generator, Hashable, Iterable, Sequence
29from dataclasses import dataclass, field
30from functools import partial
31from inspect import Parameter
32from random import Random
33from typing import (
34 Any,
35 BinaryIO,
36 Callable,
37 Optional,
38 TypeVar,
39 Union,
40 overload,
41)
42from unittest import TestCase
44from hypothesis import strategies as st
45from hypothesis._settings import (
46 HealthCheck,
47 Phase,
48 Verbosity,
49 all_settings,
50 local_settings,
51 settings as Settings,
52)
53from hypothesis.control import BuildContext, currently_in_test_context
54from hypothesis.database import choices_from_bytes, choices_to_bytes
55from hypothesis.errors import (
56 BackendCannotProceed,
57 DeadlineExceeded,
58 DidNotReproduce,
59 FailedHealthCheck,
60 FlakyFailure,
61 FlakyReplay,
62 Found,
63 Frozen,
64 HypothesisException,
65 HypothesisWarning,
66 InvalidArgument,
67 NoSuchExample,
68 StopTest,
69 Unsatisfiable,
70 UnsatisfiedAssumption,
71)
72from hypothesis.internal import observability
73from hypothesis.internal.compat import (
74 PYPY,
75 BaseExceptionGroup,
76 EllipsisType,
77 add_note,
78 bad_django_TestCase,
79 get_type_hints,
80 int_from_bytes,
81)
82from hypothesis.internal.conjecture.choice import ChoiceT
83from hypothesis.internal.conjecture.data import ConjectureData, Status
84from hypothesis.internal.conjecture.engine import BUFFER_SIZE, ConjectureRunner
85from hypothesis.internal.conjecture.junkdrawer import (
86 ensure_free_stackframes,
87 gc_cumulative_time,
88)
89from hypothesis.internal.conjecture.providers import (
90 BytestringProvider,
91 PrimitiveProvider,
92)
93from hypothesis.internal.conjecture.shrinker import sort_key
94from hypothesis.internal.entropy import deterministic_PRNG
95from hypothesis.internal.escalation import (
96 InterestingOrigin,
97 current_pytest_item,
98 format_exception,
99 get_trimmed_traceback,
100 is_hypothesis_file,
101)
102from hypothesis.internal.healthcheck import fail_health_check
103from hypothesis.internal.observability import (
104 TESTCASE_CALLBACKS,
105 InfoObservation,
106 InfoObservationType,
107 deliver_observation,
108 make_testcase,
109)
110from hypothesis.internal.reflection import (
111 convert_positional_arguments,
112 define_function_signature,
113 function_digest,
114 get_pretty_function_description,
115 get_signature,
116 impersonate,
117 is_mock,
118 nicerepr,
119 proxies,
120 repr_call,
121)
122from hypothesis.internal.scrutineer import (
123 MONITORING_TOOL_ID,
124 Trace,
125 Tracer,
126 explanatory_lines,
127 tractable_coverage_report,
128)
129from hypothesis.internal.validation import check_type
130from hypothesis.reporting import (
131 current_verbosity,
132 report,
133 verbose_report,
134 with_reporter,
135)
136from hypothesis.statistics import describe_statistics, describe_targets, note_statistics
137from hypothesis.strategies._internal.misc import NOTHING
138from hypothesis.strategies._internal.strategies import (
139 Ex,
140 SearchStrategy,
141 check_strategy,
142)
143from hypothesis.vendor.pretty import RepresentationPrinter
144from hypothesis.version import __version__
# Type variable used to annotate decorators that hand back the test function
# they were given.
TestFunc = TypeVar("TestFunc", bound=Callable)


# NOTE(review): nothing in the visible part of this file assigns or reads this
# flag; presumably it is flipped by the pytest plugin - confirm.
running_under_pytest = False
# Read by execute_explicit_examples() when deciding whether to carry on with
# further explicit examples after one of them has failed.
pytest_shows_exceptiongroups = True
# When not None, get_random_for_wrapped_test() seeds the per-test Random with
# this value (unless the test carries its own seed or uses derandomize).
global_force_seed = None
# Created lazily by get_random_for_wrapped_test(), which draws a fresh 128-bit
# seed from it for tests that have no other source of seeding.
_hypothesis_global_random = None
@dataclass
class Example:
    """One explicit input recorded by the ``@example`` decorator."""

    # Positional and keyword values, exactly as they were given to @example().
    args: Any
    kwargs: Any
    # The remaining two fields are only filled in by ``.xfail()``.
    raises: Any = None
    reason: Any = None
164# TODO_DOCS link to not-yet-existent patch-dumping docs
class example:
    """
    Add an explicit input to a Hypothesis test, which Hypothesis will always
    try before generating random inputs. This combines the randomized nature of
    Hypothesis generation with a traditional parametrized test.

    For example:

    .. code-block:: python

        @example("Hello world")
        @example("some string with special significance")
        @given(st.text())
        def test_strings(s):
            pass

    will call ``test_strings("Hello world")`` and
    ``test_strings("some string with special significance")`` before generating
    any random inputs. |@example| may be placed in any order relative to |@given|
    and |@settings|.

    Explicit inputs from |@example| are run in the |Phase.explicit| phase.
    Explicit inputs do not count towards |settings.max_examples|. Note that
    explicit inputs added by |@example| do not shrink. If an explicit input
    fails, Hypothesis will stop and report the failure without generating any
    random inputs.

    |@example| can also be used to easily reproduce a failure. For instance, if
    Hypothesis reports that ``f(n=[0, math.nan])`` fails, you can add
    ``@example(n=[0, math.nan])`` to your test to quickly reproduce that failure.

    Arguments to ``@example``
    -------------------------

    Arguments to |@example| have the same behavior and restrictions as arguments
    to |@given|. This means they may be either positional or keyword arguments
    (but not both in the same |@example|):

    .. code-block:: python

        @example(1, 2)
        @example(x=1, y=2)
        @given(st.integers(), st.integers())
        def test(x, y):
            pass

    Noting that while arguments to |@given| are strategies (like |st.integers|),
    arguments to |@example| are values instead (like ``1``).

    See the :ref:`given-arguments` section for full details.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # The two rejections are mutually exclusive, so their order is free.
        if not args and not kwargs:
            raise InvalidArgument("An example must provide at least one argument")
        if args and kwargs:
            raise InvalidArgument(
                "Cannot mix positional and keyword arguments for examples"
            )

        # This list is only installed on a test that has no example list yet;
        # see __call__.
        self.hypothesis_explicit_examples: list[Example] = []
        self._this_example = Example(tuple(args), kwargs)

    def __call__(self, test: TestFunc) -> TestFunc:
        try:
            recorded = test.hypothesis_explicit_examples  # type: ignore
        except AttributeError:
            # First @example applied to this test: donate our (empty) list.
            recorded = self.hypothesis_explicit_examples
            test.hypothesis_explicit_examples = recorded  # type: ignore
        recorded.append(self._this_example)
        return test

    def xfail(
        self,
        condition: bool = True,  # noqa: FBT002
        *,
        reason: str = "",
        raises: Union[
            type[BaseException], tuple[type[BaseException], ...]
        ] = BaseException,
    ) -> "example":
        """Mark this example as an expected failure, similarly to
        :obj:`pytest.mark.xfail(strict=True) <pytest.mark.xfail>`.

        Expected-failing examples allow you to check that your test does fail on
        some examples, and therefore build confidence that *passing* tests are
        because your code is working, not because the test is missing something.

        .. code-block:: python

            @example(...).xfail()
            @example(...).xfail(reason="Prices must be non-negative")
            @example(...).xfail(raises=(KeyError, ValueError))
            @example(...).xfail(sys.version_info[:2] >= (3, 12), reason="needs py 3.12")
            @example(...).xfail(condition=sys.platform != "linux", raises=OSError)
            def test(x):
                pass

        .. note::

            Expected-failing examples are handled separately from those generated
            by strategies, so you should usually ensure that there is no overlap.

            .. code-block:: python

                @example(x=1, y=0).xfail(raises=ZeroDivisionError)
                @given(x=st.just(1), y=st.integers())  # Missing `.filter(bool)`!
                def test_fraction(x, y):
                    # This test will try the explicit example and see it fail as
                    # expected, then go on to generate more examples from the
                    # strategy. If we happen to generate y=0, the test will fail
                    # because only the explicit example is treated as xfailing.
                    x / y
        """
        check_type(bool, condition, "condition")
        check_type(str, reason, "reason")

        def is_exception_type(candidate: object) -> bool:
            return isinstance(candidate, type) and issubclass(
                candidate, BaseException
            )

        if isinstance(raises, tuple):
            # () -> expected to fail with no error, which is impossible
            acceptable = bool(raises) and all(is_exception_type(r) for r in raises)
        else:
            acceptable = is_exception_type(raises)
        if not acceptable:
            raise InvalidArgument(
                f"{raises=} must be an exception type or tuple of exception types"
            )

        if not condition:
            # A false condition leaves the example as an ordinary one.
            return self
        self._this_example = dataclasses.replace(
            self._this_example, raises=raises, reason=reason
        )
        return self

    def via(self, whence: str, /) -> "example":
        """Attach a machine-readable label noting what the origin of this example
        was. |example.via| is completely optional and does not change runtime
        behavior.

        |example.via| is intended to support self-documenting behavior, as well as
        tooling which might add (or remove) |@example| decorators automatically.
        For example:

        .. code-block:: python

            # Annotating examples is optional and does not change runtime behavior
            @example(...)
            @example(...).via("regression test for issue #42")
            @example(...).via("discovered failure")
            def test(x):
                pass

        .. note::

            `HypoFuzz <https://hypofuzz.com/>`_ uses |example.via| to tag examples
            in the patch of its high-coverage set of explicit inputs, on
            `the patches page <https://hypofuzz.com/example-dashboard/#/patches>`_.
        """
        if isinstance(whence, str):
            # This is deliberately a no-op at runtime; the tools operate on
            # source code.
            return self
        raise InvalidArgument(".via() must be passed a string")
def seed(seed: Hashable) -> Callable[[TestFunc], TestFunc]:
    """
    Seed the randomness for this test.

    ``seed`` may be any hashable object. No exact meaning for ``seed`` is provided
    other than that for a fixed seed value Hypothesis will produce the same
    examples (assuming that there are no other sources of nondeterminism, such
    as timing, hash randomization, or external state).

    For example, the following test function and |RuleBasedStateMachine| will
    each generate the same series of examples each time they are executed:

    .. code-block:: python

        @seed(1234)
        @given(st.integers())
        def test(n): ...

        @seed(6789)
        class MyMachine(RuleBasedStateMachine): ...

    If using pytest, you can alternatively pass ``--hypothesis-seed`` on the
    command line.

    Setting a seed overrides |settings.derandomize|, which is designed to enable
    deterministic CI tests rather than reproducing observed failures.

    Hypothesis will only print the seed which would reproduce a failure if a test
    fails in an unexpected way, for instance inside Hypothesis internals.
    """

    def accept(test):
        setattr(test, "_hypothesis_internal_use_seed", seed)
        # Layer a settings object with the database switched off on top of
        # whatever settings the test already carries (possibly none).
        inherited = getattr(test, "_hypothesis_internal_use_settings", None)
        setattr(
            test,
            "_hypothesis_internal_use_settings",
            Settings(inherited, database=None),
        )
        return test

    return accept
370# TODO_DOCS: link to /explanation/choice-sequence
def reproduce_failure(version: str, blob: bytes) -> Callable[[TestFunc], TestFunc]:
    """
    Run the example corresponding to the binary ``blob`` in order to reproduce a
    failure. ``blob`` is a serialized version of the internal input representation
    of Hypothesis.

    A test decorated with |@reproduce_failure| always runs exactly one example,
    which is expected to cause a failure. If the provided ``blob`` does not
    cause a failure, Hypothesis will raise |DidNotReproduce|.

    Hypothesis will print an |@reproduce_failure| decorator if
    |settings.print_blob| is ``True`` (which is the default in CI).

    |@reproduce_failure| is intended to be temporarily added to your test suite in
    order to reproduce a failure. It is not intended to be a permanent addition to
    your test suite. Because of this, no compatibility guarantees are made across
    Hypothesis versions, and |@reproduce_failure| will error if used on a different
    Hypothesis version than it was created for.

    .. seealso::

        See also the :doc:`/tutorial/replaying-failures` tutorial.
    """
    # The decorator only records the request on the test function.
    request = (version, blob)

    def accept(test):
        setattr(test, "_hypothesis_internal_use_reproduce_failure", request)
        return test

    return accept
def reproduction_decorator(choices: Iterable[ChoiceT]) -> str:
    """Return the source text of a ``@reproduce_failure(...)`` decorator.

    The decorator names the running Hypothesis version and the encoded form
    of ``choices`` produced by ``encode_failure``.
    """
    encoded = encode_failure(choices)
    return f"@reproduce_failure({__version__!r}, {encoded!r})"
def encode_failure(choices: Iterable[ChoiceT]) -> bytes:
    """Serialize ``choices`` into a base64 blob for ``@reproduce_failure``.

    The serialized bytes are prefixed with a one-byte marker: ``b"\\1"`` when
    the zlib-compressed form is strictly shorter and is stored, ``b"\\0"`` when
    the bytes are stored as they are.
    """
    raw = choices_to_bytes(choices)
    deflated = zlib.compress(raw)
    payload = b"\1" + deflated if len(deflated) < len(raw) else b"\0" + raw
    return base64.b64encode(payload)
def decode_failure(blob: bytes) -> Sequence[ChoiceT]:
    """Decode a blob produced by ``encode_failure`` into a choice sequence.

    Raises ``InvalidArgument`` if the blob is not valid base64, starts with
    an unknown marker byte, fails zlib decompression, or does not
    deserialize to a choice sequence.
    """
    try:
        raw = base64.b64decode(blob)
    except Exception:
        raise InvalidArgument(f"Invalid base64 encoded string: {blob!r}") from None

    # The first byte says whether the remainder is zlib-compressed.
    marker, body = raw[:1], raw[1:]
    if marker == b"\1":
        try:
            body = zlib.decompress(body)
        except zlib.error as err:
            raise InvalidArgument(
                f"Invalid zlib compression for blob {blob!r}"
            ) from err
    elif marker != b"\0":
        raise InvalidArgument(
            f"Could not decode blob {blob!r}: Invalid start byte {marker!r}"
        )

    choices = choices_from_bytes(body)
    if choices is None:
        raise InvalidArgument(f"Invalid serialized choice sequence for blob {blob!r}")

    return choices
def _invalid(message, *, exc=InvalidArgument, test, given_kwargs):
    """Build a stand-in for ``test`` that raises ``exc(message)`` when called.

    The stand-in impersonates ``test`` and carries the ``is_hypothesis_test``
    and ``hypothesis`` attributes.
    """

    @impersonate(test)
    def always_raises(*arguments, **kwargs):  # pragma: no cover  # coverage limitation
        raise exc(message)

    handle = HypothesisHandle(
        inner_test=test,
        _get_fuzz_target=always_raises,
        _given_kwargs=given_kwargs,
    )
    always_raises.is_hypothesis_test = True
    always_raises.hypothesis = handle
    return always_raises
def is_invalid_test(test, original_sig, given_arguments, given_kwargs):
    """Check the arguments to ``@given`` for basic usage constraints.

    Most errors are not raised immediately; instead we return a dummy test
    function that will raise the appropriate error if it is actually called.
    When the user runs a subset of tests (e.g via ``pytest -k``), errors will
    only be reported for tests that actually ran.

    Returns the dummy test built by ``_invalid`` for the first violated
    constraint, or ``None`` when no problem was found.
    """
    invalid = partial(_invalid, test=test, given_kwargs=given_kwargs)

    if not (given_arguments or given_kwargs):
        return invalid("given must be called with at least one argument")

    params = list(original_sig.parameters.values())
    pos_params = [p for p in params if p.kind is p.POSITIONAL_OR_KEYWORD]
    kwonly_params = [p for p in params if p.kind is p.KEYWORD_ONLY]
    if given_arguments and params != pos_params:
        return invalid(
            "positional arguments to @given are not supported with varargs, "
            "varkeywords, positional-only, or keyword-only arguments"
        )

    if len(given_arguments) > len(pos_params):
        return invalid(
            f"Too many positional arguments for {test.__name__}() were passed to "
            f"@given - expected at most {len(pos_params)} "
            f"arguments, but got {len(given_arguments)} {given_arguments!r}"
        )

    if ... in given_arguments:
        return invalid(
            "... was passed as a positional argument to @given, but may only be "
            "passed as a keyword argument or as the sole argument of @given"
        )

    if given_arguments and given_kwargs:
        return invalid("cannot mix positional and keyword arguments to @given")

    # Build the set of nameable parameters once, rather than once per keyword.
    nameable = {p.name for p in pos_params + kwonly_params}
    extra_kwargs = [k for k in given_kwargs if k not in nameable]
    # Unknown keywords are only acceptable if the test takes **kwargs.
    if extra_kwargs and (params == [] or params[-1].kind is not params[-1].VAR_KEYWORD):
        arg = extra_kwargs[0]
        extra = ""
        if arg in all_settings:
            extra = f". Did you mean @settings({arg}={given_kwargs[arg]!r})?"
        return invalid(
            f"{test.__name__}() got an unexpected keyword argument {arg!r}, "
            f"from `{arg}={given_kwargs[arg]!r}` in @given{extra}"
        )
    if any(p.default is not p.empty for p in params):
        return invalid("Cannot apply @given to a function with defaults.")

    # This case would raise Unsatisfiable *anyway*, but by detecting it here we can
    # provide a much more helpful error message for people e.g. using the Ghostwriter.
    empty = [
        f"{s!r} (arg {idx})" for idx, s in enumerate(given_arguments) if s is NOTHING
    ] + [f"{name}={s!r}" for name, s in given_kwargs.items() if s is NOTHING]
    if empty:
        strats = "strategies" if len(empty) > 1 else "strategy"
        return invalid(
            f"Cannot generate examples from empty {strats}: " + ", ".join(empty),
            exc=Unsatisfiable,
        )

    return None
def execute_explicit_examples(state, wrapped_test, arguments, kwargs, original_sig):
    """Run the explicit ``@example`` inputs attached to ``wrapped_test``.

    This is a generator. The examples stored in
    ``wrapped_test.hypothesis_explicit_examples`` are visited in reverse of
    their stored order. Each one is validated against the signature and
    against the keywords given to ``@given``, then (if ``Phase.explicit`` is
    enabled) executed via ``state.execute_once``. For every example that ends
    in an error, a ``(fragments_reported, err)`` pair is yielded to the caller.

    Raises ``InvalidArgument`` for an ``@example`` whose arguments do not line
    up with the test, or whose ``.xfail(raises=...)`` names an exception that
    is not treated as a test failure.
    """
    assert isinstance(state, StateForActualGivenExecution)
    posargs = [
        p.name
        for p in original_sig.parameters.values()
        if p.kind is p.POSITIONAL_OR_KEYWORD
    ]

    for example in reversed(getattr(wrapped_test, "hypothesis_explicit_examples", ())):
        assert isinstance(example, Example)
        # All of this validation is to check that @example() got "the same" arguments
        # as @given, i.e. corresponding to the same parameters, even though they might
        # be any mixture of positional and keyword arguments.
        if example.args:
            assert not example.kwargs
            if any(
                p.kind is p.POSITIONAL_ONLY for p in original_sig.parameters.values()
            ):
                raise InvalidArgument(
                    "Cannot pass positional arguments to @example() when decorating "
                    "a test function which has positional-only parameters."
                )
            if len(example.args) > len(posargs):
                raise InvalidArgument(
                    "example has too many arguments for test. Expected at most "
                    f"{len(posargs)} but got {len(example.args)}"
                )
            # Positional example values bind to the *last* positional parameters.
            example_kwargs = dict(zip(posargs[-len(example.args) :], example.args))
        else:
            example_kwargs = dict(example.kwargs)
        given_kws = ", ".join(
            repr(k) for k in sorted(wrapped_test.hypothesis._given_kwargs)
        )
        example_kws = ", ".join(repr(k) for k in sorted(example_kwargs))
        if given_kws != example_kws:
            raise InvalidArgument(
                f"Inconsistent args: @given() got strategies for {given_kws}, "
                f"but @example() got arguments for {example_kws}"
            ) from None

        # This is certainly true because the example_kwargs exactly match the params
        # reserved by @given(), which are then removed from the function signature.
        assert set(example_kwargs).isdisjoint(kwargs)
        example_kwargs.update(kwargs)

        # Validation above still runs when the explicit phase is disabled.
        if Phase.explicit not in state.settings.phases:
            continue

        with local_settings(state.settings):
            fragments_reported = []
            empty_data = ConjectureData.for_choices([])
            try:
                execute_example = partial(
                    state.execute_once,
                    empty_data,
                    is_final=True,
                    print_example=True,
                    example_kwargs=example_kwargs,
                )
                with with_reporter(fragments_reported.append):
                    if example.raises is None:
                        execute_example()
                    else:
                        # @example(...).xfail(...)
                        # A single join keeps the ", " separator between the
                        # positional and the keyword parts of the call.
                        bits = ", ".join(
                            [
                                *(nicerepr(x) for x in arguments),
                                *(
                                    f"{k}={nicerepr(v)}"
                                    for k, v in example_kwargs.items()
                                ),
                            ]
                        )
                        try:
                            execute_example()
                        except failure_exceptions_to_catch() as err:
                            if not isinstance(err, example.raises):
                                raise
                            # Save a string form of this example; we'll warn if it's
                            # ever generated by the strategy (which can't be xfailed)
                            state.xfail_example_reprs.add(
                                repr_call(state.test, arguments, example_kwargs)
                            )
                        except example.raises as err:
                            # We'd usually check this as early as possible, but it's
                            # possible for failure_exceptions_to_catch() to grow when
                            # e.g. pytest is imported between import- and test-time.
                            raise InvalidArgument(
                                f"@example({bits}) raised an expected {err!r}, "
                                "but Hypothesis does not treat this as a test failure"
                            ) from err
                        else:
                            # Unexpectedly passing; always raise an error in this case.
                            reason = f" because {example.reason}" * bool(example.reason)
                            if example.raises is BaseException:
                                name = "exception"  # special-case no raises= arg
                            elif not isinstance(example.raises, tuple):
                                name = example.raises.__name__
                            elif len(example.raises) == 1:
                                name = example.raises[0].__name__
                            else:
                                name = (
                                    ", ".join(ex.__name__ for ex in example.raises[:-1])
                                    + f", or {example.raises[-1].__name__}"
                                )
                            vowel = name.upper()[0] in "AEIOU"
                            raise AssertionError(
                                f"Expected a{'n' * vowel} {name} from @example({bits})"
                                f"{reason}, but no exception was raised."
                            )
            except UnsatisfiedAssumption:
                # Odd though it seems, we deliberately support explicit examples that
                # are then rejected by a call to `assume()`. As well as iterative
                # development, this is rather useful to replay Hypothesis' part of
                # a saved failure when other arguments are supplied by e.g. pytest.
                # See https://github.com/HypothesisWorks/hypothesis/issues/2125
                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
            except BaseException as err:
                # In order to support reporting of multiple failing examples, we yield
                # each of the (report text, error) pairs we find back to the top-level
                # runner. This also ensures that user-facing stack traces have as few
                # frames of Hypothesis internals as possible.
                err = err.with_traceback(get_trimmed_traceback())

                # One user error - whether misunderstanding or typo - we've seen a few
                # times is to pass strategies to @example() where values are expected.
                # Checking is easy, and false-positives not much of a problem, so:
                if isinstance(err, failure_exceptions_to_catch()) and any(
                    isinstance(arg, SearchStrategy)
                    for arg in example.args + tuple(example.kwargs.values())
                ):
                    new = HypothesisWarning(
                        "The @example() decorator expects to be passed values, but "
                        "you passed strategies instead. See https://hypothesis."
                        "readthedocs.io/en/latest/reference/api.html#hypothesis"
                        ".example for details."
                    )
                    new.__cause__ = err
                    err = new

                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
                yield (fragments_reported, err)
                # Only keep going to later examples when several failures can be
                # reported together and this one was an ordinary (non-skip) failure.
                if (
                    state.settings.report_multiple_bugs
                    and pytest_shows_exceptiongroups
                    and isinstance(err, failure_exceptions_to_catch())
                    and not isinstance(err, skip_exceptions_to_reraise())
                ):
                    continue
                break
            finally:
                if fragments_reported:
                    assert fragments_reported[0].startswith("Falsifying example")
                    fragments_reported[0] = fragments_reported[0].replace(
                        "Falsifying example", "Falsifying explicit example", 1
                    )

                empty_data.freeze()
                tc = make_testcase(
                    run_start=state._start_timestamp,
                    property=state.test_identifier,
                    data=empty_data,
                    how_generated="explicit example",
                    representation=state._string_repr,
                    timing=state._timing_features,
                )
                deliver_observation(tc)

            # Reached only when the example did not end in a yielded error.
            if fragments_reported:
                verbose_report(fragments_reported[0].replace("Falsifying", "Trying", 1))
                for f in fragments_reported[1:]:
                    verbose_report(f)
def get_random_for_wrapped_test(test, wrapped_test):
    """Return the ``Random`` instance that should drive ``wrapped_test``.

    Seed sources are tried in order: an explicit seed stored on the wrapped
    test, ``settings.derandomize`` (a digest of ``test``), the module-level
    ``global_force_seed``, and finally a fresh 128-bit seed drawn from the
    shared module-level ``Random``. Only in the last case is the seed recorded
    on ``wrapped_test._hypothesis_internal_use_generated_seed``; otherwise
    that attribute is set to ``None``.
    """
    global _hypothesis_global_random

    settings = wrapped_test._hypothesis_internal_use_settings
    wrapped_test._hypothesis_internal_use_generated_seed = None

    explicit_seed = wrapped_test._hypothesis_internal_use_seed
    if explicit_seed is not None:
        return Random(explicit_seed)
    if settings.derandomize:
        return Random(int_from_bytes(function_digest(test)))
    if global_force_seed is not None:
        return Random(global_force_seed)

    if _hypothesis_global_random is None:  # pragma: no cover
        _hypothesis_global_random = Random()
    fresh_seed = _hypothesis_global_random.getrandbits(128)
    wrapped_test._hypothesis_internal_use_generated_seed = fresh_seed
    return Random(fresh_seed)
@dataclasses.dataclass
class Stuff:
    """Bundle of call-time values assembled by ``process_arguments_to_given``."""

    # Candidate bound object (``self``) of the test, or None.
    selfy: Any
    # Positional and keyword arguments the wrapped test was called with.
    args: tuple
    kwargs: dict
    # The strategies passed to @given, keyed by parameter name.
    given_kwargs: dict
def process_arguments_to_given(
    wrapped_test: Any,
    arguments: Sequence[object],
    kwargs: dict[str, object],
    given_kwargs: dict[str, SearchStrategy],
    params: dict[str, Parameter],
) -> tuple[Sequence[object], dict[str, object], Stuff]:
    """Normalise the call arguments and validate the ``@given`` strategies.

    Returns the (tuple of) remaining positional arguments, the keyword
    arguments, and a ``Stuff`` record bundling them with the detected bound
    object and ``given_kwargs``.
    """
    arguments, kwargs = convert_positional_arguments(wrapped_test, arguments, kwargs)

    # If the test function is a method of some kind, the bound object
    # will be the first named argument if there are any, otherwise the
    # first vararg (if any).
    first_named = next(
        (p.name for p in params.values() if p.kind is p.POSITIONAL_OR_KEYWORD),
        None,
    )
    if first_named is not None:
        bound_object = kwargs.get(first_named)
    elif arguments:
        bound_object = arguments[0]
    else:
        bound_object = None

    # Ensure that we don't mistake mocks for self here.
    # This can cause the mock to be used as the test runner.
    if is_mock(bound_object):
        bound_object = None

    arguments = tuple(arguments)

    with ensure_free_stackframes():
        for name, strategy in given_kwargs.items():
            check_strategy(strategy, name=name)
            strategy.validate()

    bundle = Stuff(
        selfy=bound_object,
        args=arguments,
        kwargs=kwargs,
        given_kwargs=given_kwargs,
    )
    return arguments, kwargs, bundle
def skip_exceptions_to_reraise():
    """Return a tuple of exceptions meaning 'skip this test', to re-raise.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request adding
    it to this function and to tests/cover/test_lazy_import.py
    """
    # We use this sys.modules trick to avoid importing libraries -
    # you can't be an instance of a type from an unimported module!
    # This is fast enough that we don't need to cache the result,
    # and more importantly it avoids possible side-effects :-)
    loaded = sys.modules

    # Collected in a set because nose may simply re-export unittest.SkipTest
    found = set()
    for runner_module in ("unittest", "unittest2", "nose"):
        if runner_module in loaded:
            found.add(loaded[runner_module].SkipTest)
    if "_pytest" in loaded:
        found.add(loaded["_pytest"].outcomes.Skipped)

    # Sorting by str() gives the result a stable order.
    return tuple(sorted(found, key=str))
def failure_exceptions_to_catch() -> tuple[type[BaseException], ...]:
    """Return a tuple of exceptions meaning 'this test has failed', to catch.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request.
    """
    # While SystemExit and GeneratorExit are instances of BaseException, we also
    # expect them to be deterministic - unlike KeyboardInterrupt - and so we treat
    # them as standard exceptions, check for flakiness, etc.
    # See https://github.com/HypothesisWorks/hypothesis/issues/2223 for details.
    always_caught = (Exception, SystemExit, GeneratorExit)
    if "_pytest" not in sys.modules:
        return always_caught
    return (*always_caught, sys.modules["_pytest"].outcomes.Failed)
def new_given_signature(original_sig, given_kwargs):
    """Make an updated signature for the wrapped test.

    Parameters named in ``given_kwargs`` are dropped when they are
    positional-or-keyword or keyword-only; everything else is kept, and the
    return annotation is set to ``None``.
    """

    def filled_by_given(param):
        return param.name in given_kwargs and param.kind in (
            param.POSITIONAL_OR_KEYWORD,
            param.KEYWORD_ONLY,
        )

    remaining = []
    for param in original_sig.parameters.values():
        if filled_by_given(param):
            continue
        remaining.append(param)
    return original_sig.replace(parameters=remaining, return_annotation=None)
def default_executor(data, function):
    """Call ``function`` on ``data`` directly and return its result."""
    outcome = function(data)
    return outcome
def get_executor(runner):
    """Pick how a single example should be executed for ``runner``.

    In order of preference: ``runner.execute_example`` if it exists, then a
    wrapper built from ``setup_example`` / ``teardown_example`` if either
    exists, and otherwise ``default_executor``. The returned callable takes
    ``(data, function)``.
    """
    absent = object()
    execute_example = getattr(runner, "execute_example", absent)
    if execute_example is not absent:
        return lambda data, function: execute_example(partial(function, data))

    has_hooks = hasattr(runner, "setup_example") or hasattr(
        runner, "teardown_example"
    )
    if not has_hooks:
        return default_executor

    # A missing (or falsy) hook is replaced by a do-nothing stand-in.
    setup = getattr(runner, "setup_example", None) or (lambda: None)
    teardown = getattr(runner, "teardown_example", None) or (lambda ex: None)

    def execute(data, function):
        # teardown always runs; it receives None if setup itself failed.
        token = None
        try:
            token = setup()
            return function(data)
        finally:
            teardown(token)

    return execute
@contextlib.contextmanager
def unwrap_markers_from_group() -> Generator[None, None, None]:
    """Context manager that unpacks exception groups raised by the wrapped code.

    When the body raises a ``BaseExceptionGroup``, this either re-raises the
    group unchanged (it holds only ``Frozen`` errors, or it holds at least one
    exception that is neither ``StopTest`` nor a ``HypothesisException``), or
    raises a single exception picked out of the group. Anything that is not a
    ``BaseExceptionGroup`` propagates untouched.
    """
    # This function is a crude solution, a better way of resolving it would probably
    # be to rewrite a bunch of exception handlers to use except*.
    T = TypeVar("T", bound=BaseException)

    def _flatten_group(excgroup: BaseExceptionGroup[T]) -> list[T]:
        # Recursively collect the leaf exceptions of a (possibly nested) group.
        found_exceptions: list[T] = []
        for exc in excgroup.exceptions:
            if isinstance(exc, BaseExceptionGroup):
                found_exceptions.extend(_flatten_group(exc))
            else:
                found_exceptions.append(exc)
        return found_exceptions

    try:
        yield
    except BaseExceptionGroup as excgroup:
        frozen_exceptions, non_frozen_exceptions = excgroup.split(Frozen)

        # group only contains Frozen, reraise the group
        # it doesn't matter what we raise, since any exceptions get disregarded
        # and reraised as StopTest if data got frozen.
        if non_frozen_exceptions is None:
            raise
        # in all other cases they are discarded

        # Can RewindRecursive end up in this group?
        _, user_exceptions = non_frozen_exceptions.split(
            lambda e: isinstance(e, (StopTest, HypothesisException))
        )

        # this might contain marker exceptions, or internal errors, but not frozen.
        if user_exceptions is not None:
            raise

        # single marker exception - reraise it
        flattened_non_frozen_exceptions: list[BaseException] = _flatten_group(
            non_frozen_exceptions
        )
        if len(flattened_non_frozen_exceptions) == 1:
            e = flattened_non_frozen_exceptions[0]
            # preserve the cause of the original exception to not hinder debugging
            # note that __context__ is still lost though
            raise e from e.__cause__

        # multiple marker exceptions. If we re-raise the whole group we break
        # a bunch of logic so ....?
        stoptests, non_stoptests = non_frozen_exceptions.split(StopTest)

        # TODO: stoptest+hypothesisexception ...? Is it possible? If so, what do?

        if non_stoptests:
            # TODO: multiple marker exceptions is easy to produce, but the logic in the
            # engine does not handle it... so we just reraise the first one for now.
            e = _flatten_group(non_stoptests)[0]
            raise e from e.__cause__
        assert stoptests is not None

        # multiple stoptests: raising the one with the lowest testcounter
        raise min(_flatten_group(stoptests), key=lambda s_e: s_e.testcounter)
class StateForActualGivenExecution:
    """Bundle a test function with everything needed to execute it repeatedly.

    An instance holds the executor, settings and PRNG used to run ``test``,
    and accumulates bookkeeping while it runs: the falsifying examples found
    by the engine, explain-mode traces, failure flags, and the timing and
    representation strings that are attached to observability reports.
    """

    def __init__(self, stuff, test, settings, random, wrapped_test):
        """Record the collaborators and reset all per-run bookkeeping.

        ``stuff`` is an object whose ``selfy``, ``args``, ``kwargs`` and
        ``given_kwargs`` attributes are read by this class. ``test`` is the
        function that is actually called for each example, while
        ``wrapped_test`` is only inspected for ``_hypothesis_internal_*``
        attributes and used to identify the test. ``settings`` and ``random``
        are forwarded to the engine in ``run_engine``.
        """
        # The executor is looked up from the bound instance (if any).
        self.test_runner = get_executor(stuff.selfy)
        self.stuff = stuff
        self.settings = settings
        self.last_exception = None
        # Replaced by a sorted list in run_engine() once failures are found.
        self.falsifying_examples = ()
        self.random = random
        self.ever_executed = False

        self.is_find = getattr(wrapped_test, "_hypothesis_internal_is_find", False)
        self.wrapped_test = wrapped_test
        self.xfail_example_reprs = set()

        self.test = test

        self.print_given_args = getattr(
            wrapped_test, "_hypothesis_internal_print_given_args", True
        )

        self.files_to_propagate = set()
        self.failed_normally = False
        self.failed_due_to_deadline = False

        # Maps an interesting origin (or None for valid runs) to the set of
        # branch traces observed for it.
        self.explain_traces = defaultdict(set)
        self._start_timestamp = time.time()
        self._string_repr = ""
        self._timing_features = {}

    @property
    def test_identifier(self):
        """Return a name for this test, for use in observability reports.

        This is the ``nodeid`` of the current pytest item when that is
        available and truthy, and otherwise a pretty description of
        ``wrapped_test``.
        """
        return getattr(
            current_pytest_item.value, "nodeid", None
        ) or get_pretty_function_description(self.wrapped_test)

    def _should_trace(self):
        """Return a truthy value if executions should be traced.

        Tracing is wanted either for observability (callbacks are registered
        and coverage collection is enabled), or for explain mode (a failure
        that was not a deadline failure has been seen, and both the shrink
        and explain phases are enabled).
        """
        # NOTE: we explicitly support monkeypatching this. Keep the namespace
        # access intact.
        _trace_obs = TESTCASE_CALLBACKS and observability.OBSERVABILITY_COLLECT_COVERAGE
        _trace_failure = (
            self.failed_normally
            and not self.failed_due_to_deadline
            and {Phase.shrink, Phase.explain}.issubset(self.settings.phases)
        )
        return _trace_obs or _trace_failure

    def execute_once(
        self,
        data,
        *,
        print_example=False,
        is_final=False,
        expected_failure=None,
        example_kwargs=None,
    ):
        """Run the test function once, using ``data`` as input.

        If the test raises an exception, it will propagate through to the
        caller of this method. Depending on its type, this could represent
        an ordinary test failure, or a fatal error, or a control exception.

        If this method returns normally, the test might have passed, or
        it might have placed ``data`` in an unsuccessful state and then
        swallowed the corresponding control exception.

        ``print_example`` reports the call as a "Falsifying example:" (the
        call is otherwise only reported, as "Trying example:", at verbose
        verbosity or above). ``is_final`` is passed to the build context and
        makes the deadline apply without slack. ``expected_failure`` is an
        ``(exception, traceback_text)`` pair; if it is given and the test
        nevertheless returns, a ``FlakyFailure`` is raised. ``example_kwargs``,
        if given, is used in place of arguments drawn from the strategies.
        """

        self.ever_executed = True
        data.is_find = self.is_find

        self._string_repr = ""
        # Filled in by run() (via nonlocal) only when a failure is expected.
        text_repr = None
        if self.settings.deadline is None and not TESTCASE_CALLBACKS:
            # Nothing needs timing information, so use the cheap wrapper.

            @proxies(self.test)
            def test(*args, **kwargs):
                with unwrap_markers_from_group():
                    # NOTE: For compatibility with Python 3.9's LL(1)
                    # parser, this is written as a nested with-statement,
                    # instead of a compound one.
                    with ensure_free_stackframes():
                        return self.test(*args, **kwargs)

        else:

            @proxies(self.test)
            def test(*args, **kwargs):
                # Snapshot the time already attributed to drawing, stateful
                # steps and garbage collection, so that only the increase
                # during this call is subtracted from the measured runtime.
                arg_drawtime = math.fsum(data.draw_times.values())
                arg_stateful = math.fsum(data._stateful_run_times.values())
                arg_gctime = gc_cumulative_time()
                start = time.perf_counter()
                try:
                    with unwrap_markers_from_group():
                        # NOTE: For compatibility with Python 3.9's LL(1)
                        # parser, this is written as a nested with-statement,
                        # instead of a compound one.
                        with ensure_free_stackframes():
                            result = self.test(*args, **kwargs)
                finally:
                    # Timing features are recorded even if the test raised.
                    finish = time.perf_counter()
                    in_drawtime = math.fsum(data.draw_times.values()) - arg_drawtime
                    in_stateful = (
                        math.fsum(data._stateful_run_times.values()) - arg_stateful
                    )
                    in_gctime = gc_cumulative_time() - arg_gctime
                    runtime = finish - start - in_drawtime - in_stateful - in_gctime
                    self._timing_features = {
                        "execute:test": runtime,
                        "overall:gc": in_gctime,
                        **data.draw_times,
                        **data._stateful_run_times,
                    }

                # Only reached if the test returned normally.
                if (current_deadline := self.settings.deadline) is not None:
                    if not is_final:
                        # Non-final runs are allowed 5/4 of the deadline.
                        current_deadline = (current_deadline // 4) * 5
                    if runtime >= current_deadline.total_seconds():
                        raise DeadlineExceeded(
                            datetime.timedelta(seconds=runtime), self.settings.deadline
                        )
                return result

        def run(data: ConjectureData) -> None:
            # Set up dynamic context needed by a single test run.
            if self.stuff.selfy is not None:
                data.hypothesis_runner = self.stuff.selfy
            # Generate all arguments to the test function.
            args = self.stuff.args
            kwargs = dict(self.stuff.kwargs)
            if example_kwargs is None:
                kw, argslices = context.prep_args_kwargs_from_strategies(
                    self.stuff.given_kwargs
                )
            else:
                # Explicit arguments were supplied, so nothing is drawn and
                # there are no argument slices to report.
                kw = example_kwargs
                argslices = {}
            kwargs.update(kw)
            if expected_failure is not None:
                nonlocal text_repr
                text_repr = repr_call(test, args, kwargs)

            if print_example or current_verbosity() >= Verbosity.verbose:
                printer = RepresentationPrinter(context=context)
                if print_example:
                    printer.text("Falsifying example:")
                else:
                    printer.text("Trying example:")

                if self.print_given_args:
                    printer.text(" ")
                    printer.repr_call(
                        test.__name__,
                        args,
                        kwargs,
                        force_split=True,
                        arg_slices=argslices,
                        leading_comment=(
                            "# " + context.data.slice_comments[(0, 0)]
                            if (0, 0) in context.data.slice_comments
                            else None
                        ),
                        avoid_realization=data.provider.avoid_realization,
                    )
                report(printer.getvalue())

            if TESTCASE_CALLBACKS:
                # Build the call representation attached to observability
                # reports, independently of what was printed above.
                printer = RepresentationPrinter(context=context)
                printer.repr_call(
                    test.__name__,
                    args,
                    kwargs,
                    force_split=True,
                    arg_slices=argslices,
                    leading_comment=(
                        "# " + context.data.slice_comments[(0, 0)]
                        if (0, 0) in context.data.slice_comments
                        else None
                    ),
                    avoid_realization=data.provider.avoid_realization,
                )
                self._string_repr = printer.getvalue()

            try:
                return test(*args, **kwargs)
            except TypeError as e:
                # If we sampled from a sequence of strategies, AND failed with a
                # TypeError, *AND that exception mentions SearchStrategy*, add a note:
                if (
                    "SearchStrategy" in str(e)
                    and data._sampled_from_all_strategies_elements_message is not None
                ):
                    msg, format_arg = data._sampled_from_all_strategies_elements_message
                    add_note(e, msg.format(format_arg))
                raise
            finally:
                # If present, the stateful repr parts replace the call repr.
                if parts := getattr(data, "_stateful_repr_parts", None):
                    self._string_repr = "\n".join(parts)

                if TESTCASE_CALLBACKS:
                    # Append every value recorded under a "generate:Draw "
                    # key to the representation.
                    printer = RepresentationPrinter(context=context)
                    for name, value in data._observability_args.items():
                        if name.startswith("generate:Draw "):
                            try:
                                value = data.provider.realize(value)
                            except BackendCannotProceed:  # pragma: no cover
                                value = "<backend failed to realize symbolic>"
                            printer.text(f"\n{name.removeprefix('generate:')}: ")
                            printer.pretty(value)

                    self._string_repr += printer.getvalue()

        # self.test_runner can include the execute_example method, or setup/teardown
        # _example, so it's important to get the PRNG and build context in place first.
        #
        # NOTE: For compatibility with Python 3.9's LL(1) parser, this is written as
        # three nested with-statements, instead of one compound statement.
        with local_settings(self.settings):
            with deterministic_PRNG():
                with BuildContext(
                    data, is_final=is_final, wrapped_test=self.wrapped_test
                ) as context:
                    # providers may throw in per_case_context_fn, and we'd like
                    # `result` to still be set in these cases.
                    result = None
                    with data.provider.per_test_case_context_manager():
                        # Run the test function once, via the executor hook.
                        # In most cases this will delegate straight to `run(data)`.
                        result = self.test_runner(data, run)

        # If a failure was expected, it should have been raised already, so
        # instead raise an appropriate diagnostic error.
        if expected_failure is not None:
            exception, traceback = expected_failure
            # The timing message is only used when a nonzero "execute:" time
            # was recorded for this run.
            if isinstance(exception, DeadlineExceeded) and (
                runtime_secs := math.fsum(
                    v
                    for k, v in self._timing_features.items()
                    if k.startswith("execute:")
                )
            ):
                report(
                    "Unreliable test timings! On an initial run, this "
                    f"test took {exception.runtime.total_seconds() * 1000:.2f}ms, "
                    "which exceeded the deadline of "
                    f"{self.settings.deadline.total_seconds() * 1000:.2f}ms, but "
                    f"on a subsequent run it took {runtime_secs * 1000:.2f} ms, "
                    "which did not. If you expect this sort of "
                    "variability in your test timings, consider turning "
                    "deadlines off for this test by setting deadline=None."
                )
            else:
                report("Failed to reproduce exception. Expected: \n" + traceback)
            raise FlakyFailure(
                f"Hypothesis {text_repr} produces unreliable results: "
                "Falsified on the first call but did not on a subsequent one",
                [exception],
            )
        return result

    def _flaky_replay_to_failure(
        self, err: FlakyReplay, context: BaseException
    ) -> FlakyFailure:
        """Build a ``FlakyFailure`` describing the flaky replay ``err``.

        The failure groups the expected exception of every interesting
        example known to the runner for the origins listed on ``err``,
        followed by ``context``, the exception raised just now.
        """
        # Note that in the mark_interesting case, _context_ itself
        # is part of err._interesting_examples - but it's not in
        # _runner.interesting_examples - this is fine, as the context
        # (i.e., immediate exception) is appended.
        interesting_examples = [
            self._runner.interesting_examples[io]
            for io in err._interesting_origins
            if io in self._runner.interesting_examples
        ]
        exceptions = [ie.expected_exception for ie in interesting_examples]
        exceptions.append(context)  # the immediate exception
        return FlakyFailure(err.reason, exceptions)

    def _execute_once_for_engine(self, data: ConjectureData) -> None:
        """Wrapper around ``execute_once`` that intercepts test failure
        exceptions and single-test control exceptions, and turns them into
        appropriate method calls to `data` instead.

        This allows the engine to assume that any exception other than
        ``StopTest`` must be a fatal error, and should stop the entire engine.
        """
        trace: Trace = set()
        try:
            with Tracer(should_trace=self._should_trace()) as tracer:
                try:
                    result = self.execute_once(data)
                    if (
                        data.status == Status.VALID and tracer.branches
                    ):  # pragma: no cover
                        # This is in fact covered by our *non-coverage* tests, but due
                        # to the settrace() contention *not* by our coverage tests.
                        self.explain_traces[None].add(frozenset(tracer.branches))
                finally:
                    # Keep the branches even if the test raised, so that the
                    # failure handler and the observability report can use them.
                    trace = tracer.branches
            if result is not None:
                fail_health_check(
                    self.settings,
                    "Tests run under @given should return None, but "
                    f"{self.test.__name__} returned {result!r} instead.",
                    HealthCheck.return_value,
                )
        except UnsatisfiedAssumption as e:
            # An "assume" check failed, so instead we inform the engine that
            # this test run was invalid.
            try:
                data.mark_invalid(e.reason)
            except FlakyReplay as err:
                # This was unexpected, meaning that the assume was flaky.
                # Report it as such.
                raise self._flaky_replay_to_failure(err, e) from None
        except (StopTest, BackendCannotProceed):
            # The engine knows how to handle this control exception, so it's
            # OK to re-raise it.
            raise
        except (
            FailedHealthCheck,
            *skip_exceptions_to_reraise(),
        ):
            # These are fatal errors or control exceptions that should stop the
            # engine, so we re-raise them.
            raise
        except failure_exceptions_to_catch() as e:
            # If an unhandled (i.e., non-Hypothesis) error was raised by
            # Hypothesis-internal code, re-raise it as a fatal error instead
            # of treating it as a test failure.
            if isinstance(e, BaseExceptionGroup) and len(e.exceptions) == 1:
                # When a naked exception is implicitly wrapped in an ExceptionGroup
                # due to a re-raising "except*", the ExceptionGroup is constructed in
                # the caller's stack frame (see #4183). This workaround is specifically
                # for implicit wrapping of naked exceptions by "except*", since explicit
                # raising of ExceptionGroup gets the proper traceback in the first place
                # - there's no need to handle hierarchical groups here, at least if no
                # such implicit wrapping happens inside hypothesis code (we only care
                # about the hypothesis-or-not distinction).
                #
                # 01-25-2025: this was patched to give the correct
                # stacktrace in cpython https://github.com/python/cpython/issues/128799.
                # can remove once python3.11 is EOL.
                tb = e.exceptions[0].__traceback__ or e.__traceback__
            else:
                tb = e.__traceback__
            # File name of the innermost frame of the traceback.
            filepath = traceback.extract_tb(tb)[-1][0]
            if (
                is_hypothesis_file(filepath)
                and not isinstance(e, HypothesisException)
                # We expect backend authors to use the provider_conformance test
                # to test their backends. If an error occurs there, it is probably
                # from their backend, and we would like to treat it as a standard
                # error, not a hypothesis-internal error.
                and not filepath.endswith(
                    f"internal{os.sep}conjecture{os.sep}provider_conformance.py"
                )
            ):
                raise

            if data.frozen:
                # This can happen if an error occurred in a finally
                # block somewhere, suppressing our original StopTest.
                # We raise a new one here to resume normal operation.
                raise StopTest(data.testcounter) from e
            else:
                # The test failed by raising an exception, so we inform the
                # engine that this test run was interesting. This is the normal
                # path for test runs that fail.
                tb = get_trimmed_traceback()
                data.expected_traceback = format_exception(e, tb)
                data.expected_exception = e
                assert data.expected_traceback is not None  # for mypy
                verbose_report(data.expected_traceback)

                self.failed_normally = True

                interesting_origin = InterestingOrigin.from_exception(e)
                if trace:  # pragma: no cover
                    # Trace collection is explicitly disabled under coverage.
                    self.explain_traces[interesting_origin].add(frozenset(trace))
                if interesting_origin[0] == DeadlineExceeded:
                    # Once a deadline failure has been seen, all collected
                    # traces are discarded and _should_trace() stops asking
                    # for tracing on behalf of explain mode.
                    self.failed_due_to_deadline = True
                    self.explain_traces.clear()
                try:
                    data.mark_interesting(interesting_origin)
                except FlakyReplay as err:
                    raise self._flaky_replay_to_failure(err, e) from None

        finally:
            # Conditional here so we can save some time constructing the payload; in
            # other cases (without coverage) it's cheap enough to do that regardless.
            if TESTCASE_CALLBACKS:
                if runner := getattr(self, "_runner", None):
                    phase = runner._current_phase
                else:  # pragma: no cover # in case of messing with internals
                    if self.failed_normally or self.failed_due_to_deadline:
                        phase = "shrink"
                    else:
                        phase = "unknown"
                # Multiplying the string by a bool yields either the
                # description or the empty string.
                backend_desc = f", using backend={self.settings.backend!r}" * (
                    self.settings.backend != "hypothesis"
                    and not getattr(runner, "_switch_to_hypothesis_provider", False)
                )
                try:
                    data._observability_args = data.provider.realize(
                        data._observability_args
                    )
                    self._string_repr = data.provider.realize(self._string_repr)
                except BackendCannotProceed:
                    data._observability_args = {}
                    self._string_repr = "<backend failed to realize symbolic arguments>"

                data.freeze()
                tc = make_testcase(
                    run_start=self._start_timestamp,
                    property=self.test_identifier,
                    data=data,
                    how_generated=f"during {phase} phase{backend_desc}",
                    representation=self._string_repr,
                    arguments=data._observability_args,
                    timing=self._timing_features,
                    coverage=tractable_coverage_report(trace) or None,
                    phase=phase,
                    backend_metadata=data.provider.observe_test_case(),
                )
                deliver_observation(tc)
                for msg in data.provider.observe_information_messages(
                    lifetime="test_case"
                ):
                    self._deliver_information_message(**msg)
            # Reset unconditionally, so that timings from this run are never
            # attributed to a later one.
            self._timing_features = {}

    def _deliver_information_message(
        self, *, type: InfoObservationType, title: str, content: Union[str, dict]
    ) -> None:
        """Deliver an ``InfoObservation`` for this test.

        The observation carries the given ``type``, ``title`` and ``content``
        together with this object's start timestamp and test identifier.
        """
        deliver_observation(
            InfoObservation(
                type=type,
                run_start=self._start_timestamp,
                property=self.test_identifier,
                title=title,
                content=content,
            )
        )

    def run_engine(self):
        """Run the test function many times, on database input and generated
        input, using the Conjecture engine.

        Returns normally if the engine made no calls or found no failures.
        Raises ``Unsatisfiable`` if calls were made but there were neither
        failures nor any valid example. Otherwise each falsifying example is
        replayed once and the resulting errors are raised to the user.
        """
        # Tell pytest to omit the body of this function from tracebacks
        __tracebackhide__ = True
        try:
            database_key = self.wrapped_test._hypothesis_internal_database_key
        except AttributeError:
            # No explicit key: derive one from the test function, unless a
            # seed is being forced globally, in which case no key is used.
            if global_force_seed is None:
                database_key = function_digest(self.test)
            else:
                database_key = None

        runner = self._runner = ConjectureRunner(
            self._execute_once_for_engine,
            settings=self.settings,
            random=self.random,
            database_key=database_key,
        )
        # Use the Conjecture engine to run the test function many times
        # on different inputs.
        runner.run()
        note_statistics(runner.statistics)
        if TESTCASE_CALLBACKS:
            self._deliver_information_message(
                type="info",
                title="Hypothesis Statistics",
                content=describe_statistics(runner.statistics),
            )
            # runner.provider is either a PrimitiveProvider instance, or
            # something that is called with None to obtain the object whose
            # messages are collected.
            for msg in (
                p if isinstance(p := runner.provider, PrimitiveProvider) else p(None)
            ).observe_information_messages(lifetime="test_function"):
                self._deliver_information_message(**msg)

        if runner.call_count == 0:
            return
        if runner.interesting_examples:
            self.falsifying_examples = sorted(
                runner.interesting_examples.values(),
                key=lambda d: sort_key(d.nodes),
                reverse=True,
            )
        else:
            if runner.valid_examples == 0:
                explanations = []
                # use a somewhat arbitrary cutoff to avoid recommending spurious
                # fixes.
                # eg, a few invalid examples from internal filters when the
                # problem is the user generating large inputs, or a
                # few overruns during internal mutation when the problem is
                # impossible user filters/assumes.
                if runner.invalid_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.invalid_examples} of {runner.call_count} "
                        "examples failed a .filter() or assume() condition. Try "
                        "making your filters or assumes less strict, or rewrite "
                        "using strategy parameters: "
                        "st.integers().filter(lambda x: x > 0) fails less often "
                        "(that is, never) when rewritten as st.integers(min_value=1)."
                    )
                if runner.overrun_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.overrun_examples} of {runner.call_count} "
                        "examples were too large to finish generating; try "
                        "reducing the typical size of your inputs?"
                    )
                rep = get_pretty_function_description(self.test)
                raise Unsatisfiable(
                    f"Unable to satisfy assumptions of {rep}. "
                    f"{' Also, '.join(explanations)}"
                )

        # If we have not traced executions, warn about that now (but only when
        # we'd expect to do so reliably, i.e. on CPython>=3.12)
        if (
            sys.version_info[:2] >= (3, 12)
            and not PYPY
            and self._should_trace()
            and not Tracer.can_trace()
        ):  # pragma: no cover
            # actually covered by our tests, but only on >= 3.12
            warnings.warn(
                "avoiding tracing test function because tool id "
                f"{MONITORING_TOOL_ID} is already taken by tool "
                f"{sys.monitoring.get_tool(MONITORING_TOOL_ID)}.",
                HypothesisWarning,
                stacklevel=3,
            )

        if not self.falsifying_examples:
            return
        elif not (self.settings.report_multiple_bugs and pytest_shows_exceptiongroups):
            # Pretend that we only found one failure, by discarding the others.
            del self.falsifying_examples[:-1]

        # The engine found one or more failures, so we need to reproduce and
        # report them.

        # Pairs of (report fragments, exception), one per replayed example.
        errors_to_report = []

        report_lines = describe_targets(runner.best_observed_targets)
        if report_lines:
            report_lines.append("")

        explanations = explanatory_lines(self.explain_traces, self.settings)
        for falsifying_example in self.falsifying_examples:
            # Everything reported during the replay is captured here and
            # later attached to the error as notes.
            fragments = []

            ran_example = runner.new_conjecture_data(
                falsifying_example.choices, max_choices=len(falsifying_example.choices)
            )
            ran_example.slice_comments = falsifying_example.slice_comments
            tb = None
            origin = None
            assert falsifying_example.expected_exception is not None
            assert falsifying_example.expected_traceback is not None
            try:
                with with_reporter(fragments.append):
                    self.execute_once(
                        ran_example,
                        print_example=not self.is_find,
                        is_final=True,
                        expected_failure=(
                            falsifying_example.expected_exception,
                            falsifying_example.expected_traceback,
                        ),
                    )
            except StopTest as e:
                # Link the expected exception from the first run. Not sure
                # how to access the current exception, if it failed
                # differently on this run. In fact, in the only known
                # reproducer, the StopTest is caused by OVERRUN before the
                # test is even executed. Possibly because all initial examples
                # failed until the final non-traced replay, and something was
                # exhausted? Possibly a FIXME, but sufficiently weird to
                # ignore for now.
                err = FlakyFailure(
                    "Inconsistent results: An example failed on the "
                    "first run but now succeeds (or fails with another "
                    "error, or is for some reason not runnable).",
                    # (note: e is a BaseException)
                    [falsifying_example.expected_exception or e],
                )
                errors_to_report.append((fragments, err))
            except UnsatisfiedAssumption as e:  # pragma: no cover # ironically flaky
                err = FlakyFailure(
                    "Unreliable assumption: An example which satisfied "
                    "assumptions on the first run now fails it.",
                    [e],
                )
                errors_to_report.append((fragments, err))
            except BaseException as e:
                # If we have anything for explain-mode, this is the time to report.
                fragments.extend(explanations[falsifying_example.interesting_origin])
                errors_to_report.append(
                    (fragments, e.with_traceback(get_trimmed_traceback()))
                )
                tb = format_exception(e, get_trimmed_traceback(e))
                origin = InterestingOrigin.from_exception(e)
            else:
                # execute_once() will always raise either the expected error, or Flaky.
                raise NotImplementedError("This should be unreachable")
            finally:
                ran_example.freeze()
                # log our observability line for the final failing example
                tc = make_testcase(
                    run_start=self._start_timestamp,
                    property=self.test_identifier,
                    data=ran_example,
                    how_generated="minimal failing example",
                    representation=self._string_repr,
                    arguments=ran_example._observability_args,
                    timing=self._timing_features,
                    coverage=None,  # Not recorded when we're replaying the MFE
                    status="passed" if sys.exc_info()[0] else "failed",
                    status_reason=str(origin or "unexpected/flaky pass"),
                    metadata={"traceback": tb},
                )
                deliver_observation(tc)
                # Whether or not replay actually raised the exception again, we want
                # to print the reproduce_failure decorator for the failing example.
                if self.settings.print_blob:
                    fragments.append(
                        "\nYou can reproduce this example by temporarily adding "
                        f"{reproduction_decorator(falsifying_example.choices)} "
                        "as a decorator on your test case"
                    )

        _raise_to_user(
            errors_to_report,
            self.settings,
            report_lines,
            # A backend might report a failure and then report verified afterwards,
            # which is to be interpreted as "there are no more failures *other
            # than what we already reported*". Do not report this as unsound.
            unsound_backend=(
                runner._verified_by
                if runner._verified_by and not runner._backend_found_failure
                else None
            ),
        )
def _raise_to_user(
    errors_to_report, settings, target_lines, trailer="", *, unsound_backend=None
):
    """Helper function for attaching notes and grouping multiple errors.

    ``errors_to_report`` is a non-empty sequence of ``(fragments, error)``
    pairs. Every fragment is attached to its error as a note, and fragments
    starting with ``"Falsifying example: "`` are additionally recorded
    (without that prefix) on the current pytest item, if there is one.

    A single error is raised as-is; several errors are raised together as a
    ``BaseExceptionGroup`` whose message ends with ``trailer``. At normal
    verbosity or above, each of ``target_lines`` is added as a note to the
    exception that is raised. If ``unsound_backend`` is truthy, a note asking
    the user to report the backend is added to that same exception.

    This function never returns; it always raises.
    """
    failing_prefix = "Falsifying example: "
    ls = []
    for fragments, err in errors_to_report:
        for note in fragments:
            add_note(err, note)
            if note.startswith(failing_prefix):
                ls.append(note.removeprefix(failing_prefix))
    if current_pytest_item.value:
        current_pytest_item.value._hypothesis_failing_examples = ls

    if len(errors_to_report) == 1:
        _, the_error_hypothesis_found = errors_to_report[0]
    else:
        assert errors_to_report
        the_error_hypothesis_found = BaseExceptionGroup(
            f"Hypothesis found {len(errors_to_report)} distinct failures{trailer}.",
            [e for _, e in errors_to_report],
        )

    if settings.verbosity >= Verbosity.normal:
        for line in target_lines:
            add_note(the_error_hypothesis_found, line)

    if unsound_backend:
        msg = f"backend={unsound_backend!r} claimed to verify this test passes - please send them a bug report!"
        # Attach the note to the exception that is actually raised, rather
        # than to ``err``: that name is merely left over from the loop above,
        # so with several failures it would single out the last sub-exception
        # for a message that concerns the test as a whole.
        add_note(the_error_hypothesis_found, msg)

    raise the_error_hypothesis_found
@contextlib.contextmanager
def fake_subTest(self, msg=None, **__):
    """Stand-in for `unittest.TestCase.subTest` while a `@given` test runs.

    The real ``subTest`` would make the unittest runner report every failing
    example as its own failing test, which is obviously incorrect. This
    replacement therefore only emits a warning and then runs the body of the
    ``with`` block unchanged; ``msg`` and any keyword parameters are ignored.
    """
    explanation = (
        "subTest per-example reporting interacts badly with Hypothesis "
        "trying hundreds of examples, so we disable it for the duration of "
        "any test that uses `@given`."
    )
    warnings.warn(explanation, category=HypothesisWarning, stacklevel=2)
    yield
@dataclass
class HypothesisHandle:
    """Container exposed as the ``.hypothesis`` attribute of ``@given`` tests.

    Downstream users may reassign the attributes of this object to hook
    custom logic into the execution of each case, for example by converting
    an async function into a sync one.

    The indirection through an attribute of an attribute is deliberate:
    reassigning a first-level attribute would not be visible to Hypothesis
    if the function had been decorated before the assignment.

    See https://github.com/HypothesisWorks/hypothesis/issues/1257 for more
    information.
    """

    # The test function that is executed for each case.
    inner_test: Any
    # Zero-argument callable that builds the fuzz target on first use.
    _get_fuzz_target: Any
    # The keyword arguments that were passed to @given.
    _given_kwargs: Any

    @property
    def fuzz_one_input(
        self,
    ) -> Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]]:
        """Run the test as a fuzz target, driven with the `buffer` of bytes.

        The target returns None if the buffer is invalid for the strategy,
        the canonical pruned bytes if the buffer was valid, and leaves
        raised exceptions alone.
        """
        # Users who care about fuzzer performance will usually read this
        # property once and keep the result in a local variable, outside
        # their fuzzing loop / before the fork point. The target is cached
        # anyway, so that naive or unusual use-cases are fast as well.
        try:
            target = self.__cached_target  # type: ignore
        except AttributeError:
            # First access: build the target and remember it on the instance.
            target = self.__cached_target = self._get_fuzz_target()
        return target
# Typing-only overloads of ``given``; the bodies are never executed.

# Overload 1: a single positional-only Ellipsis. The decorated test is typed
# as a callable that takes no arguments.
@overload
def given(
    _: EllipsisType, /
) -> Callable[
    [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[[], None]
]:  # pragma: no cover
    ...


# Overload 2: any number of strategies passed positionally. The decorated
# test is typed as a callable that may still take arguments.
@overload
def given(
    *_given_arguments: SearchStrategy[Any],
) -> Callable[
    [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
]:  # pragma: no cover
    ...


# Overload 3: strategies passed by keyword, where each value is either a
# strategy or an Ellipsis.
@overload
def given(
    **_given_kwargs: Union[SearchStrategy[Any], EllipsisType],
) -> Callable[
    [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
]:  # pragma: no cover
    ...
1671def given(
1672 *_given_arguments: Union[SearchStrategy[Any], EllipsisType],
1673 **_given_kwargs: Union[SearchStrategy[Any], EllipsisType],
1674) -> Callable[
1675 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
1676]:
1677 """
1678 The |@given| decorator turns a function into a Hypothesis test. This is the
1679 main entry point to Hypothesis.
1681 .. seealso::
1683 See also the :doc:`/tutorial/introduction` tutorial, which introduces
1684 defining Hypothesis tests with |@given|.
1686 .. _given-arguments:
1688 Arguments to ``@given``
1689 -----------------------
1691 Arguments to |@given| may be either positional or keyword arguments:
1693 .. code-block:: python
1695 @given(st.integers(), st.floats())
1696 def test_one(x, y):
1697 pass
1699 @given(x=st.integers(), y=st.floats())
1700 def test_two(x, y):
1701 pass
1703 If using keyword arguments, the arguments may appear in any order, as with
1704 standard Python functions:
1706 .. code-block:: python
1708 # different order, but still equivalent to before
1709 @given(y=st.floats(), x=st.integers())
1710 def test(x, y):
1711 assert isinstance(x, int)
1712 assert isinstance(y, float)
1714 If |@given| is provided fewer positional arguments than the decorated test,
1715 the test arguments are filled in on the right side, leaving the leftmost
1716 positional arguments unfilled:
1718 .. code-block:: python
1720 @given(st.integers(), st.floats())
1721 def test(manual_string, y, z):
1722 assert manual_string == "x"
1723 assert isinstance(y, int)
1724 assert isinstance(z, float)
1726 # `test` is now a callable which takes one argument `manual_string`
1728 test("x")
1729 # or equivalently:
1730 test(manual_string="x")
1732 The reason for this "from the right" behavior is to support using |@given|
1733 with instance methods, by passing through ``self``:
1735 .. code-block:: python
1737 class MyTest(TestCase):
1738 @given(st.integers())
1739 def test(self, x):
1740 assert isinstance(self, MyTest)
1741 assert isinstance(x, int)
1743 If (and only if) using keyword arguments, |@given| may be combined with
1744 ``**kwargs`` or ``*args``:
1746 .. code-block:: python
1748 @given(x=integers(), y=integers())
1749 def test(x, **kwargs):
1750 assert "y" in kwargs
1752 @given(x=integers(), y=integers())
1753 def test(x, *args, **kwargs):
1754 assert args == ()
1755 assert "x" not in kwargs
1756 assert "y" in kwargs
1758 It is an error to:
1760 * Mix positional and keyword arguments to |@given|.
1761 * Use |@given| with a function that has a default value for an argument.
1762 * Use |@given| with positional arguments with a function that uses ``*args``,
1763 ``**kwargs``, or keyword-only arguments.
1765 The function returned by given has all the same arguments as the original
1766 test, minus those that are filled in by |@given|. See the :ref:`notes on
1767 framework compatibility <framework-compatibility>` for how this interacts
1768 with features of other testing libraries, such as :pypi:`pytest` fixtures.
1769 """
1771 if currently_in_test_context():
1772 fail_health_check(
1773 Settings(),
1774 "Nesting @given tests results in quadratic generation and shrinking "
1775 "behavior and can usually be more cleanly expressed by replacing the "
1776 "inner function with an st.data() parameter on the outer @given.",
1777 HealthCheck.nested_given,
1778 )
    def run_test_as_given(test):
        """Wrap ``test`` so that calling it runs it under Hypothesis.

        This is the decorator produced by the enclosing call: it closes over
        the strategies captured there (``_given_arguments`` and
        ``_given_kwargs``), validates them against the signature of ``test``,
        normalises them to keyword form and returns ``wrapped_test``.

        Returns:
            ``wrapped_test`` on success. If ``is_invalid_test`` or the
            ``...``-inference step reports a problem, the object produced by
            ``is_invalid_test`` / ``_invalid`` is returned instead (per the
            comment below, a dummy test that raises when it is called).

        Raises:
            InvalidArgument: if ``test`` is a class, or is a pytest
                ``FixtureFunctionDefinition`` (checked only when ``_pytest``
                is already imported and its version is at least 8.4).
        """
        if inspect.isclass(test):
            # Provide a meaningful error to users, instead of exceptions from
            # internals that assume we're dealing with a function.
            raise InvalidArgument("@given cannot be applied to a class")

        # Only look at pytest if something else has already imported it; the
        # version gate compares the (major, minor) pair against (8, 4) before
        # touching ``fixtures.FixtureFunctionDefinition``.
        if (
            "_pytest" in sys.modules
            and (
                tuple(map(int, sys.modules["_pytest"].__version__.split(".")[:2]))
                >= (8, 4)
            )
            and isinstance(
                test, sys.modules["_pytest"].fixtures.FixtureFunctionDefinition
            )
        ):  # pragma: no cover  # covered by pytest/test_fixtures, but not by cover/
            raise InvalidArgument("@given cannot be applied to a pytest fixture")

        # Take private copies so that the rebinding/mutation below never
        # touches the objects captured by the enclosing call.
        given_arguments = tuple(_given_arguments)
        given_kwargs = dict(_given_kwargs)

        original_sig = get_signature(test)
        if given_arguments == (Ellipsis,) and not given_kwargs:
            # user indicated that they want to infer all arguments
            given_kwargs = {
                p.name: Ellipsis
                for p in original_sig.parameters.values()
                if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
            }
            given_arguments = ()

        check_invalid = is_invalid_test(
            test, original_sig, given_arguments, given_kwargs
        )

        # If the argument check found problems, return a dummy test function
        # that will raise an error if it is actually called.
        if check_invalid is not None:
            return check_invalid

        # Because the argument check succeeded, we can convert @given's
        # positional arguments into keyword arguments for simplicity.
        if given_arguments:
            assert not given_kwargs
            posargs = [
                p.name
                for p in original_sig.parameters.values()
                if p.kind is p.POSITIONAL_OR_KEYWORD
            ]
            # Both sequences are reversed before zipping, so the strategies are
            # paired with the *rightmost* positional-or-keyword parameters; the
            # outer reversal restores left-to-right order in the resulting dict.
            given_kwargs = dict(list(zip(posargs[::-1], given_arguments[::-1]))[::-1])
        # These have been converted, so delete them to prevent accidental use.
        del given_arguments

        new_signature = new_given_signature(original_sig, given_kwargs)

        # Use type information to convert "infer" arguments into appropriate strategies.
        if ... in given_kwargs.values():
            hints = get_type_hints(test)
            # Iterate over a snapshot of the names, since given_kwargs is
            # updated inside the loop.
            for name in [name for name, value in given_kwargs.items() if value is ...]:
                if name not in hints:
                    return _invalid(
                        f"passed {name}=... for {test.__name__}, but {name} has "
                        "no type annotation",
                        test=test,
                        given_kwargs=given_kwargs,
                    )
                given_kwargs[name] = st.from_type(hints[name])

        # ``Unset`` is a unique sentinel meaning "wrapped_test has not been
        # called yet"; ``prev_self`` is rebound (via ``nonlocal``) inside
        # wrapped_test and so persists across calls of the same wrapped test.
        prev_self = Unset = object()

        # NOTE: no docstring is added to wrapped_test on purpose - the
        # decorators below are handed ``test.__doc__`` explicitly.
        @impersonate(test)
        @define_function_signature(test.__name__, test.__doc__, new_signature)
        def wrapped_test(*arguments, **kwargs):
            # Tell pytest to omit the body of this function from tracebacks
            __tracebackhide__ = True

            # Looked up on the handle at call time (rather than using the
            # closed-over ``test``), so the name below shadows the outer one.
            test = wrapped_test.hypothesis.inner_test

            if getattr(test, "is_hypothesis_test", False):
                raise InvalidArgument(
                    f"You have applied @given to the test {test.__name__} more than "
                    "once, which wraps the test several times and is extremely slow. "
                    "A similar effect can be gained by combining the arguments "
                    "of the two calls to given. For example, instead of "
                    "@given(booleans()) @given(integers()), you could write "
                    "@given(booleans(), integers())"
                )

            settings = wrapped_test._hypothesis_internal_use_settings

            random = get_random_for_wrapped_test(test, wrapped_test)

            arguments, kwargs, stuff = process_arguments_to_given(
                wrapped_test, arguments, kwargs, given_kwargs, new_signature.parameters
            )

            if (
                inspect.iscoroutinefunction(test)
                and get_executor(stuff.selfy) is default_executor
            ):
                # See https://github.com/HypothesisWorks/hypothesis/issues/3054
                # If our custom executor doesn't handle coroutines, or we return an
                # awaitable from a non-async-def function, we just rely on the
                # return_value health check. This catches most user errors though.
                raise InvalidArgument(
                    "Hypothesis doesn't know how to run async test functions like "
                    f"{test.__name__}. You'll need to write a custom executor, "
                    "or use a library like pytest-asyncio or pytest-trio which can "
                    "handle the translation for you.\n See https://hypothesis."
                    "readthedocs.io/en/latest/details.html#custom-function-execution"
                )

            runner = stuff.selfy
            # A method whose name also exists on unittest.TestCase itself is
            # reported through the not_a_test_method health check.
            if isinstance(stuff.selfy, TestCase) and test.__name__ in dir(TestCase):
                msg = (
                    f"You have applied @given to the method {test.__name__}, which is "
                    "used by the unittest runner but is not itself a test."
                    " This is not useful in any way."
                )
                fail_health_check(settings, msg, HealthCheck.not_a_test_method)
            if bad_django_TestCase(runner):  # pragma: no cover
                # Covered by the Django tests, but not the pytest coverage task
                raise InvalidArgument(
                    "You have applied @given to a method on "
                    f"{type(runner).__qualname__}, but this "
                    "class does not inherit from the supported versions in "
                    "`hypothesis.extra.django`. Use the Hypothesis variants "
                    "to ensure that each example is run in a separate "
                    "database transaction."
                )

            nonlocal prev_self
            # Check selfy really is self (not e.g. a mock) before we health-check
            cur_self = (
                stuff.selfy
                if getattr(type(stuff.selfy), test.__name__, None) is wrapped_test
                else None
            )
            # First call: just remember which object we were bound to. Later
            # calls: an identity mismatch triggers the differing_executors check.
            if prev_self is Unset:
                prev_self = cur_self
            elif cur_self is not prev_self:
                msg = (
                    f"The method {test.__qualname__} was called from multiple "
                    "different executors. This may lead to flaky tests and "
                    "nonreproducible errors when replaying from database."
                )
                fail_health_check(settings, msg, HealthCheck.differing_executors)

            state = StateForActualGivenExecution(
                stuff, test, settings, random, wrapped_test
            )

            reproduce_failure = wrapped_test._hypothesis_internal_use_reproduce_failure

            # If there was a @reproduce_failure decorator, use it to reproduce
            # the error (or complain that we couldn't). Either way, this will
            # always raise some kind of error.
            if reproduce_failure is not None:
                expected_version, failure = reproduce_failure
                if expected_version != __version__:
                    raise InvalidArgument(
                        "Attempting to reproduce a failure from a different "
                        f"version of Hypothesis. This failure is from {expected_version}, but "
                        f"you are currently running {__version__!r}. Please change your "
                        "Hypothesis version to a matching one."
                    )
                try:
                    state.execute_once(
                        ConjectureData.for_choices(decode_failure(failure)),
                        print_example=True,
                        is_final=True,
                    )
                    # Only reached if execute_once returned without raising.
                    raise DidNotReproduce(
                        "Expected the test to raise an error, but it "
                        "completed successfully."
                    )
                except StopTest:
                    raise DidNotReproduce(
                        "The shape of the test data has changed in some way "
                        "from where this blob was defined. Are you sure "
                        "you're running the same test?"
                    ) from None
                except UnsatisfiedAssumption:
                    raise DidNotReproduce(
                        "The test data failed to satisfy an assumption in the "
                        "test. Have you added it since this blob was generated?"
                    ) from None

            # There was no @reproduce_failure, so start by running any explicit
            # examples from @example decorators.
            errors = list(
                execute_explicit_examples(
                    state, wrapped_test, arguments, kwargs, original_sig
                )
            )
            if errors:
                # If we're not going to report multiple bugs, we would have
                # stopped running explicit examples at the first failure.
                assert len(errors) == 1 or state.settings.report_multiple_bugs

                # If an explicit example raised a 'skip' exception, ensure it's never
                # wrapped up in an exception group. Because we break out of the loop
                # immediately on finding a skip, if present it's always the last error.
                if isinstance(errors[-1][1], skip_exceptions_to_reraise()):
                    # Covered by `test_issue_3453_regression`, just in a subprocess.
                    del errors[:-1]  # pragma: no cover

                _raise_to_user(errors, state.settings, [], " in explicit examples")

            # If there were any explicit examples, they all ran successfully.
            # The next step is to use the Conjecture engine to run the test on
            # many different inputs.

            # Truthy only when the explicit phase is enabled *and* the test
            # carries a non-empty ``hypothesis_explicit_examples`` attribute.
            ran_explicit_examples = Phase.explicit in state.settings.phases and getattr(
                wrapped_test, "hypothesis_explicit_examples", ()
            )
            SKIP_BECAUSE_NO_EXAMPLES = unittest.SkipTest(
                "Hypothesis has been told to run no examples for this test."
            )
            # With neither the reuse nor the generate phase enabled the engine
            # is not run at all: skip, unless explicit examples already ran.
            if not (
                Phase.reuse in settings.phases or Phase.generate in settings.phases
            ):
                if not ran_explicit_examples:
                    raise SKIP_BECAUSE_NO_EXAMPLES
                return

            try:
                if isinstance(runner, TestCase) and hasattr(runner, "subTest"):
                    # Swap in fake_subTest (bound to the runner) for the
                    # duration of the engine run; the original attribute is
                    # restored in ``finally`` even if the run raises.
                    subTest = runner.subTest
                    try:
                        runner.subTest = types.MethodType(fake_subTest, runner)
                        state.run_engine()
                    finally:
                        runner.subTest = subTest
                else:
                    state.run_engine()
            except BaseException as e:
                # The exception caught here should either be an actual test
                # failure (or BaseExceptionGroup), or some kind of fatal error
                # that caused the engine to stop.
                generated_seed = wrapped_test._hypothesis_internal_use_generated_seed
                with local_settings(settings):
                    # The seed hint is reported only when the run did not
                    # "fail normally" and a generated seed is available.
                    if not (state.failed_normally or generated_seed is None):
                        if running_under_pytest:
                            report(
                                f"You can add @seed({generated_seed}) to this test or "
                                f"run pytest with --hypothesis-seed={generated_seed} "
                                "to reproduce this failure."
                            )
                        else:
                            report(
                                f"You can add @seed({generated_seed}) to this test to "
                                "reproduce this failure."
                            )
                    # The dance here is to avoid showing users long tracebacks
                    # full of Hypothesis internals they don't care about.
                    # We have to do this inline, to avoid adding another
                    # internal stack frame just when we've removed the rest.
                    #
                    # Using a variable for our trimmed error ensures that the line
                    # which will actually appear in tracebacks is as clear as
                    # possible - "raise the_error_hypothesis_found".
                    #
                    # Exception groups get their traceback cleared (None);
                    # anything else gets the result of get_trimmed_traceback().
                    the_error_hypothesis_found = e.with_traceback(
                        None
                        if isinstance(e, BaseExceptionGroup)
                        else get_trimmed_traceback()
                    )
                    raise the_error_hypothesis_found

            # The engine returned without error, but if it never executed the
            # test and no explicit examples ran, report the test as skipped.
            if not (ran_explicit_examples or state.ever_executed):
                raise SKIP_BECAUSE_NO_EXAMPLES

        def _get_fuzz_target() -> (
            Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]]
        ):
            # Because fuzzing interfaces are very performance-sensitive, we use a
            # somewhat more complicated structure here. `_get_fuzz_target()` is
            # called by the `HypothesisHandle.fuzz_one_input` property, allowing
            # us to defer our collection of the settings, random instance, and
            # reassignable `inner_test` (etc) until `fuzz_one_input` is accessed.
            #
            # We then share the performance cost of setting up `state` between
            # many invocations of the target. We explicitly force `deadline=None`
            # for performance reasons, saving ~40% the runtime of an empty test.
            test = wrapped_test.hypothesis.inner_test
            settings = Settings(
                parent=wrapped_test._hypothesis_internal_use_settings, deadline=None
            )
            random = get_random_for_wrapped_test(test, wrapped_test)
            # Called with no positional or keyword arguments; the asserts
            # below check that none come back either.
            _args, _kwargs, stuff = process_arguments_to_given(
                wrapped_test, (), {}, given_kwargs, new_signature.parameters
            )
            assert not _args
            assert not _kwargs
            state = StateForActualGivenExecution(
                stuff, test, settings, random, wrapped_test
            )
            # Failures found while fuzzing are saved under this derived key.
            database_key = function_digest(test) + b".secondary"
            # We track the minimal-so-far example for each distinct origin, so
            # that we track log-n instead of n examples for long runs. In particular
            # it means that we saturate for common errors in long runs instead of
            # storing huge volumes of low-value data.
            minimal_failures: dict = {}

            # NOTE: fuzz_one_input gets its docstring assigned from
            # HypothesisHandle.fuzz_one_input below, so it is documented with
            # comments only.
            #
            # Returns None when the run stops with StopTest or
            # UnsatisfiedAssumption, re-raises any other exception from the
            # test, and otherwise returns the bytes recorded in
            # ``data.provider.drawn``.
            def fuzz_one_input(
                buffer: Union[bytes, bytearray, memoryview, BinaryIO],
            ) -> Optional[bytes]:
                # This inner part is all that the fuzzer will actually run,
                # so we keep it as small and as fast as possible.
                if isinstance(buffer, io.IOBase):
                    # File-like input: read at most BUFFER_SIZE from it.
                    buffer = buffer.read(BUFFER_SIZE)
                assert isinstance(buffer, (bytes, bytearray, memoryview))
                data = ConjectureData(
                    random=None,
                    provider=BytestringProvider,
                    provider_kw={"bytestring": buffer},
                )
                # Every path through this try block assigns ``status`` before
                # the ``finally`` clause reads it.
                try:
                    state.execute_once(data)
                    status = Status.VALID
                except StopTest:
                    status = data.status
                    return None
                except UnsatisfiedAssumption:
                    status = Status.INVALID
                    return None
                except BaseException:
                    known = minimal_failures.get(data.interesting_origin)
                    # Save only if a database is configured and this failure is
                    # the first for its origin, or sorts no later than the one
                    # already recorded for that origin.
                    if settings.database is not None and (
                        known is None or sort_key(data.nodes) <= sort_key(known)
                    ):
                        settings.database.save(
                            database_key, choices_to_bytes(data.choices)
                        )
                        minimal_failures[data.interesting_origin] = data.nodes
                    status = Status.INTERESTING
                    raise
                finally:
                    # Runs on every exit path (return, re-raise or fall
                    # through); an observation is built and delivered only
                    # when TESTCASE_CALLBACKS is non-empty.
                    if TESTCASE_CALLBACKS:
                        data.freeze()
                        tc = make_testcase(
                            run_start=state._start_timestamp,
                            property=state.test_identifier,
                            data=data,
                            how_generated="fuzz_one_input",
                            representation=state._string_repr,
                            arguments=data._observability_args,
                            timing=state._timing_features,
                            coverage=None,
                            status=status,
                            backend_metadata=data.provider.observe_test_case(),
                        )
                        deliver_observation(tc)
                        state._timing_features = {}

                assert isinstance(data.provider, BytestringProvider)
                return bytes(data.provider.drawn)

            fuzz_one_input.__doc__ = HypothesisHandle.fuzz_one_input.__doc__
            return fuzz_one_input

        # After having created the decorated test function, we need to copy
        # over some attributes to make the switch as seamless as possible.

        # Only public attributes (no leading underscore) that wrapped_test does
        # not already have are copied across.
        for attrib in dir(test):
            if not (attrib.startswith("_") or hasattr(wrapped_test, attrib)):
                setattr(wrapped_test, attrib, getattr(test, attrib))
        wrapped_test.is_hypothesis_test = True
        if hasattr(test, "_hypothesis_internal_settings_applied"):
            # Used to check if @settings is applied twice.
            wrapped_test._hypothesis_internal_settings_applied = True
        wrapped_test._hypothesis_internal_use_seed = getattr(
            test, "_hypothesis_internal_use_seed", None
        )
        # Falls back to Settings.default when the attribute is missing or falsy.
        wrapped_test._hypothesis_internal_use_settings = (
            getattr(test, "_hypothesis_internal_use_settings", None) or Settings.default
        )
        wrapped_test._hypothesis_internal_use_reproduce_failure = getattr(
            test, "_hypothesis_internal_use_reproduce_failure", None
        )
        # The handle receives the original test, the lazy fuzz-target factory
        # and the normalised keyword strategies.
        wrapped_test.hypothesis = HypothesisHandle(test, _get_fuzz_target, given_kwargs)
        return wrapped_test
2163 return run_test_as_given
def find(
    specifier: SearchStrategy[Ex],
    condition: Callable[[Any], bool],
    *,
    settings: Optional[Settings] = None,
    random: Optional[Random] = None,
    database_key: Optional[bytes] = None,
) -> Ex:
    """Return the minimal example drawn from ``specifier`` for which
    ``condition`` holds.

    Args:
        specifier: The strategy to search; must be a ``SearchStrategy``.
        condition: Predicate called on each candidate value.
        settings: Base settings for the search. When omitted, settings with
            ``max_examples=2000`` are used. In either case all health checks
            are suppressed and ``report_multiple_bugs`` is turned off.
        random: If given, 64 bits are drawn from it to seed the search.
        database_key: Key under which examples are stored. When omitted and
            the settings have a database, it is derived from ``condition``.

    Raises:
        InvalidArgument: if ``specifier`` is not a ``SearchStrategy``.
        NoSuchExample: if the search finishes without ``condition`` ever
            being satisfied.
    """
    base_settings = Settings(max_examples=2000) if settings is None else settings
    search_settings = Settings(
        base_settings,
        suppress_health_check=list(HealthCheck),
        report_multiple_bugs=False,
    )

    needs_default_key = database_key is None and search_settings.database is not None
    if needs_default_key:
        # Note: The database key is not guaranteed to be unique. If not, replaying
        # of database examples may fail to reproduce due to being replayed on the
        # wrong condition.
        database_key = function_digest(condition)

    if not isinstance(specifier, SearchStrategy):
        problem = (
            f"Expected SearchStrategy but got {specifier!r} of "
            f"type {type(specifier).__name__}"
        )
        raise InvalidArgument(problem)
    specifier.validate()

    # Holds at most one element: the most recent value that satisfied
    # ``condition``. A list is used so the inner test can update it in place.
    matches: list[Ex] = []

    @search_settings
    @given(specifier)
    def test(v):
        if not condition(v):
            return
        matches[:] = [v]
        raise Found

    if random is not None:
        seeded = seed(random.getrandbits(64))
        test = seeded(test)

    # Aliasing as Any avoids mypy errors (attr-defined) when accessing and
    # setting custom attributes on the decorated function or class.
    find_test: Any = test
    find_test._hypothesis_internal_is_find = True
    find_test._hypothesis_internal_database_key = database_key

    try:
        test()
    except Found:
        return matches[0]
    else:
        # The run completed without the inner test ever signalling a match.
        raise NoSuchExample(get_pretty_function_description(condition))