1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import math
12import time
13from collections import defaultdict
14from collections.abc import Hashable, Iterable, Iterator, Sequence
15from enum import IntEnum
16from functools import cached_property
17from random import Random
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 Literal,
22 NoReturn,
23 Optional,
24 TypeVar,
25 Union,
26 cast,
27 overload,
28)
29
30import attr
31
32from hypothesis.errors import (
33 CannotProceedScopeT,
34 ChoiceTooLarge,
35 Frozen,
36 InvalidArgument,
37 StopTest,
38)
39from hypothesis.internal.cache import LRUCache
40from hypothesis.internal.compat import add_note
41from hypothesis.internal.conjecture.choice import (
42 BooleanConstraints,
43 BytesConstraints,
44 ChoiceConstraintsT,
45 ChoiceNode,
46 ChoiceT,
47 ChoiceTemplate,
48 ChoiceTypeT,
49 FloatConstraints,
50 IntegerConstraints,
51 StringConstraints,
52 choice_constraints_key,
53 choice_from_index,
54 choice_permitted,
55 choices_size,
56)
57from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time
58from hypothesis.internal.conjecture.providers import (
59 COLLECTION_DEFAULT_MAX_SIZE,
60 HypothesisProvider,
61 PrimitiveProvider,
62)
63from hypothesis.internal.conjecture.utils import calc_label_from_name
64from hypothesis.internal.escalation import InterestingOrigin
65from hypothesis.internal.floats import (
66 SMALLEST_SUBNORMAL,
67 float_to_int,
68 int_to_float,
69 sign_aware_lte,
70)
71from hypothesis.internal.intervalsets import IntervalSet
72from hypothesis.internal.observability import PredicateCounts
73from hypothesis.reporting import debug_report
74from hypothesis.utils.conventions import not_set
75
76if TYPE_CHECKING:
77 from typing import TypeAlias
78
79 from hypothesis.strategies import SearchStrategy
80 from hypothesis.strategies._internal.strategies import Ex
81
82
def __getattr__(name: str) -> Any:
    """Module-level attribute hook (PEP 562), used to deprecate the moved
    ``AVAILABLE_PROVIDERS`` attribute while keeping it importable."""
    if name != "AVAILABLE_PROVIDERS":
        raise AttributeError(
            f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}"
        )

    from hypothesis._settings import note_deprecation
    from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS

    note_deprecation(
        "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to "
        "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.",
        since="2025-01-25",
        has_codemod=False,
        stacklevel=1,
    )
    return AVAILABLE_PROVIDERS
100
101
T = TypeVar("T")
# Targeted-PBT observations: maps a target label to the best observed score.
TargetObservations = dict[str, Union[int, float]]
# (index, choice_type, constraints, forced value) describing the point at which
# a replayed choice sequence diverged from what the strategy actually requested.
MisalignedAt: "TypeAlias" = tuple[
    int, ChoiceTypeT, ChoiceConstraintsT, Optional[ChoiceT]
]

# Label of the implicit top-level span opened by ConjectureData.__init__.
TOP_LABEL = calc_label_from_name("top")
110
111
class ExtraInformation:
    """Holds arbitrary shared state attached to a ``ConjectureData`` that
    should be carried over onto the final ``ConjectureResult``."""

    def __repr__(self) -> str:
        fields = ", ".join(f"{name}={value!r}" for name, value in self.__dict__.items())
        return f"ExtraInformation({fields})"

    def has_information(self) -> bool:
        # Anything ever assigned onto an instance lands in __dict__.
        return len(self.__dict__) > 0
123
124
class Status(IntEnum):
    """Outcome classification of a test-case execution, ordered from least
    to most useful."""

    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return "Status." + self.name
133
134
@attr.s(slots=True, frozen=True)
class StructuralCoverageTag:
    """Hashable, immutable tag wrapping a span label; interned via
    ``structural_coverage()`` so equal labels share one instance."""

    label: int = attr.ib()
138
139
# Interning table: one canonical StructuralCoverageTag per label.
STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}


def structural_coverage(label: int) -> StructuralCoverageTag:
    """Return the canonical (interned) coverage tag for ``label``."""
    tag = STRUCTURAL_COVERAGE_CACHE.get(label)
    if tag is None:
        # setdefault keeps a single canonical tag even under racy insertion.
        tag = STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
    return tag
