1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import math
12import time
13from collections import defaultdict
14from collections.abc import Hashable, Iterable, Iterator, Sequence
15from enum import IntEnum
16from functools import cached_property
17from random import Random
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 Literal,
22 NoReturn,
23 Optional,
24 TypeVar,
25 Union,
26 cast,
27 overload,
28)
29
30import attr
31
32from hypothesis.errors import (
33 CannotProceedScopeT,
34 ChoiceTooLarge,
35 Frozen,
36 InvalidArgument,
37 StopTest,
38)
39from hypothesis.internal.cache import LRUCache
40from hypothesis.internal.compat import add_note
41from hypothesis.internal.conjecture.choice import (
42 BooleanConstraints,
43 BytesConstraints,
44 ChoiceConstraintsT,
45 ChoiceNode,
46 ChoiceT,
47 ChoiceTemplate,
48 ChoiceTypeT,
49 FloatConstraints,
50 IntegerConstraints,
51 StringConstraints,
52 choice_constraints_key,
53 choice_from_index,
54 choice_permitted,
55 choices_size,
56)
57from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time
58from hypothesis.internal.conjecture.providers import (
59 COLLECTION_DEFAULT_MAX_SIZE,
60 HypothesisProvider,
61 PrimitiveProvider,
62)
63from hypothesis.internal.conjecture.utils import calc_label_from_name
64from hypothesis.internal.escalation import InterestingOrigin
65from hypothesis.internal.floats import (
66 SMALLEST_SUBNORMAL,
67 float_to_int,
68 int_to_float,
69 sign_aware_lte,
70)
71from hypothesis.internal.intervalsets import IntervalSet
72from hypothesis.internal.observability import PredicateCounts
73from hypothesis.reporting import debug_report
74from hypothesis.utils.conventions import not_set
75from hypothesis.utils.threading import ThreadLocal
76
77if TYPE_CHECKING:
78 from typing import TypeAlias
79
80 from hypothesis.strategies import SearchStrategy
81 from hypothesis.strategies._internal.core import DataObject
82 from hypothesis.strategies._internal.random import RandomState
83 from hypothesis.strategies._internal.strategies import Ex
84
85
def __getattr__(name: str) -> Any:
    """Module-level ``__getattr__`` (PEP 562): serves the deprecated
    ``AVAILABLE_PROVIDERS`` alias, and raises for anything else."""
    if name != "AVAILABLE_PROVIDERS":
        raise AttributeError(
            f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}"
        )

    from hypothesis._settings import note_deprecation
    from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS

    note_deprecation(
        "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to "
        "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.",
        since="2025-01-25",
        has_codemod=False,
        stacklevel=1,
    )
    return AVAILABLE_PROVIDERS
103
104
T = TypeVar("T")
# Mapping of target name -> observed numeric value, for targeted search.
TargetObservations = dict[str, Union[int, float]]
# Where a replayed choice sequence diverged from the requested draw:
# (index, choice_type, constraints, forced value)
MisalignedAt: "TypeAlias" = tuple[
    int, ChoiceTypeT, ChoiceConstraintsT, Optional[ChoiceT]
]

# Label of the implicit top-level span wrapping every test case.
TOP_LABEL = calc_label_from_name("top")
MAX_DEPTH = 100

# Per-thread counter; each ConjectureData takes its testcounter from here.
threadlocal = ThreadLocal(global_test_counter=int)
116
117
class ExtraInformation:
    """Holds shared state on a ``ConjectureData`` that should be added
    to the final ``ConjectureResult``.

    Arbitrary attributes may be set on an instance; they live in the
    instance ``__dict__``.
    """

    def __repr__(self) -> str:
        contents = ", ".join(
            f"{key}={value!r}" for key, value in self.__dict__.items()
        )
        return f"ExtraInformation({contents})"

    def has_information(self) -> bool:
        # True once any attribute has been set on this instance.
        return len(self.__dict__) > 0
129
130
class Status(IntEnum):
    """Verdict for a test case. Values are ordered and must not change."""

    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return "Status." + self.name
139
140
@attr.s(slots=True, frozen=True)
class StructuralCoverageTag:
    """Hashable tag indicating something about which part of the search
    space a test case is in; interned via ``structural_coverage``."""

    label: int = attr.ib()
144
145
STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}


def structural_coverage(label: int) -> StructuralCoverageTag:
    """Return the interned ``StructuralCoverageTag`` for ``label``,
    creating and caching it on first use."""
    tag = STRUCTURAL_COVERAGE_CACHE.get(label)
    if tag is None:
        # setdefault keeps this safe under concurrent first use.
        tag = STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
    return tag
