1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import math
12import time
13from collections import defaultdict
14from collections.abc import Hashable, Iterable, Iterator, Sequence
15from enum import IntEnum
16from functools import cached_property
17from random import Random
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 Literal,
22 NoReturn,
23 Optional,
24 TypeVar,
25 Union,
26 cast,
27 overload,
28)
29
30import attr
31
32from hypothesis.errors import (
33 CannotProceedScopeT,
34 ChoiceTooLarge,
35 Frozen,
36 InvalidArgument,
37 StopTest,
38)
39from hypothesis.internal.cache import LRUCache
40from hypothesis.internal.compat import add_note
41from hypothesis.internal.conjecture.choice import (
42 BooleanConstraints,
43 BytesConstraints,
44 ChoiceConstraintsT,
45 ChoiceNode,
46 ChoiceT,
47 ChoiceTemplate,
48 ChoiceTypeT,
49 FloatConstraints,
50 IntegerConstraints,
51 StringConstraints,
52 choice_constraints_key,
53 choice_from_index,
54 choice_permitted,
55 choices_size,
56)
57from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time
58from hypothesis.internal.conjecture.providers import (
59 COLLECTION_DEFAULT_MAX_SIZE,
60 HypothesisProvider,
61 PrimitiveProvider,
62)
63from hypothesis.internal.conjecture.utils import calc_label_from_name
64from hypothesis.internal.escalation import InterestingOrigin
65from hypothesis.internal.floats import (
66 SMALLEST_SUBNORMAL,
67 float_to_int,
68 int_to_float,
69 sign_aware_lte,
70)
71from hypothesis.internal.intervalsets import IntervalSet
72from hypothesis.internal.observability import PredicateCounts
73from hypothesis.reporting import debug_report
74from hypothesis.utils.conventions import not_set
75from hypothesis.utils.threading import ThreadLocal
76
77if TYPE_CHECKING:
78 from typing import TypeAlias
79
80 from hypothesis.strategies import SearchStrategy
81 from hypothesis.strategies._internal.core import DataObject
82 from hypothesis.strategies._internal.random import RandomState
83 from hypothesis.strategies._internal.strategies import Ex
84
85
def __getattr__(name: str) -> Any:
    """Module-level attribute hook; emits a deprecation warning for the
    relocated ``AVAILABLE_PROVIDERS`` attribute."""
    if name != "AVAILABLE_PROVIDERS":
        raise AttributeError(
            f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}"
        )

    from hypothesis._settings import note_deprecation
    from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS

    note_deprecation(
        "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to "
        "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.",
        since="2025-01-25",
        has_codemod=False,
        stacklevel=1,
    )
    return AVAILABLE_PROVIDERS
103
104
# Generic type variable used by the typed draw helpers in this module.
T = TypeVar("T")
# Mapping of target label -> observed metric value, aggregated by the engine
# for targeted property-based testing (see ConjectureData.target_observations).
TargetObservations = dict[str, Union[int, float]]
# index, choice_type, constraints, forced value
MisalignedAt: "TypeAlias" = tuple[
    int, ChoiceTypeT, ChoiceConstraintsT, Optional[ChoiceT]
]

# Label of the implicit top-level span wrapping the whole choice sequence
# (started in ConjectureData.__init__).
TOP_LABEL = calc_label_from_name("top")
# Limit on nested draw depth; presumably enforced where depth is tracked
# elsewhere in this file — TODO confirm against the full module.
MAX_DEPTH = 100

# Thread-local monotonically increasing counter, used to stamp each
# ConjectureData with a unique ``testcounter``.
threadlocal = ThreadLocal(global_test_counter=int)
116
117
class ExtraInformation:
    """Mutable bag of shared state set on a ``ConjectureData`` during a run
    and carried over onto the final ``ConjectureResult``."""

    def __repr__(self) -> str:
        rendered = ", ".join(
            f"{name}={value!r}" for name, value in self.__dict__.items()
        )
        return f"ExtraInformation({rendered})"

    def has_information(self) -> bool:
        """Whether any attribute has been set on this instance."""
        return bool(self.__dict__)
129
130
class Status(IntEnum):
    """Outcome of a test-case run, ordered from worst to most useful."""

    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return "Status." + self.name
139
140
@attr.s(slots=True, frozen=True)
class StructuralCoverageTag:
    """Hashable, interned wrapper around a span label, used as a structural
    coverage tag on ConjectureData/ConjectureResult (see ``tags``)."""

    label: int = attr.ib()


