1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import math
12import time
13from collections import defaultdict
14from collections.abc import Hashable, Iterable, Iterator, Sequence
15from dataclasses import dataclass, field
16from enum import IntEnum
17from functools import cached_property
18from random import Random
19from typing import (
20 TYPE_CHECKING,
21 Any,
22 Literal,
23 NoReturn,
24 TypeAlias,
25 TypeVar,
26 cast,
27 overload,
28)
29
30from hypothesis.errors import (
31 CannotProceedScopeT,
32 ChoiceTooLarge,
33 Frozen,
34 InvalidArgument,
35 StopTest,
36)
37from hypothesis.internal.cache import LRUCache
38from hypothesis.internal.compat import add_note
39from hypothesis.internal.conjecture.choice import (
40 BooleanConstraints,
41 BytesConstraints,
42 ChoiceConstraintsT,
43 ChoiceNode,
44 ChoiceT,
45 ChoiceTemplate,
46 ChoiceTypeT,
47 FloatConstraints,
48 IntegerConstraints,
49 StringConstraints,
50 choice_constraints_key,
51 choice_from_index,
52 choice_permitted,
53 choices_size,
54)
55from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time
56from hypothesis.internal.conjecture.providers import (
57 COLLECTION_DEFAULT_MAX_SIZE,
58 HypothesisProvider,
59 PrimitiveProvider,
60)
61from hypothesis.internal.conjecture.utils import calc_label_from_name
62from hypothesis.internal.escalation import InterestingOrigin
63from hypothesis.internal.floats import (
64 SMALLEST_SUBNORMAL,
65 float_to_int,
66 int_to_float,
67 sign_aware_lte,
68)
69from hypothesis.internal.intervalsets import IntervalSet
70from hypothesis.internal.observability import PredicateCounts
71from hypothesis.reporting import debug_report
72from hypothesis.utils.conventions import not_set
73from hypothesis.utils.threading import ThreadLocal
74
75if TYPE_CHECKING:
76 from hypothesis.strategies import SearchStrategy
77 from hypothesis.strategies._internal.core import DataObject
78 from hypothesis.strategies._internal.random import RandomState
79 from hypothesis.strategies._internal.strategies import Ex
80
81
def __getattr__(name: str) -> Any:
    """Module-level attribute hook (PEP 562).

    Serves a deprecation shim for ``AVAILABLE_PROVIDERS``, which moved to
    ``hypothesis.internal.conjecture.providers``; anything else is a normal
    AttributeError.
    """
    if name != "AVAILABLE_PROVIDERS":
        raise AttributeError(
            f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}"
        )

    from hypothesis._settings import note_deprecation
    from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS

    note_deprecation(
        "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to "
        "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.",
        since="2025-01-25",
        has_codemod=False,
        stacklevel=1,
    )
    return AVAILABLE_PROVIDERS
99
100
T = TypeVar("T")
# Scores recorded via targeted property-based testing, keyed by target label.
TargetObservations = dict[str, int | float]
# index, choice_type, constraints, forced value
MisalignedAt: TypeAlias = tuple[int, ChoiceTypeT, ChoiceConstraintsT, ChoiceT | None]

# Label for the implicit top-level span opened by ConjectureData.__init__.
TOP_LABEL = calc_label_from_name("top")
MAX_DEPTH = 100

# Per-thread counter used to assign ConjectureData.testcounter.
threadlocal = ThreadLocal(global_test_counter=int)
110
111
class Status(IntEnum):
    """Outcome of running a single test case."""

    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return "Status." + self.name
