1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import math
12import time
13from collections import defaultdict
14from collections.abc import Hashable, Iterable, Iterator, Sequence
15from enum import IntEnum
16from functools import cached_property
17from random import Random
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 Literal,
22 NoReturn,
23 Optional,
24 TypeVar,
25 Union,
26 cast,
27 overload,
28)
29
30import attr
31
32from hypothesis.errors import (
33 CannotProceedScopeT,
34 ChoiceTooLarge,
35 Frozen,
36 InvalidArgument,
37 StopTest,
38)
39from hypothesis.internal.cache import LRUCache
40from hypothesis.internal.compat import add_note
41from hypothesis.internal.conjecture.choice import (
42 BooleanConstraints,
43 BytesConstraints,
44 ChoiceConstraintsT,
45 ChoiceNode,
46 ChoiceT,
47 ChoiceTemplate,
48 ChoiceTypeT,
49 FloatConstraints,
50 IntegerConstraints,
51 StringConstraints,
52 choice_constraints_key,
53 choice_from_index,
54 choice_permitted,
55 choices_size,
56)
57from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time
58from hypothesis.internal.conjecture.providers import (
59 COLLECTION_DEFAULT_MAX_SIZE,
60 HypothesisProvider,
61 PrimitiveProvider,
62)
63from hypothesis.internal.conjecture.utils import calc_label_from_name
64from hypothesis.internal.escalation import InterestingOrigin
65from hypothesis.internal.floats import (
66 SMALLEST_SUBNORMAL,
67 float_to_int,
68 int_to_float,
69 sign_aware_lte,
70)
71from hypothesis.internal.intervalsets import IntervalSet
72from hypothesis.internal.observability import PredicateCounts
73from hypothesis.reporting import debug_report
74from hypothesis.utils.conventions import not_set
75from hypothesis.utils.threading import ThreadLocal
76
77if TYPE_CHECKING:
78 from typing import TypeAlias
79
80 from hypothesis.strategies import SearchStrategy
81 from hypothesis.strategies._internal.strategies import Ex
82
83
84def __getattr__(name: str) -> Any:
85 if name == "AVAILABLE_PROVIDERS":
86 from hypothesis._settings import note_deprecation
87 from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS
88
89 note_deprecation(
90 "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to "
91 "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.",
92 since="2025-01-25",
93 has_codemod=False,
94 stacklevel=1,
95 )
96 return AVAILABLE_PROVIDERS
97
98 raise AttributeError(
99 f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}"
100 )
101
102
T = TypeVar("T")
# Observations recorded for targeted property-based testing: metric name -> value.
TargetObservations = dict[str, Union[int, float]]
# index, choice_type, constraints, forced value
MisalignedAt: "TypeAlias" = tuple[
    int, ChoiceTypeT, ChoiceConstraintsT, Optional[ChoiceT]
]

# Label of the implicit top-level span that wraps every test case
# (started in ConjectureData.__init__).
TOP_LABEL = calc_label_from_name("top")
MAX_DEPTH = 100

# Thread-local counter used to assign each ConjectureData a unique
# ``testcounter`` (incremented in ConjectureData.__init__).
threadlocal = ThreadLocal(global_test_counter=int)
114
115
class ExtraInformation:
    """A class for holding shared state on a ``ConjectureData`` that should
    be added to the final ``ConjectureResult``.

    Attributes are set dynamically; the instance ``__dict__`` is the store.
    """

    def __repr__(self) -> str:
        fields = ", ".join(
            f"{name}={value!r}" for name, value in self.__dict__.items()
        )
        return f"ExtraInformation({fields})"

    def has_information(self) -> bool:
        # True iff any attribute has been set on this instance.
        return len(self.__dict__) > 0
127
128
class Status(IntEnum):
    """Outcome of executing a test case, ordered from worst to best."""

    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return "Status." + self.name
137
138
@attr.s(slots=True, frozen=True)
class StructuralCoverageTag:
    """Immutable value object wrapping a span label, used as a coverage tag
    (see ``structural_coverage`` and ``ConjectureData.tags``)."""

    label: int = attr.ib()
142
143
# Interning cache so that each label maps to a single shared tag object.
STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}


def structural_coverage(label: int) -> StructuralCoverageTag:
    """Return the canonical ``StructuralCoverageTag`` for ``label``,
    creating and caching it on first use."""
    tag = STRUCTURAL_COVERAGE_CACHE.get(label)
    if tag is None:
        # setdefault so concurrent first calls still agree on one object.
        tag = STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
    return tag