# Interning table for StructuralCoverageTag, keyed by label; populated only
# via structural_coverage() below so equal labels share one tag object.
STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}
147
148
def structural_coverage(label: int) -> StructuralCoverageTag:
    """Return the interned ``StructuralCoverageTag`` for ``label``, creating
    and caching it on first use."""
    if label not in STRUCTURAL_COVERAGE_CACHE:
        # setdefault rather than plain assignment, so a concurrent insertion
        # of the same label still yields a single canonical tag.
        return STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
    return STRUCTURAL_COVERAGE_CACHE[label]
154
155
# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
# Keys are (choice_type, *choice_constraints_key(...)) tuples; see
# ConjectureData._pooled_constraints for the sole writer/reader.
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)
159
160
class Span:
    """A span marks a hierarchically-structured region of the choice sequence
    within a single test run.

    Spans identify runs of choices that are logically related. For instance,
    Hypothesis tracks:
    - A single top-level span covering the entire choice sequence
    - A span for the choices made by each strategy
    - Additional spans defined by some strategies. For instance, st.lists()
      tracks the "should add another element" choice and each "add another
      element" choice as separate spans.

    Spans feed useful structure to the shrinker, the mutator, targeted PBT,
    and other Hypothesis subsystems.

    Rather than being a rich object, a ``Span`` is just an index into the
    ``Spans`` collection defined below, for two reasons. First, most span
    properties are never queried on most spans, so we usually avoid
    allocating storage for them at all. Second, storing span data as compact
    integer lists is far smaller than ordinary Python objects.

    The trade-off is extra allocation churn (the same Span and int objects
    get recreated repeatedly), which costs some speed in certain usage
    patterns, but the memory savings are usually dramatic and worth it.
    """

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Span):
            return NotImplemented
        return self is other or (
            (self.owner is other.owner) and (self.index == other.index)
        )

    def __ne__(self, other: object) -> bool:
        if not isinstance(other, Span):
            return NotImplemented
        if self is other:
            return False
        return (self.owner is not other.owner) or (self.index != other.index)

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """An opaque value tying this span to its approximate origin, such as
        a particular strategy class or kind of draw."""
        return self.owner.labels[self.owner.label_indices[self.index]]

    @property
    def parent(self) -> Optional[int]:
        """Index of the span this one is nested directly within, or ``None``
        for the top-level span."""
        return None if self.index == 0 else self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        # Choice-sequence offset at which this span begins.
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        # Choice-sequence offset at which this span ends.
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """Nesting depth of this span in the span tree; the top-level span
        has depth 0."""
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """True if this span's ``stop_span`` call had ``discard`` set to
        ``True``. That means we believe the shrinker can delete this span
        entirely without affecting the value produced by its enclosing
        strategy — typically because a rejection sampler rejected a generated
        value and retried."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """The number of choices this span covers."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """All spans whose parent is this span, in increasing index order."""
        return [self.owner[child] for child in self.owner.children[self.index]]
260
261
class SpanProperty:
    """Visitor over the recorded trail of span/choice events.

    Many per-span properties are computed by effectively replaying the test
    case from the events recorded in ``Spans.trail``. Subclasses override the
    ``start_span``/``stop_span`` hooks to accumulate whatever they need, and
    ``finish`` to produce the result.
    """

    def __init__(self, spans: "Spans"):
        self.span_stack: list[int] = []
        self.spans = spans
        self.span_count = 0
        self.choice_count = 0

    def run(self) -> Any:
        """Replay the recorded trail through this visitor and return the
        value of ``self.finish()``."""
        for record in self.spans.trail:
            if record > TrailType.CHOICE:
                # Everything above TrailType.CHOICE encodes the label index
                # of a span start.
                self.__open(record - TrailType.CHOICE - 1)
            elif record == TrailType.CHOICE:
                self.choice_count += 1
            else:
                self.__close(discarded=record == TrailType.STOP_SPAN_DISCARD)

        return self.finish()

    def __open(self, label_index: int) -> None:
        span_index = self.span_count
        assert span_index < len(self.spans)
        self.start_span(span_index, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(span_index)

    def __close(self, *, discarded: bool) -> None:
        self.stop_span(self.span_stack.pop(), discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Hook called at the start of each span, with ``i`` the index of
        the span and ``label_index`` the index of its label in
        ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Hook called at the end of each span, with ``i`` the index of the
        span and ``discarded`` True iff ``stop_span`` was called with
        ``discard=True``."""

    def finish(self) -> Any:
        raise NotImplementedError
316
317
class TrailType(IntEnum):
    """Event markers stored in ``SpanRecord.trail`` and replayed by
    ``SpanProperty.run``."""

    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    CHOICE = 3
    # every trail element larger than TrailType.CHOICE is the label of a span
    # start, offset by its index. So the first span label is stored as 4, the
    # second as 5, etc, regardless of its actual integer label.
