1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import math
12import time
13from collections import defaultdict
14from collections.abc import Hashable, Iterable, Iterator, Sequence
15from dataclasses import dataclass, field
16from enum import IntEnum
17from functools import cached_property
18from random import Random
19from typing import (
20 TYPE_CHECKING,
21 Any,
22 Literal,
23 NoReturn,
24 TypeAlias,
25 TypeVar,
26 cast,
27 overload,
28)
29
30from hypothesis.errors import (
31 CannotProceedScopeT,
32 ChoiceTooLarge,
33 FlakyStrategyDefinition,
34 Frozen,
35 InvalidArgument,
36 StopTest,
37)
38from hypothesis.internal.cache import LRUCache
39from hypothesis.internal.compat import add_note
40from hypothesis.internal.conjecture.choice import (
41 BooleanConstraints,
42 BytesConstraints,
43 ChoiceConstraintsT,
44 ChoiceNode,
45 ChoiceT,
46 ChoiceTemplate,
47 ChoiceTypeT,
48 FloatConstraints,
49 IntegerConstraints,
50 StringConstraints,
51 choice_constraints_key,
52 choice_from_index,
53 choice_permitted,
54 choices_size,
55)
56from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time
57from hypothesis.internal.conjecture.providers import (
58 COLLECTION_DEFAULT_MAX_SIZE,
59 HypothesisProvider,
60 PrimitiveProvider,
61)
62from hypothesis.internal.conjecture.utils import calc_label_from_name
63from hypothesis.internal.escalation import InterestingOrigin
64from hypothesis.internal.floats import (
65 SMALLEST_SUBNORMAL,
66 float_to_int,
67 int_to_float,
68 sign_aware_lte,
69)
70from hypothesis.internal.intervalsets import IntervalSet
71from hypothesis.internal.observability import PredicateCounts
72from hypothesis.reporting import debug_report
73from hypothesis.utils.conventions import not_set
74from hypothesis.utils.deprecation import note_deprecation
75from hypothesis.utils.threading import ThreadLocal
76
77if TYPE_CHECKING:
78 from hypothesis.strategies import SearchStrategy
79 from hypothesis.strategies._internal.core import DataObject
80 from hypothesis.strategies._internal.random import RandomState
81 from hypothesis.strategies._internal.strategies import Ex
82
83
def __getattr__(name: str) -> Any:
    """Module-level attribute hook, consulted for names not found normally.

    ``AVAILABLE_PROVIDERS`` used to be importable from this module. Looking
    it up here still works: a deprecation note is emitted and the object from
    ``hypothesis.internal.conjecture.providers`` is returned. Every other
    name raises ``AttributeError``.
    """
    if name != "AVAILABLE_PROVIDERS":
        raise AttributeError(
            f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}"
        )

    # Imported lazily, only once the deprecated name is actually requested.
    from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS

    note_deprecation(
        "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to "
        "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.",
        since="2025-01-25",
        has_codemod=False,
        stacklevel=1,
    )
    return AVAILABLE_PROVIDERS
100
101
# Generic type variable for annotations in this module.
T = TypeVar("T")
# Type of ``ConjectureData.target_observations``: string keys mapped to the
# numeric observations used for targeted search.
TargetObservations = dict[str, int | float]
# index, choice_type, constraints, forced value
MisalignedAt: TypeAlias = tuple[int, ChoiceTypeT, ChoiceConstraintsT, ChoiceT | None]

# Label of the top-level span, which ``ConjectureData.__init__`` opens.
TOP_LABEL = calc_label_from_name("top")
# NOTE(review): presumably the maximum permitted span nesting depth; its use
# is outside this part of the file -- confirm.
MAX_DEPTH = 100

# Per-thread storage. ``ConjectureData.__init__`` reads ``global_test_counter``
# to number each new data object and then increments it.
threadlocal = ThreadLocal(global_test_counter=int)
111
112
class Status(IntEnum):
    """Integer-valued status of a ``ConjectureData`` run.

    Being an ``IntEnum``, members compare and order like their integer
    values (``OVERRUN`` lowest, ``INTERESTING`` highest).
    """

    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        # Compact form, e.g. ``Status.VALID``, without the integer value.
        return "Status." + self.name
121
122
@dataclass(slots=True, frozen=True)
class StructuralCoverageTag:
    """Immutable, hashable wrapper around a single integer label.

    Instances are handed out by ``structural_coverage`` and collected in
    ``ConjectureData.tags``.
    """

    label: int
126
127
# One shared ``StructuralCoverageTag`` per label, created on first request.
STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}


def structural_coverage(label: int) -> StructuralCoverageTag:
    """Return the cached ``StructuralCoverageTag`` for ``label``.

    The tag is created and stored on the first request for a given label;
    later requests return that same object.
    """
    tag = STRUCTURAL_COVERAGE_CACHE.get(label)
    if tag is None:
        # setdefault, not plain assignment: if an entry appeared in the
        # meantime we return that one instead of replacing it.
        tag = STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
    return tag
