1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import importlib
12import inspect
13import math
14import threading
15import time
16from collections import defaultdict
17from collections.abc import Generator, Sequence
18from contextlib import AbstractContextManager, contextmanager, nullcontext, suppress
19from dataclasses import dataclass, field
20from datetime import timedelta
21from enum import Enum
22from random import Random, getrandbits
23from typing import Callable, Literal, NoReturn, Optional, Union, cast
24
25from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
26from hypothesis._settings import local_settings, note_deprecation
27from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
28from hypothesis.errors import (
29 BackendCannotProceed,
30 FlakyBackendFailure,
31 HypothesisException,
32 InvalidArgument,
33 StopTest,
34)
35from hypothesis.internal.cache import LRUReusedCache
36from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
37from hypothesis.internal.conjecture.choice import (
38 ChoiceConstraintsT,
39 ChoiceKeyT,
40 ChoiceNode,
41 ChoiceT,
42 ChoiceTemplate,
43 choices_key,
44)
45from hypothesis.internal.conjecture.data import (
46 ConjectureData,
47 ConjectureResult,
48 DataObserver,
49 Overrun,
50 Status,
51 _Overrun,
52)
53from hypothesis.internal.conjecture.datatree import (
54 DataTree,
55 PreviouslyUnseenBehaviour,
56 TreeRecordingObserver,
57)
58from hypothesis.internal.conjecture.junkdrawer import (
59 ensure_free_stackframes,
60 startswith,
61)
62from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
63from hypothesis.internal.conjecture.providers import (
64 AVAILABLE_PROVIDERS,
65 HypothesisProvider,
66 PrimitiveProvider,
67)
68from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
69from hypothesis.internal.escalation import InterestingOrigin
70from hypothesis.internal.healthcheck import fail_health_check
71from hypothesis.internal.observability import Observation, with_observability_callback
72from hypothesis.reporting import base_report, report
73
# In most cases, the following constants are all Final. However, we do allow users
# to monkeypatch all of these variables, which means we cannot annotate them as
# Final or mypyc will inline them and render monkeypatching useless.

#: The maximum number of times the shrinker will reduce the complexity of a failing
#: input before giving up. This avoids falling down a trap of exponential (or worse)
#: complexity, where the shrinker appears to be making progress but will take a
#: substantially long time to finish completely.
MAX_SHRINKS: int = 500

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)

#: The maximum total time in seconds that the shrinker will try to shrink a failure
#: for before giving up. This is across all shrinks for the same failure, so even
#: if the shrinker successfully reduces the complexity of a single failure several
#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
MAX_SHRINKING_SECONDS: int = 300

#: The maximum amount of entropy a single test case can use before giving up
#: while making random choices during input generation.
#:
#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
#: should not rely on it, except that a linear increase in |BUFFER_SIZE| will
#: linearly increase the amount of entropy a test case can use during generation.
BUFFER_SIZE: int = 8 * 1024
# Capacity passed to the LRUReusedCache in which ConjectureRunner memoises
# test-case results keyed by their choice sequence.
CACHE_SIZE: int = 10000
# NOTE(review): not referenced in the visible part of this file; presumably a
# lower bound on the number of test-function calls - confirm at its use site.
MIN_TEST_CALLS: int = 10
105
106
def shortlex(s):
    """Return a sort key ordering sequences by length first, then by content."""
    length = len(s)
    return length, s
109
110
@dataclass
class HealthCheckState:
    """Running tallies collected while the generation health checks are active.

    Tracks how many test cases were valid, invalid, or overran, together with
    every recorded draw duration grouped by draw label, so that a slow run can
    be explained to the user via ``timing_report``.
    """

    valid_examples: int = field(default=0)
    invalid_examples: int = field(default=0)
    overrun_examples: int = field(default=0)
    # Maps a draw label to every duration recorded for that label.
    draw_times: defaultdict[str, list[float]] = field(
        default_factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
        """Return the sum of every recorded draw duration across all labels."""
        # Stream the values straight into fsum. Concatenating the per-label
        # lists with sum(..., start=[]) copies the accumulated list once per
        # label, which is quadratic in the number of labels.
        return math.fsum(t for times in self.draw_times.values() for t in times)

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow.

        Returns an empty string when no draw times have been recorded. Rows
        are ordered from the largest total draw time to the smallest.
        """
        if not self.draw_times:
            return ""
        # Evaluate the property once; it re-sums every recorded time on access.
        total = self.total_draw_time
        n_rows = len(self.draw_times)
        width = max(
            len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
        )
        out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            subtotal = math.fsum(times)
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if 5 <= i < (n_rows - 2) and subtotal * 20 < total:
                out.append(f" (skipped {n_rows - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times that would display as
            # less than 1ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname.removeprefix("generate:").removesuffix(": ")
            # If every recorded duration is zero the total is zero as well;
            # report 0% rather than raising ZeroDivisionError.
            fraction = subtotal / total if total else 0.0
            out.append(
                f" {arg:^{width}} | {len(times):>4} | " f"{fraction:>7.0%} | {desc}"
            )
        return "\n".join(out)
151
152
class ExitReason(Enum):
    """Reasons passed to ``exit_with``; each value is a human-readable template."""

    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, but < 10% "
        "of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
        """Return this reason's text with ``{s...}`` fields filled from ``settings``."""
        template = self.value
        return template.format(s=settings)
166
167
class RunIsComplete(Exception):
    """Exception type with no extra state of its own."""

    # NOTE(review): presumably raised to unwind out of the engine once a run
    # is finished - the raise site is not in this part of the file; confirm.
170
171
def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
    """Import and return the provider registered for ``backend``.

    The dotted path is looked up in ``AVAILABLE_PROVIDERS``. A provider whose
    ``lifetime`` is ``"test_function"`` is instantiated here (with ``None`` as
    its only argument); one whose ``lifetime`` is ``"test_case"`` is returned
    as a class. Any other lifetime raises ``InvalidArgument``.
    """
    module_path, cls_name = AVAILABLE_PROVIDERS[backend].rsplit(".", 1)
    module = importlib.import_module(module_path)
    provider = getattr(module, cls_name)

    if provider.lifetime == "test_function":
        return provider(None)
    if provider.lifetime == "test_case":
        return provider

    raise InvalidArgument(
        f"invalid lifetime {provider.lifetime} for provider {provider.__name__}. "
        "Expected one of 'test_function', 'test_case'."
    )
184
185
class CallStats(TypedDict):
    # Statistics recorded for a single call of the test function.

    # Lower-cased name of the test case's final status.
    status: str
    # Finish time minus start time of the test case.
    runtime: float
    # Sum of the draw times recorded for the test case.
    drawtime: float
    # GC finish time minus GC start time of the test case.
    gctime: float
    # Sorted event strings, each either "key" or "key: value".
    events: list[str]
192
193
# Statistics stored for one phase of a run. The functional TypedDict syntax is
# used because the keys contain hyphens and so cannot be class attributes.
PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        # Wall-clock duration of the phase, measured with time.perf_counter.
        "duration-seconds": float,
        # Copy of the per-call statistics gathered during the phase.
        "test-cases": list[CallStats],
        # Number of interesting examples known when the phase ended.
        "distinct-failures": int,
        # Value of the runner's ``shrinks`` counter when the phase ended.
        "shrinks-successful": int,
    },
)
# Top-level statistics dict for a run; every key is optional.
# NOTE(review): "stopped-because", "targets" and "nodeid" are not written in
# the visible part of this file - confirm their producers elsewhere.
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[dict[str, float]],
        "nodeid": NotRequired[str],
    },
)
214
215
def choice_count(choices: Sequence[Union[ChoiceT, ChoiceTemplate]]) -> Optional[int]:
    """Return how many choices ``choices`` stands for, or None if unbounded.

    A plain choice counts as one and a ``ChoiceTemplate`` counts as its
    ``count``. A template whose ``count`` is None makes the total unknown, so
    None is returned as soon as one is encountered.
    """
    total = 0
    for item in choices:
        if not isinstance(item, ChoiceTemplate):
            total += 1
            continue
        if item.count is None:
            return None
        total += item.count
    return total
