# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import importlib
import inspect
import math
import time
from collections import defaultdict
from collections.abc import Generator, Sequence
from contextlib import AbstractContextManager, contextmanager, nullcontext, suppress
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from random import Random, getrandbits
from typing import Callable, Final, List, Literal, NoReturn, Optional, Union, cast

from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings, note_deprecation
from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    FlakyBackendFailure,
    HypothesisException,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
from hypothesis.internal.conjecture.choice import (
    ChoiceConstraintsT,
    ChoiceKeyT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    choices_key,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    DataObserver,
    Overrun,
    Status,
    _Overrun,
)
from hypothesis.internal.conjecture.datatree import (
    DataTree,
    PreviouslyUnseenBehaviour,
    TreeRecordingObserver,
)
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    startswith,
)
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.providers import (
    AVAILABLE_PROVIDERS,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import Observation, with_observation_callback
from hypothesis.reporting import base_report, report

#: The maximum number of times the shrinker will reduce the complexity of a failing
#: input before giving up. This avoids falling into a trap of exponential (or worse)
#: complexity, where the shrinker appears to be making progress but would take a
#: substantial amount of time to finish completely.
MAX_SHRINKS: Final[int] = 500

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)

#: The maximum total time in seconds that the shrinker will try to shrink a failure
#: for before giving up. This is across all shrinks for the same failure, so even
#: if the shrinker successfully reduces the complexity of a single failure several
#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
MAX_SHRINKING_SECONDS: Final[int] = 300

#: The maximum amount of entropy a single test case can use while making random
#: choices during input generation, before we give up on that test case.
#:
#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
#: should not rely on it, except that a linear increase in |BUFFER_SIZE| will
#: linearly increase the amount of entropy a test case can use during generation.
BUFFER_SIZE: Final[int] = 8 * 1024
CACHE_SIZE: Final[int] = 10000
MIN_TEST_CALLS: Final[int] = 10


def shortlex(s):
    return (len(s), s)
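
# For example, shortlex orders by length before contents: b"" < b"z" < b"aa",
# since (0, b"") < (1, b"z") < (2, b"aa"). This is the ordering we use below to
# decide which saved database entry is "smallest".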


@dataclass
class HealthCheckState:
    valid_examples: int = field(default=0)
    invalid_examples: int = field(default=0)
    overrun_examples: int = field(default=0)
    draw_times: "defaultdict[str, List[float]]" = field(
        default_factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
        return math.fsum(sum(self.draw_times.values(), start=[]))

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
        if not self.draw_times:
            return ""
        width = max(
            len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
        )
        out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < self.total_draw_time
            ):
                out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times < 0.5ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname.removeprefix("generate:").removesuffix(": ")
            out.append(
                f" {arg:^{width}} | {len(times):>4} | "
                f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
            )
        return "\n".join(out)
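
    # Illustrative sample output only (real column widths depend on the
    # argument names; the values here are hypothetical):
    #
    #          count | fraction | slowest draws (seconds)
    #      n |   30 |      75% |  --   --    0.020,  0.104,  0.313
    #      s |   30 |      25% |  --   --    --     0.051,  0.102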


class ExitReason(Enum):
    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 10% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
        return self.value.format(s=settings)


class RunIsComplete(Exception):
    pass


def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
    mname, cname = AVAILABLE_PROVIDERS[backend].rsplit(".", 1)
    provider_cls = getattr(importlib.import_module(mname), cname)
    if provider_cls.lifetime == "test_function":
        return provider_cls(None)
    elif provider_cls.lifetime == "test_case":
        return provider_cls
    else:
        raise InvalidArgument(
            f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
            "Expected one of 'test_function', 'test_case'."
        )
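
# As an illustrative sketch (the backend and class names here are hypothetical):
# an entry AVAILABLE_PROVIDERS["mybackend"] == "mypackage.module.MyProvider" is
# resolved by importing mypackage.module and looking up MyProvider there. A
# provider with lifetime == "test_function" is instantiated once, here, and
# reused for every test case, while one with lifetime == "test_case" is
# returned as a class and instantiated afresh for each test case.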


class CallStats(TypedDict):
    status: str
    runtime: float
    drawtime: float
    gctime: float
    events: list[str]


PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": list[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[dict[str, float]],
        "nodeid": NotRequired[str],
    },
)
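
# A populated StatisticsDict might look roughly like this (illustrative values
# only; the "total_size" target label is hypothetical):
#
#   {
#       "generate-phase": {
#           "duration-seconds": 1.25,
#           "test-cases": [...],  # one CallStats dict per test case
#           "distinct-failures": 1,
#           "shrinks-successful": 17,
#       },
#       "stopped-because": "settings.max_examples=100",
#       "targets": {"total_size": 512.0},
#   }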


def choice_count(choices: Sequence[Union[ChoiceT, ChoiceTemplate]]) -> Optional[int]:
    count = 0
    for choice in choices:
        if isinstance(choice, ChoiceTemplate):
            if choice.count is None:
                return None
            count += choice.count
        else:
            count += 1
    return count
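
# For example (illustrative values): choice_count([5, True, b"\x00"]) == 3, and
# choice_count([5, ChoiceTemplate("simplest", count=4)]) == 5, while any
# ChoiceTemplate with count=None makes the total unknowable, so we return None.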


class DiscardObserver(DataObserver):
    @override
    def kill_branch(self) -> NoReturn:
        raise ContainsDiscard


def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
    # backwards-compatibility with backends without for_failure, can remove
    # in a few months
    kwargs = {}
    if for_failure:
        if "for_failure" in inspect.signature(data.provider.realize).parameters:
            kwargs["for_failure"] = True
        else:
            note_deprecation(
                f"{type(data.provider).__qualname__}.realize does not have the "
                "for_failure parameter. This will be an error in future versions "
                "of Hypothesis. (If you installed this backend from a separate "
                "package, upgrading that package may help).",
                has_codemod=False,
                since="2025-05-07",
            )

    for node in data.nodes:
        value = data.provider.realize(node.value, **kwargs)
        expected_type = {
            "string": str,
            "float": float,
            "integer": int,
            "boolean": bool,
            "bytes": bytes,
        }[node.type]
        if type(value) is not expected_type:
            raise HypothesisException(
                f"expected {expected_type} from "
                f"{data.provider.realize.__qualname__}, got {type(value)}"
            )

        constraints = cast(
            ChoiceConstraintsT,
            {
                k: data.provider.realize(v, **kwargs)
                for k, v in node.constraints.items()
            },
        )
        node.value = value
        node.constraints = constraints


class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Optional[Settings] = None,
        random: Optional[Random] = None,
        database_key: Optional[bytes] = None,
        ignore_limits: bool = False,
    ) -> None:
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: Optional[float] = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.invalid_examples: int = 0
        self.overrun_examples: int = 0
        self.random: Random = random or Random(getrandbits(128))
        self.database_key: Optional[bytes] = database_key
        self.ignore_limits: bool = ignore_limits

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: list[CallStats] = []

        self.interesting_examples: dict[InterestingOrigin, ConjectureResult] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: Optional[int] = None
        self.last_bug_found_at: Optional[int] = None

        # At runtime the keys are only ever of type `InterestingOrigin`, but
        # they can be `None` during tests.
        self.shrunk_examples: set[Optional[InterestingOrigin]] = set()

        self.health_check_state: Optional[HealthCheckState] = None

        self.tree: DataTree = DataTree()

        self.provider: Union[type, PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: defaultdict[str, float] = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: Optional[ParetoFront] = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a buffer without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache[
            tuple[ChoiceKeyT, ...], Union[ConjectureResult, _Overrun]
        ](CACHE_SIZE)

        self.reused_previously_shrunk_test_case: bool = False

        self.__pending_call_explanation: Optional[str] = None
        self._backend_found_failure: bool = False
        self._backend_exceeded_deadline: bool = False
        self._switch_to_hypothesis_provider: bool = False

        self.__failed_realize_count: int = 0
        # note unsound verification by alt backends
        self._verified_by: Optional[str] = None

    @contextmanager
    def _with_switch_to_hypothesis_provider(
        self, value: bool
    ) -> Generator[None, None, None]:
        previous = self._switch_to_hypothesis_provider
        try:
            self._switch_to_hypothesis_provider = value
            yield
        finally:
            self._switch_to_hypothesis_provider = previous

    @property
    def using_hypothesis_backend(self) -> bool:
        return (
            self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
        )

    def explain_next_call_as(self, explanation: str) -> None:
        self.__pending_call_explanation = explanation

    def clear_call_explanation(self) -> None:
        self.__pending_call_explanation = None

    @contextmanager
    def _log_phase_statistics(
        self, phase: Literal["reuse", "generate", "shrink"]
    ) -> Generator[None, None, None]:
        self.stats_per_test_case.clear()
        start_time = time.perf_counter()
        try:
            self._current_phase = phase
            yield
        finally:
            # We ignore the mypy type error here: because `phase` and "-phase" are
            # both string literals, the concatenation is always a valid key for
            # this dictionary.
            self.statistics[phase + "-phase"] = {  # type: ignore
                "duration-seconds": time.perf_counter() - start_time,
                "test-cases": list(self.stats_per_test_case),
                "distinct-failures": len(self.interesting_examples),
                "shrinks-successful": self.shrinks,
            }

    @property
    def should_optimise(self) -> bool:
        return Phase.target in self.settings.phases

    def __tree_is_exhausted(self) -> bool:
        return self.tree.is_exhausted and self.using_hypothesis_backend

    def __stoppable_test_function(self, data: ConjectureData) -> None:
        """Run ``self._test_function``, but convert a ``StopTest`` exception
        into a normal return and avoid raising anything flaky for RecursionErrors.
        """
        # We ensure that the test has this much stack space remaining, no
        # matter the size of the stack when called, to de-flake RecursionErrors
        # (#2494, #3671). Note, this covers the data generation part of the test;
        # the actual test execution is additionally protected at the call site
        # in hypothesis.core.execute_once.
        with ensure_free_stackframes():
            try:
                self._test_function(data)
            except StopTest as e:
                if e.testcounter == data.testcounter:
                    # This StopTest has successfully stopped its test, and can now
                    # be discarded.
                    pass
                else:
                    # This StopTest was raised by a different ConjectureData. We
                    # need to re-raise it so that it will eventually reach the
                    # correct engine.
                    raise

    def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
        return choices_key(choices)

    def _cache(self, data: ConjectureData) -> None:
        result = data.as_result()
        key = self._cache_key(data.choices)
        self.__data_cache[key] = result

    def cached_test_function(
        self,
        choices: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        error_on_discard: bool = False,
        extend: Union[int, Literal["full"]] = 0,
    ) -> Union[ConjectureResult, _Overrun]:
        """
        If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
        in preference to running the actual test function. This is to allow us
        to skip test cases we expect to be redundant in some cases. Note that
        it may be the case that we don't raise ``ContainsDiscard`` even if the
        result has discards if we cannot determine from previous runs whether
        it will have a discard.
        """
        # node templates represent a not-yet-filled hole and therefore cannot
        # be cached or retrieved from the cache.
        if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
            # this type cast is validated by the isinstance check above (ie, there
            # are no ChoiceTemplate elements).
            choices = cast(Sequence[ChoiceT], choices)
            key = self._cache_key(choices)
            try:
                cached = self.__data_cache[key]
                # if we have a cached overrun for this key, but we're allowing
                # extensions of the nodes, it could in fact run to a valid result
                # if we try.
                if extend == 0 or cached.status is not Status.OVERRUN:
                    return cached
            except KeyError:
                pass

        if extend == "full":
            max_length = None
        elif (count := choice_count(choices)) is None:
            max_length = None
        else:
            max_length = count + extend
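
        # For example (illustrative): three concrete choices with extend=5 give
        # max_length == 8, while extend="full" or a ChoiceTemplate with
        # count=None leaves max_length unbounded (None).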

        # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
        # The reason is we don't expect simulate_test_function to explore new choices
        # and write back to the tree, so we don't want the overhead of the
        # TreeRecordingObserver tracking those calls.
        trial_observer: Optional[DataObserver] = DataObserver()
        if error_on_discard:
            trial_observer = DiscardObserver()

        try:
            trial_data = self.new_conjecture_data(
                choices, observer=trial_observer, max_choices=max_length
            )
            self.tree.simulate_test_function(trial_data)
        except PreviouslyUnseenBehaviour:
            pass
        else:
            trial_data.freeze()
            key = self._cache_key(trial_data.choices)
            if trial_data.status > Status.OVERRUN:
                try:
                    return self.__data_cache[key]
                except KeyError:
                    pass
            else:
                # if we simulated to an overrun, then our result is certainly
                # an overrun; no need to consult the cache. (and we store this result
                # for simulation-less lookup later).
                self.__data_cache[key] = Overrun
                return Overrun
        try:
            return self.__data_cache[key]
        except KeyError:
            pass

        data = self.new_conjecture_data(choices, max_choices=max_length)
        # note that calling test_function caches `data` for us, for both an ir
        # tree key and a buffer key.
        self.test_function(data)
        return data.as_result()

    def test_function(self, data: ConjectureData) -> None:
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1
        interrupted = False

        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BackendCannotProceed as exc:
            if exc.scope in ("verified", "exhausted"):
                self._switch_to_hypothesis_provider = True
                if exc.scope == "verified":
                    self._verified_by = self.settings.backend
            elif exc.scope == "discard_test_case":
                self.__failed_realize_count += 1
                if (
                    self.__failed_realize_count > 10
                    and (self.__failed_realize_count / self.call_count) > 0.2
                ):
                    self._switch_to_hypothesis_provider = True

            # treat all BackendCannotProceed exceptions as invalid. This isn't
            # great; "verified" should really be counted as self.valid_examples += 1.
            # But we check self.valid_examples == 0 to determine whether to raise
            # Unsatisfiable, and that would throw this check off.
            self.invalid_examples += 1

            # skip the post-test-case tracking; we're pretending this never happened
            interrupted = True
            data.cannot_proceed_scope = exc.scope
            data.freeze()
            return
        except BaseException:
            data.freeze()
            if self.settings.backend != "hypothesis":
                realize_choices(data, for_failure=True)
            self.save_choices(data.choices)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                assert data.cannot_proceed_scope is None
                data.freeze()
                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)
                if self.settings.backend != "hypothesis":
                    realize_choices(data, for_failure=data.status is Status.INTERESTING)

                self._cache(data)
                if data.misaligned_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_choices(data.choices, sub_key=b"pareto")

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                if v > existing_score or sort_key(data.nodes) < sort_key(
                    existing_example.nodes
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status is Status.VALID:
            self.valid_examples += 1
        if data.status is Status.INVALID:
            self.invalid_examples += 1
        if data.status is Status.OVERRUN:
            self.overrun_examples += 1

        if data.status == Status.INTERESTING:
            if not self.using_hypothesis_backend:
                # replay this failure on the hypothesis backend to ensure it still
                # finds a failure. otherwise, it is flaky.
                initial_exception = data.expected_exception
                data = ConjectureData.for_choices(data.choices)
                # we're already going to use the hypothesis provider for this
                # data, so the verb "switch" is a bit misleading here. We're really
                # setting this to inform our on_observation logic that the observation
                # generated here was from a hypothesis backend, and shouldn't be
                # sent to the on_observation of any alternative backend.
                with self._with_switch_to_hypothesis_provider(True):
                    self.__stoppable_test_function(data)
                data.freeze()
                # TODO: Should same-origin also be checked? (discussion in
                # https://github.com/HypothesisWorks/hypothesis/pull/4470#discussion_r2217055487)
                if data.status != Status.INTERESTING:
                    desc_new_status = {
                        data.status.VALID: "passed",
                        data.status.INVALID: "failed filters",
                        data.status.OVERRUN: "overran",
                    }[data.status]
                    raise FlakyBackendFailure(
                        f"Inconsistent results from replaying a failing test case! "
                        f"Raised {type(initial_exception).__name__} on "
                        f"backend={self.settings.backend!r}, but "
                        f"{desc_new_status} under backend='hypothesis'.",
                        [initial_exception],
                    )

                self._cache(data)

            assert data.interesting_origin is not None
            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
            else:
                if sort_key(data.nodes) < sort_key(existing.nodes):
                    self.shrinks += 1
                    self.downgrade_choices(existing.choices)
                    self.__data_cache.unpin(self._cache_key(existing.choices))
                    changed = True

            if changed:
                self.save_choices(data.choices)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                if not self.using_hypothesis_backend:
                    self._backend_found_failure = True
                self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress. When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)

    def on_pareto_evict(self, data: ConjectureResult) -> None:
        self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))

    def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
        """Uses the tree to proactively generate a starting choice sequence
        that we haven't explored yet for this test.

        When this method is called, we assume that there must be at
        least one novel prefix left to find. If there were not, then the
        test run should have already stopped due to tree exhaustion.
        """
        return self.tree.generate_novel_prefix(self.random)

    def record_for_health_check(self, data: ConjectureData) -> None:
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Examples routinely exceeded the max allowable size. "
                f"({state.overrun_examples} examples overran while generating "
                f"{state.valid_examples} valid ones). Generating examples this large "
                "will usually lead to bad results. You could try setting max_size "
                "parameters on your collections and turning max_leaves down on "
                "recursive() calls.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like your strategy is filtering out a lot of data. Health "
                f"check found {state.invalid_examples} filtered examples but only "
                f"{state.valid_examples} good ones. This will make your tests much "
                "slower, and also will probably distort the data generation quite a "
                "lot. You should adapt your strategy to filter less. This can also "
                "be caused by a low max_leaves parameter in recursive() calls",
                HealthCheck.filter_too_much,
            )

        draw_time = state.total_draw_time

        # Allow at least the greater of one second or 5x the deadline. If deadline
        # is None, allow 30s - the user can disable the healthcheck too if desired.
        draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
        if draw_time > max(1.0, draw_time_limit.total_seconds()):
            fail_health_check(
                self.settings,
                "Data generation is extremely slow: Only produced "
                f"{state.valid_examples} valid examples in {draw_time:.2f} seconds "
                f"({state.invalid_examples} invalid ones and {state.overrun_examples} "
                "exceeded maximum size). Try decreasing size of the data you're "
                "generating (with e.g. max_size or max_leaves parameters)."
                + state.timing_report(),
                HealthCheck.too_slow,
            )

    def save_choices(
        self, choices: Sequence[ChoiceT], sub_key: Optional[bytes] = None
    ) -> None:
        if self.settings.database is not None:
            key = self.sub_key(sub_key)
            if key is None:
                return
            self.settings.database.save(key, choices_to_bytes(choices))

    def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
        buffer = choices_to_bytes(choices)
        if self.settings.database is not None and self.database_key is not None:
            self.settings.database.move(self.database_key, self.secondary_key, buffer)

    def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]:
        if self.database_key is None:
            return None
        if sub_key is None:
            return self.database_key
        return b".".join((self.database_key, sub_key))
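
    # For example (illustrative): with database_key=b"abc123",
    # sub_key(b"secondary") returns b"abc123.secondary", while sub_key(None)
    # returns b"abc123" itself.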

    @property
    def secondary_key(self) -> Optional[bytes]:
        return self.sub_key(b"secondary")

    @property
    def pareto_key(self) -> Optional[bytes]:
        return self.sub_key(b"pareto")

    def debug(self, message: str) -> None:
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

    @property
    def report_debug_info(self) -> bool:
        return self.settings.verbosity >= Verbosity.debug

    def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None:
        if not self.report_debug_info:
            return

        status = repr(data.status)
        if data.status == Status.INTERESTING:
            status = f"{status} ({data.interesting_origin!r})"

        self.debug(
            f"{len(data.choices)} choices {data.choices} -> {status}"
            f"{', ' + data.output if data.output else ''}"
        )

    def observe_for_provider(self) -> AbstractContextManager:
        def on_observation(observation: Observation) -> None:
            assert observation.type == "test_case"
            # because lifetime == "test_function"
            assert isinstance(self.provider, PrimitiveProvider)
            # only fire if we actually used that provider to generate this observation
            if not self._switch_to_hypothesis_provider:
                self.provider.on_observation(observation)

        if (
            self.settings.backend != "hypothesis"
            # only for lifetime = "test_function" providers (guaranteed
            # by this isinstance check)
            and isinstance(self.provider, PrimitiveProvider)
            # and the provider opted-in to observations
            and self.provider.add_observability_callback
        ):
            return with_observation_callback(on_observation)
        return nullcontext()

    def run(self) -> None:
        with local_settings(self.settings):
            # NOTE: For compatibility with Python 3.9's LL(1)
            # parser, this is written as a nested with-statement,
            # instead of a compound one.
            with self.observe_for_provider():
                try:
                    self._run()
                except RunIsComplete:
                    pass
                for v in self.interesting_examples.values():
                    self.debug_data(v)
                self.debug(
                    "Run complete after %d examples (%d valid) and %d shrinks"
                    % (self.call_count, self.valid_examples, self.shrinks)
                )

    @property
    def database(self) -> Optional[ExampleDatabase]:
        if self.database_key is None:
            return None
        return self.settings.database

    def has_existing_examples(self) -> bool:
        return self.database is not None and Phase.reuse in self.settings.phases

    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        which the last failure came from but we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we down
            # sample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=shortlex
            )
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))
            primary_corpus_size = len(corpus)

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=shortlex)
                corpus.extend(extra)

            # We want a fast path where every primary entry in the database was
            # interesting.
            found_interesting_in_primary = False
            all_interesting_in_primary_were_exact = True

            for i, existing in enumerate(corpus):
                if i >= primary_corpus_size and found_interesting_in_primary:
                    break
                choices = choices_from_bytes(existing)
                if choices is None:
                    # clear out any keys which fail deserialization
                    self.settings.database.delete(self.database_key, existing)
                    continue
                data = self.cached_test_function(choices, extend="full")
                if data.status != Status.INTERESTING:
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)
                else:
                    if i < primary_corpus_size:
                        found_interesting_in_primary = True
                        assert not isinstance(data, _Overrun)
                        if choices_key(choices) != choices_key(data.choices):
                            all_interesting_in_primary_were_exact = False
                    if not self.settings.report_multiple_bugs:
                        break
            if found_interesting_in_primary:
                if all_interesting_in_primary_were_exact:
                    self.reused_previously_shrunk_test_case = True

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=shortlex)

                for existing in pareto_corpus:
                    choices = choices_from_bytes(existing)
                    if choices is None:
                        self.settings.database.delete(self.pareto_key, existing)
                        continue
                    data = self.cached_test_function(choices, extend="full")
                    if data not in self.pareto_front:
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break

    def exit_with(self, reason: ExitReason) -> None:
        if self.ignore_limits:
            return
        self.statistics["stopped-because"] = reason.describe(self.settings)
        if self.best_observed_targets:
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug(f"exit_with({reason.name})")
        self.exit_reason = reason
        raise RunIsComplete

    def should_generate_more(self) -> bool:
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # Users who disable shrinking probably want to exit as fast as possible.
        # If we've found a bug and won't report more than one, stop looking.
        elif (
            Phase.shrink not in self.settings.phases
            or not self.settings.report_multiple_bugs
        ):
            return False
        assert isinstance(self.first_bug_found_at, int)
        assert isinstance(self.last_bug_found_at, int)
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
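        # For example (illustrative numbers): with first_bug_found_at=50 and
        # last_bug_found_at=300, we keep generating while
        # call_count < min(50 + 1000, 300 * 2) == 600.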
        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )

    def generate_new_examples(self) -> None:
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        self._switch_to_hypothesis_provider = True
        zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            # if the crosshair backend cannot proceed, it does not (and cannot)
            # realize the symbolic values, with the intent that Hypothesis will
            # throw away this test case. We usually do, but if it's the zero data
            # then we try to pin it here, which requires realizing the symbolics.
            #
            # We don't (yet) rely on the zero data being pinned, so it's only a
            # very slight performance loss to skip pinning it if doing so would
            # error.
            if zero_data.cannot_proceed_scope is None:  # pragma: no branch
                self.__data_cache.pin(
                    self._cache_key(zero_data.choices), zero_data.as_result()
                )  # Pin forever

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and zero_data.length * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural example for your test is extremely "
                "large. This makes it difficult for Hypothesis to generate "
                "good examples, especially when trying to reduce failing ones "
                "at the end. Consider reducing the size of your data if it is "
                "of a fixed size. You could also fix this by improving how "
                "your data shrinks (see https://hypothesis.readthedocs.io/en/"
                "latest/data.html#shrinking for details), or by introducing "
                "default values inside your strategy. e.g. could you replace "
                "some arguments with their defaults by using "
                "one_of(none(), some_complex_strategy)?",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()

        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        #
        # We previously set a minimum value of 10 on small_example_cap, with the
        # reasoning of avoiding flaky health checks. However, some users set a
        # low max_examples for performance. A hard lower bound in this case biases
        # the distribution towards small (and less powerful) examples. Flaky
        # and loud health checks are better than silent performance degradation.
        small_example_cap = min(self.settings.max_examples // 10, 50)
        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
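        # For example (illustrative): with max_examples=100, small_example_cap
        # == min(100 // 10, 50) == 10 and optimise_at
        # == max(100 // 2, 10 + 1, 10) == 50, so targeted optimisation starts
        # about halfway through the example budget.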
        ran_optimisations = False
        self._switch_to_hypothesis_provider = False

        while self.should_generate_more():
            # we don't yet integrate DataTree with backends. Instead of generating
            # a novel prefix, ask the backend for an input.
            if not self.using_hypothesis_backend:
                data = self.new_conjecture_data([])
                with suppress(BackendCannotProceed):
                    self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + (ChoiceTemplate("simplest", count=None),)
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                # Because the status is at least Status.VALID, it cannot be
                # Status.OVERRUN, which guarantees that minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)
                consecutive_zero_extend_is_invalid = 0
                minimal_extension = len(minimal_example.choices) - len(prefix)
                max_length = len(prefix) + minimal_extension * 5
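                # For example (illustrative): a prefix of 2 choices whose
                # simplest zero-extension uses 6 choices in total gives
                # minimal_extension == 4 and max_length == 2 + 4 * 5 == 22.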

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it, not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that it has discovered.
                trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.choices
            else:
                max_length = None

            data = self.new_conjecture_data(prefix, max_choices=max_length)
            self.test_function(data)

            if (
                data.status is Status.OVERRUN
                and max_length is not None
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()

    def generate_mutations_from(
        self, data: Union[ConjectureData, ConjectureResult]
    ) -> None:
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0

            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):
                groups = data.spans.mutator_groups
                if not groups:
                    break

                group = self.random.choice(groups)
                (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
                if start1 > start2:
                    (start1, end1), (start2, end2) = (start2, end2), (start1, end1)

                if (
                    start1 <= start2 <= end2 <= end1
                ):  # pragma: no cover  # flaky on conjecture-cover tests
                    # One span entirely contains the other. The strategy is very
                    # likely some kind of tree. e.g. we might have
                    #
                    #                ┌─────┐
                    #         ┌──────┤  a  ├──────┐
                    #         │      └─────┘      │
                    #      ┌──┴──┐             ┌──┴──┐
                    #   ┌──┤  b  ├──┐       ┌──┤  c  ├──┐
                    #   │  └──┬──┘  │       │  └──┬──┘  │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐   ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    # │ d │ │ e │ │ f │   │ g │ │ h │ │ i │
                    # └───┘ └───┘ └───┘   └───┘ └───┘ └───┘
                    #
                    # where each node is drawn from the same strategy and so
                    # has the same span label. We might have selected the spans
                    # corresponding to the a and c nodes, which is the entire
                    # tree and the subtree of (and including) c respectively.
                    #
                    # There are two possible mutations we could apply in this case:
                    # 1. replace c with a (replace child with parent)
                    # 2. replace a with c (replace parent with child)
                    #
                    # (1) results in multiple partial copies of the
                    # parent:
                    #                     ┌─────┐
                    #         ┌───────────┤  a  ├───────────┐
                    #         │           └─────┘           │
                    #      ┌──┴──┐                       ┌──┴──┐
                    #   ┌──┤  b  ├──┐             ┌──────┤  a  ├──────┐
                    #   │  └──┬──┘  │             │      └─────┘      │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐        ┌──┴──┐             ┌──┴──┐
                    # │ d │ │ e │ │ f │     ┌──┤  b  ├──┐       ┌──┤  c  ├──┐
                    # └───┘ └───┘ └───┘     │  └──┬──┘  │       │  └──┬──┘  │
                    #                     ┌─┴─┐ ┌─┴─┐ ┌─┴─┐   ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    #                     │ d │ │ e │ │ f │   │ g │ │ h │ │ i │
                    #                     └───┘ └───┘ └───┘   └───┘ └───┘ └───┘
                    #
                    # While (2) results in truncating part of the parent:
                    #
                    #      ┌─────┐
                    #   ┌──┤  c  ├──┐
                    #   │  └──┬──┘  │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    # │ g │ │ h │ │ i │
                    # └───┘ └───┘ └───┘
                    #
                    # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
                    # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
                    # except we do not repeat the replacement additional times
                    # (the paper repeats it once for a total of two copies).
                    #
                    # We currently only apply mutation (1), and ignore mutation
                    # (2). The reason is that the attempt generated from (2) is
                    # always something that Hypothesis could easily have generated
                    # itself, by simply not making various choices. Whereas
                    # duplicating the exact value + structure of particular choices
                    # in (1) would have been hard for Hypothesis to generate by
                    # chance.
                    #
                    # TODO: an extension of this mutation might repeat (1) on
                    # a geometric distribution between 0 and ~10 times. We would
                    # need to find the corresponding span to recurse on in the new
                    # choices, probably just by using the choices index.

                    # case (1): duplicate the choices in start1:start2.
                    attempt = data.choices[:start2] + data.choices[start1:]
                else:
                    (start, end) = self.random.choice([(start1, end1), (start2, end2)])
                    replacement = data.choices[start:end]
                    # We attempt to replace both the examples with
                    # whichever choice we made. Note that this might end
                    # up messing up and getting the example boundaries
                    # wrong - labels matching are only a best guess as to
                    # whether the two are equivalent - but it doesn't
                    # really matter. It may not achieve the desired result,
                    # but it's still a perfectly acceptable choice sequence
                    # to try.
                    attempt = (
                        data.choices[:start1]
                        + replacement
                        + data.choices[end1:start2]
                        + replacement
                        + data.choices[end2:]
                    )

                try:
                    new_data = self.cached_test_function(
                        attempt,
                        # We set error_on_discard so that we don't end up
                        # entering parts of the tree we consider redundant
                        # and not worth exploring.
                        error_on_discard=True,
                    )
                except ContainsDiscard:
                    failed_mutations += 1
                    continue

                if new_data is Overrun:
                    failed_mutations += 1  # pragma: no cover  # annoying case
                else:
                    assert isinstance(new_data, ConjectureResult)
                    if (
                        new_data.status >= data.status
                        and choices_key(data.choices) != choices_key(new_data.choices)
                        and all(
                            k in new_data.target_observations
                            and new_data.target_observations[k] >= v
                            for k, v in data.target_observations.items()
                        )
                    ):
                        data = new_data
                        failed_mutations = 0
                    else:
                        failed_mutations += 1

    def optimise_targets(self) -> None:
        """If any target observations have been made, attempt to optimise them
        all."""
        if not self.should_optimise:
            return
        from hypothesis.internal.conjecture.optimiser import Optimiser

        # We want to avoid running the optimiser for too long in case we hit
        # an unbounded target score. We start this off fairly conservatively
        # in case interesting examples are easy to find and then ramp it up
        # on an exponential schedule so we don't hamper the optimiser too much
        # if it needs a long time to find good enough improvements.
        max_improvements = 10
        while True:
            prev_calls = self.call_count

            any_improvements = False

            for target, data in list(self.best_examples_of_observed_targets.items()):
                optimiser = Optimiser(
                    self, data, target, max_improvements=max_improvements
                )
                optimiser.run()
                if optimiser.improvements > 0:
                    any_improvements = True

            if self.interesting_examples:
                break

            max_improvements *= 2

            if any_improvements:
                continue

            if self.best_observed_targets:
                self.pareto_optimise()

            if prev_calls == self.call_count:
                break

    def pareto_optimise(self) -> None:
        if self.pareto_front is not None:
            ParetoOptimiser(self).run()

    def _run(self) -> None:
        # have to use the primitive provider to interpret database bits...
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("reuse"):
            self.reuse_existing_examples()
            # Fast path for development: If the database gave us interesting
            # examples from the previously stored primary key, don't try
            # shrinking it again as it's unlikely to work.
            if self.reused_previously_shrunk_test_case:
                self.exit_with(ExitReason.finished)
        # ...but we should use the supplied provider when generating...
        self._switch_to_hypothesis_provider = False
        with self._log_phase_statistics("generate"):
            self.generate_new_examples()
            # We normally run the targeting phase mixed in with the generate phase,
            # but if we've been asked to run it but not generation then we have to
            # run it explicitly on its own here.
            if Phase.generate not in self.settings.phases:
                self._current_phase = "target"
                self.optimise_targets()
        # ...and back to the primitive provider when shrinking.
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("shrink"):
            self.shrink_interesting_examples()
        self.exit_with(ExitReason.finished)

    def new_conjecture_data(
        self,
        prefix: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        observer: Optional[DataObserver] = None,
        max_choices: Optional[int] = None,
    ) -> ConjectureData:
        provider = (
            HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
        )
        observer = observer or self.tree.new_observer()
        if not self.using_hypothesis_backend:
            observer = DataObserver()

        return ConjectureData(
            prefix=prefix,
            observer=observer,
            provider=provider,
            max_choices=max_choices,
            random=self.random,
        )

    def shrink_interesting_examples(self) -> None:
        """If we've found interesting examples, try to replace each of them
        with a minimal interesting example with the same interesting_origin.

        We may find one or more examples with a new interesting_origin
        during the shrink process. If so we shrink these too.
        """
        if Phase.shrink not in self.settings.phases or not self.interesting_examples:
            return

        self.debug("Shrinking interesting examples")
        self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS

        for prev_data in sorted(
            self.interesting_examples.values(), key=lambda d: sort_key(d.nodes)
        ):
            assert prev_data.status == Status.INTERESTING
            data = self.new_conjecture_data(prev_data.choices)
            self.test_function(data)
            if data.status != Status.INTERESTING:
                self.exit_with(ExitReason.flaky)

        self.clear_secondary_key()

        while len(self.shrunk_examples) < len(self.interesting_examples):
            target, example = min(
                (
                    (k, v)
                    for k, v in self.interesting_examples.items()
                    if k not in self.shrunk_examples
                ),
                key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))),
            )
            self.debug(f"Shrinking {target!r}: {example.choices}")

            if not self.settings.report_multiple_bugs:
                # If multi-bug reporting is disabled, we shrink our currently-minimal
                # failure, allowing 'slips' to any bug with a smaller minimal example.
                self.shrink(example, lambda d: d.status == Status.INTERESTING)
                return

            def predicate(d: Union[ConjectureResult, _Overrun]) -> bool:
                if d.status < Status.INTERESTING:
                    return False
                d = cast(ConjectureResult, d)
                return d.interesting_origin == target

            self.shrink(example, predicate)

            self.shrunk_examples.add(target)

    def clear_secondary_key(self) -> None:
        if self.has_existing_examples():
            # If we have any smaller examples in the secondary corpus, now is
            # a good time to try them to see if they work as shrinks. They
            # probably won't, but it's worth a shot and gives us a good
            # opportunity to clear out the database.

            # It's not worth trying the primary corpus because we already
            # tried all of those in the initial phase.
            corpus = sorted(
                self.settings.database.fetch(self.secondary_key), key=shortlex
            )
            for c in corpus:
                choices = choices_from_bytes(c)
                if choices is None:
                    self.settings.database.delete(self.secondary_key, c)
                    continue
                primary = {
                    choices_to_bytes(v.choices)
                    for v in self.interesting_examples.values()
                }
                if shortlex(c) > max(map(shortlex, primary)):
                    break

                self.cached_test_function(choices)
                # We unconditionally remove c from the secondary key as it
                # is either now primary or worse than our primary example
                # of this reason for interestingness.
                self.settings.database.delete(self.secondary_key, c)

    def shrink(
        self,
        example: Union[ConjectureData, ConjectureResult],
        predicate: Optional[ShrinkPredicateT] = None,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ] = None,
    ) -> Union[ConjectureData, ConjectureResult]:
        s = self.new_shrinker(example, predicate, allow_transition)
        s.shrink()
        return s.shrink_target

    def new_shrinker(
        self,
        example: Union[ConjectureData, ConjectureResult],
        predicate: Optional[ShrinkPredicateT] = None,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ] = None,
    ) -> Shrinker:
        return Shrinker(
            self,
            example,
            predicate,
            allow_transition=allow_transition,
            explain=Phase.explain in self.settings.phases,
            in_target_phase=self._current_phase == "target",
        )

    def passing_choice_sequences(
        self, prefix: Sequence[ChoiceNode] = ()
    ) -> frozenset[tuple[ChoiceNode, ...]]:
        """Return a collection of choice sequence nodes which cause the test to pass.
        Optionally restrict this by a certain prefix, which is useful for explain mode.
        """
        return frozenset(
            cast(ConjectureResult, result).nodes
            for key in self.__data_cache
            if (result := self.__data_cache[key]).status is Status.VALID
            and startswith(cast(ConjectureResult, result).nodes, prefix)
        )


class ContainsDiscard(Exception):
    pass