Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/core.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
11"""This module provides the core primitives of Hypothesis, such as given."""
13import base64
14import contextlib
15import dataclasses
16import datetime
17import inspect
18import io
19import math
20import os
21import sys
22import threading
23import time
24import traceback
25import types
26import unittest
27import warnings
28import zlib
29from collections import defaultdict
30from collections.abc import Callable, Coroutine, Generator, Hashable, Iterable, Sequence
31from dataclasses import dataclass, field
32from functools import partial
33from inspect import Parameter
34from random import Random
35from threading import Lock
36from types import EllipsisType
37from typing import (
38 Any,
39 BinaryIO,
40 TypeVar,
41 overload,
42)
43from unittest import TestCase
45from hypothesis import strategies as st
46from hypothesis._settings import (
47 HealthCheck,
48 Phase,
49 Verbosity,
50 all_settings,
51 local_settings,
52 settings as Settings,
53)
54from hypothesis.control import BuildContext, currently_in_test_context
55from hypothesis.database import choices_from_bytes, choices_to_bytes
56from hypothesis.errors import (
57 BackendCannotProceed,
58 DeadlineExceeded,
59 DidNotReproduce,
60 FailedHealthCheck,
61 FlakyFailure,
62 FlakyReplay,
63 Found,
64 Frozen,
65 HypothesisException,
66 HypothesisWarning,
67 InvalidArgument,
68 NoSuchExample,
69 StopTest,
70 Unsatisfiable,
71 UnsatisfiedAssumption,
72)
73from hypothesis.internal import observability
74from hypothesis.internal.compat import (
75 PYPY,
76 BaseExceptionGroup,
77 add_note,
78 bad_django_TestCase,
79 get_type_hints,
80 int_from_bytes,
81)
82from hypothesis.internal.conjecture.choice import ChoiceT
83from hypothesis.internal.conjecture.data import ConjectureData, Status
84from hypothesis.internal.conjecture.engine import BUFFER_SIZE, ConjectureRunner
85from hypothesis.internal.conjecture.junkdrawer import (
86 ensure_free_stackframes,
87 gc_cumulative_time,
88)
89from hypothesis.internal.conjecture.providers import (
90 BytestringProvider,
91 PrimitiveProvider,
92)
93from hypothesis.internal.conjecture.shrinker import sort_key
94from hypothesis.internal.entropy import deterministic_PRNG
95from hypothesis.internal.escalation import (
96 InterestingOrigin,
97 current_pytest_item,
98 format_exception,
99 get_trimmed_traceback,
100 is_hypothesis_file,
101)
102from hypothesis.internal.healthcheck import fail_health_check
103from hypothesis.internal.observability import (
104 InfoObservation,
105 InfoObservationType,
106 deliver_observation,
107 make_testcase,
108 observability_enabled,
109)
110from hypothesis.internal.reflection import (
111 convert_positional_arguments,
112 define_function_signature,
113 function_digest,
114 get_pretty_function_description,
115 get_signature,
116 impersonate,
117 is_mock,
118 nicerepr,
119 proxies,
120 repr_call,
121)
122from hypothesis.internal.scrutineer import (
123 MONITORING_TOOL_ID,
124 Trace,
125 Tracer,
126 explanatory_lines,
127 tractable_coverage_report,
128)
129from hypothesis.internal.validation import check_type
130from hypothesis.reporting import (
131 current_verbosity,
132 report,
133 verbose_report,
134 with_reporter,
135)
136from hypothesis.statistics import describe_statistics, describe_targets, note_statistics
137from hypothesis.strategies._internal.misc import NOTHING
138from hypothesis.strategies._internal.strategies import (
139 Ex,
140 SearchStrategy,
141 check_strategy,
142)
143from hypothesis.utils.conventions import not_set
144from hypothesis.utils.threading import ThreadLocal
145from hypothesis.vendor.pretty import RepresentationPrinter
146from hypothesis.version import __version__
# Type variable used by the decorators in this module that accept a test
# function and hand back an object of the same type.
TestFunc = TypeVar("TestFunc", bound=Callable)