154
155
# Shared pool of constraints dicts (see _pooled_constraints). This cache can
# be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)
159
160
class Span:
    """One span in the hierarchical structure of a single test run.

    Spans mark regions of the choice sequence that are logically related.
    Hypothesis records, among others:

    - one top-level span covering the entire choice sequence,
    - one span for the choices made by each strategy,
    - extra spans defined by individual strategies — for example,
      ``st.lists()`` tracks its "should add another element" choice and each
      "add another element" draw as separate spans.

    This structural information feeds the shrinker, the mutator, targeted
    PBT, and other Hypothesis subsystems.

    A ``Span`` is deliberately not a rich object: it is just an index into
    the ``Spans`` collection defined below. That buys us two things. First,
    most properties of most spans are never inspected, so we usually
    allocate no storage for them at all. Second, compact integer lists are
    far smaller than ordinary Python objects.

    The trade-off is extra allocation (and hence some slowdown) in usage
    patterns that repeatedly materialise the same ``Span`` or int objects,
    but the memory savings are usually dramatic enough to be worth it.
    """

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if isinstance(other, Span):
            return (self.owner is other.owner) and (self.index == other.index)
        return NotImplemented

    def __ne__(self, other: object) -> bool:
        if self is other:
            return False
        if isinstance(other, Span):
            return (self.owner is not other.owner) or (self.index != other.index)
        return NotImplemented

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """An opaque value associating this span with its approximate
        origin, such as a particular strategy class or kind of draw."""
        owner = self.owner
        return owner.labels[owner.label_indices[self.index]]

    @property
    def parent(self) -> Optional[int]:
        """Index of the span this one is nested directly within, or
        ``None`` for the top-level span."""
        return None if self.index == 0 else self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        """Choice index at which this span begins."""
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        """Choice index at which this span ends (exclusive)."""
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """Depth of this span in the span tree; the top-level span is 0."""
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """True if this span's ``stop_span`` call had ``discard=True``,
        meaning we believe the shrinker can delete the span completely
        without affecting the enclosing strategy's value — typically set
        when a rejection sampler rejects a value and retries."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """How many choices this span covers."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """All spans with this one as parent, in increasing index order."""
        owner = self.owner
        return [owner[child] for child in owner.children[self.index]]
260
261
class SpanProperty:
    """Visitor used to derive per-span properties.

    Many properties of spans are computed by essentially rerunning the test
    case, replaying the calls recorded in the trail. Subclasses override
    ``start_span``/``stop_span``/``finish`` to accumulate whatever they
    need during that replay.
    """

    def __init__(self, spans: "Spans"):
        self.span_stack: list[int] = []
        self.spans = spans
        self.span_count = 0
        self.choice_count = 0

    def run(self) -> Any:
        """Replay the recorded trail through this visitor and return the
        result of ``self.finish()``."""
        for entry in self.spans.trail:
            if entry == TrailType.STOP_SPAN_DISCARD:
                self.__close(discarded=True)
            elif entry == TrailType.STOP_SPAN_NO_DISCARD:
                self.__close(discarded=False)
            elif entry == TrailType.CHOICE:
                self.choice_count += 1
            else:
                # everything above TrailType.CHOICE encodes a span start;
                # undo the offset to recover the label index.
                self.__open(entry - TrailType.CHOICE - 1)
        return self.finish()

    def __open(self, label_index: int) -> None:
        span_index = self.span_count
        assert span_index < len(self.spans)
        self.start_span(span_index, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(span_index)

    def __close(self, *, discarded: bool) -> None:
        self.stop_span(self.span_stack.pop(), discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Hook called at the start of span ``i``; ``label_index`` indexes
        its label within ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Hook called at the end of span ``i``; ``discarded`` mirrors the
        ``discard`` flag the span was stopped with."""

    def finish(self) -> Any:
        raise NotImplementedError
316
317
class TrailType(IntEnum):
    """Encoding of the entries stored in a ``SpanRecord`` trail."""

    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    CHOICE = 3
    # every trail element larger than TrailType.CHOICE is the label of a span
    # start, offset by its index. So the first span label is stored as 4, the
    # second as 5, etc, regardless of its actual integer label.