120
121
122@dataclass(slots=True, frozen=True)
123class StructuralCoverageTag:
124 label: int
125
126
127STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}
128
129
130def structural_coverage(label: int) -> StructuralCoverageTag:
131 try:
132 return STRUCTURAL_COVERAGE_CACHE[label]
133 except KeyError:
134 return STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
135
136
# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
# Keyed by (choice_type, *choice_constraints_key(...)); see _pooled_constraints.
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)
140
141
142class Span:
143 """A span tracks the hierarchical structure of choices within a single test run.
144
145 Spans are created to mark regions of the choice sequence that that are
146 logically related to each other. For instance, Hypothesis tracks:
147 - A single top-level span for the entire choice sequence
148 - A span for the choices made by each strategy
149 - Some strategies define additional spans within their choices. For instance,
150 st.lists() tracks the "should add another element" choice and the "add
151 another element" choices as separate spans.
152
153 Spans provide useful information to the shrinker, mutator, targeted PBT,
154 and other subsystems of Hypothesis.
155
156 Rather than store each ``Span`` as a rich object, it is actually
157 just an index into the ``Spans`` class defined below. This has two
158 purposes: Firstly, for most properties of spans we will never need
159 to allocate storage at all, because most properties are not used on
160 most spans. Secondly, by storing the spans as compact lists
161 of integers, we save a considerable amount of space compared to
162 Python's normal object size.
163
164 This does have the downside that it increases the amount of allocation
165 we do, and slows things down as a result, in some usage patterns because
166 we repeatedly allocate the same Span or int objects, but it will
167 often dramatically reduce our memory usage, so is worth it.
168 """
169
170 __slots__ = ("index", "owner")
171
172 def __init__(self, owner: "Spans", index: int) -> None:
173 self.owner = owner
174 self.index = index
175
176 def __eq__(self, other: object) -> bool:
177 if self is other:
178 return True
179 if not isinstance(other, Span):
180 return NotImplemented
181 return (self.owner is other.owner) and (self.index == other.index)
182
183 def __ne__(self, other: object) -> bool:
184 if self is other:
185 return False
186 if not isinstance(other, Span):
187 return NotImplemented
188 return (self.owner is not other.owner) or (self.index != other.index)
189
190 def __repr__(self) -> str:
191 return f"spans[{self.index}]"
192
193 @property
194 def label(self) -> int:
195 """A label is an opaque value that associates each span with its
196 approximate origin, such as a particular strategy class or a particular
197 kind of draw."""
198 return self.owner.labels[self.owner.label_indices[self.index]]
199
200 @property
201 def parent(self) -> int | None:
202 """The index of the span that this one is nested directly within."""
203 if self.index == 0:
204 return None
205 return self.owner.parentage[self.index]
206
207 @property
208 def start(self) -> int:
209 return self.owner.starts[self.index]
210
211 @property
212 def end(self) -> int:
213 return self.owner.ends[self.index]
214
215 @property
216 def depth(self) -> int:
217 """
218 Depth of this span in the span tree. The top-level span has a depth of 0.
219 """
220 return self.owner.depths[self.index]
221
222 @property
223 def discarded(self) -> bool:
224 """True if this is span's ``stop_span`` call had ``discard`` set to
225 ``True``. This means we believe that the shrinker should be able to delete
226 this span completely, without affecting the value produced by its enclosing
227 strategy. Typically set when a rejection sampler decides to reject a
228 generated value and try again."""
229 return self.index in self.owner.discarded
230
231 @property
232 def choice_count(self) -> int:
233 """The number of choices in this span."""
234 return self.end - self.start
235
236 @property
237 def children(self) -> "list[Span]":
238 """The list of all spans with this as a parent, in increasing index
239 order."""
240 return [self.owner[i] for i in self.owner.children[self.index]]
241
242
class TrailType(IntEnum):
    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    CHOICE = 3
    # every trail element larger than TrailType.CHOICE is the label of a span
    # start, offset by its index. So the first span label is stored as 4, the
    # second as 5, etc, regardless of its actual integer label.