325
326
class SpanRecord:
    """Compact event log of ``start_span``, ``stop_span``, and choice calls,
    stored so the structure of individual ``Span`` objects can be
    reconstructed later by ``Spans``.

    Note that there is significant overlap between this class and
    ``DataObserver``; the plan is to eventually unify them, but for now their
    functions and implementations differ slightly.
    """

    def __init__(self) -> None:
        self.labels: list[int] = []
        self.__index_of_labels: Optional[dict[int, int]] = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        # Drop the label lookup table; no further spans may be started.
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        index_of = self.__index_of_labels
        assert index_of is not None
        if label in index_of:
            i = index_of[label]
        else:
            # First time we've seen this label: intern it.
            i = index_of.setdefault(label, len(self.labels))
            self.labels.append(label)
        self.trail.append(TrailType.CHOICE + 1 + i)

    def stop_span(self, *, discard: bool) -> None:
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)
364
365
class _starts_and_ends(SpanProperty):
    """Computes the (start, end) choice offsets of every span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        n = len(self.spans)
        self.starts = IntList.of_length(n)
        self.ends = IntList.of_length(n)

    def start_span(self, i: int, label_index: int) -> None:
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return (self.starts, self.ends)
380
381
class _discarded(SpanProperty):
    """Collects the indices of all spans that were discarded."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.result.add(i)

    def finish(self) -> frozenset[int]:
        return frozenset(self.result)
393
394
class _parentage(SpanProperty):
    """Computes, for each span, the index of its enclosing (parent) span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        # Span 0 is the top level and has no parent; for every other span,
        # its parent is whatever span is still open when it closes.
        if i > 0:
            self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result
406
407
class _depths(SpanProperty):
    """Computes the nesting depth of every span (top-level span = 0)."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        # The stack holds exactly the currently-open enclosing spans.
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result
418
419
class _label_indices(SpanProperty):
    """Records, for each span, the index of its label in ``spans.labels``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result
430
431
class _mutator_groups(SpanProperty):
    """Groups span (start, end) regions by label, for the mutator to swap
    structurally-similar regions of the choice sequence."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        span = self.spans[i]
        self.groups[label_index].add((span.start, span.end))

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # A group needs at least two members before the mutator can swap
        # anything, so singleton groups are dropped.
        return [group for group in self.groups.values() if len(group) >= 2]
448
449
class Spans:
    """A lazy collection of ``Span`` objects, derived from the events
    recorded in a ``SpanRecord``.

    Logically behaves like a list of ``Span`` objects, but mostly exists as
    the compact backing storage those spans index into. The properties here
    are best understood as that backing storage and are documented on
    ``Span``.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # Exactly one stop event is recorded per span, discarded or not.
        self.__length = self.trail.count(
            TrailType.STOP_SPAN_DISCARD
        ) + self.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        self.__children: Optional[list[Sequence[int]]] = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        if self.__children is None:
            builders = [IntList() for _ in range(len(self))]
            for index, parent in enumerate(self.parentage):
                if index > 0:
                    builders[parent].append(index)
            # Childless spans share one empty tuple instead of keeping an
            # empty IntList each, to reduce memory usage.
            self.__children = [
                child if child else () for child in builders  # type: ignore
            ]
        return self.__children

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        n = self.__length
        if not (-n <= i < n):
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        return Span(self, i if i >= 0 else i + n)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        return (self[i] for i in range(len(self)))
532
533
class _Overrun:
    """Sentinel "result" for a data object that exhausted its choice budget;
    only ``status`` is available on it (see the note in
    ConjectureData.__init__ about as_result() being Overrun)."""

    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


# Module-level singleton instance of _Overrun.
Overrun = _Overrun()
542
543
class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache.

    All methods are intentionally no-ops here; subclasses override the ones
    they care about. Each ``draw_*`` hook mirrors the corresponding draw on
    the observed ``ConjectureData`` (see ``ConjectureData._draw``).
    """

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        """Called after an integer draw is observed."""
        pass

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        """Called after a float draw is observed."""
        pass

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        """Called after a string draw is observed."""
        pass

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        """Called after a bytes draw is observed."""
        pass

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        """Called after a boolean draw is observed."""
        pass