148
149
# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
# Keys are (choice_type, *choice_constraints_key(...)) tuples; the only writer
# is ConjectureData._pooled_constraints below.
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)
153
154
class Span:
    """One node in the hierarchical span structure of a single test run.

    A span marks a contiguous region of the choice sequence whose choices are
    logically related. Hypothesis records, for example:
    - a single top-level span covering the entire choice sequence,
    - a span for the choices made by each strategy,
    - extra spans emitted by individual strategies; st.lists() separates the
      "should we add another element?" choice from the element draws.

    This structure is consumed by the shrinker, the mutator, targeted PBT,
    and other Hypothesis subsystems.

    A ``Span`` is deliberately not a rich record: it is just an index into
    the backing ``Spans`` collection defined below. That buys us two things.
    First, most properties are never inspected on most spans, so we usually
    allocate no per-property storage at all. Second, flat integer lists are
    dramatically more compact than ordinary Python objects.

    The trade-off is extra allocation (and hence some slowdown) in usage
    patterns that repeatedly materialise the same Span/int objects, but the
    memory savings usually make this well worth it.
    """

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if not isinstance(other, Span):
            return NotImplemented
        return self.owner is other.owner and self.index == other.index

    def __ne__(self, other: object) -> bool:
        if self is other:
            return False
        if not isinstance(other, Span):
            return NotImplemented
        return not (self.owner is other.owner and self.index == other.index)

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """An opaque value associating this span with its approximate origin,
        such as a particular strategy class or a particular kind of draw."""
        return self.owner.labels[self.owner.label_indices[self.index]]

    @property
    def parent(self) -> Optional[int]:
        """Index of the span this one is nested directly within, or None for
        the top-level span."""
        return None if self.index == 0 else self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        """Index of the first choice inside this span."""
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        """Index one past the last choice inside this span."""
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """Nesting depth of this span in the span tree; the top-level span
        has depth 0."""
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """True if this span's ``stop_span`` call had ``discard`` set to
        ``True``, meaning we believe the shrinker can delete the span wholesale
        without affecting the value produced by its enclosing strategy.
        Typically set when a rejection sampler rejects a value and retries."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """How many choices fall inside this span."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """All spans whose parent is this span, in increasing index order."""
        return [self.owner[child] for child in self.owner.children[self.index]]
254
255
class SpanProperty:
    """Visitor used to compute per-span properties lazily.

    Many properties of spans are derived by replaying the recorded trail of
    start/stop/choice events — effectively re-running the test case. Subclass
    this visitor and implement ``start_span``/``stop_span``/``finish`` to
    compute such a property.
    """

    def __init__(self, spans: "Spans"):
        self.span_stack: list[int] = []
        self.spans = spans
        self.span_count = 0
        self.choice_count = 0

    def run(self) -> Any:
        """Replay the recorded trail through this visitor and return the
        result of ``self.finish()``."""
        for record in self.spans.trail:
            # CHOICE must be tested before the >= START_SPAN comparison:
            # span starts are encoded open-endedly as START_SPAN + label_index
            # (see SpanRecord.start_span).
            if record == TrailType.CHOICE:
                self.choice_count += 1
            elif record >= TrailType.START_SPAN:
                self.__enter_span(record - TrailType.START_SPAN)
            else:
                assert record in (
                    TrailType.STOP_SPAN_DISCARD,
                    TrailType.STOP_SPAN_NO_DISCARD,
                )
                self.__exit_span(discarded=record == TrailType.STOP_SPAN_DISCARD)
        return self.finish()

    def __enter_span(self, label_index: int) -> None:
        index = self.span_count
        assert index < len(self.spans)
        self.start_span(index, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(index)

    def __exit_span(self, *, discarded: bool) -> None:
        self.stop_span(self.span_stack.pop(), discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Hook called when span ``i`` opens; ``label_index`` indexes into
        ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Hook called when span ``i`` closes; ``discarded`` is ``True`` iff
        the original ``stop_span`` call passed ``discard=True``."""

    def finish(self) -> Any:
        raise NotImplementedError
310
311
class TrailType(IntEnum):
    """Integer event markers stored in ``SpanRecord.trail``.

    Span starts are encoded as ``START_SPAN + label_index``, so any record at
    or above ``START_SPAN`` (other than the ``CHOICE`` marker) opens a span.
    """

    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    START_SPAN = 3
    # Opaque marker value recorded for each choice made.
    CHOICE = calc_label_from_name("ir draw record")