226
227
class DiscardObserver(DataObserver):
    """Observer that aborts a run by raising ``ContainsDiscard`` on ``kill_branch``."""

    @override
    def kill_branch(self) -> NoReturn:
        """Always raise ``ContainsDiscard``; never returns."""
        raise ContainsDiscard()
232
233
def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
    """Replace every node's value and constraints with their realized form.

    Each node value, and each value in the node's constraints, is passed
    through ``data.provider.realize``. The realized node value must have
    exactly the Python type matching the node's choice type, otherwise a
    ``HypothesisException`` is raised.

    When ``for_failure`` is true, ``for_failure=True`` is forwarded to
    ``realize`` if its signature accepts it; otherwise a deprecation is noted
    and ``realize`` is called without it.
    """
    provider = data.provider

    # backwards-compatibility with backends without for_failure, can remove
    # in a few months
    realize_kwargs = {}
    if for_failure:
        supported = "for_failure" in inspect.signature(provider.realize).parameters
        if not supported:
            note_deprecation(
                f"{type(provider).__qualname__}.realize does not have the "
                "for_failure parameter. This will be an error in future versions "
                "of Hypothesis. (If you installed this backend from a separate "
                "package, upgrading that package may help).",
                has_codemod=False,
                since="2025-05-07",
            )
        else:
            realize_kwargs["for_failure"] = True

    # Python type that a realized value must have, per choice type.
    python_type_for = {
        "string": str,
        "float": float,
        "integer": int,
        "boolean": bool,
        "bytes": bytes,
    }

    for node in data.nodes:
        realized = provider.realize(node.value, **realize_kwargs)
        wanted = python_type_for[node.type]
        # Exact type match on purpose: subclasses are rejected too.
        if type(realized) is not wanted:
            raise HypothesisException(
                f"expected {wanted} from "
                f"{provider.realize.__qualname__}, got {type(realized)}"
            )

        realized_constraints = {}
        for name, raw in node.constraints.items():
            realized_constraints[name] = provider.realize(raw, **realize_kwargs)

        node.value = realized
        node.constraints = cast(ChoiceConstraintsT, realized_constraints)