587
588
@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    # Final status of the run.
    status: Status = attr.ib()
    # Identifies the failure when status is INTERESTING; otherwise None.
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    # The recorded choice sequence; excluded from eq/repr for performance.
    nodes: tuple[ChoiceNode, ...] = attr.ib(eq=False, repr=False)
    # Total size of the choice sequence (as measured by choices_size).
    length: int = attr.ib()
    # Output accumulated during the run (starts as "" on ConjectureData).
    output: str = attr.ib()
    # Arbitrary extra state attached by strategies, if any was recorded.
    extra_information: Optional[ExtraInformation] = attr.ib()
    # NOTE(review): presumably the exception/traceback we expect to see again
    # when replaying this result — confirm against the engine.
    expected_exception: Optional[BaseException] = attr.ib()
    expected_traceback: Optional[str] = attr.ib()
    # True if any span was stopped with discard=True.
    has_discards: bool = attr.ib()
    # Label -> metric observations for targeted property-based testing.
    target_observations: TargetObservations = attr.ib()
    # Structural coverage tags describing which part of the search space
    # this example is in.
    tags: frozenset[StructuralCoverageTag] = attr.ib()
    # The span tree of this run; excluded from eq/repr for performance.
    spans: Spans = attr.ib(repr=False, eq=False)
    # Slice indices for discrete reportable parts that which-parts-matter can
    # try varying, plus comments describing those slices.
    arg_slices: set[tuple[int, int]] = attr.ib(repr=False)
    slice_comments: dict[tuple[int, int], str] = attr.ib(repr=False)
    # (index, choice_type, constraints, forced value) of the first misaligned
    # draw during prefix replay, if any (see MisalignedAt).
    misaligned_at: Optional[MisalignedAt] = attr.ib(repr=False)
    cannot_proceed_scope: Optional[CannotProceedScopeT] = attr.ib(repr=False)

    def as_result(self) -> "ConjectureResult":
        # Already a result; mirrors ConjectureData's conversion protocol.
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        """The raw choice values, without constraints/metadata."""
        return tuple(node.value for node in self.nodes)
618
619
620class ConjectureData:
621 @classmethod
622 def for_choices(
623 cls,
624 choices: Sequence[Union[ChoiceTemplate, ChoiceT]],
625 *,
626 observer: Optional[DataObserver] = None,
627 provider: Union[type, PrimitiveProvider] = HypothesisProvider,
628 random: Optional[Random] = None,
629 ) -> "ConjectureData":
630 from hypothesis.internal.conjecture.engine import choice_count
631
632 return cls(
633 max_choices=choice_count(choices),
634 random=random,
635 prefix=choices,
636 observer=observer,
637 provider=provider,
638 )
639
    def __init__(
        self,
        *,
        random: Optional[Random],
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        prefix: Optional[Sequence[Union[ChoiceTemplate, ChoiceT]]] = None,
        max_choices: Optional[int] = None,
        provider_kw: Optional[dict[str, Any]] = None,
    ) -> None:
        """Set up a fresh data object.

        ``random`` may be None when every draw is supplied by ``prefix``
        (see the note in ``_draw`` about for_choices data). ``provider`` may
        be a PrimitiveProvider class (instantiated here, with ``provider_kw``)
        or an already-constructed instance, in which case ``provider_kw`` is
        not allowed.
        """
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            # provider_kw only makes sense if we are the ones instantiating.
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.overdraw = 0
        self._random = random

        self.length = 0
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        # Unique, monotonically-increasing id for this data object, per thread.
        self.testcounter = threadlocal.global_test_counter
        threadlocal.global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, Union[str, int, float]] = {}
        self.interesting_origin: Optional[InterestingOrigin] = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        # Instantiate the provider here if we were given a class.
        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: Optional[ConjectureResult] = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Optional[Spans] = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )

        self._sampled_from_all_strategies_elements_message: Optional[
            tuple[str, object]
        ] = None
        self._shared_strategy_draws: dict[Hashable, tuple[int, Any]] = {}
        self._shared_data_strategy: Optional[DataObject] = None
        self._stateful_repr_parts: Optional[list[Any]] = None
        self.states_for_ids: Optional[dict[int, RandomState]] = None
        self.seeds_to_states: Optional[dict[Any, RandomState]] = None
        self.hypothesis_runner = not_set

        self.expected_exception: Optional[BaseException] = None
        self.expected_traceback: Optional[str] = None
        self.extra_information = ExtraInformation()

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        self.misaligned_at: Optional[MisalignedAt] = None
        self.cannot_proceed_scope: Optional[CannotProceedScopeT] = None
        # Open the implicit top-level span covering the whole choice sequence.
        self.start_span(TOP_LABEL)
