1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import math
12import time
13from collections import defaultdict
14from collections.abc import Hashable, Iterable, Iterator, Sequence
15from enum import IntEnum
16from functools import cached_property
17from random import Random
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 Literal,
22 NoReturn,
23 Optional,
24 TypeVar,
25 Union,
26 cast,
27 overload,
28)
29
30import attr
31
32from hypothesis.errors import (
33 CannotProceedScopeT,
34 ChoiceTooLarge,
35 Frozen,
36 InvalidArgument,
37 StopTest,
38)
39from hypothesis.internal.cache import LRUCache
40from hypothesis.internal.compat import add_note
41from hypothesis.internal.conjecture.choice import (
42 BooleanConstraints,
43 BytesConstraints,
44 ChoiceConstraintsT,
45 ChoiceNode,
46 ChoiceT,
47 ChoiceTemplate,
48 ChoiceTypeT,
49 FloatConstraints,
50 IntegerConstraints,
51 StringConstraints,
52 choice_constraints_key,
53 choice_from_index,
54 choice_permitted,
55 choices_size,
56)
57from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time
58from hypothesis.internal.conjecture.providers import (
59 COLLECTION_DEFAULT_MAX_SIZE,
60 HypothesisProvider,
61 PrimitiveProvider,
62)
63from hypothesis.internal.conjecture.utils import calc_label_from_name
64from hypothesis.internal.escalation import InterestingOrigin
65from hypothesis.internal.floats import (
66 SMALLEST_SUBNORMAL,
67 float_to_int,
68 int_to_float,
69 sign_aware_lte,
70)
71from hypothesis.internal.intervalsets import IntervalSet
72from hypothesis.internal.observability import PredicateCounts
73from hypothesis.reporting import debug_report
74from hypothesis.utils.conventions import not_set
75from hypothesis.utils.threading import ThreadLocal
76
77if TYPE_CHECKING:
78 from typing import TypeAlias
79
80 from hypothesis.strategies import SearchStrategy
81 from hypothesis.strategies._internal.core import DataObject
82 from hypothesis.strategies._internal.random import RandomState
83 from hypothesis.strategies._internal.strategies import Ex
84
85
86def __getattr__(name: str) -> Any:
87 if name == "AVAILABLE_PROVIDERS":
88 from hypothesis._settings import note_deprecation
89 from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS
90
91 note_deprecation(
92 "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to "
93 "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.",
94 since="2025-01-25",
95 has_codemod=False,
96 stacklevel=1,
97 )
98 return AVAILABLE_PROVIDERS
99
100 raise AttributeError(
101 f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}"
102 )
103
104
# Generic type variable for helpers in this module.
T = TypeVar("T")
# Mapping of target label -> observed metric value, for targeted search
# (see ConjectureData.target_observations).
TargetObservations = dict[str, Union[int, float]]
# index, choice_type, constraints, forced value
MisalignedAt: "TypeAlias" = tuple[
    int, ChoiceTypeT, ChoiceConstraintsT, Optional[ChoiceT]
]

# Label for the implicit top-level span opened around every test case
# (see ConjectureData.__init__).
TOP_LABEL = calc_label_from_name("top")
MAX_DEPTH = 100

# Per-thread counter used to assign ConjectureData.testcounter values.
threadlocal = ThreadLocal(global_test_counter=int)
116
117
class Status(IntEnum):
    """Outcome of running a test case, ordered from worst to best."""

    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        # Qualified name reads better in debug output than IntEnum's default.
        return "Status." + self.name
126
127
@attr.s(slots=True, frozen=True)
class StructuralCoverageTag:
    """Hashable marker associating a test case with the span label ``label``;
    collected in ``ConjectureData.tags`` to guide generation."""

    label: int = attr.ib()


# Interning cache: equal labels share a single tag instance (see
# structural_coverage below).
STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}
134
135
def structural_coverage(label: int) -> StructuralCoverageTag:
    """Return the interned StructuralCoverageTag for ``label``.

    EAFP keeps the common cache-hit path to a single dict lookup, and
    ``setdefault`` (rather than plain assignment) guarantees that all callers
    observe the same instance even if two insertions race.
    """
    try:
        return STRUCTURAL_COVERAGE_CACHE[label]
    except KeyError:
        return STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
