1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import sys
12import warnings
13from collections import abc, defaultdict
14from collections.abc import Sequence
15from functools import lru_cache
16from random import shuffle
17from typing import (
18 TYPE_CHECKING,
19 Any,
20 Callable,
21 ClassVar,
22 Generic,
23 Literal,
24 Optional,
25 TypeVar,
26 Union,
27 cast,
28 overload,
29)
30
31from hypothesis._settings import HealthCheck, Phase, Verbosity, settings
32from hypothesis.control import _current_build_context, current_build_context
33from hypothesis.errors import (
34 HypothesisException,
35 HypothesisWarning,
36 InvalidArgument,
37 NonInteractiveExampleWarning,
38 UnsatisfiedAssumption,
39)
40from hypothesis.internal.conjecture import utils as cu
41from hypothesis.internal.conjecture.data import ConjectureData
42from hypothesis.internal.conjecture.utils import (
43 calc_label_from_cls,
44 calc_label_from_hash,
45 calc_label_from_name,
46 combine_labels,
47)
48from hypothesis.internal.coverage import check_function
49from hypothesis.internal.reflection import (
50 get_pretty_function_description,
51 is_identity_function,
52)
53from hypothesis.strategies._internal.utils import defines_strategy
54from hypothesis.utils.conventions import UniqueIdentifier
55
56if TYPE_CHECKING:
57 from typing import TypeAlias
58
59 Ex = TypeVar("Ex", covariant=True, default=Any)
60else:
61 Ex = TypeVar("Ex", covariant=True)
62
63T = TypeVar("T")
64T3 = TypeVar("T3")
65T4 = TypeVar("T4")
66T5 = TypeVar("T5")
67MappedFrom = TypeVar("MappedFrom")
68MappedTo = TypeVar("MappedTo")
69RecurT: "TypeAlias" = Callable[["SearchStrategy"], bool]
70calculating = UniqueIdentifier("calculating")
71
72MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
73 "another attempted draw in MappedStrategy"
74)
75
76FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
77 "single loop iteration in FilteredStrategy"
78)
79
80
81def recursive_property(strategy: "SearchStrategy", name: str, default: object) -> Any:
82 """Handle properties which may be mutually recursive among a set of
83 strategies.
84
85 These are essentially lazily cached properties, with the ability to set
86 an override: If the property has not been explicitly set, we calculate
87 it on first access and memoize the result for later.
88
89 The problem is that for properties that depend on each other, a naive
90 calculation strategy may hit infinite recursion. Consider for example
91 the property is_empty. A strategy defined as x = st.deferred(lambda: x)
92 is certainly empty (in order to draw a value from x we would have to
93 draw a value from x, for which we would have to draw a value from x,
94 ...), but in order to calculate it the naive approach would end up
95 calling x.is_empty in order to calculate x.is_empty in order to etc.
96
97 The solution is one of fixed point calculation. We start with a default
98 value that is the value of the property in the absence of evidence to
99 the contrary, and then update the values of the property for all
100 dependent strategies until we reach a fixed point.
101
102 The approach taken roughly follows that in section 4.2 of Adams,
103 Michael D., Celeste Hollenbeck, and Matthew Might. "On the complexity
104 and performance of parsing with derivatives." ACM SIGPLAN Notices 51.6
105 (2016): 224-236.
106 """
107 cache_key = "cached_" + name
108 calculation = "calc_" + name
109 force_key = "force_" + name
110
111 def forced_value(target: SearchStrategy) -> Any:
112 try:
113 return getattr(target, force_key)
114 except AttributeError:
115 return getattr(target, cache_key)
116
117 try:
118 return forced_value(strategy)
119 except AttributeError:
120 pass
121
122 mapping: dict[SearchStrategy, Any] = {}
123 sentinel = object()
124 hit_recursion = False
125
126 # For a first pass we do a direct recursive calculation of the
127 # property, but we block recursively visiting a value in the
128 # computation of its property: When that happens, we simply
129 # note that it happened and return the default value.
130 def recur(strat: SearchStrategy) -> Any:
131 nonlocal hit_recursion
132 try:
133 return forced_value(strat)
134 except AttributeError:
135 pass
136 result = mapping.get(strat, sentinel)
137 if result is calculating:
138 hit_recursion = True
139 return default
140 elif result is sentinel:
141 mapping[strat] = calculating
142 mapping[strat] = getattr(strat, calculation)(recur)
143 return mapping[strat]
144 return result
145
146 recur(strategy)
147
148 # If we hit self-recursion in the computation of any strategy
149 # value, our mapping at the end is imprecise - it may or may
150 # not have the right values in it. We now need to proceed with
151 # a more careful fixed point calculation to get the exact
152 # values. Hopefully our mapping is still pretty good and it
153 # won't take a large number of updates to reach a fixed point.
154 if hit_recursion:
155 needs_update = set(mapping)
156
157 # We track which strategies use which in the course of
158 # calculating their property value. If A ever uses B in
159 # the course of calculating its value, then whenever the
160 # value of B changes we might need to update the value of
161 # A.
162 listeners: dict[SearchStrategy, set[SearchStrategy]] = defaultdict(set)
163 else:
164 needs_update = None
165
166 def recur2(strat: SearchStrategy) -> Any:
167 def recur_inner(other: SearchStrategy) -> Any:
168 try:
169 return forced_value(other)
170 except AttributeError:
171 pass
172 listeners[other].add(strat)
173 result = mapping.get(other, sentinel)
174 if result is sentinel:
175 assert needs_update is not None
176 needs_update.add(other)
177 mapping[other] = default
178 return default
179 return result
180
181 return recur_inner
182
183 count = 0
184 seen = set()
185 while needs_update:
186 count += 1
187 # If we seem to be taking a really long time to stabilize we
188 # start tracking seen values to attempt to detect an infinite
189 # loop. This should be impossible, and most code will never
190 # hit the count, but having an assertion for it means that
191 # testing is easier to debug and we don't just have a hung
192 # test.
193 # Note: This is actually covered, by test_very_deep_deferral
194 # in tests/cover/test_deferred_strategies.py. Unfortunately it
195 # runs into a coverage bug. See
196 # https://github.com/nedbat/coveragepy/issues/605
197 # for details.
198 if count > 50: # pragma: no cover
199 key = frozenset(mapping.items())
200 assert key not in seen, (key, name)
201 seen.add(key)
202 to_update = needs_update
203 needs_update = set()
204 for strat in to_update:
205 new_value = getattr(strat, calculation)(recur2(strat))
206 if new_value != mapping[strat]:
207 needs_update.update(listeners[strat])
208 mapping[strat] = new_value
209
210 # We now have a complete and accurate calculation of the
211 # property values for everything we have seen in the course of
212 # running this calculation. We simultaneously update all of
213 # them (not just the strategy we started out with).
214 for k, v in mapping.items():
215 setattr(k, cache_key, v)
216 return getattr(strategy, cache_key)
217
218
219class SearchStrategy(Generic[Ex]):
220 """A ``SearchStrategy`` tells Hypothesis how to generate that kind of input.
221
222 This class is only part of the public API for use in type annotations, so that
223 you can write e.g. ``-> SearchStrategy[Foo]`` for your function which returns
224 ``builds(Foo, ...)``. Do not inherit from or directly instantiate this class.
225 """
226
227 validate_called: bool = False
228 __label: Union[int, UniqueIdentifier, None] = None
229 __module__: str = "hypothesis.strategies"
230
231 def _available(self, data: ConjectureData) -> bool:
232 """Returns whether this strategy can *currently* draw any
233 values. This typically useful for stateful testing where ``Bundle``
234 grows over time a list of value to choose from.
235
236 Unlike ``empty`` property, this method's return value may change
237 over time.
238 Note: ``data`` parameter will only be used for introspection and no
239 value drawn from it.
240 """
241 return not self.is_empty
242
243 @property
244 def is_empty(self) -> Any:
245 # Returns True if this strategy can never draw a value and will always
246 # result in the data being marked invalid.
247 # The fact that this returns False does not guarantee that a valid value
248 # can be drawn - this is not intended to be perfect, and is primarily
249 # intended to be an optimisation for some cases.
250 return recursive_property(self, "is_empty", True)
251
252 @property
253 def supports_find(self) -> bool:
254 return True
255
256 # Returns True if values from this strategy can safely be reused without
257 # this causing unexpected behaviour.
258
259 # True if values from this strategy can be implicitly reused (e.g. as
260 # background values in a numpy array) without causing surprising
261 # user-visible behaviour. Should be false for built-in strategies that
262 # produce mutable values, and for strategies that have been mapped/filtered
263 # by arbitrary user-provided functions.
264 @property
265 def has_reusable_values(self) -> Any:
266 return recursive_property(self, "has_reusable_values", True)
267
268 # Whether this strategy is suitable for holding onto in a cache.
269 @property
270 def is_cacheable(self) -> Any:
271 return recursive_property(self, "is_cacheable", True)
272
273 def calc_is_cacheable(self, recur: RecurT) -> bool:
274 return True
275
276 def calc_is_empty(self, recur: RecurT) -> bool:
277 # Note: It is correct and significant that the default return value
278 # from calc_is_empty is False despite the default value for is_empty
279 # being true. The reason for this is that strategies should be treated
280 # as empty absent evidence to the contrary, but most basic strategies
281 # are trivially non-empty and it would be annoying to have to override
282 # this method to show that.
283 return False
284
285 def calc_has_reusable_values(self, recur: RecurT) -> bool:
286 return False
287
288 def example(self) -> Ex: # FIXME
289 """Provide an example of the sort of value that this strategy generates.
290
291 This method is designed for use in a REPL, and will raise an error if
292 called from inside |@given| or a strategy definition. For serious use,
293 see |@composite| or |st.data|.
294 """
295 if getattr(sys, "ps1", None) is None: # pragma: no branch
296 # The other branch *is* covered in cover/test_examples.py; but as that
297 # uses `pexpect` for an interactive session `coverage` doesn't see it.
298 warnings.warn(
299 "The `.example()` method is good for exploring strategies, but should "
300 "only be used interactively. We recommend using `@given` for tests - "
301 "it performs better, saves and replays failures to avoid flakiness, "
302 f"and reports minimal examples. (strategy: {self!r})",
303 NonInteractiveExampleWarning,
304 stacklevel=2,
305 )
306
307 context = _current_build_context.value
308 if context is not None:
309 if context.data is not None and context.data.depth > 0:
310 raise HypothesisException(
311 "Using example() inside a strategy definition is a bad "
312 "idea. Instead consider using hypothesis.strategies.builds() "
313 "or @hypothesis.strategies.composite to define your strategy."
314 " See https://hypothesis.readthedocs.io/en/latest/data.html"
315 "#hypothesis.strategies.builds or "
316 "https://hypothesis.readthedocs.io/en/latest/data.html"
317 "#composite-strategies for more details."
318 )
319 else:
320 raise HypothesisException(
321 "Using example() inside a test function is a bad "
322 "idea. Instead consider using hypothesis.strategies.data() "
323 "to draw more examples during testing. See "
324 "https://hypothesis.readthedocs.io/en/latest/data.html"
325 "#drawing-interactively-in-tests for more details."
326 )
327
328 try:
329 return self.__examples.pop()
330 except (AttributeError, IndexError):
331 self.__examples: list[Ex] = []
332
333 from hypothesis.core import given
334
335 # Note: this function has a weird name because it might appear in
336 # tracebacks, and we want users to know that they can ignore it.
337 @given(self)
338 @settings(
339 database=None,
340 # generate only a few examples at a time to avoid slow interactivity
341 # for large strategies. The overhead of @given is very small relative
342 # to generation, so a small batch size is fine.
343 max_examples=10,
344 deadline=None,
345 verbosity=Verbosity.quiet,
346 phases=(Phase.generate,),
347 suppress_health_check=list(HealthCheck),
348 )
349 def example_generating_inner_function(
350 ex: Ex, # type: ignore # mypy is overzealous in preventing covariant params
351 ) -> None:
352 self.__examples.append(ex)
353
354 example_generating_inner_function()
355 shuffle(self.__examples)
356 return self.__examples.pop()
357
358 def map(self, pack: Callable[[Ex], T]) -> "SearchStrategy[T]":
359 """Returns a new strategy which generates a value from this one, and
360 then returns ``pack(value)``. For example, ``integers().map(str)``
361 could generate ``str(5)`` == ``"5"``.
362 """
363 if is_identity_function(pack):
364 return self # type: ignore # Mypy has no way to know that `Ex == T`
365 return MappedStrategy(self, pack=pack)
366
367 def flatmap(
368 self, expand: Callable[[Ex], "SearchStrategy[T]"]
369 ) -> "SearchStrategy[T]": # FIXME
370 """Old syntax for a special case of |@composite|:
371
372 .. code-block:: python
373
374 @st.composite
375 def flatmap_like(draw, base_strategy, expand):
376 value = draw(base_strategy)
377 new_strategy = expand(value)
378 return draw(new_strategy)
379
380 We find that the greater readability of |@composite| usually outweighs
381 the verbosity, with a few exceptions for simple cases or recipes like
382 ``from_type(type).flatmap(from_type)`` ("pick a type, get a strategy for
383 any instance of that type, and then generate one of those").
384 """
385 from hypothesis.strategies._internal.flatmapped import FlatMapStrategy
386
387 return FlatMapStrategy(self, expand=expand)
388
389 # Note that we previously had condition extracted to a type alias as
390 # PredicateT. However, that was only useful when not specifying a relationship
391 # between the generic Ts and some other function param / return value.
392 # If we do want to - like here, where we want to say that the Ex arg to condition
393 # is of the same type as the strategy's Ex - then you need to write out the
394 # entire Callable[[Ex], Any] expression rather than use a type alias.
395 # TypeAlias is *not* simply a macro that inserts the text. TypeAlias will not
396 # reference the local TypeVar context.
397 def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]":
398 """Returns a new strategy that generates values from this strategy
399 which satisfy the provided condition.
400
401 Note that if the condition is too hard to satisfy this might result
402 in your tests failing with an Unsatisfiable exception.
403 A basic version of the filtering logic would look something like:
404
405 .. code-block:: python
406
407 @st.composite
408 def filter_like(draw, strategy, condition):
409 for _ in range(3):
410 value = draw(strategy)
411 if condition(value):
412 return value
413 assume(False)
414 """
415 return FilteredStrategy(conditions=(condition,), strategy=self)
416
417 def _filter_for_filtered_draw(
418 self, condition: Callable[[Ex], Any]
419 ) -> "FilteredStrategy[Ex]":
420 # Hook for parent strategies that want to perform fallible filtering
421 # on one of their internal strategies (e.g. UniqueListStrategy).
422 # The returned object must have a `.do_filtered_draw(data)` method
423 # that behaves like `do_draw`, but returns the sentinel object
424 # `filter_not_satisfied` if the condition could not be satisfied.
425
426 # This is separate from the main `filter` method so that strategies
427 # can override `filter` without having to also guarantee a
428 # `do_filtered_draw` method.
429 return FilteredStrategy(conditions=(condition,), strategy=self)
430
431 @property
432 def branches(self) -> Sequence["SearchStrategy[Ex]"]:
433 return [self]
434
435 def __or__(self, other: "SearchStrategy[T]") -> "SearchStrategy[Union[Ex, T]]":
436 """Return a strategy which produces values by randomly drawing from one
437 of this strategy or the other strategy.
438
439 This method is part of the public API.
440 """
441 if not isinstance(other, SearchStrategy):
442 raise ValueError(f"Cannot | a SearchStrategy with {other!r}")
443
444 # Unwrap explicitly or'd strategies. This turns the
445 # common case of e.g. st.integers() | st.integers() | st.integers() from
446 #
447 # one_of(one_of(integers(), integers()), integers())
448 #
449 # into
450 #
451 # one_of(integers(), integers(), integers())
452 #
453 # This is purely an aesthetic unwrapping, for e.g. reprs. In practice
454 # we use .branches / .element_strategies to get the list of possible
455 # strategies, so this unwrapping is *not* necessary for correctness.
456 strategies: list[SearchStrategy] = []
457 strategies.extend(
458 self.original_strategies if isinstance(self, OneOfStrategy) else [self]
459 )
460 strategies.extend(
461 other.original_strategies if isinstance(other, OneOfStrategy) else [other]
462 )
463 return OneOfStrategy(strategies)
464
465 def __bool__(self) -> bool:
466 warnings.warn(
467 f"bool({self!r}) is always True, did you mean to draw a value?",
468 HypothesisWarning,
469 stacklevel=2,
470 )
471 return True
472
473 def validate(self) -> None:
474 """Throw an exception if the strategy is not valid.
475
476 This can happen due to lazy construction
477 """
478 if self.validate_called:
479 return
480 try:
481 self.validate_called = True
482 self.do_validate()
483 self.is_empty
484 self.has_reusable_values
485 except Exception:
486 self.validate_called = False
487 raise
488
489 LABELS: ClassVar[dict[type, int]] = {}
490
491 @property
492 def class_label(self) -> int:
493 cls = self.__class__
494 try:
495 return cls.LABELS[cls]
496 except KeyError:
497 pass
498 result = calc_label_from_cls(cls)
499 cls.LABELS[cls] = result
500 return result
501
502 @property
503 def label(self) -> int:
504 if self.__label is calculating:
505 return 0
506 if self.__label is None:
507 self.__label = calculating
508 self.__label = self.calc_label()
509 return cast(int, self.__label)
510
511 def calc_label(self) -> int:
512 return self.class_label
513
514 def do_validate(self) -> None:
515 pass
516
517 def do_draw(self, data: ConjectureData) -> Ex:
518 raise NotImplementedError(f"{type(self).__name__}.do_draw")
519
520
521def is_hashable(value: object) -> bool:
522 try:
523 hash(value)
524 return True
525 except TypeError:
526 return False
527
528
529class SampledFromStrategy(SearchStrategy[Ex]):
530 """A strategy which samples from a set of elements. This is essentially
531 equivalent to using a OneOfStrategy over Just strategies but may be more
532 efficient and convenient.
533 """
534
535 _MAX_FILTER_CALLS: ClassVar[int] = 10_000
536
537 def __init__(
538 self,
539 elements: Sequence[Ex],
540 repr_: Optional[str] = None,
541 transformations: tuple[
542 tuple[Literal["filter", "map"], Callable[[Ex], Any]],
543 ...,
544 ] = (),
545 ):
546 super().__init__()
547 self.elements = cu.check_sample(elements, "sampled_from")
548 assert self.elements
549 self.repr_ = repr_
550 self._transformations = transformations
551
552 def map(self, pack: Callable[[Ex], T]) -> SearchStrategy[T]:
553 s = type(self)(
554 self.elements,
555 repr_=self.repr_,
556 transformations=(*self._transformations, ("map", pack)),
557 )
558 # guaranteed by the ("map", pack) transformation
559 return cast(SearchStrategy[T], s)
560
561 def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
562 return type(self)(
563 self.elements,
564 repr_=self.repr_,
565 transformations=(*self._transformations, ("filter", condition)),
566 )
567
568 def __repr__(self) -> str:
569 return (
570 self.repr_
571 or "sampled_from(["
572 + ", ".join(map(get_pretty_function_description, self.elements))
573 + "])"
574 ) + "".join(
575 f".{name}({get_pretty_function_description(f)})"
576 for name, f in self._transformations
577 )
578
579 def calc_label(self) -> int:
580 # strategy.label is effectively an under-approximation of structural
581 # equality (i.e., some strategies may have the same label when they are not
582 # structurally identical). More importantly for calculating the
583 # SampledFromStrategy label, we might have hash(s1) != hash(s2) even
584 # when s1 and s2 are structurally identical. For instance:
585 #
586 # s1 = st.sampled_from([st.none()])
587 # s2 = st.sampled_from([st.none()])
588 # assert hash(s1) != hash(s2)
589 #
590 # (see also test cases in test_labels.py).
591 #
592 # We therefore use the labels of any component strategies when calculating
593 # our label, and only use the hash if it is not a strategy.
594 #
595 # That's the ideal, anyway. In reality the logic is more complicated than
596 # necessary in order to be efficient in the presence of (very) large sequences:
597 # * add an unabashed special case for range, to avoid iteration over an
598 # enormous range when we know it is entirely integers.
599 # * if there is at least one strategy in self.elements, use strategy label,
600 # and the element hash otherwise.
601 # * if there are no strategies in self.elements, take the hash of the
602 # entire sequence. This prevents worst-case performance of hashing each
603 # element when a hash of the entire sequence would have sufficed.
604 #
605 # The worst case performance of this scheme is
606 # itertools.chain(range(2**100), [st.none()]), where it degrades to
607 # hashing every int in the range.
608
609 if isinstance(self.elements, range) or (
610 is_hashable(self.elements)
611 and not any(isinstance(e, SearchStrategy) for e in self.elements)
612 ):
613 return combine_labels(self.class_label, calc_label_from_hash(self.elements))
614
615 labels = [self.class_label]
616 for element in self.elements:
617 if not is_hashable(element):
618 continue
619
620 labels.append(
621 element.label
622 if isinstance(element, SearchStrategy)
623 else calc_label_from_hash(element)
624 )
625
626 return combine_labels(*labels)
627
628 def calc_has_reusable_values(self, recur: RecurT) -> bool:
629 # Because our custom .map/.filter implementations skip the normal
630 # wrapper strategies (which would automatically return False for us),
631 # we need to manually return False here if any transformations have
632 # been applied.
633 return not self._transformations
634
635 def calc_is_cacheable(self, recur: RecurT) -> bool:
636 return is_hashable(self.elements)
637
638 def _transform(
639 self,
640 # https://github.com/python/mypy/issues/7049, we're not writing `element`
641 # anywhere in the class so this is still type-safe. mypy is being more
642 # conservative than necessary
643 element: Ex, # type: ignore
644 ) -> Union[Ex, UniqueIdentifier]:
645 # Used in UniqueSampledListStrategy
646 for name, f in self._transformations:
647 if name == "map":
648 result = f(element)
649 if build_context := _current_build_context.value:
650 build_context.record_call(result, f, [element], {})
651 element = result
652 else:
653 assert name == "filter"
654 if not f(element):
655 return filter_not_satisfied
656 return element
657
658 def do_draw(self, data: ConjectureData) -> Ex:
659 result = self.do_filtered_draw(data)
660 if isinstance(result, SearchStrategy) and all(
661 isinstance(x, SearchStrategy) for x in self.elements
662 ):
663 data._sampled_from_all_strategies_elements_message = (
664 "sample_from was given a collection of strategies: "
665 "{!r}. Was one_of intended?",
666 self.elements,
667 )
668 if result is filter_not_satisfied:
669 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
670 assert not isinstance(result, UniqueIdentifier)
671 return result
672
673 def get_element(self, i: int) -> Union[Ex, UniqueIdentifier]:
674 return self._transform(self.elements[i])
675
676 def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
677 # Set of indices that have been tried so far, so that we never test
678 # the same element twice during a draw.
679 known_bad_indices: set[int] = set()
680
681 # Start with ordinary rejection sampling. It's fast if it works, and
682 # if it doesn't work then it was only a small amount of overhead.
683 for _ in range(3):
684 i = data.draw_integer(0, len(self.elements) - 1)
685 if i not in known_bad_indices:
686 element = self.get_element(i)
687 if element is not filter_not_satisfied:
688 return element
689 if not known_bad_indices:
690 data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
691 known_bad_indices.add(i)
692
693 # If we've tried all the possible elements, give up now.
694 max_good_indices = len(self.elements) - len(known_bad_indices)
695 if not max_good_indices:
696 return filter_not_satisfied
697
698 # Impose an arbitrary cutoff to prevent us from wasting too much time
699 # on very large element lists.
700 max_good_indices = min(max_good_indices, self._MAX_FILTER_CALLS - 3)
701
702 # Before building the list of allowed indices, speculatively choose
703 # one of them. We don't yet know how many allowed indices there will be,
704 # so this choice might be out-of-bounds, but that's OK.
705 speculative_index = data.draw_integer(0, max_good_indices - 1)
706
707 # Calculate the indices of allowed values, so that we can choose one
708 # of them at random. But if we encounter the speculatively-chosen one,
709 # just use that and return immediately. Note that we also track the
710 # allowed elements, in case of .map(some_stateful_function)
711 allowed: list[tuple[int, Ex]] = []
712 for i in range(min(len(self.elements), self._MAX_FILTER_CALLS - 3)):
713 if i not in known_bad_indices:
714 element = self.get_element(i)
715 if element is not filter_not_satisfied:
716 assert not isinstance(element, UniqueIdentifier)
717 allowed.append((i, element))
718 if len(allowed) > speculative_index:
719 # Early-exit case: We reached the speculative index, so
720 # we just return the corresponding element.
721 data.draw_integer(0, len(self.elements) - 1, forced=i)
722 return element
723
724 # The speculative index didn't work out, but at this point we've built
725 # and can choose from the complete list of allowed indices and elements.
726 if allowed:
727 i, element = data.choice(allowed)
728 data.draw_integer(0, len(self.elements) - 1, forced=i)
729 return element
730 # If there are no allowed indices, the filter couldn't be satisfied.
731 return filter_not_satisfied
732
733
734class OneOfStrategy(SearchStrategy[Ex]):
735 """Implements a union of strategies. Given a number of strategies this
736 generates values which could have come from any of them.
737
738 The conditional distribution draws uniformly at random from some
739 non-empty subset of these strategies and then draws from the
740 conditional distribution of that strategy.
741 """
742
743 def __init__(self, strategies: Sequence[SearchStrategy[Ex]]):
744 super().__init__()
745 self.original_strategies = tuple(strategies)
746 self.__element_strategies: Optional[Sequence[SearchStrategy[Ex]]] = None
747 self.__in_branches = False
748
749 def calc_is_empty(self, recur: RecurT) -> bool:
750 return all(recur(e) for e in self.original_strategies)
751
752 def calc_has_reusable_values(self, recur: RecurT) -> bool:
753 return all(recur(e) for e in self.original_strategies)
754
755 def calc_is_cacheable(self, recur: RecurT) -> bool:
756 return all(recur(e) for e in self.original_strategies)
757
758 @property
759 def element_strategies(self) -> Sequence[SearchStrategy[Ex]]:
760 if self.__element_strategies is None:
761 # While strategies are hashable, they use object.__hash__ and are
762 # therefore distinguished only by identity.
763 #
764 # In principle we could "just" define a __hash__ method
765 # (and __eq__, but that's easy in terms of type() and hash())
766 # to make this more powerful, but this is harder than it sounds:
767 #
768 # 1. Strategies are often distinguished by non-hashable attributes,
769 # or by attributes that have the same hash value ("^.+" / b"^.+").
770 # 2. LazyStrategy: can't reify the wrapped strategy without breaking
771 # laziness, so there's a hash each for the lazy and the nonlazy.
772 #
773 # Having made several attempts, the minor benefits of making strategies
774 # hashable are simply not worth the engineering effort it would take.
775 # See also issues #2291 and #2327.
776 seen: set[SearchStrategy] = {self}
777 strategies: list[SearchStrategy] = []
778 for arg in self.original_strategies:
779 check_strategy(arg)
780 if not arg.is_empty:
781 for s in arg.branches:
782 if s not in seen and not s.is_empty:
783 seen.add(s)
784 strategies.append(s)
785 self.__element_strategies = strategies
786 return self.__element_strategies
787
788 def calc_label(self) -> int:
789 return combine_labels(
790 self.class_label, *(p.label for p in self.original_strategies)
791 )
792
793 def do_draw(self, data: ConjectureData) -> Ex:
794 strategy = data.draw(
795 SampledFromStrategy(self.element_strategies).filter(
796 lambda s: s._available(data)
797 )
798 )
799 return data.draw(strategy)
800
801 def __repr__(self) -> str:
802 return "one_of({})".format(", ".join(map(repr, self.original_strategies)))
803
804 def do_validate(self) -> None:
805 for e in self.element_strategies:
806 e.validate()
807
808 @property
809 def branches(self) -> Sequence[SearchStrategy[Ex]]:
810 if not self.__in_branches:
811 try:
812 self.__in_branches = True
813 return self.element_strategies
814 finally:
815 self.__in_branches = False
816 else:
817 return [self]
818
819 def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
820 return FilteredStrategy(
821 OneOfStrategy([s.filter(condition) for s in self.original_strategies]),
822 conditions=(),
823 )
824
825
826@overload
827def one_of(
828 __args: Sequence[SearchStrategy[Ex]],
829) -> SearchStrategy[Ex]: # pragma: no cover
830 ...
831
832
833@overload
834def one_of(__a1: SearchStrategy[Ex]) -> SearchStrategy[Ex]: # pragma: no cover
835 ...
836
837
838@overload
839def one_of(
840 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T]
841) -> SearchStrategy[Union[Ex, T]]: # pragma: no cover
842 ...
843
844
845@overload
846def one_of(
847 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T], __a3: SearchStrategy[T3]
848) -> SearchStrategy[Union[Ex, T, T3]]: # pragma: no cover
849 ...
850
851
852@overload
853def one_of(
854 __a1: SearchStrategy[Ex],
855 __a2: SearchStrategy[T],
856 __a3: SearchStrategy[T3],
857 __a4: SearchStrategy[T4],
858) -> SearchStrategy[Union[Ex, T, T3, T4]]: # pragma: no cover
859 ...
860
861
862@overload
863def one_of(
864 __a1: SearchStrategy[Ex],
865 __a2: SearchStrategy[T],
866 __a3: SearchStrategy[T3],
867 __a4: SearchStrategy[T4],
868 __a5: SearchStrategy[T5],
869) -> SearchStrategy[Union[Ex, T, T3, T4, T5]]: # pragma: no cover
870 ...
871
872
873@overload
874def one_of(*args: SearchStrategy[Any]) -> SearchStrategy[Any]: # pragma: no cover
875 ...
876
877
878@defines_strategy(never_lazy=True)
879def one_of(
880 *args: Union[Sequence[SearchStrategy[Any]], SearchStrategy[Any]]
881) -> SearchStrategy[Any]:
882 # Mypy workaround alert: Any is too loose above; the return parameter
883 # should be the union of the input parameters. Unfortunately, Mypy <=0.600
884 # raises errors due to incompatible inputs instead. See #1270 for links.
885 # v0.610 doesn't error; it gets inference wrong for 2+ arguments instead.
886 """Return a strategy which generates values from any of the argument
887 strategies.
888
889 This may be called with one iterable argument instead of multiple
890 strategy arguments, in which case ``one_of(x)`` and ``one_of(*x)`` are
891 equivalent.
892
893 Examples from this strategy will generally shrink to ones that come from
894 strategies earlier in the list, then shrink according to behaviour of the
895 strategy that produced them. In order to get good shrinking behaviour,
896 try to put simpler strategies first. e.g. ``one_of(none(), text())`` is
897 better than ``one_of(text(), none())``.
898
899 This is especially important when using recursive strategies. e.g.
900 ``x = st.deferred(lambda: st.none() | st.tuples(x, x))`` will shrink well,
901 but ``x = st.deferred(lambda: st.tuples(x, x) | st.none())`` will shrink
902 very badly indeed.
903 """
904 if len(args) == 1 and not isinstance(args[0], SearchStrategy):
905 try:
906 args = tuple(args[0])
907 except TypeError:
908 pass
909 if len(args) == 1 and isinstance(args[0], SearchStrategy):
910 # This special-case means that we can one_of over lists of any size
911 # without incurring any performance overhead when there is only one
912 # strategy, and keeps our reprs simple.
913 return args[0]
914 if args and not any(isinstance(a, SearchStrategy) for a in args):
915 # And this special case is to give a more-specific error message if it
916 # seems that the user has confused `one_of()` for `sampled_from()`;
917 # the remaining validation is left to OneOfStrategy. See PR #2627.
918 raise InvalidArgument(
919 f"Did you mean st.sampled_from({list(args)!r})? st.one_of() is used "
920 "to combine strategies, but all of the arguments were of other types."
921 )
922 # we've handled the case where args is a one-element sequence [(s1, s2, ...)]
923 # above, so we can assume it's an actual sequence of strategies.
924 args = cast(Sequence[SearchStrategy], args)
925 return OneOfStrategy(args)
926
927
928class MappedStrategy(SearchStrategy[MappedTo], Generic[MappedFrom, MappedTo]):
929 """A strategy which is defined purely by conversion to and from another
930 strategy.
931
932 Its parameter and distribution come from that other strategy.
933 """
934
935 def __init__(
936 self,
937 strategy: SearchStrategy[MappedFrom],
938 pack: Callable[[MappedFrom], MappedTo],
939 ) -> None:
940 super().__init__()
941 self.mapped_strategy = strategy
942 self.pack = pack
943
944 def calc_is_empty(self, recur: RecurT) -> bool:
945 return recur(self.mapped_strategy)
946
947 def calc_is_cacheable(self, recur: RecurT) -> bool:
948 return recur(self.mapped_strategy)
949
950 def __repr__(self) -> str:
951 if not hasattr(self, "_cached_repr"):
952 self._cached_repr = f"{self.mapped_strategy!r}.map({get_pretty_function_description(self.pack)})"
953 return self._cached_repr
954
955 def do_validate(self) -> None:
956 self.mapped_strategy.validate()
957
958 def do_draw(self, data: ConjectureData) -> MappedTo:
959 with warnings.catch_warnings():
960 if isinstance(self.pack, type) and issubclass(
961 self.pack, (abc.Mapping, abc.Set)
962 ):
963 warnings.simplefilter("ignore", BytesWarning)
964 for _ in range(3):
965 try:
966 data.start_span(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL)
967 x = data.draw(self.mapped_strategy)
968 result = self.pack(x)
969 data.stop_span()
970 current_build_context().record_call(result, self.pack, [x], {})
971 return result
972 except UnsatisfiedAssumption:
973 data.stop_span(discard=True)
974 raise UnsatisfiedAssumption
975
976 @property
977 def branches(self) -> Sequence[SearchStrategy[MappedTo]]:
978 return [
979 MappedStrategy(strategy, pack=self.pack)
980 for strategy in self.mapped_strategy.branches
981 ]
982
983 def filter(
984 self, condition: Callable[[MappedTo], Any]
985 ) -> "SearchStrategy[MappedTo]":
986 # Includes a special case so that we can rewrite filters on collection
987 # lengths, when most collections are `st.lists(...).map(the_type)`.
988 ListStrategy = _list_strategy_type()
989 if not isinstance(self.mapped_strategy, ListStrategy) or not (
990 (isinstance(self.pack, type) and issubclass(self.pack, abc.Collection))
991 or self.pack in _collection_ish_functions()
992 ):
993 return super().filter(condition)
994
995 # Check whether our inner list strategy can rewrite this filter condition.
996 # If not, discard the result and _only_ apply a new outer filter.
997 new = ListStrategy.filter(self.mapped_strategy, condition)
998 if getattr(new, "filtered_strategy", None) is self.mapped_strategy:
999 return super().filter(condition) # didn't rewrite
1000
1001 # Apply a new outer filter even though we rewrote the inner strategy,
1002 # because some collections can change the list length (dict, set, etc).
1003 return FilteredStrategy(type(self)(new, self.pack), conditions=(condition,))
1004
1005
1006@lru_cache
1007def _list_strategy_type() -> Any:
1008 from hypothesis.strategies._internal.collections import ListStrategy
1009
1010 return ListStrategy
1011
1012
1013def _collection_ish_functions() -> Sequence[Any]:
1014 funcs = [sorted]
1015 if np := sys.modules.get("numpy"):
1016 # c.f. https://numpy.org/doc/stable/reference/routines.array-creation.html
1017 # Probably only `np.array` and `np.asarray` will be used in practice,
1018 # but why should that stop us when we've already gone this far?
1019 funcs += [
1020 np.empty_like,
1021 np.eye,
1022 np.identity,
1023 np.ones_like,
1024 np.zeros_like,
1025 np.array,
1026 np.asarray,
1027 np.asanyarray,
1028 np.ascontiguousarray,
1029 np.asmatrix,
1030 np.copy,
1031 np.rec.array,
1032 np.rec.fromarrays,
1033 np.rec.fromrecords,
1034 np.diag,
1035 # bonus undocumented functions from tab-completion:
1036 np.asarray_chkfinite,
1037 np.asfortranarray,
1038 ]
1039
1040 return funcs
1041
1042
1043filter_not_satisfied = UniqueIdentifier("filter not satisfied")
1044
1045
1046class FilteredStrategy(SearchStrategy[Ex]):
1047 def __init__(
1048 self, strategy: SearchStrategy[Ex], conditions: tuple[Callable[[Ex], Any], ...]
1049 ):
1050 super().__init__()
1051 if isinstance(strategy, FilteredStrategy):
1052 # Flatten chained filters into a single filter with multiple conditions.
1053 self.flat_conditions: tuple[Callable[[Ex], Any], ...] = (
1054 strategy.flat_conditions + conditions
1055 )
1056 self.filtered_strategy: SearchStrategy[Ex] = strategy.filtered_strategy
1057 else:
1058 self.flat_conditions = conditions
1059 self.filtered_strategy = strategy
1060
1061 assert isinstance(self.flat_conditions, tuple)
1062 assert not isinstance(self.filtered_strategy, FilteredStrategy)
1063
1064 self.__condition: Optional[Callable[[Ex], Any]] = None
1065
1066 def calc_is_empty(self, recur: RecurT) -> bool:
1067 return recur(self.filtered_strategy)
1068
1069 def calc_is_cacheable(self, recur: RecurT) -> bool:
1070 return recur(self.filtered_strategy)
1071
1072 def __repr__(self) -> str:
1073 if not hasattr(self, "_cached_repr"):
1074 self._cached_repr = "{!r}{}".format(
1075 self.filtered_strategy,
1076 "".join(
1077 f".filter({get_pretty_function_description(cond)})"
1078 for cond in self.flat_conditions
1079 ),
1080 )
1081 return self._cached_repr
1082
1083 def do_validate(self) -> None:
1084 # Start by validating our inner filtered_strategy. If this was a LazyStrategy,
1085 # validation also reifies it so that subsequent calls to e.g. `.filter()` will
1086 # be passed through.
1087 self.filtered_strategy.validate()
1088 # So now we have a reified inner strategy, we'll replay all our saved
1089 # predicates in case some or all of them can be rewritten. Note that this
1090 # replaces the `fresh` strategy too!
1091 fresh = self.filtered_strategy
1092 for cond in self.flat_conditions:
1093 fresh = fresh.filter(cond)
1094 if isinstance(fresh, FilteredStrategy):
1095 # In this case we have at least some non-rewritten filter predicates,
1096 # so we just re-initialize the strategy.
1097 FilteredStrategy.__init__(
1098 self, fresh.filtered_strategy, fresh.flat_conditions
1099 )
1100 else:
1101 # But if *all* the predicates were rewritten... well, do_validate() is
1102 # an in-place method so we still just re-initialize the strategy!
1103 FilteredStrategy.__init__(self, fresh, ())
1104
1105 def filter(self, condition: Callable[[Ex], Any]) -> "FilteredStrategy[Ex]":
1106 # If we can, it's more efficient to rewrite our strategy to satisfy the
1107 # condition. We therefore exploit the fact that the order of predicates
1108 # doesn't matter (`f(x) and g(x) == g(x) and f(x)`) by attempting to apply
1109 # condition directly to our filtered strategy as the inner-most filter.
1110 out = self.filtered_strategy.filter(condition)
1111 # If it couldn't be rewritten, we'll get a new FilteredStrategy - and then
1112 # combine the conditions of each in our expected newest=last order.
1113 if isinstance(out, FilteredStrategy):
1114 return FilteredStrategy(
1115 out.filtered_strategy, self.flat_conditions + out.flat_conditions
1116 )
1117 # But if it *could* be rewritten, we can return the more efficient form!
1118 return FilteredStrategy(out, self.flat_conditions)
1119
1120 @property
1121 def condition(self) -> Callable[[Ex], Any]:
1122 if self.__condition is None:
1123 if len(self.flat_conditions) == 1:
1124 # Avoid an extra indirection in the common case of only one condition.
1125 self.__condition = self.flat_conditions[0]
1126 elif len(self.flat_conditions) == 0:
1127 # Possible, if unlikely, due to filter predicate rewriting
1128 self.__condition = lambda _: True # type: ignore # covariant type param
1129 else:
1130 self.__condition = lambda x: all( # type: ignore # covariant type param
1131 cond(x) for cond in self.flat_conditions
1132 )
1133 return self.__condition
1134
1135 def do_draw(self, data: ConjectureData) -> Ex:
1136 result = self.do_filtered_draw(data)
1137 if result is not filter_not_satisfied:
1138 return cast(Ex, result)
1139
1140 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
1141
1142 def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
1143 for i in range(3):
1144 data.start_span(FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL)
1145 value = data.draw(self.filtered_strategy)
1146 if self.condition(value):
1147 data.stop_span()
1148 return value
1149 else:
1150 data.stop_span(discard=True)
1151 if i == 0:
1152 data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
1153
1154 return filter_not_satisfied
1155
1156 @property
1157 def branches(self) -> Sequence[SearchStrategy[Ex]]:
1158 return [
1159 FilteredStrategy(strategy=strategy, conditions=self.flat_conditions)
1160 for strategy in self.filtered_strategy.branches
1161 ]
1162
1163
1164@check_function
1165def check_strategy(arg: object, name: str = "") -> None:
1166 assert isinstance(name, str)
1167 if not isinstance(arg, SearchStrategy):
1168 hint = ""
1169 if isinstance(arg, (list, tuple)):
1170 hint = ", such as st.sampled_from({}),".format(name or "...")
1171 if name:
1172 name += "="
1173 raise InvalidArgument(
1174 f"Expected a SearchStrategy{hint} but got {name}{arg!r} "
1175 f"(type={type(arg).__name__})"
1176 )