739
740 def __repr__(self) -> str:
741 return "ConjectureData(%s, %d choices%s)" % (
742 self.status.name,
743 len(self.nodes),
744 ", frozen" if self.frozen else "",
745 )
746
747 @property
748 def choices(self) -> tuple[ChoiceT, ...]:
749 return tuple(node.value for node in self.nodes)
750
751 # draw_* functions might be called in one of two contexts: either "above" or
752 # "below" the choice sequence. For instance, draw_string calls draw_boolean
753 # from ``many`` when calculating the number of characters to return. We do
754 # not want these choices to get written to the choice sequence, because they
755 # are not true choices themselves.
756 #
757 # `observe` formalizes this. The choice will only be written to the choice
758 # sequence if observe is True.
759
    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: Optional[int],
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: Optional[float],
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: Optional[str],
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: Optional[bytes],
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: Optional[bool],
    ) -> bool: ...

    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        """Common implementation behind all public draw_* methods.

        The value comes from (in priority order) the replay prefix, the
        ``forced`` argument, or the provider. If ``observe`` is True the
        choice is recorded as a node in the choice sequence and reported to
        ``self.observer``; otherwise it leaves no trace (see the comment
        above about draws "below" the choice sequence).
        """
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one got used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                # Round-trip through bits to get a canonical-identity nan.
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value
885
886 def draw_integer(
887 self,
888 min_value: Optional[int] = None,
889 max_value: Optional[int] = None,
890 *,
891 weights: Optional[dict[int, float]] = None,
892 shrink_towards: int = 0,
893 forced: Optional[int] = None,
894 observe: bool = True,
895 ) -> int:
896 # Validate arguments
897 if weights is not None:
898 assert min_value is not None
899 assert max_value is not None
900 assert len(weights) <= 255 # arbitrary practical limit
901 # We can and should eventually support total weights. But this
902 # complicates shrinking as we can no longer assume we can force
903 # a value to the unmapped probability mass if that mass might be 0.
904 assert sum(weights.values()) < 1
905 # similarly, things get simpler if we assume every value is possible.
906 # we'll want to drop this restriction eventually.
907 assert all(w != 0 for w in weights.values())
908
909 if forced is not None and min_value is not None:
910 assert min_value <= forced
911 if forced is not None and max_value is not None:
912 assert forced <= max_value
913
914 constraints: IntegerConstraints = self._pooled_constraints(
915 "integer",
916 {
917 "min_value": min_value,
918 "max_value": max_value,
919 "weights": weights,
920 "shrink_towards": shrink_towards,
921 },
922 )
923 return self._draw("integer", constraints, observe=observe, forced=forced)
924
925 def draw_float(
926 self,
927 min_value: float = -math.inf,
928 max_value: float = math.inf,
929 *,
930 allow_nan: bool = True,
931 smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
932 # TODO: consider supporting these float widths at the choice sequence
933 # level in the future.
934 # width: Literal[16, 32, 64] = 64,
935 forced: Optional[float] = None,
936 observe: bool = True,
937 ) -> float:
938 assert smallest_nonzero_magnitude > 0
939 assert not math.isnan(min_value)
940 assert not math.isnan(max_value)
941
942 if smallest_nonzero_magnitude == 0.0: # pragma: no cover
943 raise FloatingPointError(
944 "Got allow_subnormal=True, but we can't represent subnormal floats "
945 "right now, in violation of the IEEE-754 floating-point "
946 "specification. This is usually because something was compiled with "
947 "-ffast-math or a similar option, which sets global processor state. "
948 "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
949 "writeup - and good luck!"
950 )
951
952 if forced is not None:
953 assert allow_nan or not math.isnan(forced)
954 assert math.isnan(forced) or (
955 sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
956 )
957
958 constraints: FloatConstraints = self._pooled_constraints(
959 "float",
960 {
961 "min_value": min_value,
962 "max_value": max_value,
963 "allow_nan": allow_nan,
964 "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
965 },
966 )
967 return self._draw("float", constraints, observe=observe, forced=forced)
968
969 def draw_string(
970 self,
971 intervals: IntervalSet,
972 *,
973 min_size: int = 0,
974 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
975 forced: Optional[str] = None,
976 observe: bool = True,
977 ) -> str:
978 assert forced is None or min_size <= len(forced) <= max_size
979 assert min_size >= 0
980 if len(intervals) == 0:
981 assert min_size == 0
982
983 constraints: StringConstraints = self._pooled_constraints(
984 "string",
985 {
986 "intervals": intervals,
987 "min_size": min_size,
988 "max_size": max_size,
989 },
990 )
991 return self._draw("string", constraints, observe=observe, forced=forced)
992
993 def draw_bytes(
994 self,
995 min_size: int = 0,
996 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
997 *,
998 forced: Optional[bytes] = None,
999 observe: bool = True,
1000 ) -> bytes:
1001 assert forced is None or min_size <= len(forced) <= max_size
1002 assert min_size >= 0
1003
1004 constraints: BytesConstraints = self._pooled_constraints(
1005 "bytes", {"min_size": min_size, "max_size": max_size}
1006 )
1007 return self._draw("bytes", constraints, observe=observe, forced=forced)
1008
1009 def draw_boolean(
1010 self,
1011 p: float = 0.5,
1012 *,
1013 forced: Optional[bool] = None,
1014 observe: bool = True,
1015 ) -> bool:
1016 assert (forced is not True) or p > 0
1017 assert (forced is not False) or p < 1
1018
1019 constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p})
1020 return self._draw("boolean", constraints, observe=observe, forced=forced)
1021
    @overload
    def _pooled_constraints(
        self, choice_type: Literal["integer"], constraints: IntegerConstraints
    ) -> IntegerConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["float"], constraints: FloatConstraints
    ) -> FloatConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["string"], constraints: StringConstraints
    ) -> StringConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["bytes"], constraints: BytesConstraints
    ) -> BytesConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["boolean"], constraints: BooleanConstraints
    ) -> BooleanConstraints: ...

    def _pooled_constraints(
        self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
    ) -> ChoiceConstraintsT:
        """Memoize common dictionary objects to reduce memory pressure.

        Returns a previously-pooled constraints dict equal to ``constraints``
        when one exists, otherwise pools and returns ``constraints`` itself.
        The overloads above preserve the per-choice-type constraint types for
        callers.
        """
        # caching runs afoul of nondeterminism checks
        if self.provider.avoid_realization:
            return constraints

        # choice_constraints_key produces a hashable, canonical form of the
        # constraints; prefix with the choice type so e.g. string and bytes
        # size constraints never collide.
        key = (choice_type, *choice_constraints_key(choice_type, constraints))
        try:
            # EAFP: hit the (module-level) pool first; presumably an LRU cache,
            # so misses below may evict older entries.
            return POOLED_CONSTRAINTS_CACHE[key]
        except KeyError:
            POOLED_CONSTRAINTS_CACHE[key] = constraints
            return constraints
