1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import sys
12import warnings
13from collections import abc, defaultdict
14from collections.abc import Sequence
15from functools import lru_cache
16from random import shuffle
17from typing import (
18 TYPE_CHECKING,
19 Any,
20 Callable,
21 ClassVar,
22 Generic,
23 Literal,
24 Optional,
25 TypeVar,
26 Union,
27 cast,
28 overload,
29)
30
31from hypothesis._settings import HealthCheck, Phase, Verbosity, settings
32from hypothesis.control import _current_build_context, current_build_context
33from hypothesis.errors import (
34 HypothesisException,
35 HypothesisWarning,
36 InvalidArgument,
37 NonInteractiveExampleWarning,
38 UnsatisfiedAssumption,
39)
40from hypothesis.internal.conjecture import utils as cu
41from hypothesis.internal.conjecture.data import ConjectureData
42from hypothesis.internal.conjecture.utils import (
43 calc_label_from_cls,
44 calc_label_from_hash,
45 calc_label_from_name,
46 combine_labels,
47)
48from hypothesis.internal.coverage import check_function
49from hypothesis.internal.reflection import (
50 get_pretty_function_description,
51 is_identity_function,
52)
53from hypothesis.strategies._internal.utils import defines_strategy
54from hypothesis.utils.conventions import UniqueIdentifier
55
56if TYPE_CHECKING:
57 from typing import TypeAlias
58
59 Ex = TypeVar("Ex", covariant=True, default=Any)
60else:
61 Ex = TypeVar("Ex", covariant=True)
62
63T = TypeVar("T")
64T3 = TypeVar("T3")
65T4 = TypeVar("T4")
66T5 = TypeVar("T5")
67MappedFrom = TypeVar("MappedFrom")
68MappedTo = TypeVar("MappedTo")
69RecurT: "TypeAlias" = Callable[["SearchStrategy"], bool]
70calculating = UniqueIdentifier("calculating")
71
72MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
73 "another attempted draw in MappedStrategy"
74)
75
76FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
77 "single loop iteration in FilteredStrategy"
78)
79
80
81def recursive_property(strategy: "SearchStrategy", name: str, default: object) -> Any:
82 """Handle properties which may be mutually recursive among a set of
83 strategies.
84
85 These are essentially lazily cached properties, with the ability to set
86 an override: If the property has not been explicitly set, we calculate
87 it on first access and memoize the result for later.
88
89 The problem is that for properties that depend on each other, a naive
90 calculation strategy may hit infinite recursion. Consider for example
91 the property is_empty. A strategy defined as x = st.deferred(lambda: x)
92 is certainly empty (in order to draw a value from x we would have to
93 draw a value from x, for which we would have to draw a value from x,
94 ...), but in order to calculate it the naive approach would end up
95 calling x.is_empty in order to calculate x.is_empty in order to etc.
96
97 The solution is one of fixed point calculation. We start with a default
98 value that is the value of the property in the absence of evidence to
99 the contrary, and then update the values of the property for all
100 dependent strategies until we reach a fixed point.
101
102 The approach taken roughly follows that in section 4.2 of Adams,
103 Michael D., Celeste Hollenbeck, and Matthew Might. "On the complexity
104 and performance of parsing with derivatives." ACM SIGPLAN Notices 51.6
105 (2016): 224-236.
106 """
107 cache_key = "cached_" + name
108 calculation = "calc_" + name
109 force_key = "force_" + name
110
111 def forced_value(target: SearchStrategy) -> Any:
112 try:
113 return getattr(target, force_key)
114 except AttributeError:
115 return getattr(target, cache_key)
116
117 try:
118 return forced_value(strategy)
119 except AttributeError:
120 pass
121
122 mapping: dict[SearchStrategy, Any] = {}
123 sentinel = object()
124 hit_recursion = False
125
126 # For a first pass we do a direct recursive calculation of the
127 # property, but we block recursively visiting a value in the
128 # computation of its property: When that happens, we simply
129 # note that it happened and return the default value.
130 def recur(strat: SearchStrategy) -> Any:
131 nonlocal hit_recursion
132 try:
133 return forced_value(strat)
134 except AttributeError:
135 pass
136 result = mapping.get(strat, sentinel)
137 if result is calculating:
138 hit_recursion = True
139 return default
140 elif result is sentinel:
141 mapping[strat] = calculating
142 mapping[strat] = getattr(strat, calculation)(recur)
143 return mapping[strat]
144 return result
145
146 recur(strategy)
147
148 # If we hit self-recursion in the computation of any strategy
149 # value, our mapping at the end is imprecise - it may or may
150 # not have the right values in it. We now need to proceed with
151 # a more careful fixed point calculation to get the exact
152 # values. Hopefully our mapping is still pretty good and it
153 # won't take a large number of updates to reach a fixed point.
154 if hit_recursion:
155 needs_update = set(mapping)
156
157 # We track which strategies use which in the course of
158 # calculating their property value. If A ever uses B in
159 # the course of calculating its value, then whenever the
160 # value of B changes we might need to update the value of
161 # A.
162 listeners: dict[SearchStrategy, set[SearchStrategy]] = defaultdict(set)
163 else:
164 needs_update = None
165
166 def recur2(strat: SearchStrategy) -> Any:
167 def recur_inner(other: SearchStrategy) -> Any:
168 try:
169 return forced_value(other)
170 except AttributeError:
171 pass
172 listeners[other].add(strat)
173 result = mapping.get(other, sentinel)
174 if result is sentinel:
175 assert needs_update is not None
176 needs_update.add(other)
177 mapping[other] = default
178 return default
179 return result
180
181 return recur_inner
182
183 count = 0
184 seen = set()
185 while needs_update:
186 count += 1
187 # If we seem to be taking a really long time to stabilize we
188 # start tracking seen values to attempt to detect an infinite
189 # loop. This should be impossible, and most code will never
190 # hit the count, but having an assertion for it means that
191 # testing is easier to debug and we don't just have a hung
192 # test.
193 # Note: This is actually covered, by test_very_deep_deferral
194 # in tests/cover/test_deferred_strategies.py. Unfortunately it
195 # runs into a coverage bug. See
196 # https://github.com/nedbat/coveragepy/issues/605
197 # for details.
198 if count > 50: # pragma: no cover
199 key = frozenset(mapping.items())
200 assert key not in seen, (key, name)
201 seen.add(key)
202 to_update = needs_update
203 needs_update = set()
204 for strat in to_update:
205 new_value = getattr(strat, calculation)(recur2(strat))
206 if new_value != mapping[strat]:
207 needs_update.update(listeners[strat])
208 mapping[strat] = new_value
209
210 # We now have a complete and accurate calculation of the
211 # property values for everything we have seen in the course of
212 # running this calculation. We simultaneously update all of
213 # them (not just the strategy we started out with).
214 for k, v in mapping.items():
215 setattr(k, cache_key, v)
216 return getattr(strategy, cache_key)
217
218
219class SearchStrategy(Generic[Ex]):
220 """A ``SearchStrategy`` tells Hypothesis how to generate that kind of input.
221
222 This class is only part of the public API for use in type annotations, so that
223 you can write e.g. ``-> SearchStrategy[Foo]`` for your function which returns
224 ``builds(Foo, ...)``. Do not inherit from or directly instantiate this class.
225 """
226
227 validate_called: bool = False
228 __label: Union[int, UniqueIdentifier, None] = None
229 __module__: str = "hypothesis.strategies"
230
231 def _available(self, data: ConjectureData) -> bool:
232 """Returns whether this strategy can *currently* draw any
233 values. This typically useful for stateful testing where ``Bundle``
234 grows over time a list of value to choose from.
235
236 Unlike ``empty`` property, this method's return value may change
237 over time.
238 Note: ``data`` parameter will only be used for introspection and no
239 value drawn from it.
240 """
241 return not self.is_empty
242
243 @property
244 def is_empty(self) -> Any:
245 # Returns True if this strategy can never draw a value and will always
246 # result in the data being marked invalid.
247 # The fact that this returns False does not guarantee that a valid value
248 # can be drawn - this is not intended to be perfect, and is primarily
249 # intended to be an optimisation for some cases.
250 return recursive_property(self, "is_empty", True)
251
252 @property
253 def supports_find(self) -> bool:
254 return True
255
256 # Returns True if values from this strategy can safely be reused without
257 # this causing unexpected behaviour.
258
259 # True if values from this strategy can be implicitly reused (e.g. as
260 # background values in a numpy array) without causing surprising
261 # user-visible behaviour. Should be false for built-in strategies that
262 # produce mutable values, and for strategies that have been mapped/filtered
263 # by arbitrary user-provided functions.
264 @property
265 def has_reusable_values(self) -> Any:
266 return recursive_property(self, "has_reusable_values", True)
267
268 # Whether this strategy is suitable for holding onto in a cache.
269 @property
270 def is_cacheable(self) -> Any:
271 return recursive_property(self, "is_cacheable", True)
272
273 def calc_is_cacheable(self, recur: RecurT) -> bool:
274 return True
275
276 def calc_is_empty(self, recur: RecurT) -> bool:
277 # Note: It is correct and significant that the default return value
278 # from calc_is_empty is False despite the default value for is_empty
279 # being true. The reason for this is that strategies should be treated
280 # as empty absent evidence to the contrary, but most basic strategies
281 # are trivially non-empty and it would be annoying to have to override
282 # this method to show that.
283 return False
284
285 def calc_has_reusable_values(self, recur: RecurT) -> bool:
286 return False
287
288 def example(self) -> Ex: # FIXME
289 """Provide an example of the sort of value that this strategy generates.
290
291 This method is designed for use in a REPL, and will raise an error if
292 called from inside |@given| or a strategy definition. For serious use,
293 see |@composite| or |st.data|.
294 """
295 if getattr(sys, "ps1", None) is None: # pragma: no branch
296 # The other branch *is* covered in cover/test_examples.py; but as that
297 # uses `pexpect` for an interactive session `coverage` doesn't see it.
298 warnings.warn(
299 "The `.example()` method is good for exploring strategies, but should "
300 "only be used interactively. We recommend using `@given` for tests - "
301 "it performs better, saves and replays failures to avoid flakiness, "
302 f"and reports minimal examples. (strategy: {self!r})",
303 NonInteractiveExampleWarning,
304 stacklevel=2,
305 )
306
307 context = _current_build_context.value
308 if context is not None:
309 if context.data is not None and context.data.depth > 0:
310 raise HypothesisException(
311 "Using example() inside a strategy definition is a bad "
312 "idea. Instead consider using hypothesis.strategies.builds() "
313 "or @hypothesis.strategies.composite to define your strategy."
314 " See https://hypothesis.readthedocs.io/en/latest/data.html"
315 "#hypothesis.strategies.builds or "
316 "https://hypothesis.readthedocs.io/en/latest/data.html"
317 "#composite-strategies for more details."
318 )
319 else:
320 raise HypothesisException(
321 "Using example() inside a test function is a bad "
322 "idea. Instead consider using hypothesis.strategies.data() "
323 "to draw more examples during testing. See "
324 "https://hypothesis.readthedocs.io/en/latest/data.html"
325 "#drawing-interactively-in-tests for more details."
326 )
327
328 try:
329 return self.__examples.pop()
330 except (AttributeError, IndexError):
331 self.__examples: list[Ex] = []
332
333 from hypothesis.core import given
334
335 # Note: this function has a weird name because it might appear in
336 # tracebacks, and we want users to know that they can ignore it.
337 @given(self)
338 @settings(
339 database=None,
340 # generate only a few examples at a time to avoid slow interactivity
341 # for large strategies. The overhead of @given is very small relative
342 # to generation, so a small batch size is fine.
343 max_examples=10,
344 deadline=None,
345 verbosity=Verbosity.quiet,
346 phases=(Phase.generate,),
347 suppress_health_check=list(HealthCheck),
348 )
349 def example_generating_inner_function(
350 ex: Ex, # type: ignore # mypy is overzealous in preventing covariant params
351 ) -> None:
352 self.__examples.append(ex)
353
354 example_generating_inner_function()
355 shuffle(self.__examples)
356 return self.__examples.pop()
357
358 def map(self, pack: Callable[[Ex], T]) -> "SearchStrategy[T]":
359 """Returns a new strategy which generates a value from this one, and
360 then returns ``pack(value)``. For example, ``integers().map(str)``
361 could generate ``str(5)`` == ``"5"``.
362 """
363 if is_identity_function(pack):
364 return self # type: ignore # Mypy has no way to know that `Ex == T`
365 return MappedStrategy(self, pack=pack)
366
367 def flatmap(
368 self, expand: Callable[[Ex], "SearchStrategy[T]"]
369 ) -> "SearchStrategy[T]": # FIXME
370 """Old syntax for a special case of |@composite|:
371
372 .. code-block:: python
373
374 @st.composite
375 def flatmap_like(draw, base_strategy, expand):
376 value = draw(base_strategy)
377 new_strategy = expand(value)
378 return draw(new_strategy)
379
380 We find that the greater readability of |@composite| usually outweighs
381 the verbosity, with a few exceptions for simple cases or recipes like
382 ``from_type(type).flatmap(from_type)`` ("pick a type, get a strategy for
383 any instance of that type, and then generate one of those").
384 """
385 from hypothesis.strategies._internal.flatmapped import FlatMapStrategy
386
387 return FlatMapStrategy(self, expand=expand)
388
389 # Note that we previously had condition extracted to a type alias as
390 # PredicateT. However, that was only useful when not specifying a relationship
391 # between the generic Ts and some other function param / return value.
392 # If we do want to - like here, where we want to say that the Ex arg to condition
393 # is of the same type as the strategy's Ex - then you need to write out the
394 # entire Callable[[Ex], Any] expression rather than use a type alias.
395 # TypeAlias is *not* simply a macro that inserts the text. TypeAlias will not
396 # reference the local TypeVar context.
397 def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]":
398 """Returns a new strategy that generates values from this strategy
399 which satisfy the provided condition.
400
401 Note that if the condition is too hard to satisfy this might result
402 in your tests failing with an Unsatisfiable exception.
403 A basic version of the filtering logic would look something like:
404
405 .. code-block:: python
406
407 @st.composite
408 def filter_like(draw, strategy, condition):
409 for _ in range(3):
410 value = draw(strategy)
411 if condition(value):
412 return value
413 assume(False)
414 """
415 return FilteredStrategy(conditions=(condition,), strategy=self)
416
417 def _filter_for_filtered_draw(
418 self, condition: Callable[[Ex], Any]
419 ) -> "FilteredStrategy[Ex]":
420 # Hook for parent strategies that want to perform fallible filtering
421 # on one of their internal strategies (e.g. UniqueListStrategy).
422 # The returned object must have a `.do_filtered_draw(data)` method
423 # that behaves like `do_draw`, but returns the sentinel object
424 # `filter_not_satisfied` if the condition could not be satisfied.
425
426 # This is separate from the main `filter` method so that strategies
427 # can override `filter` without having to also guarantee a
428 # `do_filtered_draw` method.
429 return FilteredStrategy(conditions=(condition,), strategy=self)
430
431 @property
432 def branches(self) -> Sequence["SearchStrategy[Ex]"]:
433 return [self]
434
435 def __or__(self, other: "SearchStrategy[T]") -> "SearchStrategy[Union[Ex, T]]":
436 """Return a strategy which produces values by randomly drawing from one
437 of this strategy or the other strategy.
438
439 This method is part of the public API.
440 """
441 if not isinstance(other, SearchStrategy):
442 raise ValueError(f"Cannot | a SearchStrategy with {other!r}")
443
444 # Unwrap explicitly or'd strategies. This turns the
445 # common case of e.g. st.integers() | st.integers() | st.integers() from
446 #
447 # one_of(one_of(integers(), integers()), integers())
448 #
449 # into
450 #
451 # one_of(integers(), integers(), integers())
452 #
453 # This is purely an aesthetic unwrapping, for e.g. reprs. In practice
454 # we use .branches / .element_strategies to get the list of possible
455 # strategies, so this unwrapping is *not* necessary for correctness.
456 strategies: list[SearchStrategy] = []
457 strategies.extend(
458 self.original_strategies if isinstance(self, OneOfStrategy) else [self]
459 )
460 strategies.extend(
461 other.original_strategies if isinstance(other, OneOfStrategy) else [other]
462 )
463 return OneOfStrategy(strategies)
464
465 def __bool__(self) -> bool:
466 warnings.warn(
467 f"bool({self!r}) is always True, did you mean to draw a value?",
468 HypothesisWarning,
469 stacklevel=2,
470 )
471 return True
472
473 def validate(self) -> None:
474 """Throw an exception if the strategy is not valid.
475
476 This can happen due to lazy construction
477 """
478 if self.validate_called:
479 return
480 try:
481 self.validate_called = True
482 self.do_validate()
483 self.is_empty
484 self.has_reusable_values
485 except Exception:
486 self.validate_called = False
487 raise
488
489 LABELS: ClassVar[dict[type, int]] = {}
490
491 @property
492 def class_label(self) -> int:
493 cls = self.__class__
494 try:
495 return cls.LABELS[cls]
496 except KeyError:
497 pass
498 result = calc_label_from_cls(cls)
499 cls.LABELS[cls] = result
500 return result
501
502 @property
503 def label(self) -> int:
504 if self.__label is calculating:
505 return 0
506 if self.__label is None:
507 self.__label = calculating
508 self.__label = self.calc_label()
509 return cast(int, self.__label)
510
511 def calc_label(self) -> int:
512 return self.class_label
513
514 def do_validate(self) -> None:
515 pass
516
517 def do_draw(self, data: ConjectureData) -> Ex:
518 raise NotImplementedError(f"{type(self).__name__}.do_draw")
519
520
521def is_hashable(value: object) -> bool:
522 try:
523 hash(value)
524 return True
525 except TypeError:
526 return False
527
528
529class SampledFromStrategy(SearchStrategy[Ex]):
530 """A strategy which samples from a set of elements. This is essentially
531 equivalent to using a OneOfStrategy over Just strategies but may be more
532 efficient and convenient.
533 """
534
535 _MAX_FILTER_CALLS: ClassVar[int] = 10_000
536
537 def __init__(
538 self,
539 elements: Sequence[Ex],
540 *,
541 force_repr: Optional[str] = None,
542 force_repr_braces: Optional[tuple[str, str]] = None,
543 transformations: tuple[
544 tuple[Literal["filter", "map"], Callable[[Ex], Any]],
545 ...,
546 ] = (),
547 ):
548 super().__init__()
549 self.elements = cu.check_sample(elements, "sampled_from")
550 assert self.elements
551 self.force_repr = force_repr
552 self.force_repr_braces = force_repr_braces
553 self._transformations = transformations
554
555 self._cached_repr: Optional[str] = None
556
557 def map(self, pack: Callable[[Ex], T]) -> SearchStrategy[T]:
558 s = type(self)(
559 self.elements,
560 force_repr=self.force_repr,
561 force_repr_braces=self.force_repr_braces,
562 transformations=(*self._transformations, ("map", pack)),
563 )
564 # guaranteed by the ("map", pack) transformation
565 return cast(SearchStrategy[T], s)
566
567 def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
568 return type(self)(
569 self.elements,
570 force_repr=self.force_repr,
571 force_repr_braces=self.force_repr_braces,
572 transformations=(*self._transformations, ("filter", condition)),
573 )
574
575 def __repr__(self):
576 if self._cached_repr is None:
577 rep = get_pretty_function_description
578 elements_s = (
579 ", ".join(rep(v) for v in self.elements[:512]) + ", ..."
580 if len(self.elements) > 512
581 else ", ".join(rep(v) for v in self.elements)
582 )
583 braces = self.force_repr_braces or ("(", ")")
584 instance_s = (
585 self.force_repr or f"sampled_from({braces[0]}{elements_s}{braces[1]})"
586 )
587 transforms_s = "".join(
588 f".{name}({get_pretty_function_description(f)})"
589 for name, f in self._transformations
590 )
591 repr_s = instance_s + transforms_s
592 self._cached_repr = repr_s
593 return self._cached_repr
594
595 def calc_label(self) -> int:
596 # strategy.label is effectively an under-approximation of structural
597 # equality (i.e., some strategies may have the same label when they are not
598 # structurally identical). More importantly for calculating the
599 # SampledFromStrategy label, we might have hash(s1) != hash(s2) even
600 # when s1 and s2 are structurally identical. For instance:
601 #
602 # s1 = st.sampled_from([st.none()])
603 # s2 = st.sampled_from([st.none()])
604 # assert hash(s1) != hash(s2)
605 #
606 # (see also test cases in test_labels.py).
607 #
608 # We therefore use the labels of any component strategies when calculating
609 # our label, and only use the hash if it is not a strategy.
610 #
611 # That's the ideal, anyway. In reality the logic is more complicated than
612 # necessary in order to be efficient in the presence of (very) large sequences:
613 # * add an unabashed special case for range, to avoid iteration over an
614 # enormous range when we know it is entirely integers.
615 # * if there is at least one strategy in self.elements, use strategy label,
616 # and the element hash otherwise.
617 # * if there are no strategies in self.elements, take the hash of the
618 # entire sequence. This prevents worst-case performance of hashing each
619 # element when a hash of the entire sequence would have sufficed.
620 #
621 # The worst case performance of this scheme is
622 # itertools.chain(range(2**100), [st.none()]), where it degrades to
623 # hashing every int in the range.
624
625 if isinstance(self.elements, range) or (
626 is_hashable(self.elements)
627 and not any(isinstance(e, SearchStrategy) for e in self.elements)
628 ):
629 return combine_labels(self.class_label, calc_label_from_hash(self.elements))
630
631 labels = [self.class_label]
632 for element in self.elements:
633 if not is_hashable(element):
634 continue
635
636 labels.append(
637 element.label
638 if isinstance(element, SearchStrategy)
639 else calc_label_from_hash(element)
640 )
641
642 return combine_labels(*labels)
643
644 def calc_has_reusable_values(self, recur: RecurT) -> bool:
645 # Because our custom .map/.filter implementations skip the normal
646 # wrapper strategies (which would automatically return False for us),
647 # we need to manually return False here if any transformations have
648 # been applied.
649 return not self._transformations
650
651 def calc_is_cacheable(self, recur: RecurT) -> bool:
652 return is_hashable(self.elements)
653
654 def _transform(
655 self,
656 # https://github.com/python/mypy/issues/7049, we're not writing `element`
657 # anywhere in the class so this is still type-safe. mypy is being more
658 # conservative than necessary
659 element: Ex, # type: ignore
660 ) -> Union[Ex, UniqueIdentifier]:
661 # Used in UniqueSampledListStrategy
662 for name, f in self._transformations:
663 if name == "map":
664 result = f(element)
665 if build_context := _current_build_context.value:
666 build_context.record_call(result, f, [element], {})
667 element = result
668 else:
669 assert name == "filter"
670 if not f(element):
671 return filter_not_satisfied
672 return element
673
674 def do_draw(self, data: ConjectureData) -> Ex:
675 result = self.do_filtered_draw(data)
676 if isinstance(result, SearchStrategy) and all(
677 isinstance(x, SearchStrategy) for x in self.elements
678 ):
679 data._sampled_from_all_strategies_elements_message = (
680 "sample_from was given a collection of strategies: "
681 "{!r}. Was one_of intended?",
682 self.elements,
683 )
684 if result is filter_not_satisfied:
685 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
686 assert not isinstance(result, UniqueIdentifier)
687 return result
688
689 def get_element(self, i: int) -> Union[Ex, UniqueIdentifier]:
690 return self._transform(self.elements[i])
691
692 def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
693 # Set of indices that have been tried so far, so that we never test
694 # the same element twice during a draw.
695 known_bad_indices: set[int] = set()
696
697 # Start with ordinary rejection sampling. It's fast if it works, and
698 # if it doesn't work then it was only a small amount of overhead.
699 for _ in range(3):
700 i = data.draw_integer(0, len(self.elements) - 1)
701 if i not in known_bad_indices:
702 element = self.get_element(i)
703 if element is not filter_not_satisfied:
704 return element
705 if not known_bad_indices:
706 data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
707 known_bad_indices.add(i)
708
709 # If we've tried all the possible elements, give up now.
710 max_good_indices = len(self.elements) - len(known_bad_indices)
711 if not max_good_indices:
712 return filter_not_satisfied
713
714 # Impose an arbitrary cutoff to prevent us from wasting too much time
715 # on very large element lists.
716 max_good_indices = min(max_good_indices, self._MAX_FILTER_CALLS - 3)
717
718 # Before building the list of allowed indices, speculatively choose
719 # one of them. We don't yet know how many allowed indices there will be,
720 # so this choice might be out-of-bounds, but that's OK.
721 speculative_index = data.draw_integer(0, max_good_indices - 1)
722
723 # Calculate the indices of allowed values, so that we can choose one
724 # of them at random. But if we encounter the speculatively-chosen one,
725 # just use that and return immediately. Note that we also track the
726 # allowed elements, in case of .map(some_stateful_function)
727 allowed: list[tuple[int, Ex]] = []
728 for i in range(min(len(self.elements), self._MAX_FILTER_CALLS - 3)):
729 if i not in known_bad_indices:
730 element = self.get_element(i)
731 if element is not filter_not_satisfied:
732 assert not isinstance(element, UniqueIdentifier)
733 allowed.append((i, element))
734 if len(allowed) > speculative_index:
735 # Early-exit case: We reached the speculative index, so
736 # we just return the corresponding element.
737 data.draw_integer(0, len(self.elements) - 1, forced=i)
738 return element
739
740 # The speculative index didn't work out, but at this point we've built
741 # and can choose from the complete list of allowed indices and elements.
742 if allowed:
743 i, element = data.choice(allowed)
744 data.draw_integer(0, len(self.elements) - 1, forced=i)
745 return element
746 # If there are no allowed indices, the filter couldn't be satisfied.
747 return filter_not_satisfied
748
749
750class OneOfStrategy(SearchStrategy[Ex]):
751 """Implements a union of strategies. Given a number of strategies this
752 generates values which could have come from any of them.
753
754 The conditional distribution draws uniformly at random from some
755 non-empty subset of these strategies and then draws from the
756 conditional distribution of that strategy.
757 """
758
759 def __init__(self, strategies: Sequence[SearchStrategy[Ex]]):
760 super().__init__()
761 self.original_strategies = tuple(strategies)
762 self.__element_strategies: Optional[Sequence[SearchStrategy[Ex]]] = None
763 self.__in_branches = False
764
765 def calc_is_empty(self, recur: RecurT) -> bool:
766 return all(recur(e) for e in self.original_strategies)
767
768 def calc_has_reusable_values(self, recur: RecurT) -> bool:
769 return all(recur(e) for e in self.original_strategies)
770
771 def calc_is_cacheable(self, recur: RecurT) -> bool:
772 return all(recur(e) for e in self.original_strategies)
773
774 @property
775 def element_strategies(self) -> Sequence[SearchStrategy[Ex]]:
776 if self.__element_strategies is None:
777 # While strategies are hashable, they use object.__hash__ and are
778 # therefore distinguished only by identity.
779 #
780 # In principle we could "just" define a __hash__ method
781 # (and __eq__, but that's easy in terms of type() and hash())
782 # to make this more powerful, but this is harder than it sounds:
783 #
784 # 1. Strategies are often distinguished by non-hashable attributes,
785 # or by attributes that have the same hash value ("^.+" / b"^.+").
786 # 2. LazyStrategy: can't reify the wrapped strategy without breaking
787 # laziness, so there's a hash each for the lazy and the nonlazy.
788 #
789 # Having made several attempts, the minor benefits of making strategies
790 # hashable are simply not worth the engineering effort it would take.
791 # See also issues #2291 and #2327.
792 seen: set[SearchStrategy] = {self}
793 strategies: list[SearchStrategy] = []
794 for arg in self.original_strategies:
795 check_strategy(arg)
796 if not arg.is_empty:
797 for s in arg.branches:
798 if s not in seen and not s.is_empty:
799 seen.add(s)
800 strategies.append(s)
801 self.__element_strategies = strategies
802 return self.__element_strategies
803
804 def calc_label(self) -> int:
805 return combine_labels(
806 self.class_label, *(p.label for p in self.original_strategies)
807 )
808
809 def do_draw(self, data: ConjectureData) -> Ex:
810 strategy = data.draw(
811 SampledFromStrategy(self.element_strategies).filter(
812 lambda s: s._available(data)
813 )
814 )
815 return data.draw(strategy)
816
817 def __repr__(self) -> str:
818 return "one_of({})".format(", ".join(map(repr, self.original_strategies)))
819
820 def do_validate(self) -> None:
821 for e in self.element_strategies:
822 e.validate()
823
824 @property
825 def branches(self) -> Sequence[SearchStrategy[Ex]]:
826 if not self.__in_branches:
827 try:
828 self.__in_branches = True
829 return self.element_strategies
830 finally:
831 self.__in_branches = False
832 else:
833 return [self]
834
835 def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
836 return FilteredStrategy(
837 OneOfStrategy([s.filter(condition) for s in self.original_strategies]),
838 conditions=(),
839 )
840
841
842@overload
843def one_of(
844 __args: Sequence[SearchStrategy[Ex]],
845) -> SearchStrategy[Ex]: # pragma: no cover
846 ...
847
848
849@overload
850def one_of(__a1: SearchStrategy[Ex]) -> SearchStrategy[Ex]: # pragma: no cover
851 ...
852
853
854@overload
855def one_of(
856 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T]
857) -> SearchStrategy[Union[Ex, T]]: # pragma: no cover
858 ...
859
860
861@overload
862def one_of(
863 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T], __a3: SearchStrategy[T3]
864) -> SearchStrategy[Union[Ex, T, T3]]: # pragma: no cover
865 ...
866
867
868@overload
869def one_of(
870 __a1: SearchStrategy[Ex],
871 __a2: SearchStrategy[T],
872 __a3: SearchStrategy[T3],
873 __a4: SearchStrategy[T4],
874) -> SearchStrategy[Union[Ex, T, T3, T4]]: # pragma: no cover
875 ...
876
877
878@overload
879def one_of(
880 __a1: SearchStrategy[Ex],
881 __a2: SearchStrategy[T],
882 __a3: SearchStrategy[T3],
883 __a4: SearchStrategy[T4],
884 __a5: SearchStrategy[T5],
885) -> SearchStrategy[Union[Ex, T, T3, T4, T5]]: # pragma: no cover
886 ...
887
888
889@overload
890def one_of(*args: SearchStrategy[Any]) -> SearchStrategy[Any]: # pragma: no cover
891 ...
892
893
894@defines_strategy(never_lazy=True)
895def one_of(
896 *args: Union[Sequence[SearchStrategy[Any]], SearchStrategy[Any]]
897) -> SearchStrategy[Any]:
898 # Mypy workaround alert: Any is too loose above; the return parameter
899 # should be the union of the input parameters. Unfortunately, Mypy <=0.600
900 # raises errors due to incompatible inputs instead. See #1270 for links.
901 # v0.610 doesn't error; it gets inference wrong for 2+ arguments instead.
902 """Return a strategy which generates values from any of the argument
903 strategies.
904
905 This may be called with one iterable argument instead of multiple
906 strategy arguments, in which case ``one_of(x)`` and ``one_of(*x)`` are
907 equivalent.
908
909 Examples from this strategy will generally shrink to ones that come from
910 strategies earlier in the list, then shrink according to behaviour of the
911 strategy that produced them. In order to get good shrinking behaviour,
912 try to put simpler strategies first. e.g. ``one_of(none(), text())`` is
913 better than ``one_of(text(), none())``.
914
915 This is especially important when using recursive strategies. e.g.
916 ``x = st.deferred(lambda: st.none() | st.tuples(x, x))`` will shrink well,
917 but ``x = st.deferred(lambda: st.tuples(x, x) | st.none())`` will shrink
918 very badly indeed.
919 """
920 if len(args) == 1 and not isinstance(args[0], SearchStrategy):
921 try:
922 args = tuple(args[0])
923 except TypeError:
924 pass
925 if len(args) == 1 and isinstance(args[0], SearchStrategy):
926 # This special-case means that we can one_of over lists of any size
927 # without incurring any performance overhead when there is only one
928 # strategy, and keeps our reprs simple.
929 return args[0]
930 if args and not any(isinstance(a, SearchStrategy) for a in args):
931 # And this special case is to give a more-specific error message if it
932 # seems that the user has confused `one_of()` for `sampled_from()`;
933 # the remaining validation is left to OneOfStrategy. See PR #2627.
934 raise InvalidArgument(
935 f"Did you mean st.sampled_from({list(args)!r})? st.one_of() is used "
936 "to combine strategies, but all of the arguments were of other types."
937 )
938 # we've handled the case where args is a one-element sequence [(s1, s2, ...)]
939 # above, so we can assume it's an actual sequence of strategies.
940 args = cast(Sequence[SearchStrategy], args)
941 return OneOfStrategy(args)
942
943
944class MappedStrategy(SearchStrategy[MappedTo], Generic[MappedFrom, MappedTo]):
945 """A strategy which is defined purely by conversion to and from another
946 strategy.
947
948 Its parameter and distribution come from that other strategy.
949 """
950
951 def __init__(
952 self,
953 strategy: SearchStrategy[MappedFrom],
954 pack: Callable[[MappedFrom], MappedTo],
955 ) -> None:
956 super().__init__()
957 self.mapped_strategy = strategy
958 self.pack = pack
959
960 def calc_is_empty(self, recur: RecurT) -> bool:
961 return recur(self.mapped_strategy)
962
963 def calc_is_cacheable(self, recur: RecurT) -> bool:
964 return recur(self.mapped_strategy)
965
966 def __repr__(self) -> str:
967 if not hasattr(self, "_cached_repr"):
968 self._cached_repr = f"{self.mapped_strategy!r}.map({get_pretty_function_description(self.pack)})"
969 return self._cached_repr
970
971 def do_validate(self) -> None:
972 self.mapped_strategy.validate()
973
974 def do_draw(self, data: ConjectureData) -> MappedTo:
975 with warnings.catch_warnings():
976 if isinstance(self.pack, type) and issubclass(
977 self.pack, (abc.Mapping, abc.Set)
978 ):
979 warnings.simplefilter("ignore", BytesWarning)
980 for _ in range(3):
981 try:
982 data.start_span(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL)
983 x = data.draw(self.mapped_strategy)
984 result = self.pack(x)
985 data.stop_span()
986 current_build_context().record_call(result, self.pack, [x], {})
987 return result
988 except UnsatisfiedAssumption:
989 data.stop_span(discard=True)
990 raise UnsatisfiedAssumption
991
992 @property
993 def branches(self) -> Sequence[SearchStrategy[MappedTo]]:
994 return [
995 MappedStrategy(strategy, pack=self.pack)
996 for strategy in self.mapped_strategy.branches
997 ]
998
999 def filter(
1000 self, condition: Callable[[MappedTo], Any]
1001 ) -> "SearchStrategy[MappedTo]":
1002 # Includes a special case so that we can rewrite filters on collection
1003 # lengths, when most collections are `st.lists(...).map(the_type)`.
1004 ListStrategy = _list_strategy_type()
1005 if not isinstance(self.mapped_strategy, ListStrategy) or not (
1006 (isinstance(self.pack, type) and issubclass(self.pack, abc.Collection))
1007 or self.pack in _collection_ish_functions()
1008 ):
1009 return super().filter(condition)
1010
1011 # Check whether our inner list strategy can rewrite this filter condition.
1012 # If not, discard the result and _only_ apply a new outer filter.
1013 new = ListStrategy.filter(self.mapped_strategy, condition)
1014 if getattr(new, "filtered_strategy", None) is self.mapped_strategy:
1015 return super().filter(condition) # didn't rewrite
1016
1017 # Apply a new outer filter even though we rewrote the inner strategy,
1018 # because some collections can change the list length (dict, set, etc).
1019 return FilteredStrategy(type(self)(new, self.pack), conditions=(condition,))
1020
1021
1022@lru_cache
1023def _list_strategy_type() -> Any:
1024 from hypothesis.strategies._internal.collections import ListStrategy
1025
1026 return ListStrategy
1027
1028
1029def _collection_ish_functions() -> Sequence[Any]:
1030 funcs = [sorted]
1031 if np := sys.modules.get("numpy"):
1032 # c.f. https://numpy.org/doc/stable/reference/routines.array-creation.html
1033 # Probably only `np.array` and `np.asarray` will be used in practice,
1034 # but why should that stop us when we've already gone this far?
1035 funcs += [
1036 np.empty_like,
1037 np.eye,
1038 np.identity,
1039 np.ones_like,
1040 np.zeros_like,
1041 np.array,
1042 np.asarray,
1043 np.asanyarray,
1044 np.ascontiguousarray,
1045 np.asmatrix,
1046 np.copy,
1047 np.rec.array,
1048 np.rec.fromarrays,
1049 np.rec.fromrecords,
1050 np.diag,
1051 # bonus undocumented functions from tab-completion:
1052 np.asarray_chkfinite,
1053 np.asfortranarray,
1054 ]
1055
1056 return funcs
1057
1058
1059filter_not_satisfied = UniqueIdentifier("filter not satisfied")
1060
1061
1062class FilteredStrategy(SearchStrategy[Ex]):
1063 def __init__(
1064 self, strategy: SearchStrategy[Ex], conditions: tuple[Callable[[Ex], Any], ...]
1065 ):
1066 super().__init__()
1067 if isinstance(strategy, FilteredStrategy):
1068 # Flatten chained filters into a single filter with multiple conditions.
1069 self.flat_conditions: tuple[Callable[[Ex], Any], ...] = (
1070 strategy.flat_conditions + conditions
1071 )
1072 self.filtered_strategy: SearchStrategy[Ex] = strategy.filtered_strategy
1073 else:
1074 self.flat_conditions = conditions
1075 self.filtered_strategy = strategy
1076
1077 assert isinstance(self.flat_conditions, tuple)
1078 assert not isinstance(self.filtered_strategy, FilteredStrategy)
1079
1080 self.__condition: Optional[Callable[[Ex], Any]] = None
1081
1082 def calc_is_empty(self, recur: RecurT) -> bool:
1083 return recur(self.filtered_strategy)
1084
1085 def calc_is_cacheable(self, recur: RecurT) -> bool:
1086 return recur(self.filtered_strategy)
1087
1088 def __repr__(self) -> str:
1089 if not hasattr(self, "_cached_repr"):
1090 self._cached_repr = "{!r}{}".format(
1091 self.filtered_strategy,
1092 "".join(
1093 f".filter({get_pretty_function_description(cond)})"
1094 for cond in self.flat_conditions
1095 ),
1096 )
1097 return self._cached_repr
1098
1099 def do_validate(self) -> None:
1100 # Start by validating our inner filtered_strategy. If this was a LazyStrategy,
1101 # validation also reifies it so that subsequent calls to e.g. `.filter()` will
1102 # be passed through.
1103 self.filtered_strategy.validate()
1104 # So now we have a reified inner strategy, we'll replay all our saved
1105 # predicates in case some or all of them can be rewritten. Note that this
1106 # replaces the `fresh` strategy too!
1107 fresh = self.filtered_strategy
1108 for cond in self.flat_conditions:
1109 fresh = fresh.filter(cond)
1110 if isinstance(fresh, FilteredStrategy):
1111 # In this case we have at least some non-rewritten filter predicates,
1112 # so we just re-initialize the strategy.
1113 FilteredStrategy.__init__(
1114 self, fresh.filtered_strategy, fresh.flat_conditions
1115 )
1116 else:
1117 # But if *all* the predicates were rewritten... well, do_validate() is
1118 # an in-place method so we still just re-initialize the strategy!
1119 FilteredStrategy.__init__(self, fresh, ())
1120
1121 def filter(self, condition: Callable[[Ex], Any]) -> "FilteredStrategy[Ex]":
1122 # If we can, it's more efficient to rewrite our strategy to satisfy the
1123 # condition. We therefore exploit the fact that the order of predicates
1124 # doesn't matter (`f(x) and g(x) == g(x) and f(x)`) by attempting to apply
1125 # condition directly to our filtered strategy as the inner-most filter.
1126 out = self.filtered_strategy.filter(condition)
1127 # If it couldn't be rewritten, we'll get a new FilteredStrategy - and then
1128 # combine the conditions of each in our expected newest=last order.
1129 if isinstance(out, FilteredStrategy):
1130 return FilteredStrategy(
1131 out.filtered_strategy, self.flat_conditions + out.flat_conditions
1132 )
1133 # But if it *could* be rewritten, we can return the more efficient form!
1134 return FilteredStrategy(out, self.flat_conditions)
1135
1136 @property
1137 def condition(self) -> Callable[[Ex], Any]:
1138 if self.__condition is None:
1139 if len(self.flat_conditions) == 1:
1140 # Avoid an extra indirection in the common case of only one condition.
1141 self.__condition = self.flat_conditions[0]
1142 elif len(self.flat_conditions) == 0:
1143 # Possible, if unlikely, due to filter predicate rewriting
1144 self.__condition = lambda _: True # type: ignore # covariant type param
1145 else:
1146 self.__condition = lambda x: all( # type: ignore # covariant type param
1147 cond(x) for cond in self.flat_conditions
1148 )
1149 return self.__condition
1150
1151 def do_draw(self, data: ConjectureData) -> Ex:
1152 result = self.do_filtered_draw(data)
1153 if result is not filter_not_satisfied:
1154 return cast(Ex, result)
1155
1156 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
1157
1158 def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
1159 for i in range(3):
1160 data.start_span(FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL)
1161 value = data.draw(self.filtered_strategy)
1162 if self.condition(value):
1163 data.stop_span()
1164 return value
1165 else:
1166 data.stop_span(discard=True)
1167 if i == 0:
1168 data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
1169
1170 return filter_not_satisfied
1171
1172 @property
1173 def branches(self) -> Sequence[SearchStrategy[Ex]]:
1174 return [
1175 FilteredStrategy(strategy=strategy, conditions=self.flat_conditions)
1176 for strategy in self.filtered_strategy.branches
1177 ]
1178
1179
1180@check_function
1181def check_strategy(arg: object, name: str = "") -> None:
1182 assert isinstance(name, str)
1183 if not isinstance(arg, SearchStrategy):
1184 hint = ""
1185 if isinstance(arg, (list, tuple)):
1186 hint = ", such as st.sampled_from({}),".format(name or "...")
1187 if name:
1188 name += "="
1189 raise InvalidArgument(
1190 f"Expected a SearchStrategy{hint} but got {name}{arg!r} "
1191 f"(type={type(arg).__name__})"
1192 )