# Module-level switches.
# NOTE(review): presumably toggled by the pytest plugin - confirm against callers.
running_under_pytest = False
# Read by execute_explicit_examples() when deciding whether to keep running
# further explicit examples after one has failed.
pytest_shows_exceptiongroups = True
# When not None, get_random_for_wrapped_test() seeds its Random from this value
# (only consulted after an explicit @seed and settings.derandomize).
global_force_seed = None
# `threadlocal` stores "engine-global" constants, which are global relative to a
# ConjectureRunner instance (roughly speaking). Since only one conjecture runner
# instance can be active per thread, making engine constants thread-local prevents
# the ConjectureRunner instances of concurrent threads from treading on each other.
threadlocal = ThreadLocal(_hypothesis_global_random=lambda: None)
161@dataclass(slots=True, frozen=False)
162class Example:
163 args: Any
164 kwargs: Any
165 # Plus two optional arguments for .xfail()
166 raises: Any = field(default=None)
167 reason: Any = field(default=None)
170@dataclass(slots=True, frozen=True)
171class ReportableError:
172 fragments: list[str]
173 exception: BaseException
176# TODO_DOCS link to not-yet-existent patch-dumping docs
class example:
    """
    Add an explicit input to a Hypothesis test, which Hypothesis will always
    try before generating random inputs. This combines the randomized nature of
    Hypothesis generation with a traditional parametrized test.

    For example:

    .. code-block:: python

        @example("Hello world")
        @example("some string with special significance")
        @given(st.text())
        def test_strings(s):
            pass

    will call ``test_strings("Hello world")`` and
    ``test_strings("some string with special significance")`` before generating
    any random inputs. |@example| may be placed in any order relative to |@given|
    and |@settings|.

    Explicit inputs from |@example| are run in the |Phase.explicit| phase.
    Explicit inputs do not count towards |settings.max_examples|. Note that
    explicit inputs added by |@example| do not shrink. If an explicit input
    fails, Hypothesis will stop and report the failure without generating any
    random inputs.

    |@example| can also be used to easily reproduce a failure. For instance, if
    Hypothesis reports that ``f(n=[0, math.nan])`` fails, you can add
    ``@example(n=[0, math.nan])`` to your test to quickly reproduce that failure.

    Arguments to ``@example``
    -------------------------

    Arguments to |@example| have the same behavior and restrictions as arguments
    to |@given|. This means they may be either positional or keyword arguments
    (but not both in the same |@example|):

    .. code-block:: python

        @example(1, 2)
        @example(x=1, y=2)
        @given(st.integers(), st.integers())
        def test(x, y):
            pass

    Noting that while arguments to |@given| are strategies (like |st.integers|),
    arguments to |@example| are values instead (like ``1``).

    See the :ref:`given-arguments` section for full details.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # The two error conditions are mutually exclusive, so their order is free.
        if not args and not kwargs:
            raise InvalidArgument("An example must provide at least one argument")
        if args and kwargs:
            raise InvalidArgument(
                "Cannot mix positional and keyword arguments for examples"
            )

        self._this_example = Example(tuple(args), kwargs)
        # Becomes the shared list on the test if this is the first decorator applied.
        self.hypothesis_explicit_examples: list[Example] = []

    def __call__(self, test: TestFunc) -> TestFunc:
        # Reuse the list an earlier @example already attached; otherwise install ours.
        try:
            recorded = test.hypothesis_explicit_examples  # type: ignore
        except AttributeError:
            recorded = self.hypothesis_explicit_examples
            test.hypothesis_explicit_examples = recorded  # type: ignore
        recorded.append(self._this_example)
        return test

    def xfail(
        self,
        condition: bool = True,  # noqa: FBT002
        *,
        reason: str = "",
        raises: type[BaseException] | tuple[type[BaseException], ...] = BaseException,
    ) -> "example":
        """Mark this example as an expected failure, similarly to
        :obj:`pytest.mark.xfail(strict=True) <pytest.mark.xfail>`.

        Expected-failing examples allow you to check that your test does fail on
        some examples, and therefore build confidence that *passing* tests are
        because your code is working, not because the test is missing something.

        .. code-block:: python

            @example(...).xfail()
            @example(...).xfail(reason="Prices must be non-negative")
            @example(...).xfail(raises=(KeyError, ValueError))
            @example(...).xfail(sys.version_info[:2] >= (3, 12), reason="needs py 3.12")
            @example(...).xfail(condition=sys.platform != "linux", raises=OSError)
            def test(x):
                pass

        .. note::

            Expected-failing examples are handled separately from those generated
            by strategies, so you should usually ensure that there is no overlap.

            .. code-block:: python

                @example(x=1, y=0).xfail(raises=ZeroDivisionError)
                @given(x=st.just(1), y=st.integers())  # Missing `.filter(bool)`!
                def test_fraction(x, y):
                    # This test will try the explicit example and see it fail as
                    # expected, then go on to generate more examples from the
                    # strategy.  If we happen to generate y=0, the test will fail
                    # because only the explicit example is treated as xfailing.
                    x / y
        """
        check_type(bool, condition, "condition")
        check_type(str, reason, "reason")

        def is_exception_type(candidate: object) -> bool:
            return isinstance(candidate, type) and issubclass(candidate, BaseException)

        single_ok = is_exception_type(raises)
        # () -> expected to fail with no error, which is impossible
        tuple_ok = (
            isinstance(raises, tuple)
            and bool(raises)
            and all(is_exception_type(r) for r in raises)
        )
        if not (single_ok or tuple_ok):
            raise InvalidArgument(
                f"{raises=} must be an exception type or tuple of exception types"
            )

        if condition:
            self._this_example = dataclasses.replace(
                self._this_example, raises=raises, reason=reason
            )
        return self

    def via(self, whence: str, /) -> "example":
        """Attach a machine-readable label noting what the origin of this example
        was. |example.via| is completely optional and does not change runtime
        behavior.

        |example.via| is intended to support self-documenting behavior, as well as
        tooling which might add (or remove) |@example| decorators automatically.
        For example:

        .. code-block:: python

            # Annotating examples is optional and does not change runtime behavior
            @example(...)
            @example(...).via("regression test for issue #42")
            @example(...).via("discovered failure")
            def test(x):
                pass

        .. note::

            `HypoFuzz <https://hypofuzz.com/>`_ uses |example.via| to tag examples
            in the patch of its high-coverage set of explicit inputs, on
            `the patches page <https://hypofuzz.com/example-dashboard/#/patches>`_.
        """
        if isinstance(whence, str):
            # This is deliberately a no-op at runtime; the tools operate on source code.
            return self
        raise InvalidArgument(".via() must be passed a string")
def seed(seed: Hashable) -> Callable[[TestFunc], TestFunc]:
    """
    Seed the randomness for this test.

    ``seed`` may be any hashable object. No exact meaning for ``seed`` is provided
    other than that for a fixed seed value Hypothesis will produce the same
    examples (assuming that there are no other sources of nondeterminism, such
    as timing, hash randomization, or external state).

    For example, the following test function and |RuleBasedStateMachine| will
    each generate the same series of examples each time they are executed:

    .. code-block:: python

        @seed(1234)
        @given(st.integers())
        def test(n): ...

        @seed(6789)
        class MyMachine(RuleBasedStateMachine): ...

    If using pytest, you can alternatively pass ``--hypothesis-seed`` on the
    command line.

    Setting a seed overrides |settings.derandomize|, which is designed to enable
    deterministic CI tests rather than reproducing observed failures.

    Hypothesis will only print the seed which would reproduce a failure if a test
    fails in an unexpected way, for instance inside Hypothesis internals.
    """

    def accept(test):
        # Layer database=None over whatever settings the test already carries.
        inherited = getattr(test, "_hypothesis_internal_use_settings", None)
        test._hypothesis_internal_use_seed = seed
        test._hypothesis_internal_use_settings = Settings(inherited, database=None)
        return test

    return accept
380# TODO_DOCS: link to /explanation/choice-sequence
def reproduce_failure(version: str, blob: bytes) -> Callable[[TestFunc], TestFunc]:
    """
    Run the example corresponding to the binary ``blob`` in order to reproduce a
    failure. ``blob`` is a serialized version of the internal input representation
    of Hypothesis.

    A test decorated with |@reproduce_failure| always runs exactly one example,
    which is expected to cause a failure. If the provided ``blob`` does not
    cause a failure, Hypothesis will raise |DidNotReproduce|.

    Hypothesis will print an |@reproduce_failure| decorator if
    |settings.print_blob| is ``True`` (which is the default in CI).

    |@reproduce_failure| is intended to be temporarily added to your test suite in
    order to reproduce a failure. It is not intended to be a permanent addition to
    your test suite. Because of this, no compatibility guarantees are made across
    Hypothesis versions, and |@reproduce_failure| will error if used on a different
    Hypothesis version than it was created for.

    .. seealso::

        See also the :doc:`/tutorial/replaying-failures` tutorial.
    """
    # Only records the request on the test object here.
    payload = (version, blob)

    def accept(test):
        setattr(test, "_hypothesis_internal_use_reproduce_failure", payload)
        return test

    return accept
def reproduction_decorator(choices: Iterable[ChoiceT]) -> str:
    """Return the source text of a ``@reproduce_failure`` decorator for ``choices``.

    The text embeds the running Hypothesis version and the blob produced by
    ``encode_failure(choices)``.
    """
    encoded = encode_failure(choices)
    return f"@reproduce_failure({__version__!r}, {encoded!r})"
def encode_failure(choices: Iterable[ChoiceT]) -> bytes:
    """Serialize ``choices`` into a base64 blob.

    The payload is zlib-compressed only when that makes it strictly smaller.
    A leading marker byte records which form was used: ``b"\\1"`` for
    compressed, ``b"\\0"`` for raw.
    """
    raw = choices_to_bytes(choices)
    squeezed = zlib.compress(raw)
    payload = b"\1" + squeezed if len(squeezed) < len(raw) else b"\0" + raw
    return base64.b64encode(payload)
428def decode_failure(blob: bytes) -> Sequence[ChoiceT]:
429 try:
430 decoded = base64.b64decode(blob)
431 except Exception:
432 raise InvalidArgument(f"Invalid base64 encoded string: {blob!r}") from None
434 prefix = decoded[:1]
435 if prefix == b"\0":
436 decoded = decoded[1:]
437 elif prefix == b"\1":
438 try:
439 decoded = zlib.decompress(decoded[1:])
440 except zlib.error as err:
441 raise InvalidArgument(
442 f"Invalid zlib compression for blob {blob!r}"
443 ) from err
444 else:
445 raise InvalidArgument(
446 f"Could not decode blob {blob!r}: Invalid start byte {prefix!r}"
447 )
449 choices = choices_from_bytes(decoded)
450 if choices is None:
451 raise InvalidArgument(f"Invalid serialized choice sequence for blob {blob!r}")
453 return choices
def _invalid(message, *, exc=InvalidArgument, test, given_kwargs):
    """Build a stand-in for ``test`` that raises ``exc(message)`` when called.

    The stand-in impersonates ``test`` and carries the ``is_hypothesis_test``
    flag and a ``hypothesis`` handle.
    """

    @impersonate(test)
    def wrapped_test(*arguments, **kwargs):  # pragma: no cover # coverage limitation
        raise exc(message)

    handle = HypothesisHandle(
        inner_test=test,
        _get_fuzz_target=wrapped_test,
        _given_kwargs=given_kwargs,
    )
    wrapped_test.hypothesis = handle
    wrapped_test.is_hypothesis_test = True
    return wrapped_test
def is_invalid_test(test, original_sig, given_arguments, given_kwargs):
    """Check the arguments to ``@given`` for basic usage constraints.

    Most errors are not raised immediately; instead we return a dummy test
    function that will raise the appropriate error if it is actually called.
    When the user runs a subset of tests (e.g via ``pytest -k``), errors will
    only be reported for tests that actually ran.

    Returns ``None`` (implicitly) when no problem is detected.
    """
    invalid = partial(_invalid, test=test, given_kwargs=given_kwargs)

    if not given_arguments and not given_kwargs:
        return invalid("given must be called with at least one argument")

    params = list(original_sig.parameters.values())
    pos_params = [p for p in params if p.kind is p.POSITIONAL_OR_KEYWORD]
    kwonly_params = [p for p in params if p.kind is p.KEYWORD_ONLY]

    if given_arguments and params != pos_params:
        return invalid(
            "positional arguments to @given are not supported with varargs, "
            "varkeywords, positional-only, or keyword-only arguments"
        )

    if len(given_arguments) > len(pos_params):
        return invalid(
            f"Too many positional arguments for {test.__name__}() were passed to "
            f"@given - expected at most {len(pos_params)} "
            f"arguments, but got {len(given_arguments)} {given_arguments!r}"
        )

    if ... in given_arguments:
        return invalid(
            "... was passed as a positional argument to @given, but may only be "
            "passed as a keyword argument or as the sole argument of @given"
        )

    if given_arguments and given_kwargs:
        return invalid("cannot mix positional and keyword arguments to @given")

    # Keywords that do not name a parameter are only acceptable when the test
    # ends with a **kwargs parameter to absorb them.
    accepted_names = {p.name for p in pos_params + kwonly_params}
    extra_kwargs = [k for k in given_kwargs if k not in accepted_names]
    takes_var_keyword = bool(params) and params[-1].kind is params[-1].VAR_KEYWORD
    if extra_kwargs and not takes_var_keyword:
        arg = extra_kwargs[0]
        extra = (
            f". Did you mean @settings({arg}={given_kwargs[arg]!r})?"
            if arg in all_settings
            else ""
        )
        return invalid(
            f"{test.__name__}() got an unexpected keyword argument {arg!r}, "
            f"from `{arg}={given_kwargs[arg]!r}` in @given{extra}"
        )

    if any(p.default is not p.empty for p in params):
        return invalid("Cannot apply @given to a function with defaults.")

    # This case would raise Unsatisfiable *anyway*, but by detecting it here we can
    # provide a much more helpful error message for people e.g. using the Ghostwriter.
    empty_positional = [
        f"{s!r} (arg {idx})" for idx, s in enumerate(given_arguments) if s is NOTHING
    ]
    empty_keyword = [
        f"{name}={s!r}" for name, s in given_kwargs.items() if s is NOTHING
    ]
    empty = empty_positional + empty_keyword
    if empty:
        strats = "strategies" if len(empty) > 1 else "strategy"
        return invalid(
            f"Cannot generate examples from empty {strats}: " + ", ".join(empty),
            exc=Unsatisfiable,
        )
def execute_explicit_examples(state, wrapped_test, arguments, kwargs, original_sig):
    """Run the explicit ``@example`` inputs attached to ``wrapped_test``.

    This is a generator. The recorded examples are visited in reverse of the
    order in which they were stored. Each one is first checked to supply
    arguments for exactly the parameters that ``@given`` has strategies for;
    a mismatch raises ``InvalidArgument`` directly out of the generator.

    If ``Phase.explicit`` is enabled, the example is then executed through
    ``state.execute_once``. An example rejected via ``UnsatisfiedAssumption``
    is silently marked invalid. Any other exception is yielded as a
    ``ReportableError``, including the ``InvalidArgument`` and
    ``AssertionError`` raised here for misbehaving ``.xfail()`` examples.
    After a failure, later examples are only tried when multiple-bug
    reporting applies; otherwise the loop stops.

    Args:
        state: the ``StateForActualGivenExecution`` driving this test.
        wrapped_test: the test object carrying ``hypothesis_explicit_examples``
            and the ``hypothesis._given_kwargs`` mapping.
        arguments: positional arguments the test was called with; used for
            reprs in messages.
        kwargs: keyword arguments the test was called with; merged into each
            example's keyword arguments.
        original_sig: signature of the undecorated test function.

    Yields:
        ``ReportableError`` for each explicit example that failed.
    """
    assert isinstance(state, StateForActualGivenExecution)
    posargs = [
        p.name
        for p in original_sig.parameters.values()
        if p.kind is p.POSITIONAL_OR_KEYWORD
    ]

    for example in reversed(getattr(wrapped_test, "hypothesis_explicit_examples", ())):
        assert isinstance(example, Example)
        # All of this validation is to check that @example() got "the same" arguments
        # as @given, i.e. corresponding to the same parameters, even though they might
        # be any mixture of positional and keyword arguments.
        if example.args:
            assert not example.kwargs
            if any(
                p.kind is p.POSITIONAL_ONLY for p in original_sig.parameters.values()
            ):
                raise InvalidArgument(
                    "Cannot pass positional arguments to @example() when decorating "
                    "a test function which has positional-only parameters."
                )
            if len(example.args) > len(posargs):
                raise InvalidArgument(
                    "example has too many arguments for test. Expected at most "
                    f"{len(posargs)} but got {len(example.args)}"
                )
            # Positional example values bind to the *last* positional parameters.
            example_kwargs = dict(
                zip(posargs[-len(example.args) :], example.args, strict=True)
            )
        else:
            example_kwargs = dict(example.kwargs)
        given_kws = ", ".join(
            repr(k) for k in sorted(wrapped_test.hypothesis._given_kwargs)
        )
        example_kws = ", ".join(repr(k) for k in sorted(example_kwargs))
        if given_kws != example_kws:
            raise InvalidArgument(
                f"Inconsistent args: @given() got strategies for {given_kws}, "
                f"but @example() got arguments for {example_kws}"
            ) from None

        # This is certainly true because the example_kwargs exactly match the params
        # reserved by @given(), which are then removed from the function signature.
        assert set(example_kwargs).isdisjoint(kwargs)
        example_kwargs.update(kwargs)

        if Phase.explicit not in state.settings.phases:
            continue

        with local_settings(state.settings):
            fragments_reported = []
            empty_data = ConjectureData.for_choices([])
            try:
                execute_example = partial(
                    state.execute_once,
                    empty_data,
                    is_final=True,
                    print_example=True,
                    example_kwargs=example_kwargs,
                )
                with with_reporter(fragments_reported.append):
                    if example.raises is None:
                        execute_example()
                    else:
                        # @example(...).xfail(...)
                        # Join positional and keyword reprs in ONE pass so a
                        # ", " separator also appears between the two groups.
                        bits = ", ".join(
                            [
                                *(nicerepr(x) for x in arguments),
                                *(
                                    f"{k}={nicerepr(v)}"
                                    for k, v in example_kwargs.items()
                                ),
                            ]
                        )
                        try:
                            execute_example()
                        except failure_exceptions_to_catch() as err:
                            if not isinstance(err, example.raises):
                                raise
                            # Save a string form of this example; we'll warn if it's
                            # ever generated by the strategy (which can't be xfailed)
                            state.xfail_example_reprs.add(
                                repr_call(state.test, arguments, example_kwargs)
                            )
                        except example.raises as err:
                            # We'd usually check this as early as possible, but it's
                            # possible for failure_exceptions_to_catch() to grow when
                            # e.g. pytest is imported between import- and test-time.
                            raise InvalidArgument(
                                f"@example({bits}) raised an expected {err!r}, "
                                "but Hypothesis does not treat this as a test failure"
                            ) from err
                        else:
                            # Unexpectedly passing; always raise an error in this case.
                            reason = f" because {example.reason}" * bool(example.reason)
                            if example.raises is BaseException:
                                name = "exception"  # special-case no raises= arg
                            elif not isinstance(example.raises, tuple):
                                name = example.raises.__name__
                            elif len(example.raises) == 1:
                                name = example.raises[0].__name__
                            else:
                                name = (
                                    ", ".join(ex.__name__ for ex in example.raises[:-1])
                                    + f", or {example.raises[-1].__name__}"
                                )
                            vowel = name.upper()[0] in "AEIOU"
                            raise AssertionError(
                                f"Expected a{'n' * vowel} {name} from @example({bits})"
                                f"{reason}, but no exception was raised."
                            )
            except UnsatisfiedAssumption:
                # Odd though it seems, we deliberately support explicit examples that
                # are then rejected by a call to `assume()`.  As well as iterative
                # development, this is rather useful to replay Hypothesis' part of
                # a saved failure when other arguments are supplied by e.g. pytest.
                # See https://github.com/HypothesisWorks/hypothesis/issues/2125
                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
            except BaseException as err:
                # In order to support reporting of multiple failing examples, we yield
                # each of the (report text, error) pairs we find back to the top-level
                # runner.  This also ensures that user-facing stack traces have as few
                # frames of Hypothesis internals as possible.
                err = err.with_traceback(get_trimmed_traceback())

                # One user error - whether misunderstanding or typo - we've seen a few
                # times is to pass strategies to @example() where values are expected.
                # Checking is easy, and false-positives not much of a problem, so:
                if isinstance(err, failure_exceptions_to_catch()) and any(
                    isinstance(arg, SearchStrategy)
                    for arg in example.args + tuple(example.kwargs.values())
                ):
                    new = HypothesisWarning(
                        "The @example() decorator expects to be passed values, but "
                        "you passed strategies instead.  See https://hypothesis."
                        "readthedocs.io/en/latest/reference/api.html#hypothesis"
                        ".example for details."
                    )
                    new.__cause__ = err
                    err = new

                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
                yield ReportableError(fragments_reported, err)
                # Keep going to collect further failures only when they can all
                # be reported together; otherwise stop at the first one.
                if (
                    state.settings.report_multiple_bugs
                    and pytest_shows_exceptiongroups
                    and isinstance(err, failure_exceptions_to_catch())
                    and not isinstance(err, skip_exceptions_to_reraise())
                ):
                    continue
                break
            finally:
                if fragments_reported:
                    assert fragments_reported[0].startswith("Falsifying example")
                    fragments_reported[0] = fragments_reported[0].replace(
                        "Falsifying example", "Falsifying explicit example", 1
                    )

                empty_data.freeze()
                if observability_enabled():
                    tc = make_testcase(
                        run_start=state._start_timestamp,
                        property=state.test_identifier,
                        data=empty_data,
                        how_generated="explicit example",
                        representation=state._string_repr,
                        timing=state._timing_features,
                    )
                    deliver_observation(tc)

            # Reached only when the example did not fail (or was rejected by
            # an assumption): echo what was tried at verbose level.
            if fragments_reported:
                verbose_report(fragments_reported[0].replace("Falsifying", "Trying", 1))
                for f in fragments_reported[1:]:
                    verbose_report(f)
def get_random_for_wrapped_test(test, wrapped_test):
    """Pick the ``Random`` instance that should drive ``wrapped_test``.

    Sources are tried in order: an explicit seed stored on the wrapped test,
    a digest of ``test`` when ``settings.derandomize`` is set, the module-level
    ``global_force_seed``, and finally a fresh 128-bit seed drawn from the
    thread-local global random. Only in the last case is the seed recorded on
    ``wrapped_test._hypothesis_internal_use_generated_seed``; otherwise that
    attribute is reset to ``None``.
    """
    active_settings = wrapped_test._hypothesis_internal_use_settings
    wrapped_test._hypothesis_internal_use_generated_seed = None

    explicit_seed = wrapped_test._hypothesis_internal_use_seed
    if explicit_seed is not None:
        return Random(explicit_seed)

    if active_settings.derandomize:
        digest = function_digest(test)
        return Random(int_from_bytes(digest))

    if global_force_seed is not None:
        return Random(global_force_seed)

    if threadlocal._hypothesis_global_random is None:  # pragma: no cover
        threadlocal._hypothesis_global_random = Random()
    fresh_seed = threadlocal._hypothesis_global_random.getrandbits(128)
    wrapped_test._hypothesis_internal_use_generated_seed = fresh_seed
    return Random(fresh_seed)
728@dataclass(slots=True, frozen=False)
729class Stuff:
730 selfy: Any
731 args: tuple
732 kwargs: dict
733 given_kwargs: dict
def process_arguments_to_given(
    wrapped_test: Any,
    arguments: Sequence[object],
    kwargs: dict[str, object],
    given_kwargs: dict[str, SearchStrategy],
    params: dict[str, Parameter],
) -> tuple[Sequence[object], dict[str, object], Stuff]:
    """Normalize a call to the wrapped test and validate the ``@given`` strategies.

    Returns the normalized positional arguments (as a tuple), the keyword
    arguments, and a ``Stuff`` record bundling them with the detected bound
    object and ``given_kwargs``.
    """
    arguments, kwargs = convert_positional_arguments(wrapped_test, arguments, kwargs)

    # If the test function is a method of some kind, the bound object
    # will be the first named argument if there are any, otherwise the
    # first vararg (if any).
    first_named = next(
        (p.name for p in params.values() if p.kind is p.POSITIONAL_OR_KEYWORD),
        None,
    )
    if first_named is not None:
        selfy = kwargs.get(first_named)
    elif arguments:
        selfy = arguments[0]
    else:
        selfy = None

    # Ensure that we don't mistake mocks for self here.
    # This can cause the mock to be used as the test runner.
    if is_mock(selfy):
        selfy = None

    arguments = tuple(arguments)

    with ensure_free_stackframes():
        for name, strategy in given_kwargs.items():
            check_strategy(strategy, name=name)
            strategy.validate()

    bundle = Stuff(
        selfy=selfy, args=arguments, kwargs=kwargs, given_kwargs=given_kwargs
    )
    return arguments, kwargs, bundle
def skip_exceptions_to_reraise():
    """Return a tuple of exceptions meaning 'skip this test', to re-raise.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request adding
    it to this function and to tests/cover/test_lazy_import.py
    """
    # We use this sys.modules trick to avoid importing libraries -
    # you can't be an instance of a type from an unimported module!
    # This is fast enough that we don't need to cache the result,
    # and more importantly it avoids possible side-effects :-)
    known_skips = (
        ("unittest", "SkipTest"),
        ("_pytest.outcomes", "Skipped"),
    )
    # This is a set in case any library simply re-exports another's Skip exception
    found = {
        getattr(sys.modules[module_name], attr)
        for module_name, attr in known_skips
        if module_name in sys.modules
    }
    return tuple(sorted(found, key=str))
def failure_exceptions_to_catch() -> tuple[type[BaseException], ...]:
    """Return a tuple of exceptions meaning 'this test has failed', to catch.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request.
    """
    # While SystemExit and GeneratorExit are instances of BaseException, we also
    # expect them to be deterministic - unlike KeyboardInterrupt - and so we treat
    # them as standard exceptions, check for flakiness, etc.
    # See https://github.com/HypothesisWorks/hypothesis/issues/2223 for details.
    always_caught = (Exception, SystemExit, GeneratorExit)
    if "_pytest.outcomes" not in sys.modules:
        return always_caught
    return (*always_caught, sys.modules["_pytest.outcomes"].Failed)
def new_given_signature(original_sig, given_kwargs):
    """Make an updated signature for the wrapped test.

    Parameters that ``@given`` supplies (matched by name, and only when they
    are positional-or-keyword or keyword-only) are removed. The return
    annotation is set to ``None``.
    """

    def is_kept(param):
        # De Morgan of "named in given_kwargs AND of a fillable kind".
        fillable_kinds = (param.POSITIONAL_OR_KEYWORD, param.KEYWORD_ONLY)
        return param.name not in given_kwargs or param.kind not in fillable_kinds

    remaining = list(filter(is_kept, original_sig.parameters.values()))
    return original_sig.replace(parameters=remaining, return_annotation=None)
def default_executor(data, function):
    """Call ``function(data)`` directly and return its result.

    ``get_executor`` falls back to this when the runner defines no hooks.
    """
    outcome = function(data)
    return outcome
def get_executor(runner):
    """Return an ``executor(data, function)`` callable suited to ``runner``.

    * A runner with ``execute_example`` has it called with a zero-argument
      callable that runs ``function(data)``.
    * A runner with ``setup_example`` and/or ``teardown_example`` has
      ``function(data)`` run between them. Teardown receives whatever setup
      returned, or ``None`` if setup did not complete.
    * Otherwise ``default_executor`` is returned.
    """
    missing = object()
    hook = getattr(runner, "execute_example", missing)
    if hook is not missing:
        return lambda data, function: hook(partial(function, data))

    has_setup = hasattr(runner, "setup_example")
    if not (has_setup or hasattr(runner, "teardown_example")):
        return default_executor

    # A hook that is absent (or falsy) is replaced by a do-nothing stand-in.
    before = getattr(runner, "setup_example", None) or (lambda: None)
    after = getattr(runner, "teardown_example", None) or (lambda ex: None)

    def execute(data, function):
        token = None
        try:
            token = before()
            return function(data)
        finally:
            after(token)

    return execute
852# This function is a crude solution, a better way of resolving it would probably
853# be to rewrite a bunch of exception handlers to use except*.
854T = TypeVar("T", bound=BaseException)
857def _flatten_group(excgroup: BaseExceptionGroup[T]) -> list[T]:
858 found_exceptions: list[T] = []
859 for exc in excgroup.exceptions:
860 if isinstance(exc, BaseExceptionGroup):
861 found_exceptions.extend(_flatten_group(exc))
862 else:
863 found_exceptions.append(exc)
864 return found_exceptions
@contextlib.contextmanager
def unwrap_markers_from_group() -> Generator[None, None, None]:
    """Context manager that unwraps exception groups holding only marker exceptions.

    If the body raises a ``BaseExceptionGroup``:

    * a group containing only ``Frozen`` is re-raised unchanged;
    * a group with any member that is neither ``Frozen``, ``StopTest`` nor a
      ``HypothesisException`` is re-raised unchanged;
    * otherwise the ``Frozen`` members are dropped and a single exception is
      raised in place of the group: the sole remaining leaf, else the first
      non-``StopTest`` leaf, else the ``StopTest`` with the lowest
      ``testcounter``.

    Exceptions that are not groups pass through untouched.
    """
    try:
        yield
    except BaseExceptionGroup as excgroup:
        # split() returns (matching subgroup, remaining subgroup); either is
        # None when empty.
        _frozen_exceptions, non_frozen_exceptions = excgroup.split(Frozen)

        # group only contains Frozen, reraise the group
        # it doesn't matter what we raise, since any exceptions get disregarded
        # and reraised as StopTest if data got frozen.
        if non_frozen_exceptions is None:
            raise
        # in all other cases they are discarded

        # Can RewindRecursive end up in this group?
        _, user_exceptions = non_frozen_exceptions.split(
            lambda e: isinstance(e, (StopTest, HypothesisException))
        )

        # this might contain marker exceptions, or internal errors, but not frozen.
        if user_exceptions is not None:
            raise

        # single marker exception - reraise it
        flattened_non_frozen_exceptions: list[BaseException] = _flatten_group(
            non_frozen_exceptions
        )
        if len(flattened_non_frozen_exceptions) == 1:
            e = flattened_non_frozen_exceptions[0]
            # preserve the cause of the original exception to not hinder debugging
            # note that __context__ is still lost though
            raise e from e.__cause__

        # multiple marker exceptions. If we re-raise the whole group we break
        # a bunch of logic so ....?
        stoptests, non_stoptests = non_frozen_exceptions.split(StopTest)

        # TODO: stoptest+hypothesisexception ...? Is it possible? If so, what do?

        if non_stoptests:
            # TODO: multiple marker exceptions is easy to produce, but the logic in the
            # engine does not handle it... so we just reraise the first one for now.
            e = _flatten_group(non_stoptests)[0]
            raise e from e.__cause__
        # Nothing but StopTest is left, so the StopTest subgroup must exist.
        assert stoptests is not None

        # multiple stoptests: raising the one with the lowest testcounter
        raise min(_flatten_group(stoptests), key=lambda s_e: s_e.testcounter)
917class StateForActualGivenExecution:
918 def __init__(
919 self, stuff, test, settings, random, wrapped_test, *, thread_overlap=None
920 ):
921 self.stuff = stuff
922 self.test = test
923 self.settings = settings
924 self.random = random
925 self.wrapped_test = wrapped_test
926 self.thread_overlap = {} if thread_overlap is None else thread_overlap
928 self.test_runner = get_executor(stuff.selfy)
929 self.print_given_args = getattr(
930 wrapped_test, "_hypothesis_internal_print_given_args", True
931 )
933 self.last_exception = None
934 self.falsifying_examples = ()
935 self.ever_executed = False
936 self.xfail_example_reprs = set()
937 self.files_to_propagate = set()
938 self.failed_normally = False
939 self.failed_due_to_deadline = False
941 self.explain_traces = defaultdict(set)
942 self._start_timestamp = time.time()
943 self._string_repr = ""
944 self._timing_features = {}
946 @property
947 def test_identifier(self) -> str:
948 return getattr(
949 current_pytest_item.value, "nodeid", None
950 ) or get_pretty_function_description(self.wrapped_test)
952 def _should_trace(self):
953 # NOTE: we explicitly support monkeypatching this. Keep the namespace
954 # access intact.
955 _trace_obs = (
956 observability_enabled() and observability.OBSERVABILITY_COLLECT_COVERAGE
957 )
958 _trace_failure = (
959 self.failed_normally
960 and not self.failed_due_to_deadline
961 and {Phase.shrink, Phase.explain}.issubset(self.settings.phases)
962 )
963 return _trace_obs or _trace_failure
    def execute_once(
        self,
        data,
        *,
        print_example=False,
        is_final=False,
        expected_failure=None,
        example_kwargs=None,
    ):
        """Run the test function once, using ``data`` as input.

        If the test raises an exception, it will propagate through to the
        caller of this method. Depending on its type, this could represent
        an ordinary test failure, or a fatal error, or a control exception.

        If this method returns normally, the test might have passed, or
        it might have placed ``data`` in an unsuccessful state and then
        swallowed the corresponding control exception.

        Arguments:

        * ``data`` - the test-case data object; it is handed to
          ``BuildContext`` and to the executor hook ``self.test_runner``.
        * ``print_example`` - if true, the call is always reported, prefixed
          with "Falsifying example:" instead of "Trying example:".
        * ``is_final`` - forwarded to ``BuildContext``. When false, the
          deadline is relaxed to ``(deadline // 4) * 5`` before comparison.
        * ``expected_failure`` - an ``(exception, traceback)`` pair from an
          earlier failing run. If given and this run does *not* raise, a
          diagnostic is reported and ``FlakyFailure`` is raised.
        * ``example_kwargs`` - if not None, used as the generated keyword
          arguments instead of preparing them from the ``@given`` strategies.

        Returns whatever ``self.test_runner(data, run)`` returned (``None`` if
        the provider's per-test-case context manager suppressed an exception
        before that assignment happened).
        """

        self.ever_executed = True

        self._string_repr = ""
        text_repr = None
        # Pick a wrapper around the user's test: the cheap one when nothing
        # needs timings, the timing one when a deadline is set or
        # observability is enabled.
        if self.settings.deadline is None and not observability_enabled():

            @proxies(self.test)
            def test(*args, **kwargs):
                with unwrap_markers_from_group(), ensure_free_stackframes():
                    return self.test(*args, **kwargs)

        else:

            @proxies(self.test)
            def test(*args, **kwargs):
                # Snapshot the cumulative draw / stateful / GC times so that
                # time spent there during the call can be subtracted below.
                arg_drawtime = math.fsum(data.draw_times.values())
                arg_stateful = math.fsum(data._stateful_run_times.values())
                arg_gctime = gc_cumulative_time()
                with unwrap_markers_from_group(), ensure_free_stackframes():
                    start = time.perf_counter()
                    try:
                        result = self.test(*args, **kwargs)
                    finally:
                        # Timings are recorded whether or not the test raised.
                        finish = time.perf_counter()
                        in_drawtime = math.fsum(data.draw_times.values()) - arg_drawtime
                        in_stateful = (
                            math.fsum(data._stateful_run_times.values()) - arg_stateful
                        )
                        in_gctime = gc_cumulative_time() - arg_gctime
                        runtime = finish - start - in_drawtime - in_stateful - in_gctime
                        self._timing_features = {
                            "execute:test": runtime,
                            "overall:gc": in_gctime,
                            **data.draw_times,
                            **data._stateful_run_times,
                        }

                # Only reached when the test itself did not raise.
                if (
                    (current_deadline := self.settings.deadline) is not None
                    # we disable the deadline check under concurrent threads, since
                    # cpython may switch away from a thread for arbitrarily long.
                    and not self.thread_overlap.get(threading.get_ident(), False)
                ):
                    if not is_final:
                        # Non-final runs get 25% slack on the deadline.
                        current_deadline = (current_deadline // 4) * 5
                    if runtime >= current_deadline.total_seconds():
                        raise DeadlineExceeded(
                            datetime.timedelta(seconds=runtime), self.settings.deadline
                        )
                return result

        def run(data: ConjectureData) -> None:
            # NOTE: ``context`` below is the BuildContext bound by the ``with``
            # statement further down; ``run`` is only called from inside it.
            # Set up dynamic context needed by a single test run.
            if self.stuff.selfy is not None:
                data.hypothesis_runner = self.stuff.selfy
            # Generate all arguments to the test function.
            args = self.stuff.args
            kwargs = dict(self.stuff.kwargs)
            if example_kwargs is None:
                kw, argslices = context.prep_args_kwargs_from_strategies(
                    self.stuff.given_kwargs
                )
            else:
                kw = example_kwargs
                argslices = {}
            kwargs.update(kw)
            if expected_failure is not None:
                # Remember how the call looked, for the FlakyFailure message
                # raised at the end of execute_once.
                nonlocal text_repr
                text_repr = repr_call(test, args, kwargs)

            if print_example or current_verbosity() >= Verbosity.verbose:
                printer = RepresentationPrinter(context=context)
                if print_example:
                    printer.text("Falsifying example:")
                else:
                    printer.text("Trying example:")

                if self.print_given_args:
                    printer.text(" ")
                    printer.repr_call(
                        test.__name__,
                        args,
                        kwargs,
                        force_split=True,
                        arg_slices=argslices,
                        leading_comment=(
                            "# " + context.data.slice_comments[(0, 0)]
                            if (0, 0) in context.data.slice_comments
                            else None
                        ),
                        avoid_realization=data.provider.avoid_realization,
                    )
                report(printer.getvalue())

            if observability_enabled():
                # Same rendering as above, but captured for the observability
                # record rather than reported to the user.
                printer = RepresentationPrinter(context=context)
                printer.repr_call(
                    test.__name__,
                    args,
                    kwargs,
                    force_split=True,
                    arg_slices=argslices,
                    leading_comment=(
                        "# " + context.data.slice_comments[(0, 0)]
                        if (0, 0) in context.data.slice_comments
                        else None
                    ),
                    avoid_realization=data.provider.avoid_realization,
                )
                self._string_repr = printer.getvalue()

            try:
                return test(*args, **kwargs)
            except TypeError as e:
                # If we sampled from a sequence of strategies, AND failed with a
                # TypeError, *AND that exception mentions SearchStrategy*, add a note:
                if (
                    "SearchStrategy" in str(e)
                    and data._sampled_from_all_strategies_elements_message is not None
                ):
                    msg, format_arg = data._sampled_from_all_strategies_elements_message
                    add_note(e, msg.format(format_arg))
                raise
            finally:
                # A stateful representation, when present, replaces the call repr.
                if data._stateful_repr_parts is not None:
                    self._string_repr = "\n".join(data._stateful_repr_parts)

                if observability_enabled():
                    # Append every interactively drawn value ("generate:Draw N")
                    # to the recorded representation.
                    printer = RepresentationPrinter(context=context)
                    for name, value in data._observability_args.items():
                        if name.startswith("generate:Draw "):
                            try:
                                value = data.provider.realize(value)
                            except BackendCannotProceed:  # pragma: no cover
                                value = "<backend failed to realize symbolic>"
                            printer.text(f"\n{name.removeprefix('generate:')}: ")
                            printer.pretty(value)

                    self._string_repr += printer.getvalue()

        # self.test_runner can include the execute_example method, or setup/teardown
        # _example, so it's important to get the PRNG and build context in place first.
        with (
            local_settings(self.settings),
            deterministic_PRNG(),
            BuildContext(
                data, is_final=is_final, wrapped_test=self.wrapped_test
            ) as context,
        ):
            # providers may throw in per_case_context_fn, and we'd like
            # `result` to still be set in these cases.
            result = None
            with data.provider.per_test_case_context_manager():
                # Run the test function once, via the executor hook.
                # In most cases this will delegate straight to `run(data)`.
                result = self.test_runner(data, run)

        # If a failure was expected, it should have been raised already, so
        # instead raise an appropriate diagnostic error.
        if expected_failure is not None:
            # NOTE: the local name ``traceback`` shadows the ``traceback``
            # module for the rest of this method.
            exception, traceback = expected_failure
            # A deadline failure that did not recur is reported as a timing
            # problem, provided a non-zero "execute:*" timing was recorded.
            if isinstance(exception, DeadlineExceeded) and (
                runtime_secs := math.fsum(
                    v
                    for k, v in self._timing_features.items()
                    if k.startswith("execute:")
                )
            ):
                report(
                    "Unreliable test timings! On an initial run, this "
                    f"test took {exception.runtime.total_seconds() * 1000:.2f}ms, "
                    "which exceeded the deadline of "
                    f"{self.settings.deadline.total_seconds() * 1000:.2f}ms, but "
                    f"on a subsequent run it took {runtime_secs * 1000:.2f} ms, "
                    "which did not. If you expect this sort of "
                    "variability in your test timings, consider turning "
                    "deadlines off for this test by setting deadline=None."
                )
            else:
                report("Failed to reproduce exception. Expected: \n" + traceback)
            raise FlakyFailure(
                f"Hypothesis {text_repr} produces unreliable results: "
                "Falsified on the first call but did not on a subsequent one",
                [exception],
            )
        return result
1172 def _flaky_replay_to_failure(
1173 self, err: FlakyReplay, context: BaseException
1174 ) -> FlakyFailure:
1175 # Note that in the mark_interesting case, _context_ itself
1176 # is part of err._interesting_examples - but it's not in
1177 # _runner.interesting_examples - this is fine, as the context
1178 # (i.e., immediate exception) is appended.
1179 interesting_examples = [
1180 self._runner.interesting_examples[origin]
1181 for origin in err._interesting_origins
1182 if origin in self._runner.interesting_examples
1183 ]
1184 exceptions = [result.expected_exception for result in interesting_examples]
1185 exceptions.append(context) # the immediate exception
1186 return FlakyFailure(err.reason, exceptions)
    def _execute_once_for_engine(self, data: ConjectureData) -> None:
        """Wrapper around ``execute_once`` that intercepts test failure
        exceptions and single-test control exceptions, and turns them into
        appropriate method calls to `data` instead.

        This allows the engine to assume that any exception other than
        ``StopTest`` must be a fatal error, and should stop the entire engine.

        Outcomes, in the order the handlers are tried:

        * ``UnsatisfiedAssumption`` -> ``data.mark_invalid``;
        * ``StopTest`` / ``BackendCannotProceed`` -> re-raised for the engine;
        * ``FailedHealthCheck`` and skip exceptions -> re-raised as fatal;
        * any other caught failure -> re-raised if it originated in Hypothesis
          code, turned into ``StopTest`` if ``data`` is already frozen, and
          otherwise recorded via ``data.mark_interesting``.

        The ``finally`` clause always realizes ``data.events``, emits the
        observability record when observability is enabled, and resets
        ``self._timing_features``.
        """
        trace: Trace = set()
        try:
            with Tracer(should_trace=self._should_trace()) as tracer:
                try:
                    result = self.execute_once(data)
                    if (
                        data.status == Status.VALID and tracer.branches
                    ):  # pragma: no cover
                        # This is in fact covered by our *non-coverage* tests, but due
                        # to the settrace() contention *not* by our coverage tests.
                        self.explain_traces[None].add(frozenset(tracer.branches))
                finally:
                    # Keep the branches even when the test raised, so the
                    # failure handler and the observability report can use them.
                    trace = tracer.branches
            # Only reached when execute_once returned normally.
            if result is not None:
                fail_health_check(
                    self.settings,
                    "Tests run under @given should return None, but "
                    f"{self.test.__name__} returned {result!r} instead.",
                    HealthCheck.return_value,
                )
        except UnsatisfiedAssumption as e:
            # An "assume" check failed, so instead we inform the engine that
            # this test run was invalid.
            try:
                data.mark_invalid(e.reason)
            except FlakyReplay as err:
                # This was unexpected, meaning that the assume was flaky.
                # Report it as such.
                raise self._flaky_replay_to_failure(err, e) from None
        except (StopTest, BackendCannotProceed):
            # The engine knows how to handle this control exception, so it's
            # OK to re-raise it.
            raise
        except (
            FailedHealthCheck,
            *skip_exceptions_to_reraise(),
        ):
            # These are fatal errors or control exceptions that should stop the
            # engine, so we re-raise them.
            raise
        except failure_exceptions_to_catch() as e:
            # If an unhandled (i.e., non-Hypothesis) error was raised by
            # Hypothesis-internal code, re-raise it as a fatal error instead
            # of treating it as a test failure.
            if isinstance(e, BaseExceptionGroup) and len(e.exceptions) == 1:
                # When a naked exception is implicitly wrapped in an ExceptionGroup
                # due to a re-raising "except*", the ExceptionGroup is constructed in
                # the caller's stack frame (see #4183). This workaround is specifically
                # for implicit wrapping of naked exceptions by "except*", since explicit
                # raising of ExceptionGroup gets the proper traceback in the first place
                # - there's no need to handle hierarchical groups here, at least if no
                # such implicit wrapping happens inside hypothesis code (we only care
                # about the hypothesis-or-not distinction).
                #
                # 01-25-2025: this was patched to give the correct
                # stacktrace in cpython https://github.com/python/cpython/issues/128799.
                # can remove once python3.11 is EOL.
                tb = e.exceptions[0].__traceback__ or e.__traceback__
            else:
                tb = e.__traceback__
            # File name of the innermost frame, i.e. where the error was raised.
            filepath = traceback.extract_tb(tb)[-1][0]
            if (
                is_hypothesis_file(filepath)
                and not isinstance(e, HypothesisException)
                # We expect backend authors to use the provider_conformance test
                # to test their backends. If an error occurs there, it is probably
                # from their backend, and we would like to treat it as a standard
                # error, not a hypothesis-internal error.
                and not filepath.endswith(
                    f"internal{os.sep}conjecture{os.sep}provider_conformance.py"
                )
            ):
                raise

            if data.frozen:
                # This can happen if an error occurred in a finally
                # block somewhere, suppressing our original StopTest.
                # We raise a new one here to resume normal operation.
                raise StopTest(data.testcounter) from e
            else:
                # The test failed by raising an exception, so we inform the
                # engine that this test run was interesting. This is the normal
                # path for test runs that fail.
                tb = get_trimmed_traceback()
                data.expected_traceback = format_exception(e, tb)
                data.expected_exception = e
                assert data.expected_traceback is not None  # for mypy
                verbose_report(data.expected_traceback)

                self.failed_normally = True

                interesting_origin = InterestingOrigin.from_exception(e)
                if trace:  # pragma: no cover
                    # Trace collection is explicitly disabled under coverage.
                    self.explain_traces[interesting_origin].add(frozenset(trace))
                if interesting_origin.exc_type == DeadlineExceeded:
                    # Deadline failures turn off failure-driven tracing (see
                    # _should_trace) and discard the traces collected so far.
                    self.failed_due_to_deadline = True
                    self.explain_traces.clear()
                try:
                    data.mark_interesting(interesting_origin)
                except FlakyReplay as err:
                    raise self._flaky_replay_to_failure(err, e) from None

        finally:
            # Conditional here so we can save some time constructing the payload; in
            # other cases (without coverage) it's cheap enough to do that regardless.
            #
            # Note that we have to unconditionally realize data.events, because
            # the statistics reported by the pytest plugin use a different flow
            # than observability, but still access symbolic events.

            try:
                data.events = data.provider.realize(data.events)
            except BackendCannotProceed:
                data.events = {}

            if observability_enabled():
                # ``runner`` is bound by the walrus on both branches; it is
                # None/falsy when run_engine has not set ``self._runner``.
                if runner := getattr(self, "_runner", None):
                    phase = runner._current_phase
                else:  # pragma: no cover # in case of messing with internals
                    if self.failed_normally or self.failed_due_to_deadline:
                        phase = "shrink"
                    else:
                        phase = "unknown"
                # str * bool: the suffix is kept only for a non-default backend
                # that the runner has not switched away from.
                backend_desc = f", using backend={self.settings.backend!r}" * (
                    self.settings.backend != "hypothesis"
                    and not getattr(runner, "_switch_to_hypothesis_provider", False)
                )
                try:
                    data._observability_args = data.provider.realize(
                        data._observability_args
                    )
                except BackendCannotProceed:
                    data._observability_args = {}

                try:
                    self._string_repr = data.provider.realize(self._string_repr)
                except BackendCannotProceed:
                    self._string_repr = "<backend failed to realize symbolic arguments>"

                data.freeze()
                tc = make_testcase(
                    run_start=self._start_timestamp,
                    property=self.test_identifier,
                    data=data,
                    how_generated=f"during {phase} phase{backend_desc}",
                    representation=self._string_repr,
                    arguments=data._observability_args,
                    timing=self._timing_features,
                    coverage=tractable_coverage_report(trace) or None,
                    phase=phase,
                    backend_metadata=data.provider.observe_test_case(),
                )
                deliver_observation(tc)

                for msg in data.provider.observe_information_messages(
                    lifetime="test_case"
                ):
                    self._deliver_information_message(**msg)
            # Timings belong to a single test case; clear them for the next one.
            self._timing_features = {}
1357 def _deliver_information_message(
1358 self, *, type: InfoObservationType, title: str, content: str | dict
1359 ) -> None:
1360 deliver_observation(
1361 InfoObservation(
1362 type=type,
1363 run_start=self._start_timestamp,
1364 property=self.test_identifier,
1365 title=title,
1366 content=content,
1367 )
1368 )
    def run_engine(self):
        """Run the test function many times, on database input and generated
        input, using the Conjecture engine.

        After the engine finishes this method:

        * returns quietly if nothing was executed or no failure was found;
        * raises ``Unsatisfiable`` if no example was valid;
        * otherwise replays each falsifying example with ``execute_once`` and
          hands the collected errors to ``_raise_to_user``, which raises.
        """
        # Tell pytest to omit the body of this function from tracebacks
        __tracebackhide__ = True
        # Prefer an explicitly attached database key; otherwise derive one from
        # the test function, unless a global seed is being forced.
        try:
            database_key = self.wrapped_test._hypothesis_internal_database_key
        except AttributeError:
            if global_force_seed is None:
                database_key = function_digest(self.test)
            else:
                database_key = None

        # The runner is also stored on self so that _execute_once_for_engine
        # and _flaky_replay_to_failure can reach it.
        runner = self._runner = ConjectureRunner(
            self._execute_once_for_engine,
            settings=self.settings,
            random=self.random,
            database_key=database_key,
            thread_overlap=self.thread_overlap,
        )
        # Use the Conjecture engine to run the test function many times
        # on different inputs.
        runner.run()
        note_statistics(runner.statistics)
        if observability_enabled():
            self._deliver_information_message(
                type="info",
                title="Hypothesis Statistics",
                content=describe_statistics(runner.statistics),
            )
            # runner.provider is either a PrimitiveProvider instance or
            # something callable that builds one (called here with None).
            for msg in (
                p if isinstance(p := runner.provider, PrimitiveProvider) else p(None)
            ).observe_information_messages(lifetime="test_function"):
                self._deliver_information_message(**msg)

        if runner.call_count == 0:
            return
        if runner.interesting_examples:
            # Sorted in descending sort_key order; the final element is the one
            # kept when only a single failure is reported (see below).
            self.falsifying_examples = sorted(
                runner.interesting_examples.values(),
                key=lambda d: sort_key(d.nodes),
                reverse=True,
            )
        else:
            if runner.valid_examples == 0:
                explanations = []
                # use a somewhat arbitrary cutoff to avoid recommending spurious
                # fixes.
                # eg, a few invalid examples from internal filters when the
                # problem is the user generating large inputs, or a
                # few overruns during internal mutation when the problem is
                # impossible user filters/assumes.
                if runner.invalid_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.invalid_examples} of {runner.call_count} "
                        "examples failed a .filter() or assume() condition. Try "
                        "making your filters or assumes less strict, or rewrite "
                        "using strategy parameters: "
                        "st.integers().filter(lambda x: x > 0) fails less often "
                        "(that is, never) when rewritten as st.integers(min_value=1)."
                    )
                if runner.overrun_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.overrun_examples} of {runner.call_count} "
                        "examples were too large to finish generating; try "
                        "reducing the typical size of your inputs?"
                    )
                rep = get_pretty_function_description(self.test)
                raise Unsatisfiable(
                    f"Unable to satisfy assumptions of {rep}. "
                    f"{' Also, '.join(explanations)}"
                )

        # If we have not traced executions, warn about that now (but only when
        # we'd expect to do so reliably, i.e. on CPython>=3.12)
        if (
            hasattr(sys, "monitoring")
            and not PYPY
            and self._should_trace()
            and not Tracer.can_trace()
        ):  # pragma: no cover
            # actually covered by our tests, but only on >= 3.12
            warnings.warn(
                "avoiding tracing test function because tool id "
                f"{MONITORING_TOOL_ID} is already taken by tool "
                f"{sys.monitoring.get_tool(MONITORING_TOOL_ID)}.",
                HypothesisWarning,
                stacklevel=3,
            )

        if not self.falsifying_examples:
            return
        elif not (self.settings.report_multiple_bugs and pytest_shows_exceptiongroups):
            # Pretend that we only found one failure, by discarding the others.
            del self.falsifying_examples[:-1]

        # The engine found one or more failures, so we need to reproduce and
        # report them.

        errors_to_report = []

        report_lines = describe_targets(runner.best_observed_targets)
        if report_lines:
            report_lines.append("")

        explanations = explanatory_lines(self.explain_traces, self.settings)
        for falsifying_example in self.falsifying_examples:
            # Everything reported during the replay is captured here and later
            # attached to the error as notes by _raise_to_user.
            fragments = []

            ran_example = runner.new_conjecture_data(
                falsifying_example.choices, max_choices=len(falsifying_example.choices)
            )
            ran_example.slice_comments = falsifying_example.slice_comments
            tb = None
            origin = None
            assert falsifying_example.expected_exception is not None
            assert falsifying_example.expected_traceback is not None
            try:
                with with_reporter(fragments.append):
                    self.execute_once(
                        ran_example,
                        print_example=True,
                        is_final=True,
                        expected_failure=(
                            falsifying_example.expected_exception,
                            falsifying_example.expected_traceback,
                        ),
                    )
            except StopTest as e:
                # Link the expected exception from the first run. Not sure
                # how to access the current exception, if it failed
                # differently on this run. In fact, in the only known
                # reproducer, the StopTest is caused by OVERRUN before the
                # test is even executed. Possibly because all initial examples
                # failed until the final non-traced replay, and something was
                # exhausted? Possibly a FIXME, but sufficiently weird to
                # ignore for now.
                err = FlakyFailure(
                    "Inconsistent results: An example failed on the "
                    "first run but now succeeds (or fails with another "
                    "error, or is for some reason not runnable).",
                    # (note: e is a BaseException)
                    [falsifying_example.expected_exception or e],
                )
                errors_to_report.append(ReportableError(fragments, err))
            except UnsatisfiedAssumption as e:  # pragma: no cover # ironically flaky
                err = FlakyFailure(
                    "Unreliable assumption: An example which satisfied "
                    "assumptions on the first run now fails it.",
                    [e],
                )
                errors_to_report.append(ReportableError(fragments, err))
            except BaseException as e:
                # If we have anything for explain-mode, this is the time to report.
                fragments.extend(explanations[falsifying_example.interesting_origin])
                error_with_tb = e.with_traceback(get_trimmed_traceback())
                errors_to_report.append(ReportableError(fragments, error_with_tb))
                tb = format_exception(e, get_trimmed_traceback(e))
                origin = InterestingOrigin.from_exception(e)
            else:
                # execute_once() will always raise either the expected error, or Flaky.
                raise NotImplementedError("This should be unreachable")
            finally:
                ran_example.freeze()
                if observability_enabled():
                    # log our observability line for the final failing example
                    # NOTE(review): inside this ``finally``, sys.exc_info() is
                    # only set while an exception is still propagating out of
                    # the try statement (e.g. the NotImplementedError from the
                    # ``else`` branch); handled exceptions have already been
                    # cleared. Confirm the passed/failed mapping below is the
                    # intended one.
                    tc = make_testcase(
                        run_start=self._start_timestamp,
                        property=self.test_identifier,
                        data=ran_example,
                        how_generated="minimal failing example",
                        representation=self._string_repr,
                        arguments=ran_example._observability_args,
                        timing=self._timing_features,
                        coverage=None,  # Not recorded when we're replaying the MFE
                        status="passed" if sys.exc_info()[0] else "failed",
                        status_reason=str(origin or "unexpected/flaky pass"),
                        metadata={"traceback": tb},
                    )
                    deliver_observation(tc)

                # Whether or not replay actually raised the exception again, we want
                # to print the reproduce_failure decorator for the failing example.
                if self.settings.print_blob:
                    fragments.append(
                        "\nYou can reproduce this example by temporarily adding "
                        f"{reproduction_decorator(falsifying_example.choices)} "
                        "as a decorator on your test case"
                    )

        # Always raises: a single error as-is, several as an exception group.
        _raise_to_user(
            errors_to_report,
            self.settings,
            report_lines,
            # A backend might report a failure and then report verified afterwards,
            # which is to be interpreted as "there are no more failures *other
            # than what we already reported*". Do not report this as unsound.
            unsound_backend=(
                runner._verified_by
                if runner._verified_by and not runner._backend_found_failure
                else None
            ),
        )
def _simplify_explicit_errors(errors: list[ReportableError]) -> list[ReportableError]:
    """
    Collapse explicit-example errors that share an InterestingOrigin.

    For every origin only the simplest error is kept, where "simplest" means
    shortlex-smallest first fragment. When other errors were dropped for that
    origin, a note saying how many is attached to the surviving exception.
    """

    def _shortlex(candidate):
        # Shorter representations first, ties broken lexicographically.
        first_fragment = candidate.fragments[0] if candidate.fragments else ""
        return (len(first_fragment), first_fragment)

    grouped: dict[InterestingOrigin, list[ReportableError]] = defaultdict(list)
    for candidate in errors:
        key = InterestingOrigin.from_exception(candidate.exception)
        grouped[key].append(candidate)

    simplified = []
    for same_origin in grouped.values():
        if len(same_origin) == 1:
            # Nothing to deduplicate, and nothing to annotate.
            simplified.append(same_origin[0])
            continue

        # min() returns the first minimal element, exactly like taking the
        # head of a stable sort.
        representative = min(same_origin, key=_shortlex)
        other_count = len(same_origin) - 1
        add_note(
            representative.exception,
            f"(note: {other_count} other explicit example{'s' * (other_count > 1)} "
            "also failed with this error; use Verbosity.verbose to view)",
        )
        simplified.append(representative)

    return simplified
def _raise_to_user(
    errors_to_report, settings, target_lines, trailer="", *, unsound_backend=None
):
    """Attach the collected notes to the errors and raise them to the user.

    A single error is raised as-is; several are wrapped in one
    ``BaseExceptionGroup``. This function never returns normally.
    """
    prefix = "Falsifying example: "
    failing_examples = []
    for reportable in errors_to_report:
        exc = reportable.exception
        for fragment in reportable.fragments:
            add_note(exc, fragment)
            if fragment.startswith(prefix):
                failing_examples.append(fragment.removeprefix(prefix))

    # Expose the failing examples on the pytest item, when there is one.
    pytest_item = current_pytest_item.value
    if pytest_item:
        pytest_item._hypothesis_failing_examples = failing_examples

    if len(errors_to_report) != 1:
        assert errors_to_report
        member_exceptions = [reportable.exception for reportable in errors_to_report]
        the_error_hypothesis_found = BaseExceptionGroup(
            f"Hypothesis found {len(errors_to_report)} distinct failures{trailer}.",
            member_exceptions,
        )
    else:
        the_error_hypothesis_found = errors_to_report[0].exception

    # Target lines are attached at normal verbosity and above.
    if settings.verbosity >= Verbosity.normal:
        for target_line in target_lines:
            add_note(the_error_hypothesis_found, target_line)

    if unsound_backend:
        unsound_note = (
            f"backend={unsound_backend!r} claimed to verify this test passes - "
            "please send them a bug report!"
        )
        add_note(the_error_hypothesis_found, unsound_note)

    raise the_error_hypothesis_found
@contextlib.contextmanager
def fake_subTest(self, msg=None, **__):
    """Stand-in for `unittest.TestCase.subTest` while `@given` is running.

    The real `subTest` would make the unittest runner report every failing
    example as its own failing test, which is wrong for a property-based
    test. This replacement only emits a warning and then runs the body of
    the ``with`` block unchanged.
    """
    explanation = (
        "subTest per-example reporting interacts badly with Hypothesis "
        "trying hundreds of examples, so we disable it for the duration of "
        "any test that uses `@given`."
    )
    warnings.warn(explanation, HypothesisWarning, stacklevel=2)
    yield
1666@dataclass(slots=False, frozen=False)
1667class HypothesisHandle:
1668 """This object is provided as the .hypothesis attribute on @given tests.
1670 Downstream users can reassign its attributes to insert custom logic into
1671 the execution of each case, for example by converting an async into a
1672 sync function.
1674 This must be an attribute of an attribute, because reassignment of a
1675 first-level attribute would not be visible to Hypothesis if the function
1676 had been decorated before the assignment.
1678 See https://github.com/HypothesisWorks/hypothesis/issues/1257 for more
1679 information.
1680 """
1682 inner_test: Any
1683 _get_fuzz_target: Any
1684 _given_kwargs: Any
1686 @property
1687 def fuzz_one_input(
1688 self,
1689 ) -> Callable[[bytes | bytearray | memoryview | BinaryIO], bytes | None]:
1690 """
1691 Run the test as a fuzz target, driven with the ``buffer`` of bytes.
1693 Depending on the passed ``buffer`` one of three things will happen:
1695 * If the bytestring was invalid, for example because it was too short or was
1696 filtered out by |assume| or |.filter|, |fuzz_one_input| returns ``None``.
1697 * If the bytestring was valid and the test passed, |fuzz_one_input| returns
1698 a canonicalised and pruned bytestring which will replay that test case.
1699 This is provided as an option to improve the performance of mutating
1700 fuzzers, but can safely be ignored.
1701 * If the test *failed*, i.e. raised an exception, |fuzz_one_input| will
1702 add the pruned buffer to :ref:`the Hypothesis example database <database>`
1703 and then re-raise that exception. All you need to do to reproduce,
1704 minimize, and de-duplicate all the failures found via fuzzing is run
1705 your test suite!
1707 To reduce the performance impact of database writes, |fuzz_one_input| only
1708 records failing inputs which would be valid shrinks for a known failure -
1709 meaning writes are somewhere between constant and log(N) rather than linear
1710 in runtime. However, this tracking only works within a persistent fuzzing
1711 process; for forkserver fuzzers we recommend ``database=None`` for the main
1712 run, and then replaying with a database enabled if you need to analyse
1713 failures.
1715 Note that the interpretation of both input and output bytestrings is
1716 specific to the exact version of Hypothesis you are using and the strategies
1717 given to the test, just like the :ref:`database <database>` and
1718 |@reproduce_failure|.
1720 Interaction with |@settings|
1721 ----------------------------
1723 |fuzz_one_input| uses just enough of Hypothesis' internals to drive your
1724 test function with a bytestring, and most settings therefore have no effect
1725 in this mode. We recommend running your tests the usual way before fuzzing
1726 to get the benefits of health checks, as well as afterwards to replay,
1727 shrink, deduplicate, and report whatever errors were discovered.
1729 * |settings.database| *is* used by |fuzz_one_input| - adding failures to
1730 the database to be replayed when
1731 you next run your tests is our preferred reporting mechanism and response
1732 to `the 'fuzzer taming' problem <https://blog.regehr.org/archives/925>`__.
1733 * |settings.verbosity| and |settings.stateful_step_count| work as usual.
1734 * The |~settings.deadline|, |~settings.derandomize|, |~settings.max_examples|,
1735 |~settings.phases|, |~settings.print_blob|, |~settings.report_multiple_bugs|,
1736 and |~settings.suppress_health_check| settings do not affect |fuzz_one_input|.
1738 Example Usage
1739 -------------
1741 .. code-block:: python
1743 @given(st.text())
1744 def test_foo(s): ...
1746 # This is a traditional fuzz target - call it with a bytestring,
1747 # or a binary IO object, and it runs the test once.
1748 fuzz_target = test_foo.hypothesis.fuzz_one_input
1750 # For example:
1751 fuzz_target(b"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00")
1752 fuzz_target(io.BytesIO(b"\\x01"))
1754 .. tip::
1756 If you expect to discover many failures while using |fuzz_one_input|,
1757 consider wrapping your database with |BackgroundWriteDatabase|, for
1758 low-overhead writes of failures.
1760 .. tip::
1762 | Want an integrated workflow for your team's local tests, CI, and continuous fuzzing?
1763 | Use `HypoFuzz <https://hypofuzz.com/>`__ to fuzz your whole test suite, and find more bugs with the same tests!
1765 .. seealso::
1767 See also the :doc:`/how-to/external-fuzzers` how-to.
1768 """
1769 # Note: most users, if they care about fuzzer performance, will access the
1770 # property and assign it to a local variable to move the attribute lookup
1771 # outside their fuzzing loop / before the fork point. We cache it anyway,
1772 # so that naive or unusual use-cases get the best possible performance too.
1773 try:
1774 return self.__cached_target # type: ignore
1775 except AttributeError:
1776 self.__cached_target = self._get_fuzz_target()
1777 return self.__cached_target
# Typing-only overload: ``@given(...)`` called with a single positional
# Ellipsis. The decorated test is typed as taking no arguments.
@overload
def given(
    _: EllipsisType, /
) -> Callable[
    [Callable[..., Coroutine[Any, Any, None] | None]], Callable[[], None]
]:  # pragma: no cover
    ...
# Typing-only overload: ``@given`` called with positional strategies only.
# The decorated test may still take arguments, hence ``Callable[..., None]``.
@overload
def given(
    *_given_arguments: SearchStrategy[Any],
) -> Callable[
    [Callable[..., Coroutine[Any, Any, None] | None]], Callable[..., None]
]:  # pragma: no cover
    ...
# Typing overload: ``@given(name=strategy, ...)`` called with keyword arguments
# only, where each value is either a strategy or a literal Ellipsis.
@overload
def given(
    **_given_kwargs: SearchStrategy[Any] | EllipsisType,
) -> Callable[
    [Callable[..., Coroutine[Any, Any, None] | None]], Callable[..., None]
]:  # pragma: no cover
    ...
def given(
    *_given_arguments: SearchStrategy[Any] | EllipsisType,
    **_given_kwargs: SearchStrategy[Any] | EllipsisType,
) -> Callable[[Callable[..., Coroutine[Any, Any, None] | None]], Callable[..., None]]:
    """
    The |@given| decorator turns a function into a Hypothesis test. This is the
    main entry point to Hypothesis.

    .. seealso::

        See also the :doc:`/tutorial/introduction` tutorial, which introduces
        defining Hypothesis tests with |@given|.

    .. _given-arguments:

    Arguments to ``@given``
    -----------------------

    Arguments to |@given| may be either positional or keyword arguments:

    .. code-block:: python

        @given(st.integers(), st.floats())
        def test_one(x, y):
            pass

        @given(x=st.integers(), y=st.floats())
        def test_two(x, y):
            pass

    If using keyword arguments, the arguments may appear in any order, as with
    standard Python functions:

    .. code-block:: python

        # different order, but still equivalent to before
        @given(y=st.floats(), x=st.integers())
        def test(x, y):
            assert isinstance(x, int)
            assert isinstance(y, float)

    If |@given| is provided fewer positional arguments than the decorated test,
    the test arguments are filled in on the right side, leaving the leftmost
    positional arguments unfilled:

    .. code-block:: python

        @given(st.integers(), st.floats())
        def test(manual_string, y, z):
            assert manual_string == "x"
            assert isinstance(y, int)
            assert isinstance(z, float)

        # `test` is now a callable which takes one argument `manual_string`

        test("x")
        # or equivalently:
        test(manual_string="x")

    The reason for this "from the right" behavior is to support using |@given|
    with instance methods, by automatically passing through ``self``:

    .. code-block:: python

        class MyTest(TestCase):
            @given(st.integers())
            def test(self, x):
                assert isinstance(self, MyTest)
                assert isinstance(x, int)

    If (and only if) using keyword arguments, |@given| may be combined with
    ``**kwargs`` or ``*args``:

    .. code-block:: python

        @given(x=integers(), y=integers())
        def test(x, **kwargs):
            assert "y" in kwargs

        @given(x=integers(), y=integers())
        def test(x, *args, **kwargs):
            assert args == ()
            assert "x" not in kwargs
            assert "y" in kwargs

    It is an error to:

    * Mix positional and keyword arguments to |@given|.
    * Use |@given| with a function that has a default value for an argument.
    * Use |@given| with positional arguments with a function that uses ``*args``,
      ``**kwargs``, or keyword-only arguments.

    The function returned by given has all the same arguments as the original
    test, minus those that are filled in by |@given|. See the :ref:`notes on
    framework compatibility <framework-compatibility>` for how this interacts
    with features of other testing libraries, such as :pypi:`pytest` fixtures.
    """

    # given() itself was called while a Hypothesis test is already executing,
    # i.e. this @given is nested inside another one. That is reported through
    # the nested_given health check, evaluated against default settings.
    if currently_in_test_context():
        fail_health_check(
            Settings(),
            "Nesting @given tests results in quadratic generation and shrinking "
            "behavior, and can usually be more cleanly expressed by replacing the "
            "inner function with an st.data() parameter on the outer @given."
            "\n\n"
            "If it is difficult or impossible to refactor this test to remove the "
            "nested @given, you can disable this health check with "
            "@settings(suppress_health_check=[HealthCheck.nested_given]) on the "
            "outer @given. See "
            "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
            "for details.",
            HealthCheck.nested_given,
        )

    # The decorator proper. `test` is the user's function; the return value is
    # either `wrapped_test` or whatever the validation helpers
    # (`is_invalid_test` / `_invalid`) hand back for a misused @given.
    def run_test_as_given(test):
        if inspect.isclass(test):
            # Provide a meaningful error to users, instead of exceptions from
            # internals that assume we're dealing with a function.
            raise InvalidArgument("@given cannot be applied to a class")

        # pytest is only consulted through sys.modules, so it is never imported
        # here; the fixture check applies only when pytest is already loaded
        # and its major.minor version is at least 8.4.
        if (
            "_pytest" in sys.modules
            and "_pytest.fixtures" in sys.modules
            and (
                tuple(map(int, sys.modules["_pytest"].__version__.split(".")[:2]))
                >= (8, 4)
            )
            and isinstance(
                test, sys.modules["_pytest.fixtures"].FixtureFunctionDefinition
            )
        ):  # pragma: no cover # covered by pytest/test_fixtures, but not by cover/
            raise InvalidArgument("@given cannot be applied to a pytest fixture")

        # Work on local copies: both names are rebound below and given_kwargs is
        # also mutated, which must not affect the arguments captured by given().
        given_arguments = tuple(_given_arguments)
        given_kwargs = dict(_given_kwargs)

        original_sig = get_signature(test)
        if given_arguments == (Ellipsis,) and not given_kwargs:
            # user indicated that they want to infer all arguments
            given_kwargs = {
                p.name: Ellipsis
                for p in original_sig.parameters.values()
                if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
            }
            given_arguments = ()

        check_invalid = is_invalid_test(
            test, original_sig, given_arguments, given_kwargs
        )

        # If the argument check found problems, return a dummy test function
        # that will raise an error if it is actually called.
        if check_invalid is not None:
            return check_invalid

        # Because the argument check succeeded, we can convert @given's
        # positional arguments into keyword arguments for simplicity.
        if given_arguments:
            assert not given_kwargs
            posargs = [
                p.name
                for p in original_sig.parameters.values()
                if p.kind is p.POSITIONAL_OR_KEYWORD
            ]
            # Align from the right: reverse both sequences, pair them up (zip
            # stops at the shorter one, so surplus leading parameter names are
            # left unfilled), then reverse again to restore signature order.
            given_kwargs = dict(
                list(zip(posargs[::-1], given_arguments[::-1], strict=False))[::-1]
            )
        # These have been converted, so delete them to prevent accidental use.
        del given_arguments

        new_signature = new_given_signature(original_sig, given_kwargs)

        # Use type information to convert "infer" arguments into appropriate strategies.
        if ... in given_kwargs.values():
            hints = get_type_hints(test)
            # The names are collected into a list up front because given_kwargs
            # is assigned to inside the loop body.
            for name in [name for name, value in given_kwargs.items() if value is ...]:
                if name not in hints:
                    return _invalid(
                        f"passed {name}=... for {test.__name__}, but {name} has "
                        "no type annotation",
                        test=test,
                        given_kwargs=given_kwargs,
                    )
                given_kwargs[name] = st.from_type(hints[name])

        # only raise if the same thread uses two different executors, not if two
        # different threads use different executors.
        thread_local = ThreadLocal(prev_self=lambda: not_set)
        # maps thread_id to whether that thread overlaps in execution with any
        # other thread in this @given. We use this to detect whether an @given is
        # being run from multiple different threads at once, which informs
        # decisions like whether to raise DeadlineExceeded or HealthCheck.too_slow.
        thread_overlap: dict[int, bool] = {}
        thread_overlap_lock = Lock()

        @impersonate(test)
        @define_function_signature(test.__name__, test.__doc__, new_signature)
        def wrapped_test(*arguments, **kwargs):
            # Tell pytest to omit the body of this function from tracebacks
            __tracebackhide__ = True
            with thread_overlap_lock:
                # Every thread that is already registered is, from now on,
                # running concurrently with this call.
                for overlap_thread_id in thread_overlap:
                    thread_overlap[overlap_thread_id] = True

                threadid = threading.get_ident()
                # if there are existing threads when this thread starts, then
                # this thread starts at an overlapped state.
                has_existing_threads = len(thread_overlap) > 0
                thread_overlap[threadid] = has_existing_threads

            try:
                # Read the test from the handle at call time rather than using
                # the closure variable; this shadows the outer `test` locally.
                test = wrapped_test.hypothesis.inner_test
                if getattr(test, "is_hypothesis_test", False):
                    raise InvalidArgument(
                        f"You have applied @given to the test {test.__name__} more than "
                        "once, which wraps the test several times and is extremely slow. "
                        "A similar effect can be gained by combining the arguments "
                        "of the two calls to given. For example, instead of "
                        "@given(booleans()) @given(integers()), you could write "
                        "@given(booleans(), integers())"
                    )

                settings = wrapped_test._hypothesis_internal_use_settings
                random = get_random_for_wrapped_test(test, wrapped_test)
                arguments, kwargs, stuff = process_arguments_to_given(
                    wrapped_test,
                    arguments,
                    kwargs,
                    given_kwargs,
                    new_signature.parameters,
                )

                if (
                    inspect.iscoroutinefunction(test)
                    and get_executor(stuff.selfy) is default_executor
                ):
                    # See https://github.com/HypothesisWorks/hypothesis/issues/3054
                    # If our custom executor doesn't handle coroutines, or we return an
                    # awaitable from a non-async-def function, we just rely on the
                    # return_value health check. This catches most user errors though.
                    raise InvalidArgument(
                        "Hypothesis doesn't know how to run async test functions like "
                        f"{test.__name__}. You'll need to write a custom executor, "
                        "or use a library like pytest-asyncio or pytest-trio which can "
                        "handle the translation for you.\n See https://hypothesis."
                        "readthedocs.io/en/latest/details.html#custom-function-execution"
                    )

                runner = stuff.selfy
                # A @given-decorated method whose name is also an attribute of
                # unittest.TestCase is flagged via the not_a_test_method check.
                if isinstance(stuff.selfy, TestCase) and test.__name__ in dir(TestCase):
                    fail_health_check(
                        settings,
                        f"You have applied @given to the method {test.__name__}, which is "
                        "used by the unittest runner but is not itself a test. "
                        "This is not useful in any way.",
                        HealthCheck.not_a_test_method,
                    )
                if bad_django_TestCase(runner):  # pragma: no cover
                    # Covered by the Django tests, but not the pytest coverage task
                    raise InvalidArgument(
                        "You have applied @given to a method on "
                        f"{type(runner).__qualname__}, but this "
                        "class does not inherit from the supported versions in "
                        "`hypothesis.extra.django`. Use the Hypothesis variants "
                        "to ensure that each example is run in a separate "
                        "database transaction."
                    )

                nonlocal thread_local
                # Check selfy really is self (not e.g. a mock) before we health-check
                cur_self = (
                    stuff.selfy
                    if getattr(type(stuff.selfy), test.__name__, None) is wrapped_test
                    else None
                )
                # First call on this thread records the instance; later calls
                # on the same thread must see the identical object.
                if thread_local.prev_self is not_set:
                    thread_local.prev_self = cur_self
                elif cur_self is not thread_local.prev_self:
                    fail_health_check(
                        settings,
                        f"The method {test.__qualname__} was called from multiple "
                        "different executors. This may lead to flaky tests and "
                        "nonreproducible errors when replaying from database."
                        "\n\n"
                        "Unlike most health checks, HealthCheck.differing_executors "
                        "warns about a correctness issue with your test. We "
                        "therefore recommend fixing the underlying issue, rather "
                        "than suppressing this health check. However, if you are "
                        "confident this health check can be safely disabled, you can "
                        "do so with "
                        "@settings(suppress_health_check=[HealthCheck.differing_executors]). "
                        "See "
                        "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                        "for details.",
                        HealthCheck.differing_executors,
                    )

                state = StateForActualGivenExecution(
                    stuff,
                    test,
                    settings,
                    random,
                    wrapped_test,
                    thread_overlap=thread_overlap,
                )

                # If there was a @reproduce_failure decorator, use it to reproduce
                # the error (or complain that we couldn't). Either way, this will
                # always raise some kind of error.
                if (
                    reproduce_failure := wrapped_test._hypothesis_internal_use_reproduce_failure
                ) is not None:
                    expected_version, failure = reproduce_failure
                    if expected_version != __version__:
                        raise InvalidArgument(
                            "Attempting to reproduce a failure from a different "
                            f"version of Hypothesis. This failure is from {expected_version}, but "
                            f"you are currently running {__version__!r}. Please change your "
                            "Hypothesis version to a matching one."
                        )
                    try:
                        state.execute_once(
                            ConjectureData.for_choices(decode_failure(failure)),
                            print_example=True,
                            is_final=True,
                        )
                        # Reaching this line means the replay raised nothing.
                        raise DidNotReproduce(
                            "Expected the test to raise an error, but it "
                            "completed successfully."
                        )
                    except StopTest:
                        raise DidNotReproduce(
                            "The shape of the test data has changed in some way "
                            "from where this blob was defined. Are you sure "
                            "you're running the same test?"
                        ) from None
                    except UnsatisfiedAssumption:
                        raise DidNotReproduce(
                            "The test data failed to satisfy an assumption in the "
                            "test. Have you added it since this blob was generated?"
                        ) from None

                # There was no @reproduce_failure, so start by running any explicit
                # examples from @example decorators.
                if errors := list(
                    execute_explicit_examples(
                        state, wrapped_test, arguments, kwargs, original_sig
                    )
                ):
                    # If we're not going to report multiple bugs, we would have
                    # stopped running explicit examples at the first failure.
                    assert len(errors) == 1 or state.settings.report_multiple_bugs

                    # If an explicit example raised a 'skip' exception, ensure it's never
                    # wrapped up in an exception group. Because we break out of the loop
                    # immediately on finding a skip, if present it's always the last error.
                    if isinstance(errors[-1].exception, skip_exceptions_to_reraise()):
                        # Covered by `test_issue_3453_regression`, just in a subprocess.
                        del errors[:-1]  # pragma: no cover

                    if state.settings.verbosity < Verbosity.verbose:
                        # keep only one error per interesting origin, unless
                        # verbosity is high
                        errors = _simplify_explicit_errors(errors)

                    _raise_to_user(errors, state.settings, [], " in explicit examples")

                # If there were any explicit examples, they all ran successfully.
                # The next step is to use the Conjecture engine to run the test on
                # many different inputs.
                ran_explicit_examples = (
                    Phase.explicit in state.settings.phases
                    and getattr(wrapped_test, "hypothesis_explicit_examples", ())
                )
                SKIP_BECAUSE_NO_EXAMPLES = unittest.SkipTest(
                    "Hypothesis has been told to run no examples for this test."
                )
                # With neither the reuse nor the generate phase enabled, the
                # engine is not run: return quietly if explicit examples ran,
                # otherwise report the test as skipped.
                if not (
                    Phase.reuse in settings.phases or Phase.generate in settings.phases
                ):
                    if not ran_explicit_examples:
                        raise SKIP_BECAUSE_NO_EXAMPLES
                    return

                try:
                    if isinstance(runner, TestCase) and hasattr(runner, "subTest"):
                        # While the engine runs, swap the instance's subTest for
                        # fake_subTest bound to the same instance; the original
                        # is put back afterwards, even if the engine raises.
                        subTest = runner.subTest
                        try:
                            runner.subTest = types.MethodType(fake_subTest, runner)
                            state.run_engine()
                        finally:
                            runner.subTest = subTest
                    else:
                        state.run_engine()
                except BaseException as e:
                    # The exception caught here should either be an actual test
                    # failure (or BaseExceptionGroup), or some kind of fatal error
                    # that caused the engine to stop.
                    generated_seed = (
                        wrapped_test._hypothesis_internal_use_generated_seed
                    )
                    with local_settings(settings):
                        # The seed hint is only reported when a seed was
                        # generated and state.failed_normally is falsy.
                        if not (state.failed_normally or generated_seed is None):
                            if running_under_pytest:
                                report(
                                    f"You can add @seed({generated_seed}) to this test or "
                                    f"run pytest with --hypothesis-seed={generated_seed} "
                                    "to reproduce this failure."
                                )
                            else:
                                report(
                                    f"You can add @seed({generated_seed}) to this test to "
                                    "reproduce this failure."
                                )
                        # The dance here is to avoid showing users long tracebacks
                        # full of Hypothesis internals they don't care about.
                        # We have to do this inline, to avoid adding another
                        # internal stack frame just when we've removed the rest.
                        #
                        # Using a variable for our trimmed error ensures that the line
                        # which will actually appear in tracebacks is as clear as
                        # possible - "raise the_error_hypothesis_found".
                        the_error_hypothesis_found = e.with_traceback(
                            None
                            if isinstance(e, BaseExceptionGroup)
                            else get_trimmed_traceback()
                        )
                        raise the_error_hypothesis_found

                # The engine finished without raising. If neither explicit
                # examples nor the engine executed anything, report a skip.
                if not (ran_explicit_examples or state.ever_executed):
                    raise SKIP_BECAUSE_NO_EXAMPLES
            finally:
                # Always deregister this thread, however the call ended.
                with thread_overlap_lock:
                    del thread_overlap[threadid]

        def _get_fuzz_target() -> (
            Callable[[bytes | bytearray | memoryview | BinaryIO], bytes | None]
        ):
            # Because fuzzing interfaces are very performance-sensitive, we use a
            # somewhat more complicated structure here.  `_get_fuzz_target()` is
            # called by the `HypothesisHandle.fuzz_one_input` property, allowing
            # us to defer our collection of the settings, random instance, and
            # reassignable `inner_test` (etc) until `fuzz_one_input` is accessed.
            #
            # We then share the performance cost of setting up `state` between
            # many invocations of the target.  We explicitly force `deadline=None`
            # for performance reasons, saving ~40% the runtime of an empty test.
            test = wrapped_test.hypothesis.inner_test
            settings = Settings(
                parent=wrapped_test._hypothesis_internal_use_settings, deadline=None
            )
            random = get_random_for_wrapped_test(test, wrapped_test)
            # Called with no caller-supplied arguments, so nothing may be left
            # over for the test beyond what @given provides (asserted below).
            _args, _kwargs, stuff = process_arguments_to_given(
                wrapped_test, (), {}, given_kwargs, new_signature.parameters
            )
            assert not _args
            assert not _kwargs
            state = StateForActualGivenExecution(
                stuff,
                test,
                settings,
                random,
                wrapped_test,
                thread_overlap=thread_overlap,
            )
            database_key = function_digest(test) + b".secondary"
            # We track the minimal-so-far example for each distinct origin, so
            # that we track log-n instead of n examples for long runs.  In particular
            # it means that we saturate for common errors in long runs instead of
            # storing huge volumes of low-value data.
            minimal_failures: dict = {}

            def fuzz_one_input(
                buffer: bytes | bytearray | memoryview | BinaryIO,
            ) -> bytes | None:
                # This inner part is all that the fuzzer will actually run,
                # so we keep it as small and as fast as possible.
                # File-like inputs are read once, up to BUFFER_SIZE bytes.
                if isinstance(buffer, io.IOBase):
                    buffer = buffer.read(BUFFER_SIZE)
                assert isinstance(buffer, (bytes, bytearray, memoryview))
                data = ConjectureData(
                    random=None,
                    provider=BytestringProvider,
                    provider_kw={"bytestring": buffer},
                )
                # `status` is assigned on every path through the try statement
                # and is only read by the observability report in `finally`.
                try:
                    state.execute_once(data)
                    status = Status.VALID
                except StopTest:
                    # No completed run for this input: return None rather than
                    # the drawn bytes.
                    status = data.status
                    return None
                except UnsatisfiedAssumption:
                    status = Status.INVALID
                    return None
                except BaseException:
                    # Any other exception is treated as a failure and re-raised
                    # to the caller. It is saved to the database only when this
                    # origin has no recorded failure yet, or this one's
                    # sort_key is no larger than the recorded one's.
                    known = minimal_failures.get(data.interesting_origin)
                    if settings.database is not None and (
                        known is None or sort_key(data.nodes) <= sort_key(known)
                    ):
                        settings.database.save(
                            database_key, choices_to_bytes(data.choices)
                        )
                        minimal_failures[data.interesting_origin] = data.nodes
                    status = Status.INTERESTING
                    raise
                finally:
                    if observability_enabled():
                        data.freeze()
                        tc = make_testcase(
                            run_start=state._start_timestamp,
                            property=state.test_identifier,
                            data=data,
                            how_generated="fuzz_one_input",
                            representation=state._string_repr,
                            arguments=data._observability_args,
                            timing=state._timing_features,
                            coverage=None,
                            status=status,
                            backend_metadata=data.provider.observe_test_case(),
                        )
                        deliver_observation(tc)
                    state._timing_features = {}

                # Only reached when execute_once returned normally.
                assert isinstance(data.provider, BytestringProvider)
                return bytes(data.provider.drawn)

            fuzz_one_input.__doc__ = HypothesisHandle.fuzz_one_input.__doc__
            return fuzz_one_input

        # After having created the decorated test function, we need to copy
        # over some attributes to make the switch as seamless as possible.

        # Only public attributes are copied, and never over one that
        # wrapped_test already has.
        for attrib in dir(test):
            if not (attrib.startswith("_") or hasattr(wrapped_test, attrib)):
                setattr(wrapped_test, attrib, getattr(test, attrib))
        wrapped_test.is_hypothesis_test = True
        if hasattr(test, "_hypothesis_internal_settings_applied"):
            # Used to check if @settings is applied twice.
            wrapped_test._hypothesis_internal_settings_applied = True
        wrapped_test._hypothesis_internal_use_seed = getattr(
            test, "_hypothesis_internal_use_seed", None
        )
        # Falls back to Settings.default when the test carries no settings
        # (attribute missing or falsy).
        wrapped_test._hypothesis_internal_use_settings = (
            getattr(test, "_hypothesis_internal_use_settings", None) or Settings.default
        )
        wrapped_test._hypothesis_internal_use_reproduce_failure = getattr(
            test, "_hypothesis_internal_use_reproduce_failure", None
        )
        # The fuzz target factory is passed uncalled; per the comment inside
        # `_get_fuzz_target`, it is invoked by the handle's fuzz_one_input property.
        wrapped_test.hypothesis = HypothesisHandle(test, _get_fuzz_target, given_kwargs)
        return wrapped_test

    return run_test_as_given
def find(
    specifier: SearchStrategy[Ex],
    condition: Callable[[Any], bool],
    *,
    settings: Settings | None = None,
    random: Random | None = None,
    database_key: bytes | None = None,
) -> Ex:
    """Return the minimal example drawn from ``specifier`` for which
    ``condition`` is true.

    A throwaway Hypothesis test is built around ``condition``; it records the
    value and raises ``Found`` whenever the predicate accepts it, and the value
    recorded when ``Found`` reaches this function is returned.

    :param specifier: the strategy to draw candidate values from.
    :param condition: predicate a candidate must satisfy.
    :param settings: base settings; defaults to ``Settings(max_examples=2000)``.
        All health checks are suppressed and ``report_multiple_bugs`` is
        turned off on top of whichever base is used.
    :param random: if given, 64 bits drawn from it are used to seed the test.
    :param database_key: key attached to the test; when omitted and the
        settings have a database, it is derived from ``condition``.
    :raises InvalidArgument: if ``specifier`` is not a ``SearchStrategy``.
    :raises NoSuchExample: if the test finishes without ``Found`` being raised.
    """
    base_settings = Settings(max_examples=2000) if settings is None else settings
    settings = Settings(
        base_settings,
        suppress_health_check=list(HealthCheck),
        report_multiple_bugs=False,
    )

    if database_key is None:
        if settings.database is not None:
            # Note: The database key is not guaranteed to be unique. If not,
            # replaying of database examples may fail to reproduce due to being
            # replayed on the wrong condition.
            database_key = function_digest(condition)

    if not isinstance(specifier, SearchStrategy):
        message = (
            f"Expected SearchStrategy but got {specifier!r} of "
            f"type {type(specifier).__name__}"
        )
        raise InvalidArgument(message)
    specifier.validate()

    # Single-slot holder the inner test writes the accepted value into.
    last: list[Ex] = []

    # NOTE(review): the decorated function below is intentionally left exactly
    # as written - its name, parameter and source may be observable to
    # Hypothesis internals; confirm before renaming anything in it.
    @settings
    @given(specifier)
    def test(v):
        if condition(v):
            last[:] = [v]
            raise Found

    if random is not None:
        apply_seed = seed(random.getrandbits(64))
        test = apply_seed(test)

    test._hypothesis_internal_database_key = database_key  # type: ignore

    try:
        test()
    except Found:
        return last[0]
    else:
        # The test ran to completion, so no value ever satisfied `condition`.
        raise NoSuchExample(get_pretty_function_description(condition))