317
318
class SpanRecord:
    """Compact log of ``start_span``, ``stop_span``, and choice events.

    The calls are recorded as integers in ``self.trail`` so that the
    structure of individual ``Span`` objects can be reconstructed later by
    ``Spans``, without materialising rich objects up front.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but they
    currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: list[int] = []
        self.__index_of_labels: Optional[dict[int, int]] = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        # Drop the lookup table; any further start_span call will fail fast.
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        assert self.__index_of_labels is not None
        index = self.__index_of_labels.get(label)
        if index is None:
            # First time we see this label: assign it the next slot.
            index = self.__index_of_labels.setdefault(label, len(self.labels))
            self.labels.append(label)
        self.trail.append(TrailType.START_SPAN + index)

    def stop_span(self, *, discard: bool) -> None:
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)
356
357
class _starts_and_ends(SpanProperty):
    """Computes the first/one-past-last choice index of every span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        span_count = len(self.spans)
        self.starts = IntList.of_length(span_count)
        self.ends = IntList.of_length(span_count)

    def start_span(self, i: int, label_index: int) -> None:
        # A span covers choices starting from the next one to be made.
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return (self.starts, self.ends)
372
373
class _discarded(SpanProperty):
    """Collects the indices of all spans closed with ``discard=True``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.result.add(i)

    def finish(self) -> frozenset[int]:
        return frozenset(self.result)
385
386
class _parentage(SpanProperty):
    """Computes, for each span, the index of its enclosing span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        # The top-level span (index 0) has no parent. For everything else,
        # the parent is the span still open on top of the stack (we have
        # already been popped off by the time this hook runs).
        if i > 0:
            self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result
398
399
class _depths(SpanProperty):
    """Computes the tree depth of every span (top-level span is depth 0)."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        # At the moment a span opens, the stack holds exactly its ancestors.
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result
410
411
class _label_indices(SpanProperty):
    """Records, for each span, the index of its label in ``spans.labels``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result