141
142
# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
# Keyed by (choice_type, *choice_constraints_key(...)); see
# ConjectureData._pooled_constraints.
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)
146
147
class Span:
    """A span tracks the hierarchical structure of choices within a single test run.

    Spans are created to mark regions of the choice sequence that are
    logically related to each other. For instance, Hypothesis tracks:
    - A single top-level span for the entire choice sequence
    - A span for the choices made by each strategy
    - Some strategies define additional spans within their choices. For instance,
      st.lists() tracks the "should add another element" choice and the "add
      another element" choices as separate spans.

    Spans provide useful information to the shrinker, mutator, targeted PBT,
    and other subsystems of Hypothesis.

    Rather than store each ``Span`` as a rich object, it is actually
    just an index into the ``Spans`` class defined below. This has two
    purposes: Firstly, for most properties of spans we will never need
    to allocate storage at all, because most properties are not used on
    most spans. Secondly, by storing the spans as compact lists
    of integers, we save a considerable amount of space compared to
    Python's normal object size.

    This does have the downside that it increases the amount of allocation
    we do, and slows things down as a result, in some usage patterns because
    we repeatedly allocate the same Span or int objects, but it will
    often dramatically reduce our memory usage, so is worth it.
    """

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        # Identity check first: it is cheap and handles comparing a span
        # against itself without touching attributes.
        if self is other:
            return True
        if not isinstance(other, Span):
            return NotImplemented
        # Spans are equal only within the same owning Spans collection.
        return (self.owner is other.owner) and (self.index == other.index)

    def __ne__(self, other: object) -> bool:
        # Mirrors __eq__ exactly (De Morgan), rather than relying on the
        # default negation of __eq__.
        if self is other:
            return False
        if not isinstance(other, Span):
            return NotImplemented
        return (self.owner is not other.owner) or (self.index != other.index)

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """A label is an opaque value that associates each span with its
        approximate origin, such as a particular strategy class or a particular
        kind of draw."""
        return self.owner.labels[self.owner.label_indices[self.index]]

    @property
    def parent(self) -> Optional[int]:
        """The index of the span that this one is nested directly within."""
        # Span 0 is the top-level span and has no parent.
        if self.index == 0:
            return None
        return self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        """Index of the first choice belonging to this span."""
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        """Index one past the last choice belonging to this span."""
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """
        Depth of this span in the span tree. The top-level span has a depth of 0.
        """
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """True if this span's ``stop_span`` call had ``discard`` set to
        ``True``. This means we believe that the shrinker should be able to delete
        this span completely, without affecting the value produced by its enclosing
        strategy. Typically set when a rejection sampler decides to reject a
        generated value and try again."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """The number of choices in this span."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """The list of all spans with this as a parent, in increasing index
        order."""
        return [self.owner[i] for i in self.owner.children[self.index]]
247
248
class SpanProperty:
    """There are many properties of spans that we calculate by
    essentially rerunning the test case multiple times based on the
    calls which we record in SpanProperty.

    This class defines a visitor, subclasses of which can be used
    to calculate these properties.
    """

    def __init__(self, spans: "Spans"):
        # Indices of the spans which are currently open, innermost last.
        self.span_stack: list[int] = []
        self.spans = spans
        # Number of spans started so far during the replay.
        self.span_count = 0
        # Number of choices seen so far during the replay.
        self.choice_count = 0

    def run(self) -> Any:
        """Rerun the test case with this visitor and return the
        results of ``self.finish()``."""
        # Decode the trail encoding written by SpanRecord; see TrailType.
        for record in self.spans.trail:
            if record == TrailType.STOP_SPAN_DISCARD:
                self.__pop(discarded=True)
            elif record == TrailType.STOP_SPAN_NO_DISCARD:
                self.__pop(discarded=False)
            elif record == TrailType.CHOICE:
                self.choice_count += 1
            else:
                # everything after TrailType.CHOICE is the label of a span start.
                self.__push(record - TrailType.CHOICE - 1)

        return self.finish()

    def __push(self, label_index: int) -> None:
        # Open span number ``i``, notifying the subclass first, then track it
        # on the stack so that __pop closes spans in LIFO order.
        i = self.span_count
        assert i < len(self.spans)
        self.start_span(i, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(i)

    def __pop(self, *, discarded: bool) -> None:
        # Close the most recently opened span.
        i = self.span_stack.pop()
        self.stop_span(i, discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Called at the start of each span, with ``i`` the
        index of the span and ``label_index`` the index of
        its label in ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Called at the end of each span, with ``i`` the
        index of the span and ``discarded`` being ``True`` if ``stop_span``
        was called with ``discard=True``."""

    def finish(self) -> Any:
        """Return the final result of the replay. Must be overridden."""
        raise NotImplementedError
303
304
class TrailType(IntEnum):
    """Integer encoding of span/choice events stored in ``SpanRecord.trail``."""

    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    CHOICE = 3
    # every trail element larger than TrailType.CHOICE is the label of a span
    # start, offset by its index. So the first span label is stored as 4, the
    # second as 5, etc, regardless of its actual integer label.