325
326
class SpanRecord:
    """Records the series of ``start_span``, ``stop_span``, and
    ``draw_bits`` calls so that these may be stored in ``Spans`` and
    replayed when we need to know about the structure of individual
    ``Span`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: list[int] = []
        self.__index_of_labels: Optional[dict[int, int]] = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        # drop the label lookup table; no further spans may start.
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        assert self.__index_of_labels is not None
        if label in self.__index_of_labels:
            label_index = self.__index_of_labels[label]
        else:
            # first occurrence of this label: intern it.
            label_index = len(self.labels)
            self.__index_of_labels[label] = label_index
            self.labels.append(label)
        self.trail.append(TrailType.CHOICE + 1 + label_index)

    def stop_span(self, *, discard: bool) -> None:
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)
364
365
class _starts_and_ends(SpanProperty):
    """Computes the start and end choice index of every span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        size = len(self.spans)
        self.starts = IntList.of_length(size)
        self.ends = IntList.of_length(size)

    def start_span(self, i: int, label_index: int) -> None:
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return self.starts, self.ends
380
381
class _discarded(SpanProperty):
    """Collects the indices of all spans stopped with ``discard=True``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.result.add(i)

    def finish(self) -> frozenset[int]:
        return frozenset(self.result)
393
394
class _parentage(SpanProperty):
    """Computes, for every span, the index of its enclosing span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        # span 0 is the root; it keeps the default entry.
        if i > 0:
            self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result
406
407
class _depths(SpanProperty):
    """Computes the tree depth of every span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        # the stack holds exactly the currently-open ancestors of span i.
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result
418
419
class _label_indices(SpanProperty):
    """Computes, for every span, the index of its label in ``labels``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result
430
431
class _mutator_groups(SpanProperty):
    """Groups span (start, end) regions by label, so the mutator can swap
    structurally-interchangeable regions of the choice sequence."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        span = self.spans[i]
        self.groups[label_index].add((span.start, span.end))

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # A group with a single span gives the mutator nothing to swap with.
        return [group for group in self.groups.values() if len(group) >= 2]
448
449
class Spans:
    """A lazy, list-like collection of ``Span`` objects, derived from the
    behaviour captured in a ``SpanRecord``.

    Logically this is a sequence of ``Span`` objects, but it mostly serves
    as compact backing storage for them to index into. All of the
    properties here are best understood as that backing storage for
    ``Span``, and are described there.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # one span per stop marker of either kind.
        discards = self.trail.count(TrailType.STOP_SPAN_DISCARD)
        keeps = record.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        self.__length = discards + keeps
        self.__children: Optional[list[Sequence[int]]] = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        starts, _ = self.starts_and_ends
        return starts

    @property
    def ends(self) -> IntList:
        _, ends = self.starts_and_ends
        return ends

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        if self.__children is None:
            result = [IntList() for _ in range(len(self))]
            for index, parent in enumerate(self.parentage):
                if index > 0:
                    result[parent].append(index)
            # Childless spans share an empty tuple rather than each keeping
            # an empty IntList alive, to reduce memory usage.
            for index, kids in enumerate(result):
                if not kids:
                    result[index] = ()  # type: ignore
            self.__children = result  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        n = self.__length
        if not (-n <= i < n):
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        return Span(self, i + n if i < 0 else i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        return (self[i] for i in range(len(self)))
532
533
class _Overrun:
    """Singleton sentinel standing in for a result when a test case drew
    more data than allowed (see ``as_result`` / ``Status.OVERRUN``)."""

    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


# The single shared instance; compared by identity elsewhere.
Overrun = _Overrun()
542
543
class DataObserver:
    """Observer interface for recording the behaviour of a
    ``ConjectureData`` object, primarily used for tracking the behaviour
    in the tree cache.

    Every callback is a no-op by default, so subclasses override only the
    hooks they care about.
    """

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        """Called for each observed integer draw."""

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        """Called for each observed float draw."""

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        """Called for each observed string draw."""

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        """Called for each observed bytes draw."""

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        """Called for each observed boolean draw."""
587
588
@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    # Final verdict of the test case.
    status: Status = attr.ib()
    # Set when status is INTERESTING; identifies the failure.
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    # The choice sequence. Excluded from eq/repr to keep both cheap.
    nodes: tuple[ChoiceNode, ...] = attr.ib(eq=False, repr=False)
    # Total size of the drawn choices (as accumulated by ConjectureData.length).
    length: int = attr.ib()
    output: str = attr.ib()
    extra_information: Optional[ExtraInformation] = attr.ib()
    expected_exception: Optional[BaseException] = attr.ib()
    expected_traceback: Optional[str] = attr.ib()
    # True if any span was stopped with discard=True.
    has_discards: bool = attr.ib()
    # Observations used for targeted search.
    target_observations: TargetObservations = attr.ib()
    tags: frozenset[StructuralCoverageTag] = attr.ib()
    spans: Spans = attr.ib(repr=False, eq=False)
    arg_slices: set[tuple[int, int]] = attr.ib(repr=False)
    slice_comments: dict[tuple[int, int], str] = attr.ib(repr=False)
    # Where (if anywhere) a replayed prefix diverged; see MisalignedAt.
    misaligned_at: Optional[MisalignedAt] = attr.ib(repr=False)
    cannot_proceed_scope: Optional[CannotProceedScopeT] = attr.ib(repr=False)

    def as_result(self) -> "ConjectureResult":
        # Mirrors ConjectureData.as_result(); a result converts to itself.
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        """The value of every choice node, in draw order."""
        return tuple(node.value for node in self.nodes)
618
619
620class ConjectureData:
621 @classmethod
622 def for_choices(
623 cls,
624 choices: Sequence[Union[ChoiceTemplate, ChoiceT]],
625 *,
626 observer: Optional[DataObserver] = None,
627 provider: Union[type, PrimitiveProvider] = HypothesisProvider,
628 random: Optional[Random] = None,
629 ) -> "ConjectureData":
630 from hypothesis.internal.conjecture.engine import choice_count
631
632 return cls(
633 max_choices=choice_count(choices),
634 random=random,
635 prefix=choices,
636 observer=observer,
637 provider=provider,
638 )
639
    def __init__(
        self,
        *,
        random: Optional[Random],
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        prefix: Optional[Sequence[Union[ChoiceTemplate, ChoiceT]]] = None,
        max_choices: Optional[int] = None,
        provider_kw: Optional[dict[str, Any]] = None,
    ) -> None:
        """Set up a fresh test case.

        Args:
            random: Stored as ``self._random``; may be ``None``, e.g. when
                replaying a fixed choice sequence via ``for_choices``.
            observer: Receives draw/conclusion callbacks; defaults to a
                no-op ``DataObserver``.
            provider: A ``PrimitiveProvider`` class (instantiated here with
                ``provider_kw``) or an already-constructed instance.
            prefix: Choices to replay before drawing fresh values.
            max_choices: Cap on the number of choice nodes before overrun.
            provider_kw: Keyword arguments for instantiating ``provider``;
                only valid when ``provider`` is a class.

        Raises:
            InvalidArgument: if ``provider_kw`` is passed alongside a
                provider *instance*.
        """
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            # provider_kw only makes sense if we are the ones instantiating.
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.is_find = False
        self.overdraw = 0
        self._random = random

        self.length = 0
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        # Per-thread unique id for this test case.
        self.testcounter = threadlocal.global_test_counter
        threadlocal.global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, Union[str, int, float]] = {}
        self.interesting_origin: Optional[InterestingOrigin] = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        # Accept either a provider class (instantiated against this data
        # with provider_kw) or a ready-made instance.
        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: Optional[ConjectureResult] = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Optional[Spans] = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )

        self._sampled_from_all_strategies_elements_message: Optional[
            tuple[str, object]
        ] = None
        self._shared_strategy_draws: dict[Hashable, tuple[int, Any]] = {}
        self._shared_data_strategy: Optional[DataObject] = None
        self._stateful_repr_parts: Optional[list[Any]] = None
        self.states_for_ids: Optional[dict[int, RandomState]] = None
        self.seeds_to_states: Optional[dict[Any, RandomState]] = None
        self.hypothesis_runner = not_set

        self.expected_exception: Optional[BaseException] = None
        self.expected_traceback: Optional[str] = None
        self.extra_information = ExtraInformation()

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        # Set when a replayed prefix didn't match a requested draw; see
        # MisalignedAt.
        self.misaligned_at: Optional[MisalignedAt] = None
        self.cannot_proceed_scope: Optional[CannotProceedScopeT] = None
        # Open the implicit top-level span covering the whole test case.
        self.start_span(TOP_LABEL)