422
423
class _mutator_groups(SpanProperty):
    """Groups span (start, end) choice regions by label, for the mutator."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        span = self.spans[i]
        self.groups[label_index].add((span.start, span.end))

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # A group containing a single span gives the mutator nothing to
        # swap it with, so drop those.
        return [group for group in self.groups.values() if len(group) >= 2]
440
441
class Spans:
    """A lazy collection of ``Span`` objects, reconstructed on demand from
    the events recorded in a ``SpanRecord``.

    Logically this behaves like a list of ``Span`` objects, but it mostly
    exists as a compact columnar store for them to index into. Each property
    here is the backing storage for the matching ``Span`` property, and is
    best understood by reading the documentation there.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # Every span is closed by exactly one stop record, so counting the
        # stop markers counts the spans.
        self.__length = self.trail.count(
            TrailType.STOP_SPAN_DISCARD
        ) + self.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        self.__children: Optional[list[Sequence[int]]] = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for index, parent in enumerate(self.parentage):
                # Skip the root: it has no parent to be a child of.
                if index > 0:
                    children[parent].append(index)
            # Swap empty child lists for a shared empty tuple to reduce
            # memory usage.
            for index, child_list in enumerate(children):
                if not child_list:
                    children[index] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        n = self.__length
        if not (-n <= i < n):
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        return Span(self, i + n if i < 0 else i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        return (self[i] for i in range(len(self)))
524
525
class _Overrun:
    """Sentinel result used when a run exceeded its size limits; only
    ``status`` is meaningful (contrast with ``ConjectureResult``)."""

    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


# Shared singleton instance; returned from as_result() in the overrun case
# (see the __spans comment in ConjectureData.__init__).
Overrun = _Overrun()

# Monotonically increasing id; each ConjectureData takes the current value
# as its ``testcounter`` and increments it.
global_test_counter = 0


# NOTE(review): the depth limit itself is enforced outside this chunk.
MAX_DEPTH = 100
539
540
class DataObserver:
    """Callback interface for recording the behaviour of a
    ``ConjectureData`` object, primarily used to track behaviour in the
    tree cache. Every method is a no-op by default."""

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the observed
        ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        """Record an observed integer draw; ignored by default."""

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        """Record an observed float draw; ignored by default."""

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        """Record an observed string draw; ignored by default."""

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        """Record an observed bytes draw; ignored by default."""

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        """Record an observed boolean draw; ignored by default."""
584
585
@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    # Final classification of the run: OVERRUN, INVALID, VALID, or INTERESTING.
    status: Status = attr.ib()
    # Origin of the failing exception, if any.
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    # The recorded choice sequence; excluded from eq/repr (it can be large).
    nodes: tuple[ChoiceNode, ...] = attr.ib(eq=False, repr=False)
    # Total size of the choices, as accumulated via choices_size in _draw.
    length: int = attr.ib()
    # Printable output accumulated during the run.
    output: str = attr.ib()
    # Arbitrary extra state attached during the run, if any.
    extra_information: Optional[ExtraInformation] = attr.ib()
    # Exception (and formatted traceback) associated with this result,
    # carried over from ConjectureData.
    expected_exception: Optional[BaseException] = attr.ib()
    expected_traceback: Optional[str] = attr.ib()
    # True if any span was stopped with discard=True.
    has_discards: bool = attr.ib()
    # Scores observed for targeted property-based testing.
    target_observations: TargetObservations = attr.ib()
    # Structural coverage tags for this part of the search space.
    tags: frozenset[StructuralCoverageTag] = attr.ib()
    # The span tree; derived data, excluded from repr and equality.
    spans: Spans = attr.ib(repr=False, eq=False)
    # Slice indices of discrete reportable parts (see ConjectureData.arg_slices).
    arg_slices: set[tuple[int, int]] = attr.ib(repr=False)
    slice_comments: dict[tuple[int, int], str] = attr.ib(repr=False)
    # Where (if anywhere) replaying the prefix diverged from the strategy's
    # actual requests; see the MisalignedAt alias above.
    misaligned_at: Optional[MisalignedAt] = attr.ib(repr=False)
    cannot_proceed_scope: Optional[CannotProceedScopeT] = attr.ib(repr=False)

    def as_result(self) -> "ConjectureResult":
        # Already a result: returns itself unchanged.
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        """The concrete value of each recorded choice, in order."""
        return tuple(node.value for node in self.nodes)
615
616
617class ConjectureData:
618 @classmethod
619 def for_choices(
620 cls,
621 choices: Sequence[Union[ChoiceTemplate, ChoiceT]],
622 *,
623 observer: Optional[DataObserver] = None,
624 provider: Union[type, PrimitiveProvider] = HypothesisProvider,
625 random: Optional[Random] = None,
626 ) -> "ConjectureData":
627 from hypothesis.internal.conjecture.engine import choice_count
628
629 return cls(
630 max_choices=choice_count(choices),
631 random=random,
632 prefix=choices,
633 observer=observer,
634 provider=provider,
635 )
636
    def __init__(
        self,
        *,
        random: Optional[Random],
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        prefix: Optional[Sequence[Union[ChoiceTemplate, ChoiceT]]] = None,
        max_choices: Optional[int] = None,
        provider_kw: Optional[dict[str, Any]] = None,
    ) -> None:
        """Set up a fresh data object for a single test-case execution.

        Args:
            random: source of randomness; may be None when every draw is
                covered by ``prefix`` (see ``for_choices``).
            observer: receives a callback for each observed draw and for
                test conclusion; defaults to a no-op ``DataObserver``.
            provider: a ``PrimitiveProvider`` subclass (instantiated below)
                or an already-constructed instance.
            prefix: choices to replay before drawing fresh ones.
            max_choices: cap on the number of choice nodes before overrun.
            provider_kw: extra keyword arguments for the provider class;
                only valid when ``provider`` is a class, not an instance.
        """
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            # provider_kw only makes sense if we are the ones instantiating.
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.is_find = False
        self.overdraw = 0
        self._random = random

        self.length = 0
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        # Assign this data object a unique, monotonically increasing id.
        global global_test_counter
        self.testcounter = global_test_counter
        global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, Union[str, int, float]] = {}
        self.interesting_origin: Optional[InterestingOrigin] = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        # Instantiate provider classes here; instances are used as-is
        # (provider_kw for an instance was rejected above).
        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: Optional[ConjectureResult] = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Optional[Spans] = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )
        self._sampled_from_all_strategies_elements_message: Optional[
            tuple[str, object]
        ] = None
        self._shared_strategy_draws: dict[Hashable, tuple[int, Any]] = {}
        self.hypothesis_runner = not_set

        self.expected_exception: Optional[BaseException] = None
        self.expected_traceback: Optional[str] = None
        self.extra_information = ExtraInformation()

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        self.misaligned_at: Optional[MisalignedAt] = None
        self.cannot_proceed_scope: Optional[CannotProceedScopeT] = None
        # Open the implicit top-level span wrapping the whole test case.
        self.start_span(TOP_LABEL)
733
734 def __repr__(self) -> str:
735 return "ConjectureData(%s, %d choices%s)" % (
736 self.status.name,
737 len(self.nodes),
738 ", frozen" if self.frozen else "",
739 )
740
741 @property
742 def choices(self) -> tuple[ChoiceT, ...]:
743 return tuple(node.value for node in self.nodes)
744
    # draw_* functions might be called in one of two contexts: either "above" or
    # "below" the choice sequence. For instance, draw_string calls draw_boolean
    # from ``many`` when calculating the number of characters to return. We do
    # not want these choices to get written to the choice sequence, because they
    # are not true choices themselves.
    #
    # `observe` formalizes this. The choice will only be written to the choice
    # sequence if observe is True.

    # The overloads below only narrow the constraints/return types for each
    # choice_type; the single real implementation follows them.
    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: Optional[int],
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: Optional[float],
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: Optional[str],
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: Optional[bytes],
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: Optional[bool],
    ) -> bool: ...