136
137
# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
#
# It is read and filled by ConjectureData._pooled_constraints, which keys each
# entry on (choice_type, *choice_constraints_key(choice_type, constraints)).
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)
141
142
143class Span:
144 """A span tracks the hierarchical structure of choices within a single test run.
145
146 Spans are created to mark regions of the choice sequence that are
147 logically related to each other. For instance, Hypothesis tracks:
148 - A single top-level span for the entire choice sequence
149 - A span for the choices made by each strategy
150 - Some strategies define additional spans within their choices. For instance,
151 st.lists() tracks the "should add another element" choice and the "add
152 another element" choices as separate spans.
153
154 Spans provide useful information to the shrinker, mutator, targeted PBT,
155 and other subsystems of Hypothesis.
156
157 Rather than store each ``Span`` as a rich object, it is actually
158 just an index into the ``Spans`` class defined below. This has two
159 purposes: Firstly, for most properties of spans we will never need
160 to allocate storage at all, because most properties are not used on
161 most spans. Secondly, by storing the spans as compact lists
162 of integers, we save a considerable amount of space compared to
163 Python's normal object size.
164
165 This does have the downside that it increases the amount of allocation
166 we do, and slows things down as a result, in some usage patterns because
167 we repeatedly allocate the same Span or int objects, but it will
168 often dramatically reduce our memory usage, so is worth it.
169 """
170
171 __slots__ = ("index", "owner")
172
173 def __init__(self, owner: "Spans", index: int) -> None:
174 self.owner = owner
175 self.index = index
176
177 def __eq__(self, other: object) -> bool:
178 if self is other:
179 return True
180 if not isinstance(other, Span):
181 return NotImplemented
182 return (self.owner is other.owner) and (self.index == other.index)
183
184 def __ne__(self, other: object) -> bool:
185 if self is other:
186 return False
187 if not isinstance(other, Span):
188 return NotImplemented
189 return (self.owner is not other.owner) or (self.index != other.index)
190
191 def __repr__(self) -> str:
192 return f"spans[{self.index}]"
193
194 @property
195 def label(self) -> int:
196 """A label is an opaque value that associates each span with its
197 approximate origin, such as a particular strategy class or a particular
198 kind of draw."""
199 return self.owner.labels[self.owner.label_indices[self.index]]
200
201 @property
202 def parent(self) -> int | None:
203 """The index of the span that this one is nested directly within."""
204 if self.index == 0:
205 return None
206 return self.owner.parentage[self.index]
207
208 @property
209 def start(self) -> int:
210 return self.owner.starts[self.index]
211
212 @property
213 def end(self) -> int:
214 return self.owner.ends[self.index]
215
216 @property
217 def depth(self) -> int:
218 """
219 Depth of this span in the span tree. The top-level span has a depth of 0.
220 """
221 return self.owner.depths[self.index]
222
223 @property
224 def discarded(self) -> bool:
225 """True if this is span's ``stop_span`` call had ``discard`` set to
226 ``True``. This means we believe that the shrinker should be able to delete
227 this span completely, without affecting the value produced by its enclosing
228 strategy. Typically set when a rejection sampler decides to reject a
229 generated value and try again."""
230 return self.index in self.owner.discarded
231
232 @property
233 def choice_count(self) -> int:
234 """The number of choices in this span."""
235 return self.end - self.start
236
237 @property
238 def children(self) -> "list[Span]":
239 """The list of all spans with this as a parent, in increasing index
240 order."""
241 return [self.owner[i] for i in self.owner.children[self.index]]
242
243
class SpanProperty:
    """Visitor base class for computing a property of every span.

    Many span properties are computed by replaying the trail of recorded
    start/stop/choice events. ``run`` performs that replay, calling
    ``start_span`` and ``stop_span`` as spans open and close, and finally
    returns whatever ``finish`` produces. Subclasses override those hooks.
    """

    def __init__(self, spans: "Spans"):
        self.spans = spans
        # Indices of the spans that are currently open, innermost last.
        self.span_stack: list[int] = []
        # Number of spans started so far; also the index of the next span.
        self.span_count = 0
        # Number of choice events replayed so far.
        self.choice_count = 0

    def run(self) -> Any:
        """Replay the recorded trail through this visitor and return the
        result of ``self.finish()``."""
        for entry in self.spans.trail:
            if entry == TrailType.STOP_SPAN_DISCARD:
                self.__pop(discarded=True)
                continue
            if entry == TrailType.STOP_SPAN_NO_DISCARD:
                self.__pop(discarded=False)
                continue
            if entry == TrailType.CHOICE:
                self.choice_count += 1
                continue
            # everything after TrailType.CHOICE is the label of a span start.
            self.__push(entry - TrailType.CHOICE - 1)

        return self.finish()

    def __push(self, label_index: int) -> None:
        index = self.span_count
        assert index < len(self.spans)
        # The hook runs before the span is put on the stack, so inside
        # ``start_span`` the stack holds only the enclosing spans.
        self.start_span(index, label_index=label_index)
        self.span_stack.append(index)
        self.span_count += 1

    def __pop(self, *, discarded: bool) -> None:
        # The span is removed first, so inside ``stop_span`` the top of the
        # stack (if any) is the enclosing span.
        index = self.span_stack.pop()
        self.stop_span(index, discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Hook called when span ``i`` starts; ``label_index`` is the
        position of its label in ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Hook called when span ``i`` ends; ``discarded`` is ``True`` if
        ``stop_span`` was called with ``discard=True``."""

    def finish(self) -> Any:
        """Return the computed property. Subclasses must override this."""
        raise NotImplementedError
298
299
class TrailType(IntEnum):
    """Integer codes that ``SpanRecord`` appends to its ``trail`` and that
    ``SpanProperty.run`` decodes again."""

    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    CHOICE = 3
    # every trail element larger than TrailType.CHOICE is the label of a span
    # start, offset by its index. So the first span label is stored as 4, the
    # second as 5, etc, regardless of its actual integer label.
307
308
class SpanRecord:
    """Append-only log of span starts, span stops and choices.

    The log (``trail``) is later handed to ``Spans`` and replayed whenever
    the structure of individual ``Span`` objects is needed.

    This class is quite similar to ``DataObserver`` and the plan is to unify
    the two eventually; for now their functions and implementations differ
    slightly.
    """

    def __init__(self) -> None:
        self.trail = IntList()
        # Distinct labels, in order of first use. A span start is written to
        # the trail as the position of its label in this list (plus an offset).
        self.labels: list[int] = []
        # Reverse lookup label -> position in ``labels``; dropped by ``freeze``.
        self.__index_of_labels: dict[int, int] | None = {}
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        """Drop the label lookup table; ``start_span`` must not be called
        afterwards (it asserts that the table still exists)."""
        self.__index_of_labels = None

    def record_choice(self) -> None:
        """Log that a single choice was made."""
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        """Log the start of a span with the given label."""
        lookup = self.__index_of_labels
        assert lookup is not None
        if label in lookup:
            i = lookup[label]
        else:
            # First use of this label: it gets the next free position.
            i = lookup.setdefault(label, len(self.labels))
            self.labels.append(label)
        self.trail.append(TrailType.CHOICE + 1 + i)

    def stop_span(self, *, discard: bool) -> None:
        """Log the end of the innermost open span, noting ``discard``."""
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)
346
347
class _starts_and_ends(SpanProperty):
    """Computes, for each span, the choice count at its start and at its end."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        total = len(self.spans)
        self.starts = IntList.of_length(total)
        self.ends = IntList.of_length(total)

    def start_span(self, i: int, label_index: int) -> None:
        # Choices replayed before the span opened.
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        # Choices replayed by the time the span closed.
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return self.starts, self.ends
362
363
class _discarded(SpanProperty):
    """Collects the indices of all spans that were stopped with
    ``discard=True``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if not discarded:
            return
        self.result.add(i)

    def finish(self) -> frozenset[int]:
        # Hand out an immutable snapshot of the collected indices.
        return frozenset(self.result)
375
376
class _parentage(SpanProperty):
    """Computes, for each span, the index of its directly enclosing span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        # Span 0 has no parent; its entry keeps the list's initial value.
        if i <= 0:
            return
        # The span has already been popped when this hook runs, so the top
        # of the stack is the span that encloses it.
        self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result
388
389
class _depths(SpanProperty):
    """Computes the nesting depth of each span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        # The span is not on the stack yet when this hook runs, so the stack
        # size is the number of enclosing spans.
        enclosing = len(self.span_stack)
        self.result[i] = enclosing

    def finish(self) -> IntList:
        return self.result
400
401
class _label_indices(SpanProperty):
    """Computes, for each span, the position of its label in
    ``spans.labels``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        span_total = len(self.spans)
        self.result = IntList.of_length(span_total)

    def start_span(self, i: int, label_index: int) -> None:
        # The replay already decoded the label position; just store it.
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result
412
413
class _mutator_groups(SpanProperty):
    """Groups the ``(start, end)`` ranges of spans by their label index."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        start = self.spans[i].start
        end = self.spans[i].end
        self.groups[label_index].add((start, end))

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # A group with a single range gives the mutator nothing to work
        # with, so only groups with two or more ranges are returned.
        return [ranges for ranges in self.groups.values() if len(ranges) > 1]
430
431
class Spans:
    """A lazy collection of ``Span`` objects, built from a ``SpanRecord``.

    Logically this behaves like a list of ``Span`` objects. In practice it is
    mostly a compact store that those objects index into: each property here
    is the backing storage for the ``Span`` property of the same name and is
    described there. Properties are computed on first access by replaying the
    trail through the matching ``SpanProperty`` visitor.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # Every span contributes exactly one stop marker to the trail, so
        # counting stop markers gives the number of spans.
        discarded_stops = self.trail.count(TrailType.STOP_SPAN_DISCARD)
        kept_stops = record.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        self.__length = discarded_stops + kept_stops
        self.__children: list[Sequence[int]] | None = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        starts, _ = self.starts_and_ends
        return starts

    @property
    def ends(self) -> IntList:
        _, ends = self.starts_and_ends
        return ends

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        if self.__children is not None:
            return self.__children  # type: ignore

        per_parent = [IntList() for _ in range(len(self))]
        for child, parent in enumerate(self.parentage):
            if child > 0:
                per_parent[parent].append(child)
        # An empty tuple stands in for every empty child list, which reduces
        # memory usage.
        self.__children = [kids if kids else () for kids in per_parent]  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        n = self.__length
        if not (-n <= i < n):
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        # Negative indices count from the end, as for a list.
        return Span(self, i + n if i < 0 else i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        yield from (self[i] for i in range(len(self)))
514
515
class _Overrun:
    """Type of the module-level ``Overrun`` object.

    It carries only a ``status`` attribute, fixed at ``Status.OVERRUN``.
    """

    # Class-level attribute, so the instance needs no state of its own.
    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


# The single shared instance. Comments in ConjectureData speak of
# ``as_result()`` "being" ``Overrun``, so it is presumably compared by
# identity -- confirm against callers.
Overrun = _Overrun()
524
525
class DataObserver:
    """Receives callbacks describing what a ``ConjectureData`` object does.

    Primarily used for tracking behaviour in the tree cache. Every method of
    this base class is a no-op; subclasses override the ones they need.
    """

    def conclude_test(
        self,
        status: Status,
        interesting_origin: InterestingOrigin | None,
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        """Called by ``ConjectureData._draw`` for each observed integer choice."""

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        """Called by ``ConjectureData._draw`` for each observed float choice."""

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        """Called by ``ConjectureData._draw`` for each observed string choice."""

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        """Called by ``ConjectureData._draw`` for each observed bytes choice."""

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        """Called by ``ConjectureData._draw`` for each observed boolean choice."""
569
570
@dataclass(slots=True, frozen=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness.

    The dataclass is frozen. Fields declared with ``repr=False`` are left out
    of the generated ``__repr__``; fields declared with ``compare=False`` are
    left out of the generated ``__eq__``.
    """

    status: Status
    interesting_origin: InterestingOrigin | None
    # The recorded choices; excluded from both repr and comparison.
    nodes: tuple[ChoiceNode, ...] = field(repr=False, compare=False)
    length: int
    output: str
    expected_exception: BaseException | None
    expected_traceback: str | None
    has_discards: bool
    target_observations: TargetObservations
    tags: frozenset[StructuralCoverageTag]
    # The span structure; excluded from both repr and comparison.
    spans: Spans = field(repr=False, compare=False)
    arg_slices: set[tuple[int, int]] = field(repr=False)
    slice_comments: dict[tuple[int, int], str] = field(repr=False)
    misaligned_at: MisalignedAt | None = field(repr=False)
    cannot_proceed_scope: CannotProceedScopeT | None = field(repr=False)

    def as_result(self) -> "ConjectureResult":
        """Return ``self``; a result is already in result form."""
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        """The ``value`` of every node in ``self.nodes``, in order."""
        return tuple(node.value for node in self.nodes)
599
600
601class ConjectureData:
602 @classmethod
603 def for_choices(
604 cls,
605 choices: Sequence[ChoiceTemplate | ChoiceT],
606 *,
607 observer: DataObserver | None = None,
608 provider: PrimitiveProvider | type[PrimitiveProvider] = HypothesisProvider,
609 random: Random | None = None,
610 ) -> "ConjectureData":
611 from hypothesis.internal.conjecture.engine import choice_count
612
613 return cls(
614 max_choices=choice_count(choices),
615 random=random,
616 prefix=choices,
617 observer=observer,
618 provider=provider,
619 )
620
    def __init__(
        self,
        *,
        random: Random | None,
        observer: DataObserver | None = None,
        provider: PrimitiveProvider | type[PrimitiveProvider] = HypothesisProvider,
        prefix: Sequence[ChoiceTemplate | ChoiceT] | None = None,
        max_choices: int | None = None,
        provider_kw: dict[str, Any] | None = None,
    ) -> None:
        """Set up the state for a single test run and open the top-level span.

        Args:
            random: Stored as ``self._random``; may be ``None``.
            observer: Notified of every observed draw. A fresh no-op
                ``DataObserver`` is used when omitted.
            provider: Either a ``PrimitiveProvider`` instance, used as is, or
                a provider class, instantiated as
                ``provider(self, **provider_kw)``.
            prefix: Choices (or choice templates) that observed draws consume
                before the provider is asked for new values.
            max_choices: If given, a draw attempted once this many choices
                have been recorded marks the data as overrun.
            provider_kw: Keyword arguments for constructing ``provider``; only
                accepted when ``provider`` is a class.

        Raises:
            InvalidArgument: If ``provider_kw`` is passed together with a
                provider instance.
        """
        # Imported here rather than at module level; presumably to avoid an
        # import cycle with the engine module -- confirm.
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.overdraw = 0
        self._random = random

        # Running total of the sizes of all observed choices (see _draw).
        self.length: int = 0
        self.index: int = 0
        self.output: str = ""
        self.status: Status = Status.VALID
        self.frozen: bool = False
        # Each data object takes the current value of the thread-local
        # counter and then increments it.
        self.testcounter: int = threadlocal.global_test_counter
        threadlocal.global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, str | int | float] = {}
        self.interesting_origin: InterestingOrigin | None = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth: int = 0
        self.has_discards: bool = False

        # A provider class is instantiated here, with this partially
        # initialised object as its first argument: only the attributes
        # assigned above exist at this point.
        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: ConjectureResult | None = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Spans | None = None

        # We want the top level span to have depth 0, so we start at -1.
        self.depth: int = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )

        self._sampled_from_all_strategies_elements_message: (
            tuple[str, object] | None
        ) = None
        self._shared_strategy_draws: dict[Hashable, tuple[Any, SearchStrategy]] = {}
        self._shared_data_strategy: DataObject | None = None
        self._stateful_repr_parts: list[Any] | None = None
        self.states_for_ids: dict[int, RandomState] | None = None
        self.seeds_to_states: dict[Any, RandomState] | None = None
        self.hypothesis_runner: Any = not_set

        self.expected_exception: BaseException | None = None
        self.expected_traceback: str | None = None

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        self.misaligned_at: MisalignedAt | None = None
        self.cannot_proceed_scope: CannotProceedScopeT | None = None
        # Must come last: opening the top-level span is a method call on the
        # otherwise fully initialised object.
        self.start_span(TOP_LABEL)
718
719 def __repr__(self) -> str:
720 return "ConjectureData(%s, %d choices%s)" % (
721 self.status.name,
722 len(self.nodes),
723 ", frozen" if self.frozen else "",
724 )
725
726 @property
727 def choices(self) -> tuple[ChoiceT, ...]:
728 return tuple(node.value for node in self.nodes)
729
730 # draw_* functions might be called in one of two contexts: either "above" or
731 # "below" the choice sequence. For instance, draw_string calls draw_boolean
732 # from ``many`` when calculating the number of characters to return. We do
733 # not want these choices to get written to the choice sequence, because they
734 # are not true choices themselves.
735 #
736 # `observe` formalizes this. The choice will only be written to the choice
737 # sequence if observe is True.
738
    # The overloads tie each ``choice_type`` literal to its constraints type,
    # its ``forced`` type and its return type, for the benefit of type
    # checkers. The single runtime implementation follows them.

    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: int | None,
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: float | None,
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: str | None,
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: bytes | None,
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: bool | None,
    ) -> bool: ...

    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: ChoiceT | None,
    ) -> ChoiceT:
        """Produce one choice of the given type; shared core of all ``draw_*``.

        The value comes from, in order of precedence: ``forced`` when it is
        not ``None``; the next entry of ``self.prefix`` when ``observe`` is
        true and the prefix has unread entries; otherwise the provider's
        ``draw_<choice_type>`` method, called with ``constraints`` as keyword
        arguments.

        When ``observe`` is true the choice is also reported to
        ``self.observer``, appended to ``self.nodes`` as a ``ChoiceNode``,
        logged in the span record and counted towards ``self.length``. When
        it is false nothing is recorded.

        ``self.mark_overrun()`` is called when a size or count limit is hit.
        It is defined outside this part of the file; the code below relies on
        it not returning normally -- confirm.
        """
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        # If neither branch above ran (unobserved or prefix exhausted, with a
        # forced value), ``value`` is first bound here.
        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        #  >>> n = 18444492273895866368
        #  >>> assert math.isnan(int_to_float(n))
        #  >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            # With avoid_realization set, the value is never measured and so
            # contributes nothing to self.length.
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            # self.nodes is a tuple, so this rebinds it to a new, longer tuple.
            self.nodes += (node,)
            self.length += size

        return value
864
865 def draw_integer(
866 self,
867 min_value: int | None = None,
868 max_value: int | None = None,
869 *,
870 weights: dict[int, float] | None = None,
871 shrink_towards: int = 0,
872 forced: int | None = None,
873 observe: bool = True,
874 ) -> int:
875 # Validate arguments
876 if weights is not None:
877 assert min_value is not None
878 assert max_value is not None
879 assert len(weights) <= 255 # arbitrary practical limit
880 # We can and should eventually support total weights. But this
881 # complicates shrinking as we can no longer assume we can force
882 # a value to the unmapped probability mass if that mass might be 0.
883 assert sum(weights.values()) < 1
884 # similarly, things get simpler if we assume every value is possible.
885 # we'll want to drop this restriction eventually.
886 assert all(w != 0 for w in weights.values())
887
888 if forced is not None and min_value is not None:
889 assert min_value <= forced
890 if forced is not None and max_value is not None:
891 assert forced <= max_value
892
893 constraints: IntegerConstraints = self._pooled_constraints(
894 "integer",
895 {
896 "min_value": min_value,
897 "max_value": max_value,
898 "weights": weights,
899 "shrink_towards": shrink_towards,
900 },
901 )
902 return self._draw("integer", constraints, observe=observe, forced=forced)
903
904 def draw_float(
905 self,
906 min_value: float = -math.inf,
907 max_value: float = math.inf,
908 *,
909 allow_nan: bool = True,
910 smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
911 # TODO: consider supporting these float widths at the choice sequence
912 # level in the future.
913 # width: Literal[16, 32, 64] = 64,
914 forced: float | None = None,
915 observe: bool = True,
916 ) -> float:
917 assert smallest_nonzero_magnitude > 0
918 assert not math.isnan(min_value)
919 assert not math.isnan(max_value)
920
921 if smallest_nonzero_magnitude == 0.0: # pragma: no cover
922 raise FloatingPointError(
923 "Got allow_subnormal=True, but we can't represent subnormal floats "
924 "right now, in violation of the IEEE-754 floating-point "
925 "specification. This is usually because something was compiled with "
926 "-ffast-math or a similar option, which sets global processor state. "
927 "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
928 "writeup - and good luck!"
929 )
930
931 if forced is not None:
932 assert allow_nan or not math.isnan(forced)
933 assert math.isnan(forced) or (
934 sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
935 )
936
937 constraints: FloatConstraints = self._pooled_constraints(
938 "float",
939 {
940 "min_value": min_value,
941 "max_value": max_value,
942 "allow_nan": allow_nan,
943 "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
944 },
945 )
946 return self._draw("float", constraints, observe=observe, forced=forced)
947
948 def draw_string(
949 self,
950 intervals: IntervalSet,
951 *,
952 min_size: int = 0,
953 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
954 forced: str | None = None,
955 observe: bool = True,
956 ) -> str:
957 assert forced is None or min_size <= len(forced) <= max_size
958 assert min_size >= 0
959 if len(intervals) == 0:
960 assert min_size == 0
961
962 constraints: StringConstraints = self._pooled_constraints(
963 "string",
964 {
965 "intervals": intervals,
966 "min_size": min_size,
967 "max_size": max_size,
968 },
969 )
970 return self._draw("string", constraints, observe=observe, forced=forced)
971
972 def draw_bytes(
973 self,
974 min_size: int = 0,
975 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
976 *,
977 forced: bytes | None = None,
978 observe: bool = True,
979 ) -> bytes:
980 assert forced is None or min_size <= len(forced) <= max_size
981 assert min_size >= 0
982
983 constraints: BytesConstraints = self._pooled_constraints(
984 "bytes", {"min_size": min_size, "max_size": max_size}
985 )
986 return self._draw("bytes", constraints, observe=observe, forced=forced)
987
988 def draw_boolean(
989 self,
990 p: float = 0.5,
991 *,
992 forced: bool | None = None,
993 observe: bool = True,
994 ) -> bool:
995 assert (forced is not True) or p > 0
996 assert (forced is not False) or p < 1
997
998 constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p})
999 return self._draw("boolean", constraints, observe=observe, forced=forced)
1000
1001 @overload
1002 def _pooled_constraints(
1003 self, choice_type: Literal["integer"], constraints: IntegerConstraints
1004 ) -> IntegerConstraints: ...
1005
1006 @overload
1007 def _pooled_constraints(
1008 self, choice_type: Literal["float"], constraints: FloatConstraints
1009 ) -> FloatConstraints: ...
1010
1011 @overload
1012 def _pooled_constraints(
1013 self, choice_type: Literal["string"], constraints: StringConstraints
1014 ) -> StringConstraints: ...
1015
1016 @overload
1017 def _pooled_constraints(
1018 self, choice_type: Literal["bytes"], constraints: BytesConstraints
1019 ) -> BytesConstraints: ...
1020
1021 @overload
1022 def _pooled_constraints(
1023 self, choice_type: Literal["boolean"], constraints: BooleanConstraints
1024 ) -> BooleanConstraints: ...
1025
1026 def _pooled_constraints(
1027 self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
1028 ) -> ChoiceConstraintsT:
1029 """Memoize common dictionary objects to reduce memory pressure."""
1030 # caching runs afoul of nondeterminism checks
1031 if self.provider.avoid_realization:
1032 return constraints
1033
1034 key = (choice_type, *choice_constraints_key(choice_type, constraints))
1035 try:
1036 return POOLED_CONSTRAINTS_CACHE[key]
1037 except KeyError:
1038 POOLED_CONSTRAINTS_CACHE[key] = constraints
1039 return constraints
1040
    def _pop_choice(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        forced: ChoiceT | None,
    ) -> ChoiceT:
        """Return the choice at ``self.prefix[self.index]`` for this draw.

        If the prefix entry is a ``ChoiceTemplate``, a choice is generated from
        the template (only the ``"simplest"`` type is implemented) and the
        template's remaining ``count``, if any, is decremented; ``self.index``
        is left unchanged on this path.

        Otherwise the entry is a concrete choice. If its type differs from
        ``choice_type`` or it is not permitted by ``constraints``, the draw is
        misaligned: the first such misalignment is recorded in
        ``self.misaligned_at`` and the index-0 choice for the requested type
        and constraints is used instead. ``self.index`` is then advanced.

        Any overrun ends the test case via ``mark_overrun``, which does not
        return.
        """
        assert self.prefix is not None
        # checked in _draw
        assert self.index < len(self.prefix)

        value = self.prefix[self.index]
        if isinstance(value, ChoiceTemplate):
            node: ChoiceTemplate = value
            if node.count is not None:
                assert node.count >= 0
            # node templates have to be at the end for now, since it's not immediately
            # apparent how to handle overrunning a node template while generating a single
            # node if the alternative is not "the entire data is an overrun".
            assert self.index == len(self.prefix) - 1
            if node.type == "simplest":
                # NOTE(review): this assignment is always superseded. The ``try``
                # below either rebinds ``choice`` or ends the test via
                # ``mark_overrun``, so ``forced`` does not influence the value
                # returned from this branch. Confirm that the caller re-applies
                # ``forced`` itself.
                if forced is not None:
                    choice = forced
                try:
                    choice = choice_from_index(0, choice_type, constraints)
                except ChoiceTooLarge:
                    self.mark_overrun()
            else:
                raise NotImplementedError

            # A template with a count can only be used that many times; using it
            # once more than that is an overrun.
            if node.count is not None:
                node.count -= 1
                if node.count < 0:
                    self.mark_overrun()
            return choice

        choice = value
        # Map the exact runtime type of the stored choice to its choice type name.
        node_choice_type = {
            str: "string",
            float: "float",
            int: "integer",
            bool: "boolean",
            bytes: "bytes",
        }[type(choice)]
        # If we're trying to:
        # * draw a different choice type at the same location
        # * draw the same choice type with different constraints, which do not permit
        #   the current value
        #
        # then we call this a misalignment, because the choice sequence has
        # changed from what we expected at some point. An easy misalignment is
        #
        #   one_of(integers(0, 100), integers(101, 200))
        #
        # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
        # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
        # index 1 (which does not permit any of the values 0-100).
        #
        # When the choice sequence becomes misaligned, we generate a new value of the
        # type and constraints the strategy expects.
        if node_choice_type != choice_type or not choice_permitted(choice, constraints):
            # only track first misalignment for now.
            if self.misaligned_at is None:
                self.misaligned_at = (self.index, choice_type, constraints, forced)
            try:
                # Fill in any misalignments with index 0 choices. An alternative to
                # this is using the index of the misaligned choice instead
                # of index 0, which may be useful for maintaining
                # "similarly-complex choices" in the shrinker. This requires
                # attaching an index to every choice in ConjectureData.for_choices,
                # which we don't always have (e.g. when reading from db).
                #
                # If we really wanted this in the future we could make this complexity
                # optional, use it if present, and default to index 0 otherwise.
                # This complicates our internal api and so I'd like to avoid it
                # if possible.
                #
                # Additionally, I don't think slips which require
                # slipping to high-complexity values are common. Though arguably
                # we may want to expand a bit beyond *just* the simplest choice.
                # (we could for example consider sampling choices from index 0-10).
                choice = choice_from_index(0, choice_type, constraints)
            except ChoiceTooLarge:
                # should really never happen with a 0-index choice, but let's be safe.
                self.mark_overrun()

        self.index += 1
        return choice
1129
1130 def as_result(self) -> ConjectureResult | _Overrun:
1131 """Convert the result of running this test into
1132 either an Overrun object or a ConjectureResult."""
1133
1134 assert self.frozen
1135 if self.status == Status.OVERRUN:
1136 return Overrun
1137 if self.__result is None:
1138 self.__result = ConjectureResult(
1139 status=self.status,
1140 interesting_origin=self.interesting_origin,
1141 spans=self.spans,
1142 nodes=self.nodes,
1143 length=self.length,
1144 output=self.output,
1145 expected_traceback=self.expected_traceback,
1146 expected_exception=self.expected_exception,
1147 has_discards=self.has_discards,
1148 target_observations=self.target_observations,
1149 tags=frozenset(self.tags),
1150 arg_slices=self.arg_slices,
1151 slice_comments=self.slice_comments,
1152 misaligned_at=self.misaligned_at,
1153 cannot_proceed_scope=self.cannot_proceed_scope,
1154 )
1155 assert self.__result is not None
1156 return self.__result
1157
1158 def __assert_not_frozen(self, name: str) -> None:
1159 if self.frozen:
1160 raise Frozen(f"Cannot call {name} on frozen ConjectureData")
1161
1162 def note(self, value: Any) -> None:
1163 self.__assert_not_frozen("note")
1164 if not isinstance(value, str):
1165 value = repr(value)
1166 self.output += value
1167
1168 def draw(
1169 self,
1170 strategy: "SearchStrategy[Ex]",
1171 label: int | None = None,
1172 observe_as: str | None = None,
1173 ) -> "Ex":
1174 from hypothesis.internal.observability import observability_enabled
1175 from hypothesis.strategies._internal.lazy import unwrap_strategies
1176 from hypothesis.strategies._internal.utils import to_jsonable
1177
1178 at_top_level = self.depth == 0
1179 start_time = None
1180 if at_top_level:
1181 # We start this timer early, because accessing attributes on a LazyStrategy
1182 # can be almost arbitrarily slow. In cases like characters() and text()
1183 # where we cache something expensive, this led to Flaky deadline errors!
1184 # See https://github.com/HypothesisWorks/hypothesis/issues/2108
1185 start_time = time.perf_counter()
1186 gc_start_time = gc_cumulative_time()
1187
1188 strategy.validate()
1189
1190 if strategy.is_empty:
1191 self.mark_invalid(f"empty strategy {self!r}")
1192
1193 if self.depth >= MAX_DEPTH:
1194 self.mark_invalid("max depth exceeded")
1195
1196 # Jump directly to the unwrapped strategy for the label and for do_draw.
1197 # This avoids adding an extra span to all lazy strategies.
1198 unwrapped = unwrap_strategies(strategy)
1199 if label is None:
1200 label = unwrapped.label
1201 assert isinstance(label, int)
1202
1203 self.start_span(label=label)
1204 try:
1205 if not at_top_level:
1206 try:
1207 return unwrapped.do_draw(self)
1208 except FlakyStrategyDefinition as err:
1209 # Record the strategy stack as the error unwinds, so that an
1210 # inconsistent-generation failure is explained in terms of the
1211 # strategies being drawn from, not just the choice sequence.
1212 # The top-level draw adds its own "while generating ..." note.
1213 add_note(err, f"while drawing from {strategy!r}")
1214 raise
1215 assert start_time is not None
1216 key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
1217 try:
1218 try:
1219 v = unwrapped.do_draw(self)
1220 finally:
1221 # Subtract the time spent in GC to avoid overcounting, as it is
1222 # accounted for at the overall example level.
1223 in_gctime = gc_cumulative_time() - gc_start_time
1224 self.draw_times[key] = time.perf_counter() - start_time - in_gctime
1225 except Exception as err:
1226 add_note(
1227 err,
1228 f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
1229 )
1230 raise
1231 if observability_enabled():
1232 avoid = self.provider.avoid_realization
1233 self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
1234 return v
1235 finally:
1236 self.stop_span()
1237
1238 def start_span(self, label: int) -> None:
1239 self.provider.span_start(label)
1240 self.__assert_not_frozen("start_span")
1241 self.depth += 1
1242 # Logically it would make sense for this to just be
1243 # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
1244 # be until we ran the code under tracemalloc and found a rather significant
1245 # chunk of allocation was happening here. This was presumably due to varargs
1246 # or the like, but we didn't investigate further given that it was easy
1247 # to fix with this check.
1248 if self.depth > self.max_depth:
1249 self.max_depth = self.depth
1250 self.__span_record.start_span(label)
1251 self.labels_for_structure_stack.append({label})
1252
1253 def stop_span(self, *, discard: bool = False) -> None:
1254 self.provider.span_end(discard)
1255 if self.frozen:
1256 return
1257 if discard:
1258 self.has_discards = True
1259 self.depth -= 1
1260 assert self.depth >= -1
1261 self.__span_record.stop_span(discard=discard)
1262
1263 labels_for_structure = self.labels_for_structure_stack.pop()
1264
1265 if not discard:
1266 if self.labels_for_structure_stack:
1267 self.labels_for_structure_stack[-1].update(labels_for_structure)
1268 else:
1269 self.tags.update([structural_coverage(l) for l in labels_for_structure])
1270
1271 if discard:
1272 # Once we've discarded a span, every test case starting with
1273 # this prefix contains discards. We prune the tree at that point so
1274 # as to avoid future test cases bothering with this region, on the
1275 # assumption that some span that you could have used instead
1276 # there would *not* trigger the discard. This greatly speeds up
1277 # test case generation in some cases, because it allows us to
1278 # ignore large swathes of the search space that are effectively
1279 # redundant.
1280 #
1281 # A scenario that can cause us problems but which we deliberately
1282 # have decided not to support is that if there are side effects
1283 # during data generation then you may end up with a scenario where
1284 # every good test case generates a discard because the discarded
1285 # section sets up important things for later. This is not terribly
1286 # likely and all that you see in this case is some degradation in
1287 # quality of testing, so we don't worry about it.
1288 #
1289 # Note that killing the branch does *not* mean we will never
1290 # explore below this point, and in particular we may do so during
1291 # shrinking. Any explicit request for a data object that starts
1292 # with the branch here will work just fine, but novel prefix
1293 # generation will avoid it, and we can use it to detect when we
1294 # have explored the entire tree (up to redundancy).
1295
1296 self.observer.kill_branch()
1297
1298 @property
1299 def spans(self) -> Spans:
1300 assert self.frozen
1301 if self.__spans is None:
1302 self.__spans = Spans(record=self.__span_record)
1303 return self.__spans
1304
1305 def freeze(self) -> None:
1306 if self.frozen:
1307 return
1308 self.finish_time = time.perf_counter()
1309 self.gc_finish_time = gc_cumulative_time()
1310
1311 # Always finish by closing all remaining spans so that we have a valid tree.
1312 while self.depth >= 0:
1313 self.stop_span()
1314
1315 self.__span_record.freeze()
1316 self.frozen = True
1317 self.observer.conclude_test(self.status, self.interesting_origin)
1318
1319 def choice(
1320 self,
1321 values: Sequence[T],
1322 *,
1323 forced: T | None = None,
1324 observe: bool = True,
1325 ) -> T:
1326 forced_i = None if forced is None else values.index(forced)
1327 i = self.draw_integer(
1328 0,
1329 len(values) - 1,
1330 forced=forced_i,
1331 observe=observe,
1332 )
1333 return values[i]
1334
1335 def conclude_test(
1336 self,
1337 status: Status,
1338 interesting_origin: InterestingOrigin | None = None,
1339 ) -> NoReturn:
1340 assert (interesting_origin is None) or (status == Status.INTERESTING)
1341 self.__assert_not_frozen("conclude_test")
1342 self.interesting_origin = interesting_origin
1343 self.status = status
1344 self.freeze()
1345 raise StopTest(self.testcounter)
1346
1347 def mark_interesting(self, interesting_origin: InterestingOrigin) -> NoReturn:
1348 self.conclude_test(Status.INTERESTING, interesting_origin)
1349
1350 def mark_invalid(self, why: str | None = None) -> NoReturn:
1351 if why is not None:
1352 self.events["invalid because"] = why
1353 self.conclude_test(Status.INVALID)
1354
1355 def mark_overrun(self) -> NoReturn:
1356 self.conclude_test(Status.OVERRUN)
1357
1358
def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single choice of ``choice_type`` under ``constraints``.

    A fresh ``ConjectureData`` is constructed with ``random``, and the
    ``draw_<choice_type>`` method of its provider is called with the
    constraints as keyword arguments.
    """
    data = ConjectureData(random=random)
    draw_method = getattr(data.provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_method(**constraints))