152
153
# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
# Keys are (choice_type, *choice_constraints_key(...)); values are the
# canonical constraints dict shared across draws (see _pooled_constraints).
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)
157
158
class Span:
    """A span tracks the hierarchical structure of choices within a single
    test run.

    Spans mark regions of the choice sequence that are logically related to
    each other. For instance, Hypothesis tracks:
    - A single top-level span for the entire choice sequence
    - A span for the choices made by each strategy
    - Some strategies define additional spans within their choices. For
      instance, st.lists() tracks the "should add another element" choice and
      the "add another element" choices as separate spans.

    Spans provide useful information to the shrinker, mutator, targeted PBT,
    and other subsystems of Hypothesis.

    Rather than store each ``Span`` as a rich object, it is actually just an
    index into the ``Spans`` class defined below. This has two purposes:
    firstly, for most properties of spans we never need to allocate storage
    at all, because most properties are not used on most spans. Secondly, by
    storing the spans as compact lists of integers, we save a considerable
    amount of space compared to Python's normal object size.

    This does have the downside that it increases the amount of allocation we
    do, and slows things down as a result, in some usage patterns, because we
    repeatedly allocate the same Span or int objects — but it often
    dramatically reduces our memory usage, so it is worth it.
    """

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if other is self:
            return True
        if not isinstance(other, Span):
            return NotImplemented
        # Identity comparison on the owner: spans from different Spans
        # collections are never equal, even at the same index.
        return (self.index == other.index) and (self.owner is other.owner)

    def __ne__(self, other: object) -> bool:
        if other is self:
            return False
        if not isinstance(other, Span):
            return NotImplemented
        return (self.index != other.index) or (self.owner is not other.owner)

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """An opaque value associating this span with its approximate origin,
        such as a particular strategy class or a particular kind of draw."""
        owner = self.owner
        return owner.labels[owner.label_indices[self.index]]

    @property
    def parent(self) -> Optional[int]:
        """The index of the span that this one is nested directly within;
        ``None`` for the top-level span."""
        return None if self.index == 0 else self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        """Index of the first choice in this span."""
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        """Index one past the last choice in this span."""
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """Depth of this span in the span tree. The top-level span has a
        depth of 0."""
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """True if this span's ``stop_span`` call had ``discard`` set to
        ``True``. This means we believe the shrinker should be able to delete
        this span completely without affecting the value produced by its
        enclosing strategy. Typically set when a rejection sampler decides to
        reject a generated value and try again."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """The number of choices in this span."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """All spans with this one as a parent, in increasing index order."""
        return [self.owner[child] for child in self.owner.children[self.index]]
258
259
class SpanProperty:
    """Visitor used to compute properties of spans.

    Many properties of spans are calculated by essentially rerunning the
    test case (replaying the recorded trail) multiple times. Subclasses
    override the ``start_span`` / ``stop_span`` hooks to accumulate
    whatever they need, and return it from ``finish()``.
    """

    def __init__(self, spans: "Spans"):
        self.span_stack: list[int] = []
        self.spans = spans
        self.span_count = 0
        self.choice_count = 0

    def run(self) -> Any:
        """Replay the recorded trail with this visitor and return the
        result of ``self.finish()``."""
        for entry in self.spans.trail:
            if entry == TrailType.CHOICE:
                self.choice_count += 1
            elif entry == TrailType.STOP_SPAN_DISCARD:
                self._pop(discarded=True)
            elif entry == TrailType.STOP_SPAN_NO_DISCARD:
                self._pop(discarded=False)
            else:
                # everything after TrailType.CHOICE encodes the label index
                # of a span start.
                self._push(entry - TrailType.CHOICE - 1)
        return self.finish()

    def _push(self, label_index: int) -> None:
        span_index = self.span_count
        assert span_index < len(self.spans)
        self.start_span(span_index, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(span_index)

    def _pop(self, *, discarded: bool) -> None:
        self.stop_span(self.span_stack.pop(), discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Hook called at the start of each span, with ``i`` the index of
        the span and ``label_index`` the index of its label in
        ``self.spans.labels``. No-op by default."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Hook called at the end of each span, with ``i`` the index of the
        span and ``discarded`` True if ``stop_span`` was called with
        ``discard=True``. No-op by default."""

    def finish(self) -> Any:
        """Return the computed property; must be overridden."""
        raise NotImplementedError