1061
1062 def _pop_choice(
1063 self,
1064 choice_type: ChoiceTypeT,
1065 constraints: ChoiceConstraintsT,
1066 *,
1067 forced: Optional[ChoiceT],
1068 ) -> ChoiceT:
1069 assert self.prefix is not None
1070 # checked in _draw
1071 assert self.index < len(self.prefix)
1072
1073 value = self.prefix[self.index]
1074 if isinstance(value, ChoiceTemplate):
1075 node: ChoiceTemplate = value
1076 if node.count is not None:
1077 assert node.count >= 0
1078 # node templates have to be at the end for now, since it's not immediately
1079 # apparent how to handle overruning a node template while generating a single
1080 # node if the alternative is not "the entire data is an overrun".
1081 assert self.index == len(self.prefix) - 1
1082 if node.type == "simplest":
1083 if forced is not None:
1084 choice = forced
1085 try:
1086 choice = choice_from_index(0, choice_type, constraints)
1087 except ChoiceTooLarge:
1088 self.mark_overrun()
1089 else:
1090 raise NotImplementedError
1091
1092 if node.count is not None:
1093 node.count -= 1
1094 if node.count < 0:
1095 self.mark_overrun()
1096 return choice
1097
1098 choice = value
1099 node_choice_type = {
1100 str: "string",
1101 float: "float",
1102 int: "integer",
1103 bool: "boolean",
1104 bytes: "bytes",
1105 }[type(choice)]
1106 # If we're trying to:
1107 # * draw a different choice type at the same location
1108 # * draw the same choice type with a different constraints, which does not permit
1109 # the current value
1110 #
1111 # then we call this a misalignment, because the choice sequence has
1112 # changed from what we expected at some point. An easy misalignment is
1113 #
1114 # one_of(integers(0, 100), integers(101, 200))
1115 #
1116 # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
1117 # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
1118 # index 1 (which does not permit any of the values 0-100).
1119 #
1120 # When the choice sequence becomes misaligned, we generate a new value of the
1121 # type and constraints the strategy expects.
1122 if node_choice_type != choice_type or not choice_permitted(choice, constraints):
1123 # only track first misalignment for now.
1124 if self.misaligned_at is None:
1125 self.misaligned_at = (self.index, choice_type, constraints, forced)
1126 try:
1127 # Fill in any misalignments with index 0 choices. An alternative to
1128 # this is using the index of the misaligned choice instead
1129 # of index 0, which may be useful for maintaining
1130 # "similarly-complex choices" in the shrinker. This requires
1131 # attaching an index to every choice in ConjectureData.for_choices,
1132 # which we don't always have (e.g. when reading from db).
1133 #
1134 # If we really wanted this in the future we could make this complexity
1135 # optional, use it if present, and default to index 0 otherwise.
1136 # This complicates our internal api and so I'd like to avoid it
1137 # if possible.
1138 #
1139 # Additionally, I don't think slips which require
1140 # slipping to high-complexity values are common. Though arguably
1141 # we may want to expand a bit beyond *just* the simplest choice.
1142 # (we could for example consider sampling choices from index 0-10).
1143 choice = choice_from_index(0, choice_type, constraints)
1144 except ChoiceTooLarge:
1145 # should really never happen with a 0-index choice, but let's be safe.
1146 self.mark_overrun()
1147
1148 self.index += 1
1149 return choice
1150
1151 def as_result(self) -> Union[ConjectureResult, _Overrun]:
1152 """Convert the result of running this test into
1153 either an Overrun object or a ConjectureResult."""
1154
1155 assert self.frozen
1156 if self.status == Status.OVERRUN:
1157 return Overrun
1158 if self.__result is None:
1159 self.__result = ConjectureResult(
1160 status=self.status,
1161 interesting_origin=self.interesting_origin,
1162 spans=self.spans,
1163 nodes=self.nodes,
1164 length=self.length,
1165 output=self.output,
1166 expected_traceback=self.expected_traceback,
1167 expected_exception=self.expected_exception,
1168 extra_information=(
1169 self.extra_information
1170 if self.extra_information.has_information()
1171 else None
1172 ),
1173 has_discards=self.has_discards,
1174 target_observations=self.target_observations,
1175 tags=frozenset(self.tags),
1176 arg_slices=self.arg_slices,
1177 slice_comments=self.slice_comments,
1178 misaligned_at=self.misaligned_at,
1179 cannot_proceed_scope=self.cannot_proceed_scope,
1180 )
1181 assert self.__result is not None
1182 return self.__result
1183
1184 def __assert_not_frozen(self, name: str) -> None:
1185 if self.frozen:
1186 raise Frozen(f"Cannot call {name} on frozen ConjectureData")
1187
1188 def note(self, value: Any) -> None:
1189 self.__assert_not_frozen("note")
1190 if not isinstance(value, str):
1191 value = repr(value)
1192 self.output += value
1193
    def draw(
        self,
        strategy: "SearchStrategy[Ex]",
        label: Optional[int] = None,
        observe_as: Optional[str] = None,
    ) -> "Ex":
        """Draw a value from ``strategy``, wrapped in a span labelled ``label``.

        At the top level (depth 0) this also records per-draw timing (with GC
        time subtracted) under ``observe_as`` (or a generated key), annotates
        any raised exception with what was being generated, and records the
        drawn value for observability.
        """
        # Local imports to avoid import cycles with the strategies package.
        from hypothesis.internal.observability import observability_enabled
        from hypothesis.strategies._internal.lazy import unwrap_strategies
        from hypothesis.strategies._internal.utils import to_jsonable

        at_top_level = self.depth == 0
        start_time = None
        if at_top_level:
            # We start this timer early, because accessing attributes on a LazyStrategy
            # can be almost arbitrarily slow. In cases like characters() and text()
            # where we cache something expensive, this led to Flaky deadline errors!
            # See https://github.com/HypothesisWorks/hypothesis/issues/2108
            start_time = time.perf_counter()
            gc_start_time = gc_cumulative_time()

        strategy.validate()

        if strategy.is_empty:
            self.mark_invalid(f"empty strategy {self!r}")

        if self.depth >= MAX_DEPTH:
            self.mark_invalid("max depth exceeded")

        # Jump directly to the unwrapped strategy for the label and for do_draw.
        # This avoids adding an extra span to all lazy strategies.
        unwrapped = unwrap_strategies(strategy)
        if label is None:
            label = unwrapped.label
        assert isinstance(label, int)

        self.start_span(label=label)
        try:
            if not at_top_level:
                # Nested draws skip all the timing/observability bookkeeping below.
                return unwrapped.do_draw(self)
            assert start_time is not None
            key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
            try:
                try:
                    v = unwrapped.do_draw(self)
                finally:
                    # Subtract the time spent in GC to avoid overcounting, as it is
                    # accounted for at the overall example level.
                    in_gctime = gc_cumulative_time() - gc_start_time
                    self.draw_times[key] = time.perf_counter() - start_time - in_gctime
            except Exception as err:
                # Attach context about which strategy/argument was being drawn.
                add_note(
                    err,
                    f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
                )
                raise
            if observability_enabled():
                avoid = self.provider.avoid_realization
                self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
            return v
        finally:
            # Always close the span, even when the draw raised.
            self.stop_span()