275
276
277class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Optional[Settings] = None,
        random: Optional[Random] = None,
        database_key: Optional[bytes] = None,
        ignore_limits: bool = False,
        thread_overlap: Optional[dict[int, bool]] = None,
    ) -> None:
        """Set up the state for one run of ``test_function``.

        Args:
            test_function: callable invoked with each ``ConjectureData``.
            settings: settings for the run; a default ``Settings()`` is used
                when omitted.
            random: source of randomness; a new ``Random`` seeded from
                ``getrandbits(128)`` is used when omitted.
            database_key: base key for database entries; with None, nothing is
                saved and no pareto front is kept.
            ignore_limits: when true, the shrinking deadline check in
                ``test_function`` is skipped.
            thread_overlap: mapping from thread ident to a flag; a true flag
                for the current thread disables the too-slow health check.
                Defaults to an empty dict.
        """
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: Optional[float] = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.invalid_examples: int = 0
        self.overrun_examples: int = 0
        self.random: Random = random or Random(getrandbits(128))
        self.database_key: Optional[bytes] = database_key
        self.ignore_limits: bool = ignore_limits
        self.thread_overlap = {} if thread_overlap is None else thread_overlap

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: list[CallStats] = []

        # Smallest known failing result for each distinct failure origin.
        self.interesting_examples: dict[InterestingOrigin, ConjectureResult] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: Optional[int] = None
        self.last_bug_found_at: Optional[int] = None

        self.shrunk_examples: set[InterestingOrigin] = set()
        self.health_check_state: Optional[HealthCheckState] = None
        self.tree: DataTree = DataTree()
        # Either a provider instance or a provider class, depending on the
        # provider's declared lifetime (see _get_provider).
        self.provider: Union[type, PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: defaultdict[str, float] = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: Optional[ParetoFront] = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a choice sequence without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache[
            tuple[ChoiceKeyT, ...], Union[ConjectureResult, _Overrun]
        ](CACHE_SIZE)

        self.reused_previously_shrunk_test_case: bool = False

        # Message to emit via debug() just before the next test_function call.
        self.__pending_call_explanation: Optional[str] = None
        self._backend_found_failure: bool = False
        self._backend_exceeded_deadline: bool = False
        self._switch_to_hypothesis_provider: bool = False

        # Number of BackendCannotProceed("discard_test_case") errors seen.
        self.__failed_realize_count: int = 0
        # note unsound verification by alt backends
        self._verified_by: Optional[str] = None
352
353 @contextmanager
354 def _with_switch_to_hypothesis_provider(
355 self, value: bool
356 ) -> Generator[None, None, None]:
357 previous = self._switch_to_hypothesis_provider
358 try:
359 self._switch_to_hypothesis_provider = value
360 yield
361 finally:
362 self._switch_to_hypothesis_provider = previous
363
364 @property
365 def using_hypothesis_backend(self) -> bool:
366 return (
367 self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
368 )
369
    def explain_next_call_as(self, explanation: str) -> None:
        """Store ``explanation`` to be emitted via ``debug`` by the next ``test_function`` call."""
        self.__pending_call_explanation = explanation
372
    def clear_call_explanation(self) -> None:
        """Drop any pending explanation so the next ``test_function`` call emits none."""
        self.__pending_call_explanation = None
375
376 @contextmanager
377 def _log_phase_statistics(
378 self, phase: Literal["reuse", "generate", "shrink"]
379 ) -> Generator[None, None, None]:
380 self.stats_per_test_case.clear()
381 start_time = time.perf_counter()
382 try:
383 self._current_phase = phase
384 yield
385 finally:
386 self.statistics[phase + "-phase"] = { # type: ignore
387 "duration-seconds": time.perf_counter() - start_time,
388 "test-cases": list(self.stats_per_test_case),
389 "distinct-failures": len(self.interesting_examples),
390 "shrinks-successful": self.shrinks,
391 }
392
393 @property
394 def should_optimise(self) -> bool:
395 return Phase.target in self.settings.phases
396
397 def __tree_is_exhausted(self) -> bool:
398 return self.tree.is_exhausted and self.using_hypothesis_backend
399
400 def __stoppable_test_function(self, data: ConjectureData) -> None:
401 """Run ``self._test_function``, but convert a ``StopTest`` exception
402 into a normal return and avoid raising anything flaky for RecursionErrors.
403 """
404 # We ensure that the test has this much stack space remaining, no
405 # matter the size of the stack when called, to de-flake RecursionErrors
406 # (#2494, #3671). Note, this covers the data generation part of the test;
407 # the actual test execution is additionally protected at the call site
408 # in hypothesis.core.execute_once.
409 with ensure_free_stackframes():
410 try:
411 self._test_function(data)
412 except StopTest as e:
413 if e.testcounter == data.testcounter:
414 # This StopTest has successfully stopped its test, and can now
415 # be discarded.
416 pass
417 else:
418 # This StopTest was raised by a different ConjectureData. We
419 # need to re-raise it so that it will eventually reach the
420 # correct engine.
421 raise
422
423 def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
424 return choices_key(choices)
425
426 def _cache(self, data: ConjectureData) -> None:
427 result = data.as_result()
428 key = self._cache_key(data.choices)
429 self.__data_cache[key] = result
430
431 def cached_test_function(
432 self,
433 choices: Sequence[Union[ChoiceT, ChoiceTemplate]],
434 *,
435 error_on_discard: bool = False,
436 extend: Union[int, Literal["full"]] = 0,
437 ) -> Union[ConjectureResult, _Overrun]:
438 """
439 If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
440 in preference to running the actual test function. This is to allow us
441 to skip test cases we expect to be redundant in some cases. Note that
442 it may be the case that we don't raise ``ContainsDiscard`` even if the
443 result has discards if we cannot determine from previous runs whether
444 it will have a discard.
445 """
446 # node templates represent a not-yet-filled hole and therefore cannot
447 # be cached or retrieved from the cache.
448 if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
449 # this type cast is validated by the isinstance check above (ie, there
450 # are no ChoiceTemplate elements).
451 choices = cast(Sequence[ChoiceT], choices)
452 key = self._cache_key(choices)
453 try:
454 cached = self.__data_cache[key]
455 # if we have a cached overrun for this key, but we're allowing extensions
456 # of the nodes, it could in fact run to a valid data if we try.
457 if extend == 0 or cached.status is not Status.OVERRUN:
458 return cached
459 except KeyError:
460 pass
461
462 if extend == "full":
463 max_length = None
464 elif (count := choice_count(choices)) is None:
465 max_length = None
466 else:
467 max_length = count + extend
468
469 # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
470 # The reason is we don't expect simulate_test_function to explore new choices
471 # and write back to the tree, so we don't want the overhead of the
472 # TreeRecordingObserver tracking those calls.
473 trial_observer: Optional[DataObserver] = DataObserver()
474 if error_on_discard:
475 trial_observer = DiscardObserver()
476
477 try:
478 trial_data = self.new_conjecture_data(
479 choices, observer=trial_observer, max_choices=max_length
480 )
481 self.tree.simulate_test_function(trial_data)
482 except PreviouslyUnseenBehaviour:
483 pass
484 else:
485 trial_data.freeze()
486 key = self._cache_key(trial_data.choices)
487 if trial_data.status > Status.OVERRUN:
488 try:
489 return self.__data_cache[key]
490 except KeyError:
491 pass
492 else:
493 # if we simulated to an overrun, then we our result is certainly
494 # an overrun; no need to consult the cache. (and we store this result
495 # for simulation-less lookup later).
496 self.__data_cache[key] = Overrun
497 return Overrun
498 try:
499 return self.__data_cache[key]
500 except KeyError:
501 pass
502
503 data = self.new_conjecture_data(choices, max_choices=max_length)
504 # note that calling test_function caches `data` for us.
505 self.test_function(data)
506 return data.as_result()
507
    def test_function(self, data: ConjectureData) -> None:
        """Run the test function on ``data`` and fold the outcome into the runner.

        Increments ``call_count``, records per-call statistics, caches the
        frozen result, updates the pareto front, the best observed targets and
        the valid/invalid/overrun counters, and records new or smaller
        interesting examples (saving them through ``save_choices``). Afterwards
        it calls ``exit_with`` if a stopping condition has been reached and
        finally feeds ``data`` to ``record_for_health_check``.

        A ``BackendCannotProceed`` is counted as an invalid example and makes
        the method return early, skipping all post-test-case tracking. Any
        other exception propagates after the choices have been saved.

        Raises:
            FlakyBackendFailure: if a failure found on a non-hypothesis backend
                does not fail again when replayed on the hypothesis backend.
        """
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1
        # Set when the post-test-case bookkeeping in ``finally`` must be skipped.
        interrupted = False

        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BackendCannotProceed as exc:
            if exc.scope in ("verified", "exhausted"):
                self._switch_to_hypothesis_provider = True
                if exc.scope == "verified":
                    self._verified_by = self.settings.backend
            elif exc.scope == "discard_test_case":
                self.__failed_realize_count += 1
                # Give up on the backend once more than 10 test cases, and
                # more than 20% of all calls, had to be discarded.
                if (
                    self.__failed_realize_count > 10
                    and (self.__failed_realize_count / self.call_count) > 0.2
                ):
                    self._switch_to_hypothesis_provider = True

            # treat all BackendCannotProceed exceptions as invalid. This isn't
            # great; "verified" should really be counted as self.valid_examples += 1.
            # But we check self.valid_examples == 0 to determine whether to raise
            # Unsatisfiable, and that would throw this check off.
            self.invalid_examples += 1

            # skip the post-test-case tracking; we're pretending this never happened
            interrupted = True
            data.cannot_proceed_scope = exc.scope
            data.freeze()
            return
        except BaseException:
            data.freeze()
            if self.settings.backend != "hypothesis":
                realize_choices(data, for_failure=True)
            self.save_choices(data.choices)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                assert data.cannot_proceed_scope is None
                data.freeze()
                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)
                if self.settings.backend != "hypothesis":
                    realize_choices(data, for_failure=data.status is Status.INTERESTING)

                self._cache(data)
                if data.misaligned_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_choices(data.choices, sub_key=b"pareto")

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                # Replace on a strictly better score, or on an equal score
                # reached by a test case with a smaller sort key.
                if v > existing_score or sort_key(data.nodes) < sort_key(
                    existing_example.nodes
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status is Status.VALID:
            self.valid_examples += 1
        if data.status is Status.INVALID:
            self.invalid_examples += 1
        if data.status is Status.OVERRUN:
            self.overrun_examples += 1

        if data.status == Status.INTERESTING:
            if not self.using_hypothesis_backend:
                # replay this failure on the hypothesis backend to ensure it still
                # finds a failure. otherwise, it is flaky.
                initial_exception = data.expected_exception
                data = ConjectureData.for_choices(data.choices)
                # we've already going to use the hypothesis provider for this
                # data, so the verb "switch" is a bit misleading here. We're really
                # setting this to inform our on_observation logic that the observation
                # generated here was from a hypothesis backend, and shouldn't be
                # sent to the on_observation of any alternative backend.
                with self._with_switch_to_hypothesis_provider(True):
                    self.__stoppable_test_function(data)
                data.freeze()
                # TODO: Should same-origin also be checked? (discussion in
                # https://github.com/HypothesisWorks/hypothesis/pull/4470#discussion_r2217055487)
                if data.status != Status.INTERESTING:
                    desc_new_status = {
                        data.status.VALID: "passed",
                        data.status.INVALID: "failed filters",
                        data.status.OVERRUN: "overran",
                    }[data.status]
                    raise FlakyBackendFailure(
                        f"Inconsistent results from replaying a failing test case! "
                        f"Raised {type(initial_exception).__name__} on "
                        f"backend={self.settings.backend!r}, but "
                        f"{desc_new_status} under backend='hypothesis'.",
                        [initial_exception],
                    )

                self._cache(data)

            assert data.interesting_origin is not None
            key = data.interesting_origin
            # True when this call found a new failure origin or a smaller
            # example for a known one.
            changed = False
            try:
                existing = self.interesting_examples[key]
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
            else:
                if sort_key(data.nodes) < sort_key(existing.nodes):
                    self.shrinks += 1
                    self.downgrade_choices(existing.choices)
                    self.__data_cache.unpin(self._cache_key(existing.choices))
                    changed = True

            if changed:
                self.save_choices(data.choices)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                if not self.using_hypothesis_backend:
                    self._backend_found_failure = True
                self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress. When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)