class SpanProperty:
    """Visitor base class for computing properties of spans.

    Many span properties are calculated by effectively replaying the
    recorded test case. ``run`` walks the trail recorded in ``SpanRecord``
    and dispatches to the ``start_span`` / ``stop_span`` hooks, which
    subclasses override to accumulate whatever they need.
    """

    def __init__(self, spans: "Spans"):
        self.span_stack: list[int] = []
        self.spans = spans
        self.span_count = 0
        self.choice_count = 0

    def run(self) -> Any:
        """Replay the recorded trail through this visitor and return the
        result of ``self.finish()``."""
        for record in self.spans.trail:
            if record > TrailType.CHOICE:
                # everything after TrailType.CHOICE is the label of a span start.
                self._enter(record - TrailType.CHOICE - 1)
            elif record == TrailType.CHOICE:
                self.choice_count += 1
            else:
                self._leave(discarded=record == TrailType.STOP_SPAN_DISCARD)

        return self.finish()

    def _enter(self, label_index: int) -> None:
        # Open the next span and push it onto the stack of open spans.
        index = self.span_count
        assert index < len(self.spans)
        self.start_span(index, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(index)

    def _leave(self, *, discarded: bool) -> None:
        # Close the most recently opened span.
        self.stop_span(self.span_stack.pop(), discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Called at the start of each span, with ``i`` the
        index of the span and ``label_index`` the index of
        its label in ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Called at the end of each span, with ``i`` the
        index of the span and ``discarded`` being ``True`` if ``stop_span``
        was called with ``discard=True``."""

    def finish(self) -> Any:
        raise NotImplementedError
306
307
class SpanRecord:
    """Records the sequence of ``start_span``, ``stop_span``, and choice
    calls made while generating a test case, so that ``Spans`` can later
    replay them to answer questions about individual ``Span`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: list[int] = []
        self.__index_of_labels: dict[int, int] | None = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        # Once frozen, no further labels may be interned.
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        assert self.__index_of_labels is not None
        index = self.__index_of_labels.get(label)
        if index is None:
            # First time we've seen this label: intern it.
            index = len(self.labels)
            self.__index_of_labels[label] = index
            self.labels.append(label)
        self.trail.append(TrailType.CHOICE + 1 + index)

    def stop_span(self, *, discard: bool) -> None:
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)
345
346
class _starts_and_ends(SpanProperty):
    """Computes the (start, end) choice offsets of every span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        size = len(spans)
        self.starts = IntList.of_length(size)
        self.ends = IntList.of_length(size)

    def start_span(self, i: int, label_index: int) -> None:
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return (self.starts, self.ends)
361
362
class _discarded(SpanProperty):
    """Collects the indices of all spans stopped with ``discard=True``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.result.add(i)

    def finish(self) -> frozenset[int]:
        return frozenset(self.result)
374
375
class _parentage(SpanProperty):
    """Computes, for each span, the index of its directly enclosing span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        # Span 0 is the top-level span; it has no parent and its entry
        # stays at the IntList default of 0.
        if i:
            self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result
387
388
class _depths(SpanProperty):
    """Computes the nesting depth of every span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(spans))

    def start_span(self, i: int, label_index: int) -> None:
        # A span's depth is the number of spans already open when it starts.
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result
399
400
class _label_indices(SpanProperty):
    """Computes, for each span, the index of its label in ``spans.labels``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result
411
412
class _mutator_groups(SpanProperty):
    """Groups spans by label, as sets of (start, end) choice-index ranges."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        span = self.spans[i]
        self.groups[label_index].add((span.start, span.end))

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # Discard groups with only one span, since the mutator can't
        # do anything useful with them.
        return [group for group in self.groups.values() if len(group) >= 2]
429
430
class Spans:
    """A lazily-computed, list-like collection of ``Span`` objects,
    derived from the behaviour recorded in a ``SpanRecord``.

    Logically this behaves like a list of ``Span`` objects, but it mostly
    exists as a compact store of information for them to reference into.
    Every property here is best understood as backing storage for the
    accessors on ``Span``, and is described there.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # Each span is closed by exactly one stop record, so counting the
        # stop markers gives the number of spans.
        discards = self.trail.count(TrailType.STOP_SPAN_DISCARD)
        non_discards = self.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        self.__length = discards + non_discards
        self.__children: list[Sequence[int]] | None = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for index, parent in enumerate(self.parentage):
                if index > 0:
                    children[parent].append(index)
            # Replace empty children lists with a tuple to reduce
            # memory usage.
            for index, kids in enumerate(children):
                if not kids:
                    children[index] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        n = self.__length
        if not (-n <= i < n):
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        return Span(self, i + n if i < 0 else i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        return (self[i] for i in range(len(self)))
513
514
class _Overrun:
    """Sentinel result for test cases that exceeded their size limits.

    The single shared instance ``Overrun`` is used in place of a
    ``ConjectureResult`` when ``as_result()`` cannot produce one.
    """

    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


# The module-wide singleton; compare with ``is``.
Overrun = _Overrun()
523
524
class DataObserver:
    """Observes the behaviour of a ``ConjectureData`` object, primarily
    used for tracking that behaviour in the tree cache.

    All hooks are no-ops by default; subclasses override whichever ones
    they care about.
    """

    def conclude_test(
        self,
        status: Status,
        interesting_origin: InterestingOrigin | None,
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        """Observe an integer draw. No-op by default."""

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        """Observe a float draw. No-op by default."""

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        """Observe a string draw. No-op by default."""

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        """Observe a bytes draw. No-op by default."""

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        """Observe a boolean draw. No-op by default."""
568
569
@dataclass(slots=True, frozen=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    status: Status
    interesting_origin: InterestingOrigin | None
    # The full choice sequence, one node per choice.
    nodes: tuple[ChoiceNode, ...] = field(repr=False, compare=False)
    # Size of the choice sequence, as accumulated by ConjectureData.length.
    length: int
    output: str
    expected_exception: BaseException | None
    expected_traceback: str | None
    has_discards: bool
    target_observations: TargetObservations
    tags: frozenset[StructuralCoverageTag]
    spans: Spans = field(repr=False, compare=False)
    arg_slices: set[tuple[int, int]] = field(repr=False)
    slice_comments: dict[tuple[int, int], str] = field(repr=False)
    # (index, choice_type, constraints, forced) of a misaligned draw,
    # if any occurred -- see the MisalignedAt alias.
    misaligned_at: MisalignedAt | None = field(repr=False)
    cannot_proceed_scope: CannotProceedScopeT | None = field(repr=False)

    def as_result(self) -> "ConjectureResult":
        # Already a result object, so return it unchanged.
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        """The raw choice values, without constraint metadata."""
        return tuple(node.value for node in self.nodes)
598
599
600class ConjectureData:
601 @classmethod
602 def for_choices(
603 cls,
604 choices: Sequence[ChoiceTemplate | ChoiceT],
605 *,
606 observer: DataObserver | None = None,
607 provider: PrimitiveProvider | type[PrimitiveProvider] = HypothesisProvider,
608 random: Random | None = None,
609 ) -> "ConjectureData":
610 from hypothesis.internal.conjecture.engine import choice_count
611
612 return cls(
613 max_choices=choice_count(choices),
614 random=random,
615 prefix=choices,
616 observer=observer,
617 provider=provider,
618 )
619
    def __init__(
        self,
        *,
        random: Random | None,
        observer: DataObserver | None = None,
        provider: PrimitiveProvider | type[PrimitiveProvider] = HypothesisProvider,
        prefix: Sequence[ChoiceTemplate | ChoiceT] | None = None,
        max_choices: int | None = None,
        provider_kw: dict[str, Any] | None = None,
    ) -> None:
        """Set up a fresh test-case recorder.

        ``prefix`` (if given) is replayed before the provider generates
        novel choices, and ``max_choices`` / ``BUFFER_SIZE`` bound the
        choice sequence before the run is marked as an overrun.
        ``provider_kw`` may only be passed when ``provider`` is a class,
        in which case it is instantiated here with those kwargs.
        """
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            # provider_kw only makes sense if we are the ones instantiating.
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.overdraw = 0
        self._random = random

        self.length = 0
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        # Monotonically-increasing per-thread id for this test case.
        self.testcounter = threadlocal.global_test_counter
        threadlocal.global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, str | int | float] = {}
        self.interesting_origin: InterestingOrigin | None = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        # provider may be given as a class (instantiated here with
        # provider_kw) or as a ready-made instance.
        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: ConjectureResult | None = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Spans | None = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )

        self._sampled_from_all_strategies_elements_message: (
            tuple[str, object] | None
        ) = None
        self._shared_strategy_draws: dict[Hashable, tuple[Any, SearchStrategy]] = {}
        self._shared_data_strategy: DataObject | None = None
        self._stateful_repr_parts: list[Any] | None = None
        self.states_for_ids: dict[int, RandomState] | None = None
        self.seeds_to_states: dict[Any, RandomState] | None = None
        self.hypothesis_runner: Any = not_set

        self.expected_exception: BaseException | None = None
        self.expected_traceback: str | None = None

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        self.misaligned_at: MisalignedAt | None = None
        self.cannot_proceed_scope: CannotProceedScopeT | None = None
        # Open the implicit top-level span covering the whole run.
        self.start_span(TOP_LABEL)
718
719 def __repr__(self) -> str:
720 return "ConjectureData(%s, %d choices%s)" % (
721 self.status.name,
722 len(self.nodes),
723 ", frozen" if self.frozen else "",
724 )
725
    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        """The sequence of raw choice values drawn so far."""
        return tuple(node.value for node in self.nodes)

    # draw_* functions might be called in one of two contexts: either "above" or
    # "below" the choice sequence. For instance, draw_string calls draw_boolean
    # from ``many`` when calculating the number of characters to return. We do
    # not want these choices to get written to the choice sequence, because they
    # are not true choices themselves.
    #
    # `observe` formalizes this. The choice will only be written to the choice
    # sequence if observe is True.
738
    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: int | None,
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: float | None,
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: str | None,
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: bytes | None,
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: bool | None,
    ) -> bool: ...

    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: ChoiceT | None,
    ) -> ChoiceT:
        """Draw a single choice of ``choice_type``.

        The value comes from the replay prefix if one is active, from
        ``forced`` if given, or otherwise from the provider. Only when
        ``observe`` is True is the choice reported to the observer and
        appended to the choice sequence (``self.nodes``).
        """
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                # Round-trip through the bit representation to canonicalize
                # this particular nan payload.
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value
864
865 def draw_integer(
866 self,
867 min_value: int | None = None,
868 max_value: int | None = None,
869 *,
870 weights: dict[int, float] | None = None,
871 shrink_towards: int = 0,
872 forced: int | None = None,
873 observe: bool = True,
874 ) -> int:
875 # Validate arguments
876 if weights is not None:
877 assert min_value is not None
878 assert max_value is not None
879 assert len(weights) <= 255 # arbitrary practical limit
880 # We can and should eventually support total weights. But this
881 # complicates shrinking as we can no longer assume we can force
882 # a value to the unmapped probability mass if that mass might be 0.
883 assert sum(weights.values()) < 1
884 # similarly, things get simpler if we assume every value is possible.
885 # we'll want to drop this restriction eventually.
886 assert all(w != 0 for w in weights.values())
887
888 if forced is not None and min_value is not None:
889 assert min_value <= forced
890 if forced is not None and max_value is not None:
891 assert forced <= max_value
892
893 constraints: IntegerConstraints = self._pooled_constraints(
894 "integer",
895 {
896 "min_value": min_value,
897 "max_value": max_value,
898 "weights": weights,
899 "shrink_towards": shrink_towards,
900 },
901 )
902 return self._draw("integer", constraints, observe=observe, forced=forced)
903
904 def draw_float(
905 self,
906 min_value: float = -math.inf,
907 max_value: float = math.inf,
908 *,
909 allow_nan: bool = True,
910 smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
911 # TODO: consider supporting these float widths at the choice sequence
912 # level in the future.
913 # width: Literal[16, 32, 64] = 64,
914 forced: float | None = None,
915 observe: bool = True,
916 ) -> float:
917 assert smallest_nonzero_magnitude > 0
918 assert not math.isnan(min_value)
919 assert not math.isnan(max_value)
920
921 if smallest_nonzero_magnitude == 0.0: # pragma: no cover
922 raise FloatingPointError(
923 "Got allow_subnormal=True, but we can't represent subnormal floats "
924 "right now, in violation of the IEEE-754 floating-point "
925 "specification. This is usually because something was compiled with "
926 "-ffast-math or a similar option, which sets global processor state. "
927 "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
928 "writeup - and good luck!"
929 )
930
931 if forced is not None:
932 assert allow_nan or not math.isnan(forced)
933 assert math.isnan(forced) or (
934 sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
935 )
936
937 constraints: FloatConstraints = self._pooled_constraints(
938 "float",
939 {
940 "min_value": min_value,
941 "max_value": max_value,
942 "allow_nan": allow_nan,
943 "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
944 },
945 )
946 return self._draw("float", constraints, observe=observe, forced=forced)
947
948 def draw_string(
949 self,
950 intervals: IntervalSet,
951 *,
952 min_size: int = 0,
953 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
954 forced: str | None = None,
955 observe: bool = True,
956 ) -> str:
957 assert forced is None or min_size <= len(forced) <= max_size
958 assert min_size >= 0
959 if len(intervals) == 0:
960 assert min_size == 0
961
962 constraints: StringConstraints = self._pooled_constraints(
963 "string",
964 {
965 "intervals": intervals,
966 "min_size": min_size,
967 "max_size": max_size,
968 },
969 )
970 return self._draw("string", constraints, observe=observe, forced=forced)
971
972 def draw_bytes(
973 self,
974 min_size: int = 0,
975 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
976 *,
977 forced: bytes | None = None,
978 observe: bool = True,
979 ) -> bytes:
980 assert forced is None or min_size <= len(forced) <= max_size
981 assert min_size >= 0
982
983 constraints: BytesConstraints = self._pooled_constraints(
984 "bytes", {"min_size": min_size, "max_size": max_size}
985 )
986 return self._draw("bytes", constraints, observe=observe, forced=forced)
987
988 def draw_boolean(
989 self,
990 p: float = 0.5,
991 *,
992 forced: bool | None = None,
993 observe: bool = True,
994 ) -> bool:
995 assert (forced is not True) or p > 0
996 assert (forced is not False) or p < 1
997
998 constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p})
999 return self._draw("boolean", constraints, observe=observe, forced=forced)
1000
    @overload
    def _pooled_constraints(
        self, choice_type: Literal["integer"], constraints: IntegerConstraints
    ) -> IntegerConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["float"], constraints: FloatConstraints
    ) -> FloatConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["string"], constraints: StringConstraints
    ) -> StringConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["bytes"], constraints: BytesConstraints
    ) -> BytesConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["boolean"], constraints: BooleanConstraints
    ) -> BooleanConstraints: ...

    def _pooled_constraints(
        self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
    ) -> ChoiceConstraintsT:
        """Memoize common dictionary objects to reduce memory pressure.

        Returns a previously-seen constraints dict equal to this one when
        available, so that repeated draws share a single dict object.
        """
        # caching runs afoul of nondeterminism checks
        if self.provider.avoid_realization:
            return constraints

        key = (choice_type, *choice_constraints_key(choice_type, constraints))
        try:
            return POOLED_CONSTRAINTS_CACHE[key]
        except KeyError:
            # Cache miss: this dict becomes the canonical copy for the key.
            POOLED_CONSTRAINTS_CACHE[key] = constraints
            return constraints
1040
1041 def _pop_choice(
1042 self,
1043 choice_type: ChoiceTypeT,
1044 constraints: ChoiceConstraintsT,
1045 *,
1046 forced: ChoiceT | None,
1047 ) -> ChoiceT:
1048 assert self.prefix is not None
1049 # checked in _draw
1050 assert self.index < len(self.prefix)
1051
1052 value = self.prefix[self.index]
1053 if isinstance(value, ChoiceTemplate):
1054 node: ChoiceTemplate = value
1055 if node.count is not None:
1056 assert node.count >= 0
1057 # node templates have to be at the end for now, since it's not immediately
1058 # apparent how to handle overruning a node template while generating a single
1059 # node if the alternative is not "the entire data is an overrun".
1060 assert self.index == len(self.prefix) - 1
1061 if node.type == "simplest":
1062 if forced is not None:
1063 choice = forced
1064 try:
1065 choice = choice_from_index(0, choice_type, constraints)
1066 except ChoiceTooLarge:
1067 self.mark_overrun()
1068 else:
1069 raise NotImplementedError
1070
1071 if node.count is not None:
1072 node.count -= 1
1073 if node.count < 0:
1074 self.mark_overrun()
1075 return choice
1076
1077 choice = value
1078 node_choice_type = {
1079 str: "string",
1080 float: "float",
1081 int: "integer",
1082 bool: "boolean",
1083 bytes: "bytes",
1084 }[type(choice)]
1085 # If we're trying to:
1086 # * draw a different choice type at the same location
1087 # * draw the same choice type with a different constraints, which does not permit
1088 # the current value
1089 #
1090 # then we call this a misalignment, because the choice sequence has
1091 # changed from what we expected at some point. An easy misalignment is
1092 #
1093 # one_of(integers(0, 100), integers(101, 200))
1094 #
1095 # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
1096 # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
1097 # index 1 (which does not permit any of the values 0-100).
1098 #
1099 # When the choice sequence becomes misaligned, we generate a new value of the
1100 # type and constraints the strategy expects.
1101 if node_choice_type != choice_type or not choice_permitted(choice, constraints):
1102 # only track first misalignment for now.
1103 if self.misaligned_at is None:
1104 self.misaligned_at = (self.index, choice_type, constraints, forced)
1105 try:
1106 # Fill in any misalignments with index 0 choices. An alternative to
1107 # this is using the index of the misaligned choice instead
1108 # of index 0, which may be useful for maintaining
1109 # "similarly-complex choices" in the shrinker. This requires
1110 # attaching an index to every choice in ConjectureData.for_choices,
1111 # which we don't always have (e.g. when reading from db).
1112 #
1113 # If we really wanted this in the future we could make this complexity
1114 # optional, use it if present, and default to index 0 otherwise.
1115 # This complicates our internal api and so I'd like to avoid it
1116 # if possible.
1117 #
1118 # Additionally, I don't think slips which require
1119 # slipping to high-complexity values are common. Though arguably
1120 # we may want to expand a bit beyond *just* the simplest choice.
1121 # (we could for example consider sampling choices from index 0-10).
1122 choice = choice_from_index(0, choice_type, constraints)
1123 except ChoiceTooLarge:
1124 # should really never happen with a 0-index choice, but let's be safe.
1125 self.mark_overrun()
1126
1127 self.index += 1
1128 return choice
1129
1130 def as_result(self) -> ConjectureResult | _Overrun:
1131 """Convert the result of running this test into
1132 either an Overrun object or a ConjectureResult."""
1133
1134 assert self.frozen
1135 if self.status == Status.OVERRUN:
1136 return Overrun
1137 if self.__result is None:
1138 self.__result = ConjectureResult(
1139 status=self.status,
1140 interesting_origin=self.interesting_origin,
1141 spans=self.spans,
1142 nodes=self.nodes,
1143 length=self.length,
1144 output=self.output,
1145 expected_traceback=self.expected_traceback,
1146 expected_exception=self.expected_exception,
1147 has_discards=self.has_discards,
1148 target_observations=self.target_observations,
1149 tags=frozenset(self.tags),
1150 arg_slices=self.arg_slices,
1151 slice_comments=self.slice_comments,
1152 misaligned_at=self.misaligned_at,
1153 cannot_proceed_scope=self.cannot_proceed_scope,
1154 )
1155 assert self.__result is not None
1156 return self.__result
1157
1158 def __assert_not_frozen(self, name: str) -> None:
1159 if self.frozen:
1160 raise Frozen(f"Cannot call {name} on frozen ConjectureData")
1161
1162 def note(self, value: Any) -> None:
1163 self.__assert_not_frozen("note")
1164 if not isinstance(value, str):
1165 value = repr(value)
1166 self.output += value
1167
    def draw(
        self,
        strategy: "SearchStrategy[Ex]",
        label: int | None = None,
        observe_as: str | None = None,
    ) -> "Ex":
        """Draw a value from ``strategy``, wrapping the draw in a span.

        At the top level (``depth == 0``) the draw is also timed — with GC
        time subtracted — under ``observe_as`` (or a generated key) for
        deadline/observability accounting, and the drawn value is recorded
        for observability when enabled.
        """
        # Imported here rather than at module level to avoid import cycles.
        from hypothesis.internal.observability import observability_enabled
        from hypothesis.strategies._internal.lazy import unwrap_strategies
        from hypothesis.strategies._internal.utils import to_jsonable

        at_top_level = self.depth == 0
        start_time = None
        if at_top_level:
            # We start this timer early, because accessing attributes on a LazyStrategy
            # can be almost arbitrarily slow. In cases like characters() and text()
            # where we cache something expensive, this led to Flaky deadline errors!
            # See https://github.com/HypothesisWorks/hypothesis/issues/2108
            start_time = time.perf_counter()
            gc_start_time = gc_cumulative_time()

        strategy.validate()

        if strategy.is_empty:
            self.mark_invalid(f"empty strategy {self!r}")

        if self.depth >= MAX_DEPTH:
            self.mark_invalid("max depth exceeded")

        # Jump directly to the unwrapped strategy for the label and for do_draw.
        # This avoids adding an extra span to all lazy strategies.
        unwrapped = unwrap_strategies(strategy)
        if label is None:
            label = unwrapped.label
        assert isinstance(label, int)

        self.start_span(label=label)
        try:
            if not at_top_level:
                # Nested draws are not individually timed or observed.
                return unwrapped.do_draw(self)
            assert start_time is not None
            key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
            try:
                try:
                    v = unwrapped.do_draw(self)
                finally:
                    # Subtract the time spent in GC to avoid overcounting, as it is
                    # accounted for at the overall example level.
                    in_gctime = gc_cumulative_time() - gc_start_time
                    self.draw_times[key] = time.perf_counter() - start_time - in_gctime
            except Exception as err:
                # Attach the draw's label/strategy so tracebacks say what was
                # being generated when the error escaped.
                add_note(
                    err,
                    f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
                )
                raise
            if observability_enabled():
                avoid = self.provider.avoid_realization
                self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
            return v
        finally:
            self.stop_span()
1229
1230 def start_span(self, label: int) -> None:
1231 self.provider.span_start(label)
1232 self.__assert_not_frozen("start_span")
1233 self.depth += 1
1234 # Logically it would make sense for this to just be
1235 # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
1236 # be until we ran the code under tracemalloc and found a rather significant
1237 # chunk of allocation was happening here. This was presumably due to varargs
1238 # or the like, but we didn't investigate further given that it was easy
1239 # to fix with this check.
1240 if self.depth > self.max_depth:
1241 self.max_depth = self.depth
1242 self.__span_record.start_span(label)
1243 self.labels_for_structure_stack.append({label})
1244
1245 def stop_span(self, *, discard: bool = False) -> None:
1246 self.provider.span_end(discard)
1247 if self.frozen:
1248 return
1249 if discard:
1250 self.has_discards = True
1251 self.depth -= 1
1252 assert self.depth >= -1
1253 self.__span_record.stop_span(discard=discard)
1254
1255 labels_for_structure = self.labels_for_structure_stack.pop()
1256
1257 if not discard:
1258 if self.labels_for_structure_stack:
1259 self.labels_for_structure_stack[-1].update(labels_for_structure)
1260 else:
1261 self.tags.update([structural_coverage(l) for l in labels_for_structure])
1262
1263 if discard:
1264 # Once we've discarded a span, every test case starting with
1265 # this prefix contains discards. We prune the tree at that point so
1266 # as to avoid future test cases bothering with this region, on the
1267 # assumption that some span that you could have used instead
1268 # there would *not* trigger the discard. This greatly speeds up
1269 # test case generation in some cases, because it allows us to
1270 # ignore large swathes of the search space that are effectively
1271 # redundant.
1272 #
1273 # A scenario that can cause us problems but which we deliberately
1274 # have decided not to support is that if there are side effects
1275 # during data generation then you may end up with a scenario where
1276 # every good test case generates a discard because the discarded
1277 # section sets up important things for later. This is not terribly
1278 # likely and all that you see in this case is some degradation in
1279 # quality of testing, so we don't worry about it.
1280 #
1281 # Note that killing the branch does *not* mean we will never
1282 # explore below this point, and in particular we may do so during
1283 # shrinking. Any explicit request for a data object that starts
1284 # with the branch here will work just fine, but novel prefix
1285 # generation will avoid it, and we can use it to detect when we
1286 # have explored the entire tree (up to redundancy).
1287
1288 self.observer.kill_branch()
1289
1290 @property
1291 def spans(self) -> Spans:
1292 assert self.frozen
1293 if self.__spans is None:
1294 self.__spans = Spans(record=self.__span_record)
1295 return self.__spans
1296
1297 def freeze(self) -> None:
1298 if self.frozen:
1299 return
1300 self.finish_time = time.perf_counter()
1301 self.gc_finish_time = gc_cumulative_time()
1302
1303 # Always finish by closing all remaining spans so that we have a valid tree.
1304 while self.depth >= 0:
1305 self.stop_span()
1306
1307 self.__span_record.freeze()
1308 self.frozen = True
1309 self.observer.conclude_test(self.status, self.interesting_origin)
1310
1311 def choice(
1312 self,
1313 values: Sequence[T],
1314 *,
1315 forced: T | None = None,
1316 observe: bool = True,
1317 ) -> T:
1318 forced_i = None if forced is None else values.index(forced)
1319 i = self.draw_integer(
1320 0,
1321 len(values) - 1,
1322 forced=forced_i,
1323 observe=observe,
1324 )
1325 return values[i]
1326
1327 def conclude_test(
1328 self,
1329 status: Status,
1330 interesting_origin: InterestingOrigin | None = None,
1331 ) -> NoReturn:
1332 assert (interesting_origin is None) or (status == Status.INTERESTING)
1333 self.__assert_not_frozen("conclude_test")
1334 self.interesting_origin = interesting_origin
1335 self.status = status
1336 self.freeze()
1337 raise StopTest(self.testcounter)
1338
1339 def mark_interesting(self, interesting_origin: InterestingOrigin) -> NoReturn:
1340 self.conclude_test(Status.INTERESTING, interesting_origin)
1341
1342 def mark_invalid(self, why: str | None = None) -> NoReturn:
1343 if why is not None:
1344 self.events["invalid because"] = why
1345 self.conclude_test(Status.INVALID)
1346
1347 def mark_overrun(self) -> NoReturn:
1348 self.conclude_test(Status.OVERRUN)
1349
1350
def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single standalone choice of ``choice_type`` under
    ``constraints``, using a fresh ConjectureData seeded from ``random``."""
    data = ConjectureData(random=random)
    # Dispatch to the matching provider method, e.g. draw_integer.
    draw_fn = getattr(data.provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_fn(**constraints))