1255
1256 def start_span(self, label: int) -> None:
1257 self.provider.span_start(label)
1258 self.__assert_not_frozen("start_span")
1259 self.depth += 1
1260 # Logically it would make sense for this to just be
1261 # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
1262 # be until we ran the code under tracemalloc and found a rather significant
1263 # chunk of allocation was happening here. This was presumably due to varargs
1264 # or the like, but we didn't investigate further given that it was easy
1265 # to fix with this check.
1266 if self.depth > self.max_depth:
1267 self.max_depth = self.depth
1268 self.__span_record.start_span(label)
1269 self.labels_for_structure_stack.append({label})
1270
1271 def stop_span(self, *, discard: bool = False) -> None:
1272 self.provider.span_end(discard)
1273 if self.frozen:
1274 return
1275 if discard:
1276 self.has_discards = True
1277 self.depth -= 1
1278 assert self.depth >= -1
1279 self.__span_record.stop_span(discard=discard)
1280
1281 labels_for_structure = self.labels_for_structure_stack.pop()
1282
1283 if not discard:
1284 if self.labels_for_structure_stack:
1285 self.labels_for_structure_stack[-1].update(labels_for_structure)
1286 else:
1287 self.tags.update([structural_coverage(l) for l in labels_for_structure])
1288
1289 if discard:
1290 # Once we've discarded a span, every test case starting with
1291 # this prefix contains discards. We prune the tree at that point so
1292 # as to avoid future test cases bothering with this region, on the
1293 # assumption that some span that you could have used instead
1294 # there would *not* trigger the discard. This greatly speeds up
1295 # test case generation in some cases, because it allows us to
1296 # ignore large swathes of the search space that are effectively
1297 # redundant.
1298 #
1299 # A scenario that can cause us problems but which we deliberately
1300 # have decided not to support is that if there are side effects
1301 # during data generation then you may end up with a scenario where
1302 # every good test case generates a discard because the discarded
1303 # section sets up important things for later. This is not terribly
1304 # likely and all that you see in this case is some degradation in
1305 # quality of testing, so we don't worry about it.
1306 #
1307 # Note that killing the branch does *not* mean we will never
1308 # explore below this point, and in particular we may do so during
1309 # shrinking. Any explicit request for a data object that starts
1310 # with the branch here will work just fine, but novel prefix
1311 # generation will avoid it, and we can use it to detect when we
1312 # have explored the entire tree (up to redundancy).
1313
1314 self.observer.kill_branch()
1315
1316 @property
1317 def spans(self) -> Spans:
1318 assert self.frozen
1319 if self.__spans is None:
1320 self.__spans = Spans(record=self.__span_record)
1321 return self.__spans
1322
1323 def freeze(self) -> None:
1324 if self.frozen:
1325 return
1326 self.finish_time = time.perf_counter()
1327 self.gc_finish_time = gc_cumulative_time()
1328
1329 # Always finish by closing all remaining spans so that we have a valid tree.
1330 while self.depth >= 0:
1331 self.stop_span()
1332
1333 self.__span_record.freeze()
1334 self.frozen = True
1335 self.observer.conclude_test(self.status, self.interesting_origin)
1336
1337 def choice(
1338 self,
1339 values: Sequence[T],
1340 *,
1341 forced: Optional[T] = None,
1342 observe: bool = True,
1343 ) -> T:
1344 forced_i = None if forced is None else values.index(forced)
1345 i = self.draw_integer(
1346 0,
1347 len(values) - 1,
1348 forced=forced_i,
1349 observe=observe,
1350 )
1351 return values[i]
1352
1353 def conclude_test(
1354 self,
1355 status: Status,
1356 interesting_origin: Optional[InterestingOrigin] = None,
1357 ) -> NoReturn:
1358 assert (interesting_origin is None) or (status == Status.INTERESTING)
1359 self.__assert_not_frozen("conclude_test")
1360 self.interesting_origin = interesting_origin
1361 self.status = status
1362 self.freeze()
1363 raise StopTest(self.testcounter)
1364
1365 def mark_interesting(self, interesting_origin: InterestingOrigin) -> NoReturn:
1366 self.conclude_test(Status.INTERESTING, interesting_origin)
1367
1368 def mark_invalid(self, why: Optional[str] = None) -> NoReturn:
1369 if why is not None:
1370 self.events["invalid because"] = why
1371 self.conclude_test(Status.INVALID)
1372
1373 def mark_overrun(self) -> NoReturn:
1374 self.conclude_test(Status.OVERRUN)
1375
1376
def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single standalone choice of ``choice_type`` under ``constraints``,
    using a fresh ConjectureData seeded from ``random``."""
    data = ConjectureData(random=random)
    draw_fn = getattr(data.provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_fn(**constraints))