# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import importlib
import inspect
import math
import textwrap
import time
from collections import defaultdict
from collections.abc import Generator, Sequence
from contextlib import AbstractContextManager, contextmanager, nullcontext, suppress
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from random import Random, getrandbits
from typing import Callable, Final, List, Literal, NoReturn, Optional, Union, cast

from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings, note_deprecation
from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    FlakyReplay,
    HypothesisException,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
from hypothesis.internal.conjecture.choice import (
    ChoiceConstraintsT,
    ChoiceKeyT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    choices_key,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    DataObserver,
    Overrun,
    Status,
    _Overrun,
)
from hypothesis.internal.conjecture.datatree import (
    DataTree,
    PreviouslyUnseenBehaviour,
    TreeRecordingObserver,
)
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    startswith,
)
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.providers import (
    AVAILABLE_PROVIDERS,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import Observation, with_observation_callback
from hypothesis.reporting import base_report, report

#: The maximum number of times the shrinker will reduce the complexity of a failing
#: input before giving up. This avoids falling down a trap of exponential (or worse)
#: complexity, where the shrinker appears to be making progress but will take a
#: substantially long time to finish completely.
MAX_SHRINKS: Final[int] = 500

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)

#: The maximum total time in seconds that the shrinker will try to shrink a failure
#: for before giving up. This is across all shrinks for the same failure, so even
#: if the shrinker successfully reduces the complexity of a single failure several
#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
MAX_SHRINKING_SECONDS: Final[int] = 300

#: The maximum amount of entropy a single test case can use before giving up
#: while making random choices during input generation.
#:
#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
#: should not rely on it, except that a linear increase in |BUFFER_SIZE| will
#: linearly increase the amount of entropy a test case can use during generation.
BUFFER_SIZE: Final[int] = 8 * 1024
CACHE_SIZE: Final[int] = 10000
MIN_TEST_CALLS: Final[int] = 10


def shortlex(s):
    return (len(s), s)
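

# A small illustration of this sort key (the values are hypothetical, not used
# by the engine itself): sorted([b"ba", b"c", b"ab"], key=shortlex) gives
# [b"c", b"ab", b"ba"] - shorter values always sort first, with lexicographic
# order breaking ties between values of equal length.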


@dataclass
class HealthCheckState:
    valid_examples: int = field(default=0)
    invalid_examples: int = field(default=0)
    overrun_examples: int = field(default=0)
    draw_times: "defaultdict[str, List[float]]" = field(
        default_factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
        return math.fsum(sum(self.draw_times.values(), start=[]))

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
        if not self.draw_times:
            return ""
        width = max(
            len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
        )
        out = [f"\n  {'':^{width}}   count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < self.total_draw_time
            ):
                out.append(f"  (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times <=0.5ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname.removeprefix("generate:").removesuffix(": ")
            out.append(
                f"  {arg:^{width}} | {len(times):>4}  | "
                f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
            )
        return "\n".join(out)


class ExitReason(Enum):
    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 10% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
        return self.value.format(s=settings)
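
    # For example (a sketch, not exercised at runtime): with the default
    # settings(max_examples=100), ExitReason.max_examples.describe(settings)
    # evaluates to "settings.max_examples=100".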


class RunIsComplete(Exception):
    pass


def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
    mname, cname = AVAILABLE_PROVIDERS[backend].rsplit(".", 1)
    provider_cls = getattr(importlib.import_module(mname), cname)
    if provider_cls.lifetime == "test_function":
        return provider_cls(None)
    elif provider_cls.lifetime == "test_case":
        return provider_cls
    else:
        raise InvalidArgument(
            f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
            "Expected one of 'test_function', 'test_case'."
        )
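

# A minimal sketch of the two provider lifetimes distinguished above. These
# classes are hypothetical, not shipped with Hypothesis:
#
#     class PerFunctionProvider(PrimitiveProvider):
#         lifetime = "test_function"  # instantiated once by _get_provider
#
#     class PerCaseProvider(PrimitiveProvider):
#         lifetime = "test_case"  # the class is returned; instantiated per test case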


class CallStats(TypedDict):
    status: str
    runtime: float
    drawtime: float
    gctime: float
    events: list[str]


PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": list[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[dict[str, float]],
        "nodeid": NotRequired[str],
    },
)


def choice_count(choices: Sequence[Union[ChoiceT, ChoiceTemplate]]) -> Optional[int]:
    count = 0
    for choice in choices:
        if isinstance(choice, ChoiceTemplate):
            if choice.count is None:
                return None
            count += choice.count
        else:
            count += 1
    return count
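

# For example (a sketch; the arguments are illustrative):
#
#     choice_count([True, 0, ChoiceTemplate("simplest", count=3)])  # -> 5
#     choice_count([True, ChoiceTemplate("simplest", count=None)])  # -> None
#
# i.e. a template with count=None makes the total length unknowable.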


class DiscardObserver(DataObserver):
    @override
    def kill_branch(self) -> NoReturn:
        raise ContainsDiscard


def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
    # backwards-compatibility with backends without for_failure, can remove
    # in a few months
    kwargs = {}
    if for_failure:
        if "for_failure" in inspect.signature(data.provider.realize).parameters:
            kwargs["for_failure"] = True
        else:
            note_deprecation(
                f"{type(data.provider).__qualname__}.realize does not have the "
                "for_failure parameter. This will be an error in future versions "
                "of Hypothesis. (If you installed this backend from a separate "
                "package, upgrading that package may help).",
                has_codemod=False,
                since="2025-05-07",
            )

    for node in data.nodes:
        value = data.provider.realize(node.value, **kwargs)
        expected_type = {
            "string": str,
            "float": float,
            "integer": int,
            "boolean": bool,
            "bytes": bytes,
        }[node.type]
        if type(value) is not expected_type:
            raise HypothesisException(
                f"expected {expected_type} from "
                f"{data.provider.realize.__qualname__}, got {type(value)}"
            )

        constraints = cast(
            ChoiceConstraintsT,
            {
                k: data.provider.realize(v, **kwargs)
                for k, v in node.constraints.items()
            },
        )
        node.value = value
        node.constraints = constraints


class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Optional[Settings] = None,
        random: Optional[Random] = None,
        database_key: Optional[bytes] = None,
        ignore_limits: bool = False,
    ) -> None:
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: Optional[float] = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.invalid_examples: int = 0
        self.overrun_examples: int = 0
        self.random: Random = random or Random(getrandbits(128))
        self.database_key: Optional[bytes] = database_key
        self.ignore_limits: bool = ignore_limits

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: list[CallStats] = []

        # At runtime, the keys are only ever type `InterestingOrigin`, but can be `None` during tests.
        self.interesting_examples: dict[
            Optional[InterestingOrigin], ConjectureResult
        ] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: Optional[int] = None
        self.last_bug_found_at: Optional[int] = None

        # At runtime, the keys are only ever type `InterestingOrigin`, but can be `None` during tests.
        self.shrunk_examples: set[Optional[InterestingOrigin]] = set()

        self.health_check_state: Optional[HealthCheckState] = None

        self.tree: DataTree = DataTree()

        self.provider: Union[type, PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: defaultdict[str, float] = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: Optional[ParetoFront] = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a buffer without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache[
            tuple[ChoiceKeyT, ...], Union[ConjectureResult, _Overrun]
        ](CACHE_SIZE)

        self.reused_previously_shrunk_test_case: bool = False

        self.__pending_call_explanation: Optional[str] = None
        self._backend_found_failure: bool = False
        self._switch_to_hypothesis_provider: bool = False

        self.__failed_realize_count: int = 0
        # note unsound verification by alt backends
        self._verified_by: Optional[str] = None

    @property
    def using_hypothesis_backend(self) -> bool:
        return (
            self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
        )

    def explain_next_call_as(self, explanation: str) -> None:
        self.__pending_call_explanation = explanation

    def clear_call_explanation(self) -> None:
        self.__pending_call_explanation = None

    @contextmanager
    def _log_phase_statistics(
        self, phase: Literal["reuse", "generate", "shrink"]
    ) -> Generator[None, None, None]:
        self.stats_per_test_case.clear()
        start_time = time.perf_counter()
        try:
            self._current_phase = phase
            yield
        finally:
            # We ignore the mypy type error here. Because `phase` is a string literal
            # and "-phase" is a string literal as well, the concatenation is always a
            # valid key in the dictionary.
            self.statistics[phase + "-phase"] = {  # type: ignore
                "duration-seconds": time.perf_counter() - start_time,
                "test-cases": list(self.stats_per_test_case),
                "distinct-failures": len(self.interesting_examples),
                "shrinks-successful": self.shrinks,
            }

    @property
    def should_optimise(self) -> bool:
        return Phase.target in self.settings.phases

    def __tree_is_exhausted(self) -> bool:
        return self.tree.is_exhausted and self.using_hypothesis_backend

    def __stoppable_test_function(self, data: ConjectureData) -> None:
        """Run ``self._test_function``, but convert a ``StopTest`` exception
        into a normal return and avoid raising anything flaky for RecursionErrors.
        """
        # We ensure that the test has this much stack space remaining, no
        # matter the size of the stack when called, to de-flake RecursionErrors
        # (#2494, #3671). Note, this covers the data generation part of the test;
        # the actual test execution is additionally protected at the call site
        # in hypothesis.core.execute_once.
        with ensure_free_stackframes():
            try:
                self._test_function(data)
            except StopTest as e:
                if e.testcounter == data.testcounter:
                    # This StopTest has successfully stopped its test, and can now
                    # be discarded.
                    pass
                else:
                    # This StopTest was raised by a different ConjectureData. We
                    # need to re-raise it so that it will eventually reach the
                    # correct engine.
                    raise

    def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
        return choices_key(choices)
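
    # A sketch of the resulting cache semantics (assuming, as in the choice
    # module's canonicalisation, that floats are keyed by bit pattern):
    #
    #     self._cache_key((1, True, b"ab")) == self._cache_key((1, True, b"ab"))
    #     self._cache_key((0.0,)) != self._cache_key((-0.0,))
    #
    # so two runs share a cache entry exactly when they made the same choices.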

    def _cache(self, data: ConjectureData) -> None:
        result = data.as_result()
        key = self._cache_key(data.choices)
        self.__data_cache[key] = result

    def cached_test_function(
        self,
        choices: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        error_on_discard: bool = False,
        extend: Union[int, Literal["full"]] = 0,
    ) -> Union[ConjectureResult, _Overrun]:
        """
        If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
        in preference to running the actual test function. This is to allow us
        to skip test cases we expect to be redundant in some cases. Note that
        it may be the case that we don't raise ``ContainsDiscard`` even if the
        result has discards if we cannot determine from previous runs whether
        it will have a discard.
        """
        # node templates represent a not-yet-filled hole and therefore cannot
        # be cached or retrieved from the cache.
        if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
            # this type cast is validated by the isinstance check above (ie, there
            # are no ChoiceTemplate elements).
            choices = cast(Sequence[ChoiceT], choices)
            key = self._cache_key(choices)
            try:
                cached = self.__data_cache[key]
                # if we have a cached overrun for this key, but we're allowing extensions
                # of the nodes, it could in fact run to a valid data if we try.
                if extend == 0 or cached.status is not Status.OVERRUN:
                    return cached
            except KeyError:
                pass

        if extend == "full":
            max_length = None
        elif (count := choice_count(choices)) is None:
            max_length = None
        else:
            max_length = count + extend

        # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
        # The reason is we don't expect simulate_test_function to explore new choices
        # and write back to the tree, so we don't want the overhead of the
        # TreeRecordingObserver tracking those calls.
        trial_observer: Optional[DataObserver] = DataObserver()
        if error_on_discard:
            trial_observer = DiscardObserver()

        try:
            trial_data = self.new_conjecture_data(
                choices, observer=trial_observer, max_choices=max_length
            )
            self.tree.simulate_test_function(trial_data)
        except PreviouslyUnseenBehaviour:
            pass
        else:
            trial_data.freeze()
            key = self._cache_key(trial_data.choices)
            if trial_data.status > Status.OVERRUN:
                try:
                    return self.__data_cache[key]
                except KeyError:
                    pass
            else:
                # if we simulated to an overrun, then our result is certainly
                # an overrun; no need to consult the cache. (and we store this result
                # for simulation-less lookup later).
                self.__data_cache[key] = Overrun
                return Overrun
            try:
                return self.__data_cache[key]
            except KeyError:
                pass

        data = self.new_conjecture_data(choices, max_choices=max_length)
        # note that calling test_function caches `data` for us, for both an ir
        # tree key and a buffer key.
        self.test_function(data)
        return data.as_result()
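
    # A usage sketch (the runner and choice values here are hypothetical):
    #
    #     result = runner.cached_test_function((5, True))            # exact replay
    #     result = runner.cached_test_function((5,), extend=10)      # up to 10 extra choices
    #     result = runner.cached_test_function((5,), extend="full")  # no length cap
    #
    # Repeated calls with the same concrete choices are answered from
    # self.__data_cache without re-running the test function.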

    def test_function(self, data: ConjectureData) -> None:
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1
        interrupted = False

        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BackendCannotProceed as exc:
            if exc.scope in ("verified", "exhausted"):
                self._switch_to_hypothesis_provider = True
                if exc.scope == "verified":
                    self._verified_by = self.settings.backend
            elif exc.scope == "discard_test_case":
                self.__failed_realize_count += 1
                if (
                    self.__failed_realize_count > 10
                    and (self.__failed_realize_count / self.call_count) > 0.2
                ):
                    self._switch_to_hypothesis_provider = True

            # treat all BackendCannotProceed exceptions as invalid. This isn't
            # great; "verified" should really be counted as self.valid_examples += 1.
            # But we check self.valid_examples == 0 to determine whether to raise
            # Unsatisfiable, and that would throw this check off.
            self.invalid_examples += 1

            # skip the post-test-case tracking; we're pretending this never happened
            interrupted = True
            data.cannot_proceed_scope = exc.scope
            data.freeze()
            return
        except BaseException:
            data.freeze()
            if self.settings.backend != "hypothesis":
                realize_choices(data, for_failure=True)
            self.save_choices(data.choices)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                assert data.cannot_proceed_scope is None
                data.freeze()
                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)
                if self.settings.backend != "hypothesis":
                    realize_choices(data, for_failure=data.status is Status.INTERESTING)

                self._cache(data)
                if data.misaligned_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_choices(data.choices, sub_key=b"pareto")

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                if v > existing_score or sort_key(data.nodes) < sort_key(
                    existing_example.nodes
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status is Status.VALID:
            self.valid_examples += 1
        if data.status is Status.INVALID:
            self.invalid_examples += 1
        if data.status is Status.OVERRUN:
            self.overrun_examples += 1

        if data.status == Status.INTERESTING:
            if not self.using_hypothesis_backend:
                # replay this failure on the hypothesis backend to ensure it still
                # finds a failure. otherwise, it is flaky.
                initial_origin = data.interesting_origin
                initial_traceback = data.expected_traceback
                data = ConjectureData.for_choices(data.choices)
                self.__stoppable_test_function(data)
                data.freeze()
                # TODO: Convert to FlakyFailure on the way out. Should same-origin
                # also be checked?
                if data.status != Status.INTERESTING:
                    desc_new_status = {
                        data.status.VALID: "passed",
                        data.status.INVALID: "failed filters",
                        data.status.OVERRUN: "overran",
                    }[data.status]
                    wrapped_tb = (
                        ""
                        if initial_traceback is None
                        else textwrap.indent(initial_traceback, "  | ")
                    )
                    raise FlakyReplay(
                        f"Inconsistent results from replaying a failing test case!\n"
                        f"{wrapped_tb}on backend={self.settings.backend!r} but "
                        f"{desc_new_status} under backend='hypothesis'",
                        interesting_origins=[initial_origin],
                    )

            self._cache(data)

            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
            else:
                if sort_key(data.nodes) < sort_key(existing.nodes):
                    self.shrinks += 1
                    self.downgrade_choices(existing.choices)
                    self.__data_cache.unpin(self._cache_key(existing.choices))
                    changed = True

            if changed:
                self.save_choices(data.choices)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                if not self.using_hypothesis_backend:
                    self._backend_found_failure = True
                self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress. When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)

    def on_pareto_evict(self, data: ConjectureResult) -> None:
        self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))

    def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
        """Uses the tree to proactively generate a starting choice sequence
        that we haven't explored yet for this test.

        When this method is called, we assume that there must be at
        least one novel prefix left to find. If there were not, then the
        test run should have already stopped due to tree exhaustion.
        """
        return self.tree.generate_novel_prefix(self.random)

    def record_for_health_check(self, data: ConjectureData) -> None:
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Examples routinely exceeded the max allowable size. "
                f"({state.overrun_examples} examples overran while generating "
                f"{state.valid_examples} valid ones). Generating examples this large "
                "will usually lead to bad results. You could try setting max_size "
                "parameters on your collections and turning max_leaves down on "
                "recursive() calls.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like your strategy is filtering out a lot of data. Health "
                f"check found {state.invalid_examples} filtered examples but only "
                f"{state.valid_examples} good ones. This will make your tests much "
                "slower, and also will probably distort the data generation quite a "
                "lot. You should adapt your strategy to filter less. This can also "
                "be caused by a low max_leaves parameter in recursive() calls",
                HealthCheck.filter_too_much,
            )

        draw_time = state.total_draw_time

        # Allow at least the greater of one second or 5x the deadline. If deadline
        # is None, allow 30s - the user can disable the healthcheck too if desired.
        draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
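        # For instance, deadline=timedelta(milliseconds=200) gives a limit of
        # max(1.0, 5 * 0.2) == 1.0 seconds below, while deadline=None gives
        # max(1.0, 5 * 6.0) == 30.0 seconds, matching the comment above.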
        if draw_time > max(1.0, draw_time_limit.total_seconds()):
            fail_health_check(
                self.settings,
                "Data generation is extremely slow: Only produced "
                f"{state.valid_examples} valid examples in {draw_time:.2f} seconds "
                f"({state.invalid_examples} invalid ones and {state.overrun_examples} "
                "exceeded maximum size). Try decreasing size of the data you're "
                "generating (with e.g. max_size or max_leaves parameters)."
                + state.timing_report(),
                HealthCheck.too_slow,
            )

    def save_choices(
        self, choices: Sequence[ChoiceT], sub_key: Optional[bytes] = None
    ) -> None:
        if self.settings.database is not None:
            key = self.sub_key(sub_key)
            if key is None:
                return
            self.settings.database.save(key, choices_to_bytes(choices))

    def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
        buffer = choices_to_bytes(choices)
        if self.settings.database is not None and self.database_key is not None:
            self.settings.database.move(self.database_key, self.secondary_key, buffer)

    def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]:
        if self.database_key is None:
            return None
        if sub_key is None:
            return self.database_key
        return b".".join((self.database_key, sub_key))
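
    # For example, with database_key=b"deadbeef" (an illustrative key):
    #
    #     self.sub_key(None)          # -> b"deadbeef"
    #     self.sub_key(b"secondary")  # -> b"deadbeef.secondary"
    #     self.sub_key(b"pareto")     # -> b"deadbeef.pareto"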

    @property
    def secondary_key(self) -> Optional[bytes]:
        return self.sub_key(b"secondary")

    @property
    def pareto_key(self) -> Optional[bytes]:
        return self.sub_key(b"pareto")

    def debug(self, message: str) -> None:
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

    @property
    def report_debug_info(self) -> bool:
        return self.settings.verbosity >= Verbosity.debug

    def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None:
        if not self.report_debug_info:
            return

        status = repr(data.status)
        if data.status == Status.INTERESTING:
            status = f"{status} ({data.interesting_origin!r})"

        self.debug(
            f"{len(data.choices)} choices {data.choices} -> {status}"
            f"{', ' + data.output if data.output else ''}"
        )

    def observe_for_provider(self) -> AbstractContextManager:
        def on_observation(observation: Observation) -> None:
            assert observation.type == "test_case"
            # because lifetime == "test_function"
            assert isinstance(self.provider, PrimitiveProvider)
            # only fire if we actually used that provider to generate this observation
            if not self._switch_to_hypothesis_provider:
                self.provider.on_observation(observation)

        if (
            self.settings.backend != "hypothesis"
            # only for lifetime = "test_function" providers (guaranteed
            # by this isinstance check)
            and isinstance(self.provider, PrimitiveProvider)
            # and the provider opted-in to observations
            and self.provider.add_observability_callback
        ):
            return with_observation_callback(on_observation)
        return nullcontext()

    def run(self) -> None:
        with local_settings(self.settings):
            # NOTE: For compatibility with Python 3.9's LL(1)
            # parser, this is written as a nested with-statement,
            # instead of a compound one.
            with self.observe_for_provider():
                try:
                    self._run()
                except RunIsComplete:
                    pass
                for v in self.interesting_examples.values():
                    self.debug_data(v)
                self.debug(
                    "Run complete after %d examples (%d valid) and %d shrinks"
                    % (self.call_count, self.valid_examples, self.shrinks)
                )

    @property
    def database(self) -> Optional[ExampleDatabase]:
        if self.database_key is None:
            return None
        return self.settings.database

    def has_existing_examples(self) -> bool:
        return self.database is not None and Phase.reuse in self.settings.phases

    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        which the last failure came from but we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we down
            # sample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=shortlex
            )
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))
            primary_corpus_size = len(corpus)

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=shortlex)
                corpus.extend(extra)

            # We want a fast path where every primary entry in the database was
            # interesting.
            found_interesting_in_primary = False
            all_interesting_in_primary_were_exact = True

            for i, existing in enumerate(corpus):
                if i >= primary_corpus_size and found_interesting_in_primary:
                    break
                choices = choices_from_bytes(existing)
                if choices is None:
                    # clear out any keys which fail deserialization
                    self.settings.database.delete(self.database_key, existing)
                    continue
                data = self.cached_test_function(choices, extend="full")
                if data.status != Status.INTERESTING:
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)
                else:
                    if i < primary_corpus_size:
                        found_interesting_in_primary = True
                        assert not isinstance(data, _Overrun)
                        if choices_key(choices) != choices_key(data.choices):
                            all_interesting_in_primary_were_exact = False
                    if not self.settings.report_multiple_bugs:
                        break
            if found_interesting_in_primary:
                if all_interesting_in_primary_were_exact:
                    self.reused_previously_shrunk_test_case = True

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=shortlex)

                for existing in pareto_corpus:
                    choices = choices_from_bytes(existing)
                    if choices is None:
                        self.settings.database.delete(self.pareto_key, existing)
                        continue
                    data = self.cached_test_function(choices, extend="full")
                    if data not in self.pareto_front:
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break

    def exit_with(self, reason: ExitReason) -> None:
        if self.ignore_limits:
            return
        self.statistics["stopped-because"] = reason.describe(self.settings)
        if self.best_observed_targets:
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug(f"exit_with({reason.name})")
        self.exit_reason = reason
        raise RunIsComplete

    def should_generate_more(self) -> bool:
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # Users who disable shrinking probably want to exit as fast as possible.
        # If we've found a bug and won't report more than one, stop looking.
        elif (
            Phase.shrink not in self.settings.phases
            or not self.settings.report_multiple_bugs
        ):
            return False
        assert isinstance(self.first_bug_found_at, int)
        assert isinstance(self.last_bug_found_at, int)
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )
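
    # A worked example of the heuristic above (illustrative numbers): if the
    # first bug was found on call 50 and the most recent on call 300, we keep
    # generating until call_count reaches min(50 + 1000, 300 * 2) == 600, then
    # move on to shrinking if nothing new has turned up.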

    def generate_new_examples(self) -> None:
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        self._switch_to_hypothesis_provider = True
        zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            # if the crosshair backend cannot proceed, it does not (and cannot)
            # realize the symbolic values, with the intent that Hypothesis will
            # throw away this test case. We usually do, but if it's the zero data
            # then we try to pin it here, which requires realizing the symbolics.
            #
            # We don't (yet) rely on the zero data being pinned, and so
            # it's only a very slight performance loss to not pin it
            # if doing so would error.
            if zero_data.cannot_proceed_scope is None:  # pragma: no branch
                self.__data_cache.pin(
                    self._cache_key(zero_data.choices), zero_data.as_result()
                )  # Pin forever

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and zero_data.length * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural example for your test is extremely "
                "large. This makes it difficult for Hypothesis to generate "
                "good examples, especially when trying to reduce failing ones "
                "at the end. Consider reducing the size of your data if it is "
                "of a fixed size. You could also fix this by improving how "
                "your data shrinks (see https://hypothesis.readthedocs.io/en/"
                "latest/data.html#shrinking for details), or by introducing "
                "default values inside your strategy. e.g. could you replace "
                "some arguments with their defaults by using "
                "one_of(none(), some_complex_strategy)?",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()

        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        #
        # We previously set a minimum value of 10 on small_example_cap, with the
        # reasoning of avoiding flaky health checks. However, some users set a
        # low max_examples for performance. A hard lower bound in this case biases
        # the distribution towards small (and less powerful) examples. Flaky
        # and loud health checks are better than silent performance degradation.
        small_example_cap = min(self.settings.max_examples // 10, 50)
        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
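        # For example, with the default max_examples=100 this gives
        # small_example_cap == min(100 // 10, 50) == 10 and
        # optimise_at == max(100 // 2, 10 + 1, 10) == 50.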
        ran_optimisations = False
        self._switch_to_hypothesis_provider = False

        while self.should_generate_more():
            # we don't yet integrate DataTree with backends. Instead of generating
            # a novel prefix, ask the backend for an input.
            if not self.using_hypothesis_backend:
                data = self.new_conjecture_data([])
                with suppress(BackendCannotProceed):
                    self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + (ChoiceTemplate("simplest", count=None),)
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                # Because the status is at least Status.VALID here, it cannot be
                # Status.OVERRUN, which guarantees that the minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)
                consecutive_zero_extend_is_invalid = 0
                minimal_extension = len(minimal_example.choices) - len(prefix)
                max_length = len(prefix) + minimal_extension * 5

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that it has discovered.
                trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.choices
            else:
                max_length = None

            data = self.new_conjecture_data(prefix, max_choices=max_length)
            self.test_function(data)

            if (
                data.status is Status.OVERRUN
                and max_length is not None
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()

    def generate_mutations_from(
        self, data: Union[ConjectureData, ConjectureResult]
    ) -> None:
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0

            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):
                groups = data.spans.mutator_groups
                if not groups:
                    break

                group = self.random.choice(groups)
                (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
                if start1 > start2:
                    (start1, end1), (start2, end2) = (start2, end2), (start1, end1)

                if (
                    start1 <= start2 <= end2 <= end1
                ):  # pragma: no cover  # flaky on conjecture-cover tests
                    # One span entirely contains the other. The strategy is very
                    # likely some kind of tree. e.g. we might have
                    #
                    #               ┌─────┐
                    #         ┌─────┤  a  ├──────┐
                    #         │     └─────┘      │
                    #      ┌──┴──┐            ┌──┴──┐
                    #   ┌──┤  b  ├──┐      ┌──┤  c  ├──┐
                    #   │  └──┬──┘  │      │  └──┬──┘  │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐  ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    # │ d │ │ e │ │ f │  │ g │ │ h │ │ i │
                    # └───┘ └───┘ └───┘  └───┘ └───┘ └───┘
                    #
                    # where each node is drawn from the same strategy and so
                    # has the same span label. We might have selected the spans
                    # corresponding to the a and c nodes, which is the entire
                    # tree and the subtree of (and including) c respectively.
                    #
                    # There are two possible mutations we could apply in this case:
                    # 1. replace a with c (replace child with parent)
                    # 2. replace c with a (replace parent with child)
                    #
                    # (1) results in multiple partial copies of the
                    # parent:
                    #                    ┌─────┐
                    #         ┌──────────┤  a  ├──────────┐
                    #         │          └─────┘          │
                    #      ┌──┴──┐                     ┌──┴──┐
                    #   ┌──┤  b  ├──┐            ┌─────┤  a  ├──────┐
                    #   │  └──┬──┘  │            │     └─────┘      │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐       ┌──┴──┐            ┌──┴──┐
                    # │ d │ │ e │ │ f │    ┌──┤  b  ├──┐      ┌──┤  c  ├──┐
                    # └───┘ └───┘ └───┘    │  └──┬──┘  │      │  └──┬──┘  │
                    #                    ┌─┴─┐ ┌─┴─┐ ┌─┴─┐  ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    #                    │ d │ │ e │ │ f │  │ g │ │ h │ │ i │
                    #                    └───┘ └───┘ └───┘  └───┘ └───┘ └───┘
                    #
                    # While (2) results in truncating part of the parent:
                    #
                    #       ┌─────┐
                    #    ┌──┤  c  ├──┐
                    #    │  └──┬──┘  │
                    #  ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    #  │ g │ │ h │ │ i │
                    #  └───┘ └───┘ └───┘
                    #
                    # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
                    # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
                    # except we do not repeat the replacement additional times
                    # (the paper repeats it once for a total of two copies).
                    #
                    # We currently only apply mutation (1), and ignore mutation
                    # (2). The reason is that the attempt generated from (2) is
                    # always something that Hypothesis could easily have generated
                    # itself, by simply not making various choices. Whereas
                    # duplicating the exact value + structure of particular choices
                    # in (1) would have been hard for Hypothesis to generate by
                    # chance.
                    #
                    # TODO: an extension of this mutation might repeat (1) on
                    # a geometric distribution between 0 and ~10 times. We would
                    # need to find the corresponding span to recurse on in the new
                    # choices, probably just by using the choices index.

                    # case (1): duplicate the choices in start1:start2.
                    attempt = data.choices[:start2] + data.choices[start1:]
                else:
                    (start, end) = self.random.choice([(start1, end1), (start2, end2)])
                    replacement = data.choices[start:end]
                    # We attempt to replace both spans with whichever choice we
                    # made. Note that this might end up messing up and getting
                    # the span boundaries wrong - matching labels are only a
                    # best guess as to whether the two are equivalent - but it
                    # doesn't really matter. It may not achieve the desired
                    # result, but it's still a perfectly acceptable choice
                    # sequence to try.
                    attempt = (
                        data.choices[:start1]
                        + replacement
                        + data.choices[end1:start2]
                        + replacement
                        + data.choices[end2:]
                    )
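
                    # A concrete sketch of the replacement above (illustrative
                    # values): with choices == (0, 1, 2, 3, 4), spans
                    # (start1, end1) == (1, 2) and (start2, end2) == (3, 4),
                    # and replacement == (1,), the attempt is
                    # (0,) + (1,) + (2,) + (1,) + (4,) == (0, 1, 2, 1, 4).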

                try:
                    new_data = self.cached_test_function(
                        attempt,
                        # We set error_on_discard so that we don't end up
                        # entering parts of the tree we consider redundant
                        # and not worth exploring.
                        error_on_discard=True,
                    )
                except ContainsDiscard:
                    failed_mutations += 1
                    continue

                if new_data is Overrun:
                    failed_mutations += 1  # pragma: no cover  # annoying case
                else:
                    assert isinstance(new_data, ConjectureResult)
                    if (
                        new_data.status >= data.status
                        and choices_key(data.choices) != choices_key(new_data.choices)
                        and all(
                            k in new_data.target_observations
                            and new_data.target_observations[k] >= v
                            for k, v in data.target_observations.items()
                        )
                    ):
                        data = new_data
                        failed_mutations = 0
                    else:
                        failed_mutations += 1

    def optimise_targets(self) -> None:
        """If any target observations have been made, attempt to optimise them
        all."""
        if not self.should_optimise:
            return
        from hypothesis.internal.conjecture.optimiser import Optimiser

        # We want to avoid running the optimiser for too long in case we hit
        # an unbounded target score. We start this off fairly conservatively
        # in case interesting examples are easy to find and then ramp it up
        # on an exponential schedule so we don't hamper the optimiser too much
        # if it needs a long time to find good enough improvements.
        max_improvements = 10
        while True:
            prev_calls = self.call_count

            any_improvements = False

            for target, data in list(self.best_examples_of_observed_targets.items()):
                optimiser = Optimiser(
                    self, data, target, max_improvements=max_improvements
                )
                optimiser.run()
                if optimiser.improvements > 0:
                    any_improvements = True

            if self.interesting_examples:
                break

            max_improvements *= 2

            if any_improvements:
                continue

            if self.best_observed_targets:
                self.pareto_optimise()

            if prev_calls == self.call_count:
                break

    def pareto_optimise(self) -> None:
        if self.pareto_front is not None:
            ParetoOptimiser(self).run()

    def _run(self) -> None:
        # have to use the primitive provider to interpret database bits...
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("reuse"):
            self.reuse_existing_examples()
            # Fast path for development: If the database gave us interesting
            # examples from the previously stored primary key, don't try
            # shrinking it again as it's unlikely to work.
            if self.reused_previously_shrunk_test_case:
                self.exit_with(ExitReason.finished)
        # ...but we should use the supplied provider when generating...
        self._switch_to_hypothesis_provider = False
        with self._log_phase_statistics("generate"):
            self.generate_new_examples()
            # We normally run the targeting phase mixed in with the generate phase,
            # but if we've been asked to run it but not generation then we have to
            # run it explicitly on its own here.
            if Phase.generate not in self.settings.phases:
                self._current_phase = "target"
                self.optimise_targets()
        # ...and back to the primitive provider when shrinking.
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("shrink"):
            self.shrink_interesting_examples()
        self.exit_with(ExitReason.finished)

    def new_conjecture_data(
        self,
        prefix: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        observer: Optional[DataObserver] = None,
        max_choices: Optional[int] = None,
    ) -> ConjectureData:
        provider = (
            HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
        )
        observer = observer or self.tree.new_observer()
        if not self.using_hypothesis_backend:
            observer = DataObserver()

        return ConjectureData(
            prefix=prefix,
            observer=observer,
            provider=provider,
            max_choices=max_choices,
            random=self.random,
        )

    def shrink_interesting_examples(self) -> None:
        """If we've found interesting examples, try to replace each of them
        with a minimal interesting example with the same interesting_origin.

        We may find one or more examples with a new interesting_origin
        during the shrink process. If so we shrink these too.
        """
        if Phase.shrink not in self.settings.phases or not self.interesting_examples:
            return

        self.debug("Shrinking interesting examples")
        self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS

        for prev_data in sorted(
            self.interesting_examples.values(), key=lambda d: sort_key(d.nodes)
        ):
            assert prev_data.status == Status.INTERESTING
            data = self.new_conjecture_data(prev_data.choices)
            self.test_function(data)
            if data.status != Status.INTERESTING:
                self.exit_with(ExitReason.flaky)

        self.clear_secondary_key()

        while len(self.shrunk_examples) < len(self.interesting_examples):
            target, example = min(
                (
                    (k, v)
                    for k, v in self.interesting_examples.items()
                    if k not in self.shrunk_examples
                ),
                key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))),
            )
            self.debug(f"Shrinking {target!r}: {example.choices}")

            if not self.settings.report_multiple_bugs:
                # If multi-bug reporting is disabled, we shrink our currently-minimal
                # failure, allowing 'slips' to any bug with a smaller minimal example.
                self.shrink(example, lambda d: d.status == Status.INTERESTING)
                return

            def predicate(d: Union[ConjectureResult, _Overrun]) -> bool:
                if d.status < Status.INTERESTING:
                    return False
                d = cast(ConjectureResult, d)
                return d.interesting_origin == target

            self.shrink(example, predicate)

            self.shrunk_examples.add(target)

    def clear_secondary_key(self) -> None:
        if self.has_existing_examples():
            # If we have any smaller examples in the secondary corpus, now is
            # a good time to try them to see if they work as shrinks. They
            # probably won't, but it's worth a shot and gives us a good
            # opportunity to clear out the database.

            # It's not worth trying the primary corpus because we already
            # tried all of those in the initial phase.
            corpus = sorted(
                self.settings.database.fetch(self.secondary_key), key=shortlex
            )
            for c in corpus:
                choices = choices_from_bytes(c)
                if choices is None:
                    self.settings.database.delete(self.secondary_key, c)
                    continue
                primary = {
                    choices_to_bytes(v.choices)
                    for v in self.interesting_examples.values()
                }
                if shortlex(c) > max(map(shortlex, primary)):
                    break

                self.cached_test_function(choices)
                # We unconditionally remove c from the secondary key as it
                # is either now primary or worse than our primary example
                # of this reason for interestingness.
                self.settings.database.delete(self.secondary_key, c)

    def shrink(
        self,
        example: Union[ConjectureData, ConjectureResult],
        predicate: Optional[ShrinkPredicateT] = None,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ] = None,
    ) -> Union[ConjectureData, ConjectureResult]:
        s = self.new_shrinker(example, predicate, allow_transition)
        s.shrink()
        return s.shrink_target

    def new_shrinker(
        self,
        example: Union[ConjectureData, ConjectureResult],
        predicate: Optional[ShrinkPredicateT] = None,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ] = None,
    ) -> Shrinker:
        return Shrinker(
            self,
            example,
            predicate,
            allow_transition=allow_transition,
            explain=Phase.explain in self.settings.phases,
            in_target_phase=self._current_phase == "target",
        )

    def passing_choice_sequences(
        self, prefix: Sequence[ChoiceNode] = ()
    ) -> frozenset[tuple[ChoiceNode, ...]]:
        """Return a collection of choice sequence nodes which cause the test to pass.
        Optionally restrict this by a certain prefix, which is useful for explain mode.
        """
        return frozenset(
            cast(ConjectureResult, result).nodes
            for key in self.__data_cache
            if (result := self.__data_cache[key]).status is Status.VALID
            and startswith(cast(ConjectureResult, result).nodes, prefix)
        )


class ContainsDiscard(Exception):
    pass