314
315
class TrailType(IntEnum):
    """Kinds of entries in a ``SpanRecord`` trail (see ``SpanRecord`` and
    ``SpanProperty.run`` for the encoding/decoding sides)."""

    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    CHOICE = 3
    # every trail element larger than TrailType.CHOICE is the label of a span
    # start, offset by its index. So the first span label is stored as 4, the
    # second as 5, etc, regardless of its actual integer label.
323
324
class SpanRecord:
    """Records the series of ``start_span``, ``stop_span``, and
    ``draw_bits`` calls so that these may be stored in ``Spans`` and
    replayed when we need to know about the structure of individual
    ``Span`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: list[int] = []
        # label -> position in self.labels; set to None by freeze().
        self.__index_of_labels: Optional[dict[int, int]] = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        # Drop the lookup table; no further spans may be started after this.
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        index_of_labels = self.__index_of_labels
        assert index_of_labels is not None
        if label in index_of_labels:
            i = index_of_labels[label]
        else:
            # First time we've seen this label: intern it.
            i = len(self.labels)
            index_of_labels[label] = i
            self.labels.append(label)
        # Span starts are stored offset past TrailType.CHOICE; see TrailType.
        self.trail.append(TrailType.CHOICE + 1 + i)

    def stop_span(self, *, discard: bool) -> None:
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)
362
363
class _starts_and_ends(SpanProperty):
    """Computes the (start, end) choice indices of every span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        size = len(self.spans)
        self.starts = IntList.of_length(size)
        self.ends = IntList.of_length(size)

    def start_span(self, i: int, label_index: int) -> None:
        # A span starts at the next choice to be made.
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return (self.starts, self.ends)
378
379
class _discarded(SpanProperty):
    """Collects the indices of all discarded spans."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.result.add(i)

    def finish(self) -> frozenset[int]:
        return frozenset(self.result)
391
392
class _parentage(SpanProperty):
    """Computes, for each span, the index of its parent span.

    The top-level span (index 0) is left at the IntList default."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if i > 0:
            # By the time span i stops, the top of the stack is its parent.
            self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result
404
405
class _depths(SpanProperty):
    """Computes the nesting depth of every span (top-level span is 0)."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        # The stack holds all enclosing spans, so its size is our depth.
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result
416
417
class _label_indices(SpanProperty):
    """Computes, for each span, the index of its label in ``spans.labels``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result
