1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import sys
12import warnings
13from collections import abc, defaultdict
14from functools import lru_cache
15from random import shuffle
16from typing import (
17 TYPE_CHECKING,
18 Any,
19 Callable,
20 ClassVar,
21 Dict,
22 Generic,
23 List,
24 Sequence,
25 TypeVar,
26 Union,
27 cast,
28 overload,
29)
30
31from hypothesis._settings import HealthCheck, Phase, Verbosity, settings
32from hypothesis.control import _current_build_context
33from hypothesis.errors import (
34 HypothesisException,
35 HypothesisWarning,
36 InvalidArgument,
37 NonInteractiveExampleWarning,
38 UnsatisfiedAssumption,
39)
40from hypothesis.internal.conjecture import utils as cu
41from hypothesis.internal.conjecture.data import ConjectureData
42from hypothesis.internal.conjecture.utils import (
43 calc_label_from_cls,
44 calc_label_from_name,
45 combine_labels,
46)
47from hypothesis.internal.coverage import check_function
48from hypothesis.internal.reflection import (
49 get_pretty_function_description,
50 is_identity_function,
51)
52from hypothesis.strategies._internal.utils import defines_strategy
53from hypothesis.utils.conventions import UniqueIdentifier
54
55# TODO: Use `(3, 13)` once Python 3.13 is released.
56if sys.version_info >= (3, 13, 0, "final"):
57 Ex = TypeVar("Ex", covariant=True, default=Any)
58elif TYPE_CHECKING:
59 from typing_extensions import TypeVar # type: ignore[assignment]
60
61 Ex = TypeVar("Ex", covariant=True, default=Any) # type: ignore[call-arg,misc]
62else:
63 Ex = TypeVar("Ex", covariant=True)
64
65Ex_Inv = TypeVar("Ex_Inv")
66T = TypeVar("T")
67T3 = TypeVar("T3")
68T4 = TypeVar("T4")
69T5 = TypeVar("T5")
70
71calculating = UniqueIdentifier("calculating")
72
73MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
74 "another attempted draw in MappedStrategy"
75)
76
77FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
78 "single loop iteration in FilteredStrategy"
79)
80
81
82def recursive_property(name, default):
83 """Handle properties which may be mutually recursive among a set of
84 strategies.
85
86 These are essentially lazily cached properties, with the ability to set
87 an override: If the property has not been explicitly set, we calculate
88 it on first access and memoize the result for later.
89
90 The problem is that for properties that depend on each other, a naive
91 calculation strategy may hit infinite recursion. Consider for example
92 the property is_empty. A strategy defined as x = st.deferred(lambda: x)
93 is certainly empty (in order to draw a value from x we would have to
94 draw a value from x, for which we would have to draw a value from x,
95 ...), but in order to calculate it the naive approach would end up
96 calling x.is_empty in order to calculate x.is_empty in order to etc.
97
98 The solution is one of fixed point calculation. We start with a default
99 value that is the value of the property in the absence of evidence to
100 the contrary, and then update the values of the property for all
101 dependent strategies until we reach a fixed point.
102
103 The approach taken roughly follows that in section 4.2 of Adams,
104 Michael D., Celeste Hollenbeck, and Matthew Might. "On the complexity
105 and performance of parsing with derivatives." ACM SIGPLAN Notices 51.6
106 (2016): 224-236.
107 """
108 cache_key = "cached_" + name
109 calculation = "calc_" + name
110 force_key = "force_" + name
111
112 def forced_value(target):
113 try:
114 return getattr(target, force_key)
115 except AttributeError:
116 return getattr(target, cache_key)
117
118 def accept(self):
119 try:
120 return forced_value(self)
121 except AttributeError:
122 pass
123
124 mapping = {}
125 sentinel = object()
126 hit_recursion = [False]
127
128 # For a first pass we do a direct recursive calculation of the
129 # property, but we block recursively visiting a value in the
130 # computation of its property: When that happens, we simply
131 # note that it happened and return the default value.
132 def recur(strat):
133 try:
134 return forced_value(strat)
135 except AttributeError:
136 pass
137 result = mapping.get(strat, sentinel)
138 if result is calculating:
139 hit_recursion[0] = True
140 return default
141 elif result is sentinel:
142 mapping[strat] = calculating
143 mapping[strat] = getattr(strat, calculation)(recur)
144 return mapping[strat]
145 return result
146
147 recur(self)
148
149 # If we hit self-recursion in the computation of any strategy
150 # value, our mapping at the end is imprecise - it may or may
151 # not have the right values in it. We now need to proceed with
152 # a more careful fixed point calculation to get the exact
153 # values. Hopefully our mapping is still pretty good and it
154 # won't take a large number of updates to reach a fixed point.
155 if hit_recursion[0]:
156 needs_update = set(mapping)
157
158 # We track which strategies use which in the course of
159 # calculating their property value. If A ever uses B in
160 # the course of calculating its value, then whenever the
161 # value of B changes we might need to update the value of
162 # A.
163 listeners = defaultdict(set)
164 else:
165 needs_update = None
166
167 def recur2(strat):
168 def recur_inner(other):
169 try:
170 return forced_value(other)
171 except AttributeError:
172 pass
173 listeners[other].add(strat)
174 result = mapping.get(other, sentinel)
175 if result is sentinel:
176 needs_update.add(other)
177 mapping[other] = default
178 return default
179 return result
180
181 return recur_inner
182
183 count = 0
184 seen = set()
185 while needs_update:
186 count += 1
187 # If we seem to be taking a really long time to stabilize we
188 # start tracking seen values to attempt to detect an infinite
189 # loop. This should be impossible, and most code will never
190 # hit the count, but having an assertion for it means that
191 # testing is easier to debug and we don't just have a hung
192 # test.
193 # Note: This is actually covered, by test_very_deep_deferral
194 # in tests/cover/test_deferred_strategies.py. Unfortunately it
195 # runs into a coverage bug. See
196 # https://github.com/nedbat/coveragepy/issues/605
197 # for details.
198 if count > 50: # pragma: no cover
199 key = frozenset(mapping.items())
200 assert key not in seen, (key, name)
201 seen.add(key)
202 to_update = needs_update
203 needs_update = set()
204 for strat in to_update:
205 new_value = getattr(strat, calculation)(recur2(strat))
206 if new_value != mapping[strat]:
207 needs_update.update(listeners[strat])
208 mapping[strat] = new_value
209
210 # We now have a complete and accurate calculation of the
211 # property values for everything we have seen in the course of
212 # running this calculation. We simultaneously update all of
213 # them (not just the strategy we started out with).
214 for k, v in mapping.items():
215 setattr(k, cache_key, v)
216 return getattr(self, cache_key)
217
218 accept.__name__ = name
219 return property(accept)
220
221
222class SearchStrategy(Generic[Ex]):
223 """A SearchStrategy is an object that knows how to explore data of a given
224 type.
225
226 Except where noted otherwise, methods on this class are not part of
227 the public API and their behaviour may change significantly between
228 minor version releases. They will generally be stable between patch
229 releases.
230 """
231
232 supports_find = True
233 validate_called = False
234 __label = None
235 __module__ = "hypothesis.strategies"
236
237 def available(self, data):
238 """Returns whether this strategy can *currently* draw any
239 values. This typically useful for stateful testing where ``Bundle``
240 grows over time a list of value to choose from.
241
242 Unlike ``empty`` property, this method's return value may change
243 over time.
244 Note: ``data`` parameter will only be used for introspection and no
245 value drawn from it.
246 """
247 return not self.is_empty
248
249 # Returns True if this strategy can never draw a value and will always
250 # result in the data being marked invalid.
251 # The fact that this returns False does not guarantee that a valid value
252 # can be drawn - this is not intended to be perfect, and is primarily
253 # intended to be an optimisation for some cases.
254 is_empty = recursive_property("is_empty", True)
255
256 # Returns True if values from this strategy can safely be reused without
257 # this causing unexpected behaviour.
258
259 # True if values from this strategy can be implicitly reused (e.g. as
260 # background values in a numpy array) without causing surprising
261 # user-visible behaviour. Should be false for built-in strategies that
262 # produce mutable values, and for strategies that have been mapped/filtered
263 # by arbitrary user-provided functions.
264 has_reusable_values = recursive_property("has_reusable_values", True)
265
266 # Whether this strategy is suitable for holding onto in a cache.
267 is_cacheable = recursive_property("is_cacheable", True)
268
269 def calc_is_cacheable(self, recur):
270 return True
271
272 def calc_is_empty(self, recur):
273 # Note: It is correct and significant that the default return value
274 # from calc_is_empty is False despite the default value for is_empty
275 # being true. The reason for this is that strategies should be treated
276 # as empty absent evidence to the contrary, but most basic strategies
277 # are trivially non-empty and it would be annoying to have to override
278 # this method to show that.
279 return False
280
281 def calc_has_reusable_values(self, recur):
282 return False
283
284 def example(self) -> Ex:
285 """Provide an example of the sort of value that this strategy
286 generates. This is biased to be slightly simpler than is typical for
287 values from this strategy, for clarity purposes.
288
289 This method shouldn't be taken too seriously. It's here for interactive
290 exploration of the API, not for any sort of real testing.
291
292 This method is part of the public API.
293 """
294 if getattr(sys, "ps1", None) is None: # pragma: no branch
295 # The other branch *is* covered in cover/test_examples.py; but as that
296 # uses `pexpect` for an interactive session `coverage` doesn't see it.
297 warnings.warn(
298 "The `.example()` method is good for exploring strategies, but should "
299 "only be used interactively. We recommend using `@given` for tests - "
300 "it performs better, saves and replays failures to avoid flakiness, "
301 "and reports minimal examples. (strategy: %r)" % (self,),
302 NonInteractiveExampleWarning,
303 stacklevel=2,
304 )
305
306 context = _current_build_context.value
307 if context is not None:
308 if context.data is not None and context.data.depth > 0:
309 raise HypothesisException(
310 "Using example() inside a strategy definition is a bad "
311 "idea. Instead consider using hypothesis.strategies.builds() "
312 "or @hypothesis.strategies.composite to define your strategy."
313 " See https://hypothesis.readthedocs.io/en/latest/data.html"
314 "#hypothesis.strategies.builds or "
315 "https://hypothesis.readthedocs.io/en/latest/data.html"
316 "#composite-strategies for more details."
317 )
318 else:
319 raise HypothesisException(
320 "Using example() inside a test function is a bad "
321 "idea. Instead consider using hypothesis.strategies.data() "
322 "to draw more examples during testing. See "
323 "https://hypothesis.readthedocs.io/en/latest/data.html"
324 "#drawing-interactively-in-tests for more details."
325 )
326
327 try:
328 return self.__examples.pop()
329 except (AttributeError, IndexError):
330 self.__examples: List[Ex] = []
331
332 from hypothesis.core import given
333
334 # Note: this function has a weird name because it might appear in
335 # tracebacks, and we want users to know that they can ignore it.
336 @given(self)
337 @settings(
338 database=None,
339 max_examples=100,
340 deadline=None,
341 verbosity=Verbosity.quiet,
342 phases=(Phase.generate,),
343 suppress_health_check=list(HealthCheck),
344 )
345 def example_generating_inner_function(ex):
346 self.__examples.append(ex)
347
348 example_generating_inner_function()
349 shuffle(self.__examples)
350 return self.__examples.pop()
351
352 def map(self, pack: Callable[[Ex], T]) -> "SearchStrategy[T]":
353 """Returns a new strategy that generates values by generating a value
354 from this strategy and then calling pack() on the result, giving that.
355
356 This method is part of the public API.
357 """
358 if is_identity_function(pack):
359 return self # type: ignore # Mypy has no way to know that `Ex == T`
360 return MappedStrategy(self, pack=pack)
361
362 def flatmap(
363 self, expand: Callable[[Ex], "SearchStrategy[T]"]
364 ) -> "SearchStrategy[T]":
365 """Returns a new strategy that generates values by generating a value
366 from this strategy, say x, then generating a value from
367 strategy(expand(x))
368
369 This method is part of the public API.
370 """
371 from hypothesis.strategies._internal.flatmapped import FlatMapStrategy
372
373 return FlatMapStrategy(expand=expand, strategy=self)
374
375 def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]":
376 """Returns a new strategy that generates values from this strategy
377 which satisfy the provided condition. Note that if the condition is too
378 hard to satisfy this might result in your tests failing with
379 Unsatisfiable.
380
381 This method is part of the public API.
382 """
383 return FilteredStrategy(conditions=(condition,), strategy=self)
384
385 def _filter_for_filtered_draw(self, condition):
386 # Hook for parent strategies that want to perform fallible filtering
387 # on one of their internal strategies (e.g. UniqueListStrategy).
388 # The returned object must have a `.do_filtered_draw(data)` method
389 # that behaves like `do_draw`, but returns the sentinel object
390 # `filter_not_satisfied` if the condition could not be satisfied.
391
392 # This is separate from the main `filter` method so that strategies
393 # can override `filter` without having to also guarantee a
394 # `do_filtered_draw` method.
395 return FilteredStrategy(conditions=(condition,), strategy=self)
396
397 @property
398 def branches(self) -> List["SearchStrategy[Ex]"]:
399 return [self]
400
401 def __or__(self, other: "SearchStrategy[T]") -> "SearchStrategy[Union[Ex, T]]":
402 """Return a strategy which produces values by randomly drawing from one
403 of this strategy or the other strategy.
404
405 This method is part of the public API.
406 """
407 if not isinstance(other, SearchStrategy):
408 raise ValueError(f"Cannot | a SearchStrategy with {other!r}")
409 return OneOfStrategy((self, other))
410
411 def __bool__(self) -> bool:
412 warnings.warn(
413 f"bool({self!r}) is always True, did you mean to draw a value?",
414 HypothesisWarning,
415 stacklevel=2,
416 )
417 return True
418
419 def validate(self) -> None:
420 """Throw an exception if the strategy is not valid.
421
422 This can happen due to lazy construction
423 """
424 if self.validate_called:
425 return
426 try:
427 self.validate_called = True
428 self.do_validate()
429 self.is_empty
430 self.has_reusable_values
431 except Exception:
432 self.validate_called = False
433 raise
434
435 LABELS: ClassVar[Dict[type, int]] = {}
436
437 @property
438 def class_label(self):
439 cls = self.__class__
440 try:
441 return cls.LABELS[cls]
442 except KeyError:
443 pass
444 result = calc_label_from_cls(cls)
445 cls.LABELS[cls] = result
446 return result
447
448 @property
449 def label(self) -> int:
450 if self.__label is calculating:
451 return 0
452 if self.__label is None:
453 self.__label = calculating
454 self.__label = self.calc_label()
455 return cast(int, self.__label)
456
457 def calc_label(self):
458 return self.class_label
459
460 def do_validate(self):
461 pass
462
463 def do_draw(self, data: ConjectureData) -> Ex:
464 raise NotImplementedError(f"{type(self).__name__}.do_draw")
465
466 def __init__(self):
467 pass
468
469
470def is_simple_data(value):
471 try:
472 hash(value)
473 return True
474 except TypeError:
475 return False
476
477
478class SampledFromStrategy(SearchStrategy):
479 """A strategy which samples from a set of elements. This is essentially
480 equivalent to using a OneOfStrategy over Just strategies but may be more
481 efficient and convenient.
482 """
483
484 _MAX_FILTER_CALLS = 10_000
485
486 def __init__(self, elements, repr_=None, transformations=()):
487 super().__init__()
488 self.elements = cu.check_sample(elements, "sampled_from")
489 assert self.elements
490 self.repr_ = repr_
491 self._transformations = transformations
492
493 def map(self, pack):
494 return type(self)(
495 self.elements,
496 repr_=self.repr_,
497 transformations=(*self._transformations, ("map", pack)),
498 )
499
500 def filter(self, condition):
501 return type(self)(
502 self.elements,
503 repr_=self.repr_,
504 transformations=(*self._transformations, ("filter", condition)),
505 )
506
507 def __repr__(self):
508 return (
509 self.repr_
510 or "sampled_from(["
511 + ", ".join(map(get_pretty_function_description, self.elements))
512 + "])"
513 ) + "".join(
514 f".{name}({get_pretty_function_description(f)})"
515 for name, f in self._transformations
516 )
517
518 def calc_has_reusable_values(self, recur):
519 # Because our custom .map/.filter implementations skip the normal
520 # wrapper strategies (which would automatically return False for us),
521 # we need to manually return False here if any transformations have
522 # been applied.
523 return not self._transformations
524
525 def calc_is_cacheable(self, recur):
526 return is_simple_data(self.elements)
527
528 def _transform(self, element):
529 # Used in UniqueSampledListStrategy
530 for name, f in self._transformations:
531 if name == "map":
532 result = f(element)
533 if build_context := _current_build_context.value:
534 build_context.record_call(result, f, [element], {})
535 element = result
536 else:
537 assert name == "filter"
538 if not f(element):
539 return filter_not_satisfied
540 return element
541
542 def do_draw(self, data):
543 result = self.do_filtered_draw(data)
544 if isinstance(result, SearchStrategy) and all(
545 isinstance(x, SearchStrategy) for x in self.elements
546 ):
547 data._sampled_from_all_strategies_elements_message = (
548 "sample_from was given a collection of strategies: "
549 "{!r}. Was one_of intended?",
550 self.elements,
551 )
552 if result is filter_not_satisfied:
553 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
554 return result
555
556 def get_element(self, i):
557 return self._transform(self.elements[i])
558
559 def do_filtered_draw(self, data):
560 # Set of indices that have been tried so far, so that we never test
561 # the same element twice during a draw.
562 known_bad_indices = set()
563
564 # Start with ordinary rejection sampling. It's fast if it works, and
565 # if it doesn't work then it was only a small amount of overhead.
566 for _ in range(3):
567 i = data.draw_integer(0, len(self.elements) - 1)
568 if i not in known_bad_indices:
569 element = self.get_element(i)
570 if element is not filter_not_satisfied:
571 return element
572 if not known_bad_indices:
573 data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
574 known_bad_indices.add(i)
575
576 # If we've tried all the possible elements, give up now.
577 max_good_indices = len(self.elements) - len(known_bad_indices)
578 if not max_good_indices:
579 return filter_not_satisfied
580
581 # Impose an arbitrary cutoff to prevent us from wasting too much time
582 # on very large element lists.
583 max_good_indices = min(max_good_indices, self._MAX_FILTER_CALLS - 3)
584
585 # Before building the list of allowed indices, speculatively choose
586 # one of them. We don't yet know how many allowed indices there will be,
587 # so this choice might be out-of-bounds, but that's OK.
588 speculative_index = data.draw_integer(0, max_good_indices - 1)
589
590 # Calculate the indices of allowed values, so that we can choose one
591 # of them at random. But if we encounter the speculatively-chosen one,
592 # just use that and return immediately. Note that we also track the
593 # allowed elements, in case of .map(some_stateful_function)
594 allowed = []
595 for i in range(min(len(self.elements), self._MAX_FILTER_CALLS - 3)):
596 if i not in known_bad_indices:
597 element = self.get_element(i)
598 if element is not filter_not_satisfied:
599 allowed.append((i, element))
600 if len(allowed) > speculative_index:
601 # Early-exit case: We reached the speculative index, so
602 # we just return the corresponding element.
603 data.draw_integer(0, len(self.elements) - 1, forced=i)
604 return element
605
606 # The speculative index didn't work out, but at this point we've built
607 # and can choose from the complete list of allowed indices and elements.
608 if allowed:
609 i, element = data.choice(allowed)
610 data.draw_integer(0, len(self.elements) - 1, forced=i)
611 return element
612 # If there are no allowed indices, the filter couldn't be satisfied.
613 return filter_not_satisfied
614
615
616class OneOfStrategy(SearchStrategy[Ex]):
617 """Implements a union of strategies. Given a number of strategies this
618 generates values which could have come from any of them.
619
620 The conditional distribution draws uniformly at random from some
621 non-empty subset of these strategies and then draws from the
622 conditional distribution of that strategy.
623 """
624
625 def __init__(self, strategies):
626 super().__init__()
627 strategies = tuple(strategies)
628 self.original_strategies = list(strategies)
629 self.__element_strategies = None
630 self.__in_branches = False
631
632 def calc_is_empty(self, recur):
633 return all(recur(e) for e in self.original_strategies)
634
635 def calc_has_reusable_values(self, recur):
636 return all(recur(e) for e in self.original_strategies)
637
638 def calc_is_cacheable(self, recur):
639 return all(recur(e) for e in self.original_strategies)
640
641 @property
642 def element_strategies(self):
643 if self.__element_strategies is None:
644 # While strategies are hashable, they use object.__hash__ and are
645 # therefore distinguished only by identity.
646 #
647 # In principle we could "just" define a __hash__ method
648 # (and __eq__, but that's easy in terms of type() and hash())
649 # to make this more powerful, but this is harder than it sounds:
650 #
651 # 1. Strategies are often distinguished by non-hashable attributes,
652 # or by attributes that have the same hash value ("^.+" / b"^.+").
653 # 2. LazyStrategy: can't reify the wrapped strategy without breaking
654 # laziness, so there's a hash each for the lazy and the nonlazy.
655 #
656 # Having made several attempts, the minor benefits of making strategies
657 # hashable are simply not worth the engineering effort it would take.
658 # See also issues #2291 and #2327.
659 seen = {self}
660 strategies = []
661 for arg in self.original_strategies:
662 check_strategy(arg)
663 if not arg.is_empty:
664 for s in arg.branches:
665 if s not in seen and not s.is_empty:
666 seen.add(s)
667 strategies.append(s)
668 self.__element_strategies = strategies
669 return self.__element_strategies
670
671 def calc_label(self):
672 return combine_labels(
673 self.class_label, *(p.label for p in self.original_strategies)
674 )
675
676 def do_draw(self, data: ConjectureData) -> Ex:
677 strategy = data.draw(
678 SampledFromStrategy(self.element_strategies).filter(
679 lambda s: s.available(data)
680 )
681 )
682 return data.draw(strategy)
683
684 def __repr__(self):
685 return "one_of(%s)" % ", ".join(map(repr, self.original_strategies))
686
687 def do_validate(self):
688 for e in self.element_strategies:
689 e.validate()
690
691 @property
692 def branches(self):
693 if not self.__in_branches:
694 try:
695 self.__in_branches = True
696 return self.element_strategies
697 finally:
698 self.__in_branches = False
699 else:
700 return [self]
701
702 def filter(self, condition):
703 return FilteredStrategy(
704 OneOfStrategy([s.filter(condition) for s in self.original_strategies]),
705 conditions=(),
706 )
707
708
709@overload
710def one_of(
711 __args: Sequence[SearchStrategy[Any]],
712) -> SearchStrategy[Any]: # pragma: no cover
713 ...
714
715
716@overload
717def one_of(__a1: SearchStrategy[Ex]) -> SearchStrategy[Ex]: # pragma: no cover
718 ...
719
720
721@overload
722def one_of(
723 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T]
724) -> SearchStrategy[Union[Ex, T]]: # pragma: no cover
725 ...
726
727
728@overload
729def one_of(
730 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T], __a3: SearchStrategy[T3]
731) -> SearchStrategy[Union[Ex, T, T3]]: # pragma: no cover
732 ...
733
734
735@overload
736def one_of(
737 __a1: SearchStrategy[Ex],
738 __a2: SearchStrategy[T],
739 __a3: SearchStrategy[T3],
740 __a4: SearchStrategy[T4],
741) -> SearchStrategy[Union[Ex, T, T3, T4]]: # pragma: no cover
742 ...
743
744
745@overload
746def one_of(
747 __a1: SearchStrategy[Ex],
748 __a2: SearchStrategy[T],
749 __a3: SearchStrategy[T3],
750 __a4: SearchStrategy[T4],
751 __a5: SearchStrategy[T5],
752) -> SearchStrategy[Union[Ex, T, T3, T4, T5]]: # pragma: no cover
753 ...
754
755
756@overload
757def one_of(*args: SearchStrategy[Any]) -> SearchStrategy[Any]: # pragma: no cover
758 ...
759
760
761@defines_strategy(never_lazy=True)
762def one_of(
763 *args: Union[Sequence[SearchStrategy[Any]], SearchStrategy[Any]]
764) -> SearchStrategy[Any]:
765 # Mypy workaround alert: Any is too loose above; the return parameter
766 # should be the union of the input parameters. Unfortunately, Mypy <=0.600
767 # raises errors due to incompatible inputs instead. See #1270 for links.
768 # v0.610 doesn't error; it gets inference wrong for 2+ arguments instead.
769 """Return a strategy which generates values from any of the argument
770 strategies.
771
772 This may be called with one iterable argument instead of multiple
773 strategy arguments, in which case ``one_of(x)`` and ``one_of(*x)`` are
774 equivalent.
775
776 Examples from this strategy will generally shrink to ones that come from
777 strategies earlier in the list, then shrink according to behaviour of the
778 strategy that produced them. In order to get good shrinking behaviour,
779 try to put simpler strategies first. e.g. ``one_of(none(), text())`` is
780 better than ``one_of(text(), none())``.
781
782 This is especially important when using recursive strategies. e.g.
783 ``x = st.deferred(lambda: st.none() | st.tuples(x, x))`` will shrink well,
784 but ``x = st.deferred(lambda: st.tuples(x, x) | st.none())`` will shrink
785 very badly indeed.
786 """
787 if len(args) == 1 and not isinstance(args[0], SearchStrategy):
788 try:
789 args = tuple(args[0])
790 except TypeError:
791 pass
792 if len(args) == 1 and isinstance(args[0], SearchStrategy):
793 # This special-case means that we can one_of over lists of any size
794 # without incurring any performance overhead when there is only one
795 # strategy, and keeps our reprs simple.
796 return args[0]
797 if args and not any(isinstance(a, SearchStrategy) for a in args):
798 # And this special case is to give a more-specific error message if it
799 # seems that the user has confused `one_of()` for `sampled_from()`;
800 # the remaining validation is left to OneOfStrategy. See PR #2627.
801 raise InvalidArgument(
802 f"Did you mean st.sampled_from({list(args)!r})? st.one_of() is used "
803 "to combine strategies, but all of the arguments were of other types."
804 )
805 return OneOfStrategy(args)
806
807
808class MappedStrategy(SearchStrategy[Ex]):
809 """A strategy which is defined purely by conversion to and from another
810 strategy.
811
812 Its parameter and distribution come from that other strategy.
813 """
814
815 def __init__(self, strategy, pack):
816 super().__init__()
817 self.mapped_strategy = strategy
818 self.pack = pack
819
820 def calc_is_empty(self, recur):
821 return recur(self.mapped_strategy)
822
823 def calc_is_cacheable(self, recur):
824 return recur(self.mapped_strategy)
825
826 def __repr__(self):
827 if not hasattr(self, "_cached_repr"):
828 self._cached_repr = f"{self.mapped_strategy!r}.map({get_pretty_function_description(self.pack)})"
829 return self._cached_repr
830
831 def do_validate(self):
832 self.mapped_strategy.validate()
833
834 def do_draw(self, data: ConjectureData) -> Any:
835 with warnings.catch_warnings():
836 if isinstance(self.pack, type) and issubclass(
837 self.pack, (abc.Mapping, abc.Set)
838 ):
839 warnings.simplefilter("ignore", BytesWarning)
840 for _ in range(3):
841 try:
842 data.start_example(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL)
843 x = data.draw(self.mapped_strategy)
844 result = self.pack(x) # type: ignore
845 data.stop_example()
846 _current_build_context.value.record_call(result, self.pack, [x], {})
847 return result
848 except UnsatisfiedAssumption:
849 data.stop_example(discard=True)
850 raise UnsatisfiedAssumption
851
852 @property
853 def branches(self) -> List[SearchStrategy[Ex]]:
854 return [
855 MappedStrategy(strategy, pack=self.pack)
856 for strategy in self.mapped_strategy.branches
857 ]
858
859 def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]":
860 # Includes a special case so that we can rewrite filters on collection
861 # lengths, when most collections are `st.lists(...).map(the_type)`.
862 ListStrategy = _list_strategy_type()
863 if not isinstance(self.mapped_strategy, ListStrategy) or not (
864 (isinstance(self.pack, type) and issubclass(self.pack, abc.Collection))
865 or self.pack in _collection_ish_functions()
866 ):
867 return super().filter(condition)
868
869 # Check whether our inner list strategy can rewrite this filter condition.
870 # If not, discard the result and _only_ apply a new outer filter.
871 new = ListStrategy.filter(self.mapped_strategy, condition)
872 if getattr(new, "filtered_strategy", None) is self.mapped_strategy:
873 return super().filter(condition) # didn't rewrite
874
875 # Apply a new outer filter even though we rewrote the inner strategy,
876 # because some collections can change the list length (dict, set, etc).
877 return FilteredStrategy(type(self)(new, self.pack), conditions=(condition,))
878
879
880@lru_cache
881def _list_strategy_type():
882 from hypothesis.strategies._internal.collections import ListStrategy
883
884 return ListStrategy
885
886
887def _collection_ish_functions():
888 funcs = [sorted]
889 if np := sys.modules.get("numpy"):
890 # c.f. https://numpy.org/doc/stable/reference/routines.array-creation.html
891 # Probably only `np.array` and `np.asarray` will be used in practice,
892 # but why should that stop us when we've already gone this far?
893 funcs += [
894 np.empty_like,
895 np.eye,
896 np.identity,
897 np.ones_like,
898 np.zeros_like,
899 np.array,
900 np.asarray,
901 np.asanyarray,
902 np.ascontiguousarray,
903 np.asmatrix,
904 np.copy,
905 np.rec.array,
906 np.rec.fromarrays,
907 np.rec.fromrecords,
908 np.diag,
909 # bonus undocumented functions from tab-completion:
910 np.asarray_chkfinite,
911 np.asfortranarray,
912 ]
913
914 return funcs
915
916
917filter_not_satisfied = UniqueIdentifier("filter not satisfied")
918
919
920class FilteredStrategy(SearchStrategy[Ex]):
921 def __init__(self, strategy, conditions):
922 super().__init__()
923 if isinstance(strategy, FilteredStrategy):
924 # Flatten chained filters into a single filter with multiple conditions.
925 self.flat_conditions = strategy.flat_conditions + conditions
926 self.filtered_strategy = strategy.filtered_strategy
927 else:
928 self.flat_conditions = conditions
929 self.filtered_strategy = strategy
930
931 assert isinstance(self.flat_conditions, tuple)
932 assert not isinstance(self.filtered_strategy, FilteredStrategy)
933
934 self.__condition = None
935
936 def calc_is_empty(self, recur):
937 return recur(self.filtered_strategy)
938
939 def calc_is_cacheable(self, recur):
940 return recur(self.filtered_strategy)
941
942 def __repr__(self):
943 if not hasattr(self, "_cached_repr"):
944 self._cached_repr = "{!r}{}".format(
945 self.filtered_strategy,
946 "".join(
947 f".filter({get_pretty_function_description(cond)})"
948 for cond in self.flat_conditions
949 ),
950 )
951 return self._cached_repr
952
953 def do_validate(self):
954 # Start by validating our inner filtered_strategy. If this was a LazyStrategy,
955 # validation also reifies it so that subsequent calls to e.g. `.filter()` will
956 # be passed through.
957 self.filtered_strategy.validate()
958 # So now we have a reified inner strategy, we'll replay all our saved
959 # predicates in case some or all of them can be rewritten. Note that this
960 # replaces the `fresh` strategy too!
961 fresh = self.filtered_strategy
962 for cond in self.flat_conditions:
963 fresh = fresh.filter(cond)
964 if isinstance(fresh, FilteredStrategy):
965 # In this case we have at least some non-rewritten filter predicates,
966 # so we just re-initialize the strategy.
967 FilteredStrategy.__init__(
968 self, fresh.filtered_strategy, fresh.flat_conditions
969 )
970 else:
971 # But if *all* the predicates were rewritten... well, do_validate() is
972 # an in-place method so we still just re-initialize the strategy!
973 FilteredStrategy.__init__(self, fresh, ())
974
975 def filter(self, condition):
976 # If we can, it's more efficient to rewrite our strategy to satisfy the
977 # condition. We therefore exploit the fact that the order of predicates
978 # doesn't matter (`f(x) and g(x) == g(x) and f(x)`) by attempting to apply
979 # condition directly to our filtered strategy as the inner-most filter.
980 out = self.filtered_strategy.filter(condition)
981 # If it couldn't be rewritten, we'll get a new FilteredStrategy - and then
982 # combine the conditions of each in our expected newest=last order.
983 if isinstance(out, FilteredStrategy):
984 return FilteredStrategy(
985 out.filtered_strategy, self.flat_conditions + out.flat_conditions
986 )
987 # But if it *could* be rewritten, we can return the more efficient form!
988 return FilteredStrategy(out, self.flat_conditions)
989
990 @property
991 def condition(self):
992 if self.__condition is None:
993 if len(self.flat_conditions) == 1:
994 # Avoid an extra indirection in the common case of only one condition.
995 self.__condition = self.flat_conditions[0]
996 elif len(self.flat_conditions) == 0:
997 # Possible, if unlikely, due to filter predicate rewriting
998 self.__condition = lambda _: True
999 else:
1000 self.__condition = lambda x: all(
1001 cond(x) for cond in self.flat_conditions
1002 )
1003 return self.__condition
1004
1005 def do_draw(self, data: ConjectureData) -> Ex:
1006 result = self.do_filtered_draw(data)
1007 if result is not filter_not_satisfied:
1008 return result
1009
1010 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
1011 raise NotImplementedError("Unreachable, for Mypy")
1012
1013 def do_filtered_draw(self, data):
1014 for i in range(3):
1015 data.start_example(FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL)
1016 value = data.draw(self.filtered_strategy)
1017 if self.condition(value):
1018 data.stop_example()
1019 return value
1020 else:
1021 data.stop_example(discard=True)
1022 if i == 0:
1023 data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
1024
1025 return filter_not_satisfied
1026
1027 @property
1028 def branches(self) -> List[SearchStrategy[Ex]]:
1029 return [
1030 FilteredStrategy(strategy=strategy, conditions=self.flat_conditions)
1031 for strategy in self.filtered_strategy.branches
1032 ]
1033
1034
1035@check_function
1036def check_strategy(arg, name=""):
1037 assert isinstance(name, str)
1038 if not isinstance(arg, SearchStrategy):
1039 hint = ""
1040 if isinstance(arg, (list, tuple)):
1041 hint = ", such as st.sampled_from({}),".format(name or "...")
1042 if name:
1043 name += "="
1044 raise InvalidArgument(
1045 "Expected a SearchStrategy%s but got %s%r (type=%s)"
1046 % (hint, name, arg, type(arg).__name__)
1047 )