803
    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        """Shared implementation behind every public ``draw_*`` method.

        The value comes from (in priority order): the replay ``prefix`` when
        one is active (observed draws only), then ``forced`` if given, then
        the provider. When ``observe`` is True the value is reported to
        ``self.observer`` and appended to ``self.nodes`` as a ``ChoiceNode``.
        Calls ``self.mark_overrun()`` when size or choice-count limits are hit.
        """
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        # A forced value wins even over a value replayed from the prefix.
        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                # Canonicalise the nan bit pattern via a float->int->float trip.
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            # Symbolic providers can't compute a concrete size.
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value
879
880 def draw_integer(
881 self,
882 min_value: Optional[int] = None,
883 max_value: Optional[int] = None,
884 *,
885 weights: Optional[dict[int, float]] = None,
886 shrink_towards: int = 0,
887 forced: Optional[int] = None,
888 observe: bool = True,
889 ) -> int:
890 # Validate arguments
891 if weights is not None:
892 assert min_value is not None
893 assert max_value is not None
894 assert len(weights) <= 255 # arbitrary practical limit
895 # We can and should eventually support total weights. But this
896 # complicates shrinking as we can no longer assume we can force
897 # a value to the unmapped probability mass if that mass might be 0.
898 assert sum(weights.values()) < 1
899 # similarly, things get simpler if we assume every value is possible.
900 # we'll want to drop this restriction eventually.
901 assert all(w != 0 for w in weights.values())
902
903 if forced is not None and min_value is not None:
904 assert min_value <= forced
905 if forced is not None and max_value is not None:
906 assert forced <= max_value
907
908 constraints: IntegerConstraints = self._pooled_constraints(
909 "integer",
910 {
911 "min_value": min_value,
912 "max_value": max_value,
913 "weights": weights,
914 "shrink_towards": shrink_towards,
915 },
916 )
917 return self._draw("integer", constraints, observe=observe, forced=forced)
918
919 def draw_float(
920 self,
921 min_value: float = -math.inf,
922 max_value: float = math.inf,
923 *,
924 allow_nan: bool = True,
925 smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
926 # TODO: consider supporting these float widths at the choice sequence
927 # level in the future.
928 # width: Literal[16, 32, 64] = 64,
929 forced: Optional[float] = None,
930 observe: bool = True,
931 ) -> float:
932 assert smallest_nonzero_magnitude > 0
933 assert not math.isnan(min_value)
934 assert not math.isnan(max_value)
935
936 if smallest_nonzero_magnitude == 0.0: # pragma: no cover
937 raise FloatingPointError(
938 "Got allow_subnormal=True, but we can't represent subnormal floats "
939 "right now, in violation of the IEEE-754 floating-point "
940 "specification. This is usually because something was compiled with "
941 "-ffast-math or a similar option, which sets global processor state. "
942 "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
943 "writeup - and good luck!"
944 )
945
946 if forced is not None:
947 assert allow_nan or not math.isnan(forced)
948 assert math.isnan(forced) or (
949 sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
950 )
951
952 constraints: FloatConstraints = self._pooled_constraints(
953 "float",
954 {
955 "min_value": min_value,
956 "max_value": max_value,
957 "allow_nan": allow_nan,
958 "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
959 },
960 )
961 return self._draw("float", constraints, observe=observe, forced=forced)
962
963 def draw_string(
964 self,
965 intervals: IntervalSet,
966 *,
967 min_size: int = 0,
968 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
969 forced: Optional[str] = None,
970 observe: bool = True,
971 ) -> str:
972 assert forced is None or min_size <= len(forced) <= max_size
973 assert min_size >= 0
974 if len(intervals) == 0:
975 assert min_size == 0
976
977 constraints: StringConstraints = self._pooled_constraints(
978 "string",
979 {
980 "intervals": intervals,
981 "min_size": min_size,
982 "max_size": max_size,
983 },
984 )
985 return self._draw("string", constraints, observe=observe, forced=forced)
986
987 def draw_bytes(
988 self,
989 min_size: int = 0,
990 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
991 *,
992 forced: Optional[bytes] = None,
993 observe: bool = True,
994 ) -> bytes:
995 assert forced is None or min_size <= len(forced) <= max_size
996 assert min_size >= 0
997
998 constraints: BytesConstraints = self._pooled_constraints(
999 "bytes", {"min_size": min_size, "max_size": max_size}
1000 )
1001 return self._draw("bytes", constraints, observe=observe, forced=forced)
1002
1003 def draw_boolean(
1004 self,
1005 p: float = 0.5,
1006 *,
1007 forced: Optional[bool] = None,
1008 observe: bool = True,
1009 ) -> bool:
1010 assert (forced is not True) or p > 0
1011 assert (forced is not False) or p < 1
1012
1013 constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p})
1014 return self._draw("boolean", constraints, observe=observe, forced=forced)
1015
1016 @overload
1017 def _pooled_constraints(
1018 self, choice_type: Literal["integer"], constraints: IntegerConstraints
1019 ) -> IntegerConstraints: ...
1020
1021 @overload
1022 def _pooled_constraints(
1023 self, choice_type: Literal["float"], constraints: FloatConstraints
1024 ) -> FloatConstraints: ...
1025
1026 @overload
1027 def _pooled_constraints(
1028 self, choice_type: Literal["string"], constraints: StringConstraints
1029 ) -> StringConstraints: ...
1030
1031 @overload
1032 def _pooled_constraints(
1033 self, choice_type: Literal["bytes"], constraints: BytesConstraints
1034 ) -> BytesConstraints: ...
1035
1036 @overload
1037 def _pooled_constraints(
1038 self, choice_type: Literal["boolean"], constraints: BooleanConstraints
1039 ) -> BooleanConstraints: ...
1040
1041 def _pooled_constraints(
1042 self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
1043 ) -> ChoiceConstraintsT:
1044 """Memoize common dictionary objects to reduce memory pressure."""
1045 # caching runs afoul of nondeterminism checks
1046 if self.provider.avoid_realization:
1047 return constraints
1048
1049 key = (choice_type, *choice_constraints_key(choice_type, constraints))
1050 try:
1051 return POOLED_CONSTRAINTS_CACHE[key]
1052 except KeyError:
1053 POOLED_CONSTRAINTS_CACHE[key] = constraints
1054 return constraints
1055
1056 def _pop_choice(
1057 self,
1058 choice_type: ChoiceTypeT,
1059 constraints: ChoiceConstraintsT,
1060 *,
1061 forced: Optional[ChoiceT],
1062 ) -> ChoiceT:
1063 assert self.prefix is not None
1064 # checked in _draw
1065 assert self.index < len(self.prefix)
1066
1067 value = self.prefix[self.index]
1068 if isinstance(value, ChoiceTemplate):
1069 node: ChoiceTemplate = value
1070 if node.count is not None:
1071 assert node.count >= 0
1072 # node templates have to be at the end for now, since it's not immediately
1073 # apparent how to handle overruning a node template while generating a single
1074 # node if the alternative is not "the entire data is an overrun".
1075 assert self.index == len(self.prefix) - 1
1076 if node.type == "simplest":
1077 if forced is not None:
1078 choice = forced
1079 try:
1080 choice = choice_from_index(0, choice_type, constraints)
1081 except ChoiceTooLarge:
1082 self.mark_overrun()
1083 else:
1084 raise NotImplementedError
1085
1086 if node.count is not None:
1087 node.count -= 1
1088 if node.count < 0:
1089 self.mark_overrun()
1090 return choice
1091
1092 choice = value
1093 node_choice_type = {
1094 str: "string",
1095 float: "float",
1096 int: "integer",
1097 bool: "boolean",
1098 bytes: "bytes",
1099 }[type(choice)]
1100 # If we're trying to:
1101 # * draw a different choice type at the same location
1102 # * draw the same choice type with a different constraints, which does not permit
1103 # the current value
1104 #
1105 # then we call this a misalignment, because the choice sequence has
1106 # changed from what we expected at some point. An easy misalignment is
1107 #
1108 # one_of(integers(0, 100), integers(101, 200))
1109 #
1110 # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
1111 # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
1112 # index 1 (which does not permit any of the values 0-100).
1113 #
1114 # When the choice sequence becomes misaligned, we generate a new value of the
1115 # type and constraints the strategy expects.
1116 if node_choice_type != choice_type or not choice_permitted(choice, constraints):
1117 # only track first misalignment for now.
1118 if self.misaligned_at is None:
1119 self.misaligned_at = (self.index, choice_type, constraints, forced)
1120 try:
1121 # Fill in any misalignments with index 0 choices. An alternative to
1122 # this is using the index of the misaligned choice instead
1123 # of index 0, which may be useful for maintaining
1124 # "similarly-complex choices" in the shrinker. This requires
1125 # attaching an index to every choice in ConjectureData.for_choices,
1126 # which we don't always have (e.g. when reading from db).
1127 #
1128 # If we really wanted this in the future we could make this complexity
1129 # optional, use it if present, and default to index 0 otherwise.
1130 # This complicates our internal api and so I'd like to avoid it
1131 # if possible.
1132 #
1133 # Additionally, I don't think slips which require
1134 # slipping to high-complexity values are common. Though arguably
1135 # we may want to expand a bit beyond *just* the simplest choice.
1136 # (we could for example consider sampling choices from index 0-10).
1137 choice = choice_from_index(0, choice_type, constraints)
1138 except ChoiceTooLarge:
1139 # should really never happen with a 0-index choice, but let's be safe.
1140 self.mark_overrun()
1141
1142 self.index += 1
1143 return choice
1144
1145 def as_result(self) -> Union[ConjectureResult, _Overrun]:
1146 """Convert the result of running this test into
1147 either an Overrun object or a ConjectureResult."""
1148
1149 assert self.frozen
1150 if self.status == Status.OVERRUN:
1151 return Overrun
1152 if self.__result is None:
1153 self.__result = ConjectureResult(
1154 status=self.status,
1155 interesting_origin=self.interesting_origin,
1156 spans=self.spans,
1157 nodes=self.nodes,
1158 length=self.length,
1159 output=self.output,
1160 expected_traceback=self.expected_traceback,
1161 expected_exception=self.expected_exception,
1162 extra_information=(
1163 self.extra_information
1164 if self.extra_information.has_information()
1165 else None
1166 ),
1167 has_discards=self.has_discards,
1168 target_observations=self.target_observations,
1169 tags=frozenset(self.tags),
1170 arg_slices=self.arg_slices,
1171 slice_comments=self.slice_comments,
1172 misaligned_at=self.misaligned_at,
1173 cannot_proceed_scope=self.cannot_proceed_scope,
1174 )
1175 assert self.__result is not None
1176 return self.__result
1177
1178 def __assert_not_frozen(self, name: str) -> None:
1179 if self.frozen:
1180 raise Frozen(f"Cannot call {name} on frozen ConjectureData")
1181
1182 def note(self, value: Any) -> None:
1183 self.__assert_not_frozen("note")
1184 if not isinstance(value, str):
1185 value = repr(value)
1186 self.output += value
1187
    def draw(
        self,
        strategy: "SearchStrategy[Ex]",
        label: Optional[int] = None,
        observe_as: Optional[str] = None,
    ) -> "Ex":
        """Draw a value from ``strategy`` inside a new span.

        At top level (depth 0) this also records per-draw timing (minus GC
        time) under ``observe_as`` (or an autogenerated key), and captures the
        drawn value for observability when any testcase callbacks are active.
        Raises InvalidArgument for find-incompatible strategies, and may
        conclude the test as invalid for empty strategies or excessive depth.
        """
        # Imported locally to avoid import cycles at module load time.
        from hypothesis.internal.observability import TESTCASE_CALLBACKS
        from hypothesis.strategies._internal.lazy import unwrap_strategies
        from hypothesis.strategies._internal.utils import to_jsonable

        if self.is_find and not strategy.supports_find:
            raise InvalidArgument(
                f"Cannot use strategy {strategy!r} within a call to find "
                "(presumably because it would be invalid after the call had ended)."
            )

        at_top_level = self.depth == 0
        start_time = None
        if at_top_level:
            # We start this timer early, because accessing attributes on a LazyStrategy
            # can be almost arbitrarily slow. In cases like characters() and text()
            # where we cache something expensive, this led to Flaky deadline errors!
            # See https://github.com/HypothesisWorks/hypothesis/issues/2108
            start_time = time.perf_counter()
            gc_start_time = gc_cumulative_time()

        strategy.validate()

        if strategy.is_empty:
            self.mark_invalid(f"empty strategy {self!r}")

        if self.depth >= MAX_DEPTH:
            self.mark_invalid("max depth exceeded")

        # Jump directly to the unwrapped strategy for the label and for do_draw.
        # This avoids adding an extra span to all lazy strategies.
        unwrapped = unwrap_strategies(strategy)
        if label is None:
            label = unwrapped.label
        assert isinstance(label, int)

        self.start_span(label=label)
        try:
            if not at_top_level:
                return unwrapped.do_draw(self)
            assert start_time is not None
            key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
            try:
                try:
                    v = unwrapped.do_draw(self)
                finally:
                    # Subtract the time spent in GC to avoid overcounting, as it is
                    # accounted for at the overall example level.
                    in_gctime = gc_cumulative_time() - gc_start_time
                    self.draw_times[key] = time.perf_counter() - start_time - in_gctime
            except Exception as err:
                # Annotate the exception with which draw failed, for debugging.
                add_note(
                    err,
                    f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
                )
                raise
            if TESTCASE_CALLBACKS:
                # Record the drawn value (JSON-able form) for observability.
                avoid = self.provider.avoid_realization
                self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
            return v
        finally:
            self.stop_span()