708
709 def on_pareto_evict(self, data: ConjectureResult) -> None:
710 self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))
711
712 def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
713 """Uses the tree to proactively generate a starting choice sequence
714 that we haven't explored yet for this test.
715
716 When this method is called, we assume that there must be at
717 least one novel prefix left to find. If there were not, then the
718 test run should have already stopped due to tree exhaustion.
719 """
720 return self.tree.generate_novel_prefix(self.random)
721
    def record_for_health_check(self, data: ConjectureData) -> None:
        """Update the health-check tallies with ``data`` and fail a check if needed.

        Does nothing when ``health_check_state`` is None. The state is reset to
        None, ending all further health checking, as soon as an interesting
        test case is seen or 10 valid examples have been recorded. Otherwise
        ``fail_health_check`` is called when 20 test cases have overrun, when
        50 have been invalid, or when the total draw time exceeds the limit.
        """
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        # Enough valid examples were generated: stop health checking.
        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Generated inputs routinely consumed more than the maximum "
                f"allowed entropy: {state.valid_examples} inputs were generated "
                f"successfully, while {state.overrun_examples} inputs exceeded the "
                f"maximum allowed entropy during generation."
                "\n\n"
                f"Testing with inputs this large tends to be slow, and to produce "
                "failures that are both difficult to shrink and difficult to understand. "
                "Try decreasing the amount of data generated, for example by "
                "decreasing the minimum size of collection strategies like "
                "st.lists()."
                "\n\n"
                "If you expect the average size of your input to be this large, "
                "you can disable this health check with "
                "@settings(suppress_health_check=[HealthCheck.data_too_large]). "
                "See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like this test is filtering out a lot of inputs. "
                f"{state.valid_examples} inputs were generated successfully, "
                f"while {state.invalid_examples} inputs were filtered out. "
                "\n\n"
                "An input might be filtered out by calls to assume(), "
                "strategy.filter(...), or occasionally by Hypothesis internals."
                "\n\n"
                "Applying this much filtering makes input generation slow, since "
                "Hypothesis must discard inputs which are filtered out and try "
                "generating it again. It is also possible that applying this much "
                "filtering will distort the domain and/or distribution of the test, "
                "leaving your testing less rigorous than expected."
                "\n\n"
                "If you expect this many inputs to be filtered out during generation, "
                "you can disable this health check with "
                "@settings(suppress_health_check=[HealthCheck.filter_too_much]). See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.filter_too_much,
            )

        # Allow at least the greater of one second or 5x the deadline. If deadline
        # is None, allow 30s - the user can disable the healthcheck too if desired.
        draw_time = state.total_draw_time
        draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
        if (
            draw_time > max(1.0, draw_time_limit.total_seconds())
            # we disable HealthCheck.too_slow under concurrent threads, since
            # cpython may switch away from a thread for arbitrarily long.
            and not self.thread_overlap.get(threading.get_ident(), False)
        ):
            # Parenthesised summary of the non-valid inputs, if there were any.
            extra_str = []
            if state.invalid_examples:
                extra_str.append(f"{state.invalid_examples} invalid inputs")
            if state.overrun_examples:
                extra_str.append(
                    f"{state.overrun_examples} inputs which exceeded the "
                    "maximum allowed entropy"
                )
            extra_str = ", and ".join(extra_str)
            extra_str = f" ({extra_str})" if extra_str else ""

            fail_health_check(
                self.settings,
                "Input generation is slow: Hypothesis only generated "
                f"{state.valid_examples} valid inputs after {draw_time:.2f} "
                f"seconds{extra_str}."
                "\n" + state.timing_report() + "\n\n"
                "This could be for a few reasons:"
                "\n"
                "1. This strategy could be generating too much data per input. "
                "Try decreasing the amount of data generated, for example by "
                "decreasing the minimum size of collection strategies like "
                "st.lists()."
                "\n"
                "2. Some other expensive computation could be running during input "
                "generation. For example, "
                "if @st.composite or st.data() is interspersed with an expensive "
                "computation, HealthCheck.too_slow is likely to trigger. If this "
                "computation is unrelated to input generation, move it elsewhere. "
                "Otherwise, try making it more efficient, or disable this health "
                "check if that is not possible."
                "\n\n"
                "If you expect input generation to take this long, you can disable "
                "this health check with "
                "@settings(suppress_health_check=[HealthCheck.too_slow]). See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.too_slow,
            )
849
850 def save_choices(
851 self, choices: Sequence[ChoiceT], sub_key: Optional[bytes] = None
852 ) -> None:
853 if self.settings.database is not None:
854 key = self.sub_key(sub_key)
855 if key is None:
856 return
857 self.settings.database.save(key, choices_to_bytes(choices))
858
859 def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
860 buffer = choices_to_bytes(choices)
861 if self.settings.database is not None and self.database_key is not None:
862 self.settings.database.move(self.database_key, self.secondary_key, buffer)
863
864 def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]:
865 if self.database_key is None:
866 return None
867 if sub_key is None:
868 return self.database_key
869 return b".".join((self.database_key, sub_key))
870
    @property
    def secondary_key(self) -> Optional[bytes]:
        """Database key for the secondary corpus (``<database_key>.secondary``).

        ``None`` when ``self.database_key`` is ``None``.
        """
        return self.sub_key(b"secondary")
874
    @property
    def pareto_key(self) -> Optional[bytes]:
        """Database key for saved pareto-front entries (``<database_key>.pareto``).

        ``None`` when ``self.database_key`` is ``None``.
        """
        return self.sub_key(b"pareto")
878
879 def debug(self, message: str) -> None:
880 if self.settings.verbosity >= Verbosity.debug:
881 base_report(message)
882
    @property
    def report_debug_info(self) -> bool:
        """Whether the configured verbosity is at least ``Verbosity.debug``."""
        return self.settings.verbosity >= Verbosity.debug
