# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import math
import time
from collections import defaultdict
from collections.abc import Hashable, Iterable, Iterator, Sequence
from enum import IntEnum
from functools import cached_property
from random import Random
from typing import TYPE_CHECKING, Any, NoReturn, Optional, TypeVar, Union

import attr

from hypothesis.errors import (
    CannotProceedScopeT,
    ChoiceTooLarge,
    Frozen,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUCache
from hypothesis.internal.compat import add_note
from hypothesis.internal.conjecture.choice import (
    BooleanConstraints,
    BytesConstraints,
    ChoiceConstraintsT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    ChoiceTypeT,
    FloatConstraints,
    IntegerConstraints,
    StringConstraints,
    choice_constraints_key,
    choice_from_index,
    choice_permitted,
    choices_size,
)
from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time
from hypothesis.internal.conjecture.providers import (
    COLLECTION_DEFAULT_MAX_SIZE,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.utils import calc_label_from_name
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.floats import (
    SMALLEST_SUBNORMAL,
    float_to_int,
    int_to_float,
    sign_aware_lte,
)
from hypothesis.internal.intervalsets import IntervalSet
from hypothesis.reporting import debug_report

if TYPE_CHECKING:
    from typing import TypeAlias

    from hypothesis.strategies import SearchStrategy
    from hypothesis.strategies._internal.strategies import Ex


def __getattr__(name: str) -> Any:
    if name == "AVAILABLE_PROVIDERS":
        from hypothesis._settings import note_deprecation
        from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS

        note_deprecation(
            "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to "
            "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.",
            since="2025-01-25",
            has_codemod=False,
            stacklevel=1,
        )
        return AVAILABLE_PROVIDERS

    raise AttributeError(
        f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}"
    )


T = TypeVar("T")
TargetObservations = dict[str, Union[int, float]]
# index, choice_type, constraints, forced value
MisalignedAt: "TypeAlias" = tuple[
    int, ChoiceTypeT, ChoiceConstraintsT, Optional[ChoiceT]
]
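# For example (illustrative, not a value constructed in this file): a
# MisalignedAt of (3, "integer", {"min_value": 0, "max_value": 10, ...}, None)
# would mean the choice at index 3 was expected to be an integer draw with
# those constraints and no forced value.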

TOP_LABEL = calc_label_from_name("top")


class ExtraInformation:
    """A class for holding shared state on a ``ConjectureData`` that should
    be added to the final ``ConjectureResult``."""

    def __repr__(self) -> str:
        return "ExtraInformation({})".format(
            ", ".join(f"{k}={v!r}" for k, v in self.__dict__.items()),
        )

    def has_information(self) -> bool:
        return bool(self.__dict__)


class Status(IntEnum):
    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return f"Status.{self.name}"


@attr.s(slots=True, frozen=True)
class StructuralCoverageTag:
    label: int = attr.ib()


STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}


def structural_coverage(label: int) -> StructuralCoverageTag:
    try:
        return STRUCTURAL_COVERAGE_CACHE[label]
    except KeyError:
        return STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))


# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)


class Span:
    """A span tracks the hierarchical structure of choices within a single test run.

    Spans are created to mark regions of the choice sequence that are
    logically related to each other. For instance, Hypothesis tracks:
    - A single top-level span for the entire choice sequence
    - A span for the choices made by each strategy
    - Some strategies define additional spans within their choices. For instance,
      st.lists() tracks the "should add another element" choice and the "add
      another element" choices as separate spans.

    Spans provide useful information to the shrinker, mutator, targeted PBT,
    and other subsystems of Hypothesis.

    Rather than store each ``Span`` as a rich object, it is actually
    just an index into the ``Spans`` class defined below. This has two
    purposes: Firstly, for most properties of spans we will never need
    to allocate storage at all, because most properties are not used on
    most spans. Secondly, by storing the spans as compact lists
    of integers, we save a considerable amount of space compared to
    Python's normal object size.

    This does have the downside that it increases the amount of allocation
    we do, and slows things down as a result, in some usage patterns because
    we repeatedly allocate the same Span or int objects, but it will
    often dramatically reduce our memory usage, so is worth it.
    """
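
    # Illustrative sketch (the labels and counts here are hypothetical; only
    # the nesting structure is the point): for a draw from
    # st.lists(st.booleans()) that produced [True, False], the span tree might
    # nest roughly as
    #
    #   spans[0]  TOP_LABEL                           choices [0, 5)
    #     spans[1]  lists() label                     choices [0, 5)
    #       spans[2]  "should add another element"    choices [0, 1)
    #       spans[3]  element draw                    choices [1, 2)
    #       ...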

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if not isinstance(other, Span):
            return NotImplemented
        return (self.owner is other.owner) and (self.index == other.index)

    def __ne__(self, other: object) -> bool:
        if self is other:
            return False
        if not isinstance(other, Span):
            return NotImplemented
        return (self.owner is not other.owner) or (self.index != other.index)

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """A label is an opaque value that associates each span with its
        approximate origin, such as a particular strategy class or a particular
        kind of draw."""
        return self.owner.labels[self.owner.label_indices[self.index]]

    @property
    def parent(self) -> Optional[int]:
        """The index of the span that this one is nested directly within."""
        if self.index == 0:
            return None
        return self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """
        Depth of this span in the span tree. The top-level span has a depth of 0.
        """
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """True if this span's ``stop_span`` call had ``discard`` set to
        ``True``. This means we believe that the shrinker should be able to delete
        this span completely, without affecting the value produced by its enclosing
        strategy. Typically set when a rejection sampler decides to reject a
        generated value and try again."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """The number of choices in this span."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """The list of all spans with this as a parent, in increasing index
        order."""
        return [self.owner[i] for i in self.owner.children[self.index]]


class SpanProperty:
    """There are many properties of spans that we calculate by
    essentially rerunning the test case multiple times based on the
    calls which we record in ``SpanRecord``.

    This class defines a visitor, subclasses of which can be used
    to calculate these properties.
    """
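
    # For a minimal concrete example of this visitor pattern, see ``_depths``
    # below, which records ``len(self.span_stack)`` for each span as it is
    # started.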

    def __init__(self, spans: "Spans"):
        self.span_stack: list[int] = []
        self.spans = spans
        self.span_count = 0
        self.choice_count = 0

    def run(self) -> Any:
        """Rerun the test case with this visitor and return the
        results of ``self.finish()``."""
        for record in self.spans.trail:
            if record == TrailType.CHOICE:
                self.choice_count += 1
            elif record >= TrailType.START_SPAN:
                self.__push(record - TrailType.START_SPAN)
            else:
                assert record in (
                    TrailType.STOP_SPAN_DISCARD,
                    TrailType.STOP_SPAN_NO_DISCARD,
                )
                self.__pop(discarded=record == TrailType.STOP_SPAN_DISCARD)
        return self.finish()

    def __push(self, label_index: int) -> None:
        i = self.span_count
        assert i < len(self.spans)
        self.start_span(i, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(i)

    def __pop(self, *, discarded: bool) -> None:
        i = self.span_stack.pop()
        self.stop_span(i, discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Called at the start of each span, with ``i`` the
        index of the span and ``label_index`` the index of
        its label in ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Called at the end of each span, with ``i`` the
        index of the span and ``discarded`` being ``True`` if ``stop_span``
        was called with ``discard=True``."""

    def finish(self) -> Any:
        raise NotImplementedError


class TrailType(IntEnum):
    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    START_SPAN = 3
    CHOICE = calc_label_from_name("ir draw record")
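
    # How the trail is decoded (see ``SpanProperty.run``): STOP_SPAN_DISCARD
    # and STOP_SPAN_NO_DISCARD close the innermost open span, CHOICE records a
    # single choice, and any other value ``v >= START_SPAN`` opens a span whose
    # label index is ``v - START_SPAN``. CHOICE is a large label-derived
    # constant, so in practice it does not collide with span-start values.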


class SpanRecord:
    """Records the series of ``start_span``, ``stop_span``, and
    ``record_choice`` calls so that these may be stored in ``Spans`` and
    replayed when we need to know about the structure of individual
    ``Span`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: list[int] = []
        self.__index_of_labels: Optional[dict[int, int]] = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        assert self.__index_of_labels is not None
        try:
            i = self.__index_of_labels[label]
        except KeyError:
            i = self.__index_of_labels.setdefault(label, len(self.labels))
            self.labels.append(label)
        self.trail.append(TrailType.START_SPAN + i)

    def stop_span(self, *, discard: bool) -> None:
        if discard:
            self.trail.append(TrailType.STOP_SPAN_DISCARD)
        else:
            self.trail.append(TrailType.STOP_SPAN_NO_DISCARD)


class _starts_and_ends(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.starts = IntList.of_length(len(self.spans))
        self.ends = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return (self.starts, self.ends)


class _discarded(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def finish(self) -> frozenset[int]:
        return frozenset(self.result)

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.result.add(i)


class _parentage(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if i > 0:
            self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result


class _depths(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result


class _label_indices(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result


class _mutator_groups(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        key = (self.spans[i].start, self.spans[i].end)
        self.groups[label_index].add(key)

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # Discard groups with only one span, since the mutator can't
        # do anything useful with them.
        return [g for g in self.groups.values() if len(g) >= 2]


class Spans:
    """A lazy collection of ``Span`` objects, derived from
    the behaviour recorded in ``SpanRecord``.

    Behaves logically as if it were a list of ``Span`` objects,
    but actually mostly exists as a compact store of information
    for them to reference into. All properties on here are best
    understood as the backing storage for ``Span`` and are
    described there.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        self.__length = self.trail.count(
            TrailType.STOP_SPAN_DISCARD
        ) + record.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        self.__children: Optional[list[Sequence[int]]] = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for i, p in enumerate(self.parentage):
                if i > 0:
                    children[p].append(i)
            # Replace empty children lists with a tuple to reduce
            # memory usage.
            for i, c in enumerate(children):
                if not c:
                    children[i] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        n = self.__length
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        if i < 0:
            i += n
        return Span(self, i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        for i in range(len(self)):
            yield self[i]


class _Overrun:
    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


Overrun = _Overrun()

global_test_counter = 0


MAX_DEPTH = 100


class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache."""

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        pass


@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    status: Status = attr.ib()
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    nodes: tuple[ChoiceNode, ...] = attr.ib(eq=False, repr=False)
    length: int = attr.ib()
    output: str = attr.ib()
    extra_information: Optional[ExtraInformation] = attr.ib()
    expected_exception: Optional[BaseException] = attr.ib()
    expected_traceback: Optional[str] = attr.ib()
    has_discards: bool = attr.ib()
    target_observations: TargetObservations = attr.ib()
    tags: frozenset[StructuralCoverageTag] = attr.ib()
    spans: Spans = attr.ib(repr=False, eq=False)
    arg_slices: set[tuple[int, int]] = attr.ib(repr=False)
    slice_comments: dict[tuple[int, int], str] = attr.ib(repr=False)
    misaligned_at: Optional[MisalignedAt] = attr.ib(repr=False)
    cannot_proceed_scope: Optional[CannotProceedScopeT] = attr.ib(repr=False)

    def as_result(self) -> "ConjectureResult":
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        return tuple(node.value for node in self.nodes)


class ConjectureData:
    @classmethod
    def for_choices(
        cls,
        choices: Sequence[Union[ChoiceTemplate, ChoiceT]],
        *,
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        random: Optional[Random] = None,
    ) -> "ConjectureData":
        from hypothesis.internal.conjecture.engine import choice_count

        return cls(
            max_choices=choice_count(choices),
            random=random,
            prefix=choices,
            observer=observer,
            provider=provider,
        )

    def __init__(
        self,
        *,
        random: Optional[Random],
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        prefix: Optional[Sequence[Union[ChoiceTemplate, ChoiceT]]] = None,
        max_choices: Optional[int] = None,
        provider_kw: Optional[dict[str, Any]] = None,
    ) -> None:
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.is_find = False
        self.overdraw = 0
        self._random = random

        self.length = 0
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        global global_test_counter
        self.testcounter = global_test_counter
        global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, Union[str, int, float]] = {}
        self.interesting_origin: Optional[InterestingOrigin] = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: Optional[ConjectureResult] = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # spans for reporting purposes.
        self.__spans: Optional[Spans] = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict = defaultdict(
            lambda: {"satisfied": 0, "unsatisfied": 0}
        )
        self._sampled_from_all_strategies_elements_message: Optional[
            tuple[str, object]
        ] = None
        self._shared_strategy_draws: dict[Hashable, Any] = {}

        self.expected_exception: Optional[BaseException] = None
        self.expected_traceback: Optional[str] = None
        self.extra_information = ExtraInformation()

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        self.misaligned_at: Optional[MisalignedAt] = None
        self.cannot_proceed_scope: Optional[CannotProceedScopeT] = None
        self.start_span(TOP_LABEL)

    def __repr__(self) -> str:
        return "ConjectureData(%s, %d choices%s)" % (
            self.status.name,
            len(self.nodes),
            ", frozen" if self.frozen else "",
        )

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        return tuple(node.value for node in self.nodes)

    # draw_* functions might be called in one of two contexts: either "above" or
    # "below" the choice sequence. For instance, draw_string calls draw_boolean
    # from ``many`` when calculating the number of characters to return. We do
    # not want these choices to get written to the choice sequence, because they
    # are not true choices themselves.
    #
    # `observe` formalizes this. The choice will only be written to the choice
    # sequence if observe is True.
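    #
    # For example (illustrative, not a call made in this file): an internal
    # helper that repeatedly asks ``self.draw_boolean(0.9, observe=False)`` to
    # decide whether to keep adding elements would not have those booleans
    # recorded as nodes in the choice sequence.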
    def _draw(self, choice_type, constraints, *, observe, forced):
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self._random when
        # drawing past the end of a ConjectureData.for_choices prefix.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float" and math.isnan(value):
            value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value

    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        weights: Optional[dict[int, float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        observe: bool = True,
    ) -> int:
        # Validate arguments
        if weights is not None:
            assert min_value is not None
            assert max_value is not None
            assert len(weights) <= 255  # arbitrary practical limit
            # We can and should eventually support total weights. But this
            # complicates shrinking as we can no longer assume we can force
            # a value to the unmapped probability mass if that mass might be 0.
            assert sum(weights.values()) < 1
            # similarly, things get simpler if we assume every value is possible.
            # we'll want to drop this restriction eventually.
            assert all(w != 0 for w in weights.values())

        if forced is not None and min_value is not None:
            assert min_value <= forced
        if forced is not None and max_value is not None:
            assert forced <= max_value

        constraints: IntegerConstraints = self._pooled_constraints(
            "integer",
            {
                "min_value": min_value,
                "max_value": max_value,
                "weights": weights,
                "shrink_towards": shrink_towards,
            },
        )
        return self._draw("integer", constraints, observe=observe, forced=forced)

    def draw_float(
        self,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        *,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
        # TODO: consider supporting these float widths at the choice sequence
        # level in the future.
        # width: Literal[16, 32, 64] = 64,
        forced: Optional[float] = None,
        observe: bool = True,
    ) -> float:
        assert smallest_nonzero_magnitude > 0
        assert not math.isnan(min_value)
        assert not math.isnan(max_value)

        if smallest_nonzero_magnitude == 0.0:  # pragma: no cover
            raise FloatingPointError(
                "Got allow_subnormal=True, but we can't represent subnormal floats "
                "right now, in violation of the IEEE-754 floating-point "
                "specification. This is usually because something was compiled with "
                "-ffast-math or a similar option, which sets global processor state. "
                "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
                "writeup - and good luck!"
            )

        if forced is not None:
            assert allow_nan or not math.isnan(forced)
            assert math.isnan(forced) or (
                sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
            )

        constraints: FloatConstraints = self._pooled_constraints(
            "float",
            {
                "min_value": min_value,
                "max_value": max_value,
                "allow_nan": allow_nan,
                "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
            },
        )
        return self._draw("float", constraints, observe=observe, forced=forced)

    def draw_string(
        self,
        intervals: IntervalSet,
        *,
        min_size: int = 0,
        max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
        forced: Optional[str] = None,
        observe: bool = True,
    ) -> str:
        assert forced is None or min_size <= len(forced) <= max_size
        assert min_size >= 0
        if len(intervals) == 0:
            assert min_size == 0

        constraints: StringConstraints = self._pooled_constraints(
            "string",
            {
                "intervals": intervals,
                "min_size": min_size,
                "max_size": max_size,
            },
        )
        return self._draw("string", constraints, observe=observe, forced=forced)

    def draw_bytes(
        self,
        min_size: int = 0,
        max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
        *,
        forced: Optional[bytes] = None,
        observe: bool = True,
    ) -> bytes:
        assert forced is None or min_size <= len(forced) <= max_size
        assert min_size >= 0

        constraints: BytesConstraints = self._pooled_constraints(
            "bytes", {"min_size": min_size, "max_size": max_size}
        )
        return self._draw("bytes", constraints, observe=observe, forced=forced)

    def draw_boolean(
        self,
        p: float = 0.5,
        *,
        forced: Optional[bool] = None,
        observe: bool = True,
    ) -> bool:
        assert (forced is not True) or p > 0
        assert (forced is not False) or p < 1

        constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p})
        return self._draw("boolean", constraints, observe=observe, forced=forced)

    def _pooled_constraints(self, choice_type, constraints):
        """Memoize common dictionary objects to reduce memory pressure."""
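        # For example (illustrative): every unbounded ``st.integers()`` draw
        # builds the same {"min_value": None, "max_value": None, "weights": None,
        # "shrink_towards": 0} dict, so pooling lets all such draws share one
        # cached constraints object instead of allocating a fresh dict per draw.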
        # caching runs afoul of nondeterminism checks
        if self.provider.avoid_realization:
            return constraints

        key = (choice_type, *choice_constraints_key(choice_type, constraints))
        try:
            return POOLED_CONSTRAINTS_CACHE[key]
        except KeyError:
            POOLED_CONSTRAINTS_CACHE[key] = constraints
            return constraints

    def _pop_choice(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        assert self.prefix is not None
        # checked in _draw
        assert self.index < len(self.prefix)

        value = self.prefix[self.index]
        if isinstance(value, ChoiceTemplate):
            node: ChoiceTemplate = value
            if node.count is not None:
                assert node.count >= 0
            # node templates have to be at the end for now, since it's not immediately
            # apparent how to handle overrunning a node template while generating a single
            # node if the alternative is not "the entire data is an overrun".
            assert self.index == len(self.prefix) - 1
            if node.type == "simplest":
                if forced is not None:
                    choice = forced
                else:
                    try:
                        choice = choice_from_index(0, choice_type, constraints)
                    except ChoiceTooLarge:
                        self.mark_overrun()
            else:
                raise NotImplementedError

            if node.count is not None:
                node.count -= 1
                if node.count < 0:
                    self.mark_overrun()
            return choice

        choice = value
        node_choice_type = {
            str: "string",
            float: "float",
            int: "integer",
            bool: "boolean",
            bytes: "bytes",
        }[type(choice)]
        # If we're trying to:
        #  * draw a different choice type at the same location
        #  * draw the same choice type with different constraints which do not permit
        #    the current value
        #
        # then we call this a misalignment, because the choice sequence has
        # changed from what we expected at some point. An easy misalignment is
        #
        #   one_of(integers(0, 100), integers(101, 200))
        #
        # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
        # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
        # index 1 (which does not permit any of the values 0-100).
        #
        # When the choice sequence becomes misaligned, we generate a new value of the
        # type and constraints the strategy expects.
        if node_choice_type != choice_type or not choice_permitted(choice, constraints):
            # only track first misalignment for now.
            if self.misaligned_at is None:
                self.misaligned_at = (self.index, choice_type, constraints, forced)
            try:
                # Fill in any misalignments with index 0 choices. An alternative to
                # this is using the index of the misaligned choice instead
                # of index 0, which may be useful for maintaining
                # "similarly-complex choices" in the shrinker. This requires
                # attaching an index to every choice in ConjectureData.for_choices,
                # which we don't always have (e.g. when reading from db).
                #
                # If we really wanted this in the future we could make this complexity
                # optional, use it if present, and default to index 0 otherwise.
                # This complicates our internal api and so I'd like to avoid it
                # if possible.
                #
                # Additionally, I don't think slips which require
                # slipping to high-complexity values are common. Though arguably
                # we may want to expand a bit beyond *just* the simplest choice.
                # (we could for example consider sampling choices from index 0-10).
                choice = choice_from_index(0, choice_type, constraints)
            except ChoiceTooLarge:
                # should really never happen with a 0-index choice, but let's be safe.
                self.mark_overrun()

        self.index += 1
        return choice

    def as_result(self) -> Union[ConjectureResult, _Overrun]:
        """Convert the result of running this test into
        either an Overrun object or a ConjectureResult."""

        assert self.frozen
        if self.status == Status.OVERRUN:
            return Overrun
        if self.__result is None:
            self.__result = ConjectureResult(
                status=self.status,
                interesting_origin=self.interesting_origin,
                spans=self.spans,
                nodes=self.nodes,
                length=self.length,
                output=self.output,
                expected_traceback=self.expected_traceback,
                expected_exception=self.expected_exception,
                extra_information=(
                    self.extra_information
                    if self.extra_information.has_information()
                    else None
                ),
                has_discards=self.has_discards,
                target_observations=self.target_observations,
                tags=frozenset(self.tags),
                arg_slices=self.arg_slices,
                slice_comments=self.slice_comments,
                misaligned_at=self.misaligned_at,
                cannot_proceed_scope=self.cannot_proceed_scope,
            )
        assert self.__result is not None
        return self.__result

    def __assert_not_frozen(self, name: str) -> None:
        if self.frozen:
            raise Frozen(f"Cannot call {name} on frozen ConjectureData")

    def note(self, value: Any) -> None:
        self.__assert_not_frozen("note")
        if not isinstance(value, str):
            value = repr(value)
        self.output += value

    def draw(
        self,
        strategy: "SearchStrategy[Ex]",
        label: Optional[int] = None,
        observe_as: Optional[str] = None,
    ) -> "Ex":
        from hypothesis.internal.observability import TESTCASE_CALLBACKS
        from hypothesis.strategies._internal.lazy import unwrap_strategies
        from hypothesis.strategies._internal.utils import to_jsonable

        if self.is_find and not strategy.supports_find:
            raise InvalidArgument(
                f"Cannot use strategy {strategy!r} within a call to find "
                "(presumably because it would be invalid after the call had ended)."
            )

        at_top_level = self.depth == 0
        start_time = None
        if at_top_level:
            # We start this timer early, because accessing attributes on a LazyStrategy
            # can be almost arbitrarily slow. In cases like characters() and text()
            # where we cache something expensive, this led to Flaky deadline errors!
            # See https://github.com/HypothesisWorks/hypothesis/issues/2108
            start_time = time.perf_counter()
            gc_start_time = gc_cumulative_time()

        strategy.validate()

        if strategy.is_empty:
            self.mark_invalid(f"empty strategy {self!r}")

        if self.depth >= MAX_DEPTH:
            self.mark_invalid("max depth exceeded")

        # Jump directly to the unwrapped strategy for the label and for do_draw.
        # This avoids adding an extra span to all lazy strategies.
        unwrapped = unwrap_strategies(strategy)
        if label is None:
            label = unwrapped.label
        assert isinstance(label, int)

        self.start_span(label=label)
        try:
            if not at_top_level:
                return unwrapped.do_draw(self)
            assert start_time is not None
            key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
            try:
                try:
                    v = unwrapped.do_draw(self)
                finally:
                    # Subtract the time spent in GC to avoid overcounting, as it is
                    # accounted for at the overall example level.
                    in_gctime = gc_cumulative_time() - gc_start_time
                    self.draw_times[key] = time.perf_counter() - start_time - in_gctime
            except Exception as err:
                add_note(
                    err,
                    f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
                )
                raise
            if TESTCASE_CALLBACKS:
                avoid = self.provider.avoid_realization
                self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
            return v
        finally:
            self.stop_span()

    def start_span(self, label: int) -> None:
        self.provider.span_start(label)
        self.__assert_not_frozen("start_span")
        self.depth += 1
        # Logically it would make sense for this to just be
        # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
        # be until we ran the code under tracemalloc and found a rather significant
        # chunk of allocation was happening here. This was presumably due to varargs
        # or the like, but we didn't investigate further given that it was easy
        # to fix with this check.
        if self.depth > self.max_depth:
            self.max_depth = self.depth
        self.__span_record.start_span(label)
        self.labels_for_structure_stack.append({label})

    def stop_span(self, *, discard: bool = False) -> None:
        self.provider.span_end(discard)
        if self.frozen:
            return
        if discard:
            self.has_discards = True
        self.depth -= 1
        assert self.depth >= -1
        self.__span_record.stop_span(discard=discard)

        labels_for_structure = self.labels_for_structure_stack.pop()

        if not discard:
            if self.labels_for_structure_stack:
                self.labels_for_structure_stack[-1].update(labels_for_structure)
            else:
                self.tags.update([structural_coverage(l) for l in labels_for_structure])

        if discard:
            # Once we've discarded a span, every test case starting with
            # this prefix contains discards. We prune the tree at that point so
            # as to avoid future test cases bothering with this region, on the
            # assumption that some span that you could have used instead
            # there would *not* trigger the discard. This greatly speeds up
            # test case generation in some cases, because it allows us to
            # ignore large swathes of the search space that are effectively
            # redundant.
            #
            # A scenario that can cause us problems but which we deliberately
            # have decided not to support is that if there are side effects
            # during data generation then you may end up with a scenario where
            # every good test case generates a discard because the discarded
            # section sets up important things for later. This is not terribly
            # likely and all that you see in this case is some degradation in
            # quality of testing, so we don't worry about it.
            #
            # Note that killing the branch does *not* mean we will never
            # explore below this point, and in particular we may do so during
            # shrinking. Any explicit request for a data object that starts
            # with the branch here will work just fine, but novel prefix
            # generation will avoid it, and we can use it to detect when we
            # have explored the entire tree (up to redundancy).

            self.observer.kill_branch()

    @property
    def spans(self) -> Spans:
        assert self.frozen
        if self.__spans is None:
            self.__spans = Spans(record=self.__span_record)
        return self.__spans

    def freeze(self) -> None:
        if self.frozen:
            return
        self.finish_time = time.perf_counter()
        self.gc_finish_time = gc_cumulative_time()

        # Always finish by closing all remaining spans so that we have a valid tree.
        while self.depth >= 0:
            self.stop_span()

        self.__span_record.freeze()
        self.frozen = True
        self.observer.conclude_test(self.status, self.interesting_origin)

    def choice(
        self,
        values: Sequence[T],
        *,
        forced: Optional[T] = None,
        observe: bool = True,
    ) -> T:
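        # For example (illustrative): ``data.choice(["a", "b", "c"])`` draws an
        # integer in [0, 2] and returns the corresponding element, so shrinking
        # the underlying integer moves the result towards earlier elements.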
        forced_i = None if forced is None else values.index(forced)
        i = self.draw_integer(
            0,
            len(values) - 1,
            forced=forced_i,
            observe=observe,
        )
        return values[i]

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin] = None,
    ) -> NoReturn:
        assert (interesting_origin is None) or (status == Status.INTERESTING)
        self.__assert_not_frozen("conclude_test")
        self.interesting_origin = interesting_origin
        self.status = status
        self.freeze()
        raise StopTest(self.testcounter)

    def mark_interesting(
        self, interesting_origin: Optional[InterestingOrigin] = None
    ) -> NoReturn:
        self.conclude_test(Status.INTERESTING, interesting_origin)

    def mark_invalid(self, why: Optional[str] = None) -> NoReturn:
        if why is not None:
            self.events["invalid because"] = why
        self.conclude_test(Status.INVALID)

    def mark_overrun(self) -> NoReturn:
        self.conclude_test(Status.OVERRUN)


def draw_choice(choice_type, constraints, *, random):
    cd = ConjectureData(random=random)
    return getattr(cd.provider, f"draw_{choice_type}")(**constraints)