# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import importlib
import math
import time
from collections import defaultdict
from contextlib import contextmanager
from datetime import timedelta
from enum import Enum
from random import Random, getrandbits
from typing import (
    Any,
    Callable,
    Dict,
    Final,
    FrozenSet,
    Generator,
    List,
    Literal,
    NoReturn,
    Optional,
    Set,
    Tuple,
    Union,
    overload,
)

import attr

from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings
from hypothesis.database import ExampleDatabase
from hypothesis.errors import InvalidArgument, StopTest
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import (
    NotRequired,
    TypeAlias,
    TypedDict,
    ceil,
    int_from_bytes,
    override,
)
from hypothesis.internal.conjecture.data import (
    AVAILABLE_PROVIDERS,
    ConjectureData,
    ConjectureResult,
    DataObserver,
    Example,
    HypothesisProvider,
    InterestingOrigin,
    IRNode,
    Overrun,
    PrimitiveProvider,
    Status,
    _Overrun,
    ir_kwargs_key,
    ir_value_key,
)
from hypothesis.internal.conjecture.datatree import (
    DataTree,
    PreviouslyUnseenBehaviour,
    TreeRecordingObserver,
)
from hypothesis.internal.conjecture.junkdrawer import clamp, ensure_free_stackframes
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.shrinker import Shrinker, sort_key
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.reporting import base_report, report

MAX_SHRINKS: Final[int] = 500
CACHE_SIZE: Final[int] = 10000
MUTATION_POOL_SIZE: Final[int] = 100
MIN_TEST_CALLS: Final[int] = 10
BUFFER_SIZE: Final[int] = 8 * 1024

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)
MAX_SHRINKING_SECONDS: Final[int] = 300

Ls: TypeAlias = List["Ls | int"]
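# `Ls` is a recursive list-of-ints, used by debug_data below to render the
# example structure, e.g. (illustrative): [[1, [2, 3]], 4].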


@attr.s
class HealthCheckState:
    valid_examples: int = attr.ib(default=0)
    invalid_examples: int = attr.ib(default=0)
    overrun_examples: int = attr.ib(default=0)
    draw_times: "defaultdict[str, List[float]]" = attr.ib(
        factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
        return math.fsum(sum(self.draw_times.values(), start=[]))

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
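        # Produces a small table; an illustrative (not verbatim) example:
        #
        #        count | fraction | slowest draws (seconds)
        #    n |    10 |      95% |  --     --   0.100, 0.200, 0.500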
        if not self.draw_times:
            return ""
        width = max(len(k[len("generate:") :].strip(": ")) for k in self.draw_times)
        out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < self.total_draw_time
            ):
                out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times <0.5ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname[len("generate:") :].strip(": ")  # removeprefix in py3.9
            out.append(
                f" {arg:^{width}} | {len(times):>4} | "
                f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
            )
        return "\n".join(out)


class ExitReason(Enum):
    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 10% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
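        # e.g. ExitReason.max_examples.describe(settings) with max_examples=100
        # gives "settings.max_examples=100".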
        return self.value.format(s=settings)


class RunIsComplete(Exception):
    pass


def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
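    # AVAILABLE_PROVIDERS maps backend names to dotted import paths, e.g.
    # "hypothesis" -> "hypothesis.internal.conjecture.data.HypothesisProvider".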
    mname, cname = AVAILABLE_PROVIDERS[backend].rsplit(".", 1)
    provider_cls = getattr(importlib.import_module(mname), cname)
    if provider_cls.lifetime == "test_function":
        return provider_cls(None)
    elif provider_cls.lifetime == "test_case":
        return provider_cls
    else:
        raise InvalidArgument(
            f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
            "Expected one of 'test_function', 'test_case'."
        )


class CallStats(TypedDict):
    status: str
    runtime: float
    drawtime: float
    gctime: float
    events: List[str]


PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": List[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[Dict[str, float]],
    },
)
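# An illustrative StatisticsDict, with values abridged:
# {
#     "generate-phase": {"duration-seconds": 1.2, "test-cases": [...],
#                        "distinct-failures": 0, "shrinks-successful": 0},
#     "stopped-because": "settings.max_examples=100",
# }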


class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Optional[Settings] = None,
        random: Optional[Random] = None,
        database_key: Optional[bytes] = None,
        ignore_limits: bool = False,
    ) -> None:
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: Optional[float] = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.random: Random = random or Random(getrandbits(128))
        self.database_key: Optional[bytes] = database_key
        self.ignore_limits: bool = ignore_limits

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: List[CallStats] = []

        # At runtime, the keys are only ever type `InterestingOrigin`, but can be `None` during tests.
        self.interesting_examples: Dict[InterestingOrigin, ConjectureResult] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: Optional[int] = None
        self.last_bug_found_at: Optional[int] = None

        # At runtime, the keys are only ever type `InterestingOrigin`, but can be `None` during tests.
        self.shrunk_examples: Set[Optional[InterestingOrigin]] = set()

        self.health_check_state: Optional[HealthCheckState] = None

        self.tree: DataTree = DataTree()

        self.provider: Union[type, PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: "defaultdict[str, float]" = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: Dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: Optional[ParetoFront] = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a buffer without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache(CACHE_SIZE)
        self.__data_cache_ir = LRUReusedCache(CACHE_SIZE)

        self.__pending_call_explanation: Optional[str] = None
        self._switch_to_hypothesis_provider: bool = False

    def explain_next_call_as(self, explanation: str) -> None:
        self.__pending_call_explanation = explanation

    def clear_call_explanation(self) -> None:
        self.__pending_call_explanation = None

    @contextmanager
    def _log_phase_statistics(
        self, phase: Literal["reuse", "generate", "shrink"]
    ) -> Generator[None, None, None]:
        self.stats_per_test_case.clear()
        start_time = time.perf_counter()
        try:
            self._current_phase = phase
            yield
        finally:
            # We ignore the mypy type error here. Because `phase` is a string literal and "-phase" is a string literal
            # as well, the concatenation will always be a valid key in the dictionary.
            self.statistics[phase + "-phase"] = {  # type: ignore
                "duration-seconds": time.perf_counter() - start_time,
                "test-cases": list(self.stats_per_test_case),
                "distinct-failures": len(self.interesting_examples),
                "shrinks-successful": self.shrinks,
            }

    @property
    def should_optimise(self) -> bool:
        return Phase.target in self.settings.phases

    def __tree_is_exhausted(self) -> bool:
        return self.tree.is_exhausted and self.settings.backend == "hypothesis"

    def __stoppable_test_function(self, data: ConjectureData) -> None:
        """Run ``self._test_function``, but convert a ``StopTest`` exception
        into a normal return and avoid raising Flaky for RecursionErrors.
        """
        # We ensure that the test has this much stack space remaining, no
        # matter the size of the stack when called, to de-flake RecursionErrors
        # (#2494, #3671). Note, this covers the data generation part of the test;
        # the actual test execution is additionally protected at the call site
        # in hypothesis.core.execute_once.
        with ensure_free_stackframes():
            try:
                self._test_function(data)
            except StopTest as e:
                if e.testcounter == data.testcounter:
                    # This StopTest has successfully stopped its test, and can now
                    # be discarded.
                    pass
                else:
                    # This StopTest was raised by a different ConjectureData. We
                    # need to re-raise it so that it will eventually reach the
                    # correct engine.
                    raise

    def _cache_key_ir(
        self,
        *,
        nodes: Optional[List[IRNode]] = None,
        data: Union[ConjectureData, ConjectureResult, None] = None,
    ) -> Tuple[Tuple[Any, ...], ...]:
        assert (nodes is not None) ^ (data is not None)
        extension = []
        if data is not None:
            nodes = data.examples.ir_tree_nodes
            if data.invalid_at is not None:
                # if we're invalid then we should have at least one node left (the invalid one).
                assert isinstance(data, ConjectureData)
                assert data.ir_tree_nodes is not None
                assert data._node_index < len(data.ir_tree_nodes)
                extension = [data.ir_tree_nodes[data._node_index]]
        assert nodes is not None

        # intentionally drop was_forced from equality here, because the was_forced
        # of node prefixes on ConjectureData has no impact on that data's result
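        # An illustrative key for a single integer draw (the exact value/kwargs
        # key formats are internal details): (("integer", 5, <kwargs key>),)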
        return tuple(
            (
                node.ir_type,
                ir_value_key(node.ir_type, node.value),
                ir_kwargs_key(node.ir_type, node.kwargs),
            )
            for node in nodes + extension
        )

    def _cache(self, data: ConjectureData) -> None:
        result = data.as_result()
        # when we shrink, we try out of bounds things, which can lead to the same
        # data.buffer having multiple outcomes. e.g. data.buffer=b'' is Status.OVERRUN
        # in normal circumstances, but a data with
        # ir_nodes=[integer -5 {min_value: 0, max_value: 10}] will also have
        # data.buffer=b'' but will be Status.INVALID instead. We do not want to
        # change the cached value to INVALID in this case.
        #
        # We handle this specially for the ir cache by keying off the misaligned node
        # as well, but we cannot do the same for buffers as we do not know ahead of
        # time what buffer a node maps to. I think it's largely fine that we don't
        # write to the buffer cache here as we move more things to the ir cache.
        if data.invalid_at is None:
            self.__data_cache[data.buffer] = result

        # interesting buffer-based data can mislead the shrinker if we cache them.
        #
        # @given(st.integers())
        # def f(n):
        #     assert n < 100
        #
        # may generate two counterexamples, n=101 and n=m > 101, in that order,
        # where the buffer corresponding to n is large due to e.g. failed probes.
        # We shrink m and eventually try n=101, but it is cached to a large buffer
        # and so the best we can do is n=102, a non-ideal shrink.
        #
        # We can cache ir-based buffers fine, which always correspond to the
        # smallest buffer via forced=. The overhead here is small because almost
        # all interesting data are ir-based via the shrinker (and that overhead
        # will tend towards zero as we move generation to the ir).
        if data.ir_tree_nodes is not None or data.status < Status.INTERESTING:
            key = self._cache_key_ir(data=data)
            self.__data_cache_ir[key] = result

    def cached_test_function_ir(
        self, nodes: List[IRNode], *, error_on_discard: bool = False
    ) -> Union[ConjectureResult, _Overrun]:
        key = self._cache_key_ir(nodes=nodes)
        try:
            return self.__data_cache_ir[key]
        except KeyError:
            pass

        # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
        # The reason is we don't expect simulate_test_function to explore new choices
        # and write back to the tree, so we don't want the overhead of the
        # TreeRecordingObserver tracking those calls.
        trial_observer: Optional[DataObserver] = DataObserver()
        if error_on_discard:

            class DiscardObserver(DataObserver):
                @override
                def kill_branch(self) -> NoReturn:
                    raise ContainsDiscard

            trial_observer = DiscardObserver()

        try:
            trial_data = self.new_conjecture_data_ir(nodes, observer=trial_observer)
            self.tree.simulate_test_function(trial_data)
        except PreviouslyUnseenBehaviour:
            pass
        else:
            trial_data.freeze()
            key = self._cache_key_ir(data=trial_data)
            try:
                return self.__data_cache_ir[key]
            except KeyError:
                pass

        data = self.new_conjecture_data_ir(nodes)
        # note that calling test_function caches `data` for us, for both an ir
        # tree key and a buffer key.
        self.test_function(data)
        return data.as_result()

    def test_function(self, data: ConjectureData) -> None:
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1

        interrupted = False
        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BaseException:
            self.save_buffer(data.buffer)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                data.freeze()
                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)
                self._cache(data)
                if data.invalid_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_buffer(data.buffer, sub_key=b"pareto")

        assert len(data.buffer) <= BUFFER_SIZE

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                if v > existing_score or sort_key(data.buffer) < sort_key(
                    existing_example.buffer
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status == Status.VALID:
            self.valid_examples += 1

        if data.status == Status.INTERESTING:
            if self.settings.backend != "hypothesis":
                for node in data.examples.ir_tree_nodes:
                    value = data.provider.post_test_case_hook(node.value)
                    # require providers to return something valid here.
                    assert (
                        value is not None
                    ), "providers must return a non-null value from post_test_case_hook"
                    node.value = value

                # drive the ir tree through the test function to convert it
                # to a buffer
                data = ConjectureData.for_ir_tree(data.examples.ir_tree_nodes)
                self.__stoppable_test_function(data)
                self._cache(data)

            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]  # type: ignore
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
            else:
                if sort_key(data.buffer) < sort_key(existing.buffer):
                    self.shrinks += 1
                    self.downgrade_buffer(existing.buffer)
                    self.__data_cache.unpin(existing.buffer)
                    changed = True

            if changed:
                self.save_buffer(data.buffer)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                self.__data_cache.pin(data.buffer)
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress. When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)

    def on_pareto_evict(self, data: ConjectureData) -> None:
        self.settings.database.delete(self.pareto_key, data.buffer)

    def generate_novel_prefix(self) -> bytes:
        """Uses the tree to proactively generate a starting sequence of bytes
        that we haven't explored yet for this test.

        When this method is called, we assume that there must be at
        least one novel prefix left to find. If there were not, then the
        test run should have already stopped due to tree exhaustion.
        """
        return self.tree.generate_novel_prefix(self.random)

    def record_for_health_check(self, data: ConjectureData) -> None:
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Examples routinely exceeded the max allowable size. "
                f"({state.overrun_examples} examples overran while generating "
                f"{state.valid_examples} valid ones). Generating examples this large "
                "will usually lead to bad results. You could try setting max_size "
                "parameters on your collections and turning max_leaves down on "
                "recursive() calls.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like your strategy is filtering out a lot of data. Health "
                f"check found {state.invalid_examples} filtered examples but only "
                f"{state.valid_examples} good ones. This will make your tests much "
                "slower, and also will probably distort the data generation quite a "
                "lot. You should adapt your strategy to filter less. This can also "
                "be caused by a low max_leaves parameter in recursive() calls",
                HealthCheck.filter_too_much,
            )

        draw_time = state.total_draw_time

        # Allow at least the greater of one second or 5x the deadline. If deadline
        # is None, allow 30s - the user can disable the healthcheck too if desired.
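        # e.g. deadline=200ms gives a limit of max(1.0, 5 * 0.2) == 1.0 seconds.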
        draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
        if draw_time > max(1.0, draw_time_limit.total_seconds()):
            fail_health_check(
                self.settings,
                "Data generation is extremely slow: Only produced "
                f"{state.valid_examples} valid examples in {draw_time:.2f} seconds "
                f"({state.invalid_examples} invalid ones and {state.overrun_examples} "
                "exceeded maximum size). Try decreasing size of the data you're "
                "generating (with e.g. max_size or max_leaves parameters)."
                + state.timing_report(),
                HealthCheck.too_slow,
            )

    def save_buffer(
        self, buffer: Union[bytes, bytearray], sub_key: Optional[bytes] = None
    ) -> None:
        if self.settings.database is not None:
            key = self.sub_key(sub_key)
            if key is None:
                return
            self.settings.database.save(key, bytes(buffer))

    def downgrade_buffer(self, buffer: Union[bytes, bytearray]) -> None:
        if self.settings.database is not None and self.database_key is not None:
            self.settings.database.move(self.database_key, self.secondary_key, buffer)

    def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]:
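        # e.g. sub_key(b"secondary") -> b"<database_key>.secondary", while
        # sub_key(None) returns the database key itself.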
        if self.database_key is None:
            return None
        if sub_key is None:
            return self.database_key
        return b".".join((self.database_key, sub_key))

    @property
    def secondary_key(self) -> Optional[bytes]:
        return self.sub_key(b"secondary")

    @property
    def pareto_key(self) -> Optional[bytes]:
        return self.sub_key(b"pareto")

    def debug(self, message: str) -> None:
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

    @property
    def report_debug_info(self) -> bool:
        return self.settings.verbosity >= Verbosity.debug

    def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None:
        if not self.report_debug_info:
            return

        stack: List[Ls] = [[]]

        def go(ex: Example) -> None:
            if ex.length == 0:
                return
            if len(ex.children) == 0:
                stack[-1].append(int_from_bytes(data.buffer[ex.start : ex.end]))
            else:
                node: Ls = []
                stack.append(node)

                for v in ex.children:
                    go(v)
                stack.pop()
                if len(node) == 1:
                    stack[-1].extend(node)
                else:
                    stack[-1].append(node)

        go(data.examples[0])
        assert len(stack) == 1

        status = repr(data.status)

        if data.status == Status.INTERESTING:
            status = f"{status} ({data.interesting_origin!r})"

        self.debug(f"{data.index} bytes {stack[0]!r} -> {status}, {data.output}")

    def run(self) -> None:
        with local_settings(self.settings):
            try:
                self._run()
            except RunIsComplete:
                pass
            for v in self.interesting_examples.values():
                self.debug_data(v)
            self.debug(
                "Run complete after %d examples (%d valid) and %d shrinks"
                % (self.call_count, self.valid_examples, self.shrinks)
            )

    @property
    def database(self) -> Optional[ExampleDatabase]:
        if self.database_key is None:
            return None
        return self.settings.database

    def has_existing_examples(self) -> bool:
        return self.database is not None and Phase.reuse in self.settings.phases

    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        from which the last failure came, though we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we
            # downsample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=sort_key
            )
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))
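            # e.g. max_examples=100 with the generate phase enabled gives
            # desired_size == max(2, ceil(0.1 * 100)) == 10.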

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=sort_key)
                corpus.extend(extra)

            for existing in corpus:
                data = self.cached_test_function(existing, extend=BUFFER_SIZE)
                if data.status != Status.INTERESTING:
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=sort_key)

                for existing in pareto_corpus:
                    data = self.cached_test_function(existing, extend=BUFFER_SIZE)
                    if data not in self.pareto_front:
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break

    def exit_with(self, reason: ExitReason) -> None:
        if self.ignore_limits:
            return
        self.statistics["stopped-because"] = reason.describe(self.settings)
        if self.best_observed_targets:
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug(f"exit_with({reason.name})")
        self.exit_reason = reason
        raise RunIsComplete

    def should_generate_more(self) -> bool:
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # Users who disable shrinking probably want to exit as fast as possible.
        # If we've found a bug and won't report more than one, stop looking.
        elif (
            Phase.shrink not in self.settings.phases
            or not self.settings.report_multiple_bugs
        ):
            return False
        assert isinstance(self.first_bug_found_at, int)
        assert isinstance(self.last_bug_found_at, int)
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
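        # e.g. with the first bug at call 50 and the latest at call 60, we keep
        # generating until call min(50 + 1000, 60 * 2) == 120.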
        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )

    def generate_new_examples(self) -> None:
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        zero_data = self.cached_test_function(bytes(BUFFER_SIZE))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            self.__data_cache.pin(zero_data.buffer)

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and len(zero_data.buffer) * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural example for your test is extremely "
                "large. This makes it difficult for Hypothesis to generate "
                "good examples, especially when trying to reduce failing ones "
                "at the end. Consider reducing the size of your data if it is "
                "of a fixed size. You could also fix this by improving how "
                "your data shrinks (see https://hypothesis.readthedocs.io/en/"
                "latest/data.html#shrinking for details), or by introducing "
                "default values inside your strategy. e.g. could you replace "
                "some arguments with their defaults by using "
                "one_of(none(), some_complex_strategy)?",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()

        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        small_example_cap = clamp(10, self.settings.max_examples // 10, 50)
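        # e.g. max_examples=100 gives clamp(10, 10, 50) == 10 small examples,
        # while max_examples=1000 is capped at clamp(10, 100, 50) == 50.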

        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1)
        ran_optimisations = False

        while self.should_generate_more():
            # Unfortunately generate_novel_prefix still operates in terms of
            # a buffer and uses HypothesisProvider as its backing provider,
            # not whatever is specified by the backend. We can improve this
            # once more things are on the ir.
            if self.settings.backend != "hypothesis":
                data = self.new_conjecture_data(prefix=b"", max_length=BUFFER_SIZE)
                self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            assert len(prefix) <= BUFFER_SIZE
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + bytes(BUFFER_SIZE - len(prefix))
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                # Because the Status code is greater than Status.VALID, it cannot be
                # Status.OVERRUN, which guarantees that the minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)

                consecutive_zero_extend_is_invalid = 0

                minimal_extension = len(minimal_example.buffer) - len(prefix)

                max_length = min(len(prefix) + minimal_extension * 10, BUFFER_SIZE)

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that it has discovered.

                trial_data = self.new_conjecture_data(
                    prefix=prefix, max_length=max_length
                )
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.buffer
            else:
                max_length = BUFFER_SIZE

            data = self.new_conjecture_data(prefix=prefix, max_length=max_length)

            self.test_function(data)

            if (
                data.status == Status.OVERRUN
                and max_length < BUFFER_SIZE
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()

    def generate_mutations_from(
        self, data: Union[ConjectureData, ConjectureResult]
    ) -> None:
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
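        # For example (illustrative): if a test draws two integers with the
        # same label and we generated (3, 7), the mutator may try (3, 3) or
        # (7, 7), exercising the "same value in two places" pattern.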
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0

            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):
                groups = data.examples.mutator_groups
                if not groups:
                    break

                group = self.random.choice(groups)

                (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
                if (start1 <= start2 <= end2 <= end1) or (
                    start2 <= start1 <= end1 <= end2
                ):
                    # one example entirely contains the other. give up.
                    # TODO use more intelligent mutation for containment, like
                    # replacing child with parent or vice versa. Would allow for
                    # recursive / subtree mutation
                    failed_mutations += 1
                    continue

                if start1 > start2:
                    (start1, end1), (start2, end2) = (start2, end2), (start1, end1)
                assert end1 <= start2

                nodes = data.examples.ir_tree_nodes
                (start, end) = self.random.choice([(start1, end1), (start2, end2)])
                replacement = nodes[start:end]

                try:
                    # We attempt to replace both the examples with
                    # whichever choice we made. Note that this might end
                    # up messing up and getting the example boundaries
                    # wrong - labels matching are only a best guess as to
                    # whether the two are equivalent - but it doesn't
                    # really matter. It may not achieve the desired result,
                    # but it's still a perfectly acceptable choice sequence
                    # to try.
                    new_data = self.cached_test_function_ir(
                        nodes[:start1]
                        + replacement
                        + nodes[end1:start2]
                        + replacement
                        + nodes[end2:],
                        # We set error_on_discard so that we don't end up
                        # entering parts of the tree we consider redundant
                        # and not worth exploring.
                        error_on_discard=True,
                    )
                except ContainsDiscard:
                    failed_mutations += 1
                    continue

                if new_data is Overrun:
                    failed_mutations += 1  # pragma: no cover # annoying case
                else:
                    assert isinstance(new_data, ConjectureResult)
                    if (
                        new_data.status >= data.status
                        and data.buffer != new_data.buffer
                        and all(
                            k in new_data.target_observations
                            and new_data.target_observations[k] >= v
                            for k, v in data.target_observations.items()
                        )
                    ):
                        data = new_data
                        failed_mutations = 0
                    else:
                        failed_mutations += 1

    def optimise_targets(self) -> None:
        """If any target observations have been made, attempt to optimise them
        all."""
        if not self.should_optimise:
            return
        from hypothesis.internal.conjecture.optimiser import Optimiser

        # We want to avoid running the optimiser for too long in case we hit
        # an unbounded target score. We start this off fairly conservatively
        # in case interesting examples are easy to find and then ramp it up
        # on an exponential schedule so we don't hamper the optimiser too much
        # if it needs a long time to find good enough improvements.
        max_improvements = 10
        while True:
            prev_calls = self.call_count

            any_improvements = False

            for target, data in list(self.best_examples_of_observed_targets.items()):
                optimiser = Optimiser(
                    self, data, target, max_improvements=max_improvements
                )
                optimiser.run()
                if optimiser.improvements > 0:
                    any_improvements = True

            if self.interesting_examples:
                break

            max_improvements *= 2

            if any_improvements:
                continue

            if self.best_observed_targets:
                self.pareto_optimise()

            if prev_calls == self.call_count:
                break

    def pareto_optimise(self) -> None:
        if self.pareto_front is not None:
            ParetoOptimiser(self).run()

    def _run(self) -> None:
        # have to use the primitive provider to interpret database bits...
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("reuse"):
            self.reuse_existing_examples()
        # ...but we should use the supplied provider when generating...
        self._switch_to_hypothesis_provider = False
        with self._log_phase_statistics("generate"):
            self.generate_new_examples()
        # We normally run the targeting phase mixed in with the generate phase,
        # but if we've been asked to run it but not generation then we have to
        # run it explicitly on its own here.
        if Phase.generate not in self.settings.phases:
            self._current_phase = "target"
            self.optimise_targets()
        # ...and back to the primitive provider when shrinking.
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("shrink"):
            self.shrink_interesting_examples()
        self.exit_with(ExitReason.finished)

    def new_conjecture_data_ir(
        self,
        ir_tree_prefix: List[IRNode],
        *,
        observer: Optional[DataObserver] = None,
        max_length: Optional[int] = None,
    ) -> ConjectureData:
        provider = (
            HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
        )
        observer = observer or self.tree.new_observer()
        if self.settings.backend != "hypothesis":
            observer = DataObserver()

        return ConjectureData.for_ir_tree(
            ir_tree_prefix, observer=observer, provider=provider, max_length=max_length
        )

    def new_conjecture_data(
        self,
        prefix: Union[bytes, bytearray],
        max_length: int = BUFFER_SIZE,
        observer: Optional[DataObserver] = None,
    ) -> ConjectureData:
        provider = (
            HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
        )
        observer = observer or self.tree.new_observer()
        if self.settings.backend != "hypothesis":
            observer = DataObserver()

        return ConjectureData(
            prefix=prefix,
            max_length=max_length,
            random=self.random,
            observer=observer,
            provider=provider,
        )

    def new_conjecture_data_for_buffer(
        self, buffer: Union[bytes, bytearray]
    ) -> ConjectureData:
        return self.new_conjecture_data(buffer, max_length=len(buffer))

    def shrink_interesting_examples(self) -> None:
        """If we've found interesting examples, try to replace each of them
        with a minimal interesting example with the same interesting_origin.

        We may find one or more examples with a new interesting_origin
        during the shrink process. If so we shrink these too.
        """
        if Phase.shrink not in self.settings.phases or not self.interesting_examples:
            return

        self.debug("Shrinking interesting examples")
        self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS

        for prev_data in sorted(
            self.interesting_examples.values(), key=lambda d: sort_key(d.buffer)
        ):
            assert prev_data.status == Status.INTERESTING
            data = self.new_conjecture_data_ir(prev_data.examples.ir_tree_nodes)
            self.test_function(data)
            if data.status != Status.INTERESTING:
                self.exit_with(ExitReason.flaky)

        self.clear_secondary_key()

        while len(self.shrunk_examples) < len(self.interesting_examples):
            target, example = min(
                (
                    (k, v)
                    for k, v in self.interesting_examples.items()
                    if k not in self.shrunk_examples
                ),
                key=lambda kv: (sort_key(kv[1].buffer), sort_key(repr(kv[0]))),
            )
            self.debug(f"Shrinking {target!r}")

            if not self.settings.report_multiple_bugs:
                # If multi-bug reporting is disabled, we shrink our currently-minimal
                # failure, allowing 'slips' to any bug with a smaller minimal example.
                self.shrink(example, lambda d: d.status == Status.INTERESTING)
                return

            def predicate(d: ConjectureData) -> bool:
                if d.status < Status.INTERESTING:
                    return False
                return d.interesting_origin == target

            self.shrink(example, predicate)

            self.shrunk_examples.add(target)

    def clear_secondary_key(self) -> None:
        if self.has_existing_examples():
            # If we have any smaller examples in the secondary corpus, now is
            # a good time to try them to see if they work as shrinks. They
            # probably won't, but it's worth a shot and gives us a good
            # opportunity to clear out the database.

            # It's not worth trying the primary corpus because we already
            # tried all of those in the initial phase.
            corpus = sorted(
                self.settings.database.fetch(self.secondary_key), key=sort_key
            )
            for c in corpus:
                primary = {v.buffer for v in self.interesting_examples.values()}

                cap = max(map(sort_key, primary))

                if sort_key(c) > cap:
                    break
                else:
                    self.cached_test_function(c)
                    # We unconditionally remove c from the secondary key as it
                    # is either now primary or worse than our primary example
                    # of this reason for interestingness.
                    self.settings.database.delete(self.secondary_key, c)

    def shrink(
        self,
        example: Union[ConjectureData, ConjectureResult],
        predicate: Optional[Callable[[ConjectureData], bool]] = None,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ] = None,
    ) -> Union[ConjectureData, ConjectureResult]:
        s = self.new_shrinker(example, predicate, allow_transition)
        s.shrink()
        return s.shrink_target

    def new_shrinker(
        self,
        example: Union[ConjectureData, ConjectureResult],
        predicate: Optional[Callable[[ConjectureData], bool]] = None,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ] = None,
    ) -> Shrinker:
        return Shrinker(
            self,
            example,
            predicate,
            allow_transition=allow_transition,
            explain=Phase.explain in self.settings.phases,
            in_target_phase=self._current_phase == "target",
        )

    def cached_test_function(
        self,
        buffer: Union[bytes, bytearray],
        *,
        extend: int = 0,
    ) -> Union[ConjectureResult, _Overrun]:
        """Checks the tree to see if we've tested this buffer, and returns the
        previous result if we have.

        Otherwise we call through to ``test_function``, and return a
        fresh result.

        If ``extend`` is nonzero, the test function may draw up to ``extend``
        additional bytes past the end of ``buffer`` (capped at ``BUFFER_SIZE``
        in total) rather than being treated as an overrun.
        """
        buffer = bytes(buffer)[:BUFFER_SIZE]

        max_length = min(BUFFER_SIZE, len(buffer) + extend)

        @overload
        def check_result(result: _Overrun) -> _Overrun: ...
        @overload
        def check_result(result: ConjectureResult) -> ConjectureResult: ...
        def check_result(
            result: Union[_Overrun, ConjectureResult],
        ) -> Union[_Overrun, ConjectureResult]:
            assert result is Overrun or (
                isinstance(result, ConjectureResult) and result.status != Status.OVERRUN
            )
            return result

        try:
            cached = check_result(self.__data_cache[buffer])
            if cached.status > Status.OVERRUN or extend == 0:
                return cached
        except KeyError:
            pass

        observer = DataObserver()
        dummy_data = self.new_conjecture_data(
            prefix=buffer, max_length=max_length, observer=observer
        )

        if self.settings.backend == "hypothesis":
            try:
                self.tree.simulate_test_function(dummy_data)
            except PreviouslyUnseenBehaviour:
                pass
            else:
                if dummy_data.status > Status.OVERRUN:
                    dummy_data.freeze()
                    try:
                        return self.__data_cache[dummy_data.buffer]
                    except KeyError:
                        pass
                else:
                    self.__data_cache[buffer] = Overrun
                    return Overrun

        # We didn't find a match in the tree, so we need to run the test
        # function normally. Note that test_function will automatically
        # add this to the tree so we don't need to update the cache.

        data = self.new_conjecture_data(
            prefix=max((buffer, dummy_data.buffer), key=len), max_length=max_length
        )
        self.test_function(data)
        result = check_result(data.as_result())
        if extend == 0 or (
            result is not Overrun
            and not isinstance(result, _Overrun)
            and len(result.buffer) <= len(buffer)
        ):
            self.__data_cache[buffer] = result
        return result

    def passing_buffers(self, prefix: bytes = b"") -> FrozenSet[bytes]:
        """Return a collection of bytestrings which cause the test to pass.

        Optionally restrict this by a certain prefix, which is useful for explain mode.
        """
        return frozenset(
            buf
            for buf in self.__data_cache
            if buf.startswith(prefix) and self.__data_cache[buf].status == Status.VALID
        )


class ContainsDiscard(Exception):
    pass