886
887 def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None:
888 if not self.report_debug_info:
889 return
890
891 status = repr(data.status)
892 if data.status == Status.INTERESTING:
893 status = f"{status} ({data.interesting_origin!r})"
894
895 self.debug(
896 f"{len(data.choices)} choices {data.choices} -> {status}"
897 f"{', ' + data.output if data.output else ''}"
898 )
899
900 def observe_for_provider(self) -> AbstractContextManager:
901 def on_observation(observation: Observation) -> None:
902 assert observation.type == "test_case"
903 # because lifetime == "test_function"
904 assert isinstance(self.provider, PrimitiveProvider)
905 # only fire if we actually used that provider to generate this observation
906 if not self._switch_to_hypothesis_provider:
907 self.provider.on_observation(observation)
908
909 if (
910 self.settings.backend != "hypothesis"
911 # only for lifetime = "test_function" providers (guaranteed
912 # by this isinstance check)
913 and isinstance(self.provider, PrimitiveProvider)
914 # and the provider opted-in to observations
915 and self.provider.add_observability_callback
916 ):
917 return with_observability_callback(on_observation)
918 return nullcontext()
919
920 def run(self) -> None:
921 with local_settings(self.settings):
922 # NOTE: For compatibility with Python 3.9's LL(1)
923 # parser, this is written as a nested with-statement,
924 # instead of a compound one.
925 with self.observe_for_provider():
926 try:
927 self._run()
928 except RunIsComplete:
929 pass
930 for v in self.interesting_examples.values():
931 self.debug_data(v)
932 self.debug(
933 "Run complete after %d examples (%d valid) and %d shrinks"
934 % (self.call_count, self.valid_examples, self.shrinks)
935 )
936
937 @property
938 def database(self) -> Optional[ExampleDatabase]:
939 if self.database_key is None:
940 return None
941 return self.settings.database
942
943 def has_existing_examples(self) -> bool:
944 return self.database is not None and Phase.reuse in self.settings.phases
945
946 def reuse_existing_examples(self) -> None:
947 """If appropriate (we have a database and have been told to use it),
948 try to reload existing examples from the database.
949
950 If there are a lot we don't try all of them. We always try the
951 smallest example in the database (which is guaranteed to be the
952 last failure) and the largest (which is usually the seed example
953 which the last failure came from but we don't enforce that). We
954 then take a random sampling of the remainder and try those. Any
955 examples that are no longer interesting are cleared out.
956 """
957 if self.has_existing_examples():
958 self.debug("Reusing examples from database")
959 # We have to do some careful juggling here. We have two database
960 # corpora: The primary and secondary. The primary corpus is a
961 # small set of minimized examples each of which has at one point
962 # demonstrated a distinct bug. We want to retry all of these.
963
964 # We also have a secondary corpus of examples that have at some
965 # point demonstrated interestingness (currently only ones that
966 # were previously non-minimal examples of a bug, but this will
967 # likely expand in future). These are a good source of potentially
968 # interesting examples, but there are a lot of them, so we down
969 # sample the secondary corpus to a more manageable size.
970
971 corpus = sorted(
972 self.settings.database.fetch(self.database_key), key=shortlex
973 )
974 factor = 0.1 if (Phase.generate in self.settings.phases) else 1
975 desired_size = max(2, ceil(factor * self.settings.max_examples))
976 primary_corpus_size = len(corpus)
977
978 if len(corpus) < desired_size:
979 extra_corpus = list(self.settings.database.fetch(self.secondary_key))
980
981 shortfall = desired_size - len(corpus)
982
983 if len(extra_corpus) <= shortfall:
984 extra = extra_corpus
985 else:
986 extra = self.random.sample(extra_corpus, shortfall)
987 extra.sort(key=shortlex)
988 corpus.extend(extra)
989
990 # We want a fast path where every primary entry in the database was
991 # interesting.
992 found_interesting_in_primary = False
993 all_interesting_in_primary_were_exact = True
994
995 for i, existing in enumerate(corpus):
996 if i >= primary_corpus_size and found_interesting_in_primary:
997 break
998 choices = choices_from_bytes(existing)
999 if choices is None:
1000 # clear out any keys which fail deserialization
1001 self.settings.database.delete(self.database_key, existing)
1002 continue
1003 data = self.cached_test_function(choices, extend="full")
1004 if data.status != Status.INTERESTING:
1005 self.settings.database.delete(self.database_key, existing)
1006 self.settings.database.delete(self.secondary_key, existing)
1007 else:
1008 if i < primary_corpus_size:
1009 found_interesting_in_primary = True
1010 assert not isinstance(data, _Overrun)
1011 if choices_key(choices) != choices_key(data.choices):
1012 all_interesting_in_primary_were_exact = False
1013 if not self.settings.report_multiple_bugs:
1014 break
1015 if found_interesting_in_primary:
1016 if all_interesting_in_primary_were_exact:
1017 self.reused_previously_shrunk_test_case = True
1018
1019 # Because self.database is not None (because self.has_existing_examples())
1020 # and self.database_key is not None (because we fetched using it above),
1021 # we can guarantee self.pareto_front is not None
1022 assert self.pareto_front is not None
1023
1024 # If we've not found any interesting examples so far we try some of
1025 # the pareto front from the last run.
1026 if len(corpus) < desired_size and not self.interesting_examples:
1027 desired_extra = desired_size - len(corpus)
1028 pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
1029 if len(pareto_corpus) > desired_extra:
1030 pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
1031 pareto_corpus.sort(key=shortlex)
1032
1033 for existing in pareto_corpus:
1034 choices = choices_from_bytes(existing)
1035 if choices is None:
1036 self.settings.database.delete(self.pareto_key, existing)
1037 continue
1038 data = self.cached_test_function(choices, extend="full")
1039 if data not in self.pareto_front:
1040 self.settings.database.delete(self.pareto_key, existing)
1041 if data.status == Status.INTERESTING:
1042 break
1043
1044 def exit_with(self, reason: ExitReason) -> None:
1045 if self.ignore_limits:
1046 return
1047 self.statistics["stopped-because"] = reason.describe(self.settings)
1048 if self.best_observed_targets:
1049 self.statistics["targets"] = dict(self.best_observed_targets)
1050 self.debug(f"exit_with({reason.name})")
1051 self.exit_reason = reason
1052 raise RunIsComplete
1053
1054 def should_generate_more(self) -> bool:
1055 # End the generation phase where we would have ended it if no bugs had
1056 # been found. This reproduces the exit logic in `self.test_function`,
1057 # but with the important distinction that this clause will move on to
1058 # the shrinking phase having found one or more bugs, while the other
1059 # will exit having found zero bugs.
1060 if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
1061 self.settings.max_examples * 10, 1000
1062 ): # pragma: no cover
1063 return False
1064
1065 # If we haven't found a bug, keep looking - if we hit any limits on
1066 # the number of tests to run that will raise an exception and stop
1067 # the run.
1068 if not self.interesting_examples:
1069 return True
1070 # Users who disable shrinking probably want to exit as fast as possible.
1071 # If we've found a bug and won't report more than one, stop looking.
1072 elif (
1073 Phase.shrink not in self.settings.phases
1074 or not self.settings.report_multiple_bugs
1075 ):
1076 return False
1077 assert isinstance(self.first_bug_found_at, int)
1078 assert isinstance(self.last_bug_found_at, int)
1079 assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
1080 # Otherwise, keep searching for between ten and 'a heuristic' calls.
1081 # We cap 'calls after first bug' so errors are reported reasonably
1082 # soon even for tests that are allowed to run for a very long time,
1083 # or sooner if the latest half of our test effort has been fruitless.
1084 return self.call_count < MIN_TEST_CALLS or self.call_count < min(
1085 self.first_bug_found_at + 1000, self.last_bug_found_at * 2
1086 )
1087
1088 def generate_new_examples(self) -> None:
1089 if Phase.generate not in self.settings.phases:
1090 return
1091 if self.interesting_examples:
1092 # The example database has failing examples from a previous run,
1093 # so we'd rather report that they're still failing ASAP than take
1094 # the time to look for additional failures.
1095 return
1096
1097 self.debug("Generating new examples")
1098
1099 assert self.should_generate_more()
1100 self._switch_to_hypothesis_provider = True
1101 zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
1102 if zero_data.status > Status.OVERRUN:
1103 assert isinstance(zero_data, ConjectureResult)
1104 # if the crosshair backend cannot proceed, it does not (and cannot)
1105 # realize the symbolic values, with the intent that Hypothesis will
1106 # throw away this test case. We usually do, but if it's the zero data
1107 # then we try to pin it here, which requires realizing the symbolics.
1108 #
1109 # We don't (yet) rely on the zero data being pinned, and so
1110 # it's simply a very slight performance loss to simply not pin it
1111 # if doing so would error.
1112 if zero_data.cannot_proceed_scope is None: # pragma: no branch
1113 self.__data_cache.pin(
1114 self._cache_key(zero_data.choices), zero_data.as_result()
1115 ) # Pin forever
1116
1117 if zero_data.status == Status.OVERRUN or (
1118 zero_data.status == Status.VALID
1119 and isinstance(zero_data, ConjectureResult)
1120 and zero_data.length * 2 > BUFFER_SIZE
1121 ):
1122 fail_health_check(
1123 self.settings,
1124 "The smallest natural input for this test is very "
1125 "large. This makes it difficult for Hypothesis to generate "
1126 "good inputs, especially when trying to shrink failing inputs."
1127 "\n\n"
1128 "Consider reducing the amount of data generated by the strategy. "
1129 "Also consider introducing small alternative values for some "
1130 "strategies. For example, could you "
1131 "mark some arguments as optional by replacing `some_complex_strategy`"
1132 "with `st.none() | some_complex_strategy`?"
1133 "\n\n"
1134 "If you are confident that the size of the smallest natural input "
1135 "to your test cannot be reduced, you can suppress this health check "
1136 "with @settings(suppress_health_check=[HealthCheck.large_base_example]). "
1137 "See "
1138 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
1139 "for details.",
1140 HealthCheck.large_base_example,
1141 )
1142
1143 self.health_check_state = HealthCheckState()
1144
1145 # We attempt to use the size of the minimal generated test case starting
1146 # from a given novel prefix as a guideline to generate smaller test
1147 # cases for an initial period, by restriscting ourselves to test cases
1148 # that are not much larger than it.
1149 #
1150 # Calculating the actual minimal generated test case is hard, so we
1151 # take a best guess that zero extending a prefix produces the minimal
1152 # test case starting with that prefix (this is true for our built in
1153 # strategies). This is only a reasonable thing to do if the resulting
1154 # test case is valid. If we regularly run into situations where it is
1155 # not valid then this strategy is a waste of time, so we want to
1156 # abandon it early. In order to do this we track how many times in a
1157 # row it has failed to work, and abort small test case generation when
1158 # it has failed too many times in a row.
1159 consecutive_zero_extend_is_invalid = 0
1160
1161 # We control growth during initial example generation, for two
1162 # reasons:
1163 #
1164 # * It gives us an opportunity to find small examples early, which
1165 # gives us a fast path for easy to find bugs.
1166 # * It avoids low probability events where we might end up
1167 # generating very large examples during health checks, which
1168 # on slower machines can trigger HealthCheck.too_slow.
1169 #
1170 # The heuristic we use is that we attempt to estimate the smallest
1171 # extension of this prefix, and limit the size to no more than
1172 # an order of magnitude larger than that. If we fail to estimate
1173 # the size accurately, we skip over this prefix and try again.
1174 #
1175 # We need to tune the example size based on the initial prefix,
1176 # because any fixed size might be too small, and any size based
1177 # on the strategy in general can fall afoul of strategies that
1178 # have very different sizes for different prefixes.
1179 #
1180 # We previously set a minimum value of 10 on small_example_cap, with the
1181 # reasoning of avoiding flaky health checks. However, some users set a
1182 # low max_examples for performance. A hard lower bound in this case biases
1183 # the distribution towards small (and less powerful) examples. Flaky
1184 # and loud health checks are better than silent performance degradation.
1185 small_example_cap = min(self.settings.max_examples // 10, 50)
1186 optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
1187 ran_optimisations = False
1188 self._switch_to_hypothesis_provider = False
1189
1190 while self.should_generate_more():
1191 # we don't yet integrate DataTree with backends. Instead of generating
1192 # a novel prefix, ask the backend for an input.
1193 if not self.using_hypothesis_backend:
1194 data = self.new_conjecture_data([])
1195 with suppress(BackendCannotProceed):
1196 self.test_function(data)
1197 continue
1198
1199 self._current_phase = "generate"
1200 prefix = self.generate_novel_prefix()
1201 if (
1202 self.valid_examples <= small_example_cap
1203 and self.call_count <= 5 * small_example_cap
1204 and not self.interesting_examples
1205 and consecutive_zero_extend_is_invalid < 5
1206 ):
1207 minimal_example = self.cached_test_function(
1208 prefix + (ChoiceTemplate("simplest", count=None),)
1209 )
1210
1211 if minimal_example.status < Status.VALID:
1212 consecutive_zero_extend_is_invalid += 1
1213 continue
1214 # Because the Status code is greater than Status.VALID, it cannot be
1215 # Status.OVERRUN, which guarantees that the minimal_example is a
1216 # ConjectureResult object.
1217 assert isinstance(minimal_example, ConjectureResult)
1218 consecutive_zero_extend_is_invalid = 0
1219 minimal_extension = len(minimal_example.choices) - len(prefix)
1220 max_length = len(prefix) + minimal_extension * 5
1221
1222 # We could end up in a situation where even though the prefix was
1223 # novel when we generated it, because we've now tried zero extending
1224 # it not all possible continuations of it will be novel. In order to
1225 # avoid making redundant test calls, we rerun it in simulation mode
1226 # first. If this has a predictable result, then we don't bother
1227 # running the test function for real here. If however we encounter
1228 # some novel behaviour, we try again with the real test function,
1229 # starting from the new novel prefix that has discovered.
1230 trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
1231 try:
1232 self.tree.simulate_test_function(trial_data)
1233 continue
1234 except PreviouslyUnseenBehaviour:
1235 pass
1236
1237 # If the simulation entered part of the tree that has been killed,
1238 # we don't want to run this.
1239 assert isinstance(trial_data.observer, TreeRecordingObserver)
1240 if trial_data.observer.killed:
1241 continue
1242
1243 # We might have hit the cap on number of examples we should
1244 # run when calculating the minimal example.
1245 if not self.should_generate_more():
1246 break
1247
1248 prefix = trial_data.choices
1249 else:
1250 max_length = None
1251
1252 data = self.new_conjecture_data(prefix, max_choices=max_length)
1253 self.test_function(data)
1254
1255 if (
1256 data.status is Status.OVERRUN
1257 and max_length is not None
1258 and "invalid because" not in data.events
1259 ):
1260 data.events["invalid because"] = (
1261 "reduced max size for early examples (avoids flaky health checks)"
1262 )
1263
1264 self.generate_mutations_from(data)
1265
1266 # Although the optimisations are logically a distinct phase, we
1267 # actually normally run them as part of example generation. The
1268 # reason for this is that we cannot guarantee that optimisation
1269 # actually exhausts our budget: It might finish running and we
1270 # discover that actually we still could run a bunch more test cases
1271 # if we want.
1272 if (
1273 self.valid_examples >= max(small_example_cap, optimise_at)
1274 and not ran_optimisations
1275 ):
1276 ran_optimisations = True
1277 self._current_phase = "target"
1278 self.optimise_targets()
1279
1280 def generate_mutations_from(
1281 self, data: Union[ConjectureData, ConjectureResult]
1282 ) -> None:
1283 # A thing that is often useful but rarely happens by accident is
1284 # to generate the same value at multiple different points in the
1285 # test case.
1286 #
1287 # Rather than make this the responsibility of individual strategies
1288 # we implement a small mutator that just takes parts of the test
1289 # case with the same label and tries replacing one of them with a
1290 # copy of the other and tries running it. If we've made a good
1291 # guess about what to put where, this will run a similar generated
1292 # test case with more duplication.
1293 if (
1294 # An OVERRUN doesn't have enough information about the test
1295 # case to mutate, so we just skip those.
1296 data.status >= Status.INVALID
1297 # This has a tendency to trigger some weird edge cases during
1298 # generation so we don't let it run until we're done with the
1299 # health checks.
1300 and self.health_check_state is None
1301 ):
1302 initial_calls = self.call_count
1303 failed_mutations = 0
1304
1305 while (
1306 self.should_generate_more()
1307 # We implement fairly conservative checks for how long we
1308 # we should run mutation for, as it's generally not obvious
1309 # how helpful it is for any given test case.
1310 and self.call_count <= initial_calls + 5
1311 and failed_mutations <= 5
1312 ):
1313 groups = data.spans.mutator_groups
1314 if not groups:
1315 break
1316
1317 group = self.random.choice(groups)
1318 (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
1319 if start1 > start2:
1320 (start1, end1), (start2, end2) = (start2, end2), (start1, end1)
1321
1322 if (
1323 start1 <= start2 <= end2 <= end1
1324 ): # pragma: no cover # flaky on conjecture-cover tests
1325 # One span entirely contains the other. The strategy is very
1326 # likely some kind of tree. e.g. we might have
1327 #
1328 # ┌─────┐
1329 # ┌─────┤ a ├──────┐
1330 # │ └─────┘ │
1331 # ┌──┴──┐ ┌──┴──┐
1332 # ┌──┤ b ├──┐ ┌──┤ c ├──┐
1333 # │ └──┬──┘ │ │ └──┬──┘ │
1334 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
1335 # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
1336 # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
1337 #
1338 # where each node is drawn from the same strategy and so
1339 # has the same span label. We might have selected the spans
1340 # corresponding to the a and c nodes, which is the entire
1341 # tree and the subtree of (and including) c respectively.
1342 #
1343 # There are two possible mutations we could apply in this case:
1344 # 1. replace a with c (replace child with parent)
1345 # 2. replace c with a (replace parent with child)
1346 #
1347 # (1) results in multiple partial copies of the
1348 # parent:
1349 # ┌─────┐
1350 # ┌─────┤ a ├────────────┐
1351 # │ └─────┘ │
1352 # ┌──┴──┐ ┌─┴───┐
1353 # ┌──┤ b ├──┐ ┌─────┤ a ├──────┐
1354 # │ └──┬──┘ │ │ └─────┘ │
1355 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌──┴──┐ ┌──┴──┐
1356 # │ d │ │ e │ │ f │ ┌──┤ b ├──┐ ┌──┤ c ├──┐
1357 # └───┘ └───┘ └───┘ │ └──┬──┘ │ │ └──┬──┘ │
1358 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
1359 # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
1360 # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
1361 #
1362 # While (2) results in truncating part of the parent:
1363 #
1364 # ┌─────┐
1365 # ┌──┤ c ├──┐
1366 # │ └──┬──┘ │
1367 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
1368 # │ g │ │ h │ │ i │
1369 # └───┘ └───┘ └───┘
1370 #
1371 # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
1372 # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
1373 # except we do not repeat the replacement additional times
1374 # (the paper repeats it once for a total of two copies).
1375 #
1376 # We currently only apply mutation (1), and ignore mutation
1377 # (2). The reason is that the attempt generated from (2) is
1378 # always something that Hypothesis could easily have generated
1379 # itself, by simply not making various choices. Whereas
1380 # duplicating the exact value + structure of particular choices
1381 # in (1) would have been hard for Hypothesis to generate by
1382 # chance.
1383 #
1384 # TODO: an extension of this mutation might repeat (1) on
1385 # a geometric distribution between 0 and ~10 times. We would
1386 # need to find the corresponding span to recurse on in the new
1387 # choices, probably just by using the choices index.
1388
1389 # case (1): duplicate the choices in start1:start2.
1390 attempt = data.choices[:start2] + data.choices[start1:]
1391 else:
1392 (start, end) = self.random.choice([(start1, end1), (start2, end2)])
1393 replacement = data.choices[start:end]
1394 # We attempt to replace both the examples with
1395 # whichever choice we made. Note that this might end
1396 # up messing up and getting the example boundaries
1397 # wrong - labels matching are only a best guess as to
1398 # whether the two are equivalent - but it doesn't
1399 # really matter. It may not achieve the desired result,
1400 # but it's still a perfectly acceptable choice sequence
1401 # to try.
1402 attempt = (
1403 data.choices[:start1]
1404 + replacement
1405 + data.choices[end1:start2]
1406 + replacement
1407 + data.choices[end2:]
1408 )
1409
1410 try:
1411 new_data = self.cached_test_function(
1412 attempt,
1413 # We set error_on_discard so that we don't end up
1414 # entering parts of the tree we consider redundant
1415 # and not worth exploring.
1416 error_on_discard=True,
1417 )
1418 except ContainsDiscard:
1419 failed_mutations += 1
1420 continue
1421
1422 if new_data is Overrun:
1423 failed_mutations += 1 # pragma: no cover # annoying case
1424 else:
1425 assert isinstance(new_data, ConjectureResult)
1426 if (
1427 new_data.status >= data.status
1428 and choices_key(data.choices) != choices_key(new_data.choices)
1429 and all(
1430 k in new_data.target_observations
1431 and new_data.target_observations[k] >= v
1432 for k, v in data.target_observations.items()
1433 )
1434 ):
1435 data = new_data
1436 failed_mutations = 0
1437 else:
1438 failed_mutations += 1
1439
1440 def optimise_targets(self) -> None:
1441 """If any target observations have been made, attempt to optimise them
1442 all."""
1443 if not self.should_optimise:
1444 return
1445 from hypothesis.internal.conjecture.optimiser import Optimiser
1446
1447 # We want to avoid running the optimiser for too long in case we hit
1448 # an unbounded target score. We start this off fairly conservatively
1449 # in case interesting examples are easy to find and then ramp it up
1450 # on an exponential schedule so we don't hamper the optimiser too much
1451 # if it needs a long time to find good enough improvements.
1452 max_improvements = 10
1453 while True:
1454 prev_calls = self.call_count
1455
1456 any_improvements = False
1457
1458 for target, data in list(self.best_examples_of_observed_targets.items()):
1459 optimiser = Optimiser(
1460 self, data, target, max_improvements=max_improvements
1461 )
1462 optimiser.run()
1463 if optimiser.improvements > 0:
1464 any_improvements = True
1465
1466 if self.interesting_examples:
1467 break
1468
1469 max_improvements *= 2
1470
1471 if any_improvements:
1472 continue
1473
1474 if self.best_observed_targets:
1475 self.pareto_optimise()
1476
1477 if prev_calls == self.call_count:
1478 break
1479
1480 def pareto_optimise(self) -> None:
1481 if self.pareto_front is not None:
1482 ParetoOptimiser(self).run()
1483
    def _run(self) -> None:
        """Run the reuse, generate and shrink phases in order.

        Each phase is wrapped in ``self._log_phase_statistics``. The method
        ends by calling ``self.exit_with(ExitReason.finished)``. ``exit_with``
        is also called early if the reuse phase set
        ``self.reused_previously_shrunk_test_case``.
        """
        # have to use the primitive provider to interpret database bits...
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("reuse"):
            self.reuse_existing_examples()
            # Fast path for development: If the database gave us interesting
            # examples from the previously stored primary key, don't try
            # shrinking it again as it's unlikely to work.
            if self.reused_previously_shrunk_test_case:
                self.exit_with(ExitReason.finished)
        # ...but we should use the supplied provider when generating...
        self._switch_to_hypothesis_provider = False
        with self._log_phase_statistics("generate"):
            self.generate_new_examples()
            # We normally run the targeting phase mixed in with the generate phase,
            # but if we've been asked to run it but not generation then we have to
            # run it explicitly on its own here.
            if Phase.generate not in self.settings.phases:
                self._current_phase = "target"
                self.optimise_targets()
        # ...and back to the primitive provider when shrinking.
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("shrink"):
            self.shrink_interesting_examples()
        self.exit_with(ExitReason.finished)
1509
1510 def new_conjecture_data(
1511 self,
1512 prefix: Sequence[Union[ChoiceT, ChoiceTemplate]],
1513 *,
1514 observer: Optional[DataObserver] = None,
1515 max_choices: Optional[int] = None,
1516 ) -> ConjectureData:
1517 provider = (
1518 HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
1519 )
1520 observer = observer or self.tree.new_observer()
1521 if not self.using_hypothesis_backend:
1522 observer = DataObserver()
1523
1524 return ConjectureData(
1525 prefix=prefix,
1526 observer=observer,
1527 provider=provider,
1528 max_choices=max_choices,
1529 random=self.random,
1530 )
1531
1532 def shrink_interesting_examples(self) -> None:
1533 """If we've found interesting examples, try to replace each of them
1534 with a minimal interesting example with the same interesting_origin.
1535
1536 We may find one or more examples with a new interesting_origin
1537 during the shrink process. If so we shrink these too.
1538 """
1539 if Phase.shrink not in self.settings.phases or not self.interesting_examples:
1540 return
1541
1542 self.debug("Shrinking interesting examples")
1543 self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS
1544
1545 for prev_data in sorted(
1546 self.interesting_examples.values(), key=lambda d: sort_key(d.nodes)
1547 ):
1548 assert prev_data.status == Status.INTERESTING
1549 data = self.new_conjecture_data(prev_data.choices)
1550 self.test_function(data)
1551 if data.status != Status.INTERESTING:
1552 self.exit_with(ExitReason.flaky)
1553
1554 self.clear_secondary_key()
1555
1556 while len(self.shrunk_examples) < len(self.interesting_examples):
1557 target, example = min(
1558 (
1559 (k, v)
1560 for k, v in self.interesting_examples.items()
1561 if k not in self.shrunk_examples
1562 ),
1563 key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))),
1564 )
1565 self.debug(f"Shrinking {target!r}: {example.choices}")
1566
1567 if not self.settings.report_multiple_bugs:
1568 # If multi-bug reporting is disabled, we shrink our currently-minimal
1569 # failure, allowing 'slips' to any bug with a smaller minimal example.
1570 self.shrink(example, lambda d: d.status == Status.INTERESTING)
1571 return
1572
1573 def predicate(d: Union[ConjectureResult, _Overrun]) -> bool:
1574 if d.status < Status.INTERESTING:
1575 return False
1576 d = cast(ConjectureResult, d)
1577 return d.interesting_origin == target
1578
1579 self.shrink(example, predicate)
1580
1581 self.shrunk_examples.add(target)
1582
1583 def clear_secondary_key(self) -> None:
1584 if self.has_existing_examples():
1585 # If we have any smaller examples in the secondary corpus, now is
1586 # a good time to try them to see if they work as shrinks. They
1587 # probably won't, but it's worth a shot and gives us a good
1588 # opportunity to clear out the database.
1589
1590 # It's not worth trying the primary corpus because we already
1591 # tried all of those in the initial phase.
1592 corpus = sorted(
1593 self.settings.database.fetch(self.secondary_key), key=shortlex
1594 )
1595 for c in corpus:
1596 choices = choices_from_bytes(c)
1597 if choices is None:
1598 self.settings.database.delete(self.secondary_key, c)
1599 continue
1600 primary = {
1601 choices_to_bytes(v.choices)
1602 for v in self.interesting_examples.values()
1603 }
1604 if shortlex(c) > max(map(shortlex, primary)):
1605 break
1606
1607 self.cached_test_function(choices)
1608 # We unconditionally remove c from the secondary key as it
1609 # is either now primary or worse than our primary example
1610 # of this reason for interestingness.
1611 self.settings.database.delete(self.secondary_key, c)
1612
1613 def shrink(
1614 self,
1615 example: Union[ConjectureData, ConjectureResult],
1616 predicate: Optional[ShrinkPredicateT] = None,
1617 allow_transition: Optional[
1618 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
1619 ] = None,
1620 ) -> Union[ConjectureData, ConjectureResult]:
1621 s = self.new_shrinker(example, predicate, allow_transition)
1622 s.shrink()
1623 return s.shrink_target
1624
1625 def new_shrinker(
1626 self,
1627 example: Union[ConjectureData, ConjectureResult],
1628 predicate: Optional[ShrinkPredicateT] = None,
1629 allow_transition: Optional[
1630 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
1631 ] = None,
1632 ) -> Shrinker:
1633 return Shrinker(
1634 self,
1635 example,
1636 predicate,
1637 allow_transition=allow_transition,
1638 explain=Phase.explain in self.settings.phases,
1639 in_target_phase=self._current_phase == "target",
1640 )
1641
1642 def passing_choice_sequences(
1643 self, prefix: Sequence[ChoiceNode] = ()
1644 ) -> frozenset[tuple[ChoiceNode, ...]]:
1645 """Return a collection of choice sequence nodes which cause the test to pass.
1646 Optionally restrict this by a certain prefix, which is useful for explain mode.
1647 """
1648 return frozenset(
1649 cast(ConjectureResult, result).nodes
1650 for key in self.__data_cache
1651 if (result := self.__data_cache[key]).status is Status.VALID
1652 and startswith(cast(ConjectureResult, result).nodes, prefix)
1653 )
1654
1655
class ContainsDiscard(Exception):
    """Exception caught by callers that run a test case with
    ``error_on_discard=True`` (see ``generate_mutations_from``)."""