740
741 def __repr__(self) -> str:
742 return "ConjectureData(%s, %d choices%s)" % (
743 self.status.name,
744 len(self.nodes),
745 ", frozen" if self.frozen else "",
746 )
747
748 @property
749 def choices(self) -> tuple[ChoiceT, ...]:
750 return tuple(node.value for node in self.nodes)
751
752 # draw_* functions might be called in one of two contexts: either "above" or
753 # "below" the choice sequence. For instance, draw_string calls draw_boolean
754 # from ``many`` when calculating the number of characters to return. We do
755 # not want these choices to get written to the choice sequence, because they
756 # are not true choices themselves.
757 #
758 # `observe` formalizes this. The choice will only be written to the choice
759 # sequence if observe is True.
760
    # These overloads give each choice_type literal its precise argument and
    # return types; the implementation follows.
    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: Optional[int],
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: Optional[float],
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: Optional[str],
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: Optional[bytes],
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: Optional[bool],
    ) -> bool: ...

    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        """Draw a single choice of the given type.

        ``forced`` always wins; otherwise the replay prefix is consumed
        while active, and failing that the provider generates a fresh
        value. When ``observe`` is True the choice is appended to the
        choice sequence as a ``ChoiceNode`` and reported to
        ``self.observer``; an unobserved draw leaves no trace.

        May call ``mark_overrun`` (raising ``StopTest``) when length or
        choice-count budgets are exhausted.
        """
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            # avoid_realization providers treat values as opaque, so we
            # can't measure their size.
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value
886
887 def draw_integer(
888 self,
889 min_value: Optional[int] = None,
890 max_value: Optional[int] = None,
891 *,
892 weights: Optional[dict[int, float]] = None,
893 shrink_towards: int = 0,
894 forced: Optional[int] = None,
895 observe: bool = True,
896 ) -> int:
897 # Validate arguments
898 if weights is not None:
899 assert min_value is not None
900 assert max_value is not None
901 assert len(weights) <= 255 # arbitrary practical limit
902 # We can and should eventually support total weights. But this
903 # complicates shrinking as we can no longer assume we can force
904 # a value to the unmapped probability mass if that mass might be 0.
905 assert sum(weights.values()) < 1
906 # similarly, things get simpler if we assume every value is possible.
907 # we'll want to drop this restriction eventually.
908 assert all(w != 0 for w in weights.values())
909
910 if forced is not None and min_value is not None:
911 assert min_value <= forced
912 if forced is not None and max_value is not None:
913 assert forced <= max_value
914
915 constraints: IntegerConstraints = self._pooled_constraints(
916 "integer",
917 {
918 "min_value": min_value,
919 "max_value": max_value,
920 "weights": weights,
921 "shrink_towards": shrink_towards,
922 },
923 )
924 return self._draw("integer", constraints, observe=observe, forced=forced)
925
926 def draw_float(
927 self,
928 min_value: float = -math.inf,
929 max_value: float = math.inf,
930 *,
931 allow_nan: bool = True,
932 smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
933 # TODO: consider supporting these float widths at the choice sequence
934 # level in the future.
935 # width: Literal[16, 32, 64] = 64,
936 forced: Optional[float] = None,
937 observe: bool = True,
938 ) -> float:
939 assert smallest_nonzero_magnitude > 0
940 assert not math.isnan(min_value)
941 assert not math.isnan(max_value)
942
943 if smallest_nonzero_magnitude == 0.0: # pragma: no cover
944 raise FloatingPointError(
945 "Got allow_subnormal=True, but we can't represent subnormal floats "
946 "right now, in violation of the IEEE-754 floating-point "
947 "specification. This is usually because something was compiled with "
948 "-ffast-math or a similar option, which sets global processor state. "
949 "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
950 "writeup - and good luck!"
951 )
952
953 if forced is not None:
954 assert allow_nan or not math.isnan(forced)
955 assert math.isnan(forced) or (
956 sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
957 )
958
959 constraints: FloatConstraints = self._pooled_constraints(
960 "float",
961 {
962 "min_value": min_value,
963 "max_value": max_value,
964 "allow_nan": allow_nan,
965 "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
966 },
967 )
968 return self._draw("float", constraints, observe=observe, forced=forced)
969
970 def draw_string(
971 self,
972 intervals: IntervalSet,
973 *,
974 min_size: int = 0,
975 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
976 forced: Optional[str] = None,
977 observe: bool = True,
978 ) -> str:
979 assert forced is None or min_size <= len(forced) <= max_size
980 assert min_size >= 0
981 if len(intervals) == 0:
982 assert min_size == 0
983
984 constraints: StringConstraints = self._pooled_constraints(
985 "string",
986 {
987 "intervals": intervals,
988 "min_size": min_size,
989 "max_size": max_size,
990 },
991 )
992 return self._draw("string", constraints, observe=observe, forced=forced)
993
994 def draw_bytes(
995 self,
996 min_size: int = 0,
997 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
998 *,
999 forced: Optional[bytes] = None,
1000 observe: bool = True,
1001 ) -> bytes:
1002 assert forced is None or min_size <= len(forced) <= max_size
1003 assert min_size >= 0
1004
1005 constraints: BytesConstraints = self._pooled_constraints(
1006 "bytes", {"min_size": min_size, "max_size": max_size}
1007 )
1008 return self._draw("bytes", constraints, observe=observe, forced=forced)
1009
1010 def draw_boolean(
1011 self,
1012 p: float = 0.5,
1013 *,
1014 forced: Optional[bool] = None,
1015 observe: bool = True,
1016 ) -> bool:
1017 assert (forced is not True) or p > 0
1018 assert (forced is not False) or p < 1
1019
1020 constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p})
1021 return self._draw("boolean", constraints, observe=observe, forced=forced)
1022
1023 @overload
1024 def _pooled_constraints(
1025 self, choice_type: Literal["integer"], constraints: IntegerConstraints
1026 ) -> IntegerConstraints: ...
1027
1028 @overload
1029 def _pooled_constraints(
1030 self, choice_type: Literal["float"], constraints: FloatConstraints
1031 ) -> FloatConstraints: ...
1032
1033 @overload
1034 def _pooled_constraints(
1035 self, choice_type: Literal["string"], constraints: StringConstraints
1036 ) -> StringConstraints: ...
1037
1038 @overload
1039 def _pooled_constraints(
1040 self, choice_type: Literal["bytes"], constraints: BytesConstraints
1041 ) -> BytesConstraints: ...
1042
1043 @overload
1044 def _pooled_constraints(
1045 self, choice_type: Literal["boolean"], constraints: BooleanConstraints
1046 ) -> BooleanConstraints: ...
1047
1048 def _pooled_constraints(
1049 self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
1050 ) -> ChoiceConstraintsT:
1051 """Memoize common dictionary objects to reduce memory pressure."""
1052 # caching runs afoul of nondeterminism checks
1053 if self.provider.avoid_realization:
1054 return constraints
1055
1056 key = (choice_type, *choice_constraints_key(choice_type, constraints))
1057 try:
1058 return POOLED_CONSTRAINTS_CACHE[key]
1059 except KeyError:
1060 POOLED_CONSTRAINTS_CACHE[key] = constraints
1061 return constraints
1062
1063 def _pop_choice(
1064 self,
1065 choice_type: ChoiceTypeT,
1066 constraints: ChoiceConstraintsT,
1067 *,
1068 forced: Optional[ChoiceT],
1069 ) -> ChoiceT:
1070 assert self.prefix is not None
1071 # checked in _draw
1072 assert self.index < len(self.prefix)
1073
1074 value = self.prefix[self.index]
1075 if isinstance(value, ChoiceTemplate):
1076 node: ChoiceTemplate = value
1077 if node.count is not None:
1078 assert node.count >= 0
1079 # node templates have to be at the end for now, since it's not immediately
1080 # apparent how to handle overruning a node template while generating a single
1081 # node if the alternative is not "the entire data is an overrun".
1082 assert self.index == len(self.prefix) - 1
1083 if node.type == "simplest":
1084 if forced is not None:
1085 choice = forced
1086 try:
1087 choice = choice_from_index(0, choice_type, constraints)
1088 except ChoiceTooLarge:
1089 self.mark_overrun()
1090 else:
1091 raise NotImplementedError
1092
1093 if node.count is not None:
1094 node.count -= 1
1095 if node.count < 0:
1096 self.mark_overrun()
1097 return choice
1098
1099 choice = value
1100 node_choice_type = {
1101 str: "string",
1102 float: "float",
1103 int: "integer",
1104 bool: "boolean",
1105 bytes: "bytes",
1106 }[type(choice)]
1107 # If we're trying to:
1108 # * draw a different choice type at the same location
1109 # * draw the same choice type with a different constraints, which does not permit
1110 # the current value
1111 #
1112 # then we call this a misalignment, because the choice sequence has
1113 # changed from what we expected at some point. An easy misalignment is
1114 #
1115 # one_of(integers(0, 100), integers(101, 200))
1116 #
1117 # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
1118 # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
1119 # index 1 (which does not permit any of the values 0-100).
1120 #
1121 # When the choice sequence becomes misaligned, we generate a new value of the
1122 # type and constraints the strategy expects.
1123 if node_choice_type != choice_type or not choice_permitted(choice, constraints):
1124 # only track first misalignment for now.
1125 if self.misaligned_at is None:
1126 self.misaligned_at = (self.index, choice_type, constraints, forced)
1127 try:
1128 # Fill in any misalignments with index 0 choices. An alternative to
1129 # this is using the index of the misaligned choice instead
1130 # of index 0, which may be useful for maintaining
1131 # "similarly-complex choices" in the shrinker. This requires
1132 # attaching an index to every choice in ConjectureData.for_choices,
1133 # which we don't always have (e.g. when reading from db).
1134 #
1135 # If we really wanted this in the future we could make this complexity
1136 # optional, use it if present, and default to index 0 otherwise.
1137 # This complicates our internal api and so I'd like to avoid it
1138 # if possible.
1139 #
1140 # Additionally, I don't think slips which require
1141 # slipping to high-complexity values are common. Though arguably
1142 # we may want to expand a bit beyond *just* the simplest choice.
1143 # (we could for example consider sampling choices from index 0-10).
1144 choice = choice_from_index(0, choice_type, constraints)
1145 except ChoiceTooLarge:
1146 # should really never happen with a 0-index choice, but let's be safe.
1147 self.mark_overrun()
1148
1149 self.index += 1
1150 return choice
1151
1152 def as_result(self) -> Union[ConjectureResult, _Overrun]:
1153 """Convert the result of running this test into
1154 either an Overrun object or a ConjectureResult."""
1155
1156 assert self.frozen
1157 if self.status == Status.OVERRUN:
1158 return Overrun
1159 if self.__result is None:
1160 self.__result = ConjectureResult(
1161 status=self.status,
1162 interesting_origin=self.interesting_origin,
1163 spans=self.spans,
1164 nodes=self.nodes,
1165 length=self.length,
1166 output=self.output,
1167 expected_traceback=self.expected_traceback,
1168 expected_exception=self.expected_exception,
1169 extra_information=(
1170 self.extra_information
1171 if self.extra_information.has_information()
1172 else None
1173 ),
1174 has_discards=self.has_discards,
1175 target_observations=self.target_observations,
1176 tags=frozenset(self.tags),
1177 arg_slices=self.arg_slices,
1178 slice_comments=self.slice_comments,
1179 misaligned_at=self.misaligned_at,
1180 cannot_proceed_scope=self.cannot_proceed_scope,
1181 )
1182 assert self.__result is not None
1183 return self.__result
1184
1185 def __assert_not_frozen(self, name: str) -> None:
1186 if self.frozen:
1187 raise Frozen(f"Cannot call {name} on frozen ConjectureData")
1188
1189 def note(self, value: Any) -> None:
1190 self.__assert_not_frozen("note")
1191 if not isinstance(value, str):
1192 value = repr(value)
1193 self.output += value
1194
    def draw(
        self,
        strategy: "SearchStrategy[Ex]",
        label: Optional[int] = None,
        observe_as: Optional[str] = None,
    ) -> "Ex":
        """Draw a value from ``strategy``, wrapping the draw in a span.

        Top-level draws (depth 0) additionally record per-argument timing
        (minus GC time) in ``self.draw_times`` and, when observability is
        enabled, a JSON-able representation of the drawn value.
        """
        # Imported locally to avoid import cycles at module load time.
        from hypothesis.internal.observability import observability_enabled
        from hypothesis.strategies._internal.lazy import unwrap_strategies
        from hypothesis.strategies._internal.utils import to_jsonable

        if self.is_find and not strategy.supports_find:
            raise InvalidArgument(
                f"Cannot use strategy {strategy!r} within a call to find "
                "(presumably because it would be invalid after the call had ended)."
            )

        at_top_level = self.depth == 0
        start_time = None
        if at_top_level:
            # We start this timer early, because accessing attributes on a LazyStrategy
            # can be almost arbitrarily slow. In cases like characters() and text()
            # where we cache something expensive, this led to Flaky deadline errors!
            # See https://github.com/HypothesisWorks/hypothesis/issues/2108
            start_time = time.perf_counter()
            gc_start_time = gc_cumulative_time()

        strategy.validate()

        if strategy.is_empty:
            # NOTE(review): this interpolates ``self`` rather than ``strategy``
            # into the message — confirm whether that is intentional.
            self.mark_invalid(f"empty strategy {self!r}")

        if self.depth >= MAX_DEPTH:
            self.mark_invalid("max depth exceeded")

        # Jump directly to the unwrapped strategy for the label and for do_draw.
        # This avoids adding an extra span to all lazy strategies.
        unwrapped = unwrap_strategies(strategy)
        if label is None:
            label = unwrapped.label
        assert isinstance(label, int)

        self.start_span(label=label)
        try:
            if not at_top_level:
                # Nested draws skip the timing/observability bookkeeping.
                return unwrapped.do_draw(self)
            assert start_time is not None
            key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
            try:
                try:
                    v = unwrapped.do_draw(self)
                finally:
                    # Subtract the time spent in GC to avoid overcounting, as it is
                    # accounted for at the overall example level.
                    in_gctime = gc_cumulative_time() - gc_start_time
                    self.draw_times[key] = time.perf_counter() - start_time - in_gctime
            except Exception as err:
                # Attach context so errors raised mid-generation identify
                # which argument and strategy were being drawn.
                add_note(
                    err,
                    f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
                )
                raise
            if observability_enabled():
                avoid = self.provider.avoid_realization
                self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
            return v
        finally:
            self.stop_span()