312
313
class SpanRecord:
    """Compact log of ``start_span`` / ``stop_span`` / choice events.

    The events are appended to an integer trail (see ``TrailType`` for the
    encoding) so that ``Spans`` can later replay them to reconstruct the
    structure of individual ``Span`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: list[int] = []
        self.__index_of_labels: Optional[dict[int, int]] = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        # No further labels may be interned once frozen; drop the lookup table.
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        table = self.__index_of_labels
        assert table is not None
        # Intern the label: the trail stores its (stable) index, not its value.
        if label in table:
            index = table[label]
        else:
            index = table[label] = len(self.labels)
            self.labels.append(label)
        self.trail.append(TrailType.CHOICE + 1 + index)

    def stop_span(self, *, discard: bool) -> None:
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)
351
352
class _starts_and_ends(SpanProperty):
    """Computes, for every span, the choice indices at which it starts and ends."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        n = len(self.spans)
        self.starts = IntList.of_length(n)
        self.ends = IntList.of_length(n)

    def start_span(self, i: int, label_index: int) -> None:
        # The span covers choices made from this point onwards...
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        # ...up to (but not including) the current choice count.
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return (self.starts, self.ends)
367
368
class _discarded(SpanProperty):
    """Collects the indices of all spans which were stopped with discard=True."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if not discarded:
            return
        self.result.add(i)

    def finish(self) -> frozenset[int]:
        # Frozen so the result can be cached and shared safely.
        return frozenset(self.result)
380
381
class _parentage(SpanProperty):
    """Computes, for every span, the index of its directly enclosing span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        # Span 0 is the top-level span and has no parent.
        if i == 0:
            return
        # Once this span is popped, its parent is whatever is on top of the
        # stack of still-open spans.
        self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result
393
394
class _depths(SpanProperty):
    """Computes the nesting depth of every span; the top-level span is depth 0."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        # Depth is exactly the number of spans already open when this starts.
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result
405
406
class _label_indices(SpanProperty):
    """Records, for every span, the index of its label in ``spans.labels``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result