428
429
class _mutator_groups(SpanProperty):
    """Groups spans by label, as sets of (start, end) choice-index ranges."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        span = self.spans[i]
        self.groups[label_index].add((span.start, span.end))

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # Groups with only one span are useless to the mutator: there is
        # nothing to swap them with.
        return [group for group in self.groups.values() if len(group) >= 2]
446
447
class Spans:
    """A lazy collection of ``Span`` objects, derived from
    the record of recorded behaviour in ``SpanRecord``.

    Behaves logically as if it were a list of ``Span`` objects,
    but actually mostly exists as a compact store of information
    for them to reference into. All properties on here are best
    understood as the backing storage for ``Span`` and are
    described there.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # Each span produces exactly one stop marker, so the number of stop
        # markers is the number of spans.
        n_discarded = self.trail.count(TrailType.STOP_SPAN_DISCARD)
        n_kept = self.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        self.__length = n_discarded + n_kept
        self.__children: Optional[list[Sequence[int]]] = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for index, parent in enumerate(self.parentage):
                if index > 0:
                    children[parent].append(index)
            # Replace empty children lists with a tuple to reduce
            # memory usage.
            for index, child_list in enumerate(children):
                if not child_list:
                    children[index] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        n = self.__length
        if not (-n <= i < n):
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        # Support negative indexing, like a list.
        return Span(self, i + n if i < 0 else i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        for i in range(len(self)):
            yield self[i]
530
531
class _Overrun:
    """Sentinel standing in for a result whose test case consumed more than
    the allowed amount of input; compare ``ConjectureData.as_result()``."""

    # Mirrors ConjectureResult.status so callers can check .status uniformly.
    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


# Module-level singleton: all overruns are interchangeable, so we share one.
Overrun = _Overrun()
540
541
class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache.

    All hooks are no-ops by default; subclasses override whichever
    they care about.
    """

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        """Record an integer draw made on the observed data."""

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        """Record a float draw made on the observed data."""

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        """Record a string draw made on the observed data."""

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        """Record a bytes draw made on the observed data."""

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        """Record a boolean draw made on the observed data."""
585
586
@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    # Final status of the run; if INTERESTING, interesting_origin says why.
    status: Status = attr.ib()
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    # The choice sequence. Excluded from equality/repr: status + the other
    # fields are what comparisons care about, and nodes can be large.
    nodes: tuple[ChoiceNode, ...] = attr.ib(eq=False, repr=False)
    length: int = attr.ib()
    output: str = attr.ib()
    extra_information: Optional[ExtraInformation] = attr.ib()
    expected_exception: Optional[BaseException] = attr.ib()
    expected_traceback: Optional[str] = attr.ib()
    has_discards: bool = attr.ib()
    target_observations: TargetObservations = attr.ib()
    tags: frozenset[StructuralCoverageTag] = attr.ib()
    spans: Spans = attr.ib(repr=False, eq=False)
    arg_slices: set[tuple[int, int]] = attr.ib(repr=False)
    slice_comments: dict[tuple[int, int], str] = attr.ib(repr=False)
    # Set when replaying a prefix that no longer matches the test's draws.
    misaligned_at: Optional[MisalignedAt] = attr.ib(repr=False)
    cannot_proceed_scope: Optional[CannotProceedScopeT] = attr.ib(repr=False)

    def as_result(self) -> "ConjectureResult":
        # Mirrors ConjectureData.as_result() so both can be used uniformly.
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        return tuple(node.value for node in self.nodes)
616
617
618class ConjectureData:
619 @classmethod
620 def for_choices(
621 cls,
622 choices: Sequence[Union[ChoiceTemplate, ChoiceT]],
623 *,
624 observer: Optional[DataObserver] = None,
625 provider: Union[type, PrimitiveProvider] = HypothesisProvider,
626 random: Optional[Random] = None,
627 ) -> "ConjectureData":
628 from hypothesis.internal.conjecture.engine import choice_count
629
630 return cls(
631 max_choices=choice_count(choices),
632 random=random,
633 prefix=choices,
634 observer=observer,
635 provider=provider,
636 )
637
    def __init__(
        self,
        *,
        random: Optional[Random],
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        prefix: Optional[Sequence[Union[ChoiceTemplate, ChoiceT]]] = None,
        max_choices: Optional[int] = None,
        provider_kw: Optional[dict[str, Any]] = None,
    ) -> None:
        """Create a fresh test-case record.

        ``random`` may be None when replaying a fixed ``prefix``. ``provider``
        is either a PrimitiveProvider subclass (instantiated here, with
        ``provider_kw``) or an already-constructed instance, in which case
        ``provider_kw`` must not be passed.
        """
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            # provider_kw only makes sense when we construct the provider.
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.is_find = False
        self.overdraw = 0
        self._random = random

        self.length = 0
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        # Unique-per-thread id for this test case.
        self.testcounter = threadlocal.global_test_counter
        threadlocal.global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, Union[str, int, float]] = {}
        self.interesting_origin: Optional[InterestingOrigin] = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: Optional[ConjectureResult] = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Optional[Spans] = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )
        self._sampled_from_all_strategies_elements_message: Optional[
            tuple[str, object]
        ] = None
        self._shared_strategy_draws: dict[Hashable, tuple[int, Any]] = {}
        self.hypothesis_runner = not_set

        self.expected_exception: Optional[BaseException] = None
        self.expected_traceback: Optional[str] = None
        self.extra_information = ExtraInformation()

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        # Set if/when a replayed prefix stops matching the test's draws.
        self.misaligned_at: Optional[MisalignedAt] = None
        self.cannot_proceed_scope: Optional[CannotProceedScopeT] = None
        # Every test case lives inside one implicit top-level span.
        self.start_span(TOP_LABEL)
733
734 def __repr__(self) -> str:
735 return "ConjectureData(%s, %d choices%s)" % (
736 self.status.name,
737 len(self.nodes),
738 ", frozen" if self.frozen else "",
739 )
740
741 @property
742 def choices(self) -> tuple[ChoiceT, ...]:
743 return tuple(node.value for node in self.nodes)
744
745 # draw_* functions might be called in one of two contexts: either "above" or
746 # "below" the choice sequence. For instance, draw_string calls draw_boolean
747 # from ``many`` when calculating the number of characters to return. We do
748 # not want these choices to get written to the choice sequence, because they
749 # are not true choices themselves.
750 #
751 # `observe` formalizes this. The choice will only be written to the choice
752 # sequence if observe is True.
753
    # Typing overloads for _draw: each choice type maps to its own
    # constraints TypedDict and return type. The implementation follows below.
    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: Optional[int],
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: Optional[float],
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: Optional[str],
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: Optional[bytes],
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: Optional[bool],
    ) -> bool: ...
803
    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        """Make a single choice: replay it from the prefix, take the forced
        value, or delegate to the provider; then (if ``observe``) record it
        as a node in the choice sequence."""
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        # A forced value always wins, even over the prefix.
        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one got used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value
879
880 def draw_integer(
881 self,
882 min_value: Optional[int] = None,
883 max_value: Optional[int] = None,
884 *,
885 weights: Optional[dict[int, float]] = None,
886 shrink_towards: int = 0,
887 forced: Optional[int] = None,
888 observe: bool = True,
889 ) -> int:
890 # Validate arguments
891 if weights is not None:
892 assert min_value is not None
893 assert max_value is not None
894 assert len(weights) <= 255 # arbitrary practical limit
895 # We can and should eventually support total weights. But this
896 # complicates shrinking as we can no longer assume we can force
897 # a value to the unmapped probability mass if that mass might be 0.
898 assert sum(weights.values()) < 1
899 # similarly, things get simpler if we assume every value is possible.
900 # we'll want to drop this restriction eventually.
901 assert all(w != 0 for w in weights.values())
902
903 if forced is not None and min_value is not None:
904 assert min_value <= forced
905 if forced is not None and max_value is not None:
906 assert forced <= max_value
907
908 constraints: IntegerConstraints = self._pooled_constraints(
909 "integer",
910 {
911 "min_value": min_value,
912 "max_value": max_value,
913 "weights": weights,
914 "shrink_towards": shrink_towards,
915 },
916 )
917 return self._draw("integer", constraints, observe=observe, forced=forced)
918
919 def draw_float(
920 self,
921 min_value: float = -math.inf,
922 max_value: float = math.inf,
923 *,
924 allow_nan: bool = True,
925 smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
926 # TODO: consider supporting these float widths at the choice sequence
927 # level in the future.
928 # width: Literal[16, 32, 64] = 64,
929 forced: Optional[float] = None,
930 observe: bool = True,
931 ) -> float:
932 assert smallest_nonzero_magnitude > 0
933 assert not math.isnan(min_value)
934 assert not math.isnan(max_value)
935
936 if smallest_nonzero_magnitude == 0.0: # pragma: no cover
937 raise FloatingPointError(
938 "Got allow_subnormal=True, but we can't represent subnormal floats "
939 "right now, in violation of the IEEE-754 floating-point "
940 "specification. This is usually because something was compiled with "
941 "-ffast-math or a similar option, which sets global processor state. "
942 "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
943 "writeup - and good luck!"
944 )
945
946 if forced is not None:
947 assert allow_nan or not math.isnan(forced)
948 assert math.isnan(forced) or (
949 sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
950 )
951
952 constraints: FloatConstraints = self._pooled_constraints(
953 "float",
954 {
955 "min_value": min_value,
956 "max_value": max_value,
957 "allow_nan": allow_nan,
958 "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
959 },
960 )
961 return self._draw("float", constraints, observe=observe, forced=forced)
962
963 def draw_string(
964 self,
965 intervals: IntervalSet,
966 *,
967 min_size: int = 0,
968 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
969 forced: Optional[str] = None,
970 observe: bool = True,
971 ) -> str:
972 assert forced is None or min_size <= len(forced) <= max_size
973 assert min_size >= 0
974 if len(intervals) == 0:
975 assert min_size == 0
976
977 constraints: StringConstraints = self._pooled_constraints(
978 "string",
979 {
980 "intervals": intervals,
981 "min_size": min_size,
982 "max_size": max_size,
983 },
984 )
985 return self._draw("string", constraints, observe=observe, forced=forced)
986
987 def draw_bytes(
988 self,
989 min_size: int = 0,
990 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
991 *,
992 forced: Optional[bytes] = None,
993 observe: bool = True,
994 ) -> bytes:
995 assert forced is None or min_size <= len(forced) <= max_size
996 assert min_size >= 0
997
998 constraints: BytesConstraints = self._pooled_constraints(
999 "bytes", {"min_size": min_size, "max_size": max_size}
1000 )
1001 return self._draw("bytes", constraints, observe=observe, forced=forced)
1002
1003 def draw_boolean(
1004 self,
1005 p: float = 0.5,
1006 *,
1007 forced: Optional[bool] = None,
1008 observe: bool = True,
1009 ) -> bool:
1010 assert (forced is not True) or p > 0
1011 assert (forced is not False) or p < 1
1012
1013 constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p})
1014 return self._draw("boolean", constraints, observe=observe, forced=forced)
1015
1016 @overload
1017 def _pooled_constraints(
1018 self, choice_type: Literal["integer"], constraints: IntegerConstraints
1019 ) -> IntegerConstraints: ...
1020
1021 @overload
1022 def _pooled_constraints(
1023 self, choice_type: Literal["float"], constraints: FloatConstraints
1024 ) -> FloatConstraints: ...
1025
1026 @overload
1027 def _pooled_constraints(
1028 self, choice_type: Literal["string"], constraints: StringConstraints
1029 ) -> StringConstraints: ...
1030
1031 @overload
1032 def _pooled_constraints(
1033 self, choice_type: Literal["bytes"], constraints: BytesConstraints
1034 ) -> BytesConstraints: ...
1035
1036 @overload
1037 def _pooled_constraints(
1038 self, choice_type: Literal["boolean"], constraints: BooleanConstraints
1039 ) -> BooleanConstraints: ...
1040
1041 def _pooled_constraints(
1042 self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
1043 ) -> ChoiceConstraintsT:
1044 """Memoize common dictionary objects to reduce memory pressure."""
1045 # caching runs afoul of nondeterminism checks
1046 if self.provider.avoid_realization:
1047 return constraints
1048
1049 key = (choice_type, *choice_constraints_key(choice_type, constraints))
1050 try:
1051 return POOLED_CONSTRAINTS_CACHE[key]
1052 except KeyError:
1053 POOLED_CONSTRAINTS_CACHE[key] = constraints
1054 return constraints
1055
1056 def _pop_choice(
1057 self,
1058 choice_type: ChoiceTypeT,
1059 constraints: ChoiceConstraintsT,
1060 *,
1061 forced: Optional[ChoiceT],
1062 ) -> ChoiceT:
1063 assert self.prefix is not None
1064 # checked in _draw
1065 assert self.index < len(self.prefix)
1066
1067 value = self.prefix[self.index]
1068 if isinstance(value, ChoiceTemplate):
1069 node: ChoiceTemplate = value
1070 if node.count is not None:
1071 assert node.count >= 0
1072 # node templates have to be at the end for now, since it's not immediately
1073 # apparent how to handle overruning a node template while generating a single
1074 # node if the alternative is not "the entire data is an overrun".
1075 assert self.index == len(self.prefix) - 1
1076 if node.type == "simplest":
1077 if forced is not None:
1078 choice = forced
1079 try:
1080 choice = choice_from_index(0, choice_type, constraints)
1081 except ChoiceTooLarge:
1082 self.mark_overrun()
1083 else:
1084 raise NotImplementedError
1085
1086 if node.count is not None:
1087 node.count -= 1
1088 if node.count < 0:
1089 self.mark_overrun()
1090 return choice
1091
1092 choice = value
1093 node_choice_type = {
1094 str: "string",
1095 float: "float",
1096 int: "integer",
1097 bool: "boolean",
1098 bytes: "bytes",
1099 }[type(choice)]
1100 # If we're trying to:
1101 # * draw a different choice type at the same location
1102 # * draw the same choice type with a different constraints, which does not permit
1103 # the current value
1104 #
1105 # then we call this a misalignment, because the choice sequence has
1106 # changed from what we expected at some point. An easy misalignment is
1107 #
1108 # one_of(integers(0, 100), integers(101, 200))
1109 #
1110 # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
1111 # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
1112 # index 1 (which does not permit any of the values 0-100).
1113 #
1114 # When the choice sequence becomes misaligned, we generate a new value of the
1115 # type and constraints the strategy expects.
1116 if node_choice_type != choice_type or not choice_permitted(choice, constraints):
1117 # only track first misalignment for now.
1118 if self.misaligned_at is None:
1119 self.misaligned_at = (self.index, choice_type, constraints, forced)
1120 try:
1121 # Fill in any misalignments with index 0 choices. An alternative to
1122 # this is using the index of the misaligned choice instead
1123 # of index 0, which may be useful for maintaining
1124 # "similarly-complex choices" in the shrinker. This requires
1125 # attaching an index to every choice in ConjectureData.for_choices,
1126 # which we don't always have (e.g. when reading from db).
1127 #
1128 # If we really wanted this in the future we could make this complexity
1129 # optional, use it if present, and default to index 0 otherwise.
1130 # This complicates our internal api and so I'd like to avoid it
1131 # if possible.
1132 #
1133 # Additionally, I don't think slips which require
1134 # slipping to high-complexity values are common. Though arguably
1135 # we may want to expand a bit beyond *just* the simplest choice.
1136 # (we could for example consider sampling choices from index 0-10).
1137 choice = choice_from_index(0, choice_type, constraints)
1138 except ChoiceTooLarge:
1139 # should really never happen with a 0-index choice, but let's be safe.
1140 self.mark_overrun()
1141
1142 self.index += 1
1143 return choice
1144
1145 def as_result(self) -> Union[ConjectureResult, _Overrun]:
1146 """Convert the result of running this test into
1147 either an Overrun object or a ConjectureResult."""
1148
1149 assert self.frozen
1150 if self.status == Status.OVERRUN:
1151 return Overrun
1152 if self.__result is None:
1153 self.__result = ConjectureResult(
1154 status=self.status,
1155 interesting_origin=self.interesting_origin,
1156 spans=self.spans,
1157 nodes=self.nodes,
1158 length=self.length,
1159 output=self.output,
1160 expected_traceback=self.expected_traceback,
1161 expected_exception=self.expected_exception,
1162 extra_information=(
1163 self.extra_information
1164 if self.extra_information.has_information()
1165 else None
1166 ),
1167 has_discards=self.has_discards,
1168 target_observations=self.target_observations,
1169 tags=frozenset(self.tags),
1170 arg_slices=self.arg_slices,
1171 slice_comments=self.slice_comments,
1172 misaligned_at=self.misaligned_at,
1173 cannot_proceed_scope=self.cannot_proceed_scope,
1174 )
1175 assert self.__result is not None
1176 return self.__result
1177
1178 def __assert_not_frozen(self, name: str) -> None:
1179 if self.frozen:
1180 raise Frozen(f"Cannot call {name} on frozen ConjectureData")
1181
1182 def note(self, value: Any) -> None:
1183 self.__assert_not_frozen("note")
1184 if not isinstance(value, str):
1185 value = repr(value)
1186 self.output += value
1187
    def draw(
        self,
        strategy: "SearchStrategy[Ex]",
        label: Optional[int] = None,
        observe_as: Optional[str] = None,
    ) -> "Ex":
        """Draw a value from ``strategy``, recording a span for it.

        At the top level (depth 0) this also records per-draw timing (minus
        time spent in GC) under ``observe_as`` or a generated key, and
        forwards the drawn value to observability callbacks if any are
        registered.
        """
        # Imported here rather than at module level, presumably to avoid
        # import cycles with the strategies package — TODO confirm.
        from hypothesis.internal.observability import TESTCASE_CALLBACKS
        from hypothesis.strategies._internal.lazy import unwrap_strategies
        from hypothesis.strategies._internal.utils import to_jsonable

        if self.is_find and not strategy.supports_find:
            raise InvalidArgument(
                f"Cannot use strategy {strategy!r} within a call to find "
                "(presumably because it would be invalid after the call had ended)."
            )

        at_top_level = self.depth == 0
        start_time = None
        if at_top_level:
            # We start this timer early, because accessing attributes on a LazyStrategy
            # can be almost arbitrarily slow. In cases like characters() and text()
            # where we cache something expensive, this led to Flaky deadline errors!
            # See https://github.com/HypothesisWorks/hypothesis/issues/2108
            start_time = time.perf_counter()
            gc_start_time = gc_cumulative_time()

        strategy.validate()

        if strategy.is_empty:
            self.mark_invalid(f"empty strategy {self!r}")

        if self.depth >= MAX_DEPTH:
            self.mark_invalid("max depth exceeded")

        # Jump directly to the unwrapped strategy for the label and for do_draw.
        # This avoids adding an extra span to all lazy strategies.
        unwrapped = unwrap_strategies(strategy)
        if label is None:
            label = unwrapped.label
        assert isinstance(label, int)

        self.start_span(label=label)
        try:
            if not at_top_level:
                # Nested draws skip the timing/observability bookkeeping below.
                return unwrapped.do_draw(self)
            assert start_time is not None
            key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
            try:
                try:
                    v = unwrapped.do_draw(self)
                finally:
                    # Subtract the time spent in GC to avoid overcounting, as it is
                    # accounted for at the overall example level.
                    in_gctime = gc_cumulative_time() - gc_start_time
                    self.draw_times[key] = time.perf_counter() - start_time - in_gctime
            except Exception as err:
                # Attach generation context to the exception for better error
                # reports, then let it propagate.
                add_note(
                    err,
                    f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
                )
                raise
            if TESTCASE_CALLBACKS:
                avoid = self.provider.avoid_realization
                self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
            return v
        finally:
            # Always close the span, even when the draw raised.
            self.stop_span()
1255
1256 def start_span(self, label: int) -> None:
1257 self.provider.span_start(label)
1258 self.__assert_not_frozen("start_span")
1259 self.depth += 1
1260 # Logically it would make sense for this to just be
1261 # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
1262 # be until we ran the code under tracemalloc and found a rather significant
1263 # chunk of allocation was happening here. This was presumably due to varargs
1264 # or the like, but we didn't investigate further given that it was easy
1265 # to fix with this check.
1266 if self.depth > self.max_depth:
1267 self.max_depth = self.depth
1268 self.__span_record.start_span(label)
1269 self.labels_for_structure_stack.append({label})
1270
1271 def stop_span(self, *, discard: bool = False) -> None:
1272 self.provider.span_end(discard)
1273 if self.frozen:
1274 return
1275 if discard:
1276 self.has_discards = True
1277 self.depth -= 1
1278 assert self.depth >= -1
1279 self.__span_record.stop_span(discard=discard)
1280
1281 labels_for_structure = self.labels_for_structure_stack.pop()
1282
1283 if not discard:
1284 if self.labels_for_structure_stack:
1285 self.labels_for_structure_stack[-1].update(labels_for_structure)
1286 else:
1287 self.tags.update([structural_coverage(l) for l in labels_for_structure])
1288
1289 if discard:
1290 # Once we've discarded a span, every test case starting with
1291 # this prefix contains discards. We prune the tree at that point so
1292 # as to avoid future test cases bothering with this region, on the
1293 # assumption that some span that you could have used instead
1294 # there would *not* trigger the discard. This greatly speeds up
1295 # test case generation in some cases, because it allows us to
1296 # ignore large swathes of the search space that are effectively
1297 # redundant.
1298 #
1299 # A scenario that can cause us problems but which we deliberately
1300 # have decided not to support is that if there are side effects
1301 # during data generation then you may end up with a scenario where
1302 # every good test case generates a discard because the discarded
1303 # section sets up important things for later. This is not terribly
1304 # likely and all that you see in this case is some degradation in
1305 # quality of testing, so we don't worry about it.
1306 #
1307 # Note that killing the branch does *not* mean we will never
1308 # explore below this point, and in particular we may do so during
1309 # shrinking. Any explicit request for a data object that starts
1310 # with the branch here will work just fine, but novel prefix
1311 # generation will avoid it, and we can use it to detect when we
1312 # have explored the entire tree (up to redundancy).
1313
1314 self.observer.kill_branch()
1315
1316 @property
1317 def spans(self) -> Spans:
1318 assert self.frozen
1319 if self.__spans is None:
1320 self.__spans = Spans(record=self.__span_record)
1321 return self.__spans
1322
1323 def freeze(self) -> None:
1324 if self.frozen:
1325 return
1326 self.finish_time = time.perf_counter()
1327 self.gc_finish_time = gc_cumulative_time()
1328
1329 # Always finish by closing all remaining spans so that we have a valid tree.
1330 while self.depth >= 0:
1331 self.stop_span()
1332
1333 self.__span_record.freeze()
1334 self.frozen = True
1335 self.observer.conclude_test(self.status, self.interesting_origin)
1336
1337 def choice(
1338 self,
1339 values: Sequence[T],
1340 *,
1341 forced: Optional[T] = None,
1342 observe: bool = True,
1343 ) -> T:
1344 forced_i = None if forced is None else values.index(forced)
1345 i = self.draw_integer(
1346 0,
1347 len(values) - 1,
1348 forced=forced_i,
1349 observe=observe,
1350 )
1351 return values[i]
1352
1353 def conclude_test(
1354 self,
1355 status: Status,
1356 interesting_origin: Optional[InterestingOrigin] = None,
1357 ) -> NoReturn:
1358 assert (interesting_origin is None) or (status == Status.INTERESTING)
1359 self.__assert_not_frozen("conclude_test")
1360 self.interesting_origin = interesting_origin
1361 self.status = status
1362 self.freeze()
1363 raise StopTest(self.testcounter)
1364
1365 def mark_interesting(self, interesting_origin: InterestingOrigin) -> NoReturn:
1366 self.conclude_test(Status.INTERESTING, interesting_origin)
1367
1368 def mark_invalid(self, why: Optional[str] = None) -> NoReturn:
1369 if why is not None:
1370 self.events["invalid because"] = why
1371 self.conclude_test(Status.INVALID)
1372
1373 def mark_overrun(self) -> NoReturn:
1374 self.conclude_test(Status.OVERRUN)
1375
1376
def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single fresh choice of ``choice_type`` under ``constraints``,
    using a throwaway ConjectureData seeded from ``random``."""
    data = ConjectureData(random=random)
    draw_method = getattr(data.provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_method(**constraints))