1262
    def start_span(self, label: int) -> None:
        """Open a new span with the given ``label``, notifying the provider
        and recording it for the span tree. Must be balanced by a later
        ``stop_span`` call."""
        # The provider is notified first, before the frozen check.
        self.provider.span_start(label)
        self.__assert_not_frozen("start_span")
        self.depth += 1
        # Logically it would make sense for this to just be
        # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
        # be until we ran the code under tracemalloc and found a rather significant
        # chunk of allocation was happening here. This was presumably due to varargs
        # or the like, but we didn't investigate further given that it was easy
        # to fix with this check.
        if self.depth > self.max_depth:
            self.max_depth = self.depth
        self.__span_record.start_span(label)
        # Track labels seen beneath this span for structural coverage tags.
        self.labels_for_structure_stack.append({label})
1277
    def stop_span(self, *, discard: bool = False) -> None:
        """Close the innermost open span.

        If ``discard`` is True, the span's data is marked as irrelevant to
        the final result, and the exploration tree is pruned below this
        prefix (see the long comment below).
        """
        # The provider is notified even when frozen; everything after the
        # frozen check is skipped (freeze() closes spans itself).
        self.provider.span_end(discard)
        if self.frozen:
            return
        if discard:
            self.has_discards = True
        self.depth -= 1
        assert self.depth >= -1
        self.__span_record.stop_span(discard=discard)

        labels_for_structure = self.labels_for_structure_stack.pop()

        if not discard:
            if self.labels_for_structure_stack:
                # Propagate this span's labels up to the enclosing span.
                self.labels_for_structure_stack[-1].update(labels_for_structure)
            else:
                # Top level: convert accumulated labels into coverage tags.
                self.tags.update([structural_coverage(l) for l in labels_for_structure])

        if discard:
            # Once we've discarded a span, every test case starting with
            # this prefix contains discards. We prune the tree at that point so
            # as to avoid future test cases bothering with this region, on the
            # assumption that some span that you could have used instead
            # there would *not* trigger the discard. This greatly speeds up
            # test case generation in some cases, because it allows us to
            # ignore large swathes of the search space that are effectively
            # redundant.
            #
            # A scenario that can cause us problems but which we deliberately
            # have decided not to support is that if there are side effects
            # during data generation then you may end up with a scenario where
            # every good test case generates a discard because the discarded
            # section sets up important things for later. This is not terribly
            # likely and all that you see in this case is some degradation in
            # quality of testing, so we don't worry about it.
            #
            # Note that killing the branch does *not* mean we will never
            # explore below this point, and in particular we may do so during
            # shrinking. Any explicit request for a data object that starts
            # with the branch here will work just fine, but novel prefix
            # generation will avoid it, and we can use it to detect when we
            # have explored the entire tree (up to redundancy).

            self.observer.kill_branch()
