Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/core.py: 34%
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

"""This module provides the core primitives of Hypothesis, such as given."""
import base64
import contextlib
import dataclasses
import datetime
import inspect
import io
import math
import os
import sys
import threading
import time
import traceback
import types
import unittest
import warnings
import zlib
from collections import defaultdict
from collections.abc import Coroutine, Generator, Hashable, Iterable, Sequence
from dataclasses import dataclass, field
from functools import partial
from inspect import Parameter
from random import Random
from threading import Lock
from typing import (
    Any,
    BinaryIO,
    Callable,
    Optional,
    TypeVar,
    Union,
    overload,
)
from unittest import TestCase

from hypothesis import strategies as st
from hypothesis._settings import (
    HealthCheck,
    Phase,
    Verbosity,
    all_settings,
    local_settings,
    settings as Settings,
)
from hypothesis.control import BuildContext, currently_in_test_context
from hypothesis.database import choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    DeadlineExceeded,
    DidNotReproduce,
    FailedHealthCheck,
    FlakyFailure,
    FlakyReplay,
    Found,
    Frozen,
    HypothesisException,
    HypothesisWarning,
    InvalidArgument,
    NoSuchExample,
    StopTest,
    Unsatisfiable,
    UnsatisfiedAssumption,
)
from hypothesis.internal import observability
from hypothesis.internal.compat import (
    PYPY,
    BaseExceptionGroup,
    EllipsisType,
    add_note,
    bad_django_TestCase,
    get_type_hints,
    int_from_bytes,
)
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.internal.conjecture.data import ConjectureData, Status
from hypothesis.internal.conjecture.engine import BUFFER_SIZE, ConjectureRunner
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    gc_cumulative_time,
)
from hypothesis.internal.conjecture.providers import (
    BytestringProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import sort_key
from hypothesis.internal.entropy import deterministic_PRNG
from hypothesis.internal.escalation import (
    InterestingOrigin,
    current_pytest_item,
    format_exception,
    get_trimmed_traceback,
    is_hypothesis_file,
)
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import (
    InfoObservation,
    InfoObservationType,
    deliver_observation,
    make_testcase,
    observability_enabled,
)
from hypothesis.internal.reflection import (
    convert_positional_arguments,
    define_function_signature,
    function_digest,
    get_pretty_function_description,
    get_signature,
    impersonate,
    is_mock,
    nicerepr,
    proxies,
    repr_call,
)
from hypothesis.internal.scrutineer import (
    MONITORING_TOOL_ID,
    Trace,
    Tracer,
    explanatory_lines,
    tractable_coverage_report,
)
from hypothesis.internal.validation import check_type
from hypothesis.reporting import (
    current_verbosity,
    report,
    verbose_report,
    with_reporter,
)
from hypothesis.statistics import describe_statistics, describe_targets, note_statistics
from hypothesis.strategies._internal.misc import NOTHING
from hypothesis.strategies._internal.strategies import (
    Ex,
    SearchStrategy,
    check_strategy,
)
from hypothesis.utils.conventions import not_set
from hypothesis.utils.threading import ThreadLocal
from hypothesis.vendor.pretty import RepresentationPrinter
from hypothesis.version import __version__

TestFunc = TypeVar("TestFunc", bound=Callable)


running_under_pytest = False
pytest_shows_exceptiongroups = True
global_force_seed = None
# `threadlocal` stores "engine-global" constants, which are global relative to a
# ConjectureRunner instance (roughly speaking). Since only one conjecture runner
# instance can be active per thread, making engine constants thread-local prevents
# the ConjectureRunner instances of concurrent threads from treading on each other.
threadlocal = ThreadLocal(_hypothesis_global_random=lambda: None)


@dataclass
class Example:
    args: Any
    kwargs: Any
    # Plus two optional arguments for .xfail()
    raises: Any = field(default=None)
    reason: Any = field(default=None)


# TODO_DOCS link to not-yet-existent patch-dumping docs


class example:
    """
    Add an explicit input to a Hypothesis test, which Hypothesis will always
    try before generating random inputs. This combines the randomized nature of
    Hypothesis generation with a traditional parametrized test.

    For example:

    .. code-block:: python

        @example("Hello world")
        @example("some string with special significance")
        @given(st.text())
        def test_strings(s):
            pass

    will call ``test_strings("Hello world")`` and
    ``test_strings("some string with special significance")`` before generating
    any random inputs. |@example| may be placed in any order relative to |@given|
    and |@settings|.

    Explicit inputs from |@example| are run in the |Phase.explicit| phase.
    Explicit inputs do not count towards |settings.max_examples|. Note that
    explicit inputs added by |@example| do not shrink. If an explicit input
    fails, Hypothesis will stop and report the failure without generating any
    random inputs.

    |@example| can also be used to easily reproduce a failure. For instance, if
    Hypothesis reports that ``f(n=[0, math.nan])`` fails, you can add
    ``@example(n=[0, math.nan])`` to your test to quickly reproduce that failure.

    Arguments to ``@example``
    -------------------------

    Arguments to |@example| have the same behavior and restrictions as arguments
    to |@given|. This means they may be either positional or keyword arguments
    (but not both in the same |@example|):

    .. code-block:: python

        @example(1, 2)
        @example(x=1, y=2)
        @given(st.integers(), st.integers())
        def test(x, y):
            pass

    Note that while arguments to |@given| are strategies (like |st.integers|),
    arguments to |@example| are values instead (like ``1``).

    See the :ref:`given-arguments` section for full details.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        if args and kwargs:
            raise InvalidArgument(
                "Cannot mix positional and keyword arguments for examples"
            )
        if not (args or kwargs):
            raise InvalidArgument("An example must provide at least one argument")

        self.hypothesis_explicit_examples: list[Example] = []
        self._this_example = Example(tuple(args), kwargs)

    def __call__(self, test: TestFunc) -> TestFunc:
        if not hasattr(test, "hypothesis_explicit_examples"):
            test.hypothesis_explicit_examples = self.hypothesis_explicit_examples  # type: ignore
        test.hypothesis_explicit_examples.append(self._this_example)  # type: ignore
        return test

    def xfail(
        self,
        condition: bool = True,  # noqa: FBT002
        *,
        reason: str = "",
        raises: Union[
            type[BaseException], tuple[type[BaseException], ...]
        ] = BaseException,
    ) -> "example":
        """Mark this example as an expected failure, similarly to
        :obj:`pytest.mark.xfail(strict=True) <pytest.mark.xfail>`.

        Expected-failing examples allow you to check that your test does fail on
        some examples, and therefore build confidence that *passing* tests are
        because your code is working, not because the test is missing something.

        .. code-block:: python

            @example(...).xfail()
            @example(...).xfail(reason="Prices must be non-negative")
            @example(...).xfail(raises=(KeyError, ValueError))
            @example(...).xfail(sys.version_info[:2] >= (3, 12), reason="needs py 3.12")
            @example(...).xfail(condition=sys.platform != "linux", raises=OSError)
            def test(x):
                pass

        .. note::

            Expected-failing examples are handled separately from those generated
            by strategies, so you should usually ensure that there is no overlap.

            .. code-block:: python

                @example(x=1, y=0).xfail(raises=ZeroDivisionError)
                @given(x=st.just(1), y=st.integers())  # Missing `.filter(bool)`!
                def test_fraction(x, y):
                    # This test will try the explicit example and see it fail as
                    # expected, then go on to generate more examples from the
                    # strategy. If we happen to generate y=0, the test will fail
                    # because only the explicit example is treated as xfailing.
                    x / y
        """
        check_type(bool, condition, "condition")
        check_type(str, reason, "reason")
        if not (
            isinstance(raises, type) and issubclass(raises, BaseException)
        ) and not (
            isinstance(raises, tuple)
            and raises  # () -> expected to fail with no error, which is impossible
            and all(
                isinstance(r, type) and issubclass(r, BaseException) for r in raises
            )
        ):
            raise InvalidArgument(
                f"{raises=} must be an exception type or tuple of exception types"
            )
        if condition:
            self._this_example = dataclasses.replace(
                self._this_example, raises=raises, reason=reason
            )
        return self

    def via(self, whence: str, /) -> "example":
        """Attach a machine-readable label noting what the origin of this example
        was. |example.via| is completely optional and does not change runtime
        behavior.

        |example.via| is intended to support self-documenting behavior, as well as
        tooling which might add (or remove) |@example| decorators automatically.
        For example:

        .. code-block:: python

            # Annotating examples is optional and does not change runtime behavior
            @example(...)
            @example(...).via("regression test for issue #42")
            @example(...).via("discovered failure")
            def test(x):
                pass

        .. note::

            `HypoFuzz <https://hypofuzz.com/>`_ uses |example.via| to tag examples
            in the patch of its high-coverage set of explicit inputs, on
            `the patches page <https://hypofuzz.com/example-dashboard/#/patches>`_.
        """
        if not isinstance(whence, str):
            raise InvalidArgument(".via() must be passed a string")
        # This is deliberately a no-op at runtime; the tools operate on source code.
        return self


def seed(seed: Hashable) -> Callable[[TestFunc], TestFunc]:
    """
    Seed the randomness for this test.

    ``seed`` may be any hashable object. No exact meaning for ``seed`` is provided
    other than that for a fixed seed value Hypothesis will produce the same
    examples (assuming that there are no other sources of nondeterminism, such
    as timing, hash randomization, or external state).

    For example, the following test function and |RuleBasedStateMachine| will
    each generate the same series of examples each time they are executed:

    .. code-block:: python

        @seed(1234)
        @given(st.integers())
        def test(n): ...

        @seed(6789)
        class MyMachine(RuleBasedStateMachine): ...

    If using pytest, you can alternatively pass ``--hypothesis-seed`` on the
    command line.

    Setting a seed overrides |settings.derandomize|, which is designed to enable
    deterministic CI tests rather than reproducing observed failures.

    Hypothesis will only print the seed which would reproduce a failure if a test
    fails in an unexpected way, for instance inside Hypothesis internals.
    """

    def accept(test):
        test._hypothesis_internal_use_seed = seed
        current_settings = getattr(test, "_hypothesis_internal_use_settings", None)
        test._hypothesis_internal_use_settings = Settings(
            current_settings, database=None
        )
        return test

    return accept


# TODO_DOCS: link to /explanation/choice-sequence


def reproduce_failure(version: str, blob: bytes) -> Callable[[TestFunc], TestFunc]:
    """
    Run the example corresponding to the binary ``blob`` in order to reproduce a
    failure. ``blob`` is a serialized version of the internal input representation
    of Hypothesis.

    A test decorated with |@reproduce_failure| always runs exactly one example,
    which is expected to cause a failure. If the provided ``blob`` does not
    cause a failure, Hypothesis will raise |DidNotReproduce|.

    Hypothesis will print an |@reproduce_failure| decorator if
    |settings.print_blob| is ``True`` (which is the default in CI).

    |@reproduce_failure| is intended to be temporarily added to your test suite in
    order to reproduce a failure. It is not intended to be a permanent addition to
    your test suite. Because of this, no compatibility guarantees are made across
    Hypothesis versions, and |@reproduce_failure| will error if used on a different
    Hypothesis version than it was created for.

    .. seealso::

        See also the :doc:`/tutorial/replaying-failures` tutorial.
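
    A typical (temporary) use looks like the sketch below; the version string
    and blob are placeholders rather than a real failing example:

    .. code-block:: python

        @reproduce_failure("6.135.0", b"AXicY2CAAAAABQAB")  # copied from the report
        @given(st.integers())
        def test(n): ...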
403 """
405 def accept(test):
406 test._hypothesis_internal_use_reproduce_failure = (version, blob)
407 return test
409 return accept


def reproduction_decorator(choices: Iterable[ChoiceT]) -> str:
    return f"@reproduce_failure({__version__!r}, {encode_failure(choices)!r})"


def encode_failure(choices: Iterable[ChoiceT]) -> bytes:
    blob = choices_to_bytes(choices)
    compressed = zlib.compress(blob)
    if len(compressed) < len(blob):
        blob = b"\1" + compressed
    else:
        blob = b"\0" + blob
    return base64.b64encode(blob)


def decode_failure(blob: bytes) -> Sequence[ChoiceT]:
    try:
        decoded = base64.b64decode(blob)
    except Exception:
        raise InvalidArgument(f"Invalid base64 encoded string: {blob!r}") from None

    prefix = decoded[:1]
    if prefix == b"\0":
        decoded = decoded[1:]
    elif prefix == b"\1":
        try:
            decoded = zlib.decompress(decoded[1:])
        except zlib.error as err:
            raise InvalidArgument(
                f"Invalid zlib compression for blob {blob!r}"
            ) from err
    else:
        raise InvalidArgument(
            f"Could not decode blob {blob!r}: Invalid start byte {prefix!r}"
        )

    choices = choices_from_bytes(decoded)
    if choices is None:
        raise InvalidArgument(f"Invalid serialized choice sequence for blob {blob!r}")

    return choices
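
# A minimal round-trip sketch of the framing above (values illustrative): the
# leading byte records whether zlib compression paid off (b"\1") or the raw
# choice bytes were kept (b"\0"), and decode_failure() reverses either path.
#
#     blob = encode_failure([0, b"abc"])
#     assert list(decode_failure(blob)) == [0, b"abc"]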


def _invalid(message, *, exc=InvalidArgument, test, given_kwargs):
    @impersonate(test)
    def wrapped_test(*arguments, **kwargs):  # pragma: no cover # coverage limitation
        raise exc(message)

    wrapped_test.is_hypothesis_test = True
    wrapped_test.hypothesis = HypothesisHandle(
        inner_test=test,
        _get_fuzz_target=wrapped_test,
        _given_kwargs=given_kwargs,
    )
    return wrapped_test


def is_invalid_test(test, original_sig, given_arguments, given_kwargs):
    """Check the arguments to ``@given`` for basic usage constraints.

    Most errors are not raised immediately; instead we return a dummy test
    function that will raise the appropriate error if it is actually called.
    When the user runs a subset of tests (e.g. via ``pytest -k``), errors will
    only be reported for tests that actually ran.
    """
    invalid = partial(_invalid, test=test, given_kwargs=given_kwargs)

    if not (given_arguments or given_kwargs):
        return invalid("given must be called with at least one argument")

    params = list(original_sig.parameters.values())
    pos_params = [p for p in params if p.kind is p.POSITIONAL_OR_KEYWORD]
    kwonly_params = [p for p in params if p.kind is p.KEYWORD_ONLY]
    if given_arguments and params != pos_params:
        return invalid(
            "positional arguments to @given are not supported with varargs, "
            "varkeywords, positional-only, or keyword-only arguments"
        )

    if len(given_arguments) > len(pos_params):
        return invalid(
            f"Too many positional arguments for {test.__name__}() were passed to "
            f"@given - expected at most {len(pos_params)} "
            f"arguments, but got {len(given_arguments)} {given_arguments!r}"
        )

    if ... in given_arguments:
        return invalid(
            "... was passed as a positional argument to @given, but may only be "
            "passed as a keyword argument or as the sole argument of @given"
        )

    if given_arguments and given_kwargs:
        return invalid("cannot mix positional and keyword arguments to @given")
    extra_kwargs = [
        k for k in given_kwargs if k not in {p.name for p in pos_params + kwonly_params}
    ]
    if extra_kwargs and (params == [] or params[-1].kind is not params[-1].VAR_KEYWORD):
        arg = extra_kwargs[0]
        extra = ""
        if arg in all_settings:
            extra = f". Did you mean @settings({arg}={given_kwargs[arg]!r})?"
        return invalid(
            f"{test.__name__}() got an unexpected keyword argument {arg!r}, "
            f"from `{arg}={given_kwargs[arg]!r}` in @given{extra}"
        )
    if any(p.default is not p.empty for p in params):
        return invalid("Cannot apply @given to a function with defaults.")

    # This case would raise Unsatisfiable *anyway*, but by detecting it here we can
    # provide a much more helpful error message for people e.g. using the Ghostwriter.
    empty = [
        f"{s!r} (arg {idx})" for idx, s in enumerate(given_arguments) if s is NOTHING
    ] + [f"{name}={s!r}" for name, s in given_kwargs.items() if s is NOTHING]
    if empty:
        strats = "strategies" if len(empty) > 1 else "strategy"
        return invalid(
            f"Cannot generate examples from empty {strats}: " + ", ".join(empty),
            exc=Unsatisfiable,
        )


def execute_explicit_examples(state, wrapped_test, arguments, kwargs, original_sig):
    assert isinstance(state, StateForActualGivenExecution)
    posargs = [
        p.name
        for p in original_sig.parameters.values()
        if p.kind is p.POSITIONAL_OR_KEYWORD
    ]

    for example in reversed(getattr(wrapped_test, "hypothesis_explicit_examples", ())):
        assert isinstance(example, Example)
        # All of this validation is to check that @example() got "the same" arguments
        # as @given, i.e. corresponding to the same parameters, even though they might
        # be any mixture of positional and keyword arguments.
        if example.args:
            assert not example.kwargs
            if any(
                p.kind is p.POSITIONAL_ONLY for p in original_sig.parameters.values()
            ):
                raise InvalidArgument(
                    "Cannot pass positional arguments to @example() when decorating "
                    "a test function which has positional-only parameters."
                )
            if len(example.args) > len(posargs):
                raise InvalidArgument(
                    "example has too many arguments for test. Expected at most "
                    f"{len(posargs)} but got {len(example.args)}"
                )
            example_kwargs = dict(zip(posargs[-len(example.args) :], example.args))
        else:
            example_kwargs = dict(example.kwargs)
        given_kws = ", ".join(
            repr(k) for k in sorted(wrapped_test.hypothesis._given_kwargs)
        )
        example_kws = ", ".join(repr(k) for k in sorted(example_kwargs))
        if given_kws != example_kws:
            raise InvalidArgument(
                f"Inconsistent args: @given() got strategies for {given_kws}, "
                f"but @example() got arguments for {example_kws}"
            ) from None

        # This is certainly true because the example_kwargs exactly match the params
        # reserved by @given(), which are then removed from the function signature.
        assert set(example_kwargs).isdisjoint(kwargs)
        example_kwargs.update(kwargs)

        if Phase.explicit not in state.settings.phases:
            continue

        with local_settings(state.settings):
            fragments_reported = []
            empty_data = ConjectureData.for_choices([])
            try:
                execute_example = partial(
                    state.execute_once,
                    empty_data,
                    is_final=True,
                    print_example=True,
                    example_kwargs=example_kwargs,
                )
                with with_reporter(fragments_reported.append):
                    if example.raises is None:
                        execute_example()
                    else:
                        # @example(...).xfail(...)
                        bits = ", ".join(nicerepr(x) for x in arguments) + ", ".join(
                            f"{k}={nicerepr(v)}" for k, v in example_kwargs.items()
                        )
                        try:
                            execute_example()
                        except failure_exceptions_to_catch() as err:
                            if not isinstance(err, example.raises):
                                raise
                            # Save a string form of this example; we'll warn if it's
                            # ever generated by the strategy (which can't be xfailed)
                            state.xfail_example_reprs.add(
                                repr_call(state.test, arguments, example_kwargs)
                            )
                        except example.raises as err:
                            # We'd usually check this as early as possible, but it's
                            # possible for failure_exceptions_to_catch() to grow when
                            # e.g. pytest is imported between import- and test-time.
                            raise InvalidArgument(
                                f"@example({bits}) raised an expected {err!r}, "
                                "but Hypothesis does not treat this as a test failure"
                            ) from err
                        else:
                            # Unexpectedly passing; always raise an error in this case.
                            reason = f" because {example.reason}" * bool(example.reason)
                            if example.raises is BaseException:
                                name = "exception"  # special-case no raises= arg
                            elif not isinstance(example.raises, tuple):
                                name = example.raises.__name__
                            elif len(example.raises) == 1:
                                name = example.raises[0].__name__
                            else:
                                name = (
                                    ", ".join(ex.__name__ for ex in example.raises[:-1])
                                    + f", or {example.raises[-1].__name__}"
                                )
                            vowel = name.upper()[0] in "AEIOU"
                            raise AssertionError(
                                f"Expected a{'n' * vowel} {name} from @example({bits})"
                                f"{reason}, but no exception was raised."
                            )
            except UnsatisfiedAssumption:
                # Odd though it seems, we deliberately support explicit examples that
                # are then rejected by a call to `assume()`. As well as iterative
                # development, this is rather useful to replay Hypothesis' part of
                # a saved failure when other arguments are supplied by e.g. pytest.
                # See https://github.com/HypothesisWorks/hypothesis/issues/2125
                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
            except BaseException as err:
                # In order to support reporting of multiple failing examples, we yield
                # each of the (report text, error) pairs we find back to the top-level
                # runner. This also ensures that user-facing stack traces have as few
                # frames of Hypothesis internals as possible.
                err = err.with_traceback(get_trimmed_traceback())

                # One user error - whether misunderstanding or typo - we've seen a few
                # times is to pass strategies to @example() where values are expected.
                # Checking is easy, and false-positives not much of a problem, so:
                if isinstance(err, failure_exceptions_to_catch()) and any(
                    isinstance(arg, SearchStrategy)
                    for arg in example.args + tuple(example.kwargs.values())
                ):
                    new = HypothesisWarning(
                        "The @example() decorator expects to be passed values, but "
                        "you passed strategies instead. See https://hypothesis."
                        "readthedocs.io/en/latest/reference/api.html#hypothesis"
                        ".example for details."
                    )
                    new.__cause__ = err
                    err = new

                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
                yield (fragments_reported, err)
                if (
                    state.settings.report_multiple_bugs
                    and pytest_shows_exceptiongroups
                    and isinstance(err, failure_exceptions_to_catch())
                    and not isinstance(err, skip_exceptions_to_reraise())
                ):
                    continue
                break
            finally:
                if fragments_reported:
                    assert fragments_reported[0].startswith("Falsifying example")
                    fragments_reported[0] = fragments_reported[0].replace(
                        "Falsifying example", "Falsifying explicit example", 1
                    )

                empty_data.freeze()
                if observability_enabled():
                    tc = make_testcase(
                        run_start=state._start_timestamp,
                        property=state.test_identifier,
                        data=empty_data,
                        how_generated="explicit example",
                        representation=state._string_repr,
                        timing=state._timing_features,
                    )
                    deliver_observation(tc)

            if fragments_reported:
                verbose_report(fragments_reported[0].replace("Falsifying", "Trying", 1))
                for f in fragments_reported[1:]:
                    verbose_report(f)


def get_random_for_wrapped_test(test, wrapped_test):
    settings = wrapped_test._hypothesis_internal_use_settings
    wrapped_test._hypothesis_internal_use_generated_seed = None

    if wrapped_test._hypothesis_internal_use_seed is not None:
        return Random(wrapped_test._hypothesis_internal_use_seed)
    elif settings.derandomize:
        return Random(int_from_bytes(function_digest(test)))
    elif global_force_seed is not None:
        return Random(global_force_seed)
    else:
        if threadlocal._hypothesis_global_random is None:  # pragma: no cover
            threadlocal._hypothesis_global_random = Random()
        seed = threadlocal._hypothesis_global_random.getrandbits(128)
        wrapped_test._hypothesis_internal_use_generated_seed = seed
        return Random(seed)


@dataclass
class Stuff:
    selfy: Any
    args: tuple
    kwargs: dict
    given_kwargs: dict


def process_arguments_to_given(
    wrapped_test: Any,
    arguments: Sequence[object],
    kwargs: dict[str, object],
    given_kwargs: dict[str, SearchStrategy],
    params: dict[str, Parameter],
) -> tuple[Sequence[object], dict[str, object], Stuff]:
    selfy = None
    arguments, kwargs = convert_positional_arguments(wrapped_test, arguments, kwargs)

    # If the test function is a method of some kind, the bound object
    # will be the first named argument if there are any, otherwise the
    # first vararg (if any).
    posargs = [p.name for p in params.values() if p.kind is p.POSITIONAL_OR_KEYWORD]
    if posargs:
        selfy = kwargs.get(posargs[0])
    elif arguments:
        selfy = arguments[0]

    # Ensure that we don't mistake mocks for self here.
    # This can cause the mock to be used as the test runner.
    if is_mock(selfy):
        selfy = None

    arguments = tuple(arguments)

    with ensure_free_stackframes():
        for k, s in given_kwargs.items():
            check_strategy(s, name=k)
            s.validate()

    stuff = Stuff(selfy=selfy, args=arguments, kwargs=kwargs, given_kwargs=given_kwargs)

    return arguments, kwargs, stuff


def skip_exceptions_to_reraise():
    """Return a tuple of exceptions meaning 'skip this test', to re-raise.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request adding
    it to this function and to tests/cover/test_lazy_import.py
    """
    # This is a set because nose may simply re-export unittest.SkipTest
    exceptions = set()
    # We use this sys.modules trick to avoid importing libraries -
    # you can't be an instance of a type from an unimported module!
    # This is fast enough that we don't need to cache the result,
    # and more importantly it avoids possible side-effects :-)
    if "unittest" in sys.modules:
        exceptions.add(sys.modules["unittest"].SkipTest)
    if "unittest2" in sys.modules:
        exceptions.add(sys.modules["unittest2"].SkipTest)
    if "nose" in sys.modules:
        exceptions.add(sys.modules["nose"].SkipTest)
    if "_pytest.outcomes" in sys.modules:
        exceptions.add(sys.modules["_pytest.outcomes"].Skipped)
    return tuple(sorted(exceptions, key=str))


def failure_exceptions_to_catch() -> tuple[type[BaseException], ...]:
    """Return a tuple of exceptions meaning 'this test has failed', to catch.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request.
    """
    # While SystemExit and GeneratorExit are instances of BaseException, we also
    # expect them to be deterministic - unlike KeyboardInterrupt - and so we treat
    # them as standard exceptions, check for flakiness, etc.
    # See https://github.com/HypothesisWorks/hypothesis/issues/2223 for details.
    exceptions = [Exception, SystemExit, GeneratorExit]
    if "_pytest.outcomes" in sys.modules:
        exceptions.append(sys.modules["_pytest.outcomes"].Failed)
    return tuple(exceptions)


def new_given_signature(original_sig, given_kwargs):
    """Make an updated signature for the wrapped test."""
    return original_sig.replace(
        parameters=[
            p
            for p in original_sig.parameters.values()
            if not (
                p.name in given_kwargs
                and p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
            )
        ],
        return_annotation=None,
    )
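
# For instance, given `def test(self, x, y): ...` and @given(y=...), the
# wrapped test's visible signature becomes `(self, x)`: the y parameter is
# dropped because Hypothesis will supply it.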


def default_executor(data, function):
    return function(data)


def get_executor(runner):
    try:
        execute_example = runner.execute_example
    except AttributeError:
        pass
    else:
        return lambda data, function: execute_example(partial(function, data))

    if hasattr(runner, "setup_example") or hasattr(runner, "teardown_example"):
        setup = getattr(runner, "setup_example", None) or (lambda: None)
        teardown = getattr(runner, "teardown_example", None) or (lambda ex: None)

        def execute(data, function):
            token = None
            try:
                token = setup()
                return function(data)
            finally:
                teardown(token)

        return execute

    return default_executor
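
# A sketch of the hooks resolved above, assuming the test is a method on a
# runner class (setup_example/teardown_example are the real hook names; the
# class and the open_transaction() helper are illustrative):
#
#     class MyTestCase(unittest.TestCase):
#         def setup_example(self):
#             return open_transaction()  # hypothetical per-example setup
#
#         def teardown_example(self, token):
#             token.rollback()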


# This function is a crude solution; a better way of resolving it would
# probably be to rewrite a bunch of exception handlers to use except*.
T = TypeVar("T", bound=BaseException)


def _flatten_group(excgroup: BaseExceptionGroup[T]) -> list[T]:
    found_exceptions: list[T] = []
    for exc in excgroup.exceptions:
        if isinstance(exc, BaseExceptionGroup):
            found_exceptions.extend(_flatten_group(exc))
        else:
            found_exceptions.append(exc)
    return found_exceptions
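
# For instance (illustrative values), nested groups flatten depth-first:
#
#     group = BaseExceptionGroup(
#         "g", [ValueError("a"), BaseExceptionGroup("h", [KeyError("b")])]
#     )
#     _flatten_group(group)  # -> [ValueError("a"), KeyError("b")]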


@contextlib.contextmanager
def unwrap_markers_from_group() -> Generator[None, None, None]:
    try:
        yield
    except BaseExceptionGroup as excgroup:
        frozen_exceptions, non_frozen_exceptions = excgroup.split(Frozen)

        # group only contains Frozen, reraise the group
        # it doesn't matter what we raise, since any exceptions get disregarded
        # and reraised as StopTest if data got frozen.
        if non_frozen_exceptions is None:
            raise
        # in all other cases they are discarded

        # Can RewindRecursive end up in this group?
        _, user_exceptions = non_frozen_exceptions.split(
            lambda e: isinstance(e, (StopTest, HypothesisException))
        )

        # this might contain marker exceptions, or internal errors, but not frozen.
        if user_exceptions is not None:
            raise

        # single marker exception - reraise it
        flattened_non_frozen_exceptions: list[BaseException] = _flatten_group(
            non_frozen_exceptions
        )
        if len(flattened_non_frozen_exceptions) == 1:
            e = flattened_non_frozen_exceptions[0]
            # preserve the cause of the original exception to not hinder debugging
            # note that __context__ is still lost though
            raise e from e.__cause__

        # multiple marker exceptions. If we re-raise the whole group we break
        # a bunch of logic so ....?
        stoptests, non_stoptests = non_frozen_exceptions.split(StopTest)

        # TODO: stoptest+hypothesisexception ...? Is it possible? If so, what do?

        if non_stoptests:
            # TODO: multiple marker exceptions is easy to produce, but the logic in the
            # engine does not handle it... so we just reraise the first one for now.
            e = _flatten_group(non_stoptests)[0]
            raise e from e.__cause__
        assert stoptests is not None

        # multiple stoptests: raising the one with the lowest testcounter
        raise min(_flatten_group(stoptests), key=lambda s_e: s_e.testcounter)


class StateForActualGivenExecution:
    def __init__(
        self, stuff, test, settings, random, wrapped_test, *, thread_overlap=None
    ):
        self.stuff = stuff
        self.test = test
        self.settings = settings
        self.random = random
        self.wrapped_test = wrapped_test
        self.thread_overlap = {} if thread_overlap is None else thread_overlap

        self.test_runner = get_executor(stuff.selfy)
        self.is_find = getattr(wrapped_test, "_hypothesis_internal_is_find", False)
        self.print_given_args = getattr(
            wrapped_test, "_hypothesis_internal_print_given_args", True
        )

        self.last_exception = None
        self.falsifying_examples = ()
        self.ever_executed = False
        self.xfail_example_reprs = set()
        self.files_to_propagate = set()
        self.failed_normally = False
        self.failed_due_to_deadline = False

        self.explain_traces = defaultdict(set)
        self._start_timestamp = time.time()
        self._string_repr = ""
        self._timing_features = {}

    @property
    def test_identifier(self):
        return getattr(
            current_pytest_item.value, "nodeid", None
        ) or get_pretty_function_description(self.wrapped_test)

    def _should_trace(self):
        # NOTE: we explicitly support monkeypatching this. Keep the namespace
        # access intact.
        _trace_obs = (
            observability_enabled() and observability.OBSERVABILITY_COLLECT_COVERAGE
        )
        _trace_failure = (
            self.failed_normally
            and not self.failed_due_to_deadline
            and {Phase.shrink, Phase.explain}.issubset(self.settings.phases)
        )
        return _trace_obs or _trace_failure

    def execute_once(
        self,
        data,
        *,
        print_example=False,
        is_final=False,
        expected_failure=None,
        example_kwargs=None,
    ):
        """Run the test function once, using ``data`` as input.

        If the test raises an exception, it will propagate through to the
        caller of this method. Depending on its type, this could represent
        an ordinary test failure, or a fatal error, or a control exception.

        If this method returns normally, the test might have passed, or
        it might have placed ``data`` in an unsuccessful state and then
        swallowed the corresponding control exception.
        """

        self.ever_executed = True
        data.is_find = self.is_find

        self._string_repr = ""
        text_repr = None
        if self.settings.deadline is None and not observability_enabled():

            @proxies(self.test)
            def test(*args, **kwargs):
                with unwrap_markers_from_group():
                    # NOTE: For compatibility with Python 3.9's LL(1)
                    # parser, this is written as a nested with-statement,
                    # instead of a compound one.
                    with ensure_free_stackframes():
                        return self.test(*args, **kwargs)

        else:

            @proxies(self.test)
            def test(*args, **kwargs):
                arg_drawtime = math.fsum(data.draw_times.values())
                arg_stateful = math.fsum(data._stateful_run_times.values())
                arg_gctime = gc_cumulative_time()
                start = time.perf_counter()
                try:
                    with unwrap_markers_from_group():
                        # NOTE: For compatibility with Python 3.9's LL(1)
                        # parser, this is written as a nested with-statement,
                        # instead of a compound one.
                        with ensure_free_stackframes():
                            result = self.test(*args, **kwargs)
                finally:
                    finish = time.perf_counter()
                    in_drawtime = math.fsum(data.draw_times.values()) - arg_drawtime
                    in_stateful = (
                        math.fsum(data._stateful_run_times.values()) - arg_stateful
                    )
                    in_gctime = gc_cumulative_time() - arg_gctime
                    runtime = finish - start - in_drawtime - in_stateful - in_gctime
                    self._timing_features = {
                        "execute:test": runtime,
                        "overall:gc": in_gctime,
                        **data.draw_times,
                        **data._stateful_run_times,
                    }

                if (
                    (current_deadline := self.settings.deadline) is not None
                    # we disable the deadline check under concurrent threads, since
                    # cpython may switch away from a thread for arbitrarily long.
                    and not self.thread_overlap.get(threading.get_ident(), False)
                ):
                    if not is_final:
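                        # Allow a grace margin on non-final runs: with timedelta
                        # floor-division, (d // 4) * 5 is 1.25x the deadline.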
                        current_deadline = (current_deadline // 4) * 5
                    if runtime >= current_deadline.total_seconds():
                        raise DeadlineExceeded(
                            datetime.timedelta(seconds=runtime), self.settings.deadline
                        )
                return result

        def run(data: ConjectureData) -> None:
            # Set up dynamic context needed by a single test run.
            if self.stuff.selfy is not None:
                data.hypothesis_runner = self.stuff.selfy
            # Generate all arguments to the test function.
            args = self.stuff.args
            kwargs = dict(self.stuff.kwargs)
            if example_kwargs is None:
                kw, argslices = context.prep_args_kwargs_from_strategies(
                    self.stuff.given_kwargs
                )
            else:
                kw = example_kwargs
                argslices = {}
            kwargs.update(kw)
            if expected_failure is not None:
                nonlocal text_repr
                text_repr = repr_call(test, args, kwargs)

            if print_example or current_verbosity() >= Verbosity.verbose:
                printer = RepresentationPrinter(context=context)
                if print_example:
                    printer.text("Falsifying example:")
                else:
                    printer.text("Trying example:")

                if self.print_given_args:
                    printer.text(" ")
                    printer.repr_call(
                        test.__name__,
                        args,
                        kwargs,
                        force_split=True,
                        arg_slices=argslices,
                        leading_comment=(
                            "# " + context.data.slice_comments[(0, 0)]
                            if (0, 0) in context.data.slice_comments
                            else None
                        ),
                        avoid_realization=data.provider.avoid_realization,
                    )
                report(printer.getvalue())

            if observability_enabled():
                printer = RepresentationPrinter(context=context)
                printer.repr_call(
                    test.__name__,
                    args,
                    kwargs,
                    force_split=True,
                    arg_slices=argslices,
                    leading_comment=(
                        "# " + context.data.slice_comments[(0, 0)]
                        if (0, 0) in context.data.slice_comments
                        else None
                    ),
                    avoid_realization=data.provider.avoid_realization,
                )
                self._string_repr = printer.getvalue()

            try:
                return test(*args, **kwargs)
            except TypeError as e:
                # If we sampled from a sequence of strategies, AND failed with a
                # TypeError, *AND that exception mentions SearchStrategy*, add a note:
                if (
                    "SearchStrategy" in str(e)
                    and data._sampled_from_all_strategies_elements_message is not None
                ):
                    msg, format_arg = data._sampled_from_all_strategies_elements_message
                    add_note(e, msg.format(format_arg))
                raise
            finally:
                if data._stateful_repr_parts is not None:
                    self._string_repr = "\n".join(data._stateful_repr_parts)

                if observability_enabled():
                    printer = RepresentationPrinter(context=context)
                    for name, value in data._observability_args.items():
                        if name.startswith("generate:Draw "):
                            try:
                                value = data.provider.realize(value)
                            except BackendCannotProceed:  # pragma: no cover
                                value = "<backend failed to realize symbolic>"
                            printer.text(f"\n{name.removeprefix('generate:')}: ")
                            printer.pretty(value)

                    self._string_repr += printer.getvalue()

        # self.test_runner can include the execute_example method, or setup/teardown
        # _example, so it's important to get the PRNG and build context in place first.
        #
        # NOTE: For compatibility with Python 3.9's LL(1) parser, this is written as
        # three nested with-statements, instead of one compound statement.
        with local_settings(self.settings):
            with deterministic_PRNG():
                with BuildContext(
                    data, is_final=is_final, wrapped_test=self.wrapped_test
                ) as context:
                    # providers may throw in per_case_context_fn, and we'd like
                    # `result` to still be set in these cases.
                    result = None
                    with data.provider.per_test_case_context_manager():
                        # Run the test function once, via the executor hook.
                        # In most cases this will delegate straight to `run(data)`.
                        result = self.test_runner(data, run)

        # If a failure was expected, it should have been raised already, so
        # instead raise an appropriate diagnostic error.
        if expected_failure is not None:
            exception, traceback = expected_failure
            if isinstance(exception, DeadlineExceeded) and (
                runtime_secs := math.fsum(
                    v
                    for k, v in self._timing_features.items()
                    if k.startswith("execute:")
                )
            ):
                report(
                    "Unreliable test timings! On an initial run, this "
                    f"test took {exception.runtime.total_seconds() * 1000:.2f}ms, "
                    "which exceeded the deadline of "
                    f"{self.settings.deadline.total_seconds() * 1000:.2f}ms, but "
                    f"on a subsequent run it took {runtime_secs * 1000:.2f} ms, "
                    "which did not. If you expect this sort of "
                    "variability in your test timings, consider turning "
                    "deadlines off for this test by setting deadline=None."
                )
            else:
                report("Failed to reproduce exception. Expected: \n" + traceback)
            raise FlakyFailure(
                f"Hypothesis {text_repr} produces unreliable results: "
                "Falsified on the first call but did not on a subsequent one",
                [exception],
            )
        return result

    def _flaky_replay_to_failure(
        self, err: FlakyReplay, context: BaseException
    ) -> FlakyFailure:
        # Note that in the mark_interesting case, _context_ itself
        # is part of err._interesting_examples - but it's not in
        # _runner.interesting_examples - this is fine, as the context
        # (i.e., immediate exception) is appended.
        interesting_examples = [
            self._runner.interesting_examples[origin]
            for origin in err._interesting_origins
            if origin in self._runner.interesting_examples
        ]
        exceptions = [result.expected_exception for result in interesting_examples]
        exceptions.append(context)  # the immediate exception
        return FlakyFailure(err.reason, exceptions)

    def _execute_once_for_engine(self, data: ConjectureData) -> None:
        """Wrapper around ``execute_once`` that intercepts test failure
        exceptions and single-test control exceptions, and turns them into
        appropriate method calls to `data` instead.

        This allows the engine to assume that any exception other than
        ``StopTest`` must be a fatal error, and should stop the entire engine.
        """
        trace: Trace = set()
        try:
            with Tracer(should_trace=self._should_trace()) as tracer:
                try:
                    result = self.execute_once(data)
                    if (
                        data.status == Status.VALID and tracer.branches
                    ):  # pragma: no cover
                        # This is in fact covered by our *non-coverage* tests, but due
                        # to the settrace() contention *not* by our coverage tests.
                        self.explain_traces[None].add(frozenset(tracer.branches))
                finally:
                    trace = tracer.branches
            if result is not None:
                fail_health_check(
                    self.settings,
                    "Tests run under @given should return None, but "
                    f"{self.test.__name__} returned {result!r} instead.",
                    HealthCheck.return_value,
                )
        except UnsatisfiedAssumption as e:
            # An "assume" check failed, so instead we inform the engine that
            # this test run was invalid.
            try:
                data.mark_invalid(e.reason)
            except FlakyReplay as err:
                # This was unexpected, meaning that the assume was flaky.
                # Report it as such.
                raise self._flaky_replay_to_failure(err, e) from None
        except (StopTest, BackendCannotProceed):
            # The engine knows how to handle this control exception, so it's
            # OK to re-raise it.
            raise
        except (
            FailedHealthCheck,
            *skip_exceptions_to_reraise(),
        ):
            # These are fatal errors or control exceptions that should stop the
            # engine, so we re-raise them.
            raise
        except failure_exceptions_to_catch() as e:
            # If an unhandled (i.e., non-Hypothesis) error was raised by
            # Hypothesis-internal code, re-raise it as a fatal error instead
            # of treating it as a test failure.
            if isinstance(e, BaseExceptionGroup) and len(e.exceptions) == 1:
                # When a naked exception is implicitly wrapped in an ExceptionGroup
                # due to a re-raising "except*", the ExceptionGroup is constructed in
                # the caller's stack frame (see #4183). This workaround is specifically
                # for implicit wrapping of naked exceptions by "except*", since explicit
                # raising of ExceptionGroup gets the proper traceback in the first place
                # - there's no need to handle hierarchical groups here, at least if no
                # such implicit wrapping happens inside hypothesis code (we only care
                # about the hypothesis-or-not distinction).
                #
                # 01-25-2025: this was patched to give the correct
                # stacktrace in cpython https://github.com/python/cpython/issues/128799.
                # can remove once python3.11 is EOL.
                tb = e.exceptions[0].__traceback__ or e.__traceback__
            else:
                tb = e.__traceback__
            filepath = traceback.extract_tb(tb)[-1][0]
            if (
                is_hypothesis_file(filepath)
                and not isinstance(e, HypothesisException)
                # We expect backend authors to use the provider_conformance test
                # to test their backends. If an error occurs there, it is probably
                # from their backend, and we would like to treat it as a standard
                # error, not a hypothesis-internal error.
                and not filepath.endswith(
                    f"internal{os.sep}conjecture{os.sep}provider_conformance.py"
                )
            ):
                raise

            if data.frozen:
                # This can happen if an error occurred in a finally
                # block somewhere, suppressing our original StopTest.
                # We raise a new one here to resume normal operation.
                raise StopTest(data.testcounter) from e
            else:
                # The test failed by raising an exception, so we inform the
                # engine that this test run was interesting. This is the normal
                # path for test runs that fail.
                tb = get_trimmed_traceback()
                data.expected_traceback = format_exception(e, tb)
                data.expected_exception = e
                assert data.expected_traceback is not None  # for mypy
                verbose_report(data.expected_traceback)

                self.failed_normally = True

                interesting_origin = InterestingOrigin.from_exception(e)
                if trace:  # pragma: no cover
                    # Trace collection is explicitly disabled under coverage.
                    self.explain_traces[interesting_origin].add(frozenset(trace))
                if interesting_origin.exc_type == DeadlineExceeded:
                    self.failed_due_to_deadline = True
                    self.explain_traces.clear()
                try:
                    data.mark_interesting(interesting_origin)
                except FlakyReplay as err:
                    raise self._flaky_replay_to_failure(err, e) from None

        finally:
            # Conditional here so we can save some time constructing the payload; in
            # other cases (without coverage) it's cheap enough to do that regardless.
            if observability_enabled():
                if runner := getattr(self, "_runner", None):
                    phase = runner._current_phase
                else:  # pragma: no cover # in case of messing with internals
                    if self.failed_normally or self.failed_due_to_deadline:
                        phase = "shrink"
                    else:
                        phase = "unknown"
                backend_desc = f", using backend={self.settings.backend!r}" * (
                    self.settings.backend != "hypothesis"
                    and not getattr(runner, "_switch_to_hypothesis_provider", False)
                )
                try:
                    data._observability_args = data.provider.realize(
                        data._observability_args
                    )
                except BackendCannotProceed:
                    data._observability_args = {}

                try:
                    self._string_repr = data.provider.realize(self._string_repr)
                except BackendCannotProceed:
                    self._string_repr = "<backend failed to realize symbolic arguments>"

                try:
                    data.events = data.provider.realize(data.events)
                except BackendCannotProceed:
                    data.events = {}

                data.freeze()
                tc = make_testcase(
                    run_start=self._start_timestamp,
                    property=self.test_identifier,
                    data=data,
                    how_generated=f"during {phase} phase{backend_desc}",
                    representation=self._string_repr,
                    arguments=data._observability_args,
                    timing=self._timing_features,
                    coverage=tractable_coverage_report(trace) or None,
                    phase=phase,
                    backend_metadata=data.provider.observe_test_case(),
                )
                deliver_observation(tc)

                for msg in data.provider.observe_information_messages(
                    lifetime="test_case"
                ):
                    self._deliver_information_message(**msg)
            self._timing_features = {}

    def _deliver_information_message(
        self, *, type: InfoObservationType, title: str, content: Union[str, dict]
    ) -> None:
        deliver_observation(
            InfoObservation(
                type=type,
                run_start=self._start_timestamp,
                property=self.test_identifier,
                title=title,
                content=content,
            )
        )

    def run_engine(self):
        """Run the test function many times, on database input and generated
        input, using the Conjecture engine.
        """
        # Tell pytest to omit the body of this function from tracebacks
        __tracebackhide__ = True
        try:
            database_key = self.wrapped_test._hypothesis_internal_database_key
        except AttributeError:
            if global_force_seed is None:
                database_key = function_digest(self.test)
            else:
                database_key = None

        runner = self._runner = ConjectureRunner(
            self._execute_once_for_engine,
            settings=self.settings,
            random=self.random,
            database_key=database_key,
            thread_overlap=self.thread_overlap,
        )
        # Use the Conjecture engine to run the test function many times
        # on different inputs.
        runner.run()
        note_statistics(runner.statistics)
        if observability_enabled():
            self._deliver_information_message(
                type="info",
                title="Hypothesis Statistics",
                content=describe_statistics(runner.statistics),
            )
            for msg in (
                p if isinstance(p := runner.provider, PrimitiveProvider) else p(None)
            ).observe_information_messages(lifetime="test_function"):
                self._deliver_information_message(**msg)

        if runner.call_count == 0:
            return
        if runner.interesting_examples:
            self.falsifying_examples = sorted(
                runner.interesting_examples.values(),
                key=lambda d: sort_key(d.nodes),
                reverse=True,
            )
        else:
            if runner.valid_examples == 0:
                explanations = []
                # Use a somewhat arbitrary cutoff to avoid recommending spurious
                # fixes: e.g. a few invalid examples from internal filters when
                # the problem is the user generating large inputs, or a few
                # overruns during internal mutation when the problem is
                # impossible user filters/assumes.
                if runner.invalid_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.invalid_examples} of {runner.call_count} "
                        "examples failed a .filter() or assume() condition. Try "
                        "making your filters or assumes less strict, or rewrite "
                        "using strategy parameters: "
                        "st.integers().filter(lambda x: x > 0) fails less often "
                        "(that is, never) when rewritten as st.integers(min_value=1)."
                    )
                if runner.overrun_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.overrun_examples} of {runner.call_count} "
                        "examples were too large to finish generating; try "
                        "reducing the typical size of your inputs?"
                    )
                rep = get_pretty_function_description(self.test)
                raise Unsatisfiable(
                    f"Unable to satisfy assumptions of {rep}. "
                    f"{' Also, '.join(explanations)}"
                )

        # If we have not traced executions, warn about that now (but only when
        # we'd expect to do so reliably, i.e. on CPython>=3.12)
        if (
            hasattr(sys, "monitoring")
            and not PYPY
            and self._should_trace()
            and not Tracer.can_trace()
        ):  # pragma: no cover
            # actually covered by our tests, but only on >= 3.12
            warnings.warn(
                "avoiding tracing test function because tool id "
                f"{MONITORING_TOOL_ID} is already taken by tool "
                f"{sys.monitoring.get_tool(MONITORING_TOOL_ID)}.",
                HypothesisWarning,
                stacklevel=3,
            )

        if not self.falsifying_examples:
            return
        elif not (self.settings.report_multiple_bugs and pytest_shows_exceptiongroups):
            # Pretend that we only found one failure, by discarding the others.
            del self.falsifying_examples[:-1]

        # The engine found one or more failures, so we need to reproduce and
        # report them.

        errors_to_report = []

        report_lines = describe_targets(runner.best_observed_targets)
        if report_lines:
            report_lines.append("")

        explanations = explanatory_lines(self.explain_traces, self.settings)
        for falsifying_example in self.falsifying_examples:
            fragments = []

            ran_example = runner.new_conjecture_data(
                falsifying_example.choices, max_choices=len(falsifying_example.choices)
            )
            ran_example.slice_comments = falsifying_example.slice_comments
            tb = None
            origin = None
            assert falsifying_example.expected_exception is not None
            assert falsifying_example.expected_traceback is not None
            try:
                with with_reporter(fragments.append):
                    self.execute_once(
                        ran_example,
                        print_example=not self.is_find,
                        is_final=True,
                        expected_failure=(
                            falsifying_example.expected_exception,
                            falsifying_example.expected_traceback,
                        ),
                    )
            except StopTest as e:
                # Link the expected exception from the first run. Not sure
                # how to access the current exception, if it failed
                # differently on this run. In fact, in the only known
                # reproducer, the StopTest is caused by OVERRUN before the
                # test is even executed. Possibly because all initial examples
                # failed until the final non-traced replay, and something was
                # exhausted? Possibly a FIXME, but sufficiently weird to
                # ignore for now.
                err = FlakyFailure(
                    "Inconsistent results: An example failed on the "
                    "first run but now succeeds (or fails with another "
                    "error, or is for some reason not runnable).",
                    # (note: e is a BaseException)
                    [falsifying_example.expected_exception or e],
                )
                errors_to_report.append((fragments, err))
            except UnsatisfiedAssumption as e:  # pragma: no cover # ironically flaky
                err = FlakyFailure(
                    "Unreliable assumption: An example which satisfied "
                    "assumptions on the first run now fails it.",
                    [e],
                )
                errors_to_report.append((fragments, err))
            except BaseException as e:
                # If we have anything for explain-mode, this is the time to report.
                fragments.extend(explanations[falsifying_example.interesting_origin])
                errors_to_report.append(
                    (fragments, e.with_traceback(get_trimmed_traceback()))
                )
                tb = format_exception(e, get_trimmed_traceback(e))
                origin = InterestingOrigin.from_exception(e)
            else:
                # execute_once() will always raise either the expected error, or Flaky.
                raise NotImplementedError("This should be unreachable")
            finally:
                ran_example.freeze()
                if observability_enabled():
                    # log our observability line for the final failing example
                    tc = make_testcase(
                        run_start=self._start_timestamp,
                        property=self.test_identifier,
                        data=ran_example,
                        how_generated="minimal failing example",
                        representation=self._string_repr,
                        arguments=ran_example._observability_args,
                        timing=self._timing_features,
                        coverage=None,  # Not recorded when we're replaying the MFE
                        status="passed" if sys.exc_info()[0] else "failed",
                        status_reason=str(origin or "unexpected/flaky pass"),
                        metadata={"traceback": tb},
                    )
                    deliver_observation(tc)

                # Whether or not replay actually raised the exception again, we want
                # to print the reproduce_failure decorator for the failing example.
                if self.settings.print_blob:
                    fragments.append(
                        "\nYou can reproduce this example by temporarily adding "
                        f"{reproduction_decorator(falsifying_example.choices)} "
                        "as a decorator on your test case"
                    )

        _raise_to_user(
            errors_to_report,
            self.settings,
            report_lines,
            # A backend might report a failure and then report verified afterwards,
            # which is to be interpreted as "there are no more failures *other
            # than what we already reported*". Do not report this as unsound.
            unsound_backend=(
                runner._verified_by
                if runner._verified_by and not runner._backend_found_failure
                else None
            ),
        )


def _raise_to_user(
    errors_to_report, settings, target_lines, trailer="", *, unsound_backend=None
):
    """Helper function for attaching notes and grouping multiple errors."""
    failing_prefix = "Falsifying example: "
    ls = []
    for fragments, err in errors_to_report:
        for note in fragments:
            add_note(err, note)
            if note.startswith(failing_prefix):
                ls.append(note.removeprefix(failing_prefix))
    if current_pytest_item.value:
        current_pytest_item.value._hypothesis_failing_examples = ls

    if len(errors_to_report) == 1:
        _, the_error_hypothesis_found = errors_to_report[0]
    else:
        assert errors_to_report
        the_error_hypothesis_found = BaseExceptionGroup(
            f"Hypothesis found {len(errors_to_report)} distinct failures{trailer}.",
            [e for _, e in errors_to_report],
        )

    if settings.verbosity >= Verbosity.normal:
        for line in target_lines:
            add_note(the_error_hypothesis_found, line)

    if unsound_backend:
        # Attach this note to the exception we are about to raise, so that it
        # is visible regardless of how many failures were grouped above.
        add_note(
            the_error_hypothesis_found,
            f"backend={unsound_backend!r} claimed to verify this test passes - "
            "please send them a bug report!",
        )

    raise the_error_hypothesis_found


@contextlib.contextmanager
def fake_subTest(self, msg=None, **__):
    """Monkeypatch for `unittest.TestCase.subTest` during `@given`.

    If we don't patch this out, each failing example is reported as a
    separate failing test by the unittest test runner, which is
    obviously incorrect. We therefore replace it for the duration with
    this version.
    """
    warnings.warn(
        "subTest per-example reporting interacts badly with Hypothesis "
        "trying hundreds of examples, so we disable it for the duration of "
        "any test that uses `@given`.",
        HypothesisWarning,
        stacklevel=2,
    )
    yield


@dataclass
class HypothesisHandle:
    """This object is provided as the .hypothesis attribute on @given tests.

    Downstream users can reassign its attributes to insert custom logic into
    the execution of each case, for example by converting an async test
    function into a sync one.

    This must be an attribute of an attribute, because reassignment of a
    first-level attribute would not be visible to Hypothesis if the function
    had been decorated before the assignment.

    See https://github.com/HypothesisWorks/hypothesis/issues/1257 for more
    information.
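
    For example, one could wrap an async inner test so that it runs
    synchronously. This is an illustrative sketch only: it assumes
    ``my_test`` is an ``@given``-decorated async test, and that
    ``asyncio.run`` is an appropriate event-loop runner for it:

    .. code-block:: python

        import asyncio

        inner = my_test.hypothesis.inner_test
        my_test.hypothesis.inner_test = lambda *a, **kw: asyncio.run(inner(*a, **kw))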
1651 """
1653 inner_test: Any
1654 _get_fuzz_target: Any
1655 _given_kwargs: Any

    @property
    def fuzz_one_input(
        self,
    ) -> Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]]:
        """Run the test as a fuzz target, driven with the `buffer` of bytes.

        Returns None if the buffer was invalid for the strategy, the canonical
        pruned bytes if it was valid, and leaves raised exceptions alone.
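
        A minimal usage sketch (here ``test_foo`` stands in for any
        ``@given``-decorated test, and the input bytes are an arbitrary
        example):

        .. code-block:: python

            fuzz = test_foo.hypothesis.fuzz_one_input  # hoist the lookup
            fuzz(bytes(100))  # returns canonical bytes, None, or raises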
1665 """
1666 # Note: most users, if they care about fuzzer performance, will access the
1667 # property and assign it to a local variable to move the attribute lookup
1668 # outside their fuzzing loop / before the fork point. We cache it anyway,
1669 # so that naive or unusual use-cases get the best possible performance too.
1670 try:
1671 return self.__cached_target # type: ignore
1672 except AttributeError:
1673 self.__cached_target = self._get_fuzz_target()
1674 return self.__cached_target


@overload
def given(
    _: EllipsisType, /
) -> Callable[
    [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[[], None]
]:  # pragma: no cover
    ...


@overload
def given(
    *_given_arguments: SearchStrategy[Any],
) -> Callable[
    [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
]:  # pragma: no cover
    ...


@overload
def given(
    **_given_kwargs: Union[SearchStrategy[Any], EllipsisType],
) -> Callable[
    [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
]:  # pragma: no cover
    ...


def given(
    *_given_arguments: Union[SearchStrategy[Any], EllipsisType],
    **_given_kwargs: Union[SearchStrategy[Any], EllipsisType],
) -> Callable[
    [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None]
]:
    """
    The |@given| decorator turns a function into a Hypothesis test. This is the
    main entry point to Hypothesis.

    .. seealso::

        See also the :doc:`/tutorial/introduction` tutorial, which introduces
        defining Hypothesis tests with |@given|.

    .. _given-arguments:

    Arguments to ``@given``
    -----------------------

    Arguments to |@given| may be either positional or keyword arguments:

    .. code-block:: python

        @given(st.integers(), st.floats())
        def test_one(x, y):
            pass

        @given(x=st.integers(), y=st.floats())
        def test_two(x, y):
            pass

    If using keyword arguments, the arguments may appear in any order, as with
    standard Python functions:

    .. code-block:: python

        # different order, but still equivalent to before
        @given(y=st.floats(), x=st.integers())
        def test(x, y):
            assert isinstance(x, int)
            assert isinstance(y, float)

    If |@given| is provided fewer positional arguments than the decorated test
    has parameters, the test's arguments are filled in from the right, leaving
    the leftmost positional arguments unfilled:

    .. code-block:: python

        @given(st.integers(), st.floats())
        def test(manual_string, y, z):
            assert manual_string == "x"
            assert isinstance(y, int)
            assert isinstance(z, float)

        # `test` is now a callable which takes one argument `manual_string`
        test("x")
        # or equivalently:
        test(manual_string="x")

    The reason for this "from the right" behavior is to support using |@given|
    with instance methods, by automatically passing through ``self``:

    .. code-block:: python

        class MyTest(TestCase):
            @given(st.integers())
            def test(self, x):
                assert isinstance(self, MyTest)
                assert isinstance(x, int)

    If (and only if) using keyword arguments, |@given| may be combined with
    ``**kwargs`` or ``*args``:

    .. code-block:: python

        @given(x=st.integers(), y=st.integers())
        def test(x, **kwargs):
            assert "y" in kwargs

        @given(x=st.integers(), y=st.integers())
        def test(x, *args, **kwargs):
            assert args == ()
            assert "x" not in kwargs
            assert "y" in kwargs

    It is an error to do any of the following (see the sketch after this list):

    * Mix positional and keyword arguments to |@given|.
    * Use |@given| with a function that has a default value for an argument.
    * Use |@given| with positional arguments with a function that uses ``*args``,
      ``**kwargs``, or keyword-only arguments.
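
    For instance, mixing positional and keyword arguments is rejected. As an
    illustrative sketch of the first bullet, calling this test raises
    ``InvalidArgument``:

    .. code-block:: python

        @given(st.integers(), y=st.floats())  # positional and keyword mixed
        def test_mixed(x, y):
            pass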

    The function returned by |@given| has all the same arguments as the original
    test, minus those that are filled in by |@given|. See the :ref:`notes on
    framework compatibility <framework-compatibility>` for how this interacts
    with features of other testing libraries, such as :pypi:`pytest` fixtures.
    """

    if currently_in_test_context():
        fail_health_check(
            Settings(),
            "Nesting @given tests results in quadratic generation and shrinking "
            "behavior, and can usually be more cleanly expressed by replacing the "
            "inner function with an st.data() parameter on the outer @given."
            "\n\n"
            "If it is difficult or impossible to refactor this test to remove the "
            "nested @given, you can disable this health check with "
            "@settings(suppress_health_check=[HealthCheck.nested_given]) on the "
            "outer @given. See "
            "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
            "for details.",
            HealthCheck.nested_given,
        )

    def run_test_as_given(test):
        if inspect.isclass(test):
            # Provide a meaningful error to users, instead of exceptions from
            # internals that assume we're dealing with a function.
            raise InvalidArgument("@given cannot be applied to a class")

        if (
            "_pytest" in sys.modules
            and "_pytest.fixtures" in sys.modules
            and (
                tuple(map(int, sys.modules["_pytest"].__version__.split(".")[:2]))
                >= (8, 4)
            )
            and isinstance(
                test, sys.modules["_pytest.fixtures"].FixtureFunctionDefinition
            )
        ):  # pragma: no cover # covered by pytest/test_fixtures, but not by cover/
            raise InvalidArgument("@given cannot be applied to a pytest fixture")

        given_arguments = tuple(_given_arguments)
        given_kwargs = dict(_given_kwargs)

        original_sig = get_signature(test)
        if given_arguments == (Ellipsis,) and not given_kwargs:
            # user indicated that they want to infer all arguments
            given_kwargs = {
                p.name: Ellipsis
                for p in original_sig.parameters.values()
                if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
            }
            given_arguments = ()
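            # For example (illustrative): `@given(...)` applied to
            # `def test(x: int, y: str)` now behaves like `@given(x=..., y=...)`,
            # and each `...` placeholder is resolved to a strategy via
            # st.from_type() from the type hints, further below.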

        check_invalid = is_invalid_test(
            test, original_sig, given_arguments, given_kwargs
        )

        # If the argument check found problems, return a dummy test function
        # that will raise an error if it is actually called.
        if check_invalid is not None:
            return check_invalid

        # Because the argument check succeeded, we can convert @given's
        # positional arguments into keyword arguments for simplicity.
        if given_arguments:
            assert not given_kwargs
            posargs = [
                p.name
                for p in original_sig.parameters.values()
                if p.kind is p.POSITIONAL_OR_KEYWORD
            ]
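            # Fill from the right (illustrative): posargs == ["a", "b", "c"]
            # with two strategies (s1, s2) yields {"b": s1, "c": s2},
            # leaving the leftmost parameter "a" for the caller.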
            given_kwargs = dict(list(zip(posargs[::-1], given_arguments[::-1]))[::-1])
            # These have been converted, so delete them to prevent accidental use.
            del given_arguments

        new_signature = new_given_signature(original_sig, given_kwargs)

        # Use type information to convert "infer" arguments into appropriate strategies.
        if ... in given_kwargs.values():
            hints = get_type_hints(test)
            for name in [name for name, value in given_kwargs.items() if value is ...]:
                if name not in hints:
                    return _invalid(
                        f"passed {name}=... for {test.__name__}, but {name} has "
                        "no type annotation",
                        test=test,
                        given_kwargs=given_kwargs,
                    )
                given_kwargs[name] = st.from_type(hints[name])

        # only raise if the same thread uses two different executors, not if two
        # different threads use different executors.
        thread_local = ThreadLocal(prev_self=lambda: not_set)
        # maps thread_id to whether that thread overlaps in execution with any
        # other thread in this @given. We use this to detect whether an @given is
        # being run from multiple different threads at once, which informs
        # decisions like whether to raise DeadlineExceeded or HealthCheck.too_slow.
        thread_overlap: dict[int, bool] = {}
        thread_overlap_lock = Lock()

        @impersonate(test)
        @define_function_signature(test.__name__, test.__doc__, new_signature)
        def wrapped_test(*arguments, **kwargs):
            # Tell pytest to omit the body of this function from tracebacks
            __tracebackhide__ = True
            with thread_overlap_lock:
                for overlap_thread_id in thread_overlap:
                    thread_overlap[overlap_thread_id] = True

                threadid = threading.get_ident()
                # if there are existing threads when this thread starts, then
                # this thread starts at an overlapped state.
                has_existing_threads = len(thread_overlap) > 0
                thread_overlap[threadid] = has_existing_threads

            try:
                test = wrapped_test.hypothesis.inner_test
                if getattr(test, "is_hypothesis_test", False):
                    raise InvalidArgument(
                        f"You have applied @given to the test {test.__name__} more than "
                        "once, which wraps the test several times and is extremely slow. "
                        "A similar effect can be gained by combining the arguments "
                        "of the two calls to given. For example, instead of "
                        "@given(booleans()) @given(integers()), you could write "
                        "@given(booleans(), integers())"
                    )

                settings = wrapped_test._hypothesis_internal_use_settings
                random = get_random_for_wrapped_test(test, wrapped_test)
                arguments, kwargs, stuff = process_arguments_to_given(
                    wrapped_test,
                    arguments,
                    kwargs,
                    given_kwargs,
                    new_signature.parameters,
                )

                if (
                    inspect.iscoroutinefunction(test)
                    and get_executor(stuff.selfy) is default_executor
                ):
                    # See https://github.com/HypothesisWorks/hypothesis/issues/3054
                    # If our custom executor doesn't handle coroutines, or we return an
                    # awaitable from a non-async-def function, we just rely on the
                    # return_value health check. This catches most user errors though.
                    raise InvalidArgument(
                        "Hypothesis doesn't know how to run async test functions like "
                        f"{test.__name__}. You'll need to write a custom executor, "
                        "or use a library like pytest-asyncio or pytest-trio which can "
                        "handle the translation for you.\n See https://hypothesis."
                        "readthedocs.io/en/latest/details.html#custom-function-execution"
                    )

                runner = stuff.selfy
                if isinstance(stuff.selfy, TestCase) and test.__name__ in dir(TestCase):
                    fail_health_check(
                        settings,
                        f"You have applied @given to the method {test.__name__}, which is "
                        "used by the unittest runner but is not itself a test. "
                        "This is not useful in any way.",
                        HealthCheck.not_a_test_method,
                    )
                if bad_django_TestCase(runner):  # pragma: no cover
                    # Covered by the Django tests, but not the pytest coverage task
                    raise InvalidArgument(
                        "You have applied @given to a method on "
                        f"{type(runner).__qualname__}, but this "
                        "class does not inherit from the supported versions in "
                        "`hypothesis.extra.django`. Use the Hypothesis variants "
                        "to ensure that each example is run in a separate "
                        "database transaction."
                    )

                nonlocal thread_local
                # Check selfy really is self (not e.g. a mock) before we health-check
                cur_self = (
                    stuff.selfy
                    if getattr(type(stuff.selfy), test.__name__, None) is wrapped_test
                    else None
                )
                if thread_local.prev_self is not_set:
                    thread_local.prev_self = cur_self
                elif cur_self is not thread_local.prev_self:
                    fail_health_check(
                        settings,
                        f"The method {test.__qualname__} was called from multiple "
                        "different executors. This may lead to flaky tests and "
                        "nonreproducible errors when replaying from the database."
                        "\n\n"
                        "Unlike most health checks, HealthCheck.differing_executors "
                        "warns about a correctness issue with your test. We "
                        "therefore recommend fixing the underlying issue, rather "
                        "than suppressing this health check. However, if you are "
                        "confident this health check can be safely disabled, you can "
                        "do so with "
                        "@settings(suppress_health_check=[HealthCheck.differing_executors]). "
                        "See "
                        "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                        "for details.",
                        HealthCheck.differing_executors,
                    )

                state = StateForActualGivenExecution(
                    stuff,
                    test,
                    settings,
                    random,
                    wrapped_test,
                    thread_overlap=thread_overlap,
                )

                # If there was a @reproduce_failure decorator, use it to reproduce
                # the error (or complain that we couldn't). Either way, this will
                # always raise some kind of error.
                if (
                    reproduce_failure := wrapped_test._hypothesis_internal_use_reproduce_failure
                ) is not None:
                    expected_version, failure = reproduce_failure
                    if expected_version != __version__:
                        raise InvalidArgument(
                            "Attempting to reproduce a failure from a different "
                            f"version of Hypothesis. This failure is from {expected_version}, but "
                            f"you are currently running {__version__!r}. Please change your "
                            "Hypothesis version to a matching one."
                        )
                    try:
                        state.execute_once(
                            ConjectureData.for_choices(decode_failure(failure)),
                            print_example=True,
                            is_final=True,
                        )
                        raise DidNotReproduce(
                            "Expected the test to raise an error, but it "
                            "completed successfully."
                        )
                    except StopTest:
                        raise DidNotReproduce(
                            "The shape of the test data has changed in some way "
                            "from where this blob was defined. Are you sure "
                            "you're running the same test?"
                        ) from None
                    except UnsatisfiedAssumption:
                        raise DidNotReproduce(
                            "The test data failed to satisfy an assumption in the "
                            "test. Have you added it since this blob was generated?"
                        ) from None

                # There was no @reproduce_failure, so start by running any explicit
                # examples from @example decorators.
                if errors := list(
                    execute_explicit_examples(
                        state, wrapped_test, arguments, kwargs, original_sig
                    )
                ):
                    # If we're not going to report multiple bugs, we would have
                    # stopped running explicit examples at the first failure.
                    assert len(errors) == 1 or state.settings.report_multiple_bugs

                    # If an explicit example raised a 'skip' exception, ensure it's never
                    # wrapped up in an exception group. Because we break out of the loop
                    # immediately on finding a skip, if present it's always the last error.
                    if isinstance(errors[-1][1], skip_exceptions_to_reraise()):
                        # Covered by `test_issue_3453_regression`, just in a subprocess.
                        del errors[:-1]  # pragma: no cover

                    _raise_to_user(errors, state.settings, [], " in explicit examples")

                # If there were any explicit examples, they all ran successfully.
                # The next step is to use the Conjecture engine to run the test on
                # many different inputs.
                ran_explicit_examples = (
                    Phase.explicit in state.settings.phases
                    and getattr(wrapped_test, "hypothesis_explicit_examples", ())
                )
                SKIP_BECAUSE_NO_EXAMPLES = unittest.SkipTest(
                    "Hypothesis has been told to run no examples for this test."
                )
                if not (
                    Phase.reuse in settings.phases or Phase.generate in settings.phases
                ):
                    if not ran_explicit_examples:
                        raise SKIP_BECAUSE_NO_EXAMPLES
                    return

                try:
                    if isinstance(runner, TestCase) and hasattr(runner, "subTest"):
                        subTest = runner.subTest
                        try:
                            runner.subTest = types.MethodType(fake_subTest, runner)
                            state.run_engine()
                        finally:
                            runner.subTest = subTest
                    else:
                        state.run_engine()
                except BaseException as e:
                    # The exception caught here should either be an actual test
                    # failure (or BaseExceptionGroup), or some kind of fatal error
                    # that caused the engine to stop.
                    generated_seed = (
                        wrapped_test._hypothesis_internal_use_generated_seed
                    )
                    with local_settings(settings):
                        if not (state.failed_normally or generated_seed is None):
                            if running_under_pytest:
                                report(
                                    f"You can add @seed({generated_seed}) to this test or "
                                    f"run pytest with --hypothesis-seed={generated_seed} "
                                    "to reproduce this failure."
                                )
                            else:
                                report(
                                    f"You can add @seed({generated_seed}) to this test to "
                                    "reproduce this failure."
                                )
                        # The dance here is to avoid showing users long tracebacks
                        # full of Hypothesis internals they don't care about.
                        # We have to do this inline, to avoid adding another
                        # internal stack frame just when we've removed the rest.
                        #
                        # Using a variable for our trimmed error ensures that the line
                        # which will actually appear in tracebacks is as clear as
                        # possible - "raise the_error_hypothesis_found".
                        the_error_hypothesis_found = e.with_traceback(
                            None
                            if isinstance(e, BaseExceptionGroup)
                            else get_trimmed_traceback()
                        )
                        raise the_error_hypothesis_found

                if not (ran_explicit_examples or state.ever_executed):
                    raise SKIP_BECAUSE_NO_EXAMPLES
            finally:
                with thread_overlap_lock:
                    del thread_overlap[threadid]

        def _get_fuzz_target() -> (
            Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]]
        ):
            # Because fuzzing interfaces are very performance-sensitive, we use a
            # somewhat more complicated structure here. `_get_fuzz_target()` is
            # called by the `HypothesisHandle.fuzz_one_input` property, allowing
            # us to defer our collection of the settings, random instance, and
            # reassignable `inner_test` (etc) until `fuzz_one_input` is accessed.
            #
            # We then share the performance cost of setting up `state` between
            # many invocations of the target. We explicitly force `deadline=None`
            # for performance reasons, saving ~40% of the runtime of an empty test.
            test = wrapped_test.hypothesis.inner_test
            settings = Settings(
                parent=wrapped_test._hypothesis_internal_use_settings, deadline=None
            )
            random = get_random_for_wrapped_test(test, wrapped_test)
            _args, _kwargs, stuff = process_arguments_to_given(
                wrapped_test, (), {}, given_kwargs, new_signature.parameters
            )
            assert not _args
            assert not _kwargs
            state = StateForActualGivenExecution(
                stuff,
                test,
                settings,
                random,
                wrapped_test,
                thread_overlap=thread_overlap,
            )
            database_key = function_digest(test) + b".secondary"
            # We track the minimal-so-far example for each distinct origin, so
            # that we store O(log n) rather than O(n) examples over a long run.
            # In particular, this means that for common errors we saturate in
            # long runs instead of storing huge volumes of low-value data.
            minimal_failures: dict = {}

            def fuzz_one_input(
                buffer: Union[bytes, bytearray, memoryview, BinaryIO],
            ) -> Optional[bytes]:
                # This inner part is all that the fuzzer will actually run,
                # so we keep it as small and as fast as possible.
                if isinstance(buffer, io.IOBase):
                    buffer = buffer.read(BUFFER_SIZE)
                assert isinstance(buffer, (bytes, bytearray, memoryview))
                data = ConjectureData(
                    random=None,
                    provider=BytestringProvider,
                    provider_kw={"bytestring": buffer},
                )
                try:
                    state.execute_once(data)
                    status = Status.VALID
                except StopTest:
                    status = data.status
                    return None
                except UnsatisfiedAssumption:
                    status = Status.INVALID
                    return None
                except BaseException:
                    known = minimal_failures.get(data.interesting_origin)
                    if settings.database is not None and (
                        known is None or sort_key(data.nodes) <= sort_key(known)
                    ):
                        settings.database.save(
                            database_key, choices_to_bytes(data.choices)
                        )
                        minimal_failures[data.interesting_origin] = data.nodes
                    status = Status.INTERESTING
                    raise
                finally:
                    if observability_enabled():
                        data.freeze()
                        tc = make_testcase(
                            run_start=state._start_timestamp,
                            property=state.test_identifier,
                            data=data,
                            how_generated="fuzz_one_input",
                            representation=state._string_repr,
                            arguments=data._observability_args,
                            timing=state._timing_features,
                            coverage=None,
                            status=status,
                            backend_metadata=data.provider.observe_test_case(),
                        )
                        deliver_observation(tc)
                        state._timing_features = {}

                assert isinstance(data.provider, BytestringProvider)
                return bytes(data.provider.drawn)

            fuzz_one_input.__doc__ = HypothesisHandle.fuzz_one_input.__doc__
            return fuzz_one_input

        # After having created the decorated test function, we need to copy
        # over some attributes to make the switch as seamless as possible.

        for attrib in dir(test):
            if not (attrib.startswith("_") or hasattr(wrapped_test, attrib)):
                setattr(wrapped_test, attrib, getattr(test, attrib))
        wrapped_test.is_hypothesis_test = True
        if hasattr(test, "_hypothesis_internal_settings_applied"):
            # Used to check if @settings is applied twice.
            wrapped_test._hypothesis_internal_settings_applied = True
        wrapped_test._hypothesis_internal_use_seed = getattr(
            test, "_hypothesis_internal_use_seed", None
        )
        wrapped_test._hypothesis_internal_use_settings = (
            getattr(test, "_hypothesis_internal_use_settings", None) or Settings.default
        )
        wrapped_test._hypothesis_internal_use_reproduce_failure = getattr(
            test, "_hypothesis_internal_use_reproduce_failure", None
        )
        wrapped_test.hypothesis = HypothesisHandle(test, _get_fuzz_target, given_kwargs)
        return wrapped_test

    return run_test_as_given


def find(
    specifier: SearchStrategy[Ex],
    condition: Callable[[Any], bool],
    *,
    settings: Optional[Settings] = None,
    random: Optional[Random] = None,
    database_key: Optional[bytes] = None,
) -> Ex:
    """Returns the minimal example from the given strategy ``specifier`` that
    matches the predicate function ``condition``."""
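    # Illustrative usage: find(st.integers(), lambda x: x >= 11) returns 11,
    # the minimal example satisfying the condition, after generation and
    # shrinking have run.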
    if settings is None:
        settings = Settings(max_examples=2000)
    settings = Settings(
        settings, suppress_health_check=list(HealthCheck), report_multiple_bugs=False
    )

    if database_key is None and settings.database is not None:
        # Note: The database key is not guaranteed to be unique. If two `find`
        # calls share a key, replayed database examples may fail to reproduce
        # because they are checked against the wrong condition.
        database_key = function_digest(condition)

    if not isinstance(specifier, SearchStrategy):
        raise InvalidArgument(
            f"Expected SearchStrategy but got {specifier!r} of "
            f"type {type(specifier).__name__}"
        )
    specifier.validate()

    last: list[Ex] = []

    @settings
    @given(specifier)
    def test(v):
        if condition(v):
            last[:] = [v]
            raise Found

    if random is not None:
        test = seed(random.getrandbits(64))(test)

    # Aliasing as Any avoids mypy errors (attr-defined) when accessing and
    # setting custom attributes on the decorated function or class.
    _test: Any = test
    _test._hypothesis_internal_is_find = True
    _test._hypothesis_internal_database_key = database_key

    try:
        test()
    except Found:
        return last[0]

    raise NoSuchExample(get_pretty_function_description(condition))