# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import contextlib
import math
import time
from collections import defaultdict
from enum import IntEnum
from random import Random
from sys import float_info
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    DefaultDict,
    Dict,
    FrozenSet,
    Iterable,
    Iterator,
    List,
    Literal,
    NoReturn,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    TypedDict,
    TypeVar,
    Union,
)

import attr

from hypothesis.errors import Frozen, InvalidArgument, StopTest
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import add_note, floor, int_from_bytes, int_to_bytes
from hypothesis.internal.conjecture.floats import float_to_lex, lex_to_float
from hypothesis.internal.conjecture.junkdrawer import (
    IntList,
    gc_cumulative_time,
    uniform,
)
from hypothesis.internal.conjecture.utils import (
    INT_SIZES,
    INT_SIZES_SAMPLER,
    Sampler,
    calc_label_from_name,
    many,
)
from hypothesis.internal.floats import (
    SIGNALING_NAN,
    SMALLEST_SUBNORMAL,
    float_to_int,
    int_to_float,
    make_float_clamper,
    next_down,
    next_up,
    sign_aware_lte,
)
from hypothesis.internal.intervalsets import IntervalSet

if TYPE_CHECKING:
    from typing import TypeAlias

    from typing_extensions import dataclass_transform

    from hypothesis.strategies import SearchStrategy
    from hypothesis.strategies._internal.strategies import Ex
else:
    TypeAlias = object

    def dataclass_transform():
        def wrapper(tp):
            return tp

        return wrapper


TOP_LABEL = calc_label_from_name("top")
InterestingOrigin = Tuple[
    Type[BaseException], str, int, Tuple[Any, ...], Tuple[Tuple[Any, ...], ...]
]
TargetObservations = Dict[str, Union[int, float]]

T = TypeVar("T")


class IntegerKWargs(TypedDict):
    min_value: Optional[int]
    max_value: Optional[int]
    weights: Optional[Sequence[float]]
    shrink_towards: int


class FloatKWargs(TypedDict):
    min_value: float
    max_value: float
    allow_nan: bool
    smallest_nonzero_magnitude: float


class StringKWargs(TypedDict):
    intervals: IntervalSet
    min_size: int
    max_size: Optional[int]


class BytesKWargs(TypedDict):
    size: int


class BooleanKWargs(TypedDict):
    p: float


IRType: TypeAlias = Union[int, str, bool, float, bytes]
IRKWargsType: TypeAlias = Union[
    IntegerKWargs, FloatKWargs, StringKWargs, BytesKWargs, BooleanKWargs
]
IRTypeName: TypeAlias = Literal["integer", "string", "boolean", "float", "bytes"]
# ir_type, kwargs, forced
InvalidAt: TypeAlias = Tuple[IRTypeName, IRKWargsType, Optional[IRType]]


class ExtraInformation:
    """A class for holding shared state on a ``ConjectureData`` that should
    be added to the final ``ConjectureResult``."""

    def __repr__(self) -> str:
        return "ExtraInformation({})".format(
            ", ".join(f"{k}={v!r}" for k, v in self.__dict__.items()),
        )

    def has_information(self) -> bool:
        return bool(self.__dict__)


class Status(IntEnum):
    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return f"Status.{self.name}"


@dataclass_transform()
@attr.s(frozen=True, slots=True, auto_attribs=True)
class StructuralCoverageTag:
    label: int


STRUCTURAL_COVERAGE_CACHE: Dict[int, StructuralCoverageTag] = {}


def structural_coverage(label: int) -> StructuralCoverageTag:
    try:
        return STRUCTURAL_COVERAGE_CACHE[label]
    except KeyError:
        return STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))


NASTY_FLOATS = sorted(
    [
        0.0,
        0.5,
        1.1,
        1.5,
        1.9,
        1.0 / 3,
        10e6,
        10e-6,
        1.175494351e-38,
        next_up(0.0),
        float_info.min,
        float_info.max,
        3.402823466e38,
        9007199254740992,
        1 - 10e-6,
        2 + 10e-6,
        1.192092896e-07,
        2.2204460492503131e-016,
    ]
    + [2.0**-n for n in (24, 14, 149, 126)]  # minimum (sub)normals for float16,32
    + [float_info.min / n for n in (2, 10, 1000, 100_000)]  # subnormal in float64
    + [math.inf, math.nan] * 5
    + [SIGNALING_NAN],
    key=float_to_lex,
)
NASTY_FLOATS = list(map(float, NASTY_FLOATS))
NASTY_FLOATS.extend([-x for x in NASTY_FLOATS])

FLOAT_INIT_LOGIC_CACHE = LRUReusedCache(4096)

POOLED_KWARGS_CACHE = LRUReusedCache(4096)

DRAW_STRING_DEFAULT_MAX_SIZE = 10**10  # "arbitrarily large"


class Example:
    """Examples track the hierarchical structure of draws from the byte stream,
    within a single test run.

    Examples are created to mark regions of the byte stream that might be
    useful to the shrinker, such as:
    - The bytes used by a single draw from a strategy.
    - Useful groupings within a strategy, such as individual list elements.
    - Strategy-like helper functions that aren't first-class strategies.
    - Each lowest-level draw of bits or bytes from the byte stream.
    - A single top-level example that spans the entire input.

    Example-tracking allows the shrinker to try "high-level" transformations,
    such as rearranging or deleting the elements of a list, without having
    to understand their exact representation in the byte stream.

    Rather than store each ``Example`` as a rich object, it is actually
    just an index into the ``Examples`` class defined below. This has two
    purposes: Firstly, for most properties of examples we will never need
    to allocate storage at all, because most properties are not used on
    most examples. Secondly, by storing the properties as compact lists
    of integers, we save a considerable amount of space compared to
    Python's normal object size.

    This does have the downside that it increases the amount of allocation
    we do, and slows things down as a result, in some usage patterns because
    we repeatedly allocate the same Example or int objects, but it will
    often dramatically reduce our memory usage, so is worth it.
    """

    __slots__ = ("owner", "index")

    def __init__(self, owner: "Examples", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if not isinstance(other, Example):
            return NotImplemented
        return (self.owner is other.owner) and (self.index == other.index)

    def __ne__(self, other: object) -> bool:
        if self is other:
            return False
        if not isinstance(other, Example):
            return NotImplemented
        return (self.owner is not other.owner) or (self.index != other.index)

    def __repr__(self) -> str:
        return f"examples[{self.index}]"

    @property
    def label(self) -> int:
        """A label is an opaque value that associates each example with its
        approximate origin, such as a particular strategy class or a particular
        kind of draw."""
        return self.owner.labels[self.owner.label_indices[self.index]]

    @property
    def parent(self):
        """The index of the example that this one is nested directly within."""
        if self.index == 0:
            return None
        return self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        """The position of the start of this example in the byte stream."""
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        """The position directly after the last byte of this example,
        i.e. the example corresponds to the half-open region [start, end).
        """
        return self.owner.ends[self.index]

    @property
    def ir_start(self) -> int:
        return self.owner.ir_starts[self.index]

    @property
    def ir_end(self) -> int:
        return self.owner.ir_ends[self.index]

    @property
    def depth(self):
        """Depth of this example in the example tree. The top-level example has a
        depth of 0."""
        return self.owner.depths[self.index]

    @property
    def trivial(self):
        """An example is "trivial" if it only contains forced bytes and zero bytes.
        All examples start out as trivial, and then get marked non-trivial when
        we see a byte that is neither forced nor zero."""
        return self.index in self.owner.trivial

    @property
    def discarded(self) -> bool:
        """True if this example's ``stop_example`` call had ``discard`` set to
        ``True``. This means we believe that the shrinker should be able to delete
        this example completely, without affecting the value produced by its enclosing
        strategy. Typically set when a rejection sampler decides to reject a
        generated value and try again."""
        return self.index in self.owner.discarded

    @property
    def length(self) -> int:
        """The number of bytes in this example."""
        return self.end - self.start

    @property
    def ir_length(self) -> int:
        """The number of ir nodes in this example."""
        return self.ir_end - self.ir_start

    @property
    def children(self) -> "List[Example]":
        """The list of all examples with this as a parent, in increasing index
        order."""
        return [self.owner[i] for i in self.owner.children[self.index]]
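
# Illustrative sketch (not part of the API): every attribute access on an
# ``Example`` resolves to an index lookup into the compact lists stored on its
# owning ``Examples`` collection. E.g., given some ``examples: Examples``:
#
# >>> top = examples[0]                          # the single top-level example
# >>> (top.start, top.end)                       # half-open byte bounds
# >>> [child.label for child in top.children]    # children by increasing index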


class ExampleProperty:
    """There are many properties of examples that we calculate by
    essentially rerunning the test case multiple times based on the
    calls which we record in ExampleRecord.

    This class defines a visitor, subclasses of which can be used
    to calculate these properties.
    """

    def __init__(self, examples: "Examples"):
        self.example_stack: "List[int]" = []
        self.examples = examples
        self.bytes_read = 0
        self.example_count = 0
        self.block_count = 0
        self.ir_node_count = 0

    def run(self) -> Any:
        """Rerun the test case with this visitor and return the
        results of ``self.finish()``."""
        self.begin()
        blocks = self.examples.blocks
        for record in self.examples.trail:
            if record == DRAW_BITS_RECORD:
                self.bytes_read = blocks.endpoints[self.block_count]
                self.block(self.block_count)
                self.block_count += 1
            elif record == IR_NODE_RECORD:
                data = self.examples.ir_nodes[self.ir_node_count]
                self.ir_node(data)
                self.ir_node_count += 1
            elif record >= START_EXAMPLE_RECORD:
                self.__push(record - START_EXAMPLE_RECORD)
            else:
                assert record in (
                    STOP_EXAMPLE_DISCARD_RECORD,
                    STOP_EXAMPLE_NO_DISCARD_RECORD,
                )
                self.__pop(discarded=record == STOP_EXAMPLE_DISCARD_RECORD)
        return self.finish()

    def __push(self, label_index: int) -> None:
        i = self.example_count
        assert i < len(self.examples)
        self.start_example(i, label_index=label_index)
        self.example_count += 1
        self.example_stack.append(i)

    def __pop(self, *, discarded: bool) -> None:
        i = self.example_stack.pop()
        self.stop_example(i, discarded=discarded)

    def begin(self) -> None:
        """Called at the beginning of the run to initialise any
        relevant state."""
        self.result = IntList.of_length(len(self.examples))

    def start_example(self, i: int, label_index: int) -> None:
        """Called at the start of each example, with ``i`` the
        index of the example and ``label_index`` the index of
        its label in ``self.examples.labels``."""

    def block(self, i: int) -> None:
        """Called with each ``draw_bits`` call, with ``i`` the index of the
        corresponding block in ``self.examples.blocks``"""

    def stop_example(self, i: int, *, discarded: bool) -> None:
        """Called at the end of each example, with ``i`` the
        index of the example and ``discarded`` being ``True`` if ``stop_example``
        was called with ``discard=True``."""

    def ir_node(self, node: "IRNode") -> None:
        """Called when an ir node is drawn."""

    def finish(self) -> Any:
        return self.result


def calculated_example_property(cls: Type[ExampleProperty]) -> Any:
    """Given an ``ExampleProperty`` as above we use this decorator
    to transform it into a lazy property on the ``Examples`` class,
    which has as its value the result of calling ``cls.run()``,
    computed the first time the property is accessed.

    This has the slightly weird result that we are defining nested
    classes which get turned into properties."""
    name = cls.__name__
    cache_name = "__" + name

    def lazy_calculate(self: "Examples") -> IntList:
        result = getattr(self, cache_name, None)
        if result is None:
            result = cls(self).run()
            setattr(self, cache_name, result)
        return result

    lazy_calculate.__name__ = cls.__name__
    lazy_calculate.__qualname__ = cls.__qualname__
    return property(lazy_calculate)
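
# Illustrative sketch of the resulting behaviour: given a nested visitor class
# such as ``Examples._depths`` below, ``depths = calculated_example_property(
# _depths)`` defines a property whose first access runs ``_depths(self).run()``
# and caches the result under ``self.__depths``; later accesses return the
# cached value without replaying the trail.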


DRAW_BITS_RECORD = 0
STOP_EXAMPLE_DISCARD_RECORD = 1
STOP_EXAMPLE_NO_DISCARD_RECORD = 2
START_EXAMPLE_RECORD = 3

IR_NODE_RECORD = calc_label_from_name("ir draw record")


class ExampleRecord:
    """Records the series of ``start_example``, ``stop_example``, and
    ``draw_bits`` calls so that these may be stored in ``Examples`` and
    replayed when we need to know about the structure of individual
    ``Example`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: List[int] = []
        self.__index_of_labels: "Optional[Dict[int, int]]" = {}
        self.trail = IntList()
        self.ir_nodes: List[IRNode] = []

    def freeze(self) -> None:
        self.__index_of_labels = None

    def record_ir_draw(self, ir_type, value, *, kwargs, was_forced):
        self.trail.append(IR_NODE_RECORD)
        node = IRNode(
            ir_type=ir_type,
            value=value,
            kwargs=kwargs,
            was_forced=was_forced,
            index=len(self.ir_nodes),
        )
        self.ir_nodes.append(node)

    def start_example(self, label: int) -> None:
        assert self.__index_of_labels is not None
        try:
            i = self.__index_of_labels[label]
        except KeyError:
            i = self.__index_of_labels.setdefault(label, len(self.labels))
            self.labels.append(label)
        self.trail.append(START_EXAMPLE_RECORD + i)

    def stop_example(self, *, discard: bool) -> None:
        if discard:
            self.trail.append(STOP_EXAMPLE_DISCARD_RECORD)
        else:
            self.trail.append(STOP_EXAMPLE_NO_DISCARD_RECORD)

    def draw_bits(self) -> None:
        self.trail.append(DRAW_BITS_RECORD)


class Examples:
    """A lazy collection of ``Example`` objects, derived from
    the record of recorded behaviour in ``ExampleRecord``.

    Behaves logically as if it were a list of ``Example`` objects,
    but actually mostly exists as a compact store of information
    for them to reference into. All properties on here are best
    understood as the backing storage for ``Example`` and are
    described there.
    """

    def __init__(self, record: ExampleRecord, blocks: "Blocks") -> None:
        self.trail = record.trail
        self.ir_nodes = record.ir_nodes
        self.labels = record.labels
        self.__length = self.trail.count(
            STOP_EXAMPLE_DISCARD_RECORD
        ) + record.trail.count(STOP_EXAMPLE_NO_DISCARD_RECORD)
        self.blocks = blocks
        self.__children: "Optional[List[Sequence[int]]]" = None

    class _starts_and_ends(ExampleProperty):
        def begin(self):
            self.starts = IntList.of_length(len(self.examples))
            self.ends = IntList.of_length(len(self.examples))

        def start_example(self, i: int, label_index: int) -> None:
            self.starts[i] = self.bytes_read

        def stop_example(self, i: int, *, discarded: bool) -> None:
            self.ends[i] = self.bytes_read

        def finish(self) -> Tuple[IntList, IntList]:
            return (self.starts, self.ends)

    starts_and_ends: "Tuple[IntList, IntList]" = calculated_example_property(
        _starts_and_ends
    )

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    class _ir_starts_and_ends(ExampleProperty):
        def begin(self):
            self.starts = IntList.of_length(len(self.examples))
            self.ends = IntList.of_length(len(self.examples))

        def start_example(self, i: int, label_index: int) -> None:
            self.starts[i] = self.ir_node_count

        def stop_example(self, i: int, *, discarded: bool) -> None:
            self.ends[i] = self.ir_node_count

        def finish(self) -> Tuple[IntList, IntList]:
            return (self.starts, self.ends)

    ir_starts_and_ends: "Tuple[IntList, IntList]" = calculated_example_property(
        _ir_starts_and_ends
    )

    @property
    def ir_starts(self) -> IntList:
        return self.ir_starts_and_ends[0]

    @property
    def ir_ends(self) -> IntList:
        return self.ir_starts_and_ends[1]

    class _discarded(ExampleProperty):
        def begin(self) -> None:
            self.result: "Set[int]" = set()  # type: ignore  # IntList in parent class

        def finish(self) -> FrozenSet[int]:
            return frozenset(self.result)

        def stop_example(self, i: int, *, discarded: bool) -> None:
            if discarded:
                self.result.add(i)

    discarded: FrozenSet[int] = calculated_example_property(_discarded)

    class _trivial(ExampleProperty):
        def begin(self) -> None:
            self.nontrivial = IntList.of_length(len(self.examples))
            self.result: "Set[int]" = set()  # type: ignore  # IntList in parent class

        def block(self, i: int) -> None:
            if not self.examples.blocks.trivial(i):
                self.nontrivial[self.example_stack[-1]] = 1

        def stop_example(self, i: int, *, discarded: bool) -> None:
            if self.nontrivial[i]:
                if self.example_stack:
                    self.nontrivial[self.example_stack[-1]] = 1
            else:
                self.result.add(i)

        def finish(self) -> FrozenSet[int]:
            return frozenset(self.result)

    trivial: FrozenSet[int] = calculated_example_property(_trivial)

    class _parentage(ExampleProperty):
        def stop_example(self, i: int, *, discarded: bool) -> None:
            if i > 0:
                self.result[i] = self.example_stack[-1]

    parentage: IntList = calculated_example_property(_parentage)

    class _depths(ExampleProperty):
        def begin(self):
            self.result = IntList.of_length(len(self.examples))

        def start_example(self, i: int, label_index: int) -> None:
            self.result[i] = len(self.example_stack)

    depths: IntList = calculated_example_property(_depths)

    class _ir_tree_nodes(ExampleProperty):
        def begin(self):
            self.result = []

        def ir_node(self, ir_node):
            self.result.append(ir_node)

    ir_tree_nodes: "List[IRNode]" = calculated_example_property(_ir_tree_nodes)

    class _label_indices(ExampleProperty):
        def start_example(self, i: int, label_index: int) -> None:
            self.result[i] = label_index

    label_indices: IntList = calculated_example_property(_label_indices)

    class _mutator_groups(ExampleProperty):
        def begin(self) -> None:
            self.groups: "Dict[int, Set[Tuple[int, int]]]" = defaultdict(set)

        def start_example(self, i: int, label_index: int) -> None:
            # TODO should we discard start == end cases? occurs for eg st.data()
            # which is conditionally or never drawn from. arguably swapping
            # nodes with the empty list is a useful mutation enabled by start == end?
            key = (self.examples[i].ir_start, self.examples[i].ir_end)
            self.groups[label_index].add(key)

        def finish(self) -> Iterable[Set[Tuple[int, int]]]:
            # Discard groups with only one example, since the mutator can't
            # do anything useful with them.
            return [g for g in self.groups.values() if len(g) >= 2]

    mutator_groups: List[Set[Tuple[int, int]]] = calculated_example_property(
        _mutator_groups
    )

    @property
    def children(self) -> List[Sequence[int]]:
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for i, p in enumerate(self.parentage):
                if i > 0:
                    children[p].append(i)
            # Replace empty children lists with a tuple to reduce
            # memory usage.
            for i, c in enumerate(children):
                if not c:
                    children[i] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Example:
        assert isinstance(i, int)
        n = len(self)
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        if i < 0:
            i += n
        return Example(self, i)


@dataclass_transform()
@attr.s(slots=True, frozen=True)
class Block:
    """Blocks track the flat list of lowest-level draws from the byte stream,
    within a single test run.

    Block-tracking allows the shrinker to try "low-level"
    transformations, such as minimizing the numeric value of an
    individual call to ``draw_bits``.
    """

    start: int = attr.ib()
    end: int = attr.ib()

    # Index of this block inside the overall list of blocks.
    index: int = attr.ib()

    # True if this block's byte values were forced by a write operation.
    # As long as the bytes before this block remain the same, modifying this
    # block's bytes will have no effect.
    forced: bool = attr.ib(repr=False)

    # True if this block's byte values are all 0. Reading this flag can be
    # more convenient than explicitly checking a slice for non-zero bytes.
    all_zero: bool = attr.ib(repr=False)

    @property
    def bounds(self) -> Tuple[int, int]:
        return (self.start, self.end)

    @property
    def length(self) -> int:
        return self.end - self.start

    @property
    def trivial(self) -> bool:
        return self.forced or self.all_zero


class Blocks:
    """A lazily calculated list of blocks for a particular ``ConjectureResult``
    or ``ConjectureData`` object.

    Pretends to be a list containing ``Block`` objects but actually only
    contains their endpoints right up until the point where you want to
    access the actual block, at which point it is constructed.

    This is designed to be as space efficient as possible, so will at
    various points silently transform its representation into one
    that is better suited for the current access pattern.

    In addition, it has a number of convenience methods for accessing
    properties of the block object at index ``i`` that should generally
    be preferred to using the Block objects directly, as it will not
    have to allocate the actual object."""

    __slots__ = ("endpoints", "owner", "__blocks", "__count", "__sparse")
    owner: "Union[ConjectureData, ConjectureResult, None]"
    __blocks: Union[Dict[int, Block], List[Optional[Block]]]

    def __init__(self, owner: "ConjectureData") -> None:
        self.owner = owner
        self.endpoints = IntList()
        self.__blocks = {}
        self.__count = 0
        self.__sparse = True

    def add_endpoint(self, n: int) -> None:
        """Add n to the list of endpoints."""
        assert isinstance(self.owner, ConjectureData)
        self.endpoints.append(n)

    def transfer_ownership(self, new_owner: "ConjectureResult") -> None:
        """Used to move ``Blocks`` over to a ``ConjectureResult`` object
        when that is ready to be used and we no longer want to keep the
        whole ``ConjectureData`` around."""
        assert isinstance(new_owner, ConjectureResult)
        self.owner = new_owner
        self.__check_completion()

    def start(self, i: int) -> int:
        """Equivalent to self[i].start."""
        i = self._check_index(i)

        if i == 0:
            return 0
        else:
            return self.end(i - 1)

    def end(self, i: int) -> int:
        """Equivalent to self[i].end."""
        return self.endpoints[i]

    def all_bounds(self) -> Iterable[Tuple[int, int]]:
        """Equivalent to [(b.start, b.end) for b in self]."""
        prev = 0
        for e in self.endpoints:
            yield (prev, e)
            prev = e

    @property
    def last_block_length(self):
        return self.end(-1) - self.start(-1)

    def __len__(self) -> int:
        return len(self.endpoints)

    def __known_block(self, i: int) -> Optional[Block]:
        try:
            return self.__blocks[i]
        except (KeyError, IndexError):
            return None

    def trivial(self, i: int) -> Any:
        """Equivalent to self.blocks[i].trivial."""
        if self.owner is not None:
            return self.start(i) in self.owner.forced_indices or not any(
                self.owner.buffer[self.start(i) : self.end(i)]
            )
        else:
            return self[i].trivial

    def _check_index(self, i: int) -> int:
        n = len(self)
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        if i < 0:
            i += n
        return i

    def __getitem__(self, i: int) -> Block:
        i = self._check_index(i)
        assert i >= 0
        result = self.__known_block(i)
        if result is not None:
            return result

        # We store the blocks as a sparse dict mapping indices to the
        # actual result, but this isn't the best representation once we
        # stop being sparse and want to use most of the blocks. Switch
        # over to a list at that point.
        if self.__sparse and len(self.__blocks) * 2 >= len(self):
            new_blocks: "List[Optional[Block]]" = [None] * len(self)
            assert isinstance(self.__blocks, dict)
            for k, v in self.__blocks.items():
                new_blocks[k] = v
            self.__sparse = False
            self.__blocks = new_blocks
            assert self.__blocks[i] is None

        start = self.start(i)
        end = self.end(i)

        # We keep track of the number of blocks that have actually been
        # instantiated so that when every block that could be instantiated
        # has been we know that the list is complete and can throw away
        # some data that we no longer need.
        self.__count += 1

        # Integrity check: We can't have allocated more blocks than we have
        # positions for blocks.
        assert self.__count <= len(self)
        assert self.owner is not None
        result = Block(
            start=start,
            end=end,
            index=i,
            forced=start in self.owner.forced_indices,
            all_zero=not any(self.owner.buffer[start:end]),
        )
        try:
            self.__blocks[i] = result
        except IndexError:
            assert isinstance(self.__blocks, list)
            assert len(self.__blocks) < len(self)
            self.__blocks.extend([None] * (len(self) - len(self.__blocks)))
            self.__blocks[i] = result

        self.__check_completion()

        return result

    def __check_completion(self):
        """The list of blocks is complete if we have created every ``Block``
        object that we are going to and know that no more will be created.

        If this happens then we don't need to keep the reference to the
        owner around, and delete it so that there is no circular reference.
        The main benefit of this is that the gc doesn't need to run to collect
        this because normal reference counting is enough.
        """
        if self.__count == len(self) and isinstance(self.owner, ConjectureResult):
            self.owner = None

    def __iter__(self) -> Iterator[Block]:
        for i in range(len(self)):
            yield self[i]

    def __repr__(self) -> str:
        parts: "List[str]" = []
        for i in range(len(self)):
            b = self.__known_block(i)
            if b is None:
                parts.append("...")
            else:
                parts.append(repr(b))
        return "Block([{}])".format(", ".join(parts))
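
# Worked example (illustrative): ``Blocks`` stores only cumulative endpoints,
# deriving each start from the previous endpoint. So endpoints [2, 5, 9]
# describe three blocks with bounds [(0, 2), (2, 5), (5, 9)] -- exactly what
# ``all_bounds()`` yields, without allocating any ``Block`` objects.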


class _Overrun:
    status = Status.OVERRUN

    def __repr__(self):
        return "Overrun"


Overrun = _Overrun()

global_test_counter = 0


MAX_DEPTH = 100


class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache."""

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, kwargs: IntegerKWargs, was_forced: bool
    ) -> None:
        pass

    def draw_float(
        self, value: float, *, kwargs: FloatKWargs, was_forced: bool
    ) -> None:
        pass

    def draw_string(
        self, value: str, *, kwargs: StringKWargs, was_forced: bool
    ) -> None:
        pass

    def draw_bytes(
        self, value: bytes, *, kwargs: BytesKWargs, was_forced: bool
    ) -> None:
        pass

    def draw_boolean(
        self, value: bool, *, kwargs: BooleanKWargs, was_forced: bool
    ) -> None:
        pass

    def mark_invalid(self, invalid_at: InvalidAt) -> None:
        pass


@attr.s(slots=True, repr=False, eq=False)
class IRNode:
    ir_type: IRTypeName = attr.ib()
    value: IRType = attr.ib()
    kwargs: IRKWargsType = attr.ib()
    was_forced: bool = attr.ib()
    index: Optional[int] = attr.ib(default=None)

    def copy(
        self,
        *,
        with_value: Optional[IRType] = None,
        with_kwargs: Optional[IRKWargsType] = None,
    ) -> "IRNode":
        # we may want to allow this combination in the future, but for now it's
        # a footgun.
        assert not self.was_forced, "modifying a forced node doesn't make sense"
        # explicitly not copying index. node indices are only assigned via
        # ExampleRecord. This prevents footguns with relying on stale indices
        # after copying.
        return IRNode(
            ir_type=self.ir_type,
            value=self.value if with_value is None else with_value,
            kwargs=self.kwargs if with_kwargs is None else with_kwargs,
            was_forced=self.was_forced,
        )

    @property
    def trivial(self):
        """
        A node is trivial if it cannot be simplified any further. This does not
        mean that modifying a trivial node can't produce simpler test cases when
        viewing the tree as a whole. Just that when viewing this node in
        isolation, this is the simplest the node can get.
        """
        if self.was_forced:
            return True

        if self.ir_type == "integer":
            shrink_towards = self.kwargs["shrink_towards"]
            min_value = self.kwargs["min_value"]
            max_value = self.kwargs["max_value"]

            if min_value is not None:
                shrink_towards = max(min_value, shrink_towards)
            if max_value is not None:
                shrink_towards = min(max_value, shrink_towards)

            return self.value == shrink_towards
        if self.ir_type == "float":
            min_value = self.kwargs["min_value"]
            max_value = self.kwargs["max_value"]
            shrink_towards = 0

            if min_value == -math.inf and max_value == math.inf:
                return ir_value_equal("float", self.value, shrink_towards)

            if (
                not math.isinf(min_value)
                and not math.isinf(max_value)
                and math.ceil(min_value) <= math.floor(max_value)
            ):
                # the interval contains an integer. the simplest integer is the
                # one closest to shrink_towards
                shrink_towards = max(math.ceil(min_value), shrink_towards)
                shrink_towards = min(math.floor(max_value), shrink_towards)
                return ir_value_equal("float", self.value, shrink_towards)

            # the real answer here is "the value in [min_value, max_value] with
            # the lowest denominator when represented as a fraction".
            # It would be good to compute this correctly in the future, but it's
            # also not incorrect to be conservative here.
            return False
        if self.ir_type == "boolean":
            return self.value is False
        if self.ir_type == "string":
            # smallest size and contains only the smallest-in-shrink-order character.
            minimal_char = self.kwargs["intervals"].char_in_shrink_order(0)
            return self.value == (minimal_char * self.kwargs["min_size"])
        if self.ir_type == "bytes":
            # smallest size and all-zero value.
            return len(self.value) == self.kwargs["size"] and not any(self.value)

        raise NotImplementedError(f"unhandled ir_type {self.ir_type}")
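
    # For instance (illustrative): an unbounded integer node with
    # shrink_towards=0 is trivial only when its value is 0, a boolean node
    # only when its value is False, and a bytes node only when every byte
    # is zero.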

    def __eq__(self, other):
        if not isinstance(other, IRNode):
            return NotImplemented

        return (
            self.ir_type == other.ir_type
            and ir_value_equal(self.ir_type, self.value, other.value)
            and ir_kwargs_equal(self.ir_type, self.kwargs, other.kwargs)
            and self.was_forced == other.was_forced
        )

    def __hash__(self):
        return hash(
            (
                self.ir_type,
                ir_value_key(self.ir_type, self.value),
                ir_kwargs_key(self.ir_type, self.kwargs),
                self.was_forced,
            )
        )

    def __repr__(self):
        # repr to avoid "BytesWarning: str() on a bytes instance" for bytes nodes
        forced_marker = " [forced]" if self.was_forced else ""
        return f"{self.ir_type} {self.value!r}{forced_marker} {self.kwargs!r}"


def ir_value_permitted(value, ir_type, kwargs):
    if ir_type == "integer":
        min_value = kwargs["min_value"]
        max_value = kwargs["max_value"]
        shrink_towards = kwargs["shrink_towards"]
        if min_value is not None and value < min_value:
            return False
        if max_value is not None and value > max_value:
            return False

        if (max_value is None or min_value is None) and (
            value - shrink_towards
        ).bit_length() >= 128:
            return False

        return True
    elif ir_type == "float":
        if math.isnan(value):
            return kwargs["allow_nan"]
        return (
            sign_aware_lte(kwargs["min_value"], value)
            and sign_aware_lte(value, kwargs["max_value"])
        ) and not (0 < abs(value) < kwargs["smallest_nonzero_magnitude"])
    elif ir_type == "string":
        if len(value) < kwargs["min_size"]:
            return False
        if kwargs["max_size"] is not None and len(value) > kwargs["max_size"]:
            return False
        return all(ord(c) in kwargs["intervals"] for c in value)
    elif ir_type == "bytes":
        return len(value) == kwargs["size"]
    elif ir_type == "boolean":
        if kwargs["p"] <= 2 ** (-64):
            return value is False
        if kwargs["p"] >= (1 - 2 ** (-64)):
            return value is True
        return True

    raise NotImplementedError(f"unhandled type {type(value)} of ir value {value}")
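
# Illustrative sketch (not part of the API surface):
#
# >>> kwargs = {"min_value": 0, "max_value": 10, "weights": None,
# ...           "shrink_towards": 0}
# >>> ir_value_permitted(5, "integer", kwargs)
# True
# >>> ir_value_permitted(11, "integer", kwargs)
# False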


def ir_value_key(ir_type, v):
    if ir_type == "float":
        return float_to_int(v)
    return v


def ir_kwargs_key(ir_type, kwargs):
    if ir_type == "float":
        return (
            float_to_int(kwargs["min_value"]),
            float_to_int(kwargs["max_value"]),
            kwargs["allow_nan"],
            kwargs["smallest_nonzero_magnitude"],
        )
    if ir_type == "integer":
        return (
            kwargs["min_value"],
            kwargs["max_value"],
            None if kwargs["weights"] is None else tuple(kwargs["weights"]),
            kwargs["shrink_towards"],
        )
    return tuple(kwargs[key] for key in sorted(kwargs))


def ir_value_equal(ir_type, v1, v2):
    return ir_value_key(ir_type, v1) == ir_value_key(ir_type, v2)


def ir_kwargs_equal(ir_type, kwargs1, kwargs2):
    return ir_kwargs_key(ir_type, kwargs1) == ir_kwargs_key(ir_type, kwargs2)


@dataclass_transform()
@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    status: Status = attr.ib()
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    buffer: bytes = attr.ib()
    # some ConjectureDatas pass through the ir and some pass through buffers.
    # the ir does not drive its result through the buffer, which means blocks/examples
    # may differ (I think for forced values?) even when the buffer is the same.
    # I don't *think* anything was relying on anything but .buffer for result equality,
    # though that assumption may be leaning on flakiness detection invariants.
    #
    # If we consider blocks or examples in equality checks, multiple semantically equal
    # results get stored in e.g. the pareto front.
    blocks: Blocks = attr.ib(eq=False)
    output: str = attr.ib()
    extra_information: Optional[ExtraInformation] = attr.ib()
    has_discards: bool = attr.ib()
    target_observations: TargetObservations = attr.ib()
    tags: FrozenSet[StructuralCoverageTag] = attr.ib()
    forced_indices: FrozenSet[int] = attr.ib(repr=False)
    examples: Examples = attr.ib(repr=False, eq=False)
    arg_slices: Set[Tuple[int, int]] = attr.ib(repr=False)
    slice_comments: Dict[Tuple[int, int], str] = attr.ib(repr=False)
    invalid_at: Optional[InvalidAt] = attr.ib(repr=False)

    index: int = attr.ib(init=False)

    def __attrs_post_init__(self) -> None:
        self.index = len(self.buffer)
        self.forced_indices = frozenset(self.forced_indices)

    def as_result(self) -> "ConjectureResult":
        return self


# Masks for masking off the first byte of an n-bit buffer.
# The appropriate mask is stored at position n % 8.
BYTE_MASKS = [(1 << n) - 1 for n in range(8)]
BYTE_MASKS[0] = 255
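
# Worked example (illustrative): for an 11-bit buffer the first byte should
# keep only 11 % 8 == 3 bits, and BYTE_MASKS[3] == 0b111. When n is a multiple
# of 8, n % 8 == 0 and the whole first byte is kept (mask 255).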


class PrimitiveProvider(abc.ABC):
    # This is the low-level interface which would also be implemented
    # by e.g. CrossHair, by an Atheris-hypothesis integration, etc.
    # We'd then build the structured tree handling, database and replay
    # support, etc. on top of this - so all backends get those for free.
    #
    # See https://github.com/HypothesisWorks/hypothesis/issues/3086

    # How long a provider instance is used for. One of test_function or
    # test_case. Defaults to test_function.
    #
    # If test_function, a single provider instance will be instantiated and used
    # for the entirety of each test function. I.e., roughly one provider per
    # @given annotation. This can be useful if you need to track state over many
    # executions of a test function.
    #
    # This lifetime will cause None to be passed for the ConjectureData object
    # in PrimitiveProvider.__init__, because that object is instantiated per
    # test case.
    #
    # If test_case, a new provider instance will be instantiated and used each
    # time hypothesis tries to generate a new input to the test function. This
    # lifetime can access the passed ConjectureData object.
    #
    # Non-hypothesis providers probably want to set a lifetime of test_function.
    lifetime = "test_function"

    def __init__(self, conjecturedata: Optional["ConjectureData"], /) -> None:
        self._cd = conjecturedata

    def post_test_case_hook(self, value):
        # hook for providers to modify values returned by draw_* after a full
        # test case concludes. Originally exposed for crosshair to reify its
        # symbolic values into actual values.
        # I'm not tied to this exact function name or design.
        return value

    def per_test_case_context_manager(self):
        return contextlib.nullcontext()

    @abc.abstractmethod
    def draw_boolean(
        self,
        p: float = 0.5,
        *,
        forced: Optional[bool] = None,
        fake_forced: bool = False,
    ) -> bool:
        raise NotImplementedError

    @abc.abstractmethod
    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        # weights are for choosing an element index from a bounded range
        weights: Optional[Sequence[float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        fake_forced: bool = False,
    ) -> int:
        raise NotImplementedError

    @abc.abstractmethod
    def draw_float(
        self,
        *,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float,
        # TODO: consider supporting these float widths at the IR level in the
        # future.
        # width: Literal[16, 32, 64] = 64,
        # exclude_min and exclude_max handled higher up,
        forced: Optional[float] = None,
        fake_forced: bool = False,
    ) -> float:
        raise NotImplementedError

    @abc.abstractmethod
    def draw_string(
        self,
        intervals: IntervalSet,
        *,
        min_size: int = 0,
        max_size: Optional[int] = None,
        forced: Optional[str] = None,
        fake_forced: bool = False,
    ) -> str:
        raise NotImplementedError

    @abc.abstractmethod
    def draw_bytes(
        self, size: int, *, forced: Optional[bytes] = None, fake_forced: bool = False
    ) -> bytes:
        raise NotImplementedError
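
# A minimal sketch (illustrative, not a supported example) of a custom backend
# implementing this interface by delegating to ``random``. A real backend must
# also honour ``forced`` values and record its choices; this only shows the
# shape of the API:
#
#     class RandomProvider(PrimitiveProvider):  # hypothetical
#         lifetime = "test_function"
#
#         def draw_boolean(self, p=0.5, *, forced=None, fake_forced=False):
#             return forced if forced is not None else random.random() < p
#
#         ...  # draw_integer / draw_float / draw_string / draw_bytes likewise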


class HypothesisProvider(PrimitiveProvider):
    lifetime = "test_case"

    def __init__(self, conjecturedata: Optional["ConjectureData"], /):
        assert conjecturedata is not None
        super().__init__(conjecturedata)

    def draw_boolean(
        self,
        p: float = 0.5,
        *,
        forced: Optional[bool] = None,
        fake_forced: bool = False,
    ) -> bool:
        """Return True with probability p (assuming a uniform generator),
        shrinking towards False. If ``forced`` is set to a non-None value, this
        will always return that value but will write choices appropriate to having
        drawn that value randomly."""
        # Note that this could also be implemented in terms of draw_integer().

        assert self._cd is not None
        # NB this function is vastly more complicated than it may seem reasonable
        # for it to be. This is because it is used in a lot of places and it's
        # important for it to shrink well, so it's worth the engineering effort.

        if p <= 0 or p >= 1:
            bits = 1
        else:
            # When there is a meaningful draw, in order to shrink well we will
            # set things up so that 0 and 1 always correspond to False and True
            # respectively. This means we want enough bits available that in a
            # draw we will always have at least one truthy value and one falsey
            # value.
            bits = math.ceil(-math.log(min(p, 1 - p), 2))
            # In order to avoid stupidly large draws where the probability is
            # effectively zero or one, we treat probabilities of under 2^-64 as
            # effectively zero.
            if bits > 64:
                # There isn't enough precision near one for this to occur for values
                # far from 0.
                p = 0.0
                bits = 1

        size = 2**bits

        while True:
            # The logic here is a bit complicated and special cased to make it
            # play better with the shrinker.

            # We imagine partitioning the real interval [0, 1] into 2**n equal parts
            # and looking at each part and whether its interior is wholly <= p
            # or wholly >= p. At most one part can be neither.

            # We then pick a random part. If it's wholly on one side or the other
            # of p then we use that as the answer. If p is contained in the
            # interval then we start again with a new probability that is given
            # by the fraction of that interval that was <= our previous p.

            # We then take advantage of the fact that we have control of the
            # labelling to make this shrink better, using the following tricks:

            # If p is <= 0 or >= 1 the result of this coin is certain. We make sure
            # to write a byte to the data stream anyway so that these don't cause
            # difficulties when shrinking.
            if p <= 0:
                self._cd.draw_bits(1, forced=0)
                result = False
            elif p >= 1:
                self._cd.draw_bits(1, forced=1)
                result = True
            else:
                falsey = floor(size * (1 - p))
                truthy = floor(size * p)
                remainder = size * p - truthy

                if falsey + truthy == size:
                    partial = False
                else:
                    partial = True

                i = self._cd.draw_bits(
                    bits,
                    forced=None if forced is None else int(forced),
                    fake_forced=fake_forced,
                )

                # We always choose the region that causes us to repeat the loop as
                # the maximum value, so that shrinking the drawn bits never causes
                # us to need to draw more data.
                if partial and i == size - 1:
                    p = remainder
                    continue
                if falsey == 0:
                    # Every other partition is truthy, so the result is true
                    result = True
                elif truthy == 0:
                    # Every other partition is falsey, so the result is false
                    result = False
                elif i <= 1:
                    # We special case so that zero is always false and 1 is always
                    # true which makes shrinking easier because we can always
                    # replace a truthy block with 1. This has the slightly weird
                    # property that shrinking from 2 to 1 can cause the result to
                    # grow, but the shrinker always tries 0 and 1 first anyway, so
                    # this will usually be fine.
                    result = bool(i)
                else:
                    # Originally everything in the region 0 <= i < falsey was false
                    # and everything above was true. We swapped one truthy element
                    # into this region, so the region becomes 0 <= i <= falsey
                    # except for i = 1. We know i > 1 here, so the test for truth
                    # becomes i > falsey.
                    result = i > falsey

            break
        return result
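
    # Worked example (illustrative): for p=0.25, bits = ceil(-log2(0.25)) = 2,
    # so size = 4, falsey = 3, truthy = 1, and remainder = 0 (no partial part).
    # A draw of i=1 gives True and i in {0, 2, 3} gives False, i.e. True with
    # probability 1/4, with 0 and 1 reserved for False and True to aid shrinking.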

    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        # weights are for choosing an element index from a bounded range
        weights: Optional[Sequence[float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        fake_forced: bool = False,
    ) -> int:
        assert self._cd is not None

        if min_value is not None:
            shrink_towards = max(min_value, shrink_towards)
        if max_value is not None:
            shrink_towards = min(max_value, shrink_towards)

        # This is easy to build on top of our existing conjecture utils,
        # and it's easy to build sampled_from and weighted_coin on this.
        if weights is not None:
            assert min_value is not None
            assert max_value is not None

            sampler = Sampler(weights, observe=False)
            gap = max_value - shrink_towards

            forced_idx = None
            if forced is not None:
                if forced >= shrink_towards:
                    forced_idx = forced - shrink_towards
                else:
                    forced_idx = shrink_towards + gap - forced
            idx = sampler.sample(self._cd, forced=forced_idx, fake_forced=fake_forced)

            # For range -2..2, interpret idx = 0..4 as [0, 1, 2, -1, -2]
            if idx <= gap:
                return shrink_towards + idx
            else:
                return shrink_towards - (idx - gap)

        if min_value is None and max_value is None:
            return self._draw_unbounded_integer(forced=forced, fake_forced=fake_forced)

        if min_value is None:
            assert max_value is not None  # make mypy happy
            probe = max_value + 1
            while max_value < probe:
                probe = shrink_towards + self._draw_unbounded_integer(
                    forced=None if forced is None else forced - shrink_towards,
                    fake_forced=fake_forced,
                )
            return probe

        if max_value is None:
            assert min_value is not None
            probe = min_value - 1
            while probe < min_value:
                probe = shrink_towards + self._draw_unbounded_integer(
                    forced=None if forced is None else forced - shrink_towards,
                    fake_forced=fake_forced,
                )
            return probe

        return self._draw_bounded_integer(
            min_value,
            max_value,
            center=shrink_towards,
            forced=forced,
            fake_forced=fake_forced,
        )

    def draw_float(
        self,
        *,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float,
        # TODO: consider supporting these float widths at the IR level in the
        # future.
        # width: Literal[16, 32, 64] = 64,
        # exclude_min and exclude_max handled higher up,
        forced: Optional[float] = None,
        fake_forced: bool = False,
    ) -> float:
        (
            sampler,
            forced_sign_bit,
            neg_clamper,
            pos_clamper,
            nasty_floats,
        ) = self._draw_float_init_logic(
            min_value=min_value,
            max_value=max_value,
            allow_nan=allow_nan,
            smallest_nonzero_magnitude=smallest_nonzero_magnitude,
        )

        assert self._cd is not None

        while True:
            # If `forced in nasty_floats`, then `forced` was *probably*
            # generated by drawing a nonzero index from the sampler. However, we
            # have no obligation to generate it that way when forcing. In particular,
            # i == 0 is able to produce all possible floats, and the forcing
            # logic is simpler if we assume this choice.
            forced_i = None if forced is None else 0
            i = (
                sampler.sample(self._cd, forced=forced_i, fake_forced=fake_forced)
                if sampler
                else 0
            )
            if i == 0:
                result = self._draw_float(
                    forced_sign_bit=forced_sign_bit,
                    forced=forced,
                    fake_forced=fake_forced,
                )
                if allow_nan and math.isnan(result):
                    clamped = result
                elif math.copysign(1.0, result) == -1:
                    assert neg_clamper is not None
                    clamped = -neg_clamper(-result)
                else:
                    assert pos_clamper is not None
                    clamped = pos_clamper(result)
                if clamped != result and not (math.isnan(result) and allow_nan):
                    self._draw_float(forced=clamped, fake_forced=fake_forced)
                    result = clamped
            else:
                result = nasty_floats[i - 1]
                # nan values generated via int_to_float break list membership:
                #
                # >>> n = 18444492273895866368
                # >>> assert math.isnan(int_to_float(n))
                # >>> assert int_to_float(n) not in [int_to_float(n)]
                #
                # because int_to_float nans are not equal in the sense of either
                # `a == b` or `a is b`.
                #
                # This can lead to flaky errors when collections require unique
                # floats. I think what is happening is that in some places we
                # provide math.nan, and in others we provide
                # int_to_float(float_to_int(math.nan)), and which one gets used
                # is not deterministic across test iterations.
                #
                # As a (temporary?) fix, we'll *always* generate nan values which
                # are not equal in the identity sense.
                #
                # see also https://github.com/HypothesisWorks/hypothesis/issues/3926.
                if math.isnan(result):
                    result = int_to_float(float_to_int(result))

                self._draw_float(forced=result, fake_forced=fake_forced)

            return result

    def draw_string(
        self,
        intervals: IntervalSet,
        *,
        min_size: int = 0,
        max_size: Optional[int] = None,
        forced: Optional[str] = None,
        fake_forced: bool = False,
    ) -> str:
        if max_size is None:
            max_size = DRAW_STRING_DEFAULT_MAX_SIZE

        assert forced is None or min_size <= len(forced) <= max_size
        assert self._cd is not None

        average_size = min(
            max(min_size * 2, min_size + 5),
            0.5 * (min_size + max_size),
        )
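        # e.g. with min_size=0 and the default "arbitrarily large" max_size,
        # this evaluates to min(max(0, 5), 5e9) == 5 expected characters.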

        chars = []
        elements = many(
            self._cd,
            min_size=min_size,
            max_size=max_size,
            average_size=average_size,
            forced=None if forced is None else len(forced),
            fake_forced=fake_forced,
            observe=False,
        )
        while elements.more():
            forced_i: Optional[int] = None
            if forced is not None:
                c = forced[elements.count - 1]
                forced_i = intervals.index_from_char_in_shrink_order(c)

            if len(intervals) > 256:
                if self.draw_boolean(
                    0.2,
                    forced=None if forced_i is None else forced_i > 255,
                    fake_forced=fake_forced,
                ):
                    i = self._draw_bounded_integer(
                        256,
                        len(intervals) - 1,
                        forced=forced_i,
                        fake_forced=fake_forced,
                    )
                else:
                    i = self._draw_bounded_integer(
                        0, 255, forced=forced_i, fake_forced=fake_forced
                    )
            else:
                i = self._draw_bounded_integer(
                    0, len(intervals) - 1, forced=forced_i, fake_forced=fake_forced
                )

            chars.append(intervals.char_in_shrink_order(i))

        return "".join(chars)

    def draw_bytes(
        self, size: int, *, forced: Optional[bytes] = None, fake_forced: bool = False
    ) -> bytes:
        forced_i = None
        if forced is not None:
            forced_i = int_from_bytes(forced)
            size = len(forced)

        assert self._cd is not None
        return self._cd.draw_bits(
            8 * size, forced=forced_i, fake_forced=fake_forced
        ).to_bytes(size, "big")

    def _draw_float(
        self,
        forced_sign_bit: Optional[int] = None,
        *,
        forced: Optional[float] = None,
        fake_forced: bool = False,
    ) -> float:
        """
        Helper for draw_float which draws a random 64-bit float.
        """
        assert self._cd is not None

        if forced is not None:
            # sign_aware_lte(forced, -0.0) does not correctly handle the
            # math.nan case here.
            forced_sign_bit = math.copysign(1, forced) == -1
        is_negative = self._cd.draw_bits(
            1, forced=forced_sign_bit, fake_forced=fake_forced
        )
        f = lex_to_float(
            self._cd.draw_bits(
                64,
                forced=None if forced is None else float_to_lex(abs(forced)),
                fake_forced=fake_forced,
            )
        )
        return -f if is_negative else f

    def _draw_unbounded_integer(
        self, *, forced: Optional[int] = None, fake_forced: bool = False
    ) -> int:
        assert self._cd is not None
        forced_i = None
        if forced is not None:
            # Using any bucket large enough to contain this integer would be a
            # valid way to force it. This is because an n bit integer could have
            # been drawn from a bucket of size n, or from any bucket of size
            # m > n.
            # We'll always choose the smallest eligible bucket here.

            # We need an extra bit to handle forced signed integers. INT_SIZES
            # is interpreted as unsigned sizes.
            bit_size = forced.bit_length() + 1
            size = min(size for size in INT_SIZES if bit_size <= size)
            forced_i = INT_SIZES.index(size)

        size = INT_SIZES[
            INT_SIZES_SAMPLER.sample(self._cd, forced=forced_i, fake_forced=fake_forced)
        ]

        forced_r = None
        if forced is not None:
            forced_r = forced
            forced_r <<= 1
            if forced < 0:
                forced_r = -forced_r
                forced_r |= 1

        r = self._cd.draw_bits(size, forced=forced_r, fake_forced=fake_forced)
        sign = r & 1
        r >>= 1
        if sign:
            r = -r
        return r
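
    # Worked example (illustrative) of the sign encoding above: forced=-5 is
    # written as ((5 << 1) | 1) == 11, while forced=5 is written as
    # (5 << 1) == 10. On the way back out the low bit is the sign and the
    # remaining bits the magnitude, so 11 -> -5 and 10 -> 5.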
1697
1698 def _draw_bounded_integer(
1699 self,
1700 lower: int,
1701 upper: int,
1702 *,
1703 center: Optional[int] = None,
1704 forced: Optional[int] = None,
1705 fake_forced: bool = False,
1706 _vary_effective_size: bool = True,
1707 ) -> int:
1708 assert lower <= upper
1709 assert forced is None or lower <= forced <= upper
1710 assert self._cd is not None
1711 if lower == upper:
1712 # Write a value even when this is trivial so that when a bound depends
1713 # on other values we don't suddenly disappear when the gap shrinks to
1714 # zero - if that happens then often the data stream becomes misaligned
1715 # and we fail to shrink in cases where we really should be able to.
1716 self._cd.draw_bits(1, forced=0)
1717 return int(lower)
1718
1719 if center is None:
1720 center = lower
1721 center = min(max(center, lower), upper)
1722
1723 if center == upper:
1724 above = False
1725 elif center == lower:
1726 above = True
1727 else:
1728 force_above = None if forced is None else forced < center
1729 above = not self._cd.draw_bits(
1730 1, forced=force_above, fake_forced=fake_forced
1731 )
1732
1733 if above:
1734 gap = upper - center
1735 else:
1736 gap = center - lower
1737
1738 assert gap > 0
1739
1740 bits = gap.bit_length()
1741 probe = gap + 1
1742
1743 if (
1744 bits > 24
1745 and _vary_effective_size
1746 and self.draw_boolean(
1747 7 / 8, forced=None if forced is None else False, fake_forced=fake_forced
1748 )
1749 ):
1750 # For large ranges, we combine the uniform random distribution from draw_bits
1751 # with a weighting scheme with moderate chance. Cutoff at 2 ** 24 so that our
1752 # choice of unicode characters is uniform but the 32bit distribution is not.
1753 idx = INT_SIZES_SAMPLER.sample(self._cd)
1754 force_bits = min(bits, INT_SIZES[idx])
1755 forced = self._draw_bounded_integer(
1756 lower=center if above else max(lower, center - 2**force_bits - 1),
1757 upper=center if not above else min(upper, center + 2**force_bits - 1),
1758 _vary_effective_size=False,
1759 )
1760
1761 assert lower <= forced <= upper
1762
1763 while probe > gap:
1764 probe = self._cd.draw_bits(
1765 bits,
1766 forced=None if forced is None else abs(forced - center),
1767 fake_forced=fake_forced,
1768 )
1769
1770 if above:
1771 result = center + probe
1772 else:
1773 result = center - probe
1774
1775 assert lower <= result <= upper
1776 assert forced is None or result == forced, (result, forced, center, above)
1777 return result
1778
1779 @classmethod
1780 def _draw_float_init_logic(
1781 cls,
1782 *,
1783 min_value: float,
1784 max_value: float,
1785 allow_nan: bool,
1786 smallest_nonzero_magnitude: float,
1787 ) -> Tuple[
1788 Optional[Sampler],
1789 Optional[Literal[0, 1]],
1790 Optional[Callable[[float], float]],
1791 Optional[Callable[[float], float]],
1792 List[float],
1793 ]:
1794 """
1795 Caches initialization logic for draw_float, as an alternative to
1796 computing this for *every* float draw.
1797 """
1798 # float_to_int allows us to distinguish between e.g. -0.0 and 0.0,
1799 # even in light of hash(-0.0) == hash(0.0) and -0.0 == 0.0.
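        # (float_to_int(0.0) == 0 while float_to_int(-0.0) == 2**63, as only
        # the IEEE-754 sign bit differs, so the cache keys stay distinct.)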
1800 key = (
1801 float_to_int(min_value),
1802 float_to_int(max_value),
1803 allow_nan,
1804 float_to_int(smallest_nonzero_magnitude),
1805 )
1806 if key in FLOAT_INIT_LOGIC_CACHE:
1807 return FLOAT_INIT_LOGIC_CACHE[key]
1808
1809 result = cls._compute_draw_float_init_logic(
1810 min_value=min_value,
1811 max_value=max_value,
1812 allow_nan=allow_nan,
1813 smallest_nonzero_magnitude=smallest_nonzero_magnitude,
1814 )
1815 FLOAT_INIT_LOGIC_CACHE[key] = result
1816 return result
1817
1818 @staticmethod
1819 def _compute_draw_float_init_logic(
1820 *,
1821 min_value: float,
1822 max_value: float,
1823 allow_nan: bool,
1824 smallest_nonzero_magnitude: float,
1825 ) -> Tuple[
1826 Optional[Sampler],
1827 Optional[Literal[0, 1]],
1828 Optional[Callable[[float], float]],
1829 Optional[Callable[[float], float]],
1830 List[float],
1831 ]:
1832 if smallest_nonzero_magnitude == 0.0: # pragma: no cover
1833 raise FloatingPointError(
1834 "Got allow_subnormal=True, but we can't represent subnormal floats "
1835 "right now, in violation of the IEEE-754 floating-point "
1836 "specification. This is usually because something was compiled with "
1837 "-ffast-math or a similar option, which sets global processor state. "
1838 "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
1839 "writeup - and good luck!"
1840 )
1841
1842 def permitted(f):
1843 assert isinstance(f, float)
1844 if math.isnan(f):
1845 return allow_nan
1846 if 0 < abs(f) < smallest_nonzero_magnitude:
1847 return False
1848 return sign_aware_lte(min_value, f) and sign_aware_lte(f, max_value)
1849
1850 boundary_values = [
1851 min_value,
1852 next_up(min_value),
1853 min_value + 1,
1854 max_value - 1,
1855 next_down(max_value),
1856 max_value,
1857 ]
1858 nasty_floats = [f for f in NASTY_FLOATS + boundary_values if permitted(f)]
1859 weights = [0.2 * len(nasty_floats)] + [0.8] * len(nasty_floats)
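        # Index 0 of the sampler is reserved for the continuous draw in
        # draw_float; the total weight is 0.2 * n + 0.8 * n, so when any nasty
        # floats are permitted, one is produced 80% of the time.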
1860 sampler = Sampler(weights, observe=False) if nasty_floats else None
1861
1862 pos_clamper = neg_clamper = None
1863 if sign_aware_lte(0.0, max_value):
1864 pos_min = max(min_value, smallest_nonzero_magnitude)
1865 allow_zero = sign_aware_lte(min_value, 0.0)
1866 pos_clamper = make_float_clamper(pos_min, max_value, allow_zero=allow_zero)
1867 if sign_aware_lte(min_value, -0.0):
1868 neg_max = min(max_value, -smallest_nonzero_magnitude)
1869 allow_zero = sign_aware_lte(-0.0, max_value)
1870 neg_clamper = make_float_clamper(
1871 -neg_max, -min_value, allow_zero=allow_zero
1872 )
1873
1874 forced_sign_bit: Optional[Literal[0, 1]] = None
1875 if (pos_clamper is None) != (neg_clamper is None):
1876 forced_sign_bit = 1 if neg_clamper else 0
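            # e.g. min_value=1.0 leaves neg_clamper=None, so only positive
            # floats are possible and the sign bit is forced to 0.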
1877
1878 return (sampler, forced_sign_bit, neg_clamper, pos_clamper, nasty_floats)
1879
1880
1881# The set of available `PrimitiveProvider`s, by name. Other libraries, such as
1882# crosshair, can implement this interface and add themselves; at which point users
1883# can configure which backend to use via settings. Keys are the name of the library,
1884# which doubles as the backend= setting, and values are importable class names.
1885#
1886# NOTE: this is a temporary interface. We DO NOT promise to continue supporting it!
1887# (but if you want to experiment and don't mind breakage, here you go)
1888AVAILABLE_PROVIDERS = {
1889 "hypothesis": "hypothesis.internal.conjecture.data.HypothesisProvider",
1890}
1891
1892
1893class ConjectureData:
1894 @classmethod
1895 def for_buffer(
1896 cls,
1897 buffer: Union[List[int], bytes],
1898 *,
1899 observer: Optional[DataObserver] = None,
1900 provider: Union[type, PrimitiveProvider] = HypothesisProvider,
1901 ) -> "ConjectureData":
1902 return cls(
1903 len(buffer), buffer, random=None, observer=observer, provider=provider
1904 )
1905
1906 @classmethod
1907 def for_ir_tree(
1908 cls,
1909 ir_tree_prefix: List[IRNode],
1910 *,
1911 observer: Optional[DataObserver] = None,
1912 provider: Union[type, PrimitiveProvider] = HypothesisProvider,
1913 max_length: Optional[int] = None,
1914 ) -> "ConjectureData":
1915 from hypothesis.internal.conjecture.engine import BUFFER_SIZE
1916
1917 return cls(
1918 max_length=BUFFER_SIZE if max_length is None else max_length,
1919 prefix=b"",
1920 random=None,
1921 ir_tree_prefix=ir_tree_prefix,
1922 observer=observer,
1923 provider=provider,
1924 )
1925
1926 def __init__(
1927 self,
1928 max_length: int,
1929 prefix: Union[List[int], bytes, bytearray],
1930 *,
1931 random: Optional[Random],
1932 observer: Optional[DataObserver] = None,
1933 provider: Union[type, PrimitiveProvider] = HypothesisProvider,
1934 ir_tree_prefix: Optional[List[IRNode]] = None,
1935 ) -> None:
1936 if observer is None:
1937 observer = DataObserver()
1938 assert isinstance(observer, DataObserver)
1939 self._bytes_drawn = 0
1940 self.observer = observer
1941 self.max_length = max_length
1942 self.is_find = False
1943 self.overdraw = 0
1944 self.__prefix = bytes(prefix)
1945 self.__random = random
1946
1947 if ir_tree_prefix is None:
1948 assert random is not None or max_length <= len(prefix)
1949
1950 self.blocks = Blocks(self)
1951 self.buffer: "Union[bytes, bytearray]" = bytearray()
1952 self.index = 0
1953 self.output = ""
1954 self.status = Status.VALID
1955 self.frozen = False
1956 global global_test_counter
1957 self.testcounter = global_test_counter
1958 global_test_counter += 1
1959 self.start_time = time.perf_counter()
1960 self.gc_start_time = gc_cumulative_time()
1961 self.events: Dict[str, Union[str, int, float]] = {}
1962 self.forced_indices: "Set[int]" = set()
1963 self.interesting_origin: Optional[InterestingOrigin] = None
1964 self.draw_times: "Dict[str, float]" = {}
1965 self._stateful_run_times: "DefaultDict[str, float]" = defaultdict(float)
1966 self.max_depth = 0
1967 self.has_discards = False
1968
1969 self.provider = provider(self) if isinstance(provider, type) else provider
1970 assert isinstance(self.provider, PrimitiveProvider)
1971
1972 self.__result: "Optional[ConjectureResult]" = None
1973
1974 # Observations used for targeted search. They'll be aggregated in
1975 # ConjectureRunner.generate_new_examples and fed to TargetSelector.
1976 self.target_observations: TargetObservations = {}
1977
1978 # Tags which indicate something about which part of the search space
1979 # this example is in. These are used to guide generation.
1980 self.tags: "Set[StructuralCoverageTag]" = set()
1981 self.labels_for_structure_stack: "List[Set[int]]" = []
1982
1983 # Normally unpopulated but we need this in the niche case
1984 # that self.as_result() is Overrun but we still want the
1985 # examples for reporting purposes.
1986 self.__examples: "Optional[Examples]" = None
1987
1988 # We want the top level example to have depth 0, so we start
1989 # at -1.
1990 self.depth = -1
1991 self.__example_record = ExampleRecord()
1992
1993 # Slice indices for discrete reportable parts that which-parts-matter can
1994 # try varying, to report if the minimal example always fails anyway.
1995 self.arg_slices: Set[Tuple[int, int]] = set()
1996 self.slice_comments: Dict[Tuple[int, int], str] = {}
1997 self._observability_args: Dict[str, Any] = {}
1998 self._observability_predicates: defaultdict = defaultdict(
1999 lambda: {"satisfied": 0, "unsatisfied": 0}
2000 )
2001
2002 self.extra_information = ExtraInformation()
2003
2004 self.ir_tree_nodes = ir_tree_prefix
2005 self.invalid_at: Optional[InvalidAt] = None
2006 self._node_index = 0
2007 self.start_example(TOP_LABEL)
2008
2009 def __repr__(self):
2010 return "ConjectureData(%s, %d bytes%s)" % (
2011 self.status.name,
2012 len(self.buffer),
2013 ", frozen" if self.frozen else "",
2014 )
2015
2016 # A bit of explanation of the `observe` and `fake_forced` arguments in our
2017 # draw_* functions.
2018 #
2019 # There are two types of draws: sub-ir and super-ir. For instance, some ir
2020 # nodes use `many`, which in turn calls draw_boolean. But some strategies
2021 # also use many, at the super-ir level. We don't want to write sub-ir draws
2022 # to the DataTree (and consequently use them when computing novel prefixes),
2023 # since they are fully recorded by writing the ir node itself.
2024 # But super-ir draws are not included in the ir node, so we do want to write
2025 # these to the tree.
2026 #
2027 # `observe` formalizes this distinction. The draw will only be written to
2028 # the DataTree if observe is True.
2029 #
2030 # `fake_forced` deals with a different problem. We use `forced=` to convert
2031 # ir prefixes, which are potentially from other backends, into our backing
2032 # bits representation. This works fine, except using `forced=` in this way
2033 # also sets `was_forced=True` for all blocks, even those that weren't forced
2034 # in the traditional way. The shrinker chokes on this due to thinking that
2035 # nothing can be modified.
2036 #
2037 # Setting `fake_forced` to true says that yes, we want to force a particular
2038 # value to be returned, but we don't want to treat that block as fixed for
2039 # e.g. the shrinker.
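    #
    # As an illustration of the `observe` distinction: the "continue?"
    # booleans that `many` draws at the strategy (super-ir) level are observed
    # and written to the DataTree, whereas booleans a provider draws
    # internally while implementing a single ir node should pass
    # observe=False, since the recorded node already captures them.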
2040
2041 def draw_integer(
2042 self,
2043 min_value: Optional[int] = None,
2044 max_value: Optional[int] = None,
2045 *,
2046 # weights are for choosing an element index from a bounded range
2047 weights: Optional[Sequence[float]] = None,
2048 shrink_towards: int = 0,
2049 forced: Optional[int] = None,
2050 fake_forced: bool = False,
2051 observe: bool = True,
2052 ) -> int:
2053 # Validate arguments
2054 if weights is not None:
2055 assert min_value is not None
2056 assert max_value is not None
2057 width = max_value - min_value + 1
2058 assert width <= 255 # arbitrary practical limit
2059 assert len(weights) == width
2060
2061 if forced is not None and (min_value is None or max_value is None):
2062 # We draw `forced=forced - shrink_towards` here internally. If that
2063 # grows larger than a 128 bit signed integer, we can't represent it.
2064 # Disallow this combination for now.
2065 # Note that bit_length() = 128 -> signed bit size = 129.
2066 assert (forced - shrink_towards).bit_length() < 128
2067 if forced is not None and min_value is not None:
2068 assert min_value <= forced
2069 if forced is not None and max_value is not None:
2070 assert forced <= max_value
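        # e.g. draw_integer(0, 2, weights=[0.1, 0.2, 0.7]) has width 3, with
        # one weight per value in [min_value, max_value].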
2071
2072 kwargs: IntegerKWargs = self._pooled_kwargs(
2073 "integer",
2074 {
2075 "min_value": min_value,
2076 "max_value": max_value,
2077 "weights": weights,
2078 "shrink_towards": shrink_towards,
2079 },
2080 )
2081
2082 if self.ir_tree_nodes is not None and observe:
2083 node = self._pop_ir_tree_node("integer", kwargs, forced=forced)
2084 if forced is None:
2085 assert isinstance(node.value, int)
2086 forced = node.value
2087 fake_forced = True
2088
2089 value = self.provider.draw_integer(
2090 **kwargs, forced=forced, fake_forced=fake_forced
2091 )
2092 if observe:
2093 self.observer.draw_integer(
2094 value, kwargs=kwargs, was_forced=forced is not None and not fake_forced
2095 )
2096 self.__example_record.record_ir_draw(
2097 "integer",
2098 value,
2099 kwargs=kwargs,
2100 was_forced=forced is not None and not fake_forced,
2101 )
2102 return value
2103
2104 def draw_float(
2105 self,
2106 min_value: float = -math.inf,
2107 max_value: float = math.inf,
2108 *,
2109 allow_nan: bool = True,
2110 smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
2111 # TODO: consider supporting these float widths at the IR level in the
2112 # future.
2113 # width: Literal[16, 32, 64] = 64,
2114 # exclude_min and exclude_max handled higher up,
2115 forced: Optional[float] = None,
2116 fake_forced: bool = False,
2117 observe: bool = True,
2118 ) -> float:
2119 assert smallest_nonzero_magnitude > 0
2120 assert not math.isnan(min_value)
2121 assert not math.isnan(max_value)
2122
2123 if forced is not None:
2124 assert allow_nan or not math.isnan(forced)
2125 assert math.isnan(forced) or (
2126 sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
2127 )
2128
2129 kwargs: FloatKWargs = self._pooled_kwargs(
2130 "float",
2131 {
2132 "min_value": min_value,
2133 "max_value": max_value,
2134 "allow_nan": allow_nan,
2135 "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
2136 },
2137 )
2138
2139 if self.ir_tree_nodes is not None and observe:
2140 node = self._pop_ir_tree_node("float", kwargs, forced=forced)
2141 if forced is None:
2142 assert isinstance(node.value, float)
2143 forced = node.value
2144 fake_forced = True
2145
2146 value = self.provider.draw_float(
2147 **kwargs, forced=forced, fake_forced=fake_forced
2148 )
2149 if observe:
2150 self.observer.draw_float(
2151 value, kwargs=kwargs, was_forced=forced is not None and not fake_forced
2152 )
2153 self.__example_record.record_ir_draw(
2154 "float",
2155 value,
2156 kwargs=kwargs,
2157 was_forced=forced is not None and not fake_forced,
2158 )
2159 return value
2160
2161 def draw_string(
2162 self,
2163 intervals: IntervalSet,
2164 *,
2165 min_size: int = 0,
2166 max_size: Optional[int] = None,
2167 forced: Optional[str] = None,
2168 fake_forced: bool = False,
2169 observe: bool = True,
2170 ) -> str:
2171 assert forced is None or min_size <= len(forced)
2172
2173 kwargs: StringKWargs = self._pooled_kwargs(
2174 "string",
2175 {
2176 "intervals": intervals,
2177 "min_size": min_size,
2178 "max_size": max_size,
2179 },
2180 )
2181 if self.ir_tree_nodes is not None and observe:
2182 node = self._pop_ir_tree_node("string", kwargs, forced=forced)
2183 if forced is None:
2184 assert isinstance(node.value, str)
2185 forced = node.value
2186 fake_forced = True
2187
2188 value = self.provider.draw_string(
2189 **kwargs, forced=forced, fake_forced=fake_forced
2190 )
2191 if observe:
2192 self.observer.draw_string(
2193 value, kwargs=kwargs, was_forced=forced is not None and not fake_forced
2194 )
2195 self.__example_record.record_ir_draw(
2196 "string",
2197 value,
2198 kwargs=kwargs,
2199 was_forced=forced is not None and not fake_forced,
2200 )
2201 return value
2202
2203 def draw_bytes(
2204 self,
2205 # TODO move to min_size and max_size here.
2206 size: int,
2207 *,
2208 forced: Optional[bytes] = None,
2209 fake_forced: bool = False,
2210 observe: bool = True,
2211 ) -> bytes:
2212 assert forced is None or len(forced) == size
2213 assert size >= 0
2214
2215 kwargs: BytesKWargs = self._pooled_kwargs("bytes", {"size": size})
2216
2217 if self.ir_tree_nodes is not None and observe:
2218 node = self._pop_ir_tree_node("bytes", kwargs, forced=forced)
2219 if forced is None:
2220 assert isinstance(node.value, bytes)
2221 forced = node.value
2222 fake_forced = True
2223
2224 value = self.provider.draw_bytes(
2225 **kwargs, forced=forced, fake_forced=fake_forced
2226 )
2227 if observe:
2228 self.observer.draw_bytes(
2229 value, kwargs=kwargs, was_forced=forced is not None and not fake_forced
2230 )
2231 self.__example_record.record_ir_draw(
2232 "bytes",
2233 value,
2234 kwargs=kwargs,
2235 was_forced=forced is not None and not fake_forced,
2236 )
2237 return value
2238
2239 def draw_boolean(
2240 self,
2241 p: float = 0.5,
2242 *,
2243 forced: Optional[bool] = None,
2244 fake_forced: bool = False,
2245 observe: bool = True,
2246 ) -> bool:
2247 # Internally, we treat probabilities lower than 1 / 2**64 as
2248 # unconditionally false.
2249 #
        # Note that even if we lift this 64 bit restriction in the future, p
        # cannot be 0 when forced=True, nor 1 when forced=False.
2252 if forced is True:
2253 assert p > 2 ** (-64)
2254 if forced is False:
2255 assert p < (1 - 2 ** (-64))
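        # e.g. forcing True with p=2**-65 would be unsatisfiable, since such a
        # draw is treated as unconditionally False - hence the assertions above.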
2256
2257 kwargs: BooleanKWargs = self._pooled_kwargs("boolean", {"p": p})
2258
2259 if self.ir_tree_nodes is not None and observe:
2260 node = self._pop_ir_tree_node("boolean", kwargs, forced=forced)
2261 if forced is None:
2262 assert isinstance(node.value, bool)
2263 forced = node.value
2264 fake_forced = True
2265
2266 value = self.provider.draw_boolean(
2267 **kwargs, forced=forced, fake_forced=fake_forced
2268 )
2269 if observe:
2270 self.observer.draw_boolean(
2271 value, kwargs=kwargs, was_forced=forced is not None and not fake_forced
2272 )
2273 self.__example_record.record_ir_draw(
2274 "boolean",
2275 value,
2276 kwargs=kwargs,
2277 was_forced=forced is not None and not fake_forced,
2278 )
2279 return value
2280
2281 def _pooled_kwargs(self, ir_type, kwargs):
2282 """Memoize common dictionary objects to reduce memory pressure."""
2283 key = []
2284 for k, v in kwargs.items():
2285 if ir_type == "float" and k in ["min_value", "max_value"]:
2286 # handle -0.0 vs 0.0, etc.
2287 v = float_to_int(v)
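                # (otherwise the -0.0 and 0.0 bounds would collide as pool
                # keys, since -0.0 == 0.0 and they hash identically)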
2288 elif ir_type == "integer" and k == "weights":
2289 # make hashable
2290 v = v if v is None else tuple(v)
2291 key.append((k, v))
2292
2293 key = (ir_type, *sorted(key))
2294
2295 try:
2296 return POOLED_KWARGS_CACHE[key]
2297 except KeyError:
2298 POOLED_KWARGS_CACHE[key] = kwargs
2299 return kwargs
2300
2301 def _pop_ir_tree_node(
2302 self, ir_type: IRTypeName, kwargs: IRKWargsType, *, forced: Optional[IRType]
2303 ) -> IRNode:
2304 assert self.ir_tree_nodes is not None
2305
2306 if self._node_index == len(self.ir_tree_nodes):
2307 self.mark_overrun()
2308
2309 node = self.ir_tree_nodes[self._node_index]
2310 # If we're trying to draw a different ir type at the same location, then
2311 # this ir tree has become badly misaligned. We don't have many good/simple
2312 # options here for realigning beyond giving up.
2313 #
        # This is more of an issue for ir nodes while shrinking than it was for
        # buffers: misaligned buffers are still usually valid, just interpreted
        # differently, so continuing would have been somewhat like drawing a
        # random value for the new ir type here. For what it's worth, misaligned
        # buffers are rather unlikely to be *useful* buffers, so giving up isn't
        # a big downgrade. (In fact, it is possible that giving up early here
        # results in more time for useful shrinks to run.)
2321 if node.ir_type != ir_type:
2322 invalid_at = (ir_type, kwargs, forced)
2323 self.invalid_at = invalid_at
2324 self.observer.mark_invalid(invalid_at)
2325 self.mark_invalid(f"(internal) want a {ir_type} but have a {node.ir_type}")
2326
2327 # if a node has different kwargs (and so is misaligned), but has a value
2328 # that is allowed by the expected kwargs, then we can coerce this node
2329 # into an aligned one by using its value. It's unclear how useful this is.
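        # For instance, an integer node drawn with max_value=100 can stand in
        # for a draw expecting max_value=200, as its value is still permitted.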
2330 if not ir_value_permitted(node.value, node.ir_type, kwargs):
2331 invalid_at = (ir_type, kwargs, forced)
2332 self.invalid_at = invalid_at
2333 self.observer.mark_invalid(invalid_at)
2334 self.mark_invalid(f"(internal) got a {ir_type} but outside the valid range")
2335
2336 self._node_index += 1
2337 return node
2338
2339 def as_result(self) -> Union[ConjectureResult, _Overrun]:
2340 """Convert the result of running this test into
2341 either an Overrun object or a ConjectureResult."""
2342
2343 assert self.frozen
2344 if self.status == Status.OVERRUN:
2345 return Overrun
2346 if self.__result is None:
2347 self.__result = ConjectureResult(
2348 status=self.status,
2349 interesting_origin=self.interesting_origin,
2350 buffer=self.buffer,
2351 examples=self.examples,
2352 blocks=self.blocks,
2353 output=self.output,
2354 extra_information=(
2355 self.extra_information
2356 if self.extra_information.has_information()
2357 else None
2358 ),
2359 has_discards=self.has_discards,
2360 target_observations=self.target_observations,
2361 tags=frozenset(self.tags),
2362 forced_indices=frozenset(self.forced_indices),
2363 arg_slices=self.arg_slices,
2364 slice_comments=self.slice_comments,
2365 invalid_at=self.invalid_at,
2366 )
2367 assert self.__result is not None
2368 self.blocks.transfer_ownership(self.__result)
2369 return self.__result
2370
2371 def __assert_not_frozen(self, name: str) -> None:
2372 if self.frozen:
2373 raise Frozen(f"Cannot call {name} on frozen ConjectureData")
2374
2375 def note(self, value: Any) -> None:
2376 self.__assert_not_frozen("note")
2377 if not isinstance(value, str):
2378 value = repr(value)
2379 self.output += value
2380
2381 def draw(
2382 self,
2383 strategy: "SearchStrategy[Ex]",
2384 label: Optional[int] = None,
2385 observe_as: Optional[str] = None,
2386 ) -> "Ex":
2387 if self.is_find and not strategy.supports_find:
2388 raise InvalidArgument(
2389 f"Cannot use strategy {strategy!r} within a call to find "
2390 "(presumably because it would be invalid after the call had ended)."
2391 )
2392
2393 at_top_level = self.depth == 0
2394 start_time = None
2395 if at_top_level:
2396 # We start this timer early, because accessing attributes on a LazyStrategy
2397 # can be almost arbitrarily slow. In cases like characters() and text()
2398 # where we cache something expensive, this led to Flaky deadline errors!
2399 # See https://github.com/HypothesisWorks/hypothesis/issues/2108
2400 start_time = time.perf_counter()
2401 gc_start_time = gc_cumulative_time()
2402
2403 strategy.validate()
2404
2405 if strategy.is_empty:
2406 self.mark_invalid(f"empty strategy {self!r}")
2407
2408 if self.depth >= MAX_DEPTH:
2409 self.mark_invalid("max depth exceeded")
2410
2411 if label is None:
2412 assert isinstance(strategy.label, int)
2413 label = strategy.label
2414 self.start_example(label=label)
2415 try:
2416 if not at_top_level:
2417 return strategy.do_draw(self)
2418 assert start_time is not None
2419 key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
2420 try:
2421 strategy.validate()
2422 try:
2423 return strategy.do_draw(self)
2424 finally:
2425 # Subtract the time spent in GC to avoid overcounting, as it is
2426 # accounted for at the overall example level.
2427 in_gctime = gc_cumulative_time() - gc_start_time
2428 self.draw_times[key] = time.perf_counter() - start_time - in_gctime
2429 except Exception as err:
2430 add_note(err, f"while generating {key[9:]!r} from {strategy!r}")
2431 raise
2432 finally:
2433 self.stop_example()
2434
2435 def start_example(self, label: int) -> None:
2436 self.__assert_not_frozen("start_example")
2437 self.depth += 1
2438 # Logically it would make sense for this to just be
2439 # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
2440 # be until we ran the code under tracemalloc and found a rather significant
2441 # chunk of allocation was happening here. This was presumably due to varargs
2442 # or the like, but we didn't investigate further given that it was easy
2443 # to fix with this check.
2444 if self.depth > self.max_depth:
2445 self.max_depth = self.depth
2446 self.__example_record.start_example(label)
2447 self.labels_for_structure_stack.append({label})
2448
2449 def stop_example(self, *, discard: bool = False) -> None:
2450 if self.frozen:
2451 return
2452 if discard:
2453 self.has_discards = True
2454 self.depth -= 1
2455 assert self.depth >= -1
2456 self.__example_record.stop_example(discard=discard)
2457
2458 labels_for_structure = self.labels_for_structure_stack.pop()
2459
2460 if not discard:
2461 if self.labels_for_structure_stack:
2462 self.labels_for_structure_stack[-1].update(labels_for_structure)
2463 else:
2464 self.tags.update([structural_coverage(l) for l in labels_for_structure])
2465
2466 if discard:
2467 # Once we've discarded an example, every test case starting with
2468 # this prefix contains discards. We prune the tree at that point so
2469 # as to avoid future test cases bothering with this region, on the
2470 # assumption that some example that you could have used instead
2471 # there would *not* trigger the discard. This greatly speeds up
2472 # test case generation in some cases, because it allows us to
2473 # ignore large swathes of the search space that are effectively
2474 # redundant.
2475 #
            # A scenario that can cause us problems, and which we have
            # deliberately decided not to support, is side effects during data
            # generation: you may end up in a situation where every good test
            # case generates a discard because the discarded section sets up
            # important things for later. This is not terribly likely, and all
            # you see in this case is some degradation in the quality of
            # testing, so we don't worry about it.
2483 #
2484 # Note that killing the branch does *not* mean we will never
2485 # explore below this point, and in particular we may do so during
2486 # shrinking. Any explicit request for a data object that starts
2487 # with the branch here will work just fine, but novel prefix
2488 # generation will avoid it, and we can use it to detect when we
2489 # have explored the entire tree (up to redundancy).
2490
2491 self.observer.kill_branch()
2492
2493 @property
2494 def examples(self) -> Examples:
2495 assert self.frozen
2496 if self.__examples is None:
2497 self.__examples = Examples(record=self.__example_record, blocks=self.blocks)
2498 return self.__examples
2499
2500 def freeze(self) -> None:
2501 if self.frozen:
2502 assert isinstance(self.buffer, bytes)
2503 return
2504 self.finish_time = time.perf_counter()
2505 self.gc_finish_time = gc_cumulative_time()
2506 assert len(self.buffer) == self.index
2507
2508 # Always finish by closing all remaining examples so that we have a
2509 # valid tree.
2510 while self.depth >= 0:
2511 self.stop_example()
2512
2513 self.__example_record.freeze()
2514
2515 self.frozen = True
2516
2517 self.buffer = bytes(self.buffer)
2518
2519 # if we were invalid because of a misalignment in the tree, we don't
2520 # want to tell the DataTree that. Doing so would lead to inconsistent behavior.
2521 # Given an empty DataTree
2522 # ┌──────┐
2523 # │ root │
2524 # └──────┘
2525 # and supposing the very first draw is misaligned, concluding here would
2526 # tell the datatree that the *only* possibility at the root node is Status.INVALID:
2527 # ┌──────┐
2528 # │ root │
2529 # └──┬───┘
2530 # ┌───────────┴───────────────┐
2531 # │ Conclusion(Status.INVALID)│
2532 # └───────────────────────────┘
2533 # when in fact this is only the case when we try to draw a misaligned node.
2534 # For instance, suppose we come along in the second test case and try a
2535 # valid node as the first draw from the root. The DataTree thinks this
2536 # is flaky (because root must lead to Status.INVALID in the tree) while
2537 # in fact nothing in the test function has changed and the only change
2538 # is in the ir tree prefix we are supplying.
2539 #
2540 # From the perspective of DataTree, it is safe to not conclude here. This
2541 # tells the datatree that we don't know what happens after this node - which
2542 # is true! We are aborting early here because the ir tree became misaligned,
2543 # which is a semantically different invalidity than an assume or filter failing.
2544 if self.invalid_at is None:
2545 self.observer.conclude_test(self.status, self.interesting_origin)
2546
2547 def choice(
2548 self,
2549 values: Sequence[T],
2550 *,
2551 forced: Optional[T] = None,
2552 fake_forced: bool = False,
2553 observe: bool = True,
2554 ) -> T:
2555 forced_i = None if forced is None else values.index(forced)
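        # e.g. choice(["a", "b", "c"], forced="b") forces the index draw to 1.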
2556 i = self.draw_integer(
2557 0,
2558 len(values) - 1,
2559 forced=forced_i,
2560 fake_forced=fake_forced,
2561 observe=observe,
2562 )
2563 return values[i]
2564
2565 def draw_bits(
2566 self, n: int, *, forced: Optional[int] = None, fake_forced: bool = False
2567 ) -> int:
2568 """Return an ``n``-bit integer from the underlying source of
        bytes. If ``forced`` is set to an integer, we instead ignore
        the underlying source and simulate a draw as if it had
        returned that integer."""
2572 self.__assert_not_frozen("draw_bits")
2573 if n == 0:
2574 return 0
2575 assert n > 0
2576 n_bytes = bits_to_bytes(n)
2577 self.__check_capacity(n_bytes)
2578
2579 if forced is not None:
2580 buf = int_to_bytes(forced, n_bytes)
2581 elif self._bytes_drawn < len(self.__prefix):
2582 index = self._bytes_drawn
2583 buf = self.__prefix[index : index + n_bytes]
2584 if len(buf) < n_bytes:
2585 assert self.__random is not None
2586 buf += uniform(self.__random, n_bytes - len(buf))
2587 else:
2588 assert self.__random is not None
2589 buf = uniform(self.__random, n_bytes)
2590 buf = bytearray(buf)
2591 self._bytes_drawn += n_bytes
2592
2593 assert len(buf) == n_bytes
2594
2595 # If we have a number of bits that is not a multiple of 8
2596 # we have to mask off the high bits.
2597 buf[0] &= BYTE_MASKS[n % 8]
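        # e.g. n=3 gives n_bytes=1 and BYTE_MASKS[3] == 0b111, clearing the
        # top five bits of the single drawn byte.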
2598 buf = bytes(buf)
2599 result = int_from_bytes(buf)
2600
2601 self.__example_record.draw_bits()
2602
2603 initial = self.index
2604
2605 assert isinstance(self.buffer, bytearray)
2606 self.buffer.extend(buf)
2607 self.index = len(self.buffer)
2608
2609 if forced is not None and not fake_forced:
2610 self.forced_indices.update(range(initial, self.index))
2611
2612 self.blocks.add_endpoint(self.index)
2613
2614 assert result.bit_length() <= n
2615 return result
2616
2617 def __check_capacity(self, n: int) -> None:
2618 if self.index + n > self.max_length:
2619 self.mark_overrun()
2620
2621 def conclude_test(
2622 self,
2623 status: Status,
2624 interesting_origin: Optional[InterestingOrigin] = None,
2625 ) -> NoReturn:
2626 assert (interesting_origin is None) or (status == Status.INTERESTING)
2627 self.__assert_not_frozen("conclude_test")
2628 self.interesting_origin = interesting_origin
2629 self.status = status
2630 self.freeze()
2631 raise StopTest(self.testcounter)
2632
2633 def mark_interesting(
2634 self, interesting_origin: Optional[InterestingOrigin] = None
2635 ) -> NoReturn:
2636 self.conclude_test(Status.INTERESTING, interesting_origin)
2637
2638 def mark_invalid(self, why: Optional[str] = None) -> NoReturn:
2639 if why is not None:
2640 self.events["invalid because"] = why
2641 self.conclude_test(Status.INVALID)
2642
2643 def mark_overrun(self) -> NoReturn:
2644 self.conclude_test(Status.OVERRUN)
2645
2646
2647def bits_to_bytes(n: int) -> int:
2648 """The number of bytes required to represent an n-bit number.
2649 Equivalent to (n + 7) // 8, but slightly faster. This really is
2650 called enough times that that matters."""
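    # e.g. bits_to_bytes(1) == 1, bits_to_bytes(8) == 1, bits_to_bytes(9) == 2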
2651 return (n + 7) >> 3