1322
1323 @property
1324 def spans(self) -> Spans:
1325 assert self.frozen
1326 if self.__spans is None:
1327 self.__spans = Spans(record=self.__span_record)
1328 return self.__spans
1329
    def freeze(self) -> None:
        """Finalize this test case: record finish times, close any open
        spans, mark the data immutable, and notify the observer.

        Idempotent — calling freeze on already-frozen data is a no-op.
        """
        if self.frozen:
            return
        self.finish_time = time.perf_counter()
        self.gc_finish_time = gc_cumulative_time()

        # Always finish by closing all remaining spans so that we have a valid tree.
        while self.depth >= 0:
            self.stop_span()

        self.__span_record.freeze()
        self.frozen = True
        self.observer.conclude_test(self.status, self.interesting_origin)
1343
1344 def choice(
1345 self,
1346 values: Sequence[T],
1347 *,
1348 forced: Optional[T] = None,
1349 observe: bool = True,
1350 ) -> T:
1351 forced_i = None if forced is None else values.index(forced)
1352 i = self.draw_integer(
1353 0,
1354 len(values) - 1,
1355 forced=forced_i,
1356 observe=observe,
1357 )
1358 return values[i]
1359
    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin] = None,
    ) -> NoReturn:
        """Freeze this data with the given ``status`` and raise StopTest to
        abort the test function.

        ``interesting_origin`` may only be supplied for INTERESTING results.
        """
        assert (interesting_origin is None) or (status == Status.INTERESTING)
        self.__assert_not_frozen("conclude_test")
        self.interesting_origin = interesting_origin
        self.status = status
        self.freeze()
        # StopTest carries the testcounter so the engine can match it to
        # this particular execution.
        raise StopTest(self.testcounter)
1371
    def mark_interesting(self, interesting_origin: InterestingOrigin) -> NoReturn:
        """Conclude this test case as INTERESTING (a failure) and stop
        executing by raising StopTest."""
        self.conclude_test(Status.INTERESTING, interesting_origin)
1374
    def mark_invalid(self, why: Optional[str] = None) -> NoReturn:
        """Conclude this test case as INVALID and stop executing by raising
        StopTest. If ``why`` is given, it is recorded as an event."""
        if why is not None:
            self.events["invalid because"] = why
        self.conclude_test(Status.INVALID)
1379
    def mark_overrun(self) -> NoReturn:
        """Conclude this test case as OVERRUN (ran out of choices/budget) and
        stop executing by raising StopTest."""
        self.conclude_test(Status.OVERRUN)
1382
1383
def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single standalone choice of ``choice_type`` under
    ``constraints``, using a throwaway ConjectureData seeded by ``random``."""
    provider = ConjectureData(random=random).provider
    draw_method = getattr(provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_method(**constraints))