1255
1256 def start_span(self, label: int) -> None:
1257 self.provider.span_start(label)
1258 self.__assert_not_frozen("start_span")
1259 self.depth += 1
1260 # Logically it would make sense for this to just be
1261 # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
1262 # be until we ran the code under tracemalloc and found a rather significant
1263 # chunk of allocation was happening here. This was presumably due to varargs
1264 # or the like, but we didn't investigate further given that it was easy
1265 # to fix with this check.
1266 if self.depth > self.max_depth:
1267 self.max_depth = self.depth
1268 self.__span_record.start_span(label)
1269 self.labels_for_structure_stack.append({label})
1270
1271 def stop_span(self, *, discard: bool = False) -> None:
1272 self.provider.span_end(discard)
1273 if self.frozen:
1274 return
1275 if discard:
1276 self.has_discards = True
1277 self.depth -= 1
1278 assert self.depth >= -1
1279 self.__span_record.stop_span(discard=discard)
1280
1281 labels_for_structure = self.labels_for_structure_stack.pop()
1282
1283 if not discard:
1284 if self.labels_for_structure_stack:
1285 self.labels_for_structure_stack[-1].update(labels_for_structure)
1286 else:
1287 self.tags.update([structural_coverage(l) for l in labels_for_structure])
1288
1289 if discard:
1290 # Once we've discarded a span, every test case starting with
1291 # this prefix contains discards. We prune the tree at that point so
1292 # as to avoid future test cases bothering with this region, on the
1293 # assumption that some span that you could have used instead
1294 # there would *not* trigger the discard. This greatly speeds up
1295 # test case generation in some cases, because it allows us to
1296 # ignore large swathes of the search space that are effectively
1297 # redundant.
1298 #
1299 # A scenario that can cause us problems but which we deliberately
1300 # have decided not to support is that if there are side effects
1301 # during data generation then you may end up with a scenario where
1302 # every good test case generates a discard because the discarded
1303 # section sets up important things for later. This is not terribly
1304 # likely and all that you see in this case is some degradation in
1305 # quality of testing, so we don't worry about it.
1306 #
1307 # Note that killing the branch does *not* mean we will never
1308 # explore below this point, and in particular we may do so during
1309 # shrinking. Any explicit request for a data object that starts
1310 # with the branch here will work just fine, but novel prefix
1311 # generation will avoid it, and we can use it to detect when we
1312 # have explored the entire tree (up to redundancy).
1313
1314 self.observer.kill_branch()
1315
1316 @property
1317 def spans(self) -> Spans:
1318 assert self.frozen
1319 if self.__spans is None:
1320 self.__spans = Spans(record=self.__span_record)
1321 return self.__spans
1322
1323 def freeze(self) -> None:
1324 if self.frozen:
1325 return
1326 self.finish_time = time.perf_counter()
1327 self.gc_finish_time = gc_cumulative_time()
1328
1329 # Always finish by closing all remaining spans so that we have a valid tree.
1330 while self.depth >= 0:
1331 self.stop_span()
1332
1333 self.__span_record.freeze()
1334 self.frozen = True
1335 self.observer.conclude_test(self.status, self.interesting_origin)
1336
1337 def choice(
1338 self,
1339 values: Sequence[T],
1340 *,
1341 forced: Optional[T] = None,
1342 observe: bool = True,
1343 ) -> T:
1344 forced_i = None if forced is None else values.index(forced)
1345 i = self.draw_integer(
1346 0,
1347 len(values) - 1,
1348 forced=forced_i,
1349 observe=observe,
1350 )
1351 return values[i]
1352
1353 def conclude_test(
1354 self,
1355 status: Status,
1356 interesting_origin: Optional[InterestingOrigin] = None,
1357 ) -> NoReturn:
1358 assert (interesting_origin is None) or (status == Status.INTERESTING)
1359 self.__assert_not_frozen("conclude_test")
1360 self.interesting_origin = interesting_origin
1361 self.status = status
1362 self.freeze()
1363 raise StopTest(self.testcounter)
1364
1365 def mark_interesting(
1366 self, interesting_origin: Optional[InterestingOrigin] = None
1367 ) -> NoReturn:
1368 self.conclude_test(Status.INTERESTING, interesting_origin)
1369
1370 def mark_invalid(self, why: Optional[str] = None) -> NoReturn:
1371 if why is not None:
1372 self.events["invalid because"] = why
1373 self.conclude_test(Status.INVALID)
1374
1375 def mark_overrun(self) -> NoReturn:
1376 self.conclude_test(Status.OVERRUN)
1377
1378
def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single value of ``choice_type`` under ``constraints``, using a
    fresh ConjectureData seeded from ``random``."""
    data = ConjectureData(random=random)
    draw_method = getattr(data.provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_method(**constraints))