1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import importlib
12import inspect
13import math
14import threading
15import time
16from collections import defaultdict
17from collections.abc import Callable, Generator, Sequence
18from contextlib import AbstractContextManager, contextmanager, nullcontext, suppress
19from dataclasses import dataclass, field
20from datetime import timedelta
21from enum import Enum
22from random import Random
23from typing import Literal, NoReturn, cast
24
25from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
26from hypothesis._settings import local_settings, note_deprecation
27from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
28from hypothesis.errors import (
29 BackendCannotProceed,
30 FlakyBackendFailure,
31 HypothesisException,
32 InvalidArgument,
33 StopTest,
34)
35from hypothesis.internal.cache import LRUReusedCache
36from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
37from hypothesis.internal.conjecture.choice import (
38 ChoiceConstraintsT,
39 ChoiceKeyT,
40 ChoiceNode,
41 ChoiceT,
42 ChoiceTemplate,
43 choices_key,
44)
45from hypothesis.internal.conjecture.data import (
46 ConjectureData,
47 ConjectureResult,
48 DataObserver,
49 Overrun,
50 Status,
51 _Overrun,
52)
53from hypothesis.internal.conjecture.datatree import (
54 DataTree,
55 PreviouslyUnseenBehaviour,
56 TreeRecordingObserver,
57)
58from hypothesis.internal.conjecture.junkdrawer import (
59 ensure_free_stackframes,
60 startswith,
61)
62from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
63from hypothesis.internal.conjecture.providers import (
64 AVAILABLE_PROVIDERS,
65 HypothesisProvider,
66 PrimitiveProvider,
67)
68from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
69from hypothesis.internal.escalation import InterestingOrigin
70from hypothesis.internal.healthcheck import fail_health_check
71from hypothesis.internal.observability import Observation, with_observability_callback
72from hypothesis.reporting import base_report, report
73
74# In most cases, the following constants are all Final. However, we do allow users
75# to monkeypatch all of these variables, which means we cannot annotate them as
76# Final or mypyc will inline them and render monkeypatching useless.
77
78#: The maximum number of times the shrinker will reduce the complexity of a failing
79#: input before giving up. This avoids falling down a trap of exponential (or worse)
80#: complexity, where the shrinker appears to be making progress but will take a
81#: substantially long time to finish completely.
82MAX_SHRINKS: int = 500
83
84# If the shrinking phase takes more than five minutes, abort it early and print
85# a warning. Many CI systems will kill a build after around ten minutes with
86# no output, and appearing to hang isn't great for interactive use either -
87# showing partially-shrunk examples is better than quitting with no examples!
88# (but make it monkeypatchable, for the rare users who need to keep on shrinking)
89
90#: The maximum total time in seconds that the shrinker will try to shrink a failure
91#: for before giving up. This is across all shrinks for the same failure, so even
92#: if the shrinker successfully reduces the complexity of a single failure several
93#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
94MAX_SHRINKING_SECONDS: int = 300
95
96#: The maximum amount of entropy a single test case can use before giving up
97#: while making random choices during input generation.
98#:
99#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
100#: should not rely on it, except that a linear increase |BUFFER_SIZE| will linearly
101#: increase the amount of entropy a test case can use during generation.
102BUFFER_SIZE: int = 8 * 1024
103CACHE_SIZE: int = 10000
104MIN_TEST_CALLS: int = 10
105
106# we use this to isolate Hypothesis from interacting with the global random,
107# to make it easier to reason about our global random warning logic easier (see
108# deprecate_random_in_strategy).
109_random = Random()
110
111
112def shortlex(s):
113 return (len(s), s)
114
115
116@dataclass(slots=True, frozen=False)
117class HealthCheckState:
118 valid_examples: int = field(default=0)
119 invalid_examples: int = field(default=0)
120 overrun_examples: int = field(default=0)
121 draw_times: defaultdict[str, list[float]] = field(
122 default_factory=lambda: defaultdict(list)
123 )
124
125 @property
126 def total_draw_time(self) -> float:
127 return math.fsum(sum(self.draw_times.values(), start=[]))
128
129 def timing_report(self) -> str:
130 """Return a terminal report describing what was slow."""
131 if not self.draw_times:
132 return ""
133 width = max(
134 len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
135 )
136 out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
137 args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
138 for i, (argname, times) in enumerate(args_in_order): # pragma: no branch
139 # If we have very many unique keys, which can happen due to interactive
140 # draws with computed labels, we'll skip uninformative rows.
141 if (
142 5 <= i < (len(self.draw_times) - 2)
143 and math.fsum(times) * 20 < self.total_draw_time
144 ):
145 out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
146 break
147 # Compute the row to report, omitting times <1ms to focus on slow draws
148 reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
149 desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
150 arg = argname.removeprefix("generate:").removesuffix(": ")
151 out.append(
152 f" {arg:^{width}} | {len(times):>4} | "
153 f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
154 )
155 return "\n".join(out)
156
157
158class ExitReason(Enum):
159 max_examples = "settings.max_examples={s.max_examples}"
160 max_iterations = (
161 "settings.max_examples={s.max_examples}, "
162 "but < 10% of examples satisfied assumptions"
163 )
164 max_shrinks = f"shrunk example {MAX_SHRINKS} times"
165 finished = "nothing left to do"
166 flaky = "test was flaky"
167 very_slow_shrinking = "shrinking was very slow"
168
169 def describe(self, settings: Settings) -> str:
170 return self.value.format(s=settings)
171
172
173class RunIsComplete(Exception):
174 pass
175
176
177def _get_provider(backend: str) -> PrimitiveProvider | type[PrimitiveProvider]:
178 provider_cls = AVAILABLE_PROVIDERS[backend]
179 if isinstance(provider_cls, str):
180 module_name, class_name = provider_cls.rsplit(".", 1)
181 provider_cls = getattr(importlib.import_module(module_name), class_name)
182
183 if provider_cls.lifetime == "test_function":
184 return provider_cls(None)
185 elif provider_cls.lifetime == "test_case":
186 return provider_cls
187 else:
188 raise InvalidArgument(
189 f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
190 "Expected one of 'test_function', 'test_case'."
191 )
192
193
194class CallStats(TypedDict):
195 status: str
196 runtime: float
197 drawtime: float
198 gctime: float
199 events: list[str]
200
201
202PhaseStatistics = TypedDict(
203 "PhaseStatistics",
204 {
205 "duration-seconds": float,
206 "test-cases": list[CallStats],
207 "distinct-failures": int,
208 "shrinks-successful": int,
209 },
210)
211StatisticsDict = TypedDict(
212 "StatisticsDict",
213 {
214 "generate-phase": NotRequired[PhaseStatistics],
215 "reuse-phase": NotRequired[PhaseStatistics],
216 "shrink-phase": NotRequired[PhaseStatistics],
217 "stopped-because": NotRequired[str],
218 "targets": NotRequired[dict[str, float]],
219 "nodeid": NotRequired[str],
220 },
221)
222
223
224def choice_count(choices: Sequence[ChoiceT | ChoiceTemplate]) -> int | None:
225 count = 0
226 for choice in choices:
227 if isinstance(choice, ChoiceTemplate):
228 if choice.count is None:
229 return None
230 count += choice.count
231 else:
232 count += 1
233 return count
234
235
236class DiscardObserver(DataObserver):
237 @override
238 def kill_branch(self) -> NoReturn:
239 raise ContainsDiscard
240
241
242def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
243 # backwards-compatibility with backends without for_failure, can remove
244 # in a few months
245 kwargs = {}
246 if for_failure:
247 if "for_failure" in inspect.signature(data.provider.realize).parameters:
248 kwargs["for_failure"] = True
249 else:
250 note_deprecation(
251 f"{type(data.provider).__qualname__}.realize does not have the "
252 "for_failure parameter. This will be an error in future versions "
253 "of Hypothesis. (If you installed this backend from a separate "
254 "package, upgrading that package may help).",
255 has_codemod=False,
256 since="2025-05-07",
257 )
258
259 for node in data.nodes:
260 value = data.provider.realize(node.value, **kwargs)
261 expected_type = {
262 "string": str,
263 "float": float,
264 "integer": int,
265 "boolean": bool,
266 "bytes": bytes,
267 }[node.type]
268 if type(value) is not expected_type:
269 raise HypothesisException(
270 f"expected {expected_type} from "
271 f"{data.provider.realize.__qualname__}, got {type(value)}"
272 )
273
274 constraints = cast(
275 ChoiceConstraintsT,
276 {
277 k: data.provider.realize(v, **kwargs)
278 for k, v in node.constraints.items()
279 },
280 )
281 node.value = value
282 node.constraints = constraints
283
284
285class ConjectureRunner:
286 def __init__(
287 self,
288 test_function: Callable[[ConjectureData], None],
289 *,
290 settings: Settings | None = None,
291 random: Random | None = None,
292 database_key: bytes | None = None,
293 ignore_limits: bool = False,
294 thread_overlap: dict[int, bool] | None = None,
295 ) -> None:
296 self._test_function: Callable[[ConjectureData], None] = test_function
297 self.settings: Settings = settings or Settings()
298 self.shrinks: int = 0
299 self.finish_shrinking_deadline: float | None = None
300 self.call_count: int = 0
301 self.misaligned_count: int = 0
302 self.valid_examples: int = 0
303 self.invalid_examples: int = 0
304 self.overrun_examples: int = 0
305 self.random: Random = random or Random(_random.getrandbits(128))
306 self.database_key: bytes | None = database_key
307 self.ignore_limits: bool = ignore_limits
308 self.thread_overlap = {} if thread_overlap is None else thread_overlap
309
310 # Global dict of per-phase statistics, and a list of per-call stats
311 # which transfer to the global dict at the end of each phase.
312 self._current_phase: str = "(not a phase)"
313 self.statistics: StatisticsDict = {}
314 self.stats_per_test_case: list[CallStats] = []
315
316 self.interesting_examples: dict[InterestingOrigin, ConjectureResult] = {}
317 # We use call_count because there may be few possible valid_examples.
318 self.first_bug_found_at: int | None = None
319 self.last_bug_found_at: int | None = None
320 self.first_bug_found_time: float = math.inf
321
322 self.shrunk_examples: set[InterestingOrigin] = set()
323 self.health_check_state: HealthCheckState | None = None
324 self.tree: DataTree = DataTree()
325 self.provider: PrimitiveProvider | type[PrimitiveProvider] = _get_provider(
326 self.settings.backend
327 )
328
329 self.best_observed_targets: defaultdict[str, float] = defaultdict(
330 lambda: NO_SCORE
331 )
332 self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}
333
334 # We keep the pareto front in the example database if we have one. This
335 # is only marginally useful at present, but speeds up local development
336 # because it means that large targets will be quickly surfaced in your
337 # testing.
338 self.pareto_front: ParetoFront | None = None
339 if self.database_key is not None and self.settings.database is not None:
340 self.pareto_front = ParetoFront(self.random)
341 self.pareto_front.on_evict(self.on_pareto_evict)
342
343 # We want to be able to get the ConjectureData object that results
344 # from running a choice sequence without recalculating, especially during
345 # shrinking where we need to know about the structure of the
346 # executed test case.
347 self.__data_cache = LRUReusedCache[
348 tuple[ChoiceKeyT, ...], ConjectureResult | _Overrun
349 ](CACHE_SIZE)
350
351 self.reused_previously_shrunk_test_case: bool = False
352
353 self.__pending_call_explanation: str | None = None
354 self._backend_found_failure: bool = False
355 self._backend_exceeded_deadline: bool = False
356 self._switch_to_hypothesis_provider: bool = False
357
358 self.__failed_realize_count: int = 0
359 # note unsound verification by alt backends
360 self._verified_by: str | None = None
361
362 @contextmanager
363 def _with_switch_to_hypothesis_provider(
364 self, value: bool
365 ) -> Generator[None, None, None]:
366 previous = self._switch_to_hypothesis_provider
367 try:
368 self._switch_to_hypothesis_provider = value
369 yield
370 finally:
371 self._switch_to_hypothesis_provider = previous
372
373 @property
374 def using_hypothesis_backend(self) -> bool:
375 return (
376 self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
377 )
378
379 def explain_next_call_as(self, explanation: str) -> None:
380 self.__pending_call_explanation = explanation
381
382 def clear_call_explanation(self) -> None:
383 self.__pending_call_explanation = None
384
385 @contextmanager
386 def _log_phase_statistics(
387 self, phase: Literal["reuse", "generate", "shrink"]
388 ) -> Generator[None, None, None]:
389 self.stats_per_test_case.clear()
390 start_time = time.perf_counter()
391 try:
392 self._current_phase = phase
393 yield
394 finally:
395 self.statistics[phase + "-phase"] = { # type: ignore
396 "duration-seconds": time.perf_counter() - start_time,
397 "test-cases": list(self.stats_per_test_case),
398 "distinct-failures": len(self.interesting_examples),
399 "shrinks-successful": self.shrinks,
400 }
401
402 @property
403 def should_optimise(self) -> bool:
404 return Phase.target in self.settings.phases
405
406 def __tree_is_exhausted(self) -> bool:
407 return self.tree.is_exhausted and self.using_hypothesis_backend
408
409 def __stoppable_test_function(self, data: ConjectureData) -> None:
410 """Run ``self._test_function``, but convert a ``StopTest`` exception
411 into a normal return and avoid raising anything flaky for RecursionErrors.
412 """
413 # We ensure that the test has this much stack space remaining, no
414 # matter the size of the stack when called, to de-flake RecursionErrors
415 # (#2494, #3671). Note, this covers the data generation part of the test;
416 # the actual test execution is additionally protected at the call site
417 # in hypothesis.core.execute_once.
418 with ensure_free_stackframes():
419 try:
420 self._test_function(data)
421 except StopTest as e:
422 if e.testcounter == data.testcounter:
423 # This StopTest has successfully stopped its test, and can now
424 # be discarded.
425 pass
426 else:
427 # This StopTest was raised by a different ConjectureData. We
428 # need to re-raise it so that it will eventually reach the
429 # correct engine.
430 raise
431
432 def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
433 return choices_key(choices)
434
435 def _cache(self, data: ConjectureData) -> None:
436 result = data.as_result()
437 key = self._cache_key(data.choices)
438 self.__data_cache[key] = result
439
440 def cached_test_function(
441 self,
442 choices: Sequence[ChoiceT | ChoiceTemplate],
443 *,
444 error_on_discard: bool = False,
445 extend: int | Literal["full"] = 0,
446 ) -> ConjectureResult | _Overrun:
447 """
448 If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
449 in preference to running the actual test function. This is to allow us
450 to skip test cases we expect to be redundant in some cases. Note that
451 it may be the case that we don't raise ``ContainsDiscard`` even if the
452 result has discards if we cannot determine from previous runs whether
453 it will have a discard.
454 """
455 # node templates represent a not-yet-filled hole and therefore cannot
456 # be cached or retrieved from the cache.
457 if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
458 # this type cast is validated by the isinstance check above (ie, there
459 # are no ChoiceTemplate elements).
460 choices = cast(Sequence[ChoiceT], choices)
461 key = self._cache_key(choices)
462 try:
463 cached = self.__data_cache[key]
464 # if we have a cached overrun for this key, but we're allowing extensions
465 # of the nodes, it could in fact run to a valid data if we try.
466 if extend == 0 or cached.status is not Status.OVERRUN:
467 return cached
468 except KeyError:
469 pass
470
471 if extend == "full":
472 max_length = None
473 elif (count := choice_count(choices)) is None:
474 max_length = None
475 else:
476 max_length = count + extend
477
478 # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
479 # The reason is we don't expect simulate_test_function to explore new choices
480 # and write back to the tree, so we don't want the overhead of the
481 # TreeRecordingObserver tracking those calls.
482 trial_observer: DataObserver | None = DataObserver()
483 if error_on_discard:
484 trial_observer = DiscardObserver()
485
486 try:
487 trial_data = self.new_conjecture_data(
488 choices, observer=trial_observer, max_choices=max_length
489 )
490 self.tree.simulate_test_function(trial_data)
491 except PreviouslyUnseenBehaviour:
492 pass
493 else:
494 trial_data.freeze()
495 key = self._cache_key(trial_data.choices)
496 if trial_data.status > Status.OVERRUN:
497 try:
498 return self.__data_cache[key]
499 except KeyError:
500 pass
501 else:
502 # if we simulated to an overrun, then we our result is certainly
503 # an overrun; no need to consult the cache. (and we store this result
504 # for simulation-less lookup later).
505 self.__data_cache[key] = Overrun
506 return Overrun
507 try:
508 return self.__data_cache[key]
509 except KeyError:
510 pass
511
512 data = self.new_conjecture_data(choices, max_choices=max_length)
513 # note that calling test_function caches `data` for us.
514 self.test_function(data)
515 return data.as_result()
516
517 def test_function(self, data: ConjectureData) -> None:
518 if self.__pending_call_explanation is not None:
519 self.debug(self.__pending_call_explanation)
520 self.__pending_call_explanation = None
521
522 self.call_count += 1
523 interrupted = False
524
525 try:
526 self.__stoppable_test_function(data)
527 except KeyboardInterrupt:
528 interrupted = True
529 raise
530 except BackendCannotProceed as exc:
531 if exc.scope in ("verified", "exhausted"):
532 self._switch_to_hypothesis_provider = True
533 if exc.scope == "verified":
534 self._verified_by = self.settings.backend
535 elif exc.scope == "discard_test_case":
536 self.__failed_realize_count += 1
537 if (
538 self.__failed_realize_count > 10
539 and (self.__failed_realize_count / self.call_count) > 0.2
540 ):
541 self._switch_to_hypothesis_provider = True
542
543 # treat all BackendCannotProceed exceptions as invalid. This isn't
544 # great; "verified" should really be counted as self.valid_examples += 1.
545 # But we check self.valid_examples == 0 to determine whether to raise
546 # Unsatisfiable, and that would throw this check off.
547 self.invalid_examples += 1
548
549 # skip the post-test-case tracking; we're pretending this never happened
550 interrupted = True
551 data.cannot_proceed_scope = exc.scope
552 data.freeze()
553 return
554 except BaseException:
555 data.freeze()
556 if self.settings.backend != "hypothesis":
557 realize_choices(data, for_failure=True)
558 self.save_choices(data.choices)
559 raise
560 finally:
561 # No branch, because if we're interrupted we always raise
562 # the KeyboardInterrupt, never continue to the code below.
563 if not interrupted: # pragma: no branch
564 assert data.cannot_proceed_scope is None
565 data.freeze()
566 call_stats: CallStats = {
567 "status": data.status.name.lower(),
568 "runtime": data.finish_time - data.start_time,
569 "drawtime": math.fsum(data.draw_times.values()),
570 "gctime": data.gc_finish_time - data.gc_start_time,
571 "events": sorted(
572 k if v == "" else f"{k}: {v}" for k, v in data.events.items()
573 ),
574 }
575 self.stats_per_test_case.append(call_stats)
576 if self.settings.backend != "hypothesis":
577 realize_choices(data, for_failure=data.status is Status.INTERESTING)
578
579 self._cache(data)
580 if data.misaligned_at is not None: # pragma: no branch # coverage bug?
581 self.misaligned_count += 1
582
583 self.debug_data(data)
584
585 if (
586 data.target_observations
587 and self.pareto_front is not None
588 and self.pareto_front.add(data.as_result())
589 ):
590 self.save_choices(data.choices, sub_key=b"pareto")
591
592 if data.status >= Status.VALID:
593 for k, v in data.target_observations.items():
594 self.best_observed_targets[k] = max(self.best_observed_targets[k], v)
595
596 if k not in self.best_examples_of_observed_targets:
597 data_as_result = data.as_result()
598 assert not isinstance(data_as_result, _Overrun)
599 self.best_examples_of_observed_targets[k] = data_as_result
600 continue
601
602 existing_example = self.best_examples_of_observed_targets[k]
603 existing_score = existing_example.target_observations[k]
604
605 if v < existing_score:
606 continue
607
608 if v > existing_score or sort_key(data.nodes) < sort_key(
609 existing_example.nodes
610 ):
611 data_as_result = data.as_result()
612 assert not isinstance(data_as_result, _Overrun)
613 self.best_examples_of_observed_targets[k] = data_as_result
614
615 if data.status is Status.VALID:
616 self.valid_examples += 1
617 if data.status is Status.INVALID:
618 self.invalid_examples += 1
619 if data.status is Status.OVERRUN:
620 self.overrun_examples += 1
621
622 if data.status == Status.INTERESTING:
623 if not self.using_hypothesis_backend:
624 # replay this failure on the hypothesis backend to ensure it still
625 # finds a failure. otherwise, it is flaky.
626 initial_exception = data.expected_exception
627 data = ConjectureData.for_choices(data.choices)
628 # we've already going to use the hypothesis provider for this
629 # data, so the verb "switch" is a bit misleading here. We're really
630 # setting this to inform our on_observation logic that the observation
631 # generated here was from a hypothesis backend, and shouldn't be
632 # sent to the on_observation of any alternative backend.
633 with self._with_switch_to_hypothesis_provider(True):
634 self.__stoppable_test_function(data)
635 data.freeze()
636 # TODO: Should same-origin also be checked? (discussion in
637 # https://github.com/HypothesisWorks/hypothesis/pull/4470#discussion_r2217055487)
638 if data.status != Status.INTERESTING:
639 desc_new_status = {
640 data.status.VALID: "passed",
641 data.status.INVALID: "failed filters",
642 data.status.OVERRUN: "overran",
643 }[data.status]
644 raise FlakyBackendFailure(
645 f"Inconsistent results from replaying a failing test case! "
646 f"Raised {type(initial_exception).__name__} on "
647 f"backend={self.settings.backend!r}, but "
648 f"{desc_new_status} under backend='hypothesis'.",
649 [initial_exception],
650 )
651
652 self._cache(data)
653
654 assert data.interesting_origin is not None
655 key = data.interesting_origin
656 changed = False
657 try:
658 existing = self.interesting_examples[key]
659 except KeyError:
660 changed = True
661 self.last_bug_found_at = self.call_count
662 if self.first_bug_found_at is None:
663 self.first_bug_found_at = self.call_count
664 self.first_bug_found_time = time.monotonic()
665 else:
666 if sort_key(data.nodes) < sort_key(existing.nodes):
667 self.shrinks += 1
668 self.downgrade_choices(existing.choices)
669 self.__data_cache.unpin(self._cache_key(existing.choices))
670 changed = True
671
672 if changed:
673 self.save_choices(data.choices)
674 self.interesting_examples[key] = data.as_result() # type: ignore
675 if not self.using_hypothesis_backend:
676 self._backend_found_failure = True
677 self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
678 self.shrunk_examples.discard(key)
679
680 if self.shrinks >= MAX_SHRINKS:
681 self.exit_with(ExitReason.max_shrinks)
682
683 if (
684 not self.ignore_limits
685 and self.finish_shrinking_deadline is not None
686 and self.finish_shrinking_deadline < time.perf_counter()
687 ):
688 # See https://github.com/HypothesisWorks/hypothesis/issues/2340
689 report(
690 "WARNING: Hypothesis has spent more than five minutes working to shrink"
691 " a failing example, and stopped because it is making very slow"
692 " progress. When you re-run your tests, shrinking will resume and may"
693 " take this long before aborting again.\nPLEASE REPORT THIS if you can"
694 " provide a reproducing example, so that we can improve shrinking"
695 " performance for everyone."
696 )
697 self.exit_with(ExitReason.very_slow_shrinking)
698
699 if not self.interesting_examples:
700 # Note that this logic is reproduced to end the generation phase when
701 # we have interesting examples. Update that too if you change this!
702 # (The doubled implementation is because here we exit the engine entirely,
703 # while in the other case below we just want to move on to shrinking.)
704 if self.valid_examples >= self.settings.max_examples:
705 self.exit_with(ExitReason.max_examples)
706 if self.call_count >= max(
707 self.settings.max_examples * 10,
708 # We have a high-ish default max iterations, so that tests
709 # don't become flaky when max_examples is too low.
710 1000,
711 ):
712 self.exit_with(ExitReason.max_iterations)
713
714 if self.__tree_is_exhausted():
715 self.exit_with(ExitReason.finished)
716
717 self.record_for_health_check(data)
718
719 def on_pareto_evict(self, data: ConjectureResult) -> None:
720 self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))
721
722 def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
723 """Uses the tree to proactively generate a starting choice sequence
724 that we haven't explored yet for this test.
725
726 When this method is called, we assume that there must be at
727 least one novel prefix left to find. If there were not, then the
728 test run should have already stopped due to tree exhaustion.
729 """
730 return self.tree.generate_novel_prefix(self.random)
731
732 def record_for_health_check(self, data: ConjectureData) -> None:
733 # Once we've actually found a bug, there's no point in trying to run
734 # health checks - they'll just mask the actually important information.
735 if data.status == Status.INTERESTING:
736 self.health_check_state = None
737
738 state = self.health_check_state
739
740 if state is None:
741 return
742
743 for k, v in data.draw_times.items():
744 state.draw_times[k].append(v)
745
746 if data.status == Status.VALID:
747 state.valid_examples += 1
748 elif data.status == Status.INVALID:
749 state.invalid_examples += 1
750 else:
751 assert data.status == Status.OVERRUN
752 state.overrun_examples += 1
753
754 max_valid_draws = 10
755 max_invalid_draws = 50
756 max_overrun_draws = 20
757
758 assert state.valid_examples <= max_valid_draws
759
760 if state.valid_examples == max_valid_draws:
761 self.health_check_state = None
762 return
763
764 if state.overrun_examples == max_overrun_draws:
765 fail_health_check(
766 self.settings,
767 "Generated inputs routinely consumed more than the maximum "
768 f"allowed entropy: {state.valid_examples} inputs were generated "
769 f"successfully, while {state.overrun_examples} inputs exceeded the "
770 f"maximum allowed entropy during generation."
771 "\n\n"
772 f"Testing with inputs this large tends to be slow, and to produce "
773 "failures that are both difficult to shrink and difficult to understand. "
774 "Try decreasing the amount of data generated, for example by "
775 "decreasing the minimum size of collection strategies like "
776 "st.lists()."
777 "\n\n"
778 "If you expect the average size of your input to be this large, "
779 "you can disable this health check with "
780 "@settings(suppress_health_check=[HealthCheck.data_too_large]). "
781 "See "
782 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
783 "for details.",
784 HealthCheck.data_too_large,
785 )
786 if state.invalid_examples == max_invalid_draws:
787 fail_health_check(
788 self.settings,
789 "It looks like this test is filtering out a lot of inputs. "
790 f"{state.valid_examples} inputs were generated successfully, "
791 f"while {state.invalid_examples} inputs were filtered out. "
792 "\n\n"
793 "An input might be filtered out by calls to assume(), "
794 "strategy.filter(...), or occasionally by Hypothesis internals."
795 "\n\n"
796 "Applying this much filtering makes input generation slow, since "
797 "Hypothesis must discard inputs which are filtered out and try "
798 "generating it again. It is also possible that applying this much "
799 "filtering will distort the domain and/or distribution of the test, "
800 "leaving your testing less rigorous than expected."
801 "\n\n"
802 "If you expect this many inputs to be filtered out during generation, "
803 "you can disable this health check with "
804 "@settings(suppress_health_check=[HealthCheck.filter_too_much]). See "
805 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
806 "for details.",
807 HealthCheck.filter_too_much,
808 )
809
810 # Allow at least the greater of one second or 5x the deadline. If deadline
811 # is None, allow 30s - the user can disable the healthcheck too if desired.
812 draw_time = state.total_draw_time
813 draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
814 if (
815 draw_time > max(1.0, draw_time_limit.total_seconds())
816 # we disable HealthCheck.too_slow under concurrent threads, since
817 # cpython may switch away from a thread for arbitrarily long.
818 and not self.thread_overlap.get(threading.get_ident(), False)
819 ):
820 extra_str = []
821 if state.invalid_examples:
822 extra_str.append(f"{state.invalid_examples} invalid inputs")
823 if state.overrun_examples:
824 extra_str.append(
825 f"{state.overrun_examples} inputs which exceeded the "
826 "maximum allowed entropy"
827 )
828 extra_str = ", and ".join(extra_str)
829 extra_str = f" ({extra_str})" if extra_str else ""
830
831 fail_health_check(
832 self.settings,
833 "Input generation is slow: Hypothesis only generated "
834 f"{state.valid_examples} valid inputs after {draw_time:.2f} "
835 f"seconds{extra_str}."
836 "\n" + state.timing_report() + "\n\n"
837 "This could be for a few reasons:"
838 "\n"
839 "1. This strategy could be generating too much data per input. "
840 "Try decreasing the amount of data generated, for example by "
841 "decreasing the minimum size of collection strategies like "
842 "st.lists()."
843 "\n"
844 "2. Some other expensive computation could be running during input "
845 "generation. For example, "
846 "if @st.composite or st.data() is interspersed with an expensive "
847 "computation, HealthCheck.too_slow is likely to trigger. If this "
848 "computation is unrelated to input generation, move it elsewhere. "
849 "Otherwise, try making it more efficient, or disable this health "
850 "check if that is not possible."
851 "\n\n"
852 "If you expect input generation to take this long, you can disable "
853 "this health check with "
854 "@settings(suppress_health_check=[HealthCheck.too_slow]). See "
855 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
856 "for details.",
857 HealthCheck.too_slow,
858 )
859
860 def save_choices(
861 self, choices: Sequence[ChoiceT], sub_key: bytes | None = None
862 ) -> None:
863 if self.settings.database is not None:
864 key = self.sub_key(sub_key)
865 if key is None:
866 return
867 self.settings.database.save(key, choices_to_bytes(choices))
868
869 def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
870 buffer = choices_to_bytes(choices)
871 if self.settings.database is not None and self.database_key is not None:
872 self.settings.database.move(self.database_key, self.secondary_key, buffer)
873
874 def sub_key(self, sub_key: bytes | None) -> bytes | None:
875 if self.database_key is None:
876 return None
877 if sub_key is None:
878 return self.database_key
879 return b".".join((self.database_key, sub_key))
880
881 @property
882 def secondary_key(self) -> bytes | None:
883 return self.sub_key(b"secondary")
884
885 @property
886 def pareto_key(self) -> bytes | None:
887 return self.sub_key(b"pareto")
888
889 def debug(self, message: str) -> None:
890 if self.settings.verbosity >= Verbosity.debug:
891 base_report(message)
892
893 @property
894 def report_debug_info(self) -> bool:
895 return self.settings.verbosity >= Verbosity.debug
896
897 def debug_data(self, data: ConjectureData | ConjectureResult) -> None:
898 if not self.report_debug_info:
899 return
900
901 status = repr(data.status)
902 if data.status == Status.INTERESTING:
903 status = f"{status} ({data.interesting_origin!r})"
904
905 self.debug(
906 f"{len(data.choices)} choices {data.choices} -> {status}"
907 f"{', ' + data.output if data.output else ''}"
908 )
909
910 def observe_for_provider(self) -> AbstractContextManager:
911 def on_observation(observation: Observation) -> None:
912 assert observation.type == "test_case"
913 # because lifetime == "test_function"
914 assert isinstance(self.provider, PrimitiveProvider)
915 # only fire if we actually used that provider to generate this observation
916 if not self._switch_to_hypothesis_provider:
917 self.provider.on_observation(observation)
918
919 if (
920 self.settings.backend != "hypothesis"
921 # only for lifetime = "test_function" providers (guaranteed
922 # by this isinstance check)
923 and isinstance(self.provider, PrimitiveProvider)
924 # and the provider opted-in to observations
925 and self.provider.add_observability_callback
926 ):
927 return with_observability_callback(on_observation)
928 return nullcontext()
929
930 def run(self) -> None:
931 with local_settings(self.settings), self.observe_for_provider():
932 try:
933 self._run()
934 except RunIsComplete:
935 pass
936 for v in self.interesting_examples.values():
937 self.debug_data(v)
938 self.debug(
939 f"Run complete after {self.call_count} examples "
940 f"({self.valid_examples} valid) and {self.shrinks} shrinks"
941 )
942
943 @property
944 def database(self) -> ExampleDatabase | None:
945 if self.database_key is None:
946 return None
947 return self.settings.database
948
949 def has_existing_examples(self) -> bool:
950 return self.database is not None and Phase.reuse in self.settings.phases
951
952 def reuse_existing_examples(self) -> None:
953 """If appropriate (we have a database and have been told to use it),
954 try to reload existing examples from the database.
955
956 If there are a lot we don't try all of them. We always try the
957 smallest example in the database (which is guaranteed to be the
958 last failure) and the largest (which is usually the seed example
959 which the last failure came from but we don't enforce that). We
960 then take a random sampling of the remainder and try those. Any
961 examples that are no longer interesting are cleared out.
962 """
963 if self.has_existing_examples():
964 self.debug("Reusing examples from database")
965 # We have to do some careful juggling here. We have two database
966 # corpora: The primary and secondary. The primary corpus is a
967 # small set of minimized examples each of which has at one point
968 # demonstrated a distinct bug. We want to retry all of these.
969
970 # We also have a secondary corpus of examples that have at some
971 # point demonstrated interestingness (currently only ones that
972 # were previously non-minimal examples of a bug, but this will
973 # likely expand in future). These are a good source of potentially
974 # interesting examples, but there are a lot of them, so we down
975 # sample the secondary corpus to a more manageable size.
976
977 corpus = sorted(
978 self.settings.database.fetch(self.database_key), key=shortlex
979 )
980 factor = 0.1 if (Phase.generate in self.settings.phases) else 1
981 desired_size = max(2, ceil(factor * self.settings.max_examples))
982 primary_corpus_size = len(corpus)
983
984 if len(corpus) < desired_size:
985 extra_corpus = list(self.settings.database.fetch(self.secondary_key))
986
987 shortfall = desired_size - len(corpus)
988
989 if len(extra_corpus) <= shortfall:
990 extra = extra_corpus
991 else:
992 extra = self.random.sample(extra_corpus, shortfall)
993 extra.sort(key=shortlex)
994 corpus.extend(extra)
995
996 # We want a fast path where every primary entry in the database was
997 # interesting.
998 found_interesting_in_primary = False
999 all_interesting_in_primary_were_exact = True
1000
1001 for i, existing in enumerate(corpus):
1002 if i >= primary_corpus_size and found_interesting_in_primary:
1003 break
1004 choices = choices_from_bytes(existing)
1005 if choices is None:
1006 # clear out any keys which fail deserialization
1007 self.settings.database.delete(self.database_key, existing)
1008 continue
1009 data = self.cached_test_function(choices, extend="full")
1010 if data.status != Status.INTERESTING:
1011 self.settings.database.delete(self.database_key, existing)
1012 self.settings.database.delete(self.secondary_key, existing)
1013 else:
1014 if i < primary_corpus_size:
1015 found_interesting_in_primary = True
1016 assert not isinstance(data, _Overrun)
1017 if choices_key(choices) != choices_key(data.choices):
1018 all_interesting_in_primary_were_exact = False
1019 if not self.settings.report_multiple_bugs:
1020 break
1021 if found_interesting_in_primary:
1022 if all_interesting_in_primary_were_exact:
1023 self.reused_previously_shrunk_test_case = True
1024
1025 # Because self.database is not None (because self.has_existing_examples())
1026 # and self.database_key is not None (because we fetched using it above),
1027 # we can guarantee self.pareto_front is not None
1028 assert self.pareto_front is not None
1029
1030 # If we've not found any interesting examples so far we try some of
1031 # the pareto front from the last run.
1032 if len(corpus) < desired_size and not self.interesting_examples:
1033 desired_extra = desired_size - len(corpus)
1034 pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
1035 if len(pareto_corpus) > desired_extra:
1036 pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
1037 pareto_corpus.sort(key=shortlex)
1038
1039 for existing in pareto_corpus:
1040 choices = choices_from_bytes(existing)
1041 if choices is None:
1042 self.settings.database.delete(self.pareto_key, existing)
1043 continue
1044 data = self.cached_test_function(choices, extend="full")
1045 if data not in self.pareto_front:
1046 self.settings.database.delete(self.pareto_key, existing)
1047 if data.status == Status.INTERESTING:
1048 break
1049
1050 def exit_with(self, reason: ExitReason) -> None:
1051 if self.ignore_limits:
1052 return
1053 self.statistics["stopped-because"] = reason.describe(self.settings)
1054 if self.best_observed_targets:
1055 self.statistics["targets"] = dict(self.best_observed_targets)
1056 self.debug(f"exit_with({reason.name})")
1057 self.exit_reason = reason
1058 raise RunIsComplete
1059
1060 def should_generate_more(self) -> bool:
1061 # End the generation phase where we would have ended it if no bugs had
1062 # been found. This reproduces the exit logic in `self.test_function`,
1063 # but with the important distinction that this clause will move on to
1064 # the shrinking phase having found one or more bugs, while the other
1065 # will exit having found zero bugs.
1066 if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
1067 self.settings.max_examples * 10, 1000
1068 ): # pragma: no cover
1069 return False
1070
1071 # If we haven't found a bug, keep looking - if we hit any limits on
1072 # the number of tests to run that will raise an exception and stop
1073 # the run.
1074 if not self.interesting_examples:
1075 return True
1076 # Users who disable shrinking probably want to exit as fast as possible.
1077 # If we've found a bug and won't report more than one, stop looking.
1078 # If we first saw a bug more than 10 seconds ago, stop looking.
1079 elif (
1080 Phase.shrink not in self.settings.phases
1081 or not self.settings.report_multiple_bugs
1082 or time.monotonic() - self.first_bug_found_time > 10
1083 ):
1084 return False
1085 assert isinstance(self.first_bug_found_at, int)
1086 assert isinstance(self.last_bug_found_at, int)
1087 assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
1088 # Otherwise, keep searching for between ten and 'a heuristic' calls.
1089 # We cap 'calls after first bug' so errors are reported reasonably
1090 # soon even for tests that are allowed to run for a very long time,
1091 # or sooner if the latest half of our test effort has been fruitless.
1092 return self.call_count < MIN_TEST_CALLS or self.call_count < min(
1093 self.first_bug_found_at + 1000, self.last_bug_found_at * 2
1094 )
1095
1096 def generate_new_examples(self) -> None:
1097 if Phase.generate not in self.settings.phases:
1098 return
1099 if self.interesting_examples:
1100 # The example database has failing examples from a previous run,
1101 # so we'd rather report that they're still failing ASAP than take
1102 # the time to look for additional failures.
1103 return
1104
1105 self.debug("Generating new examples")
1106
1107 assert self.should_generate_more()
1108 self._switch_to_hypothesis_provider = True
1109 zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
1110 if zero_data.status > Status.OVERRUN:
1111 assert isinstance(zero_data, ConjectureResult)
1112 # if the crosshair backend cannot proceed, it does not (and cannot)
1113 # realize the symbolic values, with the intent that Hypothesis will
1114 # throw away this test case. We usually do, but if it's the zero data
1115 # then we try to pin it here, which requires realizing the symbolics.
1116 #
1117 # We don't (yet) rely on the zero data being pinned, and so
1118 # it's simply a very slight performance loss to simply not pin it
1119 # if doing so would error.
1120 if zero_data.cannot_proceed_scope is None: # pragma: no branch
1121 self.__data_cache.pin(
1122 self._cache_key(zero_data.choices), zero_data.as_result()
1123 ) # Pin forever
1124
1125 if zero_data.status == Status.OVERRUN or (
1126 zero_data.status == Status.VALID
1127 and isinstance(zero_data, ConjectureResult)
1128 and zero_data.length * 2 > BUFFER_SIZE
1129 ):
1130 fail_health_check(
1131 self.settings,
1132 "The smallest natural input for this test is very "
1133 "large. This makes it difficult for Hypothesis to generate "
1134 "good inputs, especially when trying to shrink failing inputs."
1135 "\n\n"
1136 "Consider reducing the amount of data generated by the strategy. "
1137 "Also consider introducing small alternative values for some "
1138 "strategies. For example, could you "
1139 "mark some arguments as optional by replacing `some_complex_strategy`"
1140 "with `st.none() | some_complex_strategy`?"
1141 "\n\n"
1142 "If you are confident that the size of the smallest natural input "
1143 "to your test cannot be reduced, you can suppress this health check "
1144 "with @settings(suppress_health_check=[HealthCheck.large_base_example]). "
1145 "See "
1146 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
1147 "for details.",
1148 HealthCheck.large_base_example,
1149 )
1150
1151 self.health_check_state = HealthCheckState()
1152
1153 # We attempt to use the size of the minimal generated test case starting
1154 # from a given novel prefix as a guideline to generate smaller test
1155 # cases for an initial period, by restriscting ourselves to test cases
1156 # that are not much larger than it.
1157 #
1158 # Calculating the actual minimal generated test case is hard, so we
1159 # take a best guess that zero extending a prefix produces the minimal
1160 # test case starting with that prefix (this is true for our built in
1161 # strategies). This is only a reasonable thing to do if the resulting
1162 # test case is valid. If we regularly run into situations where it is
1163 # not valid then this strategy is a waste of time, so we want to
1164 # abandon it early. In order to do this we track how many times in a
1165 # row it has failed to work, and abort small test case generation when
1166 # it has failed too many times in a row.
1167 consecutive_zero_extend_is_invalid = 0
1168
1169 # We control growth during initial example generation, for two
1170 # reasons:
1171 #
1172 # * It gives us an opportunity to find small examples early, which
1173 # gives us a fast path for easy to find bugs.
1174 # * It avoids low probability events where we might end up
1175 # generating very large examples during health checks, which
1176 # on slower machines can trigger HealthCheck.too_slow.
1177 #
1178 # The heuristic we use is that we attempt to estimate the smallest
1179 # extension of this prefix, and limit the size to no more than
1180 # an order of magnitude larger than that. If we fail to estimate
1181 # the size accurately, we skip over this prefix and try again.
1182 #
1183 # We need to tune the example size based on the initial prefix,
1184 # because any fixed size might be too small, and any size based
1185 # on the strategy in general can fall afoul of strategies that
1186 # have very different sizes for different prefixes.
1187 #
1188 # We previously set a minimum value of 10 on small_example_cap, with the
1189 # reasoning of avoiding flaky health checks. However, some users set a
1190 # low max_examples for performance. A hard lower bound in this case biases
1191 # the distribution towards small (and less powerful) examples. Flaky
1192 # and loud health checks are better than silent performance degradation.
1193 small_example_cap = min(self.settings.max_examples // 10, 50)
1194 optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
1195 ran_optimisations = False
1196 self._switch_to_hypothesis_provider = False
1197
1198 while self.should_generate_more():
1199 # we don't yet integrate DataTree with backends. Instead of generating
1200 # a novel prefix, ask the backend for an input.
1201 if not self.using_hypothesis_backend:
1202 data = self.new_conjecture_data([])
1203 with suppress(BackendCannotProceed):
1204 self.test_function(data)
1205 continue
1206
1207 self._current_phase = "generate"
1208 prefix = self.generate_novel_prefix()
1209 if (
1210 self.valid_examples <= small_example_cap
1211 and self.call_count <= 5 * small_example_cap
1212 and not self.interesting_examples
1213 and consecutive_zero_extend_is_invalid < 5
1214 ):
1215 minimal_example = self.cached_test_function(
1216 prefix + (ChoiceTemplate("simplest", count=None),)
1217 )
1218
1219 if minimal_example.status < Status.VALID:
1220 consecutive_zero_extend_is_invalid += 1
1221 continue
1222 # Because the Status code is greater than Status.VALID, it cannot be
1223 # Status.OVERRUN, which guarantees that the minimal_example is a
1224 # ConjectureResult object.
1225 assert isinstance(minimal_example, ConjectureResult)
1226 consecutive_zero_extend_is_invalid = 0
1227 minimal_extension = len(minimal_example.choices) - len(prefix)
1228 max_length = len(prefix) + minimal_extension * 5
1229
1230 # We could end up in a situation where even though the prefix was
1231 # novel when we generated it, because we've now tried zero extending
1232 # it not all possible continuations of it will be novel. In order to
1233 # avoid making redundant test calls, we rerun it in simulation mode
1234 # first. If this has a predictable result, then we don't bother
1235 # running the test function for real here. If however we encounter
1236 # some novel behaviour, we try again with the real test function,
1237 # starting from the new novel prefix that has discovered.
1238 trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
1239 try:
1240 self.tree.simulate_test_function(trial_data)
1241 continue
1242 except PreviouslyUnseenBehaviour:
1243 pass
1244
1245 # If the simulation entered part of the tree that has been killed,
1246 # we don't want to run this.
1247 assert isinstance(trial_data.observer, TreeRecordingObserver)
1248 if trial_data.observer.killed:
1249 continue
1250
1251 # We might have hit the cap on number of examples we should
1252 # run when calculating the minimal example.
1253 if not self.should_generate_more():
1254 break
1255
1256 prefix = trial_data.choices
1257 else:
1258 max_length = None
1259
1260 data = self.new_conjecture_data(prefix, max_choices=max_length)
1261 self.test_function(data)
1262
1263 if (
1264 data.status is Status.OVERRUN
1265 and max_length is not None
1266 and "invalid because" not in data.events
1267 ):
1268 data.events["invalid because"] = (
1269 "reduced max size for early examples (avoids flaky health checks)"
1270 )
1271
1272 self.generate_mutations_from(data)
1273
1274 # Although the optimisations are logically a distinct phase, we
1275 # actually normally run them as part of example generation. The
1276 # reason for this is that we cannot guarantee that optimisation
1277 # actually exhausts our budget: It might finish running and we
1278 # discover that actually we still could run a bunch more test cases
1279 # if we want.
1280 if (
1281 self.valid_examples >= max(small_example_cap, optimise_at)
1282 and not ran_optimisations
1283 ):
1284 ran_optimisations = True
1285 self._current_phase = "target"
1286 self.optimise_targets()
1287
1288 def generate_mutations_from(self, data: ConjectureData | ConjectureResult) -> None:
1289 # A thing that is often useful but rarely happens by accident is
1290 # to generate the same value at multiple different points in the
1291 # test case.
1292 #
1293 # Rather than make this the responsibility of individual strategies
1294 # we implement a small mutator that just takes parts of the test
1295 # case with the same label and tries replacing one of them with a
1296 # copy of the other and tries running it. If we've made a good
1297 # guess about what to put where, this will run a similar generated
1298 # test case with more duplication.
1299 if (
1300 # An OVERRUN doesn't have enough information about the test
1301 # case to mutate, so we just skip those.
1302 data.status >= Status.INVALID
1303 # This has a tendency to trigger some weird edge cases during
1304 # generation so we don't let it run until we're done with the
1305 # health checks.
1306 and self.health_check_state is None
1307 ):
1308 initial_calls = self.call_count
1309 failed_mutations = 0
1310
1311 while (
1312 self.should_generate_more()
1313 # We implement fairly conservative checks for how long we
1314 # we should run mutation for, as it's generally not obvious
1315 # how helpful it is for any given test case.
1316 and self.call_count <= initial_calls + 5
1317 and failed_mutations <= 5
1318 ):
1319 groups = data.spans.mutator_groups
1320 if not groups:
1321 break
1322
1323 group = self.random.choice(groups)
1324 (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
1325 if start1 > start2:
1326 (start1, end1), (start2, end2) = (start2, end2), (start1, end1)
1327
1328 if (
1329 start1 <= start2 <= end2 <= end1
1330 ): # pragma: no cover # flaky on conjecture-cover tests
1331 # One span entirely contains the other. The strategy is very
1332 # likely some kind of tree. e.g. we might have
1333 #
1334 # ┌─────┐
1335 # ┌─────┤ a ├──────┐
1336 # │ └─────┘ │
1337 # ┌──┴──┐ ┌──┴──┐
1338 # ┌──┤ b ├──┐ ┌──┤ c ├──┐
1339 # │ └──┬──┘ │ │ └──┬──┘ │
1340 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
1341 # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
1342 # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
1343 #
1344 # where each node is drawn from the same strategy and so
1345 # has the same span label. We might have selected the spans
1346 # corresponding to the a and c nodes, which is the entire
1347 # tree and the subtree of (and including) c respectively.
1348 #
1349 # There are two possible mutations we could apply in this case:
1350 # 1. replace a with c (replace child with parent)
1351 # 2. replace c with a (replace parent with child)
1352 #
1353 # (1) results in multiple partial copies of the
1354 # parent:
1355 # ┌─────┐
1356 # ┌─────┤ a ├────────────┐
1357 # │ └─────┘ │
1358 # ┌──┴──┐ ┌─┴───┐
1359 # ┌──┤ b ├──┐ ┌─────┤ a ├──────┐
1360 # │ └──┬──┘ │ │ └─────┘ │
1361 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌──┴──┐ ┌──┴──┐
1362 # │ d │ │ e │ │ f │ ┌──┤ b ├──┐ ┌──┤ c ├──┐
1363 # └───┘ └───┘ └───┘ │ └──┬──┘ │ │ └──┬──┘ │
1364 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
1365 # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
1366 # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
1367 #
1368 # While (2) results in truncating part of the parent:
1369 #
1370 # ┌─────┐
1371 # ┌──┤ c ├──┐
1372 # │ └──┬──┘ │
1373 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
1374 # │ g │ │ h │ │ i │
1375 # └───┘ └───┘ └───┘
1376 #
1377 # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
1378 # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
1379 # except we do not repeat the replacement additional times
1380 # (the paper repeats it once for a total of two copies).
1381 #
1382 # We currently only apply mutation (1), and ignore mutation
1383 # (2). The reason is that the attempt generated from (2) is
1384 # always something that Hypothesis could easily have generated
1385 # itself, by simply not making various choices. Whereas
1386 # duplicating the exact value + structure of particular choices
1387 # in (1) would have been hard for Hypothesis to generate by
1388 # chance.
1389 #
1390 # TODO: an extension of this mutation might repeat (1) on
1391 # a geometric distribution between 0 and ~10 times. We would
1392 # need to find the corresponding span to recurse on in the new
1393 # choices, probably just by using the choices index.
1394
1395 # case (1): duplicate the choices in start1:start2.
1396 attempt = data.choices[:start2] + data.choices[start1:]
1397 else:
1398 (start, end) = self.random.choice([(start1, end1), (start2, end2)])
1399 replacement = data.choices[start:end]
1400 # We attempt to replace both the examples with
1401 # whichever choice we made. Note that this might end
1402 # up messing up and getting the example boundaries
1403 # wrong - labels matching are only a best guess as to
1404 # whether the two are equivalent - but it doesn't
1405 # really matter. It may not achieve the desired result,
1406 # but it's still a perfectly acceptable choice sequence
1407 # to try.
1408 attempt = (
1409 data.choices[:start1]
1410 + replacement
1411 + data.choices[end1:start2]
1412 + replacement
1413 + data.choices[end2:]
1414 )
1415
1416 try:
1417 new_data = self.cached_test_function(
1418 attempt,
1419 # We set error_on_discard so that we don't end up
1420 # entering parts of the tree we consider redundant
1421 # and not worth exploring.
1422 error_on_discard=True,
1423 )
1424 except ContainsDiscard:
1425 failed_mutations += 1
1426 continue
1427
1428 if new_data is Overrun:
1429 failed_mutations += 1 # pragma: no cover # annoying case
1430 else:
1431 assert isinstance(new_data, ConjectureResult)
1432 if (
1433 new_data.status >= data.status
1434 and choices_key(data.choices) != choices_key(new_data.choices)
1435 and all(
1436 k in new_data.target_observations
1437 and new_data.target_observations[k] >= v
1438 for k, v in data.target_observations.items()
1439 )
1440 ):
1441 data = new_data
1442 failed_mutations = 0
1443 else:
1444 failed_mutations += 1
1445
1446 def optimise_targets(self) -> None:
1447 """If any target observations have been made, attempt to optimise them
1448 all."""
1449 if not self.should_optimise:
1450 return
1451 from hypothesis.internal.conjecture.optimiser import Optimiser
1452
1453 # We want to avoid running the optimiser for too long in case we hit
1454 # an unbounded target score. We start this off fairly conservatively
1455 # in case interesting examples are easy to find and then ramp it up
1456 # on an exponential schedule so we don't hamper the optimiser too much
1457 # if it needs a long time to find good enough improvements.
1458 max_improvements = 10
1459 while True:
1460 prev_calls = self.call_count
1461
1462 any_improvements = False
1463
1464 for target, data in list(self.best_examples_of_observed_targets.items()):
1465 optimiser = Optimiser(
1466 self, data, target, max_improvements=max_improvements
1467 )
1468 optimiser.run()
1469 if optimiser.improvements > 0:
1470 any_improvements = True
1471
1472 if self.interesting_examples:
1473 break
1474
1475 max_improvements *= 2
1476
1477 if any_improvements:
1478 continue
1479
1480 if self.best_observed_targets:
1481 self.pareto_optimise()
1482
1483 if prev_calls == self.call_count:
1484 break
1485
1486 def pareto_optimise(self) -> None:
1487 if self.pareto_front is not None:
1488 ParetoOptimiser(self).run()
1489
1490 def _run(self) -> None:
1491 # have to use the primitive provider to interpret database bits...
1492 self._switch_to_hypothesis_provider = True
1493 with self._log_phase_statistics("reuse"):
1494 self.reuse_existing_examples()
1495 # Fast path for development: If the database gave us interesting
1496 # examples from the previously stored primary key, don't try
1497 # shrinking it again as it's unlikely to work.
1498 if self.reused_previously_shrunk_test_case:
1499 self.exit_with(ExitReason.finished)
1500 # ...but we should use the supplied provider when generating...
1501 self._switch_to_hypothesis_provider = False
1502 with self._log_phase_statistics("generate"):
1503 self.generate_new_examples()
1504 # We normally run the targeting phase mixed in with the generate phase,
1505 # but if we've been asked to run it but not generation then we have to
1506 # run it explicitly on its own here.
1507 if Phase.generate not in self.settings.phases:
1508 self._current_phase = "target"
1509 self.optimise_targets()
1510 # ...and back to the primitive provider when shrinking.
1511 self._switch_to_hypothesis_provider = True
1512 with self._log_phase_statistics("shrink"):
1513 self.shrink_interesting_examples()
1514 self.exit_with(ExitReason.finished)
1515
1516 def new_conjecture_data(
1517 self,
1518 prefix: Sequence[ChoiceT | ChoiceTemplate],
1519 *,
1520 observer: DataObserver | None = None,
1521 max_choices: int | None = None,
1522 ) -> ConjectureData:
1523 provider = (
1524 HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
1525 )
1526 observer = observer or self.tree.new_observer()
1527 if not self.using_hypothesis_backend:
1528 observer = DataObserver()
1529
1530 return ConjectureData(
1531 prefix=prefix,
1532 observer=observer,
1533 provider=provider,
1534 max_choices=max_choices,
1535 random=self.random,
1536 )
1537
1538 def shrink_interesting_examples(self) -> None:
1539 """If we've found interesting examples, try to replace each of them
1540 with a minimal interesting example with the same interesting_origin.
1541
1542 We may find one or more examples with a new interesting_origin
1543 during the shrink process. If so we shrink these too.
1544 """
1545 if Phase.shrink not in self.settings.phases or not self.interesting_examples:
1546 return
1547
1548 self.debug("Shrinking interesting examples")
1549 self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS
1550
1551 for prev_data in sorted(
1552 self.interesting_examples.values(), key=lambda d: sort_key(d.nodes)
1553 ):
1554 assert prev_data.status == Status.INTERESTING
1555 data = self.new_conjecture_data(prev_data.choices)
1556 self.test_function(data)
1557 if data.status != Status.INTERESTING:
1558 self.exit_with(ExitReason.flaky)
1559
1560 self.clear_secondary_key()
1561
1562 while len(self.shrunk_examples) < len(self.interesting_examples):
1563 target, example = min(
1564 (
1565 (k, v)
1566 for k, v in self.interesting_examples.items()
1567 if k not in self.shrunk_examples
1568 ),
1569 key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))),
1570 )
1571 self.debug(f"Shrinking {target!r}: {example.choices}")
1572
1573 if not self.settings.report_multiple_bugs:
1574 # If multi-bug reporting is disabled, we shrink our currently-minimal
1575 # failure, allowing 'slips' to any bug with a smaller minimal example.
1576 self.shrink(example, lambda d: d.status == Status.INTERESTING)
1577 return
1578
1579 def predicate(d: ConjectureResult | _Overrun) -> bool:
1580 if d.status < Status.INTERESTING:
1581 return False
1582 d = cast(ConjectureResult, d)
1583 return d.interesting_origin == target
1584
1585 self.shrink(example, predicate)
1586
1587 self.shrunk_examples.add(target)
1588
1589 def clear_secondary_key(self) -> None:
1590 if self.has_existing_examples():
1591 # If we have any smaller examples in the secondary corpus, now is
1592 # a good time to try them to see if they work as shrinks. They
1593 # probably won't, but it's worth a shot and gives us a good
1594 # opportunity to clear out the database.
1595
1596 # It's not worth trying the primary corpus because we already
1597 # tried all of those in the initial phase.
1598 corpus = sorted(
1599 self.settings.database.fetch(self.secondary_key), key=shortlex
1600 )
1601 for c in corpus:
1602 choices = choices_from_bytes(c)
1603 if choices is None:
1604 self.settings.database.delete(self.secondary_key, c)
1605 continue
1606 primary = {
1607 choices_to_bytes(v.choices)
1608 for v in self.interesting_examples.values()
1609 }
1610 if shortlex(c) > max(map(shortlex, primary)):
1611 break
1612
1613 self.cached_test_function(choices)
1614 # We unconditionally remove c from the secondary key as it
1615 # is either now primary or worse than our primary example
1616 # of this reason for interestingness.
1617 self.settings.database.delete(self.secondary_key, c)
1618
1619 def shrink(
1620 self,
1621 example: ConjectureData | ConjectureResult,
1622 predicate: ShrinkPredicateT | None = None,
1623 allow_transition: (
1624 Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None
1625 ) = None,
1626 ) -> ConjectureData | ConjectureResult:
1627 s = self.new_shrinker(example, predicate, allow_transition)
1628 s.shrink()
1629 return s.shrink_target
1630
1631 def new_shrinker(
1632 self,
1633 example: ConjectureData | ConjectureResult,
1634 predicate: ShrinkPredicateT | None = None,
1635 allow_transition: (
1636 Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None
1637 ) = None,
1638 ) -> Shrinker:
1639 return Shrinker(
1640 self,
1641 example,
1642 predicate,
1643 allow_transition=allow_transition,
1644 explain=Phase.explain in self.settings.phases,
1645 in_target_phase=self._current_phase == "target",
1646 )
1647
1648 def passing_choice_sequences(
1649 self, prefix: Sequence[ChoiceNode] = ()
1650 ) -> frozenset[tuple[ChoiceNode, ...]]:
1651 """Return a collection of choice sequence nodes which cause the test to pass.
1652 Optionally restrict this by a certain prefix, which is useful for explain mode.
1653 """
1654 return frozenset(
1655 cast(ConjectureResult, result).nodes
1656 for key in self.__data_cache
1657 if (result := self.__data_cache[key]).status is Status.VALID
1658 and startswith(cast(ConjectureResult, result).nodes, prefix)
1659 )
1660
1661
1662class ContainsDiscard(Exception):
1663 pass