Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/hypothesis/core.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
11"""This module provides the core primitives of Hypothesis, such as given."""
13import base64
14import contextlib
15import datetime
16import inspect
17import io
18import math
19import sys
20import time
21import types
22import unittest
23import warnings
24import zlib
25from collections import defaultdict
26from functools import partial
27from random import Random
28from typing import (
29 TYPE_CHECKING,
30 Any,
31 BinaryIO,
32 Callable,
33 Coroutine,
34 Hashable,
35 List,
36 Optional,
37 Tuple,
38 Type,
39 TypeVar,
40 Union,
41 overload,
42)
43from unittest import TestCase
45import attr
47from hypothesis import strategies as st
48from hypothesis._settings import (
49 HealthCheck,
50 Phase,
51 Verbosity,
52 local_settings,
53 settings as Settings,
54)
55from hypothesis.control import BuildContext
56from hypothesis.errors import (
57 DeadlineExceeded,
58 DidNotReproduce,
59 FailedHealthCheck,
60 Flaky,
61 Found,
62 HypothesisDeprecationWarning,
63 HypothesisWarning,
64 InvalidArgument,
65 NoSuchExample,
66 StopTest,
67 Unsatisfiable,
68 UnsatisfiedAssumption,
69)
70from hypothesis.internal.compat import (
71 PYPY,
72 BaseExceptionGroup,
73 add_note,
74 bad_django_TestCase,
75 get_type_hints,
76 int_from_bytes,
77)
78from hypothesis.internal.conjecture.data import ConjectureData, Status
79from hypothesis.internal.conjecture.engine import BUFFER_SIZE, ConjectureRunner
80from hypothesis.internal.conjecture.junkdrawer import (
81 ensure_free_stackframes,
82 gc_cumulative_time,
83)
84from hypothesis.internal.conjecture.shrinker import sort_key
85from hypothesis.internal.entropy import deterministic_PRNG
86from hypothesis.internal.escalation import (
87 InterestingOrigin,
88 current_pytest_item,
89 escalate_hypothesis_internal_error,
90 format_exception,
91 get_trimmed_traceback,
92)
93from hypothesis.internal.healthcheck import fail_health_check
94from hypothesis.internal.observability import (
95 OBSERVABILITY_COLLECT_COVERAGE,
96 TESTCASE_CALLBACKS,
97 _system_metadata,
98 deliver_json_blob,
99 make_testcase,
100)
101from hypothesis.internal.reflection import (
102 convert_positional_arguments,
103 define_function_signature,
104 function_digest,
105 get_pretty_function_description,
106 get_signature,
107 impersonate,
108 is_mock,
109 nicerepr,
110 proxies,
111 repr_call,
112)
113from hypothesis.internal.scrutineer import (
114 MONITORING_TOOL_ID,
115 Trace,
116 Tracer,
117 explanatory_lines,
118 tractable_coverage_report,
119)
120from hypothesis.internal.validation import check_type
121from hypothesis.reporting import (
122 current_verbosity,
123 report,
124 verbose_report,
125 with_reporter,
126)
127from hypothesis.statistics import describe_statistics, describe_targets, note_statistics
128from hypothesis.strategies._internal.misc import NOTHING
129from hypothesis.strategies._internal.strategies import (
130 Ex,
131 SearchStrategy,
132 check_strategy,
133)
134from hypothesis.strategies._internal.utils import to_jsonable
135from hypothesis.vendor.pretty import RepresentationPrinter
136from hypothesis.version import __version__
138if sys.version_info >= (3, 10):
139 from types import EllipsisType as EllipsisType
140elif TYPE_CHECKING:
141 from builtins import ellipsis as EllipsisType
142else: # pragma: no cover
143 EllipsisType = type(Ellipsis)
146TestFunc = TypeVar("TestFunc", bound=Callable)
149running_under_pytest = False
150pytest_shows_exceptiongroups = True
151global_force_seed = None
152_hypothesis_global_random = None
155@attr.s()
156class Example:
157 args = attr.ib()
158 kwargs = attr.ib()
159 # Plus two optional arguments for .xfail()
160 raises = attr.ib(default=None)
161 reason = attr.ib(default=None)
164class example:
165 """A decorator which ensures a specific example is always tested."""
167 def __init__(self, *args: Any, **kwargs: Any) -> None:
168 if args and kwargs:
169 raise InvalidArgument(
170 "Cannot mix positional and keyword arguments for examples"
171 )
172 if not (args or kwargs):
173 raise InvalidArgument("An example must provide at least one argument")
175 self.hypothesis_explicit_examples: List[Example] = []
176 self._this_example = Example(tuple(args), kwargs)
178 def __call__(self, test: TestFunc) -> TestFunc:
179 if not hasattr(test, "hypothesis_explicit_examples"):
180 test.hypothesis_explicit_examples = self.hypothesis_explicit_examples # type: ignore
181 test.hypothesis_explicit_examples.append(self._this_example) # type: ignore
182 return test
184 def xfail(
185 self,
186 condition: bool = True, # noqa: FBT002
187 *,
188 reason: str = "",
189 raises: Union[
190 Type[BaseException], Tuple[Type[BaseException], ...]
191 ] = BaseException,
192 ) -> "example":
193 """Mark this example as an expected failure, similarly to
194 :obj:`pytest.mark.xfail(strict=True) <pytest.mark.xfail>`.
196 Expected-failing examples allow you to check that your test does fail on
197 some examples, and therefore build confidence that *passing* tests are
198 because your code is working, not because the test is missing something.
200 .. code-block:: python
202 @example(...).xfail()
203 @example(...).xfail(reason="Prices must be non-negative")
204 @example(...).xfail(raises=(KeyError, ValueError))
205 @example(...).xfail(sys.version_info[:2] >= (3, 9), reason="needs py39+")
206 @example(...).xfail(condition=sys.platform != "linux", raises=OSError)
207 def test(x):
208 pass
210 .. note::
212 Expected-failing examples are handled separately from those generated
213 by strategies, so you should usually ensure that there is no overlap.
215 .. code-block:: python
217 @example(x=1, y=0).xfail(raises=ZeroDivisionError)
218 @given(x=st.just(1), y=st.integers()) # Missing `.filter(bool)`!
219 def test_fraction(x, y):
220 # This test will try the explicit example and see it fail as
221 # expected, then go on to generate more examples from the
222 # strategy. If we happen to generate y=0, the test will fail
223 # because only the explicit example is treated as xfailing.
224 x / y
226 Note that this "method chaining" syntax requires Python 3.9 or later, for
227 :pep:`614` relaxing grammar restrictions on decorators. If you need to
228 support older versions of Python, you can use an identity function:
230 .. code-block:: python
232 def identity(x):
233 return x
236 @identity(example(...).xfail())
237 def test(x):
238 pass
240 """
241 check_type(bool, condition, "condition")
242 check_type(str, reason, "reason")
243 if not (
244 isinstance(raises, type) and issubclass(raises, BaseException)
245 ) and not (
246 isinstance(raises, tuple)
247 and raises # () -> expected to fail with no error, which is impossible
248 and all(
249 isinstance(r, type) and issubclass(r, BaseException) for r in raises
250 )
251 ):
252 raise InvalidArgument(
253 f"{raises=} must be an exception type or tuple of exception types"
254 )
255 if condition:
256 self._this_example = attr.evolve(
257 self._this_example, raises=raises, reason=reason
258 )
259 return self
261 def via(self, whence: str, /) -> "example":
262 """Attach a machine-readable label noting whence this example came.
264 The idea is that tools will be able to add ``@example()`` cases for you, e.g.
265 to maintain a high-coverage set of explicit examples, but also *remove* them
266 if they become redundant - without ever deleting manually-added examples:
268 .. code-block:: python
270 # You can choose to annotate examples, or not, as you prefer
271 @example(...)
272 @example(...).via("regression test for issue #42")
274 # The `hy-` prefix is reserved for automated tooling
275 @example(...).via("hy-failing")
276 @example(...).via("hy-coverage")
277 @example(...).via("hy-target-$label")
278 def test(x):
279 pass
281 Note that this "method chaining" syntax requires Python 3.9 or later, for
282 :pep:`614` relaxing grammar restrictions on decorators. If you need to
283 support older versions of Python, you can use an identity function:
285 .. code-block:: python
287 def identity(x):
288 return x
291 @identity(example(...).via("label"))
292 def test(x):
293 pass
295 """
296 if not isinstance(whence, str):
297 raise InvalidArgument(".via() must be passed a string")
298 # This is deliberately a no-op at runtime; the tools operate on source code.
299 return self
302def seed(seed: Hashable) -> Callable[[TestFunc], TestFunc]:
303 """seed: Start the test execution from a specific seed.
305 May be any hashable object. No exact meaning for seed is provided
306 other than that for a fixed seed value Hypothesis will try the same
307 actions (insofar as it can given external sources of non-
308 determinism. e.g. timing and hash randomization).
310 Overrides the derandomize setting, which is designed to enable
311 deterministic builds rather than reproducing observed failures.
313 """
315 def accept(test):
316 test._hypothesis_internal_use_seed = seed
317 current_settings = getattr(test, "_hypothesis_internal_use_settings", None)
318 test._hypothesis_internal_use_settings = Settings(
319 current_settings, database=None
320 )
321 return test
323 return accept
326def reproduce_failure(version: str, blob: bytes) -> Callable[[TestFunc], TestFunc]:
327 """Run the example that corresponds to this data blob in order to reproduce
328 a failure.
330 A test with this decorator *always* runs only one example and always fails.
331 If the provided example does not cause a failure, or is in some way invalid
332 for this test, then this will fail with a DidNotReproduce error.
334 This decorator is not intended to be a permanent addition to your test
335 suite. It's simply some code you can add to ease reproduction of a problem
336 in the event that you don't have access to the test database. Because of
337 this, *no* compatibility guarantees are made between different versions of
338 Hypothesis - its API may change arbitrarily from version to version.
339 """
341 def accept(test):
342 test._hypothesis_internal_use_reproduce_failure = (version, blob)
343 return test
345 return accept
348def encode_failure(buffer):
349 buffer = bytes(buffer)
350 compressed = zlib.compress(buffer)
351 if len(compressed) < len(buffer):
352 buffer = b"\1" + compressed
353 else:
354 buffer = b"\0" + buffer
355 return base64.b64encode(buffer)
358def decode_failure(blob):
359 try:
360 buffer = base64.b64decode(blob)
361 except Exception:
362 raise InvalidArgument(f"Invalid base64 encoded string: {blob!r}") from None
363 prefix = buffer[:1]
364 if prefix == b"\0":
365 return buffer[1:]
366 elif prefix == b"\1":
367 try:
368 return zlib.decompress(buffer[1:])
369 except zlib.error as err:
370 raise InvalidArgument(
371 f"Invalid zlib compression for blob {blob!r}"
372 ) from err
373 else:
374 raise InvalidArgument(
375 f"Could not decode blob {blob!r}: Invalid start byte {prefix!r}"
376 )
379def _invalid(message, *, exc=InvalidArgument, test, given_kwargs):
380 @impersonate(test)
381 def wrapped_test(*arguments, **kwargs): # pragma: no cover # coverage limitation
382 raise exc(message)
384 wrapped_test.is_hypothesis_test = True
385 wrapped_test.hypothesis = HypothesisHandle(
386 inner_test=test,
387 get_fuzz_target=wrapped_test,
388 given_kwargs=given_kwargs,
389 )
390 return wrapped_test
393def is_invalid_test(test, original_sig, given_arguments, given_kwargs):
394 """Check the arguments to ``@given`` for basic usage constraints.
396 Most errors are not raised immediately; instead we return a dummy test
397 function that will raise the appropriate error if it is actually called.
398 When the user runs a subset of tests (e.g via ``pytest -k``), errors will
399 only be reported for tests that actually ran.
400 """
401 invalid = partial(_invalid, test=test, given_kwargs=given_kwargs)
403 if not (given_arguments or given_kwargs):
404 return invalid("given must be called with at least one argument")
406 params = list(original_sig.parameters.values())
407 pos_params = [p for p in params if p.kind is p.POSITIONAL_OR_KEYWORD]
408 kwonly_params = [p for p in params if p.kind is p.KEYWORD_ONLY]
409 if given_arguments and params != pos_params:
410 return invalid(
411 "positional arguments to @given are not supported with varargs, "
412 "varkeywords, positional-only, or keyword-only arguments"
413 )
415 if len(given_arguments) > len(pos_params):
416 return invalid(
417 f"Too many positional arguments for {test.__name__}() were passed to "
418 f"@given - expected at most {len(pos_params)} "
419 f"arguments, but got {len(given_arguments)} {given_arguments!r}"
420 )
422 if ... in given_arguments:
423 return invalid(
424 "... was passed as a positional argument to @given, but may only be "
425 "passed as a keyword argument or as the sole argument of @given"
426 )
428 if given_arguments and given_kwargs:
429 return invalid("cannot mix positional and keyword arguments to @given")
430 extra_kwargs = [
431 k for k in given_kwargs if k not in {p.name for p in pos_params + kwonly_params}
432 ]
433 if extra_kwargs and (params == [] or params[-1].kind is not params[-1].VAR_KEYWORD):
434 arg = extra_kwargs[0]
435 return invalid(
436 f"{test.__name__}() got an unexpected keyword argument {arg!r}, "
437 f"from `{arg}={given_kwargs[arg]!r}` in @given"
438 )
439 if any(p.default is not p.empty for p in params):
440 return invalid("Cannot apply @given to a function with defaults.")
442 # This case would raise Unsatisfiable *anyway*, but by detecting it here we can
443 # provide a much more helpful error message for people e.g. using the Ghostwriter.
444 empty = [
445 f"{s!r} (arg {idx})" for idx, s in enumerate(given_arguments) if s is NOTHING
446 ] + [f"{name}={s!r}" for name, s in given_kwargs.items() if s is NOTHING]
447 if empty:
448 strats = "strategies" if len(empty) > 1 else "strategy"
449 return invalid(
450 f"Cannot generate examples from empty {strats}: " + ", ".join(empty),
451 exc=Unsatisfiable,
452 )
455def execute_explicit_examples(state, wrapped_test, arguments, kwargs, original_sig):
456 assert isinstance(state, StateForActualGivenExecution)
457 posargs = [
458 p.name
459 for p in original_sig.parameters.values()
460 if p.kind is p.POSITIONAL_OR_KEYWORD
461 ]
463 for example in reversed(getattr(wrapped_test, "hypothesis_explicit_examples", ())):
464 assert isinstance(example, Example)
465 # All of this validation is to check that @example() got "the same" arguments
466 # as @given, i.e. corresponding to the same parameters, even though they might
467 # be any mixture of positional and keyword arguments.
468 if example.args:
469 assert not example.kwargs
470 if any(
471 p.kind is p.POSITIONAL_ONLY for p in original_sig.parameters.values()
472 ):
473 raise InvalidArgument(
474 "Cannot pass positional arguments to @example() when decorating "
475 "a test function which has positional-only parameters."
476 )
477 if len(example.args) > len(posargs):
478 raise InvalidArgument(
479 "example has too many arguments for test. Expected at most "
480 f"{len(posargs)} but got {len(example.args)}"
481 )
482 example_kwargs = dict(zip(posargs[-len(example.args) :], example.args))
483 else:
484 example_kwargs = dict(example.kwargs)
485 given_kws = ", ".join(
486 repr(k) for k in sorted(wrapped_test.hypothesis._given_kwargs)
487 )
488 example_kws = ", ".join(repr(k) for k in sorted(example_kwargs))
489 if given_kws != example_kws:
490 raise InvalidArgument(
491 f"Inconsistent args: @given() got strategies for {given_kws}, "
492 f"but @example() got arguments for {example_kws}"
493 ) from None
495 # This is certainly true because the example_kwargs exactly match the params
496 # reserved by @given(), which are then remove from the function signature.
497 assert set(example_kwargs).isdisjoint(kwargs)
498 example_kwargs.update(kwargs)
500 if Phase.explicit not in state.settings.phases:
501 continue
503 with local_settings(state.settings):
504 fragments_reported = []
505 empty_data = ConjectureData.for_buffer(b"")
506 try:
507 bits = ", ".join(nicerepr(x) for x in arguments) + ", ".join(
508 f"{k}={nicerepr(v)}" for k, v in example_kwargs.items()
509 )
510 execute_example = partial(
511 state.execute_once,
512 empty_data,
513 is_final=True,
514 print_example=True,
515 example_kwargs=example_kwargs,
516 )
517 with with_reporter(fragments_reported.append):
518 if example.raises is None:
519 execute_example()
520 else:
521 # @example(...).xfail(...)
523 try:
524 execute_example()
525 except failure_exceptions_to_catch() as err:
526 if not isinstance(err, example.raises):
527 raise
528 # Save a string form of this example; we'll warn if it's
529 # ever generated by the strategy (which can't be xfailed)
530 state.xfail_example_reprs.add(
531 repr_call(state.test, arguments, example_kwargs)
532 )
533 except example.raises as err:
534 # We'd usually check this as early as possible, but it's
535 # possible for failure_exceptions_to_catch() to grow when
536 # e.g. pytest is imported between import- and test-time.
537 raise InvalidArgument(
538 f"@example({bits}) raised an expected {err!r}, "
539 "but Hypothesis does not treat this as a test failure"
540 ) from err
541 else:
542 # Unexpectedly passing; always raise an error in this case.
543 reason = f" because {example.reason}" * bool(example.reason)
544 if example.raises is BaseException:
545 name = "exception" # special-case no raises= arg
546 elif not isinstance(example.raises, tuple):
547 name = example.raises.__name__
548 elif len(example.raises) == 1:
549 name = example.raises[0].__name__
550 else:
551 name = (
552 ", ".join(ex.__name__ for ex in example.raises[:-1])
553 + f", or {example.raises[-1].__name__}"
554 )
555 vowel = name.upper()[0] in "AEIOU"
556 raise AssertionError(
557 f"Expected a{'n' * vowel} {name} from @example({bits})"
558 f"{reason}, but no exception was raised."
559 )
560 except UnsatisfiedAssumption:
561 # Odd though it seems, we deliberately support explicit examples that
562 # are then rejected by a call to `assume()`. As well as iterative
563 # development, this is rather useful to replay Hypothesis' part of
564 # a saved failure when other arguments are supplied by e.g. pytest.
565 # See https://github.com/HypothesisWorks/hypothesis/issues/2125
566 with contextlib.suppress(StopTest):
567 empty_data.conclude_test(Status.INVALID)
568 except BaseException as err:
569 # In order to support reporting of multiple failing examples, we yield
570 # each of the (report text, error) pairs we find back to the top-level
571 # runner. This also ensures that user-facing stack traces have as few
572 # frames of Hypothesis internals as possible.
573 err = err.with_traceback(get_trimmed_traceback())
575 # One user error - whether misunderstanding or typo - we've seen a few
576 # times is to pass strategies to @example() where values are expected.
577 # Checking is easy, and false-positives not much of a problem, so:
578 if isinstance(err, failure_exceptions_to_catch()) and any(
579 isinstance(arg, SearchStrategy)
580 for arg in example.args + tuple(example.kwargs.values())
581 ):
582 new = HypothesisWarning(
583 "The @example() decorator expects to be passed values, but "
584 "you passed strategies instead. See https://hypothesis."
585 "readthedocs.io/en/latest/reproducing.html for details."
586 )
587 new.__cause__ = err
588 err = new
590 with contextlib.suppress(StopTest):
591 empty_data.conclude_test(Status.INVALID)
592 yield (fragments_reported, err)
593 if (
594 state.settings.report_multiple_bugs
595 and pytest_shows_exceptiongroups
596 and isinstance(err, failure_exceptions_to_catch())
597 and not isinstance(err, skip_exceptions_to_reraise())
598 ):
599 continue
600 break
601 finally:
602 if fragments_reported:
603 assert fragments_reported[0].startswith("Falsifying example")
604 fragments_reported[0] = fragments_reported[0].replace(
605 "Falsifying example", "Falsifying explicit example", 1
606 )
608 tc = make_testcase(
609 start_timestamp=state._start_timestamp,
610 test_name_or_nodeid=state.test_identifier,
611 data=empty_data,
612 how_generated="explicit example",
613 string_repr=state._string_repr,
614 timing=state._timing_features,
615 )
616 deliver_json_blob(tc)
618 if fragments_reported:
619 verbose_report(fragments_reported[0].replace("Falsifying", "Trying", 1))
620 for f in fragments_reported[1:]:
621 verbose_report(f)
624def get_random_for_wrapped_test(test, wrapped_test):
625 settings = wrapped_test._hypothesis_internal_use_settings
626 wrapped_test._hypothesis_internal_use_generated_seed = None
628 if wrapped_test._hypothesis_internal_use_seed is not None:
629 return Random(wrapped_test._hypothesis_internal_use_seed)
630 elif settings.derandomize:
631 return Random(int_from_bytes(function_digest(test)))
632 elif global_force_seed is not None:
633 return Random(global_force_seed)
634 else:
635 global _hypothesis_global_random
636 if _hypothesis_global_random is None: # pragma: no cover
637 _hypothesis_global_random = Random()
638 seed = _hypothesis_global_random.getrandbits(128)
639 wrapped_test._hypothesis_internal_use_generated_seed = seed
640 return Random(seed)
643@attr.s
644class Stuff:
645 selfy: Any = attr.ib(default=None)
646 args: tuple = attr.ib(factory=tuple)
647 kwargs: dict = attr.ib(factory=dict)
648 given_kwargs: dict = attr.ib(factory=dict)
651def process_arguments_to_given(wrapped_test, arguments, kwargs, given_kwargs, params):
652 selfy = None
653 arguments, kwargs = convert_positional_arguments(wrapped_test, arguments, kwargs)
655 # If the test function is a method of some kind, the bound object
656 # will be the first named argument if there are any, otherwise the
657 # first vararg (if any).
658 posargs = [p.name for p in params.values() if p.kind is p.POSITIONAL_OR_KEYWORD]
659 if posargs:
660 selfy = kwargs.get(posargs[0])
661 elif arguments:
662 selfy = arguments[0]
664 # Ensure that we don't mistake mocks for self here.
665 # This can cause the mock to be used as the test runner.
666 if is_mock(selfy):
667 selfy = None
669 arguments = tuple(arguments)
671 with ensure_free_stackframes():
672 for k, s in given_kwargs.items():
673 check_strategy(s, name=k)
674 s.validate()
676 stuff = Stuff(selfy=selfy, args=arguments, kwargs=kwargs, given_kwargs=given_kwargs)
678 return arguments, kwargs, stuff
681def skip_exceptions_to_reraise():
682 """Return a tuple of exceptions meaning 'skip this test', to re-raise.
684 This is intended to cover most common test runners; if you would
685 like another to be added please open an issue or pull request adding
686 it to this function and to tests/cover/test_lazy_import.py
687 """
688 # This is a set because nose may simply re-export unittest.SkipTest
689 exceptions = set()
690 # We use this sys.modules trick to avoid importing libraries -
691 # you can't be an instance of a type from an unimported module!
692 # This is fast enough that we don't need to cache the result,
693 # and more importantly it avoids possible side-effects :-)
694 if "unittest" in sys.modules:
695 exceptions.add(sys.modules["unittest"].SkipTest)
696 if "unittest2" in sys.modules:
697 exceptions.add(sys.modules["unittest2"].SkipTest)
698 if "nose" in sys.modules:
699 exceptions.add(sys.modules["nose"].SkipTest)
700 if "_pytest" in sys.modules:
701 exceptions.add(sys.modules["_pytest"].outcomes.Skipped)
702 return tuple(sorted(exceptions, key=str))
705def failure_exceptions_to_catch():
706 """Return a tuple of exceptions meaning 'this test has failed', to catch.
708 This is intended to cover most common test runners; if you would
709 like another to be added please open an issue or pull request.
710 """
711 # While SystemExit and GeneratorExit are instances of BaseException, we also
712 # expect them to be deterministic - unlike KeyboardInterrupt - and so we treat
713 # them as standard exceptions, check for flakiness, etc.
714 # See https://github.com/HypothesisWorks/hypothesis/issues/2223 for details.
715 exceptions = [Exception, SystemExit, GeneratorExit]
716 if "_pytest" in sys.modules:
717 exceptions.append(sys.modules["_pytest"].outcomes.Failed)
718 return tuple(exceptions)
721def new_given_signature(original_sig, given_kwargs):
722 """Make an updated signature for the wrapped test."""
723 return original_sig.replace(
724 parameters=[
725 p
726 for p in original_sig.parameters.values()
727 if not (
728 p.name in given_kwargs
729 and p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
730 )
731 ],
732 return_annotation=None,
733 )
736def default_executor(data, function):
737 return function(data)
740def get_executor(runner):
741 try:
742 execute_example = runner.execute_example
743 except AttributeError:
744 pass
745 else:
746 return lambda data, function: execute_example(partial(function, data))
748 if hasattr(runner, "setup_example") or hasattr(runner, "teardown_example"):
749 setup = getattr(runner, "setup_example", None) or (lambda: None)
750 teardown = getattr(runner, "teardown_example", None) or (lambda ex: None)
752 def execute(data, function):
753 token = None
754 try:
755 token = setup()
756 return function(data)
757 finally:
758 teardown(token)
760 return execute
762 return default_executor
765class StateForActualGivenExecution:
766 def __init__(self, stuff, test, settings, random, wrapped_test):
767 self.test_runner = get_executor(stuff.selfy)
768 self.stuff = stuff
769 self.settings = settings
770 self.last_exception = None
771 self.falsifying_examples = ()
772 self.random = random
773 self.ever_executed = False
775 self.is_find = getattr(wrapped_test, "_hypothesis_internal_is_find", False)
776 self.wrapped_test = wrapped_test
777 self.xfail_example_reprs = set()
779 self.test = test
781 self.print_given_args = getattr(
782 wrapped_test, "_hypothesis_internal_print_given_args", True
783 )
785 self.files_to_propagate = set()
786 self.failed_normally = False
787 self.failed_due_to_deadline = False
789 self.explain_traces = defaultdict(set)
790 self._start_timestamp = time.time()
791 self._string_repr = ""
792 self._timing_features = {}
794 @property
795 def test_identifier(self):
796 return getattr(
797 current_pytest_item.value, "nodeid", None
798 ) or get_pretty_function_description(self.wrapped_test)
800 def execute_once(
801 self,
802 data,
803 *,
804 print_example=False,
805 is_final=False,
806 expected_failure=None,
807 example_kwargs=None,
808 ):
809 """Run the test function once, using ``data`` as input.
811 If the test raises an exception, it will propagate through to the
812 caller of this method. Depending on its type, this could represent
813 an ordinary test failure, or a fatal error, or a control exception.
815 If this method returns normally, the test might have passed, or
816 it might have placed ``data`` in an unsuccessful state and then
817 swallowed the corresponding control exception.
818 """
820 self.ever_executed = True
821 data.is_find = self.is_find
823 self._string_repr = ""
824 text_repr = None
825 if self.settings.deadline is None and not TESTCASE_CALLBACKS:
827 @proxies(self.test)
828 def test(*args, **kwargs):
829 with ensure_free_stackframes():
830 return self.test(*args, **kwargs)
832 else:
834 @proxies(self.test)
835 def test(*args, **kwargs):
836 arg_drawtime = math.fsum(data.draw_times.values())
837 arg_stateful = math.fsum(data._stateful_run_times.values())
838 arg_gctime = gc_cumulative_time()
839 start = time.perf_counter()
840 try:
841 with ensure_free_stackframes():
842 result = self.test(*args, **kwargs)
843 finally:
844 finish = time.perf_counter()
845 in_drawtime = math.fsum(data.draw_times.values()) - arg_drawtime
846 in_stateful = (
847 math.fsum(data._stateful_run_times.values()) - arg_stateful
848 )
849 in_gctime = gc_cumulative_time() - arg_gctime
850 runtime = finish - start - in_drawtime - in_stateful - in_gctime
851 self._timing_features = {
852 "execute:test": runtime,
853 "overall:gc": in_gctime,
854 **data.draw_times,
855 **data._stateful_run_times,
856 }
858 if (current_deadline := self.settings.deadline) is not None:
859 if not is_final:
860 current_deadline = (current_deadline // 4) * 5
861 if runtime >= current_deadline.total_seconds():
862 raise DeadlineExceeded(
863 datetime.timedelta(seconds=runtime), self.settings.deadline
864 )
865 return result
867 def run(data):
868 # Set up dynamic context needed by a single test run.
869 if self.stuff.selfy is not None:
870 data.hypothesis_runner = self.stuff.selfy
871 # Generate all arguments to the test function.
872 args = self.stuff.args
873 kwargs = dict(self.stuff.kwargs)
874 if example_kwargs is None:
875 kw, argslices = context.prep_args_kwargs_from_strategies(
876 self.stuff.given_kwargs
877 )
878 else:
879 kw = example_kwargs
880 argslices = {}
881 kwargs.update(kw)
882 if expected_failure is not None:
883 nonlocal text_repr
884 text_repr = repr_call(test, args, kwargs)
885 if text_repr in self.xfail_example_reprs:
886 warnings.warn(
887 f"We generated {text_repr}, which seems identical "
888 "to one of your `@example(...).xfail()` cases. "
889 "Revise the strategy to avoid this overlap?",
890 HypothesisWarning,
891 # Checked in test_generating_xfailed_examples_warns!
892 stacklevel=6,
893 )
895 if print_example or current_verbosity() >= Verbosity.verbose:
896 printer = RepresentationPrinter(context=context)
897 if print_example:
898 printer.text("Falsifying example:")
899 else:
900 printer.text("Trying example:")
902 if self.print_given_args:
903 printer.text(" ")
904 printer.repr_call(
905 test.__name__,
906 args,
907 kwargs,
908 force_split=True,
909 arg_slices=argslices,
910 leading_comment=(
911 "# " + context.data.slice_comments[(0, 0)]
912 if (0, 0) in context.data.slice_comments
913 else None
914 ),
915 )
916 report(printer.getvalue())
918 if TESTCASE_CALLBACKS:
919 printer = RepresentationPrinter(context=context)
920 printer.repr_call(
921 test.__name__,
922 args,
923 kwargs,
924 force_split=True,
925 arg_slices=argslices,
926 leading_comment=(
927 "# " + context.data.slice_comments[(0, 0)]
928 if (0, 0) in context.data.slice_comments
929 else None
930 ),
931 )
932 self._string_repr = printer.getvalue()
933 data._observability_arguments = {
934 **dict(enumerate(map(to_jsonable, args))),
935 **{k: to_jsonable(v) for k, v in kwargs.items()},
936 }
938 try:
939 return test(*args, **kwargs)
940 except TypeError as e:
941 # If we sampled from a sequence of strategies, AND failed with a
942 # TypeError, *AND that exception mentions SearchStrategy*, add a note:
943 if "SearchStrategy" in str(e) and hasattr(
944 data, "_sampled_from_all_strategies_elements_message"
945 ):
946 msg, format_arg = data._sampled_from_all_strategies_elements_message
947 add_note(e, msg.format(format_arg))
948 raise
949 finally:
950 if parts := getattr(data, "_stateful_repr_parts", None):
951 self._string_repr = "\n".join(parts)
953 # self.test_runner can include the execute_example method, or setup/teardown
954 # _example, so it's important to get the PRNG and build context in place first.
955 with local_settings(self.settings):
956 with deterministic_PRNG():
957 with BuildContext(data, is_final=is_final) as context:
958 # providers may throw in per_case_context_fn, and we'd like
959 # `result` to still be set in these cases.
960 result = None
961 with data.provider.per_test_case_context_manager():
962 # Run the test function once, via the executor hook.
963 # In most cases this will delegate straight to `run(data)`.
964 result = self.test_runner(data, run)
966 # If a failure was expected, it should have been raised already, so
967 # instead raise an appropriate diagnostic error.
968 if expected_failure is not None:
969 exception, traceback = expected_failure
970 if isinstance(exception, DeadlineExceeded) and (
971 runtime_secs := math.fsum(
972 v
973 for k, v in self._timing_features.items()
974 if k.startswith("execute:")
975 )
976 ):
977 report(
978 "Unreliable test timings! On an initial run, this "
979 "test took %.2fms, which exceeded the deadline of "
980 "%.2fms, but on a subsequent run it took %.2f ms, "
981 "which did not. If you expect this sort of "
982 "variability in your test timings, consider turning "
983 "deadlines off for this test by setting deadline=None."
984 % (
985 exception.runtime.total_seconds() * 1000,
986 self.settings.deadline.total_seconds() * 1000,
987 runtime_secs * 1000,
988 )
989 )
990 else:
991 report("Failed to reproduce exception. Expected: \n" + traceback)
992 raise Flaky(
993 f"Hypothesis {text_repr} produces unreliable results: "
994 "Falsified on the first call but did not on a subsequent one"
995 ) from exception
996 return result
998 def _execute_once_for_engine(self, data: ConjectureData) -> None:
999 """Wrapper around ``execute_once`` that intercepts test failure
1000 exceptions and single-test control exceptions, and turns them into
1001 appropriate method calls to `data` instead.
1003 This allows the engine to assume that any exception other than
1004 ``StopTest`` must be a fatal error, and should stop the entire engine.
1005 """
1006 trace: Trace = set()
1007 try:
1008 # this is actually covered by our tests, but only on >= 3.12.
1009 if (
1010 sys.version_info[:2] >= (3, 12)
1011 and sys.monitoring.get_tool(MONITORING_TOOL_ID) is not None
1012 ): # pragma: no cover
1013 warnings.warn(
1014 "avoiding tracing test function because tool id "
1015 f"{MONITORING_TOOL_ID} is already taken by tool "
1016 f"{sys.monitoring.get_tool(MONITORING_TOOL_ID)}.",
1017 HypothesisWarning,
1018 # I'm not sure computing a correct stacklevel is reasonable
1019 # given the number of entry points here.
1020 stacklevel=1,
1021 )
1023 _can_trace = (
1024 (sys.version_info[:2] < (3, 12) and sys.gettrace() is None)
1025 or (
1026 sys.version_info[:2] >= (3, 12)
1027 and sys.monitoring.get_tool(MONITORING_TOOL_ID) is None
1028 )
1029 ) and not PYPY
1030 _trace_obs = TESTCASE_CALLBACKS and OBSERVABILITY_COLLECT_COVERAGE
1031 _trace_failure = (
1032 self.failed_normally
1033 and not self.failed_due_to_deadline
1034 and {Phase.shrink, Phase.explain}.issubset(self.settings.phases)
1035 )
1036 if _can_trace and (_trace_obs or _trace_failure): # pragma: no cover
1037 # This is in fact covered by our *non-coverage* tests, but due to the
1038 # settrace() contention *not* by our coverage tests. Ah well.
1039 with Tracer() as tracer:
1040 try:
1041 result = self.execute_once(data)
1042 if data.status == Status.VALID:
1043 self.explain_traces[None].add(frozenset(tracer.branches))
1044 finally:
1045 trace = tracer.branches
1046 else:
1047 result = self.execute_once(data)
1048 if result is not None:
1049 fail_health_check(
1050 self.settings,
1051 "Tests run under @given should return None, but "
1052 f"{self.test.__name__} returned {result!r} instead.",
1053 HealthCheck.return_value,
1054 )
1055 except UnsatisfiedAssumption as e:
1056 # An "assume" check failed, so instead we inform the engine that
1057 # this test run was invalid.
1058 data.mark_invalid(e.reason)
1059 except StopTest:
1060 # The engine knows how to handle this control exception, so it's
1061 # OK to re-raise it.
1062 raise
1063 except (
1064 HypothesisDeprecationWarning,
1065 FailedHealthCheck,
1066 *skip_exceptions_to_reraise(),
1067 ):
1068 # These are fatal errors or control exceptions that should stop the
1069 # engine, so we re-raise them.
1070 raise
1071 except failure_exceptions_to_catch() as e:
1072 # If the error was raised by Hypothesis-internal code, re-raise it
1073 # as a fatal error instead of treating it as a test failure.
1074 escalate_hypothesis_internal_error()
1076 if data.frozen:
1077 # This can happen if an error occurred in a finally
1078 # block somewhere, suppressing our original StopTest.
1079 # We raise a new one here to resume normal operation.
1080 raise StopTest(data.testcounter) from e
1081 else:
1082 # The test failed by raising an exception, so we inform the
1083 # engine that this test run was interesting. This is the normal
1084 # path for test runs that fail.
1085 tb = get_trimmed_traceback()
1086 info = data.extra_information
1087 info._expected_traceback = format_exception(e, tb) # type: ignore
1088 info._expected_exception = e # type: ignore
1089 verbose_report(info._expected_traceback) # type: ignore
1091 self.failed_normally = True
1093 interesting_origin = InterestingOrigin.from_exception(e)
1094 if trace: # pragma: no cover
1095 # Trace collection is explicitly disabled under coverage.
1096 self.explain_traces[interesting_origin].add(frozenset(trace))
1097 if interesting_origin[0] == DeadlineExceeded:
1098 self.failed_due_to_deadline = True
1099 self.explain_traces.clear()
1100 data.mark_interesting(interesting_origin) # type: ignore # mypy bug?
1101 finally:
1102 # Conditional here so we can save some time constructing the payload; in
1103 # other cases (without coverage) it's cheap enough to do that regardless.
1104 if TESTCASE_CALLBACKS:
1105 if runner := getattr(self, "_runner", None):
1106 phase = runner._current_phase
1107 elif self.failed_normally or self.failed_due_to_deadline:
1108 phase = "shrink"
1109 else: # pragma: no cover # in case of messing with internals
1110 phase = "unknown"
1111 backend_desc = f", using backend={self.settings.backend!r}" * (
1112 self.settings.backend != "hypothesis"
1113 and not getattr(runner, "_switch_to_hypothesis_provider", False)
1114 )
1115 tc = make_testcase(
1116 start_timestamp=self._start_timestamp,
1117 test_name_or_nodeid=self.test_identifier,
1118 data=data,
1119 how_generated=f"during {phase} phase{backend_desc}",
1120 string_repr=self._string_repr,
1121 arguments=data._observability_args,
1122 timing=self._timing_features,
1123 coverage=tractable_coverage_report(trace) or None,
1124 phase=phase,
1125 )
1126 deliver_json_blob(tc)
1127 self._timing_features = {}
1129 def run_engine(self):
1130 """Run the test function many times, on database input and generated
1131 input, using the Conjecture engine.
1132 """
1133 # Tell pytest to omit the body of this function from tracebacks
1134 __tracebackhide__ = True
1135 try:
1136 database_key = self.wrapped_test._hypothesis_internal_database_key
1137 except AttributeError:
1138 if global_force_seed is None:
1139 database_key = function_digest(self.test)
1140 else:
1141 database_key = None
1143 runner = self._runner = ConjectureRunner(
1144 self._execute_once_for_engine,
1145 settings=self.settings,
1146 random=self.random,
1147 database_key=database_key,
1148 )
1149 # Use the Conjecture engine to run the test function many times
1150 # on different inputs.
1151 runner.run()
1152 note_statistics(runner.statistics)
1153 deliver_json_blob(
1154 {
1155 "type": "info",
1156 "run_start": self._start_timestamp,
1157 "property": self.test_identifier,
1158 "title": "Hypothesis Statistics",
1159 "content": describe_statistics(runner.statistics),
1160 }
1161 )
1163 if runner.call_count == 0:
1164 return
1165 if runner.interesting_examples:
1166 self.falsifying_examples = sorted(
1167 runner.interesting_examples.values(),
1168 key=lambda d: sort_key(d.buffer),
1169 reverse=True,
1170 )
1171 else:
1172 if runner.valid_examples == 0:
1173 rep = get_pretty_function_description(self.test)
1174 raise Unsatisfiable(f"Unable to satisfy assumptions of {rep}")
1176 if not self.falsifying_examples:
1177 return
1178 elif not (self.settings.report_multiple_bugs and pytest_shows_exceptiongroups):
1179 # Pretend that we only found one failure, by discarding the others.
1180 del self.falsifying_examples[:-1]
1182 # The engine found one or more failures, so we need to reproduce and
1183 # report them.
1185 errors_to_report = []
1187 report_lines = describe_targets(runner.best_observed_targets)
1188 if report_lines:
1189 report_lines.append("")
1191 explanations = explanatory_lines(self.explain_traces, self.settings)
1192 for falsifying_example in self.falsifying_examples:
1193 info = falsifying_example.extra_information
1194 fragments = []
1196 ran_example = runner.new_conjecture_data_for_buffer(
1197 falsifying_example.buffer
1198 )
1199 ran_example.slice_comments = falsifying_example.slice_comments
1200 tb = None
1201 origin = None
1202 assert info._expected_exception is not None
1203 try:
1204 with with_reporter(fragments.append):
1205 self.execute_once(
1206 ran_example,
1207 print_example=not self.is_find,
1208 is_final=True,
1209 expected_failure=(
1210 info._expected_exception,
1211 info._expected_traceback,
1212 ),
1213 )
1214 except (UnsatisfiedAssumption, StopTest) as e:
1215 err = Flaky(
1216 "Unreliable assumption: An example which satisfied "
1217 "assumptions on the first run now fails it.",
1218 )
1219 err.__cause__ = err.__context__ = e
1220 errors_to_report.append((fragments, err))
1221 except BaseException as e:
1222 # If we have anything for explain-mode, this is the time to report.
1223 fragments.extend(explanations[falsifying_example.interesting_origin])
1224 errors_to_report.append(
1225 (fragments, e.with_traceback(get_trimmed_traceback()))
1226 )
1227 tb = format_exception(e, get_trimmed_traceback(e))
1228 origin = InterestingOrigin.from_exception(e)
1229 else:
1230 # execute_once() will always raise either the expected error, or Flaky.
1231 raise NotImplementedError("This should be unreachable")
1232 finally:
1233 # log our observability line for the final failing example
1234 tc = {
1235 "type": "test_case",
1236 "run_start": self._start_timestamp,
1237 "property": self.test_identifier,
1238 "status": "passed" if sys.exc_info()[0] else "failed",
1239 "status_reason": str(origin or "unexpected/flaky pass"),
1240 "representation": self._string_repr,
1241 "arguments": ran_example._observability_args,
1242 "how_generated": "minimal failing example",
1243 "features": {
1244 **{
1245 f"target:{k}".strip(":"): v
1246 for k, v in ran_example.target_observations.items()
1247 },
1248 **ran_example.events,
1249 },
1250 "timing": self._timing_features,
1251 "coverage": None, # Not recorded when we're replaying the MFE
1252 "metadata": {
1253 "traceback": tb,
1254 "predicates": ran_example._observability_predicates,
1255 **_system_metadata(),
1256 },
1257 }
1258 deliver_json_blob(tc)
1259 # Whether or not replay actually raised the exception again, we want
1260 # to print the reproduce_failure decorator for the failing example.
1261 if self.settings.print_blob:
1262 fragments.append(
1263 "\nYou can reproduce this example by temporarily adding "
1264 "@reproduce_failure(%r, %r) as a decorator on your test case"
1265 % (__version__, encode_failure(falsifying_example.buffer))
1266 )
1267 # Mostly useful for ``find`` and ensuring that objects that
1268 # hold on to a reference to ``data`` know that it's now been
1269 # finished and they can't draw more data from it.
1270 ran_example.freeze() # pragma: no branch
1271 # No branch is possible here because we never have an active exception.
1272 _raise_to_user(errors_to_report, self.settings, report_lines)
1275def _raise_to_user(errors_to_report, settings, target_lines, trailer=""):
1276 """Helper function for attaching notes and grouping multiple errors."""
1277 failing_prefix = "Falsifying example: "
1278 ls = []
1279 for fragments, err in errors_to_report:
1280 for note in fragments:
1281 add_note(err, note)
1282 if note.startswith(failing_prefix):
1283 ls.append(note[len(failing_prefix) :])
1284 if current_pytest_item.value:
1285 current_pytest_item.value._hypothesis_failing_examples = ls
1287 if len(errors_to_report) == 1:
1288 _, the_error_hypothesis_found = errors_to_report[0]
1289 else:
1290 assert errors_to_report
1291 the_error_hypothesis_found = BaseExceptionGroup(
1292 f"Hypothesis found {len(errors_to_report)} distinct failures{trailer}.",
1293 [e for _, e in errors_to_report],
1294 )
1296 if settings.verbosity >= Verbosity.normal:
1297 for line in target_lines:
1298 add_note(the_error_hypothesis_found, line)
1299 raise the_error_hypothesis_found
1302@contextlib.contextmanager
1303def fake_subTest(self, msg=None, **__):
1304 """Monkeypatch for `unittest.TestCase.subTest` during `@given`.
1306 If we don't patch this out, each failing example is reported as a
1307 separate failing test by the unittest test runner, which is
1308 obviously incorrect. We therefore replace it for the duration with
1309 this version.
1310 """
1311 warnings.warn(
1312 "subTest per-example reporting interacts badly with Hypothesis "
1313 "trying hundreds of examples, so we disable it for the duration of "
1314 "any test that uses `@given`.",
1315 HypothesisWarning,
1316 stacklevel=2,
1317 )
1318 yield
1321@attr.s()
1322class HypothesisHandle:
1323 """This object is provided as the .hypothesis attribute on @given tests.
1325 Downstream users can reassign its attributes to insert custom logic into
1326 the execution of each case, for example by converting an async into a
1327 sync function.
1329 This must be an attribute of an attribute, because reassignment of a
1330 first-level attribute would not be visible to Hypothesis if the function
1331 had been decorated before the assignment.
1333 See https://github.com/HypothesisWorks/hypothesis/issues/1257 for more
1334 information.
1335 """
1337 inner_test = attr.ib()
1338 _get_fuzz_target = attr.ib()
1339 _given_kwargs = attr.ib()
1341 @property
1342 def fuzz_one_input(
1343 self,
1344 ) -> Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]]:
1345 """Run the test as a fuzz target, driven with the `buffer` of bytes.
1347 Returns None if buffer invalid for the strategy, canonical pruned
1348 bytes if the buffer was valid, and leaves raised exceptions alone.
1349 """
1350 # Note: most users, if they care about fuzzer performance, will access the
1351 # property and assign it to a local variable to move the attribute lookup
1352 # outside their fuzzing loop / before the fork point. We cache it anyway,
1353 # so that naive or unusual use-cases get the best possible performance too.
1354 try:
1355 return self.__cached_target # type: ignore
1356 except AttributeError:
1357 self.__cached_target = self._get_fuzz_target()
1358 return self.__cached_target
1361@overload
1362def given(
1363 _: EllipsisType, /
1364) -> Callable[
1365 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[[], None]
1366]: # pragma: no cover
1367 ...
1370@overload
1371def given(
1372 *_given_arguments: SearchStrategy[Any],
1373) -> Callable[
1374 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
1375]: # pragma: no cover
1376 ...
1379@overload
1380def given(
1381 **_given_kwargs: Union[SearchStrategy[Any], EllipsisType],
1382) -> Callable[
1383 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
1384]: # pragma: no cover
1385 ...
1388def given(
1389 *_given_arguments: Union[SearchStrategy[Any], EllipsisType],
1390 **_given_kwargs: Union[SearchStrategy[Any], EllipsisType],
1391) -> Callable[
1392 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
1393]:
1394 """A decorator for turning a test function that accepts arguments into a
1395 randomized test.
1397 This is the main entry point to Hypothesis.
1398 """
1400 def run_test_as_given(test):
1401 if inspect.isclass(test):
1402 # Provide a meaningful error to users, instead of exceptions from
1403 # internals that assume we're dealing with a function.
1404 raise InvalidArgument("@given cannot be applied to a class.")
1405 given_arguments = tuple(_given_arguments)
1406 given_kwargs = dict(_given_kwargs)
1408 original_sig = get_signature(test)
1409 if given_arguments == (Ellipsis,) and not given_kwargs:
1410 # user indicated that they want to infer all arguments
1411 given_kwargs = {
1412 p.name: Ellipsis
1413 for p in original_sig.parameters.values()
1414 if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
1415 }
1416 given_arguments = ()
1418 check_invalid = is_invalid_test(
1419 test, original_sig, given_arguments, given_kwargs
1420 )
1422 # If the argument check found problems, return a dummy test function
1423 # that will raise an error if it is actually called.
1424 if check_invalid is not None:
1425 return check_invalid
1427 # Because the argument check succeeded, we can convert @given's
1428 # positional arguments into keyword arguments for simplicity.
1429 if given_arguments:
1430 assert not given_kwargs
1431 posargs = [
1432 p.name
1433 for p in original_sig.parameters.values()
1434 if p.kind is p.POSITIONAL_OR_KEYWORD
1435 ]
1436 given_kwargs = dict(list(zip(posargs[::-1], given_arguments[::-1]))[::-1])
1437 # These have been converted, so delete them to prevent accidental use.
1438 del given_arguments
1440 new_signature = new_given_signature(original_sig, given_kwargs)
1442 # Use type information to convert "infer" arguments into appropriate strategies.
1443 if ... in given_kwargs.values():
1444 hints = get_type_hints(test)
1445 for name in [name for name, value in given_kwargs.items() if value is ...]:
1446 if name not in hints:
1447 return _invalid(
1448 f"passed {name}=... for {test.__name__}, but {name} has "
1449 "no type annotation",
1450 test=test,
1451 given_kwargs=given_kwargs,
1452 )
1453 given_kwargs[name] = st.from_type(hints[name])
1455 prev_self = Unset = object()
1457 @impersonate(test)
1458 @define_function_signature(test.__name__, test.__doc__, new_signature)
1459 def wrapped_test(*arguments, **kwargs):
1460 # Tell pytest to omit the body of this function from tracebacks
1461 __tracebackhide__ = True
1463 test = wrapped_test.hypothesis.inner_test
1465 if getattr(test, "is_hypothesis_test", False):
1466 raise InvalidArgument(
1467 f"You have applied @given to the test {test.__name__} more than "
1468 "once, which wraps the test several times and is extremely slow. "
1469 "A similar effect can be gained by combining the arguments "
1470 "of the two calls to given. For example, instead of "
1471 "@given(booleans()) @given(integers()), you could write "
1472 "@given(booleans(), integers())"
1473 )
1475 settings = wrapped_test._hypothesis_internal_use_settings
1477 random = get_random_for_wrapped_test(test, wrapped_test)
1479 arguments, kwargs, stuff = process_arguments_to_given(
1480 wrapped_test, arguments, kwargs, given_kwargs, new_signature.parameters
1481 )
1483 if (
1484 inspect.iscoroutinefunction(test)
1485 and get_executor(stuff.selfy) is default_executor
1486 ):
1487 # See https://github.com/HypothesisWorks/hypothesis/issues/3054
1488 # If our custom executor doesn't handle coroutines, or we return an
1489 # awaitable from a non-async-def function, we just rely on the
1490 # return_value health check. This catches most user errors though.
1491 raise InvalidArgument(
1492 "Hypothesis doesn't know how to run async test functions like "
1493 f"{test.__name__}. You'll need to write a custom executor, "
1494 "or use a library like pytest-asyncio or pytest-trio which can "
1495 "handle the translation for you.\n See https://hypothesis."
1496 "readthedocs.io/en/latest/details.html#custom-function-execution"
1497 )
1499 runner = stuff.selfy
1500 if isinstance(stuff.selfy, TestCase) and test.__name__ in dir(TestCase):
1501 msg = (
1502 f"You have applied @given to the method {test.__name__}, which is "
1503 "used by the unittest runner but is not itself a test."
1504 " This is not useful in any way."
1505 )
1506 fail_health_check(settings, msg, HealthCheck.not_a_test_method)
1507 if bad_django_TestCase(runner): # pragma: no cover
1508 # Covered by the Django tests, but not the pytest coverage task
1509 raise InvalidArgument(
1510 "You have applied @given to a method on "
1511 f"{type(runner).__qualname__}, but this "
1512 "class does not inherit from the supported versions in "
1513 "`hypothesis.extra.django`. Use the Hypothesis variants "
1514 "to ensure that each example is run in a separate "
1515 "database transaction."
1516 )
1517 if settings.database is not None:
1518 nonlocal prev_self
1519 # Check selfy really is self (not e.g. a mock) before we health-check
1520 cur_self = (
1521 stuff.selfy
1522 if getattr(type(stuff.selfy), test.__name__, None) is wrapped_test
1523 else None
1524 )
1525 if prev_self is Unset:
1526 prev_self = cur_self
1527 elif cur_self is not prev_self:
1528 msg = (
1529 f"The method {test.__qualname__} was called from multiple "
1530 "different executors. This may lead to flaky tests and "
1531 "nonreproducible errors when replaying from database."
1532 )
1533 fail_health_check(settings, msg, HealthCheck.differing_executors)
1535 state = StateForActualGivenExecution(
1536 stuff, test, settings, random, wrapped_test
1537 )
1539 reproduce_failure = wrapped_test._hypothesis_internal_use_reproduce_failure
1541 # If there was a @reproduce_failure decorator, use it to reproduce
1542 # the error (or complain that we couldn't). Either way, this will
1543 # always raise some kind of error.
1544 if reproduce_failure is not None:
1545 expected_version, failure = reproduce_failure
1546 if expected_version != __version__:
1547 raise InvalidArgument(
1548 "Attempting to reproduce a failure from a different "
1549 "version of Hypothesis. This failure is from %s, but "
1550 "you are currently running %r. Please change your "
1551 "Hypothesis version to a matching one."
1552 % (expected_version, __version__)
1553 )
1554 try:
1555 state.execute_once(
1556 ConjectureData.for_buffer(decode_failure(failure)),
1557 print_example=True,
1558 is_final=True,
1559 )
1560 raise DidNotReproduce(
1561 "Expected the test to raise an error, but it "
1562 "completed successfully."
1563 )
1564 except StopTest:
1565 raise DidNotReproduce(
1566 "The shape of the test data has changed in some way "
1567 "from where this blob was defined. Are you sure "
1568 "you're running the same test?"
1569 ) from None
1570 except UnsatisfiedAssumption:
1571 raise DidNotReproduce(
1572 "The test data failed to satisfy an assumption in the "
1573 "test. Have you added it since this blob was generated?"
1574 ) from None
1576 # There was no @reproduce_failure, so start by running any explicit
1577 # examples from @example decorators.
1578 errors = list(
1579 execute_explicit_examples(
1580 state, wrapped_test, arguments, kwargs, original_sig
1581 )
1582 )
1583 if errors:
1584 # If we're not going to report multiple bugs, we would have
1585 # stopped running explicit examples at the first failure.
1586 assert len(errors) == 1 or state.settings.report_multiple_bugs
1588 # If an explicit example raised a 'skip' exception, ensure it's never
1589 # wrapped up in an exception group. Because we break out of the loop
1590 # immediately on finding a skip, if present it's always the last error.
1591 if isinstance(errors[-1][1], skip_exceptions_to_reraise()):
1592 # Covered by `test_issue_3453_regression`, just in a subprocess.
1593 del errors[:-1] # pragma: no cover
1595 _raise_to_user(errors, state.settings, [], " in explicit examples")
1597 # If there were any explicit examples, they all ran successfully.
1598 # The next step is to use the Conjecture engine to run the test on
1599 # many different inputs.
1601 ran_explicit_examples = Phase.explicit in state.settings.phases and getattr(
1602 wrapped_test, "hypothesis_explicit_examples", ()
1603 )
1604 SKIP_BECAUSE_NO_EXAMPLES = unittest.SkipTest(
1605 "Hypothesis has been told to run no examples for this test."
1606 )
1607 if not (
1608 Phase.reuse in settings.phases or Phase.generate in settings.phases
1609 ):
1610 if not ran_explicit_examples:
1611 raise SKIP_BECAUSE_NO_EXAMPLES
1612 return
1614 try:
1615 if isinstance(runner, TestCase) and hasattr(runner, "subTest"):
1616 subTest = runner.subTest
1617 try:
1618 runner.subTest = types.MethodType(fake_subTest, runner)
1619 state.run_engine()
1620 finally:
1621 runner.subTest = subTest
1622 else:
1623 state.run_engine()
1624 except BaseException as e:
1625 # The exception caught here should either be an actual test
1626 # failure (or BaseExceptionGroup), or some kind of fatal error
1627 # that caused the engine to stop.
1629 generated_seed = wrapped_test._hypothesis_internal_use_generated_seed
1630 with local_settings(settings):
1631 if not (state.failed_normally or generated_seed is None):
1632 if running_under_pytest:
1633 report(
1634 f"You can add @seed({generated_seed}) to this test or "
1635 f"run pytest with --hypothesis-seed={generated_seed} "
1636 "to reproduce this failure."
1637 )
1638 else:
1639 report(
1640 f"You can add @seed({generated_seed}) to this test to "
1641 "reproduce this failure."
1642 )
1643 # The dance here is to avoid showing users long tracebacks
1644 # full of Hypothesis internals they don't care about.
1645 # We have to do this inline, to avoid adding another
1646 # internal stack frame just when we've removed the rest.
1647 #
1648 # Using a variable for our trimmed error ensures that the line
1649 # which will actually appear in tracebacks is as clear as
1650 # possible - "raise the_error_hypothesis_found".
1651 the_error_hypothesis_found = e.with_traceback(
1652 None
1653 if isinstance(e, BaseExceptionGroup)
1654 else get_trimmed_traceback()
1655 )
1656 raise the_error_hypothesis_found
1658 if not (ran_explicit_examples or state.ever_executed):
1659 raise SKIP_BECAUSE_NO_EXAMPLES
1661 def _get_fuzz_target() -> (
1662 Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]]
1663 ):
1664 # Because fuzzing interfaces are very performance-sensitive, we use a
1665 # somewhat more complicated structure here. `_get_fuzz_target()` is
1666 # called by the `HypothesisHandle.fuzz_one_input` property, allowing
1667 # us to defer our collection of the settings, random instance, and
1668 # reassignable `inner_test` (etc) until `fuzz_one_input` is accessed.
1669 #
1670 # We then share the performance cost of setting up `state` between
1671 # many invocations of the target. We explicitly force `deadline=None`
1672 # for performance reasons, saving ~40% the runtime of an empty test.
1673 test = wrapped_test.hypothesis.inner_test
1674 settings = Settings(
1675 parent=wrapped_test._hypothesis_internal_use_settings, deadline=None
1676 )
1677 random = get_random_for_wrapped_test(test, wrapped_test)
1678 _args, _kwargs, stuff = process_arguments_to_given(
1679 wrapped_test, (), {}, given_kwargs, new_signature.parameters
1680 )
1681 assert not _args
1682 assert not _kwargs
1683 state = StateForActualGivenExecution(
1684 stuff, test, settings, random, wrapped_test
1685 )
1686 digest = function_digest(test)
1687 # We track the minimal-so-far example for each distinct origin, so
1688 # that we track log-n instead of n examples for long runs. In particular
1689 # it means that we saturate for common errors in long runs instead of
1690 # storing huge volumes of low-value data.
1691 minimal_failures: dict = {}
1693 def fuzz_one_input(
1694 buffer: Union[bytes, bytearray, memoryview, BinaryIO]
1695 ) -> Optional[bytes]:
1696 # This inner part is all that the fuzzer will actually run,
1697 # so we keep it as small and as fast as possible.
1698 if isinstance(buffer, io.IOBase):
1699 buffer = buffer.read(BUFFER_SIZE)
1700 assert isinstance(buffer, (bytes, bytearray, memoryview))
1701 data = ConjectureData.for_buffer(buffer)
1702 try:
1703 state.execute_once(data)
1704 except (StopTest, UnsatisfiedAssumption):
1705 return None
1706 except BaseException:
1707 buffer = bytes(data.buffer)
1708 known = minimal_failures.get(data.interesting_origin)
1709 if settings.database is not None and (
1710 known is None or sort_key(buffer) <= sort_key(known)
1711 ):
1712 settings.database.save(digest, buffer)
1713 minimal_failures[data.interesting_origin] = buffer
1714 raise
1715 return bytes(data.buffer)
1717 fuzz_one_input.__doc__ = HypothesisHandle.fuzz_one_input.__doc__
1718 return fuzz_one_input
1720 # After having created the decorated test function, we need to copy
1721 # over some attributes to make the switch as seamless as possible.
1723 for attrib in dir(test):
1724 if not (attrib.startswith("_") or hasattr(wrapped_test, attrib)):
1725 setattr(wrapped_test, attrib, getattr(test, attrib))
1726 wrapped_test.is_hypothesis_test = True
1727 if hasattr(test, "_hypothesis_internal_settings_applied"):
1728 # Used to check if @settings is applied twice.
1729 wrapped_test._hypothesis_internal_settings_applied = True
1730 wrapped_test._hypothesis_internal_use_seed = getattr(
1731 test, "_hypothesis_internal_use_seed", None
1732 )
1733 wrapped_test._hypothesis_internal_use_settings = (
1734 getattr(test, "_hypothesis_internal_use_settings", None) or Settings.default
1735 )
1736 wrapped_test._hypothesis_internal_use_reproduce_failure = getattr(
1737 test, "_hypothesis_internal_use_reproduce_failure", None
1738 )
1739 wrapped_test.hypothesis = HypothesisHandle(test, _get_fuzz_target, given_kwargs)
1740 return wrapped_test
1742 return run_test_as_given
1745def find(
1746 specifier: SearchStrategy[Ex],
1747 condition: Callable[[Any], bool],
1748 *,
1749 settings: Optional[Settings] = None,
1750 random: Optional[Random] = None,
1751 database_key: Optional[bytes] = None,
1752) -> Ex:
1753 """Returns the minimal example from the given strategy ``specifier`` that
1754 matches the predicate function ``condition``."""
1755 if settings is None:
1756 settings = Settings(max_examples=2000)
1757 settings = Settings(
1758 settings, suppress_health_check=list(HealthCheck), report_multiple_bugs=False
1759 )
1761 if database_key is None and settings.database is not None:
1762 # Note: The database key is not guaranteed to be unique. If not, replaying
1763 # of database examples may fail to reproduce due to being replayed on the
1764 # wrong condition.
1765 database_key = function_digest(condition)
1767 if not isinstance(specifier, SearchStrategy):
1768 raise InvalidArgument(
1769 f"Expected SearchStrategy but got {specifier!r} of "
1770 f"type {type(specifier).__name__}"
1771 )
1772 specifier.validate()
1774 last: List[Ex] = []
1776 @settings
1777 @given(specifier)
1778 def test(v):
1779 if condition(v):
1780 last[:] = [v]
1781 raise Found
1783 if random is not None:
1784 test = seed(random.getrandbits(64))(test)
1786 # Aliasing as Any avoids mypy errors (attr-defined) when accessing and
1787 # setting custom attributes on the decorated function or class.
1788 _test: Any = test
1789 _test._hypothesis_internal_is_find = True
1790 _test._hypothesis_internal_database_key = database_key
1792 try:
1793 test()
1794 except Found:
1795 return last[0]
1797 raise NoSuchExample(get_pretty_function_description(condition))