417
418
class _mutator_groups(SpanProperty):
    """Groups spans with the same label into sets of (start, end) choice ranges,
    for use by the mutator."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        span = self.spans[i]
        self.groups[label_index].add((span.start, span.end))

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # A group containing a single span gives the mutator nothing to swap.
        return [group for group in self.groups.values() if len(group) >= 2]
435
436
class Spans:
    """A lazy collection of ``Span`` objects, derived from
    the record of recorded behaviour in ``SpanRecord``.

    Behaves logically as if it were a list of ``Span`` objects,
    but actually mostly exists as a compact store of information
    for them to reference into. All properties on here are best
    understood as the backing storage for ``Span`` and are
    described there.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # Each span is closed by exactly one stop marker, so the number of
        # spans is the number of stop markers in the trail. (record.trail
        # and self.trail are the same object here.)
        self.__length = self.trail.count(
            TrailType.STOP_SPAN_DISCARD
        ) + record.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        self.__children: Optional[list[Sequence[int]]] = None

    # Each of the following is computed on first use by replaying the trail
    # with the corresponding SpanProperty visitor, then cached.

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        # Lazily invert the parentage mapping into per-span child lists.
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for i, p in enumerate(self.parentage):
                if i > 0:
                    children[p].append(i)
            # Replace empty children lists with a tuple to reduce
            # memory usage.
            for i, c in enumerate(children):
                if not c:
                    children[i] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        """Return the ``i``-th span, supporting negative indices."""
        n = self.__length
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        if i < 0:
            i += n
        return Span(self, i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        for i in range(len(self)):
            yield self[i]
519
520
class _Overrun:
    """Sentinel result for test cases which drew more data than allowed;
    stands in for a result object (compare ``ConjectureData.as_result``)."""

    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


# Module-level singleton: every overrun result is represented by this instance.
Overrun = _Overrun()
529
530
class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache.

    The base implementation ignores every event; subclasses override
    whichever hooks they care about.
    """

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        """Called when an integer choice is drawn with ``observe=True``."""
        pass

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        """Called when a float choice is drawn with ``observe=True``."""
        pass

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        """Called when a string choice is drawn with ``observe=True``."""
        pass

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        """Called when a bytes choice is drawn with ``observe=True``."""
        pass

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        """Called when a boolean choice is drawn with ``observe=True``."""
        pass
574
575
@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    status: Status = attr.ib()
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    # eq=False: equality of results is defined by the remaining fields; the
    # raw values are still accessible via the .choices property below.
    nodes: tuple[ChoiceNode, ...] = attr.ib(eq=False, repr=False)
    length: int = attr.ib()
    output: str = attr.ib()
    expected_exception: Optional[BaseException] = attr.ib()
    expected_traceback: Optional[str] = attr.ib()
    has_discards: bool = attr.ib()
    target_observations: TargetObservations = attr.ib()
    tags: frozenset[StructuralCoverageTag] = attr.ib()
    spans: Spans = attr.ib(repr=False, eq=False)
    arg_slices: set[tuple[int, int]] = attr.ib(repr=False)
    slice_comments: dict[tuple[int, int], str] = attr.ib(repr=False)
    # See the MisalignedAt alias near the top of this module for the layout.
    misaligned_at: Optional[MisalignedAt] = attr.ib(repr=False)
    cannot_proceed_scope: Optional[CannotProceedScopeT] = attr.ib(repr=False)

    def as_result(self) -> "ConjectureResult":
        # Already a result; mirrors ConjectureData.as_result for uniform use.
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        """The raw choice values, in order, without constraints metadata."""
        return tuple(node.value for node in self.nodes)
604
605
606class ConjectureData:
    @classmethod
    def for_choices(
        cls,
        choices: Sequence[Union[ChoiceTemplate, ChoiceT]],
        *,
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        random: Optional[Random] = None,
    ) -> "ConjectureData":
        """Return a ConjectureData which replays ``choices`` as its prefix.

        ``max_choices`` is set to the number of choices being replayed, so
        drawing past the end of the sequence marks the data as overrun.
        """
        from hypothesis.internal.conjecture.engine import choice_count

        return cls(
            max_choices=choice_count(choices),
            random=random,
            prefix=choices,
            observer=observer,
            provider=provider,
        )
625
    def __init__(
        self,
        *,
        random: Optional[Random],
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        prefix: Optional[Sequence[Union[ChoiceTemplate, ChoiceT]]] = None,
        max_choices: Optional[int] = None,
        provider_kw: Optional[dict[str, Any]] = None,
    ) -> None:
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            # provider_kw are constructor arguments, so they only make sense
            # when we are the ones instantiating the provider.
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.overdraw = 0
        self._random = random

        # Total size of the choices drawn so far, as measured by choices_size.
        self.length = 0
        # Current position in the choice sequence; compared against
        # len(self.prefix) while replaying.
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        # Per-thread, monotonically increasing id for this test case.
        self.testcounter = threadlocal.global_test_counter
        threadlocal.global_test_counter += 1
        self.start_time = time.perf_counter()
        # Snapshot of cumulative GC time at the start of this test case.
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, Union[str, int, float]] = {}
        self.interesting_origin: Optional[InterestingOrigin] = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        # If given a class, instantiate it against ourselves; otherwise the
        # caller supplied a ready-made provider instance (in which case
        # provider_kw was rejected above).
        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: Optional[ConjectureResult] = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Optional[Spans] = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )

        self._sampled_from_all_strategies_elements_message: Optional[
            tuple[str, object]
        ] = None
        self._shared_strategy_draws: dict[Hashable, tuple[Any, "SearchStrategy"]] = {}
        self._shared_data_strategy: Optional[DataObject] = None
        self._stateful_repr_parts: Optional[list[Any]] = None
        self.states_for_ids: Optional[dict[int, RandomState]] = None
        self.seeds_to_states: Optional[dict[Any, RandomState]] = None
        self.hypothesis_runner: Any = not_set

        self.expected_exception: Optional[BaseException] = None
        self.expected_traceback: Optional[str] = None

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        self.misaligned_at: Optional[MisalignedAt] = None
        self.cannot_proceed_scope: Optional[CannotProceedScopeT] = None
        # Open the implicit top-level span covering the whole test case.
        self.start_span(TOP_LABEL)
724
725 def __repr__(self) -> str:
726 return "ConjectureData(%s, %d choices%s)" % (
727 self.status.name,
728 len(self.nodes),
729 ", frozen" if self.frozen else "",
730 )
731
732 @property
733 def choices(self) -> tuple[ChoiceT, ...]:
734 return tuple(node.value for node in self.nodes)
735
736 # draw_* functions might be called in one of two contexts: either "above" or
737 # "below" the choice sequence. For instance, draw_string calls draw_boolean
738 # from ``many`` when calculating the number of characters to return. We do
739 # not want these choices to get written to the choice sequence, because they
740 # are not true choices themselves.
741 #
742 # `observe` formalizes this. The choice will only be written to the choice
743 # sequence if observe is True.
744
    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: Optional[int],
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: Optional[float],
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: Optional[str],
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: Optional[bytes],
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: Optional[bool],
    ) -> bool: ...

    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        """Draw a single choice of ``choice_type`` under ``constraints``.

        If ``forced`` is not None it is always the resulting value; otherwise
        the value comes from the replay prefix when one is active, and from
        the provider if not. When ``observe`` is True the choice is reported
        to ``self.observer`` and appended to ``self.nodes``.
        """
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        # Note: when forced is given and we are not replaying a prefix, no
        # provider call is made at all; the forced value is used directly.
        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            # Providers which avoid realization hand back symbolic values, so
            # we cannot (and do not) measure their size.
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value
870
    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        weights: Optional[dict[int, float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        observe: bool = True,
    ) -> int:
        """Draw an integer, optionally bounded by ``min_value``/``max_value``.

        ``weights`` maps individual values to their draw probability, and
        requires both bounds. A ``forced`` value becomes the drawn value and
        must respect the bounds; ``observe=False`` keeps the draw out of the
        choice sequence.
        """
        # Validate arguments
        if weights is not None:
            assert min_value is not None
            assert max_value is not None
            assert len(weights) <= 255  # arbitrary practical limit
            # We can and should eventually support total weights. But this
            # complicates shrinking as we can no longer assume we can force
            # a value to the unmapped probability mass if that mass might be 0.
            assert sum(weights.values()) < 1
            # similarly, things get simpler if we assume every value is possible.
            # we'll want to drop this restriction eventually.
            assert all(w != 0 for w in weights.values())

        if forced is not None and min_value is not None:
            assert min_value <= forced
        if forced is not None and max_value is not None:
            assert forced <= max_value

        constraints: IntegerConstraints = self._pooled_constraints(
            "integer",
            {
                "min_value": min_value,
                "max_value": max_value,
                "weights": weights,
                "shrink_towards": shrink_towards,
            },
        )
        return self._draw("integer", constraints, observe=observe, forced=forced)
909
    def draw_float(
        self,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        *,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
        # TODO: consider supporting these float widths at the choice sequence
        # level in the future.
        # width: Literal[16, 32, 64] = 64,
        forced: Optional[float] = None,
        observe: bool = True,
    ) -> float:
        """Draw a float between ``min_value`` and ``max_value`` (sign-aware).

        nan is permitted only when ``allow_nan`` is True, and
        ``smallest_nonzero_magnitude`` bounds how close to zero a nonzero
        result may be. A ``forced`` value must itself satisfy these
        constraints.
        """
        assert smallest_nonzero_magnitude > 0
        assert not math.isnan(min_value)
        assert not math.isnan(max_value)

        if smallest_nonzero_magnitude == 0.0:  # pragma: no cover
            raise FloatingPointError(
                "Got allow_subnormal=True, but we can't represent subnormal floats "
                "right now, in violation of the IEEE-754 floating-point "
                "specification. This is usually because something was compiled with "
                "-ffast-math or a similar option, which sets global processor state. "
                "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
                "writeup - and good luck!"
            )

        if forced is not None:
            assert allow_nan or not math.isnan(forced)
            # nan compares unequal to everything, so only bounds-check non-nan.
            assert math.isnan(forced) or (
                sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
            )

        constraints: FloatConstraints = self._pooled_constraints(
            "float",
            {
                "min_value": min_value,
                "max_value": max_value,
                "allow_nan": allow_nan,
                "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
            },
        )
        return self._draw("float", constraints, observe=observe, forced=forced)
953
954 def draw_string(
955 self,
956 intervals: IntervalSet,
957 *,
958 min_size: int = 0,
959 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
960 forced: Optional[str] = None,
961 observe: bool = True,
962 ) -> str:
963 assert forced is None or min_size <= len(forced) <= max_size
964 assert min_size >= 0
965 if len(intervals) == 0:
966 assert min_size == 0
967
968 constraints: StringConstraints = self._pooled_constraints(
969 "string",
970 {
971 "intervals": intervals,
972 "min_size": min_size,
973 "max_size": max_size,
974 },
975 )
976 return self._draw("string", constraints, observe=observe, forced=forced)
977
978 def draw_bytes(
979 self,
980 min_size: int = 0,
981 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
982 *,
983 forced: Optional[bytes] = None,
984 observe: bool = True,
985 ) -> bytes:
986 assert forced is None or min_size <= len(forced) <= max_size
987 assert min_size >= 0
988
989 constraints: BytesConstraints = self._pooled_constraints(
990 "bytes", {"min_size": min_size, "max_size": max_size}
991 )
992 return self._draw("bytes", constraints, observe=observe, forced=forced)
993
994 def draw_boolean(
995 self,
996 p: float = 0.5,
997 *,
998 forced: Optional[bool] = None,
999 observe: bool = True,
1000 ) -> bool:
1001 assert (forced is not True) or p > 0
1002 assert (forced is not False) or p < 1
1003
1004 constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p})
1005 return self._draw("boolean", constraints, observe=observe, forced=forced)
1006
1007 @overload
1008 def _pooled_constraints(
1009 self, choice_type: Literal["integer"], constraints: IntegerConstraints
1010 ) -> IntegerConstraints: ...
1011
1012 @overload
1013 def _pooled_constraints(
1014 self, choice_type: Literal["float"], constraints: FloatConstraints
1015 ) -> FloatConstraints: ...
1016
1017 @overload
1018 def _pooled_constraints(
1019 self, choice_type: Literal["string"], constraints: StringConstraints
1020 ) -> StringConstraints: ...
1021
1022 @overload
1023 def _pooled_constraints(
1024 self, choice_type: Literal["bytes"], constraints: BytesConstraints
1025 ) -> BytesConstraints: ...
1026
1027 @overload
1028 def _pooled_constraints(
1029 self, choice_type: Literal["boolean"], constraints: BooleanConstraints
1030 ) -> BooleanConstraints: ...
1031
1032 def _pooled_constraints(
1033 self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
1034 ) -> ChoiceConstraintsT:
1035 """Memoize common dictionary objects to reduce memory pressure."""
1036 # caching runs afoul of nondeterminism checks
1037 if self.provider.avoid_realization:
1038 return constraints
1039
1040 key = (choice_type, *choice_constraints_key(choice_type, constraints))
1041 try:
1042 return POOLED_CONSTRAINTS_CACHE[key]
1043 except KeyError:
1044 POOLED_CONSTRAINTS_CACHE[key] = constraints
1045 return constraints
1046
1047 def _pop_choice(
1048 self,
1049 choice_type: ChoiceTypeT,
1050 constraints: ChoiceConstraintsT,
1051 *,
1052 forced: Optional[ChoiceT],
1053 ) -> ChoiceT:
1054 assert self.prefix is not None
1055 # checked in _draw
1056 assert self.index < len(self.prefix)
1057
1058 value = self.prefix[self.index]
1059 if isinstance(value, ChoiceTemplate):
1060 node: ChoiceTemplate = value
1061 if node.count is not None:
1062 assert node.count >= 0
1063 # node templates have to be at the end for now, since it's not immediately
1064 # apparent how to handle overruning a node template while generating a single
1065 # node if the alternative is not "the entire data is an overrun".
1066 assert self.index == len(self.prefix) - 1
1067 if node.type == "simplest":
1068 if forced is not None:
1069 choice = forced
1070 try:
1071 choice = choice_from_index(0, choice_type, constraints)
1072 except ChoiceTooLarge:
1073 self.mark_overrun()
1074 else:
1075 raise NotImplementedError
1076
1077 if node.count is not None:
1078 node.count -= 1
1079 if node.count < 0:
1080 self.mark_overrun()
1081 return choice
1082
1083 choice = value
1084 node_choice_type = {
1085 str: "string",
1086 float: "float",
1087 int: "integer",
1088 bool: "boolean",
1089 bytes: "bytes",
1090 }[type(choice)]
1091 # If we're trying to:
1092 # * draw a different choice type at the same location
1093 # * draw the same choice type with a different constraints, which does not permit
1094 # the current value
1095 #
1096 # then we call this a misalignment, because the choice sequence has
1097 # changed from what we expected at some point. An easy misalignment is
1098 #
1099 # one_of(integers(0, 100), integers(101, 200))
1100 #
1101 # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
1102 # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
1103 # index 1 (which does not permit any of the values 0-100).
1104 #
1105 # When the choice sequence becomes misaligned, we generate a new value of the
1106 # type and constraints the strategy expects.
1107 if node_choice_type != choice_type or not choice_permitted(choice, constraints):
1108 # only track first misalignment for now.
1109 if self.misaligned_at is None:
1110 self.misaligned_at = (self.index, choice_type, constraints, forced)
1111 try:
1112 # Fill in any misalignments with index 0 choices. An alternative to
1113 # this is using the index of the misaligned choice instead
1114 # of index 0, which may be useful for maintaining
1115 # "similarly-complex choices" in the shrinker. This requires
1116 # attaching an index to every choice in ConjectureData.for_choices,
1117 # which we don't always have (e.g. when reading from db).
1118 #
1119 # If we really wanted this in the future we could make this complexity
1120 # optional, use it if present, and default to index 0 otherwise.
1121 # This complicates our internal api and so I'd like to avoid it
1122 # if possible.
1123 #
1124 # Additionally, I don't think slips which require
1125 # slipping to high-complexity values are common. Though arguably
1126 # we may want to expand a bit beyond *just* the simplest choice.
1127 # (we could for example consider sampling choices from index 0-10).
1128 choice = choice_from_index(0, choice_type, constraints)
1129 except ChoiceTooLarge:
1130 # should really never happen with a 0-index choice, but let's be safe.
1131 self.mark_overrun()
1132
1133 self.index += 1
1134 return choice
1135
    def as_result(self) -> Union[ConjectureResult, _Overrun]:
        """Convert the result of running this test into
        either an Overrun object or a ConjectureResult.

        Must only be called after :meth:`freeze`. The ConjectureResult is
        built lazily and cached, so repeated calls return the same object.
        """

        assert self.frozen
        if self.status == Status.OVERRUN:
            # Overruns carry no useful payload, so a shared singleton suffices.
            return Overrun
        if self.__result is None:
            self.__result = ConjectureResult(
                status=self.status,
                interesting_origin=self.interesting_origin,
                spans=self.spans,
                nodes=self.nodes,
                length=self.length,
                output=self.output,
                expected_traceback=self.expected_traceback,
                expected_exception=self.expected_exception,
                has_discards=self.has_discards,
                target_observations=self.target_observations,
                tags=frozenset(self.tags),
                arg_slices=self.arg_slices,
                slice_comments=self.slice_comments,
                misaligned_at=self.misaligned_at,
                cannot_proceed_scope=self.cannot_proceed_scope,
            )
        assert self.__result is not None
        return self.__result
1163
1164 def __assert_not_frozen(self, name: str) -> None:
1165 if self.frozen:
1166 raise Frozen(f"Cannot call {name} on frozen ConjectureData")
1167
1168 def note(self, value: Any) -> None:
1169 self.__assert_not_frozen("note")
1170 if not isinstance(value, str):
1171 value = repr(value)
1172 self.output += value
1173
    def draw(
        self,
        strategy: "SearchStrategy[Ex]",
        label: Optional[int] = None,
        observe_as: Optional[str] = None,
    ) -> "Ex":
        """Draw a value from ``strategy``, recording a span for the draw.

        At top level (depth 0) this also records per-strategy draw timings
        (excluding GC time) and, when observability is enabled, the drawn
        value's JSON-able form.
        """
        # Imported locally to avoid import cycles with the strategies package.
        from hypothesis.internal.observability import observability_enabled
        from hypothesis.strategies._internal.lazy import unwrap_strategies
        from hypothesis.strategies._internal.utils import to_jsonable

        at_top_level = self.depth == 0
        start_time = None
        if at_top_level:
            # We start this timer early, because accessing attributes on a LazyStrategy
            # can be almost arbitrarily slow. In cases like characters() and text()
            # where we cache something expensive, this led to Flaky deadline errors!
            # See https://github.com/HypothesisWorks/hypothesis/issues/2108
            start_time = time.perf_counter()
            gc_start_time = gc_cumulative_time()

        strategy.validate()

        if strategy.is_empty:
            self.mark_invalid(f"empty strategy {self!r}")

        if self.depth >= MAX_DEPTH:
            self.mark_invalid("max depth exceeded")

        # Jump directly to the unwrapped strategy for the label and for do_draw.
        # This avoids adding an extra span to all lazy strategies.
        unwrapped = unwrap_strategies(strategy)
        if label is None:
            label = unwrapped.label
        assert isinstance(label, int)

        self.start_span(label=label)
        try:
            if not at_top_level:
                # Nested draws skip all of the timing/observability machinery.
                return unwrapped.do_draw(self)
            assert start_time is not None
            key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
            try:
                try:
                    v = unwrapped.do_draw(self)
                finally:
                    # Subtract the time spent in GC to avoid overcounting, as it is
                    # accounted for at the overall example level.
                    in_gctime = gc_cumulative_time() - gc_start_time
                    self.draw_times[key] = time.perf_counter() - start_time - in_gctime
            except Exception as err:
                # Annotate the exception with which strategy was being drawn
                # from, to aid debugging of failures during generation.
                add_note(
                    err,
                    f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
                )
                raise
            if observability_enabled():
                avoid = self.provider.avoid_realization
                self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
            return v
        finally:
            self.stop_span()
1235
    def start_span(self, label: int) -> None:
        """Open a new span labelled ``label``, increasing the current depth.

        Every ``start_span`` must be balanced by a matching ``stop_span``;
        ``freeze`` closes any spans still open.
        """
        self.provider.span_start(label)
        self.__assert_not_frozen("start_span")
        self.depth += 1
        # Logically it would make sense for this to just be
        # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
        # be until we ran the code under tracemalloc and found a rather significant
        # chunk of allocation was happening here. This was presumably due to varargs
        # or the like, but we didn't investigate further given that it was easy
        # to fix with this check.
        if self.depth > self.max_depth:
            self.max_depth = self.depth
        self.__span_record.start_span(label)
        self.labels_for_structure_stack.append({label})
1250
1251 def stop_span(self, *, discard: bool = False) -> None:
1252 self.provider.span_end(discard)
1253 if self.frozen:
1254 return
1255 if discard:
1256 self.has_discards = True
1257 self.depth -= 1
1258 assert self.depth >= -1
1259 self.__span_record.stop_span(discard=discard)
1260
1261 labels_for_structure = self.labels_for_structure_stack.pop()
1262
1263 if not discard:
1264 if self.labels_for_structure_stack:
1265 self.labels_for_structure_stack[-1].update(labels_for_structure)
1266 else:
1267 self.tags.update([structural_coverage(l) for l in labels_for_structure])
1268
1269 if discard:
1270 # Once we've discarded a span, every test case starting with
1271 # this prefix contains discards. We prune the tree at that point so
1272 # as to avoid future test cases bothering with this region, on the
1273 # assumption that some span that you could have used instead
1274 # there would *not* trigger the discard. This greatly speeds up
1275 # test case generation in some cases, because it allows us to
1276 # ignore large swathes of the search space that are effectively
1277 # redundant.
1278 #
1279 # A scenario that can cause us problems but which we deliberately
1280 # have decided not to support is that if there are side effects
1281 # during data generation then you may end up with a scenario where
1282 # every good test case generates a discard because the discarded
1283 # section sets up important things for later. This is not terribly
1284 # likely and all that you see in this case is some degradation in
1285 # quality of testing, so we don't worry about it.
1286 #
1287 # Note that killing the branch does *not* mean we will never
1288 # explore below this point, and in particular we may do so during
1289 # shrinking. Any explicit request for a data object that starts
1290 # with the branch here will work just fine, but novel prefix
1291 # generation will avoid it, and we can use it to detect when we
1292 # have explored the entire tree (up to redundancy).
1293
1294 self.observer.kill_branch()
1295
1296 @property
1297 def spans(self) -> Spans:
1298 assert self.frozen
1299 if self.__spans is None:
1300 self.__spans = Spans(record=self.__span_record)
1301 return self.__spans
1302
    def freeze(self) -> None:
        """Finalize this ConjectureData: record finish times, close any open
        spans, and mark it frozen so no further draws or mutations occur.

        Freezing is idempotent.
        """
        if self.frozen:
            return
        self.finish_time = time.perf_counter()
        self.gc_finish_time = gc_cumulative_time()

        # Always finish by closing all remaining spans so that we have a valid tree.
        while self.depth >= 0:
            self.stop_span()

        self.__span_record.freeze()
        self.frozen = True
        self.observer.conclude_test(self.status, self.interesting_origin)
1316
1317 def choice(
1318 self,
1319 values: Sequence[T],
1320 *,
1321 forced: Optional[T] = None,
1322 observe: bool = True,
1323 ) -> T:
1324 forced_i = None if forced is None else values.index(forced)
1325 i = self.draw_integer(
1326 0,
1327 len(values) - 1,
1328 forced=forced_i,
1329 observe=observe,
1330 )
1331 return values[i]
1332
    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin] = None,
    ) -> NoReturn:
        """Freeze this data with the given status and abort the test by
        raising StopTest.

        ``interesting_origin`` may only be supplied with Status.INTERESTING.
        """
        assert (interesting_origin is None) or (status == Status.INTERESTING)
        self.__assert_not_frozen("conclude_test")
        self.interesting_origin = interesting_origin
        self.status = status
        self.freeze()
        raise StopTest(self.testcounter)
1344
    def mark_interesting(self, interesting_origin: InterestingOrigin) -> NoReturn:
        """Terminate the test as an interesting (i.e. failing) example."""
        self.conclude_test(Status.INTERESTING, interesting_origin)
1347
    def mark_invalid(self, why: Optional[str] = None) -> NoReturn:
        """Terminate the test as invalid, optionally recording ``why`` in events."""
        if why is not None:
            self.events["invalid because"] = why
        self.conclude_test(Status.INVALID)
1352
    def mark_overrun(self) -> NoReturn:
        """Terminate the test as having consumed more choices than allowed."""
        self.conclude_test(Status.OVERRUN)
1355
1356
def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single choice of ``choice_type`` under ``constraints`` using a
    fresh ConjectureData seeded from ``random``."""
    provider = ConjectureData(random=random).provider
    draw_method = getattr(provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_method(**constraints))