1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import importlib
12import math
13import threading
14import time
15from collections import defaultdict
16from collections.abc import Callable, Generator, Sequence
17from contextlib import AbstractContextManager, contextmanager, nullcontext
18from dataclasses import dataclass, field
19from datetime import timedelta
20from enum import Enum
21from random import Random
22from typing import Literal, NoReturn, cast
23
24from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
25from hypothesis._settings import local_settings
26from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
27from hypothesis.errors import (
28 BackendCannotProceed,
29 FlakyBackendFailure,
30 FlakyStrategyDefinition,
31 HypothesisException,
32 InvalidArgument,
33 StopTest,
34)
35from hypothesis.internal.cache import LRUReusedCache
36from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
37from hypothesis.internal.conjecture.choice import (
38 ChoiceConstraintsT,
39 ChoiceKeyT,
40 ChoiceNode,
41 ChoiceT,
42 ChoiceTemplate,
43 choices_key,
44)
45from hypothesis.internal.conjecture.data import (
46 ConjectureData,
47 ConjectureResult,
48 DataObserver,
49 Overrun,
50 Status,
51 _Overrun,
52)
53from hypothesis.internal.conjecture.datatree import (
54 DataTree,
55 PreviouslyUnseenBehaviour,
56 TreeRecordingObserver,
57)
58from hypothesis.internal.conjecture.junkdrawer import (
59 ensure_free_stackframes,
60 startswith,
61)
62from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
63from hypothesis.internal.conjecture.providers import (
64 AVAILABLE_PROVIDERS,
65 HypothesisProvider,
66 PrimitiveProvider,
67)
68from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
69from hypothesis.internal.escalation import InterestingOrigin
70from hypothesis.internal.healthcheck import fail_health_check
71from hypothesis.internal.observability import Observation, with_observability_callback
72from hypothesis.reporting import base_report, report, verbose_report
73
74# In most cases, the following constants are all Final. However, we do allow users
75# to monkeypatch all of these variables, which means we cannot annotate them as
76# Final or mypyc will inline them and render monkeypatching useless.
77
78#: The maximum number of times the shrinker will reduce the complexity of a failing
79#: input before giving up. This avoids falling down a trap of exponential (or worse)
80#: complexity, where the shrinker appears to be making progress but will take a
81#: substantially long time to finish completely.
82MAX_SHRINKS: int = 500
83
84# If the shrinking phase takes more than five minutes, abort it early and print
85# a warning. Many CI systems will kill a build after around ten minutes with
86# no output, and appearing to hang isn't great for interactive use either -
87# showing partially-shrunk examples is better than quitting with no examples!
88# (but make it monkeypatchable, for the rare users who need to keep on shrinking)
89
90#: The maximum total time in seconds that the shrinker will try to shrink a failure
91#: for before giving up. This is across all shrinks for the same failure, so even
92#: if the shrinker successfully reduces the complexity of a single failure several
93#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
94MAX_SHRINKING_SECONDS: int = 300
95
96#: The maximum amount of entropy a single test case can use before giving up
97#: while making random choices during input generation.
98#:
99#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
100#: should not rely on it, except that a linear increase |BUFFER_SIZE| will linearly
101#: increase the amount of entropy a test case can use during generation.
102BUFFER_SIZE: int = 8 * 1024
103CACHE_SIZE: int = 10000
104MIN_TEST_CALLS: int = 10
105
106# we use this to isolate Hypothesis from interacting with the global random,
107# to make it easier to reason about our global random warning logic (see
108# deprecate_random_in_strategy).
109_random = Random()
110
111
112def shortlex(s):
113 return (len(s), s)
114
115
116@dataclass(slots=True, frozen=False)
117class HealthCheckState:
118 valid_examples: int = field(default=0)
119 invalid_examples: int = field(default=0)
120 overrun_examples: int = field(default=0)
121 draw_times: defaultdict[str, list[float]] = field(
122 default_factory=lambda: defaultdict(list)
123 )
124
125 @property
126 def total_draw_time(self) -> float:
127 return math.fsum(sum(self.draw_times.values(), start=[]))
128
129 def timing_report(self) -> str:
130 """Return a terminal report describing what was slow."""
131 if not self.draw_times:
132 return ""
133 width = max(
134 len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
135 )
136 out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
137 args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
138 for i, (argname, times) in enumerate(args_in_order): # pragma: no branch
139 # If we have very many unique keys, which can happen due to interactive
140 # draws with computed labels, we'll skip uninformative rows.
141 if (
142 5 <= i < (len(self.draw_times) - 2)
143 and math.fsum(times) * 20 < self.total_draw_time
144 ):
145 out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
146 break
147 # Compute the row to report, omitting times <1ms to focus on slow draws
148 reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
149 desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
150 arg = argname.removeprefix("generate:").removesuffix(": ")
151 out.append(
152 f" {arg:^{width}} | {len(times):>4} | "
153 f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
154 )
155 return "\n".join(out)
156
157
158def _invalid_thresholds(*, r: float, c: float) -> tuple[int, int]:
159 base = math.ceil(math.log(1 - c) / math.log(1 - r)) - 1
160 per_p = math.ceil(1 / r)
161 return base, per_p
162
163
164# stop once we're 99% confident the true valid rate is below 1%. See
165# https://github.com/HypothesisWorks/hypothesis/issues/4623#issuecomment-3814681997
166# for how we derived this formula.
167INVALID_THRESHOLD_BASE, INVALID_PER_VALID = _invalid_thresholds(r=0.01, c=0.99)
168
169
170class ExitReason(Enum):
171 max_examples = "settings.max_examples={s.max_examples}"
172 max_iterations = (
173 "settings.max_examples={s.max_examples}, "
174 "but < 1% of examples satisfied assumptions"
175 )
176 max_shrinks = f"shrunk example {MAX_SHRINKS} times"
177 finished = "nothing left to do"
178 flaky = "test was flaky"
179 very_slow_shrinking = "shrinking was very slow"
180
181 def describe(self, settings: Settings) -> str:
182 return self.value.format(s=settings)
183
184
185class RunIsComplete(Exception):
186 pass
187
188
189def _get_provider(backend: str) -> PrimitiveProvider | type[PrimitiveProvider]:
190 provider_cls = AVAILABLE_PROVIDERS[backend]
191 if isinstance(provider_cls, str):
192 module_name, class_name = provider_cls.rsplit(".", 1)
193 provider_cls = getattr(importlib.import_module(module_name), class_name)
194
195 if provider_cls.lifetime == "test_function":
196 return provider_cls(None)
197 elif provider_cls.lifetime == "test_case":
198 return provider_cls
199 else:
200 raise InvalidArgument(
201 f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
202 "Expected one of 'test_function', 'test_case'."
203 )
204
205
206class CallStats(TypedDict):
207 status: str
208 runtime: float
209 drawtime: float
210 gctime: float
211 events: list[str]
212
213
214PhaseStatistics = TypedDict(
215 "PhaseStatistics",
216 {
217 "duration-seconds": float,
218 "test-cases": list[CallStats],
219 "distinct-failures": int,
220 "shrinks-successful": int,
221 },
222)
223StatisticsDict = TypedDict(
224 "StatisticsDict",
225 {
226 "generate-phase": NotRequired[PhaseStatistics],
227 "reuse-phase": NotRequired[PhaseStatistics],
228 "shrink-phase": NotRequired[PhaseStatistics],
229 "explain-phase": NotRequired[PhaseStatistics],
230 "stopped-because": NotRequired[str],
231 "targets": NotRequired[dict[str, float]],
232 "nodeid": NotRequired[str],
233 },
234)
235
236
237def choice_count(choices: Sequence[ChoiceT | ChoiceTemplate]) -> int | None:
238 count = 0
239 for choice in choices:
240 if isinstance(choice, ChoiceTemplate):
241 if choice.count is None:
242 return None
243 count += choice.count
244 else:
245 count += 1
246 return count
247
248
249class DiscardObserver(DataObserver):
250 @override
251 def kill_branch(self) -> NoReturn:
252 raise ContainsDiscard
253
254
255def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
256 for node in data.nodes:
257 value = data.provider.realize(node.value, for_failure=for_failure)
258 expected_type = {
259 "string": str,
260 "float": float,
261 "integer": int,
262 "boolean": bool,
263 "bytes": bytes,
264 }[node.type]
265 if type(value) is not expected_type:
266 raise HypothesisException(
267 f"expected {expected_type} from "
268 f"{data.provider.realize.__qualname__}, got {type(value)}"
269 )
270
271 constraints = cast(
272 ChoiceConstraintsT,
273 {
274 k: data.provider.realize(v, for_failure=for_failure)
275 for k, v in node.constraints.items()
276 },
277 )
278 node.value = value
279 node.constraints = constraints
280
281
282class ConjectureRunner:
283 def __init__(
284 self,
285 test_function: Callable[[ConjectureData], None],
286 *,
287 settings: Settings | None = None,
288 random: Random | None = None,
289 database_key: bytes | None = None,
290 ignore_limits: bool = False,
291 thread_overlap: dict[int, bool] | None = None,
292 ) -> None:
293 self._test_function: Callable[[ConjectureData], None] = test_function
294 self.settings: Settings = settings or Settings()
295 self.shrinks: int = 0
296 self.finish_shrinking_deadline: float | None = None
297 self.call_count: int = 0
298 self.misaligned_count: int = 0
299 self.valid_examples: int = 0
300 self.invalid_examples: int = 0
301 self.overrun_examples: int = 0
302 self.random: Random = random or Random(_random.getrandbits(128))
303 self.database_key: bytes | None = database_key
304 self.ignore_limits: bool = ignore_limits
305 self.thread_overlap = {} if thread_overlap is None else thread_overlap
306
307 # Global dict of per-phase statistics, and a list of per-call stats
308 # which transfer to the global dict at the end of each phase.
309 self._current_phase: str = "(not a phase)"
310 self.statistics: StatisticsDict = {}
311 self.stats_per_test_case: list[CallStats] = []
312 # Time spent in any nested phase, so the enclosing phase can exclude it.
313 self._nested_phase_seconds: float = 0.0
314
315 self.interesting_examples: dict[InterestingOrigin, ConjectureResult] = {}
316 # We use call_count because there may be few possible valid_examples.
317 self.first_bug_found_at: int | None = None
318 self.last_bug_found_at: int | None = None
319 self.first_bug_found_time: float = math.inf
320
321 self.shrunk_examples: set[InterestingOrigin] = set()
322 self.health_check_state: HealthCheckState | None = None
323 self.tree: DataTree = DataTree()
324 self.provider: PrimitiveProvider | type[PrimitiveProvider] = _get_provider(
325 self.settings.backend
326 )
327
328 self.best_observed_targets: defaultdict[str, float] = defaultdict(
329 lambda: NO_SCORE
330 )
331 self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}
332
333 # We keep the pareto front in the example database if we have one. This
334 # is only marginally useful at present, but speeds up local development
335 # because it means that large targets will be quickly surfaced in your
336 # testing.
337 self.pareto_front: ParetoFront | None = None
338 if self.database_key is not None and self.settings.database is not None:
339 self.pareto_front = ParetoFront(self.random)
340 self.pareto_front.on_evict(self.on_pareto_evict)
341
342 # We want to be able to get the ConjectureData object that results
343 # from running a choice sequence without recalculating, especially during
344 # shrinking where we need to know about the structure of the
345 # executed test case.
346 self.__data_cache = LRUReusedCache[
347 tuple[ChoiceKeyT, ...], ConjectureResult | _Overrun
348 ](CACHE_SIZE)
349
350 self.reused_previously_shrunk_test_case: bool = False
351
352 self.__pending_call_explanation: str | None = None
353 self._backend_found_failure: bool = False
354 self._backend_exceeded_deadline: bool = False
355 self._backend_discard_count: int = 0
356 # note unsound verification by alt backends
357 self._verified_by_backend: str | None = None
358 self._switch_to_hypothesis_provider: bool = False
359
360 @contextmanager
361 def _with_switch_to_hypothesis_provider(
362 self, value: bool
363 ) -> Generator[None, None, None]:
364 previous = self._switch_to_hypothesis_provider
365 try:
366 self._switch_to_hypothesis_provider = value
367 yield
368 finally:
369 self._switch_to_hypothesis_provider = previous
370
371 @property
372 def using_hypothesis_backend(self) -> bool:
373 return (
374 self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
375 )
376
377 def explain_next_call_as(self, explanation: str) -> None:
378 self.__pending_call_explanation = explanation
379
380 def clear_call_explanation(self) -> None:
381 self.__pending_call_explanation = None
382
383 @contextmanager
384 def _log_phase_statistics(
385 self, phase: Literal["reuse", "generate", "shrink", "explain"]
386 ) -> Generator[None, None, None]:
387 # Phases may nest - the explain phase runs inside the shrink phase - so
388 # we save and restore the per-call stats and current phase, exclude the
389 # duration of any nested phase, and accumulate when a phase is entered
390 # more than once (the explain phase runs once per shrinking target).
391 saved_stats = self.stats_per_test_case
392 saved_phase = self._current_phase
393 saved_nested_seconds = self._nested_phase_seconds
394 self.stats_per_test_case = []
395 self._current_phase = phase
396 self._nested_phase_seconds = 0.0
397 start_time = time.perf_counter()
398 try:
399 yield
400 finally:
401 elapsed = time.perf_counter() - start_time
402 # A phase can be entered more than once (the explain phase runs once
403 # per shrinking target), so accumulate into any existing bucket.
404 stats = self.statistics.setdefault(
405 phase + "-phase", # type: ignore
406 {"duration-seconds": 0.0, "test-cases": []},
407 )
408 stats["duration-seconds"] += elapsed - self._nested_phase_seconds
409 stats["test-cases"] += self.stats_per_test_case
410 stats["distinct-failures"] = len(self.interesting_examples)
411 stats["shrinks-successful"] = self.shrinks
412 self.stats_per_test_case = saved_stats
413 self._current_phase = saved_phase
414 self._nested_phase_seconds = saved_nested_seconds + elapsed
415
416 @property
417 def should_optimise(self) -> bool:
418 return Phase.target in self.settings.phases
419
420 def __tree_is_exhausted(self) -> bool:
421 return self.tree.is_exhausted and self.using_hypothesis_backend
422
423 def __stoppable_test_function(self, data: ConjectureData) -> None:
424 """Run ``self._test_function``, but convert a ``StopTest`` exception
425 into a normal return and avoid raising anything flaky for RecursionErrors.
426 """
427 # We ensure that the test has this much stack space remaining, no
428 # matter the size of the stack when called, to de-flake RecursionErrors
429 # (#2494, #3671). Note, this covers the data generation part of the test;
430 # the actual test execution is additionally protected at the call site
431 # in hypothesis.core.execute_once.
432 with ensure_free_stackframes():
433 try:
434 self._test_function(data)
435 except StopTest as e:
436 if e.testcounter == data.testcounter:
437 # This StopTest has successfully stopped its test, and can now
438 # be discarded.
439 pass
440 else:
441 # This StopTest was raised by a different ConjectureData. We
442 # need to re-raise it so that it will eventually reach the
443 # correct engine.
444 raise
445
446 def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
447 return choices_key(choices)
448
449 def _cache(self, data: ConjectureData) -> None:
450 result = data.as_result()
451 key = self._cache_key(data.choices)
452 self.__data_cache[key] = result
453
454 def cached_test_function(
455 self,
456 choices: Sequence[ChoiceT | ChoiceTemplate],
457 *,
458 error_on_discard: bool = False,
459 extend: int | Literal["full"] = 0,
460 ) -> ConjectureResult | _Overrun:
461 """
462 If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
463 in preference to running the actual test function. This is to allow us
464 to skip test cases we expect to be redundant in some cases. Note that
465 it may be the case that we don't raise ``ContainsDiscard`` even if the
466 result has discards if we cannot determine from previous runs whether
467 it will have a discard.
468 """
469 # node templates represent a not-yet-filled hole and therefore cannot
470 # be cached or retrieved from the cache.
471 if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
472 # this type cast is validated by the isinstance check above (ie, there
473 # are no ChoiceTemplate elements).
474 choices = cast(Sequence[ChoiceT], choices)
475 key = self._cache_key(choices)
476 try:
477 cached = self.__data_cache[key]
478 # if we have a cached overrun for this key, but we're allowing extensions
479 # of the nodes, it could in fact run to a valid data if we try.
480 if extend == 0 or cached.status is not Status.OVERRUN:
481 return cached
482 except KeyError:
483 pass
484
485 if extend == "full":
486 max_length = None
487 elif (count := choice_count(choices)) is None:
488 max_length = None
489 else:
490 max_length = count + extend
491
492 # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
493 # The reason is we don't expect simulate_test_function to explore new choices
494 # and write back to the tree, so we don't want the overhead of the
495 # TreeRecordingObserver tracking those calls.
496 trial_observer: DataObserver | None = DataObserver()
497 if error_on_discard:
498 trial_observer = DiscardObserver()
499
500 try:
501 trial_data = self.new_conjecture_data(
502 choices, observer=trial_observer, max_choices=max_length
503 )
504 self.tree.simulate_test_function(trial_data)
505 except PreviouslyUnseenBehaviour:
506 pass
507 else:
508 trial_data.freeze()
509 key = self._cache_key(trial_data.choices)
510 if trial_data.status > Status.OVERRUN:
511 try:
512 return self.__data_cache[key]
513 except KeyError:
514 pass
515 else:
516 # if we simulated to an overrun, then we our result is certainly
517 # an overrun; no need to consult the cache. (and we store this result
518 # for simulation-less lookup later).
519 self.__data_cache[key] = Overrun
520 return Overrun
521 try:
522 return self.__data_cache[key]
523 except KeyError:
524 pass
525
526 data = self.new_conjecture_data(choices, max_choices=max_length)
527 # note that calling test_function caches `data` for us.
528 self.test_function(data)
529 return data.as_result()
530
531 def test_function(self, data: ConjectureData) -> None:
532 if self.__pending_call_explanation is not None:
533 self.debug(self.__pending_call_explanation)
534 self.__pending_call_explanation = None
535
536 self.call_count += 1
537 interrupted = False
538
539 def _backend_cannot_proceed(
540 exc: BackendCannotProceed, data: ConjectureData
541 ) -> None:
542 if exc.scope in ("verified", "exhausted"):
543 self._switch_to_hypothesis_provider = True
544 if exc.scope == "verified":
545 self._verified_by_backend = self.settings.backend
546 elif exc.scope == "discard_test_case":
547 self._backend_discard_count += 1
548 if (
549 self._backend_discard_count > 10
550 and (self._backend_discard_count / self.call_count) > 0.2
551 ):
552 verbose_report(
553 f"Switching away from backend {self.settings.backend!r} "
554 "to the Hypothesis backend, "
555 f"because {self._backend_discard_count} of {self.call_count} "
556 "attempted test cases "
557 f"({self._backend_discard_count / self.call_count * 100:0.1f}%) "
558 f"were discarded by backend {self.settings.backend!r}"
559 )
560 self._switch_to_hypothesis_provider = True
561
562 # treat all BackendCannotProceed exceptions as invalid. This isn't
563 # great; "verified" should really be counted as self.valid_examples += 1.
564 # But we check self.valid_examples == 0 to determine whether to raise
565 # Unsatisfiable, and that would throw this check off.
566 self.invalid_examples += 1
567 data.cannot_proceed_scope = exc.scope
568
569 # this fiddly bit of control flow is to work around `return` being
570 # disallowed in `finally` blocks as of python 3.14. Otherwise, we would
571 # just return in the _backend_cannot_proceed branch.
572 finally_early_return = False
573
574 try:
575 self.__stoppable_test_function(data)
576 except KeyboardInterrupt:
577 interrupted = True
578 raise
579 except BackendCannotProceed as exc:
580 _backend_cannot_proceed(exc, data)
581 # skip the post-test-case tracking; we're pretending this never happened
582 interrupted = True
583 data.freeze()
584 return
585 except BaseException as err:
586 data.freeze()
587 if isinstance(err, FlakyStrategyDefinition) and data._stateful_repr_parts:
588 # In a stateful test, surface the steps leading up to the
589 # inconsistency.
590 report(
591 "Steps leading up to this error:\n"
592 + "\n".join(f" {s}" for s in data._stateful_repr_parts)
593 )
594 if self.settings.backend != "hypothesis":
595 try:
596 realize_choices(data, for_failure=True)
597 except BackendCannotProceed as exc:
598 _backend_cannot_proceed(exc, data)
599 # skip the post-test-case tracking; we're pretending this
600 # never happened
601 interrupted = True
602 return
603 self.save_choices(data.choices)
604 raise
605 finally:
606 # No branch, because if we're interrupted we always raise
607 # the KeyboardInterrupt, never continue to the code below.
608 if not interrupted: # pragma: no branch
609 assert data.cannot_proceed_scope is None
610 data.freeze()
611
612 if self.settings.backend != "hypothesis":
613 try:
614 realize_choices(
615 data, for_failure=data.status is Status.INTERESTING
616 )
617 except BackendCannotProceed as exc:
618 _backend_cannot_proceed(exc, data)
619 finally_early_return = True
620
621 if not finally_early_return:
622 call_stats: CallStats = {
623 "status": data.status.name.lower(),
624 "runtime": data.finish_time - data.start_time,
625 "drawtime": math.fsum(data.draw_times.values()),
626 "gctime": data.gc_finish_time - data.gc_start_time,
627 "events": sorted(
628 k if v == "" else f"{k}: {v}"
629 for k, v in data.events.items()
630 ),
631 }
632 self.stats_per_test_case.append(call_stats)
633
634 self._cache(data)
635 if (
636 data.misaligned_at is not None
637 ): # pragma: no branch # coverage bug?
638 self.misaligned_count += 1
639
640 if finally_early_return:
641 return
642
643 self.debug_data(data)
644
645 if (
646 data.target_observations
647 and self.pareto_front is not None
648 and self.pareto_front.add(data.as_result())
649 ):
650 self.save_choices(data.choices, sub_key=b"pareto")
651
652 if data.status >= Status.VALID:
653 for k, v in data.target_observations.items():
654 self.best_observed_targets[k] = max(self.best_observed_targets[k], v)
655
656 if k not in self.best_examples_of_observed_targets:
657 data_as_result = data.as_result()
658 assert not isinstance(data_as_result, _Overrun)
659 self.best_examples_of_observed_targets[k] = data_as_result
660 continue
661
662 existing_example = self.best_examples_of_observed_targets[k]
663 existing_score = existing_example.target_observations[k]
664
665 if v < existing_score:
666 continue
667
668 if v > existing_score or sort_key(data.nodes) < sort_key(
669 existing_example.nodes
670 ):
671 data_as_result = data.as_result()
672 assert not isinstance(data_as_result, _Overrun)
673 self.best_examples_of_observed_targets[k] = data_as_result
674
675 if data.status is Status.VALID:
676 self.valid_examples += 1
677 if data.status is Status.INVALID:
678 self.invalid_examples += 1
679 if data.status is Status.OVERRUN:
680 self.overrun_examples += 1
681
682 if data.status == Status.INTERESTING:
683 if not self.using_hypothesis_backend:
684 # replay this failure on the hypothesis backend to ensure it still
685 # finds a failure. otherwise, it is flaky.
686 initial_exception = data.expected_exception
687 data = ConjectureData.for_choices(data.choices)
688 # we've already going to use the hypothesis provider for this
689 # data, so the verb "switch" is a bit misleading here. We're really
690 # setting this to inform our on_observation logic that the observation
691 # generated here was from a hypothesis backend, and shouldn't be
692 # sent to the on_observation of any alternative backend.
693 with self._with_switch_to_hypothesis_provider(True):
694 self.__stoppable_test_function(data)
695 data.freeze()
696 # TODO: Should same-origin also be checked? (discussion in
697 # https://github.com/HypothesisWorks/hypothesis/pull/4470#discussion_r2217055487)
698 if data.status != Status.INTERESTING:
699 desc_new_status = {
700 data.status.VALID: "passed",
701 data.status.INVALID: "failed filters",
702 data.status.OVERRUN: "overran",
703 }[data.status]
704 raise FlakyBackendFailure(
705 f"Inconsistent results from replaying a failing test case! "
706 f"Raised {type(initial_exception).__name__} on "
707 f"backend={self.settings.backend!r}, but "
708 f"{desc_new_status} under backend='hypothesis'.",
709 [initial_exception],
710 )
711
712 self._cache(data)
713
714 assert data.interesting_origin is not None
715 key = data.interesting_origin
716 changed = False
717 try:
718 existing = self.interesting_examples[key]
719 except KeyError:
720 changed = True
721 self.last_bug_found_at = self.call_count
722 if self.first_bug_found_at is None:
723 self.first_bug_found_at = self.call_count
724 self.first_bug_found_time = time.monotonic()
725 else:
726 if sort_key(data.nodes) < sort_key(existing.nodes):
727 self.shrinks += 1
728 self.downgrade_choices(existing.choices)
729 self.__data_cache.unpin(self._cache_key(existing.choices))
730 changed = True
731
732 if changed:
733 self.save_choices(data.choices)
734 self.interesting_examples[key] = data.as_result() # type: ignore
735 if not self.using_hypothesis_backend:
736 self._backend_found_failure = True
737 self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
738 self.shrunk_examples.discard(key)
739
740 if self.shrinks >= MAX_SHRINKS:
741 self.exit_with(ExitReason.max_shrinks)
742
743 if (
744 not self.ignore_limits
745 and self.finish_shrinking_deadline is not None
746 and self.finish_shrinking_deadline < time.perf_counter()
747 ):
748 # See https://github.com/HypothesisWorks/hypothesis/issues/2340
749 report(
750 "WARNING: Hypothesis has spent more than five minutes working to shrink"
751 " a failing example, and stopped because it is making very slow"
752 " progress. When you re-run your tests, shrinking will resume and may"
753 " take this long before aborting again.\n\nPLEASE REPORT THIS if you can"
754 " provide a reproducing example, so that we can improve shrinking"
755 " performance for everyone."
756 )
757 self.exit_with(ExitReason.very_slow_shrinking)
758
759 if not self.interesting_examples:
760 # Note that this logic is reproduced to end the generation phase when
761 # we have interesting examples. Update that too if you change this!
762 # (The doubled implementation is because here we exit the engine entirely,
763 # while in the other case below we just want to move on to shrinking.)
764 if self.valid_examples >= self.settings.max_examples:
765 self.exit_with(ExitReason.max_examples)
766 if (self.invalid_examples + self.overrun_examples) > (
767 INVALID_THRESHOLD_BASE + INVALID_PER_VALID * self.valid_examples
768 ):
769 self.exit_with(ExitReason.max_iterations)
770
771 if self.__tree_is_exhausted():
772 self.exit_with(ExitReason.finished)
773
774 self.record_for_health_check(data)
775
776 def on_pareto_evict(self, data: ConjectureResult) -> None:
777 self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))
778
779 def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
780 """Uses the tree to proactively generate a starting choice sequence
781 that we haven't explored yet for this test.
782
783 When this method is called, we assume that there must be at
784 least one novel prefix left to find. If there were not, then the
785 test run should have already stopped due to tree exhaustion.
786 """
787 return self.tree.generate_novel_prefix(self.random)
788
789 def record_for_health_check(self, data: ConjectureData) -> None:
790 # Once we've actually found a bug, there's no point in trying to run
791 # health checks - they'll just mask the actually important information.
792 if data.status == Status.INTERESTING:
793 self.health_check_state = None
794
795 state = self.health_check_state
796
797 if state is None:
798 return
799
800 for k, v in data.draw_times.items():
801 state.draw_times[k].append(v)
802
803 if data.status == Status.VALID:
804 state.valid_examples += 1
805 elif data.status == Status.INVALID:
806 state.invalid_examples += 1
807 else:
808 assert data.status == Status.OVERRUN
809 state.overrun_examples += 1
810
811 max_valid_draws = 10
812 max_invalid_draws = 50
813 max_overrun_draws = 20
814
815 assert state.valid_examples <= max_valid_draws
816
817 if state.valid_examples == max_valid_draws:
818 self.health_check_state = None
819 return
820
821 if state.overrun_examples == max_overrun_draws:
822 fail_health_check(
823 self.settings,
824 "Generated inputs routinely consumed more than the maximum "
825 f"allowed entropy: {state.valid_examples} inputs were generated "
826 f"successfully, while {state.overrun_examples} inputs exceeded the "
827 f"maximum allowed entropy during generation."
828 "\n\n"
829 f"Testing with inputs this large tends to be slow, and to produce "
830 "failures that are both difficult to shrink and difficult to understand. "
831 "Try decreasing the amount of data generated, for example by "
832 "decreasing the minimum size of collection strategies like "
833 "st.lists()."
834 "\n\n"
835 "If you expect the average size of your input to be this large, "
836 "you can disable this health check with "
837 "@settings(suppress_health_check=[HealthCheck.data_too_large]). "
838 "See "
839 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
840 "for details.",
841 HealthCheck.data_too_large,
842 )
843 if state.invalid_examples == max_invalid_draws:
844 fail_health_check(
845 self.settings,
846 "It looks like this test is filtering out a lot of inputs. "
847 f"{state.valid_examples} inputs were generated successfully, "
848 f"while {state.invalid_examples} inputs were filtered out. "
849 "\n\n"
850 "An input might be filtered out by calls to assume(), "
851 "strategy.filter(...), or occasionally by Hypothesis internals."
852 "\n\n"
853 "Applying this much filtering makes input generation slow, since "
854 "Hypothesis must discard inputs which are filtered out and try "
855 "generating it again. It is also possible that applying this much "
856 "filtering will distort the domain and/or distribution of the test, "
857 "leaving your testing less rigorous than expected."
858 "\n\n"
859 "If you expect this many inputs to be filtered out during generation, "
860 "you can disable this health check with "
861 "@settings(suppress_health_check=[HealthCheck.filter_too_much]). See "
862 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
863 "for details.",
864 HealthCheck.filter_too_much,
865 )
866
867 # Allow at least the greater of one second or 5x the deadline. If deadline
868 # is None, allow 30s - the user can disable the healthcheck too if desired.
869 draw_time = state.total_draw_time
870 draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
871 if (
872 draw_time > max(1.0, draw_time_limit.total_seconds())
873 # we disable HealthCheck.too_slow under concurrent threads, since
874 # cpython may switch away from a thread for arbitrarily long.
875 and not self.thread_overlap.get(threading.get_ident(), False)
876 ):
877 extra_str = []
878 if state.invalid_examples:
879 extra_str.append(f"{state.invalid_examples} invalid inputs")
880 if state.overrun_examples:
881 extra_str.append(
882 f"{state.overrun_examples} inputs which exceeded the "
883 "maximum allowed entropy"
884 )
885 extra_str = ", and ".join(extra_str)
886 extra_str = f" ({extra_str})" if extra_str else ""
887
888 fail_health_check(
889 self.settings,
890 "Input generation is slow: Hypothesis only generated "
891 f"{state.valid_examples} valid inputs after {draw_time:.2f} "
892 f"seconds{extra_str}."
893 "\n" + state.timing_report() + "\n\n"
894 "This could be for a few reasons:"
895 "\n"
896 "1. This strategy could be generating too much data per input. "
897 "Try decreasing the amount of data generated, for example by "
898 "decreasing the minimum size of collection strategies like "
899 "st.lists()."
900 "\n"
901 "2. Some other expensive computation could be running during input "
902 "generation. For example, "
903 "if @st.composite or st.data() is interspersed with an expensive "
904 "computation, HealthCheck.too_slow is likely to trigger. If this "
905 "computation is unrelated to input generation, move it elsewhere. "
906 "Otherwise, try making it more efficient, or disable this health "
907 "check if that is not possible."
908 "\n\n"
909 "If you expect input generation to take this long, you can disable "
910 "this health check with "
911 "@settings(suppress_health_check=[HealthCheck.too_slow]). See "
912 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
913 "for details.",
914 HealthCheck.too_slow,
915 )
916
917 def save_choices(
918 self, choices: Sequence[ChoiceT], sub_key: bytes | None = None
919 ) -> None:
920 if self.settings.database is not None:
921 key = self.sub_key(sub_key)
922 if key is None:
923 return
924 self.settings.database.save(key, choices_to_bytes(choices))
925
926 def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
927 buffer = choices_to_bytes(choices)
928 if self.settings.database is not None and self.database_key is not None:
929 self.settings.database.move(self.database_key, self.secondary_key, buffer)
930
931 def sub_key(self, sub_key: bytes | None) -> bytes | None:
932 if self.database_key is None:
933 return None
934 if sub_key is None:
935 return self.database_key
936 return b".".join((self.database_key, sub_key))
937
938 @property
939 def secondary_key(self) -> bytes | None:
940 return self.sub_key(b"secondary")
941
942 @property
943 def pareto_key(self) -> bytes | None:
944 return self.sub_key(b"pareto")
945
946 def debug(self, message: str) -> None:
947 if self.settings.verbosity >= Verbosity.debug:
948 base_report(message)
949
950 @property
951 def report_debug_info(self) -> bool:
952 return self.settings.verbosity >= Verbosity.debug
953
954 def debug_data(self, data: ConjectureData | ConjectureResult) -> None:
955 if not self.report_debug_info:
956 return
957
958 status = repr(data.status)
959 if data.status == Status.INTERESTING:
960 status = f"{status} ({data.interesting_origin!r})"
961 elif data.status == Status.INVALID and isinstance(data, ConjectureData):
962 assert isinstance(data, ConjectureData) # mypy is silly
963 status = f"{status} ({data.events.get('invalid because', '?')})"
964
965 newline_tab = "\n\t"
966 self.debug(
967 f"{len(data.choices)} choices -> {status}\n\t{data.choices}"
968 f"{newline_tab + data.output if data.output else ''}"
969 )
970
971 def observe_for_provider(self) -> AbstractContextManager:
972 def on_observation(observation: Observation) -> None:
973 assert observation.type == "test_case"
974 # because lifetime == "test_function"
975 assert isinstance(self.provider, PrimitiveProvider)
976 # only fire if we actually used that provider to generate this observation
977 if not self._switch_to_hypothesis_provider:
978 self.provider.on_observation(observation)
979
980 if (
981 self.settings.backend != "hypothesis"
982 # only for lifetime = "test_function" providers (guaranteed
983 # by this isinstance check)
984 and isinstance(self.provider, PrimitiveProvider)
985 # and the provider opted-in to observations
986 and self.provider.add_observability_callback
987 ):
988 return with_observability_callback(on_observation)
989 return nullcontext()
990
991 def run(self) -> None:
992 with local_settings(self.settings), self.observe_for_provider():
993 try:
994 self._run()
995 except RunIsComplete:
996 pass
997 for v in self.interesting_examples.values():
998 self.debug_data(v)
999 self.debug(
1000 f"Run complete after {self.call_count} examples "
1001 f"({self.valid_examples} valid) and {self.shrinks} shrinks"
1002 )
1003
1004 @property
1005 def database(self) -> ExampleDatabase | None:
1006 if self.database_key is None:
1007 return None
1008 return self.settings.database
1009
1010 def has_existing_examples(self) -> bool:
1011 return self.database is not None and Phase.reuse in self.settings.phases
1012
1013 def reuse_existing_examples(self) -> None:
1014 """If appropriate (we have a database and have been told to use it),
1015 try to reload existing examples from the database.
1016
1017 If there are a lot we don't try all of them. We always try the
1018 smallest example in the database (which is guaranteed to be the
1019 last failure) and the largest (which is usually the seed example
1020 which the last failure came from but we don't enforce that). We
1021 then take a random sampling of the remainder and try those. Any
1022 examples that are no longer interesting are cleared out.
1023 """
1024 if self.has_existing_examples():
1025 self.debug("Reusing examples from database")
1026 # We have to do some careful juggling here. We have two database
1027 # corpora: The primary and secondary. The primary corpus is a
1028 # small set of minimized examples each of which has at one point
1029 # demonstrated a distinct bug. We want to retry all of these.
1030
1031 # We also have a secondary corpus of examples that have at some
1032 # point demonstrated interestingness (currently only ones that
1033 # were previously non-minimal examples of a bug, but this will
1034 # likely expand in future). These are a good source of potentially
1035 # interesting examples, but there are a lot of them, so we down
1036 # sample the secondary corpus to a more manageable size.
1037
1038 corpus = sorted(
1039 self.settings.database.fetch(self.database_key), key=shortlex
1040 )
1041 factor = 0.1 if (Phase.generate in self.settings.phases) else 1
1042 desired_size = max(2, ceil(factor * self.settings.max_examples))
1043 primary_corpus_size = len(corpus)
1044
1045 if len(corpus) < desired_size:
1046 extra_corpus = list(self.settings.database.fetch(self.secondary_key))
1047
1048 shortfall = desired_size - len(corpus)
1049
1050 if len(extra_corpus) <= shortfall:
1051 extra = extra_corpus
1052 else:
1053 extra = self.random.sample(extra_corpus, shortfall)
1054 extra.sort(key=shortlex)
1055 corpus.extend(extra)
1056
1057 # We want a fast path where every primary entry in the database was
1058 # interesting.
1059 found_interesting_in_primary = False
1060 all_interesting_in_primary_were_exact = True
1061
1062 for i, existing in enumerate(corpus):
1063 if i >= primary_corpus_size and found_interesting_in_primary:
1064 break
1065 choices = choices_from_bytes(existing)
1066 if choices is None:
1067 # clear out any keys which fail deserialization
1068 self.settings.database.delete(self.database_key, existing)
1069 continue
1070 data = self.cached_test_function(choices, extend="full")
1071 if data.status != Status.INTERESTING:
1072 self.settings.database.delete(self.database_key, existing)
1073 self.settings.database.delete(self.secondary_key, existing)
1074 else:
1075 if i < primary_corpus_size:
1076 found_interesting_in_primary = True
1077 assert not isinstance(data, _Overrun)
1078 if choices_key(choices) != choices_key(data.choices):
1079 all_interesting_in_primary_were_exact = False
1080 if not self.settings.report_multiple_bugs:
1081 break
1082 if found_interesting_in_primary:
1083 if all_interesting_in_primary_were_exact:
1084 self.reused_previously_shrunk_test_case = True
1085
1086 # Because self.database is not None (because self.has_existing_examples())
1087 # and self.database_key is not None (because we fetched using it above),
1088 # we can guarantee self.pareto_front is not None
1089 assert self.pareto_front is not None
1090
1091 # If we've not found any interesting examples so far we try some of
1092 # the pareto front from the last run.
1093 if len(corpus) < desired_size and not self.interesting_examples:
1094 desired_extra = desired_size - len(corpus)
1095 pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
1096 if len(pareto_corpus) > desired_extra:
1097 pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
1098 pareto_corpus.sort(key=shortlex)
1099
1100 for existing in pareto_corpus:
1101 choices = choices_from_bytes(existing)
1102 if choices is None:
1103 self.settings.database.delete(self.pareto_key, existing)
1104 continue
1105 data = self.cached_test_function(choices, extend="full")
1106 if data not in self.pareto_front:
1107 self.settings.database.delete(self.pareto_key, existing)
1108 if data.status == Status.INTERESTING:
1109 break
1110
1111 def exit_with(self, reason: ExitReason) -> None:
1112 if self.ignore_limits:
1113 return
1114 self.statistics["stopped-because"] = reason.describe(self.settings)
1115 if self.best_observed_targets:
1116 self.statistics["targets"] = dict(self.best_observed_targets)
1117 self.debug(f"exit_with({reason.name})")
1118 self.exit_reason = reason
1119 raise RunIsComplete
1120
1121 def should_generate_more(self) -> bool:
1122 # End the generation phase where we would have ended it if no bugs had
1123 # been found. This reproduces the exit logic in `self.test_function`,
1124 # but with the important distinction that this clause will move on to
1125 # the shrinking phase having found one or more bugs, while the other
1126 # will exit having found zero bugs.
1127 invalid_threshold = (
1128 INVALID_THRESHOLD_BASE + INVALID_PER_VALID * self.valid_examples
1129 )
1130 if (
1131 self.valid_examples >= self.settings.max_examples
1132 or (self.invalid_examples + self.overrun_examples) > invalid_threshold
1133 ): # pragma: no cover
1134 return False
1135
1136 # If we haven't found a bug, keep looking - if we hit any limits on
1137 # the number of tests to run that will raise an exception and stop
1138 # the run.
1139 if not self.interesting_examples:
1140 return True
1141 # Users who disable shrinking probably want to exit as fast as possible.
1142 # If we've found a bug and won't report more than one, stop looking.
1143 # If we first saw a bug more than 10 seconds ago, stop looking.
1144 elif (
1145 Phase.shrink not in self.settings.phases
1146 or not self.settings.report_multiple_bugs
1147 or time.monotonic() - self.first_bug_found_time > 10
1148 ):
1149 return False
1150 assert isinstance(self.first_bug_found_at, int)
1151 assert isinstance(self.last_bug_found_at, int)
1152 assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
1153 # Otherwise, keep searching for between ten and 'a heuristic' calls.
1154 # We cap 'calls after first bug' so errors are reported reasonably
1155 # soon even for tests that are allowed to run for a very long time,
1156 # or sooner if the latest half of our test effort has been fruitless.
1157 return self.call_count < MIN_TEST_CALLS or self.call_count < min(
1158 self.first_bug_found_at + 1000, self.last_bug_found_at * 2
1159 )
1160
1161 def generate_new_examples(self) -> None:
1162 if Phase.generate not in self.settings.phases:
1163 return
1164 if self.interesting_examples:
1165 # The example database has failing examples from a previous run,
1166 # so we'd rather report that they're still failing ASAP than take
1167 # the time to look for additional failures.
1168 return
1169
1170 self.debug("Generating new examples")
1171
1172 assert self.should_generate_more()
1173 self._switch_to_hypothesis_provider = True
1174 zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
1175 if zero_data.status > Status.OVERRUN:
1176 assert isinstance(zero_data, ConjectureResult)
1177 # if the crosshair backend cannot proceed, it does not (and cannot)
1178 # realize the symbolic values, with the intent that Hypothesis will
1179 # throw away this test case. We usually do, but if it's the zero data
1180 # then we try to pin it here, which requires realizing the symbolics.
1181 #
1182 # We don't (yet) rely on the zero data being pinned, and so
1183 # it's simply a very slight performance loss to simply not pin it
1184 # if doing so would error.
1185 if zero_data.cannot_proceed_scope is None: # pragma: no branch
1186 self.__data_cache.pin(
1187 self._cache_key(zero_data.choices), zero_data.as_result()
1188 ) # Pin forever
1189
1190 if zero_data.status == Status.OVERRUN or (
1191 zero_data.status == Status.VALID
1192 and isinstance(zero_data, ConjectureResult)
1193 and zero_data.length * 2 > BUFFER_SIZE
1194 ):
1195 fail_health_check(
1196 self.settings,
1197 "The smallest natural input for this test is very "
1198 "large. This makes it difficult for Hypothesis to generate "
1199 "good inputs, especially when trying to shrink failing inputs."
1200 "\n\n"
1201 "Consider reducing the amount of data generated by the strategy. "
1202 "Also consider introducing small alternative values for some "
1203 "strategies. For example, could you "
1204 "mark some arguments as optional by replacing `some_complex_strategy`"
1205 "with `st.none() | some_complex_strategy`?"
1206 "\n\n"
1207 "If you are confident that the size of the smallest natural input "
1208 "to your test cannot be reduced, you can suppress this health check "
1209 "with @settings(suppress_health_check=[HealthCheck.large_base_example]). "
1210 "See "
1211 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
1212 "for details.",
1213 HealthCheck.large_base_example,
1214 )
1215
1216 self.health_check_state = HealthCheckState()
1217
1218 # We attempt to use the size of the minimal generated test case starting
1219 # from a given novel prefix as a guideline to generate smaller test
1220 # cases for an initial period, by restriscting ourselves to test cases
1221 # that are not much larger than it.
1222 #
1223 # Calculating the actual minimal generated test case is hard, so we
1224 # take a best guess that zero extending a prefix produces the minimal
1225 # test case starting with that prefix (this is true for our built in
1226 # strategies). This is only a reasonable thing to do if the resulting
1227 # test case is valid. If we regularly run into situations where it is
1228 # not valid then this strategy is a waste of time, so we want to
1229 # abandon it early. In order to do this we track how many times in a
1230 # row it has failed to work, and abort small test case generation when
1231 # it has failed too many times in a row.
1232 consecutive_zero_extend_is_invalid = 0
1233
1234 # We control growth during initial example generation, for two
1235 # reasons:
1236 #
1237 # * It gives us an opportunity to find small examples early, which
1238 # gives us a fast path for easy to find bugs.
1239 # * It avoids low probability events where we might end up
1240 # generating very large examples during health checks, which
1241 # on slower machines can trigger HealthCheck.too_slow.
1242 #
1243 # The heuristic we use is that we attempt to estimate the smallest
1244 # extension of this prefix, and limit the size to no more than
1245 # an order of magnitude larger than that. If we fail to estimate
1246 # the size accurately, we skip over this prefix and try again.
1247 #
1248 # We need to tune the example size based on the initial prefix,
1249 # because any fixed size might be too small, and any size based
1250 # on the strategy in general can fall afoul of strategies that
1251 # have very different sizes for different prefixes.
1252 #
1253 # We previously set a minimum value of 10 on small_example_cap, with the
1254 # reasoning of avoiding flaky health checks. However, some users set a
1255 # low max_examples for performance. A hard lower bound in this case biases
1256 # the distribution towards small (and less powerful) examples. Flaky
1257 # and loud health checks are better than silent performance degradation.
1258 small_example_cap = min(self.settings.max_examples // 10, 50)
1259 optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
1260 ran_optimisations = False
1261 self._switch_to_hypothesis_provider = False
1262
1263 while self.should_generate_more():
1264 # we don't yet integrate DataTree with backends. Instead of generating
1265 # a novel prefix, ask the backend for an input.
1266 if not self.using_hypothesis_backend:
1267 data = self.new_conjecture_data([])
1268 self.test_function(data)
1269 continue
1270
1271 self._current_phase = "generate"
1272 prefix = self.generate_novel_prefix()
1273 if (
1274 self.valid_examples <= small_example_cap
1275 and self.call_count <= 5 * small_example_cap
1276 and not self.interesting_examples
1277 and consecutive_zero_extend_is_invalid < 5
1278 ):
1279 minimal_example = self.cached_test_function(
1280 prefix + (ChoiceTemplate("simplest", count=None),)
1281 )
1282
1283 if minimal_example.status < Status.VALID:
1284 consecutive_zero_extend_is_invalid += 1
1285 continue
1286 # Because the Status code is greater than Status.VALID, it cannot be
1287 # Status.OVERRUN, which guarantees that the minimal_example is a
1288 # ConjectureResult object.
1289 assert isinstance(minimal_example, ConjectureResult)
1290 consecutive_zero_extend_is_invalid = 0
1291 minimal_extension = len(minimal_example.choices) - len(prefix)
1292 max_length = len(prefix) + minimal_extension * 5
1293
1294 # We could end up in a situation where even though the prefix was
1295 # novel when we generated it, because we've now tried zero extending
1296 # it not all possible continuations of it will be novel. In order to
1297 # avoid making redundant test calls, we rerun it in simulation mode
1298 # first. If this has a predictable result, then we don't bother
1299 # running the test function for real here. If however we encounter
1300 # some novel behaviour, we try again with the real test function,
1301 # starting from the new novel prefix that has discovered.
1302 trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
1303 try:
1304 self.tree.simulate_test_function(trial_data)
1305 continue
1306 except PreviouslyUnseenBehaviour:
1307 pass
1308
1309 # If the simulation entered part of the tree that has been killed,
1310 # we don't want to run this.
1311 assert isinstance(trial_data.observer, TreeRecordingObserver)
1312 if trial_data.observer.killed:
1313 continue
1314
1315 # We might have hit the cap on number of examples we should
1316 # run when calculating the minimal example.
1317 if not self.should_generate_more():
1318 break
1319
1320 prefix = trial_data.choices
1321 else:
1322 max_length = None
1323
1324 data = self.new_conjecture_data(prefix, max_choices=max_length)
1325 self.test_function(data)
1326
1327 if (
1328 data.status is Status.OVERRUN
1329 and max_length is not None
1330 and "invalid because" not in data.events
1331 ):
1332 data.events["invalid because"] = (
1333 "reduced max size for early examples (avoids flaky health checks)"
1334 )
1335
1336 self.generate_mutations_from(data)
1337
1338 # Although the optimisations are logically a distinct phase, we
1339 # actually normally run them as part of example generation. The
1340 # reason for this is that we cannot guarantee that optimisation
1341 # actually exhausts our budget: It might finish running and we
1342 # discover that actually we still could run a bunch more test cases
1343 # if we want.
1344 if (
1345 self.valid_examples >= max(small_example_cap, optimise_at)
1346 and not ran_optimisations
1347 ):
1348 ran_optimisations = True
1349 self._current_phase = "target"
1350 self.optimise_targets()
1351
1352 def generate_mutations_from(self, data: ConjectureData | ConjectureResult) -> None:
1353 # A thing that is often useful but rarely happens by accident is
1354 # to generate the same value at multiple different points in the
1355 # test case.
1356 #
1357 # Rather than make this the responsibility of individual strategies
1358 # we implement a small mutator that just takes parts of the test
1359 # case with the same label and tries replacing one of them with a
1360 # copy of the other and tries running it. If we've made a good
1361 # guess about what to put where, this will run a similar generated
1362 # test case with more duplication.
1363 if (
1364 # An OVERRUN doesn't have enough information about the test
1365 # case to mutate, so we just skip those.
1366 data.status >= Status.INVALID
1367 # This has a tendency to trigger some weird edge cases during
1368 # generation so we don't let it run until we're done with the
1369 # health checks.
1370 and self.health_check_state is None
1371 ):
1372 initial_calls = self.call_count
1373 failed_mutations = 0
1374
1375 while (
1376 self.should_generate_more()
1377 # We implement fairly conservative checks for how long we
1378 # we should run mutation for, as it's generally not obvious
1379 # how helpful it is for any given test case.
1380 and self.call_count <= initial_calls + 5
1381 and failed_mutations <= 5
1382 ):
1383 groups = data.spans.mutator_groups
1384 if not groups:
1385 break
1386
1387 group = self.random.choice(groups)
1388 (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
1389 if start1 > start2:
1390 (start1, end1), (start2, end2) = (start2, end2), (start1, end1)
1391
1392 if (
1393 start1 <= start2 <= end2 <= end1
1394 ): # pragma: no cover # flaky on conjecture-cover tests
1395 # One span entirely contains the other. The strategy is very
1396 # likely some kind of tree. e.g. we might have
1397 #
1398 # ┌─────┐
1399 # ┌─────┤ a ├──────┐
1400 # │ └─────┘ │
1401 # ┌──┴──┐ ┌──┴──┐
1402 # ┌──┤ b ├──┐ ┌──┤ c ├──┐
1403 # │ └──┬──┘ │ │ └──┬──┘ │
1404 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
1405 # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
1406 # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
1407 #
1408 # where each node is drawn from the same strategy and so
1409 # has the same span label. We might have selected the spans
1410 # corresponding to the a and c nodes, which is the entire
1411 # tree and the subtree of (and including) c respectively.
1412 #
1413 # There are two possible mutations we could apply in this case:
1414 # 1. replace a with c (replace child with parent)
1415 # 2. replace c with a (replace parent with child)
1416 #
1417 # (1) results in multiple partial copies of the
1418 # parent:
1419 # ┌─────┐
1420 # ┌─────┤ a ├────────────┐
1421 # │ └─────┘ │
1422 # ┌──┴──┐ ┌─┴───┐
1423 # ┌──┤ b ├──┐ ┌─────┤ a ├──────┐
1424 # │ └──┬──┘ │ │ └─────┘ │
1425 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌──┴──┐ ┌──┴──┐
1426 # │ d │ │ e │ │ f │ ┌──┤ b ├──┐ ┌──┤ c ├──┐
1427 # └───┘ └───┘ └───┘ │ └──┬──┘ │ │ └──┬──┘ │
1428 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
1429 # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
1430 # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
1431 #
1432 # While (2) results in truncating part of the parent:
1433 #
1434 # ┌─────┐
1435 # ┌──┤ c ├──┐
1436 # │ └──┬──┘ │
1437 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
1438 # │ g │ │ h │ │ i │
1439 # └───┘ └───┘ └───┘
1440 #
1441 # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
1442 # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
1443 # except we do not repeat the replacement additional times
1444 # (the paper repeats it once for a total of two copies).
1445 #
1446 # We currently only apply mutation (1), and ignore mutation
1447 # (2). The reason is that the attempt generated from (2) is
1448 # always something that Hypothesis could easily have generated
1449 # itself, by simply not making various choices. Whereas
1450 # duplicating the exact value + structure of particular choices
1451 # in (1) would have been hard for Hypothesis to generate by
1452 # chance.
1453 #
1454 # TODO: an extension of this mutation might repeat (1) on
1455 # a geometric distribution between 0 and ~10 times. We would
1456 # need to find the corresponding span to recurse on in the new
1457 # choices, probably just by using the choices index.
1458
1459 # case (1): duplicate the choices in start1:start2.
1460 attempt = data.choices[:start2] + data.choices[start1:]
1461 else:
1462 start, end = self.random.choice([(start1, end1), (start2, end2)])
1463 replacement = data.choices[start:end]
1464 # We attempt to replace both the examples with
1465 # whichever choice we made. Note that this might end
1466 # up messing up and getting the example boundaries
1467 # wrong - labels matching are only a best guess as to
1468 # whether the two are equivalent - but it doesn't
1469 # really matter. It may not achieve the desired result,
1470 # but it's still a perfectly acceptable choice sequence
1471 # to try.
1472 attempt = (
1473 data.choices[:start1]
1474 + replacement
1475 + data.choices[end1:start2]
1476 + replacement
1477 + data.choices[end2:]
1478 )
1479
1480 try:
1481 new_data = self.cached_test_function(
1482 attempt,
1483 # We set error_on_discard so that we don't end up
1484 # entering parts of the tree we consider redundant
1485 # and not worth exploring.
1486 error_on_discard=True,
1487 )
1488 except ContainsDiscard:
1489 failed_mutations += 1
1490 continue
1491
1492 if new_data is Overrun:
1493 failed_mutations += 1 # pragma: no cover # annoying case
1494 else:
1495 assert isinstance(new_data, ConjectureResult)
1496 if (
1497 new_data.status >= data.status
1498 and choices_key(data.choices) != choices_key(new_data.choices)
1499 and all(
1500 k in new_data.target_observations
1501 and new_data.target_observations[k] >= v
1502 for k, v in data.target_observations.items()
1503 )
1504 ):
1505 data = new_data
1506 failed_mutations = 0
1507 else:
1508 failed_mutations += 1
1509
1510 def optimise_targets(self) -> None:
1511 """If any target observations have been made, attempt to optimise them
1512 all."""
1513 if not self.should_optimise:
1514 return
1515 from hypothesis.internal.conjecture.optimiser import Optimiser
1516
1517 # We want to avoid running the optimiser for too long in case we hit
1518 # an unbounded target score. We start this off fairly conservatively
1519 # in case interesting examples are easy to find and then ramp it up
1520 # on an exponential schedule so we don't hamper the optimiser too much
1521 # if it needs a long time to find good enough improvements.
1522 max_improvements = 10
1523 while True:
1524 prev_calls = self.call_count
1525
1526 any_improvements = False
1527
1528 for target, data in list(self.best_examples_of_observed_targets.items()):
1529 optimiser = Optimiser(
1530 self, data, target, max_improvements=max_improvements
1531 )
1532 optimiser.run()
1533 if optimiser.improvements > 0:
1534 any_improvements = True
1535
1536 if self.interesting_examples:
1537 break
1538
1539 max_improvements *= 2
1540
1541 if any_improvements:
1542 continue
1543
1544 if self.best_observed_targets:
1545 self.pareto_optimise()
1546
1547 if prev_calls == self.call_count:
1548 break
1549
1550 def pareto_optimise(self) -> None:
1551 if self.pareto_front is not None:
1552 ParetoOptimiser(self).run()
1553
1554 def _run(self) -> None:
1555 # have to use the primitive provider to interpret database bits...
1556 self._switch_to_hypothesis_provider = True
1557 with self._log_phase_statistics("reuse"):
1558 self.reuse_existing_examples()
1559 # Fast path for development: If the database gave us interesting
1560 # examples from the previously stored primary key, don't try
1561 # shrinking it again as it's unlikely to work.
1562 if self.reused_previously_shrunk_test_case:
1563 self.exit_with(ExitReason.finished)
1564 # ...but we should use the supplied provider when generating...
1565 self._switch_to_hypothesis_provider = False
1566 with self._log_phase_statistics("generate"):
1567 self.generate_new_examples()
1568 # We normally run the targeting phase mixed in with the generate phase,
1569 # but if we've been asked to run it but not generation then we have to
1570 # run it explicitly on its own here.
1571 if Phase.generate not in self.settings.phases:
1572 self._current_phase = "target"
1573 self.optimise_targets()
1574 # ...and back to the primitive provider when shrinking.
1575 self._switch_to_hypothesis_provider = True
1576 with self._log_phase_statistics("shrink"):
1577 self.shrink_interesting_examples()
1578 self.exit_with(ExitReason.finished)
1579
1580 def new_conjecture_data(
1581 self,
1582 prefix: Sequence[ChoiceT | ChoiceTemplate],
1583 *,
1584 observer: DataObserver | None = None,
1585 max_choices: int | None = None,
1586 ) -> ConjectureData:
1587 provider = (
1588 HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
1589 )
1590 observer = observer or self.tree.new_observer()
1591 if not self.using_hypothesis_backend:
1592 observer = DataObserver()
1593
1594 return ConjectureData(
1595 prefix=prefix,
1596 observer=observer,
1597 provider=provider,
1598 max_choices=max_choices,
1599 random=self.random,
1600 )
1601
1602 def shrink_interesting_examples(self) -> None:
1603 """If we've found interesting examples, try to replace each of them
1604 with a minimal interesting example with the same interesting_origin.
1605
1606 We may find one or more examples with a new interesting_origin
1607 during the shrink process. If so we shrink these too.
1608 """
1609 if Phase.shrink not in self.settings.phases or not self.interesting_examples:
1610 return
1611
1612 self.debug("Shrinking interesting examples")
1613 self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS
1614
1615 for prev_data in sorted(
1616 self.interesting_examples.values(), key=lambda d: sort_key(d.nodes)
1617 ):
1618 assert prev_data.status == Status.INTERESTING
1619 data = self.new_conjecture_data(prev_data.choices)
1620 self.test_function(data)
1621 if data.status != Status.INTERESTING:
1622 self.exit_with(ExitReason.flaky)
1623
1624 self.clear_secondary_key()
1625
1626 while len(self.shrunk_examples) < len(self.interesting_examples):
1627 target, example = min(
1628 (
1629 (k, v)
1630 for k, v in self.interesting_examples.items()
1631 if k not in self.shrunk_examples
1632 ),
1633 key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))),
1634 )
1635 self.debug(f"Shrinking {target!r}: {example.choices}")
1636
1637 if not self.settings.report_multiple_bugs:
1638 # If multi-bug reporting is disabled, we shrink our currently-minimal
1639 # failure, allowing 'slips' to any bug with a smaller minimal example.
1640 self.shrink(example, lambda d: d.status == Status.INTERESTING)
1641 return
1642
1643 def predicate(d: ConjectureResult | _Overrun) -> bool:
1644 if d.status < Status.INTERESTING:
1645 return False
1646 d = cast(ConjectureResult, d)
1647 return d.interesting_origin == target
1648
1649 self.shrink(example, predicate)
1650
1651 self.shrunk_examples.add(target)
1652
1653 def clear_secondary_key(self) -> None:
1654 if self.has_existing_examples():
1655 # If we have any smaller examples in the secondary corpus, now is
1656 # a good time to try them to see if they work as shrinks. They
1657 # probably won't, but it's worth a shot and gives us a good
1658 # opportunity to clear out the database.
1659
1660 # It's not worth trying the primary corpus because we already
1661 # tried all of those in the initial phase.
1662 corpus = sorted(
1663 self.settings.database.fetch(self.secondary_key), key=shortlex
1664 )
1665 for c in corpus:
1666 choices = choices_from_bytes(c)
1667 if choices is None:
1668 self.settings.database.delete(self.secondary_key, c)
1669 continue
1670 primary = {
1671 choices_to_bytes(v.choices)
1672 for v in self.interesting_examples.values()
1673 }
1674 if shortlex(c) > max(map(shortlex, primary)):
1675 break
1676
1677 self.cached_test_function(choices)
1678 # We unconditionally remove c from the secondary key as it
1679 # is either now primary or worse than our primary example
1680 # of this reason for interestingness.
1681 self.settings.database.delete(self.secondary_key, c)
1682
1683 def shrink(
1684 self,
1685 example: ConjectureData | ConjectureResult,
1686 predicate: ShrinkPredicateT | None = None,
1687 allow_transition: (
1688 Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None
1689 ) = None,
1690 ) -> ConjectureData | ConjectureResult:
1691 s = self.new_shrinker(example, predicate, allow_transition)
1692 s.shrink()
1693 return s.shrink_target
1694
1695 def new_shrinker(
1696 self,
1697 example: ConjectureData | ConjectureResult,
1698 predicate: ShrinkPredicateT | None = None,
1699 allow_transition: (
1700 Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None
1701 ) = None,
1702 ) -> Shrinker:
1703 return Shrinker(
1704 self,
1705 example,
1706 predicate,
1707 allow_transition=allow_transition,
1708 explain=Phase.explain in self.settings.phases,
1709 in_target_phase=self._current_phase == "target",
1710 )
1711
1712 def passing_choice_sequences(
1713 self, prefix: Sequence[ChoiceNode] = ()
1714 ) -> frozenset[tuple[ChoiceNode, ...]]:
1715 """Return a collection of choice sequence nodes which cause the test to pass.
1716 Optionally restrict this by a certain prefix, which is useful for explain mode.
1717 """
1718 return frozenset(
1719 cast(ConjectureResult, result).nodes
1720 for key in self.__data_cache
1721 if (result := self.__data_cache[key]).status is Status.VALID
1722 and startswith(cast(ConjectureResult, result).nodes, prefix)
1723 )
1724
1725
1726class ContainsDiscard(Exception):
1727 pass